
pyspark - Spark Programming Guide

References:

1. http://spark.apache.org/docs/latest/rdd-programming-guide.html

2. https://github.com/apache/spark/tree/v2.2.0

Spark Programming Guide

Linking with Spark

from pyspark import SparkContext, SparkConf

Initializing Spark

from pyspark import SparkContext, SparkConf

conf = SparkConf().setAppName(appName).setMaster(master)
sc = SparkContext(conf=conf)
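
A minimal sketch with concrete values (the "MyApp" name and local[2] master below are just illustrative placeholders, not something the guide prescribes):

# Placeholder values: the app name shows up in the Spark UI, and local[2]
# runs Spark locally with two worker threads.
conf = SparkConf().setAppName("MyApp").setMaster("local[2]")
sc = SparkContext(conf=conf)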

Using the Shell

$ ./bin/pyspark --master local[4]

$ ./bin/pyspark --master local[4] --py-files code.py
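
Inside the shell a SparkContext is already available as sc, so RDDs can be used right away; a small sketch (README.md is just an example path):

>>> sc.textFile("README.md").count()   # number of lines in the file
>>> sc.parallelize(range(100)).sum()   # 4950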

Resilient Distributed Datasets (RDDs)

Parallelized Collections

data = [1, 2, 3, 4, 5]
distData = sc.parallelize(data)
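
parallelize also takes an optional second argument for the number of partitions to cut the dataset into; a short sketch:

distData = sc.parallelize(data, 10)   # explicitly use 10 partitions
distData.reduce(lambda a, b: a + b)   # 15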

External Datasets

# Text file RDDs can be created using SparkContext's textFile method. This method takes
# a URI for the file (either a local path on the machine, or a hdfs://, s3n://, etc URI)
# and reads it as a collection of lines.
>>> distFile = sc.textFile("data.txt")
# Directories, wildcards, and compressed files also work: textFile("/my/directory"),
# textFile("/my/directory/*.txt"), and textFile("/my/directory/*.gz").
# SparkContext.wholeTextFiles lets you read a directory containing multiple small text files.
# RDD.saveAsPickleFile and SparkContext.pickleFile support saving an RDD in a simple format
# consisting of pickled Python objects.
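
A short sketch of the wholeTextFiles and pickle-file APIs mentioned above (the paths are placeholders):

>>> pairs = sc.wholeTextFiles("/my/directory")   # RDD of (filename, content) pairs
>>> sc.parallelize(range(5)).saveAsPickleFile("nums.pkl")
>>> sorted(sc.pickleFile("nums.pkl").collect())
[0, 1, 2, 3, 4]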

Saving and Loading SequenceFiles

>>> rdd = sc.parallelize(range(1, 4)).map(lambda x: (x, "a" * x))
>>> rdd.saveAsSequenceFile("path/to/file")
>>> sorted(sc.sequenceFile("path/to/file").collect())
[(1, u'a'), (2, u'aa'), (3, u'aaa')]
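
sequenceFile can also be told the Hadoop Writable key and value classes explicitly; a sketch for the pairs saved above, assuming the standard IntWritable/Text writables:

>>> sorted(sc.sequenceFile("path/to/file",
                           keyClass="org.apache.hadoop.io.IntWritable",
                           valueClass="org.apache.hadoop.io.Text").collect())
[(1, u'a'), (2, u'aa'), (3, u'aaa')]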

Saving and Loading Other Hadoop Input/Output Formats

$ ./bin/pyspark --jars /path/to/elasticsearch-hadoop.jar
>>> conf = {"es.resource" : "index/type"}  # assume Elasticsearch is running on localhost defaults
>>> rdd = sc.newAPIHadoopRDD("org.elasticsearch.hadoop.mr.EsInputFormat",
                             "org.apache.hadoop.io.NullWritable",
                             "org.elasticsearch.hadoop.mr.LinkedMapWritable",
                             conf=conf)
>>> rdd.first()  # the result is a MapWritable that is converted to a Python dict
(u'Elasticsearch ID',
 {u'field1': True,
  u'field2': u'Some Text',
  u'field3': 12345})

RDD Operations

lines = sc.textFile("data.txt")
lineLengths = lines.map(lambda s: len(s))
totalLength = lineLengths.reduce(lambda a, b: a + b)

lineLengths.persist()
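
Because transformations are lazy, nothing is computed until the reduce action runs; persist() marks lineLengths to be kept in memory the next time it is computed, so repeated actions afterwards do not re-read data.txt. A small sketch of such reuse:

# The first action after persist() computes and caches lineLengths;
# later actions such as max() then reuse the cached data.
numLines = lineLengths.count()
longest = lineLengths.max()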

Passing Functions to Spark

"""MyScript.py"""
if __name__ == "__main__":
    def myFunc(s):
        words = s.split(" ")
        return len(words)

    sc = SparkContext(...)
    sc.textFile("file.txt").map(myFunc)


# Passing a method such as self.func ships the whole MyClass instance to the cluster
class MyClass(object):
    def func(self, s):
        return s
    def doStuff(self, rdd):
        return rdd.map(self.func)


# Referencing self.field inside the lambda likewise sends the whole object
class MyClass(object):
    def __init__(self):
        self.field = "Hello"
    def doStuff(self, rdd):
        return rdd.map(lambda s: self.field + s)

# To avoid shipping the whole object, copy the field into a local variable first
def doStuff(self, rdd):
    field = self.field
    return rdd.map(lambda s: field + s)

Understanding Closures

Example

counter = 0
rdd = sc.parallelize(data)

# Wrong: Don't do this!!
def increment_counter(x):
    global counter
    counter += x
rdd.foreach(increment_counter)

print("Counter value: ", counter)

Working with Key-Value Pairs

lines = sc.textFile("data.txt")
pairs = lines.map(lambda s: (s, 1))
counts = pairs.reduceByKey(lambda a, b: a + b)
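
We could then, for example, sort the pairs alphabetically with sortByKey and bring them back to the driver as a list with collect:

sortedCounts = counts.sortByKey()   # sort the (line, count) pairs by line
result = sortedCounts.collect()     # list of tuples on the driver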

Shared Variables

Broadcast Variables

>>> broadcastVar = sc.broadcast([1, 2, 3])
<pyspark.broadcast.Broadcast object at 0x102789f10>

>>> broadcastVar.value  # list
[1, 2, 3]
>>> da = sc.parallelize(broadcastVar.value)  # turn the broadcast value back into an RDD
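
Tasks normally read the broadcast data through .value inside a transformation rather than capturing the list in the closure; a minimal sketch:

>>> sc.parallelize([0, 1, 2]).map(lambda i: broadcastVar.value[i]).collect()
[1, 2, 3]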

Accumulators

>>> accum = sc.accumulator(0)
>>> accum
Accumulator<id=0, value=0>

>>> sc.parallelize([1, 2, 3, 4]).foreach(lambda x: accum.add(x))
...
10/09/29 18:41:08 INFO SparkContext: Tasks finished in 0.317106 s
>>> accum.value # <type 'int'>
10

# AccumulatorParam lets you accumulate types other than numbers; Vector here is assumed
# to be a user-defined class supporting zeros() and +=.
from pyspark.accumulators import AccumulatorParam

class VectorAccumulatorParam(AccumulatorParam):
    def zero(self, initialValue):
        return Vector.zeros(initialValue.size)

    def addInPlace(self, v1, v2):
        v1 += v2
        return v1

# Then, create an Accumulator of this type:
vecAccum = sc.accumulator(Vector(...), VectorAccumulatorParam())

accum = sc.accumulator(0)
def g(x):
    accum.add(x)
    return f(x)
data.map(g)
# Here, accum is still 0 because no actions have caused the `map` to be computed.
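
Running an action forces the map to be computed and the accumulator updates to be applied; a sketch, assuming data is an RDD of numbers and f is defined:

data.map(g).count()   # the action triggers the map, so accum is now updated
print(accum.value)    # sum of the elements of data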