HDFS操作及小文件合並
阿新 • • 發佈:2017-09-25
讀取輸入 文件路徑 cal final .config block 輸出流 上傳 txt文件
小文件合並是針對文件上傳到HDFS之前
這些文件夾裏面都是小文件
參考代碼
package com.gong.hadoop2;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.io.IOUtils;
/**
* function 合並小文件至 HDFS
* @author 小講
*
*/
public class MergeSmallFilesToHDFS {
private static FileSystem fs = null;
private static FileSystem local = null;
/**
* @function main
* @param args
* @throws IOException
* @throws URISyntaxException
*/
public static void main(String[] args) throws IOException,
URISyntaxException {
list();
}
/**
*
* @throws IOException
* @throws URISyntaxException
*/
public static void list() throws IOException, URISyntaxException {
// 讀取hadoop文件系統的配置
Configuration conf = new Configuration();
//文件系統訪問接口
URI uri = new URI("hdfs://dajiangtai:9000");
//創建FileSystem對象
fs = FileSystem.get(uri, conf);
// 獲得本地文件系統
local = FileSystem.getLocal(conf);
//過濾目錄下的 svn 文件,globStatus從第一個參數通配符合到文件,剔除滿足第二個參數到結果,因為PathFilter中accept是return!
FileStatus[] dirstatus = local.globStatus(new Path("D://data/73/*"),new RegexExcludePathFilter("^.*svn$"));
//獲取73目錄下的所有文件路徑,註意FIleUtil中stat2Paths()的使用,它將一個FileStatus對象數組轉換為Path對象數組。
Path[] dirs = FileUtil.stat2Paths(dirstatus);
FSDataOutputStream out = null;
FSDataInputStream in = null;
for (Path dir : dirs) {
String fileName = dir.getName().replace("-", "");//文件名稱
//只接受日期目錄下的.txt文件,^匹配輸入字符串的開始位置,$匹配輸入字符串的結束位置,*匹配0個或多個字符。
FileStatus[] localStatus = local.globStatus(new Path(dir+"/*"),new RegexAcceptPathFilter("^.*txt$"));
// 獲得日期目錄下的所有文件
Path[] listedPaths = FileUtil.stat2Paths(localStatus);
//輸出路徑
Path block = new Path("hdfs://dajiangtai:9000/middle/tv/"+ fileName + ".txt");
// 打開輸出流
out = fs.create(block);
for (Path p : listedPaths) {
in = local.open(p);// 打開輸入流
IOUtils.copyBytes(in, out, 4096, false); // 復制數據,IOUtils.copyBytes可以方便地將數據寫入到文件,不需要自己去控制緩沖區,也不用自己去循環讀取輸入源。false表示不自動關閉數據流,那麽就手動關閉。
// 關閉輸入流
in.close();
}
if (out != null) {
// 關閉輸出流
out.close();
}
}
}
/**
*
* @function 過濾 regex 格式的文件
*
*/
public static class RegexExcludePathFilter implements PathFilter {
private final String regex;
public RegexExcludePathFilter(String regex) {
this.regex = regex;
}
@Override
public boolean accept(Path path) {
// TODO Auto-generated method stub
boolean flag = path.toString().matches(regex);
return !flag;
}
}
/**
*
* @function 接受 regex 格式的文件
*
*/
public static class RegexAcceptPathFilter implements PathFilter {
private final String regex;
public RegexAcceptPathFilter(String regex) {
this.regex = regex;
}
@Override
public boolean accept(Path path) {
// TODO Auto-generated method stub
boolean flag = path.toString().matches(regex);
return flag;
}
}
}
最後一點,分清楚hadoop fs 和dfs的區別
hadoop fs <args>
FS涉及可以指向任何文件系統(如本地,HDFS等)的通用文件系統。因此,當您處理不同的文件系統(如本地FS,HFTP FS,S3 FS等)時,可以使用它
hadoop dfs <args>
dfs非常具體到HDFS。 將工作與HDFS有關。 這已被棄用,我們應該使用hdfs dfs。
hdfs dfs <args>
與第二個相同,即適用於與HDFS相關的所有操作,並且是推薦的命令,而不是hadoop dfs
HDFS操作及小文件合並