1. 程式人生 > >Spark把RDD資料儲存到一個單個檔案中

Spark把RDD資料儲存到一個單個檔案中

Spark是當前最流行的分散式資料處理框架之一,相比於Hadoop,Spark在資料的處理方面更加靈活方便。然而在最近的使用中遇到了一點小麻煩:Spark儲存檔案的的函式(如saveAsTextFile)在儲存資料時都需要新建一個目錄,然後在這個目錄下分塊儲存檔案。如果我們想在原有的目錄下增加一個檔案(而不是增加一個目錄),Spark就無能為力了。

        有網友給出建議,用

rddx.repartition(1).saveAsTextFile("test/test.txt")
rddx.coalesce(1).saveAsTextFile("test/test.txt")

把資料合併到一個分割槽中,然而得到的結果是這樣的:

$ ./bin/hadoop fs -du -h test/test.txt
0        test/test.txt/_SUCCESS
499.9 M  test/test.txt/part-00000

Spark仍然是新建了一個目錄test.txt,然後在這個目錄下把資料都儲存在了part-00000檔案中。

       Spark的儲存模式的設定註定了在儲存資料的時候只能新建目錄,如果想把資料增加到原有的目錄中,單獨作為一個檔案,就只能藉助於Hadoop的HDFS操作。下面的例子演示如何用Hadoop的FileSystem實現在已有目錄下用一個檔案儲存Spark資料:

import org.apache.hadoop.conf.Configuration;
import
org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hdfs.web.resources.ExceptionHandler; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.*; import java.net.URI; /** * 使用Hadoop的FileSystem把資料寫入到HDFS */ public class HdfsOperate implements
Serializable{
private static Logger logger = LoggerFactory.getLogger(HdfsOperate.class); private static Configuration conf = new Configuration(); private static BufferedWriter writer = null; //在hdfs的目標位置新建一個檔案,得到一個輸出流 public static void openHdfsFile(String path) throws Exception { FileSystem fs = FileSystem.get(URI.create(path),conf); writer = new BufferedWriter(new OutputStreamWriter(fs.create(new Path(path)))); if(null!=writer){ logger.info("[HdfsOperate]>> initialize writer succeed!"); } } //往hdfs檔案中寫入資料 public static void writeString(String line) { try { writer.write(line + "\n"); }catch(Exception e){ logger.error("[HdfsOperate]>> writer a line error:" , e); } } //關閉hdfs輸出流 public static void closeHdfsFile() { try { if (null != writer) { writer.close(); logger.info("[HdfsOperate]>> closeHdfsFile close writer succeed!"); } else{ logger.error("[HdfsOperate]>> closeHdfsFile writer is null"); } }catch(Exception e){ logger.error("[HdfsOperate]>> closeHdfsFile close hdfs error:" + e); } } }

在Spark中處理並儲存資料:

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import hdfsoperate.HdfsOperate;
import org.apache.spark.Partition;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.VoidFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import util.NlpModuleWrapper;

import java.io.Serializable;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Iterator;
import java.util.List;

/**
 * 呼叫HdfsOperate類的方法把RDD資料儲存到Hdfs上
 */
public class FeatureExtractor implements Serializable {
    private static Logger logger = LoggerFactory.getLogger(FeatureExtractor.class);

    public void extractFeature(JavaSparkContext sc, int repartitionNum) throws Exception {
        String hdfsPath = "test/corpus/2016-09-02"; //存放原始資料的檔案
        //Spark可以讀取單獨的一個檔案或整個目錄
        JavaRDD<String> rddx = sc.textFile(hdfsPath).repartition(repartitionNum); 
        rddx = rddx.map(new ExtractFeatureMap());

        //寫入hdfs檔案位置
        String destinationPath = "test/result/2016-09-02" ;
        //建立Hdfs檔案,開啟Hdfs輸出流
        HdfsOperate.openHdfsFile(destinationPath);

        //分塊讀取RDD資料並儲存到hdfs
        //如果直接用collect()函式獲取List<String>,可能因資料量過大超過記憶體空間而失敗
        for (int i = 0; i < repartitionNum; i++) {
            int[] index = new int[1];
            index[0] = i;
            List<String>[] featureList = rddx.collectPartitions(index);
            if (featureList.length != 1) {
                logger.error("[FeatureExtractor]>> featureList.length is not 1!");
            }
            for (String str : featureList[0]) {
                //寫一行到Hdfs檔案
                HdfsOperate.writeString(str);
            }
        }
        //關閉Hdfs輸出流
        HdfsOperate.closeHdfsFile();

    }



    class ExtractFeatureMap implements Function<String, String> {
        @Override
        public String call(String line) throws Exception {
            try {
                //TODO:你自己的操作,返回String型別
            } catch (Exception e) {
                logger.error("[FeatureExtractor]>>GetTokenAndKeywordFeature error:", e);
            }
            return null;
        }
    }

}

(PS:目前還沒有看到過單用Spark介面能實現該功能,有知道的大神歡迎指點)