1. 程式人生 > >MapReduce讀取txt檔案儲存至HBase,以檔名作Key,整個檔案內容作Value

MapReduce讀取txt檔案儲存至HBase,以檔名作Key,整個檔案內容作Value

把已抓取好的網路輿情資訊(以txt形式存放),儲存到HBase中,再進行資訊分析。

要求:

以檔名作Key,整個檔案內容作Value

思路:

txt檔案先上傳到HDFS中,再使用HBase MapReduce將檔案寫入HBase中。(很簡單的思路)

問題分析:

首先必須分析到的問題是,如何讀取解析txt檔案,TextInputFormat是預設的檔案解析類,但此處顯然無法滿足要求。因此,必須得自定義檔案解析類(當然繼承自FileInputFormat)。

本文給出 讀取解析整個檔案的自定義FileInputFormat——WholeFileInputFormat,最終可實現以檔名作Key,整個檔案內容作Value,儲存至HBase。

package file;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.mapreduce.TableOutputFormat;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.Reducer.Context;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

public class FileInputFormatMake {
	
	
	public static void main(String arge[]) throws IOException, InterruptedException, ClassNotFoundException{
		Configuration conf =new Configuration();
		//設定zookeeper
		conf.set("hbase.zookeeper.quorum", "hadoop00");
		//設定hbase表名稱
		conf.set(TableOutputFormat.OUTPUT_TABLE, "tool");
		//將該值改大,防止hbase超時退出
		conf.set("dfs.socket.timeout", "180000");
		Job job = new Job(conf,"HBaseBatchImport");
		job.setMapperClass(BatchImportMapper.class);
		job.setReducerClass(BatchImportReducer.class);
		job.setNumReduceTasks(1);
		//設定map的輸出,不設定reduce的輸出型別
		job.setMapOutputKeyClass(Text.class);
		job.setOutputValueClass(Text.class);
		job.setInputFormatClass(WholeFileInputFormat.class);
		//不設定輸出路徑,設定輸出格式型別
		job.setOutputFormatClass(TableOutputFormat.class);
		FileInputFormat.setInputPaths(job, "hdfs://hadoop00:9000/tool");//tool資料夾下有多個檔案
		job.waitForCompletion(true);
		
		
	}
	
	
	static class BatchImportMapper extends Mapper<Text,Text,Text,Text>{
		@Override
		protected void map(Text key, Text value, Context context) {
					try {
						context.write(key, value);
					} catch (IOException e) {
						e.printStackTrace();
					} catch (InterruptedException e) {
						e.printStackTrace();
					}
		}
		
	}
	
	
	static class BatchImportReducer extends TableReducer<Text,Text,NullWritable>{
		@Override
		protected void reduce(Text k2, Iterable<Text> v2,Context context)
				throws IOException, InterruptedException {
					for(Text text:v2){
						Put put = new Put(Bytes.toBytes(k2.toString()));
						put.add(Bytes.toBytes("context"), Bytes.toBytes("value"), Bytes.toBytes(text.toString()));
						context.write(NullWritable.get(), put);
					}
		
		}
		
	}
	
	
	
	static class WholeFileInputFormat extends FileInputFormat{
		@Override
		protected boolean isSplitable(JobContext context, Path filename) {
			return false;
		}
		@Override
		public RecordReader createRecordReader(InputSplit split,
				TaskAttemptContext context) throws IOException,
				InterruptedException {
			
			return new WholeFileRecordReader();
		}
	}
	static  class WholeFileRecordReader extends RecordReader{
		private FileSplit fileSplit;
		private FSDataInputStream fis;
		private Text key = null;
		private Text value = null;
		
		private boolean processed = false;
		@Override
		public void initialize(InputSplit inputSplit, TaskAttemptContext context)
				throws IOException, InterruptedException {
			fileSplit = (FileSplit)inputSplit;
			Configuration job = context.getConfiguration();
		    Path file = fileSplit.getPath();
		    FileSystem fs = file.getFileSystem(job);
		    fis = fs.open(file);
			
		}

		@Override
		public boolean nextKeyValue() throws IOException, InterruptedException {
			
			if(key == null){
				key = new Text();
			}
			if(value == null){
				value = new Text();
			}
			if(!processed){
				byte[] content = new byte[(int)fileSplit.getLength()];
				Path file = fileSplit.getPath();
				System.out.println(file.getName());
				key.set(file.getName());
				
				org.apache.hadoop.io.IOUtils.readFully(fis, content, 0, content.length);
				String sendString=new String(  content , "ISO-8859-1" );
				System.out.println(sendString);
				value.set(new Text(sendString));
			processed = true;
			return true;
			}
			return false;
		}

		@Override
		public Text getCurrentKey() throws IOException, InterruptedException {
			// TODO Auto-generated method stub
			return this.key;
		}

		@Override
		public Text getCurrentValue() throws IOException,
				InterruptedException {
			
			return this.value;
		}

		@Override
		public float getProgress() throws IOException, InterruptedException {
			// TODO Auto-generated method stub
			return processed ? fileSplit.getLength() : 0;
		}

		@Override
		public void close() throws IOException {
			
			
		}
		
	}
	
	

}


相關推薦

MapReduce讀取txt檔案儲存HBase名作Key整個檔案內容Value

把已抓取好的網路輿情資訊(以txt形式存放),儲存到HBase中,再進行資訊分析。 要求: 以檔名作Key,整個檔案內容作Value 思路: txt檔案先上傳到HDFS中,再使用HBase MapReduce將檔案寫入HBase中。(很簡單的思路) 問題分析:

C++讀取txt儲存txt

  哇,今天又重新用C++來寫了一些程式碼發現自己竟然在類的使用和檔案讀取和儲存上面特別頭疼,於是,各種問度娘+各種翻看之前的程式碼。不禁感嘆,自己的程式碼還是寫的太少了,對這些一點都不熟悉。於是,今晚!一定!要!好好!總結!提升!   首先,類的使用方式: 2 Walking *a =

C++讀寫檔案儲存容器list中

C++讀寫檔案及容器list基本操作       大家在開始入門C/C++時,都要練習個學生管理系統啥的,主要都為了進一步掌握所學知識,並能使用這些知識。其中這個小專案的重難點就在資料的操作了,其中如

android 將res/raw下的檔案儲存SD卡

public void saveToSDCard(String name) throws Throwable { InputStream inStream = context.getResources().openRawResource(R.raw.beep

“該檔案包含不能在當前內碼表(936)中表示的字元請將該檔案儲存為 Unicode 格式防止資料丟失”

這個警告怎麼破?其實很簡單: 以VS2012為例,去除方法見下: ------------------------------------------- 影象處理開發資料、影象處理開發需求、

Java從網路讀取圖片並儲存本地

一、js程式碼: 程式碼如下: /**  * 點選下載當前圖片  *   */ &

opencv3.3 該檔案包含不能在當前內碼表(936)中表示的字元。請將該檔案儲存為 Unicode 格式防止資料丟失

VS2015 + opencv3.3 執行報錯: warning C4819: 該檔案包含不能在當前內碼表(936)中表示的字元。請將該檔案儲存為 Unicode 格式以防止資料丟失  error C2065: “ptr”: 未宣告的識別符號 error C2065: “ptr”:

VS2017 報錯該檔案包含不能在當前內碼表(936)中表示的字元。請將該檔案儲存為 Unicode 格式防止資料丟失

尤其程式碼是從linux平臺複製過來: 報錯如圖: 更有甚者基本函式都報錯: 當下檢查發現if else break case等基本函式並無問題時,報錯行數明顯不一致等一定要注意文件編碼格式, 最簡單的辦法是用notepad++,逐個將.

將需要書寫的內容追加的方式寫到檔案

public void writeScriptLog(String writeInfo,Boolean isError) { File folder =new File("目錄"); try { if (!folder.exists()) { folder.mkdirs(

遍歷資料夾目錄一定條件刪除指定型別檔案

背景: 刪除 bmp格式檔案 (建立日期超過當天的時間) 示例程式碼如下: #include<iostream> #include<io.h> #include<time.h> #include<string> #inclu

《 warning C4819: 該檔案包含不能在當前內碼表(936)中表示的字元。請將該檔案儲存為 Unicode 格式防止資料丟失》

問題描述 最近專案中添加了很多外部的.h和.cpp檔案,有可能是編碼格式不一樣,在生成解決方案時,輸出窗口出現了好多的warning C4819警告資訊,具體情況如下所示: warning C4819: 該檔案包含不能在當前內碼表(936)中表示的字元。 請將該檔案儲存為

檔案儲存到資料庫(二進位制流的形式)

Hibernate方法 HibernateUtils.java package yang.fang.hibernate; import org.hibernate.Session; import org.hibernate.SessionFactory; import

VS2017 warning C4819: 該檔案包含不能在當前內碼表(936)中表示的字元。請將該檔案儲存為 Unicode 格式防止資料丟失

Visual Studio 2017出現warning C4819: 該檔案包含不能在當前內碼表(936)中表示的字元。請將該檔案儲存為 Unicode 格式以防止資料丟失 解決方案: 1.修改字元編碼格式 Visual Studio提供高階儲存選項功能,

家為家鄉為鄉國為國天下為天下

內地的網友都知道像Fackbook、Twitter、Youtube這類網站,我們是無法訪問的。要想瀏覽這類網站需要使用國外代理IP或者VPN。今天給大家介紹一款免費且不限制流量的VPN工具——VPN Gate VPN Gate 是日本國立筑波大學的一項學術研究、實驗計劃,主要目的為「推廣全球分散式公共

ASP.NET Core應用針對靜態檔案請求的處理[1]: Web的形式釋出靜態檔案

雖然ASP.NET Core是一款“動態”的Web服務端框架,但是在很多情況下都需要處理針對靜態檔案的請求,最為常見的就是這對JavaScript指令碼檔案、CSS樣式檔案和圖片檔案的請求。針對不同格式的靜態檔案請求的處理,ASP.NET Core為我們提供了三個中介軟體,它們將是本系列文章論述的重點。不過在

C語言空格為分割回車為結束標記輸入一串數字到一個int陣列中

分析:例如 int a[100]; 輸入:1,2,3,4,5,6(回車換行) 結果:陣列的0-5號元素分別是上述值,打印出來,之後程式結束。 難點:獲取int型的數字輸入不難,scanf("%d",&n);這個只能得到int值,會自動濾出空格和回車換行,如何判斷輸入

澳斯康生物完成4500萬美元的A輪融資擴大一體化CMC並在中國建設世界上最大的細胞培養生產廠

加州THOUSAND OAKS和上海 -- (美國商業資訊) -- 澳斯康生物製藥(海門)有限公司(Thousand Oaks Biopharmaceuticals, Inc.,“TOBIO”)是一家領先的一體化全球性CMC組織,擁有GMP細胞培養基和生物製造能力,該公司今天宣佈已完成4500萬

docker+lnmp 報錯小結laradock為例其它同理

用vagrant + centos7 + lnmp開發都快兩年,這是一個很好用的本地開發環境。對於我來說,它更像是一臺真正的linux電腦,能真正直接把握它的每一處地方。而且現在記憶體都普遍8G以上的本子,加上SSD真的是毫無壓力。 但時間一長,暴露出很

C++ 輸入一行數字或字串(未知個數)空格間格換行後結束輸入並輸出

說明:通過getchar()來判斷是否是\n來終止輸入 方法一: 一行中輸入多個數字並以空格間隔,通過int陣列儲存 #include<iostream> #include<string.h> #include<stdio.h> u