1. 程式人生 > >基於Spark的公安大資料實時運維技術實踐

基於Spark的公安大資料實時運維技術實踐

宣告:本文為《程式設計師》原創文章,未經允許不得轉載,更多精彩文章請訂閱2017年《程式設計師》。 
作者:秦海龍,杭州以數科技有限公司大資料工程師。Java及Scala語言,Hadoop生態、Spark大資料處理技術愛好者。 
責編:郭芮,關注大資料領域,尋求報道或投稿請聯絡[email protected]

公安行業存在數以萬計的前後端裝置,前端裝置包括相機、檢測器及感應器,後端裝置包括各級中心機房中的伺服器、應用伺服器、網路裝置及機房動力系統,數量巨大、種類繁多的裝置給公安內部運維管理帶來了巨大挑戰。傳統通過ICMP/SNMP、Trap/Syslog等工具對裝置進行診斷分析的方式已不能滿足實際要求,由於公安內部運維管理的特殊性,現行通過ELK等架構的方式同樣也滿足不了需要。為尋求合理的方案,我們將目光轉向開源架構,構建了一套適用於公安行業的實時運維管理平臺。

實時運維平臺整體架構

  • 資料採集層:Logstash+Flume,負責在不同場景下收集、過濾各類前後端硬體裝置輸出的Snmp Trap、Syslog日誌資訊以及應用伺服器自身產生的系統和業務日誌;
  • 資料傳輸層:採用高吞吐的分散式訊息佇列Kafka叢集,保證匯聚的日誌、訊息的可靠傳輸;
  • 資料處理層:由Spark實時Pull Kafka資料,通過Spark Streaming以及RDD操作進行資料流的處理以及邏輯分析;
  • 資料儲存層:實時資料存入MySQL中便於實時的業務應用和展示;全量資料存入ES以及HBase中便於後續的檢索分析;
  • 業務服務層:基於儲存層,後續的整體業務應用涵蓋了APM、網路監控、拓撲、告警、工單、CMDB等。

整體系統涉及的主要開源框架情況如下:

另外,整體環境基於JDK 8以及Scala 2.10.4。公安系統裝置種類繁多,接下來將以交換機Syslog日誌為例,詳細介紹日誌處理分析的整體流程。

圖1  公安實時運維平臺整體架構
圖1 公安實時運維平臺整體架構

Flume+Logstash日誌收集

Flume是Cloudera貢獻的一個分散式、可靠及高可用的海量日誌採集系統,支援定製各類Source(資料來源)用於資料收集,同時提供對資料的簡單處理以及通過快取寫入Sink(資料接收端)的能力。

Flume中,Source、Channel及Sink的配置如下:

# 為該Flume Agent的source、channel以及sink命名
a1.sources = r1 a1.sinks = k1 a1.channels = c1 # 配置Syslog源 a1.sources.r1.type = syslogtcp a1.sources.r1.port = 5140 a1.sources.r1.host = localhost # Kafka Sink的相關配置 a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink a1.sinks.k1.topic = syslog-kafka a1.sinks.k1.brokerList = gtcluster-slave01:9092 a1.sinks.k1.requiredAcks = 1 a1.sinks.k1.batchSize = 20 a1.sinks.k1.channel = c1 # Channel基於記憶體作為快取 a1.channels.c1.type = memory a1.channels.c1.capacity = 1000 a1.channels.c1.transactionCapacity = 100 # 將Source以及Sink繫結至Channel a1.sources.r1.channels = c1 a1.sinks.k1.channel = c1

該配置通過syslog source配置localhost tcp 5140埠來接收網路裝置傳送的Syslog資訊,event快取在記憶體中,再通過KafkaSink將日誌傳送到kafka叢集中名為“syslog-kafka”的topic中。

Logstash來自Elastic公司,專為收集、分析和傳輸各類日誌、事件以及非結構化的資料所設計。它有三個主要功能:事件輸入(Input)、事件過濾器(Filter)以及事件輸出(Output),在後綴為.conf的配置檔案中設定,本例中Syslog配置如下:

# syslog.conf
input {
    Syslog {
        port => "514"
    }
}
filter {
}
output {
    kafka {
        bootstrap_servers => "192.168.8.65:9092"
        topic_id => "syslog-kafka"
        compression_type => "snappy"
        codec => plain {
           format => "%{host} %{@timestamp} %{message}"
        }
    }
}

Input(輸入)外掛用於指定各種資料來源,本例中的Logstash通過udp 514埠接收Syslog資訊;

Filter(過濾器)外掛雖然在本例中不需要配置,但它的功能非常強大,可以進行復雜的邏輯處理,包括正則表示式處理、編解碼、k/v切分以及各種數值、時間等資料處理,具體可根據實際場景設定;

Output(輸出)外掛用於將處理後的事件資料傳送到指定目的地,指定了Kafka的位置、topic以及壓縮型別。在最後的Codec編碼外掛中,指定來源主機的IP地址(host)、Logstash處理的時間戳(@timestamp)作為字首並整合原始的事件訊息(message),方便在事件傳輸過程中判斷Syslog資訊來源。單條原始Syslog資訊流樣例如下:

147>12164: Oct  9 18:04:10.735: %LINK-3-UPDOWN: Interface GigabitEthernet0/16, changed state to down

Logstash Output外掛處理後的資訊流變成為:

19.1.1.12 2016-10-13T10:04:54.520Z <147>12164: Oct  9 18:04:10.735: %LINK-3-UPDOWN: Interface GigabitEthernet0/16, changed state to down

其中紅色欄位就是codec編碼外掛植入的host以及timestamp資訊。處理後的Syslog資訊會發送至Kafka叢集中進行訊息的快取。

Kafka日誌緩衝

Kafka是一個高吞吐的分散式訊息佇列,也是一個訂閱/釋出系統。Kafka叢集中每個節點都有一個被稱為broker的例項,負責快取資料。Kafka有兩類客戶端,Producer(訊息生產者的)和Consumer(訊息消費者)。Kafka中不同業務系統的訊息可通過topic進行區分,每個訊息都會被分割槽,用以分擔訊息讀寫負載,每個分割槽又可以有多個副本來防止資料丟失。消費者在具體消費某個topic訊息時,指定起始偏移量。Kafka通過Zero-Copy、Exactly Once等技術語義保證了訊息傳輸的實時、高效、可靠以及容錯性。

Kafka叢集中某個broker的配置檔案server.properties的部分配置如下:

########## Server Basics ###########
# 為每一個broker設定獨立的數字作為id
broker.id=1

###### Socket Server Settings ######
# socket監聽埠
port=9092

########### Zookeeper ##############
# Zookeeper的連線配置
zookeeper.connect=gtcluster-slave02:2181,gtcluster-slave03:2181,gtcluster-slave04:2181

# Timeout in ms for connecting to zookeeper
zookeeper.connection.timeout.ms=3000

其中需指定叢集裡不同broker的id,此臺broker的id為1,預設監聽9092埠,然後配置Zookeeper(後續簡稱zk)叢集,再啟動broker即可。

Kafka叢集名為syslog-kafka的topic:

bin/kafka-topics.sh
--create
--zookeeper gtcluster-slave02:2181,gtcluster-slave03:2181,gtcluster-slave04:2181
--replication-factor 3 --partitions 3
--topic syslog-kafka

Kafka叢集的topic以及partition等資訊也可以通過登入zk來觀察。然後再通過下列命令檢視Kafka接收到的所有交換機日誌資訊:

bin/kafka-console-consumer.sh--zookeeper gtcluster-slave02:2181--from-beginning
--topic Syslog-kafka

部分日誌樣例如下:

10.1.1.10 2016-10-18T05:23:04.015Z <163>5585: Oct 18 13:22:45: %LINK-3-UPDOWN: Interface FastEthernet0/9, changed state to down
19.1.1.113 2016-10-18T05:24:04.425Z <149>10857: Oct 18 13:25:23.019 cmt: %LINEPROTO-5-UPDOWN: Line protocol on Interface GigabitEthernet1/0/3, changed state to down
19.1.1.113 2016-10-18T05:24:08.312Z <149>10860: Oct 18 13:25:27.935 cmt: %LINEPROTO-5-UPDOWN: Line protocol on Interface GigabitEthernet1/0/3, changed state to up

Spark日誌處理邏輯

Spark是一個為大規模資料處理而生的快速、通用的引擎,在速度、效率及通用性上表現極為優異。

在Spark主程式中,通過Scala的正則表示式解析Kafka Source中名為“syslog-kafka” 的topic中的所有Syslog資訊,再將解析後的有效欄位封裝為結果物件,最後通過MyBatis近實時地寫入MySQL中,供前端應用進行實時地視覺化展示。另外,全量資料儲存進入HBase及ES中,為後續海量日誌的檢索分析及其它更高階的應用提供支援。主程式示例程式碼如下:

object SwSyslogProcessor {
    def main(args: Array[String]): Unit = {
        // 初始化SparkContext,批處理間隔5秒
        val sparkConf: SparkConf = new SparkConf().setAppName("SwSyslogProcessorApp ")
            .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
        val ssc = new StreamingContext(sparkConf, Seconds(5))
        // 定義topic
        val topic = Set("syslog-kafka")
        // 定義kafka的broker list地址
        val brokers = "192.168.8.65:9092,192.168.8.66:9092,192.168.8.67:9092"
        val kafkaParams = Map[String, String]("metadata.broker.list" -> brokers, "serializer.class" -> "kafka.serializer.StringDecoder")
        // 通過topic以及brokers,建立從kafka獲取的資料流
        val swSyslogDstream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
            ssc, kafkaParams, topic)
        val totalcounts = ssc.sparkContext.accumulator(0L, "Total count")
        val lines = swSyslogDstream.map(x => x._2)
        // 將一行一行資料對映成SwSyslog物件
        lines.filter(x => !x.isEmpty
            && x.contains("%LIN")
&& x.contains("Ethernet")
            .map(
                x => {
                    SwSyslogService.encapsulateSwSyslog(x) // 封裝並返回SwSyslog
                }).foreachRDD((s: RDD[SwSyslog], time: Time) => {
            // 遍歷DStream中的RDD
            if (!s.isEmpty()) {
                // 遍歷RDD中的分割槽記錄
                s.foreachPartition {
                    records => {
                        if (!records.isEmpty) records.toSet.foreach {
                            r: SwSyslog => // 統計當前處理的記錄總數
                                totalcounts.add(1L) // 儲存SwSyslog資訊到MySQL
                                SwSyslogService.saveSwSyslog(r)
                        }
                    }
                }
            }
        }
        )

        //啟動程式
        ssc.start()
        // 阻塞等待
        ssc.awaitTermination()
 }

整體的處理分析主要分為4步:

  1. 初始化SparkContext並指定Application的引數;
  2. 建立基於Kafka topic “syslog-kafka” 的DirectStream;
  3. 將獲取的每一行資料對映為Syslog物件,呼叫Service進行物件封裝並返回;
  4. 遍歷RDD,記錄不為空時儲存或者更新Syslog資訊到MySQL中。

Syslog POJO的部分基本屬性如下:

@Table(name = "sw_syslog")
public class SwSyslog {
    /**
     * 日誌ID
     */
    @Id
    @GeneratedValue(strategy = GenerationType.IDENTITY)
    private Long id;

    /**
     * 裝置IP
     */
    @Column(name = "dev_ip")
    private String devIp;

    /**
     * 伺服器時間
     */
    @Column(name = "server_time")
    private String serverTime;

    /**
     * 資訊序號
     */
    @Column(name = "syslog_num")
    private Long syslogNum;

    ……
}

SwSyslog實體中的基本屬性對應Syslog中的介面資訊,註解中的name對應MySQL中的表sw_syslog 以及各個欄位,MyBatis完成成員屬性和資料庫結構的ORM(物件關係對映)。

程式中的SwSyslogService有兩個主要功能:

public static SwSyslog encapsulateSwSyslog(String syslogInfo) {
    SwSyslog swsyslog = new SwSyslog();
    swsyslog.setDevIp(SwSyslogExtractorUtil.extractDevIp(syslogInfo));
    swsyslog.setServerTime(SwSyslogExtractorUtil.extractServerTime(syslogInfo));
    swsyslog.setSyslogNum(SwSyslogExtractorUtil.extractSyslogNum(syslogInfo));
    swsyslog.setDevTime(SwSyslogExtractorUtil.extractDevTime(syslogInfo));
    swsyslog.setSyslogType(SwSyslogExtractorUtil.extractSyslogType(syslogInfo));
    swsyslog.setInfoType(SwSyslogExtractorUtil.extractInfoType(syslogInfo));
    swsyslog.setDevInterface(SwSyslogExtractorUtil
                                .extractDevInterface(syslogInfo));
    swsyslog.setInterfaceState(SwSyslogExtractorUtil
                                .extractInterfaceState(syslogInfo));
    return swsyslog;
}

public static void saveSwSyslog(SwSyslog swSyslog) {
    LOGGER.debug("開始儲存或更新SwSyslog", swSyslog);
    // 根據ip查詢所有Syslog
    List<swsyslog> list = swSyslogMapper.queryAllByIp(swSyslog.getDevIp());
    //  如果list非空,即查到對應IP的SwSyslog
    if (list != null && !list.isEmpty()) {
        for (SwSyslog sys : list) {
            // 若IP的介面相同,則更新資訊
            if (sys.getDevInterface().equals(swSyslog.getDevInterface())) {
                LOGGER.debug("有相同IP相同介面的記錄,開始更新SwSyslog");
                sys.setServerTime(swSyslog.getServerTime());
                sys.setSyslogNum(swSyslog.getSyslogNum());
                sys.setDevTime(swSyslog.getDevTime());
                sys.setSyslogType(swSyslog.getSyslogType());
                sys.setInfoType(swSyslog.getInfoType());
                sys.setInterfaceState(swSyslog.getInterfaceState());
                sys.setUpdated(new Date());
                swSyslogMapper.update(sys);
            } else {
                // 若介面不同,則直接儲存
                LOGGER.debug("相同IP無對應介面,儲存SwSyslog");
                swSyslog.setCreated(new Date());
                swSyslog.setUpdated(swSyslog.getCreated());
                swSyslogMapper.insert(swSyslog);
            }
        }
    } else {
        // 沒有對應的IP記錄,直接儲存資訊
        LOGGER.debug("沒有相同IP記錄,直接儲存SwSyslog");
        swSyslog.setCreated(new Date());
        swSyslog.setUpdated(swSyslog.getCreated());
        swSyslogMapper.insert(swSyslog);
    }
}</swsyslog>

encapsulateSwSyslog()將Spark處理後的每一行Syslog通過Scala的正則表示式解析為不同的欄位,然後封裝並返回Syslog物件;遍歷RDD分割槽生成的每一個Syslog物件中都有ip以及介面資訊,saveSwSyslog()會據此判斷該插入還是更新Syslog資訊至資料庫。另外,封裝好的Syslog物件通過ORM工具MyBatis與MySQL進行互操作。

獲取到的每一行Syslog資訊如之前所述:

19.1.1.12 2016-10-13T10:04:54.520Z <147>12164: Oct  9 18:04:10.735: %LINK-3-UPDOWN: Interface GigabitEthernet0/16, changed state to down

這段資訊需解析為裝置ip、伺服器時間、資訊序號、裝置時間、Syslog型別、屬性、裝置介面、介面狀態等欄位。Scala正則解析邏輯如下:

/**
      * 抽取伺服器時間
      * 樣例:2016-10-09T10:04:54.517Z
      * @param line
      * @return
      */
    def extractServerTime(line: String): String = {
        val regex1 = "20\\d{2}-\\d{2}-\\d{2}".r
        val regex2 = "\\d{2}:\\d{2}:\\d{2}.?(\\d{3})?".r
        val matchedDate = regex1.findFirstIn(line)
        val matchedTime = regex2.findFirstIn(line)
        val result1 = matchedDate match {
            case Some(date) => date
            case None => " "
        }
        val result2 = matchedTime match {
            case Some(time) => time
            case None => " "
        }
        result1 + " " + result2
}

/**
      * 抽取裝置時間
      * 樣例:Sep 29 09:33:06
      *       Oct  9 18:04:09.733
      * @param line
      * @return
      */
    def extractDevTime(line: String): String = {
        val regex = "[a-zA-Z]{3}\\s+\\d+\\s\\d{2}:\\d{2}:\\d{2}((.\\d{3})|())".r
        val matchedDevTime = regex.findFirstIn(line)
        val result = matchedDevTime match {
            case Some(devTime) => devTime
            case None => " "
        }
        result
    }

通過正則過濾、Syslog封裝以及MyBatis持久層對映,Syslog介面狀態資訊最終解析如下:

最後,諸如APM、網路監控或者告警等業務應用便可以基於MySQL做視覺化展示。

總結

本文首先對公安運維管理現狀做了簡要介紹,然後介紹公安實時運維平臺的整體架構,再以交換機Syslog資訊為例,詳細介紹如何使用Flume+Logstash+Kafka+Spark Streaming進行實時日誌處理分析,對處理過程中大量的技術細節進行了描述並通過程式碼詳細地介紹整體處理步驟。本文中的示例實時地將資料寫入MySQL存在一定的效能瓶頸,後期會對包含本例的相關程式碼重構,資料將會實時寫入HBase來提高效能。

相關推薦

基於Spark公安資料時運技術實踐

宣告:本文為《程式設計師》原創文章,未經允許不得轉載,更多精彩文章請訂閱2017年《程式設計師》。  作者:秦海龍,杭州以數科技有限公司大資料工程師。Java及Scala語言,Hadoop生態、Spark大資料處理技術愛好者。  責編:郭芮,關注大資料領域,尋求報道或投稿

超越Spark資料叢集計算的生產實踐(內含福利)

Spark擁有一個龐大的、不斷增長的社群,還有在企業環境中不可或缺的生態系統。這些生態系統提供了不同生產環境案例所需的許多功能。一般來說,Spark應用做的是機器學習演算法、日誌聚合分析或者商務智慧相關的運算,因為它在許多領域都有廣泛的應用,包括商務智慧、資料倉庫、推薦系

Linux下基於Hadoop的資料環境搭建步驟詳解(Hadoop,Hive,Zookeeper,Kafka,Flume,Hbase,Spark等安裝與配置)

Linux下基於Hadoop的大資料環境搭建步驟詳解(Hadoop,Hive,Zookeeper,Kafka,Flume,Hbase,Spark等安裝與配置) 系統說明 搭建步驟詳述 一、節點基礎配置 二、H

零基礎怎麼學spark資料開發學習

隨著大資料時代的到來。各種技術衍生,市場工作崗位的需求越亦增長。今天科多大資料挑選大資料裡高頻技術詞彙“spark”跟大家分享。 spark 如何入手? 概述 Apache Spark是一個快速和通用的叢集計算系統。它提供Java,scala,Python、R語言的APIs,以及支援一

消防隱患同比下降約60%,基於物聯網資料的智慧消防水系統遠端監測方案

近十年全國共發生高層建築火災3.1萬起,死亡474人,直接財產損失15.6億元。其中,特別重大火災3起、重大火災4起、較大火災24起,形勢非常嚴峻。消防給水系統完善與否直接影響火災撲救的效果,據火災統計,在撲救成功的火災案例中,93%的火場消防給水條件較好,水量、水壓有保障;而在撲救失利的火災案例中,81.5

跟我一起學Spark之——《Spark快速資料分析》pdf版下載

連結:https://pan.baidu.com/s/1vjQCJLyiXzIj6gnCCDyv3g  提取碼:ib01 國慶第四天,去逛了半天的王府井書店,五層出電梯右邊最裡面,倒數第三排《資料結構》,找到了一本很不錯的書《Spark快速大資料分析》,試讀了下,我很喜歡,也很適合

基於Python的資料的分頁模型程式碼

最近在寫一個cmdb系統的分頁,儘管Django本身有分頁的模組兒,但是還是想實現一下自己心中想的分頁的一種邏輯 因為,在我們工作當中,當我們的資料量超級大的時候,其實我們每次分頁查詢都不必將所有的資料查詢出來,而是可以按階段的查詢,舉個例子 每次查詢5頁資料,當需要第六頁的時候,再次進行載入,為了更加明

基於ODPS的資料應用源端感知

大資料雲平臺如ODPS是離線計算平臺,而源端的oracle資料庫,mysql資料庫等都是實時變化著的,一旦源端表結構發生變化,而云平臺又未及時獲知,對後續的應用業務,OGG,流計算等都會造成不小的麻煩,時間越長需要補做的資料就越多,甚至需要重做上雲 本文用shell編寫指令碼來對比源端和雲平

spark快速資料分析學習筆記(1)

本文是《spark快速大資料分析學習》第三章學習筆記,文中大量摘抄書中原本,僅為個人學習筆記。 RDD基礎: RDD是一個不可變的分散式物件集合。每個RDD都被分為多個分割槽,這個分割槽執行在叢集的不同節點上。RDD可以包含Python、Java、Scala中任意型別的物件。 建立RDD的方式:

Spark SQL資料處理並寫入Elasticsearch

1 # coding: utf-8 2 import sys 3 import os 4 5 pre_current_dir = os.path.dirname(os.getcwd()) 6 sys.path.append(pre_current_dir) 7 from pyspark.sq

基礎資料、三沙盤及相關產品

-基礎大資料 2018電子地圖/arcgis shp mxd格式地圖/Mapinfo tab Gst電子地圖 2018百度資訊點資料|2018高德資訊點資料|2018版本谷歌poi資料 2018版鄉鎮|街道辦 shp|tab|cad|supermap|jpg行政邊界 建築物電子地圖/1:20

資料安全】基於Kerberos的資料安全驗證方案

1.背景 網際網路從來就不是一個安全的地方。很多時候我們過分依賴防火牆來解決安全的問題,不幸的是,防火牆是假設“壞人”是來自外部的,而真正具有破壞性的攻擊事件都是往往都是來自於內部的。 近幾年,在thehackernews等網站上總會時不時看到可以看到一些因為資料安全問題被大面積攻擊、勒索的事件。在Hadoo

Spark快速資料分析(一)

楔子 Spark快速大資料分析 前3章內容,僅作為學習,有斷章取義的嫌疑。如有問題參考原書 Spark快速大資料分析 以下為了打字方便,可能不是在注意大小寫 1 Spark資料分析導論 1.1 Spark是什麼 Spark是一個用來實現快速而通用的叢

Spark快速資料分析——機器學習

楔子 《Spark快速大資料分析》學習 11 基於MLlib的機器學習 ​ MLlib是Saprk中提供機器學習函式的庫。它是專門在叢集上並行的情況而設計的。MLlib中包含許多機器學習演算法,可以在Spark支援的所有程式語言中使用。 11.1 概述

spark處理資料的幾個例項介紹

在叢集中跑應用,而不是在shell中感受寫spark應用的過程 整個過程為:1、案例分析:要用哪些spark的RDD的API2、程式設計實現: 用到scala3、提交到叢集執行:如何提交到叢集,檔案是否先傳到HDFS上4、監控執行結果: 通過web可以看到 介紹了四個案例:

Apache Spark資料處理統一引擎

工業和研究中資料的大幅增長為電腦科學帶來了巨大的機會與挑戰。由於資料大小超過了單臺機器的能力,使用者需要新的系統將計算擴充套件到多個節點。因此,針對不同計算工作負載的新叢集程式設計模型已呈爆炸式增長。 這些模型相對專業化。例如支援批處理的MapReduce,支援迭

基於Hadoop的資料平臺的整體架構介紹

Hadoop是開源的分散式儲存+分散式計算平臺的框架 大資料的熱度在持續的升溫,繼雲端計算之後大資料成為又一大眾所追捧的新星。我們暫不去討論大資料到底是否適用於您的組織,至少在網際網路上已經被吹噓成無所不能的超級戰艦。好像一夜之間我們就從網際網路時代跳躍進了大資料時

Spark快速資料分析》——讀書筆記(4)

第4章 鍵值對操作 鍵值對RDD通常用來進行聚合計算。我們一般要先通過一些初試ETL(抽取、轉化、裝載)操作來將資料轉化為鍵值對形式。 本章也會討論用來讓使用者控制鍵值對RDD在各節點上分佈情況的高階特性:分割槽。 4.1 動機 pair RDD(包

Spark快速資料分析》pdf格式下載電子書免費下載

內容簡介 本書由 Spark 開發者及核心成員共同打造,講解了網路大資料時代應運而生的、能高效迅捷地分析處理資料的工具——Spark,它帶領讀者快速掌握用 Spark 收集、計算、簡化和儲存海量資料的方法,學會互動、迭代和增量式分析,解決分割槽、資料本地化和

spark快速資料分析之讀書筆記-flatmap與map的區別

以前總是分不清楚spark中flatmap和map的區別,現在弄明白了,總結分享給大家,先看看flatmap和map的定義。 map()是將函式用於RDD中的每個元素,將返回值構成新的RDD。 flatmap()是將函式應用於RDD中的每個元素,將返回的迭代器的所有內