1. 程式人生 > >Learning Spark中文版--第六章--Spark高級編程(2)

Learning Spark中文版--第六章--Spark高級編程(2)

做的 sin exchange lds 距離 應用 learning unix 調整

Working on a Per-Partition Basis
(基於分區的操作)

以每個分區為基礎處理數據使我們可以避免為每個數據項重做配置工作。如打開數據庫連接或者創建隨機數生成器這樣的操作,我們希望避免為每個元素重做配置工作。Spark有分區版本的mapforeach,通過讓RDD的每個分區只運行一次代碼,可幫助降低這些操作的成本。

回到我們的呼號例子中,有一個無線電臺呼號的在線數據庫,我們可以查詢聯系日誌的公共列表。通過使用基於分區的操作,我們可以分享數據庫的連接池來避免為多個連接配置連接參數,並且可以復用JSON解析器。如Example6-10到6-12所示,我們使用mapPartitions()

函數,這個函數會給我們一個輸入RDD每個分區中元素的叠代器,並且期望我們的返回是一個結果的叠代器。

Example 6-10. Shared connection pool in Python

def processCallSigns(signs):
    """Lookup call signs using a connection pool"""
    # Create a connection pool
    http = urllib3.PoolManager()
    # the URL associated with each call sign record
    urls = map(lambda x: "http://73s.com/qsos/%s.json" % x, signs)
    # create the requests (non-blocking)
    requests = map(lambda x: (x, http.request('GET', x)), urls)
    # fetch the results
    result = map(lambda x: (x[0], json.loads(x[1].data)), requests)
    # remove any empty results and return
    return filter(lambda x: x[1] is not None, result)
    
def fetchCallSigns(input):
    """Fetch call signs"""
    return input.mapPartitions(lambda callSigns : processCallSigns(callSigns))
    
contactsContactList = fetchCallSigns(validSigns)

Example 6-11. Shared connection pool and JSON parser in Scala

val contactsContactLists = validSigns.distinct().mapPartitions{
    signs =>
    val mapper = createMapper()
    val client = new HttpClient()
    client.start()
    // create http request
    signs.map {sign =>
        createExchangeForSign(sign)
    // fetch responses
    }.map{ case (sign, exchange) =>
        (sign, readExchangeCallLog(mapper, exchange))
    }.filter(x => x._2 != null) // Remove empty CallLogs
}


Example 6-12. Shared connection pool and JSON parser in Java

// Use mapPartitions to reuse setup work.
JavaPairRDD<String, CallLog[]> contactsContactLists =
    validCallSigns.mapPartitionsToPair(
    new PairFlatMapFunction<Iterator<String>, String, CallLog[]>() {
        public Iterable<Tuple2<String, CallLog[]>> call(Iterator<String> input) {
        // List for our results.
            ArrayList<Tuple2<String, CallLog[]>> callsignLogs = new ArrayList<>();
            ArrayList<Tuple2<String, ContentExchange>> requests = new ArrayList<>();
            ObjectMapper mapper = createMapper();
            HttpClient client = new HttpClient();
            try {
                client.start();
                while (input.hasNext()) {
                    requests.add(createRequestForSign(input.next(), client));
                }
                for (Tuple2<String, ContentExchange> signExchange : requests) {
                    callsignLogs.add(fetchResultFromRequest(mapper, signExchange));
                }
            } catch (Exception e) {
            }
            return callsignLogs;
        }});
        
System.out.println(StringUtils.join(contactsContactLists.collect(), ","));

當使用基於分區的操作時,Spark會給函數一個分區中元素的叠代器。我們的返回結果也是一個叠代器。除了mapPartitions()之外,Spark還有很多其他分區操作,如表6-1中所示:

函數名 所調用內容 返回類型 RDD[T]函數簽名
mapPartitions() 分區元素的叠代器 返回元素的叠代器 f: (Iterator[T]) ->Iterator[U]
mapPartitionsWithIndex() 分區數量和分區元素的叠代器 返回元素的叠代器 f: (Int, Iterator[T]) -> Iterator[U]
foreachPartition()
元素叠代器 f: (Iterator[T]) -> Unit

除了可以避免重復的配置工作,我們有時還可以使用mapPartitions()來避免創建對象的開銷。有時我們需要創建一個對象來聚合不同類型的結果。回想一下第三章我們計算平均數的時候,當時把數值RDD轉換成了元組RDD,這樣便於跟蹤在歸約步驟中元素處理的數量從而計算平均值。除了這種對每個元素都處理的方式,我們可以為每個分區創建一次元組,示例如下:

Example 6-13. Average without mapPartitions() in Python

def combineCtrs(c1, c2):
    return (c1[0] + c2[0], c1[1] + c2[1])

def basicAvg(nums):
    """Compute the average"""
    nums.map(lambda num: (num, 1)).reduce(combineCtrs)

Example 6-14. Average with mapPartitions() in Python
def partitionCtr(nums):
    """Compute sumCounter for partition"""
    sumCount = [0, 0]
    for num in nums:
        sumCount[0] += num
        sumCount[1] += 1
    return [sumCount]

def fastAvg(nums):
    """Compute the avg"""
    sumCount = nums.mapPartitions(partitionCtr).reduce(combineCtrs)
    return sumCount[0] / float(sumCount[1])

Piping to External Programs(連接外部程序)

Spark提供三種開箱即用的語言支持,你似乎在編寫Spark程序時不會遇到語言上的障礙。但是,如果Scala,Java和Python都不是你想要的,那也沒關系,SPark提供了一個使用其他語言(如R腳本)傳輸數據給程序的機制,。

Spark在RDD上提供了一個pipe()方法。這個方法可以讓我們使用其他語言編寫job,但前提是這種語言支持Unix標準流的讀入和寫出。使用pipe(),可以編寫一個RDD轉換,它從標準輸入讀取每個RDD元素作為String,接著按照需要操作該String,然後將結果作為String寫入標準輸出。提供的接口和編程模型是受到了一些限制,但是有時後你需要做的就是像寫在map()filter()中使用的本地代碼函數一樣。

你可能會想把RDD的內容傳輸給外部程序或者腳本,因為你可能已經構建並測試了一個復雜的軟件,而且你想在Spark中重用他。有很多數據科學家有R語言寫的代碼,我們可以通過pipe()函數與R程序進行交互。

在Example6-15中,我們使用R語言的庫來計算所有聯系人的距離。RDD中的每個元素通過換行符作為分割寫出,並且程序輸出的每一行都是結果RDD中的字符串元素。為了使R程序轉換輸入數據簡便一些,我們把數據轉換成mylat, mylon, theirlat, theirlon格式。我們以逗號作為分隔符。

Example 6-15. R distance program
#!/usr/bin/env Rscript
library("Imap")
f <- file("stdin")
open(f)
while(length(line <- readLines(f,n=1)) > 0) {
    # process line
    contents <- Map(as.numeric, strsplit(line, ","))
    mydist <- gdist(contents[[1]][1], contents[[1]][2],
                    contents[[1]][3], contents[[1]][4],
                    units="m", a=6378137.0, b=6356752.3142, verbose = FALSE)
    write(mydist, stdout())
}

如果把上述代碼寫入一個名為./src/R/finddistance.R的可執行文件,那麽使用方法大體如下:

$ ./src/R/finddistance.R
37.75889318222431,-122.42683635321838,37.7614213,-122.4240097
349.2602
coffee
NA
ctrl-d

目前為止很不錯,我們學會了從stdin讀取的每一行轉換成stdout輸出的方法了。現在我們需要讓每個工作節點都能使用finddistance.R並且能讓shell腳本轉換我們的RDD。完成這兩點其實在Spark中很簡單,示例如下:

Example 6-16. Driver program using pipe() to call finddistance.R in Python

# Compute the distance of each call using an external R program
distScript = "./src/R/finddistance.R"
distScriptName = "finddistance.R"
sc.addFile(distScript)
def hasDistInfo(call):
    """Verify that a call has the fields required to compute the distance"""
    #驗證計算距離的必要字段
    requiredFields = ["mylat", "mylong", "contactlat", "contactlong"]
    return all(map(lambda f: call[f], requiredFields))
def formatCall(call):
    """Format a call so that it can be parsed by our R program"""
    #格式話數據便於R程序解析
    return "{0},{1},{2},{3}".format(
        call["mylat"], call["mylong"],
        call["contactlat"], call["contactlong"])
        
pipeInputs = contactsContactList.values().flatMap(
    lambda calls: map(formatCall, filter(hasDistInfo, calls)))
distances = pipeInputs.pipe(SparkFiles.get(distScriptName))
print distances.collect()

Example 6-17. Driver program using pipe() to call finddistance.R in Scala

// Compute the distance of each call using an external R program
// adds our script to a list of files for each node to download with this job
//通過外部的R程序計算每個呼叫的距離
//把腳本添加到這個job中每個節點都要下載的文件列表
val distScript = "./src/R/finddistance.R"
val distScriptName = "finddistance.R"
sc.addFile(distScript)
val distances = contactsContactLists.values.flatMap(x => x.map(y =>
    s"$y.contactlay,$y.contactlong,$y.mylat,$y.mylong")).pipe(Seq(
        SparkFiles.get(distScriptName)))
println(distances.collect().toList)

Example 6-18. Driver program using pipe() to call finddistance.R in Java

// Compute the distance of each call using an external R program
// adds our script to a list of files for each node to download with this job
String distScript = "./src/R/finddistance.R";
String distScriptName = "finddistance.R";
sc.addFile(distScript);
JavaRDD<String> pipeInputs = contactsContactLists.values()
    .map(new VerifyCallLogs()).flatMap(
    new FlatMapFunction<CallLog[], String>() {
        public Iterable<String> call(CallLog[] calls) {
            ArrayList<String> latLons = new ArrayList<String>();
            for (CallLog call: calls) {
                latLons.add(call.mylat + "," + call.mylong +
                        "," + call.contactlat + "," + call.contactlong);
            }
            return latLons;
        }
    });
JavaRDD<String> distances = pipeInputs.pipe(SparkFiles.get(distScriptName));
System.out.println(StringUtils.join(distances.collect(), ","));

使用SparkContext.addFile(path)方法,我們可以構建一個在單個job中每個工作節點都要下載的文件列表。這些文件可以來自驅動程序的本地文件系統(如例子中那樣),或者來自HDFS或其他Hadoop支持的文件系統,也可以是一個HTTP,HTTPSFTP的URI。當一個action在一個job中運行,每個節點都會開始下載這些文件。這些文件可以在工作節點的SparkFiles.getRootDirectory路徑或者SparkFiles.get(filename)中找到。當然,這只是確定pipe()方法能夠在每個工作節點上找到腳本文件的方法之一。你可以使用其他的遠程復制工具來將腳本文件放在每個節點可知位置上。

使用SparkContext.addFile(path)添加的所有文件都存放在同一個目錄中,因此使用唯一名稱非常重要。

一旦確定了腳本是可以使用的,RDD上的pipe()方法可以很簡單地把RDD中的元素傳輸給腳本。也許更智能的findDistance版本會接受SEPARATOR作為命令行參數。在那種情況下,以下任何一個都可以完成這項工作,盡管第一個是首選:

  • rdd.pipe(Seq(SparkFiles.get("finddistance.R"), ","))
  • rdd.pipe(SparkFiles.get("finddistance.R") + " ,")

第一種方式,我們把命令的調用當做具有位置的元素序列來傳遞(命令本身在零偏移的位置上);第二種方式,我們把它作為一個單獨的命令字符串傳遞,然後Spark會分解成位置參數 。

我們也可以在需要的情況下明確設置pipe()的環境變量。直接把環境變量的map作為pipe()的第二個參數,Spark會去設置這些在值。

你現在應該至少理解了如何使用pipe()來利用外部命令處理RDD的元素,還有如何將命令腳本分發到集群中工作節點能找到的位置。

Numeric RDD Operations(數值RDD的操作)

Spark提供了幾種包含數值數據RDD的描述性統計操作。除此之外,更復雜的統計和機器學習方法之外,我們將在後面的第11章中介紹這些方法。

Spark的數值操作是通過流式算法實現的,它允許一次構建我們模型的一個元素。描述性統計對數據一次性計算出結果並且調用stats()會返回一個StatsCounter對象。表6-2介紹了StatsCounter對象的方法。

Table 6-2. Summary statistics available fromStatsCounter

方法 作用
count() 元素數量
mean() 元素平均值
sum() 總值
max() 最大值
min() 最小值
variance() 方差
sampleVariance() 樣本方差
stdev() 標準差
sampleStdev() 樣本標準差

如果您只想計算其中一個統計數據,那麽也可以直接在RDD上調用相應的方法,例如rdd.mean()rdd.sum()

在例6-19到6-21中,我們將使用匯總統計來從我們的數據中刪除一些異常值。因為我們可能會遍歷相同的RDD兩次(一次是匯總統計,第二次是刪除異常值),所以將RDD緩存起來。回到我們的呼叫日誌的例子上,我們可以從呼叫日誌上刪除距離太遠的聯系地點。

Example 6-19. Removing outliers in Python

# Convert our RDD of strings to numeric data so we can compute stats and
# remove the outliers.
distanceNumerics = distances.map(lambda string: float(string))
stats = distanceNumerics.stats()
stddev = std.stdev()
mean = stats.mean()
reasonableDistances = distanceNumerics.filter(
    lambda x: math.fabs(x - mean) < 3 * stddev)
print reasonableDistances.collect()

Example 6-20. Removing outliers in Scala

// Now we can go ahead and remove outliers since those may have misreported locations
// first we need to take our RDD of strings and turn it into doubles.
val distanceDouble = distance.map(string => string.toDouble)
val stats = distanceDoubles.stats()
val stddev = stats.stdev
val mean = stats.mean
val reasonableDistances = distanceDoubles.filter(x => math.abs(x-mean) < 3 * stddev)
println(reasonableDistance.collect().toList)


Example 6-21. Removing outliers in Java

// First we need to convert our RDD of String to a DoubleRDD so we can
// access the stats function
JavaDoubleRDD distanceDoubles = distances.mapToDouble(new DoubleFunction<String>() {
    public double call(String value) {
        return Double.parseDouble(value);
    }});
final StatCounter stats = distanceDoubles.stats();
final Double stddev = stats.stdev();
final Double mean = stats.mean();
JavaDoubleRDD reasonableDistances =
    distanceDoubles.filter(new Function<Double, Boolean>() {
        public Boolean call(Double x) {
            return (Math.abs(x-mean) < 3 * stddev);}});
System.out.println(StringUtils.join(reasonableDistance.collect(), ","));

配合累加器和廣播變量,分區處理,外部程序接口和匯總統計,我們最終完成了示例程序。
完整的代碼在 src/python/ChapterSixExample.py,src/main/scala/com/oreilly/learningsparkexamples/scala/ChapterSixExample.scala,和src/main/java/com/oreilly/learningsparkexamples/java/ChapterSixExample.java這幾個文件之中。

Conclusion(總結)

這一章節,我們介紹了一些Spark的高級編程特性,你可以使用這些特性來提高你程序的效率或可讀性。後續章節將介紹部署和調整Spark應用程序,以及用於SQL,流媒體以及機器學習的內置庫。我們也將開始看到更復雜和更完整的示例應用程序,它們利用了迄今為止描述的大部分功能,並且也會對你自己使用Spark的方式有所指導或啟發。

Learning Spark中文版--第六章--Spark高級編程(2)