从Room源码看抽象与封装——数据流

2019-07-09  本文已影响0人  珞泽珈群

目录

源码解析目录
从Room源码看抽象与封装——SQLite的抽象
从Room源码看抽象与封装——数据库的创建
从Room源码看抽象与封装——数据库的升降级
从Room源码看抽象与封装——Dao
从Room源码看抽象与封装——数据流

前言

之前关于Room源码分析的四篇文章基本上涵盖了Room使用的全流程,从抽象基础到数据库创建再到增删改查的实现,可谓很全面了。你以为这就完了吗?Too young!如果Room只是这些内容的话,那只能说它是个还不错的ORM框架,对SQLite进行了良好的抽象,并且效率也很高。Room最大的不同在于它是Jetpack包的一部分,它与Jetpack包中其它部分有着非常好的配合,尤其是LiveData。以之前定义的Dao为例:

@Dao
interface UserDao {
    @Query("SELECT * FROM user WHERE uid = :userId")
    fun getUserById(userId: Int): User?

    @Query("SELECT * FROM user WHERE uid = :userId")
    fun getUserLiveDataById(userId: Int): LiveData<User>
    
    @Query("SELECT * FROM user WHERE uid = :userId")
    fun getUserObservableById(userId: Int): Observable<User>
    
    @Query("SELECT * FROM user WHERE uid = :userId")
    fun getUserFlowableById(userId: Int): Flowable<User>
}

如上所示,Dao中的查询不仅可以返回Entity对象User,还可以以数据流的形式返回LiveData或者是RxJava中的Observable,Flowable。当我们“增删改”User表时,如果在此之前以如上形式获取到了User数据流,那么该User数据流就会“更新”,即相应查询会重新执行一遍,以把最新的User数据传递出去,至于User是否真的有变化,这需要我们自行判断。
这篇文章会介绍Room是如何实现数据流的。

1. 概述

如果说之前对Room方方面面的解析只是“繁”的话,那么Room对于数据流的实现真的是有点“难”了。整个流程比较复杂,为了防止你看的云里雾里,我先大致描述一下整个流程,然后再对各个部分进行详细讲解。
Room实现数据流大致包含如下几个步骤:

  1. 创建一个数据库临时表(表名为room_table_modification_log,保存在内存中,非本地存储),表中包含两列table_idinvalidated,这个表记录了当前数据库哪个Table被修改了(“增删改”都属于修改)。
  2. 如果Dao中的查询涉及到了数据流返回,那么生成代码就会帮我们向数据库中添加一个关于当前被查询表(对于上面的例子来说就是User这个表,可能会涉及到多个表)的TRIGGER(触发器),这个触发器的主要任务是当有操作修改了被查询表中的数据时,就把room_table_modification_log表中对应的table_idinvalidated置为1,表示“失效”了。
  3. Room中所有“增删改”都是放在Transaction中的,在endTransaction时,Room都会查看room_table_modification_log表中是否有invalidated为1的,如果有,再次执行相应查询(invalidated会被重置为0),并把查询的数据通过相应数据流传递出去。
  4. 在数据流不再使用时(例如Observable被dispose),相应触发器也会被丢弃,这样就不会再对这个表进行“追踪”了。

2. 创建临时表

Room中关于数据流的实现几乎全部包含在InvalidationTracker类中,这个类是在RoomDatabase中被创建的,只是在之前的分析中被刻意忽略了。

public abstract class RoomDatabase {
    private final InvalidationTracker mInvalidationTracker;
    
    public RoomDatabase() {
        mInvalidationTracker = createInvalidationTracker();
    }
    
    /**
     * 由生成代码实现
     */
    @NonNull
    protected abstract InvalidationTracker createInvalidationTracker();
    
    /**
     * 由生成代码调用,初始化 InvalidationTracker
     */
    protected void internalInitInvalidationTracker(@NonNull SupportSQLiteDatabase db) {
        mInvalidationTracker.internalInit(db);
    }

    @NonNull
    public InvalidationTracker getInvalidationTracker() {
        return mInvalidationTracker;
    }
}

public final class AppDatabase_Impl extends AppDatabase {
    @Override
    protected SupportSQLiteOpenHelper createOpenHelper(DatabaseConfiguration configuration) {
        final SupportSQLiteOpenHelper.Callback _openCallback = new RoomOpenHelper(configuration, new RoomOpenHelper.Delegate(1) {
            @Override
            public void onOpen(SupportSQLiteDatabase _db) {
                mDatabase = _db;
                _db.execSQL("PRAGMA foreign_keys = ON");
                //数据库打开时初始化 InvalidationTracker
                internalInitInvalidationTracker(_db);
                //...
            }
            //...
        });
        //...
    }
      
    @Override
    protected InvalidationTracker createInvalidationTracker() {
        //...
        return new InvalidationTracker(this, _shadowTablesMap, _viewTables, "User"); //User表示要观察的表名,可能包含很多个
    }
}

注解处理器生成了AppDatabase_Impl,其中不仅实现了createOpenHelper,还实现了createInvalidationTracker,并且在数据库打开时(onOpen回调被调用),InvalidationTracker被初始化,也是在初始化时,临时表被创建,踏出万里长征第一步。

public class InvalidationTracker {

    private static final String UPDATE_TABLE_NAME = "room_table_modification_log";

    private static final String TABLE_ID_COLUMN_NAME = "table_id";

    private static final String INVALIDATED_COLUMN_NAME = "invalidated";

    //建表
    private static final String CREATE_TRACKING_TABLE_SQL = "CREATE TEMP TABLE " + UPDATE_TABLE_NAME
            + "(" + TABLE_ID_COLUMN_NAME + " INTEGER PRIMARY KEY, "
            + INVALIDATED_COLUMN_NAME + " INTEGER NOT NULL DEFAULT 0)";

    //重置
    static final String RESET_UPDATED_TABLES_SQL = "UPDATE " + UPDATE_TABLE_NAME
            + " SET " + INVALIDATED_COLUMN_NAME + " = 0 WHERE " + INVALIDATED_COLUMN_NAME + " = 1 ";

    //找出哪个表改变了
    static final String SELECT_UPDATED_TABLES_SQL = "SELECT * FROM " + UPDATE_TABLE_NAME
            + " WHERE " + INVALIDATED_COLUMN_NAME + " = 1;";
            
    //根据表名找表名对应的ID
    final ArrayMap<String, Integer> mTableIdLookup;
    //哪些表需要被观察
    final String[] mTableNames;
    
    //tableNames是可变参数,表示可能有多个要被“追踪”的表
    public InvalidationTracker(RoomDatabase database, Map<String, String> shadowTablesMap,
            Map<String, Set<String>> viewTables, String... tableNames) {
        //shadowTablesMap, viewTables可以忽略,一般都为空
        final int size = tableNames.length;
        mTableNames = new String[size];
        for (int id = 0; id < size; id++) {
            final String tableName = tableNames[id].toLowerCase(Locale.US);
            //表对应的ID其实就是它的位置,因为是临时表存储在内存中,所以这个ID只需要能分别谁是谁就可以了
            mTableIdLookup.put(tableName, id);
            mTableNames[id] = tableName;
        }
        //...
    }
    
    /**
     * 初始化临时表
     */
    void internalInit(SupportSQLiteDatabase database) {
        synchronized (this) {
            database.execSQL("PRAGMA temp_store = MEMORY;"); //临时表保存在内存中
            database.execSQL("PRAGMA recursive_triggers='ON';");
            //创建临时表
            database.execSQL(CREATE_TRACKING_TABLE_SQL);
            syncTriggers(database);
            //重置临时表是需要经常执行的,把它编译成SQLiteStatement方便执行
            mCleanupStatement = database.compileStatement(RESET_UPDATED_TABLES_SQL);
        }
    }
}

临时表的表名是room_table_modification_log,创建它的目的是追踪我们需要观察的那些表中(例如User表)数据是否发生了变化。把建表的SQL翻译以下就是

CREATE TEMP TABLE room_table_modification_log
    (table_id INTEGER PRIMARY KEY,  invalidated INTEGER NOT NULL DEFAULT 0)

就像下面这样

table_id invalidated
0 0

创建表之后不会有数据,数据(0,0)只是方便你理解。

因为这个表是临时表,并且在创建它之前设置了PRAGMA temp_store = MEMORY,因此room_table_modification_log表被放在了内存中,并不会被保存在本地文件中(当然也没有必要)。因为要放在内存中,所以说room_table_modification_log选择使用把要追踪的表名转成ID进行存储,这样节省内存,而这个ID并没有什么特殊含义,只需要跟要追踪的表名一一对应就可以,所以使用0,1,2这样的位置作为ID就可以。
需要明确一点,room_table_modification_log创建后,其中是没有数据的。创建InvalidationTracker时传入的tableNames参数只是表明,返回数据流的查询涉及到了这些表,但是在数据库使用过程中,可能根本就没有执行相关查询,所以也就不需要追踪。也就是说room_table_modification_log的追踪是“懒”的,直到返回数据流的查询方法被调用时,相应数据才会被插入到room_table_modification_log表中,开启追踪。至于数据是如何被插入的,invalidated又是如何在01之间翻转的,下面会介绍。

3. 添加/移除触发器

触发器是数据库中的一个概念,你可以把它简单理解为在特定条件下“触发”的一系列行为,这种特定条件在我们这里就是UPDATE, DELETE, INSERT这三种操作,而“一系列行为”就是把room_table_modification_log表中的invalidated置为1。看来触发器的确非常适合于追踪表中数据被修改这种情形。
那么触发器是由谁添加的呢?源头就是返回数据流的查询方法,当这些方法被调用时,触发器就会被添加。然后,从Dao中查询方法被调用,到触发器被添加,这中间又经历了很多转换,我们暂且把这流程省略直接看触发器相关的内容。

public class InvalidationTracker {
    /**
     * An observer that can listen for changes in the database.
     */
    public abstract static class Observer {
        final String[] mTables;

        public Observer(@NonNull String[] tables) {
            mTables = Arrays.copyOf(tables, tables.length);
        }

        public abstract void onInvalidated(@NonNull Set<String> tables);
    }
}

别管我们查询是以怎样的数据流返回,最终都是向数据库添加了一个观察者,这个观察者的onInvalidated会在我们观察的Table“失效”时被回调,因此,我们可以再次进行查询,进而把最新的数据传递出去,形成数据流。而onInvalidated之所以被回调,依赖的就是room_table_modification_log表。
当向数据库添加一个观察者时(其实是向InvalidationTracker添加),就会看要观察者提供的mTables中是否还存在没有被“追踪”的Table,如果有则会向room_table_modification_log表中插入一条数据,以开启对该Table的“追踪”,另外还会添加一个触发器,在UPDATE, DELETE, INSERT时设置对应table_idinvalidated为1。

public class InvalidationTracker {
    //主要职责是记录哪些 Table 已经被“追踪”了
    private ObservedTableTracker mObservedTableTracker;

    final SafeIterableMap<Observer, ObserverWrapper> mObserverMap = new SafeIterableMap<>();

    //添加观察者
    @WorkerThread
    public void addObserver(@NonNull Observer observer) {
        //一般情况下,与 mTables一致
        final String[] tableNames = resolveViews(observer.mTables);
        int[] tableIds = new int[tableNames.length];
        final int size = tableNames.length;

        for (int i = 0; i < size; i++) {
            //查找 Table对应的ID
            Integer tableId = mTableIdLookup.get(tableNames[i].toLowerCase(Locale.US));
            if (tableId == null) {
                throw new IllegalArgumentException("There is no table with name " + tableNames[i]);
            }
            tableIds[i] = tableId;
        }
        //会进行一下包装
        ObserverWrapper wrapper = new ObserverWrapper(observer, tableIds, tableNames);
        ObserverWrapper currentObserver;
        synchronized (mObserverMap) {
            //保存下来,如果之前添加过,啥也不干
            currentObserver = mObserverMap.putIfAbsent(observer, wrapper);
        }
        //如果之前没有添加过该观察者,并且 tableId之前没有添加过,就会添加触发器
        if (currentObserver == null && mObservedTableTracker.onAdded(tableIds)) {
            //需要“同步”
            syncTriggers();
        }
    }
    
    //移除观察者,在Observable dispose,LiveData被回收时调用
    @WorkerThread
    public void removeObserver(@NonNull final Observer observer) {
        ObserverWrapper wrapper;
        synchronized (mObserverMap) {
            wrapper = mObserverMap.remove(observer);
        }
        if (wrapper != null && mObservedTableTracker.onRemoved(wrapper.mTableIds)) {
            //需要“同步”
            syncTriggers();
        }
    }
}

InvalidationTracker添加观察者的主要流程是,根据Observer要观察的mTables,转换成对应的tableIds,之后把Observer包装成ObserverWrapperObserverWrapper的主要作用是记录下被观察表的信息,诸如tableIds, tableNames。最后,把观察者保存下来,并且在必要时syncTriggers,顾名思义就是同步触发器的意思(可能添加也可能移除触发器)。
之前说过,只有在被观察的表还未被“追踪”时,才会添加触发器,如果某个表在之前就已经被“追踪”了,那自然不需要重复添加触发器。而哪些表已经被“追踪”,是否需要停止追踪,这些信息保存在ObservedTableTracker中,这里就不展开其代码了。
移除观察者就更简单了,一目了然,就不废话了。
我接着来看主线流程syncTriggers

public class InvalidationTracker {
    void syncTriggers() {
        if (!mDatabase.isOpen()) {
            return;
        }
        syncTriggers(mDatabase.getOpenHelper().getWritableDatabase());
    }
    
    void syncTriggers(SupportSQLiteDatabase database) {
        if (database.inTransaction()) {
            return;
        }
        try {
            // This method runs in a while loop because while changes are synced to db, another
            // runnable may be skipped. If we cause it to skip, we need to do its work.
            while (true) {
                Lock closeLock = mDatabase.getCloseLock();
                closeLock.lock();
                try {
                    //上文提到的 ObservedTableTracker
                    //返回为null,证明没有要同步的,否则就是“有活要干”
                    final int[] tablesToSync = mObservedTableTracker.getTablesToSync();
                    if (tablesToSync == null) {
                        return;
                    }
                    final int limit = tablesToSync.length;
                    database.beginTransaction();
                    try {
                        for (int tableId = 0; tableId < limit; tableId++) {
                            switch (tablesToSync[tableId]) {
                                //活来了,可能是开始追踪表
                                case ObservedTableTracker.ADD:
                                    startTrackingTable(database, tableId);
                                    break;
                                //也可能是停止追踪表
                                case ObservedTableTracker.REMOVE:
                                    stopTrackingTable(database, tableId);
                                    break;
                            }
                        }
                        database.setTransactionSuccessful();
                    } finally {
                        database.endTransaction();
                    }
                    mObservedTableTracker.onSyncCompleted();
                } finally {
                    closeLock.unlock();
                }
            }
        } catch (IllegalStateException | SQLiteException exception) {
            Log.e(Room.LOG_TAG, "Cannot run invalidation tracker. Is the db closed?",
                    exception);
        }
    }
}

其实Room实现数据流的整体流程并不复杂,关键就是其中需要处理非常多的线程安全的问题,导致源码很复杂。syncTriggers就是这样的,其主干就是startTrackingTable或者stopTrackingTable或者啥也不干,但是考虑到线程安全的问题,就需要加锁,循环查询等等。总之,开启追踪是在startTrackingTable,停止追踪是在stopTrackingTable

public class InvalidationTracker {
    private static final String[] TRIGGERS = new String[]{"UPDATE", "DELETE", "INSERT"};
    
    //开启追踪,向room_table_modification_log插入数据,并添加触发器
    private void startTrackingTable(SupportSQLiteDatabase writableDb, int tableId) {
        writableDb.execSQL(
                "INSERT OR IGNORE INTO " + UPDATE_TABLE_NAME + " VALUES(" + tableId + ", 0)");
        final String tableName = mTableNames[tableId];
        StringBuilder stringBuilder = new StringBuilder();
        for (String trigger : TRIGGERS) {
            stringBuilder.setLength(0);
            stringBuilder.append("CREATE TEMP TRIGGER IF NOT EXISTS ");
            appendTriggerName(stringBuilder, tableName, trigger);
            stringBuilder.append(" AFTER ")
                    .append(trigger)
                    .append(" ON `")
                    .append(tableName)
                    .append("` BEGIN UPDATE ")
                    .append(UPDATE_TABLE_NAME)
                    .append(" SET ").append(INVALIDATED_COLUMN_NAME).append(" = 1")
                    .append(" WHERE ").append(TABLE_ID_COLUMN_NAME).append(" = ").append(tableId)
                    .append(" AND ").append(INVALIDATED_COLUMN_NAME).append(" = 0")
                    .append("; END");
            writableDb.execSQL(stringBuilder.toString());
        }
    }
    
    //停止追踪,只是丢弃触发器,并不会删除room_table_modification_log中的数据
    private void stopTrackingTable(SupportSQLiteDatabase writableDb, int tableId) {
        final String tableName = mTableNames[tableId];
        StringBuilder stringBuilder = new StringBuilder();
        for (String trigger : TRIGGERS) {
            stringBuilder.setLength(0);
            stringBuilder.append("DROP TRIGGER IF EXISTS ");
            appendTriggerName(stringBuilder, tableName, trigger);
            writableDb.execSQL(stringBuilder.toString());
        }
    }
    
    //触发器的名称
    private static void appendTriggerName(StringBuilder builder, String tableName,
            String triggerType) {
        builder.append("`")
                .append("room_table_modification_trigger_")
                .append(tableName)
                .append("_")
                .append(triggerType)
                .append("`");
    }
}

以User表为例(其table_id为0),那么开启“追踪”:

INSERT OR IGNORE INTO room_table_modification_log VALUES(0, 0)

CREATE TEMP TRIGGER IF NOT EXISTS `room_table_modification_trigger_user_UPDATE`
AFTER UPDATE ON `user`
BEGIN
UPDATE room_table_modification_log
SET invalidated = 1 WHERE table_id = 0 AND invalidated = 0;
END

会为UPDATE, DELETE, INSERT分别创建TRIGGER,TIRGGER中的内容都是一样的。

停止“追踪”:

DROP TRIGGER IF EXISTS `room_table_modification_trigger_user_UPDATE`
DROP TRIGGER IF EXISTS `room_table_modification_trigger_user_DELETE`
DROP TRIGGER IF EXISTS `room_table_modification_trigger_user_INSERT`

并不会把room_table_modification_log中相应的数据也删了,说不定下次还能用到,只是不再更新invalidated

4. 数据流

有了上面这些基础,在看数据流的实现就比较简单了。下面会以RxJava Observable数据流为例,看看数据流究竟是如何实现的。

@Dao
interface UserDao {
    @Query("SELECT * FROM user WHERE uid = :userId")
    fun getUserObservableById(userId: Int): Observable<User>
}

会生成如下代码:

public final class UserDao_Impl implements UserDao {
  @Override
  public Observable<User> getUserObservableById(final int userId) {
    final String _sql = "SELECT * FROM user WHERE uid = ?";
    final RoomSQLiteQuery _statement = RoomSQLiteQuery.acquire(_sql, 1);
    int _argIndex = 1;
    _statement.bindLong(_argIndex, userId);
    //这里是关键
    return RxRoom.createObservable(__db, false, new String[]{"user"}, new Callable<User>() {
      @Override
      public User call() throws Exception {
        //以下是正常的数据库查询流程
        final Cursor _cursor = DBUtil.query(__db, _statement, false);
        try {
          final int _cursorIndexOfUid = CursorUtil.getColumnIndexOrThrow(_cursor, "uid");
          final int _cursorIndexOfFirstName = CursorUtil.getColumnIndexOrThrow(_cursor, "first_name");
          final int _cursorIndexOfLastName = CursorUtil.getColumnIndexOrThrow(_cursor, "last_name");
          final User _result;
          if(_cursor.moveToFirst()) {
            final int _tmpUid;
            _tmpUid = _cursor.getInt(_cursorIndexOfUid);
            final String _tmpFirstName;
            _tmpFirstName = _cursor.getString(_cursorIndexOfFirstName);
            final String _tmpLastName;
            _tmpLastName = _cursor.getString(_cursorIndexOfLastName);
            _result = new User(_tmpUid,_tmpFirstName,_tmpLastName);
          } else {
            _result = null;
          }
          return _result;
        } finally {
          _cursor.close();
        }
      }

      @Override
      protected void finalize() {
        _statement.release();
      }
    });
  }
}

跟普通查询最大的不同就是以RxRoom.createObservable创建了Observable。来看一下RxRoom

public class RxRoom {
    //其中参数callable就是要进行的查询
    public static <T> Observable<T> createObservable(final RoomDatabase database,
            final boolean inTransaction, final String[] tableNames, final Callable<T> callable) {
        //不用太在意这个scheduler,就是一个线程池,不同情况下用不同的线程池
        Scheduler scheduler = Schedulers.from(getExecutor(database, inTransaction));
        //fromCallable会天然地阻止 null,如果callable返回null会被过滤掉
        final Maybe<T> maybe = Maybe.fromCallable(callable);
        return createObservable(database, tableNames)
                .subscribeOn(scheduler)
                .unsubscribeOn(scheduler)
                .observeOn(scheduler)
                .flatMapMaybe(new Function<Object, MaybeSource<T>>() {
                    @Override
                    public MaybeSource<T> apply(Object o) throws Exception {
                        return maybe;
                    }
                });
    }
    
    public static Observable<Object> createObservable(final RoomDatabase database,
            final String... tableNames) {
        return Observable.create(new ObservableOnSubscribe<Object>() {
            @Override
            public void subscribe(final ObservableEmitter<Object> emitter) throws Exception {
                //新建一个InvalidationTracker.Observer
                final InvalidationTracker.Observer observer = new InvalidationTracker.Observer(
                        tableNames) {
                    //在观察的表“失效”时回调
                    @Override
                    public void onInvalidated(@androidx.annotation.NonNull Set<String> tables) {
                        //只是发射一个“信号”,表明需要重新查询
                        emitter.onNext(NOTHING);
                    }
                };
                //向InvalidationTracker添加观察者
                database.getInvalidationTracker().addObserver(observer);
                emitter.setDisposable(Disposables.fromAction(new Action() {
                    @Override
                    public void run() throws Exception {
                        //dispose时移除观察者
                        database.getInvalidationTracker().removeObserver(observer);
                    }
                }));

                // emit once to avoid missing any data and also easy chaining
                emitter.onNext(NOTHING);
            }
        });
    }
}

很简单,就是向InvalidationTracker添加观察者,在Observabledispose的时候移除观察者。因为RxJava 2不喜欢null,所以RxRoom的做法是仅仅发送数据“失效”的信号,然后使用flatMapMaybe操作符结合Maybe.fromCallable天然地过滤掉了null,很巧妙(巧妙也不是一天达成的,在之前的版本中没有使用这个操作符,还是使用我们总是用的那种“笨拙”的方法过滤null)。


还差最后一个环节就能形成闭环了,那就是啥时候去查询room_table_modification_log表,看看哪个被“追踪”的表“失效”了。最合适的时机就是“增删改”结束的时候:

public abstract class RoomDatabase {
    public void beginTransaction() {
        assertNotMainThread();
        SupportSQLiteDatabase database = mOpenHelper.getWritableDatabase();
        //Transaction开始时“同步”Trigger
        mInvalidationTracker.syncTriggers(database);
        database.beginTransaction();
    }

    //Transaction结束时refreshVersionsAsync
    public void endTransaction() {
        mOpenHelper.getWritableDatabase().endTransaction();
        if (!inTransaction()) {
            // enqueue refresh only if we are NOT in a transaction. Otherwise, wait for the last
            // endTransaction call to do it.
            mInvalidationTracker.refreshVersionsAsync();
        }
    }
}


public class InvalidationTracker {
    public void refreshVersionsAsync() {
        if (mPendingRefresh.compareAndSet(false, true)) {
            mDatabase.getQueryExecutor().execute(mRefreshRunnable);
        }
    }
    
    Runnable mRefreshRunnable = new Runnable() {
        @Override
        public void run() {
            final Lock closeLock = mDatabase.getCloseLock();
            Set<Integer> invalidatedTableIds = null;
            try {
                closeLock.lock();

                if (!ensureInitialization()) {
                    return;
                }

                if (!mPendingRefresh.compareAndSet(true, false)) {
                    // no pending refresh
                    return;
                }

                if (mDatabase.inTransaction()) {
                    // current thread is in a transaction. when it ends, it will invoke
                    // refreshRunnable again. mPendingRefresh is left as false on purpose
                    // so that the last transaction can flip it on again.
                    return;
                }

                //查询room_table_modification_log,获取“失效”的表
                invalidatedTableIds = checkUpdatedTable();
            } catch (IllegalStateException | SQLiteException exception) {
                // may happen if db is closed. just log.
                Log.e(Room.LOG_TAG, "Cannot run invalidation tracker. Is the db closed?",
                        exception);
            } finally {
                closeLock.unlock();
            }
            //如果有“失效”的表
            if (invalidatedTableIds != null && !invalidatedTableIds.isEmpty()) {
                synchronized (mObserverMap) {
                    for (Map.Entry<Observer, ObserverWrapper> entry : mObserverMap) {
                        //通知观察者,如果是观察者“关心”的表,就会回调 onInvalidated
                        entry.getValue().notifyByTableInvalidStatus(invalidatedTableIds);
                    }
                }
            }
        }

        //查询出room_table_modification_log中invalidated为1的,然后把它重置为0;返回的是 table_id的集合
        private Set<Integer> checkUpdatedTable() {
            //主要就是执行下面SQL
            //SELECT * FROM room_table_modification_log WHERE invalidated = 1
            //UPDATE room_table_modification_log SET invalidated = 0 WHERE invalidated = 1
        }
    };

}

“增删改”都是放在Transaction中的,因此endTransaction是个查询room_table_modification_log的好时机。通过room_table_modification_log获取“失效”的表,如果InvalidationTracker的观察者观察的是“失效”表中的某一个或几个,就会回调观察者的onInvalidated方法。onInvalidated方法正如前面展示的那样,会发送一个信号,然后相应查询就会被再次执行,最新的数据就会被传递出去。至此,整个流程完整地闭合。

某个数据表“失效”仅仅是说这个表中的数据被修改了,但是,是不是修改的我们查询关心的部分,其实并不知道,有可能重新查询的结果跟之前是一样的。如果不想接收到重复的数据,对于RxJava而言可以使用distinctUntilChanged来进行过滤,对于LiveData而言可以使用Transformations.distinctUntilChanged来过滤。

以上仅以RxJava Observable为例展示了数据流是如何实现的,对于Flowable而言是类似的。但是对于LiveData而言,跟Observable就不太一样了,中间多了很多处理(主要是防止在我们没有保存LiveData的情况下,LiveData被意外回收),整个流程下来比较繁琐,但是思路是一样的,这里就省略了。

5. 总结

Room实现数据流的流程

图中有一些不合适的地方,removeObserver并不是有Dao调用的,而是查询返回的RaJava Observable或者LiveData“不再用”了的时候被各自调用的。其中,“重新查询”的意思就是数据流被更新,数据流相关的RaJava Observable或者LiveData都没有在包含在图中。
能从Room分析的第一篇文章看到这里的,应该颁发一个最佳坚持奖,请留言或者点赞让我看见你们的双手。这是这一系列文章的最后一篇,文章题目叫“从Room源码看抽象与封装”,主要是因为写第一篇文章的时候,想从“抽象与封装”作为切入点去分析Room的源码,没有想到会写这么长。前面几篇文章还是比较符合题目的,后面的文章基本上就剩源码解析了,不是说这里面不包含抽象封装的内容,而是源码已经比较复杂了,再去谈抽象封装只会更加混乱。总之,希望你有所收获,欢迎留言与我交流。

上一篇下一篇

猜你喜欢

热点阅读