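Functional Programming Inner Arts 05: Lock-Free High Concurrency and the STM Mirage
You may have heard that functional programming is naturally well suited to high concurrency.
Most readers will know the difference between concurrency and parallelism: concurrency is different pieces of logic cooperating with one another, while parallelism is the same logic running side by side to finish faster. That is not a fully precise definition, but it captures the idea.
Functional languages currently offer several techniques for high concurrency,
such as the Actor model, represented by Erlang/Scala,
CSP, represented by Clojure/Go,
and STM, represented by Haskell/Clojure, which is the subject of this article.
The other techniques will be covered one by one in later articles.
Let's embrace STM, starting now!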
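- What is STM?
- Understanding STM in depth
a. How does STM actually work?
b. Taking further control of STM!
- The Haskell source code of STM
a. The STM world
b. TVar variables
c. Flow control with retry and orElse
d. The list of RTS runtime interfaces
- Analysis of the STM RTS runtime
a. Introduction to PrimOps
b. Basic RTS runtime concepts
c. STM data types: StgTSO, StgTRecHeader, StgTVar
d. TVar operations: newTVar#, readTVar#, writeTVar#
e. STM operations: atomically#, raiseIO#, catchSTM#, retry#, catchRetry#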
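I. What is STM?
Values in functional programming are immutable, but when two threads need to communicate they need special variables to do so. For a single shared variable, most systems provide atomic types. When several variables are involved, the problem becomes much harder.
With locks, every thread has to acquire the locks in a consistent order, otherwise deadlock follows. Managing the locks is also a real headache: the locking code is scattered all over the program, so a change in one place affects all the others.
STM is much simpler. We read the transactional variables we want to change and work on them in an environment of our own. Once the work is done, a single "Great Shift of Heaven and Earth" move writes all the updated values back together. If the transaction fails, the variables simply keep their original values, and other threads only ever see the values from before or after the transaction. This gives an all-or-nothing effect.
At the user level, all we need to do is create transactional variables, read and write them, and finally commit the whole transaction. Nothing else is required, which is a real joy...
Let's take a quick look at some code: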
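{-# LANGUAGE GeneralizedNewtypeDeriving #-}
import Control.Concurrent.STM
import Control.Monad (liftM2)

-- Deriving Num for a newtype needs GeneralizedNewtypeDeriving.
newtype Gold = Gold Int
  deriving (Eq, Ord, Show, Num)

type Balance = TVar Gold

basicTransfer :: Gold -> Balance -> Balance -> STM ()
basicTransfer qty fromBal toBal = do
  fromQty <- readTVar fromBal
  toQty   <- readTVar toBal
  writeTVar fromBal (fromQty - qty)
  writeTVar toBal   (toQty + qty)

transferTest :: STM (Gold, Gold)
transferTest = do
  alice <- newTVar (12 :: Gold)
  bob   <- newTVar 4
  basicTransfer 3 alice bob
  liftM2 (,) (readTVar alice) (readTVar bob)

-- Run the whole transaction atomically and print the final balances.
main :: IO ()
main = atomically transferTest >>= print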
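a. We create two transactional variables (TVar),
b. then call basicTransfer to build the transfer logic inside the STM world,
c. and finally call atomically to run the transferTest transaction.
As we can see, as long as the logic lives inside the STM world, any number of TVars is supported. We just create them with newTVar and then modify them through readTVar and writeTVar; nothing else needs our attention. Simple, convenient, and a pleasure to use!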
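To see the all-or-nothing behaviour under real contention, here is a minimal sketch with several threads transferring in both directions at once. The names transfer and demoConcurrentTransfers, the starting balance of 1000, and the use of an MVar to wait for the workers are choices made for this illustration, not part of the original example.

```haskell
import Control.Concurrent (forkIO)
import Control.Concurrent.MVar (newEmptyMVar, putMVar, takeMVar)
import Control.Concurrent.STM
import Control.Monad (forM_, replicateM_)

-- Move qty units from one account to the other in a single transaction.
transfer :: Int -> TVar Int -> TVar Int -> STM ()
transfer qty from to = do
  modifyTVar' from (subtract qty)
  modifyTVar' to (+ qty)

-- Fork `workers` threads; each performs `n` transfers in each direction.
-- Returns the final balances once every worker has finished.
demoConcurrentTransfers :: Int -> Int -> IO (Int, Int)
demoConcurrentTransfers workers n = do
  alice <- newTVarIO 1000
  bob   <- newTVarIO 1000
  done  <- newEmptyMVar
  forM_ [1 .. workers] $ \_ -> forkIO $ do
    replicateM_ n $ do
      atomically (transfer 1 alice bob)
      atomically (transfer 1 bob alice)
    putMVar done ()
  -- Wait until every worker has signalled completion.
  replicateM_ workers (takeMVar done)
  atomically ((,) <$> readTVar alice <*> readTVar bob)
```

Because every worker moves the same amount in both directions and no update is ever lost, both balances end where they started, however the threads interleave. No lock appears anywhere in the user code.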
II. Understanding STM in depth
For the official documentation, see: https://ghc.haskell.org/trac/ghc/wiki/Commentary/Rts/STM
https://en.wikipedia.org/wiki/Concurrent_Haskell
Allow me to retell it here in my own unpolished words:
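1. How does STM actually work?
a. Each thread creates its own transaction record, the TRec.
b. Every operation on a TVar is recorded in a TRecEntry of that TRec.
c. When the STM transaction is committed, the thread checks whether the value of any TVar in its TRec has been changed by another thread's commit. If no other thread has touched them, the runtime locks them all and writes the updates back as a batch. If another thread has touched any of them, the transaction is run again against the new TVar values.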
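The three steps above can be sketched as a pure toy model. This is not GHC's implementation: Store, Entry, readVar, writeVar and commit are hypothetical names, variables are plain strings holding Ints, and a missing variable is assumed to read as 0.

```haskell
import qualified Data.Map.Strict as M

type Var   = String
type Store = M.Map Var Int          -- stands in for the shared TVars

-- One log entry per variable touched: the value seen on first access
-- and the value the transaction wants to leave behind.
data Entry = Entry { expected :: Int, pending :: Int } deriving (Eq, Show)
type Log = M.Map Var Entry

-- Read through the log; the first read records the store's value.
readVar :: Store -> Var -> Log -> (Int, Log)
readVar store v tlog = case M.lookup v tlog of
  Just e  -> (pending e, tlog)
  Nothing -> let x = M.findWithDefault 0 v store
             in (x, M.insert v (Entry x x) tlog)

-- Writes only touch the log, never the store.
writeVar :: Store -> Var -> Int -> Log -> Log
writeVar store v x tlog =
  let (_, tlog') = readVar store v tlog
  in M.adjust (\e -> e { pending = x }) v tlog'

-- Commit succeeds only if every expected value still matches the store;
-- on success all pending values are written back together.
commit :: Store -> Log -> Maybe Store
commit store tlog
  | all ok (M.toList tlog) = Just (M.union (M.map pending tlog) store)
  | otherwise              = Nothing
  where ok (v, e) = M.findWithDefault 0 v store == expected e
```

A Nothing from commit corresponds to the "run the transaction again" case: the caller would throw the log away and replay the transaction against the newer store.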
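2. Taking further control of STM!
The transaction control described so far happens automatically, but sometimes we need special services from STM.
Say we want to make a transfer, but the balance in the TVar is insufficient. Can we watch the TVar and retry the transaction once its value changes? Or, after the failure is rolled back, run a different transaction instead?
These are STM's two flow-control primitives: retry and orElse.
a. Let's look at code that uses retry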
transferBlocking :: Int -> TVar Int -> TVar Int -> STM ()
transferBlocking v a b = do
  x <- readTVar a
  y <- readTVar b
  if x < v
    then retry
    else do
      writeTVar a (x - v)
      writeTVar b (y + v)
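As we can see, when x < v the transaction cannot go ahead, so it calls retry and blocks. When is a blocked thread woken up? On blocking, the thread adds itself to the watch queue of the TVars recorded in its TRec; when another thread modifies one of those TVars, the thread is woken up and runs the transaction again.
Such an advanced feature without writing any extra code: isn't that astonishing!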
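A small sketch of the blocking and wake-up behaviour follows. The names withdrawBlocking and demoRetry are made up for this illustration; the MVar is only there so the main thread can wait for the worker.

```haskell
import Control.Concurrent (forkIO)
import Control.Concurrent.MVar (newEmptyMVar, putMVar, takeMVar)
import Control.Concurrent.STM

-- Block (via retry) until the balance covers the withdrawal.
withdrawBlocking :: Int -> TVar Int -> STM ()
withdrawBlocking v acct = do
  x <- readTVar acct
  if x < v then retry else writeTVar acct (x - v)

demoRetry :: IO Int
demoRetry = do
  acct <- newTVarIO 0
  done <- newEmptyMVar
  -- The worker cannot proceed while the balance is below 5.
  _ <- forkIO $ do
    atomically (withdrawBlocking 5 acct)
    putMVar done ()
  -- Depositing changes the TVar, which lets the worker's transaction run.
  atomically (modifyTVar' acct (+ 8))
  takeMVar done
  readTVarIO acct
```

Whichever thread gets scheduled first, the withdrawal only completes once the deposit is visible, so demoRetry ends with a balance of 3.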
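b. orElse is more advanced still: it acts directly on retry.
If the first transaction calls retry, we do not let it block and wait to be woken up; we run another transaction instead! Even retry itself is under control. Scary, truly scary!
transferChoice :: Int -> TVar Int -> TVar Int -> TVar Int -> STM ()
transferChoice v a a' b = do
  transferBlocking v a b `orElse` transferBlocking v a' b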
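orElse is simple: it runs the first transaction as a nested transaction, and if that transaction calls retry, its effects are rolled back and the alternative runs. If, on the other hand, validation fails because another thread modified the data, that does not count as a failure of the first transaction; it simply has to be run and committed again.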
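Here is a minimal sketch of two common uses of orElse: falling back to a second account, and turning a blocking transaction into a non-blocking attempt. The names withdraw, withdrawEither, tryWithdraw and demoOrElse are hypothetical helpers written for this illustration.

```haskell
import Control.Concurrent.STM

-- Blocks via retry while the balance is too low.
withdraw :: Int -> TVar Int -> STM ()
withdraw v acct = do
  x <- readTVar acct
  if x < v then retry else writeTVar acct (x - v)

-- Try the primary account first, fall back to the backup account.
withdrawEither :: Int -> TVar Int -> TVar Int -> STM ()
withdrawEither v primary backup =
  withdraw v primary `orElse` withdraw v backup

-- Turn a blocking transaction into a non-blocking attempt.
tryWithdraw :: Int -> TVar Int -> STM Bool
tryWithdraw v acct = (withdraw v acct >> return True) `orElse` return False

demoOrElse :: IO (Int, Int, Bool)
demoOrElse = do
  primary <- newTVarIO 2
  backup  <- newTVarIO 10
  atomically (withdrawEither 5 primary backup)  -- primary is too low
  ok <- atomically (tryWithdraw 50 backup)      -- cannot succeed, no blocking
  (,,) <$> readTVarIO primary <*> readTVarIO backup <*> pure ok
```

demoOrElse yields (2, 5, False): the primary account is untouched, the backup paid the 5, and the oversized withdrawal reported failure instead of blocking.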
With so many interesting things going on, can you still resist the temptation to read the source code?
Very well, as you wish!
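III. The Haskell source code of STM
STM is a core part of GHC Haskell, so it lives directly in the runtime and the base library.
In this section we focus on the Haskell interface.
Let's recap the pieces mentioned so far:
a. The STM world
b. Flow control with retry and orElse
c. TVar variables
d. Committing a transaction with atomically
The Haskell code for STM all lives in the GHC.Conc.Sync module: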
https://github.com/ghc/ghc/blob/master/libraries/base/GHC/Conc/Sync.hs
1. The STM world
a. First, the type definition of STM
newtype STM a = STM (State# RealWorld -> (# State# RealWorld, a #))
unSTM :: STM a -> (State# RealWorld -> (# State# RealWorld, a #))
unSTM (STM a) = a
atomically :: STM a -> IO a
atomically (STM m) = IO (\s -> (atomically# m) s )
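As we can see, the STM world looks very much like the IO world: both are state passing.
When atomically runs an STM transaction, the result is carried over into the IO world.
b. Next, how STM actions are composed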
instance Applicative STM where
  {-# INLINE pure #-}
  {-# INLINE (*>) #-}
  {-# INLINE liftA2 #-}
  pure x = returnSTM x
  (<*>) = ap
  liftA2 = liftM2
  m *> k = thenSTM m k
instance Monad STM where
  {-# INLINE (>>=) #-}
  m >>= k = bindSTM m k
  (>>) = (*>)
bindSTM :: STM a -> (a -> STM b) -> STM b
bindSTM (STM m) k = STM ( \s ->
  case m s of
    (# new_s, a #) -> unSTM (k a) new_s
  )
thenSTM :: STM a -> STM b -> STM b
thenSTM (STM m) k = STM ( \s ->
  case m s of
    (# new_s, _ #) -> unSTM k new_s
  )
instance Alternative STM where
  empty = retry
  (<|>) = orElse
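The composition here is fairly simple.
a. >>=, implemented by bindSTM, runs one action and then the next, passing the result along.
>> is its shortcut: it goes straight to thenSTM and runs the next action, ignoring the first action's result.
b. <|> runs the alternative if the first action calls retry.
The logic of <|> is the orElse-on-retry logic described later.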
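The state-passing shape of bindSTM can be reproduced with ordinary boxed types. The sketch below is a toy stand-in, not the real STM: Tx, unTx, returnTx, bindTx, tick and demoTx are names invented here, and an ordinary state s with a normal tuple replaces State# RealWorld and the unboxed tuple.

```haskell
-- A toy stand-in for STM: a function from a state to (new state, result).
newtype Tx s a = Tx (s -> (s, a))

unTx :: Tx s a -> s -> (s, a)
unTx (Tx f) = f

-- Same shape as returnSTM: leave the state alone, return the value.
returnTx :: a -> Tx s a
returnTx x = Tx (\s -> (s, x))

-- Same shape as bindSTM: run m, feed its result and new state to k.
bindTx :: Tx s a -> (a -> Tx s b) -> Tx s b
bindTx (Tx m) k = Tx $ \s -> case m s of
  (s', a) -> unTx (k a) s'

instance Functor (Tx s) where
  fmap f m = bindTx m (returnTx . f)

instance Applicative (Tx s) where
  pure = returnTx
  mf <*> mx = bindTx mf (\f -> bindTx mx (returnTx . f))

instance Monad (Tx s) where
  (>>=) = bindTx

-- A primitive action on an Int counter: return it, then increment it.
tick :: Tx Int Int
tick = Tx (\n -> (n + 1, n))

-- Thread the counter through two ticks, starting from 10.
demoTx :: (Int, (Int, Int))
demoTx = unTx (do { a <- tick; b <- tick; return (a, b) }) 10
```

demoTx evaluates to (12, (10, 11)): each tick sees the state left behind by the previous one, which is exactly what the case expression in bindSTM arranges.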
c. Finally, how STM actions are constructed
returnSTM :: a -> STM a
returnSTM x = STM (\s -> (# s, x #))
newTVar :: a -> STM (TVar a)
newTVar val = STM $ \s1# ->
  case newTVar# val s1# of
    (# s2#, tvar# #) -> (# s2#, TVar tvar# #)
readTVar :: TVar a -> STM a
readTVar (TVar tvar#) = STM $ \s# -> readTVar# tvar# s#
writeTVar :: TVar a -> a -> STM ()
writeTVar (TVar tvar#) val = STM $ \s1# ->
  case writeTVar# tvar# val s1# of
    s2# -> (# s2#, () #)
throwSTM :: Exception e => e -> STM a
throwSTM e = STM $ raiseIO# (toException e)
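STM actions are constructed in three ways: from a constant, through a TVar, or by raising an exception.
TVar is the most common form, so let's look at it more closely.
2. TVar variables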
data TVar a = TVar (TVar# RealWorld a)
instance Eq (TVar a) where
  (TVar tvar1#) == (TVar tvar2#) = isTrue# (sameTVar# tvar1# tvar2#)
newTVar :: a -> STM (TVar a)
newTVar val = STM $ \s1# ->
  case newTVar# val s1# of
    (# s2#, tvar# #) -> (# s2#, TVar tvar# #)
readTVar :: TVar a -> STM a
readTVar (TVar tvar#) = STM $ \s# -> readTVar# tvar# s#
writeTVar :: TVar a -> a -> STM ()
writeTVar (TVar tvar#) val = STM $ \s1# ->
  case writeTVar# tvar# val s1# of
    s2# -> (# s2#, () #)
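The TVar data type is essentially a wrapper around the primitive type TVar#, which is provided by the runtime.
TVar# supports three operations: create, read, and write,
implemented by the runtime's newTVar#, readTVar#, and writeTVar# respectively.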
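One visible consequence of the Eq instance above is that TVars are compared by identity (sameTVar#), not by contents. A tiny sketch, with demoTVarEq as a name made up for this illustration:

```haskell
import Control.Concurrent.STM

-- Two distinct TVars with equal contents are not equal;
-- a TVar is only equal to itself.
demoTVarEq :: IO (Bool, Bool)
demoTVarEq = do
  a <- newTVarIO (1 :: Int)
  b <- newTVarIO (1 :: Int)
  let alias = a
  return (a == b, a == alias)
```

demoTVarEq returns (False, True), and the comparison needs no Eq instance for the contents.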
3. Flow control with retry and orElse
throwSTM :: Exception e => e -> STM a
throwSTM e = STM $ raiseIO# (toException e)
catchSTM :: Exception e => STM a -> (e -> STM a) -> STM a
catchSTM (STM m) handler = STM $ catchSTM# m handler'
  where
    handler' e = case fromException e of
      Just e' -> unSTM (handler e')
      Nothing -> raiseIO# e
retry :: STM a
retry = STM $ \s# -> retry# s#
orElse :: STM a -> STM a -> STM a
orElse (STM m) e = STM $ \s -> catchRetry# m (unSTM e) s
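This flow also includes exception handling.
a. First, throwSTM calls the runtime interface raiseIO# to raise an exception inside the STM world,
and catchSTM calls the runtime's catchSTM# to handle it.
b. retry is provided by the runtime's retry#.
c. orElse catches a retry, and is provided by the runtime's catchRetry#.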
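A short sketch of throwSTM and catchSTM in use. The exception type InsufficientFunds and the helpers debitOrThrow, safeDebit and demoCatchSTM are hypothetical names for this illustration. It relies on the documented behaviour of catchSTM that changes made by the action that threw are rolled back before the handler runs.

```haskell
import Control.Concurrent.STM
import Control.Exception (Exception)

data InsufficientFunds = InsufficientFunds deriving Show
instance Exception InsufficientFunds

-- Debit first, then check: the write is undone if the check throws.
debitOrThrow :: Int -> TVar Int -> STM ()
debitOrThrow v acct = do
  modifyTVar' acct (subtract v)
  x <- readTVar acct
  if x < 0 then throwSTM InsufficientFunds else return ()

-- Returns True on success; on failure the debit inside is rolled back.
safeDebit :: Int -> TVar Int -> STM Bool
safeDebit v acct =
  (debitOrThrow v acct >> return True)
    `catchSTM` \InsufficientFunds -> return False

demoCatchSTM :: IO (Bool, Int)
demoCatchSTM = do
  acct <- newTVarIO 3
  ok   <- atomically (safeDebit 5 acct)
  bal  <- readTVarIO acct
  return (ok, bal)
```

demoCatchSTM returns (False, 3): the handler ran, and the tentative debit to -2 never became visible.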
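4. The list of runtime interfaces
Everything now points to the runtime, so let's recap.
TVar type: TVar#
TVar operations: newTVar#, readTVar#, writeTVar#
STM operations: atomically#, raiseIO#, catchSTM#, retry#, catchRetry#
The most exciting moment has arrived: onward into the STM runtime!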
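IV. Analysis of the STM runtime
1. Introduction to PrimOps
The basic RTS operations are implemented as PrimOps,
and the GHC.Prim module is generated from their definitions by the genprimopcode tool.
a. The C layer
- STM.c provides the C functions called from Cmm
https://github.com/ghc/ghc/blob/master/rts/STM.c
- Closures.h provides the data structures used from Cmm,
including StgTVar and StgTRecHeader
https://github.com/ghc/ghc/blob/master/includes/rts/storage/Closures.h
- TSO.h provides the thread state data structure, StgTSO
https://github.com/ghc/ghc/blob/master/includes/rts/storage/TSO.h
b. The Cmm layer
Cmm implements the GHC.Prim primitives and calls down into the C functions
https://github.com/ghc/ghc/blob/master/rts/PrimOps.cmm
It contains the following functions: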
stg_newTVarzh -> newTVar#
stg_readTVarzh -> readTVar#
stg_writeTVarzh -> writeTVar#
stg_atomicallyzh -> atomically#
stg_catchSTMzh -> catchSTM#
stg_retryzh -> retry#
stg_catchRetryzh -> catchRetry#
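c. The interface definition layer
This defines the list of low-level primitives exposed by the RTS:
https://github.com/ghc/ghc/blob/master/compiler/prelude/primops.txt.pp
d. The Haskell layer
The GHC.Prim module is generated for Haskell code to call.
The interface mapping involves some character conversion; for example, # becomes zh
usage: genprimopcode command < primops.txt > ...
https://github.com/ghc/ghc/blob/master/utils/genprimopcode/Main.hs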
-- Constructors
encode_ch '(' = "ZL" -- Needed for things like (,), and (->)
encode_ch ')' = "ZR" -- For symmetry with (
encode_ch '[' = "ZM"
encode_ch ']' = "ZN"
encode_ch ':' = "ZC"
encode_ch 'Z' = "ZZ"
-- Variables
encode_ch 'z' = "zz"
encode_ch '&' = "za"
encode_ch '|' = "zb"
encode_ch '^' = "zc"
encode_ch '$' = "zd"
encode_ch '=' = "ze"
encode_ch '>' = "zg"
encode_ch '#' = "zh"
encode_ch '.' = "zi"
encode_ch '<' = "zl"
encode_ch '-' = "zm"
encode_ch '!' = "zn"
encode_ch '+' = "zp"
encode_ch '\'' = "zq"
encode_ch '\\' = "zr"
encode_ch '/' = "zs"
encode_ch '*' = "zt"
encode_ch '_' = "zu"
encode_ch '%' = "zv"
encode_ch c = 'z' : shows (ord c) "U"
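To make the mapping concrete, here is a small sketch that applies a subset of the table above to primop names. It is a simplification for illustration: encodeCh, zEncode and cmmSymbol are hypothetical names, only a few characters are handled, and every other character is assumed to pass through unchanged (which holds for the alphanumeric names used here).

```haskell
-- Simplified subset of the encoding table quoted above.
encodeCh :: Char -> String
encodeCh 'Z'  = "ZZ"
encodeCh 'z'  = "zz"
encodeCh '#'  = "zh"
encodeCh '.'  = "zi"
encodeCh '_'  = "zu"
encodeCh '\'' = "zq"
encodeCh c    = [c]   -- assumption: everything else passes through

zEncode :: String -> String
zEncode = concatMap encodeCh

-- Name of a primop's entry point as it appears in PrimOps.cmm:
-- the stg_ prefix followed by the encoded primop name.
cmmSymbol :: String -> String
cmmSymbol primop = "stg_" ++ zEncode primop
```

For example, cmmSymbol "newTVar#" gives "stg_newTVarzh", matching the list of Cmm functions earlier, and zEncode "Control.Exception.Base" gives "ControlziExceptionziBase", the form that shows up in closure symbol names.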
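2. Basic RTS runtime concepts
STM, as introduced above, involves three kinds of interface: C, Cmm, and Haskell. Let's go through them in detail.
a. The compilation pipeline for Haskell source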
                                                                          +---------+
                                                        LLVM backend /--->| LLVM IR |--\
                                                                     |    +---------+  | LLVM
                                                                     |                 v
+------------+ Desugar  +------+ STGify  +-----+ CodeGen  +-----+    |  NCG    +----------+
| Parse tree |--------->| Core |-------->| STG |--------->| C-- |----+-------->| Assembly |
+------------+          +------+         +-----+          +-----+    |         +----------+
                                                                     |            ^
                                                                     |     +---+  | GCC
                                                           C backend \---->| C |--/
                                                                           +---+
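- Haskell code is first desugared into Haskell Core.
- Core is then converted to STG, from which Cmm (C--) is generated.
- Finally, Cmm is turned into assembly by one of the backends: NCG, LLVM, or C via GCC.
b. The RTS keeps a thread's state in a TSO (Thread State Object),
which holds the StgStack_ stack and the StgTRecHeader transaction record.
StgStack_ is used for closure calls, and StgTRecHeader for TVar transaction processing.
The TSO object is defined in TSO.h: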
typedef struct StgTSO_ {
StgHeader header;
struct StgTSO_* _link;
struct StgTSO_* global_link; // Links threads on the
// generation->threads lists
struct StgStack_ *stackobj;
StgWord16 what_next; // Values defined in Constants.h
StgWord16 why_blocked; // Values defined in Constants.h
StgWord32 flags; // Values defined in Constants.h
StgTSOBlockInfo block_info;
StgThreadID id;
StgWord32 saved_errno;
StgWord32 dirty; /* non-zero => dirty */
struct InCall_* bound;
struct Capability_* cap;
struct StgTRecHeader_ * trec; /* STM transaction record */
struct MessageThrowTo_ * blocked_exceptions;
struct StgBlockingQueue_ *bq;
StgInt64 alloc_limit; /* in bytes */
StgWord32 tot_stack_size;
#if defined(TICKY_TICKY)
/* TICKY-specific stuff would go here. */
#endif
#if defined(PROFILING)
StgTSOProfInfo prof;
#endif
#if defined(mingw32_HOST_OS)
StgWord32 saved_winerror;
#endif
} *StgTSOPtr; // StgTSO defined in rts/Types.h
https://github.com/ghc/ghc/blob/master/includes/rts/storage/TSO.h
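c. STG register pointers
We can see that the CurrentTSO register points to the TSO of the current thread
https://github.com/ghc/ghc/blob/master/compiler/cmm/CmmExpr.hs
d. Closure calls in Haskell differ from calls in non-functional languages.
Roughly speaking, a Haskell closure call is made by manipulating the stack pointer and jumping to an entry point, rather than through C's traditional last-in-first-out call-stack convention.
For more details, see the following documents:
https://www.cs.tufts.edu/~nr/cs257/archive/simon-peyton-jones/eval-apply-jfp.pdf
https://github.com/ghc/ghc/blob/master/compiler/cmm/CmmParse.y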
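3. STM data types: StgTSO, StgTRecHeader, StgTVar
Let's first look at the STM-related data structures.
As mentioned earlier, there are two of them: the transactional variable StgTVar, and the thread's own transaction record StgTRecHeader, which is stored in the StgTSO thread state object introduced in the previous section.
The STM-related data structures are defined in the following file: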
https://github.com/ghc/ghc/blob/master/includes/rts/storage/Closures.h
typedef struct {
StgHeader header;
StgClosure *volatile current_value;
StgTVarWatchQueue *volatile first_watch_queue_entry;
StgInt volatile num_updates;
} StgTVar;
struct StgTRecHeader_ {
StgHeader header;
struct StgTRecHeader_ *enclosing_trec;
StgTRecChunk *current_chunk;
TRecState state;
};
typedef struct StgTRecChunk_ {
StgHeader header;
struct StgTRecChunk_ *prev_chunk;
StgWord next_entry_idx;
TRecEntry entries[TREC_CHUNK_NUM_ENTRIES];
} StgTRecChunk;
typedef struct {
StgTVar *tvar;
StgClosure *expected_value;
StgClosure *new_value;
#if defined(THREADED_RTS)
StgInt num_updates;
#endif
} TRecEntry;
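We can see that:
a. StgTVar holds a StgTVarWatchQueue, used for blocking and wake-up, and a current_value that stores the current value.
b. StgTRecHeader_ points to a StgTRecChunk, and each StgTRecChunk holds TREC_CHUNK_NUM_ENTRIES TRecEntry slots. Chunks are linked to one another through prev_chunk, which is how a record grows beyond one chunk, while enclosing_trec points to the enclosing StgTRecHeader_ of a nested transaction.
c. TRecEntry holds the StgTVar together with expected_value and new_value, which are used for validation and for recording the modification. Details follow below.
With the basic data structures in place, let's look at the actual operations.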
4. TVar operations: newTVar#, readTVar#, writeTVar#
a. First, creating a transactional variable: newTVar#
stg_newTVarzh (P_ init)
{
W_ tv;
ALLOC_PRIM_P (SIZEOF_StgTVar, stg_newTVarzh, init);
tv = Hp - SIZEOF_StgTVar + WDS(1);
SET_HDR (tv, stg_TVAR_DIRTY_info, CCCS);
StgTVar_current_value(tv) = init;
StgTVar_first_watch_queue_entry(tv) = stg_END_STM_WATCH_QUEUE_closure;
StgTVar_num_updates(tv) = 0;
return (tv);
}
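stg_newTVarzh takes an init value.
After ALLOC_PRIM_P allocates the space, tv is computed from the heap pointer.
The StgTVar transactional variable tv is then initialized:
- current_value is set to init
- first_watch_queue_entry is set to stg_END_STM_WATCH_QUEUE_closure
- num_updates is set to 0
b. Next, the read function readTVar#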
stg_readTVarzh (P_ tvar)
{
P_ trec;
P_ result;
// Call to stmReadTVar may allocate
MAYBE_GC_P (stg_readTVarzh, tvar);
trec = StgTSO_trec(CurrentTSO);
("ptr" result) = ccall stmReadTVar(MyCapability() "ptr", trec "ptr",
tvar "ptr");
return (result);
}
StgClosure *stmReadTVar(Capability *cap,
StgTRecHeader *trec,
StgTVar *tvar) {
StgTRecHeader *entry_in = NULL;
StgClosure *result = NULL;
TRecEntry *entry = NULL;
TRACE("%p : stmReadTVar(%p)", trec, tvar);
ASSERT(trec != NO_TREC);
ASSERT(trec -> state == TREC_ACTIVE ||
trec -> state == TREC_CONDEMNED);
entry = get_entry_for(trec, tvar, &entry_in);
if (entry != NULL) {
if (entry_in == trec) {
// Entry found in our trec
result = entry -> new_value;
} else {
// Entry found in another trec
TRecEntry *new_entry = get_new_entry(cap, trec);
new_entry -> tvar = tvar;
new_entry -> expected_value = entry -> expected_value;
new_entry -> new_value = entry -> new_value;
result = new_entry -> new_value;
}
} else {
// No entry found
StgClosure *current_value = read_current_value(trec, tvar);
TRecEntry *new_entry = get_new_entry(cap, trec);
new_entry -> tvar = tvar;
new_entry -> expected_value = current_value;
new_entry -> new_value = current_value;
result = current_value;
}
TRACE("%p : stmReadTVar(%p)=%p", trec, tvar, result);
return result;
}
Here ccall is used to call the underlying C function stmReadTVar.
c. Last but not least, the write function writeTVar#
stg_writeTVarzh (P_ tvar, /* :: TVar a */
P_ new_value /* :: a */)
{
W_ trec;
// Call to stmWriteTVar may allocate
MAYBE_GC_PP (stg_writeTVarzh, tvar, new_value);
trec = StgTSO_trec(CurrentTSO);
ccall stmWriteTVar(MyCapability() "ptr", trec "ptr", tvar "ptr",
new_value "ptr");
return ();
}
void stmWriteTVar(Capability *cap,
StgTRecHeader *trec,
StgTVar *tvar,
StgClosure *new_value) {
StgTRecHeader *entry_in = NULL;
TRecEntry *entry = NULL;
TRACE("%p : stmWriteTVar(%p, %p)", trec, tvar, new_value);
ASSERT(trec != NO_TREC);
ASSERT(trec -> state == TREC_ACTIVE ||
trec -> state == TREC_CONDEMNED);
entry = get_entry_for(trec, tvar, &entry_in);
if (entry != NULL) {
if (entry_in == trec) {
// Entry found in our trec
entry -> new_value = new_value;
} else {
// Entry found in another trec
TRecEntry *new_entry = get_new_entry(cap, trec);
new_entry -> tvar = tvar;
new_entry -> expected_value = entry -> expected_value;
new_entry -> new_value = new_value;
}
} else {
// No entry found
StgClosure *current_value = read_current_value(trec, tvar);
TRecEntry *new_entry = get_new_entry(cap, trec);
new_entry -> tvar = tvar;
new_entry -> expected_value = current_value;
new_entry -> new_value = new_value;
}
TRACE("%p : stmWriteTVar done", trec);
}
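stmReadTVar and stmWriteTVar above share the same three cases: the entry is in our own trec, it is in an enclosing trec, or there is no entry yet. The pure sketch below mirrors those cases. It is a model for illustration only: Tx, lookupEntry, readT and writeT are hypothetical names, TVars are strings holding Ints, chunks are replaced by a Map, and a variable missing from the store is assumed to read as 0.

```haskell
import qualified Data.Map.Strict as M

type Var = String
data Entry = Entry { expected :: Int, newValue :: Int } deriving (Eq, Show)
type TRec = M.Map Var Entry

-- The current log first, followed by its enclosing transactions
-- (cf. enclosing_trec).
data Tx = Tx { current :: TRec, enclosing :: [TRec] }

-- Search the current log, then the enclosing ones (cf. get_entry_for).
-- The Bool says whether the hit was in the current log.
lookupEntry :: Var -> Tx -> Maybe (Bool, Entry)
lookupEntry v (Tx cur outer) = case M.lookup v cur of
  Just e  -> Just (True, e)
  Nothing -> case [e | Just e <- map (M.lookup v) outer] of
    (e : _) -> Just (False, e)
    []      -> Nothing

readT :: M.Map Var Int -> Var -> Tx -> (Int, Tx)
readT store v tx = case lookupEntry v tx of
  Just (True, e)  -> (newValue e, tx)             -- found in our trec
  Just (False, e) -> (newValue e, addEntry e)     -- copy from outer trec
  Nothing         -> let x = M.findWithDefault 0 v store
                     in (x, addEntry (Entry x x)) -- first access
  where addEntry e = tx { current = M.insert v e (current tx) }

-- A write makes sure an entry exists, then overwrites its new value.
writeT :: M.Map Var Int -> Var -> Int -> Tx -> Tx
writeT store v x tx =
  let (_, tx') = readT store v tx
  in tx' { current = M.adjust (\e -> e { newValue = x }) v (current tx') }
```

In both functions the shared store is only ever read; all changes go into the current log, which is why a nested transaction can be dropped without touching anything outside it.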
5. STM operations: atomically#, raiseIO#, catchSTM#, retry#, catchRetry#
a. Committing a transaction: atomically#
stg_atomicallyzh (P_ stm)
{
P_ old_trec;
P_ new_trec;
P_ code, frame_result;
// stmStartTransaction may allocate
MAYBE_GC_P(stg_atomicallyzh, stm);
STK_CHK_GEN();
old_trec = StgTSO_trec(CurrentTSO);
/* Nested transactions are not allowed; raise an exception */
if (old_trec != NO_TREC) {
jump stg_raisezh(base_ControlziExceptionziBase_nestedAtomically_closure);
}
code = stm;
frame_result = NO_TREC;
/* Start the memory transcation */
("ptr" new_trec) = ccall stmStartTransaction(MyCapability() "ptr", old_trec "ptr");
StgTSO_trec(CurrentTSO) = new_trec;
jump stg_ap_v_fast
(ATOMICALLY_FRAME_FIELDS(,,stg_atomically_frame_info, CCCS, 0,
code,frame_result))
(stm);
}
INFO_TABLE_RET(stg_atomically_frame, ATOMICALLY_FRAME,
// layout of the frame, and bind the field names
ATOMICALLY_FRAME_FIELDS(W_,P_,
info_ptr, p1, p2,
code,
frame_result))
return (P_ result) // value returned to the frame
{
W_ valid;
gcptr trec, outer, q;
trec = StgTSO_trec(CurrentTSO);
outer = StgTRecHeader_enclosing_trec(trec);
/* Back at the atomically frame */
frame_result = result;
/* try to commit */
(valid) = ccall stmCommitTransaction(MyCapability() "ptr", trec "ptr");
if (valid != 0) {
/* Transaction was valid: commit succeeded */
StgTSO_trec(CurrentTSO) = NO_TREC;
return (frame_result);
} else {
/* Transaction was not valid: try again */
("ptr" trec) = ccall stmStartTransaction(MyCapability() "ptr",
NO_TREC "ptr");
StgTSO_trec(CurrentTSO) = trec;
jump stg_ap_v_fast
// push the StgAtomicallyFrame again: the code generator is
// clever enough to only assign the fields that have changed.
(ATOMICALLY_FRAME_FIELDS(,,info_ptr,p1,p2,
code,frame_result))
(code);
}
}
StgBool stmCommitTransaction(Capability *cap, StgTRecHeader *trec) {
StgInt64 max_commits_at_start = max_commits;
TRACE("%p : stmCommitTransaction()", trec);
ASSERT(trec != NO_TREC);
lock_stm(trec);
ASSERT(trec -> enclosing_trec == NO_TREC);
ASSERT((trec -> state == TREC_ACTIVE) ||
(trec -> state == TREC_CONDEMNED));
// Use a read-phase (i.e. don't lock TVars we've read but not updated) if
// the configuration lets us use a read phase.
bool result = validate_and_acquire_ownership(cap, trec, (!config_use_read_phase), true);
if (result) {
// We now know that all the updated locations hold their expected values.
ASSERT(trec -> state == TREC_ACTIVE);
if (config_use_read_phase) {
StgInt64 max_commits_at_end;
StgInt64 max_concurrent_commits;
TRACE("%p : doing read check", trec);
result = check_read_only(trec);
TRACE("%p : read-check %s", trec, result ? "succeeded" : "failed");
max_commits_at_end = max_commits;
max_concurrent_commits = ((max_commits_at_end - max_commits_at_start) +
(n_capabilities * TOKEN_BATCH_SIZE));
if (((max_concurrent_commits >> 32) > 0) || shake()) {
result = false;
}
}
if (result) {
// We now know that all of the read-only locations held their expected values
// at the end of the call to validate_and_acquire_ownership. This forms the
// linearization point of the commit.
// Make the updates required by the transaction.
FOR_EACH_ENTRY(trec, e, {
StgTVar *s;
s = e -> tvar;
if ((!config_use_read_phase) || (e -> new_value != e -> expected_value)) {
// Either the entry is an update or we're not using a read phase:
// write the value back to the TVar, unlocking it if necessary.
ACQ_ASSERT(tvar_is_locked(s, trec));
TRACE("%p : writing %p to %p, waking waiters", trec, e -> new_value, s);
unpark_waiters_on(cap,s);
IF_STM_FG_LOCKS({
s -> num_updates ++;
});
unlock_tvar(cap, trec, s, e -> new_value, true);
}
ACQ_ASSERT(!tvar_is_locked(s, trec));
});
} else {
revert_ownership(cap, trec, false);
}
}
unlock_stm(trec);
free_stg_trec_header(cap, trec);
TRACE("%p : stmCommitTransaction()=%d", trec, result);
return result;
}
static StgBool validate_and_acquire_ownership (Capability *cap,
StgTRecHeader *trec,
int acquire_all,
int retain_ownership) {
StgBool result;
if (shake()) {
TRACE("%p : shake, pretending trec is invalid when it may not be", trec);
return false;
}
ASSERT((trec -> state == TREC_ACTIVE) ||
(trec -> state == TREC_WAITING) ||
(trec -> state == TREC_CONDEMNED));
result = !((trec -> state) == TREC_CONDEMNED);
if (result) {
FOR_EACH_ENTRY(trec, e, {
StgTVar *s;
s = e -> tvar;
if (acquire_all || entry_is_update(e)) {
TRACE("%p : trying to acquire %p", trec, s);
if (!cond_lock_tvar(trec, s, e -> expected_value)) {
TRACE("%p : failed to acquire %p", trec, s);
result = false;
BREAK_FOR_EACH;
}
} else {
ASSERT(config_use_read_phase);
IF_STM_FG_LOCKS({
TRACE("%p : will need to check %p", trec, s);
if (s -> current_value != e -> expected_value) {
TRACE("%p : doesn't match", trec);
result = false;
BREAK_FOR_EACH;
}
e -> num_updates = s -> num_updates;
if (s -> current_value != e -> expected_value) {
TRACE("%p : doesn't match (race)", trec);
result = false;
BREAK_FOR_EACH;
} else {
TRACE("%p : need to check version %ld", trec, e -> num_updates);
}
});
}
});
}
if ((!result) || (!retain_ownership)) {
revert_ownership(cap, trec, acquire_all);
}
return result;
}
static StgBool cond_lock_tvar(StgTRecHeader *trec,
StgTVar *s,
StgClosure *expected) {
StgClosure *result;
StgWord w;
TRACE("%p : cond_lock_tvar(%p, %p)", trec, s, expected);
w = cas((void *)&(s -> current_value), (StgWord)expected, (StgWord)trec);
result = (StgClosure *)w;
TRACE("%p : %s", trec, result ? "success" : "failure");
return (result == expected);
}
EXTERN_INLINE StgWord
cas(StgVolatilePtr p, StgWord o, StgWord n)
{
return __sync_val_compare_and_swap(p, o, n);
}
void
tryWakeupThread (Capability *cap, StgTSO *tso)
{
traceEventThreadWakeup (cap, tso, tso->cap->no);
#if defined(THREADED_RTS)
if (tso->cap != cap)
{
MessageWakeup *msg;
msg = (MessageWakeup *)allocate(cap,sizeofW(MessageWakeup));
SET_HDR(msg, &stg_MSG_TRY_WAKEUP_info, CCS_SYSTEM);
msg->tso = tso;
sendMessage(cap, tso->cap, (Message*)msg);
debugTraceCap(DEBUG_sched, cap, "message: try wakeup thread %ld on cap %d",
(W_)tso->id, tso->cap->no);
return;
}
#endif
switch (tso->why_blocked)
{
case BlockedOnMVar:
case BlockedOnMVarRead:
{
if (tso->_link == END_TSO_QUEUE) {
tso->block_info.closure = (StgClosure*)END_TSO_QUEUE;
goto unblock;
} else {
return;
}
}
case BlockedOnMsgThrowTo:
{
const StgInfoTable *i;
i = lockClosure(tso->block_info.closure);
unlockClosure(tso->block_info.closure, i);
if (i != &stg_MSG_NULL_info) {
debugTraceCap(DEBUG_sched, cap, "thread %ld still blocked on throwto (%p)",
(W_)tso->id, tso->block_info.throwto->header.info);
return;
}
// remove the block frame from the stack
ASSERT(tso->stackobj->sp[0] == (StgWord)&stg_block_throwto_info);
tso->stackobj->sp += 3;
goto unblock;
}
case BlockedOnSTM:
tso->block_info.closure = &stg_STM_AWOKEN_closure;
goto unblock;
case BlockedOnBlackHole:
case ThreadMigrating:
goto unblock;
default:
// otherwise, do nothing
return;
}
unblock:
// just run the thread now, if the BH is not really available,
// we'll block again.
tso->why_blocked = NotBlocked;
appendToRunQueue(cap,tso);
// We used to set the context switch flag here, which would
// trigger a context switch a short time in the future (at the end
// of the current nursery block). The idea is that we have just
// woken up a thread, so we may need to load-balance and migrate
// threads to other CPUs. On the other hand, setting the context
// switch flag here unfairly penalises the current thread by
// yielding its time slice too early.
//
// The synthetic benchmark nofib/smp/chan can be used to show the
// difference quite clearly.
// cap->context_switch = 1;
}
b. Catching exceptions: catchSTM#
stg_catchSTMzh (P_ code /* :: STM a */,
P_ handler /* :: Exception -> STM a */)
{
STK_CHK_GEN();
/* Start a nested transaction to run the body of the try block in */
W_ cur_trec;
W_ new_trec;
cur_trec = StgTSO_trec(CurrentTSO);
("ptr" new_trec) = ccall stmStartTransaction(MyCapability() "ptr",
cur_trec "ptr");
StgTSO_trec(CurrentTSO) = new_trec;
jump stg_ap_v_fast
(CATCH_STM_FRAME_FIELDS(,,stg_catch_stm_frame_info, CCCS, 0,
code, handler))
(code);
}
INFO_TABLE_RET(stg_catch_stm_frame, CATCH_STM_FRAME,
// layout of the frame, and bind the field names
CATCH_STM_FRAME_FIELDS(W_,P_,info_ptr,p1,p2,code,handler))
return (P_ ret)
{
W_ r, trec, outer;
trec = StgTSO_trec(CurrentTSO);
outer = StgTRecHeader_enclosing_trec(trec);
(r) = ccall stmCommitNestedTransaction(MyCapability() "ptr", trec "ptr");
if (r != 0) {
/* Commit succeeded */
StgTSO_trec(CurrentTSO) = outer;
return (ret);
} else {
/* Commit failed */
W_ new_trec;
("ptr" new_trec) = ccall stmStartTransaction(MyCapability() "ptr", outer "ptr");
StgTSO_trec(CurrentTSO) = new_trec;
jump stg_ap_v_fast
(CATCH_STM_FRAME_FIELDS(,,info_ptr,p1,p2,code,handler))
(code);
}
}
StgBool stmCommitNestedTransaction(Capability *cap, StgTRecHeader *trec) {
StgTRecHeader *et;
ASSERT(trec != NO_TREC && trec -> enclosing_trec != NO_TREC);
TRACE("%p : stmCommitNestedTransaction() into %p", trec, trec -> enclosing_trec);
ASSERT((trec -> state == TREC_ACTIVE) || (trec -> state == TREC_CONDEMNED));
lock_stm(trec);
et = trec -> enclosing_trec;
bool result = validate_and_acquire_ownership(cap, trec, (!config_use_read_phase), true);
if (result) {
// We now know that all the updated locations hold their expected values.
if (config_use_read_phase) {
TRACE("%p : doing read check", trec);
result = check_read_only(trec);
}
if (result) {
// We now know that all of the read-only locations held their expected values
// at the end of the call to validate_and_acquire_ownership. This forms the
// linearization point of the commit.
TRACE("%p : read-check succeeded", trec);
FOR_EACH_ENTRY(trec, e, {
// Merge each entry into the enclosing transaction record, release all
// locks.
StgTVar *s;
s = e -> tvar;
if (entry_is_update(e)) {
unlock_tvar(cap, trec, s, e -> expected_value, false);
}
merge_update_into(cap, et, s, e -> expected_value, e -> new_value);
ACQ_ASSERT(s -> current_value != (StgClosure *)trec);
});
} else {
revert_ownership(cap, trec, false);
}
}
unlock_stm(trec);
free_stg_trec_header(cap, trec);
TRACE("%p : stmCommitNestedTransaction()=%d", trec, result);
return result;
}
c. Going to sleep: retry#
stg_retryzh /* no arg list: explicit stack layout */
{
W_ frame_type;
W_ frame;
W_ trec;
W_ outer;
W_ r;
// STM operations may allocate
MAYBE_GC_ (stg_retryzh); // NB. not MAYBE_GC(), we cannot make a
// function call in an explicit-stack proc
// Find the enclosing ATOMICALLY_FRAME or CATCH_RETRY_FRAME
retry_pop_stack:
SAVE_THREAD_STATE();
(frame_type) = ccall findRetryFrameHelper(MyCapability(), CurrentTSO "ptr");
LOAD_THREAD_STATE();
frame = Sp;
trec = StgTSO_trec(CurrentTSO);
outer = StgTRecHeader_enclosing_trec(trec);
if (frame_type == CATCH_RETRY_FRAME) {
// The retry reaches a CATCH_RETRY_FRAME before the atomic frame
ASSERT(outer != NO_TREC);
// Abort the transaction attempting the current branch
ccall stmAbortTransaction(MyCapability() "ptr", trec "ptr");
ccall stmFreeAbortedTRec(MyCapability() "ptr", trec "ptr");
if (!StgCatchRetryFrame_running_alt_code(frame) != 0) {
// Retry in the first branch: try the alternative
("ptr" trec) = ccall stmStartTransaction(MyCapability() "ptr", outer "ptr");
StgTSO_trec(CurrentTSO) = trec;
StgCatchRetryFrame_running_alt_code(frame) = 1 :: CInt; // true;
R1 = StgCatchRetryFrame_alt_code(frame);
jump stg_ap_v_fast [R1];
} else {
// Retry in the alternative code: propagate the retry
StgTSO_trec(CurrentTSO) = outer;
Sp = Sp + SIZEOF_StgCatchRetryFrame;
goto retry_pop_stack;
}
}
// We've reached the ATOMICALLY_FRAME: attempt to wait
ASSERT(frame_type == ATOMICALLY_FRAME);
ASSERT(outer == NO_TREC);
(r) = ccall stmWait(MyCapability() "ptr", CurrentTSO "ptr", trec "ptr");
if (r != 0) {
// Transaction was valid: stmWait put us on the TVars' queues, we now block
StgHeader_info(frame) = stg_atomically_waiting_frame_info;
Sp = frame;
R3 = trec; // passing to stmWaitUnblock()
jump stg_block_stmwait [R3];
} else {
// Transaction was not valid: retry immediately
("ptr" trec) = ccall stmStartTransaction(MyCapability() "ptr", outer "ptr");
StgTSO_trec(CurrentTSO) = trec;
Sp = frame;
R1 = StgAtomicallyFrame_code(frame);
jump stg_ap_v_fast [R1];
}
}
StgBool stmWait(Capability *cap, StgTSO *tso, StgTRecHeader *trec) {
TRACE("%p : stmWait(%p)", trec, tso);
ASSERT(trec != NO_TREC);
ASSERT(trec -> enclosing_trec == NO_TREC);
ASSERT((trec -> state == TREC_ACTIVE) ||
(trec -> state == TREC_CONDEMNED));
lock_stm(trec);
bool result = validate_and_acquire_ownership(cap, trec, true, true);
if (result) {
// The transaction is valid so far so we can actually start waiting.
// (Otherwise the transaction was not valid and the thread will have to
// retry it).
// Put ourselves to sleep. We retain locks on all the TVars involved
// until we are sound asleep : (a) on the wait queues, (b) BlockedOnSTM
// in the TSO, (c) TREC_WAITING in the Trec.
build_watch_queue_entries_for_trec(cap, tso, trec);
park_tso(tso);
trec -> state = TREC_WAITING;
// We haven't released ownership of the transaction yet. The TSO
// has been put on the wait queue for the TVars it is waiting for,
// but we haven't yet tidied up the TSO's stack and made it safe
// to wake up the TSO. Therefore, we must wait until the TSO is
// safe to wake up before we release ownership - when all is well,
// the runtime will call stmWaitUnlock() below, with the same
// TRec.
} else {
unlock_stm(trec);
free_stg_trec_header(cap, trec);
}
TRACE("%p : stmWait(%p)=%d", trec, tso, result);
return result;
}
d. Branching: catchRetry#
stg_catchRetryzh (P_ first_code, /* :: STM a */
P_ alt_code /* :: STM a */)
{
W_ new_trec;
// stmStartTransaction may allocate
MAYBE_GC_PP (stg_catchRetryzh, first_code, alt_code);
STK_CHK_GEN();
/* Start a nested transaction within which to run the first code */
("ptr" new_trec) = ccall stmStartTransaction(MyCapability() "ptr",
StgTSO_trec(CurrentTSO) "ptr");
StgTSO_trec(CurrentTSO) = new_trec;
// push the CATCH_RETRY stack frame, and apply first_code to realWorld#
jump stg_ap_v_fast
(CATCH_RETRY_FRAME_FIELDS(,, stg_catch_retry_frame_info, CCCS, 0,
0, /* not running_alt_code */
first_code,
alt_code))
(first_code);
}
INFO_TABLE_RET(stg_catch_retry_frame, CATCH_RETRY_FRAME,
CATCH_RETRY_FRAME_FIELDS(W_,P_,
info_ptr, p1, p2,
running_alt_code,
first_code,
alt_code))
return (P_ ret)
{
unwind Sp = Sp + SIZEOF_StgCatchRetryFrame;
W_ r;
gcptr trec, outer, arg;
trec = StgTSO_trec(CurrentTSO);
outer = StgTRecHeader_enclosing_trec(trec);
(r) = ccall stmCommitNestedTransaction(MyCapability() "ptr", trec "ptr");
if (r != 0) {
// Succeeded (either first branch or second branch)
StgTSO_trec(CurrentTSO) = outer;
return (ret);
} else {
// Did not commit: re-execute
P_ new_trec;
("ptr" new_trec) = ccall stmStartTransaction(MyCapability() "ptr",
outer "ptr");
StgTSO_trec(CurrentTSO) = new_trec;
if (running_alt_code != 0) {
jump stg_ap_v_fast
(CATCH_RETRY_FRAME_FIELDS(,,info_ptr, p1, p2,
running_alt_code,
first_code,
alt_code))
(alt_code);
} else {
jump stg_ap_v_fast
(CATCH_RETRY_FRAME_FIELDS(,,info_ptr, p1, p2,
running_alt_code,
first_code,
alt_code))
(first_code);
}
}
}
For lack of time, the code is not explained in further detail here; the explanation will be added later.