flume+kafka+storm的整合使用
Flume-ng
Flume是一個分散式、可靠、和高可用的海量日誌採集、聚合和傳輸的系統。
不過這裡寫寫自己的見解
這個是flume的架構圖
從上圖可以看到幾個名詞:
Agent: 一個Agent包含Source、Channel、Sink和其他的元件。Flume就是一個或多個Agent構成的。
Source:資料來源。簡單的說就是agent獲取資料的入口 。
Channel:管道。資料流通和儲存的通道。一個source必須至少和一個channel關聯。
Sink:用來接收channel傳輸的資料並將之傳送到指定的地方。傳送成功後資料從channel中刪除。
Flume具有高可擴充套件性 可隨意組合:
注意 source是接收源 sink是傳送源
上圖是一個source將資料發給3個channel 其中的sink2將資料發給JMS ,sink3將資料發給另一個source。
總的來說flume的擴充套件性非常高 根據需要可隨意組合。
現在在說說一個概念叫Event:
Event是flume的資料傳輸的基本單元。Flume本質上是將資料作為一個event從源頭傳到結尾。是由可選的Headers和載有資料的一個byte array構成。
程式碼結構:
[java] view plain copy print- /**
- * Basic representation of a data object inFlume.
- * Provides access to data as it flows throughthe system.
- */
- publicinterface Event{
- /**
- * Returns a map of name-valuepairs describing the data stored in the body.
- */
- public Map<String, String> getHeaders();
- /**
- * Set the event headers
- * @param headersMap of headers to replace the current headers.
- */
- publicvoid setHeaders(Map<String, String> headers);
- /**
- * Returns the raw byte array of the datacontained in this event.
- */
- publicbyte[] getBody();
- /**
- * Sets the raw byte array of the datacontained in this event.
- * @param body Thedata.
- */
- publicvoid setBody(byte[] body);
- }
/**
* Basic representation of a data object inFlume.
* Provides access to data as it flows throughthe system.
*/
public interface Event{
/**
* Returns a map of name-valuepairs describing the data stored in the body.
*/
public Map<String, String> getHeaders();
/**
* Set the event headers
* @param headersMap of headers to replace the current headers.
*/
public void setHeaders(Map<String, String> headers);
/**
* Returns the raw byte array of the datacontained in this event.
*/
public byte[] getBody();
/**
* Sets the raw byte array of the datacontained in this event.
* @param body Thedata.
*/
public void setBody(byte[] body);
}
這個是網上找的flume channel ,source,sink的彙總
|
下面介紹下kafka以及kafka和flume的整合
Kafka:
Kafka是Linkedin於2010年12月份開源的訊息系統,它主要用於處理活躍的流式資料。活躍的流式資料在web網站應用中非常常見,這些資料包括網站的pv、使用者訪問了什麼內容,搜尋了什麼內容等。 這些資料通常以日誌的形式記錄下來,然後每隔一段時間進行一次統計處理。
傳統的日誌分析系統提供了一種離線處理日誌資訊的可擴充套件方案,但若要進行實時處理,通常會有較大延遲。而現有的消(佇列)系統能夠很好的處理實時或者近似實時的應用,但未處理的資料通常不會寫到磁碟上,這對於Hadoop之類(一小時或者一天只處理一部分資料)的離線應用而言,可能存在問題。Kafka正是為了解決以上問題而設計的,它能夠很好地離線和線上應用。
2、 設計目標
(1)資料在磁碟上存取代價為O(1)。一般資料在磁碟上是使用BTree儲存的,存取代價為O(lgn)。
(2)高吞吐率。即使在普通的節點上每秒鐘也能處理成百上千的message。
(3)顯式分散式,即所有的producer、broker和consumer都會有多個,均為分散式的。
(4)支援資料並行載入到Hadoop中。
3、 KafKa部署結構
kafka是顯式分散式架構,producer、broker(Kafka)和consumer都可以有多個。Kafka的作用類似於快取,即活躍的資料和離線處理系統之間的快取。幾個基本概念:
(1)message(訊息)是通訊的基本單位,每個producer可以向一個topic(主題)釋出一些訊息。如果consumer訂閱了這個主題,那麼新發布的訊息就會廣播給這些consumer。
(2)Kafka是顯式分散式的,多個producer、consumer和broker可以執行在一個大的叢集上,作為一個邏輯整體對外提供服務。對於consumer,多個consumer可以組成一個group,這個message只能傳輸給某個group中的某一個consumer.
資料從producer推送到broker,接著consumer在從broker是一個分散式服務框架 用來解決分散式應用中的資料管理問題等。
在kafka中 有幾個重要概念producer生產者 consumer 消費者 topic 主題。
我們來實際開發一個簡單的生產者消費者的例子。
生產者:
[java] view plain copy print?- public classProducerTest {
- publicstaticvoid main(String[] args) {
- Properties props = newProperties();
- props.setProperty("metadata.broker.list","xx.xx.xx.xx:9092");
- props.setProperty("serializer.class","kafka.serializer.StringEncoder");
- props.put("request.required.acks","1");
- ProducerConfigconfig = new ProducerConfig(props);
- Producer<String, String> producer = newProducer<String, String>(config);
- KeyedMessage<String, String> data = newKeyedMessage<String, String>("kafka","test-kafka");
- try {
- producer.send(data);
- } catch (Exception e) {
- e.printStackTrace();
- }
- producer.close();
- }
- }
public classProducerTest {
public static void main(String[] args) {
Properties props = newProperties();
props.setProperty("metadata.broker.list","xx.xx.xx.xx:9092");
props.setProperty("serializer.class","kafka.serializer.StringEncoder");
props.put("request.required.acks","1");
ProducerConfigconfig = new ProducerConfig(props);
Producer<String, String> producer = newProducer<String, String>(config);
KeyedMessage<String, String> data = newKeyedMessage<String, String>("kafka","test-kafka");
try {
producer.send(data);
} catch (Exception e) {
e.printStackTrace();
}
producer.close();
}
}
上面的程式碼中的xx.xx.xx.xx是kafka server的地址.
上面程式碼的意思就是向主題 kafka中同步(不配置的話 預設是同步發射)傳送了一個資訊 是test-kafka.
下面來看看消費者:
[java] view plain copy print?- public classConsumerTest extends Thread {
- private finalConsumerConnector consumer;
- privatefinal String topic;
- publicstatic voidmain(String[] args) {
- ConsumerTest consumerThread = newConsumerTest("kafka");
- consumerThread.start();
- }
- publicConsumerTest(String topic) {
- consumer =kafka.consumer.Consumer
- .createJavaConsumerConnector(createConsumerConfig());
- this.topic =topic;
- }
- private staticConsumerConfig createConsumerConfig() {
- Properties props = newProperties();
- props.put("zookeeper.connect","xx.xx.xx.xx:2181");
- props.put("group.id", "0");
- props.put("zookeeper.session.timeout.ms","10000");
- // props.put("zookeeper.sync.time.ms", "200");
- // props.put("auto.commit.interval.ms", "1000");
- return newConsumerConfig(props);
- }
- publicvoid run(){
- Map<String,Integer> topickMap = new HashMap<String, Integer>();
- topickMap.put(topic, 1);
- Map<String, List<KafkaStream<byte[],byte[]>>> streamMap =consumer.createMessageStreams(topickMap);
- KafkaStream<byte[],byte[]>stream = streamMap.get(topic).get(0);
- ConsumerIterator<byte[],byte[]> it =stream.iterator();
- System.out.println("--------------------------");
- while(it.hasNext()){
- //
- System.out.println("(consumer)--> " +new String(it.next().message()));
- }
- }
- }
public classConsumerTest extends Thread {
private finalConsumerConnector consumer;
private final String topic;
public static voidmain(String[] args) {
ConsumerTest consumerThread = newConsumerTest("kafka");
consumerThread.start();
}
publicConsumerTest(String topic) {
consumer =kafka.consumer.Consumer
.createJavaConsumerConnector(createConsumerConfig());
this.topic =topic;
}
private staticConsumerConfig createConsumerConfig() {
Properties props = newProperties();
props.put("zookeeper.connect","xx.xx.xx.xx:2181");
props.put("group.id", "0");
props.put("zookeeper.session.timeout.ms","10000");
// props.put("zookeeper.sync.time.ms", "200");
// props.put("auto.commit.interval.ms", "1000");
return newConsumerConfig(props);
}
public void run(){
Map<String,Integer> topickMap = new HashMap<String, Integer>();
topickMap.put(topic, 1);
Map<String, List<KafkaStream<byte[],byte[]>>> streamMap =consumer.createMessageStreams(topickMap);
KafkaStream<byte[],byte[]>stream = streamMap.get(topic).get(0);
ConsumerIterator<byte[],byte[]> it =stream.iterator();
System.out.println("--------------------------");
while(it.hasNext()){
//
System.out.println("(consumer)--> " +new String(it.next().message()));
}
}
}
上面的程式碼就是負責接收生產者傳送過來的訊息 測試的時候先開啟消費者 然後再執行生產者即可看到效果。
接下來 我們將flume 和kafka進行整合:
在flume的source資料來源接收到資料後 通過管道 到達sink,我們需要寫一個kafkaSink 來將sink從channel接收的資料作為kafka的生產者 將資料 傳送給消費者。
具體程式碼:
[java] view plain copy print?- publicclass KafkaSink extends AbstractSinkimplementsConfigurable {
- privatestaticfinal Log logger = LogFactory.getLog(KafkaSink.class);
- private Stringtopic;
- private Producer<String, String>producer;
- @Override
- public Status process()throwsEventDeliveryException {
- Channel channel =getChannel();
- Transaction tx =channel.getTransaction();
- try {
- tx.begin();
- Event e = channel.take();
- if(e ==null) {
- tx.rollback();
- return Status.BACKOFF;
- }
- KeyedMessage<String,String> data = new KeyedMessage<String, String>(topic,newString(e.getBody()));
- producer.send(data);
- logger.info("Message: {}"+new String( e.getBody()));
- tx.commit();
- return Status.READY;
- } catch(Exceptione) {
- logger.error("KafkaSinkException:{}",e);
- tx.rollback();
- return Status.BACKOFF;
- } finally {
- tx.close();
- }
- }
- @Override
- publicvoid configure(Context context) {
- topic = "kafka";
- Properties props = newProperties();
- props.setProperty("metadata.broker.list","xx.xx.xx.xx:9092");
- props.setProperty("serializer.class","kafka.serializer.StringEncoder");
- // props.setProperty("producer.type", "async");
- // props.setProperty("batch.num.messages", "1");
- props.put("request.required.acks","1");
- ProducerConfigconfig = new ProducerConfig(props);
- producer = newProducer<String, String>(config);
- }
- }
public class KafkaSink extends AbstractSinkimplementsConfigurable {
private static final Log logger = LogFactory.getLog(KafkaSink.class);
private Stringtopic;
private Producer<String, String>producer;
@Override
public Status process()throwsEventDeliveryException {
Channel channel =getChannel();
Transaction tx =channel.getTransaction();
try {
tx.begin();
Event e = channel.take();
if(e ==null) {
tx.rollback();
return Status.BACKOFF;
}
KeyedMessage<String,String> data = new KeyedMessage<String, String>(topic,newString(e.getBody()));
producer.send(data);
logger.info("Message: {}"+new String( e.getBody()));
tx.commit();
return Status.READY;
} catch(Exceptione) {
logger.error("KafkaSinkException:{}",e);
tx.rollback();
return Status.BACKOFF;
} finally {
tx.close();
}
}
@Override
public void configure(Context context) {
topic = "kafka";
Properties props = newProperties();
props.setProperty("metadata.broker.list","xx.xx.xx.xx:9092");
props.setProperty("serializer.class","kafka.serializer.StringEncoder");
// props.setProperty("producer.type", "async");
// props.setProperty("batch.num.messages", "1");
props.put("request.required.acks","1");
ProducerConfigconfig = new ProducerConfig(props);
producer = newProducer<String, String>(config);
}
}
將此檔案打成jar包 傳到flume的lib下面 如果你也用的是maven的話 需要用到assembly 將依賴的jar包一起打包進去。
在flume的配置是如下:
[plain] view plain copy print?- agent1.sources = source1
- agent1.sinks = sink1
- agent1.channels =channel1
- # Describe/configuresource1
- agent1.sources.source1.type= avro
- agent1.sources.source1.bind= localhost
- agent1.sources.source1.port= 44444
- # Describe sink1
- agent1.sinks.sink1.type= xx.xx.xx.KafkaSink(這是類的路徑地址)
- # Use a channel whichbuffers events in memory
- agent1.channels.channel1.type= memory
- agent1.channels.channel1.capacity= 1000
- agent1.channels.channel1.transactionCapactiy= 100
- # Bind the source andsink to the channel
- agent1.sources.source1.channels= channel1
- agent1.sinks.sink1.channel= channel1
agent1.sources = source1
agent1.sinks = sink1
agent1.channels =channel1
# Describe/configuresource1
agent1.sources.source1.type= avro
agent1.sources.source1.bind= localhost
agent1.sources.source1.port= 44444
# Describe sink1
agent1.sinks.sink1.type= xx.xx.xx.KafkaSink(這是類的路徑地址)
# Use a channel whichbuffers events in memory
agent1.channels.channel1.type= memory
agent1.channels.channel1.capacity= 1000
agent1.channels.channel1.transactionCapactiy= 100
# Bind the source andsink to the channel
agent1.sources.source1.channels= channel1
agent1.sinks.sink1.channel= channel1
測試的話是avro的方式傳送資料的 可以這樣測試
bin/flume-ng avro-client--conf conf -H localhost -p 44444 -F/data/flumetmp/a
/data/flumetmp/a 這個為檔案的地址.
測試的時候在本地 一定要把上面寫的消費者程式開啟 以便接收資料測試是否成功。
接下來我們介紹下storm然後將kafka的消費者和storm進行整合:
Storm:
Storm是一個分散式的實時訊息處理系統。
Storm各個元件之間的關係:
Storm叢集主要由一個主節點和一群工作節點(worker node)組成,通過 Zookeeper進行協調。
主節點:主節點通常執行一個後臺程式 —— Nimbus,用於響應分佈在叢集中的節點,分配任務和監測故障。
工作節點:Supervisor,負責接受nimbus分配的任務,啟動和停止屬於自己管理的worker程序。Nimbus和Supervisor之間的協調由zookeeper完成。
Worker:處理邏輯的程序,在其中執行著多個Task,每個task 是一組spout/blots的組合。
Topology:是storm的實時應用程式,從啟動開始一直執行,只要有tuple過來 就會觸發執行。拓撲:storm的訊息流動很像一個拓撲結構。
2. stream是storm的核心概念,一個stream是一個持續的tuple序列,這些tuple被以分散式並行的方式建立和處理。
3. spouts是一個stream的源頭,spouts負責從外部系統讀取資料,並組裝成tuple發射出去,tuple被髮射後就開始再topology中傳播。
4. bolt是storm中處理 資料的核心,storm中所有的資料處理都是在bolt中完成的
這裡就簡單介紹一些概念 具體的可以看些詳細的教程。
我們接下來開始整合storm和kafka。
從上面的介紹得知storm的spout是負責從外部讀取資料的 所以我們需要開發一個KafkaSpout 來作為kafka的消費者和storm的資料接收源。可以看看這個https://github.com/HolmesNL/kafka-spout。我在下面只寫一個簡單的可供測試。
具體程式碼:
[java] view plain copy print?- publicclass KafkaSpout implements IRichSpout {
- privatestaticfinal Log logger = LogFactory.getLog(KafkaSpout.class);
- /**
- *
- */
- privatestaticfinallong serialVersionUID = -5569857211173547938L;
- SpoutOutputCollector collector;
- private ConsumerConnectorconsumer;
- private Stringtopic;
- public KafkaSpout(String topic) {
- this.topic = topic;
- }
- @Override
- publicvoid open(Map conf, TopologyContext context,
- SpoutOutputCollector collector) {
- this.collector = collector;
- }
- privatestatic ConsumerConfig createConsumerConfig() {
- Properties props = newProperties();
- props.put("zookeeper.connect","xx.xx.xx.xx:2181");
- props.put("group.id","0");
- props.put("zookeeper.session.timeout.ms","10000");
- //props.put("zookeeper.sync.time.ms", "200");
- //props.put("auto.commit.interval.ms", "1000");
- returnnew ConsumerConfig(props);
- }
- @Override
- publicvoid close() {
- // TODOAuto-generated method stub
- }
- @Override
- publicvoid activate() {
- this.consumer = Consumer.createJavaConsumerConnector(createConsumerConfig());
- Map<String, Integer> topickMap = newHashMap<String, Integer>();
- topickMap.put(topic,new Integer(1));
- Map<String, List<KafkaStream<byte[],byte[]>>>streamMap =consumer.createMessageStreams(topickMap);
- KafkaStream<byte[],byte[]>stream = streamMap.get(topic).get(0);