1. 程式人生 > >Spark DataFrame列的合併和拆分

Spark DataFrame列的合併和拆分

Spark DataFrame 列的合併與拆分

版本說明:Spark-2.3.0

使用Spark SQL在對資料進行處理的過程中,可能會遇到對一列資料拆分為多列,或者把多列資料合併為一列。這裡記錄一下目前想到的對DataFrame列資料進行合併和拆分的幾種方法。

1 DataFrame列資料的合併

例如:我們有如下資料,想要將三列資料合併為一列,並以“,”分割

+----+---+-----------+
|name|age|      phone|
+----+---+-----------+
|Ming| 20|15552211521|
|hong| 19|13287994007|
| zhi| 21|15552211523|
+----+---+-----------+

1.1 使用map方法重寫

使用map方法重寫就是將DataFrame使用map取值之後,然後使用toSeq方法轉成Seq格式,最後使用Seq的foldLeft方法拼接資料,並返回,如下所示:

//方法1:利用map重寫
    val separator = ","
    df.map(_.toSeq.foldLeft("")(_ + separator + _).substring(1)).show()

    /**
      * +-------------------+
      * |              value|
      * +-------------------+
      * |Ming,20,15552211521|
      * |hong,19,13287994007|
      * | zhi,21,15552211523|
      * +-------------------+
      */

1.2 使用內建函式concat_ws

合併多列資料也可以使用SparkSQL的內建函式concat_ws()

 //方法2: 使用內建函式 concat_ws
    import org.apache.spark.sql.functions._
    df.select(concat_ws(separator, $"name", $"age", $"phone").cast(StringType).as("value")).show()

    /**
      * +-------------------+
      * |              value|
      * +-------------------+
      * |Ming,20,15552211521|
      * |hong,19,13287994007|
      * | zhi,21,15552211523|
      * +-------------------+
      */

1.3 使用自定義UDF函式

自己編寫UDF函式,實現多列合併

    //方法3:使用自定義UDF函式

    // 編寫udf函式
    def mergeCols(row: Row): String = {
      row.toSeq.foldLeft("")(_ + separator + _).substring(1)
    }

    val mergeColsUDF = udf(mergeCols _)
    df.select(mergeColsUDF(struct($"name", $"age", $"phone")).as("value")).show()

完整程式碼:

package com.hollysys.spark.sql

import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.types.StringType

/**
  * Created by shirukai on 2018/9/12
  * DataFrame 合併列
  */
object MergeColsTest {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession
      .builder()
      .appName(this.getClass.getSimpleName)
      .master("local")
      .getOrCreate()

    //從記憶體建立一組DataFrame資料
    import spark.implicits._
    val df = Seq(("Ming", 20, 15552211521L), ("hong", 19, 13287994007L), ("zhi", 21, 15552211523L))
      .toDF("name", "age", "phone")
    df.show()
    /**
      * +----+---+-----------+
      * |name|age|      phone|
      * +----+---+-----------+
      * |Ming| 20|15552211521|
      * |hong| 19|13287994007|
      * | zhi| 21|15552211523|
      * +----+---+-----------+
      */
    //方法1:利用map重寫
    val separator = ","
    df.map(_.toSeq.foldLeft("")(_ + separator + _).substring(1)).show()

    /**
      * +-------------------+
      * |              value|
      * +-------------------+
      * |Ming,20,15552211521|
      * |hong,19,13287994007|
      * | zhi,21,15552211523|
      * +-------------------+
      */
    //方法2: 使用內建函式 concat_ws
    import org.apache.spark.sql.functions._
    df.select(concat_ws(separator, $"name", $"age", $"phone").cast(StringType).as("value")).show()

    /**
      * +-------------------+
      * |              value|
      * +-------------------+
      * |Ming,20,15552211521|
      * |hong,19,13287994007|
      * | zhi,21,15552211523|
      * +-------------------+
      */
    //方法3:使用自定義UDF函式

    // 編寫udf函式
    def mergeCols(row: Row): String = {
      row.toSeq.foldLeft("")(_ + separator + _).substring(1)
    }

    val mergeColsUDF = udf(mergeCols _)
    df.select(mergeColsUDF(struct($"name", $"age", $"phone")).as("value")).show()

    /**
      * /**
      * * +-------------------+
      * * |              value|
      * * +-------------------+
      * * |Ming,20,15552211521|
      * * |hong,19,13287994007|
      * * | zhi,21,15552211523|
      * * +-------------------+
      **/
      */
  }
}

2 DataFrame列資料的拆分

上面我們將DataFrame的多列資料合併為一列如下所示,有時候我們也需要將單列資料,以某種拆分規則,拆分為多列。下面提供幾種將一列拆分為多列的方法。

+-------------------+
|              value|
+-------------------+
|Ming,20,15552211521|
|hong,19,13287994007|
| zhi,21,15552211523|
+-------------------+

2.1 使用內建函式split,然後遍歷新增列

該方法,先利用內建函式split將單列的資料拆分,然後遍歷使用getItem(角標)方法獲取拆分後的資料,依次使用withColumn方法新增新列,程式碼如下所示:

    //方法1: 使用內建函式split,然後遍歷新增列
    val separator = ","
    lazy val first = df.first()

    val numAttrs = first.toString().split(separator).length
    val attrs = Array.tabulate(numAttrs)(n => "col_" + n)
    //按指定分隔符拆分value列,生成splitCols列
    var newDF = df.withColumn("splitCols", split($"value", separator))
    attrs.zipWithIndex.foreach(x => {
      newDF = newDF.withColumn(x._1, $"splitCols".getItem(x._2))
    })
    newDF.show()
  /**
      * +-------------------+--------------------+-----+-----+-----------+
      * |              value|           splitCols|col_0|col_1|      col_2|
      * +-------------------+--------------------+-----+-----+-----------+
      * |Ming,20,15552211521|[Ming, 20, 155522...| Ming|   20|15552211521|
      * |hong,19,13287994007|[hong, 19, 132879...| hong|   19|13287994007|
      * | zhi,21,15552211523|[zhi, 21, 1555221...|  zhi|   21|15552211523|
      * +-------------------+--------------------+-----+-----+-----------+
      */

2.2 使用UDF函式建立多列資料,然後合併

該方法是使用udf函式,生成多個列,然後合併到原來的資料。該方法參考了VectorDisassembler(與spark ml官網提供的VectorAssembler相反),這是一個第三方的spark ml向量拆分演算法,該方法github地址:https://github.com/jamesbconner/VectorDisassembler。程式碼如下所示:

//方法2:使用udf函式建立多列,然後合併
    val attributes: Array[Attribute] = {
      val numAttrs = first.toString().split(separator).length
      //生成attributes
      Array.tabulate(numAttrs)(i => NumericAttribute.defaultAttr.withName("value" + "_" + i))
    }
    //建立多列資料
    val fieldCols = attributes.zipWithIndex.map(x => {
      val assembleFunc = udf {
        str: String =>
          str.split(separator)(x._2)
      }
      assembleFunc(df("value").cast(StringType)).as(x._1.name.get, x._1.toMetadata())
    })
    //合併資料
    df.select(col("*") +: fieldCols: _*).show()

    /**
      * +-------------------+-------+-------+-----------+
      * |              value|value_0|value_1|    value_2|
      * +-------------------+-------+-------+-----------+
      * |Ming,20,15552211521|   Ming|     20|15552211521|
      * |hong,19,13287994007|   hong|     19|13287994007|
      * | zhi,21,15552211523|    zhi|     21|15552211523|
      * +-------------------+-------+-------+-----------+
      */

完整程式碼:

package com.hollysys.spark.sql

import org.apache.spark.ml.attribute.{Attribute, NumericAttribute}
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.StringType

/**
  * Created by shirukai on 2018/9/12
  * 拆分列
  */
object SplitColTest {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession
      .builder()
      .appName(this.getClass.getSimpleName)
      .master("local")
      .getOrCreate()

    //從記憶體中建立DataFrame
    import spark.implicits._
    val df = Seq("Ming,20,15552211521", "hong,19,13287994007", "zhi,21,15552211523")
      .toDF("value")
    df.show()

    /**
      * +-------------------+
      * |              value|
      * +-------------------+
      * |Ming,20,15552211521|
      * |hong,19,13287994007|
      * | zhi,21,15552211523|
      * +-------------------+
      */

    import org.apache.spark.sql.functions._
    //方法1: 使用內建函式split,然後遍歷新增列
    val separator = ","
    lazy val first = df.first()

    val numAttrs = first.toString().split(separator).length
    val attrs = Array.tabulate(numAttrs)(n => "col_" + n)
    //按指定分隔符拆分value列,生成splitCols列
    var newDF = df.withColumn("splitCols", split($"value", separator))
    attrs.zipWithIndex.foreach(x => {
      newDF = newDF.withColumn(x._1, $"splitCols".getItem(x._2))
    })
    newDF.show()

    /**
      * +-------------------+--------------------+-----+-----+-----------+
      * |              value|           splitCols|col_0|col_1|      col_2|
      * +-------------------+--------------------+-----+-----+-----------+
      * |Ming,20,15552211521|[Ming, 20, 155522...| Ming|   20|15552211521|
      * |hong,19,13287994007|[hong, 19, 132879...| hong|   19|13287994007|
      * | zhi,21,15552211523|[zhi, 21, 1555221...|  zhi|   21|15552211523|
      * +-------------------+--------------------+-----+-----+-----------+
      */

    //方法2:使用udf函式建立多列,然後合併
    val attributes: Array[Attribute] = {
      val numAttrs = first.toString().split(separator).length
      //生成attributes
      Array.tabulate(numAttrs)(i => NumericAttribute.defaultAttr.withName("value" + "_" + i))
    }
    //建立多列資料
    val fieldCols = attributes.zipWithIndex.map(x => {
      val assembleFunc = udf {
        str: String =>
          str.split(separator)(x._2)
      }
      assembleFunc(df("value").cast(StringType)).as(x._1.name.get, x._1.toMetadata())
    })
    //合併資料
    df.select(col("*") +: fieldCols: _*).show()

    /**
      * +-------------------+-------+-------+-----------+
      * |              value|value_0|value_1|    value_2|
      * +-------------------+-------+-------+-----------+
      * |Ming,20,15552211521|   Ming|     20|15552211521|
      * |hong,19,13287994007|   hong|     19|13287994007|
      * | zhi,21,15552211523|    zhi|     21|15552211523|
      * +-------------------+-------+-------+-----------+
      */
  }
}

總結

目前只是琢磨出這麼幾種方式實現,如果有大佬們有其它高效便捷的實現方式,希望多多賜教。