Kotlin

基于Kotlin SharedFlow的消息总线

2021-05-01  本文已影响0人  两三行代码

SharedFlow 是一种热流(Hot Flow),能够以广播的形式向所有订阅者共享已发射的值,其特性如下:

简单示例如下

/**
 * Process-wide message bus built on a [MutableSharedFlow].
 *
 * Events are published with one of the `produceEvent*` functions and observed
 * through [eventBus] or [subscribeEvent]. The underlying flow is created with
 * the default configuration (no replay, no extra buffer), so an event emitted
 * while nobody is subscribed is dropped, and an emitting coroutine suspends
 * until every current subscriber has received the event.
 */
internal object EventBus {
    /**
     * Scope owned by the bus for work that is not bound to any [LifecycleOwner].
     *
     * Replaces the former `GlobalScope` usage: it keeps the same default
     * dispatcher, and the [kotlinx.coroutines.SupervisorJob] keeps a failure in
     * one launched coroutine from cancelling the others. Names are fully
     * qualified so that no additional imports are required.
     */
    private val busScope = kotlinx.coroutines.CoroutineScope(
        kotlinx.coroutines.SupervisorJob() + kotlinx.coroutines.Dispatchers.Default,
    )

    /**
     * Writable side of the bus; only this object emits into it.
     */
    private val mutableSharedFlow = MutableSharedFlow<Event>()

    /**
     * Read-only view of [mutableSharedFlow], handed out through [eventBus].
     */
    private val asSharedFlow = mutableSharedFlow.asSharedFlow()

    /**
     * Read-only stream of every event published on the bus.
     */
    val eventBus: SharedFlow<Event>
        get() = asSharedFlow

    init {
        busScope.launch {
            // Log the current number of subscribers whenever it changes.
            mutableSharedFlow.subscriptionCount.collect { count ->
                Log.d("flow", "subscriptionCount $count")
            }
        }
    }

    /**
     * Publishes [event] from a coroutine launched in the receiver's `lifecycleScope`.
     *
     * The call itself does not block or suspend; the launched coroutine suspends
     * until all current subscribers have received the event.
     *
     * @param event the event to publish.
     * @return the [Job] of the publishing coroutine; cancelling it cancels the emission.
     */
    fun <T : Event> LifecycleOwner.produceEvent(event: T): Job =
        lifecycleScope.launch { mutableSharedFlow.emit(event) }

    /**
     * Publishes [event] from a coroutine that is not tied to any lifecycle.
     *
     * The coroutine runs in the bus-owned scope and suspends until all current
     * subscribers have received the event.
     *
     * @param event the event to publish.
     */
    fun <T : Event> produceEventGlobal(event: T) {
        busScope.launch { mutableSharedFlow.emit(event) }
    }

    /**
     * Publishes [event] once the receiver's [Lifecycle] is at least in
     * [Lifecycle.State.CREATED].
     *
     * @param event the event to publish.
     * @return the [Job] of the publishing coroutine; it is cancelled when the
     * [Lifecycle] is destroyed.
     */
    fun <T : Event> LifecycleOwner.produceEventWhenCreated(event: T): Job =
        lifecycleScope.launchWhenCreated { mutableSharedFlow.emit(event) }

    /**
     * Publishes [event] once the receiver's [Lifecycle] is at least in
     * [Lifecycle.State.STARTED].
     *
     * @param event the event to publish.
     * @return the [Job] of the publishing coroutine; it is cancelled when the
     * [Lifecycle] is destroyed.
     */
    fun <T : Event> LifecycleOwner.produceEventWhenStarted(event: T): Job =
        lifecycleScope.launchWhenStarted { mutableSharedFlow.emit(event) }

    /**
     * Misspelled predecessor of [produceEventWhenStarted], kept so that existing
     * callers continue to compile. Behaves exactly like [produceEventWhenStarted].
     */
    @Deprecated(
        message = "Misspelled name, use produceEventWhenStarted instead.",
        replaceWith = ReplaceWith("produceEventWhenStarted(event)"),
    )
    fun <T : Event> LifecycleOwner.produceEventWhenStared(event: T): Job =
        produceEventWhenStarted(event)

    /**
     * Publishes [event] once the receiver's [Lifecycle] is at least in
     * [Lifecycle.State.RESUMED].
     *
     * @param event the event to publish.
     * @return the [Job] of the publishing coroutine; it is cancelled when the
     * [Lifecycle] is destroyed.
     */
    fun <T : Event> LifecycleOwner.produceEventWhenResumed(event: T): Job =
        lifecycleScope.launchWhenResumed { mutableSharedFlow.emit(event) }

    /**
     * Subscribes the receiver to the bus within its `lifecycleScope`.
     *
     * Only events published after the subscription starts are observed, because
     * the bus keeps no replay cache.
     *
     * @param predicate selects which events are passed on to [action].
     * @param action invoked for every event accepted by [predicate].
     * @return the [Job] of the collecting coroutine; cancel it to unsubscribe.
     */
    inline fun LifecycleOwner.subscribeEvent(
        crossinline predicate: suspend (e: Event) -> Boolean,
        crossinline action: suspend (e: Event) -> Unit,
    ): Job {
        return eventBus
            .filter { event -> predicate(event) }
            .onEach { event -> action(event) }
            .cancellable()
            .launchIn(lifecycleScope)
    }
}

/**
 * Base type for everything published on [EventBus].
 *
 * The class and its [key] are both open, so concrete events may subclass it
 * and override the key.
 *
 * @property key string identifying the event; presumably what subscribers use
 * to tell events apart (confirm against callers).
 */
open class Event(
    open val key: String,
)
上一篇 下一篇

猜你喜欢

热点阅读