Java高并发高性能编程(多线程,协程,Actor,RxJava、Akka、Reactor)

Java Lock应用示例-生产者与消费者

2018-01-10  本文已影响0人  单行线的旋律

Java Lock的使用以及与synchronized的区别很多文章已经讲解的很清楚了,这里不再详细讲解,可以参考java Lock讲解.
总的来说Lock的功能比synchronized更强大,功能更多,但一般的线程同步业务synchronized已经能够满足,对于一些特殊的要求,比如要知道线程获取锁的结果,线程获取锁时没有获取到,要求等待一段时间后仍没获取到就不去获取了,去做别的事情或者等待锁的过程中能够响应中断等等。Lock接口中的newCondition()方法用于获取锁对象上绑定的实例,用于线程的等待与唤醒,可以获取多个实例,而不是像synchronized只有一个锁对象(此时锁对象既是锁对象也是实例),好处是线程可以使用不同的实例,进行有针对的等待与唤醒,Lock接口唯一的实现类是ReentrantLock,下面我使用这个类来实现经典的生产者与消费者。

package cn.yanggy.demo04;

import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

/**
 * User:cool coding
 * Date:2018/1/10
 * Time:12:44
 * 篮子
 * 生产者和消费者需要向蓝子申请锁
 */
public class Bucket {
    /**
     * 蓝子
     */
    private List<String> buckets;

    /**
     * 蓝子的容量
     */
    private final int maxCount=20;

    /**
     * 蓝子上的锁
     */
    private final Lock lock=new ReentrantLock();

    /**
     * 消费锁对象
     */
    private final Condition fetchLock;

    /**
     * 放入锁对象
     */
    private final Condition addLock;

    public Bucket(){
        buckets=new ArrayList<>();
        fetchLock =lock.newCondition();
        addLock =lock.newCondition();
    }

    /**
     * 获得锁
     * @return
     */
    public Lock getLock() {
        return lock;
    }

    /**
     * 获得消费锁对象
     * @return
     */
    public Condition getFetchLock() {
        return fetchLock;
    }

    /**
     * 获得放入锁对象
     * @return
     */
    public Condition getAddLock() {
        return addLock;
    }

    /**
     * 蓝子当前大小
     * @return
     */
    public int size(){
        return buckets.size();
    }

    /**
     * 放入
     * @param obj
     */
    public boolean add(String obj){
        if(buckets.size()<20) {
            buckets.add(obj);
            return true;
        }
        else return false;
    }

    /**
     * 随机获取一个蓝子里的东西
     * @return
     */
    public String fetch(){
        int size=buckets.size();
        if(size==1){
            String obj=buckets.get(0);
            buckets.clear();
            return obj;
        }
        else if(size>1) {
            int index = new Random().nextInt(size);
            String obj=buckets.get(index);
            buckets.remove(index);
            return obj;
        }else return null;
    }
}

package cn.yanggy.demo04;

import java.util.Random;

/**
 * User:cool coding
 * Date:2018/1/10
 * Time:12:49
 * 生产苹果类
 */
public class Produce {

    /**
     * 生产苹果,耗时[1,5]秒(多线程共用)
     * @return
     */
    public static String produce(){
        try {
            Thread.sleep(new Random().nextInt(5)*1000+1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return "苹果";
    }
}

package cn.yanggy.demo04;

import java.awt.*;
import java.util.Random;

/**
 * User:cool coding
 * Date:2018/1/10
 * Time:12:53
 * 消费者
 */
public class Consumer implements Runnable {

    /**
     * 篮子
     */
    private Bucket bucket;

    public Consumer(Bucket bucket){
        this.bucket=bucket;
    }


    @Override
    public void run() {
        while (true){
                //拿锁
                bucket.getLock().lock();
                String obj=bucket.fetch();
                if(obj==null) {
                    System.out.println(Thread.currentThread().getName()+":篮子空了");
                    //通知生产者生产
                    bucket.getAddLock().signalAll();
                    //消费者等待
                    try {
                        bucket.getFetchLock().await();
                    }catch (InterruptedException e){

                    }finally {
                        bucket.getLock().unlock();
                    }
                }
                else{
                    System.out.println(Thread.currentThread().getName()+":消费了"+obj);
                    bucket.getLock().unlock();

                    //消费完后,随机休息[1,5]秒钟
                    try {
                        Thread.sleep(new Random().nextInt(5)*1000+1000);
                    }catch (InterruptedException e){

                    }
                }
        }
    }
}

package cn.yanggy.demo04;

import java.util.concurrent.ThreadLocalRandom;

/**
 * User:cool coding
 * Date:2018/1/10
 * Time:13:02
 * 生产者
* 分两步:先生产,然后放入蓝子
 */
public class Producer implements Runnable {
    /**
     * 蓝子
     */
    private final Bucket bucket;

    /**
     * 当前手上拿着的东西
     */
    private ThreadLocal<String> currentHold=new ThreadLocal<>();
    /**
     * 记录总共生产出来东西的数量
     */
    private ThreadLocal<Integer> count=new ThreadLocal<>();


    public Producer(Bucket bucket){
        this.bucket=bucket;
    }

    @Override
    public void run() {
        //初始化本地线程执有的变量,不能写在Construct中,否则无效
        currentHold.set(null);
        count.set(0);

        while (true){
            String obj=currentHold.get();
            if(obj==null) {//如果生产者手上没有拿着东西,则生产,否则先要放入篮子中
                obj = Produce.produce();
                count.set(count.get()+1);
                System.out.println(Thread.currentThread().getName() + ":生产了第[" +count.get()+"]个"+obj);
            }
            bucket.getLock().lock();
            boolean result=bucket.add(Thread.currentThread().getName()+"生产的第["+count.get()+"]个"+obj);
            if(result) {
               currentHold.remove();
               System.out.println(Thread.currentThread().getName() + ":放入了第[" +count.get()+"]个"+obj);
                System.out.println("篮子大小为:"+bucket.size());
               bucket.getLock().unlock();
            }
            else {
                currentHold.set(obj);
                System.out.println(Thread.currentThread().getName() + ":篮子满了");
                bucket.getFetchLock().signalAll();//通知消费者去消费
                try {
                    bucket.getAddLock().await();//停止生产
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }finally {
                    bucket.getLock().unlock();
                }
            }
        }
    }
}

package cn.yanggy.demo04;

/**
 * User:cool coding
 * Date:2018/1/10
 * Time:13:16
 *总共产生10个消费者,5个生产者
 */
public class Main {
    public static void main(String[] args){
        Bucket bucket=new Bucket();//篮子

        Consumer consumer=new Consumer(bucket);//消费者
        Producer producer=new Producer(bucket);//生产者

        for(int i=0;i<5;i++){
            Thread thread=new Thread(producer,"P["+i+"]");
            Thread thread2=new Thread(consumer,"C["+i+"]");
            thread.start();
            thread2.start();
        }

        //延迟2秒钟,以便后五个消费者可以消费到东西
        try {
            Thread.sleep(2000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        for(int i=5;i<10;i++){
            Thread thread=new Thread(consumer,"C["+i+"]");
            thread.start();
        }
    }
}

运行结果:

C[0]:篮子空了
C[1]:篮子空了
C[2]:篮子空了
C[3]:篮子空了
C[4]:篮子空了
P[3]:生产了第[1]个苹果
P[3]:放入了第[1]个苹果
篮子大小为:1
C[5]:消费了P[3]生产的第[1]个苹果
C[6]:篮子空了
C[7]:篮子空了
C[8]:篮子空了
C[9]:篮子空了
C[5]:篮子空了
P[3]:生产了第[2]个苹果
P[2]:生产了第[1]个苹果
P[4]:生产了第[1]个苹果
P[3]:放入了第[2]个苹果
篮子大小为:1
P[2]:放入了第[1]个苹果
篮子大小为:2
P[4]:放入了第[1]个苹果
篮子大小为:3
P[0]:生产了第[1]个苹果
P[1]:生产了第[1]个苹果
P[0]:放入了第[1]个苹果
篮子大小为:4
P[1]:放入了第[1]个苹果
篮子大小为:5
P[4]:生产了第[2]个苹果
P[2]:生产了第[2]个苹果
P[4]:放入了第[2]个苹果
篮子大小为:6
P[2]:放入了第[2]个苹果
篮子大小为:7
P[3]:生产了第[3]个苹果
P[3]:放入了第[3]个苹果
篮子大小为:8
P[1]:生产了第[2]个苹果
P[1]:放入了第[2]个苹果
篮子大小为:9
P[0]:生产了第[2]个苹果
P[0]:放入了第[2]个苹果
篮子大小为:10
P[4]:生产了第[3]个苹果
P[4]:放入了第[3]个苹果
篮子大小为:11
P[3]:生产了第[4]个苹果
P[2]:生产了第[3]个苹果
P[3]:放入了第[4]个苹果
篮子大小为:12
P[2]:放入了第[3]个苹果
篮子大小为:13
P[1]:生产了第[3]个苹果
P[1]:放入了第[3]个苹果
篮子大小为:14
P[4]:生产了第[4]个苹果
P[4]:放入了第[4]个苹果
篮子大小为:15
P[0]:生产了第[3]个苹果
P[0]:放入了第[3]个苹果
篮子大小为:16
P[2]:生产了第[4]个苹果
P[2]:放入了第[4]个苹果
篮子大小为:17
P[3]:生产了第[5]个苹果
P[3]:放入了第[5]个苹果
篮子大小为:18
P[1]:生产了第[4]个苹果
P[2]:生产了第[5]个苹果
P[1]:放入了第[4]个苹果
篮子大小为:19
P[2]:放入了第[5]个苹果
篮子大小为:20
P[3]:生产了第[6]个苹果
P[4]:生产了第[5]个苹果
P[3]:篮子满了
P[4]:篮子满了
C[0]:消费了P[3]生产的第[3]个苹果
C[1]:消费了P[3]生产的第[2]个苹果
C[2]:消费了P[4]生产的第[4]个苹果
C[3]:消费了P[3]生产的第[4]个苹果
C[4]:消费了P[2]生产的第[1]个苹果
C[6]:消费了P[0]生产的第[2]个苹果
C[7]:消费了P[3]生产的第[5]个苹果
C[8]:消费了P[2]生产的第[5]个苹果
C[9]:消费了P[4]生产的第[3]个苹果
C[5]:消费了P[4]生产的第[2]个苹果
P[0]:生产了第[4]个苹果
P[0]:放入了第[4]个苹果
篮子大小为:11
P[2]:生产了第[6]个苹果
P[2]:放入了第[6]个苹果
篮子大小为:12
C[9]:消费了P[2]生产的第[2]个苹果
C[8]:消费了P[2]生产的第[4]个苹果
C[2]:消费了P[0]生产的第[4]个苹果
P[0]:生产了第[5]个苹果
P[0]:放入了第[5]个苹果
篮子大小为:10
C[4]:消费了P[2]生产的第[6]个苹果
C[9]:消费了P[0]生产的第[3]个苹果
C[0]:消费了P[4]生产的第[1]个苹果
P[0]:生产了第[6]个苹果
P[0]:放入了第[6]个苹果
篮子大小为:8
P[1]:生产了第[5]个苹果
P[1]:放入了第[5]个苹果
篮子大小为:9
C[6]:消费了P[1]生产的第[4]个苹果
C[0]:消费了P[0]生产的第[5]个苹果
C[9]:消费了P[0]生产的第[1]个苹果
P[2]:生产了第[7]个苹果
C[1]:消费了P[1]生产的第[2]个苹果
P[2]:放入了第[7]个苹果
篮子大小为:6
C[5]:消费了P[1]生产的第[5]个苹果
P[0]:生产了第[7]个苹果
P[0]:放入了第[7]个苹果
篮子大小为:6
......
......
上一篇下一篇

猜你喜欢

热点阅读