JUC线程高级教程

2018-08-24  本文已影响0人  文思li

                        JUC

                                原创者:文思,感谢尚硅谷,资料来源于尚硅谷

目录:

1、volatile关键字与内存可见性

2、原子变量与CAS算法

3、同步容器类

4、闭锁操作

5、Callable接口(常用)

6、Lock同步锁

7、读写锁

8、线程八锁

9、线程池

10、线程调度

11、ForkJoinPool分支/合并框架 工作窃取

一、JUC

1、volatile关键字与内存可见性

    Java5.0提供了java.util.concurrent(简称JUC )包,在此包中增加了在并发编程中很常用的实用工具类,用于定义类似于线程的自定义子系统,包括线程池、异步IO 和轻量级任务框架。提供可调的、灵活的线程池。还提供了设计用于多线程上下文中的Collection实现等。

内存可见性(Memory Visibility)是指当某个线程正在使用对象状态而另一个线程在同时修改该状态,需要确保当一个线程修改了对象状态后,其他线程能够看到发生的状态变化。

   可见性错误是指当读操作与写操作在不同的线程中执行时,我们无法确保执行读操作的线程能适时地看到其他线程写入的值,有时甚至是根本不可能的事情。

如下代码示例:

public classTestVolatile {

    public static void main(String[] args){

       Testt = new Test();

       new Thread(t).start();

       while(true){

           if(t.getFlag()){

              System.out.println("-----enter main while if----");

              break;

           }

       }

    }

}

class Test implementsRunnable{

    private boolean flag = false;

    @Override

    public void run() {

       try{

           Thread.sleep(1000);

       }catch(Exception e){ }

       flag = true;

       System.out.println("------this is test child thread-----");

    }

    public boolean getFlag(){

       return this.flag;

    }

}

运行结果:

----------this is test child thread-----------

主线程main的------enter main while if-------一直没进入执行。这就是发生了内存可见性错误,上述程序执行流程:

由于布尔变量flag偏底层,所以jvm执行布尔运算时非常快,所以在test子线程内复制修改flag之后,还没有来得及写入main主线程,main主线程就进入到while中,导致如上效果。

如果用同步synchronized锁则往往是噩梦的开始:

while(true){

           synchronized (t) {

              if(t.getFlag()){

                  System.out.println("-----enter main while if-----");

                  break;

              }

           }         

       }

这样会导致极低的效率。同步锁具有互斥性,如果多个县城访问这块,如果一个线程已持有这个锁,另一个线程发现后就阻塞在这里等待锁的释放,然后就等待另一个线程调用完释放锁,然后cpu再调度。所以不要轻易甚至不要使用同步synchronized锁。这就需要volatile关键字:private volatile boolean flag = false。

/*

 *一、volatile关键字当多个线程进行操作共享数据时,可以保证内存中的数据可见。

 *      相较于 synchronized 是一种较为轻量级的同步策略。

 *注意:

 * 1. volatile不具备“互斥性”

 * 2. volatile不能保证变量的“原子性”

 */

public classTestVolatile {

    public static void main(String[] args){

       Testt = new Test();

       new Thread(t).start();     

       while(true){

           if(t.getFlag()){

              System.out.println("------enter main while if------");

              break;

           }         

       }

    }

}

class Test implementsRunnable{

    private volatile boolean flag = false;

    @Override

    public void run() {

       try{

           Thread.sleep(1000);

       }catch(Exception e){    

       }

       flag = true;

       System.out.println("-------this is test child thread-----");

    }

    public boolean getFlag(){

       return this.flag;

    }

}

运行结果:

-----------enter main while if---------

----------this is test child thread-----------

程序执行流程:

子线程不会同步变量到自己的线程缓存中,直接修改主内存中的变量。

Java提供了一种稍弱的同步机制,即volatile 变量,用来确保将变量的更新操作通知到其他线程。可以将volatile 看做一个轻量级的锁,但是又与锁有些不同:

对于多线程,不是一种互斥关系

不能保证变量状态的“原子性”操作

2、原子变量与CAS算法

volatile只能解决变量同步问题,但解决不了并发中的原子性问题(原子性问题不再演示)。i++ 的操作实际上分为三个步骤“读-改-写”

int i = 10; i = i++; 相当于:int temp = i; i = i + 1; i = temp;可看出仅仅依靠volatile解决不了原子性问题。

Jdk提供了原子变量来解决此问题:

Java.util.concurrent.atomic类的小工具包,支持在单个变量上解除锁的线程安全编程。事实上,此包中的类可将volatile 值、字段和数组元素的概念扩展到那些也提供原子条件更新操作的类.

类AtomicBoolean、AtomicInteger、AtomicLong 和AtomicReference 的实例各自提供对相应类型单个变量的访问和更新。每个类也为该类型提供适当的实用工具方法。AtomicIntegerArray、AtomicLongArray 和AtomicReferenceArray 类进一步扩展了原子操作,对这些类型的数组提供了支持。这些类在为其数组元素提供volatile 访问语义方面也引人注目,这对于普通数组来说是不受支持的。

核心方法:booleancompareAndSet(expectedValue, updateValue)

java.util.concurrent.atomic 包下提供了一些原子操作的常用类:

AtomicBoolean 、AtomicInteger 、AtomicLong 、AtomicReference

AtomicIntegerArray 、AtomicLongArray

AtomicMarkableReference

AtomicReferenceArray

AtomicStampedReference

CAS (Compare-And-Swap) 是一种硬件对并发的支持,针对多处理器操作而设计的处理器中的一种特殊指令,用于管理对共享数据的并发访问

CAS 是一种无锁的非阻塞算法的实现

CAS 包含了3 个操作数:

1、需要读写的内存值V

2、进行比较的值A

3、拟写入的新值B

当且仅当V 的值等于A 时,CAS 通过原子方式用新值B 来更新V 的值,否则不会执行任何操作

示例:

/*

 *一、i++ 的原子性问题:i++ 的操作实际上分为三个步骤“读-改-写”

 *      inti = 10;

 *      i = i++; //10

 *      inttemp= i;

 *      i = i + 1;

 *      i =temp;

 *二、原子变量:在 java.util.concurrent.atomic 包下提供了一些原子变量。

 *    1.volatile保证内存可见性

 *    2.CAS(Compare-And-Swap) 算法保证数据变量的原子性

 *        CAS算法是硬件对于并发操作的支持

 *        CAS包含了三个操作数:

 *        ①内存值  V

 *        ②预估值  A

 *        ③更新值  B

 *        当且仅当 V == A 时, V = B; 否则,不会执行任何操作。

 */

public classTestAtomicDemo {

    public static void main(String[] args) {

       AtomicDemoa = new AtomicDemo();

       for(int i=0;i<10;i++){//启动10个子线程

           new Thread(a).start();

       }

    }

}

class AtomicDemo implements Runnable{

    private AtomicInteger number = new AtomicInteger(0);

    public void run() {

       try{

           Thread.sleep(2000);

       }catch(Exception e){    

       }

       System.out.println("number="+this.handlerNumber());

    }

    public int handlerNumber(){//子线程里加1

       return number.getAndIncrement();

    }

}

以上使用原子性变量解决了原子性问题。CAS算法是基于硬件底层算法实现的,也不是使用锁的互斥原理,不会释放cpu的控制权,所以效率比锁高。

3、同步容器类

HashMap与HashTable的区别之一是HashMap线程不安全,HashTable线程安全,所以HashTable的效率也低。

Java 5在java.util.concurrent 包中提供了多种并发容器类来改进同步容器的性能。

ConcurrentHashMap同步容器类是Java 5 增加的一个线程安全的哈希表。对与多线程的操作,介于HashMap 与Hashtable 之间。内部采用“锁分段”机制替代Hashtable 的独占锁,进而提高性能。

此包还提供了设计用于多线程上下文中的Collection 实现:ConcurrentHashMap、ConcurrentSkipListMap、ConcurrentSkipListSet、CopyOnWriteArrayList 和CopyOnWriteArraySet。当期望许多线程访问一个给定collection 时,ConcurrentHashMap 通常优于同步的HashMap,ConcurrentSkipListMap 通常优于同步的TreeMap。当期望的读数和遍历远远大于列表的更新数时,CopyOnWriteArrayList 优于同步的ArrayList。

JDK1.8对ConcurrentSkipListMap又取消了分段锁,又采用回了CAS。

示例:

import java.util.Iterator;

import java.util.concurrent.CopyOnWriteArrayList;

/*

 *CopyOnWriteArrayList/CopyOnWriteArraySet :“写入并复制”

 *注意:添加操作多时,效率低,因为每次添加时都会进行复制,开销非常的大。并发迭代操作多时可以选择。

 */

public class TestCopyOnWriteArrayList {

       publicstatic void main(String[] args) {

              HelloThreadht = new HelloThread();

              for(int i = 0; i < 10; i++) {

                     newThread(ht).start();

              }

       }

}

class HelloThread implements Runnable{

       private staticCopyOnWriteArrayList list = new CopyOnWriteArrayList<>();

       static{

              list.add("AA");

              list.add("BB");

              list.add("CC");

       }

       @Override

       publicvoid run() {

              Iteratorit = list.iterator();

              while(it.hasNext()){

                     System.out.println(it.next());

                     list.add("AA");

              }

       }

}

4、闭锁操作

CountDownLatch 一个同步辅助类,在完成一组正在其他线程中执行的操作之前,它允许一个或多个线程一直等待。

闭锁可以延迟线程的进度直到其到达终止状态,闭锁可以用来确保某些活动直到其他活动都完成才继续执行:

1、确保某个计算在其需要的所有资源都被初始化之后才继续执行;

2、确保某个服务在其依赖的所有其他服务都已经启动之后才启动;

3、等待直到某个操作所有参与者都准备就绪再继续执行

示例:

/*

 * CountDownLatch:闭锁,在完成某些运算是,只有其他所有线程的运算全部完成,当前运算才继续执行

 */

public classTestCountDownLatch {

    public static void main(String[] args) {

//new CountDownLatch(10)里面的参数是倒计时初始化个数,因为下面会创建10个线程,每启动一个线程就减1,当为0是就说明所有线程创建完毕

       final CountDownLatch latch = new CountDownLatch(10);

       LatchDemold = new LatchDemo(latch);

       long startTime= System.currentTimeMillis();

       for(int i=0;i<10;i++){

           new Thread(ld).start();

       }

       try {

           latch.await();

       }catch(InterruptedException e) {

       }

       long endTime= System.currentTimeMillis();

       System.out.println("----------"+(endTime-startTime));

    }

}

class LatchDemo implements Runnable{

    private CountDownLatch latch;

    publicLatchDemo(CountDownLatch latch){

       this.latch = latch;

    }

    @Override

    public void run() {

       for(int i=0;i<50000;i++){

           if(1%2==0){

              System.out.println(i);

           }

       }

       latch.countDown();

    }

}

一定要在await()之前必须调用countDown()进行递减操作。

5、Callable接口(常用)

以前都认为创建线程的方式是两种:继承Thread类和实现Runnable接口。现在可归结为四种,还有:实现Callable接口与

Java 5.在java.util.concurrent 提供了一个新的创建执行线程的方式:Callable 接口。

Callable 接口类似于Runnable,两者都是为那些其实例可能被另一个线程执行的类设计的。但是Runnable 不会返回结果,并且无法抛出经过检查的异常

Callable 需要依赖FutureTask 。

根据以上分析FutureTask 也可以用作闭锁,实现闭锁的效果。

示例:

/*

 *一、创建执行线程的方式三:实现Callable 接口。 相较于实现 Runnable 接口的方式,方法可以有返回值,并且可以抛出异常。

 *二、执行Callable 方式,需要 FutureTask 实现类的支持,用于接收运算结果。  FutureTask是  Future接口的实现类

 */

public classTestCallable {

    public static void main(String[] args) {

       int t = 0;

       ThreadDemotd = new ThreadDemo(t);

       FutureTaskresult= newFutureTask(td);

       long starttime= System.currentTimeMillis();

       new Thread(result).start();

       try {

           IntegerresultInt= result.get();

           long endtime= System.currentTimeMillis();

           System.out.println("---time---"+(endtime-starttime));

           System.out.println("---resultInt---"+resultInt);

       }catch(InterruptedException e) {

       }catch(ExecutionException e){

       }

    }

}

class ThreadDemo implements Callable{

    private Integer count;

    public ThreadDemo(Integer count){

       this.count = count;

    }

    @Override

    public Integer call() throws Exception {

       for(int i=0;i<1000000000;i++){

           count++;

       }

       return count;

    }

}

运行结果:

---time---3369

---resultInt---1000000000

6、Lock同步锁

在jdk1.5以前,解决线程安全只能用同步代码块和同步方法以及volatile 。

Jdk1.5后可以用同步锁Lock,这是一个显示锁,需要通过lock()方法上锁,通过unlock()方法释放锁。但并不是一种替代内置锁的方法,而是当内置锁不适用时,作为一种可选择的高级功能。

ReentrantLock 实现了Lock 接口,并提供了与synchronized 相同的互斥性和内存可见性。但相较于synchronized 提供了更高的处理锁的灵活性。

不加锁的同步问题:

加锁示例:

/*

 *一、用于解决多线程安全问题的方式:

 * synchronized:隐式锁

 * 1.同步代码块. 2. 同步方法

 *jdk1.5后:

 * 3.同步锁 Lock

 *注意:是一个显示锁,需要通过 lock() 方法上锁,必须通过 unlock() 方法进行释放锁

 */

public classTestLock {

    public static void main(String[] args){

       LockDemold = new LockDemo();

       new Thread(ld,"1号售票点,").start();

       new Thread(ld,"2号售票点,").start();

       new Thread(ld,"3号售票点,").start();

    }

}

class LockDemo implements Runnable{

    private volatile int count=10;

    private Lock lock = new ReentrantLock();

    @Override

    public void run() {

       while(count>0){

           lock.lock();

           try {

              Thread.sleep(100);

           }catch(InterruptedException e) {

           }finally{

              lock.unlock();

           }

           count--;

           System.out.println(Thread.currentThread().getName()+"剩余票数:"+count);

       }  

    }

}

7、读写锁

写写,读写 需要"互斥",读读 不需要"互斥",这样即避免了线程安全问题又提高了效率。

一个读写锁维护了一个读锁和一个写锁,其中读锁可以多个线程使用,写锁只能一个线程独占。

/**

 *写写,读写 需要"互斥"

 *读读 不需要"互斥"

 */

public classTestReadWriteLock {

    public static void main(String[] args){

       ReadWriteLockDemod = new ReadWriteLockDemo();

       new Thread(new Runnable(){

           @Override

           public void run() {

              d.write(1000000000);

           }

       },"write").start();

       for(int i=0;i<10;i++){

           new Thread(new Runnable(){

              @Override

              public void run() {

                  d.read();

              }

           },"read").start();

       }

    }

}

class ReadWriteLockDemo{

    private int number = 0;

    ReadWriteLocklock= newReentrantReadWriteLock();

    public void read(){

       lock.readLock().lock();;

       try{

        System.out.println(Thread.currentThread().getName()+":number="+this.number);

       }finally{

           lock.readLock().unlock();

       }

    }

    public void write(int number){

       lock.writeLock().lock();

       try{

           System.out.println(Thread.currentThread().getName());

           this.number = number;

       }finally{

           lock.writeLock().unlock();

       }     

    }

}

运行结果:

write

read:number=1000000000

read:number=1000000000

read:number=1000000000

read:number=1000000000

read:number=1000000000

read:number=1000000000

read:number=1000000000

read:number=1000000000

read:number=1000000000

read:number=1000000000

8、线程八锁

a:两个普通同步方法,两个线程,标准打印

public classTestThread8demo {

    public static void main(String[] args) {

       Numbern = new Number();

       new Thread(new Runnable(){

           @Override

           public void run() {

              n.getOne();

           }

       }).start();

       new Thread(new Runnable(){

           @Override

           public void run() {

              n.getTwo();

           }

       }).start();

    }

}

class Number{

    public synchronized void getOne(){

       System.out.println("One");

    }

    public synchronized void getTwo(){

       System.out.println("Two");

    }

}

运行结果:立刻打印出

One

Two

b:两个普通同步方法,其中一个线程休眠3秒后,两个线程再标准打印

public synchronizedvoidgetOne(){

       try {

           Thread.sleep(3000);

       }catch(InterruptedException e) {

       }

       System.out.println("One");

    }

    public synchronized void getTwo(){

       System.out.println("Two");

    }

运行结果:3秒后打印出

One

Two

推论:

一个对象里面如果有多个synchronized方法,某一个时刻内,只要一个线程去调用其中的一个synchronized方法了,其它的线程都只能等待,换句话说,某一个时刻内,只能有唯一一个线程去访问这些synchronized方法。

锁的是当前对象this,被锁定后,其它的线程都不能进入到当前对象的其它的synchronized方法。

c:新增一个普通方法(这样一个同步锁并休眠3秒,一个同步锁,一个普通方法)

public classTestThread8demo {

    public static void main(String[] args) {

       Numbern = new Number();

       new Thread(new Runnable(){

           @Override

           public void run() {

              n.getOne();

           }

       }).start();

       new Thread(new Runnable(){

           @Override

           public void run() {

               n.getTwo();

           }

       }).start();

       new Thread(new Runnable(){

           @Override

           public void run() {

              n.getThree();

           }

       }).start();

    }

}

class Number{

    public synchronized void getOne(){

       try {

           Thread.sleep(3000);

       }catch(InterruptedException e) {

       }

       System.out.println("One");

    }

    public synchronized void getTwo(){

       System.out.println("Two");

    }

    public void getThree(){

       System.out.println("Three");

    }

}

运行结果:

Three

3秒后……

One

Two

推论:普通方法与同步锁无关。

d:两个普通的同步方法,同时两个对象访问,打印

public classTestThread8demo {

    public static void main(String[] args) {

       Numbern = new Number();

       Numbern2 = new Number();

       new Thread(new Runnable(){

           @Override

           public void run() {

              n.getOne();

           }

       }).start();

       new Thread(new Runnable(){

           @Override

           public void run() {

              n2.getTwo();

           }

       }).start();

    }

}

class Number{

    public synchronized void getOne(){

       try {

           Thread.sleep(3000);

       }catch(InterruptedException e) {

       }

       System.out.println("One");

    }

    public synchronized void getTwo(){

       System.out.println("Two");

    }  

}

运行结果:

Two

3秒后……

One

推论:

换成两个对象后,就不是同一把锁了,彼此不受影响。更进一步印证了synchronized最小作用域的范围是对象。

也就是说如果一个实例对象的非静态同步方法获取锁后,该实例对象的其他非静态同步方法必须等待获取锁的方法释放锁后才能获取锁,可是别的实例对象的非静态同步方法因为跟该实例对象的非静态同步方法用的是不同的锁,所以毋须等待该实例对象已获取锁的非静态同步方法释放锁就可以获取他们自己的锁。

e:修改getOne为静态同步,然后一个对象访问打印

public classTestThread8demo {

    public static void main(String[] args) {

       Numbern = newNumber();

       new Thread(new Runnable(){

           @Override

           public void run() {

              n.getOne();

           }

       }).start();

       new Thread(new Runnable(){

           @Override

           public void run() {

              n.getTwo();

           }

       }).start();

    }

}

class Number{

    public static synchronized void getOne(){

       try {

           Thread.sleep(3000);

       }catch(InterruptedException e) {

       }

       System.out.println("One");

    }

    public synchronized void getTwo(){

       System.out.println("Two");

    }

}

运行结果:

Two

3秒后……

One

推论:

所有的静态同步方法用的也是同一把锁:类对象本身,和非静态对象锁是两个不同的对象,所以静态同步方法与非静态同步方法之间是不会有竞态条件的。所以two先执行了,one休眠后继续执行。但是请看g

f:修改getOne和getTwo都为静态同步,然后一个对象访问打印

public classTestThread8demo {

    public static void main(String[] args) {

       Numbern = new Number();

       Numbern2= newNumber();

       new Thread(new Runnable(){

           @Override

           public void run() {

              n.getOne();

           }

       }).start();

       new Thread(new Runnable(){

           @Override

           public void run() {

              n.getTwo();

           }

       }).start();

    }

}

class Number{

    public static synchronized void getOne(){

       try {

           Thread.sleep(3000);

       }catch(InterruptedException e) {

       }

       System.out.println("One");

    }

    public static synchronized void getTwo(){

       System.out.println("Two");

    }

}

运行结果:

3秒后…

One

Two

推论:

进一步且只印证了所有的静态同步方法用的也是同一把锁:类对象本身,但是请看g

g:修改getOne和getTwo都为静态同步,然后两个对象访问打印

public classTestThread8demo {

    public static void main(String[] args) {

       Numbern = new Number();

       Numbern2 = newNumber();

       new Thread(new Runnable(){

           @Override

           public void run() {

              n.getOne();

           }

       }).start();

       new Thread(new Runnable(){

           @Override

           public void run() {

              n2.getTwo();

           }

       }).start();

    }

}

class Number{

    public static synchronized void getOne(){

       try {

           Thread.sleep(3000);

       }catch(InterruptedException e) {

       }

       System.out.println("One");

    }

    public static synchronized void getTwo(){

       System.out.println("Two");

    }

}

运行结果:

3秒后……

One

Two

推论:

所有的静态同步方法用的也是同一把锁:类对象本身,和非静态对象锁是两个不同的对象,所以静态同步方法与非静态同步方法之间是不会有竞态条件的。但是一旦一个静态同步方法获取锁后,其他的静态同步方法都必须等待该方法释放锁后才能获取锁。

h:只修改getOne为静态同步,然后两个对象访问打印

public classTestThread8demo {

    public static void main(String[] args) {

       Numbern = new Number();

       Numbern2 = new Number();

       new Thread(new Runnable(){

           @Override

           public void run() {

              n.getOne();

           }

       }).start();

       new Thread(new Runnable(){

           @Override

           public void run() {

              n2.getTwo();

           }

       }).start();

    }

}

class Number{

    public static synchronized void getOne(){

       try {

           Thread.sleep(3000);

       }catch(InterruptedException e) {

       }

       System.out.println("One");

    }

    public synchronized void getTwo(){

       System.out.println("Two");

    }

}

运行结果:

Two

3秒后……

One

推论:进一步印证了静态同步方法的锁是类对象本身,非静态方法的锁是对象本身,两类锁不是同一把锁。

线程八锁关键点总结:

I:非静态方法的锁默认为  this,  静态方法的锁为 对应的 Class 实例。对象锁与类锁不互相干扰,不会有竞态条件。

Ii:某一个时刻内,只能有一个线程持有锁,无论几个方法。

9、线程池

第四种获取线程的方法:线程池。

提供一个线程队列,队列中保存着所有等待状态的线程,避免了创建与销毁的开销,提高了响应的速度。

线程池的体系结构:

 * java.util.concurrent.Executor :负责线程的使用与调度的根接口

 *        |--**ExecutorService子接口: 线程池的主要接口

 *               |--ThreadPoolExecutor线程池的实现类

 *               |--ScheduledExecutorService子接口:负责线程的调度

 *                            |--ScheduledThreadPoolExecutor:继承 ThreadPoolExecutor, 实现 ScheduledExecutorService

可以看到,虽然ExecutorService下有许多子接口和实现类,但是这里建议使用Executors来获取线程。

工具类: Executors

 * ExecutorServicenewFixedThreadPool() :创建固定大小的线程池

 * ExecutorServicenewCachedThreadPool() :缓存线程池,线程池的数量不固定,可以根据需求自动的更改数量。可自动进行线程回收。

 * ExecutorServicenewSingleThreadExecutor() :创建单个线程池。线程池中只有一个线程

 *ScheduledExecutorService newScheduledThreadPool() :创建固定大小的线程,可以延迟或定时的执行任务。

示例1:

public classTestThreadPool {

    public static void main(String[] args) {

       //1创建有5个线程的线程池

       ExecutorServiceexecutorService=  Executors.newFixedThreadPool(5);

       //执行10次线程,为线程分配任务

       for (int i = 0; i < 10; i++) {

           executorService.submit(new Runnable(){

              @Override

              public void run() {

                  int sum = 0;

                  for (int i = 0; i < 100; i++) {

                     sum ++;

                  }  

        System.out.println(Thread.currentThread().getName()+"="+sum);

              }

           });

       }

       //关闭线程,executorService.shutdownNow()是立即关闭

       executorService.shutdown();

    }

}

运行结果:

pool-1-thread-1:1

pool-1-thread-1:3

pool-1-thread-1:4

pool-1-thread-2:0

pool-1-thread-3:7

pool-1-thread-3:9

pool-1-thread-5:94

pool-1-thread-2:98

pool-1-thread-2:99

可以看到始终thread1-5,都用的线程池里的5个线程。

示例2:使用带返回值的Callable接口

public classTestThreadPool {

    public static void main(String[] args) throwsInterruptedException,ExecutionException {

       //1创建有5个线程的线程池

       ExecutorServiceexecutorService=  Executors.newFixedThreadPool(5);

       //为线程分配10个任务

       for(int i=0;i<10;i++){

           Futureresult= executorService.submit(new Callable(){

              @Override

              public Integer call() throws Exception {

                  int count = 0;

                  for(int i=0;i<100;i++){

                     count++;

                  }

                  return count;

              }  

           });

           System.out.println(result.get());        

       }

       //关闭线程,executorService.shutdownNow()是立即关闭

       executorService.shutdown();

    }

}

10、线程调度

即ScheduledExecutorService的使用示例:

public classTestScheduledThreadPool {

    public static void main(String[] args) throws InterruptedException,

ExecutionException {

       ScheduledExecutorServicepool= Executors.newScheduledThreadPool(5);

       //3个参数,第1个是线程执行体,第2个延时多久,第3个延时单位

       for(int i=0;i<3;i++){

           Futureresult= pool.schedule(newCallable(){

              @Override

              public Integer call() throws Exception {

                  int t = newRandom().nextInt();

                   System.out.println(Thread.currentThread().getName()+"="+t);

                  return t;

              }     

           },3, TimeUnit.SECONDS);//延时3秒执行

       }

       pool.shutdown();

    }

}

运算结果:

3秒后……

pool-1-thread-2=-1843839421

pool-1-thread-3=-1333144512

pool-1-thread-1=1075550443

程序中Future result = pool.schedule(new Callable(){ 括号外加输出语句:

public staticvoidmain(String[] args)throwsInterruptedException, ExecutionException {

       ScheduledExecutorServicepool= Executors.newScheduledThreadPool(5);

       //3个参数,第1个是线程执行体,第2个延时多久,第3个延时单位

       for(int i=0;i<3;i++){

           Futureresult= pool.schedule(newCallable(){

              @Override

              public Integer call() throws Exception {

                  int t = newRandom().nextInt();            System.out.println(Thread.currentThread().getName()+"="+t);

                  return t;

              }     

           },3, TimeUnit.SECONDS);//延时3秒执行

           System.out.println(Thread.currentThread().getName()+"="+result.get());

       }

       pool.shutdown();

    }

运行结果:

3秒后……

pool-1-thread-1=-506702518

main=-506702518

pool-1-thread-1=173222792

main=173222792

pool-1-thread-2=-1115855720

main=-1115855720

发现pool.schedule体外就是main主线程了。

二、ForkJoinPool分支/合并框架 工作窃取

Fork/Join 框架:就是在必要的情况下,将一个大任务,进行拆分(fork)成若干个小任务(拆到不可再拆时),再将一个个的小任务运算的结果进行join 汇总。

采用“工作窃取”模式(work-stealing):

当执行新的任务时它可以将其拆分分成更小的任务执行,并将小任务加到线程队列中,然后再从一个随机线程的队列中偷一个并把它放在自己的队列中。

相对于一般的线程池实现,fork/join框架的优势体现在对其中包含的任务的处理方式上.在一般的线程池中,如果一个线程正在执行的任务由于某些原因无法继续运行,那么该线程会处于等待状态。而在fork/join框架实现中,如果某个子问题由于等待另外一个子问题的完成而无法继续运行。那么处理该子问题的线程会主动寻找其他尚未运行的子问题来执行.这种方式减少了线程的等待时间,提高了性能。

jdk7和jdk8的使用不一样,现在分别举例:

首先20亿循环计算查看耗时:

1731毫秒。

jdk7反面示例1(拆分临界值太小,导致拆分太多):

public classTestForkJoinPool {

    public static void main(String[] args) {

       long starttime= System.currentTimeMillis();

       ForkJoinPoolpool= newForkJoinPool();

       ForkJoinTasktask= newForkJoinSumCalculate(0l,2000000000l);

       long sum = pool.invoke(task);

       long endtime= System.currentTimeMillis();

       System.out.println(sum);

       System.out.println("---20亿循环计算耗时:"+(endtime-starttime));

    }

}

class ForkJoinSumCalculate extends RecursiveTask{

    private static final long serialVersionUID= -259195479995561737L;

    private long start;

    private long end;

    private static final long THURSHOLD= 10000L;//任务拆分临界值(1万)

    public ForkJoinSumCalculate(long start,long end){

       this.start = start;

       this.end = end;

    }

    @Override

    protected Long compute() {

       long length = end -start;

       if(length<THURSHOLD){

           long sum = 0L;

           for(long i=start;i<=end;i++){

              sum+=1;

           }

           return sum;

       }else{

           long middle = (start-end)/2;

           ForkJoinSumCalculateleft= newForkJoinSumCalculate(start,middle);

           left.fork();//进行拆分并压入线程队列

           ForkJoinSumCalculateright= newForkJoinSumCalculate(middle,end);

           right.fork();//进行拆分并压入线程队列

           return left.join() + right.join();

       }

    }

}

运行结果:

Exception in thread "main"java.lang.StackOverflowError

       atsun.reflect.GeneratedConstructorAccessor1.newInstance(Unknown Source)

       at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)

       atjava.lang.reflect.Constructor.newInstance(Constructor.java:423)

       atjava.util.concurrent.ForkJoinTask.getThrowableException(ForkJoinTask.java:598)

       at java.util.concurrent.ForkJoinTask.reportException(ForkJoinTask.java:677)

栈溢出了。大家知道内存分堆(堆内存)和栈(栈内存),栈主要存变量及定义。我猜测拆分的变量都放在栈中,临界值小,从而拆的多,所以栈中的变量多,就益处了,那调大临界值试试看,从1万调到100万试试看:

private static final long THURSHOLD = 100000000L;//任务拆分临界值100万,还报栈内存溢出 ,那就改成1000万:private static final long THURSHOLD = 1000000000L;//任务拆分临界值1000万

不报错了,成功拆分,但貌似性能比不拆分慢很多,所以这种任务拆分不一定能提升性能,要根据机器硬件来适当选择。

使用jdk1.8示例:

public classTestForkJoinPool {

    public static void main(String[] args) {

       long starttime= System.currentTimeMillis();

       Long sum= LongStream.rangeClosed(0L, 2000000000l)

               .parallel()

               .reduce(0L, Long::sum);

       long endtime= System.currentTimeMillis();

       System.out.println(sum);

       System.out.println("---20亿循环计算耗时:"+(endtime-starttime));

    }

}

运行结果:

2000000001000000000

---20亿循环计算耗时:1620

用了1620毫秒,快了一点。

上一篇下一篇

猜你喜欢

热点阅读