1. 程式人生 > >在自定義資料來源中使用sparksql(Spark2.0+)帶示例

在自定義資料來源中使用sparksql(Spark2.0+)帶示例

主要原理

spark sql 核心:
ParseInterface:
專門負責解析外部資料來源SQL的SqlParser。目前自帶的parser已經能滿足各種需求
RunnableCommand:
從反射的資料來源中例項化relation,然後註冊到temp table中。
Strategy:
將plan對映為物理計劃。
RelationProvider:
提供一個Relation。
BaseRelation:
可提供sql的一些邏輯操作(insert等)。

Spark SQL解析SQL流程如下:
1、Analyzer通過Rule解析,將UnresolvedRelation解析為JsonRelation。
2、通過Parse,Analyzer,Optimizer最後得到Relation(_*)
3、自定義的Strategy將LogicalPlan對映到物理計劃PhysicalRDD。
4、PhysicalRDD裡包含了如何查詢外部資料的規則。

簡單示例

使用自定義的relation,strategy,plan實現簡單的select,insert語句

程式碼:

Relation


case class TextRelation(sqlContext: SQLContext, schema: StructType, path: String) extends BaseRelation with InsertableRelation {
override def insert(data: DataFrame, overwrite: Boolean): Unit = {
if (!new File(path).exists())
data.rdd.map(_.mkString(",")).saveAsTextFile(path)
}
}

Source


class TextSource extends SchemaRelationProvider {
override def createRelation(sqlContext: SQLContext, parameters: Map[String, String], schema: StructType): BaseRelation = {
val path = parameters.getOrElse("path", "/home/wpy/tmp/external_sql/testSql")
TextRelation(sqlContext, schema, path)
}
}

sql入口


class Text4SQLContext(sc: SparkContext, sqlContext: SQLContext){
sqlContext.experimental.extraStrategies = new TextStrategies().TextStrategy :: Nil
def sql(sqlText: String): DataFrame = {
sqlContext.sql(sqlText)
}
}

Plan


case class LogicalText(output: Seq[Attribute], path: String) extends LogicalPlan {
override def children: Seq[LogicalPlan] = Nil
}


case class PhysicalText(output: Seq[Attribute], path: String) extends SparkPlan {
override protected def doExecute(): RDD[InternalRow] = {
sparkContext.textFile(path).map { row =>
val fields = row.split(",").map(UTF8String.fromString)
UnsafeProjection.create(schema)(InternalRow.fromSeq(fields))
}
}
override def children: Seq[SparkPlan] = Nil
}


case class TextExecuteCommand(cmd: RunnableCommand) extends SparkPlan {
override protected def doExecute(): RDD[InternalRow] = {
ExecutedCommandExec(cmd).execute()
}
override def output: Seq[Attribute] = cmd.output
override def children: Seq[SparkPlan] = Nil
}

Strategy


class TextStrategies extends QueryPlanner[SparkPlan] with PredicateHelper {
override def strategies: Seq[GenericStrategy[SparkPlan]] = TextStrategy :: Nil
object TextStrategy extends Strategy {
override def apply(plan: LogicalPlan): Seq[SparkPlan] = {
plan match {
case LogicalText(output, path) => PhysicalText(output, path) :: Nil
case LogicalRelation(TextRelation(_, _, path), output, _) => PhysicalText(output, path) :: Nil
case [email protected]([email protected](t: TextRelation, _, _), part, query, overwrite, false) if part.isEmpty =>
ExecutedCommandExec(InsertIntoDataSourceCommand(l, query, overwrite)) :: Nil
case _ => Nil
}
}
}
override protected def collectPlaceholders(plan: SparkPlan): Seq[(SparkPlan, LogicalPlan)] = {
plan.collect {
case [email protected](logicalPlan) => placeholder -> logicalPlan
}
// Nil
}
override protected def prunePlans(plans: Iterator[SparkPlan]): Iterator[SparkPlan] = {
plans
}
}

示例:


object TestTextSql {
val conf = new SparkConf().setMaster("local[*]").setAppName(getClass.getCanonicalName)
val ss = SparkSession.builder().config(conf).getOrCreate()
def main(args: Array[String]): Unit = {
val sqlContext = ss.sqlContext
val sparkContext = ss.sparkContext
val ts = new Text4SQLContext(sparkContext, sqlContext)
ts.sql(
"""create table test1(
|word string,
|num string
|) using external.datasource.TextSource
|options(
|path '/home/wpy/tmp/external_sql/test1'
|)
""".stripMargin)
ts.sql("select * from test1").show
print("=============================================\n")
ts.sql(
"""create table test2(
|word string,
|num string
|) using external.datasource.TextSource
|options(
|path '/home/wpy/tmp/external_sql/test2'
|)
""".stripMargin)
ts.sql(
"""
|insert into table test2
|select * from test1
""".stripMargin)
ts.sql("select * from test2 order by word").show
}
}

資料來源
test1表(文字檔案)
1,a
2,b
3,c

測試結果如下:

+—-+—+
|word|num|
+—-+—+
| 1| a|
| 2| b|
| 3| c|
+—-+—+

=============================================

+—-+—+
|word|num|
+—-+—+
| 1| a|
| 2| b|
| 3| c|
+—-+—+

由於這部分程式碼較為簡單,此處則不再贅述,理論上自定義資料來源可以實現在任何能讀取的資料上進行sql操作,其中還可以自定義filter方法從源頭對資料進行過濾,從而實現對大量資料的快速查詢,注意此處使用了spark自帶的RDD,使用其他spark不支援的資料來源時,需要自行定義合適的RDD(物理計劃)去獲取資料。程式碼能直接執行(spark版本 2.2.0-SNAPSHOT)。

相關推薦

定義資料來源使用sparksqlSpark2.0+示例

主要原理 spark sql 核心: ParseInterface: 專門負責解析外部資料來源SQL的SqlParser。目前自帶的parser已經能滿足各種需求 RunnableCommand: 從反射的資料來源中例項化rel

Java實現定義的註解處理器Annotation Processor

在之前的《簡單實現ButterKnife的註解功能》中,使用了執行時的註解實現了通過編寫註解繫結View與xml。由於執行時註解需要在Activity初始化中進行繫結操作,呼叫了大量反射相關程式碼,在介面複雜的情況下,使用這種方法就會嚴重影響Activity初始

SpringMVC十二定義異常處理器 HandlerExceptionResolver接口

pin org ota admin pack property framework ase exception 自定義異常處理器和系統異常處理器的提升版可以實現相同的功能,但是使用的方法不同,自定義異常處理器可以不用在配置文件中配置name多東西,只需要一個異常處理器就可以

Odoo 定義Widgets 基礎教程章節1

大家好, 此次,我們主要講解 Odoo 中的Widgets。 網上、論壇裡很多提及Widget的文章,但很少說Odoo自定義Widget 是如何實現的,這一直是大家所苦惱的地方。本章,將對Odoo中的Widget 進行基礎講解。 首先, Widget【掛件】產生的目的,是為了方便後端開發人員在不熟悉Ja

定義輪播圖banner圖

public class MyBannerActivity extends AppCompatActivity { private String[] picUrl = { "https://img.huxiucdn.com/article/c

定義View繪製流程面試專用

(自己整理的比較亂,但這麼說沒問題的!) 自定義view是幹嘛的呢? 當我們不滿足於Android提供的原生控制元件和佈局時,就應該考慮到自定義view。 自定義View分為兩大塊。  自定義控制元件  和  自定義容器 自定義View必須重寫兩個構造方法  第一個

隨筆 定義redis快取註解基於springboot

前言:            最近專案開發中需要使用redis快取為資料庫降壓。由於在構建系統時沒有使用快取,後期加入快取的時候不想對業務程式碼上新增,造成程式碼入侵,所有封裝了一套自定義快取類,處理快取。 開發環境:          win10+Intelli

【Android】_定義Adapter_學生註冊無資料庫

本文是完善前一篇學生註冊文章(https://blog.csdn.net/cungudafa/article/details/84780652)中:對`自定義ListView`增加`介面卡Adapter`實現對每個學生個體進行再編輯和刪除操作。

spring-security 個性化用戶認證流程——定義登錄頁面可配置

ron 進行 狀態 row 錯誤 this 力度 override all 1.定義自己的登錄頁面我們需要根據自己的業務系統構建自己的登錄頁面以及登錄成功、失敗處理在spring security提供給我的登錄頁面中,只有用戶名、密碼框,而自帶的登錄成功頁面是空白頁面(可以

Android 定義view --圓形百分比進度條

注:本文由於是在學習過程中寫的,存在大量問題(overdraw onDraw new物件),請讀者們不要被誤導!!解決辦法見後面的部落格。 起因 最近公司專案有需求需要用到輕量級圖表如下

一步一步你實現定義圓形進度條詳解

        每次看到別人做出炫酷的都會想,這個應該很難吧?這是心理上先入為主的就這麼認為了,其實實現很簡單,下面一步一步的詳細剖析自定義圓形進度條的步驟。 首先看效果圖: 篇幅有點長,耐心看完肯定get新技能。 看每一個檢視都包含了些什麼。 最

定義物件去重Set篇

當需要給集合去重時,實現的方式有很多。這裡介紹使用Set去重(兩種方式) 需求:給多個Student物件去重 public class Student { private int id; private String name; public Student() {

Echarts定義資料檢視樣式表格樣式

用過echarts的同胞們都知道,自帶的資料檢視是怎樣的。在真正做專案時,肯定需要我們自己來寫一個相對美觀的資料檢視。下面以表格樣式的資料檢視為例,來實現一個自己的資料檢視。 <!DOCTYPE html> <html> <head>

Android:實際運用Zxing整合二維碼掃描 及 定義掃碼介面demo原始碼

二維碼掃描,各大主流App必不可少的功能,而且google已將輪子替我們造好,直接拿來使用即可。以下是教學如何將Zxing開源庫整合到自己專案中,並且自定義掃碼介面,後期可根據自己的業務需求進行修改,最後補充了一點由此延伸的學習技能點。 一. 整合Zing

遮蔽瀏覽器右鍵選單功能,定義右鍵選單相容IE8

做了個自定義右鍵選單的功能,和獲取選中文字內容相關,一起記錄一下。 網上查閱嘗試了很多方式,選擇了一個較為簡潔的方式實現,程式碼很少,進行細節調整後發現存在IE8相容性問題,又查閱資料後進行了優化,目前可以完美相容IE8,但注意jQuery版本必須1.7+,我

網路程式設計---乙太網頭 IP頭 ARP包結構體封裝及常用定義網路轉換函式已測試

  以下的結構體在實際工程中經過了測試是可以的,自定義函式也經過了測試,到目前為止沒有出現問題。 // --------------------------------------------- // TCP/IP基礎結構體和通用函式 // ---------

destoon 定義新模組模組手機模組

前面我們講過了destoon 怎樣自定義一個新的模組,現在讓我談談怎樣定義一個手機模組換句話說就是新模組的網頁內容可以有手機版 準備工作 1、destoon下載地址:http://www.destoon.com/download/ 2、下載並安裝完成, 3、後臺使用者名稱:

定義View學習一圓形頭像

前言 系統為我們提供的控制元件是有限的,當我們想要在有限的螢幕上顯示更豐富多彩的內容,我們往往需要自定義控制元件。作為一個android初學者,我對android的自定義View也不是很熟悉。這段時間剛好無事,就先從我們平常使用的圓形頭像開始練起吧。 我們要

資料結構---定義單向連結串列Java實現

單向連結串列是指每一個節點記憶體在一個指向下一個節點的指標,java中就是節點存在指向下一個節點的物件引用 下面是Node節點類 public class Node { private Object object; private Node next;

Android使用xml定義軟鍵盤效果附原始碼

Android使用xml自定義軟鍵盤效果原理: 1,軟鍵盤其實是個控制元件,使用android.inputmethodserver.KeyboardView類定義。 2,主佈局中使用幀佈局,當我們需要顯示軟鍵盤時設定為可見,不需要時設定為不可見。 3,編寫