仅此一招,再无消息乱序的烦恼
1。概览
RocketMQ早已提供了一组最佳实践,但工作在一线的伙伴却很少知道,项目中的各种随性代码经常导致消息错乱问题,严重影响业务的准确性。为了保障最佳实践的落地,降低一线伙伴的使用成本,统一MQ使用规范,需要对其进行抽象和封装1。1。背景
RocketMQ的最佳实践中推荐:一个应用尽可能用一个Topic,消息子类型用tags来标识,tags可以由应用自由设置。
在使用rocketMQTemplate发送消息时,通过设置发送方法的destination参数来设置消息的目的地,destination的格式为topicName:tagName,:前面表示topic的名称,后面表示tags名称,简单示例如下:计算destinationprotectedStringcreateDestination(Stringtopic,Stringtag){if(org。apache。commons。lang3。StringUtils。isNotEmpty(tag)){returntopic:tag;}else{returntopic;}}发送信息StringdestinationcreateDestination(topic,tag);SendResultsendResultthis。rocketMQTemplate。syncSendOrderly(destination,msg,shardingKey,2000);
tags从命名来看像是一个复数,但发送消息时,目的地只能指定一个topic下的一个tag,不能指定多个。
但,在消费消息时,就变的没那么方便了,简单示例如下:ServiceRocketMQMessageListener(topicconsumertesttopic1,consumerGroupusermessageconsumer1,selectorExpression,consumeModeConsumeMode。ORDERLY)Slf4jpublicclassRocketBasedUserMessageConsumerextendsUserMessageConsumerimplementsRocketMQListenerMessageExt{OverridepublicvoidonMessage(MessageExtmessage){Stringtagmessage。getTags();byte〔〕bodymessage。getBody();log。info(handlemsgbody{},newString(body));switch(tag){caseUserCreatedEvent:UserEvents。UserCreatedEventcreatedEventJSON。parseObject(body,UserEvents。UserCreatedEvent。class);handle(createdEvent);return;caseUserEnableEvent:UserEvents。UserEnableEventenableEventJSON。parseObject(body,UserEvents。UserEnableEvent。class);handle(enableEvent);return;caseUserDisableEvent:UserEvents。UserDisableEventdisableEventJSON。parseObject(body,UserEvents。UserDisableEvent。class);handle(disableEvent);return;caseUserDeletedEvent:UserEvents。UserDeletedEventdeletedEventJSON。parseObject(body,UserEvents。UserDeletedEvent。class);handle(deletedEvent);return;}}}
该方法有几个问题:tag维护成本较高,RocketMQMessageListener设置selectorExpression为,将拉取全部数据,增加通讯成本;如果使用tag1tag2方式,每次调整都需要对代码和配置进行更新,特别容易遗漏;充斥大量模板代码,比如case分支,反序列化,调用业务方法等;API具有侵入性,开发是需要关心RocketMQAPI,存在一定学习成本;1。2。目标
提供一种面向业务场景的,灵活进行业务扩展的模式,具有以下特征:Tag和代码保持一致,不需要多处配置,新增逻辑自动完成Tag注册;消除模板方法,类中只保留核心业务方法,框架完成方法分发、消息反序列化等操作;代码零侵入,仅使用注解,无需了解RocketMQAPI;2。快速入门
框架依赖rocketmqspringbootstarter完成消息发送和回收。2。1。环境准备2。1。1。增加依赖
首先,增加rocketmq相关依赖。dependencygroupIdorg。apache。rocketmqgroupIdrocketmqspringbootstarterartifactIdversion2。2。1versiondependency
然后,增加legostarter。dependencygroupIdcom。geekhalo。legogroupIdlegostarterartifactIdversion0。1。13tagbaseddispatchermessageconsumerSNAPSHOTversiondependency2。1。2。增加配置
在application。yml文件中增加rocketmq配置。rocketmq:nameserver:http:127。0。0。1:9876producer:group:rocketdemo2。2。定义消费者
定义消费者,只需:在Bean上增加TagBasedDispatcherMessageConsumer注解,并指定topic和consumer在Bean的方法上添加HandleTag注解,并指定监听的tag
示例如下:TagBasedDispatcherMessageConsumer(topicconsumertesttopic,consumerusermessageconsumer)publicclassUserMessageConsumer{privatefinalMapLong,ListUserEvents。UserEventeventsMaps。newHashMap();publicvoidclean(){this。events。clear();;}publicListUserEvents。UserEventgetUserEvents(LonguserId){returnthis。events。get(userId);}HandleTag(UserCreatedEvent)publicvoidhandle(UserEvents。UserCreatedEventuserCreatedEvent){ListUserEvents。UserEventuserEventsthis。events。computeIfAbsent(userCreatedEvent。getUserId(),userIdnewArrayList());userEvents。add(userCreatedEvent);}HandleTag(UserEnableEvent)publicvoidhandle(UserEvents。UserEnableEventuserEnableEvent){ListUserEvents。UserEventuserEventsthis。events。computeIfAbsent(userEnableEvent。getUserId(),userIdnewArrayList());userEvents。add(userEnableEvent);}HandleTag(UserDisableEvent)publicvoidhandle(UserEvents。UserDisableEventuserDisableEvent){ListUserEvents。UserEventuserEventsthis。events。computeIfAbsent(userDisableEvent。getUserId(),userIdnewArrayList());userEvents。add(userDisableEvent);}HandleTag(UserDeletedEvent)publicvoidhandle(UserEvents。UserDeletedEventuserDeletedEvent){ListUserEvents。UserEventuserEventsthis。events。computeIfAbsent(userDeletedEvent。getUserId(),userIdnewArrayList());userEvents。add(userDeletedEvent);}}2。3。测试
编写测试用例如下:SpringBootTest(classesDemoApplication。class)Slf4jclassUserMessageConsumerTest{AutowiredprivateUserMessageConsumeruserMessageConsumer;AutowiredprivateRocketMQTemplaterocketMQTemplate;privateListLonguserIds;BeforeEachvoidsetUp()throwsInterruptedException{this。userMessageConsumer。clean();this。userIdsnewArrayList();for(inti0;i100;i){userIds。add(10000Li);}this。userIds。forEach(userIdsendMessage(userId));TimeUnit。SECONDS。sleep(3);}privatevoidsendMessage(LonguserId){Stringtopicconsumertesttopic;{StringtagUserCreatedEvent;UserEvents。UserCreatedEventuserCreatedEventnewUserEvents。UserCreatedEvent();userCreatedEvent。setUserId(userId);userCreatedEvent。setUserName(NameuserId);sendOrderlyMessage(topic,tag,userCreatedEvent);}{StringtagUserEnableEvent;UserEvents。UserEnableEventuserEnableEventnewUserEvents。UserEnableEvent();userEnableEvent。setUserId(userId);userEnableEvent。setUserName(NameuserId);sendOrderlyMessage(topic,tag,userEnableEvent);}{StringtagUserDisableEvent;UserEvents。UserDisableEventuserDisableEventnewUserEvents。UserDisableEvent();userDisableEvent。setUserId(userId);userDisableEvent。setUserName(NameuserId);sendOrderlyMessage(topic,tag,userDisableEvent);}{StringtagUserDeletedEvent;UserEvents。UserDeletedEventuserDeletedEventnewUserEvents。UserDeletedEvent();userDeletedEvent。setUserId(userId);userDeletedEvent。setUserName(NameuserId);sendOrderlyMessage(topic,tag,userDeletedEvent);}}privatevoidsendOrderlyMessage(Stringtopic,Stringtag,UserEvents。UserEventevent){StringshardingKeyString。valueOf(event。getUserId());StringjsonJSON。toJSONString(event);MessageStringmsgMessageBuilder。withPayload(json)。build();StringdestinationcreateDestination(topic,tag);SendResultsendResultthis。rocketMQTemplate。syncSendOrderly(destination,msg,shardingKey,2000);log。info(Sendresultis{}formsg,sendResult,msg);}protectedStringcreateDestination(Stringtopic,Stringtag){if(org。apache。commons。lang3。StringUtils。isNotEmpty(tag)){returntopic:tag;}else{returntopic;}}AfterEachvoidtearDown(){}TestvoidgetUserEvents(){this。userIds。forEach(userId{ListUserEvents。UserEventuserEventsthis。userMessageConsumer。getUserEvents(userId);Assertions。assertEquals(4,userEvents。size());Assertions。assertTrue(userEvents。get(0)instanceofUserEvents。UserCreatedEvent);Assertions。assertTrue(userEvents。get(1)instanceofUserEvents。UserEnableEvent);Assertions。assertTrue(userEvents。get(2)instanceofUserEvents。UserDisableEvent);Assertions。assertTrue(userEvents。get(3)instanceofUserEvents。UserDeletedEvent);});}}
启动时,可以看到如下日志:TagBasedDispatcherConsumerContainer:successtosubscribehttp:127。0。0。1:9876,topicconsumertesttopic,tagUserCreatedEventUserEnableEventUserDeletedEventUserDisableEvent,groupusermessageconsumer
从日志上可以看出,框架以组groupusermessageconsumer创建Consumer,并订阅consumertesttopic的UserCreatedEventUserEnableEventUserDeletedEventUserDisableEvent等Tag,初始化流程符合预期。
测试逻辑比较简单,逻辑如下:创建100个用户每个用户创建并依次发布领域事件,UserCreatedEvent、UserEnableEvent、UserDisableEvent、UserDeletedEvent消费发送完成后,停顿3秒依次检测每个用户收到的消息,并对顺序进行检测
观察日志,可以看到发送和消费日志交替出现:UserMessageConsumerTest:SendresultisSendResult〔sendStatusSENDOK,msgId2408820718EADE005827F0B9E9D4D6D9B98158644D467D38DE4900FD,offsetMsgIdC0A8010A00002A9F00000000056077FB,messageQueueMessageQueue〔topicconsumertesttopic,brokerNamebogon,queueId2〕,queueOffset1121〕formsgTagBasedDispatcherConsumerContainer:consume2408820718EADE005827F0B9E9D4D6D9B98158644D467D38DE4700FCcost:0ms
用例通过,运行结果符合预期。3。设计扩展3。1。初始化流程
image
框架初始化流程如下:TagBasedDispatcherConsumerContainerRegistry实现Spring的BeanPostProcessor接口,依次对托管bean进行处理;如果Bean上存在TagBasedDispatcherMessageConsumer注解,便会提取配置信息,构建TagBasedDispatcherConsumerContainer实例TagBasedDispatcherConsumerContainer收集方法上的HandleTag注解,结合TagBasedDispatcherMessageConsumer上的topic、consumer等信息构建DefaultMQPushConsumer并完成topic和tag的订阅TagBasedDispatcherConsumerContainer内部会构建tag与method的映射关系,以对指定tag进行处理;3。2。运行流程
image
运行流程如下:
消息发送者将消息发送至MQ;MQ将消息发送至Consumer;Consumer收到消息后,根据tag对消息进行分发;处理器对消息进行反序列化,获取调用参数,然后调用方法执行业务逻辑;4。项目信息
项目仓库地址:https:gitee。comlitao851025lego
项目文档地址:https:gitee。comlitao851025legowikissupportTagBasedDispatcherMessageConsumer