rocketmq中DefaultRocketMQListenerContainer的原理及用法

这篇文章主要介绍“rocketmq中DefaultRocketMQListenerContainer的原理及用法”,在日常操作中,相信很多人在rocketmq中DefaultRocketMQListenerContainer的原理及用法问题上存在疑惑,小编查阅了各式资料,整理出简单好用的操作方法,希望对大家解答”rocketmq中DefaultRocketMQListenerContainer的原理及用法”的疑惑有所帮助!接下来,请跟着小编一起来学习吧!

成都创新互联是专业的南丰网站建设公司,南丰接单;提供成都网站制作、成都做网站,网页设计,网站设计,建网站,PHP网站建设等专业做网站服务;采用PHP框架,可快速的进行南丰网站开发网页制作和功能扩展;专业做搜索引擎喜爱的网站,专业的做网站团队,希望更多企业前来合作!

本文主要研究一下rocketmq的DefaultRocketMQListenerContainer

DefaultRocketMQListenerContainer

rocketmq-spring-boot-2.0.3-sources.jar!/org/apache/rocketmq/spring/support/DefaultRocketMQListenerContainer.java

public class DefaultRocketMQListenerContainer implements InitializingBean,
    RocketMQListenerContainer, SmartLifecycle, ApplicationContextAware {
    private final static Logger log = LoggerFactory.getLogger(DefaultRocketMQListenerContainer.class);

    private ApplicationContext applicationContext;

    /**
     * The name of the DefaultRocketMQListenerContainer instance
     */
    private String name;

    private long suspendCurrentQueueTimeMillis = 1000;

    /**
     * Message consume retry strategy
 -1,no retry,put into DLQ directly
 0,broker control retry frequency
     * >0,client control retry frequency.      */     private int delayLevelWhenNextConsume = 0;     private String nameServer;     private AccessChannel accessChannel = AccessChannel.LOCAL;     private String consumerGroup;     private String topic;     private int consumeThreadMax = 64;     private String charset = "UTF-8";     private ObjectMapper objectMapper;     private RocketMQListener rocketMQListener;     private RocketMQMessageListener rocketMQMessageListener;     private DefaultMQPushConsumer consumer;     private Class messageType;     private boolean running;     // The following properties came from @RocketMQMessageListener.     private ConsumeMode consumeMode;     private SelectorType selectorType;     private String selectorExpression;     private MessageModel messageModel;     private long consumeTimeout;     //......     public void setRocketMQMessageListener(RocketMQMessageListener anno) {         this.rocketMQMessageListener = anno;         this.consumeMode = anno.consumeMode();         this.consumeThreadMax = anno.consumeThreadMax();         this.messageModel = anno.messageModel();         this.selectorExpression = anno.selectorExpression();         this.selectorType = anno.selectorType();         this.consumeTimeout = anno.consumeTimeout();     }     @Override     public void setupMessageListener(RocketMQListener rocketMQListener) {         this.rocketMQListener = rocketMQListener;     }     @Override     public void destroy() {         this.setRunning(false);         if (Objects.nonNull(consumer)) {             consumer.shutdown();         }         log.info("container destroyed, {}", this.toString());     }     @Override     public boolean isAutoStartup() {         return true;     }     @Override     public void stop(Runnable callback) {         stop();         callback.run();     }     @Override     public void start() {         if (this.isRunning()) {             throw new IllegalStateException("container already running. " + this.toString());         }         try {             consumer.start();         } catch (MQClientException e) {             throw new IllegalStateException("Failed to start RocketMQ push consumer", e);         }         this.setRunning(true);         log.info("running container: {}", this.toString());     }     @Override     public void stop() {         if (this.isRunning()) {             if (Objects.nonNull(consumer)) {                 consumer.shutdown();             }             setRunning(false);         }     }     @Override     public boolean isRunning() {         return running;     }     private void setRunning(boolean running) {         this.running = running;     }     @Override     public int getPhase() {         // Returning Integer.MAX_VALUE only suggests that         // we will be the first bean to shutdown and last bean to start         return Integer.MAX_VALUE;     }     @Override     public void afterPropertiesSet() throws Exception {         initRocketMQPushConsumer();         this.messageType = getMessageType();         log.debug("RocketMQ messageType: {}", messageType.getName());     }     @Override     public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {         this.applicationContext = applicationContext;     }     @Override     public String toString() {         return "DefaultRocketMQListenerContainer{" +             "consumerGroup='" + consumerGroup + '\'' +             ", nameServer='" + nameServer + '\'' +             ", topic='" + topic + '\'' +             ", consumeMode=" + consumeMode +             ", selectorType=" + selectorType +             ", selectorExpression='" + selectorExpression + '\'' +             ", messageModel=" + messageModel +             '}';     }     private void initRocketMQPushConsumer() throws MQClientException {         Assert.notNull(rocketMQListener, "Property 'rocketMQListener' is required");         Assert.notNull(consumerGroup, "Property 'consumerGroup' is required");         Assert.notNull(nameServer, "Property 'nameServer' is required");         Assert.notNull(topic, "Property 'topic' is required");         RPCHook rpcHook = RocketMQUtil.getRPCHookByAkSk(applicationContext.getEnvironment(),             this.rocketMQMessageListener.accessKey(), this.rocketMQMessageListener.secretKey());         boolean enableMsgTrace = rocketMQMessageListener.enableMsgTrace();         if (Objects.nonNull(rpcHook)) {             consumer = new DefaultMQPushConsumer(consumerGroup, rpcHook, new AllocateMessageQueueAveragely(),                 enableMsgTrace, this.applicationContext.getEnvironment().                 resolveRequiredPlaceholders(this.rocketMQMessageListener.customizedTraceTopic()));             consumer.setVipChannelEnabled(false);             consumer.setInstanceName(RocketMQUtil.getInstanceName(rpcHook, consumerGroup));         } else {             log.debug("Access-key or secret-key not configure in " + this + ".");             consumer = new DefaultMQPushConsumer(consumerGroup, enableMsgTrace,                     this.applicationContext.getEnvironment().                     resolveRequiredPlaceholders(this.rocketMQMessageListener.customizedTraceTopic()));         }         String customizedNameServer = this.applicationContext.getEnvironment().resolveRequiredPlaceholders(this.rocketMQMessageListener.nameServer());         if (customizedNameServer != null) {             consumer.setNamesrvAddr(customizedNameServer);         } else {             consumer.setNamesrvAddr(nameServer);         }         if (accessChannel != null) {             consumer.setAccessChannel(accessChannel);         }         consumer.setConsumeThreadMax(consumeThreadMax);         if (consumeThreadMax < consumer.getConsumeThreadMin()) {             consumer.setConsumeThreadMin(consumeThreadMax);         }         consumer.setConsumeTimeout(consumeTimeout);         consumer.setInstanceName(this.name);         switch (messageModel) {             case BROADCASTING:                 consumer.setMessageModel(org.apache.rocketmq.common.protocol.heartbeat.MessageModel.BROADCASTING);                 break;             case CLUSTERING:                 consumer.setMessageModel(org.apache.rocketmq.common.protocol.heartbeat.MessageModel.CLUSTERING);                 break;             default:                 throw new IllegalArgumentException("Property 'messageModel' was wrong.");         }         switch (selectorType) {             case TAG:                 consumer.subscribe(topic, selectorExpression);                 break;             case SQL92:                 consumer.subscribe(topic, MessageSelector.bySql(selectorExpression));                 break;             default:                 throw new IllegalArgumentException("Property 'selectorType' was wrong.");         }         switch (consumeMode) {             case ORDERLY:                 consumer.setMessageListener(new DefaultMessageListenerOrderly());                 break;             case CONCURRENTLY:                 consumer.setMessageListener(new DefaultMessageListenerConcurrently());                 break;             default:                 throw new IllegalArgumentException("Property 'consumeMode' was wrong.");         }         if (rocketMQListener instanceof RocketMQPushConsumerLifecycleListener) {             ((RocketMQPushConsumerLifecycleListener) rocketMQListener).prepareStart(consumer);         }     }     private Class getMessageType() {         Class targetClass = AopProxyUtils.ultimateTargetClass(rocketMQListener);         Type[] interfaces = targetClass.getGenericInterfaces();         Class superclass = targetClass.getSuperclass();         while ((Objects.isNull(interfaces) || 0 == interfaces.length) && Objects.nonNull(superclass)) {             interfaces = superclass.getGenericInterfaces();             superclass = targetClass.getSuperclass();         }         if (Objects.nonNull(interfaces)) {             for (Type type : interfaces) {                 if (type instanceof ParameterizedType) {                     ParameterizedType parameterizedType = (ParameterizedType) type;                     if (Objects.equals(parameterizedType.getRawType(), RocketMQListener.class)) {                         Type[] actualTypeArguments = parameterizedType.getActualTypeArguments();                         if (Objects.nonNull(actualTypeArguments) && actualTypeArguments.length > 0) {                             return (Class) actualTypeArguments[0];                         } else {                             return Object.class;                         }                     }                 }             }             return Object.class;         } else {             return Object.class;         }     }     //...... }
  • DefaultRocketMQListenerContainer实现了InitializingBean, RocketMQListenerContainer, SmartLifecycle, ApplicationContextAware接口;setRocketMQMessageListener方法会根据RocketMQMessageListener注解的信息来设置consumeMode、consumeThreadMax、messageModel、selectorExpression、selectorType、consumeTimeout

  • afterPropertiesSet方法执行了initRocketMQPushConsumer及getMessageType方法;initRocketMQPushConsumer方法会根据rpcHook是否为null来创建不同的DefaultMQPushConsumer,之后根据messageModel、selectorType、consumeMode等来配置consumer;如果rocketMQListener类型是RocketMQPushConsumerLifecycleListener的,则执行RocketMQPushConsumerLifecycleListener的prepareStart方法

  • setupMessageListener方法主要是保存了rocketMQListener;isAutoStartup方法返回true;start方法主要是执行consumer.start()方法;stop及destroy方法主要是执行consumer.shutdown()

DefaultMessageListenerConcurrently

rocketmq-spring-boot-2.0.3-sources.jar!/org/apache/rocketmq/spring/support/DefaultRocketMQListenerContainer.java

    public class DefaultMessageListenerConcurrently implements MessageListenerConcurrently {

        @SuppressWarnings("unchecked")
        @Override
        public ConsumeConcurrentlyStatus consumeMessage(List msgs, ConsumeConcurrentlyContext context) {
            for (MessageExt messageExt : msgs) {
                log.debug("received msg: {}", messageExt);
                try {
                    long now = System.currentTimeMillis();
                    rocketMQListener.onMessage(doConvertMessage(messageExt));
                    long costTime = System.currentTimeMillis() - now;
                    log.debug("consume {} cost: {} ms", messageExt.getMsgId(), costTime);
                } catch (Exception e) {
                    log.warn("consume message failed. messageExt:{}", messageExt, e);
                    context.setDelayLevelWhenNextConsume(delayLevelWhenNextConsume);
                    return ConsumeConcurrentlyStatus.RECONSUME_LATER;
                }
            }

            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        }
    }
  • DefaultMessageListenerConcurrently方法实现了MessageListenerConcurrently接口;它的consumeMessage方法使用for循环try catch执行rocketMQListener.onMessage(doConvertMessage(messageExt))回调,都成功返回ConsumeConcurrentlyStatus.CONSUME_SUCCESS,一旦异常则返回ConsumeConcurrentlyStatus.RECONSUME_LATER

DefaultMessageListenerOrderly

rocketmq-spring-boot-2.0.3-sources.jar!/org/apache/rocketmq/spring/support/DefaultRocketMQListenerContainer.java

    public class DefaultMessageListenerOrderly implements MessageListenerOrderly {

        @SuppressWarnings("unchecked")
        @Override
        public ConsumeOrderlyStatus consumeMessage(List msgs, ConsumeOrderlyContext context) {
            for (MessageExt messageExt : msgs) {
                log.debug("received msg: {}", messageExt);
                try {
                    long now = System.currentTimeMillis();
                    rocketMQListener.onMessage(doConvertMessage(messageExt));
                    long costTime = System.currentTimeMillis() - now;
                    log.info("consume {} cost: {} ms", messageExt.getMsgId(), costTime);
                } catch (Exception e) {
                    log.warn("consume message failed. messageExt:{}", messageExt, e);
                    context.setSuspendCurrentQueueTimeMillis(suspendCurrentQueueTimeMillis);
                    return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
                }
            }

            return ConsumeOrderlyStatus.SUCCESS;
        }
    }
  • DefaultMessageListenerOrderly实现了MessageListenerOrderly接口,其consumeMessage方法使用for循环try catch执行rocketMQListener.onMessage(doConvertMessage(messageExt))回调,都成功返回ConsumeOrderlyStatus.SUCCESS,一旦异常则返回ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT

小结

  • DefaultRocketMQListenerContainer实现了InitializingBean, RocketMQListenerContainer, SmartLifecycle, ApplicationContextAware接口;setRocketMQMessageListener方法会根据RocketMQMessageListener注解的信息来设置consumeMode、consumeThreadMax、messageModel、selectorExpression、selectorType、consumeTimeout

  • afterPropertiesSet方法执行了initRocketMQPushConsumer及getMessageType方法;initRocketMQPushConsumer方法会根据rpcHook是否为null来创建不同的DefaultMQPushConsumer,之后根据messageModel、selectorType、consumeMode等来配置consumer;如果rocketMQListener类型是RocketMQPushConsumerLifecycleListener的,则执行RocketMQPushConsumerLifecycleListener的prepareStart方法

  • setupMessageListener方法主要是保存了rocketMQListener;isAutoStartup方法返回true;start方法主要是执行consumer.start()方法;stop及destroy方法主要是执行consumer.shutdown()

到此,关于“rocketmq中DefaultRocketMQListenerContainer的原理及用法”的学习就结束了,希望能够解决大家的疑惑。理论与实践的搭配能更好的帮助大家学习,快去试试吧!若想继续学习更多相关知识,请继续关注创新互联网站,小编会继续努力为大家带来更多实用的文章!


网站名称:rocketmq中DefaultRocketMQListenerContainer的原理及用法
网页URL:http://myzitong.com/article/gdepos.html