RxJava Operators (Creation)

2019-02-13  Charein

Covers create, defer, just, and the from family: fromArray, fromIterable, and fromPublisher.

create

Creates an Observable from scratch by calling the observer's methods programmatically. Reference: http://reactivex.io/documentation/operators/create.html

public static <T> Observable<T> create(ObservableOnSubscribe<T> source)

For example:

Observable.create(new ObservableOnSubscribe<String>() {
    @Override
    public void subscribe(ObservableEmitter<String> e) throws Exception {
        // Emit three items, then signal that the sequence is finished
        for (int i = 0; i < 3; i++) {
            e.onNext("#" + i);
        }
        e.onComplete();
    }
}).subscribe(new Consumer<String>() {
    @Override
    public void accept(String s) throws Exception {
        System.out.println("accept: " + s);
    }
});

// Output
// System.out: accept: #0
// System.out: accept: #1
// System.out: accept: #2
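
To make the mechanics of create more concrete, here is a minimal plain-Java sketch of the idea, not RxJava's actual implementation. `CreateSketch`, `MiniObservable`, and `Emitter` are hypothetical names invented for illustration. The sketch assumes two behaviors that RxJava's create also has: the source callback runs once per subscriber, and items emitted after onComplete are dropped.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

final class CreateSketch {
    /** Hypothetical stand-in for ObservableEmitter: the callbacks a source may call. */
    interface Emitter<T> {
        void onNext(T value);
        void onComplete();
    }

    /** Hypothetical stand-in for an Observable built with create(). */
    static final class MiniObservable<T> {
        private final Consumer<Emitter<T>> source;

        private MiniObservable(Consumer<Emitter<T>> source) {
            this.source = source;
        }

        static <T> MiniObservable<T> create(Consumer<Emitter<T>> source) {
            return new MiniObservable<>(source); // nothing is emitted yet
        }

        void subscribe(Consumer<T> consumer, Runnable finished) {
            // Each subscription gets its own emitter and re-runs the source callback
            source.accept(new Emitter<T>() {
                private boolean done;

                @Override
                public void onNext(T value) {
                    if (!done) {
                        consumer.accept(value);
                    }
                }

                @Override
                public void onComplete() {
                    if (!done) {
                        done = true;
                        finished.run();
                    }
                }
            });
        }
    }

    static List<String> demo() {
        List<String> log = new ArrayList<>();
        MiniObservable<String> numbers = MiniObservable.create(e -> {
            log.add("source runs");
            for (int i = 0; i < 3; i++) {
                e.onNext("#" + i);
            }
            e.onComplete();
            e.onNext("ignored"); // dropped: the sequence has already completed
        });
        numbers.subscribe(s -> log.add("A: " + s), () -> log.add("A: done"));
        numbers.subscribe(s -> log.add("B: " + s), () -> log.add("B: done"));
        return log;
    }

    public static void main(String[] args) {
        demo().forEach(System.out::println);
    }
}
```

Running `main` shows "source runs" twice, once before each subscriber's items, and the "ignored" item never appears.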

defer

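Do not create the Observable until an observer subscribes, and create a fresh Observable for each observer.
The defer operator waits until an observer subscribes to it, then generates an Observable, typically by calling an Observable factory function. It does this anew for every subscriber, so although each subscriber may think it is subscribing to the same Observable, each one actually gets its own separate sequence.
In some cases, waiting until the last moment (that is, until subscription time) to generate the Observable ensures that it contains the freshest data.
Reference: http://reactivex.io/documentation/operators/defer.html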

public static <T> Observable<T> defer(Callable<? extends ObservableSource<? extends T>> supplier);

For example:

    // volatile: written by the worker thread, read by the subscribing thread
    private volatile String msg = "#0";

    @SuppressLint("CheckResult")
    public void testDefer() throws InterruptedException {
        // Keep updating msg in the background, once every 300 ms
        new Thread(new Runnable() {
            @Override
            public void run() {
                for (int i=0; i<10; i++) {
                    msg = "#" + i;
                    System.out.println("msg: " + msg);
                    try {
                        TimeUnit.MILLISECONDS.sleep(300);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
        }).start();

        Observable<String> observable = Observable.defer(new Callable<ObservableSource<String>>() {
            @Override
            public ObservableSource<String> call() throws Exception {
                return Observable.just(msg);
            }
        });
        // Alternative for comparison (reads msg immediately, at creation time):
        // Observable<String> observable = Observable.just(msg);

        TimeUnit.MILLISECONDS.sleep(1000);
        observable.subscribe(new Consumer<String>() {
            @Override
            public void accept(String s) throws Exception {
                System.out.println("accept: " + s);
            }
        });

        // Output (subscription happens after about 1000 ms, when msg is "#3")
        // System.out: msg: #0
        // System.out: msg: #1
        // System.out: msg: #2
        // System.out: msg: #3
        // System.out: accept: #3
        // System.out: msg: #4
        // System.out: msg: #5
        // System.out: msg: #6
        // System.out: msg: #7
        // System.out: msg: #8
        // System.out: msg: #9
    }

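If defer is replaced with just, msg is read once when the Observable is created rather than at subscription time, so the output becomes: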

        // System.out: msg: #0
        // System.out: msg: #1
        // System.out: msg: #2
        // System.out: msg: #3
        // System.out: accept: #0
        // System.out: msg: #4
        // System.out: msg: #5
        // System.out: msg: #6
        // System.out: msg: #7
        // System.out: msg: #8
        // System.out: msg: #9
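
The difference between the two outputs comes down to when msg is read. The following plain-Java sketch isolates that timing without threads or RxJava, using `java.util.function.Supplier` as a stand-in for the Observable factory. `DeferTimingSketch` and its members are hypothetical names; this is an analogy for the behavior, not how RxJava implements defer.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

final class DeferTimingSketch {
    private static String msg = "#0";

    static List<String> run() {
        msg = "#0";

        // Like Observable.just(msg): the value is captured right now
        String eager = msg;
        // Like Observable.defer(...): msg is read again on every call
        Supplier<String> deferred = () -> msg;

        List<String> seen = new ArrayList<>();

        msg = "#3"; // state changes before anyone "subscribes"
        seen.add("just: " + eager);
        seen.add("defer: " + deferred.get());

        msg = "#9"; // a later "subscriber" sees the newer value
        seen.add("defer: " + deferred.get());
        return seen;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

The eager value stays at "#0" no matter how msg changes afterwards, while each call to `deferred.get()` picks up whatever msg holds at that moment, which matches the `accept: #0` versus `accept: #3` lines above.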

just

public static <T> Observable<T> just(T item)
public static <T> Observable<T> just(T item1, T item2)
// Overloads accept up to 10 items

For example:

Observable.just("#0")
        .subscribe(new Consumer<String>() {
            @Override
            public void accept(String s) throws Exception {
                // System.out: accept: #0
                System.out.println("accept: " + s);
            }
        });

fromArray

public static <T> Observable<T> fromArray(T... items)

For example:

Observable.fromArray("#0", "#1", "#2")
        .subscribe(new Consumer<String>() {
            @Override
            public void accept(String s) throws Exception {
                // System.out: accept: #0
                // System.out: accept: #1
                // System.out: accept: #2
                System.out.println("accept: " + s);
            }
        });

fromIterable

public static <T> Observable<T> fromIterable(Iterable<? extends T> source)

For example:

Observable.fromIterable(new Iterable<String>() {
    @NonNull
    @Override
    public Iterator<String> iterator() {
        // A new list and iterator are built each time iterator() is called
        List<String> list = new ArrayList<>();
        list.add("#0");
        list.add("#1");
        list.add("#2");
        return list.iterator();
    }
}).subscribe(new Consumer<String>() {
    @Override
    public void accept(String s) throws Exception {
        // System.out: accept: #0
        // System.out: accept: #1
        // System.out: accept: #2
        System.out.println("accept: " + s);
    }
});

fromPublisher

public static <T> Observable<T> fromPublisher(Publisher<? extends T> publisher)

For example:

Observable.fromPublisher(new Publisher<String>() {
    @Override
    public void subscribe(Subscriber<? super String> s) {
        // Simplified: a spec-compliant Publisher calls s.onSubscribe(...) before emitting
        for (int i = 0; i < 4; i++) {
            s.onNext("#" + i);
        }
    }
}).subscribe(new Consumer<String>() {
    @Override
    public void accept(String s) throws Exception {
        // System.out: accept: #0
        // System.out: accept: #1
        // System.out: accept: #2
        // System.out: accept: #3
        System.out.println("accept: " + s);
    }
});
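
The anonymous Publisher above pushes items straight into onNext. The Reactive Streams contract expects a handshake instead: call onSubscribe first, emit only as many items as were requested, then finish with onComplete. Below is a minimal single-threaded sketch of that handshake. It is written against the JDK's `java.util.concurrent.Flow` interfaces (Java 9+), which mirror the `org.reactivestreams` ones but are not the type that `Observable.fromPublisher` accepts. `RangePublisherSketch` and `collect` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;

/** Hypothetical publisher emitting "#0" .. "#(count-1)"; single-threaded sketch only. */
final class RangePublisherSketch implements Flow.Publisher<String> {
    private final int count;

    RangePublisherSketch(int count) {
        this.count = count;
    }

    @Override
    public void subscribe(Flow.Subscriber<? super String> subscriber) {
        // Step 1: hand over a Subscription before any other signal
        subscriber.onSubscribe(new Flow.Subscription() {
            private int next = 0;
            private long pending = 0;
            private boolean emitting = false;
            private boolean done = false;

            @Override
            public void request(long n) {
                if (done) {
                    return;
                }
                if (n <= 0) {
                    done = true;
                    subscriber.onError(new IllegalArgumentException("request must be positive"));
                    return;
                }
                // Step 2: track demand, capping at Long.MAX_VALUE on overflow
                pending = (pending + n < 0) ? Long.MAX_VALUE : pending + n;
                if (emitting) {
                    return; // re-entrant request from inside onNext; the loop below picks it up
                }
                emitting = true;
                while (!done && pending > 0 && next < count) {
                    pending--;
                    subscriber.onNext("#" + next++);
                }
                emitting = false;
                // Step 3: signal completion once every item has been emitted
                if (!done && next == count) {
                    done = true;
                    subscriber.onComplete();
                }
            }

            @Override
            public void cancel() {
                done = true;
            }
        });
    }

    /** Subscribes with unbounded demand and records every signal received. */
    static List<String> collect(int count) {
        List<String> out = new ArrayList<>();
        new RangePublisherSketch(count).subscribe(new Flow.Subscriber<String>() {
            @Override
            public void onSubscribe(Flow.Subscription s) {
                s.request(Long.MAX_VALUE);
            }

            @Override
            public void onNext(String item) {
                out.add(item);
            }

            @Override
            public void onError(Throwable t) {
                out.add("error: " + t.getMessage());
            }

            @Override
            public void onComplete() {
                out.add("done");
            }
        });
        return out;
    }

    public static void main(String[] args) {
        System.out.println(collect(4));
    }
}
```

Here `collect(4)` returns the four items followed by "done". A production publisher would also need to handle concurrent calls to request and cancel, which this sketch deliberately leaves out.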