Flume NG Source Code Analysis (6): The RpcClient Design for Applications
The previous post, Flume NG Source Code Analysis (5): Collecting Logs over RPC with ThriftSource, covered how ThriftSource uses the Thrift service ThriftSourceProtocol to collect logs. This post looks at the design and implementation of the RpcClient that flume-ng-sdk provides to applications, again using ThriftRpcClient as the example.
First, look at the native client for ThriftSourceProtocol, which Thrift generates by default from the service defined in the flume.thrift file. This native Client provides the basics of an RPC client: network transport plus protocol encoding and decoding. For background on Thrift clients, see Thrift Source Code Analysis (3): IDL and Generated Code Analysis.
public static class Client extends org.apache.thrift.TServiceClient implements Iface {
  public static class Factory implements org.apache.thrift.TServiceClientFactory<Client> {
    public Factory() {}
    public Client getClient(org.apache.thrift.protocol.TProtocol prot) {
      return new Client(prot);
    }
    public Client getClient(org.apache.thrift.protocol.TProtocol iprot, org.apache.thrift.protocol.TProtocol oprot) {
      return new Client(iprot, oprot);
    }
  }

  public Client(org.apache.thrift.protocol.TProtocol prot) {
    super(prot, prot);
  }

  public Client(org.apache.thrift.protocol.TProtocol iprot, org.apache.thrift.protocol.TProtocol oprot) {
    super(iprot, oprot);
  }

  public Status append(ThriftFlumeEvent event) throws org.apache.thrift.TException {
    send_append(event);
    return recv_append();
  }

  public void send_append(ThriftFlumeEvent event) throws org.apache.thrift.TException {
    append_args args = new append_args();
    args.setEvent(event);
    sendBase("append", args);
  }

  public Status recv_append() throws org.apache.thrift.TException {
    append_result result = new append_result();
    receiveBase(result, "append");
    if (result.isSetSuccess()) {
      return result.success;
    }
    throw new org.apache.thrift.TApplicationException(
        org.apache.thrift.TApplicationException.MISSING_RESULT,
        "append failed: unknown result");
  }

  public Status appendBatch(List<ThriftFlumeEvent> events) throws org.apache.thrift.TException {
    send_appendBatch(events);
    return recv_appendBatch();
  }

  public void send_appendBatch(List<ThriftFlumeEvent> events) throws org.apache.thrift.TException {
    appendBatch_args args = new appendBatch_args();
    args.setEvents(events);
    sendBase("appendBatch", args);
  }

  public Status recv_appendBatch() throws org.apache.thrift.TException {
    appendBatch_result result = new appendBatch_result();
    receiveBase(result, "appendBatch");
    if (result.isSetSuccess()) {
      return result.success;
    }
    throw new org.apache.thrift.TApplicationException(
        org.apache.thrift.TApplicationException.MISSING_RESULT,
        "appendBatch failed: unknown result");
  }
}
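To make the layering concrete, here is a minimal sketch of driving this generated client directly, with no Flume SDK wrapping on top. The class name, address, headers, and body are illustrative placeholders:

import java.nio.ByteBuffer;
import java.util.HashMap;

import org.apache.flume.thrift.Status;
import org.apache.flume.thrift.ThriftFlumeEvent;
import org.apache.flume.thrift.ThriftSourceProtocol;
import org.apache.thrift.TException;
import org.apache.thrift.protocol.TCompactProtocol;
import org.apache.thrift.transport.TFastFramedTransport;
import org.apache.thrift.transport.TSocket;

public class RawThriftClientExample {
  public static void main(String[] args) throws TException {
    // Placeholder address; a ThriftSource must be listening here
    TFastFramedTransport transport =
        new TFastFramedTransport(new TSocket("127.0.0.1", 4141));
    transport.open();
    ThriftSourceProtocol.Client client =
        new ThriftSourceProtocol.Client(new TCompactProtocol(transport));
    // ThriftFlumeEvent carries a header map plus a binary body
    ThriftFlumeEvent event = new ThriftFlumeEvent(
        new HashMap<String, String>(), ByteBuffer.wrap("hello".getBytes()));
    Status status = client.append(event); // blocking RPC: send_append + recv_append
    System.out.println("append returned: " + status);
    transport.close();
  }
}

Everything beyond this point in the post is about what the SDK layers on top of such a raw client.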
Now let's see how Flume NG wraps the Thrift client. Flume NG supports several RPC implementations, including Avro and Thrift, behind a common RpcClient hierarchy, described below.
The RpcClient interface defines the basic functionality of the RPC client exposed to applications:
public interface RpcClient {
  public int getBatchSize();
  public void append(Event event) throws EventDeliveryException;
  public void appendBatch(List<Event> events) throws EventDeliveryException;
  public boolean isActive();
  public void close() throws FlumeException;
}
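Applications normally obtain one of these clients through RpcClientFactory rather than constructing it themselves. A minimal usage sketch (assuming Flume 1.4+, where getThriftInstance is available; the class name and address are placeholders):

import java.nio.charset.StandardCharsets;

import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.api.RpcClient;
import org.apache.flume.api.RpcClientFactory;
import org.apache.flume.event.EventBuilder;

public class ThriftClientExample {
  public static void main(String[] args) throws EventDeliveryException {
    // Placeholder address; point this at a running ThriftSource
    RpcClient client = RpcClientFactory.getThriftInstance("127.0.0.1", 4141);
    try {
      Event event = EventBuilder.withBody(
          "hello flume".getBytes(StandardCharsets.UTF_8));
      client.append(event); // blocks until the event is accepted or delivery fails
    } finally {
      client.close();       // tears down the pooled connections
    }
  }
}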
The abstract class AbstractRpcClient implements the RpcClient interface, provides a default implementation of getBatchSize, and adds a configure hook to support configuration:
public abstract class AbstractRpcClient implements RpcClient {
protected int batchSize =
RpcClientConfigurationConstants.DEFAULT_BATCH_SIZE;
protected long connectTimeout =
RpcClientConfigurationConstants.DEFAULT_CONNECT_TIMEOUT_MILLIS;
protected long requestTimeout =
RpcClientConfigurationConstants.DEFAULT_REQUEST_TIMEOUT_MILLIS;
@Override
public int getBatchSize(){
return batchSize;
}
@Override
public abstract void append(Event event) throws EventDeliveryException;
@Override
public abstract void appendBatch(List<Event> events)
throws EventDeliveryException;
@Override
public abstract boolean isActive();
@Override
public abstract void close() throws FlumeException;
protected abstract void configure(Properties properties)
throws FlumeException;
}
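The three protected fields above are filled in through the configure hook from a java.util.Properties object. A hedged sketch using the key names defined by RpcClientConfigurationConstants (the values and the ConfiguredClientExample class name are illustrative; "thrift" as a client.type assumes Flume 1.4+):

import java.util.Properties;

import org.apache.flume.api.RpcClient;
import org.apache.flume.api.RpcClientFactory;

public class ConfiguredClientExample {
  public static RpcClient build() {
    Properties props = new Properties();
    props.setProperty("client.type", "thrift");     // pick the Thrift implementation
    props.setProperty("hosts", "h1");               // logical host names
    props.setProperty("hosts.h1", "127.0.0.1:4141");
    props.setProperty("batch-size", "100");         // CONFIG_BATCH_SIZE
    props.setProperty("connect-timeout", "20000");  // CONFIG_CONNECT_TIMEOUT (ms)
    props.setProperty("request-timeout", "20000");  // CONFIG_REQUEST_TIMEOUT (ms)
    return RpcClientFactory.getInstance(props);     // configure(props) runs inside
  }
}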
For a well-designed service-framework client, a few capabilities are fundamental:
1. Service addressing
2. Connection pool management
3. Client-side load balancing of RPC calls
4. Caching
5. Fault tolerance and failover
Let's see how Flume NG designs its service client around these capabilities. The basic components are described below.
Service addressing
Service addressing in Flume NG's RPC client is fairly simple: the Thrift server's IP and port are set in a Properties file, and those values are used to create the TSocket. This is an extension point where addressing could be made more powerful and flexible.
HostInfo host = HostInfo.getHostInfoList(properties).get(0);
hostname = host.getHostName();
port = host.getPortNumber();
// ClientWrapper
public ClientWrapper() throws Exception{
// build the TSocket from the configured hostname and port
transport = new TFastFramedTransport(new TSocket(hostname, port));
transport.open();
client = new ThriftSourceProtocol.Client(new TCompactProtocol
(transport));
// Not a great hash code, but since this class is immutable and there
// is at most one instance of the components of this class,
// this works fine [If the objects are equal, hash code is the same]
hashCode = random.nextInt();
}
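HostInfo.getHostInfoList builds the host list from a hosts key holding space-separated logical names, plus one hosts.<name> entry per name in hostname:port form. A small sketch of the expected format (class name and addresses are placeholders):

import java.util.List;
import java.util.Properties;

import org.apache.flume.api.HostInfo;

public class HostInfoExample {
  public static void main(String[] args) {
    Properties properties = new Properties();
    properties.setProperty("hosts", "h1 h2");             // space-separated logical names
    properties.setProperty("hosts.h1", "10.0.0.1:4141");  // <name> = hostname:port
    properties.setProperty("hosts.h2", "10.0.0.2:4141");
    List<HostInfo> hosts = HostInfo.getHostInfoList(properties);
    // ThriftRpcClient takes only hosts.get(0); the remaining entries matter
    // for the load-balancing and failover clients discussed below.
    System.out.println(hosts.get(0).getHostName() + ":" + hosts.get(0).getPortNumber());
  }
}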
Connection pool management
private class ClientWrapper {
public final ThriftSourceProtocol.Client client;
public final TFastFramedTransport transport;
private final int hashCode;
public ClientWrapper() throws Exception{
transport = new TFastFramedTransport(new TSocket(hostname, port));
transport.open();
client = new ThriftSourceProtocol.Client(new TCompactProtocol
(transport));
// Not a great hash code, but since this class is immutable and there
// is at most one instance of the components of this class,
// this works fine [If the objects are equal, hash code is the same]
hashCode = random.nextInt();
}
}
ConnectionPoolManager implements a simple connection-pool manager, with checkout and checkIn methods to borrow and return the ClientWrapper connection objects. It uses a ReentrantLock and its condition queue (Condition) to build a monitor and manage its own synchronization. When availableClients is empty and the pool has already reached its maximum size, checkout blocks; when checkIn returns a connection object, it wakes up a thread blocked in checkout.
private class ConnectionPoolManager {
private final Queue<ClientWrapper> availableClients;
private final Set<ClientWrapper> checkedOutClients;
private final int maxPoolSize;
private int currentPoolSize;
private final Lock poolLock;
private final Condition availableClientsCondition;
public ConnectionPoolManager(int poolSize) {
this.maxPoolSize = poolSize;
availableClients = new LinkedList<ClientWrapper>();
checkedOutClients = new HashSet<ClientWrapper>();
poolLock = new ReentrantLock();
availableClientsCondition = poolLock.newCondition();
currentPoolSize = 0;
}
public ClientWrapper checkout() throws Exception {
ClientWrapper ret = null;
poolLock.lock();
try {
if (availableClients.isEmpty() && currentPoolSize < maxPoolSize) {
ret = new ClientWrapper();
currentPoolSize++;
checkedOutClients.add(ret);
return ret;
}
while (availableClients.isEmpty()) {
availableClientsCondition.await();
}
ret = availableClients.poll();
checkedOutClients.add(ret);
} finally {
poolLock.unlock();
}
return ret;
}
public void checkIn(ClientWrapper client) {
poolLock.lock();
try {
availableClients.add(client);
checkedOutClients.remove(client);
availableClientsCondition.signal();
} finally {
poolLock.unlock();
}
}
public void destroy(ClientWrapper client) {
poolLock.lock();
try {
checkedOutClients.remove(client);
currentPoolSize--;
} finally {
poolLock.unlock();
}
client.transport.close();
}
public void closeAll() {
poolLock.lock();
try {
for (ClientWrapper c : availableClients) {
c.transport.close();
currentPoolSize--;
}
/*
* Be cruel and close even the checked out clients. The threads writing
* using these will now get an exception.
*/
for (ClientWrapper c : checkedOutClients) {
c.transport.close();
currentPoolSize--;
}
} finally {
poolLock.unlock();
}
}
}
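The rest of ThriftRpcClient consumes the pool in a borrow/use/return pattern, swapping checkIn for destroy when the transport is suspected broken. A condensed sketch of that pattern (simplified; the real append additionally enforces requestTimeout via an executor, and doAppend is a hypothetical name):

// Sketch of the borrow/use/return pattern inside ThriftRpcClient
private void doAppend(ThriftFlumeEvent thriftEvent) throws EventDeliveryException {
  ClientWrapper client = null;
  boolean destroyed = false;
  try {
    client = connectionManager.checkout();  // may block until a pooled client is free
    client.client.append(thriftEvent);      // use the raw generated Thrift client
  } catch (Throwable t) {
    if (client != null) {
      connectionManager.destroy(client);    // drop the possibly broken connection
      destroyed = true;
    }
    throw new EventDeliveryException("Failed to send event", t);
  } finally {
    if (client != null && !destroyed) {
      connectionManager.checkIn(client);    // healthy client goes back to the pool
    }
  }
}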
Client-side load balancing
LoadBalancingRpcClient extends AbstractRpcClient and provides load balancing across RPC clients; it is an implementation of the decorator pattern.
The HostSelector interface defines the load-balancing contract. Balancing is performed over HostInfo objects; the chosen HostInfo is then mapped to its corresponding RpcClient object.
public interface HostSelector {
void setHosts(List<HostInfo> hosts);
Iterator<HostInfo> createHostIterator();
void informFailure(HostInfo failedHost);
}
HostSelector has two default implementations:
RoundRobinHostSelector, a round-robin load balancer
RandomOrderHostSelector, a random-order load balancer
Let's look at the RoundRobinHostSelector implementation; its main logic lives in the OrderSelector class.
private static class RoundRobinHostSelector implements HostSelector {
private OrderSelector<HostInfo> selector;
RoundRobinHostSelector(boolean backoff, long maxBackoff){
selector = new RoundRobinOrderSelector<HostInfo>(backoff);
if(maxBackoff != 0){
selector.setMaxTimeOut(maxBackoff);
}
}
@Override
public synchronized Iterator<HostInfo> createHostIterator() {
return selector.createIterator();
}
@Override
public synchronized void setHosts(List<HostInfo> hosts) {
selector.setObjects(hosts);
}
public synchronized void informFailure(HostInfo failedHost){
selector.informFailure(failedHost);
}
}
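To see how the selector is consumed, here is a condensed sketch of LoadBalancingRpcClient.append (simplified from the real method): it walks the host iterator and, on failure, calls informFailure so the host can be backed off.

// Condensed sketch of LoadBalancingRpcClient.append
public void append(Event event) throws EventDeliveryException {
  boolean eventSent = false;
  Iterator<HostInfo> it = selector.createHostIterator();
  while (it.hasNext()) {
    HostInfo host = it.next();
    try {
      getClient(host).append(event);  // cached per-host RpcClient (see caching below)
      eventSent = true;
      break;
    } catch (Exception ex) {
      selector.informFailure(host);   // feeds the backoff bookkeeping
      LOGGER.warn("Failed to send event to host " + host, ex);
    }
  }
  if (!eventSent) {
    throw new EventDeliveryException("Unable to send event to any host");
  }
}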
OrderSelector is an ordered-selection container that supports a backoff algorithm. The parent class OrderSelector is abstract and implements the backoff algorithm; the subclasses RoundRobinOrderSelector and RandomOrderSelector implement the iterator-creation logic.
The code of RoundRobinOrderSelector is shown below. Two details to note:
1. getIndexList() returns the indices of the objects currently in a healthy (non-backed-off) state
2. the nextHead index marks the current position, which serves as the starting point of the round robin
public class RoundRobinOrderSelector<T> extends OrderSelector<T> {
private int nextHead = 0;
public RoundRobinOrderSelector(boolean shouldBackOff) {
super(shouldBackOff);
}
@Override
public Iterator<T> createIterator() {
List<Integer> activeIndices = getIndexList();
int size = activeIndices.size();
// possible that the size has shrunk so gotta adjust nextHead for that
if (nextHead >= size) {
nextHead = 0;
}
int begin = nextHead++;
if (nextHead == activeIndices.size()) {
nextHead = 0;
}
int[] indexOrder = new int[size];
for (int i = 0; i < size; i++) {
indexOrder[i] = activeIndices.get((begin + i) % size);
}
return new SpecificOrderIterator<T>(indexOrder, getObjects());
}
}
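The backoff itself lives in the abstract parent OrderSelector: informFailure grows an exponential backoff window (capped via setMaxTimeOut), and getIndexList only returns indices whose window has expired. A simplified, self-contained sketch of the idea, with illustrative constants rather than Flume's exact code:

// Simplified model of OrderSelector's per-object backoff bookkeeping
class BackoffTracker {
  private static final long BASE_BACKOFF_MS = 1000; // illustrative base
  private long maxBackoffMs = 30000;                // cap, cf. setMaxTimeOut
  private long restoreTime = 0;                     // skip the host until this time
  private int sequentialFails = 0;

  void informFailure() {
    sequentialFails++;
    // exponential growth: 1s, 2s, 4s, ... capped at maxBackoffMs
    long backoff = Math.min(maxBackoffMs,
        BASE_BACKOFF_MS << Math.min(sequentialFails - 1, 16));
    restoreTime = System.currentTimeMillis() + backoff;
  }

  void informSuccess() {
    sequentialFails = 0;
    restoreTime = 0;
  }

  boolean isActive() {
    // getIndexList() would include only objects for which this returns true
    return System.currentTimeMillis() >= restoreTime;
  }
}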
For LoadBalancingRpcClient, the configured server list for a given RPC service must contain at least two servers before load balancing can be used. The configuration file also carries the settings for the backoff and load-balancing algorithms:
protected void configure(Properties properties) throws FlumeException {
clientMap = new HashMap<String, RpcClient>();
configurationProperties = new Properties();
configurationProperties.putAll(properties);
hosts = HostInfo.getHostInfoList(properties);
if (hosts.size() < 2) {
throw new FlumeException("At least two hosts are required to use the "
+ "load balancing RPC client.");
}
String lbTypeName = properties.getProperty(
RpcClientConfigurationConstants.CONFIG_HOST_SELECTOR,
RpcClientConfigurationConstants.HOST_SELECTOR_ROUND_ROBIN);
boolean backoff = Boolean.valueOf(properties.getProperty(
RpcClientConfigurationConstants.CONFIG_BACKOFF,
String.valueOf(false)));
String maxBackoffStr = properties.getProperty(
RpcClientConfigurationConstants.CONFIG_MAX_BACKOFF);
long maxBackoff = 0;
if(maxBackoffStr != null) {
maxBackoff = Long.parseLong(maxBackoffStr);
}
if (lbTypeName.equalsIgnoreCase(
RpcClientConfigurationConstants.HOST_SELECTOR_ROUND_ROBIN)) {
selector = new RoundRobinHostSelector(backoff, maxBackoff);
} else if (lbTypeName.equalsIgnoreCase(
RpcClientConfigurationConstants.HOST_SELECTOR_RANDOM)) {
selector = new RandomOrderHostSelector(backoff, maxBackoff);
} else {
try {
@SuppressWarnings("unchecked")
Class<? extends HostSelector> klass = (Class<? extends HostSelector>)
Class.forName(lbTypeName);
selector = klass.newInstance();
} catch (Exception ex) {
throw new FlumeException("Unable to instantiate host selector: "
+ lbTypeName, ex);
}
}
selector.setHosts(hosts);
isOpen = true;
}
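Putting the keys that configure reads together, here is a hedged example of a Properties setup that selects this client with random selection and backoff enabled (key names as documented for the Flume SDK; class name and addresses are placeholders):

import java.util.Properties;

import org.apache.flume.api.RpcClient;
import org.apache.flume.api.RpcClientFactory;

public class LoadBalancingClientExample {
  public static RpcClient build() {
    Properties props = new Properties();
    props.setProperty("client.type", "default_loadbalance"); // selects LoadBalancingRpcClient
    props.setProperty("hosts", "h1 h2");                     // at least two hosts required
    props.setProperty("hosts.h1", "10.0.0.1:4141");
    props.setProperty("hosts.h2", "10.0.0.2:4141");
    props.setProperty("host-selector", "random");            // or "round_robin" (the default)
    props.setProperty("backoff", "true");                    // enable failure backoff
    props.setProperty("maxBackoff", "10000");                // cap the backoff at 10 s
    return RpcClientFactory.getInstance(props);
  }
}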
Those are the main components of client-side load balancing.
Client-side caching
Client-side caching is simple: a Map keyed by the host's reference name stores the corresponding RpcClient object so that it can be reused. An RpcClient is a heavyweight object, since each one holds a connection-pool instance.
clientMap = new HashMap<String, RpcClient>();
private synchronized RpcClient getClient(HostInfo info)
throws FlumeException, EventDeliveryException {
throwIfClosed();
String name = info.getReferenceName();
RpcClient client = clientMap.get(name);
if (client == null) {
client = createClient(name);
clientMap.put(name, client);
} else if (!client.isActive()) {
try {
client.close();
} catch (Exception ex) {
LOGGER.warn("Failed to close client for " + info, ex);
}
client = createClient(name);
clientMap.put(name, client);
}
return client;
}
Client-side failover
FailoverRpcClient likewise maintains a HostInfo list and maps each HostInfo to its RpcClient. It also keeps a maximum retry count, maxTries:
private synchronized void configureHosts(Properties properties)
throws FlumeException {
if(isActive){
logger.error("This client was already configured, " +
"cannot reconfigure.");
throw new FlumeException("This client was already configured, " +
"cannot reconfigure.");
}
hosts = HostInfo.getHostInfoList(properties);
String tries = properties.getProperty(
RpcClientConfigurationConstants.CONFIG_MAX_ATTEMPTS);
if (tries == null || tries.isEmpty()){
maxTries = hosts.size();
} else {
try {
maxTries = Integer.parseInt(tries);
} catch (NumberFormatException e) {
maxTries = hosts.size();
}
}
......
}
Its append method implements the retry mechanism that performs the failover:
public void append(Event event) throws EventDeliveryException {
//Why a local variable rather than just calling getClient()?
//If we get an EventDeliveryException, we need to call close on
//that specific client, getClient in this case, will get us
//the next client - leaving a resource leak.
RpcClient localClient = null;
synchronized (this) {
if (!isActive) {
logger.error("Attempting to append to an already closed client.");
throw new EventDeliveryException(
"Attempting to append to an already closed client.");
}
}
// Sit in an infinite loop and try to append!
int tries = 0;
while (tries < maxTries) {
try {
tries++;
localClient = getClient();
localClient.append(event);
return;
} catch (EventDeliveryException e) {
// Could not send event through this client, try to pick another client.
logger.warn("Client failed. Exception follows: ", e);
localClient.close();
localClient = null;
} catch (Exception e2) {
logger.error("Failed to send event: ", e2);
throw new EventDeliveryException(
"Failed to send event. Exception follows: ", e2);
}
}
logger.error("Tried many times, could not send event."
throw new EventDeliveryException("Failed to send the event!");
}
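For completeness, a hedged sketch of the Properties that select the failover client (documented key names; class name and addresses are placeholders). Hosts are tried in declaration order, and max-attempts maps to the maxTries field above:

import java.util.Properties;

import org.apache.flume.api.RpcClient;
import org.apache.flume.api.RpcClientFactory;

public class FailoverClientExample {
  public static RpcClient build() {
    Properties props = new Properties();
    props.setProperty("client.type", "default_failover"); // selects FailoverRpcClient
    props.setProperty("hosts", "h1 h2 h3");               // tried in this order
    props.setProperty("hosts.h1", "10.0.0.1:4141");
    props.setProperty("hosts.h2", "10.0.0.2:4141");
    props.setProperty("hosts.h3", "10.0.0.3:4141");
    props.setProperty("max-attempts", "3");               // maxTries; defaults to the host count
    return RpcClientFactory.getInstance(props);
  }
}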