1. 程式人生 > >Redis實現訊息佇列

Redis實現訊息佇列

個人理解在專案中使用訊息佇列一般是有如下幾個原因:

    把瞬間伺服器的請求處理換成非同步處理,緩解伺服器的壓力

    實現資料順序排列獲取

​redis實現訊息佇列步驟如下:

1).redis函式rpush,lpop

2).建議定時任務入佇列

3)建立定時任務出佇列

檔案:demo.php插入資料到redis佇列
<?php
 
$redis = new Redis();
$redis->connect('127.0.0.1',6379);
//$password = '123456';
//$redis->auth($password);
$arr = array('h','e','l','l','o','w','o','r','l','d');
 
foreach($arr as $k=>$v){
 
  $redis->rpush("mylist",$v);
 
}?>

檔案:index.php定時掃描出佇列

<?php
 
$redis = new Redis();
 
$redis->connect('127.0.0.1',6379);
 
$password = '123456';
 
$redis->auth($password);
 
//list型別出隊操作
 
$value = $redis->lpop('mylist');
 
if($value){
 
 echo "出隊的值".$value;
 
}else{
 
  echo "出隊完成";
 
}
 
?>

立定時任務

 */1 * * * * root php /wwwroot/workplace/redis/index.php

 */3 * * * * root php /wwwroot/workplace/redis/demo.php

 

tail -f /var/log/cron  檢視定時任務執行情況

Nov  7 00:30:01 dongzi CROND[6888]: (root) CMD (php /wwwroot/workplace/redis/demo.php)

Nov  7 00:30:01 dongzi CROND[6890]: (root) CMD (php /wwwroot/workplace/redis/index.php )

定時任務執行佇列寫入結果如下
    
127.0.0.1:6379> lrange mylist 0 -1
 
 1) "h"
 
 2) "e"
 
 3) "l"
 
 4) "l"
 
 5) "o"
 
 6) "w"
 
 7) "o"
 
 8) "r"
 
 9) "l"
 
10) "d"

  

 定時任務執行出佇列後:
複製程式碼

127.0.0.1:6379> lrange mylist 0 -1

1) "e"

2) "l"

3) "l"

4) "o"

5) "w"

6) "o"

7) "r"

8) "l"

9) "d"



    連線Redis
        連線Redis
        Redis入佇列
    Redis出佇列 - 先進先出lPop 先進後出rPop
        Redis入佇列
        Redis出佇列 - 先進先出lPop 先進後出rPop
        阻塞出佇列 brPopblPop 同上
        檢視佇列長度 LLEN


【PHP使用 Redis 實現訊息佇列簡單事例】

Redis安裝說明在此不再說,可以看我另一篇“Redis與PHP安裝的那些事”,這裡只介紹了windows下安裝。


新建立兩個檔案,push和pop檔案


1.push.php 推送

這裡通過URL直接傳遞引數進行 keyword

$keyword=$_GET['keyword'];
$redis=newRedis();
$redis->connect('127.0.0.1',6379);
try{
echo$redis->LPUSH('list',''.$keyword);
}catch(Exception$e){
echo$e->getMessage();
}


2.pop.php 彈出接受資訊

寫個死迴圈,一直監聽

$redis=newRedis();
$redis->connect('127.0.0.1',6379);
//echo"連線成功
";
//echo"狀態:".$redis->ping();
while(true){
try{
$value=$redis->LPOP('list');
//這裡進行業務處理
print_r(value);

}catch(Exception$e){
echo$e->getMessage();
}
}


在命令cmd下執行pop.php,我本地是xampp整合環境。故在cmd中執行

D:/xampp/php>php.exe E:/project/p2/redis/pop.php


3、在瀏覽器下執行

http://lock.com/redis/push.php?keyword=hello lock

在cmd中檢視應該會輸出hello lock


注意要點:

1、先要開啟執行redis服務,在cmd下執行,不要關閉

2. 新開一個cmd視窗,執行pop.php

3. 在瀏覽器下執行push.php,在pop.php的cmd下可以看到相關值




----------------------------------------------------------------------------------------------------

PHP專案需要一個訊息佇列,最後為了簡單選擇了Redis List..
中文的Redis文件 http://redisdoc.com/  

Php +Redis 做訊息佇列
在Redis伺服器已經啟動的前提下.
1. 連線Redis

$redis = new Redis();
$redis->connect("127.0.0.1", "6379");  //php客戶端設定的ip及埠  

2. Redis入佇列

 $redis->lPush("GPS_LIST", data-notOrarrayOrObject);


我這裡測試陣列取不出來

    Insert all the specified values at the head of the list stored at key. If key does not exist, it is created as empty list before performing the push operations. When key holds a value that is not a list, an error is returned.

3.Redis出佇列 - 先進先出lPop 先進後出rPop

$redis->lPush(("GPS_LIST");



開始我用的這玩意,但是非阻塞的,不好使。

    return – the value of the first element, or nil when key does not exist

redis> RPUSH mylist "one"
(integer) 1
redis> RPUSH mylist "two"
(integer) 2
redis> RPUSH mylist "three"
(integer) 3
redis> LPOP mylist
"one"
redis> LRANGE mylist 0 -1
1) "two"
2) "three"


    阻塞出佇列 brPop,blPop 同上

http://redis.io/commands/brpop

    BRPOP is a blocking list pop primitive. It is the blocking version of RPOP because it blocks the connection when there are no elements to pop from any of the given lists. An element is popped from the tail of the first list that is non-empty, with the given keys being checked in the order that they are given.

    BLPOP is a blocking list pop primitive. It is the blocking version of LPOP because it blocks the connection when there are no elements to pop from any of the given lists. An element is popped from the head of the first list that is non-empty, with the given keys being checked in the order that they are given.


redis->brPop("GPS_LIST", 5);
返回的是一個數組
0=> "GPS_LIST"
1=> "實際資料"

-----
Return value
Array reply: specifically:
A nil multi-bulk when no element could be popped and the timeout expired.
A two-element multi-bulk with the first element being the name of the key where an element was popped and the second element being the value of the popped element.
-----
redis> DEL list1 list2
(integer) 0
redis> RPUSH list1 a b c
(integer) 3
redis> BLPOP list1 list2 0
1) "list1"
2) "a"

   

    檢視佇列長度 LLEN

    Returns the length of the list stored at key. If key does not exist, it is interpreted as an empty list and 0 is returned. An error is returned when the value stored at key is not a list.

Examples
redis> LPUSH mylist "World"
(integer) 1
redis> LPUSH mylist "Hello"
(integer) 2
redis> LLEN mylist
(integer) 2
redis>

   

2. Redis入佇列

 $redis->lPush("GPS_LIST", data-notOrarrayOrObject);


我這裡測試陣列取不出來

    Insert all the specified values at the head of the list stored at key. If key does not exist, it is created as empty list before performing the push operations. When key holds a value that is not a list, an error is returned.

3.Redis出佇列 - 先進先出lPop 先進後出rPop

$redis->lPush(("GPS_LIST");


開始我用的這玩意,但是非阻塞的,不好使。

    return – the value of the first element, or nil when key does not exist

redis> RPUSH mylist "one"
(integer) 1
redis> RPUSH mylist "two"
(integer) 2
redis> RPUSH mylist "three"
(integer) 3
redis> LPOP mylist
"one"
redis> LRANGE mylist 0 -1
1) "two"
2) "three"


4. 阻塞出佇列 brPop,blPop 同上

http://redis.io/commands/brpop

    BRPOP is a blocking list pop primitive. It is the blocking version of RPOP because it blocks the connection when there are no elements to pop from any of the given lists. An element is popped from the tail of the first list that is non-empty, with the given keys being checked in the order that they are given.

    BLPOP is a blocking list pop primitive. It is the blocking version of LPOP because it blocks the connection when there are no elements to pop from any of the given lists. An element is popped from the head of the first list that is non-empty, with the given keys being checked in the order that they are given.


redis->brPop("GPS_LIST", 5);
返回的是一個數組
0=> "GPS_LIST"
1=> "實際資料"

-----
Return value
Array reply: specifically:
A nil multi-bulk when no element could be popped and the timeout expired.
A two-element multi-bulk with the first element being the name of the key where an element was popped and the second element being the value of the popped element.
-----
redis> DEL list1 list2
(integer) 0
redis> RPUSH list1 a b c
(integer) 3
redis> BLPOP list1 list2 0
1) "list1"
2) "a"

   

5. 檢視佇列長度 LLEN

    Returns the length of the list stored at key. If key does not exist, it is interpreted as an empty list and 0 is returned. An error is returned when the value stored at key is not a list.

Examples
redis> LPUSH mylist "World"
(integer) 1
redis> LPUSH mylist "Hello"
(integer) 2
redis> LLEN mylist
(integer) 2
redis>




【redis 佇列操作的例子(php)】
Reids是一個比較高階的開源key-value儲存系統,採用ANSI C實現。其與memcached類似,但是支援持久化資料儲存

入隊操作
複製程式碼 程式碼如下:

<?php
$redis = new Redis();
$redis->connect('127.0.0.1',6379);
while(True){
try{
$value = 'value_'.date('Y-m-d H:i:s');
$redis->LPUSH('key1',$value);
sleep(rand()%3);
echo $value."\n";
}catch(Exception $e){
echo $e->getMessage()."\n";
}
}
?>

出隊操作
複製程式碼 程式碼如下:

<?php
$redis = new Redis();
$redis->pconnect('127.0.0.1',6379);
while(True){
try{
echo $redis->LPOP('key1')."\n";
}catch(Exception $e){
echo $e->getMessage()."\n";
}
sleep(rand()%3);
}?>

如何使用Redis 做佇列操作
Reids是一個比較高階的開源key-value儲存系統,採用ANSI C實現。其與memcached類似,但是支援持久化資料儲存,同時value支援多種型別:字串 (同memcached中的value),列表 ,集合 (Set),有序集合 (OrderSet)和Hash 。所有的值型別均支援原子操作,如列表中追加彈出元素,集合中插入移除元素等。Rdids的資料大部分位於記憶體中,其讀寫效率非常高,其提供AOF(追加 式操作記錄檔案)和DUMP(定期資料備份)兩種持久化方式。Redis支援自定義的VM(虛擬記憶體)機制,當資料容量超過記憶體時,可以將部分Value 儲存到檔案中。同時Redis支援Master-Slave機制,可以進行資料複製。
可以把Redis的list結構當佇列來用.
從上面Redis的場景和作用來說,對於我們現在的開發活動,究竟能把Redis引入在那些場景,而不是把這麼好的東東演變成“為了使用Redis,而Redis”的慘烈局面呢?當然,具體問題具體分析,這個真的很重要哈。
快取?分散式快取?
佇列?分散式佇列?
某些系統應用(例如,電信、銀行和大型網際網路應用等)都會使用到,當然,現在大行其道的memcache就是很好的證明;但從某一方面來說,memcache是否能把兩張囊括其中,而且能做到更好(沒有實際的應用過,所以只是丟擲)。但從Redis身上,我就能感覺到,Redis,就能把佇列和快取兩張都囊括其中,而且都不會產生併發環境下的困擾,因為Redis中的操作都是原子操作來著。
至於評論兩者的孰好孰壞就免了,存在就是理由,選擇適合的就是最好的。
下面開始玩玩Redis中的佇列(分散式)設計YY吧,請大蝦們多多指點。
狀況場景:
現在的專案,都是部署在多個伺服器,或者多個IP上,而且前臺經由F5分發,所以使用者的請求究竟落在那一臺的伺服器上,是無法確定的。對於專案中,有一秒殺設計,剛開始沒有考慮到這種部署,同時也是使用最容易處理的方式,直接給資料庫表鎖行記錄(Oracle上的)。可以說,對於不同的應用部署,而只有一臺資料庫伺服器來說,很“輕鬆”的就解決了這個併發的問題。所以現在考慮一下,是不是挪到應用上,避免資料庫伺服器也摻雜到業務上。
比如,現在有2臺應用伺服器,1臺數據庫伺服器。想法是,把Redis部署在資料庫伺服器上,兩臺伺服器在操作併發快取或者佇列時,先從Redis伺服器上,取得在兩臺應用伺服器的代理物件,再做入列出列的操作。
看程式碼實現(PHP)
入佇列操作檔案 list_push.php
複製程式碼 程式碼如下:

<?php
$redis = getRedisInstance();//從Redis伺服器拿到redis例項
$redis->connect('Redis伺服器IP', 6379);
while (true) {
$redis->lPush('list1', 'A_'.date('Y-m-d H:i:s'));
sleep(rand()%3);
}
?>

執行# php list_push.php &
出佇列操作 list_pop.php檔案
複製程式碼 程式碼如下:

<?php
$redis = getRedisInstance();//從Redis伺服器拿到redis例項
$redis->pconnect('Redis伺服器IP', 6379);
while(true) {
try {
var_export( $redis->blPop('list1', 10) );
} catch(Exception $e) {
//echo $e;
}
}

實現方法(Python)
1.入佇列(write.py)
複製程式碼 程式碼如下:

#!/usr/bin/env python
import time
from redis import Redis
redis = Redis(host='127.0.0.1', port=6379)
while True:
now = time.strftime("%Y/%m/%d %H:%M:%S")
redis.lpush('test_queue', now)
time.sleep(1)

2.出佇列(read.py)
複製程式碼 程式碼如下:

#!/usr/bin/env python
import sys
from redis import Redis
redis = Redis(host='127.0.0.1', port=6379)
while True:
res = redis.rpop('test_queue')
if res == None:
pass
else:
print str(res)

在操作時,注意,要操作的是同一個list物件。
呵呵,現在的主要思路就差不多就是如此,不過真實場景中,會有出入。


【Redis實現簡單訊息佇列】

任務非同步化

開啟瀏覽器,輸入地址,按下回車,打開了頁面。於是一個HTTP請求(request)就由客戶端傳送到伺服器,伺服器處理請求,返回響應(response)內容。

我們每天都在瀏覽網頁,傳送大大小小的請求給伺服器。有時候,伺服器接到了請求,會發現他也需要給另外的伺服器傳送請求,或者伺服器也需要做另外一些事情,於是最初們傳送的請求就被阻塞了,也就是要等待伺服器完成其他的事情。

更多的時候,伺服器做的額外事情,並不需要客戶端等待,這時候就可以把這些額外的事情非同步去做。從事非同步任務的工具有很多。主要原理還是處理通知訊息,針對通知訊息通常採取是佇列結構。生產和消費訊息進行通訊和業務實現。
生產消費與佇列

上述非同步任務的實現,可以抽象為生產者消費模型。如同一個餐館,廚師在做飯,吃貨在吃飯。如果廚師做了很多,暫時賣不完,廚師就會休息;如果客戶很多,廚師馬不停蹄的忙碌,客戶則需要慢慢等待。實現生產者和消費者的方式用很多,下面使用Python標準庫Queue寫個小例子:

import random
import time
from Queue import Queue
from threading import Thread

queue = Queue(10)

class Producer(Thread):
    def run(self):
        while True:
            elem = random.randrange(9)
            queue.put(elem)
            print "廚師 {} 做了 {} 飯 --- 還剩 {} 飯沒賣完".format(self.name, elem, queue.qsize())
            time.sleep(random.random())

class Consumer(Thread):
    def run(self):
        while True:
            elem = queue.get()
            print "吃貨{} 吃了 {} 飯 --- 還有 {} 飯可以吃".format(self.name, elem, queue.qsize())
            time.sleep(random.random())

def main():
    for i in range(3):
        p = Producer()
        p.start()
    for i in range(2):
        c = Consumer()
        c.start()

if __name__ == '__main__':
    main()

大概輸出如下:

廚師 Thread-1 做了 1 飯 --- 還剩 1 飯沒賣完
廚師 Thread-2 做了 8 飯 --- 還剩 2 飯沒賣完
廚師 Thread-3 做了 3 飯 --- 還剩 3 飯沒賣完
吃貨Thread-4 吃了 1 飯 --- 還有 2 飯可以吃
吃貨Thread-5 吃了 8 飯 --- 還有 1 飯可以吃
吃貨Thread-4 吃了 3 飯 --- 還有 0 飯可以吃
廚師 Thread-1 做了 0 飯 --- 還剩 1 飯沒賣完
廚師 Thread-2 做了 0 飯 --- 還剩 2 飯沒賣完
廚師 Thread-1 做了 1 飯 --- 還剩 3 飯沒賣完
廚師 Thread-1 做了 1 飯 --- 還剩 4 飯沒賣完
吃貨Thread-4 吃了 0 飯 --- 還有 3 飯可以吃
廚師 Thread-3 做了 3 飯 --- 還剩 4 飯沒賣完
吃貨Thread-5 吃了 0 飯 --- 還有 3 飯可以吃
吃貨Thread-5 吃了 1 飯 --- 還有 2 飯可以吃
廚師 Thread-2 做了 8 飯 --- 還剩 3 飯沒賣完
廚師 Thread-2 做了 8 飯 --- 還剩 4 飯沒賣完

Redis 佇列

Python內建了一個好用的佇列結構。我們也可以是用redis實現類似的操作。並做一個簡單的非同步任務。

Redis提供了兩種方式來作訊息佇列。一個是使用生產者消費模式模式,另外一個方法就是釋出訂閱者模式。前者會讓一個或者多個客戶端監聽訊息佇列,一旦訊息到達,消費者馬上消費,誰先搶到算誰的,如果佇列裡沒有訊息,則消費者繼續監聽。後者也是一個或多個客戶端訂閱訊息頻道,只要釋出者釋出訊息,所有訂閱者都能收到訊息,訂閱者都是ping的。
生產消費模式

主要使用了redis提供的blpop獲取佇列資料,如果佇列沒有資料則阻塞等待,也就是監聽。

import redis

class Task(object):
    def __init__(self):
        self.rcon = redis.StrictRedis(host='localhost', db=5)
        self.queue = 'task:prodcons:queue'

    def listen_task(self):
        while True:
            task = self.rcon.blpop(self.queue, 0)[1]
            print "Task get", task

if __name__ == '__main__':
    print 'listen task queue'
    Task().listen_task()

釋出訂閱模式

使用redis的pubsub功能,訂閱者訂閱頻道,釋出者釋出訊息到頻道了,頻道就是一個訊息佇列。

import redis


class Task(object):

    def __init__(self):
        self.rcon = redis.StrictRedis(host='localhost', db=5)
        self.ps = self.rcon.pubsub()
        self.ps.subscribe('task:pubsub:channel')

    def listen_task(self):
        for i in self.ps.listen():
            if i['type'] == 'message':
                print "Task get", i['data']

if __name__ == '__main__':
    print 'listen task channel'
    Task().listen_task()

Flask 入口

我們分別實現了兩種非同步任務的後端服務,直接啟動他們,就能監聽redis佇列或頻道的訊息了。簡單的測試如下:

import redis
import random
import logging
from flask import Flask, redirect

app = Flask(__name__)

rcon = redis.StrictRedis(host='localhost', db=5)
prodcons_queue = 'task:prodcons:queue'
pubsub_channel = 'task:pubsub:channel'

@app.route('/')
def index():

    html = """
<br>
<center><h3>Redis Message Queue</h3>
<br>
<a href="/prodcons">生產消費者模式</a>
<br>
<br>
<a href="/pubsub">釋出訂閱者模式</a>
</center>
"""
    return html


@app.route('/prodcons')
def prodcons():
    elem = random.randrange(10)
    rcon.lpush(prodcons_queue, elem)
    logging.info("lpush {} -- {}".format(prodcons_queue, elem))
    return redirect('/')

@app.route('/pubsub')
def pubsub():
    ps = rcon.pubsub()
    ps.subscribe(pubsub_channel)
    elem = random.randrange(10)
    rcon.publish(pubsub_channel, elem)
    return redirect('/')

if __name__ == '__main__':
    app.run(debug=True)

啟動指令碼,使用

siege -c10 -r 5 http://127.0.0.1:5000/prodcons
siege -c10 -r 5 http://127.0.0.1:5000/pubsub

可以分別在監聽的指令碼輸入中看到非同步訊息。在非同步的任務中,可以執行一些耗時間的操作,當然目前這些做法並不知道非同步的執行結果,如果需要知道非同步的執行結果,可以考慮設計協程任務或者使用一些工具如RQ或者celery等。