java多线程高并发

Java并发总结

2017-04-03  本文已影响87人  不会上树的猴子

Java并发总结

1.多线程的优点

2.创建线程

1.实现Runnable接口

new Thread(Runnable).start()

2.继承Thread类

new MyThread().start()

注意:启动线程的方式必须是start(),若是直接调用Thread.run()代码也能执行,但是就变成了普通方法的调用了,并没有启动线程

3.线程状态

1.线程状态介绍

线程在一定条件下,状态会发生变化。线程一共有以下几种状态:

线程状态图

2.中断机制

1.可以通过调用Thread对象的interrupt()方法来中断线程,但是此方法只是将中断标志设置为true标志,并不能直接中断线程,若执行interrupt()方法时线程处于:

2.检查中断状态:

4.守护线程

Java中有两类线程:

1.用户线程:运行在前台的线程
2.守护线程:运行在后台的线程,并为前台线程的运行提供便利服务(比如垃圾回收线程),当所有的用户线程都结束了,那么守护线程也会结束,因为被守护者没有了。因此,不要在守护线程中执行业务逻辑操作(比如对数据的读写等)。

5.同步机制

1.原子性和可见性

2.volatile关键字

private volatile boolean value;  

volatile关键字具有可见性,被它修饰的变量不能被线程拷贝,即直接在主存读和写,保证了新值能立即刷新,每个线程时刻看到的都是最新值。因此,保证了多线程操作时变量的可见性,而不具有原子性,因为它不会阻塞线程,是一种稍弱的同步机制,要使volatile变量提供理想的线程安全(同时可见性和原子性),必须同时满足下面两个条件,否则要加锁来保证原子性:

3.synchronised关键字

采用synchronized修饰符实现的同步机制叫做互斥锁机制,每一个对象都有一个monitor(锁标记),只能分配给一个线程。当线程拥有这个锁标记时才能访问这个资源,没有锁标记便进入锁池,因此叫做互斥锁,synchronised同时具有可见性和原子性,原子性是因为锁内的操作不可分割,可见性因为 入锁(进入synchronized)会获取主存中变量的最新值和出锁(退出synchronized)会将变量的最新值刷新回主存

1.实例方法的同步与实例方法内的同步块
实例方法内的synchronized是同步在某个实例对象上。即对象锁

public class MyClass {
public synchronized void log1(String msg1, String msg2){
        //...
}
public void log2(String msg1, String msg2){
        synchronized(object){
            //...
        }
    }}

2.静态方法的同步与静态方法内的同步块
静态方法内的synchronized是同步在类对象上,锁住的是整个类,即类锁
public class MyClass {

public static synchronized void log1(String msg1, String msg2){
        //...
}
public void log2(String msg1, String msg2){
        synchronized(MyClass.class){
            //...
        }
    }
}

使用同步机制获取互斥锁的情况,进行几点说明:

4.显示的Lock锁

5.synchronized 和 Volatile的比较

6.TheadLocal

ThreadLocal类的实例,即便被多个线程锁共享,但是在每个线程当中都有一份私有拷贝,并且多个线程无法看到对方的值,即线程对于此变量的使用完全是在自己拷贝对象上。

private ThreadLocal myThreadLocal = new ThreadLocal();

ThreadLocal可以储存任意对象

myThreadLocal.set("A thread local value");//存储此对象的值
String threadLocalValue = (String) myThreadLocal.get();//读取
ThreadLocal myThreadLocal1 = new ThreadLocal<String>();//泛型使用

7.死锁

1.普通循环等待死锁

如果在同一时间,线程A持有锁M并且想获得锁N,线程B持有锁N并且想获得锁M,那么这两个线程将永远等待下去,这种情况就是最简单的死锁形式。

public class DeadLock{
private final Object left = new Object();
private final Object right = new Object();

public void leftRight() throws Exception
{
    synchronized (left)
    {
        Thread.sleep(2000);
        synchronized (right)
        {
            System.out.println("leftRight end!");
        }
    }
}

public void rightLeft() throws Exception
{
    synchronized (right)
    {
        Thread.sleep(2000);
        synchronized (left)
        {
            System.out.println("rightLeft end!");
        }
    }
}
}

死锁的四个必要条件:

避免死锁的方式:
1、只在必要的最短时间内持有锁,考虑使用同步语句块代替整个同步方法;
2、设计时考虑清楚锁的顺序,尽量减少潜在的加锁交互数量
3、既然死锁的产生是两个线程无限等待对方持有的锁,我们可以使用Lock类中的tryLock方法去尝试获取锁,这个方法可以指定一个超时时限,获取锁超时后会返回一个失败信息,放弃取锁。

2.重入锁死

如果一个线程在两次调用lock()间没有调用unlock()方法,那么第二次调用lock()就会被阻塞,这就出现了重入锁死。避免重入锁死有两个选择:

8.多线程集合的安全使用

在 Collections 类中有多个静态方法,它们可以获取通过同步方法封装非同步集合而得到的集合:
public static Collection synchronizedCollention(Collection c)
public static List synchronizedList(list l)
public static Map synchronizedMap(Map m)
public static Set synchronizedSet(Set s)
public static SortedMap synchronizedSortedMap(SortedMap sm)
public static SortedSet synchronizedSortedSet(SortedSet ss)

在多线程环境中,当遍历当前集合中的元素时,希望阻止其他线程添加或删除元素。安全遍历的实现方法如下:

import java.util.*;  

public class SafeCollectionIteration extends Object {  
public static void main(String[] args) {  
    //为了安全起见,仅使用同步列表的一个引用,这样可以确保控制了所有访问  
    //集合必须同步化,这里是一个List  
    List wordList = Collections.synchronizedList(newArrayList());  

    //wordList中的add方法是同步方法,会自动获取wordList实例的对象锁  
    wordList.add("Iterators");  
    wordList.add("require");  
    wordList.add("special");  
    wordList.add("handling");  

    //获取wordList实例的对象锁,  
    //迭代时,此时必须阻塞其他线程调用add或remove等方法修改元素
    synchronized ( wordList ) {  
        Iterator iter = wordList.iterator();  
        while ( iter.hasNext() ) {  
            String s = (String) iter.next();  
            System.out.println("found string: " + s + ", length=" + s.length());  
        }  
    }  
}  
} 

大部分的线程安全类都是相对线程安全的,也就是我们通常意义上所说的线程安全,像Vector这种,add、remove方法都是原子操作,不会被打断,但也仅限于此,如果有个线程在遍历某个Vector、有个线程同时在add这个Vector,99%的情况下都会出现·ConcurrentModificationException·,也就是fail-fast机制

6.多线程协作

1.wait、notify、notifyAll的使用

wait():将当前线程置入休眠状态,直到接到唤醒通知或被中断为止。调用后当前线程立即释放锁。

notify():用来通知那些正在等待该对象的对象锁的其他线程。如果有多个线程等待,则线程规划器任意挑选出其中一个wait()状态的线程来发出通知唤醒它。其他线程继续阻塞,但是调用后当前线程不会立马释放该对象锁,直到程序出锁

notifyAll():notifyAll会使在该对象锁上wait的所有线程统统退出wait的状态(即全部被唤醒),待程序出锁后,所有被唤醒的线程共同竞争该锁,没有竞争到锁的线程会一直竞争(不是阻塞)

注意:在调用wait、notify、notifyAll方法之前,必须先获得对象锁,并且只能在synchronized代码块或者方法中调用。否则抛出IllegalMonitorStateException异常

总结:

2.notify 通知的遗漏

当线程 A 还没开始 wait 的时候,线程 B 已经 notify 了,这样,线程 B 的通知是没有任何响应的,当 线程B 退出 synchronized 代码块后,线程A 再开始 wait,便会一直阻塞等待。也就是说这个通知信号提前来了,没有wait线程收到,因此丢失了信号,为了避免丢失信号,可以设置一个成员变量来标志信号。

public class MyWaitNotify{
MonitorObject myMonitorObject = new MonitorObject();

//一旦调用notify,则设为true表示唤醒信号发出来了,则设为false表示唤醒信号已经被其他某个线程消耗了,
boolean wasSignalled = false;
public void doWait(){
    synchronized(myMonitorObject){
    //自旋锁,循环检查,只有当为true时才表示有唤醒信号来了
    while(!wasSignalled){
             try{
               myMonitorObject.wait();
           } catch(InterruptedException e){...}
          }
       //clear signal and continue running.
         wasSignalled = false;
  }
 }
 public void doNotify(){
     synchronized(myMonitorObject){
        wasSignalled = true;
        myMonitorObject.notify();
   }
 }
}

如果有多个线程被notifyAll()唤醒,所有被唤醒的线程都会在while循环里检查wasSignalled变量值,但是只有一个线程可以获得对象锁并且退出wait()方法并清除wasSignalled标志(设为false)。这时这个标志已经被第一个唤醒的线程消耗了,所以其余的线程会检查到标志为false,还是会回到等待状态。

3.字符串常量或全局对象作为锁的隐患

字符串常量全局对象在不同的实例当中是同一个对象,即其实用的是同一把锁。因此本来在不同实例对象的线程会互相干扰,例如在实例A中的线程调用notifyAll()可能会唤醒实例B当中的wait线程。因此应该避免使用这两种对象作为监视器对象,而应使用每个实例中唯一的对象

String myMonitorObject = "";//相同String 常量赋值在内存当中只会有一份对象

4.生产者-消费者模型synchronized实现

生产者和消费者在同一时间段内共用同一存储空间,生产者向空间里生产数据,而消费者取走数据。问题在于如何通过线程之间的协作使得生产和消费轮流进行。

package job_3;

import java.util.Random;

public class Datebuf {
private Integer i = 0;
private int result;
private Random random;

public Datebuf() {
    random = new Random();
}

public void sendData() {
    while (!Thread.interrupted()) {
        synchronized (this) {
            try {
                while (i != 0) {
                    this.wait();
                }
                i = random.nextInt(100);
                System.out.println("线程 " + Thread.currentThread().getName()
                        + "生产" + i);
                this.notify();
            } catch (InterruptedException e) {
                return;
            }
        }
    }
}

public void addData() {
    while (!Thread.interrupted()) {
        synchronized (this) {
            try {
                while (i == 0) {
                    this.wait();
                }
                result += i;
                System.out.println("线程 " + Thread.currentThread().getName()
                        + "消费" + i + "--result=" + result);
                i = 0;
                this.notify();
            } catch (InterruptedException e) {
                return;
            }
        }
    }
}

}

5.其他协调方法

1.join()
一个线程可以调用其他线程的join()方法,其效果是等待其他线程结束才继续执行。如果某个线程调用t.join(),此线程将被挂起,直到目标线程t结束才恢复(即t.isAlive()为假)。

2.yield()
建议线程调度器让其他具有相同优先级的线程优先运行,但只是建议,并不一定就是别的线程先运行了

7.线程池

1.ExecutorService介绍

ExecutorService的生命周期包括三种状态:运行,关闭,终止。创建后便进入了运行状态,当调用了 shutdown()方法时,便进入关闭状态。

使用线程池的好处:

2.自定义线程池

public ThreadPoolExecutor (
            int corePoolSize, 
            int maximumPoolSize, 
            long keepAliveTime, 
            TimeUnit unit,
            BlockingQueue<Runnable> workQueue)

3.创建线程池

以下四个线程池底层都是调用了ThreadPoolExecutor的构造方法,所以它们主要只是参构造数设置上的差异,理解了它们的默认构造参数值就能明白它们的区别

newCachedThreadPool()

public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                            60L, TimeUnit.SECONDS,
                            new SynchronousQueue<Runnable>());
}

newFixedThreadPool(int)

public static ExecutorService newFixedThreadPool(int nThreads){
    return new ThreadPoolExecutor(nThreads, nThreads,
                                  0L, TimeUnit.MILLISECONDS,
                                  new LinkedBlockingQueue<Runnable>());
}

newScheduledThreadPool(int)

public ScheduledThreadPoolExecutor(int corePoolSize) {
    super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
          new DelayedWorkQueue());
}

延迟调用周期执行示例,表示延迟1秒后每3秒执行一次:

scheduledThreadPool.scheduleAtFixedRate(new Runnable() {
           @Override
           public void run() {
System.out.println("delay 1 seconds, and excute every 3     seconds");
}
}, 1, 3, TimeUnit.SECONDS);

SingleThreadExecutor()

public static ExecutorService newSingleThreadExecutor() {
    return new FinalizableDelegatedExecutorService(
    new ThreadPoolExecutor(1, 1,0L, 
    TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>()));
}

当试图通过 excute 方法将一个 Runnable 任务添加到线程池中时,按照如下顺序来处理:

ex=>start: 提交任务
co=>condition: 核心线程池满了?
new=>operation: 创建新线程
queue=>condition: 缓冲队列无法加入?
add=>operation: 加入缓冲队列
bao=>operation: 饱和政策处理
max=>condition: 最大线程池满了?
e=>end

ex->co
co(yes)->queue
co(no)->new
queue(yes)->max
queue(no)->add
max(yes)->bao
max(no)->new

4.几种排队的策略

5.关闭线程池

6.Executor执行Runnable和Callable任务

Runnable
无返回值,无法抛出经过检查的异常,通过execute方法添加

ExecutorService executorService = Executors.newSingleThreadExecutor();//创建线程池
executorService.execute(new TestRunnable());//添加任务

Callable
返回Future对象,获取返回结果时可能会抛出异常,通过submit方法添加

package threadLearn;

import java.util.ArrayList;   
import java.util.List;   
import java.util.concurrent.*;   

public class CallableDemo{   
public static void main(String[] args){   
    ExecutorService executorService=Executors.newCachedThreadPool();   
    List<Future<String>> resultList = new ArrayList<Future<String>>();   
    //创建10个任务并执行   
        for (int i = 0; i < 10; i++){   
        //使用ExecutorService执行Callable类型的任务,并将结果保存在future变量中   
        Future<String> future = executorService.submit(new TaskWithResult(i));   
        //将任务执行结果存储到List中   
        resultList.add(future);   
    }   
    //遍历任务的结果   
    for (Future<String> fs : resultList){   
            try{   
                while(!fs.isDone()){//Future返回如果没有完成,则一直循环等待,直到Future返回完成  
                    System.out.println("还没完成");
                }
                System.out.println(fs.get());//打印各个线程(任务)执行的结果   
            }catch(InterruptedException e){   
                e.printStackTrace();   
            }catch(ExecutionException e){   
                e.printStackTrace();   
            }finally{   
                //启动一次顺序关闭,执行以前提交的任务,但不接受新任务  
                executorService.shutdown();   
            }   
    }   
}   
}   

class TaskWithResult implements Callable<String>{   
private int id;   

public TaskWithResult(int id){   
    this.id = id;   
}   

/**  
 * 任务的具体过程,一旦任务传给ExecutorService的submit方法, 
 * 则该方法自动在一个线程上执行 
 */   
public String call() throws Exception {  
    System.out.println("call()方法被自动调用" + Thread.currentThread().getName());   
    Thread.sleep(1000);
    //该返回结果将被Future的get方法得到  
    return "call()方法被自动调用,任务返回的结果是:" + id + "-----" + Thread.currentThread().getName();   
    }   
} 

如果真正的结果的返回尚未完成,则get()方法会阻塞等待,可以通过调用 isDone()方法判断 Future 是否完成了返回。

9.线程异常处理

由于线程的本质特性(可以理解不同的线程是平行空间),从某个线程中逃逸的异常是无法被别的线程捕获的。一旦异常逃出任务的run()方法,就会向外传向控制台。Thread.UncaughtExceptionHandler是JavaSE5中的新接口,它允许在每个Thread对象上都附着一个异常处理器。Thread.UncaughtExceptionHandler.uncaughtException()会在线程因未捕获的异常而临近死亡时被调用。

Thread t = new Thread(r);
t.setUncaughtExceptionHandler(new  MyUncaughtExceptionHandler());

8.Lock 锁与Condition

1.Lock锁的介绍

Lock接口有3个实现它的类:ReentrantLock、ReetrantReadWriteLock.ReadLock和ReetrantReadWriteLock.WriteLock,即重入锁、读锁和写锁。

(1)ReentrantLock与synchronized的比较

(2)ReetrantLock的忽略中断锁和响应中断锁

忽略中断锁与 synchronized实现的互斥锁一样,不能响应中断,而响应中断锁可以响应中断。

ReentrantLock lock = new ReentrantLock();  
lock.lockInterruptibly();//获取响应中断锁  

(3)读写锁

用读锁来锁定读操作,用写锁来锁定写操作,这样写操作和写操作之间会互斥,读操作和写操作之间会互斥,但读操作和读操作就不会互斥。

ReadWriteLock rwl = new ReentrantReadWriteLock();  
rwl.writeLock().lock()  //获取写锁  
rwl.readLock().lock()  //获取读锁 

2.生产者-消费者模型Lock与Condition实现

package job_3;
import java.util.Random;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

public class LockDatebuf implements Data {
private Integer i = 0;
private int result;
private Random random;
private ReentrantLock lock = new ReentrantLock();
private Condition condition = lock.newCondition();
public LockDatebuf() {
    random = new Random();
}

public void sendData() throws InterruptedException {
    while (!Thread.interrupted()) {
        lock.lockInterruptibly();
            try {
                while (i != 0) {
                    condition.await();
                }
                i = random.nextInt(100);
                System.out.println("线程 " + Thread.currentThread().getName()
                        + "生产" + i);
                condition.signal();
            } catch (InterruptedException e) {
                
            }finally{
                lock.unlock();
            }
    }
}

public void addData() throws InterruptedException {
    while (!Thread.interrupted()) {
        lock.lockInterruptibly();
            try {
                while (i == 0) {
                    condition.await();
                }
                result += i;
                System.out.println("线程 " + Thread.currentThread().getName()
                        + "消费" + i + "--result=" + result);
                i = 0;
                condition.signal();
            } catch (InterruptedException e) {
                
            }finally{
                lock.unlock();
            }
    }
}
}

9.并发新特性

1.CountDownLatch

可以让一组任务必须在另一组任务全部结束后才开始执行,向CountDownLatch对象设置一个初始计数值,任何在这个对象上调用await()方法的线程都将阻塞,直至计数值子减为0。其他任务在结束其工作时,可以调用countDown()来减小这个计数值。CountDownLatch被设计为只触发一次。

package threadLearn;
import java.util.Random;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class CountDownLatchTest {
public static void main(String[] args) throws InterruptedException {
    // 只触发一次,计数值不能被重置
    int size = 5;
    CountDownLatch latch = new CountDownLatch(size);
    ExecutorService exec = Executors.newCachedThreadPool();
    exec.execute(new Waiting(latch,"wait线程"));
    for (int i = 0; i < size; i++) {
        exec.execute(new OtherTask(latch));
    }
    TimeUnit.SECONDS.sleep(1);
    exec.shutdown();
}
}

class Waiting implements Runnable {
    private CountDownLatch latch;
    private String name;
    public Waiting(CountDownLatch latch,String name) {
        this.latch = latch;
        this.name = name;
    }
    public void run() {
        try {
            latch.await();
            System.out.println(name+"最后执行的任务...");
        } catch (Exception e) {
            return;
        }
    }
}

class OtherTask implements Runnable {
    private CountDownLatch latch;
    private Random rand = new Random();
    public OtherTask(CountDownLatch latch) {
        this.latch = latch;
    }

    public void run() {
        try {
            doWork();
            latch.countDown();
        } catch (Exception e) {
            return;
        }
    }

    private void doWork() throws InterruptedException {
        TimeUnit.MICROSECONDS.sleep(1000);
        System.out.println(Thread.currentThread().getName() + " 执行完毕");
    }
}

2.障碍器 CyclicBarrier

它适用于这样一种情况:你希望创建一组任务,它们并发地执行工作,另外的一个任务在这一组任务并发执行结束前一直阻塞等待,直到该组任务全部执行结束,这个任务才得以执行。这非常像CountDownLatch,只是 CountDownLatch是只触发一次的事件,而CyclicBarrier可以多次重用

package threadLearn;
import java.util.concurrent.BrokenBarrierException;   
import java.util.concurrent.CyclicBarrier;   
public class CyclicBarrierTest {   
    public static void main(String[] args) {   
            //创建CyclicBarrier对象,  
            //并设置执行完一组5个线程的并发任务后,再执行MainTask任务  
            CyclicBarrier cb = new CyclicBarrier(5, new MainTask());   
            new SubTask("A", cb).start();   
            new SubTask("B", cb).start();   
            new SubTask("C", cb).start();   
            new SubTask("D", cb).start();   
            new SubTask("E", cb).start();  
    }   
}   

/**  
* 最后执行的任务 
*/   
class MainTask implements Runnable {   
    public void run() {   
            System.out.println("......终于要执行最后的任务了...    ...");   
    }   
}   

/**  
* 一组并发任务  
*/   
class SubTask extends Thread {   
    private String name;   
    private CyclicBarrier cb;   

    SubTask(String name, CyclicBarrier cb) {   
            this.name = name;   
            this.cb = cb;   
    }   

    public void run() {   
            System.out.println("[并发任务" + name + "]  开始执行");   
            for (int i = 0; i < 999999; i++) ;    //模拟耗时的任务   
            System.out.println("[并发任务" + name + "]  开始执行完毕,通知障碍器");   
            try {   
                    //每执行完一项任务就通知障碍器   
                    cb.await();   
            } catch (InterruptedException e) {   
                    e.printStackTrace();   
            } catch (BrokenBarrierException e) {   
                    e.printStackTrace();   
            }   
    }   
}  

3.信号量Semaphore

信号量 Semaphore 实际上是一个功能完毕的计数信号量,从概念上讲,它维护了一个许可集合,对控制一定资源的消费与回收有着很重要的意义。Semaphore 可以控制某个资源被同时访问的任务数,它通过acquire()获取一个许可,release()释放一个许可。如果被同时访问的任务数已满,则其他 acquire 的任务进入等待状态,直到有一个任务被release掉,它才能得到许可。Semaphore 仅仅是对资源的并发访问的任务数进行监控,而不会保证线程安全,因此,在访问的时候,要自己控制线程的安全访问。

10.性能调优

(1)比较各类互斥技术

(2)免锁容器

CopyOnWiteArrayList的写入将导致创建整个底层数组的副本,而原数组将保留在原地,使得复制的数组在被修改时,读取操作可以安全的执行。当修改完成时,一个原子性的操作把新的数组换入,使得新的读取操作可以看到这个新的修改。

(3)ReadWriteLock

对向数据结构相对不频繁的写入,但是有多个任务要经常读取这个数据结构的这类情况进行了优化。ReadWriteLock使得你可以同时有多个读者,只要他们都不试图写入即可。如果写锁已经被其他任务持有,那么任何读者都不能访问,直至这个写锁被释放为止。即适用于读者多于写者的情况。

对于ReadWriteLock的应用主要是:缓存和提高对数据结构的并发性。

下面的代码展示了如何利用重入来执行锁降级:

class CachedData {
Object data;
volatile boolean cacheValid;
ReentrantReadWriteLock rwl = new ReentrantReadWriteLock();

void processCachedData() {
    rwl.readLock().lock();
    if (!cacheValid) {
    //在获取写锁之前必须释放读锁
        rwl.readLock().unlock();
        rwl.writeLock().lock();
        // 重新检查状态,因为可能其他线程已经获取到读锁了
        if (!cacheValid) {
            data = ...
            cacheValid = true;
        }
        rwl.readLock().lock();
        rwl.writeLock().unlock(); 
    }
    use(data);
    rwl.readLock().unlock();
}

}

上一篇 下一篇

猜你喜欢

热点阅读