1. 程式人生 > >OpenStack建立例項完整過程原始碼詳細分析(14)----依據AMQP通訊架構實現訊息接收機制解析之一

OpenStack建立例項完整過程原始碼詳細分析(14)----依據AMQP通訊架構實現訊息接收機制解析之一

感謝朋友支援本部落格,歡迎共同探討交流,由於能力和時間有限,錯誤之處在所難免,歡迎指正!
如果轉載,請保留作者資訊。
部落格地址:http://blog.csdn.net/gaoxingnengjisuan
郵箱地址:[email protected]

這篇博文開始解析NOVA中的AMQP架構下訊息的消費者如何從特定的訊息佇列中讀取傳送給自己的訊息,並進行執行操作。

總體來講,Nova中的各個服務在啟動的時候就會初始化會用到的佇列,而且會啟動一個綠色執行緒,不斷的迴圈驗證新的訊息的到來,一旦有新的訊息,將會由合適的consumer進行讀取,並進一步進行訊息的解析和執行操作。

下面,我將會以compute服務的啟動作為例項,重點解析AMQP架構下訊息的消費操作。


1.kombu consumer程式碼示例

kombu中訊息接收消費機制的簡單實現如示例所示:

    #!/usr/bin/python  
      
    from kombu.entity import Exchange, Queue  
    from kombu.messaging import Consumer  
    from kombu.connection import Connection  
      
      
      
    def process_media(body, message):  
        print body  
        message.ack()  
      
    connection = Connection('amqp://guest:
[email protected]
:5672//') channel = connection.channel() media_exchange = Exchange('media', 'direct', channel) video_queue = Queue('video', exchange=media_exchange, routing_key='video', channel=channel) consumer = Consumer(channel, queues=[video_queue], callbacks=[process_media]) consumer.consume() while True: connection.drain_events() consumer.cancel()
思路也很簡單:

(1)建立連線;

(2)獲取channel;

(3)建立exchange;

(4)建立佇列並與exchange繫結;

(5)建立Consumer;

(6)consume()向server註冊,表明可以接受訊息了;

(7)drain_enents阻塞程式,等待訊息到來;

(8)cancel()通知server不要向該consumer傳送任何訊息了;

在nova中,當然不能實現的這麼簡便,而是進行了一系列的封裝操作,但是基本的實現思路是一致的。

2.以nova-compute服務啟動為例,解析AMQP架構下訊息的消費操作

首先來看nova-compute服務啟動指令碼程式碼:

"""
Nova Compute的啟動指令碼;
"""

import eventlet
import os

if os.name == 'nt':
    eventlet.monkey_patch(os=False)
else:
    eventlet.monkey_patch()

import os
import sys
import traceback

from oslo.config import cfg

POSSIBLE_TOPDIR = os.path.normpath(os.path.join(os.path.abspath(sys.argv[0]),
                                   os.pardir,
                                   os.pardir))
if os.path.exists(os.path.join(POSSIBLE_TOPDIR, 'nova', '__init__.py')):
    sys.path.insert(0, POSSIBLE_TOPDIR)


from nova import config
import nova.db.api
from nova import exception
from nova.openstack.common import log as logging
from nova import service
from nova import utils

CONF = cfg.CONF
CONF.import_opt('compute_topic', 'nova.compute.rpcapi')
CONF.import_opt('use_local', 'nova.conductor.api', group='conductor')
LOG = logging.getLogger('nova.compute')


def block_db_access():
    class NoDB(object):
        def __getattr__(self, attr):
            return self

        def __call__(self, *args, **kwargs):
            stacktrace = "".join(traceback.format_stack())
            LOG.error('No db access allowed in nova-compute: %s' % stacktrace)
            raise exception.DBNotAllowed('nova-compute')

    nova.db.api.IMPL = NoDB()


if __name__ == '__main__':
    config.parse_args(sys.argv)
    logging.setup('nova')
    utils.monkey_patch()

    if not CONF.conductor.use_local:
        block_db_access()

    # 初始化Service這個類,並且獲取其例項化物件;
    server = service.Service.create(binary='nova-compute',
                                    topic=CONF.compute_topic,
                                    db_allowed=False)
    service.serve(server)
    service.wait()
簡單的說,nova-compute服務啟動指令碼中將會有如下的程式碼執行順序:
def serve(server, workers=None):
    global _launcher
    if _launcher:
        raise RuntimeError(_('serve() can only be called once'))

    if workers:
        _launcher = ProcessLauncher()
        _launcher.launch_server(server, workers=workers)
    else:
        _launcher = ServiceLauncher()
        _launcher.launch_server(server)
def launch_server(self, server):
    """
    載入並啟動給定的服務;
    """
    if self.backdoor_port is not None:
        server.backdoor_port = self.backdoor_port
    gt = eventlet.spawn(self.run_server, server)
    self._services.append(gt)
def run_server(server):
    """
    啟動並等待一個服務的執行的結束;
    """
    server.start()
    server.wait()
可見在nova-compute服務啟動的過程中將會呼叫/nova/server.py中類Service下的方法def start(self),這也將會是我們主要進行解析的一個方法,來看實現的程式碼:
    def start(self):
        verstr = version.version_string_with_package()
        LOG.audit(_('Starting %(topic)s node (version %(version)s)'),
                  {'topic': self.topic, 'version': verstr})
        # 在程序開始之前執行基本配置的檢測;
        self.basic_config_check()
        self.manager.init_host()
        self.model_disconnected = False
        # 獲取上下文資訊;
        ctxt = context.get_admin_context()
        try:
            # 查詢資料庫獲取topic、host、binary型別指定的所有的服務;
            self.service_ref = self.conductor_api.service_get_by_args(ctxt, self.host, self.binary)
            # 獲取這些服務的ID值;
            self.service_id = self.service_ref['id']
        except exception.NotFound:
            # 如果沒有合適的服務存在,則根據上下文要求建立一個服務環境;
            self.service_ref = self._create_service_ref(ctxt)

        if self.backdoor_port is not None:
            self.manager.backdoor_port = self.backdoor_port

        # 建立一個到用於RPC的訊息匯流排的連線;
        # 建立獲取到RabbitMQ的連線;
        # 建立連線,預設是kombu實現;
        self.conn = rpc.create_connection(new=True)
        LOG.debug(_("Creating Consumer connection for Service %s") %
                  self.topic)
        # 更新現有資源;
        # 讀取系統的總共資源以及可用的資源,更新資源,算出已經使用的資源;
        self.manager.pre_start_hook(rpc_connection=self.conn)

        # 獲取RPC排程器;
        # 初始化RPC排程器;
        rpc_dispatcher = self.manager.create_rpc_dispatcher()

        # 建立不同的訊息消費者;
        # 建立以服務的topic為路由鍵的消費者;
        self.conn.create_consumer(self.topic, rpc_dispatcher, fanout=False)

        # 建立以服務的topic和本機名為路由鍵的消費者(基於topic&host,可用來接收定向訊息);
        node_topic = '%s.%s' % (self.topic, self.host)
        self.conn.create_consumer(node_topic, rpc_dispatcher, fanout=False)

        # fanout直接投遞訊息,不進行匹配,速度最快(fanout型別,可用於接收廣播訊息);
        self.conn.create_consumer(self.topic, rpc_dispatcher, fanout=True)

        # 啟動消費者執行緒;
        # consume_in_thread用evelent.spawn建立一個協程一直執行;
        # 等待訊息,在有消費到來時會建立新的協程執行遠端呼叫的函式;
        self.conn.consume_in_thread()

        # 在啟動服務之後,並能夠通過RPC接收訊息之後,廣播通知每個節點更新capabilities屬性值,由控制器獲取每個節點上的這個屬性值;
        self.manager.post_start_hook()

        LOG.debug(_("Join ServiceGroup membership for this service %s")
                  % self.topic)
 # 新增服務到服務成員組;
        pulse = self.servicegroup_api.join(self.host, self.topic, self)
        if pulse:
            self.timers.append(pulse)

        if self.periodic_enable:
            if self.periodic_fuzzy_delay:
                initial_delay = random.randint(0, self.periodic_fuzzy_delay)
            else:
                initial_delay = None

            periodic = utils.DynamicLoopingCall(self.periodic_tasks)
            periodic.start(initial_delay=initial_delay,
                           periodic_interval_max=self.periodic_interval_max)
            self.timers.append(periodic)
這個方法主要實現了獲取所有服務、建立到RPC的連線,建立不同型別的訊息消費者,啟動消費者執行緒用來執行獲取的訊息,並在啟動服務後新增服務到服務成員組等等操作。下面我們來詳細的解析這個方法。

2.1 語句self.conn = rpc.create_connection(new=True)的解析

這條語句實現了建立獲取到RabbitMQ的連線,具體來看方法create_connection的程式碼實現:

def create_connection(new=True):
    """
    建立一個到用於RPC的訊息匯流排的連線;
    建立獲取到RabbitMQ的連線;
    """
    return _get_impl().create_connection(CONF, new=new)
def create_connection(conf, new=True):
    """
    建立獲取到RabbitMQ的連線;
    """
    
    # get_connection_pool:獲取到RabbitMQ的連線池,並返回這個連線池的物件;
    # Connection:連線到RabbitMQ的實現類;
    return rpc_amqp.create_connection(
        conf, new,
        rpc_amqp.get_connection_pool(conf, Connection))
注:方法中有關的方法在前面的博文中已經進行過解析,所以這裡不再贅述。
def create_connection(conf, new, connection_pool):
    """
    建立連線;
    """
    return ConnectionContext(conf, connection_pool, pooled=not new)
class ConnectionContext(rpc_common.Connection):
    """
    這個類是對連線功能的一個封裝;
    這個類提供方法建立新的連線,也可以實現從連線池中直接獲取一個連線;
    當然,也有方法可以實現對連線的刪除,當連線刪除之後,如果是從連線池獲取的連線會把連線返回連線池;
    """

    def __init__(self, conf, connection_pool, pooled=True, server_params=None):
        """
        建立一個新的連線,或者從連線池中獲取一個連線;
        """
        self.connection = None
        self.conf = conf
        self.connection_pool = connection_pool
        
        # 如果已經獲取連線池物件,直接從連線池中獲取一個連線;
        if pooled:
            self.connection = connection_pool.get()
        else:
            self.connection = connection_pool.connection_cls(conf, server_params=server_params)
        self.pooled = pooled
到這裡,一個到RabbitMQ的連線已經建立好,也就是實現了前面訊息消費者示例所說的第一個步驟,建立連線。

2.2 語句self.conn.create_consumer(self.topic, rpc_dispatcher, fanout=False)的解析

這裡有三條相似的語句,如下:

    # 建立不同的訊息消費者;
    # 建立以服務的topic為路由鍵的消費者;
    self.conn.create_consumer(self.topic, rpc_dispatcher, fanout=False)

    # 建立以服務的topic和本機名為路由鍵的消費者(基於topic&host,可用來接收定向訊息);
    node_topic = '%s.%s' % (self.topic, self.host)
    self.conn.create_consumer(node_topic, rpc_dispatcher, fanout=False)

    # fanout直接投遞訊息,不進行匹配,速度最快(fanout型別,可用於接收廣播訊息);
    self.conn.create_consumer(self.topic, rpc_dispatcher, fanout=True)
這三條語句實現了建立三種不同的訊息消費者,分別是以topic為路由鍵的主題式消費者,以topic.<host>為路由鍵的主題式消費者和以topic為路由鍵的廣播式消費者。這裡主題式和廣播式是以訊息消費者所應用的交換器的型別來進行區分的;而交換器的不同,主要體現在路由匹配方法的不同;這些後面會涉及到。

這裡現以語句self.conn.create_consumer(self.topic, rpc_dispatcher, fanout=False)作為示例進行解析。

首先來看方法create_consumer,實現了訊息消費者的建立,具體來看程式碼的實現:

def create_consumer(self, topic, proxy, fanout=False):
        """
        根據引數建立具體的訊息消費者;
        """
        
        # Connection:連線到RabbitMQ的實現類;
        # get_connection_pool:獲取到RabbitMQ的連線池,並返回這個連線池的物件;
        # connection_cls:是一個連線類的物件;
        # 獲取ProxyCallback類的初始化例項物件;
        proxy_cb = rpc_amqp.ProxyCallback(
            self.conf, proxy,
            rpc_amqp.get_connection_pool(self.conf, Connection))
        self.proxy_callbacks.append(proxy_cb)

        if fanout:
            # 建立一個廣播型別的消費者;
            # 宣告佇列和交換器,通過routing key繫結佇列到交換器;
            self.declare_fanout_consumer(topic, proxy_cb)
        else:
            # declare_topic_consumer:建立一個主題式的資訊消費者;
            # 宣告佇列和交換器,通過routing key繫結佇列到交換器;
            self.declare_topic_consumer(topic, proxy_cb)
2.2.1 首先來看語句proxy_cb = rpc_amqp.ProxyCallback(......):
class ProxyCallback(_ThreadPoolWithWait):
    def __init__(self, conf, proxy, connection_pool):
        # 這個類實現了啟動一個用於處理傳入資訊的綠色執行緒;
        super(ProxyCallback, self).__init__(
            conf=conf,
            connection_pool=connection_pool,
        )
        self.proxy = proxy
        self.msg_id_cache = _MsgIdCache()

    def __call__(self, message_data):
        if hasattr(local.store, 'context'):
            del local.store.context
        # 記錄日誌,但是一些敏感資訊,像建立虛擬機器時的密碼等資訊不會
        # 顯示在日誌中,面是替換為"<SANITIZED>"
        rpc_common._safe_log(LOG.debug, _('received %s'), message_data)
        # 重複訊息檢測;
        # ack返回前,AMQP消費者可能會出現兩次讀取相同資訊的異常,這個方法可以防止這樣的情況出現;
        self.msg_id_cache.check_duplicate_message(message_data)
        # 從message_data中解析出上下文資訊;
        ctxt = unpack_context(self.conf, message_data)
        # 從message_data中獲取要執行的方法method;
        method = message_data.get('method')
        # 從message_data中獲取相關引數args;
        args = message_data.get('args', {})
        # 從message_data中獲取版本的相關資訊;
        version = message_data.get('version', None)
        if not method:
            LOG.warn(_('no method for message: %s') % message_data)
            ctxt.reply(_('No method for message: %s') % message_data,
                       connection_pool=self.connection_pool)
            return
        # 建立一個新的綠色執行緒來執行方法method;
        self.pool.spawn_n(self._process_data, ctxt, version, method, args)
class _ThreadPoolWithWait(object):
    """
    這個類實現了啟動一個用於處理傳入資訊的綠色執行緒;
    """

    def __init__(self, conf, connection_pool):
        self.pool = greenpool.GreenPool(conf.rpc_thread_pool_size)
        self.connection_pool = connection_pool
        self.conf = conf

這裡的_call_方法是非常非常重要的,當然這裡我只是拿出來給大家看一下,這裡還沒有呼叫這個方法,這個方法就是執行了對獲取的訊息進行最終處理的過程。其實nova中呼叫這個方法真的是輾轉挪移,七拐八拐,不留意的話很難看到這個方法的呼叫,後面我會專門進行解析說明這個方法是怎麼被呼叫的。

2.2.2 語句self.declare_topic_consumer(topic, proxy_cb)的解析

這條語句實現了建立一個主題式的訊息消費者,具體來看方法declare_topic_consumer的程式碼:

def declare_topic_consumer(self, topic, callback=None, queue_name=None,
                               exchange_name=None):
    """
    建立一個主題式的資訊消費者;
    宣告佇列和交換器,通過routing key繫結佇列到交換器;
    """
        
    # 根據傳入的類建立資訊消費者,把建立好的消費者物件加入到consumers列表中;
    self.declare_consumer(functools.partial(TopicConsumer,
                                            name=queue_name,
                                            exchange_name=exchange_name,
                                            ),
                          topic, callback)
首先來看方法declare_consumer:
def declare_consumer(self, consumer_cls, topic, callback):
        """
        根據傳入的類建立資訊消費者,把建立好的消費者物件加入到consumers列表中;
        """

        def _connect_error(exc):
            log_info = {'topic': topic, 'err_str': str(exc)}
            LOG.error(_("Failed to declare consumer for topic '%(topic)s': "
                      "%(err_str)s") % log_info)

        def _declare_consumer():
            # 傳遞進來的類consumer_cls應該是TopicConsumer;
            # 宣告佇列和交換器,通過routing key繫結佇列到交換器;
            consumer = consumer_cls(self.conf, self.channel, topic, callback,
                                    self.consumer_num.next())
            self.consumers.append(consumer)
            return consumer

        return self.ensure(_connect_error, _declare_consumer)
在方法_declare_consumer中,傳遞進來的consumer_cls應該是TopicConsumer類,所以語句consumer = consumer_cls(self.conf, self.channel, topic, callback, self.consumer_num.next())實現的就是對類TopicConsumer進行初始化,並獲取其例項化物件,再把建立好的消費者物件加入到consumers列表中。

下面對類TopicConsumer的初始化過程進行比較細緻的解析,來看程式碼的實現:

class TopicConsumer(ConsumerBase):
    """
    主題式訊息消費者類;
    """

    def __init__(self, conf, channel, topic, callback, tag, name=None,
                 exchange_name=None, **kwargs):
        # 預設選項的設定;
        
        # rabbit_durable_queues:這個引數定義了在RabbitMQ中是否使用永續性的佇列;
        # 引數的預設值為False;
        options = {'durable': conf.rabbit_durable_queues,
                   'queue_arguments': _get_queue_arguments(conf),
                   'auto_delete': False,
                   'exclusive': False}
        options.update(kwargs)
        
        # 獲取交換器的名稱;
        exchange_name = exchange_name or rpc_amqp.get_control_exchange(conf)
        # 根據相關的配置引數宣告產生一個交換器;
        exchange = kombu.entity.Exchange(name=exchange_name,
                                         type='topic',
                                         durable=options['durable'],
                                         auto_delete=options['auto_delete'])
        # 重連線到channel;
        # 宣告佇列和交換器,通過routing key繫結佇列到交換器;
        super(TopicConsumer, self).__init__(channel,
                                            callback,
                                            tag,
                                            name=name or topic,
                                            exchange=exchange,
                                            routing_key=topic,
                                            **options)
首先來看交換器的宣告類Exchange的初始化過程:
class Exchange(MaybeChannelBound):
    """
    交換器宣告;
    """
    # 短暫性訊息傳遞方式;
    TRANSIENT_DELIVERY_MODE = TRANSIENT_DELIVERY_MODE
    # 永續性訊息傳遞方式;
    PERSISTENT_DELIVERY_MODE = PERSISTENT_DELIVERY_MODE

    # 交換器名稱;
    name = ""
    # 如果沒有定義交換器型別,預設的交換器型別(直接式交換器),但是這裡面已經定義為主題式交換器;
    type = "direct"
    # 預設交換器是持久型別的;
    durable = True
    auto_delete = False
    # 永續性訊息傳遞方式;
    delivery_mode = PERSISTENT_DELIVERY_MODE

    attrs = (("name", None),
             ("type", None),
             ("arguments", None),
             ("durable", bool),
             ("auto_delete", bool),
             ("delivery_mode", lambda m: DELIVERY_MODES.get(m) or m))

    def __init__(self, name="", type="", channel=None, **kwargs):
        super(Exchange, self).__init__(**kwargs)
        self.name = name or self.name
        self.type = type or self.type
        # maybe_bind:如果沒有繫結,則繫結例項到channel;
        self.maybe_bind(channel)
def maybe_bind(self, channel):
        """
        如果沒有繫結,則繫結例項到channel;
        """
        if not self.is_bound and channel:
            self._channel = channel
            self.when_bound()
            self._is_bound = True
        return self
再來看語句super(TopicConsumer, self).__init__(channel,callback,tag,name=name or topic,exchange=exchange,routing_key=topic,**options),這條語句實現的是對類TopicConsumer的父類ConsumerBase進行進一步的初始化操作。
class ConsumerBase(object):
    """
    Consumer(消費者)基類;
    """

    def __init__(self, channel, callback, tag, **kwargs):
        """
        根據相關引數在一個amqp channel上宣告產生一個佇列;
        """
        self.callback = callback
        self.tag = str(tag)
        self.kwargs = kwargs
        self.queue = None
        # 重連線到channel;
        # 宣告佇列和交換器,通過routing key繫結佇列到交換器;
        self.reconnect(channel)
ConsumerBase類是消費者的基類,在這個初始化的過程中,呼叫了方法reconnect,這個方法中實現了佇列的宣告和初始化,以及交換器與佇列的繫結等重要的過程,這也是我們很關注的,具體來看方法reconnect的程式碼實現:
def reconnect(self, channel):
        """
        重連線到channel;
        宣告佇列和交換器,通過routing key繫結佇列到交換器;
        """
        self.channel = channel
        self.kwargs['channel'] = channel
        # 佇列的宣告類,實現佇列初始化,並且實現佇列與交換器和channel的繫結;
        self.queue = kombu.entity.Queue(**self.kwargs)
        
        # 序列重新的申報;
        # 宣告佇列和交換器,通過routing key繫結佇列到交換器;
        self.queue.declare()
先來看語句self.queue = kombu.entity.Queue(**self.kwargs),看看佇列的定義類Queue,看看如何實現佇列的初始化過程:
class Queue(MaybeChannelBound):
    """
    佇列的宣告類,實現佇列初始化,並且實現佇列與交換器和channel的繫結;
    """
    name = ""
    exchange = None
    routing_key = ""

    durable = True
    exclusive = False
    auto_delete = False
    no_ack = False

    attrs = (("name", None),
             ("exchange", None),
             ("routing_key", None),
             ("queue_arguments", None),
             ("binding_arguments", None),
             ("durable", bool),
             ("exclusive", bool),
             ("auto_delete", bool),
             ("no_ack", None),
             ("alias", None))

    def __init__(self, name="", exchange=None, routing_key="", channel=None,
            **kwargs):
        super(Queue, self).__init__(**kwargs)
        self.name = name or self.name
        self.exchange = exchange or self.exchange
        self.routing_key = routing_key or self.routing_key
        # exclusive implies auto-delete.
        if self.exclusive:
            self.auto_delete = True
        # 如果沒有繫結,則繫結到channel;
        self.maybe_bind(channel)
這裡獲取了佇列的例項化物件,但是這個過程無非就是初始化了一些相關的引數資訊。

再來看語句self.queue.declare(),這條語句實現了宣告佇列和交換器,通過routing key繫結佇列到交換器,具體的實現來看方法declare的程式碼:

def declare(self, nowait=False):
    """
    宣告佇列和交換器,通過routing key繫結佇列到交換器;
    """
    # 佇列名稱;
    name = self.name
    if name:
        if self.exchange:
            self.exchange.declare(nowait)
    self.queue_declare(nowait, passive=False)
        
    # 通過routing key繫結佇列到交換器;
    if name:
        self.queue_bind(nowait)
    return self.name
在這個方法中主要是線了交換器的宣告、佇列的宣告,並且實現了交換器和佇列的繫結等操作。

首先來看語句self.exchange.declare(nowait),這條語句實現了對特定的交換器的宣告操作,具體來看方法declare的程式碼實現:

def declare(self, nowait=False):
        """
        宣告交換器,並在代理上建立交換器;
        """
        
        # 在綠色執行緒中執行方法exchange_declare,實現交換器的宣告和繫結;
        return _SYN(self.channel.exchange_declare, exchange=self.name,
                                                type=self.type,
                                                durable=self.durable,
                                                auto_delete=self.auto_delete,
                                                arguments=self.arguments,
                                                nowait=nowait)
def exchange_declare(self, exchange, type="direct", durable=False,
            auto_delete=False, arguments=None, nowait=False):
        """
        宣告交換器;
        """
        try:
            prev = self.state.exchanges[exchange]
            if not self.typeof(exchange).equivalent(prev, exchange, type,
                                                    durable, auto_delete,
                                                    arguments):
                raise NotEquivalentError(
                        "Cannot redeclare exchange %r in vhost %r with "
                        "different type, durable or autodelete value" % (
                            exchange,
                            self.connection.client.virtual_host or "/"))
        except KeyError:
            self.state.exchanges[exchange] = {
                    "type": type,
                    "durable": durable,
                    "auto_delete": auto_delete,
                    "arguments": arguments or {},
                    "table": [],
            }
再來看語句self.queue_declare(nowait, passive=False),這條語句實現了對特定佇列的宣告操作,具體來看方法queue_declare的程式碼實現:
def queue_declare(self, nowait=False, passive=False):
        """
        在伺服器上宣告佇列;
        """
        ret = _SYN(self.channel.queue_declare, queue=self.name,
                                               passive=passive,
                                               durable=self.durable,
                                               exclusive=self.exclusive,
                                               auto_delete=self.auto_delete,
                                               arguments=self.queue_arguments,
                                               nowait=nowait)
        if not self.name:
            self.name = ret[0]
        return ret
def queue_declare(self, queue, passive=False, auto_delete=False, **kwargs):
        """
        宣告佇列;
        """
        if auto_delete:
            self.auto_delete_queues.setdefault(queue, 0)
        if passive and not self._has_queue(queue, **kwargs):
            raise StdChannelError("404",
                    u"NOT_FOUND - no queue %r in vhost %r" % (
                        queue, self.connection.client.virtual_host or '/'),
                    (50, 10), "Channel.queue_declare")
        else:
            self._new_queue(queue, **kwargs)
        return queue, self._size(queue), 0
再來看語句self.queue_bind(nowait),這條語句實現了通過routing key繫結佇列到交換器,具體來看方法queue_bind的程式碼實現:
def queue_bind(self, nowait=False):
        """
        在伺服器上建立佇列的繫結;
        通過routing key繫結佇列到交換器;
        """
        return _SYN(self.channel.queue_bind, queue=self.name,
                                             exchange=self.exchange.name,
                                             routing_key=self.routing_key,
                                             arguments=self.binding_arguments,
                                             nowait=nowait)
def queue_bind(self, queue, exchange, routing_key, arguments=None,
            **kwargs):
        """
        通過routing key繫結佇列到交換器;
        """
        # 如果佇列已經在繫結的列表中,則直接返回;
        if queue in self.state.bindings:
            return
        table = self.state.exchanges[exchange].setdefault("table", [])
        # 佇列繫結的資訊加入到列表中;
        self.state.bindings[queue] = exchange, routing_key, arguments
        meta = self.typeof(exchange).prepare_bind(queue,
                                                  exchange,
                                                  routing_key,
                                                  arguments)
        table.append(meta)
        if self.supports_fanout:
            self._queue_bind(exchange, *meta)
def prepare_bind(self, queue, exchange, routing_key, arguments):
        return routing_key, self.key_to_pattern(routing_key), queue
至此,建立訊息消費者所需要的交換器和佇列均已建立完成,並實現了交換器到佇列的繫結,因此,也完成了一個主題式訊息消費者的建立。也就是說,前面訊息消費者示例所說的前5個步驟都已完成。

在下一篇博文中,將繼續解析/nova/server.py中類Service下的方法def start(self)中的下一條也是很重要的語句:self.conn.consume_in_thread(),這條語句實現了從佇列中獲取訊息,並最終實現了對訊息的處理和執行操作。

相關推薦

OpenStack建立例項完整過程原始碼詳細分析14----依據AMQP通訊架構實現訊息接收機制解析之一

感謝朋友支援本部落格,歡迎共同探討交流,由於能力和時間有限,錯誤之處在所難免,歡迎指正! 如果轉載,請保留作者資訊。 部落格地址:http://blog.csdn.net/gaoxingnengjisuan 郵箱地址:[email protected] 這篇博文

nova建立虛擬機器過程原始碼簡要分析

nova部署虛擬機器原始碼呼叫過程簡要分析,關於novaclient的程式處理流程暫時還沒有分析。後期如果有時間會進一步分析novaclient的程式執行過程,以及客戶端和服務之間的http請求響應關係。 nova/api/openstack/compute/

Qemu-KVM虛擬機器初始化及建立過程原始碼簡要分析

    我們知道,Qemu-KVM實際上包括Qemu和KVM兩部分,那麼在建立以及初始化虛擬機器時,實際上也是在這兩部分進行的。     KVM實際上就是kvm核心模組,包括kvm.ko、kvm-intel.ko、kvm-amd.ko三部分,後兩部分分別對應Intel體系的

Qemu-KVM虛擬機器初始化及建立過程原始碼簡要分析

    前面我們講了KVM核心層建立及初始化虛擬機器的一些工作過程,現在講一下Qemu層的流程以及與KVM核心層的配合過程。         Qemu層是從vl.c中的main()函式開始的,這裡通過在程式碼中新增一些註釋的方式來進行講解,中間省略了很多不重要或者我也沒有搞

LinkedHashMap 原始碼詳細分析JDK1.8

1. 概述 LinkedHashMap 繼承自 HashMap,在 HashMap 基礎上,通過維護一條雙向連結串列,解決了 HashMap 不能隨時保持遍歷順序和插入順序一致的問題。除此之外,LinkedHashMap 對訪問順序也提供了相關支援。在一些場景下,該特性很有用,比如快取。在實現上

以太坊原始碼深入分析3-- 以太坊RPC通訊例項和原理程式碼分析

上一節提到,以太坊在node start的時候啟動了RPC服務,以太坊通過Rpc服務來實現以太坊相關介面的遠端呼叫。這節我們用個例項來看看以太坊 RPC是如何工作的,以及以太坊RPC的原始碼的實現一,RPC通訊例項1,RPC啟動命令 :geth --rpcgo-ethereu

vivi原始碼最為詳細分析

通過vivi研究bootloader有一段時間了,基本是在與之相關的基礎方面做工作,還沒有真正深入研究vivi。以後的學習重心就要放到研究vivi原始碼上面了。我想,真正細緻地弄清楚vivi實現的細節,對C語言水平的提高,對ARM體系結構的認識,對S3C2410的熟悉,對嵌入

以太坊原始碼深入分析4-- 以太坊RPC通訊例項和原理程式碼分析

上一節我們試著寫了一個RPC的請求例項,通過分析原始碼知道了RPC服務的建立流程,以及Http RPC server建立過程,Http RPC Client的請求流程。這一節,先分析一下Http RPC server如何處理client的請求。然後再分析一下IPC RPC的處

檔案壓縮演算法詳細分析ZIP及解壓例項解釋

原文地址:https://www.cnblogs.com/esingchan/p/3958962.html 最近自己實現了一個ZIP壓縮資料的解壓程式,覺得有必要把ZIP壓縮格式進行一下詳細總結,資料壓縮是一門通訊原理和電腦科學都會涉及到的學科,在通訊原理中,一般稱為信

前程無憂爬蟲原始碼分析

一、網頁分析     1.1 關鍵字頁面(url入口)         首先在前程無憂網站上檢索關鍵詞"大資料":      &n

構建Maven專案的完整過程--普通web專案Eclipse

進行以下步驟的前提是你已經安裝好本地maven庫和eclipse中的maven外掛了(有的eclipse中已經集成了maven外掛) 一、Maven專案的新建 1、滑鼠右鍵---->New----->Other... 2、直接點選下一步 3、選中 m

MFC原始碼實戰分析——訊息對映原理與訊息路由機制初探

如果在看完上一篇文章後覺得有點暈,不要害怕。本節我們就不用這些巨集,而是用其中的內容重新完成開頭那個程式,進而探究MFC訊息對映的本來面目。 MFC訊息對映機制初探 還我本來面目 class CMyWnd : public CFrameWnd

itop exynos4412 lcd驅動 詳細分析

(若轉載,請註明出處,若有錯誤請指正,謝謝) (以下分析皆基於:itop4412精英板裝置和程式碼資源) (核心為:iTop4412_Kernel_3.0提供) (看客需要一定的linux平臺驅動基礎,和lcd操作基礎) (針對lcd基本操作,我準備寫

以太坊原始碼深入分析2-- go-ethereum 客戶端入口和Node分析

一,geth makefile 以及編譯邏輯上篇提到用 make geth 來編譯geth客戶端。我們來看看make file做了什麼: geth: build/env.sh go run build/ci.go install ./cmd/geth @echo

MVC之前的那點事兒系列4:Http Pipeline詳細分析

文章內容 繼續上一章節的內容,通過HttpApplicationFactory的GetApplicationInstance靜態方法獲取例項,然後執行該例項的BeginProcessRequest方法進行執行餘下的Http Pipeline 操作,程式碼如下: // Get application i

MVC之前的那點事兒系列5:Http Pipeline詳細分析

文章內容 接上面的章節,我們這篇要講解的是Pipeline是執行的各種事件,我們知道,在自定義的HttpModule的Init方法裡,我們可以新增自己的事件,比如如下程式碼: public class Test : IHttpModule { public void Init(HttpAp

Jeesite-匯入匯出原始碼跟蹤分析匯出

使用Jeesite開發的時候,我們都少不了Excel匯入匯出的功能。這部分需要我我們掌握基本的POI,反射,當然在我們的框架中還定義了註解,也樣在程式碼上整潔許多,下面我們先看一下: 一. 匯入匯出的公共工具: /** * Copyright &copy; 20

Atlas框架原始碼簡要分析--Atlas中bundle的安裝和初始化

Atlas框架原始碼簡要分析(中)–Atlas中bundle的安裝和初始化 在上一篇中大致的看了下Atlas整體框架的初始化及啟動,下面我們以啟動一個沒有安裝的子Bundle中的Activity為切入點,來跟蹤一個Bundle是如何載入並啟動在這個Bun

《Linux驅動》iTop4412開發板LCD驅動 詳細分析

接下來我們來詳解介紹probe中的函式: 第一個函式: s3cfb_set_lcd_info(fbdev[i]); 1.該函式原始碼如下: /*該函式在s3cfb_wa101s.c 中*/ /* name should be fixed as

Muduo庫原始碼分析8:單例模式實現

單例模式 保證一個類只有一個例項,並提供一個訪問它的全域性訪問點 Muduo庫實現單例模式的思想: 通過pthread_once在多個執行緒中只會初始化一次的特性實現的執行緒安全的單例模式