前言什么分布式事务? 随着互联网的快速发展,软件系统由原来的单体应用转变为分布式应用,分布式系统会把一个应用系统拆分为可独立部署的多个服务,因此需要服务与服务之间远程协作才能完成事务操作,这种分布式系统环境下由不同的服务之间通过网络远程协作完成事务称之为分布式事务,例如用户注册送积分事务、创建订单减库存事务,银行转账事务等都是分布式事务。分布式事务产生的场景 例1:典型的场景就是微服务架构微服务之间通过远程调用完成事务操作。 比如:订单微服务和库存微服务,下单的同时订单微服务请求库存微服务减库存。简言之:跨JVM进程产生分布式事务。 例2:单体系统访问多个数据库实例当单体系统需要访问多个数据库(实例)时就会产生分布式事务。 比如:用户信息和订单信息分别在两个MySQL实例存储,用户管理系统删除用户信息,需要分别删除用户信息及用户的订单信息,由于数据分布在不同的数据实例,需要通过不同的数据库链接去操作数据,此时产生分布式事务。简言之:跨数据库实例产生分布式事务。如何解决分布式事务 根据CAP和BASE理论,分布式事务解决的核心思想主要是:无法做到强一致性,但每个应用都可以根据自身的业务特点,采用适当的方式来使系统达到最终一致性,而最终一致性涉及到方式主要是二阶段提交和三阶段提交。 目前主要解决分布式事务的方式是通过RocketMQ或者阿里推出的SEATA框架解决,本文主要是通过RocketMQ实操来处理分布式事务的场景。RocketMQ基本使用 关于RocketMQ的基本的消息发送方式和消息类型,大家可以到官网自行学习:RocketMQ官网文档,MQ解决分布式事务主要是通过事务消息的方式来解决。实操 因为这篇文章主要是针对分布式事务,所以建表和业务逻辑不是十分严谨,不过作者会尽可能的保证分布式事务、MQ使用的严谨性。场景 场景比较简单,主要是一个跨行转账的操作,例:手机号为XXX的用户将钱从农行转到华夏银行。建表 数据库test:存在表abcperson、transferdetail 建表sql:CREATETABLEabcperson(useridint(11)NOTNULLCOMMENT用户编号,namevarchar(20)DEFAULTCOMMENT用户名称,idcardvarchar(20)DEFAULTNULLCOMMENT身份证号,banlancedecimal(10,2)DEFAULTNULLCOMMENT余额,mobilevarchar(12)DEFAULTCOMMENT手机号,createtimedatetimeDEFAULTNULLCOMMENT创建时间,updatetimedatetimeDEFAULTNULLCOMMENT更新时间,deleteflgchar(1)DEFAULT0COMMENT删除状态,PRIMARYKEY(userid))ENGINEInnoDBDEFAULTCHARSETutf8COMMENT用户表;CREATETABLEtransferdetail(idint(11)NOTNULLAUTOINCREMENTCOMMENT明细ID,useridint(11)NOTNULLDEFAULT0COMMENT用户ID,moneydecimal(10,2)DEFAULT0。00COMMENT转账金额,msgidvarchar(50)DEFAULTCOMMENT消息ID,deleteflgchar(1)DEFAULT0COMMENT是否删除状态,createtimetimestampNULLDEFAULTCURRENTTIMESTAMP,PRIMARYKEY(id))ENGINEInnoDBAUTOINCREMENT7DEFAULTCHARSETutf8COMMENT消息发送表; 数据库test1:存在表hxperson 建表sql:CREATETABLEhxperson(useridint(11)NOTNULLCOMMENT用户编号,namevarchar(20)DEFAULTCOMMENT用户名称,idcardvarchar(20)DEFAULTNULLCOMMENT身份证号,banlancedecimal(10,0)DEFAULTNULLCOMMENT余额,mobilevarchar(12)DEFAULTCOMMENT手机号,createtimedatetimeDEFAULTNULLCOMMENT创建时间,updatetimedatetimeDEFAULTNULLCOMMENT更新时间,deleteflgchar(1)DEFAULTNULLCOMMENT0,PRIMARYKEY(userid))ENGINEInnoDBDEFAULTCHARSETutf8;生产者PostMapping(abcToHx)publicStringabcToHx(Stringmobile,BigDecimaltransferMoney){AbcPersonabcPersonuserService。getByMobile(mobile);if(ObjectUtil。isNotEmpty(abcPerson)ObjectUtil。isNotEmpty(transferMoney)abcPerson。getBanlance()。doubleValue()transferMoney。doubleValue()){TransferDtotransferDtonewTransferDto();transferDto。setMobile(mobile);transferDto。setMoney(transferMoney);transferDto。setUserId(abcPerson。getUserId());transferDto。setDistributedId(snowFlakeUtil。snowflakeId());1发送半消息Stringdestinationtransfertopic:toHx;MessagemessageMessageBuilder。withPayload(JSON。toJSONString(transferDto))。build();TransactionSendResultresultrocketMQTemplate。sendMessageInTransaction(destination,message,null);log。warn(发送半消息:message,响应内容:result);returnSUCCESS;}returnFAIL;}解释: 标注1:之所以使用雪花算法生成唯一ID,是为了消费者消费时,确保消息不会重复消费,所以通过唯一ID确定(虽然消息ID通常是唯一的,不过在特定情况下可能会出现消息ID不同,但实际消息内容一样的情况(消费者主动重发、因客户端重投机制导致的重复等),这样会出现重复消费的问题,所以需要其他唯一标识保证消息消费的幂等性问题。)详情可看RocketMQ官网的最佳实践的解释,解释如下:牛逼啊!接私活必备的N个开源项目!赶快收藏2。1消费过程幂等 RocketMQ无法避免消息重复(ExactlyOnce),所以如果业务对消费重复非常敏感,务必要在业务层面进行去重处理。可以借助关系数据库进行去重。 首先需要确定消息的唯一键,可以是msgId,也可以是消息内容中的唯一标识字段,例如订单Id等。在消费之前判断唯一键是否在关系数据库中存在。如果不存在则插入,并消费,否则跳过。 实际过程要考虑原子性问题,判断是否存在可以尝试插入,如果报主键冲突,则插入失败,直接跳过 msgId一定是全局唯一标识符,但是实际使用中,可能会存在相同的消息有两个不同msgId的情况(消费者主动重发、因客户端重投机制导致的重复等),这种情况就需要使业务字段进行重复消费。 RocektMQ在发送半消息时,会调用我们重写的监听器的executeLocalTransaction(Messagemsg,Objectarg)方法来执行本地事务OverridepublicRocketMQLocalTransactionStateexecuteLocalTransaction(Messagemsg,Objectarg){StringjsonStrnewString((byte〔〕)msg。getPayload());log。error(arg:arg执行本地事务:JSON。toJSONString(msg),传输字段:jsonStr);TransferDtotransferDtoJSON。parseObject(jsonStr,TransferDto。class);booleanflaguserService。transferMoney(transferDto。getUserId(),transferDto。getMoney(),transferDto。getDistributedId()。toString());在提交本地事务到return期间,可能因为生产者异常或网络等问题,导致MQ未接收到半消息的状态,RocketMQ机制是:后续会调用checkLocalTransaction检查本地事务的执行情况if(flag){log。warn(executeLocalTransaction本地事务执行完成,提交:JSON。toJSONString(msg));说明本地事务执行成功,事务消息提交returnRocketMQLocalTransactionState。COMMIT;}else{log。warn(executeLocalTransaction本地事务执行失败,ROLLBACK);本地事务执行失败,事务消息回滚returnRocketMQLocalTransactionState。ROLLBACK;}}OverridepublicRocketMQLocalTransactionStatecheckLocalTransaction(Messagemsg){StringjsonStrnewString((byte〔〕)msg。getPayload());TransferDtotransferDtoJSON。parseObject(jsonStr,TransferDto。class);TransferDetailtransferDetailtransferDetailService。getByMsgId(transferDto。getDistributedId()。toString());if(ObjectUtil。isNotEmpty(transferDetail)){log。warn(本地事务执行完成,提交:JSON。toJSONString(msg));说明本地事务执行成功,事务消息提交returnRocketMQLocalTransactionState。COMMIT;}else{本地事务执行失败,事务消息回滚returnRocketMQLocalTransactionState。ROLLBACK;}} UserService的transferMoney方法:TransactionalOverridepublicbooleantransferMoney(IntegeruserId,BigDecimalmoney,StringmsgId){booleanflagfalse;TransferDetailtransferDetailnewTransferDetail();transferDetail。setMoney(money);transferDetail。setUserId(userId);transferDetail。setMsgId(msgId);flagtransferDetailDao。insert(transferDetail)0;throwRuntimeException(flag,消息存储失败,异常回滚。。。);加悲观锁AbcPersonlockAbcPersongetByIdForUpdate(userId);if(ObjectUtil。isNotEmpty(lockAbcPerson)lockAbcPerson。getBanlance()。doubleValue()money。doubleValue()){修改用户金额BigDecimalcurrentBanlancenewBigDecimal(lockAbcPerson。getBanlance()。doubleValue()money。doubleValue());lockAbcPerson。setBanlance(currentBanlance);flagupdate(lockAbcPerson)0?true:false;throwRuntimeException(flag,修改用户金额失败,异常回滚。。。);}returnflag;}publicvoidthrowRuntimeException(booleanflag,Stringmsg){if(!flag){thrownewRuntimeException(msg);}} 解释:在执行完transferMoney方法到returnRocketMQLocalTransactionState期间,可能因为生产者异常或网络等问题,导致MQ未接收到半消息的状态,RocketMQ机制是:后续会调用checkLocalTransaction检查本地事务的执行情况TransferDetail主要是将本地事务执行情况落磁盘,保证后续的checkLocalTransaction()可以通过回查数据,来确定消息是提交还是回滚。transferMoney方法之所以先加明细再加悲观锁,是为了降低不必要的加锁时间,提升性能。另外,搜索公众号顶级科技后台回复API接口,获取一份惊喜礼包。消费者 主要是通过实现RocketMQListener接口,监听响应的消息,来给出响应。OverridepublicvoidonMessage(MessageExtmessage){Stringkeynull;Stringvaluenull;try{构建存储redis的key、value,目的是为了保证消息不会被重复消费StringmsgIdmessage。getMsgId();TransferDtotransferDtoJSON。parseObject(newString(message。getBody()),TransferDto。class);keymq:transferDto。getDistributedId();log。warn(获取到当前消息的msgId:msgId);valueThread。currentThread()。getId():System。currentTimeMillis();加分布式锁booleanflagredisTemplate。opsForValue()。setIfAbsent(key,value,1,TimeUnit。HOURS);if(flag){try{正常业务流程执行,完成后该消息会自动完成,期间有其他消费者执行该消息,也无法拿到锁flaguserService。transferMoney(transferDto。getMobile(),transferDto。getMoney());if(!flag){thrownewRuntimeException(没有添加金额成功,抛出异常);}log。warn(成功消费);}catch(Exceptione){执行业务出现异常,释放分布式锁,通过value验证,保证不会错误的释放锁通过watch机制保证原子性操作,若watch被打断,则说明该key已经被修改,当然也就无需当前线程释放锁redisTemplate。watch(key);redisTemplate。multi();StringlockValue(String)redisTemplate。opsForValue()。get(key);if(StrUtil。isNotBlank(lockValue)lockValuevalue){redisTemplate。delete(key);}redisTemplate。exec();thrownewRuntimeException(释放分布式锁,因消费失败,故抛出异常);}}else{未拿到锁,抛出异常,则该消息便不会被成功消费thrownewRuntimeException(未拿到锁,不进行消费);}}catch(Exceptione){log。warn(消费异常);thrownewRuntimeException(消费异常);}} 消费者的UserService,转账操作TransactionalOverridepublicbooleantransferMoney(Stringmobile,BigDecimalmoney){booleanflagfalse;加悲观锁HxPersonhxPersonhxPersonDao。getByIdOrMobileForUpdate(null,mobile);if(ObjectUtil。isNotEmpty(hxPerson)){hxPerson。setBanlance(newBigDecimal(hxPerson。getBanlance()。doubleValue()money。doubleValue()));flaghxPersonDao。update(hxPerson)0?true:false;}returnflag;}验证 操作前test数据库的表数据 test1数据库的表数据 通过调用生产者的转账接口: 生产消费完成、再看下两个库的数据情况: 正常的分布式事务流程就走完了,大家有什么改进或疑问的点可以提出来,一起进步,共同学习!!! 原文链接:https:mp。weixin。qq。comsPPHbOtfFX5naG9cvxSk7Zg