文章目录
  1. 1. 源码下载
  2. 2. 从一个简单的样例入手
  3. 3. 解析注解@Compensable
    1. 3.1. CompensableTransactionAspect
      1. 3.1.1. CompensableTransactionInterceptor.interceptCompensableMethod(pjp);
      2. 3.1.2. rootMethodProceed(compensableMethodContext)
        1. 3.1.2.1. begin()
        2. 3.1.2.2. commit()
        3. 3.1.2.3. rollback()
        4. 3.1.2.4. cleanAfterCompletion(transaction)
      3. 3.1.3. providerMethodProceed(compensableMethodContext)
    2. 3.2. ResourceCoordinatorAspect
      1. 3.2.1. enlistParticipant(pjp)
  4. 4. 小结

TCC分布式事务解决方案在开源界的主要实现为Byte-TCC、TCC-Transaction等。其中笔者了解较多并且业界使用率较高的为TCC-Transaction这一实现。

本文,我将带领读者对TCC-Transaction这一分布式事务框架进行一次源码解析,提高自己的阅读源码的能力,也希望能够对读者深入了解TCC-Transaction有所帮助。

源码下载

源码地址为 https://github.com/changmingxie/tcc-transaction,我们关注最新版本1.2.x。

源码下载后导入IDEA中,项目目录结构如下图:

代码结构

模块及其对应职责说明如下:

tcc-transaction
    |-transaction-tcc-api                   框架API定义,公共类/核心实体定义/枚举/工具类等
    |-transaction-tcc-core                  框架核心逻辑
    |-transaction-tcc-dubbo                 框架整合Dubbo实现
    |-transaction-tcc-spring                框架Spring整合,包含获取数据库连接/切面获取等
    |-transaction-tcc-server                后台管理页面,对事务进行手工重试等
    |-transaction-tcc-unit-test             单元测试
    |-transaction-tcc-tutorial-sample       样例工程
        |-tcc-transaction-dubbo-sample
        |-tcc-transaction-http-sample
        |-tcc-transaction-sample-domain
        |-tcc-transaction-server-sample

项目核心模块为 tcc-transaction-core,它实现了TCC核心业务逻辑,也是本次源码解析的重点对象。

我们从Dubbo使用样例入手进行分析,关于如何使用TCC-Transaction的更多说明,请参照官方文档: 使用指南1.2.x

从一个简单的样例入手

我们从一个调用案例入手开始进行分析,样例路径为org.mengyun.tcctransaction.sample.dubbo.order.service.PaymentServiceImpl。

@Compensable(confirmMethod = "confirmMakePayment", cancelMethod = "cancelMakePayment", 
asyncConfirm = false, 
delayCancelExceptions = {SocketTimeoutException.class, com.alibaba.dubbo.remoting.TimeoutException.class})
public void makePayment(@UniqueIdentity String orderNo, Order order,
                 BigDecimal redPacketPayAmount, BigDecimal capitalPayAmount) {
    System.out.println("order try make payment called.time seq:" + 
                DateFormatUtils.format(Calendar.getInstance(), "yyyy-MM-dd HH:mm:ss"));

    //check if the order status is DRAFT, if no, means that another call makePayment 
                for the same order happened, ignore this call makePayment.
    if (order.getStatus().equals("DRAFT")) {
        order.pay(redPacketPayAmount, capitalPayAmount);
        try {
            orderRepository.updateOrder(order);
        } catch (OptimisticLockingFailureException e) {
            //ignore the concurrently update order exception, ensure idempotency.
        }
    }

    String result = capitalTradeOrderService.record(buildCapitalTradeOrderDto(order));
    String result2 = redPacketTradeOrderService.record(buildRedPacketTradeOrderDto(order));
}
...省略confirmMakePayment实现...
...省略cancelMakePayment实现...

这段代码为模拟支付扣款操作,可以看到在方法上添加了@Compensable注解,它是TCC-Transaction框架的核心注解,作用为:开启tcc事务支持,注解可以设置一下参数

参数名 描述
propagation 事务传播属性,REQUIRED(必须存在事务,不存在则进行创建),SUPPORTS(如果有事务则在事务内运行),MANDATORY(必须存在事务),REQUIRES_NEW(不管是否存在是否都创建新的事务)
confirmMethod confirm阶段方法实现
cancelMethod cancel阶段方法实现
transactionContextEditor 设置transactionContextEditor
asyncConfirm 是否使用异步confirm
asyncCancel 是否使用异步cancel

解析注解@Compensable

看到了@Compensable注解以及对应的confirm、cancle方法,处于技术敏感,我们可以猜测在框架中一定存在切面逻辑对@Compensable进行拦截并处理;在切面逻辑中一定有对confirm、cancel方法的调用。从这个猜想出发,我们通过阅读相关代码去验证自己的猜想。

我们进入tcc-transaction-core模块的代码目录,目录结构如下:

org.mengyun.tcctransaction
    |-common
    |-context
    |-interceptor           TCC事务拦截器
    |-recover               TCC事务补偿
    |-repository            事务存储
    |-serializer
    |-support
    |-utils

我们主要关注interceptor目录,该目录下的interceptor实现了对注解@Compensable的解析以及对事务的代理逻辑。

CompensableTransactionAspect

CompensableTransactionAspect切面主要实现了对@Compensable的解析以及对事务的代理。

@Aspect
public abstract class CompensableTransactionAspect {

    private CompensableTransactionInterceptor compensableTransactionInterceptor;

    public void setCompensableTransactionInterceptor(
                CompensableTransactionInterceptor compensableTransactionInterceptor) {
        this.compensableTransactionInterceptor = compensableTransactionInterceptor;
    }

    @Pointcut("@annotation(org.mengyun.tcctransaction.api.Compensable)")
    public void compensableService() {

    }

    @Around("compensableService()")
    public Object interceptCompensableMethod(ProceedingJoinPoint pjp) throws Throwable {

        return compensableTransactionInterceptor.interceptCompensableMethod(pjp);
    }

    public abstract int getOrder();
}

CompensableTransactionAspect的实现类为ConfigurableTransactionAspect.java, 加载顺序order= Ordered.HIGHEST_PRECEDENCE(-2147483648)。

该切面对标注了@Compensable的方法进行拦截,通过@Around为业务方法添加环绕增强。可以看到具体的增强方法实现为CompensableTransactionInterceptor.interceptCompensableMethod(pjp);

CompensableTransactionInterceptor.interceptCompensableMethod(pjp);

接着上述的分析,我们看一下CompensableTransactionInterceptor.interceptCompensableMethod(pjp)的逻辑。

[CompensableTransactionInterceptor.java]
public Object interceptCompensableMethod(ProceedingJoinPoint pjp) throws Throwable {

    // 初始化一个TCC方法执行上下文
    CompensableMethodContext compensableMethodContext = new CompensableMethodContext(pjp);
    // 校验事务支持是否开启
    boolean isTransactionActive = transactionManager.isTransactionActive();
    // 校验事务隔离级别
    if (!TransactionUtils.isLegalTransactionContext(
                isTransactionActive, compensableMethodContext)) {
        throw new SystemException
            ("no active compensable transaction while propagation is mandatory for method " 
                    + compensableMethodContext.getMethod().getName());
    }
    // 根据事务方法类型判断执行哪个逻辑
    switch (compensableMethodContext.getMethodRole(isTransactionActive)) {
        case ROOT:
            return rootMethodProceed(compensableMethodContext);
        case PROVIDER:
            return providerMethodProceed(compensableMethodContext);
        default:
            return pjp.proceed();
    }
}

我们主要关注switch代码段

switch (compensableMethodContext.getMethodRole(isTransactionActive)) {
    case ROOT:
        //处理主事务切面,即:本次事务的入口方法
        return rootMethodProceed(compensableMethodContext);
    case PROVIDER:
        //处理提供者事务切面
        return providerMethodProceed(compensableMethodContext);
    default:
        //消费者事务直接执行,会对应执行远端提供者事务切面
        return pjp.proceed();
}

当事务方法为ROOT方法(即分布式事务的主方法)时,执行rootMethodProceed(compensableMethodContext);方法为PROVIDER(提供者)方法时,执行providerMethodProceed(compensableMethodContext)。默认为消费者事务,则直接执行。

我们以此看一下这几种事务切面的执行逻辑。

rootMethodProceed(compensableMethodContext)

对于事务的Root方法,执行rootMethodProceed逻辑,代码逻辑:

private Object rootMethodProceed(CompensableMethodContext compensableMethodContext) 
    throws Throwable {

    Object returnValue = null;

    Transaction transaction = null;

    boolean asyncConfirm = compensableMethodContext.getAnnotation().asyncConfirm();

    boolean asyncCancel = compensableMethodContext.getAnnotation().asyncCancel();

    Set<Class<? extends Exception>> allDelayCancelExceptions = new HashSet<Class<? extends Exception>>();
    allDelayCancelExceptions.addAll(this.delayCancelExceptions);
    allDelayCancelExceptions.addAll(Arrays.asList(compensableMethodContext.getAnnotation().delayCancelExceptions()));

    try {
        // 创建事务, 将主事务的信息写入db或者zk或者redis中去,事务信息写入具体方式可配置
        transaction = transactionManager.begin(compensableMethodContext.getUniqueIdentity());

        try {
            // 执行完成之后会马上进到另外一个切面中去
            returnValue = compensableMethodContext.proceed();
        } catch (Throwable tryingException) {
            // 如果try失败,则进行回滚
            if (!isDelayCancelException(tryingException, allDelayCancelExceptions)) {

                logger.warn(String.format("compensable transaction trying failed. transaction content:%s", JSON.toJSONString(transaction)), tryingException);
                // 回滚事务
                transactionManager.rollback(asyncCancel);
            }

            throw tryingException;
        }
        // 提交事务
        transactionManager.commit(asyncConfirm);

    } finally {
        // 最终如果执行成功,则删除之前的事务记录;如果执行失败则不作任何处理,等待job进行补偿操作
        transactionManager.cleanAfterCompletion(transaction);
    }
    return returnValue;
}

注意关注这段代码

// 执行完成之后会马上进到另外一个切面中去
returnValue = compensableMethodContext.proceed();

当所有的切面都执行完成之后才会执行后续的逻辑,也就是真正执行业务方法。

该方法为一个典型的模板方法,对事务通过begin、commit、rollback进行了抽象。

我们进入三个方法详细的分析。

begin()

首先进入begin方法

[TransactionManager.java]
public Transaction begin(Object uniqueIdentify) {
    // 0
    Transaction transaction = new Transaction(uniqueIdentify,TransactionType.ROOT);
    // 1
    transactionRepository.create(transaction);
    // 2
    registerTransaction(transaction);
    return transaction;
}

0.首先声明并初始化一个分布式事务对象Transaction,标记为ROOT事务,事务初始状态为TRYING。这里采用了经典的状态机策略

public Transaction(TransactionType transactionType) {
    this.xid = new TransactionXid();
    // 事务初始状态设置成TRYING
    this.status = TransactionStatus.TRYING;
    this.transactionType = transactionType;
}   

1.将事务信息存储到数据源中,数据源可以是数据库、redis、zk等,可配置;TransactionRepository是具体的持久化策略的抽象

2.注册事务,在TransactionManager中,通过双向队列(Deque)实现事务栈功能,用来处理嵌套事务。通过对Deque声明为为ThreadLocal,所以对每个线程而言,事务栈都都是独立的

private static final ThreadLocal> CURRENT = new ThreadLocal>();

commit()

接着看一下commit()方法

[TransactionManager.java]
public void commit(boolean asyncCommit) {

    // 从ThreadLocal中获取当前事务
    final Transaction transaction = getCurrentTransaction();
    // 设置事务状态为CONFIRMING
    transaction.changeStatus(TransactionStatus.CONFIRMING);
    // 更新存储中的事务信息
    transactionRepository.update(transaction);

    // 如果异步commit属性为true
    if (asyncCommit) {
        try {
            Long statTime = System.currentTimeMillis();
            // 通过本地线程池异步进行事务提交
            executorService.submit(new Runnable() {
                @Override
                public void run() {
                    commitTransaction(transaction);
                }
            });
            logger.debug("async submit cost time:" + (System.currentTimeMillis() - statTime));
        } catch (Throwable commitException) {
            logger.warn("compensable transaction async submit confirm failed, recovery job will try to confirm later.", commitException);
            throw new ConfirmingException(commitException);
        }
    } else {
        // 否则同步进行事务提交
        commitTransaction(transaction);
    }
}

commit(boolean asyncCommit)方法执行事务的提交过程,具体提交逻辑在commitTransaction(transaction)中完成。

[TransactionManager.java]
private void commitTransaction(Transaction transaction) {
    try {
        // 提交事务
        transaction.commit();
        // 删除本次提交的本地事务记录,如果commit异常,不会把数据库内事务记录删除,
        // 通过job重试进行补偿
        transactionRepository.delete(transaction);
    } catch (Throwable commitException) {
        logger.warn("compensable transaction confirm failed, recovery job will try to confirm later.", commitException);
        throw new ConfirmingException(commitException);
    }
}

[Transaction.java]
public void commit() {
    // 对每一个分支执行提交操作
    for (Participant participant : participants) {
        participant.commit();
    }
}

可以看到,在事务提交完成之后,对本地持久化的事务记录进行了物理删除,具体删除方式取决于持久化机制。感兴趣的同学可以自行查看 org.mengyun.tcctransaction.repository 目录下的实现。

rollback()

我们看一下方法rollback()是如何实现事务回滚逻辑的

[TransactionManager.java]
public void rollback(boolean asyncRollback) {

    // 从ThreadLocal中获取当前事务    
    final Transaction transaction = getCurrentTransaction();
    transaction.changeStatus(TransactionStatus.CANCELLING);
    // 更新事务状态为CANCELLING
    transactionRepository.update(transaction);
    // 如果异步rollback属性为true
    if (asyncRollback) {
        try {
            executorService.submit(new Runnable() {
                @Override
                public void run() {
                    // 通过线程池执行回滚逻辑
                    rollbackTransaction(transaction);
                }
            });
        } catch (Throwable rollbackException) {
            logger.warn("compensable transaction async rollback failed, recovery job will try to rollback later.", rollbackException);
            throw new CancellingException(rollbackException);
        }
    } else {
        // 异步rollback设置为false,同步执行回滚
        rollbackTransaction(transaction);
    }
}

和commit方法类似,在rollback(boolean asyncRollback)执行事务的回滚操作,具体的操作在rollbackTransaction(transaction)中执行:

private void rollbackTransaction(Transaction transaction) {
    try {
        // 事务回滚
        transaction.rollback();
        // 删除本次回滚的本地事务记录,如果rollback异常,不会把数据库内事务记录删除,
        // 通过job重试进行补偿
        transactionRepository.delete(transaction);
    } catch (Throwable rollbackException) {
        logger.warn("compensable transaction rollback failed, recovery job will try to rollback later.", rollbackException);
        throw new CancellingException(rollbackException);
    }
}
cleanAfterCompletion(transaction)

无论是否提交/回滚,最终都会执行cleanAfterCompletion(transaction)方法进行现场清理操作。

public void cleanAfterCompletion(Transaction transaction) {
    if (isTransactionActive() && transaction != null) {
        // 从ThreadLocal中获取当前事务
        Transaction currentTransaction = getCurrentTransaction();‘
        // 弹出当前事务
        if (currentTransaction == transaction) {
            CURRENT.get().pop();
            if (CURRENT.get().size() == 0) {
                CURRENT.remove();
            }
        } else {
            throw new SystemException("Illegal transaction when clean after completion");
        }
    }
}

事务执行结束,从栈中弹出当前结束的事务。

providerMethodProceed(compensableMethodContext)

看完rootMethodProceed根事务切面逻辑,再来看提供者切面事务逻辑就好理解多了,方法逻辑如下:

private Object providerMethodProceed(CompensableMethodContext compensableMethodContext) throws Throwable {

    // 获取异步回滚、异步提交标识
    Transaction transaction = null;
    boolean asyncConfirm = compensableMethodContext.getAnnotation().asyncConfirm();
    boolean asyncCancel = compensableMethodContext.getAnnotation().asyncCancel();

    try {
        // 判断当前事务状态
        switch (TransactionStatus.valueOf(compensableMethodContext.getTransactionContext().getStatus())) {
            // 如果事务状态为TRYING
            case TRYING:
                //  通过使用transactionContext创建分支事务
                transaction = transactionManager.propagationNewBegin(compensableMethodContext.getTransactionContext());
                // 执行被切方法逻辑
                return compensableMethodContext.proceed();

            // 如果事务状态为CONFIRMING
            case CONFIRMING:
                try {
                    // 对事务状态进行更新
                    transaction = transactionManager.propagationExistBegin(compensableMethodContext.getTransactionContext());
                    // 提交事务,不执行切面方法
                    transactionManager.commit(asyncConfirm);
                } catch (NoExistedTransactionException excepton) {
                    //the transaction has been commit,ignore it.
                }
                break;

            // 如果事务状态为CANCELLING
            case CANCELLING:
                try {
                    // 更新事务状态
                    transaction = transactionManager.propagationExistBegin(compensableMethodContext.getTransactionContext());
                    // 执行事务回滚,不执行切面方法
                    transactionManager.rollback(asyncCancel);
                } catch (NoExistedTransactionException exception) {
                    //the transaction has been rollback,ignore it.
                }
                break;
        }

    } finally {
        // 对现场进行清理
        transactionManager.cleanAfterCompletion(transaction);
    }

    Method method = compensableMethodContext.getMethod();
    // 处理原始类型返回值,返回原始类型的默认值,因为不能返回null
    return ReflectionUtils.getNullValue(method.getReturnType());
}

public Transaction propagationExistBegin(TransactionContext transactionContext) throws NoExistedTransactionException {

    // 根据事务id从事务持久化组件中查询到本事务
    Transaction transaction = transactionRepository.findByXid(transactionContext.getXid());
    // 不为空
    if (transaction != null) {
        // 对事务状态进行更新,根据传参不同,执行TRYING->CONFIRMING或者TRYING->CANCELING等操作
        transaction.changeStatus(TransactionStatus.valueOf(transactionContext.getStatus()));
        // 对事务栈进行操作,执行嵌套事务入栈
        registerTransaction(transaction);
        return transaction;
    } else {
        throw new NoExistedTransactionException();
    }
}

这里进行小结,可以看到在provider类型的方法切面,对于远程的Participant,如果transaction的status为trying,则通过transactionManager.propagationNewBegin创建分支事务并执行被切方法逻辑;

如果是status为confirming或canceling,则会调用对应的confirm或cancel配置的方法,跳过被切方法

对于普通类型方法直接调用,normal类型的方法是封装了对远程dubbo接口方法调用逻辑的本地proxy方法,所以直接执行即可

ResourceCoordinatorAspect

ResourceCoordinatorAspect切面主要是为了执行资源协调,它的实现为ConfigurableCoordinatorAspect

[ResourceCoordinatorAspect.java]
@Aspect
public abstract class ResourceCoordinatorAspect {

    private ResourceCoordinatorInterceptor resourceCoordinatorInterceptor;

    @Pointcut("@annotation(org.mengyun.tcctransaction.api.Compensable)")
    public void transactionContextCall() {
    }

    @Around("transactionContextCall()")
    public Object interceptTransactionContextMethod(ProceedingJoinPoint pjp) throws Throwable {
        return resourceCoordinatorInterceptor.interceptTransactionContextMethod(pjp);
    }

    public void setResourceCoordinatorInterceptor(ResourceCoordinatorInterceptor resourceCoordinatorInterceptor) {
        this.resourceCoordinatorInterceptor = resourceCoordinatorInterceptor;
    }

    public abstract int getOrder();
}

[ConfigurableCoordinatorAspect.java]
@Aspect
public class ConfigurableCoordinatorAspect extends ResourceCoordinatorAspect implements Ordered {

    private TransactionConfigurator transactionConfigurator;

    public void init() {
        ResourceCoordinatorInterceptor resourceCoordinatorInterceptor = new ResourceCoordinatorInterceptor();
        resourceCoordinatorInterceptor.setTransactionManager(transactionConfigurator.getTransactionManager());
        this.setResourceCoordinatorInterceptor(resourceCoordinatorInterceptor);
    }

    @Override
    public int getOrder() {
        return Ordered.HIGHEST_PRECEDENCE + 1;
    }

    public void setTransactionConfigurator(TransactionConfigurator transactionConfigurator) {
        this.transactionConfigurator = transactionConfigurator;
    }
}

ConfigurableCoordinatorAspect的职责为设置事务的参与者;在一个事务内,每个被@Compensable注解的方法都是事务参与者。

可以看到该切面的优先级为 Ordered.HIGHEST_PRECEDENCE + 1,order的数值大于CompensableTransactionAspect。由于 @Order中的值越小,优先级越高,因此切面ResourceCoordinatorAspect的优先级小于CompensableTransactionAspect。

从代码可以看出,设置事务参与者逻辑是通过ResourceCoordinatorInterceptor.interceptTransactionContextMethod方法执行的。

[ResourceCoordinatorInterceptor.java]
public Object interceptTransactionContextMethod(ProceedingJoinPoint pjp) throws Throwable {

    // 从当前ThreadLocal中获取事务
    Transaction transaction = transactionManager.getCurrentTransaction();
    if (transaction != null) {
        switch (transaction.getStatus()) {
            case TRYING:
                // 只需要在TRYING阶段将参与者的信息提取出来设置到transaction中
                enlistParticipant(pjp);
                break;
            case CONFIRMING:
                break;
            case CANCELLING:
                break;
        }
    }
    // 执行目标方法
    return pjp.proceed(pjp.getArgs());
}

我们可以得知,在trying阶段,框架会把所有事务参与者加入到当前事务中去。

对于Root方法,先创建主事务,事务参与者包括Root方法对应的本地参与者及Normal方法对应的远程参与者;

对于Provider方法,首先通过主事务上下文创建分支事务,事务参与者包括Provider方法对应的本地参与者以及它所包含的Normal方法对应的远程参与者。而远程参与者又可以开启新的分支事务。

我们可以合理的猜想,如果事务嵌套的层级很多,一定会存在性能问题。

enlistParticipant(pjp)

我们详细看一下enlistParticipant(pjp)是如何生成的事务参与者对象。

private void enlistParticipant(ProceedingJoinPoint pjp) throws IllegalAccessException, InstantiationException {

    // 首先获取@Compensable信息
    Method method = CompensableMethodUtils.getCompensableMethod(pjp);
    if (method == null) {
        // @Compensable标注的方法为空则抛出异常
        throw new RuntimeException(String.format("join point not found method, point is : %s", pjp.getSignature().getName()));
    }
    Compensable compensable = method.getAnnotation(Compensable.class);

    // 回去confirm和cancle方法名
    String confirmMethodName = compensable.confirmMethod();
    String cancelMethodName = compensable.cancelMethod();
    // 获取当前事务以及全局事务id
    Transaction transaction = transactionManager.getCurrentTransaction();
    TransactionXid xid = new TransactionXid(transaction.getXid().getGlobalTransactionId());

    // 设置事务上下文到Editor中
    // Editor用来统一提取事务上下文,如果是dubbo则对应设置dubbo的rpc上下文
    // 此处的上下文设置之后就会调用try逻辑
    if (FactoryBuilder.factoryOf(compensable.transactionContextEditor()).getInstance().get(pjp.getTarget(), method, pjp.getArgs()) == null) {
        FactoryBuilder.factoryOf(compensable.transactionContextEditor()).getInstance().set(new TransactionContext(xid, TransactionStatus.TRYING.getId()), pjp.getTarget(), ((MethodSignature) pjp.getSignature()).getMethod(), pjp.getArgs());
    }

    // 通过目标类名,方法名,参数类型获取目标类
    Class targetClass = ReflectionUtils.getDeclaringType(pjp.getTarget().getClass(), method.getName(), method.getParameterTypes());

    // confirm逻辑调用上下文
    InvocationContext confirmInvocation = new InvocationContext(targetClass,
            confirmMethodName,
            method.getParameterTypes(), pjp.getArgs());

    //cancel逻辑调用上下文
    InvocationContext cancelInvocation = new InvocationContext(targetClass,
            cancelMethodName,
            method.getParameterTypes(), pjp.getArgs());

    // 此处较为关键,confirm和cancle具有相同地位,都被抽象成InvocationContext
    Participant participant =
            new Participant(
                    xid,
                    confirmInvocation,
                    cancelInvocation,
                    compensable.transactionContextEditor());
    // 将participant设置到transaction中,并同步到持久化存储中
    transactionManager.enlistParticipant(participant);
}

[TransactionManager.java]
public void enlistParticipant(Participant participant) {
    Transaction transaction = this.getCurrentTransaction();
    transaction.enlistParticipant(participant);
    transactionRepository.update(transaction);
}

[Transaction.java]
public void enlistParticipant(Participant participant) {
    participants.add(participant);
}

从上述的代码逻辑中,我们可以得到结论,CompensableTransactionAspect开启事务,ResourceCoordinatorAspect对注解@Compensable进行解析,将confirm与cancel的具体逻辑设置到事务管理器中。

当上述两个切面都执行完成之后,开始执行try中的方法。如果try成功则执行commit否则执行rollback。

每个分支事务最终被封装到Transaction的participants中,每个分布式事务都有一个自己的 ThreadLocal

我们再次回顾commit的逻辑,查看Transaction.commit()方法

[Transaction.java]
public void commit() {
    // 对每一个分支执行提交操作
    for (Participant participant : participants) {
        participant.commit();
    }
}

participant就是切面ResourceCoordinatorAspect 添加的。我们再看一下participant.commit()的逻辑:

[Transaction.java]
public void commit() {
    terminator.invoke(new TransactionContext(xid, TransactionStatus.CONFIRMING.getId()), confirmInvocationContext, transactionContextEditorClass);
}

可以看到最终事务提交是通过invoke反射实现的,我们进入invoke逻辑

public Object invoke(TransactionContext transactionContext, 
                    InvocationContext invocationContext, 
                    Class<? extends TransactionContextEditor> transactionContextEditorClass) {
    // 如果事务执行上下文方法名不为空
    if (StringUtils.isNotEmpty(invocationContext.getMethodName())) {
        try {
            Object target = FactoryBuilder.factoryOf(invocationContext.getTargetClass()).getInstance();

            Method method = null;

            method = target.getClass().getMethod(invocationContext.getMethodName(), invocationContext.getParameterTypes());
            // 实例化原事务执行者的代理对象
            FactoryBuilder.factoryOf(transactionContextEditorClass).getInstance().set(transactionContext, target, method, invocationContext.getArgs());
            // 反射执行
            return method.invoke(target, invocationContext.getArgs());

        } catch (Exception e) {
            throw new SystemException(e);
        }
    }
    return null;
}

最终通过method.invoke(target, invocationContext.getArgs())方法完成了真实的事务提交操作。

小结

到此我们对TCC-TRANSACTION的事务提交主流程进行了完整的分析。

通过分析我们可以知道TCC-TRANSACTION的核心逻辑是通过两个切面CompensableTransactionAspect、ResourceCoordinatorAspect 实现的。通过对事务进行包装与代理,实现了类二阶段的分布式事务解决方案。

实际上,TCC-TRANSACTION还有一个重要的补偿逻辑我们还没有分析,它是基于定时调度实现的。

限于本文的篇幅,就不再继续展开。我将单独用一篇文章来对TCC-TRANSACTION的补偿过程进行分析,我们下文再会。



版权声明:

原创不易,洗文可耻。除非注明,本博文章均为原创,转载请以链接形式标明本文地址。

文章目录
  1. 1. 源码下载
  2. 2. 从一个简单的样例入手
  3. 3. 解析注解@Compensable
    1. 3.1. CompensableTransactionAspect
      1. 3.1.1. CompensableTransactionInterceptor.interceptCompensableMethod(pjp);
      2. 3.1.2. rootMethodProceed(compensableMethodContext)
        1. 3.1.2.1. begin()
        2. 3.1.2.2. commit()
        3. 3.1.2.3. rollback()
        4. 3.1.2.4. cleanAfterCompletion(transaction)
      3. 3.1.3. providerMethodProceed(compensableMethodContext)
    2. 3.2. ResourceCoordinatorAspect
      1. 3.2.1. enlistParticipant(pjp)
  4. 4. 小结
Fork me on GitHub