1. 程式人生 > >挖挖Hive的程式碼(三)——生成MapReduce(中)

挖挖Hive的程式碼(三)——生成MapReduce(中)

好久沒有更新部落格了,最近終於有空可以繼續分享我對Hive程式碼的剖析了。不想再用上一篇的那種風格來解讀程式碼了,直接上白話版的程式碼解讀吧。

上一篇已經貼過一段模式匹配法處理Operator樹的程式碼,羅列了十多個處理流程。這次就具體展開的解讀一下Hive是如何實現這些處理流程的。

GenMRProcContext (下面簡稱ctx),

記錄了整個遍歷過程中的上下文資訊。

ParseContext(下面簡稱parseCtx),語義分析的結果,包含了整個job的operator樹

TS%:

構造一個新的MapRedTask物件(task),並在ctx中記錄該task為currTask,記錄當前operator為currTopOp;

在parseCtx中找到當前operator對應的alias;

為當前operator構造一個GenMapRedCtx物件,以currTask,currTopOp和alias為引數,並以當前operator為key記錄在ctx中。

(Analyze的操作也在這裡處理,沒研究,暫略)

TS%.*RS%:

通過上一個operator(對應的GenMapRedCtx物件)獲取到:currTask,currPlan,currTopOp,currAliasId等資訊,並將這些資訊儲存在ctx中;

獲取當前operator的child作為reducer(必然是一個MapReduce job的reduce部分的topOp),並從ctx獲取到該reducer對應的task物件;

如果reducer對應的task不存在,說明這個reducer還沒有被遍歷過:

  • 如果currPlan的reduce部分也沒有構造,則呼叫initPlan()方法初始化這個plan:
    • 將該reducer與currTask的對映關係存入ctx;
    • 將該reducer設為currPlan的reduce部分;
    • 將currTask存入ctx的rootTasks中;
    • 如果該reducer是個JoinOperator,會將currPlan的needsTagging設為true
    • 如果currTopOp不在ctx的seenOps中,新增之,並呼叫setTaskPlan()方法設定currTask的源輸入資訊(path2alias的對映關係,partition的裁剪也在這裡實現的)
    • 將currTopOp,currAliasId置空
  • 如果reduce部分已經構造過了(當前這棵TS樹有分叉,有兩個RS,currPlan的reduce部分被另一個RS的child給佔了),則呼叫splitPlan()方法構造一個新的task:
    • 構造一個新的task,將該reducer設為這個task的plan的reduce部分;
    • 將該reducer與這個task的對映關係存入ctx;
    • 呼叫splitTasks()方法關聯這兩個有依賴關係的task(舊的為parent,新的為child):
      • 設定倆task的關聯關係;
      • 生成一個臨時檔案路徑(用來存中間結果);
      • 根據這個臨時檔案路徑,構造一個FileSinkOperator(FS),並用這個FS替換當前(currTask中的,也就是parent)的RS;
      • 構造一個TableScanOperator(TS)作為當前RS的parent;
      • 如果當前RS的reducer是JoinOperator(Join),則這個新生成的TS對應的alias會改成一個特殊的值,並且設定child task的plan的needsTagging為true;
      • 呼叫setTaskPlan()方法為child task設定源輸入資訊(會用到新構造的臨時檔案路徑作為源輸入路徑,如果reducer是Join,map部分的alias就是剛設定的特殊值,否則用臨時檔案路徑代替):
        • 在該方法中,會將前一步關聯起來的TS-RS串新增到新的task的map部分中去。
      • 在ctx中,將currTopOp,currAliasId置空,將child task設為currTask。

如果reducer對應的task已經存在了(join的情況,另一個分支已經遍歷過了),則呼叫joinPlan()方法合併當前task與該reducer對應的task:

  • 將currTopOp在ctx中標記為已遍歷;
  • 判斷當前TS到RS的邏輯是否為mapjoin的小表的邏輯;
  • 呼叫setTaskPlan()方法,在reducer對應的task中,為currTopOp設定對應的源輸入資訊:
    • 在該方法中,會將currTopOp新增到reducer對應的task的map部分中去。
  • 在ctx中,將currTopOp置空,將改reducer對應的task設為currTask

設定該reducer對應的task為currTask(這步確實做了兩次,地方不同);

最後,為當前operator構造一個GenMapRedCtx物件,以currTask,currTopOp和currAliasId為引數,並以當前operator為key記錄在ctx中。

RS%.*RS%:

通過上一個operator(對應的GenMapRedCtx物件)獲取到:currTask,currTopOp,currAliasId等資訊,並將這些資訊儲存在ctx中;

獲取當前operator的child作為reducer(必然是一個MapReduce job的reduce部分的topOp),並從ctx中獲取到該reducer對應的task物件;

如果該reducer物件不存在(還沒有被遍歷過),呼叫splitPlan()方法構造一個新的MapRedTask物件,並依賴於currTask:

  • 通過ctx構造一個新的MapredWork物件,並以此構造一個新的MapRedTask物件(新的task);
  • 將該reducer設為新task的reduce部分;
  • 在ctx中記錄該reducer與這個新task的對映關係;
  • 呼叫splitTasks()方法關聯currTask與這個新的task:
    • 設定task間的依賴關係,currTask為parent,新的task為child;
    • 生成一個臨時檔案路徑(用來存中間結果);
    • 根據這個臨時檔案路徑,構造一個FileSinkOperator(FS),並用這個FS替換當前(currTask中)的RS;
    • 構造一個TableScanOperator(TS)作為當前RS的parent;
    • 如果當前RS的reducer是JoinOperator(Join),則這個新生成的TS對應的alias會改成一個特殊的值,並且設定child task的plan的needsTagging為true;
    • 呼叫setTaskPlan()方法為child task設定源輸入資訊(會用到新構造的臨時檔案路徑作為源輸入路徑,如果reducer是Join,map部分的alias就是剛設定的特殊值,否則用臨時檔案路徑代替):
      • 在該方法中,會將前一步關聯起來的TS-RS串新增到新的task的map部分中去。
    • 在ctx中,將currTopOp,currAliasId置空,將child task設為currTask。

否則這個reducer必然已經被遍歷過了,呼叫joinPlan()方法,將該reducer對應的task關聯到currTask:

  • 設定task間的依賴關係:為currTask增加一個依賴於它的task(reducer對應的task);
  • 生成一個臨時檔案路徑(用來存中間結果);
  • 根據這個臨時檔案路徑,構造一個FileSinkOperator(FS),並用這個FS替換當前(currTask中)的RS;
  • 構造一個TableScanOperator(TS)作為當前RS的parent;
  • 如果當前RS的reducer是JoinOperator(Join),則這個新生成的TS對應的alias會改成一個特殊的值,並且設定child task的plan的needsTagging為true;
  • 呼叫setTaskPlan()方法為child task設定源輸入資訊(會用到新構造的臨時檔案路徑作為源輸入路徑,如果reducer是Join,map部分的alias就是剛設定的特殊值,否則用臨時檔案路徑代替):
    • 在該方法中,會將前一步關聯起來的TS-RS串新增到reducer對應的task的map部分中去。
  • 在ctx中,將currTopOp,currAliasId置空,將reducer對應的task設為currTask。

最後,為當前operator構造一個GenMapRedCtx物件,以currTask,currTopOp和currAliasId為引數,並以當前operator為key記錄在ctx中。

FS%:

關聯moveTask;如有必要,新增statsTask;如有必要,新增merge files所必要的job。

TS%.*MAPJOIN%:

通過MapjoinOperator(MO)的parent獲取currTask, currPlan, currAliasId;

在這個處理過程中,reducer就是當前MO;

從ctx中獲取該reducer對應的task;

如果reducer對應的task不存在,呼叫initMapjoinPlan()就行初始化:

  • 將該reducer(MO)與currTask的對映關係記錄在ctx中;
  • 將currTask加入ctx的root tasks中;
  • 在ctx中標註currTopOp為已遍歷;
  • 呼叫setTaskPlan(),在reducer對應的task(currTask)中,為currTopOp設定對應的源輸入資訊,並將currTopOp新增到currTask的map部分中去;
  • 在ctx中,將currTopOp,currAliasId置空,將currTask設為currTask

如果reducer對應的task已經存在了(mapjoin的另一路已經被遍歷過),則呼叫joinPlan()方法合併當前task與該reducer對應的task:

  • 判斷當前TS到MO的邏輯是否為mapjoin的小表的邏輯;
  • 呼叫setTaskPlan(),在reducer對應的task(currTask)中,為currTopOp設定對應的源輸入資訊:
    • 如果currTopOp是個小表的TS,會在currTask的plan中生成一個MapLocalWork物件,源輸入的資訊是以alias -> FetchWork的形式存在。
  • 在ctx中,將currTopOp置空,將改reducer對應的task設為currTask

(未完,待續……)