1. 程式人生 > >Hadoop:本地檔案(window系統)定時獲取檔案並上傳至HDFS檔案(兩個虛擬機器)系統 Java 實現

Hadoop:本地檔案(window系統)定時獲取檔案並上傳至HDFS檔案(兩個虛擬機器)系統 Java 實現

實現功能:定時日誌採集並上傳至HDFS檔案系統的Java API實現

環境+工具:windows  +  虛擬機器Centos * 2  +  eclipse  +  windows下編譯的Hadoop jar包  +  Hadoop叢集

一、流程

       1)啟動一個定時任務,規劃各種路徑

——定時探測日誌源目錄 (本地目錄) F:/logs/get_log/ ;

——獲取需要採集的檔案 (檔案過濾器,比如以get_log開頭);

——移動這些檔案到一個待上傳臨時目錄  F:/logs/uptoload/ ;

——遍歷臨時檔案目錄,逐一傳輸至HDFS的目標路徑(若不存在則建立) /access_log/logs_日期/

       上傳以後HDFS檔案系統中重新命名的字首:get_log_     ;   檔案的字尾:.log

——同時將傳輸完成的檔案移動到備份目錄(若不存在則建立) F:/logs/backup_日期/

啟動第二個定時任務

        2)採用log4j輸出檔案採集日誌

實現程式碼:

1.建立LogAccess類

package hdfs_log;

import java.util.Timer;
import java.util.TimerTask;

public class LogAccess {
	public static void main(String[] args) {
		//定義一個定時器物件,每隔1小時執行一次日誌獲取任務
		Timer tm = new Timer();
		//繼承TimerTask類,重新run方法
		tm.schedule(new AccessTask(), 0, 60*60*1000L);
		
	}

}

2.Timer schedule方法中的第一個引數為 TimerTask的子類,並重寫run()方法

package hdfs_log;

import java.io.File;
import java.io.FilenameFilter;
import java.net.URI;
import java.text.SimpleDateFormat;
import java.util.Arrays;
import java.util.Date;
import java.util.TimerTask;
import java.util.UUID;

import org.apache.commons.io.FileUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.log4j.Logger;


public class AccessTask extends TimerTask {

	@Override
	public void run() {
		// TODO Auto-generated method stub
		//構造一個log4j日誌物件
		Logger logger = Logger.getLogger("logRollingFile");
		//獲取日誌採集日期,用於命名
		SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd-HH");
		String date = sdf.format(new Date());
		
		//建立本地日誌源目錄,通過檔案過濾實現特定檔案提取,儲存在檔案列表中
		File srcDir = new File("f:/logs/get_log/");
		File[] srcList = srcDir.listFiles(new FilenameFilter() {
			
			@Override
			public boolean accept(File dir, String name) {
				// 提取以get_log為字首的日誌檔案
				if(name.startsWith("get_log.")) {
					return true;
				}else {
					return false;
				}
				
			}
		});
		//輸出日誌資訊
		logger.info("探測到如下檔案需要採集" + Arrays.toString(srcList));
		//移動檔案至待上傳臨時目錄
		try {
		File toupload = new File("f:/logs/toupload/");
		for(File srcfile: srcList) {
			FileUtils.moveFileToDirectory(srcfile, toupload, true);
			}
		//輸出日誌資訊
		logger.info("檔案上傳至臨時目錄:" + toupload.getAbsolutePath());
			
		//建立hdfs訪問,將臨時目錄內檔案上傳至hdfs檔案系統
		FileSystem fs = FileSystem.get(new URI("hdfs://192.168.235.143:9000/"), new Configuration(), "hadoop000");	
		File[] toupload_List = toupload.listFiles();
		//判斷hdfs和備份目錄是否存在,若不存在則建立
		Path hdfsDir = new Path("/access_log/logs4" + date);
		if(!fs.exists(hdfsDir)){
			fs.mkdirs(hdfsDir);
		}
		File backupDir = new File("f:/logs/backup" + date);
		if(!backupDir.exists()) {
			backupDir.mkdirs();
		}
		
		//上傳檔案至hdfs檔案
		for(File load_file:toupload_List) {
			Path hdfs_dst = new Path("/access_log/logs4"+date+"/get_log_"+UUID.randomUUID()+".log");
			fs.copyFromLocalFile(new Path(load_file.getAbsolutePath()), hdfs_dst);
			//輸出日誌資訊
			logger.info("日誌上傳至HDFS檔案目錄:" + hdfs_dst);
			//將檔案移動至備份檔案目錄,輸出日誌資訊
			FileUtils.moveFileToDirectory(load_file, backupDir, true);
			logger.info("日誌備份至如下檔案目錄:" + backupDir);
			}	
		fs.close();
				
		}catch(Exception e){
			e.printStackTrace();
		}
	}

}

3.執行結果

輸出日誌資訊: 

2018-八月-24 11:23:16-[TS] INFO Timer-0 logRollingFile - 探測到如下檔案需要採集[f:\logs\get_log\get_log.1, f:\logs\get_log\get_log.3]
2018-八月-24 11:23:16-[TS] INFO Timer-0 logRollingFile - 檔案上傳至臨時目錄:f:\logs\toupload
2018-八月-24 11:23:22-[TS] INFO Timer-0 logRollingFile - 日誌上傳至HDFS檔案目錄:/access_log/logs42018-08-24-11/get_log_787bb116-6d14-4cd3-bddf-a33e28740aec.log
2018-八月-24 11:23:22-[TS] INFO Timer-0 logRollingFile - 日誌備份至如下檔案目錄:f:\logs\backup2018-08-24-11
2018-八月-24 11:23:22-[TS] INFO Timer-0 logRollingFile - 日誌上傳至HDFS檔案目錄:/access_log/logs42018-08-24-11/get_log_f75f220a-3202-4b2f-be1f-f5c4c3a06095.log
2018-八月-24 11:23:22-[TS] INFO Timer-0 logRollingFile - 日誌備份至如下檔案目錄:f:\logs\backup2018-08-24-11

相關參考:小牛學堂-大資料基礎教程