1. 程式人生 > >Spark Java程式案例入門

Spark Java程式案例入門

spark 安裝模式:
local(本地模式):常用於本地開發測試,本地還分為local單執行緒和local-cluster多執行緒
standalone(叢集模式):典型的Mater/slave模式,不過也能看出Master是有單點故障的;Spark支援ZooKeeper來實現 HA
on yarn(叢集模式): 執行在 yarn 資源管理器框架之上,由 yarn 負責資源管理,Spark 負責任務排程和計算
on mesos(叢集模式): 執行在 mesos 資源管理器框架之上,由 mesos 負責資源管理,Spark 負責任務排程和計算

on cloud(叢集模式):比如 AWS 的 EC2,使用這個模式能很方便的訪問 Amazon的 S3;Spark 支援多種分散式儲存系統:HDFS 和 S3

安裝:
1.spark standalone模式 需要hadoop 的HDFS作為持久層
jdk1.6以上
安裝hadoop叢集請參考:http://blog.csdn.net/m0_37739193/article/details/71222673

2.安裝scala(三臺都要安裝)
[[email protected] ~]$ tar -zxvf scala-2.10.6.tgz
[[email protected] ~]$ tar -zxvf scala-2.10.6.tgz
[[email protected] ~]$ tar -zxvf scala-2.10.6.tgz

3.安裝spark
[

[email protected] ~]$ tar -zxvf spark-1.3.1-bin-hadoop2.6.tgz 

[[email protected] ~]$ vi .bash_profile

[[email protected] ~]$ vi .bash_profile 
export SPARK_HOME=/home/hadoop/spark-1.3.1-bin-hadoop2.6
export SCALA_HOME=/home/hadoop/scala-2.10.6
export PATH=$PATH:$SPARK_HOME/bin:$SPARK_HOME/sbin:$SCALA_HOME/bin
[[email protected] ~]$ source .bash_profile

4.配置spark的 configuration檔案
[[email protected] ~]$ cd spark-1.1.0/conf
[[email protected] conf]$ cp spark-env.sh.template spark-env.sh
[[email protected] conf]$ vi spark-env.sh
新增:

export JAVA_HOME=/usr/jdk1.7.0_25/
export SPARK_MASTER_IP=h40
export SPARK_MASTER_PORT=7077
export SPARK_WORKER_CORES=1
export SPARK_WORKER_INSTANCES=1
#在spark-1.6.0-bin-hadoop2.6和spark-1.5.0-cdh5.5.2版本中為export SPARK_EXECUTOR_INSTANCES=1
export SPARK_WORKER_MEMORY=1g

5.配置slaves
[[email protected] conf]$ vi slaves 
h41
h42


6.同步到其他節點
[[email protected] ~]$ scp -r spark-1.1.0 h41:/home/hadoop
[[email protected] ~]$ scp -r spark-1.1.0 h42:/home/hadoop

7.啟動spark
[[email protected] spark-1.3.1-bin-hadoop2.6]$ sbin/start-all.sh

8.驗證
[[email protected] ~]$ jps
主節點有 master程序
8861 Master

[[email protected] ~]$ jps
[[email protected] ~]$ jps
從節點有 Worker程序
8993 Worker

案例一:(spark-1.3.1-bin-hadoop2.6版本自帶的WordCount例子)

[[email protected] examples]$ pwd
/home/hadoop/spark-1.3.1-bin-hadoop2.6/examples/src/main/java/org/apache/spark/examples
[[email protected] examples]$ ls
JavaHdfsLR.java  JavaLogQuery.java  JavaPageRank.java  JavaSparkPi.java  JavaStatusTrackerDemo.java  JavaTC.java  JavaWordCount.java  ml  mllib  sql  streaming
[[email protected] examples]$ cat JavaWordCount.java 

package org.apache.spark.examples;

import scala.Tuple2;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;

import java.util.Arrays;
import java.util.List;
import java.util.regex.Pattern;

public final class JavaWordCount {
  private static final Pattern SPACE = Pattern.compile(" ");

  public static void main(String[] args) throws Exception {

    if (args.length < 1) {
      System.err.println("Usage: JavaWordCount <file>");
      System.exit(1);
    }

    SparkConf sparkConf = new SparkConf().setAppName("JavaWordCount");
    JavaSparkContext ctx = new JavaSparkContext(sparkConf);
    JavaRDD<String> lines = ctx.textFile(args[0], 1);

    JavaRDD<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
      @Override
      public Iterable<String> call(String s) {
        return Arrays.asList(SPACE.split(s));
      }
    });

    JavaPairRDD<String, Integer> ones = words.mapToPair(new PairFunction<String, String, Integer>() {
      @Override
      public Tuple2<String, Integer> call(String s) {
        return new Tuple2<String, Integer>(s, 1);
      }
    });

    JavaPairRDD<String, Integer> counts = ones.reduceByKey(new Function2<Integer, Integer, Integer>() {
      @Override
      public Integer call(Integer i1, Integer i2) {
        return i1 + i2;
      }
    });

    List<Tuple2<String, Integer>> output = counts.collect();
    for (Tuple2<?,?> tuple : output) {
      System.out.println(tuple._1() + ": " + tuple._2());
    }
    ctx.stop();
  }
}

在hdfs中建立相應檔案:
[[email protected] ~]$ vi hehe.txt
hello world
hello hadoop
hello hive
[[email protected] ~]$ hadoop fs -mkdir /spark
[[email protected] ~]$ hadoop fs -put hehe.txt /spark

然後進入spark的家目錄執行命令:
[[email protected] spark-1.3.1-bin-hadoop2.6]$ bin/spark-submit --master spark://h40:7077 --name JavaWordCountByHQ --class org.apache.spark.examples.JavaWordCount --executor-memory 500m --total-executor-cores 2 lib/spark-examples-1.3.1-hadoop2.6.0.jar hdfs://h40:9000/spark/hehe.txt

。。。(輸出內容太多省略)
hive: 1
hadoop: 1
hello: 3
world: 1
。。。
(這個案例感覺和執行hadoop的mapreduce似的,這個也是離線處理)

案例二(spark streaming):
本來spark也自帶了例子,但我沒有成功,按官方步驟先開啟nc -lk 9999端,再在spark的家目錄下執行bin/run-example org.apache.spark.examples.streaming.JavaNetworkWordCount h40 9999無法達到想要的結果,一直顯示如下:

17/06/20 21:23:58 INFO dstream.SocketReceiver: Connected to h40:9999
17/06/20 21:23:59 INFO scheduler.JobScheduler: Added jobs for time 1497965039000 ms
17/06/20 21:24:00 INFO scheduler.JobScheduler: Added jobs for time 1497965040000 ms
17/06/20 21:24:01 INFO scheduler.JobScheduler: Added jobs for time 1497965041000 ms
17/06/20 21:24:02 INFO scheduler.JobScheduler: Added jobs for time 1497965042000 ms
17/06/20 21:24:03 INFO scheduler.JobScheduler: Added jobs for time 1497965043000 ms
17/06/20 21:24:04 INFO scheduler.JobScheduler: Added jobs for time 1497965044000 ms
17/06/20 21:24:05 INFO scheduler.JobScheduler: Added jobs for time 1497965045000 ms
17/06/20 21:24:06 INFO scheduler.JobScheduler: Added jobs for time 1497965046000 ms
17/06/20 21:24:07 INFO scheduler.JobScheduler: Added jobs for time 1497965047000 ms

[[email protected] streaming]$ ls
JavaCustomReceiver.java  JavaFlumeEventCount.java  JavaNetworkWordCount.java  JavaQueueStream.java  JavaRecoverableNetworkWordCount.java  JavaStatefulNetworkWordCount.java
[[email protected] streaming]$ pwd
/home/hadoop/spark-1.3.1-bin-hadoop2.6/examples/src/main/java/org/apache/spark/examples/streaming
[[email protected] streaming]$ cat JavaNetworkWordCount.java

package org.apache.spark.examples.streaming;

import scala.Tuple2;
import com.google.common.collect.Lists;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.api.java.StorageLevels;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;

import java.util.regex.Pattern;

/**
 * Counts words in UTF8 encoded, '\n' delimited text received from the network every second.
 *
 * Usage: JavaNetworkWordCount <hostname> <port>
 * <hostname> and <port> describe the TCP server that Spark Streaming would connect to receive data.
 *
 * To run this on your local machine, you need to first run a Netcat server
 *    `$ nc -lk 9999`
 * and then run the example
 *    `$ bin/run-example org.apache.spark.examples.streaming.JavaNetworkWordCount localhost 9999`
 */
public final class JavaNetworkWordCount {
  private static final Pattern SPACE = Pattern.compile(" ");

  public static void main(String[] args) {
    if (args.length < 2) {
      System.err.println("Usage: JavaNetworkWordCount <hostname> <port>");
      System.exit(1);
    }

    StreamingExamples.setStreamingLogLevels();

    // Create the context with a 1 second batch size
    SparkConf sparkConf = new SparkConf().setAppName("JavaNetworkWordCount");
    JavaStreamingContext ssc = new JavaStreamingContext(sparkConf, Durations.seconds(1));

    // Create a JavaReceiverInputDStream on target ip:port and count the
    // words in input stream of \n delimited text (eg. generated by 'nc')
    // Note that no duplication in storage level only for running locally.
    // Replication necessary in distributed scenario for fault tolerance.
    JavaReceiverInputDStream<String> lines = ssc.socketTextStream(
            args[0], Integer.parseInt(args[1]), StorageLevels.MEMORY_AND_DISK_SER);
    JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
      @Override
      public Iterable<String> call(String x) {
        return Lists.newArrayList(SPACE.split(x));
      }
    });
    JavaPairDStream<String, Integer> wordCounts = words.mapToPair(
      new PairFunction<String, String, Integer>() {
        @Override
        public Tuple2<String, Integer> call(String s) {
          return new Tuple2<String, Integer>(s, 1);
        }
      }).reduceByKey(new Function2<Integer, Integer, Integer>() {
        @Override
        public Integer call(Integer i1, Integer i2) {
          return i1 + i2;
        }
      });

    wordCounts.print();
    ssc.start();
    ssc.awaitTermination();
  }
}

然後我將該程式碼修改了下才成功:
package org.apache.spark.examples.streaming;  
  
import java.util.regex.Pattern;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.examples.streaming.StreamingExamples;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;

import scala.Tuple2;

import com.google.common.collect.Lists;
  
public final class JavaNetworkWordCount {  
  private static final Pattern SPACE = Pattern.compile(" ");  
  
  public static void main(String[] args) {  
  
    StreamingExamples.setStreamingLogLevels();
  
    SparkConf sparkConf = new SparkConf().setAppName("wordcount").setMaster("local[2]");
    JavaStreamingContext ssc = new JavaStreamingContext(sparkConf, Durations.seconds(1));  
  
    JavaReceiverInputDStream<String> lines = ssc.socketTextStream("h40", 9999);

    JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() {  
      @Override  
      public Iterable<String> call(String x) {  
        return Lists.newArrayList(SPACE.split(x));  
      }  
    });  
    JavaPairDStream<String, Integer> wordCounts = words.mapToPair(  
      new PairFunction<String, String, Integer>() {  
        @Override  
        public Tuple2<String, Integer> call(String s) {  
          return new Tuple2<String, Integer>(s, 1);  
        }  
      }).reduceByKey(new Function2<Integer, Integer, Integer>() {  
        @Override  
        public Integer call(Integer i1, Integer i2) {  
          return i1 + i2;  
        }  
      });  
  
    wordCounts.print();  
    ssc.start();  
    ssc.awaitTermination();  
  }  
}  

開啟一個終端,輸入 命令 nc -lk 9999,然後在myeclipse中將所需的jar包匯入,直接執行該程式:

Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/C:/Users/huiqiang/Desktop/%e6%96%b0%e5%bb%ba%e6%96%87%e4%bb%b6%e5%a4%b9/spark-1.3.1-bin-hadoop2.6/spark-assembly-1.3.1-hadoop2.6.0.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/C:/Users/huiqiang/Desktop/%e6%96%b0%e5%bb%ba%e6%96%87%e4%bb%b6%e5%a4%b9/spark-1.3.1-bin-hadoop2.6/spark-examples-1.3.1-hadoop2.6.0.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
17/07/10 09:18:56 INFO StreamingExamples: Setting log level to [WARN] for streaming example. To override add a custom log4j.properties to the classpath.
-------------------------------------------
Time: 1499649550000 ms
-------------------------------------------

-------------------------------------------
Time: 1499649560000 ms
-------------------------------------------

-------------------------------------------
Time: 1499649570000 ms
-------------------------------------------
。。。。。。。。。。。。。。。
然後你在9999埠輸入資料則myeclipse的控制檯中會打印出統計結果。

注意:

1.如果你想在Linux本地中執行該程式的話,需要將程式碼中的StreamingExamples.setStreamingLogLevels();這行程式碼刪除掉,再用myeclipse打成streaming.jar包上傳到Linux本地中去,再執行[[email protected] spark-1.3.1-bin-hadoop2.6]$ bin/spark-submit --class org.apache.spark.examples.streaming.JavaNetworkWordCount streaming.jar

2.import com.google.common.collect.Lists;這個需要google-collections-1.0.jar這個jar包,可是解壓的spark-1.3.1-bin-hadoop2.6.tgz並沒有這個jar包,我從網上找了一個,如果你需要的話,可以去這裡下載:http://download.csdn.net/detail/m0_37739193/9893632

這裡有個怪現象我也不知道是什麼原因造成的:當在Linux本地執行上面的程式的時候,雖然spark的lib目錄下沒有google-collections-1.0.jar,但在spark-1.3.1-bin-hadoop2.6

中可以正常執行,在spark-1.6.3-bin-hadoop2.6中卻報Caused by: java.lang.NoClassDefFoundError: com/google/common/collect/Lists和Caused by: java.lang.ClassNotFoundException: com.google.common.collect.Lists,即使我把google-collections-1.0.jar上傳到spark-1.6.3-bin-hadoop2.6的lib目錄下並在.bash_profile的export CLASSPATH中添加了這個lib目錄,結果還是報上面的錯誤。我以為將google-collections-1.0.jar上傳到spark的lib目錄下並將該目錄新增到.bash_profil的export CLASSPATH中就不會報錯了哈,但在spark-1.3.1-bin-hadoop2.6和spark-1.6.3-bin-hadoop2.6所表現的狀態完全不是我以為的啊,我目前來說不是很明白。

參考:

http://blog.csdn.net/bluejoe2000/article/details/41556979
http://blog.csdn.net/huyangshu87/article/details/52288662

相關推薦

Spark Java程式案例入門

spark 安裝模式: local(本地模式):常用於本地開發測試,本地還分為local單執行緒和local-cluster多執行緒 standalone(叢集模式):典型的Mater/slave模式,不過也能看出Master是有單點故障的;Spark支援ZooKeeper

java程式設計師菜鳥進階(十七)linux基礎入門(五)linux檔案/目錄的許可權和歸屬管理

在linux中的每一個檔案或目錄都包含有訪問許可權,這些訪問許可權決定了誰能訪問和如何訪問這些檔案和目錄。相應的每一個檔案和目錄都有所屬的屬主和屬組,合理的設定檔案和目錄的屬組和屬主在檔案/目錄管理中佔據著很重要的地位,所以,今天我就和大家一起來看一下有關檔案/目錄的許可權和歸屬的相關設定

java程式設計師菜鳥進階(十六)linux基礎入門(四)linux下VIM文字編輯器使用

  linux下編寫配置檔案最好的編輯工具莫過於vim了。Vim的功能實在太多太全,Vim的很多功能也許我們很少用得到,真正為大家常用的功能可能只佔到所有功能的冰山一角。Vim終歸只是一個編寫程式碼或編輯文件的工具,所以只要掌握一些足夠我們使用的功能即可。 做個廣告

java程式設計師菜鳥進階(十五)linux基礎入門(三)linux使用者和組管理

我們大家都知道,要登入linux作業系統,我們必須要有一個使用者名稱和密碼。每一個使用者都由一個惟一的身份來標識,這個標識叫做使用者ID.系統中的每一個使用者也至少需要屬於一個"使用者分組".同樣,使用者分組也是由一個惟一的身份來標識的,該標識叫做使用者分組ID(GID).每位使用者的許可

java程式設計師菜鳥進階(十四)linux基礎入門(二)linux檔案及目錄命令管理

大家都知道,熟悉命令操作的開發人員,Linux作業系統命令操作效率要高於圖形介面的操作,所以瞭解和學習linux基本命令操作是學習linux作業系統的首要任務,本文主要介紹以下四個知識點:   1.      She

java程式設計師菜鳥進階(十三)linux基礎入門(一)vmvare下安裝linux RedHat圖解(超詳細篇)

對於linux,我從大二就想學習一下,但一直苦於無從下手,所以一直拖到現在,鑑於筆者瞭解很多人在linux入門的困難在何處,所以我認為本套入門基礎文章還是挺適合想學習linux的朋友,本系列文章大約十篇文章左右,近期會不斷更新下來,沒有linux基礎但又想學習linux的朋友可以關注一下本系列

MyCat入門+JAVA程式連線

本文章主要對mycat的配置檔案註釋說明,詳細的安裝使用請戳這個地址:mycat安裝使用 連線mycat時,將原先連線mysql的埠和資料庫改為mycat的埠8066,和mycat的邏輯資料庫TESTDB,使用者名稱和密碼為server.xml表裡設定的user。 jdbc.url

Java程式設計師的職業生涯學習建議——基礎入門

這部分主要適用於尚未做過Java工作的同學,包括一些在校生以及剛準備轉行Java的同學。 一、Java基礎 首先去找一個Java的基礎教程學一下(教材或者網路視訊)。 學習Java基礎的時候,應該儘量多動手,很多時候,你想當然的事情,等你寫出來執行一下,你就會發現不是這麼回事兒,不

java+selenium的入門 案例 selenium包 谷歌驅動包 火狐驅動包 IE驅動包 (一)

目錄 前言 selenium是什麼? Selenium的下載使用 Selenium下載 Selenium下載地址 Selenium之谷歌驅動包 chromedriver包下載地址 Selenium使用 前言 在學習selenium之前,要了解selen

MyBatis入門程式案例

mybatis下載 mybaits的程式碼由github.com管理,地址:https://github.com/mybatis/mybatis-3/releases。 可從該地址下載mybatis最新框架。 下載之後解壓縮: 案例需求 1 根據使用者id查詢y一個使

微信小程式入門第一講 java&&jFinal 小程式登陸功能實現以及獲取唯一標識openid和unionid

在我以前的一篇部落格中講到了獲取openid的方法,這裡就不做過多的講解java獲取微信小程式openid。這裡主要講解微信小程式登陸的實現和獲取unionid。需要提醒的是我後端框架使用的是jFinal,傳值方式跟spring的那套有些許的差別。 1、首先要知道微信小程式的開發本身就是基於

一個程式完全入門Java多執行緒

程式碼只供學習使用,實際開發中建議遵守Java開發規範,合理分包 程式碼所涉及知識點: 什麼是執行緒、Thread方法和Runnable介面的介紹及建立執行緒、執行緒的狀態和生命週期、sleep方法和join方法的使用、執行緒的優先順序、執行緒同步、執行緒間通訊(見另一篇文章

switch語句與三種迴圈語句,JAVA程式設計師程式設計新手入門基礎學習筆記

Java是一種可以撰寫跨平臺應用軟體的面向物件的程式設計語言。Java 技術具有卓越的通用性、高效性、平臺移植性和安全性,廣泛應用於PC、資料中心、遊戲控制檯、科學超級計算機、行動電話和網際網路,同時擁有全球最大的開發者專業社群。 自己整理了-份201 8最全面前端學習資料,從最基礎的HTML+

IDEA執行spark相關程式報陣列越界異常java.lang.ArrayIndexOutOfBoundsException: 10582

筆者執行環境:     Win10 + IDEA + spark2.4 + JDK8 程式執行到 sc.textFile("E:/tmp/test.txt"); 報了陣列越界異常,經檢查是paranamer造成的(網上有同行說JDK8得使用paraname

Java程式設計師的Scala入門教程

Java 8擁有了一些初步的函數語言程式設計能力:閉包等,還有新的併發程式設計模型及Stream這個帶高階函式和延遲計算的資料集合。在嘗試了Java 8以後,也許會覺得意猶未盡。是的,你會發現Scala能滿足你在初步嘗試函數語言程式設計後那求知的慾望。 安裝Scala

Spark WordCount簡單案例java,scala版)

Spark 是什麼?官方文件解釋:Apache Spark™ is a fast and general engine for large-scale data processing.通俗的理解:Spark是基於記憶體計算的大資料平行計算框架。Spark基於記憶體計算,提高了

Java程式設計師的Scala的入門教程

本文是《A Scala Tutorial for Java programmers》英文的翻譯,英文版地址A Scala Tutorial for Java programmers。是Michel Schinz和Philipp Haller編寫,由Bearice成

java程式設計師菜鳥入門之一javaweb專案開發環境

一、java開發環境 1、jdk的下載安裝 JDK:JDK是java語言的軟體開發包,是整個java開發的核心,包含了java的執行環境(jvm+java系統類庫)和java工具,執行java程式的最小環境為jre,開發java程式的最小環境為JDK。 JDK的下載:ora

matlab安裝軟體 Matlab視訊教程李大勇 MATLAB程式開發入門課程 MATLAB神經網路30個案例分析及源程式

數學建模10大演算法詳解_程式原始碼打包 matlab安裝軟體  Matlab視訊教程李大勇 MATLAB程式開發入門課程 MATLAB神經網路30個案例分析及源程式 百度雲連結  http://download.csdn.net/download/wocao1226/10

JProfiler入門教程-簡單的java程式效能調優

推薦文章:JProfiler 入門教程 一、安裝JProfiler 從http://www.ej-technologies.com/下載5.1.2並申請試用序列號 二、主要功能簡介 1.記憶體剖析 Memory profiler JPr