
Spark Streaming: writing Kafka data into a Hive partitioned table
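
The job uses Spark Streaming's direct Kafka integration (the spark-streaming-kafka-0-10 connector) together with Spark SQL's Hive support. A minimal sketch of the sbt dependencies, assuming Spark 2.2.0 on Scala 2.11 (the version numbers are assumptions; match them to your own cluster):

    // build.sbt -- minimal sketch; version numbers are assumptions
    scalaVersion := "2.11.8"

    libraryDependencies ++= Seq(
      "org.apache.spark" %% "spark-core"                 % "2.2.0" % "provided",
      "org.apache.spark" %% "spark-sql"                  % "2.2.0" % "provided",
      "org.apache.spark" %% "spark-hive"                 % "2.2.0" % "provided",
      "org.apache.spark" %% "spark-streaming"            % "2.2.0" % "provided",
      "org.apache.spark" %% "spark-streaming-kafka-0-10" % "2.2.0"
    )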

Straight to the code:

import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.{Column, SQLContext, SaveMode}
import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe

object KafkaToHive {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setAppName("KafkaToHive")
    val sc = new SparkContext(sparkConf)
    // 60-second batch interval
    val ssc = new StreamingContext(sc, Seconds(60))

    // Kafka consumer parameters
    val kafkaParams = Map[String, Object](
      // the ip:port pairs are the brokers of the Kafka cluster
      "bootstrap.servers" -> "ip1:port1,ip2:port2,ip3:port3",
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> "KafkaToHive_group1",  // custom consumer group name
      "auto.offset.reset" -> "earliest",
      "enable.auto.commit" -> (false: java.lang.Boolean))

    val topics = Array("test1")
    val stream = KafkaUtils.createDirectStream[String, String](
      ssc,
      PreferConsistent,
      Subscribe[String, String](topics, kafkaParams))

    stream.foreachRDD(rdd => {
      if (rdd.count > 0) {
        // remember this batch's offset ranges; they are committed after the write succeeds
        val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges

        // TODO: actual processing logic
        // `value` is the result set (a collection of JSON strings) produced by the TODO logic above,
        // and `cols` is the list of column names in the target table's column order
        val subRdd = rdd.sparkContext.parallelize(value)

        // write to Hive
        val sqlContext: SQLContext = new HiveContext(rdd.sparkContext)
        // enable dynamic partitioning so rows are routed to partitions by the partition column's value
        sqlContext.setConf("hive.exec.dynamic.partition.mode", "nonstrict")
        sqlContext.setConf("hive.exec.dynamic.partition", "true")
        sqlContext.sql("use database1")

        sqlContext
          .read
          .json(subRdd)
          .select(cols.map(new Column(_)): _*)
          .coalesce(1)  // one output file per batch
          .write
          .mode(SaveMode.Append)
          .insertInto("task_exec_time")

        // commit the offsets only after the batch has been written to Hive
        stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
      }
    })

    ssc.start()
    ssc.awaitTermination()
  }
}
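
Note that insertInto requires the target table to already exist in Hive, and with dynamic partitioning the DataFrame's columns must follow the table's column order, with the partition column(s) last. A hypothetical sketch of the table definition, run once in advance; the columns task_id/exec_time and the partition column dt are made-up names for illustration, so replace them with the columns of your actual result set:

    // hypothetical DDL for the target table; column names are assumptions
    sqlContext.sql(
      """CREATE TABLE IF NOT EXISTS database1.task_exec_time (
        |  task_id   STRING,
        |  exec_time BIGINT
        |)
        |PARTITIONED BY (dt STRING)
        |STORED AS ORC""".stripMargin)

With hive.exec.dynamic.partition enabled as in the code above, each appended batch is routed into the dt=... partitions automatically based on the value of that last column.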