1. 程式人生 > >Java粗淺認識-網路程式設計(二)

Java粗淺認識-網路程式設計(二)

單執行緒模型

服務端繫結一個埠,然後接收請求,每次請求就處理,後續請求進來時,等待之前的任務處理完成,如果任務處理非常快,也是不會有明顯阻塞的。

單執行緒模型服務端程式碼

展示檔案上傳後處理邏輯,在一個while(true)中阻塞等待accept,由於是演示網路通訊,這裡的檔案I/O快取直接使用的是一個byte[1<<14] = 16k的容量,在專案中可以寫成迴圈使用的方式。

private static void server(int port) throws IOException {
        ServerSocket serverSocket = new ServerSocket();
        serverSocket.bind(new InetSocketAddress(port));
        System.out.println("服務端啟動成功,繫結埠[" + port +"]");
        //這裡使用了一個單執行緒處理,後面講多執行緒的時候,會改為多執行緒版本
        while (true) {
            System.out.println("服務端等待連線中。");
            Socket socket = serverSocket.accept();
            System.out.println("接收到一個客戶端連線。");
            InputStream inputStream = socket.getInputStream();
            //16k
            File file = new File("C:\\Users\\baopz\\Desktop\\test\\socket\\server\\1.gif");
            if (!file.exists()) {
                if (!file.createNewFile()) {
                    System.out.println("建立檔案" + file.getAbsolutePath() + "失敗。");
                    continue;
                }
            }
            OutputStream outputStream = null;
            //這裡可以使用java 1.7的資源自動釋放寫法
            try {
                outputStream = new FileOutputStream(file);
                byte[] bytes = new byte[1 << 14];
                int length;
                while ((length = inputStream.read(bytes)) != -1) {
                    outputStream.write(bytes, 0, length);
                }
                System.out.println("檔案接受成功。");
            } finally {
                if (outputStream != null) {
                    outputStream.close();
                }
            }
        }
    }

客戶端測試程式碼

可以在一個for迴圈中呼叫,看看會不會等待之前的處理

private static void client() throws IOException {
        Socket socket = new Socket();
        OutputStream outputStream = null;
        try {
            System.out.println("客戶端請求連線。");
            socket.connect(new InetSocketAddress("127.0.0.1", 8888));
            outputStream = socket.getOutputStream();
            File file = new File("C:\\Users\\baopz\\Desktop\\test\\socket\\client\\1.gif");
            if (!file.exists()) {
                System.out.println("需要傳輸的檔案不存在。");
                return;
            }
            InputStream inputStream = new FileInputStream(file);
            //16k
            byte[] bytes = new byte[1 << 14];
            int length;
            while((length=inputStream.read(bytes))!=-1){
                outputStream.write(bytes,0,length);
            }
            outputStream.flush();
            inputStream.close();
            System.out.println("檔案傳送成功。");
        }finally {
            if(outputStream!=null){
                outputStream.close();
            }
            socket.close();
            System.out.println("客戶端關閉連線。");
        }
    }

preThreading模式

在服務啟動的時候,預先new出一個執行緒池,讓每個連線都用一個執行緒處理。

基於執行緒池的服務端

需要注意一下,這裡的ThreadFactoryBuilder是來自com.google.guava.guava

/**
     * 基於執行緒池的服務端
     * @param port
     * @throws IOException
     */
    private static void server(int port) throws IOException {
        ServerSocket serverSocket = new ServerSocket();
        serverSocket.bind(new InetSocketAddress(port));
        System.out.println("伺服器繫結埠[" + port + "]成功");

        ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("任務處理執行緒.%d").build();
        //這裡的引數一次含義是,核心執行緒數量=4,最大執行緒數量8,空閒等待時間10,空閒等待時間單位s,
        //阻塞佇列,LinkedBlockingQueue無界佇列,任務量大於了最大執行緒容量後,會放到這個等待隊列當中
        //執行緒工廠,定義執行緒名稱,方便定位
        //拒絕接受新任務策略,如果新來的任務大於了等待佇列上限,那麼就會呼叫這個拒絕策略,拒絕策略可以自己實現,也可以呼叫
        //系統預設的4中策略,這裡用的是AbortPolicy,直接丟擲異常,終止執行緒池
        //這裡的執行緒池可以用自帶的執行緒池工具new出來,如:ExecutorService executorService =  Executors.newCachedThreadPool();
        ExecutorService executorService = new ThreadPoolExecutor(4,
                8,
                10,
                TimeUnit.SECONDS,
                new LinkedBlockingQueue<Runnable>(16),
                threadFactory,
                new ThreadPoolExecutor.AbortPolicy());
        while (true) {
            Socket socket = serverSocket.accept();
            System.out.println("接收到一個客戶端連線,分發任務。");
            //這裡檔名遞增,每次一個線連線進來,就存放一個新檔案
            executorService.execute(new Task(socket,"C:\\Users\\baopz\\Desktop\\test\\socket\\server\\"+atomicInteger.getAndAdd(1)+".gif"));
        }
    }

基於執行緒池執行的任務程式碼

接受一個socket和儲存路徑,進行處理

 /**
     * 服務端的執行任務
     */
    public static class Task implements Runnable {
        private Socket socket;
        private String filename;

        public Task(Socket socket, String filename) {
            this.socket = socket;
            this.filename = filename;
        }

        @Override
        public void run() {
            System.out.println("執行緒"+Thread.currentThread().getName()+"正在處理任務。");
            //16k
            File file = new File(filename);
            if (!file.exists()) {
                try {
                    if (!file.createNewFile()) {
                        System.out.println("建立檔案" + file.getAbsolutePath() + "失敗。");
                        return;
                    }
                } catch (IOException e) {
                    System.out.println("建立檔案" + file.getAbsolutePath() + "失敗。");
                    e.printStackTrace();
                }
            }
            InputStream inputStream = null;
            OutputStream outputStream = null;
            try {
                inputStream = socket.getInputStream();
                outputStream = new FileOutputStream(file);
                byte[] bytes = new byte[1 << 14];
                int length;
                while ((length = inputStream.read(bytes)) != -1) {
                    outputStream.write(bytes, 0, length);
                }
                System.out.println(Thread.currentThread().getName()+"檔案接受成功。");
            } catch (IOException e) {
                e.printStackTrace();
            } finally {
                if (outputStream != null) {
                    try {
                        outputStream.close();
                        System.out.println(Thread.currentThread().getName()+"關閉連線。");
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                }
            }
        }
    }

測試客戶端可以直接使用上面的客戶端

Reactor模型服務端(一個Reactor,多個業務處理執行緒)

服務端分發程式,Reactor,注意,

1.這裡在selectionKey.isReadable()裡面,做了一個selectionKey.cancel();這個操作,主要是防止在起一個subChannel沒有完成時,這個selectionKey依然有效,會重複提交任務的。

2.示例程式展示的是小檔案的傳送,很不穩定,如果檔案大了,你會發現,服務端會接受不到完整的資料。後續在Netty裡面會講怎麼處理這個問題,手工處理起來還是比較繁瑣的。

   private static void server(int port) throws IOException {
        ExecutorService executorService = Executors.newCachedThreadPool();
        ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
        Selector selector = Selector.open();
        //設定為非阻塞模式
        serverSocketChannel.configureBlocking(false);
        serverSocketChannel.bind(new InetSocketAddress(port));
        System.out.println("繫結埠成功.");
        serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);

        while (true) {
            //這裡就是在底層做的epoll操作,等待資料就緒
            int completed = selector.select();
            if (completed == 0) {
                System.out.println("監聽");
                continue;
            }
            Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
            while (iterator.hasNext()) {
                SelectionKey selectionKey = iterator.next();
                //如果是連線事件,就在當前執行緒做連線操作,然後把連線得到的socketChannel註冊到Selector中,等待下一個select
                if (selectionKey.isAcceptable()) {
                    ServerSocketChannel serverSocketChannel1 = (ServerSocketChannel) selectionKey.channel();
                    SocketChannel socketChannel = serverSocketChannel1.accept();
                    System.out.println("得到一個新連線。");
                    socketChannel.configureBlocking(false);
                    socketChannel.register(selector, SelectionKey.OP_READ);
                }
                //如果是寫事件,放到執行緒池裡面處理
                else if (selectionKey.isReadable()) {
                    //
                    SocketChannel subSocketChannel = (SocketChannel) selectionKey.channel();
                    System.out.println("執行緒提交任務。");
                    
                    ReadTask readTask = new ReadTask(subSocketChannel, "C:\\Users\\baopz\\Desktop\\test\\socket\\server\\" + atomicInteger.getAndAdd(1) + ".gif");
                    executorService.execute(readTask);
                    selectionKey.cancel();
                }
                //如果是讀事件
                else if (selectionKey.isWritable()) {
                    SocketChannel subSocketChannel = (SocketChannel) selectionKey.channel();
                    executorService.execute(new WriteTask(subSocketChannel));
                }
                iterator.remove();
            }
        }
    }

Handler處理邏輯

實現了一個讀事件,沒有實現寫事件。

public static class WriteTask implements Runnable {
        private SocketChannel socketChannel;

        public WriteTask(SocketChannel socketChannel) {
            this.socketChannel = socketChannel;
        }

        @Override
        public void run() {
            System.out.println("執行一個讀事件。");
            try {
                socketChannel.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

    public static class ReadTask implements Runnable {
        private SocketChannel socketChannel;
        private String filename;

        public ReadTask(SocketChannel socketChannel, String filename) {
            this.socketChannel = socketChannel;
            this.filename = filename;
        }

        @Override
        public void run() {
            //16k
            ByteBuffer buffer = ByteBuffer.allocateDirect(1 << 14);
            Path path = Paths.get(filename);
            Path parent = path.getParent();
            if (Files.notExists(parent)) {
                try {
                    Files.createDirectories(parent);
                } catch (IOException e) {
                    e.printStackTrace();
                    System.out.println("建立資料夾失敗:" + parent);
                    return;
                }
            }
            if (Files.notExists(path)) {
                try {
                    Files.createFile(path);
                } catch (IOException e) {
                    e.printStackTrace();
                    System.out.println("檔案失敗:" + path);
                    return;
                }
            }

            FileChannel outFileChannel = null;
            RandomAccessFile randomAccessFile = null;
            try {
                randomAccessFile = new RandomAccessFile(path.toFile(), "rw");
                outFileChannel = randomAccessFile.getChannel();
                //這裡buffer讀取的時候,可能資料讀不全,所以需要精細的設計,在Netty章節,我會詳細講一下它是怎麼處理
                while (socketChannel.read(buffer) > 0) {
                    buffer.flip();
                    outFileChannel.write(buffer);
                    buffer.clear();
                }
            } catch (IOException e) {
                e.printStackTrace();
            } finally {
                System.out.println("執行清理工作");
                if (outFileChannel != null) {
                    try {
                        outFileChannel.close();
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                }
                if (randomAccessFile != null) {
                    try {
                        randomAccessFile.close();
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                }
                if (socketChannel != null) {
                    try {
                        socketChannel.close();
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                }
            }
        }
    }

非同步網路通訊

服務端程式

開啟一個AsynchronusServerSocketChannel,繫結到埠上,accept請求。

private static void server(int port) throws IOException, InterruptedException {
        ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("服務處理程式%d").build();
        //執行緒池
        AsynchronousChannelGroup group = AsynchronousChannelGroup.withFixedThreadPool(4, threadFactory);
        AsynchronousServerSocketChannel serverSocketChannel = AsynchronousServerSocketChannel.open(group);

        //這裡的backlog可以參考網上資料,這裡不詳細講,就是一個tcp/ip的一個引數,不想了解,可以忽略
        serverSocketChannel.bind(new InetSocketAddress(port), 512);
        System.out.println("服務端繫結埠[" + port + "]完成。");
        serverSocketChannel.accept(new Attachment(serverSocketChannel, atomicInteger), new CopyHandler());
    }

完成連線後的回撥處理類

注意,這裡在CopyHandler中的第一句,把AsynchronousServerSocketChannel.accept(),這裡是當前這個serverchannel繼續監聽下一個請求。

可以用之前的client測試程式,for迴圈測試一下。

public static class CopyHandler implements CompletionHandler<AsynchronousSocketChannel, Attachment> {
        @Override
        public void completed(AsynchronousSocketChannel result, Attachment attachment) {
            //接受下一次請求
            attachment.getAsynchronousServerSocketChannel().accept(new Attachment(attachment.getAsynchronousServerSocketChannel(), attachment.atomicInteger), this);
            //處理業務邏輯
            try {
                handle(result, attachment.getAtomicInteger());
            } catch (ExecutionException | InterruptedException e) {
                e.printStackTrace();
            }
        }

        private void handle(AsynchronousSocketChannel result, AtomicInteger attachment) throws ExecutionException, InterruptedException {
            System.out.println("連線完成。");
            //16k
            ByteBuffer byteBuffer = ByteBuffer.allocateDirect(1 << 14);
            //這裡是阻塞是獲取資料,因為需要把資料放到byteBuffer中,所有,需要獲取到資料
            result.read(byteBuffer).get();
            Path target = Paths.get("C:\\Users\\baopz\\Desktop\\test\\socket\\server\\" + attachment.getAndAdd(1) + ".gif");
            if (Files.notExists(target)) {
                try {
                    Files.createFile(target);
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
            //
            try {
                AsynchronousFileChannel asynchronousFileChannel = AsynchronousFileChannel.open(target, StandardOpenOption.WRITE);
                byteBuffer.flip();
                while (!asynchronousFileChannel.write(byteBuffer, 0).isDone()) {
                    TimeUnit.MILLISECONDS.sleep(500);
                }
                asynchronousFileChannel.close();
            } catch (IOException | InterruptedException e) {
                e.printStackTrace();
            }

            try {
                result.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }

        @Override
        public void failed(Throwable exc, Attachment attachment) {
            exc.printStackTrace();
        }
    }

儲存引數的輔助類

在回撥函式中需要傳遞進去一些資料。

public static class Attachment {
        private AsynchronousServerSocketChannel asynchronousServerSocketChannel;
        private AtomicInteger atomicInteger;

        public Attachment(AsynchronousServerSocketChannel serverSocketChannel, AtomicInteger atomicInteger) {
            this.asynchronousServerSocketChannel = serverSocketChannel;
            this.atomicInteger = atomicInteger;
        }

        public AsynchronousServerSocketChannel getAsynchronousServerSocketChannel() {
            return asynchronousServerSocketChannel;
        }

        public void setAsynchronousServerSocketChannel(AsynchronousServerSocketChannel asynchronousServerSocketChannel) {
            this.asynchronousServerSocketChannel = asynchronousServerSocketChannel;
        }

        public AtomicInteger getAtomicInteger() {
            return atomicInteger;
        }

        public void setAtomicInteger(AtomicInteger atomicInteger) {
            this.atomicInteger = atomicInteger;
        }
    }

總結,到這裡就把單執行緒模型,執行緒池處理,Reactor模型,非同步通訊模型都講完了,下一篇講解Java中的相關容器。