Skip to main content

Spring事务处理源码解析

基于spring 5.3.5版本源码分析

事务处理的逻辑主要在 MethodInterceptor 中,下面通过源码来解析Spring是如何处理事务的。

1. 入口方法MethodInterceptor#invoke

这个方法是事务执行的入口:

@Override
@Nullable
public Object invoke(MethodInvocation invocation) throws Throwable {

Class<?> targetClass = (invocation.getThis() != null ? AopUtils.getTargetClass(invocation.getThis()) : null);

//事务处理的主要逻辑
return invokeWithinTransaction(invocation.getMethod(), targetClass, new CoroutinesInvocationCallback() {
@Override
@Nullable
public Object proceedWithInvocation() throws Throwable {
return invocation.proceed();
}
@Override
public Object getTarget() {
return invocation.getThis();
}
@Override
public Object[] getArguments() {
return invocation.getArguments();
}
});
}

2. TransactionAspectSupport#invokeWithinTransaction方法

这个方法主要包含了两部分:

  • 响应式编程事务支持(ReactiveTransactionManager类添加与Spring5.2版本)
  • 标准的的事务支持

响应式后续分析。我们来分析一下标准的事务支持:

// 重点:是否需要创建事务
TransactionInfo txInfo = createTransactionIfNecessary(ptm, txAttr, joinpointIdentification);

Object retVal;
try {
// 这是一个around advice: 调用链中的下一个拦截器.
// 这通常会导致调用目标对象.
retVal = invocation.proceedWithInvocation();
}
catch (Throwable ex) {
// 事务的异常处理
completeTransactionAfterThrowing(txInfo, ex);
throw ex;
}
finally {
//清理事务的信息
cleanupTransactionInfo(txInfo);
}

if (retVal != null && vavrPresent && VavrDelegate.isVavrTry(retVal)) {
// Set rollback-only in case of Vavr failure matching our rollback rules...
TransactionStatus status = txInfo.getTransactionStatus();
if (status != null && txAttr != null) {
retVal = VavrDelegate.evaluateTryFailure(retVal, txAttr, status);
}
}
//提交事务
commitTransactionAfterReturning(txInfo);
return retVal;

事务主要是通过 TransactionAspectSupport#createTransactionIfNecessary 方法获取,然后执行目标方法获取方法返回结果。如果有问题处理问题,根据情况(配置的事务传递性和事务回滚的Execption)是否对事务进行回滚。

protected TransactionInfo createTransactionIfNecessary(@Nullable PlatformTransactionManager tm,
@Nullable TransactionAttribute txAttr, final String joinpointIdentification) {

// 处理txAttr没有名字的情况
if (txAttr != null && txAttr.getName() == null) {
txAttr = new DelegatingTransactionAttribute(txAttr) {
@Override
public String getName() {
return joinpointIdentification;
}
};
}

TransactionStatus status = null;
if (txAttr != null) {
if (tm != null) {
//重点:通过PlatformTransactionManager管理获取事务状态
status = tm.getTransaction(txAttr);
}
else {
if (logger.isDebugEnabled()) {
logger.debug("Skipping transactional joinpoint [" + joinpointIdentification +
"] because no transaction manager has been configured");
}
}
}
//将TransactionStatus包装成TransactionInfo
return prepareTransactionInfo(tm, txAttr, joinpointIdentification, status);
}

3. 事务传播

在通过注解@Transactional管理事务的时候需要配置Propagation的值。默认为Propagation.REQUIRED,下面来看一下AbstractPlatformTransactionManager#getTransaction。方法主要逻辑为两个:

  1. 存在事务如何处理
  2. 没有存在事务如何处理

这里就涉及到了根据事务的传播方式来处理事务。

@Override
public final TransactionStatus getTransaction(@Nullable TransactionDefinition definition)
throws TransactionException {

// Use defaults if no transaction definition given.
TransactionDefinition def = (definition != null ? definition : TransactionDefinition.withDefaults());

Object transaction = doGetTransaction();
boolean debugEnabled = logger.isDebugEnabled();

//重点:判断是否存在事务
if (isExistingTransaction(transaction)) {
// 存在事务 -> 检查 propagation 来决定如何处理.
return handleExistingTransaction(def, transaction, debugEnabled);
}

//重点: 下面就是处理没有事务,根据propagation决定如何处理
if (def.getTimeout() < TransactionDefinition.TIMEOUT_DEFAULT) {
throw new InvalidTimeoutException("Invalid transaction timeout", def.getTimeout());
}

if (def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_MANDATORY) {
throw new IllegalTransactionStateException(
"No existing transaction found for transaction marked with propagation 'mandatory'");
}
else if (def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRED ||
def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW ||
def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) {
SuspendedResourcesHolder suspendedResources = suspend(null);
if (debugEnabled) {
logger.debug("Creating new transaction with name [" + def.getName() + "]: " + def);
}
try {
return startTransaction(def, transaction, debugEnabled, suspendedResources);
}
catch (RuntimeException | Error ex) {
resume(null, suspendedResources);
throw ex;
}
}
else {
// Create "empty" transaction: no actual transaction, but potentially synchronization.
if (def.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT && logger.isWarnEnabled()) {
logger.warn("Custom isolation level specified but no actual transaction initiated; " +
"isolation level will effectively be ignored: " + def);
}
boolean newSynch

3.1 当前存在事务的传播属性

方法主要是 AbstractPlatformTransactionManager#handleExistingTransaction:

  • NEVER

    存在事务就报错

    if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NEVER) {
    throw new IllegalTransactionStateException(
    "Existing transaction found for transaction marked with propagation 'never'");
    }
  • NOT_SUPPORTED

    把当前事务挂起,强制不在事务中运行

    if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NOT_SUPPORTED) {
    //挂起当前事务
    Object suspendedResources = suspend(transaction);
    boolean newSynchronization = (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS);
    return prepareTransactionStatus(
    definition, null, false, newSynchronization, debugEnabled, suspendedResources);
    }
  • REQUIRES_NEW

    挂起当前事务,创建一个新事务

    if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW) {
    //挂起已有事务
    SuspendedResourcesHolder suspendedResources = suspend(transaction);
    try {
    //开启一个新的事务
    return startTransaction(definition, transaction, debugEnabled, suspendedResources);
    }
    catch (RuntimeException | Error beginEx) {
    resumeAfterBeginException(transaction, suspendedResources, beginEx);
    throw beginEx;
    }
    }
  • NESTED

    在当前事务中创建一个嵌套事务

    if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) {
    //判断是否允许嵌套事务
    if (!isNestedTransactionAllowed()) {
    throw new NestedTransactionNotSupportedException(
    "Transaction manager does not allow nested transactions by default - " +
    "specify 'nestedTransactionAllowed' property with value 'true'");
    }
    if (useSavepointForNestedTransaction()) {
    // 已存在的事务创建savepoint通过TransactionStatus的SavepointManager API来实现
    //通常使用JDBC 3.0保存点。从不激活Spring同步
    DefaultTransactionStatus status =
    prepareTransactionStatus(definition, transaction, false, false, debugEnabled, null);
    status.createAndHoldSavepoint();
    return status;
    }
    else {
    //JTA通过提交和回滚
    return startTransaction(definition, transaction, debugEnabled, null);
    }
    }
  • REQUIRED、SUPPORTS、MANDATORY

    这三个都是使用当前事务

3.2 不存在事务的传播性

  • MANDATORY

    抛异常

    if (def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_MANDATORY) {
    throw new IllegalTransactionStateException(
    "No existing transaction found for transaction marked with propagation 'mandatory'");
    }
  • REQUIRED、REQUIRES_NEW、NESTED

    新建事务

    else if (def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRED ||
    def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW ||
    def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) {
    SuspendedResourcesHolder suspendedResources = suspend(null);
    try {
    return startTransaction(def, transaction, debugEnabled, suspendedResources);
    }
    catch (RuntimeException | Error ex) {
    resume(null, suspendedResources);
    throw ex;
    }
    }
  • SUPPORTS、NOT_SUPPORTED、NEVER

    以非事务的方式运行

    else {
    // Create "empty" transaction: no actual transaction, but potentially synchronization.
    boolean newSynchronization = (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS);
    return prepareTransactionStatus(def, null, true, newSynchronization, debugEnabled, null);
    }

4. 事务类型传播终结

不存在事务存在事务
REQUIRED新建事务使用当前事务
REQUIRES_NEW新建事务挂起当前事务,创建一个新的事务
NESTED新建事务在当前事务中创建一个嵌套事务
MANDATORY抛异常使用当前事务
SUPPORTS非事务运行使用当前事务
NOT_SUPPORTED非事务运行挂起当前事务,强制不在事务中运行
NEVER非事务运行抛异常

5. 事务回滚

当设置了事务如果存在执行方法出现错误,就会对事务进行回滚。下面看一下 TransactionAspectSupport#completeTransactionAfterThrowing 方法主要负责回滚事务:

protected void completeTransactionAfterThrowing(@Nullable TransactionInfo txInfo, Throwable ex) {
if (txInfo != null && txInfo.getTransactionStatus() != null) {

if (txInfo.transactionAttribute != null && txInfo.transactionAttribute.rollbackOn(ex)) {
try {
//回滚事务
txInfo.getTransactionManager().rollback(txInfo.getTransactionStatus());
}
catch (TransactionSystemException ex2) {
logger.error("Application exception overridden by rollback exception", ex);
ex2.initApplicationException(ex);
throw ex2;
}
catch (RuntimeException | Error ex2) {
logger.error("Application exception overridden by rollback exception", ex);
throw ex2;
}
}
else {
//不想回滚当前exception提交事务
try {
txInfo.getTransactionManager().commit(txInfo.getTransactionStatus());
}
catch (TransactionSystemException ex2) {
logger.error("Application exception overridden by commit exception", ex);
ex2.initApplicationException(ex);
throw ex2;
}
catch (RuntimeException | Error ex2) {
logger.error("Application exception overridden by commit exception", ex);
throw ex2;
}
}
}
}

6. 事务提交

当方法执行完成,需要提交事务,提交是由 TransactionAspectSupport#commitTransactionAfterReturning 处理事务提交:

protected void commitTransactionAfterReturning(@Nullable TransactionInfo txInfo) {
if (txInfo != null && txInfo.getTransactionStatus() != null) {
txInfo.getTransactionManager().commit(txInfo.getTransactionStatus());
}
}