Spark Streaming: Receiving Kafka Data and Writing It into a Hive Partitioned Table
By 阿新 · Published 2018-12-26
Straight to the code:
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.{Column, SQLContext, SaveMode}
import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010.{CanCommitOffsets, HasOffsetRanges, KafkaUtils}
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent

object KafkaToHive {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setAppName("KafkaToHive")
    val sc = new SparkContext(sparkConf)
    // 60-second micro-batches
    val ssc = new StreamingContext(sc, Seconds(60))

    // Kafka consumer parameters
    val kafkaParams = Map[String, Object](
      // the ip:port pairs are the Kafka cluster's brokers
      "bootstrap.servers" -> "ip1:port1,ip2:port2,ip3:port3",
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> "KafkaToHive_group1", // custom consumer group name
      "auto.offset.reset" -> "earliest",
      "enable.auto.commit" -> "false") // offsets are committed manually below

    val topics = Array("test1")
    val stream = KafkaUtils.createDirectStream[String, String](
      ssc, PreferConsistent,
      Subscribe[String, String](topics, kafkaParams))

    stream.foreachRDD(rdd => {
      if (rdd.count > 0) {
        // remember this batch's offsets so they can be committed after the write
        val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges

        // TODO: actual processing logic goes here.
        // `value` stands for the result set that the processing produces (a local
        // collection of JSON strings); `cols` is the list of column names to
        // select, matching the target table's schema. Both are placeholders in
        // the original post — a hypothetical version of this step is sketched
        // after the listing.
        val value: Seq[String] = ???
        val cols: Seq[String] = ???

        // Write into Hive
        val subRdd = rdd.sparkContext.parallelize(value)
        val sqlContext: SQLContext = new HiveContext(rdd.sparkContext)
        sqlContext.setConf("hive.exec.dynamic.partition.mode", "nonstrict")
        sqlContext.setConf("hive.exec.dynamic.partition", "true")
        sqlContext.sql("use database1")
        sqlContext
          .read
          .json(subRdd)
          .select(cols.map(new Column(_)): _*)
          .coalesce(1) // one output file per batch
          .write
          .mode(SaveMode.Append)
          .insertInto("task_exec_time")

        // commit offsets only after the write has succeeded
        stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
      }
    })

    ssc.start()
    ssc.awaitTermination()
  }
}
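The //TODO block, `value`, and `cols` are left as placeholders above, just as in the original post. Purely as an illustration (none of this comes from the original), the elided processing step could look like the following, assuming each Kafka message value is already a JSON string and the column names below match the target table:

// Hypothetical stand-in for the //TODO step (assumed, not from the post):
// take each record's value as a JSON string and collect it to the driver,
// which is what parallelize(value) in the listing implies.
val value: Seq[String] = rdd.map(record => record.value).collect().toSeq

// Assumed column names; they must match task_exec_time's schema.
val cols = Seq("task_id", "task_name", "exec_time", "dt")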
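Because the job enables hive.exec.dynamic.partition and sets the mode to nonstrict, Hive routes each row into a partition based on the value of the partition column at write time instead of requiring the partition to be named up front. The post does not show the DDL for task_exec_time; a minimal sketch, assuming a daily dt partition and the columns used above, might be:

// Minimal sketch of the target table's DDL (assumed — the original post
// does not show it). The partition column dt must come last in the rows
// that insertInto appends, because insertInto matches columns by position.
sqlContext.sql(
  """CREATE TABLE IF NOT EXISTS database1.task_exec_time (
    |  task_id STRING,
    |  task_name STRING,
    |  exec_time STRING
    |) PARTITIONED BY (dt STRING)
    |STORED AS PARQUET""".stripMargin)

Note that insertInto resolves columns by position rather than by name, so the select(cols.map(new Column(_)): _*) projection has to emit columns in exactly the order of the table schema, with the partition column last.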