java阻塞队列和多线程实现生产者-消费者模式(一对一、一对多、

2020-04-20  本文已影响0人  虾餃

笔者所有文章第一时间发布于:
hhbbz的个人博客

生产者-消费者模式是什么

为什么要使用生产者-消费者模式

阻塞队列BlockingQueue的介绍

阻塞队列(BlockingQueue)是一个支持两个附加操作的队列。这两个附加的操作是:在队列为空时,获取元素的线程会等待队列变为非空。当队列满时,存储元素的线程会等待队列可用。阻塞队列常用于生产者和消费者的场景,生产者是往队列里添加元素的线程,消费者是从队列里拿元素的线程。阻塞队列就是生产者存放元素的容器,而消费者也只从容器里拿元素。

BlockingQueue的主要几种实现

通过简单的几个线程类实现

创建生产者类

/**
 * @author hhbbz on 2020-02-10.
 * @Explain:
 */
public class Producer implements Runnable{
    private BlockingQueue<Integer> queue;
    public Producer(BlockingQueue queue) {
        this.queue = queue;
    }
    @Override
    public void run() {
        queue.offer(new Random().nextInt(100));
    }
}

创建消费者类

/**
 * @author hhbbz on 2020-02-10.
 * @Explain:
 */
public class Consumer implements Runnable{
    private BlockingQueue<Integer> queue;
    public Consumer(BlockingQueue queue) {
        this.queue = queue;
    }
​
    @Override
    public void run() {
        Integer value = queue.poll();
    }
}

测试入口类

/**
 * @author hhbbz on 2020-02-10.
 * @Explain:
 */
public class Main {
    public static void main(String[] args){
        //多个队列
        BlockingQueue<Integer> queue = new LinkedBlockingQueue<>();
        BlockingQueue<Integer> queue2 = new LinkedBlockingQueue<>();
​
        //多个生产者
        Thread producer1 = new Thread(new Producer(queue));
        Thread producer2 = new Thread(new Producer(queue));
        Thread producer3 = new Thread(new Producer(queue2));
        Thread producer4 = new Thread(new Producer(queue2));
        producer1.start();
        producer2.start();
        producer3.start();
        producer4.start();
​
        //多个消费者
        Thread consumer1 = new Thread(new Consumer(queue));
        Thread consumer2 = new Thread(new Consumer(queue));
        Thread consumer3 = new Thread(new Consumer(queue2));
        Thread consumer4 = new Thread(new Consumer(queue));
        Thread consumer5 = new Thread(new Consumer(queue2));
        Thread consumer6 = new Thread(new Consumer(queue));
        consumer1.start();
        consumer2.start();
        consumer3.start();
        consumer4.start();
        consumer5.start();
        consumer6.start();
    }
}

自己想怎么处理生产者、消费者和队列之间的关系,都能很直观的进行调整。

接下来列一下项目中常用到的实现方式。

通过线程池封装起来的实现代码(!最重要最重要最重要!)

创建队列服务配置启动类,包含生产消息,可按需拆解

/**
 * @author hhbbz on 2020-02-10.
 * @Explain: 队列服务配置启动类,包含生产消息,可按需拆解
 */
@Component
@Slf4j
public class RecordQueueService {
    /**执行状态 */
    protected boolean isRunning;
    /**队列消费线程池 */
    private ThreadPoolExecutor executorService;
    //队列数量
    Integer queueNumber = 5;
    //队列长度
    Integer queueCapacity = 500;
    //每个队列对应多少个消费线程
    Integer singleQueueThreadNumber = 2;
    /**队列组列表 */
    List<BlockingQueue<Integer>> queueList = new ArrayList<>();
    //总线程数量,所有生产线程和消费线程
    Integer threadSize = queueNumber*singleQueueThreadNumber;

    public void start(String srvPoolName) {
        log.info("队列服务启动.......");
        ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("consume-"+srvPoolName+"-%d").build();

        //生产端线程和队列一对一
        for (int i = 0; i < queueNumber; i++) {
            queueList.add(new ArrayBlockingQueue<>(queueCapacity));
        }
        executorService = new ThreadPoolExecutor(
                threadSize, //线程池核心线程,至少要可以放入所有的生产线程和消费线程
                threadSize, //线程池容量大小
                300,
                TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(threadSize+1),
                threadFactory
        );



        for (int i = 0; i < threadSize; i++) {
            //消费端
            //因为生产线程和队列一对一,通过getQueue取余的方式取到队列,即可实现多个消费线程消费同个队列
            executorService.submit(new SimpleRecordQueueHandler(this.getQueue(i),i));
        }
    }

    /**
     * 生产消息
     * @param str
     * @return
     */
    public Integer publish(String str) {
        if(!isRunning){
            //
        }
        BlockingQueue<Integer> queue = this.getQueue(str);
        try {
            if(queue!=null){
                queue.put(Integer.parseInt(str));
            }
        } catch (Exception e) {

        }
        if(queue!=null){
            return queue.size();
        }else{
            return 0;
        }

    }
    /**
     * 基于key值的hash值放在不同的队列里面
     * @param keyValue
     * @return
     */
    public BlockingQueue<Integer> getQueue(String keyValue){
        int p = keyValue.hashCode() % queueNumber;
        p = Math.abs(p);
        return getQueue(p);
    }

    //每个消费者对应的队列
    public BlockingQueue<Integer> getQueue(int position){
        position = position % queueList.size();
        if(position >= queueNumber || position <0){
            return queueList.get(0);
        }
        return queueList.get(position);
    }
}

创建消费类

/**
 * @author hhbbz on 2020-02-10.
 * @Explain: 消费消息类
 */
@Slf4j
public class SimpleRecordQueueHandler implements Runnable {

    private static final Logger logger = LoggerFactory.getLogger(SimpleRecordQueueHandler.class);

    //队列内容
    private Queue<Integer> data;

    //队列编号
    private int handlerNumber;

    public SimpleRecordQueueHandler(Queue<Integer> data, int handlerNumber) {
        this.data = data;
        this.handlerNumber = handlerNumber;
    }

    /**
     * 详细的业务逻辑处理
     */
    @Override
    public void run() {
        logger.info("当前消费队列编号:{}",handlerNumber);
        Integer value = data.poll();
        //TODO 消费逻辑
    }
}

总结

最后一种实现方式是较为常用的,建议加深印象多理解理解,生产者-消费者模式在实践中非常广泛和实用,灵活配置一对一,一对多更是可以画龙点睛。

上一篇 下一篇

猜你喜欢

热点阅读