Flume NG Study Notes (9): Flume Client Development

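In real-world work, data is produced in many different ways. Flume ships with some built-in mechanisms for collecting data, but more often users want their application to talk to Flume directly. Flume therefore allows users to develop applications that connect to Flume over IPC or RPC and send data to it.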

1. RPC client interface

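An implementation of Flume's RpcClient interface encapsulates Flume's RPC mechanism. A user application simply calls append(Event) or appendBatch(List<Event>) from the Flume Client SDK to send data and does not need to worry about the details of the underlying message exchange. The Event argument can be supplied by implementing the Event interface directly, for example with the simple and convenient SimpleEvent class, or by using the static withBody() helper methods of EventBuilder.

As of Flume 1.4.0, Avro is the default RPC protocol. NettyAvroRpcClient and ThriftRpcClient implement the RpcClient interface. To create a client instance you need the host and port of the target Flume agent; you then use the RpcClient to send data to that agent.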

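The official documentation gives an Avro RPC client example, which is used directly here as the test case.

Here we change client.init("host.example.org", 41414);

to client.init("192.168.233.128", 50000); so that it points at our own host.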

import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.api.RpcClient;
import org.apache.flume.api.RpcClientFactory;
import org.apache.flume.event.EventBuilder;
import java.nio.charset.Charset;

public class MyApp {
  public static void main(String[] args) {
    MyRpcClientFacade client = new MyRpcClientFacade();
    // Initialize client with the remote Flume agent's host and port
    // client.init("host.example.org", 41414);
    client.init("192.168.233.128", 50000);

    // Send 10 events to the remote Flume agent. That agent should be
    // configured to listen with an AvroSource.
    String sampleData = "Hello Flume!";
    for (int i = 0; i < 10; i++) {
      client.sendDataToFlume(sampleData);
    }

    client.cleanUp();
  }
}

class MyRpcClientFacade {
  private RpcClient client;
  private String hostname;
  private int port;

  public void init(String hostname, int port) {
    // Setup the RPC connection
    this.hostname = hostname;
    this.port = port;
    this.client = RpcClientFactory.getDefaultInstance(hostname, port);
    // Use the following method to create a thrift client (instead of the above line):
    // this.client = RpcClientFactory.getThriftInstance(hostname, port);
  }

  public void sendDataToFlume(String data) {
    // Create a Flume Event object that encapsulates the sample data
    Event event = EventBuilder.withBody(data, Charset.forName("UTF-8"));

    // Send the event
    try {
      client.append(event);
    } catch (EventDeliveryException e) {
      // clean up and recreate the client
      client.close();
      client = null;
      client = RpcClientFactory.getDefaultInstance(hostname, port);
      // Use the following method to create a thrift client (instead of the above line):
      // this.client = RpcClientFactory.getThriftInstance(hostname, port);
    }
  }

  public void cleanUp() {
    // Close the RPC connection
    client.close();
  }

}
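The code needs little explanation: it sends "Hello Flume!" to Flume 10 times. Remember to add all the JAR files under the lib directory of the Flume installation to the project, otherwise the program will not run.

Below is the agent configuration:

# Config file: avro_client_case20.conf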
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1
 
# Describe/configure the source
a1.sources.r1.type = avro
a1.sources.r1.port = 50000
a1.sources.r1.host = 192.168.233.128
a1.sources.r1.channels = c1
 
# Describe the sink
a1.sinks.k1.channel = c1
a1.sinks.k1.type = logger
 
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100


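Note that, as mentioned earlier, the receiving side needs an Avro Source or a Thrift Source to listen on the port. So when configuring the agent, a1.sources.r1.type must be set to avro or thrift.

# Run the command

flume-ng agent -c conf -f conf/avro_client_case20.conf -n a1 -Dflume.root.logger=INFO,console

Once the agent has started successfully,

run the Java program in Eclipse; you can of course also package it and run it on the server.

# Check the console output in the terminal of the agent that was started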


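You can see that all 10 events were delivered successfully.

Note that client.append(event) sends a single event. To send several events at once, pass a List<Event> to client.appendBatch(...), which delivers them as a batch. The test above does not use batch sending.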
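As a minimal sketch of batch sending, the following program builds ten events and hands them to appendBatch in one call. It assumes the Flume SDK JARs are on the classpath and that the Avro agent configured above is listening on 192.168.233.128:50000. The class name BatchApp and the helper buildEvents are made up for this illustration and are not part of the official example.

```java
import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.api.RpcClient;
import org.apache.flume.api.RpcClientFactory;
import org.apache.flume.event.EventBuilder;

import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

public class BatchApp {
  // Hypothetical helper: builds `count` events that all carry the same UTF-8 body.
  static List<Event> buildEvents(String body, int count) {
    List<Event> events = new ArrayList<>();
    for (int i = 0; i < count; i++) {
      events.add(EventBuilder.withBody(body, StandardCharsets.UTF_8));
    }
    return events;
  }

  public static void main(String[] args) {
    // Assumption: same agent address as in the test above; adjust to your environment.
    RpcClient client = RpcClientFactory.getDefaultInstance("192.168.233.128", 50000);
    try {
      // Hand the whole list to the client instead of calling append() ten times.
      client.appendBatch(buildEvents("Hello Flume!", 10));
    } catch (EventDeliveryException e) {
      System.err.println("Batch delivery failed: " + e.getMessage());
    } finally {
      client.close();
    }
  }
}
```

If delivery fails, this sketch only reports the error; a real application would probably recreate the client as the facade class above does.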

2. Failover Client

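This class wraps the default Avro RPC client to provide failover handling. It takes a whitespace-separated list of host:port entries representing Flume agents, which together form a failover group. The Failover RPC Client currently does not support Thrift. If there is a problem with the currently selected host agent, the failover client automatically fails over to the next host in the group.

Below is the example from the official documentation: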

// Setup properties for the failover
Properties props = new Properties();
props.put("client.type", "default_failover");

// List of hosts (space-separated list of user-chosen host aliases)
props.put("hosts", "h1 h2 h3");

// host/port pair for each host alias
String host1 = "host1.example.org:41414";
String host2 = "host2.example.org:41414";
String host3 = "host3.example.org:41414";
props.put("hosts.h1", host1);
props.put("hosts.h2", host2);
props.put("hosts.h3", host3);

// create the client with failover properties
RpcClient client = RpcClientFactory.getInstance(props);
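The property layout above (one alias list under hosts, plus one hosts.<alias> entry per agent) can be generated from a plain list of host:port strings. The sketch below uses only java.util so it runs without Flume; the class FailoverProps, its build method, and the h1, h2, ... alias scheme are illustrative choices, not part of the Flume SDK.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;

public class FailoverProps {
  // Builds failover client properties for the given "host:port" strings.
  // Aliases are generated as h1, h2, ... in list order (an arbitrary naming choice).
  static Properties build(List<String> hostPorts) {
    Properties props = new Properties();
    props.setProperty("client.type", "default_failover");
    List<String> aliases = new ArrayList<>();
    for (int i = 0; i < hostPorts.size(); i++) {
      String alias = "h" + (i + 1);
      aliases.add(alias);
      props.setProperty("hosts." + alias, hostPorts.get(i));
    }
    // The "hosts" property is the space-separated alias list.
    props.setProperty("hosts", String.join(" ", aliases));
    return props;
  }

  public static void main(String[] args) {
    Properties props = build(Arrays.asList(
        "host1.example.org:41414",
        "host2.example.org:41414",
        "host3.example.org:41414"));
    System.out.println(props.getProperty("hosts")); // prints "h1 h2 h3"
    // With the Flume SDK on the classpath, the result would be passed to
    // RpcClientFactory.getInstance(props) exactly as in the example above.
  }
}
```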

Below is the example used for testing:

import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.api.RpcClient;
import org.apache.flume.api.RpcClientFactory;
import org.apache.flume.event.EventBuilder;

import java.nio.charset.Charset;
import java.util.Properties;

public class Failover_Client {
	public static void main(String[] args) {
	    MyRpcClientFacade2 client = new MyRpcClientFacade2();
	    // Initialize client with the remote Flume agent's host and port
	    client.init();

	    // Send 10 events to the remote Flume agent. That agent should be
	    // configured to listen with an AvroSource.
	    String sampleData = "Hello Flume!";
	    for (int i = 0; i < 10; i++) {
	      client.sendDataToFlume(sampleData);
	    }

	    client.cleanUp();
	  }
}

class MyRpcClientFacade2 {
  private RpcClient client;

  public void init() {
    // Setup properties for the failover
    Properties props = new Properties();
    props.put("client.type", "default_failover");

    // List of hosts (space-separated list of user-chosen host aliases)
    props.put("hosts", "h1 h2 h3");

    // host/port pair for each host alias
    String host1 = "192.168.233.128:50000";
    String host2 = "192.168.233.128:50001";
    String host3 = "192.168.233.128:50002";
    props.put("hosts.h1", host1);
    props.put("hosts.h2", host2);
    props.put("hosts.h3", host3);

    // create the client with failover properties
    client = RpcClientFactory.getInstance(props);
  }

  public void sendDataToFlume(String data) {
    // Create a Flume Event object that encapsulates the sample data
    Event event = EventBuilder.withBody(data, Charset.forName("UTF-8"));

    // Send the event
    try {
      client.append(event);
    } catch (EventDeliveryException e) {
      // clean up and recreate the failover client from the same properties
      // (the original recreated it with getDefaultInstance(hostname, port),
      // but hostname and port were never set in this class)
      client.close();
      client = null;
      init();
    }
  }

  public void cleanUp() {
    // Close the RPC connection
    client.close();
  }
}

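This code configures three hosts for failover; to keep things simple, three ports on the same machine are used to simulate them. The program still sends "Hello Flume!" 10 times, to the first Flume agent. When the first agent fails, the events go to the second agent, so failover proceeds in order.

The agent configuration reuses the earlier one; make two copies of the config file:

cp avro_client_case20.conf avro_client_case21.conf

cp avro_client_case20.conf avro_client_case22.conf

Then, in avro_client_case21.conf and avro_client_case22.conf respectively, change the port to

a1.sources.r1.port = 50001 and a1.sources.r1.port = 50002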

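# Run the commands

flume-ng agent -c conf -f conf/avro_client_case20.conf -n a1 -Dflume.root.logger=INFO,console

flume-ng agent -c conf -f conf/avro_client_case21.conf -n a1 -Dflume.root.logger=INFO,console

flume-ng agent -c conf -f conf/avro_client_case22.conf -n a1 -Dflume.root.logger=INFO,console

Once the agents have started successfully,

run the Java program Failover_Client.java in Eclipse; you can of course also package it and run it on the server.

# Check the console output in the terminals of the three agents that were started

We can see that the first agent's terminal received the data, while the other two terminals received nothing.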



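Next we kill the process in the first terminal and run the client program again; this time the events are sent to the second agent. When the second agent is also shut down and we send again, the events go to the last agent. This shows that failover moves through the agent hosts sequentially, in the configured order.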
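The behaviour observed in this test can be summarised as "use the first host in the configured order that is still reachable". The following stdlib-only sketch models just that rule; it is a simplified illustration, not Flume's actual failover code, and the class FailoverOrderModel and method pick are hypothetical names.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class FailoverOrderModel {
  // Returns the first host, in configured order, that is not in `down`,
  // or null when every host is down.
  static String pick(List<String> hosts, Set<String> down) {
    for (String host : hosts) {
      if (!down.contains(host)) {
        return host;
      }
    }
    return null;
  }

  public static void main(String[] args) {
    List<String> hosts = Arrays.asList(
        "192.168.233.128:50000", "192.168.233.128:50001", "192.168.233.128:50002");

    // All agents running: the first one receives the events.
    System.out.println(pick(hosts, Collections.<String>emptySet()));

    // First agent killed: the second one takes over.
    Set<String> down = new HashSet<>();
    down.add(hosts.get(0));
    System.out.println(pick(hosts, down));

    // First and second agents killed: the last one takes over.
    down.add(hosts.get(1));
    System.out.println(pick(hosts, down));
  }
}
```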

3. LoadBalancing RPC client

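The Flume Client SDK also supports an RpcClient that load-balances among multiple hosts. This type of client takes a whitespace-separated list of host:port entries, which together form a load-balancing group. The client can be configured with a load-balancing strategy that either picks one of the configured hosts at random or selects hosts in round-robin fashion. You can also write your own class that implements the LoadBalancingRpcClient$HostSelector interface in order to use a custom selection order; in that case, the custom class must be specified as the value of the host-selector property. The LoadBalancing RPC Client currently does not support Thrift.

If backoff is enabled, the client temporarily blacklists hosts that fail, and a failed host is only removed from the blacklist after the specified timeout has elapsed. When the timeout expires, if the host is still unresponsive, this counts as a sequential failure and the timeout grows exponentially, which avoids getting stuck in long waits on unresponsive hosts.

The maximum backoff timeout can be configured with the maxBackoff property, in milliseconds. By default maxBackoff is 30 seconds (specified in the OrderSelector class).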
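To make the growth and the cap concrete, here is a small stdlib-only model of a timeout that doubles on every sequential failure and never exceeds maxBackoff. It is an illustration only: the 1000 ms starting value is an assumption, the names BackoffModel and backoffMillis are hypothetical, and Flume's real logic lives in OrderSelector and may differ in detail.

```java
public class BackoffModel {
  // Timeout in ms after `sequentialFailures` consecutive failures (1 or more):
  // starts at baseMs, doubles per additional failure, capped at maxBackoffMs.
  static long backoffMillis(int sequentialFailures, long baseMs, long maxBackoffMs) {
    long delay = baseMs;
    // Stop doubling once the cap is reached, which also avoids overflow.
    for (int i = 1; i < sequentialFailures && delay < maxBackoffMs; i++) {
      delay *= 2;
    }
    return Math.min(delay, maxBackoffMs);
  }

  public static void main(String[] args) {
    long base = 1000;        // assumed starting timeout, not taken from Flume
    long maxBackoff = 10000; // same value as maxBackoff in the examples in this section
    for (int failures = 1; failures <= 6; failures++) {
      System.out.println(failures + " failure(s): "
          + backoffMillis(failures, base, maxBackoff) + " ms");
    }
  }
}
```

With these inputs the timeout runs 1000, 2000, 4000, 8000 and then stays at 10000 ms, which is the effect of setting maxBackoff to 10000.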

Below is the example from the official documentation:

// Setup properties for the load balancing
Properties props = new Properties();
props.put("client.type", "default_loadbalance");

// List of hosts (space-separated list of user-chosen host aliases)
props.put("hosts", "h1 h2 h3");

// host/port pair for each host alias
String host1 = "host1.example.org:41414";
String host2 = "host2.example.org:41414";
String host3 = "host3.example.org:41414";
props.put("hosts.h1", host1);
props.put("hosts.h2", host2);
props.put("hosts.h3", host3);

props.put("host-selector", "random"); // For random host selection
// props.put("host-selector", "round_robin"); // For round-robin host
//                                            // selection
props.put("backoff", "true"); // Disabled by default.

props.put("maxBackoff", "10000"); // Defaults 0, which effectively
                                  // becomes 30000 ms

// Create the client with load balancing properties
RpcClient client = RpcClientFactory.getInstance(props);

Below is the example used for testing:

import java.nio.charset.Charset;

import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.api.RpcClient;
import org.apache.flume.api.RpcClientFactory;
import org.apache.flume.event.EventBuilder;
import java.util.Properties;

public class Load_Client {
	public static void main(String[] args) {
	    MyRpcClientFacade3 client = new MyRpcClientFacade3();
	    // Initialize client with the remote Flume agent's host and port
	    client.init();

	    // Send 10 events to the remote Flume agent. That agent should be
	    // configured to listen with an AvroSource.
	    String sampleData = "Flume Load_Client";
	    for (int i = 0; i < 10; i++) {
	      client.sendDataToFlume(sampleData);
	    }

	    client.cleanUp();
	  }
}

class MyRpcClientFacade3 {
  private RpcClient client;

  public void init() {
    // Setup properties for the load balancing
    Properties props = new Properties();
    props.put("client.type", "default_loadbalance");

    // List of hosts (space-separated list of user-chosen host aliases)
    props.put("hosts", "h1 h2 h3");

    // host/port pair for each host alias
    String host1 = "192.168.233.128:50000";
    String host2 = "192.168.233.128:50001";
    String host3 = "192.168.233.128:50002";
    props.put("hosts.h1", host1);
    props.put("hosts.h2", host2);
    props.put("hosts.h3", host3);

    props.put("host-selector", "random"); // For random host selection
    // props.put("host-selector", "round_robin"); // For round-robin host
    //                                            // selection
    props.put("backoff", "true"); // Disabled by default.

    props.put("maxBackoff", "10000"); // Defaults 0, which effectively
                                      // becomes 30000 ms

    // Create the client with load balancing properties
    client = RpcClientFactory.getInstance(props);
  }

  public void sendDataToFlume(String data) {
    // Create a Flume Event object that encapsulates the sample data
    Event event = EventBuilder.withBody(data, Charset.forName("UTF-8"));

    // Send the event
    try {
      client.append(event);
    } catch (EventDeliveryException e) {
      // clean up and recreate the load-balancing client from the same properties
      // (the original recreated it with getDefaultInstance(hostname, port),
      // but hostname and port were never set in this class)
      client.close();
      client = null;
      init();
    }
  }

  public void cleanUp() {
    // Close the RPC connection
    client.close();
  }
}

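Here random load balancing is used: props.put("host-selector", "random"). The test reuses the three receiving agent configurations from before, avro_client_case20.conf, avro_client_case21.conf and avro_client_case22.conf, and starts all three agents.

# Run the commands

flume-ng agent -c conf -f conf/avro_client_case20.conf -n a1 -Dflume.root.logger=INFO,console

flume-ng agent -c conf -f conf/avro_client_case21.conf -n a1 -Dflume.root.logger=INFO,console

flume-ng agent -c conf -f conf/avro_client_case22.conf -n a1 -Dflume.root.logger=INFO,console

Once the agents have started successfully,

run the Java program Load_Client.java in Eclipse; you can of course also package it and run it on the server.

# Check the console output in the terminals of the three agents that were started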

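Below is Host1, which received 2 events.


Below is Host2, which received 2 events.


Below is Host3, which received 6 events.


As you can see, host-selector is set to random in our example, so the program sends the events to randomly chosen hosts. Next we test the round_robin option.

In the program we change this line:

// props.put("host-selector", "random"); // For random host selection

props.put("host-selector", "round_robin"); // For round-robin host selection

Then run the Java program again.

Below is Host1, which received 4 events.


Below is Host2, which received 3 events.


Host3 likewise received 3 events; the screenshot is omitted here. Round-robin simply sends the events to the hosts in order.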
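The 4/3/3 split follows directly from rotating through three hosts ten times. The stdlib-only sketch below reproduces the arithmetic; it assumes the rotation starts at the first host and that no host fails, and the class RoundRobinModel and method distribute are hypothetical names rather than Flume code.

```java
import java.util.Arrays;

public class RoundRobinModel {
  // Counts how many of `events` each of `hosts` receives when hosts are
  // picked in strict rotation, starting with host index 0.
  static int[] distribute(int events, int hosts) {
    int[] counts = new int[hosts];
    for (int i = 0; i < events; i++) {
      counts[i % hosts]++;
    }
    return counts;
  }

  public static void main(String[] args) {
    // 10 events over 3 hosts, as in the test above.
    System.out.println(Arrays.toString(distribute(10, 3))); // prints [4, 3, 3]
  }
}
```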