1. 程式人生 > >【Flink原理和應用】:CliFrontend的原始碼分析

【Flink原理和應用】:CliFrontend的原始碼分析

1. 前言

Flink的原始碼體系比較龐大,一頭扎進去,很容易一頭霧水,不知道從哪部分程式碼看起。但是如果結合我們的業務開發,有針對性地去跟進原始碼去發現問題,理解原始碼裡的執行細節,效果會更好。

筆者在近期的Flink開發過程中,因為產品的原因,只允許部署Flink standalone模式,出於效能考慮,很有必要對其效能做下測試。

Flink的standalone模式的部署方式很簡單。只需要設定下基本的全域性配置引數就行。比如jobmanager.heap.size, taskmanager.heap.size, parallelism.default, taskmanager.numberOfTaskSlots等這些常用引數,就可以執行./bin/start-cluster.sh來啟動Flink的standalone模式。

但是當我執行:

./bin/flink run -c chx.demo.FirstDemo /demo/chx.jar 

來提交我的任務時,發現問題了。當批處理的資料量達2000W時,一切還挺正常,但是當批處理的資料量達3800W時,報出了異常:

Caused by: akka.pattern.AskTimeoutException: Ask timed out on
>>>> [Actor[akka://flink/user/$a#183984057]] after [10000ms]

碰到這種報錯,首先Akka的機制我們是有必要熟悉下的,但是本文不重點講解Akka的原理和用法,不過我後續文章想對akka做具體的分析和總結。

本文重點講述我們通過./bin/flink run提交任務時,程式到底做了什麼事情。對背後程式碼的執行邏輯做一番分析。

2. 原始碼分析

2.1. 執行入口main函式分析

提交任務時執行的入口是org.apache.flink.client.cli包裡CliFrontend的main函式。main函式的程式碼,我這裡先貼出來:

	public static void main(final String[] args) {
		// 獲取系統的一些引數,並且日誌打印出來
		EnvironmentInformation.logEnvironmentInfo(LOG, "Command Line Client"
, args); // 1. 找到配置路徑【FLINK_CONF_DIR】 final String configurationDirectory = getConfigurationDirectoryFromEnv(); // 2. 載入全域性配置 final Configuration configuration = GlobalConfiguration.loadConfiguration(configurationDirectory); // 3.載入自定義命令列 final List<CustomCommandLine<?>> customCommandLines = loadCustomCommandLines( configuration, configurationDirectory); try { // 4. 初始化命令列客戶端物件 final CliFrontend cli = new CliFrontend( configuration, customCommandLines); // 5. 安全工具的安裝 SecurityUtils.install(new SecurityConfiguration(cli.configuration)); // 6.執行主體邏輯 int retCode = SecurityUtils.getInstalledContext() .runSecured(() -> cli.parseParameters(args)); System.exit(retCode); } catch (Throwable t) { final Throwable strippedThrowable = ExceptionUtils.stripException(t, UndeclaredThrowableException.class); LOG.error("Fatal error while running command line interface.", strippedThrowable); strippedThrowable.printStackTrace(); System.exit(31); }

main入口函式的執行邏輯主體是在第6步(看程式碼註釋)。然後我們繼續看其呼叫的parseParameters函式方法。

2.2. 引數解析邏輯分析

/**
 * 解析命令欄引數並且啟動請求響應
 *
 * @param args 客戶端命令列引數.
 * @return The return code of the program
 */
public int parseParameters(String[] args) {

	// 1. 檢查引數,如果沒有引數,那麼就列印help提示操作
	if (args.length < 1) {
		CliFrontendParser.printHelp(customCommandLines);
		System.out.println("Please specify an action.");
		return 1;
	}

	// 2. 執行動作,是第一個引數,比如run,cancel等等
	String action = args[0];

	// 3. 將執行動作從引數列表裡移除
	final String[] params = Arrays.copyOfRange(args, 1, args.length);

	try {
		// 執行動作
		switch (action) {
			// 執行動作,執行任務
			case ACTION_RUN:
				run(params);
				return 0;
			// 列出所有的任務
			case ACTION_LIST:
				list(params);
				return 0;
			// 得到某任務的資訊
			case ACTION_INFO:
				info(params);
				return 0;
			// 取消任務
			case ACTION_CANCEL:
				cancel(params);
				return 0;
			// 停止任務
			case ACTION_STOP:
				stop(params);
				return 0;
			// 儲存點操作
			case ACTION_SAVEPOINT:
				savepoint(params);
				return 0;
			// 修改操作
			case ACTION_MODIFY:
				modify(params);
				return 0;
			case "-h":
			case "--help":
				CliFrontendParser.printHelp(customCommandLines);
				return 0;
			case "-v":
			case "--version":
				String version = EnvironmentInformation.getVersion();
				String commitID = EnvironmentInformation.getRevisionInformation().commitId;
				System.out.print("Version: " + version);
				System.out.println(commitID.equals(EnvironmentInformation.UNKNOWN) ? "" : ", Commit ID: " + commitID);
				return 0;
			default:
				System.out.printf("\"%s\" is not a valid action.\n", action);
				System.out.println();
				System.out.println("Valid actions are \"run\", \"list\", \"info\", \"savepoint\", \"stop\", or \"cancel\".");
				System.out.println();
				System.out.println("Specify the version option (-v or --version) to print Flink version.");
				System.out.println();
				System.out.println("Specify the help option (-h or --help) to get help on the command.");
				return 1;
		}
	} catch (CliArgsException ce) {
		return handleArgException(ce);
	} catch (ProgramParametrizationException ppe) {
		return handleParametrizationException(ppe);
	} catch (ProgramMissingJobException pmje) {
		return handleMissingJobException();
	} catch (Exception e) {
		return handleError(e);
	}
}

其實parseParameters函式程式碼邏輯也不復雜,主要是獲取了客戶端命令列引數,並且根據第一個引數(也就是執行任務的型別)來做出相應的邏輯處理。

2. 3. 提交任務(run)邏輯分析

因為我們主要是提交任務,也即執行的./bin/flink run操作。所以我們接著重點分析run(params)函式:

/**
	 * 執行任務操作
	 *
	 * @param args run操作的命令列引數.
	 */
	protected void run(String[] args) throws Exception {
		LOG.info("Running 'run' command.");
       // 1. 命令列引數解析
		final Options commandOptions = CliFrontendParser.getRunCommandOptions();

		final Options commandLineOptions = CliFrontendParser.mergeOptions(commandOptions, customCommandLineOptions);

		final CommandLine commandLine = CliFrontendParser.parse(commandLineOptions, args, true);

		final RunOptions runOptions = new RunOptions(commandLine);

		// 2. 判斷是否是help操作
		if (runOptions.isPrintHelp()) {
			CliFrontendParser.printHelpForRun(customCommandLines);
			return;
		}

		// 3. 一定要指定run的jar包目錄,也就是我們任務的邏輯程式碼jar包
		if (runOptions.getJarFilePath() == null) {
			throw new CliArgsException("The program JAR file was not specified.");
		}
       
       // 4. 初始化一個打包程式例項
		final PackagedProgram program;
		try {
			LOG.info("Building program from JAR file");
			program = buildProgram(runOptions);
		}
		catch (FileNotFoundException e) {
			throw new CliArgsException("Could not build the program from JAR file.", e);
		}

		final CustomCommandLine<?> customCommandLine = getActiveCustomCommandLine(commandLine);

		try {
			// 5. 執行提交的任務的核心邏輯
			runProgram(customCommandLine, commandLine, runOptions, program);
		} finally {
			program.deleteExtractedLibraries();
		}
	}

這裡我們先重點看下第4步的邏輯,即初始化一個打包程式例項:

program = buildProgram(runOptions);

2.4. 任務執行程式初始化

分析下buildProgram函式邏輯:

PackagedProgram buildProgram(ProgramOptions options) throws FileNotFoundException, ProgramInvocationException {
      // 1. 執行引數:執行引數,任務邏輯jar包路徑,執行classPath路徑
		String[] programArgs = options.getProgramArgs();
		String jarFilePath = options.getJarFilePath();
		List<URL> classpaths = options.getClasspaths();
      
      // 2. 判斷執行邏輯的jar包路徑不能為空 
		if (jarFilePath == null) {
			throw new IllegalArgumentException("The program JAR file was not specified.");
		}
 
		File jarFile = new File(jarFilePath);

		// 3.檢查jar包檔案是否存在以及檔案型別是否正確。
		if (!jarFile.exists()) {
			throw new FileNotFoundException("JAR file does not exist: " + jarFile);
		}
		else if (!jarFile.isFile()) {
			throw new FileNotFoundException("JAR file is not a file: " + jarFile);
		}

		// 4. 任務執行邏輯jar包的入口函式
		String entryPointClass = options.getEntryPointClassName();
       // 5. 初始化PackagedProgram物件。我們重點分析指定了入口函式的情況。
		PackagedProgram program = entryPointClass == null ?
				new PackagedProgram(jarFile, classpaths, programArgs) :
				new PackagedProgram(jarFile, classpaths, entryPointClass, programArgs);

		// 設定儲存點
		program.setSavepointRestoreSettings(options.getSavepointRestoreSettings());

		return program;
	}
	/**
	 * 建立使用給定引數包裝jar檔案中定義的計劃的例項。為了生成計劃,使用className引數中定義的類。
	 * @param jarFile
	 *        包含任務計劃的jar包檔案.
	 * @param classpaths
	 *        程式執行需要的其他classpath URLs.
	 * @param entryPointClassName
	 *        任務邏輯jar包的執行入口函式
	 * @param args
	 *        可選引數
	 * @throws ProgramInvocationException
	 *         This invocation is thrown if the Program can't be properly loaded. Causes
	 *         may be a missing / wrong class or manifest files.
	 */
	public PackagedProgram(File jarFile, List<URL> classpaths, @Nullable String entryPointClassName, String... args) throws ProgramInvocationException {
	    // 1. 任務執行jar包的一些檢查。比如路徑是否存在,jar格式對不對等。
		if (jarFile == null) {
			throw new IllegalArgumentException("The jar file must not be null.");
		}

		URL jarFileUrl;
		try {
			jarFileUrl = jarFile.getAbsoluteFile().toURI().toURL();
		} catch (MalformedURLException e1) {
			throw new IllegalArgumentException("The jar file path is invalid.");
		}
		checkJarFile(jarFileUrl);

		this.jarFile = jarFileUrl;
		this.args = args == null ? new String[0] : args;

		// 2.如果沒有指定任務jar包的入口函式。我們將掃描,找出一個入口main函式
		if (entryPointClassName == null) {
			entryPointClassName = getEntryPointClassNameFromJar(jarFileUrl);
		}

		// 3. 現在我們有了一個入口點,我們可以提取巢狀的jar檔案
		this.extractedTempLibraries = extractContainedLibraries(jarFileUrl);
		this.classpaths = classpaths;
		this.userCodeClassLoader = JobWithJars.buildUserCodeClassLoader(getAllLibraries(), classpaths, getClass().getClassLoader());

		// 4. 載入執行入口main函式
		this.mainClass = loadMainClass(entryPointClassName, userCodeClassLoader);

		// 如果入口點是Program,則例項化類並獲得計劃
		if (Program.class.isAssignableFrom(this.mainClass)) {
			Program prg = null;
			try {
				prg = InstantiationUtil.instantiate(this.mainClass.asSubclass(Program.class), Program.class);
			} catch (Exception e) {
				// validate that the class has a main method at least.
				// the main method possibly instantiates the program properly
				if (!hasMainMethod(mainClass)) {
					throw new ProgramInvocationException("The given program class implements the " +
							Program.class.getName() + " interface, but cannot be instantiated. " +
							"It also declares no main(String[]) method as alternative entry point", e);
				}
			} catch (Throwable t) {
				throw new ProgramInvocationException("Error while trying to instantiate program class.", t);
			}
			this.program = prg;
		} else if (hasMainMethod(mainClass)) {
			this.program = null;
		} else {
			throw new ProgramInvocationException("The given program class neither has a main(String[]) method, nor does it implement the " +
					Program.class.getName() + " interface.");
		}
	}

2.5. 程式執行

任務程式初始化之後,就要真正開始執行了。回到【3. 提交任務(run)邏輯分析】程式碼的第5步:

runProgram(customCommandLine, commandLine, runOptions, program);

執行任務的主體邏輯如下:

/**
 * 執行任務
	 */
private <T> void runProgram(
		CustomCommandLine<T> customCommandLine,
		CommandLine commandLine,
		RunOptions runOptions,
		PackagedProgram program) throws ProgramInvocationException, FlinkException {
    // 1. 叢集環境的相關描述
	final ClusterDescriptor<T> clusterDescriptor = customCommandLine.createClusterDescriptor(commandLine);

	try {
	 // 2. 叢集Id
		final T clusterId = customCommandLine.getClusterId(commandLine);

		final ClusterClient<T> client;
		
		// 3.如果叢集以作業模式啟動並datached,則直接部署作業
		if (clusterId == null && runOptions.getDetachedMode()) {        
		   // 3.1 獲取並行度
			int parallelism = runOptions.getParallelism() == -1 ? defaultParallelism : runOptions.getParallelism();

			// 3.2.建立任務圖
			final JobGraph jobGraph = PackagedProgramUtils.createJobGraph(program, configuration, parallelism);
			// 3.3. 叢集特定的引數
			final ClusterSpecification clusterSpecification = customCommandLine.getClusterSpecification(commandLine);

			// 3.4. 裝載任務
			client = clusterDescriptor.deployJobCluster(
				clusterSpecification,
				jobGraph,
				runOptions.getDetachedMode());

			logAndSysout("Job has been submitted with JobID " + jobGraph.getJobID());

			try {
			  // 3.5. 把客戶單關閉
				client.shutdown();
			} catch (Exception e) {
				LOG.info("Could not properly shut down the client.", e);
			}
		} else {
		 // 4.其他情況時,應該怎麼啟動任務的執行呢。
			final Thread shutdownHook;
			if (clusterId != null) {
				client = clusterDescriptor.retrieve(clusterId);
				shutdownHook = null;
			} else {
				// 在作業模式下,我們還必須部署會話叢集,因為作業可能由多個部分組成(例如,當使用.時)
				final ClusterSpecification clusterSpecification = customCommandLine.getClusterSpecification(commandLine);
				client = clusterDescriptor.deploySessionCluster(clusterSpecification);
				// if not running in detached mode, add a shutdown hook to shut down cluster if client exits
				// there's a race-condition here if cli is killed before shutdown hook is installed
				if (!runOptions.getDetachedMode() && runOptions.isShutdownOnAttachedExit()) {
					shutdownHook = ShutdownHookUtil.addShutdownHook(client::shutDownCluster, client.getClass().getSimpleName(), LOG);
				} else {
					shutdownHook = null;
				}
			}

			try {
				client.setPrintStatusDuringExecution(runOptions.getStdoutLogging());
				client.setDetached(runOptions.getDetachedMode());
				LOG.debug("Client slots is set to {}", client.getMaxSlots());

				LOG.debug("{}", runOptions.getSavepointRestoreSettings());

				int userParallelism = runOptions.getParallelism();
				LOG.debug("User parallelism is set to {}", userParallelism);
				if (client.getMaxSlots() != MAX_SLOTS_UNKNOWN && userParallelism == -1) {
					logAndSysout("Using the parallelism provided by the remote cluster ("
						+ client.getMaxSlots() + "). "
						+ "To use another parallelism, set it at the ./bin/flink client.");
					userParallelism = client.getMaxSlots();
				} else if (ExecutionConfig.PARALLELISM_DEFAULT == userParallelism) {
					userParallelism = defaultParallelism;
				}
              // 繼續看,執行邏輯部分。
				executeProgram(program, client, userParallelism);
			} finally {
				if (clusterId == null && !client.isDetached()) {
					// terminate the cluster only if we have started it before and if it's not detached
					try {
						client
            
           

相關推薦

Flink原理應用CliFrontend原始碼分析

1. 前言 Flink的原始碼體系比較龐大,一頭扎進去,很容易一頭霧水,不知道從哪部分程式碼看起。但是如果結合我們的業務開發,有針對性地去跟進原始碼去發現問題,理解原始碼裡的執行細節,效果會更好。 筆者在近期的Flink開發過程中,因為產品的原因,只允許部署Flink stand

Flink原理應用Flink高效的記憶體管理

1. 前言 如今,大資料領域的開源框架(Hadoop,Spark,Storm)都使用的 JVM,當然也包括 Flink。基於 JVM 的資料分析引擎都需要面對將大量資料存到記憶體中,這就不得不面對 JVM 存在的幾個問題: Java 物件儲存密度低。一個只包含 boolea

Pythonwith及上下文管理器的原理應用

這篇部落格主要總結with用法,自定義上下文管理器,以及__exit__的引數相關內容。 with 語句是 Pyhton 提供的一種簡化語法,適用於對資源進行訪問的場合,確保不管使用過程中是否發生異常都會執行必要的“清理”操作,釋放資源,with 語句主要是為了簡化程式碼操

BZOJ 2023 [Usaco2005 Nov]Ant Counting 數螞蟻dp前綴優化

答案 zoj style online nsf side cal mil www. 題目鏈接:http://www.lydsy.com/JudgeOnline/problem.php?id=2023 題意:   有n個家族,共m只螞蟻(n <= 1000, m <

BZOJ 1600 [Usaco2008 Oct]建造柵欄dp前綴優化

define cnblogs sigma set lin clas == problem swe 題目鏈接:http://www.lydsy.com/JudgeOnline/problem.php?id=1600 題意:   給你一個長度為n的木板,讓你把這個木板切割成四段

JVM第六篇Java-類鎖物件鎖

1.類鎖和物件鎖的定義 物件鎖的定義 是針對一個物件的,它只在該物件的某個記憶體位置宣告一個標誌位標識該物件是否擁有鎖,所以它只會鎖住當前的物件。一般一個物件鎖是對一個非靜態成員變數進行syncronized修飾,或者對一個非靜態方法進行syncronized修飾。對於物件鎖,不

資料科學迄今最全面的資料科學應用總結16個分析學科及落地應用

資料科學,資料探勘,機器學習,統計學,運籌學等方面有什麼不同? 在這裡,我比較幾個重疊的分析學科,來解釋差異和共同點。除了歷史原因,有時候除了別的東西外別無其他。有時候,差異是真實而微妙的。我還提供了典型的職位,分析型別以及傳統上與每個學科相關的行業。帶下劃線的域是主要的子域。 首先,我們從描述資料科學這

搞定MySQL資料庫MySQL索引實現原理

本文轉發自:https://blog.csdn.net/a724888/article/details/78366383 本文主要轉載自幾篇關於MySQL資料庫索引相關的文章。可以相互參考著看。 目錄 1、MySQL索引型別 1.1、簡介 1.2、語句 1.3、索引型別

搞定MySQL資料庫深入淺出 MySQL InnoDB

轉發自:https://blog.csdn.net/a724888/article/details/78765898 本文目錄: 1、資料庫的定義 1.1、資料庫和例項 1.2、MySQL 的架構 1.3、資料的儲存 1.4、如何儲存表 1.5、如何儲存記錄 1.6、

資料結構演算法8 線性表線性表的順序儲存結構

線性表的順序儲存結構 線性表有兩種物理儲存結構: 順序儲存結構 和 鏈式儲存結構。 物理上的儲存方式事實上就是在記憶體中找個初始地址,然後通過佔位的形式,把一定的記憶體空間給佔了,然後把相同資料型別的資料元素依次放在這塊空地中。 順序儲存結構:指的是用一段地址連續的儲

JVM原理與優化JVM記憶體設定多大合適?XmxXmn如何設定?

問題: 新上線一個java服務,或者是RPC或者是WEB站點, 記憶體的設定該怎麼設定呢?設定成多大比較合適,既不浪費記憶體,又不影響效能呢? 分析: 依據的原則是根據Java Performance裡面的推薦公式來進行設定。 具體來講: Java整

session理解與總結session原理應用、與cookie區別

session原理 session也是一種記錄瀏覽器狀態的機制,但與cookie不同的是,session是儲存在伺服器中。 由於http是無狀態協議,當伺服器儲存了多個使用者的session資料時,如何確認http請求對應伺服器上哪一條session,相當關鍵。這也是session原理的核心內容。 解決方法

MySQL全面瓦解12連線查詢的原理應用

概述 MySQL最強大的功能之一就是能在資料檢索的執行中連線(join)表。大部分的單表資料查詢並不能滿足我們的需求,這時候我們就需要連線一個或者多個表,並通過一些條件過濾篩選出我們需要的資料。 瞭解MySQL連線查詢之前我們先來理解下笛卡爾積的原理。 資料準備 依舊使用上節的表資料(包含classes 班級

R語言統計分析技術研究——嶺回歸技術的原理應用

gts 根據 誤差 med 分享 jce not -c rt4 嶺回歸技術的原理和應用

Python開發第十一篇JavaScript

靜態函數 發生 編寫 小寫 nsh div 區分 所有 是個 JavaScript是一門編程語言,瀏覽器內置了JavaScript語言的解釋器,所以在瀏覽器上按照JavaScript語言的規則編寫相應代碼之,瀏覽器可以解釋並做出相應的處理。 一、如何編寫 1、JavaScr

Python開發第十四篇Web框架本質

中一 用戶 contain get pattern app sta doc connect Web框架本質 眾所周知,對於所有的Web應用,本質上其實就是一個socket服務端,用戶的瀏覽器其實就是一個socket客戶端。 1 2 3 4 5 6 7 8 9

進擊的Python第十六章Web前端基礎之jQuery

name cat 隱藏 function wid get val 綁定 des 進擊的Python【第十六章】:Web前端基礎之jQuery 一、什麽是 jQuery ? jQuery是一個JavaScript函數庫。 jQuery是一個輕量級的"寫的少,做的多"的Java

圖解Python 第十二篇Django 基礎

aps 不同的 mage 清空 font 一個 取數 ccf pos 本節內容一覽表: Django基礎:http://www.ziqiangxuetang.com/django/django-tutorial.html 一、Django簡介 Django文

Python之路第十四篇Python的內置函數

blank function ocs 函數 alt 分享 詳細 png bsp Python中自帶了一些內置函數,如下圖所示 詳細說明可以戳這裏Python之路【第十四篇】:Python的內置函數

微信小程序小程序,新場景

容量 也好 優惠券 是你 ada 前段時間 tor 能夠 相對 前言: 我們頻繁進入的地方,是場景。手機。是場景;瀏覽器。是場景。事實上,微信,也是場景……