1. 程式人生 > >設計資料密集型應用第三部分:派生資料

設計資料密集型應用第三部分:派生資料

  《Designing Data-Intensive Applications》的第一部分,基於單點(single node)介紹了資料系統的基礎理論與知識;在第二部分,則是將視野擴充套件到了分散式資料系統,主要是Partition和Repliacation。在第三部分,則聚焦於派生資料系統。

integrating multiple different data systems, potentially with different data models and optimized for different access patterns, into one coherent application architecture.

  對於目前日益複雜的應用,沒有哪一種單一的資料系統可以滿足應用的所有需求,所以本章就是介紹如何將不同的資料系統整合到單一應用中。資料系統可以分為兩類

  system of record:原始資料,source of truth

  derived data system:派生資料系統,即資料來自其他資料系統。派生資料系統包括但不限於:cache、索引、檢視,本質上派生資料是原始資料的冗餘,為了效能而做的冗餘。

  值得注意的是,原始資料系統與派生資料系統的區別並不在於對應的工具,而在於在應用中的具體使用方式。

The distinction between system of record and derived data system depends not on the tool, but on how you use it in your application.

Batch Processing

  在某個層面,可以資料系統進行以下分類

  • Services (online systems)

    追求response time

  • Batch processing systems (offline systems)

    追求throughput

  • Stream processing systems (near-real-time systems)

    sth between online and batch system

  本章討論的是批處理系統(Batch processing),MapReduce是批處理系統的典型代表,在MapReduce的諸多設計中,都可以看到unix的一些影子。

unix哲學

  • 一個程式只做一件事,做好這件事,如果有新需求,那麼重新寫一個程式,而不是在原來的程式上修修補補
  • 儘量讓每個程式的輸出做其他程式的輸入,即輸出要通用、普適性,不要堅持互動式輸入
  • 設計和構造軟體,即使是作業系統,都要及早嘗試,對於不合理的地方,要毫不猶豫的推到重構
  • 儘早藉助工具來避免重複性的勞動,自動化

automation, rapid prototyping, incremental iteration, being friendly to experimentation, and breaking down large projects into manageable chunks

Separating the input/output wiring from the program logic makes

  unix pipeline的最大缺陷在於這些組合工具只能在單個機器上執行,需要擴充套件到多個節點時,就需要Hadoop這樣的分散式系統

MapReduce

  與unix區別

  • 從單機到多機
  • stdin stdout 到 file

  關於MapReduce的原理與框架,之前在《典型分散式系統分析:MapReduce》一文中描述過。下面關注一些不是在MapReduce論文中出現的一些討論。

Joins and Grouping

  批處理中,經常也需要join操作,通過join操作來補充完整一個事件(event)。在批處理中,既可以在Map的時候join,也可以在Reduce的時候join,如下所示

  

  event log中只記錄uid,而其他屬性需要從user database(profile information)讀取,這樣避免了profile資料的冗餘

  每次通過網路去讀取user profile 顯然是不切實際的,拖慢批處理速度;而且由於profile 是可變的,導致批處理 結果不是確定性的。一個友好的解決辦法是:冗餘一份資料,放到批處理系統中。

  下面是一個reduce_side join的例子。稱之為sort-merge join,因為不管是User event 還是 User profile都按照userID進行hash,因此都一個使用者的event 和 profile會分配到都一個reducer。

  

批處理使用場景

  • 搜尋引擎 search index

  關於增量建立search index,寫入新的segment檔案,後臺批量合併壓縮。

  new segment files and asynchronously merges and compacts segment files in the background.

  • 機器學習與推薦

  一般來說,將資料寫入到一個key value store,然後給使用者查詢

  怎麼講批處理的結果匯入到kvs? 直接匯入是不大可能的。寫入到一個新的db,然後切換。

  build a brand-new database inside the batch job and write it as files to the job’s output directory in the distributed filesystem

Beyond MapReduce

  MapReduce的問題

  (1)比較底層,需要寫大量程式碼:using the raw MapReduce APIs is actually quite hard and laborious

  解決辦法:higher-level programming models (Pig, Hive, Cascading, Crunch) were created as abstractions on top of MapReduce.

  (2) mapreduce execution model的問題,如下

Materialization of Intermediate State

  materiallization(物化)是指:每一個MapReduce的輸出都需要寫入到檔案再給下一個MapReduce Task Job。

  顯然,materiallization是提前計算,而不是按需計算。而Unix pipleline 是通過stream按需計算,只佔用少量記憶體空間。

  MapReduce相比unix pipeline缺陷

  1.   MapReduce job完成之後才能進行下一個,而unix pipeline是同時執行的
  2.   Mapper經常是多餘的:很多時候僅僅是出去上一個reducer的輸出
  3.   中間狀態的儲存也是要冗餘的,有點浪費

  dataflow engines如Spark、Tez、Flink試圖解決Mapreduce問題的

they handle an entire workflow as one job, rather than breaking it up into independent subjobs. 

  dataflow engines 沒有明顯的map reduce , 而是一個接一個的operator。其優勢:

  • 避免了無謂的sort(mr 在map和reduce之間總是要sort)
  • 較少非必需的map task
  • 由於知道整個流程,可以實現locality optimizations
  • 中間狀態寫入記憶體或者本地檔案,而不是HDFS
  • operator流水線工作,不同等到上一個stage完全結束
  • 在執行新的operator時,可以複用JVM

Stream Processing

  批處理與流處理的最大區別在於,批處理的輸入是確定的、有限的,而流處理的輸入是源源不斷的,因此流處理系統一般比批處理系統有更好的實時性。

  流處理相關術語

  event:In a stream processing context, a record is more commonly known as an event

  producer、publisher、sender

  consumer、subscriber、recipient

  topic、stream,一組相關event

messaging system

  用於事件發生時,通知消費者,對於某個topic 一般是多生產者 多消費者。

  如何對訊息系統分類:

  (1)What happens if the producers send messages faster than the consumers can process them?

  第一個問題,生產速度大於消費速度,對應的處理方式包括:丟包、快取、流控(限制寫入速度)

  (2)What happens if nodes crash or temporarily go offline—are any messages lost?

  第二個問題,當節點crash或者臨時故障,訊息會不會丟

event如何從producer到達consumer

(1)直達訊息系統(沒有中間商)

  即一個event直接從producer到達consumer,如UDP廣播,brokerless : zeroMQ,這樣的系統有訊息丟失的風險。

(2)message broker(message queue)

  定製化的DB

  非同步過程

  保證訊息可靠性

multi consumer

  shared subscriptions,一條訊息任意一個consumer處理即可;負載均衡;可擴充套件性

  topic subscriptions 一條訊息需要被不同的comsumer消費

  

  上圖(a)中的event只需要被任意一個consumer消費即可,而(b)中的每一個event則需要被所有關注該topic的consumer處理

Acknowledgments and redelivery

  需要consumer的ack來保證訊息已被消費,訊息可能會被重複投遞,因此需要冪等性

  當 load balancing遇上redeliver,可能會出現messgae 亂序

logbased message brokers

  一般的訊息佇列都是一次性消費,基於log的訊息佇列可以重複消費

The log-based approach trivially supports fan-out messaging, because several consumers can independently read the log without affecting each other—reading a message does not delete it from the log

  其優點在於:持久化且immutable的日誌允許comsumer重新處理所有的事件

This aspect makes log-based messaging more like the batch processes of the last chapter, where derived data is clearly separated from input data through a repeatable transformation process. It allows more experimentation and easier recovery from errors and bugs, making it a good tool for integrating dataflows within an organization

Databases and Streams

  在log-based message broker有資料庫的影子,即資料在log中,那麼反過來呢,能否將message的思想應用於db,或者說db中是否本身就有message的思想?

  其實是有的,在primary-secondary 中,primary寫oplog, produce event;secondary讀oplog, consume event。

Keep Systems in Sync

  一份資料以不同的形式儲存多分,db、cache、search index、recommend system、OLAP

  很多都是使用full database dumps(batch process),這個速度太慢,又有滯後; 多寫(dual write)也是不現實的,增加應用層負擔、耦合嚴重。

Change Data Capture

  一般來說,應用(db_client)按照db的約束來使用db,而不是直接讀取、解析replication log。但如果可以直接讀取,則有很多用處,例如用來建立serach index、cache、data warehouse。

  如下圖所示

  

  前面是DB(leader),中間是log-based message broker,後面是derived data system(cache, data warehouse) as followers

  這樣做的潛在問題是,日誌會越來越多,耗光磁碟,直接刪除就的log也是不行的,可以週期性的log compaction:處理對一個key重複的操作,或者說已經被刪除的key。這樣也能解決新增加一個consumber,且consumber需要所有完整資料的情況。

Event Sourcing

event sourcing involves storing all changes to the application state as a log of change events.

  CDC在資料層記錄,增刪改查,一個event可能對應多個data change;mutable

  event sourcing 在應用層記錄,immutable(不應該修改 刪除)

  event soucing 一般只記錄操作,不記錄操作後的結果,因此需要所有資料才能恢復當前的狀態

  週期性的snapshot有助於效能

  Commands and events: 二者並不等價,Command只是意圖(比如想預定座位),只有通過檢查,執行成功,才會生成對應的event,event 代表 fact

State, Streams, and Immutability

  

  上圖非常有意思:state是event stream的累計值,積分的效果,而stream是state的瞬時值,微分的效果

  Advantages of immutable events

  • immutable event log 有利於追溯到任意時間點,也可以更容易從錯誤中恢復
  • immutable event log 比當前狀態有更多的資訊:使用者新增物品到購物籃,然後從購物籃移除;從狀態來看,什麼都沒有發生,但event log卻意義豐富
  • Deriving several views from the same event log  當有event log,很容易回放event,產生新的資料檢視,而不用冒險修改當前使用的資料檢視,做到灰度升級

Processing Streams

  資料流應用廣泛:

  1. 寫到其他資料系統:db、cache等
  2. 推送給使用者,或者實時展示
  3. 產生其他的資料流,形成鏈路

  stream processing 通常用於監控:風控、實時交易系統、機器狀態、軍事系統

  CEP(Complex event processing)是對特定事件的監控,對於stream,設定匹配規則,滿足條件則觸發 complex event

  In these systems, the relationship between queries and data is reversed compared to normal databases.

  DB持久化資料,查詢是臨時的

  而CEP持久化的是查詢語句,資料時源源不斷的

Reasoning About Time

  批處理一般使用event time,而流處理可能採用本地時間(stream processing time),這可能導致不準確的時間視窗(尤其兩個時間差抖動的時候)

  以event time作為時間視窗的問題:不確定是否收到了某個window內所有的event。

  通常,需要結合使用本地時鐘與伺服器時鐘,考慮一個情況,客戶端採集日誌傳送到伺服器,在未聯網的時候本地快取。如果用本地時間,本地時間可能不準,用伺服器時間,不能反映事件發生的時刻(可能過了很長時間才從快取傳送到伺服器),解決辦法:

  1.   用device clock記錄事件發生時間;
  2.   用device clock記錄事件上傳時間;
  3.   用server clock記錄伺服器收到event的時間

  用(3)減去(2)可以得到時間偏差(忽略了網路延時),在用(1)加上這個時間偏差就得到了事件的真正發生時間。

Types of windows

  • 滾動 tumbling window

  正交的時間塊,一個5分鐘,接下來又一個5分鐘

  • 跳動 hopping window

  相交的時間塊,5分鐘,然後前進1分鐘,有一個5分鐘

  • 滑動 Sliding window

  無固定的邊界,一點點向前滑

Stream Joins

  流處理系統中也是需要一些join操作

  • stream-stream joins

  for example.click-through rate 網頁搜尋、點選事件

  • stream-table joins(stream enrichment)

  a set of user activity events and a database of user profiles

  • Time-dependence of joins

  if events on different streams happen around a similar time, in which order are they processed?

  比如跨境交易,匯率是實時變化的,那麼交易事件與事件發生時間的匯率繫結。解決辦法是 交易事件裡面維護當時的匯率id,但這導致沒法做log compaction

Fault Tolerance

  stream processing system的容錯性,batch processing 可以做到 exactly-once semantics,雖然可能會有失敗-重試。對於流處理

  • Microbatching and checkpointing 做到了可重試
  • Idempotence 保證了可重複執行

The Future of Data Systems

Data Integration

  the most appropriate choice of software tool also depends on the circumstances.

  in complex applications, data is often used in several different ways. There is unlikely to be one piece of software that is suitable for all the different circumstances in which the data is used

複雜的應用可能需要整合多個數據系統,讓單個數據系統各司其職,那麼如何保證多個數據系統資料的一致性:以其中一個數據系統為準(Primary),然後通過CDC或者event sourcing複製到其他資料系統。

Derived data versus distributed transactions

  分散式事務通過互斥鎖決定寫操作順序;而CDC 使用log來保證順序

  分散式事務通過atomic保證只生效一次;而log based依賴於確定性的重試與冪等

The lambda architecture

batch processing is used to reprocess historical data, and stream processing is used to process recent updates, then how do you combine the two?

  結合批處理與流處理

  •   批處理:慢但是準確
  •   流處理:快速但不一定精確

  潛在的問題:

  •   在batch、stream processing framework上維護兩份一樣的邏輯
  •   batch pipeline、stream pipeline輸出不同,導致讀取的時候需要merge
  •   增量batch 需要解決時間視窗、stragglers 問題

  解決辦法如下:Unifying batch and stream processing

Unbundling Databases

Unix and relational databases have approached the information management problem with very different philosophies.

  Unix 提供了是比較底層的對硬體的封裝,thin wrapper; 而relationaldb 對程式設計師提供high-level抽象

Composing Data Storage Technologies

it seems that there are parallels between the features that are built into databases and the derived data systems that people are building with batch and stream processors.

  資料庫的feature(secondary index、view、replication log, full-text search index)與 derived data system有一些類似之處

  以建立新索引為例:

  • 快照,然後處理已有資料
  • 處理在上一步過程中新加入的資料
  • 索引建立完畢,處理後續資料

  這個過程類似於

  • 增加新的secondary(follower)
  • 在流處理系統中增加新的消費者

Observing Derived State

  

  write path:precomputed; eager evaluation

   whenever some piece of information is written to the system, it may go through multiple stages of batch and stream processing, and eventually every derived dataset is updated to incorporate the data that was written

  read path:lazy evaluation

   when serving a user request you read from the derived dataset, perhaps perform some more processing on the results, and construct the response to the user.

  The derived dataset is the place where the write path and the read path meet, as illustrated in Figure 12-1. It represents a trade-off between the amount of work that needs to be done at write time and the mount that needs to be done at read time.

  derived dataset是write path與read path連線的地方,是寫入時處理與讀取時工作量的折中。caches, indexes, and materialized views 都是在write path上多做一些工作,減輕read path負擔

  寫 與 讀的折中;twinter的例子,名人 普通人策略不一樣

Stateful, offline-capable clients

  當前網際網路應用都是client server模式,client無狀態,資料都在server;但single path application或者mobile app在斷網的時候也能使用,提供更好的使用者體驗;而且,web-socket等技術提供了server主動向client推送的能力,這就是的write-path 進一步擴充套件到了客戶端

  大多數的db,lib,framework、protocol都是按照staleless and request/response的思想來設計的,根深蒂固

In order to extend the write path all the way to the end user, we would need to fundamentally rethink the way we build many of these systems: moving away from request response interaction and toward publish/subscribe dataflow

Aiming for Correctness

  build applications that are reliable and correct

  In this section I will suggest some ways of thinking about correctness in the context of dataflow architectures.

The End-to-End Argument for Databases

  即使使用了強事務性,也不能保證資料不會有問題,因為由於程式碼bug、人為錯誤,導致資料的損壞 丟失,immutable and append-only data helps

  考慮一個複雜的問題 exactly-once

  其中一個解決辦法:Idempotent(冪等)。 但需要額外的一些工作,而且需要非常細心的實現

  TCP的seq number也是為了保證excat once, 但這隻對單個TCP連線生效

In many databases, a transaction is tied to a client connection。 If the client suffers a network interruption and connection timeout after sending the COMMIT, but before hearing back from the database server, it does not know whether the transaction has been committed or aborted。

  2pc break the 1:1 mapping between a TCP connection and a transaction,因此 suppress duplicate transactions between the database client and server;但是end-user與 application server之間呢

  終極解決辦法,Unique Operation ID

  

  Besides suppressing duplicate requests, the requests table in Example 12-2 acts as a kind of event log, hinting in the direction of event sourcing

  除了保證點到點的約束,也充當了event log,可以用於event sourcing

Enforcing Constraints

  約束:如unique constraint,賬戶餘額不能為負等

  通過consume log來實現約束:

  Its fundamental principle is that any writes that may conflict are routed to the same partition and processed sequentially

Timeliness and Integrity

  consistency可能包含兩重意義

  及時性(Timeliness ):user讀取到的是實時狀態

  完整性(Integrity):user讀取到的是完整的狀態

violations of timeliness are “eventual consistency,” whereas violations of integrity are “perpetual inconsistency.”

  ACID同時保證及時性與完整性,但基於時間的資料流一般不保證及時性,exactly-once保證完整性

  在資料流系統如何保證完整性?

  (1) 寫操作是一個單一的message,原子性寫入

  (2)derived datasystem從單一訊息確定性地提取狀態

  (3)客戶端生成reqid,在整個流程用整個reqid保證冪等性

  (4)單一訊息不可變,持久化,允許derived datasystem重新處理所有訊息

  儘量避免協調的資料系統 Coordination-avoiding data systems

  (1)資料流系統通過derived data,無需原子性提交、線性、跨分片的同步協調就能保證完整性

  (2)雖然嚴格的unique 約束要求實時性(timeliness)和協調,很多應用可以通過事後補償放鬆約束

Trust, but Verify

  對資料完整性的校驗,防止資料默默出錯silent corruption,多副本不能解決這個問題

  基於事件的系統提供了更好的可審計性, 如記錄A給B轉賬,比記錄A扣錢,B加錢更好

  Checking the integrity of data systems is best done in an end-to-end fashion

Doing the Right Thing

  軟體和資料大大影響了我們生存的世界,對於我們這些工程師,需要承擔起責任:建立一個充滿人文關懷和尊重的世界。