1. 程式人生 > >sparkstreaming結合sparksql-2.x實時向hive中寫資料

sparkstreaming結合sparksql-2.x實時向hive中寫資料

今天主要來介紹一下SparkSql,2.x新版本操作hive的一個寫法.

Spark SQL 的功能之一是執行 SQL 查詢.Spark SQL 也能夠被用於從已存在的 Hive 環境中讀取資料

廢話不多說,直接上程式碼:

package spark

import java.io.File
import java.util
import kafka.{PropertiesScalaUtils, RedisKeysListUtils}
import kafka.SparkStreamingKafka.{dbIndex, kafkaStreams}
import net.sf.json.JSONObject
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.log4j.{Level, Logger}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.types.{StringType, StructField, StructType}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, HasOffsetRanges, KafkaUtils, LocationStrategies}
import redis.RedisPool

/**
  * 利用sparksql 2.0向hive中寫資料;
  */
object SparkSqlDemo {
  def main(args: Array[String]): Unit = {
    Logger.getLogger("org.apache.spark").setLevel(Level.INFO)
    Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.INFO)
    Logger.getLogger("org.apache.kafka.clients.consumer").setLevel(Level.INFO)
    val warehouseLocation = new File("hdfs://cluster/hive/warehouse").getAbsolutePath
    val spark = SparkSession.builder().appName("Spark SQL Jason").config("spark.sql.warehouse.dir", warehouseLocation).enableHiveSupport().getOrCreate()
    spark.conf.set("spark.streaming.kafka.maxRatePerPartition", "2000")
    spark.conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    spark.conf.set("spark.streaming.concurrentJobs", "10")
    spark.conf.set("spark.streaming.kafka.maxRetries", "50")
    @transient
    val sc = spark.sparkContext
    val scc = new StreamingContext(sc, Seconds(2))
    val topic = "jason_20180511"
    val topicSet: Set[String] = Set(topic) //設定kafka的topic;
    val kafkaParams = Map[String, Object](
      "auto.offset.reset" -> "latest",
      "value.deserializer" -> classOf[StringDeserializer]
      , "key.deserializer" -> classOf[StringDeserializer]
      , "bootstrap.servers" -> PropertiesScalaUtils.loadProperties("broker")
      , "group.id" -> PropertiesScalaUtils.loadProperties("groupId")
      , "enable.auto.commit" -> (false: java.lang.Boolean)
    )
    val maxTotal = 200
    val maxIdle = 100
    val minIdle = 10
    val testOnBorrow = false
    val testOnReturn = false
    val maxWaitMillis = 500
    RedisPool.makePool(PropertiesScalaUtils.loadProperties("redisHost"), PropertiesScalaUtils.loadProperties("redisPort").toInt, maxTotal, maxIdle, minIdle, testOnBorrow, testOnReturn, maxWaitMillis)
    val jedis = RedisPool.getPool.getResource
    jedis.select(dbIndex)
    val keys: util.Set[String] = jedis.keys(topic + "*")
    if (keys.size() == 0) {
      kafkaStreams = KafkaUtils.createDirectStream[String, String](
        scc,
        LocationStrategies.PreferConsistent,
        ConsumerStrategies.Subscribe[String, String](topicSet, kafkaParams))
    } else {
      val fromOffsets: Map[TopicPartition, Long] = RedisKeysListUtils.getKeysList(PropertiesScalaUtils.loadProperties("redisHost"), PropertiesScalaUtils.loadProperties("redisPort").toInt, topic)
      kafkaStreams = KafkaUtils.createDirectStream[String, String](
        scc,
        LocationStrategies.PreferConsistent,
        ConsumerStrategies.Subscribe[String, String](topicSet, kafkaParams, fromOffsets))
    }
    RedisPool.getPool.returnResource(jedis)
    kafkaStreams.foreachRDD(rdd=>{
      val jedis_jason = RedisPool.getPool.getResource
      jedis_jason.select(dbIndex)
      val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
      import spark.sql
      if(!rdd.isEmpty()){
        val rowRDD:RDD[Row] = rdd.map(x=>{
          val json = JSONObject.fromObject(x.value().toString)
          val a = json.get("name")
          val b = json.get("addr")
          Row(a,b)
        })
        val schemaString = "name addr"
        val field = schemaString.split(" ").map(x=> StructField(x,StringType,nullable = true))
        val schema = StructType(field)
        val df = spark.createDataFrame(rowRDD, schema)
        df.show()
        df.createOrReplaceTempView("tempTable")
        val sq = "insert into test_2 select * from tempTable"
        sql(sq)
        println("插入hive成功了")
      }
      offsetRanges.foreach { offsetRange =>
        println("partition : " + offsetRange.partition + " fromOffset:  " + offsetRange.fromOffset + " untilOffset: " + offsetRange.untilOffset)
        val topic_partition_key_new = offsetRange.topic + "_" + offsetRange.partition
        jedis_jason.set(topic_partition_key_new, offsetRange.untilOffset + "")
      }
    })
    scc.start()
    scc.awaitTermination()
  }
}

需要注意的是: spark.sql.warehouse.dir 配置的目錄,該目錄預設為Spark應用程式當前目錄中的 spark-warehouse 目錄 但請注意,自從2.0.0以來,hive-site.xml 中的 hive.metastore.warehouse.dir 屬性已被棄用。 而是使用 spark.sql.warehouse.dir 來指定倉庫中資料庫的預設位置。

還有一個問題是,這樣寫的話,會在hdfs上這個表的目錄下生成很多的小檔案,這個時候如果想在hive中進行統計,計算的時候,會產生很多個map,嚴重影響計算的速度,大家可以先考慮下這個問題.

為了解決在表目錄下面生成很多小檔案的問題,我們可以把hive表建成一個分割槽表,怎麼建分割槽表在我的另一篇blog裡面有寫到,或者可以直接用: insert overwrite table combine_data partition (day_time='2018-08-01') select data,enter_time from combine_data where day_time = '2018-08-01';來合併小檔案.

或者也可把用reparation減少分割槽數,但是這麼寫,會減少rdd的並行度,降低效能,自己參考使用.

如果有寫的不對的地方,歡迎大家指正,如果有什麼疑問,可以加QQ群:340297350,謝謝

參考:

http://spark.apachecn.org/docs/cn/2.2.0/sql-programming-guide.html點選開啟連結

相關推薦

sparkstreaming結合sparksql-2.x實時hive資料

今天主要來介紹一下SparkSql,2.x新版本操作hive的一個寫法. Spark SQL 的功能之一是執行 SQL 查詢.Spark SQL 也能夠被用於從已存在的 Hive 環境中讀取資料 廢話不多說,直接上程式碼: package spark import j

使用Sqoop從PostgreSQLHive遷移資料遇到的問題

postgreSQL的介面 跟mysql不同之處就是,多了一個 2 ,這也是導致資料遷移錯誤原因 1.資料庫名稱   2.schema  3.表名 PostgreSQL中的物件屬於三層模型,即database->schema->table。

SparkStreamingHbase資料

在SparkStreaming中統計了資料之後,我們需要將結果寫入外部檔案系統。 首先,需要說一下,下面的這個方法。 foreachRDD(func) 最通用的輸出操作,把func作用於從stream生成的每一個RDD。 注意:這個函式是在 執行streaming程式的dr

sparkStreaming結合sparkSql進行日誌分析

pass mysql foreach eat instance dataframe sel park array package testimport java.util.Propertiesimport org.apache.spark.SparkConfimport o

sparkStreaming結合SparkSql實例

top alt comment each 語句 cit 計算 相關 分隔 SparkSQL結合SparkStreaming的使用 Flume+Kafka+SparkStreaming已經發展為一個比較成熟的實時日誌收集與計算架構,利用Kafka,即可以支持將用於離線分析的

sparksql 2.x demo I

package com.ws.sparksql import org.apache.spark.rdd.RDD import org.apache.spark.sql.types._ import org.apache.spark.sql._ import or

sparksql讀取hive資料儲存到hdfs

package wondersgroup_0905_Test import org.apache.spark.sql.SparkSession object sparkHive { def main(args: Array[String]): Unit = { //資料庫名稱

SparkSQL讀取Hive資料

注意紅色字。 ---------------------- 由於我Spark採用的是Cloudera公司的CDH,並且安裝的時候是線上自動安裝和部署的叢集。最近在學習SparkSQL,看到SparkSQL on HIVE。下面主要是介紹一下如何通過SparkSQL在

新版VirtualBox5.2.x設定Ubuntu與windows共享資料夾的方法

1、安裝增強功能首先點選“裝置”——“安裝增強功能”。即可在主資料夾下看到:點選執行軟體,完成增強功能的安裝。2、設定共享資料夾點選“裝置”——“共享資料夾”,新增共享資料夾,並勾選“自動掛載”和“固定分配”。如圖:重啟ubuntu,即可看到共享資料夾(所在目錄為“ /med

Spark 2.x 決策樹 示例程式碼-IRIS資料

資料集下載 下載連結 程式碼 package Iris; import org.apache.spark.api.java.JavaPairRDD; import org.apache.spark.api.java.JavaRDD; import o

分散式sparkSQL引擎應用:從遠端通過thriftServer連線spark叢集處理hive資料

實現原理: 客戶端(java程式)與thriftServer連線,thriftServer再代理客戶端轉換成spark的操作流程,再載入hive的資料到spark的worker節點,並執行Map-Re

Hive通過查詢語句插入資料過程發現的坑

前言最近在學習使用Hive(版本0.13.1)的過程中,發現了一些坑,它們或許是Hive提倡的比關係資料庫更加自由的體現(同時引來一些問題),或許是一些bug。總而言之,這些都需要使用Hive的開發人員額外注意。本文旨在列舉我發現的3個通過查詢語句向表中插入資料過程中的問題,

Boot 2.x 普羅米修斯資料採集

<!-- boot2.x 相容--> <!-- The client --> <dependency

Spring Boot 2.x基礎教程:配置元資料的應用

在使用Spring Boot開發應用的時候,你是否有發現這樣的情況:自定義屬性是有高量背景的,滑鼠放上去,有一個`Cannot resolve configuration property`的配置警告。 ![](https://img2020.cnblogs.com/other/626506/202101/

通過資料庫批量kettle插入資料,建立trans和job的模板(按照不同的要求需要自行調整)

import psycopg2 # 用來操作資料庫的類 class GPCommand(object): # 類的初始化 def __init__(self): self.hostname = 'XXX.XX.X.XX' self.username

VS2013 c++連結資料庫,應用儲存過程,資料庫寫入資料

// ConsoleApplication1.cpp : 定義控制檯應用程式的入口點。 // #include "stdafx.h" #include "iomanip" using namespace std; #import "c:\Program Files\Common Files\S

使用spark將hive資料匯入到mongodb

import com.huinong.truffle.push.process.domain.common.constant.Constants; import com.mongodb.spark.MongoSpark; import com.mongodb.spark.config.WriteConf

Pig指令碼從Hiveload資料並存入到Hbase

1、我們先建一個Hive表test01: create table test01(name String, age int, phone String,province String, city String) ROW FORMAT DELIMITED FIELDS TERMINATED B

【嵌入式 C】地址空間資料

做嵌入式開發的,一定免不了和硬體打交道。我們通過讀寫暫存器的方式來管理相關的硬體,這裡的暫存器並不是CPU空間中的暫存器,我們的暫存器實際上是一個地址單元(屬於特殊暫存器),燒寫程式時要被對映到地址空間上。所以我們通過讀寫地址這種方法來訪問暫存器是避免不了的。 方法: (*(volatile

【JEECG示例文件】使用Kettle從mysqloracle抽取資料

分享一下我老師大神的人工智慧教程!零基礎,通俗易懂!http://blog.csdn.net/jiangjunshow 也歡迎大家轉載本篇文章。分享知識,造福人民,實現我們中華民族偉大復興!