存在Hadoop叢集上的檔案,大部分都會經過壓縮,如果是壓縮後的檔案,我們直接在應用程式中如何讀取裡面的資料?答案是肯定的,但是比普通的文字讀取要稍微複雜一點,需要使用到Hadoop的壓縮工具類支援,比如處理gz,snappy,lzo,bz壓縮的,前提是首先我們的Hadoop叢集得支援上面提到的各種壓縮檔案。

本次就給出一個讀取gz壓縮檔案的例子核心程式碼:

def readHdfsWriteKafkaByDate(fs:FileSystem,date:String,conf:Configuration,topic:String,finishTimeStamp:Long):Unit={
  
  //訪問hdfs檔案,只讀取gz結尾的壓縮檔案,如果是.tmp結尾的不會讀取
    val path=new Path("/collect_data/userlog/"+date+"/log*.gz")
    //例項化壓縮工廠編碼類
    val factory = new CompressionCodecFactory(conf)
    //讀取通配路徑
    val items=fs.globStatus(path)
    var count=0
    //遍歷每一個路徑檔案
    items.foreach(f=>{
    //列印全路徑
      println(f.getPath)
      //通過全路徑獲取其編碼
      val codec = factory.getCodec(f.getPath())//獲取編碼
      //讀取成資料流
      var  stream:InputStream = null;
      if(codec!=null){
      //如果編碼識別直接從編碼建立輸入流
        stream = codec.createInputStream(fs.open(f.getPath()))
      }else{
      //如果不識別則直接開啟
        stream = fs.open(f.getPath())
      }
      val writer=new StringWriter()
      //將位元組流轉成字串流
      IOUtils.copy(stream,writer,"UTF-8")
      //得到字串內容
      val raw=writer.toString
      //根據字串內容split出所有的行資料,至此解壓資料完畢
      val raw_array=raw.split("\n")
      //遍歷資料      
      raw_array.foreach(line=>{

        val array = line.split("--",2) //拆分陣列
        val map = JSON.parseObject(array(1)).asScala
        val userId = map.get("userId").getOrElse("").asInstanceOf[String] //為空為非法資料
        val time = map.get("time").getOrElse("") //為空為非法資料
        if(StringUtils.isNotEmpty(userId)&&(time+"").toLong<=finishTimeStamp){//只有資料
          pushToKafka(topic,userId,line)
          count=count+1
        }

      })

    })

  }

壓縮和解壓模組用的工具包是apache-commons下面的類:

import org.apache.commons.io.IOUtils
import org.apache.commons.lang.StringUtils

如果想在Windows上除錯,可以直接設定HDFS的地址即可

-     val conf = new Configuration()//獲取hadoop的conf
//    conf.set("fs.defaultFS","hdfs://192.168.10.14:8020/")//windows上除錯用

至此資料已經解壓並讀取完畢,其實並不是很複雜,用java程式碼和上面的程式碼也差不多類似,如果直接用原生的api讀取會稍微複雜,但如果我們使用Hive,Spark框架的時候,框架內部會自動幫我們完成壓縮檔案的讀取或者寫入,對使用者透明,當然底層也是封裝了不同壓縮格式的讀取和寫入程式碼,這樣以來使用者將會方便許多。

參考文章:

有什麼問題可以掃碼關注微信公眾號:我是攻城師(woshigcs),在後臺留言諮詢。 技術債不能欠,健康債更不能欠, 求道之路,與君同行。

輸入圖片說明