An Introduction to RxJava Connectable Operators

2019-06-26  103style

When reposting, please credit the source with a link.
This article is from 103style's blog.

Connection-related operators and their official documentation

Official documentation for the RxJava connectable operators: Connectable Observable Operators


Examples:

Without connectable operators
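// Assumes RxJava 2.x (io.reactivex.*) and java.util.concurrent.TimeUnit are imported.
// A plain (cold) Observable: no publish(), so each subscribe() triggers its own run.
Observable<Integer> firstMillion = Observable.range(1, 1000000)
        .sample(7, TimeUnit.MILLISECONDS);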

firstMillion.subscribe(new Consumer() {
    @Override
    public void accept(Object it) throws Exception {
        System.out.println("Subscriber #1:" + it);
    }
}, new Consumer<Throwable>() {
    @Override
    public void accept(Throwable it) throws Exception {
        System.out.println("Error: " + it.getMessage());
    }
}, new Action() {
    @Override
    public void run() throws Exception {
        System.out.println("Sequence #1 complete");
    }
});

firstMillion.subscribe(new Consumer() {
    @Override
    public void accept(Object it) throws Exception {
        System.out.println("Subscriber #2:" + it);
    }
}, new Consumer<Throwable>() {
    @Override
    public void accept(Throwable it) throws Exception {
        System.out.println("Error: " + it.getMessage());
    }
}, new Action() {
    @Override
    public void run() throws Exception {
        System.out.println("Sequence #2 complete");
    }
});

Output:

Subscriber #1:391999
Sequence #1 complete
Subscriber #2:556663
Sequence #2 complete

publish and connect

Official example:

ConnectableObservable<Integer> firstMillion = Observable.range(1, 1000000)
        .sample(7, TimeUnit.MILLISECONDS)
        .publish();

firstMillion.subscribe(new Consumer() {
    @Override
    public void accept(Object it) throws Exception {
        System.out.println("Subscriber #1:" + it);
    }
}, new Consumer<Throwable>() {
    @Override
    public void accept(Throwable it) throws Exception {
        System.out.println("Error: " + it.getMessage());
    }
}, new Action() {
    @Override
    public void run() throws Exception {
        System.out.println("Sequence #1 complete");
    }
});

firstMillion.subscribe(new Consumer() {
    @Override
    public void accept(Object it) throws Exception {
        System.out.println("Subscriber #2:" + it);
    }
}, new Consumer<Throwable>() {
    @Override
    public void accept(Throwable it) throws Exception {
        System.out.println("Error: " + it.getMessage());
    }
}, new Action() {
    @Override
    public void run() throws Exception {
        System.out.println("Sequence #2 complete");
    }
});

firstMillion.connect();

Output:

Subscriber #1:984513
Subscriber #2:984513
Sequence #1 complete
Sequence #2 complete
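
Both subscribers print the same sampled value because publish() holds emissions back until connect() is called, and then feeds one shared run of the source to every registered subscriber. A minimal plain-Java sketch of that idea follows. ToyConnectable and its methods are hypothetical names made up for illustration; they are not part of RxJava, and the model ignores threading, errors and completion.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;

/** Toy model of publish()/connect(); hypothetical, not RxJava's implementation. */
class ToyConnectable<T> {
    private final List<T> source;   // stands in for the upstream Observable
    private final List<Consumer<T>> subscribers = new ArrayList<>();

    ToyConnectable(List<T> source) {
        this.source = source;
    }

    /** Registers a subscriber; nothing is emitted yet. */
    void subscribe(Consumer<T> subscriber) {
        subscribers.add(subscriber);
    }

    /** Runs the source once and hands each item to every registered subscriber. */
    void connect() {
        for (T item : source) {
            for (Consumer<T> subscriber : subscribers) {
                subscriber.accept(item);
            }
        }
    }

    /** Demo: returns the log produced by two subscribers sharing a single run. */
    static List<String> demo() {
        List<String> log = new ArrayList<>();
        ToyConnectable<Integer> shared = new ToyConnectable<>(Arrays.asList(1, 2));
        shared.subscribe(it -> log.add("Subscriber #1:" + it));
        shared.subscribe(it -> log.add("Subscriber #2:" + it));
        log.add("before connect");   // no item has been delivered at this point
        shared.connect();
        return log;
    }

    public static void main(String[] args) {
        demo().forEach(System.out::println);
    }
}
```

In this sketch the "before connect" entry is logged first, and each item then reaches subscriber #1 and subscriber #2 in turn, which mirrors the paired lines in the output above.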

publish and refCount

Official example:

ConnectableObservable<Integer> firstMillion = Observable.range(1, 1000000)
        .sample(7, TimeUnit.MILLISECONDS)
        .publish();

firstMillion.refCount().subscribe(new Consumer() {
    @Override
    public void accept(Object it) throws Exception {
        System.out.println("Subscriber #1:" + it);
    }
}, new Consumer<Throwable>() {
    @Override
    public void accept(Throwable it) throws Exception {
        System.out.println("Error: " + it.getMessage());
    }
}, new Action() {
    @Override
    public void run() throws Exception {
        System.out.println("Sequence #1 complete");
    }
});

firstMillion.refCount().subscribe(new Consumer() {
    @Override
    public void accept(Object it) throws Exception {
        System.out.println("Subscriber #2:" + it);
    }
}, new Consumer<Throwable>() {
    @Override
    public void accept(Throwable it) throws Exception {
        System.out.println("Error: " + it.getMessage());
    }
}, new Action() {
    @Override
    public void run() throws Exception {
        System.out.println("Sequence #2 complete");
    }
});

Output:

Subscriber #1:438899
Sequence #1 complete
Subscriber #2:684698
Sequence #2 complete
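
refCount() makes a connectable Observable behave like an ordinary one again: it connects when the first subscriber arrives and disconnects when the last one leaves. The plain-Java sketch below models only that counting rule, for a hot upstream that pushes items from outside. ToyRefCount, emit and isConnected are hypothetical names for illustration, not RxJava APIs.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Toy model of publish().refCount(); hypothetical, not RxJava's implementation. */
class ToyRefCount<T> {
    private final List<Consumer<T>> subscribers = new ArrayList<>();
    private boolean connected = false;   // true while at least one subscriber is attached

    /** Adds a subscriber, connecting on 0 -> 1; the returned Runnable unsubscribes. */
    Runnable subscribe(Consumer<T> subscriber) {
        subscribers.add(subscriber);
        connected = true;
        return () -> {
            subscribers.remove(subscriber);
            if (subscribers.isEmpty()) {
                connected = false;       // last subscriber left: disconnect
            }
        };
    }

    /** Called by the hot upstream; items are dropped while disconnected. */
    void emit(T item) {
        if (!connected) {
            return;
        }
        for (Consumer<T> subscriber : new ArrayList<>(subscribers)) {
            subscriber.accept(item);
        }
    }

    boolean isConnected() {
        return connected;
    }

    /** Demo: two subscribers join and leave while the upstream keeps emitting. */
    static List<String> demo() {
        List<String> log = new ArrayList<>();
        ToyRefCount<Integer> shared = new ToyRefCount<>();
        shared.emit(0);                                   // dropped: nobody subscribed
        Runnable first = shared.subscribe(it -> log.add("Subscriber #1:" + it));
        shared.emit(1);                                   // only subscriber #1 sees it
        Runnable second = shared.subscribe(it -> log.add("Subscriber #2:" + it));
        shared.emit(2);                                   // both subscribers see it
        first.run();
        second.run();                                     // count is back to zero
        log.add("connected=" + shared.isConnected());
        shared.emit(3);                                   // dropped: disconnected again
        return log;
    }

    public static void main(String[] args) {
        demo().forEach(System.out::println);
    }
}
```

In the official example the source is a synchronous range, so the first subscription most likely runs to completion before the second one starts. That would explain why the two subscribers print different values there instead of sharing one.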

That's all.
