1. 程式人生 > >Redis實現簡單訊息佇列

Redis實現簡單訊息佇列

任務非同步化

開啟瀏覽器,輸入地址,按下回車,打開了頁面。於是一個HTTP請求(request)就由客戶端傳送到伺服器,伺服器處理請求,返回響應(response)內容。

我們每天都在瀏覽網頁,傳送大大小小的請求給伺服器。有時候,伺服器接到了請求,會發現他也需要給另外的伺服器傳送請求,或者伺服器也需要做另外一些事情,於是最初們傳送的請求就被阻塞了,也就是要等待伺服器完成其他的事情。

更多的時候,伺服器做的額外事情,並不需要客戶端等待,這時候就可以把這些額外的事情非同步去做。從事非同步任務的工具有很多。主要原理還是處理通知訊息,針對通知訊息通常採取是佇列結構。生產和消費訊息進行通訊和業務實現。

生產消費與佇列

上述非同步任務的實現,可以抽象為生產者消費模型。如同一個餐館,廚師在做飯,吃貨在吃飯。如果廚師做了很多,暫時賣不完,廚師就會休息;如果客戶很多,廚師馬不停蹄的忙碌,客戶則需要慢慢等待。實現生產者和消費者的方式用很多,下面使用Python標準庫Queue寫個小例子:

1234567891011121314151617181920212223242526272829303132 import randomimport timefrom Queue import Queuefrom threading import Threadqueue=Queue(10)classProducer(Thread):def run(self):whileTrue:elem=random.randrange(9)queue.put(elem)print"廚師 {} 做了 {} 飯 --- 還剩 {} 飯沒賣完"
.format(self.name,elem,queue.qsize())time.sleep(random.random())classConsumer(Thread):def run(self):whileTrue:elem=queue.get()print"吃貨{} 吃了 {} 飯 --- 還有 {} 飯可以吃".format(self.name,elem,queue.qsize())time.sleep(random.random())def main():foriinrange(3):p=Producer()p.start()foriinrange(2):c=Consumer()c.start()if__name__=='__main__':main()

大概輸出如下:

12345678910111213141516 廚師Thread-1做了1---還剩1飯沒賣完廚師Thread-2做了8---還剩2飯沒賣完廚師Thread-3做了3---還剩3飯沒賣完吃貨Thread-4吃了1---還有2飯可以吃吃貨Thread-5吃了8---還有1飯可以吃吃貨Thread-4吃了3---還有0飯可以吃廚師Thread-1做了0---還剩1飯沒賣完廚師Thread-2做了0---還剩2飯沒賣完廚師Thread-1做了1---還剩3飯沒賣完廚師Thread-1做了1---還剩4飯沒賣完吃貨Thread-4吃了0---還有3飯可以吃廚師Thread-3做了3---還剩4飯沒賣完吃貨Thread-5吃了0---還有3飯可以吃吃貨Thread-5吃了1---還有2飯可以吃廚師Thread-2做了8---還剩3飯沒賣完廚師Thread-2做了8---還剩4飯沒賣完

Redis 佇列

Python內建了一個好用的佇列結構。我們也可以是用redis實現類似的操作。並做一個簡單的非同步任務。

Redis提供了兩種方式來作訊息佇列。一個是使用生產者消費模式模式,另外一個方法就是釋出訂閱者模式。前者會讓一個或者多個客戶端監聽訊息佇列,一旦訊息到達,消費者馬上消費,誰先搶到算誰的,如果佇列裡沒有訊息,則消費者繼續監聽。後者也是一個或多個客戶端訂閱訊息頻道,只要釋出者釋出訊息,所有訂閱者都能收到訊息,訂閱者都是ping的。

生產消費模式

主要使用了redis提供的blpop獲取佇列資料,如果佇列沒有資料則阻塞等待,也就是監聽。

123456789101112131415 import redisclassTask(object):def __init__(self):self.rcon=redis.StrictRedis(host='localhost',db=5)self.queue='task:prodcons:queue'def listen_task(self):whileTrue:task=self.rcon.blpop(self.queue,0)[1]print"Task get",taskif__name__=='__main__':print'listen task queue'Task().listen_task()

釋出訂閱模式

使用redis的pubsub功能,訂閱者訂閱頻道,釋出者釋出訊息到頻道了,頻道就是一個訊息佇列。

1234567891011121314151617 import redisclassTask(object):def __init__(self):self.rcon=redis.StrictRedis(host='localhost',db=5)self.ps=self.rcon.pubsub()self.ps.subscribe('task:pubsub:channel')def listen_task(self):foriinself.ps.listen():ifi['type']=='message':print"Task get",i['data']if__name__=='__main__':print'listen task channel'Task().listen_task()

Flask 入口

我們分別實現了兩種非同步任務的後端服務,直接啟動他們,就能監聽redis佇列或頻道的訊息了。簡單的測試如下:

12345678910111213141516171819202122232425262728293031323334353637383940414243 import redisimport randomimport loggingfrom flask import Flask,redirectapp=Flask(__name__)rcon=redis.StrictRedis(host='localhost',db=5)prodcons_queue='task:prodcons:queue'pubsub_channel='task:pubsub:channel'@app.route('/')def index():html="""<br><center><h3>Redis Message Queue</h3><br><a href="/prodcons">生產消費者模式</a><br><br><a href="/pubsub">釋出訂閱者模式</a></center>"""returnhtml@app.route('/prodcons')def prodcons():elem=random.randrange(10)rcon.lpush(prodcons_queue,elem)logging.info("lpush {} -- {}".format(prodcons_queue,elem))returnredirect('/')@app.route('/pubsub')def pubsub():ps=rcon.pubsub()ps.subscribe(pubsub_channel)elem=random.randrange(10)rcon.publish(pubsub_channel,elem)returnredirect('/')if__name__=='__main__':app.run(debug=True)

啟動指令碼,使用

12 siege-c10-r5http://127.0.0.1:5000/prodconssiege-c10-r5http://127.0.0.1:5000/pubsub

可以分別在監聽的指令碼輸入中看到非同步訊息。在非同步的任務中,可以執行一些耗時間的操作,當然目前這些做法並不知道非同步的執行結果,如果需要知道非同步的執行結果,可以考慮設計協程任務或者使用一些工具如RQ或者celery等。

相關推薦

Redis實現簡單訊息佇列

任務非同步化 開啟瀏覽器,輸入地址,按下回車,打開了頁面。於是一個HTTP請求(request)就由客戶端傳送到伺服器,伺服器處理請求,返回響應(response)內容。 我們每天都在瀏覽網頁,傳送大大小小的請求給伺服器。有時候,伺服器接到了請求,會發現他也需要給另外的伺服器傳送請求,或者伺服器也需要做另外

利用Redis實現非同步訊息佇列優化系統性能 (Redis高階應用)

寫在前面 今天把之前在專案中使用 Redis 做非同步訊息佇列的使用經驗總結一下。首先明確使用目的,因為專案中,我們進行某個操作後可能後續會有一系列的其他耗時操作,但是我們不希望將主執行緒阻塞在此過程中,這時便可將其他操作非同步化。舉個栗子,當你給這篇部落格點贊或評論的時候

基於Redis實現分散式訊息佇列(彙總目錄)

基於Redis實現分散式訊息佇列(1)– 緣起 基於Redis實現分散式訊息佇列(2)– 分散式訊息佇列功能設計 基於Redis實現分散式訊息佇列(3)– Redis功能分析 基於Redis實現分散式訊息佇列(4)– 程式碼實現

基於Redis實現分散式訊息佇列(1)

1、為什麼需要訊息佇列? 當系統中出現“生產“和“消費“的速度或穩定性等因素不一致的時候,就需要訊息佇列,作為抽象層,彌合雙方的差異。 舉個例子:業務系統觸發簡訊傳送申請,但簡訊傳送模組速度跟不上,需要將來不及處理的訊息暫存一下,緩衝壓力。 再舉個例子:調

Spring Spring-data-redis 實現訊息佇列

配置環境 版本資訊: Spring 4.2.8.RELEASE(其中囊括tx、webmvc、context、aop之類的) Spring-data-jpa 1.10.5.RELEASE Spring-data-redis 1.8.0.RELEASE

使用Redis實現非同步訊息佇列

@Service public class EventConsumer implements InitializingBean, ApplicationContextAware { private static final Logger logger = LoggerFactory.getLogger

基於Redis實現分散式訊息佇列(4)

1、訪問Redis的工具類 public class RedisManager { private static Pool<Jedis> pool; protected final static Logger logger

基於Redis實現分散式訊息佇列(3)

1、Redis是什麼鬼? Redis是一個簡單的,高效的,分散式的,基於記憶體的快取工具。 假設好伺服器後,通過網路連線(類似資料庫),提供Key-Value式快取服務。 簡單,是Redis突出的特色。 簡單可以保證核心功能的穩定和優異。 2、效能

什麼鬼,面試官竟然讓我用Redis實現一個訊息佇列!!?

GitHub 9.4k Star 的Java工程師成神之路 ,不來了解一下嗎? GitHub 9.4k Star 的Java工程師成神之路 ,真的不來了解一下嗎? GitHub 9.4k Star 的Java工程師成神之路 ,真的確定不來了解一下嗎? 眾所周知,redis是一個高效能的key-value資料庫

Redis 實現 PHP 的簡單訊息佇列

參考:PHP高階程式設計之訊息佇列 訊息佇列就是在訊息的傳輸過程中,可以儲存訊息的容器。 常見用途: 儲存轉發:非同步處理耗時的任務 分散式事務:多個消費者消費同一個訊息佇列 應對高併發:通過訊息佇列儲存任務,慢慢處理 釋出訂閱:實現解耦

Redis延時訊息佇列、非同步訊息佇列實現

package list; import java.lang.reflect.Type; import java.util.Set; import java.util.UUID; import com.alibaba.fastjson.JSON; import com.a

玩轉redis-簡單訊息佇列

使用go語言基於redis寫了一個簡單的訊息佇列 原始碼地址 使用demo redis的 list 非常的靈活,可以從左邊或者右邊新增元素,當然也以從任意一頭讀取資料 新增資料和獲取資料的操作也是非常簡單的 LPUSH 從左邊插入資料 RPUSH 大右邊插入資料 LPOP 從左邊取出一個數據 RPOP 從

進階高階IoT架構-教你如何簡單實現一個訊息佇列

![](https://james-1258744956.cos.ap-shanghai.myqcloud.com/thingsboard_queue/queue-blackgroud.jpg) ## 前言 訊息佇列是軟體系統領域用來實現系統間通訊最廣泛的中介軟體。基於訊息佇列的方式是指由應用中的某個系統

使用redis實現簡單的秒殺

lec connect use decode else 隊列 def 固定 urn 自己做的簡單秒殺 感覺思路是沒太大問題的 但是代碼寫的不是很好 做個記錄方便以後回來嘲諷下自己 <?phpnamespace frontend\controllers;use Y

Redis-實現簡單的條件查詢

一,匯入jar包 二,實現簡單的條件查詢 建立一個User實體類 public class User { private String id; private String name; private String sex; private int

Delayer 基於 Redis 的延遲訊息佇列中介軟體

Delayer 基於 Redis 的延遲訊息佇列中介軟體,採用 Golang 開發,支援 PHP、Golang 等多種語言客戶端。 參考 有贊延遲佇列設計 中的部分設計,優化後實現。 專案連結:https://github.com/mixstart/d... ,有需要的朋友加 Star 哦。 應用場景

spring-boot+Redis實現簡單的分散式叢集session共享

  寫在前面:      首先宣告,筆者是一名Java程式設計屆的小學生。前面一直在幾家公司裡面做開發,其實都是一些傳統的專案,對於像分散式啦,叢集啦一些大型的專案接觸的很少,所以一直沒有自己整合和實現過。由於最近幾天專案不是很忙,自己又有點時間

php實現訊息佇列

<?php /** * Created by PhpStorm. * User: lin * Date: 2017/6/9 * Time: 11:19 * 實現php共享記憶體訊息佇列 */ class ShmQueue { private $maxQSize = 0;/

Netty實戰開發(7):Netty結合kafka實現分散式訊息佇列

在分散式遊戲伺服器系統中,訊息處理佇列主要解決問題就是解耦系統中的業務,使得每個系統看起來功能比較單一,而且解決一些全服資料共享等問題。 通常我們知道kafka是作為訊息佇列比較火的一種方式,其實還有(Active MQ,Rabbit MQ,Zero MQ)個人

java 連結redis 實現簡單的查詢

1  新建maven專案 新增依賴  <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schema