Spark Streaming從Kafka中獲取資料,並進行實時單詞統計,統計URL出現的次數
1、建立Maven專案
2、啟動Kafka
3、編寫Pom檔案
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion >4.0.0</modelVersion>
<groupId>cn.toto.spark</groupId>
<artifactId>bigdata</artifactId>
<version>1.0-SNAPSHOT</version>
<properties>
<maven.compiler.source>1.7</maven.compiler.source>
<maven.compiler.target>1.7</maven.compiler.target >
<encoding>UTF-8</encoding>
<scala.version>2.10.6</scala.version>
<spark.version>1.6.2</spark.version>
<hadoop.version>2.6.4</hadoop.version>
</properties>
<dependencies>
<dependency>
<groupId>org.scala-lang</groupId >
<artifactId>scala-library</artifactId>
<version>${scala.version}</version>
</dependency>
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-compiler</artifactId>
<version>${scala.version}</version>
</dependency>
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-reflect</artifactId>
<version>${scala.version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.10</artifactId>
<version>${spark.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>${hadoop.version}</version>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>5.1.38</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.10</artifactId>
<version>${spark.version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-mllib_2.10</artifactId>
<version>${spark.version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-hive_2.10</artifactId>
<version>${spark.version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_2.10</artifactId>
<version>${spark.version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming-flume_2.10</artifactId>
<version>${spark.version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming-kafka_2.10</artifactId>
<version>${spark.version}</version>
</dependency>
</dependencies>
<build>
<sourceDirectory>src/main/scala</sourceDirectory>
<testSourceDirectory>src/test/scala</testSourceDirectory>
<plugins>
<plugin>
<groupId>net.alchim31.maven</groupId>
<artifactId>scala-maven-plugin</artifactId>
<version>3.2.2</version>
<executions>
<execution>
<goals>
<goal>compile</goal>
<goal>testCompile</goal>
</goals>
<configuration>
<args>
<arg>-make:transitive</arg>
<arg>-dependencyfile</arg>
<arg>${project.build.directory}/.scala_dependencies</arg>
</args>
</configuration>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
<version>2.18.1</version>
<configuration>
<useFile>false</useFile>
<disableXmlReport>true</disableXmlReport>
<includes>
<include>**/*Test.*</include>
<include>**/*Suite.*</include>
</includes>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>2.4.3</version>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<filters>
<filter>
<artifact>*:*</artifact>
<excludes>
<exclude>META-INF/*.SF</exclude>
<exclude>META-INF/*.DSA</exclude>
<exclude>META-INF/*.RSA</exclude>
</excludes>
</filter>
</filters>
<transformers>
<transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
<mainClass>cn.toto.spark.FlumeStreamingWordCount</mainClass>
</transformer>
</transformers>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
4.編寫程式碼
package cn.toto.spark
import cn.toto.spark.streams.LoggerLevels
import org.apache.spark.{HashPartitioner, SparkConf}
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.dstream.ReceiverInputDStream
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}
/**
* Created by toto on 2017/7/13.
* 從kafka中讀資料,並且進行單詞數量的計算
*/
object KafkaWordCount {
/**
* String :單詞
* Seq[Int] :單詞在當前批次出現的次數
* Option[Int] :歷史結果
*/
val updateFunc = (iter: Iterator[(String, Seq[Int], Option[Int])]) => {
//iter.flatMap(it=>Some(it._2.sum + it._3.getOrElse(0)).map(x=>(it._1,x)))
iter.flatMap { case (x, y, z) => Some(y.sum + z.getOrElse(0)).map(i => (x, i)) }
}
def main(args: Array[String]): Unit = {
LoggerLevels.setStreamingLogLevels()
//這裡的args從IDEA中傳入,在Program arguments中填寫如下內容:
//引數用一個數組來接收:
//zkQuorum :zookeeper叢集的
//group :組
//topic :kafka的組
//numThreads :執行緒數量
//hadoop11:2181,hadoop12:2181,hadoop13:2181 g1 wordcount 1 要注意的是要建立line這個topic
val Array(zkQuorum, group, topics, numThreads) = args
val sparkConf = new SparkConf().setAppName("KafkaWordCount").setMaster("local[2]")
val ssc = new StreamingContext(sparkConf,Seconds(5))
ssc.checkpoint("E:\\wordcount\\outcheckpoint")
//"alog-2016-04-16,alog-2016-04-17,alog-2016-04-18"
//"Array((alog-2016-04-16, 2), (alog-2016-04-17, 2), (alog-2016-04-18, 2))"
val topicMap = topics.split(",").map((_, numThreads.toInt)).toMap
//儲存到記憶體和磁碟,並且進行序列化
val data: ReceiverInputDStream[(String, String)] =
KafkaUtils.createStream(ssc, zkQuorum, group, topicMap, StorageLevel.MEMORY_AND_DISK_SER)
//從kafka中寫資料其實也是(key,value)形式的,這裡的_._2就是value
val words = data.map(_._2).flatMap(_.split(" "))
val wordCounts = words.map((_, 1)).updateStateByKey(updateFunc,
new HashPartitioner(ssc.sparkContext.defaultParallelism), true)
wordCounts.print()
ssc.start()
ssc.awaitTermination()
}
}
5.配置IDEA中執行的引數:
配置說明:
hadoop11:2181,hadoop12:2181,hadoop13:2181 g1 wordcount 1
hadoop11:2181,hadoop12:2181,hadoop13:2181 :zookeeper叢集地址
g1 :組
wordcount :kafka的topic
1 :執行緒數為1
6、建立kafka,並在kafka中傳遞引數
啟動kafka
[root@hadoop1 kafka]# pwd
/home/tuzq/software/kafka/servers/kafka
[root@hadoop1 kafka]# bin/kafka-server-start.sh config/server.properties 1>/dev/null 2>&1 &
建立topic
[[email protected] kafka]# bin/kafka-topics.sh --create --zookeeper hadoop11:2181 --replication-factor 1 --partitions 1 --topic wordcount
Created topic "wordcount".
檢視主題
bin/kafka-topics.sh --list --zookeeper hadoop11:2181
啟動一個生產者傳送訊息(我的kafka在hadoop1,hadoop2,hadoop3這幾臺機器上)
[root@hadoop1 kafka]# bin/kafka-console-producer.sh --broker-list hadoop1:9092 --topic wordcount
No safe wading in an unknown water
Anger begins with folly,and ends in repentance
No safe wading in an unknown water
Anger begins with folly,and ends in repentance
Anger begins with folly,and ends in repentance
使用spark-submit來執行程式
#啟動spark-streaming應用程式
bin/spark-submit --class cn.toto.spark.KafkaWordCount /root/streaming-1.0.jar hadoop11:2181 group1 wordcount 1
7、檢視執行結果
8、再如統計URL出現的次數
package cn.toto.spark
import org.apache.spark.{HashPartitioner, SparkConf}
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}
/**
* Created by toto on 2017/7/14.
*/
object UrlCount {
val updateFunc = (iterator: Iterator[(String, Seq[Int], Option[Int])]) => {
iterator.flatMap{case(x,y,z)=> Some(y.sum + z.getOrElse(0)).map(n=>(x, n))}
}
def main(args: Array[String]) {
//接收命令列中的引數
val Array(zkQuorum, groupId, topics, numThreads, hdfs) = args
//建立SparkConf並設定AppName
val conf = new SparkConf().setAppName("UrlCount")
//建立StreamingContext
val ssc = new StreamingContext(conf, Seconds(2))
//設定檢查點
ssc.checkpoint(hdfs)
//設定topic資訊
val topicMap = topics.split(",").map((_, numThreads.toInt)).toMap
//重Kafka中拉取資料建立DStream
val lines = KafkaUtils.createStream(ssc, zkQuorum ,groupId, topicMap, StorageLevel.MEMORY_AND_DISK).map(_._2)
//切分資料,擷取使用者點選的url
val urls = lines.map(x=>(x.split(" ")(6), 1))
//統計URL點選量
val result = urls.updateStateByKey(updateFunc, new HashPartitioner(ssc.sparkContext.defaultParallelism), true)
//將結果列印到控制檯
result.print()
ssc.start()
ssc.awaitTermination()
}
}
相關推薦
Spark Streaming從Kafka中獲取資料,並進行實時單詞統計,統計URL出現的次數
1、建立Maven專案 2、啟動Kafka 3、編寫Pom檔案 <?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.or
Spark Streaming從Kafka中獲取數據,並進行實時單詞統計,統計URL出現的次數
scrip 發送消息 rip mark 3.2 umt 過程 bject ttr 1、創建Maven項目 創建的過程參考:http://blog.csdn.net/tototuzuoquan/article/details/74571374 2、啟動Kafka A:安裝ka
從表格中獲取資料,並在排序後按順序重新排列表格(Javascript)
數字排序出現問題 升序結果出現了一個大的數字反而在小數字的後面 function mySort(arr){ for(var x=0;x<arr.length-1;x++){ for(var y=x+1;y<arr.l
使用springmvc從頁面中獲取資料,然後根據獲得的引數資訊進行修改,如果修改的資料中含有不是基本資料型別的引數。比如傳的引數中有Date型別的資料時,需要我們進行引數型別轉換。
1.1 需求 在商品修改頁面可以修改商品的生產日期,並且根據業務需求自定義日期格式。 1.2 需求分析 由於日期資料有很多格式,所以springmvc沒辦法把字串轉換成日期型別。所以需要自定義引數繫結。前端控制器接收到請求後,找到註解形式的處理器介面卡,對RequestMapping標記的方法進
從hive中獲取資料
MySQL中獲取資料 public RestMsg<Object> getZhen( HttpServletRequest request) { RestMsg<Object> rm = new RestMsg<Object>();
Java基礎-----從Excel中獲取資料生成shell指令碼
前言 java讀取Excel的驅動包: 連結:https://pan.baidu.com/s/1ejCR9sS2OUmttFYpQnJkKQ 提取碼:58rm 實現1: 從Excel中讀取表名,由於每個欄位會對應一個表名,故讀取的某列會有若干個連續的表名出現,所以用set集合
Spark Streaming消費Kafka Direct方式資料零丟失實現
一、概述 上次寫這篇文章文章的時候,Spark還是1.x,kafka還是0.8x版本,轉眼間spark到了2.x,kafka也到了2.x,儲存offset的方式也發生了改變,筆者根據上篇文章和網上文章,將offset儲存到Redis,既保證了併發也保證了資料不丟失,經過測試,有效。 二、
1.簡單例項:ASP.NET下Echarts通過Ajax從資料庫中獲取資料
後臺:Test01.ashx.cs:從資料庫獲取資料,通過HTTP請求(HttpContext)實現和前臺資料傳遞json資料 using System; using System.Collections.Generic; using System.Linq; using
ASP.NET+Echarts+Ajax從資料庫中獲取資料
html <div class="panel-body"> <div id="signNum" style="height: 400px; width:
spark叢集從HDFS中讀取資料並計算
一、 利用spark從hadoop的hdfs中讀取資料並計算 1.1準備階段 部署好hadoop分散式搭建(+zookeeper,6臺機器)可以參考這篇部落格:http://blog.csdn.net/vinsuan1993/article/deta
使用localStorage解決瀏覽器重新整理後無法再從vuex中獲取資料的問題
假設有這樣一個場景:使用者登入後,用vuex管理使用者的資訊,登入成功後進入主介面,在進入主介面後重新整理瀏覽器,此時vuex中的使用者資訊將無法獲取到。那麼應該如何解決?辦法之一是使用localStorage儲存使用者資訊。在登入成功後儲存使用者資訊,重新整理瀏覽器後從lo
Http Get 從伺服器中獲取資料 儲存到本地
package com.http.get; import java.io.FileOutputStream; import java.io.IOException; import java.io.InputStream; import java.net.HttpURLConn
Jmeter-從資料庫中獲取資料並作為變數傳輸
再今天重新學習,從資料庫中取資料,並作為變數傳到下一個請求中。 首先第一步要匯入mysql驅動包 一、新增JDBC Connection Configuration 設定連結 Database URL: jdbc:mysql:// 資料庫地址 /庫名 JDBC Driver class:com.my
是用JDBC從資料庫中獲取資料並以java物件返回
/** * * @param c * for example Person.class * @param primaryKeys * primaryKeys為主鍵,引數順序和表中保持一致 如果id, name
hive從查詢中獲取資料插入到表或動態分割槽
轉自:http://www.crazyant.net/1197.html Hive的insert語句能夠從查詢語句中獲取資料,並同時將資料Load到目標表中。現在假定有一個已有資料的表staged_employees(僱員資訊全量表),所屬國家cnty和所屬州st是該表
Kafka系列(四)Kafka消費者:從Kafka中讀取資料
本系列文章為對《Kafka:The Definitive Guide》的學習整理,希望能夠幫助到大家應用從Kafka中讀取資料需要使用KafkaConsumer訂閱主題,然後接收這些主題的訊息。在我們深入這些API之前,先來看下幾個比較重要的概念。Kafka消費者相關的概念消
用Java程式碼從網頁中獲取資料(示例程式碼)
package com.mashensoft.net; import java.io.FileInputStream; import java.io.FileNotFoundException; import java.io.IOException; import java
php從資料庫中獲取資料用ajax傳送到前臺
1、資料庫的欄位: 2、php連線資料庫獲取資料庫的資訊放入json_encode($css);{檔案為:db.php} <span style="font-size:14px;"><?php $host="localhost"; $username=
C#從sqlserver中獲取資料的方法
使用sqlCommand 建立一個連線的命令 再向其中的CommandText中賦值要進行了操作語句指令碼 用SqlDataAdapter獲取返回值,並將其複製到一個datatable中 從dat
storm從kafka中讀資料
========================================== 定義從kafka中讀出的資料 import java.io.UnsupportedEncodingException; import java.util.List; import bac