通過 http 方式判斷 RabbitMQ 佇列是否存在
阿新 • • 發佈:2018-12-17
# summary: 通過 http 方式獲取 RabbitMQ 佇列狀態 # import os, sys, time import requests import json from nlp_property import NLP_FRAMEWORK_IP from urllib import parse class RabbitMQTool: def __init__(self, host, vhost, queue): self.host = host self.vhost = vhost self.queue = queue self.user = 'admin' self.passwd = '
[email protected]' def get_queue_status(self): vhost = parse.quote_plus(self.vhost) queue = parse.quote_plus(self.queue) url = 'http://%s:15672/api/queues/%s/%s' % (self.host, vhost, queue) # url = url.encode(encoding='utf-8') # url = 'http://%s:15672/api/queues' % self.host print(url) # r = requests.get(url, auth=(self.user, self.passwd)) r = requests.get(url, auth=(self.user, self.passwd)) print(r) if r.status_code != 200: return False else: return True # dic = json.loads(r.text) # print(dic) # # return dic['messages_ready'], dic['messages_unacknowledged'], dic['messages'] if __name__ == '__main__': mqTool = RabbitMQTool(host=NLP_FRAMEWORK_IP, vhost="/", queue="nlp.classify.cnn.banktest.request.#") status = mqTool.get_queue_status() print(status)