Beanstalked的初步瞭解和使用(包括利用beanstalkd 秒殺訊息佇列的實現)
一 Beanstalkd 是什麼
Beanstalkd,一個高效能、輕量級的分散式記憶體佇列系統
二 Beanstalkd 特性
1. 優先順序(priority)
注:優先順序就意味 支援任務插隊(數字越小,優先順序越高,0的優先順序最高)
2. 延遲(delay)
注:延遲意味著可以定義任務什麼時間才開始被消費,也就實現了定時任務(比如為了增加網站活躍性,增加定時評論,定時點贊功能)
3. 持久化(persistent data)
注:Beanstalkd 支援定時將檔案刷到日誌檔案裡,即使beanstalkd宕機,重啟之後仍然可以找回檔案
4. 預留(buried)
注:Beanstalkd支援把一個任務設定為預留,這樣,消費者就無法取出這個任務了,等合適的時機再把這個任務拿出來消費
5. 任務超時重發(time-to-run)
注:消費者必須在指定的時間內處理完這個任務,否則就認為消費者處理失敗,任務會被重新放到佇列,等待消費
三 管道(tube)與任務(job)
注:生產者生產任務,並根據業務需求將任務放到不同管道中,比如和註冊有關的任務放到註冊管道中,和訂單有關的放到訂單管道中
注:任務從進入管道到離開管道一共有5個狀態(ready,delayed,reserved,buried,delete)
1. 生產者將任務放到管道中,任務的狀態可以是ready(表示任務已經準備好,隨時可以被消費者讀取),也可以是delayed(任務在被生產者放入管道時,設定了延遲,比如設定了5s延遲,意味著5s之後,這個任務才會變成ready狀態,才可以被消費者讀取)
2. 消費者消費任務(消費者將處於ready狀態的任務讀出來後,被讀取處理的任務狀態變為reserved)
3. 消費者處理完任務後,任務的狀態可能是delete(刪除,處理成功),可能是buried(預留,意味著先把任務放一邊,等待條件成熟還要用),可能是ready,也可能是delayed,需要根據具體業務場景自己進行判斷定義
具體示意圖:
四 Beanstalkd 的安裝(git、方式)
注:beanstalkd是不支援windows的 ,必須要在Linux環境下(本地環境介紹 Linux, php7,mysql5.6,nginx,centos7.4)
1. yum install -y git
2. git clone https://github.com/kr/beanstalkd
3. cd beanstalkd
4. make
5. make install
6. 檢視安裝是否成功
五 開始使用Beanstalkd
1. 繫結地址和埠
beanstalkd -l 127.0.0.1 -p 11301 &
2. composer安裝pheanstalkd類(這個類在php使用中會簡化很多操作,非常方便)
--------------
2.1 安裝composer curl -sS https://getcomposer.org/installer | php
2.2 將composer全域性呼叫 mv composer.phar /usr/local/bin/composer
2.3 cd /usr/local/bin
2.4 chmod +x composer
-----------------------
2.5 composer require pda/pheanstalk
注:pheanstalkd 用法示例:https://github.com/pda/pheanstalk
3. 簡單使用
環境介紹(阿里雲Linux伺服器, 本地win10,Filezilla用於雙方檔案傳輸 ,xshell用於連線遠端阿里雲伺服器)
3.1 編寫demo.php
<?php
require "vendor/autoload.php";
use Pheanstalk\Pheanstalk;
$ph = new Pheanstalk('127.0.0.1',11301);
print_r($ph->stats());//檢視目前pheanStalkd狀態資訊
3.2 將在本地編寫的demo.php程式碼通過filezilla上傳到阿里雲伺服器 /usr/local/beanstalkd/beanstalkd下,並在linux下執行
https://github.com/kr/beanstalkd.git
https://github.com/kr/beanstalkd.git
4. pheanstalkd類常用的方法
<?php
require "vendor/autoload.php";
use Pheanstalk\Pheanstalk;
$ph = new Pheanstalk('127.0.0.1',11301);
//----------------------------------------維護類----------------------------------
//1.檢視目前pheanStalkd狀態資訊
//print_r($ph->stats());
//2.顯示目前存在的管道
//print_r($ph->listTubes());
//3.檢視NewUsers管道的資訊
//$ph->useTube('NewUsers')->put('test');
//$ph->useTube('NewUsers')->put('up'); //4.向NewUsers管道新增一個up任務
//print_r($ph->statsTube('NewUsers'));//3.檢視NewUsers管道的資訊
//6.檢視指定管道中某一個任務的情況
//$job = $ph->watch('NewUsers')->reserve(); //5.從管道中取出任務(消費)
//print_r($ph->statsJob($job)); //6.檢視指定管道中某一個任務的情況
//7.檢視任務id為1的任務詳情
//$job = $ph->peek(1);7.直接取出任務id為1的任務 [注:beanstalkd中所有任務的id都具有唯一性]
//print_r($ph->statsJob($job));//檢視任務id為1的任務詳情
//----------------------------------------生產類--------------------------------------
////第一種 put()
//$tube = $ph->useTube('NewUsers');//連線NewUsers管道
//print_r($tube->put('four'));//向NewUsers管道新增任務four,並返回結果
//注: put()方法還有3個可選引數(依次為: 優先順序priority,延遲時間delay,任務超時重發ttr)
////第二種 putInTube() [注: putInTube()就是對useTube()和put()的封裝]
//$res = $ph->putInTube('NewUsers','three');//向NewUsers管道新增任務three
////注: putInTube()方法還有3個可選引數(依次為: 優先順序priority,延遲時間delay,任務超時重發ttr)
//print_r($res);//返回任務id
//print_r($ph->statsTube('NewUsers'));//檢視NewUsers管道的詳細情況
//---------------------------------------消費類--------------------------------------
// 1.watch 監聽NewUsers管道 [ 注: watch()同樣可以監聽多個管道 ]
//$tube = $ph->watch('NewUsers');
//print_r($ph->listTubesWatched());//列印已經監聽的管道
// 2.watch 監聽多個管道
//$tube = $ph->watch('NewUsers')
// ->watch('default');
//print_r($ph->listTubesWatched());//列印已經監聽的管道
// 3.ignore 監聽NewUsers管道,忽略default管道
//$tube = $ph->watch('NewUsers')
// ->ignore('default');
//print_r($ph->listTubesWatched());//列印已經監聽的管道
// 4.reserve 監聽NewUsers管道,並且取出任務
//$job = $ph->watch('NewUsers')
// ->reserve();
//
////注reserve()有1個引數,阻塞的時間,過了阻塞時間,不管有沒有東西,直接返回
//
//var_dump($job);//列印已經取出的任務
//$ph->delete($job);//刪除已經取出的任務
// 5.putInTube/put 向NewUsers管道寫入任務 [ 注:此為生產者方法,放到此處是為了方便理解 ]
//$ph->putInTube('NewUsers','number_1',5);
//$ph->putInTube('NewUsers','number_2',3);
//$ph->putInTube('NewUsers','number_3',0);
//$ph->putInTube('NewUsers','number_4',4);
//print_r($ph->statsTube('NewUsers'));//5.檢視NewUsers管道詳細資訊
// 6.release 將取出的任務放回ready狀態,還有2個引數(優先順序和延遲)
//$job = $ph->watch('NewUsers')->reserve();//6.監聽NewUsers管道,並取出任務
//if (true) {
// sleep(30);
// $ph->release($job);//6.將任務取出之後,停留30秒,然後將任務狀態重新變為ready
//} else {
// $ph->delete($job);
//}
// 7.bury (預留) 將任務取出之後,發現後面執行的邏輯不成熟(比如發郵件,突然發現郵件伺服器掛掉了),
//或者說還不能執行後面的邏輯,需要把任務先封存起來,等待時機成熟了,再拿出這個任務進行消費
//$job = $ph->watch('NewUsers')->reserve();//取出任務
//$ph->bury($job);//取出任務後,將任務放到一邊(預留)
// 8.peekBuried() 將處在bury狀態的任務讀取出來
//$job = $ph->peekBuried('NewUsers');//將NewUsers管道中處在bury狀態的任務讀取出來
//var_dump($ph->statsJob($job));//列印任務狀態(此時任務狀態應該是bury)
// 9.kickJob() 將處在bury任務狀態的任務轉化為ready狀態
//$job = $ph->peekBuried('NewUsers');//將NewUsers管道中處在bury狀態的任務讀取出來
//$ph->kickJob($job);
// 10.kick() 將處在bury任務狀態的任務轉化為ready狀態,有第二個引數int, 批量將任務id小於此數值的任務轉化為ready
//$ph->useTube('NewUsers')->kick(65);//把NewUsers管道中任務id小於65,並且任務狀態處於bury的任務全部轉化為ready
// 11.peekReady() 將管道中處於ready狀態的任務讀出來
//$job = $ph->peekReady('NewUser');//將NewUser管道中處於ready狀態的任務讀取出來
//var_dump($job);
//$ph->delete($job);
// 12.peekDelay() 將管道中所有處於delay狀態的任務讀取出來
//$job = $ph->peekDelayed('NewUser');
//var_dump($job);
//$ph->delete($job);
// 13.pauseTube() 對整個管道進行延遲設定,讓管道處於延遲狀態
//$ph->pauseTube('NewUser',10);//設定管道NewUser延遲時間為10s
//$job = $ph->watch('NewUser')->reserve();//監聽NewUser管道,並取出任務
//var_dump($job);
// 14.resumeTube() 恢復管道,讓管道處於不延遲狀態,立即被消費
//$ph->resumeTube('NewUser');//取消管道NewUser的延遲狀態,變為立即讀取
//$job = $ph->watch('NewUser')->reserve();//監聽NewUser管道,並取出任務
//var_dump($job);
// 15.touch() 讓任務重新計算任務超時重發ttr時間,相當於給任務延長壽命
5. 專案中的應用總結
生產者中常用的方法
useTube() : 如果沒有管道,則建立對應管道,有,則直接使用
put() : 向管道中放任務
消費者中常用的方法步驟:
1. watch():監聽管道
2. reserve():將管道中處於ready狀態的任務讀取出來
3.1 可以使用delete 方法刪除任務
3.2 可以使用release 方法將任務放回ready狀態
3.3 可以使用bury 方法將任務先放一邊(例如發郵件,郵箱伺服器掛掉),等待條件成熟再取出來
6. 實際應用演示
producer.php
<?php
require "vendor/autoload.php";
use Pheanstalk\Pheanstalk;
$ph = new Pheanstalk('127.0.0.1',11301);
$ph->useTube('List')->put('goods');
$ph->useTube('List')->put('goods2');
$ph->useTube('List')->put('goods3');
$ph->useTube('List')->put('goods4');
$ph->useTube('List')->put('goods5');
//print_r($ph->statsTube('List'));//檢視List管道的資訊
consumer.php
<?php
require "vendor/autoload.php";
use Pheanstalk\Pheanstalk;
$ph = new Pheanstalk('127.0.0.1',11301);
$res = $ph->watch('List')->reserve();//監聽List管道,並將任務取出來
if ($res) {
$ph->delete($res);
var_dump($res);
}
注:
生產者根據業務不同,將任務放到不同管道(管道用於儲存消費者生產的任務),比如將和註冊有關的任務通通放到註冊管道,和訂單有關的通通放入訂單管道
消費者將處於ready狀態的任務根據優先順序逐個讀取出來
五 使用Beanstalkd 實現類似redis秒殺活動
producer.php程式碼:
<?php
require "vendor/autoload.php";
use Pheanstalk\Pheanstalk;
//連線beanstalkd
$ph = new Pheanstalk('127.0.0.1', 11301);
$tube_name = 'SecKill2';
//使用SecKill2管道
$SEC = $ph->useTube($tube_name);
//模擬100人請求秒殺
for ($i = 0; $i < 100; $i++) {
$uid = rand(10000000, 99999999);
//獲取當前佇列已經擁有的數量,如果人數少於十,則加入這個佇列
$total_jobs = $ph->statsTube($tube_name)['total-jobs'];
$num = 10;
if ($total_jobs < $num) {
$SEC->put($uid);//向管道放任務
echo $uid . "秒殺成功";
} else {
//如果當前佇列人數已經達到10人,則返回秒殺已完成
echo "秒殺已結束<br>";
}
}
print_r($ph->statsTube($tube_name));//檢視SecKill2管道的資訊
注: 執行完producer.php程式碼後,應當會在SecKill2管道中看到10個任務,每一個任務內容是一個uid
consumer.php程式碼:
<?php
require "vendor/autoload.php";
use Pheanstalk\Pheanstalk;
//連線BeanStalkd佇列系統
$ph = new Pheanstalk('127.0.0.1',11301);
$tube_name = 'SecKill2';
//取出SecKill2管道的任務總數
$total_jobs = $ph->statsTube($tube_name)['total-jobs'];
//PDO連線mysql資料庫
$dsn = "mysql:dbname=test;host=127.0.0.1";
$pdo = new PDO($dsn, 'root', '123456');
//迴圈取出管道中任務,並執行插入資料庫操作
for($i = 0; $i < $total_jobs; $i++){
//監聽SecKill4管道,並將任務取出來
$job = $ph->watch($tube_name)->reserve();
//取出任務儲存的值uid
$uid = $job->getData();//打印出的樣子 string(8) "24541944"
if (!$uid) {
sleep(2);
continue;
}
//生成訂單號
$orderNum = build_order_no($uid);
//生成訂單時間
$timeStamp = time();
//構造插入陣列
$user_data = array('uid'=>$uid,'time_stamp'=>$timeStamp,'order_num'=>$orderNum);
//將資料儲存到資料庫
$sql = "insert into seckill (uid,time_stamp,order_num) values (:uid,:time_stamp,:order_num)";
$stmt = $pdo->prepare($sql);
$res = $stmt->execute($user_data);
//如果資料庫操作成功,則刪除任務
if ($res) {
$ph->delete($job);
}
}
//生成唯一訂單號
function build_order_no($uid){
return substr(implode(NULL, array_map('ord', str_split(substr(uniqid(), 7, 13), 1))), 0, 8).$uid;
}
注: 執行完consumer.php檔案之後, 資料表應該已經寫入了10個訂單
注:
用到的資料表
create table `seckill`(
`uid` int unsigned not null default '0',
`time_stamp` int unsigned not null default '0',
`order_num` bigint unsigned not null default '0',
primary key (`uid`),
key (order_num)
)engine = myisam default charset= utf8;
參考視訊地址: https://www.imooc.com/video/16016
這邊部落格可能會用到 https://blog.csdn.net/ahjxhy2010/article/details/53196450
參考文獻:
https://blog.csdn.net/m_nanle_xiaobudiu/article/details/80466702