1. 程式人生 > >基於Apache Mina實現的TCP長連線和短連線例項

基於Apache Mina實現的TCP長連線和短連線例項

1、前言

Apache MINA是Apache組織的一個優秀的專案。MINA是Multipurpose Infrastructure for NetworkApplications的縮寫。它是一個網路應用程式框架,用來幫助使用者非常方便地開發高效能和高可靠性的網路應用程式。在本文中介紹瞭如何通過Apache Mina2.0來實現TCP協議長連線和短連線應用。

2、系統介紹

2.1系統框架

整個系統由兩個服務端程式和兩個客戶端程式組成。分別實現TCP長連線和短連線通訊。

系統業務邏輯是一個客戶端與服務端建立長連線,一個客戶端與服務端建立短連線。資料從短連線客戶端經過服務端傳送到長連線客戶端,並從長連線客戶端接收響應資料。當收到響應資料後斷開連線。

系統架構圖如下:


2.2處理流程

系統處理流程如下:

1)       啟動服務端程式,監聽8001和8002埠。

2)       長連線客戶端向服務端8002埠建立連線,服務端將連線物件儲存到共享記憶體中。由於採用長連線方式,連線物件是唯一的。

3)       短連線客戶端向服務端8001埠建立連線。建立連線後建立一個連線物件。

4)       短連線客戶端連線成功後傳送資料。服務端接收到資料後從共享記憶體中得到長連線方式的連線物件,使用此物件向長連線客戶端傳送資料。傳送前將短連線物件設為長連線物件的屬性值。

5)       長連線客戶端接收到資料後返回響應資料。服務端從長連線物件的屬性中取得短連線物件,通過此物件將響應資料傳送給短連線客戶端。

6)       短連線客戶端收到響應資料後,關閉連線。

3、服務端程式

3.1長連線服務端

服務啟動

public class MinaLongConnServer {

private static final int PORT = 8002;

    public void start()throws IOException{

       IoAcceptor acceptor = new NioSocketAcceptor();

       acceptor.getFilterChain().addLast("logger", new LoggingFilter());

       acceptor.getFilterChain().addLast("codec", new

ProtocolCodecFilter(new TextLineCodecFactory(Charset.forName("UTF-8"))));

       acceptor.setHandler(new MinaLongConnServerHandler());

       acceptor.getSessionConfig().setReadBufferSize(2048);

       acceptor.bind(new InetSocketAddress(PORT));

       System.out.println("Listeningon port " + PORT);

    }

}

訊息處理

public class MinaLongConnServerHandler extends IoHandlerAdapter {

    private final Logger logger = (Logger) LoggerFactory.getLogger(getClass());

    @Override

    public void sessionOpened(IoSession session) {

       InetSocketAddress remoteAddress = (InetSocketAddress)session.getRemoteAddress();

       String clientIp = remoteAddress.getAddress().getHostAddress();

       logger.info("LongConnect Server opened Session ID ="+String.valueOf(session.getId()));

       logger.info("接收來自客戶端 :" + clientIp + "的連線.");

       Initialization init = Initialization.getInstance();

       HashMap<String, IoSession> clientMap =init.getClientMap();

       clientMap.put(clientIp, session);

    }

    @Override

    public void messageReceived(IoSession session, Object message) {

       logger.info("Messagereceived in the long connect server..");

       String expression = message.toString();

       logger.info("Message is:" + expression);

       IoSession shortConnSession =(IoSession) session.getAttribute("shortConnSession");

       logger.info("ShortConnect Server Session ID ="+String.valueOf(shortConnSession.getId()));

       shortConnSession.write(expression);

    }

    @Override

    public void sessionIdle(IoSession session, IdleStatus status) {

       logger.info("Disconnectingthe idle.");

       // disconnect an idle client

       session.close(true);

    }

    @Override

    public void exceptionCaught(IoSession session, Throwable cause) {

       // close the connection onexceptional situation

       logger.warn(cause.getMessage(), cause);

       session.close(true);

    }

}

3.2短連線服務端

服務啟動

public class MinaShortConnServer {

    private static final int PORT = 8001;

    public void start()throws IOException{

       IoAcceptor acceptor = new NioSocketAcceptor();

       acceptor.getFilterChain().addLast("logger", new LoggingFilter());

       acceptor.getFilterChain().addLast("codec", newProtocolCodecFilter(new TextLineCodecFactory(Charset.forName("UTF-8"))));

       acceptor.setHandler(new MinaShortConnServerHandler());

       acceptor.getSessionConfig().setReadBufferSize(2048);

       acceptor.getSessionConfig().setIdleTime(IdleStatus.BOTH_IDLE, 3);

       acceptor.bind(new InetSocketAddress(PORT));

       System.out.println("Listeningon port " + PORT);

    }

}

訊息處理

public class MinaShortConnServerHandler extends IoHandlerAdapter {

    private final Logger logger = (Logger) LoggerFactory.getLogger(getClass());

    @Override

    public void sessionOpened(IoSession session) {

       InetSocketAddress remoteAddress = (InetSocketAddress)session.getRemoteAddress();

       logger.info(remoteAddress.getAddress().getHostAddress());

       logger.info(String.valueOf(session.getId()));

    }

    @Override

    public void messageReceived(IoSession session, Object message) {

       logger.info("Messagereceived in the short connect server...");

       String expression = message.toString();

       Initialization init = Initialization.getInstance();

       HashMap<String, IoSession> clientMap =init.getClientMap();

       if (clientMap == null || clientMap.size() == 0) {

           session.write("error");

       } else {

           IoSession longConnSession = null;

           Iterator<String> iterator =clientMap.keySet().iterator();

           String key = "";

           while (iterator.hasNext()) {

              key = iterator.next();

              longConnSession = clientMap.get(key);

           }

           logger.info("ShortConnect Server Session ID :"+String.valueOf(session.getId()));

           logger.info("LongConnect Server Session ID :"+String.valueOf(longConnSession.getId()));

           longConnSession.setAttribute("shortConnSession",session);

           longConnSession.write(expression);

       }

    }

    @Override

    public void sessionIdle(IoSession session, IdleStatus status) {

       logger.info("Disconnectingthe idle.");

       // disconnect an idle client

       session.close(true);

    }

    @Override

    public void exceptionCaught(IoSession session, Throwable cause) {

       // close the connection onexceptional situation

       logger.warn(cause.getMessage(), cause);

       session.close(true);

    }

}

4、客戶端程式

4.1長連線客戶端

使用java.net.Socket來實現向服務端建立連線。Socket建立後一直保持連線,從服務端接收到資料包後直接將原文返回。

public class TcpKeepAliveClient {

    private String ip;

    private int port;

    private static Socket socket = null;

    private static int timeout = 50 * 1000;

    public TcpKeepAliveClient(String ip, int port) {

       this.ip = ip;

       this.port = port;

    }

    public void receiveAndSend() throws IOException {

       InputStream input = null;

       OutputStream output = null;

       try {

           if (socket == null ||socket.isClosed() || !socket.isConnected()) {

              socket = new Socket();

              InetSocketAddress addr = new InetSocketAddress(ip, port);

              socket.connect(addr, timeout);

              socket.setSoTimeout(timeout);

              System.out.println("TcpKeepAliveClientnew ");

           }

           input = socket.getInputStream();

           output = socket.getOutputStream();

           // read body

           byte[] receiveBytes = {};// 收到的包位元組陣列

           while (true) {

              if (input.available() > 0) {

                  receiveBytes = new byte[input.available()];

                  input.read(receiveBytes);

                  // send

                  System.out.println("TcpKeepAliveClientsend date :" +new String(receiveBytes));

                  output.write(receiveBytes, 0, receiveBytes.length);

                  output.flush();

              }

           }

       } catch (Exception e) {

           e.printStackTrace();

           System.out.println("TcpClientnew socket error");

       }

    }

    public static void main(String[] args) throws Exception {

       TcpKeepAliveClient client = new TcpKeepAliveClient("127.0.0.1", 8002);

       client.receiveAndSend();

    }

}

4.2短連線客戶端

服務啟動

public class MinaShortClient {

    private static final int PORT = 8001;

    public static void main(String[] args) throws IOException,InterruptedException {

       IoConnector connector = new NioSocketConnector();

       connector.getSessionConfig().setReadBufferSize(2048);

       connector.getFilterChain().addLast("logger", new LoggingFilter());

       connector.getFilterChain().addLast("codec", newProtocolCodecFilter(new TextLineCodecFactory(Charset.forName("UTF-8"))));

       connector.setHandler(new MinaShortClientHandler());

       for (int i = 1; i <= 10; i++) {

           ConnectFuture future = connector.connect(new InetSocketAddress("127.0.0.1",PORT));

           future.awaitUninterruptibly();

           IoSession session =future.getSession();

           session.write(i);

           session.getCloseFuture().awaitUninterruptibly();

           System.out.println("result=" + session.getAttribute("result"));

       }

       connector.dispose();

    }

}

訊息處理

public class MinaShortClientHandler extends IoHandlerAdapter{

    private final Logger logger = (Logger) LoggerFactory.getLogger(getClass());

    public MinaShortClientHandler() {

    }

    @Override

    public void sessionOpened(IoSession session) {

    }

    @Override

    public void messageReceived(IoSession session, Object message) {

       logger.info("Messagereceived in the client..");

       logger.info("Message is:" + message.toString());

       session.setAttribute("result", message.toString());

       session.close(true);

    }

    @Override

    public void exceptionCaught(IoSession session, Throwable cause) {

       session.close(true);

    }

}

5、總結

通過本文中的例子,Apache Mina在服務端可實現TCP協議長連線和短連線。在客戶端只實現了短連線模式,長連線模式也是可以實現的(在本文中還是採用傳統的java Socket方式)。兩個服務端之間通過共享記憶體的方式來傳遞連線物件也許有更好的實現方式。