1. 程式人生 > >【Java TCP/IP Socket程式設計】----深入剖析----TCP資料傳輸中的死鎖和效能

【Java TCP/IP Socket程式設計】----深入剖析----TCP資料傳輸中的死鎖和效能

目錄

 

死鎖問題

資料傳輸效能

案例


--------筆記來自於書籍《Java TCP/IP Socket程式設計》

死鎖問題

在TCP資料傳輸底層實現中(詳細參見https://blog.csdn.net/lili13897741554/article/details/83104539)可能會出現死鎖的情況,因此程式協議必須設計得非常小心,避免死鎖的發生。以下情況可能導致死鎖:

1.每個對等端都在阻塞等待其他端完成一些工作,例如:如果在連線建立後,客戶端和伺服器端都立即嘗試接收資料,顯然將導致死鎖。

2.SendQ和RecvQ緩衝區的容量在具體實現時會收到一定的限制,如果與TCP的流向控制機制結合使用,有可能到產生死鎖的情況。

1)詳細說明

      雖然SendQ和RecvQ緩衝區容量使用的實際大小會動態地增大和收縮,還是需要一個硬性限制,防止行為異常的程式所控制的單獨一個TCP連線將系統的記憶體全部耗盡,由於這些緩衝區的容量有限,它們可能被填滿。一旦RecvQ已滿,TCP流控制機制就會產生作用。它將阻止傳輸傳送端主機的SendQ中的任何資料,直到接收者呼叫輸入流的read()方法後騰出空間。(使用流控制機制的目的就是為了保證傳送者不會傳輸太多資料,而超出了接收系統的處理能力。)傳送程式可以持續地寫出資料,直到SendQ佇列被填滿,然而,如果SendQ佇列已滿時呼叫out.write()方法,則將阻塞等待,直到有新的空間為止,也就是說直到一些位元組傳輸到了接收到套接字的RecvQ佇列中。如果此時RecvQ佇列也已經被填滿,所有操作都將停止,直到接收程式呼叫in.read()方法將一些位元組傳輸到Delivered佇列中。

    假設SendQ佇列和RecvQ佇列大小分別是SQS和RQS。將一個大小為n的位元組陣列傳遞給write()方法呼叫,其中n>SQS,直到有至少n-SQS位元組傳遞到接收端主機的RecvQ對列後,該方法才會返回。如果n大小超過了(SQS+RQS),write()方法則將在接收程式從輸入流中讀取了n-(SQS+RQS)位元組後才會返回。如果接收程式沒有呼叫read()方法,大數量的send()呼叫則無法成功。特別是當連線的兩端同時分別呼叫它們輸出流的write()方法,而它們的緩衝區大小又大於SQS+RQS時將發生死鎖,兩個write操作都不能完成,兩個程式都將永遠保持阻塞狀態。

2)具體例項

     主機A上的程式和主機B上的程式之間有一個連線,假設A和B上的SQS和RQS都是500位元組,兩個程式試圖同時傳送1500位元組時的情況。主機A上的程式中前500位元組已經傳輸到了另一端,另外500位元組已經複製到了主機A的SendQ佇列紅,餘下的500位元組則無法傳送(因此out.write()方法將無法返回)直到主機B上的RecvQ佇列有空間空出來,然後主機B上程式也遇到同樣的情況,因此兩個程式的write()方法呼叫都永遠無法完成。

    注意點:要仔細設計協議,以避免兩個方向上傳輸大量資料時產生死鎖。

解決方法:

1)方案之一,是在不同的執行緒中執行客戶端的write()迴圈和read()迴圈。一個執行緒在客戶端寫完資料後呼叫該套接字的shutdownOutput()方法,另外一個執行緒從連線到伺服器的輸入流中反覆讀取伺服器端反饋給客戶端的資訊,直到到達輸入流的結尾(即伺服器端關閉套接字)。如果一個執行緒阻塞了,另外一個執行緒仍然可以獨立執行。

2)不使用多執行緒,使用NIO來解決(非阻塞Channel和Selector)

資料傳輸效能

    在TCP實現中,將使用者資料複製到SendQ佇列中不僅是因為可能重傳資料,這還與效能有關。尤其是SendQ和RecvQ緩衝佇列的大小,會對TCP連線的資料吞吐量產生影響。吞吐量是指使用者資料位元組從傳送端傳送到接收程式的頻率。在要傳輸大量資料的程式中,我們希望最大化這個頻率。在沒有網路容量或其他限制的情況下,越大的緩衝區通常能夠實現越高吞吐量。

    如果傳輸n位元組的資料,使用大小為n的緩衝區呼叫一次write()方法,通常要比使用大小為1位元組的緩衝區呼叫n次write()方法效率高很多。然而,如果呼叫writer()方法是使用了比SQS(SendQ佇列大小)大很多的緩衝區,系統還需要將資料從使用者地址轉換為大小為SQS的塊。即套接字底層實現先將SendQ佇列緩衝區填滿,等待TCP協議將資料轉移出去,再重新填滿SendQ佇列緩衝區,再等待資料轉移,反覆進行。套接字底層實現每次都要等待資料從SendQ佇列中移除,這就一系統訊息的形式消耗形式(系統需要上下文切換)浪費一些時間。由此可知,呼叫write()方法時實際有效緩衝區大小受到SQS限制,同樣read()方法也會受到RQS大小的限制。

    需要注意的是:只有當程式一次傳送比緩衝區容量大很多的資料,並且要求程式的吞吐量時,可以考慮通過Socket的setSendBufferSize()和sendReceiveBufferSize()方法來改變傳送和接收緩衝區的大小。

案例

    案例實現:客戶端讀取檔案,然後將檔案傳送到伺服器端,伺服器端接收到未壓縮的資料,將資料進行簡單地壓縮並返回給客戶端。

1.客戶端程式碼

public class CompressClient {
  private static final int BUFSIZE = 256;
  private static final String FILENAME = "D:\\java.txt";

  public static void main(String[] args) throws IOException {
    FileInputStream fileIn = new FileInputStream(FILENAME);
    FileOutputStream fileOut = new FileOutputStream(FILENAME + ".gz");
    Socket clientSocket = new Socket("127.0.0.1", 1234);
    InputStream in = clientSocket.getInputStream();
    sendBytes(clientSocket, fileIn);
    int bytesRead;
    byte[] buffer = new byte[BUFSIZE];
    while ((bytesRead = in.read(buffer)) != -1) {
      fileOut.write(buffer, 0, bytesRead);
      System.out.println("R");
    }
    fileIn.close();
    fileOut.close();
    in.close();
  }

  private static void sendBytes(Socket socket, InputStream fileIn) throws IOException {
    OutputStream out = socket.getOutputStream();
    int bytesRead = 0;
    byte[] buffer = new byte[BUFSIZE];
    while ((bytesRead = fileIn.read(buffer)) != -1) {
      out.write(buffer, 0, bytesRead);
      System.out.println("W");
    }
    socket.shutdownOutput();
  }
}

2.伺服器端的程式碼

public class CompressServerExecutor {
  public static void main(String[] args) throws IOException {
    ServerSocket serverSocket = new ServerSocket(1234);
    ExecutorService service = Executors.newCachedThreadPool();
    Logger logger = Logger.getLogger("pratical");
    while(true) {
      Socket clientSocket = serverSocket.accept();
      service.submit(new CompressProtocol(clientSocket,logger));
    }
  }
}
public class CompressProtocol implements Runnable {
  private static final int BUFSIZE= 1024;
  private Socket clientSocket;
  private Logger logger;
  public CompressProtocol(Socket socket,Logger logger) {
    this.clientSocket=socket;
    this.logger=logger;
  }
  
  private static void handleCompressClient(Socket clientSocket,Logger logger) {
    try {
      InputStream in = clientSocket.getInputStream();
      GZIPOutputStream out = new GZIPOutputStream(clientSocket.getOutputStream());
      byte[] buffer = new byte[BUFSIZE];
      int bytesRead;
      while((bytesRead = in.read(buffer))!=-1) {
        out.write(buffer, 0, bytesRead);
        out.finish(); //重新整理流
      }
      logger.info("client "+clientSocket.getRemoteSocketAddress()+" finished");
      clientSocket.close();
    } catch (IOException e) {
      logger.log(Level.WARNING,"Exception in echo Protocol",e);
    }
  }
  
  @Override
  public void run() {
    handleCompressClient(clientSocket,logger);
  }
}

分析可能出現的情況:

客戶端和伺服器端的SendQ佇列和RecvQ佇列中都有500位元組的資料,而客戶端傳送了一個大小為10000位元組(未壓縮)的檔案,同時假設對於這個檔案,伺服器讀取1000位元組並返回500位元組,即壓縮比2:1。當客戶端傳送了2000位元組後,伺服器端最終全部讀取這些位元組,併發回1000位元組,此時客戶端的RecvQ佇列和伺服器端的SendQ佇列都將被填滿。當客戶端又傳送了1000位元組並被伺服器全部讀取後,伺服器端後續的任何write操作嘗試都將阻塞。當客戶端又傳送了另外1000位元組後,客戶端的SendQ佇列和伺服器端的RecvQ佇列都將填滿,後續客戶端write操作將阻塞,從而形成死鎖。

解決方案之一:在不同的執行緒裡面執行客戶端write()和read()迴圈。即將CompressClient.java中sendBytes()方法放到執行緒裡面

public class CompressClient {
  private static final int BUFSIZE = 256;
  private static final String FILENAME = "D:\\java.txt";

  public static void main(String[] args) throws IOException {
    FileInputStream fileIn = new FileInputStream(FILENAME);
    FileOutputStream fileOut = new FileOutputStream(FILENAME + ".gz");
    Socket clientSocket = new Socket("127.0.0.1", 1234);
    InputStream in = clientSocket.getInputStream();
    //sendBytes(clientSocket, fileIn);
    Thread thread = new Thread() {
      public void run() {
        try {
          sendBytes(clientSocket,fileIn);
        }catch(IOException e) {
        }
      }
    };
    thread.start();
    int bytesRead;
    byte[] buffer = new byte[BUFSIZE];
    while ((bytesRead = in.read(buffer)) != -1) {
      fileOut.write(buffer, 0, bytesRead);
      System.out.println("R");
    }
    fileIn.close();
    fileOut.close();
    in.close();
  }

  private static void sendBytes(Socket socket, InputStream fileIn) throws IOException {
    OutputStream out = socket.getOutputStream();
    int bytesRead = 0;
    byte[] buffer = new byte[BUFSIZE];
    while ((bytesRead = fileIn.read(buffer)) != -1) {
      out.write(buffer, 0, bytesRead);
      System.out.println("W");
    }
    socket.shutdownOutput();
  }
}