
Flume NG Source Code Analysis (6): The RpcClient Design for Applications

The previous post, Flume NG Source Code Analysis (5): Collecting Logs over RPC with ThriftSource, described how ThriftSource uses the Thrift service ThriftSourceProtocol to collect logs. This post covers the design and implementation of the RpcClient that flume-ng-sdk provides to application code, again using ThriftRpcClient as the example.


Let's start with the native client for ThriftSourceProtocol, which Thrift generates by default from the service defined in the flume.thrift file. This native Client provides the basic RPC client functionality: network transport and protocol encoding/decoding. For background on Thrift clients, see Thrift Source Code Analysis (3) -- IDL and Generated Code Analysis.

public static class Client extends org.apache.thrift.TServiceClient implements Iface {
    public static class Factory implements org.apache.thrift.TServiceClientFactory<Client> {
      public Factory() {}
      public Client getClient(org.apache.thrift.protocol.TProtocol prot) {
        return new Client(prot);
      }
      public Client getClient(org.apache.thrift.protocol.TProtocol iprot, org.apache.thrift.protocol.TProtocol oprot) {
        return new Client(iprot, oprot);
      }
    }

    public Client(org.apache.thrift.protocol.TProtocol prot)
    {
      super(prot, prot);
    }

    public Client(org.apache.thrift.protocol.TProtocol iprot, org.apache.thrift.protocol.TProtocol oprot) {
      super(iprot, oprot);
    }

    public Status append(ThriftFlumeEvent event) throws org.apache.thrift.TException
    {
      send_append(event);
      return recv_append();
    }

    public void send_append(ThriftFlumeEvent event) throws org.apache.thrift.TException
    {
      append_args args = new append_args();
      args.setEvent(event);
      sendBase("append", args);
    }

    public Status recv_append() throws org.apache.thrift.TException
    {
      append_result result = new append_result();
      receiveBase(result, "append");
      if (result.isSetSuccess()) {
        return result.success;
      }
      throw new org.apache.thrift.TApplicationException(org.apache.thrift.TApplicationException.MISSING_RESULT, "append failed: unknown result");
    }

    public Status appendBatch(List<ThriftFlumeEvent> events) throws org.apache.thrift.TException
    {
      send_appendBatch(events);
      return recv_appendBatch();
    }

    public void send_appendBatch(List<ThriftFlumeEvent> events) throws org.apache.thrift.TException
    {
      appendBatch_args args = new appendBatch_args();
      args.setEvents(events);
      sendBase("appendBatch", args);
    }

    public Status recv_appendBatch() throws org.apache.thrift.TException
    {
      appendBatch_result result = new appendBatch_result();
      receiveBase(result, "appendBatch");
      if (result.isSetSuccess()) {
        return result.success;
      }
      throw new org.apache.thrift.TApplicationException(org.apache.thrift.TApplicationException.MISSING_RESULT, "appendBatch failed: unknown result");
    }

  }

Now let's look at how Flume NG wraps this Thrift client. Flume NG supports several RPC implementations, including Avro and Thrift. In the RpcClient class hierarchy, the RpcClient interface sits at the top; AbstractRpcClient implements it, and the concrete clients (NettyAvroRpcClient, ThriftRpcClient, and the wrapping LoadBalancingRpcClient and FailoverRpcClient) extend AbstractRpcClient.

The RpcClient interface defines the basic functionality of the RPC client exposed to applications:

public interface RpcClient {

  // Maximum number of events that can be sent in a single appendBatch() call
  public int getBatchSize();

  // Send a single event to the remote Flume agent
  public void append(Event event) throws EventDeliveryException;

  // Send a batch of events to the remote Flume agent
  public void appendBatch(List<Event> events) throws
      EventDeliveryException;

  // Whether this client is still usable
  public boolean isActive();

  // Close the client and release its resources
  public void close() throws FlumeException;

}
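
Before digging into the SDK internals, here is a minimal sketch of how an application drives this interface, assuming a ThriftSource listening on a placeholder host and port, and using RpcClientFactory.getThriftInstance and EventBuilder from flume-ng-sdk:

import java.nio.charset.Charset;
import org.apache.flume.Event;
import org.apache.flume.api.RpcClient;
import org.apache.flume.api.RpcClientFactory;
import org.apache.flume.event.EventBuilder;

public class RpcClientExample {
  public static void main(String[] args) throws Exception {
    // "localhost"/4141 are placeholders for a running ThriftSource.
    RpcClient client = RpcClientFactory.getThriftInstance("localhost", 4141);
    try {
      Event event = EventBuilder.withBody("hello flume", Charset.forName("UTF-8"));
      client.append(event); // blocks until the source acknowledges or times out
    } finally {
      client.close(); // releases the underlying connection pool
    }
  }
}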

The abstract class AbstractRpcClient implements the RpcClient interface, provides a default implementation of getBatchSize(), and adds a configure() method to support configuration:

public abstract class AbstractRpcClient implements RpcClient {

  protected int batchSize =
      RpcClientConfigurationConstants.DEFAULT_BATCH_SIZE;
  protected long connectTimeout =
      RpcClientConfigurationConstants.DEFAULT_CONNECT_TIMEOUT_MILLIS;
  protected long requestTimeout =
      RpcClientConfigurationConstants.DEFAULT_REQUEST_TIMEOUT_MILLIS;

  @Override
  public int getBatchSize(){
    return batchSize;
  }
  @Override
  public abstract void append(Event event) throws EventDeliveryException;

  @Override
  public abstract void appendBatch(List<Event> events)
      throws EventDeliveryException;

  @Override
  public abstract boolean isActive();

  @Override
  public abstract void close() throws FlumeException;

  protected abstract void configure(Properties properties)
      throws FlumeException;

}
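
The batchSize, connectTimeout, and requestTimeout fields above are populated from the Properties passed to configure(). A sketch of supplying them through the factory (the key names batch-size, connect-timeout, and request-timeout follow RpcClientConfigurationConstants; the address is a placeholder):

import java.util.Properties;
import org.apache.flume.api.RpcClient;
import org.apache.flume.api.RpcClientFactory;

public class ConfiguredClientExample {
  public static RpcClient create() {
    Properties props = new Properties();
    props.setProperty("client.type", "thrift");       // select the Thrift implementation
    props.setProperty("hosts", "h1");
    props.setProperty("hosts.h1", "flume-host:4141"); // placeholder address
    props.setProperty("batch-size", "100");           // -> AbstractRpcClient.batchSize
    props.setProperty("connect-timeout", "20000");    // -> connectTimeout, in ms
    props.setProperty("request-timeout", "20000");    // -> requestTimeout, in ms
    // The factory instantiates the client class and then calls configure(props).
    return RpcClientFactory.getInstance(props);
  }
}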

A well-designed service framework client has a few essential capabilities:

1. Service addressing

2. Connection pool management

3. Client-side load balancing of RPC calls

4. Caching

5. Fault tolerance and failover


Let's look at how Flume NG designs its service client around these components, one section at a time.

Service Addressing

Flume NG's RPC client keeps service addressing simple: the Thrift server's IP and port are read from the Properties configuration and used to create the TSocket. This is an obvious extension point for making addressing more capable and flexible:

      HostInfo host = HostInfo.getHostInfoList(properties).get(0);
      hostname = host.getHostName();
      port = host.getPortNumber();

// ClientWrapper
      public ClientWrapper() throws Exception{
      // build the TSocket from the configured hostname and port
      transport = new TFastFramedTransport(new TSocket(hostname, port));
      transport.open();
      client = new ThriftSourceProtocol.Client(new TCompactProtocol
        (transport));
      // Not a great hash code, but since this class is immutable and there
      // is at most one instance of the components of this class,
      // this works fine [If the objects are equal, hash code is the same]
      hashCode = random.nextInt();
    }
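
HostInfo.getHostInfoList() is what turns the Properties entries into host descriptors. A sketch of the expected property format (the hosts / hosts.<name> convention comes from the SDK; the addresses are placeholders):

import java.util.List;
import java.util.Properties;
import org.apache.flume.api.HostInfo;

public class HostInfoExample {
  public static void main(String[] args) {
    Properties props = new Properties();
    props.setProperty("hosts", "h1 h2");            // space-separated logical names
    props.setProperty("hosts.h1", "10.0.0.1:4141"); // placeholder addresses
    props.setProperty("hosts.h2", "10.0.0.2:4141");

    List<HostInfo> hosts = HostInfo.getHostInfoList(props);
    HostInfo first = hosts.get(0);
    System.out.println(first.getReferenceName()); // h1
    System.out.println(first.getHostName());      // 10.0.0.1
    System.out.println(first.getPortNumber());    // 4141
  }
}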


Connection Pool Management

First, the ClientWrapper class wraps the native Client that Thrift generates. Its parameters come from the Properties configuration, which determines the socket connection and the protocol used for encoding and decoding:

private class ClientWrapper {
    public final ThriftSourceProtocol.Client client;
    public final TFastFramedTransport transport;
    private final int hashCode;

    public ClientWrapper() throws Exception{
      transport = new TFastFramedTransport(new TSocket(hostname, port));
      transport.open();
      client = new ThriftSourceProtocol.Client(new TCompactProtocol
        (transport));
      // Not a great hash code, but since this class is immutable and there
      // is at most one instance of the components of this class,
      // this works fine [If the objects are equal, hash code is the same]
      hashCode = random.nextInt();
    }
} 

ConnectionPoolManager implements a simple connection pool, providing checkout() and checkIn() methods to borrow and return ClientWrapper connection objects. It uses a ReentrantLock and its condition queue (Condition) as a monitor to manage its own synchronization: when availableClients is empty and the pool has reached its maximum size, checkout() blocks; when checkIn() returns a connection object, it wakes a thread blocked in checkout().

 private class ConnectionPoolManager {
    private final Queue<ClientWrapper> availableClients;
    private final Set<ClientWrapper> checkedOutClients;
    private final int maxPoolSize;
    private int currentPoolSize;
    private final Lock poolLock;
    private final Condition availableClientsCondition;

    public ConnectionPoolManager(int poolSize) {
      this.maxPoolSize = poolSize;
      availableClients = new LinkedList<ClientWrapper>();
      checkedOutClients = new HashSet<ClientWrapper>();
      poolLock = new ReentrantLock();
      availableClientsCondition = poolLock.newCondition();
      currentPoolSize = 0;
    }

    public ClientWrapper checkout() throws Exception {

      ClientWrapper ret = null;
      poolLock.lock();
      try {
        if (availableClients.isEmpty() && currentPoolSize < maxPoolSize) {
          ret = new ClientWrapper();
          currentPoolSize++;
          checkedOutClients.add(ret);
          return ret;
        }
        while (availableClients.isEmpty()) {
          availableClientsCondition.await();
        }
        ret = availableClients.poll();
        checkedOutClients.add(ret);
      } finally {
        poolLock.unlock();
      }
      return ret;
    }

    public void checkIn(ClientWrapper client) {
      poolLock.lock();
      try {
        availableClients.add(client);
        checkedOutClients.remove(client);
        availableClientsCondition.signal();
      } finally {
        poolLock.unlock();
      }
    }

    public void destroy(ClientWrapper client) {
      poolLock.lock();
      try {
        checkedOutClients.remove(client);
        currentPoolSize--;
      } finally {
        poolLock.unlock();
      }
      client.transport.close();
    }

    public void closeAll() {
      poolLock.lock();
      try {
        for (ClientWrapper c : availableClients) {
          c.transport.close();
          currentPoolSize--;
        }
      /*
       * Be cruel and close even the checked out clients. The threads writing
       * using these will now get an exception.
       */
        for (ClientWrapper c : checkedOutClients) {
          c.transport.close();
          currentPoolSize--;
        }
      } finally {
        poolLock.unlock();
      }
    }
  }
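
To see how the pool is meant to be driven, here is a sketch of the pattern the enclosing ThriftRpcClient follows on each call (the field name connectionManager and the helper doAppend are assumed for illustration; checkout(), checkIn(), and destroy() are the methods defined above):

// Inside ThriftRpcClient (sketch): borrow, use, return on success,
// destroy on transport failure.
private Status doAppend(ThriftFlumeEvent event) throws Exception {
  ClientWrapper wrapper = connectionManager.checkout(); // may block when the pool is exhausted
  boolean destroyed = false;
  try {
    return wrapper.client.append(event); // the raw generated Thrift call
  } catch (org.apache.thrift.TException e) {
    connectionManager.destroy(wrapper);  // broken transport: drop it from the pool
    destroyed = true;
    throw e;
  } finally {
    if (!destroyed) {
      connectionManager.checkIn(wrapper); // return the healthy connection
    }
  }
}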

Client-Side Load Balancing

LoadBalancingRpcClient extends AbstractRpcClient and adds load balancing across RPC clients. It is an implementation of the decorator pattern.

The HostSelector interface defines the load-balancing contract. Balancing is done over HostInfo objects; the selected HostInfo is then used to look up the corresponding RpcClient.

public interface HostSelector {

    void setHosts(List<HostInfo> hosts);

    Iterator<HostInfo> createHostIterator();

    void informFailure(HostInfo failedHost);
  }

HostSelector has two default implementations:

RoundRobinHostSelector implements round-robin load balancing.

RandomOrderHostSelector implements random-order load balancing.


Looking at the implementation of RoundRobinHostSelector, its logic is mostly delegated to the OrderSelector class:

 private static class RoundRobinHostSelector implements HostSelector {

    private OrderSelector<HostInfo> selector;

    RoundRobinHostSelector(boolean backoff, long maxBackoff){
      selector = new RoundRobinOrderSelector<HostInfo>(backoff);
      if(maxBackoff != 0){
        selector.setMaxTimeOut(maxBackoff);
      }
    }
    @Override
    public synchronized Iterator<HostInfo> createHostIterator() {
      return selector.createIterator();
    }

    @Override
    public synchronized void setHosts(List<HostInfo> hosts) {
      selector.setObjects(hosts);
    }

    public synchronized void informFailure(HostInfo failedHost){
      selector.informFailure(failedHost);
    }
  }

OrderSelector is a sequential-selection container that supports a backoff algorithm. The parent class OrderSelector is abstract and implements the backoff logic; its subclasses RoundRobinOrderSelector and RandomOrderSelector implement the iterator-creation algorithm.

The code of RoundRobinOrderSelector is shown below (a small usage sketch follows it):

1. getIndexList() returns the indices of the objects that are currently healthy

2. The nextHead index marks the current position, which is the starting point of the rotation

public class RoundRobinOrderSelector<T> extends OrderSelector<T> {

  private int nextHead = 0;

  public RoundRobinOrderSelector(boolean shouldBackOff) {
    super(shouldBackOff);
  }

  @Override
  public Iterator<T> createIterator() {
    List<Integer> activeIndices = getIndexList();
    int size = activeIndices.size();
    // possible that the size has shrunk so gotta adjust nextHead for that
    if (nextHead >= size) {
      nextHead = 0;
    }
    int begin = nextHead++;
    if (nextHead == activeIndices.size()) {
      nextHead = 0;
    }

    int[] indexOrder = new int[size];

    for (int i = 0; i < size; i++) {
      indexOrder[i] = activeIndices.get((begin + i) % size);
    }

    return new SpecificOrderIterator<T>(indexOrder, getObjects());
  }
}
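
To make the rotation concrete, here is a small sketch that drives RoundRobinOrderSelector directly (assuming it lives in org.apache.flume.util as in flume-ng-sdk; the expected output in the comments follows from the index arithmetic above, not from a captured run):

import java.util.Arrays;
import java.util.Iterator;
import org.apache.flume.util.RoundRobinOrderSelector;

public class RoundRobinDemo {
  public static void main(String[] args) {
    RoundRobinOrderSelector<String> selector =
        new RoundRobinOrderSelector<String>(false); // no backoff
    selector.setObjects(Arrays.asList("h1", "h2", "h3"));

    // Each iterator starts one position after the previous one.
    print(selector.createIterator()); // expected: h1 h2 h3
    print(selector.createIterator()); // expected: h2 h3 h1
  }

  private static void print(Iterator<String> it) {
    StringBuilder sb = new StringBuilder();
    while (it.hasNext()) {
      sb.append(it.next()).append(' ');
    }
    System.out.println(sb.toString().trim());
  }
}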

For LoadBalancingRpcClient, the configured server list for a given RPC service must contain at least two hosts before load balancing can be used. The configuration also carries the settings for the backoff behavior and the load-balancing algorithm.
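
A sketch of such a configuration, expressed through the SDK factory (the key names client.type, host-selector, backoff, and maxBackoff follow RpcClientConfigurationConstants and the Flume developer guide; the addresses are placeholders):

import java.util.Properties;
import org.apache.flume.api.RpcClient;
import org.apache.flume.api.RpcClientFactory;

public class LoadBalancingClientExample {
  public static RpcClient create() {
    Properties props = new Properties();
    props.setProperty("client.type", "default_loadbalance");
    props.setProperty("hosts", "h1 h2");               // at least two hosts are required
    props.setProperty("hosts.h1", "10.0.0.1:4141");    // placeholder addresses
    props.setProperty("hosts.h2", "10.0.0.2:4141");
    props.setProperty("host-selector", "round_robin"); // or "random", or a custom class name
    props.setProperty("backoff", "true");              // temporarily exclude failed hosts
    props.setProperty("maxBackoff", "10000");          // cap the backoff period (ms)
    return RpcClientFactory.getInstance(props);
  }
}

The configure() method below parses these settings: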

 protected void configure(Properties properties) throws FlumeException {
    clientMap = new HashMap<String, RpcClient>();
    configurationProperties = new Properties();
    configurationProperties.putAll(properties);
    hosts = HostInfo.getHostInfoList(properties);
    if (hosts.size() < 2) {
      throw new FlumeException("At least two hosts are required to use the "
          + "load balancing RPC client.");
    }
     String lbTypeName = properties.getProperty(
        RpcClientConfigurationConstants.CONFIG_HOST_SELECTOR,
        RpcClientConfigurationConstants.HOST_SELECTOR_ROUND_ROBIN);

    boolean backoff = Boolean.valueOf(properties.getProperty(
            RpcClientConfigurationConstants.CONFIG_BACKOFF,
            String.valueOf(false)));

    String maxBackoffStr = properties.getProperty(
        RpcClientConfigurationConstants.CONFIG_MAX_BACKOFF);

    long maxBackoff = 0;
    if(maxBackoffStr != null) {
      maxBackoff     = Long.parseLong(maxBackoffStr);
    }

    if (lbTypeName.equalsIgnoreCase(
        RpcClientConfigurationConstants.HOST_SELECTOR_ROUND_ROBIN)) {
      selector = new RoundRobinHostSelector(backoff, maxBackoff);
    } else if (lbTypeName.equalsIgnoreCase(
        RpcClientConfigurationConstants.HOST_SELECTOR_RANDOM)) {
      selector = new RandomOrderHostSelector(backoff, maxBackoff);
    } else {
      try {
        @SuppressWarnings("unchecked")
        Class<? extends HostSelector> klass = (Class<? extends HostSelector>)
            Class.forName(lbTypeName);

        selector = klass.newInstance();
      } catch (Exception ex) {
        throw new FlumeException("Unable to instantiate host selector: "
            + lbTypeName, ex);
      }
    }

    selector.setHosts(hosts);
    isOpen = true;
}
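
Note the Class.forName branch at the end: host-selector may name any class, so applications can plug in their own strategy. A minimal sketch of a custom selector (FixedOrderHostSelector is hypothetical, and it assumes HostSelector is reachable as LoadBalancingRpcClient.HostSelector; a public no-arg constructor is required because configure() calls klass.newInstance()):

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import org.apache.flume.api.HostInfo;
import org.apache.flume.api.LoadBalancingRpcClient;

// Hypothetical selector that always tries hosts in their configured order.
public class FixedOrderHostSelector implements LoadBalancingRpcClient.HostSelector {

  private volatile List<HostInfo> hosts = new ArrayList<HostInfo>();

  public FixedOrderHostSelector() {} // required: created via newInstance()

  @Override
  public void setHosts(List<HostInfo> hosts) {
    this.hosts = new ArrayList<HostInfo>(hosts);
  }

  @Override
  public Iterator<HostInfo> createHostIterator() {
    return new ArrayList<HostInfo>(hosts).iterator(); // iterate over a snapshot
  }

  @Override
  public void informFailure(HostInfo failedHost) {
    // No backoff here: failures do not change the ordering.
  }
}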


The main components involved in client-side load balancing are LoadBalancingRpcClient, the HostSelector strategies, and the OrderSelector containers described above.

Client-Side Caching

Client-side caching is simple: a Map stores the RpcClient for each HostInfo so that RpcClient objects can be reused. An RpcClient is a heavyweight object, since each one holds a connection pool instance.

clientMap = new HashMap<String, RpcClient>();


private synchronized RpcClient getClient(HostInfo info)
      throws FlumeException, EventDeliveryException {
    throwIfClosed();
    String name = info.getReferenceName();
    RpcClient client = clientMap.get(name);
    if (client == null) {
      client = createClient(name);
      clientMap.put(name, client);
    } else if (!client.isActive()) {
      try {
        client.close();
      } catch (Exception ex) {
        LOGGER.warn("Failed to close client for " + info, ex);
      }
      client = createClient(name);
      clientMap.put(name, client);
    }

    return client;
  }

Client-Side Failover

The FailoverRpcClient class implements client-side failover. It is also a decorator-pattern implementation: it extends AbstractRpcClient and implements the RpcClient interface. FailoverRpcClient implements failover through a retry mechanism: when one RpcClient fails, the next RpcClient is used to retry the RPC request, until the request succeeds or every client has failed.

FailoverRpcClient likewise maintains a list of HostInfo objects, from which the corresponding RpcClients are obtained, along with a maximum number of attempts, maxTries. A configuration sketch follows, and then the configureHosts() method that parses it.
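
A sketch of a failover configuration (client.type and max-attempts per the Flume developer guide; the addresses are placeholders):

import java.util.Properties;
import org.apache.flume.api.RpcClient;
import org.apache.flume.api.RpcClientFactory;

public class FailoverClientExample {
  public static RpcClient create() {
    Properties props = new Properties();
    props.setProperty("client.type", "default_failover");
    props.setProperty("hosts", "h1 h2 h3");         // tried in order when a host fails
    props.setProperty("hosts.h1", "10.0.0.1:4141"); // placeholder addresses
    props.setProperty("hosts.h2", "10.0.0.2:4141");
    props.setProperty("hosts.h3", "10.0.0.3:4141");
    props.setProperty("max-attempts", "3");         // defaults to the number of hosts
    return RpcClientFactory.getInstance(props);
  }
}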

private synchronized void configureHosts(Properties properties)
      throws FlumeException {
    if(isActive){
      logger.error("This client was already configured, " +
          "cannot reconfigure.");
      throw new FlumeException("This client was already configured, " +
          "cannot reconfigure.");
    }
    hosts = HostInfo.getHostInfoList(properties);
    String tries = properties.getProperty(
        RpcClientConfigurationConstants.CONFIG_MAX_ATTEMPTS);
    if (tries == null || tries.isEmpty()){
      maxTries = hosts.size();
    } else {
      try {
        maxTries = Integer.parseInt(tries);
      } catch (NumberFormatException e) {
        maxTries = hosts.size();
      }
    }
......
}

Let's look at its append() method, which implements the retry mechanism that performs failover:

public void append(Event event) throws EventDeliveryException {
    //Why a local variable rather than just calling getClient()?
    //If we get an EventDeliveryException, we need to call close on
    //that specific client, getClient in this case, will get us
    //the next client - leaving a resource leak.
    RpcClient localClient = null;
    synchronized (this) {
      if (!isActive) {
        logger.error("Attempting to append to an already closed client.");
        throw new EventDeliveryException(
            "Attempting to append to an already closed client.");
      }
    }
    // Sit in an infinite loop and try to append!
    int tries = 0;
    while (tries < maxTries) {
      try {
        tries++;
        localClient = getClient();
        localClient.append(event);
        return;
      } catch (EventDeliveryException e) {
        // Could not send event through this client, try to pick another client.
        logger.warn("Client failed. Exception follows: ", e);
        localClient.close();
        localClient = null;
      } catch (Exception e2) {
        logger.error("Failed to send event: ", e2);
        throw new EventDeliveryException(
            "Failed to send event. Exception follows: ", e2);
      }
    }
    logger.error("Tried many times, could not send event."
    throw new EventDeliveryException("Failed to send the event!");
  }
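
One detail worth noting: as the comment at the top of the method explains, getClient() hands out the client for the next host once the failed one has been closed, so this plain retry loop fails over across the configured hosts rather than retrying the same endpoint.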