1. 程式人生 > >使用jedis實現Redis訊息佇列(MQ)的釋出(publish)和訊息監聽(subscribe)

使用jedis實現Redis訊息佇列(MQ)的釋出(publish)和訊息監聽(subscribe)

前言:

本文基於jedis 2.9.0.jar、commons-pool2-2.4.2.jar以及json-20160810.jar

其中jedis連線池需要依賴commons-pool2包,json包用於物件例項和json字串的相互轉換

1、jedis的訊息佇列方法簡述

1.1、釋出訊息方法

(其中,channel是對應訊息通道,message是對應訊息體)

jedis.publish(channel, message);

1.2、監聽訊息方法

(其中,jedisPubSub用於處理監聽到的訊息,channels是對應的通道

jedis.subscribe(jedisPubSub, channels);

2、釋出訊息

/**
	 * 從jedis連線池獲取jedis操作例項
	 * @return
	 */
	public static Jedis getJedis() {
		return RedisPoolManager.getJedis();
	}

	/**
	 * 推入訊息到redis訊息通道
	 * 
	 * @param String
	 *            channel
	 * @param String
	 *            message
	 */
	public static void publish(String channel, String message) {
		Jedis jedis = null;
		try {
			jedis = getJedis();
			jedis.publish(channel, message);
		} finally {
			jedis.close();
		}
	}

	/**
	 * 推入訊息到redis訊息通道
	 * 
	 * @param byte[]
	 *            channel
	 * @param byte[]
	 *            message
	 */
	public void publish(byte[] channel, byte[] message) {
		Jedis jedis = null;
		try {
			jedis = getJedis();
			jedis.publish(channel, message);
		} finally {
			jedis.close();
		}

	}

3、監聽訊息

3.1、監聽訊息主體方法

/**
	 * 監聽訊息通道
	 * @param jedisPubSub - 監聽任務
	 * @param channels - 要監聽的訊息通道
	 */
	public static void subscribe(BinaryJedisPubSub jedisPubSub, byte[]... channels) {
		Jedis jedis = null;
		try {
			jedis = getJedis();
			jedis.subscribe(jedisPubSub, channels);
		} finally {
			jedis.close();
		}
	}

	/**
	 * 監聽訊息通道
	 * @param jedisPubSub - 監聽任務
	 * @param channels - 要監聽的訊息通道
	 */
	public static void subscribe(JedisPubSub jedisPubSub, String... channels) {
		Jedis jedis = null;
		try {
			jedis = getJedis();
			jedis.subscribe(jedisPubSub, channels);
		} finally {
			jedis.close();
		}
	}

3.2、處理監聽到的訊息任務

class Tasker implements Runnable {
	private String[] channel = null;//監聽的訊息通道
	private JedisPubSub jedisPubSub = null;//訊息處理任務

	public Tasker(JedisPubSub jedisPubSub, String ...channel) {
		this.jedisPubSub = jedisPubSub;
		this.channel = channel;
	}

	@Override
	public void run() {
		// 監聽channel通道的訊息
		RedisMQ.subscribe(jedisPubSub, channel);
	}

}

3.3、處理監聽到的訊息主體類實現

package cn.eguid.livePushServer.redisManager;

import java.util.Map;

import org.json.JSONObject;

import cc.eguid.livepush.PushManager;
import redis.clients.jedis.JedisPubSub;

public class RedisMQHandler extends JedisPubSub{
	PushManager pushManager = null;

	public RedisMQHandler(PushManager pushManager) {
		super();
		this.pushManager = pushManager;
	}

	@Override
	// 接收到訊息後進行分發執行
	public void onMessage(String channel, String message) {
		JSONObject jsonObj = new JSONObject(message);
		System.out.println(channel+","+message);
		if ("push".equals(channel)) {
			Map<String,Object> map=jsonObj.toMap();
			System.out.println("接收到一條推流訊息,準備推流:"+map);
//			String appName=pushManager.push(map);
			//推流完成後還需要釋出一個成功訊息到返回佇列
			
		} else if ("close".equals(channel)) {
			String appName=jsonObj.getString("appName");
			System.out.println("接收到一條關閉訊息,準備關閉應用:"+appName);
//			pushManager.closePush(appName);
		}
	}
}

4、測試訊息佇列釋出和監聽

public static void main(String[] args) throws InterruptedException {
		
		PushManager pushManager= new PushManagerImpl();
		Thread t1 = new Thread(new Tasker(new RedisMQHandler (pushManager), "push"));
		Thread t2 = new Thread(new Tasker(new RedisMQHandler (pushManager), "close"));
		t1.start();
		t2.start();
		
		LivePushEntity livePushInfo=new LivePushEntity();
		livePushInfo.setAppName("test1");
		JSONObject json=new JSONObject(livePushInfo);
		publish("push",json.toString());
		publish("close", json.toString());
		Thread.sleep(2000);
		publish("push", json.toString());
		publish("close",json.toString());
		Thread.sleep(2000);
		publish("push", json.toString());
		publish("close",json.toString());

	}

相關推薦

使用jedis實現Redis訊息佇列(MQ)的釋出(publish)訊息(subscribe)

前言: 本文基於jedis 2.9.0.jar、commons-pool2-2.4.2.jar以及json-20160810.jar 其中jedis連線池需要依賴commons-pool2包,json

訊息佇列MQ, rabbitMQrocketMQ的實現方式

MQ全稱為Message Queue, 訊息佇列(MQ)是一種應用程式對應用程式的通訊方法。應用程式通過讀寫出入佇列的訊息(針對應用程式的資料)來通訊,而無需專用連線來連結它們。訊息傳遞指的是程式之間通過在訊息中傳送資料進行通訊,而不是通過直接呼叫彼此來通訊,直接呼叫通常是用

linux 下C++實現 ARP釋出ARP

改造自http://blog.csdn.net/xiaodao1986/article/details/6628250 g++ -o即可編譯通過。 ubuntu 14.04 可以用適當的方法,在寢室裡,讓室友不能上網。 #include <stdio.h>

訊息佇列mq的原理及實現方法

訊息佇列技術是分散式應用間交換資訊的一種技術。訊息佇列可駐留在記憶體或磁碟上,佇列儲存訊息直到它們被應用程式讀走。通過訊息佇列,應用程式可獨立地執行--它們不需要知道彼此的位置、或在繼續執行前不需要等待接收程式接收此訊息。 訊息中介軟體概述 訊息佇列技術是分散式應用間交

訊息佇列MQ實踐----實現Queue(佇列訊息)Topic(主題訊息)兩種模式

之前有篇檔案介紹了生產消費者模式(http://blog.csdn.net/canot/article/details/51541920 ),當時是通過BlockingQueue阻塞佇列來實現,以及在Redis中使用pub/sub模式(http://blog.csdn.ne

訊息佇列MQ分析

做java開發的面試基本會遇到java基礎知識,設計模式,多執行緒,io,集合等,spring/springMvc/mybatis/springBoot,mysql/oracle/sql優化這些。現在僅僅會這些是不夠的,還會要求快取、訊息佇列、訊息中介軟體。springCloud/Dubbo

訊息佇列MQ選型 - Kafka、RabbitMQ對比

image.png 適應場景 非同步處理,應用解耦,流量削鋒和訊息通訊 對比 feature scenario Kafka RabbitMQ 備註 PUB-SUB 釋出訂閱模型

訊息佇列MQ技術的介紹原理

訊息佇列技術是分散式應用間交換資訊的一種技術。訊息佇列可駐留在記憶體或磁碟上,佇列儲存訊息直到它們被應用程式讀走。通過訊息佇列,應用程式可獨立地執行--它們不需要知道彼此的位置、或在繼續執行前不需要等待接收程式接收此訊息。   訊息中介軟體概述    訊息佇列技術是分散式

轉載:訊息佇列MQ

本文大概圍繞如下幾點進行闡述: 為什麼使用訊息佇列? 使用訊息佇列有什麼缺點? 訊息佇列如何選型? 如何保證訊息佇列是高可用的? 如何保證訊息不被重複消費? 如何保證消費的可靠性傳輸? 如何保證訊息的順序性? 1、為什麼要使用訊息佇列? 分析:一個用訊息佇列的人,不知道

reids叢集學習(二)使用jedis實現redis叢集客戶端

上一節我記錄瞭如何搭建redis官方的叢集,這節我就開始講怎麼用jedis實現叢集環境下的客戶端。 jedis中實現叢集的客戶端類是redis.clients.jedis.JedisClust

jedis 實現 redis 統計一個使用者在一段時間內的登入次數

import java.util.BitSet; import redis.clients.jedis.Jedis; public class SetBitTest {public static void main(String[] args) {// TODO Auto

SpringBoot對訊息佇列(MQ)的支援

1.非同步訊息的定義   非同步訊息的主要目的是為了系統與系統之間的通訊,所謂非同步訊息即訊息傳送者無需等待訊息接收者的處理以及返回,甚至無需關心訊息是否傳送成功   在非同步訊息中有兩個很重要的概念,即訊息代理和目的地,當訊息傳送者傳送訊息之後,訊息將由訊

訊息佇列MQ】各類MQ比較 【轉載】

原文地址:http://blog.csdn.net/sunxinhere/article/details/7968886目前業界有很多MQ產品,我們作如下對比: RabbitMQ 是使用Erlang編寫的一個開源的訊息佇列,本身支援很多的協議:AMQP,XMPP, SMTP, STOMP,也正是如此,使的它

訊息佇列MQ】各類MQ比較

目前業界有很多MQ產品,我們作如下對比: RabbitMQ 是使用Erlang編寫的一個開源的訊息佇列,本身支援很多的協議:AMQP,XMPP, SMTP, STOMP,也正是如此,使的它變的非常重量級,更適合於企業級的開發。同時實現了一個經紀人(Broker)構架,這意味

詳解RPC遠端呼叫訊息佇列MQ的區別

PC(Remote Procedure Call)遠端過程呼叫,主要解決遠端通訊間的問題,不需要了解底層網路的通訊機制。   RPC框架   知名度較高的有Thrift(FB的)、dubbo(阿里的)。   RPC的一般需要經歷4個步驟: 1、建立通訊 首先要

訊息佇列MQ】【Kafka&Jafka】design-2

源:http://incubator.apache.org/kafka/design.html Message Persistence and Caching Don't fear the filesystem!          Kafka在很大程度上依賴檔案系統來實現儲

Java執行緒實現Redis任務佇列(生產者消費者)

注:接上篇IDEA整合Redis,本篇實現Redis的任務佇列,Redis連線池具體配置看上篇。 一:寫一個Jedis的工具類JedisUtil,將Jedis中的部分方法實現,程式碼如下: package com.wq.Util; import com.wq.RedisP

Amazon SQS 訊息佇列服務_訊息佇列mq解決方案

Amazon Simple Queue Service (SQS) 是一種完全託管的訊息佇列服務,可讓您分離和擴充套件微服務、分散式系統和無伺服器應用程式。SQS 消除了與管理和運營訊息型中介軟體相關的複雜性和開銷,並使開發人員能夠專注於重要工作。藉助 SQS,您可以在軟體元件之間傳送、儲

訊息佇列 MQ

訊息佇列(Message Queue,簡稱 MQ)是阿里巴巴集團中介軟體技術部自主研發的專業訊息中介軟體,產品基於高可用分散式叢集技術,提供訊息釋出訂閱、訊息軌跡查詢、定時(延時)訊息、資源統計、監

訊息佇列mq總結(重點看,比較了主流訊息佇列框架)

RabbitMQ/Kafka/ZeroMQ 都能提供訊息佇列服務,但有很大的區別。在面向服務架構中通過訊息代理(比如 RabbitMQ / Kafka等),使用生產者-消費者模式在服務間進行非同步通訊是一種比較好的思想。因為服務間依賴由強耦合變成了鬆耦合。訊息代理都會提供持久化機制,在消費者負載高或者掉線的情