設計模式 | 中介者模式及典型應用
本文的主要內容:
- 介紹中介者模式
- 資料同步示例
- 中介者模式總結
- 原始碼分析中介者模式的典型應用
- Java Timer 中的中介者模式
更多內容可訪問我的個人部落格:laijianfeng.org
關注【小旋鋒】微信公眾號,及時接收博文推送

中介者模式
世界上存在著各種各樣的資料庫,不同資料庫有各自的應用場景,對於同一份資料,最開始可能使用關係型資料庫(如SQL/">MySQL)進行儲存查詢,使用Redis作為快取資料庫,當資料量較大時使用MySQL進行查詢可能較慢,所以需要將資料同步到Elasticsearch或者列式資料庫如Hbase中進行大資料查詢。
如何設計資料同步方案是一個重要的問題。資料來源眾多,目標端也眾多,設計得不好可能 "牽一髮而動全身"。
如果我們這樣設計:每個資料來源直接同步資料到目標端資料庫的,如果資料庫有 N 個,那麼最多可能的同步作業將達到 N * N
個,當修改了其中一個數據庫的某些配置,可能需要修改另外的 N - 1
個數據庫的同步作業。
現在介紹另一種方案,DataX 是阿里巴巴集團內被廣泛使用的離線資料同步工具/平臺,實現包括 MySQL、Oracle、SqlServer、Postgre、HDFS、Hive、ADS、HBase、TableStore(OTS)、MaxCompute(ODPS)、DRDS 等各種異構資料來源之間高效的資料同步功能。

DataX 其實相當於一箇中介,從資料來源讀取資料,寫入到目標端,資料來源不再需要維護到目標端的同步作業,只需要與 DataX 通訊即可。DataX 體現了中介者模式的思想。
中介者模式(Mediator Pattern):用一箇中介物件(中介者)來封裝一系列的物件互動,中介者使各物件不需要顯式地相互引用,從而使其耦合鬆散,而且可以獨立地改變它們之間的互動。中介者模式又稱為調停者模式,它是一種物件行為型模式。
角色
Mediator(抽象中介者):它定義一個介面,該介面用於與各同事物件之間進行通訊。
ConcreteMediator(具體中介者):它是抽象中介者的子類,通過協調各個同事物件來實現協作行為,它維持了對各個同事物件的引用。
Colleague(抽象同事類):它定義各個同事類公有的方法,並聲明瞭一些抽象方法來供子類實現,同時它維持了一個對抽象中介者類的引用,其子類可以通過該引用來與中介者通訊。
ConcreteColleague(具體同事類):它是抽象同事類的子類;每一個同事物件在需要和其他同事物件通訊時,先與中介者通訊,通過中介者來間接完成與其他同事類的通訊;在具體同事類中實現了在抽象同事類中宣告的抽象方法。
中介者模式的核心在於中介者類的引入,在中介者模式中, 中介者類承擔了兩方面的職責 :
- 中轉作用(結構性) :通過中介者提供的中轉作用,各個同事物件就不再需要顯式引用其他同事,當需要和其他同事進行通訊時,可通過中介者來實現間接呼叫。該中轉作用屬於中介者在結構上的支援。
- 協調作用(行為性) :中介者可以更進一步的對同事之間的關係進行封裝,同事可以一致的和中介者進行互動,而不需要指明中介者需要具體怎麼做,中介者根據封裝在自身內部的協調邏輯,對同事的請求進行進一步處理,將同事成員之間的關係行為進行分離和封裝。
示例
我們來實現一個簡化版的資料同步方案,有三種資料庫 Mysql、Redis、Elasticsearch,其中的 Mysql 作為主資料庫,當增加一條資料時 需要 同步到另外兩個資料庫中;Redis 作為快取資料庫,當增加一條資料時 不需要 同步到另外另個數據庫;而 Elasticsearch 作為大資料查詢資料庫,有一個統計功能,當增加一條資料時 只需要 同步到 Mysql,所以它們之間的關係圖如下所示。

首先我們來實現第一種 不使用 中介者模式的資料同步方案,各資料來源維護各自的同步作業。
抽象資料庫
public abstract class AbstractDatabase { public abstract void add(String data); public abstract void addData(String data); } 複製程式碼
具體資料庫 Mysql,維護同步到 Redis和Elasticsearch 的同步作業
public class MysqlDatabase extends AbstractDatabase { private List<String> dataset = new ArrayList<String>(); @Setter private RedisDatabase redisDatabase; @Setter private EsDatabase esDatabase; @Override public void addData(String data) { System.out.println("Mysql 新增資料:" + data); this.dataset.add(data); } @Override public void add(String data) { addData(data); this.redisDatabase.addData(data);// 維護同步到Redis的同步作業 this.esDatabase.addData(data);// 維護同步到Elasticsearch的同步作業 } public void select() { System.out.println("- Mysql 查詢,資料:" + this.dataset.toString()); } } 複製程式碼
具體資料庫 Redis,不需要同步到其它資料庫
public class RedisDatabase extends AbstractDatabase { private List<String> dataset = new LinkedList<String>(); @Override public void addData(String data) { System.out.println("Redis 新增資料:" + data); this.dataset.add(data); } @Override public void add(String data) { addData(data); // 不同步到其它資料庫 } public void cache() { System.out.println("- Redis 快取的資料:" + this.dataset.toString()); } } 複製程式碼
Elasticsearch ,只需要同步到Mysql
public class EsDatabase extends AbstractDatabase { private List<String> dataset = new CopyOnWriteArrayList<String>(); @Setter private MysqlDatabase mysqlDatabase; @Override public void addData(String data) { System.out.println("ES 新增資料:" + data); this.dataset.add(data); } @Override public void add(String data) { addData(data); this.mysqlDatabase.addData(data);// 維護同步到MySQL的同步作業 } public void count() { int count = this.dataset.size(); System.out.println("- Elasticsearch 統計,目前有 " + count + " 條資料,資料:" + this.dataset.toString()); } } 複製程式碼
測試客戶端,分別往三個資料庫中加入一些資料檢視同步效果
public class Client { public static void main(String[] args) { MysqlDatabase mysqlDatabase = new MysqlDatabase(); RedisDatabase redisDatabase = new RedisDatabase(); EsDatabase esDatabase = new EsDatabase(); mysqlDatabase.setRedisDatabase(redisDatabase); mysqlDatabase.setEsDatabase(esDatabase); esDatabase.setMysqlDatabase(mysqlDatabase); System.out.println("\n---------mysql 新增資料 1,將同步到Redis和ES中-----------"); mysqlDatabase.add("1"); mysqlDatabase.select(); redisDatabase.cache(); esDatabase.count(); System.out.println("\n---------Redis新增資料 2,將不同步到其它資料庫-----------"); redisDatabase.add("2"); mysqlDatabase.select(); redisDatabase.cache(); esDatabase.count(); System.out.println("\n---------ES 新增資料 3,只同步到 Mysql-----------"); esDatabase.add("3"); mysqlDatabase.select(); redisDatabase.cache(); esDatabase.count(); } } 複製程式碼
輸出結果
---------mysql 新增資料 1,將同步到Redis和ES中----------- Mysql 新增資料:1 Redis 新增資料:1 ES 新增資料:1 - Mysql 查詢,資料:[1] - Redis 快取的資料:[1] - Elasticsearch 統計,目前有 1 條資料,資料:[1] ---------Redis新增資料 2,將不同步到其它資料庫----------- Redis 新增資料:2 - Mysql 查詢,資料:[1] - Redis 快取的資料:[1, 2] - Elasticsearch 統計,目前有 1 條資料,資料:[1] ---------ES 新增資料 3,只同步到 Mysql----------- ES 新增資料:3 Mysql 新增資料:3 - Mysql 查詢,資料:[1, 3] - Redis 快取的資料:[1, 2] - Elasticsearch 統計,目前有 2 條資料,資料:[1, 3] 複製程式碼
其實這樣已經實現了我們的需求,但是 存在一些問題 :
- 系統結構複雜且耦合度高 。資料來源需要維護目標端資料庫的引用,以便完成資料同步
- 元件的可重用性差 。由於每一個數據源和目標端之間具有很強的關聯,若沒有目標端的支援,這個元件很難被另一個系統或模組重用
- 系統的可擴充套件性差 :如果需要增加、修改或刪除其中一個數據庫、將導致多個類的原始碼需要修改,這違反了 "開閉原則",可擴充套件性和靈活性欠佳。
我們 使用中介者模式來重構 ,將資料同步的功能遷移到中介者中,由中介者來管理資料同步作業
首先還是抽象資料庫類(抽象同事類),維護了一箇中介者
public abstract class AbstractDatabase { public static final String MYSQL = "mysql"; public static final String REDIS = "redis"; public static final String ELASTICSEARCH = "elasticsearch"; protected AbstractMediator mediator;// 中介者 public AbstractDatabase(AbstractMediator mediator) { this.mediator = mediator; } public abstract void addData(String data); public abstract void add(String data); } 複製程式碼
Mysql 資料庫(具體同事類)
public class MysqlDatabase extends AbstractDatabase { private List<String> dataset = new ArrayList<String>(); public MysqlDatabase(AbstractMediator mediator) { super(mediator); } @Override public void addData(String data) { System.out.println("Mysql 新增資料:" + data); this.dataset.add(data); } @Override public void add(String data) { addData(data); this.mediator.sync(AbstractDatabase.MYSQL, data); // 資料同步作業交給中介者管理 } public void select() { System.out.println("Mysql 查詢,資料:" + this.dataset.toString()); } } 複製程式碼
Redis 資料庫(具體同事類)
public class RedisDatabase extends AbstractDatabase { private List<String> dataset = new LinkedList<String>(); public RedisDatabase(AbstractMediator mediator) { super(mediator); } @Override public void addData(String data) { System.out.println("Redis 新增資料:" + data); this.dataset.add(data); } @Override public void add(String data) { addData(data); this.mediator.sync(AbstractDatabase.REDIS, data);// 資料同步作業交給中介者管理 } public void cache() { System.out.println("Redis 快取的資料:" + this.dataset.toString()); } } 複製程式碼
Elasticsearch(具體同事類)
public class EsDatabase extends AbstractDatabase { private List<String> dataset = new CopyOnWriteArrayList<String>(); public EsDatabase(AbstractMediator mediator) { super(mediator); } @Override public void addData(String data) { System.out.println("ES 新增資料:" + data); this.dataset.add(data); } @Override public void add(String data) { addData(data); this.mediator.sync(AbstractDatabase.ELASTICSEARCH, data);// 資料同步作業交給中介者管理 } public void count() { int count = this.dataset.size(); System.out.println("Elasticsearch 統計,目前有 " + count + " 條資料,資料:" + this.dataset.toString()); } } 複製程式碼
抽象中介者
@Data public abstract class AbstractMediator { protected MysqlDatabase mysqlDatabase; protected RedisDatabase redisDatabase; protected EsDatabase esDatabase; public abstract void sync(String databaseName, String data); } 複製程式碼
具體中介者
public class SyncMediator extends AbstractMediator { @Override public void sync(String databaseName, String data) { if (AbstractDatabase.MYSQL.equals(databaseName)) { // mysql 同步到 redis 和 Elasticsearch this.redisDatabase.addData(data); this.esDatabase.addData(data); } else if (AbstractDatabase.REDIS.equals(databaseName)) { // redis 快取同步,不需要同步到其他資料庫 } else if (AbstractDatabase.ELASTICSEARCH.equals(databaseName)) { // Elasticsearch 同步到 Mysql this.mysqlDatabase.addData(data); } } } 複製程式碼
測試客戶端
public class Client { public static void main(String[] args) { AbstractMediator syncMediator = new SyncMediator(); MysqlDatabase mysqlDatabase = new MysqlDatabase(syncMediator); RedisDatabase redisDatabase = new RedisDatabase(syncMediator); EsDatabase esDatabase = new EsDatabase(syncMediator); syncMediator.setMysqlDatabase(mysqlDatabase); syncMediator.setRedisDatabase(redisDatabase); syncMediator.setEsDatabase(esDatabase); System.out.println("\n---------mysql 新增資料 1,將同步到Redis和ES中-----------"); mysqlDatabase.add("1"); mysqlDatabase.select(); redisDatabase.cache(); esDatabase.count(); System.out.println("\n---------Redis新增資料 2,將不同步到其它資料庫-----------"); redisDatabase.add("2"); mysqlDatabase.select(); redisDatabase.cache(); esDatabase.count(); System.out.println("\n---------ES 新增資料 3,只同步到 Mysql-----------"); esDatabase.add("3"); mysqlDatabase.select(); redisDatabase.cache(); esDatabase.count(); } } 複製程式碼
輸出結果,與預期一致
---------mysql 新增資料 1,將同步到Redis和ES中----------- Mysql 新增資料:1 Redis 新增資料:1 ES 新增資料:1 - Mysql 查詢,資料:[1] - Redis 快取的資料:[1] - Elasticsearch 統計,目前有 1 條資料,資料:[1] ---------Redis新增資料 2,將不同步到其它資料庫----------- Redis 新增資料:2 - Mysql 查詢,資料:[1] - Redis 快取的資料:[1, 2] - Elasticsearch 統計,目前有 1 條資料,資料:[1] ---------ES 新增資料 3,只同步到 Mysql----------- ES 新增資料:3 Mysql 新增資料:3 - Mysql 查詢,資料:[1, 3] - Redis 快取的資料:[1, 2] - Elasticsearch 統計,目前有 2 條資料,資料:[1, 3] 複製程式碼
畫出類圖如下

中介者模式總結
中介者模式的主要優點
-
中介者模式 簡化了物件之間的互動 ,它用中介者和同事的一對多互動代替了原來同事之間的多對多互動,一對多關係更容易理解、維護和擴充套件,將原本難以理解的網狀結構轉換成相對簡單的星型結構。
-
中介者模式可 將各同事物件解耦 。中介者有利於各同事之間的鬆耦合,我們可以獨立的改變和複用每一個同事和中介者,增加新的中介者和新的同事類都比較方便,更好地符合 "開閉原則"。
-
可以 減少子類生成 ,中介者將原本分佈於多個物件間的行為集中在一起,改變這些行為只需生成新的中介者子類即可,這使各個同事類可被重用,無須對同事類進行擴充套件。
中介者模式的主要缺點
- 在 具體中介者類中包含了大量同事之間的互動細節 ,可能會導致具體中介者類非常複雜,使得系統難以維護。(也就是把具體同事類之間的互動複雜性集中到了中介者類中,結果中介者成了最複雜的類)
適用場景
-
系統中物件之間存在複雜的引用關係,系統結構混亂且難以理解。
-
一個物件由於引用了其他很多物件並且直接和這些物件通訊,導致難以複用該物件。
-
想通過一箇中間類來封裝多個類中的行為,而又不想生成太多的子類。可以通過引入中介者類來實現,在中介者中定義物件互動的公共行為,如果需要改變行為則可以增加新的具體中介者類。
中介者模式的典型應用
Java Timer 中的中介者模式
敲一個 java.util.Timer
的Demo
兩個任務類
public class MyOneTask extends TimerTask { private static int num = 0; @Override public void run() { System.out.println("I'm MyOneTask " + ++num); } } public class MyTwoTask extends TimerTask { private static int num = 1000; @Override public void run() { System.out.println("I'm MyTwoTask " + num--); } } 複製程式碼
客戶端測試,3秒後開始執行,迴圈週期為 1秒
public class TimerTest { public static void main(String[] args) { // 注意:多執行緒並行處理定時任務時,Timer執行多個TimeTask時,只要其中之一沒有捕獲丟擲的異常, // 其它任務便會自動終止執行,使用ScheduledExecutorService則沒有這個問題 Timer timer = new Timer(); timer.schedule(new MyOneTask(), 3000, 1000); // 3秒後開始執行,迴圈週期為 1秒 timer.schedule(new MyTwoTask(), 3000, 1000); } } 複製程式碼
輸出
I'm MyOneTask 1 I'm MyTwoTask 1000 I'm MyTwoTask 999 I'm MyOneTask 2 I'm MyOneTask 3 I'm MyTwoTask 998 I'm MyTwoTask 997 I'm MyOneTask 4 I'm MyOneTask 5 I'm MyTwoTask 996 I'm MyTwoTask 995 I'm MyOneTask 6 ... 複製程式碼
Timer
的部分關鍵原始碼如下
public class Timer { private final TaskQueue queue = new TaskQueue(); private final TimerThread thread = new TimerThread(queue); public void schedule(TimerTask task, long delay) { if (delay < 0) throw new IllegalArgumentException("Negative delay."); sched(task, System.currentTimeMillis()+delay, 0); } public void schedule(TimerTask task, Date time) { sched(task, time.getTime(), 0); } private void sched(TimerTask task, long time, long period) { if (time < 0) throw new IllegalArgumentException("Illegal execution time."); if (Math.abs(period) > (Long.MAX_VALUE >> 1)) period >>= 1; // 獲取任務佇列的鎖(同一個執行緒多次獲取這個鎖並不會被阻塞,不同執行緒獲取時才可能被阻塞) synchronized(queue) { // 如果定時排程執行緒已經終止了,則丟擲異常結束 if (!thread.newTasksMayBeScheduled) throw new IllegalStateException("Timer already cancelled."); // 再獲取定時任務物件的鎖(為什麼還要再加這個鎖呢?想不清) synchronized(task.lock) { // 判斷執行緒的狀態,防止多執行緒同時排程到一個任務時多次被加入任務佇列 if (task.state != TimerTask.VIRGIN) throw new IllegalStateException( "Task already scheduled or cancelled"); // 初始化定時任務的下次執行時間 task.nextExecutionTime = time; // 重複執行的間隔時間 task.period = period; // 將定時任務的狀態由TimerTask.VIRGIN(一個定時任務的初始化狀態)設定為TimerTask.SCHEDULED task.state = TimerTask.SCHEDULED; } // 將任務加入任務佇列 queue.add(task); // 如果當前加入的任務是需要第一個被執行的(也就是他的下一次執行時間離現在最近) // 則喚醒等待queue的執行緒(對應到上面提到的queue.wait()) if (queue.getMin() == task) queue.notify(); } } // cancel會等到所有定時任務執行完後立刻終止定時執行緒 public void cancel() { synchronized(queue) { thread.newTasksMayBeScheduled = false; queue.clear(); queue.notify();// In case queue was already empty. } } // ... } 複製程式碼
Timer
中在 schedulexxx
方法中通過 TaskQueue
協調各種 TimerTask
定時任務, Timer
是中介者, TimerTask
是抽象同事類,而我們自己寫的任務則是具體同事類
TimerThread
是 Timer
中定時排程執行緒類的定義,這個類會做為一個執行緒一直執行來執行 Timer
中任務佇列中的任務。
Timer
這個中介者的功能就是 定時排程我們寫的各種任務 ,將任務新增到 TaskQueue
任務佇列中,給 TimerThread
執行,讓任務與執行執行緒解耦