Hadoop:本地檔案(window系統)定時獲取檔案並上傳至HDFS檔案(兩個虛擬機器)系統 Java 實現
實現功能:定時日誌採集並上傳至HDFS檔案系統的Java API實現
環境+工具:windows + 虛擬機器Centos * 2 + eclipse + windows下編譯的Hadoop jar包 + Hadoop叢集
一、流程
1)啟動一個定時任務,規劃各種路徑
——定時探測日誌源目錄 (本地目錄) F:/logs/get_log/ ;
——獲取需要採集的檔案 (檔案過濾器,比如以get_log開頭);
——移動這些檔案到一個待上傳臨時目錄 F:/logs/uptoload/ ;
——遍歷臨時檔案目錄,逐一傳輸至HDFS的目標路徑(若不存在則建立) /access_log/logs_日期/
上傳以後HDFS檔案系統中重新命名的字首:get_log_ ; 檔案的字尾:.log
——同時將傳輸完成的檔案移動到備份目錄(若不存在則建立) F:/logs/backup_日期/
啟動第二個定時任務
2)採用log4j輸出檔案採集日誌
實現程式碼:
1.建立LogAccess類
package hdfs_log; import java.util.Timer; import java.util.TimerTask; public class LogAccess { public static void main(String[] args) { //定義一個定時器物件,每隔1小時執行一次日誌獲取任務 Timer tm = new Timer(); //繼承TimerTask類,重新run方法 tm.schedule(new AccessTask(), 0, 60*60*1000L); } }
2.Timer schedule方法中的第一個引數為 TimerTask的子類,並重寫run()方法
package hdfs_log; import java.io.File; import java.io.FilenameFilter; import java.net.URI; import java.text.SimpleDateFormat; import java.util.Arrays; import java.util.Date; import java.util.TimerTask; import java.util.UUID; import org.apache.commons.io.FileUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.log4j.Logger; public class AccessTask extends TimerTask { @Override public void run() { // TODO Auto-generated method stub //構造一個log4j日誌物件 Logger logger = Logger.getLogger("logRollingFile"); //獲取日誌採集日期,用於命名 SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd-HH"); String date = sdf.format(new Date()); //建立本地日誌源目錄,通過檔案過濾實現特定檔案提取,儲存在檔案列表中 File srcDir = new File("f:/logs/get_log/"); File[] srcList = srcDir.listFiles(new FilenameFilter() { @Override public boolean accept(File dir, String name) { // 提取以get_log為字首的日誌檔案 if(name.startsWith("get_log.")) { return true; }else { return false; } } }); //輸出日誌資訊 logger.info("探測到如下檔案需要採集" + Arrays.toString(srcList)); //移動檔案至待上傳臨時目錄 try { File toupload = new File("f:/logs/toupload/"); for(File srcfile: srcList) { FileUtils.moveFileToDirectory(srcfile, toupload, true); } //輸出日誌資訊 logger.info("檔案上傳至臨時目錄:" + toupload.getAbsolutePath()); //建立hdfs訪問,將臨時目錄內檔案上傳至hdfs檔案系統 FileSystem fs = FileSystem.get(new URI("hdfs://192.168.235.143:9000/"), new Configuration(), "hadoop000"); File[] toupload_List = toupload.listFiles(); //判斷hdfs和備份目錄是否存在,若不存在則建立 Path hdfsDir = new Path("/access_log/logs4" + date); if(!fs.exists(hdfsDir)){ fs.mkdirs(hdfsDir); } File backupDir = new File("f:/logs/backup" + date); if(!backupDir.exists()) { backupDir.mkdirs(); } //上傳檔案至hdfs檔案 for(File load_file:toupload_List) { Path hdfs_dst = new Path("/access_log/logs4"+date+"/get_log_"+UUID.randomUUID()+".log"); fs.copyFromLocalFile(new Path(load_file.getAbsolutePath()), hdfs_dst); //輸出日誌資訊 logger.info("日誌上傳至HDFS檔案目錄:" + hdfs_dst); //將檔案移動至備份檔案目錄,輸出日誌資訊 FileUtils.moveFileToDirectory(load_file, backupDir, true); logger.info("日誌備份至如下檔案目錄:" + backupDir); } fs.close(); }catch(Exception e){ e.printStackTrace(); } } }
3.執行結果
輸出日誌資訊:
2018-八月-24 11:23:16-[TS] INFO Timer-0 logRollingFile - 探測到如下檔案需要採集[f:\logs\get_log\get_log.1, f:\logs\get_log\get_log.3]
2018-八月-24 11:23:16-[TS] INFO Timer-0 logRollingFile - 檔案上傳至臨時目錄:f:\logs\toupload
2018-八月-24 11:23:22-[TS] INFO Timer-0 logRollingFile - 日誌上傳至HDFS檔案目錄:/access_log/logs42018-08-24-11/get_log_787bb116-6d14-4cd3-bddf-a33e28740aec.log
2018-八月-24 11:23:22-[TS] INFO Timer-0 logRollingFile - 日誌備份至如下檔案目錄:f:\logs\backup2018-08-24-11
2018-八月-24 11:23:22-[TS] INFO Timer-0 logRollingFile - 日誌上傳至HDFS檔案目錄:/access_log/logs42018-08-24-11/get_log_f75f220a-3202-4b2f-be1f-f5c4c3a06095.log
2018-八月-24 11:23:22-[TS] INFO Timer-0 logRollingFile - 日誌備份至如下檔案目錄:f:\logs\backup2018-08-24-11
相關參考:小牛學堂-大資料基礎教程