协程Flow之FlowCallAdapterFactory
2023-01-14 本文已影响0人
youxiaochen
Flow是kotlin协程的一个类似RxJava的流式API,它的出现可以替代RxJava, 所以Retrofit的CallAdapterFactory也可以替换了
代码非常少就一个类,直接上源码吧
class FlowCallAdapterFactory private constructor(
private val dispatcher: CoroutineDispatcher?,
private val isAsync: Boolean
) : CallAdapter.Factory() {
companion object {
@JvmStatic
fun createAsync() = FlowCallAdapterFactory(null, true)
@JvmStatic
fun createSynchronous(dispatcher: CoroutineDispatcher? = null) = FlowCallAdapterFactory(dispatcher, false)
}
override fun get(returnType: Type, annotations: Array<out Annotation>, retrofit: Retrofit): CallAdapter<*, *>? {
val rawType = getRawType(returnType)
if (rawType != Flow::class.java) return null
if (returnType !is ParameterizedType) {
throw IllegalStateException("Flow return type must be parameterized as Flow<Foo> or Flow<out Foo>")
}
val observableType = getParameterUpperBound(0, returnType)
//Log.d("FlowCallAdapterFactory", "rawType = $rawType, returnType = $returnType, observableType = $observableType")
return if (isAsync) AsyncFlowCallAdapter<Any>(observableType) else FlowCallAdapter<Any>(observableType, dispatcher)
}
class FlowCallAdapter<T>(
private val responseType: Type,
private val dispatcher: CoroutineDispatcher?
) : CallAdapter<T, Flow<T>> {
override fun responseType(): Type = responseType
override fun adapt(call: Call<T>): Flow<T> {
val adaptFlow = flow {
suspendCancellableCoroutine<T> { continuation ->
continuation.invokeOnCancellation {
call.cancel()
}
try {
val response = call.execute()
var body: T? = null
if (response.isSuccessful && response.body().let { body = it; it != null }) {
continuation.resume(body!!)
} else {
continuation.resumeWithException(HttpException(response))
}
} catch (e: Exception) {
continuation.resumeWithException(e)
}
}.also { emit(it) }
}
return dispatcher?.let { adaptFlow.flowOn(it) } ?: adaptFlow
}
}
class AsyncFlowCallAdapter<T>(private val responseType: Type) : CallAdapter<T, Flow<T>> {
override fun responseType(): Type = responseType
override fun adapt(call: Call<T>): Flow<T> = flow {
suspendCancellableCoroutine<T> { continuation ->
continuation.invokeOnCancellation {
call.cancel()
}
call.enqueue(object : Callback<T> {
override fun onResponse(call: Call<T>, response: Response<T>) {
var body: T? = null
if (response.isSuccessful && response.body().let { body = it; it != null }) {
continuation.resume(body!!)
} else {
continuation.resumeWithException(HttpException(response))
}
}
override fun onFailure(call: Call<T>, t: Throwable) {
continuation.resumeWithException(t)
}
})
}.also { emit(it) }
}
}
}
使用示例
open class BaseViewModel(private val repository: DataRepository = DataRepository) : ViewModel(), DefaultLifecycleObserver {
private val services = ArrayMap<Class<*>, Any>()
protected fun <T> getService(serviceClass: Class<T>): T {
val service = services[serviceClass]
if (service != null) return service as T
return repository.getService(serviceClass).also { services[serviceClass] = it }
}
protected fun <T : Any> getService(serviceClass: KClass<T>): T = getService(serviceClass.java)
override fun onCleared() {
services.clear()
}
}
/**
* author: you : 2021/12/7
*/
class MainViewModel : BaseViewModel() {
private var httpJob0: Job? = null
private var httpJob1: Job? = null
fun testHttpRequest0() {
httpJob0?.cancel()
httpJob0 = viewModelScope.launch {
getService(TestApi::class).getUserBean(TestApi.URL)
.onStart { Log.d("youxiaochen", "testHttpRequest0 loading start...") }
.catch { Log.d("youxiaochen", "testHttpRequest0 loading error ...$it") }
.onCompletion { Log.d("youxiaochen", "testHttpRequest0 loading complete...$it") }
.collect { Log.d("youxiaochen", "testHttpRequest0 result = $it") }
}
}
fun testHttpRequest1() {
httpJob1?.cancel()
httpJob1 = viewModelScope.launch {
getService(TestApi::class).getUserBean(TestApi.URL2)
.onStart { Log.d("youxiaochen", "testHttpRequest1 loading start...") }
.catch { Log.d("youxiaochen", "testHttpRequest1 loading error ...$it") } //erro
.onCompletion { Log.d("youxiaochen", "testHttpRequest1 loading complete...$it") } //cancel时 it不为空, error时it为空并触发catch...
.collect { Log.d("youxiaochen", "testHttpRequest1 result = $it") }
}
}
}