Room是怎樣和LiveData結合使用的?(原始碼分析)
前言
之前寫專案的時候,對於資料庫的操作不是特別多,能避免就儘量避免,並且一直想不到比較好的方法去組織網路資料、本地資料的邏輯。所以在最近的面試中時,問及專案中的資料庫實現,以及比較好用的資料庫的框架及其實現原理時,我就只答道之前在《第一行程式碼》中看到了的 LitePal ,但原始碼就...所以這次來惡補一次資料庫。幾經搜尋,雲比較,比較青睞官方 Jetpack元件 中的 Room 。
Room簡介
Room框架是使用生成程式碼的方式在編譯時生成CRUD的程式碼,因此效能是遠遠好過通過反射實現的ORM框架。但是事實上,Room最吸引我的地方不止是效能,Room對架構元件(LiveData)、RxJava等流行框架做了適配。例如,Room中的查詢操作可以返回一個LiveData<XXX>,並且,每一次RUD操作,都會更新LiveData。這可以大大簡化本地、記憶體、網路多級快取的實現,具體官方也給出了一系列 Demo ,並且隨時都在隨著框架或者根據PR更新,強烈推薦研究這些Demo!
注
本文主要是對Room中與LiveData的聯動作出分析, 閱讀本文前建議先熟悉Room的基本使用,建議看一下與LiveData配合使用的Demo。
正文
建立相關類
AppDatabase.kt
@Database(entities = [Book::class], version = 1) abstract class AppDatabase : RoomDatabase() { abstract fun bookDao(): BookDao }
Book.kt
@Dao interface BookDao { @Insert fun insert(book: Book): Long @Delete fun delete(book: Book) @Query("select * from book where id = :id") fun queryById(id: Long): LiveData<Book> }
使用資料庫:
val db = Room.databaseBuilder(applicationContext, AppDatabase::class.java, "test.db") .build() db.bookDao().queryById(1).observe(this, Observer { // do something when book update })
這樣在Observer裡面就可以接收到任何時候資料庫id=1的資料修改操作了。
生成程式碼並分析
Build -> Make Project 編譯,會生成Room相關程式碼,如果是kapt的話,生成程式碼目錄應該是{專案目錄}/app/build/generated/source/kapt/debug/{包路徑}/下。
我們可以看到生成了AppDatabase_Impl和BookDao_Impl兩個程式碼檔案,分別對應前面貼出來的AppDatabase的實現類和BookDao的實現類。

image
AppDatabase_Impl則是表的建立、刪除相關程式碼,Dao則是具體表的CRUD操作。這裡我們重點關係生成的查詢方法。
BookDao_Impl#
@Override public LiveData<Book> queryById(final long id) { final String _sql = "select * from book where id = ?"; final RoomSQLiteQuery _statement = RoomSQLiteQuery.acquire(_sql, 1); int _argIndex = 1; _statement.bindLong(_argIndex, id); return __db.getInvalidationTracker().createLiveData(new String[]{"book"}, new Callable<Book>() { @Override public Book call() throws Exception { final Cursor _cursor = DBUtil.query(__db, _statement, false); try { final int _cursorIndexOfId = CursorUtil.getColumnIndexOrThrow(_cursor, "id"); final int _cursorIndexOfName = CursorUtil.getColumnIndexOrThrow(_cursor, "name"); final int _cursorIndexOfAuthor = CursorUtil.getColumnIndexOrThrow(_cursor, "author"); final int _cursorIndexOfPrice = CursorUtil.getColumnIndexOrThrow(_cursor, "price"); final Book _result; if (_cursor.moveToFirst()) { final long _tmpId; _tmpId = _cursor.getLong(_cursorIndexOfId); final String _tmpName; _tmpName = _cursor.getString(_cursorIndexOfName); final String _tmpAuthor; _tmpAuthor = _cursor.getString(_cursorIndexOfAuthor); final float _tmpPrice; _tmpPrice = _cursor.getFloat(_cursorIndexOfPrice); _result = new Book(_tmpId, _tmpName, _tmpAuthor, _tmpPrice); } else { _result = null; } return _result; } finally { _cursor.close(); } } @Override protected void finalize() { _statement.release(); } }); }
注意這一行
return __db.getInvalidationTracker().createLiveData(...);
我們跟進去,最終建立的是一個RoomTrackingLiveData,是一個繼承了LiveData的類。下面是它的構造方法。從構造方法來看,比較可疑的物件的是InvalidationTracker.Observer這個類,並且實現十有八九是觀察者模式。而最後的回撥也多半是onInvalidated方法。
@SuppressLint("RestrictedApi") RoomTrackingLiveData( RoomDatabase database, InvalidationLiveDataContainer container, Callable<T> computeFunction, String[] tableNames) { mDatabase = database; mComputeFunction = computeFunction; mContainer = container; mObserver = new InvalidationTracker.Observer(tableNames) { @Override public void onInvalidated(@NonNull Set<String> tables) { ArchTaskExecutor.getInstance().executeOnMainThread(mInvalidationRunnable); } }; }
而在RoomTrackingLiveData中,重寫了onActive方法。其中mContainer是InvalidationLiveDataContainer,文件上有寫僅僅是維護LiveData的強引用,防止正在使用的LiveData被回收,跟本文目標沒關係,可忽略。而後面的就有意思了,通過Excutor執行了一個任務,所以,我們來看一下這個任務把。
@Override protected void onActive() { super.onActive(); mContainer.onActive(this); mDatabase.getQueryExecutor().execute(mRefreshRunnable); }
mRefreshRunnable#run()
// mRegisteredObserver是否註冊的標誌 if (mRegisteredObserver.compareAndSet(false, true)) { mDatabase.getInvalidationTracker().addWeakObserver(mObserver); } boolean computed; do { computed = false; if (mComputing.compareAndSet(false, true)) { try { T value = null; while (mInvalid.compareAndSet(true, false)) { computed = true; try { // Dao實現類中返回LiveData時傳入的一個引數,用於查詢,並將資料組裝成一個實體類 value = mComputeFunction.call(); } catch (Exception e) { throw new RuntimeException("Exception while computing database" + " live data.", e); } } if (computed) { postValue(value); } } finally { mComputing.set(false); } } } while (computed && mInvalid.get());
這段程式碼後段通過CAS去完成一次資料庫的查詢,組裝成實體類並postValue,即更新LiveData。
注意到這個程式碼前段呼叫了InvalidationTracker的addWeakObserver,這個方法就應該就是訂閱了。
InvalidationTracker#addWeakObserver
public void addWeakObserver(Observer observer) { addObserver(new WeakObserver(this, observer)); }
InvalidationTracker#addObserver
public void addObserver(@NonNull Observer observer) { final String[] tableNames = resolveViews(observer.mTables); int[] tableIds = new int[tableNames.length]; final int size = tableNames.length; for (int i = 0; i < size; i++) { Integer tableId = mTableIdLookup.get(tableNames[i].toLowerCase(Locale.US)); if (tableId == null) { throw new IllegalArgumentException("There is no table with name " + tableNames[i]); } tableIds[i] = tableId; } ObserverWrapper wrapper = new ObserverWrapper(observer, tableIds, tableNames); ObserverWrapper currentObserver; synchronized (mObserverMap) { currentObserver = mObserverMap.putIfAbsent(observer, wrapper); } if (currentObserver == null && mObservedTableTracker.onAdded(tableIds)) { syncTriggers(); } }
InvalidationTracker$WeakObserver
static class WeakObserver extends Observer { final InvalidationTracker mTracker; final WeakReference<Observer> mDelegateRef; WeakObserver(InvalidationTracker tracker, Observer delegate) { super(delegate.mTables); mTracker = tracker; mDelegateRef = new WeakReference<>(delegate); } @Override public void onInvalidated(@NonNull Set<String> tables) { final Observer observer = mDelegateRef.get(); if (observer == null) { mTracker.removeObserver(this); } else { observer.onInvalidated(tables); } } }
可以看到,WeakObserver就是對Observer一個弱引用的包裝。而在addObserver中,根據observer中tableNames,對更新了InvalidationTracker的訂閱記錄。新增成功後,最後會呼叫onAdded。
boolean onAdded(int... tableIds) { boolean needTriggerSync = false; synchronized (this) { for (int tableId : tableIds) { final long prevObserverCount = mTableObservers[tableId]; mTableObservers[tableId] = prevObserverCount + 1; if (prevObserverCount == 0) { mNeedsSync = true; needTriggerSync = true; } } } return needTriggerSync; }
這裡mTableObservers是對每個table的observer進行計數。為什麼要計數呢?我們接著看。在發現了訂閱數從0->1的table時,這個方法會返回true,如果它返回true,會執行syncTriggers()方法,經過呼叫會執行這一段程式碼:
final int[] tablesToSync = mObservedTableTracker.getTablesToSync(); if (tablesToSync == null) { return; } final int limit = tablesToSync.length; try { database.beginTransaction(); for (int tableId = 0; tableId < limit; tableId++) { switch (tablesToSync[tableId]) { case ObservedTableTracker.ADD: startTrackingTable(database, tableId); break; case ObservedTableTracker.REMOVE: stopTrackingTable(database, tableId); break; } } database.setTransactionSuccessful(); } finally { database.endTransaction(); }
InvalidationTracker#getTablesToSync()
int[] getTablesToSync() { synchronized (this) { if (!mNeedsSync || mPendingSync) { return null; } final int tableCount = mTableObservers.length; for (int i = 0; i < tableCount; i++) { final boolean newState = mTableObservers[i] > 0; if (newState != mTriggerStates[i]) { mTriggerStateChanges[i] = newState ? ADD : REMOVE; } else { mTriggerStateChanges[i] = NO_OP; } mTriggerStates[i] = newState; } mPendingSync = true; mNeedsSync = false; return mTriggerStateChanges; } }
這個getTablesToSync方法很短,但這裡就體現了observer計數的作用,它遍歷這個表,找出計數與之前不一樣的,如果由一個大於0的數變為->0,表明現在沒有observer訂閱它,返回REMOVE,0->n,返回ADD,否則NO_OP。對於返回ADD的表,就應該是會監聽變化的表了。它會執行startTrackingTable方法。
private void startTrackingTable(SupportSQLiteDatabase writableDb, int tableId) { final String tableName = mTableNames[tableId]; StringBuilder stringBuilder = new StringBuilder(); for (String trigger : TRIGGERS) { stringBuilder.setLength(0); stringBuilder.append("CREATE TEMP TRIGGER IF NOT EXISTS "); appendTriggerName(stringBuilder, tableName, trigger); stringBuilder.append(" AFTER ") .append(trigger) .append(" ON `") .append(tableName) .append("` BEGIN INSERT OR REPLACE INTO ") .append(UPDATE_TABLE_NAME) .append(" VALUES(null, ") .append(tableId) .append("); END"); writableDb.execSQL(stringBuilder.toString()); } }
到這裡我們就很清楚了: 實現監聽修改的方法是觸發器。 (不過我之前僅僅是聽說過觸發器,很少用過,如果不瞭解,這裡有一份 簡易的教程 )。而觸發器關心的操作是這一些:
private static final String[] TRIGGERS = new String[]{"UPDATE", "DELETE", "INSERT"};
對應著更新、刪除、插入。當有這些操作時,根據上述觸發器語句,會更新一個由InvalidationTracker維護的表"UPDATE_TABLE_NAME"。
InvalidationTracker#UPDATE_TABLE_NAME
private static final String UPDATE_TABLE_NAME = "room_table_modification_log";
InvalidationTracker#internalInit
void internalInit(SupportSQLiteDatabase database) { synchronized (this) { if (mInitialized) { Log.e(Room.LOG_TAG, "Invalidation tracker is initialized twice :/."); return; } database.beginTransaction(); try { database.execSQL("PRAGMA temp_store = MEMORY;"); database.execSQL("PRAGMA recursive_triggers='ON';"); database.execSQL(CREATE_TRACKING_TABLE_SQL); database.setTransactionSuccessful(); } finally { database.endTransaction(); } syncTriggers(database); mCleanupStatement = database.compileStatement(RESET_UPDATED_TABLES_SQL); mInitialized = true; } }
注意到表中有這樣一列:
INVALIDATED_COLUMN_NAME + " INTEGER NOT NULL DEFAULT 0
在觸發器設定的是更新操作時會被設定為1。所以,應該就是檢驗這個值來判斷表是否有更新。那麼是哪裡進行判斷呢?我們可以從一個更新操作開始找,例如BookDao_Impl#insert()
@Override public long insert(final Book book) { __db.beginTransaction(); try { long _result = __insertionAdapterOfBook.insertAndReturnId(book); __db.setTransactionSuccessful(); return _result; } finally { __db.endTransaction(); } }
最後發現在endTransaction中呼叫了InvalidationTracker的refreshVersionsAsync方法。而在這個方法中,最終會執行InvalidationTracker的mRefreshRunnable物件的run方法。(注意,和上文的mRefreshRunnbale屬於不同類,不是同一個物件。)
RoomDatabase#endTransaction()
public void endTransaction() { mOpenHelper.getWritableDatabase().endTransaction(); if (!inTransaction()) { // enqueue refresh only if we are NOT in a transaction. Otherwise, wait for the last // endTransaction call to do it. mInvalidationTracker.refreshVersionsAsync(); } }
InvalidationTracker#mRefreshRunnable#run()
inal Lock closeLock = mDatabase.getCloseLock(); boolean hasUpdatedTable = false; try { ... 省略 if (mDatabase.mWriteAheadLoggingEnabled) { // This transaction has to be on the underlying DB rather than the RoomDatabase // in order to avoid a recursive loop after endTransaction. SupportSQLiteDatabase db = mDatabase.getOpenHelper().getWritableDatabase(); db.beginTransaction(); try { hasUpdatedTable = checkUpdatedTable(); db.setTransactionSuccessful(); } finally { db.endTransaction(); } } else { hasUpdatedTable = checkUpdatedTable(); } } catch (IllegalStateException | SQLiteException exception) { // may happen if db is closed. just log. Log.e(Room.LOG_TAG, "Cannot run invalidation tracker. Is the db closed?", exception); } finally { closeLock.unlock(); } if (hasUpdatedTable) { // 分發給Observer,最終會更新LiveData synchronized (mObserverMap) { for (Map.Entry<Observer, ObserverWrapper> entry : mObserverMap) { entry.getValue().notifyByTableVersions(mTableInvalidStatus); } } // Reset invalidated status flags. mTableInvalidStatus.clear(); }
注意,hasUpdatedTable = checkUpdatedTable();
private boolean checkUpdatedTable() { boolean hasUpdatedTable = false; Cursor cursor = mDatabase.query(new SimpleSQLiteQuery(SELECT_UPDATED_TABLES_SQL)); //noinspection TryFinallyCanBeTryWithResources try { while (cursor.moveToNext()) { final int tableId = cursor.getInt(0); mTableInvalidStatus.set(tableId); hasUpdatedTable = true; } } finally { cursor.close(); } if (hasUpdatedTable) { mCleanupStatement.executeUpdateDelete(); } return hasUpdatedTable; }
@VisibleForTesting static final String SELECT_UPDATED_TABLES_SQL = "SELECT * FROM " + UPDATE_TABLE_NAME + " WHERE " + INVALIDATED_COLUMN_NAME + " = 1;";
果然,是查詢"UPDATE_TABLE_NAME"這個表中"INVALIDATED_COLUMN_NAME"這列為1的記錄,然後設定自己的狀態。完成這個過程就分發給自己的Observers。
void notifyByTableVersions(BitSet tableInvalidStatus) { ... if (invalidatedTables != null) { mObserver.onInvalidated(invalidatedTables); } }
而在前文中有說到,註冊的Observer實際上是RoomTrackingLiveData的mObserver的包裝,最終會呼叫到它的onInvalidated。
mObserver = new InvalidationTracker.Observer(tableNames) { @Override public void onInvalidated(@NonNull Set<String> tables) { ArchTaskExecutor.getInstance().executeOnMainThread(mInvalidationRunnable); } }
final Runnable mInvalidationRunnable = new Runnable() { @MainThread @Override public void run() { boolean isActive = hasActiveObservers(); if (mInvalid.compareAndSet(false, true)) { if (isActive) { mDatabase.getQueryExecutor().execute(mRefreshRunnable); } } } };
可見,最後會線上程池中執行RoomTrackingLiveData的mRefreshRunnable任務。這個任務前文已經分析過了,通過CAS的方式查詢資料,並post給LiveData,這樣就實現了資料更新的通知。到這裡,Room和LiveData聯動的工作原理就大致分析完畢。