
Ocata Neutron Code Analysis (1): Neutron API Startup Process


First, Neutron Server runs as a service (neutron-server.service). Its code entry point can be found in the setup.cfg configuration file in the Neutron project directory.

[entry_points]
console_scripts =
    neutron-db-manage = neutron.db.migration.cli:main
    neutron-debug = neutron.debug.shell:main
    neutron-dhcp-agent = neutron.cmd.eventlet.agents.dhcp:main
    neutron-keepalived-state-change = neutron.cmd.keepalived_state_change:main
    neutron-ipset-cleanup = neutron.cmd.ipset_cleanup:main
    neutron-l3-agent = neutron.cmd.eventlet.agents.l3:main
    neutron-linuxbridge-agent = neutron.cmd.eventlet.plugins.linuxbridge_neutron_agent:main
    neutron-linuxbridge-cleanup = neutron.cmd.linuxbridge_cleanup:main
    neutron-macvtap-agent = neutron.cmd.eventlet.plugins.macvtap_neutron_agent:main
    neutron-metadata-agent = neutron.cmd.eventlet.agents.metadata:main
    neutron-netns-cleanup = neutron.cmd.netns_cleanup:main
    neutron-ns-metadata-proxy = neutron.cmd.eventlet.agents.metadata_proxy:main
    neutron-openvswitch-agent = neutron.cmd.eventlet.plugins.ovs_neutron_agent:main
    neutron-ovs-cleanup = neutron.cmd.ovs_cleanup:main
    neutron-pd-notify = neutron.cmd.pd_notify:main
    neutron-server = neutron.cmd.eventlet.server:main
    neutron-rpc-server = neutron.cmd.eventlet.server:main_rpc_eventlet
    neutron-rootwrap = oslo_rootwrap.cmd:main
    neutron-rootwrap-daemon = oslo_rootwrap.cmd:daemon
    neutron-usage-audit = neutron.cmd.eventlet.usage_audit:main
    neutron-metering-agent = neutron.cmd.eventlet.services.metering_agent:main
    neutron-sriov-nic-agent = neutron.cmd.eventlet.plugins.sriov_nic_neutron_agent:main
    neutron-sanity-check = neutron.cmd.sanity_check:main
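
Each console_scripts line has the form "name = module.path:callable": the installed neutron-server command imports the module and calls the named function. The following is a minimal sketch of that lookup using only the standard library. The helper resolve_entry_point is hypothetical, written for illustration, and is not part of setuptools or Neutron.

```python
import importlib


def resolve_entry_point(spec):
    """Resolve a 'package.module:attr' string to the object it names."""
    module_name, _, attr_path = spec.partition(":")
    obj = importlib.import_module(module_name)
    # The attribute part may be dotted (for example 'Class.method').
    for attr in attr_path.split("."):
        obj = getattr(obj, attr)
    return obj


# Demonstrated with a standard-library target so it runs without Neutron
# installed; for neutron-server the spec would be
# 'neutron.cmd.eventlet.server:main'.
dumps = resolve_entry_point("json:dumps")
print(dumps({"service": "neutron-server"}))
```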

Entry point for neutron-server (neutron.cmd.eventlet.server):

def main():
    server.boot_server(_main_neutron_server)

This calls the boot_server function in neutron.server.__init__:

def boot_server(server_func):
    # the configuration will be read into the cfg.CONF global data structure
    config.init(sys.argv[1:])
    config.setup_logging()
    config.set_config_defaults()
    if not cfg.CONF.config_file:
        sys.exit(_("ERROR: Unable to find configuration file via the default"
                   " search paths (~/.neutron/, ~/, /etc/neutron/, /etc/) and"
                   " the '--config-file' option!"))
    try:
        server_func()
    except KeyboardInterrupt:
        pass
    except RuntimeError as e:
        sys.exit(_("ERROR: %s") % e)

This function is a thin wrapper: it initializes the configuration and logging, runs server_func (here _main_neutron_server), and catches exceptions.
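
The guard that boot_server places around the real server function can be sketched on its own. This is a simplified stand-in, not Neutron code: the name boot and the config_files parameter are assumptions that replace the cfg.CONF.config_file check.

```python
import sys


def boot(server_func, config_files):
    """Run server_func inside the same kind of guard boot_server uses."""
    # Stand-in for the cfg.CONF.config_file check: refuse to start
    # when no configuration file was found.
    if not config_files:
        sys.exit("ERROR: Unable to find configuration file")
    try:
        server_func()
    except KeyboardInterrupt:
        # Ctrl+C is treated as a normal shutdown.
        pass
    except RuntimeError as e:
        # A startup failure becomes a non-zero exit with a readable message.
        sys.exit("ERROR: %s" % e)


def failing_server():
    raise RuntimeError("port already in use")


try:
    boot(failing_server, ["/etc/neutron/neutron.conf"])
except SystemExit as exc:
    print(exc.code)
```

Only KeyboardInterrupt and RuntimeError are handled; any other exception propagates with its traceback.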

def _main_neutron_server():
    if cfg.CONF.web_framework == 'legacy':
        wsgi_eventlet.eventlet_wsgi_server()
    else:
        wsgi_pecan.pecan_wsgi_server()

The branch taken depends on web_framework in the Neutron configuration file, which defaults to legacy. In that case eventlet_wsgi_server in neutron.server.wsgi_eventlet is executed.

def eventlet_wsgi_server():
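    neutron_api = service.serve_wsgi(service.NeutronApiService)     # initialize neutron_api
    start_api_and_rpc_workers(neutron_api)                          # start neutron_api and rpc

This article covers the startup of neutron_api; the RPC startup is analyzed in the next part. eventlet_wsgi_server calls serve_wsgi in neutron/service.py. The related classes sit next to each other in that file, so they are shown together here.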

class WsgiService(object):
    """Base class for WSGI based services.

    For each api you define, you must also define these flags:
    :<api>_listen: The address on which to listen
    :<api>_listen_port: The port on which to listen

    """

    def __init__(self, app_name):                    # wsgi_app stays None until start() sets it
        self.app_name = app_name
        self.wsgi_app = None

    def start(self):
        self.wsgi_app = _run_wsgi(self.app_name)

    def wait(self):
        self.wsgi_app.wait()

class NeutronApiService(WsgiService):
    """Class for neutron-api service."""
    def __init__(self, app_name):
        profiler.setup('neutron-server', cfg.CONF.host)
        super(NeutronApiService, self).__init__(app_name)     # call the parent class constructor

    @classmethod
    def create(cls, app_name='neutron'):
        # Setup logging early
        config.setup_logging()
        service = cls(app_name)          # call the constructor; app_name is 'neutron'
        return service                   # return a NeutronApiService instance

def serve_wsgi(cls):                    # cls is service.NeutronApiService

    try:
        service = cls.create()          # calls NeutronApiService.create
        service.start()                 # initializes service.wsgi_app
    except Exception:
        with excutils.save_and_reraise_exception():
            LOG.exception(_LE('Unrecoverable error: please check log '
                              'for details.'))

    registry.notify(resources.PROCESS, events.BEFORE_SPAWN, service)
    return service
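
The create, start, return sequence in serve_wsgi, together with its log-then-re-raise error handling, can be sketched with standard-library pieces. DemoService and log_and_reraise are hypothetical stand-ins: the real code uses NeutronApiService and oslo_utils' excutils.save_and_reraise_exception, which has a different shape (it is entered inside an except block).

```python
import contextlib
import logging

LOG = logging.getLogger(__name__)


@contextlib.contextmanager
def log_and_reraise(message):
    """Log the active exception with its traceback, then re-raise it."""
    try:
        yield
    except Exception:
        LOG.exception(message)
        raise


class DemoService(object):
    """Hypothetical stand-in for NeutronApiService."""

    def __init__(self, app_name):
        self.app_name = app_name
        self.wsgi_app = None

    @classmethod
    def create(cls, app_name="neutron"):
        # Factory method: callers never invoke the constructor directly.
        return cls(app_name)

    def start(self):
        # The real start() loads the paste app and starts the WSGI server.
        self.wsgi_app = "wsgi app for %s" % self.app_name


def serve(cls):
    message = "Unrecoverable error: please check log for details."
    with log_and_reraise(message):
        service = cls.create()
        service.start()
    return service


service = serve(DemoService)
print(service.app_name, service.wsgi_app)
```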

Next comes the _run_wsgi function; the app_name argument passed in is 'neutron'.

def _run_wsgi(app_name):
    app = config.load_paste_app(app_name)          # load the wsgi_app
    if not app:
        LOG.error(_LE('No known API applications configured.'))
        return
    return run_wsgi_app(app)                       # start the wsgi_server


def load_paste_app(app_name):
    """Builds and returns a WSGI app from a paste config file.

    :param app_name: Name of the application to load
    """
    loader = wsgi.Loader(cfg.CONF)
    app = loader.load_app(app_name)
    return app

load_paste_app first instantiates the Loader class from oslo_service/wsgi.py, then calls the load_app method of the resulting loader object to load the wsgi_app.

class Loader(object):
    """Used to load WSGI applications from paste configurations."""

    def __init__(self, conf):
        """Initialize the loader, and attempt to find the config.

        :param conf: Application config
        :returns: None

        """
        conf.register_opts(_options.wsgi_opts)
        self.config_path = None

        config_path = conf.api_paste_config                    # path of api-paste.ini, taken from the api_paste_config option
        if not os.path.isabs(config_path):
            self.config_path = conf.find_file(config_path)
        elif os.path.exists(config_path):
            self.config_path = config_path                     # stored on the loader for load_app to use

        if not self.config_path:
            raise ConfigNotFound(path=config_path)

    def load_app(self, name):
        """Return the paste URLMap wrapped WSGI application.

        :param name: Name of the application to load.
        :returns: Paste URLMap object wrapping the requested application.
        :raises: PasteAppNotFound

        """
        try:
            LOG.debug("Loading app %(name)s from %(path)s",
                      {'name': name, 'path': self.config_path})
            # Load and return the wsgi_app with loadapp from the third-party
            # paste.deploy library; its config file is analyzed in a later part.
            return deploy.loadapp("config:%s" % self.config_path, name=name)
        except LookupError:
            LOG.exception("Couldn't lookup app: %s", name)
            raise PasteAppNotFound(name=name, path=self.config_path)
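
Whatever loadapp returns is an ordinary WSGI callable, and paste.deploy builds it through factories of the form factory(global_conf, **local_conf). A minimal sketch of such a factory follows, invoked directly without paste or a server. The greeting option and the call_app helper are assumptions made up for this illustration and do not appear in Neutron's api-paste.ini.

```python
def app_factory(global_conf, **local_conf):
    """Paste-style factory: receives config, returns a WSGI application."""
    greeting = local_conf.get("greeting", "hello")

    def application(environ, start_response):
        text = "%s from %s" % (greeting, environ["PATH_INFO"])
        body = text.encode("utf-8")
        start_response("200 OK", [("Content-Type", "text/plain"),
                                  ("Content-Length", str(len(body)))])
        return [body]

    return application


def call_app(app, path):
    """Invoke a WSGI app directly, without a server, and collect the reply."""
    captured = {}

    def start_response(status, headers):
        captured["status"] = status
        captured["headers"] = headers

    environ = {"REQUEST_METHOD": "GET", "PATH_INFO": path}
    body = b"".join(app(environ, start_response))
    return captured["status"], body


app = app_factory({}, greeting="hi")
print(call_app(app, "/v2.0/networks"))
```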

Back in _run_wsgi, the loaded wsgi_app is passed to run_wsgi_app, which creates the wsgi_server.

def run_wsgi_app(app):
    server = wsgi.Server("Neutron")
    server.start(app, cfg.CONF.bind_port, cfg.CONF.bind_host,
                 workers=_get_api_workers())
    LOG.info(_LI("Neutron service started, listening on %(host)s:%(port)s"),
             {'host': cfg.CONF.bind_host, 'port': cfg.CONF.bind_port})
    return server

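run_wsgi_app first instantiates Server, then calls its start method to start the wsgi_server. The workers argument is the number of processes that will run the Neutron API. The _get_api_workers function is covered first, then the relevant parts of the Server class.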

def _get_api_workers():
    workers = cfg.CONF.api_workers
    if workers is None:
        workers = processutils.get_worker_count()
    return workers

_get_api_workers first reads api_workers from the configuration file. If it is not set, workers defaults to the number of CPUs.
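
The fallback can be reproduced with the standard library. This sketch assumes that processutils.get_worker_count behaves like multiprocessing.cpu_count() with a fallback of 1, which is my reading of oslo.concurrency rather than something shown in this article. resolve_api_workers is a hypothetical name.

```python
import multiprocessing


def get_worker_count():
    """Number of CPUs, or 1 when it cannot be determined (assumed behavior)."""
    try:
        return multiprocessing.cpu_count()
    except NotImplementedError:
        return 1


def resolve_api_workers(configured):
    """Mirror _get_api_workers: only None falls back to the CPU count."""
    if configured is None:
        return get_worker_count()
    return configured


print(resolve_api_workers(None), resolve_api_workers(4), resolve_api_workers(0))
```

Because the check is "is None", an explicit api_workers = 0 is passed through unchanged rather than replaced by the CPU count.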

class Server(object):
    def start(self, application, port, host='0.0.0.0', workers=0):
        """Run a WSGI server with the given application."""
        self._host = host
        self._port = port
        backlog = CONF.backlog

        self._socket = self._get_socket(self._host,
                                        self._port,
                                        backlog=backlog)

        self._launch(application, workers)

    def _launch(self, application, workers=0):
        service = WorkerService(self, application, self.disable_ssl, workers)
        if workers < 1:
            # The API service should run in the current process.
            self._server = service
            # Dump the initial option values
            cfg.CONF.log_opt_values(LOG, logging.DEBUG)
            service.start()
            systemd.notify_once()
        else:
            # dispose the whole pool before os.fork, otherwise there will
            # be shared DB connections in child processes which may cause
            # DB errors.
            api.context_manager.dispose_pool()
            # The API service runs in a number of child processes.
            # Minimize the cost of checking for child exit by extending the
            # wait interval past the default of 0.01s.
            self._server = common_service.ProcessLauncher(cfg.CONF,
                                                          wait_interval=1.0)
            self._server.launch_service(service,
                                        workers=service.worker_process_count)

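First, _launch wraps the wsgi_app passed in into a WorkerService, then checks workers. When workers is less than 1, the service is started in the current process. When workers is greater than or equal to 1 (the usual case: with api_workers unset, workers is the CPU count, so at least one child process is created to run the Neutron API), the code follows the same path as the RPC and other service startups analyzed later: it instantiates the ProcessLauncher class from oslo_service/service.py and calls its launch_service to start the WorkerService.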
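
The multi-process branch comes down to forking until the requested number of children exists. Below is a minimal POSIX-only sketch of that loop using os.fork directly. launch_workers and demo_worker are hypothetical, and the real ProcessLauncher additionally handles signals, respawning and throttling, none of which is modeled here.

```python
import os


def launch_workers(worker_fn, workers):
    """Fork `workers` children, run worker_fn(index) in each one,
    and return a dict mapping worker index to its exit code."""
    children = {}
    while len(children) < workers:
        index = len(children)
        pid = os.fork()
        if pid == 0:
            # Child process: run the worker, then exit immediately so the
            # child never returns into the parent's code path.
            status = 0
            try:
                worker_fn(index)
            except Exception:
                status = 1
            finally:
                os._exit(status)
        # Parent process: remember the child and keep forking.
        children[pid] = index

    results = {}
    for pid, index in children.items():
        _, wait_status = os.waitpid(pid, 0)
        results[index] = os.WEXITSTATUS(wait_status)
    return results


def demo_worker(index):
    # Worker 1 fails on purpose to show that children are isolated.
    if index == 1:
        raise RuntimeError("worker %d failed" % index)


results = launch_workers(demo_worker, 3)
print(results)
```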

class ProcessLauncher(object):
    def launch_service(self, service, workers=1):
        """Launch a service with a given number of workers.

       :param service: a service to launch, must be an instance of
              :class:`oslo_service.service.ServiceBase`
       :param workers: a number of processes in which a service
              will be running
        """
        # Check that service is an instance of oslo_service.service.ServiceBase.
        _check_service_base(service)
        # Record the service and its worker count in wrap.
        wrap = ServiceWrapper(service, workers)

        LOG.info('Starting %d workers', wrap.workers)
        # Start child processes one by one until there are wrap.workers of them.
        while self.running and len(wrap.children) < wrap.workers:
            self._start_child(wrap)


class WorkerService(neutron_worker.NeutronWorker):
    """Wraps a worker to be handled by ProcessLauncher"""
    def __init__(self, service, application, disable_ssl=False,
                 worker_process_count=0):
        super(WorkerService, self).__init__(worker_process_count)

        self._service = service               # the Server instance from neutron/wsgi.py (the server created in run_wsgi_app)
        self._application = application       # the wsgi_app
        self._disable_ssl = disable_ssl
        self._server = None

    def start(self):
        super(WorkerService, self).start()
        # When api worker is stopped it kills the eventlet wsgi server which
        # internally closes the wsgi server socket object. This server socket
        # object becomes not usable which leads to "Bad file descriptor"
        # errors on service restart.
        # Duplicate a socket object to keep a file descriptor usable.
        dup_sock = self._service._socket.dup()
        if CONF.use_ssl and not self._disable_ssl:
            dup_sock = sslutils.wrap(CONF, dup_sock)
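        # self._service is the Server object mentioned above; its pool is an
        # eventlet GreenPool, and spawn runs Server._run in a green thread.
        self._server = self._service.pool.spawn(self._service._run,
                                                self._application,
                                                dup_sock)


# _run is a method of the Server class in neutron/wsgi.py.
def _run(self, application, socket):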
    """Start a WSGI server in a new green thread."""
    eventlet.wsgi.server(socket, application,
                        max_size=self.num_threads,
                        log=LOG,
                        keepalive=CONF.wsgi_keep_alive,
                        socket_timeout=self.client_socket_timeout)
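
The reason WorkerService.start duplicates the socket can be checked with the standard library alone: closing a duplicate does not close the original listening socket. This sketch binds to a loopback address on an ephemeral port, and make_listener is a hypothetical helper, not Neutron's _get_socket.

```python
import socket


def make_listener(host="127.0.0.1", port=0, backlog=128):
    """Create a bound, listening TCP socket (port 0 picks a free port)."""
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    sock.bind((host, port))
    sock.listen(backlog)
    return sock


listener = make_listener()

# The worker gets its own file descriptor for the same listening socket.
worker_sock = listener.dup()
same_address = worker_sock.getsockname() == listener.getsockname()
distinct_fd = worker_sock.fileno() != listener.fileno()

# Stopping the worker closes only the duplicate.
worker_sock.close()
still_open = listener.fileno() != -1
port_after_close = listener.getsockname()[1]

print(same_address, distinct_fd, still_open)
listener.close()
```

A restarted worker can take a fresh dup() of the original socket, which is how the "Bad file descriptor" error described in the code comment is avoided.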
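
Summary of the Neutron API startup process:

Starting the Neutron API is one of the two major steps in starting Neutron Server; the other is the RPC part. The API startup itself has two parts: loading the WSGI app and starting the WSGI server. The WSGI app is ultimately loaded by the loadapp function of the paste.deploy library; the default configuration file, api-paste.ini, is in /etc/neutron. The WSGI server is started with a combination of processes and Eventlet's GreenPool, and the RPC service analyzed later is started the same way. In the end, the oslo_service.service::ProcessLauncher class is instantiated and its launch_service function is called. The service argument passed in (here WorkerService) must be a subclass of oslo_service.service::ServiceBase and must implement the start function.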
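
The contract that launch_service enforces can be sketched with an abstract base class. This is a stand-in written for illustration: the four abstract methods (start, stop, wait, reset) reflect my understanding of oslo_service.service.ServiceBase and are an assumption here, and check_service_base only imitates the spirit of _check_service_base.

```python
import abc


class ServiceBase(abc.ABC):
    """Stand-in for oslo_service.service.ServiceBase (assumed interface)."""

    @abc.abstractmethod
    def start(self):
        """Start the service."""

    @abc.abstractmethod
    def stop(self):
        """Stop the service."""

    @abc.abstractmethod
    def wait(self):
        """Wait for the service to complete."""

    @abc.abstractmethod
    def reset(self):
        """Reset the service, for example on SIGHUP."""


def check_service_base(service):
    """Reject anything that is not a ServiceBase instance."""
    if not isinstance(service, ServiceBase):
        raise TypeError("%r must be an instance of ServiceBase" % (service,))


class DemoWorker(ServiceBase):
    """Hypothetical worker that records whether it was started."""

    def __init__(self):
        self.started = False

    def start(self):
        self.started = True

    def stop(self):
        self.started = False

    def wait(self):
        pass

    def reset(self):
        pass


worker = DemoWorker()
check_service_base(worker)
worker.start()
print(worker.started)
```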
