1. 程式人生 > >rabbitMq超詳解

rabbitMq超詳解

python

在此向前輩們致敬:http://blog.csdn.net/shatty/article/details/9529463


為什麽要學rabbitMQ

在此之前,我們想來進行一個概念區分

threading queue :只能用於線程之間的消息傳發

進程queue:可以用於進程(父進程與子進程或者同屬於同一父進程之間的子進程交互)之間的消息傳發

那麽不同的語言之間,不同的機器之間怎麽實現相互通信呢,這是一個問題吧

因此,我們的rabbitMq就起了很大的作用

接下來,我們對函數進行一一的相關介紹

connection = pika.BlockingConnection(pika.ConnectionParameters(

host=‘localhost‘))#固定格式,創建一個類似於socket連接,,因為是在本地進行,所以可以直接用localhost

如果與其他電腦連接

pika.BlockingConnection(pika.ConnectionParameters(‘127.0.0.1‘,5672,‘simple‘,credentials))

這樣

我們來看一看關於這個函數的介紹

def get_connection_parameters (self ,host ,port ,vhost ,username ,password ,
                                  heartbeat_interval ):“”“返回一個pika連接的連接參數。
        
        :參數str主機:連接到的RabbitMQ主機
        :param int port:連接的端口
        :param str vhost:虛擬主機
        :參數str用戶名:使用的用戶名
        :參數str密碼:使用的密碼
        :param int heartbeat_interval:AMQP心跳間隔
        :rtype:pika。ConnectionParameters
        “””

第三步:channel = connection.channel() #在連接上創建一個頻道

channel = connection.channel() #進行一個管道的聲明

channel.queue_declare(queue=‘hello‘) #聲明一個隊列,進行消息的傳送#客戶端與服務端都需要這樣

#註意以上都是套路,固定格式

接下來就是消息的發送呢

channel.basic_publish(exchange=‘‘,
routing_key=‘hello‘,#消息隊列的名字

body=‘Hello World!‘)#消息的內容

connection.close() #當生產者發送完消息後,可選擇關閉連接

我們再來看看消費者的寫法

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(
‘localhost‘))

channel = connection.channel()

channel.queue_declare(queue=‘hello‘)#這個是防止不知道誰先運行而造成的錯誤

#上面的幾行代碼都是套路,服務端的時候已介紹,此處不做過解釋

def callback(ch, method, properties, body):

print(" [x] Received %r" % body)

#下面代碼是對消息的處理

channel.basic_consume(callback,#一旦接收到消息,就調用callback函數
queue=‘hello‘,

no_ack=True)

原理代碼如下:生產者:

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(‘localhost‘))
channel = connection.channel()

# 聲明queue
channel.queue_declare(queue=‘hello‘)

# n RabbitMQ a message can never be sent directly to the queue, it always needs to go through an exchange.
channel.basic_publish(exchange=‘‘,
routing_key=‘hello‘,
body=‘Hello World!‘)
print(" [x] Sent ‘Hello World!‘")
connection.close()

消費者:

import pika
connection = pika.BlockingConnection(pika.ConnectionParameters(
‘localhost‘))
channel = connection.channel()
channel.queue_declare(queue=‘hello‘)
def callback(ch, method, properties, body):
print(‘--->‘,ch,method,properties,body)
print(" [x] Received %r" % body)
channel.basic_consume(callback,
queue=‘hello‘,
no_ack=True)
print(‘ [*] Waiting for messages. To exit press CTRL+C‘)

channel.start_consuming()

最後結果為:

C:\Python\Python36\python.exe C:/Users/Administrator/PycharmProjects/untitled3/python/day9/消費者.py

[*] Waiting for messages. To exit press CTRL+C

---> <BlockingChannel impl=<Channel number=1 OPEN conn=<SelectConnection OPEN socket=(‘::1‘, 58661, 0, 0)->(‘::1‘, 5672, 0, 0) params=<ConnectionParameters host=localhost port=5672 virtual_host=/ ssl=False>>>> #ch是我們剛聲明的內存對象的地址<Basic.Deliver([‘consumer_tag=ctag1.3ee0d6275e9f43288f95fe2ba2c83e1a‘, ‘delivery_tag=1‘, ‘exchange=‘, ‘redelivered=False‘, ‘routing_key=hello‘])> #這個包含你要把消息發給哪個queue的信息<BasicProperties> b‘Hello World!‘

[x] Received b‘Hello World!‘

好了,我們可以同時開三個消費者,不斷地接收消息,

那麽生產者沒有收到接收消息的確認怎麽辦呢

消費者:

import pika
connection = pika.BlockingConnection(pika.ConnectionParameters(
‘localhost‘))
channel = connection.channel()
channel.queue_declare(queue=‘hello‘)
def callback(ch, method, properties, body):
print(‘--->‘,ch,method,properties,body)
print(" [x] Received %r" % body)
channel.basic_consume(callback,
queue=‘hello‘,
#no_ack=True
)
print(‘ [*] Waiting for messages. To exit press CTRL+C‘)

channel.start_consuming()

生產者:

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(‘localhost‘))
channel = connection.channel()

# 聲明queue
channel.queue_declare(queue=‘hello‘)

# n RabbitMQ a message can never be sent directly to the queue, it always needs to go through an exchange.
channel.basic_publish(exchange=‘‘,
routing_key=‘hello‘,
body=‘Hello World!‘)
print(" [x] Sent ‘Hello World!‘")

connection.close()

結果是發現,生產者發送給一個消費者的消息傳遞給生產者了

rabbitMq超詳解