Flow 的理解与运用
Kotlin Flow 可以用于替换 Rxjava,也可以用于替换 LiveData,功能十分强大,它是 Kotlin 协程库提供的一部分功能,因此,如果我们项目中已经引用了 Kotlin 协程,则不需要额外引入 Flow 相关的依赖。
在协程中,挂起函数最多仅能返回一个值,而数据流 Flow 可按顺序发出多个值,例如,我们可以通过数据流从数据库中实时接收更新。数据流使用挂起函数通过异步方式生成和使用值,也就是说,数据流可安全地发出网络请求以生成下一个值,而不会阻塞主线程。
数据流 Flow 包含三个重要角色:
数据提供方:生成数据,并添加到数据流中
中介(可选):可修改发送到数据流的值,或修正数据流本身
数据使用方:使用数据流中的值
创建数据流
flow 构建器函数会创建一个新数据流,然后我们可使用 emit 函数将新值发送到数据流中。
val latestNews: Flow<List<NewsData>> = flow {
while (true) {
val latestNews = newsApi.fetchLatestNews()
emit(latestNews)
delay(5000)
}
}
修改数据流
中介可以利用中间运算符在不使用值的情况下修改数据流,例如:
- filter : 对待操作的值进行过滤
- map :对值进行加工后继续向后传递
- flatMapLatest:转换成一个新的流,需要返回一个转换后的新的流。(如果下个值来了,上一个值变换还没结束,上一个值的转换会被取消)
- onEach:接收到的每一个值
val news: Flow<List<NewsData>> = myRemoteDataSource.latestNews
// 先过滤列表数据大于3的数据(大于3才能通过)
.filter {
it.size >= 3
}
// 对结果进行加工后继续向后传递
.map { list ->
// 调用 list.filter 进一步筛选出 id==1 的新闻
list.filter { it.id == 1 }
}
// 转换成一个新的流,需要返回一个转换后的新的流。(如果下个值来了,上一个值变换还没结束,上一个值的转换会被取消)
.flatMapLatest {
flow {
emit(it)
}
}
.onEach {
// todo 获取到筛选后 新闻列表(结果) 数据
}
收集数据流
只有在 收集数据流时 才会触发 数据提供方 刷新最新数据。除非使用其他中间运算符指定流,否则数据流始终为冷流并延迟执行。
fun getNewsData() {
viewModelScope.launch(Dispatchers.Main) {
remoteRepository.news
.catch {
// todo 收集异常
}
.collect {
// 收到到的数据
}
}
}
数据流收集可能会由于以下原因而停止:
收集数据的协程被取消,此操作也会让 数据提供方 停止活动。
数据提供方完成发出数据项。在这种情况下,数据流将关闭,调用 collect 的协程则继续执行。
捕获异常
如需处理异常,可以使用 catch 运算符,如:
fun getNewsData() {
viewModelScope.launch {
remoteRepository.news
.catch {
// todo 在这里收集异常
}
.collect {
newsData.value = it
}
}
}
另外,catch 还可执行 emit 操作,向数据流发出新的数据项,例如,如果我们在 上游 发现了异常,我们可以继续调用 emit 函数发送新的数据(或者之前缓存的数据),如:
class MyRemoteRepository @Inject constructor(
private val myRemoteDataSource: MyRemoteDataSource,
) {
// 返回 id 等于 1 的新闻
val news: Flow<List<NewsData>> = myRemoteDataSource.latestNews
.map { news ->
// 筛选出 id==1 的新闻
news.filter { it.id == 1 }
}
.onEach {
// todo 获取到筛选后 新闻列表(结果) 数据
}
.catch {
// 如果在 上游 收集到异常,我们可以继续调用 emit 函数发送新的数据(或者之前缓存的数据)
// 例如:emit(lastCachedNews())
}
}
协程作用域切换
默认情况下,flow 上游数据提供方 会基于 下游数据收集方 的协程 CoroutineContext 执行,也就是说,默认情况,下游和上游会运行在同一个协程作用域中。
并且,它无法从不同协程作用域对值执行 emit 操作。
如果需要更改数据流的的协程作用域,可以使用中间运算符 flowOn 运算符。
flowOn 会更改上游数据流的 CoroutineContext,但不会影响到下游数据流的作用域。
如果有多个 flowOn 运算符,每个运算符都会更改当前位置的上游数据流。
// 上游数据流代码,上游数据流将会在 Dispatchers.IO 作用域上执行:
class MyRemoteRepository @Inject constructor(
private val myRemoteDataSource: MyRemoteDataSource,
) {
val news: Flow<List<NewsData>> = myRemoteDataSource.latestNews
.flowOn(Dispatchers.IO)
.catch { }
}
// 下游数据流代码,下游数据流将会在 Dispatchers.Main 作用域上执行
fun getNewsData() {
viewModelScope.launch(Dispatchers.Main) {
remoteRepository.news.collect { }
}
}
Flow Demo 演示
下面的代码将演示通过 Flow 不断获取最新的新闻列表。
步骤一:创建 News 类,定义新闻格式
data class NewsData(var id: Int, var content: String)
1
步骤二:创建 NewsApi 接口,用于请求最新的新闻列表
interface NewsApi {
/**
* 请求最新的新闻列表
*/
suspend fun fetchLatestNews(): List<NewsData>
companion object {
fun create(): NewsApi {
return NewsApiImpl()
}
}
}
步骤三:创建 DataSource 类,内部有一个 latestNews 变量,作为 Flow 的上游数据提供者,每隔 5 秒,通过 newsApi 请求新闻数据,并调用 emit 方法将新闻数据发出
class MyRemoteDataSource @Inject constructor(
private val newsApi: NewsApi,
) {
val latestNews: Flow<List<NewsData>> = flow {
while (true) {
val latestNews = newsApi.fetchLatestNews()
emit(latestNews)
delay(5000)
}
}
}
步骤四:创建 RemoteRepository 类,内部有一个 news 变量,作为 Flow 的数据中间处理者,筛选数据,切换上游作用域,收集上游异常等都可以在这里处理。
class MyRemoteRepository @Inject constructor(
private val myRemoteDataSource: MyRemoteDataSource,
) {
// 返回 id 等于 1 的新闻
val news: Flow<List<NewsData>> = myRemoteDataSource.latestNews
.map { news ->
// 筛选出 id==1 的新闻
news.filter { it.id == 1 }
}
.onEach {
// todo 获取到筛选后 新闻列表(结果) 数据
}
.flowOn(Dispatchers.IO) // 上游数据流将会在 Dispatchers.IO 作用域上执行
.catch {
// 如果在 上游 收集到异常,我们可以继续调用 emit 函数发送新的数据(或者之前缓存的数据)
// 例如:emit(lastCachedNews())
}
}
步骤五:创建 ViewModel 类,定义了一个成员方法 getNewsData() ,作为 Flow 的下游数据接收者,另外,还定义了一个 LiveData 变量,监听最新的新闻数据。
@HiltViewModel
class MainViewModel @Inject constructor(
private val remoteRepository: MyRemoteRepository,
) : ViewModel() {
val newsData = MutableLiveData<List<NewsData>>()
fun getNewsData() {
viewModelScope.launch(Dispatchers.Main) {
remoteRepository.news
.catch {
// todo 收集异常
}
.collect {
newsData.value = it
}
}
}
}
步骤六:编写 MainActivity 代码,接受最新的新闻数据并打印出来
@AndroidEntryPoint
class MainActivity : AppCompatActivity() {
private val mMainViewModel: MainViewModel by viewModels()
override fun onCreate(savedInstanceState: Bundle?) {
super.onCreate(savedInstanceState)
setContentView(R.layout.activity_main)
initObserve()
}
private fun initObserve() {
mMainViewModel.newsData.observe(this) {
println("newsData=${Gson().toJson(it)}")
}
}
fun click(view: View) {
mMainViewModel.getNewsData()
}
}
步骤七:创建 NewsApi 的实现类
class NewsApiImpl : NewsApi {
override suspend fun fetchLatestNews(): List<NewsData> {
val list = ArrayList<NewsData>()
list.add(NewsData(1, "news 1"))
list.add(NewsData(2, "news 2"))
list.add(NewsData(3, "news 3"))
return list
}
}
步骤八:编写 依赖注入类(di)
@InstallIn(SingletonComponent::class)
@Module
class MainModule {
@Singleton
@Provides
fun provideAppDatabase(@ApplicationContext context: Context): NewsApi {
return NewsApi.create()
}
}
// 另外,别忘了在 Application 中加上 @HiltAndroidApp 注解
@HiltAndroidApp
class MainApplication : Application()
步骤九:由于 demo 使用到了 Hilt ,因此我们需要加上如下依赖:
// Project-build.gradle
buildscript {
ext {
hiltVersion = '2.41'
}
dependencies {
classpath "com.google.dagger:hilt-android-gradle-plugin:$hiltVersion"
}
}
// app-build.gradle
plugins {
id 'kotlin-kapt'
id 'dagger.hilt.android.plugin'
}
dependencies {
kapt "com.google.dagger:hilt-android-compiler:$rootProject.hiltVersion"
implementation "com.google.dagger:hilt-android:$rootProject.hiltVersion"
}
步骤十:为了能在 activity 或 fragment 中使用 by viewModels() api,我们还需额外引入以下 依赖:
// app-build.gradle
dependencies {
implementation "androidx.activity:activity-ktx:1.4.0"
implementation "androidx.fragment:fragment-ktx:1.4.1"
}
StateFlw
StateFlow 是一个状态容器式的可观察数据流,可以向其收集器发出当前状态更新和新状态更新。还可通过其 value 属性读取当前状态值。
StateFlow 非常适合需要让可变状态保持可观察的类。
Flow 是冷数据流,而 StateFlow 是热数据流,热流有如下特性:
- 调用 StateFlow.collect 收集数据不会触发任何 数据提供方(上游) 的代码
- 上游数据流 如果已经处于活跃(发送)状态,即使没有任何地方调用 StateFlow.collect ,上游流仍会持续活跃(没有 Gc Root 引用自然也会被回收)
- 它允许被多个观察者共用 (因此是共享的数据流)
当一个 新的数据接收方 开始从数据流中 collect 数据时,它将接收到信息流中的最近一个状态及任何后续状态。
注意:如果 StateFlow.value 接收的新数据和前一个旧数据一样时,下游并不会接收到数据的更新通知。
StateFlow 和 LiveData
StateFlow 和 LiveData 具有相似之处,两者都是可观察的数据容器类。
但也存在不同之处:
- StateFlow 需要将初始状态传递给构造函数,而 LiveData 不需要
- 当 View 进入 STOPPED 状态时,LiveData.observe() 会自动取消注册使用方,但是从 StateFlow 或任何其他数据流收集数据的操作并不会自动停止,如需实现与 LiveData 相同的行为,则需从 Lifecycle.repeatOnLifecycle 块中收集数据流
StateFlow 的简单用法如下:
// ViewModel: (_uiState.value更新的地方属于上游)
@HiltViewModel
class MainViewModel @Inject constructor(
private val remoteRepository: MyRemoteRepository,
) : ViewModel() {
// 定义一个私有的 MutableStateFlow 变量(可变)
private val _uiState = MutableStateFlow(LatestNewsUiState.Success(emptyList()))
// UI 从此 StateFlow 收集以获取其状态更新
val uiState: StateFlow<LatestNewsUiState> = _uiState
fun getNewsData() {
viewModelScope.launch {
remoteRepository.news.collect {
// 接受到最新的新闻列表数据后,将数据赋值给 StateFlow 的 value
_uiState.value = LatestNewsUiState.Success(it)
}
}
}
}
// Activity: (mMainViewModel.uiState.collect的地方属于下游)
@AndroidEntryPoint
class MainActivity : AppCompatActivity() {
private val mMainViewModel: MainViewModel by viewModels()
override fun onCreate(savedInstanceState: Bundle?) {
super.onCreate(savedInstanceState)
setContentView(R.layout.activity_main)
initObserve()
}
private fun initObserve() {
mMainViewModel.viewModelScope.launch {
repeatOnLifecycle(Lifecycle.State.STARTED) {
mMainViewModel.uiState.collect {
// 注意:如果 StateFlow.value 接收的新数据和前一个旧数据一样时,下游并不会接收到数据的更新通知
when (it) {
is LatestNewsUiState.Success -> {
println("获取新闻成功,news=${Gson().toJson(it.news)}")
}
is LatestNewsUiState.Error -> {
println("获取新闻失败,error=${Gson().toJson(it.exception)}")
}
}
}
}
}
}
fun click(view: View) {
mMainViewModel.getNewsData()
}
}
如下代码中,负责更新 MutableStateFlow 的类是 数据提供方(上游) ,从 StateFlow.collect 的类是 数据使用方(下游)。
另外,repeatOnLifecycle 能使界面处于活跃状态下才会更新界面,要使用该 api,还需引入以下依赖:
// Project-build.gradle
buildscript {
ext {
lifecycleVersion = '2.4.1'
}
}
// app-build.gradle
dependencies {
implementation "androidx.lifecycle:lifecycle-runtime-ktx:$rootProject.lifecycleVersion"
}
Flow 转为 StateFlow
如需将任何数据流转换为 StateFlow ,可以使用 stateIn 中间运算符。
stateIn 有两个重载函数,一般我们用第二个:
// 函数1:
public suspend fun <T> Flow<T>.stateIn(scope: CoroutineScope): StateFlow<T>
// 函数2:
public fun <T> Flow<T>.stateIn(
scope: CoroutineScope,
started: SharingStarted,
initialValue: T
): StateFlow<T>
其中函数1是一个挂起函数,且仅需要传一个 scope 参数既可,函数2是非挂起函数,需要传递三个参数,三个参数的含义如下:
1、scope :共享流开始时所在的协程作用域范围
2、started :控制共享的开始和结束的策略
3、initialValue: 流的初始值
而 started 有三种取值可选:
- SharingStarted.Eagerly :立即启动上游数据流,且在 scope 指定的作用域被结束时终止上游流
- SharingStarted.Lazily :在第一个订阅者出现后开始启动上游数据流,且在 scope 指定的作用域被结束时终止上游流
- SharingStarted.WhileSubscribed(stopTimeoutMillis) :在第一个订阅者出现后开始启动上游数据流,没有下游收集的情况下,指定时间后(默认是0)会取消上游数据流
stateIn 使用如下:
// ViewModel:
@HiltViewModel
class MainViewModel @Inject constructor(
private val remoteRepository: MyRemoteRepository,
) : ViewModel() {
// 将 news(Flow冷流) 转为 StateFlow 热流
val mStateFlow: StateFlow<List<NewsData>> = remoteRepository.news
.stateIn(
scope = viewModelScope,
started = WhileSubscribed(5000),
initialValue = emptyList()
)
}
// Activity:
@AndroidEntryPoint
class MainActivity : AppCompatActivity() {
private val mMainViewModel: MainViewModel by viewModels()
override fun onCreate(savedInstanceState: Bundle?) {
super.onCreate(savedInstanceState)
setContentView(R.layout.activity_main)
}
// 点击开始收集上游数据
fun click(view: View) {
mMainViewModel.viewModelScope.launch {
repeatOnLifecycle(Lifecycle.State.STARTED) {
mMainViewModel.mStateFlow.collect {
println("收集到数据,it=${Gson().toJson(it)}")
}
}
}
}
}
例子中的 ViewModel,定义了一个变量,将 Flow 冷流通过 stateIn 操作符转为 StateFlow ,该 StateFlow 在第一个订阅者出现后开始启动上游数据流,没有下游收集的情况下,会在 5秒 后取消上游数据流,另外,initialValue 初始值设置为 一个空列表 。在 MainActivity 的代码中,点击按钮将会触发 上游流开始发送数据,同时下游流也开始接收数据。
WhileSubscribed 传入了 5000 ,是为了实现等待5 秒后仍然没有订阅者存在就终止协程的功能,这个方法有以下功能:
应用转至后台运行后,5 秒钟后所有来自其他层的数据更新会停止,这样可以节省电量
在屏幕旋转时,因为重新订阅的时间在5s内,因此上游流不会中止
SharedFlow
SharedFlow 配置更为灵活,支持配置replay,缓冲区大小等,StateFlow是SharedFlow的特化版本,replay固定为1,缓冲区大小默认为0
我们可使用 shareIn 函数会返回一个热数据流 SharedFlow, SharedFlow 会 向从其所有的 数据接收方(下游) 发出数据。
我们先看下 ShareFlow 的构造函数:
public fun <T> MutableSharedFlow(
replay: Int = 0,
extraBufferCapacity: Int = 0,
onBufferOverflow: BufferOverflow = BufferOverflow.SUSPEND
): MutableSharedFlow<T>
其主要有3个参数:
- replay:有新的订阅者Collect时,发送几个已经发送过的数据给它,默认为0
- extraBufferCapacity:除了 replay,SharedFlow 还缓存多少数据,默认为0
- onBufferOverflow:表示缓存策略,即缓冲区满了之后 ShareFlow 如何处理,默认为挂起
StateFolw 和 SharedFlow 的区别
StateFolw 和 SharedFlow 都属于热流。
StateFlow 本质上是一个 replay 为 1,且没有缓冲区的 SharedFlow,因此第一次订阅时会先获得默认值。
StateFlow 仅在值已更新,并且值发生了变化时才会返回,也就是说如果更新后的值没有变化,Collect 方法不会回调,但是 ShareFlow 是会回调的。
下面举个简单的使用 SharedFlow 的例子:
// ViewModel:
@HiltViewModel
class MainViewModel @Inject constructor(
private val remoteRepository: MyRemoteRepository,
) : ViewModel() {
// 定义一个私有的 MutableSharedFlow 变量(可变),当有新的订阅者时,会先发送1个之前发送过的数据给订阅者
private val mMutableSharedFlow = MutableSharedFlow<List<NewsData>>(replay = 1)
// 不可变的 shareFlow
val shareFlow: SharedFlow<List<NewsData>> = mMutableSharedFlow
fun getNewsData() {
viewModelScope.launch {
remoteRepository.news.collect {
mMutableSharedFlow.emit(it)
}
}
}
}
// Activity:
@AndroidEntryPoint
class MainActivity : AppCompatActivity() {
private val mMainViewModel: MainViewModel by viewModels()
override fun onCreate(savedInstanceState: Bundle?) {
super.onCreate(savedInstanceState)
setContentView(R.layout.activity_main)
}
// ShareFlow 开始发送数据
fun click(view: View) {
mMainViewModel.getNewsData()
}
// 收集 ShareFlow 发送的数据
fun click2(view: View) {
mMainViewModel.viewModelScope.launch {
repeatOnLifecycle(Lifecycle.State.STARTED) {
mMainViewModel.shareFlow.collect {
println("获取到的新闻,news=${Gson().toJson(it)}")
}
}
}
}
}
该例子中,ViewModel 中定义了一个 ShareFlow,并将其 replay 参数设置为 1 ,即当有新的订阅者时,会先发送 1 个之前发送过的数据给订阅者。在Activity 中,有两个按钮,按钮 1 触发 ShareFlow 上游开始发送数据, 按钮 2 触发 下游收集数据,按钮 2 按下后,下游会先收集到 1 个之前发送过的数据。
Flow 转为 SharedFlow
如需将任何数据流转换为 SharedFlow ,可以使用 ShareIn 中间运算符:
public fun <T> Flow<T>.shareIn(
scope: CoroutineScope,
started: SharingStarted,
replay: Int = 0
): SharedFlow<T>
ShareIn 函数有三个参数:
- scope:共享流开始时所在的协程作用域范围
- started:控制共享的开始和结束的策略
- replay:有新的订阅者Collect时,发送几个已经发送过的数据给它,默认为0
举个简单的例子(跟 StateIn 的例子很像):
// ViewModel:
@HiltViewModel
class MainViewModel @Inject constructor(
private val remoteRepository: MyRemoteRepository,
) : ViewModel() {
// 将 news(Flow冷流) 转为 SharedFlow 热流
val mSharedFlow: SharedFlow<List<NewsData>> = remoteRepository.news
.shareIn(
scope = viewModelScope,
started = WhileSubscribed(5000),
replay = 1
)
}
// Activity:
@AndroidEntryPoint
class MainActivity : AppCompatActivity() {
private val mMainViewModel: MainViewModel by viewModels()
override fun onCreate(savedInstanceState: Bundle?) {
super.onCreate(savedInstanceState)
setContentView(R.layout.activity_main)
}
fun click(view: View) {
// 点击开始收集上游数据
mMainViewModel.viewModelScope.launch {
repeatOnLifecycle(Lifecycle.State.STARTED) {
mMainViewModel.mSharedFlow.collect {
println("收集到的新闻数据,news=${Gson().toJson(it)}")
}
}
}
}
}
例子中的 ViewModel,定义了一个变量,将 Flow 冷流通过 shareIn 操作符转为 SharedFlow ,该 SharedFlow 在第一个订阅者出现后开始启动上游数据流,没有下游收集的情况下,会在 5秒 后取消上游数据流,另外,replay设置为 1 ,当有下游收集者时,会将之前发过的最近一个值发给下游收集者。在 MainActivity 的代码中,点击按钮将会触发 上游流开始发送数据,同时下游流也开始接收数据。