1. 程式人生 > >大資料流處理框架介紹

大資料流處理框架介紹

實時流處理簡單概述:實時是說整個流處理相應時間較短,流式技算是說資料是源源不斷的,沒有盡頭的。實時流處理一般是將業務系統產生的資料進行實時收集,交由流處理框架進行資料清洗,統計,入庫,並可以通過視覺化的方式對統計結果進行實時的展示。本文涉及到的框架或技術有 Flume,Logstash,kafka,Storm, SparkStreaming等。

實時流處理的的流程與技術選型 :

一、日誌收集

由於業務系統一般是遊離與流處理叢集如SparkStreaming、Storm之外的,所以我們需要對業務系統的資料進行實時收集。這就用到了日誌收集框架,日誌收集框架主要需要解決三個問題:資料從哪兒來,資料到哪兒去,實時收集。因為在流處理中為了防止突發或激增流量壓垮流處理叢集,通常將收集過後的資料輸出到kafka分散式訊息系統,然後流處理叢集去消費kafka中的資料,下面介紹兩種常用的日誌收集框架以及他們如何對接kafka.

1).Apache Flume

這是一個apache的頂級專案,所以他的域名為flume.apache.org, 下面是官網上的原理圖,Flume框架把每個收集任務都定義為一個Agent(這是一個JAVA程序),他有三個基本元件Source、Channel、Sink。

source:收集資料,可以對接各種常用資料來源,如檔案(exec source),kafka(kafka source),jms(java訊息系統)等。        channel:source元件把資料收集來以後,臨時存放在channel(管道)中,即channel元件在agent中是專門用來存放臨時資料的,並起到資料緩衝的作用。常用的channel有memory chanel 、jdbc chanel 、file channel 等等。

sink:sink元件是用於從channel中取資料並送到目的地的元件,目的地包括hdfs、logger、avro、thrift、file、hbase等。

其實flume的使用就是編寫配置檔案,下面是使用flume將Nginx的日誌對接kafka的配置檔案,我們將該收集任務命名為

exec-memory-kafka,只需如下編寫:

#配置source、sink、channel

exec-memory-kafka.sources = exec-source  #指定source (資料從哪兒來),可以指定多個數據源,用逗號分隔。 exec-memory-kafka.sinks = kafka-sink #指定sink(資料到哪兒去) exec-memory-kafka.channels = memory-channel #指定channel

#source詳細配置 exec-memory-kafka.sources.exec-source.type = exec  執行作業系統命令 exec-memory-kafka.sources.exec-source.command = sudo tail -F /var/log/nginx/access.log #監控Nginx日誌檔案 exec-memory-kafka.sources.exec-source.shell = /bin/sh -c  #shell命令的字首

#channel 詳細配置 exec-memory-kafka.channels.memory-channel.type = memory  #記憶體channel

#sink詳細配置 exec-memory-kafka.sinks.kafka-sink.type = org.apache.flume.sink.kafka.KafkaSink #型別 為kafka sink exec-memory-kafka.sinks.kafka-sink.brokerList = hadoop000:9092  #kafaka服務的地址,多個用逗號分隔 exec-memory-kafka.sinks.kafka-sink.topic = test1 #指定主題 exec-memory-kafka.sinks.kafka-sink.batchSize = 5  #指定每多少條收集一次,這裡是每5條傳送一次。 exec-memory-kafka.sinks.kafka-sink.requiredAcks = 1 #使kafka對是否收到資料進行確認,確保資料不會丟失

#為sink和source指定channel

exec-memory-kafka.sources.exec-source.channels = memory-channel

exec-memory-kafka.sinks.kafka-sink.channel = memory-channel

編寫好配置檔案後切換到flume的bin目錄下執行:

flume-ng agent --conf 配置檔案的目錄–conf-file 配置檔案的全路徑–name exec-memory-kafka -Dflume.root.logger=INFO,console

即可開啟收集任務(程序的方式)

2).ELK技術棧的Logstash

Logstash 是一個開源的資料收集引擎,它具有備實時資料傳輸能力。它可以統一過濾來自不同源的資料,並按照開發者的制定的規範輸出到目的地。Logstash使用時也是編寫配置檔案,下面是如何使用配置檔案的方式將Nginx日誌輸出到Kafka。

#定義資料來源

input{

#這裡是Nginx日誌檔案

file{

path =>"/var/log/nginx/access.log"    } }

#資料發到哪,這裡是kafka output{ kafka{     topic_id => “test1”  #指定topic     codec=>plain{     format=>"%{message}"  #輸出的格式,這裡表示只輸出訊息,不輸出其他資訊,如版本資訊等。 }     bootstrap_servers=>“hadoop000:9092”  #kafka服務的地址     batch_size=>1  #每幾條資料傳送一次 } }

切換到logstash的bin目錄,執行以下命令即可開始收集任務:

logstash -f 你的配置檔案的位置。

二、kafka

kafka是一個分散式的流處理平臺,在流處理中,我們通常使用他作為一個訊息系統來使用,他是一個分散式、支援分割槽的(partition)、多副本的(replica),基於zookeeper協調的分散式訊息系統。

kafka作為訊息系統時相比其他訊息中介軟體主要有4大優勢:

- 可擴充套件性:可以通過增加broker的方式水平擴充套件kafka叢集

- 永續性、可靠性:訊息被持久化到本地磁碟,並且支援資料備份防止資料丟失

- 容錯性:最大限度的容災,允許叢集中節點失敗,包括主節點,是高可用的。

- 高併發

幾個重要的角色:

Broker:Kafka節點,一個Kafka節點就是一個broker,多個broker可以組成一個Kafka叢集,一臺機器可以啟動多個broker在不同的埠上。

Topic:訊息系統中的主題,生產者和消費者共同關注的部分。

Partition:topic物理上的分組,一個topic可以分為多個partition,每個partition是一個有序的佇列

Segment:partition物理上由多個segment組成,每個Segment存著message資訊,以檔案的形式

Producer :生產者, 生產message傳送到topic

Consumer : 消費者,訂閱topic消費message, consumer作為一個執行緒來消費

具體的使用可參照官網,有詳細的介紹:

三、流處理框架

日誌資訊輸出到kafka後,需要使用流處理框架作為消費者去消費kafka中的資料,下面是Storm和Spark的基本原理及其如何使用。

1 .Storm

apache的頂級專案,官網是storm.apache.org ,他是一個免費的,開源的,分散式的實時計算系統。

Storm有很多用處:如實時計算分析,線上機器學習,分散式RPC即DRPC,作為ETL工具等,

Storm特點:處理速度快、可擴充套件 、容災與高可用的,能夠實現高頻資料和大規模資料的實時處理。

Storm中幾個核心的概念:

Topologies:拓撲,將整個流處理流程串起來,每個storm應用程式都需要定義Toplogies,由spout和bolt組成的。

Streams:訊息流,抽象概念,由沒有邊界的Tuple構成

Spouts:訊息流的源頭,Topology的訊息生產者。產生資料的元件,比如我們要對接kafka,我們就要定義一個kafka Spout

Bolts:訊息處理單元,可以做過濾、聚合、查詢/寫資料庫等操作。

Tuple:具體的資料,傳遞的基本單元。

Storm架構:

類似於Hadoop的架構,也是主從架構(Master/Slave),所有節點都是無狀態的,在他們上面的資訊(元資料)會儲存在zookeeper中

Nimbus: 叢集的主節點,負責任務(task)的指派和分發、資源的分配

Supervisor: 從節點,可以啟動多個Worker,可以通過配置來指定一個Topo執行在多個Worker之上,也可以通過配置來指定叢集的從節點(負責幹活的),Supervisor節點負責執行任務的具體部分,啟動和停止自己管理的Worker程序等,一個Supervisor預設啟動4個Worker程序

Worker: 執行具體元件邏輯(Spout/Bolt)的程序,這是一個程序,一個Work程序只為一個Topology服務。 Task: Worker中每一個Spout和Bolt的執行緒稱為一個Task,他是最終執行spout或者bolt程式碼的最小執行單元

executor:是一個被worker程序啟動的單獨執行緒,Spout和bolt和共享一個executor,而且一個executor可以執行多個Task。

下面是各個元件職責的示意圖:

編碼時幾個核心的角色:

1).  ISpout:核心介面(interface),負責將資料傳送到topology中去處理,Storm會跟蹤Spout發出去的tuple的,通過ack/fail機制,對Spout傳送成功或失敗時做處理,沒條資料即Tuple都有自己的message id,而且ack/fail/nextTuple是在同一個執行緒中執行的,所以不用考慮執行緒安全方面。

核心方法

open: 初始化操作

close: 資源釋放操作

nextTuple: 傳送資料    ack: tuple處理成功,storm會反饋給spout一個成功訊息 fail:tuple處理失敗,storm會發送一個訊息給spout,處理失敗         實現類: public abstract class BaseRichSpout extends BaseComponent implements IRichSpout { public interface IRichSpout extends ISpout, IComponent {}

我們定義Spout時只需要繼承BaseRichSpout這個類,並實現其中的方法即可。

2).IComponent介面 概述:public interface IComponent extends Serializable   他為topology中所有可能的元件提供公用的方法         如 void declareOutputFields(OutputFieldsDeclarer declarer); 此方法用於聲明當前Spout/Bolt傳送的tuple的名稱,使用OutputFieldsDeclarer配合使用 實現類: public abstract class BaseComponent implements IComponent

IBolt介面:

概述職責:接收tuple處理,並進行相應的處理(filter/join/…),IBolt會在一個執行的機器上建立,使用Java序列化它,然後提交到主節點(nimbus)上去執行,nimbus會啟動worker來反序列化,呼叫prepare方法,然後才開始處理tuple處理 方法: prepare:初始化 execute:處理一個tuple資料,tuple物件中包含了元資料資訊 cleanup:shutdown之前的資源清理操作 實現類: public abstract class BaseRichBolt extends BaseComponent implements IRichBolt { public interface IRichBolt extends IBolt, IComponent

RichShellBolt

我們定義Bolt時只需繼承BaseRichBolt並實現其中的方法即可。

以下是Storm對kafka的訊息進行實時列印的程式碼實現。Storm官網有許多對接主流框架的介紹,引入所需jar包,就可以使用寫好的KafkaSpout,而無需自己定義KafkaSpout類了。

org.apache.storm storm-kafka ${storm.version}  

public class StormKafkaTopology {

public static class LogBolt extends BaseRichBolt {

    private OutputCollector outputCollector;

    public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
        this.outputCollector = outputCollector;
    }

    public void execute(Tuple tuple) {
        try {
            byte[] bytes = tuple.getBinaryByField("bytes");
            String value = new String(bytes);
            System.out.println("value :" + value);
            this.outputCollector.ack(tuple);
        } catch (Exception e) {
            this.outputCollector.fail(tuple);
        }

    }
   //無後續bolt,無需宣告
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {

    }
}

public static void main(String[] args) {
    TopologyBuilder builder = new TopologyBuilder();
    //kafka的配置
    String topicName = "project_topic";
    BrokerHosts hosts = new ZkHosts("hadoop000:2181");
    SpoutConfig spoutConfig = new SpoutConfig(hosts, topicName, "/" + topicName, UUID.randomUUID().toString());
    //從上次收集的位置開始,而不是從頭開始
    spoutConfig.startOffsetTime=kafka.api.OffsetRequest.LatestTime();
    //建立kafkaSpout
    KafkaSpout kafkaSpout = new KafkaSpout(spoutConfig);
    builder.setSpout("KafkaSpout", kafkaSpout);
    //設定Bolt
    builder.setBolt("LogBolt", new LogBolt()).shuffleGrouping("KafkaSpout");
    //本地執行Storm任務
    LocalCluster cluster = new LocalCluster();
    cluster.submitTopology("StormKafkaTopology", new Config(), builder.createTopology());
}

}} 2.SparkStreaming

官網上的介紹如下:

Spark Streaming is an extension of the core Spark API that enables scalable, high-throughput, fault-tolerant stream processing of live data streams. Data can be ingested from many sources like Kafka, Flume, Kinesis, or TCP sockets, and can be processed using complex algorithms expressed with high-level functions like map, reduce, join and window. Finally, processed data can be pushed out to filesystems, databases, and live dashboards. In fact, you can apply Spark’s machine learning and graph processing algorithms on data streams.

即:Spark Streaming 是Spark核心API的一個擴充套件,可以實現高吞吐量的、具備容錯機制的實時流資料的處理。支援從多種資料來源獲取資料,包括Kafk、Flume、Twitter、ZeroMQ、Kinesis 以及TCP sockets,從資料來源獲取資料之後,可以使用諸如map、reduce、join和window等高階函式進行復雜演算法的處理。最後還可以將處理結果儲存到檔案系統,資料庫和現場儀表盤。在“One Stack rule them all”的基礎上,還可以使用Spark的其他子框架,如叢集學習、圖計算等,對流資料進行處理。

Spark嚴格意義上來說並不能算實時流處理,他粗粒度的工作原理為:將實時接收的資料,根據一定的時間間隔拆成一批批的資料,具體來說是一批批RDD(分散式彈性資料集,Spark中的核心概念),然後通過SparkEngine來處理這些資料,可能是一些transformation和action操作,最後得到一批批的處理結果。

Strom和SparkStreaming的對比:

1).Strom是真正意義上的的流處理,時延相比SparkStreaming較低,而SparkStremming是將接受的實時流資料,按照指定的時間間隔拆成一個個RDD,在每個RDD中以批處理的形式處理資料。本質上還是批處理。

2).Storm會通過messageId的方式全域性追蹤和記錄每一條記錄,並通過ack/fail機制確保每條資料至少被處理一次(也可能是多次),而SparkStream應用程式只需要批處理級別對記錄進行追蹤,他能保證每個批處理記錄僅僅被處理一次。

3).由於SparkStreming是執行在Spark平臺上的無需單獨安裝,可以和批處理SparkSql,機器學習等其他其框架結合起來使用。

下面使用scala語言將SparkStreming對接kafka並對圖書點選量進行實時統計的應用程式碼:將kafka中收集到的日誌進行清洗,並轉換成ClikcLog物件,並實時統計的結果轉化成BookClick物件並寫入Hbase,Nginx日誌結構如下:

192.168.126.1 - - [2017-12-02 19:20:28] “GET /books/1 HTTP/1.1” 200 2403 “-” “Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/62.0.3202.94 Safari/537.36” “-”

object BookCount { def main(args: Array[String]): Unit = { //以引數的形式執行SparkStreming應用程式 四個引數為zk地址 ,使用者組, 主題,執行緒數 if (args.length != 4) { System.err.println("Usage: KafkaReceiverWordCount ") }

val Array(zkQuorum, group, topics, numThreads) = args

val sparkConf = new SparkConf()
//構造StreamingContext
val ssc = new StreamingContext(sparkConf, Seconds(5))

val topicMap = topics.split(",").map((_, numThreads.toInt)).toMap

// Spark Streaming對接Kafka
val messages = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap)

val logs = messages.map(_._2)
// 192.168.126.1 - - [2017-12-02 19:20:28] "GET /books/1 HTTP/1.1" 200 2403 "-" "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/62.0.3202.94 Safari/537.36" "-"
//  0            1 2   3             4        5    6        7     8
val cleanData = logs.map(line => {
  val infos = line.split(" ")
  val url = infos(6)
  var bookId = 0
  val time = infos(3).substring(1) + " " + infos(4).substring(0, 7)
  if (url.startsWith("/books/")) {//只關注以books/開頭的請求
    bookId = url.split("/")(2).toInt
  }
  ClickLog(infos(0), TimeUtil.newTime(time), bookId, infos(8).toInt)
}).filter(clickLog => clickLog.bookId != 0)//為零表示不滿足要求,忽略。

//cleanData.print()

cleanData.map(x => {
  (x.time.substring(0, 8) + "_" + x.bookId, 1)
}).reduceByKey(_ + _).foreachRDD(rdd => {
  rdd.foreachPartition(record => {
    val list= new ListBuffer[BookClick]
    record.foreach(pair => {
      list.append(BookClick(pair._1,pair._2))
    })
    BookClickDao.put(list)
  })
})

ssc.start()
ssc.awaitTermination()

} } case class ClickLog(ip:String,time:String,bookId:Int,statusCode:Int) case class BookClick(day_id:String,click_count:Int) object BookClickDao { val tableName = “book_clickcount” val cf = “info” val colume = “click_count”

def put(list: ListBuffer[BookClick]): Unit = { val table = HbaseUtils.getInstance().getTable(tableName) for (ele <- list) { table.incrementColumnValue(Bytes.toBytes(ele.day_id), Bytes.toBytes(cf), Bytes.toBytes(colume), ele.click_count) } } def get(day_id: String): Long = { val table = HbaseUtils.getInstance().getTable(tableName) val get = new Get(Bytes.toBytes(day_id)) val value = table.get(get).getValue(cf.getBytes, colume.getBytes) if (value == null) 0l else Bytes.toLong(value) } } object TimeUtil { val YYYYMMDDHHMMSS_FORMAT = FastDateFormat.getInstance(“yyyy-MM-dd HH:mm:ss”) val TARGET_TIME = FastDateFormat.getInstance(“yyyyMMddHHmmss”)

def passTime(time: String)={ YYYYMMDDHHMMSS_FORMAT.parse(time) } def newTime(time:String)={ TARGET_TIME.format(passTime(time)) }

def main(args: Array[String]): Unit = { println(newTime(“2017-12-02 19:13:25”)) } }

因為流處理框架本身不具備儲存能力,最後需要將統計結果入庫,並可通過百度的Echart或者阿里的DataV等資料視覺化工具,定義sql和時間間隔,對統計結果進行實時的展示。