1. 程式人生 > >Ocata Neutron代碼分析(三)——oslo_service中的ServiceLauncher和ProcessLauncher(轉載)

Ocata Neutron代碼分析(三)——oslo_service中的ServiceLauncher和ProcessLauncher(轉載)

mic return cme down ice post you tin system

1.概述 Openstack中有一個叫Launcher的概念,即專門用來啟動服務的,這個類被放在了oslo_service這個包裏面。Launcher分為兩種,一種是ServiceLauncher,另一種為ProcessLauncher。ServiceLauncher用來啟動單進程的服務,而ProcessLauncher用來啟動有多個worker子進程的服務。 2.ServiceLauncher ServiceLauncher繼承自Launcher,啟動服務的一個重要成員就是launcher_service,ServiceLauncher沒有對該成員函數進行任何改寫。
def launch_service(self, service, workers=1):
        
"""Load and start the given service. :param service: The service you would like to start, must be an instance of :class:`oslo_service.service.ServiceBase` :param workers: This param makes this method compatible with ProcessLauncher.launch_service. It must be None, 1 or omitted. :returns: None
""" if workers is not None and workers != 1: raise ValueError(_("Launcher asked to start multiple workers")) _check_service_base(service) service.backdoor_port = self.backdoor_port self.services.add(service)

laucher_service就是將服務添加到self.services成員裏面,services成員的類型是class Services,看看它的add方法。

class Services(object):

    def __init__(self):
        self.services = []
        self.tg = threadgroup.ThreadGroup()
        self.done = event.Event()

    def add(self, service):
        """Add a service to a list and create a thread to run it.

        :param service: service to run
        """
        self.services.append(service)
        self.tg.add_thread(self.run_service, service, self.done)

    @staticmethod
    def run_service(service, done):
        """Service start wrapper.

        :param service: service to run
        :param done: event to wait on until a shutdown is triggered
        :returns: None

        """
        try:
            service.start()
        except Exception:
            LOG.exception(Error starting thread.)
            raise SystemExit(1)
        else:
            done.wait()
Services這個類的初始化很簡單,即創建一個ThreadGroup,ThreadGroup其實是eventlet的GreenPool,Openstack利用eventlet實現並發。add方法,將self.run_service這個方法放入pool中,而service就是它的參數。run_service方法很簡單,就是調用service的start方法,這樣就完成了服務的啟動。 3.ProcessLauncher ProcessLauncher直接繼承於Object,所以其對launch_service方法進行了實現。
class ProcessLauncher(object):
    def launch_service(self, service, workers=1):
        """Launch a service with a given number of workers.

       :param service: a service to launch, must be an instance of
              :class:`oslo_service.service.ServiceBase`
       :param workers: a number of processes in which a service
              will be running
        """
        _check_service_base(service)
        wrap = ServiceWrapper(service, workers)

        LOG.info(Starting %d workers, wrap.workers)
        while self.running and len(wrap.children) < wrap.workers:
            self._start_child(wrap)

lauch_service除了接受service參數以外,還需要接受一個workers參數,即子進程的個數,然後調用_start_child啟動多個子進程。

    def _start_child(self, wrap):
        if len(wrap.forktimes) > wrap.workers:
            # Limit ourselves to one process a second (over the period of
            # number of workers * 1 second). This will allow workers to
            # start up quickly but ensure we don‘t fork off children that
            # die instantly too quickly.
            if time.time() - wrap.forktimes[0] < wrap.workers:
                LOG.info(Forking too fast, sleeping)
                time.sleep(1)

            wrap.forktimes.pop(0)

        wrap.forktimes.append(time.time())

        pid = os.fork()
        if pid == 0:
            self.launcher = self._child_process(wrap.service)
            while True:
                self._child_process_handle_signal()
                status, signo = self._child_wait_for_exit_or_signal(
                    self.launcher)
                if not _is_sighup_and_daemon(signo):
                    self.launcher.wait()
                    break
                self.launcher.restart()

            os._exit(status)

        LOG.debug(Started child %d, pid)

        wrap.children.add(pid)
        self.children[pid] = wrap

        return pid

_start_child只是簡單的調用了一個os.fork(),然後子進程開始運行,子進程調用_child_process。

   def _child_process(self, service):
        self._child_process_handle_signal()

        # Reopen the eventlet hub to make sure we don‘t share an epoll
        # fd with parent and/or siblings, which would be bad
        eventlet.hubs.use_hub()

        # Close write to ensure only parent has it open
        os.close(self.writepipe)
        # Create greenthread to watch for parent to close pipe
        eventlet.spawn_n(self._pipe_watcher)

        # Reseed random number generator
        random.seed()

        launcher = Launcher(self.conf, restart_method=self.restart_method)
        launcher.launch_service(service)
        return launcher

_child_process其實很簡單,創建一個Launcher,調用Laucher.launch_service方法。前面介紹過ServiceLauncher繼承自Launcher,也是調用的launch_service方法,將服務啟動,因此接下來的步驟與前面一致,最終都將調用service.start方法啟動服務。

Ocata Neutron代碼分析(三)——oslo_service中的ServiceLauncher和ProcessLauncher(轉載)