Apache Mina 入門 (二)—— 非同步通訊機制
阿新 • • 發佈:2019-01-27
通過前面的Apache mina 入門(一)— 基礎知識
我們可以瞭解到 mina是個非同步通訊框架,一般使用場景是服務端開發,長連線、非同步通訊使用mina是及其方便的。不多說,看例子。
本次mina 使用的例子是使用maven構建的,過程中需要用到的jar包如下:
<!-- mina -->
<dependency>
<groupId>org.apache.mina</groupId>
<artifactId>mina-integration-beans</artifactId>
<version >2.0.16</version>
</dependency>
<dependency>
<groupId>org.apache.mina</groupId>
<artifactId>mina-core</artifactId>
<version>2.0.16</version>
</dependency>
匯入jar包,pom檔案會報錯,具體錯誤如下:
Missing artifact org.apache.mina:mina-core:bundle:2 .0.16 pom.xml
原因是因為缺少maven-bundle-plugin匯入即可
<build>
<plugins>
<plugin>
<groupId>org.apache.felix</groupId>
<artifactId>maven-bundle-plugin</artifactId>
<extensions>true</extensions>
</plugin >
</plugins>
</build>
本次mina機制的訊息通訊介面規範主要為:
包頭2個位元組,包長2個位元組,協議型別2個位元組,資料包標識碼8個位元組,報文正文內容,校驗碼4個位元組,包尾2個位元組
mina 與spring 結合後,使用更加方便。
spring配置檔案如下:
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:p="http://www.springframework.org/schema/p"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd"
default-lazy-init="false">
<bean class="org.springframework.beans.factory.config.CustomEditorConfigurer">
<property name="customEditors">
<map>
<entry key="java.net.SocketAddress"
value="org.apache.mina.integration.beans.InetSocketAddressEditor"></entry>
</map>
</property>
</bean>
<bean id="ioAcceptor" class="org.apache.mina.transport.socket.nio.NioSocketAcceptor"
init-method="bind" destroy-method="unbind">
<!--埠號 -->
<property name="defaultLocalAddress" value=":8888"></property>
<!--繫結自己實現的handler -->
<property name="handler" ref="serverHandler"></property>
<!--宣告過濾器的集合 -->
<property name="filterChainBuilder" ref="filterChainBuilder"></property>
<property name="reuseAddress" value="true" />
</bean>
<bean id="filterChainBuilder"
class="org.apache.mina.core.filterchain.DefaultIoFilterChainBuilder">
<property name="filters">
<map>
<!--mina自帶的執行緒池filter -->
<entry key="executor" value-ref="executorFilter"></entry>
<entry key="mdcInjectionFilter" value-ref="mdcInjectionFilter" />
<!--自己實現的編解碼器filter -->
<entry key="codecFilter" value-ref="codecFilter" />
<!--日誌的filter -->
<entry key="loggingFilter" value-ref="loggingFilter" />
<!--心跳filter -->
<entry key="keepAliveFilter" value-ref="keepAliveFilter" />
</map>
</property>
</bean>
<!-- executorFilter多執行緒處理 -->
<bean id="executorFilter" class="org.apache.mina.filter.executor.ExecutorFilter" />
<bean id="mdcInjectionFilter" class="org.apache.mina.filter.logging.MdcInjectionFilter">
<constructor-arg value="remoteAddress" />
</bean>
<!--日誌 -->
<bean id="loggingFilter" class="org.apache.mina.filter.logging.LoggingFilter" />
<!--編解碼 -->
<bean id="codecFilter" class="org.apache.mina.filter.codec.ProtocolCodecFilter">
<constructor-arg>
<!--建構函式的引數傳入自己實現的物件 -->
<bean class="com.onion.mina.server.NSMinaCodeFactory"></bean>
</constructor-arg>
</bean>
<!--心跳檢測filter -->
<bean id="keepAliveFilter" class="org.apache.mina.filter.keepalive.KeepAliveFilter">
<!--建構函式的第一個引數傳入自己實現的工廠 -->
<constructor-arg>
<bean class="com.onion.mina.server.NSMinaKeepAliveMessageFactory"></bean>
</constructor-arg>
<!--第二個引數需要的是IdleStatus物件,value值設定為讀寫空閒 -->
<constructor-arg type="org.apache.mina.core.session.IdleStatus"
value="BOTH_IDLE">
</constructor-arg>
<!--心跳頻率,不設定則預設5 -->
<property name="requestInterval" value="1500" />
<!--心跳超時時間,不設定則預設30s -->
<property name="requestTimeout" value="30" />
<!--預設false,比如在心跳頻率為5s時,實際上每5s會觸發一次KeepAliveFilter中的session_idle事件,
該事件中開始傳送心跳包。當此引數設定為false時,對於session_idle事件不再傳遞給其他filter,如果設定為true,
則會傳遞給其他filter,例如handler中的session_idle事件,此時也會被觸發-->
<property name="forwardEvent" value="true" />
</bean>
<!--自己實現的handler-->
<bean id="serverHandler" class="com.onion.mina.server.NSMinaHandler" />
</beans>
mina 核心包括 IOhandler處理器,編碼工廠(包含編碼器,解碼器)等核心。
服務端程式碼如下:
編碼工廠 (NSMinaCodeFactory)如下所示:
public class NSMinaCodeFactory implements ProtocolCodecFactory {
private final NSProtocalEncoder encoder;
private final NSProtocalDecoder decoder;
public NSMinaCodeFactory() {
this(Charset.forName("utf-8"));
}
public NSMinaCodeFactory(Charset charset) {
encoder = new NSProtocalEncoder();
decoder = new NSProtocalDecoder();
}
public ProtocolDecoder getDecoder(IoSession arg0) throws Exception {
// TODO Auto-generated method stub
return decoder;
}
public ProtocolEncoder getEncoder(IoSession arg0) throws Exception {
// TODO Auto-generated method stub
return encoder;
}
}
編碼器——負責將需要傳送給客戶端的資料進行編碼,然後傳送給客戶端
public class NSProtocalEncoder extends ProtocolEncoderAdapter {
private static final Logger logger = Logger.getLogger(NSProtocalEncoder.class);
@SuppressWarnings("unused")
private final Charset charset = Charset.forName("GBK");
/**
* 在此處實現包的編碼工作,並把它寫入輸出流中
*/
public void encode(IoSession session, Object message,
ProtocolEncoderOutput out) throws Exception {
// TODO Auto-generated method stub
if(message instanceof BaseMessageForClient){
BaseMessageForClient clientmessage = (BaseMessageForClient)message;
byte[] packhead_arr = clientmessage.getPackHead().getBytes(charset);//包頭2個位元組
byte[] length_arr = ByteTools.intToByteArray(clientmessage.getLength()+19, 2);//包長
byte[] funcid_arr = ByteTools.intToByteArray(clientmessage.getFuncid(), 1);//協議型別
byte[] packetIdCode_arr = ByteTools.longToByteArray(clientmessage.getPacketIdCode(), 8);//資料包標識碼
byte[] content_arr = clientmessage.getContent().getBytes(charset);//內容
byte[] checkcode_arr = ByteTools.longToByteArray(clientmessage.getCheckCode(), 4);//校驗碼
byte[] packtail_arr = clientmessage.getPackTail().getBytes();//包尾
IoBuffer buffer = IoBuffer.allocate(packhead_arr.length + length_arr.length + funcid_arr.length + packetIdCode_arr.length+ content_arr.length + checkcode_arr.length + packtail_arr.length);
buffer.setAutoExpand(true);
buffer.put(packhead_arr);
buffer.put(length_arr);
buffer.put(funcid_arr);
buffer.put(packetIdCode_arr);
buffer.put(content_arr);
buffer.put(checkcode_arr);
buffer.put(packtail_arr);
buffer.flip();
out.write(buffer);
out.flush();
buffer.free();
}else{
String value = (String)message;
logger.warn("encode message:" + message);
IoBuffer buffer = IoBuffer.allocate(value.getBytes().length);
buffer.setAutoExpand(true);
if(value != null){
buffer.put(value.trim().getBytes());
}
buffer.flip();
out.write(buffer);
out.flush();
buffer.free();
}
}
}
解碼器——負責將客戶端傳送過來的資料進行解碼變換為物件,傳輸給IoHandler處理器進行處理。本解碼器包含了斷包問題解決。
public class NSProtocalDecoder implements ProtocolDecoder {
private static final Logger logger = Logger.getLogger(NSProtocalDecoder.class);
private final AttributeKey context = new AttributeKey(getClass(), "context");
private final Charset charset = Charset.forName("GBK");
private final String PACK_HEAD = "$$"; //包頭
private final String PACK_TAIL = "\r\n"; //包尾
// 請求報文的最大長度 100k
private int maxPackLength = 102400;
public int getMaxPackLength() {
return maxPackLength;
}
public void setMaxPackLength(int maxPackLength) {
if (maxPackLength <= 0) {
throw new IllegalArgumentException("請求報文最大長度:" + maxPackLength);
}
this.maxPackLength = maxPackLength;
}
private Context getContext(IoSession session) {
Context ctx;
ctx = (Context) session.getAttribute(context);
if (ctx == null) {
ctx = new Context();
session.setAttribute(context, ctx);
}
return ctx;
}
public void decode(IoSession session, IoBuffer in, ProtocolDecoderOutput out)
throws Exception {
Long start = System.currentTimeMillis();
// 報文字首長度 包頭2個位元組,包長2個位元組,協議型別2個位元組,資料包標識碼8個位元組,校驗碼4個位元組,包尾2個位元組
final int packHeadLength = 19;
// 先獲取上次的處理上下文,其中可能有未處理完的資料
Context ctx = getContext(session);
// 先把當前buffer中的資料追加到Context的buffer當中
ctx.append(in);
// 把position指向0位置,把limit指向原來的position位置
IoBuffer buf = ctx.getBuffer();
buf.flip();
// 然後按資料包的協議進行讀取
while (buf.remaining() >= packHeadLength) {
logger.debug("test 長度1:" + buf.remaining());
buf.mark();
// 讀取包頭 2個位元組
String packhead = new String(new byte[]{buf.get(),buf.get()});
logger.debug("包頭:" + packhead);
if(PACK_HEAD.equals(packhead)){
//讀取包的長度 2個位元組 報文的長度,不包含包頭和包尾
byte[] length_byte = new byte[]{buf.get(),buf.get()};
byte[] length_byte_arr = new byte[]{0,0,0,0};
length_byte_arr[2] = length_byte[0];
length_byte_arr[3] = length_byte[1];
int length = ByteTools.byteArrayToInt(length_byte_arr);
logger.debug("長度:" + length);
logger.debug("test 長度1:" + buf.remaining());
// 檢查讀取是否正常,不正常的話清空buffer
if (length < 0 || length > maxPackLength) {
logger.debug("報文長度[" + length + "] 超過最大長度:" + maxPackLength
+ "或者小於0,清空buffer");
buf.clear();
break;
//packHeadLength - 2 :減去包尾的長度,
//length - 2 <= buf.remaining() :代表length-本身長度佔用的兩個位元組-包頭長度
}else if(length >= packHeadLength && length - 4 <= buf.remaining()){
//讀取協議型別2個位元組
byte[] funcid_byte = new byte[]{buf.get()};
byte[] funcid_byte_arr = new byte[]{0,0,0,0};
//funcid_byte_arr[2] = funcid_byte[0];
funcid_byte_arr[3] = funcid_byte[0];
int funcid = ByteTools.byteArrayToInt(funcid_byte_arr);
logger.warn("協議型別:" + funcid);
//讀取資料包標識碼8個位元組
byte[] packetIdCode_byte = new byte[]{buf.get(),buf.get(),buf.get(),buf.get(),buf.get(),buf.get(),buf.get(),buf.get()};
long packetIdCode = ByteTools.byteArrayToLong(packetIdCode_byte);
logger.debug("資料包標識碼:" + packetIdCode);
//讀取報文正文內容
int oldLimit = buf.limit();
logger.debug("limit:" + (buf.position() + length));
//當前讀取的位置 + 總長度 - 前面讀取的位元組長度 - 校驗碼
buf.limit(buf.position() + length - 19);
String content = buf.getString(ctx.getDecoder());
buf.limit(oldLimit);
logger.debug("報文正文內容:" + content);
CRC32 crc = new CRC32();
crc.update(content.getBytes("GBK"));
//讀取校驗碼 4個位元組
byte[] checkcode_byte = new byte[]{buf.get(),buf.get(),buf.get(),buf.get()};
byte[] checkcode_byte_arr = new byte[]{0,0,0,0,0,0,0,0};
checkcode_byte_arr[4] = checkcode_byte[0];
checkcode_byte_arr[5] = checkcode_byte[1];
checkcode_byte_arr[6] = checkcode_byte[2];
checkcode_byte_arr[7] = checkcode_byte[3];
long checkcode = ByteTools.byteArrayToLong(checkcode_byte_arr);
logger.debug("校驗碼:" + checkcode);
//驗證校驗碼
if(checkcode != crc.getValue()){
// 如果訊息包不完整,將指標重新移動訊息頭的起始位置
buf.reset();
break;
}
//讀取包尾 2個位元組
String packtail = new String(new byte[]{buf.get(),buf.get()});
logger.debug("包尾:" + packtail);
if(!PACK_TAIL.equals(packtail)){
// 如果訊息包不完整,將指標重新移動訊息頭的起始位置
buf.reset();
break;
}
BaseMessageForServer message = new BaseMessageForServer();
message.setLength(length);
message.setCheckCode(checkcode);
message.setFuncid(funcid);
message.setPacketIdCode(packetIdCode);
message.setContent(content);
out.write(message);
}else{
// 如果訊息包不完整,將指標重新移動訊息頭的起始位置
buf.reset();
break;
}
}else{
// 如果訊息包不完整,將指標重新移動訊息頭的起始位置
buf.reset();
break;
}
}
if (buf.hasRemaining()) {
// 將資料移到buffer的最前面
IoBuffer temp = IoBuffer.allocate(maxPackLength).setAutoExpand(true);
temp.put(buf);
temp.flip();
buf.clear();
buf.put(temp);
} else {// 如果資料已經處理完畢,進行清空
buf.clear();
}
}
public void finishDecode(IoSession session, ProtocolDecoderOutput out)
throws Exception {
// TODO Auto-generated method stub
}
public void dispose(IoSession session) throws Exception {
// TODO Auto-generated method stub
}
// 記錄上下文,因為資料觸發沒有規模,很可能只收到資料包的一半
// 所以,需要上下文拼起來才能完整的處理
private class Context {
private final CharsetDecoder decoder;
private IoBuffer buf;
private int matchCount = 0;
private int overflowPosition = 0;
private Context() {
decoder = charset.newDecoder();
buf = IoBuffer.allocate(3000).setAutoExpand(true);
}
public CharsetDecoder getDecoder() {
return decoder;
}
public IoBuffer getBuffer() {
return buf;
}
@SuppressWarnings("unused")
public int getOverflowPosition() {
return overflowPosition;
}
@SuppressWarnings("unused")
public int getMatchCount() {
return matchCount;
}
@SuppressWarnings("unused")
public void setMatchCount(int matchCount) {
this.matchCount = matchCount;
}
@SuppressWarnings("unused")
public void reset() {
overflowPosition = 0;
matchCount = 0;
decoder.reset();
}
public void append(IoBuffer in) {
getBuffer().put(in);
}
}
}
NSMinaHandler——處理器,處理業務資料。繼承IoHandlerAdapter 介面,主要重寫messageReceived 方法
public class NSMinaHandler extends IoHandlerAdapter {
private final Logger logger = Logger.getLogger(NSMinaHandler.class);
public static ConcurrentHashMap<Long, IoSession> sessionHashMap = new ConcurrentHashMap<Long, IoSession>();
@Override
public void exceptionCaught(IoSession session, Throwable cause)
throws Exception {
session.closeOnFlush();
logger.error("session occured exception, so close it."
+ cause.getMessage());
}
@Override
public void messageReceived(IoSession session, Object message)
throws Exception {
BaseMessageForServer basemessage = (BaseMessageForServer) message;
logger.debug("客戶端"
+ ((InetSocketAddress) session.getRemoteAddress()).getAddress()
.getHostAddress() + "連線成功!");
session.setAttribute("type", message);
String remoteAddress = ((InetSocketAddress) session.getRemoteAddress())
.getAddress().getHostAddress();
session.setAttribute("ip", remoteAddress);
// 組裝訊息內容,返回給客戶端
BaseMessageForClient messageForClient = new BaseMessageForClient();
messageForClient.setFuncid(2);
if (basemessage.getContent().indexOf("hello") > 0) {
// 內容
messageForClient.setContent("hello,我收到您的訊息了! ");
} else {
// 內容
messageForClient.setContent("恭喜,您已經入門! ");
}
// 校驗碼生成
CRC32 crc32 = new CRC32();
crc32.update(messageForClient.getContent().getBytes());
// crc校驗碼
messageForClient.setCheckCode(crc32.getValue());
// 長度
messageForClient
.setLength(messageForClient.getContent().getBytes().length);
// 資料包標識碼
messageForClient.setPacketIdCode(basemessage.getPacketIdCode());
session.write(messageForClient);
}
@Override
public void messageSent(IoSession session, Object message) throws Exception {
logger.debug("messageSent:" + message);
}
@Override
public void sessionCreated(IoSession session) throws Exception {
logger.debug("remote client [" + session.getRemoteAddress().toString()
+ "] connected.");
Long time = System.currentTimeMillis();
session.setAttribute("id", time);
sessionHashMap.put(time, session);
}
@Override
public void sessionClosed(IoSession session) throws Exception {
logger.debug("sessionClosed");
session.closeOnFlush();
sessionHashMap.remove(session.getAttribute("id"));
}
@Override
public void sessionIdle(IoSession session, IdleStatus status)
throws Exception {
logger.debug("session idle, so disconnecting......");
session.closeOnFlush();
logger.warn("disconnected");
}
@Override
public void sessionOpened(IoSession session) throws Exception {
logger.debug("sessionOpened.");
}
}
還有個就是心跳工廠:
public class NSMinaKeepAliveMessageFactory implements KeepAliveMessageFactory {
private final Logger logger = Logger
.getLogger(NSMinaKeepAliveMessageFactory.class);
private BaseMessageForServer basemessage;
/** 心跳包內容 */
private static long packetIdCode = 0;
/**
* 判斷是否心跳請求包 是的話返回true
*/
public boolean isRequest(IoSession session, Object message) {
// TODO Auto-generated method stub
if (message instanceof BaseMessageForServer) {
basemessage = (BaseMessageForServer) message;
// 心跳包方法協議型別
if (basemessage.getFuncid() == 3) {
// 為3,代表是一個心跳包,
packetIdCode = basemessage.getPacketIdCode();
return true;
} else {
return false;
}
} else {
return false;
}
}
/**
* 由於被動型心跳機制,沒有請求當然也就不關注反饋 因此直接返回false
*/
public boolean isResponse(IoSession session, Object message) {
// TODO Auto-generated method stub
return false;
}
/**
* 被動型心跳機制無請求 因此直接返回nul
*/
public Object getRequest(IoSession session) {
// TODO Auto-generated method stub
return null;
}
/**
* 根據心跳請求request 反回一個心跳反饋訊息
*/
public Object getResponse(IoSession session, Object request) {
// 組裝訊息內容,返回給客戶端
BaseMessageForClient messageForClient = new BaseMessageForClient();
messageForClient.setFuncid(4);
// 內容
messageForClient.setContent("2222");
// 校驗碼生成
CRC32 crc32 = new CRC32();
crc32.update(messageForClient.getContent().getBytes());
// crc校驗碼
messageForClient.setCheckCode(crc32.getValue());
// 長度
messageForClient
.setLength(messageForClient.getContent().getBytes().length);
// 資料包標識碼
messageForClient.setPacketIdCode(packetIdCode);
return messageForClient;
}
}
到此服務端程式碼結束。其實服務端與客戶端都存在相似之處。編碼,解碼器都是一樣的。客戶端啟動程式如下:
public class ClientTest {
public static void main(String[] args) {
NioSocketConnector connector = new NioSocketConnector();
//新增過濾器
connector.getFilterChain().addLast("logger", new LoggingFilter());
//設定編碼,解碼過濾器
connector.getFilterChain().addLast("codec", new ProtocolCodecFilter(new ByteArrayCodecFactory()));
//connector.getFilterChain().addLast("codec", new ProtocolCodecFilter(new TextLineCodecFactory(Charset.forName("utf-8"))));//設定編碼過濾器
connector.setHandler(new ClientHandler());//設定事件處理器
ConnectFuture cf = connector.connect(new InetSocketAddress("127.0.0.1",8888)); //建立連線
cf.awaitUninterruptibly(); //等待連線建立完成
BaseMessageForServer message = new BaseMessageForServer();
String content = "hello world!";
CRC32 crc = new CRC32();
try {
crc.update(content.getBytes("GBK"));
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
}
message.setFuncid(5);
message.setPacketIdCode(10000);
message.setContent(content);
message.setCheckCode(crc.getValue());
message.setLength(content.getBytes().length);
cf.getSession().write(message);
}
}