Spark機器學習之-實時聚類演算法呼叫
阿新 • • 發佈:2019-01-24
本文使用 Spark MLlib 中的 KMeans 聚類演算法:通過 Spark Streaming 實時拉取 Kafka 中的資料,並呼叫已經訓練好的聚類模型,對讀取到的每一筆資料實時地進行分類。
package com.demo.cn.streaming

import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.mllib.clustering.KMeansModel
import org.apache.spark.mllib.linalg.{Vector, Vectors}
import org.apache.spark.rdd.RDD

import scala.util.Try

/**
 * Streaming job that reads whitespace-separated numeric feature rows from the
 * Kafka topic `kmeansTest` and assigns each row to a cluster using a
 * previously trained and saved MLlib [[KMeansModel]].
 *
 * Every record is classified individually; records that are null, empty,
 * non-numeric, or of the wrong dimension are reported and skipped instead of
 * failing the batch.
 */
object SparkStreamKafka {

  /** Directory used for Spark Streaming checkpoints (and context recovery). */
  val checkDir = "E:\\file\\SparkCheckpoint"

  /**
   * Parses one message payload into a dense feature vector.
   *
   * @param line      whitespace-separated numbers, e.g. `"1.0 2.5 3"`
   * @param dimension number of features the model expects
   * @return the vector, or `None` if any token is not a number or the number
   *         of tokens differs from `dimension`
   */
  private def parseFeatures(line: String, dimension: Int): Option[Vector] =
    Try(line.trim.split("\\s+").map(_.toDouble)).toOption
      .filter(_.length == dimension)
      .map(values => Vectors.dense(values))

  /**
   * Builds a fresh [[StreamingContext]] with the Kafka direct stream and the
   * per-record clustering output wired up.
   *
   * Passed to `StreamingContext.getOrCreate` in `main` as the factory to use
   * when no context can be restored from `checkDir`.
   *
   * @return the configured, not yet started, streaming context
   */
  def functionToCreateContext(): StreamingContext = {
    val conf = new SparkConf()
      .setAppName("SparkStreamKafka")
      .setMaster("local[*]")
    val sc = new SparkContext(conf)
    val ssc = new StreamingContext(sc, Seconds(1)) // new context, 1-second batches
    ssc.checkpoint(checkDir) // set checkpoint directory

    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> "10.1.69.11:6667,10.2.69.12:6667",
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> "SparkStreamKafka",
      "auto.offset.reset" -> "earliest",
      "enable.auto.commit" -> (false: java.lang.Boolean)
    )
    val topics = Array("kmeansTest")
    val stream = KafkaUtils.createDirectStream[String, String](
      ssc,
      PreferConsistent,
      Subscribe[String, String](topics, kafkaParams)
    )

    // Load the already-trained clustering model.
    val kmodel = KMeansModel.load(sc, "file:///D:\\tmp\\clusters")

    // Number of features the model was trained on; rows of any other length
    // cannot be compared against the cluster centers.
    val featureDim: Int = kmodel.clusterCenters.headOption
      .map(_.size)
      .getOrElse(sys.error("loaded KMeans model has no cluster centers"))

    stream
      // Extract plain (key, value) tuples first; the raw Kafka record objects
      // are not carried further down the pipeline.
      .map(record => (record.key, record.value))
      .foreachRDD { batch =>
        // Single pass over the batch: each record gets its own prediction, and
        // an empty batch is simply a no-op (no `first()` on an empty RDD).
        // The model is captured by the closure rather than broadcast by hand.
        batch.foreach { case (key, value) =>
          // Kafka values may be null; treat null the same as empty.
          Option(value).filter(_.nonEmpty).foreach { text =>
            val message = parseFeatures(text, featureDim)
              .map { features =>
                s"the key is :$key and the values is :$text,predictValue:${kmodel.predict(features)}"
              }
              .getOrElse(s"skipping malformed record, key:$key, value:$text")
            println(message)
          }
        }
      }
    ssc
  }

  /**
   * Entry point: restores the streaming context from the checkpoint directory
   * if possible, otherwise creates a new one, then runs until terminated.
   */
  def main(args: Array[String]): Unit = {
    // Recover from the checkpoint, or build a new context.
    val ssc = StreamingContext.getOrCreate(checkDir, () => functionToCreateContext())
    ssc.start()
    ssc.awaitTermination()
  }
}