
Integrating Flume, Kafka, and Storm

Flume-ng

Flume is a distributed, reliable, and highly available system for collecting, aggregating, and transporting large volumes of log data.

Here I will just write down my own understanding of it.


(Figure: the Flume architecture)

A few terms appear in the figure above:

Agent: an Agent contains Sources, Channels, Sinks, and other components; a Flume deployment is made up of one or more Agents.

Source: the data source; simply put, the entry point through which an agent receives data.

Channel: the pipe through which data flows and is buffered. A source must be associated with at least one channel.

Sink: receives data from a channel and delivers it to the specified destination. Once delivery succeeds, the data is removed from the channel.

Flume is highly extensible, and components can be combined freely:

Note that a source receives data, while a sink sends it onward.

The figure above shows a single source feeding three channels; sink2 sends its data to JMS, and sink3 sends its data to another source.

All in all, Flume is very flexible, and components can be combined however the use case requires.

Now a word about the concept of an Event:

An Event is Flume's basic unit of data transfer. Flume essentially moves data from source to destination as events. An event consists of optional headers plus a byte array carrying the payload.

The interface looks like this:

/**
 * Basic representation of a data object in Flume.
 * Provides access to data as it flows through the system.
 */
public interface Event {

  /**
   * Returns a map of name-value pairs describing the data stored in the body.
   */
  public Map<String, String> getHeaders();

  /**
   * Set the event headers.
   * @param headers Map of headers to replace the current headers.
   */
  public void setHeaders(Map<String, String> headers);

  /**
   * Returns the raw byte array of the data contained in this event.
   */
  public byte[] getBody();

  /**
   * Sets the raw byte array of the data contained in this event.
   * @param body The data.
   */
  public void setBody(byte[] body);
}
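
To make the Event abstraction concrete, here is a minimal sketch of building an event with Flume's EventBuilder helper and reading it back. The header key "source-host" and the body text are arbitrary illustrative values, not anything Flume requires.

import java.util.HashMap;
import java.util.Map;

import org.apache.flume.Event;
import org.apache.flume.event.EventBuilder;

public class EventDemo {
    public static void main(String[] args) {
        // optional headers describing the payload; the key names are up to you
        Map<String, String> headers = new HashMap<String, String>();
        headers.put("source-host", "web-01");

        // the body is just a byte array, here an encoded string
        Event event = EventBuilder.withBody("hello flume".getBytes(), headers);

        System.out.println(event.getHeaders());
        System.out.println(new String(event.getBody()));
    }
}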


Below is a summary of Flume channels, sources, and sinks that I found online:

| Component | Type | Description | Implementation Class |
| --- | --- | --- | --- |
| Channel | memory | In-memory, fast, non-durable event transport | MemoryChannel |
| Channel | file | A channel for reading, writing, mapping, and manipulating a file | FileChannel |
| Channel | jdbc | JDBC-based, durable event transport (Derby-based) | JDBCChannel |
| Channel | recoverablememory | A durable channel implementation that uses the local file system for its storage | RecoverableMemoryChannel |
| Channel | org.apache.flume.channel.PseudoTxnMemoryChannel | Mainly for testing purposes. Not meant for production use. | PseudoTxnMemoryChannel |
| Channel | (custom type as FQCN) | Your own Channel impl. | (custom FQCN) |
| Source | avro | Avro Netty RPC event source | AvroSource |
| Source | exec | Execute a long-lived Unix process and read from stdout | ExecSource |
| Source | netcat | Netcat style TCP event source | NetcatSource |
| Source | seq | Monotonically incrementing sequence generator event source | SequenceGeneratorSource |
| Source | org.apache.flume.source.StressSource | Mainly for testing purposes. Not meant for production use. Serves as a continuous source of events where each event has the same payload. The payload consists of some number of bytes (specified by the size property, defaults to 500) where each byte has the signed value Byte.MAX_VALUE (0x7F, or 127). | org.apache.flume.source.StressSource |
| Source | syslogtcp | | SyslogTcpSource |
| Source | syslogudp | | SyslogUDPSource |
| Source | org.apache.flume.source.avroLegacy.AvroLegacySource | | AvroLegacySource |
| Source | org.apache.flume.source.thriftLegacy.ThriftLegacySource | | ThriftLegacySource |
| Source | org.apache.flume.source.scribe.ScribeSource | | ScribeSource |
| Source | (custom type as FQCN) | Your own Source impl. | (custom FQCN) |
| Sink | hdfs | Writes all events received to HDFS (with support for rolling, bucketing, HDFS-200 append, and more) | HDFSEventSink |
| Sink | org.apache.flume.sink.hbase.HBaseSink | A simple sink that reads events from a channel and writes them to HBase. | org.apache.flume.sink.hbase.HBaseSink |
| Sink | org.apache.flume.sink.hbase.AsyncHBaseSink | | org.apache.flume.sink.hbase.AsyncHBaseSink |
| Sink | logger | Log events at INFO level via configured logging subsystem (log4j by default) | LoggerSink |
| Sink | avro | Sink that invokes a pre-defined Avro protocol method for all events it receives (when paired with an avro source, forms tiered collection) | AvroSink |
| Sink | file_roll | | RollingFileSink |
| Sink | irc | | IRCSink |
| Sink | null | /dev/null for Flume: blackhole all events received | NullSink |
| Sink | (custom type as FQCN) | Your own Sink impl. | (custom FQCN) |
| ChannelSelector | replicating | | ReplicatingChannelSelector |
| ChannelSelector | multiplexing | | MultiplexingChannelSelector |
| ChannelSelector | (custom type) | Your own ChannelSelector impl. | (custom FQCN) |
| SinkProcessor | default | | DefaultSinkProcessor |
| SinkProcessor | failover | | FailoverSinkProcessor |
| SinkProcessor | load_balance | Provides the ability to load-balance flow over multiple sinks. | LoadBalancingSinkProcessor |
| SinkProcessor | (custom type as FQCN) | Your own SinkProcessor impl. | (custom FQCN) |
| Interceptor$Builder | host | | HostInterceptor$Builder |
| Interceptor$Builder | timestamp | TimestampInterceptor | TimestampInterceptor$Builder |
| Interceptor$Builder | static | | StaticInterceptor$Builder |
| Interceptor$Builder | regex_filter | | RegexFilteringInterceptor$Builder |
| Interceptor$Builder | (custom type as FQCN) | Your own Interceptor$Builder impl. | (custom FQCN) |
| EventSerializer$Builder | text | | BodyTextEventSerializer$Builder |
| EventSerializer$Builder | avro_event | | FlumeEventAvroEventSerializer$Builder |
| EventSerializer | org.apache.flume.sink.hbase.SimpleHbaseEventSerializer | | SimpleHbaseEventSerializer |
| EventSerializer | org.apache.flume.sink.hbase.SimpleAsyncHbaseEventSerializer | | SimpleAsyncHbaseEventSerializer |
| EventSerializer | org.apache.flume.sink.hbase.RegexHbaseEventSerializer | | RegexHbaseEventSerializer |
| HbaseEventSerializer | (custom type as FQCN) | Custom implementation of serializer for HBaseSink. Your own HbaseEventSerializer impl. | (custom FQCN) |
| AsyncHbaseEventSerializer | (custom type as FQCN) | Custom implementation of serializer for AsyncHBaseSink. Your own AsyncHbaseEventSerializer impl. | (custom FQCN) |
| EventSerializer$Builder | (custom type as FQCN) | Custom implementation of serializer for all sinks except HBaseSink and AsyncHBaseSink. Your own EventSerializer$Builder impl. | (custom FQCN) |

Next, an introduction to Kafka and to integrating Kafka with Flume.

Kafka:

Kafka is a messaging system that LinkedIn open-sourced in December 2010, designed mainly for processing active streaming data. Active streaming data is very common in web applications: page views, which content users visited, what they searched for, and so on. Such data is usually recorded as logs and then processed statistically at regular intervals.

Traditional log analysis systems offer a scalable way to process log data offline, but real-time processing through them usually suffers high latency. Existing message queue systems handle real-time or near-real-time applications well, but unprocessed data is normally not written to disk, which is a problem for offline consumers such as Hadoop jobs that only process a batch once an hour or once a day. Kafka was designed to solve both problems, serving offline and online applications equally well.

Design goals:

(1) Disk access costs O(1). Data on disk is usually stored in a B-tree, where access costs O(log n).

(2) High throughput: even ordinary nodes can handle hundreds to thousands of messages per second.

(3) Explicitly distributed: producers, brokers, and consumers all exist in multiples and are all distributed.

(4) Support for loading data into Hadoop in parallel.

Kafka deployment architecture:


Kafka has an explicitly distributed architecture: there can be many producers, brokers (Kafka itself), and consumers. Kafka acts much like a cache sitting between the active data and the offline processing systems. A few basic concepts:

(1) The message is the basic unit of communication. Each producer publishes messages to a topic; if consumers subscribe to that topic, newly published messages are broadcast to them.

(2) Kafka is explicitly distributed: multiple producers, consumers, and brokers run in one large cluster and serve clients as a single logical unit. Several consumers can form a group, and a given message is delivered to only one consumer within that group.

Data is pushed from the producers to the brokers, and the consumers then pull it from the brokers. Coordination relies on ZooKeeper, a distributed service framework that handles data management for distributed applications.

The key concepts in Kafka are therefore the producer, the consumer, and the topic.

Let's build a simple producer/consumer example.

The producer:

import java.util.Properties;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

public class ProducerTest {

    public static void main(String[] args) {
        Properties props = new Properties();
        // list of Kafka brokers; xx.xx.xx.xx is the Kafka server address
        props.setProperty("metadata.broker.list", "xx.xx.xx.xx:9092");
        props.setProperty("serializer.class", "kafka.serializer.StringEncoder");
        // wait for the leader to acknowledge each write
        props.put("request.required.acks", "1");

        ProducerConfig config = new ProducerConfig(props);
        Producer<String, String> producer = new Producer<String, String>(config);
        // send the message "test-kafka" to the topic "kafka"
        KeyedMessage<String, String> data = new KeyedMessage<String, String>("kafka", "test-kafka");
        try {
            producer.send(data);
        } catch (Exception e) {
            e.printStackTrace();
        }
        producer.close();
    }
}

In the code above, xx.xx.xx.xx is the address of the Kafka server.

The code sends a single message, "test-kafka", to the topic "kafka" synchronously (sending is synchronous by default unless configured otherwise).
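
The old Kafka producer can also send asynchronously in batches. Below is a minimal sketch of the relevant properties, meant to drop into the same ProducerTest shown above; the batch size and flush interval are illustrative values, not tuned recommendations.

        Properties props = new Properties();
        props.setProperty("metadata.broker.list", "xx.xx.xx.xx:9092");
        props.setProperty("serializer.class", "kafka.serializer.StringEncoder");
        props.put("request.required.acks", "1");
        // switch from the default synchronous mode to asynchronous, batched sending
        props.setProperty("producer.type", "async");
        // flush once this many messages have accumulated...
        props.setProperty("batch.num.messages", "200");
        // ...or after this many milliseconds, whichever comes first
        props.setProperty("queue.buffering.max.ms", "1000");
        Producer<String, String> producer = new Producer<String, String>(new ProducerConfig(props));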

Now let's look at the consumer:

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;

public class ConsumerTest extends Thread {

    private final ConsumerConnector consumer;
    private final String topic;

    public static void main(String[] args) {
        ConsumerTest consumerThread = new ConsumerTest("kafka");
        consumerThread.start();
    }

    public ConsumerTest(String topic) {
        consumer = Consumer.createJavaConsumerConnector(createConsumerConfig());
        this.topic = topic;
    }

    private static ConsumerConfig createConsumerConfig() {
        Properties props = new Properties();
        // xx.xx.xx.xx is the ZooKeeper address used by the Kafka cluster
        props.put("zookeeper.connect", "xx.xx.xx.xx:2181");
        props.put("group.id", "0");
        props.put("zookeeper.session.timeout.ms", "10000");
        // props.put("zookeeper.sync.time.ms", "200");
        // props.put("auto.commit.interval.ms", "1000");
        return new ConsumerConfig(props);
    }

    public void run() {
        // ask for one stream for our topic
        Map<String, Integer> topickMap = new HashMap<String, Integer>();
        topickMap.put(topic, 1);
        Map<String, List<KafkaStream<byte[], byte[]>>> streamMap = consumer.createMessageStreams(topickMap);
        KafkaStream<byte[], byte[]> stream = streamMap.get(topic).get(0);
        ConsumerIterator<byte[], byte[]> it = stream.iterator();
        System.out.println("--------------------------");
        // block and print each message as it arrives
        while (it.hasNext()) {
            System.out.println("(consumer)--> " + new String(it.next().message()));
        }
    }
}

The code above receives the messages sent by the producer. When testing, start the consumer first, then run the producer, and you will see the result.

Next, let's integrate Flume with Kafka:

After Flume's source receives data, it travels through the channel to the sink. We need to write a KafkaSink that takes the data the sink receives from the channel and, acting as a Kafka producer, sends it on to the consumers.

The code:

import java.util.Properties;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.flume.Channel;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.Transaction;
import org.apache.flume.conf.Configurable;
import org.apache.flume.sink.AbstractSink;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

public class KafkaSink extends AbstractSink implements Configurable {

    private static final Log logger = LogFactory.getLog(KafkaSink.class);

    private String topic;
    private Producer<String, String> producer;

    @Override
    public Status process() throws EventDeliveryException {
        Channel channel = getChannel();
        Transaction tx = channel.getTransaction();
        try {
            tx.begin();
            Event e = channel.take();
            if (e == null) {
                tx.rollback();
                return Status.BACKOFF;
            }
            // forward the event body to Kafka as a message on the configured topic
            KeyedMessage<String, String> data = new KeyedMessage<String, String>(topic, new String(e.getBody()));
            producer.send(data);
            logger.info("Message: " + new String(e.getBody()));
            tx.commit();
            return Status.READY;
        } catch (Exception e) {
            logger.error("KafkaSinkException: ", e);
            tx.rollback();
            return Status.BACKOFF;
        } finally {
            tx.close();
        }
    }

    @Override
    public void configure(Context context) {
        topic = "kafka";
        Properties props = new Properties();
        props.setProperty("metadata.broker.list", "xx.xx.xx.xx:9092");
        props.setProperty("serializer.class", "kafka.serializer.StringEncoder");
        // props.setProperty("producer.type", "async");
        // props.setProperty("batch.num.messages", "1");
        props.put("request.required.acks", "1");
        ProducerConfig config = new ProducerConfig(props);
        producer = new Producer<String, String>(config);
    }
}
 

Package this class into a jar and drop it into Flume's lib directory. If you build with Maven, you will need the assembly plugin so the dependent jars are bundled in as well.

The Flume configuration looks like this:

agent1.sources = source1
agent1.sinks = sink1
agent1.channels = channel1

# Describe/configure source1
agent1.sources.source1.type = avro
agent1.sources.source1.bind = localhost
agent1.sources.source1.port = 44444

# Describe sink1
# xx.xx.xx.KafkaSink is the fully qualified class name of the custom sink
agent1.sinks.sink1.type = xx.xx.xx.KafkaSink

# Use a channel which buffers events in memory
agent1.channels.channel1.type = memory
agent1.channels.channel1.capacity = 1000
agent1.channels.channel1.transactionCapacity = 100

# Bind the source and sink to the channel
agent1.sources.source1.channels = channel1
agent1.sinks.sink1.channel = channel1


For testing, data is sent in over Avro; you can do it like this:

bin/flume-ng avro-client --conf conf -H localhost -p 44444 -F /data/flumetmp/a

/data/flumetmp/a is the path of the file to send.

When testing locally, make sure the consumer program written above is running so you can verify that the data actually arrives.
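
The same test can also be driven from Java using Flume's RPC client SDK instead of the avro-client command. This is only a sketch: the host and port match the avro source configured above, and the message body is an arbitrary test string.

import java.nio.charset.Charset;

import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.api.RpcClient;
import org.apache.flume.api.RpcClientFactory;
import org.apache.flume.event.EventBuilder;

public class AvroClientTest {
    public static void main(String[] args) throws EventDeliveryException {
        // connect to the avro source defined in the Flume configuration above
        RpcClient client = RpcClientFactory.getDefaultInstance("localhost", 44444);
        try {
            Event event = EventBuilder.withBody("hello from the avro client", Charset.forName("UTF-8"));
            client.append(event);
        } finally {
            client.close();
        }
    }
}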

Next, a short introduction to Storm, and then we will wire the Kafka consumer into Storm:

Storm:

Storm is a distributed real-time message processing system.

How Storm's components relate to one another:


A Storm cluster consists of a master node and a group of worker nodes, coordinated through ZooKeeper.

Master node: the master runs a daemon called Nimbus, which responds to nodes distributed across the cluster, assigns tasks, and monitors failures.

Worker nodes: each worker node runs a Supervisor daemon, which accepts tasks assigned by Nimbus and starts and stops the worker processes it manages. Coordination between Nimbus and the Supervisors is handled by ZooKeeper.

Worker: a process that runs the processing logic. Inside it run multiple tasks, each of which is an instance of a spout or bolt.

Topology: a Storm real-time application. Once started it runs continuously, triggered whenever a tuple arrives. It is called a topology because Storm's message flow resembles a topological graph.

Stream: Storm's core abstraction. A stream is a continuous sequence of tuples, created and processed in a distributed, parallel fashion.

Spout: the source of a stream. A spout reads data from an external system, assembles it into tuples, and emits them; once emitted, tuples propagate through the topology.

Bolt: the heart of data processing in Storm. All data processing in Storm happens inside bolts.

This is only a quick tour of the concepts; see a more detailed tutorial for specifics.
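
To show how these pieces fit together before diving into the Kafka integration, here is a minimal sketch of wiring a spout to a bolt and running the topology in local mode. It assumes the KafkaSpout developed below (constructed with the topic name) plus a hypothetical PrintBolt that just logs each tuple; the component names "kafka-spout" and "print-bolt" are arbitrary.

import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.tuple.Tuple;

public class KafkaTopologyTest {

    // hypothetical bolt that just prints whatever the spout emits
    public static class PrintBolt extends BaseBasicBolt {
        @Override
        public void execute(Tuple input, BasicOutputCollector collector) {
            System.out.println("(bolt)--> " + input.getValue(0));
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            // terminal bolt: nothing is emitted downstream
        }
    }

    public static void main(String[] args) {
        TopologyBuilder builder = new TopologyBuilder();
        // the KafkaSpout shown below consumes the "kafka" topic
        builder.setSpout("kafka-spout", new KafkaSpout("kafka"), 1);
        builder.setBolt("print-bolt", new PrintBolt(), 1).shuffleGrouping("kafka-spout");

        Config conf = new Config();
        conf.setDebug(true);
        // local mode is enough for a test; a real cluster would use StormSubmitter
        LocalCluster cluster = new LocalCluster();
        cluster.submitTopology("kafka-storm-test", conf, builder.createTopology());
    }
}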

Now let's integrate Storm with Kafka.

From the description above, Storm's spout is responsible for reading data from the outside, so we need to write a KafkaSpout that acts as a Kafka consumer and serves as Storm's data source. Have a look at https://github.com/HolmesNL/kafka-spout; below I only write a simple one that is good enough for testing.

The code:

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.IRichSpout;

import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;

public class KafkaSpout implements IRichSpout {

    private static final Log logger = LogFactory.getLog(KafkaSpout.class);

    private static final long serialVersionUID = -5569857211173547938L;

    SpoutOutputCollector collector;
    private ConsumerConnector consumer;
    private String topic;

    public KafkaSpout(String topic) {
        this.topic = topic;
    }

    @Override
    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
        this.collector = collector;
    }

    private static ConsumerConfig createConsumerConfig() {
        Properties props = new Properties();
        props.put("zookeeper.connect", "xx.xx.xx.xx:2181");
        props.put("group.id", "0");
        props.put("zookeeper.session.timeout.ms", "10000");
        // props.put("zookeeper.sync.time.ms", "200");
        // props.put("auto.commit.interval.ms", "1000");
        return new ConsumerConfig(props);
    }

    @Override
    public void close() {
        // TODO Auto-generated method stub
    }

    @Override
    public void activate() {
        this.consumer = Consumer.createJavaConsumerConnector(createConsumerConfig());
        Map<String, Integer> topickMap = new HashMap<String, Integer>();
        topickMap.put(topic, new Integer(1));
        Map<String, List<KafkaStream<byte[], byte[]>>> streamMap = consumer.createMessageStreams(topickMap);
  37.            KafkaStream<byte[],byte[]>stream = streamMap.get(topic).get(0);