1. 程式人生 > >Netty 實現一對一客戶端聊天(由伺服器轉發)

Netty 實現一對一客戶端聊天(由伺服器轉發)

學習Netty有一個多星期,參考《Netty實戰》敲了Echo C/S 程式碼,也學習了channelHandler等元件。不滿足客戶端伺服器一對一聊天,所以尋思著自己實現一個客戶端和客戶端一對一聊天,訊息由伺服器轉發。
如果有程式碼寫的不合適的地方,還請評論指正。
《Netty實戰》電子書下載地址

主要思路

伺服器端

  1. 伺服器端維護一個ChannelHandlerContext型別的陣列,用來儲存與伺服器連線的各個channel的資訊。 (思路來源於《Netty 實戰》6.3.2)
  2. 最大連線數控制。設定一個變數為最大數,
  3. 伺服器主動關閉連線。當檢測到一個連線的客戶端掉線或異常,伺服器應主動關閉連線
  4. 伺服器轉發訊息。通過ChannelHandlerContext型別的陣列獲得與之相連的channel資訊,將伺服器讀到的訊息寫給另一個客戶端
  5. 譯碼器解碼器。使讀寫更加方便
  6. 為了便於除錯,還附加了一些包含獲取時間,伺服器顯示讀取的訊息等等函式

客戶端

  1. 傳送資訊
  2. 讀取資訊
  3. 主動關閉連線
  4. 輸入指定字串時下線。由於是控制檯程式,所以我設定輸入“bye”或“再見”時,呼叫相關函式關閉連線

實現程式碼

伺服器端

類結構

類名 說明
ServerHandler 繼承ChannelInboundHandlerAdapter介面
Server 引導伺服器端

實現程式碼

ServerHandler類

繼承ChannelInboundHandlerAdapter介面,重寫一些方法,新增一些私有變數來儲存和控制channel資訊

//ServerHandler類

import java.net.InetSocketAddress;
import java.util.Calendar;
import java.util.Vector;

import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandler.Sharable;
import io.netty.channel.ChannelHandlerContext;
import
io.netty.channel.ChannelInboundHandlerAdapter; @Sharable public class ServerHandler extends ChannelInboundHandlerAdapter { private static final int MAX_CONN = 2;//指定最大連線數 private int connectNum = 0;//當前連線數 //channelHandlerContext表 private Vector<ChannelHandlerContext> contexts = new Vector<>(2); //獲取當前時間 private String getTime() { Calendar c = Calendar.getInstance(); int year = c.get(Calendar.YEAR); int month = c.get(Calendar.MONTH); int date = c.get(Calendar.DATE); int hour = c.get(Calendar.HOUR_OF_DAY); int minute = c.get(Calendar.MINUTE); int second = c.get(Calendar.SECOND); return new String(year + "/" + month + "/" + date + " " + hour + ":" + minute + ":" + second); } /* * 重寫channelActive()方法 * 更新當前連線數 * 控制連線客戶端的個數,超過則關閉該channel * 更新contexts陣列 */ @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { // TODO Auto-generated method stub connectNum++; //控制客戶端連線數量,超過則關閉 if (connectNum > MAX_CONN) { ctx.writeAndFlush(Unpooled.copiedBuffer("達到人數上限".getBytes())); ctx.channel().close(); //當前連線數的更新放在channelInactive()裡 } //更新contexts contexts.add(ctx); //控制檯輸出相關資訊 InetSocketAddress socket = (InetSocketAddress) ctx.channel().remoteAddress(); System.out.println(socket.getAddress().getHostAddress() + ":" + socket.getPort() + "已連線"); System.out.println("當前連線數:" + connectNum); ctx.writeAndFlush("hello client"); } /* * 重寫channelInactive()方法 * 更新當前連線數 * 更新contexts陣列 * 控制檯輸出相關資訊 */ @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception { // TODO Auto-generated method stub //更新當前連線數 connectNum--; //更新contexts陣列 contexts.remove(ctx); //控制檯輸出相關資訊 InetSocketAddress socket = (InetSocketAddress) ctx.channel().remoteAddress(); System.out.println(getTime() + ' ' + socket.getAddress().getHostAddress() + ":" + socket.getPort() + "已退出"); System.out.println("當前連線數:" + connectNum); //對另一個客戶端發出通知 if (contexts.size() == 1) { contexts.get(0).writeAndFlush("對方退出聊天"); } } /* * 重寫channelRead()函式 * 讀取資料 * 轉發訊息 */ @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { // TODO Auto-generated method stub String in = (String) msg; System.out.println(getTime() + " 客戶端" + ctx.channel().remoteAddress() + ":" + in); //當只有一方線上時,傳送通知 if (contexts.size() < 2) { ctx.writeAndFlush("對方不線上"); return; } //獲取另一個channelhandlercontxt的下表 int currentIndex = contexts.indexOf(ctx); int anotherIndex = Math.abs(currentIndex - 1); //給另一個客戶端轉發資訊 contexts.get(anotherIndex).writeAndFlush(in); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { // TODO Auto-generated method stub if (!ctx.channel().isActive()) { System.out.println(ctx.channel().remoteAddress() + "客戶端異常"); } cause.printStackTrace(); ctx.close(); } }

Server類

用的最常規最簡單的寫法,和EchoServer差不多,等以後進一步學習再豐富內容

//Server類

import java.net.InetSocketAddress;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;

public class Server {

    public static void main(String[] args) throws InterruptedException {
        new Server().start();
    }

    public Server() {
        // TODO Auto-generated constructor stub
    }
    // 引導類
    public void start() throws InterruptedException {
        ServerHandler sHandler = new ServerHandler();
        InetSocketAddress localSocket = new InetSocketAddress("127.0.0.1", 9990);
        ServerBootstrap b = new ServerBootstrap();
        b.group(newNioEventLoopGroup())
        .localAddress(localSocket)
        .childHandler(new ChannelInitializer<SocketChannel>() {

                    @Override
                    protected void initChannel(SocketChannel ch) throws Exception {
                        //新增譯碼器解碼器
                        ch.pipeline().addLast(new StringDecoder());
                        ch.pipeline().addLast(new StringEncoder());
                        ch.pipeline().addLast(sHandler);
                    }

                });
        final ChannelFuture f = b.bind().sync();
        f.addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture f) throws Exception {
                // TODO Auto-generated method stub
                if (f.isSuccess()) {
                    System.out.println("伺服器開啟成功");
                } else {
                    System.out.println("伺服器開啟失敗");
                    f.cause().printStackTrace();
                }
            }
        });
    }
}

客戶端

類結構

類名 說明
ClientHandler 繼承SimpleChannelInboundHandler < String > 介面
Client 引導客戶端

實現程式碼

ClientHandler類

繼承SimpleChannelInboundHandler< String >介面,重寫一些方法,新增私有closeChannel()來主動關閉連線


import java.util.Calendar;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;

//因為加入了String型別的編碼器和譯碼器,所以介面例項化為String型別
public class clientHandler extends SimpleChannelInboundHandler<String> {

    //儲存當前ChannelHandlerContext,在後面的closeChannel()中使用
    private ChannelHandlerContext chc = null;

    //獲取當前時間
    private String getTime() {
        Calendar c = Calendar.getInstance();
        int year = c.get(Calendar.YEAR);
        int month = c.get(Calendar.MONTH);
        int date = c.get(Calendar.DATE);
        int hour = c.get(Calendar.HOUR_OF_DAY);
        int minute = c.get(Calendar.MINUTE);
        int second = c.get(Calendar.SECOND);
        return new String(year + "/" + month + "/" + date + " " + hour + ":" + minute + ":" + second);
    }

    //主動關閉連線
    public void closeChannel(boolean readyToClose) throws InterruptedException {
        if (readyToClose) {
            System.out.println("即將關閉連線");
            chc.channel().closeFuture();
            chc.channel().close();
        }
    }

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        // TODO Auto-generated method stub
        //儲存當前ChannelHandlerContext
        chc = ctx;
    }

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, String in) throws Exception {
        // TODO Auto-generated method stub
        System.out.println(getTime() + " " + in);
    }

    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        // TODO Auto-generated method stub
        System.out.println(getTime() + " 斷開連線");
        ctx.channel().closeFuture();
        ctx.channel().close();
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        // TODO Auto-generated method stub
        cause.printStackTrace();
        System.out.println("有異常");
        ctx.channel().close();
    }

}

Client類

簡單地引導客戶端

//Client類
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;

public class Client {

    public void start() throws IOException, InterruptedException {
        Bootstrap b = new Bootstrap();
        EventLoopGroup g = new NioEventLoopGroup();
        //建立物件,後面呼叫closeChannel()主動關閉連線
        ClientHandler cHandler = new ClientHandler();

        b.group(g)
        .channel(NioSocketChannel.class)
        .handler(new ChannelInitializer<SocketChannel>() {
            @Override
            protected void initChannel(SocketChannel sc) throws Exception {
                // TODO Auto-generated method stub
                //新增編碼器、譯碼器
                sc.pipeline().addLast(new StringDecoder());
                sc.pipeline().addLast(new StringEncoder());
                sc.pipeline().addLast(cHandler);
            }
        });
        final ChannelFuture f = b.connect("127.0.0.1", 9990).sync();
        f.addListener(new ChannelFutureListener() {

            @Override
            public void operationComplete(ChannelFuture arg0) throws Exception {
                // TODO Auto-generated method stub
                if (f.isSuccess()) {
                    System.out.println("連線伺服器成功");

                } else {
                    System.out.println("連線伺服器失敗");
                    f.cause().printStackTrace();
                }
            }
        });
        Channel channel = f.channel();
        /*
         * 獲取控制檯輸入
         * 當輸入了“再見”或“bye”時,停止輸入
         * 主動關閉連線
         */
        BufferedReader br = new BufferedReader(new InputStreamReader(System.in));
        String in = br.readLine();
        while (!(in.equals("再見") || in.equals("bye"))) {
            channel.writeAndFlush(in);
            in = br.readLine();
        }
        channel.writeAndFlush(in);
        cHandler.closeChannel(true);
        g.shutdownGracefully().sync();

    }

    public Client() {
        // TODO Auto-generated constructor stub
    }

    public static void main(String[] args) throws Exception {
        new Client().start();
    }
}

實現過程中遇到的一些問題

關於控制檯輸入的問題

控制檯輸入使用如下程式碼

BufferedReader br = new BufferedReader(new InputStreamReader(System.in));
        String in = br.readLine();
        while (!(in.equals("再見") || in.equals("bye"))) {
            channel.writeAndFlush(in);
            in = br.readLine();
        }
        channel.writeAndFlush(in);

這段程式碼可以放在很多位置,不一定非要放在客戶端內,在ClientHandler類的channelActive()裡也可以加入這段程式碼。也就是說,在任何你想要通過控制檯輸入的地方都可以輸入。

ChannelHandlerContext的使用

為了實現訊息轉發,我維護了一個數組用來儲存每個連線的ChannelHandlerContext,這樣在讀取一個客戶端傳送的訊息時,我可以迅速找到另一個客戶端的ChannelHandlerContext並向其傳送剛剛讀取的資訊。

這個技巧是從《Netty實戰》6.3.2小節中學到的,在ChannelHandler中儲存一些ChannelHandlerContext的引用以供以後使用,這將很方便地管理不同的channel,更廣泛地說,在其他操作中對目標channel進行操作。這也要求ChannelHandler必須標註@Sharable,讓多個ChannelPipeline共享同一個ChannelHandler時不會出現異常。

不過為了避免外部呼叫,標記為private是個不錯的選擇。

客戶端發完訊息後自動斷開連線

這個問題發生的很突然,當我一步步實現了各個功能後,一測試發現客戶端傳送一條訊息後莫名其妙自動下線了,並且觸發了channelInactive()函式。為此我付出兩個小時的代價尋找問題的根源,這也是為什麼我加了那麼多控制檯輸出資訊和getTime()函式,就是為了找出在那裡出的問題。
最後在群裡以為大神的指點下,找到了問題所在。
在《Netty實戰》中的EchoServerHandler中,有這樣一段override:

@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
    // TODO Auto-generated method stub
    ctx.writeAndFlush(Unpooled.EMPTY_BUFFER).addListener(ChannelFutureListener.CLOSE);
}

由於沒有理解透徹,我簡單地把這個函式也寫到了我的ServerHandler中,最後就是這裡出了問題。
在 ChannelInboundHandler 的方法中,channelReadComplete()方法是當Channel上的一個讀操作完成時被呼叫,而上面這行程式碼在最後使用了ChannelFutureListener.CLOSE,再看一下CLOSE的原碼:

ChannelFutureListener CLOSE = new ChannelFutureListener() {
        public void operationComplete(ChannelFuture future) {
            future.getChannel().close();
        }
    };

這樣就一目瞭然了,如果這樣重寫channelReadComplete(),那麼伺服器將會在讀操作完成時,關閉連線。
但是在獨自找問題的時候,我也搜尋到了關於斷線重連心跳包這些物件,我也確實遇到了這個問題,出去辦件事後就掉線了。後面會關注這些並實現他們。
這裡也感謝那位大神,願意花時間幫我看程式碼、改程式碼、解決問題、講解原理,再次感謝!

連線數控制

這是我在測試客戶端時發現的問題。當連線三個客戶端時,由於伺服器比較簡單,所以轉發操作會出現問題,最終乾脆限制連線數,如果超過了2個,伺服器就主動關閉連線。當然這不是個好方法,我記得在一些遊戲中,如果當前連線人數太多會讓你等待排隊,而不是直接掉線,這樣太不友好了。這個功能在日後我會關注和實現它。

只有一個客戶端時,傳送訊息無響應

這個算是一個優化吧,當只有一個客戶端連線時,伺服器應該提醒對方不線上。
由此我聯想到QQ的做法,當一對一聊天時,即使對方不線上,資訊也會發送過去,當對方上線後,資訊會按時間順序展示給他。我想到大概是利用快取,將訊息暫時存起來,當channelActive()呼叫,就把訊息按順序傳送。這個日後實現。
還有一個就是聊天記錄,這個應該可以存到資料庫中,需要的時候通過關鍵字就可以查詢,這個也日後實現吧。

問題總結

已解決

伺服器

  1. 維護ctx陣列
  2. 最大連線數控制
  3. 主動關閉連線
  4. 轉發訊息
  5. 讀取客戶端資訊

客戶端

  1. 傳送訊息
  2. 讀訊息
  3. 主動關閉連線
  4. 傳送指定字串後關閉連線

待實現

  1. 斷線重連
  2. 心跳包保持線上
  3. 儲存聊天記錄
  4. 離線時儲存未接收訊息,上線即得
  5. 超過最大連線數,排隊等待