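Downloading the source code

GitHub repository: https://github.com/seata/seata

After downloading the source, open it in IDEA. The project contains many modules. Since Seata officially ships a Spring Boot starter, finding the corresponding spring.factories file leads straight to the entry point of the source.

Source entry point

In the project, locate the seata-spring-boot-starter module. Under its resources/META-INF directory there is a spring.factories file.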

# Auto Configure
org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
io.seata.spring.boot.autoconfigure.SeataDataSourceAutoConfiguration,\
io.seata.spring.boot.autoconfigure.SeataAutoConfiguration,\
io.seata.spring.boot.autoconfigure.HttpAutoConfiguration,\
io.seata.spring.boot.autoconfigure.SeataSagaAutoConfiguration

SeataAutoConfiguration

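The key class here is io.seata.spring.boot.autoconfigure.SeataAutoConfiguration, so locate it in the source.

As the code shows, its main job is to register a GlobalTransactionScanner bean in the IoC container, so the next class to look at is GlobalTransactionScanner.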

@ConditionalOnProperty(prefix = SEATA_PREFIX, name = "enabled", havingValue = "true", matchIfMissing = true)
@AutoConfigureAfter({SeataCoreAutoConfiguration.class})
public class SeataAutoConfiguration {
    private static final Logger LOGGER = LoggerFactory.getLogger(SeataAutoConfiguration.class);

    @Bean(BEAN_NAME_FAILURE_HANDLER)
    @ConditionalOnMissingBean(FailureHandler.class)
    public FailureHandler failureHandler() {
        return new DefaultFailureHandlerImpl();
    }

    @Bean
    @DependsOn({BEAN_NAME_SPRING_APPLICATION_CONTEXT_PROVIDER, BEAN_NAME_FAILURE_HANDLER})
    @ConditionalOnMissingBean(GlobalTransactionScanner.class)
    public GlobalTransactionScanner globalTransactionScanner(SeataProperties seataProperties, FailureHandler failureHandler,
            ConfigurableListableBeanFactory beanFactory,
            @Autowired(required = false) List<ScannerChecker> scannerCheckers) {
        // ....
        return new GlobalTransactionScanner(seataProperties.getApplicationId(), seataProperties.getTxServiceGroup(), failureHandler);
    }
}

GlobalTransactionScanner

public class GlobalTransactionScanner extends AbstractAutoProxyCreator
    implements ConfigurationChangeListener, InitializingBean, ApplicationContextAware, DisposableBean

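InitializingBean, ApplicationContextAware and DisposableBean are part of the Spring bean lifecycle. The InitializingBean callback mainly initializes the TM and RM clients, while the DisposableBean callback releases resources on shutdown.

GlobalTransactionScanner extends AbstractAutoProxyCreator, a key AOP component, and overrides its wrapIfNecessary method. Spring runs this method for every bean while the bean is being created, which is where a target object can be wrapped in a dynamic proxy.

As the code shows, a GlobalTransactionalInterceptor (the global transaction interceptor) is attached to the proxied methods.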

@Override
protected Object wrapIfNecessary(Object bean, String beanName, Object cacheKey) {
    // ...
    //check TCC proxy
    if (TCCBeanParserUtils.isTccAutoProxy(bean, beanName, applicationContext)) {
        // init tcc fence clean task if enable useTccFence
        TCCBeanParserUtils.initTccFenceCleanTask(TCCBeanParserUtils.getRemotingDesc(beanName), applicationContext);
        //TCC interceptor, proxy bean of sofa:reference/dubbo:reference, and LocalTCC
        interceptor = new TccActionInterceptor(TCCBeanParserUtils.getRemotingDesc(beanName));
        ConfigurationCache.addConfigListener(ConfigurationKeys.DISABLE_GLOBAL_TRANSACTION,
            (ConfigurationChangeListener)interceptor);
    } else {
        // ...
        // The key part: create the shared interceptor on first use
        if (globalTransactionalInterceptor == null) {
            globalTransactionalInterceptor = new GlobalTransactionalInterceptor(failureHandlerHook);
            ConfigurationCache.addConfigListener(
                ConfigurationKeys.DISABLE_GLOBAL_TRANSACTION,
                (ConfigurationChangeListener)globalTransactionalInterceptor);
        }
        interceptor = globalTransactionalInterceptor;
    }
    // ....
}
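To make the wrapping idea concrete, here is a minimal sketch that uses only the JDK's java.lang.reflect.Proxy rather than Spring AOP. Every name in it (DemoGlobalTx, DemoOrderService, ProxySketch) is hypothetical and stands in for the Seata and Spring classes; it only illustrates the idea of proxying a bean when one of its methods is annotated and running extra logic around the annotated call.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.util.ArrayList;
import java.util.List;

// Hypothetical marker annotation standing in for @GlobalTransactional.
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
@interface DemoGlobalTx {
}

interface DemoOrderService {
    @DemoGlobalTx
    String create(String item);

    String query(String item);
}

class DemoOrderServiceImpl implements DemoOrderService {
    @Override
    public String create(String item) {
        return "created " + item;
    }

    @Override
    public String query(String item) {
        return "found " + item;
    }
}

class ProxySketch {
    // Records what the interceptor did so the behaviour can be inspected.
    static final List<String> EVENTS = new ArrayList<>();

    // Returns a proxy only if some interface method carries the annotation;
    // otherwise the original object is returned untouched.
    @SuppressWarnings("unchecked")
    static <T> T wrapIfNecessary(T target, Class<T> iface) {
        boolean needsProxy = false;
        for (Method m : iface.getMethods()) {
            if (m.isAnnotationPresent(DemoGlobalTx.class)) {
                needsProxy = true;
                break;
            }
        }
        if (!needsProxy) {
            return target;
        }
        return (T) Proxy.newProxyInstance(iface.getClassLoader(), new Class<?>[] {iface},
            (proxy, method, methodArgs) -> {
                if (!method.isAnnotationPresent(DemoGlobalTx.class)) {
                    return call(method, target, methodArgs);
                }
                EVENTS.add("begin");
                try {
                    Object result = call(method, target, methodArgs);
                    EVENTS.add("commit");
                    return result;
                } catch (Throwable t) {
                    EVENTS.add("rollback");
                    throw t;
                }
            });
    }

    // Invokes the target reflectively and unwraps the original exception.
    private static Object call(Method method, Object target, Object[] methodArgs) throws Throwable {
        try {
            return method.invoke(target, methodArgs);
        } catch (InvocationTargetException e) {
            throw e.getCause();
        }
    }

    public static void main(String[] args) {
        DemoOrderService service = wrapIfNecessary(new DemoOrderServiceImpl(), DemoOrderService.class);
        System.out.println(service.create("book"));
        System.out.println(service.query("book"));
        System.out.println(EVENTS);
    }
}
```

Only the annotated create call is surrounded by begin and commit events; query passes straight through to the target.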

GlobalTransactionalInterceptor

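Open GlobalTransactionalInterceptor. Since it is an interceptor, the method to read is invoke. It checks whether the method carries a @GlobalTransactional annotation or whether an AspectTransactional is already set on the interceptor; if so, it calls handleGlobalTransaction to enter the global transaction flow. A method annotated with @GlobalLock is routed to handleGlobalLock instead, and everything else simply proceeds.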

@Override
public Object invoke(final MethodInvocation methodInvocation) throws Throwable {
    Class<?> targetClass =
        methodInvocation.getThis() != null ? AopUtils.getTargetClass(methodInvocation.getThis()) : null;
    Method specificMethod = ClassUtils.getMostSpecificMethod(methodInvocation.getMethod(), targetClass);
    if (specificMethod != null && !specificMethod.getDeclaringClass().equals(Object.class)) {
        final Method method = BridgeMethodResolver.findBridgedMethod(specificMethod);
        final GlobalTransactional globalTransactionalAnnotation =
            getAnnotation(method, targetClass, GlobalTransactional.class);
        final GlobalLock globalLockAnnotation = getAnnotation(method, targetClass, GlobalLock.class);
        boolean localDisable = disable || (ATOMIC_DEGRADE_CHECK.get() && degradeNum >= degradeCheckAllowTimes);
        if (!localDisable) {
            if (globalTransactionalAnnotation != null || this.aspectTransactional != null) {
                AspectTransactional transactional;
                if (globalTransactionalAnnotation != null) {
                    transactional = new AspectTransactional(globalTransactionalAnnotation.timeoutMills(),
                        globalTransactionalAnnotation.name(), globalTransactionalAnnotation.rollbackFor(),
                        globalTransactionalAnnotation.rollbackForClassName(),
                        globalTransactionalAnnotation.noRollbackFor(),
                        globalTransactionalAnnotation.noRollbackForClassName(),
                        globalTransactionalAnnotation.propagation(),
                        globalTransactionalAnnotation.lockRetryInterval(),
                        globalTransactionalAnnotation.lockRetryTimes());
                } else {
                    transactional = this.aspectTransactional;
                }
                return handleGlobalTransaction(methodInvocation, transactional);
            } else if (globalLockAnnotation != null) {
                return handleGlobalLock(methodInvocation, globalLockAnnotation);
            }
        }
    }
    return methodInvocation.proceed();
}
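The routing decision in invoke depends on where the annotation is found. The sketch below shows one plausible lookup order, assuming a method-level annotation takes precedence over a class-level one; that order, the DemoTx annotation and the service classes are all assumptions of this example, not taken from the Seata source shown above.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;

// Hypothetical annotation usable on both methods and classes.
@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.METHOD, ElementType.TYPE})
@interface DemoTx {
    String name() default "";
}

@DemoTx(name = "class-level")
class DemoAccountService {
    @DemoTx(name = "method-level")
    public void transfer() {
    }

    public void audit() {
    }
}

class DemoPlainService {
    public void ping() {
    }
}

class AnnotationLookupSketch {
    // Method-level annotation first, then class-level, otherwise null.
    static DemoTx find(Method method, Class<?> targetClass) {
        DemoTx onMethod = method == null ? null : method.getAnnotation(DemoTx.class);
        if (onMethod != null) {
            return onMethod;
        }
        return targetClass == null ? null : targetClass.getAnnotation(DemoTx.class);
    }

    // Describes which path an interceptor would take for the named no-arg method.
    static String route(Class<?> targetClass, String methodName) {
        Method method;
        try {
            method = targetClass.getMethod(methodName);
        } catch (NoSuchMethodException e) {
            throw new IllegalArgumentException("no such method: " + methodName, e);
        }
        DemoTx tx = find(method, targetClass);
        return tx == null ? "proceed" : "handleGlobalTransaction(" + tx.name() + ")";
    }

    public static void main(String[] args) {
        System.out.println(route(DemoAccountService.class, "transfer"));
        System.out.println(route(DemoAccountService.class, "audit"));
        System.out.println(route(DemoPlainService.class, "ping"));
    }
}
```

Here transfer resolves to the method-level annotation, audit falls back to the class-level one, and ping has neither, so the call would just proceed.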

Building the transaction info object

The call to focus on is transactionalTemplate.execute(). The TransactionalExecutor passed to it mainly exists to build the transaction info object.

Object handleGlobalTransaction(final MethodInvocation methodInvocation,
    final AspectTransactional aspectTransactional) throws Throwable {
    boolean succeed = true;
    try {
        return transactionalTemplate.execute(new TransactionalExecutor() {
            @Override
            public Object execute() throws Throwable {
                return methodInvocation.proceed();
            }

            // Resolve the transaction name, falling back to the method signature
            public String name() {
                String name = aspectTransactional.getName();
                if (!StringUtils.isNullOrEmpty(name)) {
                    return name;
                }
                return formatMethod(methodInvocation.getMethod());
            }

            @Override
            public TransactionInfo getTransactionInfo() {
                // reset the value of timeout
                int timeout = aspectTransactional.getTimeoutMills();
                if (timeout <= 0 || timeout == DEFAULT_GLOBAL_TRANSACTION_TIMEOUT) {
                    timeout = defaultGlobalTransactionTimeout;
                }

                // Build the transaction info object
                TransactionInfo transactionInfo = new TransactionInfo();
                transactionInfo.setTimeOut(timeout);
                transactionInfo.setName(name());
                transactionInfo.setPropagation(aspectTransactional.getPropagation());
                transactionInfo.setLockRetryInterval(aspectTransactional.getLockRetryInterval());
                transactionInfo.setLockRetryTimes(aspectTransactional.getLockRetryTimes());
                Set<RollbackRule> rollbackRules = new LinkedHashSet<>();
                for (Class<?> rbRule : aspectTransactional.getRollbackFor()) {
                    rollbackRules.add(new RollbackRule(rbRule));
                }
                for (String rbRule : aspectTransactional.getRollbackForClassName()) {
                    rollbackRules.add(new RollbackRule(rbRule));
                }
                for (Class<?> rbRule : aspectTransactional.getNoRollbackFor()) {
                    rollbackRules.add(new NoRollbackRule(rbRule));
                }
                for (String rbRule : aspectTransactional.getNoRollbackForClassName()) {
                    rollbackRules.add(new NoRollbackRule(rbRule));
                }
                transactionInfo.setRollbackRules(rollbackRules);
                return transactionInfo;
            }
        });
    } catch (TransactionalExecutor.ExecutionException e) {
        // ...
    }
}
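The rollback and no-rollback rules collected above are later consulted when the business method throws. As an illustration only, the sketch below shows one common way such rules can be evaluated: the rule whose exception type is closest to the thrown exception in the class hierarchy wins. The class RollbackRuleSketch and its methods are hypothetical, and what happens when no rule matches is deliberately left to the caller, since the source above does not show it.

```java
import java.util.LinkedHashMap;
import java.util.Map;

class RollbackRuleSketch {
    enum Decision { ROLLBACK, NO_ROLLBACK, NO_MATCH }

    // Rule table: exception type -> whether a match means "roll back".
    private final Map<Class<? extends Throwable>, Boolean> rules = new LinkedHashMap<>();

    RollbackRuleSketch rollbackFor(Class<? extends Throwable> type) {
        rules.put(type, Boolean.TRUE);
        return this;
    }

    RollbackRuleSketch noRollbackFor(Class<? extends Throwable> type) {
        rules.put(type, Boolean.FALSE);
        return this;
    }

    // Number of superclass hops from the thrown type up to ruleType, or -1 if unrelated.
    static int depth(Class<?> thrown, Class<?> ruleType) {
        int hops = 0;
        for (Class<?> c = thrown; c != null; c = c.getSuperclass()) {
            if (c == ruleType) {
                return hops;
            }
            hops++;
        }
        return -1;
    }

    // The rule closest to the thrown exception's own type wins.
    Decision decide(Throwable ex) {
        int best = Integer.MAX_VALUE;
        Decision decision = Decision.NO_MATCH;
        for (Map.Entry<Class<? extends Throwable>, Boolean> rule : rules.entrySet()) {
            int d = depth(ex.getClass(), rule.getKey());
            if (d >= 0 && d < best) {
                best = d;
                decision = rule.getValue() ? Decision.ROLLBACK : Decision.NO_ROLLBACK;
            }
        }
        return decision;
    }

    public static void main(String[] args) {
        RollbackRuleSketch rules = new RollbackRuleSketch()
            .rollbackFor(RuntimeException.class)
            .noRollbackFor(IllegalArgumentException.class);
        System.out.println(rules.decide(new IllegalStateException("x")));
        System.out.println(rules.decide(new NumberFormatException("x")));
        System.out.println(rules.decide(new java.io.IOException("x")));
    }
}
```

With these two rules, an IllegalStateException matches only the RuntimeException rule and rolls back, while a NumberFormatException is closer to IllegalArgumentException and does not.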

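Core source code

execute is the core of the flow. It first looks up the current global transaction (identified by its xid), then handles the call according to the configured propagation behavior, and finally begins the global transaction and commits it or rolls it back.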

public Object execute(TransactionalExecutor business) throws Throwable {
    // 1. Get transactionInfo
    // This is the object built by getTransactionInfo() in the previous step
    TransactionInfo txInfo = business.getTransactionInfo();
    if (txInfo == null) {
        throw new ShouldNeverHappenException("transactionInfo does not exist");
    }
    // 1.1 Get current transaction, if not null, the tx role is 'GlobalTransactionRole.Participant'.
    GlobalTransaction tx = GlobalTransactionContext.getCurrent();

    // 1.2 Handle the transaction propagation.
    // Each propagation behavior is handled differently below
    Propagation propagation = txInfo.getPropagation();
    SuspendedResourcesHolder suspendedResourcesHolder = null;
    try {
        switch (propagation) {
            case NOT_SUPPORTED:
                // If transaction is existing, suspend it.
                if (existingTransaction(tx)) {
                    suspendedResourcesHolder = tx.suspend();
                }
                // Execute without transaction and return.
                return business.execute();
            case REQUIRES_NEW:
                // If transaction is existing, suspend it, and then begin new transaction.
                if (existingTransaction(tx)) {
                    suspendedResourcesHolder = tx.suspend();
                    tx = GlobalTransactionContext.createNew();
                }
                // Continue and execute with new transaction
                break;
            case SUPPORTS:
                // If transaction is not existing, execute without transaction.
                if (notExistingTransaction(tx)) {
                    return business.execute();
                }
                // Continue and execute with new transaction
                break;
            case REQUIRED:
                // If current transaction is existing, execute with current transaction,
                // else continue and execute with new transaction.
                break;
            case NEVER:
                // If transaction is existing, throw exception.
                if (existingTransaction(tx)) {
                    throw new TransactionException(
                        String.format("Existing transaction found for transaction marked with propagation 'never', xid = %s"
                            , tx.getXid()));
                } else {
                    // Execute without transaction and return.
                    return business.execute();
                }
            case MANDATORY:
                // If transaction is not existing, throw exception.
                if (notExistingTransaction(tx)) {
                    throw new TransactionException("No existing transaction found for transaction marked with propagation 'mandatory'");
                }
                // Continue and execute with current transaction.
                break;
            default:
                throw new TransactionException("Not Supported Propagation:" + propagation);
        }

        // 1.3 If null, create new transaction with role 'GlobalTransactionRole.Launcher'.
        if (tx == null) {
            tx = GlobalTransactionContext.createNew();
        }

        // set current tx config to holder
        GlobalLockConfig previousConfig = replaceGlobalLockConfig(txInfo);

        try {
            // 2. If the tx role is 'GlobalTransactionRole.Launcher', send the request of beginTransaction to TC,
            //    else do nothing. Of course, the hooks will still be triggered.
            // Begin the transaction
            beginTransaction(txInfo, tx);

            Object rs;
            try {
                // Do Your Business
                rs = business.execute();
            } catch (Throwable ex) {
                // 3. The needed business exception to rollback.
                // On a business exception, decide whether it requires a rollback,
                // then either roll back or ignore it
                completeTransactionAfterThrowing(txInfo, tx, ex);
                throw ex;
            }

            // 4. everything is fine, commit.
            commitTransaction(tx);

            return rs;
        } finally {
            //5. clear
            // Clean up resources
            resumeGlobalLockConfig(previousConfig);
            triggerAfterCompletion();
            cleanUp();
        }
    } finally {
        // If the transaction is suspended, resume it.
        if (suspendedResourcesHolder != null) {
            tx.resume(suspendedResourcesHolder);
        }
    }
}

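The begin, commit and rollback operations here all end up in the syncCall method of DefaultTransactionManager, which talks to the TC (Transaction Coordinator) over Netty. Beginning a transaction returns a global transaction ID (xid), which is then bound to the current transaction locally.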
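The propagation handling in execute can be hard to follow on first reading, so here is a much reduced sketch of the same control flow. It is not Seata code: a ThreadLocal string plays the role of the bound xid, no TC is contacted, and the class PropagationSketch with its methods is hypothetical.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

class PropagationSketch {
    enum Propagation { REQUIRED, REQUIRES_NEW, NOT_SUPPORTED, SUPPORTS, NEVER, MANDATORY }

    // Stand-in for the xid bound to the current thread; null means "no global transaction".
    private static final ThreadLocal<String> CURRENT_XID = new ThreadLocal<>();
    private static final AtomicInteger COUNTER = new AtomicInteger();

    static String currentXid() {
        return CURRENT_XID.get();
    }

    static <T> T execute(Propagation propagation, Supplier<T> business) {
        String existing = CURRENT_XID.get();
        String suspended = null;
        try {
            switch (propagation) {
                case NOT_SUPPORTED:
                    // Suspend any existing transaction and run outside of it.
                    suspended = existing;
                    CURRENT_XID.remove();
                    return business.get();
                case REQUIRES_NEW:
                    // Suspend any existing transaction; a new one is started below.
                    suspended = existing;
                    CURRENT_XID.remove();
                    break;
                case SUPPORTS:
                    if (existing == null) {
                        return business.get();
                    }
                    break;
                case REQUIRED:
                    break;
                case NEVER:
                    if (existing != null) {
                        throw new IllegalStateException("existing transaction found, xid = " + existing);
                    }
                    return business.get();
                case MANDATORY:
                    if (existing == null) {
                        throw new IllegalStateException("no existing transaction found");
                    }
                    break;
                default:
                    throw new IllegalArgumentException("unsupported propagation: " + propagation);
            }
            // Join the bound transaction, or act as launcher and start a new one.
            boolean launcher = CURRENT_XID.get() == null;
            if (launcher) {
                CURRENT_XID.set("xid-" + COUNTER.incrementAndGet());
            }
            try {
                return business.get();
            } finally {
                if (launcher) {
                    CURRENT_XID.remove();
                }
            }
        } finally {
            // Resume the suspended transaction, if there was one.
            if (suspended != null) {
                CURRENT_XID.set(suspended);
            }
        }
    }

    public static void main(String[] args) {
        execute(Propagation.REQUIRED, () -> {
            System.out.println("outer:        " + currentXid());
            System.out.println("required:     " + execute(Propagation.REQUIRED, PropagationSketch::currentXid));
            System.out.println("requires_new: " + execute(Propagation.REQUIRES_NEW, PropagationSketch::currentXid));
            System.out.println("restored:     " + currentXid());
            return null;
        });
    }
}
```

A nested REQUIRED call sees the outer xid, a nested REQUIRES_NEW call sees a fresh one, and the outer xid is bound again once the nested call returns.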

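Data source proxy

Seata proxies DataSource, Statement, Connection and related objects, so it can do extra work before and after a statement runs, such as generating the before image and the after image.

SeataDataSourceAutoConfiguration shows that data source proxying is carried out by SeataAutoDataSourceProxyCreator.

There is a small version difference here. Before 1.4.2, SeataDataSourceConfiguration was a static nested class of SeataAutoConfiguration that registered a SeataDataSourceBeanPostProcessor bean. That bean intercepted data source beans during their lifecycle and returned the proxy directly. Version 1.4.2 removed the class, but the goal is the same: proxy the data source. The code shown here is from 1.4.2 and later.

SeataAutoDataSourceProxyCreator also extends AbstractAutoProxyCreator to implement dynamic proxying, so look at its overridden wrapIfNecessary method: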

@Override
protected Object wrapIfNecessary(Object bean, String beanName, Object cacheKey) {
    // we only care DataSource bean
    if (!(bean instanceof DataSource)) {
        return bean;
    }
    // when this bean is just a simple DataSource, not SeataDataSourceProxy
    if (!(bean instanceof SeataDataSourceProxy)) {
        Object enhancer = super.wrapIfNecessary(bean, beanName, cacheKey);
        // this mean this bean is either excluded by user or had been proxy before
        if (bean == enhancer) {
            return bean;
        }
        // else, build proxy, put <origin, proxy> to holder and return enhancer
        DataSource origin = (DataSource) bean;
        SeataDataSourceProxy proxy = buildProxy(origin, dataSourceProxyMode);
        DataSourceProxyHolder.put(origin, proxy);
        return enhancer;
    }
    // ...
}

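As the code shows, a SeataDataSourceProxy is built for the data source. DataSourceProxyHolder.put(origin, proxy) is backed by a Map<DataSource, SeataDataSourceProxy> that records which proxy belongs to which original data source.

The buildProxy method, which constructs the data source proxy, returns a different object depending on the mode. Only AT mode, the official default, is covered here.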

SeataDataSourceProxy buildProxy(DataSource origin, String proxyMode) {
    if (BranchType.AT.name().equalsIgnoreCase(proxyMode)) {
        // Data source proxy used in AT mode
        return new DataSourceProxy(origin);
    }
    if (BranchType.XA.name().equalsIgnoreCase(proxyMode)) {
        // Data source proxy used in XA mode
        return new DataSourceProxyXA(origin);
    }
    throw new IllegalArgumentException("Unknown dataSourceProxyMode: " + proxyMode);
}

DataSourceProxy overrides getConnection so that the Connection it returns is also a proxy.

@Override
public ConnectionProxy getConnection() throws SQLException {
    Connection targetConnection = targetDataSource.getConnection();
    return new ConnectionProxy(this, targetConnection);
}

ConnectionProxy extends AbstractConnectionProxy, which implements the Connection interface. Its prepareStatement method is what returns a proxied PreparedStatement.

@Override
public PreparedStatement prepareStatement(String sql) throws SQLException {
    // ...
    return new PreparedStatementProxy(this, targetPreparedStatement, sql);
}

PreparedStatementProxy overrides execute, executeQuery, executeUpdate and similar methods, all of which delegate to ExecuteTemplate.execute.

@Override
public boolean execute() throws SQLException {
    return ExecuteTemplate.execute(this, (statement, args) -> statement.execute());
}

@Override
public ResultSet executeQuery() throws SQLException {
    return ExecuteTemplate.execute(this, (statement, args) -> statement.executeQuery());
}

@Override
public int executeUpdate() throws SQLException {
    return ExecuteTemplate.execute(this, (statement, args) -> statement.executeUpdate());
}
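The layering described so far (a proxied data source hands out proxied connections, which hand out proxied statements) can be sketched without JDBC at all. The three Mini interfaces below are hypothetical, heavily reduced stand-ins for DataSource, Connection and PreparedStatement; the sketch only shows how each layer wraps what the next one returns, so that extra work ends up surrounding the real statement execution.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, heavily reduced stand-ins for the JDBC interfaces.
interface MiniStatement {
    int executeUpdate();
}

interface MiniConnection {
    MiniStatement prepareStatement(String sql);
}

interface MiniDataSource {
    MiniConnection getConnection();
}

class ProxyChainSketch {
    // Records the extra work done around each execution.
    static final List<String> LOG = new ArrayList<>();

    // Wraps a data source so every connection it hands out is itself a proxy.
    static MiniDataSource proxyDataSource(MiniDataSource target) {
        return () -> proxyConnection(target.getConnection());
    }

    // Wraps a connection so every statement it prepares is itself a proxy.
    static MiniConnection proxyConnection(MiniConnection target) {
        return sql -> proxyStatement(target.prepareStatement(sql), sql);
    }

    // Wraps a statement so extra work runs before and after the real execution.
    static MiniStatement proxyStatement(MiniStatement target, String sql) {
        return () -> {
            LOG.add("before " + sql);
            int rows = target.executeUpdate();
            LOG.add("after " + sql);
            return rows;
        };
    }

    public static void main(String[] args) {
        // A fake driver: every update reports one affected row.
        MiniDataSource raw = () -> sql -> () -> 1;
        MiniDataSource proxied = proxyDataSource(raw);
        int rows = proxied.getConnection().prepareStatement("update t set a = 1").executeUpdate();
        System.out.println(rows);
        System.out.println(LOG);
    }
}
```

Application code keeps calling the usual getConnection, prepareStatement and executeUpdate methods; the wrapping is invisible to it.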

Next, look at ExecuteTemplate.execute:

public static <T, S extends Statement> T execute(List<SQLRecognizer> sqlRecognizers,
                                                 StatementProxy<S> statementProxy,
                                                 StatementCallback<T, S> statementCallback,
                                                 Object... args) throws SQLException {
    // ...
    if (CollectionUtils.isEmpty(sqlRecognizers)) {
        executor = new PlainExecutor<>(statementProxy, statementCallback);
    } else {
        if (sqlRecognizers.size() == 1) {
            SQLRecognizer sqlRecognizer = sqlRecognizers.get(0);
            // Pick an executor according to the SQL type
            switch (sqlRecognizer.getSQLType()) {
                case INSERT:
                    executor = EnhancedServiceLoader.load(InsertExecutor.class, dbType,
                        new Class[]{StatementProxy.class, StatementCallback.class, SQLRecognizer.class},
                        new Object[]{statementProxy, statementCallback, sqlRecognizer});
                    break;
                case UPDATE:
                    executor = new UpdateExecutor<>(statementProxy, statementCallback, sqlRecognizer);
                    break;
                case DELETE:
                    executor = new DeleteExecutor<>(statementProxy, statementCallback, sqlRecognizer);
                    break;
                case SELECT_FOR_UPDATE:
                    executor = new SelectForUpdateExecutor<>(statementProxy, statementCallback, sqlRecognizer);
                    break;
                case INSERT_ON_DUPLICATE_UPDATE:
                    switch (dbType) {
                        case JdbcConstants.MYSQL:
                        case JdbcConstants.MARIADB:
                            executor =
                                new MySQLInsertOnDuplicateUpdateExecutor(statementProxy, statementCallback, sqlRecognizer);
                            break;
                        default:
                            throw new NotSupportYetException(dbType + " not support to INSERT_ON_DUPLICATE_UPDATE");
                    }
                    break;
                default:
                    // Plain executor, no extra handling
                    executor = new PlainExecutor<>(statementProxy, statementCallback);
                    break;
            }
        } else {
            executor = new MultiExecutor<>(statementProxy, statementCallback, sqlRecognizers);
        }
    }
    T rs;
    try {
        // Run the chosen executor
        rs = executor.execute(args);
    } catch (Throwable ex) {
        if (!(ex instanceof SQLException)) {
            // Turn other exception into SQLException
            ex = new SQLException(ex);
        }
        throw (SQLException) ex;
    }
    return rs;
}

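Step into executor.execute(args) and find the implementing class BaseTransactionalExecutor, then follow it into doExecute, whose implementation lives in AbstractDMLBaseExecutor.

AbstractDMLBaseExecutor has two branches, one for auto-commit connections and one for manual commit. The auto-commit branch also switches the connection to manual commit, but it performs the commit itself.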

@Override
public T doExecute(Object... args) throws Throwable {
    AbstractConnectionProxy connectionProxy = statementProxy.getConnectionProxy();
    if (connectionProxy.getAutoCommit()) {
        // Auto-commit path
        return executeAutoCommitTrue(args);
    } else {
        // Manual-commit path
        return executeAutoCommitFalse(args);
    }
}

Step into executeAutoCommitTrue:

protected T executeAutoCommitTrue(Object[] args) throws Throwable {
    ConnectionProxy connectionProxy = statementProxy.getConnectionProxy();
    try {
        // Switch the connection to manual commit
        connectionProxy.changeAutoCommit();
        return new LockRetryPolicy(connectionProxy).execute(() -> {
            // Run the manual-commit logic. This matters: the AT-mode undo log is produced here
            T result = executeAutoCommitFalse(args);
            // Commit the transaction
            connectionProxy.commit();
            return result;
        });
    } catch (Exception e) {
        // when exception occur in finally,this exception will lost, so just print it here
        LOGGER.error("execute executeAutoCommitTrue error:{}", e.getMessage(), e);
        if (!LockRetryPolicy.isLockRetryPolicyBranchRollbackOnConflict()) {
            connectionProxy.getTargetConnection().rollback();
        }
        throw e;
    } finally {
        connectionProxy.getContext().reset();
        connectionProxy.setAutoCommit(true);
    }
}
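The body of executeAutoCommitTrue runs inside a LockRetryPolicy. The internals of that class are not shown above, so the following is only a generic sketch of the retry idea: run a task, and if it fails with a lock conflict, wait and try again up to a fixed number of attempts. LockRetrySketch, LockConflictException and the parameter names are hypothetical.

```java
import java.util.function.Supplier;

class LockRetrySketch {
    // Hypothetical exception standing in for a global-lock conflict.
    static class LockConflictException extends RuntimeException {
        LockConflictException(String message) {
            super(message);
        }
    }

    // Runs the task up to maxAttempts times, sleeping between attempts.
    // Only lock conflicts are retried; any other exception propagates immediately.
    static <T> T executeWithRetry(Supplier<T> task, int maxAttempts, long intervalMillis) {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be at least 1");
        }
        LockConflictException last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return task.get();
            } catch (LockConflictException e) {
                last = e;
                if (attempt < maxAttempts) {
                    sleep(intervalMillis);
                }
            }
        }
        throw last;
    }

    private static void sleep(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            // Preserve the interrupt flag and stop retrying.
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting to retry", e);
        }
    }

    public static void main(String[] args) {
        int[] calls = {0};
        String result = executeWithRetry(() -> {
            if (++calls[0] < 3) {
                throw new LockConflictException("row is locked by another global transaction");
            }
            return "committed";
        }, 5, 10L);
        System.out.println(result + " after " + calls[0] + " attempts");
    }
}
```

Retrying the whole unit (statement plus commit) rather than only the commit is a design choice worth noting: each attempt starts from a clean state, so a conflict on one attempt does not leave half-applied work behind.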

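Generating the undo log

Step into executeAutoCommitFalse. It captures a before image ahead of the statement and an after image once the statement has run, which together record what the statement changed. The two images are combined into an undo log that can be used to roll the data back later. At this point the undo log is only held in memory and has not yet been persisted to the database.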

protected T executeAutoCommitFalse(Object[] args) throws Exception {
    // Before image
    TableRecords beforeImage = beforeImage();
    // Execute the statement
    T result = statementCallback.execute(statementProxy.getTargetStatement(), args);
    // After image
    TableRecords afterImage = afterImage(beforeImage);
    // Build the undo log and keep it in memory for now
    prepareUndoLog(beforeImage, afterImage);
    return result;
}
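The before image and after image idea can be tried out on an in-memory table. The sketch below is hypothetical throughout (UndoLogSketch, UndoRecord and the map-based table are not Seata types): it snapshots a row, applies an update, snapshots the row again, and later restores the before image. Refusing to restore when the row no longer matches the after image is an assumption of this sketch, added to show why both images are worth keeping.

```java
import java.util.HashMap;
import java.util.Map;

class UndoLogSketch {
    // One undo record: a row's state before and after a statement ran.
    static final class UndoRecord {
        final int id;
        final Map<String, Object> before;
        final Map<String, Object> after;

        UndoRecord(int id, Map<String, Object> before, Map<String, Object> after) {
            this.id = id;
            this.before = before;
            this.after = after;
        }
    }

    // In-memory "table": primary key -> column values.
    final Map<Integer, Map<String, Object>> table = new HashMap<>();

    void insert(int id, String column, Object value) {
        Map<String, Object> row = new HashMap<>();
        row.put(column, value);
        table.put(id, row);
    }

    // Before image, execute, after image, build the undo record.
    UndoRecord update(int id, String column, Object newValue) {
        Map<String, Object> before = new HashMap<>(table.get(id));
        table.get(id).put(column, newValue);
        Map<String, Object> after = new HashMap<>(table.get(id));
        return new UndoRecord(id, before, after);
    }

    // Restores the before image, but only if the row still matches the after image.
    boolean rollback(UndoRecord record) {
        if (!record.after.equals(table.get(record.id))) {
            return false; // the row changed since; a blind restore would lose that write
        }
        table.put(record.id, new HashMap<>(record.before));
        return true;
    }

    public static void main(String[] args) {
        UndoLogSketch db = new UndoLogSketch();
        db.insert(1, "balance", 100);
        UndoRecord undo = db.update(1, "balance", 80);
        System.out.println("after update:   " + db.table.get(1));
        System.out.println("rolled back:    " + db.rollback(undo));
        System.out.println("after rollback: " + db.table.get(1));
    }
}
```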

Back in executeAutoCommitTrue, once executeAutoCommitFalse returns, connectionProxy.commit() starts the commit phase:

@Override
public void commit() throws SQLException {
    try {
        lockRetryPolicy.execute(() -> {
            // The actual commit logic
            doCommit();
            return null;
        });
    } catch (SQLException e) {
        if (targetConnection != null && !getAutoCommit() && !getContext().isAutoCommitChanged()) {
            // Roll back on failure
            rollback();
        }
        throw e;
    } catch (Exception e) {
        throw new SQLException(e);
    }
}

Step into doCommit():

private void doCommit() throws SQLException {
    if (context.inGlobalTransaction()) {
        // Commit as a branch of a global transaction
        processGlobalTransactionCommit();
    } else if (context.isGlobalLockRequire()) {
        processLocalCommitWithGlobalLocks();
    } else {
        targetConnection.commit();
    }
}

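Step into processGlobalTransactionCommit(). This is where the undo log is actually persisted to the database, after which targetConnection.commit() commits the local transaction. The outcome is then reported to the server (the TC): a failure is always reported, and success is reported when IS_REPORT_SUCCESS_ENABLE is set.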

private void processGlobalTransactionCommit() throws SQLException {
    try {
        register();
    } catch (TransactionException e) {
        recognizeLockKeyConflictException(e, context.buildLockKeys());
    }
    try {
        // Persist the undo log to the database
        UndoLogManagerFactory.getUndoLogManager(this.getDbType()).flushUndoLogs(this);
        // Commit the local transaction
        targetConnection.commit();
    } catch (Throwable ex) {
        LOGGER.error("process connectionProxy commit error: {}", ex.getMessage(), ex);
        // On failure, report to the TC that the branch failed so it can be rolled back
        report(false);
        throw new SQLException(ex);
    }
    if (IS_REPORT_SUCCESS_ENABLE) {
        // Report to the TC that the branch succeeded
        report(true);
    }
    context.reset();
}
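The order of steps in processGlobalTransactionCommit is easy to lose in the exception handling, so here is a toy model that only records the sequence of events. BranchCommitSketch and its flags are hypothetical; unlike the real method, it returns a boolean instead of rethrowing, and nothing is sent to a TC.

```java
import java.util.ArrayList;
import java.util.List;

class BranchCommitSketch {
    // Ordered record of what happened during commit().
    final List<String> events = new ArrayList<>();
    private final boolean failOnLocalCommit;
    private final boolean reportSuccess;

    BranchCommitSketch(boolean failOnLocalCommit, boolean reportSuccess) {
        this.failOnLocalCommit = failOnLocalCommit;
        this.reportSuccess = reportSuccess;
    }

    // Returns true if the local commit went through.
    boolean commit() {
        events.add("register branch");
        try {
            // The undo log is written on the same connection as the business change,
            // so both become durable in the same local commit.
            events.add("flush undo log");
            if (failOnLocalCommit) {
                throw new IllegalStateException("simulated local commit failure");
            }
            events.add("local commit");
        } catch (RuntimeException ex) {
            events.add("report failure");
            return false;
        }
        if (reportSuccess) {
            events.add("report success");
        }
        return true;
    }

    public static void main(String[] args) {
        BranchCommitSketch ok = new BranchCommitSketch(false, true);
        ok.commit();
        System.out.println(ok.events);

        BranchCommitSketch failing = new BranchCommitSketch(true, true);
        failing.commit();
        System.out.println(failing.events);
    }
}
```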