1. 程式人生 > >6、訊息釋出和訂閱功能

6、訊息釋出和訂閱功能

開發十年,就只剩下這套架構體系了! >>>   

一:介紹

redis提供了簡單的釋出訂閱功能,producer往某個channel推送,client訂閱指定的channel(可以模糊匹配),這樣就能夠消費。

redis和rabbitmq的區別

  • 可靠性

    • redis :沒有相應的機制保證訊息的可靠消費,如果釋出者釋出一條訊息,而沒有對應的訂閱者的話,這條訊息將丟失,不會存在記憶體中;

    • rabbitmq:具有訊息消費確認機制,如果釋出一條訊息,還沒有消費者消費該佇列,那麼這條訊息將一直存放在佇列中,直到有消費者消費了該條訊息,以此可以保證訊息的可靠消費,那麼rabbitmq的訊息是如何儲存的呢?(後續更新);

  • 實時性

    • redis:實時性高,redis作為高效的快取伺服器,所有資料都存在在伺服器中,所以它具有更高的實時性
  • 消費者負載均衡:

    • rabbitmq佇列可以被多個消費者同時監控消費,但是每一條訊息只能被消費一次,由於rabbitmq的消費確認機制,因此它能夠根據消費者的消費能力而調整它的負載;

    • redis釋出訂閱模式,一個佇列可以被多個消費者同時訂閱,當有訊息到達時,會將該訊息依次傳送給每個訂閱者;

  • 永續性

    • redis:redis的持久化是針對於整個redis快取的內容,它有RDB和AOF兩種持久化方式(redis持久化方式,後續更新),可以將整個redis例項持久化到磁碟,以此來做資料備份,防止異常情況下導致資料丟失。

    • rabbitmq:佇列,訊息都可以選擇性持久化,持久化粒度更小,更靈活;

  • 佇列監控

    • rabbitmq實現了後臺監控平臺,可以在該平臺上看到所有建立的佇列的詳細情況,良好的後臺管理平臺可以方面我們更好的使用;

    • redis沒有所謂的監控平臺。

  • 總結

    • redis:       輕量級,低延遲,高併發,低可靠性;
    • rabbitmq:重量級,高可靠,非同步,不保證實時;
    • rabbitmq是一個專門的AMQP協議佇列,他的優勢就在於提供可靠的佇列服務,並且可做到非同步,而redis主要是用於快取的,redis的釋出訂閱模組,可用於實現及時性,且可靠性低的功能。

二:釋出/訂閱程式碼例子

  • 1、pom依賴

      <dependency>
          <groupId>redis.clients</groupId>
          <artifactId>jedis</artifactId>
          <version>2.9.0</version>
      </dependency>
    
  • 2、publish ==> 只管往chennel傳送資料

      public class Publisher {
          public static void main(String[] args) {
              JedisPool pool = new JedisPool("192.168.50.10", 6279);
    
              while (true) {
                  Jedis jedis = null;
                  try {
                      jedis = pool.getResource();
                      // 就這一句關鍵推送
                      jedis.publish("channel1", "我是channel1 的msg");
                      jedis.publish("channel2", "我是channel2 的msg");
                  } catch (Exception e) {
                      e.printStackTrace();
                  } finally {
                      if (jedis != null) {
                          jedis.close();
                      }
                  }
              }
          }
      }
    
  • 3、Subscriber ==> 定義一個訂閱者處理類

      public class Subscriber extends JedisPubSub {
          public Subscriber() {
          }
    
          [@Override](https://my.oschina.net/u/1162528)
          public void onMessage(String channel, String message) {     //收到訊息會呼叫
              System.out.println(String.format("receive redis published message, channel %s, message %s", channel, message));
          }
    
          [@Override](https://my.oschina.net/u/1162528)
          public void onSubscribe(String channel, int subscribedChannels) { //訂閱了頻道會呼叫
              System.out.println(String.format("subscribe redis channel success, channel %s, subscribedChannels %d",
                      channel, subscribedChannels));
          }
    
          [@Override](https://my.oschina.net/u/1162528)
          public void onUnsubscribe(String channel, int subscribedChannels) {//取消訂閱 會呼叫
              System.out.println(String.format("unsubscribe redis channel, channel %s, subscribedChannels %d",
                      channel, subscribedChannels));
          }
      }
    
  • 4、jedis訂閱某個主題,並且呼叫Subscriber來處理

      public class SubMain {
          public static void main(String[] args) {
              JedisPool jedisPool = new JedisPool("192.168.50.10", 6279);
    
              Jedis jedis = null;
              try {
                  jedis = jedisPool.getResource();
                  jedis.subscribe(new Subscriber(), "channel1", "channel2");
                  jedis.psubscribe(new Subscriber(), "*cha*");
              } catch (Exception e) {
                  e.printStackTrace();
              } finally {
                  if (jedis != null) {
                      jedis.close();
                  }
              }
          }
      }