1. 程式人生 > >使用spark讀取es中的資料並進行資料清洗,使用fp-growth演算法進行加工

使用spark讀取es中的資料並進行資料清洗,使用fp-growth演算法進行加工

最近學了spark,用fg-growth演算法進行資料的關聯排序

object HelloPFg {
  def main(args: Array[String]) {

    val conf = new SparkConf().setAppName("Spark MLlib Exercise:K-Means Clustering")
    conf.set("es.index.auto.create", "true")
    conf.set("es.nodes", "192.168.100.100")

    val sc = new SparkContext(conf)

    var sqlsc = new SQLContext(sc)
    /**
      * 讀取es中的資料,logstash是es中的索引名稱,如果需要讀取多個索引,則使用逗號將索引隔開即可
      * val esLogs = sc.esRDD("logstash-2016.04.04,logstash-2016.04.05").values
      * 如果需要讀取不同的index中的不同的type中的資料,則分別讀取,然後使用union將多個rdd合併成一個rdd即可
    val esLogs = sc.esRDD("logstash-2016.04.04/spark").values
    val esLogs1 = sc.esRDD("logstash-2016.04.05/docs").values
    val test=esLogs.union(esLogs1)
      * */
    val esLogs = sc.esRDD("logstash-2016.04.04").values//使用values取出資料中的values,本來取出的資料為Map

    val line_num = esLogs.count()
    //對資料進行過濾,只保留防火牆的資料
    val waf1 = esLogs.filter(_.contains("waf_logtype"))
    System.out.println("waf1:" + waf1.first())
    System.out.println("waf1:srcip:" + waf1.first().get("srcip"))
    //去掉含有message的資料
    var waf2 = waf1.map(m => m.-("message"))
    System.out.println("waf2...............waf2........."+waf2.first())
    //組裝fp-growth需要的資料型別,fpg演算法需要RDD型別的Array[String]型別
    var waf3 = waf2.map(m => Array(m.get("waf_logtype").toString(),
      m.get("url").toString().substring(0, dns(m.get("url").toString())+1),//對url進行過濾,去掉第一個反斜槓後面的部分
      m.get("srcip").toString(), m.get("method").toString()))
    
    println("...............waf3:........."+waf3.first())
    System.out.println("waf3:"+waf3.first().mkString(","))
    //設定最小支援度,以及分片的數量,分片的數量就是計算的結果會生成檔案的個數
    val fpg = new FPGrowth()
      .setMinSupport(0.2)
      .setNumPartitions(10)
    val model = fpg.run(waf3)

    model.freqItemsets.collect().foreach { itemset =>
      println(itemset.items.mkString("[", ",", "]") + ", " + itemset.freq)
    }

    val minConfidence = 0.8
    model.generateAssociationRules(minConfidence).collect().foreach { rule =>
      println(
        rule.antecedent.mkString("[", ",", "]")
          + " => " + rule.consequent.mkString("[", ",", "]")
          + ", " + rule.confidence)
    }
  }
  private def dns(line: String): Int = {
    if (line.indexOf('/') > 0) line.indexOf('/')
    else 0
  }
}