TCC原理
为了解决在事务运行过程中大颗粒度资源锁定的问题,业界提出一种新的事务模型,它是基于业务层面的事务定义。锁粒度完全由业务自己控制。它本质是一种补偿的思路。它把事务运行过程分成 Try、Confirm / Cancel 两个阶段。在每个阶段的逻辑由业务代码控制。这样就事务的锁粒度可以完全自由控制。业务可以在牺牲隔离性的情况下,获取更高的性能。
- try阶段:
- try:尝试执行业务
- 完成所有业务检查(一致性)
- 预留必须的业务资源(准隔离性)
- try:尝试执行业务
- Confirm / Cancel 阶段:
- Confirm :确认执行业务
- 真正执行业务
- 不做任何业务检查
- confirm操作满足幂等性
- Cancel:取消执行业务
- 释放 Try 阶段预留的业务资源
- Cancel 操作满足幂等性
- Confirm 与 Cancel 互斥
- Confirm :确认执行业务
整体流程如下图:
摘自:www.iocoder.cn/TCC-Transac…
Tcc-transaction原理
在tcc-transaction中,一个tcc事务可以包含多个业务活动,tcc-transaction把每个事务活动都抽象成事务的参与者,每个事务可以包含多个参与者。
我们先看一看tcc-transaction基础架构设计。
显而易见,tcc-transaction入口就是事务拦截器,事务拦截器通过两个切面的方式将所有组件组织在一起。
读源码前置参考
事务
我们先看看Transaction这个类的属性。
/*
* 事务编号
*/
private TransactionXid xid;
/**
* 事务状态
*/
private TransactionStatus status;
/**
* 事务类型
*/
private TransactionType transactionType;
/**
* 重试次数
*/
private volatile int retriedCount = 0;
/**
* 创建时间
*/
private Date createTime = new Date();
/**
* 最后更新时间
*/
private Date lastUpdateTime = new Date();
/**
* 乐观锁控制
*/
private long version = 1;
/**
* 多个参与者
*/
private List<Participant> participants = new ArrayList<Participant>();
// 附带属性映射 用于隐式传参
private Map<String, Object> attachments = new ConcurrentHashMap<String, Object>();
复制代码
其中包含TransactionXid、TransactionStatus、TransactionType以及Participant。
TransactionXid
/**
* xid 格式标识
*/
private int formatId = 1;
/**
* 全局事务编号
*/
private byte[] globalTransactionId;
/**
* 分支事务编号
*/
private byte[] branchQualifier;
复制代码
TransactionStatus
TRYING(1), CONFIRMING(2), CANCELLING(3);
复制代码
TransactionType
/**
* 根事务
*/
ROOT(1),
/**
* 分支事务
*/
BRANCH(2);
复制代码
参与者
/**
* 事务id
*/
private TransactionXid xid;
/**
* confirm方法上下文
*/
private InvocationContext confirmInvocationContext;
/**
* cancel方法上下文
*/
private InvocationContext cancelInvocationContext;
/**
* 执行器
*/
private Terminator terminator = new Terminator();
/**
* 上下文编辑
*/
Class<? extends TransactionContextEditor> transactionContextEditorClass;
public void rollback() {
terminator.invoke(new TransactionContext(xid, TransactionStatus.CANCELLING.getId()), cancelInvocationContext, transactionContextEditorClass);
}
public void commit() {
terminator.invoke(new TransactionContext(xid, TransactionStatus.CONFIRMING.getId()), confirmInvocationContext, transactionContextEditorClass);
}
复制代码
事务管理器
提供事务的获取、发起、提交、回滚,参与者的新增等等方法。
这些先看看,有个印象,下面会详细讲解其中的内容。
事务拦截器
事务拦截器由两个spring aop切面实现,如下图。
我们先来看一看使用示例:
参与者需要声明 try / confirm / cancel 三个类型的方法,和 TCC 的操作一一对应。在程序里,通过 @Compensable 注解标记在 try 方法上,并填写对应的 confirm / cancel 方法。
// try 方法
@Compensable(confirmMethod = "confirmRecord", cancelMethod = "cancelRecord", transactionContextEditor = DubboTransactionContextEditor.class)
public String record(CapitalTradeOrderDto tradeOrderDto) {
}
// confirm方法
public void confirmRecord(CapitalTradeOrderDto tradeOrderDto) {
}
// cancel方法
public void cancelRecord(CapitalTradeOrderDto tradeOrderDto) {
}
复制代码
很明显,肯定是有切面切入了@Compensable:注解的方法,我们先来找一下。 在源码的spring支持中有这么一段配置:
<bean id="transactionConfigurator" class="org.mengyun.tcctransaction.spring.support.:"
init-method="init"/>
<bean id="compensableTransactionAspect" class="org.mengyun.tcctransaction.spring.ConfigurableTransactionAspect"
init-method="init">
<property name="transactionConfigurator" ref="transactionConfigurator"/>
</bean>
<bean id="resourceCoordinatorAspect" class="org.mengyun.tcctransaction.spring.ConfigurableCoordinatorAspect"
init-method="init">
<property name="transactionConfigurator" ref="transactionConfigurator"/>
</bean>
复制代码
SpringTransactionConfigurator是一个配置相关的类,我们先不管。
ConfigurableTransactionAspect
先看看类图:
我们先看看ConfigurableTransactionAspect类。
@Aspect
public class ConfigurableTransactionAspect extends CompensableTransactionAspect implements Ordered {
private TransactionConfigurator transactionConfigurator;
public void init() {
// 设置事务管理器
TransactionManager transactionManager = transactionConfigurator.getTransactionManager();
// 这个是具体的拦截实现类
CompensableTransactionInterceptor compensableTransactionInterceptor = new CompensableTransactionInterceptor();
compensableTransactionInterceptor.setTransactionManager(transactionManager);
compensableTransactionInterceptor.setDelayCancelExceptions(transactionConfigurator.getRecoverConfig().getDelayCancelExceptions());
this.setCompensableTransactionInterceptor(compensableTransactionInterceptor);
}
@Override
public int getOrder() {
// order越小,则优先级越高。
return Ordered.HIGHEST_PRECEDENCE;
}
public void setTransactionConfigurator(TransactionConfigurator transactionConfigurator) {
this.transactionConfigurator = transactionConfigurator;
}
}
复制代码
ConfigurableTransactionAspect继承了CompensableTransactionAspect同时实现了Orderd,且Order值为最小值,这就是我们要找的第一个切面。
这里子类就是给父类提供了初始化,切面具体逻辑在父类。我们看看父类的逻辑。
@Aspect
public abstract class CompensableTransactionAspect {
private CompensableTransactionInterceptor compensableTransactionInterceptor;
public void setCompensableTransactionInterceptor(CompensableTransactionInterceptor compensableTransactionInterceptor) {
this.compensableTransactionInterceptor = compensableTransactionInterceptor;
}
/**
* 切点
*/
@Pointcut("@annotation(org.mengyun.tcctransaction.api.Compensable)")
public void compensableService() {
}
/**
* 切面
* @param pjp
* @return
* @throws Throwable
*/
@Around("compensableService()")
public Object interceptCompensableMethod(ProceedingJoinPoint pjp) throws Throwable {
return compensableTransactionInterceptor.interceptCompensableMethod(pjp);
}
public abstract int getOrder();
}
复制代码
可以看到具体逻辑还是在compensableTransactionInterceptor中,我们继续跟。
public Object interceptCompensableMethod(ProceedingJoinPoint pjp) throws Throwable {
// 获得 具体的方法 注解 传播行为 以及事务上下文
CompensableMethodContext compensableMethodContext = new CompensableMethodContext(pjp);
// 事务是否存在于当前线程中
// transactionManager为单例的,其中包含一个threadlocal , 里面是一个Deque(双端队列)
// 判断该threadlocal里的Deque是否为空
boolean isTransactionActive = transactionManager.isTransactionActive();
// 判断事务上下文是否合法
// 传播级别为支持当前事务 而不支持新事务、同时当前线程中不存在事务,且事务上下文为空
// 则是非法 抛出异常
if (!TransactionUtils.isLegalTransactionContext(isTransactionActive, compensableMethodContext)) {
throw new SystemException("no active compensable transaction while propagation is mandatory for method " + compensableMethodContext.getMethod().getName());
}
// 判断是根事务还是分支事务
// 如果传播级别为必须新开事务 或者 (允许新建事务且线程中不存在事务,且事务上下文为空) 则是根事务
// ----------------------------------------------------------------------------------------------------
// 如果不满足上述条件, 且(事务的传播级别为支持当前事务或者为支持当前事务,并不允许开启事务)并且当前没有活动事务,并且事务上下文不为空
// 则是分支事务
switch (compensableMethodContext.getMethodRole(isTransactionActive)) {
case ROOT:
return rootMethodProceed(compensableMethodContext);
case PROVIDER:
return providerMethodProceed(compensableMethodContext);
default:
// 如果事务存在于线程中 或者事务的传播行为为支持当前事务,没有事务则以非事务执行 则直接执行
return pjp.proceed();
}
}
复制代码
首先获取注解对应的参数。
public CompensableMethodContext(ProceedingJoinPoint pjp) {
this.pjp = pjp;
// 获取到具体方法
this.method = getCompensableMethod();
// 获取到注解
this.compensable = method.getAnnotation(Compensable.class);
// 注解上的传播行为
this.propagation = compensable.propagation();
// 获得事务上下文
this.transactionContext = FactoryBuilder.factoryOf(compensable.transactionContextEditor()).getInstance().get(pjp.getTarget(), method, pjp.getArgs());
}
复制代码
tcc-transaction支持四种传播行为。
/**
* 支持当前事务,如果当前没有事务,就新建一个事务。
*/
REQUIRED(0),
/**
* 支持当前事务,如果当前没有事务,就以非事务方式执行。
*/
SUPPORTS(1),
/**
* 支持当前事务,如果当前没有事务,就抛出异常。
*/
MANDATORY(2),
/**
*新建事务,如果当前存在事务,把当前事务挂起。
*/
REQUIRES_NEW(3);
复制代码
FactoryBuilder是一个抽象的单例工厂,整句话就是从注解的transactionContextEditor.class中获得一个单例得transactionContextEditor,然后调用这个transactionContextEditor的get方法获得事务的上下文。
我们回到interceptCompensableMethod。isTransactionActive判断当前线程是否包含事务。
public boolean isTransactionActive() {
// 从threadLocal里获取双向队列
Deque<Transaction> transactions = CURRENT.get();
return transactions != null && !transactions.isEmpty();
}
复制代码
就是看一下当前线程里的双向队列是否有值。
继续往下走。
// 判断事务上下文是否合法
// 传播级别为支持当前事务而且不支持新事务、同时当前线程中不存在事务,且事务上下文为空
// 则是非法 抛出异常
if (!TransactionUtils.isLegalTransactionContext(isTransactionActive, compensableMethodContext)) {
throw new SystemException("no active compensable transaction while propagation is mandatory for method " + compensableMethodContext.getMethod().getName());
}
复制代码
// 非法的必要条件
public static boolean isLegalTransactionContext(boolean isTransactionActive, CompensableMethodContext compensableMethodContext) {
if (compensableMethodContext.getPropagation().equals(Propagation.MANDATORY) // 事务传播行为为支持当前事务,没有事务则抛出异常
&& !isTransactionActive // 没有当前事务
&& compensableMethodContext.getTransactionContext() == null //事务上下文为空
) {
return false;
}
return true;
}
复制代码
接着走。
switch (compensableMethodContext.getMethodRole(isTransactionActive)) {
// 判断是根事务还是分支事务
// 如果传播级别为必须新开事务 或者 (允许新建事务且线程中不存在事务,且事务上下文为空) 则是根事务
case ROOT:
return rootMethodProceed(compensableMethodContext);
// 如果不满足上述条件, 且(事务的传播级别为支持当前事务或者为支持当前事务,并不允许开启事务)并且当前没有活动事务,并且事务上下文不为空
case PROVIDER:
return providerMethodProceed(compensableMethodContext);
default:
// 如果事务存在于线程中 或者事务的传播行为为支持当前事务,没有事务则以非事务执行 则直接执行
return pjp.proceed();
}
}
复制代码
看一下根事务的处理方法。
private Object rootMethodProceed(CompensableMethodContext compensableMethodContext) throws Throwable {
Object returnValue = null;
Transaction transaction = null;
// 是否同步confirm
boolean asyncConfirm = compensableMethodContext.getAnnotation().asyncConfirm();
// 是否同步cancel
boolean asyncCancel = compensableMethodContext.getAnnotation().asyncCancel();
// 加入延迟cancel相关的异常
Set<Class<? extends Exception>> allDelayCancelExceptions = new HashSet<Class<? extends Exception>>();
allDelayCancelExceptions.addAll(this.delayCancelExceptions);
allDelayCancelExceptions.addAll(Arrays.asList(compensableMethodContext.getAnnotation().delayCancelExceptions()));
try {
// 创建根事务
// 在内存中创建一个事务,声明为根事务
// 将刚刚的事务持久化到redis、zookeeper或者mysql 再或者磁盘
// 将事务注册进transactionManager中的treadlocal中的deque中
transaction = transactionManager.begin(compensableMethodContext.getUniqueIdentity());
try {
// 执行
returnValue = compensableMethodContext.proceed();
} catch (Throwable tryingException) {
// 异常处理
// 如果不是需要延迟的异常类型
if (!isDelayCancelException(tryingException, allDelayCancelExceptions)) {
logger.warn(String.format("compensable transaction trying failed. transaction content:%s", JSON.toJSONString(transaction)), tryingException);
// 立即回滚
// 获取当前线程的deque队头的事务,
// 改变事务状态 并持久化改变的事务状态
// rollback所有参与者
// delete持久化的事务
transactionManager.rollback(asyncCancel);
}
// 抛出异常
throw tryingException;
}
// 提交
// 和上述回滚步骤一致
transactionManager.commit(asyncConfirm);
} finally {
// 将事务从当前线程事务队列移除
transactionManager.cleanAfterCompletion(transaction);
}
return returnValue;
}
复制代码
看一下begin方法。
/**
* 发起根事务
* @param uniqueIdentify
* @return 事务
*/
public Transaction begin(Object uniqueIdentify) {
// 创建根事务
Transaction transaction = new Transaction(uniqueIdentify,TransactionType.ROOT);
// 存储 事务
transactionRepository.create(transaction);
// 注册事务
registerTransaction(transaction);
return transaction;
}
复制代码
private void registerTransaction(Transaction transaction) {
if (CURRENT.get() == null) {
CURRENT.set(new LinkedList<Transaction>());
}
CURRENT.get().push(transaction);
}
复制代码
分支事务的处理方法。
private Object providerMethodProceed(CompensableMethodContext compensableMethodContext) throws Throwable {
Transaction transaction = null;
boolean asyncConfirm = compensableMethodContext.getAnnotation().asyncConfirm();
boolean asyncCancel = compensableMethodContext.getAnnotation().asyncCancel();
try {
// 事务上下问中的状态
switch (TransactionStatus.valueOf(compensableMethodContext.getTransactionContext().getStatus())) {
// 如果是try阶段
case TRYING:
// 传播发起分支
// 在内存中创建一个事务,声明为分支事务,事务id为事务上下文中的事务id
// 持久化事务
// 注册进deque中
transaction = transactionManager.propagationNewBegin(compensableMethodContext.getTransactionContext());
return compensableMethodContext.proceed();
case CONFIRMING:
try {
// 传播获取分支
// 从持久化中取出事务
// 注册进deque中
transaction = transactionManager.propagationExistBegin(compensableMethodContext.getTransactionContext());
transactionManager.commit(asyncConfirm);
} catch (NoExistedTransactionException excepton) {
//the transaction has been commit,ignore it.
}
break;
case CANCELLING:
try {
// 传播获取分支
transaction = transactionManager.propagationExistBegin(compensableMethodContext.getTransactionContext());
transactionManager.rollback(asyncCancel);
} catch (NoExistedTransactionException exception) {
//the transaction has been rollback,ignore it.
}
break;
}
} finally {
transactionManager.cleanAfterCompletion(transaction);
}
Method method = compensableMethodContext.getMethod();
return ReflectionUtils.getNullValue(method.getReturnType());
}
复制代码
看一下propagationNewBegin方法。
/**
* 传播发起分支事务
* @param transactionContext 事务上下文
* @return 分支事务
*/
public Transaction propagationNewBegin(TransactionContext transactionContext) {
// 创建分支事务
Transaction transaction = new Transaction(transactionContext);
// 存储事务
transactionRepository.create(transaction);
// 注册事务
registerTransaction(transaction);
return transaction;
}
复制代码
总结
我们理一下逻辑。
- 1)切面切进来首先获取到切面的一些参数。
- 方法
- 注解
- 传播行为
- 以及事务上下文
- 2)判断是否是活动的事务(判断当前线程中的Deque是否为空)。
- 3)判断事务是否合法,不合法情况必须满足以下三个条件:
- 事务传播行为为MANDATORY,支持当前事务,没有事务则抛出异常。
- 当前没有活动事务
- 事务上下文为空。
- 4)判断是新起一个事务(根事务或者分支事务)或者是已经在事务中,直接执行(不创建Transaction)。
- 根事务两种情况:
- 事务传播行为为REQUIRED(支持当前事务,没有事务则新起事务),且当前没有活动事务,事务上下文为空。
- 事务传播行为为REQUIRES_NEW。
- 分支事务的情况:
- 事务传播行为为REQUIRED或者MANDATORY,且当前没有活动事务,事务上下文不为空。
- 不新起事务(不创建Transaction)情况:
- 当前线程存在活动事务,或者传播行为为SUPPORTS(支持当前事务,没有事务则以非事务执行)。
- 根事务两种情况:
- 5)起一个根事务的处理方式
- 5.1)获取注解中的是否同步confirm、cancel状态。
- 5.2)获取延迟cancel相关的异常。
- 5.3)创建根事务
- 5.3.1)创建一个根事务,随机生成事务id,事务状态为trying,事务类型为root。
- 5.3.2)存储上一步创建的根事务到事务存储器(可以是mysql、redis、zookeeper、File)。
- 5.3.3)将事务注册进TransactionManager的TreadLocal的Deque栈尾。
- 5.4)执行具体业务(还会进到下一个拦截器)。
- 5.5)如果上一步出现异常,判断是否是需要延迟的异常,如果是则继续往上抛异常,如果不是则调用transactionManager的rollback。
- 5.5.1)从Deque中拿到当前事务。
- 5.5.2)改变事务状态为CANCELLING。
- 5.5.3)更新事务存储器里的事务状态。
- 5.5.4)同步或者异步的调用transaction的rollback方法,即循环所有的参与者,调用他们的rollback方法。
- 5.5.5)删除事务存储器的事务。
- 5.6)没有出现异常则commit,和上述rollback步骤基本一致,就不赘述。
- 5.7)清理现场,删除deque中的transaction,如果删除后为空,则清除ThreadLocal。
- 6)起一个分支事务的处理方式
- 6.1)获取注解中的是否同步confirm、cancel状态。
- 6.2)判断事务阶段。
- 6.2.1)TRYING阶段
- 6.2.1.1) 创建一个分支事务,使用事务上下文中的事务id,事务状态为trying,事务类型为BRANCH。
- 6.2.1.2)存储上一步创建的分支事务到事务存储器(可以是mysql、redis、zookeeper、File)。
- 6.2.1.3)将事务注册进TransactionManager的TreadLocal的Deque栈尾。
- 6.2.1.4)直接执行业务逻辑,不处理异常
- 6.2.2)CONFIRMING阶段
- 6.2.2.1)根据事务上下文中的事务id去事务存储器查询到相关的事务(在try阶段存储到事务存储器的)
- 6.2.2.2)改变事务状态
- 6.2.2.3)注册进Deque。
- 6.2.2.4)调用transactionManager的commit。
- 6.2.2.4.1)从deque中拿到事务
- 6.2.2.4.2)改变事务状态
- 6.2.2.4.3)改变事务存储器中的事务状态。
- 6.2.2.4.4)调用所有参与者的commit方法。
- 6.2.2.4.5)删除事务存储器中的事务。
- 6.2.3)CANCELLING阶段 和CONFIRMING阶段阶段类似。 6.3)清理现场。 6.4) 如果为TRYING或者CONFIRMING阶段,则返回一个空值。
- 6.2.1)TRYING阶段
- 7) 不新起事务处理方法,直接执行。
理一理根事务和分支事务的不同。
- 根事务执行完毕之后直接rollback或者commit。
- 分支事务TRYING阶段不rollback或者commit,等到CONFIRMING阶段或者CANCELLING阶段才做去。
两个问题:
- 根事务rollback、commit之后直接删除了事务存储器相关的事务,分支事务怎么感知到commit或者rollback?
- 分支事务执行完后并不做什么,不处理异常也不commit,其他分支事务或者根事务怎么感知到当前事务状态?
ConfigurableCoordinatorAspect
还是先看看类图。
来看看ConfigurableCoordinatorAspect类。
@Aspect
public class ConfigurableCoordinatorAspect extends ResourceCoordinatorAspect implements Ordered {
private TransactionConfigurator transactionConfigurator;
public void init() {
ResourceCoordinatorInterceptor resourceCoordinatorInterceptor = new ResourceCoordinatorInterceptor();
resourceCoordinatorInterceptor.setTransactionManager(transactionConfigurator.getTransactionManager());
this.setResourceCoordinatorInterceptor(resourceCoordinatorInterceptor);
}
@Override
public int getOrder() {
return Ordered.HIGHEST_PRECEDENCE + 1;
}
public void setTransactionConfigurator(TransactionConfigurator transactionConfigurator) {
this.transactionConfigurator = transactionConfigurator;
}
}
复制代码
和上面的ConfigurableTransactionAspect大抵类似,不过优先级要低一些。也就是当ConfigurableTransactionAspect执行到proceed时,就会进入这个切面。
看看父类。
@Aspect
public abstract class ResourceCoordinatorAspect {
private ResourceCoordinatorInterceptor resourceCoordinatorInterceptor;
/**
* 切点为打了Compensable注解的方法
*/
@Pointcut("@annotation(org.mengyun.tcctransaction.api.Compensable)")
public void transactionContextCall() {
}
/**
* 环绕
* @param pjp
* @return
* @throws Throwable
*/
@Around("transactionContextCall()")
public Object interceptTransactionContextMethod(ProceedingJoinPoint pjp) throws Throwable {
return resourceCoordinatorInterceptor.interceptTransactionContextMethod(pjp);
}
public void setResourceCoordinatorInterceptor(ResourceCoordinatorInterceptor resourceCoordinatorInterceptor) {
this.resourceCoordinatorInterceptor = resourceCoordinatorInterceptor;
}
public abstract int getOrder();
}
复制代码
直接看interceptTransactionContextMethod。
public Object interceptTransactionContextMethod(ProceedingJoinPoint pjp) throws Throwable {
Transaction transaction = transactionManager.getCurrentTransaction();
if (transaction != null) {
switch (transaction.getStatus()) {
case TRYING:
enlistParticipant(pjp);
break;
case CONFIRMING:
break;
case CANCELLING:
break;
}
}
return pjp.proceed(pjp.getArgs());
}
复制代码
- 第一步先从线程Deque中拿到上一个拦截器方进去的transaction。
- 如果是try状态则调用enlistParticipant,否则什么都不做。
一起看看enlistParticipant方法干了什么。
private void enlistParticipant(ProceedingJoinPoint pjp) throws IllegalAccessException, InstantiationException {
// 获得 @Compensable 注解
Method method = CompensableMethodUtils.getCompensableMethod(pjp);
if (method == null) {
throw new RuntimeException(String.format("join point not found method, point is : %s", pjp.getSignature().getName()));
}
Compensable compensable = method.getAnnotation(Compensable.class);
// 获得 确认执行业务方法 和 取消执行业务方法
String confirmMethodName = compensable.confirmMethod();
String cancelMethodName = compensable.cancelMethod();
// 获取 当前线程事务第一个(栈顶)元素
Transaction transaction = transactionManager.getCurrentTransaction();
// 创建 事务编号
TransactionXid xid = new TransactionXid(transaction.getXid().getGlobalTransactionId());
// 设置事务上下文
if (FactoryBuilder.factoryOf(compensable.transactionContextEditor()).getInstance().get(pjp.getTarget(), method, pjp.getArgs()) == null) {
FactoryBuilder.factoryOf(compensable.transactionContextEditor()).getInstance().set(new TransactionContext(xid, TransactionStatus.TRYING.getId()), pjp.getTarget(), ((MethodSignature) pjp.getSignature()).getMethod(), pjp.getArgs());
}
// 获得类
Class targetClass = ReflectionUtils.getDeclaringType(pjp.getTarget().getClass(), method.getName(), method.getParameterTypes());
// 创建 确认执行方法调用上下文 和 取消执行方法调用上下文
InvocationContext confirmInvocation = new InvocationContext(targetClass,
confirmMethodName,
method.getParameterTypes(), pjp.getArgs());
InvocationContext cancelInvocation = new InvocationContext(targetClass,
cancelMethodName,
method.getParameterTypes(), pjp.getArgs());
Participant participant =
new Participant(
xid,
confirmInvocation,
cancelInvocation,
compensable.transactionContextEditor());
// 添加 事务参与者 到 事务
transactionManager.enlistParticipant(participant);
}
复制代码
总的来说一句话,创建设置事务上下文并生成一个参与者加入到trasaction里,修改事务存储器里的值。
总结
ConfigurableTransactionAspect作为第一个切面,负责事务的生成、处理以及存储。 ConfigurableCoordinatorAspect作为第二个切面,负责参与者的生成和事务上下文的创建。
疑问解答
1、分支事务是不知道自己应该提交还是回滚的。
2、我们看一下官方的demo。
package org.mengyun.tcctransaction.sample.dubbo.capital.api;
import org.mengyun.tcctransaction.api.Compensable;
import org.mengyun.tcctransaction.sample.dubbo.capital.api.dto.CapitalTradeOrderDto;
/**
* Created by changming.xie on 4/1/16.
*/
public interface CapitalTradeOrderService {
@Compensable
String record(CapitalTradeOrderDto tradeOrderDto);
}
复制代码
这是一个dubbo RPC接口,也就是说,这个接口也打上了@Compensable注解(之前一直没想明白这个问题),也就是说,在调用远程RPC服务时,调用方还会再进入一次切面(走两次切面逻辑),然后把远程参与者加入到调用方的Transaction里。也就是说,当主事务回滚或提交的时候,会调用远程参与者的接口,并告知当前事务状态,远程参与者根据不同的状态调用不同的方法来做confirm或commit。
如下文:
/**
* transaction提交TCC事务
*/
public void commit() {
for (Participant participant : participants) {
participant.commit();
}
}
/**
* participant提交事务
*/
public void commit() {
terminator.invoke(new TransactionContext(xid, TransactionStatus.CONFIRMING.getId()), confirmInvocationContext, transactionContextEditorClass);
}
复制代码
当远程分支收到这个RPC调用时,会去判断当前事务正在什么阶段,然后调用对应的方法。
switch (TransactionStatus.valueOf(transactionContext.getStatus())) {
case TRYING:
// 传播发起分支事务
transaction = transactionManager.propagationNewBegin(transactionContext);
return pjp.proceed();
case CONFIRMING:
try {
// 传播获取分支事务
transaction = transactionManager.propagationExistBegin(transactionContext);
// 提交事务
transactionManager.commit();
} catch (NoExistedTransactionException excepton) {
//the transaction has been commit,ignore it.
}
break;
case CANCELLING:
try {
// 传播获取分支事务
transaction = transactionManager.propagationExistBegin(transactionContext);
// 回滚事务
transactionManager.rollback();
} catch (NoExistedTransactionException exception) {
//the transaction has been rollback,ignore it.
}
break;
}
复制代码
至此就走完所有的正常流程了。
todo 事务补偿
todo dubbo支持