1. 程式人生 > >Spark如何實現MapReduce中的setup和cleanup方法

Spark如何實現MapReduce中的setup和cleanup方法



在MapReduce中,Mapper和Reducer可以宣告一個setup方法,在處理一個split輸入之前執行,來進行分配資料庫連線等昂貴資源,同時可以用cleanup函式可以釋放資源。

public class  SetupCleanupMapper extends

    Mapper<LongWritable, Text, Text,  IntWritable> {

  private Connection dbConnection;

 

  @Override

  protected void setup(Context context) {

    dbConnection = ...;

  }

 

  ...

  @Override

  protected void cleanup(Context context) {

    dbConnection.close();

  }

}


Spark中的map和flatMap等方法每次只能在一個input(一行)上操作,而且沒有提供在轉換大批值前後執行程式碼的方法。

但是可以用mapPartitions或mapPartitionsToPair方法來實現類似setup的目的。

mapPartitions方法和map方法類似,只不過對映函式的引數由RDD中的每一個元素變成了RDD中每一個分割槽的迭代器。如果在對映的過程中需要頻繁建立額外的物件,使用mapPartitions要比map高效的過。

比如,將RDD中的所有資料通過JDBC連線寫入資料庫,如果使用map函式,可能要為每一個元素都建立一個connection,這樣開銷很大,如果使用mapPartitions,那麼只需要針對每一個分割槽建立一個connection。

JavaRDD<Integer> mapOrder = sc.textFile(logFile,3).map(new Function<String,Integer>(){//讀取文字,分成3個分割槽

			public Integer call(String v1) throws Exception {
				// TODO Auto-generated method stub
				return Integer.parseInt(v1);
			}

			
        	
        });

TTLPartition ttl=new TTLPartition();
       // ttl.setup();
        JavaPairRDD<Integer, String> res1Pair=
        		mapOrder.mapPartitionsToPair( ttl).partitionBy(new HashPartitioner(1)).//ttl物件是你要處理資料的邏輯
        		reduceByKey(new Function2<String,String,String>(){

					public String call(String v1, String v2) throws Exception {
						// TODO Auto-generated method stub
						return v1+v2;
					}}).sortByKey();


mapPartitionsToPair的call方法實現如下:

public Iterator<Tuple2<Integer, String>> call(Iterator<Integer> t) throws Exception {
		// TODO Auto-generated method stub
		setup();//這樣就實現mapreduce中對每一個split做預處理,之後才是該split中每一個數據的處理邏輯:迭代器t遍歷split,每個資料執行一次map方法
		int vi=0;
		while(t.hasNext())
		{
			vi=t.next();
			map(vi);
		}
                cleanup();

相關推薦

Spark如何實現MapReducesetupcleanup方法

 在MapReduce中,Mapper和Reducer可以宣告一個setup方法,在處理一個split輸入之前執行,來進行分配資料庫連線等昂貴資源,同時可以用cleanup函式可以釋放資源。 public class SetupCleanupMapper extend

JUnit測試setup()teardown()方法

       這幾天做Junit測試接觸到了setup和teardown兩個方法,簡單的可以這樣理解它們,setup主要實現測試前的初始化工作,而teardown則主要實現測試完成後的垃圾回收等工作。

SparkDenseMatrixvalues()toArray方法的區別

    之前一直以為DenseMatrix中的values()和toArray方法獲取到的矩陣的資料是一樣的,結果今日一次矩陣轉置測試時發現兩者獲取到的資料是不一樣的,values()獲取到的資料是將DenseMatrix中的資料以行優先的形式將矩陣中的資料儲存到陣列中,而

MapReduce階段map的setup() cleanup()

setup() 此方法被MapReduce框架僅且執行一次,在執行Map任務前,進行相關變數或者資源的集中初始化工作。若是將資源初始化工作放在方法map()中,導致Mapper任務在解析每一行輸入時都會進行資源初始化工作,導致重複,程式執行效率不高! c

Spark機器學習mlmllib矩陣、向量

int reg index mac matrix 對比 判斷 bsp ive 1:Spark ML與Spark MLLIB區別? Spark MLlib是面向RDD數據抽象的編程工具類庫,現在已經逐漸不再被Spark團隊支持,逐漸轉向Spark ML庫,Spark ML是面

servlet表單getpost方法的區別

pos span 轉化 不可見 上傳文件 post div font 支持 Form中的get和post方法,在數據傳輸過程中分別對應了HTTP協議中的GET和POST方法。二者主要區別如下:1、Get是用來從服務器上獲得數據,而Post是用來向服務器上傳遞數據。2、Get

Javawaitsleep方法的區別

lee join 告訴 inter art 過程 lam 兩個 一次 1、兩者的區別 這兩個方法來自不同的類分別是Thread和Object 最主要是sleep方法沒有釋放鎖,而wait方法釋放了鎖,使得其他線程可以使用同步控制塊或者方法(鎖代碼塊和方法鎖)。 w

淺析c#==操作符equals方法

邏輯 mce 需求 ram margin width 通過 否則 可用   在之前的文章中,我們講到了使用C#中提供的Object類的虛Equals方法來判斷Equality,但實際上它還提供了另外一種判斷Equality的方法,那就是使用==運算符。許多童鞋也許會想當然的

java sendredirect()forward()方法的區別

rect 次數 報錯 nec 重定向 web服務 單獨 exception aca 一.文章1 HttpServletResponse.sendRedirect與RequestDispatcher.forward方法都可以實現獲取相應URL資源。 sendRedirect

mapreducemapreduce個數

case when 生成 task 輸入 slots align reducer 進行 很多 一、 控制hive任務中的map數: 1. 通常情況下,作業會通過input的目錄產生一個或者多個map任務。 主要的決定因素有: input的文件總個數,input的

javascriptencodeURIdecodeURI方法

分享一下我老師大神的人工智慧教程!零基礎,通俗易懂!http://blog.csdn.net/jiangjunshow 也歡迎大家轉載本篇文章。分享知識,造福人民,實現我們中華民族偉大復興!        

Java hashCode()equals()方法

Java中,涉及到兩個物件的比較時,我們會用到hashCode()和equals()。這兩個方法是Object類中定義的方法。 1. api中的描述 (1)hashCode() hashCode()方法給物件返回一個hash code值。這個方法被用於hash tables,

Javasynchronized同步方法

在多執行緒中,有一個經典問題:存票售票問題 如果只用兩個Thread子類則容易陷入死迴圈。 有一個很好的解決辦法就是synchronized。 方法一:在thread子類的run中直接通過synchronized來申請物件的鎖旗標,即用synchronized把存售票程式碼框起來。 方法二:在票類中直

面試官問:能否模擬實現JS的callapply方法

之前寫過兩篇《面試官問:能否模擬實現JS的new操作符》和《面試官問:能否模擬實現JS的bind方法》 其中模擬bind方法時是使用的call和apply修改this指向。但面試官可能問:能否不用call和apply來實現呢。意思也就是需要模擬實現call和apply的了。 附上之前寫文章寫過的一段

關於pythonlociloc方法

pandas以類似字典的方式來獲取某一列的值 import pandas as pd import numpy as np table = pd.DataFrame(np.zeros((4,2)), index=['a','b','c','d'], columns=['left', 'right'])

javaset()get()方法的理解

1.名詞理解 從名字看set是設定的意思而get是獲取的意思,所以顧名思義這兩個方法是對資料進行設定和獲取操作的,我們往往不會單獨的使用它們而是用一些修飾詞配合使用,比如setname(), getname() ,setage(), getage(),等等 2.使用場景 JAVA

JavaScriptcallapply方法的使用

acvaScript中的call()方法和apply()方法,在某些時候這兩個方法還確實是十分重要的。1. 每個函式都包含兩個非繼承而來的方法:call()方法和apply()方法。2. 相同點:這兩個方法的作用是一樣的。都是在特定的作用域中呼叫函式,等於設定函式體內this物件的值,以擴充函式賴以執行的作用

HTTPGETPOST方法的區別

HTTP請求的方法有很多:GET、POST、HEAD、TRACE、OPTIONS等,但是GET和POST是兩個最常用的方法。 GET是最簡單的一種請求方法,其主要功能是從伺服器端獲取使用者所需資源,並將其作為響應返回給客戶端,需要注意的是:GET方法的作用主要用來獲取伺

javacompareTocompare方法之比較

這兩個方法經常搞混淆,現對其進行總結以加深記憶。 compareTo(Object o)方法是java.lang.Comparable介面中的方法,當需要對某個類的物件進行排序時,該類需要實現Comparable介面的,必須重寫public int compar

pandasapplytransform方法的效能比較

1. apply與transform 首先講一下apply() 與transform()的相同點與不同點 相同點: 都能針對dataframe完成特徵的計算,並且常常與groupby()方法一起使用。 不同點: apply()裡面可以跟自定義的函式,包括簡單的求和函式以及複雜的特徵間的差值函式等(注:appl