从Room源码看抽象与封装——数据流
目录
源码解析目录
从Room源码看抽象与封装——SQLite的抽象
从Room源码看抽象与封装——数据库的创建
从Room源码看抽象与封装——数据库的升降级
从Room源码看抽象与封装——Dao
从Room源码看抽象与封装——数据流
前言
之前关于Room源码分析的四篇文章基本上涵盖了Room使用的全流程,从抽象基础到数据库创建再到增删改查的实现,可谓很全面了。你以为这就完了吗?Too young!如果Room只是这些内容的话,那只能说它是个还不错的ORM框架,对SQLite进行了良好的抽象,并且效率也很高。Room最大的不同在于它是Jetpack包的一部分,它与Jetpack包中其它部分有着非常好的配合,尤其是LiveData。以之前定义的Dao为例:
@Dao
interface UserDao {
@Query("SELECT * FROM user WHERE uid = :userId")
fun getUserById(userId: Int): User?
@Query("SELECT * FROM user WHERE uid = :userId")
fun getUserLiveDataById(userId: Int): LiveData<User>
@Query("SELECT * FROM user WHERE uid = :userId")
fun getUserObservableById(userId: Int): Observable<User>
@Query("SELECT * FROM user WHERE uid = :userId")
fun getUserFlowableById(userId: Int): Flowable<User>
}
如上所示,Dao中的查询不仅可以返回Entity对象User,还可以以数据流的形式返回LiveData或者是RxJava中的Observable,Flowable。当我们“增删改”User表时,如果在此之前以如上形式获取到了User数据流,那么该User数据流就会“更新”,即相应查询会重新执行一遍,以把最新的User数据传递出去,至于User是否真的有变化,这需要我们自行判断。
这篇文章会介绍Room是如何实现数据流的。
1. 概述
如果说之前对Room方方面面的解析只是“繁”的话,那么Room对于数据流的实现真的是有点“难”了。整个流程比较复杂,为了防止你看的云里雾里,我先大致描述一下整个流程,然后再对各个部分进行详细讲解。
Room实现数据流大致包含如下几个步骤:
- 创建一个数据库临时表(表名为
room_table_modification_log
,保存在内存中,非本地存储),表中包含两列table_id
和invalidated
,这个表记录了当前数据库哪个Table被修改了(“增删改”都属于修改)。 - 如果Dao中的查询涉及到了数据流返回,那么生成代码就会帮我们向数据库中添加一个关于当前被查询表(对于上面的例子来说就是User这个表,可能会涉及到多个表)的TRIGGER(触发器),这个触发器的主要任务是当有操作修改了被查询表中的数据时,就把
room_table_modification_log
表中对应的table_id
的invalidated
置为1,表示“失效”了。 - Room中所有“增删改”都是放在Transaction中的,在
endTransaction
时,Room都会查看room_table_modification_log
表中是否有invalidated
为1的,如果有,再次执行相应查询(invalidated
会被重置为0),并把查询的数据通过相应数据流传递出去。 - 在数据流不再使用时(例如Observable被dispose),相应触发器也会被丢弃,这样就不会再对这个表进行“追踪”了。
2. 创建临时表
Room中关于数据流的实现几乎全部包含在InvalidationTracker
类中,这个类是在RoomDatabase
中被创建的,只是在之前的分析中被刻意忽略了。
public abstract class RoomDatabase {
private final InvalidationTracker mInvalidationTracker;
public RoomDatabase() {
mInvalidationTracker = createInvalidationTracker();
}
/**
* 由生成代码实现
*/
@NonNull
protected abstract InvalidationTracker createInvalidationTracker();
/**
* 由生成代码调用,初始化 InvalidationTracker
*/
protected void internalInitInvalidationTracker(@NonNull SupportSQLiteDatabase db) {
mInvalidationTracker.internalInit(db);
}
@NonNull
public InvalidationTracker getInvalidationTracker() {
return mInvalidationTracker;
}
}
public final class AppDatabase_Impl extends AppDatabase {
@Override
protected SupportSQLiteOpenHelper createOpenHelper(DatabaseConfiguration configuration) {
final SupportSQLiteOpenHelper.Callback _openCallback = new RoomOpenHelper(configuration, new RoomOpenHelper.Delegate(1) {
@Override
public void onOpen(SupportSQLiteDatabase _db) {
mDatabase = _db;
_db.execSQL("PRAGMA foreign_keys = ON");
//数据库打开时初始化 InvalidationTracker
internalInitInvalidationTracker(_db);
//...
}
//...
});
//...
}
@Override
protected InvalidationTracker createInvalidationTracker() {
//...
return new InvalidationTracker(this, _shadowTablesMap, _viewTables, "User"); //User表示要观察的表名,可能包含很多个
}
}
注解处理器生成了AppDatabase_Impl
,其中不仅实现了createOpenHelper
,还实现了createInvalidationTracker
,并且在数据库打开时(onOpen
回调被调用),InvalidationTracker
被初始化,也是在初始化时,临时表被创建,踏出万里长征第一步。
public class InvalidationTracker {
private static final String UPDATE_TABLE_NAME = "room_table_modification_log";
private static final String TABLE_ID_COLUMN_NAME = "table_id";
private static final String INVALIDATED_COLUMN_NAME = "invalidated";
//建表
private static final String CREATE_TRACKING_TABLE_SQL = "CREATE TEMP TABLE " + UPDATE_TABLE_NAME
+ "(" + TABLE_ID_COLUMN_NAME + " INTEGER PRIMARY KEY, "
+ INVALIDATED_COLUMN_NAME + " INTEGER NOT NULL DEFAULT 0)";
//重置
static final String RESET_UPDATED_TABLES_SQL = "UPDATE " + UPDATE_TABLE_NAME
+ " SET " + INVALIDATED_COLUMN_NAME + " = 0 WHERE " + INVALIDATED_COLUMN_NAME + " = 1 ";
//找出哪个表改变了
static final String SELECT_UPDATED_TABLES_SQL = "SELECT * FROM " + UPDATE_TABLE_NAME
+ " WHERE " + INVALIDATED_COLUMN_NAME + " = 1;";
//根据表名找表名对应的ID
final ArrayMap<String, Integer> mTableIdLookup;
//哪些表需要被观察
final String[] mTableNames;
//tableNames是可变参数,表示可能有多个要被“追踪”的表
public InvalidationTracker(RoomDatabase database, Map<String, String> shadowTablesMap,
Map<String, Set<String>> viewTables, String... tableNames) {
//shadowTablesMap, viewTables可以忽略,一般都为空
final int size = tableNames.length;
mTableNames = new String[size];
for (int id = 0; id < size; id++) {
final String tableName = tableNames[id].toLowerCase(Locale.US);
//表对应的ID其实就是它的位置,因为是临时表存储在内存中,所以这个ID只需要能分别谁是谁就可以了
mTableIdLookup.put(tableName, id);
mTableNames[id] = tableName;
}
//...
}
/**
* 初始化临时表
*/
void internalInit(SupportSQLiteDatabase database) {
synchronized (this) {
database.execSQL("PRAGMA temp_store = MEMORY;"); //临时表保存在内存中
database.execSQL("PRAGMA recursive_triggers='ON';");
//创建临时表
database.execSQL(CREATE_TRACKING_TABLE_SQL);
syncTriggers(database);
//重置临时表是需要经常执行的,把它编译成SQLiteStatement方便执行
mCleanupStatement = database.compileStatement(RESET_UPDATED_TABLES_SQL);
}
}
}
临时表的表名是room_table_modification_log
,创建它的目的是追踪我们需要观察的那些表中(例如User表)数据是否发生了变化。把建表的SQL翻译以下就是
CREATE TEMP TABLE room_table_modification_log
(table_id INTEGER PRIMARY KEY, invalidated INTEGER NOT NULL DEFAULT 0)
就像下面这样
table_id | invalidated |
---|---|
0 | 0 |
创建表之后不会有数据,数据(0,0)只是方便你理解。
因为这个表是临时表,并且在创建它之前设置了PRAGMA temp_store = MEMORY
,因此room_table_modification_log
表被放在了内存中,并不会被保存在本地文件中(当然也没有必要)。因为要放在内存中,所以说room_table_modification_log
选择使用把要追踪的表名转成ID进行存储,这样节省内存,而这个ID并没有什么特殊含义,只需要跟要追踪的表名一一对应就可以,所以使用0,1,2这样的位置作为ID就可以。
需要明确一点,room_table_modification_log
创建后,其中是没有数据的。创建InvalidationTracker
时传入的tableNames
参数只是表明,返回数据流的查询涉及到了这些表,但是在数据库使用过程中,可能根本就没有执行相关查询,所以也就不需要追踪。也就是说room_table_modification_log
的追踪是“懒”的,直到返回数据流的查询方法被调用时,相应数据才会被插入到room_table_modification_log
表中,开启追踪。至于数据是如何被插入的,invalidated
又是如何在01之间翻转的,下面会介绍。
3. 添加/移除触发器
触发器是数据库中的一个概念,你可以把它简单理解为在特定条件下“触发”的一系列行为,这种特定条件在我们这里就是UPDATE, DELETE, INSERT这三种操作,而“一系列行为”就是把room_table_modification_log
表中的invalidated
置为1。看来触发器的确非常适合于追踪表中数据被修改这种情形。
那么触发器是由谁添加的呢?源头就是返回数据流的查询方法,当这些方法被调用时,触发器就会被添加。然后,从Dao中查询方法被调用,到触发器被添加,这中间又经历了很多转换,我们暂且把这流程省略直接看触发器相关的内容。
public class InvalidationTracker {
/**
* An observer that can listen for changes in the database.
*/
public abstract static class Observer {
final String[] mTables;
public Observer(@NonNull String[] tables) {
mTables = Arrays.copyOf(tables, tables.length);
}
public abstract void onInvalidated(@NonNull Set<String> tables);
}
}
别管我们查询是以怎样的数据流返回,最终都是向数据库添加了一个观察者,这个观察者的onInvalidated
会在我们观察的Table“失效”时被回调,因此,我们可以再次进行查询,进而把最新的数据传递出去,形成数据流。而onInvalidated
之所以被回调,依赖的就是room_table_modification_log
表。
当向数据库添加一个观察者时(其实是向InvalidationTracker
添加),就会看要观察者提供的mTables
中是否还存在没有被“追踪”的Table,如果有则会向room_table_modification_log
表中插入一条数据,以开启对该Table的“追踪”,另外还会添加一个触发器,在UPDATE, DELETE, INSERT时设置对应table_id
的invalidated
为1。
public class InvalidationTracker {
//主要职责是记录哪些 Table 已经被“追踪”了
private ObservedTableTracker mObservedTableTracker;
final SafeIterableMap<Observer, ObserverWrapper> mObserverMap = new SafeIterableMap<>();
//添加观察者
@WorkerThread
public void addObserver(@NonNull Observer observer) {
//一般情况下,与 mTables一致
final String[] tableNames = resolveViews(observer.mTables);
int[] tableIds = new int[tableNames.length];
final int size = tableNames.length;
for (int i = 0; i < size; i++) {
//查找 Table对应的ID
Integer tableId = mTableIdLookup.get(tableNames[i].toLowerCase(Locale.US));
if (tableId == null) {
throw new IllegalArgumentException("There is no table with name " + tableNames[i]);
}
tableIds[i] = tableId;
}
//会进行一下包装
ObserverWrapper wrapper = new ObserverWrapper(observer, tableIds, tableNames);
ObserverWrapper currentObserver;
synchronized (mObserverMap) {
//保存下来,如果之前添加过,啥也不干
currentObserver = mObserverMap.putIfAbsent(observer, wrapper);
}
//如果之前没有添加过该观察者,并且 tableId之前没有添加过,就会添加触发器
if (currentObserver == null && mObservedTableTracker.onAdded(tableIds)) {
//需要“同步”
syncTriggers();
}
}
//移除观察者,在Observable dispose,LiveData被回收时调用
@WorkerThread
public void removeObserver(@NonNull final Observer observer) {
ObserverWrapper wrapper;
synchronized (mObserverMap) {
wrapper = mObserverMap.remove(observer);
}
if (wrapper != null && mObservedTableTracker.onRemoved(wrapper.mTableIds)) {
//需要“同步”
syncTriggers();
}
}
}
向InvalidationTracker
添加观察者的主要流程是,根据Observer
要观察的mTables
,转换成对应的tableIds
,之后把Observer
包装成ObserverWrapper
,ObserverWrapper
的主要作用是记录下被观察表的信息,诸如tableIds, tableNames。最后,把观察者保存下来,并且在必要时syncTriggers
,顾名思义就是同步触发器的意思(可能添加也可能移除触发器)。
之前说过,只有在被观察的表还未被“追踪”时,才会添加触发器,如果某个表在之前就已经被“追踪”了,那自然不需要重复添加触发器。而哪些表已经被“追踪”,是否需要停止追踪,这些信息保存在ObservedTableTracker
中,这里就不展开其代码了。
移除观察者就更简单了,一目了然,就不废话了。
我接着来看主线流程syncTriggers
:
public class InvalidationTracker {
void syncTriggers() {
if (!mDatabase.isOpen()) {
return;
}
syncTriggers(mDatabase.getOpenHelper().getWritableDatabase());
}
void syncTriggers(SupportSQLiteDatabase database) {
if (database.inTransaction()) {
return;
}
try {
// This method runs in a while loop because while changes are synced to db, another
// runnable may be skipped. If we cause it to skip, we need to do its work.
while (true) {
Lock closeLock = mDatabase.getCloseLock();
closeLock.lock();
try {
//上文提到的 ObservedTableTracker
//返回为null,证明没有要同步的,否则就是“有活要干”
final int[] tablesToSync = mObservedTableTracker.getTablesToSync();
if (tablesToSync == null) {
return;
}
final int limit = tablesToSync.length;
database.beginTransaction();
try {
for (int tableId = 0; tableId < limit; tableId++) {
switch (tablesToSync[tableId]) {
//活来了,可能是开始追踪表
case ObservedTableTracker.ADD:
startTrackingTable(database, tableId);
break;
//也可能是停止追踪表
case ObservedTableTracker.REMOVE:
stopTrackingTable(database, tableId);
break;
}
}
database.setTransactionSuccessful();
} finally {
database.endTransaction();
}
mObservedTableTracker.onSyncCompleted();
} finally {
closeLock.unlock();
}
}
} catch (IllegalStateException | SQLiteException exception) {
Log.e(Room.LOG_TAG, "Cannot run invalidation tracker. Is the db closed?",
exception);
}
}
}
其实Room实现数据流的整体流程并不复杂,关键就是其中需要处理非常多的线程安全的问题,导致源码很复杂。syncTriggers
就是这样的,其主干就是startTrackingTable
或者stopTrackingTable
或者啥也不干,但是考虑到线程安全的问题,就需要加锁,循环查询等等。总之,开启追踪是在startTrackingTable
,停止追踪是在stopTrackingTable
。
public class InvalidationTracker {
private static final String[] TRIGGERS = new String[]{"UPDATE", "DELETE", "INSERT"};
//开启追踪,向room_table_modification_log插入数据,并添加触发器
private void startTrackingTable(SupportSQLiteDatabase writableDb, int tableId) {
writableDb.execSQL(
"INSERT OR IGNORE INTO " + UPDATE_TABLE_NAME + " VALUES(" + tableId + ", 0)");
final String tableName = mTableNames[tableId];
StringBuilder stringBuilder = new StringBuilder();
for (String trigger : TRIGGERS) {
stringBuilder.setLength(0);
stringBuilder.append("CREATE TEMP TRIGGER IF NOT EXISTS ");
appendTriggerName(stringBuilder, tableName, trigger);
stringBuilder.append(" AFTER ")
.append(trigger)
.append(" ON `")
.append(tableName)
.append("` BEGIN UPDATE ")
.append(UPDATE_TABLE_NAME)
.append(" SET ").append(INVALIDATED_COLUMN_NAME).append(" = 1")
.append(" WHERE ").append(TABLE_ID_COLUMN_NAME).append(" = ").append(tableId)
.append(" AND ").append(INVALIDATED_COLUMN_NAME).append(" = 0")
.append("; END");
writableDb.execSQL(stringBuilder.toString());
}
}
//停止追踪,只是丢弃触发器,并不会删除room_table_modification_log中的数据
private void stopTrackingTable(SupportSQLiteDatabase writableDb, int tableId) {
final String tableName = mTableNames[tableId];
StringBuilder stringBuilder = new StringBuilder();
for (String trigger : TRIGGERS) {
stringBuilder.setLength(0);
stringBuilder.append("DROP TRIGGER IF EXISTS ");
appendTriggerName(stringBuilder, tableName, trigger);
writableDb.execSQL(stringBuilder.toString());
}
}
//触发器的名称
private static void appendTriggerName(StringBuilder builder, String tableName,
String triggerType) {
builder.append("`")
.append("room_table_modification_trigger_")
.append(tableName)
.append("_")
.append(triggerType)
.append("`");
}
}
以User表为例(其table_id
为0),那么开启“追踪”:
INSERT OR IGNORE INTO room_table_modification_log VALUES(0, 0)
CREATE TEMP TRIGGER IF NOT EXISTS `room_table_modification_trigger_user_UPDATE`
AFTER UPDATE ON `user`
BEGIN
UPDATE room_table_modification_log
SET invalidated = 1 WHERE table_id = 0 AND invalidated = 0;
END
会为UPDATE, DELETE, INSERT分别创建TRIGGER,TIRGGER中的内容都是一样的。
停止“追踪”:
DROP TRIGGER IF EXISTS `room_table_modification_trigger_user_UPDATE`
DROP TRIGGER IF EXISTS `room_table_modification_trigger_user_DELETE`
DROP TRIGGER IF EXISTS `room_table_modification_trigger_user_INSERT`
并不会把room_table_modification_log
中相应的数据也删了,说不定下次还能用到,只是不再更新invalidated
。
4. 数据流
有了上面这些基础,在看数据流的实现就比较简单了。下面会以RxJava Observable数据流为例,看看数据流究竟是如何实现的。
@Dao
interface UserDao {
@Query("SELECT * FROM user WHERE uid = :userId")
fun getUserObservableById(userId: Int): Observable<User>
}
会生成如下代码:
public final class UserDao_Impl implements UserDao {
@Override
public Observable<User> getUserObservableById(final int userId) {
final String _sql = "SELECT * FROM user WHERE uid = ?";
final RoomSQLiteQuery _statement = RoomSQLiteQuery.acquire(_sql, 1);
int _argIndex = 1;
_statement.bindLong(_argIndex, userId);
//这里是关键
return RxRoom.createObservable(__db, false, new String[]{"user"}, new Callable<User>() {
@Override
public User call() throws Exception {
//以下是正常的数据库查询流程
final Cursor _cursor = DBUtil.query(__db, _statement, false);
try {
final int _cursorIndexOfUid = CursorUtil.getColumnIndexOrThrow(_cursor, "uid");
final int _cursorIndexOfFirstName = CursorUtil.getColumnIndexOrThrow(_cursor, "first_name");
final int _cursorIndexOfLastName = CursorUtil.getColumnIndexOrThrow(_cursor, "last_name");
final User _result;
if(_cursor.moveToFirst()) {
final int _tmpUid;
_tmpUid = _cursor.getInt(_cursorIndexOfUid);
final String _tmpFirstName;
_tmpFirstName = _cursor.getString(_cursorIndexOfFirstName);
final String _tmpLastName;
_tmpLastName = _cursor.getString(_cursorIndexOfLastName);
_result = new User(_tmpUid,_tmpFirstName,_tmpLastName);
} else {
_result = null;
}
return _result;
} finally {
_cursor.close();
}
}
@Override
protected void finalize() {
_statement.release();
}
});
}
}
跟普通查询最大的不同就是以RxRoom.createObservable
创建了Observable
。来看一下RxRoom
:
public class RxRoom {
//其中参数callable就是要进行的查询
public static <T> Observable<T> createObservable(final RoomDatabase database,
final boolean inTransaction, final String[] tableNames, final Callable<T> callable) {
//不用太在意这个scheduler,就是一个线程池,不同情况下用不同的线程池
Scheduler scheduler = Schedulers.from(getExecutor(database, inTransaction));
//fromCallable会天然地阻止 null,如果callable返回null会被过滤掉
final Maybe<T> maybe = Maybe.fromCallable(callable);
return createObservable(database, tableNames)
.subscribeOn(scheduler)
.unsubscribeOn(scheduler)
.observeOn(scheduler)
.flatMapMaybe(new Function<Object, MaybeSource<T>>() {
@Override
public MaybeSource<T> apply(Object o) throws Exception {
return maybe;
}
});
}
public static Observable<Object> createObservable(final RoomDatabase database,
final String... tableNames) {
return Observable.create(new ObservableOnSubscribe<Object>() {
@Override
public void subscribe(final ObservableEmitter<Object> emitter) throws Exception {
//新建一个InvalidationTracker.Observer
final InvalidationTracker.Observer observer = new InvalidationTracker.Observer(
tableNames) {
//在观察的表“失效”时回调
@Override
public void onInvalidated(@androidx.annotation.NonNull Set<String> tables) {
//只是发射一个“信号”,表明需要重新查询
emitter.onNext(NOTHING);
}
};
//向InvalidationTracker添加观察者
database.getInvalidationTracker().addObserver(observer);
emitter.setDisposable(Disposables.fromAction(new Action() {
@Override
public void run() throws Exception {
//dispose时移除观察者
database.getInvalidationTracker().removeObserver(observer);
}
}));
// emit once to avoid missing any data and also easy chaining
emitter.onNext(NOTHING);
}
});
}
}
很简单,就是向InvalidationTracker
添加观察者,在Observable
被dispose
的时候移除观察者。因为RxJava 2不喜欢null,所以RxRoom
的做法是仅仅发送数据“失效”的信号,然后使用flatMapMaybe
操作符结合Maybe.fromCallable
天然地过滤掉了null,很巧妙(巧妙也不是一天达成的,在之前的版本中没有使用这个操作符,还是使用我们总是用的那种“笨拙”的方法过滤null)。
还差最后一个环节就能形成闭环了,那就是啥时候去查询room_table_modification_log
表,看看哪个被“追踪”的表“失效”了。最合适的时机就是“增删改”结束的时候:
public abstract class RoomDatabase {
public void beginTransaction() {
assertNotMainThread();
SupportSQLiteDatabase database = mOpenHelper.getWritableDatabase();
//Transaction开始时“同步”Trigger
mInvalidationTracker.syncTriggers(database);
database.beginTransaction();
}
//Transaction结束时refreshVersionsAsync
public void endTransaction() {
mOpenHelper.getWritableDatabase().endTransaction();
if (!inTransaction()) {
// enqueue refresh only if we are NOT in a transaction. Otherwise, wait for the last
// endTransaction call to do it.
mInvalidationTracker.refreshVersionsAsync();
}
}
}
public class InvalidationTracker {
public void refreshVersionsAsync() {
if (mPendingRefresh.compareAndSet(false, true)) {
mDatabase.getQueryExecutor().execute(mRefreshRunnable);
}
}
Runnable mRefreshRunnable = new Runnable() {
@Override
public void run() {
final Lock closeLock = mDatabase.getCloseLock();
Set<Integer> invalidatedTableIds = null;
try {
closeLock.lock();
if (!ensureInitialization()) {
return;
}
if (!mPendingRefresh.compareAndSet(true, false)) {
// no pending refresh
return;
}
if (mDatabase.inTransaction()) {
// current thread is in a transaction. when it ends, it will invoke
// refreshRunnable again. mPendingRefresh is left as false on purpose
// so that the last transaction can flip it on again.
return;
}
//查询room_table_modification_log,获取“失效”的表
invalidatedTableIds = checkUpdatedTable();
} catch (IllegalStateException | SQLiteException exception) {
// may happen if db is closed. just log.
Log.e(Room.LOG_TAG, "Cannot run invalidation tracker. Is the db closed?",
exception);
} finally {
closeLock.unlock();
}
//如果有“失效”的表
if (invalidatedTableIds != null && !invalidatedTableIds.isEmpty()) {
synchronized (mObserverMap) {
for (Map.Entry<Observer, ObserverWrapper> entry : mObserverMap) {
//通知观察者,如果是观察者“关心”的表,就会回调 onInvalidated
entry.getValue().notifyByTableInvalidStatus(invalidatedTableIds);
}
}
}
}
//查询出room_table_modification_log中invalidated为1的,然后把它重置为0;返回的是 table_id的集合
private Set<Integer> checkUpdatedTable() {
//主要就是执行下面SQL
//SELECT * FROM room_table_modification_log WHERE invalidated = 1
//UPDATE room_table_modification_log SET invalidated = 0 WHERE invalidated = 1
}
};
}
“增删改”都是放在Transaction中的,因此endTransaction
是个查询room_table_modification_log
的好时机。通过room_table_modification_log
获取“失效”的表,如果InvalidationTracker
的观察者观察的是“失效”表中的某一个或几个,就会回调观察者的onInvalidated
方法。onInvalidated
方法正如前面展示的那样,会发送一个信号,然后相应查询就会被再次执行,最新的数据就会被传递出去。至此,整个流程完整地闭合。
某个数据表“失效”仅仅是说这个表中的数据被修改了,但是,是不是修改的我们查询关心的部分,其实并不知道,有可能重新查询的结果跟之前是一样的。如果不想接收到重复的数据,对于RxJava而言可以使用
distinctUntilChanged
来进行过滤,对于LiveData而言可以使用Transformations.distinctUntilChanged
来过滤。
以上仅以RxJava Observable为例展示了数据流是如何实现的,对于Flowable而言是类似的。但是对于LiveData而言,跟Observable就不太一样了,中间多了很多处理(主要是防止在我们没有保存LiveData的情况下,LiveData被意外回收),整个流程下来比较繁琐,但是思路是一样的,这里就省略了。
5. 总结
Room实现数据流的流程图中有一些不合适的地方,removeObserver
并不是有Dao调用的,而是查询返回的RaJava Observable或者LiveData“不再用”了的时候被各自调用的。其中,“重新查询”的意思就是数据流被更新,数据流相关的RaJava Observable或者LiveData都没有在包含在图中。
能从Room分析的第一篇文章看到这里的,应该颁发一个最佳坚持奖,请留言或者点赞让我看见你们的双手。这是这一系列文章的最后一篇,文章题目叫“从Room源码看抽象与封装”,主要是因为写第一篇文章的时候,想从“抽象与封装”作为切入点去分析Room的源码,没有想到会写这么长。前面几篇文章还是比较符合题目的,后面的文章基本上就剩源码解析了,不是说这里面不包含抽象封装的内容,而是源码已经比较复杂了,再去谈抽象封装只会更加混乱。总之,希望你有所收获,欢迎留言与我交流。