1. 程式人生 > >flume+kafka+storm整合00

flume+kafka+storm整合00

一、安裝

flume,kafka, storm 的安裝在下面三篇文章:
flume:1.6.0
kafka:注意這裡最好下載scala2.10版本的kafka,因為scala2.10版本的相容性比較好和2.11版本差別太大
這裡寫圖片描述

二、各個部分除錯

2.1、flume

# 監聽ftp日誌
#agent的名字a1
a1.sources = src
a1.channels = chl
a1.sinks = sk


#定義source, source使用spooldir, 監控ftp日誌
a1.sources.src.type=spooldir
#監控目錄
a1.sources.src
.spoolDir=/hadoop/ftp/idc_bakupload/20180307/23/idc/ #忽略的檔案 a1.sources.src.ignorePattern = ^(.)*\\.AVL\\.(.)*$ #處理後的檔案,新增字尾 a1.sources.src.fileSuffix = .bak #定義channel, 使用memory 作為channel a1.channels.chl.type = memory a1.channels.chl.capacity = 100000 a1.channels.chl.transactionCapacity = 10000 #定義sink, 輸出到控制檯 a1.sinks
.sk.type = com.chb.test.sink.MyKafkaSink #定義sink, source 與channel的關係 #注意sink後面是channel, 而不是s a1.sinks.sk.channel = chl a1.sources.src.channels = chl

2.2、kafka

2.2.1、kafka自身測試, 起一個生產者,一個消費者
2.2.2、啟動消費這去消費flumesink的資料

三、 Storm獲取資料流程

3.1、首先來了解Strom-kafka

Strom-kafka的官網介紹專案
注意:可能使用瀏覽器的問題, 導致在IE上只能看到部分,換成其他瀏覽器就好了。

介紹Storm核心Spout 和Trident spout的實現,使用者消費從 Apache Kafka 0.8.x獲取的資料。

3.1.1、Spouts

We support both Trident and core Storm spouts.為了兩種Spout實現,Strom使用一個BrokerHost interface 跟蹤Kafka broker主機到分割槽對映和kafkaConfig控制一些Kafka相關的引數。

3.1.2 BrokerHosts

為了初始化您的Kafka spout/emitter,您需要建立一個標記BrokerHosts介面的例項。 目前,支援以下兩種實現:

ZkHosts

如果你想動態跟蹤Kafka broker到分割槽對映(partition mapping), 你應該使用ZkHosts。 這個類使用Kafka的ZooKeeper entries 來跟蹤brokerHost - >分割槽對映。 您可以通過呼叫下面方法例項化物件:

   public ZkHosts(String brokerZkStr, String brokerZkPath)
   public ZkHosts(String brokerZkStr)

其中:

  • brokerZkStr**只為**ip:post(eg. localhost:2181),
  • brokerZkPath: the root directory under which all the topics and partition information is stored, 預設為 /brokers 。
  • 預設情況下,代理分割槽對映(borker-partition mapping)每60秒從ZooKeeper重新整理。 如果要更改它,您應該將host.refreshFrezqSecs設定為您選擇的值。
    實現如:
ZkHosts zkHosts = new ZkHosts("192.168.57.4:2181,192.168.57.5:2181,192.168.57.6:2181");

StaticHosts

這是一個可選的實現,其中broker - >分割槽資訊是靜態的。 為了構造這個類的例項,您需要首先構造一個GlobalPartitionInformation的例項。

Broker brokerForPartition0 = new Broker("localhost");//localhost:9092
    Broker brokerForPartition1 = new Broker("localhost", 9092);//localhost:9092 but we specified the port explicitly
    Broker brokerForPartition2 = new Broker("localhost:9092");//localhost:9092 specified as one string.
    GlobalPartitionInformation partitionInfo = new GlobalPartitionInformation();
    partitionInfo.addPartition(0, brokerForPartition0);//mapping from partition 0 to brokerForPartition0
    partitionInfo.addPartition(1, brokerForPartition1);//mapping from partition 1 to brokerForPartition1
    partitionInfo.addPartition(2, brokerForPartition2);//mapping from partition 2 to brokerForPartition2
    StaticHosts hosts = new StaticHosts(partitionInfo);

KafkaConfig

為建立KafkaSpout所需的第二個物件是KafkaConfig
建立KafkaConfig

 public KafkaConfig(BrokerHosts hosts, String topic)
 public KafkaConfig(BrokerHosts hosts, String topic, String clientId)

BrokerHosts可以是如上所述的BrokerHosts介面的任何實現。 topic是Kafka topic的名稱。 ClientId是可選的, 用作ZooKeeper路徑的一部分,其中儲存了spout的當前消耗偏移量。

目前有2個KafkaConfig的擴充套件。

Spoutconfig

Spoutconfig是KafkaConfig的擴充套件,它支援使用ZooKeeper連線資訊的其他欄位,並用於控制特定於KafkaSpout的行為。 Zkroot將用作root來儲存消費的偏移量。 ID應該唯一標識您的spout。

public SpoutConfig(BrokerHosts hosts, String topic, String zkRoot, String id);

實現:

SpoutConfig spoutConfig 
    = new SpoutConfig(
        zkHosts, 
        topic, 
        "/test", // 偏移量offset的根目錄
        "test");// ID應該唯一標識您的spout

除了這些引數,SpoutConfig包含以下欄位控制KafkaSpout的行為:

    spoutConfig.forceFromStart = false; // 不從頭開始消費,保證spout出現故障, 重啟之後,能夠從kafka的原來位置處理, 而不是從開始位置處理,kafka的偏移量,週期性的寫入zookeeper中, 
    // setting for how often to save the current Kafka offset to ZooKeeper
    public long stateUpdateIntervalMs = 2000;

    // Retry strategy for failed messages
    public String failedMsgRetryManagerClass = ExponentialBackoffMsgRetryManager.class.getName();

    // Exponential back-off retry settings.  These are used by ExponentialBackoffMsgRetryManager for retrying messages after a bolt
    // calls OutputCollector.fail(). These come into effect only if ExponentialBackoffMsgRetryManager is being used.
    // Initial delay between successive retries
    public long retryInitialDelayMs = 0;
    public double retryDelayMultiplier = 1.0;

    // Maximum delay between successive retries    
    public long retryDelayMaxMs = 60 * 1000;
    // Failed message will be retried infinitely if retryLimit is less than zero. 
    public int retryLimit = -1;    

Core KafkaSpout only accepts an instance of SpoutConfig.

KafkaSpout kafkaSpout = new KafkaSpout(spoutConfig);

TridentKafkaConfig

TridentKafkaConfig is another extension of KafkaConfig. TridentKafkaEmitter only accepts TridentKafkaConfig.

KafkaConfig類還有一堆公共變數,用於控制應用程式的行為。 這裡是預設值:

    public int fetchSizeBytes = 1024 * 1024;
    public int socketTimeoutMs = 10000;
    public int fetchMaxWait = 10000;
    public int bufferSizeBytes = 1024 * 1024;
    public MultiScheme scheme = new RawMultiScheme();
    public boolean ignoreZkOffsets = false;
    public long startOffsetTime = kafka.api.OffsetRequest.EarliestTime();
    public long maxOffsetBehind = Long.MAX_VALUE;
    public boolean useStartOffsetTimeIfOffsetOutOfRange = true;
    public int metricsTimeBucketSizeInSecs = 60;

Most of them are self explanatory except MultiScheme.

MultiScheme

MultiScheme是一個介面,指示如何將從Kafka中消耗的ByteBuffer轉換為成Storm中的tuple。 它還控制輸出欄位的命名。

  public Iterable<List<Object>> deserialize(ByteBuffer ser);
  public Fields getOutputFields();

預設的RawMultiScheme只接受ByteBuffer,並返回一個帶有ByteBuffer的tuple,ByteBuffer轉換為byte []。 outputField的名稱為“bytes”。 還有一些可選實現,如SchemeAsMultiScheme和KeyValueSchemeAsMultiScheme,它們可以將ByteBuffer轉換為String。
//從Kafka中取出的byte[],該如何反序列化
如在整合專案中實現:使用SchemeAsMultiScheme

spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme()); // 定義輸出為String

還有SchemeAsMultiScheme,MessageMetadataSchemeAsMultiScheme的擴充套件,它具有一個附加的反序列化方法,除了與訊息關聯的分割槽和偏移之外,還接受訊息ByteBuffer。

public Iterable<List<Object>> deserializeMessageWithMetadata(ByteBuffer message, Partition partition, long offset)

這對於從Kafka topic上的任意處 auditing/replaying 訊息非常有用,可以儲存離散流的每個訊息的分割槽和偏移量,而不是保留整個訊息。

四、KafakSpout的具體實現

        TopologyBuilder builder = new TopologyBuilder();
        // config kafka spout
        String topic = "testflume";
        //第一步建立Zkhosts
        ZkHosts zkHosts = new ZkHosts("192.168.57.4:2181,192.168.57.5:2181,192.168.57.6:2181");
        //第二步建立SpoutConfig, 為了設定各種引數
        SpoutConfig spoutConfig = new SpoutConfig(zkHosts, 
                topic,    //kafka的topic名稱
                "/test",  // 偏移量offset的根目錄
                "test");  // kafka的唯一表示。
        //設定zkserver的資訊, 可選的, 應為在上面的ZkHosts中已經設定了zookeeper的主機和埠號。
        List<String> zkServers = new ArrayList<String>();
        System.out.println(zkHosts.brokerZkStr);
        for (String host : zkHosts.brokerZkStr.split(",")) {
            zkServers.add(host.split(":")[0]);
        }
        spoutConfig.zkServers = zkServers;
        spoutConfig.zkPort = 2181;
        //設定kafka的消費模式, 是否從頭開始。
        spoutConfig.forceFromStart = false; // 不從頭開始消費
        spoutConfig.socketTimeoutMs = 60 * 1000;  //與Kafka broker的連線的socket超時時間
        //從Kafka中取出的byte[],該如何反序列化
        spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme()); // 定義輸出為String
        //KafkaSpout之接收一個引數SpoutConfig.
        KafkaSpout kafkaSpout = new KafkaSpout(spoutConfig);

        // set kafka spout
        builder.setSpout("kafka_spout", kafkaSpout, 3);