JUC框架&多线程

2020-08-05  本文已影响0人  任笙_8b8c

5 Volatile 关键字

 private volatile boolean flag = true;

Volatile实现内存可见性的过程 线程写Volatile变量的过程: 1. 改变线程本地内存中Volatile变量副本的值; 2. 将改变后的副本的值从本地内存刷新到主内存
线程读Volatile变量的过程:

  1. 从主内存中读取Volatile变量的新值到线程的本地内存中
  2. 从本地内存中读取Volatile变量的副本
Volatile实现内存可见性原理:

       写操作时,通过在写操作指令后加入一条store屏障指令,让本地内存中变量的值能够刷新到主内存中
读操作时,通过在读操作前加入一条load屏障指令,及时读取到变量在主内存的值

如果我们用这个关键字替代 synchronized 重量级锁,那怎么保证它的原子性呢?
演示:
使用synchronized

public synchronized void addCount() {   
 for (int i = 0; i < 10000; i++) {   
     count++; 
   } 

使用ReentrantLock(可重入锁)
//可重入锁 private Lock lock = new ReentrantLock();
public void addCount() { 
   for (int i = 0; i < 10000; i++) {   
     lock.lock();    
    count++;      
  lock.unlock();    } 
}

使用AtomicInteger(原子操作)
public static AtomicInteger count = new AtomicInteger(0); public void addCount() {   
 for (int i = 0; i < 10000; i++) { 
       //count++; 
       count.incrementAndGet(); 
   } 

5.3 Volatile 适合使用场景
5.4 synchronized和volatile的区别

a)volatile不需要加锁,比synchronized更轻便,不会阻塞线程
b)synchronized既能保证可见性,又能保证原子性,而volatile只能保证可见性,和有序性,无法保证原子性
与锁相比,Volatile 变量是一种非常简单但同时又非常脆弱的同步机制,它在某些情况下将提供优于锁 的性能和伸缩性。如果严格遵循 volatile 的使用条件(变量真正独立于其他变量和自己以前的值 ) 在 某些情况下可以使用 volatile 代替 synchronized 来优化代码提升效率

6 J.U.C之CAS

J.U.C 即 java.util.concurrent 是 Java 5 中引入的,是 JSR 166 标准规范的一个实现;
内容:
我们熟悉的线程池机制就在这个包,J.U.C 框架包含的内容有:

6.1 CAS介绍悲观锁策略)
CAS原理
6.3 native关键词

前面提到了sun.misc.Unsafe这个类,里面的方法使用native关键词声明本地方法,为什么要用native

多CPU的CAS处理

CAS的缺陷

主要表现在三个方法:循环时间太长、只能保证一个共享变量原子操作、ABA问题。
//这个是ABA问题的展示
 private static AtomicInteger ai = new AtomicInteger(100);   

//解决ABA问题
 private static AtomicStampedReference air = new AtomicStampedReference(100, 1);
 
 //ABA问题演示:   
 //1. 线程1先对数据进行修改 A-B-A过程   
        public void run() {           
     ai.compareAndSet(100, 110); 
     ai.compareAndSet(110, 100);       
     }      
  });
 //2. 线程2也对数据进行修改 A-C的过程
  //AtomicStampedReference可以看到每次修改都需要设置标识Stamp,相当于进行了1A-2B3A的操作       
 //线程2进行操作的时候,虽然数值都一样,但是可以根据标识很容易的知道A是以前的1A,还是现 在的3A 
   // 预期引用:100,更新后的引用:110,预期标识getStamp() 更新后的标识 getStamp() + 1
    air.compareAndSet(100, 110, air.getStamp(), air.getStamp() + 1);               
    air.compareAndSet(110, 100, air.getStamp(), air.getStamp() + 1);      
      }  
      });
 
jvm.png

7 J.U.C之atomic包

atomic
atomic里的类主要包括:

基本类型 使用原子的方式更新基本类型

引用类型

public static void main(String[] args) throws InterruptedException {   
   User u1 = new User("张三", 22);     
  User u2 = new User("李四", 33);
       AtomicReference ar = new AtomicReference(u1);    
   ar.compareAndSet(u1, u2);
       System.out.println(ar.get());
   }

-  AtomicMarkableReference :原子更新带有标记位的引用类型(true,flase)
 public static void main(String[] args) throws InterruptedException {        
User u1 = new User("张三", 22);    
   User u2 = new User("李四", 33);
       //和AtomicStampedReference效果一样,用于解决ABA的       
      //区别是表示不是用的版本号,而只有true和false两种状态。相当于未修改和已修改  
          AtomicMarkableReference<User> amr = new AtomicMarkableReference(u1, true);     
         amr.compareAndSet(u1, u2, false, true); 

区别的是:它描述更加简单的是与否的关系。通常ABA问题只有两种状态,而AtomicStampedReference是多种状态。

数组类型 使用原子的方式更新数组里的某个元素

public class Demo9Compare {
    public static void main(String[] args) {    
    AtomicLong atomicLong = new AtomicLong(0L);       
    LongAdder longAdder = new LongAdder();
        long start = System.currentTimeMillis();    
    for (int i = 0; i < 50; i++) {        
    new Thread(new Runnable() {          
     @Override 
    public void run() {            
        for (int j = 0; j < 1000000; j++) {                        //atomicLong.incrementAndGet();                        longAdder.increment();             
       }                }       
     }).start();   
     }
        while (Thread.activeCount() > 2) {        }
        System.out.println(atomicLong.get());        System.out.println(longAdder.longValue());
        System.out.println("耗时:" + (System.currentTimeMillis() - start));
    } }
cas.png

8 J.U.C之AQS

AQS定义两种资源共享方式:

Exclusive(独占,只有一个线程能执行,如ReentrantLock) 竞争不激烈
Share(共享,多个线程可同时执行,如Semaphore/CountDownLatch)

AQS核心图解:
cas.png
图解:
出列:

首节点的线程释放同步就是将state的值-1,它的值为0表示无锁状态,然后会去唤醒它的后继节点,当后继节点拿到同步状态就是将state的值+1,后他把自己设置为首节点.这个过程不用cas来保证.

入列:

CLH队列入列,就是tail指向新节点、新节点的prev指向当前后的节点,当前后一个节点 的next指向当前节点


cas.png
总:

当前线程如果获取同步状态失败时,AQS则会将当前线程已经等待状态等信息构造成一个节点 (Node)并将其加入到CLH同步队列,同时会阻塞当前线程,当同步状态释放时,会把首节点唤醒 (公平锁),使其再次尝试获取同步状态

J.U.C之锁

9.1.1 互斥锁

在编程中,引入了对象互斥锁的概念,来保证共享数据操作的完整性。每个对象都对应于一个可称为" 互斥锁" 的标记,这个标记用来保证在任一时刻,只能有一个线程访问该对象。

9.1.2 阻塞锁

阻塞锁,是让线程进入阻塞状态进行等待(wait()等方法),当获得相应的信号(唤醒,时间) 时,才可以进入线 程的准备就绪状态,准备就绪状态的所有线程,通过竞争,进入运行状态。

9.1.3 自旋锁

自旋锁是采用让当前线程不停地的在循环体内执行实现的,当循环的条件被其他线程改变时,才能进入 临界区。
由于自旋锁只是将当前线程不停地执行循环体执行一段没有意义的代码,不进行线程状态的改变,所以响应速度更快。但当线程 数不停增加时,性能下降明显,因为每个线程都需要执行,占用CPU时间。如果线程竞争不激烈,并且 保持锁的时间段。适合使用自旋锁。

9.1.4 读写锁

读写锁实际是一种特殊的自旋锁,它把对共享资源的访问者划分成读者和写者,读者只对共享资源进行 读访问,写者则需要对共享资源进行写操作。
读写锁相对于自旋锁而言,能提高并发性,因为在多处理器系统中,它允许同时有多个读者来访问共享 资源,大可能的读者数为实际的逻辑CPU数。写者是排他性的,一个读写锁同时只能有一个写者或多 个读者(与CPU数相关),但不能同时既有读者又有写者。

9.1.5 公平锁

公平锁(Fair):加锁前检查是否有排队等待的线程,优先排队等待的线程,先来先得,

9.1.6 非公平锁(Nonfair):

加锁时不考虑排队等待问题,直接尝试获取锁,获取不到自动到队尾等待
非公平锁性能比公平锁高,因为公平锁需要在多核的情况下维护一个队列

9.2 ReentrantLock (可重入锁)

ReentrantLock,可重入锁,是一种递归无阻塞的同步机制。它可以等同于synchronized的使用,但是 ReentrantLock提供了比synchronized更强大、灵活的锁机制,可以减少死锁发生的概率。
ReentrantLock还提供了公平锁和非公平锁的选择,构造方法接受一个可选的公平参数(默认非公平 锁),当设置为true时,表示公平锁,否则为非公平锁。
实底层就是使用AQS同步队列。当同步队列的状态state == 0 时,则将锁持有线程设 置为null,free= true,表示释放成功。

public ReentrantLock() {
    //非公平锁    sync = new NonfairSync();
 }
public ReentrantLock(boolean fair) {  
  //公平锁    sync = fair ? new FairSync() : new NonfairSync();
 }
9.2.3 公平锁与非公平锁原理
public final boolean hasQueuedPredecessors() {   
 Node t = tail;  //尾节点  
  Node h = head;  //头节点  
  Node s;
    //头节点 != 尾节点  
  //同步队列第一个节点不为null 
   //当前线程是同步队列第一个节点 
   return h != t &&       
 ((s = h.next) == null || s.thread != Thread.currentThread()); 
}

该方法主要做一件事情:
主要是判断当前线程是否位于CLH同步队列中的第一个。如果是则返回true, 否则返回false。

9.2.4 ReentrantLock与synchronized的区别

到ReentrantLock提供了比synchronized更加灵活和强大的锁机制,那么它的灵活和强大之处在 哪里呢?
他们之间又有什么相异之处呢?

9.3 读写锁ReentrantReadWriteLock

可重入锁ReentrantLock是互斥锁,互斥锁在同一时刻仅有一个线程可以进行访问,但是在大多数场景 下,大部分时间都是提供读服务,而写服务占有的时间较少。然而读服务不存在数据竞争问题,如果一 个线程在读时禁止其他线程读势必会导致性能降低。所以就提供了读写锁。
读写锁维护着一对锁,一个读锁和一个写锁。通过分离读锁和写锁,使得并发性比一般的互斥锁有了较 大的提升:在同一时间可以允许多个读线程同时访问,但是在写线程访问时,所有读线程和写线程都会 被阻塞。
读写锁的主要特性:

  1. 公平性:支持公平性和非公平性。
  2. 重入性:支持重入。读写锁多支持65535个递归写入锁和65535个递归读取锁。
  3. 锁降级:写锁能够降级成为读锁,遵循获取写锁、获取读锁在释放写锁的次序。读锁不能升级为写 锁。

读写锁ReentrantReadWriteLock实现接口ReadWriteLock,该接口维护了一对相关的锁,一个用于只 读操作,另一个用于写入操作。只要没有 writer,读取锁可以由多个 reader 线程同时保持。写入锁是 独占的。
ReadWriteLock定义了两个方法。readLock()返回用于读操作的锁,writeLock()返回用于写操作的锁。

J.U.C之Condition

Condition提供了一系列的方法来对阻塞和唤醒线程:
  1. await() :造成当前线程在接到信号或被中断之前一直处于等待状态。
  2. await(long time, TimeUnit unit) :造成当前线程在接到信号、被中断或到达指定等待时间之前 一直处于等待状态。
  3. awaitNanos(long nanosTimeout) :造成当前线程在接到信号、被中断或到达指定等待时间之 前一直处于等待状态。返回值表示剩余时间,如果在nanosTimesout之前唤醒,那么返回值 = nanosTimeout – 消耗时间,如果返回值 <= 0 ,则可以认定它已经超时了。
  4. awaitUninterruptibly() :造成当前线程在接到信号之前一直处于等待状态。【注意:该方法对 中断不敏感】。
  5. awaitUntil(Date deadline) :造成当前线程在接到信号、被中断或到达指定后期限之前一直 处于等待状态。如果没有到指定时间就被通知,则返回true,否则表示到了指定时间,返回返回 false。
  6. signal():唤醒一个等待线程。该线程从等待方法返回前必须获得与Condition相关的锁。
  7. signal()All:唤醒所有等待线程。能够从等待方法返回的线程必须获得与Condition相关的锁。 Condition是一种广义上的条件队列(等待队列)。他为线程提供了一种更为灵活的等待/通知模 式,线程在调用await方法后执行挂起操作,直到线程等待的某个条件为真时才会被唤醒。 Condition必须要配合锁一起使用,因为对共享状态变量的访问发生在多线程环境下。一个 Condition的实例必须与一个Lock绑定,因此Condition一般都是作为Lock的内部实现。
10.2.2 等待状态

调用Condition的await()方法会使当前线程进入等待状态,同时会加入到Condition等待队列同时释放 锁。当从await()方法返回时,当前线程一定是获取了Condition相关连的锁。

10.2.3 通知

调用Condition的signal()方法,将会唤醒在等待队列中等待长时间的节点(条件队列里的首节点), 在唤醒节点前,会将节点移到CLH同步队列中中


11 J.U.C之并发工具类

11.1 CyclicBarrie(同步屏障可理解为栅栏)

CyclicBarrier也叫同步屏障,在JDK1.5被引入的一个同步辅助类,
官方解释:允许一组线程全部等待彼此达到共同屏障点的同步辅助。 循环阻塞在涉及固定大小的线程方的程序中很有用,这 些线程必须偶尔等待彼此。 屏障被称为循环,因为它可以在等待的线程被释放之后重新使用。

CyclicBarrier自己理解: 它的屏障就好比一扇门,当所有人都到达了这扇门时才会开门一起进去. 1.png
原理分析:

它有俩个构造方法:
CyclicBarrier(int parties):它将在给定数量的参与者(线程)处于等待状态时启动,但它不会在 启动屏障时执行预定义的操作。parties表示拦截线程的数量。
CyclicBarrier(int parties, Runnable barrierAction) :创建一个新的 CyclicBarrier,它将在给定数 量的线程处于等待状态时启动,并在启动屏障时执行给定的屏障操作,该操作由后一个进入屏障的线程执行。
await()方法的逻辑:如果该线程不是到达的后一个线程,则他会一直处于等待状态,除非发生以下情 况:
1. 后一个线程到达,即index == 0
2. 超出了指定时间(超时等待)
3. 其他的某个线程中断当前线程
4. 其他的某个线程中断另一个等待的线程
5. 其他的某个线程在等待屏障超时 .
6. 其他的某个线程在此屏障调用reset()方法。reset()方法用于将屏障重置为初始状态。

案例介绍:
/*
        *CyclicBarrier案例
        *叫同步屏障
        **/
        publicclassDemo1CyclicBarrier{
        
        publicstaticvoidmain(String[]args){
        //指定屏障发行的数量
        CyclicBarriercyclicBarrier=newCyclicBarrier(5);
        List<Thread>threadList=newArrayList<>();
        for(inti=0;i<5;i++){
        Threadt=newThread(newAthlete(cyclicBarrier,"运动员"+i));
        threadList.add(t);
        }
        for(Threadt:threadList){
        t.start();
        }
        }
        
        staticclassAthleteimplementsRunnable{
        
        privateCyclicBarriercyclicBarrier;
        privateStringname;
        
        publicAthlete(CyclicBarriercyclicBarrier,Stringname){
        this.cyclicBarrier=cyclicBarrier;
        this.name=name;
        }
        
        @Override
        publicvoidrun(){
        System.out.println(name+"就位");
        try{
        //屏障,栅栏所有线程都准备好达到预期的数量放可放行
        cyclicBarrier.await();
        System.out.println(name+"跑到终点。");
        }catch(Exceptione){
        
        }
        }
        }
        }
CountDownLatch :

CountDownLatch仅提供了一个构造方法: CountDownLatch(int count) : 构造一个用给定计数初始化的 CountDownLatch
public CountDownLatch(int count) {
if (count < 0) throw new IllegalArgumentException("count < 0");
this.sync = new Sync(count);
sync为CountDownLatch的一个内部类,通过这个内部类Sync可以知道CountDownLatch是采用共享锁 来实现的。长用的两个方法是await()和countDown():
CountDownLatch提供await()方法来使当前线程在锁存器倒计数至零之前一直等待,除非线程被 中断。内部使用AQS的getState方法获取计数器,如果计数器值不等于0,则会以自旋方式会尝试 一直去获取同步状态。
CountDownLatch提供countDown() 方法递减锁存器的计数,如果计数到达零,则释放所有等待 的线程。内部调用AQS的releaseShared(int arg)方法来释放共享锁同步状态。

案例:
public class Demo2CountDownLatch {
    public static void main(String[] args) {    
    CyclicBarrier cyclicBarrier = new CyclicBarrier(5);
        List<Thread> threadList = new ArrayList<>();    
    for (int i = 0; i < 5; i++) {     
       CountDownLatch countDownLatch = new CountDownLatch(1);  
          //起点运动员         
   Thread t1 = new Thread(new Athlete(cyclicBarrier, countDownLatch, "起点运动员" + i));
            //接力运动员     
       Thread t2 = new Thread(new Athlete(countDownLatch, "接力运动员" + i));
            threadList.add(t1);     
       threadList.add(t2);        }
        for (Thread t : threadList) {    
        t.start();      
  }  
  }
    static class Athlete implements Runnable {
        private CyclicBarrier cyclicBarrier;    
    private String name;
        CountDownLatch countDownLatch; 
        //起点运动员     
   public Athlete(CyclicBarrier cyclicBarrier, CountDownLatch countDownLatch, String name) {       
     this.cyclicBarrier = cyclicBarrier;      
      this.countDownLatch = countDownLatch;   
         this.name = name;   
     }
        //接力运动员   
     public Athlete(CountDownLatch countDownLatch, String name) {  
          this.countDownLatch = countDownLatch;        
    this.name = name;    
    }
        @Override    
    public void run() {       
     //判断是否是起点运动员       
     if (cyclicBarrier != null) {

                System.out.println(name + "就位"); 
               try {           
         cyclicBarrier.await();   
                 System.out.println(name + "到达交接点。");
                    //已经到达交接点                 
   countDownLatch.countDown();       
         } catch (Exception e) {    
            }       
     }
            //判断是否是接力运动员     
       if (cyclicBarrier == null) {     
           System.out.println(name + "就位");       
         try {                  
  countDownLatch.await();   
                 System.out.println(name + "到达终点。"); 
               } catch (Exception e) {       
         }        
    }    
    } 
   } 
}

Semaphore 信号量

public void acquire() throws InterruptedException {   
 sync.acquireSharedInterruptibly(1);
 }
[图片上传中...(图片.png-c689a2-1597328192438-0)]


release()来释放许可。

public void release() {  
  sync.releaseShared(1);
 }

J.U.C之并发容器ConcurrentHashMap

HashMap: 是线程不安全的集合
JDK7的hashMap:
map.png
扩容原理

扩容方法 数组长度直接扩容至原来的两倍,,hashMap中元素的存放位置取决于元素哈希值与数组长度-1的位与&运算,默认情况下,长度为16,即后四位1111与元素hash值的比较,在扩容之后,长度为32,将第五位也变为1(11111=31),将各个元素与新长度进行位与&运算只需观察元素hash值和数组长度-1的第五位值的比较,为1就保持原地址不动,为0就会将该元素地址索引增加16移至新空间,如此扩容之后平分原有元素。
在jdk1.8版本之后,hashMap底层加入了红黑树机制,当链表的长度大于8,数组长度大于64时,链表将会转换为红黑树(不过链表的形式依然存在,便于后期扩容或者删减之后该红黑树退化成链表),因为从时间复杂度来说,红黑树优于链表,便于查询,不过红黑树的节点占据空间是普通节点两倍,只有在节点足够多时才会采用红黑树,当链表节点超过8的时候几率大约千分之一,链表性能很差了,采用红黑树是很好的应对策略.当红黑树节点小于6时,又会转换为链表
Hashmap最多存储键值对 1<<30(230),应该为int值最大值(231-1)之下,又是2的幂次数.

根据 key 计算 hash 值。
根据hash值找到相应的数组下标。
遍历该数组位置处的链表,直到找到相等的 key。

JDK8 HashMap
  1. 计算 key 的 hash 值,根据 hash 值找到对应数组下标。
  2. 判断数组该位置处的元素是否刚好就是我们要找的,如果不是,走第三步。
  3. 判断该元素类型是否是 TreeNode,如果是,用红黑树的方法取数据,如果不是,走第四步。
  4. 遍历链表,直到找到相等(==或equals)的 key
1.8中更换了数据插入的顺序修复上述问题

12.3 JDK7 ConcurrentHashMap

public ConcurrentHashMap(int initialCapacity, float loadFactor, int concurrencyLevel)
 初始化方法
  • put过程
    根据 hash 值能找到相应的 Segment,之后就是 Segment 内部的 put 操作了。 Segment 内部是由 数组+链表 组成的,由于有独占锁的保护,所以 segment 内部的操作并不复 杂。保证多线程安全的,就是做了一件事,那就是获取该 segment 的独占锁。 Segment 数组不能扩容,rehash方法扩容是 segment 数组某个位置内部的数组 HashEntry[] 进 行扩容,扩容后,容量为原来的 2 倍。
    get过程
    计算 hash 值,找到 segment 数组中的具体位置 segment 数组中也是数组,再根据 hash 找到数组中具体值的位置 到这里是链表了,顺着链表进行查找即可
JDK8 ConcurrentHashMap :
ConcurrentHashMap总结:
Hashtable和ConcurrentHashMap的不同点:
上一篇下一篇

猜你喜欢

热点阅读