1. 程式人生 > >Hadoop中檔案讀寫(Java)

Hadoop中檔案讀寫(Java)

前言

在本文件中,你將瞭解到如何用Java介面讀寫Hadoop分散式系統中的檔案,以及編碼的轉換等問題。其中有些細節,在你不知道的時候,是非常容易出錯的。 這邊讀寫檔案分以下三種情況:

  • 1. 在非Map Reduce過程中讀寫分散式檔案系統中的檔案

比如說,你想自己遍歷一個檔案,想截斷一個檔案,都屬於這種方式。一般該過程發生在run函式中,程式設計師處理Map Reduce產生的中間檔案上。

  • 2. 在map(或reduce)函式中讀寫一個Record。

對於TextInputFormat,一個Record就是一行。我們會得到一個Text物件,作為一行。要注意的是如果讀入的檔案不是UTF-8 格式(比如GBK,因為TextInputFormat只能解碼UTF-8檔案,直接讀會產生亂碼),我們如何正確轉換成Unicode。

  • 3. 在map(或reduce)函式中,讀寫檔案

比如說,在map函式中,你想通過讀入檔案初始化一個HashMap。

非Map Reduce過程中讀檔案

主要用到FileSystem類,開啟一個檔案後得到FSDataInputStream,據說這個FSDataInputStream會對資料緩 存,所以沒必要包裝成一個BufferedReader,但其readLine()方法是被deprecated,所以你要想一行行的讀,還是轉成 BufferedReader吧。在轉換的過程中,還能指定輸入檔案的編碼,比如這邊是"UTF-8"。 該讀檔案的方式無法在map或reduce方法中呼叫,因為它們處於一個static類中,無法建立一個JobConf。

                JobConf confQ = new JobConf(getConf(), XXXX.class);
                FileSystem fs= FileSystem.get(confQ);
                FSDataInputStream fin = fs.open(new Path("filePathXX"));
                BufferedReader in = null;

		String line;
		try {
			in = new BufferedReader(new InputStreamReader(fin, "UTF-8"));
			
			while ((line = in.readLine()) != null) {

				//...


			}

		} finally {
			if (in != null) {
				in.close();
			}
			
		}

非Map Reduce過程中寫檔案

該過程和讀檔案對應,注意在close之前,呼叫BufferedWriter的flush()方法,不然有資料會沒寫出。

                JobConf confQ = new JobConf(getConf(), XXXX.class);
                FileSystem fs= FileSystem.get(confQ);

                FSDataOutputStream fout = fs.create(new Path("要寫入的檔案全路徑"));
                BufferedWriter out = null;

		try {
			out = new BufferedWriter(new OutputStreamWriter(fout, "UTF-8"));

			out.write("XXXXXX");
			out.newLine();

			}
			out.flush();

		} finally {

			if (out != null) {
				out.close();
			}
		}

Map或reduce方法中需要讀取一個檔案的解決方案

對於很大的檔案,目前的方案可能是預先把這個檔案部署到每個叢集節點上。這邊講兩種小檔案的處理方法。

  • 方法一:把檔案打包到執行的jar包中(可以放在根目錄下),然後用以下方式讀取(fileName就是在根目錄下要讀取的檔名):
                BufferedReader in = null;
		try {
			InputStream fstream = Thread.currentThread()
					.getContextClassLoader().getResourceAsStream("fileName");

			in = new BufferedReader(new InputStreamReader(new DataInputStream(
					fstream), "UTF-8"));
			String line;

			while ((line = in.readLine()) != null) {
				//...
			}

		} finally {
			in.close();
		}
  • 方法二:在Map或Reduce類中,複寫public void configure(JobConf jobIn)方法(在基類MapReduceBase中定義)
   public void configure(JobConf jobIn) {
        super.configure(jobIn);
    try{
    FileSystem fs = null;
    fs = FileSystem.get(jobIn);
    FSDataInputStream in;
    BufferedReader bufread;
    String strLine;
    String[] strList;
    IpField ipField;
    Path IpCityPath = new Path("/user/hadoop/dw/dim/ip_cityid.txt");
        if (!fs.exists(IpCityPath))
            throw new IOException("Input file not found");
        if (!fs.isFile(IpCityPath))
            throw new IOException("Input should be a file");
        in = fs.open(IpCityPath);
        bufread = new BufferedReader(new InputStreamReader(in));
        while ((strLine = bufread.readLine()) != null) {
            strList =strLine.split("\"");
            if(strList.length < 3)
                continue;
            IpField nodeIp = new IpField(strList[0], strList[1], strList[2]);
            CityIpLocal.add(nodeIp);
        }
        in.close();
    }
    catch(IOException e)
    {
       e.printStackTrace();
    }
}

CityIpLocal可以是外部類的一個ArrayList物件

map方法中讀GBK檔案

主要是編碼轉換的問題,我曾經在這個問題上除錯蠻久的。我們輸入的檔案是GBK編碼的,map中,呼叫Text.toString並不會把編碼自動轉換成Unicode,但Java中的字串都是當Unicode來處理的,我們需要手動轉換,方式如下:

Text value;//map傳入的引數
String line = new String(value.getBytes(), 0, value.getLength(),"GBK");

這邊指定的GBK代表讀入檔案的編碼,line返回的是解碼成Unicode後的結果。 特別注意的是,這邊需要指出value.getBytes()得到的byte陣列的起始和長度,(0, value.getLength())。 千萬不要這樣呼叫:String line = new String(value.getBytes(),"GBK");這樣得到的結果,末尾可能會有多餘字串。看來內部是這樣實現的:Text物件是被複用 的,共用一個byte陣列(也可能是char陣列)來存東西,下一次只是從頭開始覆寫這個陣列,到後面如果沒寫到,那麼原來的內容還會在。

引用一段鎮方的話

  如果以TextInputFormat使用檔案作為輸入,map的輸入value為Text型別,text內部實際維護的是一個byte陣列,
  從輸入檔案中直接以byte的形式讀入,不會對byte做轉義:輸入檔案中的位元組流直接進入text的byte陣列。
  
  Text假設內部編碼為utf8,呼叫Text.toString,Text會把內部byte當作utf8處理,如果讀入檔案的實際編碼為gbk
  就會產生亂碼;此時可以通過 new String(Text.getBytes(), 0, Text.getLength(), "輸入檔案實際編碼")實現
  正確轉義。
  
  以TextOutputFormat作為輸出時,reduce過程會強制以utf8編碼輸出。但是這一步有一個小triky,Text的輸出會
  呼叫它的Text.getBytes方法,而由於Text內部以byte陣列形式儲存,實際上可以放任意內容。
  
  注意:1. Hadoop的TextInput/TextOutput很系統編碼完全無關,是通過程式碼硬性寫入為UTF8的。
        2. Text和String對資料的處理是很不一樣的

Map Reduce輸出結果為GBK檔案

這個是竹莊給的解決方法中文問題 基本上就是因為TextOutputFormat把輸出編碼寫死成UTF-8了,自己把這個改掉就可以輸出GBK了。

儲存成Excel能識別的編碼(Unicode)

有的需求,需要用Excel來開啟最後生成的檔案。現在的方式是用Tab分割欄位,這樣就能在Excel中顯示為不同的欄了。 困難一些的是儲存的編碼形式。我研究了下,發現本地的Excel檔案是用Unicode編碼的,於是在Java程式中註明了Unicode,或者UTF- 16,結果還是不行。再看一下,原來需要的是小端的Unicode,而預設UTF-16儲存的是大端的,於是改成了UTF-16LE,指定為小端的。結果 還是失敗,能用記事本正確開啟,卻不能用Excel開啟。後來研究了下,原來還需要指定位元組序的(BOM,Byte Order Mark)。最後寫了如下程式碼搞定(這個方法把UTF-8的檔案轉成了Unicode的):

        private void convertToUnicode(FSDataInputStream convert_in,
			FSDataOutputStream convert_out,String head) throws IOException {
		BufferedReader in = null;
		BufferedWriter out = null;
		try {
			in = new BufferedReader(new InputStreamReader(convert_in, "UTF-8"));
			out = new BufferedWriter(new OutputStreamWriter(convert_out,
					"UTF-16LE"));
			out.write("\uFEFF");
			out.write(head+"\n");
			String line;
			while ((line = in.readLine()) != null) {
				out.write(line);
				out.newLine();
			}

			out.flush();
		} finally {
                        if(in!=null)
			in.close();
                        if(out!=null)
			out.close();
		}
	}

這邊還有個值得注意的現象,out.write("\uFEFF");我們可以查得,Unicode小端的BOM應該是FFFE,這邊為什麼反過來 了呢?原因在於Java的位元組碼順序是大端的,\uFEFF在記憶體中表示為FF在前(低地址),FE在後,寫出後即為FFFE(用UltraEditor 16進位制檢視,的確如此)。這邊的現象應該和機器的大小端無關的。

來源於:http://www.cnblogs.com/noures/archive/2012/08/17/2643841.html