1. 程式人生 > >openTSDB 原始碼詳解之寫入資料到 tsdb-uid 表

openTSDB 原始碼詳解之寫入資料到 tsdb-uid 表

openTSDB 原始碼詳解之寫入資料到tsdb-uid

1.方法入口messageReceived

public void messageReceived(final ChannelHandlerContext ctx,
                              final MessageEvent msgevent) {...}

該方法是RpcHandler類中的。
呼叫 private void handleHttpQuery(final TSDB tsdb, final Channel chan, final HttpRequest req) {...}


接著呼叫UniqueIdRpc的execute()方法
接著判斷前端的http請求屬於哪種型別:assign? uidmeta? tsmeta? rename?相應程式碼是

 if (endpoint.toLowerCase().equals("assign")) {
      this.handleAssign(tsdb, query);
      return;
    } else if (endpoint.toLowerCase().equals("uidmeta")) {
      this.handleUIDMeta(tsdb, query);
      return
; } else if (endpoint.toLowerCase().equals("tsmeta")) { this.handleTSMeta(tsdb, query); return; } else if (endpoint.toLowerCase().equals("rename")) { this.handleRename(tsdb, query); return; } else { throw new BadRequestException(HttpResponseStatus.NOT_IMPLEMENTED,
"Other UID endpoints have not been implemented yet"); }

這裡看一個 handleAssign() 方法,如下:

/**
   * Assigns UIDs to the given metric, tagk or tagv names if applicable
   * 如果可用的話,分配UIDs 為給定的metric,tagk,tagv names。
   *
   * <p>
   * This handler supports GET and POST whereby the GET command can
   * parse query strings with the {@code type} as their parameter and a comma
   * separated list of values to assign UIDs to.
   * 這個handler支援GET以及POST,憑藉GET命令能夠解析查詢字串使用型別作為他們的引數
   * 並且根據值的一個逗號分隔列表分配UIDs。
   *
   * <p>
   * Multiple types and names can be provided in one call. Each name will be
   * processed independently and if there's an error (such as an invalid name or
   * it is already assigned) the error will be stored in a separate error map
   * and other UIDs will be processed.
   * 多種型別已經名字能夠被提供在一次呼叫。每個name將會單獨被處理,並且如果有錯誤的話,(諸如
   * 一個無效的name或者該name已經被分配),這個Error將會被儲存在一個隔離的Error map,並且其它的UIDs
   * 將會被處理
   *
   * @param tsdb The TSDB from the RPC router
   *             來自RPC router 的TSDB物件
   * @param query The query for this request
   *              來自這個請求的查詢
   */
  private void handleAssign(final TSDB tsdb, final HttpQuery query) {
    // only accept GET And POST
    if (query.method() != HttpMethod.GET && query.method() != HttpMethod.POST) {
      throw new BadRequestException(HttpResponseStatus.METHOD_NOT_ALLOWED, 
          "Method not allowed", "The HTTP method [" + query.method().getName() +
          "] is not permitted for this endpoint");
    }


    //這個在很多地方都是一樣的
    final HashMap<String, List<String>> source;
    if (query.method() == HttpMethod.POST) {
        //為此source 賦值
        source = query.serializer().parseUidAssignV1();
    } else {
      source = new HashMap<String, List<String>>(3);
      // cut down on some repetitive code, split the query string values by
      // comma and add them to the source hash
        //減少一些重複的程式碼,通過逗號分割字串值,並且將這些值新增到source hash中
      String[] types = {"metric", "tagk", "tagv"};//type 包含的型別有metric ,tagk ,tagv
        CustomedMethod.printSuffix(types.length+"");

        //types.length 表示的String陣列types的實際容量
      for (int i = 0; i < types.length; i++) {
          //types[0] = metric , types[1] = tagk, types[2] = tagv
        final String values = query.getQueryStringParam(types[i]);
        if (values != null && !values.isEmpty()) {
            //這裡使用metric命名欠妥,因為得到的陣列值不一定僅僅是metric,也有可能是tagk,tagv等
            final String[] metrics = values.split(",");
          if (metrics != null && metrics.length > 0) {
            source.put(types[i], Arrays.asList(metrics));
          }
        }
      }
    }
    
    if (source.size() < 1) {
      throw new BadRequestException("Missing values to assign UIDs");
    }

    //這裡又來一個map?
    final Map<String, TreeMap<String, String>> response = 
      new HashMap<String, TreeMap<String, String>>();


    int error_count = 0;
    //HashMap<String, List<String>> source
      //對這個操作有點兒不解
    for (Map.Entry<String, List<String>> entry : source.entrySet()) {
      final TreeMap<String, String> results = 
        new TreeMap<String, String>();
      final TreeMap<String, String> errors = 
        new TreeMap<String, String>();
      
      for (String name : entry.getValue()) {
        try {
            //assignUid -> Attempts to assign a UID to a name for the given type
          final byte[] uid = tsdb.assignUid(entry.getKey(), name);
          results.put(name, 
              UniqueId.uidToString(uid));
        } catch (IllegalArgumentException e) {
          errors.put(name, e.getMessage());
          error_count++;
        }
      }

      //得到的結果應該類似如下的樣子:
        // metric TreeMap<String,String>
        // tagk TreeMap<String,String>
        // tagv TreeMap<String,String>
      response.put(entry.getKey(),results);
      if (errors.size() > 0) {
        response.put(entry.getKey() + "_errors", errors);
      }
    }
    
    if (error_count < 1) {
      query.sendReply(query.serializer().formatUidAssignV1(response));
    } else {
      query.sendReply(HttpResponseStatus.BAD_REQUEST,
          query.serializer().formatUidAssignV1(response));
    }
  }

上面這個方法中呼叫的關鍵方法是:

/**
   * Attempts to assign a UID to a name for the given type
   * 嘗試為給定的型別名分配一個UID
   *
   * Used by the UniqueIdRpc call to generate IDs for new metrics, tagks or 
   * tagvs. The name must pass validation and if it's already assigned a UID,
   * this method will throw an error with the proper UID. Otherwise if it can
   * create the UID, it will be returned
   * 被UniqueIdRpc呼叫,為新的metrics,tagks,tagvs生成一個新的IDs。name必須通過驗證,
   * 如果已經被分配一個UID,這個方法將會丟擲一個異常同時和匹配的UID。否則如果它能建立UID,
   * 它將被返回
   *
   * @param type The type of uid to assign, metric, tagk or tagv
   *             需要分配uid的型別:metric,tagk,tagv
   * @param name The name of the uid object
   *             uid 物件的名字
   * @return A byte array with the UID if the assignment was successful
   * 如果分配是成功的,則會返回UID的位元組陣列
   * @throws IllegalArgumentException if the name is invalid or it already 
   * exists
   * @since 2.0
   */
  public byte[] assignUid(final String type, final String name) {
    //先檢測字串是否符合標準
      Tags.validateString(type, name);

      //接著判斷type的型別哪種?
      if (type.toLowerCase().equals("metric")) {
      try {

          final byte[] uid = this.metrics.getId(name);

          //為什麼這裡直接丟擲一個異常,而不是先判斷一下位元組陣列uid?
          //意思應該是:如果getId(name)沒有丟擲異常,則表明是已經分配了uid的name
          //否則會丟擲一個異常,這個異常的型別就是NoSuchUniqueName
          //可以檢視getId(name)方法丟擲的異常的確就是NoSuchUniqueName
        throw new IllegalArgumentException("Name already exists with UID: " +
            UniqueId.uidToString(uid));
      } catch (NoSuchUniqueName nsue) {
        //如果沒有這個name對應的id,那麼直接建立一個
          return this.metrics.getOrCreateId(name);
      }
    } else if (type.toLowerCase().equals("tagk")) {
      try {
        final byte[] uid = this.tag_names.getId(name);
        throw new IllegalArgumentException("Name already exists with UID: " +
            UniqueId.uidToString(uid));
      } catch (NoSuchUniqueName nsue) {
        return this.tag_names.getOrCreateId(name);
      }
    } else if (type.toLowerCase().equals("tagv")) {
      try {
        final byte[] uid = this.tag_values.getId(name);
        throw new IllegalArgumentException("Name already exists with UID: " +
            UniqueId.uidToString(uid));
      } catch (NoSuchUniqueName nsue) {
        return this.tag_values.getOrCreateId(name);
      }
    } else {
      LOG.warn("Unknown type name: " + type);
      throw new IllegalArgumentException("Unknown type name");
    }
  }

其中最為關鍵的方法是 getOrCreateId(),如下

/**
   * Finds the ID associated with a given name or creates it.
   * 尋找或者是建立與給出的名字相應的ID
   *
   * <p>
   * <strong>This method is blocking.</strong>  Its use within OpenTSDB itself
   * is discouraged, please use {@link #getOrCreateIdAsync} instead.
   * <p>
   * 這個方法是blocking(阻塞的)。它的使用在openTSDB中是不被鼓勵的,相反請使用getOrCreateIdAsync
   *
   * The length of the byte array is fixed in advance by the implementation.
   * 陣列的長度是固定的,由實現提前確定
   *
   * @param name The name to lookup in the table or to assign an ID to.
   *  在表中尋找的,亦或是即將分配ID的name
   * @throws HBaseException if there is a problem communicating with HBase.
   *  如果和HBase 通訊有問題,則會丟擲HBaseException
   * @throws IllegalStateException if all possible IDs are already assigned.
   *  如果所有可能的IDs均勻被分配,則丟擲IllegalStateException
   * @throws IllegalStateException if the ID found in HBase is encoded on the
   *  wrong number of bytes.
   *  如果在HBase中發現的ID是用錯誤的位元組數編碼
   */
  public byte[] getOrCreateId(final String name) throws HBaseException {
    try {
        //非同步呼叫,這裡是先尋找name所對應的id是否存在,如果不存在的話,就需要分配一個id
      return getIdAsync(name).joinUninterruptibly();
    } catch (NoSuchUniqueName e) {


        //預設情況下,這個uidFilter是關閉的。所以下面這個操作是不會被執行的
      if (tsdb != null && tsdb.getUidFilter() != null && tsdb.getUidFilter().fillterUIDAssignments()) {
        try {
          if (!tsdb.getUidFilter()
                  .allowUIDAssignment(type, name, null, null)
                  .join()) {
            rejected_assignments++;
            throw new FailedToAssignUniqueIdException(new String(kind), name, 0, 
                "Blocked by UID filter.");
          }
        } catch (FailedToAssignUniqueIdException e1) {
          throw e1;
        } catch (InterruptedException e1) {
          LOG.error("Interrupted", e1);
          Thread.currentThread().interrupt();
        } catch (Exception e1) {
          throw new RuntimeException("Should never be here", e1);
        }
      }
      
      Deferred<byte[]> assignment = null; //是一個Deferred物件
      boolean pending = false;

      //進行同步操作,對pending_assignments這個物件
      synchronized (pending_assignments) {
        assignment = pending_assignments.get(name);
        if (assignment == null) {
          // to prevent UID leaks that can be caused when multiple time
          // series for the same metric or tags arrive, we need to write a 
          // deferred to the pending map as quickly as possible. Then we can 
          // start the assignment process after we've stashed the deferred 
          // and released the lock
            /*在有著相同的metric和tags的多個時間序列到達時,為了阻止UID洩漏,我們需要建立一個deferred物件去儘可能快地
            掛起地圖。在我們已經儲存deferred物件以及釋放鎖之後,我們就能夠開始程序去分配。*/
          assignment = new Deferred<byte[]>();
          pending_assignments.put(name, assignment);
        } else {
          pending = true;
        }
      }

      if (pending) {//接下來就等待分配UID了
        LOG.info("Already waiting for UID assignment: " + name);
        try {
          return assignment.joinUninterruptibly();
        } catch (Exception e1) {
          throw new RuntimeException("Should never be here", e1);
        }
      }
      
      // start the assignment dance after stashing the deferred =>//stash:存放
        //在存放了deferred物件自後,開始分配的工作
      byte[] uid = null;
      try {
        uid = new UniqueIdAllocator(name, assignment).tryAllocate().joinUninterruptibly();
      } catch (RuntimeException e1) {
        throw e1;
      } catch (Exception e1) {
        throw new RuntimeException("Should never be here", e);
      } finally {
        synchronized (pending_assignments) {
          if (pending_assignments.remove(name) != null) {
            LOG.info("Completed pending assignment for: " + name);
          }
        }
      }
      return uid;
    } catch (Exception e) {
      throw new RuntimeException("Should never be here", e);
    }
  }

而這個方法其中呼叫 uid = new UniqueIdAllocator(name, assignment).tryAllocate().joinUninterruptibly();

這個UniqueIdAllocator類如下:
private final class UniqueIdAllocator implements Callback<Object, Object>{...}
UniqueIdAllocatorUniqueId 的一個內部類。同時該類實現了Callback 介面。作為一個回撥函式

/**
   * Implements the process to allocate a new UID.
   * 實現分配一個新的UID的過程
   *
   * This callback is re-used multiple times in a four step process:
   *   1. Allocate a new UID via atomic increment.
   *   2. Create the reverse mapping (ID to name).
   *   3. Create the forward mapping (name to ID).
   *   4. Return the new UID to the caller.
   * 這個過程會被多次重複使用在如下四個步驟中:
   *  01.通過原子增加的方式,分配一個新的UID
   *  02.建立一個逆對映(ID -> name)
   *  03.建立一個順對映(name -> ID)
   *  04.給呼叫者返回一個新的UID
   *
   */
  private final class UniqueIdAllocator implements Callback<Object, Object> {
    private final String name;  // What we're trying to allocate an ID for. -> 我們需要為哪個name分配一個ID
    private final Deferred<byte[]> assignment; // deferred to call back -> 需要回調的deferred
    private short attempt = randomize_id ?     // Give up when zero.
        MAX_ATTEMPTS_ASSIGN_RANDOM_ID : MAX_ATTEMPTS_ASSIGN_ID;

    private HBaseException hbe = null;  // Last exception caught.
    // TODO(manolama) - right now if we retry the assignment it will create a 
    // callback chain MAX_ATTEMPTS_* long and call the ErrBack that many times.
    // This can be cleaned up a fair amount but it may require changing the 
    // public behavior a bit. For now, the flag will prevent multiple attempts
    // to execute the callback.
    private boolean called = false; // whether we called the deferred or not  -> 我們是否呼叫了deferred?

    private long id = -1;  // The ID we'll grab with an atomic increment.
    private byte row[];    // The same ID, as a byte array.

    private static final byte ALLOCATE_UID = 0;
    private static final byte CREATE_REVERSE_MAPPING = 1;
    private static final byte CREATE_FORWARD_MAPPING = 2;
    private static final byte DONE = 3;

    //state表示的是當前這個執行緒需要執行的操作是什麼。在這裡就是分配UID
    private byte state = ALLOCATE_UID;  // Current state of the process.


    UniqueIdAllocator(final String name, final Deferred<byte[]> assignment) {
      this.name = name;
      this.assignment = assignment;
    }

    Deferred<byte[]> tryAllocate() {
      attempt--;
      state = ALLOCATE_UID;
      call(null);
      return assignment;
    }

    @SuppressWarnings("unchecked")
    public Object call(final Object arg) {
      if (attempt == 0) {
        if (hbe == null && !randomize_id) {
          throw new IllegalStateException("Should never happen!");
        }
        LOG.error("Failed to assign an ID for kind='" + kind()
                  + "' name='" + name + "'", hbe);
        if (hbe == null) {
          throw new FailedToAssignUniqueIdException(kind(), name, 
              MAX_ATTEMPTS_ASSIGN_RANDOM_ID);
        }
        throw hbe;
      }

      if (arg instanceof Exception) {
        final String msg = (