1. 程式人生 > >kafka效能監控之KafkaMetrics Sensor

kafka效能監控之KafkaMetrics Sensor

說起kafka的metrics,很多人應該是即陌生又熟悉,

熟悉是因為閱讀原始碼的過程中,不可避免地會看到metrics.add()的程式碼.而陌生是因為metrics僅僅只是輔助功能,並不是kafka主要邏輯的一部分,並不會引起讀者太多的關注.

同時網上關於metrics這一塊的分析也較少,這篇文章就帶著大家一探metrics的究竟.

在這裡首先說明一個容易產生誤解的地方,不少文章說kafka使用yammers框架來實現效能監控.這麼說其實沒有問題,因為kafka確實通過yammers向外暴露了介面,可以通過jmx或者grahite來監視各個效能引數.但是kafka內的效能監控比如producer,consumer的配額限制,並不是通過yammer實現的.而是通過自己的一套metrics框架來實現的.

事實上,kafka有兩個metrics包,在看原始碼的時候很容易混淆

package kafka.metrics
以及
package org.apache.kafka.common.metrics

可以看到這兩個包的包名都是metrics,但是他們負責的任務並不相同,而且兩個包中的類並沒有任何的互相引用關係.可以看作是兩個完全獨立的包.kafka.mtrics這個包,主要呼叫yammer的Api,並進行封裝,提供給client監測kafka的各個效能引數.而commons.metrics這個包是我這篇文章主要要介紹的,這個包並不是面向client提供服務的,他是為了給kafka中的其他元件,比如replicaManager,PartitionManager,QuatoManager提供呼叫,讓這些Manager瞭解kafka現在的執行狀況,以便作出相應決策的.

首先metrics第一次被初始化,在kafkaServer的startup()方法中

metrics = new Metrics(metricConfig, reporters, kafkaMetricsTime, true)
quotaManagers = QuotaFactory.instantiate(config, metrics, time)
初始化了一個Metrics,並將這個例項傳到quotaManagers的建構函式中,這裡簡單介紹一下quotaManagers.這是kafka中用來限制kafka,producer的傳輸速度的,比如在config檔案下設定producer不能以超過5MB/S的速度傳輸資料,那麼這個限制就是通過quotaManager來實現的.

回到metrics上,跟進程式碼.

public class Metrics implements Closeable {
 ....
 ....
    private final ConcurrentMap<MetricName, KafkaMetric> metrics;
    private final ConcurrentMap<String, Sensor> sensors;
metrics與sensors這兩個concurrentMap是Metrics中兩個重要的成員屬性.那麼什麼是KafkaMetric,什麼是Sensor呢?

首先分析KafkaMetric

KafkaMetric實現了Metric介面,可以看到它的核心方法value()返回要監控的引數的值.

public interface Metric {

    /**
     * A name for this metric
     */
public MetricName metricName();
/**
     * The value of the metric
     */
public double value();
}
那麼KafkaMetric又是如何實現value()方法的呢?
@Override
public double value() {
    synchronized (this.lock) {
        return value(time.milliseconds());
}
}

double value(long timeMs) {
    return this.measurable.measure(config, timeMs);
}
原來value()是通過kafkaMetric中的另一個成員屬性measurable完成
public interface Measurable {

    /**
     * Measure this quantity and return the result as a double
     * @param config The configuration for this metric
     * @param now The POSIX time in milliseconds the measurement is being taken
     * @return The measured value
     */
public double measure(MetricConfig config, long now);
}
其實這邊挺繞的,Metrics有kafkaMetric的成員變數,而kafkaMetric又通過Measurable返回要檢測的值.打個比方,Metrics好比是汽車的儀表盤,kafkaMetric就是儀表盤上的一個儀表,Measurable就是對真正要檢測的元件的一個封裝.來看看一個Measrable的簡單實現,在sender.java類中.
metrics.addMetric(m, new Measurable() {
    public double measure(MetricConfig config, long now) {
        return (now - metadata.lastSuccessfulUpdate()) / 1000.0;
}
});
可以看到measure的實現就是簡單地返回要返回的值,因為是直接在目標類中定義的,所以可以直接獲得相應變數的引用.

介紹完KafkaMetric,接下來介紹Sensor,也就是下面的ConcurrentMap中的Sensor

private final ConcurrentMap<String, Sensor> sensors;

以下是Sensor類的原始碼

/**
 * A sensor applies a continuous sequence of numerical values to a set of associated metrics. For example a sensor on
 * message size would record a sequence of message sizes using the {@link #record(double)} api and would maintain a set
 * of metrics about request sizes such as the average or max.
 */
public final class Sensor {
    //一個kafka就只有一個Metrics例項,這個registry就是對這個Metrics的引用
    private final Metrics registry;
    private final String name;
    private final Sensor[] parents;
    private final List<Stat> stats;
    private final List<KafkaMetric> metrics;
這一段的註釋很有意義,從註釋中可以看到Sensor的作用不同KafkaMetric. KafkaMetric僅僅是返回某一個引數的值,而Sensor有基於某一引數時間序列進行統計的功能,比如平均值,最大值,最小值.那這些統計又是如何實現的呢?答案是List<Stat> stats這個屬性成員.
public interface Stat {

    /**
     * Record the given value
     * @param config The configuration to use for this metric
     * @param value The value to record
     * @param timeMs The POSIX time in milliseconds this value occurred
     */
public void record(MetricConfig config, double value, long timeMs);
}
可以看到Stat是一個介面,其中有一個record方法可以記錄一個取樣數值,下面看一個例子,max這個功能如何用Stat來實現?
public final class Max extends SampledStat {

    public Max() {
        super(Double.NEGATIVE_INFINITY);
}

    @Override
protected void update(Sample sample, MetricConfig config, double value, long now) {
        sample.value = Math.max(sample.value, value);
}

    @Override
public double combine(List<Sample> samples, MetricConfig config, long now) {
        double max = Double.NEGATIVE_INFINITY;
        for (int i = 0; i < samples.size(); i++)
            max = Math.max(max, samples.get(i).value);
        return max;
}

}
是不是很簡單,update相當於冒一次泡,把當前的值與歷史的最大值比較.combine相當於用一次完整的氣泡排序找出最大值,需要注意的是,max是繼承SampleStat的,而SampleStat是Stat介面的實現類.那我們回到Sensor類上來.
public void record(double value, long timeMs) {
    this.lastRecordTime = timeMs;
    synchronized (this) {
        // increment all the stats
for (int i = 0; i < this.stats.size(); i++)
            this.stats.get(i).record(config, value, timeMs);
checkQuotas(timeMs);
}
    for (int i = 0; i < parents.length; i++)
        parents[i].record(value, timeMs);
}
record方法,每個註冊於其中的stats提交值,同時如果自己有父sensor的話,向父sensor提交.
public void checkQuotas(long timeMs) {
    for (int i = 0; i < this.metrics.size(); i++) {
        KafkaMetric metric = this.metrics.get(i);
MetricConfig config = metric.config();
        if (config != null) {
            Quota quota = config.quota();
            if (quota != null) {
                double value = metric.value(timeMs);
                if (!quota.acceptable(value)) {
                    throw new QuotaViolationException(
                        metric.metricName(),
value,
quota.bound());
}
            }
        }
    }
}
checkQuotas,通過這裡其實是遍歷註冊在sensor上的每一個KafkaMetric來檢查他們的值有沒有超過config檔案中設定的配額.注意這裡的QuotaVioLationException,是不是很熟悉.在QuatoManager中,如果有一個client的上傳/下載速度超過指定配額.那麼就會丟擲這個警告.
try {
  clientSensors.quotaSensor.record(value)
  // trigger the callback immediately if quota is not violated
callback(0)
} catch {
  case qve: QuotaViolationException =>
    // Compute the delay
val clientMetric = metrics.metrics().get(clientRateMetricName(clientQuotaEntity.sanitizedUser, clientQuotaEntity.clientId))
    throttleTimeMs = throttleTime(clientMetric, getQuotaMetricConfig(clientQuotaEntity.quota))
    clientSensors.throttleTimeSensor.record(throttleTimeMs)
    // If delayed, add the element to the delayQueue
delayQueue.add(new ThrottledResponse(time, throttleTimeMs, callback))
    delayQueueSensor.record()
    logger.debug("Quota violated for sensor (%s). Delay time: (%d)".format(clientSensors.quotaSensor.name(), throttleTimeMs))
}
這裡就很好理解了,向clientSensor提交上傳,下載的值,如果成功了,就掉用相應的callback,如果失敗了catch的就是QuotaViolationException.

其實metrics的執行模型還是很簡單的,讓人感覺繞的就是,各種抽象,Metrics,KafkaMetrics,Sensor,Stat這些概念吧.

最後,Sensor會初始化一個執行緒專門用來清除長時間沒有使用的執行緒.這個執行緒名為"SensorExpiryThread"

class ExpireSensorTask implements Runnable {
    public void run() {
        for (Map.Entry<String, Sensor> sensorEntry : sensors.entrySet()) {
            // removeSensor also locks the sensor object. This is fine because synchronized is reentrant
            // There is however a minor race condition here. Assume we have a parent sensor P and child sensor C.
            // Calling record on C would cause a record on P as well.
            // So expiration time for P == expiration time for C. If the record on P happens via C just after P is removed,
            // that will cause C to also get removed.
            // Since the expiration time is typically high it is not expected to be a significant concern
            // and thus not necessary to optimize
synchronized (sensorEntry.getValue()) {
                if (sensorEntry.getValue().hasExpired()) {
                    log.debug("Removing expired sensor {}", sensorEntry.getKey());
removeSensor(sensorEntry.getKey());
}
            }
        }
    }