多线程学习笔记

2019-05-03  本文已影响0人  YFaye

多线程出现目的

场景:

  1. 当一个进程处理过程中,遇到网络与IO操作都会进入阻塞状态,不再处理任何东西,浪费系统资源。
  2. 一个函数的处理非常耗时,其实其中多个逻辑可以并行处理。

多线程的面世就是要解决以上问题。

如何使用多线程

  1. extends Thread
public class ThreadDemo extends Thread {
    @Override
    public void run() {
        System.out.println(Thread.currentThread().getName() + ":" + "ThreadDemo Running");
    }

    public static void main(String[] args) {
        for (int i = 0; i < 10; i++) {
            new ThreadDemo().start();
        }
    }
}
  1. implements Runnable
public class RunnableDemo implements Runnable {

    @Override
    public void run() {
        System.out.println(Thread.currentThread().getName() + ":" + "RunnableDemo Running");
    }

    public static void main(String[] args) {
        RunnableDemo runnableDemo = new RunnableDemo();
        for (int i = 0; i < 10; i++) {
            new Thread(runnableDemo).start();
        }
    }
}
  1. ExecutorService
    • Executors.newFixedThreadPool
    • Executors.newCachedThreadPool
    • Executors.newSingleThreadPool
    • Executors.newScheduledThreadPool
public class ExecutorServiceDemo {

    private static ThreadPoolExecutor threadPool;

    private static ThreadFactory factory = new ThreadFactory() {
        private final AtomicInteger integer = new AtomicInteger();

        @Override
        public Thread newThread(Runnable r) {
            int threadName = integer.getAndIncrement();
            System.out.println("Created Thread:" + threadName);
            return new Thread(r, "ThreadPool Thread:" + threadName);
        }
    };

    private static BlockingQueue<Runnable> workQueue = new ArrayBlockingQueue<Runnable>(10);

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        ExecutorService executorService = Executors.newFixedThreadPool(1);

        threadPool = new ThreadPoolExecutor(10, 15, 1000L,
                TimeUnit.SECONDS,
                workQueue,
                factory);

        //execute()与submit()的区别在于submit有一个Future类型的返回,
        // 实际submit是把Callable入参包装成RunnableFuture类型后再调用execute();
        for (int i = 0; i < 15; i++) {
            System.out.println("threadPool.execute");
            threadPool.execute(new RunnableDemo());
        }

        for (int i = 0; i < 15; i++) {
            System.out.println("threadPool.submit");
            Future<?> future = threadPool.submit(new CallableDemo());
            System.out.println(future.get());
        }

    }
}
  1. implements Callable<>
public class CallableDemo implements Callable<String> {
    @Override
    public String call() throws Exception {
        System.out.println(Thread.currentThread().getName() + ":" +"CallableDemo Running");
        return "Callable Result";
    }

    public static void main(String[] args) throws Exception {
        CallableDemo callableDemo = new CallableDemo();
        String callableReturn = callableDemo.call();
        System.out.println("callableReturn :" + callableReturn);
    }
}

Callable与Runable区别:
Re:

  1. Callable任务线程能返回执行结果,而Runnable任务线程不能返回结果
  2. Callable能向上抛出异常,而Runnable接口异常只能内部消化

为什么提供extends Thread又提供implements Runnable
Re:因为JAVA不支持多继承

线程状态(6种)

image.png

状态变更图示:


image.png

线程开启/停止

开始:start()
停止:interrupt()
通过设置标志位的方式终止线程,使其能有机会去清理资源,而非暴力的方式直接kill掉,这种方式更新安全。

public class demo4 {
    private static int num;

    public static void main(String[] args) throws InterruptedException {
        Thread thread = new Thread(() -> {
            while (!Thread.currentThread().isInterrupted()) {
                num++;
                System.out.println("Num:" + num);
            }
        });
        thread.start();
        TimeUnit.SECONDS.sleep(1);
        thread.interrupt();
    }
}

线程安全

Volilate

public class VolatileDemo {
    private volatile static boolean stop = false;

    public static void main(String[] args) {
        Thread thread = new Thread(() -> {
            int i = 0;
            while (!stop) {
                i++;
                System.out.println("i: " + i);
            }
        });

        try {
            long startTime = System.currentTimeMillis();
            thread.start();
            System.out.println("Thread Start");
            TimeUnit.SECONDS.sleep(1);
            stop = true;
            long endTime = System.currentTimeMillis();
            System.out.println("Runtime: " + (endTime - startTime) / 1000 + " Second");
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

因为每个线程有自己私有的内存空间,修改变量需要同步回主存才能对其它线程可见,而volatile就会有哪下作用

  1. 修改volatile修饰的变量时,会强制将修改的值回写到主存。
  2. 读取volatile修饰的变量时,会强制到主存获取数据,不再到缓存读取
  3. volatile会使被volatile修饰的语句禁止指令重排序

指令重排序实例:

int a = 1;
int b = 2;
int c = 3;
//以上例子,可能是int c = 3优先于int a = 1和int b = 2执行
int a = 1;
volatile int b = 2;
int c = 3;
//以上例子则volatile int b = 2一定是在int a = 1和int c = 3之间执行。

问题:为什么需要编译器指令重排?
Re:优化执行效率。

问题:什么是CPU乱序执行?


image.png

问题:为什么要禁止编译器指令重排呢?
Re:因为多线程下指令重排可能会导致处理出错,例如:

Thread-1:
int b = 10;
int c = b;
boolean flag = true;

Thread-2:
while(flag){
    System.out.println(b);
}

如果编译器把Thread-1的第3条指令重排到第一行,那Thread-2就有可能出错,因为B还没有初始化。

问题:volilate为什么不能保证原子性
Re:因为volilate对变量的操作在字节码层面是由多条指令组成,非原子性操作,所以它只保证了可见性,不保证原子性。


Volilate因为只保证了Read and Load即从主存加载到工作内存时加载的值是最新的,例如:
线程1和线程2在执行Read and Load的时候,发现主存里的值都是5,双方都加载了这个最新值,然后双方都对该值加1,再把值放回主存,事实主存值结果为6,此操作有线程安全问题。

小结
声明了volatile的变量进行写操作,JVM就会向处理器发送一条Lock前缀的指令,把这个变量所在的缓存行的数据写回到系统内存,再根据我们前面提到过的MESI的缓存一致性协议,来保证多CPU下的各个高速缓存中的数据的一致性。

Sychronized

机制

  1. 使用方法
  1. 使用Sychronized后,会通过字节码生成以下指令:
  1. 获取锁情况

如何实现锁

本质:对象监视器的获取(独占锁)

为什么任何一个对象都可以成为锁

因为对象在内存中分为三块区域:对象头、实例数据、对齐填充


image.png

对象头:


image.png

而Synchroned使用的锁存在每一个对象的对象头里,其中锁标志位指向的是monitor对象(也称为管程或监视器锁)的起始地址。

锁的优化

锁的状态:

注意:锁只能从轻到重的方向发展,不可逆。

Lock与Synchronized区别

  1. Lock是一个接口
  2. synchronized是JVM层的一个实现
  3. synchronized是被动的触发锁机制,而Lock是可以灵活的控制,锁的创建和释放都需要人为控制,特别是异常发生的时候要注意释放锁。
  4. Lock相对来讲控制粒度更小,例如还可以分别控制读写锁
  5. Lock支持公平、非公平锁,而synchronized只支持非公平锁

CAS

CAS是JDK提供的Unsafe类里的一系列操作,这一系列操作由JDK来保证原子性。

    public final native boolean compareAndSwapObject(Object var1, long var2, Object var4, Object var5);

    public final native boolean compareAndSwapInt(Object var1, long var2, int var4, int var5);

    public final native boolean compareAndSwapLong(Object var1, long var2, long var4, long var6);
image.png

Atomic一系列的对象是根据CAS的封装来实例原子性。

AQS(AbstractQueuedSychronizer)

AQS的关键数据结构:


image.png

链表的操作通过CAS原子操作来保证多线程下的原子性:

    /**
     * CAS tail field. Used only by enq.
     */
    private final boolean compareAndSetTail(Node expect, Node update) {
        return unsafe.compareAndSwapObject(this, tailOffset, expect, update);
    }
     /**
     * CAS head field. Used only by enq.
     */
    private final boolean compareAndSetHead(Node update) {
        return unsafe.compareAndSwapObject(this, headOffset, null, update);
    }

通过compareAndSwapObject这个native方法来保证链表操作的线程安全性

ReentrantLock

Lock()加锁分析

image.png

非公平锁逻辑流程图

image.png

公平锁与非公平锁的差异

    final void lock() {
        acquire(1);
    }
    
    public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }
    
    protected final boolean tryAcquire(int acquires) {
        final Thread current = Thread.currentThread();
        int c = getState();
        if (c == 0) {
            if (!hasQueuedPredecessors() &&
                    compareAndSetState(0, acquires)) {
                setExclusiveOwnerThread(current);
                return true;
            }
        } else if (current == getExclusiveOwnerThread()) {
            int nextc = c + acquires;
            if (nextc < 0)
                throw new Error("Maximum lock count exceeded");
            setState(nextc);
            return true;
        }
        return false;
    }
    final void lock() {
        if (compareAndSetState(0, 1))
            setExclusiveOwnerThread(Thread.currentThread());
        else
            acquire(1);
    }
 
    public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
                acquireQueued(addWaiter(AbstractQueuedSynchronizer.Node.EXCLUSIVE), arg))
            selfInterrupt();
    }
    
    protected final boolean tryAcquire(int acquires) {
        return nonfairTryAcquire(acquires);
    }
    
    final boolean nonfairTryAcquire(int acquires) {
        final Thread current = Thread.currentThread();
        int c = getState();
        if (c == 0) {
            if (compareAndSetState(0, acquires)) {
                setExclusiveOwnerThread(current);
                return true;
            }
        } else if (current == getExclusiveOwnerThread()) {
            int nextc = c + acquires;
            if (nextc < 0) // overflow
                throw new Error("Maximum lock count exceeded");
            setState(nextc);
            return true;
        }
        return false;
    } 
    private Node addWaiter(Node mode) {
        Node node = new Node(Thread.currentThread(), mode);
        // Try the fast path of enq; backup to full enq on failure
        Node pred = tail;
        if (pred != null) {
            node.prev = pred;
            if (compareAndSetTail(pred, node)) {
                pred.next = node;
                return node;
            }
        }
        enq(node);
        return node;
    }

    final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                final Node p = node.predecessor();
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return interrupted;
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

    private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
        int ws = pred.waitStatus;
        if (ws == Node.SIGNAL)
            /*
             * This node has already set status asking a release
             * to signal it, so it can safely park.
             */
            return true;
        if (ws > 0) {
            /*
             * Predecessor was cancelled. Skip over predecessors and
             * indicate retry.
             */
            do {
                node.prev = pred = pred.prev;
            } while (pred.waitStatus > 0);
            pred.next = node;
        } else {
            /*
             * waitStatus must be 0 or PROPAGATE.  Indicate that we
             * need a signal, but don't park yet.  Caller will need to
             * retry to make sure it cannot acquire before parking.
             */
            compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
        }
        return false;
    }

以上可以看出,非公平锁在Lock()方法被调用时是首先尝试当前线程是否能直接获得锁,然后tryAcquire()方法的时候公平锁是需要检查AQS队列里是否有等待的节点,有的话是当前线程获取锁不成功,而非公平锁是直接CAS当前锁的状态,若通过就把锁给当前线程了。同时也可以看出双方在获取不到锁的时候,进行AQS队列方式是一样的,都是加在队尾。在加入队列后,还需要根据当前节点的前驱节点的waitStatus若是Node.SIGNAL状态判断是否需要把当前线程挂起,以省系统资源,

unlock()释放锁分析

    public void unlock() {
        sync.release(1);
    }

    public final boolean release(int arg) {
        if (tryRelease(arg)) {
            Node h = head;
            if (h != null && h.waitStatus != 0)
                unparkSuccessor(h);
            return true;
        }
        return false;
    }
    
    protected final boolean tryRelease(int releases) {
        int c = getState() - releases;
        if (Thread.currentThread() != getExclusiveOwnerThread())
            throw new IllegalMonitorStateException();
        boolean free = false;
        if (c == 0) {
            free = true;
            setExclusiveOwnerThread(null);
        }
        setState(c);
        return free;
    }

可以看出,每个unlock()操作都是一个State - 1操作,直到State == 0的时候,把ExclusiveOwnerThread即当前获得锁的线程设置为null来释放锁。

小结
在获取锁的时候,会维护一个双向链表,用于存放获取锁失败的的线程到队列中进行自旋来获取锁,

CountDownLatch

是什么

CountDownLatch是JUC中提供的一个同步工具,使用调用await()它可以使一个或者多个线程进行等待,直到其它线程执行CountDown()方法把倒数器减至0后,等待的线程才会启动。

如何使用

public class CountDownLatchDemo {

    public static void main(String[] args) throws InterruptedException {
        CountDownLatch countDownLatch = new CountDownLatch(3);

        new Thread(() -> {
            System.out.println(Thread.currentThread() + "执行完毕");
            countDownLatch.countDown();
        }, "Thread-1").start();
        new Thread(() -> {
            System.out.println(Thread.currentThread() + "执行完毕");
            countDownLatch.countDown();
        }, "Thread-2").start();
        new Thread(() -> {
            System.out.println(Thread.currentThread() + "执行完毕");
            countDownLatch.countDown();
        }, "Thread-3").start();

        countDownLatch.await();
        System.out.println("全部线程执行完毕");
    }
}

分析

public void await() throws InterruptedException {
    sync.acquireSharedInterruptibly(1);
}
    

public final void acquireSharedInterruptibly(int arg)
        throws InterruptedException {
    if (Thread.interrupted()) // 若线程中端,直接抛异常
        throw new InterruptedException();
    if (tryAcquireShared(arg) < 0)
        doAcquireSharedInterruptibly(arg);
}


// 计数为0时,表示获取锁成功
protected int tryAcquireShared(int acquires) {
    return (getState() == 0) ? 1 : -1;
}

// 阻塞,并入队
private void doAcquireSharedInterruptibly(int arg)
    throws InterruptedException {
    final Node node = addWaiter(Node.SHARED); // 入队
    boolean failed = true;
    try {
        for (;;) {
            // 获取前驱节点
            final Node p = node.predecessor();
            if (p == head) {
                int r = tryAcquireShared(arg);
                if (r >= 0) {
                    // 获取锁成功,设置队列头为node节点
                    setHeadAndPropagate(node, r);
                    p.next = null; // help GC
                    failed = false;
                    return;
                }
            }
            if (shouldParkAfterFailedAcquire(p, node) // 线程挂起
              && parkAndCheckInterrupt())
                throw new InterruptedException();
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}
  1. 检查计数器是否为0,为0直接返回
  2. 计数器大于0,即当前线程需要阻塞并等待计数器变为0
  3. 当前线程需要被封装成Node对象并添加到AQS双向链表里去
  4. 最后自旋尝试获取锁,即检查计数器是否为0,获取成功即出队,然后放行当前线程
// 计数-1
public void countDown() {
    sync.releaseShared(1);
}


public final boolean releaseShared(int arg) {
    if (tryReleaseShared(arg)) { // 首先尝试释放锁
        doReleaseShared();
        return true;
    }
    return false;
}

protected boolean tryReleaseShared(int releases) {
    // Decrement count; signal when transition to zero
    for (;;) {
        int c = getState();
        if (c == 0) //如果计数已经为0,则返回失败
            return false;
        int nextc = c-1;
        // 原子操作实现计数-1
        if (compareAndSetState(c, nextc)) 
            return nextc == 0;
    }
}

// 唤醒被阻塞的线程
private void doReleaseShared() {
    for (;;) {
        Node h = head;
        if (h != null && h != tail) { // 队列非空,表示有线程被阻塞
            int ws = h.waitStatus;
            if (ws == Node.SIGNAL) { 
            // 头结点如果为SIGNAL,则唤醒头结点下个节点上关联的线程,并出队
                if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                    continue;            // loop to recheck cases
                unparkSuccessor(h);
            }
            else if (ws == 0 &&
                     !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                continue;                // loop on failed CAS
        }
        if (h == head) // 没有线程被阻塞,直接跳出
            break;
    }
}
  1. 尝试释放锁,即将计数器-1,并判断state是否为0,若为0即表示当前没有锁,可以开始唤醒链表中阻塞中的线程
  2. 如果链表里为空,即没有阻塞的线程,直接退出
  3. 如果头节点waitStatus为SIGNAL,就依次唤醒下个节点的线程,并出队
上一篇下一篇

猜你喜欢

热点阅读