1. 程式人生 > >hadoop hdfs 上傳下載檔案

hadoop hdfs 上傳下載檔案

上傳檔案:

package uploadfile;

import java.io.*;
import java.net.URI;
import java.util.Date;

import org.apache.hadoop.fs.*;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.conf.Configuration;

public class UploadFile {

	public static void main(String[] args) {
		
		try{
			
			long begin = System.currentTimeMillis();
			System.out.println("*******************開始 時間 " + new Date() + "************************");
			String localSrc = args[0];  //D://cloudra cdh4.txt
			String dst=args[1];
	
			System.out.println(localSrc);
			System.out.println(dst);
			
			InputStream in = new BufferedInputStream(new FileInputStream(localSrc));
			
//			byte[] a=new byte[1024];
//			if(in.read(a)!=-1){
//				String as = new String(a);
//				System.out.println(as);
//			}
			
			Configuration conf = new Configuration();
			FileSystem fs = FileSystem.get(URI.create(dst),conf);
	
			OutputStream out = fs.create(new Path(dst));
	
			IOUtils.copyBytes(in,out,4096,true);
			
			long end = System.currentTimeMillis();
			long excutetime = end - begin ;
			System.out.println("*******************結束 時間 " + new Date() + "************************");
			
			System.out.println();
			System.out.println("====================");
			System.out.println("消耗時間為:" + excutetime + " 毫秒");
			System.out.println("          " + excutetime/1000 + "秒");
			System.out.println("          " + excutetime/60000 + "分");
			System.out.println("====================");
			System.out.println();
			
		}catch(Exception e){
			e.printStackTrace();
		}
	}

}


下載檔案:

package uploadfile;
  
import java.io.BufferedOutputStream;
import java.io.FileOutputStream;
import java.io.OutputStream;
import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;

/**  
 * *  
 * * 
 * *Description: 檢視Hadoop檔案系統中的檔案,利用hadoop FileSystem介面中的FSDataInputStream  
 * * FSDataInputStream還具有流定位的能力,可以從檔案的任意位置開始讀取  
 * *  
 * * @author charles.wang  
 * * @created May 26, 2012 12:28:49 PM  
 * */ 
public class DownloadFile {      
	
	/**      * @param args      */     
	public static void main(String[] args) throws Exception{    
		
		//第一個引數傳遞進來的是Hadoop檔案系統中的某個檔案的URI,以hdfs://ip 的theme開頭         
		String uri = args[0];      
		
		//讀取Hadoop檔案系統的配置         
		Configuration conf = new Configuration();         
		conf.set("Hadoop.job.ugi", "hadoop-user,hadoop-user");    
		
		//FileSystem是使用者操作HDFS的核心類,它獲得URI對應的HDFS檔案系統         
		FileSystem fs = FileSystem.get(URI.create(uri),conf);         
		FSDataInputStream in = null;    
		
		try{         
			
			//實驗一:輸出全部檔案內容             
			System.out.println("實驗一:輸出全部檔案內容");     
			
			//讓FileSystem開啟一個uri對應的FSDataInputStream檔案輸入流,讀取這個檔案             
			in = fs.open( new Path(uri) ); 
			
			//用Hadoop的IOUtils工具方法來讓這個檔案的指定位元組複製到標準輸出流上             
			//IOUtils.copyBytes(in, System.out,50,false);   //輸出到控制檯上  
			OutputStream out = new BufferedOutputStream(new FileOutputStream("D:\\aassdcc.txt"));
			IOUtils.copyBytes(in, out,4096,true);
			
			System.out.println("Download success!!!");                  
			
//			//實驗二:展示FSDataInputStream檔案輸入流的流定位能力,用seek進行定位             
//			System.out.println("實驗二:展示FSDataInputStream檔案輸入流的流定位能力,用seek進行定位");   
//			
//			//假如我們要吧檔案輸出3次             
//			//第一次輸入全部內容,第二次輸入從第20個字元開始的內容,第3次輸出從第40個字元開始的內容             
//			for (int i=1;i<=3;i++){                 
//				in.seek(0+20*(i-1));                 
//				System.out.println("流定位第 "+i+" 次:" );                 
//				IOUtils.copyBytes(in, System.out,4096,false);    
//				
//			}         
		}catch(Exception e){
			e.printStackTrace();
		}finally{             
			IOUtils.closeStream(in);        
		}      
	}  
}
	

總結:

這裡採用的方法是通過 FsUrlStreamHandlerFactory 例項呼叫URL 中的setURLStreamHandlerFactory 方法。由於JAVA 虛擬機器只能用一次上述方法,因此 通常在靜態方法中呼叫上述方法。這個限制意味首如果程式的其他元件--如不受你控制的第三方元件--已經聲明瞭一個URL例項,你將無法再使用上述方法從Hadoop 中讀取資料。

我們可以呼叫Hadoop 中簡潔的IOUtils 類,並在finally子句中關閉資料流,同時也可以在輸入流和輸出流之間複製資料。copyBytes方法的最後兩個引數,第一個用於設定複製的緩衝區大小,第二個用於設定複製結束後是否關閉資料流。