头条创作挑战赛一、前言 当consumer向broker发起RequestCode。PULLMESSAGE消息拉取请求时,broker是怎么处理的呢?带着这个疑问,就开始分析消息拉取过程中broker端的处理流程; 从BrokerController的registerProcessor方法注册的事件处理器以及我们之前分析的BrokerController核心组件来看,consumer消息拉取处理组件为:PullMessageProcessor; 二、源码导读类继承关系及构造方法;处理请求方法;处理请求方法涉及到的核心值对象;从messageStore中拉取到具体的消息; 注:处理请求方法中延迟执行拉取消息逻辑是通过拉取消息长轮询挂起服务PullRequestHoldService进行实现的,这个会放在下一篇进行分析;三、源码分析1、类继承关系及构造方法拉取消息处理组件publicclassPullMessageProcessorextendsAsyncNettyRequestProcessorimplementsNettyRequestProcessor{privatestaticfinalInternalLoggerlogInternalLoggerFactory。getLogger(LoggerName。BROKERLOGGERNAME);privatefinalBrokerControllerbrokerController;消费消息回调钩子privateListConsumeMessageHookconsumeMessageHookList;publicPullMessageProcessor(finalBrokerControllerbrokerController){this。brokerControllerbrokerController;}} 2、处理请求方法netty网络服务器收到的拉取消息请求的处理组件OverridepublicRemotingCommandprocessRequest(finalChannelHandlerContextctx,RemotingCommandrequest)throwsRemotingCommandException{returnthis。processRequest(ctx。channel(),request,true);} RocketMQ作者将这个方法写的比较长,建议大家写的时候过长的方法还是拆分一下;处理请求方法里面最重要的逻辑就是找消息存储组件查询到这一次要拉取的消息org。apache。rocketmq。store。DefaultMessageStoregetMessage;这个方法是MessageStore消息存储组件中的,这个后续会进行详细分析,这一篇就只分析getMessage方法 我们先以processRequest作为入口进行分析:Channel结合NIO中的Channel概念,进行理解RemotingCommand传入的请求参数,这个在之前的分析中提到过brokerAllowSuspend是否允许被挂起,也就是是否允许在未找到消息的时候,暂时挂起处理线程,第一次传入的参数默认为true。处理拉取消息请求privateRemotingCommandprocessRequest(finalChannelchannel,RemotingCommandrequest,booleanbrokerAllowSuspend)throwsRemotingCommandException{finallongbeginTimeMillsthis。brokerController。getMessageStore()。now();RemotingCommandresponseRemotingCommand。createResponseCommand(PullMessageResponseHeader。class);finalPullMessageResponseHeaderresponseHeader(PullMessageResponseHeader)response。readCustomHeader();finalPullMessageRequestHeaderrequestHeader(PullMessageRequestHeader)request。decodeCommandCustomHeader(PullMessageRequestHeader。class);response。setOpaque(request。getOpaque());log。debug(receivePullMessagerequestcommand,{},request);校验当前broker是否可读if(!PermName。isReadable(this。brokerController。getBrokerConfig()。getBrokerPermission())){response。setCode(ResponseCode。NOPERMISSION);response。setRemark(String。format(thebroker〔s〕pullingmessageisforbidden,this。brokerController。getBrokerConfig()。getBrokerIP1()));returnresponse;}查询或者自动创建consumerGroup对应的subscriptionGroupConfig保存subscriptionGroupConfig信息到UsersxxxstoreconfigsubscriptionGroup。json中SubscriptionGroupConfigsubscriptionGroupConfigthis。brokerController。getSubscriptionGroupManager()。findSubscriptionGroupConfig(requestHeader。getConsumerGroup());if(nullsubscriptionGroupConfig){response。setCode(ResponseCode。SUBSCRIPTIONGROUPNOTEXIST);response。setRemark(String。format(subscriptiongroup〔s〕doesnotexist,s,requestHeader。getConsumerGroup(),FAQUrl。suggestTodo(FAQUrl。SUBSCRIPTIONGROUPNOTEXIST)));returnresponse;}if(!subscriptionGroupConfig。isConsumeEnable()){response。setCode(ResponseCode。NOPERMISSION);response。setRemark(subscriptiongroupnopermission,requestHeader。getConsumerGroup());returnresponse;}如果没有消息时,是否在broker端挂起等待,默认truefinalbooleanhasSuspendFlagPullSysFlag。hasSuspendFlag(requestHeader。getSysFlag());是否消息拉取时就提交offsetfinalbooleanhasCommitOffsetFlagPullSysFlag。hasCommitOffsetFlag(requestHeader。getSysFlag());请求头是否包含订阅信息finalbooleanhasSubscriptionFlagPullSysFlag。hasSubscriptionFlag(requestHeader。getSysFlag());获取挂起超时finallongsuspendTimeoutMillisLonghasSuspendFlag?requestHeader。getSuspendTimeoutMillis():0;当前这个请求是要拉取哪个topic的数据TopicConfigtopicConfigthis。brokerController。getTopicConfigManager()。selectTopicConfig(requestHeader。getTopic());if(nulltopicConfig){log。error(thetopic{}notexist,consumer:{},requestHeader。getTopic(),RemotingHelper。parseChannelRemoteAddr(channel));response。setCode(ResponseCode。TOPICNOTEXIST);response。setRemark(String。format(topic〔s〕notexist,applyfirstplease!s,requestHeader。getTopic(),FAQUrl。suggestTodo(FAQUrl。APPLYTOPICURL)));returnresponse;}判断读的权限if(!PermName。isReadable(topicConfig。getPerm())){response。setCode(ResponseCode。NOPERMISSION);response。setRemark(thetopic〔requestHeader。getTopic()〕pullingmessageisforbidden);returnresponse;}判断queueId是否超过读取队列数if(requestHeader。getQueueId()0requestHeader。getQueueId()topicConfig。getReadQueueNums()){StringerrorInfoString。format(queueId〔d〕isillegal,topic:〔s〕topicConfig。readQueueNums:〔d〕consumer:〔s〕,requestHeader。getQueueId(),requestHeader。getTopic(),topicConfig。getReadQueueNums(),channel。remoteAddress());log。warn(errorInfo);response。setCode(ResponseCode。SYSTEMERROR);response。setRemark(errorInfo);returnresponse;}第一个是拿消费组对这个topic的订阅数据SubscriptionDatasubscriptionDatanull;消费者过滤数据ConsumerFilterDataconsumerFilterDatanull;如果说有一个订阅标识if(hasSubscriptionFlag){try{针对某个topic,去进行一定的订阅,支持某一种表达式类型subscriptionDataFilterAPI。build(requestHeader。getTopic(),requestHeader。getSubscription(),requestHeader。getExpressionType());如果说过滤表达式的类型不是tag标签if(!ExpressionType。isTagType(subscriptionData。getExpressionType())){构建针对topic,消费组做一个订阅,我的表达式类型,子版本号consumerFilterDataConsumerFilterManager。build(requestHeader。getTopic(),requestHeader。getConsumerGroup(),requestHeader。getSubscription(),requestHeader。getExpressionType(),requestHeader。getSubVersion());assertconsumerFilterData!null;}}catch(Exceptione){log。warn(Parsetheconsumerssubscription〔{}〕failed,group:{},requestHeader。getSubscription(),requestHeader。getConsumerGroup());response。setCode(ResponseCode。SUBSCRIPTIONPARSEFAILED);response。setRemark(parsetheconsumerssubscriptionfailed);returnresponse;}}如果说没有订阅标识else{一开始你需要跟我建立一个连接,连接建立了以后消费组和消费者都会归consumermanager来管理先获取到你的消费组的信息ConsumerGroupInfoconsumerGroupInfothis。brokerController。getConsumerManager()。getConsumerGroupInfo(requestHeader。getConsumerGroup());if(nullconsumerGroupInfo){log。warn(theconsumersgroupinfonotexist,group:{},requestHeader。getConsumerGroup());response。setCode(ResponseCode。SUBSCRIPTIONNOTEXIST);response。setRemark(theconsumersgroupinfonotexistFAQUrl。suggestTodo(FAQUrl。SAMEGROUPDIFFERENTTOPIC));returnresponse;}if(!subscriptionGroupConfig。isConsumeBroadcastEnable()consumerGroupInfo。getMessageModel()MessageModel。BROADCASTING){response。setCode(ResponseCode。NOPERMISSION);response。setRemark(theconsumergroup〔requestHeader。getConsumerGroup()〕cannotconsumebybroadcastway);returnresponse;}从我的消费组订阅的topic里的订阅数据获取出来subscriptionDataconsumerGroupInfo。findSubscriptionData(requestHeader。getTopic());if(nullsubscriptionData){log。warn(theconsumerssubscriptionnotexist,group:{},topic:{},requestHeader。getConsumerGroup(),requestHeader。getTopic());response。setCode(ResponseCode。SUBSCRIPTIONNOTEXIST);response。setRemark(theconsumerssubscriptionnotexistFAQUrl。suggestTodo(FAQUrl。SAMEGROUPDIFFERENTTOPIC));returnresponse;}if(subscriptionData。getSubVersion()requestHeader。getSubVersion()){log。warn(Thebrokerssubscriptionisnotlatest,group:{}{},requestHeader。getConsumerGroup(),subscriptionData。getSubString());response。setCode(ResponseCode。SUBSCRIPTIONNOTLATEST);response。setRemark(theconsumerssubscriptionnotlatest);returnresponse;}if(!ExpressionType。isTagType(subscriptionData。getExpressionType())){consumerFilterDatathis。brokerController。getConsumerFilterManager()。get(requestHeader。getTopic(),requestHeader。getConsumerGroup());if(consumerFilterDatanull){response。setCode(ResponseCode。FILTERDATANOTEXIST);response。setRemark(Thebrokersconsumerfilterdataisnotexist!Yourexpressionmaybewrong!);returnresponse;}if(consumerFilterData。getClientVersion()requestHeader。getSubVersion()){log。warn(Thebrokersconsumerfilterdataisnotlatest,group:{},topic:{},serverV:{},clientV:{},requestHeader。getConsumerGroup(),requestHeader。getTopic(),consumerFilterData。getClientVersion(),requestHeader。getSubVersion());response。setCode(ResponseCode。FILTERDATANOTLATEST);response。setRemark(theconsumersconsumerfilterdatanotlatest);returnresponse;}}}校验表达式类型if(!ExpressionType。isTagType(subscriptionData。getExpressionType())!this。brokerController。getBrokerConfig()。isEnablePropertyFilter()){response。setCode(ResponseCode。SYSTEMERROR);response。setRemark(ThebrokerdoesnotsupportconsumertofiltermessagebysubscriptionData。getExpressionType());returnresponse;}构建出来一个消息过滤器MessageFiltermessageFilter;重试消息是否启用过滤if(this。brokerController。getBrokerConfig()。isFilterSupportRetry()){messageFilternewExpressionForRetryMessageFilter(subscriptionData,consumerFilterData,this。brokerController。getConsumerFilterManager());}else{messageFilternewExpressionMessageFilter(subscriptionData,consumerFilterData,this。brokerController。getConsumerFilterManager());}找消息存储组件查询到这一次我要拉取的消息finalGetMessageResultgetMessageResultthis。brokerController。getMessageStore()。getMessage(requestHeader。getConsumerGroup(),消费组,谁requestHeader。getTopic(),topic,对哪个topicrequestHeader。getQueueId(),queueId,对topic里的哪个queueIdrequestHeader。getQueueOffset(),queue偏移量,从哪个queue的offset偏移量开始requestHeader。getMaxMsgNums(),最大的消息数量,拉取多少条数据messageFilter消息过滤器,如何过滤数据);如果说拉取到了数据以后if(getMessageResult!null){response。setRemark(getMessageResult。getStatus()。name());responseHeader。setNextBeginOffset(getMessageResult。getNextBeginOffset());responseHeader。setMinOffset(getMessageResult。getMinOffset());responseHeader。setMaxOffset(getMessageResult。getMaxOffset());拉取消息是有一个读写分离的概念,正常情况下,我写入和拉取消息都是针对master节点来的有可能master节点负载和压力很大,我就会建议你从slave节点来拉取if(getMessageResult。isSuggestPullingFromSlave()){设置SuggestWhichBrokerId为1responseHeader。setSuggestWhichBrokerId(subscriptionGroupConfig。getWhichBrokerWhenConsumeSlowly());}else{responseHeader。setSuggestWhichBrokerId(MixAll。MASTERID);}switch(this。brokerController。getMessageStoreConfig()。getBrokerRole()){caseASYNCMASTER:caseSYNCMASTER:break;如果本机是slave角色caseSLAVE:并且不允许从slave获取数据if(!this。brokerController。getBrokerConfig()。isSlaveReadEnable()){response。setCode(ResponseCode。PULLRETRYIMMEDIATELY);responseHeader。setSuggestWhichBrokerId(MixAll。MASTERID);}break;}设置建议读取消息的节点if(this。brokerController。getBrokerConfig()。isSlaveReadEnable()){consumetooslow,redirecttoanothermachine消费太慢,指向从节点if(getMessageResult。isSuggestPullingFromSlave()){responseHeader。setSuggestWhichBrokerId(subscriptionGroupConfig。getWhichBrokerWhenConsumeSlowly());}consumeokelse{responseHeader。setSuggestWhichBrokerId(subscriptionGroupConfig。getBrokerId());}}else{responseHeader。setSuggestWhichBrokerId(MixAll。MASTERID);}switch(getMessageResult。getStatus()){caseFOUND:response。setCode(ResponseCode。SUCCESS);break;caseMESSAGEWASREMOVING:response。setCode(ResponseCode。PULLRETRYIMMEDIATELY);break;caseNOMATCHEDLOGICQUEUE:caseNOMESSAGEINQUEUE:if(0!requestHeader。getQueueOffset()){response。setCode(ResponseCode。PULLOFFSETMOVED);XXX:warnandnotifymelog。info(thebrokerstorenoqueuedata,fixtherequestoffset{}to{},Topic:{}QueueId:{}ConsumerGroup:{},requestHeader。getQueueOffset(),getMessageResult。getNextBeginOffset(),requestHeader。getTopic(),requestHeader。getQueueId(),requestHeader。getConsumerGroup());}else{response。setCode(ResponseCode。PULLNOTFOUND);}break;caseNOMATCHEDMESSAGE:response。setCode(ResponseCode。PULLRETRYIMMEDIATELY);break;caseOFFSETFOUNDNULL:response。setCode(ResponseCode。PULLNOTFOUND);break;caseOFFSETOVERFLOWBADLY:response。setCode(ResponseCode。PULLOFFSETMOVED);XXX:warnandnotifymelog。info(therequestoffset:{}overflowbadly,brokermaxoffset:{},consumer:{},requestHeader。getQueueOffset(),getMessageResult。getMaxOffset(),channel。remoteAddress());break;caseOFFSETOVERFLOWONE:response。setCode(ResponseCode。PULLNOTFOUND);break;caseOFFSETTOOSMALL:response。setCode(ResponseCode。PULLOFFSETMOVED);log。info(therequestoffsettoosmall。group{},topic{},requestOffset{},brokerMinOffset{},clientIp{},requestHeader。getConsumerGroup(),requestHeader。getTopic(),requestHeader。getQueueOffset(),getMessageResult。getMinOffset(),channel。remoteAddress());break;default:assertfalse;break;}执行消费消息的hook函数if(this。hasConsumeMessageHook()){ConsumeMessageContextcontextnewConsumeMessageContext();context。setConsumerGroup(requestHeader。getConsumerGroup());context。setTopic(requestHeader。getTopic());context。setQueueId(requestHeader。getQueueId());Stringownerrequest。getExtFields()。get(BrokerStatsManager。COMMERCIALOWNER);switch(response。getCode()){caseResponseCode。SUCCESS:intcommercialBaseCountbrokerController。getBrokerConfig()。getCommercialBaseCount();intincValuegetMessageResult。getMsgCount4Commercial()commercialBaseCount;context。setCommercialRcvStats(BrokerStatsManager。StatsType。RCVSUCCESS);context。setCommercialRcvTimes(incValue);context。setCommercialRcvSize(getMessageResult。getBufferTotalSize());context。setCommercialOwner(owner);break;caseResponseCode。PULLNOTFOUND:if(!brokerAllowSuspend){context。setCommercialRcvStats(BrokerStatsManager。StatsType。RCVEPOLLS);context。setCommercialRcvTimes(1);context。setCommercialOwner(owner);}break;caseResponseCode。PULLRETRYIMMEDIATELY:caseResponseCode。PULLOFFSETMOVED:context。setCommercialRcvStats(BrokerStatsManager。StatsType。RCVEPOLLS);context。setCommercialRcvTimes(1);context。setCommercialOwner(owner);break;default:assertfalse;break;}执行消费消息回调钩子this。executeConsumeMessageHookBefore(context);}switch(response。getCode()){请求成功caseResponseCode。SUCCESS:数据统计this。brokerController。getBrokerStatsManager()。incGroupGetNums(requestHeader。getConsumerGroup(),requestHeader。getTopic(),getMessageResult。getMessageCount());this。brokerController。getBrokerStatsManager()。incGroupGetSize(requestHeader。getConsumerGroup(),requestHeader。getTopic(),getMessageResult。getBufferTotalSize());this。brokerController。getBrokerStatsManager()。incBrokerGetNums(getMessageResult。getMessageCount());从内存发送数据if(this。brokerController。getBrokerConfig()。isTransferMsgByHeap()){finalbyte〔〕rthis。readGetMessageResult(getMessageResult,requestHeader。getConsumerGroup(),requestHeader。getTopic(),requestHeader。getQueueId());this。brokerController。getBrokerStatsManager()。incGroupGetLatency(requestHeader。getConsumerGroup(),requestHeader。getTopic(),requestHeader。getQueueId(),(int)(this。brokerController。getMessageStore()。now()beginTimeMills));response。setBody(r);}从磁盘发送数据else{try{FileRegionfileRegionnewManyMessageTransfer(response。encodeHeader(getMessageResult。getBufferTotalSize()),getMessageResult);channel。writeAndFlush(fileRegion)。addListener(newChannelFutureListener(){OverridepublicvoidoperationComplete(ChannelFuturefuture)throwsException{getMessageResult。release();if(!future。isSuccess()){log。error(transfermanymessagebypagecachefailed,{},channel。remoteAddress(),future。cause());}}});}catch(Throwablee){log。error(transfermanymessagebypagecacheexception,e);getMessageResult。release();}responsenull;}break;caseResponseCode。PULLNOTFOUND:hasSuspendFlag,构建消息拉取时的拉取标记,默认为trueif(brokerAllowSuspendhasSuspendFlag){取自DefaultMQPullConsumer的brokerSuspendMaxTimeMillis属性longpollingTimeMillssuspendTimeoutMillisLong;如果不支持长轮询,则忽略brokerSuspendMaxTimeMillis属性,使用shortPollingTimeMills,默认为1000ms作为下一次拉取消息的等待时间if(!this。brokerController。getBrokerConfig()。isLongPollingEnable()){pollingTimeMillsthis。brokerController。getBrokerConfig()。getShortPollingTimeMills();}StringtopicrequestHeader。getTopic();longoffsetrequestHeader。getQueueOffset();intqueueIdrequestHeader。getQueueId();创建PullRequest,然后提交给PullRequestHoldService线程去调度,触发消息拉取PullRequestpullRequestnewPullRequest(request,channel,pollingTimeMills,this。brokerController。getMessageStore()。now(),offset,subscriptionData,messageFilter);pullRequestHoldService定时线程,最大延迟5秒判断是否有消息到达,然后执行消息的拉取this。brokerController。getPullRequestHoldService()。suspendPullRequest(topic,queueId,pullRequest);设置responsenull,则此时此次调用不会向客户端输出任何字节,客户端网络请求客户端的读事件不会触发,不会触发对响应结果的处理,处于等待状态responsenull;break;}caseResponseCode。PULLRETRYIMMEDIATELY:break;caseResponseCode。PULLOFFSETMOVED:if(this。brokerController。getMessageStoreConfig()。getBrokerRole()!BrokerRole。SLAVEthis。brokerController。getMessageStoreConfig()。isOffsetCheckInSlave()){MessageQueuemqnewMessageQueue();mq。setTopic(requestHeader。getTopic());mq。setQueueId(requestHeader。getQueueId());mq。setBrokerName(this。brokerController。getBrokerConfig()。getBrokerName());OffsetMovedEventeventnewOffsetMovedEvent();event。setConsumerGroup(requestHeader。getConsumerGroup());event。setMessageQueue(mq);event。setOffsetRequest(requestHeader。getQueueOffset());event。setOffsetNew(getMessageResult。getNextBeginOffset());this。generateOffsetMovedEvent(event);log。warn(PULLOFFSETMOVED:correctionoffset。topic{},groupId{},requestOffset{},newOffset{},suggestBrokerId{},requestHeader。getTopic(),requestHeader。getConsumerGroup(),event。getOffsetRequest(),event。getOffsetNew(),responseHeader。getSuggestWhichBrokerId());}else{responseHeader。setSuggestWhichBrokerId(subscriptionGroupConfig。getBrokerId());response。setCode(ResponseCode。PULLRETRYIMMEDIATELY);log。warn(PULLOFFSETMOVED:nonecorrection。topic{},groupId{},requestOffset{},suggestBrokerId{},requestHeader。getTopic(),requestHeader。getConsumerGroup(),requestHeader。getQueueOffset(),responseHeader。getSuggestWhichBrokerId());}break;default:assertfalse;}}else{response。setCode(ResponseCode。SYSTEMERROR);response。setRemark(storegetMessagereturnnull);}booleanstoreOffsetEnablebrokerAllowSuspend;storeOffsetEnablestoreOffsetEnablehasCommitOffsetFlag;storeOffsetEnablestoreOffsetEnablethis。brokerController。getMessageStoreConfig()。getBrokerRole()!BrokerRole。SLAVE;基于broker来存储你最新提交的offset偏移量if(storeOffsetEnable){提交offset,保存在内存中ConsumerOffsetManager。offsetTable中this。brokerController。getConsumerOffsetManager()。commitOffset(RemotingHelper。parseChannelRemoteAddr(channel),requestHeader。getConsumerGroup(),requestHeader。getTopic(),requestHeader。getQueueId(),requestHeader。getCommitOffset());}returnresponse;}3、处理请求方法涉及到的核心值对象publicclassSubscriptionGroupConfig{消费组名称privateStringgroupName;是否启用消费privatebooleanconsumeEnabletrue;是否启用从最小偏移量开始消费privatebooleanconsumeFromMinEnabletrue;是否启用消费广播privatebooleanconsumeBroadcastEnabletrue;重试队列数量privateintretryQueueNums1;重试最大次数privateintretryMaxTimes16;masteridprivatelongbrokerIdMixAll。MASTERID;慢消费的时候选用哪个brokerprivatelongwhichBrokerWhenConsumeSlowly1;是否启用通知消费者ids变化privatebooleannotifyConsumerIdsChangedEnabletrue;}核心的topic元数据结构publicclassTopicConfig{privatestaticfinalStringSEPARATOR;publicstaticintdefaultReadQueueNums16;默认的topic是有16个readqueue和writequeuepublicstaticintdefaultWriteQueueNums16;privateStringtopicName;当前你的broker会告诉你的这个nameserver,我这里放了哪些topic的队列privateintreadQueueNumsdefaultReadQueueNums;我的这个broker机器对这个队列,放了多少个readqueue和writequeueprivateintwriteQueueNumsdefaultWriteQueueNums;privateintpermPermName。PERMREADPermName。PERMWRITE;privateTopicFilterTypetopicFilterTypeTopicFilterType。SINGLETAG;默认的topic过滤类型是基于tagprivateinttopicSysFlag0;privatebooleanorderfalse;publicTopicConfig(){}}消费组信息publicclassConsumerGroupInfo{privatestaticfinalInternalLoggerlogInternalLoggerFactory。getLogger(LoggerName。BROKERLOGGERNAME);消费组名称privatefinalStringgroupName;消费组订阅数据,这个消费组订阅了哪些topicprivatefinalConcurrentMapStringTopic,SubscriptionDatasubscriptionTablenewConcurrentHashMapString,SubscriptionData();消费组跟broker之间的各个网络连接privatefinalConcurrentMapChannel,ClientChannelInfochannelInfoTablenewConcurrentHashMapChannel,ClientChannelInfo(16);消费类型,pull模型还是push模型privatevolatileConsumeTypeconsumeType;消息模型,集群模式还是广播模式privatevolatileMessageModelmessageModel;从哪里开始消费的策略privatevolatileConsumeFromWhereconsumeFromWhere;最近一次更新时间戳privatevolatilelonglastUpdateTimestampSystem。currentTimeMillis();publicConsumerGroupInfo(StringgroupName,ConsumeTypeconsumeType,MessageModelmessageModel,ConsumeFromWhereconsumeFromWhere){this。groupNamegroupName;this。consumeTypeconsumeType;this。messageModelmessageModel;this。consumeFromWhereconsumeFromWhere;}}publicclassSubscriptionDataimplementsComparableSubscriptionData{publicfinalstaticStringSUBALL;是否启用类过滤模式privatebooleanclassFilterModefalse;topic主题privateStringtopic;子字符串privateStringsubString;tagsprivateSetStringtagsSetnewHashSetString();codeprivateSetIntegercodeSetnewHashSetInteger();privatelongsubVersionSystem。currentTimeMillis();privateStringexpressionTypeExpressionType。TAG;JSONField(serializefalse)privateStringfilterClassSource;publicSubscriptionData(){}}消费者客户端网络连接信息publicclassClientChannelInfo{消费者客户端网络连接privatefinalChannelchannel;消费者客户端网络连接idprivatefinalStringclientId;编程语言codeprivatefinalLanguageCodelanguage;版本号privatefinalintversion;最近一次更新时间戳privatevolatilelonglastUpdateTimestampSystem。currentTimeMillis();publicClientChannelInfo(Channelchannel){this(channel,null,null,0);}publicClientChannelInfo(Channelchannel,StringclientId,LanguageCodelanguage,intversion){this。channelchannel;this。clientIdclientId;this。languagelanguage;this。versionversion;}}4、从messageStore中拉取到具体的消息 根据配置实例化messageFilter,并且从messageStore中拉取到具体的消息。可以看到这里就是方法的核心,接下来继续看getMessage是如何拉取到消息的publicGetMessageResultgetMessage(finalStringgroup,finalStringtopic,finalintqueueId,finallongoffset,finalintmaxMsgNums,finalMessageFiltermessageFilter){判断store是否关闭if(this。shutdown){log。warn(messagestorehasshutdown,sogetMessageisforbidden);returnnull;}判断当前运行状态是否可读if(!this。runningFlags。isReadable()){log。warn(messagestoreisnotreadable,sogetMessageisforbiddenthis。runningFlags。getFlagBits());returnnull;}if(MixAll。isLmq(topic)this。isLmqConsumeQueueNumExceeded()){log。warn(messagestoreisnotavailable,brokerconfigenableLmqandenableMultiDispatch,lmqconsumeQueuenumexceedmaxLmqConsumeQueueNumconfignum);returnnull;}longbeginTimethis。getSystemClock()。now();GetMessageStatusstatusGetMessageStatus。NOMESSAGEINQUEUE;待查找队列的偏移量longnextBeginOffsetoffset;当前队列的最小偏移量longminOffset0;当前队列的最大偏移量longmaxOffset0;lazyinitwhenfindmsg。GetMessageResultgetResultnull;当前commitLog的最大偏移量finallongmaxOffsetPythis。commitLog。getMaxOffset();先获取到这个queueId对应的一个consumequeueConsumeQueueconsumeQueuefindConsumeQueue(topic,queueId);if(consumeQueue!null){获取ConsumeQueue的最小逻辑offsetminOffsetconsumeQueue。getMinOffsetInQueue();最大逻辑offsetmaxOffsetconsumeQueue。getMaxOffsetInQueue();消息队列无数据if(maxOffset0){statusGetMessageStatus。NOMESSAGEINQUEUE;nextBeginOffsetnextOffsetCorrection(offset,0);}查询的队列offset太小elseif(offsetminOffset){statusGetMessageStatus。OFFSETTOOSMALL;nextBeginOffsetnextOffsetCorrection(offset,minOffset);}查询的offset溢出一个elseif(offsetmaxOffset){statusGetMessageStatus。OFFSETOVERFLOWONE;nextBeginOffsetnextOffsetCorrection(offset,offset);}查询的队列offset过大elseif(offsetmaxOffset){statusGetMessageStatus。OFFSETOVERFLOWBADLY;if(0minOffset){nextBeginOffsetnextOffsetCorrection(offset,minOffset);}else{nextBeginOffsetnextOffsetCorrection(offset,maxOffset);}}else{根据指定的offset可以去查找出一段consumequeue里面的数据SelectMappedBufferResultbufferConsumeQueueconsumeQueue。getIndexBuffer(offset);用于检测bufferConsumeQueue中每个offset对应的物理偏移量的commitLog数据是否存在if(bufferConsumeQueue!null){try{预先设置状态未NOMATCHEDMESSAGEstatusGetMessageStatus。NOMATCHEDMESSAGE;longnextPhyFileStartOffsetLong。MINVALUE;longmaxPhyOffsetPulling0;inti0;最多需要校验的消息条数finalintmaxFilterMessageCountMath。max(16000,maxMsgNumsConsumeQueue。CQSTOREUNITSIZE);是否记录消费落后磁盘量finalbooleandiskFallRecordedthis。messageStoreConfig。isDiskFallRecorded();getResultnewGetMessageResult(maxMsgNums);ConsumeQueueExt。CqExtUnitcqExtUnitnewConsumeQueueExt。CqExtUnit();for(;ibufferConsumeQueue。getSize()imaxFilterMessageCount;iConsumeQueue。CQSTOREUNITSIZE){对这段consumequeue里面的数据,每一条数据都是一个消息offsetsizestags读取第18个字节为物理偏移量offsetPylongoffsetPybufferConsumeQueue。getByteBuffer()。getLong();第912个字节为消息大小sizePyintsizePybufferConsumeQueue。getByteBuffer()。getInt();第1316个字节为tagsCodelongtagsCodebufferConsumeQueue。getByteBuffer()。getLong();maxPhyOffsetPullingoffsetPy;表示上一轮的消息,未在commitLog获取到if(nextPhyFileStartOffset!Long。MINVALUE){如果下一个消息起始offset大于当前的要获取的offsetPyif(offsetPynextPhyFileStartOffset)continue;}通过最大偏移量当前数据偏移量和内存大小作比较判断数据是否可以从内存获取booleanisInDiskcheckInDiskByCommitOffset(offsetPy,maxOffsetPy);检查拉取的消息总大小是否到达上限,如果达到则中止这次消息拉取if(this。isTheBatchFull(sizePy,maxMsgNums,getResult。getBufferTotalSize(),getResult。getMessageCount(),isInDisk)){break;}booleanextRetfalse,isTagsCodeLegaltrue;判断tagsCode是否小于等于0if(consumeQueue。isExtAddr(tagsCode)){读取下一条消息,保存到cqExtUnit中extRetconsumeQueue。getExt(tagsCode,cqExtUnit);if(extRet){tagsCodecqExtUnit。getTagsCode();}else{cantfindextcontent。Clientwillfiltermessagesbytagalso。log。error(〔BUG〕cantfindconsumequeueextendfilecontent!addr{},offsetPy{},sizePy{},topic{},group{},tagsCode,offsetPy,sizePy,topic,group);isTagsCodeLegalfalse;}}消息过滤,如果匹配不成功并且消息为空,暂时设置状态为NOMATCHEDMESSAGE匹配成功有以下几种情况SubscriptionData对象为空;SubscriptionData。classFilterMode变量为true;SubscriptionData对象的subString变量等于;SubscriptionData对象的codeSet集合包含tagsCode值;if(messageFilter!null!messageFilter。isMatchedByConsumeQueue(isTagsCodeLegal?tagsCode:null,extRet?cqExtUnit:null)){if(getResult。getBufferTotalSize()0){statusGetMessageStatus。NOMATCHEDMESSAGE;}continue;}根据consumequeue里指定的消息物理便偏移量,去commitlog查询一条消息SelectMappedBufferResultselectResultthis。commitLog。getMessage(offsetPy,sizePy);if(nullselectResult){if(getResult。getBufferTotalSize()0){statusGetMessageStatus。MESSAGEWASREMOVING;}未读取到消息,表示对应的mappedFile已经删除,从下一个文件的起始位置开始读取消息nextPhyFileStartOffsetthis。commitLog。rollNextFile(offsetPy);continue;}过滤真实的消息if(messageFilter!null!messageFilter。isMatchedByCommitLog(selectResult。getByteBuffer()。slice(),null)){if(getResult。getBufferTotalSize()0){statusGetMessageStatus。NOMATCHEDMESSAGE;}release。。。selectResult。release();continue;}获取消息计数this。storeStatsService。getGetMessageTransferedMsgCount()。add(1);他会把这条消息数据添加到结果里面去getResult。addMessage(selectResult);statusGetMessageStatus。FOUND;nextPhyFileStartOffsetLong。MINVALUE;}if(diskFallRecorded){消费落后的数据量最大offset此次消费的最大offsetlongfallBehindmaxOffsetPymaxPhyOffsetPulling;记录消费落后的数据量brokerStatsManager。recordDiskFallBehindSize(group,topic,queueId,fallBehind);}下一个开始的offsetnextBeginOffsetoffset(iConsumeQueue。CQSTOREUNITSIZE);longdiffmaxOffsetPymaxPhyOffsetPulling;内存大小longmemory(long)(StoreUtil。TOTALPHYSICALMEMORYSIZE(this。messageStoreConfig。getAccessMessageInMemoryMaxRatio()100。0));当消费进度落后量大于物理内存时,建议调换到从库去处理读getResult。setSuggestPullingFromSlave(diffmemory);}finally{bufferConsumeQueue。release();}}else{没找到消息statusGetMessageStatus。OFFSETFOUNDNULL;设置从下一个mappedFile相同位置开始读取nextBeginOffsetnextOffsetCorrection(offset,consumeQueue。rollNextFile(offset));log。warn(consumerrequesttopic:topicoffset:offsetminOffset:minOffsetmaxOffset:maxOffset,butaccesslogicqueuefailed。);}}}else{statusGetMessageStatus。NOMATCHEDLOGICQUEUE;nextBeginOffsetnextOffsetCorrection(offset,0);}统计拉取消息和未拉取到消息的次数if(GetMessageStatus。FOUNDstatus){this。storeStatsService。getGetMessageTimesTotalFound()。add(1);}else{this。storeStatsService。getGetMessageTimesTotalMiss()。add(1);}longelapsedTimethis。getSystemClock()。now()beginTime;this。storeStatsService。setGetMessageEntireTimeMax(elapsedTime);lazyinitnodatafound。if(getResultnull){getResultnewGetMessageResult(0);}getResult。setStatus(status);getResult。setNextBeginOffset(nextBeginOffset);getResult。setMaxOffset(maxOffset);getResult。setMinOffset(minOffset);returngetResult;}四、总结前置判断;调用MessageStore的getMessage方法拉取消息;执行消费消息回调钩子;基于broker来存储你最新提交的offset偏移量; 消息拉取为了提高网络性能,在消息服务端根据拉取偏移量去物理文件查找消息时没有找到,并不立即返回消息未找到,而是会将该线程挂起一段时间,然后再次重试,直到重试。挂起分为长轮询或短轮询,在broker端可以通过longPollingEnabletrue来开启长轮询。 注:下文会分析长轮询和短轮询;