1. 程式人生 > >【Spark篇】---Spark中Action運算元

【Spark篇】---Spark中Action運算元

一、前述

Action類運算元也是一類運算元(函式)叫做行動運算元,如foreach,collect,count等。Transformations類運算元是延遲執行,Action類運算元是觸發執行。一個application應用程式(就是我們編寫的一個應用程式)中有幾個Action類運算元執行,就有幾個job執行。

二、具體

 原始資料集:

  1、count

返回資料集中的元素數會在結果計算完成後回收到Driver端返回行數

package com.spark.spark.actions;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext; /** * count * 返回結果集中的元素數,會將結果回收到Driver端。 * */ public class Operator_count { public static void main(String[] args) { SparkConf conf = new SparkConf(); conf.setMaster("local"); conf.setAppName("collect"); JavaSparkContext jsc
= new JavaSparkContext(conf); JavaRDD<String> lines = jsc.textFile("./words.txt"); long count = lines.count(); System.out.println(count); jsc.stop(); } }

 結果:返回行數即元素數

2、take(n)

       first=take(1) 返回資料集中的第一個元素

      返回一個包含資料集n個元素的集合。是一個array有幾個partiotion 會有幾個job觸發

package com.spark.spark.actions;

import java.util.Arrays;
import java.util.List;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
/**
 * take
 * 
 * @author root
 *
 */
public class Operator_takeAndFirst {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf();
        conf.setMaster("local").setAppName("take");
        JavaSparkContext jsc = new JavaSparkContext(conf);
        
 
        JavaRDD<String> parallelize = jsc.parallelize(Arrays.asList("a","b","c","d"));
        List<String> take = parallelize.take(2);
        String first = parallelize.first();
        for(String s:take){
            System.out.println(s);
        }
        jsc.stop();
    }
}

結果:

3、foreach

      迴圈遍歷資料集中的每個元素,執行相應的邏輯。

4、collect

     將計算結果回收到Driver端。當資料量很大時就不要回收了,會造成oom.

     一般在使用過濾運算元或者一些能返回少量資料集的運算元後

package com.spark.spark.actions;

import java.util.List;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;

/**
 * collect 
 * 將計算的結果作為集合拉回到driver端,一般在使用過濾運算元或者一些能返回少量資料集的運算元後,將結果回收到Driver端列印顯示。
 *
 */
public class Operator_collect {
    public static void main(String[] args) {
        /**
         * SparkConf物件中主要設定Spark執行的環境引數。
         * 1.執行模式
         * 2.設定Application name
         * 3.執行的資源需求
         */
        SparkConf conf = new SparkConf();
        conf.setMaster("local");
        conf.setAppName("collect");
        /**
         * JavaSparkContext物件是spark執行的上下文,是通往叢集的唯一通道。
         */
        JavaSparkContext jsc = new JavaSparkContext(conf);
        JavaRDD<String> lines = jsc.textFile("./words.txt");
        JavaRDD<String> resultRDD = lines.filter(new Function<String, Boolean>() {

            /**
             * 
             */
            private static final long serialVersionUID = 1L;

            @Override
            public Boolean call(String line) throws Exception {
                return !line.contains("hadoop");
            }
            
        });
        List<String> collect = resultRDD.collect();
        for(String s :collect){
            System.out.println(s);
        }
        
        jsc.stop();
    }
}

結果:

  • countByKey

              作用到K,V格式的RDD上,根據Key計數相同Key的資料集元素。也就是個數)

java程式碼:

package com.spark.spark.actions;

import java.util.Arrays;
import java.util.Map;
import java.util.Map.Entry;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;

import scala.Tuple2;
/**
 * countByKey
 * 
 * 作用到K,V格式的RDD上,根據Key計數相同Key的資料集元素。返回一個Map<K,Object>
 * @author root
 *
 */
public class Operator_countByKey {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf();
        conf.setMaster("local").setAppName("countByKey");
        JavaSparkContext sc = new JavaSparkContext(conf);
        JavaPairRDD<Integer, String> parallelizePairs = sc.parallelizePairs(Arrays.asList(
                new Tuple2<Integer,String>(1,"a"),
                new Tuple2<Integer,String>(2,"b"),
                new Tuple2<Integer,String>(3,"c"),
                new Tuple2<Integer,String>(4,"d"),
                new Tuple2<Integer,String>(4,"e")
        ));
        
        Map<Integer, Object> countByKey = parallelizePairs.countByKey();
        for(Entry<Integer,Object>  entry : countByKey.entrySet()){
            System.out.println("key:"+entry.getKey()+"value:"+entry.getValue());
        }
        
        
    }
}

結果:

  • countByValue

           根據資料集每個元素相同的內容來計數。返回相同內容的元素對應的條數。

java程式碼:

package com.spark.spark.actions;

import java.util.Arrays;
import java.util.Map;
import java.util.Map.Entry;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

import scala.Tuple2;
/**
 * countByValue
 * 根據資料集每個元素相同的內容來計數。返回相同內容的元素對應的條數。
 * 
 * @author root
 *
 */
public class Operator_countByValue {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf();
        conf.setMaster("local").setAppName("countByKey");
        JavaSparkContext sc = new JavaSparkContext(conf);
        JavaPairRDD<Integer, String> parallelizePairs = sc.parallelizePairs(Arrays.asList(
                new Tuple2<Integer,String>(1,"a"),
                new Tuple2<Integer,String>(2,"b"),
                new Tuple2<Integer,String>(2,"c"),
                new Tuple2<Integer,String>(3,"c"),
                new Tuple2<Integer,String>(4,"d"),
                new Tuple2<Integer,String>(4,"d")
        ));
        
        Map<Tuple2<Integer, String>, Long> countByValue = parallelizePairs.countByValue();
        
        for(Entry<Tuple2<Integer, String>, Long> entry : countByValue.entrySet()){
            System.out.println("key:"+entry.getKey()+",value:"+entry.getValue());
        }
    }
}

 scala程式碼:

package com.bjsxt.spark.actions

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
/**
 * countByValue
 * 根據資料集每個元素相同的內容來計數。返回相同內容的元素對應的條數。
 */
object Operator_countByValue {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
    conf.setMaster("local").setAppName("countByValue")
    val sc = new SparkContext(conf)
     val rdd1 = sc.makeRDD(List("a","a","b"))
    val rdd2 = rdd1.countByValue()
    rdd2.foreach(println)
    sc.stop()
  }
}

 程式碼結果:

java:

scala:

  • reduce

            根據聚合邏輯聚合資料集中的每個元素。(reduce裡面需要具體的邏輯,根據裡面的邏輯對相同分割槽的資料進行計算

java程式碼:

package com.spark.spark.actions;

import java.util.Arrays;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function2;
/**
 * reduce
 * 
 * 根據聚合邏輯聚合資料集中的每個元素。
 * @author root
 *
 */
public class Operator_reduce {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf();
        conf.setMaster("local").setAppName("reduce");
        JavaSparkContext sc = new JavaSparkContext(conf);
        JavaRDD<Integer> parallelize = sc.parallelize(Arrays.asList(1,2,3,4,5));
        
        Integer reduceResult = parallelize.reduce(new Function2<Integer, Integer, Integer>() {
            
            /**
             * 
             */
            private static final long serialVersionUID = 1L;

            @Override
            public Integer call(Integer v1, Integer v2) throws Exception {
                return v1+v2;
            }
        });
        System.out.println(reduceResult);
        sc.stop();
    }
}

scala程式碼:

package com.bjsxt.spark.actions

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
/**
 * reduce
 * 
 * 根據聚合邏輯聚合資料集中的每個元素。
 */
object Operator_reduce {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
    conf.setMaster("local").setAppName("reduce")
    
    val sc = new SparkContext(conf)
    val rdd1 = sc.makeRDD(Array(1,2))
    
    val result = rdd1.reduce(_+_)
    
    println(result)
    sc.stop()
  }
}

 結果:

java:

scala:

相關推薦

Spark---SparkAction運算元

一、前述 Action類運算元也是一類運算元(函式)叫做行動運算元,如foreach,collect,count等。Transformations類運算元是延遲執行,Action類運算元是觸發執行。一個application應用程式(就是我們編寫的一個應用程式)中有幾個Action類運算元執行,就有幾個job

Spark---SparkTransformations轉換算子

pack gpo rds color boolean long als sam park 一、前述 Spark中默認有兩大類算子,Transformation(轉換算子),懶執行。action算子,立即執行,有一個action算子 ,就有一個job。 通俗些來說由RDD變成

Spark---Spark資源調度源碼分析與應用

部分 app post 類名 inf master 執行過程 efault spark 一、前述 Spark中資源調度是一個非常核心的模塊,尤其對於我們提交參數來說,需要具體到某些配置,所以提交配置的參數於源碼一一對應,掌握此節對於Spark在任務執行過程中的資源分配會更上

Spark---SparkShuffle文件的尋址

sta lock exe 數據 小文件 默認 節點 刪除 提高 一、前述 Spark中Shuffle文件的尋址是一個文件底層的管理機制,所以還是有必要了解一下的。 二、架構圖 三、基本概念: 1) MapOutputTracker MapOutputTracker是Spa

Spark---Sparkyarn模式兩種提交任務方式

一、前述Spark可以和Yarn整合,將Application提交到Yarn上執行,和StandAlone提交模式一樣,Yarn也有兩種提交任務的方式。二、具體           1、yarn-client提交任務方式配置         在client節點配置中spark

Spark--SparkStandalone的兩種提交模式

一、前述Spark中Standalone有兩種提交模式,一個是Standalone-client模式,一個是Standalone-master模式。二、具體        1、Standalone-client提交任務方式提交命令            ./spark-sub

Spark---Spark調優之代碼調優,數據本地化調優,內存調優,SparkShuffle調優,Executor的堆外內存調優

左右 任務調度 combiner flight 觸發 年齡 ans minor 序列化機制 一、前述 Spark中調優大致分為以下幾種 ,代碼調優,數據本地化,內存調優,SparkShuffle調優,調節Executor的堆外內存。 二、具體 1、代碼調優 1、避免創

基礎Mac關於eclipse外掛配置的一些事兒

eclipse 安裝外掛有三種常見的方式: 一、eclipse market,help/eclisep marketplace中搜索相關的外掛安裝,傻瓜式安裝 二、手動安裝,獲取外掛的link,然後通過help/install new software手

Spark---SparkStreaming+Kafka的兩種模式receiver模式和Direct模式

一、前述 SparkStreamin是流式問題的解決的代表,一般結合kafka使用,所以本文著重講解sparkStreaming+kafka兩種模式。 二、具體 1、Receiver模式    原理圖:  receiver模式理解: 在SparkStreaming程式執行起來後,Executor中會有r

Flask 第七Flask的wtforms使用

widget pass nco 自定義 wtforms csrf probably item 比較 一、簡單介紹flask中的wtforms WTForms是一個支持多個web框架的form組件,主要用於對用戶請求數據進行驗證。 安裝: pip3 install wtfo

Keras---利用keras改寫VGG16經典模型在手寫數字識別體的應用

model類 都是 ast 訓練樣本 轉化 一個 h5py 次梯度 窗口 一、前述 VGG16是由16層神經網絡構成的經典模型,包括多層卷積,多層全連接層,一般我們改寫的時候卷積層基本不動,全連接層從後面幾層依次向前改寫,因為先改參數較小的。 二、具體 1、因為本文中代碼需

談談Nancy讓人又愛又恨的Diagnostics

base isa 但是 get sting erro for 就會 一次 原文:談談Nancy中讓人又愛又恨的Diagnostics【上篇】前言 在Nancy中有個十分不錯的功能-Diagnostics,可以說這個功能讓人又愛又恨。 或許我們都做過下面這樣的一些嘗試:

待補充Spark 集群模式 && Spark Job 部署模式

啟動 nbsp -s .cn 一個 sos ref 說明 www 0. 說明   Spark 集群模式 && Spark Job 部署模式 1. Spark 集群模式   [ Local ]  使用一個 JVM 模擬 Spark 集群  

線上直播spark streaming高階特性在ndcg計算實踐

【線上直播】spark streaming高階特性在ndcg計算實踐 ▼ 嘉賓:王富平 王富平 簡介 歷任百度大資料部高階工程師、1號店搜尋與精準化部門架構師,一直從事大資料方向的研發工作,對大資料工具、機器學習有深刻的認知,在實時計算領域經驗豐富,對stor

線上直播Spark對AI的支援及應用

講師:袁方   講師簡介: 目前就職於某大型知名智慧手機企業,對使用者畫像、推薦與廣告相關的大資料AI演算法方面有很深的研究。中山大學碩士畢業,曾在香港從事AI科研,主要研究方向為AI影象及音視訊處理。   分享內容: 1. 瞭解Spark基本架構

線上直播Spark Streaming架構及實踐

講師: 韓老師 講師簡介: 清華大學碩士畢業,擁有超過6年的大資料平臺研發經驗,目前就職於某知名電商大資料平臺團隊,負責異構資料交換及流式計算平臺的研發。熟悉hadoop及spark生態,對資料平臺整體技術架構及流式計算引擎有深入研究。 分享大綱: 流式計算應

戶口換房過程,戶口怎麼遷移?

換房過程中,隨著房屋產權的變更,戶口面臨遷移的問題。 一、賣舊房換新房戶口遷出(新房未到遷戶口時間) 最常用的方法是:與買家協議遷戶時間延後,簽訂戶口遷出承諾書或支付遷戶保險金。 如果屆時買家遷入,您會被擠掉成為當地派出所集體戶口。 當然,也可以選擇在親戚朋友的房產上掛靠戶口。

十五Spark Streaming整合Kafka使用Direct方式(使用Scala語言)

官網介紹 Kafka提供了新的consumer api 在0.8版本和0.10版本之間。0.8的整合是相容0.9和0.10的。但是0.10的整合不相容以前的版本。 這裡使用的整合是spark-streaming-kafka-0-8。官方文件 配置SparkStrea

問題小結專案遇到的問題小結(第一

問題:使用openlayers3的ol.format.WKT類對ol.geom.Circle進行轉換,得到wkt描述傳入後臺進行處理時,chrome一直報錯(geometryEncoder should be defined ),如下: 百度了一下,沒有

block程式設計第五block使用 weak–strong dance 技術避免迴圈引用

----------------------------------------歡迎檢視block連載部落格【點選】-----------------------------------------------【block程式設計第一篇】block語法