1. 程式人生 > >kafkaChannel實現一個source下,不同日誌採集到kafka不同主題中

kafkaChannel實現一個source下,不同日誌採集到kafka不同主題中

1.需求

使用flume採集資料,在使用一個source情況下,將不同的日誌採集到指定的kafka的主題中。

例如:有兩個日誌檔案:error.log和info.log

error.log採集到kafka的kafka_channel主題

info.log採集到kafka的kafka_channel2主題

2.解決方案

我們使用tailDir source  和kafkaChannel

思路:

使用a0.sources.r1.headers.f1.headerKey = error,a0.sources.r1.headers.f2.headerKey = info。去設定event的一個header值,不同檔案設定不同的header值,用於區分,其中headerKey可以隨便設定,就是header中的一個key而已,

原始碼中找到kafka-channel,在都doPut()方法中,去獲去每一個event的header,我們知道event的hader一個map。然後header.get(headerKey)獲取我們設定的頭標記,如果是error,kafka的主題設定為kafka_channel如果是info,則kafka的主題設定為kafka_channel2,也就是如下程式碼邏輯。

String type=headers.get("headerKey");
if(type.equals("info")){
  topicStr="kafka_channel2";
}else if(type.equals("error")){
  topicStr="kafka_channel";
}

原始碼更改

更改前:

 protected void doPut(Event event) throws InterruptedException {
      type = TransactionType.PUT;
      if (!producerRecords.isPresent()) {
        producerRecords = Optional.of(new LinkedList<ProducerRecord<String, byte[]>>());
      }
      String key = event.getHeaders().get(KEY_HEADER);
      //get header
      Map<String, String> headers = event.getHeaders();
      String  topicStr=null;
      Integer partitionId = null;
     
      try {
      if (staticPartitionId != null) {
          partitionId = staticPartitionId;
        }
        if (partitionHeader != null) {
          String headerVal = event.getHeaders().get(partitionHeader);
          if (headerVal != null) {
            partitionId = Integer.parseInt(headerVal);
          }
        }
       
        if (partitionId != null) {
          producerRecords.get().add(
 new ProducerRecord<String, byte[]>(topic.get(), partitionId, key,
                                       serializeValue(event, parseAsFlumeEvent)));
        } else {
          producerRecords.get().add(
            new ProducerRecord<String, byte[]>(topic.get(), key,
                                serializeValue(event, parseAsFlumeEvent)));
        }
      } catch (NumberFormatException e) {
        throw new ChannelException("Non integer partition id specified", e);
      } catch (Exception e) {
        throw new ChannelException("Error while serializing event", e);
      }
    }

更改後:

 protected void doPut(Event event) throws InterruptedException {
      type = TransactionType.PUT;
      if (!producerRecords.isPresent()) {
        producerRecords = Optional.of(new LinkedList<ProducerRecord<String, byte[]>>());
      }
      String key = event.getHeaders().get(KEY_HEADER);
      //get header
      Map<String, String> headers = event.getHeaders();
      String  topicStr=null;
      Integer partitionId = null;
      /**
       * 在這可以更改程式碼邏輯,實現:資料傳送到指定的kafka分割槽中
       */
      try {
      if (staticPartitionId != null) {
          partitionId = staticPartitionId;
        }
        if (partitionHeader != null) {
          String headerVal = event.getHeaders().get(partitionHeader);
          if (headerVal != null) {
            partitionId = Integer.parseInt(headerVal);
          }
        }
        /**
         *新增的邏輯
         */
        String type=headers.get("headerKey");
        if(type.equals("info")){
          topicStr="kafka_channel2";
        }else if(type.equals("error")){
          topicStr="kafka_channel";
        }
        if (partitionId != null) {
          producerRecords.get().add(
              new ProducerRecord<String, byte[]>(topicStr, partitionId, key,
                                                 serializeValue(event, parseAsFlumeEvent)));
        } else {
          producerRecords.get().add(
              new ProducerRecord<String, byte[]>(topicStr, key,
                                                 serializeValue(event, parseAsFlumeEvent)));
        }
      } catch (NumberFormatException e) {
        throw new ChannelException("Non integer partition id specified", e);
      } catch (Exception e) {
        throw new ChannelException("Error while serializing event", e);
      }
    }

採集方法 

注意:

更改原始碼後,不需要在配置檔案中指定kafka的主題,當然指定主題也不錯,但是已經沒作用了,已經在程式碼中更改了。如果你有精力還可以把不同的kafka主題寫到properties配置檔案中,把程式寫活一點。在相同的思路下你還可以做到顆粒更細:就是指定主題和分割槽,通過條件判斷更改topic和partitionId。最後kafkaSink要想實現這些功能更改原始碼的思路是一樣的

a0.sources = r1 
a0.channels = c1  

a0.sources.r1.type = TAILDIR
#通過 json 格式存下每個檔案消費的偏移量,避免從頭消費
a0.sources.r1.positionFile = /data/server/flume-1.8.0/conf/taildir_position.json
a0.sources.r1.filegroups = f1 f2
#配置f1資訊
a0.sources.r1.headers.f1.headerKey = error
a0.sources.r1.filegroups.f1 = /data/access/error.log
#配置f1資訊
a0.sources.r1.headers.f2.headerKey = info
a0.sources.r1.filegroups.f2 = /data/access/info.log
#是否新增一個儲存的絕對路徑名的標頭檔案
#a0.sources.r1.fileHeader = true

#攔截器獲取伺服器的主機名
a0.sources.r1.interceptors = i1 i2 i3
#a0.sources.r1.interceptors.i1.type = org.apache.flume.interceptor.HostInterceptor$Builder
a0.sources.r1.interceptors.i1.type = org.apache.flume.host.MyHostInterceptor$Builder
a0.sources.r1.interceptors.i1.preserveExisting = false
#a0.sources.r1.interceptors.i1.useIP = false
a0.sources.r1.interceptors.i1.HeaderName= agentHost
#靜態過濾器新增指定的標誌
a0.sources.r1.interceptors.i2.type = org.apache.flume.interceptor.StaticInterceptor$Builder
a0.sources.r1.interceptors.i2.key = logType
a0.sources.r1.interceptors.i2.value= kafka_data
a0.sources.r1.interceptors.i2.preserveExisting = false
#新增時間戳
a0.sources.r1.interceptors.i3.type = timestamp
#定義channel
a0.channels.c1.type = org.apache.flume.channel.kafka.KafkaChannel
a0.channels.c1.kafka.bootstrap.servers = 10.2.40.10:9092,10.2.40.14:9092,10.2.40.15:9092
a0.channels.c1.parseAsFlumeEvent = false
#a0.channels.c1.kafka.producer.compression.type = lz4
a0.sources.r1.channels = c1