1. 程式人生 > >Java for Web學習筆記(九十):訊息和叢集(5)利用websocket實現訂閱和釋出(上)

Java for Web學習筆記(九十):訊息和叢集(5)利用websocket實現訂閱和釋出(上)

叢集中的訂閱和釋出

利用spring framework在本app內的訂閱和釋出十分簡單。當我們系統越來越複雜的時候,我們需要向其他app釋出訊息。本學習將給出一個通過websocket來實現不同app之間訊息的訂購和釋出。

在小例子中,我們在所有節點之間都建立webSocket連線來實現訊息的釋出和訂閱。這種方式,節點既是publisher,又是subcriber,還是broker。我們利用spring app內可監聽不同訊息,而無區分地將所有訊息直接廣播出去。具體步驟如下:

  1. 啟動後,開啟一個組播socket(224.0.0.4:6780),監聽該組播,同時廣播其websocket的地址
    • 使用@Service ClusterManager來實現
    • 需要在app啟動完畢,可以正常工作時,才廣播自己的websocket地址,例子講通過一個ping的url來測試是否得到正確的迴應來判斷是否已經正常工作
  2. 叢集中的其他app,監聽到該廣播後,獲得新app的websocket地址,與之建立websocket連線
    • 由於是在組播地址中進行廣播,所有自己也會收到,需要過濾掉自己傳送的自己websocket地址
    • 維護與自己相連的websocket連線,無論自己作為server還是client,通過@Bean ClusterEventMulticaster來實現
    • websocket不屬於spring framework,因此需要將其納入spring框架,才能支援spring的自動注入
  3. 釋出訊息,如果需要釋出到叢集,則向所有建立的websocket連線釋出
    • 有些訊息需要釋出到叢集,有些訊息可能只需要在本app內釋出,用ClusterEvent來表示需要釋出到叢集的訊息。
    • 釋出訊息由@Bean ClusterEventMulticaster來實現,向所有連線釋出
    • 接收訊息由websocket endpoint來實現
  4. app關閉時,關閉相關連線
    • 關閉組播socket
    • 關閉websocket的連線。

很顯然,這是個N*N的websocket連線。在小規模的情況下,可能滿足我們的要求。我們也可以有專門的broker,而websocket是節點與該broker之間的連線,這種模式即是WebSocket Application Messaging Protocol,不過小例子不採用專門broker的方式。

ClusterEvent

小例子將Event物件直接在websocket中進行傳遞,採用java序列化的方式。這種方式簡單,但也有限制,也就是所對方也必須在java程式。我們還可以選擇JSON或者XML的格式進行傳遞。我們在之前專門學習了序列化的基本知識,現在可以直接使用。

public class ClusterEvent extends ApplicationEvent implements Serializable{    
    private static final long serialVersionUID = 1L;    
    private final Serializable serializableSource;
    //在上一學習的基礎上增加rebroadcasted,用於標識這是一個從外部接收的事件,不需要向叢集廣播。
    private boolean rebroadcasted;

    public ClusterEvent(Serializable source) {
        super(source);
        this.serializableSource = source;
    }

    public final boolean isRebroadcasted() {
        return rebroadcasted;
    }

    public final void setRebroadcasted() {
        this.rebroadcasted = true;
    }

    private void readObject(ObjectInputStream in) throws ClassNotFoundException, IOException{
        in.defaultReadObject();
        this.source = this.serializableSource;
    }
}

我們設定相關的Event和listener

public abstract class AuthenticationEvent extends ClusterEvent{
    private static final long serialVersionUID = 1L;

    public AuthenticationEvent(Serializable source) {
        super(source);
    }    
}

public class LoginEvent extends AuthenticationEvent{
    private static final long serialVersionUID = 1L;

    public LoginEvent(String username) {
        super(username);
    }
}

@Service
public class AuthenticationInterestedParty implements ApplicationListener<AuthenticationEvent>{
    private static final Logger log = LogManager.getLogger();    
    @Inject ServletContext servletContext;

    @Override
    public void onApplicationEvent(AuthenticationEvent event) {
        log.info("Authentication event from context {} received in context {}.",
                event.getSource(), this.servletContext.getContextPath());
    }
}

@Component
public class LoginInterestedParty implements ApplicationListener<LoginEvent>{
    private static final Logger log = LogManager.getLogger();    
    @Inject ServletContext servletContext;

    @Override
    public void onApplicationEvent(LoginEvent event) {
        log.info("Login event for context {} received in context {}.",
                event.getSource(), this.servletContext.getContextPath());
    }
}

ClusterManager:通過組播發布自己的位置

這作為一個Service納入到spring framework中,我們的自定義ApplicationEventMulticaster為ClusterEventMulticaster,具體的websocket連線在ClusterMessagingEndpoint中實現。Spring在上下文初始化結束後,釋出ContextRefreshedEvent事件,我們可以監聽這個事件,就如同我們監聽前面設定的LoginEvent那樣。

@Service public class ClusterManager implements ApplicationListener<ContextRefreshedEvent>{...}

上下文,包括root上下文,web上下文和Rest上下文,按bootstrap的順序,先是root上下文完成初始化,但此時app尚未能正常啟動。我們可以具體檢查事件,是否是最後一個上下文啟動完畢。小例子中,我們採用另外一個方式,Controller中提供了一個ping介面,如果app正常工作,這個ping介面就可以正常回復200 OK。

用於檢測是否正常工作的ping介面

@Controller
public class HomeController {
    @RequestMapping("/ping")
    public ResponseEntity<String> ping()  {
        HttpHeaders headers = new HttpHeaders();
        headers.add("Content-Type", "text/plain;charset=UTF-8");
        return new ResponseEntity<>("ok", headers, HttpStatus.OK);
    }
}

ClusterManager的程式碼

@Service
public class ClusterManager implements ApplicationListener<ContextRefreshedEvent>{
    private static final Logger log = LogManager.getLogger();

    //組播是UDP,本例子中組播地址為224.0.0.4,埠為6780
    private static final InetAddress MULTICAST_GROUP;
    private static final int MULTICAST_PORT = 6780;
    static{
        try {
            MULTICAST_GROUP = InetAddress.getByName("224.0.0.4");
        } catch (UnknownHostException e) {
            throw new FatalBeanException("Could not initialize IP addresses.", e);
        }
    }
    
    private boolean  initialized,destroyed = false;
    private MulticastSocket socket;
    private String pingUrl, messagingUrl;
    private Thread listenThread;
 
    @Inject ServletContext servletContext; //獲取servlet Context,即可以獲得在web.xml中的配置
    //multicaster是我們自定義的ApplicationEventMulticaster,我們將在那裡維護和叢集其他app的websocket連線。
    @Inject ClusterEventMulticaster multicaster;

    //【1】初始化:建立組播socket,並啟動監聽
    @PostConstruct
    public void listenForMulticastAnnouncements() throws NumberFormatException, IOException{
        //1.1】初始化設定pingUrl和websocket的Url。這裡的host沒有自動獲取,而是通過配置,主要是多網絡卡的情況下,例如開發機上同時安裝了虛機,可能會指定到其他地址。在稍後的組播設定中,需要指定network interface。所以方便起見,小例子採用了配置的方式。web的埠port也一樣採用配置方式。
        String host = servletContext.getInitParameter("host");
        if(host == null)
            host = InetAddress.getLocalHost().getHostAddress();        
        String port = servletContext.getInitParameter("port");
        if(port == null)
            port = "8080";

        this.pingUrl = "http://" + host + ":" + port + this.servletContext.getContextPath() + "/ping";
        this.messagingUrl = "ws://" + host + ":" + port + this.servletContext.getContextPath() + "/services/Messaging/a83teo83hou9883hha9";

         //1.2】這裡組播socket的建立,並在執行緒中開啟監聽
        this.socket =  new MulticastSocket(MULTICAST_PORT);
        this.socket.setInterface(InetAddress.getByName(host));//需要放在joinGroup()前,用於多網絡卡時確定使用哪個網絡卡,如單網絡卡,無需設定
        this.socket.joinGroup(MULTICAST_GROUP);
        this.listenThread = new Thread(this::listen, "cluster-listener"); //設定監聽的執行緒
        this.listenThread.start();
    }

    //【2】在app正常執行後,通過組播socket,將自己的websocket的URL廣播出去
    @Async //確保一定執行線上程中。
    @Override
    public void onApplicationEvent(ContextRefreshedEvent event) {
        //2.1】initialized用於確保只執行一次,否則root context初始化完成執行一次,web context初始化完成又執行一次
        if(initialized)
            return;
        initialized = true;

        //【2.2】不斷嘗試訪問自己的/ping介面,不成功,則休眠500ms,再次嘗試,總的嘗試次數限制為120次,即1分鐘內,都嘗試失敗,就放棄
       try {
            URL url = new URL(this.pingUrl);
            log.info("Attempting to connect to self at {}.", url);

            int tries = 0;
            while(true){
                tries ++;
                //(2.2.1)方位自己的/ping,看看是否正常回復。這裡學習一下URLConnection的使用
                URLConnection connection = url.openConnection();
                connection.setConnectTimeout(100);
                try(InputStream stream = connection.getInputStream()){
                    String response = StreamUtils.copyToString(stream,StandardCharsets.UTF_8);
                    if(response != null && response.equals("ok")){  //檢查是否已經正常工作
                        //(2.2.2)app正常工作,此處將放置通過組播socket,將自己的websocket的url(messageUrl)廣播出去的程式碼
                        DatagramPacket packet = new DatagramPacket(this.messagingUrl.getBytes(),this.messagingUrl.length(), 
                                                                   MULTICAST_GROUP, MULTICAST_PORT);                         
                        this.socket.send(packet);
                        return;
                    }else{
                        log.warn("Incorrect response: {}", response);
                    }
                }catch(Exception e){
                    if(tries > 120) {
                        log.fatal("Could not connect to self within 60 seconds.",e);
                        return;
                    }
                    Thread.sleep(500L);
                }
            }            
        } catch (Exception e) {
            log.fatal("Could not connect to self.", e);
        }
    }

    //【3】組播socket監聽,如果聽到由websocket的URL,則連線該URL,建立起websocket的連線。由於是組播,因此也會收到自己廣播初期的自己的websocket的URL,需要將此過濾掉
    private void listen(){
        byte[] buffer = new byte[2048];
        DatagramPacket packet = new DatagramPacket(buffer, buffer.length);
        while(true){
            try {
                this.socket.receive(packet);
                String url = new String(buffer, 0, packet.getLength()); //獲取內容
                if(url.length() == 0)
                    log.warn("Received blank multicast packet.");
                else if(url.equals(this.messagingUrl)) //過濾掉自己的webSocket地址
                    log.info("Ignoring our own multicast packet from {}",packet.getAddress().getHostAddress());
                else
                    //3.1】在自定義的ApplicationEventMulticaster(維護各websocket連線)中,根據url建立一個websocket連結
                    this.multicaster.registerNode(url);
            } catch (IOException e) {
                if(this.destroyed)
                     return;
                log.error(e);
            }
        }
    }

    //【4】app關閉前,應關閉組播socket 
    @PreDestroy
    public void shutDownMulticastConnection() throws IOException {
        this.destroyed = true;
        try{
            this.listenThread.interrupt();
            this.socket.leaveGroup(MULTICAST_GROUP);
        }finally{
            this.socket.close();
        }
    }
}

相關推薦

Java for Web學習筆記Session3Session Listener

Session Listener 可以通過Listner來監聽session的變化,這就是所謂的publish and subscribe模型。這是一種訊息資訊釋出一方叫釋出者,資訊的接收方叫訂閱者,實際也是事件驅動的高大上說法,訂閱某個事件,然後觸發處理。這種方式最大的作用是將進行session變化以及s

Java for Web學習筆記訊息叢集5利用websocket實現訂閱釋出

叢集中的訂閱和釋出 利用spring framework在本app內的訂閱和釋出十分簡單。當我們系統越來越複雜的時候,我們需要向其他app釋出訊息。本學習將給出一個通過websocket來實現不同app之間訊息的訂購和釋出。 在小例子中,我們在所有節點之間都建立webSoc

Java for Web學習筆記JSP1何為JSP

使用JSP 通java程式碼來編寫HTML很是繁瑣,通過使用JSTL(JSP標準標籤庫,JavaServer Pages™ Standard Tag Library),通過JSP來更便攜的實現。對JSP的支援Maven中包括javax.servlet.jsp-api、javax.servlet.jsp.js

Java for Web學習筆記Servlet7傳檔案

上傳檔案 Servlet的引數設定 採用annotation方式如下: @WebServlet( name = "TicketServlet", urlPatterns = {"/tickets"}, loadOnStartup = 1 ) /* MultipartConfig配置了本Servlet

Java for Web學習筆記Log2配置log4j2

相關maven <!-- log4j-api提供logging的API。這是log4j2中唯一需要在compile時加入,含有你所需的所有類 --> <dependency> <groupId>org.apache.loggin

Java for Web學習筆記一四一Spring security準備2授權

訪問的範圍和許可權屬於授權。 Principals和Identities 就Java而言,很方便利用java.security.Principal。Principal至少會包含已被認證的使用者identity,例如使用者名稱,還可能有其他資訊。此外還可以保護使用者的授權資

Java for Web學習筆記二五JSTL1使用JSTL

在前面已經使用過JSTL,例如<c:url>,fn是JSTL的functionlibrary,而c是JSTL的tag library。使用它們,我們要告知解析器,如下: <%@ taglib prefix="c" uri="http://java.sun.com/jsp/jstl/core

Java for Web學習筆記Servlet6doGet()doPost()是執行緒還是佇列

做一個小實驗,程式碼如下: protected void doGet(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException { try{ for(i

Java for Web學習筆記一四二Spring security準備3初窺

瞭解Spring Security的基本知識 完全J2EE的web container也能提供完整的安全框架,但tomcat不是。Spring Security可以使用JDBC,或者我們的服務或倉庫來認證使用者,也提供了內建的對微軟Active Derectory,Jasi

Java for Web學習筆記一三二對映8@ElementCollection

說明 在前面我們學習了OneToOne、OneToMany、ManyToOne,將資料庫中不同表格的關聯轉換為spring中不同entity的關聯。但是在不少場景中,我們希望在一個entity的檢視中同檢視到這幾個表的資訊,而無需通過entity之間的關聯。 在這個小例子

Java for Web學習筆記三五自定義tag3TLDSTag Handler

JSTL的TLD   這是JSTL採用的方式。TLD(Tag Library Descriptor)描述tag和function,以及具體執行的java程式碼tag handler。Tag Handler是javax.servlet.jsp.tagext.Tag或javax.servlet.jsp.tage

Java for Web學習筆記一二三搜尋5MySQL全文索引

小例子我們在表格Ticket和TicketComment中加入了fulltext key。小例子在Ticket的Subject或Body,以及在TicketComment的Body檢索內容,按分頁方式顯示出來,同時顯示關聯分數,並按關聯分數降序排列。 -- Ticket中隊S

Java for Web學習筆記Java EE的介紹

  最近有個專案,當中涉及到Java Servlet,專案已經進行了一段日子,最近因為有重大需求調整,需要對架構重新進行梳理,一看,基本上哭笑不得,有不少地方真是黑色幽默,怎麼會這樣呢。   當在這過程中,也引發了我興趣,畢竟之前沒有系統地瞭解過。在網上找到了一本書professional Java for

Java for Web學習筆記三九Filter1用途、定義順序

Filter的用途 Filter可以用於以下方面: 記錄request和response的log進行認證和授權進行壓縮和加壓,非HTTPS的加密和解密錯誤處理。對於tomcat,出現錯誤通常會給出一個500的頁面,還有錯誤診斷資訊,對於一個公眾服務,這些診斷資訊可能會向黑客

Java for Web學習筆記一一一再談Entity對映4動態表格建立

如果這個不確定表格也需要我們的war來建立,如何實現。create table的原生SQL,entityManager是無法執行的,因為這不是可以回滾的事務。這種情況,我們需要: 捕獲表格不存在的異常 從原始的Connection中實現表格建立。獲取Connection能否從

Java for Web學習筆記Servlet5初始化引數

Context Init引數 在web.xml中可以基於整個上下文進行設定,在web app內各個Servlet都可以獲取。 <context-param> <param-name>settingOne</param-name> <param-

Java for Web學習筆記一三八篇外之資料庫的ACIDJPA2JPA

在測試中,我們發現在一個JPA事務中: Spring Data的寫SQL是在最後commit前發出,這最大程度地縮短了寫操作和commit之間的時間。 對相同的ID的讀,JPA只從資料庫中讀取一次。 從資料庫中獲取entity,修改entity的資料,即使最後沒有執行s

Java for Web學習筆記Servlet2HelloServlet

繼承關係: javax.servlet.GenericServlet –》javax.servlet.http.HttpServlet。 405返回 如果我們不重寫Servlet的doGet而採用HTTP GET的方式,將返回405 將返回405 Method Not Allowed。 如果我們重寫do

Java並發編程】之線程間通信中notifyAll造成的早期通知問題含代碼

data light lan 添加項 article util tool 元素 seconds 轉載請註明出處:http://blog.csdn.net/ns_code/article/details/17229601 如果線程在等待時接到通知,但線程等待的條件

學習筆記第三zkw費用流

正題       這個zkw大神非常6,把兩種很顯然的網路流演算法結合了起來。        zkw費用流=EK費用流+Dinic最大流        對,你沒有看錯。