1. 程式人生 > >Flink執行時之基於Netty的網路通訊中

Flink執行時之基於Netty的網路通訊中

PartitionRequestClient

分割槽請求客戶端(PartitionRequestClient)用於發起遠端PartitionRequest請求,它也是RemoteChannel跟Netty通訊層之間進行銜接的物件。

對單一的TaskManager而言只存在一個NettyClient例項。但處於同一TaskManager中不同的任務例項可能會跟不同的遠端TaskManager上的任務之間交換資料,不同的TaskManager例項會有不同的ConnectionID(用於標識不同的IP地址)。因此,Flink採用PartitionRequestClient來對應ConnectionID,並提供了分割槽請求客戶端工廠(PartitionRequestClientFactory)來建立PartitionRequestClient並儲存ConnectionID與之的對應關係。

接下來,我們重點分析一下其請求ResultPartition的requestSubpartition方法:

public ChannelFuture requestSubpartition(
        final ResultPartitionID partitionId,
        final int subpartitionIndex,
        final RemoteInputChannel inputChannel,
        int delayMs) throws IOException {
    checkNotClosed();

    //將當前請求資料的RemoteInputChannel的例項注入到NettyClient的ChannelHandler管道的
//PartitionRequestClientHandler例項中 partitionRequestHandler.addInputChannel(inputChannel); //構建PartitionRequest請求物件 final PartitionRequest request = new PartitionRequest( partitionId, subpartitionIndex, inputChannel.getInputChannelId()); //構建一個ChannelFutureListener的例項,當I/O操作執行失敗後,會觸發相關的錯誤處理邏輯
final ChannelFutureListener listener = new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture future) throws Exception { if (!future.isSuccess()) { partitionRequestHandler.removeInputChannel(inputChannel); inputChannel.onError( new LocalTransportException( "Sending the partition request failed.", future.channel().localAddress(), future.cause() )); } } }; //立即傳送該請求,並註冊listener if (delayMs == 0) { ChannelFuture f = tcpChannel.writeAndFlush(request); f.addListener(listener); return f; } //如果請求需要延遲一定的時間,則延遲傳送請求 else { final ChannelFuture[] f = new ChannelFuture[1]; tcpChannel.eventLoop().schedule(new Runnable() { @Override public void run() { f[0] = tcpChannel.writeAndFlush(request); f[0].addListener(listener); } }, delayMs, TimeUnit.MILLISECONDS); return f[0]; } }

NettyMessage

Flink為基於Netty的通訊框架定義了自己的通訊訊息格式,以及相應的解編碼器。NettyMessage由固定大小的訊息頭和自定義的訊息體組成:

NettyMessage-structure

NettyMessage訊息頭的結構如下圖:

NettyMessage-header-structure

由上圖可見,NettyMessage訊息頭所佔的空間是固定的9個位元組。其中,Frame佔用一個整型值的空間其儲存的值為整個訊息(訊息頭和自定義的訊息體)的大小。Magic Number同樣佔用一個整形數值的空間且值是固定不變的;而Id表示訊息的型別,佔用一個位元組空間。

自定義的訊息體部分依不同的訊息型別各有不同。

服務端的訊息有:

  • BufferResponse:服務端給出的Buffer響應訊息,編號為0;
  • ErrorResponse:服務端的錯誤響應訊息,編號為1;

客戶端的訊息有:

  • PartitionRequest:客戶端發起的分割槽請求,編號為2;
  • TaskEventRequest:客戶端發起的任務事件請求,編號為3;
  • CancelPartitionRequest:客戶端發起的取消分割槽請求,編號為4;
  • CloseRequest:客戶端發起的關閉請求,編號為5;

另外,它定義了讀寫介面,面向的物件是Netty的位元組緩衝(ByteBuf)。解編碼器NettyMessageEncoder和NettyMessageDecoder以靜態內部類實現,分別用來在訊息的兩種表示(NettyMessage和ByteBuf)之間進行轉換。

NettyProtocol

NettyProtocol定義了基於Netty進行網路通訊時客戶端和服務端對事件的處理邏輯與順序。由於Netty中所有事件處理邏輯的程式碼都處於擴充套件自ChannelHandler介面的類中,所以,NettyProtocol約定了所有的協議實現者,必須提供服務端和客戶端處理邏輯的ChannelHandler陣列。

最終這些ChannelHandler將依據它們在陣列中的順序進行連結以形成ChannelPipeline。

PartitionRequestProtocol作為NettyProtocol唯一的實現,負責例項化並編排客戶端和服務端的ChannelHandler。按照順序連結的這些ChannelHandler可被視為“協議棧”。接下來,我們分別就客戶端和服務端的協議棧給出了圖示。

客戶端的整個的協議棧如下圖所示:

NettyClient-channel-handler-pipeline

PartitionRequestProtocol構建出的客戶端協議棧將會被構建成ChannelPipeline,並註冊到客戶端引導物件Bootstrap中:

bootstrap.handler(newChannelInitializer<SocketChannel>(){
    @Override
    publicvoidinitChannel(SocketChannelchannel)throwsException{
        channel.pipeline().addLast(protocol.getClientChannelHandlers());
    }
});

服務端協議棧如下圖所示:

NettyServer-channel-handler-pipeline

同客戶端協議棧,服務端協議棧也會被構建成ChannelPipeline並註冊到服務端引導物件ServerBootstrap中:

bootstrap.childHandler(newChannelInitializer<SocketChannel>(){
    @Override
    publicvoidinitChannel(SocketChannelchannel)throwsException{
        channel.pipeline().addLast(protocol.getServerChannelHandlers());
    }
});

需要注意的是,無論是客戶端還是服務端,資料都存在流入(inbound)和流出(outbound)的過程。流入對應著處理器介面為ChannelInboundHandler,而流出對應的處理器介面為ChannelOutboundHandler。因此,兩個協議方法所獲取到的ChannelHandler陣列並不是安裝其元素的絕對順序組成的管道。而是會區分其型別是流入還是流出(根據介面的型別判斷),結合不同的型別並按照其在陣列中的順序將其連結成管道。

微信掃碼關注公眾號:Apache_Flink

apache_flink_weichat

QQ掃碼關注QQ群:Apache Flink學習交流群(123414680)

qrcode_for_apache_flink_qq_group

相關推薦

Flink執行基於Netty網路通訊

PartitionRequestClient 分割槽請求客戶端(PartitionRequestClient)用於發起遠端PartitionRequest請求,它也是RemoteChannel跟Netty通訊層之間進行銜接的物件。 對單一的TaskMan

Flink執行基於Netty網路通訊(下)

客戶端核心處理器 這一篇,我們分析一下客戶端協議棧中的核心的處理器PartitionRequestClientHandler,該處理器用於處理服務端的響應訊息。 我們以客戶端獲取到響應之後回撥該處理器的channelRead方法為入口來進行分析:

Flink執行統一的資料交換物件

統一的資料交換物件 在Flink的執行引擎中,流動的元素主要有兩種:緩衝(Buffer)和事件(Event)。Buffer主要針對使用者資料交換,而Event則用於一些特殊的控制標識。但在實現時,為了在通訊層統一資料交換,Flink提供了資料交換物件——Buf

Flink執行生成作業圖

生成作業圖 在分析完了流處理程式生成的流圖(StreamGraph)以及批處理程式生成的優化後的計劃(OptimizedPlan)之後,下一步就是生成它們面向Flink執行時執行引擎的共同抽象——作業圖(JobGraph)。 什麼是作業圖 作業圖(

基於OkHttp網路通訊工具類(傳送get、post請求、檔案上傳和下載)

一、為什麼要用OkHttp? okhttp是專注於提升網路連線效率的http客戶端。 優點: 1、它能實現同一ip和埠的請求重用一個socket,這種方式能大大降低網路連線的時間,和每次請求都建立socket,再斷開socket的方式相比,降低了伺服器伺服器的壓力。 2、okhttp 對

基於MySQL網路通訊協議 編寫自己的JDBC

基於MySQL網路協議實現自己的資料庫驅動。主要是利用Wireshark解析MySQL的packet,使用Socket實現通訊。具體實現思路可以點選下方的個人部落格檢視 。程式碼上傳至GitHub。 在實現功能,借鑑了 sea-boat的packet實現,在其基礎上進行功能塊的編寫,

Objective-C Runtime 執行五:協議與分類

Objective-C中的分類允許我們通過給一個類新增方法來擴充它(但是通過category不能新增新的例項變數),並且我們不需要訪問類中的程式碼就可以做到。 Objective-C中的協議是普遍存在的介面定義方式,即在一個類中通過@protocol定義介面,在另外

Objective-C Runtime 執行六:拾遺

前面幾篇基本介紹了runtime中的大部分功能,包括對類與物件、成員變數與屬性、方法與訊息、分類與協議的處理。runtime大部分的功能都是圍繞這幾點來實現的。 本章的內容並不算重點,主要針對前文中對Objective-C Runtime Reference內容遺漏

Objective-C Runtime 執行二:成員變數與屬性

在前面一篇文章中,我們介紹了Runtime中與類和物件相關的內容,從這章開始,我們將討論類實現細節相關的內容,主要包括類中成員變數,屬性,方法,協議與分類的實現。 本章的主要內容將聚集在Runtime對成員變數與屬性的處理。在討論之前,我們先介紹一個重要的概念:型別

Objective-C Runtime 執行三:方法與訊息

前面我們討論了Runtime中對類和物件的處理,及對成員變數與屬性的處理。這一章,我們就要開始討論Runtime中最有意思的一部分:訊息處理機制。我們將詳細討論訊息的傳送及訊息的轉發。不過在討論訊息之前,我們先來了解一下與方法相關的一些內容。 基礎資料型別 SEL

Objective-C Runtime 執行四:Method Swizzling

理解Method Swizzling是學習runtime機制的一個很好的機會。在此不多做整理,僅翻譯由Mattt Thompson發表於nshipster的Method Swizzling一文。 Method Swizzling是改變一個selector的實際實現的

基於MySQL網路通訊協議 編寫自己的JDBC

基於MySQL網路協議實現自己的資料庫驅動。主要是利用Wireshark解析MySQL的packet,使用Socket實現通訊。具體實現思路可以點選下方的個人部落格檢視 。程式碼上傳至GitHub。 在

[ObjectC]Runtime執行三:方法與訊息

這一章,我們就要開始討論Runtime中最有意思的一部分:訊息處理機制。我們將詳細討論訊息的傳送及訊息的轉發。 基礎資料型別 SEL SEL又叫選擇器,是表示一個方法的selector的指標,其定義如下:typedef struct objc_selector *SEL;o

卷積神經網路(CNN)學習演算法----基於LeNet網路的中文驗證碼識別

  由於公司需要進行了中文驗證碼的圖片識別開發,最近一段時間剛忙完上線,好不容易閒下來就繼上篇《基於Windows10 x64+visual Studio2013+Python2.7.12環境下的Caffe配置學習 》文章,記錄下利用caffe進行中文驗證碼圖片識別的開發過程。由於這裡主要介紹開發和實現過程,

iOS Runtime 執行三:訊息處理機制

前面我們討論了Runtime中對類和物件的處理,及對成員變數與屬性的處理。這一章,我們就要開始討論Runtime中最有意思的一部分:訊息處理機制。我們將詳細討論訊息的傳送及訊息的轉發。不過在討論訊息之前,我們先來了解一下與方法相關的一些內容。 基礎資料型別

趣談iOS執行方法呼叫原理

導語 訊息轉發 OC的動態語言特性 1動態型別 2動態繫結 3動態載入 導語 一個成熟的計算機語言必然有豐富的體系,複雜的容錯機制,處理邏輯以及判斷邏輯。但這些複雜的邏輯都是圍繞一個主線豐富和展開的,所以在學習計算機語言的時候,先掌握核心

Socket系列(二)基於Socket網路通訊的服務端和客戶端程式設計

        Socket系列一主要介紹了Socket的基礎知識、工作原理以及與傳統的http協議的區別。這部分內容的目的是為本文做鋪墊。本文將介紹基於Socket網路通訊的伺服器端和客戶端的程式設計。 一、伺服器端的程式設計         關於Socket伺服器端的

iOS學習筆記56(Runtime)-Objective-C Runtime 執行三:方法與訊息

前面我們討論了Runtime中對類和物件的處理,及對成員變數與屬性的處理。這一章,我們就要開始討論Runtime中最有意思的一部分:訊息處理機制。我們將詳細討論訊息的傳送及訊息的轉發。不過在討論訊息之前,我們先來了解一下與方法相關的一些內容。 基礎資料型別 SEL

基於Windows Socket 的網路通訊的心跳機制原理

(2)SIO_KEEPALIVE_VALS 機制           這是從彭博兄那裡學到一個機制拉,設定介面是WSAIoctl API:      DWORD dwError = 0L ;      tcp_keepalive sKA_Settings = {0}, sReturned = {0} ;  

計算機網路通訊資料加密技術的應用

通訊方面的安全,重點在下述的兩個領域:第一個是資訊在進行傳遞過程中的安全,第二個是資訊在進行儲存過程中的安全。 使用鏈路的方式進行加密 過程:因為在所有中間部位的傳遞節點當中,訊息全部被經過解密以後,再一次實施加密操作,所以,將路由資訊涵蓋在鏈路當中,全部的資料普遍使用祕聞的方式進