1. 程式人生 > >Java Jedis操作Redis示例(一)——pub/sub模式實現訊息佇列

Java Jedis操作Redis示例(一)——pub/sub模式實現訊息佇列

轉載:http://blog.csdn.net/shaobingj126/article/details/50585035

轉載:http://blog.csdn.net/abcd898989/article/details/51697596

一 訊息佇列

1. 定義

訊息佇列中介軟體是分散式系統中重要的元件,主要解決應用耦合,非同步訊息,流量削鋒等問題。實現高效能,高可用,可伸縮和最終一致性架構。是大型分散式系統不可缺少的中介軟體。目前在生產環境,使用較多的訊息佇列有ActiveMQ,RabbitMQ,ZeroMQ,Kafka,MetaMQ,RocketMQ等。

2. 訊息佇列的適用場景

訊息佇列的適用場景包括非同步處理,應用解耦,流量削鋒和訊息通訊四個場景

1. 非同步處理:非同步處理中訊息佇列儲存了當前處理操作,使得動作請求方可以在發出動作請求/寫入訊息佇列後理解返回,非同步獲取結果,關注點在於請求的友好程度。


2. 應用解耦:應用解耦用於消除請求發起方和請求處理方的耦合,提升系統的健壯性。


3. 流量削鋒:流量削峰一般指秒殺或搶購場景,訊息佇列用於控制活動人數,緩解高訪問壓力。

4. 日誌處理:日誌處理是指將訊息佇列用在日誌處理中,比如Kafka的應用,解決大量日誌傳輸的問題。


3. 訊息模型

在JMS標準中,有兩種訊息模型P2P(Point to Point),Publish/Subscribe(Pub/Sub)。

P2P模式:


P2P模式包含三個角色:訊息佇列(Queue),傳送者(Sender),接收者(Receiver)。每個訊息都被髮送到一個特定的佇列,接收者從佇列中獲取訊息。佇列保留著訊息,直到他們被消費或超時。
P2P的特點

  1. 每個訊息只有一個消費者(Consumer)(即一旦被消費,訊息就不再在訊息佇列中)
  2. 傳送者和接收者之間在時間上沒有依賴性,也就是說當傳送者傳送了訊息之後,不管接收者有沒有正在執行,它不會影響到訊息被髮送到佇列
  3. 接收者在成功接收訊息之後需向佇列應答成功
 如果希望傳送的每個訊息都會被成功處理的話,那麼需要P2P模式。、

Pub/sub模式:


包含三個角色主題(Topic),釋出者(Publisher),訂閱者(Subscriber) 。多個釋出者將訊息傳送到Topic,系統將這些訊息傳遞給多個訂閱者。
Pub/Sub的特點

  1. 每個訊息可以有多個消費者
  2. 釋出者和訂閱者之間有時間上的依賴性。針對某個主題(Topic)的訂閱者,它必須建立一個訂閱者之後,才能消費釋出者的訊息。
  3. 為了消費訊息,訂閱者必須保持執行的狀態。
為了緩和這樣嚴格的時間相關性,JMS允許訂閱者建立一個可持久化的訂閱。這樣,即使訂閱者沒有被啟用(執行),它也能接收到釋出者的訊息。
如果希望傳送的訊息可以不被做任何處理、或者只被一個訊息者處理、或者可以被多個消費者處理的話,那麼可以採用Pub/Sub模型。

4. 訊息消費

在JMS中,訊息的產生和消費都是非同步的。對於消費來說,JMS的訊息者可以通過兩種方式來消費訊息。
(1)同步:訂閱者或接收者通過receive方法來接收訊息,receive方法在接收到訊息之前(或超時之前)將一直阻塞;
(2)非同步:訂閱者或接收者可以註冊為一個訊息監聽器。當訊息到達之後,系統自動呼叫監聽器的onMessage方法。

二 Redis 釋出-訂閱模式(pub/sub)

Pub/Sub功能(means Publish, Subscribe)即釋出及訂閱功能。基於事件的系統中,Pub/Sub是目前廣泛使用的通訊模型,它採用事件作為基本的通訊機制,提供大規模系統所要求的鬆散耦合的互動模式:

訂閱者(如客戶端)以事件訂閱的方式表達出它有興趣接收的一個事件或一類事件;

釋出者(如伺服器)可將訂閱者感興趣的事件隨時通知相關訂閱者。

1.       時間非耦合:釋出者和訂閱者不必同時線上,它們不必同時參與互動。

2.       空間非耦合:釋出者和訂閱者不必相互知道對方所在的位置。釋出者通過事件服務釋出事件,訂閱者通過事件服務間接獲得事件。釋出者和訂閱者不需要擁有直接到對方的引用,也不必知道有多少個訂閱者或者是釋出者參與互動。

3.       同步非耦合:釋出者/訂閱者是非同步模式。釋出者可不斷地生產事件,而訂閱者(通過一個回撥)則可非同步地得到產生事件的通知。

分類:
按照訂閱方式分為基於主題(topic-based)、基於內容(content-based)、基於型別(type-based)的pub/sub方式。

三 Redis pub/sub的實現(非持久)

Redis通過publishsubscribe命令實現訂閱和釋出的功能。訂閱者可以通過subscribe向redis server訂閱自己感興趣的訊息型別。redis將資訊型別稱為通道(channel)。當釋出者通過publish命令向redis server傳送特定型別的資訊時,訂閱該訊息型別的全部訂閱者都會收到此訊息。

1. 匯入Redis依賴(以Maven工程為例子):

        <dependency>  
            <groupId>redis.clients</groupId>  
            <artifactId>jedis</artifactId>  
            <version>2.8.0</version>  
        </dependency>  

2. 增加日誌配置檔案,這裡使用系統輸出代替日至
log4j.rootLogger=info,stdout    
log4j.appender.stdout=org.apache.log4j.ConsoleAppender    
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout    
log4j.appender.stdout.layout.ConversionPattern=%5p [%t] - %m%n  

3. 建立訊息的釋出者Publisher.java
package com.zenhobby.redis_pub_sub;

import java.io.BufferedReader;
import java.io.InputStreamReader;
import redis.clients.jedis.Jedis;

public class Publisher {
	private Jedis publisherJedis;
	private String channel;
	
	public Publisher(Jedis publishJedis,String channel){
		this.publisherJedis=publishJedis;
		this.channel=channel;
	}
	public void startPublish(){
		try{
			BufferedReader reader = new BufferedReader(new InputStreamReader(System.in));
			while(true){
				System.out.println("請輸入message:");
				String line = reader.readLine();
				if(!"quit".equals(line)){
					publisherJedis.publish(channel, line);
				}else{
					break;
				}
			}
		}catch(Exception e){
			e.printStackTrace();
		}
	}
}


4. 實現訊息的接收者Subscriber.java,實現JedisPubSub介面

package com.zenhobby.redis_pub_sub;

import redis.clients.jedis.JedisPubSub;

public class Subscriber extends JedisPubSub {
	@Override
	public void onMessage(String channel, String message) {
		System.out.println("Channel:" + channel + ",Message:" + message);
	}

	@Override
	public void onPMessage(String pattern, String channel, String message) {
		System.out.println("Pattern:" + pattern + ",Channel:" + channel + ",Message:" + message);
	}

	@Override
	public void onSubscribe(String channel, int subscribedChannels) {
		System.out.println("onSubscribe---channel:"+channel+",subscribedChannels:"+subscribedChannels);
	}

	@Override
	public void onPUnsubscribe(String pattern, int subscribedChannels) {
		System.out.println("onPUnsubscribe---pattern:"+pattern+",subscribedChannels:"+subscribedChannels);
	}

	@Override
	public void onPSubscribe(String pattern, int subscribedChannels) {
		System.out.println("onPSubscribe---pattern:"+pattern+",subscribedChannels:"+subscribedChannels);
	}
}

JedisPubSub是Redis提供的抽象類,繼承這個類就完成了對客戶端對訂閱的監聽。

抽象類中存在六個方法。分別表示

  1. 監聽到訂閱模式接受到訊息時的回撥 (onPMessage)
  2. 監聽到訂閱頻道接受到訊息時的回撥 (onMessage )
  3. 訂閱頻道時的回撥( onSubscribe )
  4. 取消訂閱頻道時的回撥( onUnsubscribe )
  5. 訂閱頻道模式時的回撥 ( onPSubscribe )
  6. 取消訂閱模式時的回撥( onPUnsubscribe )

5. 建立測試Main.java

package com.zenhobby.redis_pub_sub;

import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.JedisPoolConfig;

public class TestMain {
	public static final String CHANNEL = "mychannel";
	public static final String HOST = "127.0.0.1";
	public static final int PORT = 6379;

	private final static JedisPoolConfig POOL_CONFIG = new JedisPoolConfig();
	private final static JedisPool JEDIS_POOL = new JedisPool(POOL_CONFIG, HOST, PORT, 0);

	public static void main(String[] args) {
		final Jedis subscriberJedis = JEDIS_POOL.getResource();
		final Jedis publisherJedis = JEDIS_POOL.getResource();
		final Subscriber subscriber = new Subscriber();
		new Thread(new Runnable() {
			public void run() {
				try {
					System.out.println("Subscribing to mychannel,this thread will be block");
					subscriberJedis.subscribe(subscriber, CHANNEL);
					System.out.println("subscription ended");
				} catch (Exception e) {
					e.printStackTrace();
				}
			}
		}).start();
		new Publisher(publisherJedis, CHANNEL).startPublish();
		publisherJedis.close();

		subscriber.unsubscribe();
		subscriberJedis.close();
	}
}

6.測試方法:首先,啟動main方法中所示地址的Redis伺服器;然後,執行main方法,觀察控制檯輸出。並且我們是以控制檯輸入內容作為訊息釋出的內容,各位看官可以在控制檯輸入任意內容,點選回車鍵,觀察控制檯輸出。示例如下(直接把原博的圖借過來啦):


注意:此方法實現的釋出與訂閱功能,訊息不會在Redis客戶端進行快取。

四 Redis的pub/sub實現(持久)

Redis的pub/sub的持久主要通過,在非持久化的基礎上需要作如下處理:

1. 重新實現Publisher

package com.zenhobby.redis.persistence;
import java.util.Set;
import redis.clients.jedis.Jedis;

public class PPubClient {
	private Jedis jedis;
	private String CONSTANT_CLIENTSET = "clientSet";
	public PPubClient(String host,int port){
		jedis = new Jedis(host,port);
	}
	private void put(String message){
		Set<String> subClients = jedis.smembers(CONSTANT);
		for(String clientKey:subClients){
			jedis.rpush(clientKey, message);
		}
	}
	public void pub(String channel,String message){
		Long txid = jedis.incr("MAXID");
		String content = txid+"/"+message;
		this.put(content);
		jedis.publish(channel, message);
	}
	public void close(String channel){
		jedis.publish(channel, "quit");
		jedis.del(channel);
	}
}

在新實現的Publisher中使用Jedis儲存釋出的訊息。

2. 重新實現SubClient

package com.zenhobby.redis.persistence;

import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPubSub;

public class PPSubClient {
	private Jedis jedis;
	private JedisPubSub listener;
	private String CONSTANT_CLIENTSET="clientSet";
	public PPSubClient(String host,int port,String clientId){
		jedis = new Jedis(host,port);
		listener = new PPrintListener(clientId,new Jedis(host,port));
		jedis.sadd(CONSTANT_CLIENTSET, clientId);
	}
	public void sub(String channel){
		jedis.subscribe(listener, channel);
	}
	public void unsubscribe(String channel){
		listener.unsubscribe(channel);
	}
}

這個客戶端並沒有繼承JedisPubSub類,轉而在如下的輸出類進行Listener的處理

3. Listener類用於處理訊息

package com.zenhobby.persistence;

import java.util.Date;

import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPubSub;

public class PPrintListener extends JedisPubSub {

	private String clientId;
	private PSubHandler handler;
	private String CONSTANT = "clientSet";
	public PPrintListener(String clientId, Jedis jedis) {
		this.clientId = clientId;
		handler = new PSubHandler(jedis);
	}

	@Override
	public void onMessage(String channel, String message) {
		if (message.equalsIgnoreCase("quit")) {
			this.unsubscribe(channel);
		}
		handler.handle(channel, message);
		System.out.println("message receive:" + message + ",channel:" + channel);
	}

	private void message(String channel, String message) {
		Date time = new Date();
		System.out.println("message receive:" + message + ",channel:" + channel + time.toString());
	}

	@Override
	public void onPMessage(String pattern, String channel, String message) {
		System.out.println("message receive:" + message + ",pattern channel:" + channel);
	}

	@Override
	public void onSubscribe(String channel, int subscribedChannels) {
		handler.subscribe(channel);
		System.out.println("subscribe:" + channel + ",total channels:" + subscribedChannels);
	}

	@Override
	public void onUnsubscribe(String channel, int subscribedChannels) {
		handler.unsubscribe(channel);
		System.out.println("unsubscribe:" + channel + ",total channels:" + subscribedChannels);
	}

	@Override
	public void onPSubscribe(String pattern, int subscribedChannels) {
		System.out.println("subscribe pattern:" + pattern + ",total channels:" + subscribedChannels);
	}

	@Override
	public void unsubscribe(String... channels) {
		super.unsubscribe(channels);
		for (String channel : channels) {
			handler.unsubscribe(channel);
		}
	}

	class PSubHandler {
		private Jedis jedis;

		PSubHandler(Jedis jedis) {
			this.jedis = jedis;
		}

		public void handle(String channel, String message) {
			int index = message.indexOf("/");
			if (index < 0) {
				return;
			}
			Long txid = Long.valueOf(message.substring(0, index));
			String key = clientId + "/" + channel;
			while (true) {
				String lm = jedis.lindex(key, 0);
				if (lm == null) {
					break;
				}
				int li = lm.indexOf("/");
				if(li<0){
					String result = jedis.lpop(key);
					if(result == null){
						break;
					}
					message(channel, lm);
					continue;
				}
				Long lxid = Long.valueOf(lm.substring(0, li));
				if(txid>=lxid){
					jedis.lpop(key);
					message(channel,lm);
					continue;
				}else{
					break;
				}
			}
		}
		public void subscribe(String channel){
			String key = clientId+"/"+channel;
			boolean exist = jedis.sismember(CONSTANT, key);
			if(!exist){
				jedis.sadd(CONSTANT, key);
			}
		}
		public void unsubscribe(String channel){
			String key = clientId+"/"+channel;
			jedis.srem(CONSTANT, key);
			jedis.del(key);
		}
	}
}

其中jedis.sismember(CONSTANT, Key)用於判斷當前使用者是否存在,如果不存在則新增(和Redis快取的思路相同)。

4. 建立測試Main方法,具體內容如下:

package com.zenhobby.redis.persistence;

public class PPubSubTestMain {
	public static void main(String[] args) throws Exception {
		String host = "127.0.0.1";
		int port = 6379;
		String clientId = "myclient";
		PPubClient pubClient = new PPubClient(host, port);
		final String channel = "mychannel";
		final PPSubClient subClient = new PPSubClient(host, port, clientId);
		Thread subThread = new Thread(new Runnable() {
			public void run() {
				System.out.println("------------sub----start------------");
				subClient.sub(channel);
				System.out.println("------------sub----end------------");
			}
		});
		subThread.setDaemon(true);
		subThread.start();
		int i = 0;
		while (i < 20) {
			String message = "message--" + i;
			pubClient.pub(channel, message);
			i++;
			Thread.sleep(100);
		}
		subClient.unsubscribe(channel);
	}
}
5.測試方法:首先,啟動main方法中所示地址的Redis伺服器;然後,執行main方法,觀察控制檯輸出。這次我們是以迴圈呼叫作為輸入內容作為訊息釋出的內容,各位看官觀察控制檯輸出。示例如下:


然後,開啟Redis客戶端,觀察當前Redis中保留的所有資料:


題外的話:
Redis目前提供的釋出與訂閱功能,將會完全阻塞訂閱者的客戶端,在java實現時,即需要保留一個執行緒來專門處理髮布者與訂閱者的連線。因此,在實際應用時,更加推薦的做法是使用MQ元件來實現該功能

至此,NoSQL之Redis---PUB/SUB(訂閱與釋出)---JAVA實現 結束

在此,對以下參考資料的作者表示感謝!:

參考資料:

redis官網:

其他博文:

http://my.oschina.net/itblog/blog/601284?fromerr=FiejlElw

http://www.sxrczx.com/pages/shift-alt-ctrl.iteye.com/blog/1867454.html


相關推薦

Java Jedis操作Redis示例——pub/sub模式實現訊息佇列

轉載:http://blog.csdn.net/shaobingj126/article/details/50585035 轉載:http://blog.csdn.net/abcd898989/article/details/51697596 一 訊息佇列 1. 定義 訊息

Java Jedis操作Redis示例——Redis的事務、管道和指令碼

一 Redis的事務 在資料庫系統中,一個事務是指:由一系列資料庫操作組成的一個完整的邏輯過程。例如銀行轉帳,從原賬戶扣除金額,以及向目標賬戶新增金額,這兩個資料庫操作的總和,構成一個完整的邏輯過程,不可拆分。這個過程被稱為一個事務,具有ACID特性。 0. A

JAVA並行框架Fork/Join:簡介和代碼示例

over 框架設計 put 分割 gif 得到 java owa trace 一、背景 雖然目前處理器核心數已經發展到很大數目,但是按任務並發處理並不能完全充分的利用處理器資源,因為一般的應用程序沒有那麽多的並發處理任務。基於這種現狀,考慮把一個任務拆分成多個單元,每個單元

Jedis操作單節點redis,叢集及redisTemplate操作redis叢集

package com.dream21th.dream21thredis.controller;import java.util.List;import java.util.Map;import org.springframework.beans.factory.annotation.Autowired;im

Jedis操作單節點redis,叢集及redisTemplate操作redis叢集

package com.dream21th.dream21thredis.redis;import java.util.List;import java.util.Map;import java.util.Set;import com.dream21th.dream21thredis.key.KeyPrefi

《大話設計模式Java程式碼示例之簡單工廠方法

簡單工廠模式(Simple Factory):也叫靜態工廠模式,就是建立一個工廠類,對實現了同一介面的一些類進行例項的建立。 package simplefactory; /** * 簡單工廠方法(Simple Factory) * 簡單運算工廠類 */ p

Java中常用到的文件操作那些事——替換doc文檔模板,生成真實合同案例

代碼 sta ring site hashmap i++ illegal puts except   工作中,我們時常會遇到一些操作文件的操作,比如在線生成合同模板,上傳/下載/解析Excel,doc文檔轉為pdf等操作。本文就已工作中遇到的在線生成合同為例,簡要地介紹一種

Java中的反射機制

erl void port 令行 sage [0 ray 輸出 我們 基本概念   在Java運行時環境中,對於任意一個類,能否知道這個類有哪些屬性和方法?對於任意一個對象,能否調用它的任意一個方法?   答案是肯定的。   這種動態獲取類的信息以及動態調用對象的方法的功能

JAVA中的枚舉

enum 枚舉 在實際編程中,往往存在著這樣的“數據集”,它們的數值在程序中是穩定的,而且“數據集”中的元素是有限的。例如星期一到星期日七個數據元素組成了一周的“數據集”,春夏秋冬四個數據元素組成了四季的“數據集”。在Java中想表示這種數據集最容易想到的寫法可能是這樣,我們以表示一周五天的工作日來舉

vue-router單頁應用簡單示例

問題 clas 做了 設置 new scope 文件的 log target 請先完成了項目初始化,具體請看我另一篇博文。vue項目初始化 看一下完成的效果圖,很典型的單頁應用。 .vue後綴名的單文件組件 這裏先說一下我對組件的理解。組件,顧名思義就是一組元素組成的

Redis研究—簡介

創始人 存儲結構 隊列 cached tar 寫入 關系 退出 使用 http://blog.csdn.net/wtyvhreal/article/details/41855327 Redis是一個開源的高性能鍵值對數據庫。它通過提供多種鍵值數據類型來適應不同場景下的

Java並發編程

implement 返回 tile 對象 not seconds dex note 系統調用 1、定義

Java導出txt模板——

qps ogl iar i++ vnr snv bho vra dsr 導出txt文件時候\r\n才能換行 java代碼 package DRDCWordTemplates; import java.io.BufferedWriter; import java.io.F

.NET中使用Redis之ServiceStack.Redis學習安裝與簡單的運行

arraylist write client cli ring blog 控制臺 創建 spa 1.下載ServiceStack.Redis PM> Install-Package ServiceStack.Redis 2.vs中創建一個控制臺程序 class Pro

JAVA秒會技術之秒殺面試官】秒殺Java面試官——集合篇

tails category tail java cat 秒殺 試題 面試官 java面試 【JAVA秒會技術之秒殺面試官】秒殺Java面試官——集合篇(一) 【JAVA秒會技術之秒殺面試官】JavaEE常見面試題(三) http://blog.csdn.net/qq296

Java核心技術 卷二

rgs code 一個 dir 字節 per workspace spa 核心技術 書:《Java核心技術 卷二 高級特性 9》 時間:2017.9.4 17:13 1.流   輸入流:可以從其中讀取一個字節序列的對象;抽象類(InputStream)   輸出流:可以向其

Java中的線程

java線程一、線程與進程 談到線程,那就不得不提進程,很久之前其實並沒有線程,只有進程,當一個程序需要運行的時候,必然需要使用系統資源和CPU,因此進程就擔任了對一個應用程序進行資源分配以及CPU調度這兩項職責。後來,為了進一步提高並發執行和資源利用的效率,提出了線程的概念,將進程作了細分,進程將負責資源

Java集合幹貨系列-ArrayList源碼解析

div imp ins bject 增加 toa tof capacity == 前言 今天來介紹下ArrayList,在集合框架整體框架一章中,我們介紹了List接口,ArrayList繼承了AbstractList,實現了List。ArrayList在工作中經常用到,所

Java語言中的----繼承

java語言中的----繼承(一)day10 Java語言中的繼承(一)一、繼承概述: 繼承:什麽是繼承,程序中的繼承與生活中的繼承還是有區別的,在程序中繼承以後,你的父類和你的子類同樣的也具有某一成員變量。那麽我們為什麽藥學習繼承?是因為我們在編程的時候我們會有大量的代碼需要重寫,從而導致我們代碼比較

Python操作rabbitmq系列

targe 紅色 入門 web 之間 cap ssa 隊列 技術 從本文開始,接下來的內容,我們將討論rabbitmq的相關功能。我的這些文章,最終是要實現一個項目(具體是什麽暫不透露)。前面每一篇,都是在為這個系統做準備。rabbitmq,是我們這個項目的關鍵部分之一。所