本系列是 我TM人傻了 系列第五期[捂臉],往期精彩回顧:

本篇文章涉及底層設計以及原理,以及問題定位和可能的問題點,非常深入,篇幅較長,所以拆分成上中下三篇:

  • :問題簡單描述以及 Spring Cloud Gateway 基本結構和流程以及底層原理
  • :Spring Cloud Sleuth 如何在 Spring Cloud Gateway 加入的鏈路追蹤以及為何會出現這個問題
  • :現有 Spring Cloud Sleuth 的非侵入設計帶來的效能問題,其他可能的問題點,以及如何解決

Spring Cloud Gateway 其他的可能丟失鏈路資訊的點

經過前面的分析,我們可以看出,不止這裡,還有其他地方會導致 Spring Cloud Sleuth 的鏈路追蹤資訊消失,這裡舉幾個大家常見的例子:

1.在 GatewayFilter 中指定了非同步執行某些任務,由於執行緒切換了,並且這時候可能 Span 已經結束了,所以沒有鏈路資訊,例如

@Override
public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
return chain.filter(exchange).publishOn(Schedulers.parallel()).doOnSuccess(o -> {
//這裡就沒有鏈路資訊了
log.info("success");
});
}

2.將 GatewayFilter 中繼續鏈路的 chain.filter(exchange) 放到了非同步任務中執行,上面的 AdaptCachedBodyGlobalFilter 就屬於這種情況,這樣會導致之後的 GatewayFilter 都沒有鏈路資訊,例如:

@Override
public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
return Mono.delay(Duration.ofSeconds(1)).then(chain.filter(exchange));
}

Java 併發程式設計模型與 Project Reactor 程式設計模型的衝突思考

Java 中的很多框架,都用到了 ThreadLocal,或者通過 Thread 來標識唯一性。例如:

  • 日誌框架中的 MDC,一般都是 ThreadLocal 實現。
  • 所有的鎖、基於 AQS 的資料結構,都是通過 Thread 的屬性來唯一標識誰獲取到了鎖的。
  • 分散式鎖等資料結構,也是通過 Thread 的屬性來唯一標識誰獲取到了鎖的,例如 Redisson 中分散式 Redis 鎖的實現。

但是放到 Project Reactor 程式設計模型,這就顯得格格不入了,因為 Project Reactor 非同步響應式程式設計就是不固定執行緒,沒法保證提交任務和回撥能在同一個執行緒,所以 ThreadLocal 的語義在這裡很難成立。Project Reactor 雖然提供了對標 ThreadLocal 的 Context,但是主流框架還沒有相容這個 Context,所以給 Spring Cloud Sleuth 粘合這些鏈路追蹤帶來了很大困難,因為 MDC 是一個 ThreadLocal 的 Map 實現,而不是基於 Context 的 Map。這就需要 Spring Cloud Sleuth 在訂閱一開始,就需要將鏈路資訊放入 MDC,同時還需要保證執行時不切換執行緒。

執行不切換執行緒,這樣其實限制了 Project Reactor 的靈活排程,是有一些效能損失的。我們其實想盡量就算加入了鏈路追蹤資訊,也不用強制執行不切換執行緒。但是 Spring Cloud Sleuth 是非侵入式設計,很難實現這一點。但是對於我們自己業務的使用,我們可以定製一些程式設計規範,來保證大家寫的程式碼不丟失鏈路資訊

改進我們的程式設計規範

首先,我們自定義 Mono 和 Flux 的工廠

公共 Subscriber 封裝,將 reactor Subscriber 的所有關鍵介面,都檢查當前上下文是否有鏈路資訊,即 Span,如果沒有就包裹上,如果有則直接執行即可。

public class TracedCoreSubscriber<T> implements Subscriber<T>{
private final Subscriber<T> delegate;
private final Tracer tracer;
private final CurrentTraceContext currentTraceContext;
private final Span span; TracedCoreSubscriber(Subscriber<T> delegate, Tracer tracer, CurrentTraceContext currentTraceContext, Span span) {
this.delegate = delegate;
this.tracer = tracer;
this.currentTraceContext = currentTraceContext;
this.span = span;
} @Override
public void onSubscribe(Subscription s) {
executeWithinScope(() -> {
delegate.onSubscribe(s);
});
} @Override
public void onError(Throwable t) {
executeWithinScope(() -> {
delegate.onError(t);
});
} @Override
public void onComplete() {
executeWithinScope(() -> {
delegate.onComplete();
});
} @Override
public void onNext(T o) {
executeWithinScope(() -> {
delegate.onNext(o);
});
} private void executeWithinScope(Runnable runnable) {
//如果當前沒有鏈路資訊,強制包裹
if (tracer.currentSpan() == null) {
try (CurrentTraceContext.Scope scope = this.currentTraceContext.maybeScope(this.span.context())) {
runnable.run();
}
} else {
//如果當前已有鏈路資訊,則直接執行
runnable.run();
}
}
}

之後分別定義所有 Flux 的代理 TracedFlux,和所有 Mono 的代理 TracedMono,其實就是在 subscribe 的時候,用 TracedCoreSubscriber 包裝傳入的 CoreSubscriber:

public class TracedFlux<T> extends Flux<T> {
private final Flux<T> delegate;
private final Tracer tracer;
private final CurrentTraceContext currentTraceContext;
private final Span span; TracedFlux(Flux<T> delegate, Tracer tracer, CurrentTraceContext currentTraceContext, Span span) {
this.delegate = delegate;
this.tracer = tracer;
this.currentTraceContext = currentTraceContext;
this.span = span;
} @Override
public void subscribe(CoreSubscriber<? super T> actual) {
delegate.subscribe(new TracedCoreSubscriber(actual, tracer, currentTraceContext, span));
}
} public class TracedMono<T> extends Mono<T> {
private final Mono<T> delegate;
private final Tracer tracer;
private final CurrentTraceContext currentTraceContext;
private final Span span; TracedMono(Mono<T> delegate, Tracer tracer, CurrentTraceContext currentTraceContext, Span span) {
this.delegate = delegate;
this.tracer = tracer;
this.currentTraceContext = currentTraceContext;
this.span = span;
} @Override
public void subscribe(CoreSubscriber<? super T> actual) {
delegate.subscribe(new TracedCoreSubscriber(actual, tracer, currentTraceContext, span));
}
}

定義工廠類,使用請求 ServerWebExchange 和原始 Flux 建立 TracedFlux,以及使用請求 ServerWebExchange 和原始 Mono 建立 TracedMono,並且 Span 是通過 Attributes 獲取的,根據前文的原始碼分析我們知道,這個 Attribute 是通過 TraceWebFilter 放入 Attributes 的。由於我們只在 GatewayFilter 中使用,一定在 TraceWebFilter 之後 所以這個 Attribute 一定存在。

@Component
public class TracedPublisherFactory {
protected static final String TRACE_REQUEST_ATTR = Span.class.getName(); @Autowired
private Tracer tracer;
@Autowired
private CurrentTraceContext currentTraceContext; public <T> Flux<T> getTracedFlux(Flux<T> publisher, ServerWebExchange exchange) {
return new TracedFlux<>(publisher, tracer, currentTraceContext, (Span) exchange.getAttributes().get(TRACE_REQUEST_ATTR));
} public <T> Mono<T> getTracedMono(Mono<T> publisher, ServerWebExchange exchange) {
return new TracedMono<>(publisher, tracer, currentTraceContext, (Span) exchange.getAttributes().get(TRACE_REQUEST_ATTR));
}
}

然後,我們規定:1. 所有的 GatewayFilter,需要繼承我們自定義的抽象類,這個抽象類僅僅是把 filter 的結果用 TracedPublisherFactory 的 getTracedMono 給封裝了一層 TracedMono,以 GlobalFilter 為例子:

public abstract class AbstractTracedFilter implements GlobalFilter {
@Autowired
protected TracedPublisherFactory tracedPublisherFactory; @Override
public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
return tracedPublisherFactory.getTracedMono(traced(exchange, chain), exchange);
} protected abstract Mono<Void> traced(ServerWebExchange exchange, GatewayFilterChain chain);
}

2. GatewayFilter 中新生成的 Flux 或者 Mono,統一使用 TracedPublisherFactory 再封裝一層

3. 對於 AdaptCachedBodyGlobalFilter 讀取 Request Body 導致的鏈路丟失,我向社群提了一個 Pull Request: fix #2004 Span is not terminated properly in Spring Cloud Gateway,大家可以參考。也可以在這個 Filter 之前自己將 Request Body 使用 TracedPublisherFactory 進行封裝解決。

微信搜尋“我的程式設計喵”關注公眾號,每日一刷,輕鬆提升技術,斬獲各種offer