程序员Java 并发

【Java 并发笔记】Exchanger 相关整理

2019-01-24  本文已影响4人  58bc06151329

文前说明

作为码农中的一员,需要不断的学习,我工作之余将一些分析总结和学习笔记写成博客与大家一起交流,也希望采用这种方式记录自己的学习之旅。

本文仅供学习交流使用,侵权必删。
不用于商业目的,转载请注明出处。

1. 简介

2. Exchanger 的原理

public V exchange(V x) throws InterruptedException
public V exchange(V x, long timeout, TimeUnit unit) throws InterruptedException, TimeoutException

数据结构

@sun.misc.Contended 
static final class Node {
    int index;              // arena的下标,多个槽位的时候利用
    int bound;              // 上一次记录的Exchanger.bound;
    int collides;           // 在当前bound下CAS失败的次数;
    int hash;               // 用于自旋;
    Object item;            // 这个线程的当前项,也就是需要交换的数据;
    volatile Object match;  // 交换的数据
    volatile Thread parked; // 线程
}
/**
 * Value representing null arguments/returns from public
 * methods. Needed because the API originally didn't disallow null
 * arguments, which it should have.
 * 如果交换的数据为 null,则用NULL_ITEM  代替
 */
private static final Object NULL_ITEM = new Object();

2.1 单槽 Exchanger

/** The number of CPUs, for sizing and spin control */
private static final int NCPU = Runtime.getRuntime().availableProcessors();
/**
 * The bound for spins while waiting for a match. The actual
 * number of iterations will on average be about twice this value
 * due to randomization. Note: Spinning is disabled when NCPU==1.
 */
private static final int SPINS = 1 << 10; // 自旋次数
/**
 * Slot used until contention detected.
 */
private volatile Node slot; // 用于交换数据的槽位
/**
 * Per-thread state  每个线程的数据,ThreadLocal 子类
 */
private final Participant participant;

/** The corresponding thread local class */
 static final class Participant extends ThreadLocal<Node> {
     // 初始值返回Node
     public Node initialValue() { return new Node(); }
 }

exchange 方法

没有设定超时时间的 exchange 方法

public V exchange(V x) throws InterruptedException {
        Object v;
        Object item = (x == null) ? NULL_ITEM : x; // 转换成空对象
        // arena == null, 路由到slotExchange(单槽交换), 如果arena != null或者单槽交换失败,且线程没有被中断,则路由到arenaExchange(多槽交换),返回null,则抛出中断异常
        if ((arena != null || (v = slotExchange(item, false, 0L)) == null)
                && ((Thread.interrupted() || (v = arenaExchange(item, false, 0L)) == null)))
            throw new InterruptedException();
        return (v == NULL_ITEM) ? null : (V) v;
}

具有超时功能的 exchange 方法

public V exchange(V x, long timeout, TimeUnit unit) throws InterruptedException, TimeoutException {
        Object v;
        Object item = (x == null) ? NULL_ITEM : x;// 转换成空对象
        long ns = unit.toNanos(timeout);
        // arena == null, 路由到slotExchange(单槽交换), 如果arena != null或者单槽交换失败,且线程没有被中断,则路由到arenaExchange(多槽交换),返回null,则抛出中断异常
        if ((arena != null || (v = slotExchange(item, true, ns)) == null)
                && ((Thread.interrupted() || (v = arenaExchange(item, true, ns)) == null)))
            throw new InterruptedException();
        if (v == TIMED_OUT)// 超时
            throw new TimeoutException();
        return (v == NULL_ITEM) ? null : (V) v;
}

slotExchange 方法

private final Object slotExchange(Object item, boolean timed, long ns) {
        Node p = participant.get(); // 获取当前线程携带的Node
        Thread t = Thread.currentThread(); // 当前线程
        if (t.isInterrupted()) // 保留中断状态,以便调用者可以重新检查,Thread.interrupted() 会清除中断状态标记
            return null;
        for (Node q;;) {
            if ((q = slot) != null) { // slot不为null, 说明已经有线程在这里等待了
                if (U.compareAndSwapObject(this, SLOT, q, null)) { // 将slot重新设置为null, CAS操作
                    Object v = q.item; // 取出等待线程携带的数据
                    q.match = item; // 将当前线程的携带的数据交给等待线程
                    Thread w = q.parked; // 可能存在的等待线程(可能中断,不等了)
                    if (w != null)
                        U.unpark(w); // 唤醒等待线程
                    return v; // 返回结果,交易成功
                }
                // CPU的个数多于1个,并且bound为0时创建 arena,并将bound设置为SEQ大小
                if (NCPU > 1 && bound == 0 && U.compareAndSwapInt(this, BOUND, 0, SEQ))
                    arena = new Node[(FULL + 2) << ASHIFT]; // 根据CPU的个数估计Node的数量
            } else if (arena != null)
                return null; // 如果slot为null, 但arena不为null, 则转而路由到arenaExchange方法
            else { // 最后一种情况,说明当前线程先到,则占用此slot
                p.item = item; // 将携带的数据卸下,等待别的线程来交易
                if (U.compareAndSwapObject(this, SLOT, null, p)) // 将slot的设为当前线程携带的Node
                    break; // 成功则跳出循环
                p.item = null; // 失败,将数据清除,继续循环
            }
        }
        // 当前线程等待被释放, spin -> yield -> block/cancel
        int h = p.hash; // 伪随机,用于自旋
        long end = timed ? System.nanoTime() + ns : 0L; // 如果timed为true,等待超时的时间点; 0表示没有设置超时
        int spins = (NCPU > 1) ? SPINS : 1; // 自旋次数
        Object v;
        while ((v = p.match) == null) { // 一直循环,直到有线程来交易
            if (spins > 0) { // 自旋,直至spins不大于0
                h ^= h << 1; // 伪随机算法, 目的是等h小于0(随机的)
                h ^= h >>> 3;
                h ^= h << 10;
                if (h == 0) // 初始值
                    h = SPINS | (int) t.getId();
                else if (h < 0 && (--spins & ((SPINS >>> 1) - 1)) == 0)
                    Thread.yield(); // 等到h < 0, 而spins的低9位也为0(防止spins过大,CPU空转过久),让出CPU时间片,每一次等待有两次让出CPU的时机(SPINS >>> 1)
            } else if (slot != p) // 别的线程已经到来,正在准备数据,自旋等待一会儿,马上就好
                spins = SPINS;
            // 如果线程没被中断,且arena还没被创建,并且没有超时
            else if (!t.isInterrupted() && arena == null && (!timed || (ns = end - System.nanoTime()) > 0L)) {
                U.putObject(t, BLOCKER, this); // 设置当前线程将阻塞在当前对象上
                p.parked = t; // 挂在此结点上的阻塞着的线程
                if (slot == p)
                    U.park(false, ns); // 阻塞, 等着被唤醒或中断
                p.parked = null; // 醒来后,解除与结点的联系
                U.putObject(t, BLOCKER, null); // 解除阻塞对象
            } else if (U.compareAndSwapObject(this, SLOT, p, null)) { // 超时或其它(取消),给其它线程腾出slot
                v = timed && ns <= 0L && !t.isInterrupted() ? TIMED_OUT : null;
                break;
            }
        }
        // 归位
        U.putOrderedObject(p, MATCH, null);
        p.item = null;
        p.hash = h;
        return v;
}
slotExchange 流程图

2.2 多槽 Exchanger

private static final int ASHIFT = 7; // 两个有效槽(slot -> Node)之间的字节地址长度(内存地址,以字节为单位),1 << 7至少为缓存行的大小,防止伪共享 
private static final int MMASK = 0xff; // 场地(一排槽,arena -> Node[])的可支持的最大索引,可分配的大小为 MMASK + 1
private static final int SEQ = MMASK + 1; // bound的递增单元,确立其唯一性
private static final int NCPU = Runtime.getRuntime().availableProcessors(); // CPU的个数,用于场地大小和自旋控制
static final int FULL = (NCPU >= (MMASK << 1)) ? MMASK : NCPU >>> 1; // 最大的arena索引
private static final int SPINS = 1 << 10; // 自旋次数,NCPU = 1时,禁用
private static final Object NULL_ITEM = new Object();// 空对象,对应null
private static final Object TIMED_OUT = new Object();// 超时对象,对应timeout
// 多个线程交换/多槽位
private volatile Node[] arena;

arenaExchange 方法

private final Object arenaExchange(Object item, boolean timed, long ns) {
    Node[] a = arena; // 交换场地,一排slot
    Node p = participant.get(); // 获取当前线程携带的Node   p.index 初始值为 0
    for (int i = p.index;;) { // arena的索引,数组下标
        int b, m, c;
        long j; // 原数组偏移量,包括填充值
        // 从场地中选出偏移地址为(i << ASHIFT) + ABASE的内存值,也即真正可用的Node
        //如果i为0,j相当于是 "第一个"槽位
        Node q = (Node) U.getObjectVolatile(a, j = (i << ASHIFT) + ABASE);
        if (q != null && U.compareAndSwapObject(a, j, q, null)) { // 此槽位不为null, 说明已经有线程在这里等了,重新将其设置为null, CAS操作
            Object v = q.item; // 取出等待线程携带的数据
            q.match = item; // 将当前线程携带的数据交给等待线程
            Thread w = q.parked; // 可能存在的等待线程
            if (w != null)
                U.unpark(w); // 唤醒等待线程
            return v; // 返回结果, 交易成功
        } else if (i <= (m = (b = bound) & MMASK) && q == null) { // 有效交换位置,且槽位为空
            p.item = item; // 将携带的数据卸下,等待别的线程来交易
            if (U.compareAndSwapObject(a, j, null, p)) { // 槽位占领成功
                long end = (timed && m == 0) ? System.nanoTime() + ns : 0L; // 计算出超时结束时间点
                Thread t = Thread.currentThread(); // 当前线程
                for (int h = p.hash, spins = SPINS;;) { // 一直循环,直到有别的线程来交易,或超时,或中断
                    Object v = p.match; // 检查是否有别的线程来交换数据
                    if (v != null) { // 有则返回
                        U.putOrderedObject(p, MATCH, null); // match重置,等着下次使用
                        p.item = null; // 清空,下次接着使用
                        p.hash = h;
                        return v; // 返回结果,交易结束
                    } else if (spins > 0) { // 自旋
                        h ^= h << 1;
                        h ^= h >>> 3;
                        h ^= h << 10; // 移位加异或,伪随机
                        if (h == 0) // 初始值
                            h = SPINS | (int) t.getId();
                        else if (h < 0 && // SPINS >>> 1, 一半的概率
                                (--spins & ((SPINS >>> 1) - 1)) == 0)
                            Thread.yield(); // 每一次等待有两次让出CPU的时机
                    } else if (U.getObjectVolatile(a, j) != p)
                        spins = SPINS; // 别的线程已经到来,正在准备数据,自旋等待一会儿,马上就好
                    else if (!t.isInterrupted() && m == 0 && (!timed || (ns = end - System.nanoTime()) > 0L)) {
                        U.putObject(t, BLOCKER, this); // 设置当前线程将阻塞在当前对象上
                        p.parked = t; // 挂在此结点上的阻塞着的线程
                        if (U.getObjectVolatile(a, j) == p)
                            U.park(false, ns); // 阻塞, 等着被唤醒或中断
                        p.parked = null; // 醒来后,解除与结点的联系
                        U.putObject(t, BLOCKER, null); // 解除阻塞对象
                    } else if (U.getObjectVolatile(a, j) == p && U.compareAndSwapObject(a, j, p, null)) {
                        if (m != 0) // 尝试缩减
                            U.compareAndSwapInt(this, BOUND, b, b + SEQ - 1); // 更新bound, 高位递增,低位 -1
                        p.item = null; // 重置
                        p.hash = h;
                        i = p.index >>>= 1; // 索引减半,为的是快速找到汇合点(最左侧)
                        if (Thread.interrupted())// 保留中断状态,以便调用者可以重新检查,Thread.interrupted() 会清除中断状态标记
                            return null;
                        if (timed && m == 0 && ns <= 0L) // 超时
                            return TIMED_OUT;
                        break; // 重新开始
                    }
                }
            } else
                p.item = null; // 重置
        } else {
            if (p.bound != b) { // 别的线程更改了bound,重置collides为0, i的情况如下:当i != m, 或者m = 0时,i = m; 否则,i = m-1; 从右往左遍历
                p.bound = b;
                p.collides = 0;
                i = (i != m || m == 0) ? m : m - 1; // index 左移
            } else if ((c = p.collides) < m || m == FULL || !U.compareAndSwapInt(this, BOUND, b, b + SEQ + 1)) { // 更新bound, 高位递增,低位 +1
                p.collides = c + 1;
                i = (i == 0) ? m : i - 1; // 左移,遍历槽位,m == FULL时,i == 0(最左侧),重置i = m, 重新从右往左循环遍历
            } else
                i = m + 1; // 槽位增长
            p.index = i;
        }
    }
}
static final class Participant extends ThreadLocal<Node> {
    public Node initialValue() { return new Node(); }
}

伪随机

h ^= h << 1; 
h ^= h >>> 3; 
h ^= h << 10;

为什么选用 1,3,10

[4294967295] (1, 3, 10) (2, 7, 7) (2, 7, 9) (5, 9, 7) (7, 1, 9) (7, 7, 2) (7, 9, 5)

为什么要有两次左移和一次右移

自旋等待

private static final int SPINS = 1 << 10;
else if (h < 0 && (--spins & ((SPINS >>> 1) - 1)) == 0) // h < 0,一半的概率
       Thread.yield(); // 每一次等待有两次让出CPU的时机

arena 的创建

static final int FULL = (NCPU >= (MMASK << 1)) ? MMASK : NCPU >>> 1;
private static final int ASHIFT = 7;

private static final int NCPU = Runtime.getRuntime().availableProcessors();
private static final int MMASK = 0xff;      // 255
......
if (NCPU > 1 && bound == 0 &&U.compareAndSwapInt(this, BOUND, 0, SEQ))
      arena = new Node[(FULL + 2) << ASHIFT];
private static final sun.misc.Unsafe U;
private static final int ABASE;
U = sun.misc.Unsafe.getUnsafe();
Class<?> ak = Node[].class;
s = U.arrayIndexScale(ak);
ABASE = U.arrayBaseOffset(ak) + (1 << ASHIFT);

FULL 和 ASHIFT 的定义

arena 数组结构
Node q = (Node)U.getObjectVolatile(a, j = (i << ASHIFT) + ABASE);
 if (q != null && U.compareAndSwapObject(a, j, q, null)) {
        Object v = q.item;   // 获取槽位中结点 q 的数据
        q.match = item;      // 把当前线程的数据交换给它
        Thread w = q.parked; // 获得槽位中结点 q 对应的线程对象
        if (w != null)     
            U.unpark(w);     //唤醒该线程
        return v;
}

bound 和 collides

private static final int MMASK = 0xff;
private static final int SEQ = MMASK + 1;
......
// MASK: 00000000000000000000000011111111
//  SEQ: 00000000000000000000000100000000(MASK + 1)
//    1: 00000000000000000000000000000001
if (NCPU > 1 && bound == 0 && U.compareAndSwapInt(this, BOUND, 0, SEQ))
// 当 bound 为 0 时,bound 被更新为 SEQ

//第一次更新
//b0: 00000000000000000000000100000000
U.compareAndSwapInt(this, BOUND, b, b + SEQ + 1)
//SEQ+1: 00000000000000000000000100000001
//b0+SEQ+1=b1: 00000000000000000000000200000001

//第二次更新
//b1+SEQ: 00000000000000000000000300000001
//第二次是 -1 的情况
U.compareAndSwapInt(this, BOUND, b, b + SEQ - 1)
//b1+SEQ-1=b2: 00000000000000000000000300000000
bound 操作

3. 总结

参考资料

https://blog.csdn.net/carson0408/article/details/79477280
https://blog.csdn.net/u014634338/article/details/78385521
https://blog.csdn.net/chenssy/article/details/72550933
https://www.cnblogs.com/d-homme/p/9387948.html
https://www.cnblogs.com/aniao/p/aniao_exchanger.html

上一篇下一篇

猜你喜欢

热点阅读