1. 程式人生 > >開發訊息推送服務,基於Netty protobuf--fpush(含github原始碼)

開發訊息推送服務,基於Netty protobuf--fpush(含github原始碼)

開發訊息推送服務,基於Netty protobuf--fpush-含github原始碼

旨在做一個類似於極光推送,小米推送之類的Java程式開源實現。基於Netty + protobuf


github地址是 https://github.com/flylib/fpush



技術棧

  1. JDK1.8
  2. Netty-4.1.31.Final
  3. protobuf-java 3.6.1

程式碼簡介

fpush-core 核心類庫,protobuf原型類

fpush-server server端, 接受來自自己的應用伺服器的http推送請求,並把請求轉換成netty的socket傳送給fpush-client
實現訊息推送,即時通訊技術。

fpush-client 客戶端,模擬App或者網頁,或者桌面應用的客戶端

系統架構

1.系統部署架構圖如下:










2. 移動客戶端鑑定許可權原理










3. server端推送訊息到client端的原理

tcp通訊圖如下:





客戶端使用PushConfirmHandler處理

服務端

RegisterResponseHandler中channelRead()裡,如果客戶端註冊成功則把channel物件儲存到NettyChannelMap這個Map裡去

String clientId = header.getAlias();
			ctx.channel().attr(ChannelAttrKey.KEY_CLIENT_ID).set(clientId);
			NettyChannelMap.put(clientId,
ctx.channel());

應用伺服器通過訪問 http介面 http://localhost:10200/api/**

後臺ApiController把訊息的內容寫入快取ToSendMap.aliasMap中去,如
ToSendMap.aliasMap.put(alias, list);

定時任務com.appjishu.fpush.server.boot.SendTask#scan每隔一定的時間間隔,會掃描ToSendMap.aliasMap

裡的待發送的訊息. 遍歷後,會通過NettyChannelMap.get(alias)獲取到Channel,然後 channel.writeAndFlush(message)

傳送出去

@Scheduled(fixedRate = 5000)
    public void scan() {
        for (Map.Entry<String, List<MsgData>> entry: ToSendMap.aliasMap.entrySet()) {
            String alias = entry.getKey();
            List<MsgData> msgList = entry.getValue();
            if (StringUtils.isNotEmpty(alias) && msgList != null && msgList.size() > 0) {
                pushService.doPush(msgList, alias);
            }
        }

    }
public void doPush(List<MsgData> msgList, String alias) {
        FMessage fMessage = buildPushMessage(msgList, alias);
        if (fMessage != null) {
            log.info("---TringToDoPush()--->");
            Channel channel = NettyChannelMap.get(alias);
            if (channel == null) {
                log.info("------channelIsNull---");
            } else if (!channel.isWritable()) {
                log.info("------channelIsNotWritable---");
            } else {
                ChannelFuture future = channel.writeAndFlush(fMessage);
                log.info("------msgWriten!!!---");
                future.addListener(new ChannelFutureListener() {
                    public void operationComplete(final ChannelFuture future)
                            throws Exception {
                        if (msgList.size() > 0) {
                            msgList.remove(0);
                            log.info("------removeAreadySentMsg!!!---");
                        }
                    }
                });
            }
        }
    }


執行

eclipse/IDEA裡

Step1 右鍵run as–java application-- FpushServerApp.java

Step2 右鍵run as–java application-- FpushClientApp.java

Step3 後臺傳送訊息給fpush-client (用來模擬android,ios或者網頁,或者java應用的訊息客戶端)
瀏覽器訪問 http://localhost:10200

顯示Welcome to fpush application!, 說明server執行起來了

然後瀏覽器請求
http://localhost:10200/api/pushTest


如果瀏覽器返回OK
並且fpush-client打印出下面的資訊,說明推送訊息成功


2018-11-19 14:28:44.792  INFO 27780 --- [ntLoopGroup-2-1] c.a.f.client.handler.PushConfirmHandler  : --->>>這是推送到客戶端的訊息:title=fpush-Demo
2018-11-19 14:29:17.067  INFO 27780 --- [ntLoopGroup-2-1] c.a.f.client.handler.PushConfirmHandler  : --->>>這是推送到客戶端的訊息:description=這是一條推送給lsm001的訊息!

Step4 Android演示



server效果圖






client在eclipse上除錯的效果圖-eclipse console可以顯示中文字元






測試

註冊一個應用賬號,手機號是15600000000


http://localhost:10200/app/registerAccount?mobilePhone=15600000000

http://localhost:10200/app/secretToken?appId=517723931931574272&appSecretKey=cb2eb85b362941f1b3e1


http://localhost:10200/app/keyToken?appId=517723931931574272&appKey=9f5d74bb0f68

Done List

  1. netty+protobuf

    protobuf的解碼ProtobufVarint32FrameDecoder,ProtobufDecoder

    protobuf的編碼ProtobufVarint32LengthFieldPrepender,ProtobufEncoder

  2. 心跳機制的實現

    client端經過HeartBeatResponseHandler新建執行緒,定期發出心跳請求

    server端的HeartBeatResponseHandler監聽心跳並作出響應


  3. 客戶端長連線的鑑權

    客戶端(即fpush-client)傳送appId + appKey,經後臺鑑定許可權通過後,獲取到clientToken

    fpush-client與fpush-server通訊的時候, RegisterRequestHandler和HeartBeatRequestHandler裡面需要帶上

    appId+clientToken
    建立長連線後,最好所有的RequestHandler需要帶上appId+clientToken

  4. 應用服務端的http連線的鑑權

    應用服務端(即app server)傳送appId + appSecretKey,經後臺鑑定許可權通過後,獲取到appToken
    應用服務端每次呼叫fpush-server的api都需要帶上appId+appToken

TODO list

  1. 需要在FHeader裡增加msgId

  2. 增加IdleStateHandler來對heartbeat進行監控,設定的時間間隔內沒有收到心跳,就斷開連線
    Netty的IdleStateHandler會根據使用者的使用場景,啟動三類定時任務,分別是:ReaderIdleTimeoutTask、WriterIdleTimeoutTask和AllIdleTimeoutTask,它們都會被加入到NioEventLoop的Task佇列中被排程和執行。

  3. server端長連線的超時時間的設定

    我們是長連線服務,手機端和服務端要維持這個長連線,需要定期的傳送心跳訊息,我們為了節約電量和流量,手機端採用的是智慧心跳模式。那麼對服務端來說,它是不知道手機端下次是幾分鐘之後會發送心跳上來的,那麼這個連線在服務端的超時時間應該設定多久就是一個問題了。
    客戶端在每一次的心跳訊息中攜帶下一次的心跳時間。服務端就根據這個時間來設定連線的超時時間。

github地址是 https://github.com/flylib/fpush