Java 并发编程

2019-04-15  本文已影响0人  清雨季

一 多线程基础

1.1 线程的状态及其变更

线程的状态及其变更

1.2涉及到的各种方法

二 并发包之Executor

2.1 Future

Future的体系结构:https://blog.csdn.net/u014209205/article/details/80598209

1.1.1 RunnableFuture

Runnalbe接口和Future接口的组合类,可以被线程执行,执行成功后可以获取结果

2.1.2 CompleteableFuture

用于处理Future之间的组合调用,依赖等问题,参考文档20种CompletableFuture使用方式

2.2 线程池

2.3 锁

自旋锁:一种通过循环检测方式实现的锁。

class SpinLock {
        //java中原子(CAS)操作
    AtomicReference<Thread> owner = new AtomicReference<Thread>();//持有自旋锁的线程对象
    private int count;
    public void lock() {
        Thread cur = Thread.currentThread();
        while (!owner.compareAndSet(null, cur)){
        }
    }
    public void unLock() {
        Thread cur = Thread.currentThread();
            owner.compareAndSet(cur, null);
        }
    }
}

重入锁:可以重复获取的锁,如ReentrantLock和synchronized。
重量级锁:底层依赖操作系统互斥量来实现,成本比较高(用户态与内核态切换,线程切换),但是不需要自旋,适合锁竞争激烈的时候使用。
轻量级锁:通过自旋来实现,避免内核切换和线程切换,但是锁竞争激烈的时候会膨胀成重量级锁,适合锁竞争不激烈的时候适用。
偏向锁:轻量级锁进一步优化得到的锁,假定只有一个线程会获取锁,只有一次CAS操作。
公平锁:公平锁是指当锁可用时,在锁上等待时间最长的线程将获得锁的使用权。而非公平锁则随机分配这种使用权。公平锁可以避免饥饿,但是非公平锁更高效。
参考资料

2.4 java中的锁实现

AbstractQueuedSynchronizer

所有同步对象的基础,基于原子操作和valitale关键字实现最基础的同步。
获取独占锁流程:
1)获取同步状态,原子操作设置状态
2)若成功,则获取成功,走下面的代码
3)不成功,则创建新结点,使用CAS加入到队尾中
4)进入自旋,判断是否获取到了锁,没有则进入阻塞状态,被唤醒后进入下一次自旋,直到获取到锁为止
5)获取到锁则把自己设置为头节点,释放锁时唤醒下一个节点

Reentranklock

使用AQS完成,相比普通的锁,多处理了重入的过程
使用AQS中的state表示重入次数,lock 时加1 unlock时减1
内部使用两个内部类(都是AQS的子类)来完成,分别是Sync(公平锁) NonfairSync(非公平锁)
公平锁相对多了一个判断:加锁时要求无线程等待,或者当前线程是队头节点

ReadWriteLock

读写锁,使用AQS完成,AQS的state的高16位表示读锁,低16位表示写锁

获取读锁的条件:没有线程获取了写锁,等待队列中头节点的等待类型不是写类型,获取写锁的次数没达到上限

获取写锁的条件:没有线程获取了读锁,没有线程获取读锁,或者仅有当前线程获取了写锁

Condition

用于线程之间的通信,相比Object中的方法,可以控制到线程级别的粒度,可以支持超时等待。

Condition中维护了一个等待队列,线程调用Condition.await时,把当前线程节点从同步队列移动至等待队列,然后LockSupport.park()

当调用Condition.signal 方法时,会把等待队列的头节点移动至同步队列中,然后LockSupport.unpark唤醒该节点

ConcurrentHashMap

线程安全,效率比Hashtable高,底层使用Segment实现,一个Segment的结构类似于一个HashMap,同时它也是Lock的子类,相当于做了一个锁粒度的变更

添加元素的过程:

            if ((sc = sizeCtl) < 0)
                Thread.yield(); // lost initialization race; just spin
            else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
                doRealInit();
            }

阻塞队列

BlockingQueue接口中定义的方法

记忆方法:带字母t的是阻塞的,除add外,有连续两个字母相同的是返回特殊值的

LinkedBlockingQueue

底层采用链表实现:

    static class Node<E> {
        E item;
        Node<E> next;
        Node(E x) { item = x; }
    }

使用两个Condition来实现阻塞功能

    /** Lock held by take, poll, etc */
    private final ReentrantLock takeLock = new ReentrantLock();

    /** Wait queue for waiting takes */
    private final Condition notEmpty = takeLock.newCondition();

    /** Lock held by put, offer, etc */
    private final ReentrantLock putLock = new ReentrantLock();

    /** Wait queue for waiting puts */
    private final Condition notFull = putLock.newCondition();

put方法中,会先putLock.lock,然后如果当前队列满了,则会notFull.await(),此外如果原来队列中没有元素,则还需要notEmpty.signal();

由于可能有多个线程block在put方法中,因此put完成后如果发现队列还没有满,还需要发一下notFull.signal()

get方法中,先判断是否为空,是则notEmpty.await(),如果不为空,则取数据。如果取之前队列是满的,则取完后需要发一下notFull.signal()。

同理,取完后如果队列不为空,则需要发一个notEmpty.signal()

    public E take() throws InterruptedException {
        E x;
        int c = -1;
        final AtomicInteger count = this.count;
        final ReentrantLock takeLock = this.takeLock;
        takeLock.lockInterruptibly();
        try {
            while (count.get() == 0) {
                notEmpty.await();
            }
            x = dequeue();
            c = count.getAndDecrement();
            if (c > 1)
                notEmpty.signal();
        } finally {
            takeLock.unlock();
        }
        if (c == capacity)
            signalNotFull();
        return x;
    }

    public void put(E e) throws InterruptedException {
        if (e == null) throw new NullPointerException();
        int c = -1;
        Node<E> node = new Node<E>(e);
        final ReentrantLock putLock = this.putLock;
        final AtomicInteger count = this.count;
        putLock.lockInterruptibly();
        try {
            while (count.get() == capacity) {
                notFull.await();
            }
            enqueue(node);
            c = count.getAndIncrement();
            if (c + 1 < capacity)
                notFull.signal();
        } finally {
            putLock.unlock();
        }
        if (c == 0)
            signalNotEmpty();
    }
ArrayBlockingQueue

与LinkedBlockingQueue的区别是,底层采用的是数组的方式实现,并且ArrayBlockingQueue读写共用一把锁,因此读写是串行的,阻塞的实现方式与LinkedBlockingQueue是一样的

PriorityBlockingQueue

先复习PriorityQueue:优先级队列,并不是"先进先出"的队列,而按照优先级出队,底层采用数组实现,元素的排序方式可以使用Comparetor对象,也可以让元素实现Compareable

PriorityBlockingQueue 底层同样使用数组实现,同样需要为元素指定排序规则。
与前两种Blocking的区别是,这是一个无界队列,会自动扩容,因此它只会阻塞读,但是不会阻塞写。

由于只需要阻塞读,因此只使用了一个notEmpty Condition。

DelayQueue

延迟队列,底层由PriorityQueue实现,是一个无界队列,会对数据排序,越早完成延期的数组排在前面。

读取数据时会读队头节点,即最先完成延时的节点,因此如果队头节点没有完成延时,则会阻塞

SynchronousQueue

不储存元素的阻塞队列,相当于只有一个容量,只做了元素的传递,从写线程传到消费线程。

并发工具

CountDownLatch

创建对象是会指定一个int类型初始值count,执行await()方法时会阻塞住,其他线程执行countDown()时会把count减1,当count为0时,被await()阻塞的线程会被唤醒继续执行

private static void testCDL() throws Exception{
    CountDownLatch countDownLatch = new CountDownLatch(5);
    new Thread(){
        @Override
        public void run() {
            try {
                while(countDownLatch.getCount() > 0) {
                    TimeUnit.SECONDS.sleep(1);
                    countDownLatch.countDown();
                    System.out.println("剩余时间:" + countDownLatch.getCount());
                }
            }catch (Exception e) {
                e.printStackTrace();
            }
        }
    }.start();
    countDownLatch.await();
    System.out.println("Finish");
}

实现方式:

通过源码可以发现:

CyclicBarrier

可循环使用的屏障,让一组线程到达一个屏障时被阻塞,直到最后一个线程到达屏障时再唤醒所有阻塞线程

    private static void testCyclicBarrier() throws Exception{
        CyclicBarrier cyclicBarrier = new CyclicBarrier(2);
        executeOnNewThread(new Runnable() {
            @Override
            public void run() {
                try {
                    TimeUnit.SECONDS.sleep(1);
                    cyclicBarrier.await();
                    System.out.println(1);
                    cyclicBarrier.await();
                    System.out.println(2);
                }catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }, "Thread-1");
        cyclicBarrier.await();
        System.out.println(1);
        cyclicBarrier.await();
        System.out.println(2);
    }

实现方式应该与CountDownLatch类似,就不再看源码了。从示例中可以看出与CountDownLatch的不同之处:

信号量

用于做流量控制的,同一时间内只允许有限个线程执行,示例代码如下:

    private static void testSemaphore() throws Exception{
        Semaphore semaphore = new Semaphore(3);
        for (int j = 0; j < 10; j++) {
            executeOnNewThread(new Runnable() {
                @Override
                public void run() {
                    try {
                        semaphore.acquire();
                        TimeUnit.SECONDS.sleep(1);
                        System.out.println(Thread.currentThread().getName());
                    } catch (Exception e) {
                        e.printStackTrace();
                    } finally {
                        semaphore.release();
                    }
                }
            }, "Thread-" + j);
        };
    }

同一时间只有三个线程可以执行。
依然是使用AQS锁实现的,使用state表实当前剩余的信号量,acquire时,如果当前信号量够,则减1返回,如果信号量不够,则把线程加入同步列队中并且LockSupport.park()暂停住。
release时,如果release成功,则唤醒同步队列中的头节点

Exchanger

用于交换线程的数据

线程池

线程池中的几个参数

corePoolSize:核心线程数
runnableTaskQueue:任务队列,BlockingQueue的实现类

maximumPoolSize :最大线程数
ThreadFactory :线程工厂,用于创建线程
RejectedExecutionHandler 饱和策略,指工作队列满了之后的策略,可选以下几种

也可以自己实现一个RejectedExecutionHandler

keepAliveTime : 线程的存活时间

提交任务的两种方式

execute: 提交线程,无返回值
submit: 提交线程,要返回值

关闭线程池的方式

shutdown: 将线程池至为SHUTDOWN状态,然后遍历并中断所有没有正在执行任务的线程。
shutdownNow: 立即将线程池状态至为STOP状态,然后尝试停止所有任务。
也就是说shutdown会让已经开始的任务执行完,而shutdownNow则会立即停止所有线程,可能有的任务会只执行到一半。

Executor 框架

ThreadPoolExecutor

线程池的状态及流转:
几个关键的字段:
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));

private static final int COUNT_BITS = Integer.SIZE - 3;
private static int runStateOf(int c)  { return c & ~CAPACITY; }
private static int workerCountOf(int c)  { return c & CAPACITY; }
private static int ctlOf(int rs, int wc) { return rs | wc; }
线程池收后一个新任务后的处理流程

1)判断core thread 是否满了,不是,则直接创建线程执行,否则转2)
2)工作队列是否满了,没满则加入,否则转3)
3)工作线程是否是上限,没有则创建新的工作线程执行,否则转4)
4)使用饱合策略
只有第1,3步会加锁,第一步通常都是满了,所以通常只有第三步会加锁,使用prestartAllCoreThread方法可以提前创建好所有的核心线程。
源码如下:

    public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        int c = ctl.get();
        if (workerCountOf(c) < corePoolSize) {
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }
        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
            if (! isRunning(recheck) && remove(command))
                reject(command);
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
        else if (!addWorker(command, false))
            reject(command);
    }
添加线程的流程

步骤一:检查。只有线程池处理RUNNING状态,或者处于SHUTDOWN状态并且任务队列不为空的情况下才能继续添加线程。
步骤二:检查。检查线程池数量是否超过上限,如果添加的是核心线程,上限即corePoolSize,如果不是核心线程,上限即为maximumPoolSize。
步骤三:将工作线程数量原子加1,如果加成功,则继续步骤四添加工作线程。如果不成功,说明ctl字段已经被其他线程修改过了,所以要回到步聚一重新检查。
步骤四:创建一个Worker对象(Worker是一个内部类,可以认为就是一个线程)
步骤五:加锁 mainLock.lock()
步骤六:重新检查线程池状态
步骤七:把创建好的Worker对象加入到workers中,并且更新数量

        retry:
        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);
            if (rs >= SHUTDOWN &&
                ! (rs == SHUTDOWN &&
                   firstTask == null &&
                   ! workQueue.isEmpty()))
                return false;

            for (;;) {
                int wc = workerCountOf(c);
                if (wc >= CAPACITY ||
                    wc >= (core ? corePoolSize : maximumPoolSize))
                    return false;
                if (compareAndIncrementWorkerCount(c))
                    break retry;
                c = ctl.get();  // Re-read ctl
                if (runStateOf(c) != rs)
                    continue retry;
                // else CAS failed due to workerCount change; retry inner loop
            }
        }
Worker内部类工作逻辑

Worker类的定义如下:

    private final class Worker
        extends AbstractQueuedSynchronizer
        implements Runnable {
        private static final long serialVersionUID = 6138294804551838833L;

        /** Thread this worker is running in.  Null if factory fails. */
        final Thread thread;
        /** Initial task to run.  Possibly null. */
        Runnable firstTask;
        /** Per-thread task counter */
        volatile long completedTasks;
        
        public void run() {
            runWorker(this);
        }
    }

Worker类封闭了一个Thread对象,同时实现了AQS锁,这个锁主要是在中断线程时使用。
Worker的主要作用是让线程不断循环,从任务队列中取任务执行,核心循环代码如下:

    final void runWorker(Worker w) {
        Thread wt = Thread.currentThread();
        Runnable task = w.firstTask;
        w.firstTask = null;
        w.unlock(); // allow interrupts
        boolean completedAbruptly = true;
        try {
            while (task != null || (task = getTask()) != null) {
                w.lock();
                if ((runStateAtLeast(ctl.get(), STOP) ||
                     (Thread.interrupted() &&
                      runStateAtLeast(ctl.get(), STOP))) &&
                    !wt.isInterrupted())
                    wt.interrupt();
                try {
                    beforeExecute(wt, task);
                    Throwable thrown = null;
                    try {
                        task.run();
                    } catch (RuntimeException x) {
                        thrown = x; throw x;
                    } catch (Error x) {
                        thrown = x; throw x;
                    } catch (Throwable x) {
                        thrown = x; throw new Error(x);
                    } finally {
                        afterExecute(task, thrown);
                    }
                } finally {
                    task = null;
                    w.completedTasks++;
                    w.unlock();
                }
            }
            completedAbruptly = false;
        } finally {
            processWorkerExit(w, completedAbruptly);
        }
    }

从源码上看,这个循环其实就是处理了三件事:

Java中提供的四种线程池

new ThreadPoolExecutor(nThreads, nThreads,0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
new ThreadPoolExecutor(1, 1,0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>())
    public ScheduledThreadPoolExecutor(int corePoolSize) {
        super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
              new DelayedWorkQueue());
    }

ScheduledThreadPool 使用

不怎么用这个类,这里就暂时先不看它的源码了,使用的几个关键方法如下:

public ScheduledFuture<?> schedule(Runnable command,
                                     long delay, TimeUnit unit);

command: 任务, delay和unit:指定延时时间

public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
                                                  long initialDelay,
                                                  long period,
                                                  TimeUnit unit);

示例代码如下:3秒后每隔1秒输出一次当前时间

        executorService.scheduleAtFixedRate(new Runnable() {
            @Override
            public void run() {
                try {
                    System.out.println(System.currentTimeMillis());
                    Thread.sleep(500);
                }catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }, 3, 1, TimeUnit.SECONDS);
public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
                                                     long initialDelay,
                                                     long delay,
                                                     TimeUnit unit);

它与上一个的区别在于,这个方法是执行完成任务后才开始算间隔时间,而scheduleAtFixedRate是从任务开始执行时就计算间隔时间。
如果上面的示例代码改使用scheduleWithFixedDelay方法,则会变成3秒后每隔1.5秒输出一次系统时间

内存模型

三种重排序类型

  1. 编译器优化重排序
    例如下面的代码:
a = 1;
b = true;

由于这两行代码在单线程的条件下没有任何关系。因此可以被优化成:

b = true;
a = 1;

这样改动两行代码的顺序,在单线程的情况下效果是一样的。

  1. 指令级并行重排序
    现代CPU采用了指令级并行技术来将多条指令重叠执行,如果不存在数据依赖性,处理器可以改变语句对应机器指令的执行顺序。

  2. 内存系统的重排序
    由于处理器使用了缓存和读/写缓冲区,这使用加载和存储操作看上去可能是乱序的。
    如下图所示:假如有两个处理器分别执行A1,A2和B1,B2操作,可能得到的结果是x=y=0。



    执行的过程可以如下图所示:


按照时间顺序,可能的执行顺序是:A1 -> B1 -> A2 -> B2 -> A3 -> B3
由于A2,B2在执行时A3和B3没有被执行,所以读到的数据还都是0。

虽然处理器执行的顺序是A1 -> A2,但是实际内存操作的顺序却成了A2 -> A1了。

以上说的三种重排序中,只有1属于编译器重排序,而2,3属于处理器重排序。

处理器重排序和内存屏障

处理器重排序规则:


可以看出所有的处理器都允许Store-Load 重排序。

为了保证内存可见性,Java编译器在生成指令序列的适当位置插入内存屏障指令来禁止特定类型的处理器重排序:


happens-before原则

如果A操作happens-before B操作,指的是A操作执行的结果对B操作一定可见(并不意味着A操作要在B操作之前执行)。

Java语言中定义的happens-before原则(以下称先行发生)有以下八个,最重要的是前面四个:

以上加红的两个后面指的是时间上的先后。

数据依赖性

如果两个操作存在数据依赖性,则这两个操作不被重排序,如下代码所示:

//情况一
a = 1;
b = a;
//情况二
a = 1;
a = 2;
//情况三
a = b;
b = 1;

这里说的数据依赖指的是单线程的依赖。

happens-before 原则,数据依赖性和重排序

如以下代码:

a = 1;          //A
b = 2;          //B
c = a + b;    //C

根据程序次序规则,以上三行代码存在三个先行发生原则:
A happens-before B
B happens-before C
A happens-before C

同时还存在两个数据依赖:
C依赖A
C依赖B

注意A和B不存在数据依赖
处理器的执行顺序可能有两种情况:
A -> B -> C
B -> A -> C

即虽然A happens-before B ,但是还是会被重排序。

volatile关键字

volatile变量自身具有以下特性:

volatile写的内存语义:当写一个volatile变量时,会把写后值刷新到主内存。
volatile读的内存语义:当读一个volatile变量时,会把线程本地内存置为无效,直接从主内存读取数据。

volatile的实现:JMM使用内存屏障来禁用特定的重排序来实现volatile关键字

内存屏障插入策略:

上一篇下一篇

猜你喜欢

热点阅读