
Most people probably don't use socketTimeout correctly: a look at the underlying source shows we had it wrong all along

A few days ago, network jitter in one of our machine rooms triggered timeouts on many outbound requests. While combing through the logs, we noticed something odd: our HttpClient requests were not throwing timeout exceptions within the time we had configured.

Our configuration was roughly as follows:

RequestConfig requestConfig = RequestConfig.custom()
                .setConnectTimeout(1000)            // max time to establish the TCP connection
                .setSocketTimeout(2000)             // max time waiting for data on the socket
                .setConnectionRequestTimeout(1000)  // max time waiting for a connection from the pool
                .build();

In practice, however, many requests took more than ten seconds, some even more than twenty, far beyond what we expected. We decided to trace through the source code and get to the bottom of it:

It turns out that when HTTP reads data off the network, the reading goes through the BufferedReader class. As we know, Java's IO classes are decorators over a basic input stream, and at the bottom of the chain it is SocketInputStream that does the actual reading.
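As a rough illustration of that decoration idea (a simplified sketch, not HttpClient's actual wiring; the host and port are made up), the chain looks something like this:

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.net.Socket;

public class StreamChainSketch {
    public static void main(String[] args) throws Exception {
        Socket socket = new Socket("127.0.0.1", 8080);
        // HttpClient's socketTimeout is ultimately applied here, as the socket's SO_TIMEOUT
        socket.setSoTimeout(2000);
        // Each decorator adds buffering or charset decoding, but every read()
        // eventually bottoms out in SocketInputStream.read(...)
        BufferedReader reader = new BufferedReader(
                new InputStreamReader(socket.getInputStream()));
        System.out.println(reader.readLine());
    }
}

Tracing the reads down this chain, we arrived at this method in SocketInputStream: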

int read(byte b[], int off, int length, int timeout) throws IOException {
        int n = 0;

        // EOF already encountered
        if (eof) {
            return -1;
        }

        // connection reset
        if (impl.isConnectionReset()) {
            throw new SocketException("Connection reset");
        }

        // bounds check
        if (length <= 0 || off < 0 || off + length > b.length) {
            if (length == 0) {
                return 0;
            }
            throw new ArrayIndexOutOfBoundsException();
        }

        boolean gotReset = false;

        Object traceContext = IoTrace.socketReadBegin();
        // acquire file descriptor and do the read
        FileDescriptor fd = impl.acquireFD();
        try {
            n = socketRead0(fd, b, off, length, timeout);
            if (n > 0) {
                return n;
            }
        } catch (ConnectionResetException rstExc) {
            gotReset = true;
        } finally {
            impl.releaseFD();
            IoTrace.socketReadEnd(traceContext, impl.address, impl.port,
                                  timeout, n > 0 ? n : 0);
        }

        /*
         * We receive a "connection reset" but there may be bytes still
         * buffered on the socket
         */
        if (gotReset) {
            traceContext = IoTrace.socketReadBegin();
            impl.setConnectionResetPending();
            impl.acquireFD();
            try {
                n = socketRead0(fd, b, off, length, timeout);
                if (n > 0) {
                    return n;
                }
            } catch (ConnectionResetException rstExc) {
            } finally {
                impl.releaseFD();
                IoTrace.socketReadEnd(traceContext, impl.address, impl.port,
                                      timeout, n > 0 ? n : 0);
            }
        }

        /*
         * If we get here we are at EOF, the socket has been closed,
         * or the connection has been reset.
         */
        if (impl.isClosedOrPending()) {
            throw new SocketException("Socket closed");
        }
        if (impl.isConnectionResetPending()) {
            impl.setConnectionReset();
        }
        if (impl.isConnectionReset()) {
            throw new SocketException("Connection reset");
        }
        eof = true;
        return -1;
    }

The core of this method is the call to socketRead0(fd, b, off, length, timeout), which is declared like this:

private native int socketRead0(FileDescriptor fd,
                                   byte b[], int off, int len,
                                   int timeout)
        throws IOException;

socketRead0 is a native method. After downloading the OpenJDK 1.8 source, we found the implementation under the directory openjdk\jdk\src\solaris\native\java\net, in the file SocketInputStream.c. The code is as follows:

Java_java_net_SocketInputStream_socketRead0(JNIEnv *env, jobject this,
                                            jobject fdObj, jbyteArray data,
                                            jint off, jint len, jint timeout)
{
    char BUF[MAX_BUFFER_LEN];
    char *bufP;
    jint fd, nread;

    if (IS_NULL(fdObj)) {
        /* shouldn't this be a NullPointerException? -br */
        JNU_ThrowByName(env, JNU_JAVANETPKG "SocketException",
                        "Socket closed");
        return -1;
    } else {
        fd = (*env)->GetIntField(env, fdObj, IO_fd_fdID);
        /* Bug 4086704 - If the Socket associated with this file descriptor
         * was closed (sysCloseFD), then the file descriptor is set to -1.
         */
        if (fd == -1) {
            JNU_ThrowByName(env, "java/net/SocketException", "Socket closed");
            return -1;
        }
    }

    /*
     * If the read is greater than our stack allocated buffer then
     * we allocate from the heap (up to a limit)
     */
    if (len > MAX_BUFFER_LEN) {
        if (len > MAX_HEAP_BUFFER_LEN) {
            len = MAX_HEAP_BUFFER_LEN;
        }
        bufP = (char *)malloc((size_t)len);
        if (bufP == NULL) {
            bufP = BUF;
            len = MAX_BUFFER_LEN;
        }
    } else {
        bufP = BUF;
    }

    if (timeout) {
        nread = NET_Timeout(fd, timeout);
        if (nread <= 0) {
            if (nread == 0) {
                JNU_ThrowByName(env, JNU_JAVANETPKG "SocketTimeoutException",
                            "Read timed out");
            } else if (nread == JVM_IO_ERR) {
                if (errno == EBADF) {
                     JNU_ThrowByName(env, JNU_JAVANETPKG "SocketException", "Socket closed");
                 } else if (errno == ENOMEM) {
                     JNU_ThrowOutOfMemoryError(env, "NET_Timeout native heap allocation failed");
                 } else {
                     NET_ThrowByNameWithLastError(env, JNU_JAVANETPKG "SocketException",
                                                  "select/poll failed");
                 }
            } else if (nread == JVM_IO_INTR) {
                JNU_ThrowByName(env, JNU_JAVAIOPKG "InterruptedIOException",
                            "Operation interrupted");
            }
            if (bufP != BUF) {
                free(bufP);
            }
            return -1;
        }
    }

    nread = NET_Read(fd, bufP, len);

    if (nread <= 0) {
        if (nread < 0) {

            switch (errno) {
                case ECONNRESET:
                case EPIPE:
                    JNU_ThrowByName(env, "sun/net/ConnectionResetException",
                        "Connection reset");
                    break;

                case EBADF:
                    JNU_ThrowByName(env, JNU_JAVANETPKG "SocketException",
                        "Socket closed");
                    break;

                case EINTR:
                     JNU_ThrowByName(env, JNU_JAVAIOPKG "InterruptedIOException",
                           "Operation interrupted");
                     break;

                default:
                    NET_ThrowByNameWithLastError(env,
                        JNU_JAVANETPKG "SocketException", "Read failed");
            }
        }
    } else {
        (*env)->SetByteArrayRegion(env, data, off, nread, (jbyte *)bufP);
    }

    if (bufP != BUF) {
        free(bufP);
    }
    return nread;
}

This code shows that the timeout is enforced through NET_Timeout(fd, timeout), which waits for the socket to become readable before NET_Read actually pulls the data off it. Continuing the trace, we found the implementation of NET_Timeout in the file linux_close.c:

int NET_Timeout(int s, long timeout) {
    long prevtime = 0, newtime;
    struct timeval t;
    fdEntry_t *fdEntry = getFdEntry(s);

    /*
     * Check that fd hasn't been closed.
     */
    if (fdEntry == NULL) {
        errno = EBADF;
        return -1;
    }

    /*
     * Pick up current time as may need to adjust timeout
     */
    if (timeout > 0) {
        gettimeofday(&t, NULL);
        prevtime = t.tv_sec * 1000  +  t.tv_usec / 1000;
    }

    for(;;) {
        struct pollfd pfd;
        int rv;
        threadEntry_t self;

        /*
         * Poll the fd. If interrupted by our wakeup signal
         * errno will be set to EBADF.
         */
        pfd.fd = s;
        pfd.events = POLLIN | POLLERR;

        startOp(fdEntry, &self);
        rv = poll(&pfd, 1, timeout);
        endOp(fdEntry, &self);

        /*
         * If interrupted then adjust timeout. If timeout
         * has expired return 0 (indicating timeout expired).
         */
        if (rv < 0 && errno == EINTR) {
            if (timeout > 0) {
                gettimeofday(&t, NULL);
                newtime = t.tv_sec * 1000  +  t.tv_usec / 1000;
                timeout -= newtime - prevtime;
                if (timeout <= 0) {
                    return 0;
                }
                prevtime = newtime;
            }
        } else {
            return rv;
        }

    }
}

 

The key point in this code is poll(&pfd, 1, timeout). poll is a Linux system call that parks the calling thread on the file descriptor's wait queue until the fd becomes readable, an error occurs, or the timeout expires. In other words, the timeout governs a single wait for data, not the whole read sequence.
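To see the same per-read semantics from the Java side, here is a minimal sketch with a raw Socket (the address is made up; imagine a server that writes a few bytes every 2 seconds). setSoTimeout bounds each individual read, so the loop below can run indefinitely without ever timing out, even though the total transfer takes arbitrarily long:

import java.io.InputStream;
import java.net.Socket;

public class PerReadTimeoutSketch {
    public static void main(String[] args) throws Exception {
        Socket socket = new Socket("127.0.0.1", 8080); // hypothetical slow server
        socket.setSoTimeout(3000); // applies to EACH read(), not to the whole exchange
        InputStream in = socket.getInputStream();
        byte[] buf = new byte[1024];
        long start = System.currentTimeMillis();
        int n;
        // As long as every read() returns some bytes within 3 seconds,
        // no SocketTimeoutException is ever thrown
        while ((n = in.read(buf)) != -1) {
            System.out.println("got " + n + " bytes after "
                    + (System.currentTimeMillis() - start) + " ms");
        }
    }
}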

Now it is easy to understand: this timeout is really the maximum blocking time between two consecutive reads, not a bound on the whole response. Under network jitter, as long as some data comes back within every 2-second window, the request will never time out. To verify this understanding we wrote a test. First, a controller to receive the HTTP request:

@org.springframework.stereotype.Controller
@RequestMapping("/hello")
public class Controller {
    @RequestMapping("/test")
    public void test(HttpServletRequest request, HttpServletResponse response) throws IOException, InterruptedException {
        System.out.println("I'm coming");
        PrintWriter writer = response.getWriter();
        while (true) {
            // write a small chunk, flush it, then pause 2s before the next one
            writer.print("ha ha ha");
            writer.flush();
            Thread.sleep(2000);
            System.out.println("I'm ha ha ha");
        }
    }
}

This endpoint writes a chunk of data every 2 seconds in an endless loop, simulating a bad network where the data arrives in dribs and drabs. Next comes a test that sends the request:

@Test
public void testHttpClientHttp() throws IOException {
    RequestConfig requestConfig = RequestConfig.custom()
            .setConnectTimeout(1000)
            .setSocketTimeout(3000)
            .setConnectionRequestTimeout(1000)
            .build();

    CloseableHttpClient httpClient = HttpClientBuilder.create().setDefaultRequestConfig(requestConfig).build();
    // Create the GET request
    HttpGet httpGet = new HttpGet("http://127.0.0.1:8080/hello/test");
    CloseableHttpResponse response = httpClient.execute(httpGet);
    HttpEntity responseEntity = response.getEntity();
    if (responseEntity != null) {
        System.out.println("Response body: " + EntityUtils.toString(responseEntity));
    }
}

Running both sides, the server kept printing its log line every 2 seconds and the client kept receiving data: the program never threw an exception, exactly as we predicted. And when we changed socketTimeout to 1000, below the 2-second gap between writes, we verified that the request does throw java.net.SocketTimeoutException: Read timed out.

Because of this, to control the total request time precisely, we have to implement the timeout mechanism ourselves:

 

ExecutorService executor = Executors.newFixedThreadPool(1);
Callable<String> callable = new Callable<String>() {
    @Override
    public String call() throws Exception {
        RequestConfig requestConfig = RequestConfig.custom()
                .setConnectTimeout(1000)
                .setSocketTimeout(3000)
                .setConnectionRequestTimeout(1000)
                .build();

        CloseableHttpClient httpClient = HttpClientBuilder.create().setDefaultRequestConfig(requestConfig).build();
        // Create the GET request
        HttpGet httpGet = new HttpGet("http://127.0.0.1:8080/hello/test");
        CloseableHttpResponse response = httpClient.execute(httpGet);
        HttpEntity responseEntity = response.getEntity();
        return EntityUtils.toString(responseEntity);
    }
};
Future<String> future = executor.submit(callable);
// get() bounds the TOTAL time, throwing java.util.concurrent.TimeoutException after 5s
System.out.print(future.get(5, TimeUnit.SECONDS));

This avoids the situation above: when the request exceeds the deadline, a java.util.concurrent.TimeoutException is thrown, so a slow request no longer ties up a business thread and degrades our service. Of course, this is just an example; in reality we would also have to think about the thread pool size, the rejection policy, and similar concerns.
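For instance, here is a sketch of what a more careful version might look like (the method getWithDeadline, the pool size of 4, and the millisecond deadline parameter are our own illustrative choices, not from the original setup): on timeout it cancels the Future and aborts the in-flight request, so the worker thread and the connection are actually released rather than leaked:

import java.util.concurrent.*;
import org.apache.http.client.config.RequestConfig;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClientBuilder;
import org.apache.http.util.EntityUtils;

public class TotalTimeoutSketch {
    private static final ExecutorService EXECUTOR = Executors.newFixedThreadPool(4);

    public static String getWithDeadline(String url, long deadlineMs) throws Exception {
        final HttpGet httpGet = new HttpGet(url);
        Future<String> future = EXECUTOR.submit(new Callable<String>() {
            @Override
            public String call() throws Exception {
                RequestConfig config = RequestConfig.custom()
                        .setConnectTimeout(1000)
                        .setSocketTimeout(3000)
                        .setConnectionRequestTimeout(1000)
                        .build();
                try (CloseableHttpClient client = HttpClientBuilder.create()
                             .setDefaultRequestConfig(config).build();
                     CloseableHttpResponse response = client.execute(httpGet)) {
                    return EntityUtils.toString(response.getEntity());
                }
            }
        });
        try {
            return future.get(deadlineMs, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            future.cancel(true); // interrupt the worker thread (best effort)
            httpGet.abort();     // abort the in-flight request, releasing the connection
            throw e;
        }
    }
}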