1. 程式人生 > >MapReduce資料傾斜解決方案2-- 自定義分割槽類---二次作業

MapReduce資料傾斜解決方案2-- 自定義分割槽類---二次作業

資料傾斜:大量資料湧向到一個或者幾個reduce,造成大量的reduce空閒。

解決資料傾斜方案2:自定義分割槽類---二次作業

下面以單次統計為例進行說明:

1、DataLeanMapper1

package hadoop.lean.partitioner;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;
import java.util.Random;

/**
 * DataLeanMapper1
 */
public class DataLeanMapper1 extends Mapper<LongWritable, Text, Text,IntWritable> {
	Random r = new Random();

	/**
	 * 每一行
	 */
	protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {

		String line = value.toString();
		String[] arr = line.split(" ");

		Text keyOut = new Text();
		IntWritable valueOut = new IntWritable(1);
		for(String word : arr){
			keyOut.set(word);
			context.write(keyOut,valueOut);
		}
	}
}
2、DataLeanMapper2
package hadoop.lean.partitioner;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;
import java.util.Random;

public class DataLeanMapper2 extends Mapper<Text, Text, Text,IntWritable> {

	/**
	 * 每一行
	 */
	protected void map(Text key, Text value, Context context) throws IOException, InterruptedException {
		context.write(key , new IntWritable(Integer.parseInt(value.toString())));

	}
}
3、DataLeanReducer1
package hadoop.lean.partitioner;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

/**
 * DataLeanReducer1
 */
public class DataLeanReducer1 extends Reducer<Text, IntWritable, Text, IntWritable>{

	protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
		int count = 0 ;
		for(IntWritable iw : values){
			count = count + iw.get() ;
		}
		context.write(key,new IntWritable(count));
	}
}
4、RandomPartitioner 隨機分割槽
package hadoop.lean.partitioner;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

import java.util.Random;

/**
 * 隨機分割槽
 */
public class RandomPartitioner extends Partitioner<Text,IntWritable> {

	Random r = new Random() ;
	public int getPartition(Text text, IntWritable intWritable, int numPartitions) {
		return r.nextInt(numPartitions);
	}
}
5、App 

 * 資料傾斜解決辦法需要二次作業
 * 自定義分割槽類

package hadoop.lean.partitioner;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.partition.HashPartitioner;

/**
 * 資料傾斜解決辦法需要二次作業
 * 自定義分割槽類
 */
public class App {
	public static void main(String[] args) throws Exception {
		args = new String[]{"d:/java/mr/data/1.txt", "d:/java/mr/out1", "d:/java/mr/out2"} ;
		Configuration conf = new Configuration();

		FileSystem fs = FileSystem.get(conf);
		if(fs.exists(new Path(args[1]))){
			fs.delete(new Path(args[1]),true);
		}

		Job job = Job.getInstance(conf);

		job.setJobName("WordCount-1");
		job.setJarByClass(App.class);

		job.setMapperClass(DataLeanMapper1.class);
		job.setReducerClass(DataLeanReducer1.class);

		//新增輸入路徑
		FileInputFormat.addInputPath(job,new Path(args[0]));
		//設定輸出路徑
		FileOutputFormat.setOutputPath(job,new Path(args[1]));

		//設定mapreduce輸出
		job.setPartitionerClass(RandomPartitioner.class);

		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(IntWritable.class);

		job.setNumReduceTasks(3);

		//第一個階段(job)
		if(job.waitForCompletion(true)){
			job = Job.getInstance(conf);

			job.setJobName("WordCount-2");
			job.setJarByClass(App.class);

			job.setMapperClass(DataLeanMapper2.class);
			job.setReducerClass(DataLeanReducer1.class);

			//新增輸入路徑
			FileInputFormat.addInputPath(job, new Path(args[1]));
			//設定輸出路徑
			FileOutputFormat.setOutputPath(job, new Path(args[2]));
			//第一次的輸出是第二次的輸入,首次輸出的key - value
			job.setInputFormatClass(KeyValueTextInputFormat.class);

			//第二次雜湊分割槽
			job.setPartitionerClass(HashPartitioner.class);
			job.setOutputKeyClass(Text.class);
			job.setOutputValueClass(IntWritable.class);

			job.setNumReduceTasks(3);
			job.waitForCompletion(true);
		}
	}
}


相關推薦

MapReduce資料傾斜解決方案2-- 定義分割槽---作業

資料傾斜:大量資料湧向到一個或者幾個reduce,造成大量的reduce空閒。 解決資料傾斜方案2:自定義分割槽類---二次作業 下面以單次統計為例進行說明: 1、DataLeanMapper1 package hadoop.lean.partitioner; i

mapreduce定義分組、定義分割槽排序

mapreduce中二次排序的思想中,我們常常需要對資料的分割槽分組進行自定義, 以下就介紹一下自定義分割槽分組的簡單實現 1、自定義分割槽: public class demoPartitioner<K, V> extends Partitioner<

Spark專案實戰-資料傾斜解決方案之原理以及現象分析

一、資料傾斜的原理 在執行shuffle操作的時候,大家都知道是按照key來進行values的資料的輸出、拉取和聚合的。同一個key的values,一定是分配到一個reduce task進行處理的。假設多個key對應的values,總共是90萬。但是問題是可能某個key對應

Spark專案實戰-資料傾斜解決方案之將reduce join轉換為map join

一、reduce端join操作原理 二、map端join操作原理  三、適用場景 如果兩個RDD要進行join,其中一個RDD是比較小的。一個RDD是100萬資料,一個RDD是1萬資料。(一個RDD是1億資料,一個RDD是100萬資料) 其中一個RDD必須是比較

spark 大型專案實戰(五十八):資料傾斜解決方案之sample取樣傾斜key進行兩join

當採用隨機數和擴容表進行join解決資料傾斜的時候,就代表著,你的之前的資料傾斜的解決方案,都沒法使用。 這個方案是沒辦法徹底解決資料傾斜的,更多的,是一種對資料傾斜的緩解。 原理,其實在上一講,已經帶出來了。 步驟: 1、選擇一個RDD,要用flatM

資料傾斜解決方案之原理以及現象分析

資料傾斜 在任何大資料類的專案中,都是最棘手的效能問題,最能體現人的技術能力,最能體現RD(Research Developer,研發工程師)的技術水平。 資料傾斜 = 效能殺手 如果沒有豐富的經驗,或者沒有受過專業的技術培訓,是很難解決資料傾斜問題的 在執行shuff

spark1.x-spark-sql-資料傾斜解決方案

聚合源資料 過濾導致傾斜的key where條件 提高shuffle並行度 spark.sql.shuffle.partitions sqlContext.setConf("spark.sql.shuffle.partitions","1000")

資料傾斜解決方案之使用隨機key實現雙重聚合

使用隨機key實現雙重聚合 1、原理 2、使用場景 (1)groupByKey (2)reduceByKey 比較適合使用這種方式;join,咱們通常不會這樣來做,後面會講三種,針對不同的join造成的資料傾斜的問題的解決方案。 第一輪聚合的時候,對key進行打散,將

ThinkPHP 3.2 定義 Model

esp ins array create 得到 namespace 數據 map ret   ThinkPHP 提供了一個 Model 類,供其他的 Model 進行繼承。Model 類中是 MVC 中的模型類,它是調用 持久層 的上層類。感覺這麽描述問題很多,但是有什麽

ByteBuf 一個用於在通訊中的資料解析傳輸組裝的定義容器

在做和硬體通訊的專案的時候,通訊的內容一般都是最基本的byte陣列,比如BLE,UART等等方式,傳遞的都是byte陣列。 移動端在接收的時候,就需要去解析byte陣列,然後從中通過拼接和或(|)以及位移等運算來得到想要的資料型別,比如說,unsignedByte,short,int,float

Mapreduce三大元件之一Partitioner——實現定義分割槽

MapReduce中資料流動 (1)最簡單的過程: map - reduce (2)定製了partitioner以將map的結果送往指定reducer的過程: map - partition - reduce (3)增加了在本地先進性一次reduce(優化)

定義分割槽隨機分配解決資料傾斜的問題

package com.cr.skew1_stage_version2; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path

《深入理解Spark》之通過定義分割槽解決資料傾斜問題

package com.lyzx.day37 import org.apache.spark.{Partitioner, SparkConf, SparkContext} class D1 { //partitionBy和自定義分割槽器解決資料傾斜的問題 def

定義銷售屬性的展示和資料儲存解決方案

我的資料表設計如下 由 Product表, SellProperty表和SKU表三張表來支援該功能. 表結構如下: CREATE TABLE `purchaser_product` ( `id` bigint(20) NOT NULL AUTO_INCREMENT, `product_code` v

解決Shiro+SpringBoot定義Filter不生效問題2

解決Shiro+SpringBoot自定義Filter不生效問題 在SpringBoot+Shiro實現安全框架的時候,自定義擴充套件了一些Filter,並註冊到ShiroFilter,但是執行的時候發現總是在ShiroFilter之前就進入了自定義Filter,結果當然是不對的。 &

解決MapReduce資料傾斜

在平行計算中我們總希望分配的每一個task 都能以差不多的粒度來切分並且完成時間相差不大,但是叢集中可能硬體不同,應用的型別不同和切分的資料大小不一致總會導致有部分任務極大的拖慢了整個任務的完成時間,硬體不同就不說了,應用的型別不同其中就比如page rank 或者data

Java程式設計之TreeSet排序兩種解決方法(1)元素自身具備比較功能,元素需要實現Comparable介面覆蓋compare(2)建立根據定義Person的name進行排序的Comparator

       當很多人問我讀研到底好不好的時候,我總是說上研很苦逼,讀完研之後都不知道自己能不能找到工作,所以不建議同學們讀研~即使要讀也讀一個985或者211的研究生,這是我肺腑之言。但還有一半我沒說完,讀研的時候你可能會找到你喜歡的活動,會遇到一些願意和你一起玩的玩伴,

mapreduce【流量統計】求和——定義資料型別

需求:統計一下檔案中,每一個使用者所耗費的總上行流量,總下行流量,總流量 1363157985066 13726230503

vue(2) - 定義指令

asc color true dom this == ont ron 指令 我們都知道v-for、v-html、等等都是指令:擴展html 語法 自定義指令: 屬性指令 Vue.deirctive(指令名稱,function(){   this.el ==>原生的do

python解決導入定義庫失敗: ModuleNotFoundError: No module named 'MyLib'

ear 安裝 pat name err under code 自定義庫 安裝路徑 python安裝目錄:...\python_3_6_1_64bit 新建文件:chenyeubai.pth,寫入庫所在的絕對路徑E:\workSpace\my_code\learn\myLib