1. 程式人生 > >MapReduce實現兩表的Join--原理及python和java程式碼實現

MapReduce實現兩表的Join--原理及python和java程式碼實現

用Hive一句話搞定的,但是有時必須要用mapreduce

方法介紹

1. 概述

在傳統資料庫(如:MYSQL)中,JOIN操作是非常常見且非常耗時的。而在HADOOP中進行JOIN操作,同樣常見且耗時,由於Hadoop的獨特設計思想,當進行JOIN操作時,有一些特殊的技巧。
本文首先介紹了Hadoop上通常的JOIN實現方法,然後給出了幾種針對不同輸入資料集的優化方法。

2. 常見的join方法介紹

假設要進行join的資料分別來自File1和File2.

2.1 reduce side join

reduce side join是一種最簡單的join方式,其主要思想如下:

在map階段,map函式同時讀取兩個檔案File1和File2,為了區分兩種來源的key/value資料對,對每條資料打一個標籤(tag),比如:tag=0表示來自檔案File1,tag=2表示來自檔案File2。即:map階段的主要任務是對不同檔案中的資料打標籤。
在reduce階段,reduce函式獲取key相同的來自File1和File2檔案的value list, 然後對於同一個key,對File1和File2中的資料進行join(笛卡爾乘積)。即:reduce階段進行實際的連線操作。

2.2 map side join

之所以存在reduce side join,是因為在map階段不能獲取所有需要的join欄位,即:同一個key對應的欄位可能位於不同map中。Reduce side join是非常低效的,因為shuffle階段要進行大量的資料傳輸。

Map side join是針對以下場景進行的優化:兩個待連線表中,有一個表非常大,而另一個表非常小,以至於小表可以直接存放到記憶體中。這樣,我們可以將小表複製多份,讓每個map task記憶體中存在一份(比如存放到hash table中),然後只掃描大表:對於大表中的每一條記錄key/value,在hash table中查詢是否有相同的key的記錄,如果有,則連線後輸出即可。
為了支援檔案的複製,Hadoop提供了一個類DistributedCache,使用該類的方法如下:
(1)使用者使用靜態方法DistributedCache.addCacheFile()指定要複製的檔案,它的引數是檔案的URI(如果是HDFS上的檔案,可以這樣:hdfs://namenode:9000/home/XXX/file,其中9000是自己配置的NameNode埠號)。JobTracker在作業啟動之前會獲取這個URI列表,並將相應的檔案拷貝到各個TaskTracker的本地磁碟上。(2)使用者使用DistributedCache.getLocalCacheFiles()方法獲取檔案目錄,並使用標準的檔案讀寫API讀取相應的檔案。

2.3 SemiJoin

SemiJoin,也叫半連線,是從分散式資料庫中借鑑過來的方法。它的產生動機是:對於reduce side join,跨機器的資料傳輸量非常大,這成了join操作的一個瓶頸,如果能夠在map端過濾掉不會參加join操作的資料,則可以大大節省網路IO。
實現方法很簡單:選取一個小表,假設是File1,將其參與join的key抽取出來,儲存到檔案File3中,File3檔案一般很小,可以放到記憶體中。在map階段,使用DistributedCache將File3複製到各個TaskTracker上,然後將File2中不在File3中的key對應的記錄過濾掉,剩下的reduce階段的工作與reduce side join相同。
更多關於半連線的介紹,可參考:半連線介紹:http://wenku.baidu.com/view/ae7442db7f1922791688e877.html

2.4 reduce side join + BloomFilter

在某些情況下,SemiJoin抽取出來的小表的key集合在記憶體中仍然存放不下,這時候可以使用BloomFiler以節省空間。
BloomFilter最常見的作用是:判斷某個元素是否在一個集合裡面。它最重要的兩個方法是:add() 和contains()。最大的特點是不會存在false negative,即:如果contains()返回false,則該元素一定不在集合中,但會存在一定的true negative,即:如果contains()返回true,則該元素可能在集合中。
因而可將小表中的key儲存到BloomFilter中,在map階段過濾大表,可能有一些不在小表中的記錄沒有過濾掉(但是在小表中的記錄一定不會過濾掉),這沒關係,只不過增加了少量的網路IO而已。
更多關於BloomFilter的介紹,可參考:http://blog.csdn.net/jiaomeng/article/details/1495500

3. 二次排序

在Hadoop中,預設情況下是按照key進行排序,如果要按照value進行排序怎麼辦?即:對於同一個key,reduce函式接收到的value list是按照value排序的。這種應用需求在join操作中很常見,比如,希望相同的key中,小表對應的value排在前面。
有兩種方法進行二次排序,分別為:buffer and in memory sort和 value-to-key conversion。
對於buffer and in memory sort,主要思想是:在reduce()函式中,將某個key對應的所有value儲存下來,然後進行排序。 這種方法最大的缺點是:可能會造成out of memory。
對於value-to-key conversion,主要思想是:將key和部分value拼接成一個組合key(實現WritableComparable介面或者呼叫setSortComparatorClass函式),這樣reduce獲取的結果便是先按key排序,後按value排序的結果,需要注意的是,使用者需要自己實現Paritioner,以便只按照key進行資料劃分。Hadoop顯式的支援二次排序,在Configuration類中有個setGroupingComparatorClass()方法,可用於設定排序group的key值,

reduce-side-join python程式碼

hadoop有個工具叫做steaming,能夠支援python、shell、C++、PHP等其他任何支援標準輸入stdin及標準輸出stdout的語言,其執行原理可以通過和標準java的map-reduce程式對比來說明:

使用原生java語言實現Map-reduce程式
  1. hadoop準備好資料後,將資料傳送給java的map程式
  2. java的map程式將資料處理後,輸出O1
  3. hadoop將O1打散、排序,然後傳給不同的reduce機器
  4. 每個reduce機器將傳來的資料傳給reduce程式
  5. reduce程式將資料處理,輸出最終資料O2
藉助hadoop streaming使用python語言實現Map-reduce程式
  1. hadoop準備好資料後,將資料傳送給java的map程式
  2. java的map程式將資料處理成“鍵/值”對,並傳送給python的map程式
  3. python的map程式將資料處理後,將結果傳回給java的map程式
  4. java的map程式將資料輸出為O1
  5. hadoop將O1打散、排序,然後傳給不同的reduce機器
  6. 每個reduce機器將傳來的資料處理成“鍵/值”對,並傳送給python的reduce程式
  7. python的reduce程式將資料處理後,將結果返回給java的reduce程式
  8. java的reduce程式將資料處理,輸出最終資料O2

上面紅色表示map的對比,藍色表示reduce的對比,可以看出streaming程式多了一步中間處理,這樣說來steaming程式的效率和效能應該低於java版的程式,然而python的開發效率、執行效能有時候會大於java,這就是streaming的優勢所在。

hadoop之實現集合join的需求

hadoop是用來做資料分析的,大都是對集合進行操作,因此該過程中將集合join起來使得一個集合能得到另一個集合對應的資訊的需求非常常見。

比如以下這個需求,有兩份資料:學生資訊(學號,姓名)和學生成績(學號、課程、成績),特點是有個共同的主鍵“學號”,現在需要將兩者結合起來得到資料(學號,姓名,課程,成績),計算公式:

學號,姓名) join (學號,課程,成績)= (學號,姓名,課程,成績)

資料事例1-學生資訊:

學號sno 姓名name
01 name1
02 name2
03 name3
04 name4

資料事例2:-學生成績:

學號sno 課程號courseno 成績grade
01 01 80
01 02 90
02 01 82
02 02 95

期待的最終輸出:

學號sno 姓名name 課程courseno 成績grade
01 name1 01 80
01 name1 02 90
02 name2 01 82
02 name2 02 95

實現join的注意點和易踩坑總結

如果你想寫一個完善健壯的map reduce程式,我建議你首先弄清楚輸入資料的格式、輸出資料的格式,然後自己手動構建輸入資料並手動計算出輸出資料,這個過程中你會發現一些寫程式中需要特別處理的地方:

  1. 實現join的key是哪個,是1個欄位還是2個欄位,本例中key是sno,1個欄位
  2. 每個集合中key是否可以重複,本例中資料1不可重複,資料2的key可以重複
  3. 每個集合中key的對應值是否可以不存在,本例中有學生會沒成績,所以資料2的key可以為空

第1條會影響到hadoop啟動指令碼中key.fields和partition的配置,第2條會影響到map-reduce程式中具體的程式碼實現方式,第3條同樣影響程式碼編寫方式。

hadoop實現join操作的思路

具體思路是給每個資料來源加上一個數字標記label,這樣hadoop對其排序後同一個欄位的資料排在一起並且按照label排好序了,於是直接將相鄰相同key的資料合併在一起輸出就得到了結果。

1、 map階段:給表1和表2加標記,其實就是多輸出一個欄位,比如表一加標記為0,表2加標記為2;

2、 partion階段:根據學號key為第一主鍵,標記label為第二主鍵進行排序和分割槽

3、 reduce階段:由於已經按照第一主鍵、第二主鍵排好了序,將相鄰相同key資料合併輸出

hadoop使用python實現join的map和reduce程式碼

mapper.py的程式碼:

123456789101112131415161718192021222324252627282930# -*- coding: utf-8 -*-#Mapper.pyimport osimport sys#mapper指令碼def mapper():#獲取當前正在處理的檔案的名字,這裡我們有兩個輸入檔案#所以要加以區分filepath=os.environ["map_input_file"]filename=os.path.split(filepath)[-1]forline insys.stdin:ifline.strip()=="":continuefields=

相關推薦

MapReduce實現Join--原理pythonjava程式碼實現

用Hive一句話搞定的,但是有時必須要用mapreduce 方法介紹 1. 概述 在傳統資料庫(如:MYSQL)中,JOIN操作是非常常見且非常耗時的。而在HADOOP中進行JOIN操作,同樣常見且耗時,由於Hadoop的獨特設計思想,當進行JOIN操作時,有一

MD5( 信息摘要算法)的概念原理python代碼的實現

偽造 ide 不可 壓縮包 acc before 固定 target a20 簡述: message-digest algorithm 5(信息-摘要算法)。經常說的“MD5加密”,就是它→信息-摘要算法。 md5,其實就是一種

【深度學習】線性迴歸(一)原理python從0開始實現

文章目錄 線性迴歸 單個屬性的情況 多元線性迴歸 廣義線性模型 實驗資料集 介紹 相關連結 Python實現 環境 編碼

MapReducejoin一般操作

案例:(部門員工兩表的join查詢) 原始資料 員工表(emp):  empno ename  job      mgr  hiredate   sal  comm deptno loc  7499  allen  salesman 7698 1981-02-20 1600

MapReducejoin操作優化

注:優化前的分析過程詳見本博的上篇博文 案例 地址(Address)和人員(Person)的一對多關聯 原始資料 地址(Address)資料 id AddreName 1 beijing 2 shanghai 3 guangzhou 人員(Person)資料 1 zhan

JVM執行原理StackHeap的實現過程

Java語言寫的源程式通過Java編譯器,編譯成與平臺無關的‘位元組碼程式’(.class檔案,也就是0,1二進位制程式),然後在OS之上的Java直譯器中解釋執行,而JVM是java的核心和基礎,在java編譯器和os平臺之間的虛擬處理器。 一、JVM原理 1、JV

利用位運算實現個整數的加法運算,請程式碼實現,並作簡要說明。

#include <stdio.h>      int main(void) {        int add(int a,int b);       int m,a,b;       scanf("%d,%d",&a,&b);       m

排序演算法總結(含動圖演示Java程式碼實現)

本文將圍繞氣泡排序、桶排序、計數排序、堆排序、插入排序、並歸排序、快速排序和選擇排序,按照描述、時間複雜度(最壞情況)、動態圖展示和程式碼實現來講解。本文預設排序為從小到大。 本文相關程式碼已上傳至github,歡迎關注https://github.com/zhuzhenke/commo

常見的查找算法的原理python實現

put arch img 字典 python實現 需要 技術 () one 順序查找 二分查找 練習 一、順序查找 data=[1,3,4,5,6] value=1 def linear_search(data,value): flag=False

短時傅裏葉變換(Short Time Fourier Transform)原理 Python 實現

src 參考 函數 ade block return 技術 數學公式 def 原理   短時傅裏葉變換(Short Time Fourier Transform, STFT) 是一個用於語音信號處理的通用工具.它定義了一個非常有用的時間和頻率分布類, 其指定了任意信號隨時間

線程池原理python實現

source 實例 以及 代碼 let range python實現 queue 上界 https://www.cnblogs.com/goodhacker/p/3359985.html 為什麽需要線程池   目前的大多數網絡服務器,包括Web服務器、Email服務器以

雜湊原理實現

雜湊表(Hash table,也叫散列表), 是根據關鍵碼值(Key value)而直接進行訪問的資料結構。也就是說,它通過把關鍵碼值對映到表中一個位置來訪問記錄,以加快查詢的速度。這個對映函式叫做雜湊函式,存放記錄的陣列叫做散列表。 雜湊表hash table(key,value) 的做法

dropout原理python實現

dropout原理及python實現 標籤: 神經網路 python dropout引入 我們都知道在訓練神經網路的時候,對於神經網路來說很容易產生過擬合現象,在解決神經網路的過擬合的時候,我們可以使用正則化進行防止過擬合現象的產生,除此之外我們也可以使用dropout

SVM演算法原理Python實現

Svm(support Vector Mac)又稱為支援向量機,是一種二分類的模型。當然如果進行修改之後也是可以用於多類別問題的分類。支援向量機可以分為線性核非線性兩大類。其主要思想為找到空間中的一個更夠將所有資料樣本劃開的超平面,並且使得本本集中所有資料到這個超平面的距離最

url去重 --布隆過濾器 bloom filter原理python實現

array art bits bras pos for tar ack setup https://blog.csdn.net/a1368783069/article/details/52137417 # -*- encoding: utf-8 -*- """This

xgboost原理python實現

目錄 xgboost原理 具體瞭解還是要看Tianqi Chen大佬文章 決策樹分為分類樹和迴歸樹(CART)組成。這裡有一個簡單的的例子,用來區分人們是否會喜歡電腦遊戲。  但是有時候一棵樹並不

樸素貝葉斯原理python實現

一、貝葉斯演算法引入       樸素貝葉斯演算法是基於貝葉斯定理和特徵條件獨立假設的分類法,是一種基於概率分佈的分類演算法。       貝葉斯分類演算法,通俗的來講,在給定資料集的前提下,對於一個

層次聚類演算法的原理python實現

層次聚類(Hierarchical Clustering)是一種聚類演算法,通過計算不同類別資料點間的相似度來建立一棵有層次的巢狀聚類樹。在聚類樹中,不同類別的原始資料點是樹的最低層,樹的頂層是一個聚類的根節點。 聚類樹的建立方法:自下而上的合併,自上而下的分裂。(這裡介紹第一種) 1.2 層次聚類的合

經典排序演算法,氣泡排序,選擇排序,直接插入排序,希爾排序,快速排序,歸併排序,二分查詢。原理python實現

1.氣泡排序 氣泡排序 1.比較相鄰的元素,如果第一個比第二個大(升序),就交換他們兩個 2.對每一對相鄰的元素做同樣的工作,從開始到結尾的最後一對 這步做完後,最後的元素會是最大的數 3.針對所有的元素重複以上的步驟,除了最

PCA 原理 Python 實現

原文發於個人部落格 前言 說好的要做個有逼格的技術部落格,雖然這篇依然沒什麼水平,但總算走出了第一步,希望以後每天都能進步一點點吧! 接觸機器學習也一年多了,也學了很多演算法,而PCA是資料預處理中一個很重要的演算法,當時學習的時候也在網上看了