
Spark WordCount reading and writing HDFS files (read file from Hadoop HDFS and write output to HDFS)

1 Create a Scala project
2 Add a WordCount class as follows:
package com.qiurc.test
import org.apache.spark._
import SparkContext._

object WordCount {
  def main(args: Array[String]) {
    if (args.length != 3) {
      println("usage: com.qiurc.test.WordCount <master> <input> <output>")
      return
    }
    // args(0) is the master URL; the jar path comes from the env var
    // exported by the launch script (run-qiu-test, see below)
    val sc = new SparkContext(args(0), "WordCount",
      System.getenv("SPARK_HOME"), Seq(System.getenv("SPARK_QIUTEST_JAR")))
    val textFile = sc.textFile(args(1))          // read input from HDFS
    val result = textFile.flatMap(_.split(" "))  // split each line into words
      .map(word => (word, 1))                    // pair each word with a count of 1
      .reduceByKey(_ + _)                        // sum the counts per word
    result.saveAsTextFile(args(2))               // write the result back to HDFS
  }
}
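The heart of the job is the flatMap → map → reduceByKey pipeline. As a minimal sketch (plain Scala collections, no Spark or HDFS required), the same counting logic can be reproduced locally; here `groupBy` plus a per-key sum stands in for Spark's `reduceByKey`:

```scala
// Local, Spark-free sketch of the same word-count pipeline.
// groupBy + per-key sum plays the role of reduceByKey.
object LocalWordCount {
  def count(lines: Seq[String]): Map[String, Int] =
    lines
      .flatMap(_.split(" "))               // split each line into words
      .map(word => (word, 1))              // pair each word with a count of 1
      .groupBy(_._1)                       // group the pairs by word
      .map { case (w, ps) => (w, ps.map(_._2).sum) } // sum counts per word

  def main(args: Array[String]): Unit =
    println(count(Seq("a b", "c c", "d d")))
}
```

Running this on a few sample lines gives the same (word, count) pairs the Spark job produces, which makes it handy for checking the logic before submitting to the cluster.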



3 Export the project as a jar: right-click the project and export it as spark_qiutest.jar, then put it into some directory, such as SPARK_HOME/qiutest.
4 Write a launch script for the jar: copy run-example (in SPARK_HOME) and modify it:
hadoop@debian-master:~/spark-0.8.0-incubating-bin-hadoop1$ cp run-example run-qiu-test
hadoop@debian-master:~/spark-0.8.0-incubating-bin-hadoop1$ vim run-qiu-test
____________________________________
SCALA_VERSION=2.9.3

# Figure out where the Scala framework is installed
FWDIR="$(cd `dirname $0`; pwd)"

# Export this as SPARK_HOME
export SPARK_HOME="$FWDIR"

# Load environment variables from conf/spark-env.sh, if it exists
if [ -e $FWDIR/conf/spark-env.sh ] ; then
  . $FWDIR/conf/spark-env.sh
fi

if [ -z "$1" ]; then
  echo "Usage: run-example <example-class> [<args>]" >&2
  exit 1
fi

# Figure out the JAR file that our examples were packaged into. This includes a bit of a hack
# to avoid the -sources and -doc packages that are built by publish-local.
QIUTEST_DIR="$FWDIR"/qiutest
SPARK_QIUTEST_JAR=""
if [ -e "$QIUTEST_DIR"/spark_qiutest.jar ]; then
  export SPARK_QIUTEST_JAR=`ls "$QIUTEST_DIR"/spark_qiutest.jar`
fi
 
if [[ -z $SPARK_QIUTEST_JAR ]]; then
  echo "Failed to find Spark qiutest jar assembly in $FWDIR/qiutest" >&2
  echo "You need to build the Spark test jar assembly before running this program" >&2
  exit 1
fi

# Since the examples JAR ideally shouldn't include spark-core (that dependency should be
# "provided"), also add our standard Spark classpath, built using compute-classpath.sh.
CLASSPATH=`$FWDIR/bin/compute-classpath.sh`
CLASSPATH="$SPARK_QIUTEST_JAR:$CLASSPATH"

# Find java binary
if [ -n "${JAVA_HOME}" ]; then
  RUNNER="${JAVA_HOME}/bin/java"
else
  if [ `command -v java` ]; then
    RUNNER="java"
  else
    echo "JAVA_HOME is not set" >&2
    exit 1
  fi
fi

if [ "$SPARK_PRINT_LAUNCH_COMMAND" == "1" ]; then
  echo -n "Spark Command: "
  echo "$RUNNER" -cp "$CLASSPATH" "$@"
  echo "========================================"
  echo
fi

exec "$RUNNER" -cp "$CLASSPATH" "$@"
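The script hands the jar path to the driver through the SPARK_QIUTEST_JAR environment variable, which WordCount reads back with System.getenv. A hedged sketch (the object and error message here are illustrative, not part of the original code) of a safer lookup that fails fast with a clear message when the variable is unset:

```scala
// Sketch: read the jar path exported by the launch script, failing fast
// with a clear message if the environment variable is missing or empty.
object JarFromEnv {
  def jarsFromEnv(name: String): Seq[String] =
    sys.env.get(name) match {
      case Some(path) if path.nonEmpty => Seq(path)   // pass to SparkContext's jars argument
      case _ =>
        sys.error(s"environment variable $name is not set; launch via run-qiu-test")
    }
}
```

Without such a check, a missing variable silently becomes a null entry in the jars sequence and the failure only surfaces later on the cluster.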


5 Run it in Spark with Hadoop HDFS
hadoop@debian-master:~/spark-0.8.0-incubating-bin-hadoop1$ ls
assembly  LICENSE  pyspark.cmd  spark-class  a.txt  logs  python  spark-class2.cmd

hadoop@debian-master:~/spark-0.8.0-incubating-bin-hadoop1$ cat a.txt
a
b
c
c
d
d
e
e


(note: put a.txt into HDFS)
hadoop@debian-master:~/spark-0.8.0-incubating-bin-hadoop1$ hadoop fs -put a.txt ./

(note: check a.txt in HDFS)
hadoop@debian-master:~/spark-0.8.0-incubating-bin-hadoop1$ hadoop fs -ls
Found 6 items
-rw-r--r--   2 hadoop supergroup       4215 2014-04-14 10:27 /user/hadoop/README.md
-rw-r--r--   2 hadoop supergroup         19 2014-04-14 15:58 /user/hadoop/a.txt
-rw-r--r--   2 hadoop supergroup          0 2013-05-29 17:17 /user/hadoop/dumpfile
-rw-r--r--   2 hadoop supergroup          0 2013-05-29 17:19 /user/hadoop/dumpfiles
drwxr-xr-x   - hadoop supergroup          0 2014-04-14 15:57 /user/hadoop/qiurc
drwxr-xr-x   - hadoop supergroup          0 2013-07-06 19:48 /user/hadoop/temp
(note: create a dir named "qiurc" to store the output of WordCount in HDFS)
hadoop@debian-master:~/spark-0.8.0-incubating-bin-hadoop1$ hadoop fs -mkdir /user/hadoop/qiurc
hadoop@debian-master:~/spark-0.8.0-incubating-bin-hadoop1$ hadoop fs -ls
Found 5 items
-rw-r--r--   2 hadoop supergroup       4215 2014-04-14 10:27 /user/hadoop/README.md
-rw-r--r--   2 hadoop supergroup          0 2013-05-29 17:17 /user/hadoop/dumpfile
-rw-r--r--   2 hadoop supergroup          0 2013-05-29 17:19 /user/hadoop/dumpfiles
drwxr-xr-x   - hadoop supergroup          0 2014-04-14 15:32 /user/hadoop/qiurc
drwxr-xr-x   - hadoop supergroup          0 2013-07-06 19:48 /user/hadoop/temp

Now run our WordCount program, specifying the input and output locations. In testing, the output was only written to HDFS when the full hdfs:// absolute path was given.
(note: the prefix "hdfs://debian-master:9000/user/hadoop/" must not be forgotten)

hadoop@debian-master:~/spark-0.8.0-incubating-bin-hadoop1$ ./run-qiu-test com.qiurc.test.WordCount spark://debian-master:7077 hdfs://debian-master:9000/user/hadoop/a.txt hdfs://debian-master:9000/user/hadoop/qiurc
(note: the hadoop fs -get command works, too)
hadoop@debian-master:~/spark-0.8.0-incubating-bin-hadoop1$ hadoop fs -copyToLocal /user/hadoop/qiurc/ localFile
hadoop@debian-master:~/spark-0.8.0-incubating-bin-hadoop1$ ls localFile/
part-00000  part-00001  part-00002  _SUCCESS
(note: let me show the results)
hadoop@debian-master:~/spark-0.8.0-incubating-bin-hadoop1$ cat localFile/part-00000
(,1)
(c,2)
hadoop@debian-master:~/spark-0.8.0-incubating-bin-hadoop1$ cat localFile/part-00001
(d,2)
(a,1)
hadoop@debian-master:~/spark-0.8.0-incubating-bin-hadoop1$ cat localFile/part-00002
(e,3)
(b,1)
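Why does each word land in that particular part file? saveAsTextFile writes one file per partition, and reduceByKey uses Spark's default HashPartitioner, which assigns a key to partition hashCode mod numPartitions, made non-negative. A small sketch (mirroring, not calling, HashPartitioner.getPartition) reproduces the distribution above for 3 partitions:

```scala
// Sketch of how the default HashPartitioner maps keys to the three
// part-XXXXX files: partition = key.hashCode mod numPartitions,
// adjusted to be non-negative (hashCode can be negative).
object PartitionSketch {
  def partition(key: String, numPartitions: Int): Int = {
    val mod = key.hashCode % numPartitions
    if (mod < 0) mod + numPartitions else mod   // keep the result in [0, numPartitions)
  }

  def main(args: Array[String]): Unit =
    for (w <- Seq("", "a", "b", "c", "d", "e"))
      println(s"'$w' -> part-0000${partition(w, 3)}")
}
```

For single-letter strings, hashCode is just the character code (e.g. "c" is 99, 99 mod 3 = 0), which matches the outputs above: "" and "c" in part-00000, "a" and "d" in part-00001, "b" and "e" in part-00002.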

Finished! ^_^