1. 程式人生 > >記錄一次線上關於socket超時問題的定位

記錄一次線上關於socket超時問題的定位

現象:應用程式就是簡單的spring+cxf組成的系統,系統上線執行後發現執行一段時間之後就發現請求可以進來卻得不到處理,cxf的處理過程是建立一個執行緒,並提交到執行緒池去執行.。


import java.io.PrintWriter;
import javax.servlet.AsyncContext;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;

import org.apache.commons.lang3.RandomUtils;
import org.apache.cxf.message.Message;
import org.apache.cxf.phase.PhaseInterceptorChain;
import org.apache.cxf.transport.http.AbstractHTTPDestination;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.stereotype.Service;

import com.yzx.rest.util.SysConfig;
import com.yzx.rest.util.web.HttpUtils;

@Service("callbackService")
public class CallbackServiceImpl implements CallbackService {

	private static Logger logger = LogManager.getLogger(CallbackServiceImpl.class);
	
	@Autowired
	private ThreadPoolTaskExecutor poolTaskExecutor;
	
	
	@Override
	public String callback(String msg) {
		logger.info("=================================模擬AS客戶端接收到訊息,msg="+msg);
		return "000000";
	}
	@Override
	public void test(String msg) {
		final Message messageContext = PhaseInterceptorChain.getCurrentMessage();
		final HttpServletRequest request = (HttpServletRequest) messageContext
				.get(AbstractHTTPDestination.HTTP_REQUEST);
		final HttpServletResponse response = (HttpServletResponse) messageContext
				.get(AbstractHTTPDestination.HTTP_RESPONSE);
		int size  = poolTaskExecutor.getThreadPoolExecutor().getQueue().size();
		logger.info("執行緒ID:{},推送前置機測試資訊.....當前活躍執行緒數:{},執行緒池核心執行緒數:{},執行緒池最大執行緒數:{},執行緒池佇列大小:{}",
				Thread.currentThread().getId(),
				poolTaskExecutor.getActiveCount(),
				poolTaskExecutor.getCorePoolSize(),
				poolTaskExecutor.getMaxPoolSize(),
				size);
		if (request.isAsyncSupported()) {
			// 清除cxf的OutInterceptors,防止cxf將響應返回給客戶端
			messageContext.getExchange().getEndpoint().getOutInterceptors().clear();
			messageContext.getExchange().getBinding().getOutInterceptors().clear();
			request.startAsync(request, response);
			if (request.isAsyncStarted()) {
				final AsyncContext asyncContext = request.getAsyncContext();
				asyncContext.setTimeout(SysConfig.getInstance().getPropertyInt("async_timeout"));
				asyncContext.start(
				// 開啟http執行緒
						new Runnable() {
							@Override
							public void run() {
								poolTaskExecutor.execute(// 由http執行緒交於work執行緒
										new Runnable() {
											@Override
											public void run() {
												PrintWriter printWriter = null;
												try {
													String responseMsgContent = "";
													String ws_callback = SysConfig.getInstance().getProperty("ws_callback");
													String result = HttpUtils.postXml(ws_callback, "desc="+System.currentTimeMillis()+"-"+RandomUtils.nextInt(0, 9999));
													logger.info("執行緒ID:{},result:{}",Thread.currentThread().getId(),result);
													response.setCharacterEncoding("UTF-8");
													response.setHeader("Content-type","application/json;charset=UTF-8");
													printWriter = response.getWriter();
													printWriter.write(responseMsgContent);
													logger.info(responseMsgContent);
												} catch (Exception e) {
													e.printStackTrace();
												} finally {
													if (printWriter != null) {
														printWriter.flush();
														printWriter.close();
													}
													asyncContext.complete();
												}
											}
										});
							}
						});
			}
		} else { // 不支援非同步
		}
	
	}

}

我在cxf邏輯的外邊列印了當前系統的執行緒池大小,佇列資訊.spring的配置檔案中配置了執行緒池配置:

    <!-- 非同步執行緒池 -->  
  	<bean id="threadPool" class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor">  
	    <!-- 核心執行緒數  -->  
	    <property name="corePoolSize" value="500" />  
	    <!-- 最大執行緒數 -->  
	    <property name="maxPoolSize" value="800" />  
	    <!-- 佇列最大長度 >=mainExecutor.maxSize -->  
	    <property name="queueCapacity" value="1000" />  
	    <!-- 執行緒池維護執行緒所允許的空閒時間 -->  
	    <property name="keepAliveSeconds" value="30" />  
	    <!-- 執行緒池對拒絕任務(無執行緒可用)的處理策略 -->  
	    <property name="rejectedExecutionHandler">  
	    <bean class="java.util.concurrent.ThreadPoolExecutor$CallerRunsPolicy" />  
   		</property>  
  	</bean>

連續觀察了兩三天發現異常是時列印的活躍執行緒數總是500,然後就再也沒有給響應,於是推測是不是執行緒沒有回收,所以調整了執行緒池的keepAliveSeconds引數,從300調小到了30s,繼續觀察,結果確實是比之前出異常的時間晚了那麼一點點,就可以忽略不計了.

於是安裝了個probe對tomcat進行監控.並在測試環境進行壓測,開始的時候執行緒狀態是

sun.misc.Unsafe.park ( native code )

解讀一下:

sun.misc.Unsafe.park ( native code )

java.util.concurrent.locks.LockSupport.park ( LockSupport.java:186 )

java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await ( AbstractQueuedSynchronizer.java:2043 )

java.util.concurrent.LinkedBlockingQueue.take ( LinkedBlockingQueue.java:442 )

org.apache.tomcat.util.threads.TaskQueue.take ( TaskQueue.java:104 )

org.apache.tomcat.util.threads.TaskQueue.take ( TaskQueue.java:32 )

java.util.concurrent.ThreadPoolExecutor.getTask ( ThreadPoolExecutor.java:1068 )

java.util.concurrent.ThreadPoolExecutor.runWorker ( ThreadPoolExecutor.java:1130 )

java.util.concurrent.ThreadPoolExecutor$Worker.run ( ThreadPoolExecutor.java:615 )

org.apache.tomcat.util.threads.TaskThread$WrappingRunnable.run ( TaskThread.java:61 )

java.lang.Thread.run ( Thread.java:744 )

正常的狀態:執行緒處於WAITING狀態,阻塞在試圖從任務佇列中取任務(LinkedBlockingQueue.take),這個任務佇列指的是ThreadPoolExecutor的執行緒池啟動的執行緒任務佇列;

也就是說,這些執行緒都是空閒狀態,在等著任務的到來呢!

併發庫中的BlockingQueue是一個比較好玩的類,顧名思義,就是阻塞佇列。該類主要提供了兩個方法put()take(),前者將一個物件放到佇列尾部,如果佇列已經滿了,就等待直到有空閒節點;後者從head取一個物件,如果沒有物件,就等待直到有可取的物件。

隨著壓測的進行,發現幾介面發起方的Linux機器中出現大量的CLOSE_WAIT連線

程式碼中使用HttpClient, 查了下:

        按apache httpclient的設計理念, 當http client 處於高併發時, 預設機制導致的CLOSE_WAIT會影響服務的可用性.

產生的原因:

伺服器A會去請求伺服器B上面的apache獲取檔案資源,正常情況下,如果請求成功,那麼在抓取完資源後伺服器A會主動發出關閉連線的請求,這個時候就是主動關閉連線,連線狀態我們可以看到是TIME_WAIT。如果一旦發生異常呢?假設請求的資源伺服器B上並不存在,那麼這個時候就會由伺服器B發出關閉連線的請求,伺服器A就是被動的關閉了連線,如果伺服器A被動關閉連線之後自己並沒有釋放連線,那就會造成CLOSE_WAIT的狀態了。

那麼伺服器A為什麼沒有主動釋放連線,而伺服器B作為被呼叫放釋放了連線,在這裡被呼叫方我是使用Netty實現的Http Server.觀察了probe發現一個現象,執行一段時間後所有的Thread都達到了一個相同的狀態:

java.net.SocketInputStream.socketRead0 ( native code )

所有的請求竟然都卡在了邏輯中的傳送Http請求的地方,好像是在等待結果響應,問題是都等了半個小時了還是這個狀態,遲遲不釋放,難道是不會自動超時嗎???於是檢查了下程式碼,發現這個POST請求真的沒有設定socket_timeout超時時間。

/**
	 * 傳送post請求(xml字串)
	 * 
	 * @param url
	 * @param data
	 * @return
	 */
	public static String postXml(String url, String data) {
		String result = null;
		Request request = Request.Post(url);
		request.setHeader("Accept", "application/xml");
		request.setHeader("Content-Type", "text/xml;charset=utf-8");
		request.setHeader("Connection", "close");
		Integer so_timeout = SysConfig.getInstance().getPropertyInt("req_so_timeout",5000);
		Integer connenctTimeOut = SysConfig.getInstance().getPropertyInt("req_connect_timeout",5000);//設定連線超時時間,單位毫秒。
		request.connectTimeout(connenctTimeOut);
		request.socketTimeout(so_timeout);
		request.bodyString(data, ContentType.create("text/xml", Consts.UTF_8));
		try {
			logger.debug(Thread.currentThread().getName()+"【傳送post請求(xml字串)】開始:url=" + url + ", data=" + data);
			result = request.execute().returnContent().asString();
			logger.debug("【傳送post請求(xml字串)】成功:url=" + url + ", data=" + data
					+ ", result=" + result);
		} catch (Throwable e) {
			logger.error("【傳送post請求(xml字串)】失敗:url=" + url + ", data=" + data
					+ ", result=" + result, e);
		}
		return result;
	}

果然,重新壓測後發現連線都在超時後釋放了,執行緒數也恢復正常了,我頂!

但是後來一個新問題出現了,就是為什麼傳送的http請求會超時呢?Netty服務端這個對應介面的邏輯也非常之簡單沒有任何IO操作就返回了!

在請求方機器上抓包:

tcpdump host 115.218.666.146 -s0  -w qianfenghaha.pcap

根據條件過濾wireshark:(引數中包含某個字串)

             http.file_data  contains 1542790732067-5152

其實Netty服務端好像就沒有收到請求,日誌中也查不到,抓包顯示是傳送了,只不過沒有響應罷了,一度懷疑是對方Linux檔案控制代碼連線數滿了,可惜對方也沒怎麼配合,到現在也沒有找到問題的根本原因。

大家有什麼懷疑的點?

 

----後來發現問題真夠奇葩的,原來是服務端出現了執行緒安全問題,這個就不言而明瞭吧!