分散式叢集環境下呼叫鏈路追蹤
前言
現代微服務系統中,一套複雜的分散式 Web 系統中,客戶端的一次請求操作,可能需要經過系統中多個模組、多箇中間件、多臺機器的相互協作才能完成,並且這一系列呼叫請求中,有些是序列處理的,有些是併發執行的,那麼如何確定客戶端的一次操作背後呼叫了哪些應用、哪些模組,經過了哪些節點,每個模組的呼叫先後順序是怎樣的,每個模組的效能問題如何?隨著業務系統模型的日趨複雜化,分散式系統中急需一套鏈路追蹤(Trace)系統來解決這些痛點。
分散式服務跟蹤是整個分散式系統中跟蹤一個使用者請求的過程,包括資料採集、資料傳輸、資料儲存、資料分析和資料視覺化,捕獲此類跟蹤讓我們構建使用者互動背後的整個呼叫鏈的檢視,這是除錯和監控微服務的關鍵工具。
Google Dapper 就是這樣需求下的一套應用於大型分散式系統環境下的鏈路追蹤系統。藉助 Dapper 理念,系統開發人員只需將 Trace 元件嵌入到基礎通用庫中,就可以正常執行,而應用層開發者則不需要關心具體 Trace 元件實現、整合方式,達到以應用層透明的方式嵌入各個模組的目的。
Google Dapper 簡介
Google Dapper 這一分散式跟蹤系統,最初只是作為一個自給自足的監控工具起步的,但最終進化成一個監控平臺,並且促生出多種多樣的甚至已經不是由 Dapper 團隊開發的監控工具,說明分散式追蹤系統在當前的分散式環境下是必不可少的工具。
現代網際網路服務通常實現為複雜的大規模系統,比如採用流行的微服務架構模式。應用是由一組服務組合起來的,這些服務可能是由不同的團隊開發的,而且可能採用不同的程式語言。像 Google 這種級別的公司,應用會跨越多個基礎設施的上千臺機器,即便是相對較小的雲端計算用例,推薦的做法也是使用跨地域的 Availability Zone 和 Region 執行服務的多個版本。在這種複雜的系統環境中,分散式追蹤系統能夠輔助我們理解系統的行為、幫助除錯和排查效能問題。
Google 最初開發 Dapper 為了收集更多的複雜分散式系統的行為資訊,然後呈現給 Google 的開發者們。這樣的分散式系統因那些大規模的低端伺服器作為網際網路服務的載體,是一個特殊的經濟划算的平臺。想要在這個上下文中理解分散式系統的行為,就需要監控那些橫跨了不同的應用、不同的伺服器之間的關聯動作。
從眾多的分散式系統應用中得出的經驗,對於分散式追蹤系統需要滿足至少兩點要求:涵蓋面的廣度和持續的監控。涵蓋廣度的重要性不言而喻,因為在使用追蹤系統的進行監控時,如果有一小部分沒被監控到,那麼人們對這個系統是不是值得信任都會產生巨大的質疑。另外,監控應該是 7x24 小時的,畢竟,系統異常或是那些重要的系統行為有可能出現過一次,就很難甚至不太可能重現。那麼,根據這兩個明確的需求,可以推出如下幾個具體的設計目標:
- 低效能損耗 (Low overhead):分散式線上系統對於效能、資源要求都很嚴格,Trace 元件必須對服務影響足夠小。對於一些高度優化過的服務,即使一點點效能損耗也會很容易察覺到,而且有可能迫使線上服務的部署團隊不得不將分散式追蹤系統關停。
- 應用級的透明 (Application-level transparency):應用層開發者不需要對 Trace 元件關心,Trace 嵌入到基礎通用庫中,提供高穩定性,而如果 Trace 需要應用開發者配合,那可能會引入額外的 bug 導致 Trace 系統更加脆弱。面對當下網際網路公司快速迭代的開發環境來說,這一點尤其重要。
- 擴充套件性 (Scalability):Google 的服務線上叢集數量可想而知,一次檢索就可能跨越上百臺甚至成千臺機器,因此這個 Trace Infrastructure 的擴充套件性也很重要。
當然,以上幾點設計目標的背後,還有一個額外的設計目標就是追蹤資料產生之後,能夠快速匯出資料,快速進行資料分析,在最短的時間內定位系統異常,理想情況是資料存入跟蹤倉庫後一分鐘內就能統計出來。儘管跟蹤系統對一小時前的舊資料進行統計也是相當有價值的,但如果跟蹤系統能提供足夠快的資訊反饋,就可以對生產環境下的異常狀況做出快速反應。
另外,要做到真正的應用級別的透明(這應該是當下面臨的最挑戰性的設計目標),我們需要把核心跟蹤程式碼做的很輕巧,然後把它植入到那些無所不在的公共元件中,比如執行緒呼叫、控制流以及 RPC 庫。使用自適應的取樣率可以使跟蹤系統變得可伸縮,並降低效能損耗。結果展示的相關係統也需要包含一些用來收集跟蹤資料的程式碼,用來圖形化的工具,以及用來分析大規模跟蹤資料的庫和 API。雖然單獨使用 Dapper 有時就足夠讓開發人員查明異常的來源,但是 Dapper 的初衷不是要取代所有其他監控的工具。Dapper 的資料往往側重效能方面的調查,其他監控工具也有他們各自的用處。
基於 Google Dapper 的分散式追蹤介紹
當我們進行微服務架構開發時,通常會根據業務或功能劃分成若干個不同的微服務模組,各模組之間通過 REST/RPC 等協議進行呼叫。一個使用者在前端發起某種操作,可能需要很多微服務的協同才能完成,如果在業務呼叫鏈路上任何一個微服務出現問題或者網路超時,將導致功能失效,或出現某些模組在呼叫鏈中耗時突然增大等問題。隨著業務越來越複雜,微服務架構橫向、縱向擴充套件之後,其規模越來越大,對於微服務之間的呼叫鏈路的分析將會越來越複雜。此時你會發現,如果在最初架構設計時,能夠將分散式鏈路追蹤這種需求考慮進來,後期微服務叢集擴容時候,前期所做的工作將會達到事半功倍的效果。
下面先來看看 Google Dapper 論文裡面介紹的一個非常簡單、常見的一個場景:
圖 1. 服務呼叫示例

引用自文章 " ofollow,noindex" target="_blank">Dapper, a Large-Scale Distributed Systems Tracing Infrastructure " 中的 Figure 1
圖 1 中 A~E 分別表示五個服務,使用者發起一次 X 請求到前端系統 A,然後 A 分別傳送 RPC 請求到中間層 B 和 C,B 處理請求後返回,C 還要發起兩個 RPC 請求到後端系統 D 和 E。
以上完整呼叫迴路中,一次請求需要經過多個系統處理完成,並且追蹤系統像是內嵌在 RPC 呼叫鏈上的樹形結構,然而,我們的核心資料模型不只侷限於特定的 RPC 框架,我們還能跟蹤其他行為,例如外界的 HTTP 請求,和外部對 Kafka 伺服器的呼叫等。從形式上看,分散式追蹤模型使用的 Trace 樹形結構來記錄請求之間的關係(父子關係、先後順序等)。
Trace 和 Span
Google Dapper 中關於 Trace 的介紹中主要有以下三個主要元素構成:
- Span:基本工作單元,例如,在一個新建的 Span 中傳送一個 RPC 等同於傳送一個迴應請求給 RPC,Span 通過一個 64 位 ID 唯一標識,Trace 以另一個 64 位 ID 表示。Span 還有其他資料資訊,比如摘要、時間戳事件、關鍵值註釋 (Tags)、Span 的 ID、以及進度 ID (通常是 IP 地址)。Span 在不斷地啟動和停止,同時記錄了時間資訊,當你建立了一個 Span,你必須在未來的某個時刻停止它。
- Trace 樹:一系列 Span 組成的一個樹狀結構,例如,如果你正在跑一個分散式大資料工程,你可能需要建立一個 Trace 樹。
- Annotation(標註):用來及時記錄一個事件的存在,一些核心 Annotation 用來定義一個請求的開始和結束。
分散式追蹤系統要做的就是記錄每次傳送和接受動作的識別符號和時間戳,將一次請求涉及到的所有服務串聯起來,只有這樣才能清楚記錄每次請求的完整呼叫鏈。在分散式追蹤系統中使用 Trace 表示對一次請求完整呼叫鏈的跟蹤,將兩個服務例如上面圖 1 中的服務 A 和服務 B 的請求/響應過程叫做一次 Span。 我們可以看出每一次跟蹤 Trace 都是一個樹型結構,Span 可以體現出服務之間的具體依賴關係。
對於每個 Trace 樹,Trace 都要定義一個全域性唯一的 Trace ID,在這個跟蹤中的所有 Span 都將獲取到這個 Trace ID。 每個 Span 都有一個 Parent Span ID 和它自己的 Span ID。上面圖 1 中 A 服務的 Parent Span ID 為空,Span ID 為 1;然後 B 服務的 Parent Span ID 為 1,Span ID 為 2;C 服務的 Parent Span ID 也為 1,Span ID 為 3,依次類推,如圖 2 所示:
圖 2. Trace 樹

引用自文章 " Dapper, a Large-Scale Distributed Systems Tracing Infrastructure " 中的 Figure 2
追蹤系統中用 Span 來表示一個服務呼叫的開始和結束時間,也就是時間區間。追蹤系統記錄了 Span 的名稱以及每個 Span ID 的 Parent Span ID,如果一個 Span 沒有 Parent Span ID 則被稱為 Root Span,當前節點的 Parent Span ID 即為呼叫鏈路上游的 Span ID,所有的 Span 都掛在一個特定的追蹤上,共用一個 Trace ID。
Span 內部結構
下面看一下 Span 的內部結構,Span 除了記錄 Parent Span ID 和自身的 Span ID 外,還會記錄自己請求其他服務的時間和響應時間。但是每個伺服器的時間可能不是完全相同,為了解決這個問題需要約定一個前提,即 RPC 客戶端必須發出請求後,服務端才能收到,如果服務端的時間戳比客戶端發出請求的時間戳還靠前,那麼就按請求時間來算,響應時間也是如此。
任何一個 Span 可以包含來自不同 Server 的資訊,這些也要記錄下來。事實上,每一個 RPC Span 可能包含客戶端和伺服器兩個過程的 Annotation 內容。由於客戶端和伺服器上的時間戳來自不同的主機,我們必須考慮到 Server 之間的時間偏差。在鏈路追蹤分析中,每一個請求,總是一方先發送一個請求之後,然後另外一方才接收到;這樣一來,每個呼叫請求就有一個時間戳的上限和下限。
圖 3. Span 的細節圖

引用自文章 " Dapper, a Large-Scale Distributed Systems Tracing Infrastructure " 中的 Figure 3
從圖 3 可以可以很清楚的看出,這是一次 Span 名為 Hello.Call
的呼叫,Span ID 是 5,Parent Span ID 是 3,Trace ID 是 100。 我們重點看一下 Span 對應的四個狀態:
- Client Send(CS):客戶端傳送時間,客戶端發起一個請求,這個 Annotation 描述了這個 Span 的開始。
- Server Received(SR):服務端接收時間,服務端獲得請求並準備開始處理它,如果將其 SR 減去 CS 時間戳便可得到網路延遲。
- Server Send(SS):服務端傳送時間,Annotation 表明請求處理的完成(當請求返回客戶端),如果 SS 減去 SR 時間戳便可得到服務端需要的處理請求時間。
- Client Received(CR):客戶端接收時間,表明 Span 的結束,客戶端成功接收到服務端的回覆,如果 CR 減去 CS 時間戳便可得到客戶端從服務端獲取回覆的所有所需時間。
通過收集這四個時間戳,就可以在一次請求完成後計算出整個 Trace 的執行耗時和網路耗時,以及 Trace 中每個 Span 過程的執行耗時和網路耗時:
- 服務呼叫耗時 = CR – CS
- 服務處理耗時 = SS – SR
- 網路耗時 = 服務呼叫耗時 – 服務處理耗時
生成 Span
我們已經初步瞭解了 Span 的組成,那麼怎麼生成 Span 呢?Google Dapper 中使用到的是基於標註 (Annotation-based) 的監控方案。此方案會有程式碼侵入,所以應儘可能少改動程式碼。
基於標註的方式就是根據請求中的 Trace ID 來獲取 Trace 這個例項,各種程式語言有各自的方式。獲取到 Trace 例項後就可以呼叫 Recorder 來記錄 Span 了,記錄值先直接以日誌的形式存在本地,然後跟蹤系統會啟動一個 Collector Daemon 來收集日誌,然後整理日誌寫入資料庫。解析的日誌結果建議放在 BigTable (Cassandra 或者 HDFS) 這類稀疏表的資料庫裡。因為每個 Trace 攜帶的 Span 可能不一樣,最終的記錄是每一行代表一個 Trace,這一行的每一列代表一個 Span,如圖 4 所示:
圖 4. Trace 資料儲存

引用自文章 " Dapper, a Large-Scale Distributed Systems Tracing Infrastructure " 中的 Figure 5
對於減少程式碼的侵入性,論文建議將核心跟蹤程式碼做的很輕巧,然後把它植入公共元件中,比如執行緒呼叫、控制流以及 RPC 庫。
取樣率
分散式跟蹤系統的實現要求效能低損耗,尤其在生產環境中不能影響到核心業務的效能,也不可能每次請求都跟蹤,所以要進行取樣,每個應用和服務可以自己設定取樣率。取樣率應該是在每個應用自己的配置裡設定的,這樣每個應用可以動態調整,特別是剛應用剛上線時可以適當調高取樣率。
一般在系統峰值流量很大的情況下,只需要取樣其中很小一部分請求,例如 1/1000 的取樣率,即分散式跟蹤系統只會在 1000 次請求中取樣其中的某一次。
Trace 收集
分散式跟蹤系統的追蹤記錄和收集管道的過程分為三個階段:
- Span 資料寫入本地日誌檔案中。
- Dapper 的守護程序和收集元件把這些資料從生產環境的主機中拉出來。
- 最終寫到 Dapper 的 Bigtable 倉庫中。一次跟蹤被設計成 Bigtable 中的一行,每一列相當於一個 Span。
Bigtable 的支援稀疏表格佈局正適合這種情況,因為每一次追蹤請求可能產生多個 Span。寫入資料後的 Bigtable,每一條記錄集表示一條 Trace 記錄。這些監控資料的彙總是單獨進行的,而不是伴隨系統對使用者的應答一起返回的。如此選擇主要有如下的兩個原因:
- 首先,一個內建的彙總方案 (監控資料隨 RPC 應答頭返回)會影響網路動態。一般來說,RPC 應答資料規模比較小,通常不超過 10KB。而區間資料往往非常龐大,如果將二者放在一起傳輸,會使這些 RPC 應答資料相對"矮化"進而影響後期的分析。
- 另一方面,內建的彙總方案需要保證所有的 RPC 都是完全巢狀的,但有許多的中介軟體系統在其所有的後臺返回最終結果之前就對呼叫者返回結果,這樣有些監控資訊就無法被收集。
基於這兩個考慮,最終選擇將監控資料和應答資訊分開傳輸。
Java 下 Trace 介紹以及實踐
工具介紹
分散式追蹤系統一般主要由兩方面構成:即基於 Annotation 的標註和追蹤系統框架。對應 Java 中主要有下面兩個工具:
- Brave:用來裝備 Java 程式的類庫,提供了面向 Standard Servlet、Spring MVC、HTTP Client、JAX RS、Jersey、Resteasy 和 SQL/">MySQL 等介面的裝備能力,可以通過編寫簡單的配置和程式碼,讓基於這些框架構建的應用可以向 Zipkin 報告資料。同時 Brave 也提供了非常簡單且標準化的介面,在以上封裝無法滿足要求的時候可以方便擴充套件與定製。
- Zipkin:是一款開源的分散式實時資料追蹤系統(Distributed Tracking System),基於 Google Dapper 的論文設計而來,由 Twitter 公司開發貢獻。其主要功能是聚集來自各個異構系統的實時監控資料,用來追蹤微服務架構下的系統延時問題。
兩種工具在追蹤系統中的用途如下:
圖 5. 分散式追蹤系統圖

由上圖可見,一次請求過程中,請求先通過 HTTP 到達前端,然後通過 HTTP 呼叫後臺系統,同時後臺系統通過 RPC 呼叫 Greeting 模組,每一個 Span 對應的 Annotation 資訊都會實時的回傳至 Zipkin Server,通過這樣的鏈路可以準確的分析出網路、系統性能等問題。
工具選擇
鑑於目前微服務框架大多都選擇 Spring Cloud 來實現,本文主要使用 Spring Cloud Sleuth 和 Zipkin 來進行介紹:
- Sleuth:是 Spring Cloud 在分散式系統中提供鏈路追蹤解決方案。
- Zipkin:是一套基於 Google Dapper 的分散式鏈路呼叫的監控系統。
搭建 Zipkin Server
- 新建一個基於 Spring Boot 的 Gradle 專案,專案名稱為:zipkin-server。
- 專案 build.gradle 配置如下:
清單 1. zipkin-server Gradle 指令碼
repositories { } dependencies { compile('org.springframework.boot:spring-boot-starter') compile('org.springframework.boot:spring-boot-starter-jdbc') compile('io.zipkin.java:zipkin-server') runtime('io.zipkin.java:zipkin-autoconfigure-ui') compile('io.zipkin.java:zipkin-autoconfigure-storage-mysql') compile('mysql:mysql-connector-java') }
- 專案 application.yml 配置如下:
清單 2. application.yml 配置
server: port: 9411 docs: service: name: docs-zipkin # Spring Profiles spring: application: name: docs-zipkin-server # Http Encoding http: encoding.charset: UTF-8 encoding.enable: true encoding.force: true datasource: schema: classpath:/mysql.sql url: jdbc:mysql://127.0.0.1:3306/zipkin username: root password: password # Switch this on to create the schema on startup: initialize: true continueOnError: true zipkin: storage: type: mysql spring.profiles.active: dev
- Spring Boot 啟動類參照專案程式碼: ZipkinApplication.java 。
- zipkin-server 配置完成,將其啟動之後,訪問 http://localhost:9411 將看到如下 Zipkin 管理介面:
圖 6. Zipkin 管理介面圖

建立 demoe-web 工程
- 新建一個基於 Spring Boot Web 的 Gradle 專案,專案名稱為:demo-web。
- 專案 build.gradle 配置如下:
清單 3. demo-web Gradle 指令碼
repositories { } dependencies { compile project(':demo-model') compile project(':demo-util') compile('org.springframework.boot:spring-boot-starter-web') compile('org.springframework.cloud:spring-cloud-starter-sleuth') compile('org.springframework.cloud:spring-cloud-sleuth-zipkin') }
- 專案 application.yml 配置如下,由於是 demo 環境,所以取樣率設定為 100%。
清單 4. application.yml 配置
spring: application: name: demo-web profiles: active: dev sleuth: sampler: percentage: 1 #Percentage of logs export to zipkin server zipkin: # For enabling Zipkin Client for this Microservice enabled: true # Server Url baseUrl: http://localhost:9411 # Server Information server: port: 8080
- Spring Boot 啟動類如清單 5 中所示,通過配置
HttpResponseInjectingTraceFilter
、CustomHttpServletResponseSpanInjector
類(具體參考 專案程式碼 ),將 Trace ID 和 Web 模組的 Span ID 儲存在 header 中返回給使用者端,通過這種方式也可以顯示在 Web 介面。清單 5. demo-web 啟動類
@SpringBootApplication(scanBasePackages = { "com.ibm.demo" }) public class DemoWebApplication { public static void main(String[] args) { SpringApplication.run(DemoWebApplication.class, args); } @Bean public RestTemplate restTemplate() { return new RestTemplate(); } @Bean public SpanInjector<HttpServletResponse> customHttpServletResponseSpanInjector() { return new CustomHttpServletResponseSpanInjector(); } @Bean public HttpResponseInjectingTraceFilter responseInjectingTraceFilter(Tracer tracer) { return new HttpResponseInjectingTraceFilter(tracer, customHttpServletResponseSpanInjector()); } @Bean public Sampler defaultSampler() { return new AlwaysSampler(); } }
- Web Controller 類中包含以下 4 個介面:
- /demo/web/:demo-web 直接返回。
- /demo/web/call:demo-web 通過 HTTP 呼叫 demo-service 介面。
- /demo/web/callkafka:demo-web 通過 Kafka 發訊息給 demo-service。
- /demo/web/callredis:demo-web 通過 Redis 發訊息給 demo-service。
Node.js 下 Trace 介紹以及實踐
Node.js 追蹤是通過與 Zipkin 整合來實現的,具體是通過 Express 與 Zipkin 整合來完成,兩者的整合非常簡單,並且入侵性也小。
整合介紹
- Express Server:Express Server 與 Zipkin 的整合,即 Server 接收到請求時把自身的呼叫資訊傳送到 Zipkin。
- Rest Client:服務 A 呼叫服務 B 時,服務 A 把呼叫鏈資訊傳送到 Zipkin。
- Tracer:根據 context 把記錄的 Span 資訊通過 Recorder 傳送到 Zipkin。
- Recorder:傳送資訊到 Zipkin,支援兩種方式,一種是直接傳送到 Zipkin Server,還有一種是控制檯輸出,控制檯輸出的方式可以用來除錯,本 demo 中我們選擇第一種方式。
建立 demo-front 工程
限於篇幅問題,將涉及 Node.js 的 demo-frontend、demo-backend 都放在 demo-front 工程中。
- 新建一個基於 Node.js 的專案,專案名稱為:demo-front。
- 專案 package.json 配置如下:
清單 6. package.json 配置
{ "name": "demo-front", "version": "0.0.1", "description": "Example project that shows how to use zipkin with javascript", "repository": "https://github.com/openzipkin/zipkin-js", "scripts": { "lint": "eslint .", "start": "node servers.js", "browserify": "browserify browser.js -o bundle.js" }, "dependencies": { "browser-process-hrtime": "^0.1.2", "express": "^4.14.0", "rest": "^1.3.2", "zipkin": "^0.11.1", "zipkin-context-cls": "^0.11.0", "zipkin-instrumentation-cujojs-rest": "^0.11.1", "zipkin-instrumentation-express": "^0.11.1", "zipkin-instrumentation-fetch": "^0.11.1", "zipkin-transport-http": "^0.11.1" }, "devDependencies": { "browserify": "^14.1.0", "eslint": "^3.4.0", "eslint-config-airbnb": "^14.1.0", "eslint-plugin-import": "^2.2.0", "eslint-plugin-jsx-a11y": "^4.0.0", "eslint-plugin-react": "^6.2.0" } }
- 建立 frontend.js,後續將使用 frontend.js 作為最前端呼叫 demo-web,完成 Node.js 端呼叫 Java Web 模組的 Trace 模擬,如清單 7 所示,完整程式碼參照專案: frontend.js 。
清單 7. frontend.js 程式碼
app.get('/redis', (req, res) => { var url_parts = url.parse(req.url, true); var query = url_parts.query; res.header('X-B3-TraceId', tracer.id.traceId); res.header('X-B3-SpanId', tracer.id.spanId); tracer.local('docs-front', () => zipkinRest(`http://localhost:8080/docs/web/callredis?msg=${query.msg}`) .then(response => res.send(response.entity)) .catch(err => console.error('Error', err.stack)) ); console.log(`tracerId : ${tracer.id.traceId}, spanId : ${tracer.id.spanId}`); }); app.get('/kafka', (req, res) => { var url_parts = url.parse(req.url, true); var query = url_parts.query; res.header('X-B3-TraceId', tracer.id.traceId); res.header('X-B3-SpanId', tracer.id.spanId); tracer.local('docs-front', () => zipkinRest(`http://localhost:8080/docs/web/callkafka?msg=${query.msg}`) .then(response => res.send(response.entity)) .catch(err => console.error('Error', err.stack)) ); console.log(`tracerId : ${tracer.id.traceId}, spanId : ${tracer.id.spanId}`); });
- 建立 backend.js,如清單 8 所示。後續將使用 backend.js 作為最後端,完成 Node.js 端呼叫 Java 模組,然後 Java 模組呼叫 Node.js 模組的一套完整的 Trace 模擬。
清單 8. backend.js 程式碼
app.get('/api', (req, res) => { var url_parts = url.parse(req.url, true); var query = url_parts.query; console.log(`docs-backend msg : ${query.msg}`); console.log(`docs-backend tracerId : ${tracer.id.traceId}, spanId : ${tracer.id.spanId}`); res.send(new Date().toString()) });
中介軟體中 Trace 介紹以及實踐
本文 demo 中所談到的中介軟體是基於 Java 語言環境下,與 Kafka、Redis 中介軟體的整合,Spring Cloud 中已經包含與 Kafka 的整合工具:spring-cloud-starter-stream-kafka,只需在 dependency 中引用加上配置即可,與 Redis 的整合,由於版本還處於 1.0 並且已經被 deprecated 了,所以只能按照 Stream binder 實現方式自行實現。
整合介紹
- spring-cloud-starter-stream-kafka:Spring Cloud 中與 Kafka 整合的工具包,即通過整合之後能夠將 Trace 資訊通過 Kafka Message 傳遞到下游系統。
- Redis:與 Redis 整合能夠傳遞 Trace 資訊,在上游系統傳送 Message 到 Redis 之前在 Message 基礎上將 Trace 資訊封包進去,通過 Redis 傳遞到下游系統,下游系統收到訊息之後再將封包訊息進行解包處理,同時將解包出來的 Trace 資訊註冊到系統中,解包出來的 Message 資訊交由下游系統處理,並且需要做到封包處理和解包處理對於 Redis 上、下游模組都是透明的。
- Pub/Sub:與 Redis 整合 Pub/Sub。
Redis 整合思路
- 在上游系統在接收包使用者訊息之後,將 Trace 資訊封裝包 Message 中,然後再發送給 Redis。
- Message 儲存到 Redis。
- 下游系統收到訊息之後,對訊息進行解包處理,同時將解包出來的 Trace 資訊註冊到當前系統中,解包出來的 Message 資訊交由後續系統處理。
以上方式是通過 Pub/Sub 方式實現,並且需要保持對 Pub/Sub 兩端系統都透明;不過本 demo 中 Redis 訊息整合僅限於 JSON 訊息格式,如清單 9 所示:
清單 9. Message 封包示例
{ contentType = application / json, header = { spanParentSpanId = 1665384374984194801, spanTraceId = 4676089090076077054, spanId = -4549244921492974775, spanProcessId = null, spanName = publish }, message = { "id": "24506", "name": "abef" } }
建立 demoe-service 工程
- 新建一個基於 Spring Boot Web 的 Gradle 專案,專案名稱為:demo-service。
- 專案 build.gradle 配置如下:
清單 10. demo-service Gradle 指令碼
repositories { } dependencies { compile project(':demo-model') compile project(':demo-util') compile('org.springframework.boot:spring-boot-starter-aop') compile('org.springframework.cloud:spring-cloud-starter-sleuth') compile('org.springframework.cloud:spring-cloud-sleuth-stream') compile('org.springframework.cloud:spring-cloud-starter-stream-kafka') compile('org.springframework.boot:spring-boot-starter-data-redis') compile('org.springframework.kafka:spring-kafka') compile('org.springframework.boot:spring-boot-starter-web') compile('org.springframework.cloud:spring-cloud-sleuth-zipkin') }
- 配置 demo-service controller 類(如清單 11 所示),主要處理 demo-web 傳送過來的 HTTP 請求,併發送訊息到 Kafka、Redis 中介軟體。
清單 11. demo-service controller 類
@RequestMapping(value = "/testkafka", method = RequestMethod.GET) public String testkafka(@RequestParam(value = "msg") String msg) throws JsonProcessingException { logger.info("Hello Docs Service testkafka Test!"); KafkaModel kafkaModel = new KafkaModel(); kafkaModel.setId(String.valueOf(Math.round(Math.random() * 99999))); kafkaModel.setName(msg); ObjectMapper jsonMapper = new ObjectMapper(); logger.info("Putting message to kafka topic name : sampletopic"); MessageBuilder<String> messageBuilder = MessageBuilder.withPayload(jsonMapper.writeValueAsString(kafkaModel)); source.output().send(messageBuilder.build()); return "Message successfully pushed to kafka"; } @RequestMapping(value = "/testredis", method = RequestMethod.GET) public String testredis(@RequestParam(value = "msg") String msg) throws JsonProcessingException { logger.info("Hello Demo Service testredis Test!"); KafkaModel kafkaModel = new KafkaModel(); kafkaModel.setId(String.valueOf(Math.round(Math.random() * 99999))); kafkaModel.setName(msg); ObjectMapper jsonMapper = new ObjectMapper(); Object message = jsonMapper.writeValueAsString(kafkaModel); customerInfoPublisher.publish(message); return "Message successfully pushed to redis"; }
- demoe-service Redis 訊息封包處理 (如清單 12 所示),基於 Aspect 實現,以此來實現對傳送訊息進行封包,同時儘可能實現透明注入。
清單 12. demo-service Message 封包處理
@Pointcut("execution(* publish(..)) & & target(com.ibm.demo.service.redis.pubsub.CustomerInfoPublisher)") public void messageListener() { // This func is intentionally empty. Nothing special is needed here. } @Around("messageListener()") public void interceptMessage(ProceedingJoinPoint joinPoint) throws Throwable { Object[] joinPointArgs = joinPoint.getArgs(); Map<String, Object> messageObj = new HashMap<>(); createSpanIntoTrace(joinPoint); Map<String, String> spanObj = SleuthHelper.toMap(tracer.getCurrentSpan()); messageObj.put("contentType", "application/json"); messageObj.put("header", spanObj); RedisMessage redisMessage = new RedisMessage("application/json", spanObj, joinPointArgs[0]); ObjectMapper jsonMapper = new ObjectMapper(); Object mesgObj = jsonMapper.writeValueAsString(redisMessage); Object[] message = new Object[] { mesgObj }; joinPoint.proceed(message); logger.info("Putting message into redis with message {}", message); afterSendCompletion(mesgObj, joinPoint.getSignature().getName(), null); } private void createSpanIntoTrace(ProceedingJoinPoint joinPoint) { Span span = startSpan(this.tracer.getCurrentSpan(), joinPoint.getSignature().getName()); SpanBuilder spanBuilder = Span.builder().from(span); Span currentSpan = spanBuilder.remote(false).build(); this.tracer.continueSpan(currentSpan); } public void afterSendCompletion(Object object, String name, Exception ex) { ObjectMapper objectMapper = new ObjectMapper(); try { RedisMessage redisMessage = objectMapper.readValue((String) object.toString(), RedisMessage.class); Map<String, String> spanMap = redisMessage.getHeader(); Span currentSpan = this.tracer.isTracing() ? this.tracer.getCurrentSpan() : SleuthHelper.fromMap(spanMap); if (logger.isDebugEnabled()) { logger.debug("Completed sending and current span is " + currentSpan); } if (containsServerReceived(currentSpan)) { if (logger.isDebugEnabled()) { logger.debug("Marking span with server send"); } currentSpan.logEvent(Span.SERVER_SEND); } else if (currentSpan != null) { if (logger.isDebugEnabled()) { logger.debug("Marking span with client received"); } currentSpan.logEvent(Span.CLIENT_RECV); } addErrorTag(ex); if (logger.isDebugEnabled()) { logger.debug("Closing messaging span " + currentSpan); } this.tracer.close(currentSpan); if (logger.isDebugEnabled()) { logger.debug("Messaging span " + currentSpan + " successfully closed"); } } catch (IOException e) { logger.error("Parse Messaging span {} with exception {}", this.tracer.getCurrentSpan(), e); } } private boolean containsServerReceived(Span span) { if (span == null) { return false; } for (Log log : span.logs()) { if (Span.SERVER_RECV.equals(log.getEvent())) { return true; } } return false; } private void addErrorTag(Exception ex) { if (ex != null) { this.errorParser.parseErrorTags(this.tracer.getCurrentSpan(), ex); } } private Span startSpan(Span span, String name) { return startSpan(span, name, null); } private Span startSpan(Span span, String name, RedisMessage message) { if (span != null) { return this.tracer.createSpan(name, span); } if (message != null & & Span.SPAN_NOT_SAMPLED.equals(message.getHeader().get(TraceMessageHeaders.SAMPLED_NA ME))) { return this.tracer.createSpan(name, NeverSampler.INSTANCE); } return this.tracer.createSpan(name); }
建立 demoe-message 工程
- 新建一個基於 Spring Boot Web 的 Gradle 專案,專案名稱為:demo-message。
- 專案 build.gradle 配置如下:
清單 13. demo-message Gradle 指令碼
repositories { } dependencies { compile project(':demo-model') compile project(':demo-util') compile('org.springframework.boot:spring-boot-starter-aop') compile('org.springframework.cloud:spring-cloud-starter-sleuth') compile('org.springframework.cloud:spring-cloud-sleuth-stream') compile('org.springframework.cloud:spring-cloud-starter-stream-kafka') compile('org.springframework.boot:spring-boot-starter-data-redis') compile('org.springframework.kafka:spring-kafka') compile('org.springframework.boot:spring-boot-starter-web') compile('org.springframework.cloud:spring-cloud-sleuth-zipkin') }
- demoe-message Redis 訊息解包處理, (如清單 14 所示),基於 Aspect 實現,以此來實現對接受到的訊息進行解包處理,同時保證程式碼透明注入,請在 專案 中檢視完整原始碼。
清單 14. demo-message Message 封包處理
@Pointcut("execution(* onMessage(..)) & & target(org.springframework.data.redis.connection.MessageListener)") public void messageListener() { // This func is intentionally empty. Nothing special is needed here. } @Around("messageListener()") public void interceptMessage(ProceedingJoinPoint joinPoint) throws Throwable { try { Object[] joinPointArgs = joinPoint.getArgs(); Object[] result = new Object[joinPointArgs.length]; for (int i = 0; i < joinPointArgs.length; i++) { Object object = joinPointArgs[i]; ObjectMapper objectMapper = new ObjectMapper(); if (object instanceof Message && jsonParse(object)) { Message message = (Message) object; RedisMessage redisMessage = objectMapper.readValue((String) object.toString(), RedisMessage.class); logger.debug("Received >> redisMessage {} ", redisMessage); Map<String, String> spanMap = redisMessage.getHeader(); String traceIdString = Span.idToHex(Long.parseLong(spanMap.get(TraceMessageHeaders.TRACE_ID_NAME))); String spanId = Span.idToHex(Long.parseLong(spanMap.get(TraceMessageHeaders.SPAN_ID_NAME))); Span parentSpan = SleuthHelper.fromMap(spanMap); logger.debug("Received >> trace {} span {}", traceIdString, spanId); if (sleuthProperties != null && sleuthProperties.isSupportsJoin()) { SleuthHelper.joinSpan(tracer, spanMap); logger.debug("Join trace span {}", spanMap); } else { SleuthHelper.continueSpan(tracer, spanMap); logger.debug("Continue trace span {}", spanMap); } slf4jSpanLogger.logStartedSpan(parentSpan, tracer.getCurrentSpan()); logger.debug("Received >> trace {} span {}", Span.idToHex(tracer.getCurrentSpan().getTraceId()), Span.idToHex(tracer.getCurrentSpan().getSpanId())); byte[] channel = message.getChannel(); byte[] body = SerializationUtils.serialize(redisMessage.getMessage()); logger.debug("Received >> message {}", SerializationUtils.deserialize(body)); result[i] = new RedisDefaultMessage(channel, body); } else { result[i] = object; } } SpanBuilder spanBuilder = Span.builder().from(this.tracer.getCurrentSpan()); Span currentSpan = spanBuilder.parents(this.tracer.getCurrentSpan().getParents()).name(joinPoint.getSi gnature().getName()).remote(false).build(); currentSpan.logEvent("sr"); this.tracer.close(currentSpan); this.tracer.continueSpan(currentSpan); joinPoint.proceed(result); } catch (JsonParseException e) { logger.error("Join trace span with JsonParseException {} ", e); } catch (JsonMappingException e) { logger.error("Join trace span with JsonMappingException {} ", e); } catch (IOException e) { logger.error("Join trace span with IOException {}", e); } } private boolean jsonParse(Object object) { boolean falg = true; ObjectMapper objectMapper = new ObjectMapper(); try { objectMapper.readValue((String) object.toString(), RedisMessage.class); falg = true; } catch (Exception e) { falg = false; } return falg; }
完整 Trace 鏈路實踐
鏈路圖
前文介紹和模擬了從 Node.js 前端呼叫 Java API,Java API 再呼叫 Java Service API 從而傳送 Kafka、Redis 訊息到 Java Message 模組,Java Message 模組收到訊息之後再呼叫最後端的 Node.js 系統,從而完成一套完整的 Node.js -> Java -> Kafka/Redis -> Java -> Node.js 的呼叫鏈路模擬。具體模擬見下圖:
圖 7. 完整鏈路模擬示例圖

模組流圖
下圖為各個模組之間呼叫流程圖:
圖 8. 模組鏈路流圖

Zipkin 資訊圖
下圖為 Zipkin Server 中查到該次請求中對應各個模組之間呼叫時序圖,可以清晰知道各個模組耗時、網路耗時等資訊:
圖 9. 模組鏈路流圖

總結
對於分散式叢集環境,特別是當前微服務廣泛應用,對於每個請求,全鏈路呼叫的跟蹤就變得越來越重要,通過實現對請求呼叫的跟蹤可以幫助我們快速發現錯誤根源以及監控分析每條請求鏈路上的效能瓶頸等。以上只是個人在專案中的一點思考,希望與您交流心得,一同學習進步。
參考資源
- 參考 spring-cloud-sleuth ,檢視更多有關 Spring Cloud Sleuth 的最新資訊。
- 參考 spring-cloud-stream ,瞭解更多 Spring Cloud Stream 的相關內容。
- 參考 Zipkin ,瞭解更多 Zipkin 的相關內容。
- 檢視文章 " Dapper, a Large-Scale Distributed Systems Tracing Infrastructure ",瞭解更多 Google Dapper 的相關內容。