1. 程式人生 > >使用 aspectj 對 spark 進行攔截

使用 aspectj 對 spark 進行攔截

文章目錄

背景

開源產品要想用的得心應手免不了要根據公司的業務/場景對其做一些改造,如果直接在原始碼的層面對其修改,當下可能用的很省心,但後期與社群程式碼的合併,版本的升級的時候就相當糟心了。

對於一個平臺來說,使用者對技術本身是不敏感的,所以我們需要增加一些限制來減少叢集的一些不可控情況,例如不斷的寫入新表/新資料卻不記得刪除,大量不按規範建立的表名等情況。與此同時應儘量讓技術對使用者透明,比如讓其無感知的訪問多種型別的資料庫。

下文以攔截 spark.sql()

方法為例,通過為 hive表的新增生命週期,自動切換 tidb 表,表許可權校驗等幾個小功能 來說明。

如何使用

程式碼

SparkSqlAspect.scala
為了便於理解以下程式碼會進行一些刪減

import org.aspectj.lang.ProceedingJoinPoint
import org.aspectj.lang.annotation.{Around, Aspect}
import org.slf4j.LoggerFactory
import org.apache.spark.sql.{Dataset, Row, SparkSession, TiContext}
import cn.tongdun.datacompute.parser._ import cn.tongdun.datacompute.parser.spark.SparkSQLHelper @Aspect class SparkSqlAspect { private val logger = LoggerFactory.getLogger(classOf[SparkSqlAspect]) private var tiContext: TiContext = null @Around("execution(public org.apache.spark.sql.Dataset<org.apache.spark.sql.Row> org.apache.spark.sql.SparkSession.sql(java.lang.String)) && args(sqlRaw)"
) def around(pjp: ProceedingJoinPoint, sqlRaw: String): Dataset[Row] = { //sparkSQLHelper 是我們基於 antlr4 增加了一些 sparksql 語法的支援,例如建表時需要指定 lifecycle 等 val sql = SparkSQLHelper.format(sqlRaw) val spark = pjp.getThis.asInstanceOf[SparkSession] var dataset: Dataset[Row] = spark.emptyDataFrame val statementData = SparkSQLHelper.getStatementData(sql) val statement = statementData.getStatement() //getType 方法用於獲取sql的型別 statementData.getType match { case StatementType.CREATE_TABLE => createMethod() case StatementType.CREATE_TABLE_AS_SELECT => createAsSelectMethod() case StatementType.SELECT => dataset = selectMethod(spark, inputSql, statement, pjp) case _ => dataset = pjp.proceed(pjp.getArgs).asInstanceOf[Dataset[Row]] } dateset } // 建表必須帶有 lifecycle 欄位,並對錶名進行校驗,將相關資訊註冊到元資料系統等操作 def createMethod(): Unit = { ... } // 約定 create table as select 生成的表都為中間表,必須以 tdl_ 開頭,lifecycle 固定為7天 def createAsSelectMethod(): Unit = { ... } // select 對多個數據庫源進行判定以及對許可權進行校驗,下面以tidb為例 def selectMethod(spark: SparkSession, inputSql: String, statement: Statement, pjp: ProceedingJoinPoint): Dataset[Row] = { val tableData = statement.asInstanceOf[TableData] //獲取所有需要訪問的源表 tableData.getInputTables.toArray.foreach { case t: TableSource => val databaseName = t.getDatabaseName val tableName = t.getTableName val fullTableName = databaseName + "." + tableName //所有tidb的庫都以tidb為字首 if (t.getDatabaseName.startsWith("tidb")) { //對tidb表許可權進行校驗 if(tableAuthCheck(...)){ //判斷tiContext是否初始化 if (tiContext == null) { tiContext = new TiContext(spark) } //對tidb表的表名進行替換,避免與現有的臨時表/中間表衝突 val replacedTable = "tdl_" + databaseName + "_" + tableName //加入tidb表資料來源 tiContext.tidbMapTable(databaseName, tableName) //註冊為臨時表 tiContext.getDataFrame(databaseName, tableName).createOrReplaceTempView(replacedTable) //將sql語句中的表名進行替換 sql = StringUtils.replace(sql, fullTableName, replacedTable) } else { throw new IllegalAccessError(fullTableName + "沒有訪問許可權") } } case _ => } pjp.proceed(Array(sql)).asInstanceOf[Dataset[Row]] } }

配置

pom.xml

<dependency>
    <groupId>org.aspectj</groupId>
	    <artifactId>aspectjrt</artifactId>
	    <version>1.9.1</version>
    </dependency>

    <dependency>
        <groupId>org.aspectj</groupId>
        <artifactId>aspectjweaver</artifactId>
	    <version>1.9.1</version>
    </dependency>
	
	<!--公司內部版本,用於支援spark2.3-->	
    <dependency>
       <groupId>com.pingcap.tispark</groupId>
       <artifactId>tispark-core</artifactId>
       <version>1.1-SNAPSHOT</version>
       <scope>provided</scope>
    </dependency>

resources/META-INF/AspectSql.aj

<?xml version="1.0" encoding="UTF-8" ?>
<aspectj>
    <aspects>
        <aspect name="cn.tongdun.aspectj.SparkSqlAspect"/>
    </aspects>
    <weaver options="-Xset:weaveJavaxPackages=true"/>
</aspectj>

spark-defaults.conf

spark.driver.extraClassPath /path/to/spark-aspectj.jar
spark.driver.extraJavaOptions -javaagent:/home/admin/aspectjweaver-1.9.1.jar

結語

通過上述的操作,在使用者呼叫 spark.sql(...) 時將會觸發相應的方法。hdfs/rdd/sparkSession/etc. 操作同理可實現。

不同公司面臨的真實場景各有不同,因此並沒有過多的實現細節,僅給需要的同學提供一些思路。