Netty實現服務端客戶端長連線通訊及心跳檢測
阿新 • • 發佈:2019-02-04
通過netty實現服務端與客戶端的長連線通訊,及心跳檢測。
基本思路:netty服務端通過一個Map儲存所有連線上來的客戶端SocketChannel,客戶端的Id作為Map的key。每次伺服器端如果要向某個客戶端傳送訊息,只需根據ClientId取出對應的SocketChannel,往裡面寫入message即可。心跳檢測通過IdleEvent事件,由客戶端定時向服務端傳送Ping訊息,檢測SocketChannel是否中斷。
環境JDK1.8 和netty5
以下是具體的程式碼實現和介紹:
1公共的Share部分(主要包含訊息協議型別的定義)
設計訊息型別:
/** Kinds of messages exchanged between the client and the server. */
public enum MsgType {
    /** Heartbeat message. */
    PING,
    /** Request message. */
    ASK,
    /** Response message. */
    REPLY,
    /** Login message. */
    LOGIN
}
Message基類:
/**
 * Common base of every message type: carries the message kind and the id of
 * the client the message belongs to.
 *
 * <p>Original author's note: the class must be serializable and must declare
 * {@code serialVersionUID}; otherwise Netty's message serialization and
 * deserialization misbehaves and messages are not received.
 */
public abstract class BaseMsg implements Serializable {
    private static final long serialVersionUID = 1L;

    private MsgType type;

    // Must be unique per client, otherwise channel lookups get mixed up.
    private String clientId;

    /** Initializes the client id from {@link Constants#getClientId()}. */
    public BaseMsg() {
        clientId = Constants.getClientId();
    }

    /** @return the kind of this message */
    public MsgType getType() {
        return this.type;
    }

    /** @param type the kind of this message */
    public void setType(MsgType type) {
        this.type = type;
    }

    /** @return the id of the client this message belongs to */
    public String getClientId() {
        return this.clientId;
    }

    /** @param clientId the id of the client this message belongs to */
    public void setClientId(String clientId) {
        this.clientId = clientId;
    }
}
常量設定:
/** Holds the client id shared by the whole process in a static field. */
public class Constants {
    private static String clientId;

    /** @param clientId the client id to store */
    public static void setClientId(String clientId) {
        Constants.clientId = clientId;
    }

    /** @return the stored client id, or {@code null} if none has been set */
    public static String getClientId() {
        return Constants.clientId;
    }
}
登入型別訊息:
/** Login message carrying the user name and password to be checked. */
public class LoginMsg extends BaseMsg {
    // serialVersionUID is private static, so the one declared in BaseMsg does
    // not cover this class; without an explicit value the UID is derived from
    // class details and may differ between builds of client and server.
    private static final long serialVersionUID = 1L;

    private String userName;
    private String password;

    /** Creates a message whose type is {@link MsgType#LOGIN}. */
    public LoginMsg() {
        super();
        setType(MsgType.LOGIN);
    }

    /** @return the user name */
    public String getUserName() {
        return userName;
    }

    /** @param userName the user name */
    public void setUserName(String userName) {
        this.userName = userName;
    }

    /** @return the password */
    public String getPassword() {
        return password;
    }

    /** @param password the password */
    public void setPassword(String password) {
        this.password = password;
    }
}
心跳檢測Ping型別訊息:
/** Heartbeat message; it carries no payload beyond the base fields. */
public class PingMsg extends BaseMsg {
    // serialVersionUID is private static, so the one declared in BaseMsg does
    // not cover this class; declare it explicitly to keep the UID stable.
    private static final long serialVersionUID = 1L;

    /** Creates a message whose type is {@link MsgType#PING}. */
    public PingMsg() {
        super();
        setType(MsgType.PING);
    }
}
請求型別訊息:
/** Request message; its parameters travel in an {@link AskParams} object. */
public class AskMsg extends BaseMsg {
    // serialVersionUID is private static, so the one declared in BaseMsg does
    // not cover this class; declare it explicitly to keep the UID stable.
    private static final long serialVersionUID = 1L;

    private AskParams params;

    /** Creates a message whose type is {@link MsgType#ASK}. */
    public AskMsg() {
        super();
        setType(MsgType.ASK);
    }

    /** @return the request parameters, or {@code null} if none were set */
    public AskParams getParams() {
        return params;
    }

    /** @param params the request parameters */
    public void setParams(AskParams params) {
        this.params = params;
    }
}
/**
 * Parameters of a request message.
 *
 * <p>Original author's note: this class must implement the serialization
 * interface.
 */
public class AskParams implements Serializable {
    private static final long serialVersionUID = 1L;

    private String auth;

    /** @param auth the auth value to carry */
    public void setAuth(String auth) {
        this.auth = auth;
    }

    /** @return the auth value, or {@code null} if none was set */
    public String getAuth() {
        return this.auth;
    }
}
響應型別訊息:
/** Response message; its payload travels in a {@link ReplyBody} object. */
public class ReplyMsg extends BaseMsg {
    // serialVersionUID is private static, so the one declared in BaseMsg does
    // not cover this class; declare it explicitly to keep the UID stable.
    private static final long serialVersionUID = 1L;

    private ReplyBody body;

    /** Creates a message whose type is {@link MsgType#REPLY}. */
    public ReplyMsg() {
        super();
        setType(MsgType.REPLY);
    }

    /** @return the response body, or {@code null} if none was set */
    public ReplyBody getBody() {
        return body;
    }

    /** @param body the response body */
    public void setBody(ReplyBody body) {
        this.body = body;
    }
}
/** Body object of a response message; declares no fields of its own. */
public class ReplyBody implements Serializable {

    private static final long serialVersionUID = 1L;

}
/** Response body carrying a client-side info string. */
public class ReplyClientBody extends ReplyBody {
    // serialVersionUID is private static, so the one declared in ReplyBody
    // does not cover this class; declare it explicitly to keep the UID stable.
    private static final long serialVersionUID = 1L;

    private String clientInfo;

    /** @param clientInfo the info string to carry */
    public ReplyClientBody(String clientInfo) {
        this.clientInfo = clientInfo;
    }

    /** @return the info string */
    public String getClientInfo() {
        return clientInfo;
    }

    /** @param clientInfo the info string to carry */
    public void setClientInfo(String clientInfo) {
        this.clientInfo = clientInfo;
    }
}
/** Response body carrying a server-side info string. */
public class ReplyServerBody extends ReplyBody {
    // serialVersionUID is private static, so the one declared in ReplyBody
    // does not cover this class; declare it explicitly to keep the UID stable.
    private static final long serialVersionUID = 1L;

    private String serverInfo;

    /** @param serverInfo the info string to carry */
    public ReplyServerBody(String serverInfo) {
        this.serverInfo = serverInfo;
    }

    /** @return the info string */
    public String getServerInfo() {
        return serverInfo;
    }

    /** @param serverInfo the info string to carry */
    public void setServerInfo(String serverInfo) {
        this.serverInfo = serverInfo;
    }
}
2 Server端:主要包含對SocketChannel引用的Map,ChannelHandler的實現和Bootstrap.
Map:
/**
 * Static registry of client channels keyed by client id, backed by a
 * {@link ConcurrentHashMap}.
 */
public class NettyChannelMap {
    private static final Map<String, SocketChannel> map = new ConcurrentHashMap<>();

    /**
     * Registers (or replaces) the channel stored under {@code clientId}.
     *
     * @param clientId      key to register under; the backing map rejects {@code null}
     * @param socketChannel channel to store; the backing map rejects {@code null}
     */
    public static void add(String clientId, SocketChannel socketChannel) {
        map.put(clientId, socketChannel);
    }

    /**
     * Looks up the channel registered for a client.
     *
     * @param clientId the client id; may be {@code null}
     * @return the registered channel, or {@code null} if the id is {@code null}
     *         or nothing is registered under it
     */
    public static Channel get(String clientId) {
        // ConcurrentHashMap throws NullPointerException on a null key; a
        // message without a client id is simply "not registered".
        if (clientId == null) {
            return null;
        }
        return map.get(clientId);
    }

    /**
     * Removes every entry whose value is the given channel instance.
     *
     * @param socketChannel the channel to unregister
     */
    public static void remove(SocketChannel socketChannel) {
        for (Map.Entry<String, SocketChannel> entry : map.entrySet()) {
            if (entry.getValue() == socketChannel) {
                // Conditional remove: if another thread has re-registered this
                // key with a different channel in the meantime, leave it alone.
                map.remove(entry.getKey(), socketChannel);
            }
        }
    }
}
Handler
/**
 * Server-side inbound handler for {@link BaseMsg} messages.
 *
 * <p>LOGIN messages register the sending channel in {@link NettyChannelMap}.
 * Any other message is processed only when the channel it arrived on is the
 * one registered for its client id; otherwise the client is asked to log in
 * again and the message is dropped. Replies are written to the channel the
 * request arrived on.
 *
 * <p>Messages are not released manually here: {@code SimpleChannelInboundHandler}
 * releases handled messages itself by default.
 */
public class NettyServerHandler extends SimpleChannelInboundHandler<BaseMsg> {

    /** Removes the closed channel from the registry. */
    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        NettyChannelMap.remove((SocketChannel) ctx.channel());
    }

    /**
     * Dispatches one inbound message by its type.
     *
     * @param channelHandlerContext context of the channel the message arrived on
     * @param baseMsg               the decoded message
     */
    @Override
    protected void messageReceived(ChannelHandlerContext channelHandlerContext, BaseMsg baseMsg) throws Exception {
        MsgType type = baseMsg.getType();
        if (type == null) {
            // A switch on a null enum would throw NullPointerException.
            return;
        }
        if (MsgType.LOGIN.equals(type)) {
            handleLogin(channelHandlerContext, baseMsg);
            return;
        }

        String clientId = baseMsg.getClientId();
        if (clientId == null || NettyChannelMap.get(clientId) != channelHandlerContext.channel()) {
            // Not logged in on this connection (or the registration is stale):
            // ask the client to log in again and stop, instead of replying
            // through a missing registry entry.
            channelHandlerContext.channel().writeAndFlush(new LoginMsg());
            return;
        }

        switch (type) {
            case PING:
                channelHandlerContext.channel().writeAndFlush(new PingMsg());
                break;
            case ASK:
                handleAsk(channelHandlerContext, baseMsg);
                break;
            case REPLY:
                handleReply(baseMsg);
                break;
            default:
                break;
        }
    }

    /** Registers the channel when the credentials match and a client id is present. */
    private void handleLogin(ChannelHandlerContext ctx, BaseMsg baseMsg) {
        if (!(baseMsg instanceof LoginMsg)) {
            return;
        }
        LoginMsg loginMsg = (LoginMsg) baseMsg;
        boolean credentialsOk = "robin".equals(loginMsg.getUserName())
                && "yao".equals(loginMsg.getPassword());
        if (credentialsOk && loginMsg.getClientId() != null) {
            NettyChannelMap.add(loginMsg.getClientId(), (SocketChannel) ctx.channel());
            System.out.println("client" + loginMsg.getClientId() + " 登入成功");
        }
    }

    /** Answers a client request whose auth value matches the expected token. */
    private void handleAsk(ChannelHandlerContext ctx, BaseMsg baseMsg) {
        if (!(baseMsg instanceof AskMsg)) {
            return;
        }
        AskMsg askMsg = (AskMsg) baseMsg;
        if (askMsg.getParams() != null && "authToken".equals(askMsg.getParams().getAuth())) {
            ReplyMsg replyMsg = new ReplyMsg();
            replyMsg.setBody(new ReplyServerBody("server info $$$$ !!!"));
            ctx.channel().writeAndFlush(replyMsg);
        }
    }

    /** Prints the client info carried by a client reply, if it has one. */
    private void handleReply(BaseMsg baseMsg) {
        if (!(baseMsg instanceof ReplyMsg)) {
            return;
        }
        ReplyMsg replyMsg = (ReplyMsg) baseMsg;
        if (replyMsg.getBody() instanceof ReplyClientBody) {
            ReplyClientBody clientBody = (ReplyClientBody) replyMsg.getBody();
            System.out.println("receive client msg: " + clientBody.getClientInfo());
        }
    }
}
ServerBootstrap:
public class NettyServerBootstrap {
private int port;
private SocketChannel socketChannel;
public NettyServerBootstrap(int port) throws InterruptedException {
this.port = port;
bind();
}
private void bind() throws InterruptedException {
EventLoopGroup boss=new NioEventLoopGroup();
EventLoopGroup worker=new NioEventLoopGroup();
ServerBootstrap bootstrap=new ServerBootstrap();
bootstrap.
基本思路:netty服務端通過一個Map儲存所有連線上來的客戶端SocketChannel,客戶端的Id作為Map的key。每次伺服器端如果要向某個客戶端傳送訊息,只需根據ClientId取出對應的SocketChannel,往裡面寫入message即可。心跳檢測通過IdleEvent事件,由客戶端定時向服務端傳送Ping訊息,檢測SocketChannel是否中斷。
環境JDK1.8 和netty5
以下是具體的程式碼實現和介紹:
1公共的Share部分(主要包含訊息協議型別的定義)
設計訊息型別:
/** Kinds of messages exchanged between the client and the server. */
public enum MsgType {
    /** Heartbeat message. */
    PING,
    /** Request message. */
    ASK,
    /** Response message. */
    REPLY,
    /** Login message. */
    LOGIN
}
Message基類:
/**
 * Common base of every message type: carries the message kind and the id of
 * the client the message belongs to.
 *
 * <p>Original author's note: the class must be serializable and must declare
 * {@code serialVersionUID}; otherwise Netty's message serialization and
 * deserialization misbehaves and messages are not received.
 */
public abstract class BaseMsg implements Serializable {
    private static final long serialVersionUID = 1L;

    private MsgType type;

    // Must be unique per client, otherwise channel lookups get mixed up.
    private String clientId;

    /** Initializes the client id from {@link Constants#getClientId()}. */
    public BaseMsg() {
        clientId = Constants.getClientId();
    }

    /** @return the kind of this message */
    public MsgType getType() {
        return this.type;
    }

    /** @param type the kind of this message */
    public void setType(MsgType type) {
        this.type = type;
    }

    /** @return the id of the client this message belongs to */
    public String getClientId() {
        return this.clientId;
    }

    /** @param clientId the id of the client this message belongs to */
    public void setClientId(String clientId) {
        this.clientId = clientId;
    }
}
常量設定:
/** Holds the client id shared by the whole process in a static field. */
public class Constants {
    private static String clientId;

    /** @param clientId the client id to store */
    public static void setClientId(String clientId) {
        Constants.clientId = clientId;
    }

    /** @return the stored client id, or {@code null} if none has been set */
    public static String getClientId() {
        return Constants.clientId;
    }
}
登入型別訊息:
/** Login message carrying the user name and password to be checked. */
public class LoginMsg extends BaseMsg {
    // serialVersionUID is private static, so the one declared in BaseMsg does
    // not cover this class; without an explicit value the UID is derived from
    // class details and may differ between builds of client and server.
    private static final long serialVersionUID = 1L;

    private String userName;
    private String password;

    /** Creates a message whose type is {@link MsgType#LOGIN}. */
    public LoginMsg() {
        super();
        setType(MsgType.LOGIN);
    }

    /** @return the user name */
    public String getUserName() {
        return userName;
    }

    /** @param userName the user name */
    public void setUserName(String userName) {
        this.userName = userName;
    }

    /** @return the password */
    public String getPassword() {
        return password;
    }

    /** @param password the password */
    public void setPassword(String password) {
        this.password = password;
    }
}
心跳檢測Ping型別訊息:
/** Heartbeat message; it carries no payload beyond the base fields. */
public class PingMsg extends BaseMsg {
    // serialVersionUID is private static, so the one declared in BaseMsg does
    // not cover this class; declare it explicitly to keep the UID stable.
    private static final long serialVersionUID = 1L;

    /** Creates a message whose type is {@link MsgType#PING}. */
    public PingMsg() {
        super();
        setType(MsgType.PING);
    }
}
請求型別訊息:
/** Request message; its parameters travel in an {@link AskParams} object. */
public class AskMsg extends BaseMsg {
    // serialVersionUID is private static, so the one declared in BaseMsg does
    // not cover this class; declare it explicitly to keep the UID stable.
    private static final long serialVersionUID = 1L;

    private AskParams params;

    /** Creates a message whose type is {@link MsgType#ASK}. */
    public AskMsg() {
        super();
        setType(MsgType.ASK);
    }

    /** @return the request parameters, or {@code null} if none were set */
    public AskParams getParams() {
        return params;
    }

    /** @param params the request parameters */
    public void setParams(AskParams params) {
        this.params = params;
    }
}
/**
 * Parameters of a request message.
 *
 * <p>Original author's note: this class must implement the serialization
 * interface.
 */
public class AskParams implements Serializable {
    private static final long serialVersionUID = 1L;

    private String auth;

    /** @param auth the auth value to carry */
    public void setAuth(String auth) {
        this.auth = auth;
    }

    /** @return the auth value, or {@code null} if none was set */
    public String getAuth() {
        return this.auth;
    }
}
響應型別訊息:
/** Response message; its payload travels in a {@link ReplyBody} object. */
public class ReplyMsg extends BaseMsg {
    // serialVersionUID is private static, so the one declared in BaseMsg does
    // not cover this class; declare it explicitly to keep the UID stable.
    private static final long serialVersionUID = 1L;

    private ReplyBody body;

    /** Creates a message whose type is {@link MsgType#REPLY}. */
    public ReplyMsg() {
        super();
        setType(MsgType.REPLY);
    }

    /** @return the response body, or {@code null} if none was set */
    public ReplyBody getBody() {
        return body;
    }

    /** @param body the response body */
    public void setBody(ReplyBody body) {
        this.body = body;
    }
}
/** Body object of a response message; declares no fields of its own. */
public class ReplyBody implements Serializable {

    private static final long serialVersionUID = 1L;

}
/** Response body carrying a client-side info string. */
public class ReplyClientBody extends ReplyBody {
    // serialVersionUID is private static, so the one declared in ReplyBody
    // does not cover this class; declare it explicitly to keep the UID stable.
    private static final long serialVersionUID = 1L;

    private String clientInfo;

    /** @param clientInfo the info string to carry */
    public ReplyClientBody(String clientInfo) {
        this.clientInfo = clientInfo;
    }

    /** @return the info string */
    public String getClientInfo() {
        return clientInfo;
    }

    /** @param clientInfo the info string to carry */
    public void setClientInfo(String clientInfo) {
        this.clientInfo = clientInfo;
    }
}
/** Response body carrying a server-side info string. */
public class ReplyServerBody extends ReplyBody {
    // serialVersionUID is private static, so the one declared in ReplyBody
    // does not cover this class; declare it explicitly to keep the UID stable.
    private static final long serialVersionUID = 1L;

    private String serverInfo;

    /** @param serverInfo the info string to carry */
    public ReplyServerBody(String serverInfo) {
        this.serverInfo = serverInfo;
    }

    /** @return the info string */
    public String getServerInfo() {
        return serverInfo;
    }

    /** @param serverInfo the info string to carry */
    public void setServerInfo(String serverInfo) {
        this.serverInfo = serverInfo;
    }
}
2 Server端:主要包含對SocketChannel引用的Map,ChannelHandler的實現和Bootstrap.
Map:
/**
 * Static registry of client channels keyed by client id, backed by a
 * {@link ConcurrentHashMap}.
 */
public class NettyChannelMap {
    private static final Map<String, SocketChannel> map = new ConcurrentHashMap<>();

    /**
     * Registers (or replaces) the channel stored under {@code clientId}.
     *
     * @param clientId      key to register under; the backing map rejects {@code null}
     * @param socketChannel channel to store; the backing map rejects {@code null}
     */
    public static void add(String clientId, SocketChannel socketChannel) {
        map.put(clientId, socketChannel);
    }

    /**
     * Looks up the channel registered for a client.
     *
     * @param clientId the client id; may be {@code null}
     * @return the registered channel, or {@code null} if the id is {@code null}
     *         or nothing is registered under it
     */
    public static Channel get(String clientId) {
        // ConcurrentHashMap throws NullPointerException on a null key; a
        // message without a client id is simply "not registered".
        if (clientId == null) {
            return null;
        }
        return map.get(clientId);
    }

    /**
     * Removes every entry whose value is the given channel instance.
     *
     * @param socketChannel the channel to unregister
     */
    public static void remove(SocketChannel socketChannel) {
        for (Map.Entry<String, SocketChannel> entry : map.entrySet()) {
            if (entry.getValue() == socketChannel) {
                // Conditional remove: if another thread has re-registered this
                // key with a different channel in the meantime, leave it alone.
                map.remove(entry.getKey(), socketChannel);
            }
        }
    }
}
Handler
/**
 * Server-side inbound handler for {@link BaseMsg} messages.
 *
 * <p>LOGIN messages register the sending channel in {@link NettyChannelMap}.
 * Any other message is processed only when the channel it arrived on is the
 * one registered for its client id; otherwise the client is asked to log in
 * again and the message is dropped. Replies are written to the channel the
 * request arrived on.
 *
 * <p>Messages are not released manually here: {@code SimpleChannelInboundHandler}
 * releases handled messages itself by default.
 */
public class NettyServerHandler extends SimpleChannelInboundHandler<BaseMsg> {

    /** Removes the closed channel from the registry. */
    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        NettyChannelMap.remove((SocketChannel) ctx.channel());
    }

    /**
     * Dispatches one inbound message by its type.
     *
     * @param channelHandlerContext context of the channel the message arrived on
     * @param baseMsg               the decoded message
     */
    @Override
    protected void messageReceived(ChannelHandlerContext channelHandlerContext, BaseMsg baseMsg) throws Exception {
        MsgType type = baseMsg.getType();
        if (type == null) {
            // A switch on a null enum would throw NullPointerException.
            return;
        }
        if (MsgType.LOGIN.equals(type)) {
            handleLogin(channelHandlerContext, baseMsg);
            return;
        }

        String clientId = baseMsg.getClientId();
        if (clientId == null || NettyChannelMap.get(clientId) != channelHandlerContext.channel()) {
            // Not logged in on this connection (or the registration is stale):
            // ask the client to log in again and stop, instead of replying
            // through a missing registry entry.
            channelHandlerContext.channel().writeAndFlush(new LoginMsg());
            return;
        }

        switch (type) {
            case PING:
                channelHandlerContext.channel().writeAndFlush(new PingMsg());
                break;
            case ASK:
                handleAsk(channelHandlerContext, baseMsg);
                break;
            case REPLY:
                handleReply(baseMsg);
                break;
            default:
                break;
        }
    }

    /** Registers the channel when the credentials match and a client id is present. */
    private void handleLogin(ChannelHandlerContext ctx, BaseMsg baseMsg) {
        if (!(baseMsg instanceof LoginMsg)) {
            return;
        }
        LoginMsg loginMsg = (LoginMsg) baseMsg;
        boolean credentialsOk = "robin".equals(loginMsg.getUserName())
                && "yao".equals(loginMsg.getPassword());
        if (credentialsOk && loginMsg.getClientId() != null) {
            NettyChannelMap.add(loginMsg.getClientId(), (SocketChannel) ctx.channel());
            System.out.println("client" + loginMsg.getClientId() + " 登入成功");
        }
    }

    /** Answers a client request whose auth value matches the expected token. */
    private void handleAsk(ChannelHandlerContext ctx, BaseMsg baseMsg) {
        if (!(baseMsg instanceof AskMsg)) {
            return;
        }
        AskMsg askMsg = (AskMsg) baseMsg;
        if (askMsg.getParams() != null && "authToken".equals(askMsg.getParams().getAuth())) {
            ReplyMsg replyMsg = new ReplyMsg();
            replyMsg.setBody(new ReplyServerBody("server info $$$$ !!!"));
            ctx.channel().writeAndFlush(replyMsg);
        }
    }

    /** Prints the client info carried by a client reply, if it has one. */
    private void handleReply(BaseMsg baseMsg) {
        if (!(baseMsg instanceof ReplyMsg)) {
            return;
        }
        ReplyMsg replyMsg = (ReplyMsg) baseMsg;
        if (replyMsg.getBody() instanceof ReplyClientBody) {
            ReplyClientBody clientBody = (ReplyClientBody) replyMsg.getBody();
            System.out.println("receive client msg: " + clientBody.getClientInfo());
        }
    }
}
ServerBootstrap:
public class NettyServerBootstrap {
private int port;
private SocketChannel socketChannel;
public NettyServerBootstrap(int port) throws InterruptedException {
this.port = port;
bind();
}
private void bind() throws InterruptedException {
EventLoopGroup boss=new NioEventLoopGroup();
EventLoopGroup worker=new NioEventLoopGroup();
ServerBootstrap bootstrap=new ServerBootstrap();
bootstrap.