1. 程式人生 > >Hadoop鏈式MapReduce、多維排序、倒排索引、自連線演算法、二次排序、Join效能優化、處理員工資訊Join實戰、URL流量分析、TopN及其排序、求平均值和最大最小值、資料清洗ETL、分析氣

Hadoop鏈式MapReduce、多維排序、倒排索引、自連線演算法、二次排序、Join效能優化、處理員工資訊Join實戰、URL流量分析、TopN及其排序、求平均值和最大最小值、資料清洗ETL、分析氣

Hadoop Mapreduce 演算法彙總

 第52課:Hadoop鏈式MapReduce程式設計實戰...1

第51課:Hadoop MapReduce多維排序解析與實戰...2

第50課:HadoopMapReduce倒排索引解析與實戰...3

第49課:Hadoop MapReduce自連線演算法及程式設計實戰...4

第48課:Hadoop MapReduce二次排序程式設計實戰...6

第47課:Hadoop MapReduce二次排序演算法和實現解析(原理課)...7

第46課:Hadoop Join效能優化程式設計實戰...7

第45課:Hadoop Join效能優化之原理和執行機制詳解(原理課)...9

第44課:Hadoop處理員工資訊Join實戰...9

第43課:Hadoop實戰URL流量分析...11

第42課:Hadoop中的TopN及其排序原理剖析及程式碼實戰...12

第41課:Hadoop求平均值和最大最小值案例實戰以及測試除錯...13

第40課:Hadoop資料去重和資料排序案例實戰及資料清洗ETL.13

第39課:MapReduce分析氣象資料動手程式設計實戰...14
 

Hadoop演算法水平的體現:

l  Key值的定義

l  Hadoop鏈式MapReduce的開發

52課:Hadoop鏈式MapReduce程式設計實戰

【資料檔案Input

Computer,5000

 SmartPhone,3000

 Tablet,1500

 Tv,50000

 Book,18

 Clothes,150

 Gloves,9

 Computer,5000

 SmartPhone,3000

 Tablet,1500

 Tv,50000

 Book,18

 Clothes,150

 Gloves,9

 SmartPhone,3000

 Tablet,1500  

【執行結果Output】商品價格大於100小於10000,然後將相同的商品價格累加,累加總價大於5000的商品輸出結果

           Computer        10000

           SmartPhone      9000

【原始碼檔案】ChainMapperReducer.java

Map階段】KV值定義

    context.write(new Text(splited[0].trim()),newIntWritable(price));//map階段key值為商品

context.write(key,value);// ChaintDataMapper1ChaintDataMapper2

Reduce階段】KV值輸出

    context.write(key,new IntWritable(summary));//ChainDataReducer

    context.write(key,value); // ChaintDataMapper3

【鏈式Mapreduce定義】

ChainMapper.addMapper(job, ChaintDataMapper1.class,LongWritable.class, Text.class, Text.class, IntWritable.class, newConfiguration());   

ChainMapper.addMapper(job, ChaintDataMapper2.class,Text.class, IntWritable.class, Text.class, IntWritable.class, newConfiguration());             

ChainReducer.setReducer(job,ChainDataReducer.class, Text.class, IntWritable.class, Text.class, IntWritable.class, newConfiguration());

 ChainReducer.addMapper(job,ChaintDataMapper3.class, Text.class, IntWritable.class, Text.class, IntWritable.class, newConfiguration());

51課:HadoopMapReduce多維排序解析與實戰

【資料檔案Input

Spark,100

Hadoop,60

Kafka,95

Spark,99

Hadoop,65

Kafka,98

Spark,99

Hadoop,63

Kafka,97  

【執行結果Output】按名稱分組輸出排序結果

Hadoop  60,63,65

Kafka   95,97,98

Spark   99,99,100

【原始碼檔案】MutipleSorting.java

Map階段】KV值定義

map階段key值為自定義的intMultiplePair,讀入每行資料,如Spark100放入intMultiplePairfirstsecond100作為value放入intWritable

context.write(intMultiplePair,intWritable);

IntMultipleSortingComparatorIntMultiplePair分別按firstsecond值排序比較大小

Reduce階段】KV值輸出

GroupingMultipleComparator分組排序,僅按firstKEY值分組,將同名稱的分數iterator()合併放到result裡面,輸出排序結果

context.write(newText(key.getFirst()), new Text(result));// reduce階段輸出KV

【自定義KEY及相關類】

class IntMultiplePair定義2個屬性 firstsecond;

classIntMultipleSortingComparator重寫compare方法

if(!x.getFirst().equals(y.getFirst())){

returnx.getFirst().compareTo(y.getFirst());                        

} else {

return x.getSecond() - y.getSecond();

}

classGroupingMultipleComparator重寫compare方法

        return x.getFirst().compareTo(y.getFirst());

classMyMultipleSortingPartitioner重寫getPartition方法

        return (arg0.getFirst().hashCode() &Integer.MAX_VALUE)%arg2;

job配置

 job.setPartitionerClass(MyMultipleSortingPartitioner.class);

           job.setSortComparatorClass(IntMultipleSortingComparator.class);

          job.setGroupingComparatorClass(GroupingMultipleComparator.class);

50課:HadoopMapReduce倒排索引解析與實戰

【資料檔案Input

 #cat file1.txt

 Spark is so powerful

 #cat file2.txt

 Spark is the most excitingthing happening in big data today

 #cat file3.txt

 Hello Spark Hello again Spark

【執行結果Output

 Hello   file3.txt:2

 Spark   file3.txt:2;file1.txt:1;file2.txt:1

 again   file3.txt:1

 big     file2.txt:1

 data    file2.txt:1

 exciting        file2.txt:1

 happening       file2.txt:1

 in      file2.txt:1

 is      file2.txt:1;file1.txt:1

 most    file2.txt:1

 powerful        file1.txt:1

 so      file1.txt:1

 the     file2.txt:1

 thing   file2.txt:1

 today   file2.txt:1

【原始碼檔案】 InvertedIndex.java

Map階段】KV值定義

setup方法獲取檔名 fileName =inputSplit.getPath().getName();

讀入每行資料,切割成單詞以後,“單詞名:檔名”作為KEYnumber作為value計數為1

context.write(new Text(keyForCombiner), number);

while(stringTokenizer.hasMoreTokens()){

 String keyForCombiner = stringTokenizer.nextToken() + ":" +fileName;

 context.write(new Text(keyForCombiner), number);

本地Combiner階段class DataCombiner

本地遍歷相同的“單詞名:檔名”的values,進行累加計數;然後拆開key,將單詞拿出來放入key值中,而將“檔名:累計數”放入value

context.write(new Text(keyArray[0]),new Text(keyArray[1]+":"+sum));

 for(Text item : values){

 sum += Integer.valueOf(item.toString());

  }              

  String[]keyArray = key.toString().split(":");

  context.write(new Text(keyArray[0]), newText(keyArray[1]+":"+sum));

Reduce階段】KV值輸出

    輸出K,V,K值是單詞,values以“檔名:累計數;檔名:累計數”的方式合併輸出。context.write(key,new Text(result.toString().substring(0, result.toString().length() -1)));

for(Text item : values){

 result.append(item + ";");

  }    

context.write(key, newText(result.toString().substring(0, result.toString().length() -1)));

  }

job配置

 job.setCombinerClass(DataCombiner.class);

49課:HadoopMapReduce自連線演算法及程式設計實戰

【資料檔案Input0列是孩子 1列是父親

Tom     Lucy

 Tom     Jack

Jone    Lucy

 Jone    Jack

 Lucy    Mary

 Lucy    Ben

 Jack    Alice

 Jack    Jesse

 Terry   Alice

 Terry   Jesse

 Philip  Terry

 Philip  Alma

 Mark    Terry

 Mark    Alma  

【執行結果Output】找祖父親

 Tom     Alice

 Tom     Jesse

 Jone    Alice

 Jone    Jesse

 Tom     Ben

 Tom     Mary

 Jone    Ben

 Jone    Mary

 Philip  Alice

 Philip  Jesse

 Mark    Alice

 Mark    Jesse

【原始碼檔案】  SelfJoin.java

Map階段】KV值定義

讀入每行資料,及切割單詞,在map階段輸出2KV值,分別對應左表、右表;

l  左表Key1列的父親,value為“1_孩子”;

l  右表Key0列的孩子,value為“0_父親”;

 context.write(newText(array[1].trim()), new Text("1_"+array[0].trim())); //left

 context.write(new Text(array[0].trim()), newText("0_"+array[1].trim())); //right

Reduce階段】KV值輸出

遍歷相同key值的values,根據map階段定義的分隔符“_”切分values1是孩子,將splited[1]放入grandChildList列表;0是父親,將splited[1]放入grandParentList列表;然後遍歷輸出祖孫和祖父的笛卡兒積。

 while(iterator.hasNext()){

 String item = iterator.next().toString();

 String[] splited = item.split("_");

 if(splited[0].equals("1")){

grandChildList.add(splited[1]);

} else {

grandParentList.add(splited[1]);

         }        }                

  if(grandChildList.size() > 0 && grandParentList.size() > 0){                      

 for (String grandChild:grandChildList ) {

 for (String grandParent: grandParentList) {

   context.write(new Text(grandChild), new Text(grandParent));

….

48課:HadoopMapReduce二次排序程式設計實戰

【資料檔案Input

12      8

 32     21

 54     32

 65     21

 501    12

 81     2

 81     6

 81     9

 81     7

 81     1

 100    100  

【執行結果Output】按字串方式排序

 100     100

 12      8

 32      21

 501     12

 54      32

 65      21

 81      1

 81      2

 81      6

 81      7

 81      9

【原始碼檔案】  SecondarySort.java

Map階段】KV值定義

將讀入每行記錄按“\t”切分,第一個及第二個值放入IntPair例項物件,將IntPair例項物件作為KEY值,value是每行的資料。

IntPairitem =new IntPair(splited[0],splited[1]);

 context.write(item, value);

Reduce階段】KV值輸出

Reducekey值的compareTo方法經過排序彙總輸出,reduce輸出的KVkey值為空,直接將從map讀入的value值(每行的資料)輸出。

context.write(NullWritable.get(), item);

自定義KEY值及相關類

class IntPair定義屬性firstsecond

重寫compareTo方法

   if (!this.first.equals(o.getFirst())){

         return this.first.compareTo(o.first);                   

} else {

if(!this.Second.equals(o.Second)) {

returnthis.Second.compareTo(o.Second);                                    

} else {

return 0;

class MyPartitioner定義分割槽數

return  (key.hashCode() &Integer.MAX_VALUE) % numPartitioneS;

classSecondarySortGroupComparator分組排序 super(IntPair.class,true);

job配置

 job.setPartitionerClass(MyPartitioner.class);

 job.setGroupingComparatorClass(SecondarySortGroupComparator.class);

47課:HadoopMapReduce二次排序演算法和實現解析(原理課)

46課:HadoopJoin效能優化程式設計實戰

【資料檔案Input

 #cat members.txt

 1       Spark  1

 2       Hadoop 1

 3       flink  3

 4       Kafka  1

 5       Tachyon 2

#cat address.txt

 1       America

 2       China

 3       Germa

【執行結果Output

4 Kafka America
2 Hadoop America
1 Spark America
5 Tachyon China
3 flink Germa

【原始碼檔案】  JoinImproved.java

Map階段】KV值定義

讀入每行資料,按“\t”進行單詞切分,

l  如果切分以後的陣列長度為2,則讀取的是地址檔案,將地址ID、地址名稱資料存入member例項,同時將地址ID存入memberKey,打標記為truemap輸出的KV值:key為自定義的memberKey(地址),value為自定義的member(地址)

l  如長度不為2,讀取的是成員檔案,將成員編號、成員名、地址ID資料存入member例項,同時將地址ID存入memberKey,打標記為falsemap輸出的KV值:key為自定義的memberKey(成員),value為自定義的member(成員)

if(dataSplited.length == 2){

  Member_Information member = newMember_Information();

  member.setAddresNo(dataSplited[0]);

  member.setAddressName(dataSplited[1]);     

  MemberKey memberKey = new MemberKey();

 memberKey.setKeyID(Integer.valueOf(dataSplited[0]));

  memberKey.setFlag(true);                     

context.write(memberKey, member);

  } else {

  Member_Information member = newMember_Information();

  member.setMemberNo(dataSplited[0]);

  member.setMemberName(dataSplited[1]);

  member.setAddresNo(dataSplited[2]);                     

  MemberKey memberKey = new MemberKey();

  memberKey.setKeyID(Integer.valueOf(dataSplited[2]));

  memberKey.setFlag(false);                             

context.write(memberKey, member);

  }

Reduce階段】KV值輸出

ReducememberKeycompareTo的方法進行排序彙總,MemberKey排序比較:如果MemberKey的地址ID相同,則比較打的標記,如標記為true,則返回-1,這樣地址資訊就排到第一條;如果標記為false,返回1;我們只需對地址資訊之後的成員資料一條條進行處理。這樣就進行了Join效能優化,避免了將成員及地址資訊放入List列表中,再在list列表中篩選出地址資訊,在List資料量大的情況下導致OOM

Reduce輸出KVKey值為空值,valuesMember_InformationtoString

for(Member_Informationitem : values){

if(0 == counter){

  member = new Member_Information(item);

  } else {

  Member_Information mem = newMember_Information(item);

  mem.setAddressName(member.getAddressName());                          

  context.write(NullWritable.get(), newText(mem.toString()));

  }                       

  counter++;

  }

自定義KEY值及相關類

class MemberKey定義屬性keyIDflag

重寫compareTo方法

   publicint compareTo(MemberKey member) {

       if(this.keyID == member.keyID){

           if(this.flag== member.flag){

               return0;

           }else {

               returnthis.flag? -1:1;

           }

       }else {

           returnthis.keyID  - member.keyID > 0? 1 :-1;

       }

   }

class Member_Information

定義屬性存放成員及地址資訊

   privateString memberNo = "";

   privateString memberName = "";

   privateString addresNo = "";

   privateString addressName = "";

重寫toString方法

return this.memberNo + " " + this.memberName + "" + this.addressName;

class GroupComparator重寫compare方法

   MemberKeyx = (MemberKey)a;

       MemberKeyy = (MemberKey)b;

       if(x.getKeyID()== y.getKeyID()){

           return0;

       }else {

           returnx.getKeyID() > y.getKeyID()? 1 : -1;

       }

Job配置

job.setGroupingComparatorClass(GroupComparator.class);

45課:HadoopJoin效能優化之原理和執行機制詳解(原理課)

44課:Hadoop處理員工資訊Join實戰

【資料檔案Input

 workers.txt

 7499 allen salesman 76981981-02-20 1600 300 30

 7782 clark managers 76391981-06-09 2450  10

 7654 martin salesman 76981981-03-20 1250 1400 30 boston

 7900 james clerk 76981981-01-09 950  30

 7788 scott analyst 75661981-09-01 3000 100 20

department.txt

30 sales chicago

 20 research dallas

 10 accounting newyork

【執行結果Output

 10          10  accounting

 10      7782  clark 10 

20      7788  scott 20 

20          20 research

 30          30 sales

 30      7900  james 30 

30      7654  martin 30 

30      7499  allen 30

【原始碼檔案】  JoinWorkersInformation.java

Map階段】KV值定義

讀入每行資料,按“\t”進行單詞切分,

l  如果切分以後的陣列長度小於3,則讀取的是部門檔案,將部門ID、部門名稱、部門標記0資料存入MemberInformation例項;map輸出的KV值:key為部門IDvalue為自定義的MemberInformation(部門)

l  如長度大於3,讀取的是職員檔案,將職員編號、職員名、部門ID、職員標記1資料存入MemberInformation例項;map輸出的KV值:key為部門IDvalue為自定義的MemberInformation(職員)

 if (data.length <=3){ //department

     MemberInformation department=  newMemberInformation ();

             department.setDepartmentNo(data[0]);

             department.setDepartmentName(data[1]);

             department.setFlag(0);

            context.write(newLongWritable(Long.valueOf(department.getDepartmentNo())), department);

         }else { // worker

 MemberInformation   worker= new MemberInformation ();

             worker.setWorkerNo(data[0]);

             worker.setWorkerName(data[1]);

             worker.setDepartmentNo(data[7]);

             worker.setFlag(1);

           context.write(newLongWritable(Long.valueOf(worker.getDepartmentNo())), worker);

            …        }

Reduce階段】KV值輸出

reduce根據key部門ID的值進行彙總,如標記為0,標識是部門資訊,取到部門的資料,如果不為0,表示是職員資料,在workerList列表中增加職員記錄。遍歷職員列表,輸出KVkey值為0值,value為職員的toString資訊

List<MemberInformation> workerList=newArrayList<MemberInformation>();

         for (MemberInformation item : values) {

             if (0 ==item.getFlag()) {

                 department=new MemberInformation(item);

             }else {

                 workerList.add(new MemberInformation(item) );

             }

         }

         for (MemberInformation worker:workerList){          worker.setDepartmentNo(department.getDepartmentNo());           worker.setDepartmentName(department.getDepartmentName());

             resultValue.set(worker.toString());

             context.write(resultKey,resultValue );     

         }

 classMemberInformation

定義屬性

   private String workerNo ="";

   private String workerName ="";

   private String departmentNo ="";   

   private String departmentName =""; 

   private int flag = 0; // 0 department,1 worker

重寫toString方法

return this.workerNo + "   "+this.workerName+" "+this.departmentNo + "  "+this.departmentName;

43課:Hadoop實戰URL流量分析

【資料檔案InputURLLog.txt

127.0.0.1 - - [03/Jul/2015:23:36:38 +0800] "GET /course/detail/3.htmHTTP/1.0" 200 38435 0.038
182.131.89.195 - - [03/Jul/2015:23:37:43 +0800] "GET / HTTP/1.0" 301- 0.000
127.0.0.1 - - [03/Jul/2015:23:38:27 +0800] "POST/service/notes/addViewTimes_23.htm HTTP/1.0" 200 2 0.003  

【執行結果Output

 GET /   3

 GET /course/detail/3.htm        1

 GET /course/list/73.htm 1

 GET/html/notes/20140318/24.html        1

 GET/html/notes/20140609/542.html       1

 GET/html/notes/20140609/544.html       1

【原始碼檔案】  URLLog.java

Map階段】KV值定義

Map輸出的KV:將每行資料使用handleLine函式處理,區分GET/POST,擷取相應URL字串賦值給key值,resultValue值計數為1

context.write(text,resultValue);

   privateString handleLine(String line) {        

       StringBufferbuffer = new StringBuffer();

       if(line.length()>0){

           if(line.contains("GET")){               buffer.append(line.substring(line.indexOf("GET"),line.indexOf("HTTP/1.0")).trim());

           }elseif ( line.contains("POST")) {             buffer.append(line.substring(line.indexOf("POST"),line.indexOf("HTTP/1.0")).trim());

       returnbuffer.toString();

   }

Reduce階段】KV值輸出

Reduce彙總輸出KVkey為提取的URL字串,totalresultValue為累加計數。

context.write(key, totalresultValue);

42課:Hadoop中的TopN及其排序原理剖析及程式碼實戰

【資料檔案InputTopN1.txt

 1,9819,100,121

 2,8918,2000,111

 3,2813,1234,22

 4,9100,10,1101

 5,3210,490,111

 6,1298,28,1211

 7,1010,281,90

 8,1818,9000,20

TopN2.txt

10,3333,10,100

 11,9321,1000,293

 12,3881,701,20

 13,6791,910,30

 14,8888,11,39

【執行結果Output

 1       9000

 2       2000

 3       1234

【原始碼檔案】  TopNSorted.java

Map階段】KV值定義

l  setup方法:配置TopN取值:length =context.getConfiguration().getInt("topn", 5);

l cleanup方法:context.write(newText(String.valueOf(topN[i])), new Text(String.valueOf(topN[i])));

輸出KVkey值為mapArrays.sort(topN)排序以後的TopN值,value為數值

l conf設定TopN配置conf.setInt("topn", 3);

Reduce階段】KV值輸出

l cleanup方法: context.write(new Text(String.valueOf(length- i + 1)), new Text(String.valueOf(topN[i])));

輸出KVkey值為reduce彙總時 Arrays.sort(topN)排序以後的TopN值,value為數值

41課:Hadoop求平均值和最大最小值案例實戰以及測試除錯

【資料檔案Input

 #cat dataBiggestsmallest.txt

 1

 33

 55

 66

 77

 45

 34567

 50

 88776

 345

 5555555

 23

 32

【執行結果Output

 maxValue        5555555

 minValue        1

【原始碼檔案】  BiggestSmallest.java

Map階段】KV值定義

MapKVkey值設定為keyFoReducervalue為每行的資料

 context.write(keyFoReducer,data);

Reduce階段】KV值輸出

Reduce KV輸出最大值及最小值

    for(LongWritable item : values){

        if (item.get()>maxValue){

            maxValue=item.get();

        }     

        if (item.get()<minValue){

            minValue=item.get();

        }     

    }

    context.write(new Text("maxValue"),new LongWritable(maxValue));

    context.write(new Text("minValue"),new LongWritable(minValue));

 }

40課:Hadoop資料去重和資料排序案例實戰及資料清洗ETL

【資料檔案InputSortedData.txt

 423535

 45

 666

 77

 888

 22

 3

 4

 5

 7777888

 99999

【執行結果Output

  1       3

 2       4

 3       5

 4       22

 5       45

 6       77

 7       666

 8       888

 9       99999

 10      423535

 11      7777888

【原始碼檔案】 SortData .java

Map階段】KV值定義

讀入每行資料,map輸出KVkey值為每行的資料,value設定為1