1. 程式人生 > >Java分散式跟蹤系統Zipkin(八):Zipkin原始碼分析-KafkaCollector

Java分散式跟蹤系統Zipkin(八):Zipkin原始碼分析-KafkaCollector

前面幾篇博文中,都是使用OkHttpSender來上報Trace資訊給Zipkin,這在生產環境中,當業務量比較大的時候,可能會成為一個性能瓶頸,這一篇博文我們來使用KafkaSender將Trace資訊先寫入到Kafka中,然後Zipkin使用KafkaCollector從Kafka中收集Span資訊。
在Brave配置中需要將Sender設定為KafkaSender,而zipkin的collector元件配置為KafkaCollector

相關程式碼在Chapter8/zipkin-kafka中
pom.xml中新增依賴

<dependency>
    <groupId
>
io.zipkin.reporter2</groupId> <artifactId>zipkin-sender-kafka11</artifactId> <version>${zipkin-reporter2.version}</version> </dependency>

TracingConfiguration中,我們修改Sender為KafkaSender,指定Kafka的地址,以及topic

@Bean
Sender sender() {
    return KafkaSender.newBuilder().bootstrapServers("localhost:9091,localhost:9092,localhost:9093"
).topic("zipkin").encoding(Encoding.JSON).build(); }

我們先啟動zookeeper(預設埠號為2181),再依次啟動一個本地的3個broker的kafka叢集(埠號分別為9091、9092、9093),最後啟動一個KafkaManager(預設埠號9000),KafkaManager是Kafka的UI管理工具
關於如何搭建本地Kafka偽叢集,請自行上網搜尋教程,本文使用的Kafka版本為0.10.0.0。

kafka啟動完畢後,我們建立名為zipkin的topic,因為我們有3個broker,我這裡設定replication-factor=3

bin/windows/kafka
-topics.bat --create --zookeeper localhost:2181 --replication-factor 3 --partitions 1 --topic zipkin

我們使用如下命令啟動zipkin,帶上Kafka的Zookeeper地址引數,這樣zipkin就會從kafka中消費我們上報的trace資訊。

java -jar zipkin-server-2.2.1-exec.jar --KAFKA_ZOOKEEPER=localhost:2181

然後分別執行,主意我們這裡將backend的埠改為9001,目的是為了避免和KafkaManager埠號衝突。

mvn spring-boot:run -Drun.jvmArguments="-Dserver.port=9001 -Dzipkin.service=backend"
mvn spring-boot:run -Drun.jvmArguments="-Dserver.port=8081 -Dzipkin.service=frontend"

為了看到這兩條訊息的具體內容,我們可以在kafka安裝目錄使用如下命令

bin/windows/kafka-console-consumer.bat --zookeeper localhost:2181 --topic zipkin --from-beginning

在控制檯會打印出最近的兩條訊息

[{"traceId":"802bd09f480b5faa","parentId":"802bd09f480b5faa","id":"bb3c70909ea3ee3c","kind":"SERVER","name":"get","timestamp":1510891296426607,"duration":10681,"localEndpoint":{"serviceName":"backend","ipv4":"10.200.170.137"},"remoteEndpoint":{"ipv4":"127.0.0.1","port":64421},"tags":{"http.path":"/api"},"shared":true}]
[{"traceId":"802bd09f480b5faa","parentId":"802bd09f480b5faa","id":"bb3c70909ea3ee3c","kind":"CLIENT","name":"get","timestamp":1510891296399882,"duration":27542,"localEndpoint":{"serviceName":"frontend","ipv4":"10.200.170.137"},"tags":{"http.path":"/api"}},{"traceId":"802bd09f480b5faa","id":"802bd09f480b5faa","kind":"SERVER","name":"get","timestamp":1510891296393252,"duration":39514,"localEndpoint":{"serviceName":"frontend","ipv4":"10.200.170.137"},"remoteEndpoint":{"ipv6":"::1","port":64420},"tags":{"http.path":"/"}}]

這說明我們的應用frontend和backend已經將trace資訊寫入kafka成功了!

在Zipkin的Web介面中,也能查詢到這次跟蹤資訊
在zipkin的控制檯,我們也看到跟Kafka相關的類ConsumerFetcherThread啟動,我們在後續專門分析zipkin的原始碼再來看看這個類。

2017-11-17 11:25:00.477  INFO 9292 --- [49-8e18eab0-0-1] kafka.consumer.ConsumerFetcherThread     : [ConsumerFetcherThread-zipkin_LT290-1510889099649-8e18eab0-0-1], Starting
2017-11-17 11:25:00.482  INFO 9292 --- [r-finder-thread] kafka.consumer.ConsumerFetcherManager    : [ConsumerFetcherManager-1510889099800] Added fetcher for partitions ArrayBuffer([[zipkin,0], initOffset 0 to broker id:1,host:10.200.170.137,port:9091] )

KafkaSender

public abstract class KafkaSender extends Sender {
  public static Builder newBuilder() {
    // Settings below correspond to "Producer Configs"
    // http://kafka.apache.org/0102/documentation.html#producerconfigs
    Properties properties = new Properties();
    properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
    properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
        ByteArraySerializer.class.getName());
    properties.put(ProducerConfig.ACKS_CONFIG, "0");
    return new zipkin2.reporter.kafka11.AutoValue_KafkaSender.Builder()
        .encoding(Encoding.JSON)
        .properties(properties)
        .topic("zipkin")
        .overrides(Collections.EMPTY_MAP)
        .messageMaxBytes(1000000);
  }

  @Override public zipkin2.Call<Void> sendSpans(List<byte[]> encodedSpans) {
    if (closeCalled) throw new IllegalStateException("closed");
    byte[] message = encoder().encode(encodedSpans);
    return new KafkaCall(message);
  }

}

KafkaSender中通過KafkaProducer客戶端來發送訊息給Kafka,在newBuilder方法中,設定了一些預設值,比如topic預設為zipkin,編碼預設用JSON,訊息最大位元組數1000000,還可以通過overrides來覆蓋預設的配置來定製KafkaProducer。

在sendSpans方法中返回KafkaCall,這個物件的execute方法,在AsyncReporter中的flush方法中會被呼叫:

void flush(BufferNextMessage bundler) {
    // ...
    sender.sendSpans(nextMessage).execute();
    // ...
}

KafkaCall的父類BaseCall方法execute會呼叫doExecute,而在doExecute方法中使用了一個AwaitableCallback將KafkaProducer的非同步傳送訊息的方法,強制轉為了同步傳送,這裡也確實處理的比較優雅。

  class KafkaCall extends BaseCall<Void> { // KafkaFuture is not cancelable
    private final byte[] message;

    KafkaCall(byte[] message) {
      this.message = message;
    }

    @Override protected Void doExecute() throws IOException {
      final AwaitableCallback callback = new AwaitableCallback();
      get().send(new ProducerRecord<>(topic(), message), (metadata, exception) -> {
        if (exception == null) {
          callback.onSuccess(null);
        } else {
          callback.onError(exception);
        }
      });
      callback.await();
      return null;
    }

    @Override protected void doEnqueue(Callback<Void> callback) {
      get().send(new ProducerRecord<>(topic(), message), (metadata, exception) -> {
        if (exception == null) {
          callback.onSuccess(null);
        } else {
          callback.onError(exception);
        }
      });
    }

    @Override public Call<Void> clone() {
      return new KafkaCall(message);
    }
  }

這裡還有一個知識點,get方法每次都會返回一個新的KafkaProducer,我在第一眼看到這段程式碼時也曾懷疑,難道這裡沒有效能問題?
原來這裡用到了google的外掛autovalue裡的標籤@Memoized,結合@AutoValue標籤,它會在自動生成的類裡,給我們新增一些程式碼,可以看到get方法裡作了一層快取,所以我們的擔心是沒有必要的

  @Memoized KafkaProducer<byte[], byte[]> get() {
    KafkaProducer<byte[], byte[]> result = new KafkaProducer<>(properties());
    provisioned = true;
    return result;
  }

AutoValue_KafkaSender

final class AutoValue_KafkaSender extends $AutoValue_KafkaSender {
  private volatile KafkaProducer<byte[], byte[]> get;

  AutoValue_KafkaSender(Encoding encoding$, int messageMaxBytes$, BytesMessageEncoder encoder$,
      String topic$, Properties properties$) {
    super(encoding$, messageMaxBytes$, encoder$, topic$, properties$);
  }

  @Override
  KafkaProducer<byte[], byte[]> get() {
    if (get == null) {
      synchronized (this) {
        if (get == null) {
          get = super.get();
          if (get == null) {
            throw new NullPointerException("get() cannot return null");
          }
        }
      }
    }
    return get;
  }
}

KafkaCollector

我們再來看下Zipkin中的KafkaCollector,我們開啟zipkin-server的原始碼,在目錄resources/zipkin-server-shared.yml檔案中,發現關於kafka的配置片段
而我們在本文前面使用–KAFKA_ZOOKEEPER啟動了zipkin,將kafka的zookeeper引數傳遞給了KafkaServer的main方法,也就是說,我們制定了zipkin.collector.kafka.zookeeper的值為localhost:2181

java -jar zipkin-server-2.2.1-exec.jar --KAFKA_ZOOKEEPER=localhost:2181

zipkin-server-shared.yml

zipkin:
  collector:
    kafka:
      # ZooKeeper host string, comma-separated host:port value.
      zookeeper: ${KAFKA_ZOOKEEPER:}
      # Name of topic to poll for spans
      topic: ${KAFKA_TOPIC:zipkin}
      # Consumer group this process is consuming on behalf of.
      group-id: ${KAFKA_GROUP_ID:zipkin}
      # Count of consumer threads consuming the topic
      streams: ${KAFKA_STREAMS:1}
      # Maximum size of a message containing spans in bytes
      max-message-size: ${KAFKA_MAX_MESSAGE_SIZE:1048576}

在pom.xml中,有如下依賴

    <!-- Kafka Collector -->
    <dependency>
      <groupId>${project.groupId}</groupId>
      <artifactId>zipkin-autoconfigure-collector-kafka</artifactId>
      <optional>true</optional>
    </dependency>

ZipkinKafkaCollectorAutoConfiguration

我們找到zipkin-autoconfigure/collector-kafka的ZipkinKafkaCollectorAutoConfiguration類,使用了@Conditional註解,當KafkaZooKeeperSetCondition條件滿足時,ZipkinKafkaCollectorAutoConfiguration類會被SpringBoot載入。當載入時,會配置KafkaCollector到spring容器中。

@Configuration
@EnableConfigurationProperties(ZipkinKafkaCollectorProperties.class)
@Conditional(KafkaZooKeeperSetCondition.class)
public class ZipkinKafkaCollectorAutoConfiguration {

  /**
   * This launches a thread to run start. This prevents a several second hang, or worse crash if
   * zookeeper isn't running, yet.
   */
  @Bean KafkaCollector kafka(ZipkinKafkaCollectorProperties kafka, CollectorSampler sampler,
      CollectorMetrics metrics, StorageComponent storage) {
    final KafkaCollector result =
        kafka.toBuilder().sampler(sampler).metrics(metrics).storage(storage).build();

    // don't use @Bean(initMethod = "start") as it can crash the process if zookeeper is down
    Thread start = new Thread("start " + result.getClass().getSimpleName()) {
      @Override public void run() {
        result.start();
      }
    };
    start.setDaemon(true);
    start.start();

    return result;
  }
}

KafkaZooKeeperSetCondition

KafkaZooKeeperSetCondition繼承了SpringBootCondition,實現了getMatchOutcome方法,當上下文的環境變數中有配置zipkin.collector.kafka.zookeeper的時候,則條件滿足,即ZipkinKafkaCollectorAutoConfiguration會被載入

final class KafkaZooKeeperSetCondition extends SpringBootCondition {
  static final String PROPERTY_NAME = "zipkin.collector.kafka.zookeeper";

  @Override
  public ConditionOutcome getMatchOutcome(ConditionContext context, AnnotatedTypeMetadata a) {
    String kafkaZookeeper = context.getEnvironment().getProperty(PROPERTY_NAME);
    return kafkaZookeeper == null || kafkaZookeeper.isEmpty() ?
        ConditionOutcome.noMatch(PROPERTY_NAME + " isn't set") :
        ConditionOutcome.match();
  }
}

在ZipkinKafkaCollectorAutoConfiguration中,啟動了一個守護執行緒來執行KafkaCollector的start方法,避免zookeeper連不上,阻塞zipkin的啟動過程。

public final class KafkaCollector implements CollectorComponent {
  final LazyConnector connector;
  final LazyStreams streams;

  KafkaCollector(Builder builder) {
    connector = new LazyConnector(builder);
    streams = new LazyStreams(builder, connector);
  }

  @Override public KafkaCollector start() {
    connector.get();
    streams.get();
    return this;
  }
}

KafkaCollector中初始化了兩個物件,LazyConnector,和LazyStreams,在start方法中呼叫了2個物件的get方法

LazyConnector

LazyConnector繼承了Lazy,當get方法被呼叫的時候,compute方法會被呼叫

  static final class LazyConnector extends LazyCloseable<ZookeeperConsumerConnector> {

    final ConsumerConfig config;

    LazyConnector(Builder builder) {
      this.config = new ConsumerConfig(builder.properties);
    }

    @Override protected ZookeeperConsumerConnector compute() {
      return (ZookeeperConsumerConnector) createJavaConsumerConnector(config);
    }

    @Override
    public void close() {
      ZookeeperConsumerConnector maybeNull = maybeNull();
      if (maybeNull != null) maybeNull.shutdown();
    }
  }

Lazy的get方法中,使用了典型的懶漢式單例模式,並使用了double-check,方式多執行緒構造多個例項,而真正構造物件是委派給compute方法

public abstract class Lazy<T> {

  volatile T instance = null;

  /** Remembers the result, if the operation completed unexceptionally. */
  protected abstract T compute();

  /** Returns the same value, computing as necessary */
  public final T get() {
    T result = instance;
    if (result == null) {
      synchronized (this) {
        result = instance;
        if (result == null) {
          instance = result = tryCompute();
        }
      }
    }
    return result;
  }

  /**
   * This is called in a synchronized block when the value to memorize hasn't yet been computed.
   *
   * <p>Extracted only for LazyCloseable, hence package protection.
   */
  T tryCompute() {
    return compute();
  }
}

在LazyConnector的compute方法中根據ConsumerConfig構造出了ZookeeperConsumerConnector,這個是kafka 0.8版本一種重要的物件,基於zookeeper的ConsumerConnector。

LazyStreams

在LazyStreams的compute中,新建了一個執行緒池,執行緒池大小可以由引數streams(即zipkin.collector.kafka.streams)來指定,預設為一個執行緒的執行緒池。
然後通過topicCountMap設定zipkin的kafka消費使用的執行緒數,再使用ZookeeperConsumerConnector的createMessageStreams方法來建立KafkaStream,然後使用執行緒池執行KafkaStreamProcessor。

  static final class LazyStreams extends LazyCloseable<ExecutorService> {
    final int streams;
    final String topic;
    final Collector collector;
    final CollectorMetrics metrics;
    final LazyCloseable<ZookeeperConsumerConnector> connector;
    final AtomicReference<CheckResult> failure = new AtomicReference<>();

    LazyStreams(Builder builder, LazyCloseable<ZookeeperConsumerConnector> connector) {
      this.streams = builder.streams;
      this.topic = builder.topic;
      this.collector = builder.delegate.build();
      this.metrics = builder.metrics;
      this.connector = connector;
    }

    @Override protected ExecutorService compute() {
      ExecutorService pool = streams == 1
          ? Executors.newSingleThreadExecutor()
          : Executors.newFixedThreadPool(streams);

      Map<String, Integer> topicCountMap = new LinkedHashMap<>(1);
      topicCountMap.put(topic, streams);

      for (KafkaStream<byte[], byte[]> stream : connector.get().createMessageStreams(topicCountMap)
          .get(topic)) {
        pool.execute(guardFailures(new KafkaStreamProcessor(stream, collector, metrics)));
      }
      return pool;
    }

    Runnable guardFailures(final Runnable delegate) {
      return () -> {
        try {
          delegate.run();
        } catch (RuntimeException e) {
          failure.set(CheckResult.failed(e));
        }
      };
    }

    @Override
    public void close() {
      ExecutorService maybeNull = maybeNull();
      if (maybeNull != null) maybeNull.shutdown();
    }
  }

KafkaStreamProcessor

在KafkaStreamProcessor的run方法中,迭代stream物件,取出獲得的流資料,然後呼叫Collector的acceptSpans方法,即使用storage元件來接收並存儲span資料。

final class KafkaStreamProcessor implements Runnable {
  final KafkaStream<byte[], byte[]> stream;
  final Collector collector;
  final CollectorMetrics metrics;

  KafkaStreamProcessor(
      KafkaStream<byte[], byte[]> stream, Collector collector, CollectorMetrics metrics) {
    this.stream = stream;
    this.collector = collector;
    this.metrics = metrics;
  }

  @Override
  public void run() {
    ConsumerIterator<byte[], byte[]> messages = stream.iterator();
    while (messages.hasNext()) {
      byte[] bytes = messages.next().message();
      metrics.incrementMessages();

      if (bytes.length == 0) {
        metrics.incrementMessagesDropped();
        continue;
      }

      // If we received legacy single-span encoding, decode it into a singleton list
      if (bytes[0] <= 16 && bytes[0] != 12 /* thrift, but not a list */) {
        try {
          metrics.incrementBytes(bytes.length);
          Span span = SpanDecoder.THRIFT_DECODER.readSpan(bytes);
          collector.accept(Collections.singletonList(span), NOOP);
        } catch (RuntimeException e) {
          metrics.incrementMessagesDropped();
        }
      } else {
        collector.acceptSpans(bytes, DETECTING_DECODER, NOOP);
      }
    }
  }
}

這裡的kafka消費方式還是kafka0.8版本的,如果你想用kafka0.10+的版本,可以更改zipkin-server的pom,將collector-kafka10加入到依賴中,其原理跟kafka0.8的差不多,此處不再展開分析了。

    <!-- Kafka10 Collector -->
    <dependency>
      <groupId>io.zipkin.java</groupId>
      <artifactId>zipkin-autoconfigure-collector-kafka10</artifactId>
      <optional>true</optional>
    </dependency>

    <dependency>
      <groupId>io.zipkin.java</groupId>
      <artifactId>zipkin-collector-kafka10</artifactId>
    </dependency>

在生產環境中,我們可以將zipkin的日誌收集器改為kafka來提高系統的吞吐量,而且也可以讓客戶端和zipkin服務端解耦,客戶端將不依賴zipkin服務端,只依賴kafka叢集。

當然我們也可以將zipkin的collector替換為RabbitMQ來提高日誌收集的效率,zipkin對scribe也作了支援,這裡就不展開篇幅細說了。