1. 程式人生 > >structuredstreaming消費kafka的資料實現wordcount

structuredstreaming消費kafka的資料實現wordcount

最近也是有很多同學問我,StructuredStreaming結合kafka的使用,我簡單的寫了一個wordcount的demo,後續會有更加具體和詳細的介紹,今天先來一個簡單的demo吧.程式碼測試過了,可以執行.

package spark

import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.SparkSession

/**
  * structredstreaming消費kafka的資料,實現exactly-once的語義;
  */
object StructuredStreaming {
  def main(args: Array[String]): Unit = {
    Logger.getLogger("org.apache.spark").setLevel(Level.INFO)
    Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.INFO)
    Logger.getLogger("org.apache.kafka.clients.consumer").setLevel(Level.ERROR)
    var query:org.apache.spark.sql.streaming.StreamingQuery = null
    val spark = SparkSession.builder.appName("StructuredStreaming").getOrCreate()
    val topic = "jason_1027"
    val broker = "master:9092,storm1:9092,storm2:9092"
    val df = spark
      .readStream
      .format("kafka")
      .option("kafka.bootstrap.servers",broker)
      .option("subscribe",topic)
      .option("startingOffsets", "latest")
      .option("max.poll.records", 10000)
      .load()
    import  spark.implicits._
    val word = df.selectExpr("CAST(value AS STRING)").as[String].flatMap(_.split(" "))
    val wordcount = word.groupBy("value").count()
    val q = wordcount
      .writeStream
      .outputMode("complete")
      .format("console")
      .start()
    q.awaitTermination()
  }
}

今天就先寫到這裡,有時間繼續更新.

如果有寫的不對的地方,歡迎大家指正,如果有什麼疑問,可以加QQ群:340297350,謝謝

相關推薦

structuredstreaming消費kafka資料實現wordcount

最近也是有很多同學問我,StructuredStreaming結合kafka的使用,我簡單的寫了一個wordcount的demo,後續會有更加具體和詳細的介紹,今天先來一個簡單的demo吧.程式碼測試過了,可以執行. package spark import org.ap

kafka+flink實現wordCount資料寫入mysql

    step1:搭建flink環境    step2:搭建kafka環境(配置系統變數$KAFKA_HOME)    step3:搭建zookeeper環境    step4:啟動zookeeper:進入zookeeper的bin目錄下輸入:zkServer.sh sta

Spark 消費Kafka資料

spark RDD消費的哦,不是spark streaming。 導maven包: 注意版本哦,要跟自己機器的一致 <!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka -->

Storm-Kafka模組常用介面分析及消費kafka資料例子

使用storm-kafka模組讀取kafka中的資料,按照以下兩步進行構建(我使用的版本是0.9.3) 1. 使用BrokerHosts介面來配置kafka broker host與partition的mapping資訊; 2. 使用KafkaConfig來配置一些與kaf

簡單Storm消費Kafka資料並存儲到redis例項(訂單資訊處理)

maven依賴 <dependencies> <dependency> <groupId>org.apache.storm</groupId> <artifactId&g

storm實時消費kafka資料

程式環境,在kafka建立名稱為data的topic,開啟消費者模式,準備輸入資料。 程式的pom.xml檔案 <dependencies> <dependency> <groupId>org.

storm消費kafka資料

http://blog.csdn.net/tonylee0329/article/details/43016385 使用storm-kafka模組讀取kafka中的資料,按照以下兩步進行構建(我使用的版本是0.9.3) 1. 使用BrokerHosts介面來

spark Streaming 直接消費Kafka資料,儲存到 HDFS 實戰程式設計實踐

最近在學習spark streaming 相關知識,現在總結一下 主要程式碼如下 def createStreamingContext():StreamingContext ={ val sparkConf = new SparkConf().setAppName("

python消費kafka資料批量插入到es

1、es的批量插入 這是為了方便後期配置的更改,把配置資訊放在logging.conf中 用elasticsearch來實現批量操作,先安裝依賴包,sudo pip install Elasticsearch2 from elasticsear

Spark Streaming消費Kafka Direct方式資料零丟失實現

一、概述 上次寫這篇文章文章的時候,Spark還是1.x,kafka還是0.8x版本,轉眼間spark到了2.x,kafka也到了2.x,儲存offset的方式也發生了改變,筆者根據上篇文章和網上文章,將offset儲存到Redis,既保證了併發也保證了資料不丟失,經過測試,有效。 二、

Kafka程式碼實現--from-beginning,讀取歷史未消費資料

Kafka實際環境有可能會出現Consumer全部宕機,雖然基於Kafka的高可用特性,消費者群組中的消費者可以實現再均衡,所有Consumer不處理資料的情況很少,但是還是有可能會出現,此時就要求Consumer重啟的時候能夠讀取在宕機期間Producer傳送的資料。基於消費者訂閱模式預設

sparkstreaming+kafka+redis+hbase消費kafka資料實現exactly-once的語義

最近在做實時流處理的一個專案,遇到N多問題,經過不斷的除錯,終於有點進展,記錄一下,防止後人遇到同樣的問題. 1,sparkstreaming消費kafka有兩種方法,這裡我就不介紹了,網上關於這方面的資料很多,我就簡單說一下兩者的區別吧, (1)基於receiver的方

Flink+kafka實現Wordcount實時計算

lis AS -c 安裝包 pos localhost 行動 private 配置信息 1. Flink Flink介紹: Flink 是一個針對流數據和批數據的分布式處理引擎。它主要是由 Java 代碼實現。目前主要還是依靠開源社區的貢獻而發展。對 Flink 而言,其所

kafka的receive方式實現WordCount,使用updateStateByKey函式,累加所有批次的wordCount

Spark Streaming的updateStateByKey可以把DStream中的資料按key做reduce操作,然後對各個批次的資料進行累加。注意 wordDstream.updateStateByKey[Int]每次傳遞給updateFunc函式兩個引數,其中, 1、第一個引數是某

SparkStreaming消費Kafka中的資料 使用zookeeper和MySQL儲存偏移量的兩種方式

Spark讀取Kafka資料的方式有兩種,一種是receiver方式,另一種是直連方式。今天分享的SparkStreaming消費Kafka中的資料儲存偏移量的兩種方式都是基於直連方式上的 話不多說 直接上程式碼 ! 第一種是使用zookeeper儲存偏移量 object Kafka

使用JavaAPI 實現操作消費Kafak資料,偶遇一坑!

一、檢查環境是否正常 檢視虛擬機器中的各個節點啟動是否正常,這一步很關鍵。產品上線前不可能直接拉到伺服器上測試,肯定在自己搭建的叢集中先行測試;通過kafka控制檯消費者是否可以消費資料;通過Java API 是否可以獲取到kafka的訊息。 二、示例程式碼! import java.u

簡單實現kafka資料寫入hbase

測試資料格式 19392963501,17816115082,2018-09-18 16:19:44,1431 19392963501,17816115082,2018-09-18 16:19:44,1431 14081946321,13094566759,2018-05-23

Spark Streaming消費Kafka資料進行統計

流處理平臺: 這裡是第四步的實現: Spark Streaming整合Kafka採用的是Receiver-based,另一種方式Direct Approach,稍作修改就行。 package spark import org.apache.spark.SparkConf impo

sparkstreaming同時消費多個topic的資料實現exactly-once的語義

import java.io.File import kafka.{PropertiesScalaUtils, RedisKeysListUtils} import kafka.streamingRedisHive.{dbIndex} import org.apache.kafka.clients.consu

使用Java程式碼實現實時消費kafka的訊息

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"   xsi:schemaLocation="http://maven.apache.org/P