1. 程式人生 > >MapReduce功能實現四---小綜合(從hbase中讀取資料統計並在hdfs中降序輸出Top 3)

MapReduce功能實現四---小綜合(從hbase中讀取資料統計並在hdfs中降序輸出Top 3)

MapReduce功能實現系列:

方法一:

在Hbase中建立相應的表1:

create 'hello','cf'
put 'hello','1','cf:hui','hello world'
put 'hello','2','cf:hui','hello hadoop'
put 'hello','3','cf:hui','hello hive'
put 'hello','4','cf:hui','hello hadoop'
put 'hello','5','cf:hui','hello world'
put 'hello','6','cf:hui','hello world'
put 'hello','7','cf:hui','hbase hive'

java程式碼:
import java.io.IOException;
import java.util.Comparator;
import java.util.TreeMap;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

public class HbaseTopJiang1 {  
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {  
	    String tablename = "hello";
	    Configuration conf = HBaseConfiguration.create();
	    conf.set("hbase.zookeeper.quorum", "h71");
	    Job job = new Job(conf, "WordCountHbaseReader");
	    job.setJarByClass(HbaseTopJiang1.class);
	    Scan scan = new Scan();
	    TableMapReduceUtil.initTableMapperJob(tablename,scan,doMapper.class, Text.class, IntWritable.class, job);
	    job.setReducerClass(WordCountHbaseReaderReduce.class);
	    FileOutputFormat.setOutputPath(job, new Path(args[0]));
	    MultipleOutputs.addNamedOutput(job, "hdfs", TextOutputFormat.class, WritableComparable.class, Writable.class);
	    System.exit(job.waitForCompletion(true) ? 0 : 1);
    }  
      
    public static class doMapper extends TableMapper<Text, IntWritable>{  
        private final static IntWritable one = new IntWritable(1);
        private Text word = new Text(); 
        @Override  
        protected void map(ImmutableBytesWritable key, Result value,  
                Context context) throws IOException, InterruptedException { 
        	/*不進行分隔,將value整行全部獲取
			String rowValue = Bytes.toString(value.list().get(0).getValue());
          	context.write(new Text(rowValue), one);
        	*/
        	String[] rowValue = Bytes.toString(value.list().get(0).getValue()).split(" ");
     	    for (String str: rowValue){
    		   word.set(str);
    		   context.write(word,one);
    	    }
        }  
    }  
    
    public static final int K = 3; 
    public static class WordCountHbaseReaderReduce extends Reducer<Text, IntWritable, Text, IntWritable> {
        //定義treeMap來保持統計結果,由於treeMap是按key升序排列的,這裡要人為指定Comparator以實現倒排
        private TreeMap<Integer, String> treeMap = new TreeMap<Integer, String>(new Comparator<Integer>() {
            @Override
            public int compare(Integer x, Integer y) {
                return y.compareTo(x);
            }
        });
        public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
            //reduce後的結果放入treeMap,而不是向context中記入結果
            int sum = 0;
            for (IntWritable val : values) {
                sum += val.get();
            }
            if (treeMap.containsKey(sum)){
                String value = treeMap.get(sum) + "," + key.toString();
                treeMap.put(sum,value);
            }else {
                treeMap.put(sum, key.toString());
            }
			if(treeMap.size() > K) {
				treeMap.remove(treeMap.lastKey());
			}  
        }
        protected void cleanup(Context context) throws IOException, InterruptedException {
            //將treeMap中的結果,按value-key順序寫入contex中
            for (Integer key : treeMap.keySet()) {
                context.write(new Text(treeMap.get(key)), new IntWritable(key));
            }
        }
    }
}  

在Linux中執行該程式碼:
[[email protected] q1]$ /usr/jdk1.7.0_25/bin/javac HbaseTopJiang1.java
[[email protected] q1]$ /usr/jdk1.7.0_25/bin/jar cvf xx.jar HbaseTopJiang1*class
[[email protected] q1]$ hadoop jar xx.jar HbaseTopJiang1 /out

[[email protected] q1]$ hadoop fs -cat /out/part-r-00000
hello   6
world   3
hadoop,hive     2

方法二:


truncate 'hello'
put 'hello','1','cf:hui','hello world	world'
put 'hello','2','cf:hui','hello hadoop	hadoop'
put 'hello','3','cf:hui','hello hive	hive'
put 'hello','4','cf:hui','hello hadoop	hadoop'
put 'hello','5','cf:hui','hello world	world'
put 'hello','6','cf:hui','hello world	world'
put 'hello','7','cf:hui','hbase hive	hive'
注意:相同單詞之間的分隔符是"/t"(Tab鍵),結果hbase中插入資料的時候根本就不能插入製表符,所以該方法破產,可以參考一下思想

java程式碼:
import java.io.IOException;
import java.util.TreeMap;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Partitioner;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

public class HbaseTopJiang2{
	public static class doMapper extends TableMapper<Text, IntWritable>{  
		private final static IntWritable one = new IntWritable(1);
		private Text word = new Text(); 
		@Override  
		protected void map(ImmutableBytesWritable key, Result value, Context context) throws IOException, InterruptedException { 
        	/*不進行分隔,將value整行全部獲取
			String rowValue = Bytes.toString(value.list().get(0).getValue());
          	context.write(new Text(rowValue), one);
        	*/
			String[] rowValue = Bytes.toString(value.list().get(0).getValue()).split(" ");
			for (String str: rowValue){
				word.set(str);
				context.write(word,one);
			}
		}  
	}   

	public static class WordCountReducer extends Reducer<Text,IntWritable,Text,IntWritable> {
	@Override
	public void reduce(Text key,Iterable<IntWritable> values,Context context) throws IOException, InterruptedException {
		int total=0;
		for (IntWritable val : values){
			total++;
		}
		context.write(key, new IntWritable(total));
		}   
	}
	   
	public static final int K = 3;  
	/**
	* 把上一個mapreduce的結果的key和value顛倒,調到後就可以按照key排序了。
	*/
	public static class KMap extends Mapper<LongWritable,Text,IntWritable,Text> {
		TreeMap<Integer, String> map = new TreeMap<Integer, String>();
		@Override
		public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
			String line = value.toString();
			String result[] = line.split("\t");
			StringBuffer hui = null;
			if(result.length > 2){	//我怕在往hbase表輸入資料時帶\t分隔符的,後來發現hbase中插入資料的時候根本就不能插入製表符
				for(int i=0;i<result.length-2;i++){
					hui=new StringBuffer().append(result[i]);
				}
			}else{
				hui = new StringBuffer().append(result[0]);
			}
			if(line.trim().length() > 0 && line.indexOf("\t") != -1) {
				String[] arr = line.split("\t", 2);
				String name = arr[0];  
				Integer num = Integer.parseInt(arr[1]);  
		        if (map.containsKey(num)){
		            String value1 = map.get(num) + "," + hui;
		            map.put(num,value1);
		        }
		        else {
		            map.put(num, hui.toString());
		        }
				if(map.size() > K) {
					map.remove(map.firstKey());  
				}  
			}  
		}  
		@Override
		protected void cleanup(Mapper<LongWritable, Text, IntWritable, Text>.Context context)  
				throws IOException, InterruptedException {  
			for(Integer num : map.keySet()) {  
				context.write(new IntWritable(num), new Text(map.get(num)));
			}  
		}  
	} 
	
	/**
	* 按照key的大小來劃分區間,當然,key是int值
	*/
	public static class KeySectionPartitioner<K, V> extends Partitioner<K, V> {
		@Override
		public int getPartition(K key, V value, int numReduceTasks) {
			/**
			 * int值的hashcode還是自己本身的數值
			 */
			//這裡我認為大於maxValue的就應該在第一個分割槽
			int maxValue = 50;
			int keySection = 0;
			// 只有傳過來的key值大於maxValue 並且numReduceTasks比如大於1個才需要分割槽,否則直接返回0
			if (numReduceTasks > 1 && key.hashCode() < maxValue) {
				int sectionValue = maxValue / (numReduceTasks - 1);
				int count = 0;
				while ((key.hashCode() - sectionValue * count) > sectionValue) {
					count++;
				}
				keySection = numReduceTasks - 1 - count;
			}
			return keySection;
		}
	}
		    
	/**
	* int的key按照降序排列
	*/
	public static class IntKeyDescComparator extends WritableComparator {
		protected IntKeyDescComparator() {
			super(IntWritable.class, true);
		}
		@Override
		public int compare(WritableComparable a, WritableComparable b) {
			return -super.compare(a, b);
		}
	}
		    
	/**
	* 把key和value顛倒過來輸出
	*/
	public static class SortIntValueReduce extends Reducer<IntWritable, Text, Text, IntWritable> {
		private Text result = new Text();
		@Override
		public void reduce(IntWritable key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
			for (Text val : values) {
				result.set(val.toString());
				context.write(result, key);
			}
		}
	}
		
	public static void main(String[] args) throws Exception {
		String tablename = "hello";
		Configuration conf = HBaseConfiguration.create();
		conf.set("hbase.zookeeper.quorum", "h71");
		Job job1 = new Job(conf, "WordCountHbaseReader");
		job1.setJarByClass(HbaseTopJiang2.class);
		Scan scan = new Scan();
		TableMapReduceUtil.initTableMapperJob(tablename,scan,doMapper.class, Text.class, IntWritable.class, job1);
		job1.setReducerClass(WordCountReducer.class);
		FileOutputFormat.setOutputPath(job1, new Path(args[0]));
		MultipleOutputs.addNamedOutput(job1, "hdfs", TextOutputFormat.class, WritableComparable.class, Writable.class);
	 	   	
		Job job2 = Job.getInstance(conf, "Topjiang");
		job2.setJarByClass(HbaseTopJiang2.class);
		job2.setMapperClass(KMap.class);
		job2.setSortComparatorClass(IntKeyDescComparator.class);
		job2.setPartitionerClass(KeySectionPartitioner.class);
		job2.setReducerClass(SortIntValueReduce.class);
		job2.setOutputKeyClass(IntWritable.class);
		job2.setOutputValueClass(Text.class);
		FileInputFormat.setInputPaths(job2, new Path(args[0]));
		FileOutputFormat.setOutputPath(job2, new Path(args[1]));

		//提交job1及job2,並等待完成
		if (job1.waitForCompletion(true)) {
			System.exit(job2.waitForCompletion(true) ? 0 : 1);
		}
	}
}
[[email protected] q1]$ /usr/jdk1.7.0_25/bin/javac HbaseTopJiang2.java
[[email protected] q1]$ /usr/jdk1.7.0_25/bin/jar cvf xx.jar HbaseTopJiang2*class
[[email protected] q1]$ hadoop jar xx.jar HbaseTopJiang2 /out /output

[[email protected] q1]$ hadoop fs -ls /out
-rw-r--r--   2 hadoop supergroup          0 2017-03-18 19:02 /out/_SUCCESS
-rw-r--r--   2 hadoop supergroup         32 2017-03-18 19:02 /out/part-r-00000
[[email protected] q1]$ hadoop fs -ls /output
-rw-r--r--   2 hadoop supergroup          0 2017-03-18 19:02 /output/_SUCCESS
-rw-r--r--   2 hadoop supergroup         25 2017-03-18 19:02 /output/part-r-00000

理想結果:
[[email protected] q1]$ hadoop fs -cat /out/part-r-00000
hbase1
hadoophadoop 2
hello6
hivehive 2
worldworld 3
[[email protected] q1]$ hadoop fs -cat /output/part-r-00000
hello6
worldworld 3
hadoophadoop,hive hive 2
(分隔符都為製表符)

我發現製表符(Tab鍵)從UltraEdit複製到SecureCRT正常,而從SecureCRT複製到UltraEdit則製表符會變成空格,也是醉了。。。

相關推薦

MapReduce功能實現---綜合(hbase讀取資料統計hdfs輸出Top 3)

MapReduce功能實現系列: 方法一: 在Hbase中建立相應的表1: create 'hello','cf' put 'hello','1','cf:hui','hello world' put 'hello','2','cf:hui','h

JAVAExcel讀取資料儲存到資料庫

1.jar包 2.資料庫資訊 3.JDBC連線資料庫工具類 package Test; import java.sql.Connection; import java.sql.DriverManager; import java.sql.PreparedSta

Matlab .fig檔案讀取資料重新繪圖

Matlab提供了強大的函式集合,可以從.fig檔案中讀取圖中的資料,並重新繪製圖形。如果原始資料丟失,我們可以從.fig檔案中恢復原始資料,並基於原始資料做進一步的處理。 以下是一個從兩個不同檔案

MapReduce功能實現一---HbaseHdfs之間資料相互轉換

MapReduce功能實現系列: 一、從Hbase表1中讀取資料再把統計結果存到表2 在Hbase中建立相應的表1: create 'hello','cf' put 'hello','1','cf:hui','hello world' put 'he

hbase表1讀取資料,最終結果寫入到hbase表2 ,如何通過MapReduce實現

需要一: 將hbase中‘student’表中的info:name和info:age兩列資料取出並寫入到hbase中‘user’表中的basic:XM和basic:NL class ReadStudentMapper extends Table

Android 二維碼開發功能實現()------基於Zxing實現編碼功能(生成二維碼,一維碼等)

Android 二維碼開發功能實現(四)------基於Zxing實現編碼功能(生成二維碼,一維碼等) 前言 關於Google的開源庫Zxing,前面也寫了幾遍文章進行介紹.我們先簡單的回顧一下! Android 二維碼的掃碼功能實現(一) 這篇文章主要介紹了,Zxi

MapReduce功能實現

MapReduce功能實現系列: MapReduce功能實現一---Hbase和Hdfs之間資料相互轉換 MapReduce功能實現二---排序 MapReduce功能實現三---Top N MapReduce功能實現四---小綜合(從hbas

HBase匯入資料3:使用MapReduceHDFS或本地檔案讀取資料寫入HBase(增加使用Reduce批量插入)

前面我們介紹了:為了提高插入效率,我們在前面只使用map的基礎上增加使用reduce,思想是使用map-reduce操作,將rowkey相同的項規約到同一個reduce中,再在reduce中構建put物件實現批量插入測試資料如下:注意到有兩條記錄是相似的。package cn

mapreducehbase大量讀資料超時異…

16/05/06 19:56:13 INFO mapreduce.Job: Task Id : attempt_1461653563167_0008_m_000001_2, Status : FAILED Error: org.apache.hadoop.hbase.client.RetriesExhau

MapReduce功能實現六---最大值(Max)、求和(Sum)、平均值(Avg)

MapReduce功能實現系列: 一、最大值(Max) 情況1: [[email protected] q1]$ vi ql.txt aa 111 22 555 [[email protected] q1]$ hadoop fs

HBase建表高階屬性,hbase應用案例看行鍵設計,HBasemapreduce結合,Hbase讀取資料、分析,寫入hdfshdfs讀取資料寫入Hbase,協處理器和二級索引

1. Hbase高階應用 1.1建表高階屬性 下面幾個shell 命令在hbase操作中可以起到很到的作用,且主要體現在建表的過程中,看下面幾個create 屬性 1、 BLOOMFILTER 預設是NONE 是否使用布隆過慮及使用何種方式 布隆

python3 簡單實現csv文件讀取內容,對內容進行分類統計

tmp spa writer ict 打開文件 while 類型 spl blog 新手python剛剛上路,在實際工作中遇到如題所示的問題,嘗試使用python3簡單實現如下,歡迎高手前來優化import csv #打開文件,用with打開可以不用去特意關閉file了

用shell實現一個指令碼,用來同來統計自己某個檔案下的程式碼,總的程式碼行數,總的註釋量,總的空行量?支援遍歷查詢,支援軟連結查詢

[[email protected] yunwei]# cat sum_code_row_version1.4.sh #!/bin/bash # File Name: sum_code_row.sh # Author: Liwqiang # mail: [email

Android Studio平臺下使用hellochart實現txt檔案讀取資料繪折線圖

Android Studio平臺下使用hellochart實現從文字讀取資料繪折線圖 本人是一個剛剛接觸Android不超過兩個月的小白,最近在做的論文是關於這一塊的相關內容。所有的東西都是自學的,聽導師的建議也是第一次留個這樣的資料,可能有很多地方理解不到位,

python實現檔案讀取資料繪製成 x y 軸圖形

import matplotlib.pyplot as plt import numpy as np def readfile(filename):     dataList = []     dataNum = 0     with open(filename,'r')

Kafka系列()Kafka消費者:Kafka讀取資料

本系列文章為對《Kafka:The Definitive Guide》的學習整理,希望能夠幫助到大家應用從Kafka中讀取資料需要使用KafkaConsumer訂閱主題,然後接收這些主題的訊息。在我們深入這些API之前,先來看下幾個比較重要的概念。Kafka消費者相關的概念消

Spark支援種方式資料庫讀取資料

目前Spark支援四種方式從資料庫中讀取資料,這裡以Mysql為例進行介紹。 一、不指定查詢條件   這個方式連結MySql的函式原型是: def jdbc(url: String, table: String, properties: Properties):

java實現k-means演算法(用的鳶尾花iris的資料集,mysq資料庫讀取資料

k-means演算法又稱k-均值演算法,是機器學習聚類演算法中的一種,是一種基於形心的劃分方法,其中每個簇的中心都用簇中所有物件的均值來表示。其思想如下: 輸入: k:簇的數目;D:包含n個物件的資料集。輸出:k個簇的集合。 方法: 從D中隨機選擇幾個物件作為起始質心

live555RTSP伺服器讀取資料到使用接收到的資料流程分析

本文在linux環境下編譯live555工程,並用cgdb除錯工具對live555工程中的testProgs目錄下的openRTSP的執行過程進行了跟蹤分析,直到將從socket端讀取視訊資料並儲存為對應的視訊和音訊資料為止。 進入testProgs目錄,執行./openRTSP rtsp://

python 檔案讀取資料,同時去除掉空格和換行

從檔案中讀取資料,同時去除掉空格和換行,程式碼如下 import numpy as np def sort(path): w = open(path,'r') l = w.readlines() col=[] for k in l: k = k.strip('\n')