1. 程式人生 > >基於AIO的CS聊天室

基於AIO的CS聊天室

所謂 AIO(Asynchronous I/O),即非同步 IO:應用程式發起 IO 請求後立即返回,不會阻塞等待;實際的 IO 操作交由作業系統完成。應用程式事先註冊一個監聽器(類似於訊號處理函式),當系統完成 IO 操作時會回呼該監聽器,由它執行相應的後續處理。

監聽器一般透過實作 CompletionHandler 介面來提供:操作成功時會呼叫其 completed() 方法,失敗時會呼叫 failed() 方法。

伺服器端程式碼: 

package com.nanhao.AIOTest;

import java.io.Closeable;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.*;
import java.nio.channels.AsynchronousChannelGroup;
import java.nio.channels.AsynchronousServerSocketChannel;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/**
 * Chat-room server built on NIO.2 asynchronous channels (AIO).
 *
 * <p>The server listens on {@code 127.0.0.1:}{@link #PORT}. Every accepted
 * connection is added to {@link #channelList}; whatever one client sends is
 * decoded as UTF-8 and written back to every connected client, the sender
 * included.
 *
 * <p>Completion handlers run on the worker pool created in
 * {@link #startListen()}, so all state shared between connections is kept in
 * a thread-safe collection and each connection reads into its own buffer.
 */
public class Server {

    /** TCP port the server listens on. */
    static final int PORT = 3000;
    /** Name of the charset used on the wire; kept for code that still refers to it. */
    final static String UTF_8 = "utf-8";
    /**
     * Channels of all currently connected clients. It is a copy-on-write list
     * because it is modified from accept/read handlers while other handlers
     * iterate over it to broadcast.
     */
    static List<AsynchronousSocketChannel> channelList = new CopyOnWriteArrayList<>();

    /** Size in bytes of each connection's private read buffer. */
    private static final int BUFFER_SIZE = 1024;
    /** Number of threads in the pool backing the channel group. */
    private static final int WORKER_THREADS = 20;

    /** Group created by the last successful {@link #startListen()} call; {@code null} before that. */
    private AsynchronousChannelGroup channelGroup;

    /**
     * Creates the worker pool and channel group, binds the server socket and
     * registers the first asynchronous accept. Returns as soon as the accept
     * has been registered; it does not wait for clients.
     *
     * @throws IOException if the channel group or server channel cannot be
     *                     opened, or the address cannot be bound
     * @throws Exception   declared for backward compatibility with existing callers
     */
    public void startListen() throws IOException, Exception {
        // Create the thread pool that backs the channel group.
        ExecutorService executorService = Executors.newFixedThreadPool(WORKER_THREADS);
        AsynchronousServerSocketChannel serverSocketChannel = null;
        try {
            // Create an AsynchronousChannelGroup on top of that pool.
            AsynchronousChannelGroup group = AsynchronousChannelGroup.withThreadPool(executorService);
            // Open the server channel inside the group and bind it to the local port.
            serverSocketChannel = AsynchronousServerSocketChannel.open(group);
            serverSocketChannel.bind(new InetSocketAddress("127.0.0.1", PORT));
            // Accept connections asynchronously through a completion handler.
            serverSocketChannel.accept(null, new AcceptHandler(serverSocketChannel));
            this.channelGroup = group;
        } catch (IOException | RuntimeException e) {
            // Start-up failed: release what was already acquired before rethrowing.
            closeQuietly(serverSocketChannel);
            executorService.shutdownNow();
            throw e;
        }
    }

    /**
     * Starts the server and then blocks until its channel group terminates.
     *
     * @param args ignored
     * @throws Exception if the server cannot be started or the wait is interrupted
     */
    public static void main(String[] args) throws Exception {
        Server server = new Server();
        server.startListen();
        // startListen() only registers the accept and returns. Block here so the
        // process stays up no matter which threads the group has started so far.
        // NOTE(review): whether the JVM would otherwise exit depends on the JDK's
        // internal threading - confirm on the target runtime.
        server.channelGroup.awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS);
    }

    /**
     * Sends {@code message}, encoded as UTF-8, to every channel in
     * {@link #channelList}. A channel whose write fails is removed and closed.
     */
    private static void broadcast(String message) {
        // Encode once; every target gets its own ByteBuffer view of the same bytes.
        byte[] payload = message.getBytes(StandardCharsets.UTF_8);
        for (AsynchronousSocketChannel target : channelList) {
            ByteBuffer outgoing = ByteBuffer.wrap(payload);
            try {
                // A channel allows only one outstanding write, so serialize the
                // writers per target to avoid WritePendingException.
                synchronized (target) {
                    // One write may transfer only part of the buffer.
                    while (outgoing.hasRemaining()) {
                        target.write(outgoing).get();
                    }
                }
            } catch (InterruptedException e) {
                // Preserve the interrupt for the pool and stop broadcasting.
                Thread.currentThread().interrupt();
                return;
            } catch (Exception e) {
                e.printStackTrace();
                disconnect(target);
            }
        }
    }

    /** Removes {@code channel} from {@link #channelList} and closes it. */
    private static void disconnect(AsynchronousSocketChannel channel) {
        channelList.remove(channel);
        closeQuietly(channel);
    }

    /** Closes {@code resource} if it is non-null, ignoring any {@link IOException}. */
    private static void closeQuietly(Closeable resource) {
        if (resource == null) {
            return;
        }
        try {
            resource.close();
        } catch (IOException ignored) {
            // Best-effort cleanup: there is nothing useful to do if close fails.
        }
    }

    /**
     * Handles completed accepts: registers the new client, re-arms the accept
     * and starts reading from the client.
     */
    class AcceptHandler implements CompletionHandler<AsynchronousSocketChannel, Object> {

        private AsynchronousServerSocketChannel serverSocketChannel;

        /**
         * @param serverSocketChannel the listening channel on which the next
         *                            accept is registered after each connection
         */
        public AcceptHandler(AsynchronousServerSocketChannel serverSocketChannel) {
            this.serverSocketChannel = serverSocketChannel;
        }

        /**
         * Called when a client has connected.
         *
         * @param result     the channel of the newly connected client
         * @param attachment unused (always {@code null})
         */
        @Override
        public void completed(final AsynchronousSocketChannel result, Object attachment) {
            Server.channelList.add(result);
            // Re-arm the accept so the next client can connect.
            serverSocketChannel.accept(null, this);
            // This handler instance serves every connection, so the read buffer
            // must be allocated per client rather than kept as a field here.
            ByteBuffer buffer = ByteBuffer.allocate(BUFFER_SIZE);
            result.read(buffer, null, new ReadHandler(result, buffer));
        }

        /**
         * Called when an accept fails; the failure is only reported.
         *
         * @param exc        the cause of the failure
         * @param attachment unused (always {@code null})
         */
        @Override
        public void failed(Throwable exc, Object attachment) {
            System.out.println("連線失敗 :" + exc);
        }
    }

    /**
     * Handles completed reads for one client: broadcasts what was received and
     * issues the next read, or drops the client at end-of-stream or on failure.
     */
    private static final class ReadHandler implements CompletionHandler<Integer, Object> {

        private final AsynchronousSocketChannel client;
        private final ByteBuffer buffer;

        ReadHandler(AsynchronousSocketChannel client, ByteBuffer buffer) {
            this.client = client;
            this.buffer = buffer;
        }

        /**
         * @param bytesRead  number of bytes read, or -1 at end-of-stream
         * @param attachment unused (always {@code null})
         */
        @Override
        public void completed(Integer bytesRead, Object attachment) {
            if (bytesRead < 0) {
                // The peer closed the connection; without this check the handler
                // would keep re-reading end-of-stream forever.
                disconnect(client);
                return;
            }
            buffer.flip();
            // Convert the received bytes to text.
            // NOTE: a multi-byte character split across two reads is not
            // reassembled; each fragment is decoded on its own.
            String context = StandardCharsets.UTF_8.decode(buffer).toString();
            buffer.clear();
            // This is a chat room, so the message goes to every connected channel.
            broadcast(context);
            client.read(buffer, null, this);
        }

        /**
         * @param exc        the cause of the failure
         * @param attachment unused (always {@code null})
         */
        @Override
        public void failed(Throwable exc, Object attachment) {
            System.out.println("讀取資料失敗: " + exc);
            disconnect(client);
        }
    }
}