1. 程式人生 > >spark處理中文亂碼問題!!|����| | �㶫| | �Ϻ�| |����| |����| |����|

spark處理中文亂碼問題!!|����| | �㶫| | �Ϻ�| |����| |����| |����|

既然能看見這篇文章,說明你遇到是亂碼問題,具體問題咱們就不再分析了,直接來看為什麼亂碼,如何不讓它亂碼
首先咱們分析為什麼會亂碼,首先因為spark沒有自己讀取檔案的方式所以它採用了hadoop的讀取檔案的方式,因為日誌的格式是GBK編碼的,而hadoop上的編碼預設是用UTF-8,導致最終輸出亂碼。所以咱們應該制定編碼格式是GBK的,下面通過一個案例來表示直接讀取和指定方式讀取的結果差別,以及程式碼的修改位置
直接使用spark的textfile讀取:


import org.apache.spark.sql
import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}

object helllo_dataset {
  def main(args: Array[String]): Unit = {
    val spark=new sql.SparkSession
.Builder().master("local").appName("helllo_dataset").getOrCreate() val data_DS: Dataset[String] = spark.read.textFile("ip.txt") //ip.txt的檔案格式(1.0.1.0|1.0.3.255|16777472|16778239|亞洲|中國|福建|福州||電信|350100|China|CN|119.306239|26.075302) import spark.implicits._ val ipDS: Dataset[String] = data_DS.map
(line => { val data_demo: Array[String] = line.split("[|]") val ip = data_demo(6) ip }) val ipDF: DataFrame = ipDS.toDF("ip") ipDF.createTempView("T_ip_data") val result: DataFrame = spark.sql("select ip from T_ip_data") result.show() } }

輸出結果:
這裡寫圖片描述

下面我們跳過spark的textfile,直接在hadoopfile上對引數進行限定,程式碼如下


import org.apache.spark.sql
import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}
import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapred.TextInputFormat
import org.apache.spark.rdd.RDD

object saprkSql_ {
  def main(args: Array[String]): Unit = {
    val spark=new sql.SparkSession.Builder().master("local").appName("helllo_dataset").getOrCreate()
//程式碼不一樣的地方就在這裡!!!(下面這一段程式碼)
    val data_DS: RDD[String] = spark.sparkContext.hadoopFile("C:\\Users\\qixing\\Desktop\\ip.txt", classOf[TextInputFormat],
      classOf[LongWritable], classOf[Text]).map(
      pair => new String(pair._2.getBytes, 0, pair._2.getLength, "GBK"))
    import spark.implicits._

        val ipDS: RDD[String] = data_DS.map(line => {
          val data_demo: Array[String] = line.split("[|]")
          val ip = data_demo(6)
          ip
        })
    ipDS
    val ipDF: DataFrame = ipDS.toDF("ip")
    ipDF.createTempView("T_ip_data")
    val result: DataFrame = spark.sql("select ip from T_ip_data")
    result.show()
  }
}

列印結果:
這裡寫圖片描述

之所以會出現上面的亂碼問題,其實問題的本源很簡答,就是編碼和解碼使用的不一致,關於字元編碼問題大家可以看看這個字元編碼白話理解