Java for Web學習筆記(九十):訊息和叢集(5)利用websocket實現訂閱和釋出(上)
叢集中的訂閱和釋出
利用spring framework在本app內的訂閱和釋出十分簡單。當我們系統越來越複雜的時候,我們需要向其他app釋出訊息。本學習將給出一個通過websocket來實現不同app之間訊息的訂購和釋出。
在小例子中,我們在所有節點之間都建立webSocket連線來實現訊息的釋出和訂閱。這種方式,節點既是publisher,又是subcriber,還是broker。我們利用spring app內可監聽不同訊息,而無區分地將所有訊息直接廣播出去。具體步驟如下:
- 啟動後,開啟一個組播socket(224.0.0.4:6780),監聽該組播,同時廣播其websocket的地址
- 使用@Service ClusterManager來實現
- 需要在app啟動完畢,可以正常工作時,才廣播自己的websocket地址,例子講通過一個ping的url來測試是否得到正確的迴應來判斷是否已經正常工作
- 叢集中的其他app,監聽到該廣播後,獲得新app的websocket地址,與之建立websocket連線
- 由於是在組播地址中進行廣播,所有自己也會收到,需要過濾掉自己傳送的自己websocket地址
- 維護與自己相連的websocket連線,無論自己作為server還是client,通過@Bean ClusterEventMulticaster來實現
- websocket不屬於spring framework,因此需要將其納入spring框架,才能支援spring的自動注入
- 釋出訊息,如果需要釋出到叢集,則向所有建立的websocket連線釋出
- 有些訊息需要釋出到叢集,有些訊息可能只需要在本app內釋出,用ClusterEvent來表示需要釋出到叢集的訊息。
- 釋出訊息由@Bean ClusterEventMulticaster來實現,向所有連線釋出
- 接收訊息由websocket endpoint來實現
- app關閉時,關閉相關連線
- 關閉組播socket
- 關閉websocket的連線。
很顯然,這是個N*N的websocket連線。在小規模的情況下,可能滿足我們的要求。我們也可以有專門的broker,而websocket是節點與該broker之間的連線,這種模式即是WebSocket Application Messaging Protocol,不過小例子不採用專門broker的方式。
ClusterEvent
小例子將Event物件直接在websocket中進行傳遞,採用java序列化的方式。這種方式簡單,但也有限制,也就是所對方也必須在java程式。我們還可以選擇JSON或者XML的格式進行傳遞。我們在之前專門學習了序列化的基本知識,現在可以直接使用。
public class ClusterEvent extends ApplicationEvent implements Serializable{
private static final long serialVersionUID = 1L;
private final Serializable serializableSource;
//在上一學習的基礎上增加rebroadcasted,用於標識這是一個從外部接收的事件,不需要向叢集廣播。
private boolean rebroadcasted;
public ClusterEvent(Serializable source) {
super(source);
this.serializableSource = source;
}
public final boolean isRebroadcasted() {
return rebroadcasted;
}
public final void setRebroadcasted() {
this.rebroadcasted = true;
}
private void readObject(ObjectInputStream in) throws ClassNotFoundException, IOException{
in.defaultReadObject();
this.source = this.serializableSource;
}
}
我們設定相關的Event和listener
public abstract class AuthenticationEvent extends ClusterEvent{
private static final long serialVersionUID = 1L;
public AuthenticationEvent(Serializable source) {
super(source);
}
}
public class LoginEvent extends AuthenticationEvent{
private static final long serialVersionUID = 1L;
public LoginEvent(String username) {
super(username);
}
}
@Service
public class AuthenticationInterestedParty implements ApplicationListener<AuthenticationEvent>{
private static final Logger log = LogManager.getLogger();
@Inject ServletContext servletContext;
@Override
public void onApplicationEvent(AuthenticationEvent event) {
log.info("Authentication event from context {} received in context {}.",
event.getSource(), this.servletContext.getContextPath());
}
}
@Component
public class LoginInterestedParty implements ApplicationListener<LoginEvent>{
private static final Logger log = LogManager.getLogger();
@Inject ServletContext servletContext;
@Override
public void onApplicationEvent(LoginEvent event) {
log.info("Login event for context {} received in context {}.",
event.getSource(), this.servletContext.getContextPath());
}
}
ClusterManager:通過組播發布自己的位置
這作為一個Service納入到spring framework中,我們的自定義ApplicationEventMulticaster為ClusterEventMulticaster,具體的websocket連線在ClusterMessagingEndpoint中實現。Spring在上下文初始化結束後,釋出ContextRefreshedEvent事件,我們可以監聽這個事件,就如同我們監聽前面設定的LoginEvent那樣。
@Service public class ClusterManager implements ApplicationListener<ContextRefreshedEvent>{...}
上下文,包括root上下文,web上下文和Rest上下文,按bootstrap的順序,先是root上下文完成初始化,但此時app尚未能正常啟動。我們可以具體檢查事件,是否是最後一個上下文啟動完畢。小例子中,我們採用另外一個方式,Controller中提供了一個ping介面,如果app正常工作,這個ping介面就可以正常回復200 OK。
用於檢測是否正常工作的ping介面
@Controller
public class HomeController {
@RequestMapping("/ping")
public ResponseEntity<String> ping() {
HttpHeaders headers = new HttpHeaders();
headers.add("Content-Type", "text/plain;charset=UTF-8");
return new ResponseEntity<>("ok", headers, HttpStatus.OK);
}
}
ClusterManager的程式碼
@Service
public class ClusterManager implements ApplicationListener<ContextRefreshedEvent>{
private static final Logger log = LogManager.getLogger();
//組播是UDP,本例子中組播地址為224.0.0.4,埠為6780
private static final InetAddress MULTICAST_GROUP;
private static final int MULTICAST_PORT = 6780;
static{
try {
MULTICAST_GROUP = InetAddress.getByName("224.0.0.4");
} catch (UnknownHostException e) {
throw new FatalBeanException("Could not initialize IP addresses.", e);
}
}
private boolean initialized,destroyed = false;
private MulticastSocket socket;
private String pingUrl, messagingUrl;
private Thread listenThread;
@Inject ServletContext servletContext; //獲取servlet Context,即可以獲得在web.xml中的配置
//multicaster是我們自定義的ApplicationEventMulticaster,我們將在那裡維護和叢集其他app的websocket連線。
@Inject ClusterEventMulticaster multicaster;
//【1】初始化:建立組播socket,並啟動監聽
@PostConstruct
public void listenForMulticastAnnouncements() throws NumberFormatException, IOException{
//1.1】初始化設定pingUrl和websocket的Url。這裡的host沒有自動獲取,而是通過配置,主要是多網絡卡的情況下,例如開發機上同時安裝了虛機,可能會指定到其他地址。在稍後的組播設定中,需要指定network interface。所以方便起見,小例子採用了配置的方式。web的埠port也一樣採用配置方式。
String host = servletContext.getInitParameter("host");
if(host == null)
host = InetAddress.getLocalHost().getHostAddress();
String port = servletContext.getInitParameter("port");
if(port == null)
port = "8080";
this.pingUrl = "http://" + host + ":" + port + this.servletContext.getContextPath() + "/ping";
this.messagingUrl = "ws://" + host + ":" + port + this.servletContext.getContextPath() + "/services/Messaging/a83teo83hou9883hha9";
//1.2】這裡組播socket的建立,並在執行緒中開啟監聽
this.socket = new MulticastSocket(MULTICAST_PORT);
this.socket.setInterface(InetAddress.getByName(host));//需要放在joinGroup()前,用於多網絡卡時確定使用哪個網絡卡,如單網絡卡,無需設定
this.socket.joinGroup(MULTICAST_GROUP);
this.listenThread = new Thread(this::listen, "cluster-listener"); //設定監聽的執行緒
this.listenThread.start();
}
//【2】在app正常執行後,通過組播socket,將自己的websocket的URL廣播出去
@Async //確保一定執行線上程中。
@Override
public void onApplicationEvent(ContextRefreshedEvent event) {
//2.1】initialized用於確保只執行一次,否則root context初始化完成執行一次,web context初始化完成又執行一次
if(initialized)
return;
initialized = true;
//【2.2】不斷嘗試訪問自己的/ping介面,不成功,則休眠500ms,再次嘗試,總的嘗試次數限制為120次,即1分鐘內,都嘗試失敗,就放棄
try {
URL url = new URL(this.pingUrl);
log.info("Attempting to connect to self at {}.", url);
int tries = 0;
while(true){
tries ++;
//(2.2.1)方位自己的/ping,看看是否正常回復。這裡學習一下URLConnection的使用
URLConnection connection = url.openConnection();
connection.setConnectTimeout(100);
try(InputStream stream = connection.getInputStream()){
String response = StreamUtils.copyToString(stream,StandardCharsets.UTF_8);
if(response != null && response.equals("ok")){ //檢查是否已經正常工作
//(2.2.2)app正常工作,此處將放置通過組播socket,將自己的websocket的url(messageUrl)廣播出去的程式碼
DatagramPacket packet = new DatagramPacket(this.messagingUrl.getBytes(),this.messagingUrl.length(),
MULTICAST_GROUP, MULTICAST_PORT);
this.socket.send(packet);
return;
}else{
log.warn("Incorrect response: {}", response);
}
}catch(Exception e){
if(tries > 120) {
log.fatal("Could not connect to self within 60 seconds.",e);
return;
}
Thread.sleep(500L);
}
}
} catch (Exception e) {
log.fatal("Could not connect to self.", e);
}
}
//【3】組播socket監聽,如果聽到由websocket的URL,則連線該URL,建立起websocket的連線。由於是組播,因此也會收到自己廣播初期的自己的websocket的URL,需要將此過濾掉
private void listen(){
byte[] buffer = new byte[2048];
DatagramPacket packet = new DatagramPacket(buffer, buffer.length);
while(true){
try {
this.socket.receive(packet);
String url = new String(buffer, 0, packet.getLength()); //獲取內容
if(url.length() == 0)
log.warn("Received blank multicast packet.");
else if(url.equals(this.messagingUrl)) //過濾掉自己的webSocket地址
log.info("Ignoring our own multicast packet from {}",packet.getAddress().getHostAddress());
else
//3.1】在自定義的ApplicationEventMulticaster(維護各websocket連線)中,根據url建立一個websocket連結
this.multicaster.registerNode(url);
} catch (IOException e) {
if(this.destroyed)
return;
log.error(e);
}
}
}
//【4】app關閉前,應關閉組播socket
@PreDestroy
public void shutDownMulticastConnection() throws IOException {
this.destroyed = true;
try{
this.listenThread.interrupt();
this.socket.leaveGroup(MULTICAST_GROUP);
}finally{
this.socket.close();
}
}
}
相關推薦
Java for Web學習筆記(十九):Session(3)Session Listener
Session Listener 可以通過Listner來監聽session的變化,這就是所謂的publish and subscribe模型。這是一種訊息資訊釋出一方叫釋出者,資訊的接收方叫訂閱者,實際也是事件驅動的高大上說法,訂閱某個事件,然後觸發處理。這種方式最大的作用是將進行session變化以及s
Java for Web學習筆記(九十):訊息和叢集(5)利用websocket實現訂閱和釋出(上)
叢集中的訂閱和釋出 利用spring framework在本app內的訂閱和釋出十分簡單。當我們系統越來越複雜的時候,我們需要向其他app釋出訊息。本學習將給出一個通過websocket來實現不同app之間訊息的訂購和釋出。 在小例子中,我們在所有節點之間都建立webSoc
Java for Web學習筆記(十一):JSP(1)何為JSP
使用JSP 通java程式碼來編寫HTML很是繁瑣,通過使用JSTL(JSP標準標籤庫,JavaServer Pages™ Standard Tag Library),通過JSP來更便攜的實現。對JSP的支援Maven中包括javax.servlet.jsp-api、javax.servlet.jsp.js
Java for Web學習筆記(九):Servlet(7)上傳檔案
上傳檔案 Servlet的引數設定 採用annotation方式如下: @WebServlet( name = "TicketServlet", urlPatterns = {"/tickets"}, loadOnStartup = 1 ) /* MultipartConfig配置了本Servlet
Java for Web學習筆記(五十):Log(2)配置log4j2
相關maven <!-- log4j-api提供logging的API。這是log4j2中唯一需要在compile時加入,含有你所需的所有類 --> <dependency> <groupId>org.apache.loggin
Java for Web學習筆記(一四一)Spring security準備(2)授權
訪問的範圍和許可權屬於授權。 Principals和Identities 就Java而言,很方便利用java.security.Principal。Principal至少會包含已被認證的使用者identity,例如使用者名稱,還可能有其他資訊。此外還可以保護使用者的授權資
Java for Web學習筆記(二五):JSTL(1)使用JSTL
在前面已經使用過JSTL,例如<c:url>,fn是JSTL的functionlibrary,而c是JSTL的tag library。使用它們,我們要告知解析器,如下: <%@ taglib prefix="c" uri="http://java.sun.com/jsp/jstl/core
Java for Web學習筆記(八):Servlet(6)doGet()和doPost()是執行緒還是佇列
做一個小實驗,程式碼如下: protected void doGet(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException { try{ for(i
Java for Web學習筆記(一四二)Spring security準備(3)初窺
瞭解Spring Security的基本知識 完全J2EE的web container也能提供完整的安全框架,但tomcat不是。Spring Security可以使用JDBC,或者我們的服務或倉庫來認證使用者,也提供了內建的對微軟Active Derectory,Jasi
Java for Web學習筆記(一三二)對映(8)@ElementCollection
說明 在前面我們學習了OneToOne、OneToMany、ManyToOne,將資料庫中不同表格的關聯轉換為spring中不同entity的關聯。但是在不少場景中,我們希望在一個entity的檢視中同檢視到這幾個表的資訊,而無需通過entity之間的關聯。 在這個小例子
Java for Web學習筆記(三五):自定義tag(3)TLDS和Tag Handler
JSTL的TLD 這是JSTL採用的方式。TLD(Tag Library Descriptor)描述tag和function,以及具體執行的java程式碼tag handler。Tag Handler是javax.servlet.jsp.tagext.Tag或javax.servlet.jsp.tage
Java for Web學習筆記(一二三):搜尋(5)MySQL全文索引(下)
小例子我們在表格Ticket和TicketComment中加入了fulltext key。小例子在Ticket的Subject或Body,以及在TicketComment的Body檢索內容,按分頁方式顯示出來,同時顯示關聯分數,並按關聯分數降序排列。 -- Ticket中隊S
Java for Web學習筆記(一):Java EE的介紹
最近有個專案,當中涉及到Java Servlet,專案已經進行了一段日子,最近因為有重大需求調整,需要對架構重新進行梳理,一看,基本上哭笑不得,有不少地方真是黑色幽默,怎麼會這樣呢。 當在這過程中,也引發了我興趣,畢竟之前沒有系統地瞭解過。在網上找到了一本書professional Java for
Java for Web學習筆記(三九):Filter(1)用途、定義和順序
Filter的用途 Filter可以用於以下方面: 記錄request和response的log進行認證和授權進行壓縮和加壓,非HTTPS的加密和解密錯誤處理。對於tomcat,出現錯誤通常會給出一個500的頁面,還有錯誤診斷資訊,對於一個公眾服務,這些診斷資訊可能會向黑客
Java for Web學習筆記(一一一):再談Entity對映(4)動態表格建立
如果這個不確定表格也需要我們的war來建立,如何實現。create table的原生SQL,entityManager是無法執行的,因為這不是可以回滾的事務。這種情況,我們需要: 捕獲表格不存在的異常 從原始的Connection中實現表格建立。獲取Connection能否從
Java for Web學習筆記(七):Servlet(5)初始化引數
Context Init引數 在web.xml中可以基於整個上下文進行設定,在web app內各個Servlet都可以獲取。 <context-param> <param-name>settingOne</param-name> <param-
Java for Web學習筆記(一三八)篇外之資料庫的ACID和JPA(2)JPA
在測試中,我們發現在一個JPA事務中: Spring Data的寫SQL是在最後commit前發出,這最大程度地縮短了寫操作和commit之間的時間。 對相同的ID的讀,JPA只從資料庫中讀取一次。 從資料庫中獲取entity,修改entity的資料,即使最後沒有執行s
Java for Web學習筆記(四):Servlet(2)HelloServlet
繼承關係: javax.servlet.GenericServlet –》javax.servlet.http.HttpServlet。 405返回 如果我們不重寫Servlet的doGet而採用HTTP GET的方式,將返回405 將返回405 Method Not Allowed。 如果我們重寫do
轉:【Java並發編程】之十二:線程間通信中notifyAll造成的早期通知問題(含代碼)
data light lan 添加項 article util tool 元素 seconds 轉載請註明出處:http://blog.csdn.net/ns_code/article/details/17229601 如果線程在等待時接到通知,但線程等待的條件
學習筆記第三十節:zkw費用流
正題 這個zkw大神非常6,把兩種很顯然的網路流演算法結合了起來。 zkw費用流=EK費用流+Dinic最大流 對,你沒有看錯。