多线程编程

2020-12-15  本文已影响0人  laowangv2

同步机制

Atomic类

  1. 原理
    CAS+自旋,CAS依赖unsafe实现。缺点的高并发是自旋消耗cpu
  2. jdk8的优化
    新增了几个继承自Striped64的类,LongAdder、LongAccumulator、DoubleAdder、DoubleAccumulator,思想是CAS的目标分段,每个线程分别对应一个段,降低冲突,最后把所有段加起来返回

  1. synchronized关键字
    对象头中指向一个ObjectMonitor(c++实现)对象,monitor实现锁依赖于操作系统的Mutex Lock,使用时需要进行用户态到内核态的切换,所以效率较低。
    锁优化过程:因为synchronized的效率问题,JDK1.6进行了优化,主要依靠对象头设计了三种不同类型的锁——偏向锁、轻量级锁、重量级锁。初始获得偏向锁,待其他线程竞争时,将偏向锁升级到轻量级锁,原持有锁的线程继续执行,竞争者自旋,一定次数后升级为重量级锁,竞争者阻塞。
    详见:
    深入理解Java并发之synchronized实现原理
    Java性能 -- synchronized锁升级优化
  2. Lock
    典型用法:
 Lock l = ...;
 l.lock();
 try {
   // access the resource protected by this lock
 } finally {
   l.unlock();
 }

和synchronized的一些区别,除了使用方式上:

  1. 读写锁
    • 不保证优先级,支持公平/非公平
    • 可重入
    • 可降级
    • 可中断
    • 支持条件变量

官方示例:

class CachedData {
   Object data;
   volatile boolean cacheValid;
   final ReentrantReadWriteLock rwl = new ReentrantReadWriteLock();

   void processCachedData() {
     rwl.readLock().lock();
     if (!cacheValid) {
       // Must release read lock before acquiring write lock
       rwl.readLock().unlock();
       rwl.writeLock().lock();
       try {
         // Recheck state because another thread might have
         // acquired write lock and changed state before we did.
         if (!cacheValid) {
           data = ...
           cacheValid = true;
         }
         // Downgrade by acquiring read lock before releasing write lock
         rwl.readLock().lock();
       } finally {
         rwl.writeLock().unlock(); // Unlock write, still hold read
       }
     }

     try {
       use(data);
     } finally {
       rwl.readLock().unlock();
     }
   }
 }

 class RWDictionary {
   private final Map<String, Data> m = new TreeMap<String, Data>();
   private final ReentrantReadWriteLock rwl = new ReentrantReadWriteLock();
   private final Lock r = rwl.readLock();
   private final Lock w = rwl.writeLock();

   public Data get(String key) {
     r.lock();
     try { return m.get(key); }
     finally { r.unlock(); }
   }
   public String[] allKeys() {
     r.lock();
     try { return m.keySet().toArray(); }
     finally { r.unlock(); }
   }
   public Data put(String key, Data value) {
     w.lock();
     try { return m.put(key, value); }
     finally { w.unlock(); }
   }
   public void clear() {
     w.lock();
     try { m.clear(); }
     finally { w.unlock(); }
   }
 }
  1. 可重入实现
    计数器 + 记录持有锁的线程

信号量

本质上是个计数器,可以限制能够访问资源的线程个数。常见场景如池的访问,官方示例:

 class Pool {
   private static final int MAX_AVAILABLE = 100;
   private final Semaphore available = new Semaphore(MAX_AVAILABLE, true);

   public Object getItem() throws InterruptedException {
     available.acquire();
     return getNextAvailableItem();
   }

   public void putItem(Object x) {
     if (markAsUnused(x))
       available.release();
   }

   // Not a particularly efficient data structure; just for demo

   protected Object[] items = ... whatever kinds of items being managed
   protected boolean[] used = new boolean[MAX_AVAILABLE];

   protected synchronized Object getNextAvailableItem() {
     for (int i = 0; i < MAX_AVAILABLE; ++i) {
       if (!used[i]) {
          used[i] = true;
          return items[i];
       }
     }
     return null; // not reached
   }

   protected synchronized boolean markAsUnused(Object item) {
     for (int i = 0; i < MAX_AVAILABLE; ++i) {
       if (item == items[i]) {
          if (used[i]) {
            used[i] = false;
            return true;
          } else
            return false;
       }
     }
     return false;
   }
 }

当计数器为1时,构成一个二元信号量,可以当做一个“锁”使用,这个“锁”可以由其他线程释放,也就是说信号量没有owner的概念,这一点在死锁恢复中很有用。

条件变量

获取锁之后,可以等待某个条件,等待时释放锁。使用时需要和锁绑定在一起。
与wait、notify/notifyAll的区别类似于synchronized和lock。
官方示例:

 class BoundedBuffer {
   final Lock lock = new ReentrantLock();
   final Condition notFull  = lock.newCondition(); 
   final Condition notEmpty = lock.newCondition(); 

   final Object[] items = new Object[100];
   int putptr, takeptr, count;

   public void put(Object x) throws InterruptedException {
     lock.lock();
     try {
       while (count == items.length)
         notFull.await();
       items[putptr] = x;
       if (++putptr == items.length) putptr = 0;
       ++count;
       notEmpty.signal();
     } finally {
       lock.unlock();
     }
   }

   public Object take() throws InterruptedException {
     lock.lock();
     try {
       while (count == 0)
         notEmpty.await();
       Object x = items[takeptr];
       if (++takeptr == items.length) takeptr = 0;
       --count;
       notFull.signal();
       return x;
     } finally {
       lock.unlock();
     }
   }
 }

AQS

AQS是JUC中大部分同步类(如上面说的ReentrantLock、Semaphore等)的底层实现框架。通过继承AQS并实现几个必要的方法,我们可以很容易地实现自己的同步类。简单地说,AQS通过CAS操作和一个等待队列(CLH队列的变体)来实现同步功能,如下图:

AQS基本原理
详见美团技术团队的文章:
从ReentrantLock的实现看AQS的原理及应用

线程池

  1. Executor,顶层接口,只有execute方法
  2. ExecutorService,继承自Executor,增加了shutdown和返回future的能力,关闭示例:
void shutdownAndAwaitTermination(ExecutorService pool) {
   pool.shutdown(); // Disable new tasks from being submitted
   try {
     // Wait a while for existing tasks to terminate
     if (!pool.awaitTermination(60, TimeUnit.SECONDS)) {
       pool.shutdownNow(); // Cancel currently executing tasks
       // Wait a while for tasks to respond to being cancelled
       if (!pool.awaitTermination(60, TimeUnit.SECONDS))
           System.err.println("Pool did not terminate");
     }
   } catch (InterruptedException ie) {
     // (Re-)Cancel if current thread also interrupted
     pool.shutdownNow();
     // Preserve interrupt status
     Thread.currentThread().interrupt();
   }
 }

线程池的五种状态:


线程池的状态

https://www.jianshu.com/p/03ecc5a4316c

  1. ThreadPoolExecutor,核心参数:
    • Core and maximum pool sizes
      线程数小于core size时,提交任务直接创建新线程,不管有没有idle线程;
      线程数介于core size和maximum之间时,只有没有idle线程才会创建
    • Keep-alive times
      超过core size的线程存活时间
    • 队列
      线程数大于core size后,会优先扔进队列,队列满后会新建线程直到已经达到maximumPoolSIze,这时任务会被拒绝
      常见的3种队列:
      1. 直传,如使用SynchronousQueue,不存储,消费一个放一个,一般要求无界的maximumPoolSize防止拒绝任务。这种策略防止任务之间有依赖的时候卡住;
      2. 无界队列,如LinkedBLockingQueue,此时maximumPoolSize是无效的,也不会大于corePoolSize个数的线程被创建出来;
      3. 有界队列,如ArrayBlockingQueue,queue size和maximumPoolSize更难取舍,取决于cpu占用和吞吐量的取舍
    • 拒绝任务
      队列大小和最大线程数被触达后就会执行RejectedExecutionHandler.rejectedExecution(Runnable, ThreadPoolExecutor)
      四种预置的拒绝策略包括:
      1. ThreadPoolExecutor.AbortPolicy,
        RejectedExecutionException
      2. ThreadPoolExecutor.CallerRunsPolicy
        提交线程自己执行
      3. ThreadPoolExecutor.DiscardPolicy
        丢弃
      4. ThreadPoolExecutor.DiscardOldestPolicy
        丢弃队列头任务,然后重试

总结ThreadPoolExecutor的工作流程:
当线程数小于corePoolSize时,每次提交直接创建新线程;当corePoolSize达到后,再提交会放到队列,当队列满后,继续创建线程直到maximumPoolSIze,然后就是开始拒绝任务提交

如何设置线程池的参数?

  1. Executors
    工厂类,常见的四种线程池:

    • newCachedThreadPool,可缓存线程


      newCachedThreadPool
    • newFixedThreadPool,定长,队列无界


      newFixedThreadPool
    • newScheduledThreadPool,定时,支持延迟,异常之后不会继续执行


      newScheduledThreadPool

      实现延时基于:DelayedWorkQueue,一般而言,延时队列基于优先级队列(堆)实现。
      两种用法:

      • scheduleWithFixedDelay
        固定延时,等上次执行完后,等待延时时间执行
      • scheduleAtFixedRate
        固定频率执行,如果任务执行时间超过定时,就立刻开始,否则等待固定时间到再执行
    • newSingleThreadExecutor


      newSingleThreadExecutor
  2. fork join
    传统的线程池无法处理任务直接存在依赖的情况,也就是分治。fork join可以应用在这种场景中,类似map-reduce,fork把任务拆成小任务,join合并结果,多个线程有各自的队列,当自己空闲时会去其他线程队列偷任务执行。使用要注意不要阻塞父线程使其成为监工。

Java内存模型

理解happens-before原则:因为工作线程的缓存和主内存同步问题,先行发生的线程对内存的操作未必能被后续线程观测到,而如果满足hb原则则可以保证这一点

其他

  1. ThreadLocal
    每个thread对象里有一个threadlocals属性,这是一个map,以ThreadLocal为key,set的值为value。所以对每个threadlocal,每个thread内部都有一个map存对应的值。
上一篇下一篇

猜你喜欢

热点阅读