Node.js連線RabbitMQ,斷線重連,動態繫結routing key
阿新 • • 發佈:2018-12-25
RabbitMQ官方提供的教程https://www.rabbitmq.com/tuto...,是基於回撥的。
下面將給出基於Promise式的寫法。並且實現動態的佇列繫結
初始化配置
const amqp = require('amqplib') // rabbitMQ地址 const {amqpAddrHost} = require('../config/index.js') // 交換機名稱 const ex = 'amq.topic' const amqpAddr = `amqp://${amqpAddrHost}` // 讀取HOSTNAME, 在跑多例項時,例如在k8s中,HOSTNAME可以獲取當前pod的名稱 // 多例項時,寫日誌,或者建立連線時,最好帶上pod名稱,如果出現問題,也比較好定位哪個pod出現的問題。 const hostName = process.env.HOSTNAME // 佇列的屬性設定 // 一般來說,最好設定佇列自動刪除autoDelete,當連結斷開時,佇列也會刪除,這樣不會產生非常多的無用佇列 // durable是用來的持久化的,最好也可以設定成不持久化 const queueAttr = {autoDelete: true, durable: false} // 定義channel的引用,當連結建立時,所有方法都可以通過引用CH來獲取channel方法 let CH = null
向佇列傳送訊息的函式
// 向佇列傳送訊息的函式
function publishMessage (msg) {
if (!CH) {
return ''
}
msg = JSON.stringify(msg)
// 指定交換機ex, routing key, 以及訊息的內容
CH.publish(ex, eventBusTopic, Buffer.from(msg))
}
當連結rabbitMQ斷開時,要主動去重連
function reconnectRabbitMq () { log.info('reconnect_rabbit_mq') connectRabbitMq() }
連線rabbitMQ的主要函式
function connectRabbitMq () { amqp.connect(amqpAddr, { // 設定connection_name的屬性,可以在rabbitMQ的控制檯的UI上,看到連線是來自哪個例項 clientProperties: { connection_name: hostName } }) .then((conn) => { log.info('rabbitmq_connect_successd') // 一定要加上鍊接的報錯事件處理,否則一旦報error錯,如果不處理這個錯誤,程式就會崩潰 // error是個特別的事件,務必要處理的 // 報錯就直接去重連 conn.on('error', (err) => { log.error('connect_error ' + err.message, err) reconnectRabbitMq() }) // 建立channel return conn.createChannel() }) .then((ch) => { CH = ch // 初始化交換機 ch.assertExchange(ex, 'topic', {durable: true}) // 初始化一個佇列,佇列名就用hostName, 比較容易從對列名上知道是哪個例項建立的佇列 return ch.assertQueue(hostName, queueAttr) }) .then((q) => { // 可以在佇列初始化完畢就立即繫結routing key, 也可以暫時不繫結,後續動態的繫結 // CH.bindQueue(q.queue, ex, 'some.topic.aaa') // 消費者,獲取訊息 CH.consume(q.queue, (msg) => { var _msg = msg.content.toString() var MSG = JSON.parse(_msg) log.info(_msg, MSG) }, {noAck: true}) }) .catch((err) => { console.log(err) }) }
動態給佇列繫結或者解綁routing key
function toggleBindQueue (routingKey, bind) {
return new Promise((resolve, reject) => {
if (!CH) {
log.error('channel not established')
reject(new Error('channel not established'))
return ''
}
// 初始化佇列,如果佇列已經存在,就會直接使用
CH.assertQueue(`${hostName}`, queueAttr)
.then((q) => {
// 如果bind是true,就繫結。否則就解綁
if (bind) {
log.info(`bindQueue ${hostName} ${topic}`)
return CH.bindQueue(q.queue, ex, topic)
} else {
return CH.unbindQueue(q.queue, ex, topic)
}
})
.then((res) => {
resolve()
})
.catch((err) => {
reject(err)
log.error(err)
})
})
}
module.exports = {
connectRabbitMq,
toggleBindQueue,
publishMessage
}
使用方法
加入你的服務端用的是Express, 那麼在app.js中可以
...
const {connectRabbitMq} = require('./connect-mq.js')
connectRabbitMq()
...
完整程式碼
// onnect-mq.js
const amqp = require('amqplib')
// rabbitMQ地址
const {amqpAddrHost} = require('../config/index.js')
// 交換機名稱
const ex = 'amq.topic'
const amqpAddr = `amqp://${amqpAddrHost}`
// 讀取HOSTNAME, 在跑多例項時,例如在k8s中,HOSTNAME可以獲取當前pod的名稱
// 多例項時,寫日誌,或者建立連線時,最好帶上pod名稱,如果出現問題,也比較好定位哪個pod出現的問題。
const hostName = process.env.HOSTNAME
// 佇列的屬性設定
// 一般來說,最好設定佇列自動刪除autoDelete,當連結斷開時,佇列也會刪除,這樣不會產生非常多的無用佇列
// durable是用來的持久化的,最好也可以設定成不持久化
const queueAttr = {autoDelete: true, durable: false}
// 定義channel的引用,當連結建立時,所有方法都可以通過引用CH來獲取channel方法
let CH = null
// 向佇列傳送訊息的函式
function publishMessage (msg) {
if (!CH) {
return ''
}
msg = JSON.stringify(msg)
// 指定交換機ex, routing key, 以及訊息的內容
CH.publish(ex, eventBusTopic, Buffer.from(msg))
}
// 當連結rabbitMQ斷開時,要主動去重連
function reconnectRabbitMq () {
log.info('reconnect_rabbit_mq')
connectRabbitMq()
}
// 連結rabbitMQ的主要函式
function connectRabbitMq () {
amqp.connect(amqpAddr, {
// 設定connection_name的屬性,可以在rabbitMQ的控制檯的UI上,看到連結是來自哪個例項
clientProperties: {
connection_name: hostName
}
})
.then((conn) => {
log.info('rabbitmq_connect_successd')
// 一定要加上鍊接的報錯事件處理,否則一旦報error錯,如果不處理這個錯誤,程式就會崩潰
// error是個特別的事件,務必要處理的
// 報錯就直接去重連
conn.on('error', (err) => {
log.error('connect_error ' + err.message, err)
reconnectRabbitMq()
})
// 建立channel
return conn.createChannel()
})
.then((ch) => {
CH = ch
// 初始化交換機
ch.assertExchange(ex, 'topic', {durable: true})
// 初始化一個佇列,佇列名就用hostName, 比較容易從對列名上知道是哪個例項建立的佇列
return ch.assertQueue(hostName, queueAttr)
})
.then((q) => {
// 可以在佇列初始化完畢就立即繫結routing key, 也可以暫時不繫結,後續動態的繫結
// CH.bindQueue(q.queue, ex, 'some.topic.aaa')
// 消費者,獲取訊息
CH.consume(q.queue, (msg) => {
var _msg = msg.content.toString()
var MSG = JSON.parse(_msg)
log.info(_msg, MSG)
}, {noAck: true})
})
.catch((err) => {
console.log(err)
})
}
// 動態給佇列繫結或者解綁routing key
function toggleBindQueue (routingKey, bind) {
return new Promise((resolve, reject) => {
if (!CH) {
log.error('channel not established')
reject(new Error('channel not established'))
return ''
}
// 初始化佇列,如果佇列已經存在,就會直接使用
CH.assertQueue(`${hostName}`, queueAttr)
.then((q) => {
// 如果bind是true,就繫結。否則就解綁
if (bind) {
log.info(`bindQueue ${hostName} ${topic}`)
return CH.bindQueue(q.queue, ex, topic)
} else {
return CH.unbindQueue(q.queue, ex, topic)
}
})
.then((res) => {
resolve()
})
.catch((err) => {
reject(err)
log.error(err)
})
})
}
module.exports = {
connectRabbitMq,
toggleBindQueue,
publishMessage
}