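Downloading the source
GitHub repository: https://github.com/seata/seata
After downloading the source, open it in IDEA. The project contains many modules. Because Seata officially ships a Spring Boot starter, locating the starter's spring.factories file is enough to find the entry point into the source.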
Source entry point
Open the project and find the seata-spring-boot-starter module. Under its resources/META-INF directory there is a spring.factories file.
```properties
org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
io.seata.spring.boot.autoconfigure.SeataDataSourceAutoConfiguration,\
io.seata.spring.boot.autoconfigure.SeataAutoConfiguration,\
io.seata.spring.boot.autoconfigure.HttpAutoConfiguration,\
io.seata.spring.boot.autoconfigure.SeataSagaAutoConfiguration
```
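To make the file format concrete, here is a minimal sketch that reads the same content with plain `java.util.Properties` and splits the comma-separated value into class names. The class `SpringFactoriesSketch` and its methods are hypothetical names for illustration; Spring Boot's own loader is more involved, and this only shows how the key, the trailing-backslash continuations, and the class list fit together.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

// Hypothetical helper, not part of Spring or Seata.
class SpringFactoriesSketch {
    static final String KEY = "org.springframework.boot.autoconfigure.EnableAutoConfiguration";

    // Parses spring.factories-style text and returns the class names listed under KEY.
    static List<String> autoConfigurations(String content) {
        Properties props = new Properties();
        try {
            // Properties.load joins lines that end with a backslash.
            props.load(new StringReader(content));
        } catch (IOException ex) {
            throw new UncheckedIOException(ex);
        }
        List<String> names = new ArrayList<>();
        for (String name : props.getProperty(KEY, "").split(",")) {
            if (!name.trim().isEmpty()) {
                names.add(name.trim());
            }
        }
        return names;
    }

    // The same entries as the spring.factories file shown above.
    static String sample() {
        return KEY + "=\\\n"
            + "io.seata.spring.boot.autoconfigure.SeataDataSourceAutoConfiguration,\\\n"
            + "io.seata.spring.boot.autoconfigure.SeataAutoConfiguration,\\\n"
            + "io.seata.spring.boot.autoconfigure.HttpAutoConfiguration,\\\n"
            + "io.seata.spring.boot.autoconfigure.SeataSagaAutoConfiguration\n";
    }

    public static void main(String[] args) {
        autoConfigurations(sample()).forEach(System.out::println);
    }
}
```

Each class name found this way is an auto-configuration candidate, which is why the file is a convenient index into the starter's source.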
SeataAutoConfiguration
The key class here is io.seata.spring.boot.autoconfigure.SeataAutoConfiguration; locate it in the source.
Its main job is to register a bean of type GlobalTransactionScanner in the IoC container, so the next stop is the GlobalTransactionScanner class.
```java
@ConditionalOnProperty(prefix = SEATA_PREFIX, name = "enabled", havingValue = "true", matchIfMissing = true)
@AutoConfigureAfter({SeataCoreAutoConfiguration.class})
public class SeataAutoConfiguration {

    private static final Logger LOGGER = LoggerFactory.getLogger(SeataAutoConfiguration.class);

    @Bean(BEAN_NAME_FAILURE_HANDLER)
    @ConditionalOnMissingBean(FailureHandler.class)
    public FailureHandler failureHandler() {
        return new DefaultFailureHandlerImpl();
    }

    @Bean
    @DependsOn({BEAN_NAME_SPRING_APPLICATION_CONTEXT_PROVIDER, BEAN_NAME_FAILURE_HANDLER})
    @ConditionalOnMissingBean(GlobalTransactionScanner.class)
    public GlobalTransactionScanner globalTransactionScanner(SeataProperties seataProperties,
            FailureHandler failureHandler,
            ConfigurableListableBeanFactory beanFactory,
            @Autowired(required = false) List<ScannerChecker> scannerCheckers) {
        return new GlobalTransactionScanner(seataProperties.getApplicationId(),
            seataProperties.getTxServiceGroup(), failureHandler);
    }
}
```
GlobalTransactionScanner
```java
public class GlobalTransactionScanner extends AbstractAutoProxyCreator
        implements ConfigurationChangeListener, InitializingBean, ApplicationContextAware, DisposableBean
```
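InitializingBean, ApplicationContextAware, and DisposableBean are Spring bean lifecycle callbacks. The InitializingBean callback mainly initializes the TM and RM clients, while DisposableBean releases resources on shutdown.
GlobalTransactionScanner extends AbstractAutoProxyCreator, an important Spring AOP component, and overrides its wrapIfNecessary method. Spring runs this method for every bean while the bean is being created, which makes it the hook for wrapping the target object in a dynamic proxy.
The code shows that a GlobalTransactionalInterceptor (the global transaction interceptor) is attached to the proxied methods.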
```java
@Override
protected Object wrapIfNecessary(Object bean, String beanName, Object cacheKey) {
    if (TCCBeanParserUtils.isTccAutoProxy(bean, beanName, applicationContext)) {
        // TCC bean: use the TCC action interceptor
        TCCBeanParserUtils.initTccFenceCleanTask(TCCBeanParserUtils.getRemotingDesc(beanName), applicationContext);
        interceptor = new TccActionInterceptor(TCCBeanParserUtils.getRemotingDesc(beanName));
        ConfigurationCache.addConfigListener(ConfigurationKeys.DISABLE_GLOBAL_TRANSACTION,
            (ConfigurationChangeListener)interceptor);
    } else {
        // Any other bean: lazily create one GlobalTransactionalInterceptor and reuse it
        if (globalTransactionalInterceptor == null) {
            globalTransactionalInterceptor = new GlobalTransactionalInterceptor(failureHandlerHook);
            ConfigurationCache.addConfigListener(
                ConfigurationKeys.DISABLE_GLOBAL_TRANSACTION,
                (ConfigurationChangeListener)globalTransactionalInterceptor);
        }
        interceptor = globalTransactionalInterceptor;
    }
    // ... the rest of the method is not shown in this excerpt
}
```
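The "wrap only when needed, then intercept" idea can be sketched without Spring by using a JDK dynamic proxy. Everything in this sketch is hypothetical: `DemoGlobalTransactional` stands in for @GlobalTransactional, `OrderService` is a made-up bean, and the interceptor only records begin/commit/rollback in a list instead of talking to a transaction coordinator. Spring's AbstractAutoProxyCreator does considerably more than this.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.util.ArrayList;
import java.util.List;

class WrapIfNecessarySketch {

    // Hypothetical stand-in for @GlobalTransactional.
    @Retention(RetentionPolicy.RUNTIME)
    @Target(ElementType.METHOD)
    @interface DemoGlobalTransactional {}

    interface OrderService {
        @DemoGlobalTransactional
        String create(String item);

        String query(String item);
    }

    static class OrderServiceImpl implements OrderService {
        public String create(String item) { return "created " + item; }
        public String query(String item) { return "found " + item; }
    }

    // Records what the interceptor did, so the behaviour is observable.
    static final List<String> LOG = new ArrayList<>();

    // Returns a proxy only if some interface method is annotated; otherwise the bean itself.
    static Object wrapIfNecessary(Object bean) {
        Class<?>[] interfaces = bean.getClass().getInterfaces();
        boolean annotated = false;
        for (Class<?> type : interfaces) {
            for (Method method : type.getMethods()) {
                annotated |= method.isAnnotationPresent(DemoGlobalTransactional.class);
            }
        }
        if (!annotated) {
            return bean;
        }
        InvocationHandler interceptor = (proxy, method, args) -> {
            if (!method.isAnnotationPresent(DemoGlobalTransactional.class)) {
                return call(bean, method, args); // not transactional: pass straight through
            }
            LOG.add("begin");
            try {
                Object result = call(bean, method, args);
                LOG.add("commit");
                return result;
            } catch (Throwable ex) {
                LOG.add("rollback");
                throw ex;
            }
        };
        return Proxy.newProxyInstance(bean.getClass().getClassLoader(), interfaces, interceptor);
    }

    // Invokes the target and unwraps the reflection wrapper around business exceptions.
    private static Object call(Object target, Method method, Object[] args) throws Throwable {
        try {
            return method.invoke(target, args);
        } catch (InvocationTargetException ex) {
            throw ex.getCause();
        }
    }

    public static void main(String[] args) {
        OrderService service = (OrderService) wrapIfNecessary(new OrderServiceImpl());
        System.out.println(service.create("book"));
        System.out.println(LOG);
    }
}
```

Beans without an annotated method come back unchanged, which mirrors why only some beans end up proxied.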
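GlobalTransactionalInterceptor
Open GlobalTransactionalInterceptor. Since it is an interceptor, start with its invoke method. It checks whether the method carries the @GlobalTransactional annotation, or whether the interceptor itself holds an AspectTransactional configuration. If either is true, it calls handleGlobalTransaction to enter the global transaction flow. A method annotated only with @GlobalLock goes to handleGlobalLock instead.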
```java
@Override
public Object invoke(final MethodInvocation methodInvocation) throws Throwable {
    Class<?> targetClass =
        methodInvocation.getThis() != null ? AopUtils.getTargetClass(methodInvocation.getThis()) : null;
    Method specificMethod = ClassUtils.getMostSpecificMethod(methodInvocation.getMethod(), targetClass);
    if (specificMethod != null && !specificMethod.getDeclaringClass().equals(Object.class)) {
        final Method method = BridgeMethodResolver.findBridgedMethod(specificMethod);
        final GlobalTransactional globalTransactionalAnnotation =
            getAnnotation(method, targetClass, GlobalTransactional.class);
        final GlobalLock globalLockAnnotation = getAnnotation(method, targetClass, GlobalLock.class);
        boolean localDisable = disable || (ATOMIC_DEGRADE_CHECK.get() && degradeNum >= degradeCheckAllowTimes);
        if (!localDisable) {
            if (globalTransactionalAnnotation != null || this.aspectTransactional != null) {
                AspectTransactional transactional;
                if (globalTransactionalAnnotation != null) {
                    transactional = new AspectTransactional(globalTransactionalAnnotation.timeoutMills(),
                        globalTransactionalAnnotation.name(), globalTransactionalAnnotation.rollbackFor(),
                        globalTransactionalAnnotation.rollbackForClassName(),
                        globalTransactionalAnnotation.noRollbackFor(),
                        globalTransactionalAnnotation.noRollbackForClassName(),
                        globalTransactionalAnnotation.propagation(),
                        globalTransactionalAnnotation.lockRetryInterval(),
                        globalTransactionalAnnotation.lockRetryTimes());
                } else {
                    transactional = this.aspectTransactional;
                }
                return handleGlobalTransaction(methodInvocation, transactional);
            } else if (globalLockAnnotation != null) {
                return handleGlobalLock(methodInvocation, globalLockAnnotation);
            }
        }
    }
    return methodInvocation.proceed();
}
```
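Building the transaction info object
The call to focus on is transactionalTemplate.execute(). The TransactionalExecutor passed to it exists mainly to build the transaction info object (TransactionInfo).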
```java
Object handleGlobalTransaction(final MethodInvocation methodInvocation,
    final AspectTransactional aspectTransactional) throws Throwable {
    boolean succeed = true;
    try {
        return transactionalTemplate.execute(new TransactionalExecutor() {
            @Override
            public Object execute() throws Throwable {
                // Run the business method
                return methodInvocation.proceed();
            }

            public String name() {
                String name = aspectTransactional.getName();
                if (!StringUtils.isNullOrEmpty(name)) {
                    return name;
                }
                return formatMethod(methodInvocation.getMethod());
            }

            @Override
            public TransactionInfo getTransactionInfo() {
                // Fall back to the default timeout when none is configured
                int timeout = aspectTransactional.getTimeoutMills();
                if (timeout <= 0 || timeout == DEFAULT_GLOBAL_TRANSACTION_TIMEOUT) {
                    timeout = defaultGlobalTransactionTimeout;
                }

                TransactionInfo transactionInfo = new TransactionInfo();
                transactionInfo.setTimeOut(timeout);
                transactionInfo.setName(name());
                transactionInfo.setPropagation(aspectTransactional.getPropagation());
                transactionInfo.setLockRetryInterval(aspectTransactional.getLockRetryInterval());
                transactionInfo.setLockRetryTimes(aspectTransactional.getLockRetryTimes());
                // Collect rollback and no-rollback rules from the annotation attributes
                Set<RollbackRule> rollbackRules = new LinkedHashSet<>();
                for (Class<?> rbRule : aspectTransactional.getRollbackFor()) {
                    rollbackRules.add(new RollbackRule(rbRule));
                }
                for (String rbRule : aspectTransactional.getRollbackForClassName()) {
                    rollbackRules.add(new RollbackRule(rbRule));
                }
                for (Class<?> rbRule : aspectTransactional.getNoRollbackFor()) {
                    rollbackRules.add(new NoRollbackRule(rbRule));
                }
                for (String rbRule : aspectTransactional.getNoRollbackForClassName()) {
                    rollbackRules.add(new NoRollbackRule(rbRule));
                }
                transactionInfo.setRollbackRules(rollbackRules);
                return transactionInfo;
            }
        });
    } catch (TransactionalExecutor.ExecutionException e) {
        // ... exception handling is not shown in this excerpt
    }
}
```
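The rollback and no-rollback rules collected above are later used to decide whether a thrown exception should roll the transaction back. The following is a simplified sketch of one way such rules can be evaluated: the rule closest to the thrown exception's class in its inheritance chain wins. The `Rule` class, the `rollbackOn` method, and the choice to roll back when no rule matches are all assumptions of this sketch; Seata's real TransactionInfo may match and default differently.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified rule evaluation; not Seata's implementation.
class RollbackRuleSketch {

    static final class Rule {
        final Class<? extends Throwable> type;
        final boolean rollback; // true = rollbackFor, false = noRollbackFor

        Rule(Class<? extends Throwable> type, boolean rollback) {
            this.type = type;
            this.rollback = rollback;
        }

        // Number of superclass steps from the thrown class up to this rule's type, or -1 if unrelated.
        int depth(Class<?> thrown) {
            int steps = 0;
            for (Class<?> c = thrown; c != null; c = c.getSuperclass(), steps++) {
                if (c == type) {
                    return steps;
                }
            }
            return -1;
        }
    }

    // The matching rule with the smallest depth decides.
    static boolean rollbackOn(Throwable ex, List<Rule> rules) {
        Rule winner = null;
        int best = Integer.MAX_VALUE;
        for (Rule rule : rules) {
            int depth = rule.depth(ex.getClass());
            if (depth >= 0 && depth < best) {
                best = depth;
                winner = rule;
            }
        }
        // Assumption of this sketch: roll back when no rule matches at all.
        return winner == null || winner.rollback;
    }

    // rollbackFor = Exception, noRollbackFor = IllegalArgumentException
    static List<Rule> sampleRules() {
        List<Rule> rules = new ArrayList<>();
        rules.add(new Rule(Exception.class, true));
        rules.add(new Rule(IllegalArgumentException.class, false));
        return rules;
    }

    public static void main(String[] args) {
        System.out.println(rollbackOn(new IllegalStateException(), sampleRules()));
        System.out.println(rollbackOn(new IllegalArgumentException(), sampleRules()));
    }
}
```

With these sample rules, a more specific no-rollback rule overrides the broader rollback rule for its own exception subtree.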
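Core source
execute is the core of the flow. It first obtains the current global transaction (identified by its xid), then handles each transaction propagation behavior differently, and finally begins the global transaction and either commits it or rolls it back.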
```java
public Object execute(TransactionalExecutor business) throws Throwable {
    TransactionInfo txInfo = business.getTransactionInfo();
    if (txInfo == null) {
        throw new ShouldNeverHappenException("transactionInfo does not exist");
    }
    GlobalTransaction tx = GlobalTransactionContext.getCurrent();

    Propagation propagation = txInfo.getPropagation();
    SuspendedResourcesHolder suspendedResourcesHolder = null;
    try {
        switch (propagation) {
            case NOT_SUPPORTED:
                if (existingTransaction(tx)) {
                    suspendedResourcesHolder = tx.suspend();
                }
                return business.execute();
            case REQUIRES_NEW:
                if (existingTransaction(tx)) {
                    suspendedResourcesHolder = tx.suspend();
                    tx = GlobalTransactionContext.createNew();
                }
                break;
            case SUPPORTS:
                if (notExistingTransaction(tx)) {
                    return business.execute();
                }
                break;
            case REQUIRED:
                break;
            case NEVER:
                if (existingTransaction(tx)) {
                    throw new TransactionException(
                        String.format("Existing transaction found for transaction marked with propagation 'never', xid = %s",
                            tx.getXid()));
                } else {
                    return business.execute();
                }
            case MANDATORY:
                if (notExistingTransaction(tx)) {
                    throw new TransactionException("No existing transaction found for transaction marked with propagation 'mandatory'");
                }
                break;
            default:
                throw new TransactionException("Not Supported Propagation:" + propagation);
        }

        if (tx == null) {
            tx = GlobalTransactionContext.createNew();
        }

        GlobalLockConfig previousConfig = replaceGlobalLockConfig(txInfo);

        try {
            beginTransaction(txInfo, tx);

            Object rs;
            try {
                rs = business.execute();
            } catch (Throwable ex) {
                completeTransactionAfterThrowing(txInfo, tx, ex);
                throw ex;
            }

            commitTransaction(tx);

            return rs;
        } finally {
            resumeGlobalLockConfig(previousConfig);
            triggerAfterCompletion();
            cleanUp();
        }
    } finally {
        if (suspendedResourcesHolder != null) {
            tx.resume(suspendedResourcesHolder);
        }
    }
}
```
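The propagation switch above can be condensed into a small decision table. The sketch below is a paraphrase for illustration only: the `Action` enum and the `decide` method are hypothetical names, and the real code performs these steps inline rather than returning a value.

```java
// Hypothetical condensation of the propagation switch; names are illustrative.
class PropagationSketch {

    enum Propagation { REQUIRED, REQUIRES_NEW, NOT_SUPPORTED, SUPPORTS, NEVER, MANDATORY }

    enum Action { JOIN_EXISTING, BEGIN_NEW, SUSPEND_AND_BEGIN_NEW, SUSPEND_AND_RUN_PLAIN, RUN_PLAIN, FAIL }

    // Maps a propagation setting and "is there already a global transaction?" to what happens next.
    static Action decide(Propagation propagation, boolean txExists) {
        switch (propagation) {
            case REQUIRED:
                return txExists ? Action.JOIN_EXISTING : Action.BEGIN_NEW;
            case REQUIRES_NEW:
                return txExists ? Action.SUSPEND_AND_BEGIN_NEW : Action.BEGIN_NEW;
            case NOT_SUPPORTED:
                return txExists ? Action.SUSPEND_AND_RUN_PLAIN : Action.RUN_PLAIN;
            case SUPPORTS:
                return txExists ? Action.JOIN_EXISTING : Action.RUN_PLAIN;
            case NEVER:
                return txExists ? Action.FAIL : Action.RUN_PLAIN;
            case MANDATORY:
                return txExists ? Action.JOIN_EXISTING : Action.FAIL;
            default:
                throw new IllegalArgumentException("Not supported propagation: " + propagation);
        }
    }

    public static void main(String[] args) {
        for (Propagation p : Propagation.values()) {
            System.out.println(p + ": existing=" + decide(p, true) + ", none=" + decide(p, false));
        }
    }
}
```

Reading the table row by row against the switch is a quick way to check which branches return early with business.execute() and which fall through to beginTransaction.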
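The begin, commit, and rollback operations here all end up in the syncCall method of DefaultTransactionManager, which communicates with the TC (Transaction Coordinator) over Netty. Beginning a transaction returns a global transaction ID (xid), which is then bound to the current local transaction context.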
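Binding the xid to the current context can be pictured as a thread-local holder. This is a minimal sketch under the assumption that the binding is per thread; `XidContextSketch` and its methods are hypothetical names, and the xid value used in `main` is made up.

```java
// Hypothetical thread-local holder for the current xid; not Seata's own context class.
class XidContextSketch {
    private static final ThreadLocal<String> CURRENT_XID = new ThreadLocal<>();

    static void bind(String xid) {
        CURRENT_XID.set(xid);
    }

    static String current() {
        return CURRENT_XID.get();
    }

    // Clears the binding and returns the xid that was bound, if any.
    static String unbind() {
        String xid = CURRENT_XID.get();
        CURRENT_XID.remove();
        return xid;
    }

    // Reads the binding from a different thread, to show that it is per thread.
    static String currentOnAnotherThread() {
        String[] seen = new String[1];
        Thread reader = new Thread(() -> seen[0] = current());
        reader.start();
        try {
            reader.join();
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(ex);
        }
        return seen[0];
    }

    public static void main(String[] args) {
        bind("demo-xid-1"); // made-up value
        System.out.println("this thread: " + current());
        System.out.println("other thread: " + currentOnAnotherThread());
        unbind();
    }
}
```

A per-thread binding like this is what lets code further down the call stack, such as a connection proxy, find out whether it is running inside a global transaction.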
Data source proxy
Seata proxies DataSource, Statement, and Connection, so it can perform extra work around statement execution, such as generating the before image and the after image.
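The SeataDataSourceAutoConfiguration class shows that data source proxying is carried out by SeataAutoDataSourceProxyCreator.
(There is a small difference between versions. Before 1.4.2, SeataDataSourceConfiguration was a static nested class of SeataAutoConfiguration. It registered a bean of type SeataDataSourceBeanPostProcessor, which intercepted data source beans during their lifecycle and returned the data source proxy directly. Version 1.4.2 removed that class. Either way the goal is to proxy the data source. The code shown here is from 1.4.2 and later.)
SeataAutoDataSourceProxyCreator also extends AbstractAutoProxyCreator to implement dynamic proxying, so look at its overridden wrapIfNecessary method: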
```java
@Override
protected Object wrapIfNecessary(Object bean, String beanName, Object cacheKey) {
    // Only DataSource beans are of interest
    if (!(bean instanceof DataSource)) {
        return bean;
    }

    // A plain DataSource that is not yet a SeataDataSourceProxy
    if (!(bean instanceof SeataDataSourceProxy)) {
        Object enhancer = super.wrapIfNecessary(bean, beanName, cacheKey);
        if (bean == enhancer) {
            return bean;
        }
        // Build the proxy and remember the <origin, proxy> pair
        DataSource origin = (DataSource) bean;
        SeataDataSourceProxy proxy = buildProxy(origin, dataSourceProxyMode);
        DataSourceProxyHolder.put(origin, proxy);
        return enhancer;
    }
    // ... handling of beans that already are a SeataDataSourceProxy is not shown in this excerpt
}
```
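The code builds a data source proxy of type SeataDataSourceProxy. DataSourceProxyHolder.put(origin, proxy) stores the pair in what is internally a Map<DataSource, SeataDataSourceProxy>, which records the relationship between each original data source and its proxy.
The buildProxy method, which constructs the data source proxy object, builds a different object depending on the mode. Only the official default, AT mode, is covered here.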
```java
SeataDataSourceProxy buildProxy(DataSource origin, String proxyMode) {
    if (BranchType.AT.name().equalsIgnoreCase(proxyMode)) {
        return new DataSourceProxy(origin);
    }
    if (BranchType.XA.name().equalsIgnoreCase(proxyMode)) {
        return new DataSourceProxyXA(origin);
    }
    throw new IllegalArgumentException("Unknown dataSourceProxyMode: " + proxyMode);
}
```
DataSourceProxy overrides getConnection so that the Connection it returns is itself a proxy:
```java
@Override
public ConnectionProxy getConnection() throws SQLException {
    Connection targetConnection = targetDataSource.getConnection();
    return new ConnectionProxy(this, targetConnection);
}
```
ConnectionProxy extends AbstractConnectionProxy, and AbstractConnectionProxy implements the Connection interface. Its prepareStatement method is what proxies the PreparedStatement object:
```java
@Override
public PreparedStatement prepareStatement(String sql) throws SQLException {
    // ... creation of targetPreparedStatement is not shown in this excerpt
    return new PreparedStatementProxy(this, targetPreparedStatement, sql);
}
```
PreparedStatementProxy overrides execute, executeQuery, executeUpdate, and similar methods. Each of them delegates to ExecuteTemplate.execute:
```java
@Override
public boolean execute() throws SQLException {
    return ExecuteTemplate.execute(this, (statement, args) -> statement.execute());
}

@Override
public ResultSet executeQuery() throws SQLException {
    return ExecuteTemplate.execute(this, (statement, args) -> statement.executeQuery());
}

@Override
public int executeUpdate() throws SQLException {
    return ExecuteTemplate.execute(this, (statement, args) -> statement.executeUpdate());
}
```
Next, look at the ExecuteTemplate.execute method:
```java
public static <T, S extends Statement> T execute(List<SQLRecognizer> sqlRecognizers,
                                                 StatementProxy<S> statementProxy,
                                                 StatementCallback<T, S> statementCallback,
                                                 Object... args) throws SQLException {
    // ... declarations of dbType and executor are not shown in this excerpt
    if (CollectionUtils.isEmpty(sqlRecognizers)) {
        executor = new PlainExecutor<>(statementProxy, statementCallback);
    } else {
        if (sqlRecognizers.size() == 1) {
            SQLRecognizer sqlRecognizer = sqlRecognizers.get(0);
            // Pick an executor according to the SQL type
            switch (sqlRecognizer.getSQLType()) {
                case INSERT:
                    executor = EnhancedServiceLoader.load(InsertExecutor.class, dbType,
                        new Class[]{StatementProxy.class, StatementCallback.class, SQLRecognizer.class},
                        new Object[]{statementProxy, statementCallback, sqlRecognizer});
                    break;
                case UPDATE:
                    executor = new UpdateExecutor<>(statementProxy, statementCallback, sqlRecognizer);
                    break;
                case DELETE:
                    executor = new DeleteExecutor<>(statementProxy, statementCallback, sqlRecognizer);
                    break;
                case SELECT_FOR_UPDATE:
                    executor = new SelectForUpdateExecutor<>(statementProxy, statementCallback, sqlRecognizer);
                    break;
                case INSERT_ON_DUPLICATE_UPDATE:
                    switch (dbType) {
                        case JdbcConstants.MYSQL:
                        case JdbcConstants.MARIADB:
                            executor =
                                new MySQLInsertOnDuplicateUpdateExecutor(statementProxy, statementCallback, sqlRecognizer);
                            break;
                        default:
                            throw new NotSupportYetException(dbType + " not support to INSERT_ON_DUPLICATE_UPDATE");
                    }
                    break;
                default:
                    executor = new PlainExecutor<>(statementProxy, statementCallback);
                    break;
            }
        } else {
            executor = new MultiExecutor<>(statementProxy, statementCallback, sqlRecognizers);
        }
    }
    T rs;
    try {
        rs = executor.execute(args);
    } catch (Throwable ex) {
        if (!(ex instanceof SQLException)) {
            // Turn anything else into a SQLException
            ex = new SQLException(ex);
        }
        throw (SQLException) ex;
    }
    return rs;
}
```
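The dispatch above boils down to "classify the statement, then choose an executor". The sketch below imitates that with a deliberately naive keyword check instead of a real SQL parser, so it is a swapped-in technique and not how the recognizers work. `ExecutorSelectionSketch`, `classify`, and `executorFor` are hypothetical names; the returned strings simply echo the executor class names from the excerpt.

```java
import java.util.Locale;

// Hypothetical, keyword-based imitation of the executor dispatch; not a SQL parser.
class ExecutorSelectionSketch {

    enum SqlType { INSERT, UPDATE, DELETE, SELECT_FOR_UPDATE, OTHER }

    // Naive classification by leading keyword (and a trailing FOR UPDATE for selects).
    static SqlType classify(String sql) {
        String s = sql.trim().toUpperCase(Locale.ROOT);
        if (s.startsWith("INSERT")) {
            return SqlType.INSERT;
        }
        if (s.startsWith("UPDATE")) {
            return SqlType.UPDATE;
        }
        if (s.startsWith("DELETE")) {
            return SqlType.DELETE;
        }
        if (s.startsWith("SELECT") && s.endsWith("FOR UPDATE")) {
            return SqlType.SELECT_FOR_UPDATE;
        }
        return SqlType.OTHER;
    }

    // Mirrors the switch in the excerpt: statements that change data get a specialised executor.
    static String executorFor(SqlType type) {
        switch (type) {
            case INSERT:
                return "InsertExecutor";
            case UPDATE:
                return "UpdateExecutor";
            case DELETE:
                return "DeleteExecutor";
            case SELECT_FOR_UPDATE:
                return "SelectForUpdateExecutor";
            default:
                return "PlainExecutor";
        }
    }

    public static void main(String[] args) {
        String sql = "update stock set count = count - 1 where id = 1";
        System.out.println(executorFor(classify(sql)));
    }
}
```

Ordinary SELECT statements fall through to the plain executor, which is consistent with the excerpt's default branch.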
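Step into executor.execute(args), whose implementation lives in BaseTransactionalExecutor, follow it to doExecute, and then find the doExecute implementation in AbstractDMLBaseExecutor.
AbstractDMLBaseExecutor has two branches: auto-commit and manual commit. The auto-commit branch also switches the connection to manual commit, but it contains its own commit logic.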
```java
@Override
public T doExecute(Object... args) throws Throwable {
    AbstractConnectionProxy connectionProxy = statementProxy.getConnectionProxy();
    if (connectionProxy.getAutoCommit()) {
        return executeAutoCommitTrue(args);
    } else {
        return executeAutoCommitFalse(args);
    }
}
```
Step into executeAutoCommitTrue:
```java
protected T executeAutoCommitTrue(Object[] args) throws Throwable {
    ConnectionProxy connectionProxy = statementProxy.getConnectionProxy();
    try {
        // Switch the connection to manual commit
        connectionProxy.changeAutoCommit();
        return new LockRetryPolicy(connectionProxy).execute(() -> {
            // Run the statement through the manual-commit path
            T result = executeAutoCommitFalse(args);
            // Then commit
            connectionProxy.commit();
            return result;
        });
    } catch (Exception e) {
        LOGGER.error("execute executeAutoCommitTrue error:{}", e.getMessage(), e);
        if (!LockRetryPolicy.isLockRetryPolicyBranchRollbackOnConflict()) {
            connectionProxy.getTargetConnection().rollback();
        }
        throw e;
    } finally {
        // Restore auto-commit
        connectionProxy.getContext().reset();
        connectionProxy.setAutoCommit(true);
    }
}
```
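Generating the undo log
Step into executeAutoCommitFalse. It captures a before image before the statement runs and an after image once it has run. Together the two images record the data before and after the change, and they are combined into an undo log that can be used for a later rollback. At this point the undo log is only held in memory; it has not yet been persisted to the database.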
```java
protected T executeAutoCommitFalse(Object[] args) throws Exception {
    // Before image
    TableRecords beforeImage = beforeImage();
    // Execute the actual SQL
    T result = statementCallback.execute(statementProxy.getTargetStatement(), args);
    // After image
    TableRecords afterImage = afterImage(beforeImage);
    // Prepare the undo log from both images
    prepareUndoLog(beforeImage, afterImage);
    return result;
}
```
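To see why both images are kept, here is a toy sketch that uses an in-memory map as the "table". `UndoLogSketch`, its `UndoLog` record, and the dirty-write check in `rollback` (only compensate when the row still equals the after image) are assumptions of this sketch, chosen to show what each image is useful for; they are not Seata's classes.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

// Toy before/after image handling on an in-memory "table"; not Seata's implementation.
class UndoLogSketch {

    static final class UndoLog {
        final String key;
        final Integer beforeImage; // null means the row did not exist
        final Integer afterImage;

        UndoLog(String key, Integer beforeImage, Integer afterImage) {
            this.key = key;
            this.beforeImage = beforeImage;
            this.afterImage = afterImage;
        }
    }

    // Captures the before image, applies the change, captures the after image.
    static UndoLog executeWithImages(Map<String, Integer> table, String key, int newValue) {
        Integer before = table.get(key);
        table.put(key, newValue); // stands in for the business SQL
        Integer after = table.get(key);
        return new UndoLog(key, before, after);
    }

    // Restores the before image, but only if nobody changed the row since (assumed check).
    static boolean rollback(Map<String, Integer> table, UndoLog log) {
        if (!Objects.equals(table.get(log.key), log.afterImage)) {
            return false; // the row no longer matches the after image
        }
        if (log.beforeImage == null) {
            table.remove(log.key);
        } else {
            table.put(log.key, log.beforeImage);
        }
        return true;
    }

    public static void main(String[] args) {
        Map<String, Integer> table = new HashMap<>();
        table.put("stock", 10);
        UndoLog log = executeWithImages(table, "stock", 8);
        System.out.println("after update: " + table);
        System.out.println("rolled back: " + rollback(table, log) + " -> " + table);
    }
}
```

The before image supplies the values to restore, while the after image lets a rollback detect that the row was modified by someone else in the meantime.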
Back in the previous step: once executeAutoCommitFalse has finished, connectionProxy.commit() is called and the flow enters the commit phase.
```java
@Override
public void commit() throws SQLException {
    try {
        lockRetryPolicy.execute(() -> {
            doCommit();
            return null;
        });
    } catch (SQLException e) {
        if (targetConnection != null && !getAutoCommit() && !getContext().isAutoCommitChanged()) {
            rollback();
        }
        throw e;
    } catch (Exception e) {
        throw new SQLException(e);
    }
}
```
Step into doCommit():
```java
private void doCommit() throws SQLException {
    if (context.inGlobalTransaction()) {
        // Inside a global transaction
        processGlobalTransactionCommit();
    } else if (context.isGlobalLockRequire()) {
        // Only a global lock is required
        processLocalCommitWithGlobalLocks();
    } else {
        // Plain local commit
        targetConnection.commit();
    }
}
```
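Step into processGlobalTransactionCommit(). It first registers the branch, then actually persists the undo log to the database, and then calls targetConnection.commit() to commit the local transaction. A failure is always reported to the server (the TC); a success is reported only when IS_REPORT_SUCCESS_ENABLE is set.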
```java
private void processGlobalTransactionCommit() throws SQLException {
    try {
        // Register the branch
        register();
    } catch (TransactionException e) {
        recognizeLockKeyConflictException(e, context.buildLockKeys());
    }
    try {
        // Persist the undo log
        UndoLogManagerFactory.getUndoLogManager(this.getDbType()).flushUndoLogs(this);
        // Commit the local transaction
        targetConnection.commit();
    } catch (Throwable ex) {
        LOGGER.error("process connectionProxy commit error: {}", ex.getMessage(), ex);
        // Report the failure
        report(false);
        throw new SQLException(ex);
    }
    if (IS_REPORT_SUCCESS_ENABLE) {
        // Report success only when enabled
        report(true);
    }
    context.reset();
}
```
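The order of steps in processGlobalTransactionCommit can be traced with a tiny simulation. This sketch only records event names in a list; `BranchCommitSketch`, its parameters, and the event strings are hypothetical, and where the real method rethrows a SQLException the sketch simply returns the trace so the sequence can be inspected.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical trace of the branch commit sequence; it performs no real I/O.
class BranchCommitSketch {

    // Returns the ordered list of steps for one branch commit.
    static List<String> commitBranch(boolean localCommitFails, boolean reportSuccessEnabled) {
        List<String> events = new ArrayList<>();
        events.add("register");
        try {
            events.add("flushUndoLogs");
            if (localCommitFails) {
                throw new IllegalStateException("simulated local commit failure");
            }
            events.add("localCommit");
        } catch (RuntimeException ex) {
            // A failure is always reported; the real method then rethrows.
            events.add("report(false)");
            return events;
        }
        if (reportSuccessEnabled) {
            events.add("report(true)");
        }
        events.add("resetContext");
        return events;
    }

    public static void main(String[] args) {
        System.out.println(commitBranch(false, false));
        System.out.println(commitBranch(false, true));
        System.out.println(commitBranch(true, true));
    }
}
```

Note that the undo log flush and the business change go through the same local commit, so they are persisted together or not at all.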