JetPack DataStore Source Code Analysis
DataStore is a data storage solution that stores key-value pairs or typed objects backed by protocol buffers. It uses Kotlin coroutines and Flow to store data asynchronously, consistently, and transactionally.
If you are currently using SharedPreferences to store data, consider migrating to DataStore.
Preferences DataStore and Proto DataStore
- Preferences DataStore stores and accesses data using keys. This implementation does not require a predefined schema and does not provide type safety.
- Proto DataStore stores data as instances of a custom data type. This implementation requires you to define a schema with protocol buffers, but in return it guarantees type safety.
Basic Usage
Adding the dependencies
implementation("androidx.datastore:datastore:1.0.0")
implementation("androidx.datastore:datastore-preferences:1.0.0")
Using Preferences DataStore
(1) Create a DataStore instance
val Context.dataStore: DataStore<Preferences> by preferencesDataStore(name = "settings")
DataStore saves the data under the following path in the app's internal storage:
/data/data/<package name>/files/datastore/settings.preferences_pb
Here settings is the value passed as name above, and the file gets the .preferences_pb extension.
Note that in real development the dataStore above should behave as a single, process-wide instance; declaring the delegate at the top level of a Kotlin file gives you exactly that.
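As an illustration, with the delegate declared once at the top level of a Kotlin file as above, every Context in the process resolves to the same instance; SettingsActivity below is just a hypothetical caller:

import android.os.Bundle
import androidx.appcompat.app.AppCompatActivity
import androidx.datastore.core.DataStore
import androidx.datastore.preferences.core.Preferences

class SettingsActivity : AppCompatActivity() {
    override fun onCreate(savedInstanceState: Bundle?) {
        super.onCreate(savedInstanceState)
        // Resolves to the single process-wide instance created by the top-level delegate.
        val store: DataStore<Preferences> = applicationContext.dataStore
    }
}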
(2) Writing data
DataStore saves data through the edit function; edit is a suspend function, so it must be called from a coroutine.
val EXAMPLE_COUNTER = intPreferencesKey("example_counter")
dataStore.edit { settings ->
    val currentCounterValue = settings[EXAMPLE_COUNTER] ?: 0
    settings[EXAMPLE_COUNTER] = currentCounterValue + 1
}
Here EXAMPLE_COUNTER is the key, and it must be a Preferences.Key. DataStore currently supports the following key types (a short example follows the list):
- intPreferencesKey
- doublePreferencesKey
- stringPreferencesKey
- booleanPreferencesKey
- floatPreferencesKey
- longPreferencesKey
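As a small sketch, keys of different types can be defined and written in a single edit call; the key names and the saveProfile helper below are made up for illustration, and the Context.dataStore delegate from above is reused:

import android.content.Context
import androidx.datastore.preferences.core.booleanPreferencesKey
import androidx.datastore.preferences.core.edit
import androidx.datastore.preferences.core.longPreferencesKey
import androidx.datastore.preferences.core.stringPreferencesKey

val USER_NAME = stringPreferencesKey("user_name")
val IS_PRO = booleanPreferencesKey("is_pro")
val LAUNCH_COUNT = longPreferencesKey("launch_count")

// All three values are written in one transactional edit.
suspend fun saveProfile(context: Context) {
    context.dataStore.edit { prefs ->
        prefs[USER_NAME] = "alice"
        prefs[IS_PRO] = true
        prefs[LAUNCH_COUNT] = (prefs[LAUNCH_COUNT] ?: 0L) + 1
    }
}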
(3) Reading data
DataStore exposes the stored data as a Flow, so changes can be observed over time.
val exampleCounterFlow: Flow<Int> = dataStore.data
    .map { preferences ->
        preferences[EXAMPLE_COUNTER] ?: 0
    }

GlobalScope.launch {
    exampleCounterFlow.collectLatest {
        Log.i(TAG, "read value:$it")
    }
}
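If you only need the current value once instead of a continuous stream, you can take a single snapshot with first() from kotlinx.coroutines.flow; a minimal sketch reusing EXAMPLE_COUNTER and the Context.dataStore delegate from above (readCounterOnce is a made-up helper):

import android.content.Context
import kotlinx.coroutines.flow.first
import kotlinx.coroutines.flow.map

// Suspends until the first (current) value is emitted, then returns it.
suspend fun readCounterOnce(context: Context): Int =
    context.dataStore.data
        .map { preferences -> preferences[EXAMPLE_COUNTER] ?: 0 }
        .first()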
Using Proto DataStore
Proto DataStore stores instances of a custom type. Before using it you need to be familiar with protocol buffers and how to set them up in an Android project, which is not covered here.
Using Proto DataStore is very similar to Preferences DataStore; data is written by calling updateData:
dataStore.updateData { settings ->
    settings.toBuilder()
        .setExampleCounter(settings.exampleCounter + 1)
        .setId(settings.id + 1)
        .build()
}
Reading works the same way as with Preferences DataStore, by collecting the data flow; a short sketch follows.
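For completeness, observing Proto DataStore data looks roughly like this; settingsDataStore and the Settings message (with an example_counter field) are assumed to come from your own .proto schema and DataStore setup:

import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.map

// settingsDataStore: DataStore<Settings> is assumed to be created elsewhere
// (e.g. via the dataStore delegate with a custom Serializer<Settings>).
val exampleCounterFlow: Flow<Int> = settingsDataStore.data
    .map { settings -> settings.exampleCounter }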
That covers the basic usage; next, let's analyze the DataStore source code.
Source Code Analysis
We will walk through reads and writes for Preferences DataStore; the Proto DataStore flow is essentially the same.
Let's start with how the DataStore instance is created:
val Context.dataStore: DataStore<Preferences> by preferencesDataStore(name = "settings")
dataStore is initialized through property delegation: the preferencesDataStore function returns a delegate that ultimately yields a DataStore<Preferences> instance.
Here is the implementation of preferencesDataStore:
public fun preferencesDataStore(
name: String,
corruptionHandler: ReplaceFileCorruptionHandler<Preferences>? = null,
produceMigrations: (Context) -> List<DataMigration<Preferences>> = { listOf() },
scope: CoroutineScope = CoroutineScope(Dispatchers.IO + SupervisorJob())
): ReadOnlyProperty<Context, DataStore<Preferences>> {
return PreferenceDataStoreSingletonDelegate(name, corruptionHandler, produceMigrations, scope)
}
In the preferencesDataStore function:
- name, as explained above, is the name of the file the data is stored in.
- corruptionHandler is invoked when the file cannot be deserialized (a CorruptionException is thrown) and lets you replace the corrupted data with a recovery value.
- produceMigrations is used for data migration, for example when moving data from SharedPreferences into DataStore (see the sketch after this list).
- scope is the coroutine scope in which reads and writes run; by default it uses the IO dispatcher.
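As a hedged sketch of these parameters in use, the following creates a Preferences DataStore that replaces a corrupted file with empty preferences and migrates values from an existing SharedPreferences file; the file name legacy_prefs is made up here:

import android.content.Context
import androidx.datastore.core.DataStore
import androidx.datastore.core.handlers.ReplaceFileCorruptionHandler
import androidx.datastore.preferences.SharedPreferencesMigration
import androidx.datastore.preferences.core.Preferences
import androidx.datastore.preferences.core.emptyPreferences
import androidx.datastore.preferences.preferencesDataStore

val Context.settingsDataStore: DataStore<Preferences> by preferencesDataStore(
    name = "settings",
    // Called when the file can no longer be deserialized; here we fall back to empty data.
    corruptionHandler = ReplaceFileCorruptionHandler { emptyPreferences() },
    // Runs before the first read/write and copies values out of SharedPreferences.
    produceMigrations = { context ->
        listOf(SharedPreferencesMigration(context, "legacy_prefs"))
    }
)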
preferencesDataStore returns a PreferenceDataStoreSingletonDelegate instance; here is its implementation:
internal class PreferenceDataStoreSingletonDelegate internal constructor(
private val name: String,
private val corruptionHandler: ReplaceFileCorruptionHandler<Preferences>?,
private val produceMigrations: (Context) -> List<DataMigration<Preferences>>,
private val scope: CoroutineScope
) : ReadOnlyProperty<Context, DataStore<Preferences>> {
private val lock = Any()
@GuardedBy("lock")
@Volatile
private var INSTANCE: DataStore<Preferences>? = null
/**
* Gets the instance of the DataStore.
*
* @param thisRef must be an instance of [Context]
* @param property not used
*/
override fun getValue(thisRef: Context, property: KProperty<*>): DataStore<Preferences> {
return INSTANCE ?: synchronized(lock) {
if (INSTANCE == null) {
val applicationContext = thisRef.applicationContext
INSTANCE = PreferenceDataStoreFactory.create(
corruptionHandler = corruptionHandler,
migrations = produceMigrations(applicationContext),
scope = scope
) {
applicationContext.preferencesDataStoreFile(name)
}
}
INSTANCE!!
}
}
}
The code is easy to follow: PreferenceDataStoreSingletonDelegate implements the ReadOnlyProperty interface, which declares a single getValue operator function:
public fun interface ReadOnlyProperty<in T, out V> {
public operator fun getValue(thisRef: T, property: KProperty<*>): V
}
getValue in PreferenceDataStoreSingletonDelegate returns a DataStore<Preferences> singleton, using double-checked locking on INSTANCE.
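To make the delegation mechanics clearer, here is a stripped-down, hypothetical delegate that follows the same pattern (lazy creation plus double-checked locking inside getValue); it is not DataStore code:

import kotlin.properties.ReadOnlyProperty
import kotlin.reflect.KProperty

// Hypothetical delegate: the factory lambda runs once, later reads return the cached instance.
class SingletonDelegate<T : Any>(private val create: () -> T) : ReadOnlyProperty<Any?, T> {
    private val lock = Any()
    @Volatile private var instance: T? = null

    override fun getValue(thisRef: Any?, property: KProperty<*>): T {
        return instance ?: synchronized(lock) {
            instance ?: create().also { instance = it }
        }
    }
}

// Usage: `config` is built on first access and shared afterwards.
val config: Map<String, String> by SingletonDelegate { mapOf("env" to "prod") }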
INSTANCE is created by PreferenceDataStoreFactory.create. Let's first look at this line:
applicationContext.preferencesDataStoreFile(name)
preferencesDataStoreFile is an extension function on Context that takes the name we passed in earlier; as you might guess, it determines where the backing file is stored.
public fun Context.preferencesDataStoreFile(name: String): File =
this.dataStoreFile("$name.preferences_pb")
public fun Context.dataStoreFile(fileName: String): File =
File(applicationContext.filesDir, "datastore/$fileName")
preferencesDataStoreFile in turn calls dataStoreFile, which builds the concrete path of the backing file.
So we now know that DataStore stores its data in the file
data/data/<package name>/files/datastore/xxx.preferences_pb
Back in PreferenceDataStoreSingletonDelegate, let's continue with how INSTANCE is created.
PreferenceDataStoreFactory.kt
public fun create(
corruptionHandler: ReplaceFileCorruptionHandler<Preferences>? = null,
migrations: List<DataMigration<Preferences>> = listOf(),
scope: CoroutineScope = CoroutineScope(Dispatchers.IO + SupervisorJob()),
produceFile: () -> File
): DataStore<Preferences> {
val delegate = DataStoreFactory.create(
serializer = PreferencesSerializer,
corruptionHandler = corruptionHandler,
migrations = migrations,
scope = scope
) {
val file = produceFile()
check(file.extension == PreferencesSerializer.fileExtension) {
"File extension for file: $file does not match required extension for" +
" Preferences file: ${PreferencesSerializer.fileExtension}"
}
file
}
return PreferenceDataStore(delegate)
}
}
PreferenceDataStoreFactory.create does not contain much logic: it calls DataStoreFactory.create to obtain the underlying DataStore<T>, checks that the produced file has the required extension, and finally wraps the result in a PreferenceDataStore instance.
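Incidentally, you do not have to go through the property delegate at all; PreferenceDataStoreFactory.create can be called directly, as in this minimal sketch (you are then responsible for keeping the instance a singleton yourself):

import android.content.Context
import androidx.datastore.core.DataStore
import androidx.datastore.preferences.core.PreferenceDataStoreFactory
import androidx.datastore.preferences.core.Preferences
import androidx.datastore.preferences.preferencesDataStoreFile

fun createSettingsDataStore(context: Context): DataStore<Preferences> =
    PreferenceDataStoreFactory.create(
        // Same file the delegate would use: files/datastore/settings.preferences_pb
        produceFile = { context.preferencesDataStoreFile("settings") }
    )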
Now let's look at DataStoreFactory.create:
public fun <T> create(
serializer: Serializer<T>,
corruptionHandler: ReplaceFileCorruptionHandler<T>? = null,
migrations: List<DataMigration<T>> = listOf(),
scope: CoroutineScope = CoroutineScope(Dispatchers.IO + SupervisorJob()),
produceFile: () -> File
): DataStore<T> =
SingleProcessDataStore(
produceFile = produceFile,
serializer = serializer,
corruptionHandler = corruptionHandler ?: NoOpCorruptionHandler(),
initTasksList = listOf(DataMigrationInitializer.getInitializer(migrations)),
scope = scope
)
}
Despite its name, DataStoreFactory is not much of a factory class; it is just a singleton object, and create simply returns a SingleProcessDataStore instance.
SingleProcessDataStore is the class that actually implements DataStore; the key read and write logic lives there.
That completes the instantiation analysis; next, let's look at how data is written and read.
Writing
DataStore updates data through edit:
dataStore.edit { settings ->
    val currentCounterValue = settings[EXAMPLE_COUNTER] ?: 0
    settings[EXAMPLE_COUNTER] = currentCounterValue + 1
}
The edit extension function simply calls the DataStore's updateData function:
public suspend fun DataStore<Preferences>.edit(
transform: suspend (MutablePreferences) -> Unit
): Preferences {
return this.updateData {
// It's safe to return MutablePreferences since we freeze it in
// PreferencesDataStore.updateData()
it.toMutablePreferences().apply { transform(this) }
}
}
From the instantiation analysis we know that the DataStore object is ultimately a SingleProcessDataStore (wrapped by PreferenceDataStore), so let's go straight to updateData in SingleProcessDataStore:
override suspend fun updateData(transform: suspend (t: T) -> T): T {
val ack = CompletableDeferred<T>()
val currentDownStreamFlowState = downstreamFlow.value
val updateMsg =
Message.Update(transform, ack, currentDownStreamFlowState, coroutineContext)
actor.offer(updateMsg)
return ack.await()
}
updateData uses the actor pattern from Kotlin coroutines for dealing with shared mutable state; if you are not familiar with actors, see https://www.kotlincn.net/docs/reference/coroutines/shared-mutable-state-and-concurrency.html . For now, just remember that it is there to serialize writes.
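To illustrate the idea, here is a tiny, self-contained actor in the same spirit (not DataStore code): a single coroutine owns the state and consumes messages from a Channel one at a time, so concurrent callers can never write at the same moment.

import kotlinx.coroutines.CompletableDeferred
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.channels.SendChannel
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

class Increment(val ack: CompletableDeferred<Int>)

// Only the coroutine launched here ever touches `counter`, so no extra locking is needed.
fun CoroutineScope.counterActor(): SendChannel<Increment> {
    val channel = Channel<Increment>(Channel.UNLIMITED)
    launch {
        var counter = 0
        for (msg in channel) {
            msg.ack.complete(++counter)
        }
    }
    return channel
}

fun main() = runBlocking<Unit> {
    val actor = counterActor()
    val ack = CompletableDeferred<Int>()
    actor.send(Increment(ack))
    println("counter = ${ack.await()}") // counter = 1
    actor.close() // stop the actor so runBlocking can finish
}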
actor.offer(updateMsg) posts the task to the actor; updateMsg carries the transform and the current state. Let's look at how the actor is implemented:
private val actor = SimpleActor<Message<T>>(
scope = scope,
onComplete = {
it?.let {
downstreamFlow.value = Final(it)
}
// We expect it to always be non-null but we will leave the alternative as a no-op
// just in case.
synchronized(activeFilesLock) {
activeFiles.remove(file.absolutePath)
}
},
onUndeliveredElement = { msg, ex ->
if (msg is Message.Update) {
// TODO(rohitsat): should we instead use scope.ensureActive() to get the original
// cancellation cause? Should we instead have something like
// UndeliveredElementException?
msg.ack.completeExceptionally(
ex ?: CancellationException(
"DataStore scope was cancelled before updateData could complete"
)
)
}
}
) { msg ->
when (msg) {
is Message.Read -> {
handleRead(msg)
}
is Message.Update -> {
handleUpdate(msg)
}
}
}
The actor dispatches on the message type. Since the previous step posted a Message.Update, handleUpdate(msg) is called; let's step into it:
private suspend fun handleUpdate(update: Message.Update<T>) {
// All branches of this *must* complete ack either successfully or exceptionally.
// We must *not* throw an exception, just propagate it to the ack.
update.ack.completeWith(
runCatching {
when (val currentState = downstreamFlow.value) {
is Data -> {
// We are already initialized, we just need to perform the update
transformAndWrite(update.transform, update.callerContext)
}
is ReadException, is UnInitialized -> {
if (currentState === update.lastState) {
// we need to try to read again
readAndInitOrPropagateAndThrowFailure()
// We've successfully read, now we need to perform the update
transformAndWrite(update.transform, update.callerContext)
} else {
// Someone else beat us to read but also failed. We just need to
// signal the writer that is waiting on ack.
// This cast is safe because we can't be in the UnInitialized
// state if the state has changed.
throw (currentState as ReadException).readException
}
}
is Final -> throw currentState.finalException // won't happen
}
}
)
}
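For reference, the states that downstreamFlow can hold look roughly like this; this is a simplified sketch reconstructed from the fields used in the snippets here, not the exact source:

// Simplified sketch of the internal state machine used by SingleProcessDataStore.
sealed class State<T>
object UnInitialized : State<Any>()                                // nothing has been read yet
class Data<T>(val value: T, val hashCode: Int) : State<T>()        // latest value is cached
class ReadException<T>(val readException: Throwable) : State<T>()  // the last read failed
class Final<T>(val finalException: Throwable) : State<T>()         // scope cancelled, terminal state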
Sticking to the main path: handleUpdate branches on the current state. If the very first operation is a write, downstreamFlow.value is still in its initial UnInitialized state, so this branch runs:
if (currentState === update.lastState) {
// we need to try to read again
readAndInitOrPropagateAndThrowFailure()
// We've successfully read, now we need to perform the update
transformAndWrite(update.transform, update.callerContext)
}
Let's look further into transformAndWrite:
private suspend fun transformAndWrite(
transform: suspend (t: T) -> T,
callerContext: CoroutineContext
): T {
// value is not null or an exception because we must have the value set by now so this cast
// is safe.
val curDataAndHash = downstreamFlow.value as Data<T>
curDataAndHash.checkHashCode()
val curData = curDataAndHash.value
val newData = withContext(callerContext) { transform(curData) }
// Check that curData has not changed...
curDataAndHash.checkHashCode()
return if (curData == newData) {
curData
} else {
writeData(newData)
downstreamFlow.value = Data(newData, newData.hashCode())
newData
}
}
The key logic is here:
return if (curData == newData) {
curData
} else {
writeData(newData)
downstreamFlow.value = Data(newData, newData.hashCode())
newData
}
If the new value equals the current value, nothing more is done; otherwise writeData is executed and downstreamFlow.value is updated to Data. Let's continue into writeData:
internal suspend fun writeData(newData: T) {
file.createParentDirectories()
val scratchFile = File(file.absolutePath + SCRATCH_SUFFIX)
try {
FileOutputStream(scratchFile).use { stream ->
serializer.writeTo(newData, UncloseableOutputStream(stream))
stream.fd.sync()
// TODO(b/151635324): fsync the directory, otherwise a badly timed crash could
// result in reverting to a previous state.
}
if (!scratchFile.renameTo(file)) {
throw IOException(
"Unable to rename $scratchFile." +
"This likely means that there are multiple instances of DataStore " +
"for this file. Ensure that you are only creating a single instance of " +
"datastore for this file."
)
}
} catch (ex: IOException) {
if (scratchFile.exists()) {
scratchFile.delete() // Swallow failure to delete
}
throw ex
}
}
This is where the data is actually persisted. It is first written to a .tmp scratch file, after which
scratchFile.renameTo(file)
renames the scratch file to the real file. At that point the data has been saved.
Two details here are worth noting.
stream.fd.sync()
(1) This call flushes the data to disk through the file descriptor (an fsync); once it returns, the data buffered in memory has been synced to the file. This is an OS-level mechanism, and it is enough to know that it happens here.
scratchFile.renameTo(file)
(2) renameTo renames the file. Testing on Android shows it succeeds whether or not the target file already exists, and fails only if scratchFile does not exist; this may differ from the documented Java behavior, so check the renameTo implementation if the details matter to you.
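To make the pattern concrete, here is the same scratch-file-then-rename idea written out on its own; this is a generic sketch, independent of DataStore:

import java.io.File
import java.io.FileOutputStream
import java.io.IOException

// Write to a scratch file first, fsync it, then swap it in via rename.
// If anything fails, the original target file is left untouched.
fun atomicWrite(target: File, bytes: ByteArray) {
    target.parentFile?.mkdirs()
    val scratch = File(target.absolutePath + ".tmp")
    try {
        FileOutputStream(scratch).use { out ->
            out.write(bytes)
            out.fd.sync() // force the kernel to flush the data to disk
        }
        if (!scratch.renameTo(target)) {
            throw IOException("Unable to rename $scratch to $target")
        }
    } catch (ex: IOException) {
        if (scratch.exists()) scratch.delete()
        throw ex
    }
}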
Reading
Now let's look at how data is read.
From the earlier example we know that reading means collecting the dataStore.data flow, so let's look at the relevant code in SingleProcessDataStore:
override val data: Flow<T> = flow {
val currentDownStreamFlowState = downstreamFlow.value
if (currentDownStreamFlowState !is Data) {
// We need to send a read request because we don't have data yet.
actor.offer(Message.Read(currentDownStreamFlowState))
}
emitAll(
downstreamFlow.dropWhile {
if (currentDownStreamFlowState is Data<T> ||
currentDownStreamFlowState is Final<T>
) {
false
} else {
it === currentDownStreamFlowState
}
}.map {
when (it) {
is ReadException<T> -> throw it.readException
is Final<T> -> throw it.finalException
is Data<T> -> it.value
is UnInitialized -> error(
"This is a bug in DataStore. Please file a bug at: " +
"https://issuetracker.google.com/issues/new?" +
"component=907884&template=1466542"
)
}
}
)
}
If no write has happened before the read, or this is the very first access, the following runs first:
if (currentDownStreamFlowState !is Data) {
// We need to send a read request because we don't have data yet.
actor.offer(Message.Read(currentDownStreamFlowState))
}
The initial value of downstreamFlow.value is UnInitialized.
If a write has already happened, however, the latest value can be served straight from the in-memory cache, because transformAndWrite stores it in downstreamFlow.value at the end of every successful write:
private suspend fun transformAndWrite(
transform: suspend (t: T) -> T,
callerContext: CoroutineContext
): T {
...
return if (curData == newData) {
curData
} else {
writeData(newData)
downstreamFlow.value = Data(newData, newData.hashCode())
...
}
}
Now let's look at how the actor handles reads, going straight to the handleRead function:
private suspend fun handleRead(read: Message.Read<T>) {
when (val currentState = downstreamFlow.value) {
...
UnInitialized -> {
readAndInitOrPropagateFailure()
}
...
}
}
Then step into readAndInitOrPropagateFailure:
private suspend fun readAndInitOrPropagateFailure() {
try {
readAndInit()
} catch (throwable: Throwable) {
downstreamFlow.value = ReadException(throwable)
}
}
readAndInit continues by calling readDataOrHandleCorruption to perform the read:
var initData = readDataOrHandleCorruption()
private suspend fun readDataOrHandleCorruption(): T {
try {
return readData()
} catch (ex: CorruptionException) {
...
}
}
Finally, readData performs the actual file read:
private suspend fun readData(): T {
try {
FileInputStream(file).use { stream ->
return serializer.readFrom(stream)
}
} catch (ex: FileNotFoundException) {
if (file.exists()) {
throw ex
}
return serializer.defaultValue
}
}
Once the data has been read, downstreamFlow.value becomes Data:
private suspend fun readAndInit() {
...
downstreamFlow.value = Data(initData, initData.hashCode())
}
Finally the value is emitted through emitAll:
emitAll(
downstreamFlow.dropWhile {
...
}.map {
when (it) {
...
is Data<T> -> it.value
...
}
}
)
That is the main read and write flow of DataStore. Other details are omitted here for brevity; interested readers can dig into the source themselves.