1. 程式人生 > >Java nio 客戶端連線Server

Java nio 客戶端連線Server

在做通訊系統的開發過程中,經常需要使用Socket通訊。java新的io機制給我提供了一個很好的非同步socket通訊方式,這段時間用java寫了一個客戶端用來連線server。發現執行效率還比較讓人滿意。下面是我實現的部分功能。

連線伺服器的socket,多執行緒啟動。如果連線失敗就重連。

Java程式碼  收藏程式碼
  1. public class CommonSocket extends Thread {  
  2.     private SocketChannel socketChannel;  
  3.     private boolean stop = false;  
  4.     private int
     port = 0;  
  5.     private String ip = "";  
  6.     private Selector selector = null;  
  7.     private SocketAddress socketAddress = null;  
  8.     private Logger logger = Logger.getLogger(CommonSocket.class);  
  9.     public CommonSocket() {  
  10.         this.ip = SocketInfoUtils.TCP_IP;  
  11.         this.port = SocketInfoUtils.TCP_PORT;  
  12.     }  
  13.     public void run() {  
  14.         while (!stop) {  
  15.             socketConnet();  
  16.             try {  
  17.                 sleep(5000);  
  18.             } catch (InterruptedException e) {  
  19.                 logger.error("SocketConnect run error: InterruptedException");  
  20.             }  
  21.         }  
  22.     }  
  23.     public void socketBuilder() {  
  24.         try {  
  25.             selector = Selector.open();  
  26.         } catch (IOException e) {  
  27.             e.printStackTrace();  
  28.             logger.error("Open to selector failed: IOException");  
  29.         }  
  30.     }  
  31.     private void openSocketChannel() {  
  32.         try {  
  33.             socketAddress = new InetSocketAddress(ip, port);  
  34.             socketChannel = SocketChannel.open();  
  35.             socketChannel.socket().setReuseAddress(true);  
  36.             socketChannel.connect(socketAddress);  
  37.         } catch (ClosedChannelException e) {  
  38.             logger.warn("Channel is closed: ClosedChannelException");  
  39.         } catch (IOException e) {  
  40.             logger  
  41.                     .warn("Connet is failed or time out,the system will automatically re-connected : IOException");  
  42.         }  
  43.     }  
  44.     /** 
  45.      * do ClientBuilder if socket conncte success 
  46.      */  
  47.     public void socketConnet() {  
  48.         try {  
  49.             openSocketChannel();  
  50.             if (socketChannel.isOpen()) {  
  51.                 this.stop = true;  
  52.                 socketBuilder();  
  53.                 socketChannel.configureBlocking(false);  
  54.                 socketChannel.register(selector, SelectionKey.OP_READ  
  55.                         | SelectionKey.OP_WRITE);  
  56.                 PackageBuilder clientBuilder = new PackageBuilder(socketChannel,  
  57.                         selector);  
  58.                 clientBuilder.start();  
  59.                 logger.info("Has been successfully connected to " + ip  
  60.                         + "and port:    " + port);  
  61.             } else {  
  62.                 socketChannel.close();  
  63.             }  
  64.         } catch (ClosedChannelException e) {  
  65.             logger.warn("Channel is closed: ClosedChannelException");  
  66.         } catch (IOException e) {  
  67.             logger  
  68.                     .warn("Connet is failed or time out,the system will automatically re-connected : IOException");  
  69.         }  
  70.     }  
  71. }  

 傳送和接收事件處理,NIO是基於事件的驅動模型,這個類就是專門處理收發的。

Java程式碼  收藏程式碼
  1. public class PackageBuilder  extends Thread{  
  2.     private SocketChannel socketChannel = null;  
  3.     private Selector selector = null;  
  4.     private boolean stop = false;  
  5.     private byte[] array = new byte[1024];  
  6.     private ByteBuffer byteBuffer;  
  7.     private PackageQueue packageQueue;  
  8.     private Logger logger = Logger.getLogger(PackageBuilder.class);  
  9.     public PackageBuilder(SocketChannel socketChannel,Selector selectore){  
  10.         this.socketChannel = socketChannel;  
  11.         this.selector = selectore;  
  12.         packageQueue=new PackageQueue();  
  13.     }  
  14.     public void run(){  
  15.         try {  
  16.             while (!stop) {  
  17.                 Thread.sleep(1);  
  18.                 if(!socketChannel.isOpen()){  
  19.                     reconnect();//通道沒開啟或者斷開執行重連工作(Channel did not open the work of the implementation of re-connection )  
  20.                     break;  
  21.                 }  
  22.                 if (selector.select(30) > 0) {  
  23.                     doSelector();  
  24.                 }  
  25.             }  
  26.         } catch (IOException e) {  
  27.             logger.error("CameraBuilder run error: IOException");  
  28.         } catch (InterruptedException e){  
  29.             logger.error("CameraBuilder run error: InterruptedException");  
  30.         }  
  31.     }  
  32.     public void doSelector(){  
  33.         for(SelectionKey key:selector.selectedKeys()){  
  34.             selector.selectedKeys().remove(key);  
  35.             if(!key.isValid()){  
  36.                 continue;  
  37.             }  
  38.             doKeys(key);  
  39.         }  
  40.     }  
  41.     public void doKeys(SelectionKey key){  
  42.         SocketChannel channel = (SocketChannel)key.channel();  
  43.         if(key.isReadable()){  
  44.             readResponse(channel);  
  45.         }  
  46.         if(key.isWritable()){  
  47.             sendRequest(channel);  
  48.         }  
  49.     }  
  50.     private void readResponse(SocketChannel channel) {  
  51.         byteBuffer=ByteBuffer.wrap(array);  
  52.         byteBuffer.clear();  
  53.         int count = 0;  
  54.         try {  
  55.             count = channel.read(byteBuffer);  
  56.         } catch (IOException e) {  
  57.             reconnect();//通道沒開啟或者斷開執行重連工作(Channel did not open the work of the implementation of re-connection )  
  58.             logger.error("Connection reset by peer: IOException");  
  59.         }  
  60.         if(count != -1){  
  61.             byteBuffer.flip();  
  62.             byte[] bs = new byte[count];  
  63.             byteBuffer.get(bs);  
  64.             ByteBuffer returnBuffer = ByteBuffer.allocate(count);  
  65.             returnBuffer.clear();  
  66.             returnBuffer.put(bs);  
  67.             returnBuffer.flip();  
  68.             PrintUtil.printBf(returnBuffer.array());  
  69.             ParseBufferData parseData=new ParseBufferData(returnBuffer);          
  70.             parseData.parseBuffer();              
  71.       }  
  72.         if(count < 0){  
  73.             reconnect();  
  74.         }  
  75.     }  
  76.     /** 
  77.      * send pakcet of request 
  78.      * @param channel 
  79.      */  
  80.     public void sendRequest(SocketChannel channel){  
  81.         byte[] array = packageQueue.takeMsgs();  
  82.         if(array!=null){  
  83.         ByteBuffer byteBuffer = ByteBuffer.wrap(array);  
  84.             try {  
  85.                 channel.write(byteBuffer);  
  86.              } catch (IOException e) {  
  87.                  reconnect();//通道沒開啟或者斷開執行重連工作(Channel did not open the work of the implementation of re-connection )  
  88.                 logger.warn("socket not connected or has been closed: IOException");  
  89.              }  
  90.          }  
  91.     }  
  92.     public void reconnect(){  
  93.         stopClient();  
  94.         logger.warn("socket not connected or has been closed");  
  95.         ThreadPoolUtil.getExecutor().execute(new CameraSocket());  
  96.     }  
  97.     public void stopClient(){  
  98.         this.stop = true;  
  99.         if(socketChannel.isConnected() && !socketChannel.isOpen()){  
  100.             try {  
  101.                 socketChannel.close();  
  102.                 logger.info("server_socket has connected");  
  103.             } catch (IOException e) {  
  104.                 logger.warn("Channel closed to failed: IOException");  
  105.             }  
  106.         }  
  107.     }  
  108. }  

 傳送和接收資料存放在快取中

Java程式碼  收藏程式碼
  1. public class PackageQueue {  
  2.     private static  List<byte[]> queue = new ArrayList<byte[]>();  
  3.     public PackageQueue(){    
  4.     }  
  5.     public void pushMsgs(byte[] array){  
  6.         synchronized(queue){  
  7.             queue.add(array);  
  8.         }  
  9.     }  
  10.     public byte[] takeMsgs() {  
  11.         synchronized (queue) {  
  12.             byte[] sd=null;  
  13.             if(queue != null){  
  14.                 if(queue.size() > 0){  
  15.                     sd = queue.get(0);  
  16.                     queue.remove(0);  
  17.                 }  
  18.             }  
  19.             return sd;  
  20.         }  
  21.     }  
  22.     public static List<byte[]> getQueue() {  
  23.         return queue;  
  24.     }  
  25.     public static void setQueue(List<byte[]> queue) {  
  26.         PackageQueue.queue = queue;  
  27.     }  
  28. }  
public class CameraSocket extends Thread {
    private int cmdPort = SocketInfoUtils.CMD_PORT; // 5554
    private String host = SocketInfoUtils.HOST; // 172.16.163.38
    ByteBuffer buffer = ByteBuffer.allocate(1024);
    //	DatagramChannel dataChannel;
    DatagramChannel cmdChannel;
    Selector selector;
    CameraQueue cameraQueue;

    public CameraSocket() throws Exception {
        selector = Selector.open();
        cameraQueue = new CameraQueue();
        cmdChannel = DatagramChannel.open();
        cmdChannel.configureBlocking(false);
        SocketAddress target = new InetSocketAddress(host, cmdPort);
        cmdChannel.connect(target);
        cmdChannel.register(selector, SelectionKey.OP_WRITE);
    }

    @Override
    public void run() {
        boolean flag = true;
        while (flag) {
            try {
                doSelector();
            } catch (IOException e) {
                flag = false;
                e.printStackTrace();
            }
        }
    }

    private void doSelector() throws IOException {
        if (selector.select(1000) > 0) {
            for (SelectionKey key : selector.selectedKeys()) {
                if (key.isWritable()) {
                    writeEvent(cmdChannel);
                }
            }
            selector.selectedKeys().clear();
        }
    }

//	private void readEvent(SelectionKey key) throws IOException {
//	ByteBuffer buffer = ByteBuffer.allocate(1024);
//	dataChannel.receive(buffer);
//	buffer.flip();
//	ParseBufferData parseBufferData=new ParseBufferData(buffer);
//	parseBufferData.parseBuffer();
//	}

    private void writeEvent(DatagramChannel channel) throws IOException {
        byte[] array = cameraQueue.takeMsgs();
        if (array != null) {
            ByteBuffer byteBuffer = ByteBuffer.wrap(array);
            try {
                channel.write(byteBuffer);
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

}

public class SocketInfoUtils {

    public static Properties factory = SocketPropertiesFactory.getInstance().getBoundle();
    public static String TCP_IP = factory.getProperty("tcp_ip");
    public static int TCP_PORT = Integer.parseInt(factory.getProperty("tcp_port"));
    public static int CAMERA_PORT=Integer.parseInt(factory.getProperty("camera_port"));

    //public static int UDP_PORT = Integer.parseInt(factory.getProperty("udp_port"));
    public static int HIS_UDP_PORT = Integer.parseInt(factory.getProperty("his_udp_port"));

    public static int CMD_PORT = Integer.parseInt(factory.getProperty("cmd_port"));
    public static int DATA_PORT = Integer.parseInt(factory.getProperty("data_port"));
    public static final String HOST = factory.getProperty("host");
}
public class ThreadPoolUtil {

    private static ThreadPoolExecutor executor;
    static{
        BlockingQueue<Runnable> queue = new ArrayBlockingQueue<Runnable>(1);
        executor = new ThreadPoolExecutor(5,100,500,TimeUnit.MILLISECONDS,queue);
        RejectedExecutionHandler rejected = new RejectedExecutionHandler() {
            public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
                System.out.println(String.format("======= Task %d rejected.======", r.hashCode()));
            }
        };
        executor.setRejectedExecutionHandler(rejected);
    }

    public static ThreadPoolExecutor getExecutor() {
        return executor;
    }

    public static void setExecutor(ThreadPoolExecutor executor) {
        ThreadPoolUtil.executor = executor;
    }

}


 以上就是客戶端連線、傳送、接收的程式碼。希望對大家有所幫助