今天看啥  ›  专栏  ›  FangZzzzz

TCC-Transaction源码解读。

FangZzzzz  · 掘金  ·  · 2019-09-18 07:47
阅读 37

TCC-Transaction源码解读。

TCC原理

为了解决在事务运行过程中大颗粒度资源锁定的问题,业界提出一种新的事务模型,它是基于业务层面的事务定义。锁粒度完全由业务自己控制。它本质是一种补偿的思路。它把事务运行过程分成 Try、Confirm / Cancel 两个阶段。在每个阶段的逻辑由业务代码控制。这样就事务的锁粒度可以完全自由控制。业务可以在牺牲隔离性的情况下,获取更高的性能。

  • try阶段:
    • try:尝试执行业务
      • 完成所有业务检查(一致性)
      • 预留必须的业务资源(准隔离性)
  • Confirm / Cancel 阶段:
    • Confirm :确认执行业务
      • 真正执行业务
      • 不做任何业务检查
      • confirm操作满足幂等性
    • Cancel:取消执行业务
      • 释放 Try 阶段预留的业务资源
      • Cancel 操作满足幂等性
    • Confirm 与 Cancel 互斥

整体流程如下图:

摘自:www.iocoder.cn/TCC-Transac…

Tcc-transaction原理

在tcc-transaction中,一个tcc事务可以包含多个业务活动,tcc-transaction把每个事务活动都抽象成事务的参与者,每个事务可以包含多个参与者。

我们先看一看tcc-transaction基础架构设计。

事务管理器负责管理事务以及参与者,spring aop的方式在业务前后进行拦截,负责将参与者加入到事务中,并将事务注册进事务管理器,同时将事务存储在事务存储器进行持久化,事务补偿job通过定时检查事务存储器里的事务状态来对事务进行补偿。

显而易见,tcc-transaction入口就是事务拦截器,事务拦截器通过两个切面的方式将所有组件组织在一起。

读源码前置参考

事务

我们先看看Transaction这个类的属性。

    /*
     * 事务编号
     */
    private TransactionXid xid;

    /**
     * 事务状态
     */
    private TransactionStatus status;

    /**
     * 事务类型
     */
    private TransactionType transactionType;

    /**
     * 重试次数
     */
    private volatile int retriedCount = 0;

    /**
     * 创建时间
     */
    private Date createTime = new Date();

    /**
     * 最后更新时间
     */
    private Date lastUpdateTime = new Date();

    /**
     * 乐观锁控制
     */
    private long version = 1;

    /**
     * 多个参与者
     */
    private List<Participant> participants = new ArrayList<Participant>();

    // 附带属性映射 用于隐式传参
    private Map<String, Object> attachments = new ConcurrentHashMap<String, Object>();

复制代码

其中包含TransactionXid、TransactionStatus、TransactionType以及Participant。

TransactionXid

    /**
     * xid 格式标识
     */
    private int formatId = 1;

    /**
     * 全局事务编号
     */
    private byte[] globalTransactionId;

    /**
     * 分支事务编号
     */
    private byte[] branchQualifier;
复制代码

TransactionStatus

    TRYING(1), CONFIRMING(2), CANCELLING(3);
复制代码

TransactionType

    /**
     * 根事务
     */
    ROOT(1),
    /**
     * 分支事务
     */
    BRANCH(2);
复制代码

参与者

    /**
     * 事务id
     */
    private TransactionXid xid;

    /**
     * confirm方法上下文
     */
    private InvocationContext confirmInvocationContext;

    /**
     * cancel方法上下文
     */
    private InvocationContext cancelInvocationContext;

    /**
     * 执行器
     */
    private Terminator terminator = new Terminator();

    /**
     * 上下文编辑
     */
    Class<? extends TransactionContextEditor> transactionContextEditorClass;
    
    public void rollback() {
        terminator.invoke(new TransactionContext(xid, TransactionStatus.CANCELLING.getId()), cancelInvocationContext, transactionContextEditorClass);
    }

    public void commit() {
        terminator.invoke(new TransactionContext(xid, TransactionStatus.CONFIRMING.getId()), confirmInvocationContext, transactionContextEditorClass);
    }
复制代码

事务管理器

提供事务的获取、发起、提交、回滚,参与者的新增等等方法。

这些先看看,有个印象,下面会详细讲解其中的内容。

事务拦截器

事务拦截器由两个spring aop切面实现,如下图。

我们先来看一看使用示例:

参与者需要声明 try / confirm / cancel 三个类型的方法,和 TCC 的操作一一对应。在程序里,通过 @Compensable 注解标记在 try 方法上,并填写对应的 confirm / cancel 方法。

    // try 方法
    @Compensable(confirmMethod = "confirmRecord", cancelMethod = "cancelRecord", transactionContextEditor = DubboTransactionContextEditor.class)
    public String record(CapitalTradeOrderDto tradeOrderDto) {
    }
    
    // confirm方法
    public void confirmRecord(CapitalTradeOrderDto tradeOrderDto) {
    }
    
    // cancel方法
    public void cancelRecord(CapitalTradeOrderDto tradeOrderDto) {
    }
复制代码

很明显,肯定是有切面切入了@Compensable:注解的方法,我们先来找一下。 在源码的spring支持中有这么一段配置:

    <bean id="transactionConfigurator" class="org.mengyun.tcctransaction.spring.support.:"
          init-method="init"/>
          
    <bean id="compensableTransactionAspect" class="org.mengyun.tcctransaction.spring.ConfigurableTransactionAspect"
          init-method="init">
        <property name="transactionConfigurator" ref="transactionConfigurator"/>
    </bean>

    <bean id="resourceCoordinatorAspect" class="org.mengyun.tcctransaction.spring.ConfigurableCoordinatorAspect"
          init-method="init">
        <property name="transactionConfigurator" ref="transactionConfigurator"/>
    </bean>
复制代码

SpringTransactionConfigurator是一个配置相关的类,我们先不管。

ConfigurableTransactionAspect

先看看类图:

我们先看看ConfigurableTransactionAspect类。

@Aspect
public class ConfigurableTransactionAspect extends CompensableTransactionAspect implements Ordered {

    private TransactionConfigurator transactionConfigurator;

    public void init() {
        // 设置事务管理器
        TransactionManager transactionManager = transactionConfigurator.getTransactionManager();

        // 这个是具体的拦截实现类
        CompensableTransactionInterceptor compensableTransactionInterceptor = new CompensableTransactionInterceptor();
        compensableTransactionInterceptor.setTransactionManager(transactionManager);
        compensableTransactionInterceptor.setDelayCancelExceptions(transactionConfigurator.getRecoverConfig().getDelayCancelExceptions());

        this.setCompensableTransactionInterceptor(compensableTransactionInterceptor);
    }

    @Override
    public int getOrder() {
        // order越小,则优先级越高。
        return Ordered.HIGHEST_PRECEDENCE;
    }

    public void setTransactionConfigurator(TransactionConfigurator transactionConfigurator) {
        this.transactionConfigurator = transactionConfigurator;
    }
}
复制代码

ConfigurableTransactionAspect继承了CompensableTransactionAspect同时实现了Orderd,且Order值为最小值,这就是我们要找的第一个切面。

这里子类就是给父类提供了初始化,切面具体逻辑在父类。我们看看父类的逻辑。

@Aspect
public abstract class CompensableTransactionAspect {

    private CompensableTransactionInterceptor compensableTransactionInterceptor;

    public void setCompensableTransactionInterceptor(CompensableTransactionInterceptor compensableTransactionInterceptor) {
        this.compensableTransactionInterceptor = compensableTransactionInterceptor;
    }

    /**
     * 切点
     */
    @Pointcut("@annotation(org.mengyun.tcctransaction.api.Compensable)")
    public void compensableService() {

    }

    /**
     * 切面
     * @param pjp
     * @return
     * @throws Throwable
     */
    @Around("compensableService()")
    public Object interceptCompensableMethod(ProceedingJoinPoint pjp) throws Throwable {

        return compensableTransactionInterceptor.interceptCompensableMethod(pjp);
    }

    public abstract int getOrder();
}
复制代码

可以看到具体逻辑还是在compensableTransactionInterceptor中,我们继续跟。

    public Object interceptCompensableMethod(ProceedingJoinPoint pjp) throws Throwable {

        // 获得 具体的方法 注解 传播行为 以及事务上下文
        CompensableMethodContext compensableMethodContext = new CompensableMethodContext(pjp);

        // 事务是否存在于当前线程中
        // transactionManager为单例的,其中包含一个threadlocal , 里面是一个Deque(双端队列)
        // 判断该threadlocal里的Deque是否为空
        boolean isTransactionActive = transactionManager.isTransactionActive();

        // 判断事务上下文是否合法
        // 传播级别为支持当前事务 而不支持新事务、同时当前线程中不存在事务,且事务上下文为空
        // 则是非法  抛出异常
        if (!TransactionUtils.isLegalTransactionContext(isTransactionActive, compensableMethodContext)) {
            throw new SystemException("no active compensable transaction while propagation is mandatory for method " + compensableMethodContext.getMethod().getName());
        }


        // 判断是根事务还是分支事务
        // 如果传播级别为必须新开事务 或者 (允许新建事务且线程中不存在事务,且事务上下文为空) 则是根事务
        // ----------------------------------------------------------------------------------------------------
        //  如果不满足上述条件, 且(事务的传播级别为支持当前事务或者为支持当前事务,并不允许开启事务)并且当前没有活动事务,并且事务上下文不为空
        //  则是分支事务
        switch (compensableMethodContext.getMethodRole(isTransactionActive)) {
            case ROOT:
                return rootMethodProceed(compensableMethodContext);
            case PROVIDER:
                return providerMethodProceed(compensableMethodContext);
            default:
                // 如果事务存在于线程中 或者事务的传播行为为支持当前事务,没有事务则以非事务执行 则直接执行
                return pjp.proceed();
        }
    }
复制代码

首先获取注解对应的参数。

    public CompensableMethodContext(ProceedingJoinPoint pjp) {
        this.pjp = pjp;
        // 获取到具体方法
        this.method = getCompensableMethod();
        // 获取到注解
        this.compensable = method.getAnnotation(Compensable.class);
        // 注解上的传播行为
        this.propagation = compensable.propagation();
        // 获得事务上下文
        this.transactionContext = FactoryBuilder.factoryOf(compensable.transactionContextEditor()).getInstance().get(pjp.getTarget(), method, pjp.getArgs());
    }
复制代码

tcc-transaction支持四种传播行为。

    /**
     * 支持当前事务,如果当前没有事务,就新建一个事务。
     */
    REQUIRED(0),
    /**
     * 支持当前事务,如果当前没有事务,就以非事务方式执行。
     */
    SUPPORTS(1),
    /**
     * 支持当前事务,如果当前没有事务,就抛出异常。
     */
    MANDATORY(2),
    /**
     *新建事务,如果当前存在事务,把当前事务挂起。
     */
    REQUIRES_NEW(3);
复制代码

FactoryBuilder是一个抽象的单例工厂,整句话就是从注解的transactionContextEditor.class中获得一个单例得transactionContextEditor,然后调用这个transactionContextEditor的get方法获得事务的上下文。

我们回到interceptCompensableMethod。isTransactionActive判断当前线程是否包含事务。

    public boolean isTransactionActive() {
        // 从threadLocal里获取双向队列
        Deque<Transaction> transactions = CURRENT.get();
        return transactions != null && !transactions.isEmpty();
    }
复制代码

就是看一下当前线程里的双向队列是否有值。

继续往下走。

        // 判断事务上下文是否合法
        // 传播级别为支持当前事务而且不支持新事务、同时当前线程中不存在事务,且事务上下文为空
        // 则是非法  抛出异常
        if (!TransactionUtils.isLegalTransactionContext(isTransactionActive, compensableMethodContext)) {
            throw new SystemException("no active compensable transaction while propagation is mandatory for method " + compensableMethodContext.getMethod().getName());
        }
复制代码
    // 非法的必要条件
    public static boolean isLegalTransactionContext(boolean isTransactionActive, CompensableMethodContext compensableMethodContext) {

        if (compensableMethodContext.getPropagation().equals(Propagation.MANDATORY) // 事务传播行为为支持当前事务,没有事务则抛出异常
                && !isTransactionActive // 没有当前事务
                && compensableMethodContext.getTransactionContext() == null //事务上下文为空
        ) {
            return false;
        }

        return true;
    }
复制代码

接着走。

    switch (compensableMethodContext.getMethodRole(isTransactionActive)) {
        // 判断是根事务还是分支事务
        // 如果传播级别为必须新开事务 或者 (允许新建事务且线程中不存在事务,且事务上下文为空) 则是根事务
        case ROOT:
            return rootMethodProceed(compensableMethodContext);
        //  如果不满足上述条件, 且(事务的传播级别为支持当前事务或者为支持当前事务,并不允许开启事务)并且当前没有活动事务,并且事务上下文不为空
        case PROVIDER:
            return providerMethodProceed(compensableMethodContext);
        default:
            // 如果事务存在于线程中 或者事务的传播行为为支持当前事务,没有事务则以非事务执行 则直接执行
            return pjp.proceed();
        }
    }
复制代码

看一下根事务的处理方法。

    private Object rootMethodProceed(CompensableMethodContext compensableMethodContext) throws Throwable {

        Object returnValue = null;

        Transaction transaction = null;

        // 是否同步confirm
        boolean asyncConfirm = compensableMethodContext.getAnnotation().asyncConfirm();

        // 是否同步cancel
        boolean asyncCancel = compensableMethodContext.getAnnotation().asyncCancel();

        // 加入延迟cancel相关的异常
        Set<Class<? extends Exception>> allDelayCancelExceptions = new HashSet<Class<? extends Exception>>();
        allDelayCancelExceptions.addAll(this.delayCancelExceptions);
        allDelayCancelExceptions.addAll(Arrays.asList(compensableMethodContext.getAnnotation().delayCancelExceptions()));

        try {
            // 创建根事务
            // 在内存中创建一个事务,声明为根事务
            // 将刚刚的事务持久化到redis、zookeeper或者mysql 再或者磁盘
            // 将事务注册进transactionManager中的treadlocal中的deque中
            transaction = transactionManager.begin(compensableMethodContext.getUniqueIdentity());

            try {
                // 执行
                returnValue = compensableMethodContext.proceed();
            } catch (Throwable tryingException) {
                // 异常处理
                // 如果不是需要延迟的异常类型
                if (!isDelayCancelException(tryingException, allDelayCancelExceptions)) {

                    logger.warn(String.format("compensable transaction trying failed. transaction content:%s", JSON.toJSONString(transaction)), tryingException);
                    // 立即回滚
                    // 获取当前线程的deque队头的事务,
                    // 改变事务状态 并持久化改变的事务状态
                    // rollback所有参与者
                    // delete持久化的事务
                    transactionManager.rollback(asyncCancel);
                }
                // 抛出异常
                throw tryingException;
            }
            // 提交
            // 和上述回滚步骤一致
            transactionManager.commit(asyncConfirm);

        } finally {
            // 将事务从当前线程事务队列移除
            transactionManager.cleanAfterCompletion(transaction);
        }

        return returnValue;
    }
复制代码

看一下begin方法。

    /**
     * 发起根事务
     * @param uniqueIdentify
     * @return 事务
     */
    public Transaction begin(Object uniqueIdentify) {
        // 创建根事务
        Transaction transaction = new Transaction(uniqueIdentify,TransactionType.ROOT);
        // 存储 事务
        transactionRepository.create(transaction);
        // 注册事务
        registerTransaction(transaction);
        return transaction;
    }
复制代码
    private void registerTransaction(Transaction transaction) {

        if (CURRENT.get() == null) {
            CURRENT.set(new LinkedList<Transaction>());
        }

        CURRENT.get().push(transaction);
    }
复制代码

分支事务的处理方法。

    private Object providerMethodProceed(CompensableMethodContext compensableMethodContext) throws Throwable {

        Transaction transaction = null;


        boolean asyncConfirm = compensableMethodContext.getAnnotation().asyncConfirm();

        boolean asyncCancel = compensableMethodContext.getAnnotation().asyncCancel();

        try {

            // 事务上下问中的状态
            switch (TransactionStatus.valueOf(compensableMethodContext.getTransactionContext().getStatus())) {
                // 如果是try阶段
                case TRYING:
                    // 传播发起分支
                    // 在内存中创建一个事务,声明为分支事务,事务id为事务上下文中的事务id
                    // 持久化事务
                    // 注册进deque中
                    transaction = transactionManager.propagationNewBegin(compensableMethodContext.getTransactionContext());
                    return compensableMethodContext.proceed();
                case CONFIRMING:
                    try {
                        //  传播获取分支
                        //  从持久化中取出事务
                        //  注册进deque中
                        transaction = transactionManager.propagationExistBegin(compensableMethodContext.getTransactionContext());
                        transactionManager.commit(asyncConfirm);
                    } catch (NoExistedTransactionException excepton) {
                        //the transaction has been commit,ignore it.
                    }
                    break;
                case CANCELLING:
                    try {
                        // 传播获取分支
                        transaction = transactionManager.propagationExistBegin(compensableMethodContext.getTransactionContext());
                        transactionManager.rollback(asyncCancel);
                    } catch (NoExistedTransactionException exception) {
                        //the transaction has been rollback,ignore it.
                    }
                    break;
            }

        } finally {
            transactionManager.cleanAfterCompletion(transaction);
        }

        Method method = compensableMethodContext.getMethod();

        return ReflectionUtils.getNullValue(method.getReturnType());
    }

复制代码

看一下propagationNewBegin方法。

    /**
     * 传播发起分支事务
     * @param transactionContext 事务上下文
     * @return  分支事务
     */
    public Transaction propagationNewBegin(TransactionContext transactionContext) {
        // 创建分支事务
        Transaction transaction = new Transaction(transactionContext);
        // 存储事务
        transactionRepository.create(transaction);
        // 注册事务
        registerTransaction(transaction);
        return transaction;
    }
复制代码

总结

我们理一下逻辑。

  • 1)切面切进来首先获取到切面的一些参数。
    • 方法
    • 注解
    • 传播行为
    • 以及事务上下文
  • 2)判断是否是活动的事务(判断当前线程中的Deque是否为空)。
  • 3)判断事务是否合法,不合法情况必须满足以下三个条件:
    • 事务传播行为为MANDATORY,支持当前事务,没有事务则抛出异常。
    • 当前没有活动事务
    • 事务上下文为空。
  • 4)判断是新起一个事务(根事务或者分支事务)或者是已经在事务中,直接执行(不创建Transaction)。
    • 根事务两种情况:
      • 事务传播行为为REQUIRED(支持当前事务,没有事务则新起事务),且当前没有活动事务,事务上下文为空。
      • 事务传播行为为REQUIRES_NEW。
    • 分支事务的情况:
      • 事务传播行为为REQUIRED或者MANDATORY,且当前没有活动事务,事务上下文不为空。
    • 不新起事务(不创建Transaction)情况:
      • 当前线程存在活动事务,或者传播行为为SUPPORTS(支持当前事务,没有事务则以非事务执行)。
  • 5)起一个根事务的处理方式
    • 5.1)获取注解中的是否同步confirm、cancel状态。
    • 5.2)获取延迟cancel相关的异常。
    • 5.3)创建根事务
      • 5.3.1)创建一个根事务,随机生成事务id,事务状态为trying,事务类型为root。
      • 5.3.2)存储上一步创建的根事务到事务存储器(可以是mysql、redis、zookeeper、File)。
      • 5.3.3)将事务注册进TransactionManager的TreadLocal的Deque栈尾。
    • 5.4)执行具体业务(还会进到下一个拦截器)。
    • 5.5)如果上一步出现异常,判断是否是需要延迟的异常,如果是则继续往上抛异常,如果不是则调用transactionManager的rollback。
      • 5.5.1)从Deque中拿到当前事务。
      • 5.5.2)改变事务状态为CANCELLING。
      • 5.5.3)更新事务存储器里的事务状态。
      • 5.5.4)同步或者异步的调用transaction的rollback方法,即循环所有的参与者,调用他们的rollback方法。
      • 5.5.5)删除事务存储器的事务。
    • 5.6)没有出现异常则commit,和上述rollback步骤基本一致,就不赘述。
    • 5.7)清理现场,删除deque中的transaction,如果删除后为空,则清除ThreadLocal。
  • 6)起一个分支事务的处理方式
    • 6.1)获取注解中的是否同步confirm、cancel状态。
    • 6.2)判断事务阶段。
      • 6.2.1)TRYING阶段
        • 6.2.1.1) 创建一个分支事务,使用事务上下文中的事务id,事务状态为trying,事务类型为BRANCH。
        • 6.2.1.2)存储上一步创建的分支事务到事务存储器(可以是mysql、redis、zookeeper、File)。
        • 6.2.1.3)将事务注册进TransactionManager的TreadLocal的Deque栈尾。
        • 6.2.1.4)直接执行业务逻辑,不处理异常
      • 6.2.2)CONFIRMING阶段
        • 6.2.2.1)根据事务上下文中的事务id去事务存储器查询到相关的事务(在try阶段存储到事务存储器的)
        • 6.2.2.2)改变事务状态
        • 6.2.2.3)注册进Deque。
        • 6.2.2.4)调用transactionManager的commit。
          • 6.2.2.4.1)从deque中拿到事务
          • 6.2.2.4.2)改变事务状态
          • 6.2.2.4.3)改变事务存储器中的事务状态。
          • 6.2.2.4.4)调用所有参与者的commit方法。
          • 6.2.2.4.5)删除事务存储器中的事务。
      • 6.2.3)CANCELLING阶段 和CONFIRMING阶段阶段类似。 6.3)清理现场。 6.4) 如果为TRYING或者CONFIRMING阶段,则返回一个空值。
  • 7) 不新起事务处理方法,直接执行。

理一理根事务和分支事务的不同。

  • 根事务执行完毕之后直接rollback或者commit。
  • 分支事务TRYING阶段不rollback或者commit,等到CONFIRMING阶段或者CANCELLING阶段才做去。

两个问题:

  • 根事务rollback、commit之后直接删除了事务存储器相关的事务,分支事务怎么感知到commit或者rollback?
  • 分支事务执行完后并不做什么,不处理异常也不commit,其他分支事务或者根事务怎么感知到当前事务状态?

ConfigurableCoordinatorAspect

还是先看看类图。

来看看ConfigurableCoordinatorAspect类。

@Aspect
public class ConfigurableCoordinatorAspect extends ResourceCoordinatorAspect implements Ordered {

    private TransactionConfigurator transactionConfigurator;

    public void init() {

        ResourceCoordinatorInterceptor resourceCoordinatorInterceptor = new ResourceCoordinatorInterceptor();
        resourceCoordinatorInterceptor.setTransactionManager(transactionConfigurator.getTransactionManager());
        this.setResourceCoordinatorInterceptor(resourceCoordinatorInterceptor);
    }

    @Override
    public int getOrder() {
        return Ordered.HIGHEST_PRECEDENCE + 1;
    }

    public void setTransactionConfigurator(TransactionConfigurator transactionConfigurator) {
        this.transactionConfigurator = transactionConfigurator;
    }
}
复制代码

和上面的ConfigurableTransactionAspect大抵类似,不过优先级要低一些。也就是当ConfigurableTransactionAspect执行到proceed时,就会进入这个切面。

看看父类。

@Aspect
public abstract class ResourceCoordinatorAspect {

    private ResourceCoordinatorInterceptor resourceCoordinatorInterceptor;

    /**
     * 切点为打了Compensable注解的方法
     */
    @Pointcut("@annotation(org.mengyun.tcctransaction.api.Compensable)")
    public void transactionContextCall() {

    }

    /**
     * 环绕
     * @param pjp
     * @return
     * @throws Throwable
     */
    @Around("transactionContextCall()")
    public Object interceptTransactionContextMethod(ProceedingJoinPoint pjp) throws Throwable {
        return resourceCoordinatorInterceptor.interceptTransactionContextMethod(pjp);
    }

    public void setResourceCoordinatorInterceptor(ResourceCoordinatorInterceptor resourceCoordinatorInterceptor) {
        this.resourceCoordinatorInterceptor = resourceCoordinatorInterceptor;
    }

    public abstract int getOrder();
}
复制代码

直接看interceptTransactionContextMethod。

   public Object interceptTransactionContextMethod(ProceedingJoinPoint pjp) throws Throwable {

        Transaction transaction = transactionManager.getCurrentTransaction();

        if (transaction != null) {

            switch (transaction.getStatus()) {
                case TRYING:
                    enlistParticipant(pjp);
                    break;
                case CONFIRMING:
                    break;
                case CANCELLING:
                    break;
            }
        }

        return pjp.proceed(pjp.getArgs());
    }
复制代码
  • 第一步先从线程Deque中拿到上一个拦截器方进去的transaction。
  • 如果是try状态则调用enlistParticipant,否则什么都不做。

一起看看enlistParticipant方法干了什么。

private void enlistParticipant(ProceedingJoinPoint pjp) throws IllegalAccessException, InstantiationException {

        // 获得 @Compensable 注解
        Method method = CompensableMethodUtils.getCompensableMethod(pjp);
        if (method == null) {
            throw new RuntimeException(String.format("join point not found method, point is : %s", pjp.getSignature().getName()));
        }
        Compensable compensable = method.getAnnotation(Compensable.class);

        // 获得 确认执行业务方法 和 取消执行业务方法
        String confirmMethodName = compensable.confirmMethod();
        String cancelMethodName = compensable.cancelMethod();

        // 获取 当前线程事务第一个(栈顶)元素
        Transaction transaction = transactionManager.getCurrentTransaction();
        // 创建 事务编号
        TransactionXid xid = new TransactionXid(transaction.getXid().getGlobalTransactionId());

        // 设置事务上下文
        if (FactoryBuilder.factoryOf(compensable.transactionContextEditor()).getInstance().get(pjp.getTarget(), method, pjp.getArgs()) == null) {
            FactoryBuilder.factoryOf(compensable.transactionContextEditor()).getInstance().set(new TransactionContext(xid, TransactionStatus.TRYING.getId()), pjp.getTarget(), ((MethodSignature) pjp.getSignature()).getMethod(), pjp.getArgs());
        }

        // 获得类
        Class targetClass = ReflectionUtils.getDeclaringType(pjp.getTarget().getClass(), method.getName(), method.getParameterTypes());

        // 创建 确认执行方法调用上下文 和 取消执行方法调用上下文
        InvocationContext confirmInvocation = new InvocationContext(targetClass,
                confirmMethodName,
                method.getParameterTypes(), pjp.getArgs());

        InvocationContext cancelInvocation = new InvocationContext(targetClass,
                cancelMethodName,
                method.getParameterTypes(), pjp.getArgs());

        Participant participant =
                new Participant(
                        xid,
                        confirmInvocation,
                        cancelInvocation,
                        compensable.transactionContextEditor());
        // 添加 事务参与者 到 事务
        transactionManager.enlistParticipant(participant);

    }
复制代码

总的来说一句话,创建设置事务上下文并生成一个参与者加入到trasaction里,修改事务存储器里的值。

总结

ConfigurableTransactionAspect作为第一个切面,负责事务的生成、处理以及存储。 ConfigurableCoordinatorAspect作为第二个切面,负责参与者的生成和事务上下文的创建。

疑问解答

1、分支事务是不知道自己应该提交还是回滚的。

2、我们看一下官方的demo。

package org.mengyun.tcctransaction.sample.dubbo.capital.api;

import org.mengyun.tcctransaction.api.Compensable;
import org.mengyun.tcctransaction.sample.dubbo.capital.api.dto.CapitalTradeOrderDto;

/**
 * Created by changming.xie on 4/1/16.
 */
public interface CapitalTradeOrderService {

    @Compensable
    String record(CapitalTradeOrderDto tradeOrderDto);

}
复制代码

这是一个dubbo RPC接口,也就是说,这个接口也打上了@Compensable注解(之前一直没想明白这个问题),也就是说,在调用远程RPC服务时,调用方还会再进入一次切面(走两次切面逻辑),然后把远程参与者加入到调用方的Transaction里。也就是说,当主事务回滚或提交的时候,会调用远程参与者的接口,并告知当前事务状态,远程参与者根据不同的状态调用不同的方法来做confirm或commit。

如下文:

    /**
     * transaction提交TCC事务
     */
    public void commit() {
        for (Participant participant : participants) {
            participant.commit();
        }
    }
    
    
    /**
     * participant提交事务
     */
    public void commit() {
        terminator.invoke(new TransactionContext(xid, TransactionStatus.CONFIRMING.getId()), confirmInvocationContext, transactionContextEditorClass);
    }
复制代码

当远程分支收到这个RPC调用时,会去判断当前事务正在什么阶段,然后调用对应的方法。

switch (TransactionStatus.valueOf(transactionContext.getStatus())) {
        case TRYING:
            // 传播发起分支事务
            transaction = transactionManager.propagationNewBegin(transactionContext);
            return pjp.proceed();
        case CONFIRMING:
            try {
                // 传播获取分支事务
                transaction = transactionManager.propagationExistBegin(transactionContext);
                // 提交事务
                transactionManager.commit();
            } catch (NoExistedTransactionException excepton) {
                //the transaction has been commit,ignore it.
            }
                break;
        case CANCELLING:
            try {
                // 传播获取分支事务
                transaction = transactionManager.propagationExistBegin(transactionContext);
                // 回滚事务
                transactionManager.rollback();
                } catch (NoExistedTransactionException exception) {
                //the transaction has been rollback,ignore it.
            }
        break;
}
复制代码

至此就走完所有的正常流程了。

todo 事务补偿

todo dubbo支持




原文地址:访问原文地址
快照地址: 访问文章快照