1. 程式人生 > >spark將資料寫入ES(ElasticSearch)終極總結

spark將資料寫入ES(ElasticSearch)終極總結

簡介

spark接入ES可以使用多種方式,常見型別如下。

本文主要介紹將case class 類物件寫入ElasticSearch:也就是獲取資料然後使用case class封裝資料,然後在case class中選取一個欄位當做 id,但是這個欄位一定資料不能重複 要唯一。不指定ES自己也會生成id。

準備工作

第一步:

使用Apache Spark將資料寫入ElasticSearch

中。本文使用的是類庫是elasticsearch-hadoop,其從2.1版本開始提供了內建支援Apache Spark的功能,使用elasticsearch-hadoop之前,我們需要引入依賴:本文使用的版本是:6.3.2

<dependency>
            <groupId>org.elasticsearch</groupId>
            <artifactId>elasticsearch</artifactId>
            <version>6.3.2</version>
</dependency> 
<dependency>
            <groupId>org.elasticsearch</groupId>
            <artifactId>elasticsearch-hadoop</artifactId>
            <version>6.3.2</version>
</dependency>
<dependency>
            <groupId>org.elasticsearch.client</groupId>
            <artifactId>transport</artifactId>
            <version>6.3.2</version>
</dependency>

第二步:

提前建立好索引和type:索引-->sql_command   type-->  sql_info

在spark程式碼中設定ES的相關引數:ES的nodes節點 ip和埠,以及自動建立索引引數

val session: SparkSession = SparkSession.builder().config(conf)
      .config("es.index.auto.create", "true")
      .config("es.nodes", "ip1:9200,ip2:9200,ip2:9200")
      .getOrCreate()

第三步:

呼叫寫入ES的api: saveToEs(),並匯入ES的相關類:import org.elasticsearch.spark._,這將使得所有的RDD擁有saveToEs方法。下面我將一一介紹將不同型別的資料寫入ElasticSearch中。呼叫 saveToEs()時可以指定id也可以不指定,不指定ES會自動生成,看自己需求。自己指定時需要提供一個不唯一的欄位,如果沒有自己可以生成一個,但是一定不能重複 。

//資料寫入es
    import org.elasticsearch.spark._
    rddSource.saveToEs("sql_command/sql_info", Map("es.mapping.id" -> "md5id"))

案例程式碼

object Data2Es {
  def main(args: Array[String]): Unit = {
    Logger.getLogger("org").setLevel(Level.WARN)
    if (args.length != 5) {
      println(
        """          	  |cn.missfresh.SparkStreaming2Es
          				  |引數不合法,請傳遞正確的引數個數:
          				  |brokers
          				  |topic
          				  |groupId
          				  |seconds
                          |offtype
        				""".stripMargin)
      sys.exit()
    }
    val Array(brokers, topic, groupId, seconds, offtype) = args
    val conf = new SparkConf().setAppName("RTC_data2es_wangzh")
    conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    conf.set("spark.streaming.kafka.consumer.poll.ms", "60000")
    conf.set("spark.streaming.kafka.maxRatePerPartition", "500")
    //設定es的相關引數
    val session: SparkSession = SparkSession.builder().config(conf)
      .config("es.index.auto.create", "true")
      .config("es.nodes", "ip1:9200,iP2:9200,ip3:9200")
      .getOrCreate()
    val sc = session.sparkContext
    val ssc: StreamingContext = new StreamingContext(sc, Seconds(seconds.toLong))
    //設定kafka的相關引數
    val topics = Array(topic)
    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> brokers,
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> groupId,
      "auto.offset.reset" -> offtype, // latest
      "enable.auto.commit" -> (false: java.lang.Boolean)
    )
    //建立Kafka資料流
    val kafkaStream: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream(
      ssc,
      LocationStrategies.PreferConsistent,
      ConsumerStrategies.Subscribe(topics, kafkaParams)
    )
")
    kafkaStream.foreachRDD(rdd=>{
      val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
      val fliterData = rdd.filter(_.value().split("\\{").length==2)
      val rddSource= fliterData.map(x => {
        val split = x.value().split("\\{")
        //獲得ip
        val ip = IpUtil.getIp(x.value())
        val jsonStr = "{" + split(1)
       //解析json字串,使用case class封裝資料
        val behavior= JsonParse.jsonParse(jsonStr, ip)
      })
      //資料寫入es
    import org.elasticsearch.spark._
    rddSource.saveToEs("sql_command/sql_info", Map("es.mapping.id" -> "md5id"))
    // 提交偏移量
      kafkaStream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
    println("=============》  任務結束 ================》")
  })
    ssc.start()
    ssc.awaitTermination()
  }
}
/**
  *建立對應欄位的case  class
  */
case class UserBehavior ( md5id          :String,
                          application_id :String,
                         session_id     :String,
                         user_ip_address:String,
                         logger_type    :String,
                         logger_location:String,
                         command        :String,
                         command_clean  :String,
                         query_string   :String,
                         current_time   :String,
                         blg_user_name  :String,
                         user_name      :String,
                         ret            :String,
                         mode_type      :String,
                         processor_name :String,
                         last_command :String,
                         mryxblg_authorization_nabled :String,
                         mryxblg_command_monitoring:String)