1. 程式人生 > >kafka與flume 的應用(實戰)

kafka與flume 的應用(實戰)

版本號:

RedHat6.5   JDK1.8    flume-1.6.0   kafka_2.11-0.8.2.1

1.flume安裝

2.kafka安裝

3.Flume和Kafka整合

在conf目錄新建flume-kafka.conf檔案:
  1. touch /usr/local/flume/apache-flume-1.6.0-bin/conf/flume-kafka.conf
  2. sudo gedit /usr/local/flume/apache-flume-1.6.0-bin/conf/flume-kafka.conf
輸入以下內容:
  1. # 指定Agent的元件名稱  
  2. agent1.sources = source1  
  3. agent1.sinks 
    = sink1  
  4. agent1.channels = channel1  
  5. # 指定Flume source(要監聽的路徑)  
  6. agent1.sources.source1.type = spooldir  
  7. agent1.sources.source1.spoolDir =/usr/local/flume/logtest
  8. # 指定Flume sink  
  9. #agent1.sinks.sink1.type = logger  
  10. agent1.sinks.sink1.type = org.apache.flume.sink.kafka.KafkaSink
  11. agent1.sinks.sink1.topic = test  
  12. agent1.sinks.sink1.brokerList =192.168.168.200:9092
  13. agent1.sinks.sink1.requiredAcks =1
  14. agent1.sinks.sink1.batchSize =100
  15. # 指定Flume channel  
  16. agent1.channels.channel1.type = memory  
  17. agent1.channels.channel1.capacity =1000
  18. agent1.channels.channel1.transactionCapacity =100
  19. # 繫結source和sink到channel上  
  20. agent1.sources.source1
    .channels = channel1  
  21. agent1.sinks.sink1.channel = channel1  

agent1.sinks.sink1.topic = test   代表flume監聽路徑下發生變化時,會把訊息傳送到localhost機器上的test主題。

啟動flume-kafka.conf:

  1. cd /usr/local/flume/apache-flume-1.6.0-bin
  2. bin/flume-ng agent --conf conf --conf-file conf/flume-kafka.conf --name agent1 -Dflume.root.logger=INFO,console

執行成功日誌如下:

  1. 2017-07-0722:22:02,270(lifecycleSupervisor-1-2)[INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.register(MonitoredCounterGroup.java:120)]Monitored counter groupfor type: SINK, name: sink1:Successfully registered newMBean.
  2. 2017-07-0722:22:02,270(lifecycleSupervisor-1-2)[INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.start(MonitoredCounterGroup.java:96)]Component type: SINK, name: sink1 started

啟動kafka的消費者,監聽topic主題:

  1. kafka-console-consumer.sh --zookeeper localhost:2181--topic test

testKafka.log :

在/usr/local/flume目錄下面新建一個testKafka.log日誌檔案,寫入Flume connect Kafka success! 作為測試內容:
  1. touch /usr/local/flume/testKafka.log
  2. sudo gedit /usr/local/flume/testKafka.log
然後拷貝testKafka.log到flume監聽路徑/usr/local/flume/logtest下:
  1. cp /usr/local/flume/testKafka.log /usr/local/flume/logtest
接著就可以在前一個終端看到剛剛採集的內容了,如下:---------------------------------kafka------------------------------
  1. [[email protected] kafka_2.11-0.9.0.0]# kafka-console-consumer.sh --zookeeper localhost:2181--topic test  
  2. [2017-07-0722:36:38,687] INFO [GroupMetadataManager on Broker200]:Removed0 expired offsets in1 milliseconds.(kafka.coordinator.GroupMetadataManager)
  3. Flume connect Kafka success!

 ---------------------------------flume------------------------------

  1. 2017-07-0722:41:32,602(pool-3-thread-1)[INFO - org.apache.flume.client.avro.ReliableSpoolingFileEventReader.rollCurrentFile(ReliableSpoolingFileEventReader.java:348)]Preparing to move file /usr/local/flume/logtest/testKafka.log to /usr/local/flume/logtest/testKafka.log.COMPLETED
  2. 2017-07-0722:41:35,669(SinkRunner-PollingRunner-DefaultSinkProcessor)[INFO - kafka.utils.Logging$class.info(Logging.scala:68)]Fetching metadata from broker id:0,host:localhost,port:9092with correlation id 0for1 topic(s)Set(test)
  3. 2017-07-0722:41:35,728(SinkRunner-PollingRunner-DefaultSinkProcessor)[INFO - kafka.utils.Logging$class.info(Logging.scala:68)]Connected to localhost:9092for producing
  4. 2017-07-0722:41:35,757(SinkRunner-PollingRunner-DefaultSinkProcessor)[INFO - kafka.utils.Logging$class.info(Logging.scala:68)]Disconnectingfrom localhost:9092
  5. 2017-07-0722:41:35,791(SinkRunner-PollingRunner-DefaultSinkProcessor)[INFO - kafka.utils.Logging$class.info(Logging.scala

    相關推薦

    kafkaflume應用實戰

    版本號:RedHat6.5   JDK1.8    flume-1.6.0   kafka_2.11-0.8.2.11.flume安裝2.kafka安裝3.Flume和Kafka整合在conf目錄新建flume-kafka.conf檔案:touch /usr/local/fl

    項目實戰-大數據Kafka原理剖析及實戰演練

    實戰 kafka 大數據 com nbsp attach forum ignore spa Kafka原理剖析及實戰演練 Kafka理論+實戰視頻教程 Kafka完美入門視頻教程 煉數成金<ignore_js_op> <ignore_js_op> &

    Spring Boot Actuator詳解深入應用:Actuator 1.x

    《Spring Boot Actuator詳解與深入應用》預計包括三篇,第一篇重點講Spring Boot Actuator 1.x的應用與定製端點;第二篇將會對比Spring Boot Actuator 2.x 與1.x的區別,以及應用和定製2.x的端點;第三篇將會介紹Actuator metric指

    Spring Boot Actuator詳解深入應用:Actuator 2.x

    《Spring Boot Actuator詳解與深入應用》預計包括三篇,第一篇重點講Spring Boot Actuator 1.x的應用與定製端點;第二篇將會對比Spring Boot Actuator 2.x 與1.x的區別,以及應用和定製2.x的端點;第三篇將會介紹Actuator metric指

    Spring Boot Actuator詳解深入應用:Prometheus+Grafana應用監控

    《Spring Boot Actuator詳解與深入應用》預計包括三篇,第一篇重點講Spring Boot Actuator 1.x的應用與定製端點;第二篇將會對比Spring Boot Actuator 2.x 與1.x的區別,以及應用和定製2.x的端點;第三篇將會介紹Actuator metric指

    Kafka.net core安裝

    1.安裝JDK 目前官網不能直接下載,在網上找到1.8.0版本安裝包下載到本地。 1.1.下載jdk並解壓 [[email protected] java]# ls jdk1.8.0_191 jdk-8u191-linux-x64.tar.gz 1.2.配置java環境變數

    Kafka.net corekafka操作

    1.Kafka相關知識 Broker:即Kafka的伺服器,使用者儲存訊息,Kafa叢集中的一臺或多臺伺服器統稱為broker。 Message訊息:是通訊的基本單位,每個 producer 可以向一個 topic(主題)釋出一些訊息。

    對於 RxJava2 的 認知直接應用

    最近有時間學習些許內容,加上重新寫部落格來記錄自己的學習過程與心得 1.rxjava2 認知 rxjava作為知名的響應式程式設計庫,這半年內極大的火爆開發者中 介紹 Observable 被觀察者 | | subscribe(

    mongodb專案實戰高階應用MongoDB 高可用方案-MongoDB 副本集搭建

    MongoDB 副本集 中文翻譯叫做副本集,不過我並不喜歡把英文翻譯成中文,總是感覺怪怪的。其實簡單來說就是集 群當中包含了多份資料,保證主節點掛掉了,備節點能繼續提供資料服務,提供的前提就是資料需要和 主節點一致。 Mongodb(M)表示主節點,Mongodb(S)表示備節點,Mon

    mongodb專案實戰高階應用MongoDB 高可用方案-主從搭建

      1、命令列啟動 $ ./mongod --fork --dbpath=/opt/mongodb/data 2、配置檔案啟動 $ ./mongod -f mongodb.cfg mongoDB 基本配置/opt/mongodb/mongodb.cfg dbpa

    mongodb專案實戰高階應用使用者管理

    1.1、新增使用者 為testdb 新增ma 使用者 use testdb db.createUser({user:"ma",pwd:"123",roles:[{ role:"dbAdmin",db:"testdb"}]}) 具體角色有 read:允許使用者讀取指定資料庫 readWri

    大資料系列之分散式釋出訂閱訊息系統KafkaKafkaFlume的3種整合

    前面我們已經介紹了Flume,現在我們將Kafka與Flume整合 先看一下Flume的結構組成:            我們可以發現,將Flume與Kafka進行整合無非3種情況,Flume作為生產者——Sink輸出到Kafka,Flume作為消費者——Source接

    Linux同步相互排斥應用:基礎概念

    使用 line 關系 並發執行 來看 文章 必須 生產者 而且 【版權聲明:尊重原創,轉載請保留出處:blog.csdn.net/shallnet 或 .../gentleliu,文章僅供學習交流,請勿用於商業用途】 當操作系統進入多道批處理

    TF-IDF余弦相似性的應用:自動摘要

    下一步 dip target 似的 abs tps .net ebo ace 轉:http://www.ruanyifeng.com/blog/2013/03/automatic_summarization.html 有時候,很簡單的數學方法,就可以完成很復雜的任務。 這個

    第44課 遞歸的思想應用

    != 遞歸法 ati 恢復 直接 clu spa tex height 1. 單向鏈表的轉置 【編程實驗】單向鏈表的轉置(Node* reverse(Node* list)) 2. 單向排序鏈表的合並 【編程實驗】單向排序鏈表的合並(Node* merge(Node

    web應用web框架Day65

    pos ack ++ 環境 lex roo http請求 main conn Web應用 對於所有的web應用,本質上其實就是一個socket服務端,用戶的瀏覽器其實就是一個socket客戶端 import socket def handle_request(clien

    kafka源碼分析Metadata的數據結構讀取、更新策略

    思路 sync 源碼分析 png ada ret code 入隊 後臺線程 一、基本思路 異步發送的基本思路就是:send的時候,KafkaProducer把消息放到本地的消息隊列RecordAccumulator,然後一個後臺線程Sender不斷循環,把消息發給K

    由散列表到BitMap的概念應用:面試中的海量資料處理

    一道面試題 在面試軟體開發工程師時,經常會遇到海量資料排序和去重的面試題,特別是大資料崗位。 例1:給定a、b兩個檔案,各存放50億個url,每個url各佔64位元組,記憶體限制是4G,找出a、b檔案共同的url? 首先我們最常想到的方法是讀取檔案a,建立雜湊表,然後再讀取檔案b,遍歷檔

    IO的應用--序列化反序列化

    package com.bjpowernode.demo02; import java.io.FileInputStream; import java.io.FileNotFoundException; import java.io.FileOutputStream; import ja

    VB6基本資料庫應用:連線資料庫SQL語句的Select語句初步

    資料庫我們已經建好了,重提一下上一章的結果,我們最後建立了一張Student的表,其中有StudentID(數字的雙精度型別)和StudentName(文字型別。補充一下,2013中有【長文字】和【短文字】,人名不會很長,根據上一章選擇儘量小的資料型別的規則,這裡就選【短文字】就可以了)。儘