1. 程式人生 > 基於Netty實現的Android 訊息推送(即時通訊)的解決方案

基於Netty實現的Android 訊息推送(即時通訊)的解決方案

根據Netty框架實現訊息推送(即時聊天)功能.

涉及的技術點:Netty框架、TCP長連線、心跳、阻塞訊息佇列、執行緒池處理訊息傳送、基於Google ProtoBuf自定義的訊息協議,以及TCP粘包/拆包的處理。

客戶端通過TCP連線到伺服器,並建立TCP長連線;當伺服器端收到新訊息後通過TCP連線推送給客戶端, 即訊息傳遞方式: 客戶端A -> 伺服器 -> 客戶端B.

不說了,直接上核心程式碼吧:


================ Server端核心程式碼 ===========================

//服務端主類.

public final class NettyServer {

public  static int PORT = 10000;
public  final int HEART_SYNC_TIME = 300;
    private EventLoopGroup bossGroup;
    private EventLoopGroup workerGroup;
    
    public  void bind(int port) throws InterruptedException{
        bossGroup = new NioEventLoopGroup(1);
        workerGroup = new NioEventLoopGroup();
        ServerBootstrap bootstrap=new ServerBootstrap();
        try {
            bootstrap.group(bossGroup, workerGroup);
            bootstrap.channel(NioServerSocketChannel.class);
            bootstrap.option(ChannelOption.SO_BACKLOG, 1024);
            bootstrap.option(ChannelOption.TCP_NODELAY, true);
            bootstrap.childOption(ChannelOption.SO_KEEPALIVE, true);
            bootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
                @Override
                protected void initChannel(SocketChannel socketChannel) throws Exception {
                    ChannelPipeline pipe = socketChannel.pipeline();
                    pipe.addLast( new ProtobufVarint32FrameDecoder());
                    pipe.addLast(new ProtobufDecoder(MessageProto.Message .getDefaultInstance()));
                    pipe.addLast(new ProtobufVarint32LengthFieldPrepender());
                    pipe.addLast(new ProtobufEncoder());
                    pipe.addLast(new MessageServerHandler());
                }
            });
            // Bind and start to accept incoming connections.
            ChannelFuture f  =  bootstrap.bind(port).sync();


            if(f.isSuccess()){
            
Log.debug("server start success... port: "  + port + ", main work thread: " + Thread.currentThread().getId());
            }


            // Wait until the server socket is closed.
            f.channel().closeFuture().sync();
            
        } finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }
    
    public synchronized void stop(){
        bossGroup.shutdownGracefully();
        workerGroup.shutdownGracefully();
    }
    
    public static void main(String []args)  {
    
NettyServer server = null;
    try {
    server = new NettyServer();
    if(args != null && args.length>0 && args[0].length() > 3){
    PORT = Integer.parseInt(args[0]);
    }
    
    //message work thread.
    new Thread(new Runnable(){
@Override
public void run() {
MessageManager.getInstance().start();
}
    }).start();
    
    server.bind(PORT);
} catch (InterruptedException e) {
MessageManager.getInstance().stop();
UserManager.getInstance().clearAll();
server.stop();
e.printStackTrace();
}
    }
}


//訊息分發

public class MessageManager {
private static MessageManager manager;
private LinkedBlockingQueue<Message> mMessageQueue = new LinkedBlockingQueue<Message>();
private ThreadPoolExecutor mPoolExecutor = new ThreadPoolExecutor(5, 10, 15, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(), new ThreadPoolExecutor.AbortPolicy());

private MessageManager(){
}

public static MessageManager getInstance(){
if(manager  == null){
synchronized (MessageManager.class) { 
if(manager  == null){
manager = new MessageManager();
}
}
}
return manager;
}

public void putMessage(Message message){
Log.debug("MessageManager-> putMessage()..." + message.getClientID() + ",  " + message.getBody());
try {
mMessageQueue.put(message);
} catch (InterruptedException e) {
e.printStackTrace();
}
}

public void start(){
Log.debug("MessageManager-> start()... ");
while(true){
try {
Message message = mMessageQueue.take();
mPoolExecutor.execute(new SendMessageTask(message));
} catch (InterruptedException e) {
e.printStackTrace();
break;
}catch (RejectedExecutionException e){
Log.debug("MessageManager-> 伺服器訊息佇列已滿...延時2妙後繼續傳送...");
try {
Thread.sleep(2000);
} catch (InterruptedException e1) {
e1.printStackTrace();
}
continue;
}
}

}

public void stop(){
Log.debug("MessageManager-> stop()... ");
mMessageQueue.clear();
mPoolExecutor.shutdownNow();
}

class SendMessageTask implements Runnable{
private Message message;

public SendMessageTask(Message message){
this.message = message;
}
@Override
public void run() {
if(message.getReceiveID().length() > 2){
Log.debug("MessageManager-> sendMessage... to client: " + message.getReceiveID() + ",  " + message.getBody());
//傳送單聊訊息;
SocketChannel channel = UserManager.getInstance().getUserChannel(message.getReceiveID());
if(channel != null && channel.isActive()) {
channel.writeAndFlush(message);
}
}else{
Log.debug("MessageManager-> sendMessage... to group: " + message.getGroupID() + ",  " + message.getBody());
//傳送群聊訊息;
CopyOnWriteArrayList<String> userList = UserManager.getInstance().getUserListInGroup(message.getGroupID());
for(String user:userList){
if(!user.equalsIgnoreCase(message.getClientID())){
SocketChannel channel = UserManager.getInstance().getUserChannel(user);
if(channel != null && channel.isActive()) {
channel.writeAndFlush(message);
}
}
}
}
}
}
}


//負責管理連線的客戶端.

public final class UserManager {
private static UserManager manager;
private static Map<String, SocketChannel> userList = new ConcurrentHashMap();
private static Map<String, CopyOnWriteArrayList<String>> groupList = new ConcurrentHashMap();

private UserManager(){
}

public static UserManager getInstance(){
if(manager  == null){
synchronized (UserManager.class) { 
if(manager  == null){
manager = new UserManager();
}
}
}
return manager;
}

@SuppressWarnings("unchecked")
public  void addUser(String groupID, String clientID, SocketChannel channel){
userList.put(clientID, channel);
if(groupList.get(groupID) == null){
Log.debug("addUser()... create new group-> " + groupID + ",  " + clientID);
CopyOnWriteArrayList<String> users = new CopyOnWriteArrayList<String>();
users.add(clientID);
groupList.put(groupID, users);
}else{
Log.debug("addUser()... " + groupID + ",  " + clientID);
groupList.get(groupID).add(clientID);
}
}

public  SocketChannel getUserChannel(String clientID){
return (SocketChannel) userList.get(clientID);
}

public  void removeUser(String groupID, String clientID){
Log.debug("removeUser()... " + groupID + ",  " + clientID);
userList.remove(clientID);
CopyOnWriteArrayList<String> list = groupList.get(groupID);
int count = list.size();
for(int i=0; i<count; i++){
if(list.get(i).equalsIgnoreCase(clientID)){
groupList.get(groupID).remove(i);
break;
}
}
}

public  void removeChannel(SocketChannel channel){
Iterator<Entry<String, SocketChannel>> entries = userList.entrySet().iterator();
while (entries.hasNext()) {
   Entry<String, SocketChannel> entry = entries.next();
   if(entry.getValue().equals(channel)){
    entries.remove();
    Log.debug("removeChannel()... " + entry.getKey());
    return;
   }
}
}

/**
* work in single thread.
* @param groupID
* @return
*/
public   CopyOnWriteArrayList<String> getUserListInGroup(String groupID){
return groupList.get(groupID);
}

public  int getTotalUserCount(){
return userList.size();
}


public  int getGroupSize(String groupID){
return groupList.get(groupID).size();
}

public  void clearGroup(String groupID){
groupList.get(groupID).clear();
}

public  void clearAll(){
groupList.clear();
userList.clear();
}
}


//連線的ChannelHandler

public class MessageServerHandler extends ChannelHandlerAdapter{

    @Override
    public void channelInactive(ChannelHandlerContext cxt) throws Exception {
    Log.debug("channelInactive()...");
    UserManager.getInstance().removeChannel((SocketChannel)cxt.channel());
    }

    @Override
    public void channelRead(ChannelHandlerContext cxt, Object obj) throws Exception {
    Log.debug("channelRead()...  threadId: " + Thread.currentThread().getId());
    MessageProto.Message message = (MessageProto.Message) obj;
    switch(message.getMsgType()){
    case PING:
    Log.debug("received ping..." + message.getClientID());
    cxt.channel().writeAndFlush(createResponseMsg(message, MessageType.PING, null));
    break;
    case LOGIN:
    UserManager.getInstance().addUser(message.getGroupID(), message.getClientID(), (SocketChannel)cxt.channel());
    int count = UserManager.getInstance().getGroupSize(message.getGroupID());
    Log.debug("received login..." + message.getClientID() + ", count: " + count);
    cxt.channel().writeAndFlush(createResponseMsg(message, MessageType.LOGIN, "userCount:" + count));
    Log.debug("received login sended..." + message.getClientID());
    MessageManager.getInstance().putMessage((createResponseMsg(message, MessageType.MESSAGE, "大家好! 我來了...")));
    Log.debug("received login sended 222..." + message.getClientID());
    cxt.channel().writeAndFlush(createResponseMsg(MessageProto.Message.newBuilder().setClientID("管理員").setGroupID(message.getGroupID()).setMsgType(MessageType.MESSAGE).build(), MessageType.MESSAGE, "歡迎 " + message.getClientID() + " 加入本群..."));
    break;
    case MESSAGE:
    Log.debug("received message..." + message.getClientID() + ", " + message.getBody());
    MessageManager.getInstance().putMessage(createResponseMsg(message, MessageType.MESSAGE, null));
    break;
    case LOGOUT:
    UserManager.getInstance().removeUser(message.getGroupID(), message.getClientID());
    Log.debug("received logout..." + message.getClientID() + ", count: " + UserManager.getInstance().getGroupSize(message.getGroupID()));
    MessageManager.getInstance().putMessage(createResponseMsg(message, MessageType.MESSAGE, "大家聊! 我走了...."));
    break;
    }
    }
    
    @Override
    public void exceptionCaught(ChannelHandlerContext cxt, Throwable cause) {
    Log.debug("exceptionCaught()...");
    UserManager.getInstance().removeChannel((SocketChannel)cxt.channel());
        cause.printStackTrace();
        cxt.close();
    }
    
    private Message createResponseMsg(Message receivedMsg, MessageType type, String body) {
    MessageProto.Message.Builder builder = MessageProto.Message.newBuilder();
builder.setClientID(receivedMsg.getClientID());
builder.setMsgType(type);
builder.setGroupID(receivedMsg.getGroupID());
if(body != null){
builder.setBody(body);
}else{
builder.setBody(receivedMsg.getBody());
}
return builder.build();
    }
}


===============================客戶端核心程式碼=======================================


public class NettyClient {
private String TAG = "ChatClient";
private static NettyClient client;
private EventLoopGroup eventLoopGroup;
private SocketChannel socketChannel;
private String clientID;
private String groupID;
private final int HEART_PING_TIME = 180;


private NettyClient(){
}

public static NettyClient getInstance(){
if(client  == null){
synchronized (NettyClient.class) {
if(client  == null){
client = new NettyClient();
}
}
}
return client;
}


public void connect(String serverIP, int port) {
eventLoopGroup = new NioEventLoopGroup();
try{
Bootstrap bootstrap=new Bootstrap();
bootstrap.channel(NioSocketChannel.class);
bootstrap.option(ChannelOption.SO_KEEPALIVE,true);
bootstrap.group(eventLoopGroup);
bootstrap.remoteAddress(serverIP, port);
bootstrap.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
socketChannel.pipeline().addLast(new IdleStateHandler(0, 0, HEART_PING_TIME));
socketChannel.pipeline().addLast( new ProtobufVarint32FrameDecoder());
socketChannel.pipeline().addLast(new ProtobufDecoder(MessageProto.Message.getDefaultInstance()));
socketChannel.pipeline().addLast( new ProtobufVarint32LengthFieldPrepender());
socketChannel.pipeline().addLast(new ProtobufEncoder());
socketChannel.pipeline().addLast(new NettyClientHandler());
}
});
ChannelFuture future =bootstrap.connect(serverIP, port).sync();
if (future.isSuccess()) {
socketChannel = (SocketChannel) future.channel();
Log.d(TAG, "connect server  success...");
}
}catch(InterruptedException e) {
e.printStackTrace();
eventLoopGroup.shutdownGracefully();
}
}


public synchronized void sendMessage(MessageProto.Message message){
Log.d(TAG, "sendMessage()..." + message.getBody());
socketChannel.writeAndFlush(message);
}


public void sync() throws InterruptedException{
socketChannel.closeFuture().sync();
}


public synchronized void stop(){
socketChannel.writeAndFlush(createMessage(MessageType.LOGOUT, null));
eventLoopGroup.shutdownGracefully();
}


public void setClientID(String clientID){
this.clientID = clientID;
}
    
    public String getClientID(){
    return this.clientID;
    }


public void setGroupID(String groupID){
this.groupID = groupID;
}


public String getGroupID(){
return this.groupID;
}


public Message createMessage(MessageType type, String body){
MessageProto.Message.Builder builder = MessageProto.Message.newBuilder();
builder.setClientID(clientID);
builder.setMsgType(type);
builder.setGroupID(groupID);
if(body != null){
builder.setBody(body);
}
return builder.build();
}
}


public class NettyClientHandler extends SimpleChannelInboundHandler<Message> {
    private String TAG = "ChatClient";
private int pingCount = 0;

    @Override
    public void userEventTriggered(ChannelHandlerContext cxt, Object event) throws Exception {
        Log.d(TAG, "userEventTriggered()...");
        if (event instanceof IdleStateEvent) {
            IdleStateEvent e = (IdleStateEvent) event;
            switch (e.state()) {
                case ALL_IDLE:
                    cxt.writeAndFlush(NettyClient.getInstance().createMessage(Message.MessageType.PING, null));
                    pingCount++;
                    Log.d(TAG, "send ping to server...");
                    if(pingCount > 3){
                        Log.d(TAG, "heart timeout, so disconnect...");
                    cxt.close();
                    }
                    break;
                default:
                    break;
            }
        }
    }
    
    @Override
    protected void messageReceived(ChannelHandlerContext channelHandlerContext, MessageProto.Message message) throws Exception {
        Log.d(TAG, "messageReceived()...");
    switch(message.getMsgType()){
    case PING:
    pingCount = 0;
    break;
    case MESSAGE:
            Log.d(TAG, "Received message: " + message.getClientID() + ",  " + message.getBody());
            EventBus.getDefault().post(message);
    break;
    }
        ReferenceCountUtil.release(message);
    }
    
    @Override
    public void exceptionCaught(ChannelHandlerContext cxt, Throwable cause) {
        Log.d(TAG, "messageReceived()...");
        cause.printStackTrace();
        cxt.close();
    }
}