1. 程式人生 > >基於Hadoop生態SparkStreaming的大數據實時流處理平臺的搭建

基於Hadoop生態SparkStreaming的大數據實時流處理平臺的搭建

perm cati permsize receive jdk1 處理方式 行數據 con GC

隨著公司業務發展,對大數據的獲取和實時處理的要求就會越來越高,日誌處理、用戶行為分析、場景業務分析等等,傳統的寫日誌方式根本滿足不了業務的實時處理需求,所以本人準備開始著手改造原系統中的數據處理方式,重新搭建一個實時流處理平臺,主要是基於hadoop生態,利用Kafka作為中轉,SparkStreaming框架實時獲取數據並清洗,將結果多維度的存儲進HBase數據庫。

整個平臺大致的框架如下:

技術分享圖片

操作系統:Centos7

用到的框架:

1. Flume1.8.0

2. Hadoop2.9.0

3. kafka2.11-1.0.0

4.Spark2.2.1

5.HBase1.2.6

6. ZooKeeper3.4.11

7. maven3.5.2

整體的開發環境是基於JDK1.8以上以及Scala,所以得提前把java和Scala的環境給準備好,接下來就開始著手搭建基礎平臺:

一、配置開發環境

下載並解壓JDK1.8,、下載並解壓Scala,配置profile文件:

vim /etc/profile
export JAVA_HOME=/usr/java/jdk1.8.0_144
export PATH=$JAVA_HOME/bin:$PATH
export CLASSPATH=.:$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib/tools.jar
export SCALA_HOME=/usr/local/scala-2.11.12
export PATH=$PATH:$SCALA_HOME/bin
source /etc/profile

二、配置zookeeper、maven環境

下載並解壓zookeeper以及maven並配置profile文件

wget http://mirrors.hust.edu.cn/apache/maven/maven-3/3.5.2/binaries/apache-maven-3.5.2-bin.tar.gz
tar -zxvf apache-maven-3.5.2-bin.tar.gz -C /usr/local
wget http://mirrors.hust.edu.cn/apache/zookeeper/zookeeper-3.4.11/zookeeper-3.4.11.tar.gz
tar -zxvf zookeeper-3.4.11.tar.gz -C /usr/local
vim /etc/profile
export MAVEN_HOME=/usr/local/apache-maven-3.5.2
export PATH=$PATH:$MAVEN_HOME/bin
source /etc/profile

zookeeper的配置文件配置一下:

cp /usr/local/zookeeper-3.4.11/conf/zoo_sample.cfg /usr/local/zookeeper-3.4.11/conf/zoo.cfg

然後配置一下zoo.cfg裏面的相關配置,指定一下dataDir目錄等等

啟動zookeeper:

/usr/local/zookeeper-3.4.11/bin/zkServer.sh start

如果不報錯,jps看一下是否啟動成功

三、安裝配置Hadoop

Hadoop的安裝配置在之前文章中有說過(傳送門),為了下面的步驟方便理解,這裏只做一個單機版的簡單配置說明:

下載hadoop解壓並配置環境:

wget http://mirrors.hust.edu.cn/apache/hadoop/common/hadoop-2.9.0/hadoop-2.9.0.tar.gz
tar -zxvf hadoop-2.9.0.tar.gz -C /usr/local
vim /etc/profile
export HADOOP_HOME=/usr/local/hadoop-2.9.0
export PATH=$PATH:$HADOOP_HOME/bin
source /etc/profile

配置hadoop 進入/usr/local/hadoop-2.9.0/etc/hadoop目錄

cd /usr/local/hadoop-2.9.0/etc/hadoop

首先配置hadoop-env.sh、yarn-env.sh,修改JAVA_HOME到指定的JDK安裝目錄/usr/local/java/jdk1.8.0_144

創建hadoop的工作目錄

mkdir /opt/data/hadoop

編輯core-site.xml、hdfs-site.xml、yarn-site.xml等相關配置文件,具體配置不再闡述請看前面的文章,配置完成之後記得執行hadoop namenode -format,否則hdfs啟動會報錯,啟動完成後不出問題瀏覽器訪問50070端口會看到hadoop的頁面。

想學習大數據或者想學習大數據的朋友,我整理了一套大數據的學習視頻免費分享給大家,從入門到實戰都有,大家可以加微信:Lxiao_28獲取,還可以入微信群交流!(備註領取資料,真實有效)。

四、安裝配置kafka

還是一樣,先下載kafka,然後配置:

wget http://mirrors.hust.edu.cn/apache/kafka/1.0.0/kafka_2.11-1.0.0.tgz
tar -zxvf kafka_2.11-1.0.0.tgz -C /usr/local
vim /etc/profile
export KAFKA_HOME=/usr/local/kafka_2.11-1.0.0
export PATH=$KAFKA_HOME/bin:$PATH
source /etc/profile

進入kafka的config目錄,配置server.properties,指定log.dirs和zookeeper.connect參數;配置zookeeper.properties文件中zookeeper的dataDir,配置完成後啟動kafka

kafka-server-start.sh -daemon $KAFKA_HOME/config/server.properties

可以用jps查看有沒有kafka進程,然後測試一下kafka是否能夠正常收發消息,開兩個終端,一個用來做producer發消息一個用來做consumer收消息,首先,先創建一個topic

kafka-topics.sh --create --zookeeper 127.0.0.1:2181 --replication-factor 1 --partitions 1 --topic testTopic
kafka-topics.sh --describe --zookeeper localhost:2181 --topic testTopic

如果不出一下會看到如下輸出:

Topic:testTopic	PartitionCount:1	ReplicationFactor:1	Configs:
Topic: testTopic	Partition: 0	Leader: 0	Replicas: 0	Isr: 0

然後在第一個終端中輸入命令:

kafka-console-producer.sh –broker-list localhost:9092 –topic testTopic

在第二個終端中輸入命令:

kafka-console-consumer.sh –zookeeper 127.0.0.1:2181 –topic testTopic

如果啟動都正常,那麽這兩個終端將進入阻塞監聽狀態,在第一個終端中輸入任何消息第二個終端都將會接收到。

五、安裝配置HBase

下載並解壓HBase:

wget http://mirrors.hust.edu.cn/apache/hbase/1.2.6/hbase-1.2.6-bin.tar.gz
tar -zxvf hbase-1.2.6-bin.tar.gz -C /usr/local/
vim /etc/profile
export HBASE_HOME=/usr/local/hbase-1.2.6
export PATH=$PATH:$HBASE_HOME/bin
source /etc/profile

修改hbase下的配置文件,首先修改hbase-env.sh,主要修改JAVA_HOME以及相關參數,這裏要說明一下HBASE_MANAGES_ZK這個參數,因為采用了自己的zookeeper,所以這裏設置為false,否則hbase會自己啟動一個zookeeper

cd /usr/local/hbase-1.2.6/conf
vim hbase-env.sh
export JAVA_HOME=/usr/local/java/jdk1.8.0_144/
HBASE_CLASSPATH=/usr/local/hbase-1.2.6/conf
export HBASE_MASTER_OPTS="$HBASE_MASTER_OPTS -XX:PermSize=256m -XX:MaxPermSize=1024m"
export HBASE_REGIONSERVER_OPTS="$HBASE_REGIONSERVER_OPTS -XX:PermSize=256m -XX:MaxPermSize=1024m"
export HBASE_PID_DIR=/opt/data/hbase
export HBASE_MANAGES_ZK=false

然後修改hbase-site.xml,我們設置hbase的文件放在hdfs中,所以要設置hdfs地址,其中tsk1是我安裝hadoop的機器的hostname,hbase.zookeeper.quorum參數是安裝zookeeper的地址,這裏的各種地址最好用機器名

vim hbase-site.xml
<configuration>
    <property>
        <name>hbase.rootdir</name>
        <value>hdfs://tsk1:9000/hbase</value>
    </property>
    <property>
        <name>hbase.master</name>
        <value>tsk1:60000</value>
    </property>
    <property>
        <name>hbase.master.port</name>
        <value>60000</value>
    </property>
    <property>
        <name>hbase.cluster.distributed</name>
        <value>true</value>
    </property>
    <property>
        <name>hbase.zookeeper.quorum</name>
        <value>192.168.70.135</value>
    </property>
    <property>
        <name>zookeeper.znode.parent</name>
        <value>/hbase</value>
    </property>
    <property>
        <name>hbase.zookeeper.property.dataDir</name>
        <value>/opt/data/zookeeper</value>
    </property>
    <property>
		<name>hbase.master.info.bindAddress</name>
        <value>tsk1</value>
    </property>
</configuration>

配置完成後啟動hbase,輸入命令:

start-hbase.sh

完成後查看日誌沒有報錯的話測試一下hbase,用hbase shell進行測試:

hbase shell
hbase(main):001:0>create ‘myTestTable‘,‘info‘
0 row(s) in 2.2460 seconds
=> Hbase::Table - myTestTable
hbase(main):003:0>list
TABLE                                                                                                                    
testTable                                                                                                                
1 row(s) in 0.1530 seconds

=> ["myTestTable"]

至此,hbase搭建成功,訪問以下hadoop的頁面,查看file system(菜單欄Utilities->Browse the file system),這時可以看見base的相關文件已經載hadoop的文件系統中。

技術分享圖片

六、安裝spark

下載spark並解壓

wget http://mirrors.hust.edu.cn/apache/spark/spark-2.2.1/spark-2.2.1-bin-hadoop2.7.tgz
tar -zxvf spark-2.2.1-bin-hadoop2.7.tgz -C /usr/local
vim /etc/profile
export SPARK_HOME=/usr/local/spark-2.2.1-bin-hadoop2.7
export PATH=$PATH:$SPARK_HOME/bin
source /etc/profile

七、測試

至此,環境基本搭建完成,以上搭建的環境僅是服務器生產環境的一部分,涉及服務器信息、具體調優信息以及集群的搭建就不寫在這裏了,下面我們寫一段代碼整體測試一下從kafka生產消息到spark streaming接收到,然後處理消息並寫入HBase。先寫一個HBase的連接類HBaseHelper:

public class HBaseHelper {
    private static HBaseHelper ME;
    private static Configuration config;
    private static Connection conn;
    private static HBaseAdmin admin;

    public static HBaseHelper getInstances() {
        if (null == ME) {
            ME = new HBaseHelper();
            config = HBaseConfiguration.create();
            config.set("hbase.rootdir", "hdfs://tsk1:9000/hbase");
            config.set("hbase.zookeeper.quorum", "tsk1");
            config.set("hbase.zookeeper.property.clientPort", "2181");
            config.set("hbase.defaults.for.version.skip", "true");
        }
        if (null == conn) {
            try {
                conn = ConnectionFactory.createConnection(config);
                admin = new HBaseAdmin(config);
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
        return ME;
    }

    public Table getTable(String tableName) {
        Table table = null;
        try {
            table = conn.getTable(TableName.valueOf(tableName));
        } catch (Exception ex) {
            ex.printStackTrace();
        }

        return table;
    }

    public void putAdd(String tableName, String rowKey, String cf, String column, Long value) {
        Table table = this.getTable(tableName);
        try {
            table.incrementColumnValue(rowKey.getBytes(), cf.getBytes(), column.getBytes(), value);
            System.out.println("OK!");
        } catch (IOException e) {
            e.printStackTrace();
        }

    }
 //......以下省略
}

再寫一個測試類KafkaRecHbase用來做spark-submit提交

package com.test.spark.spark_test;

import java.util.HashMap;
import java.util.Map;
import java.util.regex.Pattern;

import org.apache.log4j.Level;
import org.apache.log4j.Logger;
import org.apache.spark.SparkConf;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka.KafkaUtils;

import scala.Tuple2;

public class KafkaRecHbase {
    private static final Pattern SPACE = Pattern.compile(" ");

    public static void main(String[] args) throws Exception {
        Logger.getLogger("org").setLevel(Level.ERROR);
        SparkConf sparkConf = new SparkConf();
        sparkConf.setAppName("kafkaRecHbase");
        sparkConf.setMaster("local[2]");
        JavaStreamingContext ssc = new JavaStreamingContext(sparkConf, Durations.seconds(5));
        int numThreads = Integer.parseInt(args[3]);
        Map<String, Integer> topicMap = new HashMap<>();
        String[] topics = args[2].split(",");
        for (String topic : topics) {
            topicMap.put(topic, numThreads);
        }
        JavaPairReceiverInputDStream<String, String> kafkaStream =
                KafkaUtils.createStream(ssc, args[0], args[1], topicMap);
        JavaDStream<String> lines = kafkaStream.map(Tuple2::_2);
        JavaDStream<String> lineStr = lines.map(line -> {
            if (null == line || line.equals("")) {
                return "";
            }
            String[] strs = SPACE.split(line);
            if (strs.length < 1) {
                return "";
            }
            try {
                for (String str : strs) {
                    HBaseHelper.getInstances().putAdd("myTestTable", str, "info", "wordCunts", 1l);
                }
                return "strs:" + line;
            } catch (Exception ex) {
                System.out.println(line);
                return "報錯了:" + ex.getMessage();
            }
        });
        lineStr.print();
        ssc.start();
        System.out.println("spark 啟動!!!");
        ssc.awaitTermination();
    }
}

編譯提交到服務器,執行命令:

spark-submit --jars $(echo /usr/local/hbase-1.2.6/lib/*.jar | tr ‘ ‘ ‘,‘) --class com.test.spark.spark_test.KafkaRecHbase --packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.2.1 /opt/FileTemp/streaming/spark-test-0.1.1.jar tsk1:2181 test testTopic 1

沒報錯的話執行kafka的producer,輸入幾行數據在HBase內就能看到結果了!

八、裝一個Flume實時采集Nginx日誌寫入Kafka

Flume是一個用來日誌采集的框架,安裝和配置都比較簡單,可以支持多個數據源和輸出,具體可以參考Flume的文檔,寫的比較全 傳送門

下載Flume並配置環境

wget http://mirrors.hust.edu.cn/apache/flume/1.8.0/apache-flume-1.8.0-bin.tar.gz
tar -zxvf apache-flume-1.8.0-bin.tar.gz -C /usr/local
vim /etc/profile
export FLUME_HOME=/usr/local/apache-flume-1.8.0-bin/
export PATH=$FLUME_HOME/bin:$PATH
source /etc/profile

寫一個Flume的配置文件在flume的conf目錄下:

vim nginxStreamingKafka.conf
agent1.sources=r1
agent1.channels=logger-channel
agent1.sinks=kafka-sink

agent1.sources.r1.type=exec
agent1.sources.r1.deserializer.outputCharset= UTF-8
agent1.sources.r1.command=tail -F /opt/data/nginxLog/nginxLog.log

agent1.channels.logger-channel.type=memory

agent1.sinks.kafka-sink.type=org.apache.flume.sink.kafka.KafkaSink
agent1.sinks.kafka-sink.topic = flumeKafka
agent1.sinks.kafka-sink.brokerList = tsk1:9092
agent1.sinks.kafka-sink.requiredAcks = 1
agent1.sinks.kafka-sink.batchSize = 20

agent1.sources.r1.channels=logger-channel
agent1.sinks.kafka-sink.channel=logger-channel

kafka創建一個名為flumeKafka的topic用來接收,然後啟動flume:

flume-ng agent --name agent1 --conf $FLUME_HOME/conf --conf-file $FLUME_HOME/conf/nginxStreamingKafka.conf -Dflume.root.logger=INFO,console

如果沒有報錯,Flume將開始采集opt/data/nginxLog/nginxLog.log中產生的日誌並實時推送給kafka,再按照上面方法寫一個spark streaming的處理類進行相應的處理就好。

基於Hadoop生態SparkStreaming的大數據實時流處理平臺的搭建