1. 程式人生 > >Spark Streaming從Kafka中獲取資料,並進行實時單詞統計,統計URL出現的次數

Spark Streaming從Kafka中獲取資料,並進行實時單詞統計,統計URL出現的次數

1、建立Maven專案

2、啟動Kafka

3、編寫Pom檔案

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion
>
4.0.0</modelVersion> <groupId>cn.toto.spark</groupId> <artifactId>bigdata</artifactId> <version>1.0-SNAPSHOT</version> <properties> <maven.compiler.source>1.7</maven.compiler.source> <maven.compiler.target>1.7</maven.compiler.target
>
<encoding>UTF-8</encoding> <scala.version>2.10.6</scala.version> <spark.version>1.6.2</spark.version> <hadoop.version>2.6.4</hadoop.version> </properties> <dependencies> <dependency> <groupId>org.scala-lang</groupId
>
<artifactId>scala-library</artifactId> <version>${scala.version}</version> </dependency> <dependency> <groupId>org.scala-lang</groupId> <artifactId>scala-compiler</artifactId> <version>${scala.version}</version> </dependency> <dependency> <groupId>org.scala-lang</groupId> <artifactId>scala-reflect</artifactId> <version>${scala.version}</version> </dependency> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-core_2.10</artifactId> <version>${spark.version}</version> </dependency> <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-client</artifactId> <version>${hadoop.version}</version> </dependency> <dependency> <groupId>mysql</groupId> <artifactId>mysql-connector-java</artifactId> <version>5.1.38</version> </dependency> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-sql_2.10</artifactId> <version>${spark.version}</version> </dependency> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-mllib_2.10</artifactId> <version>${spark.version}</version> </dependency> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-hive_2.10</artifactId> <version>${spark.version}</version> </dependency> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-streaming_2.10</artifactId> <version>${spark.version}</version> </dependency> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-streaming-flume_2.10</artifactId> <version>${spark.version}</version> </dependency> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-streaming-kafka_2.10</artifactId> <version>${spark.version}</version> </dependency> </dependencies> <build> <sourceDirectory>src/main/scala</sourceDirectory> <testSourceDirectory>src/test/scala</testSourceDirectory> <plugins> <plugin> <groupId>net.alchim31.maven</groupId> <artifactId>scala-maven-plugin</artifactId> <version>3.2.2</version> <executions> <execution> <goals> <goal>compile</goal> <goal>testCompile</goal> </goals> <configuration> <args> <arg>-make:transitive</arg> <arg>-dependencyfile</arg> <arg>${project.build.directory}/.scala_dependencies</arg> </args> </configuration> </execution> </executions> </plugin> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-surefire-plugin</artifactId> <version>2.18.1</version> <configuration> <useFile>false</useFile> <disableXmlReport>true</disableXmlReport> <includes> <include>**/*Test.*</include> <include>**/*Suite.*</include> </includes> </configuration> </plugin> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-shade-plugin</artifactId> <version>2.4.3</version> <executions> <execution> <phase>package</phase> <goals> <goal>shade</goal> </goals> <configuration> <filters> <filter> <artifact>*:*</artifact> <excludes> <exclude>META-INF/*.SF</exclude> <exclude>META-INF/*.DSA</exclude> <exclude>META-INF/*.RSA</exclude> </excludes> </filter> </filters> <transformers> <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer"> <mainClass>cn.toto.spark.FlumeStreamingWordCount</mainClass> </transformer> </transformers> </configuration> </execution> </executions> </plugin> </plugins> </build> </project>

4.編寫程式碼

package cn.toto.spark

import cn.toto.spark.streams.LoggerLevels
import org.apache.spark.{HashPartitioner, SparkConf}
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.dstream.ReceiverInputDStream
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}

/**
  * Created by toto on 2017/7/13.
  * 從kafka中讀資料,並且進行單詞數量的計算
  */
object KafkaWordCount {

  /**
    * String         :單詞
    * Seq[Int]       :單詞在當前批次出現的次數
    * Option[Int]    :歷史結果
    */
  val updateFunc = (iter: Iterator[(String, Seq[Int], Option[Int])]) => {
    //iter.flatMap(it=>Some(it._2.sum + it._3.getOrElse(0)).map(x=>(it._1,x)))
    iter.flatMap { case (x, y, z) => Some(y.sum + z.getOrElse(0)).map(i => (x, i)) }
  }

  def main(args: Array[String]): Unit = {
    LoggerLevels.setStreamingLogLevels()
    //這裡的args從IDEA中傳入,在Program arguments中填寫如下內容:
    //引數用一個數組來接收:
    //zkQuorum     :zookeeper叢集的
    //group        :組
    //topic        :kafka的組
    //numThreads   :執行緒數量
    //hadoop11:2181,hadoop12:2181,hadoop13:2181 g1 wordcount 1   要注意的是要建立line這個topic
    val Array(zkQuorum, group, topics, numThreads) = args
    val sparkConf = new SparkConf().setAppName("KafkaWordCount").setMaster("local[2]")
    val ssc = new StreamingContext(sparkConf,Seconds(5))
    ssc.checkpoint("E:\\wordcount\\outcheckpoint")
    //"alog-2016-04-16,alog-2016-04-17,alog-2016-04-18"
    //"Array((alog-2016-04-16, 2), (alog-2016-04-17, 2), (alog-2016-04-18, 2))"
    val topicMap = topics.split(",").map((_, numThreads.toInt)).toMap
    //儲存到記憶體和磁碟,並且進行序列化
    val data: ReceiverInputDStream[(String, String)] =
          KafkaUtils.createStream(ssc, zkQuorum, group, topicMap, StorageLevel.MEMORY_AND_DISK_SER)
    //從kafka中寫資料其實也是(key,value)形式的,這裡的_._2就是value
    val words = data.map(_._2).flatMap(_.split(" "))
    val wordCounts = words.map((_, 1)).updateStateByKey(updateFunc,
          new HashPartitioner(ssc.sparkContext.defaultParallelism), true)
    wordCounts.print()
    ssc.start()
    ssc.awaitTermination()
  }
}

5.配置IDEA中執行的引數:

這裡寫圖片描述
配置說明:

hadoop11:2181,hadoop12:2181,hadoop13:2181 g1 wordcount 1
hadoop11:2181,hadoop12:2181,hadoop13:2181       :zookeeper叢集地址
g1                                              :組
wordcount                                       :kafkatopic
1                                               :執行緒數為1

6、建立kafka,並在kafka中傳遞引數

啟動kafka

[root@hadoop1 kafka]# pwd
/home/tuzq/software/kafka/servers/kafka
[root@hadoop1 kafka]# bin/kafka-server-start.sh config/server.properties 1>/dev/null 2>&1 &

建立topic

[[email protected] kafka]# bin/kafka-topics.sh --create --zookeeper hadoop11:2181 --replication-factor 1 --partitions 1 --topic wordcount
Created topic "wordcount".

檢視主題

bin/kafka-topics.sh --list --zookeeper hadoop11:2181

啟動一個生產者傳送訊息(我的kafka在hadoop1,hadoop2,hadoop3這幾臺機器上)

[root@hadoop1 kafka]# bin/kafka-console-producer.sh --broker-list hadoop1:9092 --topic wordcount
No safe wading in an unknown water
Anger begins with folly,and ends in repentance
No safe wading in an unknown water
Anger begins with folly,and ends in repentance
Anger begins with folly,and ends in repentance

使用spark-submit來執行程式

#啟動spark-streaming應用程式
bin/spark-submit --class cn.toto.spark.KafkaWordCount /root/streaming-1.0.jar hadoop11:2181 group1 wordcount 1

7、檢視執行結果

這裡寫圖片描述

8、再如統計URL出現的次數

package cn.toto.spark

import org.apache.spark.{HashPartitioner, SparkConf}
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}

/**
  * Created by toto on 2017/7/14.
  */
object UrlCount {
  val updateFunc = (iterator: Iterator[(String, Seq[Int], Option[Int])]) => {
    iterator.flatMap{case(x,y,z)=> Some(y.sum + z.getOrElse(0)).map(n=>(x, n))}
  }

  def main(args: Array[String]) {
    //接收命令列中的引數
    val Array(zkQuorum, groupId, topics, numThreads, hdfs) = args
    //建立SparkConf並設定AppName
    val conf = new SparkConf().setAppName("UrlCount")
    //建立StreamingContext
    val ssc = new StreamingContext(conf, Seconds(2))
    //設定檢查點
    ssc.checkpoint(hdfs)
    //設定topic資訊
    val topicMap = topics.split(",").map((_, numThreads.toInt)).toMap
    //重Kafka中拉取資料建立DStream
    val lines = KafkaUtils.createStream(ssc, zkQuorum ,groupId, topicMap, StorageLevel.MEMORY_AND_DISK).map(_._2)
    //切分資料,擷取使用者點選的url
    val urls = lines.map(x=>(x.split(" ")(6), 1))
    //統計URL點選量
    val result = urls.updateStateByKey(updateFunc, new HashPartitioner(ssc.sparkContext.defaultParallelism), true)
    //將結果列印到控制檯
    result.print()
    ssc.start()
    ssc.awaitTermination()
  }
}

相關推薦

Spark StreamingKafka獲取資料進行實時單詞統計統計URL出現次數

1、建立Maven專案 2、啟動Kafka 3、編寫Pom檔案 <?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.or

Spark StreamingKafka獲取數據進行實時單詞統計統計URL出現次數

scrip 發送消息 rip mark 3.2 umt 過程 bject ttr 1、創建Maven項目 創建的過程參考:http://blog.csdn.net/tototuzuoquan/article/details/74571374 2、啟動Kafka A:安裝ka

表格獲取資料在排序後按順序重新排列表格(Javascript)

數字排序出現問題 升序結果出現了一個大的數字反而在小數字的後面 function mySort(arr){ for(var x=0;x<arr.length-1;x++){ for(var y=x+1;y<arr.l

使用springmvc頁面獲取資料然後根據獲得的引數資訊進行修改如果修改的資料含有不是基本資料型別的引數。比如傳的引數有Date型別的資料需要我們進行引數型別轉換。

1.1 需求   在商品修改頁面可以修改商品的生產日期,並且根據業務需求自定義日期格式。 1.2 需求分析   由於日期資料有很多格式,所以springmvc沒辦法把字串轉換成日期型別。所以需要自定義引數繫結。前端控制器接收到請求後,找到註解形式的處理器介面卡,對RequestMapping標記的方法進

hive獲取資料

MySQL中獲取資料 public RestMsg<Object> getZhen( HttpServletRequest request) { RestMsg<Object> rm = new RestMsg<Object>();

Java基礎-----Excel獲取資料生成shell指令碼

前言 java讀取Excel的驅動包: 連結:https://pan.baidu.com/s/1ejCR9sS2OUmttFYpQnJkKQ 提取碼:58rm 實現1: 從Excel中讀取表名,由於每個欄位會對應一個表名,故讀取的某列會有若干個連續的表名出現,所以用set集合

Spark Streaming消費Kafka Direct方式資料零丟失實現

一、概述 上次寫這篇文章文章的時候,Spark還是1.x,kafka還是0.8x版本,轉眼間spark到了2.x,kafka也到了2.x,儲存offset的方式也發生了改變,筆者根據上篇文章和網上文章,將offset儲存到Redis,既保證了併發也保證了資料不丟失,經過測試,有效。 二、

1.簡單例項:ASP.NET下Echarts通過Ajax資料庫獲取資料

後臺:Test01.ashx.cs:從資料庫獲取資料,通過HTTP請求(HttpContext)實現和前臺資料傳遞json資料 using System; using System.Collections.Generic; using System.Linq; using

ASP.NET+Echarts+Ajax資料庫獲取資料

html <div class="panel-body"> <div id="signNum" style="height: 400px; width:

spark叢集HDFS讀取資料計算

一、             利用spark從hadoop的hdfs中讀取資料並計算 1.1準備階段 部署好hadoop分散式搭建(+zookeeper,6臺機器)可以參考這篇部落格:http://blog.csdn.net/vinsuan1993/article/deta

使用localStorage解決瀏覽器重新整理後無法再vuex獲取資料的問題

假設有這樣一個場景:使用者登入後,用vuex管理使用者的資訊,登入成功後進入主介面,在進入主介面後重新整理瀏覽器,此時vuex中的使用者資訊將無法獲取到。那麼應該如何解決?辦法之一是使用localStorage儲存使用者資訊。在登入成功後儲存使用者資訊,重新整理瀏覽器後從lo

Http Get 伺服器獲取資料 儲存到本地

package com.http.get; import java.io.FileOutputStream; import java.io.IOException; import java.io.InputStream; import java.net.HttpURLConn

Jmeter-資料庫獲取資料並作為變數傳輸

再今天重新學習,從資料庫中取資料,並作為變數傳到下一個請求中。 首先第一步要匯入mysql驅動包 一、新增JDBC Connection Configuration 設定連結 Database URL: jdbc:mysql:// 資料庫地址 /庫名 JDBC Driver class:com.my

是用JDBC資料庫獲取資料以java物件返回

/** * * @param c * for example Person.class * @param primaryKeys * primaryKeys為主鍵,引數順序和表中保持一致 如果id, name

hive查詢獲取資料插入到表或動態分割槽

轉自:http://www.crazyant.net/1197.html Hive的insert語句能夠從查詢語句中獲取資料,並同時將資料Load到目標表中。現在假定有一個已有資料的表staged_employees(僱員資訊全量表),所屬國家cnty和所屬州st是該表

Kafka系列(四)Kafka消費者:Kafka讀取資料

本系列文章為對《Kafka:The Definitive Guide》的學習整理,希望能夠幫助到大家應用從Kafka中讀取資料需要使用KafkaConsumer訂閱主題,然後接收這些主題的訊息。在我們深入這些API之前,先來看下幾個比較重要的概念。Kafka消費者相關的概念消

用Java程式碼網頁獲取資料(示例程式碼)

package com.mashensoft.net; import java.io.FileInputStream; import java.io.FileNotFoundException; import java.io.IOException; import java

php資料庫獲取資料用ajax傳送到前臺

1、資料庫的欄位: 2、php連線資料庫獲取資料庫的資訊放入json_encode($css);{檔案為:db.php} <span style="font-size:14px;"><?php $host="localhost"; $username=

C#sqlserver獲取資料的方法

使用sqlCommand 建立一個連線的命令 再向其中的CommandText中賦值要進行了操作語句指令碼 用SqlDataAdapter獲取返回值,並將其複製到一個datatable中 從dat

stormkafka資料

========================================== 定義從kafka中讀出的資料 import java.io.UnsupportedEncodingException; import java.util.List; import bac