RxJava使用笔记

2018-04-15  本文已影响0人  Leo_o

一、RxJava操作符

1、创建操作符

2、变换操作符

3、过滤操作符

4、组合操作符

Flowable<Integer> flowable1 = Flowable.just(1, 2, 3);
                Flowable<String> flowable2 = Flowable.just("a", "b", "c");
                Flowable.combineLatest(flowable1, flowable2, new BiFunction<Integer, String, String>() {
                    @Override
                    public String apply(Integer integer, String s) throws Exception {
                        return integer + s;
                    }
                }).subscribe(new Consumer<String>() {
                    @Override
                    public void accept(String s) throws Exception {
                        Log.d("Tag", "combineLatest:" + s);
                    }
                });
输出:3a 3b 3c

5、辅助操作符

Flowable.create(new FlowableOnSubscribe<Integer>() {
                    @Override
                    public void subscribe(FlowableEmitter<Integer> sub) throws Exception {
                        for (int i = 0; i < 4; i++) {
                            try {
                                Thread.sleep(i * 100);
                            } catch (InterruptedException e) {
                                e.printStackTrace();
                            }
                            sub.onNext(i);
                        }
                        sub.onComplete();
                    }
                }, BackpressureStrategy.BUFFER).timeout(200, TimeUnit.MILLISECONDS, Flowable.just(10, 11))
                        .subscribe(new Consumer<Integer>() {
                            @Override
                            public void accept(Integer integer) throws Exception {
                                Log.d("tag", "timeout:" + integer);
                            }
                        });
输出:0 1 2 10 11

6、错误处理操作符

Flowable.create(new FlowableOnSubscribe<Integer>() {
                    @Override
                    public void subscribe(FlowableEmitter<Integer> e) throws Exception {
                        for (int i = 0; i < 5; i++) {
                            if (i > 2) {
                                e.onError(new Throwable("Throwable"));
                            }
                            e.onNext(i);
                        }
                        e.onComplete();
                    }
                }, BackpressureStrategy.BUFFER).onErrorReturn(new Function<Throwable, Integer>() {
                    @Override
                    public Integer apply(Throwable throwable) throws Exception {
                        return 6;
                    }
                }).subscribe(new Subscriber<Integer>() {
                    @Override
                    public void onSubscribe(Subscription s) {
                        s.request(Integer.MAX_VALUE);
                    }

                    @Override
                    public void onNext(Integer integer) {
                        Log.d("tag", "onNext:" + integer);
                    }

                    @Override
                    public void onError(Throwable t) {
                        Log.d("tag", "onError:" + t.getMessage());
                    }

                    @Override
                    public void onComplete() {
                        Log.d("tag", "onComplete");
                    }
                });
输出:onNext:0 1 2 6 onComplete
Flowable.create(new FlowableOnSubscribe<Integer>() {
                    @Override
                    public void subscribe(FlowableEmitter<Integer> e) throws Exception {
                        for (int i = 0; i < 5; i++) {
                            if (i == 1) {
                                e.onError(new Throwable("Throwable"));
                            } else {
                                e.onNext(i);
                            }
                        }
                        e.onComplete();
                    }
                }, BackpressureStrategy.BUFFER).retry(2).subscribe(new Subscriber<Integer>() {
                    @Override
                    public void onSubscribe(Subscription s) {
                        s.request(Integer.MAX_VALUE);
                    }

                    @Override
                    public void onNext(Integer integer) {
                        Log.d("tag", "onNext:" + integer);
                    }

                    @Override
                    public void onError(Throwable t) {
                        Log.d("tag", "onError:" + t.getMessage());
                    }

                    @Override
                    public void onComplete() {
                        Log.d("tag", "onComplete");
                    }
                });
上面代码重新订阅次数为2,i=0调用注释1,重试2次同样调用1,这样一共调用3次onNext方法最后才会调用onError方法
输出:0 0 0 onError:Throwable 

7、条件操作符和布尔操作符

1、布尔操作符
2、条件操作符
Flowable.ambArray(Flowable.just(1, 2, 3).delay(2, TimeUnit.SECONDS), Flowable.just(4, 5, 6))
                        .subscribe(new Consumer<Integer>() {
                            @Override
                            public void accept(Integer integer) throws Exception {
                                Log.d("tag", "amb:" + integer);
                            }
                        });
输出:4 5 6

8、转换操作符

Flowable.just(1, 2, 3).toList().subscribe(new Consumer<List<Integer>>() {
                    @Override
                    public void accept(List<Integer> integers) throws Exception {
                        for (int i : integers) {
                            Log.d("tag", "toList:" + i);
                        }
                    }
                });
Flowable.just(3, 1, 2).toSortedList().subscribe(new Consumer<List<Integer>>() {
                    @Override
                    public void accept(List<Integer> integers) throws Exception {
                        for (int i : integers) {
                            Log.d("tag", "toSortedList:" + i);
                        }
                    }
                });
输出:1 2 3

二、RxJava使用场景,结合Okhttp、Retrofit

1、配置build.gradle

dependencies {
   ...
    implementation 'io.reactivex.rxjava2:rxjava:2.1.5'
    implementation 'io.reactivex.rxjava2:rxandroid:2.0.1'
    implementation 'com.squareup.retrofit2:retrofit:2.3.0'
    implementation 'com.squareup.retrofit2:converter-gson:2.3.0'
    implementation 'com.squareup.retrofit2:adapter-rxjava2:2.3.0'
}

2、网络请求接口

public interface LastFmApiService {

    String BASE_PARAMETERS_ARTIST = "?method=artist.getinfo&api_key=fdb3a51437d4281d4d64964d333531d4&format=json";

    @GET(BASE_PARAMETERS_ARTIST)
    Flowable<ArtistInfo> getArtistInfo(@Query("artist") String artist);
}

3、网络请求

public class RetrofitUtils {

    private static final String BASE_KU_GOU_URL = "http://lyrics.kugou.com/";
    private static final String BASE_LASTFM_URL = "http://ws.audioscrobbler.com/2.0/";
    private Retrofit retrofit;

    private RetrofitUtils(boolean is) {
        OkHttpClient builder = new OkHttpClient.Builder()
                .addInterceptor(new LoggingInterceptor())
                .connectTimeout(15, TimeUnit.SECONDS)
                .readTimeout(15, TimeUnit.SECONDS)
                // 失败重试
                .retryOnConnectionFailure(true)
                //.sslSocketFactory(SSLSocketClient.setCertificates())
                //.hostnameVerifier(SSLSocketClient.getHostnameVerifier())
                .build();
        /*
         * StringConverterFactory和GsonConverterFactory不能同时使用
         * 谁在前返回谁的类型(坑)
         * MapConverterFactory和StringConverterFactory可同时使用
         *
         */

        retrofit = new Retrofit.Builder()
                .client(builder)
                .baseUrl(is ? BASE_KU_GOU_URL : BASE_LASTFM_URL)
                //.addConverterFactory(MapConverterFactory.create())
                .addConverterFactory(GsonConverterFactory.create())
                .addConverterFactory(StringConverterFactory.create())
                .addCallAdapterFactory(RxJava2CallAdapterFactory.create())
                .build();
    }
}
上一篇下一篇

猜你喜欢

热点阅读