1. 程式人生 > >(四)DFS檔案操作的原理及程式碼實現

(四)DFS檔案操作的原理及程式碼實現

1、檔案操作原理

1.1、下載過程

  • Client向namenode發起Open file 請求。目的是獲取指定檔案的輸入流
  • namenode收到請求之後,會檢查路徑的合法性,客戶端的操作許可權。如果檢測未通過,則直接報錯返回
  • Client也會向namenode發起Getblockloaction請求,獲取指定檔案的元資料資訊。如果上一步檢測通過,則namenode會將元資料資訊封裝到輸入流裡,返回給客戶端
  • 客戶端根據元資料資訊,直接去對應的datanode讀取檔案塊,然後下載到本地(建立本地的輸出流,然後做流的對接)
  • 關閉流

1.2、上傳過程

  • Client向namenode傳送Create file請求,目的是獲取HDFS檔案的輸出流
  • namenode收到請求之後,會檢查路徑的合法性,客戶端的操作許可權。如果檢測未通過,則直接報錯返回
  • 如果通過檢測,namenode會將檔案的切塊資訊(檔案被切成幾塊,儲存位置,以及副本位置),然後把這些資訊封裝到輸出流裡,返回給客戶端。
  • clinet通過輸出流傳送檔案塊。會把檔案快打散成一個個的package,每個package最大為64KB,再傳輸一個個的package。這種機制叫做資料流管道機制,目的是充分利用每臺機器的頻寬,避免網路瓶頸和高延時的連線,最小化推送所有資料的延時
  • 通過資料流管道機制,實現資料的傳送和副本的複製。datanode之間會進行package傳輸,儲存副本。每臺datanode伺服器收到資料之後,會向上遊反饋,直到最終反饋給Client,這一輪傳輸才算完成。

1.3、刪除檔案過程

  • 客戶端向namenode發現刪除檔案指令,比如:hadoop fs -rm /park01/1.txt
  • namenode收到請求之後,會檢查路徑的合法性,客戶端的操作許可權。如果檢測未通過,則直接報錯返回
  • 如果檢測通過,會將對應的檔案從元資料中刪除。(注意,此時這個檔案並沒有真正從叢集上被刪除
  • 每臺datanode會定期向namenode傳送心跳,會領取刪除的指令,找到對應的檔案塊,進行檔案塊的刪除。

2、編寫程式碼操作檔案

2.1、準備環境

  • eclipse版本: Version: Mars.2 Release (4.5.2)
  • eclipse操作hadoop外掛下載:連結:https://pan.baidu.com/s/1Hiw50aG2s-PTNEzD1LHcDw   提取碼:r3ac 
  • 注意,外掛的版本要和hadoop版本保持一致。並將外掛放在eclipse的plugins目錄下
  • 將hadoop安裝包解壓到無中文路徑的目錄
  • 啟動eclipse,Window | Preferences | Hadoop Map/Reduce,配置好hadoop的安裝目錄

  • 新建hadoop工程 File | New | Others,搜尋Map/Reduce Project。建立後可以發現hadoop依賴的jar包都自動引用過來了。現在就可以開始寫程式碼操作hdfs了。

2.2、編寫程式碼

  • 連線HDFS
  • 下載
  • 上傳
  • 刪除
  • 建立目錄
  • 重新命名
  • 查詢目錄下檔案
  • 遞迴查詢目錄下檔案
  • 獲取檔案塊資訊
package hadoop;

import java.io.ByteArrayInputStream;
import java.io.File;
import java.io.FileOutputStream;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.io.IOUtils;
import org.junit.Test;

public class DfsOperation {
	//連線HDFS
	@Test
	public void testConnectNamenode() throws Exception {
		//建立hadoop環境引數物件,通過物件的set方法設定引數
		//通過物件配置的引數優先順序>配置檔案的配置
		//物件配置的生效範圍是當前工作執行緒。配置檔案的生效範圍是全域性
		Configuration conf = new Configuration();
		//連線HDFS檔案系統
		//Hadoop中有多種檔案系統(FileSystem有很多實現類),其中最重要的事分散式檔案系統
		FileSystem fs = FileSystem.get(new URI("hdfs://192.168.80.100:9000"), conf);

	}
	
	//下載
	@Test
	public void getFile() throws Exception {
		Configuration conf = new Configuration();
		FileSystem fs = FileSystem.get(new URI("hdfs://192.168.80.100:9000"), conf);
		
		//獲取HDFS上指定檔案的輸入流
		InputStream in = fs.open(new Path("/park/test.txt"));
		OutputStream out = new FileOutputStream(new File("test.txt"));
		//通過hadoop提供的資料流工具,完成資料流的傳輸
		IOUtils.copyBytes(in, out, conf);
		
		in.close();
		out.close();
		fs.close();
	}
	
	//上傳
	@Test
	public void putFile() throws Exception {
		Configuration conf = new Configuration();
		conf.set("dfs.replication", "1"); //設定副本數量
		FileSystem fs = FileSystem.get(new URI("hdfs://192.168.80.100:9000"), conf);
		
		ByteArrayInputStream in = new ByteArrayInputStream("testPutFile!".getBytes());
		OutputStream out = fs.create(new Path("/park/putFile.txt"));
		IOUtils.copyBytes(in, out, conf);
		
		in.close();
		out.close();
		fs.close();
	}
	
	//刪除
	@Test
	public void deleteFile() throws Exception {
		Configuration conf = new Configuration();
		FileSystem fs = FileSystem.get(new URI("hdfs://192.168.80.100:9000"), conf);
		
		//false只能刪除不為空的目錄;true不為空的目錄也可以刪除
		//也可以指定檔案去刪除
		fs.delete(new Path("/park"), true);
		
		fs.close();
	}
	
	//建立目錄  
	//fs.mkdirs(new Path("/park02"));
	
	//重新命名
	//fs.rename(new Path("/park"), new Path("/park01"));
	
	//查詢目錄下檔案
	@Test
	public void searchFile() throws Exception {
		Configuration conf = new Configuration();
		FileSystem fs = FileSystem.get(new URI("hdfs://192.168.80.100:9000"), conf);
		
		FileStatus[] ls = fs.listStatus(new Path("/result"));
		for(FileStatus status : ls) {
			System.out.println(status);
		}
		
		fs.close();
	}
	
	//遞迴查詢目錄下檔案
	@Test
	public void searchFileByR() throws Exception {
		Configuration conf = new Configuration();
		FileSystem fs = FileSystem.get(new URI("hdfs://192.168.80.100:9000"), conf);
		//true表示遞迴檢視
		RemoteIterator<LocatedFileStatus> rt = fs.listFiles(new Path("/result"), true);
		
		while(rt.hasNext()) {
			System.out.println(rt.next());
		}
		
		fs.close();
	}
	
	//獲取檔案塊資訊
	@Test
	public void getFileBlocksInfo() throws Exception {
		Configuration conf = new Configuration();
		FileSystem fs = FileSystem.get(new URI("hdfs://192.168.80.100:9000"), conf);
		//0 檢視塊的起始範圍  ;Integer.MAX_VALUE 檢視塊的結束範圍
		//通過這兩個引數控制檢視的塊的範圍
		BlockLocation[] data = fs.getFileBlockLocations(new Path("/park/putFile.txt"), 0, Integer.MAX_VALUE);
		for(BlockLocation bl : data) {
			System.out.println(bl);
			//輸出結果如下: 0,12,hadoop1
			//0 表示第一塊的起始位置; 12表示塊的實際大小;hadoop01表示儲存的datanode伺服器
		}
		
		fs.close();
	}
}