Scheduler (一)

2021-01-05  本文已影响0人  捞月亮的阿汤哥

本篇文章主要是阅读官网的scheduler文档。原文链接http://reactivex.io/documentation/scheduler.html

弹珠图

弹珠图可以很好的描述rxjava的运作流程,建议先看下这篇文章Understanding Marble Diagrams for Reactive Streams

简单scheduler

输出结果: RxNewThreadScheduler-1 hello world!

public class SchedulersWork {
    private static final CountDownLatch latch = new CountDownLatch(1);

    public static void main(String[] args) throws InterruptedException {
        Scheduler.Worker worker = Schedulers.newThread().createWorker();
        worker.schedule(SchedulersWork::sayHello);
        latch.await();
        worker.dispose();
    }

    public static void sayHello() {
        System.out.println(Thread.currentThread().getName() + " hello world!");
        latch.countDown();
    }
}

递归scheduler

输出结果: RxNewThreadScheduler-1 hello world! ...
循环输出直到dispose worker

public class RecursiveScheduler {
    public static void main(String[] args) throws InterruptedException {
        Scheduler.Worker worker = Schedulers.newThread().createWorker();
        worker.schedule(new Runnable() {
            @Override
            public void run() {
                sayHello();
                //递归直到dispose
                worker.schedule(this);
            }
        });

        Thread.sleep(1000);
        worker.dispose();
    }

    public static void sayHello() {
        System.out.println(Thread.currentThread().getName() + " hello world!");
    }
}

⚠️: 递归调用需要限制递归次数或者主动设置dispose状态,否则会出现死循环。

检查或者设置dispose状态

package com.zihao.schedulers;

import io.reactivex.rxjava3.core.Scheduler;
import io.reactivex.rxjava3.schedulers.Schedulers;

import java.util.Scanner;

/**
 * 相比较普通的recursive 调度器,使用dispose状态检查
 * 如果不订阅了 就停止并释放资源
 *
 * @author tangzihao
 * @Date 2021/1/3 11:20 上午
 */
public class CheckDisposeRecursiveScheduler {
    public static void main(String[] args) throws InterruptedException {
        Scheduler.Worker worker = Schedulers.newThread().createWorker();
        worker.schedule(new Runnable() {
            @Override
            public void run() {
                while (!worker.isDisposed()) {
                    sayHello();
                }
                System.out.println("worker被终止了。。。");
            }
        });

        //主线程通过终端控制worker结束
        Scanner scan = new Scanner(System.in);
        System.out.println("终止worker工作: ");
        if (scan.hasNext()) {
            String str = scan.next();
            if (str.equals("stop")) {
                worker.dispose();
            }
        }
        scan.close();
    }

    public static void sayHello() {
        //do nothing 故意不输出方便终端输入
    }
}

输出结果

终止worker工作: 
stop  //在控制台输入 main线程
worker被终止了。。。 //worker线程

延迟或周期性调度器

delayed scheduler

schedule有三个参数,调用的方法,延迟的时间,时间单位

public class DelayedAndPeriodicScheduler {
    public static void main(String[] args) throws InterruptedException {
        Scheduler.Worker worker = Schedulers.newThread().createWorker();
        worker.schedule(DelayedAndPeriodicScheduler::sayHello,1, TimeUnit.SECONDS);
        Thread.sleep(2000);
    }

    public static void sayHello(){
        System.out.println("hello,world!");
    }
}

periodic scheduler

schedulePeriodically有四个参数,调用的方法,延迟的时间,周期性时间,时间单位

public class DelayedAndPeriodicScheduler {
    public static void main(String[] args) throws InterruptedException {
        periodicScheduler();
    }

    public static void periodicScheduler() throws InterruptedException {
        Scheduler.Worker worker = Schedulers.newThread().createWorker();
        worker.schedulePeriodically(DelayedAndPeriodicScheduler::sayHello, 500, 250, TimeUnit.MILLISECONDS);
        Thread.sleep(3000);
    }

    public static void sayHello() {
        System.out.println("hello,world!");
    }
}
上一篇下一篇

猜你喜欢

热点阅读