支付宝一面多线程事务怎么回滚?说用Transactional
背景介绍
1,最近有一个大数据量插入的操作入库的业务场景,需要先做一些其他修改操作,然后在执行插入操作,由于插入数据可能会很多,用到多线程去拆分数据并行处理来提高响应时间,如果有一个线程执行失败,则全部回滚。
2,在spring中可以使用Transactional注解去控制事务,使出现异常时会进行回滚,在多线程中,这个注解则不会生效,如果主线程需要先执行一些修改数据库的操作,当子线程在进行处理出现异常时,主线程修改的数据则不会回滚,导致数据错误。
3,下面用一个简单示例演示多线程事务。公用的类和方法平均拆分list方法。paramsourceparamnparamTreturnpublicstaticTListListTaverageAssign(ListTsource,intn){ListListTresultnewArrayListListT();intremaidersource。size()n;intnumbersource。size()n;intoffset0;偏移量for(inti0;in;i){ListTvaluenull;if(remaider0){valuesource。subList(inumberoffset,(i1)numberoffset1);remaider;offset;}else{valuesource。subList(inumberoffset,(i1)numberoffset);}result。add(value);}returnresult;}线程池配置versionV1。0publicclassExecutorConfig{privatestaticintmaxPoolSizeRuntime。getRuntime()。availableProcessors();privatevolatilestaticExecutorServiceexecutorService;publicstaticExecutorServicegetThreadPool(){if(executorServicenull){synchronized(ExecutorConfig。class){if(executorServicenull){executorServicenewThreadPool();}}}returnexecutorService;}privatestaticExecutorServicenewThreadPool(){intqueueSize500;intcorePoolMath。min(5,maxPoolSize);returnnewThreadPoolExecutor(corePool,maxPoolSize,10000L,TimeUnit。MILLISECONDS,newLinkedBlockingQueue(queueSize),newThreadPoolExecutor。AbortPolicy());}privateExecutorConfig(){}}获取sqlSessionauthor86182versionV1。0ComponentpublicclassSqlContext{ResourceprivateSqlSessionTemplatesqlSessionTemplate;publicSqlSessiongetSqlSession(){SqlSessionFactorysqlSessionFactorysqlSessionTemplate。getSqlSessionFactory();returnsqlSessionFactory。openSession();}}
另外,如果你近期准备面试跳槽,建议在Java面试库小程序在线刷题,涵盖2000道Java面试题,几乎覆盖了所有主流技术面试题。示例事务不成功操作测试多线程事务。paramemployeeDOListOverrideTransactionalpublicvoidsaveThread(ListEmployeeDOemployeeDOList){try{先做删除操作,如果子线程出现异常,此操作不会回滚this。getBaseMapper()。delete(null);获取线程池ExecutorServiceserviceExecutorConfig。getThreadPool();拆分数据,拆分5份ListListEmployeeDOlistsaverageAssign(employeeDOList,5);执行的线程Thread〔〕threadArraynewThread〔lists。size()〕;监控子线程执行完毕,再执行主线程,要不然会导致主线程关闭,子线程也会随着关闭CountDownLatchcountDownLatchnewCountDownLatch(lists。size());AtomicBooleanatomicBooleannewAtomicBoolean(true);for(inti0;ilists。size();i){if(ilists。size()1){atomicBoolean。set(false);}ListEmployeeDOlistlists。get(i);threadArray〔i〕newThread((){try{最后一个线程抛出异常if(!atomicBoolean。get()){thrownewServiceException(001,出现异常);}批量添加,mybatisPlus中自带的batch方法this。saveBatch(list);}finally{countDownLatch。countDown();}});}for(inti0;ilists。size();i){service。execute(threadArray〔i〕);}当子线程执行完毕时,主线程再往下执行countDownLatch。await();System。out。println(添加完毕);}catch(Exceptione){log。info(error,e);thrownewServiceException(002,出现异常);}finally{connection。close();}}
数据库中存在一条数据:
SpringBoot基础就不介绍了,推荐下这个实战教程:https:github。comjavastacksspringbootbestpractice测试用例RunWith(SpringRunner。class)SpringBootTest(classes{ThreadTest01。class,MainApplication。class})publicclassThreadTest01{ResourceprivateEmployeeBOemployeeBO;测试多线程事务。throwsInterruptedExceptionTestpublicvoidMoreThreadTest2()throwsInterruptedException{intsize10;ListEmployeeDOemployeeDOListnewArrayList(size);for(inti0;isize;i){EmployeeDOemployeeDOnewEmployeeDO();employeeDO。setEmployeeName(loli);employeeDO。setAge(18);employeeDO。setGender(1);employeeDO。setIdNumber(iXX);employeeDO。setCreatTime(Calendar。getInstance()。getTime());employeeDOList。add(employeeDO);}try{employeeBO。saveThread(employeeDOList);System。out。println(添加成功);}catch(Exceptione){e。printStackTrace();}}}
测试结果:
可以发现子线程组执行时,有一个线程执行失败,其他线程也会抛出异常,但是主线程中执行的删除操作,没有回滚,Transactional注解没有生效。
使用sqlSession控制手动提交事务ResourceSqlContextsqlContext;测试多线程事务。paramemployeeDOListOverridepublicvoidsaveThread(ListEmployeeDOemployeeDOList)throwsSQLException{获取数据库连接,获取会话(内部自有事务)SqlSessionsqlSessionsqlContext。getSqlSession();ConnectionconnectionsqlSession。getConnection();try{设置手动提交connection。setAutoCommit(false);获取mapperEmployeeMapperemployeeMappersqlSession。getMapper(EmployeeMapper。class);先做删除操作employeeMapper。delete(null);获取执行器ExecutorServiceserviceExecutorConfig。getThreadPool();ListCallableIntegercallableListnewArrayList();拆分listListListEmployeeDOlistsaverageAssign(employeeDOList,5);AtomicBooleanatomicBooleannewAtomicBoolean(true);for(inti0;ilists。size();i){if(ilists。size()1){atomicBoolean。set(false);}ListEmployeeDOlistlists。get(i);使用返回结果的callable去执行,CallableIntegercallable(){让最后一个线程抛出异常if(!atomicBoolean。get()){thrownewServiceException(001,出现异常);}returnemployeeMapper。saveBatch(list);};callableList。add(callable);}执行子线程ListFutureIntegerfuturesservice。invokeAll(callableList);for(FutureIntegerfuture:futures){如果有一个执行不成功,则全部回滚if(future。get()0){connection。rollback();return;}}connection。commit();System。out。println(添加完毕);}catch(Exceptione){connection。rollback();log。info(error,e);thrownewServiceException(002,出现异常);}finally{connection。close();}}sqlinsertidsaveBatchparameterTypeListINSERTINTOemployee(employeeid,age,employeename,birthdate,gender,idnumber,creattime,updatetime,status)valuesforeachcollectionlistitemitemindexindexseparator,({item。employeeId},{item。age},{item。employeeName},{item。birthDate},{item。gender},{item。idNumber},{item。creatTime},{item。updateTime},{item。status})foreachinsert
数据库中一条数据:
测试结果:抛出异常,
删除操作的数据回滚了,数据库中的数据依旧存在,说明事务成功了。
另外,如果你近期准备面试跳槽,建议在Java面试库小程序在线刷题,涵盖2000道Java面试题,几乎覆盖了所有主流技术面试题。
成功操作示例:ResourceSqlContextsqlContext;测试多线程事务。paramemployeeDOListOverridepublicvoidsaveThread(ListEmployeeDOemployeeDOList)throwsSQLException{获取数据库连接,获取会话(内部自有事务)SqlSessionsqlSessionsqlContext。getSqlSession();ConnectionconnectionsqlSession。getConnection();try{设置手动提交connection。setAutoCommit(false);EmployeeMapperemployeeMappersqlSession。getMapper(EmployeeMapper。class);先做删除操作employeeMapper。delete(null);ExecutorServiceserviceExecutorConfig。getThreadPool();ListCallableIntegercallableListnewArrayList();ListListEmployeeDOlistsaverageAssign(employeeDOList,5);for(inti0;ilists。size();i){ListEmployeeDOlistlists。get(i);CallableIntegercallable()employeeMapper。saveBatch(list);callableList。add(callable);}执行子线程ListFutureIntegerfuturesservice。invokeAll(callableList);for(FutureIntegerfuture:futures){if(future。get()0){connection。rollback();return;}}connection。commit();System。out。println(添加完毕);}catch(Exceptione){connection。rollback();log。info(error,e);thrownewServiceException(002,出现异常);thrownewServiceException(ExceptionCodeEnum。EMPLOYEESAVEORUPDATEERROR);}}
测试结果:
数据库中数据:
删除的删除了,添加的添加成功了,测试成功。
版权声明:本文为CSDN博主weixin43225491的原创文章,遵循CC4。0BYSA版权协议,转载请附上原文出处链接及本声明。原文链接:https:blog。csdn。netweixin43225491articledetails117705686