并发编程

2019-01-29  本文已影响4人  34sir

synchronized和Lock的区别

https://blog.csdn.net/u012403290/article/details/64910926?locationNum=11&fps=1

组合关系

线程-->进程-->程序
线程是程序执行流的最小单位 进程是程序进行资源调度的独立单位

Thread的几个重要方法

wait 和 notify?
这两位是Object的方法 两者配合使用 分别标识线程的挂起和恢复
wait会释放锁 sleep不会释放锁

线程状态

线程状态.png

锁类型

synchronized与Lock的区别

类别 synchronized Lock

类别 synchronized Lock
存在层次 Java的关键字,在jvm层面上 是一个类
锁的释放 1、以获取锁的线程执行完同步代码,释放锁 2、线程执行发生异常,jvm会让线程释放锁 在finally中必须释放锁,不然容易造成线程死锁
锁的获取 假设A线程获得锁,B线程等待。如果A线程阻塞,B线程会一直等待 分情况而定,Lock有多个锁获取的方式,具体下面会说道,大致就是可以尝试获得锁,线程可以不用一直等待
锁状态 无法判断 可以判断
锁类型 可重入 不可中断 非公平 可重入 可判断 可公平(两者皆可)
性能 少量同步 大量同步

Lock详解

public interface Lock {

    /**
     * Acquires the lock.
     */
    // 获取锁 如果锁被暂用 一直等待
    void lock();

    /**
     * Acquires the lock unless the current thread is
     * {@linkplain Thread#interrupt interrupted}.
     */
    // 用该锁的获取方式 如果线程在获取锁的阶段进入了等待 那么可以中断此线程 先去做别的事情
    void lockInterruptibly() throws InterruptedException;

    /**
     * Acquires the lock only if it is free at the time of invocation.
     */
    // 锁被占用 返回false 否则返回true
    boolean tryLock();

    /**
     * Acquires the lock if it is free within the given waiting time and the
     * current thread has not been {@linkplain Thread#interrupt interrupted}.
     */
    // 比起tryLock()就是给了一个时间期限,保证等待参数时间
    boolean tryLock(long time, TimeUnit unit) throws InterruptedException;

    /**
     * Releases the lock.
     */
    // 释放锁
    void unlock();

}

使用

// ReentrantLock是Lock的一种实现
private Lock lock = new ReentrantLock();

    //需要参与同步的方法
    private void method(Thread thread){
        lock.lock();
        try {
            System.out.println("线程名"+thread.getName() + "获得了锁");
        }catch(Exception e){
            e.printStackTrace();
        } finally {
            System.out.println("线程名"+thread.getName() + "释放了锁");
            lock.unlock();
        }
    }

    public static void main(String[] args) {
        LockTest lockTest = new LockTest();

        //线程1
        Thread t1 = new Thread(new Runnable() {

            @Override
            public void run() {
                lockTest.method(Thread.currentThread());
            }
        }, "t1");

        // 线程2
        Thread t2 = new Thread(new Runnable() {

            @Override
            public void run() {
                lockTest.method(Thread.currentThread());
            }
        }, "t2");

        t1.start();
        t2.start();
    }
}

执行情况:
// 线程名t1获得了锁
// 线程名t1释放了锁
// 线程名t2获得了锁
// 线程名t2释放了锁

private Lock lock = new ReentrantLock();

    //需要参与同步的方法
    private void method(Thread thread){
/*      lock.lock();
        try {
            System.out.println("线程名"+thread.getName() + "获得了锁");
        }catch(Exception e){
            e.printStackTrace();
        } finally {
            System.out.println("线程名"+thread.getName() + "释放了锁");
            lock.unlock();
        }*/


        if(lock.tryLock()){
            try {
                System.out.println("线程名"+thread.getName() + "获得了锁");
            }catch(Exception e){
                e.printStackTrace();
            } finally {
                System.out.println("线程名"+thread.getName() + "释放了锁");
                lock.unlock();
            }
        }else{
            System.out.println("我是"+Thread.currentThread().getName()+"有人占着锁,我就不要啦");
        }
    }

    public static void main(String[] args) {
        LockTest lockTest = new LockTest();

        //线程1
        Thread t1 = new Thread(new Runnable() {

            @Override
            public void run() {
                lockTest.method(Thread.currentThread());
            }
        }, "t1");

        Thread t2 = new Thread(new Runnable() {

            @Override
            public void run() {
                lockTest.method(Thread.currentThread());
            }
        }, "t2");

        t1.start();
        t2.start();
    }

执行结果:
// 线程名t2获得了锁
// 我是t1有人占着锁,我就不要啦
// 线程名t2释放了锁

平衡锁(公平锁)

 /**
     * Sync object for non-fair locks
     */
    // 平衡锁
    static final class NonfairSync extends Sync {
        private static final long serialVersionUID = 7316153563782823691L;

        /**
         * Performs lock.  Try immediate barge, backing up to normal
         * acquire on failure.
         */
        final void lock() {
            if (compareAndSetState(0, 1))
                setExclusiveOwnerThread(Thread.currentThread());
            else
                acquire(1);
        }

        protected final boolean tryAcquire(int acquires) {
            return nonfairTryAcquire(acquires);
        }
    }

    /**
     * Sync object for fair locks
     */
    // 非平衡锁
    static final class FairSync extends Sync {
        private static final long serialVersionUID = -3000897897090466540L;

        final void lock() {
            acquire(1);
        }

        /**
         * Fair version of tryAcquire.  Don't grant access unless
         * recursive call or no waiters or is first.
         */
        protected final boolean tryAcquire(int acquires) {
            final Thread current = Thread.currentThread();
            int c = getState();
            if (c == 0) {
                if (!hasQueuedPredecessors() &&
                    compareAndSetState(0, acquires)) {
                    setExclusiveOwnerThread(current);
                    return true;
                }
            }
            else if (current == getExclusiveOwnerThread()) {
                int nextc = c + acquires;
                if (nextc < 0)
                    throw new Error("Maximum lock count exceeded");
                setState(nextc);
                return true;
            }
            return false;
        }
    }
 public ReentrantLock() {
        sync = new NonfairSync();//默认非公平锁
    }

两种锁的底层实现

尽量使用synchronized而非Lock

开启线程的三种方式

① 继承Thread

public class FirstThreadTest extends Thread{
    int i = 0;
    //重写run方法,run方法的方法体就是现场执行体
    public void run()
    {
        for(;i<100;i++){
        System.out.println(getName()+"  "+i);
        
        }
    }
    public static void main(String[] args)
    {
        for(int i = 0;i< 100;i++)
        {
            System.out.println(Thread.currentThread().getName()+"  : "+i);
            if(i==20)
            {
                new FirstThreadTest().start();
                new FirstThreadTest().start();
            }
        }
    }
 
}

② 实现Runnable

public class RunnableThreadTest implements Runnable
{
 
    private int i;
    public void run()
    {
        for(i = 0;i <100;i++)
        {
            System.out.println(Thread.currentThread().getName()+" "+i);
        }
    }
    public static void main(String[] args)
    {
        for(int i = 0;i < 100;i++)
        {
            System.out.println(Thread.currentThread().getName()+" "+i);
            if(i==20)
            {
                RunnableThreadTest rtt = new RunnableThreadTest();
                new Thread(rtt,"新线程1").start();
                new Thread(rtt,"新线程2").start();
            }
        }
 
    }
 
}

③ Callable和Future 创建线程

public class CallableThreadTest implements Callable<Integer>
{
 
    public static void main(String[] args)
    {
        CallableThreadTest ctt = new CallableThreadTest();
                // 包装Callable对象 
        FutureTask<Integer> ft = new FutureTask<>(ctt);
        for(int i = 0;i < 100;i++)
        {
            System.out.println(Thread.currentThread().getName()+" 的循环变量i的值"+i);
            if(i==20)
            {
                // FutureTask对象作为Thread对象的target
                new Thread(ft,"有返回值的线程").start();
            }
        }
        try
        {
            System.out.println("子线程的返回值:"+ft.get());
        } catch (InterruptedException e)
        {
            e.printStackTrace();
        } catch (ExecutionException e)
        {
            e.printStackTrace();
        }
 
    }
 
        // 实现call方法 作为线程执行体
    @Override
    public Integer call() throws Exception
    {
        int i = 0;
        for(;i<100;i++)
        {
            System.out.println(Thread.currentThread().getName()+" "+i);
        }
        return i;
    }
 
}

三种方式的对比

有了进程为什么还要线程?

start和run的区别

控制方法允许访问的并发线程个数

Semaphore的两个重要方法:

// 保证5个线程在执行test方法
private Semaphore mSemaphore = new Semaphore(5);
    public void run(){
        for(int i=0; i< 100; i++){
            new Thread(new Runnable() {
                @Override
                public void run() {
                    test();
                }
            }).start();
        }
    }
 
    private void test(){
        try {
            mSemaphore.acquire();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println(Thread.currentThread().getName() + " 进来了");
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println(Thread.currentThread().getName() + " 出去了");
        mSemaphore.release();
    }

wait和notify关键字的理解

小结

什么会导致线程阻塞

结束线程

public class MyThread implements Runnable{
    // 需要volatile修饰 
    private volatile boolean isCancelled;
    
    public void run(){
        while(!isCancelled){
            //do something
        }
    }
    
    public void cancel(){   isCancelled=true;    }
}
public class InterruptedExample {

    public static void main(String[] args) throws Exception {
        InterruptedExample interruptedExample = new InterruptedExample();
        interruptedExample.start();
    }

    public void start() {
        MyThread myThread = new MyThread();
        myThread.start();

        try {
            Thread.sleep(3000);
            myThread.cancel();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    private class MyThread extends Thread{

        @Override
        public void run() {
            while (!Thread.currentThread().isInterrupted()) {
                try {
                    System.out.println("test");
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    System.out.println("interrupt");
                    //抛出InterruptedException后中断标志被清除,如果什么都不做等于吞噬中断.标准做法是再次调用interrupt恢复中断
                    //调用interrupt方法不会真正中断线程 只是发出请求在合适时候结束自己
                    Thread.currentThread().interrupt();
                }
            }
            System.out.println("stop");
        }

        public void cancel(){
            interrupt();
        }
    }
}

什么时候都不应该吞掉中断!每个线程都应该有合适的方法响应中断!

用Java库

Executor框架提供了Java线程池的能力,ExecutorService扩展了Executor,提供了管理线程生命周期的关键能力
ExecutorService.submit返回了Future对象来描述一个线程任务,它有一个cancel()方法

public static void main(String[] args) throws Exception {
        ExecutorService es = Executors.newSingleThreadExecutor();
        Future<?> task = es.submit(new MyThread());

        try {
            //限定时间获取结果
            task.get(5, TimeUnit.SECONDS);
        } catch (TimeoutException e) {
            //超时触发线程中止
            System.out.println("thread over time");
        } catch (ExecutionException e) {
            throw e;
        } finally {
            boolean mayInterruptIfRunning = true;
            //对任务所在线程发出中断请求  mayInterruptIfRunning标识任务是否能够接收到中断请求
           //为true 任务如果在某个线程中运行 那么这个线程能够被中断
           //为false 如果任务还未启动 就不要运行
            task.cancel(mayInterruptIfRunning); 
        }
    }

    private static class MyThread extends Thread {

        @Override
        public void run() {
            while (!Thread.currentThread().isInterrupted()) {   
                try {
                    System.out.println("count");
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    System.out.println("interrupt");
                    Thread.currentThread().interrupt();
                }
            }
            System.out.println("thread stop");
        }

        public void cancel() {
            interrupt();
        }
    }

同步方法

为什么使用同步?
多个线程同时操作一个可共享的变量时 保证该变量的唯一性准确性

private int account = 100;
            //需要声明这个锁
            private Lock lock = new ReentrantLock();
            public int getAccount() {
                return account;
            }
            //这里不再需要synchronized 
            public void save(int money) {
                lock.lock();
                try{
                    account += money;
                }finally{
                    lock.unlock();
                }
            }

局部变量实现线程同步

ThreadLocal管理变量 每个使用该变量的线程都将获得该变量的副本 每个线程都可以修改自己的副本 对其他线程没有影响

ThreadLocal常用方法:

//使用ThreadLocal类管理共享变量account
            private static ThreadLocal<Integer> account = new ThreadLocal<Integer>(){
                @Override
                protected Integer initialValue(){
                    return 100;
                }
            };
            public void save(int money){
                account.set(account.get()+money);
            }
            public int getAccount(){
                return account.get();
            }

ThreadLocal相比较同步机制 前者是"空间换时间" 后者是"时间换空间"
ThreadLocal最常见的使用场景:数据库连接、Session管理

在Android中的体现:Handler通过ThreadLocal获取对应线程的Looper

Java中的数据一致性

原子性 可见性 有序性
Volatile × ×
Snychronized
Final ×

内存模型

每一个线程都有一个工作线程和主存隔离; 工作内存中存放主存中值的拷贝

主存-->工作内存

工作内存-->主存

以上的四种操作都是原子性的

Java对象的生命在周期

怎么判断Java对象无用

两种算法:

变量回收之后的内存处理

三种算法:

标记状态.png 清除状态.png

由上图可见清除之后内存状态可能不是连续的
如果无法给一个大对象分配一个足够大的连续内存空间 那么GC不得不重新做一次内存整理
因为不是连续的 所以标记和清除的过程都需要遍历识别内存区域 整个过程效率比较低

预留区域 不分配对象 GC的时候把正在使用的对象复制到预留区域 然后把非预留区域以外的 内存全部清除(典型以空间换时间的做法)

标记-整理 过程.png 分代混合.png

深入浅出synchronized

synchronized保证统一时刻只有一个线程访问临界区 同时保证共享变量的内存可见性

Java中每个对象都可以作为锁

public class WaitNotify {
    static boolean flag = true;
    static Object lock = new Object();

    public static void main(String[] args) throws InterruptedException {
        Thread A = new Thread(new Wait(), "wait thread");
        A.start();
        TimeUnit.SECONDS.sleep(2);
        Thread B = new Thread(new Notify(), "notify thread");
        B.start();
    }

    static class Wait implements Runnable {
        @Override
        public void run() {
            synchronized (lock) {
                while (flag) {
                    try {
                        System.out.println(Thread.currentThread() + " flag is true");
                        lock.wait(); //调用对象的wait方法进入等待
                    } catch (InterruptedException e) {

                    }
                }
                System.out.println(Thread.currentThread() + " flag is false");
            }
        }
    }

    static class Notify implements Runnable {
        @Override
        public void run() {
            synchronized (lock) {
                flag = false;
                lock.notifyAll(); //调用notifyAll方法  线程A收到通知后从wait继续执行 对flag的修改是对线程A可见的
                try {
                    TimeUnit.SECONDS.sleep(7);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }
}

;运行过程需要注意一下几点:

如何实现线程间的互斥性和可见性?

 0 getstatic #2 <com/example/ckc/test/SynchronizedTest.object>
 3 dup
 4 astore_1
 5 monitorenter  //监视器进入 获取锁
 6 aload_1
 7 monitorexit   //监视器退出 释放锁
 8 goto 16 (+8)
11 astore_2
12 aload_1
13 monitorexit
14 aload_2
15 athrow
16 return
public static synchronized void m();
   descriptor: ()V
   flags: ACC_PUBLIC, ACC_STATIC, ACC_SYNCHRONIZED
   Code:
     stack=0, locals=0, args_size=0
        0: return
     LineNumberTable:
       line 9: 0

可以看出:

无论哪种情况都是对指定对象相关联的monitor的获取 这个过程是互斥的 同一个时刻只能有一个线程能够成功 其他失败的线程会被阻塞 并加入到同步队列中 进入blocked状态

锁的内部机制

锁的四种状态:
'- 无锁

两个概念: ① 对象头 ② monitor

hotspot虚拟机(Java虚拟机)中 对象在内存的分布分为三个部分:① 对象头 ②实例数据 ③ 对齐填充

Mark Word 被分为两个部分:①lock word ②标志位

对象.png

Klass ptr 指向Class字节码在虚拟机内部的对象表示的地址
Fields 表示连续的对象实例字段

Mark Word被设计为非固定的数据结构 目的是在极小的控件存储更多的信息

32位的hotspot虚拟机中:如果对象处于未被锁定的情况下。mark word 的32bit空间中有25bit存储对象的哈希码、4bit存储对象的分代年龄、2bit存储锁的标记位、1bit固定为0
其他的状态下(轻量级锁、重量级锁、GC标记、可偏向)下对象的存储结构为


Paste_image.png

① Owner: 初始化时为NULL 标识当前没有线程拥有该monitor 线程拥有该锁后保存线程唯一标识 锁被释放时又设为NULL
② EntryQ: 关联一个系统互斥锁 阻塞所有试图锁住monitor失败的线程
③ RcThis: blocked或waiting在该monitor上的所有线程的个数
④ Nest: 实现重入锁的计数
⑤ HashCode: 保存从对象头拷过来的的HashCode值(可能还含有GC age)
⑥ Candidate: 用来避免不必要的阻塞或者等待线程唤醒 因为每一次只有一个线程能够拥有锁 如果每次前一个释放锁的线程唤醒所有阻塞或者等待的线程 回引起不必要的上下文切换(从阻塞到就绪到竞争失败再到阻塞)
只有两种可能值:① 0标识没有需要唤醒的线程 ② 1标识需要唤醒一个继任线程来竞争锁

monitor的作用是什么?
Java虚拟机中 线程一旦进入synchronized 修饰的同步块 指定的锁对象将对象头中的LockWord指向monitor的起始地址与之关联 Owner存放该锁的线程的唯一标识 确保一次只能有一个线程执行这部分代码

偏向锁

public class SynchronizedTest {
    private static Object lock = new Object();
    public static void main(String[] args) {
        //访问method1时会在对象头(SynchronizedTest.class的对象头)和栈帧的锁记录中存储偏向锁的线程ID
        method1();
        //访问method2时只需要判断对象头的线程ID是否为当前线程 不需要CAS操作进行加解锁
        method2();
    }
    synchronized static void method1() {}
    synchronized static void method2() {}
}

什么是CAS锁操作?
对比交换 是一条CPU的原子指令 作用是让CPU比较两个值是否相等 然后原子性的更新某个位子的值
相对于重量级锁来说 开销较小

轻量级锁

利用了CAS操作

线程可以通过两种方式锁住一个对象

获取锁的过程:
① 对象处于无锁状态时 (LockWord的值为hashCode等,状态位为001) 线程首先从monitor列表中获取一个空闲的monitor 初始化Nest和Owner值为1和线程标识 通过CAS替换monitor起始位置到LockWord进行膨胀 如果存在其他线程竞争锁而导致CAS失败 则回到monitorender重新开始获取锁的过程
② 对象已经膨胀 monitor中的Owner指向当前线程 这是重入锁的情况 将Nest+1 不需要CAS操作 效率高

释放锁的过程:

重量级锁

其他线程试图获取锁都会被阻塞 持有锁的线程释放掉该锁后会唤醒这些线程

内存可见性

锁的可重入

public class UnReentrant{
    Lock lock = new Lock();
    public void outer(){
        // 此时outer已经获取到锁 不能在inner中重复利用已经获取到的锁资源 这种锁称为不可重入 也叫自选锁      
        lock.lock();
        inner();
        lock.unlock();
    }
    public void inner(){
        lock.lock();
        //do something
        lock.unlock();
    }
}

可重入意味着什么?
线程可以进入任何一个它已经拥有的锁所同步着的代码块

可重入锁

ReentrantLock对比synchronized

死锁的四个必要条件

对象锁和类锁是否会相互影响?

不会

线程池

为什么使用线程池?

ThreadPoolExecutor

核心线程
创建线程时 如果当前线程数量小于corePoolSize 那么创建的时核心线程
核心线程会一直存在与线程池中 即使时闲置状态
如果设置allowCoreThreadTimeOut为true 那么闲置的核心线程会在适当时机销毁

线程总数最大值

非核心线程闲置超时时长
如果设置allowCoreThreadTimeOut = true,则会作用于核心线程

该线程池中的任务队列 维护着等待执行的Runnable对象
所有的核心线程都在干活时 新添加的任务会被添加到这个队列中等待处理 如果队列满了 新建非核心线程处理任务
常用的类型:
1⃣️ SynchronousQueue
接受任务直接交给线程 如果线程都在工作 那么就新建线程 保证不会因为超过线程的最大数量出现无法新建线程的问题
2⃣️ LinkedBlockingQueue
如果当前线程数小于核心线程 那么新建核心线程 如果超过则放入队列中 此队列没有最大值限制
3⃣️ ArrayBlockingQueue
可以限定队列的长度 如果没有达到corePoolSize的值 就新建核心线程 如果达到了就入队等候 如果队列已满就新建线程 如果队列已满并且超过maximumPoolSize则异常
4⃣️ DelayQueue
任务先入队 达到指定的延迟时间 才会执行

添加任务

ThreadPoolExecutor.execute(Runnable command)

ThreadPoolExecutor的策略

一个任务被添加到线程池时:

常见的四种线程池

1⃣️ CachedThreadPool
可缓存线程池

ExecutorService cachedThreadPool = Executors.newCachedThreadPool();

2⃣️ FixedThreadPool

//nThreads => 最大线程数即maximumPoolSize
ExecutorService fixedThreadPool = Executors.newFixedThreadPool(int nThreads);

3⃣️ ScheduledThreadPool

//nThreads => 最大线程数即maximumPoolSize
ExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(int corePoolSize);

4⃣️ SingleThreadExecutor

ExecutorService singleThreadPool = Executors.newSingleThreadPool();

对于并发编程的理解

并发编程的原则技巧

关于ConcurrentHashMap

既然有Collections.synchronizedMap 为什么需要ConcurrentHashMap?
因为前者会对整个容器对象上锁 意味着需要一直等到前一个线程离开同步代码块时才有机会执行
但是 修改HashMap时没必要将整个HashMap对象锁住 只需要锁住对应的桶

ConcurrentHashMap提供了对应的原子操作的方法:

关于CopyOnWriteArrayList

ArrayList并发下的替代品 通过增加写时复制语义避免并发访问引起的问题 任何底层操作都会在底层创建一个列表的副本 对于不要求严格读写同步的场景很有用 性能较高 (牺牲空间换时间的做法)

上一篇下一篇

猜你喜欢

热点阅读