What does MessageQueueSelector do in RocketMQ?

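This article looks at what MessageQueueSelector does in RocketMQ. It walks through the interface and its built-in implementations in rocketmq-client 4.5.2, then shows where RocketMQTemplate in rocketmq-spring-boot 2.0.3 uses it.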

MessageQueueSelector

rocketmq-client-4.5.2-sources.jar!/org/apache/rocketmq/client/producer/MessageQueueSelector.java

public interface MessageQueueSelector {
    MessageQueue select(final List<MessageQueue> mqs, final Message msg, final Object arg);
}
  • The MessageQueueSelector interface defines a single select method that returns a MessageQueue. It has several implementations: SelectMessageQueueByHash, SelectMessageQueueByRandom, and SelectMessageQueueByMachineRoom.
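To make the contract concrete, here is a minimal sketch of how a selector is consulted. DemoQueue, DemoSelector and SelectorContractSketch are hypothetical stand-ins (not RocketMQ classes) so that the example compiles without the client jar, and the message type is simplified to a String. In the real client, the arg value is the one the caller passes to DefaultMQProducer.send(msg, selector, arg).

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-in for RocketMQ's MessageQueue so the sketch compiles on its own.
final class DemoQueue {
    final String brokerName;
    final int queueId;

    DemoQueue(String brokerName, int queueId) {
        this.brokerName = brokerName;
        this.queueId = queueId;
    }
}

// Same shape as MessageQueueSelector (message type simplified to String).
interface DemoSelector {
    DemoQueue select(List<DemoQueue> mqs, String msg, Object arg);
}

final class SelectorContractSketch {
    // Assumes arg is a Long order id; equal ids always map to the same queue.
    static final DemoSelector BY_ORDER_ID = (mqs, msg, arg) -> {
        long orderId = (Long) arg;
        return mqs.get((int) Math.floorMod(orderId, (long) mqs.size()));
    };

    // Stand-in for the sending side: it hands the caller's arg to the selector unchanged.
    static DemoQueue route(List<DemoQueue> mqs, String msg, DemoSelector selector, Object arg) {
        return selector.select(mqs, msg, arg);
    }

    static List<DemoQueue> demoQueues() {
        return Arrays.asList(
            new DemoQueue("broker-a", 0),
            new DemoQueue("broker-a", 1),
            new DemoQueue("broker-b", 0));
    }
}
```

Because the interface has only one method, a lambda is enough to implement it. Routing every message of one order through the same queue is the usual reason to supply a selector at all.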

SelectMessageQueueByHash

rocketmq-client-4.5.2-sources.jar!/org/apache/rocketmq/client/producer/selector/SelectMessageQueueByHash.java

public class SelectMessageQueueByHash implements MessageQueueSelector {

    @Override
    public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
        int value = arg.hashCode();
        if (value < 0) {
            value = Math.abs(value);
        }

        value = value % mqs.size();
        return mqs.get(value);
    }
}
  • SelectMessageQueueByHash implements MessageQueueSelector. Its select method takes the absolute value of arg's hashCode, then takes the remainder modulo mqs.size() to get the index of the target queue in mqs.
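The index arithmetic can be tried in isolation. The sketch below copies the abs-then-modulo steps into a hypothetical helper class, HashIndexSketch (not part of RocketMQ), and sets them next to Math.floorMod, an alternative that is not the library's code. It also shows one edge case: Math.abs(Integer.MIN_VALUE) is still Integer.MIN_VALUE, so that single hash value can produce a negative index.

```java
final class HashIndexSketch {
    // Mirrors the arithmetic in SelectMessageQueueByHash: absolute value, then modulo.
    static int absThenMod(Object arg, int queueCount) {
        int value = arg.hashCode();
        if (value < 0) {
            value = Math.abs(value); // stays negative when value == Integer.MIN_VALUE
        }
        return value % queueCount;
    }

    // Alternative (an assumption, not the library's code): floorMod is never
    // negative when the divisor is positive.
    static int floorModIndex(Object arg, int queueCount) {
        return Math.floorMod(arg.hashCode(), queueCount);
    }
}
```

An Integer's hashCode is its own value, which makes it a convenient arg for experiments. Both helpers are deterministic, so either keeps messages with the same arg on the same queue as long as the queue count does not change.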

SelectMessageQueueByRandom

rocketmq-client-4.5.2-sources.jar!/org/apache/rocketmq/client/producer/selector/SelectMessageQueueByRandom.java

public class SelectMessageQueueByRandom implements MessageQueueSelector {
    private Random random = new Random(System.currentTimeMillis());

    @Override
    public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
        int value = random.nextInt(mqs.size());
        return mqs.get(value);
    }
}
  • SelectMessageQueueByRandom implements MessageQueueSelector. Its select method draws a random integer in [0, mqs.size()) and uses it as the index of the target queue in mqs.

SelectMessageQueueByMachineRoom

rocketmq-client-4.5.2-sources.jar!/org/apache/rocketmq/client/producer/selector/SelectMessageQueueByMachineRoom.java

public class SelectMessageQueueByMachineRoom implements MessageQueueSelector {
    private Set<String> consumeridcs;

    @Override
    public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
        return null;
    }

    public Set<String> getConsumeridcs() {
        return consumeridcs;
    }

    public void setConsumeridcs(Set<String> consumeridcs) {
        this.consumeridcs = consumeridcs;
    }
}
  • SelectMessageQueueByMachineRoom implements MessageQueueSelector; in this version its select method simply returns null.
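Since the shipped select returns null, anyone who wants machine-room routing has to write the logic themselves. What follows is only one possible sketch, not the library's intended behavior. It assumes broker names follow an "<idc>@<broker>" pattern, which the article does not state, and it uses a hypothetical Queue stand-in instead of MessageQueue so that it compiles on its own.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;

final class MachineRoomSketch {
    // Hypothetical stand-in for MessageQueue; only the broker name matters here.
    static final class Queue {
        final String brokerName;
        final int queueId;

        Queue(String brokerName, int queueId) {
            this.brokerName = brokerName;
            this.queueId = queueId;
        }
    }

    // Assumption: broker names look like "<idc>@<broker>".
    static Queue select(List<Queue> mqs, Set<String> idcs, Object arg) {
        List<Queue> local = new ArrayList<>();
        for (Queue mq : mqs) {
            String[] parts = mq.brokerName.split("@");
            if (parts.length == 2 && idcs.contains(parts[0])) {
                local.add(mq); // queue lives in one of the preferred machine rooms
            }
        }
        // Fall back to every queue when no preferred machine room matches.
        List<Queue> candidates = local.isEmpty() ? mqs : local;
        return candidates.get(Math.floorMod(arg.hashCode(), candidates.size()));
    }
}
```

The fallback to all queues is a design choice of this sketch: it prefers delivering to a remote machine room over returning no queue at all.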

RocketMQTemplate

rocketmq-spring-boot-2.0.3-sources.jar!/org/apache/rocketmq/spring/core/RocketMQTemplate.java

public class RocketMQTemplate extends AbstractMessageSendingTemplate<String> implements InitializingBean, DisposableBean {
    private static final Logger log = LoggerFactory.getLogger(RocketMQTemplate.class);

    private DefaultMQProducer producer;

    private ObjectMapper objectMapper;

    private String charset = "UTF-8";

    private MessageQueueSelector messageQueueSelector = new SelectMessageQueueByHash();

    private final Map<String, TransactionMQProducer> cache = new ConcurrentHashMap<>(); //only put TransactionMQProducer by now!!!

    public DefaultMQProducer getProducer() {
        return producer;
    }

    public void setProducer(DefaultMQProducer producer) {
        this.producer = producer;
    }

    public ObjectMapper getObjectMapper() {
        return objectMapper;
    }

    public void setObjectMapper(ObjectMapper objectMapper) {
        this.objectMapper = objectMapper;
    }

    public String getCharset() {
        return charset;
    }

    public void setCharset(String charset) {
        this.charset = charset;
    }

    public MessageQueueSelector getMessageQueueSelector() {
        return messageQueueSelector;
    }

    public void setMessageQueueSelector(MessageQueueSelector messageQueueSelector) {
        this.messageQueueSelector = messageQueueSelector;
    }

    //......
}
  • RocketMQTemplate creates a SelectMessageQueueByHash as its default MessageQueueSelector; setMessageQueueSelector can replace it.
