1. 程式人生 > spark HelloWorld程序(scala版)

spark HelloWorld程序(scala版)

special hide dst tproxy top targe 提取 main read

使用本地模式(local),不需要安裝 Spark 集群,只需在 Maven 的 pom.xml 中引入相關 JAR 包即可(以下依賴基於 Scala 2.11、Spark 2.2.0):

        <!-- Both artifacts must use the same Scala binary suffix (_2.11) and the same Spark version. -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.11</artifactId>
            <version>2.2.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.11</artifactId>
            <version>2.2.0</version>
        </dependency>

創建 SparkSession:

        // Run Spark in-process with a single worker thread.
        val sparkUrl = "local"
        // Uncomment `.setJars(...)` below to ship the application jar (presumably only needed
        // when running against a non-local master):
        //   .setJars(Seq("/home/panteng/IdeaProjects/sparkscala/target/spark-scala.jar"))
        // NOTE(review): Hadoop properties are usually passed to SparkConf as "spark.hadoop.<key>";
        // confirm that the plain "fs.hdfs.impl.disable.cache" key actually reaches the Hadoop config.
        // NOTE(review): with master "local" the executor lives inside the driver JVM, so
        // "spark.executor.memory" does not size the heap; the driver JVM's -Xmx does.
        val conf = new SparkConf()
                .set("fs.hdfs.impl.disable.cache", "true")
                .set("spark.executor.memory", "8g")
        val spark = SparkSession
                .builder()
                .appName("Spark SQL basic example")
                .config(conf)
                .config("spark.some.config.option", "some-value")
                .master(sparkUrl)
                .getOrCreate()

加載本地文件:

// Read the parquet file from the local filesystem into a DataFrame.
val parquetFileDF = spark.read.format("parquet").load("/home/panteng/下載/000001_0")
// Alternative source on HDFS:
// spark.read.parquet("hdfs://10.38.164.80:9000/user/root/000001_0")

文件操作:

// Make the raw data queryable from SQL under the name "parquetFile".
parquetFileDF.createOrReplaceTempView("parquetFile")

// Take up to 100000 rows and tag each description with its first three characters as "pre".
val descDF = spark.sql("SELECT substring(description,0,3) as pre ,description FROM parquetFile LIMIT 100000")
// Drop duplicate rows and order by description, so rows that share a prefix end up adjacent.
val diffDesc = descDF.dropDuplicates().orderBy("description")
diffDesc.createOrReplaceTempView("pre_desc")

// Read the registered view back as a DataFrame and print its columns.
val zhaoshang = spark.table("pre_desc")
zhaoshang.printSchema()

遍歷處理:

// clustering() mutates driver-side state (textPre / textList / regexList) and relies on rows
// arriving in sorted order. Dataset.foreach would run the closure on executors, where those
// updates are not visible to the driver outside local mode. Collecting first keeps the
// processing on the driver and preserves the sort order; the input is capped by LIMIT 100000.
zhaoshang.collect().foreach(row => clustering(row))

// clustering() only emits a group when the *next* prefix arrives, so the last group must be
// flushed explicitly (same "more than 2 texts" threshold).
if (textList.size > 2) {
    try {
        regexList = ScalaClient.getRegex(textList) :: regexList
    } catch {
        case e: Exception => println("failed to build regex for the last group: " + e)
    }
}

val regexRdd = spark.sparkContext.parallelize(regexList)
// A single partition produces a single part file in the output directory.
regexRdd.repartition(1).saveAsTextFile("/home/panteng/下載/temp6")

spark.stop()

附其他函數:

/**
 * Feeds one row into the running prefix-grouping state held in textPre / textList / regexList.
 *
 * Rows are expected in an order where equal "pre" values are adjacent. While the prefix stays
 * the same, the row's description (every digit replaced by '0') is accumulated into textList.
 * When the prefix changes, the accumulated group is handed to ScalaClient.getRegex (only if it
 * holds more than 2 texts), the resulting regex is prepended to regexList, and a new group is
 * started for the current row.
 *
 * @param row a row with String columns "pre" and "description"; either may be null
 * @return "continue" if the row joined the current group; "ok - <regex>" if a new group was
 *         started ("ok - null" when no regex was produced); "error" if an exception occurred
 */
def clustering(row: Row): String = {
    try {
        val pre = row.getAs[String]("pre")
        val description = row.getAs[String]("description")
        if (textPre != null && textPre == pre) {
            // Same prefix: accumulate the digit-normalised text into the current group.
            if (description != null) {
                textList = description.replaceAll("\\d", "0") :: textList
            }
            "continue"
        } else {
            // Prefix changed: detach the finished group and advance the state *before* calling
            // getRegex, so a failure there cannot leave the old group behind to be retried or
            // flushed a second time.
            val finished = textList
            textList = Nil
            textPre = pre
            if (pre != null && description != null) {
                // Normalise digits here too, so every text in a group is treated alike.
                textList = description.replaceAll("\\d", "0") :: Nil
            }

            var tempRegex = new Regex("null")
            if (finished.size > 2) {
                tempRegex = ScalaClient.getRegex(finished)
                regexList = tempRegex :: regexList
            }
            "ok - " + tempRegex.toString()
        }
    } catch {
        case e: Exception =>
            println("kkkkkkk" + e)
            "error"
    }
}

附 ScalaClient 源碼(批量數據提取正則):
package scala.learn

import top.letsgogo.rpc.ThriftProxy

import scala.util.matching.Regex

object ScalaClient {

    /** Characters with special meaning in java.util.regex that must be backslash-escaped. */
    private val RegexMetaChars = "\\.[]{}()*+?^$|"

    /**
     * Demo: builds a pattern from a few sample SMS texts, prints it, and prints whether
     * the pattern fails to match each sample (`true` means no match).
     */
    def main(args: Array[String]): Unit = {
        val client = ThriftProxy.client
        // Splits a sentence into words via the remote tokenizer.
        def tokenize(sentence: String): List[String] = {
            val parts = client.splitSentence(sentence)
            (0 until parts.size()).map(i => parts.get(i)).toList
        }

        val seqList = List("您尾號9081的招行賬戶入賬人民幣689.00元",
            "您尾號1234的招行一卡通支出人民幣11.00元",
            "您尾號2345的招行一卡通支出人民幣110.00元",
            "您尾號5432的招行一卡通支出人民幣200.00元",
            "您尾號5436的招行一卡通入賬人民幣142.00元")
        val generalWords = frequentWords(seqList.map(tokenize), seqList.size)

        val template = tokenize("您尾號1234的招行一卡通支出人民幣200.00元")
        val regexSeq = buildPattern(template, generalWords, collapseWildcards = false)
        println(regexSeq)

        val regex = new Regex(regexSeq)
        for (seq <- seqList) {
            println(regex.findAllIn(seq).isEmpty)
        }
    }

    /**
     * Derives a regex that generalises a group of similar sentences.
     *
     * A word is kept literally when it occurs at least `seqList.size` times across all tokenised
     * sentences; every other word becomes `(.*)`, with consecutive wildcards merged. The first
     * sentence serves as the template. Finally every run of '0' becomes `\d+` (callers are
     * expected to have replaced digits with '0' beforehand).
     *
     * @param seqList non-empty group of sentences
     * @return the generalised pattern
     * @throws IllegalArgumentException if `seqList` is empty
     */
    def getRegex(seqList: List[String]): Regex = {
        require(seqList.nonEmpty, "getRegex needs at least one sentence")
        val client = ThriftProxy.client
        // Splits a sentence into words via the remote tokenizer.
        def tokenize(sentence: String): List[String] = {
            val parts = client.splitSentence(sentence)
            (0 until parts.size()).map(i => parts.get(i)).toList
        }

        val tokenized = seqList.map(tokenize)
        val generalWords = frequentWords(tokenized, seqList.size)
        // Reuse the template's tokens rather than calling the tokenizer a second time.
        val regexSeq = buildPattern(tokenized.head, generalWords, collapseWildcards = true)
        println(regexSeq + "  " + seqList.size)
        new Regex(regexSeq.replaceAll("0+", "\\\\d+"))
    }

    /** Returns the words occurring at least `minCount` times across all tokenised sentences. */
    private def frequentWords(tokenized: Seq[Seq[String]], minCount: Int): Set[String] =
        tokenized.flatten.groupBy(identity).collect {
            case (word, occurrences) if occurrences.size >= minCount => word
        }.toSet

    /** Backslash-escapes every regex metacharacter so the result matches `word` literally. */
    private def escapeLiteral(word: String): String = {
        val escaped = new StringBuilder
        word.foreach { c =>
            if (RegexMetaChars.indexOf(c) >= 0) escaped.append('\\')
            escaped.append(c)
        }
        escaped.toString
    }

    /**
     * Turns a tokenised template into a pattern: general words (except "*") are kept as escaped
     * literals, all other words become `(.*)`. With `collapseWildcards`, adjacent wildcards
     * are emitted only once.
     */
    private def buildPattern(template: Seq[String], general: Set[String],
                             collapseWildcards: Boolean): String = {
        val pattern = new StringBuilder
        var lastWasWildcard = false
        for (word <- template) {
            if (general.contains(word) && word != "*") {
                pattern.append(escapeLiteral(word))
                lastWasWildcard = false
            } else if (!(collapseWildcards && lastWasWildcard)) {
                pattern.append("(.*)")
                lastWasWildcard = true
            }
        }
        pattern.toString
    }
}
批量數據提取正則

spark HelloWorld程序(scala版)