1. 程式人生 > >大資料(hadoop-小檔案合併、Mapreduce原理)

大資料(hadoop-小檔案合併、Mapreduce原理)

hadoop-小檔案合併

package com.andy.merge;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;

public class RegexAcceptFilter implements PathFilter{
	private final String regex ;
	
	public RegexAcceptFilter(String regex){
		this.regex = regex ;
	}

	//只接受符合regex的檔案
	@Override
	public boolean accept(Path path) {
		boolean flag = path.toString().matches(regex) ;
		return flag;
	}
	
}
package com.andy.merge;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;

//PathFilter是一個介面,裡面只有一個方法accept(Path path)
public class RegexUncludeFilter implements PathFilter{
	private final String regex ;

	public RegexUncludeFilter(String regex){
		this.regex = regex ;
	}
	
	//過濾 regex 格式的檔案
	@Override
	public boolean accept(Path path) {
		boolean flag = path.toString().matches(regex);
		//符合得我就接受,不符合的就過濾,所以是非flag
		return !flag;
	}
	
	
}
package com.andy.merge;

import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;

/**
 * 小檔案合併
 * @author huang
 *
 */
public class MegerSmallFiles {
	//寫入到HDFS的FileSystem物件
	private static FileSystem fs = null ;
	
	//本地檔案系統的FileSystem
	private static FileSystem local = null ;
	
	//HDFS服務路徑
	private static final String HDFS_SERVER = "hdfs://192.168.153.111:9000" ;
	
	//合併小檔案的主要方法
	public static void megerFiles() throws Exception {
		//設定系統使用者為hadoop
		System.setProperty("HADOOP_USER_NAME", "hadoop") ;
		
		//讀取hadoop檔案的配置資訊
		Configuration conf = new Configuration() ;
		
		//建立URI
		URI uri = new URI(HDFS_SERVER) ;
		
		//建立兩個檔案系統的fs
		fs = FileSystem.get(uri, conf) ;	//針對HDFS
		local = FileSystem.get(conf) ;		//針對本地檔案系統
		
		/* 獲取指定路徑下的所有檔案
		 * 過濾該路徑下的所有svn檔案
		 *  ^匹配一行的開頭 ;.表示匹配任意一個字元
		 *  *表示匹配0個或多個前面這個字元 ;$匹配一行的結束
		 * */
		FileStatus[] globStatus = local.globStatus(new Path("D:/pdata/*"), 
				new RegexUncludeFilter("^.*svn$"));
		//除錯輸出
		for (FileStatus fileStatus : globStatus) {
			System.out.println(fileStatus.getPath().toString());
		}
		
		//將一組FileStatus物件轉換成Path物件
		Path[] dirs = FileUtil.stat2Paths(globStatus);
		
		//獲取輸入輸出流
		FSDataOutputStream out = null ;
		FSDataInputStream in = null ;
		
		for (Path dir : dirs) {	//具體的每個目錄下面的所有檔案
			//檔名稱
			String fileName = dir.getName().replaceAll("-", "") ;
			//只接受該目錄下的txt檔案
			FileStatus[] txtPaths = local.globStatus(new Path(dir + "/*") ,
					new RegexAcceptFilter("^.*txt$"));
			Path[] txtFiles = FileUtil.stat2Paths(txtPaths);
			
			//設定輸出路徑
			Path hdfsFile = new Path(HDFS_SERVER + "/vip/" + fileName + ".txt") ;
			
			//開啟輸入輸出流,進行讀寫
			out = fs.create(hdfsFile) ;	//輸出流
			for (Path p : txtFiles) {
				in = local.open(p) ;
				IOUtils.copyBytes(in, out, 4096, false);
				//關閉輸入流
				in.close();
			}
			if(null != out){
				out.close();
			}
		}
	}
	
	//程式入口
	public static void main(String[] args) throws Exception {
		megerFiles() ;
		System.out.println("=====小檔案合併成功=====");
	}

}