1. 程式人生 > >基於Docker的Kafka+Flume+flink日誌處理實驗

基於Docker的Kafka+Flume+flink日誌處理實驗

目錄

目錄

一、序言

驗證資料

一、序言

實驗用到的元件有:docker、kafka、kafka-manager、zookeeper、flume;由於資源限制使用docker下安裝kafka和zookeeper,在試驗機上直接安裝flume和kafka-manager。

實驗內容:1.本地產生日誌資料,通過log4j將日誌收集到flume中,flume將資料sink到kafka中;2.flume從kafka中獲取資料然後列印到控制檯中;(或者使用flink從kafka中拿到資料,新增標識欄位後重新放入kafka另一個topic中,注稍後補全這部分)

實驗目的:通過實驗學習到docker安裝、使用、kafka操作、flume操作以及部署工作;

所需maven依賴包

  <!-- kafka start -->
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka_2.11</artifactId>
            <version>0.9.0.1</version>
        </dependency>

        <!-- kafka end -->

        <!--flume-->
        <dependency>
            <groupId>org.apache.flume.flume-ng-clients</groupId>
            <artifactId>flume-ng-log4jappender</artifactId>
            <version>1.7.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flume</groupId>
            <artifactId>flume-ng-core</artifactId>
            <version>1.7.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flume</groupId>
            <artifactId>flume-ng-configuration</artifactId>
            <version>1.7.0</version>
        </dependency>

        <!--flink-->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>1.5.0</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-connector-kafka-0.9 -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka-0.9_2.11</artifactId>
            <version>1.5.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_2.11</artifactId>
            <version>1.5.0</version>
        </dependency>




        <!-- log4j start -->
        <dependency>
            <groupId>log4j</groupId>
            <artifactId>log4j</artifactId>
            <version>1.2.17</version>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
            <version>1.7.5</version>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
            <version>1.7.5</version>
        </dependency>
        <!-- log4j end -->

        <!-- https://mvnrepository.com/artifact/com.alibaba/fastjson -->
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.47</version>
        </dependency>

二、環境準備

2.1 docker環境準備

實驗機環境為centos7

2.2 安裝zookeeper、kafka、kafka-manager環境

2.2.1 zookeeper

使用docker search zookeeper命令獲取資源庫zookeeper列表

下載首先pull獲取 wurstmeister的zookeeper

啟動zookeeper:docker run -d --name zookeeper -p 2181:2181 -t wurstmeister/zookeeper

2.2.2 kafka

使用docker search zookeeper命令獲取資源庫kafka列表

下載首先pull獲取 wurstmeister的kafka

啟動kafka:

docker run -d --name kafka -p 9092:9092  -e KAFKA_BROKER_ID=0  -e KAFKA_ZOOKEEPER_CONNECT=192.168.83.112:2181  -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://192.168.83.112:9092  -e KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092 -t wurstmeister/kafka

解釋:

KAFKA_BROKER_ID=0               //broker id,如果想要啟動多個就執行多次命令保證 id不相同就行了

KAFKA_ZOOKEEPER_CONNECT=192.168.83.112:2181  //外界連線kafka所需

KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://192.168.83.112:9092  //外界連線kafka所需,地址是宿主機地址

KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092

2.2.3 kafka-manager

安裝在宿主機上,沒有安裝到docker中,因為docker中的映象存在元件缺失

從 GitHub 下載後編譯,編譯方法自行百度,或者直接下載已經編譯好的

連結:https://pan.baidu.com/s/1zmhG6-eP_0RsGDxvcEMzyw

密碼:sc8w

解壓到最終安裝位置,然後配置兩項

kafka-manager.zkhosts="192.168.83.112:2181
akka {
  loggers = ["akka.event.slf4j.Slf4jLogger"]
  loglevel = "INFO"
  logger-startup-timeout = 30s
}

然後執行啟動命令

 nohup bin/kafka-manager -Dconfig.file=conf/application.conf -Dhttp.port=9000 &

2.3 安裝flume

從官網下載檔案,解壓後上傳到最終安裝目錄,

配置java路徑到flume-env.sh

export JAVA_HOME=/usr/java/jdk1.8.0_171-amd64

從官網下載檔案,解壓上傳到最終目錄

因為本次試驗使用的是單機版的,因此直接在bin目錄下執行.start-cluster.sh即可,web埠為8081

至此,我們的環境已經全部準備好了

三、程式開發

3.1.程式生成日誌到flume

log4j配置:

log4j.rootLogger=INFO,flume,console
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.target=System.out
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern="%d{yyyy-MM-dd HH:mm:ss} %p [%c:%L] - %m%n

log4j.appender.flume = org.apache.flume.clients.log4jappender.Log4jAppender
log4j.appender.flume.Hostname = 192.168.83.112
log4j.appender.flume.Port = 44444
log4j.appender.flume.UnsafeMode = true
log4j.appender.flume.layout=org.apache.log4j.PatternLayout
log4j.appender.flume.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss} %p [%c:%L] - %m%n

迴圈生成日誌

import java.util.Date;
import org.apache.log4j.Logger;
public class WriteLog {
    private static Logger logger = Logger.getLogger(WriteLog.class);
	public static void main(String[] args) throws InterruptedException {
	// 記錄debug級別的資訊  
        logger.debug("This is debug message.");  
        // 記錄info級別的資訊  
        logger.info("This is info message.");  
        // 記錄error級別的資訊  
        logger.error("This is error message.");
        int i = 0;
		while (true) {
			logger.info(new Date().getTime());
			logger.info("測試資料" + i);
			Thread.sleep(2000);
			i += 1;
		}
	}
}

在flume中的conf,複製模板為example.conf

書寫配置:

# Name the components on this agent
# 定義一個 agent 的元素

a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
# 配置 source

#使用avro接收log4j過來的資料
a1.sources.r1.type = avro
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = 44444

# Describe the sink
# 配置 sink

#a1.sinks.k1.type = logger
#將資料寫入kafka,設定topic和brokers地址
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.topic = test
a1.sinks.k1.brokerList = 192.168.83.112:9092
a1.sinks.k1.requiredAcks = 1
a1.sinks.k1.batchSize = 100

# Use a channel which buffers events in memory
# 定義 channel

a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
# 用 channel 連線起來 source 和 sink

a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

執行flume命令

flume-ng agent -c /opt/soft/apache-flume-1.8.0-bin/conf -f example.conf --name a1 -Dflume.root.logger=INFO,console

啟動java程式

3.2程式獲取kafka中的資料

java程式:

import org.apache.flume.*;
import org.apache.flume.conf.Configurable;
import org.apache.flume.sink.AbstractSink;

/**
 * Created by anan on 2018-7-31 14:20.
 */
public class CustomerSource extends AbstractSink implements Configurable {
    @Override
    public Status process() throws EventDeliveryException {
        Status status = null;
        // Start transaction
        Channel ch = getChannel();
        Transaction txn = ch.getTransaction();
        txn.begin();
        try {
            // This try clause includes whatever Channel operations you want to
            // do
            Event event = ch.take();
            // Send the Event to the external repository.
            // storeSomeData(e);
            String eventBody = new String(event.getBody(), "utf-8");

            System.out.println("============= " + eventBody + " ========");

            txn.commit();
            status = Status.READY;
        } catch (Throwable t) {
            txn.rollback();
            // Log exception, handle individual exceptions as needed
            status = Status.BACKOFF;

            // re-throw all Errors
            if (t instanceof Error) {
                throw (Error) t;
            }
        }
        // you must add this line of code in order to close the Transaction.
        txn.close();
        return status;
    }

    @Override
    public void configure(Context context) {

    }

    @Override
    public synchronized void start() {
        super.start();
    }

    @Override
    public synchronized void stop() {
        super.stop();
    }
}

新建flume conf test.conf

#soource的名字
agent.sources = kafkaSource
agent.channels = memoryChannel
agent.sinks = hdfsSink


agent.sources.kafkaSource.channels = memoryChannel
agent.sinks.hdfsSink.channel = memoryChannel

#-------- kafkaSource相關配置-----------------
agent.sources.kafkaSource.type = org.apache.flume.source.kafka.KafkaSource
agent.sources.kafkaSource.zookeeperConnect =192.168.83.112:2181
# 配置消費的kafka topic
agent.sources.kafkaSource.topic = test
# 配置消費者組的id
agent.sources.kafkaSource.groupId = flume
# 消費超時時間,參照如下寫法可以配置其他所有kafka的consumer選項。注意格式從kafka.xxx開始是consumer的配置屬性
agent.sources.kafkaSource.kafka.consumer.timeout.ms = 100



#------- memoryChannel相關配置-------------------------
agent.channels.memoryChannel.type = memory
agent.channels.memoryChannel.capacity=10000
agent.channels.memoryChannel.transactionCapacity=1000

#---------hdfsSink 相關配置------------------
agent.sinks.hdfsSink.type = com.gd.bigdataleran.flume.customerSource.CustomerSource

執行flume命令

flume-ng agent -c /opt/soft/apache-flume-1.8.0-bin/conf -f test.conf --name agent -Dflume.root.logger=INFO,console

然後檢視控制檯是否列印生成的日誌。

flink接收

如果想要flink接收kafka資料然後將資料經過簡單處理後放到kafka,就需要使用到flinkkafkaconsumer和flinkkafkaproductor

,java程式碼如下

package com.gd.bigdataleran.flink;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer09;
import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;

import java.util.Date;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;


/**
 * Created by anan on 2018-8-3 15:44.
 */
public class kafkaconsumer {
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(5000); // 非常關鍵,一定要設定啟動檢查點!!
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "192.168.83.112:9092");
//        properties.setProperty("zookeeper.connect", "192.168.83.112:2181");
        properties.setProperty("group.id", "test112");

        FlinkKafkaConsumer09<String> myConsumer = new FlinkKafkaConsumer09<>("test", new SimpleStringSchema(), properties);
        myConsumer.setStartFromEarliest();
        System.out.println("執行輸入");
        DataStream<String> stream = env.addSource(myConsumer);
        DataStream ds = stream.map(new MapFunction<String, Object>() {
            @Override
            public Object map(String s) throws Exception {
                return s + "==" + new Date().getTime();
            }
        });

        FlinkKafkaProducer09<String> flinkKafkaProducer09 = new FlinkKafkaProducer09<String>("192.168.83.112:9092","test1",new SimpleStringSchema());
        ds.addSink(flinkKafkaProducer09);
        System.out.println("執行輸出");

        env.execute();
    }
}

將需要的jar包上傳到flink目錄下的lib目錄下,然後執行flink命令即可

 flume-ng agent -c /opt/soft/apache-flume-1.8.0-bin/conf -f example.conf --name a1 -Dflume.root.logger=INFO,console

驗證資料

使用java連線kafka,獲取topic中的資料進行驗證;驗證程式碼 如下

    private void getKafkaData() {
        String topic = "test1";
        Properties kafkaProps = new Properties();
        kafkaProps.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        kafkaProps.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        kafkaProps.put("bootstrap.servers", "192.168.83.112:9092");
        kafkaProps.put("zookeeper.connect", "192.168.83.112:2181");
        kafkaProps.put("group.id", "farmtest1");
        kafkaProps.put("auto.offset.reset", "smallest");
        kafkaProps.put("enable.auto.commit", "true");
        ConsumerConnector consumer = Consumer.createJavaConsumerConnector(new ConsumerConfig(kafkaProps));

        Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
        topicCountMap.put(topic, 1); // 一次從主題中獲取一個數據
        Map<String, List<KafkaStream<byte[], byte[]>>> messageStreams = consumer.createMessageStreams(topicCountMap);
        KafkaStream<byte[], byte[]> stream = messageStreams.get(topic).get(0);// 獲取每次接收到的這個資料
        ConsumerIterator<byte[], byte[]> iterator = stream.iterator();
        while (iterator.hasNext()) {
            String message = new String(iterator.next().message());
            consumer.commitOffsets();
            System.out.println(message);
        }
    }

時間差匆忙,有問題歡迎大家提問討論

相關推薦

基於Docker的Kafka+Flume+flink日誌處理實驗

目錄 目錄 一、序言 驗證資料 一、序言 實驗用到的元件有:docker、kafka、kafka-manager、zookeeper、flume;由於資源限制使用docker下安裝kafka和zookeeper,在試驗

基於SparkStreaming對銀行日誌處理系列--整體技術框架

基於SparkStreaming對銀行日誌分析,實時技術架構圖 通過flume實時採集原日誌,送到kafka快取,SparkStreaming準實時從kafka拿資料,經過ETL、聚合計算送到redis,供前端展示,具體技術及程式碼見後面部落格; 除了實時部分,還有離線這一塊,技術框架如下: 前面都一樣,

基於flink和drools的實時日誌處理

1、背景 日誌系統接入的日誌種類多、格式複雜多樣,主流的有以下幾種日誌: filebeat採集到的文字日誌,格式多樣 winbeat採集到的作業系統日誌 裝置上報到logstash的syslog日誌 接入到kafka的業務日誌 以上通過各種渠道接入的日誌,存在2個主要的問題: 格式不統一、不規範、標準化

Flink視頻教程_基於Flink處理的動態實時電商實時分析系統

分布 業務 電商分析 apr 進行 處理 密碼 教程 包括 Flink視頻教程_基於Flink流處理的動態實時電商實時分析系統 課程分享地址鏈接:https://pan.baidu.com/s/1cX7O-45y6yUPT4B-ACfliA 密碼:jqmk 在開始學習前給

基於flume日誌收集系統配置

大資料系統中通常需要採集的日誌有: 系統訪問日誌 使用者點選日誌 其他業務日誌(比如推薦系統的點選日誌) 在收集日誌的時候,一般分為三層結構:採集層、彙總層和儲存層,而不是直接從採集端將資料傳送到儲存端,這樣的好處有: 如果儲存端如Hadoop叢集、Kafka等需要停

springboot2.0---05、基於AOP日誌處理

@Aspect @Component public class LogAop { private final Logger log = LoggerFactory.getLogger(LogA

基於Apache Flume Datahub外掛將日誌資料同步上雲

本文用到的 簡介 Apache Flume是一個分散式的、可靠的、可用的系統,可用於從不同的資料來源中高效地收集、聚合和移動海量日誌資料到集中式資料儲存系統,支援多種Source和Sink外掛。本文將介紹如何使用Apache Flume的Datahub Sink外掛將日誌

基於Flink處理的動態實時電商實時分析系統

在開始學習前給大家說下什麼是Flink? 1.Flink是一個針對流資料和批資料的分散式處理引擎,主要用Java程式碼實現。 2.Apache Flink作為Apache的頂級專案,Flink集眾多優點於一身,包括快速、可靠可擴充套件、完全相容Hadoop、使

基於flume日誌系統

思路 日誌統一輸出至kafka flume agent充當kafka消費者,將日誌輸出至elasticsearch kibana負責展示日誌資訊 準備工作 flume 1.8 kafka 1.1.0 elasticsearch&kibana 6.5.4 專案中一般

雲星資料---Apache Flink實戰系列(精品版)】:Flink處理API詳解與程式設計實戰002-Flink基於流的wordcount示例002

三、基於socket的wordcount 1.傳送資料 1.傳送資料命令 nc -lk 9999 2.傳送資料內容 good good study day day

基於Flink處理的動態實時億級全端用戶畫像系統

發展 完美 企業應用 應用 消費 spring 服務架構 習慣 link 基於Flink流處理的動態實時億級全端用戶畫像系統網盤地址:https://pan.baidu.com/s/1t_VNw9I6ML1o-jSqVlhBJQ 提取碼: 3t5q備用地址(騰訊微雲):ht

基於Heka+Flume+Kafka+ELK的日誌系統

前期準備 系統是centos6.6,64位機器。 所用軟體版本: Logstash:2.3.3 JDK:1.8.0_25 Elasticsearch:2.3.4 Kibana:4.5.2 Heka:0.10.0 Flume:1.7.0 Zookeeper:3.4

Flume、Kafka與Storm實現日誌處理

1. ZooKeeper 安裝參考 2. Kafka 2.1 解壓安裝 # 確保scala已經安裝好,本文安裝的是2.11.7 tar -xf kafka_2.11-0.9.0.1.tgz cd kafka_2.11-0.9.0.1 mkdi

【Spark深入學習 -10】基於spark構建企業級流處理系統

變現 大內存 空間換時間 detail python 訪問量 新版本 kafak 計算框架 ----本節內容------- 1.流式處理系統背景 1.1 技術背景 1.2 Spark技術很火 2.流式處理技術介紹 2.1流式處理技術概念 2.

學習筆記17_網站異常和日誌處理

add asa asp 日誌類 cte clob rect string 學習 *在clobal.asax中,寫protected void Application_Error() {   Respone.Redirect("Default.aspx");//發生錯誤能在此

人工智能會促成一張基於物聯網可自主處理的數據互聯網絡

雲計算 物聯網 邊緣計算 移動互聯 雲計算——信息和數據爆炸的自然產物互聯網發展至今,總結下來大概經歷了這麽三個歷史時期或階段,分別是IT時代、ICT時代、DT時代。IT時代可以理解為桌面時代,那個時候網絡剛起步發展,人們只是通過桌面機進行簡單的信息展示或者獲取,雖然很多設備當時都處於脫

controller層統一攔截進行日誌處理

uuid final asp end sna ogg long lan tostring 前言 在項目中添加統一日誌時,我們往往會用到aop進行切面處理,常用在controller層添加切面,以處理請求和返回的各項參數數據。 使用切面進行日誌處理 下面我們就看一個例子說明基

實現基於MySQL管理rsyslog日誌

mysql 管理 rsyslog 實驗概要: 本實驗主機默認采用系統為Centos 6.9 host1主機作為遠程日誌客戶端 安裝:rsyslog (默認安裝)、rsyslog-mysql host2主機作為日誌服務器 安裝:mysql(默認安裝,也可安裝

基於LAMP實現web日誌管理查看

日誌 web 管理 前言:日誌是一個重要的信息庫,如何高效便捷的查看系統中的日誌信息,是系統管理員管理系統的必備的技術。實現方式: 1、將日誌存儲於數據庫。 2、采用LAMP架構,搭建PHP應用,通過web服務訪問數據庫,提取日誌信息,展現到web頁面。基本結構圖:一、搭建日誌服務器

Python 日誌處理(二) 使用正則表達式處理Nginx 日誌

表示 cnblogs sunday sta return __main__ pattern eth 解析 使用正則表達式來處理Nginx 日誌 一、 先對單行的日誌進行分組正則匹配,返回匹配後的結果(字典格式): from datetime import dateti