
nova boot code flow analysis (Part 1): the Claim mechanism

The overall flow of creating a VM with nova boot is roughly as follows:

1. novaclient sends an HTTP request to nova-api (the internal details include keystone authenticating the user and the user obtaining a token and endpoints from keystone; see "keystone WSGI flow" for details).

2. nova-api makes an RPC call to nova-conductor.

3. nova-conductor calls into nova-scheduler over RPC to select a compute node, and nova-scheduler returns the information about the selected compute node to nova-conductor.

4. Finally, nova-conductor makes an RPC call to nova-compute on the selected compute node to create the VM.

This flow is illustrated in the figure below:

[Figure: nova boot VM creation flow across novaclient, nova-api, nova-conductor, nova-scheduler and nova-compute]

Since there are already many articles online covering the VM creation flow, I will skim over some of the details; this article focuses on analyzing the code path from the start of VM creation up to the Claim mechanism.

#/nova/api/openstack/compute/servers.py:Controller
    @wsgi.response(202)
    def create(self, req, body):
        """Creates a new server for a given user."""
        if not self.is_valid_body(body, 'server'):
            raise exc.HTTPUnprocessableEntity()

        context = req.environ['nova.context']
        server_dict = body['server']
        password = self._get_server_admin_password(server_dict)

        if 'name' not in server_dict:
            msg = _("Server name is not defined")
            raise exc.HTTPBadRequest(explanation=msg)

        name = server_dict['name']
        self._validate_server_name(name)
        name = name.strip()

        image_uuid = self._image_from_req_data(body)

        personality = server_dict.get('personality')
        config_drive = None
        if self.ext_mgr.is_loaded('os-config-drive'):
            config_drive = server_dict.get('config_drive')
        ... ... ...

        try:
            _get_inst_type = flavors.get_flavor_by_flavor_id
            inst_type = _get_inst_type(flavor_id, ctxt=context,
                                       read_deleted="no")

            (instances, resv_id) = self.compute_api.create(context,
                        inst_type,
                        image_uuid,
                        display_name=name,
                        display_description=name,
                        key_name=key_name,
                        metadata=server_dict.get('metadata', {}),
                        access_ip_v4=access_ip_v4,
                        access_ip_v6=access_ip_v6,
                        injected_files=injected_files,
                        admin_password=password,
                        min_count=min_count,
                        max_count=max_count,
                        requested_networks=requested_networks,
                        security_group=sg_names,
                        user_data=user_data,
                        availability_zone=availability_zone,
                        config_drive=config_drive,
                        block_device_mapping=block_device_mapping,
                        auto_disk_config=auto_disk_config,
                        scheduler_hints=scheduler_hints,
                        legacy_bdm=legacy_bdm,
                        check_server_group_quota=check_server_group_quota)
        except (exception.QuotaError,
                exception.PortLimitExceeded) as error:
            raise exc.HTTPForbidden(
                explanation=error.format_message(),
                headers={'Retry-After': 0})
        except messaging.RemoteError as err:
            msg = "%(err_type)s: %(err_msg)s" % {'err_type': err.exc_type,
                                                 'err_msg': err.value}
            raise exc.HTTPBadRequest(explanation=msg)
        except UnicodeDecodeError as error:
            msg = "UnicodeError: %s" % error
            raise exc.HTTPBadRequest(explanation=msg)
        except Exception as error:
            # The remaining cases can be handled in a standard fashion.
            self._handle_create_exception(*sys.exc_info())

        # If the caller wanted a reservation_id, return it
        if ret_resv_id:
            return wsgi.ResponseObject({'reservation_id': resv_id})

        req.cache_db_instances(instances)
        server = self._view_builder.create(req, instances[0])

        if CONF.enable_instance_password:
            server['server']['adminPass'] = password

        robj = wsgi.ResponseObject(server)

        return self._add_location(robj)
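
For context, the request body that novaclient POSTs to /v2/{tenant_id}/servers, and that arrives in the create() method above as body, looks roughly like the following; all field values here are made up for illustration.

# Illustrative request body (made-up values), not nova source code:
create_server_body = {
    "server": {
        "name": "test-vm",
        "imageRef": "70a599e0-31e7-49b7-b260-868f441e862b",   # becomes image_uuid
        "flavorRef": "2",                                      # resolved via get_flavor_by_flavor_id
        "min_count": 1,
        "max_count": 1,
        "metadata": {"role": "web"},
    }
}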

When novaclient sends the HTTP request, the WSGI server created when the nova-api service started eventually routes it to the create function in /nova/api/openstack/compute/servers.py:Controller. Before the compute API is invoked, the relevant information extracted from the request's req and body is validated; once validation succeeds, the compute API's create function is called. There are two possible compute API implementations, and the type of self.compute_api is selected according to a configuration option, as shown below:

#/nova/compute/__init__.py
CELL_TYPE_TO_CLS_NAME = {'api': 'nova.compute.cells_api.ComputeCellsAPI',
                         'compute': 'nova.compute.api.API',
                         None: 'nova.compute.api.API',
                        }


def _get_compute_api_class_name():
    """Returns the name of compute API class."""
    cell_type = nova.cells.opts.get_cell_type()
    return CELL_TYPE_TO_CLS_NAME[cell_type]


def API(*args, **kwargs):
    class_name = _get_compute_api_class_name()
    return importutils.import_object(class_name, *args, **kwargs)

#/nova/cells/opts.py
def get_cell_type():
    """Return the cell type, 'api', 'compute', or None (if cells is disabled).
    """
    if not CONF.cells.enable:
        return
    return CONF.cells.cell_type

That is, the compute API type is chosen according to the enable option under the cells section of /etc/nova/nova.conf. Since our environment uses the default configuration, i.e. enable=false, get_cell_type returns None, so the compute API is nova.compute.api.API; in other words, self.compute_api is a nova.compute.api.API object, and its create function is what gets called.
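
A minimal sketch of what actually happens when self.compute_api is built (assuming the default configuration above and an environment where nova is importable): importutils.import_object() simply imports the dotted class path and instantiates it.

# Sketch only: how the compute API object is resolved with the default
# configuration ([cells] enable=false).
from oslo_utils import importutils

class_name = 'nova.compute.api.API'           # _get_compute_api_class_name()
compute_api = importutils.import_object(class_name)
# compute_api is now a nova.compute.api.API instance, so
# self.compute_api.create(...) lands in nova/compute/api.py:API.create()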

#/nova/compute/api.py:API
    @hooks.add_hook("create_instance")
    def create(self, context, instance_type,
               image_href, kernel_id=None, ramdisk_id=None,
               min_count=None, max_count=None,
               display_name=None, display_description=None,
               key_name=None, key_data=None, security_group=None,
               availability_zone=None, user_data=None, metadata=None,
               injected_files=None, admin_password=None,
               block_device_mapping=None, access_ip_v4=None,
               access_ip_v6=None, requested_networks=None, config_drive=None,
               auto_disk_config=None, scheduler_hints=None, legacy_bdm=True,
               shutdown_terminate=False, check_server_group_quota=False):
        """Provision instances, sending instance information to the
        scheduler.  The scheduler will determine where the instance(s)
        go and will handle creating the DB entries.

        Returns a tuple of (instances, reservation_id)
        """

        self._check_create_policies(context, availability_zone,
                requested_networks, block_device_mapping)

        if requested_networks and max_count > 1:
            self._check_multiple_instances_and_specified_ip(requested_networks)
            if utils.is_neutron():
                self._check_multiple_instances_neutron_ports(
                    requested_networks)

        return self._create_instance(
                       context, instance_type,
                       image_href, kernel_id, ramdisk_id,
                       min_count, max_count,
                       display_name, display_description,
                       key_name, key_data, security_group,
                       availability_zone, user_data, metadata,
                       injected_files, admin_password,
                       access_ip_v4, access_ip_v6,
                       requested_networks, config_drive,
                       block_device_mapping, auto_disk_config,
                       scheduler_hints=scheduler_hints,
                       legacy_bdm=legacy_bdm,
                       shutdown_terminate=shutdown_terminate,
                       check_server_group_quota=check_server_group_quota)

#/nova/compute/api.py:API
    def _create_instance(self, context, instance_type,
               image_href, kernel_id, ramdisk_id,
               min_count, max_count,
               display_name, display_description,
               key_name, key_data, security_groups,
               availability_zone, user_data, metadata,
               injected_files, admin_password,
               access_ip_v4, access_ip_v6,
               requested_networks, config_drive,
               block_device_mapping, auto_disk_config,
               reservation_id=None, scheduler_hints=None,
               legacy_bdm=True, shutdown_terminate=False,
               check_server_group_quota=False):
        """Verify all the input parameters regardless of the provisioning
        strategy being performed and schedule the instance(s) for
        creation.
        """

        # Normalize and setup some parameters
        if reservation_id is None:
            reservation_id = utils.generate_uid('r')
        security_groups = security_groups or ['default']
        min_count = min_count or 1
        max_count = max_count or min_count
        block_device_mapping = block_device_mapping or []
        if not instance_type:
            instance_type = flavors.get_default_flavor()

        if image_href:
            image_id, boot_meta = self._get_image(context, image_href)
        else:
            image_id = None
            boot_meta = self._get_bdm_image_metadata(
                context, block_device_mapping, legacy_bdm)

        self._check_auto_disk_config(image=boot_meta,
                                     auto_disk_config=auto_disk_config)

        handle_az = self._handle_availability_zone
        availability_zone, forced_host, forced_node = handle_az(context,
                                                            availability_zone)

        if not self.skip_policy_check and (forced_host or forced_node):
            check_policy(context, 'create:forced_host', {})

        base_options, max_net_count = self._validate_and_build_base_options(
                context,
                instance_type, boot_meta, image_href, image_id, kernel_id,
                ramdisk_id, display_name, display_description,
                key_name, key_data, security_groups, availability_zone,
                forced_host, user_data, metadata, injected_files, access_ip_v4,
                access_ip_v6, requested_networks, config_drive,
                auto_disk_config, reservation_id, max_count)

        # max_net_count is the maximum number of instances requested by the
        # user adjusted for any network quota constraints, including
        # consideration of connections to each requested network
        if max_net_count == 0:
            raise exception.PortLimitExceeded()
        elif max_net_count < max_count:
            LOG.debug("max count reduced from %(max_count)d to "
                      "%(max_net_count)d due to network port quota",
                      {'max_count': max_count,
                       'max_net_count': max_net_count})
            max_count = max_net_count

        block_device_mapping = self._check_and_transform_bdm(context,
            base_options, instance_type, boot_meta, min_count, max_count,
            block_device_mapping, legacy_bdm)

        instance_group = self._get_requested_instance_group(context,
                                   scheduler_hints, check_server_group_quota)

        instances = self._provision_instances(context, instance_type,
                min_count, max_count, base_options, boot_meta, security_groups,
                block_device_mapping, shutdown_terminate,
                instance_group, check_server_group_quota)

        filter_properties = self._build_filter_properties(context,
                scheduler_hints, forced_host,
                forced_node, instance_type,
                base_options.get('pci_requests'))

        for instance in instances:
            self._record_action_start(context, instance,
                                      instance_actions.CREATE)

        self.compute_task_api.build_instances(context,
                instances=instances, image=boot_meta,
                filter_properties=filter_properties,
                admin_password=admin_password,
                injected_files=injected_files,
                requested_networks=requested_networks,
                security_groups=security_groups,
                block_device_mapping=block_device_mapping,
                legacy_bdm=False)

        return (instances, reservation_id)

The code above is again mostly validation and parameter assembly. The next step is to execute the build_instances function in nova-conductor.

#/nova/compute/api.py:API
    @property
    def compute_task_api(self):
        if self._compute_task_api is None:
            # TODO(alaski): Remove calls into here from conductor manager so
            # that this isn't necessary. #1180540
            from nova import conductor
            self._compute_task_api = conductor.ComputeTaskAPI()
        return self._compute_task_api

#/nova/conductor/__init__.py
def ComputeTaskAPI(*args, **kwargs):
    use_local = kwargs.pop('use_local', False)
    if oslo_config.cfg.CONF.conductor.use_local or use_local:
        api = conductor_api.LocalComputeTaskAPI
    else:
        api = conductor_api.ComputeTaskAPI
    return api(*args, **kwargs)

Here the nova-conductor API is likewise selected according to the use_local option in the conductor section of /etc/nova/nova.conf, or the use_local value passed down from the caller. In this environment it is set from /etc/nova/nova.conf, i.e. use_local keeps its default value, use_local=false. Therefore self.compute_task_api is a /nova/conductor/api.py:ComputeTaskAPI object, and ultimately its build_instances function is called.

#/nova/conductor/api.py:ComputeTaskAPI
class ComputeTaskAPI(object):
    """ComputeTask API that queues up compute tasks for nova-conductor."""

    def __init__(self):
        self.conductor_compute_rpcapi = rpcapi.ComputeTaskAPI()

    def build_instances(self, context, instances, image, filter_properties,
            admin_password, injected_files, requested_networks,
            security_groups, block_device_mapping, legacy_bdm=True):
        self.conductor_compute_rpcapi.build_instances(context,
                instances=instances, image=image,
                filter_properties=filter_properties,
                admin_password=admin_password, injected_files=injected_files,
                requested_networks=requested_networks,
                security_groups=security_groups,
                block_device_mapping=block_device_mapping,
                legacy_bdm=legacy_bdm)

#/nova/conductor/rpcapi.py:ComputeTaskAPI
    def build_instances(self, context, instances, image, filter_properties,
            admin_password, injected_files, requested_networks,
            security_groups, block_device_mapping, legacy_bdm=True):
        image_p = jsonutils.to_primitive(image)
        version = '1.10'
        if not self.client.can_send_version(version):
            version = '1.9'
            if 'instance_type' in filter_properties:
                flavor = filter_properties['instance_type']
                flavor_p = objects_base.obj_to_primitive(flavor)
                filter_properties = dict(filter_properties,
                                         instance_type=flavor_p)
        kw = {'instances': instances, 'image': image_p,
               'filter_properties': filter_properties,
               'admin_password': admin_password,
               'injected_files': injected_files,
               'requested_networks': requested_networks,
               'security_groups': security_groups}
        if not self.client.can_send_version(version):
            version = '1.8'
            kw['requested_networks'] = kw['requested_networks'].as_tuples()
        if not self.client.can_send_version('1.7'):
            version = '1.5'
            bdm_p = objects_base.obj_to_primitive(block_device_mapping)
            kw.update({'block_device_mapping': bdm_p,
                       'legacy_bdm': legacy_bdm})

        cctxt = self.client.prepare(version=version)
        cctxt.cast(context, 'build_instances', **kw)

From the code analysis above we can see that an RPC cast is ultimately used to invoke the build_instances function in /nova/conductor/manager.py:ComputeTaskManager. Why does an RPC cast end up in the build_instances function at that particular location? The reason is that when the nova-conductor service starts, it creates an RPC server, and that RPC server is created with a set of endpoints (these have nothing to do with keystone endpoints; only the name is the same). The endpoints are a list of objects; when an RPC client calls a method on the RPC server, the method is looked up across this list of endpoint objects and the matching function is invoked. So which endpoints does the nova-conductor service load when it starts? See below.

#/nova/cmd/conductor.py
def main():
    config.parse_args(sys.argv)
    logging.setup(CONF, "nova")
    utils.monkey_patch()
    objects.register_all()

    gmr.TextGuruMeditation.setup_autorun(version)

    server = service.Service.create(binary='nova-conductor',
                                    topic=CONF.conductor.topic,
                                    manager=CONF.conductor.manager)
    workers = CONF.conductor.workers or processutils.get_worker_count()
    service.serve(server, workers=workers)
    service.wait()

#/nova/service.py:Service
    def start(self):
        verstr = version.version_string_with_package()
        LOG.info(_LI('Starting %(topic)s node (version %(version)s)'),
                  {'topic': self.topic, 'version': verstr})
        self.basic_config_check()
        self.manager.init_host()
        self.model_disconnected = False
        ctxt = context.get_admin_context()
        ... ... ...

        LOG.debug("Creating RPC server for service %s", self.topic)

        target = messaging.Target(topic=self.topic, server=self.host)

        endpoints = [
            self.manager,
            baserpc.BaseRPCAPI(self.manager.service_name, self.backdoor_port)
        ]
        endpoints.extend(self.manager.additional_endpoints)

        serializer = objects_base.NovaObjectSerializer()

        self.rpcserver = rpc.get_server(target, endpoints, serializer)
        self.rpcserver.start()

        self.manager.post_start_hook()

        LOG.debug("Join ServiceGroup membership for this service %s",
                  self.topic)
        # Add service to the ServiceGroup membership group.
        self.servicegroup_api.join(self.host, self.topic, self)

        if self.periodic_enable:
            if self.periodic_fuzzy_delay:
                initial_delay = random.randint(0, self.periodic_fuzzy_delay)
            else:
                initial_delay = None

            self.tg.add_dynamic_timer(self.periodic_tasks,
                                     initial_delay=initial_delay,
                                     periodic_interval_max=
                                        self.periodic_interval_max)

#/nova/conductor/manager.py:ConductorManager
class ConductorManager(manager.Manager):
    """Mission: Conduct things.

    The methods in the base API for nova-conductor are various proxy operations
    performed on behalf of the nova-compute service running on compute nodes.
    Compute nodes are not allowed to directly access the database, so this set
    of methods allows them to get specific work done without locally accessing
    the database.

    The nova-conductor service also exposes an API in the 'compute_task'
    namespace.  See the ComputeTaskManager class for details.
    """

    target = messaging.Target(version='2.1')

    def __init__(self, *args, **kwargs):
        super(ConductorManager, self).__init__(service_name='conductor',
                                               *args, **kwargs)
        self.security_group_api = (
            openstack_driver.get_openstack_security_group_driver())
        self._network_api = None
        self._compute_api = None
        self.compute_task_mgr = ComputeTaskManager()
        self.cells_rpcapi = cells_rpcapi.CellsAPI()
        self.additional_endpoints.append(self.compute_task_mgr)

From the creation of the nova-conductor RPC server in /nova/service.py:Service we can see that its endpoints list contains three objects (in fact this interface is shared by all nova services that create an RPC server; only the manager passed in differs, so a different endpoints list is built for each service). The three endpoint objects for nova-conductor are:

/nova/conductor/manager.py:ConductorManager

/nova/baserpc.py:BaseRPCAPI

/nova/conductor/manager.py:ComputeTaskManager

The last of these is added via endpoints.extend(self.manager.additional_endpoints). For more concrete details on RPC remote calls, see my other articles.
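
The endpoint dispatch described above can be boiled down to a very small oslo.messaging example. The class, topic and host names below are made up purely for illustration (this is not nova code), and a reachable message broker configured through the usual oslo.config options is assumed.

# Illustration of endpoint dispatch: the RPC server exposes the public methods
# of every object in the endpoints list, and a cast of 'build_instances' from
# the client is looked up in that list and invoked.
import oslo_messaging as messaging
from oslo_config import cfg

class FakeComputeTaskManager(object):
    def build_instances(self, ctxt, **kwargs):
        print('build_instances called with %s' % kwargs)

transport = messaging.get_transport(cfg.CONF)
target = messaging.Target(topic='conductor', server='myhost')
server = messaging.get_rpc_server(transport, target,
                                  [FakeComputeTaskManager()],
                                  executor='eventlet')
server.start()    # starts consuming; needs a running broker (e.g. rabbitmq)

# Client side, mirroring what nova/conductor/rpcapi.py does: cast() sends the
# request asynchronously and does not wait for a result.
client = messaging.RPCClient(transport, messaging.Target(topic='conductor'))
client.cast({}, 'build_instances', instances=[])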

Taking the present build_instances call as the example, the dispatch ultimately lands in the build_instances function of /nova/conductor/manager.py:ComputeTaskManager.

#/nova/conductor/manager.py:ComputeTaskManager
    def build_instances(self, context, instances, image, filter_properties,
            admin_password, injected_files, requested_networks,
            security_groups, block_device_mapping=None, legacy_bdm=True):
        # TODO(ndipanov): Remove block_device_mapping and legacy_bdm in version
        #                 2.0 of the RPC API.
        request_spec = scheduler_utils.build_request_spec(context, image,
                                                          instances)
        # TODO(danms): Remove this in version 2.0 of the RPC API
        if (requested_networks and
                not isinstance(requested_networks,
                               objects.NetworkRequestList)):
            requested_networks = objects.NetworkRequestList(
                objects=[objects.NetworkRequest.from_tuple(t)
                         for t in requested_networks])
        # TODO(melwitt): Remove this in version 2.0 of the RPC API
        flavor = filter_properties.get('instance_type')
        if flavor and not isinstance(flavor, objects.Flavor):
            # Code downstream may expect extra_specs to be populated since it
            # is receiving an object, so lookup the flavor to ensure this.
            flavor = objects.Flavor.get_by_id(context, flavor['id'])
            filter_properties = dict(filter_properties, instance_type=flavor)

        try:
            scheduler_utils.setup_instance_group(context, request_spec,
                                                 filter_properties)
            # check retry policy. Rather ugly use of instances[0]...
            # but if we've exceeded max retries... then we really only
            # have a single instance.
            scheduler_utils.populate_retry(filter_properties,
                instances[0].uuid)
            hosts = self.scheduler_client.select_destinations(context,
                    request_spec, filter_properties)
        except Exception as exc:
            updates = {'vm_state': vm_states.ERROR, 'task_state': None}
            for instance in instances:
                self._set_vm_state_and_notify(
                    context, instance.uuid, 'build_instances', updates,
                    exc, request_spec)
            return

        for (instance, host) in itertools.izip(instances, hosts):
            try:
                instance.refresh()
            except (exception.InstanceNotFound,
                    exception.InstanceInfoCacheNotFound):
                LOG.debug('Instance deleted during build', instance=instance)
                continue
            local_filter_props = copy.deepcopy(filter_properties)
            scheduler_utils.populate_filter_properties(local_filter_props,
                host)
            # The block_device_mapping passed from the api doesn't contain
            # instance specific information
            bdms = objects.BlockDeviceMappingList.get_by_instance_uuid(
                    context, instance.uuid)

            self.compute_rpcapi.build_and_run_instance(context,
                    instance=instance, host=host['host'], image=image,
                    request_spec=request_spec,
                    filter_properties=local_filter_props,
                    admin_password=admin_password,
                    injected_files=injected_files,
                    requested_networks=requested_networks,
                    security_groups=security_groups,
                    block_device_mapping=bdms, node=host['nodename'],
                    limits=host['limits'])

Here,

    hosts = self.scheduler_client.select_destinations(context,
            request_spec, filter_properties)

is where the nova-scheduler service selects a host; host selection will be covered in a separate article.

After a suitable host has been selected, the following code is executed.

#/nova/compute/rpcapi.py:ComputeAPI
    def build_and_run_instance(self, ctxt, instance, host, image, request_spec,
            filter_properties, admin_password=None, injected_files=None,
            requested_networks=None, security_groups=None,
            block_device_mapping=None, node=None, limits=None):

        version = '4.0'
        if not self.client.can_send_version(version):
            version = '3.40'
        if not self.client.can_send_version(version):
            version = '3.36'
            if 'numa_topology' in limits and limits['numa_topology']:
                topology_limits = limits['numa_topology']
                if node is not None:
                    cnode = objects.ComputeNode.get_by_host_and_nodename(
                        ctxt, host, node)
                else:
                    cnode = (
                        objects.ComputeNode.
                        get_first_node_by_host_for_old_compat(
                            ctxt, host))
                host_topology = objects.NUMATopology.obj_from_db_obj(
                    cnode.numa_topology)
                limits['numa_topology'] = jsonutils.dumps(
                    topology_limits.to_dict_legacy(host_topology))
        if not self.client.can_send_version(version):
            version = '3.33'
            if 'instance_type' in filter_properties:
                flavor = filter_properties['instance_type']
                flavor_p = objects_base.obj_to_primitive(flavor)
                filter_properties = dict(filter_properties,
                                         instance_type=flavor_p)
        if not self.client.can_send_version(version):
            version = '3.23'
            if requested_networks is not None:
                if utils.is_neutron():
                    requested_networks = [(network_id, address, port_id)
                        for (network_id, address, port_id, _) in
                            requested_networks.as_tuples()]
                else:
                    requested_networks = [(network_id, address)
                        for (network_id, address) in
                            requested_networks.as_tuples()]

        cctxt = self.client.prepare(server=host, version=version)
        cctxt.cast(ctxt, 'build_and_run_instance', instance=instance,
                image=image, request_spec=request_spec,
                filter_properties=filter_properties,
                admin_password=admin_password,
                injected_files=injected_files,
                requested_networks=requested_networks,
                security_groups=security_groups,
                block_device_mapping=block_device_mapping, node=node,
                limits=limits)

Finally, an RPC cast executes the relevant code of the nova-compute service on the selected host, i.e. that host's build_and_run_instance function, which ultimately means the build_and_run_instance function in /nova/compute/manager.py. This is because two of the three endpoint objects registered when nova-compute creates its RPC server come from /nova/compute/manager.py, namely /nova/compute/manager.py:ComputeManager and /nova/compute/manager.py:_ComputeV4Proxy. Both objects expose a build_and_run_instance function, but _ComputeV4Proxy ultimately just calls ComputeManager's build_and_run_instance, as can be seen below.

#/nova/compute/manager.py:ComputeManager
class ComputeManager(manager.Manager):
    """Manages the running instances from creation to destruction."""

    target = messaging.Target(version='3.40')

    # How long to wait in seconds before re-issuing a shutdown
    # signal to a instance during power off.  The overall
    # time to wait is set by CONF.shutdown_timeout.
    SHUTDOWN_RETRY_INTERVAL = 10

    def __init__(self, compute_driver=None, *args, **kwargs):
        """Load configuration options and connect to the hypervisor."""
        self.virtapi = ComputeVirtAPI(self)
        self.network_api = network.API()
        ... ... ...

        super(ComputeManager, self).__init__(service_name="compute",
                                             *args, **kwargs)
        self.additional_endpoints.append(_ComputeV4Proxy(self))

        # NOTE(russellb) Load the driver last.  It may call back into the
        # compute manager via the virtapi, so we want it to be fully
        # initialized before that happens.
        self.driver = driver.load_compute_driver(self.virtapi, compute_driver)
        self.use_legacy_block_device_info = \
                            self.driver.need_legacy_block_device_info

#/nova/compute/manager.py:_ComputeV4Proxy
class _ComputeV4Proxy(object):

    target = messaging.Target(version='4.0')

    def __init__(self, manager):
        self.manager = manager
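
The proxy's build_and_run_instance method is not quoted above, but it essentially just forwards to the wrapped ComputeManager. Roughly (argument list trimmed, not an exact quote):

    # Sketch: the 4.0 proxy endpoint adds no logic of its own and simply
    # delegates to the ComputeManager instance it wraps.
    def build_and_run_instance(self, ctxt, instance, image, request_spec,
                               filter_properties, **kwargs):
        self.manager.build_and_run_instance(ctxt, instance, image,
                                            request_spec, filter_properties,
                                            **kwargs)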

So we can now follow the code flow of the build_and_run_instance function in /nova/compute/manager.py:ComputeManager.

#/nova/compute/manager.py:ComputeManager
    # NOTE(mikal): No object_compat wrapper on this method because its
    # callers all pass objects already
    @wrap_exception()
    @reverts_task_state
    @wrap_instance_fault
    def build_and_run_instance(self, context, instance, image, request_spec,
                     filter_properties, admin_password=None,
                     injected_files=None, requested_networks=None,
                     security_groups=None, block_device_mapping=None,
                     node=None, limits=None):

        # NOTE(danms): Remove this in v4.0 of the RPC API
        if (requested_networks and
                not isinstance(requested_networks,
                               objects.NetworkRequestList)):
            requested_networks = objects.NetworkRequestList(
                objects=[objects.NetworkRequest.from_tuple(t)
                         for t in requested_networks])
        # NOTE(melwitt): Remove this in v4.0 of the RPC API
        flavor = filter_properties.get('instance_type')
        if flavor and not isinstance(flavor, objects.Flavor):
            # Code downstream may expect extra_specs to be populated since it
            # is receiving an object, so lookup the flavor to ensure this.
            flavor = objects.Flavor.get_by_id(context, flavor['id'])
            filter_properties = dict(filter_properties, instance_type=flavor)

        # NOTE(sahid): Remove this in v4.0 of the RPC API
        if (limits and 'numa_topology' in limits and
                isinstance(limits['numa_topology'], six.string_types)):
            db_obj = jsonutils.loads(limits['numa_topology'])
            limits['numa_topology'] = (
                objects.NUMATopologyLimits.obj_from_db_obj(db_obj))

        @utils.synchronized(instance.uuid)
        def _locked_do_build_and_run_instance(*args, **kwargs):
            # NOTE(danms): We grab the semaphore with the instance uuid
            # locked because we could wait in line to build this instance
            # for a while and we want to make sure that nothing else tries
            # to do anything with this instance while we wait.
            with self._build_semaphore:
                self._do_build_and_run_instance(*args, **kwargs)

        # NOTE(danms): We spawn here to return the RPC worker thread back to
        # the pool. Since what follows could take a really long time, we don't
        # want to tie up RPC workers.
        utils.spawn_n(_locked_do_build_and_run_instance,
                      context, instance, image, request_spec,
                      filter_properties, admin_password, injected_files,
                      requested_networks, security_groups,
                      block_device_mapping, node, limits)

#/nova/utils.py
def spawn_n(func, *args, **kwargs):
    """Passthrough method for eventlet.spawn_n.

    This utility exists so that it can be stubbed for testing without
    interfering with the service spawns.

    It will also grab the context from the threadlocal store and add it to
    the store on the new thread.  This allows for continuity in logging the
    context when using this method to spawn a new thread.
    """
    _context = common_context.get_current()

    @functools.wraps(func)
    def context_wrapper(*args, **kwargs):
        # NOTE: If update_store is not called after spawn_n it won't be
        # available for the logger to pull from threadlocal storage.
        if _context is not None:
            _context.update_store()
        func(*args, **kwargs)

    eventlet.spawn_n(context_wrapper, *args, **kwargs)

Here a new greenthread is spawned (via eventlet) to execute the _locked_do_build_and_run_instance function.
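
A toy illustration of this spawn_n() pattern (purely illustrative, not nova code): the caller returns immediately while the long-running work continues in a separate greenthread.

# Toy example of the spawn_n() pattern: the caller is not blocked while the
# "build" runs in a greenthread.
import eventlet
eventlet.monkey_patch()

def _locked_do_work(name):
    eventlet.sleep(1)                 # stands in for a long-running build
    print('%s finished' % name)

eventlet.spawn_n(_locked_do_work, 'instance-1')
print('RPC worker is free again')     # printed before the build finishes
eventlet.sleep(2)                      # keep the process alive long enough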

#/nova/compute/manager.py:ComputeManager
    @hooks.add_hook('build_instance')
    @wrap_exception()
    @reverts_task_state
    @wrap_instance_event
    @wrap_instance_fault
    def _do_build_and_run_instance(self, context, instance, image,
            request_spec, filter_properties, admin_password, injected_files,
            requested_networks, security_groups, block_device_mapping,
            node=None, limits=None):

        try:
            LOG.info(_LI('Starting instance...'), context=context,
                  instance=instance)
            instance.vm_state = vm_states.BUILDING
            instance.task_state = None
            instance.save(expected_task_state=
                    (task_states.SCHEDULING, None))
        except exception.InstanceNotFound:
            msg = 'Instance disappeared before build.'
            LOG.debug(msg, instance=instance)
            return build_results.FAILED
        except exception.UnexpectedTaskStateError as e:
            LOG.debug(e.format_message(), instance=instance)
            return build_results.FAILED

        # b64 decode the files to inject:
        decoded_files = self._decode_files(injected_files)

        if limits is None:
            limits = {}

        if node is None:
            node = self.driver.get_available_nodes(refresh=True)[0]
            LOG.debug('No node specified, defaulting to %s', node,
                      instance=instance)

        try:
            self._build_and_run_instance(context, instance, image,
                    decoded_files, admin_password, requested_networks,
                    security_groups, block_device_mapping, node, limits,
                    filter_properties)
            return build_results.ACTIVE
        except exception.RescheduledException as e:
            retry = filter_properties.get('retry', None)
            if not retry:
                # no retry information, do not reschedule.
                LOG.debug("Retry info not present, will not reschedule",
                    instance=instance)
                self._cleanup_allocated_networks(context, instance,
                    requested_networks)
                compute_utils.add_instance_fault_from_exc(context,
                        instance, e, sys.exc_info())
                self._set_instance_error_state(context, instance)
                return build_results.FAILED
            LOG.debug(e.format_message(), instance=instance)
            retry['exc'] = traceback.format_exception(*sys.exc_info())
            # NOTE(comstud): Deallocate networks if the driver wants
            # us to do so.
            if self.driver.deallocate_networks_on_reschedule(instance):
                self._cleanup_allocated_networks(context, instance,
                        requested_networks)
            else:
                # NOTE(alex_xu): Network already allocated and we don't
                # want to deallocate them before rescheduling. But we need
                # cleanup those network resource setup on this host before
                # rescheduling.
                self.network_api.cleanup_instance_network_on_host(
                    context, instance, self.host)

            instance.task_state = task_states.SCHEDULING
            instance.save()

            self.compute_task_api.build_instances(context, [instance],
                    image, filter_properties, admin_password,
                    injected_files, requested_networks, security_groups,
                    block_device_mapping)
            return build_results.RESCHEDULED
        except (exception.InstanceNotFound,
                exception.UnexpectedDeletingTaskStateError):
            msg = 'Instance disappeared during build.'
            LOG.debug(msg, instance=instance)
            self._cleanup_allocated_networks(context, instance,
                    requested_networks)
            return build_results.FAILED
        except exception.BuildAbortException as e:
            LOG.exception(e.format_message(), instance=instance)
            self._cleanup_allocated_networks(context, instance,
                    requested_networks)
            self._cleanup_volumes(context, instance.uuid,
                    block_device_mapping, raise_exc=False)
            compute_utils.add_instance_fault_from_exc(context, instance,
                    e, sys.exc_info())
            self._set_instance_error_state(context, instance)
            return build_results.FAILED
        except Exception as e:
            # Should not reach here.
            msg = _LE('Unexpected build failure, not rescheduling build.')
            LOG.exception(msg, instance=instance)
            self._cleanup_allocated_networks(context, instance,
                    requested_networks)
            self._cleanup_volumes(context, instance.uuid,
                    block_device_mapping, raise_exc=False)
            compute_utils.add_instance_fault_from_exc(context, instance,
                    e, sys.exc_info())
            self._set_instance_error_state(context, instance)
            return build_results.FAILED

Here _do_build_and_run_instance calls _build_and_run_instance to carry out the rest of the VM creation flow on this host. If a RescheduledException is raised while building on this host, the retry value in filter_properties determines whether the VM is rescheduled onto a new host. In what follows we focus on how _build_and_run_instance continues the VM creation flow on the local host.
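
The reschedule decision hinges on the 'retry' dict that scheduler_utils.populate_retry() put into filter_properties back in nova-conductor. A simplified sketch of that bookkeeping (the max_attempts value and the exception type here are illustrative, not the exact nova code):

# Simplified populate_retry()-style bookkeeping: if rescheduling is enabled,
# record the attempt count; exceeding the limit aborts the build.
def populate_retry(filter_properties, instance_uuid, max_attempts=3):
    if max_attempts <= 1:
        return                                    # rescheduling disabled
    retry = filter_properties.setdefault(
        'retry', {'num_attempts': 0, 'hosts': []})
    retry['num_attempts'] += 1
    if retry['num_attempts'] > max_attempts:
        raise RuntimeError('Exceeded max scheduling attempts %d for '
                           'instance %s' % (max_attempts, instance_uuid))

When _do_build_and_run_instance later catches a RescheduledException, it is the presence of this 'retry' dict that allows it to call compute_task_api.build_instances() again; without it the instance is simply put into the ERROR state.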

#/nova/compute/manager.py:ComputeManager
    def _build_and_run_instance(self, context, instance, image, injected_files,
            admin_password, requested_networks, security_groups,
            block_device_mapping, node, limits, filter_properties):

        image_name = image.get('name')
        self._notify_about_instance_usage(context, instance, 'create.start',
                extra_usage_info={'image_name': image_name})
        try:
            rt = self._get_resource_tracker(node)
            with rt.instance_claim(context, instance, limits) as inst_claim:
                # NOTE(russellb) It's important that this validation be done
                # *after* the resource tracker instance claim, as that is where
                # the host is set on the instance.
                self._validate_instance_group_policy(context, instance,
                        filter_properties)
                with self._build_resources(context, instance,
                        requested_networks, security_groups, image,
                        block_device_mapping) as resources:
                    instance.vm_state = vm_states.BUILDING
                    instance.task_state = task_states.SPAWNING
                    instance.numa_topology = inst_claim.claimed_numa_topology
                    # NOTE(JoshNang) This also saves the changes to the
                    # instance from _allocate_network_async, as they aren't
                    # saved in that function to prevent races.
                    instance.save(expected_task_state=
                            task_states.BLOCK_DEVICE_MAPPING)
                    block_device_info = resources['block_device_info']
                    network_info = resources['network_info']
                    self.driver.spawn(context, instance, image,
                                      injected_files, admin_password,
                                      network_info=network_info,
                                      block_device_info=block_device_info)
        except (exception.InstanceNotFound,
                exception.UnexpectedDeletingTaskStateError) as e:
            with excutils.save_and_reraise_exception():
                self._notify_about_instance_usage(context, instance,
                    'create.end', fault=e)
        except exception.ComputeResourcesUnavailable as e:
            LOG.debug(e.format_message(), instance=instance)
            self._notify_about_instance_usage(context, instance,
                    'create.error', fault=e)
            raise exception.RescheduledException(
                    instance_uuid=instance.uuid, reason=e.format_message())
        except exception.BuildAbortException as e:
            with excutils.save_and_reraise_exception():
                LOG.debug(e.format_message(), instance=instance)
                self._notify_about_instance_usage(context, instance,
                    'create.error', fault=e)
        except (exception.FixedIpLimitExceeded,
                exception.NoMoreNetworks, exception.NoMoreFixedIps) as e:
            LOG.warning(_LW('No more network or fixed IP to be allocated'),
                        instance=instance)
            self._notify_about_instance_usage(context, instance,
                    'create.error', fault=e)
            msg = _('Failed to allocate the network(s) with error %s, '
                    'not rescheduling.') % e.format_message()
            raise exception.BuildAbortException(instance_uuid=instance.uuid,
                    reason=msg)
        except (exception.VirtualInterfaceCreateException,
                exception.VirtualInterfaceMacAddressException) as e:
            LOG.exception(_LE('Failed to allocate network(s)'),
                          instance=instance)
            self._notify_about_instance_usage(context, instance,
                    'create.error', fault=e)
            msg = _('Failed to allocate the network(s), not rescheduling.')
            raise exception.BuildAbortException(instance_uuid=instance.uuid,
                    reason=msg)
        except (exception.FlavorDiskTooSmall,
                exception.FlavorMemoryTooSmall,
                exception.ImageNotActive,
                exception.ImageUnacceptable) as e:
            self._notify_about_instance_usage(context, instance,
                    'create.error', fault=e)
            raise exception.BuildAbortException(instance_uuid=instance.uuid,
                    reason=e.format_message())
        except Exception as e:
            self._notify_about_instance_usage(context, instance,
                    'create.error', fault=e)
            raise exception.RescheduledException(
                    instance_uuid=instance.uuid, reason=six.text_type(e))

        # NOTE(alaski): This is only useful during reschedules, remove it now.
        instance.system_metadata.pop('network_allocated', None)

        self._update_instance_after_spawn(context, instance)

        try:
            instance.save(expected_task_state=task_states.SPAWNING)
        except (exception.InstanceNotFound,
                exception.UnexpectedDeletingTaskStateError) as e:
            with excutils.save_and_reraise_exception():
                self._notify_about_instance_usage(context, instance,
                    'create.end', fault=e)

        self._update_scheduler_instance_info(context, instance)
        self._notify_about_instance_usage(context, instance, 'create.end',
                extra_usage_info={'message': _('Success')},
                network_info=network_info)

Here, before anything else, the Claim mechanism is used to verify the host's available resources and check whether they can satisfy the VM being created. The problem Claim solves is this: when a host is selected by multiple nova-scheduler processes at the same time and receives several VM creation requests concurrently, it does not necessarily have enough resources to satisfy all of them. So before a VM is built, the Claim mechanism verifies the host's resources; if they are sufficient, the database is updated and the resources requested by the VM are deducted from the host's available resources. If the build later fails, or the VM is deleted, the previously deducted amount is added back through the Claim. Below is a brief walk through the Claim-related code.
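
But first, a minimal self-contained sketch of the check/deduct/revert idea (this is not nova code; only memory is modeled):

# Toy claim: check the request against free memory, deduct it, and give it
# back automatically if the build inside the with-block fails.
class SimpleClaim(object):
    def __init__(self, host, requested_mb):
        free = host['memory_mb'] - host['memory_mb_used']
        if requested_mb > free:
            raise Exception('Free %d MB < requested %d MB'
                            % (free, requested_mb))
        self.host = host
        self.requested_mb = requested_mb
        host['memory_mb_used'] += requested_mb     # claim the resources

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        if exc_type is not None:                   # build failed: revert
            self.host['memory_mb_used'] -= self.requested_mb

host = {'memory_mb': 8192, 'memory_mb_used': 2048}
with SimpleClaim(host, requested_mb=1024):
    pass    # this is where driver.spawn(...) would run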

#/nova/compute/resource_tracker.py:ResourceTracker
    @utils.synchronized(COMPUTE_RESOURCE_SEMAPHORE)
    def instance_claim(self, context, instance_ref, limits=None):
        """Indicate that some resources are needed for an upcoming compute
        instance build operation.

        This should be called before the compute node is about to perform
        an instance build operation that will consume additional resources.

        :param context: security context
        :param instance_ref: instance to reserve resources for
        :param limits: Dict of oversubscription limits for memory, disk,
                       and CPUs.
        :returns: A Claim ticket representing the reserved resources.  It can
                  be used to revert the resource usage if an error occurs
                  during the instance build.
        """
        if self.disabled:
            # compute_driver doesn't support resource tracking, just
            # set the 'host' and node fields and continue the build:
            self._set_instance_host_and_node(context, instance_ref)
            return claims.NopClaim()

        # sanity checks:
        if instance_ref['host']:
            LOG.warning(_LW("Host field should not be set on the instance "
                            "until resources have been claimed."),
                        instance=instance_ref)

        if instance_ref['node']:
            LOG.warning(_LW("Node field should not be set on the instance "
                            "until resources have been claimed."),
                        instance=instance_ref)

        # get memory overhead required to build this instance:
        overhead = self.driver.estimate_instance_overhead(instance_ref)
        LOG.debug("Memory overhead for %(flavor)d MB instance; %(overhead)d "
                  "MB", {'flavor': instance_ref['memory_mb'],
                          'overhead': overhead['memory_mb']})

        claim = claims.Claim(context, instance_ref, self, self.compute_node,
                             overhead=overhead, limits=limits)

        self._set_instance_host_and_node(context, instance_ref)
        instance_ref['numa_topology'] = claim.claimed_numa_topology

        # Mark resources in-use and update stats
        self._update_usage_from_instance(context, self.compute_node,
                                         instance_ref)

        elevated = context.elevated()
        # persist changes to the compute node:
        self._update(elevated, self.compute_node)

        return claim

Let's focus on what happens when the Claim object is created inside instance_claim, as shown below.

#/nova/compute/claims.py:Claim
class Claim(NopClaim):
    """A declaration that a compute host operation will require free resources.
    Claims serve as marker objects that resources are being held until the
    update_available_resource audit process runs to do a full reconciliation
    of resource usage.

    This information will be used to help keep the local compute hosts's
    ComputeNode model in sync to aid the scheduler in making efficient / more
    correct decisions with respect to host selection.
    """

    def __init__(self, context, instance, tracker, resources, overhead=None,
                 limits=None):
        super(Claim, self).__init__()
        # Stash a copy of the instance at the current point of time
        if isinstance(instance, obj_base.NovaObject):
            self.instance = instance.obj_clone()
        else:
            # This does not use copy.deepcopy() because it could be
            # a sqlalchemy model, and it's best to make sure we have
            # the primitive form.
            self.instance = jsonutils.to_primitive(instance)
        self._numa_topology_loaded = False
        self.tracker = tracker

        if not overhead:
            overhead = {'memory_mb': 0}

        self.overhead = overhead
        self.context = context

        # Check claim at constructor to avoid mess code
        # Raise exception ComputeResourcesUnavailable if claim failed
        self._claim_test(resources, limits)

#/nova/compute/claims.py:Claim
    def _claim_test(self, resources, limits=None):
        """Test if this claim can be satisfied given available resources and
        optional oversubscription limits

        This should be called before the compute node actually consumes the
        resources required to execute the claim.

        :param resources: available local compute node resources
        :returns: Return true if resources are available to claim.
        """
        if not limits:
            limits = {}

        # If an individual limit is None, the resource will be considered
        # unlimited:
        memory_mb_limit = limits.get('memory_mb')
        disk_gb_limit = limits.get('disk_gb')
        numa_topology_limit = limits.get('numa_topology')

        msg = _("Attempting claim: memory %(memory_mb)d MB, disk %(disk_gb)d "
                "GB")
        params = {'memory_mb': self.memory_mb, 'disk_gb': self.disk_gb}
        LOG.info(msg % params, instance=self.instance)

        reasons = [self._test_memory(resources, memory_mb_limit),
                   self._test_disk(resources, disk_gb_limit),
                   self._test_numa_topology(resources, numa_topology_limit),
                   self._test_pci()]
        reasons = reasons + self._test_ext_resources(limits)
        reasons = [r for r in reasons if r is not None]
        if len(reasons) > 0:
            raise exception.ComputeResourcesUnavailable(reason=
                    "; ".join(reasons))

        LOG.info(_LI('Claim successful'), instance=self.instance)

As the code above shows, when the Claim object is constructed it calls _claim_test to check memory, disk and other resources. We take the memory check as a brief example.

#/nova/compute/claims.py:Claim
    def _test_memory(self, resources, limit):
        type_ = _("memory")
        unit = "MB"
        total = resources['memory_mb']
        used = resources['memory_mb_used']
        requested = self.memory_mb

        return self._test(type_, unit, total, used, requested, limit)

#/nova/compute/claims.py:Claim
    def _test(self, type_, unit, total, used, requested, limit):
        """Test if the given type of resource needed for a claim can be safely
        allocated.
        """
        LOG.info(_LI('Total %(type)s: %(total)d %(unit)s, used: %(used).02f '
                    '%(unit)s'),
                  {'type': type_, 'total': total, 'unit': unit, 'used': used},
                  instance=self.instance)

        if limit is None:
            # treat resource as unlimited:
            LOG.info(_LI('%(type)s limit not specified, defaulting to '
                        'unlimited'), {'type': type_}, instance=self.instance)
            return

        free = limit - used

        # Oversubscribed resource policy info:
        LOG.info(_LI('%(type)s limit: %(limit).02f %(unit)s, '
                     'free: %(free).02f %(unit)s'),
                  {'type': type_, 'limit': limit, 'free': free, 'unit': unit},
                  instance=self.instance)

        if requested > free:
            return (_('Free %(type)s %(free).02f '
                      '%(unit)s < requested %(requested)d %(unit)s') %
                      {'type': type_, 'free': free, 'unit': unit,
                       'requested': requested})

The memory check is straightforward: it simply compares the host's memory resources against the amount of memory the VM requires. If the requirement cannot be met, a reason string is returned (i.e. the return value is not None); if it can, the method implicitly returns None. Note that resources here is the compute_node attribute of the /nova/compute/resource_tracker.py:ResourceTracker object. The compute_node information is the host resource data refreshed from the database via nova-conductor, and the per-host resource information in the database is in turn reported by nova-compute's periodic tasks, which regularly upload the underlying host resources to the database (see "nova-compute Periodic tasks mechanism" for details).
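
A made-up numeric example of the arithmetic in _test(): the limit handed down by the scheduler already has oversubscription baked in (for instance 8192 MB of physical RAM with a ram_allocation_ratio of 1.5 yields a 12288 MB limit), so the claim is compared against limit - used rather than against physically free memory.

# Made-up numbers illustrating the memory case of _test():
total = 8192            # resources['memory_mb'] (physical)
used = 10000            # resources['memory_mb_used'] (already oversubscribed)
limit = 12288           # limits['memory_mb'], e.g. total * ram_allocation_ratio
requested = 2048        # flavor memory_mb plus overhead

free = limit - used     # 2288 MB
print('claim succeeds' if requested <= free else 'claim fails')   # succeeds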

Back in _claim_test: once all the resource comparisons are done, the return values are processed. reasons collects the results of all the checks; if at least one element of reasons is not None, i.e. at least one resource type (such as disk) cannot satisfy the requested specification of the VM, a ComputeResourcesUnavailable exception is raised to the caller, and the __exit__() method returns the claimed resources to the host's available pool. Here we assume the host's available resources do satisfy the new VM's requirements. After the claim check passes, self._set_instance_host_and_node(context, instance_ref) is executed, which updates the instance's host, node and launched_on attributes via nova-conductor; then self._update_usage_from_instance(...) adjusts the host's resource usage according to the new VM's requirements. Finally self._update(...) writes the result of these calculations to the database.

This concludes the analysis of the Claim mechanism in nova-compute. One clarification about how the Claim mechanism and the periodic task mechanism in nova-compute relate (see "OpenStack Design and Implementation"): the Claim mechanism computes and updates on top of the data currently in the database (the host information it reads from the database is cached in self.compute_node of /nova/compute/resource_tracker.py:ResourceTracker), which keeps the available resources in the database up to date so that nova-scheduler always works with fresh data. The periodic task mechanism, on the other hand, exists to keep the information in the database accurate: each run fetches the host information from the hypervisor and writes it to the database (on a timer, 60 seconds by default; see "nova-compute Periodic tasks mechanism" for details).