
Log Processing with Flume, Kafka, and Storm

1. ZooKeeper

See the installation reference.

2. Kafka

2.1 Extract and Install

# Make sure Scala is already installed; this guide uses 2.11.7
tar -xf kafka_2.11-0.9.0.1.tgz
cd kafka_2.11-0.9.0.1
mkdir logs

vim ~/.bash_profile

export KAFKA_HOME=/home/zkpk/kafka_2.11-0.9.0.1
export PATH=$PATH:$KAFKA_HOME/bin

source ~/.bash_profile

2.2 Configuration

2.2.1 server.properties

Only the following four properties are set; everything else keeps its default value.

# Unique ID of this broker within the cluster, same idea as ZooKeeper's myid
broker.id=0

host.name=hsm01

# Directory where message data is stored. It can be a comma-separated list of
# directories (num.io.threads should be at least the number of directories).
# With multiple directories, each newly created partition is placed in the
# directory that currently holds the fewest partitions.
log.dirs=/home/zkpk/kafka_2.11-0.9.0.1/logs

# Point Kafka at the ZooKeeper ensemble, under a dedicated chroot
zookeeper.connect=hsm01:2181,hss01:2181,hss02:2181/kafka
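Note that the trailing /kafka is a ZooKeeper chroot: all of Kafka's metadata lives under that znode, so every later command that takes a --zookeeper argument must include the /kafka suffix.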

2.2.2 Copy to the Other Nodes

scp -r ~/kafka_2.11-0.9.0.1/ hss01:~/
scp -r ~/kafka_2.11-0.9.0.1/ hss02:~/

# On each node, change broker.id and host.name accordingly
# and set up the environment variables as in 2.1

2.3 Startup

kafka-server-start.sh -daemon $KAFKA_HOME/config/server.properties
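If the broker started cleanly, jps on each node should list a Kafka process.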

2.4 Testing

# Create a topic
kafka-topics.sh --create --zookeeper hsm01:2181/kafka --replication-factor 1 --partitions 1 --topic shuaige

# Start a console producer
kafka-console-producer.sh --broker-list hsm01:9092 --topic shuaige

# Start a console consumer
kafka-console-consumer.sh --zookeeper hsm01:2181/kafka --topic shuaige --from-beginning

# List topics
kafka-topics.sh --zookeeper hsm01:2181/kafka --list

# Describe a topic
kafka-topics.sh --describe --zookeeper hsm01:2181/kafka --topic shuaige

# Delete a topic
kafka-topics.sh --zookeeper hsm01:2181/kafka --delete --topic shuaige
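The same smoke test can also be driven programmatically. Below is a minimal sketch using the Kafka 0.9 Java producer API; the class name ShuaigeProducer and the message text are just for illustration, while the broker address and topic match the setup above.

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class ShuaigeProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        // bootstrap.servers takes broker addresses, not the ZooKeeper connect string
        props.put("bootstrap.servers", "hsm01:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        KafkaProducer<String, String> producer = new KafkaProducer<String, String>(props);
        // Send a single message to the topic created above
        producer.send(new ProducerRecord<String, String>("shuaige", "hello from the java client"));
        producer.close();
    }
}

Any line printed by the console consumer confirms the round trip.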

2.5 References

3. Flume

3.1 Extract and Install

# In the /home/zkpk directory
tar -xf apache-flume-1.6.0-bin.tar.gz
mv apache-flume-1.6.0-bin/ flume-1.6.0

# Configure environment variables
vim .bash_profile

export FLUME_HOME=/home/zkpk/flume-1.6.0
export PATH=$PATH:$FLUME_HOME/bin

3.2 Configuration (Kafka Integration)


Note: KafkaSink is only available in Flume 1.6.0 and later.

3.2.1 flume-env.sh

JAVA_HOME=/opt/jdk1.8.0_45

3.2.2 kafka-sogolog.properties

# configure agent
a1.sources = f1
a1.channels = c1
a1.sinks = k1

# configure the source
a1.sources.f1.type = netcat
a1.sources.f1.bind = localhost
a1.sources.f1.port = 3333

# configure the sink (kafka)
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.topic = sogolog
a1.sinks.k1.brokerList = hsm01:9092,hss01:9092
a1.sinks.k1.requiredAcks = 0
a1.sinks.k1.batchSize = 20

# configure the channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# bind the source and sink to the channel
a1.sources.f1.channels = c1
a1.sinks.k1.channel = c1

3.3 Startup

Start the ZooKeeper service:

$ZOOKEEPER_HOME/bin/zkServer.sh start

Start Kafka:

# Start the broker
kafka-server-start.sh -daemon $KAFKA_HOME/config/server.properties

# Create the topic
kafka-topics.sh --create --zookeeper hsm01:2181/kafka --replication-factor 1 --partitions 1 --topic sogolog

# Start a consumer
kafka-console-consumer.sh --zookeeper hsm01:2181/kafka --topic sogolog --from-beginning

Start Flume:

flume-ng agent -n a1 -c conf -f conf/kafka-sogolog.properties -Dflume.root.logger=DEBUG,console


Note: a1 in the command is the agent name defined in the configuration file. The argument to -f must be the actual path of the configuration file, here conf/kafka-sogolog.properties.

3.4 Testing

[Screenshot: telnet input]

[Screenshot: Flume collecting the data]

[Screenshot: Kafka receiving the data]
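If telnet is not handy, a few lines can also be pushed into the netcat source with a small Java client. This is a sketch assuming the source configuration above (the source binds to localhost:3333, so it must run on the agent's machine); the class name NetcatSourceClient is hypothetical.

import java.io.PrintWriter;
import java.net.Socket;

public class NetcatSourceClient {
    public static void main(String[] args) throws Exception {
        // Connect to the Flume netcat source configured in kafka-sogolog.properties
        Socket socket = new Socket("localhost", 3333);
        PrintWriter out = new PrintWriter(socket.getOutputStream(), true);
        for (int i = 0; i < 5; i++) {
            // Each newline-terminated line becomes one Flume event
            out.println("test log line " + i);
        }
        out.close();
        socket.close();
    }
}

Each line should then show up in the console consumer subscribed to the sogolog topic.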

3.5 References

4. Storm

4.1 Installation

4.2 A Simple Test

4.2.1 pom.xml

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <artifactId>bigdata-demo</artifactId>
        <groupId>com.zw</groupId>
        <version>1.0-SNAPSHOT</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>

    <artifactId>storm-demo</artifactId>
    <packaging>jar</packaging>

    <name>storm-demo</name>
    <url>http://maven.apache.org</url>

    <repositories>
        <repository>
            <id>github-releases</id>
            <url>http://oss.sonatype.org/content/repositories/github-releases</url>
        </repository>
        <repository>
            <id>clojars.org</id>
            <url>http://clojars.org/repo</url>
        </repository>
        <repository>
            <id>twitter4j</id>
            <url>http://twitter4j.org/maven2</url>
        </repository>
    </repositories>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <storm.version>0.9.7</storm.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.storm</groupId>
            <artifactId>storm-core</artifactId>
            <version>${storm.version}</version>
            <!--
            Use compile when running directly with java;
            use provided when running via maven / submitting to the cluster.
            -->
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>com.googlecode.json-simple</groupId>
            <artifactId>json-simple</artifactId>
            <version>1.1.1</version>
        </dependency>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>3.8.1</version>
            <scope>test</scope>
        </dependency>
    </dependencies>
    <build>
        <plugins>
            <plugin>
                <artifactId>maven-assembly-plugin</artifactId>
                <configuration>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                    <archive>
                        <manifest>
                            <mainClass></mainClass>
                        </manifest>
                    </archive>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <groupId>com.theoryinpractise</groupId>
                <artifactId>clojure-maven-plugin</artifactId>
                <version>1.3.8</version>
                <extensions>true</extensions>
                <configuration>
                    <sourceDirectories>
                        <sourceDirectory>src/clj</sourceDirectory>
                    </sourceDirectories>
                </configuration>
                <executions>
                    <execution>
                        <id>compile</id>
                        <phase>compile</phase>
                        <goals>
                            <goal>compile</goal>
                        </goals>
                    </execution>
                    <execution>
                        <id>test</id>
                        <phase>test</phase>
                        <goals>
                            <goal>test</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.5.1</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                    <encoding>${project.build.sourceEncoding}</encoding>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>


Note the scope of the storm-core dependency: with provided, the assembled jar excludes storm-core because the cluster supplies it; to run the topology directly with java, change it to compile.

4.2.2 HelloWorldSpout.java

package com.zw.storm.helloworld;

import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
import backtype.storm.utils.Utils;

import java.util.Map;
import java.util.Random;

/**
 * A spout is the topology's gateway to the outside world: it might pull data from a
 * database according to some rule, or take tasks from a distributed queue.
 * <p>
 *     This one emits tuples chosen by a random number generator.
 * </p>
 *
 * Created by zhangws on 16/10/3.
 */
public class HelloWorldSpout extends BaseRichSpout {

    // Collector used to emit tuples into the topology
    private SpoutOutputCollector collector;

    private int referenceRandom;

    private static final int MAX_RANDOM = 10;

    public HelloWorldSpout() {
        final Random rand = new Random();
        referenceRandom = rand.nextInt(MAX_RANDOM);
    }

    /**
     * Declares the output field name. The name is irrelevant with shuffle grouping,
     * but essential when tuples are grouped by field.
     * <p>
     *     The declarer is quite useful: declarer.declareStream() can also be called
     *     to define a streamId, which allows building more complex stream topologies.
     * </p>
     * @param outputFieldsDeclarer
     */
    @Override
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        outputFieldsDeclarer.declare(new Fields("sentence"));
    }

    /**
     * Initializes the collector.
     *
     * @param map
     * @param topologyContext
     * @param spoutOutputCollector
     */
    @Override
    public void open(Map map, TopologyContext topologyContext,
                     SpoutOutputCollector spoutOutputCollector) {
        this.collector = spoutOutputCollector;
    }

    /**
     * Each call emits one tuple (one data record) into the Storm cluster;
     * Storm invokes this method in an endless loop.
     */
    @Override
    public void nextTuple() {
        Utils.sleep(100);
        final Random rand = new Random();
        int instanceRandom = rand.nextInt(MAX_RANDOM);
        if (instanceRandom == referenceRandom) {
            collector.emit(new Values("Hello World"));
        } else {
            collector.emit(new Values("Other Random Word"));
        }
    }
}
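As the comment on declareOutputFields mentions, a spout can declare additional named streams with declarer.declareStream(). A minimal sketch follows; the stream name "greetings" is made up for illustration.

    // Hypothetical variant of declareOutputFields declaring a second, named stream
    @Override
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        outputFieldsDeclarer.declare(new Fields("sentence"));                    // default stream
        outputFieldsDeclarer.declareStream("greetings", new Fields("sentence")); // named stream
    }

    // Inside nextTuple(), emitting to the named stream would look like:
    //     collector.emit("greetings", new Values("Hello World"));
    // A bolt subscribes to it with shuffleGrouping("randomHelloWorld", "greetings").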

4.2.3 HelloWorldBolt.java

package com.zw.storm.helloworld;

import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.tuple.Tuple;

/**
 * Receives the tuples sent by the spout, performs some simple processing, and emits the result.
 * <p>
 *     Reads the generated tuples and implements the required counting logic.
 * </p>
 *
 * Created by zhangws on 16/10/4.
 */
public class HelloWorldBolt extends BaseBasicBolt {

    private int myCount;

    @Override
    public void execute(Tuple tuple, BasicOutputCollector collector) {
        String test = tuple.getStringByField("sentence");
        if ("Hello World".equals(test)) {
            myCount++;
            System.out.println("==========================: " + myCount);
        }
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {

    }
}

4.2.4 HelloWorldTopology.java

package com.zw.storm.helloworld;

import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.StormSubmitter;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.utils.Utils;

/**
 * mvn compile exec:java -Dexec.classpathScope=compile -Dexec.mainClass=com.zw.storm.helloworld.HelloWorldTopology
 * Created by zhangws on 16/10/4.
 */
public class HelloWorldTopology {

    public static void main(String[] args) throws Exception {
        TopologyBuilder builder = new TopologyBuilder();
        // Register the spout and set its parallelism hint, which controls how many threads it gets in the cluster.
        builder.setSpout("randomHelloWorld", new HelloWorldSpout(), 1);
        // Register the bolt and set its parallelism hint; shuffle grouping makes it receive tuples from the spout at random.
        builder.setBolt("HelloWorldBolt", new HelloWorldBolt(), 2)
                .shuffleGrouping("randomHelloWorld");

        Config config = new Config();
        config.setDebug(true);

        if (args != null && args.length > 0) {
            config.setNumWorkers(1);
            StormSubmitter.submitTopology(args[0], config, builder.createTopology());
        } else {
            // Startup code for local mode.
            config.setMaxTaskParallelism(1);
            LocalCluster cluster = new LocalCluster();
            cluster.submitTopology("test", config, builder.createTopology());
            Utils.sleep(10000);
            cluster.killTopology("test");
            cluster.shutdown();
        }
    }
}

4.2.5 Running

# maven
mvn compile exec:java -Dexec.classpathScope=compile -Dexec.mainClass=com.zw.storm.helloworld.HelloWorldTopology

# To run directly with java:
# change the scope of the storm-core dependency to compile, then run the main class

Result:

...
34568 [Thread-15-HelloWorldBolt] INFO  backtype.storm.daemon.executor - Processing received message source: randomHelloWorld:3, stream: default, id: {}, [Other Random Word]
34671 [Thread-11-randomHelloWorld] INFO  backtype.storm.daemon.task - Emitting: randomHelloWorld default [Hello World]
34671 [Thread-15-HelloWorldBolt] INFO  backtype.storm.daemon.executor - Processing received message source: randomHelloWorld:3, stream: default, id: {}, [Hello World]
==========================: 5
34776 [Thread-11-randomHelloWorld] INFO  backtype.storm.daemon.task - Emitting: randomHelloWorld default [Other Random Word]
34776 [Thread-15-HelloWorldBolt] INFO  backtype.storm.daemon.executor - Processing received message source: randomHelloWorld:3, stream: default, id: {}, [Other Random Word]
34880 [Thread-11-randomHelloWorld] INFO  backtype.storm.daemon.task - Emitting: randomHelloWorld default [Other Random Word]
...
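In the spout, "Hello World" is emitted only when the freshly drawn random number equals referenceRandom, i.e. roughly one tuple in ten, which is why the bolt's counter climbs much more slowly than the tuple stream.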

4.3 Kafka Integration

4.3.1 pom.xml

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <artifactId>bigdata-demo</artifactId>
        <groupId>com.zw</groupId>
        <version>1.0-SNAPSHOT</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>

    <artifactId>kafka2storm</artifactId>
    <packaging>jar</packaging>

    <name>kafka2storm</name>
    <url>http://maven.apache.org</url>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <storm.version>0.9.7</storm.version>
        <kafka.version>0.9.0.1</kafka.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.storm</groupId>
            <artifactId>storm-core</artifactId>
            <version>${storm.version}</version>
            <!--
            Use compile when running directly with java;
            use provided when running via maven / submitting to the cluster.
            -->
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.storm</groupId>
            <artifactId>storm-kafka</artifactId>
            <version>${storm.version}</version>
        </dependency>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>3.8.1</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka_2.11</artifactId>
            <version>${kafka.version}</version>
            <exclusions>
                <exclusion>
                    <groupId>org.apache.zookeeper</groupId>
                    <artifactId>zookeeper</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>org.slf4j</groupId>
                    <artifactId>slf4j-log4j12</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>log4j</groupId>
                    <artifactId>log4j</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <artifactId>maven-assembly-plugin</artifactId>
                <configuration>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>

4.3.2 MessageScheme.java

package com.zw.kafka.storm;

import backtype.storm.spout.Scheme;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;

import java.io.UnsupportedEncodingException;
import java.util.List;

/**
 * Converts the raw bytes coming out of Kafka into a string.
 * <p>
 *     KafkaSpout is the spout that ships with Storm (storm-kafka). When using it, you
 *     implement the Scheme interface yourself; it is responsible for parsing the
 *     needed data out of the message stream.
 * </p>
 *
 * Created by zhangws on 16/10/2.
 */
public class MessageScheme implements Scheme {

    public List<Object> deserialize(byte[] bytes) {
        try {
            String msg = new String(bytes, "UTF-8");
            return new Values(msg);
        } catch (UnsupportedEncodingException e) {
            e.printStackTrace();
        }
        return null;
    }

    public Fields getOutputFields() {
        return new Fields("msg");
    }
}
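This scheme is plugged into the KafkaSpout in KafkaTopology below, via spoutConfig.scheme = new SchemeAsMultiScheme(new MessageScheme()).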

4.3.3 SequenceBolt.java

package com.zw.kafka.storm;

import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;

import java.io.DataOutputStream;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;

/**
 * Saves the output to a file.
 * <p>
 *     Writes each received message to the file kafkastorm.out.
 * </p>
 *
 * Created by zhangws on 16/10/2.
 */
public class SequenceBolt extends BaseBasicBolt {

    /**
     * Process the input tuple and optionally emit new tuples based on the input tuple.
     * <p>
     * All acking is managed for you. Throw a FailedException if you want to fail the tuple.
     *
     * @param input
     * @param collector
     */
    public void execute(Tuple input, BasicOutputCollector collector) {
        String word = (String) input.getValue(0);
        System.out.println("==============" + word);

        // Write to the file (append mode, so earlier messages are not overwritten)
        try {
            DataOutputStream out_file = new DataOutputStream(new FileOutputStream("/home/zkpk/kafkastorm.out", true));
            out_file.writeUTF(word);
            out_file.close();
        } catch (FileNotFoundException e) {
            e.printStackTrace();
        } catch (IOException e) {
            e.printStackTrace();
        }

        collector.emit(new Values(word));
    }

    /**
     * Declare the output schema for all the streams of this topology.
     *
     * @param declarer this is used to declare output stream ids, output fields, and whether or not each output stream is a direct stream
     */
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("message"));
    }
}

4.3.4 KafkaTopology.java

package com.zw.kafka.storm;

import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.StormSubmitter;
import backtype.storm.spout.SchemeAsMultiScheme;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.utils.Utils;
import storm.kafka.BrokerHosts;
import storm.kafka.KafkaSpout;
import storm.kafka.SpoutConfig;
import storm.kafka.ZkHosts;
import storm.kafka.bolt.KafkaBolt;

import java.util.HashMap;
import java.util.Map;

/**
 * Wires Kafka into Storm and submits the topology.
 * <p>
 * The first topic (sogolog) is the one that receives the data coming from the Kafka producers;
 * the second topic (topic2) is the one the KafkaBolt writes to. The topic2 part can be
 * omitted without any problem; the bolt simply acts as a relay.
 * </p>
 * Created by zhangws on 16/10/2.
 */
public class KafkaTopology {

    private static final String BROKER_ZK_LIST = "hsm01:2181,hss01:2181,hss02:2181";
    private static final String ZK_PATH = "/kafka/brokers";

    public static void main(String[] args) throws Exception {
        // ZooKeeper connect string for the Kafka brokers
        BrokerHosts brokerHosts = new ZkHosts(BROKER_ZK_LIST, ZK_PATH);
        // Topic to subscribe to, plus the ZooKeeper root path and spout id under which offsets are stored
        SpoutConfig spoutConfig = new SpoutConfig(brokerHosts, "sogolog", "/kafka", "kafka");

        // Configure kafka.broker.properties for the KafkaBolt
        Config conf = new Config();
        Map<String, String> map = new HashMap<String, String>();

        // Kafka broker address
        map.put("metadata.broker.list", "hsm01:9092");
        // serializer.class is the serializer used for messages
        map.put("serializer.class", "kafka.serializer.StringEncoder");
        conf.put("kafka.broker.properties", map);
        // Topic that the KafkaBolt writes to
        conf.put("topic", "topic2");

        spoutConfig.scheme = new SchemeAsMultiScheme(new MessageScheme());
//        spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());

        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("kafka-spout", new KafkaSpout(spoutConfig), 1);
        builder.setBolt("kafka-bolt", new SequenceBolt()).shuffleGrouping("kafka-spout");
        builder.setBolt("kafka-bolt2", new KafkaBolt<String, Integer>()).shuffleGrouping("kafka-bolt");

        String name = KafkaTopology.class.getSimpleName();
        if (args != null && args.length > 0) {
            // Nimbus host name passed from command line
            conf.put(Config.NIMBUS_HOST, args[0]);