1. 程式人生 > >Flink原始碼系列——指標監測

Flink原始碼系列——指標監測

1、Metric簡介

Flink對於指標監測有一套自己的實現,指標的統計方式有四種,這些指標都實現了Metric這個介面,而Metric這個介面只是一個標識,本身並沒有定義如何方法介面,部分子類的繼承關係如下所示。
這裡寫圖片描述
從圖中可以看出,Metric這個介面有四個直接子類,分別是:

Gauge —— 最簡單的度量指標,只是簡單的返回一個值,比如返回一個佇列中當前元素的個數;
Counter —— 計數器,在一些情況下,會比Gauge高效,比如通過一個AtomicLong變數來統計一個佇列的長度;
Meter —— 吞吐量的度量,也就是一系列事件發生的速率,例如TPS;
Histogram —— 度量值的統計結果,如最大值、最小值、平均值,以及分佈情況等。

以MeterView為例,分析一個Metric的具體實現。MeterView還實現View介面,實現View介面的類,表示其會定期的執行update方法,進行資料的更新。

public class MeterView implements Meter, View {
   /** 底層使用的計算器 */
   private final Counter counter;
   /** 計算平均值的事件跨度 */
   private final int timeSpanInSeconds;
   /** 包含歷史資料的迴圈陣列 */
   private final long[] values;
   /** 當前時間在陣列中的索引 */
private int time = 0; /** 最新計算的rate */ private double currentRate = 0; public MeterView(int timeSpanInSeconds) { this(new SimpleCounter(), timeSpanInSeconds); } public MeterView(Counter counter, int timeSpanInSeconds) { this.counter = counter; /** 這裡的操作是為了讓時間跨度剛好是 UPDATE_INTERVAL_SECONDS 的整數倍 */
this.timeSpanInSeconds = timeSpanInSeconds - (timeSpanInSeconds % UPDATE_INTERVAL_SECONDS); this.values = new long[this.timeSpanInSeconds / UPDATE_INTERVAL_SECONDS + 1]; } @Override public void markEvent() { this.counter.inc(); } @Override public void markEvent(long n) { this.counter.inc(n); } @Override public long getCount() { return counter.getCount(); } @Override public double getRate() { return currentRate; } @Override public void update() { time = (time + 1) % values.length; values[time] = counter.getCount(); currentRate = ((double) (values[time] - values[(time + 1) % values.length]) / timeSpanInSeconds); } }

從類的屬性變數中可以看出,MeterView是在一個Counter計數器的基礎之上,封裝了一層,從而實現事件每秒的平均速率。以values這個長整型的陣列,作為環形陣列,實現對最新的歷史資料的儲存。
在建構函式中,會對入參timeSpanInSeconds這個時間跨度進行修正,使其剛好是UPDATE_INTERVAL_SECONDS的整數倍,另外values陣列的長度是timeSpanInSeconds對UPDATE_INTERVAL_SECONDS倍數,再加上1,這樣這個陣列的最新資料和最老的資料之間的時間間隔就剛好是timeSpanInSeconds。
假設values陣列的長度為n,則:

1、索引n-1處的統計值,和索引0處的統計值,時間間隔就是timeSpanInSeconds;
2、由於是環形陣列,所以索引0處的統計值,和索引1處的統計值的時間間隔就是timeSpanInSeconds;
3、所以索引i處的統計值,和索引(i+1)%n處的統計值,時間間隔是timeSpanInSeconds;

這個邏輯理清楚了,對update方法的邏輯也就清楚了。

2、MetricGroup

為了便於對Metric進行方便的管理和區分,可以對Metric進行分組,MetricGroup就是用來實現這個功能的。
MetricGroup的相關子類的繼承關係如下所示。
這裡寫圖片描述

1、ProxyMetricGroup —— 這是一個代理類,就是把新Metric或者新的子MetricGroup的註冊,委託給代理MetricGroup進行處理;
2、AbstractMetricGroup —— 對新增Metric和子MetricGroup的相關方法進行了實現;

在AbstractMetricGroup中有這些屬性

protected final A parent;
private final Map<String, Metric> metrics = new HashMap<>();
private final Map<String, AbstractMetricGroup> groups = new HashMap<>();

parent —— 用來儲存這個MetricGroup的父MetricGroup
metrics —— 這個map,是用來儲存當前MetricGroup中註冊的Metric;
groups —— 這個map,是用來儲存當前MetricGroup中註冊子MetricGroup;

通過這個資料結構可以看出,在MetricGroup中,可以建立一個樹狀的結構,用來儲存和歸類相關的Metric。

3、MetricReporter

MetricReporter是用來向外披露Metric的監測結果的介面。
由於MetricReporter的子類在例項化時,都是通過反射機制,所以對於其實現子類,需要有一個公共,無參的建構函式,這個介面的定義如下:

public interface MetricReporter {
   void open(MetricConfig config);
   void close();
   void notifyOfAddedMetric(Metric metric, String metricName, MetricGroup group);
   void notifyOfRemovedMetric(Metric metric, String metricName, MetricGroup group);
}

open —— 由於子類都是用無參建構函式,通過反射進行例項化,所以相關初始化的工作都是放在這裡進行的,並且這個方法需要在例項化後,就需要呼叫該方法進行相關初始化的工作;
close —— 這裡就是在關閉時,進行資源回收等相關操作的;
notifyOfAddedMetric —— 當有一個新的Metric註冊時,會呼叫該方法來通知MetricReporter;
notifyOfRemovedMetric —— 當有一個Metric被移除時,通過這個方法來通知MetricReporter;

4、MetricRegistry

MetricGroup是用來對Metric進行分組管理,MetricReporter是用來對外披露Metric,而MetricRegistry就是這兩者之間的橋樑,通過MetricRegistry,就可以讓MetricReporter感知到在MetricGroup中的Metric發生的變化情況。
對於MetricRegistry這個介面,其實現為MetricRegistryImpl,而其在例項化時,建構函式的入參是一個MetricRegistryConfiguration例項。

4.1、MetricRegistryConfiguration

MetricRegistryConfiguration顧名思義,就是MetricRegistry的相關配置引數,主要有三個屬性,如下:

/** flink中不同元件的範圍格式 */
private final ScopeFormats scopeFormats;

/** 字串的分隔符,這是一個全域性的分隔符 */
private final char delimiter;

/** 配置中每個reporter的名稱和其對應的配置物件的列表 */
private final List<Tuple2<String, Configuration>> reporterConfigurations;

這些屬性,都是從配置引數中獲取而來,邏輯如下:

public static MetricRegistryConfiguration fromConfiguration(Configuration configuration) {
   /** 獲取scopeFormats */
   ScopeFormats scopeFormats;
   try {
      scopeFormats = ScopeFormats.fromConfig(configuration);
   } catch (Exception e) {
      LOG.warn("Failed to parse scope format, using default scope formats", e);
      scopeFormats = ScopeFormats.fromConfig(new Configuration());
   }

   /** 獲取分隔符 */
   char delim;
   try {
      delim = configuration.getString(MetricOptions.SCOPE_DELIMITER).charAt(0);
   } catch (Exception e) {
      LOG.warn("Failed to parse delimiter, using default delimiter.", e);
      delim = '.';
   }

   /** 獲取MetricReporter相關的配置資訊,MetricReporter的配置格式是 metrics.reporters = foo, bar */
   final String definedReporters = configuration.getString(MetricOptions.REPORTERS_LIST);
   List<Tuple2<String, Configuration>> reporterConfigurations;

   if (definedReporters == null) {
      /** 如果沒有配置,則返回空集合 */
      reporterConfigurations = Collections.emptyList();
   } else {
      /** 按模式匹配分割,如上述的配置,則namedReporters={"foo", "bar"} */
      String[] namedReporters = splitPattern.split(definedReporters);

      reporterConfigurations = new ArrayList<>(namedReporters.length);

      for (String namedReporter: namedReporters) {
         /** 
          * 這裡是獲取一個代理配置物件,就是在原來配置物件的基礎上,在查詢key時,需要加上這裡配置的字首,
          * 如 metrics.reporter.foo. ,這樣就可以獲取特定reporter的配置 
          */
         DelegatingConfiguration delegatingConfiguration = new DelegatingConfiguration(
            configuration,
            ConfigConstants.METRICS_REPORTER_PREFIX + namedReporter + '.');

         reporterConfigurations.add(Tuple2.of(namedReporter, (Configuration) delegatingConfiguration));
      }
   }

   return new MetricRegistryConfiguration(scopeFormats, delim, reporterConfigurations);
}

4.1.1 ScopeFormat

上述的ScopeFormats也是配置物件中獲取的,如下:

public static ScopeFormats fromConfig(Configuration config) {
   String jmFormat = config.getString(MetricOptions.SCOPE_NAMING_JM);
   String jmJobFormat = config.getString(MetricOptions.SCOPE_NAMING_JM_JOB);
   String tmFormat = config.getString(MetricOptions.SCOPE_NAMING_TM);
   String tmJobFormat = config.getString(MetricOptions.SCOPE_NAMING_TM_JOB);
   String taskFormat = config.getString(MetricOptions.SCOPE_NAMING_TASK);
   String operatorFormat = config.getString(MetricOptions.SCOPE_NAMING_OPERATOR);

   return new ScopeFormats(jmFormat, jmJobFormat, tmFormat, tmJobFormat, taskFormat, operatorFormat);
}

這裡就需要介紹ScopeFormat,其類繼承關係如下:
這裡寫圖片描述
從圖中可以看出,Flink中的每個元件,都有對應的格式。
首先看下ScopeFormat中的主要屬性物件:

/** 這是原生格式,比如 <host>.jobmanager ,如果為空,則是 <empty> */
private final String format;

/** format按照分割符分割後的陣列,如 template= {"<host>", "jobmanager”},被<>包裹的元素,是變數元素 */
private final String[] template;

/** 這是template陣列中,變數元素的索引,如"<host>"是變數,在template中的索引是0,則 templatePos = {0} */
private final int[] templatePos;

/** 這個是template中變數元素對應的真實的值,在values陣列中的位置,詳見 建構函式 和 #bindVariables方法 */
private final int[] valuePos;

這裡以JobManagerScopeFormat為例進行分析說明,在ScopeFormats中,預設傳給JobManagerScopeFormat的建構函式的入參值是 .jobmanager 。
則JobManagerScopeFormat的構造過程如下:

/** format的預設值是 <host>.jobmanager */
public JobManagerScopeFormat(String format) {
   super(format, null, new String[] {
      SCOPE_HOST
   });
}

接著看起父類ScopeFormat的構造過程:

/** 接上面,入參值為 format="<host>.jobmanager" ,parent=null , variables={"<host>"} */
protected ScopeFormat(String format, ScopeFormat parent, String[] variables) {
   checkNotNull(format, "format is null");

   /** 將format這個字串分割, rawComponents = {"<host>", "jobmanager"} */
   final String[] rawComponents = format.split("\\" + SCOPE_SEPARATOR);

   /** 根據rawComponents的第一個元素是為"*",來判斷是否要繼承父組的範圍 */
   final boolean parentAsPrefix = rawComponents.length > 0 && rawComponents[0].equals(SCOPE_INHERIT_PARENT);
   if (parentAsPrefix) {
      /** 需要繼承父組的範圍,而父組有是null,則丟擲異常 */
      if (parent == null) {
         throw new IllegalArgumentException("Component scope format requires parent prefix (starts with '"
            + SCOPE_INHERIT_PARENT + "'), but this component has no parent (is root component).");
      }

      /** 如果以 "*." 開頭,則format至少需要有3個字元,否則就是無效字元,設定為 "<empty>" */
      this.format = format.length() > 2 ? format.substring(2) : "<empty>";

      String[] parentTemplate = parent.template;
      int parentLen = parentTemplate.length;

      /** 將父組的範圍和自身的範圍,合併到一起 */
      this.template = new String[parentLen + rawComponents.length - 1];
      System.arraycopy(parentTemplate, 0, this.template, 0, parentLen);
      System.arraycopy(rawComponents, 1, this.template, parentLen, rawComponents.length - 1);
   }
   else {
      /** 不需要繼承父組的範圍,則直接賦值,format="<host>.jobmanager",template={"<host>", "jobmanager"} */
      this.format = format.isEmpty() ? "<empty>" : format;
      this.template = rawComponents;
   }

   /** 將 variables={"<host>"} 轉換為map {"<host>" -> 0} */
   HashMap<String, Integer> varToValuePos = arrayToMap(variables);
   List<Integer> templatePos = new ArrayList<>();
   List<Integer> valuePos = new ArrayList<>();

   for (int i = 0; i < template.length; i++) {
      final String component = template[i];
      /** 檢查當前這個元件是否是一個變數 */
      if (component != null && component.length() >= 3 &&
            component.charAt(0) == '<' && component.charAt(component.length() - 1) == '>') {
         /** 這是一個變數,則從上面的map中,獲取其索引 */
         Integer replacementPos = varToValuePos.get(component);
         if (replacementPos != null) {
            templatePos.add(i);
            valuePos.add(replacementPos);
         }
      }
   }

   this.templatePos = integerListToArray(templatePos);
   this.valuePos = integerListToArray(valuePos);
}

經過這個構造過程,ScopeFormat中的四個屬性的值如下:

format = “.jobmanager”
template = {“”, “jobmanager”}
templatePos = {0}
valuePos = {0}

對於JobManagerScopeFormat來說,構建一個具體的範圍陣列的邏輯如下:

public String[] formatScope(String hostname) {
   /** 獲取template陣列的一份拷貝,深拷貝 */
   final String[] template = copyTemplate();
   final String[] values = { hostname };
   /** 使用hostname替換掉template中索引為0的元素<host> */
   return bindVariables(template, values);
}

protected final String[] copyTemplate() {
   String[] copy = new String[template.length];
   System.arraycopy(template, 0, copy, 0, template.length);
   return copy;
}

/** 在結合這個邏輯,就知道ScopeFormat中的屬性valuePos的作用了 */
protected final String[] bindVariables(String[] template, String[] values) {
   final int len = templatePos.length;
   for (int i = 0; i < len; i++) {
      template[templatePos[i]] = values[valuePos[i]];
   }
   return template;
}

4.2 MetricRegistryImpl

在獲取了MetricRegistryConfiguration例項後,在看MetricRegistryImpl的建構函式的實現邏輯。

this.executor = Executors.newSingleThreadScheduledExecutor(new ExecutorThreadFactory("Flink-MetricRegistry"));

這裡給executor這個屬性,設定了一個單執行緒可排程的執行器。
接下來主要看下對MetricReporter相關的初始化工作。

/** 變數配置中配置的reporter的配置 */
for (Tuple2<String, Configuration> reporterConfiguration: reporterConfigurations) {
   String namedReporter = reporterConfiguration.f0;
   /** reporterConfig是Configuration的子類DelegatingConfiguration,會肯定定義的字首來找key */
   Configuration reporterConfig = reporterConfiguration.f1;

   /** 獲取MetricReporter的具體實現子類的全限定型別,配置的key如:metrics.reporter.foo.class */
   final String className = reporterConfig.getString(ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, null);
   if (className == null) {
      LOG.error("No reporter class set for reporter " + namedReporter + ". Metrics might not be exposed/reported.");
      continue;
   }

   try {
      /** 獲取配置的定期執行的時間間隔,key的格式如:metrics.reporter.foo.interval */
      String configuredPeriod = reporterConfig.getString(ConfigConstants.METRICS_REPORTER_INTERVAL_SUFFIX, null);
      TimeUnit timeunit = TimeUnit.SECONDS;
      long period = 10;

      if (configuredPeriod != null) {
         try {
            String[] interval = configuredPeriod.split(" ");
            period = Long.parseLong(interval[0]);
            timeunit = TimeUnit.valueOf(interval[1]);
         }
         catch (Exception e) {
            LOG.error("Cannot parse report interval from config: " + configuredPeriod +
                  " - please use values like '10 SECONDS' or '500 MILLISECONDS'. " +
                  "Using default reporting interval.");
         }
      }

      /** 例項化MetricReporter的子類 */
      Class<?> reporterClass = Class.forName(className);
      MetricReporter reporterInstance = (MetricReporter) reporterClass.newInstance();

      /** 構造MetricConfig的例項,並把reporterConfig中的配置key-value都新增到metricConfig中 */
      MetricConfig metricConfig = new MetricConfig();
      reporterConfig.addAllToProperties(metricConfig);
      LOG.info("Configuring {} with {}.", reporterClass.getSimpleName(), metricConfig);
      /** 這裡就是reporter進行初始化操作的地方 */
      reporterInstance.open(metricConfig);

      /** 如果reporter實現了Scheduled介面,則通過executor進行定期排程執行,執行時間間隔就是上面獲取的時間間隔 */
      if (reporterInstance instanceof Scheduled) {
         LOG.info("Periodically reporting metrics in intervals of {} {} for reporter {} of type {}.", period, timeunit.name(), namedReporter, className);
         /** 將reporter封裝成一個task,並排程定期更新執行 */
         executor.scheduleWithFixedDelay(
               new MetricRegistryImpl.ReporterTask((Scheduled) reporterInstance), period, period, timeunit);
      } else {
         LOG.info("Reporting metrics for reporter {} of type {}.", namedReporter, className);
      }
      /** 將reporter新增到集合中 */
      reporters.add(reporterInstance);

      /** 獲取reporter定製化的分隔符,如果沒有設定,則設定為全域性分割符 */
      String delimiterForReporter = reporterConfig.getString(ConfigConstants.METRICS_REPORTER_SCOPE_DELIMITER, String.valueOf(globalDelimiter));
      if (delimiterForReporter.length() != 1) {
         LOG.warn("Failed to parse delimiter '{}' for reporter '{}', using global delimiter '{}'.", delimiterForReporter, namedReporter, globalDelimiter);
         delimiterForReporter = String.valueOf(globalDelimiter);
      }
      this.delimiters.add(delimiterForReporter.charAt(0));
   }
   catch (Throwable t) {
      LOG.error("Could not instantiate metrics reporter {}. Metrics might not be exposed/reported.", namedReporter, t);
   }
}

其中Schedule介面,只有一個report介面。

public interface Scheduled {
   void report();
}

實現Scheduled介面的reporter,表示其需要被定期排程執行,定期執行的就是其report方法,沒有實現Scheduled介面的reporter方法,是不會被定期排程的。

1、Slf4jReporter這個MetricReporter的子類就實現了Scheduled介面,而其report方法,就是將註冊的Metric的資訊列印到日誌裡;
2、JMXReporter這個子類是沒有實現Scheduled介面的,但可以通過JMX服務來獲取註冊的Metric的資訊。

4.3、新增Metric的過程

Metric的新增邏輯的入口在AbstractMetricGroup的addMetric方法中,邏輯如下:

protected void addMetric(String name, Metric metric) {
   if (metric == null) {
      LOG.warn("Ignoring attempted registration of a metric due to being null for name {}.", name);
      return;
   }
   /** 只有group仍然開啟的情況下, 才新增這個metric */
   synchronized (this) {
      if (!closed) {
         /**
          * 在沒有進行"contains"校驗下, 立即進行put操作, 來優化常見的情況(沒有碰撞)
          * 碰撞的情況後面會處理。
          */
         Metric prior = metrics.put(name, metric);

         /** 檢查與其他度量名稱的衝突 */
         if (prior == null) {
            /** 這個名字還沒有其他指標,也就是與註冊在當前group下的metric沒有名稱衝突 */

            if (groups.containsKey(name)) {
               /** 與註冊在group下的子groups的名稱由衝突,這裡給出warn日誌, 而不是fail, 因為metrics是工具, 當使用錯誤時, 不應該使得程式失敗 */
               LOG.warn("Name collision: Adding a metric with the same name as a metric subgroup: '" +
                     name + "'. Metric might not get properly reported. " + Arrays.toString(scopeComponents));
            }

            /** 這裡就是橋樑起作用的地方 */
            registry.register(metric, name, this);
         }
         else {
            /** 有碰撞, 放回原來的metric */
            metrics.put(name, prior);

            LOG.warn("Name collision: Group already contains a Metric with the name '" +
                  name + "'. Metric will not be reported." + Arrays.toString(scopeComponents));
         }
      }
   }
}

上述邏輯就是把Metric註冊到當前Group中,接著看呼叫了MetricRegistry的register裡的邏輯。

public void register(Metric metric, String metricName, AbstractMetricGroup group) {
   synchronized (lock) {
      if (isShutdown()) {
         LOG.warn("Cannot register metric, because the MetricRegistry has already been shut down.");
      } else {
         if (reporters != null) {
            /** 通知所有的reporters,註冊了一個metric,以及對應的metricName,group */
            for (int i = 0; i < reporters.size(); i++) {
               MetricReporter reporter = reporters.get(i);
               try {
                  if (reporter != null) {
                     /**
                      * 這裡會將group,以及這個reporter在reporters這個列表中的索引,一起封裝到FrontMetricGroup這個代理類中
                      * 這裡封裝索引的目的,是可以通過 #getDelimiter 方法,獲取這個reporter配置的特製分隔符
                      */
                     FrontMetricGroup front = new FrontMetricGroup<AbstractMetricGroup<?>>(i, group);
                     /** 然後呼叫reporter的介面方法,通知reporter */
                     reporter.notifyOfAddedMetric(metric, metricName, front);
                  }
               } catch (Exception e) {
                  LOG.warn("Error while registering metric.", e);
               }
            }
         }
         try {
            /** 如果queryService不為null,則也通知它 */
            if (queryService != null) {
               MetricQueryService.notifyOfAddedMetric(queryService, metric, metricName, group);
            }
         } catch (Exception e) {
            LOG.warn("Error while registering metric.", e);
         }
         try {
            /** 如果metric實現了View介面,則將其新增到定期更新的viewUpdater中 */
            if (metric instanceof View) {
               if (viewUpdater == null) {
                  viewUpdater = new ViewUpdater(executor);
               }
               viewUpdater.notifyOfAddedView((View) metric);
            }
         } catch (Exception e) {
            LOG.warn("Error while registering metric.", e);
         }
      }
   }
}

從上述邏輯,可以看出MetricRegistry所起的橋樑作用了,它會再依次通知配置的各個reporter,前面已經介紹過AbstractReporter這個抽象子類實現。

4.4、View介面

View介面的定義如下:

public interface View {
   /** metrics更新的間隔 */
   int UPDATE_INTERVAL_SECONDS = 5;

   /** 被定期呼叫進行metric更新的方法 */
   void update();
}

實現了View介面的Metric,需要定期的呼叫update方法,進行狀態的更新,而這個定期更新的功能是通過ViewUpdater實現的,其建構函式中,就是在executor中添加了一個定期執行的task。

public ViewUpdater(ScheduledExecutorService executor) {
   executor.scheduleWithFixedDelay(new ViewUpdaterTask(lock, toAdd, toRemove), 5, UPDATE_INTERVAL_SECONDS, TimeUnit.SECONDS);
}

新增一個Metric時,通知viewUpdater時的邏輯如下:

public void notifyOfAddedView(View view) {
   synchronized (lock) {
      toAdd.add(view);
   }
}

就是想toAdd這個Set中新增一個新的元素,通過lock這個鎖來實現同步。
而ViewUpdaterTask的run方法中,就會呼叫註冊的Metric的update方法,同時更新幾個Set。邏輯如下:

public void run() {
   for (View toUpdate : this.views) {
      toUpdate.update();
   }

   synchronized (lock) {
      views.addAll(toAdd);
      toAdd.clear();
      views.removeAll(toRemove);
      toRemove.clear();
   }
}

4.5、MetricQueryService

在MetricRegistryImpl中有一個屬性queryService,是一個ActorRef,對應的具體實現是MetricQueryService。在MetricQueryService中也維護了註冊的各種Metric,並且也是從MetricRegistry那裡接受Metric的新增和刪除的訊息。

/** 用來接收Metric新增的訊息 */
public static void notifyOfAddedMetric(ActorRef service, Metric metric, String metricName, AbstractMetricGroup group) {
   service.tell(new AddMetric(metricName, metric, group), null);
}

/** 用於接收Metric刪除的訊息 */
public static void notifyOfRemovedMetric(ActorRef service, Metric metric) {
   service.tell(new RemoveMetric(metric), null);
}

MetricQueryService在接收到這類訊息後,會在onReceive方法中根據不同的訊息型別進行相應的處理,新增和刪除Metric就是在四類Metric對應的map屬性上進行相應的新增刪除操作。以此來實現對Metric資訊的維護。
onReceive方法中還會接收到一類訊息,叫CreateDump訊息,接收到這個訊息後,就會把當前所有的Metric資料進行序列化操作,得到一個MetricDumpSerialization.MetricSerializationResult序列化後的結果例項,併發送給請求者。
對於Metric的序列化和反序列化的實現都在MetricDumpSerialization這個類中。

1、通過MetricDumpSerializer進行序列化,序列化後的結果為MetricSerializationResult;
2、通過MetricDumpDeserializer進行反序列化,反序列化後的結果為MetricDump;