Netty聊天室(2):從0開始實戰100w級流量應用
目錄
- 客戶端 Client 登入和響應處理
- 連線伺服器與Session 的建立
- Session和 channel 相互繫結
- AttributeMap介面的使用
- 瘋狂創客圈 Java 死磕系列
客戶端 Client 登入和響應處理
瘋狂創客圈 Java 分散式聊天室【 億級流量】實戰系列之 17【部落格園 總入口 】
原始碼IDEA工程獲取連結: ofollow,noindex" target="_blank">Java 聊天室 實戰 原始碼
寫在前面
大家好,我是作者尼恩。
前面,已經完成一個高效能的 Java 聊天程式的四件大事:
-
完成了協議選型,選擇了效能更佳的 Protobuf協議 。具體的文章為: Netty+Protobuf 整合一:實戰案例,帶原始碼
-
介紹了 通訊訊息資料包的幾條設計準則 。具體的文章為: Netty +Protobuf 整合二:protobuf 訊息通訊協議設計的幾個準則
-
解決了一個非常基礎的問題,這就是通訊的 粘包和半包問題。 具體的文章為: Netty 粘包/半包 全解 | 史上最全解讀
-
前一篇檔案,已經完成了 系統三大組成模組的組成介紹。 具體的文章為: Netty聊天程式(實戰一):從0開始實戰100w級流量應用
今天介紹非常重要的一個內容:
客戶端的通訊、登入請求和登入響應設計。
下面,開啟今天的 驚險和刺激實戰之旅 。
客戶端的會話管理
什麼是會話?
為了方便客戶端的開發,管理與伺服器的連線,這裡引入一個非常重要的中間角色——Session (會話)。有點兒像Web開發中的Tomcat的伺服器 Session,但是又有很大的不同。
客戶端的會話概念圖,如下圖所示:
客戶端會話有兩個很重的成員,一個是user,代表了擁有會話的使用者。一個是channel,代表了連線的通道。兩個成員的作用是:
-
通過user,可以獲得當前的使用者資訊
-
通過channel,可以向伺服器傳送訊息
所以,會話左擁右抱,左手使用者資料,右手伺服器的連線。在本例的開發中,會經常用到。
客戶端的邏輯構成
從邏輯上來說,客戶端有三個子的功能模組。
模組一:Handler
入站處理器。
在Netty 中非常重要,負責處理入站訊息。比方,伺服器傳送過來登入響應,伺服器傳送過來的聊天訊息。
模組二:MsgBuilder
訊息組裝器。
將 Java 內部的 訊息 Bean 物件,轉成傳送出去的 Protobuf 訊息。
模組三:Sender
訊息傳送器。
Handler 負責收的工作。Sender 則是負責將訊息傳送出去。
三大子模組的類關係圖:

介紹完成了主要的組成部分後,開始伺服器的連線和Session 的建立。
連線伺服器與Session 的建立
通過bootstrap 幫助類,設定完成執行緒組、通道型別,向管道流水線加入處理器Handler後,就可以開始連線伺服器的工作。
本小節需要重點介紹的,是連線成功之後,建立 Session,並且將 Session和 channel 相互繫結。
程式碼如下:
package com.crazymakercircle.chat.client; //... @Data @Service("EchoClient") public class ChatClient { static final Logger LOGGER = LoggerFactory.getLogger(ChatClient.class); //.. private Channel channel; private ClientSender sender; public void doConnect(Bootstrap bootstrap, EventLoopGroup eventLoopGroup) { ChannelFuture f = null; try { if (bootstrap != null) { bootstrap.group(eventLoopGroup); bootstrap.channel(NioSocketChannel.class); bootstrap.option(ChannelOption.SO_KEEPALIVE, true); bootstrap.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT); bootstrap.remoteAddress(host, port); // 設定通道初始化 bootstrap.handler( new ChannelInitializer<SocketChannel>() { public void initChannel(SocketChannel ch) throws Exception { ch.pipeline().addLast(new ProtobufDecoder()); ch.pipeline().addLast(new ProtobufEncoder()); ch.pipeline().addLast(chatClientHandler); } } ); LOGGER.info(new Date() + "客戶端開始登入[瘋狂創客圈IM]"); f = bootstrap.connect().addListener((ChannelFuture futureListener) -> { final EventLoop eventLoop = futureListener.channel().eventLoop(); if (!futureListener.isSuccess()) { LOGGER.info("與服務端斷開連線!在10s之後準備嘗試重連!"); eventLoop.schedule(() -> doConnect(new Bootstrap(), eventLoop), 10, TimeUnit.SECONDS); initFalg = false; } else { initFalg = true; } if (initFalg) { LOGGER.info("EchoClient客戶端連線成功!"); LOGGER.info(new Date() + ": 連線成功,啟動控制檯執行緒……"); channel = futureListener.channel(); // 建立會話 ClientSession session = new ClientSession(channel); channel.attr(ClientSession.SESSION).set(session); session.setUser(ChatClient.this.getUser()); startConsoleThread(); } }); // 阻塞 f.channel().closeFuture().sync(); } } catch (Exception e) { LOGGER.info("客戶端連線失敗!" + e.getMessage()); } } //... }
Session和 channel 相互繫結
Session和 channel 相互繫結,再截取出來,分析一下。
ClientSession session = new ClientSession(channel); channel.attr(ClientSession.SESSION).set(session); session.setUser(ChatClient.this.getUser());
為什麼要Session和 channel 相互繫結呢?
- 發的時候, 需要從Session 寫入 Channel ,這相當於正向的繫結。
- 收的時候,是從Channel 過來的,需要找到 Session ,這相當於反向的繫結。
Netty 中的 channel ,實現了AttributeMap介面 ,相當於一個 Map容器。 反向的繫結,利用了channel 的這個特點。
看一下AttributeMap介面 如何使用的?
AttributeMap介面的使用
AttributeMap 是一個介面,並且只有一個attr()方法,接收一個AttributeKey型別的key,返回一個Attribute型別的value。按照Javadoc,AttributeMap實現必須是執行緒安全的。
AttributeMap內部結構看起來像下面這樣:
不要被嚇著了,其實很簡單。
AttributeMap 的使用,主要是設定和取值。
- 設值 Key-> Value
AttributeMap 的設值的方法,舉例如下:
channel.attr(ClientSession.SESSION).set(session);
這個是鏈式呼叫,attr() 方法中的是 Key, set()方法中的是Value。 這樣就完成了 Key-> Value 的設定。
- 取值
AttributeMap 的取值的方法,舉例如下:
ClientSession session = ctx.channel().attr(ClientSession.SESSION).get();
這個是鏈式呼叫,attr() 方法中的是 Key, get()方法返回 的是Value。 這樣就完成了 取值。
關鍵是,這個key比較特殊。
一般的Map,Key 的型別多半為字串。但是這裡的Key不行,有特殊的約定。
Key的型別必須是 AttributeKey 型別,而且這是一個泛型類,它的優勢是,不需要對值進行強制的型別轉換。
Key的例子如下:
public static final AttributeKey<ClientSession> SESSION = AttributeKey.valueOf("session");
客戶端登入請求
登入的請求,大致如下:

ClientSender的 程式碼如下:
package com.crazymakercircle.chat.client; @Service("ClientSender") public class ClientSender { static final Logger LOGGER = LoggerFactory.getLogger(ClientSender.class); private User user; private ClientSession session; public void sendLoginMsg() { LOGGER.info("開始登陸"); ProtoMsg.Message message = LoginMsgBuilder.buildLoginMsg(user); session.writeAndFlush(message); } //... public boolean isLogin() { returnsession.isLogin(); } }
Sender 首先通過 LoginMsgBuilder,構造一個protobuf 訊息。然後呼叫session傳送訊息。
session 會通過繫結的channel ,將訊息傳送出去。
session的程式碼,如下:
public synchronized void writeAndFlush(Object pkg) { channel.writeAndFlush(pkg); }
其他的客戶端請求流程,大致也是類似的。
一個客戶端的請求大致的流程有三步,分別從Sender 到session到channel。
處理登入成功的響應
這是從伺服器過來的入站訊息。 如果登入成功,伺服器會發送一個登入成功的響應過來。 這個響應,會從channel 傳遞到Handler。
處理器 LoginResponceHandler 的程式碼如下:
package com.crazymakercircle.chat.clientHandler; //... public class LoginResponceHandler extends ChannelInboundHandlerAdapter { static final Logger LOGGER = LoggerFactory.getLogger(LoginResponceHandler.class); /** * 業務邏輯處理 */ @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { LOGGER.info("msg:{}", msg.toString()); if (msg != null && msg instanceof ProtoMsg.Message) { ProtoMsg.Message pkg = (ProtoMsg.Message) msg; ProtoMsg.LoginResponse info = pkg.getLoginResponse(); ProtoInstant.ResultCodeEnum result = ProtoInstant.ResultCodeEnum.values()[info.getCode()]; if (result.equals(ProtoInstant.ResultCodeEnum.SUCCESS)) { ClientSession session = ctx.channel().attr(ClientSession.SESSION).get(); session.setLogin(true); LOGGER.info("登入成功"); } } } }
LoginResponceHandler 對訊息型別進行判斷,如果是請求響應訊息,並且登入成功。 則取出繫結的session,通過session,進一步完成登入成功後的業務處理。
比如設定成功的狀態,完成一些成功的善後處理操作等等。
其他的客戶端響應處理流程,大致也是類似的。
寫在最後
至此為止,可以看到,客戶端登入的完整流程。
下一篇:伺服器的請求處理和通訊的全流程閉環介紹。
瘋狂創客圈 Java 死磕系列
-
Java (Netty) 聊天程式【 億級流量】實戰 開源專案實戰
- Netty 原始碼、原理、JAVA NIO 原理
- Java 面試題 一網打盡
-
瘋狂創客圈 【 部落格園 總入口 】