spark將資料寫入ES(ElasticSearch)終極總結
阿新 • • 發佈:2019-01-14
簡介
spark接入ES可以使用多種方式,常見型別如下。
本文主要介紹將case class 類物件寫入ElasticSearch:也就是獲取資料然後使用case class封裝資料,然後在case class中選取一個欄位當做 id,但是這個欄位一定資料不能重複 要唯一。不指定ES自己也會生成id。
準備工作
第一步:
使用Apache Spark將資料寫入到ElasticSearch
elasticsearch-hadoop
,其從2.1版本開始提供了內建支援Apache Spark的功能,使用elasticsearch-hadoop
之前,我們需要引入依賴:本文使用的版本是:6.3.2
<dependency> <groupId>org.elasticsearch</groupId> <artifactId>elasticsearch</artifactId> <version>6.3.2</version> </dependency> <dependency> <groupId>org.elasticsearch</groupId> <artifactId>elasticsearch-hadoop</artifactId> <version>6.3.2</version> </dependency> <dependency> <groupId>org.elasticsearch.client</groupId> <artifactId>transport</artifactId> <version>6.3.2</version> </dependency>
第二步:
提前建立好索引和type:索引-->sql_command type--> sql_info
在spark程式碼中設定ES的相關引數:ES的nodes節點 ip和埠,以及自動建立索引引數
val session: SparkSession = SparkSession.builder().config(conf) .config("es.index.auto.create", "true") .config("es.nodes", "ip1:9200,ip2:9200,ip2:9200") .getOrCreate()
第三步:
呼叫寫入ES的api: saveToEs(),並匯入ES的相關類:import org.elasticsearch.spark._,這將使得所有的RDD擁有saveToEs
方法。下面我將一一介紹將不同型別的資料寫入ElasticSearch中。呼叫 saveToEs()時可以指定id也可以不指定,不指定ES會自動生成,看自己需求。自己指定時需要提供一個不唯一的欄位,如果沒有自己可以生成一個,但是一定不能重複 。
//資料寫入es
import org.elasticsearch.spark._
rddSource.saveToEs("sql_command/sql_info", Map("es.mapping.id" -> "md5id"))
案例程式碼
object Data2Es {
def main(args: Array[String]): Unit = {
Logger.getLogger("org").setLevel(Level.WARN)
if (args.length != 5) {
println(
""" |cn.missfresh.SparkStreaming2Es
|引數不合法,請傳遞正確的引數個數:
|brokers
|topic
|groupId
|seconds
|offtype
""".stripMargin)
sys.exit()
}
val Array(brokers, topic, groupId, seconds, offtype) = args
val conf = new SparkConf().setAppName("RTC_data2es_wangzh")
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
conf.set("spark.streaming.kafka.consumer.poll.ms", "60000")
conf.set("spark.streaming.kafka.maxRatePerPartition", "500")
//設定es的相關引數
val session: SparkSession = SparkSession.builder().config(conf)
.config("es.index.auto.create", "true")
.config("es.nodes", "ip1:9200,iP2:9200,ip3:9200")
.getOrCreate()
val sc = session.sparkContext
val ssc: StreamingContext = new StreamingContext(sc, Seconds(seconds.toLong))
//設定kafka的相關引數
val topics = Array(topic)
val kafkaParams = Map[String, Object](
"bootstrap.servers" -> brokers,
"key.deserializer" -> classOf[StringDeserializer],
"value.deserializer" -> classOf[StringDeserializer],
"group.id" -> groupId,
"auto.offset.reset" -> offtype, // latest
"enable.auto.commit" -> (false: java.lang.Boolean)
)
//建立Kafka資料流
val kafkaStream: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream(
ssc,
LocationStrategies.PreferConsistent,
ConsumerStrategies.Subscribe(topics, kafkaParams)
)
")
kafkaStream.foreachRDD(rdd=>{
val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
val fliterData = rdd.filter(_.value().split("\\{").length==2)
val rddSource= fliterData.map(x => {
val split = x.value().split("\\{")
//獲得ip
val ip = IpUtil.getIp(x.value())
val jsonStr = "{" + split(1)
//解析json字串,使用case class封裝資料
val behavior= JsonParse.jsonParse(jsonStr, ip)
})
//資料寫入es
import org.elasticsearch.spark._
rddSource.saveToEs("sql_command/sql_info", Map("es.mapping.id" -> "md5id"))
// 提交偏移量
kafkaStream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
println("=============》 任務結束 ================》")
})
ssc.start()
ssc.awaitTermination()
}
}
/**
*建立對應欄位的case class
*/
case class UserBehavior ( md5id :String,
application_id :String,
session_id :String,
user_ip_address:String,
logger_type :String,
logger_location:String,
command :String,
command_clean :String,
query_string :String,
current_time :String,
blg_user_name :String,
user_name :String,
ret :String,
mode_type :String,
processor_name :String,
last_command :String,
mryxblg_authorization_nabled :String,
mryxblg_command_monitoring:String)