Netty簡介
Netty是一個面向網路程式設計的Java基礎框架,它基於非同步的事件驅動,並且內建多種網路協議的支援,可以快速地開發可維護的高效能的面向協議的伺服器和客戶端。
安聊簡介
安聊是一個即時聊天系統,服務端通過點對點與客戶端建立TCP連結,接受來自客戶端的請求,同時,也可以實時地將訊息通知給客戶端。
安聊為什麼選擇Netty
首先是效能和穩定性。我們內部團隊進行過測試:使用Netty框架,單臺伺服器可以維持10000個客戶端長連結,並且穩定性非常高——我們的伺服器曾經有過連續六個月穩定執行的記錄,而且最後中斷的原因只是服務端版本升級。
其次是應用程式的簡潔性和易維護性。使用Netty進行網路開發,可以利用框架遮蔽那些網路底層的實現細節,讓應用只關注於業務邏輯本身;同時因為pipeline的設計模式,讓應用新增對資料/事件的額外處理變得非常簡單。
安聊使用Netty的一些技術特點:
1、結合Spring,讓埠偵聽服務成為一個Bean,結合Bean的生命週期掛鉤函式完成埠服務的安裝/關閉行為
2、將終端長連線的ChannelHandlerContext與對應的使用者ID進行繫結,方便訊息轉發
3、使用自定義的編碼/解碼器對協議包進行處理
4、通過繼承SimpleChannelInboundHandler的類,來處理客戶端請求的協議包
5、因為處理客戶端包的業務過程中,會涉及到資料庫操作,磁碟讀寫操作,若直接在網路IO執行緒中處理,則會顯著降低網路IO的處理能力,所以把每個業務處理都獨立成為一個任務(Task)例項,然後放到執行緒池中去執行;當任務執行完畢,需要通知回網路IO執行緒時,使用userEvent的形式通知回去
一些關鍵程式碼:
網路服務初始化
/**
 * Channel initializer that assembles the handler pipeline for each accepted
 * client {@link SocketChannel}.
 *
 * <p>Handlers are appended in this order: idle-state detection, packet encoder,
 * packet decoder, and finally the client packet handler that receives the
 * shared {@link EventExecutorGroup}.
 */
public class IMClientServerInitializer extends ChannelInitializer<SocketChannel> {

    /** Executor group handed to every {@code IMClientPacketHandler}. */
    private final EventExecutorGroup execGroup;

    /** Reader-idle time, in seconds, passed to the {@code IdleStateHandler}. */
    private final int pduTimeout;

    /**
     * @param execGroup  executor group passed on to the packet handler
     * @param pduTimeout reader-idle time in seconds for the idle-state handler
     */
    public IMClientServerInitializer(EventExecutorGroup execGroup, int pduTimeout) {
        this.execGroup = execGroup;
        this.pduTimeout = pduTimeout;
    }

    /**
     * Populates the pipeline of a newly registered channel.
     *
     * @param ch the channel whose pipeline is being built
     */
    @Override
    protected void initChannel(SocketChannel ch) throws Exception {
        ch.pipeline()
                // Only the reader-idle time is set; writer/all idle times are 0.
                .addLast(new IdleStateHandler(pduTimeout, 0, 0, TimeUnit.SECONDS))
                // .addLast(new LoggingHandler(LogLevel.DEBUG))
                .addLast(new IMPacketEncoder())
                .addLast(new IMPacketDecoder())
                .addLast(new IMClientPacketHandler(execGroup));
    }
}
網路服務類
/**
 * Bean that owns the Netty server which accepts client connections.
 *
 * <p>{@link #afterPropertiesSet()} creates the thread groups and configures the
 * {@link ServerBootstrap}; {@link #run()} binds the listen socket and blocks
 * until it is closed; {@link #shutdown()} closes the listen socket, all client
 * channels known to the channel manager, and finally the thread groups.
 */
public class IMClientListenService implements InitializingBean, DisposableBean {

    private static final Logger logger = LoggerFactory.getLogger(IMClientListenService.class);

    /** Local address to bind to; when blank the server binds to the port only. */
    @Value("${imcore.client.service.listen}")
    private String clientServiceListenAddress;

    /** TCP port the server listens on. */
    @Value("${imcore.client.service.port}")
    private Integer clientServiceListenPort;

    /** Reader-idle time (seconds) handed to the channel initializer. */
    @Value("${imcore.client.service.pdu.timeout}")
    private Integer clientServicePduTimeout;

    private EventLoopGroup bossEventLoopGroup;
    private EventLoopGroup childEventLoopGroup;
    private EventExecutorGroup eventExecutorGroup;
    private ServerBootstrap serverBootstrap;
    private Channel listenChannel;
    private final IMClientChannelManager imClientChannelManager = new IMClientChannelManager();

    /**
     * @return the manager that tracks the client channels of this service
     */
    public IMClientChannelManager getImClientChannelManager() {
        return imClientChannelManager;
    }

    /**
     * Creates the executor/event-loop groups and configures the server
     * bootstrap. Nothing is bound here; binding happens in {@link #run()}.
     */
    @Override
    public void afterPropertiesSet() {
        eventExecutorGroup = new DefaultEventExecutorGroup(128);
        bossEventLoopGroup = new NioEventLoopGroup();
        childEventLoopGroup = new NioEventLoopGroup();
        serverBootstrap = new ServerBootstrap();
        serverBootstrap.group(bossEventLoopGroup, childEventLoopGroup);
        serverBootstrap.channel(NioServerSocketChannel.class);
        serverBootstrap.option(ChannelOption.SO_BACKLOG, 128);
        serverBootstrap.childHandler(new IMClientServerInitializer(eventExecutorGroup, clientServicePduTimeout));
    }

    /**
     * Bean destruction hook. Only logs; resources are released by
     * {@link #shutdown()}.
     */
    @Override
    public void destroy() {
        logger.info("IMClientListenService destroy called");
    }

    /**
     * Binds the listen socket and blocks the calling thread until the listen
     * channel is closed.
     *
     * <p>The configured listen address is honored when it is non-blank;
     * otherwise the server binds to the configured port on the wildcard
     * address. If the thread is interrupted while waiting, the interrupt
     * status is restored and the method returns.
     */
    public void run() {
        try {
            String listenAddress =
                    clientServiceListenAddress == null ? "" : clientServiceListenAddress.trim();
            ChannelFuture bindFuture = listenAddress.isEmpty()
                    ? serverBootstrap.bind(clientServiceListenPort)
                    : serverBootstrap.bind(listenAddress, clientServiceListenPort);

            listenChannel = bindFuture.sync().channel();
            logger.info("Client Listen Service started at port: " + clientServiceListenPort);
            listenChannel.closeFuture().sync();
        } catch (InterruptedException e) {
            // Restore the flag so callers further up can observe the interruption.
            Thread.currentThread().interrupt();
            logger.error("Client Listen Service interrupted", e);
        }
    }

    /**
     * Stops the service: closes the listen channel first, then every client
     * channel returned by the channel manager, and always shuts down the
     * thread groups afterwards.
     */
    public void shutdown() {
        try {
            // Close the listening port first so no new clients are accepted.
            // listenChannel is null when run() never bound successfully.
            if (listenChannel != null) {
                logger.info("close listen channel...");
                ChannelFuture closeFuture = listenChannel.close();
                closeFuture.sync();
                logger.info("close listen channel...done!");
            }

            // Then close all client connections.
            List<ChannelHandlerContext> channels = imClientChannelManager.getAllChannels();
            for (ChannelHandlerContext channel : channels) {
                channel.close().sync();
            }

            logger.info("Client Listen Service stopped");
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
            logger.error("Client Listen Service close failed", ex);
        } catch (Exception ex) {
            logger.error("Client Listen Service close failed", ex);
        } finally {
            // Finally release all thread pools, even if closing channels failed.
            childEventLoopGroup.shutdownGracefully();
            bossEventLoopGroup.shutdownGracefully();
            eventExecutorGroup.shutdownGracefully();
        }
    }
}
網路資料包處理Handler類
/**
 * Inbound handler for decoded {@code IMPacket}s of a single client connection.
 *
 * <p>It registers/unregisters the connection with the channel manager, keeps
 * the login state of the connection ({@code userId}, {@code loginName}, ...),
 * and turns each supported request into a task that is submitted to the
 * {@link EventExecutorGroup} instead of being processed inline. Login results
 * come back to this handler as an {@code IMUserLoginEvent} user event.
 */
public class IMClientPacketHandler extends SimpleChannelInboundHandler<IMPacket> {

    /** Flag value of the response sent when a non-login command arrives before login. */
    private static final int NEED_LOGIN_FLAG = 1;

    private static final Logger logger = LoggerFactory.getLogger(IMClientPacketHandler.class);

    private final EventExecutorGroup eventExecutor;
    private final IMClientChannelManager imClientChannelManager;
    private final IMClientListenService imClientListenService;

    private int userId;
    private String loginName;
    ...

    /**
     * @param eventExecutor executor group the request tasks are submitted to
     */
    public IMClientPacketHandler(EventExecutorGroup eventExecutor) {
        this.eventExecutor = eventExecutor;
        this.imClientListenService = Application.getInstance().getBean(IMClientListenService.class);
        this.imClientChannelManager = imClientListenService.getImClientChannelManager();
    }

    /**
     * Logs the failure, closes the connection and removes it from the channel manager.
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        logger.error("ImPacketHandler exception caught, closing...");
        if (cause != null) {
            logger.error(cause.getMessage(), cause);
        } else {
            logger.error("exception object is null");
        }
        ctx.close();

        this.imClientChannelManager.removeChannel(ctx);
    }

    /**
     * Records the remote IP of the new connection and registers it with the channel manager.
     */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        super.channelActive(ctx);
        InetSocketAddress socketAddress = (InetSocketAddress) (ctx.channel().remoteAddress());
        this.clientIP = socketAddress.getAddress().getHostAddress();

        this.imClientChannelManager.addChannel(ctx);
    }

    /**
     * Unregisters the connection (and its login binding, if any) and resets the login state.
     */
    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        this.imClientChannelManager.removeChannel(ctx);

        if (this.userId != 0 && this.terminal != 0) {
            this.imClientChannelManager.removeLoginChannel(ctx, this.userId, this.terminal);
        }

        super.channelInactive(ctx);
        this.userId = 0;
        this.loginName = null;
        ...
    }

    /**
     * Handles idle-state events and login-result events.
     *
     * <p>A {@code READER_IDLE} event closes the connection and is not propagated
     * further. A successful {@code IMUserLoginEvent} stores the login state in
     * this handler and binds the connection to the user in the channel manager.
     * Every other event is passed on to the next handler.
     */
    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        if (logger.isDebugEnabled()) {
            logger.debug("userEventTriggered:" + evt);
        }
        if (evt instanceof IdleStateEvent) {
            // Nothing was read from the client within the idle timeout: drop the connection.
            IdleStateEvent e = (IdleStateEvent) evt;
            if (e.state() == IdleState.READER_IDLE) {
                // close the channel
                ctx.close();
                return;
            }
        } else if (evt instanceof IMUserLoginEvent) {
            logger.info("user login event thread id " + Thread.currentThread().getId());
            IMUserLoginEvent loginEvent = (IMUserLoginEvent) evt;
            if (loginEvent.isSucceed()) {
                this.userId = loginEvent.getUserInfo().getUserId();
                this.loginName = loginEvent.getUserInfo().getLoginName();
                ...
                if (logger.isDebugEnabled()) {
                    logger.debug("set login succeed in channel handler for user '" + this.loginName + "' with session '" + this.sessionKey + "'");
                }

                this.imClientChannelManager.addLoginChannel(ctx, this.userId, this.terminal);
            }
        }

        super.userEventTriggered(ctx, evt);
    }

    /**
     * Dispatches one decoded packet according to the login state of the connection.
     *
     * <ul>
     *   <li>Not logged in: only the login request is accepted; anything else is
     *       answered with a {@code NEED_LOGIN_FLAG} packet and the connection is closed.</li>
     *   <li>Logged in: a repeated login request is answered with a duplicate-login
     *       error and the connection is closed; other commands are dispatched to
     *       their handler methods; unknown commands are logged and ignored.</li>
     * </ul>
     */
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, IMPacket msg) throws Exception {
        if (this.loginName == null && this.userId == 0) {
            if (msg.getCommandId() == IMBaseDefine.LoginCmdID.CID_USER_LOGIN_REQ_VALUE) {
                handleUserLoginRequest(ctx, msg);
            } else {
                // Any non-login command sent before login gets a response header with flag 1.
                // writeAndFlush (not write): without a flush the packet is never sent and
                // the CLOSE listener never fires.
                IMPacket packet = new IMPacket(NEED_LOGIN_FLAG, null);
                ctx.channel().writeAndFlush(packet).addListener(ChannelFutureListener.CLOSE);
            }
        } else {
            if (msg.getCommandId() == IMBaseDefine.LoginCmdID.CID_USER_LOGIN_REQ_VALUE) {
                // A login command on an already logged-in connection is answered with
                // the ERR_DUPLICATE_LOGIN_VALUE login error.
                IMLogin.IMLoginRsp.Builder loginRespBuilder = IMLogin.IMLoginRsp.newBuilder()
                        .setErrorCode(IMBaseDefine.CommonErrors.ERR_DUPLICATE_LOGIN_VALUE)
                        .setErrorMsg("duplicate login");

                IMPacket packet = new IMPacket(loginRespBuilder.build().toByteArray());
                // Flush so the error actually reaches the client before the channel is closed.
                ctx.channel().writeAndFlush(packet).addListener(ChannelFutureListener.CLOSE);
            } else {
                switch (msg.getCommandId()) {
                    case IMBaseDefine.OtherCmdID.CID_OTHER_HEARTBEAT_VALUE:
                        handleHeartbeatRequest(ctx, msg);
                        break;
                    case IMBaseDefine.LoginCmdID.CID_USER_LOGOUT_REQ_VALUE:
                        handleUserLogoutRequest(ctx, msg);
                        break;
                    // Create a group chat
                    case IMBaseDefine.GroupCmdID.CID_GROUP_CREATE_REQ_VALUE:
                        handleGroupCreateRequest(ctx, msg);
                        break;
                    ...
                    default:
                        logger.warn("unsupported command: " + msg.getCommandId());
                        break;
                }
            }
        }
    }

    /** Wraps a login request into a {@code UserLoginTask} and submits it to the executor group. */
    private void handleUserLoginRequest(ChannelHandlerContext ctx, IMPacket msg) {
        if (logger.isDebugEnabled()) {
            logger.debug("user login received thread id " + Thread.currentThread().getId());
        }

        TaskContext taskContext = new TaskContext(ctx, clientIP, userId, loginName, clientType, sessionKey, pushToken);
        try {
            UserLoginTask userLoginTask = new UserLoginTask(taskContext, msg);
            eventExecutor.submit(userLoginTask);
        } catch (CreateTaskException ex) {
            logger.error("create user login task failed", ex);
        }
    }

    /** Wraps a heartbeat request into a {@code HeartbeatTask} and submits it to the executor group. */
    private void handleHeartbeatRequest(ChannelHandlerContext ctx, IMPacket msg) {
        if (logger.isDebugEnabled()) {
            logger.debug("heartbeat received");
        }

        TaskContext taskContext = new TaskContext(ctx, clientIP, userId, loginName, clientType, sessionKey, pushToken);
        try {
            HeartbeatTask heartbeatTask = new HeartbeatTask(taskContext, msg);
            eventExecutor.submit(heartbeatTask);
        } catch (CreateTaskException ex) {
            logger.error("create heartbeat task failed", ex);
        }
    }

    /** Wraps a logout request into a {@code UserLogoutTask} and submits it to the executor group. */
    private void handleUserLogoutRequest(ChannelHandlerContext ctx, IMPacket msg) {
        if (logger.isDebugEnabled()) {
            logger.debug("session logout received");
        }

        TaskContext taskContext = new TaskContext(ctx, clientIP, userId, loginName, clientType, sessionKey, pushToken);
        try {
            UserLogoutTask userLogoutTask = new UserLogoutTask(taskContext, msg);
            eventExecutor.submit(userLogoutTask);
        } catch (CreateTaskException ex) {
            logger.error("create logout task failed", ex);
        }
    }

    /** Wraps a group-creation request into a {@code GroupCreationTask} and submits it to the executor group. */
    private void handleGroupCreateRequest(ChannelHandlerContext ctx, IMPacket msg) {
        if (logger.isDebugEnabled()) {
            logger.debug("group creation received");
        }

        TaskContext taskContext = new TaskContext(ctx, clientIP, userId, loginName, clientType, sessionKey, pushToken);
        try {
            GroupCreationTask groupCreationTask = new GroupCreationTask(taskContext, msg);
            eventExecutor.submit(groupCreationTask);
        } catch (CreateTaskException ex) {
            logger.error("create group creation task failed", ex);
        }
    }
}
使用者登入Task類
/**
 * Task executed in thread pool for user login.
 *
 * <p>The login request body is parsed eagerly in the constructor; the actual
 * credential check runs in {@link #taskRun()}. The outcome is written back to
 * the client, and on success an {@code IMUserLoginEvent} is additionally fired
 * into the channel pipeline.
 */
public class UserLoginTask extends TaskBase {

    private static final Logger logger = LoggerFactory.getLogger(UserLoginTask.class);

    private final IMPacket request;
    private final IMLogin.IMLoginReq reqBody;
    private final IMUserService userService;

    private int errorCode;
    private String errorMessage;

    /**
     * @param taskContext context of the connection the request arrived on
     * @param request     the raw login request packet
     * @throws CreateTaskException if the packet payload is not a valid login request
     */
    public UserLoginTask(TaskContext taskContext, IMPacket request) throws CreateTaskException {
        super(taskContext);

        this.request = request;
        try {
            this.reqBody = IMLogin.IMLoginReq.parseFrom(request.getPayload());
        } catch (InvalidProtocolBufferException e) {
            throw new CreateTaskException("parse pb failed", e);
        }
        this.userService = Application.getInstance().getBean(IMUserService.class);
    }

    /**
     * Looks up the user by login name and checks the password. An unknown user,
     * a user without a stored password, and a wrong password all produce the
     * same "bad username or password" response.
     */
    @Override
    protected void taskRun() {
        this.errorCode = 0;
        this.errorMessage = "ok";

        IMUserRecord user = userService.findUserByLoginName(reqBody.getUserName());

        // NOTE(review): the stored password is compared directly against the value
        // sent by the client, which suggests it is not hashed - confirm and consider
        // a salted password hash plus a constant-time comparison.
        // The null check keeps a record without a password from raising an NPE.
        boolean credentialsValid = user != null
                && user.getPassword() != null
                && user.getPassword().equals(reqBody.getPassword());

        if (!credentialsValid) {
            this.errorCode = IMBaseDefine.CommonErrors.ERR_USERNAME_OR_PASSWD_INVALID_VALUE;
            this.errorMessage = "bad username or password";
            handleErrorResponse();
            return;
        }

        handleSucceedResponse(user);
    }

    /** Logs the failure and sends a login response carrying the current error code and message. */
    private void handleErrorResponse() {
        logger.error("user '" + reqBody.getUserName() + "' login failed with code " + this.errorCode + " '" + this.errorMessage + "'");

        IMLogin.IMLoginRsp.Builder loginResp = IMLogin.IMLoginRsp.newBuilder()
                .setErrorCode(this.errorCode)
                .setErrorMsg(this.errorMessage);
        IMPacket packetResp = new IMPacket(loginResp.build().toByteArray());

        getContext().getChannelContext().writeAndFlush(packetResp);
    }

    /**
     * Sends the success response and then fires an {@code IMUserLoginEvent}
     * into the pipeline so the channel handler can update its login state.
     *
     * @param user the authenticated user
     */
    private void handleSucceedResponse(IMUserRecord user) {
        IMLogin.IMLoginRsp.Builder loginResp = IMLogin.IMLoginRsp.newBuilder()
                .setErrorCode(0)
                .setErrorMsg("succeed")
                .setUserInfo(IMUserProtobufUtils.toProtobuf(user));

        IMPacket packetResp = new IMPacket(loginResp.build().toByteArray());

        getContext().getChannelContext().writeAndFlush(packetResp);

        // Fire a user event to notify the IO thread, so the state kept by the
        // pipeline's handlers can be changed asynchronously from this task.
        IMUserLoginEvent event = new IMUserLoginEvent();
        event.setSucceed(true);
        event.setUserInfo(user);
        getContext().getChannelContext().pipeline().fireUserEventTriggered(event);
    }
}
踩過的一個坑備註一下:
之前使用ChannelHandlerContext向客戶端寫資料的時候,都是這樣子的:
getContext().getChannelContext().write(packetResp);
寫完之後,發現客戶端有一定概率收不到響應包。原來write只是把資料放進Netty的出站緩衝區,並不會立即寫到socket,寫完資料還需要flush一下:
getContext().getChannelContext().writeAndFlush(packetResp);
這樣子就沒有問題了。
-------------------------------------------------
本人在企業做過五年的即時聊天系統開發,關注這一塊開發的同學,可以一起探討。
另外,本人獨自開發了一套安聊系統,感興趣的同學可以去下載demo試用一下:安聊系統1.0釋出