1. 程式人生 > >HBase與MapReduce整合2-Hdfs2HBase

HBase與MapReduce整合2-Hdfs2HBase

 2)File中解析資料到HBase表中(import)

  Hdfs2HBase
檔案格式的資料->HBase表中
Mapreduce
* input: hdfs files  
  Mapper:OutputKey/OutputValue

* output: hbase table 

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

/*
 需求:
    File中解析資料到HBase表中(import)
			Mapreduce
		 * input: hdfs files  
		   Mapper:OutputKey/OutputValue
		 * output: hbase table 
 */
public class HDFS2UserMapReduce extends Configured implements Tool {

	// Mapper Class
	// Mapper<ImmutableBytesWritable, Result, KEYOUT, VALUEOUT>
	public static class ReadHDFSMapper extends Mapper<LongWritable, Text,Text,Put> {
		private Text mapOutputKey = new Text();
		@Override
		public void map(LongWritable key, Text value, Context context)
				throws IOException, InterruptedException {
			String lineValue = value.toString();
			String values[] =lineValue.split("\t");
			Put put = new Put(Bytes.toBytes(values[0]));
			for(int i=0;i<values.length;i++)
			{
				if(i==0)
				{
					mapOutputKey.set(values[0]);
					continue;
				} 
			  	String column =values[i];
			  	String cf =column.substring(0, column.indexOf(':'));
			  	String cq =column.substring(column.indexOf(':')+1, column.indexOf('='));
			  	String val =column.substring(column.indexOf('=')+1, column.length());
			    put.add(Bytes.toBytes(cf),
			    		Bytes.toBytes(cq),
			      		Bytes.toBytes(val));
			}
			context.write(mapOutputKey, put);
		}

	}
	// Reducer Class
	public static class WriteBasicReducer extends
			TableReducer<Text, Put, ImmutableBytesWritable> {

		@Override
		protected void reduce(Text key, Iterable<Put> values, Context context)
				throws IOException, InterruptedException {
			for(Put put : values){
				context.write(null, put);
			}
		}

	}

	// Driver
	public int run(String[] args) throws Exception {
	     if (args.length < 2) {
		      usage("Wrong number of arguments: " + args.length);
		      System.exit(-1);
		 }
		
		// create job
		Job job = Job.getInstance(this.getConf(),//
				this.getClass().getSimpleName());
		
		// set run job class
		job.setJarByClass(this.getClass());
		
		job.setMapperClass(ReadHDFSMapper.class);
	    
		//input format
		job.setInputFormatClass(TextInputFormat.class);
		Path inputPath = new Path(args[1]);
		FileInputFormat.addInputPath(job, inputPath);
		
		TableMapReduceUtil.initTableReducerJob(
		  args[0],        // output table
		  WriteBasicReducer.class,    // reducer class
		  job//
		);
		job.setNumReduceTasks(0);   // at least one, adjust as required
		boolean b = job.waitForCompletion(true);
		if (!b) {
		  throw new IOException("error with job!");
		}
		return 0;
	}

	  /*
	   * @param errorMsg Error message.  Can be null.
	   */
	  private static void usage(final String errorMsg) {
	    if (errorMsg != null && errorMsg.length() > 0) {
	      System.err.println("ERROR: " + errorMsg);
	    }
	    System.err.println("Usage: User2HDFSMapReduce <tablename> <inputputdir> ");
	    System.err.println("Examples:  user hdfs://172.27.35.8:8020/user/hadoop/export/user ");
	
	  }

	public static void main(String[] args) throws Exception {
		// get configuration
		Configuration conf = HBaseConfiguration.create();
		
		// submit job
		int status = ToolRunner.run(//
			conf, //
			new HDFS2UserMapReduce(), //
			args//
		);
				
		// exit program
		System.exit(status);
	}
}


相關推薦

HBaseMapReduce整合2-Hdfs2HBase

 2)File中解析資料到HBase表中(import)  Hdfs2HBase 檔案格式的資料->HBase表中Mapreduce* input: hdfs files    Mapper:OutputKey/OutputValue* output: hbase t

HBaseMapReduce整合操作

1、目的:將HBase中stu_info表中的name放到表user_info中 2、TestHbaseMapper: package com.zzw.hbase.mapreduce; import java.io.IOException; import org.apache.had

HBase權威指南學習記錄(五、hbaseMapReduce整合

新增依賴: <dependency> <groupId>org.apache.hbase</groupId> <artifactId>hbase-client</artifact

HbaseMapreduce整合的案例

【需求】將info列簇中的name這一列匯入到另外一張表中去 建表: create 'test:stu_info','info','degree','work' 插入資料:6個rowkey 3個列簇 put 'test:stu_info','20170222_10001',

HBase新版本MapReduce整合

1.MapReduce從hbase讀取資料 //讀取hbase表資料 public class HbaseAndMapReduce { public static void main(String[] args) throws Exception

hbaseflume整合程式設計

1、官網下載src包,解壓,需要匯入的——》flume-ng-sinks——》flume-ng-hbase-sink 2、編輯SimpleAsyncHbaseEventSerializer:複製一份重新命名為MySimpleAsyncHbaseEventSerializer

hbasemapreduce同時執行的問題

      在hbase資料寫入和mapreduce同時執行時出現hbase regionserver掛掉的問題,同時hdfs上的檔案塊出現miss。      目前看來mapreduce和hbase同時執行時出現的一個問題就是記憶體競爭,hbase的regionserver

SSM整合(2): spring mybatis 整合

分享 eth point names space json cal 返回 autowired 在進行完spring與springmvc整合之後, 繼續 spring與mybatis的整合. 既然是操作數據庫, 那必然不能缺少了連接屬性 一. db.properties j

HBaseSqoop的整合

之前學習Sqoop的時候都是Hadoop,Hive和RDBMS之間進行資料的匯入與匯出,並沒有與HBase整合,下面就來講解HBase與Sqoop的整合 需求: 利用 Sqoop 在 HBase 和 RDBMS 中進行資料的轉儲,將 RDBMS(Mysql) 中的資料抽取到 HBase 中

HBaseHive的整合案例二

1.案例需求 在 HBase 中已經儲存了某一張表hbase_hive,然後在Hive中建立一個外部表來關聯HBase中的hbase_hive這張表,使之可以藉助 Hive來分析 HBase 這張表中的資料,案例二是緊接著案例一進行的,所以在做案例二之前應該先進行案例一 2.在Hive中建立

HBaseHive的整合案例一

1.Hive與HBase的對比    Hive     1)資料倉庫 Hive 的本質其實就相當於將 HDFS 中已經儲存的檔案在 Mysql 中做了一個雙射關係,以方便使用 HQL 去管理查詢     2)用於資料分析、清洗

SpringMVC框架(1)之(1.2 入門程式—SpringMVCMybatis整合

一、整合思路: 1. jar包: mybatis包、spring包、mybatis和spring整合包、資料庫驅動包、日誌包; 2. Spring管理: SpringMVC中編寫的 Handler(即Controller)、Mybatis的 SqlSessionFactory

hue(2):HueHadoop整合

一、實現功能 hue介面可以瀏覽、更改、新增hdfs上的檔案,從而,便於更多人員操作大資料平臺。 二、實現步驟 1.hadoop配置 (1)配置etc/Hadoop/hdfs-site.xml <property> <name>dfs.webhdfs.

整理不易,且整且珍惜 2.開發環境的搭建 3.Eclipse的相關配置 4.使用maven建立web專案 5.Spring+Mybatis+SpringMVC整合 6.mybatis自動生成程式碼 7.springmybatis整合中Junit的測試 8.maven專案的啟動 9.Restful

整理不易,且整且珍惜 2.開發環境的搭建 3.Eclipse的相關配置 4.使用maven建立web專案 5.Spring+Mybatis+SpringMVC整合 6.mybatis自動生成程式碼

安裝Kibana6.3.2Elasticsearch6.3.2整合

1、下載官網kibana-6.3.2-linux-x86_64.tar.gz檔案 #下載 wget https://artifacts.elastic.co/downloads/kibana/kibana-6.3.2-linux-x86_64.tar.gz #解壓 tar

HBase-Hive的區別、Sqoop的整合

1、HBase 與 Hive 的對比 Hive: 1)、資料倉庫 Hive 的本質其實就相當於將 HDFS 中已經儲存的檔案在 Mysql 中做了一個雙射關係,以方 便使用 HQL 去管理查詢。 2)、用於資料分析、清洗 Hive 適用於離線的資料分析和清洗,延遲較高。 3)

Apache2.2以上版本Tomcat整合的好方法

今天技術總監叫實現Apache和Tomacat整合,在網上找了很多資料,結果發現還是這個最方便! 下面是實現2個tomcat實現負載均衡,如果只有一個則可刪除一個 apache2.2以上版本,無需使用jk_mod來整合tomcat,直接使用ajp,很方便。 修改ap

ExtJS3.0KindEditor4.1.2整合

ExtJS與KindEditor整合的方式。 /** * 將KindEditor4.1.2 功能封裝到名稱空間“KE“。 * @author shuyuan */ Ext.namespace("KE"); KE.app = (function() { return

QuartzSpring整合2種方式

Quartz與Spring整合有2種方式: 1.MethodInvokeJobDetailFactoryBean 2.JobDetailBean 下面分別介紹這兩種方式。 1.MethodInvokeJobDetailFactoryBean 1)建立一

Spring Cloud Sleuth (2)-ELK整合

通過我們已經引入了Sleuth的基礎模組完成一次任務鏈的跟蹤,但是由於日誌檔案都離散的儲存在各個微服務結點上,日常運維時僅僅通過檢視日誌檔案來分析定位問題還是一件繁瑣的問題。所以我們需要一些工具來收集、儲存、分析和展示日誌資訊,例如ELK(ElasticSearch、Log