1. 程式人生 > >mina通訊,對於高併發的產生:java.io.IOException: Too many open files(開啟檔案控制代碼過多問題)

mina通訊,對於高併發的產生:java.io.IOException: Too many open files(開啟檔案控制代碼過多問題)

起因:由於業務系統有多個定時任務定時訪問銀行端,銀行每天也有大量業務訪問業務系統,都是通過mina通訊,部署在測試環境的系統每過一兩天開啟控制代碼過萬,生產的也是一週左右不重啟業務系統就會爆掉。一開始並不清楚到底是哪方面原因導致控制代碼增長這麼快,因為這是一個老系統,經過多次升級,大量的併發、多執行緒,所以只好做了一個定時任務,每週重啟生產業務系統。

說明:業務系統和銀行之間的通訊是通過c寫的轉換平臺轉發雙方的資訊的,結構:業務系統(請求)——>轉換平臺(轉發)——>銀行端(相應)——>轉換平臺(轉發)——>業務系統收到響應,銀行端訪問業務系統也是這樣的方式。

開始通過命令查程序佔用的控制代碼數,從大到小排序,一行一個程序ID   
lsof -n|awk '{print $2}'|sort|uniq -c|sort -nr|more  其中第一列是開啟的控制代碼數,第二列是程序ID。

然後通過命令檢視單個程序所有開啟的檔案詳情
lsof -p 程序id

但是這樣檢視感覺太亂了,沒辦法檢視,於是通過命令:將執行結果內容輸出到日誌檔案中檢視
lsof -p 程序id > openfiles.log

發現是因為很多socket連線沒有釋放,這就能定位出大概是業務系統和銀行通訊的問題,分析原因:測試環境有多家銀行,有些銀行端測試環境沒有測試時並不會開啟,而業務系統直連的是轉換平臺,所以業務系統作為客戶端,訪問轉換平臺是通的,而轉換平臺轉發不出去,無響應,雖說轉換平臺設定了超時時間,但是業務端作為客戶端訪問時並沒有設定讀取超時時間,所以會導致客戶端等待因而導致控制代碼快速增長

下面貼出業務系統作為cilen端和service端的程式碼,並標誌出做出修改的部分。

client程式碼:

package com.fortunes.hmfms.network.client;

import java.net.InetSocketAddress;
import java.util.concurrent.TimeUnit;

import org.apache.mina.core.RuntimeIoException;
import org.apache.mina.core.future.ConnectFuture;
import org.apache.mina.core.future.ReadFuture;
import org.apache.mina.core.future.WriteFuture;
import org.apache.mina.core.service.IoHandlerAdapter;
import org.apache.mina.core.session.IoSession;
import org.apache.mina.filter.codec.ProtocolCodecFilter;
import org.apache.mina.filter.logging.LoggingFilter;
import org.apache.mina.transport.socket.SocketConnector;
import org.apache.mina.transport.socket.nio.NioSocketConnector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.fortunes.Message;
import com.fortunes.hmfms.network.codec.MessageCodecFactory;
import com.fortunes.hmfms.network.model.XmlEntity;

public class Client extends IoHandlerAdapter{
	
	final Logger logger = LoggerFactory.getLogger("ROOT");
	
	public static final int CONNECT_TIMEOUT = 3000;
	public static final String RETURN_VALUE = "returnValue";
	
	private InetSocketAddress serverAddress;
	private SocketConnector connector;
	private IoSession session;
	private MessageReceivedCallback messageReceivedCallback;
	
	public Client() {
		connector = new NioSocketConnector();
		connector.getFilterChain().addLast("logger", new LoggingFilter());
		connector.getFilterChain().addLast("codec",
				new ProtocolCodecFilter(new MessageCodecFactory()));
		//connector.setHandler(new Client());
		connector.setHandler(this);
		
		//設定超時   add by czp 20181207
        connector.setConnectTimeoutMillis(CONNECT_TIMEOUT*2);  
	}
	
	public boolean isConnected() {
		return (session != null && session.isConnected());
	}
	
	public void connect(InetSocketAddress serverAddress){
		setServerAddress(serverAddress);
		connect();
	}
	
	public void reConnectIfNecessary(){
		if(!isConnected()){
			logger.info("連線被斷開,重新連線");
			connect();
		}
	}

	public void connect() {
		ConnectFuture connectFuture = getConnector().connect(getServerAddress());
		connectFuture.awaitUninterruptibly(CONNECT_TIMEOUT);
		
		//add by czp 20181207
		if (connectFuture.isDone()) {
	            if  (!connectFuture.isConnected()) {  //若在指定時間內沒連線成功,則丟擲異常
	            	logger.info("連線失敗");
	            	getConnector().dispose();    //不關閉的話會執行一段時間後丟擲,too many open files異常,導致無法連線
	            }
         }

		if(connectFuture.isConnected()){
			try {
				session = connectFuture.getSession();
				session.getConfig().setUseReadOperation(true);
				logger.info("成功連線至{},本地地址:{}",session.getRemoteAddress(),session.getLocalAddress());
			} catch (RuntimeIoException e) {
				logger.info("連線失敗",e);
			}
		}else{
			connectFuture.cancel();
			getConnector().dispose();
			logger.info("連線失敗");
		}
	}

	public XmlEntity sendRequest(Message message,MessageReceivedCallback callback){
		/*setMessageReceivedCallback(callback);
		WriteFuture writeFuture = session.write(message);
		writeFuture.awaitUninterruptibly();
		
		ReadFuture readFuture = session.read();
		readFuture.awaitUninterruptibly();
		return callback.process(this, session, (Message)readFuture.getMessage());
		*/
		//change by czp 20181207 解決銀行端無響應出現控制代碼快速上漲
		Message resp=null;
		try {
			setMessageReceivedCallback(callback);
			WriteFuture writeFuture = session.write(message);
			writeFuture.awaitUninterruptibly();
			
			ReadFuture readFuture = session.read();
			
			if(readFuture.awaitUninterruptibly(CONNECT_TIMEOUT*2, TimeUnit.MILLISECONDS)){ //Wait until the message is received
				resp=(Message)readFuture.getMessage();
				//return callback.process(this, session, (Message)readFuture.getMessage());
			}else{
				logger.info("讀取服務端響應超時,服務端:"+readFuture.getSession().getServiceAddress());
				if(session != null){
					//關閉IoSession,該操作是非同步的,true為立即關閉,false為所有寫操作都flush後關閉
					//這裡僅僅是關閉了TCP的連線通道,並未關閉Client端程式
					session.getService().dispose();
					session.close(false);
					//客戶端發起連線時,會請求系統分配相關的檔案控制代碼,而在連線失敗時記得釋放資源,否則會造成檔案控制代碼洩露
					//當總的檔案控制代碼數超過系統設定值時[ulimit -n],則拋異常"java.io.IOException: Too many open files",導致新連線無法建立,伺服器掛掉
					//所以,若不關閉的話,其執行一段時間後可能丟擲too many open files異常,導致無法連線
					connector.dispose();
					logger.info("讀取服務端響應超時,客戶端自動釋放資源。。。。。。。。。。。");
				}
			}
		} catch (Exception e) {
			logger.info("Client.sendRequest出現異常:"+e.getStackTrace());
		}
		return callback.process(this, session, resp);
	}
	
	public void close(){
		if(isConnected()){
			//關閉IoSession,該操作是非同步的,true為立即關閉,false為所有寫操作都flush後關閉
			//這裡僅僅是關閉了TCP的連線通道,並未關閉Client端程式
			session.getService().dispose();//add by czp 20181207
			session.close(false);
	
			connector.dispose();
			logger.info("客戶端關閉了連線\n");
		}
	}
	
	@Override
	public void messageReceived(IoSession session, Object message)
			throws Exception {
		logger.info("收到來自"+session.getRemoteAddress()+"的訊息:\n{} - 本地埠:{}",message,session.getLocalAddress());
	}
	
	@Override
	public void messageSent(IoSession session, Object message) throws Exception {
		logger.info("傳送至"+session.getRemoteAddress()+"的訊息:\n{}",message);
		logger.info("訊息已傳送!");
	}
	
	@Override
	public void sessionClosed(IoSession session) throws Exception {
		session.getService().dispose();//add by czp 20181207
		session.close(false);//add by czp 20181207
		logger.info("連線至{}的連線被關閉!- 本地埠:{}",session.getRemoteAddress(),session.getLocalAddress());
	}
	
	@Override
	public void exceptionCaught(IoSession session, Throwable cause)
			throws Exception {
		session.close(true);
		logger.info("通訊出現異常{}",cause);
	}

	public void setMessageReceivedCallback(MessageReceivedCallback messageReceivedCallback) {
		this.messageReceivedCallback = messageReceivedCallback;
	}

	public MessageReceivedCallback getMessageReceivedCallback() {
		return messageReceivedCallback;
	}


	public void setConnector(SocketConnector connector) {
		this.connector = connector;
	}

	public SocketConnector getConnector() {
		return connector;
	}

	public void setServerAddress(InetSocketAddress serverAddress) {
		this.serverAddress = serverAddress;
	}

	public InetSocketAddress getServerAddress() {
		return serverAddress;
	}
}

尤其是在方法sessionClosed中新增的session.getService().dispose();和session.close(false);在session關閉前對控制代碼的釋放。這很重要,如果沒有釋放,即使session關閉,被它開啟的檔案控制代碼會一直持有的。

下面是業務系統作為service服務端的程式碼MessageHandler:(服務端程式碼無修改)

import org.apache.mina.core.service.IoHandlerAdapter;
import org.apache.mina.core.session.IoSession;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.fortunes.Message;
import com.fortunes.hmfms.network.model.XmlEntity;


public class MessageHandler extends IoHandlerAdapter {
	
	final Logger logger = LoggerFactory.getLogger("ROOT");
@Override
	public void messageReceived(IoSession session, Object o){
logger.info("mina socket 通訊,收到來自遠端客戶端:{}的請求,銀行程式碼:{}",session.getRemoteAddress(),bankCode);
		
		Message requestMessage = (Message)o;
		
		XmlEntity responseXml = null;
		XmlEntity requestXml = XmlEntity.parse(requestMessage.getContents());
		
		if(requestXml == null){
			responseXml = XmlEntity.create().createDefaultRequest(XML_ERROR);
			responseXml.setResponseCodeAndMsg("000001", "XML報文格式解釋出錯!請檢查輸入的報文格式");
			session.write(Message.createDefaultMessage(responseXml.buildAsBytes()));
		}else{
             try {
                   //業務處理程式碼
                 } catch (NumberFormatException e) {
				logger.info("報文介面程式執行異常", e);
				responseXml = XmlEntity.create().createDefaultResponse(requestXml);
				responseXml.setResponseCodeAndMsg("000001", "系統異常!請檢查輸入資料,"+e.getMessage());
			} catch (Exception e) {
				logger.info("報文介面程式執行異常", e);
				responseXml = XmlEntity.create().createDefaultResponse(requestXml);
				responseXml.setResponseCodeAndMsg("000001", "系統異常!請檢查輸入資料,稍後再試");
			}
        }
      session.write(Message.createDefaultMessage(responseXml.buildAsBytes()));
   }

@Override
	public void exceptionCaught(IoSession session, Throwable cause)throws Exception {
		session.close(true);
		logger.info("通訊程式執行異常", cause);
	}
	
	@Override
	public void sessionClosed(IoSession session) throws Exception {
		session.close(false);
	//	logger.info("連線已關閉!", session.getRemoteAddress());
		logger.info("連線已關閉!");
	}

}

通過上述修改,部署在測試環境測試後,發現再無控制代碼快速增長的情況,控制代碼數穩定在初始部署的條數。