1. 程式人生 > >MapReduce之按照ID取模分割槽輸出到不同檔案下

MapReduce之按照ID取模分割槽輸出到不同檔案下

很多時候需要對大檔案進行分割槽

最簡單的是ID的hash分割槽

利用MapReduce的分割槽把檔案分割成到不同的檔案中去

方便後續的計算,例如KNN可以吧預測切分成多個小片

分別讀入預測

package com.mr.partition;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

/**
 * 把預測的資料讀入記憶體然後進行迭代計算
 * 適用於預測資料很少訓練資料很多
 * 如果預測資料很多可以切分多分分別計算
 * @author lenovo
 * 1,計算歐式距離(可根據實際情況修改距離公式)
 * 2,找出最近
 *   輸出topk使用TreeSet<TopKeyWritable>自己寫TopKeyWritable排序
 */
public class IDhashMR extends Configured implements Tool {

	public static enum Counter {
		PARSER_ERR
	}

	public static class MyMap extends Mapper<LongWritable, Text, Text, Text> {
		private Text mykey = new Text();
		private Text myval = new Text();
        List  testList=new ArrayList();
		
		
		protected void map(LongWritable key, Text value, Context context)
				throws IOException, InterruptedException {
			String[] array = value.toString().split(",");
				
				mykey.set(array[0]);
				myval.set(array[1]);
				context.write(mykey, myval);
		};
	}

	public static class MyReduce extends Reducer<Text, Text, Text, Text> {
		private Text val = new Text();
		Map top=new TreeMap();
		
		protected void reduce(Text key, Iterable<Text> values, Context context)
				throws IOException, InterruptedException {
			// 迴圈遍歷 Interable
			int sum = 0;
			for (Text value : values) {
				// 累加
				String[] array = value.toString().split(",");
				sum += Integer.parseInt(array[0]);
			}
			val.set(sum+"");
			context.write(key, val);
		};
	}

	@Override
	public int run(String[] args) throws Exception {
		// 1 conf
		Configuration conf = new Configuration();
		conf.set("mapred.textoutputformat.separator", ",");// key value分隔符
//		DistributedCache.addCacheFile(new Path(args[2]).toUri(), conf);// 為該job新增快取檔案
		// 2 create job
		// Job job = new Job(conf, ModuleMapReduce.class.getSimpleName());
		Job job = this.parseInputAndOutput(this, conf, args);
		// 3 set job
		// 3.1 set run jar class
		// job.setJarByClass(ModuleReducer.class);
		// 3.2 set intputformat
		job.setInputFormatClass(TextInputFormat.class);
		// 3.3 set input path
		// FileInputFormat.addInputPath(job, new Path(args[0]));
		// 3.4 set mapper
		job.setMapperClass(MyMap.class);
		// 3.5 set map output key/value class
		job.setMapOutputKeyClass(Text.class);
		job.setMapOutputValueClass(Text.class);
		// 3.6 set partitioner class
		 job.setPartitionerClass(IDhashPartition.class);
		// 3.7 set reduce number
		 job.setNumReduceTasks(2);
		// 3.8 set sort comparator class
		// job.setSortComparatorClass(LongWritable.Comparator.class);
		// 3.9 set group comparator class
		// job.setGroupingComparatorClass(LongWritable.Comparator.class);
		// 3.10 set combiner class
		// job.setCombinerClass(null);
		// 3.11 set reducer class
		job.setReducerClass(MyReduce.class);
		// 3.12 set output format

		job.setOutputFormatClass(TextOutputFormat.class);
		// 3.13 job output key/value class
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(Text.class);
		// 3.14 set job output path
		// FileOutputFormat.setOutputPath(job, new Path(args[1]));
		// 4 submit job
		boolean isSuccess = job.waitForCompletion(true);
		// 5 exit
		// System.exit(isSuccess ? 0 : 1);
		return isSuccess ? 0 : 1;
	}

	public Job parseInputAndOutput(Tool tool, Configuration conf, String[] args)
			throws Exception {
		// validate
//		if (args.length != 2) {
//			System.err.printf("Usage:%s [genneric options]<input><output>\n",
//					tool.getClass().getSimpleName());
//			ToolRunner.printGenericCommandUsage(System.err);
//			return null;
//		}
		// 2 create job
		Job job = new Job(conf, tool.getClass().getSimpleName());
		// 3.1 set run jar class
		job.setJarByClass(tool.getClass());
		// 3.3 set input path
		FileInputFormat.addInputPath(job, new Path(args[0]));
		// 3.14 set job output path
		FileOutputFormat.setOutputPath(job, new Path(args[1]));

		return job;
	}

	public static void main(String[] args) throws Exception {
		args = new String[] {
				"hdfs://192.168.192.129:9000/ml/knn/partitioner.txt",
				// "hdfs://hadoop-00:9000/home910/liyuting/output/" };
				"hdfs://192.168.192.129:9000/ml/knn/partitioner/"};
		// run mapreduce
		int status = ToolRunner.run(new IDhashMR(), args);
		// 5 exit
		System.exit(status);
	}
}
package com.mr.partition;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;
/**
 * 自定義分割槽
 * @author Administrator
 *
 */
public class IDhashPartition extends Partitioner<Text,Text>{
   // private static final Logger logger = LoggerFactory.getLogger(DefinedPartition.class);
    /**
    *  資料輸入來源:map輸出
    * @author zengzhaozheng
    * @param key map輸出鍵值
    * @param value map輸出value值
    * @param numPartitions 分割槽總數,即reduce task個數
    */
    public int getPartition(Text key, Text value,int numPartitions) {
        //("--------enter DefinedPartition flag--------");
        /**
        * 注意:這裡採用預設的hash分割槽實現方法
        * 根據組合鍵的第一個值作為分割槽
        * 這裡需要說明一下,如果不自定義分割槽的話,mapreduce框架會根據預設的hash分割槽方法,
        * 將整個組合將相等的分到一個分割槽中,這樣的話顯然不是我們要的效果
        */
       // ("--------out DefinedPartition flag--------");
    	// return (key.getFirstKey().hashCode()&Integer.MAX_VALUE)%numPartitions;
        return (Integer.parseInt(key.toString()))%10;
    }
}



相關推薦

MapReduce按照ID分割槽輸出不同檔案

很多時候需要對大檔案進行分割槽 最簡單的是ID的hash分割槽 利用MapReduce的分割槽把檔案分割成到不同的檔案中去 方便後續的計算,例如KNN可以吧預測切分成多個小片 分別讀入預測 package com.mr.partition; import java.io

id分表

bsp _id user 用戶 用戶id 取模 場景 nbsp var 場景 1 假設按用戶id分2個庫 每個庫分10張表。 分表策略 1.用戶id%2 確定庫 用戶id%3確定表。 2.(用戶id%(2*10))/ 10 取整確定庫,(用戶id%(2*10)%

ac數論n次方

傳送門次方求模時間限制:1000 ms  |  記憶體限制:65535 KB難度:3描述求a的b次方對c取餘的值輸入第一行輸入一個整數n表示測試資料的組數(n<100)每組測試只有一行,其中有三個正整數a,b,c(1=<a,b,c<=1000000000)輸

數學高冪次

大神 bsp alt ron mage str 博客 htm com 盜自倉鼠大神博客:http://www.cnblogs.com/linyujun/p/5194170.html 用於解決(a^b)%p類問題,當b很大時 好像運用到費馬小定理,不會證明 φ(

在陣列中的兩個數字,如果前面一個數字大於後面的數字,則這兩個數字組成一個逆序對。輸入一個數組,求出這個陣列中的逆序對的總數P。並將P對1000000007的結果輸出

 /*        最簡單的思路:陣列的所有數兩兩比較,進行累加,空間複雜度為O(n^2)  */ class Solution { public: int InversePairs(vector<int> data) { int P

MapReduce ---自定義全域性計數器,將資訊輸出到控制檯

package jishuqi; import java.io.IOException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.ha

在陣列中的兩個數字,如果前面一個數字大於後面的數字,則這兩個數字組成一個逆序對。輸入一個數組,求出這個陣列中的逆序對的總數P。並將P對1000000007的結果輸出。 即輸出P%100000000

分析:可以利用兩層for迴圈,從頭開始遍歷查詢每一個元素的逆序對數,然後求總和。也可以利用歸併排序的思想來求解。下面是利用歸併排序的思想求解 public class Solution{public int InversePairs(int[] array) {if (ar

MapReduce 按照Value值進行排序輸出

檔案輸入: A    1 B    5 C    4 E    1 D    3 W    9 P    7 Q    2 檔案輸出: W    9 P    7 B    5 C    4 D    3 Q    2 E    1 A    1 程式碼如下: packag

【HDU 3037】大數組合Lucas定理+擴充套件歐幾里得求逆元與不定方程一類問題

Saving Beans Time Limit: 6000/3000 MS (Java/Others)    Memory Limit: 32768/32768 K (Java/Others) Total Submission(s): 2284    Accepted S

組合數逆元方法+模板

參自: http://www.cnblogs.com/liziran/p/6804803.html https://baike.baidu.com/item/%E8%B4%B9%E9%A9%AC%E5%B0%8F%E5%AE%9A%E7%90%86/4776158?fr=a

Mycat資料庫分片(分片)-yellowcong

取模分片,簡單來講,根據資料庫的主鍵和儲存的節點數進行取模操作,然後根據取模的結果,將資料存放到對應的節點中,取模分表,可以將資料均勻的分配到各個庫中。實現的步驟:1、建立資料庫,2、配置schema.xml檔案,3、配置server.xml,4、新增ru

mapreduce,自定義排序,分割槽,分組實現按照年份升序排序,溫度降序排序

自定義類: package myhadoop; import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; import org.apache.hadoop.io.Wr

位運算——按位與(&)操作——(快速演算法)

位運算之——按位與(&)操作——(快速取模演算法)   由於位運算直接對記憶體資料進行操作,不需要轉成十進

MapReduce自定義分割槽器Partitioner

@[toc] ## 問題引出 >要求將統計結果按照條件輸出到不同檔案中(分割槽)。 比如:將統計結果按照**手機歸屬地不同省份**輸出到不同檔案中(分割槽) ## 預設Partitioner分割槽 ```java public class HashPartitioner extends Partitio

Hadoop實戰-MapReducemax、min、avg統計(六)

next combine output fileinput private pub eof pri use 1、數據準備: Mike,35 Steven,40 Ken,28 Cindy,32 2、預期結果 Max  40 Min   28 Avg 33 3、M

CSU - 1556 Jerry&#39;s trouble(高速冪)

click ostream algo printf 高速 ron main 取模 bit 【題目鏈接】:click here 【題目大意】:計算x1^m+x2^m+..xn^m(1<=x1<=n)( 1 <= n < 1 000 000, 1 &

求第n行楊輝三角(n很大,

int 為什麽不能 style code 為我 max sin clas pan 1 #include <iostream> 2 #include <cstdio> 3 4 using namespace std; 5 typedef

運算

add 結合 重要 nbsp left 但是 list padding 四則運算 腦子不好使,老是記不住(?_?),備忘一下。 模運算與基本四則運算有些相似,但是除法例外。其規則如下: (a + b) % p = (a % p + b % p) % p (a -

2017湘潭賽 A題 Determinant (高斯消元)

mina while 代數 tor mod continue 高斯消元 problem 元素 鏈接 http://202.197.224.59/OnlineJudge2/index.php/Problem/read/id/1260 今年湘潭的A題 題意不難 大意是把n*(n

Hadoop MapreduceWordCount實現

註意 com split gin 繼承 [] leo ring exce 1.新建一個WCMapper繼承Mapper public class WCMapper extends Mapper<LongWritable, Text, Text, LongWritab