PostgreSQL

PostgreSQL 源码解读(119)- MVCC#4(启动事务)

2020-07-15  本文已影响0人  EthanHe

本节介绍了PostgreSQL启动事务的逻辑,主要内容是函数StartTransaction的实现逻辑。

一、数据结构

静态变量
当前事务状态CurrentTransactionState


/*
 * CurrentTransactionState always points to the current transaction state
 * block.  It will point to TopTransactionStateData when not in a
 * transaction at all, or when in a top-level transaction.
 *
 * TopTransactionStateData is the statically allocated state block for the
 * top level.  Only the low-level state and the block state are given
 * explicit initial values here; every other field is left to static
 * zero-initialization.
 */
static TransactionStateData TopTransactionStateData = {
    .state = TRANS_DEFAULT,
    .blockState = TBLOCK_DEFAULT,
};

/*
 * unreportedXids holds XIDs of all subtransactions that have not yet been
 * reported in an XLOG_XACT_ASSIGNMENT record.
 *
 * The array has room for PGPROC_MAX_CACHED_SUBXIDS entries.
 * NOTE(review): nUnreportedXids is presumably the number of entries
 * currently in use -- confirm against the code that fills the array.
 */
static int  nUnreportedXids;
static TransactionId unreportedXids[PGPROC_MAX_CACHED_SUBXIDS];

/*
 * Pointer to the current transaction state block; it starts out pointing
 * at TopTransactionStateData.
 */
static TransactionState CurrentTransactionState = &TopTransactionStateData;

/*
 * The subtransaction ID and command ID assignment counters are global
 * to a whole transaction, so we do not keep them in the state stack.
 */
static SubTransactionId currentSubTransactionId;
static CommandId currentCommandId;
static bool currentCommandIdUsed;
 

TransactionState
事务状态结构体

/*
 *  transaction states - transaction state from server perspective
 *
 * This is the "low-level" state stored in TransactionStateData.state.
 */
typedef enum TransState
{
    TRANS_DEFAULT,              /* idle */
    TRANS_START,                /* transaction starting */
    TRANS_INPROGRESS,           /* inside a valid transaction */
    TRANS_COMMIT,               /* commit in progress */
    TRANS_ABORT,                /* abort in progress */
    TRANS_PREPARE               /* prepare in progress */
} TransState;

/*
 *  transaction block states - transaction state of client queries
 *
 * This is the "high-level" state stored in TransactionStateData.blockState.
 *
 * Note: the subtransaction states are used only for non-topmost
 * transactions; the others appear only in the topmost transaction.
 */
typedef enum TBlockState
{
    /* not-in-transaction-block states */
    TBLOCK_DEFAULT,             /* idle */
    TBLOCK_STARTED,             /* running single-query transaction */

    /* transaction block states */
    TBLOCK_BEGIN,               /* starting transaction block */
    TBLOCK_INPROGRESS,          /* live transaction */
    TBLOCK_IMPLICIT_INPROGRESS, /* live transaction after implicit BEGIN */
    TBLOCK_PARALLEL_INPROGRESS, /* live transaction inside parallel worker */
    TBLOCK_END,                 /* COMMIT received */
    TBLOCK_ABORT,               /* failed xact, awaiting ROLLBACK */
    TBLOCK_ABORT_END,           /* failed xact, ROLLBACK received */
    TBLOCK_ABORT_PENDING,       /* live xact, ROLLBACK received */
    TBLOCK_PREPARE,             /* live xact, PREPARE received */

    /* subtransaction states */
    TBLOCK_SUBBEGIN,            /* starting a subtransaction */
    TBLOCK_SUBINPROGRESS,       /* live subtransaction */
    TBLOCK_SUBRELEASE,          /* RELEASE received */
    TBLOCK_SUBCOMMIT,           /* COMMIT received while TBLOCK_SUBINPROGRESS */
    TBLOCK_SUBABORT,            /* failed subxact, awaiting ROLLBACK */
    TBLOCK_SUBABORT_END,        /* failed subxact, ROLLBACK received */
    TBLOCK_SUBABORT_PENDING,    /* live subxact, ROLLBACK received */
    TBLOCK_SUBRESTART,          /* live subxact, ROLLBACK TO received */
    TBLOCK_SUBABORT_RESTART     /* failed subxact, ROLLBACK TO received */
} TBlockState;

/*
 *  transaction state structure
 *
 * One of these describes a single level of transaction nesting.  Each
 * block carries a back link to its parent, and holds both the low-level
 * (TransState) and high-level (TBlockState) state for that level.
 */
typedef struct TransactionStateData
{
    TransactionId transactionId;    /* my XID, or Invalid if none */
    SubTransactionId subTransactionId;  /* my subxact ID */
    char       *name;           /* savepoint name, if any */
    int         savepointLevel; /* savepoint level */
    TransState  state;          /* low-level state */
    TBlockState blockState;     /* high-level state */
    int         nestingLevel;   /* transaction nesting depth */
    int         gucNestLevel;   /* GUC context nesting depth */
    MemoryContext curTransactionContext;    /* my xact-lifetime context */
    ResourceOwner curTransactionOwner;  /* my query resources */
    TransactionId *childXids;   /* subcommitted child XIDs, in XID order */
    int         nChildXids;     /* # of subcommitted child XIDs */
    int         maxChildXids;   /* allocated size of childXids[] */
    Oid         prevUser;       /* previous CurrentUserId setting */
    int         prevSecContext; /* previous SecurityRestrictionContext */
    bool        prevXactReadOnly;   /* entry-time xact r/o state */
    bool        startedInRecovery;  /* did we start in recovery? */
    bool        didLogXid;      /* has xid been included in WAL record? */
    int         parallelModeLevel;  /* Enter/ExitParallelMode counter */
    struct TransactionStateData *parent;    /* back link to parent */
} TransactionStateData;

/* Pointer type for referring to a transaction state block */
typedef TransactionStateData *TransactionState;

VirtualTransactionId
顶层事务通过VirtualTransactionId标识,它由执行该事务的后台进程的BackendId和本地分配(locally-assigned)的LocalTransactionId组成.

/*
 * Top-level transactions are identified by VirtualTransactionIDs comprising
 * the BackendId of the backend running the xact, plus a locally-assigned
 * LocalTransactionId.  These are guaranteed unique over the short term,
 * but will be reused after a database restart; hence they should never
 * be stored on disk.
 *
 * Note that struct VirtualTransactionId can not be assumed to be atomically
 * assignable as a whole.  However, type LocalTransactionId is assumed to
 * be atomically assignable, and the backend ID doesn't change often enough
 * to be a problem, so we can fetch or assign the two fields separately.
 * We deliberately refrain from using the struct within PGPROC, to prevent
 * coding errors from trying to use struct assignment with it; instead use
 * GET_VXID_FROM_PGPROC().
 */
typedef struct
{
    BackendId   backendId;      /* determined at backend startup */
    LocalTransactionId localTransactionId;  /* backend-local transaction id */
} VirtualTransactionId;

二、源码解读

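StartTransaction函数用于启动顶层事务:重置TopTransactionStateData的各字段及事务级全局变量,初始化内存上下文和ResourceOwner,分配并公布虚拟事务ID,设置事务开始时间戳,初始化GUC/Cache/触发器等子系统,最后将事务状态设置为TRANS_INPROGRESS(即CurrentTransactionState->state = TRANS_INPROGRESS).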

/*
 *  StartTransaction
 *
 * Start a top-level transaction.  In order, this routine: resets the
 * top-level state block and the per-transaction global variables, sets up
 * the memory and resource-owner infrastructure, assigns a virtual
 * transaction id and advertises it in MyProc, establishes the transaction
 * start timestamp, initializes the remaining subsystems, and finally moves
 * the low-level state from TRANS_START to TRANS_INPROGRESS.
 *
 * The high-level blockState is not touched here.
 */
static void
StartTransaction(void)
{
    TransactionState s;     /* the top-level transaction state block */
    VirtualTransactionId vxid;  /* virtual transaction id assigned below */

    /*
     * Let's just make sure the state stack is empty
     */
    s = &TopTransactionStateData;
    CurrentTransactionState = s;

    Assert(XactTopTransactionId == InvalidTransactionId);

    /* check the current transaction state */
    Assert(s->state == TRANS_DEFAULT);

    /*
     * Set the current transaction state information appropriately during
     * start processing.  Note that once the transaction status is switched
     * this process cannot fail until the user ID and the security context
     * flags are fetched below.
     */
    s->state = TRANS_START;
    s->transactionId = InvalidTransactionId;    /* until assigned */

    /*
     * initialize current transaction state fields
     *
     * note: prevXactReadOnly is not used at the outermost level
     */
    s->nestingLevel = 1;
    s->gucNestLevel = 1;
    s->childXids = NULL;
    s->nChildXids = 0;
    s->maxChildXids = 0;

    /*
     * Once the current user ID and the security context flags are fetched,
     * both will be properly reset even if transaction startup fails.
     */
    GetUserIdAndSecContext(&s->prevUser, &s->prevSecContext);

    /* SecurityRestrictionContext should never be set outside a transaction */
    Assert(s->prevSecContext == 0);

    /*
     * Make sure we've reset xact state variables
     *
     * If recovery is still in progress, mark this transaction as read-only.
     * We have lower level defences in XLogInsert and elsewhere to stop us
     * from modifying data during recovery, but this gives the normal
     * indication to the user that the transaction is read-only.
     */
    if (RecoveryInProgress())
    {
        /* started during recovery: force read-only */
        s->startedInRecovery = true;
        XactReadOnly = true;
    }
    else
    {
        s->startedInRecovery = false;
        XactReadOnly = DefaultXactReadOnly;
    }
    XactDeferrable = DefaultXactDeferrable;
    XactIsoLevel = DefaultXactIsoLevel;
    forceSyncCommit = false;
    MyXactFlags = 0;

    /*
     * reinitialize within-transaction counters
     */
    s->subTransactionId = TopSubTransactionId;
    currentSubTransactionId = TopSubTransactionId;
    currentCommandId = FirstCommandId;
    currentCommandIdUsed = false;

    /*
     * initialize reported xid accounting
     */
    nUnreportedXids = 0;
    s->didLogXid = false;

    /*
     * must initialize resource-management stuff first
     */
    AtStart_Memory();
    AtStart_ResourceOwner();

    /*
     * Assign a new LocalTransactionId, and combine it with the backendId to
     * form a virtual transaction id.
     */
    vxid.backendId = MyBackendId;
    vxid.localTransactionId = GetNextLocalTransactionId();

    /*
     * Lock the virtual transaction id before we announce it in the proc array
     */
    VirtualXactLockTableInsert(vxid);

    /*
     * Advertise it in the proc array.  We assume assignment of
     * LocalTransactionID is atomic, and the backendId should be set already.
     */
    Assert(MyProc->backendId == vxid.backendId);
    MyProc->lxid = vxid.localTransactionId;

    TRACE_POSTGRESQL_TRANSACTION_START(vxid.localTransactionId);

    /*
     * set transaction_timestamp() (a/k/a now()).  Normally, we want this to
     * be the same as the first command's statement_timestamp(), so don't do a
     * fresh GetCurrentTimestamp() call (which'd be expensive anyway).  But
     * for transactions started inside procedures (i.e., nonatomic SPI
     * contexts), we do need to advance the timestamp.  Also, in a parallel
     * worker, the timestamp should already have been provided by a call to
     * SetParallelStartTimestamps().
     */
    if (!IsParallelWorker())
    {
        if (!SPI_inside_nonatomic_context())
            xactStartTimestamp = stmtStartTimestamp;
        else
            xactStartTimestamp = GetCurrentTimestamp();
    }
    else
        Assert(xactStartTimestamp != 0);
    pgstat_report_xact_timestamp(xactStartTimestamp);
    /* Mark xactStopTimestamp as unset. */
    xactStopTimestamp = 0;

    /*
     * initialize other subsystems for new transaction
     */
    AtStart_GUC();
    AtStart_Cache();
    AfterTriggerBeginXact();

    /*
     * done with start processing, set current transaction state to "in
     * progress"
     */
    s->state = TRANS_INPROGRESS;

    ShowTransactionState("StartTransaction");
}

三、跟踪分析

执行begin,触发该函数调用

11:10:36 (xdb@[local]:5432)testdb=# begin;

启动gdb,设置断点

(gdb) b StartTransaction
Breakpoint 4 at 0x54800f: file xact.c, line 1825.
(gdb) c
Continuing.

Breakpoint 4, StartTransaction () at xact.c:1825
1825        s = &TopTransactionStateData;
(gdb) 

查看调用栈

(gdb) bt
#0  StartTransaction () at xact.c:1825
#1  0x0000000000548f50 in StartTransactionCommand () at xact.c:2718
#2  0x00000000008c8e7d in start_xact_command () at postgres.c:2500
#3  0x00000000008c6771 in exec_simple_query (query_string=0x24a6ec8 "begin;") at postgres.c:948
#4  0x00000000008cae70 in PostgresMain (argc=1, argv=0x24d2dc8, dbname=0x24d2c30 "testdb", username=0x24a3ba8 "xdb")
    at postgres.c:4182
#5  0x000000000082642b in BackendRun (port=0x24c8c00) at postmaster.c:4361
#6  0x0000000000825b8f in BackendStartup (port=0x24c8c00) at postmaster.c:4033
#7  0x0000000000821f1c in ServerLoop () at postmaster.c:1706
#8  0x00000000008217b4 in PostmasterMain (argc=1, argv=0x24a1b60) at postmaster.c:1379
#9  0x00000000007488ef in main (argc=1, argv=0x24a1b60) at main.c:228
(gdb) 

查看TopTransactionStateData全局变量(本次事务尚未对其初始化,didLogXid = true、prevUser = 10等字段仍是此前事务遗留的值)

(gdb) p TopTransactionStateData
$7 = {transactionId = 0, subTransactionId = 0, name = 0x0, savepointLevel = 0, state = TRANS_DEFAULT, 
  blockState = TBLOCK_DEFAULT, nestingLevel = 0, gucNestLevel = 0, curTransactionContext = 0x0, curTransactionOwner = 0x0, 
  childXids = 0x0, nChildXids = 0, maxChildXids = 0, prevUser = 10, prevSecContext = 0, prevXactReadOnly = false, 
  startedInRecovery = false, didLogXid = true, parallelModeLevel = 0, parent = 0x0}

设置全局变量CurrentTransactionState = & TopTransactionStateData;

(gdb) n
1826        CurrentTransactionState = s;
(gdb) 
1828        Assert(XactTopTransactionId == InvalidTransactionId);
(gdb) 

初始化事务状态

(gdb) n
1833        if (s->state != TRANS_DEFAULT)
(gdb) 
1841        s->state = TRANS_START;
(gdb) 
1842        s->transactionId = InvalidTransactionId;    /* until assigned */
(gdb) 
1852        if (RecoveryInProgress())
(gdb) 
1859            s->startedInRecovery = false;
(gdb) 
1860            XactReadOnly = DefaultXactReadOnly;
(gdb) 
1862        XactDeferrable = DefaultXactDeferrable;
(gdb) 
1863        XactIsoLevel = DefaultXactIsoLevel;
(gdb) 
1864        forceSyncCommit = false;
(gdb) 
1865        MyXactFlags = 0;
(gdb) 
1870        s->subTransactionId = TopSubTransactionId;
(gdb) 
1871        currentSubTransactionId = TopSubTransactionId;
(gdb) 
1872        currentCommandId = FirstCommandId;
(gdb) 
1873        currentCommandIdUsed = false;
(gdb) 
1878        nUnreportedXids = 0;
(gdb) 
1879        s->didLogXid = false;
(gdb) 
1884        AtStart_Memory();
(gdb) 

初始化资源管理相关的子系统(内存上下文/ResourceOwner);GUC/Cache等其他子系统在后面初始化

(gdb) 
1884        AtStart_Memory();
(gdb) n
1885        AtStart_ResourceOwner();
(gdb) 

设置虚拟事务ID

1891        vxid.backendId = MyBackendId;
(gdb) 
1892        vxid.localTransactionId = GetNextLocalTransactionId();
(gdb) 
1897        VirtualXactLockTableInsert(vxid);
(gdb) 
1903        Assert(MyProc->backendId == vxid.backendId);
(gdb) p vxid
$8 = {backendId = 3, localTransactionId = 6}
(gdb) 
(gdb) n
1904        MyProc->lxid = vxid.localTransactionId;
(gdb) 

设置时间戳

1906        TRACE_POSTGRESQL_TRANSACTION_START(vxid.localTransactionId);
(gdb) 
1917        if (!IsParallelWorker())
(gdb) 
1919            if (!SPI_inside_nonatomic_context())
(gdb) 
1920                xactStartTimestamp = stmtStartTimestamp;
(gdb) 
1926        pgstat_report_xact_timestamp(xactStartTimestamp);
(gdb) 
1928        xactStopTimestamp = 0;
(gdb) 
(gdb) p xactStartTimestamp
$9 = 601009839154257

初始化其他字段及其他子系统(注:跟踪所用版本的语句顺序与上文列出的源码略有不同,nestingLevel等字段的初始化以及GetUserIdAndSecContext的调用位于设置时间戳之后)

(gdb) n
1935        s->nestingLevel = 1;
(gdb) n
1936        s->gucNestLevel = 1;
(gdb) 
1937        s->childXids = NULL;
(gdb) 
1938        s->nChildXids = 0;
(gdb) 
1939        s->maxChildXids = 0;
(gdb) 
1940        GetUserIdAndSecContext(&s->prevUser, &s->prevSecContext);
(gdb) 
1942        Assert(s->prevSecContext == 0);
(gdb) 
1947        AtStart_GUC();
(gdb) 
1948        AtStart_Cache();
(gdb) 
1949        AfterTriggerBeginXact();
(gdb) 
1955        s->state = TRANS_INPROGRESS;
(gdb) 
1957        ShowTransactionState("StartTransaction");
(gdb) 
1958    }
(gdb) 

初始化后的事务状态

(gdb) p *s
$10 = {transactionId = 0, subTransactionId = 1, name = 0x0, savepointLevel = 0, state = TRANS_INPROGRESS, 
  blockState = TBLOCK_DEFAULT, nestingLevel = 1, gucNestLevel = 1, curTransactionContext = 0x2523850, 
  curTransactionOwner = 0x24d4868, childXids = 0x0, nChildXids = 0, maxChildXids = 0, prevUser = 10, prevSecContext = 0, 
  prevXactReadOnly = false, startedInRecovery = false, didLogXid = false, parallelModeLevel = 0, parent = 0x0}
(gdb) 

完成调用

(gdb) n
StartTransactionCommand () at xact.c:2719
2719                s->blockState = TBLOCK_STARTED;
(gdb) 
2720                break;
(gdb) 

DONE!

四、参考资料

PG Source Code

上一篇下一篇

猜你喜欢

热点阅读