seata源码解析:seata AT模式是如何实现的?

网友投稿 659 2022-09-02

seata源码解析:seata AT模式是如何实现的?

seata源码解析:seata AT模式是如何实现的?

AT模式

AT模式是seata主推的一种模式,对业务无侵入,用户只需要关系自己的业务sql即可,用户的业务sql就是全局事务的一阶段,Seata会自动生成事务的二阶段提交和回滚操作。

那么它是如何做到对业务无侵入的?

seata对业务无侵入是通过数据源代理实现的。

一阶段:

在at模式中需要在客户端的数据库中建如下的表,用来保存回滚的数据

CREATE TABLE IF NOT EXISTS `undo_log`( `branch_id` BIGINT NOT NULL COMMENT 'branch transaction id', `xid` VARCHAR(128) NOT NULL COMMENT 'global transaction id', `context` VARCHAR(128) NOT NULL COMMENT 'undo_log context,such as serialization', `rollback_info` LONGBLOB NOT NULL COMMENT 'rollback info', `log_status` INT(11) NOT NULL COMMENT '0:normal status,1:defense status', `log_created` DATETIME(6) NOT NULL COMMENT 'create datetime', `log_modified` DATETIME(6) NOT NULL COMMENT 'modify datetime', UNIQUE KEY `ux_undo_log` (`xid`, `branch_id`)) ENGINE = InnoDB AUTO_INCREMENT = 1 DEFAULT CHARSET = utf8 COMMENT ='AT transaction mode undo table';

seata的数据源代理通过对业务sql的解析,在业务sql执行前后生成前置镜像和后置镜像保存在undo_log中,业务sql的执行和undo_log的插入在一个本地事务中,这样就可以保证任何对业务数据的更新都有相应的回滚日志存在。

在本地事务提交之前还会构建lockKey(本次事务提交影响的数据,lockKey的构建规则之前的文章已经说过了哈),向TC对涉及到的数据加锁,当加锁成功后,才会执行本地事务的commit过程,如果加锁失败,RM会根据重试策略进行重试,如果重试也失败,那么回滚本地事务

二阶段提交

当TM向TC发起全局提交请求时,此时分支事务实际上以及提交了,TC立即释放该全局事务的锁,然后异步调用RM清理回滚日志

二阶段回滚

当RM收到TC发来的回滚请求时,根据xid和branchid找到对应的回滚记录,通过回滚记录生成反向的SQL并执行,完成分支的回滚。当分支回滚结束时,通过TC回滚完成,当所有分支都回滚完成时,才会释放全局事务的锁

RM在回滚的时候会有如下的校验

beforeImmage等于afterImage则不用回滚(数据没有发生变化),否则执行下一步如果当前的记录等于afterImage,则回滚,否则执行下一步如果当前的记录等于beforeImage,则不用会滚了,否则抛出异常(当前记录和beforeImage和afterImage都不相等),说明发生了脏写

为什么会出现回滚失败呢?

有人绕过系统直接操作数据库没有正确配置RM,比如RM执行单独的本地事务,此时可以在相应的方法上加上@GlobalLock注解,让RM在本地事务提交前也需要获取全局锁

AT模式是如何保证隔离性的?

写隔离

seata at 模式的读写隔离是通过全局锁来实现的。官网的例子讲的很清楚,我们引用一下

两个全局事务 tx1 和 tx2,分别对 a 表的 m 字段进行更新操作,m 的初始值 1000。

tx1 先开始,开启本地事务,拿到本地锁,更新操作 m = 1000 - 100 = 900。本地事务提交前,先拿到该记录的全局锁,本地提交释放本地锁。tx2 后开始,开启本地事务,拿到本地锁,更新操作 m = 900 - 100 = 800。本地事务提交前,尝试拿该记录的全局锁,tx1 全局提交前,该记录的全局锁被 tx1 持有,tx2 需要重试等待全局锁。tx1 二阶段全局提交,释放全局锁。tx2 拿到全局锁提交本地事务。

如果 tx1 的二阶段全局回滚,则 tx1 需要重新获取该数据的本地锁,进行反向补偿的更新操作,实现分支的回滚。

此时,如果 tx2 仍在等待该数据的全局锁,同时持有本地锁,则 tx1 的分支回滚会失败。分支的回滚会一直重试,直到 tx2 的全局锁等锁超时,放弃全局锁并回滚本地事务释放本地锁,tx1 的分支回滚最终成功。

因为整个过程全局锁在 tx1 结束前一直是被 tx1 持有的,所以不会发生脏写的问题。

读隔离

seata at 模式默认的隔离级别为读未提交(因为已经提交的sql有可能会回滚)。如果要实现读已提交,select语句需要更改为 SELECT FOR UPDATE 语句。

SELECT FOR UPDATE 语句的执行会申请全局锁,如果全局锁被其他事务持有,则释放本地锁(回滚 SELECT FOR UPDATE 语句的本地执行)并重试。这个过程中,查询是被 block 住的,直到 全局锁 拿到,即读取的相关数据是已提交的,才返回

出于总体性能上的考虑,Seata 目前的方案并没有对所有 SELECT 语句都进行代理,仅针对 FOR UPDATE 的 SELECT 语句

源码分析

在执行sql的过程中,各个代理对象起到的作用如下

ExecuteTemplate#execute执行本地sql

分支事务的执行过程是在StatementProxy,PreparedStatementProxy的execute,executeQuery,executeUpdate等方法中的,而这些方法最终都会执行到ExecuteTemplate#execute方法

public static T execute(List sqlRecognizers, StatementProxy statementProxy, StatementCallback statementCallback, Object... args) throws SQLException { // 方法上加了 GlobalLock 注解,是at模式,都得执行代理后的逻辑 // 否则不用执行 if (!RootContext.requireGlobalLock() && BranchType.AT != RootContext.getBranchType()) { // Just work as original statement return statementCallback.execute(statementProxy.getTargetStatement(), args); } // 获取数据库类型 String dbType = statementProxy.getConnectionProxy().getDbType(); if (CollectionUtils.isEmpty(sqlRecognizers)) { // 获取到sql语句解析器 sqlRecognizers = SQLVisitorFactory.get( statementProxy.getTargetSQL(), dbType); } Executor executor; if (CollectionUtils.isEmpty(sqlRecognizers)) { // 没有找到合适的sql语句解析器,则执行使用原生的Statement执行sql executor = new PlainExecutor<>(statementProxy, statementCallback); } else { if (sqlRecognizers.size() == 1) { SQLRecognizer sqlRecognizer = sqlRecognizers.get(0); switch (sqlRecognizer.getSQLType()) { case INSERT: executor = EnhancedServiceLoader.load(InsertExecutor.class, dbType, new Class[]{StatementProxy.class, StatementCallback.class, SQLRecognizer.class}, new Object[]{statementProxy, statementCallback, sqlRecognizer}); break; case UPDATE: executor = new UpdateExecutor<>(statementProxy, statementCallback, sqlRecognizer); break; case DELETE: executor = new DeleteExecutor<>(statementProxy, statementCallback, sqlRecognizer); break; case SELECT_FOR_UPDATE: executor = new SelectForUpdateExecutor<>(statementProxy, statementCallback, sqlRecognizer); break; default: executor = new PlainExecutor<>(statementProxy, statementCallback); break; } } else { // 一条sql包含多个update语句等 executor = new MultiExecutor<>(statementProxy, statementCallback, sqlRecognizers); } } T rs; try { rs = executor.execute(args); } catch (Throwable ex) { if (!(ex instanceof SQLException)) { // Turn other exception into SQLException ex = new SQLException(ex); } throw (SQLException) ex; } return rs;}

根据执行的sql语句的不同选择不同的执行器,然后执行execute方法。只有2个Executor重写了execute方法,即PlainExecutor和BaseTransactionalExecutor。

没有找到对应sql分析器的语句(比如select语句),才会执行PlainExecutor#execute方法,而PlainExecutor#execute方法只是单纯调用原生的Statement来执行sql

其他的语句则会通过BaseTransactionalExecutor#execute方法来执行

// BaseTransactionalExecutorpublic T execute(Object... args) throws Throwable { String xid = RootContext.getXID(); if (xid != null) { statementProxy.getConnectionProxy().bind(xid); } // 方法被 @GlobalLock 修饰,将值设置为true statementProxy.getConnectionProxy().setGlobalLockRequire(RootContext.requireGlobalLock()); return doExecute(args);}

除了select for update语句,其他语句基本上会执行到AbstractDMLBaseExecutor#executeAutoCommitTrue方法

// AbstractDMLBaseExecutorprotected T executeAutoCommitTrue(Object[] args) throws Throwable { ConnectionProxy connectionProxy = statementProxy.getConnectionProxy(); try { connectionProxy.changeAutoCommit(); // 提交事务时会获取锁,当获锁失败时,按照锁重试策略进行重试 return new LockRetryPolicy(connectionProxy).execute(() -> { T result = executeAutoCommitFalse(args); connectionProxy.commit(); return result; }); } catch (Exception e) { // when exception occur in finally,this exception will lost, so just print it here LOGGER.error("execute executeAutoCommitTrue error:{}", e.getMessage(), e); if (!LockRetryPolicy.isLockRetryPolicyBranchRollbackOnConflict()) { connectionProxy.getTargetConnection().rollback(); } throw e; } finally { connectionProxy.getContext().reset(); connectionProxy.setAutoCommit(true); }}

protected T executeAutoCommitFalse(Object[] args) throws Exception { if (!JdbcConstants.MYSQL.equalsIgnoreCase(getDbType()) && isMultiPk()) { throw new NotSupportYetException("multi pk only support mysql!"); } // 查询前置镜像 TableRecords beforeImage = beforeImage(); // 执行sql T result = statementCallback.execute(statementProxy.getTargetStatement(), args); // 查询后置镜像 TableRecords afterImage = afterImage(beforeImage); // 构建undoLog prepareUndoLog(beforeImage, afterImage); return result;}

protected void prepareUndoLog(TableRecords beforeImage, TableRecords afterImage) throws SQLException { if (beforeImage.getRows().isEmpty() && afterImage.getRows().isEmpty()) { return; } // 修改主键时抛出异常 if (SQLType.UPDATE == sqlRecognizer.getSQLType()) { if (beforeImage.getRows().size() != afterImage.getRows().size()) { throw new ShouldNeverHappenException("Before image size is not equaled to after image size, probably because you updated the primary keys."); } } ConnectionProxy connectionProxy = statementProxy.getConnectionProxy(); TableRecords lockKeyRecords = sqlRecognizer.getSQLType() == SQLType.DELETE ? beforeImage : afterImage; // 构建lockKeys String lockKeys = buildLockKey(lockKeyRecords); if (null != lockKeys) { connectionProxy.appendLockKey(lockKeys); SQLUndoLog sqlUndoLog = buildUndoItem(beforeImage, afterImage); connectionProxy.appendUndoLog(sqlUndoLog); }}

主要流程就是生成前置镜像,执行业务sql,生成后置镜像,构建lockKey和undoLog,接着就是调用ConnectionProxy来提交事务

构建lockKeys的规则如下,根据执行的sql构建出对应的select语句,找到影响数据的主键值(用seata表中必须有主键)

如果主键是一列,则形式如下

// 表名:主键值1,主键值2account_info:1,2

如果主键是多列,则形式如下

// 表名:主键值1_主键值a,主键值1_主键值baccount_info:1_a,2_b

ConnectionProxy#commit提交本地事务

private void doCommit() throws SQLException { if (context.inGlobalTransaction()) { // 在全局事务中 processGlobalTransactionCommit(); } else if (context.isGlobalLockRequire()) { // 处理被 GlobalLock 修饰的方法 processLocalCommitWithGlobalLocks(); } else { targetConnection.commit(); }}

本次sql提交在一个全局事务中,则注册分支事务,插入undoLog

private void processGlobalTransactionCommit() throws SQLException { try { // 注册分支事务,向tc发一个分支事务注册请求,然后tc在branch_table表里插入一条记录 // 注册的时候tc会看事务提交所需要的资源是否能被加锁 // 如果能,则注册成功 // 如果不能,说明资源正在被别的事务使用,注册失败 register(); } catch (TransactionException e) { recognizeLockKeyConflictException(e, context.buildLockKeys()); } try { // 插入undolog UndoLogManagerFactory.getUndoLogManager(this.getDbType()).flushUndoLogs(this); // 将业务修改 和 undolog 一起插入 targetConnection.commit(); } catch (Throwable ex) { LOGGER.error("process connectionProxy commit error: {}", ex.getMessage(), ex); // 向tc汇报分支状态为失败 report(false); throw new SQLException(ex); } if (IS_REPORT_SUCCESS_ENABLE) { // 向tc汇报分支状态为成功 report(true); } context.reset();}

本次sql提交在一个本地事务中,只不过方法被加了@GlobalLock,需要尝试获取锁,避免脏写

private void processLocalCommitWithGlobalLocks() throws SQLException { // 查询是否能获取到锁 checkLock(context.buildLockKeys()); try { targetConnection.commit(); } catch (Throwable ex) { throw new SQLException(ex); } context.reset();}

SelectForUpdateExecutor

前面的文章我们说过用select for update语句来保证隔离级别为读已提交。SelectForUpdateExecutor就是用来执行select for update语句的。

流程比较简单就不放源码了,先构建出lockKey,然后向TC发送请求看这些记录是否已经被其他事务加锁了,如果被加锁了,则根据重试策略不断重试,如果没被加锁,则正常返回查询的结果

参考博客

[1][2]好文 [3]https://jianshu.com/p/0ed828c2019a

版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。

上一篇:seata源码解析:事务状态及全局锁的存储
下一篇:PHP四大主流框架的优缺点,第一个最难(php的劣势)
相关文章

 发表评论

暂时没有评论,来抢沙发吧~