openGauss数据库源码解析系列文章—— 事务机制源码解析(一)

2年前 (2022) 程序员胖胖胖虎阿
311 0 0

本篇为小伙伴们带来第五章——事务机制源码解析的精彩内容。

事务是数据库操作的执行单位,需要满足最基本的ACID(原子性、一致性、隔离性、持久性)属性。
(1) 原子性:一个事务提交之后要么全部执行,要么全部不执行。
(2) 一致性:事务的执行不能破坏数据库的完整性和一致性。
(3) 隔离性:事务的隔离性是指在并发中,一个事务的执行不能被其他事务干扰。
(4) 持久性:一旦事务完成提交,那么它对数据库的状态变更就会永久保存在数据库中。
本章主要介绍openGauss事务模块是如何实现数据库事务的基本属性,使用户数据不丢不错、修改不乱、查询无错误。

5.1 事务整体架构和代码概览

事务模块总体结构如图5-1所示。
openGauss数据库源码解析系列文章—— 事务机制源码解析(一)
在openGauss中,事务的实现与存储引擎的实现有很强关联,代码主要集中在src/gausskernel/storage/access/transam及src/gausskernel/storage/lmgr下,关键文件如图5-1所示。
(1) 事务管理器:事务系统的中枢,它的实现是一个有限循环状态机,通过接受外部系统的命令并根据当前事务所处的状态决定事务的下一步执行过程。
(2) 日志管理器:用来记录事务执行的状态以及数据变化的过程,包括事务提交日志(CLOG)、事务提交序列日志(CSNLOG)以及事务日志(XLOG)。其中CLOG日志只用来记录事务执行的结果状态,CSNLOG记录日志提交的顺序,用于可见性判断;XLOG是数据的redo日志,用于恢复及持久化。
(3) 线程管理机制:通过一片内存区域记录所有线程的事务信息,任何一个线程可以通过访问该区域获取其他事务的状态信息。
(4) MVCC机制:openGauss系统中,事务执行读流程结合各事务提交的CSN序列号,采用了多版本并发控制机制,实现了元组的读和写互不阻塞。详细可见性判断方法见“5.2 事务并发控制”。
(5) 锁管理器:实现系统的写并发控制,通过锁机制来保证事务写流程的隔离性。

5.2 事务并发控制

事务并发控制机制用来保证并发执行事务的情况下openGauss的ACID特性。下面将逐一介绍事务并发控制的各组成部分。

5.2.1 事务状态机

openGauss将事务系统分为上层(事务块TBlockState)以及底层(TransState)两个层次。
通过分层的设计,在处理上层业务时可以屏蔽具体细节,实现灵活支持客户端各类事务执行语句(BEGIN/START TRANSACTION/COMMIT/ROLLBACK/END)。
(1) 事务块TBlockState:客户端query的状态,用于提高用户操作数据的灵活性,用事务块的形式支持在一个事务中执行多条query语句。
(2) 底层事务TransState:内核端视角,记录了整个事务当前处于的具体状态。

1. 事务上层状态机

事务块上层状态机结构体代码如下:

typeset enum TBlockState
{
/* 不在事务块中的状态:单条SQL语句 */
TBLOCK_DEFAULT,/* 事务块缺省状态 */
TBLOCK_STARTED,/*执行单条query 语句*/

/* 处于事务块中的状态:一个事务包含多条语句 */
TBLOCK_BEGIN,/* 遇到事务开始命令BEGIN/START TRANSACTION */
TBLOCK_INPROGRESS,/* 表明正在事务块处理过程中*/
TBLOCK_END,/ *遇到事务结束命令END/COMMIT */
TBLOCK_ABORT,/* 事务块内执行报错,等待客户端执行ROLLBACK */
TBLOCK_ABORT_END,/ *在事务块内执行报错后,接收客户端执行ROLLBACK */
TBLOCK_ABORT_PENDING,/* 事务块内执行成功,接收客户端执行ROLLBACK(期望事务回滚)*/
TBLOCK_PREPARE,/ *两阶段提交事务,收到PREPARE TRANSACTION命令*/

/* 子事务块状态,与上述事务块状态类似 */
TBLOCK_SUBBEGIN,/* 遇到子事务开始命令SAVEPOINT */
TBLOCK_SUBINPROGRESS,/* 表明正在子事务块处理过程中*/
TBLOCK_SUBRELEASE,/* 遇到子事务结束命令RELEASE SAVEPOINT */
TBLOCK_SUBCOMMIT,/* 遇到事务结束命令END/COMMIT 从最底层的子事务递归的提交到最顶层事务*/
TBLOCK_SUBABORT,/* 子事务块内执行报错,等待客户端ROLLBACK TO/ROLLBACK */
TBLOCK_SUBABORT_END,/* 在子事务块内执行报错后,接收到客户端ROLLBACK TO上层子事务/ROLLBACK */
TBLOCK_SUBABORT_PENDING,/* 子事务块内执行成功,接收客户端执行的ROLLBACK TO上层子事务/ROLLBACK */
TBLOCK_SUBRESTART,/* 子事务块内执行成功,收到ROLLBACK TO当前子事务*/
TBLOCK_SUBABORT_RESTART/* 子事务块内执行报错后,接收到ROLLBACK TO当前子事务*/
} TBlockState;

为了便于理解,可以先不关注子事务块的状态。当理解了主事务的状态机行为后,子事务块的状态机转换同父事务类似。父子事务的关系类似于一个栈的实现,父事务的子事务相较于父事务后开始先结束。
显式事务块的状态机及相应的转换函数如图5-2所示。
openGauss数据库源码解析系列文章—— 事务机制源码解析(一)
图5-2 事务块状态机

图5-2中的事务状态相对应的事务状态机结构体中的值如表5-1所示。

表5-1 事务块状态

事务状态

事务状态机结构体

默认

TBLOCK_DEFAULT

已开始

TBLOCK_STARTED

事务块开启

TBLOCK_BEGIN

事务块运行中

TBLOCK_INPROGRESS

事务块结束

TBLOCK_END

回滚

TBLOCK_ABORT

回滚结束

TBLOCK_ABORT_END

回滚等待

TBLOCK_ABORT_PENDING

在无异常情形下,一个事务块的状态机如图5-2所示按照默认(TBLOCK_DEFAULT)->已开始(TBLOCK_STARTED)->事务块开启(TBLOCK_BEGIN)->事务块运行中(TBLOCK_INPROGRESS)->事务块结束(TBLOCK_END)->默认(TBLOCK_DEFAULT)循环。剩余的状态机是在上述正常场景下的各个状态点的异常处理分支。

(1) 在进入事务块运行中(TBLOCK_INPROGRESS)之前出错,因为事务还没有开启,直接报错并回滚,清理资源回到默认(TBLOCK_DEFAULT)状态。
(2) 在事务块运行中(TBLOCK_INPROGRESS)出错分为2种情形。事务执行失败:事务块运行中(TBLOCK_INPROGRESS)->回滚(TBLOCK_ABORT)->回滚结束(TBLOCK_ABORT_END)->默认(TBLOCK_DEFAULT);用户手动回滚执行成功的事务:事务块运行中(TBLOCK_INPROGRESS)->回滚等待(TBLOCK_ABORT_PENDING)->默认(TBLOCK_DEFAULT)。
(3) 在用户执行COMMIT语句时出错:事务块结束(TBLOCK_END)->默认(TBLOCK_DEFAULT)。由图5-2可以看出,事务开始后离开默认(TBLOCK_DEFAULT)状态,事务完全结束后回到默认(TBLOCK_DEFAULT)状态。
(4) openGauss同时还支持隐式事务块,当客户端执行单条SQL语句时可以自动提交,其状态机相对比较简单:按照默认(TBLOCK_DEFAULT)->已开始(TBLOCK_STARTED)->默认(TBLOCK_DEFAULT)循环。

2. 事务底层状态

TransState结构体代码如下:从内核视角的事务状态,真正意义上的事务状态。

typedef enum TransState
{
TRANS_DEFAULT,/* 当前为空闲缺省状态,无事务开启*/
TRANS_START,/* 事务正在开启*/
TRANS_INPROGRESS,/* 事务开始完毕,进入事务运行中*/
TRANS_COMMIT,/* 事务正在提交*/
TRANS_ABORT,/* 事务正在回滚*/
TRANS_PREPARE/* 两阶段提交事务进入PREPARE TRANSACTION阶段*/
} TransState;

openGauss数据库源码解析系列文章—— 事务机制源码解析(一)
图5-3事务底层状态
内核内部底层状态如图5-3所示,底层状态机的描述见结构体TransState。
(1) 在事务开启前事务状态为TRANS_DEFAULT。
(2) 事务开启过程中事务状态为TRANS_START。
(3) 事务成功开启后一直处于TRANS_INPROGRESS。
(4) 事务结束/回滚的过程中为TARNS_COMMIT/ TRANS_ABORT。
(5) 事务结束后事务状态回到TRANS_DEFAULT。

3. 事务状态机系统实例

本小节给出一条SQL的状态机运转实例,有助于更好地理解内部事务如何运作。在客户端执行SQL语句:

 BEGIN;
        SELECT * FROM TABLE1; 
    END;

1) 整体流程
整体执行过程如图5-4,任何语句的执行总是先进入事务处理接口事务块中,然后调用事务底层函数处理具体命令,最后返回到事务块中。
openGauss数据库源码解析系列文章—— 事务机制源码解析(一)
图5-4 整体流程
2) BEGIN执行流程,如图5-5所示。
(1) 入口函数exec_simple_query处理begin命令。
(2) start_xact_command函数开始一个query命令,调用StartTransactionCommand函数,此时事务块上层状态未TBLOCK_DEFAULT,继续调用StartTransaction函数,设置事务底层状态TRANS_START,完成内存、缓存区、锁资源的初始化后将事务底层状态设为TRANS_INPROGRESS,最后在StartTransactionCommand函数中设置事务块上层状态为TBLOCK_STARTED。
(3) PortalRun函数处理begin语句,依次向下调用函数,最后调用BeginTransactionBlock函数转换事务块上层状态为TBLOCK_BEGIN。
(4) finish_xact_command函数结束一个query命令,调用CommitTransactionCommand函数设置事务块上层状态从TBLOCK_BEGIN变为TBLOCK_INPROGRESS,并等待读取下一条命令。
openGauss数据库源码解析系列文章—— 事务机制源码解析(一)
图5-5 BEGIN执行流程
3) SELECT执行流程,如图5-6所示。
(1) 入口函数exec_simple_query处理“SELECT * FROM table1;”命令。
(2) start_xact_command函数开始一个query命令,调用StartTransactionCommand函数,由于当前上层事务块状态为TBLOCK_INPROGRESS,说明已经在事务块内部,则直接返回,不改变事务上层以及底层的状态。
(3) PortalRun执行SELECT语句,依次向下调用函数ExecutorRun根据执行计划执行最优路径查询。
(4) finish_xact_command函数结束一条query命令,调用CommitTransactionCommand函数,当前事务块上层状态仍为TBLOCK_INPROGESS,不改变当前事务上层以及底层的状态。
openGauss数据库源码解析系列文章—— 事务机制源码解析(一)
4) END执行流程,如图5-7所示。
(1) 入口函数exec_simple_query处理end命令。
(2) start_xact_command函数开始一个query命令,调用StartTransactionCommand函数,当前上层事务块状态为TBLOCK_INPROGESS,表明事务仍然在进行,此时也不改变任何上层及底层事务状态。
(3) PortalRun函数处理end语句,依次调用processUtility函数,最后调用EndTransactionBlock函数对当前上层事务块状态机进行转换,设置事务块上层状态为TBLOCK_END。
(4) Finish_xact_command函数结束query命令,调用CommitTransactionCommand函数,当前事务块状态TBLOCK_END;继续调用CommitTransaction函数提交事务,设置事务底层状态为TRANS_COMMIT,进行事务提交流程并且清理事务资源;清理后设置底层事务状态为TRANS_DEFAULT,返回CommitTansactionCommand函数;设置事务块上层状态为TBLOCK_DEFAULT,整个事务块结束。
openGauss数据库源码解析系列文章—— 事务机制源码解析(一)
图5-7 END执行流程

4. 事务状态转换相关函数简述

1) 事务处理子函数:根据当前事务上层状态机,对事务的资源进行相应的申请、回收及清理。

具体介绍如表5-2所示。

表5-2 事务处理子函数

子函数

说明

StartTransaction

开启事务,对内存及变量进行初始化操作,完成后将底层事务状态置为TRANS_INPROGRESS

CommitTransaction

当前的底层状态机为TRANS_INPROGRESS,然后置为TRANS_COMMIT,本地持久化CLOG及XLOG日志,并清空相应的事务槽位信息,最后将底层状态机置为TRANS_DEFAULT

PrepareTransaction

当前底层状态机为TRANS_INPROGRESS,同前面描述的CommitTransaction函数类似处理,设置底层状态机为TRANS_PREPARE,构造两阶段GXACT结构并创建两阶段文件,加入dummy的槽位信息,将线程的锁信息转移到dummy槽位中,释放资源,最后将底层状态机置为TRANS_DEFAULT

AbortTransaction

释放LWLock、UnlockBuffers、LockErrorCleanup,当前底层状态为TRANS_INPROGRESS,设置为TRANS_ABORT,记录相应的CLOG日志,清空事务槽位信息,释放各类资源

CleanupTransaction

当前底层状态机应为TRANS_ABORT,继续清理一些资源,一般紧接着AbortTransaction调用

FinishPreparedTransaction

结束两阶段提交事务

StartSubTransaction

开启子事务

CommitSubTransaction

提交子事务

AbortSubTransaction

回滚子事务

CleanupSubTransaction

清理子事务的一些资源信息,类似于CleanupTransaction

PushTransaction/PopTransaction

子事务类似于一个栈式的信息,Push/Pop函数是开启和结束子事务时使用

2) 处理函数,根据相应的状态机调用子函数。

具体介绍如表5-3所示。

表5-3 事务执行函数

函数

说明

StartTransactionCommand

事务开始时根据上层状态机调用相应的事务执行函数

CommitTransactionCommand

事务结束时根据上层状态机调用相应的事务执行函数

AbortCurrentTransaction

事务内部出错,长跳转longjump调用,提前清理掉相应的资源,并将事务上层状态机置为TBLOCK_ABORT

3) 上层事务状态机控制函数

具体介绍如表5-4所示。

表5-4 上层事务状态机控制函数

函数

说明

BeginTransactionBlock

开启一个显式事务时,将上层事务状态机变为TBLOCK_BEGIN

EndTransactionBlock

显式提交一个事务时,将上层事务状态机变为TBLOCK_END

UserAbortTransactionBlock

显式回滚一个事务时,将上层事务状态机变为TBLOCK_ABORT_PENDING/ TBLOCK_ABORT_END

PrepareTransactionBlock

显式执行PREPARE语句,将上层事务状态机变为TBLOCK_PREPARE

DefineSavepoint

执行savepoint语句,通过调用PushTransaction将子事务上层事务状态机变为TBLOCK_SUBBEGIN

ReleaseSavepoint

执行release savepoint语句,将子事务上层状态机转变为TBLOCK_SUBRELEASE

RollbackToSavepoint

执行“rollback to”语句,将所有子事务上层状态机转变为TBLOCK_SUBABORT_PENDING/ TBLOCK_SUBABORT_END,顶层事务的上层状态机转变为TBLOCK_SUBABORT_RESTART

5.2.2 事务ID分配及CLOG/CSNLOG

为了在数据库内部区别不同的事务,openGauss数据库会为它们分配唯一的标识符,即事务id(transaction id,缩写xid),xid是uint64单调递增的序列。当事务结束后,使用CLOG记录是否提交,使用CSNLOG(commit sequence number log)记录该事务提交的序列,用于可见性判断。

1. 64位xid及其分配

openGauss对每一个写事务均会分配一个唯一标识。当事务插入时,会将事务信息写到元组头部的xmin,代表插入该元组的xid;当事务进行更新和删除时,会将当前事务信息写到元组头部的xmax,代表删除该元组的xid。当前事务id的分配采用的是uint64单调递增序列,为了节省空间以及兼容老的版本,当前设计是将元组头部的xmin/xmax分成两部分存储,元组头部的xmin/xmax均为uint32的数字;页面的头部存储64位的xid_base,为当前页面的xid_base。
元组结构如图5-8所示,页面头结构如图5-9所示,那么对于每一条元组真正的xmin、xmax计算公式即为:元组头中xmin/xmax + 页面xid_base。
openGauss数据库源码解析系列文章—— 事务机制源码解析(一)
图5-8 元组结构
openGauss数据库源码解析系列文章—— 事务机制源码解析(一)
图5-9 页面头结构
当页面不断有更大的xid插入进来时,可能超过“xid_base + 232”,此时需要通过调节xid_base来满足所有元组的xmin/xmax都可以通过该值及元组头部的值计算出来,详细逻辑见“2. CLOG、CSNLOG”内“3) 关键函数:”中的第(3)小节。
为了使xid不消耗过快,openGauss当前只对写事务进行xid的分配,只读事务不会额外分配xid,也就是说并不是任何事务一开始都会分配xid,只有真正使用xid时才会去分配。在分配子事务xid时,如果父事务还未分配xid,则会先给父事务分配xid,再给子事务分配xid,确保子事务的xid比父事务大。理论上64位xid已经足够使用:假设数据库的tps为1000万,即1秒钟处理1000万个事务,64xid可以使用58万年。

2. CLOG、CSNLOG

CLOG以及CSNLOG分别维护事务ID->CommitLog以及事务ID->CommitSeqNoLog的映射关系。由于内存的资源有限,并且系统中可能会有长事务存在,内存中可能无法存放所有的映射关系,此时需要将这些映射写盘成物理文件,所以产生了CLOG(XID->CommitLog Map)、CSNLOG(XID->CommitSeqNoLog Map)文件。CSNLOG以及CLOG均采用了SLRU(simple least recently used,简单最近最少使用)机制来实现文件的读取及刷盘操作。
1) CLOG用于记录事务id的提交状态。openGauss中对于每个事务id使用4个bit位来标识它的状态。CLOG定义代码如下:

#define CLOG_XID_STATUS_IN_PROGRESS 0x00  表示事务未开始或还在运行中(故障场景可能是crash)
#define CLOG_XID_STATUS_COMMITTED 0x01 表示该事务已经提交
#define CLOG_XID_STATUS_ABORTED 0x02 表示该事务已经回滚
#define CLOG_XID_STATUS_SUB_COMMITTED 0x03 表示子事务已经提交而父事务状态未知

CLOG页面的物理组织形式如图5-10所示。
openGauss数据库源码解析系列文章—— 事务机制源码解析(一)
图5-10 CLOG页面的物理组织形式
图5-10标识事务1、4、5还在运行中,事务2已经提交,事务3已经回滚。
2) CSNLOG用于记录事务提交的序列号。openGauss为每个事务id分配8个字节uint64的CSN号,所以一个8kB页面能保存1k个事务的CSN号。CSNLOG达到一定大小后会分块,每个CSNLOG文件块的大小为256kB。同xid号类似,CSN号预留了几个特殊的号。CSNLOG定义代码如下:

#define COMMITSEQNO_INPROGRESS UINT64CONST(0x0) 表示该事务还未提交或回滚
#define COMMITSEQNO_ABORTED UINT64CONST(0x1) 表示该事务已经回滚
#define COMMITSEQNO_FROZEN UINT64CONST(0x2) 表示该事务已提交,且对任何快照可见
#define COMMITSEQNO_FIRST_NORMAL UINT64CONST(0x3) 事务正常的CSN号起始值
#define COMMITSEQNO_COMMIT_INPROGRESS (UINT64CONST(1) << 62) 事务正在提交中

同CLOG相似,CSNLOG的物理结构体如图5-11所示。
openGauss数据库源码解析系列文章—— 事务机制源码解析(一)
图5-11 CSNLOG的物理结构体
事务id 2048、2049、2050、2051、2052、2053的对应的CSN号依次是5、4、7、10、6、8;也就是说事务提交的次序依次是2049->2048->2052->2050->2053->2051。

3) 关键函数
64位xid页面xid_base的计算函数:
(1) Heap_page_prepare_for_xid函数:在对页面有写入操作时调用,用来调节xid_base。
新来xid在“xid_base + FirstNormalxid”与“xid_base + MaxShortxid(0xFFFFFFFF)”之间时,当前的xid_base不需要调整。
新来xid在“xid_base + FirstNormalxid”左侧(xid小于该值)时,需要减小xid_base。
新来xid在“xid_base + MaxShortxid”右侧(xid大于该值)时,需要增加xid_base。
特殊情况下,由于页面的xid跨度大于32位能表示的范围时,就需要冻结掉本页面上较小的xid,即将提交的xid设为FrozenTransactionId(2),该值对所有事务均可见;将回滚的xid设为InvalidTransactionId(0),该值对所有的事务均不可见。
(2) Freeze_single_heap_page函数:对该页面上较小的xid进行冻结操作。
计算oldestxid,比该值小的事务已经无任何事务访问更老的版本,此时可以将提交的xid直接标记为FrozenTransactionId,即对所有事务可见;将回滚的xid标记为InvalidTransactionId,即对所有事务不可见。
页面整理,清理hot update链,重定向itemid,整理页面空间。
根据oldestxid处理各个元组。
(3) Heap_page_shift_base函数:更新xid_base,调整页面中各个元组头中的xmin/xmax。
(4) GetNewTransactionId函数:获取最新的事务id。

5.2.3 MVCC可见性判断机制

openGauss利用多版本并发控制来维护数据的一致性。当扫描数据时每个事务看到的只是拿快照那一刻的数据,而不是数据当前的最新状态。这样就可以避免一个事务看到其他并发事务的更新而导致不一致的场景。使用多版本并发控制的主要优点是读取数据的锁请求与写数据的锁请求不冲突,以此来实现读不阻塞写,写也不阻塞读。下面介绍事务的隔离级别以及openGauss可见性判断CSN机制。

1. 事务隔离级别

SQL标准考虑了并行事务间应避免的现象,定义了以下几种隔离级别,如表5-5所示。

表5-5 事务隔离级别

隔离级别

P0:脏写

P1:脏读

P2:不可重复读

P3:幻读

读未提交

不可能

可能

可能

可能

读已提交

不可能

不可能

可能

可能

可重复读

不可能

不可能

不可能

可能

可串行化

不可能

不可能

不可能

不可能

(1) 脏写(dirty write):两个事务分别写入,两个事务分别提交或回滚,则事务的结果无法确定,即一个事务可以回滚另一个事务的提交。
(2) 脏读(dirty read):一个事务可以读取另一个事务未提交的修改数据。
(3) 不可重复读(fuzzy read):一个事务重复读取前面读取过的数据,数据的结果被另外的事务修改。
(4) 幻读(phantom):一个事务重复执行范围查询,返回一组符合条件的数据,每次查询的结果集因为其他事务的修改发生改变(条数)。

在各类数据库实现的过程中,并发事务产生了一些新的现象,在原来的隔离级别的基础上,有了一些扩展。如表5-6所示。

表5-6 事务隔离级别扩展

隔离级别

P0:脏写

P1:脏读

P4:更新丢失

P2:不可重复读

P3:幻读

A5A:读偏斜

A5B:写偏斜

读未提交

不可能

可能

可能

可能

可能

可能

可能

读已提交

不可能

不可能

可能

可能

可能

可能

可能

可重复读

不可能

不可能

不可能

不可能

可能

不可能

不可能

快照一致性读

不可能

不可能

不可能

不可能

偶尔

不可能

可能

可串行化

不可能

不可能

不可能

不可能

不可能

不可能

不可能

(5) 更新丢失(lost update):一个事务在读取元组并更新该元组的过程中,有另一个事务修改了该元组的值,导致最终这次修改丢失。
(6) 读偏斜(read skew):假设数据x,y有隐式的约束x+y=100;事务一读取x=50;事务二写x=25并更新y=75保证约束成立,事务二提交,事务一再读取y=75,导致事务一中读取x+y=125,不满足约束。
(7) 写偏斜(write skew):假设数据x,y有隐式的约束x+y<=100;事务一读取x=50,并写入y=50;事务二读取y=30并写入x=70,并提交;事务一再提交;最终导致x=70,y=50不满足x+y<=100的约束。
openGauss提供读已提交隔离级别和可重复读隔离级别:在实现上可重复读隔离级别无幻读问题,有A5B写偏斜问题。

2. CSN机制

1) CSN原理简单如图5-12所示。
openGauss数据库源码解析系列文章—— 事务机制源码解析(一)
图5-12 CSN原理
每个非只读事务在运行过程中会取得一个xid号,在事务提交时会推进CSN,同时会将当前CSN与事务的xid映射关系保存起来(CSNLOG)。图5-12中,实心竖线标识取snapshot(快照)时刻,会获取最新提交CSN(3)的下一个值4。TX1、TX3、TX5已经提交,对应的CSN号分别是1、2、3。TX2、TX4、TX6正在运行,TX7、TX8是未来还未开启的事务。对于当前snapshot而言,严格小于CSN号4的事务提交结果均可见;其余事务提交结果在获取快照时刻还未提交,不可见。
2) MVCC快照可见性判断的流程
获取快照时记录当前活跃的最小的xid,记为snapshot.xmin。当前最新提交的“事务id(latestCompleteXid) + 1”,记为snapshot.xmax。当前最新提交的“CSN号 + 1”(NextCommitSeqNo),记为snapshot.csn。可见性判断的简易流程如图5-13所示。
openGauss数据库源码解析系列文章—— 事务机制源码解析(一)
图5-13 MVCC快照可见性判断流程
(1) xid大于等于snapshot.xmax时,该事务id不可见。
(2) xid比snapshot.xmin小时,说明该事务id在本次事务启动以前已经结束,需要去CLOG查询事务的提交状态,并在元组头上设置相应的标记位。
(3) xid处于snapshot.xmin和snapshot.xmax之间时,需要从CSN-XID映射中读取事务结束的CSN;如果CSN有值且比snapshot.csn小,表示该事务可见,否则不可见。
3) 提交流程
事务提交流程如图5-14所示。
openGauss数据库源码解析系列文章—— 事务机制源码解析(一)
图5-14 提交流程
(1) 设置CSN-XID映射commit-in-progress标记。
(2) 原子更新NextCommitSeqNo值。
(3) 生成redo日志,写CLOG,写CSNLOG。
(4) 更新PGPROC将对应的事务信息从PGPROC中移除,xid设置为InvalidTransactionId、xmin设置为InvalidTransactionId等。
4) 热备支持
在事务的提交流程步骤(1)与(2)之间,增加commit-in-progress的XLOG日志。备机在读快照时,首先获取轻量锁ProcArrayLock,并计算当前快照。如果使用当前快照中的CSN时,碰到xid对应的CSN号有COMMITSEQNO_COMMIT_INPROGRESS标记,则必须等待相应的事务提交XLOG回放结束后再读取相应的CSN判断是否可见。为了实现上述等待操作,备机在对commit-in-progress的XLOG日志做redo操作时,会调用XactLockTableInsert函数获取相应xid的事务排他锁;其他的读事务如果访问到该xid,会等待在此xid的事务锁上直到相应的事务提交XLOG回放结束后再继续运行。

3. 关键数据结构及函数

1) 快照
快照相关代码如下:

typedef struct SnapshotData {
    SnapshotSatisfiesFunc satisfies;  /* 判断可见性的函数;通常使用MVCC,即HeapTupleSatisfiesMVCC */
    TransactionId xmin; /*当前活跃事务最小值,小于该值的事务说明已结束  */
    TransactionId xmax; /*最新提交事务id(latestCompeleteXid)+1,大于等于改值说明事务还未开始,该事务id不可见  */
    TransactionId* xip; /*记录当前活跃事务链表,在CSN版本中该值无用  */
    TransactionId* subxip; /* 记录缓存子事务活跃链表,在CSN版本中该值无用  */
    uint32 xcnt; /* 记录活跃事务的个数(xip中元组数)在CSN版本中该值无用  */
    GTM_Timeline timeline; /* openGauss单机中无用  */
    uint32 max_xcnt;       /* xip的最大个数,CSN版本中该值无用  */
    int32 subxcnt; /* 缓存子事务活跃链表的个数,在CSN版本中该值无用  */
    int32 maxsubxcnt;  /* 缓存子事务活跃链表最大个数,在CSN版本中该值无用  */
    bool suboverflowed; /* 子事务活跃链表是否已超过共享内存中预分配的上限,在CSN版本中无用。  */

    CommitSeqNo snapshotcsn; /* 快照的CSN号,一般为最新提交事务的CSN号+1(NextCommitSeqNo),CSN号严格小于该值的事务可见。  */

    int prepared_array_capacity; /*  单机openGauss无用  */
    int prepared_count; /* 单机openGauss无用  */
    TransactionId* prepared_array; /* 单机openGauss无用  */

    bool takenDuringRecovery;  /* 是否Recovery过程中产生的快照  */
    bool copied; /* 该快照是会话级别静态的,还是新分配内存拷贝的  */

    CommandId curcid; /*事务块中的命令序列号,即同一事务中,前面插入的数据,后面可见。  */
    uint32 active_count; /* ActiveSnapshot stack的refcount */
    uint32 regd_count;   /* RegisteredSnapshotList 的refcount*/
void* user_data;     /* 本地多版本快照使用,标记该快照还有线程使用,不能直接释放 */
    SnapshotType snapshot_type; /*  openGauss单机无用  */
} SnapshotData;

2) HeapTupleSatisfiesMVCC
用于一般读事务的快照扫描,基于CSN的大体逻辑,详细代码如下:

bool HeapTupleSatisfiesMVCC(HeapTuple htup, Snapshot snapshot, Buffer buffer)
{
    …… /*  初始化变量  */

    if (!HeapTupleHeaderXminCommitted(tuple)) { /*  此处先判断用一个bit位记录的hint bit(提示比特位:openGauss判断可见性时,通常需要知道元组xmin和xmax对应的clog的提交状态;为了避免重复访问clog,openGauss内部对可见性判断进行了优化。hint bit是把事务状态直接记录在元组头中,用一个bit位来表示提交和回滚状态。openGauss并不会在事务提交或者回滚时主动更新元组上的 hint bit,而是等到访问该元组并进行可见性判断时,如果发现hint bit没有设置,则从 CLOG 中读取并设置,否则直接读取hint bit的值),防止同一条tuple反复获取事务最终提交状态。如果一次扫描发现该元组的xmin/xmax已经提交,就会打上相应的标记,加速扫描;如果没有标记则继续判断。  */
        if (HeapTupleHeaderXminInvalid(tuple)) /*  同样判断hint bit。如果xmin已经标记为invalid说明插入该元组的事务已经回滚,直接返回不可见  */
            return false;

        if (TransactionIdIsCurrentTransactionId(HeapTupleHeaderGetXmin(page, tuple))) { /*  如果是一个事务内部,需要去判断该元组的CID,也即是同一个事务内,后面的查询可以查到当前事务之前插入的扫描结果  */
          …….
        } else {  /*  如果扫描别的事务,需要根据快照判断事务是否可见  */
            visible = XidVisibleInSnapshot(HeapTupleHeaderGetXmin(page, tuple), snapshot, &hintstatus); /*  通过csnlog判断事务是否可见,并且返回该事务的最终提交状态  */
            if (hintstatus == XID_COMMITTED) /*  如果该事务提交,则打上提交的hint bit用于加速判断  */
                SetHintBits(tuple, buffer, HEAP_XMIN_COMMITTED, HeapTupleHeaderGetXmin(page, tuple));

            if (hintstatus == XID_ABORTED) {
                … /*  如果事务回滚,则打上回滚标记  */
                SetHintBits(tuple, buffer, HEAP_XMIN_INVALID, InvalidTransactionId);
            }
            if (!visible) {  /*  如果xmin不可见,则该元组不可见,否则表示插入该元组的事务对于该次快照已经提交,继续判断删除该元组的事务是否对该次快照提交  */
                    return false;
                }
            }
        }
    } else { /* 如果该条元组的xmin已经被打上提交的hint bit,则通过函数接口CommittedXidVisibleInSnapshot判断是否对本次快照可见  */
        /* xmin is committed, but maybe not according to our snapshot */
        if (!HeapTupleHeaderXminFrozen(tuple) &&
            !CommittedXidVisibleInSnapshot(HeapTupleHeaderGetXmin(page, tuple), snapshot)) {
           return false;
        }
    }
   …… /*  后续xmax的判断同xmin类似,如果xmax对于本次快照可见,则说明删除该条元组的事务已经提交,则不可见,否则可见,此处不再赘述 */
    if (!(tuple->t_infomask & HEAP_XMAX_COMMITTED)) {
        if (TransactionIdIsCurrentTransactionId(HeapTupleHeaderGetXmax(page, tuple))) {
            if (HeapTupleHeaderGetCmax(tuple, page) >= snapshot->curcid)
                return true; /* 在扫面前该删除事务已经提交 */
            else
                return false; /* 扫描开始后删除操作的事务才提交 */
        }

        visible = XidVisibleInSnapshot(HeapTupleHeaderGetXmax(page, tuple), snapshot, &hintstatus);
        if (hintstatus == XID_COMMITTED) {
            /* 设置xmax的hint bit */
            SetHintBits(tuple, buffer, HEAP_XMAX_COMMITTED, HeapTupleHeaderGetXmax(page, tuple));
        }
        if (hintstatus == XID_ABORTED) {
            /* 回滚或者故障 */
            SetHintBits(tuple, buffer, HEAP_XMAX_INVALID, InvalidTransactionId);
        }
        if (!visible) {
            return true; /* 快照中xmax对应的事务不可见,则认为该元组仍然活跃 */
        }
    } else {
        /* xmax对应的事务已经提交,但是快照中该事务不可见,认为删除该元组的操作未完成,仍然认为该元组可见 */
        if (!CommittedXidVisibleInSnapshot(HeapTupleHeaderGetXmax(page, tuple), snapshot)) {
            return true; /* 认为元组可见 */
        }
    }
    return false;
}

3) HeapTupleSatisfiesNow
该函数的逻辑同MVCC类似,只是此时并没有统一快照,而仅仅是判断当前xmin/xmax的状态,而不再继续调用XidVisibleInSnapshot函数、CommittedXidVisibleInSnapshot函数来判断是否对快照可见。
4) HeapTupleSatisfiesVacuum
根据传入的OldestXmin的值返回相应的状态。死亡元组(openGauss多版本机制中不可见的旧版本元组)且没有任何其他未结束的事务可能访问该元组(xmax<oldestXmin),可以被VACUUM清理。本函数具体代码如下:

HTSV_Result HeapTupleSatisfiesVacuum(HeapTuple htup, TransactionId OldestXmin, Buffer buffer)
{
    ……  /* 初始化变量
    if (!HeapTupleHeaderXminCommitted(tuple)) {  /* hint bit标记加速,与MVCC的逻辑相同。  */
        if (HeapTupleHeaderXminInvalid(tuple))  /* 如果xmin未提交,则返回该元组死亡,可以清理。 */
            return HEAPTUPLE_DEAD;
        xidstatus = TransactionIdGetStatus(HeapTupleGetRawXmin(htup), false);  /* 通过CSNLog来获取当前的事务状态  */
        if (xidstatus == XID_INPROGRESS) {
            if (tuple->t_infomask & HEAP_XMAX_INVALID)  /* 如果xmax还没有,说明没有人删除,此时判断该元组正在插入过程中,否则在删除过程中  */
                return HEAPTUPLE_INSERT_IN_PROGRESS;
            return HEAPTUPLE_DELETE_IN_PROGRESS;  /* 返回正在删除的过程中  */
        } else if (xidstatus == XID_COMMITTED) {  /* 如果xmin提交了,打上hint bit,后面继续看xmax是否提交。  */
            SetHintBits(tuple, buffer, HEAP_XMIN_COMMITTED, HeapTupleGetRawXmin(htup));
        } else {
   ….  /* 事务结束了且未提交,可能是abort或者是crash的事务,一般返回死亡,可删除;单机情形 t_thrd.xact_cxt.useLocalSnapshot没有作用,恒为false。 */
            SetHintBits(tuple, buffer, HEAP_XMIN_INVALID, InvalidTransactionId);
            return ((!t_thrd.xact_cxt.useLocalSnapshot || IsInitdb) ? HEAPTUPLE_DEAD : HEAPTUPLE_LIVE);
        }
    }
  /* 接着判断xmax。如果还没有设置xmax说明没有人删除该元组,返回元组存活,不可删除。  */
    if (tuple->t_infomask & HEAP_XMAX_INVALID)
        return HEAPTUPLE_LIVE;
……
    if (!(tuple->t_infomask & HEAP_XMAX_COMMITTED)) {   /*如果xmax提交,则看xmax是否比oldesxmin小。小的话说明没有未结束的事务会访问该元组,可以删除。 */
        xidstatus = TransactionIdGetStatus(HeapTupleGetRawXmax(htup), false);
        if (xidstatus == XID_INPROGRESS)
            return HEAPTUPLE_DELETE_IN_PROGRESS;
         else if (xidstatus == XID_COMMITTED)
            SetHintBits(tuple, buffer, HEAP_XMAX_COMMITTED, HeapTupleGetRawXmax(htup));
        else {
…  /*  xmax对应的事务abort或者crash */
            SetHintBits(tuple, buffer, HEAP_XMAX_INVALID, InvalidTransactionId);
            return HEAPTUPLE_LIVE;
        }
    }

  /*判断该元组是否可以删除,xmax<OldestXmin可以删除。  */
    if (!TransactionIdPrecedes(HeapTupleGetRawXmax(htup), OldestXmin))
        return ((!t_thrd.xact_cxt.useLocalSnapshot || IsInitdb) ? HEAPTUPLE_RECENTLY_DEAD : HEAPTUPLE_LIVE);

    /* 该元组可以认为已经DEAD,不被任何活跃事务访问,可以删除。 */
    return ((!t_thrd.xact_cxt.useLocalSnapshot || IsInitdb) ? HEAPTUPLE_DEAD : HEAPTUPLE_LIVE);
}

5) SetXact2CommitInProgress
设置xid对应CSNLog的标记位COMMITSEQNO_COMMIT_INPROGRESS(详情见“5.2.2 事务ID分配及CLOG/CSNLOG”的第2节),表示此xid对应的事务正在提交过程中。该操作是为了保证可见性判断时的原子性,即为了防止并发读事务在CSN号设置的过程中读到不一致的数据。
6) CSNLogSetCommitSeqNo
给对应的xid设置相应的CSNLog。
7) RecordTransactionCommit
记录事务提交,主要是写CLOG、CSNLOG的XLOG日志以及写CLOG及CSNLOG。

5.2.4 进程内多线程管理机制

简述进程内多线程管理机制相关数据结构及多版本快照计算机制。

1. 事务信息管理

数据库启动时候维护了一段共享内存,每个线程初始化的时候会从这个共享内存中获取一个槽位并将其线程信息记录到槽位中。获取快照时,需要在共享内存数组中更新槽位信息,事务结束时,需要从槽位中将其事务信息清除。计算快照时,通过遍历该全局数组,获取当前所有并发线程的事务信息,并计算出快照信息(xmin、xmax、snapshotcsn等)。事务信息管理的关键数据结构代码如下:

typedef struct PGXACT {
    GTM_TransactionHandle handle; /* 单机模式无用参数 */
    TransactionId xid;              /* 该线程持有的xid号,如果没有则为0 */
    TransactionId prepare_xid;    /* 准备阶段的xid号*/

TransactionId xmin; /* 当前事务开启时最小的活跃xid,vaccum操作不会删除那些xid大于等于xmin的元组。  */
    CommitSeqNo csn_min;   /* 当前事务开启时最小的活跃CSN号*/
    TransactionId next_xid; /* 单机模式无用参数*/
    int nxids;  /* 子事物个数*/
    uint8 vacuumFlags; /* vacuum操作相关的flag */

bool needToSyncXid; /* 单机模式无用参数*/
bool delayChkpt; /* 如果该线程需要checkpoint线程延迟等待,此值为true
#ifdef __aarch64__ */
    char padding[PG_CACHE_LINE_SIZE - PGXACT_PAD_OFFSET]; /* 为了性能考虑的结构体对齐*/
#endif
} PGXACT;

struct PGPROC {
    SHM_QUEUE links; /* 链表中的指针 */

    PGSemaphoreData sem; /* 休眠等待的信号量 */
    int waitStatus;      /* 等待状态 */

    Latch procLatch; /* 线程的通用闩锁 */

    LocalTransactionId lxid; /* 当前线程本地顶层事务ID */
    ThreadId pid;            /* 线程的PID */

    ThreadId sessMemorySessionid;
    uint64 sessionid; /* 线程池模式下当前的会话ID */
    int logictid;     /* 逻辑线程ID */
    TransactionId gtt_session_frozenxid;    /* 会话级全局临时表的冻结XID */

    int pgprocno;
    int nodeno;

    /* 线程启动时下面这些数据结构为0 */
    BackendId backendId; /* 线程的后台ID */
    Oid databaseId;      /* 当前访问数据库的OID */
    Oid roleId;          /* 当前用户的OID */

    /* 版本号,用于升级过程中新老版本的判断 */
    uint32 workingVersionNum;

    /*热备模式下,标记当前事务是否收到冲突信号。设置该值时需要持有ProcArray锁。 */
    bool recoveryConflictPending;

    /* 线程等待的轻量级锁信息. */
    bool lwWaiting;        /* 当等待轻量级锁时,为真 */
    uint8 lwWaitMode;      /* 预获取锁的模式 */
    bool lwIsVictim;       /* 强制放弃轻量级锁 */
    dlist_node lwWaitLink; /* 等待在相同轻量级锁对象的下一个等待者 */

/* 线程等待的常规锁信息 */
    LOCK* waitLock;         /* 等待的常规锁对象 */
    PROCLOCK* waitProcLock; /* 等待常规锁对象的持有者 */
    LOCKMODE waitLockMode;  /* 预获取常规锁对象的模式 */
    LOCKMASK heldLocks;     /* 本线程获取锁对象模式的位掩码 */

    /* 等待主备机回放日志同步的信息 */
    XLogRecPtr waitLSN;     /* 等待的lsn*/
    int syncRepState;       /* 等待主备同步的状态 */
    bool syncRepInCompleteQueue;   /* 是否等待在完成队列中 */
    SHM_QUEUE syncRepLinks; /* 指向同步队列的指针 */

    DataQueuePtr waitDataSyncPoint; /* 数据页复制的数据同步点 */
    int dataSyncRepState;           /* 数据页复制的同步状态 */
    SHM_QUEUE dataSyncRepLinks;     /* 指向数据页同步队列的指针*/

    MemoryContext topmcxt; /* 本线程的顶层内存上下文 */
    char myProgName[64];
    pg_time_t myStartTime;
    syscalllock deleMemContextMutex;

    SHM_QUEUE myProcLocks[NUM_LOCK_PARTITIONS];

    /* 以下结构为了实现XID批量提交 */
    /* 是否为XID批量提交中的成员 */
    bool procArrayGroupMember;
    /* XID批量提交中的下一个成员 */
    pg_atomic_uint32 procArrayGroupNext;
    /* 父事务XID和子事物XID中的最大者 */
    TransactionId procArrayGroupMemberXid;

    /* 提交序列号 */
    CommitSeqNo commitCSN;

    /* 以下结构为了实现CLOG批量提交 */
    bool clogGroupMember;  /* 是否为CLOG批量提交中的成员*/
    pg_atomic_uint32 clogGroupNext;  /* CLOG批量提交中的下一个成员*/
    TransactionId clogGroupMemberXid;  /* CLOG批量提交的事务ID */
    CLogXidStatus clogGroupMemberXidStatus; /* CLOG批量提交的事务状态 */
    int64 clogGroupMemberPage;  /* CLOG批量提交对应的CLOG页面 */
    XLogRecPtr clogGroupMemberLsn;  /* CLOG批量提交成员的提交回放日志位置 */
#ifdef __aarch64__  
    /* 以下结构体是为了实现ARM架构下回放日志批量插入 */
    bool xlogGroupMember;
    pg_atomic_uint32 xlogGroupNext;
    XLogRecData* xlogGrouprdata;
    XLogRecPtr xlogGroupfpw_lsn;
    XLogRecPtr* xlogGroupProcLastRecPtr;
    XLogRecPtr* xlogGroupXactLastRecEnd;
    void* xlogGroupCurrentTransactionState;
    XLogRecPtr* xlogGroupRedoRecPtr;
    void* xlogGroupLogwrtResult;
    XLogRecPtr xlogGroupReturntRecPtr;
    TimeLineID xlogGroupTimeLineID;
    bool* xlogGroupDoPageWrites;
    bool xlogGroupIsFPW;
    uint64 snap_refcnt_bitmap;
#endif

    LWLock* subxidsLock;
struct XidCache subxids; /* 子事物XID */

    LWLock* backendLock; /* 每个线程的轻量级锁,用于保护以下数据结构的并发访问 */

    /* Lock manager data, recording fast-path locks taken by this backend. */
    uint64 fpLockBits;  /* 快速路径锁的持有模式 */
    FastPathTag fpRelId[FP_LOCK_SLOTS_PER_BACKEND]; /* 表对象的槽位 */
    bool fpVXIDLock;  /* 是否获得本地XID的快速路径锁 */
    LocalTransactionId fpLocalTransactionId; /* 本地的XID */
};

openGauss数据库源码解析系列文章—— 事务机制源码解析(一)
图5-15 事务信息
如图5-15所示,proc_base_all_procs以及proc_base_all_xacts为全局的共享区域,每个线程启动的时候会在这个共享区域中注册一个槽位,并且将线程级指针变量t_thrd.proc以及t_thrd.pgxact指向该区域。当该线程有事务开始时,会将对应事务的xmin、xid等信息填写到pgxact结构体中。关键函数及接口如下。
(1) GetOldestXmin:返回当前多版本快照缓存的oldestXmin。(多版本快照机制见后续章节)
(2) ProcArrayAdd:线程启动时在共享区域中注册一个槽位。
(3) ProcArrayRemove:将当前线程从ProcArray数组中移除。
(4) TransactionIdIsInProgress:判断xid是否还在运行之中。

2. 多版本快照机制

因为openGauss使用一段共享内存来实现快照的获取以及各线程事务信息的管理,计算快照持有共享锁以及事务结束持有排他锁有严重的锁争抢问题。为了解决该冲突,openGauss引入了多版本快照机制解决锁冲突。每当事务结束时,持有排他锁、计算快照的一个版本,记录到一个环形缓冲区队列内存里;当别的线程获取快照时,并不持有共享锁去重新计算,而是通过原子操作到该环形队列顶端获取最新快照并将其引用计数加1;待拷贝完了快照信息后,将引用计数减1;当槽位引用计数为0时,表示可以被新的快照复用。
1) 多版本快照数据结构
多版本快照数据结构代码如下:

typedef struct _snapxid {
    TransactionId xmin;
    TransactionId xmax;
    CommitSeqNo snapshotcsn;
    TransactionId localxmin;
    bool takenDuringRecovery;
    ref_cnt_t ref_cnt[NREFCNT]; /* 该快照的引用计数,如果为0则可复用 */
} snapxid_t;  /*多版本快照内容,在openGauss CSN方案下,仅需要记录xmin、xmax、snapshotcsn等关键信息即可。*/

static snapxid_t* g_snap_buffer = NULL;     /*   缓冲区队列内存区指针  */
static snapxid_t* g_snap_buffer_copy = NULL; /*   缓冲区队列内存的浅拷贝 */
static size_t g_bufsz = 0;
static bool g_snap_assigned = false;  /*多版本快照buffer队列是否已初始化 */

#define SNAP_SZ sizeof(snapxid_t) /*  每一个多版本快照的size大小  */
#define MaxNumSnapVersion 64    /* 多版本快照队列的大小,64个版本  */

static volatile snapxid_t* g_snap_current = NULL; /*  当前的快照指针  */
static volatile snapxid_t* g_snap_next = NULL; /*  下一个可用槽位的快照指针  */

2) buffer队列创建流程
在创建共享内存时,根据MaxNumSnapVersion函数的size生成“MaxNumSnapVersion * SNAP_SZ”大小的共享内存区。并将g_snap_current置为0号偏移,g_snap_next置为“1 * SNAP_SZ”偏移。
3) 多版本快照的计算
(1) 获取当前g_snap_next。
(2) 保证当前已持有Proc数组的排他锁,进行xmin、xmax、CSN等关键结构的计算,并存放到g_snap_next中。
(3) 寻找下一个refcount为0可复用的槽位,将g_snap_current赋值为g_snap_next,g_snap_next赋值为可复用的槽位偏移。
4) 多版本快照的获取
(1) 获取g_snap_current指针并将当前快照槽位的引用计数加1,防止并发更新快照时被复用。
(2) 将当前快中的信息拷贝到当前连接的静态快照内存中。
(3) 释放当前多版本快照,并将当前快照槽位的引用计数减1。
5) 关键函数
(1) CreateSharedRingBuffer:创建多版本快照共享内存信息。
(2) GetNextSnapXid:获取下一个多版本快照位置。函数代码如下:

static inline snapxid_t* GetNextSnapXid()
{
return g_snap_buffer ? (snapxid_t*)g_snap_next : NULL;
}

(3) SetNextSnapXid:获取下一个可用的槽位,并且将当前多版本快照最新更新。函数代码如下:

static void SetNextSnapXid()
{
    if (g_snap_buffer != NULL) {
        g_snap_current = g_snap_next; /* 将最新的多版本快照更新到最新。*/
        pg_write_barrier(); /* 此处是防止buffer ring初始化时的ARM乱序问题。*/
        g_snap_assigned = true;
        snapxid_t* ret = (snapxid_t*)g_snap_current;
        size_t idx = SNAPXID_INDEX(ret);
    loop: /* 主循环,整体思路是不停遍历多版本槽位信息,一直找到一个refcout为0的可重用槽位。*/
        do {
            ++idx;
            /* 如果发生回卷,那么重头再找 */
            if (idx == g_bufsz)
                idx = 0;
            ret = SNAPXID_AT(idx);
            if (IsZeroRefCount(ret)) {
                g_snap_next = ret;
                return;
            }
        } while (ret != g_snap_next);
        ereport(WARNING, (errmsg("snapshot ring buffer overflow.")));
/* 当前多版本快照个数为64个,理论上可能是会出现槽位被占满,如果没有空闲槽位,重新遍历即可。 */
        goto loop;
    }
}

(4) CalculateLocalLatestSnapshot:计算多版本快照信息。函数代码如下:

void CalculateLocalLatestSnapshot(bool forceCalc)
{
    …/* 初始化变量  */

    snapxid_t* snapxid = GetNextSnapXid(); /*设置下一个空闲多版本快照槽位信息 */

    /* 初始化xmax为 latestCompletedXid + 1 */ 
    xmax = t_thrd.xact_cxt.ShmemVariableCache->latestCompletedXid;
    TransactionIdAdvance(xmax);

     /*并不是每个事务提交都会重新计算xmin和oldestxmin,只有每1000个事务或者每隔1s才会计算,此时xmin及oldestxmin一般偏小,但是不影响可见性判断。  */
    currentTimeStamp = GetCurrentTimestamp();
    if (forceCalc || ((++snapshotPendingCnt == MAX_PENDING_SNAPSHOT_CNT) ||
                         (TimestampDifferenceExceeds(snapshotTimeStamp, currentTimeStamp, CALC_SNAPSHOT_TIMEOUT)))) {
        snapshotPendingCnt = 0;
        snapshotTimeStamp = currentTimeStamp;

        /* 初始化xmin */
        globalxmin = xmin = xmax;

        int* pgprocnos = arrayP->pgprocnos;
        int numProcs;

        /*
          循环遍历proc并计算快照相应值 
         */
        numProcs = arrayP->numProcs;
         /*主要流程,遍历proc_base_all_xacts,将其中pgxact->xid的最小值记为xmin,其中pgxact->xmin的最小值记为oldestxmin。  */
        for (index = 0; index < numProcs; index++) {
            int pgprocno = pgprocnos[index];
            volatile PGXACT* pgxact = &g_instance.proc_base_all_xacts[pgprocno];
            TransactionId xid;

            if (pgxact->vacuumFlags & PROC_IN_LOGICAL_DECODING)
                continue;

            /* 对于autovacuum的xmin,跳过,避免长VACUUM阻塞脏元组回收 */
            if (pgxact->vacuumFlags & PROC_IN_VACUUM)
                continue;

            /* 用最小的xmin来更新globalxmin */
            xid = pgxact->xmin; 

            if (TransactionIdIsNormal(xid) && TransactionIdPrecedes(xid, globalxmin))
                globalxmin = xid;

            xid = pgxact->xid;

            if (!TransactionIdIsNormal(xid))
                xid = pgxact->next_xid;

            if (!TransactionIdIsNormal(xid) || !TransactionIdPrecedes(xid, xmax))
                continue;

            if (TransactionIdPrecedes(xid, xmin))
                xmin = xid;
        }

        if (TransactionIdPrecedes(xmin, globalxmin))
            globalxmin = xmin;

        t_thrd.xact_cxt.ShmemVariableCache->xmin = xmin;
        t_thrd.xact_cxt.ShmemVariableCache->recentLocalXmin = globalxmin;
    }
     /* 此处给多版本快照信息赋值,xmin、oldestxmin因为不是及时计算故可能偏小,xmax、CSN号都是当前的准确值,注意计算快照的时候必须持有排他锁。  */
    snapxid->xmin = t_thrd.xact_cxt.ShmemVariableCache->xmin;
    snapxid->xmax = xmax;
    snapxid->localxmin = t_thrd.xact_cxt.ShmemVariableCache->recentLocalXmin;
    snapxid->snapshotcsn = t_thrd.xact_cxt.ShmemVariableCache->nextCommitSeqNo;
    snapxid->takenDuringRecovery = RecoveryInProgress();
    SetNextSnapXid(); /*设置当前多版本快照  */
}
(5) GetLocalSnapshotData:获取最新的多版本快照供事务使用。函数代码如下:
Snapshot GetLocalSnapshotData(Snapshot snapshot)
{
      /* 检查是否有多版本快照。在recover启动之前,是没有计算出多版本快照的,此时直接返回。 */
    if (!g_snap_assigned || (g_snap_buffer == NULL)) {
        ereport(DEBUG1, (errmsg("Falling back to origin GetSnapshotData: not assigned yet or during shutdown\n")));
        return NULL;
    }
    pg_read_barrier(); /*为了防止ringBuffer初始化时的ARM乱序问题*/
    snapxid_t* snapxid = GetCurrentSnapXid();  /* 将当前的多版本快照refcount++,避免被并发计算新快照的事务重用。  */

    snapshot->user_data = snapxid;

    …  /* 此处将多版本快照snapxid中的信息赋值给快照,注意此处是深拷贝,因为多版本快照仅有几个变量的关键信息,直接赋值即可,之后就可以将相应的多版本快照refcount释放。  */
    u_sess->utils_cxt.RecentXmin = snapxid->xmin;
    snapshot->xmin = snapxid->xmin;
    snapshot->xmax = snapxid->xmax;
    snapshot->snapshotcsn = snapxid->snapshotcsn;
    …
    ReleaseSnapshotData(snapshot);  /* 将多版本快照的refcount释放,以便可以被重用。  */
    return snapshot;
}

本篇为介绍完"5.1 事务整体架构和代码概览"及“5.2 事务并发控制”,下一篇将继续介绍“5.3 锁机制”的相关内容,敬请期待。

相关文章

暂无评论

暂无评论...