1. 程式人生 > >Spark SQL 之自定義刪除外部表

Spark SQL 之自定義刪除外部表

前言

Spark SQL 在刪除外部表時,本不能刪除外部表的資料的。本篇文章主要介紹如何修改Spark SQL 原始碼實現在刪除外部表的時候,可以帶額外選項來刪除外部表的資料。

本文的環境是我一直使用的 spark 2.4.3 版本。

1. 修改ANTLR4 語法檔案

修改 SqlBase.g4檔案中drop Table 相關語句,新增(WITH DATA)?, 修改完之後如下:

DROP TABLE (IF EXISTS)? tableIdentifier (WITH DATA)? PURGE?                   #dropTable

因為,刪除external表也不是必須的,所以新增WITH DATA 為可選項,跟 IF EXISTS類似。

2. 修改相關方法

2.1 修改SparkSqlParser.scala檔案

  /**
   * Create a [[DropTableCommand]] command.
   */
  override def visitDropTable(ctx: DropTableContext): LogicalPlan = withOrigin(ctx) {
    DropTableCommand(
      visitTableIdentifier(ctx.tableIdentifier),
      ctx.EXISTS != null,
      ctx.VIEW != null,
      ctx.PURGE != null,
      ctx.WITH() != null && ctx.DATA() != null)
  }

2.2 修改DropTableCommand.scala等相關檔案

首先修改建構函式,在最後一個引數後面新增withData方法,預設為false:

case class DropTableCommand(
    tableName: TableIdentifier,
    ifExists: Boolean,
    isView: Boolean,
    purge: Boolean,
    withData:Boolean = false // TODO 外部表是否需要刪除表資料
    ) extends RunnableCommand

DropTableCommand本質上其實是用了command設計模式,實際在執行時,會呼叫其run方法,修改 run 方法,如下:

 1 override def run(sparkSession: SparkSession): Seq[Row] = {
 2     val catalog = sparkSession.sessionState.catalog
 3     val isTempView = catalog.isTemporaryTable(tableName)
 4 
 5     if (!isTempView && catalog.tableExists(tableName)) {
 6       // If the command DROP VIEW is to drop a table or DROP TABLE is to drop a view
 7       // issue an exception.
 8       catalog.getTableMetadata(tableName).tableType match {
 9         case CatalogTableType.VIEW if !isView =>
10           throw new AnalysisException(
11             "Cannot drop a view with DROP TABLE. Please use DROP VIEW instead")
12         case o if o != CatalogTableType.VIEW && isView =>
13           throw new AnalysisException(
14             s"Cannot drop a table with DROP VIEW. Please use DROP TABLE instead")
15         case _ =>
16       }
17     }
18 
19     if (isTempView || catalog.tableExists(tableName)) {
20       try {
21         sparkSession.sharedState.cacheManager.uncacheQuery(
22           sparkSession.table(tableName), cascade = !isTempView)
23       } catch {
24         case NonFatal(e) => log.warn(e.toString, e)
25       }
26       catalog.refreshTable(tableName)
27       log.warn(s"withData:${withData}")
28       catalog.dropTable(tableName, ifExists, purge, withData)
29     } else if (ifExists) {
30       // no-op
31     } else {
32       throw new AnalysisException(s"Table or view not found: ${tableName.identifier}")
33     }
34     Seq.empty[Row]
35   }

在第 28 行,為 catalog物件的dropTable 新增 withData 引數。其中catalog是 org.apache.spark.sql.catalyst.catalog.SessionCatalog 的例項。其子類並沒有重寫其 dropTable 方法,故只需要修改其dropTable 方法即可。具體修改程式碼如下:

 1 /**
 2    * Drop a table.
 3    *
 4    * If a database is specified in `name`, this will drop the table from that database.
 5    * If no database is specified, this will first attempt to drop a temporary view with
 6    * the same name, then, if that does not exist, drop the table from the current database.
 7    */
 8   def dropTable(
 9       name: TableIdentifier,
10       ignoreIfNotExists: Boolean,
11       purge: Boolean,
12       withData:Boolean = false // 外部表是否需要在hdfs上刪除其對應的資料
13                ): Unit = synchronized {
14     val db = formatDatabaseName(name.database.getOrElse(currentDb))
15     val table = formatTableName(name.table)
16     if (db == globalTempViewManager.database) {
17       val viewExists = globalTempViewManager.remove(table)
18       if (!viewExists && !ignoreIfNotExists) {
19         throw new NoSuchTableException(globalTempViewManager.database, table)
20       }
21     } else {
22       if (name.database.isDefined || !tempViews.contains(table)) {
23         requireDbExists(db)
24         // When ignoreIfNotExists is false, no exception is issued when the table does not exist.
25         // Instead, log it as an error message.
26         if (tableExists(TableIdentifier(table, Option(db)))) {
27           logError(s"withData :${withData}")
28           externalCatalog.dropTable(db, table, ignoreIfNotExists = true,purge = purge, withData)
29         } else if (!ignoreIfNotExists) {
30           throw new NoSuchTableException(db = db, table = table)
31         }
32       } else {
33         tempViews.remove(table)
34       }
35     }
36   }

為防止在test中有很多的測試類在呼叫該方法,在編譯時報錯,新新增的withData給預設值,為false,保證該方法預設行為跟之前未修改前一致。

withData 引數繼續傳遞給 externalCatalog.dropTable 方法,其中,externalCatalog 是 org.apache.spark.sql.catalyst.catalog.ExternalCatalog 型別變數,ExternalCatalog 是一個trait,ExternalCatalog 實現類關係如下:

 首先修改ExternalCatalog 的dropTable 方法,如下:

def dropTable(
      db: String,
      table: String,
      ignoreIfNotExists: Boolean,
      purge: Boolean,
      withData:Boolean=false): Unit

引數載入最後,給預設值false。

org.apache.spark.sql.catalyst.catalog.ExternalCatalogWithListener 是一個包裝類,其內部在原來ExternalCatalog 的行為之外添加了監聽的行為。先修改這個包裝類的dropTable,如下:

override def dropTable(
      db: String,
      table: String,
      ignoreIfNotExists: Boolean,
      purge: Boolean,
      withData:Boolean): Unit = {
    postToAll(DropTablePreEvent(db, table))
    delegate.dropTable(db, table, ignoreIfNotExists, purge, withData)
    postToAll(DropTableEvent(db, table))
  }

其中,delegate 就是真正執行 dropTable操作的ExternalCatalog物件。

catlog有兩個來源,分別是 in-memory和 hive, in-memory的實現類是org.apache.spark.sql.catalyst.catalog.InMemoryCatalog,只需要新增 方法引數列表即可,在方法內部不需要做任何操作。

hive的實現類是 org.apache.spark.sql.hive.HiveExternalCatalog, 其dropTable 方法如下:

override def dropTable(
      db: String,
      table: String,
      ignoreIfNotExists: Boolean,
      purge: Boolean,
      withData:Boolean): Unit = withClient {
    requireDbExists(db)
    val tableLocation: URI = client.getTable(db,table).location
    client.dropTable(db, table, ignoreIfNotExists, purge)
    val path: Path = new Path(tableLocation)
    val fileSystem: FileSystem = FileSystem.get(hadoopConf)
    val fileExists: Boolean = fileSystem.exists(path)
    logWarning(s"withData:${withData}, ${path} exists : ${fileExists}")
    if (withData && fileExists) {
      fileSystem.delete(path, true)
    }
  }

 

3. 打包編譯

在生產環境編譯,編譯命令如下:

./dev/make-distribution.sh --name 2.6.0-cdh5.14.0 --tgz --mvn /opt/soft/apache-maven-3.6.1/bin/mvn  -Pyarn -Phadoop-2.6 -Phive -Phive-thriftserver -Dhadoop.version=2.6.0-cdh5.14.0 -X

注:由於編譯的是 cdh版本,一些jar包不在中央倉庫,在pom.xml檔案中,新增 cloudera maven 源:

<repository>
   <id>cloudera</id>
   <url>https://repository.cloudera.com/artifactory/cloudera-repos</url>
</repository>

為了加快 maven編譯的速度, 在 make-distribution.sh 檔案中,修改了編譯的並行度,在171行,把1C改為4C,具體修改如下:

BUILD_COMMAND=("$MVN" -T 4C clean package -DskipTests $@)

執行編譯結束之後,在專案的根目錄下,會有 spark-2.4.3-bin-2.6.0-cdh5.14.0.tgz 這個壓縮包,這就是binary 檔案,可以解壓到指定目錄進行相應配置了。

4. 配置spark

把原來叢集中spark 的配置以及相關jar包拷貝到新的spark相應目錄。

5. 測試

5.1 建立外部表

spark sql

spark-sql> use test;

spark-sql> create external table ext1 location '/user/hive/warehouse/test.db/ext1' as select * from person;

spark-sql> select * from ext1;

1 2 3
2 zhangsan 4
3 lisi 5
4 wangwu 6
5 rose 7
6 nose 8
7 info 9
8 test 10

檢視 hdfs 上對應目錄是否有資料

[root@xxx ~]# hdfs dfs -ls -R /user/hive/warehouse/test.db/ext1
-rwxr-xr-x 3 root supergroup 76 2020-02-27 15:58 /user/hive/warehouse/test.db/ext1/part-00000-aae237ac-4a0b-425c-a0f1-5d54d1e88957-c000

 

5.2 刪除表

spark-sql> drop table if exists ext1 with data;


5.3 驗證表元資料已刪除成功

spark-sql> show tables;
test    person    false

沒有ext表,說明已刪除成功。

5.4 驗證hdfs上資料已刪除成功

[root@xxx ~]# hdfs dfs -ls -R /user/hive/warehouse/test.db/ext1
ls: `/user/hive/warehouse/test.db/ext1': No such file or directory

該目錄已不存在,說明hdfs上資料已刪除成功。

總結

本文具體介紹瞭如何修改spark sql 的原始碼,在刪除external表時可選擇地刪除hdfs上的底層數