老弟问我,RocketMQ中的ProcessQueue怎么理
今天来分享RocketMQ中一个非常重要又不太好理解的知识点ProcessQueue。
一句话概括,ProcessQueue就是MessageQueue的消费快照。看下面这张图:
1ProcessQueue构建
RocketMQ客户端启动时,会开启一个rebalance线程,代码如下:MQClientInstance。javapublicvoidstart()throwsMQClientException{synchronized(this){switch(this。serviceState){caseCREATEJUST:。。。Startrebalanceservicethis。rebalanceService。start();。。。}}}
这个线程会不停的做重平衡操作,对ProcessQueue进行维护。在重平衡线程类RebalanceImpl定义了一个变量processQueueTable,数据结构如下:
可以看到,在processQueueTable这个数据结构上维护了MessageQueue和ProcessQueue的映射。
下面看一下维护processQueueTable的代码:privatebooleanupdateProcessQueueTableInRebalance(finalStringtopic,finalSetMessageQueuemqSet,finalbooleanisOrder){booleanchangedfalse;IteratorEntryMessageQueue,ProcessQueueitthis。processQueueTable。entrySet()。iterator();while(it。hasNext()){EntryMessageQueue,ProcessQueuenextit。next();MessageQueuemqnext。getKey();ProcessQueuepqnext。getValue();if(mq。getTopic()。equals(topic)){if(!mqSet。contains(mq)){从processQueueTable上移除}elseif(pq。isPullExpired()){switch(this。consumeType()){caseCONSUMEACTIVELY:拉模式break;caseCONSUMEPASSIVELY:推模式从processQueueTable上移除break;default:break;}}}}创建ProcessQueue并放到processQueueTableListPullRequestpullRequestListnewArrayListPullRequest();for(MessageQueuemq:mqSet){if(!this。processQueueTable。containsKey(mq)){。。。ProcessQueuepqnewProcessQueue();longnextOffset1L;try{nextOffsetthis。computePullFromWhereWithException(mq);}catch(Exceptione){log。info(doRebalance,{},computeoffsetfailed,{},consumerGroup,mq);continue;}if(nextOffset0){ProcessQueueprethis。processQueueTable。putIfAbsent(mq,pq);if(pre!null){log。info(doRebalance,{},mqalreadyexists,{},consumerGroup,mq);}else{封装好processQueueTable后再创建一个PullRequest进行消息拉取log。info(doRebalance,{},addanewmq,{},consumerGroup,mq);PullRequestpullRequestnewPullRequest();pullRequest。setConsumerGroup(consumerGroup);pullRequest。setNextOffset(nextOffset);pullRequest。setMessageQueue(mq);pullRequest。setProcessQueue(pq);pullRequestList。add(pullRequest);changedtrue;}}else{log。warn(doRebalance,{},addnewmqfailed,{},consumerGroup,mq);}}}this。dispatchPullRequest(pullRequestList);returnchanged;}2拉取消息
上一节中构建ProcessQueue后,会再创建一个PullRequest,这个PullRequest封装了MessageQueue和ProcessQueue,创建成功后被放到了PullMessageService中的pullRequestQueue变量:PullMessageService。javaprivatefinalLinkedBlockingQueuePullRequestpullRequestQueuenewLinkedBlockingQueuePullRequest();publicvoidexecutePullRequestImmediately(finalPullRequestpullRequest){try{this。pullRequestQueue。put(pullRequest);}catch(InterruptedExceptione){log。error(executePullRequestImmediatelypullRequestQueue。put,e);}}
这里以RocketMQ的推模式为例,Consumer拉取到消息后,会进行如下处理:对拉取到的消息根据TAG再次进行过滤;更新PullRequest下次拉取的偏移量nextOffset;
把拉取的消息封装到ProcessQueue的msgTreeMap(放到msgTreeMap之前首先要获取到写锁treeMapLock);封装ConsumeRequest进行消息消费;封装消息拉取请求再次进行拉取。
代码如下:DefaultMQPushConsumerImpl。javapublicvoidonSuccess(PullResultpullResult){if(pullResult!null){1。对拉取到的消息根据TAG再次进行过滤pullResultDefaultMQPushConsumerImpl。this。pullAPIWrapper。processPullResult(pullRequest。getMessageQueue(),pullResult,subscriptionData);switch(pullResult。getPullStatus()){caseFOUND:2。更新PullRequest下次拉取的偏移量nextOffsetpullRequest。setNextOffset(pullResult。getNextBeginOffset());if(pullResult。getMsgFoundList()nullpullResult。getMsgFoundList()。isEmpty()){DefaultMQPushConsumerImpl。this。executePullRequestImmediately(pullRequest);}else{3。把拉取的消息封装到ProcessQueue的msgTreeMapbooleandispatchToConsumeprocessQueue。putMessage(pullResult。getMsgFoundList());4。封装ConsumeRequest进行消息消费DefaultMQPushConsumerImpl。this。consumeMessageService。submitConsumeRequest(pullResult。getMsgFoundList(),processQueue,pullRequest。getMessageQueue(),dispatchToConsume);5。封装消息拉取请求if(DefaultMQPushConsumerImpl。this。defaultMQPushConsumer。getPullInterval()0){DefaultMQPushConsumerImpl。this。executePullRequestLater(pullRequest,DefaultMQPushConsumerImpl。this。defaultMQPushConsumer。getPullInterval());}else{DefaultMQPushConsumerImpl。this。executePullRequestImmediately(pullRequest);}}break;。。。}}}3消费消息
在上一节提到过,拉取到消息后,会把消息封装成一个ConsumeRequest,这个线程类会调用消费者定义的MessageListener进行消费处理。看一下源代码:ConsumeMessageConcurrentlyService。ConsumeRequestpublicvoidrun(){if(this。processQueue。isDropped()){log。info(themessagequeuenotbeabletoconsume,becauseitsdropped。group{}{},ConsumeMessageConcurrentlyService。this。consumerGroup,this。messageQueue);return;}MessageListenerConcurrentlylistenerConsumeMessageConcurrentlyService。this。messageListener;ConsumeConcurrentlyContextcontextnewConsumeConcurrentlyContext(messageQueue);ConsumeConcurrentlyStatusstatusnull;try{statuslistener。consumeMessage(Collections。unmodifiableList(msgs),context);}。。。if(!processQueue。isDropped()){ConsumeMessageConcurrentlyService。this。processConsumeResult(status,context,this);}}
消息消费成功后,会调用processConsumeResult方法进行结果处理。对于广播模式,发送失败后不会做重试,相当于把消息丢弃,而对于集群模式,消费失败的消息会发送到Broker端等待消费者重新拉取进行重试。
消费结果处理完后,消费成功的消息会从ProcessQueue的msgTreeMap中移除(需要获取到写锁treeMapLock),同时从msgTreeMap中获取最小的Offset来更新对应MessageQueue的偏移量。这个逻辑可以参考下面代码:publicvoidprocessConsumeResult(finalConsumeConcurrentlyStatusstatus,finalConsumeConcurrentlyContextcontext,finalConsumeRequestconsumeRequest){intackIndexcontext。getAckIndex();switch(status){caseCONSUMESUCCESS:if(ackIndexconsumeRequest。getMsgs()。size()){ackIndexconsumeRequest。getMsgs()。size()1;}intokackIndex1;break;。。。}switch(this。defaultMQPushConsumer。getMessageModel()){caseBROADCASTING:。。。break;caseCLUSTERING:ListMessageExtmsgBackFailednewArrayListMessageExt(consumeRequest。getMsgs()。size());for(intiackIndex1;iconsumeRequest。getMsgs()。size();i){MessageExtmsgconsumeRequest。getMsgs()。get(i);消费失败的,发送回Brokerbooleanresultthis。sendMessageBack(msg,context);。。。}break;default:break;}从msgTreeMap中移除并返回msgTreeMap第一条消息的offsetlongoffsetconsumeRequest。getProcessQueue()。removeMessage(consumeRequest。getMsgs());if(offset0!consumeRequest。getProcessQueue()。isDropped()){this。defaultMQPushConsumerImpl。getOffsetStore()。updateOffset(consumeRequest。getMessageQueue(),offset,true);}}4消费者限流4。1缓存消息数量
如果消费者缓存的消息数量大于RocketMQ配置的阈值(默认1000),就会触发延迟拉取,而消费者缓存的消息数量就来自ProcessQueue,看下面代码:longcachedMessageCountprocessQueue。getMsgCount()。get();if(cachedMessageCountthis。defaultMQPushConsumer。getPullThresholdForQueue()){this。executePullRequestLater(pullRequest,PULLTIMEDELAYMILLSWHENFLOWCONTROL);return;}4。2缓存的消息大小
如果消费者缓存的消息大小大于RocketMQ配置的阈值(默认100M),就会触发延迟拉取,而消费者缓存的消息大小就来自ProcessQueue,看下面代码:longcachedMessageSizeInMiBprocessQueue。getMsgSize()。get()(10241024);if(cachedMessageSizeInMiBthis。defaultMQPushConsumer。getPullThresholdSizeForQueue()){this。executePullRequestLater(pullRequest,PULLTIMEDELAYMILLSWHENFLOWCONTROL);return;}4。3消息间隔
对于普通消息,如果消费偏移量间隔大于配置的阈值(默认2000),就会触发延迟拉取,而消息间隔就来自ProcessQueue,看下面代码:if(!this。consumeOrderly){if(processQueue。getMaxSpan()this。defaultMQPushConsumer。getConsumeConcurrentlyMaxSpan()){this。executePullRequestLater(pullRequest,PULLTIMEDELAYMILLSWHENFLOWCONTROL);return;}}4。4获取锁失败
对于顺序消息,如果获取锁失败,也会触发延迟拉取,而判断获取锁是否成功,也是在ProcessQueue,看下面代码:if(processQueue。isLocked()){。。。}else{this。executePullRequestLater(pullRequest,pullTimeDelayMillsWhenException);}5总结
ProcessQueue是MessageQueue的消费快照,可以协助消费者进行消息拉取、消息消费、更新偏移量、限流。最后,看一下ProcessQueue的数据结构:
来源:https:mp。weixin。qq。comszB7dM9xt26c6Z04PvYfOmQ
15岁全红禅身高突破1。5米变美!哥哥戴着草帽出现在镜头前全红禅今年15岁。她是中国跳水队最年轻的运动员之一,肩负着沉重的责任。据悉,跳水队长周继红决定让全红禅与陈芋喜组成新搭档,参加女子双人10米台比赛。对于红姐姐来说,这是她的信任……
便宜的谷维素,作用有很多,但长期服用,会遇到3个问题什么是谷维素?谷维素是从米糠油中提取的以三萜醇为主要组分的阿魏酸酯混合物,临床主要用于周期性精神病、脑震荡后遗症、自主神经功能失调。其实谷维素就是一种维生素,主要成……
一眼就惊艳的高质量文案,收藏起来吧1、请看护你曾经的激情和理想,在这个怀疑的时代,我们依然需要信仰。2、人类建议:不要透过伤害自己和故意让对方生气而试图用对方的同情或是紧张去测量彼此之间爱的温度。3……
比利时宣称遭中国黑客攻击,外交部驳斥7月20日,外交部发言人汪文斌主持例行记者会。有记者提问,比利时外交部最近表示,中国黑客对比政府发起了网络攻击,你对此有何回应?图源:外交部汪文斌表示,我们注……
地小知恒星知多少恒星是指那些自身都会发光,并且位置相对固定的星体。古代的天文学家认为恒星在星空的位置是固定的,所以给它起名恒星,意思是永恒不变的星。其实恒星也是在不停地高速运动着,它绕星系的中……
智解城市病,高德地图出了本书天地交而万物通。纵观人类社会发展的每一步,无不与交通运输领域科技进步密切相关。交通拥堵是困扰各国城市交通管理者的城市病,是居民司空见惯的慢性病,也是专家学者努力破解的疑难病。缓……
唐嫣身材太绝了,穿缎面抹胸裙尽显贵气感,气场强大碾压众人不得不说女神唐嫣的身材真的很绝,她的身材和气质都十分的出挑,这一次为自己挑选一条时尚的缎面抹胸连衣裙,更是展现出了个人的苗条曲线,但她身材瘦而不柴,这身穿搭看上去还挺性感的,碾……
这样的牛顿你了解吗?(炼金术炒股专家)说到牛顿你可能会想到:近代物理学之父、经典力学三大定律、万有引力、微积分等!没错,这些都是牛顿的伟大成就。但是这是教科书里面的,当然我写这篇文章也不是诋毁这位伟大的科学家,而是……
吃鸡福利更新!优质军需20选2,粉红小猪再次回归务实、不浮夸!我是你们的情报小能手,微笑十倍镜。自从吃鸡手游和平精英的SS20赛季上线以来,许多细心的小伙伴就发现福利商店销声匿迹了,虽然有些不地道,但却是一件好事!众所……
3次上市均告失败,这家芯片公司把雷军都坑了?01hr小米也踩坑了?说起国内大公司的商业版图,大家第一时间肯定会想到阿里和腾讯。两家都是全国互联网公司的巨头,通过投资的方式,控股或持股了不少企业,形成了腾讯系和……
健康长寿的人,每天坚持做这2件事,身体会越来越健康随着人们生活水平的提高,大家都开始注意养生,然而我们的平均寿命一直在增加,事实上,只要我们养成良好的生活习惯,我们都是可以健康长寿的。在日常生活中,不仅要多睡多走,我们还……
每日游戏报光荣官微仁王官推庆祝仁王2获Steam大奖游戏7折一、光荣官微、仁王官推庆祝《仁王2》获Steam大奖游戏7折优惠中1月4日,Steam年度大奖正式公布,《仁王2:完全版》获得了2021年Steam大奖纵使手残仍大爱奖项……