1. 程式人生 > >自定義分割槽隨機分配解決資料傾斜的問題

自定義分割槽隨機分配解決資料傾斜的問題

package com.cr.skew1_stage_version2;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

public class SkewApp2 {

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {

        //單例作業
        Configuration conf = new Configuration();
        conf.set("fs.defaultFS","file:///");
        Job job = Job.getInstance(conf);
        System.setProperty("hadoop.home.dir","E:\\hadoop-2.7.5");

        //設定job的各種屬性
        job.setJobName("SkewApp2");                 //設定job名稱
        job.setJarByClass(SkewApp2.class);              //設定搜尋類
        job.setInputFormatClass(KeyValueTextInputFormat.class);

        //設定輸入路徑
        FileInputFormat.addInputPath(job,new Path(("D:\\skew\\out\\part-r-00000")));
        FileInputFormat.addInputPath(job,new Path(("D:\\skew\\out\\part-r-00001")));
        FileInputFormat.addInputPath(job,new Path(("D:\\skew\\out\\part-r-00002")));
        //設定輸出路徑
        Path path = new Path("D:\\skew\\out2");
        FileSystem fs = FileSystem.get(conf);
        if (fs.exists(path)) {
            fs.delete(path, true);
        }
        FileOutputFormat.setOutputPath(job,path);

        job.setMapperClass(SkewMapper2.class);               //設定mapper類
        job.setReducerClass(SkewReducer1.class);               //設定reduecer類

        job.setMapOutputKeyClass(Text.class);            //設定之map輸出key
        job.setMapOutputValueClass(IntWritable.class);   //設定map輸出value

        job.setOutputKeyClass(Text.class);               //設定mapreduce 輸出key
        job.setOutputValueClass(IntWritable.class);      //設定mapreduce輸出value

        job.setNumReduceTasks(3);
        job.waitForCompletion(true);

    }

}
檢視原始碼可知
public class KeyValueTextInputFormat extends FileInputFormat<Text, Text> {
    public KeyValueTextInputFormat() {
    }
這裡的mapper輸入為<text,text>型別

相關推薦

定義分割槽隨機分配解決資料傾斜的問題

package com.cr.skew1_stage_version2; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path

MapReduce資料傾斜解決方案2-- 定義分割槽類---二次作業

資料傾斜:大量資料湧向到一個或者幾個reduce,造成大量的reduce空閒。 解決資料傾斜方案2:自定義分割槽類---二次作業 下面以單次統計為例進行說明: 1、DataLeanMapper1 package hadoop.lean.partitioner; i

《深入理解Spark》之通過定義分割槽解決資料傾斜問題

package com.lyzx.day37 import org.apache.spark.{Partitioner, SparkConf, SparkContext} class D1 { //partitionBy和自定義分割槽器解決資料傾斜的問題 def

Spark資料過濾、定義分割槽、Shuffer調優 經典案例(詳解)

案例: 根據學科取得最受歡迎的老師的前兩名 這個是資料 http://bigdata.edu360.cn/zhangsan http://bigdata.edu360.cn/zhangsan http://bigdata.edu360.cn/lisi http:

kafka 定義分割槽分配和遷移

自定義分割槽分配和遷移 分割槽重新分配工具還可用於選擇性地將分割槽的副本移動到特定的代理集。當以這種方式使用時,假設使用者知道重新分配計劃並且不需要工具生成候選重新​​分配,有效地跳過 - 生成步驟並直接移動到--execute步驟 例如,以下示例將主題foo1的分割槽0移動到代理5,6,將

資料結構與演算法----定義類中函式與資料成員

近期在梳理知識,做一個小結,希望自己能多多使用 在標頭檔案中: enum sign {plus, minus}; class Accruency { public: Accruency(sign s = plus, unsigned long d = 0, unsigned in

使用U盤安裝 CentOS 7,定義分割槽

準備材料: 1.U盤(至少8G); 2.最新版UItraISO(試用版也可以); 3.CentOS 7 光碟映象;      CentOS7的映象檔案下載地址1:http://mirrors.163.com/centos/7.1.1503/isos/x8

kafka6 編寫使用定義分割槽的生產者

一 客戶端 在上一篇部落格建立的簡單生產者的基礎上,進行兩個修改操作: 1.新建SimplePartitioner.java,修改返回分割槽為1。 SimplePartitioner.java程式碼如下 package cn.test.mykafka; import java.util

hive sql 如何解決資料傾斜

場景:       我有接近7億條網站訪問瀏覽資料要做一次 按 host(域名) 分割槽,訪問時間進行排序(取最先訪問) 說白了就是row_number over(partition by  host order by ftime)。

MapReduce中定義分割槽

package tq; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.mapreduce.Partitioner; public class MyPartition extends Partitioner<

kafka模擬生產者-消費者以及定義分割槽

基本概念 kafka中的重要角色   broker:一臺kafka伺服器就是一個broker,一個叢集可有多個broker,一個broker可以容納多個topic   topic:可以理解為一個訊息佇列的名字   partition:分割槽,為了實現擴充套件性,一個topic可以分佈到多

MapReduce定義分割槽partition的作用和用法

預設分割槽數量為     key.hash%reducetask的個數 自定義分割槽     自己定義的 自定義分割槽很簡單,我們只需要繼承抽象類Partitioner,重寫getPartition方法即可,另外還要給任務設定分割槽:

定義三級表格,方便資料迴圈

<div class="m_table_box"> <div class="m_table"> <div class="m_cell m_td" style="width: 100px"><b>老闆</b></div>

MR之partition定義分割槽

maptask執行的結果都會放到一個分割槽檔案中,這個分割槽檔案有自己的編號,這個編號是通過一個hash演算法來生成的,通過對context.write(k,v)中的k進行hash會產生一個值,相同的key產生的值是一樣的,所以這種辦法能將相同的key值放到一個分割槽中。分割槽中的值會發送給

定義View---隨機顯示TextView並更換背景顏色

1.自定義View @SuppressLint("AppCompatCustomView") public class Random extends TextView{ public Random(Context context) { super(context)

定義easyui-datagid的表格資料序列號

一、使用datagrid時有時需要顯示行號,雖然rownumbers="true"屬性可以設定,但是當表格有橫向滾動條時,就和序列號不協調了,所以需要自定義,解決如下: columns: [[       {field:'index

定義View 隨機四位數

CustomView @SuppressLint("AppCompatCustomView") public class CustomView extends TextView { Paint paint; int i=0; public CustomView(Conte

Mysql定義函式報錯解決方法

1、在MySql中建立自定義函式報錯資訊如下: ERROR 1418 (HY000): This function has none of DETERMINISTIC, NO SQL, or READS SQL DATA in its declaration and binary loggi

Irrlicht 中如何定義場景節點和網格資料

自定義場景節點需要繼承ISceneNode,注意ISceneNode有幾個函式必須覆蓋 自定義網格資料需要繼承 IMeshNode,IMeshNode繼承與ISceneNode class CS

Springboot專案Netty做服務端並定義Gson配置類解析資料

簡述 Springboot專案中使用 Netty 作為服務端,接收並處理其他平臺傳送的 Json資料包,處理拆包、粘包及資料包中時間型別是 long 型別需轉成 ***Date***的情況。 專案流程 啟動專案,開啟Netty服務埠11111 載入