
[Spark Source Code Study] reduceByKey and groupByKey implementations and their relationship to combineByKey

reduceByKey source code:

    def reduceByKey(self, func, numPartitions=None, partitionFunc=portable_hash):
        """
        Merge the values for each key using an associative and commutative reduce function.

        This will also perform the merging locally on each mapper before
        sending results to a reducer, similarly to a "combiner" in MapReduce.

        Output will be partitioned with C{numPartitions} partitions, or
        the default parallelism level if C{numPartitions} is not specified.
        Default partitioner is hash-partition.

        >>> from operator import add
        >>> rdd = sc.parallelize([("a", 1), ("b", 1), ("a", 1)])
        >>> sorted(rdd.reduceByKey(add).collect())
        [('a', 2), ('b', 1)]
        """
        return self.combineByKey(lambda x: x, func, func, numPartitions, partitionFunc)
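
The last line is the whole implementation: reduceByKey is simply combineByKey with an identity createCombiner and the user-supplied func used as both mergeValue and mergeCombiners. The sketch below (my own illustration, assuming an existing SparkContext named sc) shows the two calls producing the same result:

    from operator import add

    rdd = sc.parallelize([("a", 1), ("b", 1), ("a", 1)])

    # reduceByKey(add) ...
    via_reduce = sorted(rdd.reduceByKey(add).collect())

    # ... delegates to combineByKey with an identity createCombiner
    # and add as both mergeValue and mergeCombiners
    via_combine = sorted(rdd.combineByKey(lambda x: x, add, add).collect())

    assert via_reduce == via_combine == [('a', 2), ('b', 1)]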

groupByKey source code:

    def groupByKey(self, numPartitions=None, partitionFunc=portable_hash):
        """
        Group the values for each key in the RDD into a single sequence.
        Hash-partitions the resulting RDD with numPartitions partitions.

        .. note:: If you are grouping in order to perform an aggregation (such as a
            sum or average) over each key, using reduceByKey or aggregateByKey will
            provide much better performance.

        >>> rdd = sc.parallelize([("a", 1), ("b", 1), ("a", 1)])
        >>> sorted(rdd.groupByKey().mapValues(len).collect())
        [('a', 2), ('b', 1)]
        >>> sorted(rdd.groupByKey().mapValues(list).collect())
        [('a', [1, 1]), ('b', [1])]
        """
        def createCombiner(x):
            return [x]

        def mergeValue(xs, x):
            xs.append(x)
            return xs

        def mergeCombiners(a, b):
            a.extend(b)
            return a

        memory = self._memory_limit()
        serializer = self._jrdd_deserializer
        agg = Aggregator(createCombiner, mergeValue, mergeCombiners)

        def combine(iterator):
            merger = ExternalMerger(agg, memory * 0.9, serializer)
            merger.mergeValues(iterator)
            return merger.items()

        locally_combined = self.mapPartitions(combine, preservesPartitioning=True)
        shuffled = locally_combined.partitionBy(numPartitions, partitionFunc)

        def groupByKey(it):
            merger = ExternalGroupBy(agg, memory, serializer)
            merger.mergeCombiners(it)
            return merger.items()

        return shuffled.mapPartitions(groupByKey, True).mapValues(ResultIterable)
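
Conceptually, groupByKey runs the same local-combine / shuffle / final-merge pipeline as combineByKey, but it hard-codes the three aggregation functions to build per-key lists, and on the reduce side it swaps in ExternalGroupBy (which can spill very large groups to disk) and wraps each group in a ResultIterable. Ignoring that wrapping, its behaviour can be approximated directly with combineByKey; the following is only an illustrative sketch (assuming a SparkContext sc), not the internal implementation:

    rdd = sc.parallelize([("a", 1), ("b", 1), ("a", 1)])

    grouped = rdd.combineByKey(
        lambda v: [v],           # createCombiner: start a list with the first value
        lambda xs, v: xs + [v],  # mergeValue: add a value (builds a new list here, for clarity)
        lambda a, b: a + b)      # mergeCombiners: concatenate lists from different partitions

    assert sorted(grouped.collect()) == [('a', [1, 1]), ('b', [1])]

Note that the real implementation mutates and reuses the first argument (xs.append(x), a.extend(b)) rather than allocating new lists, exactly as combineByKey's docstring permits.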

combineByKey() source code:

    def combineByKey(self, createCombiner, mergeValue, mergeCombiners,
                     numPartitions=None, partitionFunc=portable_hash):
        """
        Generic function to combine the elements for each key using a custom
        set of aggregation functions.

        Turns an RDD[(K, V)] into a result of type RDD[(K, C)], for a "combined
        type" C.

        Users provide three functions:

            - C{createCombiner}, which turns a V into a C (e.g., creates
              a one-element list)
            - C{mergeValue}, to merge a V into a C (e.g., adds it to the end of
              a list)
            - C{mergeCombiners}, to combine two C's into a single one (e.g., merges
              the lists)

        To avoid memory allocation, both mergeValue and mergeCombiners are allowed to
        modify and return their first argument instead of creating a new C.

        In addition, users can control the partitioning of the output RDD.

        .. note:: V and C can be different -- for example, one might group an RDD of type
            (Int, Int) into an RDD of type (Int, List[Int]).

        >>> x = sc.parallelize([("a", 1), ("b", 1), ("a", 2)])
        >>> def to_list(a):
        ...     return [a]
        ...
        >>> def append(a, b):
        ...     a.append(b)
        ...     return a
        ...
        >>> def extend(a, b):
        ...     a.extend(b)
        ...     return a
        ...
        >>> sorted(x.combineByKey(to_list, append, extend).collect())
        [('a', [1, 2]), ('b', [1])]
        """
        if numPartitions is None:
            numPartitions = self._defaultReducePartitions()

        serializer = self.ctx.serializer
        memory = self._memory_limit()
        agg = Aggregator(createCombiner, mergeValue, mergeCombiners)

        def combineLocally(iterator):
            merger = ExternalMerger(agg, memory * 0.9, serializer)
            merger.mergeValues(iterator)
            return merger.items()

        locally_combined = self.mapPartitions(combineLocally, preservesPartitioning=True)
        shuffled = locally_combined.partitionBy(numPartitions, partitionFunc)

        def _mergeCombiners(iterator):
            merger = ExternalMerger(agg, memory, serializer)
            merger.mergeCombiners(iterator)
            return merger.items()

        return shuffled.mapPartitions(_mergeCombiners, preservesPartitioning=True)
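
reduceByKey and groupByKey are therefore just specialisations of this generic primitive. Its full power shows when the combined type C differs from the value type V; for example, a per-key mean can be computed with a (sum, count) pair as C. This is a minimal sketch of my own (assuming a SparkContext sc), not code from the Spark source:

    rdd = sc.parallelize([("a", 1), ("b", 1), ("a", 2)])

    sum_count = rdd.combineByKey(
        lambda v: (v, 1),                               # createCombiner: V -> C = (sum, count)
        lambda c, v: (c[0] + v, c[1] + 1),              # mergeValue: fold one more V into a C
        lambda c1, c2: (c1[0] + c2[0], c1[1] + c2[1]))  # mergeCombiners: merge two partial C's

    means = sum_count.mapValues(lambda c: c[0] / c[1])
    assert sorted(means.collect()) == [('a', 1.5), ('b', 1.0)]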

A detailed analysis will follow in a later post.