Netty 實現一對一客戶端聊天(由伺服器轉發)
學習Netty有一個多星期,參考《Netty實戰》敲了Echo C/S 程式碼,也學習了channelHandler等元件。不滿足客戶端伺服器一對一聊天,所以尋思著自己實現一個客戶端和客戶端一對一聊天,訊息由伺服器轉發。
如果有程式碼寫的不合適的地方,還請評論指正。
《Netty實戰》電子書下載地址
主要思路
伺服器端
- 伺服器端維護一個ChannelHandlerContext型別的陣列,用來儲存與伺服器連線的各個channel的資訊。 (思路來源於《Netty 實戰》6.3.2)
- 最大連線數控制。設定一個變數為最大數,
- 伺服器主動關閉連線。當檢測到一個連線的客戶端掉線或異常,伺服器應主動關閉連線
- 伺服器轉發訊息。通過ChannelHandlerContext型別的陣列獲得與之相連的channel資訊,將伺服器讀到的訊息寫給另一個客戶端
- 譯碼器解碼器。使讀寫更加方便
- 為了便於除錯,還附加了一些包含獲取時間,伺服器顯示讀取的訊息等等函式
客戶端
- 傳送資訊
- 讀取資訊
- 主動關閉連線
- 輸入指定字串時下線。由於是控制檯程式,所以我設定輸入“bye”或“再見”時,呼叫相關函式關閉連線
實現程式碼
伺服器端
類結構
類名 | 說明 |
---|---|
ServerHandler | 繼承ChannelInboundHandlerAdapter介面 |
Server | 引導伺服器端 |
實現程式碼
ServerHandler類
繼承ChannelInboundHandlerAdapter介面,重寫一些方法,新增一些私有變數來儲存和控制channel資訊
//ServerHandler類
import java.net.InetSocketAddress;
import java.util.Calendar;
import java.util.Vector;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandler.Sharable;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
@Sharable
public class ServerHandler extends ChannelInboundHandlerAdapter {
private static final int MAX_CONN = 2;//指定最大連線數
private int connectNum = 0;//當前連線數
//channelHandlerContext表
private Vector<ChannelHandlerContext> contexts = new Vector<>(2);
//獲取當前時間
private String getTime() {
Calendar c = Calendar.getInstance();
int year = c.get(Calendar.YEAR);
int month = c.get(Calendar.MONTH);
int date = c.get(Calendar.DATE);
int hour = c.get(Calendar.HOUR_OF_DAY);
int minute = c.get(Calendar.MINUTE);
int second = c.get(Calendar.SECOND);
return new String(year + "/" + month + "/" + date + " " + hour + ":" + minute + ":" + second);
}
/*
* 重寫channelActive()方法
* 更新當前連線數
* 控制連線客戶端的個數,超過則關閉該channel
* 更新contexts陣列
*/
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
// TODO Auto-generated method stub
connectNum++;
//控制客戶端連線數量,超過則關閉
if (connectNum > MAX_CONN) {
ctx.writeAndFlush(Unpooled.copiedBuffer("達到人數上限".getBytes()));
ctx.channel().close();
//當前連線數的更新放在channelInactive()裡
}
//更新contexts
contexts.add(ctx);
//控制檯輸出相關資訊
InetSocketAddress socket = (InetSocketAddress) ctx.channel().remoteAddress();
System.out.println(socket.getAddress().getHostAddress() + ":" + socket.getPort() + "已連線");
System.out.println("當前連線數:" + connectNum);
ctx.writeAndFlush("hello client");
}
/*
* 重寫channelInactive()方法
* 更新當前連線數
* 更新contexts陣列
* 控制檯輸出相關資訊
*/
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
// TODO Auto-generated method stub
//更新當前連線數
connectNum--;
//更新contexts陣列
contexts.remove(ctx);
//控制檯輸出相關資訊
InetSocketAddress socket = (InetSocketAddress) ctx.channel().remoteAddress();
System.out.println(getTime() + ' ' + socket.getAddress().getHostAddress() + ":" + socket.getPort() + "已退出");
System.out.println("當前連線數:" + connectNum);
//對另一個客戶端發出通知
if (contexts.size() == 1) {
contexts.get(0).writeAndFlush("對方退出聊天");
}
}
/*
* 重寫channelRead()函式
* 讀取資料
* 轉發訊息
*/
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
// TODO Auto-generated method stub
String in = (String) msg;
System.out.println(getTime() + " 客戶端" + ctx.channel().remoteAddress() + ":" + in);
//當只有一方線上時,傳送通知
if (contexts.size() < 2) {
ctx.writeAndFlush("對方不線上");
return;
}
//獲取另一個channelhandlercontxt的下表
int currentIndex = contexts.indexOf(ctx);
int anotherIndex = Math.abs(currentIndex - 1);
//給另一個客戶端轉發資訊
contexts.get(anotherIndex).writeAndFlush(in);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
// TODO Auto-generated method stub
if (!ctx.channel().isActive()) {
System.out.println(ctx.channel().remoteAddress() + "客戶端異常");
}
cause.printStackTrace();
ctx.close();
}
}
Server類
用的最常規最簡單的寫法,和EchoServer差不多,等以後進一步學習再豐富內容
//Server類
import java.net.InetSocketAddress;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
public class Server {
public static void main(String[] args) throws InterruptedException {
new Server().start();
}
public Server() {
// TODO Auto-generated constructor stub
}
// 引導類
public void start() throws InterruptedException {
ServerHandler sHandler = new ServerHandler();
InetSocketAddress localSocket = new InetSocketAddress("127.0.0.1", 9990);
ServerBootstrap b = new ServerBootstrap();
b.group(newNioEventLoopGroup())
.localAddress(localSocket)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
//新增譯碼器解碼器
ch.pipeline().addLast(new StringDecoder());
ch.pipeline().addLast(new StringEncoder());
ch.pipeline().addLast(sHandler);
}
});
final ChannelFuture f = b.bind().sync();
f.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture f) throws Exception {
// TODO Auto-generated method stub
if (f.isSuccess()) {
System.out.println("伺服器開啟成功");
} else {
System.out.println("伺服器開啟失敗");
f.cause().printStackTrace();
}
}
});
}
}
客戶端
類結構
類名 | 說明 |
---|---|
ClientHandler | 繼承SimpleChannelInboundHandler < String > 介面 |
Client | 引導客戶端 |
實現程式碼
ClientHandler類
繼承SimpleChannelInboundHandler< String >介面,重寫一些方法,新增私有closeChannel()來主動關閉連線
import java.util.Calendar;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
//因為加入了String型別的編碼器和譯碼器,所以介面例項化為String型別
public class clientHandler extends SimpleChannelInboundHandler<String> {
//儲存當前ChannelHandlerContext,在後面的closeChannel()中使用
private ChannelHandlerContext chc = null;
//獲取當前時間
private String getTime() {
Calendar c = Calendar.getInstance();
int year = c.get(Calendar.YEAR);
int month = c.get(Calendar.MONTH);
int date = c.get(Calendar.DATE);
int hour = c.get(Calendar.HOUR_OF_DAY);
int minute = c.get(Calendar.MINUTE);
int second = c.get(Calendar.SECOND);
return new String(year + "/" + month + "/" + date + " " + hour + ":" + minute + ":" + second);
}
//主動關閉連線
public void closeChannel(boolean readyToClose) throws InterruptedException {
if (readyToClose) {
System.out.println("即將關閉連線");
chc.channel().closeFuture();
chc.channel().close();
}
}
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
// TODO Auto-generated method stub
//儲存當前ChannelHandlerContext
chc = ctx;
}
@Override
protected void channelRead0(ChannelHandlerContext ctx, String in) throws Exception {
// TODO Auto-generated method stub
System.out.println(getTime() + " " + in);
}
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
// TODO Auto-generated method stub
System.out.println(getTime() + " 斷開連線");
ctx.channel().closeFuture();
ctx.channel().close();
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
// TODO Auto-generated method stub
cause.printStackTrace();
System.out.println("有異常");
ctx.channel().close();
}
}
Client類
簡單地引導客戶端
//Client類
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
public class Client {
public void start() throws IOException, InterruptedException {
Bootstrap b = new Bootstrap();
EventLoopGroup g = new NioEventLoopGroup();
//建立物件,後面呼叫closeChannel()主動關閉連線
ClientHandler cHandler = new ClientHandler();
b.group(g)
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel sc) throws Exception {
// TODO Auto-generated method stub
//新增編碼器、譯碼器
sc.pipeline().addLast(new StringDecoder());
sc.pipeline().addLast(new StringEncoder());
sc.pipeline().addLast(cHandler);
}
});
final ChannelFuture f = b.connect("127.0.0.1", 9990).sync();
f.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture arg0) throws Exception {
// TODO Auto-generated method stub
if (f.isSuccess()) {
System.out.println("連線伺服器成功");
} else {
System.out.println("連線伺服器失敗");
f.cause().printStackTrace();
}
}
});
Channel channel = f.channel();
/*
* 獲取控制檯輸入
* 當輸入了“再見”或“bye”時,停止輸入
* 主動關閉連線
*/
BufferedReader br = new BufferedReader(new InputStreamReader(System.in));
String in = br.readLine();
while (!(in.equals("再見") || in.equals("bye"))) {
channel.writeAndFlush(in);
in = br.readLine();
}
channel.writeAndFlush(in);
cHandler.closeChannel(true);
g.shutdownGracefully().sync();
}
public Client() {
// TODO Auto-generated constructor stub
}
public static void main(String[] args) throws Exception {
new Client().start();
}
}
實現過程中遇到的一些問題
關於控制檯輸入的問題
控制檯輸入使用如下程式碼
BufferedReader br = new BufferedReader(new InputStreamReader(System.in));
String in = br.readLine();
while (!(in.equals("再見") || in.equals("bye"))) {
channel.writeAndFlush(in);
in = br.readLine();
}
channel.writeAndFlush(in);
這段程式碼可以放在很多位置,不一定非要放在客戶端內,在ClientHandler類的channelActive()裡也可以加入這段程式碼。也就是說,在任何你想要通過控制檯輸入的地方都可以輸入。
ChannelHandlerContext的使用
為了實現訊息轉發,我維護了一個數組用來儲存每個連線的ChannelHandlerContext,這樣在讀取一個客戶端傳送的訊息時,我可以迅速找到另一個客戶端的ChannelHandlerContext並向其傳送剛剛讀取的資訊。
這個技巧是從《Netty實戰》6.3.2小節中學到的,在ChannelHandler中儲存一些ChannelHandlerContext的引用以供以後使用,這將很方便地管理不同的channel,更廣泛地說,在其他操作中對目標channel進行操作。這也要求ChannelHandler必須標註@Sharable,讓多個ChannelPipeline共享同一個ChannelHandler時不會出現異常。
不過為了避免外部呼叫,標記為private是個不錯的選擇。
客戶端發完訊息後自動斷開連線
這個問題發生的很突然,當我一步步實現了各個功能後,一測試發現客戶端傳送一條訊息後莫名其妙自動下線了,並且觸發了channelInactive()函式。為此我付出兩個小時的代價尋找問題的根源,這也是為什麼我加了那麼多控制檯輸出資訊和getTime()函式,就是為了找出在那裡出的問題。
最後在群裡以為大神的指點下,找到了問題所在。
在《Netty實戰》中的EchoServerHandler中,有這樣一段override:
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
// TODO Auto-generated method stub
ctx.writeAndFlush(Unpooled.EMPTY_BUFFER).addListener(ChannelFutureListener.CLOSE);
}
由於沒有理解透徹,我簡單地把這個函式也寫到了我的ServerHandler中,最後就是這裡出了問題。
在 ChannelInboundHandler 的方法中,channelReadComplete()方法是當Channel上的一個讀操作完成時被呼叫,而上面這行程式碼在最後使用了ChannelFutureListener.CLOSE,再看一下CLOSE的原碼:
ChannelFutureListener CLOSE = new ChannelFutureListener() {
public void operationComplete(ChannelFuture future) {
future.getChannel().close();
}
};
這樣就一目瞭然了,如果這樣重寫channelReadComplete(),那麼伺服器將會在讀操作完成時,關閉連線。
但是在獨自找問題的時候,我也搜尋到了關於斷線重連和心跳包這些物件,我也確實遇到了這個問題,出去辦件事後就掉線了。後面會關注這些並實現他們。
這裡也感謝那位大神,願意花時間幫我看程式碼、改程式碼、解決問題、講解原理,再次感謝!
連線數控制
這是我在測試客戶端時發現的問題。當連線三個客戶端時,由於伺服器比較簡單,所以轉發操作會出現問題,最終乾脆限制連線數,如果超過了2個,伺服器就主動關閉連線。當然這不是個好方法,我記得在一些遊戲中,如果當前連線人數太多會讓你等待排隊,而不是直接掉線,這樣太不友好了。這個功能在日後我會關注和實現它。
只有一個客戶端時,傳送訊息無響應
這個算是一個優化吧,當只有一個客戶端連線時,伺服器應該提醒對方不線上。
由此我聯想到QQ的做法,當一對一聊天時,即使對方不線上,資訊也會發送過去,當對方上線後,資訊會按時間順序展示給他。我想到大概是利用快取,將訊息暫時存起來,當channelActive()呼叫,就把訊息按順序傳送。這個日後實現。
還有一個就是聊天記錄,這個應該可以存到資料庫中,需要的時候通過關鍵字就可以查詢,這個也日後實現吧。
問題總結
已解決
伺服器
- 維護ctx陣列
- 最大連線數控制
- 主動關閉連線
- 轉發訊息
- 讀取客戶端資訊
客戶端
- 傳送訊息
- 讀訊息
- 主動關閉連線
- 傳送指定字串後關閉連線
待實現
- 斷線重連
- 心跳包保持線上
- 儲存聊天記錄
- 離線時儲存未接收訊息,上線即得
- 超過最大連線數,排隊等待