structuredstreaming消費kafka的資料實現wordcount
最近也是有很多同學問我,StructuredStreaming結合kafka的使用,我簡單的寫了一個wordcount的demo,後續會有更加具體和詳細的介紹,今天先來一個簡單的demo吧.程式碼測試過了,可以執行.
package spark import org.apache.log4j.{Level, Logger} import org.apache.spark.sql.SparkSession /** * structredstreaming消費kafka的資料,實現exactly-once的語義; */ object StructuredStreaming { def main(args: Array[String]): Unit = { Logger.getLogger("org.apache.spark").setLevel(Level.INFO) Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.INFO) Logger.getLogger("org.apache.kafka.clients.consumer").setLevel(Level.ERROR) var query:org.apache.spark.sql.streaming.StreamingQuery = null val spark = SparkSession.builder.appName("StructuredStreaming").getOrCreate() val topic = "jason_1027" val broker = "master:9092,storm1:9092,storm2:9092" val df = spark .readStream .format("kafka") .option("kafka.bootstrap.servers",broker) .option("subscribe",topic) .option("startingOffsets", "latest") .option("max.poll.records", 10000) .load() import spark.implicits._ val word = df.selectExpr("CAST(value AS STRING)").as[String].flatMap(_.split(" ")) val wordcount = word.groupBy("value").count() val q = wordcount .writeStream .outputMode("complete") .format("console") .start() q.awaitTermination() } }
今天就先寫到這裡,有時間繼續更新.
如果有寫的不對的地方,歡迎大家指正,如果有什麼疑問,可以加QQ群:340297350,謝謝
相關推薦
structuredstreaming消費kafka的資料實現wordcount
最近也是有很多同學問我,StructuredStreaming結合kafka的使用,我簡單的寫了一個wordcount的demo,後續會有更加具體和詳細的介紹,今天先來一個簡單的demo吧.程式碼測試過了,可以執行. package spark import org.ap
kafka+flink實現wordCount及資料寫入mysql
step1:搭建flink環境 step2:搭建kafka環境(配置系統變數$KAFKA_HOME) step3:搭建zookeeper環境 step4:啟動zookeeper:進入zookeeper的bin目錄下輸入:zkServer.sh sta
Spark 消費Kafka資料
spark RDD消費的哦,不是spark streaming。 導maven包: 注意版本哦,要跟自己機器的一致 <!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka -->
Storm-Kafka模組常用介面分析及消費kafka資料例子
使用storm-kafka模組讀取kafka中的資料,按照以下兩步進行構建(我使用的版本是0.9.3) 1. 使用BrokerHosts介面來配置kafka broker host與partition的mapping資訊; 2. 使用KafkaConfig來配置一些與kaf
簡單Storm消費Kafka資料並存儲到redis例項(訂單資訊處理)
maven依賴 <dependencies> <dependency> <groupId>org.apache.storm</groupId> <artifactId&g
storm實時消費kafka資料
程式環境,在kafka建立名稱為data的topic,開啟消費者模式,準備輸入資料。 程式的pom.xml檔案 <dependencies> <dependency> <groupId>org.
storm消費kafka資料
http://blog.csdn.net/tonylee0329/article/details/43016385 使用storm-kafka模組讀取kafka中的資料,按照以下兩步進行構建(我使用的版本是0.9.3) 1. 使用BrokerHosts介面來
spark Streaming 直接消費Kafka資料,儲存到 HDFS 實戰程式設計實踐
最近在學習spark streaming 相關知識,現在總結一下 主要程式碼如下 def createStreamingContext():StreamingContext ={ val sparkConf = new SparkConf().setAppName("
python消費kafka資料批量插入到es
1、es的批量插入 這是為了方便後期配置的更改,把配置資訊放在logging.conf中 用elasticsearch來實現批量操作,先安裝依賴包,sudo pip install Elasticsearch2 from elasticsear
Spark Streaming消費Kafka Direct方式資料零丟失實現
一、概述 上次寫這篇文章文章的時候,Spark還是1.x,kafka還是0.8x版本,轉眼間spark到了2.x,kafka也到了2.x,儲存offset的方式也發生了改變,筆者根據上篇文章和網上文章,將offset儲存到Redis,既保證了併發也保證了資料不丟失,經過測試,有效。 二、
Kafka程式碼實現--from-beginning,讀取歷史未消費的資料
Kafka實際環境有可能會出現Consumer全部宕機,雖然基於Kafka的高可用特性,消費者群組中的消費者可以實現再均衡,所有Consumer不處理資料的情況很少,但是還是有可能會出現,此時就要求Consumer重啟的時候能夠讀取在宕機期間Producer傳送的資料。基於消費者訂閱模式預設
sparkstreaming+kafka+redis+hbase消費kafka的資料實現exactly-once的語義
最近在做實時流處理的一個專案,遇到N多問題,經過不斷的除錯,終於有點進展,記錄一下,防止後人遇到同樣的問題. 1,sparkstreaming消費kafka有兩種方法,這裡我就不介紹了,網上關於這方面的資料很多,我就簡單說一下兩者的區別吧, (1)基於receiver的方
Flink+kafka實現Wordcount實時計算
lis AS -c 安裝包 pos localhost 行動 private 配置信息 1. Flink Flink介紹: Flink 是一個針對流數據和批數據的分布式處理引擎。它主要是由 Java 代碼實現。目前主要還是依靠開源社區的貢獻而發展。對 Flink 而言,其所
kafka的receive方式實現WordCount,使用updateStateByKey函式,累加所有批次的wordCount
Spark Streaming的updateStateByKey可以把DStream中的資料按key做reduce操作,然後對各個批次的資料進行累加。注意 wordDstream.updateStateByKey[Int]每次傳遞給updateFunc函式兩個引數,其中, 1、第一個引數是某
SparkStreaming消費Kafka中的資料 使用zookeeper和MySQL儲存偏移量的兩種方式
Spark讀取Kafka資料的方式有兩種,一種是receiver方式,另一種是直連方式。今天分享的SparkStreaming消費Kafka中的資料儲存偏移量的兩種方式都是基於直連方式上的 話不多說 直接上程式碼 ! 第一種是使用zookeeper儲存偏移量 object Kafka
使用JavaAPI 實現操作消費Kafak資料,偶遇一坑!
一、檢查環境是否正常 檢視虛擬機器中的各個節點啟動是否正常,這一步很關鍵。產品上線前不可能直接拉到伺服器上測試,肯定在自己搭建的叢集中先行測試;通過kafka控制檯消費者是否可以消費資料;通過Java API 是否可以獲取到kafka的訊息。 二、示例程式碼! import java.u
簡單實現kafka資料寫入hbase
測試資料格式 19392963501,17816115082,2018-09-18 16:19:44,1431 19392963501,17816115082,2018-09-18 16:19:44,1431 14081946321,13094566759,2018-05-23
Spark Streaming消費Kafka的資料進行統計
流處理平臺: 這裡是第四步的實現: Spark Streaming整合Kafka採用的是Receiver-based,另一種方式Direct Approach,稍作修改就行。 package spark import org.apache.spark.SparkConf impo
sparkstreaming同時消費多個topic的資料實現exactly-once的語義
import java.io.File import kafka.{PropertiesScalaUtils, RedisKeysListUtils} import kafka.streamingRedisHive.{dbIndex} import org.apache.kafka.clients.consu
使用Java程式碼實現實時消費kafka的訊息
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/P