Java多线程知识点整合 - 2

2019-02-08  本文已影响0人  Earl_iu

reentrantlock

tryLock(),马上返回,拿到lock就返回true,不然返回false。 比较潇洒的做法。

带时间限制的tryLock(),拿不到lock,就等一段时间,超时返回false。比较聪明的做法。

lock(), 拿不到lock就不罢休,不然线程就一直block。 比较无赖的做法。

lockInterruptibly,调用后一直阻塞直到获得锁,但是接受中断信号

lock的使用

读写锁 ReadWriteLock接口

public interface ReadWriteLock{

    Lock readLock();

    Lock writeLock();

}    允许多个读操作,或者一个写操作,这两个操作不能同时进行

读写锁面临的几个问题:

**释放优先**:写入锁释放锁时,读和写的进程同时存在,应该选择读还是写
 - 如果这个锁由读线程持有,而另一个线程请求写入锁,那么其他读线程都不能获取读锁,直到写线程使用完,释放锁

**读线程插队**:如果锁由读线程持有,别的读线程是否可以插队,如果允许读插到写的前面,那么可能出现饿死的情况

**重入锁**:读和写锁是否可以重入

        - 可以重入

**降级锁**:一个线程有写入锁,是否能不释放该锁的情况下获取读取锁

        - 写线程降级为读线程可以,即先获取写锁,再获取读锁,这时候释放写锁,即降级,读不可以升级为写(会导致死锁)

**升级**:读取锁能否优于其他正在等待的读和写线程,升级为一个写入锁

读写锁使用32位的int类型来表示锁被占用的线程数(Reentrantlock的state),如果一个整型变量行维护多种状态,就需要“按位切割”这个变量,高16位读状态的线程占有数,低16位的写锁被同一个线程的申请次数

读写锁的实现原理

上图是一个划分图,表示一个线程已经获取了写锁,且重进入了两次,同时也连续获取了两次读锁。

读写锁是通过位运算迅速确定读和写各自的状态。假设当前同步状态值为S,写状态等于S&0x0000FFFF(将高16位全部抹去),读状态等于S>>>16(无符号补0右移16位)。当写状态增加1时,等于S+1,当读状态增加1时,等于S+(1<<16),也就是S+0x00010000。

根据状态的划分能得出一个推论:S不等于0时,当写状态(S&0x0000FFFF)等于0时,则读状态(S>>>16)大于0,即读锁已被获取。

为什么只有降级锁,没有升级锁?

只要线程获取写锁,那么这一刻只有这一个线程可以在临界区操作,它自己写完的东西,自己的是可以看见的,所以写锁降级为读锁是非常自然的一种行为,并且几乎没有任何性能影响,但是反过来就不一定行的通了,因为读锁是共享的,也就是说同一时刻有大量的读线程都在临界区读取资源,如果可以允许读锁升级为写锁,这里面就涉及一个很大的竞争问题,所有的读锁都会去竞争写锁,这样以来必然引起巨大的抢占,这是非常复杂的,因为如果竞争写锁失败,那么这些线程该如何处理?是继续还原成读锁状态,还是升级为竞争写锁状态?这一点是不好处理的,所以Java的api为了让语义更加清晰,所以只支持写锁降级为读锁,不支持读锁升级为写锁。

另外:

如果我们有线程A和线程B,线程A和B都获取了读锁,线程A这时候升级为写锁,线程B也升级为写锁,这时由于写是互斥的,那么线程A就需要等待线程B释放自己的读锁,线程B也在等待线程A释放读锁,因为不同线程的读和写不能同时存在,这样就出现了死锁。

public class Main {

    Lock lock =new ReentrantLock();

    void m1(){

        try{

            lock.lock();

            for(int i =0;i<10;i++){

                System.out.println(i);

                try{

                    TimeUnit.SECONDS.sleep(1);

                }catch(InterruptedException e){

                    e.printStackTrace();

                }

            }

        }catch(Exception e){

            e.printStackTrace();

        }finally {

            lock.unlock();

        }

    }

    /**

     *** 使用trylock进行尝试锁定,不管锁定与否,方法都将继续执行**

**     * 可以根据tryLock的返回值来判定是否锁定,通过boolean值来决定接下来该干什么**

**     * 可以指定trylock的时间,如果trylock(time)抛出异常,unlock的处理需要放在finally中**

     */

    void m2(){

        boolean locked = false;

        try{

            locked = lock.tryLock(5, TimeUnit.SECONDS); // 如果5秒以后,依旧没有拿到锁,我们就执行下面的代码

            System.out.println("m2..."+locked);

        } catch (InterruptedException e) {

            e.printStackTrace();

        }finally{

            if(locked){

                lock.unlock();

            }

        }

    }

    public static void main(String[] args) {

        Main l = new Main();

        new Thread(l::m1).start();

        try{

            TimeUnit.SECONDS.sleep(1);

        }catch(InterruptedException e){

            e.printStackTrace();

        }

        new Thread(l::m2).start();

    }

}

Thread t = new Thread(()->{

    try{

        lock.lockInterruptibly();

        System.out.println("t2.start");

    }catch(InterruptedException e){

        System.out.println("interrupted");    

    }

});

t.start();

t.interrupt();

闭锁

countDownLatch实现原理图
CountDownLatch是通过一个计数器来实现的,计数器的初始化值为线程的数量。

每当一个线程完成了自己的任务后,计数器的值就相应得减1。

当计数器到达0时,表示所有的线程都已完成任务,然后在闭锁上等待的线程就可以恢复执行任务。

构造函数:

用等待的线程数量来进行初始化

public void CountDownLatch(int count){...}

public void await() throws InterruptedException { };   //调用await()方法的线程会被挂起,它会等待直到count值为0才继续执行

public boolean await(long timeout, TimeUnit unit) throws InterruptedException { };  //和await()类似,只不过等待一定的时间后count值还没变为0的话就会继续执行

public void countDown() { };  //将count值减1

栅栏 Barrier

栅栏类似于闭锁,它能阻塞某些线程直到某个事件的发生,但是latch是等待某个事件发生,而barrier是等待线程

闭锁带着事件的触发次数,当countdown次数为0的时候,所有线程都被释放才能进行后续的工作,但是触发次数不会被重置,如果需要一个可重置次数的闭锁,可以用栅栏

当线程到达Barrier时,线程将会调用await方法,count-1,如果count==0,就释放所有线程,这个方法阻塞直到所有线程都到达栅栏的位置,如果所有线程到达栅栏的位置,那么栅栏打开,所有线程都被释放,栅栏将被重置以便一下次的使用

barrier基于reentrantlock实现

这种行为阻塞的典型用法之一就是将某个问题分成多个部分,每个部分用不同的线程负责,并记得减少闭锁设置的次数。

当所有线程的工作结束后将通过await方法造成的阻塞,如果我们需要反复进行这样的工作就需要使用栅栏。

如果线程池不够大,那么当多个任务通过栅栏机制来彼此协调的时候,将导致线程饥饿死锁

java的实现是CyclicBarrier

默认的构造方法是

CyclicBarrier(int parties),其参数表示屏障拦截的线程数量,每个线程调用await方法告诉CyclicBarrier已经到达屏障位置,线程被阻塞。

另外一个构造方法

CyclicBarrier(int parties, Runnable barrierAction),其中barrierAction任务会在所有线程到达屏障后执行。

https://www.jianshu.com/p/424374d71b67

信号量 Counting Semaphore

public class SemaphoreTest {

        private static final int THREAD_COUNT = 30;

        private static ExecutorService threadPool = Executors

                        .newFixedThreadPool(THREAD_COUNT);

        private static Semaphore s = new Semaphore(10);

        public static void main(String[] args) {

                for (int i = 0; i < THREAD_COUNT; i++) {

                        threadPool.execute(new Runnable() {

                                @Override

                                public void run() {

                                        try {

                                                s.acquire();

                                                System.out.println("save data");

                                                s.release();

                                        } catch (InterruptedException e) {

                                        }

                                }

                        });

                }

                threadPool.shutdown();

        }

}

在代码中,虽然有30个线程在执行但是只允许10个并发的执行。Semaphore的构造方法Semaphore(int permits) 接受一个整型的数字,表示可用的许可证数量。Semaphore(10)表示允许10个线程获取许可证,也就是最大并发数是10。Semaphore的用法也很简单,首先线程使用Semaphore的acquire()获取一个许可证,使用完之后调用release()归还许可证。还可以用tryAcquire()方法尝试获取许可证。

当使用无界队列的时候,可以使用Semaphore来控制任务的提交速率:

public class BoundExecutor {

    private final **Executor exe;**

    private final **Semaphore semaphore;**

    public BoundExecutor(Executor exe, int bound){

        this.semaphore = new Semaphore(bound);

        this.exe = exe;

    }

    public void submitTask(Runnable command) throws InterruptedException{        // 这样就需要先获得许可才能执行任务

        semaphore.acquire();

        exe.execute(() -> {

            try{

                command.run();

            }finally{

                semaphore.release();

            }

        });

    }

}

常用方法:

acquire    获得许可

add    添加数值,之前必须先获得许可

remove    删除数值,之前必须先获得许可

relase    归还许可证

int availablePermits() :返回此信号量中当前可用的许可证数。

int getQueueLength():返回正在等待获取许可证的线程数。

boolean hasQueuedThreads() :是否有线程正在等待获取许可证。

void reducePermits(int reduction) :减少reduction个许可证。是个protected方法。

Collection getQueuedThreads() :返回所有等待获取许可证的线程集合。是个protected方法。

线程封闭

栈封闭实例
public void fun1(){

    ThreadLocal<String> tl = new ThreadLocal();

    t1.set("xixi");

    String s = t1.get();

    t1.remove();    // 只有三个方法

} 

内部实现:

class threadlocal<T>{

    private Map<Thread, T> map = new HashMap<Thread,T>();

    public void set(){

        map.put(Thread.currentThread(),data);            // 将数据和当前线程绑定

    }     

    public void get(){

        map.get(Thread.currentThread());

    } 

}

同步容器

//对于一个非同步的容器,例如ArrayList,我们可以使用Collections.synchronizedList这样的方法,给容器加锁

// 当迭代返回集合的视图的时候,需要手动加锁

迭代器加锁也是一种客户端加锁,需要synchronized中一定要使用m作为锁,这样别的线程不可对m进行操作,如果不加锁会抛出ConcurrentModificationException

Map m = Collections.synchronizedMap(new HashMap());

      ...

  Set s = m.keySet();  // Needn't be in synchronized block

      ...

  synchronized (m) {  // Synchronizing on m, not s!

      Iterator i = s.iterator(); // Must be in synchronized block

      while (i.hasNext())

          foo(i.next());

  }
public class ListHelp<E> {

    public List<E> list = Collections.synchronizedList(new ArrayList<E>());

    public boolean putIfAbsent(E x) { 

    synchronized(list){        // 注意这里的锁只能是list,因为Collections.synchronizedList会把自身作为锁,所以我们需要使用和list一样的锁,即它自己,否则如果是两把锁的话,尽管在synchronized里面,但是不能保证原子性

        boolean absent = !list.contains(x);

        if (absent) {

            list.add(x);

        }

        return absent;

    }

    }

}

**错误示例:**

public synchronized boolean putIfAbsent(E x){        **// 错误原因就是同步方法上的锁和list上的锁不是一个锁,当获取同步方法上的锁以后,并不能保证list操作的原子性**

    boolean absent = !list.contains(x);

        if (absent) {

            list.add(x);

        }

        return absent;

}
// Vector也是线程同步的,这段代码有什么问题?

public class sell_ticket {

    static Vector ls = new Vector();

    static{

        for(int i =0;i<100;i++){

            ls.add(i);

        }

    }

    public static void main(String[] args) {

        for(int i =0;i<10;i++){

            new Thread(()->{

                while(ls.size()>0){        // 只保证remove的时候是同步的,get和set分离了,判断完ls.size()以后可能切换到别的线程

                    ls.remove();

                }

            });

        }

    }

}

并发容器

如果想要使用ConcurrentHashMap完成一些常见的原子操作,可以实现下接口。

ConcurrentHashMap自定义接口
public class sell_ticket2 {

    static Queue<String> ls = new **ConcurrentLinkedQueue<>()**;        // concurrent使用的是分段锁,将容器分为16段,插入的时候只锁定其中的                                                                          一段,如果其他线程向其他的分段插入数据的话就不影响

    static{

        for(int i =0;i<100;i++){

            ls.add("票号"+i);

        }

    }

    public static void main(String[] args) {

        for(int i =0;i<10;i++){

            new Thread(()->{

                while(true){

                    String s = ls.poll();

                    if(s==null) break;

                    else System.out.println("sell"+s);

                }

            });

        }

    }

}
跳表结构 image.png

内存占用问题

    public boolean **add**(E e) {

    final **ReentrantLock** lock = this.lock;

    lock.lock();

    try {

        Object[] elements = getArray();

        int len = elements.length;

        Object[] newElements = **Arrays.copyOf(elements, len + 1);**

        newElements[len] = e;

        setArray(newElements);

        return true;

    } finally {

        lock.unlock();

    }

   }
api 阻塞队列的多种实现
Executor接口:

public class exe implements Executor{

    @Override

    public void execute(Runnable command) {

        new Thread(command).start();

    }

}

Executors:

是一个工具类,提供了不同的工厂方法来创建不同的线程池

ExecutorService接口:

拓展了Executor接口,参数可以使Calable,并且拥有自己的周期方法:

void shutdown();

List<Runnable> shutdownNow();

boolean isShutdown();

boolean isTerminated();

boolean awaitTermination(long timeout, TimeUnit unit)

ScheduledExecutorService接口:

service.ScheduledAtFixedRate(Runnable command, long initialDelay(起始任务的延迟时间), long period(每隔多久执行一次任务), Timeunit unit(时间单位))

{

        固定频率完成任务

}
image.png image.png image.png

ScheduledThreadPoolExecutor

image.png image.png
ScheduledExecutorService s = Executors.newScheduledThreadPool(10);     

public static **ScheduledExecutorService newScheduledThreadPool**(int corePoolSize) {

        return new **ScheduledThreadPoolExecutor**(corePoolSize);

}

public class **ScheduledThreadPoolExecutor**

        extends **ThreadPoolExecutor**

        implements **ScheduledExecutorService**

{}

workStealingPool(工作窃取,基于forkjoinpool实现)

forkjoinpool fork切分任务,join合并任务

threadpoolexecutor 除了workstealpool和forkjoinpool,其他线程池都是直接由threadpoolexecutor实现的!threadpoolexecutor中的线程调度依赖于addWorker()方法,这个方法来创建和切换线程,ThreadPoolExecutor继承自ExecutorService和Executor,将提交的任务在线程池中的可用线程中执行

无限制创建线程的不足:

线程生命周期的开销非常高

资源消耗

稳定性

线程池的好处:

线程池中的线程从工作队列中获取一个任务,执行任务,然后返回线程等待下一个任务(工作队列保存了所有等待执行的任务)

线程池的好处在于,使用的是先有的线程而不是创建新的线程,可以在处理多个请求时分摊线程的创建与销毁的开销。当工作线程存在的时候,不用等待工作线程的创建而延迟任务的执行,从而提升了响应性。

上一篇下一篇

猜你喜欢

热点阅读