协程Flow之FlowCallAdapterFactory

2023-01-14  本文已影响0人  youxiaochen

Flow 是 Kotlin 协程提供的一套类似 RxJava 的流式 API，可以用来替代 RxJava，因此 Retrofit 中基于 RxJava 的 CallAdapterFactory 也可以相应地替换为基于 Flow 的实现。

代码非常少，只有一个类，直接上源码吧：
/**
 * Retrofit [CallAdapter.Factory] that lets service methods declare `Flow<T>` as their return type.
 *
 * The produced flow is cold and emits exactly one value: the response body. A response that is
 * not successful, or that is successful but has no body, fails the flow with [HttpException].
 * Every collection runs on its own clone of the underlying [Call], so the same flow can be
 * collected (or retried) more than once.
 *
 * @property dispatcher optional dispatcher the blocking request is moved to via `flowOn`;
 * only used by the synchronous adapter.
 * @property isAsync `true` to use [Call.enqueue], `false` to use the blocking [Call.execute].
 */
class FlowCallAdapterFactory private constructor(
    private val dispatcher: CoroutineDispatcher?,
    private val isAsync: Boolean
) : CallAdapter.Factory() {

    companion object {
        /** Creates a factory whose flows perform the request with [Call.enqueue]. */
        @JvmStatic
        fun createAsync() = FlowCallAdapterFactory(null, true)

        /**
         * Creates a factory whose flows perform the request with the blocking [Call.execute].
         *
         * @param dispatcher dispatcher to run the blocking call on; when `null` the call blocks
         * whatever thread the collector's context runs on.
         */
        @JvmStatic
        fun createSynchronous(dispatcher: CoroutineDispatcher? = null) = FlowCallAdapterFactory(dispatcher, false)
    }

    /**
     * Returns an adapter for `Flow<Foo>` return types, or `null` for any other return type so
     * that Retrofit can ask the next factory.
     *
     * @throws IllegalStateException if the return type is a raw `Flow` without a type argument.
     */
    override fun get(returnType: Type, annotations: Array<out Annotation>, retrofit: Retrofit): CallAdapter<*, *>? {
        if (getRawType(returnType) != Flow::class.java) return null
        check(returnType is ParameterizedType) {
            "Flow return type must be parameterized as Flow<Foo> or Flow<out Foo>"
        }
        val bodyType = getParameterUpperBound(0, returnType)
        return if (isAsync) AsyncFlowCallAdapter<Any>(bodyType) else FlowCallAdapter<Any>(bodyType, dispatcher)
    }

    /**
     * Adapts a [Call] into a [Flow] using the blocking [Call.execute].
     *
     * @param responseType type the response body is converted to.
     * @param dispatcher dispatcher applied with `flowOn`, or `null` to stay on the collector's context.
     */
    class FlowCallAdapter<T>(
        private val responseType: Type,
        private val dispatcher: CoroutineDispatcher?
    ) : CallAdapter<T, Flow<T>> {

        override fun responseType(): Type = responseType

        override fun adapt(call: Call<T>): Flow<T> {
            val adaptFlow = flow<T> {
                // A Retrofit Call may only be executed once, but a cold Flow can be collected
                // repeatedly, so each collection works on a fresh clone.
                val request = call.clone()
                val result = suspendCancellableCoroutine<T> { continuation ->
                    continuation.invokeOnCancellation {
                        request.cancel()
                    }
                    try {
                        val response = request.execute()
                        val body = response.body()
                        if (response.isSuccessful && body != null) {
                            continuation.resume(body)
                        } else {
                            continuation.resumeWithException(HttpException(response))
                        }
                    } catch (e: Exception) {
                        // Forward I/O and conversion failures to the collector.
                        continuation.resumeWithException(e)
                    }
                }
                emit(result)
            }
            return dispatcher?.let { adaptFlow.flowOn(it) } ?: adaptFlow
        }
    }

    /**
     * Adapts a [Call] into a [Flow] using [Call.enqueue]; the collector suspends until the
     * callback delivers a response or a failure.
     *
     * @param responseType type the response body is converted to.
     */
    class AsyncFlowCallAdapter<T>(private val responseType: Type) : CallAdapter<T, Flow<T>> {

        override fun responseType(): Type = responseType

        override fun adapt(call: Call<T>): Flow<T> = flow<T> {
            // Clone per collection: an already enqueued Call cannot be enqueued again.
            val request = call.clone()
            val result = suspendCancellableCoroutine<T> { continuation ->
                continuation.invokeOnCancellation {
                    request.cancel()
                }
                request.enqueue(object : Callback<T> {
                    override fun onResponse(call: Call<T>, response: Response<T>) {
                        val body = response.body()
                        if (response.isSuccessful && body != null) {
                            continuation.resume(body)
                        } else {
                            continuation.resumeWithException(HttpException(response))
                        }
                    }

                    override fun onFailure(call: Call<T>, t: Throwable) {
                        continuation.resumeWithException(t)
                    }
                })
            }
            emit(result)
        }
    }
}
使用示例
/**
 * Base view model that hands out API service instances and caches them per service class.
 *
 * Services are created through [repository] on first request and reused afterwards; the cache
 * is emptied in [onCleared].
 */
open class BaseViewModel(private val repository: DataRepository = DataRepository) : ViewModel(), DefaultLifecycleObserver {

    // One cached instance per requested service class.
    private val services = ArrayMap<Class<*>, Any>()

    /** Kotlin-friendly overload that accepts a [KClass] and delegates to the [Class] variant. */
    protected fun <T : Any> getService(serviceClass: KClass<T>): T = getService(serviceClass.java)

    /**
     * Returns the cached service for [serviceClass], asking the repository for it (and caching
     * the result) when it has not been requested before.
     */
    protected fun <T> getService(serviceClass: Class<T>): T {
        val cached = services[serviceClass]
        return if (cached == null) {
            val created = repository.getService(serviceClass)
            services[serviceClass] = created
            created
        } else {
            cached as T
        }
    }

    /** Drops every cached service. */
    override fun onCleared() {
        services.clear()
    }
}
/**
 * Sample view model that issues two HTTP requests through `TestApi` and logs each stage
 * of the resulting flow.
 *
 * author: you : 2021/12/7
 */
class MainViewModel : BaseViewModel() {

    // Job of the most recent request started by testHttpRequest0, if any.
    private var httpJob0: Job? = null

    // Job of the most recent request started by testHttpRequest1, if any.
    private var httpJob1: Job? = null

    /** Cancels the previous request (if any) and loads `TestApi.URL`, logging every stage. */
    fun testHttpRequest0() {
        httpJob0?.cancel()
        httpJob0 = viewModelScope.launch {
            val api = getService(TestApi::class)
            val userFlow = api.getUserBean(TestApi.URL)
            userFlow
                .onStart { Log.d("youxiaochen", "testHttpRequest0 loading start...") }
                .catch { error -> Log.d("youxiaochen", "testHttpRequest0 loading error ...$error") }
                .onCompletion { cause -> Log.d("youxiaochen", "testHttpRequest0 loading complete...$cause") }
                .collect { user -> Log.d("youxiaochen", "testHttpRequest0 result = $user") }
        }
    }

    /** Cancels the previous request (if any) and loads `TestApi.URL2`, logging every stage. */
    fun testHttpRequest1() {
        httpJob1?.cancel()
        httpJob1 = viewModelScope.launch {
            val api = getService(TestApi::class)
            val userFlow = api.getUserBean(TestApi.URL2)
            userFlow
                .onStart { Log.d("youxiaochen", "testHttpRequest1 loading start...") }
                // Handles request errors.
                .catch { error -> Log.d("youxiaochen", "testHttpRequest1 loading error ...$error") }
                // On cancellation the cause is non-null; on error the cause is null and the catch above is triggered.
                .onCompletion { cause -> Log.d("youxiaochen", "testHttpRequest1 loading complete...$cause") }
                .collect { user -> Log.d("youxiaochen", "testHttpRequest1 result = $user") }
        }
    }
}
源码地址

更多文章请关注:http://www.jianshu.com/u/b1cff340957c

上一篇下一篇

猜你喜欢

热点阅读