1. 程式人生 > >Beanstalked的初步瞭解和使用(包括利用beanstalkd 秒殺訊息佇列的實現)

Beanstalked的初步瞭解和使用(包括利用beanstalkd 秒殺訊息佇列的實現)

一  Beanstalkd 是什麼

Beanstalkd,一個高效能、輕量級的分散式記憶體佇列系統

 

二  Beanstalkd 特性

1. 優先順序(priority)

注:優先順序就意味 支援任務插隊(數字越小,優先順序越高,0的優先順序最高)

2. 延遲(delay)

注:延遲意味著可以定義任務什麼時間才開始被消費,也就實現了定時任務(比如為了增加網站活躍性,增加定時評論,定時點贊功能)

3. 持久化(persistent data)

注:Beanstalkd 支援定時將檔案刷到日誌檔案裡,即使beanstalkd宕機,重啟之後仍然可以找回檔案

4. 預留(buried)

注:Beanstalkd支援把一個任務設定為預留,這樣,消費者就無法取出這個任務了,等合適的時機再把這個任務拿出來消費

5. 任務超時重發(time-to-run)

注:消費者必須在指定的時間內處理完這個任務,否則就認為消費者處理失敗,任務會被重新放到佇列,等待消費

三 管道(tube)與任務(job)
注:生產者生產任務,並根據業務需求將任務放到不同管道中,比如和註冊有關的任務放到註冊管道中,和訂單有關的放到訂單管道中

注:任務從進入管道到離開管道一共有5個狀態(ready,delayed,reserved,buried,delete)
1. 生產者將任務放到管道中,任務的狀態可以是ready(表示任務已經準備好,隨時可以被消費者讀取),也可以是delayed(任務在被生產者放入管道時,設定了延遲,比如設定了5s延遲,意味著5s之後,這個任務才會變成ready狀態,才可以被消費者讀取)

2. 消費者消費任務(消費者將處於ready狀態的任務讀出來後,被讀取處理的任務狀態變為reserved)

3. 消費者處理完任務後,任務的狀態可能是delete(刪除,處理成功),可能是buried(預留,意味著先把任務放一邊,等待條件成熟還要用),可能是ready,也可能是delayed,需要根據具體業務場景自己進行判斷定義

具體示意圖:

四 Beanstalkd 的安裝(git、方式)
注:beanstalkd是不支援windows的 ,必須要在Linux環境下(本地環境介紹 Linux, php7,mysql5.6,nginx,centos7.4) 

1. yum install -y git 

2.  git clone https://github.com/kr/beanstalkd 

3.  cd beanstalkd 

4. make

5. make install

6. 檢視安裝是否成功


五 開始使用Beanstalkd
1. 繫結地址和埠
beanstalkd -l 127.0.0.1 -p 11301 &

2. composer安裝pheanstalkd類(這個類在php使用中會簡化很多操作,非常方便)
--------------

2.1 安裝composer     curl -sS https://getcomposer.org/installer | php

2.2 將composer全域性呼叫     mv composer.phar /usr/local/bin/composer

2.3  cd /usr/local/bin

2.4 chmod +x composer

-----------------------

2.5 composer require pda/pheanstalk

注:pheanstalkd 用法示例:https://github.com/pda/pheanstalk

3. 簡單使用
環境介紹(阿里雲Linux伺服器, 本地win10,Filezilla用於雙方檔案傳輸 ,xshell用於連線遠端阿里雲伺服器)

3.1  編寫demo.php


<?php

require "vendor/autoload.php";

use Pheanstalk\Pheanstalk;

$ph = new Pheanstalk('127.0.0.1',11301);

print_r($ph->stats());//檢視目前pheanStalkd狀態資訊

3.2  將在本地編寫的demo.php程式碼通過filezilla上傳到阿里雲伺服器 /usr/local/beanstalkd/beanstalkd下,並在linux下執行

https://github.com/kr/beanstalkd.git
https://github.com/kr/beanstalkd.git


4. pheanstalkd類常用的方法
<?php

require "vendor/autoload.php";

use Pheanstalk\Pheanstalk;

$ph = new Pheanstalk('127.0.0.1',11301);

//----------------------------------------維護類----------------------------------

//1.檢視目前pheanStalkd狀態資訊
//print_r($ph->stats()); 

//2.顯示目前存在的管道
//print_r($ph->listTubes()); 

//3.檢視NewUsers管道的資訊
//$ph->useTube('NewUsers')->put('test'); 
//$ph->useTube('NewUsers')->put('up'); //4.向NewUsers管道新增一個up任務
//print_r($ph->statsTube('NewUsers'));//3.檢視NewUsers管道的資訊

//6.檢視指定管道中某一個任務的情況
//$job = $ph->watch('NewUsers')->reserve(); //5.從管道中取出任務(消費)
//print_r($ph->statsJob($job)); //6.檢視指定管道中某一個任務的情況

//7.檢視任務id為1的任務詳情
//$job = $ph->peek(1);7.直接取出任務id為1的任務 [注:beanstalkd中所有任務的id都具有唯一性] 
//print_r($ph->statsJob($job));//檢視任務id為1的任務詳情


//----------------------------------------生產類--------------------------------------

////第一種 put()

//$tube = $ph->useTube('NewUsers');//連線NewUsers管道
//print_r($tube->put('four'));//向NewUsers管道新增任務four,並返回結果
//注: put()方法還有3個可選引數(依次為: 優先順序priority,延遲時間delay,任務超時重發ttr)

////第二種 putInTube() [注: putInTube()就是對useTube()和put()的封裝]
//$res = $ph->putInTube('NewUsers','three');//向NewUsers管道新增任務three
////注: putInTube()方法還有3個可選引數(依次為: 優先順序priority,延遲時間delay,任務超時重發ttr)
//print_r($res);//返回任務id

//print_r($ph->statsTube('NewUsers'));//檢視NewUsers管道的詳細情況


//---------------------------------------消費類--------------------------------------

// 1.watch 監聽NewUsers管道 [ 注: watch()同樣可以監聽多個管道 ]
//$tube = $ph->watch('NewUsers');
//print_r($ph->listTubesWatched());//列印已經監聽的管道


// 2.watch 監聽多個管道
//$tube = $ph->watch('NewUsers')
//           ->watch('default');
//print_r($ph->listTubesWatched());//列印已經監聽的管道


// 3.ignore 監聽NewUsers管道,忽略default管道
//$tube = $ph->watch('NewUsers')
//            ->ignore('default');
//print_r($ph->listTubesWatched());//列印已經監聽的管道


// 4.reserve 監聽NewUsers管道,並且取出任務
//$job = $ph->watch('NewUsers')
//          ->reserve();
//
////注reserve()有1個引數,阻塞的時間,過了阻塞時間,不管有沒有東西,直接返回
//
//var_dump($job);//列印已經取出的任務
//$ph->delete($job);//刪除已經取出的任務


// 5.putInTube/put 向NewUsers管道寫入任務 [ 注:此為生產者方法,放到此處是為了方便理解 ]
//$ph->putInTube('NewUsers','number_1',5);
//$ph->putInTube('NewUsers','number_2',3);
//$ph->putInTube('NewUsers','number_3',0);
//$ph->putInTube('NewUsers','number_4',4);
//print_r($ph->statsTube('NewUsers'));//5.檢視NewUsers管道詳細資訊


// 6.release 將取出的任務放回ready狀態,還有2個引數(優先順序和延遲)
//$job = $ph->watch('NewUsers')->reserve();//6.監聽NewUsers管道,並取出任務

//if (true) {
//    sleep(30);
//    $ph->release($job);//6.將任務取出之後,停留30秒,然後將任務狀態重新變為ready
//} else {
//    $ph->delete($job);
//}


// 7.bury (預留) 將任務取出之後,發現後面執行的邏輯不成熟(比如發郵件,突然發現郵件伺服器掛掉了),
//或者說還不能執行後面的邏輯,需要把任務先封存起來,等待時機成熟了,再拿出這個任務進行消費

//$job = $ph->watch('NewUsers')->reserve();//取出任務
//$ph->bury($job);//取出任務後,將任務放到一邊(預留)

// 8.peekBuried() 將處在bury狀態的任務讀取出來
//$job = $ph->peekBuried('NewUsers');//將NewUsers管道中處在bury狀態的任務讀取出來
//var_dump($ph->statsJob($job));//列印任務狀態(此時任務狀態應該是bury)

// 9.kickJob() 將處在bury任務狀態的任務轉化為ready狀態
//$job = $ph->peekBuried('NewUsers');//將NewUsers管道中處在bury狀態的任務讀取出來
//$ph->kickJob($job);

// 10.kick()  將處在bury任務狀態的任務轉化為ready狀態,有第二個引數int, 批量將任務id小於此數值的任務轉化為ready
//$ph->useTube('NewUsers')->kick(65);//把NewUsers管道中任務id小於65,並且任務狀態處於bury的任務全部轉化為ready

// 11.peekReady() 將管道中處於ready狀態的任務讀出來
//$job = $ph->peekReady('NewUser');//將NewUser管道中處於ready狀態的任務讀取出來
//var_dump($job);
//$ph->delete($job);

// 12.peekDelay() 將管道中所有處於delay狀態的任務讀取出來
//$job = $ph->peekDelayed('NewUser');
//var_dump($job);
//$ph->delete($job);


// 13.pauseTube() 對整個管道進行延遲設定,讓管道處於延遲狀態
//$ph->pauseTube('NewUser',10);//設定管道NewUser延遲時間為10s
//$job = $ph->watch('NewUser')->reserve();//監聽NewUser管道,並取出任務
//var_dump($job);

// 14.resumeTube() 恢復管道,讓管道處於不延遲狀態,立即被消費
//$ph->resumeTube('NewUser');//取消管道NewUser的延遲狀態,變為立即讀取
//$job = $ph->watch('NewUser')->reserve();//監聽NewUser管道,並取出任務
//var_dump($job);

// 15.touch() 讓任務重新計算任務超時重發ttr時間,相當於給任務延長壽命


5. 專案中的應用總結
生產者中常用的方法
useTube() : 如果沒有管道,則建立對應管道,有,則直接使用

put() : 向管道中放任務

消費者中常用的方法步驟:
1. watch():監聽管道

2. reserve():將管道中處於ready狀態的任務讀取出來

3.1 可以使用delete 方法刪除任務

3.2 可以使用release 方法將任務放回ready狀態

3.3 可以使用bury 方法將任務先放一邊(例如發郵件,郵箱伺服器掛掉),等待條件成熟再取出來

6. 實際應用演示
producer.php


<?php
require "vendor/autoload.php";

use Pheanstalk\Pheanstalk;

$ph = new Pheanstalk('127.0.0.1',11301);

$ph->useTube('List')->put('goods');
$ph->useTube('List')->put('goods2');
$ph->useTube('List')->put('goods3');
$ph->useTube('List')->put('goods4');
$ph->useTube('List')->put('goods5');

//print_r($ph->statsTube('List'));//檢視List管道的資訊
consumer.php


<?php

require "vendor/autoload.php";

use Pheanstalk\Pheanstalk;

$ph = new Pheanstalk('127.0.0.1',11301);

$res = $ph->watch('List')->reserve();//監聽List管道,並將任務取出來

if ($res) {
    $ph->delete($res);
    var_dump($res);
}
注:

生產者根據業務不同,將任務放到不同管道(管道用於儲存消費者生產的任務),比如將和註冊有關的任務通通放到註冊管道,和訂單有關的通通放入訂單管道

消費者將處於ready狀態的任務根據優先順序逐個讀取出來

五  使用Beanstalkd 實現類似redis秒殺活動
producer.php程式碼:
<?php

require "vendor/autoload.php";

use Pheanstalk\Pheanstalk;

//連線beanstalkd
$ph = new Pheanstalk('127.0.0.1', 11301);

$tube_name = 'SecKill2';

//使用SecKill2管道
$SEC = $ph->useTube($tube_name);

//模擬100人請求秒殺
for ($i = 0; $i < 100; $i++) {
    $uid = rand(10000000, 99999999);
    //獲取當前佇列已經擁有的數量,如果人數少於十,則加入這個佇列
    $total_jobs = $ph->statsTube($tube_name)['total-jobs'];
    $num = 10;
    if ($total_jobs < $num) {
        $SEC->put($uid);//向管道放任務
        echo $uid . "秒殺成功";
    } else {
        //如果當前佇列人數已經達到10人,則返回秒殺已完成
        echo "秒殺已結束<br>";
    }
}
print_r($ph->statsTube($tube_name));//檢視SecKill2管道的資訊
注: 執行完producer.php程式碼後,應當會在SecKill2管道中看到10個任務,每一個任務內容是一個uid

consumer.php程式碼:

<?php

require "vendor/autoload.php";

use Pheanstalk\Pheanstalk;

//連線BeanStalkd佇列系統
$ph = new Pheanstalk('127.0.0.1',11301);
$tube_name = 'SecKill2';
//取出SecKill2管道的任務總數
$total_jobs = $ph->statsTube($tube_name)['total-jobs'];

//PDO連線mysql資料庫
$dsn = "mysql:dbname=test;host=127.0.0.1";
$pdo = new PDO($dsn, 'root', '123456');

//迴圈取出管道中任務,並執行插入資料庫操作
for($i = 0; $i < $total_jobs; $i++){

    //監聽SecKill4管道,並將任務取出來
    $job = $ph->watch($tube_name)->reserve();

    //取出任務儲存的值uid
    $uid = $job->getData();//打印出的樣子 string(8) "24541944"

    if (!$uid) {
        sleep(2);
        continue;
    }
    //生成訂單號
    $orderNum = build_order_no($uid);
    //生成訂單時間
    $timeStamp = time();
    //構造插入陣列
    $user_data = array('uid'=>$uid,'time_stamp'=>$timeStamp,'order_num'=>$orderNum);
    //將資料儲存到資料庫
    $sql = "insert into seckill (uid,time_stamp,order_num) values (:uid,:time_stamp,:order_num)";
    $stmt = $pdo->prepare($sql);
    $res = $stmt->execute($user_data);
    //如果資料庫操作成功,則刪除任務
    if ($res) {
        $ph->delete($job);
    }

}

//生成唯一訂單號
function build_order_no($uid){
    return  substr(implode(NULL, array_map('ord', str_split(substr(uniqid(), 7, 13), 1))), 0, 8).$uid;
}
注: 執行完consumer.php檔案之後, 資料表應該已經寫入了10個訂單

注: 

用到的資料表


create table `seckill`(
`uid` int unsigned not null default '0',
`time_stamp` int unsigned not null  default '0',
`order_num` bigint unsigned not null default '0',
primary key (`uid`),
key (order_num)
)engine = myisam default charset= utf8;
參考視訊地址: https://www.imooc.com/video/16016

這邊部落格可能會用到 https://blog.csdn.net/ahjxhy2010/article/details/53196450

參考文獻:

https://blog.csdn.net/m_nanle_xiaobudiu/article/details/80466702