1. 程式人生 > >Zeppelin 原始碼分析-Interpreter 相關類(3)

Zeppelin 原始碼分析-Interpreter 相關類(3)

和 Interpreter 直接相關類有以下幾個:
Interpreter, InterpreterFactory, RemoteInterpreter, InterpreterGroup, InterpreterSetting。
由於篇幅有限,這裡分開介紹。

InterpreterFactor 類

InterpreterFactor 類是 Interpreter 介面的工廠類,各種 Interpreter (其實這個類創建出來的全部都是主程序中的代理)都是從這個類中創造出來的,換句話說,這個類專門為獨立的直譯器程序建立代理物件,這個類比較關鍵,因此下面一個一個函式介紹。
該類的所有函式如下:


除了畫圈部分,其餘函式均需要呼叫本類的其餘函式才能完成指定功能,因此先從三個獨立的函式說起。

creatRepl

creatRepl 是建立一個在主程序中建立直譯器的函式,但是鑑於目前所有的直譯器都在獨立的程序中,因此此函式目前來說其實沒有什麼意義。

connectToRemoteRepl

這個函式是為遠端的直譯器 JVM 建立本地代理物件,關鍵程式碼如下:

LazyOpenInterpreter intp = new LazyOpenInterpreter(
  new RemoteInterpreter(property, interpreterSessionKey, className, host, port, localRepoPath,
    connectTimeout, maxPoolSize, remoteInterpreterProcessListener, appEventListener,
    userName, isUserImpersonate, conf.getInt(ConfVars.ZEPPELIN_INTERPRETER_OUTPUT_LIMIT)));

createRemoteRepl

這個函式除了構建本地代理的時候不需要傳送 ip 地址和埠號外,其餘基本相似,另外還需要傳送一個 interpreterRunnerPath 引數,這個引數指定了啟動 Thirf 伺服器的指令碼的路徑,程式碼比較簡單,這裡不再贅述。

createInterpreterGroup

這個函式就是根據給定的 id 建立一個 InterpreterGroup ,是在 InterpreterSetting 中的 getInterpreterGroup 方法被唯一呼叫,新建之後立即加入到該類的 interpreterGroupRef 中,該屬性是一個 Map,key 為 InterpreterGroup 的 id,value 為 InterpreterGroup 物件,屬性的作用就是根據 id 獲取到真實的物件。這個函式在 2 中說過,這裡不再贅述。

createInterpretersForNote

這個函式的呼叫層次如下,可見也是一個相對基礎的函式

這個函式只有在 interpreterGroup 中找不到對應的代理直譯器的時候才會被呼叫,因此首先需要做的就是處理正要建立的代理直譯器所在的 InterpreterGroup 中的原來的直譯器物件正在被移除的情況:

InterpreterGroup interpreterGroup = interpreterSetting.getInterpreterGroup(user, noteId);
synchronized (interpreterGroup) {
  long interpreterRemovalWaitStart = System.nanoTime();
  long minTimeout = 10L * 1000 * 1000000; // 10 sec
  long interpreterRemovalWaitTimeout = Math.max(minTimeout,
    conf.getInt(ConfVars.ZEPPELIN_INTERPRETER_CONNECT_TIMEOUT) * 1000000L * 2);
  while (interpreterGroup.containsKey(interpreterSessionKey)) {
    if (System.nanoTime() - interpreterRemovalWaitStart > interpreterRemovalWaitTimeout) {
      throw new InterpreterException("Can not create interpreter");
    }
    try {
      interpreterGroup.wait(1000);
    } catch (InterruptedException e) {
      logger.debug(e.getMessage(), e);
    }
  }
}

然後需要做的就是將某組中所有的直譯器代理物件創建出來,具體的建立過程如下:之前說過 InterpreterSetting 物件其實就是記錄了網頁中一組直譯器的一些配置資訊,因此首先將這些配置資訊找出,然後使用這些配置資訊一次性建立該組的所有直譯器的代理,對於 interpreterInfos 中每個物件,首先判斷該直譯器是執行在主 JVM 還是執行在獨立的 JVM 中,當然目前為止 option.isRemote() 返回總是 true ,然後再判斷該直譯器是在本地的 JVM 還是在 另一臺機器的 JVM,如果在本地呼叫 createRemoteRepl 方法,否則呼叫 connectToRemoteRepl 方法,建立完代理物件之後,就需要為代理物件設定 InterpreterGroup,並在該代理物件對應的 InterpreterGroup 中加入該代理物件,其中每組直譯器的預設直譯器放在 list 的第一個位置。

List<InterpreterInfo> interpreterInfos = interpreterSetting.getInterpreterInfos();
Interpreter interpreter;
for (InterpreterInfo info : interpreterInfos) {
  if (option.isRemote()) {
    if (option.isExistingProcess()) {
      interpreter = connectToRemoteRepl(interpreterSessionKey, info.getClassName(), option.getHost(),option.getPort(), properties, interpreterSetting.getId(), user, option.isUserImpersonate);
    } else {
      interpreter = createRemoteRepl(path, interpreterSessionKey, info.getClassName(),
        properties, interpreterSetting.getId(), user, option.isUserImpersonate(), runner);
    }
  } else {
    interpreter = createRepl(interpreterSetting.getPath(), info.getClassName(), properties);
  }
  // 省略
}

createOrGetInterpreterList

createInterpretersForNote 只在 createOrGetInterpreterList 方法中會被呼叫, createOrGetInterpreterList 方法先根據 setting,user,noteId 建立或者獲取需要執行的直譯器所在的組,之後再檢查該組中是否有指定 interpreterSessionKey 的直譯器代理列表,如果沒有就建立,否則就直接返回,程式碼如下:

private List<Interpreter> createOrGetInterpreterList(String user, String noteId, InterpreterSetting setting) {
  InterpreterGroup interpreterGroup = setting.getInterpreterGroup(user, noteId);
  synchronized (interpreterGroup) {
    String interpreterSessionKey =
      interpreterSettingManager.getInterpreterSessionKey(user, noteId, setting);
    if (!interpreterGroup.containsKey(interpreterSessionKey)) {
      createInterpretersForNote(setting, user, noteId, interpreterSessionKey);
    }
    return interpreterGroup.get(interpreterSessionKey);
  }
}

getInterpreter(String user, String noteId, String replName)

這個方法才是 InterpreterFactor 類供外界使用的得到一個直譯器代理物件的方法。
這個方法首先根據 noteID(即你在網頁中建立的 NoteBook 的 ID),得到該 NoteBook 繫結的所有直譯器的資訊,其中,你建立該 NoteBook 時設定的預設直譯器放在 settings 的第一個位置:

List<InterpreterSetting> settings = interpreterSettingManager.getInterpreterSettings(noteId);

在 replName 是 null 或者 “” 的時候返回該 NoteBook 的預設直譯器,這在程式剛開始初始化的時候呼叫,就算你沒點進你建立的 NoteBook ,他也會呼叫這塊, getDefaultInterpreterSetting 方法其實就是返回 settings[0] 。

if (replName == null || replName.trim().length() == 0) {
  // get default settings (first available)
  // TODO(jl): Fix it in case of returning null
  InterpreterSetting defaultSettings = interpreterSettingManager
    .getDefaultInterpreterSetting(settings);
  return createOrGetInterpreterList(user, noteId, defaultSettings).get(0);
}

如果 replName 非空,即是 spark,spark.sql 等,則分情況分析:
* 首先解析類似於 spark.sql 這種 replName ,根據組名 group 和 settings 獲取需要的直譯器的設定資訊,getInterpreter(String user, String noteId, InterpreterSetting setting, String name) 方法就是根據 setting 中的 interpreterGroupRef 屬性獲取到對應 InterpreterGroup 中名字為 name 的代理直譯器物件,獲取到之後即返回直譯器代理物件。
* 如果輸入的是 spark,mysql 這種 replName ,首先找到預設直譯器組,然後如果這裡有的話直接返回,沒有的話繼續尋找,這時只能把 replName 引數當成直譯器組名,從 settings 中尋找第一個組名為 replName 的直譯器組,如果這裡面依然沒有的話,那麼就從所有的 setting 裡面搜尋。

if (replNameSplit.length == 2) {
  String group = null;
  String name = null;
  group = replNameSplit[0];
  name = replNameSplit[1];
  setting = getInterpreterSettingByGroup(settings, group);
  if (null != setting) {
    interpreter = getInterpreter(user, noteId, setting, name);
    if (null != interpreter) {
      return interpreter;
    }
  }
  throw new InterpreterException(replName + " interpreter not found");
} else {
  setting = interpreterSettingManager.getDefaultInterpreterSetting(settings);
  interpreter = getInterpreter(user, noteId, setting, replName);
  if (null != interpreter) {
    return interpreter;
  }
  setting = getInterpreterSettingByGroup(settings, replName);
  if (null != setting) {
    List<Interpreter> interpreters = createOrGetInterpreterList(user, noteId, setting);
    if (null != interpreters) {
      return interpreters.get(0);
    }
  }
  for (InterpreterSetting s : settings) {
    if (s.getGroup().equals(replName)) {
      List<Interpreter> interpreters = createOrGetInterpreterList(user, noteId, s);
      if (null != interpreters) {
        return interpreters.get(0);
      }
    }
  }
}

這裡舉個例子說明第二種情況,假設我建了兩個直譯器,組名都是 spark ,id 分別為 2CPNNU13C 和 2CPNNU13D,然後都是以共享程序啟動:


然後我新建一個 NoteBook(ID 為 2CPNNU13E),網頁中選的預設直譯器是 spark1,繫結的直譯器是所有,那麼程式剛啟動時 id 分別為 2CPNNU13D 的 InterpreterSetting 物件中的 interpreterGroupRef 屬性中就會存有這個 id 為 2CPNNU13E 的 NoteBook 物件對應的直譯器組物件,這個直譯器組物件的 id 為 2CPNNU13E:share_progress ,只有一個元祖,key為 share_progress,value 就是 spark 的五個代理物件。這個函式的第一句話 interpreterSettingManager.getInterpreterSettings(noteId) 返回的是所有直譯器設定,其中第一項就是 id 分別為 2CPNNU13D 的 InterpreterSetting 物件。當我在一個 ParaGraph 輸入 %spark1 的時候,由於預設直譯器就是 spark1,因此會進入第一個分支,而輸入 %spark 的時候其實會進入第三個分支因為 settings 中尋找第一個組名為 spark 的直譯器組其實返回的是 id 分別為 2CPNNU13D 的 InterpreterSetting 物件。

相關推薦

Zeppelin 原始碼分析-Interpreter 相關3

和 Interpreter 直接相關類有以下幾個: Interpreter, InterpreterFactory, RemoteInterpreter, InterpreterGroup, InterpreterSetting。 由於篇幅有限,這裡分開介紹

原始碼分析三、Map3-TreeMap

一、概述 通過IDEA看下TreeMap的繼承關係,繼承抽象父類AbstractMap,實現了NavigableMap介面,SortedMap介面,TreeMap是一種有序的Map,從其實現的介面就能看出來。 那麼,首先來看下實現的兩類介面: SortedMap:實

Memcached原始碼分析之訊息迴應3

文章列表: 《Memcached原始碼分析 - Memcached原始碼分析之總結篇(8)》 前言 上一章《Memcached原始碼分析 - Memcached原始碼分析之命令解析(2)》,我們花了很大的力氣去講解Memcached如何從客戶端讀取命令,並且

雲客Drupal8原始碼分析之實體Entity配置實體基

配置實體基類是系統定義的一個用於配置實體的抽象基類,繼承自實體基類,完成了配置實體的大部分通用功能,具體的配置實體往往會繼承它,比如使用者角色實體,這樣寫少量程式碼即可,類定義如下: Drupal\Core\Config\Entity\ConfigEntityBase 實

雲客Drupal8原始碼分析之實體Entity內容實體基

原始碼分析重點在於在自己的大腦中重現開發者的思維過程,內容實體基類是drupal中很大的一個類,她要處理眾多的問題,內容實體的大多數功能都集中在這裡,開發者有許多的考慮,要弄清楚她的所有細節,學習者可能會覺得有些困難,這時需要明白任何複雜龐大的事物都是一步步累積發展起來的,

南大算法設計與分析課程OJ答案3

完美 語言 偶數 使用 課程 nbsp problems AS btn 問題 A: 動態中位數問題 時間限制: 1 Sec 內存限制: 8 MB提交: 866 解決: 102提交 狀態 算法問答 題目描述 輸入一組整數a1, a2, …, an ,每輸入一

Spark原始碼分析之Spark Shell

https://www.cnblogs.com/xing901022/p/6412619.html 文中分析的spark版本為apache的spark-2.1.0-bin-hadoop2.7。 bin目錄結構: -rwxr-xr-x. 1 bigdata bigdata 1089 Dec

資料結構與演算法分析:線性結構3

堆疊 1.計算機如何進行表示式求值     算術表示式: 由兩類物件構成:運算數,運算子號 不同運算子號優先順序不同    ①中綴表示式:把運算子號放在兩個運算數之間:a+b*c-d/e     &

JAVA——特殊(1)——String3——字串比較(方法

(二)字串比較 (1)比較字串是否相等——區分大小寫——返回值為Boolean型 public boolean equals(Object anObject) //anObject——傳入需要被比較的物件 //此方法區分大小寫 (2)比較字串是否相

雲客Drupal8原始碼分析之外掛系統

以下內容僅是一個預覽,完整內容請見文尾: 至此本系列對外掛的介紹全部完成,涵蓋了系統外掛的所有知識 全文目錄(全文10476字): 例項化外掛 外掛對映Plugin mapping 外掛上下文  

寫爬蟲所用到的工具---3[檔案]

package Tool; import java.io.*; import java.util.ArrayList; import java.util.List; /** * this is a class that can operation file

elasticsearch原始碼分析之分片分配

分片 什麼是分片 分片是把索引資料切分成多個小的索引塊,這些小的索引塊能夠分發到同一個叢集中的不同節點。在檢索時,檢索結果是該索引每個分片上檢索結果的合併。類似於資料庫的分庫分表。 為什麼分片 1、這樣可以提高讀寫效能,實現負載均衡。 2、副本容易

elasticsearch原始碼分析之索引操作

上節介紹了es的node啟動如何建立叢集服務的過程,這節在其基礎之上介紹es索引的基本操作功能(create、exist、delete),用來進一步細化es叢集是如果工作的。 客戶端部分的操作就不予介紹了,詳細可以參照elasticsearch原始碼分析之客戶

elasticsearch原始碼分析之服務端

上篇部落格說明了客戶端的情況,現在繼續分析服務端都幹了些啥,es是怎麼把資料插進去的,此處以transport的bulk為入口來探究,對於單個document的傳送就忽略了。 一、服務端接收 1.1接收訊息 在客戶端分析中已經提到,netty中通訊的處理類是Mes

SpringMVC原始碼分析--容器初始化FrameworkServlet

一下SpringMVC配置檔案的地址contextConfigLocation的配置屬性,然後其呼叫的子類FrameworkServlet的initServletBean方法。 其實FrameworkServlet是springMVC初始化IOC容器的核心,通過讀取配置的c

圖形介面程式設計 佈局容器3

5 表格佈局 無論使用錨定佈局還是流式佈局,都無法達到複雜佈局的效果,很多時候我們不得不使用絕對佈局,忍受絕對佈局帶來的麻煩(要麼容器尺寸一變化,介面就變得一團糟;要麼在容器的Resize事件中寫複雜的佈局程式碼)。其實.net Framework中

springMVC原始碼分析--容器初始化ContextLoaderListener

在spring Web中,需要初始化IOC容器,用於存放我們注入的各種物件。當tomcat啟動時首先會初始化一個web對應的IOC容器,用於初始化和注入各種我們在web執行過程中需要的物件。當tomcat啟動的時候是如何初始化IOC容器的,我們先看一下在web.xml中經常看

elasticsearch原始碼分析之啟動過程

最近開始廣泛的使用elasticsearch,也開始寫一些java程式碼了,為了提高java程式碼能力,也為了更加深入一點了解elasticsearch的內部運作機制,所以開始看一些elasticsearch的原始碼了。對於這種廣受追捧的開源專案,細細品讀一定會受益匪淺,

SpringMVC原始碼分析--容器初始化DispatcherServlet

上一篇部落格SpringMVC原始碼分析--容器初始化(四)FrameworkServlet我們已經瞭解到了SpringMVC容器的初始化,SpringMVC對容器初始化後會進行一系列的其他屬性的初始化操作,在SpringMVC初始化完成之後會呼叫onRefresh(wac

springMVC原始碼分析--攔截器HandlerExecutionChain

上一篇部落格springMVC原始碼分析--HandlerInterceptor攔截器呼叫過程(二)中我們介紹了HandlerInterceptor的執行呼叫地方,最終HandlerInterceptor呼叫的地方是在HandlerExecutionChain中,接下來我們就