1. 程式人生 > >hadoop 讀流程和寫流程

hadoop 讀流程和寫流程

hadoop HDFS讀流程
hadoop HDFS寫流程

package com.lhj.hadoop;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;



/**
 * Minimal example of reading from and writing to HDFS through the Hadoop
 * {@link FileSystem} API.
 *
 * <p>Setup: create the directory {@code hdfs://node1:9000/test/input/} and upload
 * {@code aaa.txt} into it, export this class as a jar to node1, then run:
 * {@code hadoop jar HdfsUtils.jar com.lhj.hadoop.HdfsUtils}
 *
 * <p>Read flow (high level):
 * <ol>
 *   <li>The client opens an input stream with {@code FileSystem.open}; for an
 *       {@code hdfs://} path the concrete implementation is DistributedFileSystem.</li>
 *   <li>The block locations are fetched from the namenode.</li>
 *   <li>The data is streamed from the datanodes through {@link FSDataInputStream}.</li>
 *   <li>The stream is closed.</li>
 * </ol>
 *
 * <p>Write flow (high level):
 * <ol>
 *   <li>The client creates an output stream with {@code FileSystem.create}.</li>
 *   <li>The file's metadata is recorded on the namenode.</li>
 *   <li>The data is sent to one datanode, which passes it on to the other
 *       replica datanodes (NOTE(review): per the HDFS architecture guide this is a
 *       replication pipeline — confirm against the Hadoop version in use).</li>
 *   <li>The stream is closed.</li>
 * </ol>
 *
 * @author Administrator
 */
public class HdfsUtils {
    /** File that {@link #read()} prints to standard output. */
    private static final String INPUT_FILE = "hdfs://node1:9000/test/input/aaa.txt";

    /** File that {@link #write()} creates. */
    private static final String OUTPUT_FILE = "hdfs://node1:9000/test/input/bbb.txt";

    /**
     * Reads the sample input file and then writes the sample output file.
     *
     * @param args ignored
     */
    public static void main(String[] args) {
        HdfsUtils hdfsUtils = new HdfsUtils();
        hdfsUtils.read();
        hdfsUtils.write();
    }

    /**
     * Prints every line of {@link #INPUT_FILE} to standard output, decoding it as UTF-8.
     * Any {@link IOException} is reported on standard error and not rethrown.
     */
    private void read() {
        Path path = new Path(INPUT_FILE);
        Configuration conf = new Configuration();

        // Resolve the filesystem from the path itself (not FileSystem.get(conf)) so the
        // hdfs:// URI works even when the configured default filesystem is different.
        // The FileSystem instance is intentionally not closed here; only the streams are.
        try (FSDataInputStream in = path.getFileSystem(conf).open(path);
                BufferedReader reader =
                        new BufferedReader(new InputStreamReader(in, StandardCharsets.UTF_8))) {
            String line;
            while ((line = reader.readLine()) != null) {
                System.out.println(line);
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /**
     * Creates {@link #OUTPUT_FILE} and writes a few sample lines to it as UTF-8.
     * Any {@link IOException} is reported on standard error and not rethrown.
     */
    private void write() {
        Path path = new Path(OUTPUT_FILE);
        Configuration conf = new Configuration();

        String[] contents = new String[] {
            "aaaaaaaaaaa\n",
            "bbbbbbbbbbb\n",
            "ccccccccccc\n",
            "\n"
        };

        // try-with-resources closes the stream only if it was actually opened, so a
        // failure in getFileSystem/create no longer ends in a NullPointerException.
        try (FSDataOutputStream output = path.getFileSystem(conf).create(path)) {
            for (String line : contents) {
                output.write(line.getBytes(StandardCharsets.UTF_8));
            }
            // No per-line flush: this relies on close() flushing the stream.
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

}