RxJava学习,封装一个简单的map 和flatMap
2017-05-11 本文已影响127人
pdog18
RxJava 笔记
注意:这只是学习的过程,为了学习rxjava是怎么由callback嵌套转换而来,其中的map和flatMap 都只是精简过后的一个概念,不是真正的样子
感谢
W_BinaryTree
2楼 · 2017.05.12 01:07
flatMap不仅仅是把item封装成observable。 还有个merge的过程。
同步时
Response newsList = netApi.getNewsList(0,10);
String title = findLatestTitle(newsList);
Uri uri = netApi.save(title);
异步时嵌套
1. 三个不同的接口
public class MainActivity extends AppCompatActivity {
String url = "http://op.juhe.cn/yi18/news/newsclass";
interface ApiCallback {
void onSuccess(String data);
void onError(Throwable e);
}
interface TransferCallback {
void onSuccess(Data data);
void onError(Throwable e);
}
interface ResultCallback {
void onSuccess(Uri uri);
void onError(Throwable e);
}
@Override
protected void onCreate(Bundle savedInstanceState) {
super.onCreate(savedInstanceState);
setContentView(R.layout.activity_main);
Log.d("ss", "sss");
findViewById(R.id.btn).setOnClickListener(v -> {
request(new ApiCallback() {
@Override
public void onSuccess(String data) {
Data dat = tansfer(data);
save(data, new ResultCallback() {
@Override
public void onSuccess(Uri uri) {
}
@Override
public void onError(Throwable e) {
}
});
});
}
@Override
public void onError(Throwable e) {
}
});
});
}
/**
* 1. 请求网络获取数据
*
* @param apiCallback
*/
public void request(ApiCallback apiCallback) {
OkGo.get(url)
.params("key", "2e6f34e3fe8fa96e5384021477f8e224")
.execute(new StringCallback() {
@Override
public void onSuccess(String s, Call call, Response response) {
Log.d("MainActivity", "onSuccess: " + s);
apiCallback.onSuccess(s);
}
@Override
public void onError(Call call, Response response, Exception e) {
apiCallback.onError(e);
}
});
}
/**
* 2. 将获取的数据转换成对象
*/
public Data tansfer(String s) {
Data data = GsonUtils.GsonToBean(s, Data.class);
return data;
}
/**
* 3.将对象中的某个数据保存
*/
public void save(Data data, ResultCallback resultCallback) {
try {
ResultBean result = data.getResult();
ListBean list = result.getList();
List<TngouBean> tngou = list.getTngou();
String s = GsonUtils.GsonString(list);
SharedPreferences config = getSharedPreferences("config", MODE_PRIVATE);
config.edit().putString("tag", s).apply();
resultCallback.onSuccess(Uri.parse(s));
} catch (Throwable e) {
resultCallback.onError(e);
}
}
}
2. 将三个接口改为同一个接口,用泛型来确定onSuccess()
的内容
public class MainActivity extends AppCompatActivity {
String url = "http://op.juhe.cn/yi18/news/newsclass";
interface Callback<T> {
void onSuccess(T data);
void onError(Throwable e);
}
@Override
protected void onCreate(Bundle savedInstanceState) {
super.onCreate(savedInstanceState);
setContentView(R.layout.activity_main);
Log.d("ss", "sss");
findViewById(R.id.btn).setOnClickListener(v -> {
request(new Callback<String>() {
@Override
public void onSuccess(String data) {
tansfer(data, new Callback<Data>() {
@Override
public void onSuccess(Data data) {
save(data, new Callback<Uri>() {
@Override
public void onSuccess(Uri uri) {
}
@Override
public void onError(Throwable e) {
}
});
}
@Override
public void onError(Throwable e) {
}
});
}
@Override
public void onError(Throwable e) {
}
});
});
}
/**
* 1. 请求网络获取数据
*
* @param apiCallback
*/
public void request(Callback apiCallback) {
OkGo.get(url)
.params("key", "2e6f34e3fe8fa96e5384021477f8e224")
.execute(new StringCallback() {
@Override
public void onSuccess(String s, Call call, Response response) {
Log.d("MainActivity", "onSuccess: " + s);
apiCallback.onSuccess(s);
}
@Override
public void onError(Call call, Response response, Exception e) {
apiCallback.onError(e);
}
});
}
/**
* 2. 将获取的数据转换成对象
*
* @param transferCallback
*/
public void tansfer(String s, Callback transferCallback) {
try {
Data data = GsonUtils.GsonToBean(s, Data.class);
transferCallback.onSuccess(data);
} catch (Throwable throwable) {
transferCallback.onError(throwable);
}
}
/**
* 3.将对象中的某个数据保存
*/
public void save(Data data, Callback resultCallback) {
try {
ResultBean result = data.getResult();
ListBean list = result.getList();
List<TngouBean> tngou = list.getTngou();
String s = GsonUtils.GsonString(list);
SharedPreferences config = getSharedPreferences("config", MODE_PRIVATE);
config.edit().putString("tag", s).apply();
resultCallback.onSuccess(Uri.parse(s));
} catch (Throwable e) {
resultCallback.onError(e);
}
}
}
3. 第三步,让异步方法有返回值,从而避免嵌套
异步操作是没有返回值的,但是我们可以返回一个可以把传递进来的参数变成结果的步骤(动作/Action)
获得一个抽象类AsyncJob
这个AsyncJob
接收一个Callback
,有一个start()
的方法
public interface AsyncJob<T> {
void start(Callback<T> callback);
}
这个start()
方法就是要执行的动作,
回到刚才话题
- 创建异步任务的时候是没有返回值的
- 所以我们要返回一个含有将传递进来的参数转变成结果的动作,
AsyncJob
是返回结果,start()
就是动作 - 启动这个任务的时候需要的这个
Callback
中则会将成功/失败的结果传递出去
将行为封装后,这样每次执行后就可以获得一个返回值(这个返回值实际上就是一个方法包装)
这样行为的封装结果有了,那么就可以像同步方法一样执行一步一步的操作了;
map/faltMap操作符做了点什么
-
使用前/抽离前
AsyncJob<Model> modelAsyncJob2 = new AsyncJob<Model>() { @Override public void start(RxCallback<Model> rxCallback) { //responseAsyncJob 是上个AsyncJob对象 responseAsyncJob.start(new RxCallback<Response>() { @Override public void onSuccess(Response response) { //除了下面这一行其他都是模板代码,所以应该抽离出来 Model convert = convert(response); rxCallback.onSuccess(convert); } @Override public void onError(Throwable e) { rxCallback.onError(e); } }); } };
-
使用后/抽离后
//responseAsyncJob 是上个AsyncJob对象 AsyncJob<Model> modelAsyncJob1 = responseAsyncJob.map(new FunC1<Response, Model>() { @Override public Model call(Response response) { Model model = convert(response); return model; } });
-
map操作符做了点什么
public <R> AsyncJob<R> map(FunC1<T, R> funC1) { //1. source 就是上一个资源 (上次对象) AsyncJob<T> source = this; //2. 这里的new 对象就是返回的对象(简称本次对象) return new AsyncJob<R>() { //3. 这里表示本次执行(start())的时候,(调用start方法的时候,会去先执行上个对象的start()方法) @Override //4. 本个对象的start()方法 public void start(RxCallback<R> callback) { //5. 上个对象start()方法 source.start(new RxCallback<T>() { @Override public void onSuccess(T t) { //5.5 如果上个对象的start方法成功了 //6.0这里有一个不确定的函数,这个函数声明了将一个T,类型的参数转换为一个R类型 R result = funC1.call(t); //7 -a将成功结果传递出去 callback.onSuccess(result); } @Override public void onError(Throwable e) { //7- b 将结果传递出去 callback.onError(e); } }); } }; }
-
flatMap 做了点什么
public <R> AsyncJob<R> flatMap(FunC1<T, AsyncJob<R>> funC1) { //1. 保存上个对象 AsyncJob<T> source = this; //2. 返回本次对象 return new AsyncJob<R>() { //3. 表示本次对象执行 @Override public void start(RxCallback<R> callback) { //4. 表示执行时首先会执行上个对象的start()方法 source.start(new RxCallback<T>() { @Override public void onSuccess(T t) { //5. 在上个对象成功中使用传入的抽象动作,将一个参数转换为另外一个AsyncJob对象 AsyncJob<R> call = funC1.call(t); //6. 调用这个AsyncJob对象的start()方法 call.start(new RxCallback<R>() { @Override public void onSuccess(R r) { //7. 在成功时将结果传递出去 callback.onSuccess(r); } @Override public void onError(Throwable e) { callback.onError(e); } }); } @Override public void onError(Throwable e) { callback.onError(e); } }); } }; }
-
map
和flatMap
是哪里不一样?
map
中接收的接口中的泛型是<T,R>
说明map
操作符中Func1
接口的call
方法的返回值是一个R
类型的参数
@Override
public Model call(Response response) {
Model model = convert(response);
return model;
}
flatmap
中接收的接口中的泛型是<T,Observable<R>>
说明flatmap
操作符中Func1
接口的call
方法的返回值是一个Observable<R>
类型的参数
@Override
public AsyncJob<Uri> call(Model model) {
AsyncJob<Uri> save = save(model);
return save;
}
封装完成前后的对比
没有进行封装,需要传递CallBack的原始方法(含有嵌套操作)
public void getResult(String url, RxCallback<Uri> rxCallback) {
netApi.get(url, new RxCallback<Response>() {
@Override
public void onSuccess(Response response) {
Model model = convert(response);
save(model, new RxCallback<Uri>() {
@Override
public void onSuccess(Uri uri) {
rxCallback.onSuccess(uri);
}
@Override
public void onError(Throwable e) {
rxCallback.onError(e);
}
});
}
@Override
public void onError(Throwable e) {
rxCallback.onError(e);
}
});
}
使用封装了start()函数后
public AsyncJob<Uri> getResult(String url) {
AsyncJob<Response> responseAsyncJob = netApi.get(url);
AsyncJob<Model> modelAsyncJob = new AsyncJob<Model>() {
@Override
public void start(RxCallback<Model> rxCallback) {
responseAsyncJob.start(new RxCallback<Response>() {
@Override
public void onSuccess(Response response) {
Model convert = convert(response);
rxCallback.onSuccess(convert);
}
@Override
public void onError(Throwable e) {
rxCallback.onError(e);
}
});
}
};
AsyncJob<Uri> uriAsyncJob = new AsyncJob<Uri>() {
@Override
public void start(RxCallback<Uri> rxCallback) {
modelAsyncJob.start(new RxCallback<Model>() {
@Override
public void onSuccess(Model model) {
AsyncJob<Uri> save = save(model);
save.start(new RxCallback<Uri>() {
@Override
public void onSuccess(Uri uri) {
rxCallback.onSuccess(uri);
}
@Override
public void onError(Throwable e) {
rxCallback.onError(e);
}
});
}
@Override
public void onError(Throwable e) {
rxCallback.onError(e);
}
});
}
};
return uriAsyncJob;
}
封装了map操作符 和flatMap 操作符
public AsyncJob<Uri> getResult(String url) {
AsyncJob<Response> responseAsyncJob = netApi.get(url);
AsyncJob<Model> modelAsyncJob1 = responseAsyncJob.map(new FunC1<Response, Model>() {
@Override
public Model call(Response response) {
Model model = convert(response);
return model;
}
});
AsyncJob<Uri> uriAsyncJob = modelAsyncJob1.flatMap(new FunC1<Model, AsyncJob<Uri>>() {
@Override
public AsyncJob<Uri> call(Model model) {
AsyncJob<Uri> save = save(model);
return save;
}
});
}
封装完成 ,lambda化后
public AsyncJob<Uri> getResult(String url) {
return netApi.get(url)
.map(this::convert)
.flatMap(this::save);
}