WebSocket和kafka實現資料實時推送到前端
阿新 • • 發佈:2019-01-02
一. 需求背景
最近新接觸一個需求,需要將kafka中的資料實時推送到前端展示。最開始想到的是前端輪詢介面資料,但是無法保證輪詢的頻率和消費的頻率完全一致,或造成資料缺失等問題。最終確定用利用WebSocket實現資料的實時推送。
二. websocket簡介
網上已經有好多介紹WebSocket的文章了,就不詳細介紹了,這裡只做簡單介紹。 WebSocket協議是基於TCP的一種新的網路協議。它實現了瀏覽器與伺服器全雙工(full-duplex)通訊——允許伺服器主動傳送資訊給客戶端。
三. 服務端實現
1. pom檔案
P.S. 需要注意的是WebSocket對tomcat版本是有要求的
最近新接觸一個需求,需要將kafka中的資料實時推送到前端展示。最開始想到的是前端輪詢介面資料,但是無法保證輪詢的頻率和消費的頻率完全一致,或造成資料缺失等問題。最終確定用利用WebSocket實現資料的實時推送。
二. websocket簡介
網上已經有好多介紹WebSocket的文章了,就不詳細介紹了,這裡只做簡單介紹。 WebSocket協議是基於TCP的一種新的網路協議。它實現了瀏覽器與伺服器全雙工(full-duplex)通訊——允許伺服器主動傳送資訊給客戶端。
三. 服務端實現
1. pom檔案
這裡需要引用三個依賴。第一個為WebSocket需要的依賴,另外兩個為kafka的依賴
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>person</groupId> <artifactId>wbSocketkafka</artifactId> <version>1.0-SNAPSHOT</version> <dependencies> <!-- webSocket所需依賴 --> <dependency> <groupId>javax</groupId> <artifactId>javaee-api</artifactId> <version>7.0</version> </dependency> <!-- kafka 所需依賴 --> <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka_2.9.2</artifactId> <version>0.8.1.1</version> </dependency> <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>RELEASE</version> </dependency> </dependencies> </project>
2. webSocket服務端實現
//此處定義介面的uri @ServerEndpoint("/wbSocket") public class WebSocket { private Session session; public static CopyOnWriteArraySet<WebSocket> wbSockets = new CopyOnWriteArraySet<WebSocket>(); //此處定義靜態變數,以在其他方法中獲取到所有連線 /** * 建立連線。 * 建立連線時入參為session */ @OnOpen public void onOpen(Session session){ this.session = session; wbSockets.add(this); //將此物件存入集合中以在之後廣播用,如果要實現一對一訂閱,則型別對應為Map。由於這裡廣播就可以了隨意用Set System.out.println("New session insert,sessionId is "+ session.getId()); } /** * 關閉連線 */ @OnClose public void onClose(){ wbSockets.remove(this);//將socket物件從集合中移除,以便廣播時不傳送次連線。如果不移除會報錯(需要測試) System.out.println("A session insert,sessionId is "+ session.getId()); } /** * 接收前端傳過來的資料。 * 雖然在實現推送邏輯中並不需要接收前端資料,但是作為一個webSocket的教程或叫備忘,還是將接收資料的邏輯加上了。 */ @OnMessage public void onMessage(String message ,Session session){ System.out.println(message + "from " + session.getId()); } public void sendMessage(String message) throws IOException { this.session.getBasicRemote().sendText(message); } }
3. kafka消費者實現
public class ConsumerKafka extends Thread {
private KafkaConsumer<String,String> consumer;
private String topic = "kafkaTopic";
public ConsumerKafka(){
}
@Override
public void run(){
//載入kafka消費者引數
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "ytna");
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "1000");
props.put("session.timeout.ms", "15000");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
//建立消費者物件
consumer = new KafkaConsumer<String,String>(props);
consumer.subscribe(Arrays.asList(this.topic));
//死迴圈,持續消費kafka
while (true){
try {
//消費資料,並設定超時時間
ConsumerRecords<String, String> records = consumer.poll(100);
//Consumer message
for (ConsumerRecord<String, String> record : records) {
//Send message to every client
for (WebSocket webSocket :wbSockets){
webSocket.sendMessage(record.value());
}
}
}catch (IOException e){
System.out.println(e.getMessage());
continue;
}
}
}
public void close() {
try {
consumer.close();
} catch (Exception e) {
System.out.println(e.getMessage());
}
}
//供測試用,若通過tomcat啟動需通過其他方法啟動執行緒
public static void main(String[] args){
ConsumerKafka consumerKafka = new ConsumerKafka();
consumerKafka.start();
}
}
P.S. 需要注意的是WebSocket對tomcat版本是有要求的
四. 前端簡單實現
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<title>WebSocket client</title>
<script type="text/javascript">
var socket;
if (typeof (WebSocket) == "undefined"){
alert("This explorer don't support WebSocket")
}
function connect() {
//Connect WebSocket server
socket =new WebSocket("ws://127.0.0.1:8080/wbSocket");
//open
socket.onopen = function () {
alert("WebSocket is open");
}
//Get message
socket.onmessage = function (msg) {
alert("Message is " + msg);
}
//close
socket.onclose = function () {
alert("WebSocket is closed");
}
//error
socket.onerror = function (e) {
alert("Error is " + e);
}
}
function close() {
socket.close();
}
function sendMsg() {
socket.send("This is a client message ");
}
</script>
</head>
<body>
<button onclick="connect()">connect</button>
<button onclick="close()">close</button>
<button onclick="sendMsg()">sendMsg</button>
</body>
</html>