1. 程式人生 > >通過 http 方式判斷 RabbitMQ 佇列是否存在

通過 http 方式判斷 RabbitMQ 佇列是否存在

# summary: 通過 http 方式獲取 RabbitMQ 佇列狀態

# import os, sys, time
import requests
import json
from nlp_property import NLP_FRAMEWORK_IP
from urllib import parse


class RabbitMQTool:
    def __init__(self, host, vhost, queue):
        self.host = host
        self.vhost = vhost
        self.queue = queue
        self.user = 'admin'
        self.passwd = '
[email protected]
' def get_queue_status(self): vhost = parse.quote_plus(self.vhost) queue = parse.quote_plus(self.queue) url = 'http://%s:15672/api/queues/%s/%s' % (self.host, vhost, queue) # url = url.encode(encoding='utf-8') # url = 'http://%s:15672/api/queues' % self.host print(url) # r = requests.get(url, auth=(self.user, self.passwd)) r = requests.get(url, auth=(self.user, self.passwd)) print(r) if r.status_code != 200: return False else: return True # dic = json.loads(r.text) # print(dic) # # return dic['messages_ready'], dic['messages_unacknowledged'], dic['messages'] if __name__ == '__main__': mqTool = RabbitMQTool(host=NLP_FRAMEWORK_IP, vhost="/", queue="nlp.classify.cnn.banktest.request.#") status = mqTool.get_queue_status() print(status)