Java实现生产者/消费者模型实战应用

2020-08-31  本文已影响0人  大杯冰摩卡

场景: 我们需要创建一个job,这个job是异步执行的,且任务有多个状态,每个状态需要不同的处理。

实现: 在服务里创建一个生产消费模型,job在创建后,设置初始状态,并放在队列里由消费者消费,处理业务逻辑。消费成功后,更改状态再次放入队列中,等待下一次消费。

实现一: wait && notify

最朴素也是最简单的方案:wait && notify机制 。
队列中有数据就阻塞生产者线程,消费者消费后就唤醒生产者。反之,队列中没有数据就阻塞消费者线程,生产者添加数据后唤醒消费者线程。wait && notify机制虽然足够简单,但是不够灵活,并发效率也不佳,不能满足实际场景需求。

 // 存储生产者产生的数据
    static List<String> list = new ArrayList<>();

    public static void main(String[] args) {

        new Thread(() -> {
            while (true) {
                synchronized (list) {
                    // 判断 list 中是否有数据,如果有数据的话,就进入等待状态,等数据消费完
                    if (list.size() != 0) {
                        try {
                            list.wait();
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }

                    // list 中没有数据时,产生数据添加到 list 中
                    try {
                        Thread.sleep(5000);
                        list.add(UUID.randomUUID().toString());
                        list.notify();
                        System.out.println(Thread.currentThread().getName() + list);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
        }, "生产者线程 A ").start();


        new Thread(() -> {
            while (true) {
                synchronized (list) {
                    // 如果 list 中没有数据,则进入等待状态,等收到有数据通知后再继续运行
                    if (list.size() == 0) {
                        try {
                            list.wait();
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }

                    // 有数据时,读取数据
                    System.out.println(Thread.currentThread().getName() + list);
                    list.notify();
                    // 读取完毕,将当前这条 UUID 数据进行清除
                    list.clear();
                }
            }
        }, "消费者线程 B ").start();

    }

实现二: BlockingQueue

BlockingQueue的写法最简单。核心思想是,把并发和容量控制封装在缓冲区中

 public static void main(String[] args) throws InterruptedException {
        LinkedBlockingQueue<String> queue = new LinkedBlockingQueue<>();
        for(int i =1;i<=10;i++){
            queue.put("第"+i+"条消息");
        }
        System.out.println("当前队列还有"+queue.size()+"消息");


        new Thread(()->{
            try {
                System.out.println("睡眠中");
                Thread.sleep(10000);
                for(int i =1;i<=10;i++){
                    queue.put("新的消息:第"+i+"条消息");
                }

            } catch (InterruptedException e) {

            }
        }).start();

        int nThreads = 1 ;
        ExecutorService executorService = Executors.newFixedThreadPool(nThreads);
        for(int i = 0 ;i<nThreads;i++){
            executorService.submit(()->{
                while (true){
                    System.out.println("消费者");
                    String poll = null;
                    try {
                        poll = queue.take();

                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    System.out.println("消费者:"+poll==null?"":poll);
                    System.out.println(Thread.currentThread().getName());
                }
            });
        }


    }

demo中通过put方法生产数据,take方法消费数据。这个两个方法都有阻塞线程的效果,我们来看下:

2.1 put()

    public void put(E e) throws InterruptedException {
        if (e == null) throw new NullPointerException();
        int c = -1;
        Node<E> node = new Node<E>(e);
        final ReentrantLock putLock = this.putLock;
        final AtomicInteger count = this.count;
        
        // 以可中断的形式获取put锁
        putLock.lockInterruptibly();
        try {
            // 与offer(e, timeout, unit)相比,采用了无限等待的方式
            while (count.get() == capacity) {
                // 当执行了移除元素操作后,会通过signal操作来唤醒notFull队列中的一个线程
                notFull.await();
            }
            enqueue(node);
            c = count.getAndIncrement();
            if (c + 1 < capacity)
                notFull.signal();
        } finally {
            putLock.unlock();
        }
        if (c == 0)
            signalNotEmpty();
    }

2.2 take()

public E take() throws InterruptedException {
        E x;
        int c = -1;
        final AtomicInteger count = this.count;
        final ReentrantLock takeLock = this.takeLock;
        takeLock.lockInterruptibly();
        try {
            while (count.get() == 0) {
                notEmpty.await();
            }
            // 出队,并自减
            x = dequeue();
            c = count.getAndDecrement();
            if (c > 1)
            // 只要队列还有元素,就唤醒一个take操作
                notEmpty.signal();
        } finally {
            takeLock.unlock();
        }
        if (c == capacity)
        // 如果在队列满的情况下移除一个元素,会唤醒一个put操作
            signalNotFull();
        return x;
    }

这里能看到take其实就是put的一个翻版。这里也不难发现wait && notify机制实际上也是在模拟实现一个BlockingQueue。使用BlockingQueue不枉为最佳选择。

实战:

利用@PostConstruct在服务启动时加载生产者/消费者线程。exportRunnerListener循环执行asyncJobRunner()消费队列,使用take()方法阻塞线程,避免资源浪费。执行消费者之前,我们需要在db里面捞一下未完成的任务,避免因服务重启造成的任务丢失。

private final ExecutorService executorService;
private static final LinkedBlockingQueue<String> jobQueue = new LinkedBlockingQueue<>();
...
@PostConstruct
    public void NeoExportRunner() {
        int nThreads = 3;
        //每次重启找出未完成的job
        exportJobRepository.findByStatusIn(Arrays.asList(new String[]{PENDING, LOADED, UPLOADED, NOTIFIED}))
                .orElse(new ArrayList<>()).forEach(
                //加入队列
                o -> jobQueue.add(o.getId())
        );
        this.executorService = Executors.newFixedThreadPool(nThreads);

        for (int i = 0; i < nThreads; i++) {
            executorService.execute(() -> {
                exportRunnerListener();
            });
        }

    }
    
    public void exportRunnerListener() {
        while (true) {
            log.info("asyncJobRunner is working... {}", Thread.currentThread().getName());
            try {
                //消费者
                asyncJobRunner();
            } catch (Exception e) {
                log.error("asyncJobRunner is error...{}",e);
            }
        }
    }

asyncJobRunner()是一个任务调度器。拿到队列里的消息,根据状态来处理不同的业务逻辑。每个job执行完后,变更任务状态,重新写回队列,下次消费时进行下一个状态的处理,从而实现状态扭转。

    public static void asyncJobRunner() throws InterruptedException {
        
        //消费jobQueue中的数据
        Optional.ofNullable(jobQueue.take()).flatMap(id -> exportJobRepository.findById(id)).ifPresent(job -> {
            switch (job.getStatus()) {
                case PENDING:
                    loadData(job);
                    log.info("Job had been loaded.. job -> " + job.toString());
                    break;

                case LOADED:
                    upload(job);
                    log.info("Job had been uploaded.. job -> " + job.toString());
                    break;

                case UPLOADED:
                    notify(job);
                    log.info("Job had been notify.. job -> " + job.toString());
                    break;

                case FAILED:
                    retry(job);
                    log.info("Job had been failed.. will be retry. .job -> " + job.toString());
                    break;

                case NOTIFIED:
                    finish(job);
                    log.info("Job had been finished. . job -> " + job.toString());
                    break;

                case FINISHED:
                    log.warn("Finished job should not appear in job queue, check for logical error. job -> " + job.toString());
                    break;

                case CANCELED:
                    log.info("Job had been canceled. Nothing to do here. job -> " + job.toString());
                    break;

                default:
                    log.error("Unrecognized job status. job -> " + job.toString());

            }
        });
    }
  

这里说一下retry机制,当任务在某个状态发生异常,并未执行成功,我们来设置一个retry机制在任务FAILED的时候进行补偿。某个状态异常时,将当前状态保存在LastStatus中并设置当前状态为FAILED,同时记录retry的次数。这样以来下次我们拿到这个job的状态是FAILED,在调用retry方法时把失败时的状态在写回去丢到队列里,下一次就可以继续执行了。

    public static void retry(ExportJob job) {
    //判断重试次数是否<最大重试次数
        if (job.getRetry() < maximumRetry) {
            job.setStatus(job.getLastStatus());
            job.setRetry(job.getRetry() + 1);
            jobQueue.add(job.getId());
        } else {
            //save db -> error status
            log.error("Max retry exceeds. job -> " + job.toString());
        }
    }

addJob()cancelJob() 提供给我们的业务代码调用,用来创建任务和取消任务,这里的取消任务做不到实时性,具体代码需要根据实际业务场景进行调整。

public static String addJob(String id, String type, String channel) {
        ExportJob exportJob = ExportJob.builder()
                .channel(channel)
                .jobId(id)
                //初始化任务
                .status(PENDING)
                .createAt(sdf.format(System.currentTimeMillis()))
                .type(type)
                .id(UUID.randomUUID().toString().replaceAll("-", ""))
                .retry(0)
                .build();
        ExportJob saved = exportJobRepository.saveAndFlush(exportJob);
        jobQueue.add(saved.getId());
        return saved.getId();
    }
public static String cancelJob(String id) {
        canceled.add(id);
        exportJobRepository.findById(id).ifPresent(job -> {
            //取消任务
            job.setStatus(CANCELED);
            exportJobRepository.saveAndFlush(job);
        });
        return id;
    }
上一篇下一篇

猜你喜欢

热点阅读