Kotlin flow学习笔记
学习:https://juejin.cn/post/7031726493906829319 感觉别人写的很好,转载了部分,版权是人家的
Flow默认是Cold的,生产者和消费者的通信是同步非阻塞的,也就是生产和消费会顺序交替进行
image.png lifecycleScope.launch {
flow {
Log.d("haha",Thread.currentThread().name)
emit("123")
}.flowOn(Dispatchers.IO) //切换线程
.onStart {
println("onStart")
}.catch {
println("catch:${it.message}")//有异常会进入此方法
}.onCompletion {
println("oniComplete:${it?.message}")//无论是否有异常都会执行
}.collect {//收集流
Log.d("haha",Thread.currentThread().name)
println("result = $it")
}
}
flowOn以上的代码运行在flowon的线程上,collect中的代码运行在调用者的线程上
末端操作符
collect
搜集上游发送的数据
lifecycleScope.launch {
flow {
emit("haha")
delay(5000)
emit("ddddd")
}.flowOn(Dispatchers.IO)
.collect{
println(it)
}
println("tttt")
}
打印haha,5秒后打印dddd和tttt,collect 没搜集完毕时,不会往下执行。
Backpressure 背压
MISSING:创建的 Flowable 没有指定背压策略,不会对emit 发射的数据做缓存或丢弃处理。
lifecycleScope.launch {
flow {
emit("haha")
delay(2000)
emit("ddddd")
}
.collect {
println("collect start$it")
delay(5000)
println("collect end$it")
}
}
打印:
2022-08-21 15:15:07 /System.out: collect starthaha
2022-08-21 15:15:12 /System.out: collect endhaha
2022-08-21 15:15:14 /System.out: collect startddddd
2022-08-21 15:15:19 /System.out: collect endddddd
1 先执行 emit("haha") 发现通道buffer满了,挂起
2 再执行 println("collect start$it")
3 delay5秒
4 collect执行完毕后 执行delay2秒,2秒后emit("ddddd")挂起
5 继续执行collect
collect后才会emit下一个,因为没有缓冲
buffer
lifecycleScope.launch {
flow {
println("emit 1111")
emit(1111)
println("emit 2222")
emit(2222)
println("emit 3333")
emit(3333)
}
.collect {
println("collect $it")
}
}
打印
2022-08-22 21:46:22 I/System.out: emit 1111
2022-08-22 21:46:22 I/System.out: collect 1111
2022-08-22 21:46:22 I/System.out: emit 2222
2022-08-22 21:46:22 I/System.out: collect 2222
2022-08-22 21:46:22 I/System.out: emit 3333
2022-08-22 21:46:22 I/System.out: collect 3333
由于没有buffer,发送一个,通道满了挂起,消费一个,就继续发送下一个
lifecycleScope.launch {
flow {
println("emit 1111")
emit(1111)
println("emit 2222")
emit(2222)
println("emit 3333")
emit(3333)
println("emit 444")
emit(444)
println("emit 555")
emit(555)
}.buffer(1)
.collect {
println("collect $it")
}
}
打印
2022-08-22 22:06:15 I/System.out: emit 1111
2022-08-22 22:06:15 I/System.out: emit 2222
2022-08-22 22:06:15 I/System.out: emit 3333
2022-08-22 22:06:15 I/System.out: collect 1111
2022-08-22 22:06:15 I/System.out: collect 2222
2022-08-22 22:06:15 I/System.out: collect 3333
2022-08-22 22:06:15 I/System.out: emit 444
2022-08-22 22:06:15 I/System.out: emit 555
2022-08-22 22:06:15 I/System.out: collect 444
2022-08-22 22:06:15 I/System.out: collect 555
buffer 容量设置1, 发送111,缓冲没满 继续发送222,发送222后,去发送333时,发现通道满了,挂起,搜集了111后,开始发送444
buffer(1,BufferOverflow.DROP_LATEST)
则只会搜集1111 2222,超出缓冲后,最新的都被丢了
buffer(1,BufferOverflow.DROP_OLDEST)
则只会搜集1111 5555,超出缓冲后,旧的的都被丢了
缓冲超出缓冲区策略:
A strategy for buffer overflow handling in channels and flows that controls what is going to be sacrificed on buffer overflow:
SUSPEND — the upstream that is sending or is emitting a value is suspended while the buffer is full.
DROP_OLDEST — drop the oldest value in the buffer on overflow, add the new value to the buffer, do not suspend.
DROP_LATEST — drop the latest value that is being added to the buffer right now on buffer overflow (so that buffer contents stay the same), do not suspend.
public enum class BufferOverflow {
/**
* Suspend on buffer overflow.
*/
SUSPEND,
/**
* Drop **the oldest** value in the buffer on overflow, add the new value to the buffer, do not suspend.
*/
DROP_OLDEST,
/**
* Drop **the latest** value that is being added to the buffer right now on buffer overflow
* (so that buffer contents stay the same), do not suspend.
*/
DROP_LATEST
}
map
转换上游发送的数据类型后到下游
lifecycleScope.launch {
flow {
emit(1111)
}.map {
"map" + it
}
.collect {
println("collect $it")
}
}
transform
把上游的每一次发送,转换成任意次发送
lifecycleScope.launch {
flow {
emit("aaaa")
}
.transform {
emit(it+"1111")
emit(it+"2222")
emit(it+"3333")
}
.collect {
println("collect $it")
}
}
打印
collect aaaa1111
collect aaaa2222
collect aaaa3333
onEach
遍历上游,再发往下游,所以必须有collect才会执行,因为flow是冷流
lifecycleScope.launch {
flow {
emit("aaaa")
emit("bbbb")
} .onEach { println("onEach: $it") }
.collect {
println("collect $it")
}
}
打印
onEach: aaaa
collect aaaa
onEach: bbbb
collect bbbb
take
取前面几个发送的
lifecycleScope.launch {
(1..6).asFlow()
.take(2)
.collect{
println(it)
}
}
打印 1 2
zip
把2个流 一一对应合并,数量不一致 取小的数量,时间耗费时长,取长的时间消耗
lifecycleScope.launch {
val flowa = (1..6).asFlow()
val flowb = flowOf("aaa", "bbb")
flowa.zip(flowb) { a, b ->
b + a
}.collect {
println(it)
}
}
打印
aaa1
bbb2
flattenContact
把流依次展开并且发送
lifecycleScope.launch {
val flowa = (1..3).asFlow()
val flowb = flowOf("aaa", "bbb")
flowOf(flowa,flowb)
.flattenConcat()
.collect{
println(it)
}
}
打印
1
2
3
aaa
bbb
flattenMerge
把流展开,可以设置并发数量,默认16,如果设置1就与 flattenConcat 一致。
lifecycleScope.launch {
val flowa = (1..3).asFlow().onEach { delay(100) }
val flowb = flowOf("aaa", "bbb").onEach { delay(200) }
flowOf(flowa, flowb)
.flattenMerge(2)
.collect {
println(it)
}
}
打印
1
aaa
2
3
bbb
StateFlow 和 SharedFlow
用于上游发射数据,能同时被多个订阅者收集数据。
1 StateFlow
StateFlow 是一个状态容器式可观察数据流,可以向其收集器发出当前状态更新和新状态更新。
StateFlow 非常适合需要让可变状态保持可观察的类,任何对值的更新,都会被搜集到。
class MyViewModel : ViewModel() {
private val _state = MutableStateFlow<String>("null")
val state: StateFlow<String> get() = _state
fun getData() {
viewModelScope.launch {
_state.value = "hahaha"
}
}
}
class MainActivity : AppCompatActivity() {
val model: MyViewModel by viewModels()
@OptIn(FlowPreview::class)
override fun onCreate(savedInstanceState: Bundle?) {
super.onCreate(savedInstanceState)
setContentView(R.layout.activity_main)
lifecycleScope.launch {
model.state.collect {
println(it)
}
println("end")
}
lifecycleScope.launch {
model.state.collect {
println(it)
}
println("end")
}
model.getData()
}
}
打印
null
null
hahaha
hahaha
1 StateFlow 是发射的数据可以被在不同的协程中,被多个接受者同时收集的。
2 StateFlow 是热流,只要数据发生变化,就会发射数据
3 StateFlow 的收集者调用 collect 会挂起当前协程,而且永远不会结束。
4 StateFlow 不跟声明周期绑定,需要手动取消订阅者协程 repeatOnLifecycle(Lifecycle.State.STARTED)
5 StateFlow跟LiveData一样 只会发送最新的数据给订阅者,例如
6 StateFlow是粘性的,横竖屏切换后,会把最后一次的数据重新发送给搜集者
7 StateFlow防抖,重复的数据不会更新
8 StateFlow上游发送数据比下游搜集数据快,会把旧的数据丢弃,只搜集最新的
比较适用于页面状态的更新
class MyViewModel : ViewModel() {
private val _state = MutableStateFlow<String>("null")
val state: StateFlow<String> get() = _state
fun getData() {
viewModelScope.launch {
_state.value = "111"
_state.value = "222"
_state.value = "333"
_state.value = "444"
}
}
}
class MainActivity : AppCompatActivity() {
val model: MyViewModel by viewModels()
@OptIn(FlowPreview::class)
override fun onCreate(savedInstanceState: Bundle?) {
super.onCreate(savedInstanceState)
setContentView(R.layout.activity_main)
lifecycleScope.launch {
model.state.collect {
println(it)
}
}
model.getData()
}
}
只会打印null和44444
SharedFlow
1 SharedFlow没有默认值
2 SharedFlow会挂起直到所有的订阅者处理完成。
public fun <T> MutableSharedFlow(
replay: Int = 0,
extraBufferCapacity: Int = 0,
onBufferOverflow: BufferOverflow = BufferOverflow.SUSPEND
): MutableSharedFlow<T>
replay 是重放个数,默认为0,0的意思是,不会新的搜集者搜集,不会把最后一次数据提供给新的搜集者,如果设置为1,那就是有粘性了。
extraBufferCapacity 缓冲数量
onBufferOverflow 超过后的处理规则
这2个参数类似flow里面的buffer,不过这里的buffer缓存数量是replay+onBufferOverflow
3 SharedFlow适合作为事件总线 分发事件
class EventBus {
private val _events = MutableSharedFlow<Event>()
val events = _events.asSharedFlow()
suspend fun produceEvent(event: Event) {
_events.emit(event)
}
}
事件:希望每一条事件都会被消费,消费结束后挂起。适合使用SharedFlow
状态:显示页面,期望显示最新的,并且切换横竖屏后再次搜集,希望得到最后一次数据(粘性),也允许丢弃中间的数据,只显示最新的数据。适合使用StateFlow
ChannelFlow
冷流,异步非阻塞
flow{ ... } 可以通过 flowOn切换线程 切换成子线程后和 channelFlow花的时间差不多
基于Flow/Channel的MVI如何实现
抽象出基类BaseViewModel
UiState是可以表征UI的Model,用StateFlow承载(也可以使用LiveData)
UiEvent是表示交互事件的Intent,用SharedFlow承载
UiEffect是事件带来除了改变UI以外的副作用,用channelFlow承载
BaseViewModel.kt
abstract class BaseViewModel<State : UiState, Event : UiEvent, Effect : UiEffect> : ViewModel() {
/**
* 初始状态
* stateFlow区别于LiveData必须有初始值
*/
private val initialState: State by lazy { createInitialState() }
abstract fun createInitialState(): State
/**
* uiState聚合页面的全部UI 状态
*/
private val _uiState: MutableStateFlow<State> = MutableStateFlow(initialState)
val uiState = _uiState.asStateFlow()
/**
* event包含用户与ui的交互(如点击操作),也有来自后台的消息(如切换自习模式)
*/
private val _event: MutableSharedFlow<Event> = MutableSharedFlow()
val event = _event.asSharedFlow()
/**
* effect用作 事件带来的副作用,通常是 一次性事件 且 一对一的订阅关系
* 例如:弹Toast、导航Fragment等
*/
private val _effect: Channel<Effect> = Channel()
val effect = _effect.receiveAsFlow()
init {
subscribeEvents()
}
private fun subscribeEvents() {
viewModelScope.launch {
event.collect {
handleEvent(it)
}
}
}
protected abstract fun handleEvent(event: Event)
fun sendEvent(event: Event) {
viewModelScope.launch {
_event.emit(event)
}
}
protected fun setState(reduce: State.() -> State) {
val newState = currentState.reduce()
_uiState.value = newState
}
protected fun setEffect(builder: () -> Effect) {
val newEffect = builder()
viewModelScope.launch {
_effect.send(newEffect)
}
}
}
interface UiState
interface UiEvent
interface UiEffect
StateFlow基本等同于LiveData,区别在于StateFlow必须有初值,这也更符合页面必须有初始状态的逻辑。一般使用data class实现UiState,页面所有元素的状态用成员变量表示。
用户交互事件用SharedFlow,具有时效性且支持一对多订阅,使用它可以解决上文提到的痛点二问题。
消费事件带来的副作用影响用ChannelFlow承载,不会丢失且一对一订阅,只执行一次。使用它可以解决上文提到的痛点一问题。
协议类,定义具体业务需要的State、Event、Effect类
class NoteContract {
/**
* pageTitle: 页面标题
* loadStatus: 上拉加载的状态
* refreshStatus: 下拉刷新的状态
* noteList : 备忘录列表
*/
data class State(
val pageTitle: String,
val loadStatus: LoadStatus,
val refreshStatus: RefreshStatus,
val noteList: MutableList<NoteItem>
) : UiState
sealed class Event : UiEvent {
// 下拉刷新事件
object RefreshNoteListEvent : Event()
// 上拉加载事件
object LoadMoreNoteListEvent: Event()
// 添加按键点击事件
object AddingButtonClickEvent : Event()
// 列表item点击事件
data class ListItemClickEvent(val item: NoteItem) : Event()
// 添加项弹窗消失事件
object AddingNoteDialogDismiss : Event()
// 添加项弹窗添加确认点击事件
data class AddingNoteDialogConfirm(val title: String, val desc: String) : Event()
// 添加项弹窗取消确认点击事件
object AddingNoteDialogCanceled : Event()
}
sealed class Effect : UiEffect {
// 弹出数据加载错误Toast
data class ShowErrorToastEffect(val text: String) : Effect()
// 弹出添加项弹窗
object ShowAddNoteDialog : Effect()
}
sealed class LoadStatus {
object LoadMoreInit : LoadStatus()
object LoadMoreLoading : LoadStatus()
data class LoadMoreSuccess(val hasMore: Boolean) : LoadStatus()
data class LoadMoreError(val exception: Throwable) : LoadStatus()
data class LoadMoreFailed(val errCode: Int) : LoadStatus()
}
sealed class RefreshStatus {
object RefreshInit : RefreshStatus()
object RefreshLoading : RefreshStatus()
data class RefreshSuccess(val hasMore: Boolean) : RefreshStatus()
data class RefreshError(val exception: Throwable) : RefreshStatus()
data class RefreshFailed(val errCode: Int) : RefreshStatus()
}
}
在生命周期组件中收集状态变化流和一次性事件流,发送用户交互事件
class NotePadActivity : BaseActivity() {
...
override fun initObserver() {
super.initObserver()
lifecycleScope.launchWhenStarted {
viewModel.uiState.collect {
when (it.loadStatus) {
is NoteContract.LoadStatus.LoadMoreLoading -> {
adapter.loadMoreModule.loadMoreToLoading()
}
...
}
when (it.refreshStatus) {
is NoteContract.RefreshStatus.RefreshSuccess -> {
adapter.setDiffNewData(it.noteList)
refresh_layout.finishRefresh()
if (it.refreshStatus.hasMore) {
adapter.loadMoreModule.loadMoreComplete()
} else {
adapter.loadMoreModule.loadMoreEnd(false)
}
}
...
}
txv_title.text = it.pageTitle
txv_desc.text = "${it.noteList.size}条记录"
}
}
lifecycleScope.launchWhenStarted {
viewModel.effect.collect {
when (it) {
is NoteContract.Effect.ShowErrorToastEffect -> {
showToast(it.text)
}
is NoteContract.Effect.ShowAddNoteDialog -> {
showAddNoteDialog()
}
}
}
}
}
private fun initListener() {
btn_floating.setOnClickListener {
viewModel.sendEvent(NoteContract.Event.AddingButtonClickEvent)
}
}
}