多线程高并发开发JVM乱七八糟风暴

多线程知识梳理(1) - 并发编程的艺术笔记

2017-02-20  本文已影响587人  泽毛

第三章 Java内存模型

3.1 Java内存模型的基础

Java中,所有实例域、静态域和数组元素都存储在堆内存中,堆内存在线程之间共享;局部变量、方法定义参数和异常处理器参数不会在线程之间共享。
从抽象角度来看,JMM定义了线程和主内存之间的抽象关系:线程之间的共享变量存储在主内存中,每个线程都有一个私有的本地内存,本地内存涵盖了缓存、写缓冲区、寄存器以及其它的硬件和编译器优化。
JMM通过控制主内存与每个线程的本地内存之间的交互,来为Java程序员提供内存可见性保证

重排序

指编译器和处理器为了优化程序性能而对指令序列进行重新排序的一种手段:

JMM的编译器重新排序规则会禁止特定类型的编译器重排序,对于处理器重排序,JMM的处理器重排序规则会要求Java编译器在生成指令时,插入特定类型的内存屏障。
现代的处理器使用写缓冲区临时保存向内存写入的数据,但每个处理器上的写缓冲区,仅仅对它所在的处理器可见
由于写缓冲区仅对自己的处理器可见,它会导致处理器执行内存操作的顺序可能会与内存实际的操作执行顺序不一致,由于现代的处理器都会使用写缓冲区,因此现代的处理器都会允许对写-读操作进行重排序,但不允许对存在数据依赖的操作做重排序。

happens-before简介

用来阐述操作之间的内存可见性,如果一个操作执行的结果需要对另一个操作可见,那么这两个操作必须要存在happens-before关系,这两个操作既可以在一个线程之内,也可以在不同线程之间,但并不等于前一个操作必须要在后一个操作之前执行

数据依赖性

编译器和处理器不会改变存在数据依赖关系的两个操作的执行顺序,但是仅针对单个处理器中执行的指令序列和单个线程中执行的操作。

as-if-serial

无论怎么重排序,单线程程序的执行结果不能改变。

在单线程中,对存在控制依赖的操作重排序,不会改变执行结果;但在多线程程序中,对存在控制依赖的操作重排序,可能会改变程序的执行结果。

顺序一致性

顺序一致性是一个理论参考模型,在设计的时候,处理器的内存模型和编程语言的内存模型都会以顺序一致性内存作为参照。
如果程序是正确同步的,程序的执行将具有顺序一致性:即程序的执行结果与该程序在顺序一致性内存模型中的执行结果相同。
如果程序是正确同步的,程序的执行将具有顺序一致性:即程序的执行结果该程序在顺序一致性内存模型中的执行结果相同。
顺序一致模型有两大特性:

对于未同步或未正确同步的多线程程序,JMM只提供最小安全性:线程执行时读取到的值,要么是之前某个线程写入的值,要么是默认值。
JMM不保证未同步程序的执行结果与该程序在顺序一致性模型中的执行结果一致。
未同步程序在两个模型中的执行特征有如下差异:

第四章 Java并发编程基础

synchronized(对象) {
        while(条件不满足) {
            对象.wait();
        }
        对应的处理逻辑;
}
synchronized(对象) {
        改变条件;
        对象.notifyAll();
}

第五章 Java中的锁

5.1 Lock接口

5.2 队列同步器

5.2.1 队列同步器接口

5.2.2 队列同步器的实现分析

5.2.2.1 同步队列

5.2.2.2 独占式同步状态获取与释放

    public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }

它的主要逻辑是:

   private Node addWaiter(Node mode) {
       Node node = new Node(Thread.currentThread(), mode);
       // Try the fast path of enq; backup to full enq on failure
       Node pred = tail;
       if (pred != null) {
           node.prev = pred;
           //1.确保节点能够线程安全地被添加
           if (compareAndSetTail(pred, node)) {
               pred.next = node;
               return node;
           }
       }
       //2.通过死循环来确保节点的正确添加,在"死循环"中只有通过`CAS`将节点设置为尾节点之后,当前线程才能从该方法返回,否则当前线程不断地进行尝试。
       enq(node);
       return node;
   }

   private Node enq(final Node node) {
       for (;;) {
           Node t = tail;
           if (t == null) { // Must initialize
               if (compareAndSetHead(new Node()))
                   tail = head;
           } else {
               node.prev = t;
               if (compareAndSetTail(t, node)) {
                   t.next = node;
                   return t;
               }
           }
       }
   }
    final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                //1.得到当前节点的前驱节点
                final Node p = node.predecessor();
                //2.如果当前节点的前驱节点是头节点,只有在这种情况下获取同步状态成功
                if (p == head && tryAcquire(arg)) {
                    //3.将当前节点设为头节点
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return interrupted;
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }
  public final boolean release(int arg) {
      if (tryRelease(arg)) {
          Node h = head;
          if (h != null && h.waitStatus != 0)
              unparkSuccessor(h);
          return true;
      }
      return false;
  }

总结:
1.在获取同步状态时,同步器维护一个同步队列,获取状态失败的线程都会被加入到队列中进行自旋;
2.移出队列(或停止自旋)的条件是前驱节点为头节点且成功获取了同步状态。
3.在释放同步状态时,同步器调用tryRelease(int arg)方法来释放同步状态,然后唤醒头节点的后继节点。

5.2.2.3 共享式同步状态获取与释放

    public final void acquireShared(int arg) {
        if (tryAcquireShared(arg) < 0)
            doAcquireShared(arg);
    }

    private void doAcquireShared(int arg) {
        final Node node = addWaiter(Node.SHARED);
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                final Node p = node.predecessor();
                if (p == head) {
                    int r = tryAcquireShared(arg);
                    if (r >= 0) {
                        setHeadAndPropagate(node, r);
                        p.next = null; // help GC
                        if (interrupted)
                            selfInterrupt();
                        failed = false;
                        return;
                    }
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

tryAcquireShared返回int类型,如果同步状态获取成功,那么返回值大于等于0,否则进入自旋状态;成功获取到同步状态并退出自旋状态的条件是当前节点的前驱节点为头节点,并且返回值大于等于0.

    public final boolean releaseShared(int arg) {
        if (tryReleaseShared(arg)) {
            doReleaseShared();
            return true;
        }
        return false;
    }

5.2.2.4 独占式超时获取同步状态

    private boolean doAcquireNanos(int arg, long nanosTimeout)
            throws InterruptedException {
        if (nanosTimeout <= 0L)
            return false;
        //1.计算出截止时间.
        final long deadline = System.nanoTime() + nanosTimeout;
       //2.加入节点
        final Node node = addWaiter(Node.EXCLUSIVE);
        boolean failed = true;
        try {
            for (;;) {
                //3.取出前驱节点
                final Node p = node.predecessor();
                //4.如果获取成功则直接返回
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return true;
                }
                nanosTimeout = deadline - System.nanoTime();
                //5.如果到了超时时间,则直接返回
                if (nanosTimeout <= 0L)
                    return false;
                if (shouldParkAfterFailedAcquire(p, node) &&
                    nanosTimeout > spinForTimeoutThreshold)
                    LockSupport.parkNanos(this, nanosTimeout);
                //6.如果在自旋过程中被中断,那么抛出异常返回
                if (Thread.interrupted())
                    throw new InterruptedException();
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

通过上面的代码可以知道,它和独占式获取的区别在于未获取到同步状态时的处理逻辑:独占式获取在获取不到是会一直自旋等待;而超时获取则会使当前线程等待nanosTimeout纳秒,如果当前线程在这个时间内没有获取到同步状态,将会从等待逻辑中自动返回。

5.2.2.5 自定义同步组件 - TwinsLock

TwinsLock只允许至多两个线程同时访问,超过两个线程的访问将会被阻塞。

public class TwinsLock implements Lock {
    
    private final Sync sync = new Sync(2);
    
    private static final class Sync extends AbstractQueuedSynchronizer {
        
        Sync(int count) {
            //初始值为2.
            setState(count);
        }

        @Override
        protected int tryAcquireShared(int arg) {
            for(;;) {
                //1.获得当前的状态.
                int current = getState();
                //2.newCount表示剩余可获取同步状态的线程数
                int newCount = current - arg;
                //3.如果小于0,那么返回获取同步状态失败;否则通过CAS确保设置的正确性.
                if (newCount < 0 || compareAndSetState(current, newCount)) {
                    //4.当返回值大于等于0表示获取同步状态成功.
                    return newCount;
                }
            }
        }

        @Override
        protected boolean tryReleaseShared(int arg) {
            for (;;) {
                int current = getState();
                //将可获取同步状态的线程数加1.
                int newCount = current + current;
                if (compareAndSetState(current, newCount)) {
                    return true;
                }
            }
        }
    }

    @Override
    public void lock() {
        sync.acquireShared(1);
    }

    @Override
    public void unlock() {
        sync.releaseShared(1);
    }

    @Override
    public boolean tryLock() {
        return false;
    }

    @Override
    public boolean tryLock(long time, @NonNull TimeUnit unit) throws InterruptedException {
        return false;
    }

    @Override
    public void lockInterruptibly() throws InterruptedException {
        
    }

    @NonNull
    @Override
    public Condition newCondition() {
        return null;
    }
}

测试用例:

    public static void createTwinsLock() {
        final Lock lock = new TwinsLock();
        class TwinsLockThread extends Thread {

            @Override
            public void run() {
                Log.d(TAG, "TwinsLockThread, run=" + Thread.currentThread().getName());
                while (true) {
                    lock.lock();
                    try {
                        Thread.sleep(1000);
                        Log.d(TAG, "TwinsLockThread, name=" + Thread.currentThread().getName());
                        Thread.sleep(1000);
                    } catch (Exception e) {
                        e.printStackTrace();
                    } finally {
                        Log.d(TAG, "TwinsLockThread, unlock=" + Thread.currentThread().getName());
                        lock.unlock();
                    }
                }
            }
        }
        for (int i = 0; i < 10; i++) {
            Thread thread = new TwinsLockThread();
            thread.start();
        }
    }

5.3 重入锁

5.3.1 实现重进入

重进入需要解决两个问题:

5.3.2 公平与非公平锁的区别

5.4 读写锁

ReadWriteLock仅定义了获取读锁和写锁的两个方法,即readLockwriteLock,而其实现ReentrantReadWriteLock

下面是一个读写锁的简单用例:

public class ReadWriteCache {
    
    static Map<String, Object> map = new HashMap<>();
    static ReentrantReadWriteLock rwl = new ReentrantReadWriteLock();
    static Lock r = rwl.readLock();
    static Lock w = rwl.writeLock();
    
    public static Object get(String key) {
        r.lock();
        try {
            return map.get(key);
        } finally {
            r.unlock();
        }
    }
    
    public static Object put(String key, Object value) {
        w.lock();
        try {
            return map.put(key, value);
        } finally {
            w.unlock();
        }
    }
    
    public static void clear() {
        w.lock();
        try {
            map.clear();
        } finally {
            w.unlock();
        }
    }
}

5.4.2 读写锁的实现分析

5.6 Condition接口

Condition定义了等待/通知两种类型的方法,当前线程调用这些方法时,需要提前获取到Condition对象关联的锁,Condition是依赖Lock对象的。
当调用await()方法后,当前线程会释放锁并在此等待,而其他线程调用Condition对象的signal方法,通知当前线程后,当前线程才从await方法返回,并且在返回前已经获取了锁。
获取一个Condition必须通过LocknewCondition方法,下面是一个有界队列的示例:

public class BoundedQueue<T> {

    private Object[] items;
    private int addIndex, removeIndex, count;
    private Lock lock = new ReentrantLock();
    private Condition notEmpty = lock.newCondition();
    private Condition notFull = lock.newCondition();

    public BoundedQueue(int size) {
        items = new Object[size];
    }

    public void add(T t) throws InterruptedException {
        lock.lock();
        try {
            while (count == items.length) { //如果当前队列内的个数等于最大长度,那么释放锁.
                notFull.await();
            }
            if (++addIndex == items.length) { //如果已经到了尾部,那么从头开始.
                addIndex = 0;
            }
            ++count;
            notEmpty.signal(); //通知阻塞在"空"条件上的线程.
        } finally {
            lock.unlock();
        }
    }

    public T remove() throws InterruptedException {
        lock.lock();
        try {
            while (count == 0) {
                notEmpty.await(); //如果当前队列的个数等于0,那么释放锁.
            }
            Object x = items[removeIndex];
            if (++removeIndex == items.length) {
                removeIndex = 0;
            }
            --count;
            notFull.signal(); //通知阻塞在"满"条件上的线程.
            return (T) x;
        } finally {
            lock.unlock();
        }
    }
}

Condition的方法:

5.6.2 Condition的实现

ConditionObjectAbstractQueuedSynchronizer的内部类,每个Condition对象都包含着一个队列。

1.等待队列

在队列中的每个节点都包含了一个线程的引用,该线程就是在Condition对象上等待的线程,同步队列和等待队列中节点的类型都是同步器的静态内部类AbstractQueuedSynchronizer.Node
由于Condition的实现是同步器的内部类,因此每个Condition实例都能够访问同步器提供的方法,相当于每个Condition都拥有所属同步器的引用。
当调用await方法时,将会以当前线程构造节点,并将节点从尾部加入到等待队列,也就是将同步队列移动到Condition队列当中。

2.等待

调用该方法的前提是当前线程必须获取了锁,也就是同步队列中的首节点,它不是直接加入到等待队列当中,而是通过addConditionWaiter()方法把当前线程构造成一个新的节点并将其加入到等待队列当中。

3.通知

调用该方法的前提是当前线程必须获取了锁,接着获取等待队列的首节点,将其移动到同步队列并使用LockSupport唤醒节点中的线程。
被唤醒的线程,将从await方法中的while中返回,进而调用同步器的acquireQueued方法加入到获取同步状态的竞争中。
ConditionsignalAll方法,相当于对等待队列中的每个节点均执行一次signal方法,效果就是将等待队列中所有节点全部移动到同步队列中,并唤醒每个节点。

六、Java并发容器和框架

6.1 ConcurrentHashMap

ConcurrentHashMap是线程安全并且高效的HashMap,其它的类似容器有以下缺点:

6.1.2 ConcurrentHashMap的结构

ConcurrentHashMap是由Segment数组结构和HashEntry数组结构组成:

一个ConcurrentHashMap里包含一个Segment数组,它的结构和HashMap类似,是一种数组和链表结构。
一个Segment里包含一个HashEntry数组,每个HashEntry是一个链表结构的元素,每个Segment守护着一个HashEntry里的元素,当对HashEntry数组的数据进行修改时,必须首先获得与它对应的Segment锁。

6.1.5 ConcurrentHashMap的操作

get
get的高效在于整个get过程中不需要加锁,除非读到的值是空才会加锁重读。原因是它的get方法将要使用的共享变量都设为volatile,能够在线程间保持可见性,能够被多线程同时读,并且不会读到过期的值,例如用于统计当前Segment大小的count字段和用于存储值的HashEntryvalue
put
put方法里需要对共享变量进行写入操作,所以为了线程安全,在操作共享变量之前必须加锁,put首先定位到Segment,然后在Segment里进行插入操作。
size
先尝试2次通过不锁住Segment的方式来统计各个Segment的大小,如果统计的过程中,容器的count发生了变化,则再用加锁的方式来统计所有Segment的大小。

6.2 ConcurrentLinkedQueue

ConcurrentLinkedQueue是一个基于链接节点的无界线程安全队列,它采用先进先出的规则对节点进行排序,它采用CAS算法来实现。

6.2.1 入队列

入队主要做两件事情:

在多线程情况下,如果有一个线程正在入队,那么它必须先获取尾节点,然后设置尾节点的下一个节点为入队节点,但这时可能有另外一个线程插队了,那么队列的尾节点就会发生变化,这时第一个线程要暂停入队操作,然后重新获取尾节点。
整个入队操作主要做两件事:

6.3 阻塞队列

6.3.1 阻塞队列

阻塞队列是一个支持两个附加操作的队列,这两个附加的操作支持阻塞的插入和移除方法:

在阻塞队列不可用时,附加操作提供了4种处理方式:抛出异常、返回特殊值、一直阻塞、超时退出。每种方式通过调用不同的方法来实现。
Java里面提供了7种阻塞队列。

6.4 Fork/Join框架

用于并行执行任务的框架,是把一个大任务分割成若干个小任务,最终汇总每个小任务结果后得到大人物结果的框架。
Fork/Join使用两个类来完成事情:

七、Java中的13个原子操作类

Atomic包里提供了:原子更新基本类型、原子更新数组、原子更新引用和原子更新属性。

7.1 原子更新基本类型:

基本方法:

7.2 原子更新引用类型

基本方法:

7.3 原子更新引用类型

用于原子更新多个变量,提供了3种类型:

7.4 原子更新字段类

原子地更新字段需要两步:

八、Java中的并发工具类

九、Java中的线程池

线程池的优点:降低资源消耗,提高响应速度,提高线程的可管理性。

9.1 线程池的实现原理

线程池的处理流程如下:

    public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        /*
         * Proceed in 3 steps:
         *
         * 1. If fewer than corePoolSize threads are running, try to
         * start a new thread with the given command as its first
         * task.  The call to addWorker atomically checks runState and
         * workerCount, and so prevents false alarms that would add
         * threads when it shouldn't, by returning false.
         *
         * 2. If a task can be successfully queued, then we still need
         * to double-check whether we should have added a thread
         * (because existing ones died since last checking) or that
         * the pool shut down since entry into this method. So we
         * recheck state and if necessary roll back the enqueuing if
         * stopped, or start a new thread if there are none.
         *
         * 3. If we cannot queue task, then we try to add a new
         * thread.  If it fails, we know we are shut down or saturated
         * and so reject the task.
         */
        int c = ctl.get();
        if (workerCountOf(c) < corePoolSize) { //1.添加进入核心线程.
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }
        if (isRunning(c) && workQueue.offer(command)) { //2.添加进入队列.
            int recheck = ctl.get();
            if (!isRunning(recheck) && remove(command))
                reject(command);
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
        else if (!addWorker(command, false)) //3.添加进入非核心线程.
            reject(command);
    }

    private boolean addWorker(Runnable firstTask, boolean core) {
        retry:
        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);

            // Check if queue empty only if necessary.
            if (rs >= SHUTDOWN &&
                ! (rs == SHUTDOWN &&
                   firstTask == null &&
                   ! workQueue.isEmpty()))
                return false;

            for (;;) {
                int wc = workerCountOf(c);
                if (wc >= CAPACITY ||
                    wc >= (core ? corePoolSize : maximumPoolSize))
                    return false;
                if (compareAndIncrementWorkerCount(c))
                    break retry;
                c = ctl.get();  // Re-read ctl
                if (runStateOf(c) != rs)
                    continue retry;
                // else CAS failed due to workerCount change; retry inner loop
            }
        }

        boolean workerStarted = false;
        boolean workerAdded = false;
        Worker w = null;
        try {
            w = new Worker(firstTask);
            final Thread t = w.thread;
            if (t != null) {
                final ReentrantLock mainLock = this.mainLock;
                mainLock.lock();
                try {
                    // Recheck while holding lock.
                    // Back out on ThreadFactory failure or if
                    // shut down before lock acquired.
                    int rs = runStateOf(ctl.get());

                    if (rs < SHUTDOWN ||
                        (rs == SHUTDOWN && firstTask == null)) {
                        if (t.isAlive()) // precheck that t is startable
                            throw new IllegalThreadStateException();
                        workers.add(w);
                        int s = workers.size();
                        if (s > largestPoolSize)
                            largestPoolSize = s;
                        workerAdded = true;
                    }
                } finally {
                    mainLock.unlock();
                }
                if (workerAdded) {
                    t.start();
                    workerStarted = true;
                }
            }
        } finally {
            if (! workerStarted)
                addWorkerFailed(w);
        }
        return workerStarted;
    }

在以上的三步中,除了加入队列不用获取全局锁以外,其它两种情况都需要获取,为了尽可能地避免获取全局锁,在ThreadPoolExecutor完成预热之后(当前运行的线程数大于corePoolSize),几乎所有的execute方法调用都是加入到队列当中。

9.2 线程池的使用

9.2.1 线程池的创建

    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) {
        if (corePoolSize < 0 ||
            maximumPoolSize <= 0 ||
            maximumPoolSize < corePoolSize ||
            keepAliveTime < 0)
            throw new IllegalArgumentException();
        if (workQueue == null || threadFactory == null || handler == null)
            throw new NullPointerException();
        this.corePoolSize = corePoolSize;
        this.maximumPoolSize = maximumPoolSize;
        this.workQueue = workQueue;
        this.keepAliveTime = unit.toNanos(keepAliveTime);
        this.threadFactory = threadFactory;
        this.handler = handler;
    }

9.2.2 向线程池提交任务

9.2.3 关闭线程池

十、Executor框架

(1)在上层,Java多线程程序通常把应用分解为若干个任务,然后使用用户级的调度器(Executor框架)将这些任务映射为固定数量的线程。
(2)在HotSpot VM的线程模型中,Java线程再被一对一映射为本地操作系统线程,Java线程启动时会创建一个本地操作系统线程,当该线程终止时,这个操作系统线程也会被回收。
(3)操作系统会调度所有线程并将它们分配给可用的CPU

Executor框架

由三个部分组成:

10.2 ThreadPoolExecutor详解

通过工具类Executors,可以创建以下三种类型的ThreadPoolExecutor,调用静态创建方法之后,会返回ExecutorService

10.3 ScheduledThreadPoolExecutor详解

用来在给定的延迟之后执行任务,或者定期执行任务,并且可以在指定的构造函数中指定多个对应的后台线程数。
它采用DelayQueue这个无界队列作为工作队列,其执行分为两个部分:

上一篇下一篇

猜你喜欢

热点阅读