应用办公生活信息教育商业
投稿投诉
商业财经
汽车智能
教育国际
房产环球
信息数码
热点科技
生活手机
晨报新闻
办公软件
科学动态
应用生物
体育时事

仅此一招,再无消息乱序的烦恼

  1。概览
  RocketMQ早已提供了一组最佳实践,但工作在一线的伙伴却很少知道,项目中的各种随性代码经常导致消息错乱问题,严重影响业务的准确性。为了保障最佳实践的落地,降低一线伙伴的使用成本,统一MQ使用规范,需要对其进行抽象和封装1。1。背景
  RocketMQ的最佳实践中推荐:一个应用尽可能用一个Topic,消息子类型用tags来标识,tags可以由应用自由设置。
  在使用rocketMQTemplate发送消息时,通过设置发送方法的destination参数来设置消息的目的地,destination的格式为topicName:tagName,:前面表示topic的名称,后面表示tags名称,简单示例如下:计算destinationprotectedStringcreateDestination(Stringtopic,Stringtag){if(org。apache。commons。lang3。StringUtils。isNotEmpty(tag)){returntopic:tag;}else{returntopic;}}发送信息StringdestinationcreateDestination(topic,tag);SendResultsendResultthis。rocketMQTemplate。syncSendOrderly(destination,msg,shardingKey,2000);
  tags从命名来看像是一个复数,但发送消息时,目的地只能指定一个topic下的一个tag,不能指定多个。
  但,在消费消息时,就变的没那么方便了,简单示例如下:ServiceRocketMQMessageListener(topicconsumertesttopic1,consumerGroupusermessageconsumer1,selectorExpression,consumeModeConsumeMode。ORDERLY)Slf4jpublicclassRocketBasedUserMessageConsumerextendsUserMessageConsumerimplementsRocketMQListenerMessageExt{OverridepublicvoidonMessage(MessageExtmessage){Stringtagmessage。getTags();byte〔〕bodymessage。getBody();log。info(handlemsgbody{},newString(body));switch(tag){caseUserCreatedEvent:UserEvents。UserCreatedEventcreatedEventJSON。parseObject(body,UserEvents。UserCreatedEvent。class);handle(createdEvent);return;caseUserEnableEvent:UserEvents。UserEnableEventenableEventJSON。parseObject(body,UserEvents。UserEnableEvent。class);handle(enableEvent);return;caseUserDisableEvent:UserEvents。UserDisableEventdisableEventJSON。parseObject(body,UserEvents。UserDisableEvent。class);handle(disableEvent);return;caseUserDeletedEvent:UserEvents。UserDeletedEventdeletedEventJSON。parseObject(body,UserEvents。UserDeletedEvent。class);handle(deletedEvent);return;}}}
  该方法有几个问题:tag维护成本较高,RocketMQMessageListener设置selectorExpression为,将拉取全部数据,增加通讯成本;如果使用tag1tag2方式,每次调整都需要对代码和配置进行更新,特别容易遗漏;充斥大量模板代码,比如case分支,反序列化,调用业务方法等;API具有侵入性,开发是需要关心RocketMQAPI,存在一定学习成本;1。2。目标
  提供一种面向业务场景的,灵活进行业务扩展的模式,具有以下特征:Tag和代码保持一致,不需要多处配置,新增逻辑自动完成Tag注册;消除模板方法,类中只保留核心业务方法,框架完成方法分发、消息反序列化等操作;代码零侵入,仅使用注解,无需了解RocketMQAPI;2。快速入门
  框架依赖rocketmqspringbootstarter完成消息发送和回收。2。1。环境准备2。1。1。增加依赖
  首先,增加rocketmq相关依赖。dependencygroupIdorg。apache。rocketmqgroupIdrocketmqspringbootstarterartifactIdversion2。2。1versiondependency
  然后,增加legostarter。dependencygroupIdcom。geekhalo。legogroupIdlegostarterartifactIdversion0。1。13tagbaseddispatchermessageconsumerSNAPSHOTversiondependency2。1。2。增加配置
  在application。yml文件中增加rocketmq配置。rocketmq:nameserver:http:127。0。0。1:9876producer:group:rocketdemo2。2。定义消费者
  定义消费者,只需:在Bean上增加TagBasedDispatcherMessageConsumer注解,并指定topic和consumer在Bean的方法上添加HandleTag注解,并指定监听的tag
  示例如下:TagBasedDispatcherMessageConsumer(topicconsumertesttopic,consumerusermessageconsumer)publicclassUserMessageConsumer{privatefinalMapLong,ListUserEvents。UserEventeventsMaps。newHashMap();publicvoidclean(){this。events。clear();;}publicListUserEvents。UserEventgetUserEvents(LonguserId){returnthis。events。get(userId);}HandleTag(UserCreatedEvent)publicvoidhandle(UserEvents。UserCreatedEventuserCreatedEvent){ListUserEvents。UserEventuserEventsthis。events。computeIfAbsent(userCreatedEvent。getUserId(),userIdnewArrayList());userEvents。add(userCreatedEvent);}HandleTag(UserEnableEvent)publicvoidhandle(UserEvents。UserEnableEventuserEnableEvent){ListUserEvents。UserEventuserEventsthis。events。computeIfAbsent(userEnableEvent。getUserId(),userIdnewArrayList());userEvents。add(userEnableEvent);}HandleTag(UserDisableEvent)publicvoidhandle(UserEvents。UserDisableEventuserDisableEvent){ListUserEvents。UserEventuserEventsthis。events。computeIfAbsent(userDisableEvent。getUserId(),userIdnewArrayList());userEvents。add(userDisableEvent);}HandleTag(UserDeletedEvent)publicvoidhandle(UserEvents。UserDeletedEventuserDeletedEvent){ListUserEvents。UserEventuserEventsthis。events。computeIfAbsent(userDeletedEvent。getUserId(),userIdnewArrayList());userEvents。add(userDeletedEvent);}}2。3。测试
  编写测试用例如下:SpringBootTest(classesDemoApplication。class)Slf4jclassUserMessageConsumerTest{AutowiredprivateUserMessageConsumeruserMessageConsumer;AutowiredprivateRocketMQTemplaterocketMQTemplate;privateListLonguserIds;BeforeEachvoidsetUp()throwsInterruptedException{this。userMessageConsumer。clean();this。userIdsnewArrayList();for(inti0;i100;i){userIds。add(10000Li);}this。userIds。forEach(userIdsendMessage(userId));TimeUnit。SECONDS。sleep(3);}privatevoidsendMessage(LonguserId){Stringtopicconsumertesttopic;{StringtagUserCreatedEvent;UserEvents。UserCreatedEventuserCreatedEventnewUserEvents。UserCreatedEvent();userCreatedEvent。setUserId(userId);userCreatedEvent。setUserName(NameuserId);sendOrderlyMessage(topic,tag,userCreatedEvent);}{StringtagUserEnableEvent;UserEvents。UserEnableEventuserEnableEventnewUserEvents。UserEnableEvent();userEnableEvent。setUserId(userId);userEnableEvent。setUserName(NameuserId);sendOrderlyMessage(topic,tag,userEnableEvent);}{StringtagUserDisableEvent;UserEvents。UserDisableEventuserDisableEventnewUserEvents。UserDisableEvent();userDisableEvent。setUserId(userId);userDisableEvent。setUserName(NameuserId);sendOrderlyMessage(topic,tag,userDisableEvent);}{StringtagUserDeletedEvent;UserEvents。UserDeletedEventuserDeletedEventnewUserEvents。UserDeletedEvent();userDeletedEvent。setUserId(userId);userDeletedEvent。setUserName(NameuserId);sendOrderlyMessage(topic,tag,userDeletedEvent);}}privatevoidsendOrderlyMessage(Stringtopic,Stringtag,UserEvents。UserEventevent){StringshardingKeyString。valueOf(event。getUserId());StringjsonJSON。toJSONString(event);MessageStringmsgMessageBuilder。withPayload(json)。build();StringdestinationcreateDestination(topic,tag);SendResultsendResultthis。rocketMQTemplate。syncSendOrderly(destination,msg,shardingKey,2000);log。info(Sendresultis{}formsg,sendResult,msg);}protectedStringcreateDestination(Stringtopic,Stringtag){if(org。apache。commons。lang3。StringUtils。isNotEmpty(tag)){returntopic:tag;}else{returntopic;}}AfterEachvoidtearDown(){}TestvoidgetUserEvents(){this。userIds。forEach(userId{ListUserEvents。UserEventuserEventsthis。userMessageConsumer。getUserEvents(userId);Assertions。assertEquals(4,userEvents。size());Assertions。assertTrue(userEvents。get(0)instanceofUserEvents。UserCreatedEvent);Assertions。assertTrue(userEvents。get(1)instanceofUserEvents。UserEnableEvent);Assertions。assertTrue(userEvents。get(2)instanceofUserEvents。UserDisableEvent);Assertions。assertTrue(userEvents。get(3)instanceofUserEvents。UserDeletedEvent);});}}
  启动时,可以看到如下日志:TagBasedDispatcherConsumerContainer:successtosubscribehttp:127。0。0。1:9876,topicconsumertesttopic,tagUserCreatedEventUserEnableEventUserDeletedEventUserDisableEvent,groupusermessageconsumer
  从日志上可以看出,框架以组groupusermessageconsumer创建Consumer,并订阅consumertesttopic的UserCreatedEventUserEnableEventUserDeletedEventUserDisableEvent等Tag,初始化流程符合预期。
  测试逻辑比较简单,逻辑如下:创建100个用户每个用户创建并依次发布领域事件,UserCreatedEvent、UserEnableEvent、UserDisableEvent、UserDeletedEvent消费发送完成后,停顿3秒依次检测每个用户收到的消息,并对顺序进行检测
  观察日志,可以看到发送和消费日志交替出现:UserMessageConsumerTest:SendresultisSendResult〔sendStatusSENDOK,msgId2408820718EADE005827F0B9E9D4D6D9B98158644D467D38DE4900FD,offsetMsgIdC0A8010A00002A9F00000000056077FB,messageQueueMessageQueue〔topicconsumertesttopic,brokerNamebogon,queueId2〕,queueOffset1121〕formsgTagBasedDispatcherConsumerContainer:consume2408820718EADE005827F0B9E9D4D6D9B98158644D467D38DE4700FCcost:0ms
  用例通过,运行结果符合预期。3。设计扩展3。1。初始化流程
  image
  框架初始化流程如下:TagBasedDispatcherConsumerContainerRegistry实现Spring的BeanPostProcessor接口,依次对托管bean进行处理;如果Bean上存在TagBasedDispatcherMessageConsumer注解,便会提取配置信息,构建TagBasedDispatcherConsumerContainer实例TagBasedDispatcherConsumerContainer收集方法上的HandleTag注解,结合TagBasedDispatcherMessageConsumer上的topic、consumer等信息构建DefaultMQPushConsumer并完成topic和tag的订阅TagBasedDispatcherConsumerContainer内部会构建tag与method的映射关系,以对指定tag进行处理;3。2。运行流程
  image
  运行流程如下:
  消息发送者将消息发送至MQ;MQ将消息发送至Consumer;Consumer收到消息后,根据tag对消息进行分发;处理器对消息进行反序列化,获取调用参数,然后调用方法执行业务逻辑;4。项目信息
  项目仓库地址:https:gitee。comlitao851025lego
  项目文档地址:https:gitee。comlitao851025legowikissupportTagBasedDispatcherMessageConsumer

足球报河南队考虑一线队全方位降薪黄紫昌去留未定直播吧3月2日讯据《足球报》报道,河南队考虑一线队全方位降薪,上赛季表现出色的黄紫昌去留未定。谈及河南队主帅哈维尔离队的情况,《足球报》表示,基于俱乐部股权可能发生变化,……爱在空气中当南太平洋上空形成漩涡状的云层时,我们不禁注意到它们心形的形状。但是流体力学对天空中的情人节有一个可靠的解释。这种模式被称为冯卡兰姆涡旋,这种现象在地球上的许多地方……大爆冷!埃格努28分孤立无援,美国主攻0分,13惨遭逆转在20212022赛季意大利女排甲级联赛第18轮的强强对话中,领头羊科内利亚诺俱乐部意外先赢后输,最终以13(2523202525271325)输给了蒙扎俱乐部,本赛季继03输……64!斯诺克诞生2席8强,2个威尔逊全败,中国双星登场今晨,2022年斯诺克球员锦标赛率先结束2场比赛,2席8强诞生。马克威廉姆斯、罗伯逊2大世界冠军拒绝被爆冷,分别以63和64击败加里威尔逊和凯伦威尔逊豪取开门红。中国双星赵心童……超旗舰之全新演绎64AudioFourtBlanc外网评测在最新的名为Blanc的版本中,来自64Audio的这款IEM全球限量500台,这家来自华盛顿州温哥华的知名制造商将他们的顶级产品进行了新的诠释,如果你的预算不是问题,那么64……11月1日要闻回顾商务部部署今冬明春蔬菜等生活必需品市场保供【中方正式提出申请加入《数字经济伙伴关系协定》(DEPA)】11月1日,中国商务部部长王文涛致信新西兰贸易与出口增长部长奥康纳,代表中方向《数字经济伙伴关系协定》(DEPA)保……娱乐圈8位换脸大佬,有人成为整容模板,有人整出了后遗症在这个看脸的时代,娱乐圈因样貌出众的人,不占少数。无论男女,对完美样貌的追求,从未停止。许多明星为了美,在脸上动过不少心思,特别是在女明星中,她们的容貌竞争尤为激烈。……上赛季饱受质疑,本赛季重新证明自己的5大球星希罗表现亮眼20212022赛季的NBA常规赛,已经开打半个月了。本赛季的各支球队、各位球员们的表现,非常具有戏剧性。热火队、公牛队、猛龙队等等球队都打出了比上赛季要出色的表现,而被寄予厚……华为市场管理流程(MM)详解根据《华为IPD管理体系指南》和《华为市场管理流程指南》中的定义,市场管理流程有个六步骤,第一步骤是理解市场,其中理解市场有两个最重要的输出,一个是关于公司在本市场的宏观目标(……问题探讨不少地方上没钱,同国企大量ampampquot私有化近期,据相关媒体数据显示,各个地方城投公司,累积欠款近60余万亿。不少地方上没有充足资金发展,或者没钱解决民生问题等等,都是现实存在。一句话,很多地方没钱,同二十多年前,大量国……谁在享受新年的美好,谁横下心准备孤独终老头条创作挑战赛谁在享受新年美好(原创:诗一首)谁在享受新年的美好,谁横下心准备孤独终老,活着是为了自己的心情,有缘的人走一起就是依靠。都不为……2017年夏秋西部自驾游36(下)巴音布鲁克4次参观汇总D52自特克斯县往巴音布鲁克,目的是要走独库公路,参观巴音布鲁克。但因天气原因,独库公路已经封闭,只得绕道巩乃斯,行程423km。因内容多,分上下两集,上集主要介绍从特克……
好消息,二老板马云现身西班牙,广州队或迎来翻身良机自从广州恒大淘宝更名为广州队之后,我们一直看到的是最大股东恒大在忙前忙后,包括这次的经济危机事件。但是我们一定不要忘记了,广州队还有一个第二大股东,那就是阿里巴巴。如果说广州队……杜锋喜从天降!赵睿离队后首度现身,回归时间确定,球迷大赞大家好!欢迎来到大胡子体育吧!本期节目将和球迷朋友一起聊一聊广东宏远,聊一聊赵睿。不可否认,广东宏远上赛季能够成功卫冕,运气无疑占据了很大一部分。失去了易建联和马尚布鲁克斯这两……吕布双喜临门,高渐离免费史诗官宣,限定返场4选1,上官婉儿笑大家好,我是王者小皮丘,每天为您分享有趣的王者荣耀新皮肤,新版本消息,以及最新的上分攻略等内容,感谢您的支持。最近,王者荣耀体验服又进行了调整,有不少英雄进行了调整,其中……五张图带你了解胃炎发展过程第一阶段:慢性浅表性胃炎慢性萎缩性胃炎第二阶段:慢性萎缩性胃炎肠。上皮化生、异型增生第三阶段:肠上皮化生、异型增生胃癌〔预防胃癌3步骤〕1。治疗Hp感染……蔡崇信财富缩水!退出美国富豪前100,篮网太烧钱,家底不足百日前,来自美国权威商业杂志《福布斯》的最新榜单,2021年美国富豪TOP400,快船富豪老板鲍尔默,目前以965亿美金(超过6200亿元人民币)的财富值,高居总榜的第9位,也是……基金我们操作的依据是什么满仓者,如果继续涨,就继续持有,因为还有可能创新高,就继续持有,等调整。如果现在连续下跌,就是已经开始调整了,就出一部分,再跌到一定位置,再进。这是一位铁粉朋友的留言,谢谢这位……壹号本OneXPlayer2体验不仅仅是游戏掌机还是个便携本时间放到几年前,提到游戏掌机,你可能只会想到switch。而这两年,得益于硬件更好的整合性,基于X86架构的游戏掌机也不断涌现出来,为移动游戏提供了全新的体验。近期我们也体验到……心路历程一头条创作挑战赛随着年龄的增长,自己心里的想法和对事物的看法都有所不同,每过一个时期回首过去的自己,都会觉得自己傻而天真,不知道小伙伴的你们也是不是和我一样呢?不知从何时起……又被逆转,第3次了!科尔新阵缺点明显,勇士引援迫在眉睫?最多领先14分,最终鏖战到加时赛,以114119不敌对手,勇士3连胜戛然而止,而勇士队距离现在最近的三场失利,全都是被逆转。并且最近这几场都是以科尔的新五小阵容为主。1月……风雨过后,落叶离歌一场持续多日的风雨过后,忽然有了深秋的感觉。萧瑟的风划过行道树的梢头,枝上的黄叶飒飒,唱响又一季岁月的离歌。片片叶子由最初的新绿到满树葱茏,直至现在的泛黄飘落,难忘的时光……科学家警告外星生物会搭乘人类飞船返回地球造成入侵破坏千百年来,人类一直在寻找外星生命甚至外星文明的踪迹,但你有没有想过,也许外星人也对我们好奇,甚至悄然来到地球?在11月17日《生物科学》发表的最新论文中,加拿大麦吉尔大学教授A……秘鲁最年轻妈妈5岁怀孕6岁生子,儿子到死都不知道生父是谁文贝贝豆育儿课堂(原创文章,欢迎转载分享)晚婚晚育是我国现行计划生育政策中重要的一项,晚育不仅有利于女性发展自己的事业,做时代的半边天,更有利于优生优育,延长人口替代的周……
友情链接:快好找快生活快百科快传网中准网文好找聚热点快软网