nodejs之簡單的秒殺系統實現(mysql、redis、kafka、zookeeper、postman或docker)
nodejs之簡單的秒殺系統實現(mysql、redis、kafka、zookeeper或docker)
一:環境
1.一臺阿里雲伺服器
2.雲伺服器需要安裝redis、kafka、mysql、zookeeper環境
3.一臺本地電腦需要安裝postman
注:kafka與zookeeper的安裝和配置請參考:https://blog.csdn.net/wushichao0325/article/details/84993081 或者使用docker直接拉去映象安裝。
二:工具說明
1.postman:用於使用其中的(Run)實現併發測試
2.redis:用於存放商品資訊,這裡只建立一個counter欄位,使用redis的命令set設定個初始值。100,即商品總量。模擬資料為:counter=100
3.kafka:結合zookeeper使用,將秒殺成功的結果生成一個producer,待consumer去消費及同步資料到mysql。模擬資料為:建立一個topic名為PRODUCT_NUMBER。
4.mysql:存放最終的秒殺結果。模擬資料為:建立一個seckill資料庫,再建立一個seckill表,表字段為id自增和date存放秒殺時間和uid存放使用者id。
三:程式碼實現
1.程式碼目錄
2.node依賴包
3.seckillService.js
接收秒殺請求,並將秒殺結果及使用者資訊傳送個kafka的producer。
var express=require("express"),
redis=require("redis"),
kafka=require('kafka-node'),
Producer=kafka.Producer,
kafkaClient=new kafka.Client(),
producer=new Producer(kafkaClient);
count= 0;
app=express();
config=require("./config/redis.json");
var bodyParser=require('body-parser');
app.use(express.json());
app.use(bodyParser.json({limit: '1mb'})); //body-parser 解析json格式資料
app.use(bodyParser.urlencoded({ //此項必須在 bodyParser.json 下面,為引數編碼
extended: true
}));
app.all("*", function (req, res, next) {
res.setHeader("Access-Control-Allow-Origin", "*");
res.setHeader('Access-Control-Allow-Headers', 'Content-Type, Content-Length, Authorization, Accept, X-Requested-With , yourHeaderFeild');
res.setHeader("Access-Control-Allow-Methods","PUT,POST,GET,DELETE,OPTIONS");
res.setHeader("X-Powered-By",' 3.2.1');
next();
});
app.post('/seckill',function(req,res){
console.log('count='+count++);
let fn=function(optionalClient){
let client;
if(optionalClient=='undefined'||optionalClient==null){
client=redis.createClient(config);
}else{
client=optionalClient;
}
client.on('error',function(er){
console.error(er.stack);
client.end(true);
});
client.watch("counter");//監聽counter欄位
client.get("counter",function(err,reply){
if(parseInt(reply)>0){
let multi=client.multi();
multi.decr('counter');//更新redis的counter數量減一。
multi.exec(function(err,replies){
if(replies==null){//counter欄位正在操作中,等待counter被其他釋放
console.log("counter被使用");
fn(client);
}else{
var args = {
openid: 'b05NZ2Y1WjbE9fRV9MZTBWWQ==',
seckillTime: '2018-12-12 00:00:01',
}
let payload=[{
topic:'PROUDCT_NUMBER',
messages:[JSON.stringify(args)],
key:"seckill",
partition:0
}];
console.log("payload:",payload);
producer.send(payload,function(err,data){
console.log(data);
});
res.send(replies);
client.end(true);
}
})
}else{
console.log("已經賣光了");
res.send("已經賣光了");
client.end(true);
}
});
}
fn(null);
});
app.listen(8888, "0.0.0.0",function () {
console.log(`Please link connectorServer http://localhost:8888`);
});
4.seckill_kafka_consumer.js
消費者消費生產者的資料,並將資料同步到mysql中。
var kafka=require('kafka-node'),
Consumer=kafka.Consumer,
client=new kafka.Client(),
consumer=new Consumer(
client,
[
{topic:'CAR_NUMBER',partition:0}
],
{
autoCommit:false,
fetchMaxWaitMs: 1000,
fetchMaxBytes: 1024 * 1024,
fromOffset: true
}
);
var mysql=require('mysql');
var connection=mysql.createConnection({
"host":"**.***.***.**",
"user":"root",
"password":"password",
"database":"seckill"
});
connection.connect();
consumer.on('message',function(message){
console.log("得到的生產者的資料為:",message);
let value=JSON.parse(message.value);
connection.query('insert into seckill set ?',{date:new Date(),uid:value.openid},function(error,results,fields){
if(error){
console.error(error);
}
console.log("插入資料庫成功");
});
})
四:啟動專案
1.啟動雲伺服器的redis
2.啟動雲伺服器的zookeeper
3.啟動雲伺服器的kafka
4.將專案部署到雲伺服器中。
5.進入專案根目錄,啟動seckillService.js和seckill_kafka_consumer.js,我是使用pm2去管理這兩個服務。
五:測試
開啟postman如圖:
先將這個連結儲存一個檔案,然後點選Runner按鈕。
如圖:
可以使用post請求:http://seckill.ykplay.com/seckill 測試。
伺服器列印資訊如下:
seckillService.js
seckill_kafka_consumer.js
有這些資訊證明秒殺功能成功實現了。