Java粗淺認識-網路程式設計(二)
單執行緒模型
服務端繫結一個埠,然後接收請求,每次請求就處理,後續請求進來時,等待之前的任務處理完成,如果任務處理非常快,也是不會有明顯阻塞的。
單執行緒模型服務端程式碼
展示檔案上傳後處理邏輯,在一個while(true)中阻塞等待accept,由於是演示網路通訊,這裡的檔案I/O快取直接使用的是一個byte[1<<14] = 16k的容量,在專案中可以寫成迴圈使用的方式。
private static void server(int port) throws IOException { ServerSocket serverSocket = new ServerSocket(); serverSocket.bind(new InetSocketAddress(port)); System.out.println("服務端啟動成功,繫結埠[" + port +"]"); //這裡使用了一個單執行緒處理,後面講多執行緒的時候,會改為多執行緒版本 while (true) { System.out.println("服務端等待連線中。"); Socket socket = serverSocket.accept(); System.out.println("接收到一個客戶端連線。"); InputStream inputStream = socket.getInputStream(); //16k File file = new File("C:\\Users\\baopz\\Desktop\\test\\socket\\server\\1.gif"); if (!file.exists()) { if (!file.createNewFile()) { System.out.println("建立檔案" + file.getAbsolutePath() + "失敗。"); continue; } } OutputStream outputStream = null; //這裡可以使用java 1.7的資源自動釋放寫法 try { outputStream = new FileOutputStream(file); byte[] bytes = new byte[1 << 14]; int length; while ((length = inputStream.read(bytes)) != -1) { outputStream.write(bytes, 0, length); } System.out.println("檔案接受成功。"); } finally { if (outputStream != null) { outputStream.close(); } } } }
客戶端測試程式碼
可以在一個for迴圈中呼叫,看看會不會等待之前的處理
private static void client() throws IOException { Socket socket = new Socket(); OutputStream outputStream = null; try { System.out.println("客戶端請求連線。"); socket.connect(new InetSocketAddress("127.0.0.1", 8888)); outputStream = socket.getOutputStream(); File file = new File("C:\\Users\\baopz\\Desktop\\test\\socket\\client\\1.gif"); if (!file.exists()) { System.out.println("需要傳輸的檔案不存在。"); return; } InputStream inputStream = new FileInputStream(file); //16k byte[] bytes = new byte[1 << 14]; int length; while((length=inputStream.read(bytes))!=-1){ outputStream.write(bytes,0,length); } outputStream.flush(); inputStream.close(); System.out.println("檔案傳送成功。"); }finally { if(outputStream!=null){ outputStream.close(); } socket.close(); System.out.println("客戶端關閉連線。"); } }
preThreading模式
在服務啟動的時候,預先new出一個執行緒池,讓每個連線都用一個執行緒處理。
基於執行緒池的服務端
需要注意一下,這裡的ThreadFactoryBuilder是來自com.google.guava.guava包
/** * 基於執行緒池的服務端 * @param port * @throws IOException */ private static void server(int port) throws IOException { ServerSocket serverSocket = new ServerSocket(); serverSocket.bind(new InetSocketAddress(port)); System.out.println("伺服器繫結埠[" + port + "]成功"); ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("任務處理執行緒.%d").build(); //這裡的引數一次含義是,核心執行緒數量=4,最大執行緒數量8,空閒等待時間10,空閒等待時間單位s, //阻塞佇列,LinkedBlockingQueue無界佇列,任務量大於了最大執行緒容量後,會放到這個等待隊列當中 //執行緒工廠,定義執行緒名稱,方便定位 //拒絕接受新任務策略,如果新來的任務大於了等待佇列上限,那麼就會呼叫這個拒絕策略,拒絕策略可以自己實現,也可以呼叫 //系統預設的4中策略,這裡用的是AbortPolicy,直接丟擲異常,終止執行緒池 //這裡的執行緒池可以用自帶的執行緒池工具new出來,如:ExecutorService executorService = Executors.newCachedThreadPool(); ExecutorService executorService = new ThreadPoolExecutor(4, 8, 10, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(16), threadFactory, new ThreadPoolExecutor.AbortPolicy()); while (true) { Socket socket = serverSocket.accept(); System.out.println("接收到一個客戶端連線,分發任務。"); //這裡檔名遞增,每次一個線連線進來,就存放一個新檔案 executorService.execute(new Task(socket,"C:\\Users\\baopz\\Desktop\\test\\socket\\server\\"+atomicInteger.getAndAdd(1)+".gif")); } }
基於執行緒池執行的任務程式碼
接受一個socket和儲存路徑,進行處理
/**
* 服務端的執行任務
*/
public static class Task implements Runnable {
private Socket socket;
private String filename;
public Task(Socket socket, String filename) {
this.socket = socket;
this.filename = filename;
}
@Override
public void run() {
System.out.println("執行緒"+Thread.currentThread().getName()+"正在處理任務。");
//16k
File file = new File(filename);
if (!file.exists()) {
try {
if (!file.createNewFile()) {
System.out.println("建立檔案" + file.getAbsolutePath() + "失敗。");
return;
}
} catch (IOException e) {
System.out.println("建立檔案" + file.getAbsolutePath() + "失敗。");
e.printStackTrace();
}
}
InputStream inputStream = null;
OutputStream outputStream = null;
try {
inputStream = socket.getInputStream();
outputStream = new FileOutputStream(file);
byte[] bytes = new byte[1 << 14];
int length;
while ((length = inputStream.read(bytes)) != -1) {
outputStream.write(bytes, 0, length);
}
System.out.println(Thread.currentThread().getName()+"檔案接受成功。");
} catch (IOException e) {
e.printStackTrace();
} finally {
if (outputStream != null) {
try {
outputStream.close();
System.out.println(Thread.currentThread().getName()+"關閉連線。");
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
}
測試客戶端可以直接使用上面的客戶端
Reactor模型服務端(一個Reactor,多個業務處理執行緒)
服務端分發程式,Reactor,注意,
1.這裡在selectionKey.isReadable()裡面,做了一個selectionKey.cancel();這個操作,主要是防止在起一個subChannel沒有完成時,這個selectionKey依然有效,會重複提交任務的。
2.示例程式展示的是小檔案的傳送,很不穩定,如果檔案大了,你會發現,服務端會接受不到完整的資料。後續在Netty裡面會講怎麼處理這個問題,手工處理起來還是比較繁瑣的。
private static void server(int port) throws IOException {
ExecutorService executorService = Executors.newCachedThreadPool();
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
Selector selector = Selector.open();
//設定為非阻塞模式
serverSocketChannel.configureBlocking(false);
serverSocketChannel.bind(new InetSocketAddress(port));
System.out.println("繫結埠成功.");
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
while (true) {
//這裡就是在底層做的epoll操作,等待資料就緒
int completed = selector.select();
if (completed == 0) {
System.out.println("監聽");
continue;
}
Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
while (iterator.hasNext()) {
SelectionKey selectionKey = iterator.next();
//如果是連線事件,就在當前執行緒做連線操作,然後把連線得到的socketChannel註冊到Selector中,等待下一個select
if (selectionKey.isAcceptable()) {
ServerSocketChannel serverSocketChannel1 = (ServerSocketChannel) selectionKey.channel();
SocketChannel socketChannel = serverSocketChannel1.accept();
System.out.println("得到一個新連線。");
socketChannel.configureBlocking(false);
socketChannel.register(selector, SelectionKey.OP_READ);
}
//如果是寫事件,放到執行緒池裡面處理
else if (selectionKey.isReadable()) {
//
SocketChannel subSocketChannel = (SocketChannel) selectionKey.channel();
System.out.println("執行緒提交任務。");
ReadTask readTask = new ReadTask(subSocketChannel, "C:\\Users\\baopz\\Desktop\\test\\socket\\server\\" + atomicInteger.getAndAdd(1) + ".gif");
executorService.execute(readTask);
selectionKey.cancel();
}
//如果是讀事件
else if (selectionKey.isWritable()) {
SocketChannel subSocketChannel = (SocketChannel) selectionKey.channel();
executorService.execute(new WriteTask(subSocketChannel));
}
iterator.remove();
}
}
}
Handler處理邏輯
實現了一個讀事件,沒有實現寫事件。
public static class WriteTask implements Runnable {
private SocketChannel socketChannel;
public WriteTask(SocketChannel socketChannel) {
this.socketChannel = socketChannel;
}
@Override
public void run() {
System.out.println("執行一個讀事件。");
try {
socketChannel.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
public static class ReadTask implements Runnable {
private SocketChannel socketChannel;
private String filename;
public ReadTask(SocketChannel socketChannel, String filename) {
this.socketChannel = socketChannel;
this.filename = filename;
}
@Override
public void run() {
//16k
ByteBuffer buffer = ByteBuffer.allocateDirect(1 << 14);
Path path = Paths.get(filename);
Path parent = path.getParent();
if (Files.notExists(parent)) {
try {
Files.createDirectories(parent);
} catch (IOException e) {
e.printStackTrace();
System.out.println("建立資料夾失敗:" + parent);
return;
}
}
if (Files.notExists(path)) {
try {
Files.createFile(path);
} catch (IOException e) {
e.printStackTrace();
System.out.println("檔案失敗:" + path);
return;
}
}
FileChannel outFileChannel = null;
RandomAccessFile randomAccessFile = null;
try {
randomAccessFile = new RandomAccessFile(path.toFile(), "rw");
outFileChannel = randomAccessFile.getChannel();
//這裡buffer讀取的時候,可能資料讀不全,所以需要精細的設計,在Netty章節,我會詳細講一下它是怎麼處理
while (socketChannel.read(buffer) > 0) {
buffer.flip();
outFileChannel.write(buffer);
buffer.clear();
}
} catch (IOException e) {
e.printStackTrace();
} finally {
System.out.println("執行清理工作");
if (outFileChannel != null) {
try {
outFileChannel.close();
} catch (IOException e) {
e.printStackTrace();
}
}
if (randomAccessFile != null) {
try {
randomAccessFile.close();
} catch (IOException e) {
e.printStackTrace();
}
}
if (socketChannel != null) {
try {
socketChannel.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
}
非同步網路通訊
服務端程式
開啟一個AsynchronusServerSocketChannel,繫結到埠上,accept請求。
private static void server(int port) throws IOException, InterruptedException {
ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("服務處理程式%d").build();
//執行緒池
AsynchronousChannelGroup group = AsynchronousChannelGroup.withFixedThreadPool(4, threadFactory);
AsynchronousServerSocketChannel serverSocketChannel = AsynchronousServerSocketChannel.open(group);
//這裡的backlog可以參考網上資料,這裡不詳細講,就是一個tcp/ip的一個引數,不想了解,可以忽略
serverSocketChannel.bind(new InetSocketAddress(port), 512);
System.out.println("服務端繫結埠[" + port + "]完成。");
serverSocketChannel.accept(new Attachment(serverSocketChannel, atomicInteger), new CopyHandler());
}
完成連線後的回撥處理類
注意,這裡在CopyHandler中的第一句,把AsynchronousServerSocketChannel.accept(),這裡是當前這個serverchannel繼續監聽下一個請求。
可以用之前的client測試程式,for迴圈測試一下。
public static class CopyHandler implements CompletionHandler<AsynchronousSocketChannel, Attachment> {
@Override
public void completed(AsynchronousSocketChannel result, Attachment attachment) {
//接受下一次請求
attachment.getAsynchronousServerSocketChannel().accept(new Attachment(attachment.getAsynchronousServerSocketChannel(), attachment.atomicInteger), this);
//處理業務邏輯
try {
handle(result, attachment.getAtomicInteger());
} catch (ExecutionException | InterruptedException e) {
e.printStackTrace();
}
}
private void handle(AsynchronousSocketChannel result, AtomicInteger attachment) throws ExecutionException, InterruptedException {
System.out.println("連線完成。");
//16k
ByteBuffer byteBuffer = ByteBuffer.allocateDirect(1 << 14);
//這裡是阻塞是獲取資料,因為需要把資料放到byteBuffer中,所有,需要獲取到資料
result.read(byteBuffer).get();
Path target = Paths.get("C:\\Users\\baopz\\Desktop\\test\\socket\\server\\" + attachment.getAndAdd(1) + ".gif");
if (Files.notExists(target)) {
try {
Files.createFile(target);
} catch (IOException e) {
e.printStackTrace();
}
}
//
try {
AsynchronousFileChannel asynchronousFileChannel = AsynchronousFileChannel.open(target, StandardOpenOption.WRITE);
byteBuffer.flip();
while (!asynchronousFileChannel.write(byteBuffer, 0).isDone()) {
TimeUnit.MILLISECONDS.sleep(500);
}
asynchronousFileChannel.close();
} catch (IOException | InterruptedException e) {
e.printStackTrace();
}
try {
result.close();
} catch (IOException e) {
e.printStackTrace();
}
}
@Override
public void failed(Throwable exc, Attachment attachment) {
exc.printStackTrace();
}
}
儲存引數的輔助類
在回撥函式中需要傳遞進去一些資料。
public static class Attachment {
private AsynchronousServerSocketChannel asynchronousServerSocketChannel;
private AtomicInteger atomicInteger;
public Attachment(AsynchronousServerSocketChannel serverSocketChannel, AtomicInteger atomicInteger) {
this.asynchronousServerSocketChannel = serverSocketChannel;
this.atomicInteger = atomicInteger;
}
public AsynchronousServerSocketChannel getAsynchronousServerSocketChannel() {
return asynchronousServerSocketChannel;
}
public void setAsynchronousServerSocketChannel(AsynchronousServerSocketChannel asynchronousServerSocketChannel) {
this.asynchronousServerSocketChannel = asynchronousServerSocketChannel;
}
public AtomicInteger getAtomicInteger() {
return atomicInteger;
}
public void setAtomicInteger(AtomicInteger atomicInteger) {
this.atomicInteger = atomicInteger;
}
}
總結,到這裡就把單執行緒模型,執行緒池處理,Reactor模型,非同步通訊模型都講完了,下一篇講解Java中的相關容器。