1. 程式人生 > >【spark】儲存資料到hdfs,自動判斷合理分塊數量(repartition和coalesce)(二)

【spark】儲存資料到hdfs,自動判斷合理分塊數量(repartition和coalesce)(二)

本人菜雞一隻,如果有說的不對的地方,還請批評指出!

 

該系列暫有2篇文章(本文為第2篇):

【spark】儲存資料到hdfs,自動判斷合理分塊數量(repartition和coalesce)(一):https://blog.csdn.net/lsr40/article/details/84968923

【spark】儲存資料到hdfs,自動判斷合理分塊數量(repartition和coalesce)(二):

https://blog.csdn.net/lsr40/article/details/85078499

 

上一篇解釋了為什麼要設定分塊數量和怎樣的分塊才算合理,相信大家都有一個大概的概念,那麼接下來來說說具體如何實現?

我接下來的所說的實現方法,還是要根據不同業務不同場景來選擇,並不存在一種一勞永逸的最高效的方法。

實現方式:

方法一:人工判斷,(適合場景:過濾日誌)

有這麼種情況:日誌通過某些框架按照某些壓縮格式(比如.gz,或者bzip2)聚合到了hdfs上成為資料塊,然後我們通過spark來做ETL,做資料清洗,過濾掉不符合條件或者欄位缺失的資料。這種情況下資料條數會減少,但是減少的量的百分比是我們可以根據歷史任務或者測試環境推斷出來的,舉個例子,比如能過濾20%,剩下80%的資料量,所以可以通過hdfs的命令或者hadoop的api來檢視某個檔案塊的大小,就可以知道輸入資料量,接著把輸入資料量*0.8/128M

,就可以得到資料應該分為幾塊了。

獲得檔案塊大小的API如下:(作者:superlxw1234)https://superlxw1234.iteye.com/blog/1514586

當然,其他場景也可以通過歷史情況+人為判斷得到應該存多少檔案塊,但是需要考慮的是:輸入的檔案塊大小的判斷方式。

例如(提供幾個方案):

1、約定好每個spark任務從哪裡讀入資料,在程式碼中寫好動態的路徑,執行程式碼的時候判斷該路徑下檔案的大小。

2、在啟動spark任務的shell中,通過執行hadoop的命令,獲取檔案塊的大小(如果多塊可以相加),然後當成引數傳入到main方法中

優點:不需要使用多餘的資源來計算輸入資料量或者輸出資料量

缺點:如上提供的方案所示,需要一套合理的方式來判斷輸入的資料大小,甚至有時候通過某些框架在往hdfs寫資料的時候,會在hdfs上有臨時檔案,一直到整個檔案塊寫好,才會有.gz(或者其他壓縮格式)結尾的檔案,所以輸入資料的時候甚至需要判斷檔名。

方法二:儲存資料之前判斷(抽樣)

spark資料全部計算完之後,在儲存之前讀取100條資料(當然也可以更多,但是由於這100條資料的會返回到spark的driver的,避免driver端記憶體溢位,所以。。。),然後計算這100條資料佔用多少空間。計算整個要儲存的資料的總條數。

應該分的檔案塊個數 =(總條數/100*100條資料的byte大小/128M)+1(請記得換算到相同單位)

程式碼如下(java版本):

public long saveDataTable(Dataset<Row> data, String dataBase,
String tableName, String format,String... colNames) throws UnsupportedEncodingException {
		int takeN = 100;
        //因為這部分資料要處理多次,所以先快取資料
        data.cache();
        //獲得資料的總條數
        long cn = data.count();
        if (cn == 0l ){
            data.unpersist();
        //如果資料條數為0,直接不儲存資料
            return 0l;
        }
        // 取出前100條
        List<Row> firstRow = data.takeAsList(takeN);
        //建立long來儲存100條資料的byte
        long byteSize = 0l;
        //遍歷100條資料,計算出100條資料的byte
        for (Row row : firstRow) {
            byteSize+=row.toString().getBytes("utf-8").length;
        }
        //該資料作為字串佔用的空間大小
        long dataByte = cn/takeN*byteSize;
        //由於表可能會有壓縮或者不同的格式,所以做資料量的縮放
        dataByte=tableInputFormatType(dataBase,tableName,dataByte,format);
        //計算應分割槽數
        int numPartition = (int)(dataByte/byteToMB/blockSize)+1;
        // 如果分割槽欄位個數為0,那就不做partitionBy
        if (colNames.length == 0 ) {
            data.repartition(numPartition).write().mode(SaveMode.Append)
                    .format(format)
                    .saveAsTable(dataBase + "." + tableName);
        }else {
            data.repartition(numPartition).write().mode(SaveMode.Append)
                    .partitionBy(colNames)
                    .format(format)
                    .saveAsTable(dataBase + "." + tableName);
        }
        //資料儲存之後,釋放快取
        data.unpersist();
        //返回該資料總條數,列印到日誌
        return cn;
    }
protected long tableInputFormatType(String dataBase, String tableName,long dataByte,String format) {
        format = format.toLowerCase();
        //如果儲存的時候選擇orc或者parquet,證明hive表事先不存在,所以spark會自動建表
        if (format.equals("orc") || format.equals("parquet")) {
            //spark的orc預設採用snappy壓縮,所以壓縮比和parquet差不多
            compressionRatio = 0.4;
        } else if(format.equals("csv") || format.equals("json")){
            compressionRatio = 1.1;
        } else if(format.equals("hive")) {
            //format=hive的時候,證明表有可能事先存在,spark會自動讀取hive表資訊,來做檔案格式的轉換
            //所以先判斷表stored,通過show create table的命令來判斷表資料的儲存格式
            //獲得建表語句
            //如果表不存在,就不用獲得建表語句了,直接儲存資料為text格式
            if (spark.catalog().tableExists(dataBase, tableName)) {
                String createtabStmt = spark.sql("show create table " + dataBase + "." + tableName)
                        .takeAsList(1).get(0).getAs(0);
                if (createtabStmt.contains("TextInputFormat")) {
                    compressionRatio = 1;
                } else if (createtabStmt.contains("Orc")) {
                    compressionRatio = 0.2;
                    //TextInputFormat,Orc,Parquet
                } else if (createtabStmt.contains("Parquet")) {
                    compressionRatio = 0.4;
                } else {
                    //可能是json或者其他的資料型別(可能性較低)
                    compressionRatio = 1.1;
                }
            }else{
                //如果事先表不存在,直接儲存資料為text格式
                compressionRatio = 1.0;
            }
        }
        dataByte*=compressionRatio;
        return dataByte;
    }

需要考慮的點是:

1、壓縮:前100條資料是按照字串格式計算大小的,但是壓縮之後資料會遠遠小於這個值,所以程式碼中有兩個方法,第二個方法就是用來判斷壓縮比的(但是我只是按照一個大概的壓縮比來寫百分比,可能並不會特別準確,還請大家用的時候自行測試)

2、這份資料執行了兩個action,一個是count,一個是save,所以資料要先cache,避免這兩個運算元都從源頭拉資料來計算

優點:不需要考慮資料如何輸入,只要在儲存資料的時候做判斷,很通用,特別是對於一些聚合操作,你不知道聚合後資料條數會有怎樣的驟減,這種方式很好用。

缺點:

1、壓縮儲存的時候,檔案大小會判斷不準

2、需要多消耗一份資源來count資料總條數(但是這個其實也還好,因為每天這個count可以在日誌裡打印出來,監控每天生成多少條資料)

方法三:寫成一段專門的程式碼,類似小工具,來呼叫

這個方法跟方法二有點類似,但是其實又不太一樣,聽我細細說來,是這樣的。

可以寫一個專門的jar包,類似工具一樣,用來做小檔案合併資料(合併各種hdfs上的資料),傳參可以是目錄,檔案大小,壓縮格式等,可以參照網上的一些小檔案合併的mr來寫,也可以使用spark來寫,我就不具體留程式碼了(因為我也沒實現這個功能)

優點:完全是一個單獨的工具,不用巢狀在spark程式碼中,對於hive生成或者其他框架生成的hdfs小檔案都可以處理,可以直接用shell命令呼叫,不會寫程式碼的人也可以使用

缺點:

1、要單獨維護一個工具

2、如果要做的通用必須有更多格式的相容和匹配

3、不能防範於未然,只能事後諸葛亮(小檔案已有的情況下再來處理)

 

總結:以上的方法各有優缺點,還是要按照各自的需求和場景使用,所以,程式設計師的價值就體現在這裡了。用人腦來想解決方法,選擇最適合的方案,願每一個程式設計師都有價值!

本人菜雞一隻,也正在學習中,如果我有什麼寫的不對或者不清晰的地方,希望大家指出,或者大家有什麼更好的方法,歡迎評論!我會寫明是你的方案,然後加入到文章中!