
OpenStack -- Nova components and the message queue

The Nova component

Nova is the compute controller of the OpenStack cloud: all of the activities needed to support the lifecycle of instances in an OpenStack cloud are handled by Nova. This makes Nova a platform responsible for managing compute resources, networking, authentication, and the scalability they require. Nova itself, however, provides no virtualization capability; instead it uses the libvirt API to interact with the supported hypervisors. Nova exposes its functionality through a web services API that is compatible with the Amazon Web Services (AWS) EC2 API.

Nova's main components

The Nova cloud architecture includes the following main components:

  • API Server (nova-api)

  • Message Queue (rabbit-mq server)

  • Compute Workers (nova-compute)

  • Network Controller (nova-network)

  • Volume Worker (nova-volume)

  • Scheduler (nova-scheduler)

Message Queue

  • A message is data sent between applications.

  • A message queue is a form of inter-application communication: a message can be sent and the sender returns immediately, with the messaging system guaranteeing reliable delivery. A producer only publishes messages to the queue and does not care who takes them; a consumer only takes messages from the queue and does not care who produced them. A message queue is therefore an asynchronous cooperation mechanism between applications.

The RabbitMQ message queue

  • Key concepts (a short client-side sketch follows this list):

    • Broker: provides the transport service; its role is to maintain the route from producer to consumer and guarantee that data is delivered in the specified way.

    • Exchange: the message exchange; it decides by which rules, and to which queue, a message is routed.

    • Queue: a buffer that holds messages; every message is delivered into one or more queues.

    • Binding: ties an exchange and a queue together according to a routing rule.

    • Routing Key: the routing keyword; the exchange delivers messages based on this key.

    • vhost: virtual host; one broker can host several vhosts, used to separate the permissions of different users.

    • Producer: the program that publishes messages.

    • Consumer: the program that receives messages.

    • Channel: a message channel; several channels can be opened within each client connection.
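The concepts above map directly onto client API calls. Below is a minimal sketch using the Python pika library (chosen here purely for illustration; any AMQP client exposes the same notions, and the exchange, queue, and routing-key names are made up for the example):

import pika

# Broker: the RabbitMQ server we connect to; "/" is the default vhost
connection = pika.BlockingConnection(
    pika.ConnectionParameters(host='localhost', virtual_host='/'))
channel = connection.channel()               # Channel: one of many per connection

channel.exchange_declare(exchange='nova', exchange_type='direct')    # Exchange
channel.queue_declare(queue='compute')                               # Queue
channel.queue_bind(queue='compute', exchange='nova',
                   routing_key='compute')                            # Binding + Routing Key

# Producer: publish a message to the exchange with a routing key
channel.basic_publish(exchange='nova', routing_key='compute',
                      body=b'boot instance')

# Consumer: fetch whatever is waiting in the queue
method, properties, body = channel.basic_get(queue='compute', auto_ack=True)
print(body)
connection.close()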

Exchanges

  • By lifetime, exchanges come in three kinds: durable, temporary, and auto-delete.

    A durable exchange persists on the RabbitMQ server: it is not removed by a system restart or by the termination of an application, and its data is kept on disk;

    a temporary exchange lives in memory and disappears when the system shuts down;

    an auto-delete exchange is removed automatically once it is no longer used (its last queue is unbound), which frees server resources.

  • By the way they route published messages, exchanges also come in three kinds: fanout, direct, and topic (the sketch after this section shows how each type is declared and bound).

    A fanout exchange does not inspect the Routing Key of the messages it receives; it simply forwards every message to all queues bound to it. It forwards most efficiently, but offers the least isolation, since a consumer application may receive messages that were not meant for it;

    a direct exchange requires an exact match between the Routing Key and the Binding Key: a message with Routing Key = Cloud is only forwarded to queues bound with Binding Key = Cloud. It forwards efficiently and safely, but is less flexible and requires more configuration;

    a topic exchange is the most flexible: it pattern-matches the message's Routing Key against the Binding Keys and forwards the message to every queue whose binding matches. Binding Keys support wildcards, where “*” matches exactly one word and “#” matches zero or more words.

    There is also a headers type: a headers exchange matches on the message headers instead of the routing key. Apart from that it behaves exactly like a direct exchange, but it performs much worse and is rarely used nowadays.
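As a sketch of the exchange types and lifetime options described above (again using pika only for illustration; the exchange and queue names are invented):

import pika

channel = pika.BlockingConnection(
    pika.ConnectionParameters('localhost')).channel()

# durable=True keeps the exchange across broker restarts;
# auto_delete=True removes it once nothing is bound to it any more
channel.exchange_declare(exchange='logs', exchange_type='fanout', durable=True)
channel.exchange_declare(exchange='tasks', exchange_type='direct', auto_delete=True)
channel.exchange_declare(exchange='events', exchange_type='topic')

# fanout: every bound queue gets every message; the routing key is ignored
channel.queue_declare(queue='all_logs')
channel.queue_bind(queue='all_logs', exchange='logs', routing_key='')

# direct: exact match only -- Routing Key "Cloud" goes to Binding Key "Cloud"
channel.queue_declare(queue='cloud_tasks')
channel.queue_bind(queue='cloud_tasks', exchange='tasks', routing_key='Cloud')

# topic: pattern match -- '*' matches one word, '#' matches zero or more
channel.queue_declare(queue='compute_events')
channel.queue_bind(queue='compute_events', exchange='events',
                   routing_key='compute.*.error')
channel.queue_bind(queue='compute_events', exchange='events',
                   routing_key='scheduler.#')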

Queues

Like exchanges, message queues can be durable, temporary, or auto-delete.

A durable queue is not removed by a system restart or by the termination of an application; its data is kept on disk;

a temporary queue stops working when the server is shut down;

an auto-delete queue is deleted by the server automatically once no application is using it any more.

A message queue keeps its messages in memory, on disk, or in a combination of the two.

Queues are created by consumer applications and are mainly used to store and forward the messages delivered by exchanges.

Bindings

A binding associates a message queue with an exchange. A binding is a routing rule, based on a routing key, that connects an exchange to a message queue; an exchange can be thought of as a routing table made up of bindings.

The message queue in Nova

Communication between the different components of the Nova module is done through RPC (remote procedure calls), carried over the message queue using AMQP (Advanced Message Queuing Protocol).

AMQP is an open standard application-layer protocol designed for message-oriented middleware. RabbitMQ is an open-source implementation of AMQP; OpenStack can use other implementations (such as Qpid), but most OpenStack releases use RabbitMQ.

Nova issues its requests asynchronously and uses a callback that is triggered when the response arrives. Because communication is asynchronous, no user is left blocked in a waiting state. This matters because many API calls are expected to be time-consuming, for example booting an instance or uploading an image.

How RPC messages are sent

Every component in Nova connects to the message server; a component can be a message sender (e.g. API, Scheduler) or a message receiver (e.g. compute, volume, network). There are two main ways to send a message: rpc.call and rpc.cast (a minimal client-side sketch of both follows the examples below).

  • rpc.call

    rpc.call: request/response based; send a message to the message queue and wait for the returned result.

For example, when a user wants to boot an instance:
1. nova-api, acting as the message producer, wraps the "boot instance" request into an AMQP message and, using rpc.call, puts it on the message queue through a topic exchange;
2. nova-compute, acting as the message consumer, receives the message and performs the corresponding operation through the underlying virtualization software;
3. once the virtual machine has booted successfully, nova-compute acts as a producer and puts an "instance booted" message into the corresponding reply queue through a direct exchange;
4. nova-api, acting as a consumer, receives that message and notifies the user.

  • rpc.cast

    rpc.cast: one-way only; send a message to the message queue without any returned result.

Take nova-conductor calling the nova-compute service's build_and_run_instance (create and boot) as an example:
1. nova-conductor sends an RPC request to the compute queue of the messaging service; the request ends there and it does not wait for a final reply.
2. nova-compute fetches the message from the compute queue through the topic exchange and handles it accordingly.
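A minimal client-side sketch of the two styles, using oslo.messaging directly. The broker URL, topic, server name, method names, and arguments are invented for illustration, and the exact helper names vary slightly between oslo.messaging releases:

import oslo_messaging as messaging
from oslo_config import cfg

# transport_url points at the AMQP broker (RabbitMQ here)
transport = messaging.get_transport(
    cfg.CONF, url='rabbit://guest:guest@localhost:5672/')
target = messaging.Target(topic='compute', server='compute-node-1')
client = messaging.RPCClient(transport, target)
ctxt = {}  # request context (a dict or a RequestContext-like object)

# rpc.cast: fire-and-forget; returns as soon as the message is on the queue
client.cast(ctxt, 'build_and_run_instance', instance='fake-instance')

# rpc.call: blocks until the remote side returns a result (or the call times out)
# state = client.call(ctxt, 'get_instance_state', instance_uuid='fake-uuid')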

The RPC calls behind creating an instance

# Starting from the create() function in nova/compute/api.py:

def create(self, context, instance_type,
           image_href, kernel_id=None, ramdisk_id=None,
           min_count=None, max_count=None,
           display_name=None, display_description=None,
           key_name=None, key_data=None, security_group=None,
           availability_zone=None, forced_host=None, forced_node=None,
           user_data=None, metadata=None, injected_files=None,
           admin_password=None, block_device_mapping=None,
           access_ip_v4=None, access_ip_v6=None, requested_networks=None,
           config_drive=None, auto_disk_config=None, scheduler_hints=None,
           legacy_bdm=True, shutdown_terminate=False,
           check_server_group_quota=False):
    return self._create_instance(...)  # arguments elided in this walkthrough
=>

def _create_instance(self, context, instance_type,
               image_href, kernel_id, ramdisk_id,
               min_count, max_count,
               display_name, display_description,
               key_name, key_data, security_groups,
               availability_zone, user_data, metadata, injected_files,
               admin_password, access_ip_v4, access_ip_v6,
               requested_networks, config_drive,
               block_device_mapping, auto_disk_config, filter_properties,
               reservation_id=None, legacy_bdm=True, shutdown_terminate=False,
               check_server_group_quota=False):

    self.compute_task_api.build_instances(context,
                instances=instances, image=boot_meta,
                filter_properties=filter_properties,
                admin_password=admin_password,
                injected_files=injected_files,
                requested_networks=requested_networks,
                security_groups=security_groups,
                block_device_mapping=block_device_mapping,
                legacy_bdm=False)
    return (instances, reservation_id)

# compute_task_api is conductor's ComputeTaskAPI object, used to queue up compute tasks
self.compute_task_api = conductor.ComputeTaskAPI()

class ComputeTaskAPI(object):
    """ComputeTask API that queues up compute tasks for nova-conductor."""

    def __init__(self):
        # this class holds a conductor rpcapi.ComputeTaskAPI member, which acts
        # as the client side of the RPC call
        self.conductor_compute_rpcapi = rpcapi.ComputeTaskAPI()

class ComputeTaskAPI(object):
    """Client side of the conductor 'compute' namespaced RPC API"""
    def __init__(self):
        super(ComputeTaskAPI, self).__init__()
        # oslo_messaging wraps the low-level RPC transport; the Target holds the
        # parameters that describe the message's destination
        target = messaging.Target(topic=CONF.conductor.topic,
                                  namespace='compute_task',
                                  version='1.0')
        self.client = rpc.get_client(target, serializer=serializer)

def get_client(target, version_cap=None, serializer=None):
    return messaging.RPCClient(TRANSPORT,
                               target,
                               version_cap=version_cap,
                               serializer=serializer)

class RPCClient(object):
    """A class for invoking methods on remote servers.
    The RPCClient class is responsible for sending method invocations to remote
    servers via a messaging transport."""

=>

def build_instances(self, context, instances, image,
            filter_properties, admin_password, injected_files,
            requested_networks, security_groups, block_device_mapping,
            legacy_bdm=True):

        # call ComputeTaskManager.build_instances (run asynchronously via spawn_n)
        utils.spawn_n(self._manager.build_instances, context,
                instances=instances, image=image,
                filter_properties=filter_properties,
                admin_password=admin_password, injected_files=injected_files,
                requested_networks=requested_networks,
                security_groups=security_groups,
                block_device_mapping=block_device_mapping,
                legacy_bdm=legacy_bdm)

# The compute-task manager class; it carries out these operations
class ComputeTaskManager(base.Base):
    """Namespace for compute methods.
    This class presents an rpc API for nova-conductor under the 'compute_task'
    namespace.  The methods here are compute operations that are invoked
    by the API service.  These methods see the operation to completion, which
    may involve coordinating activities on multiple compute nodes.
    """
=>

def build_instances(self, context, instances, image, filter_properties,
            admin_password, injected_files, requested_networks,
            security_groups, block_device_mapping=None, legacy_bdm=True):

    # call ComputeAPI.build_and_run_instance; ComputeAPI is the client side of
    # the compute RPC API
    self.compute_rpcapi.build_and_run_instance(context,
                    instance=instance, host=host['host'], image=image,
                    request_spec=request_spec,
                    filter_properties=local_filter_props,
                    admin_password=admin_password,
                    injected_files=injected_files,
                    requested_networks=requested_networks,
                    security_groups=security_groups,
                    block_device_mapping=bdms, node=host['nodename'],
                    limits=host['limits'])

self.compute_rpcapi = compute_rpcapi.ComputeAPI()

class ComputeAPI(object):
    """Client side of the compute rpc API."""

=>

def build_and_run_instance(self, ctxt, instance, host, image, request_spec,
            filter_properties, admin_password=None, injected_files=None,
            requested_networks=None, security_groups=None,
            block_device_mapping=None, node=None, limits=None):

        version = '4.0'
        cctxt = self.client.prepare(server=host, version=version)
        # cast: send the request without waiting for a response
        cctxt.cast(ctxt, 'build_and_run_instance', instance=instance,
                image=image, request_spec=request_spec,
                filter_properties=filter_properties,
                admin_password=admin_password,
                injected_files=injected_files,
                requested_networks=requested_networks,
                security_groups=security_groups,
                block_device_mapping=block_device_mapping, node=node,
                limits=limits)

=>

def prepare(self, exchange=_marker, topic=_marker, namespace=_marker,
                version=_marker, server=_marker, fanout=_marker,
                timeout=_marker, version_cap=_marker, retry=_marker):
        """Prepare a method invocation context.
        Use this method to override client properties for an individual method
        invocation."""

        return self._prepare(self,
                             exchange, topic, namespace,
                             version, server, fanout,
                             timeout, version_cap, retry)

def _prepare(cls, base,
                 exchange=_marker, topic=_marker, namespace=_marker,
                 version=_marker, server=_marker, fanout=_marker,
                 timeout=_marker, version_cap=_marker, retry=_marker):
        """Prepare a method invocation context. See RPCClient.prepare()."""
      return _CallContext(base.transport, target,
                            base.serializer,
                            timeout, version_cap, retry)

# A context object for a single method invocation
class _CallContext(object):
    _marker = object()
    def __init__(self, transport, target, serializer,
                 timeout=None, version_cap=None, retry=None):
        self.conf = transport.conf
        self.transport = transport
        self.target = target
        self.serializer = serializer
        self.timeout = timeout
        self.retry = retry
        self.version_cap = version_cap

        super(_CallContext, self).__init__()

=>
# the _CallContext's cast() method is then invoked:
def cast(self, ctxt, method, **kwargs):
        """Invoke a method and return immediately. See RPCClient.cast()."""
        msg = self._make_message(ctxt, method, kwargs)
        ctxt = self.serializer.serialize_context(ctxt)

        if self.version_cap:
            self._check_version_cap(msg.get('version'))
        try:
            self.transport._send(self.target, ctxt, msg, retry=self.retry)
        except driver_base.TransportDriverError as ex:
            raise ClientSendError(self.target, ex)

=>
# build the message payload
def _make_message(self, ctxt, method, args):
        msg = dict(method=method)

        msg['args'] = dict()
        for argname, arg in six.iteritems(args):
            msg['args'][argname] = self.serializer.serialize_entity(ctxt, arg)

        if self.target.namespace is not None:
            msg['namespace'] = self.target.namespace
        if self.target.version is not None:
            msg['version'] = self.target.version

        return msg

=>
# A class in the oslo_messaging library; in practice it wraps an AMQP implementation such as RabbitMQ
class Transport(object):

    """A messaging transport.
    This is a mostly opaque handle for an underlying messaging transport
    driver.

    It has a single 'conf' property which is the cfg.ConfigOpts instance used
    to construct the transport object.
    """
    # the send entry point: it delegates to the driver's send(); the driver is
    # implemented on top of a messaging library such as kombu
    def _send(self, target, ctxt, message, wait_for_reply=None, timeout=None,
              retry=None):
        if not target.topic:
            raise exceptions.InvalidTarget('A topic is required to send',
                                           target)
        return self._driver.send(target, ctxt, message,
                                 wait_for_reply=wait_for_reply,
                                 timeout=timeout, retry=retry)

=>
...

# The server side

# start the service
server = service.Service.create(binary='nova-scheduler',
                                topic=CONF.scheduler_topic)
# Service.create() is a classmethod that instantiates the Service class:
def create(cls, host=None, binary=None, topic=None, manager=None,
               report_interval=None, periodic_enable=None,
               periodic_fuzzy_delay=None, periodic_interval_max=None,
               db_allowed=True):
        """Instantiates class and passes back application object."""
        # run the class constructor
        service_obj = cls(host, binary, topic, manager,
                          report_interval=report_interval,
                          periodic_enable=periodic_enable,
                          periodic_fuzzy_delay=periodic_fuzzy_delay,
                          periodic_interval_max=periodic_interval_max,
                          db_allowed=db_allowed)

        return service_obj

# service.start:
    def start(self):
        target = messaging.Target(topic=self.topic, server=self.host)
        # the endpoints hold the objects whose methods can be called remotely
        endpoints = [
            self.manager,
            baserpc.BaseRPCAPI(self.manager.service_name, self.backdoor_port)
        ]
        endpoints.extend(self.manager.additional_endpoints)

        serializer = objects_base.NovaObjectSerializer()

        self.rpcserver = rpc.get_server(target, endpoints, serializer)
        self.rpcserver.start()
# get the RPC server:
def get_server(target, endpoints, serializer=None):
    assert TRANSPORT is not None
    serializer = RequestContextSerializer(serializer)
    return messaging.get_rpc_server(TRANSPORT,
                                    target,
                                    endpoints,
                                    executor='eventlet',
                                    serializer=serializer)

def get_rpc_server(transport, target, endpoints,
                   executor='blocking', serializer=None):
    """Construct an RPC server.

    The executor parameter controls how incoming messages will be received and
    dispatched. By default, the most simple executor is used - the blocking
    executor."""

    dispatcher = rpc_dispatcher.RPCDispatcher(target, endpoints, serializer)
    return msg_server.MessageHandlingServer(transport, dispatcher, executor)

# The dispatcher
class RPCDispatcher(dispatcher.DispatcherBase):
    """A message dispatcher which understands RPC messages.

    A MessageHandlingServer is constructed by passing a callable dispatcher
    which is invoked with context and message dictionaries each time a message
    is received.

    RPCDispatcher is one such dispatcher which understands the format of RPC
    messages. The dispatcher looks at the namespace, version and method values
    in the message and matches those against a list of available endpoints.

    Endpoints may have a target attribute describing the namespace and version
    of the methods exposed by that object. All public methods on an endpoint
    object are remotely invokable by clients."""

# The message-handling server
class MessageHandlingServer(service.ServiceBase, _OrderedTaskRunner):
    """Server for handling messages.

    Connect a transport to a dispatcher that knows how to process the
    message using an executor that knows how the app wants to create
    new tasks."""
    def __init__(self, transport, dispatcher, executor='blocking'):
        """Construct a message handling server.

        The dispatcher parameter is a callable which is invoked with context
        and message dictionaries each time a message is received.

        The executor parameter controls how incoming messages will be received
        and dispatched. By default, the most simple executor is used - the
        blocking executor.

        :param transport: the messaging transport
        :type transport: Transport
        :param dispatcher: a callable which is invoked for each method
        :type dispatcher: callable
        :param executor: name of message executor - for example
                         'eventlet', 'blocking'
        :type executor: str
        """
        self.conf = transport.conf
        self.conf.register_opts(_pool_opts)

        self.transport = transport
        self.dispatcher = dispatcher
        self.executor_type = executor

        self.listener = None

        try:
            mgr = driver.DriverManager('oslo.messaging.executors',
                                       self.executor_type)
        except RuntimeError as ex:
            raise ExecutorLoadFailure(self.executor_type, ex)

        self._executor_cls = mgr.driver

        self._work_executor = None
        self._poll_executor = None

        self._started = False

        super(MessageHandlingServer, self).__init__()

# server.start:
    def start(self, override_pool_size=None):
        """Start handling incoming messages.

        This method causes the server to begin polling the transport for
        incoming messages and passing them to the dispatcher. Message
        processing will continue until the stop() method is called.

        The executor controls how the server integrates with the applications
        I/O handling strategy - it may choose to poll for messages in a new
        process, thread or co-operatively scheduled coroutine or simply by
        registering a callback with an event loop. Similarly, the executor may
        choose to dispatch messages in a new thread, coroutine or simply the
        current thread.
        """
        # Warn that restarting will be deprecated
        if self._started:
            LOG.warning(_LW('Restarting a MessageHandlingServer is inherently '
                            'racy. It is deprecated, and will become a noop '
                            'in a future release of oslo.messaging. If you '
                            'need to restart MessageHandlingServer you should '
                            'instantiate a new object.'))
        self._started = True

        try:
            self.listener = self.dispatcher._listen(self.transport)
        except driver_base.TransportDriverError as ex:
            raise ServerListenError(self.target, ex)

        executor_opts = {}

        if self.executor_type == "threading":
            executor_opts["max_workers"] = (
                override_pool_size or self.conf.executor_thread_pool_size
            )
        elif self.executor_type == "eventlet":
            eventletutils.warn_eventlet_not_patched(
                expected_patched_modules=['thread'],
                what="the 'oslo.messaging eventlet executor'")
            executor_opts["max_workers"] = (
                override_pool_size or self.conf.executor_thread_pool_size
            )

        self._work_executor = self._executor_cls(**executor_opts)
        self._poll_executor = self._executor_cls(**executor_opts)

        return lambda: self._poll_executor.submit(self._runner)

    @ordered(after='start')
    def stop(self):
        """Stop handling incoming messages.

        Once this method returns, no new incoming messages will be handled by
        the server. However, the server may still be in the process of handling
        some messages, and underlying driver resources associated to this
        server are still in use. See 'wait' for more details.
        """
        self.listener.stop()
        self._started = False

    # wait for message processing to complete
    @ordered(after='stop')
    def wait(self):
        """Wait for message processing to complete.

        After calling stop(), there may still be some existing messages
        which have not been completely processed. The wait() method blocks
        until all message processing has completed.

        Once it's finished, the underlying driver resources associated to this
        server are released (like closing useless network connections).
        """
        self._poll_executor.shutdown(wait=True)
        self._work_executor.shutdown(wait=True)

        # Close listener connection after processing all messages
        self.listener.cleanup()
    # the runner loop: poll for incoming messages and dispatch them for processing
    def _runner(self):
        while self._started:
            incoming = self.listener.poll(
                timeout=self.dispatcher.batch_timeout,
                prefetch_size=self.dispatcher.batch_size)

            if incoming:
                self._submit_work(self.dispatcher(incoming))

        # listener is stopped but we need to process all already consumed
        # messages
        while True:
            incoming = self.listener.poll(
                timeout=self.dispatcher.batch_timeout,
                prefetch_size=self.dispatcher.batch_size)

            if incoming:
                self._submit_work(self.dispatcher(incoming))
            else:
                return
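
To tie the server side together, the pattern traced above boils down to roughly the following self-contained sketch. The endpoint class, topic, server name, and method are invented for illustration, the exact helper names and available executors differ between oslo.messaging releases, and it pairs with the client sketch shown earlier in the rpc.cast section:

import time

import oslo_messaging as messaging
from oslo_config import cfg


class ComputeEndpoint(object):
    """Public methods on this object become remotely callable RPC methods."""
    # an optional `target = messaging.Target(namespace=..., version=...)`
    # attribute could scope this endpoint, as Nova does with 'compute_task'

    def build_and_run_instance(self, ctxt, instance=None, **kwargs):
        print('building instance %s' % instance)


transport = messaging.get_transport(
    cfg.CONF, url='rabbit://guest:guest@localhost:5672/')
target = messaging.Target(topic='compute', server='compute-node-1')
server = messaging.get_rpc_server(transport, target, [ComputeEndpoint()],
                                  executor='threading')

server.start()           # begin polling the queue and dispatching to the endpoint
try:
    while True:
        time.sleep(1)    # keep the process alive while messages are handled
except KeyboardInterrupt:
    pass
server.stop()
server.wait()            # let in-flight messages finish, then release resources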