開發訊息推送服務,基於Netty protobuf--fpush(含github原始碼)
開發訊息推送服務,基於Netty protobuf--fpush-含github原始碼
旨在做一個類似於極光推送,小米推送之類的Java程式開源實現。基於Netty + protobuf
github地址是 https://github.com/flylib/fpush
技術棧
- JDK1.8
- Netty-4.1.31.Final
- protobuf-java 3.6.1
程式碼簡介
fpush-core 核心類庫,protobuf原型類
fpush-server server端, 接受來自自己的應用伺服器的http推送請求,並把請求轉換成netty的socket傳送給fpush-client
實現訊息推送,即時通訊技術。
fpush-client 客戶端,模擬App或者網頁,或者桌面應用的客戶端
系統架構
1.系統部署架構圖如下:
2. 移動客戶端鑑定許可權原理
3. server端推送訊息到client端的原理
tcp通訊圖如下:
客戶端使用PushConfirmHandler處理
服務端
RegisterResponseHandler中channelRead()裡,如果客戶端註冊成功則把channel物件儲存到NettyChannelMap這個Map裡去
String clientId = header.getAlias();
ctx.channel().attr(ChannelAttrKey.KEY_CLIENT_ID).set(clientId);
NettyChannelMap.put(clientId, ctx.channel());
應用伺服器通過訪問 http介面 http://localhost:10200/api/**
後臺ApiController把訊息的內容寫入快取ToSendMap.aliasMap中去,如
ToSendMap.aliasMap.put(alias, list);
定時任務com.appjishu.fpush.server.boot.SendTask#scan
每隔一定的時間間隔,會掃描ToSendMap.aliasMap
裡的待發送的訊息. 遍歷後,會通過NettyChannelMap.get(alias)
獲取到Channel,然後 channel.writeAndFlush(message)
傳送出去
@Scheduled(fixedRate = 5000)
public void scan() {
for (Map.Entry<String, List<MsgData>> entry: ToSendMap.aliasMap.entrySet()) {
String alias = entry.getKey();
List<MsgData> msgList = entry.getValue();
if (StringUtils.isNotEmpty(alias) && msgList != null && msgList.size() > 0) {
pushService.doPush(msgList, alias);
}
}
}
public void doPush(List<MsgData> msgList, String alias) {
FMessage fMessage = buildPushMessage(msgList, alias);
if (fMessage != null) {
log.info("---TringToDoPush()--->");
Channel channel = NettyChannelMap.get(alias);
if (channel == null) {
log.info("------channelIsNull---");
} else if (!channel.isWritable()) {
log.info("------channelIsNotWritable---");
} else {
ChannelFuture future = channel.writeAndFlush(fMessage);
log.info("------msgWriten!!!---");
future.addListener(new ChannelFutureListener() {
public void operationComplete(final ChannelFuture future)
throws Exception {
if (msgList.size() > 0) {
msgList.remove(0);
log.info("------removeAreadySentMsg!!!---");
}
}
});
}
}
}
執行
eclipse/IDEA裡
Step1 右鍵run as–java application-- FpushServerApp.java
Step2 右鍵run as–java application-- FpushClientApp.java
Step3 後臺傳送訊息給fpush-client (用來模擬android,ios或者網頁,或者java應用的訊息客戶端)
瀏覽器訪問 http://localhost:10200
顯示Welcome to fpush application!, 說明server執行起來了
然後瀏覽器請求
http://localhost:10200/api/pushTest
如果瀏覽器返回OK
並且fpush-client打印出下面的資訊,說明推送訊息成功
2018-11-19 14:28:44.792 INFO 27780 --- [ntLoopGroup-2-1] c.a.f.client.handler.PushConfirmHandler : --->>>這是推送到客戶端的訊息:title=fpush-Demo
2018-11-19 14:29:17.067 INFO 27780 --- [ntLoopGroup-2-1] c.a.f.client.handler.PushConfirmHandler : --->>>這是推送到客戶端的訊息:description=這是一條推送給lsm001的訊息!
Step4 Android演示
server效果圖
client在eclipse上除錯的效果圖-eclipse console可以顯示中文字元
測試
註冊一個應用賬號,手機號是15600000000
http://localhost:10200/app/registerAccount?mobilePhone=15600000000
http://localhost:10200/app/secretToken?appId=517723931931574272&appSecretKey=cb2eb85b362941f1b3e1
http://localhost:10200/app/keyToken?appId=517723931931574272&appKey=9f5d74bb0f68
Done List
-
netty+protobuf
protobuf的解碼ProtobufVarint32FrameDecoder,ProtobufDecoder
protobuf的編碼ProtobufVarint32LengthFieldPrepender,ProtobufEncoder -
心跳機制的實現
client端經過HeartBeatResponseHandler新建執行緒,定期發出心跳請求
server端的HeartBeatResponseHandler監聽心跳並作出響應
-
客戶端長連線的鑑權
客戶端(即fpush-client)傳送appId + appKey,經後臺鑑定許可權通過後,獲取到clientToken
fpush-client與fpush-server通訊的時候, RegisterRequestHandler和HeartBeatRequestHandler裡面需要帶上
appId+clientToken
建立長連線後,最好所有的RequestHandler需要帶上appId+clientToken -
應用服務端的http連線的鑑權
應用服務端(即app server)傳送appId + appSecretKey,經後臺鑑定許可權通過後,獲取到appToken
應用服務端每次呼叫fpush-server的api都需要帶上appId+appToken
TODO list
- 需要在FHeader裡增加msgId
- 增加IdleStateHandler來對heartbeat進行監控,設定的時間間隔內沒有收到心跳,就斷開連線
Netty的IdleStateHandler會根據使用者的使用場景,啟動三類定時任務,分別是:ReaderIdleTimeoutTask、WriterIdleTimeoutTask和AllIdleTimeoutTask,它們都會被加入到NioEventLoop的Task佇列中被排程和執行。
- server端長連線的超時時間的設定
我們是長連線服務,手機端和服務端要維持這個長連線,需要定期的傳送心跳訊息,我們為了節約電量和流量,手機端採用的是智慧心跳模式。那麼對服務端來說,它是不知道手機端下次是幾分鐘之後會發送心跳上來的,那麼這個連線在服務端的超時時間應該設定多久就是一個問題了。
客戶端在每一次的心跳訊息中攜帶下一次的心跳時間。服務端就根據這個時間來設定連線的超時時間。
github地址是 https://github.com/flylib/fpush