源码下载 github仓库:https://github.com/seata/seata
源码下载后使用idea打开,打开后可以看到有很多目录,官方提供的Spring boot start的方式,那说明只要找到对应的spring.factories
源码入口 打开项目,可以看见seata-spring-boot-starter
1 2 3 4 5 6 org.springframework.boot.autoconfigure.EnableAutoConfiguration =\ io.seata.spring.boot.autoconfigure.SeataDataSourceAutoConfiguration,\ io.seata.spring.boot.autoconfigure.SeataAutoConfiguration,\ io.seata.spring.boot.autoconfigure.HttpAutoConfiguration,\ io.seata.spring.boot.autoconfigure.SeataSagaAutoConfiguration
SeataAutoConfiguration 其中关键的是io.seata.spring.boot.autoconfigure.SeataAutoConfiguration
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 @ConditionalOnProperty(prefix = SEATA_PREFIX, name = "enabled", havingValue = "true", matchIfMissing = true) @AutoConfigureAfter({SeataCoreAutoConfiguration.class}) public class SeataAutoConfiguration { private static final Logger LOGGER = LoggerFactory.getLogger(SeataAutoConfiguration.class); @Bean(BEAN_NAME_FAILURE_HANDLER) @ConditionalOnMissingBean(FailureHandler.class) public FailureHandler failureHandler () { return new DefaultFailureHandlerImpl (); } @Bean @DependsOn({BEAN_NAME_SPRING_APPLICATION_CONTEXT_PROVIDER, BEAN_NAME_FAILURE_HANDLER}) @ConditionalOnMissingBean(GlobalTransactionScanner.class) public GlobalTransactionScanner globalTransactionScanner (SeataProperties seataProperties, FailureHandler failureHandler, ConfigurableListableBeanFactory beanFactory, @Autowired(required = false) List<ScannerChecker> scannerCheckers) { return new GlobalTransactionScanner (seataProperties.getApplicationId(), seataProperties.getTxServiceGroup(), failureHandler); } }
GlobalTransactionScanner 1 2 public class GlobalTransactionScanner extends AbstractAutoProxyCreator implements ConfigurationChangeListener , InitializingBean, ApplicationContextAware, DisposableBean
, ApplicationContextAware
, DisposableBean
是spring bean生命周期的内容,InitializingBean
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 @Override protected Object wrapIfNecessary (Object bean, String beanName, Object cacheKey) { if (TCCBeanParserUtils.isTccAutoProxy(bean, beanName, applicationContext)) { TCCBeanParserUtils.initTccFenceCleanTask(TCCBeanParserUtils.getRemotingDesc(beanName), applicationContext); interceptor = new TccActionInterceptor (TCCBeanParserUtils.getRemotingDesc(beanName)); ConfigurationCache.addConfigListener(ConfigurationKeys.DISABLE_GLOBAL_TRANSACTION, (ConfigurationChangeListener)interceptor); } else { if (globalTransactionalInterceptor == null ) { globalTransactionalInterceptor = new GlobalTransactionalInterceptor (failureHandlerHook); ConfigurationCache.addConfigListener( ConfigurationKeys.DISABLE_GLOBAL_TRANSACTION, (ConfigurationChangeListener)globalTransactionalInterceptor); } interceptor = globalTransactionalInterceptor; } }
GlobalTransactionalInterceptor 进入GlobalTransactionalInterceptor
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 @Override public Object invoke (final MethodInvocation methodInvocation) throws Throwable { Class<?> targetClass = methodInvocation.getThis() != null ? AopUtils.getTargetClass(methodInvocation.getThis()) : null ; Method specificMethod = ClassUtils.getMostSpecificMethod(methodInvocation.getMethod(), targetClass); if (specificMethod != null && !specificMethod.getDeclaringClass().equals(Object.class)) { final Method method = BridgeMethodResolver.findBridgedMethod(specificMethod); final GlobalTransactional globalTransactionalAnnotation = getAnnotation(method, targetClass, GlobalTransactional.class); final GlobalLock globalLockAnnotation = getAnnotation(method, targetClass, GlobalLock.class); boolean localDisable = disable || (ATOMIC_DEGRADE_CHECK.get() && degradeNum >= degradeCheckAllowTimes); if (!localDisable) { if (globalTransactionalAnnotation != null || this .aspectTransactional != null ) { AspectTransactional transactional; if (globalTransactionalAnnotation != null ) { transactional = new AspectTransactional (globalTransactionalAnnotation.timeoutMills(), globalTransactionalAnnotation.name(), globalTransactionalAnnotation.rollbackFor(), globalTransactionalAnnotation.rollbackForClassName(), globalTransactionalAnnotation.noRollbackFor(), globalTransactionalAnnotation.noRollbackForClassName(), globalTransactionalAnnotation.propagation(), globalTransactionalAnnotation.lockRetryInterval(), globalTransactionalAnnotation.lockRetryTimes()); } else { transactional = this .aspectTransactional; } return handleGlobalTransaction(methodInvocation, transactional); } else if (globalLockAnnotation != null ) { return handleGlobalLock(methodInvocation, globalLockAnnotation); } } } return methodInvocation.proceed(); }
构建事务信息对象 主要关注的是transactionalTemplate.execute()
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 Object handleGlobalTransaction (final MethodInvocation methodInvocation, final AspectTransactional aspectTransactional) throws Throwable { boolean succeed = true ; try { return transactionalTemplate.execute(new TransactionalExecutor () { @Override public Object execute () throws Throwable { return methodInvocation.proceed(); } public String name () { String name = aspectTransactional.getName(); if (!StringUtils.isNullOrEmpty(name)) { return name; } return formatMethod(methodInvocation.getMethod()); } @Override public TransactionInfo getTransactionInfo () { int timeout = aspectTransactional.getTimeoutMills(); if (timeout <= 0 || timeout == DEFAULT_GLOBAL_TRANSACTION_TIMEOUT) { timeout = defaultGlobalTransactionTimeout; } TransactionInfo transactionInfo = new TransactionInfo (); transactionInfo.setTimeOut(timeout); transactionInfo.setName(name()); transactionInfo.setPropagation(aspectTransactional.getPropagation()); transactionInfo.setLockRetryInterval(aspectTransactional.getLockRetryInterval()); transactionInfo.setLockRetryTimes(aspectTransactional.getLockRetryTimes()); Set<RollbackRule> rollbackRules = new LinkedHashSet <>(); for (Class<?> rbRule : aspectTransactional.getRollbackFor()) { rollbackRules.add(new RollbackRule (rbRule)); } for (String rbRule : aspectTransactional.getRollbackForClassName()) { rollbackRules.add(new RollbackRule (rbRule)); } for (Class<?> rbRule : aspectTransactional.getNoRollbackFor()) { rollbackRules.add(new NoRollbackRule (rbRule)); } for (String rbRule : aspectTransactional.getNoRollbackForClassName()) { rollbackRules.add(new NoRollbackRule (rbRule)); } transactionInfo.setRollbackRules(rollbackRules); return transactionInfo; } }); } catch (TransactionalExecutor.ExecutionException e) { } }
核心源码 execute
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 public Object execute (TransactionalExecutor business) throws Throwable { TransactionInfo txInfo = business.getTransactionInfo(); if (txInfo == null ) { throw new ShouldNeverHappenException ("transactionInfo does not exist" ); } GlobalTransaction tx = GlobalTransactionContext.getCurrent(); Propagation propagation = txInfo.getPropagation(); SuspendedResourcesHolder suspendedResourcesHolder = null ; try { switch (propagation) { case NOT_SUPPORTED: if (existingTransaction(tx)) { suspendedResourcesHolder = tx.suspend(); } return business.execute(); case REQUIRES_NEW: if (existingTransaction(tx)) { suspendedResourcesHolder = tx.suspend(); tx = GlobalTransactionContext.createNew(); } break ; case SUPPORTS: if (notExistingTransaction(tx)) { return business.execute(); } break ; case REQUIRED: break ; case NEVER: if (existingTransaction(tx)) { throw new TransactionException ( String.format("Existing transaction found for transaction marked with propagation 'never', xid = %s" , tx.getXid())); } else { return business.execute(); } case MANDATORY: if (notExistingTransaction(tx)) { throw new TransactionException ("No existing transaction found for transaction marked with propagation 'mandatory'" ); } break ; default : throw new TransactionException ("Not Supported Propagation:" + propagation); } if (tx == null ) { tx = GlobalTransactionContext.createNew(); } GlobalLockConfig previousConfig = replaceGlobalLockConfig(txInfo); try { beginTransaction(txInfo, tx); Object rs; try { rs = business.execute(); } catch (Throwable ex) { completeTransactionAfterThrowing(txInfo, tx, ex); throw ex; } commitTransaction(tx); return rs; } finally { resumeGlobalLockConfig(previousConfig); triggerAfterCompletion(); cleanUp(); } } finally { if (suspendedResourcesHolder != null ) { tx.resume(suspendedResourcesHolder); } } }
数据源代理 state对DataSource
(这里有点小区别,1.4.2之前的版本SeataDataSourceConfiguration是SeataAutoConfiguration类中的一个静态内部类,这个类往容器中注入了一个SeataDataSourceBeanPostProcessor类型的bean,这个bean通过在数据源bean的生命周期中去直接拦截返回数据的代理类实现代理的,而1.4.2版本则移除了这个类,目的都是完成对数据源的代理,这里展示的是1.4.2之后的版本代码 )
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 @Override protected Object wrapIfNecessary (Object bean, String beanName, Object cacheKey) { if (!(bean instanceof DataSource)) { return bean; } if (!(bean instanceof SeataDataSourceProxy)) { Object enhancer = super .wrapIfNecessary(bean, beanName, cacheKey); if (bean == enhancer) { return bean; } DataSource origin = (DataSource) bean; SeataDataSourceProxy proxy = buildProxy(origin, dataSourceProxyMode); DataSourceProxyHolder.put(origin, proxy); return enhancer; } }
类型的数据源代理,DataSourceProxyHolder.put(origin, proxy)
内部其实是一个Map<DataSource, SeataDataSourceProxy>
方法,也就是构建数据源代理对象的方法,里面根据不同的模式 构建的对象有区别,这里只说官方默认的AT模式。
1 2 3 4 5 6 7 8 9 10 11 SeataDataSourceProxy buildProxy (DataSource origin, String proxyMode) { if (BranchType.AT.name().equalsIgnoreCase(proxyMode)) { return new DataSourceProxy (origin); } if (BranchType.XA.name().equalsIgnoreCase(proxyMode)) { return new DataSourceProxyXA (origin); } throw new IllegalArgumentException ("Unknown dataSourceProxyMode: " + proxyMode); }
1 2 3 4 5 @Override public ConnectionProxy getConnection () throws SQLException { Connection targetConnection = targetDataSource.getConnection(); return new ConnectionProxy (this , targetConnection); }
1 2 3 4 5 @Override public PreparedStatement prepareStatement (String sql) throws SQLException { return new PreparedStatementProxy (this , targetPreparedStatement, sql); }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 @Override public boolean execute () throws SQLException { return ExecuteTemplate.execute(this , (statement, args) -> statement.execute()); } @Override public ResultSet executeQuery () throws SQLException { return ExecuteTemplate.execute(this , (statement, args) -> statement.executeQuery()); } @Override public int executeUpdate () throws SQLException { return ExecuteTemplate.execute(this , (statement, args) -> statement.executeUpdate()); }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 public static <T, S extends Statement > T execute (List<SQLRecognizer> sqlRecognizers, StatementProxy<S> statementProxy, StatementCallback<T, S> statementCallback, Object... args) throws SQLException { if (CollectionUtils.isEmpty(sqlRecognizers)) { executor = new PlainExecutor <>(statementProxy, statementCallback); } else { if (sqlRecognizers.size() == 1 ) { SQLRecognizer sqlRecognizer = sqlRecognizers.get(0 ); switch (sqlRecognizer.getSQLType()) { case INSERT: executor = EnhancedServiceLoader.load(InsertExecutor.class, dbType, new Class []{StatementProxy.class, StatementCallback.class, SQLRecognizer.class}, new Object []{statementProxy, statementCallback, sqlRecognizer}); break ; case UPDATE: executor = new UpdateExecutor <>(statementProxy, statementCallback, sqlRecognizer); break ; case DELETE: executor = new DeleteExecutor <>(statementProxy, statementCallback, sqlRecognizer); break ; case SELECT_FOR_UPDATE: executor = new SelectForUpdateExecutor <>(statementProxy, statementCallback, sqlRecognizer); break ; case INSERT_ON_DUPLICATE_UPDATE: switch (dbType) { case JdbcConstants.MYSQL: case JdbcConstants.MARIADB: executor = new MySQLInsertOnDuplicateUpdateExecutor (statementProxy, statementCallback, sqlRecognizer); break ; default : throw new NotSupportYetException (dbType + " not support to INSERT_ON_DUPLICATE_UPDATE" ); } break ; default : executor = new PlainExecutor <>(statementProxy, statementCallback); break ; } } else { executor = new MultiExecutor <>(statementProxy, statementCallback, sqlRecognizers); } } T rs; try { rs = executor.execute(args); } catch (Throwable ex) { if (!(ex instanceof SQLException)) { ex = new SQLException (ex); } throw (SQLException) ex; } return rs; }
1 2 3 4 5 6 7 8 9 10 11 @Override public T doExecute (Object... args) throws Throwable { AbstractConnectionProxy connectionProxy = statementProxy.getConnectionProxy(); if (connectionProxy.getAutoCommit()) { return executeAutoCommitTrue(args); } else { return executeAutoCommitFalse(args); } }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 protected T executeAutoCommitTrue (Object[] args) throws Throwable { ConnectionProxy connectionProxy = statementProxy.getConnectionProxy(); try { connectionProxy.changeAutoCommit(); return new LockRetryPolicy (connectionProxy).execute(() -> { T result = executeAutoCommitFalse(args); connectionProxy.commit(); return result; }); } catch (Exception e) { LOGGER.error("execute executeAutoCommitTrue error:{}" , e.getMessage(), e); if (!LockRetryPolicy.isLockRetryPolicyBranchRollbackOnConflict()) { connectionProxy.getTargetConnection().rollback(); } throw e; } finally { connectionProxy.getContext().reset(); connectionProxy.setAutoCommit(true ); } }
产生UndoLog 进入executeAutoCommitFalse
1 2 3 4 5 6 7 8 9 10 11 protected T executeAutoCommitFalse (Object[] args) throws Exception { TableRecords beforeImage = beforeImage(); T result = statementCallback.execute(statementProxy.getTargetStatement(), args); TableRecords afterImage = afterImage(beforeImage); prepareUndoLog(beforeImage, afterImage); return result; }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 @Override public void commit () throws SQLException { try { lockRetryPolicy.execute(() -> { doCommit(); return null ; }); } catch (SQLException e) { if (targetConnection != null && !getAutoCommit() && !getContext().isAutoCommitChanged()) { rollback(); } throw e; } catch (Exception e) { throw new SQLException (e); } }
1 2 3 4 5 6 7 8 9 10 private void doCommit () throws SQLException { if (context.inGlobalTransaction()) { processGlobalTransactionCommit(); } else if (context.isGlobalLockRequire()) { processLocalCommitWithGlobalLocks(); } else { targetConnection.commit(); } }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 private void processGlobalTransactionCommit () throws SQLException { try { register(); } catch (TransactionException e) { recognizeLockKeyConflictException(e, context.buildLockKeys()); } try { UndoLogManagerFactory.getUndoLogManager(this .getDbType()).flushUndoLogs(this ); targetConnection.commit(); } catch (Throwable ex) { LOGGER.error("process connectionProxy commit error: {}" , ex.getMessage(), ex); report(false ); throw new SQLException (ex); } if (IS_REPORT_SUCCESS_ENABLE) { report(true ); } context.reset(); }