1. 程式人生 > >Netty4(三)多連線客戶端設計與實現

Netty4(三)多連線客戶端設計與實現

本文介紹多連線的netty客戶端設計

目標

Netty(二)一文中實現了單連線客戶端,也就是說客戶端只有一個連線,這就不利於高併發RPC的設計,本文嘗試設計一個多連線的客戶端,支援斷線重連

UML類圖

mutilClient.png

實現

多連線客戶端

package com.mym.netty.client;


import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import
io.netty.channel.ChannelInitializer; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.nio.NioSocketChannel; import io.netty.handler.codec.string.StringDecoder; import io.netty.handler.codec.string.StringEncoder; import java.util.ArrayList; import
java.util.List; import java.util.concurrent.atomic.AtomicInteger; /** * 多連線客戶端 */ public class MutilClient { /**服務類*/ private Bootstrap bootstrap = new Bootstrap(); /**會話集合*/ private List<Channel> channels = new ArrayList<Channel>(); /**引用計數*/ private final AtomicInteger index = new
AtomicInteger(); /**初始化*/ public void init(int count){ //worker EventLoopGroup worker = new NioEventLoopGroup(); //設定工作執行緒 this.bootstrap.group(worker); //初始化channel bootstrap.channel(NioSocketChannel.class); //設定handler管道 bootstrap.handler(new ChannelInitializer<Channel>() { @Override protected void initChannel(Channel channel) throws Exception { channel.pipeline().addLast(new StringDecoder()); channel.pipeline().addLast(new StringEncoder()); channel.pipeline().addLast(new ClientHandler()); } }); //根據連線數建立連線 for(int i = 0;i < count;i++){ ChannelFuture channelFuture = bootstrap.connect("0.0.0.0",9099); channels.add(channelFuture.channel()); } } /**獲取channel(會話)*/ public Channel nextChannel(){ return getFirstActiveChannel(0); } private Channel getFirstActiveChannel(int count) { Channel channel = channels.get(Math.abs(index.getAndIncrement() % channels.size())); if(!channel.isActive()){ //重連 reconect(channel); if(count > channels.size()){ throw new RuntimeException("no Idle channel!"); } return getFirstActiveChannel(count + 1); } return channel; } /**重連*/ private void reconect(Channel channel) { //此處可改為原子操作 synchronized(channel){ if(channels.indexOf(channel) == -1){ return ; } Channel newChannel = bootstrap.connect("0.0.0.0", 9099).channel(); channels.set(channels.indexOf(channel), newChannel); System.out.println(channels.indexOf(channel) + "位置的channel成功進行重連!"); } } }

本類採用物件組的方式儲存連線。因為一個thread + 佇列 == 一個單執行緒執行緒池 是執行緒安全的,任務是線性序列執行的

客戶端handler

package com.mym.netty.client;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;

public class ClientHandler extends ChannelInboundHandlerAdapter {
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        System.out.println("client receive msg:"+msg.toString());
    }
}

測試類

package com.mym.netty.client;

import java.io.BufferedReader;
import java.io.InputStreamReader;

public class StartClient {

    public static void main(String[] args) {
        mutilClient();
    }

    public static void mutilClient(){
        MutilClient client = new MutilClient();
        client.init(5);

        BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(System.in));
        while(true){
            try {
                System.out.println("請輸入:");
                String msg = bufferedReader.readLine();
                client.nextChannel().writeAndFlush(msg);
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }

}

測試

測試步驟:連線服務端後,客戶端先向服務端傳送訊息,客戶端進行斷網,然後開網,然後再想服務端傳送訊息

客戶端輸出如下:

請輸入:
nihao
client receive msg:this is ServerHandler reply msg happend at !1531894695900this is ServerHandler2 reply msg happend at !1531894695902

此處斷網,一大堆錯。然後重新開網,再次傳送訊息
請輸入:
hello
-1位置的channel成功進行重連!
client receive msg:this is ServerHandler reply msg happend at !1531894961093
client receive msg:this is ServerHandler2 reply msg happend at !1531894961094

本次實現仍有可優化的地方,歡迎留言給出建議