Java

并发编程

2019-10-30  本文已影响0人  强某某

什么是并发编程

并发历史: 早期计算机--从头到尾执行一个程序,资源浪费​ 操作系统出现--计算机能运行多个程序,不同的程序在不同的单独的进程中运行

一个进程,有多个线程​ 提高资源的利用率,公平

串行:洗茶具、打水、烧水、等水开、冲茶​ 并行:打水、烧水同时洗茶具、水开、冲茶

好处:可以缩短整个流程的时间

摩尔定律:当价格不变时,集成电路上可容纳的元器件的数目,约每隔18-24个月便会增加一倍,性能也将提升一倍。这一定律揭示了信息技术进步的速度。​ 让程序充分利用计算机资源​ 加快程序响应速度(耗时任务、web服务器)​ 简化异步事件的处理

任务会阻塞线程,导致之后的代码不能执行:比如一边从文件中读取,一边进行大量计算的情况 任务执行时间过长,可以划分为分工明确的子任务:比如分段下载 任务间断性执行:日志打印 任务本身需要协作执行:比如生产者消费者问题

上下文切换

无锁并发编程。多线程竞争锁时,会引起上下文切换,所以多线程处理数据时,可以用一些办法来避免使用锁,如将数据的ID按照Hash算法取模分段,不同的线程处理不同段的数据

Java的Atomic包使用CAS算法来更新数据,而不需要加锁。使用最少线程

避免创建不需要的线程,比如任务很少,但是创建了很多线程来处理,这样会造成大量线程都处于等待状态

在单线程里实现多任务的调度,并在单线程里维持多个任务间的切换。--GO

//死锁
public class DeadLockDemo {
   public static final Object HAIR_A=new Object();
   public static final Object HAIR_B=new Object();

   public static void main(String[] args) {
       new Thread(()->{
           synchronized (HAIR_A) {
               try {
                   Thread.sleep(50);
               } catch (InterruptedException e) {
                   e.printStackTrace();
               }
               synchronized (HAIR_B) {
                   System.out.println("A-B");
               }
           }
       }).start();
       new Thread(()->{
           synchronized (HAIR_B) {
               synchronized (HAIR_A) {
                   System.out.println("B-A");
               }
           }
       }).start();
   }
}
package com.xdclass.synopsis;
       import java.util.concurrent.CountDownLatch;
       /**
        * 线程不安全操作代码实例
        */
       public class UnSafeThread {
​
           private static int num = 0;
​
           private static CountDownLatch countDownLatch = new CountDownLatch(10);
​
           /**
            * 每次调用对num进行++操作
            */
           public static void inCreate() {
               num++;
           }
​
           public static void main(String[] args) {
               for (int i = 0; i < 10; i++) {
                   new Thread(()->{
                       for (int j = 0; j < 100; j++) {
                           inCreate();
                           try {
                               Thread.sleep(10);
                           } catch (InterruptedException e) {
                               e.printStackTrace();
                           }
                       }
                       //每个线程执行完成之后,调用countdownLatch
                       countDownLatch.countDown();
                   }).start();
               }
​
               while (true) {
                   if (countDownLatch.getCount() == 0) {
                       System.out.println(num);
                       break;
                   }
               }
           }
       }

线程

线程与进程的区别

线程的状态及其相互转换

创建线程的方式

实际开发中,选第2种:java只允许单继承​ 增加程序的健壮性,代码可以共享,代码跟数据独立

public class MyThread extends Thread {
    @Override
    public void run() {
        super.run();
        System.out.println(Thread.currentThread().getName());
    }

    public static void main(String[] args) {
        MyThread myThread=new MyThread();
        myThread.setName("thread");
        myThread.start();
    }
}
public class MyRunable implements Runnable {
    @Override
    public void run() {
        System.out.println(Thread.currentThread().getName());
    }

    public static void main(String[] args) {
       Thread thread=new Thread(new MyRunable());
       thread.setName("runable");
       thread.start();
//       thread.run();  是在当前的main线程运行,而不是启动子线程
    }
}
public class MyThread {
    public static void main(String[] args) {
        //匿名内部类
        Thread thread=new Thread(new Runnable() {
            @Override
            public void run() {
                System.out.println(Thread.currentThread().getName());
            }
        });
        thread.start();
    }
}
public class MyThread1 {
    public static void main(String[] args) {
        //lambda表达式
        Thread thread=new Thread(() -> System.out.println(Thread.currentThread().getName()));
        thread.start();
    }
}
public class ThreadPool {
    //线程池
    public static void main(String[] args) {
        ExecutorService executorService = Executors.newSingleThreadExecutor();
        executorService.execute(()->{
            System.out.println(Thread.currentThread().getName());
        });
    }
}

线程的挂起跟恢复

cpu分配的时间片非常短、同时也非常珍贵。避免资源的浪费。

被废弃的方法​ thread.suspend() 该方法不会释放线程所占用的资源。如果使用该方法将某个线程挂起,则可能会使其他等待资源的线程死锁​ thread.resume() 方法本身并无问题,但是不能独立于suspend()方法存在​ 可以使用的方法​ wait() 暂停执行、放弃已经获得的锁、进入等待状态​ notify() 随机唤醒一个在等待锁的线程​ notifyAll() 唤醒所有在等待锁的线程,自行抢占cpu资源,但是此处抢占指的是优先顺序,实际上每个最终都会执行,而notify之后随机唤醒一个。

我等的船还不来(等待某些未就绪的资源),我等的人还不明白。直到notify方法被调用

线程的中断操作

//不停输出,大概两秒后停止程序结束
public class Demo implements Runnable {
    @Override
    public void run() {
        while (!Thread.currentThread().isInterrupted()) {
            System.out.println(Thread.currentThread().getName());
        }
    }

    public static void main(String[] args) throws InterruptedException {
        Thread thread=new Thread(new Demo());
        thread.start();
        Thread.sleep(2000L);
        thread.interrupt();
    }
}

线程的优先级

快速处理:设置高的优先级​ 慢慢处理:设置低的优先级

public class PriorityDemo {

   public static void main(String[] args) {
       Thread thread = new Thread(() -> {
           while (true) {
               System.out.println(Thread.currentThread().getName());
           }
       }, "线程1");

       Thread thread2 = new Thread(() -> {
           while (true) {
               System.out.println(Thread.currentThread().getName());
           }
       }, "线程2");

       thread.setPriority(Thread.MIN_PRIORITY);
       thread2.setPriority(Thread.MAX_PRIORITY);

       thread.start();
       thread2.start();
   }
}

守护线程

用户线程、守护线程​ 守护线程:任何一个守护线程都是整个程序中所有用户线程的守护者,只要有活着的用户线程,守护线程就活着。当JVM实例中最后一个非守护线程结束时,也随JVM一起退出

public class DaemonThreadDemo implements Runnable{
   @Override
   public void run() {
       while (true) {
           System.out.println(Thread.currentThread().getName());
           try {
               Thread.sleep(1000L);
           } catch (InterruptedException e) {
               e.printStackTrace();
           }
       }
   }
   public static void main(String[] args) throws InterruptedException {
       Thread thread = new Thread(new DaemonThreadDemo());
       thread.start();
       thread.setDaemon(true);
       Thread.sleep(2000L);
   }
}

线程安全问题

什么是线程安全性?

多线程并发访问时,得不到正确的结果。

从字节码角度剖析线程不安全操作

原子操作

一个操作或者多个操作 要么全部执行并且执行的过程不会被任何因素打断,要么就都不执行。​ A想要从自己的帐户中转1000块钱到B的帐户里。那个从A开始转帐,到转帐结束的这一个过程,称之为一个事务。在这个事务里,要做如下操作:​ 从A的帐户中减去1000块钱。如果A的帐户原来有3000块钱,现在就变成2000块钱了。​ 在B的帐户里加1000块钱。如果B的帐户如果原来有2000块钱,现在则变成3000块钱了。如果在A的帐户已经减去了1000块钱的时候,忽然发生了意外,比如停电什么的,导致转帐事务意外终止了,而此时B的帐户里 还没有增加1000块钱。那么,我们称这个操作失败了,要进行回滚。回滚就是回到事务开始之前的状态,也就是回到A的帐户还没减1000块的状态,B的帐户的原来的状态。此时A的帐户仍然有3000块,B的帐户仍然有 2000块。通俗点讲:操作要成功一起成功、要失败大家一起失败

深入理解synchronized

每个java对象都可以用做一个实现同步的锁,这些锁称为内置锁。线程进入同步代码块或方法的时候会自动获得该锁,在退出同步代码块或方法时会释放该锁。获得内置锁的唯一途径就是进入这个锁的保护的同步代码块或方法。

内置锁是一个互斥锁,这就是意味着最多只有一个线程能够获得该锁,当线程A尝试去获得线程B持有的内置锁时,线程A必须等待或者阻塞,直到线程B释放这个锁,如果B线程不释放这个锁,那么A线程将永远等待下去。

volatile关键字及其使用场景

volatile可见性原理

public class Test {
    private volatile int a;
    public void update() {
        a = 1;
    }
    public static void main(String[] args) {
        Test test = new Test();
        test.update();
    }
}
  0x0000000002951563: and    $0xffffffffffffff87,%rdi
  0x0000000002951567: je     0x00000000029515f8
  0x000000000295156d: test   $0x7,%rdi
  0x0000000002951574: jne    0x00000000029515bd
  0x0000000002951576: test   $0x300,%rdi
  0x000000000295157d: jne    0x000000000295159c
  0x000000000295157f: and    $0x37f,%rax
  0x0000000002951586: mov    %rax,%rdi
  0x0000000002951589: or     %r15,%rdi
  0x000000000295158c: lock cmpxchg %rdi,(%rdx)  //在 volatile 修饰的共享变量进行写操作的时候会多出 lock 前缀的指令
  0x0000000002951591: jne    0x0000000002951a15
  0x0000000002951597: jmpq   0x00000000029515f8
  0x000000000295159c: mov    0x8(%rdx),%edi
  0x000000000295159f: shl    $0x3,%rdi
  0x00000000029515a3: mov    0xa8(%rdi),%rdi
  0x00000000029515aa: or     %r15,%rdi

volatile 有序性实现

volatile 的 happens-before 关系

//假设线程A执行writer方法,线程B执行reader方法
class VolatileExample {
    int a = 0;
    volatile boolean flag = false;
    
    public void writer() {
        a = 1;              // 1 线程A修改共享变量
        flag = true;        // 2 线程A写volatile变量
    } 
    
    public void reader() {
        if (flag) {         // 3 线程B读同一个volatile变量
        int i = a;          // 4 线程B读共享变量
        ……
        }
    }
}

volatile 禁止重排序

3.png 4.png

单例与线程安全

在类加载的时候,就已经进行实例化,无论之后用不用到。如果该类比较占内存,之后又没用到,就白白浪费了资源。

/**
 * 饿汉式单例
 */
public class SingleOne {
    private static SingleOne singleOne=new SingleOne();
    private SingleOne(){}

    public static SingleOne getInstance() {
        return singleOne;
    }

    public static void main(String[] args) {
        for (int i = 0; i < 10; i++) {
            new Thread(()->{
                System.out.println(SingleOne.getInstance());
            }).start();
        }
    }
}

在需要的时候再实例化

/**
 * 双重检测锁
 */
public class SingleTwo {

    //volatile是为了避免jvm的指令重排,此时修饰singleTwo,则很多singleTwo的判断不会指令重排合并判断
    private static volatile SingleTwo singleTwo = null;

    private SingleTwo() {}

    private static SingleTwo getInstance() {
        if (singleTwo == null) {
//            try {
//                Thread.sleep(100L);
//            } catch (InterruptedException e) {
//                e.printStackTrace();
//            }
            synchronized (SingleTwo.class) {
                if (singleTwo == null) {
                    //singleTwo = new SingleTwo();这句,这并非是一个原子操作,事实上在 JVM 中这句话大概做了下面 3 件事情:
                    /**
                     * 给 singleton 分配内存
                     * 调用 SecondSingleton的构造函数来初始化成员变量
                     * 将singleton 对象指向分配的内存空间(执行完这步 singleton 就为非 null 了)
                     */
                    singleTwo = new SingleTwo();
                }
            }
        }
        return singleTwo;
    }

    public static void main(String[] args) {
        for (int i = 0; i < 10; i++) {
            new Thread(() -> {
                System.out.println(SingleTwo.getInstance());
            }).start();
        }
    }
}

如何避免线程安全性问题

打破成因中三点任意一点 1:多线程环境--将多线程改单线程(必要的代码,加锁访问) 2:多个线程操作同一共享资源--不共享资源(ThreadLocal、不共享、操作无状态化、不可变) 3:对该共享资源进行了非原子性操作-- 将非原子性操作改成原子性操作(加锁、使用JDK自带的原子性操作的类、JUC提供的相应的并发工具类)

锁的分类

这些锁分类是根据不同定义的大分类,实际案例中,一个锁可能有多个"身份"

深入理解Lock接口

lock与synchronized的区别

案例

public class UnSafeThread {
    public static int num = 0;
    private static CountDownLatch countDownLatch = new CountDownLatch(10);
    //非公平锁,推荐
    private static Lock lock = new ReentrantLock();
    //公平锁
    // private static Lock lock = new ReentrantLock(false);
    public static  void inCreate() {
        lock.lock();
        num++;
        lock.unlock();
    }
    public static void main(String[] args) {
        for (int i = 0; i < 10; i++) {
            new Thread(() -> {
                for (int j = 0; j < 100; j++) {
                    inCreate();
                    try {
                        Thread.sleep(10);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
                countDownLatch.countDown();
            }).start();
        }

            try {
                countDownLatch.await();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            //此处正常输出1000
            System.out.println(num);
    }
}
/**
 * 自定义锁可重入锁,但是只是简单测试,理解很简化很简化的可重入锁的实现机制
 * 多线程环境下,一堆问题,官方jdk实现,更多借助cas保证原子性,此demo只理解即可
 */
public class MyLock1 implements Lock {


    private  boolean isHoldLock = false;
    private  Thread holdLockThread = null;
    private  int  reentryCount = 0;

    /**
     * 用一个时刻能切仅能又一个线程获取到锁,其他线程,只能等待该线程释放锁之后才能获取到锁
     */
    @Override
    public synchronized void lock() {
        System.out.println(isHoldLock+"...."+(Thread.currentThread() != holdLockThread));
        if (isHoldLock && Thread.currentThread() != holdLockThread) {
            try {
                wait();
//                System.out.println(reentryCount+"....."+Thread.currentThread().getName());
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        holdLockThread = Thread.currentThread();
        reentryCount++;
        isHoldLock = true;
//        System.out.println(reentryCount+"----"+(isHoldLock && Thread.currentThread() != holdLockThread));
    }

    @Override
    public synchronized void unlock() {
        System.out.println(Thread.currentThread() == holdLockThread);
        if (Thread.currentThread() == holdLockThread) {
            reentryCount--;
            if (reentryCount == 0) {
                notify();//随机唤醒
                isHoldLock = false;
            }
        }
//        System.out.println(reentryCount+"----释放");
    }

    @Override
    public void lockInterruptibly() throws InterruptedException {

    }

    @Override
    public boolean tryLock() {
        return false;
    }

    @Override
    public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
        return false;
    }


    @Override
    public Condition newCondition() {
        return null;
    }
}
public class ReentryDemo {
    public Lock lock=new MyLock1();
    public void methodA() {
        lock.lock();
        System.out.println("A");
        methodB();
        lock.unlock();
    }

    public void methodB() {
        lock.lock();
        System.out.println("B");
        lock.unlock();
    }
        //此时是单线程测试可重入锁
    public static void main(String[] args) {
        ReentryDemo reentryDemo = new ReentryDemo();
        reentryDemo.methodA();
    }

}

AbstractQueuedSynchronizer浅析

AbstractQueuedSynchronizer -- 为实现依赖于先进先出 (FIFO) 等待队列的阻塞锁和相关同步器(信号量、事件,等等)提供一个框架。​ 此类的设计目标是成为依靠单个原子 int 值来表示状态的大多数同步器的一个有用基础。​ 子类必须定义更改此状态的受保护方法,并定义哪种状态对于此对象意味着被获取或被释放。​ 假定这些条件之后,此类中的其他方法就可以实现所有排队和阻塞机制。子类可以维护其他状态字段,但只是为了获得同步而只追踪使用 getState()、setState(int) 和 compareAndSetState(int, int) 方法来操作以原子方式更新的 int 值。​ 应该将子类定义为非公共内部帮助器类,可用它们来实现其封闭类的同步属性。类 AbstractQueuedSynchronizer 没有实现任何同步接口。而是定义了诸如 acquireInterruptibly(int) 之类的一些方法,在适当的时候可以通过具体的锁和相关同步器来调用它们,以实现其公共方法。

此类支持默认的独占 模式和共享 模式之一,或者二者都支持。处于独占模式下时,其他线程试图获取该锁将无法取得成功。在共享模式下,多个线程获取某个锁可能(但不是一定)会获得成功。此类并不“了解”这些不同,除了机械地意识到当在共享模式下成功获取某一锁时,下一个等待线程(如果存在)也必须确定自己是否可以成功获取该锁。处于不同模式下的等待线程可以共享相同的 FIFO 队列。通常,实现子类只支持其中一种模式,但两种模式都可以在(例如)ReadWriteLock 中发挥作用。只支持独占模式或者只支持共享模式的子类不必定义支持未使用模式的方法。

此类通过支持独占模式的子类定义了一个嵌套的 AbstractQueuedSynchronizer.ConditionObject 类,可以将这个类用作 Condition 实现。isHeldExclusively() 方法将报告同步对于当前线程是否是独占的;使用当前 getState() 值调用 release(int) 方法则可以完全释放此对象;如果给定保存的状态值,那么 acquire(int) 方法可以将此对象最终恢复为它以前获取的状态。没有别的 AbstractQueuedSynchronizer 方法创建这样的条件,因此,如果无法满足此约束,则不要使用它。AbstractQueuedSynchronizer.ConditionObject 的行为当然取决于其同步器实现的语义。

此类为内部队列提供了检查、检测和监视方法,还为 condition 对象提供了类似方法。可以根据需要使用用于其同步机制的 AbstractQueuedSynchronizer 将这些方法导出到类中。

此类的序列化只存储维护状态的基础原子整数,因此已序列化的对象拥有空的线程队列。需要可序列化的典型子类将定义一个 readObject 方法,该方法在反序列化时将此对象恢复到某个已知初始状态。​

  tryAcquire(int)
        tryRelease(int)
        tryAcquireShared(int)
        tryReleaseShared(int)
        isHeldExclusively()
            Acquire:
             while (!tryAcquire(arg)) {
                    enqueue thread if it is not already queued;
                    possibly block current thread;
                 }
​
            Release:
                   if ((arg))
                        unblock the first queued thread;

读写锁特性及ReentrantReadWriteLock的使用

/**
 * 锁降级Demo
 */
public class LockDegradeDemo {

    private int i = 0;

    private ReadWriteLock readWriteLock =  new ReentrantReadWriteLock();
    Lock readLock = readWriteLock.readLock();
    Lock writeLock = readWriteLock.writeLock();

    public void doSomething() {
        writeLock.lock();
        try {
            i++;
            //获取读锁必须在这里,如果放到读锁解锁后在获取,其实就相当于出让了cpu,结果这样一读一写的情况下,输出一般都是错误的
            readLock.lock();
        }finally {
            writeLock.unlock();
        }

        try {
            //模拟其他复杂的操作
            Thread.sleep(2000L);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        try {
            if (i == 1) {
                System.out.println("i的值是======》1");
            } else {
                System.out.println("i的值是"+i);
            }
        } finally {
            readLock.unlock();
        }

    }

    public static void main(String[] args) {
        LockDegradeDemo lockDegradeDemo = new LockDegradeDemo();
        for (int i = 0; i < 4; i++) {
            new Thread(()->{
                lockDegradeDemo.doSomething();
            }).start();
        }
    }
}

源码探秘之AQS如何用单一int值表示读写两种状态

int 是32位,将其拆分成两个无符号short
    高位表示读锁          低位表示写锁
    0000000000000000   0000000000000000
​
两种锁的最大次数均为65535也即是2的16次方减去1
    读锁: 每次都从当前的状态加上65536
    0000000000000000   0000000000000000
    0000000000000001   0000000000000000
    -----------------------------------
    0000000000000001   0000000000000000
    0000000000000001   0000000000000000
    -----------------------------------
    0000000000000010   0000000000000000
​
    获取读锁个数,将state整个无符号右移16位就可得出读锁的个数
                       0000000000000001 
​
    写锁:每次都直接加1
    0000000000000000   0000000000000000
    0000000000000000   0000000000000001
    -----------------------------------
    0000000000000000   0000000000000001
​
    获取写锁的个数
    0000000000000000   0000000000000001
    0000000000000000   1111111111111111   
    ----------------------------------- 
    0000000000000000   0000000000000001

其实这种读写合一的锁,是通过高低位来分区读写锁的

锁降级详解

注意点:锁降级之后,写锁并不会直接降级成读锁,不会随着读锁的释放而释放,因此需要显式地释放写锁

在ReentrantReadWriteLock里面,不存在锁升级这一说法,因为锁升级存在,则多个读锁升级写锁,并发出问题

用于对数据比较敏感,需要在对数据修改之后,获取到修改后的值,并进行接下来的其他操作

所谓锁降级:其实就是没有释放锁,在写锁释放前需要先获取读锁,这样才是锁降级,如果写锁释放之后获取读锁,则不是锁降级;降级其实可以简单的理解为,readLock.lock();把读锁+1,在下面writeLock.unlock();时候,根据线程标记和读锁个数对应关系会把一些标记位改为当前线程是读锁的线程,从而实现下面代码紧接着可读,其实根本没有释放锁。而且,注意,锁有多种,而且锁很多在源码级别也只是要一些判断关系,甚至多种锁其实有些都是通过判断来区分最终效果的。

/**
 * 锁降级Demo
 */
public class LockDegradeDemo {

    private int i = 0;

    private ReadWriteLock readWriteLock =  new ReentrantReadWriteLock();
    Lock readLock = readWriteLock.readLock();
    Lock writeLock = readWriteLock.writeLock();

    public void doSomething() {
        writeLock.lock();
        try {
            i++;
            //获取读锁必须在这里,如果放到读锁解锁后在获取,其实就相当于出让了cpu,结果这样一读一写的情况下,输出一般都是错误的
            readLock.lock();
        }finally {
            writeLock.unlock();
        }

        try {
            //模拟其他复杂的操作
            Thread.sleep(2000L);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        try {
            if (i == 1) {
                System.out.println("i的值是======》1");
            } else {
                System.out.println("i的值是"+i);
            }
        } finally {
            readLock.unlock();
        }

    }

    public static void main(String[] args) {
        LockDegradeDemo lockDegradeDemo = new LockDegradeDemo();
        for (int i = 0; i < 4; i++) {
            new Thread(()->{
                lockDegradeDemo.doSomething();
            }).start();
        }
    }
}
public class StampeLock {
    private double x,y;
    public  final StampedLock sl=new StampedLock();

    //排他锁,读锁
    void move(double deltaX,double deltaY) {
        long stamp=sl.writeLock();
        try {
            x+=deltaX;
            y+=deltaY;
        }finally {
            sl.unlockWrite(stamp);
        }
    }

    //乐观读锁
    double distanceFromOrigin() {
        //尝试获取乐观锁
        long stamp=sl.tryOptimisticRead();
        //将全部变量拷贝到方法体栈内
        double currentX=x,currentY=y;
        //检查在(1)获取到读锁票据后,锁有没有被其他写线程排他性抢占
        if (sl.validate(stamp)) {
            //如果被抢占则获取一个共享读锁(悲观获取)
            stamp=sl.readLock();
            try {
                currentX=x;
                currentY=y;
            }finally {
                sl.unlockRead(stamp);
            }
        }
        return Math.sqrt(currentX*currentX+currentY*currentY);
    }

    //使用悲观锁获取读数据,并尝试转换为写锁
    void moveIfAtOrigin(double newX,double newY) {
        //这里可以使用乐观读锁替换
        long stamp=sl.readLock();
        try {
            //如果当前点在原点移动
            while (x==0.0&&y==0.0) {
                //尝试将获取的读锁升级为写锁
                long ws=sl.tryConvertToWriteLock(stamp);
                //升级成功,则更新票据,并设置坐标值,然后退出循环
                if (ws != 0L) {
                    stamp=ws;
                    x=newX;
                    y=newY;
                    break;
                }else{
                    //读锁升级写锁失败则释放读锁,显示获取独占写锁,然后循环重试
                    sl.unlockRead(stamp);
                    stamp=sl.writeLock();
                }
            }
        }finally {
            //释放锁
            sl.unlock(stamp);
        }
    }

    public static void main(String[] args) {

    }
}

一般应用,都是读多写少,ReentrantReadWriteLock 因读写互斥,故读时阻塞写,因而性能上上不去。可能会使写线程饥饿

所有获取锁的方法,都返回一个邮戳(Stamp),Stamp为0表示获取失败,其余都表示成功;​ 所有释放锁的方法,都需要一个邮戳(Stamp),这个Stamp必须是和成功获取锁时得到的Stamp一致;​ StampedLock是不可重入的;(如果一个线程已经持有了写锁,再去获取写锁的话就会造成死锁)​ 支持锁升级跟锁降级​ 可以乐观读也可以悲观读​ 使用有限次自旋,增加锁获得的几率,避免上下文切换带来的开销​ 乐观读不阻塞写操作,悲观读,阻塞写得操作

相比于ReentrantReadWriteLock,吞吐量大幅提升

api相对复杂,容易用错​ 内部实现相比于ReentrantReadWriteLock复杂得多

每次获取锁的时候,都会返回一个邮戳(stamp),相当于mysql里的version字段​ 释放锁的时候,再根据之前的获得的邮戳,去进行锁释放

如果使用乐观读,一定要判断返回的邮戳是否是一开始获得到的,如果不是,要去获取悲观读锁,再次去读取

线程间通信

wait、notify、notifyAll

nofity随机唤醒一个等待的线程​,notifyAll唤醒所有在该对象上等待的线程,然后多线程竞争,但是实际上不论早晚notifyAll唤醒的线程最终都会执行,而不是像notify最终只有一个执行

等待通知经典模型之生产者消费者

public class Consumer implements Runnable{

    private Medium medium;

    public Consumer(Medium medium) {
        this.medium = medium;
    }

    @Override
    public void run() {
        while (true) {
            medium.take();
        }
    }
}

public class Medium {

    private int num = 0;
    private static final int TOTAL = 20;

    /**
     * 接收生产数据
     */
    public synchronized void put() {
        //判断当前的库存,是否已经是最大的库存容量
        if (num < TOTAL) {
            //如果不是,生产完成之后,通知消费者进行消费
            System.out.println("新增库存-------->当前库存" + ++num);
            try {
                Thread.sleep(500L);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            notifyAll();
        } else {
            //如果是,则通知生产者进行等待
            try {
                System.out.println("新增库存---------> 库存已满"+num);
                wait();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    /**
     * 获取消费数据
     */
    public synchronized void take() {
        //判断当前库存是否不足
        if (num > 0) {
            //如果充足,在消费完成之后通知生产者进行生产
            System.out.println("消费库存-----------> 当前库存容量" + --num);
            try {
                Thread.sleep(1000L);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            notifyAll();
        } else {
            //如果不足,通知消费者暂停消费
            System.out.println("消费库存-----------> 库存不足"+num);
            try {
                wait();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }

    }
}
public class Producer implements Runnable {

    private Medium medium;

    public Producer(Medium medium) {
        this.medium = medium;
    }

    @Override
    public void run() {
        while (true) {
            medium.put();
        }
    }
}
public class Main {
    public static void main(String[] args) {
        Medium medium = new Medium();
        new Thread(new Consumer(medium)).start();
        new Thread(new Consumer(medium)).start();
        new Thread(new Consumer(medium)).start();

        new Thread(new Producer(medium)).start();
        new Thread(new Producer(medium)).start();
        new Thread(new Producer(medium)).start();
        new Thread(new Producer(medium)).start();
        new Thread(new Producer(medium)).start();
    }

}

使用管道流进行通信

public class Reader implements Runnable{

    private PipedInputStream pipedInputStream;

    public Reader(PipedInputStream pipedInputStream) {
        this.pipedInputStream = pipedInputStream;
    }

    @Override
    public void run() {
        if (pipedInputStream != null) {
            String collect = new BufferedReader(new InputStreamReader(pipedInputStream)).lines().collect(Collectors.joining("\n"));
            System.out.println(Thread.currentThread().getName() +collect);
        }
        try {
            pipedInputStream.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}
public class Main {

    public static void main(String[] args) throws IOException {
        PipedInputStream pipedInputStream = new PipedInputStream();
        PipedOutputStream pipedOutputStream = new PipedOutputStream();

        pipedOutputStream.connect(pipedInputStream);

        new Thread(new Reader(pipedInputStream)).start();
        BufferedReader bufferedReader = null;
        try {
            bufferedReader = new BufferedReader(new InputStreamReader(System.in));
            pipedOutputStream.write(bufferedReader.readLine().getBytes());
        } finally {
            pipedOutputStream.close();
            if (bufferedReader != null) {
                bufferedReader.close();
            }
        }

    }
}

Thread.join通信

public class Demo2 {

    static int x = 0, y = 0, a = 0, b = 0;

    public static void main(String[] args) throws InterruptedException {

        int i = 0;
        boolean flag = true;

        while (flag) {
            i++;
            Thread thread = new Thread(() -> {
                a = 1;
                x = b;
            });

            Thread thread1 = new Thread(() -> {
                b = 1;
                y = a;
            });
            thread.start();
            thread1.start();
            thread.join();
            thread1.join();
            //上面join是把子线程调度添加到主线程,执行完毕后才会执行下面的输出
            System.out.println("第" + i + "次" + "x=======>" + x + "    y=========>" + y);
            if (x == 0 && y == 0) {
                flag = false;
            } else {
                x = 0;
                y = 0;
                a = 0;
                b = 0;
            }
        }
    }
}

ThreadLocal的使用

/**
 * ThreadLocalDemo
 */
public class ThreadLocalDemo {
    ThreadLocal<Integer> num = ThreadLocal.withInitial(() -> 0);

    /**
     * 自增并输出num的值
     */
    public void inCreate() {
        Integer myNum = num.get();
        myNum++;
        System.out.println(Thread.currentThread().getName() + "----------->" + myNum);
        num.set(myNum);
    }

    public static void main(String[] args) {
        ThreadLocalDemo threadLocalDemo = new ThreadLocalDemo();
        for (int i = 1; i < 3; i++) {
            int finalI = i;
            new Thread(() -> {
                while (true) {
                    threadLocalDemo.inCreate();
                    try {
                        Thread.sleep(finalI * 1000L);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }).start();
        }
    }
}

Condition的使用

例如下面的生产者消费者例子,避免每次唤醒都唤醒所有生产者和消费者,而是有指定的单独唤醒特定方

/**
 * 消费者
 */
public class Consumer implements Runnable{

    private Medium medium;

    public Consumer( Medium medium) {
        this.medium = medium;
    }

    @Override
    public void run() {
        while (true) {
            medium.take();
        }
    }
}
/**
 * 中间商
 */
public class Medium {

    private int num = 0;
    private static final int TOTAL = 20;

    private Lock lock = new ReentrantLock();
    //没有特定指向,应该是根据线程在代码内部做的到底是哪个线程,线程执行不能类,自然关联具体的生产者还是消费者
    private Condition consumerCondition = lock.newCondition();
    private Condition producerCondition = lock.newCondition();

    /**
     * 接收生产数据
     */
    public void put() {
        lock.lock();
        try {
            //判断当前库存,是否已经是最大的库存容量,
            if (num < TOTAL) {
                System.out.println("新增库存---------> 当前库存:" + ++num);
                // 如果不是,生产完成之后,通知消费者进行消费
                try {
                    Thread.sleep(500L);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                consumerCondition.signalAll();
            } else {
                // 如果是,则通知生产者进行等待,
                try {
                    System.out.println("新增库存---------> 库存已满:" + num);
                    producerCondition.await();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        } finally {
            lock.unlock();
        }
    }

    /**
     * 获取消费数据
     */
    public void take() {
        lock.lock();
        try {
            //判断当前库存是否不足
            if (num > 0) {
                //如果充足,在消费完成之后,通知生产者进行生产
                System.out.println("消费库存------> 当前库存容量" + --num);
                try {
                    Thread.sleep(1000L);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                producerCondition.signalAll();
            } else {
                //如果不足,通知消费者暂停消费
                try {
                    System.out.println("消费库存---------> 库存不足:" + num);
                    consumerCondition.await();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        } finally {
            lock.unlock();
        }
    }
}
/**
 * 生产者
 */
public class Producer implements Runnable{

    private Medium medium;

    public Producer(Medium medium) {
        this.medium = medium;
    }

    @Override
    public void run() {
        while (true) {
            medium.put();
        }
    }
}
public class Main {

    public static void main(String[] args) {
        Medium medium = new Medium();
        new Thread(new Consumer(medium)).start();
        new Thread(new Consumer(medium)).start();
        new Thread(new Producer(medium)).start();
        new Thread(new Producer(medium)).start();
        new Thread(new Producer(medium)).start();
        new Thread(new Producer(medium)).start();
        new Thread(new Producer(medium)).start();
        new Thread(new Producer(medium)).start();
        new Thread(new Producer(medium)).start();
    }
}

原子类

什么是原子类

对多线程访问同一个变量,我们需要加锁,而锁是比较消耗性能的,JDk1.5之后,​ 新增的原子操作类提供了一种用法简单、性能高效、线程安全地更新一个变量的方式,​ 这些类同样位于JUC包下的atomic包下,发展到JDk1.8,该包下共有17个类,​ 囊括了原子更新基本类型、原子更新数组、原子更新属性、原子更新引用

DoubleAccumulator、DoubleAdder、LongAccumulator、LongAdder、Striped64

原子更新基本类型

AtomicBoolean、AtomicInteger、AtomicLong、DoubleAccumulator、DoubleAdder、LongAccumulator、LongAdder

AtomicBoolean、AtomicInteger、AtomicLong 元老级的原子更新,方法几乎一模一样​ DoubleAdder、LongAdder 对Double、Long的原子更新性能进行优化提升​ DoubleAccumulator、LongAccumulator 支持自定义运算

原子更新数组类型

AtomicIntegerArray、AtomicLongArray、AtomicReferenceArray

原子地更新属性

原子地更新某个类里的某个字段时,就需要使用原子更新字段类,Atomic包提供了以下4个类进行原子字段更新​ AtomicIntegerFieldUpdater、AtomicLongFieldUpdater、AtomicStampedReference、AtomicReferenceFieldUpdater

 使用上述类的时候,必须遵循以下原则
    字段必须是volatile类型的,在线程之间共享变量时保证立即可见
    字段的描述类型是与调用者与操作对象字段的关系一致。
    也就是说调用者能够直接操作对象字段,那么就可以反射进行原子操作。
    对于父类的字段,子类是不能直接操作的,尽管子类可以访问父类的字段。
    只能是实例变量,不能是类变量,也就是说不能加static关键字。
    只能是可修改变量,不能使final变量,因为final的语义就是不可修改。
    对于AtomicIntegerFieldUpdater和AtomicLongFieldUpdater只能修改int/long类型的字段,不能修改其包装类型(Integer/Long)。
    如果要修改包装类型就需要使用AtomicReferenceFieldUpdater。

原子更新引用

public class Demo1 {

    private static AtomicInteger sum = new AtomicInteger(0);

    public static void inCreate() {
        sum.incrementAndGet();
    }

    public static void main(String[] args) {
        for (int i = 0; i < 10; i++) {
            new Thread(()->{
                for (int j = 0; j < 100; j++) {
                    inCreate();
                    System.out.println(sum);
                }
            }).start();

        }
    }
}

public class Demo2 {

    public static void main(String[] args) {
        //输入一个数字,如果比上一个输入的大,则直接返回,如果小,则返回上一个
        LongAccumulator longAccumulator = new LongAccumulator((left, right) ->
                left * right, 0L
        );

        longAccumulator.accumulate(3L);
        System.out.println(longAccumulator.get());
        longAccumulator.accumulate(5L);
        System.out.println(longAccumulator.get());
    }
}
public class AtomicIntegerArrayDemo {

    public static void main(String[] args) {
        int[] arr = new int[]{3, 2};
        AtomicIntegerArray atomicIntegerArray = new AtomicIntegerArray(arr);
        System.out.println(atomicIntegerArray.addAndGet(1, 8));

        int i = atomicIntegerArray.accumulateAndGet(0, 2, (left, right) ->
                left * right / 3
        );
        System.out.println(i);
    }
}
/**
 * AtomicLongFieldUpdaterDemo
 */
public class AtomicLongFieldUpdaterDemo {

    public static void main(String[] args) {
        AtomicLongFieldUpdater<Student> longFieldUpdater = AtomicLongFieldUpdater.newUpdater(Student.class, "id");

        Student xdclass = new Student(1L, "xdclass");
        longFieldUpdater.compareAndSet(xdclass, 1L, 100L);
        System.out.println("id="+xdclass.getId());

        AtomicReferenceFieldUpdater<Student, String> referenceFieldUpdater = AtomicReferenceFieldUpdater.newUpdater(Student.class, String.class, "name");
        referenceFieldUpdater.compareAndSet(xdclass, "xdclass", "wiggin");
        System.out.println("name="+xdclass.getName());
    }
}

class Student{
    volatile long id;
    volatile String name;

    public Student(Long id, String name) {
        this.id = id;
        this.name = name;
    }

    public Long getId() {
        return id;
    }

    public void setId(Long id) {
        this.id = id;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }
}
/**
 * AtomicReferenceDemo
 */
public class AtomicReferenceDemo {

    public static void main(String[] args) {
        AtomicReference<Student> studentAtomicReference = new AtomicReference<>();
        Student student = new Student(1L, "xdclass");
        Student student1 = new Student(2L, "wiggin");
        studentAtomicReference.set(student);
        studentAtomicReference.compareAndSet(student, student1);
        Student student2 = studentAtomicReference.get();
        System.out.println(student2.getName());
    }
}

class Student{
    private long id;
    private String name;

    public Student(long id, String name) {
        this.id = id;
        this.name = name;
    }

    public long getId() {
        return id;
    }

    public void setId(long id) {
        this.id = id;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }
}

CAS

什么是CAS?

在计算机科学中,比较和交换(Conmpare And Swap)是用于实现多线程同步的原子指令。 它将内存位置的内容与给定值进行比较,只有在相同的情况下,将该内存位置的内容修改为新的给定值。 这是作为单个原子操作完成的。 原子性保证新值基于最新信息计算; 如果该值在同一时间被另一个线程更新,则写入将失败。 操作结果必须说明是否进行替换; 这可以通过一个简单的布尔响应(这个变体通常称为比较和设置),或通过返回从内存位置读取的值来完成(摘自维基本科)

JAVA1.5开始引入了CAS,主要代码都放在JUC的atomic包下,如下图:


5.png

JAVA中如何实现CAS操作

以比较简单的AtomicInteger为例,我们看一下都有哪些方法


6.png

从图中可以看出JAVA中的CAS操作都是通过sun包下Unsafe类实现,而Unsafe类中的方法都是native方法,由JVM本地实现,笔者为了弄清楚真正的实现原理,查看了openJDK7的源码,下面就稍作分析:


7.png

Unsafe中对CAS的实现是C++写的,从上图可以看出最后调用的是Atomic:comxchg这个方法,这个方法的实现放在hotspot下的os_cpu包中,说明这个方法的实现和操作系统、CPU都有关系,我们以linux的X86处理器的实现为例来进行分析


8.png

Linux的X86下主要是通过cmpxchgl这个指令在CPU级完成CAS操作的,但在多处理器情况下必须使用lock指令加锁来完成。从这个例子就可以比较清晰的了解CAS的底层实现了,当然不同的操作系统和处理器的实现会有所不同

CAS在JUC中的运用

看一下JUC中非常重要的一个类AbstractQueuedSynchronizer,作为JAVA中多种锁实现的父类,其中有很多地方使用到了CAS操作以提升并发的效率


9.png

上图为同步队列的入队操作,也是一种乐观锁的实现,多线程情况下,操作头节点和尾节点都有可能失败,失败后会再次尝试,直到成功。

ABA问题

CAS可以有效的提升并发的效率,但同时也会引入ABA问题。

如线程1从内存X中取出A,这时候另一个线程2也从内存X中取出A,并且线程2进行了一些操作将内存X中的值变成了B,然后线程2又将内存X中的数据变成A,这时候线程1进行CAS操作发现内存X中仍然是A,然后线程1操作成功。虽然线程1的CAS操作成功,但是整个过程就是有问题的。比如链表的头在变化了两次后恢复了原值,但是不代表链表就没有变化。

所以JAVA中提供了AtomicStampedReference/AtomicMarkableReference来处理会发生ABA问题的场景,主要是在对象中额外再增加一个标记来标识对象是否有过变更。

容器

同步容器

同步容器

同步容器类的缺点

public class VectorDemo {

    public static void main(String[] args) {
        Vector<String> stringVector = new Vector<>();
        for (int i = 0; i < 1000; i++) {
            stringVector.add("demo" + i);
        }

        //错误遍历:同步容器不允许在遍历的同时对容器进行其他操作,例如增删等,报错ConcurrentModificationException
//        stringVector.forEach(e->{
//            if (e.equals("demo3")) {
//                stringVector.remove(e);
//            }
//            System.out.println(e);
//        });

        //单线程正确迭代
//        Iterator<String> iterator = stringVector.iterator();
//        while (iterator.hasNext()) {
//            String next = iterator.next();
//            if (next.equals("demo2")) {
//                iterator.remove();
//            }
//        }
        //多线程:多运行几次NoSuchElementException
        //因为在例如俩线程都进入next.equals("demo2"),但是一个先删除,后者就报错了
//        Iterator<String> iterator = stringVector.iterator();
//        for (int i = 0; i < 4; i++) {
//            new Thread(() -> {
//                while (iterator.hasNext()) {
//                    String next = iterator.next();
//                    if (next.equals("demo2")) {
//                        iterator.remove();
//                    }
//                }
//            }).start();
//        }

        //多线程正确迭代
        Iterator<String> iterator = stringVector.iterator();
        for (int i = 0; i < 4; i++) {
            new Thread(() -> {
                synchronized (iterator) {
                    while (iterator.hasNext()) {
                        String next = iterator.next();
                        if (next.equals("demo2")) {
                            iterator.remove();
                        }
                    }
                }
            }).start();
        }
    }
}
 public class Demo {
    public static void main(String[] args) {
        ArrayList<String> strings = new ArrayList<>();
        List<String> stringList = Collections.synchronizedList(strings);
    }
}

并发容器

public class Demo {

    public static void main(String[] args) {
        CopyOnWriteArrayList<String> strings = new CopyOnWriteArrayList<>();
        for (int i = 0; i < 1000; i++) {
            strings.add("demo" + i);
        }
        //正确
//        strings.forEach(e->{
//            if (e.equals("demo2")) {
//                strings.remove(e);
//            }
//        });


            //错误:UnsupportedOperationException,跟踪源码可知,CopyOnWriteArrayList直接remove抛出这个异常
//        Iterator<String> iterator = strings.iterator();
//        while (iterator.hasNext()) {
//            String next = iterator.next();
//            if (next.equals("demo2")) {
//                iterator.remove();
//            }
//        }
        
        //正确:证明多线程也可以
        for (int i = 0; i < 4; i++) {
            new Thread(() -> {
                strings.forEach(e -> {
                    if (e.equals("demo2")) {
                        strings.remove(e);
                    }
                });
            }).start();
        }
    }
}

LinkedBlockingQueue的使用及其源码探秘

在并发编程中,LinkedBlockingQueue使用的非常频繁。因其可以作为生产者消费者的中间商

public class Demo2 {
    public static void main(String[] args) throws InterruptedException {
        LinkedBlockingQueue<String> strings = new LinkedBlockingQueue<>();
        //往队列里存元素
        strings.add("111");
        strings.offer("111");
        strings.put("111");

        //从队列中取元素
        String remove = strings.remove();
        strings.poll();
        strings.take();

    }
}

并发工具类

CountDownLatch

public class CountDownLatchDemo {

    public static void main(String[] args) {
        CountDownLatch countDownLatch = new CountDownLatch(8);
        new Thread(()->{
            try {
                countDownLatch.await();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("800米比赛结束,准备清空跑道并继续跨栏比赛");
        }).start();

        for (int i = 0; i < 8; i++) {
            int finalI = i;
            new Thread(()->{
                try {
                    Thread.sleep(finalI * 1000L);
                    System.out.println(Thread.currentThread().getName()+"到达终点");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } finally {
                    countDownLatch.countDown();
                }
            }).start();
        }
    }
}

CyclicBarrier--栅栏

允许一组线程相互等待达到一个公共的障碍点,之后再继续执行

public class CyclicBarrierDemo {

    public static void main(String[] args) {
        CyclicBarrier cyclicBarrier = new CyclicBarrier(8);

        for (int i = 0; i < 8; i++) {
            int finalI = i;
            new Thread(() -> {
                try {
                    Thread.sleep(finalI * 1000L);
                    System.out.println(Thread.currentThread().getName() + "准备就绪");
                    cyclicBarrier.await();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } catch (BrokenBarrierException e) {
                    e.printStackTrace();
                }

                System.out.println("开始比赛");
            }).start();
        }
    }
}

Semaphore--信号量

 public class SemaphoreDemo {

    public static void main(String[] args) {
        Semaphore semaphore = new Semaphore(2);

        for (int i = 0; i < 10; i++) {
            new Thread(()->{
                try {
                    semaphore.acquire();
                    System.out.println(Thread.currentThread().getName() + "开始执行");
                    Thread.sleep(5000L);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } finally {
                    semaphore.release();
                }
            }).start();
        }
    }
    
}

Exchanger

public class ExchangerDemo {

    public static void main(String[] args) {
        Exchanger<String> stringExchanger = new Exchanger<>();

        String str1 = "xdclass";
        String str2 = "wiggin";

        new Thread(() -> {
            System.out.println(Thread.currentThread().getName() + "初始值==========>" + str1);
            try {
                String exchange = stringExchanger.exchange(str1);
                System.out.println(Thread.currentThread().getName() + "交換后的数据==========>" + exchange);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }, "线程1").start();

        new Thread(() -> {
            System.out.println(Thread.currentThread().getName() + "初始值==========>" + str2);
            try {
                String exchange = stringExchanger.exchange(str2);
                System.out.println(Thread.currentThread().getName() + "交換后的数据==========>" + exchange);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }, "线程2").start();

        new Thread(() -> {
            System.out.println(Thread.currentThread().getName() + "初始值==========>" + str2);
            try {
                String exchange = stringExchanger.exchange(str2);
                System.out.println(Thread.currentThread().getName() + "交換后的数据==========>" + exchange);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }, "线程3").start();
    }
}

线程池及Executor框架

为什么要使用线程池?

诸如 Web 服务器、数据库服务器、文件服务器或邮件服务器之类的许多服务器应用程序都面向处理来自某些远程来源的大量短小的任务。请求以某种方式到达服务器,这种方式可能是通过网络协议(例如 HTTP、FTP )、通过 JMS队列或者可能通过轮询数据库。不管请求如何到达,服务器应用程序中经常出现的情况是:单个任务处理的时间很短而请求的数目却是巨大的。每当一个请求到达就创建一个新线程,然后在新线程中为请求服务,但是频繁的创建线程,销毁线程所带来的系统开销其实是非常大的。

线程池为线程生命周期开销问题和资源不足问题提供了解决方案。通过对多个任务重用线程,线程创建的开销被分摊到了多个任务上。其好处是,因为在请求到达时线程已经存在,所以无意中也消除了线程创建所带来的延迟。这样,就可以立即为请求服务,使应用程序响应更快。而且,通过适当地调整线程池中的线程数目,也就是当请求的数目超过某个阈值时,就强制其它任何新到的请求一直等待,直到获得一个线程来处理为止,从而可以防止资源不足。

风险与机遇:
用线程池构建的应用程序容易遭受任何其它多线程应用程序容易遭受的所有并发风险,诸如同步错误和死锁,它还容易遭受特定于线程池的少数其它风险,诸如与池有关的死锁、资源不足和线程泄漏。

创建线程池及其使用

/**
 * 线程池Demo
 */
public class ThreadPoolDemo {

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        LinkedBlockingQueue<Runnable> objects = new LinkedBlockingQueue<>(20);

//        for (int i = 0; i <100 ; i++) {
//            objects.put(()->{
//                System.out.println(Thread.currentThread().getName());
//            });
//        }


        ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(10,20,3L,TimeUnit.SECONDS,objects,new CustomPolicy());
        threadPoolExecutor.prestartAllCoreThreads();
//        Future<String> submit = null;
        for (int i = 0; i < 50; i++) {
            threadPoolExecutor.submit(()->{
                try {
                    Thread.sleep(2000L);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println(threadPoolExecutor.getActiveCount());
            });
        }
//
//        for (int i = 0; i < 100; i++) {
//            System.out.println(submit.get());
//        }
    }
}

Future与Callable、FutureTask

public class CallableDemo implements Callable<String> {
    @Override
    public String call() throws Exception {
        Thread.sleep(3000L);
        return "1111";
    }

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        CallableDemo callableDemo = new CallableDemo();
        FutureTask<String> stringFutureTask = new FutureTask<>(callableDemo);
        new Thread(stringFutureTask).start();
        System.out.println(stringFutureTask.get());//输出1111
    }
}

线程池的核心组成部分及其运行机制

运行机制

通过new创建线程池时,除非调用prestartAllCoreThreads方法初始化核心线程,否则此时线程池中有0个线程,即使工作队列中存在多个任务,同样不会执行

例如:任务数X

x <= cSize 只启动x个线程

x >= cSize && x < nWorks + cSize 会启动 <= cSize 个线程 其他的任务就放到工作队列里

x > cSize && x > nWorks + cSize

x-(nWorks) <= mSize 会启动x-(nWorks)个线程

x-(nWorks) > mSize 会启动mSize个线程来执行任务,其余的执行相应的拒绝策略

线程池拒绝策略

Executor框架

通过相应的方法,能创建出6种线程池

线程池的使用建议

newFixedThreadPool newSingleThreadExecutor 允许的请求队列长度为 Integer.MAX_VALUE,可能会堆积大量的请求,从而导致 OOM。​ newCachedThreadPool newScheduledThreadPool​ 允许的创建线程数量为 Integer.MAX_VALUE,可能会创建大量的线程,从而导致 OOM

/**
 * 模拟OOM
 */
public class OOMDemo {
    public static void main(String[] args) {
        ExecutorService executorService = Executors.newFixedThreadPool(2);

        while (true) {
            executorService.submit(() -> {
                System.out.println(Thread.currentThread().getName());
                try {
                    Thread.sleep(3000L);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            });
        }
    }
}
public class OOMDemo2 {

    public static void main(String[] args) {
        ExecutorService executorService = Executors.newCachedThreadPool();
        while (true) {
            executorService.submit(() -> {
                System.out.println(Thread.currentThread().getName());
                try {
                    Thread.sleep(3000L);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            });
        }
    }
}

JVM与并发

jvm内存模型

处理器--》高速缓存--》缓存一致性协议--》主存

线程《--》工作内存《--》save和load 《---》主存

java内存模型.png

上面8中操作必须满足以下规则

先行发生原则 happens-before

public class Demo {
    private volatile int  value = 0;

    //b后调用
    public synchronized int getValue() {
        return value;
    }

    //a先调用
    public synchronized void setValue(int value) {
        this.value = value;
    }

    public static void main(String[] args) {
        //例如此时,j的赋值可能就时间上先于i赋值,因为不在上面几条中
        //针对:程序次序规则只是针对先行发生,但是先行发生和时间上的前后没有必然关系
        int i = 0;
        int j = 1;
    }
}

指令重排序

重排序是指编译器和处理器为了优化程序性能而对指令序列进行重新排序的一种手段。

编译器和处理器在重排序时,会遵守数据依赖性,编译器和处理器不会改变存在数据依赖关系的两个操作的执行顺序。(仅针对单个处理器中执行的指令序列和单个线程中执行的操作,不同处理器之间和不同线程之间的数据依赖性不被编译器和处理器考虑。)

不管怎么重排序(编译器和处理器为了提高并行度),(单线程)程序的执行结果不能被改变。

x=0,y=1​ x=1, y=0​ x=1, y=1​ x=0, y=0

/**
 * 指令重排序 demo
 */
public class Demo2 {

    static int x = 0, y = 0, a = 0, b = 0;

    public static void main(String[] args) throws InterruptedException {

        int i = 0;
        boolean flag = true;

        while (flag) {
            i++;
            Thread thread = new Thread(() -> {
                a = 1;
                x = b;
            });

            Thread thread1 = new Thread(() -> {
                b = 1;
                y = a;
            });

            thread.start();
            thread1.start();
            thread.join();
            thread1.join();

            System.out.println("第" + i + "次" + "x=======>" + x + "    y=========>" + y);

            if (x == 0 && y == 0) {
                flag = false;
            } else {
                x = 0;
                y = 0;
                a = 0;
                b = 0;
            }
        }
    }
}
上一篇下一篇

猜你喜欢

热点阅读