Flink Source Code Walkthrough, Starting from the Examples: Understanding the Local Job Execution Flow
WeChat Official Account: 深廣大資料Club
Following the previous post, 《Flink原始碼解析 | 從Example出發理解Flink-Flink啟動》, this article walks through the implementation of the SocketWindowWordCount example that ships with Apache Flink.
SocketWindowWordCount
First, let's look at the core code of SocketWindowWordCount:
```java
// get the execution environment
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// get input data by connecting to the socket
DataStream<String> text = env.socketTextStream(hostname, port, "\n");

// parse the data, group it, window it, and aggregate the counts
DataStream<WordWithCount> windowCounts = text
    .flatMap(new FlatMapFunction<String, WordWithCount>() {
        @Override
        public void flatMap(String value, Collector<WordWithCount> out) {
            for (String word : value.split("\\s")) {
                out.collect(new WordWithCount(word, 1L));
            }
        }
    })
    .keyBy("word")
    .timeWindow(Time.seconds(5))
    .reduce(new ReduceFunction<WordWithCount>() {
        @Override
        public WordWithCount reduce(WordWithCount a, WordWithCount b) {
            return new WordWithCount(a.word, a.count + b.count);
        }
    });

// print the results with a single thread, rather than in parallel
windowCounts.print().setParallelism(1);

env.execute("Socket Window WordCount");
```
The code above breaks down into the following steps:

- Obtain the execution environment
- Instantiate the DataStream object
- Run the transformations that produce windowCounts:
  - flatMap
  - keyBy
  - reduce
- Print the results
- Call execute on the env to run the job
The FlatMapFunction and ReduceFunction here can be implemented however your own business scenario requires.
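One piece the snippet above does not show is the WordWithCount type itself. Because the pipeline uses `keyBy("word")`, Flink resolves the key reflectively, which means the type must follow POJO rules: a public no-argument constructor and a public `word` field (or getter). A minimal sketch consistent with the field names used above:

```java
// Minimal POJO sketch of the WordWithCount type used in the example.
// keyBy("word") resolves the key by reflection, so Flink requires a
// public no-argument constructor and a public "word" field (POJO rules).
class WordWithCount {

    public String word;
    public long count;

    // no-arg constructor required by Flink's POJO serializer
    public WordWithCount() {}

    public WordWithCount(String word, long count) {
        this.word = word;
        this.count = count;
    }

    @Override
    public String toString() {
        return word + " : " + count;
    }
}
```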
Instantiating StreamExecutionEnvironment
```java
public static StreamExecutionEnvironment getExecutionEnvironment() {
    if (contextEnvironmentFactory != null) {
        return contextEnvironmentFactory.createExecutionEnvironment();
    }

    // because the streaming project depends on "flink-clients" (and not the other way around)
    // we currently need to intercept the data set environment and create a dependent stream env.
    // this should be fixed once we rework the project dependencies
    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    if (env instanceof ContextEnvironment) {
        return new StreamContextEnvironment((ContextEnvironment) env);
    } else if (env instanceof OptimizerPlanEnvironment || env instanceof PreviewPlanEnvironment) {
        return new StreamPlanEnvironment(env);
    } else {
        return createLocalEnvironment();
    }
}
```
When creating a StreamExecutionEnvironment, the method first checks whether a contextEnvironmentFactory exists; if so, the environment is created by that factory and returned. Otherwise it falls through to ExecutionEnvironment and inspects the resulting environment type.

In local mode, createLocalEnvironment() is called to create the stream environment.
```java
public static LocalStreamEnvironment createLocalEnvironment() {
    return createLocalEnvironment(defaultLocalParallelism);
}

public static LocalStreamEnvironment createLocalEnvironment(int parallelism) {
    return createLocalEnvironment(parallelism, new Configuration());
}

public static LocalStreamEnvironment createLocalEnvironment(int parallelism, Configuration configuration) {
    final LocalStreamEnvironment currentEnvironment;

    currentEnvironment = new LocalStreamEnvironment(configuration);
    currentEnvironment.setParallelism(parallelism);

    return currentEnvironment;
}
```
The overloads delegate to one another in turn until a LocalStreamEnvironment is finally instantiated and returned.
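The three overloads form a telescoping chain: each one fills in a default (first the parallelism, then an empty Configuration) and delegates to the next. Stripped of the Flink types, the pattern looks like this (LocalEnv here is a hypothetical stand-in for LocalStreamEnvironment, not a Flink class):

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in class illustrating the telescoping-overload
// pattern used by createLocalEnvironment; not Flink's actual API.
class LocalEnv {

    static final int DEFAULT_LOCAL_PARALLELISM = Runtime.getRuntime().availableProcessors();

    final Map<String, String> configuration;
    int parallelism;

    LocalEnv(Map<String, String> configuration) {
        this.configuration = configuration;
    }

    void setParallelism(int parallelism) {
        this.parallelism = parallelism;
    }

    // overload 1: no arguments, fills in the default parallelism
    static LocalEnv createLocalEnvironment() {
        return createLocalEnvironment(DEFAULT_LOCAL_PARALLELISM);
    }

    // overload 2: fills in an empty configuration
    static LocalEnv createLocalEnvironment(int parallelism) {
        return createLocalEnvironment(parallelism, new HashMap<>());
    }

    // overload 3: does the actual instantiation
    static LocalEnv createLocalEnvironment(int parallelism, Map<String, String> configuration) {
        LocalEnv env = new LocalEnv(configuration);
        env.setParallelism(parallelism);
        return env;
    }
}
```

The benefit of the pattern is that callers only specify what they care about, while all paths funnel into a single constructor site.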
LocalStreamEnvironment
```java
public JobExecutionResult execute(String jobName) throws Exception {
    // transform the streaming program into a JobGraph
    StreamGraph streamGraph = getStreamGraph();
    streamGraph.setJobName(jobName);

    JobGraph jobGraph = streamGraph.getJobGraph();
    jobGraph.setAllowQueuedScheduling(true);

    Configuration configuration = new Configuration();
    configuration.addAll(jobGraph.getJobConfiguration());
    configuration.setString(TaskManagerOptions.MANAGED_MEMORY_SIZE, "0");

    // add (and override) the settings with what the user defined
    configuration.addAll(this.configuration);

    if (!configuration.contains(RestOptions.PORT)) {
        configuration.setInteger(RestOptions.PORT, 0);
    }

    int numSlotsPerTaskManager = configuration.getInteger(TaskManagerOptions.NUM_TASK_SLOTS, jobGraph.getMaximumParallelism());

    MiniClusterConfiguration cfg = new MiniClusterConfiguration.Builder()
        .setConfiguration(configuration)
        .setNumSlotsPerTaskManager(numSlotsPerTaskManager)
        .build();

    if (LOG.isInfoEnabled()) {
        LOG.info("Running job on local embedded Flink mini cluster");
    }

    MiniCluster miniCluster = new MiniCluster(cfg);

    try {
        miniCluster.start();
        configuration.setInteger(RestOptions.PORT, miniCluster.getRestAddress().getPort());

        return miniCluster.executeJobBlocking(jobGraph);
    }
    finally {
        transformations.clear();
        miniCluster.close();
    }
}
```
The code breaks down into the following steps:

- Build the streamGraph
- Build the jobGraph from the streamGraph
- Build a Configuration
- Build a MiniClusterConfiguration, setting the number of slots per TaskManager via setNumSlotsPerTaskManager
- Create the miniCluster
- Run the jobGraph via miniCluster.executeJobBlocking
Note: the jobGraph is the directed acyclic graph (DAG) that we hand to the miniCluster to execute and produce the result.
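As a toy illustration of the DAG property (plain Java, not Flink's JobGraph API; the vertex names are made up for this sketch), the WordCount pipeline corresponds to a small chain of job vertices, and a valid execution order is just a topological sort of that graph:

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Toy DAG sketch illustrating the shape of a job graph; the topological
// sort below is for illustration only and is not Flink's scheduler.
class ToyJobGraph {

    final Map<String, List<String>> edges = new LinkedHashMap<>();

    void addEdge(String from, String to) {
        edges.computeIfAbsent(from, k -> new ArrayList<>()).add(to);
        edges.computeIfAbsent(to, k -> new ArrayList<>());
    }

    // Kahn's algorithm: repeatedly emit a vertex with no remaining inputs.
    List<String> topologicalOrder() {
        Map<String, Integer> inDegree = new LinkedHashMap<>();
        edges.keySet().forEach(v -> inDegree.put(v, 0));
        edges.values().forEach(ts -> ts.forEach(t -> inDegree.merge(t, 1, Integer::sum)));

        Deque<String> ready = new ArrayDeque<>();
        inDegree.forEach((v, d) -> { if (d == 0) ready.add(v); });

        List<String> order = new ArrayList<>();
        while (!ready.isEmpty()) {
            String v = ready.poll();
            order.add(v);
            for (String t : edges.get(v)) {
                if (inDegree.merge(t, -1, Integer::sum) == 0) {
                    ready.add(t);
                }
            }
        }
        if (order.size() != edges.size()) {
            throw new IllegalStateException("cycle detected: not a DAG");
        }
        return order;
    }
}
```

Because the graph is acyclic, such an order always exists, which is what makes it possible to deploy upstream tasks before the tasks that consume their output.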
MiniCluster
```java
public void start() throws Exception {
    synchronized (lock) {
        checkState(!running, "FlinkMiniCluster is already running");

        LOG.info("Starting Flink Mini Cluster");
        LOG.debug("Using configuration {}", miniClusterConfiguration);

        // fetch the configuration
        final Configuration configuration = miniClusterConfiguration.getConfiguration();
        final Time rpcTimeout = miniClusterConfiguration.getRpcTimeout();
        final int numTaskManagers = miniClusterConfiguration.getNumTaskManagers();
        final boolean useSingleRpcService = miniClusterConfiguration.getRpcServiceSharing() == RpcServiceSharing.SHARED;

        try {
            // initialize the IO Format classes
            initializeIOFormatClasses(configuration);

            // register the MetricsRegistry and instantiate the jobManagerMetricGroup
            LOG.info("Starting Metrics Registry");
            metricRegistry = createMetricRegistry(configuration);
            this.jobManagerMetricGroup = MetricUtils.instantiateJobManagerMetricGroup(
                metricRegistry,
                "localhost",
                ConfigurationUtils.getSystemResourceMetricsProbingInterval(configuration));

            final RpcService jobManagerRpcService;
            final RpcService resourceManagerRpcService;
            final RpcService[] taskManagerRpcServices = new RpcService[numTaskManagers];

            // bring up all the RPC services
            LOG.info("Starting RPC Service(s)");

            // we always need the 'commonRpcService' for auxiliary calls
            commonRpcService = createRpcService(configuration, rpcTimeout, false, null);

            // create the ActorSystem
            // TODO: Temporary hack until the metric query service is ported to the RpcEndpoint
            metricQueryServiceActorSystem = MetricUtils.startMetricsActorSystem(
                configuration,
                commonRpcService.getAddress(),
                LOG);
            metricRegistry.startQueryService(metricQueryServiceActorSystem, null);

            // instantiate jobManagerRpcService, resourceManagerRpcService, taskManagerRpcServices
            if (useSingleRpcService) {
                for (int i = 0; i < numTaskManagers; i++) {
                    taskManagerRpcServices[i] = commonRpcService;
                }

                jobManagerRpcService = commonRpcService;
                resourceManagerRpcService = commonRpcService;

                this.resourceManagerRpcService = null;
                this.jobManagerRpcService = null;
                this.taskManagerRpcServices = null;
            }
            else {
                // start a new service per component, possibly with custom bind addresses
                final String jobManagerBindAddress = miniClusterConfiguration.getJobManagerBindAddress();
                final String taskManagerBindAddress = miniClusterConfiguration.getTaskManagerBindAddress();
                final String resourceManagerBindAddress = miniClusterConfiguration.getResourceManagerBindAddress();

                jobManagerRpcService = createRpcService(configuration, rpcTimeout, true, jobManagerBindAddress);
                resourceManagerRpcService = createRpcService(configuration, rpcTimeout, true, resourceManagerBindAddress);

                for (int i = 0; i < numTaskManagers; i++) {
                    taskManagerRpcServices[i] = createRpcService(
                        configuration, rpcTimeout, true, taskManagerBindAddress);
                }

                this.jobManagerRpcService = jobManagerRpcService;
                this.taskManagerRpcServices = taskManagerRpcServices;
                this.resourceManagerRpcService = resourceManagerRpcService;
            }

            // create the high-availability services
            LOG.info("Starting high-availability services");
            haServices = HighAvailabilityServicesUtils.createAvailableOrEmbeddedServices(
                configuration,
                commonRpcService.getExecutor());

            // create the blob server
            blobServer = new BlobServer(configuration, haServices.createBlobStore());
            blobServer.start();

            // create the heartbeat services
            heartbeatServices = HeartbeatServices.fromConfiguration(configuration);

            // bring up the ResourceManager(s)
            LOG.info("Starting ResourceManger");
            resourceManagerRunner = startResourceManager(
                configuration,
                haServices,
                heartbeatServices,
                metricRegistry,
                resourceManagerRpcService,
                new ClusterInformation("localhost", blobServer.getPort()),
                jobManagerMetricGroup);

            // create the BlobCacheService
            blobCacheService = new BlobCacheService(
                configuration, haServices.createBlobStore(), new InetSocketAddress(InetAddress.getLocalHost(), blobServer.getPort())
            );

            // bring up the TaskManager(s) for the mini cluster
            LOG.info("Starting {} TaskManger(s)", numTaskManagers);
            taskManagers = startTaskManagers(
                configuration,
                haServices,
                heartbeatServices,
                metricRegistry,
                blobCacheService,
                numTaskManagers,
                taskManagerRpcServices);

            // starting the dispatcher rest endpoint
            LOG.info("Starting dispatcher rest endpoint.");

            dispatcherGatewayRetriever = new RpcGatewayRetriever<>(
                jobManagerRpcService,
                DispatcherGateway.class,
                DispatcherId::fromUuid,
                20,
                Time.milliseconds(20L));
            final RpcGatewayRetriever<ResourceManagerId, ResourceManagerGateway> resourceManagerGatewayRetriever = new RpcGatewayRetriever<>(
                jobManagerRpcService,
                ResourceManagerGateway.class,
                ResourceManagerId::fromUuid,
                20,
                Time.milliseconds(20L));

            this.dispatcherRestEndpoint = new DispatcherRestEndpoint(
                RestServerEndpointConfiguration.fromConfiguration(configuration),
                dispatcherGatewayRetriever,
                configuration,
                RestHandlerConfiguration.fromConfiguration(configuration),
                resourceManagerGatewayRetriever,
                blobServer.getTransientBlobService(),
                WebMonitorEndpoint.createExecutorService(
                    configuration.getInteger(RestOptions.SERVER_NUM_THREADS, 1),
                    configuration.getInteger(RestOptions.SERVER_THREAD_PRIORITY),
                    "DispatcherRestEndpoint"),
                new AkkaQueryServiceRetriever(
                    metricQueryServiceActorSystem,
                    Time.milliseconds(configuration.getLong(WebOptions.TIMEOUT))),
                haServices.getWebMonitorLeaderElectionService(),
                new ShutDownFatalErrorHandler());

            dispatcherRestEndpoint.start();

            restAddressURI = new URI(dispatcherRestEndpoint.getRestBaseUrl());

            // create the HistoryServerArchivist
            // bring up the dispatcher that launches JobManagers when jobs submitted
            LOG.info("Starting job dispatcher(s) for JobManger");

            final HistoryServerArchivist historyServerArchivist = HistoryServerArchivist.createHistoryServerArchivist(configuration, dispatcherRestEndpoint);

            // instantiate and start the dispatcher (which also backs the UI)
            dispatcher = new StandaloneDispatcher(
                jobManagerRpcService,
                Dispatcher.DISPATCHER_NAME + UUID.randomUUID(),
                configuration,
                haServices,
                resourceManagerRunner.getResourceManageGateway(),
                blobServer,
                heartbeatServices,
                jobManagerMetricGroup,
                metricRegistry.getMetricQueryServicePath(),
                new MemoryArchivedExecutionGraphStore(),
                Dispatcher.DefaultJobManagerRunnerFactory.INSTANCE,
                new ShutDownFatalErrorHandler(),
                dispatcherRestEndpoint.getRestBaseUrl(),
                historyServerArchivist);

            dispatcher.start();

            // fetch the ResourceManager / Dispatcher leader retrievers and start them
            resourceManagerLeaderRetriever = haServices.getResourceManagerLeaderRetriever();
            dispatcherLeaderRetriever = haServices.getDispatcherLeaderRetriever();

            resourceManagerLeaderRetriever.start(resourceManagerGatewayRetriever);
            dispatcherLeaderRetriever.start(dispatcherGatewayRetriever);
        }
        catch (Exception e) {
            // cleanup everything
            try {
                close();
            } catch (Exception ee) {
                e.addSuppressed(ee);
            }
            throw e;
        }

        // create a new termination future
        terminationFuture = new CompletableFuture<>();

        // now officially mark this as running
        running = true;

        LOG.info("Flink Mini Cluster started successfully");
    }
}
```
MiniCluster does quite a lot of work; the steps are:

- Fetch the configuration
- Initialize the IO Format classes
- Register the MetricsRegistry and instantiate the jobManagerMetricGroup
- Start the RPC services
- Start the high-availability services
- Start the ResourceManager
- Start the TaskManagers
- Start the dispatcher REST endpoint
- Start the dispatcher that launches JobManagers when jobs are submitted
- Fetch the resourceManagerLeaderRetriever and dispatcherLeaderRetriever and start them
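One detail worth noticing in start() is its error handling: if any service fails to come up, everything already started is torn down via close(), and a failure during cleanup is attached to the original exception with addSuppressed rather than masking it. The pattern in isolation (StartableService is a made-up class for this sketch, not Flink code):

```java
// Isolated sketch of the start()/close() error-handling pattern used in
// MiniCluster.start(): cleanup failures are suppressed into the root cause.
class StartableService {

    boolean running = false;
    boolean closed = false;

    void start(boolean failDuringStartup) throws Exception {
        try {
            if (failDuringStartup) {
                // simulate e.g. an RPC service failing to bind
                throw new IllegalStateException("could not start RPC service");
            }
            running = true;
        } catch (Exception e) {
            // cleanup everything that was partially started
            try {
                close();
            } catch (Exception ee) {
                e.addSuppressed(ee); // keep the root cause, don't mask it
            }
            throw e;
        }
    }

    void close() throws Exception {
        closed = true;
        // simulate cleanup itself failing
        throw new IllegalStateException("close also failed");
    }
}
```

The caller therefore always sees the original startup failure, with any cleanup failure available via getSuppressed(), which is much easier to debug than losing the root cause.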
Summary
In simplified form, the whole flow is:

- Create the appropriate StreamExecutionEnvironment object, here a LocalStreamEnvironment
- Call the execute method of the StreamExecutionEnvironment object
- Build the streamGraph
- Build the jobGraph from it
- Instantiate the miniCluster
- Hand the jobGraph to miniCluster.executeJobBlocking for execution
- Starting the miniCluster brings up all the required services (RPC, HA, ResourceManager, TaskManagers, and so on)