1. 程式人生 > >大數據學習之十二——MapReduce代碼實例:關聯性操作

大數據學習之十二——MapReduce代碼實例:關聯性操作

reducer equal 學習 obj actor 對應關系 關系 exceptio ted

1.單表關聯

"單表關聯"要求從給出的數據中尋找所關心的數據,它是對原始數據所包含信息的挖掘。

實例描述
給出child-parent(孩子——父母)表,要求輸出grandchild-grandparent(孫子——爺奶)表。

技術分享圖片

算法思想:

這個實例需要進行單表連接,連接的是左表的parent列和右表的child列,且左表和右表是同一個表。連接結果中除去連接的兩列就是所需要的結果——"grandchild--grandparent"表。要用MapReduce解決這個實例,首先應該考慮如何實現表的自連接;其次就是連接列的設置;最後是結果的整理。MapReduce的shuffle過程會將相同的key會連接在一起,所以可以將map結果的key設置成待連接的列,然後列中相同的值就自然會連接在一起了。
1.map階段將讀入數據分割成child和parent之後,將parent設置成key,child設置成value進行輸出,並作為左表;再將同一對child和parent中的child設置成key,parent設置成value進行輸出,作為右表
2.為了區分輸出中的左右表,需要在輸出的value中再加上左右表的信息,比如在value的String最開始處加上字符1表示左表,加上字符2表示右表
3. reduce接收到連接的結果,其中每個key的value-list就包含了"grandchild--grandparent"關系。取出每個key的value-list進行解析,將左表中的child放入一個數組,右表中的parent放入一個數組,然後對兩個數組求笛卡爾積就是最後的結果了

代碼實例:

public class table01 {

static String INPUT_PATH="hdfs://master:9000/input/i.txt";

static String OUTPUT_PATH="hdfs://master:9000/output/singletable01";

static class MyMapper extends Mapper<Object,Object,Text,Text>{ //輸入為字符串類型

Text output_key=new Text();

Text output_value=new Text();

protected void map(Object key, Object value, Context context) throws IOException, InterruptedException{

String[] tokens=value.toString().split(","); //以,分割

if(tokens!=null && tokens.length==2){ //判斷表分割成兩列

output_key.set(tokens[0]); //將child作為右表的key值,右表標記為2

output_value.set(2+","+value);

context.write(output_key, output_value);

output_key.set(tokens[1]); //將parent列作為key值,作為左表,標記為1

output_value.set(1+","+value);

context.write(output_key, output_value); //將一個表分割成了兩個表

System.out.println(tokens[0]+"--"+tokens[1]);

}

}

}

static class MyReduce extends Reducer<Text,Text,Text,Text>{ //傳入到MapReduce變成這樣的格式: lucy , {1,tom,lucy 2,lucy,mary}

Text output_key=new Text();

Text output_value=new Text();

protected void reduce(Text key,Iterable<Text> values,Context context) throws IOException,InterruptedException{

List<String> childs=new ArrayList();

List<String> grands=new ArrayList();

for(Text line:values){

String[] tokens=line.toString().split(",");

if(tokens[0].equals("1")){ //判斷是左表的話,即parent作為key值的時候,將孩子加入隊列中

childs.add(tokens[1]);

System.out.println(1+"--"+tokens[1]);

}

else if(tokens[0].equals("2")){ //右表,childs作為key值,將祖父母加入隊列

grands.add(tokens[2]);

System.out.println(2+"--"+tokens[2]);

}

}

for(String c:childs){ //循環輸出

for(String g:grands){

output_key.set(c);

output_value.set(g);

context.write(output_key, output_value);

}

}

}

}

public static void main(String[] args) throws Exception{

Path outputpath=new Path(OUTPUT_PATH);

Configuration conf=new Configuration();

Job job=Job.getInstance(conf);

FileInputFormat.setInputPaths(job, INPUT_PATH);

FileOutputFormat.setOutputPath(job,outputpath);

job.setMapperClass(MyMapper.class);

job.setReducerClass(MyReduce.class);

job.setOutputKeyClass(Text.class);

job.setOutputValueClass(Text.class);

job.waitForCompletion(true);

}

}

2.多表關聯

實例描述
輸入是兩個文件,一個代表工廠表,包含工廠名列和地址編號列;另一個代表地址表,包含地址名列和地址編號列。要求從輸入數據中找出工廠名和地址名的對應關系,輸出"工廠名——地址名"表 。

技術分享圖片

算法思想:

多表關聯和單表關聯相似,都類似於數據庫中的自然連接。相比單表關聯,多表關聯的左右表和連接列更加清楚。所以可以采用和單表關聯的相同的處理方式,map識別出輸入的行屬於哪個表之後,對其進行分割,將連接的列值保存在key中,另一列和左右表標識保存在value中,然後輸出。reduce拿到連接結果之後,解析value內容,根據標誌將左右表內容分開存放,然後求笛卡爾積,最後直接輸出。

public class table02 {

static String INPUT_PATH="hdfs://master:9000/doubletable";

static String OUTPUT_PATH="hdfs://master:9000/output/doubletable";

static class MyMapper extends Mapper<Object,Object,Text,Text>{

Text output_key=new Text();

Text output_value=new Text();

String tableName=""; //區分表名

protected void setup(Context context)throws java.io.IOException,java.lang.InterruptedException{

FileSplit fs=(FileSplit)context.getInputSplit(); //將多個表格區分開來

tableName=fs.getPath().getName(); //得到表名

System.out.println(tableName);

}

protected void map(Object key, Object value, Context context) throws IOException, InterruptedException{

String[] tokens=value.toString().split(",");

if(tokens!=null && tokens.length==2){

if(tableName.equals("l.txt")){ //如果是表一的話

output_key.set(tokens[1]); //將addressID作為key值連接

output_value.set(1+","+tokens[0]+","+tokens[1]); //1只是一個標記

}

else if(tableName.equals("m.txt")){ //如果是表二的話

output_key.set(tokens[0]); //addressID是第一個屬性

output_value.set(2+","+tokens[0]+","+tokens[1]);

}

context.write(output_key, output_value);

}

}

}

static class MyReduce extends Reducer<Text,Text,Text,Text>{

Text output_key=new Text();

Text output_value=new Text();

protected void reduce(Text key,Iterable<Text> value,Context context) throws IOException,InterruptedException{

List<String> factorys=new ArrayList();

List<String> addrs=new ArrayList();

for(Text line:value){

String[] tokens=line.toString().split(",");

if(tokens[0].equals("1")){ //表一取出factory的值

factorys.add(tokens[1]);

}

else if(tokens[0].equals("2")){

addrs.add(tokens[2]); //表二取出address的值

}

}

for(String c:factorys) //循環輸出

for(String g:addrs){

output_key.set(c);

output_value.set(g);

context.write(output_key,output_value);

}

}

}

public static void main(String[] args) throws Exception{

Path outputpath=new Path(OUTPUT_PATH);

Configuration conf=new Configuration();

FileSystem fs=outputpath.getFileSystem(conf);

if(fs.exists(outputpath)){

fs.delete(outputpath, true);

}

Job job=Job.getInstance(conf);

FileInputFormat.setInputPaths(job, INPUT_PATH);

FileOutputFormat.setOutputPath(job,outputpath);

job.setMapperClass(MyMapper.class);

job.setReducerClass(MyReduce.class);

job.setOutputKeyClass(Text.class);

job.setOutputValueClass(Text.class);

job.waitForCompletion(true);

}

}

大數據學習之十二——MapReduce代碼實例:關聯性操作