Pull消费者客户端(主动拉取消息的消费者)即构造了DefaultMQPullConsumer对象,DefaultMQPullConsumer继承了ClientConfig类。我们先看其构造方法

[java] view plain copy
public DefaultMQPullConsumer(final String consumerGroup, RPCHook rpcHook) {  
    this.consumerGroup = consumerGroup;  
    defaultMQPullConsumerImpl = new DefaultMQPullConsumerImpl(this, rpcHook);  
}

 

这里只是简单设置了consumerGroup消费者组名,表示消费者属于哪个组。构造了DefaultMQPullConsumerImpl的实例,DefaultMQPullConsumerImpl的构造方法很简单,只是绑定了DefaultMQPullConsumer、配置了传入的rpcHook。

DefaultMQPullConsumer内部封装了DefaultMQPullConsumerImpl,其中还维护这一些配置信息。这里维护着消费者订阅的topic集合。

[java] view plain copy
private Set<String> registerTopics = new HashSet<String>();

整个消费者客户端的启动,调用了DefaultMQPullConsumer的start()方法,内部直接调用DefaultMQPullConsumerImpl的start()方法,这个start方法加了synchronized修饰。

[java] view plain copy
    public synchronized void start() throws MQClientException {  
        switch (this.serviceState) {  
            case CREATE_JUST:  
                this.serviceState = ServiceState.START_FAILED;  
  
                this.checkConfig();  
  
                this.copySubscription();  
  
                if (this.defaultMQPullConsumer.getMessageModel() == MessageModel.CLUSTERING) {  
                    this.defaultMQPullConsumer.changeInstanceNameToPID();  
                }  
  
                this.mQClientFactory = MQClientManager.getInstance().getAndCreateMQClientInstance(this.defaultMQPullConsumer  
                                                                                                                , this.rpcHook);  
  
                this.rebalanceImpl.setConsumerGroup(this.defaultMQPullConsumer.getConsumerGroup());  
                this.rebalanceImpl.setMessageModel(this.defaultMQPullConsumer.getMessageModel());  
                this.rebalanceImpl.setAllocateMessageQueueStrategy(this.defaultMQPullConsumer.getAllocateMessageQueueStrategy());  
                this.rebalanceImpl.setmQClientFactory(this.mQClientFactory);  
  
                this.pullAPIWrapper = new PullAPIWrapper(  
                    mQClientFactory,  
                    this.defaultMQPullConsumer.getConsumerGroup(), isUnitMode());  
                this.pullAPIWrapper.registerFilterMessageHook(filterMessageHookList);  
  
                if (this.defaultMQPullConsumer.getOffsetStore() != null) {  
                    this.offsetStore = this.defaultMQPullConsumer.getOffsetStore();  
                } else {  
                    switch (this.defaultMQPullConsumer.getMessageModel()) {  
                        case BROADCASTING:  
                            this.offsetStore = new LocalFileOffsetStore(this.mQClientFactory, this.defaultMQPullConsumer  
                                                                                                        .getConsumerGroup());  
                            break;  
                        case CLUSTERING:  
                            this.offsetStore = new RemoteBrokerOffsetStore(this.mQClientFactory, this.defaultMQPullConsumer  
                                                                                                        .getConsumerGroup());  
                            break;  
                        default:  
                            break;  
                    }  
                    this.defaultMQPullConsumer.setOffsetStore(this.offsetStore);  
                }  
  
                this.offsetStore.load();  
  
                boolean registerOK = mQClientFactory.registerConsumer(this.defaultMQPullConsumer.getConsumerGroup(), this);  
                if (!registerOK) {  
                    this.serviceState = ServiceState.CREATE_JUST;  
  
                    throw new MQClientException("The consumer group[" + this.defaultMQPullConsumer.getConsumerGroup()  
                        + "] has been created before, specify another name please." + FAQUrl.suggestTodo(FAQUrl  
                                                                                            .GROUP_NAME_DUPLICATE_URL), null);  
                }  
  
                mQClientFactory.start();  
                log.info("the consumer [{}] start OK", this.defaultMQPullConsumer.getConsumerGroup());  
                this.serviceState = ServiceState.RUNNING;  
                break;  
            case RUNNING:  
            case START_FAILED:  
            case SHUTDOWN_ALREADY:  
                throw new MQClientException("The PullConsumer service state not OK, maybe started once, "  
                    + this.serviceState  
                    + FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK),  
                    null);  
            default:  
                break;  
        }  
  
    }

一开始的serverState的状态自然为CREAT_JUST,调用checkConfig(),其中先是对ConsumerGroup进行验证,非空,合法(符合正则规则,且长度不超过配置最大值),且不为默认值(防止消费者集群名冲突),然后对消费者消息模式、消息队列分配算法进行非空、合法校验。

关于消费者消息模式有BroadCasting(广播)跟Clustering(集群)两种、默认是Clustering(集群)配置在DefaultMQPullConsumer中。关于消费者的消息分配算法,在DefaultMQPullConsumer中实现有默认的消息分配算法,allocateMessageQueueStrategy = new AllocateMessageQueueAveragely();(平均分配算法)。其实现了AllocateMessageQueueStrategy接口,重点看其实现的allocate()方法。

[java] view plain copy
@Override  
public List<MessageQueue> allocate(String consumerGroup, String currentCID, List<MessageQueue> mqAll,  
    List<String> cidAll) {  
    if (currentCID == null || currentCID.length() < 1) {  
        throw new IllegalArgumentException("currentCID is empty");  
    }  
    if (mqAll == null || mqAll.isEmpty()) {  
        throw new IllegalArgumentException("mqAll is null or mqAll empty");  
    }  
    if (cidAll == null || cidAll.isEmpty()) {  
        throw new IllegalArgumentException("cidAll is null or cidAll empty");  
    }  
  
    List<MessageQueue> result = new ArrayList<MessageQueue>();  
    if (!cidAll.contains(currentCID)) {  
        log.info("[BUG] ConsumerGroup: {} The consumerId: {} not in cidAll: {}",  
            consumerGroup,  
            currentCID,  
            cidAll);  
        return result;  
    }  
  
    int index = cidAll.indexOf(currentCID);  
    int mod = mqAll.size() % cidAll.size();  
    int averageSize =  
        mqAll.size() <= cidAll.size() ? 1 : (mod > 0 &amp;&amp; index < mod ? mqAll.size() / cidAll.size()  
            + 1 : mqAll.size() / cidAll.size());  
    int startIndex = (mod > 0 &amp;&amp; index < mod) ? index * averageSize : index * averageSize + mod;  
    int range = Math.min(averageSize, mqAll.size() - startIndex);  
    for (int i = 0; i < range; i++) {  
        result.add(mqAll.get((startIndex + i) % mqAll.size()));  
    }  
    return result;  
}

 

传入的参数有当前消费者id,所有消息队列数组,以及当前所有消费者数组。先简单验证非空,再通过消费者数组大小跟消息队列大小根据平均算法算出当前消费者该分配哪些消息队列集合。逻辑不难。RocketMQ还提供了循环平均、一致性哈希、配置分配等算法,这里默认采用平均分配。

我们再回到DefaultMQPullConsumerImpl的start()方法,checkConfig后,调用copySubscription()方法,将配置在DefaultMQPullConsumer中的topic信息构造成并构造成subscriptionData数据结构,以topic为key以subscriptionData为value以键值对形式存到rebalanceImpl的subscriptionInner中。

[java] view plain copy
private void copySubscription() throws MQClientException {  
    try {  
        Set<String> registerTopics = this.defaultMQPullConsumer.getRegisterTopics();  
        if (registerTopics != null) {  
            for (final String topic : registerTopics) {  
                SubscriptionData subscriptionData = FilterAPI.buildSubscriptionData(this.defaultMQPullConsumer.getConsumerGroup(),  
                    topic, SubscriptionData.SUB_ALL);  
                this.rebalanceImpl.getSubscriptionInner().put(topic, subscriptionData);  
            }  
        }  
    } catch (Exception e) {  
        throw new MQClientException("subscription exception", e);  
    }  
}

  

接下来从MQCLientManager中得到MQClient的实例,这个步骤跟生产者客户端相同。

再往后是对rebalanceImpl的配置,我们重点看下rebalanceImpl,它是在DefaultMQPullConsumerImpl成员中直接构造private RebalanceImpl rebalanceImpl = new RebalancePullImpl(this);即在DefaultMQPullConsumerImpl初始化的时候构造。接下来对其消费者组名、消息模式(默认集群)、队列分配算法(默认平均分配)、消费者客户端实例进行配置,配置信息都是从DefaultMQPullConsumer中取得。

[java] view plain copy
public abstract class RebalanceImpl {  
    protected static final Logger log = ClientLogger.getLog();  
    protected final ConcurrentMap<MessageQueue, ProcessQueue> processQueueTable = new ConcurrentHashMap<MessageQueue, ProcessQueue>(64);  
    protected final ConcurrentMap<String/* topic */, Set<MessageQueue>> topicSubscribeInfoTable =  
        new ConcurrentHashMap<String, Set<MessageQueue>>();  
    protected final ConcurrentMap<String /* topic */, SubscriptionData> subscriptionInner =  
        new ConcurrentHashMap<String, SubscriptionData>();  
    protected String consumerGroup;  
    protected MessageModel messageModel;  
    protected AllocateMessageQueueStrategy allocateMessageQueueStrategy;  
    protected MQClientInstance mQClientFactory;

接下来构造了PullAPIWrapper,仅仅调用其构造方法,简单的配置下

[java] view plain copy
public PullAPIWrapper(MQClientInstance mQClientFactory, String consumerGroup, boolean unitMode) {  
    this.mQClientFactory = mQClientFactory;  
    this.consumerGroup = consumerGroup;  
    this.unitMode = unitMode;  
}

 

然后初始化消费者的offsetStore,offset即偏移量,可以理解为消费进度,这里根据不同的消息模式来选择不同的策略。如果是广播模式,那么所有消费者都应该收到订阅的消息,那么每个消费者只应该自己消费的消费队列的进度,那么需要把消费进度即offsetStore存于本地采用LocalFileOffsetStroe,相反的如果是集群模式,那么集群中的消费者来平均消费消息队列,那么应该把消费进度存于远程采用RemoteBrokerOffsetStore。然后调用相应的load方法加载。

之后将当前消费者注册在MQ客户端实例上之后,调用MQClientInstance的start()方法,启动消费者客户端。

[java] view plain copy
    public void start() throws MQClientException {  
  
        synchronized (this) {  
            switch (this.serviceState) {  
                case CREATE_JUST:  
                    this.serviceState = ServiceState.START_FAILED;  
                    // If not specified,looking address from name server  
                    if (null == this.clientConfig.getNamesrvAddr()) {  
                        this.mQClientAPIImpl.fetchNameServerAddr();  
                    }  
                    // Start request-response channel  
                    this.mQClientAPIImpl.start();  
                    // Start various schedule tasks  
                    this.startScheduledTask();  
                    // Start pull service  
                    this.pullMessageService.start();  
                    // Start rebalance service  
                    this.rebalanceService.start();  
                    // Start push service  
                    this.defaultMQProducer.getDefaultMQProducerImpl().start(false);  
                    log.info("the client factory [{}] start OK", this.clientId);  
                    this.serviceState = ServiceState.RUNNING;  
                    break;  
                case RUNNING:  
                    break;  
                case SHUTDOWN_ALREADY:  
                    break;  
                case START_FAILED:  
                    throw new MQClientException("The Factory object[" + this.getClientId() + "] has been created before, and failed."  
                                                                                                                                , null);  
                default:  
                    break;  
            }  
        }  
    }

 

看到这里应该很熟悉,跟生产者客户端这里是同一段代码,无非解析路由消息并完成路由消息的配置,启动netty客户端,启动定时任务(定时更新从名称服务器获取路由信息更新本地路由信息,心跳,调整线程数量),后面启动pull server、rebalance service、push service最后把serviceState状态设为Running表示客户端启动。

我们在这里重点看下RebalanceService的启动。下面贴出的是RebalanceService的run()方法。

[java] view plain copy
@Override  
public void run() {  
    log.info(this.getServiceName() + " service started");  
  
    while (!this.isStopped()) {  
        this.waitForRunning(waitInterval);  
        this.mqClientFactory.doRebalance();  
    }  
  
    log.info(this.getServiceName() + " service end");  
}

可以看到,只要这个线程没有被停止(客户端没关闭),会一直循环调用客户端的doRebalance()方法。

[java] view plain copy
public void doRebalance() {  
    for (Map.Entry<String, MQConsumerInner> entry : this.consumerTable.entrySet()) {  
        MQConsumerInner impl = entry.getValue();  
        if (impl != null) {  
            try {  
                impl.doRebalance();  
            } catch (Throwable e) {  
                log.error("doRebalance exception", e);  
            }  
        }  
    }  
}  

MQClientInstance遍历consumerTable(之前注册的时候以consumerGroup为key,以消费者客户端DefaultMQPullConsumerImpl为value存入consumerTable中)中的每个元素,循环调用其元素的doRebalance()方法。那我们看DefaultMQPullConsumerImpl的doRebalance方法。

 [java] view plain copy
 @Override  
 public void doRebalance() {  
     if (this.rebalanceImpl != null) {  
         this.rebalanceImpl.doRebalance(false);  
     }  
 }

直接调用了rebalanceImpl的doRebalance方法

[java] view plain copy
public void doRebalance(final boolean isOrder) {  
    Map<String, SubscriptionData> subTable = this.getSubscriptionInner();  
    if (subTable != null) {  
        for (final Map.Entry<String, SubscriptionData> entry : subTable.entrySet()) {  
            final String topic = entry.getKey();  
            try {  
                this.rebalanceByTopic(topic, isOrder);  
            } catch (Throwable e) {  
                if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {  
                    log.warn("rebalanceByTopic Exception", e);  
                }  
            }  
        }  
    }  
  
    this.truncateMessageQueueNotMyTopic();  
}  

可以看到先得到subTable即subscriptionInner,之前根据配置的每个topic生成的SubscriptionData数据结构的map。先遍历该map,得到每个topic,针对每个topic调用rebalanceByTopic()

 [java] view plain copy
     private void rebalanceByTopic(final String topic, final boolean isOrder) {  
         switch (messageModel) {  
             case BROADCASTING: {  
                 Set<MessageQueue> mqSet = this.topicSubscribeInfoTable.get(topic);  
                 if (mqSet != null) {  
                     boolean changed = this.updateProcessQueueTableInRebalance(topic, mqSet, isOrder);  
                     if (changed) {  
                         this.messageQueueChanged(topic, mqSet, mqSet);  
                         log.info("messageQueueChanged {} {} {} {}",  
                             consumerGroup,  
                             topic,  
                             mqSet,  
                             mqSet);  
                     }  
                 } else {  
                     log.warn("doRebalance, {}, but the topic[{}] not exist.", consumerGroup, topic);  
                 }  
                 break;  
             }  
             case CLUSTERING: {  
                 Set<MessageQueue> mqSet = this.topicSubscribeInfoTable.get(topic);  
                 List<String> cidAll = this.mQClientFactory.findConsumerIdList(topic, consumerGroup);  
                 if (null == mqSet) {  
                     if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {  
                         log.warn("doRebalance, {}, but the topic[{}] not exist.", consumerGroup, topic);  
                     }  
                 }  
   
                 if (null == cidAll) {  
                     log.warn("doRebalance, {} {}, get consumer id list failed", consumerGroup, topic);  
                 }  
   
                 if (mqSet != null &amp;&amp; cidAll != null) {  
                     List<MessageQueue> mqAll = new ArrayList<MessageQueue>();  
                     mqAll.addAll(mqSet);  
   
                     Collections.sort(mqAll);  
                     Collections.sort(cidAll);  
   
                     AllocateMessageQueueStrategy strategy = this.allocateMessageQueueStrategy;  
   
                     List<MessageQueue> allocateResult = null;  
                     try {  
                         allocateResult = strategy.allocate(  
                             this.consumerGroup,  
                             this.mQClientFactory.getClientId(),  
                             mqAll,  
                             cidAll);  
                     } catch (Throwable e) {  
                         log.error("AllocateMessageQueueStrategy.allocate Exception. allocateMessageQueueStrategyName={}",  
                              strategy.getName(), e);  
                         return;  
                     }  
   
                     Set<MessageQueue> allocateResultSet = new HashSet<MessageQueue>();  
                     if (allocateResult != null) {  
                         allocateResultSet.addAll(allocateResult);  
                     }  
   
                     boolean changed = this.updateProcessQueueTableInRebalance(topic, allocateResultSet, isOrder);  
                     if (changed) {  
                         log.info(  
                             "rebalanced result changed. allocateMessageQueueStrategyName={}, group={}, topic={}, clientId={}, mqAllSize={}  
                         , cidAllSize={}, rebalanceResultSize={}, rebalanceResultSet={}",  
                             strategy.getName(), consumerGroup, topic, this.mQClientFactory.getClientId(), mqSet.size(), cidAll.size(),  
                             allocateResultSet.size(), allocateResultSet);  
                         this.messageQueueChanged(topic, mqSet, allocateResultSet);  
                     }  
                 }  
                 break;  
             }  
             default:  
                 break;  
         }  
     }

我们先重点关注集群模式下,先得到topic的本地路由信息,再通过topic跟这个消费者的组名,调用netty客户端的同步网络访问topic指定的broker,从broker端得到与其连接的且是指定消费组名下订阅指定topic的消费者id的集合。然后采用默认的分配算法的allocate()进行队列给消费者平均分配。然后调用updateProcessQueueTableInRebalance()方法判断是否重新队列分配。

 [java] view plain copy
 private boolean updateProcessQueueTableInRebalance(final String topic, final Set<MessageQueue> mqSet,  
     final boolean isOrder) {  
     boolean changed = false;  
   
     Iterator<Entry<MessageQueue, ProcessQueue>> it = this.processQueueTable.entrySet().iterator();  
     while (it.hasNext()) {  
         Entry<MessageQueue, ProcessQueue> next = it.next();  
         MessageQueue mq = next.getKey();  
         ProcessQueue pq = next.getValue();  
   
         if (mq.getTopic().equals(topic)) {  
             if (!mqSet.contains(mq)) {  
                 pq.setDropped(true);  
                 if (this.removeUnnecessaryMessageQueue(mq, pq)) {  
                     it.remove();  
                     changed = true;  
                     log.info("doRebalance, {}, remove unnecessary mq, {}", consumerGroup, mq);  
                 }  
             } else if (pq.isPullExpired()) {  
                 switch (this.consumeType()) {  
                     case CONSUME_ACTIVELY:  
                         break;  
                     case CONSUME_PASSIVELY:  
                         pq.setDropped(true);  
                         if (this.removeUnnecessaryMessageQueue(mq, pq)) {  
                             it.remove();  
                             changed = true;  
                             log.error("[BUG]doRebalance, {}, remove unnecessary mq, {}, because pull is pause, so try to fixed it",  
                                 consumerGroup, mq);  
                         }  
                         break;  
                     default:  
                         break;  
                 }  
             }  
         }  
     }  
   
     List<PullRequest> pullRequestList = new ArrayList<PullRequest>();  
     for (MessageQueue mq : mqSet) {  
         if (!this.processQueueTable.containsKey(mq)) {  
             if (isOrder &amp;&amp; !this.lock(mq)) {  
                 log.warn("doRebalance, {}, add a new mq failed, {}, because lock failed", consumerGroup, mq);  
                 continue;  
             }  
   
             this.removeDirtyOffset(mq);  
             ProcessQueue pq = new ProcessQueue();  
             long nextOffset = this.computePullFromWhere(mq);  
             if (nextOffset >= 0) {  
                 ProcessQueue pre = this.processQueueTable.putIfAbsent(mq, pq);  
                 if (pre != null) {  
                     log.info("doRebalance, {}, mq already exists, {}", consumerGroup, mq);  
                 } else {  
                     log.info("doRebalance, {}, add a new mq, {}", consumerGroup, mq);  
                     PullRequest pullRequest = new PullRequest();  
                     pullRequest.setConsumerGroup(consumerGroup);  
                     pullRequest.setNextOffset(nextOffset);  
                     pullRequest.setMessageQueue(mq);  
                     pullRequest.setProcessQueue(pq);  
                     pullRequestList.add(pullRequest);  
                     changed = true;  
                 }  
             } else {  
                 log.warn("doRebalance, {}, add new mq failed, {}", consumerGroup, mq);  
             }  
         }  
     }  
   
     this.dispatchPullRequest(pullRequestList);  
   
     return changed;  
 }

先遍历processQueueTable,看其topic下的该处理消息队列是否还是应该处理,由于新分配之后,消息队列可能会改变,所以原该处理的消息队列可能没必要处理,因此没必要处理的消息队列移除。当然也有可能多出需要处理的消息队列,于是需要建立其与processQueue的对应关系,先调用computerPullFromWhere得到该条消息下次拉取数据的位置,在RebalancePullImpl中实现了该方法直接返回0,把该处理的mq封装成pq后,更新到processQueueTable中。若有更新,无论是增加还是删除,则changed都设为true。(这个地方讲的有点模糊,他是客户端pull与push区别的关键,实际上push不过是在pull之上封装了下操作,后面我们会重新回来分析。)

方法返回后,如果changed为true,会调用messageQueueChanged方法来通知配置在DefaultMQPullConsumer中的相关messageQueueListener,我们可以看到RebalancePullImpl中的实现。

 [java] view plain copy
 @Override  
 public void messageQueueChanged(String topic, Set<MessageQueue> mqAll, Set<MessageQueue> mqDivided) {  
     MessageQueueListener messageQueueListener = this.defaultMQPullConsumerImpl.getDefaultMQPullConsumer().getMessageQueueListener();  
     if (messageQueueListener != null) {  
         try {  
             messageQueueListener.messageQueueChanged(topic, mqAll, mqDivided);  
         } catch (Throwable e) {  
             log.error("messageQueueChanged exception", e);  
         }  
     }  
 }

广播模式则比较简单,由于所有消费者都要处理,少了队列分配这个步骤。

本文转载自:https://blog.csdn.net/panxj856856/article/details/80725630