1. 程式人生 > >spark+kafka 小案例

spark+kafka 小案例

轉載自https://www.cnblogs.com/zhangXingSheng/p/6646879.html

(1)下載kafka的jar包

spark2.1 支援kafka0.8.2.1以上的jar,我是spark2.0.2,下載的kafka_2.11-0.10.2.0 (2)Consumer程式碼 複製程式碼
package com.sparkstreaming
import org.apache.spark.SparkConf
import org.apache.spark.streaming.Seconds
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.kafka.common.serialization.StringDeserializer
object SparkStreamKaflaWordCount { def main(args: Array[String]): Unit = { //建立streamingContext var conf=new SparkConf().setMaster("spark://192.168.177.120:7077") .setAppName("SparkStreamKaflaWordCount Demo"); var ssc=new StreamingContext(conf,Seconds(4)); //建立topic //var topic=Map{"test" -> 1} var topic=Array("
test"); //指定zookeeper //建立消費者組 var group="con-consumer-group" //消費者配置 val kafkaParam = Map( "bootstrap.servers" -> "192.168.177.120:9092,anotherhost:9092",//用於初始化連結到叢集的地址 "key.deserializer" -> classOf[StringDeserializer], "value.deserializer" -> classOf[StringDeserializer], //用於標識這個消費者屬於哪個消費團體 "group.id
" -> group, //如果沒有初始化偏移量或者當前的偏移量不存在任何伺服器上,可以使用這個配置屬性 //可以使用這個配置,latest自動重置偏移量為最新的偏移量 "auto.offset.reset" -> "latest", //如果是true,則這個消費者的偏移量會在後臺自動提交 "enable.auto.commit" -> (false: java.lang.Boolean) ); //建立DStream,返回接收到的輸入資料 var stream=KafkaUtils.createDirectStream[String,String](ssc, PreferConsistent,Subscribe[String,String](topic,kafkaParam)) //每一個stream都是一個ConsumerRecord stream.map(s =>(s.key(),s.value())).print(); ssc.start(); ssc.awaitTermination(); } }
複製程式碼 (3)啟動zk //我是已經配置好zookeeper的環境變量了, zoo1.cfg配置 複製程式碼
# The number of milliseconds of each tick
tickTime=2000
# The number of ticks that the initial
# synchronization phase can take
initLimit=10
# The number of ticks that can pass between
# sending a request and getting an acknowledgement
syncLimit=5
# the directory where the snapshot is stored.
dataDir=/home/zhangxs/datainfo/developmentData/zookeeper/zkdata1
# the port at which the clients will connect
clientPort=2181
server.1=zhangxs:2881:3881
複製程式碼 啟動zk服務
zkServer.sh start zoo1.cfg
(4)啟動kafka服務

【bin/kafka-server-start.sh config/server.properties】

複製程式碼
[[email protected] kafka_2.11]# bin/kafka-server-start.sh config/server.properties
[2017-03-25 18:42:03,153] INFO KafkaConfig values:
advertised.host.name = null
advertised.listeners = null
advertised.port = null
authorizer.class.name =
auto.create.topics.enable = true
auto.leader.rebalance.enable = true
background.threads = 10
broker.id = 0
broker.id.generation.enable = true
broker.rack = null
compression.type = producer
connections.max.idle.ms = 600000
controlled.shutdown.enable = true
controlled.shutdown.max.retries = 3
controlled.shutdown.retry.backoff.ms = 5000
controller.socket.timeout.ms = 30000
create.topic.policy.class.name = null
default.replication.factor = 1
delete.topic.enable = false
fetch.purgatory.purge.interval.requests = 1000
group.max.session.timeout.ms = 300000
group.min.session.timeout.ms = 6000
host.name =
inter.broker.listener.name = null
inter.broker.protocol.version = 0.10.2-IV0
leader.imbalance.check.interval.seconds = 300
複製程式碼 (5)(重新開啟一個終端)啟動生產者程序
[[email protected] kafka_2.11]# bin/kafka-console-producer.sh --broker-list 192.168.177.120:9092 --topic test
(6)將程式碼打成jar,jar名【streamkafkademo】,放到spark_home/jar/ 下面 (7)提交spark應用程式(消費者程式)
./spark-submit --class com.sparkstreaming.SparkStreamKaflaWordCount  /usr/local/development/spark-2.0/jars/streamkafkademo.jar 10
(8)在生產者終端上輸入資料
zhang xing sheng
(9)列印結果 按 Ctrl+C 複製程式碼 按 Ctrl+C 複製程式碼 遇到過的問題: (1)在使用eclipse編寫消費者程式時發現沒有KafkaUtils類。 這個jar是需要另下載的。然後build到你的工程裡就可以了 maven
  1. <dependency>
  2. <groupId>org.apache.spark</groupId>
  3. <artifactId>spark-streaming_2.11</artifactId>
  4. <version>2.1.0</version>
  5. </dependency>
jar下載
(2)在提交spark應用程式的時候,丟擲類找不到 複製程式碼
Exception in thread "main" java.lang.NoClassDefFoundError: org/apache/kafka/common/serialization/StringDeserializer
at com.sparkstreaming.SparkStreamKaflaWordCount$.main(SparkStreamKaflaWordCount.scala:25)
at com.sparkstreaming.SparkStreamKaflaWordCount.main(SparkStreamKaflaWordCount.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
------------------------------------------------------------------------ Exception
in thread "main" java.lang.NoClassDefFoundError: org/apache/spark/streaming/kafka010/KafkaUtils$ at com.sparkstreaming.SparkStreamKaflaWordCount$.main(SparkStreamKaflaWordCount.scala:33) at com.sparkstreaming.SparkStreamKaflaWordCount.main(SparkStreamKaflaWordCount.scala) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
複製程式碼 這個需要你將【spark-streaming-kafka-0-10_2.11-2.1.0】,【kafka-clients-0.10.2.0】這兩個jar新增到 spark_home/jar/路徑下就可以了。(這個只是我這個工程裡缺少的jar)

相關推薦

spark+kafka 案例

轉載自https://www.cnblogs.com/zhangXingSheng/p/6646879.html (1)下載kafka的jar包 spark2.1 支援kafka0.8.2.1以上的jar,我是spark2.0.2,下載的kafka_2.11-0.1

基於Kafka 入門案例-官網學習

首先Maven引入 <dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka</artifactId> <v

spark案例

題目:給定一組鍵值對("spark",2),("hadoop",6),("hadoop",4),("spark",6),鍵值 對的key表示圖書名稱,value表示某天圖書銷量,請計算每個鍵對應的平均值, 也就是計算每種圖書的每天平均銷量。 //對陣列執行paralleli

大資料Spark+Kafka實時資料分析案例

下面分析詳細分析下上述步驟: 應用程式將購物日誌傳送給Kafka,topic為”sex”,因為這裡只是統計購物男女生人數,所以只需要傳送購物日誌中性別屬性即可。這裡採用模擬的方式傳送購物日誌,即讀取購物日誌資料,每間隔相同的時間傳送給Kafka。 接著利用Spark Streaming從Kafka主題”s

flume+kafka+storm整合實現實時計算案例

    我們做資料分析的時候常常會遇到這樣兩個場景,一個是統計歷史資料,這個就是要分析歷史儲存的日誌。我們會使用hadoop,具體框架可以設計為:1.flume收集日誌;2.HDFS輸入路徑儲存日誌;3.MapReduce計算,將結果輸出到HDFS輸出路徑;4.hive+sq

spark streaming實戰之kafka讀取與儲存

本次小實戰主要介紹一下spark streaming如何讀取kafka資料涉及理論部分在這就不多說了,自己也剛入門先說下需求待處理日誌格式為ouMrq2r_aU1mtKRTmQclGo1UzY,3251210381,2018/11/29 13:46,上海,上海,210.2.2

Spark之SparkStreaming案例-kafka

sparkStreaming從kafka中拿取資料 完整程式碼 package com.chb.spark.streaming; import java.io.Serializable; import java.util.Arrays; impor

Hibernate 入門案例

java程序 div nocache org student 包含 target .so tell 前言: 學習學到現在終於要學習框架了,心裏有點小激動呢,也不知道自己能不能學好呢,只能按著一步一個腳印的走下去,好了廢話不多說。讓我們打開hibernate

javascrip基礎以及一個計算器的案例

defined 屬性和方法 操作 undefined logs 大小寫 數字 bst 全局 1.js的概述 js是一種腳本語言 js是可插入html頁面的編程代碼 js插入html頁面後,可由所有的瀏覽器執 2.js的基本語法 與java一樣,變量,函數名,運算符以及

JQuery基礎以及5個案例

進行 change 遍歷 selectall 獲得 idea 獲取 設置 html 1.JQ完成定時彈出廣告   步驟分析 創建html文檔 在頁面中創建一個廣告部分的div,設置div隱藏 設置定時操作,1秒執行一個顯示的方法 設置定時操作.1秒執行一個隱藏方法 /

cookie的基礎以及案例

腳本元素 direct his 常用 變量 所有 arr 編譯 date 1.會話技術    用戶打開一個瀏覽器訪問頁面,訪問網站的很多頁面,訪問完成後將瀏覽器關閉的過程稱為是一次會話 cookie:將數據保存到客戶端瀏覽器 session:將數據保存到服務器端 向瀏覽器保

Java案例——對字符串進行加密解密

i++ 個數 color class 異或運算 揭秘 println scanner 英文 要求:   * 對用戶輸入的每個字符的值進行加密,將解密後的字符串輸出   * 對用戶輸入的已加密字符串進行解密並輸出 實現代碼: import java.util.Sca

Java案例——判斷所給年份是平年還是閏年

年份 平年 pre stat sca java next 用戶 string 要求:   * 判斷用戶輸入的年份是平年還是閏年 實現代碼: import java.util.Scanner; /** * 要求: * 判斷用戶輸入的年份是平年還是閏年 *

JS案例(基礎好煩惱少)----持續更新

eof 最大值 res 應該 for tin title style 最小 1 *************************************************** 2 <!DOCTYPE html> 3 <html lan

js學習總結----經典案例之選項卡

scrip ges div 視頻內容 mil line 學習總結 images ul li <!DOCTYPE html> <html lang="en"> <head> <meta charset="UTF-8">

js學習總結----案例之跑馬燈

htm function 微博 border ron == 內容 rip gree 具體代碼如下: <!DOCTYPE html> <html lang="en"> <head> <meta charset="UTF-8"

iptables filter 表案例

filter class logs cmp input nbsp inpu forward tables 案例1:放行 21 、22 、80 端口,且 22 端口只允許 192.168.5.0/24 這個 IP 段訪問 #!/bin/bash ipt="/sbin/

關於jQ的案例分享

src load screen borde child () length onload fun <!DOCTYPE html><html> <head> <meta charset="utf-8"> <t

Java案例(行星移動)

println dmi dap ova gre 建議 his 矩形 graphic Java小案例 行星移動:參考:三百集 使用軟件:idea2017,java 1,圖片集:這裏 (idea圖片源放在target目錄下,才能訪問到),建議從小往上看。。。 2,定義MyFr

1)案例步驟一

其他 com 到你 ons inter show urn ec2 database 首先:     MysqlDB類:           作用:               裏面存的是mysql的操作函數,都是封裝好的。           疑問點: