应用办公生活信息教育商业
投稿投诉
商业财经
汽车智能
教育国际
房产环球
信息数码
热点科技
生活手机
晨报新闻
办公软件
科学动态
应用生物
体育时事

老弟问我,RocketMQ中的ProcessQueue怎么理

  今天来分享RocketMQ中一个非常重要又不太好理解的知识点ProcessQueue。
  一句话概括,ProcessQueue就是MessageQueue的消费快照。看下面这张图:
  1ProcessQueue构建
  RocketMQ客户端启动时,会开启一个rebalance线程,代码如下:MQClientInstance。javapublicvoidstart()throwsMQClientException{synchronized(this){switch(this。serviceState){caseCREATEJUST:。。。Startrebalanceservicethis。rebalanceService。start();。。。}}}
  这个线程会不停的做重平衡操作,对ProcessQueue进行维护。在重平衡线程类RebalanceImpl定义了一个变量processQueueTable,数据结构如下:
  可以看到,在processQueueTable这个数据结构上维护了MessageQueue和ProcessQueue的映射。
  下面看一下维护processQueueTable的代码:privatebooleanupdateProcessQueueTableInRebalance(finalStringtopic,finalSetMessageQueuemqSet,finalbooleanisOrder){booleanchangedfalse;IteratorEntryMessageQueue,ProcessQueueitthis。processQueueTable。entrySet()。iterator();while(it。hasNext()){EntryMessageQueue,ProcessQueuenextit。next();MessageQueuemqnext。getKey();ProcessQueuepqnext。getValue();if(mq。getTopic()。equals(topic)){if(!mqSet。contains(mq)){从processQueueTable上移除}elseif(pq。isPullExpired()){switch(this。consumeType()){caseCONSUMEACTIVELY:拉模式break;caseCONSUMEPASSIVELY:推模式从processQueueTable上移除break;default:break;}}}}创建ProcessQueue并放到processQueueTableListPullRequestpullRequestListnewArrayListPullRequest();for(MessageQueuemq:mqSet){if(!this。processQueueTable。containsKey(mq)){。。。ProcessQueuepqnewProcessQueue();longnextOffset1L;try{nextOffsetthis。computePullFromWhereWithException(mq);}catch(Exceptione){log。info(doRebalance,{},computeoffsetfailed,{},consumerGroup,mq);continue;}if(nextOffset0){ProcessQueueprethis。processQueueTable。putIfAbsent(mq,pq);if(pre!null){log。info(doRebalance,{},mqalreadyexists,{},consumerGroup,mq);}else{封装好processQueueTable后再创建一个PullRequest进行消息拉取log。info(doRebalance,{},addanewmq,{},consumerGroup,mq);PullRequestpullRequestnewPullRequest();pullRequest。setConsumerGroup(consumerGroup);pullRequest。setNextOffset(nextOffset);pullRequest。setMessageQueue(mq);pullRequest。setProcessQueue(pq);pullRequestList。add(pullRequest);changedtrue;}}else{log。warn(doRebalance,{},addnewmqfailed,{},consumerGroup,mq);}}}this。dispatchPullRequest(pullRequestList);returnchanged;}2拉取消息
  上一节中构建ProcessQueue后,会再创建一个PullRequest,这个PullRequest封装了MessageQueue和ProcessQueue,创建成功后被放到了PullMessageService中的pullRequestQueue变量:PullMessageService。javaprivatefinalLinkedBlockingQueuePullRequestpullRequestQueuenewLinkedBlockingQueuePullRequest();publicvoidexecutePullRequestImmediately(finalPullRequestpullRequest){try{this。pullRequestQueue。put(pullRequest);}catch(InterruptedExceptione){log。error(executePullRequestImmediatelypullRequestQueue。put,e);}}
  这里以RocketMQ的推模式为例,Consumer拉取到消息后,会进行如下处理:对拉取到的消息根据TAG再次进行过滤;更新PullRequest下次拉取的偏移量nextOffset;
  把拉取的消息封装到ProcessQueue的msgTreeMap(放到msgTreeMap之前首先要获取到写锁treeMapLock);封装ConsumeRequest进行消息消费;封装消息拉取请求再次进行拉取。
  代码如下:DefaultMQPushConsumerImpl。javapublicvoidonSuccess(PullResultpullResult){if(pullResult!null){1。对拉取到的消息根据TAG再次进行过滤pullResultDefaultMQPushConsumerImpl。this。pullAPIWrapper。processPullResult(pullRequest。getMessageQueue(),pullResult,subscriptionData);switch(pullResult。getPullStatus()){caseFOUND:2。更新PullRequest下次拉取的偏移量nextOffsetpullRequest。setNextOffset(pullResult。getNextBeginOffset());if(pullResult。getMsgFoundList()nullpullResult。getMsgFoundList()。isEmpty()){DefaultMQPushConsumerImpl。this。executePullRequestImmediately(pullRequest);}else{3。把拉取的消息封装到ProcessQueue的msgTreeMapbooleandispatchToConsumeprocessQueue。putMessage(pullResult。getMsgFoundList());4。封装ConsumeRequest进行消息消费DefaultMQPushConsumerImpl。this。consumeMessageService。submitConsumeRequest(pullResult。getMsgFoundList(),processQueue,pullRequest。getMessageQueue(),dispatchToConsume);5。封装消息拉取请求if(DefaultMQPushConsumerImpl。this。defaultMQPushConsumer。getPullInterval()0){DefaultMQPushConsumerImpl。this。executePullRequestLater(pullRequest,DefaultMQPushConsumerImpl。this。defaultMQPushConsumer。getPullInterval());}else{DefaultMQPushConsumerImpl。this。executePullRequestImmediately(pullRequest);}}break;。。。}}}3消费消息
  在上一节提到过,拉取到消息后,会把消息封装成一个ConsumeRequest,这个线程类会调用消费者定义的MessageListener进行消费处理。看一下源代码:ConsumeMessageConcurrentlyService。ConsumeRequestpublicvoidrun(){if(this。processQueue。isDropped()){log。info(themessagequeuenotbeabletoconsume,becauseitsdropped。group{}{},ConsumeMessageConcurrentlyService。this。consumerGroup,this。messageQueue);return;}MessageListenerConcurrentlylistenerConsumeMessageConcurrentlyService。this。messageListener;ConsumeConcurrentlyContextcontextnewConsumeConcurrentlyContext(messageQueue);ConsumeConcurrentlyStatusstatusnull;try{statuslistener。consumeMessage(Collections。unmodifiableList(msgs),context);}。。。if(!processQueue。isDropped()){ConsumeMessageConcurrentlyService。this。processConsumeResult(status,context,this);}}
  消息消费成功后,会调用processConsumeResult方法进行结果处理。对于广播模式,发送失败后不会做重试,相当于把消息丢弃,而对于集群模式,消费失败的消息会发送到Broker端等待消费者重新拉取进行重试。
  消费结果处理完后,消费成功的消息会从ProcessQueue的msgTreeMap中移除(需要获取到写锁treeMapLock),同时从msgTreeMap中获取最小的Offset来更新对应MessageQueue的偏移量。这个逻辑可以参考下面代码:publicvoidprocessConsumeResult(finalConsumeConcurrentlyStatusstatus,finalConsumeConcurrentlyContextcontext,finalConsumeRequestconsumeRequest){intackIndexcontext。getAckIndex();switch(status){caseCONSUMESUCCESS:if(ackIndexconsumeRequest。getMsgs()。size()){ackIndexconsumeRequest。getMsgs()。size()1;}intokackIndex1;break;。。。}switch(this。defaultMQPushConsumer。getMessageModel()){caseBROADCASTING:。。。break;caseCLUSTERING:ListMessageExtmsgBackFailednewArrayListMessageExt(consumeRequest。getMsgs()。size());for(intiackIndex1;iconsumeRequest。getMsgs()。size();i){MessageExtmsgconsumeRequest。getMsgs()。get(i);消费失败的,发送回Brokerbooleanresultthis。sendMessageBack(msg,context);。。。}break;default:break;}从msgTreeMap中移除并返回msgTreeMap第一条消息的offsetlongoffsetconsumeRequest。getProcessQueue()。removeMessage(consumeRequest。getMsgs());if(offset0!consumeRequest。getProcessQueue()。isDropped()){this。defaultMQPushConsumerImpl。getOffsetStore()。updateOffset(consumeRequest。getMessageQueue(),offset,true);}}4消费者限流4。1缓存消息数量
  如果消费者缓存的消息数量大于RocketMQ配置的阈值(默认1000),就会触发延迟拉取,而消费者缓存的消息数量就来自ProcessQueue,看下面代码:longcachedMessageCountprocessQueue。getMsgCount()。get();if(cachedMessageCountthis。defaultMQPushConsumer。getPullThresholdForQueue()){this。executePullRequestLater(pullRequest,PULLTIMEDELAYMILLSWHENFLOWCONTROL);return;}4。2缓存的消息大小
  如果消费者缓存的消息大小大于RocketMQ配置的阈值(默认100M),就会触发延迟拉取,而消费者缓存的消息大小就来自ProcessQueue,看下面代码:longcachedMessageSizeInMiBprocessQueue。getMsgSize()。get()(10241024);if(cachedMessageSizeInMiBthis。defaultMQPushConsumer。getPullThresholdSizeForQueue()){this。executePullRequestLater(pullRequest,PULLTIMEDELAYMILLSWHENFLOWCONTROL);return;}4。3消息间隔
  对于普通消息,如果消费偏移量间隔大于配置的阈值(默认2000),就会触发延迟拉取,而消息间隔就来自ProcessQueue,看下面代码:if(!this。consumeOrderly){if(processQueue。getMaxSpan()this。defaultMQPushConsumer。getConsumeConcurrentlyMaxSpan()){this。executePullRequestLater(pullRequest,PULLTIMEDELAYMILLSWHENFLOWCONTROL);return;}}4。4获取锁失败
  对于顺序消息,如果获取锁失败,也会触发延迟拉取,而判断获取锁是否成功,也是在ProcessQueue,看下面代码:if(processQueue。isLocked()){。。。}else{this。executePullRequestLater(pullRequest,pullTimeDelayMillsWhenException);}5总结
  ProcessQueue是MessageQueue的消费快照,可以协助消费者进行消息拉取、消息消费、更新偏移量、限流。最后,看一下ProcessQueue的数据结构:
  来源:https:mp。weixin。qq。comszB7dM9xt26c6Z04PvYfOmQ

明明前后只差20天,任嘉伦演的周生辰和长意,差别咋就那么大呢任嘉伦称王,天下无双。忘了在哪里看到过这样的一条评论。虽然,这说法有点夸大其词。但仔细想想,任嘉伦演过的不管是广平王李俶,还是小南辰王周生辰,都给观众留下了十……NBA盘点常规赛执教胜场1000场却没有总冠军的传奇教练教练是赢得总冠军的最重要的因素。库班,独行侠老板开篇话:记得本赛季开赛大概一个半月的时间,国王队主教练小沃顿被球队解雇,成为第一位下课的主教练。作为名宿之子,球员时代跟随……外国人在中国给长期在中国工作生活的外籍人的一些建议,值得收藏目前,中国已经成为世界东方的最大的经济体,一举一动都牵动着整个世界的经济大局,外国人来到中国找工作已经变成一种常态。多年以来,外国人到中国工作的情况,已经发生了根本的变化。……霸王龙跟狮子打架谁会赢?这种问题都写进书,这代孩子是要逆天?要论脑洞的逆天程度,谁都比不过人类幼崽,孩子经常问的不少问题,让我这个老母亲哑口无言,完全没办法从科学角度去回答。比如他喜欢一切打架厉害的生物,就时不时过来问我:妈妈,恐龙里面……心脑血管疾病的营养调理,大家必须知道心脑血管疾病的病理基础是动脉粥样硬化,可发生在主动脉、冠状动脉、脑动脉、肢体动脉、肾动脉等处。可使动脉内膜有黄色斑或斑块沉着(内含胆固醇、胆固醇酯,甘油三酯等),并有内膜炎症、……飞撞飞踹!一回合伤俩!科尔怒喷斯玛特你这是故意伤人独行侠和马刺上演绝杀好戏,圈主没安排上头条湖人又一次大败圈主也放过了这次绝佳的吐槽机会因为后台实在是太多圈友想看圈主唠嗑唠嗑勇士了,所以忍痛割爱给大伙蹲了勇士的比赛……笑星巩汉林,让国足再次成为笑话最近几天关于著名笑星巩汉林和男足之间的骂战频繁登上热搜,再加上搅屎棍董贼怂天怂地的一通乱骂,着实让吃瓜群众看了一场好戏!有人说巩汉林是一个门外汉,不懂足球就不要瞎评论!那……全球彩电销量Top15三星第1,小米第5,华为第15,国产8虽然现在小屏越来越挤占大屏的空间,比如Pad、手机、电脑等占了彩电的空间,抢走了用户的时间,但对于很多人而言,家里买一台电视还是必须的,不管看不看。所以这些年以来,虽然彩……3位知名男星塌房!为户口陪睡男大佬,2天潜规则2位女星,毁三4月16日,娱乐圈再次迎来3大丑闻瓜,两个和出轨有关,一个和没有底线有关,不得不说,现如今的娱乐圈真的是,只有想不到的,没有他们做不到的。首先第一个瓜就来自赵弈钦,被女友……又见海棠花开!快来这里邂逅济南醉美的春天!阳春三月,春光明媚,天下第一泉风景区内柳条抽芽,百花争艳。其中,有花贵妃、花中神仙之称的海棠,姿容优美,娇艳芬芳,恰在绿肥红肥时,更让人们沉醉。海棠是蔷薇科苹果属多种植物……人在低谷时,如何治愈自己?这是我听过最好的答案最近,看到一个新闻,内心久久不能平静。4月6日,湖南张家界,4名来自不同地方的游客,相约在天门山景区。在到达山顶处时,毫无预兆下,先后翻越护栏跳崖。结果,3人……再次82场全勤!科尔卢尼就是那种你在选秀中最想得到的球员直播吧4月10日讯今日NBA常规赛收官战,勇士客场157101大破开拓者。勇士主帅科尔赛后接受了采访。谈到本赛季再次82场全勤的卢尼,科尔不吝赞美之词,他说道:卢尼正是那……
15岁全红禅身高突破1。5米变美!哥哥戴着草帽出现在镜头前全红禅今年15岁。她是中国跳水队最年轻的运动员之一,肩负着沉重的责任。据悉,跳水队长周继红决定让全红禅与陈芋喜组成新搭档,参加女子双人10米台比赛。对于红姐姐来说,这是她的信任……便宜的谷维素,作用有很多,但长期服用,会遇到3个问题什么是谷维素?谷维素是从米糠油中提取的以三萜醇为主要组分的阿魏酸酯混合物,临床主要用于周期性精神病、脑震荡后遗症、自主神经功能失调。其实谷维素就是一种维生素,主要成……一眼就惊艳的高质量文案,收藏起来吧1、请看护你曾经的激情和理想,在这个怀疑的时代,我们依然需要信仰。2、人类建议:不要透过伤害自己和故意让对方生气而试图用对方的同情或是紧张去测量彼此之间爱的温度。3……比利时宣称遭中国黑客攻击,外交部驳斥7月20日,外交部发言人汪文斌主持例行记者会。有记者提问,比利时外交部最近表示,中国黑客对比政府发起了网络攻击,你对此有何回应?图源:外交部汪文斌表示,我们注……地小知恒星知多少恒星是指那些自身都会发光,并且位置相对固定的星体。古代的天文学家认为恒星在星空的位置是固定的,所以给它起名恒星,意思是永恒不变的星。其实恒星也是在不停地高速运动着,它绕星系的中……智解城市病,高德地图出了本书天地交而万物通。纵观人类社会发展的每一步,无不与交通运输领域科技进步密切相关。交通拥堵是困扰各国城市交通管理者的城市病,是居民司空见惯的慢性病,也是专家学者努力破解的疑难病。缓……唐嫣身材太绝了,穿缎面抹胸裙尽显贵气感,气场强大碾压众人不得不说女神唐嫣的身材真的很绝,她的身材和气质都十分的出挑,这一次为自己挑选一条时尚的缎面抹胸连衣裙,更是展现出了个人的苗条曲线,但她身材瘦而不柴,这身穿搭看上去还挺性感的,碾……这样的牛顿你了解吗?(炼金术炒股专家)说到牛顿你可能会想到:近代物理学之父、经典力学三大定律、万有引力、微积分等!没错,这些都是牛顿的伟大成就。但是这是教科书里面的,当然我写这篇文章也不是诋毁这位伟大的科学家,而是……吃鸡福利更新!优质军需20选2,粉红小猪再次回归务实、不浮夸!我是你们的情报小能手,微笑十倍镜。自从吃鸡手游和平精英的SS20赛季上线以来,许多细心的小伙伴就发现福利商店销声匿迹了,虽然有些不地道,但却是一件好事!众所……3次上市均告失败,这家芯片公司把雷军都坑了?01hr小米也踩坑了?说起国内大公司的商业版图,大家第一时间肯定会想到阿里和腾讯。两家都是全国互联网公司的巨头,通过投资的方式,控股或持股了不少企业,形成了腾讯系和……健康长寿的人,每天坚持做这2件事,身体会越来越健康随着人们生活水平的提高,大家都开始注意养生,然而我们的平均寿命一直在增加,事实上,只要我们养成良好的生活习惯,我们都是可以健康长寿的。在日常生活中,不仅要多睡多走,我们还……每日游戏报光荣官微仁王官推庆祝仁王2获Steam大奖游戏7折一、光荣官微、仁王官推庆祝《仁王2》获Steam大奖游戏7折优惠中1月4日,Steam年度大奖正式公布,《仁王2:完全版》获得了2021年Steam大奖纵使手残仍大爱奖项……
友情链接:快好找快生活快百科快传网中准网文好找聚热点快软网