1. 程式人生 > >使用Netty框架開發websocket即時通訊

使用Netty框架開發websocket即時通訊

package com.company.server;


import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;


import org.apache.log4j.Logger;


import com.company.serviceimpl.BananaService;
import com.company.util.CODE;
import com.company.util.Request;
import com.company.util.Response;
import com.google.common.base.Strings;
import com.google.gson.JsonSyntaxException;


import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPromise;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.http.DefaultFullHttpResponse;
import io.netty.handler.codec.http.FullHttpRequest;
import io.netty.handler.codec.http.FullHttpResponse;
import io.netty.handler.codec.http.HttpHeaders;
import io.netty.handler.codec.http.HttpResponseStatus;
import io.netty.handler.codec.http.HttpVersion;
import io.netty.handler.codec.http.websocketx.CloseWebSocketFrame;
import io.netty.handler.codec.http.websocketx.PingWebSocketFrame;
import io.netty.handler.codec.http.websocketx.PongWebSocketFrame;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import io.netty.handler.codec.http.websocketx.WebSocketFrame;
import io.netty.handler.codec.http.websocketx.WebSocketServerHandshaker;
import io.netty.handler.codec.http.websocketx.WebSocketServerHandshakerFactory;
import io.netty.util.CharsetUtil;




/**
 * WebSocket服務端Handler
 *
 */
public class BananaWebSocketServerHandler extends SimpleChannelInboundHandler<Object> {

private static final Logger LOG = Logger.getLogger(BananaWebSocketServerHandler.class.getName());
// map用於channel和具體的使用者名稱繫結起來,可以根據具體業務實現認證資訊和channel繫結
    static final Map<String ,Channel> channelMap = Collections.synchronizedMap(new HashMap<String ,Channel>());
    // set儲存登陸的使用者資訊
static final List<String> set = new ArrayList<String>();
private WebSocketServerHandshaker handshaker;
private ChannelHandlerContext ctx;
private String sessionId;


@Override
public void messageReceived(ChannelHandlerContext ctx, Object msg) throws Exception {
// 傳統的HTTP接入
if (msg instanceof FullHttpRequest) { 
handleHttpRequest(ctx, (FullHttpRequest) msg);
// WebSocket接入
} else if (msg instanceof WebSocketFrame) { 
handleWebSocketFrame(ctx, (WebSocketFrame) msg);
}
}


@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
ctx.flush();
}

@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
LOG.error("WebSocket異常", cause);
ctx.close();
LOG.info(sessionId + " 登出");
BananaService.logout(sessionId); // 登出
BananaService.notifyDownline(sessionId); // 通知有人下線
}
@Override
public void close(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
LOG.info("WebSocket關閉");
super.close(ctx, promise);
LOG.info(sessionId + " 登出");
BananaService.logout(sessionId); // 登出
BananaService.notifyDownline(sessionId); // 通知有人下線
}


/**
* 處理Http請求,完成WebSocket握手<br/>
* 注意:WebSocket連線第一次請求使用的是Http
* @param ctx
* @param request
* @throws Exception
*/
private void handleHttpRequest(ChannelHandlerContext ctx, FullHttpRequest request) throws Exception {
// 如果HTTP解碼失敗,返回HHTP異常
if (!request.getDecoderResult().isSuccess() || (!"websocket".equals(request.headers().get("Upgrade")))) {
sendHttpResponse(ctx, request, new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.BAD_REQUEST));
return;
}


// 正常WebSocket的Http連線請求,構造握手響應返回
WebSocketServerHandshakerFactory wsFactory = new WebSocketServerHandshakerFactory("ws://" + request.headers().get(HttpHeaders.Names.HOST), null, false);
handshaker = wsFactory.newHandshaker(request);
if (handshaker == null) { // 無法處理的websocket版本
WebSocketServerHandshakerFactory.sendUnsupportedWebSocketVersionResponse(ctx.channel());
} else { // 向客戶端傳送websocket握手,完成握手
handshaker.handshake(ctx.channel(), request);
// 記錄管道處理上下文,便於伺服器推送資料到客戶端
this.ctx = ctx;
}
}


/**
* 處理Socket請求
* @param ctx
* @param frame
* @throws Exception 
*/
private void handleWebSocketFrame(ChannelHandlerContext ctx, WebSocketFrame frame) throws Exception {
// 判斷是否是關閉鏈路的指令
if (frame instanceof CloseWebSocketFrame) {
handshaker.close(ctx.channel(), (CloseWebSocketFrame) frame.retain());
return;
}
// 判斷是否是Ping訊息
if (frame instanceof PingWebSocketFrame) {
ctx.channel().write(new PongWebSocketFrame(frame.content().retain()));
return;
}
// 當前只支援文字訊息,不支援二進位制訊息
if (!(frame instanceof TextWebSocketFrame)) {
throw new UnsupportedOperationException("當前只支援文字訊息,不支援二進位制訊息");
}

// 處理來自客戶端的WebSocket請求
try {
Request request = Request.create(((TextWebSocketFrame)frame).text());
Response response = new Response();
response.setServiceId(request.getServiceId());
if (CODE.online.code.intValue() == request.getServiceId()) { // 客戶端註冊
String requestId = request.getRequestId();
if (Strings.isNullOrEmpty(requestId)) {
response.setIsSucc(false).setMessage("requestId不能為空");
return;
} else if (Strings.isNullOrEmpty(request.getName())) {
response.setIsSucc(false).setMessage("name不能為空");
return;
} else if (BananaService.bananaWatchMap.containsKey(requestId)) {
response.setIsSucc(false).setMessage("您已經註冊了,不能重複註冊");
return;
}
if (!BananaService.register(requestId, new BananaService(ctx, request.getName()))) {
response.setIsSucc(false).setMessage("註冊失敗");
} else {
response.setIsSucc(true).setMessage("註冊成功");




//channelMap.put(request.getRequestId(), ctx.channel());
//set.add(request.getRequestId());

BananaService.bananaWatchMap.forEach((reqId, callBack) -> {
response.getHadOnline().put(reqId, ((BananaService)callBack).getName()); // 將已經上線的人員返回
//新增使用者列表到集合中
/*UserBean userBean=new UserBean();
userBean.setUserId(reqId);
userBean.setUserName(((BananaService)callBack).getName());
response.getUsers().add(userBean);*/

if (!reqId.equals(requestId)) {
Request serviceRequest = new Request();
serviceRequest.setServiceId(CODE.online.code);
serviceRequest.setRequestId(requestId);
serviceRequest.setName(request.getName());
//serviceRequest.set
try {
callBack.send(serviceRequest); // 通知有人上線
} catch (Exception e) {
LOG.warn("回調發送訊息給客戶端異常", e);
}
}
});
}
System.out.println("response(login)========="+response.toJson());
sendWebSocket(response.toJson());
this.sessionId = requestId; // 記錄會話id,當頁面重新整理或瀏覽器關閉時,登出掉此鏈路
} else if (CODE.send_message.code.intValue() == request.getServiceId()) { // 客戶端傳送訊息到聊天群
String requestId = request.getRequestId();
if (Strings.isNullOrEmpty(requestId)) {
response.setIsSucc(false).setMessage("requestId不能為空");
} else if (Strings.isNullOrEmpty(request.getName())) {
response.setIsSucc(false).setMessage("name不能為空");
} else if (Strings.isNullOrEmpty(request.getMessage())) {
response.setIsSucc(false).setMessage("message不能為空");
} else {
//response.setIsSucc(true).setMessage("傳送訊息成功");
if (request.getType()==1) {
BananaService.bananaWatchMap.forEach((reqId, callBack) -> { // 單聊
Request serviceRequest = new Request();
serviceRequest.setServiceId(CODE.receive_message.code);
serviceRequest.setRequestId(requestId);
serviceRequest.setName(request.getName());
serviceRequest.setMessage(request.getMessage());
serviceRequest.setTo(request.getTo());
serviceRequest.setType(1);
try {
callBack.send(serviceRequest);
} catch (Exception e) {
LOG.warn("回調發送訊息給客戶端異常", e);
}
});
sendWebSocket(response.toJson());

}else if(request.getType()==2) {//群聊
BananaService.bananaWatchMap.forEach((reqId, callBack) -> { // 將訊息傳送到所有機器
Request serviceRequest = new Request();
serviceRequest.setServiceId(CODE.receive_message.code);
serviceRequest.setRequestId(requestId);
serviceRequest.setName(request.getName());
serviceRequest.setMessage(request.getMessage());

try {
callBack.send(serviceRequest);
} catch (Exception e) {
LOG.warn("回調發送訊息給客戶端異常", e);
}
});


sendWebSocket(response.toJson());
}
}
} else if (CODE.downline.code.intValue() == request.getServiceId()) { // 客戶端下線
String requestId = request.getRequestId();
if (Strings.isNullOrEmpty(requestId)) {
sendWebSocket(response.setIsSucc(false).setMessage("requestId不能為空").toJson());
} else {
BananaService.logout(requestId);
response.setIsSucc(true).setMessage("下線成功");
BananaService.notifyDownline(requestId); // 通知有人下線
sendWebSocket(response.toJson());
}

} else {
sendWebSocket(response.setIsSucc(false).setMessage("未知請求").toJson());
}
} catch (JsonSyntaxException e1) {
LOG.warn("Json解析異常", e1);
} catch (Exception e2) {
LOG.error("處理Socket請求異常", e2);
}
}


/**
* Http返回
* @param ctx
* @param request
* @param response
*/
private static void sendHttpResponse(ChannelHandlerContext ctx, FullHttpRequest request, FullHttpResponse response) {
// 返回應答給客戶端
if (response.getStatus().code() != 200) {
ByteBuf buf = Unpooled.copiedBuffer(response.getStatus().toString(), CharsetUtil.UTF_8);
response.content().writeBytes(buf);
buf.release();
HttpHeaders.setContentLength(response, response.content().readableBytes());
}


// 如果是非Keep-Alive,關閉連線
ChannelFuture f = ctx.channel().writeAndFlush(response);
if (!HttpHeaders.isKeepAlive(request) || response.getStatus().code() != 200) {
f.addListener(ChannelFutureListener.CLOSE);
}
}

/**
* WebSocket返回
* @param ctx
* @param req
* @param res
*/
public void sendWebSocket(String msg) throws Exception {
if (this.handshaker == null || this.ctx == null || this.ctx.isRemoved()) {
throw new Exception("尚未握手成功,無法向客戶端傳送WebSocket訊息");
}
this.ctx.channel().write(new TextWebSocketFrame(msg));
this.ctx.flush();
}
}