TCC-Transaction源码解析之事务补偿
上文中,我们对TCC-Transaction的事务提交阶段主流程进行了详细的解析。上文遗留了一个问题:
如果在事务执行过程中出现了down机、停电、网络异常等情况,事务一致性就无法得到保证,此时应该怎么做?
这个问题在TCC-Transaction框架中是通过定时任务+状态机方式实现的,这种方式也是我们日常开发中经常使用的一种策略。本文,我们对事务恢复主逻辑进行分析,使TCC-Transaction源码解析形成一个闭环。
RecoverScheduledJob
事务补偿定时任务的核心逻辑由tcc-transaction-spring模块下的RecoverScheduledJob.java完成,对失败的confirm、cancel操作进行失败补偿操作。代码逻辑如下:
public class RecoverScheduledJob {
private TransactionRecovery transactionRecovery;
private TransactionConfigurator transactionConfigurator;
private Scheduler scheduler;
// 通过init方法启动quartz定时任务
public void init() {
try {
MethodInvokingJobDetailFactoryBean jobDetail = new MethodInvokingJobDetailFactoryBean();
jobDetail.setTargetObject(transactionRecovery);
jobDetail.setTargetMethod("startRecover");
jobDetail.setName("transactionRecoveryJob");
jobDetail.setConcurrent(false);
jobDetail.afterPropertiesSet();
CronTriggerFactoryBean cronTrigger = new CronTriggerFactoryBean();
cronTrigger.setBeanName("transactionRecoveryCronTrigger");
cronTrigger.setCronExpression(transactionConfigurator.getRecoverConfig().getCronExpression());
cronTrigger.setJobDetail(jobDetail.getObject());
cronTrigger.afterPropertiesSet();
scheduler.scheduleJob(jobDetail.getObject(), cronTrigger.getObject());
scheduler.start();
} catch (Exception e) {
throw new SystemException(e);
}
}
...省略getter setter...
}
通过quartz进行任务调度,通过RecoverConfig中的配置初始化定时任务,通过MethodInvokingJobDetailFactoryBean的targetObject与targetMethod指定了定时任务具体执行类及具体方法。
我们注意以下代码,它将任务的核心逻辑设置到jobDetail中
jobDetail.setTargetObject(transactionRecovery);
jobDetail.setTargetMethod("startRecover");
最终通过quartz的Scheduler对任务发起调度,这里通过cron表达式触发器进行调度。
TransactionRecovery
TransactionRecovery是TCC-Transaction框架中事务补偿的核心实现
public class TransactionRecovery {
......
private TransactionConfigurator transactionConfigurator;
通过startRecover开启事务补偿重试任务。
public void startRecover() {
// 获取待补偿的任务列表
List<Transaction> transactions = loadErrorTransactions();
// 对待补偿的任务列表执行补偿操作
recoverErrorTransactions(transactions);
}
首先通过loadErrorTransactions()获取待补偿的任务列表:
private List<Transaction> loadErrorTransactions() {
long currentTimeInMillis = Calendar.getInstance().getTimeInMillis();
// 获取配置的具体事务持久化策略,如:基于数据库、zk、redis等
TransactionRepository transactionRepository = transactionConfigurator.getTransactionRepository();
// 获取重试策略,包含:cron表达式,最大重试次数等
RecoverConfig recoverConfig = transactionConfigurator.getRecoverConfig();
// 获取在RecoverDuration间隔之前未完成的transaction列表,查询方式依具体的持久化策略而定
return transactionRepository
.findAllUnmodifiedSince(
new Date(currentTimeInMillis
- recoverConfig.getRecoverDuration() * 1000));
}
接着看一下recoverErrorTransactions方法逻辑,对待补偿的任务列表进行补偿操作。
private void recoverErrorTransactions(List<Transaction> transactions) {
// 对需要进行重试的事务列表进行迭代
for (Transaction transaction : transactions) {
// 如果重试次数超过配置的最大重试次数,则打印异常日志;跳过不再重试
if (transaction.getRetriedCount() >
transactionConfigurator.getRecoverConfig().getMaxRetryCount()) {
...省略异常日志...
continue;
}
// 如果是分支事务,并且超过最长超时时间则忽略不再重试
if (transaction.getTransactionType().equals(TransactionType.BRANCH)
&& (transaction.getCreateTime().getTime()
transactionConfigurator.getRecoverConfig().getMaxRetryCount()
*transactionConfigurator.getRecoverConfig().getRecoverDuration()
* 1000
> System.currentTimeMillis())) {
continue;
}
try {
// 增加重试次数
transaction.addRetriedCount();
// 如果当前事务状态为CONFIRMING
if (transaction.getStatus().equals(TransactionStatus.CONFIRMING)) {
// 设置事务状态为CONFIRMING
transaction.changeStatus(TransactionStatus.CONFIRMING);
// 修改当前事务状态
transactionConfigurator.getTransactionRepository().update(transaction);
// 提交事务
transaction.commit();
// 删除事务记录(删除失败则不作处理)
transactionConfigurator.getTransactionRepository().delete(transaction);
// 如果事务状态为CANCELLING或者事务为根事务(根事务没提交)
} else if (transaction.getStatus().equals(TransactionStatus.CANCELLING)
|| transaction.getTransactionType().equals(TransactionType.ROOT)) {
// 设置事务状态为CANCELLING
transaction.changeStatus(TransactionStatus.CANCELLING);
// 修改当前事务状态
transactionConfigurator.getTransactionRepository().update(transaction);
// 回滚事务
transaction.rollback();
// 删除事务记录(删除失败则不作处理)
transactionConfigurator.getTransactionRepository().delete(transaction);
}
} catch (Throwable throwable) {
...省略异常日志...
}
}
}
public void setTransactionConfigurator(TransactionConfigurator transactionConfigurator) {
this.transactionConfigurator = transactionConfigurator;
}
}
上述注释已经很明确的指出了事务补偿job的核心逻辑,就不再赘述。我们总结一下:
对于trying阶段的异常事务,不会进行重试;而是会触发canceling操作;
对于confirming、canceling阶段的异常事务,定时进行重试补偿,尽最大努力去尝试提交事务,如果达到了最大重试次数还是处理失败则不再处理。这种极端的情况需要人工进行介入。
TCC-Transaction提供的后台server模块允许我们对事务进行手工补偿操作,这极大的提高了框架的可靠性以及易用性。
小结
本文我们主要对TCC-Transaction的事务补偿阶段逻辑进行了分析。TCC-Transaction框架通过定时补偿,对异常事务进行处理,保证了分布式事务的最终一致性。
通过这一过程,我们能够看出,TCC-Transaction框架本质上也是柔性事务的一种解决方案,如果仔细分析,它也是满足BASE原则的。
版权声明:
原创不易,洗文可耻。除非注明,本博文章均为原创,转载请以链接形式标明本文地址。