
spark1.x Spark SQL data skew solutions

Aggregate the source data

Filter out the keys that cause skew with a WHERE condition

Increase the shuffle parallelism: spark.sql.shuffle.partitions

  sqlContext.setConf("spark.sql.shuffle.partitions", "1000")
   // the default parallelism is 200, i.e. only 200 reduce tasks

Double GROUP BY: rewrite the SQL into two GROUP BYs

Add a random prefix to the skewed column with random_prefix()
Implement it as a UDF:
call(String, Integer)
returns randNum + "_" + val
Do a local aggregation, then strip the random prefix,
take the partially aggregated values,
and run one more, global aggregation.
The skewed key is split into many keys, so the RDD is spread across tasks;
then map the result back into a table
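The two-stage aggregation above can be sketched in plain Java without Spark (the keys and counts here are made up for illustration; in the real job both stages are GROUP BY queries):

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Random;

// Sketch of two-stage (salted) aggregation: prefix each key with a random
// number for the local aggregation, then strip the prefix and aggregate
// again globally. A plain-Java stand-in for the two GROUP BYs in Spark SQL.
public class TwoStageAgg {

    // stage 1: count on the salted key (randNum + "_" + key)
    public static Map<String, Long> localAgg(Iterable<String> keys, int salts) {
        Random rand = new Random();
        Map<String, Long> partial = new HashMap<String, Long>();
        for (String key : keys) {
            String salted = rand.nextInt(salts) + "_" + key;
            partial.merge(salted, 1L, Long::sum);
        }
        return partial;
    }

    // stage 2: strip the prefix and aggregate the partial counts globally
    public static Map<String, Long> globalAgg(Map<String, Long> partial) {
        Map<String, Long> total = new HashMap<String, Long>();
        for (Map.Entry<String, Long> e : partial.entrySet()) {
            String key = e.getKey().substring(e.getKey().indexOf('_') + 1);
            total.merge(key, e.getValue(), Long::sum);
        }
        return total;
    }
}
```

Whatever salt is drawn, the global result equals a plain count; the salt only decides how the hot key's work is spread over stage-1 tasks.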

Convert the reduce join into a map join

Build the tables as RDDs and implement the map join by hand,
or use Spark SQL's built-in map join:
by default, a table smaller than 10 MB
is broadcast automatically and the join runs as a map join.
The threshold can be raised to 20 MB, 50 MB, or even 1 GB

    sqlContext.setConf("spark.sql.autoBroadcastJoinThreshold", "20971520"); // 20971520 bytes = 20 MB
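Doing the map join by hand amounts to collecting the small table into a map, broadcasting it, and probing it per record of the big table. A plain-Java sketch of the probe side (field layout is hypothetical; in Spark the map would be wrapped in sc.broadcast(...) and the loop body would live inside a map function):

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Sketch of the probe phase of a manual map join: the small table is a
// HashMap keyed by the join key, so each big-table record is joined with
// a single lookup and no shuffle is needed.
public class ManualMapJoin {

    public static List<String> join(List<long[]> bigTable,            // {productId, clickCount}
                                    Map<Long, String> smallTable) {   // productId -> productName
        List<String> joined = new ArrayList<String>();
        for (long[] row : bigTable) {
            String name = smallTable.get(row[0]);
            if (name != null) {                 // inner-join semantics: drop unmatched rows
                joined.add(name + ":" + row[1]);
            }
        }
        return joined;
    }
}
```

This is exactly what autoBroadcastJoinThreshold automates: below the threshold, Spark SQL builds and ships that hash map for you.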

Sample the skewed keys and join them separately

Same approach as in Spark Core
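The idea: sample the data to find the heaviest key, filter it into its own dataset, join the skewed and normal parts with different strategies (the hot part can be salted or map-joined), then union the results. A plain-Java sketch of the sampling and split steps (names are illustrative, not Spark API):

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Sketch of "sample skewed keys and join separately": find the most
// frequent key in a sample, then split the data into a skewed part and
// a normal part that can be joined independently and unioned afterwards.
public class SkewSplit {

    // which key dominates the sample?
    public static String heaviestKey(List<String> sample) {
        Map<String, Integer> counts = new HashMap<String, Integer>();
        String top = null;
        for (String k : sample) {
            int c = counts.merge(k, 1, Integer::sum);
            if (top == null || c > counts.get(top)) {
                top = k;
            }
        }
        return top;
    }

    // split into {skewedPart, normalPart}
    public static List<List<String>> split(List<String> keys, String skewed) {
        List<String> hot = new ArrayList<String>();
        List<String> rest = new ArrayList<String>();
        for (String k : keys) {
            (k.equals(skewed) ? hot : rest).add(k);
        }
        List<List<String>> parts = new ArrayList<List<String>>();
        parts.add(hot);
        parts.add(rest);
        return parts;
    }
}
```

In Spark Core the same shape is sample() + countByKey() to find the hot key, two filter()s to split, two joins, then union().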
Random key plus an expanded table
Convert the Spark SQL job into Spark Core
Expand product_info 10x, each copy tagged with a random-number prefix
Apply the matching prefix to the other side with a SQL subquery

    JavaRDD<Row> rdd = sqlContext.sql("select * from product_info").javaRDD();
    JavaRDD<Row> flattedRDD = rdd.flatMap(new FlatMapFunction<Row, Row>() {

        private static final long serialVersionUID = 1L;

        @Override
        public Iterable<Row> call(Row row) throws Exception {
            // expand every product row into 10 copies, each with a prefix 0_..9_
            List<Row> list = new ArrayList<Row>();
            for (int i = 0; i < 10; i++) {
                long productid = row.getLong(0);
                String _productid = i + "_" + productid;
                Row _row = RowFactory.create(_productid, row.get(1), row.get(2));
                list.add(_row);
            }
            return list;
        }
    });

    StructType _schema = DataTypes.createStructType(Arrays.asList(
            DataTypes.createStructField("product_id", DataTypes.StringType, true),
            DataTypes.createStructField("product_name", DataTypes.StringType, true),
            DataTypes.createStructField("product_status", DataTypes.StringType, true)));
    DataFrame _df = sqlContext.createDataFrame(flattedRDD, _schema);
    _df.registerTempTable("tmp_product_info");

    // join on the prefixed product_id, then strip the prefix in the outer query
    // ('自營商品' = self-operated product, '第三方商品' = third-party product)
    String _sql = "SELECT "
            + "tapcc.area,"
            + "remove_random_prefix(tapcc.product_id) product_id,"
            + "tapcc.click_count,"
            + "tapcc.city_infos,"
            + "pi.product_name,"
            + "if(get_json_object(pi.extend_info,'product_status')=0,'自營商品','第三方商品') product_status "
            + "FROM ("
            + "SELECT "
            + "area,"
            + "random_prefix(product_id, 10) product_id,"
            + "click_count,"
            + "city_infos "
            + "FROM tmp_area_product_click_count "
            + ") tapcc "
            + "JOIN tmp_product_info pi ON tapcc.product_id=pi.product_id ";
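random_prefix and remove_random_prefix in that SQL are user-defined functions; their core logic might look like the sketch below (in Spark 1.x they would be registered with sqlContext.udf().register(...) as UDF2/UDF1 — the class and method names here are assumptions for illustration):

```java
import java.util.Random;

// Core logic of the two UDFs used in the SQL above:
// random_prefix(val, num) prepends a random salt in 0..num-1,
// remove_random_prefix strips that salt off again after the join.
public class PrefixUdfs {

    private static final Random RANDOM = new Random();

    public static String randomPrefix(String val, int num) {
        return RANDOM.nextInt(num) + "_" + val;
    }

    public static String removeRandomPrefix(String val) {
        return val.substring(val.indexOf('_') + 1);
    }
}
```

The num argument must match the expansion factor of product_info (10 here), so every salted key on the click-count side finds exactly one matching prefixed copy on the product side.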