spingboot

并发工具类-Exchanger

2019-07-16  本文已影响11人  王侦

1.使用示例

public class UseExchanger {
    private static final Exchanger<Set<String>> exchange
            = new Exchanger<Set<String>>();

    public static void main(String[] args) {

        //第一个线程
        new Thread(new Runnable() {
            @Override
            public void run() {
                Set<String> setA = new HashSet<String>();//存放数据的容器
                try {
                    SleepTools.second(1);
                    setA.add("Wang");
                    System.out.println("SetA:");
                    for (String s : setA) {
                        System.out.print(s + " ");
                    }
                    System.out.println();
                    System.out.println();

                    setA = exchange.exchange(setA);//交换set
                    SleepTools.second(1);
                    /*处理交换后的数据*/
                    System.out.println("SetA:");
                    for (String s : setA) {
                        System.out.print(s+ " ");
                    }
                    System.out.println();

                } catch (InterruptedException e) {
                }
            }
        }).start();

        //第二个线程
        new Thread(new Runnable() {
            @Override
            public void run() {
                Set<String> setB = new HashSet<String>();//存放数据的容器
                try {
                    setB.add("Li");
                    setB.add("Zhang");
                    System.out.println("SetB:");
                    for (String s : setB) {
                        System.out.print(s+ " ");
                    }
                    System.out.println();


                    setB = exchange.exchange(setB);//交换set
                    /*处理交换后的数据*/
                    System.out.println("SetB:");
                    for (String s : setB) {
                        System.out.print(s+ " ");
                    }
                    System.out.println();

                } catch (InterruptedException e) {
                }
            }
        }).start();
    }
}

结果:

SetB:
Zhang Li 
SetA:
Wang 

SetB:
Wang 
SetA:
Zhang Li 

2.官方文档

A synchronization point at which threads can pair and swap 
elements within pairs. Each thread presents some object on entry to 
the exchange method, matches with a partner thread, and receives 
its partner's object on return. An Exchanger may be viewed as a 
bidirectional form of a SynchronousQueue. Exchangers may be 
useful in applications such as genetic algorithms and pipeline 
designs.

Sample Usage: Here are the highlights of a class that uses an 
Exchanger to swap buffers between threads so that the thread filling 
the buffer gets a freshly emptied one when it needs it, handing off the 
filled one to the thread emptying the buffer.

线程可以交换元素的同步点。每个线程在exchange方法的入口准备好对象,与伙伴线程进行匹配,接收伙伴线程的对象并返回。Exchanger可以视为SynchronousQueue的双向形式。Exchangers在遗传算法和管道设计等应用中很有用。

示例用法:以下是使用Exchanger在线程间交互缓冲区的关键代码,以便填充缓冲区的线程在需要时获得一个刚刚清空的缓冲区,将填充的缓冲区交换到清空缓冲区的线程。

 class FillAndEmpty {
   Exchanger<DataBuffer> exchanger = new Exchanger<DataBuffer>();
   DataBuffer initialEmptyBuffer = ... a made-up type
   DataBuffer initialFullBuffer = ...

   class FillingLoop implements Runnable {
     public void run() {
       DataBuffer currentBuffer = initialEmptyBuffer;
       try {
         while (currentBuffer != null) {
           addToBuffer(currentBuffer);
           if (currentBuffer.isFull())
             currentBuffer = exchanger.exchange(currentBuffer);
         }
       } catch (InterruptedException ex) { ... handle ... }
     }
   }

   class EmptyingLoop implements Runnable {
     public void run() {
       DataBuffer currentBuffer = initialFullBuffer;
       try {
         while (currentBuffer != null) {
           takeFromBuffer(currentBuffer);
           if (currentBuffer.isEmpty())
             currentBuffer = exchanger.exchange(currentBuffer);
         }
       } catch (InterruptedException ex) { ... handle ...}
     }
   }

   void start() {
     new Thread(new FillingLoop()).start();
     new Thread(new EmptyingLoop()).start();
   }
 }
Overview: The core algorithm is, for an exchange "slot",
and a participant (caller) with an item:

核心算法如下:

for (;;) {
  if (slot is empty) {                       // offer
    place item in a Node;
    if (can CAS slot from empty to node) {
      wait for release;
      return matching item in node;
    }
  }
  else if (can CAS slot from node to empty) { // release
    get the item in node;
    set matching item in node;
    release waiting thread;
  }
  // else retry on CAS failure
}
This is among the simplest forms of a "dual data structure" --
see Scott and Scherer's DISC 04 paper and
http://www.cs.rochester.edu/research/synchronization/pseudocode/duals.html

This works great in principle. But in practice, like many
algorithms centered on atomic updates to a single location, it
scales horribly when there are more than a few participants
using the same Exchanger. So the implementation instead uses a
form of elimination arena, that spreads out this contention by
arranging that some threads typically use different slots,
while still ensuring that eventually, any two parties will be
able to exchange items. That is, we cannot completely partition
across threads, but instead give threads arena indices that
will on average grow under contention and shrink under lack of
contention. We approach this by defining the Nodes that we need
anyway as ThreadLocals, and include in them per-thread index
and related bookkeeping state. (We can safely reuse per-thread
nodes rather than creating them fresh each time because slots
alternate between pointing to a node vs null, so cannot
encounter ABA problems. However, we do need some care in
resetting them between uses.)

这是“dual data structure”中最简单的形式 - 参见Scott和Scherer的DISC 04论文。

这在原则上很有效。但实际上,就像许多以单个位置的原子更新为中心的算法一样,当有多个参与者使用相同的Exchanger时,它会出现可怕的扩展。因此,实现使用的是elimination arena形式,通过安排一些线程使用不同的slots来分散争用,并且仍然保证最终任何两方能够交换数据。也即,不能完全跨线程进行分区,而是给线程提供areana索引,该索引在竞争情况下增长在缺少争用时收缩。通过定义需要的Nodes作为ThreadLocals来实现,并将线程索引和相关簿记状态包含在里面。(可以安全地重用线程结点,而不是每次创建它们,因为slots在指向结点与null之间交替,因此不会遇到ABA问题。但需,在使用之间重置的时候需要小心。)

Implementing an effective arena requires allocating a bunch of
space, so we only do so upon detecting contention (except on
uniprocessors, where they wouldn't help, so aren't used).
Otherwise, exchanges use the single-slot slotExchange method.
On contention, not only must the slots be in different
locations, but the locations must not encounter memory
contention due to being on the same cache line (or more
generally, the same coherence unit).  Because, as of this
writing, there is no way to determine cacheline size, we define
a value that is enough for common platforms.  Additionally,
extra care elsewhere is taken to avoid other false/unintended
sharing and to enhance locality, including adding padding (via
sun.misc.Contended) to Nodes, embedding "bound" as an Exchanger
field, and reworking some park/unpark mechanics compared to
LockSupport versions.

The arena starts out with only one used slot. We expand the
effective arena size by tracking collisions; i.e., failed CASes
while trying to exchange. By nature of the above algorithm, the
only kinds of collision that reliably indicate contention are
when two attempted releases collide -- one of two attempted
offers can legitimately fail to CAS without indicating
contention by more than one other thread. (Note: it is possible
but not worthwhile to more precisely detect contention by
reading slot values after CAS failures.)  When a thread has
collided at each slot within the current arena bound, it tries
to expand the arena size by one. We track collisions within
bounds by using a version (sequence) number on the "bound"
field, and conservatively reset collision counts when a
participant notices that bound has been updated (in either
direction).

实现一个有效的arena需要分配一堆空间,因此只有在检测到争用时才会这样做。否则,交换只使用单槽的slotExchange方法。在争用时,不仅槽必须位于不同的位置,并且这些位置不能遇到内存争用(由于在相同的高速缓存行,或者更一般的,具有相同的关联单元)。因为没有方法能够检测到缓存行的大小,所以定义一个足以满足常见平台的值。另外,需要额外关注以避免伪共享和增强局部性,包括给Nodes添加填充(通过sun.misc.Contended),在Exchanger中嵌入bound域,并相对于LockSupport版本修改一些park/unpark机制。

arena开始只有一个槽。通过跟踪碰撞(CAS交换时失败)来扩展大小。根据上述算法性质,唯一能够可靠的显式争用的冲突是当两个尝试的releases冲突时——两个尝试的offers可以合法地CAS失败并且不会指示争用。(注意:通过在CAS失败后读取slot值来更准确地检测争用时可能的但是不值得这么做。)但线程在当前arena界限内发生碰撞时,会将arena大小增1。通过在bound域上使用版本号来跟踪边界内的碰撞,并在参与者注意到bound已经更新时保守地重置碰撞计数。

The effective arena size is reduced (when there is more than
one slot) by giving up on waiting after a while and trying to
decrement the arena size on expiration. The value of "a while"
is an empirical matter.  We implement by piggybacking on the
use of spin->yield->block that is essential for reasonable
waiting performance anyway -- in a busy exchanger, offers are
usually almost immediately released, in which case context
switching on multiprocessors is extremely slow/wasteful.  Arena
waits just omit the blocking part, and instead cancel. The spin
count is empirically chosen to be a value that avoids blocking
99% of the time under maximum sustained exchange rates on a
range of test machines. Spins and yields entail some limited
randomness (using a cheap xorshift) to avoid regular patterns
that can induce unproductive grow/shrink cycles. (Using a
pseudorandom also helps regularize spin cycle duration by
making branches unpredictable.)  Also, during an offer, a
waiter can "know" that it will be released when its slot has
changed, but cannot yet proceed until match is set.  In the
mean time it cannot cancel the offer, so instead spins/yields.
Note: It is possible to avoid this secondary check by changing
the linearization point to be a CAS of the match field (as done
in one case in the Scott & Scherer DISC paper), which also
increases asynchrony a bit, at the expense of poorer collision
detection and inability to always reuse per-thread nodes. So
the current scheme is typically a better tradeoff.

通过在一段时间后放弃等待并尝试在到期时减小arena大小来减少有效的arena大小。“一段时间”值是一个经验问题。通过捎带使用spin-> yield-> block来实现,这对于合理的等待性能至关重要——在繁忙的exchanger中,offers总是立即释放,这种情况下,多处理器上的上下文切花极其缓慢和浪费。arena等待指示省略了blocking部分,改用取消。根据经验选择自旋的次数,在一系列测试机器上以最大持续交换速率的99%时间避免阻塞。自旋和yields需要一些有限的随机性(使用简易的xorshift)以避免可能导致无效的增长、收缩循环的规则模式。(也可以使用伪随机数)。此外,在offer期间,当槽改变时等待者可以知道其将会被释放,但是直到匹配设置前还不能继续。在此期间,不能取消offer,所以只能自旋或者yields。

注意:可以通过将线性化点更改为匹配域的CAS(在Scott & Scherer DISC论文中的一个案例中所做的那样)来避免这种二次检查,这也会增加异步性。但代价是较弱的冲突检测和无法重用线程结点。因此,目前的方案通常是更好的权衡。

On collisions, indices traverse the arena cyclically in reverse
order, restarting at the maximum index (which will tend to be
sparsest) when bounds change. (On expirations, indices instead
are halved until reaching 0.) It is possible (and has been
tried) to use randomized, prime-value-stepped, or double-hash
style traversal instead of simple cyclic traversal to reduce
bunching.  But empirically, whatever benefits these may have
don't overcome their added overhead: We are managing operations
that occur very quickly unless there is sustained contention,
so simpler/faster control policies work better than more
accurate but slower ones.

Because we use expiration for arena size control, we cannot
throw TimeoutExceptions in the timed version of the public
exchange method until the arena size has shrunken to zero (or
the arena isn't enabled). This may delay response to timeout
but is still within spec.

Essentially all of the implementation is in methods
slotExchange and arenaExchange. These have similar overall
structure, but differ in too many details to combine. The
slotExchange method uses the single Exchanger field "slot"
rather than arena array elements. However, it still needs
minimal collision detection to trigger arena construction.
(The messiest part is making sure interrupt status and
InterruptedExceptions come out right during transitions when
both methods may be called. This is done by using null return
as a sentinel to recheck interrupt status.)

碰撞时,索引会以相反的顺序遍历arena,当边界改变时在最大索引处重新开始(这将倾向于最稀疏处)。(在到期时,索引会减半直到减到。)可以使用随机化、素数值步进或双重哈希样式遍历而不是简单的循环遍历来减少聚集。但是凭经验,这些可能带来好处的方式都无法克服其增加的开销:因此更简单、更快速的控制策略比更准确但更慢的控制策略更好。

因为对arena大小控制使用过期控制,所以不能在定时版本的公共exchange方法中抛出TimeoutExceptions,直到其大小缩减为。这可能会延迟对超时的响应,但仍然在规范说明内。

基本上所有的实现都在slotExchange和arenaExchange方法中。它们具有相似的整体结构,但因为太多细节不同而不能合并。slotExchange方法使用单个Exchanger域slot而不是arena数组元素。但是其仍然需要最小的碰撞检测来触发arena构造。(最混乱的部分是在转换期间两种方法都可能调用时确保中断状态和InterruptedExceptions正确。这是通过使用null return作为重新检查中断状态的标记来完成的。)

As is too common in this sort of code, methods are monolithic
because most of the logic relies on reads of fields that are
maintained as local variables so can't be nicely factored --
mainly, here, bulky spin->yield->block/cancel code), and
heavily dependent on intrinsics (Unsafe) to use inlined
embedded CAS and related memory access operations (that tend
not to be as readily inlined by dynamic compilers when they are
hidden behind other methods that would more nicely name and
encapsulate the intended effects). This includes the use of
putOrderedX to clear fields of the per-thread Nodes between
uses. Note that field Node.item is not declared as volatile
even though it is read by releasing threads, because they only
do so after CAS operations that must precede access, and all
uses by the owning thread are otherwise acceptably ordered by
other operations. (Because the actual points of atomicity are
slot CASes, it would also be legal for the write to Node.match
in a release to be weaker than a full volatile write. However,
this is not done because it could allow further postponement of
the write, delaying progress.)

方法庞大而单一的方法在这种代码中很常见,因为大多数逻辑依赖于对局部变量维护的字段的读取,因此不能很好地进行组织——这里主要是庞大的spin->yield->block/cancel代码。

注意Node.item域没有被声明为volatile,即使其被释放线程读取,因为只会在CAS操作才会读取。

3.构造器

    public Exchanger() {
        participant = new Participant();
    }
    /**
     * Per-thread state
     */
    private final Participant participant;

    /**
     * Elimination array; null until enabled (within slotExchange).
     * Element accesses use emulation of volatile gets and CAS.
     */
    private volatile Node[] arena;

    /**
     * Slot used until contention detected.
     */
    private volatile Node slot;

    /**
     * The index of the largest valid arena position, OR'ed with SEQ
     * number in high bits, incremented on each update.  The initial
     * update from 0 to SEQ is used to ensure that the arena array is
     * constructed only once.
     */
    private volatile int bound;
    /** The corresponding thread local class */
    static final class Participant extends ThreadLocal<Node> {
        public Node initialValue() { return new Node(); }
    }

    /**
     * Nodes hold partially exchanged data, plus other per-thread
     * bookkeeping. Padded via @sun.misc.Contended to reduce memory
     * contention.
     */
    @sun.misc.Contended static final class Node {
        int index;              // Arena index
        int bound;              // Last recorded value of Exchanger.bound
        int collides;           // Number of CAS failures at current bound
        int hash;               // Pseudo-random for spins
        Object item;            // This thread's current item
        volatile Object match;  // Item provided by releasing thread
        volatile Thread parked; // Set to this thread when parked, else null
    }

4.exchange

等待另一个线程到达该交换点,然后将指定的对象交给对方线程,接受对方线程的对象并返回。

如果对方线程已到达则恢复并接受当前线程的对象,当前线程接受对方线程的对象并立即返回。

如果没有线程在等待则当前线程会休眠直到下面的事情发生:

    public V exchange(V x) throws InterruptedException {
        Object v;
        Object item = (x == null) ? NULL_ITEM : x; // translate null args
        if ((arena != null ||
             (v = slotExchange(item, false, 0L)) == null) &&
            ((Thread.interrupted() || // disambiguates null return
              (v = arenaExchange(item, false, 0L)) == null)))
            throw new InterruptedException();
        return (v == NULL_ITEM) ? null : (V)v;
    }

4.1 slotExchange

    /**
     * Exchange function used until arenas enabled. See above for explanation.
     *
     * @param item the item to exchange
     * @param timed true if the wait is timed
     * @param ns if timed, the maximum wait time, else 0L
     * @return the other thread's item; or null if either the arena
     * was enabled or the thread was interrupted before completion; or
     * TIMED_OUT if timed and timed out
     */
    private final Object slotExchange(Object item, boolean timed, long ns) {
        Node p = participant.get();
        Thread t = Thread.currentThread();
        if (t.isInterrupted()) // preserve interrupt status so caller can recheck
            return null;

        for (Node q;;) {
            if ((q = slot) != null) {
                if (U.compareAndSwapObject(this, SLOT, q, null)) {
                    Object v = q.item;
                    q.match = item;
                    Thread w = q.parked;
                    if (w != null)
                        U.unpark(w);
                    return v;
                }
                // create arena on contention, but continue until slot null
                if (NCPU > 1 && bound == 0 &&
                    U.compareAndSwapInt(this, BOUND, 0, SEQ))
                    arena = new Node[(FULL + 2) << ASHIFT];
            }
            else if (arena != null)
                return null; // caller must reroute to arenaExchange
            else {
                p.item = item;
                if (U.compareAndSwapObject(this, SLOT, null, p))
                    break;
                p.item = null;
            }
        }

        // await release
        int h = p.hash;
        long end = timed ? System.nanoTime() + ns : 0L;
        int spins = (NCPU > 1) ? SPINS : 1;
        Object v;
        while ((v = p.match) == null) {
            if (spins > 0) {
                h ^= h << 1; h ^= h >>> 3; h ^= h << 10;
                if (h == 0)
                    h = SPINS | (int)t.getId();
                else if (h < 0 && (--spins & ((SPINS >>> 1) - 1)) == 0)
                    Thread.yield();
            }
            else if (slot != p)
                spins = SPINS;
            else if (!t.isInterrupted() && arena == null &&
                     (!timed || (ns = end - System.nanoTime()) > 0L)) {
                U.putObject(t, BLOCKER, this);
                p.parked = t;
                if (slot == p)
                    U.park(false, ns);
                p.parked = null;
                U.putObject(t, BLOCKER, null);
            }
            else if (U.compareAndSwapObject(this, SLOT, p, null)) {
                v = timed && ns <= 0L && !t.isInterrupted() ? TIMED_OUT : null;
                break;
            }
        }
        U.putOrderedObject(p, MATCH, null);
        p.item = null;
        p.hash = h;
        return v;
    }

对于首先到达交换点的线程:

注意participant是线程局部变量,存储的是Node,每个线程都会初始化自己的Node。

对于后到达交换点的线程:

4.2 arenaExchange

    /**
     * Exchange function when arenas enabled. See above for explanation.
     *
     * @param item the (non-null) item to exchange
     * @param timed true if the wait is timed
     * @param ns if timed, the maximum wait time, else 0L
     * @return the other thread's item; or null if interrupted; or
     * TIMED_OUT if timed and timed out
     */
    private final Object arenaExchange(Object item, boolean timed, long ns) {
        Node[] a = arena;
        int alen = a.length;
        Node p = participant.get();
        for (int i = p.index;;) {                      // access slot at i
            int b, m, c;
            int j = (i << ASHIFT) + ((1 << ASHIFT) - 1);
            if (j < 0 || j >= alen)
                j = alen - 1;
            Node q = (Node)AA.getAcquire(a, j);
            if (q != null && AA.compareAndSet(a, j, q, null)) {
                Object v = q.item;                     // release
                q.match = item;
                Thread w = q.parked;
                if (w != null)
                    LockSupport.unpark(w);
                return v;
            }
            else if (i <= (m = (b = bound) & MMASK) && q == null) {
                p.item = item;                         // offer
                if (AA.compareAndSet(a, j, null, p)) {
                    long end = (timed && m == 0) ? System.nanoTime() + ns : 0L;
                    Thread t = Thread.currentThread(); // wait
                    for (int h = p.hash, spins = SPINS;;) {
                        Object v = p.match;
                        if (v != null) {
                            MATCH.setRelease(p, null);
                            p.item = null;             // clear for next use
                            p.hash = h;
                            return v;
                        }
                        else if (spins > 0) {
                            h ^= h << 1; h ^= h >>> 3; h ^= h << 10; // xorshift
                            if (h == 0)                // initialize hash
                                h = SPINS | (int)t.getId();
                            else if (h < 0 &&          // approx 50% true
                                     (--spins & ((SPINS >>> 1) - 1)) == 0)
                                Thread.yield();        // two yields per wait
                        }
                        else if (AA.getAcquire(a, j) != p)
                            spins = SPINS;       // releaser hasn't set match yet
                        else if (!t.isInterrupted() && m == 0 &&
                                 (!timed ||
                                  (ns = end - System.nanoTime()) > 0L)) {
                            p.parked = t;              // minimize window
                            if (AA.getAcquire(a, j) == p) {
                                if (ns == 0L)
                                    LockSupport.park(this);
                                else
                                    LockSupport.parkNanos(this, ns);
                            }
                            p.parked = null;
                        }
                        else if (AA.getAcquire(a, j) == p &&
                                 AA.compareAndSet(a, j, p, null)) {
                            if (m != 0)                // try to shrink
                                BOUND.compareAndSet(this, b, b + SEQ - 1);
                            p.item = null;
                            p.hash = h;
                            i = p.index >>>= 1;        // descend
                            if (Thread.interrupted())
                                return null;
                            if (timed && m == 0 && ns <= 0L)
                                return TIMED_OUT;
                            break;                     // expired; restart
                        }
                    }
                }
                else
                    p.item = null;                     // clear offer
            }
            else {
                if (p.bound != b) {                    // stale; reset
                    p.bound = b;
                    p.collides = 0;
                    i = (i != m || m == 0) ? m : m - 1;
                }
                else if ((c = p.collides) < m || m == FULL ||
                         !BOUND.compareAndSet(this, b, b + SEQ + 1)) {
                    p.collides = c + 1;
                    i = (i == 0) ? m : i - 1;          // cyclically traverse
                }
                else
                    i = m + 1;                         // grow
                p.index = i;
            }
        }
    }

与slotExchange不同的地方是会通过一个数组允许超过两个的线程相互交换数据。

1)xorshift实现的伪随机数:
实际运行中,大约会有50%几率(arenaExchange注释中有提到)得到负数。又由于代码中在随机数是负数时减少spins,所以默认的1024次spins实际会被执行大约2048次,换句话说for循环大约执行2048次后,spins变成0,而不是1024次。同时,代码中会判断spins和 (SPINS >>> 1) – 1的位操作结果来执行yield,注释提到会执行两次。为什么是两次?因为这两次分别是spins为1 << 9和0的时候,只有这两次时位操作结果才是0。

2)对于false sharing的处理:
1.8引入的Contended标注,在类上设置这个标注之后,类的前后会有128字节(是的,字节)的padding。假如你在类上设置了这个标注,那么这个类构成的数组是否也解决了false sharing问题?答案是否定的。这涉及到Java对象的内存布局。Java的数组首先有一个数组头,然后是每个对象object的指针,这个指针在Java 1.8默认启动指针压缩的情况下是4个字节。所以你对数组元素进行CAS的话,还是会碰到false sharing问题,即便你在对象上设置了Contended。记住,Contended只能保证连续的对象内容之间不会有false sharing问题,但是不能处理连续对象指针之间的问题。所以Exchanger中是这样初始化数组的

arena = new Node[(FULL + 2) << ASHIFT];

3)Exchanger中bound这个变量:
内容是一个复合变量。低16位是arena的逻辑数组长度,16位之前是版本号。为什么需要一个版本号?原因在于arenaExchange在运行过程中会动态修改逻辑数组长度,而且这个长度会增长也会缩小的,所以单纯对长度进行CAS的话会碰到ABA问题。ABA问题最简单的解决方案是加版本号,所以这里是一个复合变量,带版本号。

4)Exchanger基于数组的扩大和缩小策略:
Exchanger选择使用threadlocal和数组而不是原来dual data structure的链表。用链表的话,就无法用threadlocal,因为用于交换的节点会被其他线程访问和修改,主要是next,一方面生成周期不可控,另外一方可能会有ABA问题。但是用链表就不需要现在Exchanger里这么复杂的扩大和缩小策略,因为新来的线程在失败时会创建新节点,然后等待其他线程来访。Exchanger在讲到等待match时候优先选择用threadlocal,这是一个tradeoff。扩大和缩小的策略如下:

参考

上一篇下一篇

猜你喜欢

热点阅读