JetPack DataStore 源码解析

2021-11-29  本文已影响0人  Knight_Davion

DataStore 是一种数据存储解决方案,使用协议缓冲区存储键值对或类型化对象。DataStore 使用 Kotlin 协程和 Flow 以异步、一致的事务方式存储数据。

如果您当前在使用SharedPreferences 存储数据,请考虑迁移到 DataStore。

Preferences DataStore 和 Proto DataStore

基本使用

引入

implementation("androidx.datastore:datastore:1.0.0")
implementation("androidx.datastore:datastore-preferences:1.0.0")

Preferences DataStore 的使用

(1)创建datasource实例

val dataStore: DataStore<Preferences> by preferencesDataStore(name = "settings")

datasouce会将数据保存在内部存储的以下目录

[图片上传失败...(image-caad2-1638167023752)]

其中,settings为上边name中设置的值,后缀名是preferences_pb文件

注意,在实际的开发中,建议将上述的dataStore设置为单例模式。

(2)写数据

datastore 使用edit方法以异步的方式保存数据。

val EXAMPLE_COUNTER = intPreferencesKey("example_counter")

dataStore.edit { settings ->
    val currentCounterValue = settings[EXAMPLE_COUNTER] ?: 0
    settings[EXAMPLE_COUNTER] = currentCounterValue + 1
}

其中EXAMPLE_COUNTER 为key值,并且必须是一个Preferences.Key,datastore目前支持以下几种类型的key:

(3)读数据

datastore以flow的方式观察数据的变化。

val exampleCounterFlow: Flow<Int> = dataStore.data
    .map { preferences ->
        preferences[EXAMPLE_COUNTER] ?: 0
    }

GlobalScope.launch {
    exampleCounterFlow.collectLatest {
        Log.i(TAG, "read value:$it")
    }
}

Proto DataStore 的使用

Proto DataStore 用于保存实例对象,使用之前需要先了解Proto协议及在Android下的基本使用方式,这里不再赘述。
Proto DataStore 的使用和 Preferences DataStore基本类似,在写入数据时使用updateData提交数据

dataStore.updateData { settings ->
    settings.toBuilder()
        .setExampleCounter(settings.exampleCounter + 1)
        .setId(currentSettings.id + 1)
        .build()
}

读取方式和Preferences DataStore一样这里也不再赘述。

使用方式就先介绍到这里,下面来分析下DataStore的源码

源码分析

我们以Preferences DataStore 的读写为例,Proto DataStore的过程类似Preferences DataStore。

先从datastore的实例化入手

val dataStore: DataStore<Preferences> by preferencesDataStore(name = "settings")

dataStore初始化以委托的方式调用 preferencesDataStore 函数 返回一个 DataStore<Preferences>实例

我们来看preferencesDataStore的实现

public fun preferencesDataStore(
    name: String,
    corruptionHandler: ReplaceFileCorruptionHandler<Preferences>? = null,
    produceMigrations: (Context) -> List<DataMigration<Preferences>> = { listOf() },
    scope: CoroutineScope = CoroutineScope(Dispatchers.IO + SupervisorJob())
): ReadOnlyProperty<Context, DataStore<Preferences>> {
    return PreferenceDataStoreSingletonDelegate(name, corruptionHandler, produceMigrations, scope)
}

preferencesDataStore 函数中,

preferencesDataStore 函数返回了一个PreferenceDataStoreSingletonDelegate的实例,我们来看PreferenceDataStoreSingletonDelegate的具体实现

internal class PreferenceDataStoreSingletonDelegate internal constructor(
    private val name: String,
    private val corruptionHandler: ReplaceFileCorruptionHandler<Preferences>?,
    private val produceMigrations: (Context) -> List<DataMigration<Preferences>>,
    private val scope: CoroutineScope
) : ReadOnlyProperty<Context, DataStore<Preferences>> {

    private val lock = Any()

    @GuardedBy("lock")
    @Volatile
    private var INSTANCE: DataStore<Preferences>? = null

    /**
     * Gets the instance of the DataStore.
     *
     * @param thisRef must be an instance of [Context]
     * @param property not used
     */
    override fun getValue(thisRef: Context, property: KProperty<*>): DataStore<Preferences> {
        return INSTANCE ?: synchronized(lock) {
            if (INSTANCE == null) {
                val applicationContext = thisRef.applicationContext

                INSTANCE = PreferenceDataStoreFactory.create(
                    corruptionHandler = corruptionHandler,
                    migrations = produceMigrations(applicationContext),
                    scope = scope
                ) {
                    applicationContext.preferencesDataStoreFile(name)
                }
            }
            INSTANCE!!
        }
    }
}

代码很容易理解,PreferenceDataStoreSingletonDelegate 实现了 ReadOnlyProperty 接口,在ReadOnlyProperty接口中 有一个重载的getValue方法

public fun interface ReadOnlyProperty<in T, out V> {
    public operator fun getValue(thisRef: T, property: KProperty<*>): V
}

PreferenceDataStoreSingletonDelegate 中getValue方法返回了一个DataStore<Preferences>的单例对象。

INSTANCE通过 PreferenceDataStoreFactory.create 函数创建,我们先来看这句

applicationContext.preferencesDataStoreFile(name)

preferencesDataStoreFile 是context的一个扩展,需要我们前边传入的name参数,猜测这里是设置保存文件路径。

public fun Context.preferencesDataStoreFile(name: String): File =
    this.dataStoreFile("$name.preferences_pb")
    
public fun Context.dataStoreFile(fileName: String): File =
    File(applicationContext.filesDir, "datastore/$fileName")

preferencesDataStoreFile 进一步调用了dataStoreFile函数,dataStoreFile函数中设置了保存文件的具体路径,

到这里我们知道了,DataSource是将数据保存在了

data/data/包名/files/datastore/xxx.preferences_pb

文件中。我们回到PreferenceDataStoreSingletonDelegate类中,继续看 INSTANCE的创建过程。

PreferenceDataStoreFactory.kt

    public fun create(
        corruptionHandler: ReplaceFileCorruptionHandler<Preferences>? = null,
        migrations: List<DataMigration<Preferences>> = listOf(),
        scope: CoroutineScope = CoroutineScope(Dispatchers.IO + SupervisorJob()),
        produceFile: () -> File
    ): DataStore<Preferences> {
        val delegate = DataStoreFactory.create(
            serializer = PreferencesSerializer,
            corruptionHandler = corruptionHandler,
            migrations = migrations,
            scope = scope
        ) {
            val file = produceFile()
            check(file.extension == PreferencesSerializer.fileExtension) {
                "File extension for file: $file does not match required extension for" +
                    " Preferences file: ${PreferencesSerializer.fileExtension}"
            }
            file
        }
        return PreferenceDataStore(delegate)
    }
}

PreferenceDataStoreFactory的create方法没有太多逻辑,先是继续调用DataStoreFactory.create方法,返回一个DataStore<T>的代理,同时,检查创建的文件名称合法性。最后返回一个PreferenceDataStore实例。

我们先看DataStoreFactory.create方法

    public fun <T> create(
        serializer: Serializer<T>,
        corruptionHandler: ReplaceFileCorruptionHandler<T>? = null,
        migrations: List<DataMigration<T>> = listOf(),
        scope: CoroutineScope = CoroutineScope(Dispatchers.IO + SupervisorJob()),
        produceFile: () -> File
    ): DataStore<T> =
        SingleProcessDataStore(
            produceFile = produceFile,
            serializer = serializer,
            corruptionHandler = corruptionHandler ?: NoOpCorruptionHandler(),
            initTasksList = listOf(DataMigrationInitializer.getInitializer(migrations)),
            scope = scope
        )
}

我们先看DataStoreFactory 从名称上看像是一个工厂类,其实他只是个单例对象,create方法直接返回了一个
SingleProcessDataStore的实例。

SingleProcessDataStore是最后真正的datastore实例化的类DataStore的读写关键逻辑也是在这里实现。

到这里,datastore的实例化基本分析完毕,下面来看数据的读写过程。

datastore通过edit方式实现数据的更新

dataStore.edit { settings ->
    val currentCounterValue = settings[EXAMPLE_COUNTER] ?: 0
    settings[EXAMPLE_COUNTER] = currentCounterValue + 1
}

在edit函数的实现中,直接调用了datastore的updateData函数,


public suspend fun DataStore<Preferences>.edit(
    transform: suspend (MutablePreferences) -> Unit
): Preferences {
    return this.updateData {
        // It's safe to return MutablePreferences since we freeze it in
        // PreferencesDataStore.updateData()
        it.toMutablePreferences().apply { transform(this) }
    }
}

在上面datastore的实例化时我们知道,datastore对象实际上是一个SingleProcessDataStore的实例化对象,那我们直接看SingleProcessDataStore 中的updateData方法:

override suspend fun updateData(transform: suspend (t: T) -> T): T {

    val ack = CompletableDeferred<T>()
    val currentDownStreamFlowState = downstreamFlow.value

    val updateMsg =
        Message.Update(transform, ack, currentDownStreamFlowState, coroutineContext)

    actor.offer(updateMsg)

    return ack.await()
}

updateData 函数利用了协程中的处理并发时的 Actors解决方案,如果你对Actors不了解可以看下这里https://www.kotlincn.net/docs/reference/coroutines/shared-mutable-state-and-concurrency.html ,总之在这里只要记住这是为了处理写同步的操作即可。

actor.offer(updateMsg) 这句将任务提交到actor中,updateMsg 中携带了数据及状态信息,我们来看actor中实现

private val actor = SimpleActor<Message<T>>(
    scope = scope,
    onComplete = {
        it?.let {
            downstreamFlow.value = Final(it)
        }
        // We expect it to always be non-null but we will leave the alternative as a no-op
        // just in case.

        synchronized(activeFilesLock) {
            activeFiles.remove(file.absolutePath)
        }
    },
    onUndeliveredElement = { msg, ex ->
        if (msg is Message.Update) {
            // TODO(rohitsat): should we instead use scope.ensureActive() to get the original
            //  cancellation cause? Should we instead have something like
            //  UndeliveredElementException?
            msg.ack.completeExceptionally(
                ex ?: CancellationException(
                    "DataStore scope was cancelled before updateData could complete"
                )
            )
        }
    }
) { msg ->
    when (msg) {
        is Message.Read -> {
            handleRead(msg)
        }
        is Message.Update -> {
            handleUpdate(msg)
        }
    }
}

actor 根据消息类型,进行读写操作,由于上一步我们传递的是Message.Update类型,因此会调用handleUpdate(msg)函数,我们继续进入handleUpdate(msg)函数中

private suspend fun handleUpdate(update: Message.Update<T>) {
    // All branches of this *must* complete ack either successfully or exceptionally.
    // We must *not* throw an exception, just propagate it to the ack.
    update.ack.completeWith(
        runCatching {

            when (val currentState = downstreamFlow.value) {
                is Data -> {
                    // We are already initialized, we just need to perform the update
                    transformAndWrite(update.transform, update.callerContext)
                }
                is ReadException, is UnInitialized -> {
                    if (currentState === update.lastState) {
                        // we need to try to read again
                        readAndInitOrPropagateAndThrowFailure()

                        // We've successfully read, now we need to perform the update
                        transformAndWrite(update.transform, update.callerContext)
                    } else {
                        // Someone else beat us to read but also failed. We just need to
                        // signal the writer that is waiting on ack.
                        // This cast is safe because we can't be in the UnInitialized
                        // state if the state has changed.
                        throw (currentState as ReadException).readException
                    }
                }

                is Final -> throw currentState.finalException // won't happen
            }
        }
    )
}

我们先看主线逻辑,在handleUpdate中 根据状态类型进一步跳转,如果我们上来就执行写操作,downstreamFlow.value的初始状态是UnInitialized ,那么会执行这里的逻辑

if (currentState === update.lastState) {
    // we need to try to read again
    readAndInitOrPropagateAndThrowFailure()

    // We've successfully read, now we need to perform the update
    transformAndWrite(update.transform, update.callerContext)
}

我们看在进一步看transformAndWrite 方法

private suspend fun transformAndWrite(
    transform: suspend (t: T) -> T,
    callerContext: CoroutineContext
): T {
    // value is not null or an exception because we must have the value set by now so this cast
    // is safe.
    val curDataAndHash = downstreamFlow.value as Data<T>
    curDataAndHash.checkHashCode()

    val curData = curDataAndHash.value
    val newData = withContext(callerContext) { transform(curData) }

    // Check that curData has not changed...
    curDataAndHash.checkHashCode()

    return if (curData == newData) {
        curData
    } else {
        writeData(newData)
        downstreamFlow.value = Data(newData, newData.hashCode())
        newData
    }
}

关键逻辑在这里

return if (curData == newData) {
    curData
} else {
    writeData(newData)
    downstreamFlow.value = Data(newData, newData.hashCode())
    newData
}

如果要更新的值和当前值相等,就不再继续执行,否则,执行writeData 然后更新downstreamFlow.value的状态为Data,继续看writeData 函数

internal suspend fun writeData(newData: T) {
    file.createParentDirectories()

    val scratchFile = File(file.absolutePath + SCRATCH_SUFFIX)
    try {
        FileOutputStream(scratchFile).use { stream ->
            serializer.writeTo(newData, UncloseableOutputStream(stream))
            stream.fd.sync()
            // TODO(b/151635324): fsync the directory, otherwise a badly timed crash could
            //  result in reverting to a previous state.
        }

        if (!scratchFile.renameTo(file)) {
            throw IOException(
                "Unable to rename $scratchFile." +
                    "This likely means that there are multiple instances of DataStore " +
                    "for this file. Ensure that you are only creating a single instance of " +
                    "datastore for this file."
            )
        }
    } catch (ex: IOException) {
        if (scratchFile.exists()) {
            scratchFile.delete() // Swallow failure to delete
        }
        throw ex
    }
}

这里真正执行了数据的保存操作,而且是先将数据写到了一个.tmp的临时文件中,然后调用

scratchFile.renameTo(file)

将scratchFile文件重命名为file文件。到这里数据就保存成功了。

这里有两个地方需要注意下

 stream.fd.sync()

(1)这句代码是通过文件描述符刷新数据,执行这句之后,内存中的数据会立即同步到文件中,这是linux的机制,知道即可。

scratchFile.renameTo(file)

(2)renameTo方法是将文件重命名,测试时发现在Android平台下不论目标文件是否存在,均会执行成功,除非scratchFile 不存在,这可能和Java中的不一致,具体还需要看下renameTo的源码。

我们再来看下数据的读取过程。

在上面的示例中我们知道 读的过程就是观察dataStore.data流的过程,我们继续查看SingleProcessDataStore相关代码

override val data: Flow<T> = flow {
    val currentDownStreamFlowState = downstreamFlow.value
    if (currentDownStreamFlowState !is Data) {
        // We need to send a read request because we don't have data yet.
        actor.offer(Message.Read(currentDownStreamFlowState))
    }
    emitAll(
        downstreamFlow.dropWhile {
            if (currentDownStreamFlowState is Data<T> ||
                currentDownStreamFlowState is Final<T>
            ) {
                false
            } else {
                it === currentDownStreamFlowState
            }
        }.map {
            when (it) {
                is ReadException<T> -> throw it.readException
                is Final<T> -> throw it.finalException
                is Data<T> -> it.value
                is UnInitialized -> error(
                    "This is a bug in DataStore. Please file a bug at: " +
                        "https://issuetracker.google.com/issues/new?" +
                        "component=907884&template=1466542"
                )
            }
        }
    )
}

如果读之前没有写操作或者第一次会先执行

if (currentDownStreamFlowState !is Data) {
    // We need to send a read request because we don't have data yet.
    actor.offer(Message.Read(currentDownStreamFlowState))
}

downstreamFlow.value的初始值为*UnInitialized

但是如果之前有过写操作,就可以直接从缓存中读取最新值,因为在写完时downstreamFlow.value中保存了最新值

private suspend fun transformAndWrite(
    transform: suspend (t: T) -> T,
    callerContext: CoroutineContext
): T {
    ...
    return if (curData == newData) {
        curData
    } else {
        writeData(newData)
        downstreamFlow.value = Data(newData, newData.hashCode())
        ...
    }
}

我们来看下actor中执行读的相关操作,直接查看handlRead函数

private suspend fun handleRead(read: Message.Read<T>) {
    when (val currentState = downstreamFlow.value) {
        ...
        UnInitialized -> {
            readAndInitOrPropagateFailure()
        }
        ...
    }
}

在进入readAndInitOrPropagateFailure函数中

private suspend fun readAndInitOrPropagateFailure() {
    try {
        readAndInit()
    } catch (throwable: Throwable) {
        downstreamFlow.value = ReadException(throwable)
    }
}

在readAndInit继续调用 readDataOrHandleCorruption执行读操作

var initData = readDataOrHandleCorruption()
private suspend fun readDataOrHandleCorruption(): T {
    try {
        return readData()
    } catch (ex: CorruptionException) {
    ...
    }
}

最后在readData中执行了真正的读操作。

private suspend fun readData(): T {
    try {
        FileInputStream(file).use { stream ->
            return serializer.readFrom(stream)
        }
    } catch (ex: FileNotFoundException) {
        if (file.exists()) {
            throw ex
        }
        return serializer.defaultValue
    }
}

在获取到数据后downstreamFlow.value变为Data

private suspend fun readAndInit() {
    ...
    downstreamFlow.value = Data(initData, initData.hashCode())
}

最后通过emitAll发射出去

emitAll(
    downstreamFlow.dropWhile {
        ...
    }.map {
        when (it) {
            ...
            is Data<T> -> it.value
            ...
        }
    }
)

以上就是 DataStore读写的主要流程,涉及到的其他细节由于篇幅原因这里就不展开了,感兴趣的小伙伴们可以自己阅读。

上一篇下一篇

猜你喜欢

热点阅读