PostgreSQL

PostgreSQL 并发控制机制(3):基于时间戳的并发控制

2020-09-25  本文已影响0人  EthanHe

并发控制是多个事务在并发运行时,数据库保证事务一致性(Consistency)和隔离性(Isolation)的一种机制。主流商用关系数据库使用的并发控制技术主要有三种:严格两阶段封锁(S2PL)、多版本并发控制(MVCC)和乐观并发控制(OCC)。
本文介绍了基于时间戳的并发控制,包括常规基于时间戳的并发控制、多版本基于时间戳锁的并发控制、与时间戳相关的Google Percolator工程实践和openGauss对PostgreSQL SI的优化改进等。

一、基于时间戳的并发控制

基于时间戳的并发控制,有常规的“单”版本并发控制,也有多版本基于时间戳并发控制。

1、单版本基于时间戳的并发控制

事务T在启动时分配一个唯一的时间戳TS(T),并发控制保证存在一个等价的串行调度,该调度中的事务是按事务的启动时间先后顺序,亦即事务按它们的启动顺序串行化而不是按提交的顺序串行化。
为了实现这一点,基于时间戳顺序并发控制使用以下数据结构存储数据项x的以下信息:
1.rt(x):任何读x的事务的最大时间戳
2.wt(x):任何写x的事务的最大时间戳
3.f(x):标志,用于表明最后写x的事务是否已提交

使用上述数据结构进行读写并发控制的算法如下:

当事务T1请求读x时
1.第一种情况:TS(T1)<\wt(x)
读事务的启动时间小于最晚的写时间,意味着其他事务在T1后写入了新值,T1希望读到的原值已不存在,T1由于太“老”不能读,T1必须回滚

2.第二种情况:TS(T1)>wt(x)
读事务的启动时间大于最晚写入时间,这时候又分为分两种情况:
A.f(x)标记x已提交,则请求被批准,如TS(T1)>rt(x),则更新rt(x)为TS(T1)
B.f(x)标记x未提交,则T1必须等待(否则会导致脏读)

当事务T1请求写x时
1.第一种情况:TS(T1)<\rt(x)
写事务的时间比最晚的读时间要早,意味着过晚的写,T1太老不能写,T1必须回滚

2.第二种情况:rt(x)<TS(T1)<\wt(x)
写事务的启动时间大于最晚读时间,小于最晚写时间
如f(x)标记x已提交,则根据Thomas写规则,请求被批准,但没有任何的实际动作(因为T1写的x值不会被任何其他事务读取)
如f(x)标记x未提交,T1必须等待(否则会导致脏写)

3.第三种情况:wt(x)<\rt(x)<TS(T1)
写事务的启动时间比最晚读时间要大,且最晚读时间比最晚写时间要晚
如f(x)标记x未提交,那么T1必须等待(否则会导致脏写)
如f(x)标记x已提交,那么请求被批准,TS(T1)值赋予wt(x),f(x)值设置为未提交

从上述算法也可以看出该策略的一些缺点:
1.额外的空间开销:每个数据项都需要记录rt(x),wt(x),f(x)
2.额外的“事务控制”:rt(x),wt(x),f(x)这些信息存储在数据库中,必须像数据项一样进行更新,如果事务中止,它们必须被回退
3.额外的处理逻辑:对x的读意味着对rt(x)的写,逻辑复杂

2、多版本基于时间戳的并发控制

相对于单版本,多版本时间戳并发控制有以下调整:
1.当事务T的w(X)发生时,如果合法,则创建一个X的新版本,其写时间为事务T的启动时间,这个X的新版本可以记作Xt
2.当事务T的r(X)发生时,事务调度器找到X的版本Xt,需满足的调节是t≤TS(T),并且在t之后不存在另外一个X的版本
3.写时间与数据项的版本相关且永不改变
4.读时间与数据项的版本相关,用于拒绝小于该读时间的写操作,因为如果允许改写,那么读操作应读取该写操作的值,但这是不可能发生的

二、Google Percolator

Google Percolator的背景源于搜索索引系统存储数十PB的数据,每天有数十亿的更新,而MapReduce适合处理大批量的数据,而处理少量更新则显得效率较低,因此Percolator应运而生:为处理增量更新而设计,处理同样数据量的文档时可以将平均时延降低50%。
Percolator使用多版本基于时间戳并结合封锁技术的并发控制,可以实现:
1.保证读事务可以读到最近最新已提交的数据版本;
2.读不会阻塞写(但写会阻塞读);
3.写-写冲突时,使用FCW协议。

Percolator网上介绍的文章已有很多,其关键的数据结构包括:
1.事务启动时间start_ts
2.事务提交时间commit_ts
3.锁,KV结构,key=Row+Column(lock)+start_ts,value=primary row&column
4.实际列值data,KV结构,包括数据指针和实际数据,KV结构
1)指针:key=Row+Column(write)+commit_ts,value=start_ts
2)数据:Row+Column(data)+start_ts,value=列值

在此对论文提供的伪代码进行更详细的注释解读,其他不再赘述。

class Transaction { //class Transaction
  //结构体Write
  struct Write {
    Row row;    //行
    Column col;   //列
    string value; //列值
  };//Write Struct 
  vector<Write> writes_;//数据缓存Write
  int start_ts_;//事务开始时间

  Transaction() : start_ts_(oracle.GetTimestamp()) {} //构造函数,初始化变量start_ts_
  
  /*
  输入:Write结构体
  输出:无
  实现:简单的把Write对象(列值)push到Vector中
  */
  void Set(Write w) {//Set函数
    writes_.push_back(w); 
  }
  
  /*
  输入:Row-行标识,Column-列标识
  输出:value-列值,成功返回true,失败(如没有获取值)返回false
  */
  bool Get(Row row, Column c, string* value) {
    while (true) {
    //Bigtable提供的行级事务
      bigtable::Txn T = bigtable::StartRowTransaction(row);
      // Check for locks that signal concurrent writes.
    // 检查是否存在并发事务在写数据
    // 注:SI的特点是写不阻塞读,读不阻塞写,但在这里却需要等待?
    //     原因是SI保证读到的是事务开始(start_ts)之前已提交的数据,
    //     存在锁意味着写操作未完成且该操作的commit_ts可能在事务开始之前,
    //   但需要在写入之后才能知道是否在start_ts之前,因此需要等待
      if (T.Read(row, c+"lock", [0, start_ts_])) { //判断[0, start_ts_]内是否存在lock?
        // There is a pending lock; try to clean it and wait
    // 仍存在lock,等待
        BackoffAndMaybeCleanupLock(row, c);
        continue;
      }
      // Find the latest write below our start timestamp.
    //读取最近已提交的数据版本
      latest write = T.Read(row, c+"write", [0, start_ts_]);
      if (!latest_write.found())
    //没有数据,返回false
        return false; // no data
    //从Column+write中获取start_ts
      int data_ts = latest_write.start_timestamp();
    //获取真正的数据:Row+Column(column+"data")+start_ts
      *value = T.Read(row, c+"data", [data_ts, data_ts]);
      return true;
    }
  }
  
  // Prewrite tries to lock cell w, returning false in case of conflict.
  // 预写入(理论基础:通过意向表缓存数据,执行延迟更新)
  /*  
  输入:Write结构体,Write主节点
  输出:成功返回true,失败返回false
  */
  bool Prewrite(Write w, Write primary) {
  //获取列
    Column c = w.col;
  //启动Bigtable行事务
    bigtable::Txn T = bigtable::StartRowTransaction(w.row);
    // Abort on writes after our start timestamp ...
  // 存在比事务启动时间start_ts更大的值,存在ww冲突,按照FUW原则,本事务回滚
    if (T.Read(w.row, c+"write", [start_ts_, ∞]))
      return false;
    // ... or locks at any timestamp.
  // 存在锁,说明未完成的写,即存在ww冲突,且其他事务比本事务更"早"获得锁,本事务回滚
    if (T.Read(w.row, c+"lock", [0, ∞]))
      return false;

  //校验完毕,可以写数据
  //写入数据:key=Row+Column(data)+start_ts,value=需写入的值
    T.Write(w.row, c+"data", start_ts_, w.value);
  //上锁,key=Row+Column(lock)+start_ts,value=主节点的行&列
    T.Write(w.row, c+"lock", start_ts_,
      {primary.row, primary.col}); // The primary’s location.
  //执行提交操作
    return T.Commit();
  }

  //提交操作
  /*
  输入:无
  输出:成功返回true,失败返回false
  */
  bool Commit() {
    // The primary’s location.
  // 数组writes_的第一个元素为主节点
    Write primary = writes_[0];
  // 除第一个元素外,其他为从节点
    vector<Write> secondaries(writes_.begin()+1, writes_.end());
  //主节点预写入失败
    if (!Prewrite(primary, primary))
      return false;
    //遍历从节点,执行预写入,一个节点不成功则全部失败
    for (Write w : secondaries)
      if (!Prewrite(w, primary))
        return false;

    //获取事务提交时间戳
    int commit_ts = oracle_.GetTimestamp();
    // Commit primary first.
  // 主节点首先提交
    Write p = primary;
    //启动Bigtable事务
  bigtable::Txn T = bigtable::StartRowTransaction(p.row);
  //谨慎起见,判断是否存在锁(本事务,start_ts唯一),避免重复写入
    if (!T.Read(p.row, p.col+"lock", [start_ts_, start_ts_]))
      return false; // aborted while working
    //写入:key=Row+Column(write)+commit_ts,value=start_ts,实际的值在key=Row+Column(data)+start_ts中
    T.Write(p.row, p.col+"write", commit_ts, start_ts_); // Pointer to data written at start_ts_.
  //删除锁
    T.Erase(p.row, p.col+"lock", commit_ts);
  //Bigtable事务提交
    if (!T.Commit())
      return false; // commit point
    // Second phase: write out write records for secondary cells.
  //遍历从节点,写key=Row+Column(write)+commit_ts,value=start_ts,同时删除锁
    for (Write w : secondaries) {
      bigtable::Write(w.row, w.col+"write", commit_ts, start_ts_);
      bigtable::Erase(w.row, w.col+"lock", commit_ts);
    }
    return true;
  }
} // class Transaction

三、openGauss对PostgreSQL SI的增强

在PostgreSQL中,可通过函数txid_current_snapshot()可获取当前的快照信息:

11:05:16 (xdb@[local]:5432)testdb=# select txid_current_snapshot();
 txid_current_snapshot 
-----------------------
 2404:2404:
(1 row)

11:24:11 (xdb@[local]:5432)testdb=# 

输出格式为ST1 : ST2 : xip_list
其中:
ST1:最早仍活跃的事务ID,早于此XID的事务要么被提交并可见,要么回滚丢弃。
ST2:最后已完结事务(COMMITTED/ABORTED)的事务ID + 1。
xip_list:在"拍摄"快照时仍进行中的事务ID。该列表包含xmin和xmax之间的活动事务ID。
总结一下,简单来说,对于给定的事务XID:
XID ∈ [1,ST1),过去的事务,对此快照均可见;
XID ∈ [ST1,ST2),不考虑子事务的情况,仍处于IN_PROGRESS状态的,不可见;COMMITED状态,可见;ABORTED状态,不可见;
XID ∈ [ST2,∞),未来的事务,对此快照均不可见;

可以看到,对于XID ∈ [ST1,ST2)的事务ID,其判断逻辑的复杂度与活跃事务数n成正比,算法复杂度为O(n)。
但从快照隔离的机制以及基于时间戳的并发控制的理论可以看到,只要获取读时间(不同的隔离级别,读时间不同)之前已提交事务的数据项版本即可,而这个算法复杂度可以降低为O(1)。
openGauss引入了数据结构CSN(CommitSeqNo),其优化原理如下图所示:

openGauss CSN示意图

已提交的XID映射为CSN(可以理解为逻辑意义上的时间戳),每次读的时候,只需要把当前读的逻辑时间戳与事务提交的逻辑时间戳进行简单的对比即可得到可读的元组版本。

下面是相关的代码片段:

typedef struct SnapshotData {
    SnapshotSatisfiesFunc satisfies; /* tuple test function */
    ...
    CommitSeqNo snapshotcsn;
    ...
}

//获取快照
#ifndef ENABLE_MULTIPLE_NODES
Snapshot GetSnapshotData(Snapshot snapshot, bool force_local_snapshot, bool forHSFeedBack)
#else
Snapshot GetSnapshotData(Snapshot snapshot, bool force_local_snapshot)
#endif
{    
    ...
    //从线程变量中获取CSN
    snapshot->snapshotcsn = t_thrd.xact_cxt.ShmemVariableCache->nextCommitSeqNo;
    ...
}

openGauss提供了CSN相关的函数获取:

testdb=# \df *csn*
                                                                   List of functions
   Schema   |            Name            | Result data type |                  Argument data types                   |  Type  | fencedmode | propackage
------------+----------------------------+------------------+--------------------------------------------------------+--------+------------+------------
 pg_catalog | gs_get_next_xid_csn        | SETOF record     | OUT node_name text, OUT next_xid xid, OUT next_csn xid | normal | f          | f
 pg_catalog | pg_export_snapshot_and_csn | record           | OUT snapshot text, OUT "CSN" text                      | normal | f          | f
 pg_catalog | pgxc_get_csn               | SETOF bigint     | xid                                                    | normal | f          | f
(3 rows)

-- 获取下一个XID和CSN
testdb=#  select gs_get_next_xid_csn();
 gs_get_next_xid_csn
---------------------
 (gaussdb,9540,1919)
(1 row)

-- 根据XID获取相应的CSN
testdb=# select pgxc_get_csn('9537'::xid);
 pgxc_get_csn
--------------
         1918
(1 row)

四、参考资料

[1] Hector Garcia-Molina,Jeffrey D.Ullman,Jennifer Widom,《数据库系统实现(第2版)》
[2] Daniel Peng,Frank Dabek,Large-scale Incremental Processing Using Distributed Transactions and Notifications
[3] 李国良,周敏奇,《openGauss数据库核心技术》

上一篇下一篇

猜你喜欢

热点阅读