1. 程式人生 > >ElasticSearch + Canal 開發千萬級的實時搜尋系統【轉】

ElasticSearch + Canal 開發千萬級的實時搜尋系統【轉】

公司是做社交相關產品的,社交類產品對搜尋功能需求要求就比較高,需要根據使用者城市、使用者ID暱稱等進行搜尋。

專案原先的搜尋介面採用SQL查詢的方式實現,資料庫表採用了按城市分表的方式。但隨著業務的發展,搜尋介面呼叫頻次越來越高,搜尋介面壓力越來越大,搜尋資料庫經常崩潰,從而導致搜尋功能經常不能使用。

從上面的系統架構圖可以看出,當用戶修改資料時,介面會修改使用者庫資訊,接著觸發器會將改變的使用者資訊寫入臨時表。定時指令碼每隔1分鐘掃描一次臨時表,將變更的資料寫入到搜尋庫中。當用戶再次請求搜尋介面時,就可以搜尋到最新的資料。

從技術層面分析,原搜尋系統的設計有以下缺點:

  • 搜尋資訊不實時。
    當用戶修改資訊時,需要等待1分鐘的時間才能將最新的使用者資訊同步到搜尋資料庫中。
  • ID、暱稱搜尋速度慢。按照地區分表的資料庫設計是為了減輕資料庫壓力,保證大部分按照地區搜尋的請求能正常響應。但是如果使用者按照ID或暱稱搜尋,那麼我們就需要對成千上萬個地區表全都搜尋一次,這時間複雜度可想而知。很多時候按照暱稱和ID搜尋速度太慢,需要10多秒才能響應。
  • 系統穩定性、拓展性以及處理能力差。這可以歸結為技術老舊,無法滿足業務需求。隨著搜尋量的提升,對資料庫的壓力將會越來越大,而MySQL資料庫天然不適合用來應對海量的請求。現在已經有更加成熟的ElasticSearch可以用來做搜尋方面的業務。
  • 觸發器不便於管理。觸發器這種東西不好維護,並且擴充套件性很差,一旦修改的請求變多,很可能導致整個資料庫崩潰(使用者庫崩潰是很嚴重的)。

我們總結一下新搜尋系統需要解決的幾個問題:

  • 海量請求。幾百萬的請求毫無壓力,上千萬上億也要可以扛得住。
  • 實時搜尋。指的是當一個使用者修改了其資料之後,另一個使用者能實時地搜尋到改使用者。

海量請求。要扛得起海量的搜尋請求,可以使用ElasticSearch來實現,它是在Lucene的基礎上進行封裝的一個開源專案,它將Lucene複雜的原理以及API封裝起來,對外提供了一個易用的API介面。ElasticSearch現在已經廣泛地被許多公司使用,其中包括:愛奇藝、百姓網、58到家等公司。

實時搜尋。阿里有一個開源專案Canal,就是用來解決這個問題的,Canal專案利用了MySQL資料庫主從同步的原理,將Canal Server模擬成一臺需要同步的從庫,從而讓主庫將binlog日誌流傳送到Canal Server介面。Canal專案對binlog日誌的解析進行了封裝,我們可以直接得到解析後的資料,而不需要理會binlog的日誌格式。而且Canal專案整合了zookeeper,整體實現了高可用,可伸縮性強,是一個不錯的解決方案。

經過一段時間的技術預研,我們設計了整個搜尋技術架構:

從架構圖可以看出整個系統分為兩大部分:

  • Canal資料變更服務平臺。這部分負責解析MySQL的binlog日誌,並將其解析後的資料封裝成特定的物件放到Kafka中。
  • Kafka資料消費方。這部分負責消費存放在Kafka中的訊息,當消費方拿到具體的使用者表變更訊息時,將最新的使用者資訊存放到ES資料倉庫中。

Canal技術變更基礎平臺

因為考慮到未來可能有其他專案需要監控資料庫某些表的變化,因此我們將Canal獲取MySQL資料變更部分做成一個公用的平臺。當有其他業務需要增加監控的表時,我們可以直接修改配置檔案,重啟伺服器即可完成新增,極大地提高了開發效率。

在這一部分中,主要分為兩大部分:Canal Server 和 Canal Client。

Canal Server端。Canal Server偽裝成MySQL的一個從庫,使主庫傳送binlog日誌給 Canal Server,Canal Server 收到binlog訊息之後進行解析,解析完成後將訊息直接傳送給Canal Client。在Canal Server端可以設定配置檔案進行具體scheme(資料庫)和table(資料庫表)的篩選,從而實現動態地增加對資料庫表的監視。

Canal Client端。Canal Client端接收到Canal Server的訊息後直接將訊息存到Kafka指定Partition中,並將最新的binlogid傳送給zookeeper叢集儲存。

Kafka訊息消費端

Canal技術變更平臺在獲取到對應的資料庫變更訊息後會將其放到指定的Kafka分片裡,具體的業務專案需要到指定的Kafka片區裡消費對應的資料變更訊息,之後根據具體的業務需求進行處理。

因為Canal變化是根據表為最小單位進行地,因此我在實現方面定義了一個以表為處理單位的MsgDealer介面:

public interface MsgDealer {
    void deal(CanalMsgVo canalMsgVo);
}

搜尋庫涉及對5個表的監視,因此我實現了5個對應的處理類:

針對不同表的資料變化,自動呼叫不同的實現類進行處理。