应用办公生活信息教育商业
投稿投诉
商业财经
汽车智能
教育国际
房产环球
信息数码
热点科技
生活手机
晨报新闻
办公软件
科学动态
应用生物
体育时事

基于RocketMQ的分布式事务解决方案

  前言什么分布式事务?
  随着互联网的快速发展,软件系统由原来的单体应用转变为分布式应用,分布式系统会把一个应用系统拆分为可独立部署的多个服务,因此需要服务与服务之间远程协作才能完成事务操作,这种分布式系统环境下由不同的服务之间通过网络远程协作完成事务称之为分布式事务,例如用户注册送积分事务、创建订单减库存事务,银行转账事务等都是分布式事务。分布式事务产生的场景
  例1:典型的场景就是微服务架构微服务之间通过远程调用完成事务操作。
  比如:订单微服务和库存微服务,下单的同时订单微服务请求库存微服务减库存。简言之:跨JVM进程产生分布式事务。
  例2:单体系统访问多个数据库实例当单体系统需要访问多个数据库(实例)时就会产生分布式事务。
  比如:用户信息和订单信息分别在两个MySQL实例存储,用户管理系统删除用户信息,需要分别删除用户信息及用户的订单信息,由于数据分布在不同的数据实例,需要通过不同的数据库链接去操作数据,此时产生分布式事务。简言之:跨数据库实例产生分布式事务。如何解决分布式事务
  根据CAP和BASE理论,分布式事务解决的核心思想主要是:无法做到强一致性,但每个应用都可以根据自身的业务特点,采用适当的方式来使系统达到最终一致性,而最终一致性涉及到方式主要是二阶段提交和三阶段提交。
  目前主要解决分布式事务的方式是通过RocketMQ或者阿里推出的SEATA框架解决,本文主要是通过RocketMQ实操来处理分布式事务的场景。RocketMQ基本使用
  关于RocketMQ的基本的消息发送方式和消息类型,大家可以到官网自行学习:RocketMQ官网文档,MQ解决分布式事务主要是通过事务消息的方式来解决。实操
  因为这篇文章主要是针对分布式事务,所以建表和业务逻辑不是十分严谨,不过作者会尽可能的保证分布式事务、MQ使用的严谨性。场景
  场景比较简单,主要是一个跨行转账的操作,例:手机号为XXX的用户将钱从农行转到华夏银行。建表
  数据库test:存在表abcperson、transferdetail
  建表sql:CREATETABLEabcperson(useridint(11)NOTNULLCOMMENT用户编号,namevarchar(20)DEFAULTCOMMENT用户名称,idcardvarchar(20)DEFAULTNULLCOMMENT身份证号,banlancedecimal(10,2)DEFAULTNULLCOMMENT余额,mobilevarchar(12)DEFAULTCOMMENT手机号,createtimedatetimeDEFAULTNULLCOMMENT创建时间,updatetimedatetimeDEFAULTNULLCOMMENT更新时间,deleteflgchar(1)DEFAULT0COMMENT删除状态,PRIMARYKEY(userid))ENGINEInnoDBDEFAULTCHARSETutf8COMMENT用户表;CREATETABLEtransferdetail(idint(11)NOTNULLAUTOINCREMENTCOMMENT明细ID,useridint(11)NOTNULLDEFAULT0COMMENT用户ID,moneydecimal(10,2)DEFAULT0。00COMMENT转账金额,msgidvarchar(50)DEFAULTCOMMENT消息ID,deleteflgchar(1)DEFAULT0COMMENT是否删除状态,createtimetimestampNULLDEFAULTCURRENTTIMESTAMP,PRIMARYKEY(id))ENGINEInnoDBAUTOINCREMENT7DEFAULTCHARSETutf8COMMENT消息发送表;
  数据库test1:存在表hxperson
  建表sql:CREATETABLEhxperson(useridint(11)NOTNULLCOMMENT用户编号,namevarchar(20)DEFAULTCOMMENT用户名称,idcardvarchar(20)DEFAULTNULLCOMMENT身份证号,banlancedecimal(10,0)DEFAULTNULLCOMMENT余额,mobilevarchar(12)DEFAULTCOMMENT手机号,createtimedatetimeDEFAULTNULLCOMMENT创建时间,updatetimedatetimeDEFAULTNULLCOMMENT更新时间,deleteflgchar(1)DEFAULTNULLCOMMENT0,PRIMARYKEY(userid))ENGINEInnoDBDEFAULTCHARSETutf8;生产者PostMapping(abcToHx)publicStringabcToHx(Stringmobile,BigDecimaltransferMoney){AbcPersonabcPersonuserService。getByMobile(mobile);if(ObjectUtil。isNotEmpty(abcPerson)ObjectUtil。isNotEmpty(transferMoney)abcPerson。getBanlance()。doubleValue()transferMoney。doubleValue()){TransferDtotransferDtonewTransferDto();transferDto。setMobile(mobile);transferDto。setMoney(transferMoney);transferDto。setUserId(abcPerson。getUserId());transferDto。setDistributedId(snowFlakeUtil。snowflakeId());1发送半消息Stringdestinationtransfertopic:toHx;MessagemessageMessageBuilder。withPayload(JSON。toJSONString(transferDto))。build();TransactionSendResultresultrocketMQTemplate。sendMessageInTransaction(destination,message,null);log。warn(发送半消息:message,响应内容:result);returnSUCCESS;}returnFAIL;}解释:
  标注1:之所以使用雪花算法生成唯一ID,是为了消费者消费时,确保消息不会重复消费,所以通过唯一ID确定(虽然消息ID通常是唯一的,不过在特定情况下可能会出现消息ID不同,但实际消息内容一样的情况(消费者主动重发、因客户端重投机制导致的重复等),这样会出现重复消费的问题,所以需要其他唯一标识保证消息消费的幂等性问题。)详情可看RocketMQ官网的最佳实践的解释,解释如下:牛逼啊!接私活必备的N个开源项目!赶快收藏2。1消费过程幂等
  RocketMQ无法避免消息重复(ExactlyOnce),所以如果业务对消费重复非常敏感,务必要在业务层面进行去重处理。可以借助关系数据库进行去重。
  首先需要确定消息的唯一键,可以是msgId,也可以是消息内容中的唯一标识字段,例如订单Id等。在消费之前判断唯一键是否在关系数据库中存在。如果不存在则插入,并消费,否则跳过。
  实际过程要考虑原子性问题,判断是否存在可以尝试插入,如果报主键冲突,则插入失败,直接跳过
  msgId一定是全局唯一标识符,但是实际使用中,可能会存在相同的消息有两个不同msgId的情况(消费者主动重发、因客户端重投机制导致的重复等),这种情况就需要使业务字段进行重复消费。
  RocektMQ在发送半消息时,会调用我们重写的监听器的executeLocalTransaction(Messagemsg,Objectarg)方法来执行本地事务OverridepublicRocketMQLocalTransactionStateexecuteLocalTransaction(Messagemsg,Objectarg){StringjsonStrnewString((byte〔〕)msg。getPayload());log。error(arg:arg执行本地事务:JSON。toJSONString(msg),传输字段:jsonStr);TransferDtotransferDtoJSON。parseObject(jsonStr,TransferDto。class);booleanflaguserService。transferMoney(transferDto。getUserId(),transferDto。getMoney(),transferDto。getDistributedId()。toString());在提交本地事务到return期间,可能因为生产者异常或网络等问题,导致MQ未接收到半消息的状态,RocketMQ机制是:后续会调用checkLocalTransaction检查本地事务的执行情况if(flag){log。warn(executeLocalTransaction本地事务执行完成,提交:JSON。toJSONString(msg));说明本地事务执行成功,事务消息提交returnRocketMQLocalTransactionState。COMMIT;}else{log。warn(executeLocalTransaction本地事务执行失败,ROLLBACK);本地事务执行失败,事务消息回滚returnRocketMQLocalTransactionState。ROLLBACK;}}OverridepublicRocketMQLocalTransactionStatecheckLocalTransaction(Messagemsg){StringjsonStrnewString((byte〔〕)msg。getPayload());TransferDtotransferDtoJSON。parseObject(jsonStr,TransferDto。class);TransferDetailtransferDetailtransferDetailService。getByMsgId(transferDto。getDistributedId()。toString());if(ObjectUtil。isNotEmpty(transferDetail)){log。warn(本地事务执行完成,提交:JSON。toJSONString(msg));说明本地事务执行成功,事务消息提交returnRocketMQLocalTransactionState。COMMIT;}else{本地事务执行失败,事务消息回滚returnRocketMQLocalTransactionState。ROLLBACK;}}
  UserService的transferMoney方法:TransactionalOverridepublicbooleantransferMoney(IntegeruserId,BigDecimalmoney,StringmsgId){booleanflagfalse;TransferDetailtransferDetailnewTransferDetail();transferDetail。setMoney(money);transferDetail。setUserId(userId);transferDetail。setMsgId(msgId);flagtransferDetailDao。insert(transferDetail)0;throwRuntimeException(flag,消息存储失败,异常回滚。。。);加悲观锁AbcPersonlockAbcPersongetByIdForUpdate(userId);if(ObjectUtil。isNotEmpty(lockAbcPerson)lockAbcPerson。getBanlance()。doubleValue()money。doubleValue()){修改用户金额BigDecimalcurrentBanlancenewBigDecimal(lockAbcPerson。getBanlance()。doubleValue()money。doubleValue());lockAbcPerson。setBanlance(currentBanlance);flagupdate(lockAbcPerson)0?true:false;throwRuntimeException(flag,修改用户金额失败,异常回滚。。。);}returnflag;}publicvoidthrowRuntimeException(booleanflag,Stringmsg){if(!flag){thrownewRuntimeException(msg);}}
  解释:在执行完transferMoney方法到returnRocketMQLocalTransactionState期间,可能因为生产者异常或网络等问题,导致MQ未接收到半消息的状态,RocketMQ机制是:后续会调用checkLocalTransaction检查本地事务的执行情况TransferDetail主要是将本地事务执行情况落磁盘,保证后续的checkLocalTransaction()可以通过回查数据,来确定消息是提交还是回滚。transferMoney方法之所以先加明细再加悲观锁,是为了降低不必要的加锁时间,提升性能。另外,搜索公众号顶级科技后台回复API接口,获取一份惊喜礼包。消费者
  主要是通过实现RocketMQListener接口,监听响应的消息,来给出响应。OverridepublicvoidonMessage(MessageExtmessage){Stringkeynull;Stringvaluenull;try{构建存储redis的key、value,目的是为了保证消息不会被重复消费StringmsgIdmessage。getMsgId();TransferDtotransferDtoJSON。parseObject(newString(message。getBody()),TransferDto。class);keymq:transferDto。getDistributedId();log。warn(获取到当前消息的msgId:msgId);valueThread。currentThread()。getId():System。currentTimeMillis();加分布式锁booleanflagredisTemplate。opsForValue()。setIfAbsent(key,value,1,TimeUnit。HOURS);if(flag){try{正常业务流程执行,完成后该消息会自动完成,期间有其他消费者执行该消息,也无法拿到锁flaguserService。transferMoney(transferDto。getMobile(),transferDto。getMoney());if(!flag){thrownewRuntimeException(没有添加金额成功,抛出异常);}log。warn(成功消费);}catch(Exceptione){执行业务出现异常,释放分布式锁,通过value验证,保证不会错误的释放锁通过watch机制保证原子性操作,若watch被打断,则说明该key已经被修改,当然也就无需当前线程释放锁redisTemplate。watch(key);redisTemplate。multi();StringlockValue(String)redisTemplate。opsForValue()。get(key);if(StrUtil。isNotBlank(lockValue)lockValuevalue){redisTemplate。delete(key);}redisTemplate。exec();thrownewRuntimeException(释放分布式锁,因消费失败,故抛出异常);}}else{未拿到锁,抛出异常,则该消息便不会被成功消费thrownewRuntimeException(未拿到锁,不进行消费);}}catch(Exceptione){log。warn(消费异常);thrownewRuntimeException(消费异常);}}
  消费者的UserService,转账操作TransactionalOverridepublicbooleantransferMoney(Stringmobile,BigDecimalmoney){booleanflagfalse;加悲观锁HxPersonhxPersonhxPersonDao。getByIdOrMobileForUpdate(null,mobile);if(ObjectUtil。isNotEmpty(hxPerson)){hxPerson。setBanlance(newBigDecimal(hxPerson。getBanlance()。doubleValue()money。doubleValue()));flaghxPersonDao。update(hxPerson)0?true:false;}returnflag;}验证
  操作前test数据库的表数据
  test1数据库的表数据
  通过调用生产者的转账接口:
  生产消费完成、再看下两个库的数据情况:
  正常的分布式事务流程就走完了,大家有什么改进或疑问的点可以提出来,一起进步,共同学习!!!
  原文链接:https:mp。weixin。qq。comsPPHbOtfFX5naG9cvxSk7Zg

汪涵代言酸菜面翻车,网友翻出早年花絮视频,吃到嘴边就吐了315晚会上,曝光了不少企业内幕,尤其是康师傅和统一酸菜牛肉面,一夜之间成为了众矢之的。虽然企业在第一时间发文道歉,并下架了所有商品,但失去的口碑已经无法挽回。当大家看到……每一个优秀孩子的背后,都有一个会陪聊的妈妈很多父母希望能和孩子成为朋友亲密无间无话不谈可有时忙了一天回到家发现自己还不如一台平板电脑更受孩子青睐如何与孩子亲近起来?我们为您推荐一位孩子妈妈……冷冻的馒头不能吃?保存超过3天会产生黄曲霉素,真相是如此吗?导语:随着人们生活水平的不断提高,各种家用电器已经成为了每家每户的必备物品,家用电器的普及给人们提供了许多的方便与快捷,能够把保质期期限比较短的食物适当的延长一些,这样也能够极……超好玩的植物大战僵尸中文版forMac(兼容m1)植物大战僵尸中文版forMac推荐给大家!此版本完美兼容macosbigsur11系统!植物大战僵尸mac版将经典策略塔防玩法发挥到了极致,无论是防御方的植物种类,还是进攻的僵……西班牙人高层请大家保持耐心,并且对转会报以期望UnavezfinalizadoelactodepresentacindeBrianOlivn(28aos),laprimeracaranuevadelRCDEspanyol2……李霄鹏,已回山东!何去何从,面临3个选择日前,根据国内媒体报道称,中国男足主教练李霄鹏近期已经回到山东,对于他的去留问题,足协至今没有给出确切的说法。李霄鹏在这种情况下,只能重返山东足坛。至于李霄鹏未来该何去何从,应……后妈林青霞陪5岁孙女打麻将,28年过去将她衣服扔掉的继女怎样林青霞是这个世界上最美、最善良、最好的后妈。林青霞继女邢嘉倩近日,林青霞分享一组照片说疫情期间不能出去旅行,但一家人在一起很幸福。林青霞画了一张与孙女打麻将的……56岁拳王泰森近况胡须花白显老态,拄拐行走,坦言离死亡不远了每一项体育运动中都有一位王者,比如足球世界里的贝利、篮球世界里的乔丹以及F1赛车里的舒马赫,而拳王泰森就是拳击这项运动中的王者。泰森的巅峰时期,可以说是打遍天下无敌手,创造了许……这几种水果生吃有健康隐患不知道大家对寄生虫的印象是怎样的?但在很多人眼里,它确实是一种可怕的生物。那为什么可怕?其实是因为寄生虫寄到体内后能很快适应人体内的环境,潜伏期长。你根本不知道自己是不是感染了……今年一季度外贸开门稳海关多举措助力企业出口提速来源:人民网原创稿人民网北京4月15日电(记者栗翘楚)随着春暖花开,气温回升,云南高原时令蔬菜开始陆续上市。近日,一批产自云南省玉溪市重24吨的鲜胡萝卜经昆明海关所属玉溪……重庆查矮小症挂什么科,需要做哪些检查?大家好,很多家长不知道孩子长得矮需要怎么看啊,因为个子矮啊,多数情况并不是疾病引起啊,主要受遗传、营养、环境、内分泌等因素的影响,所以今天就和大家聊聊个子矮需要做哪些检查。一般……24连胜,272,数据昭示湖人要黑七!此前仅5次黑七奇迹北京时间4月17日,在今天凌晨进行的一场NBA季后赛中,湖人可以说有些爆冷的以128112击败了西部第二的灰熊队,取得季后赛开门红。而两个数据显示,这场开门红之后,湖人要上演黑……
无知,是对AI恐惧的病因所在近期举行的一场艺术大赛上,一幅由AI绘制而成的画作,从一众人类艺术家的大作中脱颖而出,成功斩获一等奖。这不是人类第一次在创意层面被人工智能所折服。今年高考期间,某人工智能……大家喝过的最好喝的纯牛奶是什么?为了写好这篇回答我提早两个月开始准备,把一些空盒子收集下来做了合辑。自己都吃了一大惊鬼知道我到底买了多少不同的牛奶。。。喝了多少吨的纯牛奶。。。。。我们家一直……满30岁后,不管有钱没钱多吃10样菜,人称清道夫,营养还便宜导语:满30岁后,别忘多吃10样清道夫菜,营养还便宜,记得转告家人。满30岁后,不管有钱没钱多吃10样菜,人称清道夫,营养还便宜俗话说三十而立,30岁的年纪也可以说得上是……辽宁15天萌娃和护士耍心眼,吃药藏嘴里假装下咽,没人时机智吐近日,在辽宁长春,一名仅仅出生15天的萌娃,和护士耍心眼,把药藏在嘴里假装咽下去了,等到没人的时候再吐出来的视频在网络上快速传播,引发了众多网友的热议。事情是这样的:一名出生1……心有繁花,满目星辰前几天,在浏览朋友圈时,读到一篇文章,对于作者的观点,觉得十分认同,所以顺其意写下几笔,以记之。杨绛先生说:我们曾如此渴望命运的波澜,到最后才发现,人生最曼妙的风景,竟是……她曾和导演相恋12年,却遭张雨绮插足,如今资产40亿不想再爱在中国的娱乐圈中,谁能在国际舞台上充当评委,无疑是对身份的一种肯定,要知道当评委这一殊荣,早年只有国际范儿的巩俐才得到过,然而幸运的是,如果中国的娱乐圈中又出现了一位优秀的女明……鹈鹕大胜!莺歌305,瓦兰2615,赛后莺歌接受采访,CJ过北京时间4月25日,NBA季后赛太阳客场挑战鹈鹕,太阳今天的首发是保罗、C约翰逊、布里奇斯、克劳德、艾顿,鹈鹕的首发是麦科勒姆、琼斯、英格拉姆、海耶斯、瓦兰。太阳此前总比分21……预言贴一次说清,iPhone14系列售价到底涨没涨价?距离iPhone14系列发布,还有一个月左右的时间。关于iPhone14系列的相关消息,这几个月已经被炒得沸沸扬扬。比如最近有消息显示,Apple紧急叫停了iPho……烂大街的碎花裙过时了,今年流行这3种裙子,时髦又有气质自从,见到下面这些博主的碎花裙造型后,喜欢穿碎花裙的人越来越多。现如今,碎花裙已经到了人手一件的程度。而当人人都穿上碎花裙后,碎花裙的时尚感会被减退,变得越来越烂大街。所……大反转!广东3位冠军功臣离队出变数,杜锋还留下抗击辽宁的主角北京时间8月9日,广东男篮传来重磅喜讯,现在看起来周鹏和赵睿的离队,再次出现了一些插曲和大反转,这两大王朝功臣是否要离开球队,可能还是个未知数。广东队的跟队记者晒出了一张……正式离队!CBA神射手或投奔马布里,加盟北控让人期待根据青岛媒体透露,青岛队在4月22日已经开始了集训,而两位老国手赵大鹏和赵泰隆的合同都到期了,没有参加青岛队的集训,确定离队。青岛队这个赛季球员流失太严重了,刘传兴、林韦翰、赵……为什么劝5060岁再懒也别穿打底裤?看完日本主妇的穿搭,就懂今日好啊有人说:美人在骨不在皮。所谓的骨在我看来是一种风韵与气度。从她们优雅的谈吐中,我们能领略到这种美;从她们曼妙得体的肢体语言中,我们更能见识到那种深厚的修养与底蕴。……
友情链接:易事利快生活快传网聚热点七猫云快好知快百科中准网快好找文好找中准网快软网