1. 程式人生 > >Java分散式跟蹤系統Zipkin(七):Zipkin原始碼分析-Zipkin的原始碼結構

Java分散式跟蹤系統Zipkin(七):Zipkin原始碼分析-Zipkin的原始碼結構

前面花了大量篇幅來介紹Brave的使用,一直把Zipkin當黑盒在使用,現在來逐漸撥開Zipkin的神祕面紗。
Zipkin的原始碼地址為:https://github.com/openzipkin/zipkin

Zipkin的原始碼結構
Zipkin的原始碼結構
- zipkin - 對應的是zipkin v1
- zipkin2 - 對應的是zipkin v2
- zipkin-server - 是zipkin的web工程目錄,zipkin.server.ZipkinServer是啟動類
- zipkin-ui - zipkin ui工程目錄,zipkin的設計師前後端分離的,zipkin-server提供資料查詢介面,zipkin-ui做資料展現。
- zipkin-autoconfigure - 是為springboot提供的自動配置相關的類
collector-kafka
collector-kafka10
collector-rabbitmq
collector-scribe
metrics-prometheus
storage-cassandra
storage-cassandra3
storage-elasticsearch-aws
storage-elasticsearch-http
storage-mysql
ui

  • zipkin-collector - 是zipkin比較重要的模組,收集trace資訊,支援從kafka和rabbitmq,以及scribe中收集,這個模組是可選的,因為zipkin預設使用http協議提供給客戶端來收集
    kafka
    kafka10
    rabbitmq
    scribe

  • zipkin-storage - 也是zipkin比較重要的模組,用於儲存收集的trace資訊,預設是使用內建的InMemoryStorage,即儲存在記憶體中,重啟就會丟失。我們可以根據我們實際的需要更換儲存方式,將trace儲存在mysql,elasticsearch,cassandra中。
    cassandra
    elasticsearch
    elasticsearch-http
    mysql
    zipkin2_cassandra

ZipkinServer

ZipkinServer是SpringBoot啟動類,該類上使用了@EnableZipkinServer註解,載入了相關的Bean,而且在啟動方法中添加了監聽器RegisterZipkinHealthIndicators類,來初始化健康檢查的相關bean。

@SpringBootApplication
@EnableZipkinServer
public class ZipkinServer {

  public static void main(String[] args) {
    new SpringApplicationBuilder(ZipkinServer.class)
        .listeners(new
RegisterZipkinHealthIndicators()) .properties("spring.config.name=zipkin-server").run(args); } }
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Import({
  ZipkinServerConfiguration.class,
  BraveConfiguration.class,
  ZipkinQueryApiV1.class,
  ZipkinHttpCollector.class
})
public @interface EnableZipkinServer {

}

EnableZipkinServer註解匯入了ZipkinServerConfiguration,BraveConfiguration,ZipkinQueryApiV1,ZipkinHttpCollector。注意,這裡並沒有匯入ZipkinQueryApiV2,但是由於SpringBoot專案會預設載入和啟動類在一個包,或者在其子包的所有使用Component,Controller,Service等註解的類,所以在啟動後,也會發現ZipkinQueryApiV2也被載入了。
- ZipkinServerConfiguration - Zipkin Server端所有核心配置
- BraveConfiguration - Zipkin儲存trace資訊時,還可以將自身的trace資訊一起記錄,這時就依賴Brave相關的類,都在這個類裡配置
- ZipkinQueryApiV1 - Zipkin V1版本的查詢API都在這個Controller中
- ZipkinQueryApiV2 - Zipkin V2版本的查詢API都在這個Controller中
- ZipkinHttpCollector - Zipkin預設的Collector使用http協議裡收集Trace資訊,客戶端呼叫/api/v1/spans或/api/v2/spans來上報trace資訊

ZipkinServerConfiguration

所有Zipkin服務需要的Bean都在這個類裡進行配置
- ZipkinHealthIndicator - Zipkin健康自檢的類
- CollectorSampler - Collector的取樣率,預設100%取樣,可以通過zipkin.collector.sample-rate來設定取樣率
- CollectorMetrics - Collector的統計資訊,預設實現為ActuateCollectorMetrics
- BraveTracedStorageComponentEnhancer - Zipkin儲存trace時的self-trace類,啟用後會將Zipkin的Storage儲存模組執行的trace資訊也採集進系統中
- InMemoryConfiguration - 預設的記憶體Storage儲存配置,當zipkin.storage.type屬性未指定,或者容器中沒有配置StorageComponent時,該配置被啟用

ZipkinHealthIndicator

Zipkin健康自檢的類,實現了springboot-actuate的CompositeHealthIndicator,提供系統元件的健康資訊

final class ZipkinHealthIndicator extends CompositeHealthIndicator {

  ZipkinHealthIndicator(HealthAggregator healthAggregator) {
    super(healthAggregator);
  }

  void addComponent(Component component) {
    String healthName = component instanceof V2StorageComponent
      ? ((V2StorageComponent) component).delegate().getClass().getSimpleName()
      : component.getClass().getSimpleName();
    healthName = healthName.replace("AutoValue_", "");
    addHealthIndicator(healthName, new ComponentHealthIndicator(component));
  }

  static final class ComponentHealthIndicator implements HealthIndicator {
    final Component component;

    ComponentHealthIndicator(Component component) {
      this.component = component;
    }

    @Override public Health health() {
      Component.CheckResult result = component.check();
      return result.ok ? Health.up().build() : Health.down(result.exception).build();
    }
  }
}

RegisterZipkinHealthIndicators

啟動時載入的RegisterZipkinHealthIndicators類,當啟動啟動後,收到ApplicationReadyEvent事件,即系統已經啟動完畢,會將Spring容器中的zipkin.Component新增到ZipkinHealthIndicator中

public final class RegisterZipkinHealthIndicators implements ApplicationListener {

  @Override public void onApplicationEvent(ApplicationEvent event) {
    if (!(event instanceof ApplicationReadyEvent)) return;
    ConfigurableListableBeanFactory beanFactory =
        ((ApplicationReadyEvent) event).getApplicationContext().getBeanFactory();
    ZipkinHealthIndicator healthIndicator = beanFactory.getBean(ZipkinHealthIndicator.class);
    for (Component component : beanFactory.getBeansOfType(Component.class).values()) {
      healthIndicator.addComponent(component);
    }
  }
}
{"status":"UP","zipkin":{"status":"UP","InMemoryStorage":{"status":"UP"}},"diskSpace":{"status":"UP","total":429495595008,"free":392936411136,"threshold":10485760}}

ZipkinHttpCollector

Zipkin預設的Collector使用http協議裡收集Trace資訊,客戶端均呼叫/api/v1/spans或/api/v2/spans來上報trace資訊

  @Autowired ZipkinHttpCollector(StorageComponent storage, CollectorSampler sampler,
      CollectorMetrics metrics) {
    this.metrics = metrics.forTransport("http");
    this.collector = Collector.builder(getClass())
        .storage(storage).sampler(sampler).metrics(this.metrics).build();
  }

  @RequestMapping(value = "/api/v2/spans", method = POST)
  public ListenableFuture<ResponseEntity<?>> uploadSpansJson2(
    @RequestHeader(value = "Content-Encoding", required = false) String encoding,
    @RequestBody byte[] body
  ) {
    return validateAndStoreSpans(encoding, JSON2_DECODER, body);
  }

  ListenableFuture<ResponseEntity<?>> validateAndStoreSpans(String encoding, SpanDecoder decoder,
      byte[] body) {
    SettableListenableFuture<ResponseEntity<?>> result = new SettableListenableFuture<>();
    metrics.incrementMessages();
    if (encoding != null && encoding.contains("gzip")) {
      try {
        body = gunzip(body);
      } catch (IOException e) {
        metrics.incrementMessagesDropped();
        result.set(ResponseEntity.badRequest().body("Cannot gunzip spans: " + e.getMessage() + "\n"));
      }
    }
    collector.acceptSpans(body, decoder, new Callback<Void>() {
      @Override public void onSuccess(@Nullable Void value) {
        result.set(SUCCESS);
      }

      @Override public void onError(Throwable t) {
        String message = t.getMessage() == null ? t.getClass().getSimpleName() : t.getMessage();
        result.set(t.getMessage() == null || message.startsWith("Cannot store")
            ? ResponseEntity.status(500).body(message + "\n")
            : ResponseEntity.status(400).body(message + "\n"));
      }
    });
    return result;
  }

ZipkinHttpCollector中uploadSpansJson2方法接受所有/api/v2/spans請求,然後呼叫validateAndStoreSpans方法校驗並存儲Span
在validateAndStoreSpans方法中,當請求資料為gzip格式,會先解壓縮,然後呼叫collector的acceptSpans方法

Collector

zipkin.collector.Collector的acceptSpans方法中,對各種格式的Span資料做了相容處理,我們這裡只看下V2版的JSON格式的Span是如何處理的,即會呼叫storage2(V2Collector)的acceptSpans方法

public class Collector
  extends zipkin.internal.Collector<SpanDecoder, zipkin.Span> {
  @Override
  public void acceptSpans(byte[] serializedSpans, SpanDecoder decoder, Callback<Void> callback) {
    try {
      if (decoder instanceof DetectingSpanDecoder) decoder = detectFormat(serializedSpans);
    } catch (RuntimeException e) {
      metrics.incrementBytes(serializedSpans.length);
      callback.onError(errorReading(e));
      return;
    }
    if (storage2 != null && decoder instanceof V2JsonSpanDecoder) {
      storage2.acceptSpans(serializedSpans, SpanBytesDecoder.JSON_V2, callback);
    } else {
      super.acceptSpans(serializedSpans, decoder, callback);
    }
  }
}

V2Collector

zipkin.internal.V2Collector繼承了zipkin.internal.Collector,而在Collector的acceptSpans方法中會呼叫decodeList先將傳入的二進位制資料轉換成Span物件,然後呼叫accept方法,accept方法中會呼叫sampled方法,將需要取樣的Span過濾出來,最後呼叫record方法將Span資訊存入Storage中。

public abstract class Collector<D, S> {
  protected void acceptSpans(byte[] serializedSpans, D decoder, Callback<Void> callback) {
    metrics.incrementBytes(serializedSpans.length);
    List<S> spans;
    try {
      spans = decodeList(decoder, serializedSpans);
    } catch (RuntimeException e) {
      callback.onError(errorReading(e));
      return;
    }
    accept(spans, callback);
  }

  public void accept(List<S> spans, Callback<Void> callback) {
    if (spans.isEmpty()) {
      callback.onSuccess(null);
      return;
    }
    metrics.incrementSpans(spans.size());

    List<S> sampled = sample(spans);
    if (sampled.isEmpty()) {
      callback.onSuccess(null);
      return;
    }

    try {
      record(sampled, acceptSpansCallback(sampled));
      callback.onSuccess(null);
    } catch (RuntimeException e) {
      callback.onError(errorStoringSpans(sampled, e));
      return;
    }
  }

  List<S> sample(List<S> input) {
    List<S> sampled = new ArrayList<>(input.size());
    for (S s : input) {
      if (isSampled(s)) sampled.add(s);
    }
    int dropped = input.size() - sampled.size();
    if (dropped > 0) metrics.incrementSpansDropped(dropped);
    return sampled;
  }
}

V2Collector中的record方法會呼叫storage的accept方法,zipkin預設會使用InMemoryStorage來儲存

public final class V2Collector extends Collector<BytesDecoder<Span>, Span> {
  @Override protected List<Span> decodeList(BytesDecoder<Span> decoder, byte[] serialized) {
    List<Span> out = new ArrayList<>();
    if (!decoder.decodeList(serialized, out)) return Collections.emptyList();
    return out;
  }

  @Override protected boolean isSampled(Span span) {
    return sampler.isSampled(Util.lowerHexToUnsignedLong(span.traceId()), span.debug());
  }

  @Override protected void record(List<Span> sampled, Callback<Void> callback) {
    storage.spanConsumer().accept(sampled).enqueue(new V2CallbackAdapter<>(callback));
  }
}

ZipkinQueryApiV1 & ZipkinQueryApiV2

暴露了Zipkin對外的查詢API,V1和V2的區別,主要是Span裡的欄位叫法不一樣了,這裡主要看下ZipkinQueryApiV2,ZipkinQueryApiV2方法都比較簡單,主要是呼叫storage元件來實現查詢功能。

/dependencies - 檢視所有trace的依賴關係
/services - 檢視所有的services
/spans - 根據serviceName查詢spans資訊
/traces - 根據serviceName,spanName,annotationQuery,minDuration,maxDuration等來搜尋traces資訊
/trace/{traceIdHex} - 根據traceId查詢某條trace資訊

至此ZipkinServer的程式碼分析的差不多了,在後面博文中我們再具體分析各種Storage,和Collector的原始碼。