1. 程式人生 > >Ceilometer Compute Agent 原理和程式碼分析

Ceilometer Compute Agent 原理和程式碼分析

本部落格所有文章採用的授權方式為 自由轉載-非商用-非衍生-保持署名 ,轉載請務必註明出處,謝謝。

宣告:
本部落格歡迎轉發,但請註明出處,保留原作者資訊
部落格地址:孟阿龍的部落格
所有內容為本人學習、研究、總結。如有雷同,實屬榮幸


注:

  1. 本文以Openstack Q版本為準分析
  2. 本文內容較長,通過本文,你可以從程式碼層面瞭解到polling-agent的整個啟動執行過程,其中也包含了部分程式碼的細節分析

 

目錄

1. 背景:

2. 服務啟動命令

3. 程序啟動基本流程

4. 程式碼分析

4.1 配置初始化:

4.2. AgentManager初始化

4.2.1 AgentManager初始化入口:

4.2.2 AgentManager初始化過程

4.3. AgentManager服務啟動

4.3.1 AgentManager 服務啟動入口:

4.3.2 初始化polling_manager

4.3.3 AgentManager服務啟動過程:

5. 總結


1. 背景:

前邊我們說過,Ceilometer中的agent主要是兩個,Polling Agent 和 Notification Agent,而Polling Agent中,主要又包含兩個:Compute Agent + Central Agent, 本文就來分析Polling Agent的程式碼基本原理。通過對程式碼的分析,來進一步瞭解Polling Agent的工作機制。

2. 服務啟動命令

本文測試環境是通過packstack在虛擬機器環境中安裝的一套 all-in-one Openstack環境。
Polling Agent的啟動命令如下:

/usr/bin/ceilometer-polling --logfile /var/log/ceilometer/polling.log

3. 程序啟動基本流程

啟動程式碼的入口在
ceilometer.cmd.polling.main

def main():
    conf = cfg.ConfigOpts()
    conf.register_cli_opts(CLI_OPTS)
    service.prepare_service(conf=conf)
    sm = cotyledon.ServiceManager()
    sm.add(create_polling_service, args=(conf,))
    oslo_config_glue.setup(sm, conf)
    sm.run()

從這裡可以看到,整體的基本過程為:

  1. 初始化配置檔案,註冊預設的namespaces配置項
  2. 呼叫service.prepare_service方法,初始化基礎配置項,主要包含:keystone認證、日誌配置項、配置檔案等
  3. 啟動程序框架
  4. 通過sm.add(create_polling_service,args=(conf,))初始化程序例項AgentManager
  5. 通過sm.run() 啟動資料採集程序
    以上,這裡邊最主要的在於2,4,5這三步,接下來分別分析。

4. 程式碼分析

4.1 配置初始化:

配置初始化的過程主要在service.prepare_service()這個方法中,這裡我們只簡單說明一下基礎配置項如何進行初始化
在ceilometer.service.prepare_service函式中:

def prepare_service(argv=None, config_files=None, conf=None):
    if argv is None:
        argv = sys.argv

    if conf is None:
        conf = cfg.ConfigOpts()
...
    for group, options in opts.list_opts():
        conf.register_opts(list(options),
                           group=None if group == "DEFAULT" else group)
 ...

    conf(argv[1:], project='ceilometer', validate_default_values=True,
         version=version.version_info.version_string(),
         default_config_files=config_files)

這裡基本的過程為:

  1. 初始化一個oslo_config的例項conf
  2. 對ceilometer.opts.list_opts中定義的各類引數進行遍歷初始化預設值(具體初始化的方法,這裡不做贅述
  3. 最終的效果是在conf這個例項中,獲取list_opts所列出來的各類引數項以及他們的預設值初始化到conf中
  4. 呼叫conf(argv[1:],project='ceilometer',......) 方法,用argv[1]中指定配置檔案的配置項對conf中的預設值進行覆蓋完成配置項初始化

4.2. AgentManager初始化

4.2.1 AgentManager初始化入口:

程式碼路徑:ceilometer.cmd.polling.create_polling_service

def create_polling_service(worker_id, conf):
    return manager.AgentManager(worker_id,
                                conf,
                                conf.polling_namespaces)

這個函式中,呼叫AgentManager進行初始化,傳入三個引數:
worker_id: 這個引數在哪裡初始化的還沒搞清楚
conf:即上一步初始化的配置物件
conf.polling_namespaces: 使用預設的配置,即:['compute', 'central']

4.2.2 AgentManager初始化過程

程式碼路徑:ceilometer.polling.manager.AgentManager#init

 1     def __init__(self, worker_id, conf, namespaces=None):
  2         namespaces = namespaces or ['compute', 'central']
  3         group_prefix = conf.polling.partitioning_group_prefix
  4
  5         super(AgentManager, self).__init__(worker_id)
  6
  7         self.conf = conf
  8
  9         if type(namespaces) is not list:
 10             namespaces = [namespaces]
 11
 12         # we'll have default ['compute', 'central'] here if no namespaces will
 13         # be passed
 14         extensions = (self._extensions('poll', namespace, self.conf).extensions
 15                       for namespace in namespaces)
 16         # get the extensions from pollster builder
 17         extensions_fb = (self._extensions_from_builder('poll', namespace)
 18                          for namespace in namespaces)
 19
 20         self.extensions = list(itertools.chain(*list(extensions))) + list(
 21             itertools.chain(*list(extensions_fb)))
 22
 23         if not self.extensions:
 24             LOG.warning('No valid pollsters can be loaded from %s '
 25                         'namespaces', namespaces)
 26
 27         discoveries = (self._extensions('discover', namespace,
 28                                         self.conf).extensions
 29                        for namespace in namespaces)
 30         self.discoveries = list(itertools.chain(*list(discoveries)))
 31         self.polling_periodics = None
 32
 33         self.hashrings = None
 34         self.partition_coordinator = None
 35         if self.conf.coordination.backend_url:
 36             # XXX uuid4().bytes ought to work, but it requires ascii for now
 37             coordination_id = str(uuid.uuid4()).encode('ascii')
 38             self.partition_coordinator = coordination.get_coordinator(
 39                 self.conf.coordination.backend_url, coordination_id)
 40
 41         # Compose coordination group prefix.
 42         # We'll use namespaces as the basement for this partitioning.
 43         namespace_prefix = '-'.join(sorted(namespaces))
 44         self.group_prefix = ('%s-%s' % (namespace_prefix, group_prefix)
 45                              if group_prefix else namespace_prefix)
 46
 47         self.notifier = oslo_messaging.Notifier(
 48             messaging.get_transport(self.conf),
 49             driver=self.conf.publisher_notifier.telemetry_driver,
 50             publisher_id="ceilometer.polling")
 51
 52         self._keystone = None
 53         self._keystone_last_exception = None
  1. 第2行,初始化namespaces為 ['compute', 'central']
  2. 第3行,是用於partition功能的,暫時忽略
  3. 第14~21行,載入資料採集外掛,載入了ceilometer.poll.compute, ceilometer.poll.central, ceilometer.builder.poll.central 這三個namespaces下對應的外掛,具體包含哪些外掛可以去看setup.cfg中的定義
  4. 載入完成之後,self.extensions 是一個list,其中是多個元素,每個元素就是一個外掛物件
(Pdb) p self.extensions[1]
<stevedore.extension.Extension object at 0x7f1cc9ef9310>
(Pdb) p self.extensions[1].__dict__
{'obj': <ceilometer.network.services.fwaas.FirewallPollster object at 0x7f1cc9ef92d0>, 'entry_point': EntryPoint.parse('network.services.firewall = ceilometer.network.services.fwaas:FirewallPollster'), 'name': 'network.services.firewall', 'plugin': <class 'ceilometer.network.services.fwaas.FirewallPollster'>}
  1. 第30行,載入discovery外掛,載入了ceilometer.discover.central,ceilometer.discover.central 這兩個namespaces下對應的外掛。discovery外掛在資料採集的時候會用到,這裡不展開,後續單獨介紹。載入完成後,self.discoveries 和 self.extensions的型別一樣,都是stevedore.extension.Extension 物件。
  2. 第47~50行,初始化self.notifier 物件,這個物件主要是用來和訊息佇列進行互動的,預設訊息佇列使用的是rabbitmq。該物件的作用主要是未來在資料採集完成之後,會呼叫這裡self.notifier的介面將資料傳送到rabbitmq。該物件的主要變數是:
(Pdb) p self.notifier.__dict__
{'_serializer': <oslo_messaging.serializer.NoOpSerializer object at 0x7f1cc9aa3710>, '_driver_mgr': <stevedore.named.NamedExtensionManager object at 0x7f1cc9aa3890>, 'retry': -1, '_driver_names': ['messagingv2'], '_topics': ['notifications'], 'publisher_id': 'ceilometer.polling', 'transport': <oslo_messaging.transport.NotificationTransport object at 0x7f1cc9b26550>}
(Pdb) p self.notifier._driver_mgr
<stevedore.named.NamedExtensionManager object at 0x7f1cc9aa3890>
(Pdb) p self.notifier._driver_mgr.__dict__
{'_extensions_by_name_cache': None, '_names': ['messagingv2'], 'namespace': 'oslo.messaging.notify.drivers', '_on_load_failure_callback': None, 'extensions': [<stevedore.extension.Extension object at 0x7f1cc9aa3c10>], 'propagate_map_exceptions': False, '_name_order': False, '_missing_names': set([])}
  • self.notifier.topics = ['notifications']
  • self.notifier._driver_names = 'messagingv2'(定義在ceilometer/publisher/messaging.py 中的 telemetry_driver這個變數),代表的是ceilometer向訊息佇列傳送訊息時使用的驅動型別
  • self.notifier.publisher_id = 'ceilometer.polling'
  • self.notifier._driver_mgr 是從oslo.messaging.notify.drivers 中載入名稱為messagingv2對應的外掛
  • self.notifier.transport 是從oslo.messaging.drivers 中載入名稱為 rabbit對應的外掛。因為ceilometer.conf 中 transport_url定義是:transport_url=rabbit://guest:[email protected]:5672/

4.3. AgentManager服務啟動

4.3.1 AgentManager 服務啟動入口:

程式碼路徑: ceilometer.polling.manager.AgentManager#run

    def run(self):
        super(AgentManager, self).run()
        self.polling_manager = PollingManager(self.conf)
        if self.partition_coordinator:
            self.partition_coordinator.start()
            self.join_partitioning_groups()
        self.start_polling_tasks()

這裡主要是第三行和最後一行,進入程序啟動的具體過程,其中:
第3行,初始化polling_manager
第4行,啟動週期任務

4.3.2 初始化polling_manager

  1. 入口:ceilometer/polling/manager.py:385
self.polling_manager = PollingManager(self.conf)

通過PollingManager+conf檔案初始化polling_manager

  1. 在ceilometer/polling/manager.py:54中定義了polling.cfg_file 的初始化為 polling.yaml,該配置檔案對應的格式如下,其中定義了不同外掛的採集週期,這裡的meters中的名稱需要和extensions中載入的外掛名對應,只有在這裡定義了採集週期的外掛,最終才會週期性排程
{"sources": [{"name": source_1,
                      "interval": interval_time,
                      "meters" : ["meter_1", "meter_2"],
                      "resources": ["resource_uri1", "resource_uri2"],
                     },
                     {"name": source_2,
                      "interval": interval_time,
                      "meters" : ["meter_3"],
                     },
                    ]}
        }
  1. 根據載入的配置檔案,初始化sources,
    入口:ceilometer.polling.manager.PollingSource#init
        super(PollingManager, self).__init__(conf)
        cfg = self.load_config(conf.polling.cfg_file)
        self.sources = []
        if 'sources' not in cfg:
            raise PollingException("sources required", cfg)
        for s in cfg.get('sources'):
            self.sources.append(PollingSource(s))

  1. 按照polling.yaml 中的分組,對sources進行初始化,最終對應的就是self.polling_manager.sources
  2. self.polling_manager.sources中的每一個物件就是一個PollingSource,每一個PollingSource關鍵的引數:
    • meters: 即為上邊polling.yaml 中每一組source中的meter
    • interval:即為polling.yaml 中的interval
(Pdb) p self.polling_manager.sources
[<ceilometer.polling.manager.PollingSource object at 0x7f71a2f48150>]
(Pdb) p self.polling_manager.sources[0].__dict__
{'name': 'some_pollsters', 'cfg': {'interval': 300, 'meters': ['cpu', 'cpu_l3_cache', 'memory.usage', 'network.incoming.bytes', 'network.incoming.packets', 'network.outgoing.bytes', 'network.outgoing.packets', 'disk.device.read.bytes', 'disk.device.read.requests', 'disk.device.write.bytes', 'disk.device.write.requests', 'hardware.cpu.util', 'hardware.memory.used', 'hardware.memory.total', 'hardware.memory.buffer', 'hardware.memory.cached', 'hardware.memory.swap.avail', 'hardware.memory.swap.total', 'hardware.system_stats.io.outgoing.blocks', 'hardware.system_stats.io.incoming.blocks', 'hardware.network.ip.incoming.datagrams', 'hardware.network.ip.outgoing.datagrams'], 'name': 'some_pollsters'}, 'interval': 300, 'meters': ['cpu', 'cpu_l3_cache', 'memory.usage', 'network.incoming.bytes', 'network.incoming.packets', 'network.outgoing.bytes', 'network.outgoing.packets', 'disk.device.read.bytes', 'disk.device.read.requests', 'disk.device.write.bytes', 'disk.device.write.requests', 'hardware.cpu.util', 'hardware.memory.used', 'hardware.memory.total', 'hardware.memory.buffer', 'hardware.memory.cached', 'hardware.memory.swap.avail', 'hardware.memory.swap.total', 'hardware.system_stats.io.outgoing.blocks', 'hardware.system_stats.io.incoming.blocks', 'hardware.network.ip.incoming.datagrams', 'hardware.network.ip.outgoing.datagrams'], 'resources': [], 'discovery': []}

4.3.3 AgentManager服務啟動過程:

啟動入口:ceilometer.polling.manager.AgentManager#start_polling_tasks

  1 def start_polling_tasks(self):
  2     data = self.setup_polling_tasks()
  3
  4     # Don't start useless threads if no task will run
  5     if not data:
  6         return
  7
  8     # One thread per polling tasks is enough
  9     self.polling_periodics = periodics.PeriodicWorker.create(
 10         [], executor_factory=lambda:
 11         futures.ThreadPoolExecutor(max_workers=len(data)))
 12
 13     for interval, polling_task in data.items():
 14
 15         @periodics.periodic(spacing=interval, run_immediately=True)
 16         def task(running_task):
 17             self.interval_task(running_task)
 18
 19         self.polling_periodics.add(task, polling_task)
 20
 21     utils.spawn_thread(self.polling_periodics.start, allow_empty=True)

服務啟動的具體過程主要是如下兩步:

  1. 通過setup_polling_tasks()獲取需要加入週期任務的外掛列表
  2. 按照設定的外掛執行週期進行分組,同一週期的外掛加入到相同的執行佇列中啟動執行

4.3.3.1 獲取週期任務列表

程式碼入口:ceilometer.polling.manager.AgentManager#setup_polling_tasks

  1 def setup_polling_tasks(self):
  2     polling_tasks = {}
  3     for source in self.polling_manager.sources:
  4         for pollster in self.extensions:
  5             if source.support_meter(pollster.name):
  6                 polling_task = polling_tasks.get(source.get_interval())
  7                 if not polling_task:
  8                     polling_task = PollingTask(self)
  9                     polling_tasks[source.get_interval()] = polling_task
 10                 polling_task.add(pollster, source)
 11     return polling_tasks
  1. 遍歷self.polling_manager.sources中的source
  2. 遍歷self.extensions,檢視如果對應外掛在對應source的meters列表中,代表該外掛需要定期執行,獲取該外掛定義的執行週期,將其加入到polling_tasks中
  3. 最終polling_tasks格式如下:
(Pdb) p data
{300: <ceilometer.polling.manager.PollingTask object at 0x7fa6103394d0>}
(Pdb) p data[300]
<ceilometer.polling.manager.PollingTask object at 0x7fa6103394d0>
(Pdb) p data[300].__dict__
{'manager': <ceilometer.polling.manager.AgentManager object at 0x7fa610fb47d0>, '_telemetry_secret': '2e30d044a69343e0', 'pollster_matches': defaultdict(<type 'set'>, {'some_pollsters': set([<stevedore.extension.Extension object at 0x7fa610689850>, <stevedore.extension.Extension object at 0x7fa610697090>, <stevedore.extension.Extension object at 0x7fa61066c0d0>, <stevedore.extension.Extension object at 0x7fa61066c150>, <stevedore.extension.Extension object at 0x7fa610690190>, <stevedore.extension.Extension object at 0x7fa61066c190>, <stevedore.extension.Extension object at 0x7fa610690050>, <stevedore.extension.Extension object at 0x7fa61066ca50>, <stevedore.extension.Extension object at 0x7fa610690290>, <stevedore.extension.Extension object at 0x7fa61066c2d0>, <stevedore.extension.Extension object at 0x7fa610690890>, <stevedore.extension.Extension object at 0x7fa610733bd0>, <stevedore.extension.Extension object at 0x7fa610667c10>, <stevedore.extension.Extension object at 0x7fa610667490>, <stevedore.extension.Extension object at 0x7fa6106970d0>, <stevedore.extension.Extension object at 0x7fa610689550>, <stevedore.extension.Extension object at 0x7fa610667dd0>, <stevedore.extension.Extension object at 0x7fa610689e50>, <stevedore.extension.Extension object at 0x7fa610667e10>, <stevedore.extension.Extension object at 0x7fa610667e50>, <stevedore.extension.Extension object at 0x7fa610667e90>, <stevedore.extension.Extension object at 0x7fa610683790>])}), 'resources': defaultdict(<function <lambda> at 0x7fa610330b90>, {'some_pollsters-cpu_l3_cache': <ceilometer.polling.manager.Resources object at 0x7fa610339110>, 'some_pollsters-disk.device.read.bytes': <ceilometer.polling.manager.Resources object at 0x7fa6103398d0>, 'some_pollsters-hardware.memory.cached': <ceilometer.polling.manager.Resources object at 0x7fa610340310>, 'some_pollsters-disk.device.write.requests': <ceilometer.polling.manager.Resources object at 0x7fa6103401d0>, 'some_pollsters-hardware.cpu.util': <ceilometer.polling.manager.Resources object at 0x7fa6103404d0>, 'some_pollsters-memory.usage': <ceilometer.polling.manager.Resources object at 0x7fa610339790>, 'some_pollsters-hardware.system_stats.io.outgoing.blocks': <ceilometer.polling.manager.Resources object at 0x7fa610340410>, 'some_pollsters-hardware.memory.buffer': <ceilometer.polling.manager.Resources object at 0x7fa610340150>, 'some_pollsters-hardware.memory.swap.total': <ceilometer.polling.manager.Resources object at 0x7fa610340490>, 'some_pollsters-network.incoming.bytes': <ceilometer.polling.manager.Resources object at 0x7fa610339c10>, 'some_pollsters-cpu': <ceilometer.polling.manager.Resources object at 0x7fa610340190>, 'some_pollsters-disk.device.write.bytes': <ceilometer.polling.manager.Resources object at 0x7fa610340290>, 'some_pollsters-hardware.network.ip.outgoing.datagrams': <ceilometer.polling.manager.Resources object at 0x7fa610340210>, 'some_pollsters-disk.device.read.requests': <ceilometer.polling.manager.Resources object at 0x7fa610339a10>, 'some_pollsters-hardware.memory.swap.avail': <ceilometer.polling.manager.Resources object at 0x7fa610340090>, 'some_pollsters-network.incoming.packets': <ceilometer.polling.manager.Resources object at 0x7fa610339d10>, 'some_pollsters-hardware.system_stats.io.incoming.blocks': <ceilometer.polling.manager.Resources object at 0x7fa610340590>, 'some_pollsters-network.outgoing.bytes': <ceilometer.polling.manager.Resources object at 0x7fa610339a50>, 'some_pollsters-hardware.network.ip.incoming.datagrams': <ceilometer.polling.manager.Resources object at 0x7fa610340350>, 'some_pollsters-hardware.memory.total': <ceilometer.polling.manager.Resources object at 0x7fa610340250>, 'some_pollsters-network.outgoing.packets': <ceilometer.polling.manager.Resources object at 0x7fa6103400d0>, 'some_pollsters-hardware.memory.used': <ceilometer.polling.manager.Resources object at 0x7fa6103403d0>}), '_batch': True}

4.3.3.2 啟動週期性任務

程式碼入口: ceilometer.polling.manager.AgentManager#start_polling_tasks

  1 def start_polling_tasks(self):
  2     data = self.setup_polling_tasks()
  3
  4     # Don't start useless threads if no task will run
  5     if not data:
  6         return
  7
  8     # One thread per polling tasks is enough
  9     self.polling_periodics = periodics.PeriodicWorker.create(
 10         [], executor_factory=lambda:
 11         futures.ThreadPoolExecutor(max_workers=len(data)))
 12
 13     for interval, polling_task in data.items():
 14
 15         @periodics.periodic(spacing=interval, run_immediately=True)
 16         def task(running_task):
 17             self.interval_task(running_task)
 18
 19         self.polling_periodics.add(task, polling_task)
 20
 21     utils.spawn_thread(self.polling_periodics.start, allow_empty=True)
  1. 第9~11行,根據data中任務組的個數,建立N個執行緒池大小
  2. 第13~19行,遍歷data中的任務,呼叫將其加入到執行緒池中的執行緒排程任務中,主要在於self.interval_task,在self.interval_task 中實際呼叫的是task.poll_and_notify, 這裡的task就是上邊的 ceilometer.polling.manager.PollingTask 物件.
  3. 第21行,啟動執行緒的週期排程,最終程序框架中的多個執行緒就週期性的呼叫對應執行緒繫結的PollingTask 中的 poll_and_notify 方法,排程外掛週期性採集資料

4.3.3.3 週期性採集任務

程式碼入口:ceilometer.polling.manager.PollingTask#poll_and_notify

  1 def poll_and_notify(self):
  2     """Polling sample and notify."""
  3     cache = {}
  4     discovery_cache = {}
  5     poll_history = {}
  6     for source_name, pollsters in iter_random(
  7             self.pollster_matches.items()):
  8         for pollster in iter_random(pollsters):
  9             key = Resources.key(source_name, pollster)
 10             candidate_res = list(
 11                 self.resources[key].get(discovery_cache))
 12             if not candidate_res and pollster.obj.default_discovery:
 13                 candidate_res = self.manager.discover(
 14                     [pollster.obj.default_discovery], discovery_cache)
 15
 16             # Remove duplicated resources and black resources. Using
 17             # set() requires well defined __hash__ for each resource.
 18             # Since __eq__ is defined, 'not in' is safe here.
 19             polling_resources = []
 20             black_res = self.resources[key].blacklist
 21             history = poll_history.get(pollster.name, [])
 22             for x in candidate_res:
 23                 if x not in history:
 24                     history.append(x)
 25                     if x not in black_res:
 26                         polling_resources.append(x)
 27             poll_history[pollster.name] = history
 28
 29             # If no resources, skip for this pollster
 30             if not polling_resources:
 31                 p_context = 'new ' if history else ''
 32                 LOG.debug("Skip pollster %(name)s, no %(p_context)s"
 33                           "resources found this cycle",
 34                           {'name': pollster.name, 'p_context': p_context})
 35                 continue
 36
 37             LOG.info("Polling pollster %(poll)s in the context of "
 38                      "%(src)s",
 39                      dict(poll=pollster.name, src=source_name))
 40             try:
 41                 polling_timestamp = timeutils.utcnow().isoformat()
 42                 samples = pollster.obj.get_samples(
 43                     manager=self.manager,
 44                     cache=cache,
 45                     resources=polling_resources
 46                 )
47                 sample_batch = []
 48
 49                 for sample in samples:
 50                     # Note(yuywz): Unify the timestamp of polled samples
 51                     sample.set_timestamp(polling_timestamp)
 52                     sample_dict = (
 53                         publisher_utils.meter_message_from_counter(
 54                             sample, self._telemetry_secret
 55                         ))
 56                     if self._batch:
 57                         sample_batch.append(sample_dict)
 58                     else:
 59                         self._send_notification([sample_dict])
 60
 61                 if sample_batch:
 62                     self._send_notification(sample_batch)
 63
 64             except plugin_base.PollsterPermanentError as err:
 65                 LOG.error(
 66                     'Prevent pollster %(name)s from '
 67                     'polling %(res_list)s on source %(source)s anymore!',
 68                     dict(name=pollster.name,
 69                          res_list=str(err.fail_res_list),
 70                          source=source_name))
 71                 self.resources[key].blacklist.extend(err.fail_res_list)
 72             except Exception as err:
 73                 LOG.error(
 74                     'Continue after error from %(name)s: %(error)s'
 75                     % ({'name': pollster.name, 'error': err}),
 76                     exc_info=True)
  1. 第6~7 行,對上文的data中的pollster_matches進行遍歷,其格式如下:
(Pdb) p data[300].pollster_matches
defaultdict(<type 'set'>, {'some_pollsters': set([<stevedore.extension.Extension object at 0x7fa610689850>, <stevedore.extension.Extension object at 0x7fa610697090>, <stevedore.extension.Extension object at 0x7fa61066c0d0>, <stevedore.extension.Extension object at 0x7fa61066c150>, <stevedore.extension.Extension object at 0x7fa610690190>, <stevedore.extension.Extension object at 0x7fa61066c190>, <stevedore.extension.Extension object at 0x7fa610690050>, <stevedore.extension.Extension object at 0x7fa61066ca50>, <stevedore.extension.Extension object at 0x7fa610690290>, <stevedore.extension.Extension object at 0x7fa61066c2d0>, <stevedore.extension.Extension object at 0x7fa610690890>, <stevedore.extension.Extension object at 0x7fa610733bd0>, <stevedore.extension.Extension object at 0x7fa610667c10>, <stevedore.extension.Extension object at 0x7fa610667490>, <stevedore.extension.Extension object at 0x7fa6106970d0>, <stevedore.extension.Extension object at 0x7fa610689550>, <stevedore.extension.Extension object at 0x7fa610667dd0>, <stevedore.extension.Extension object at 0x7fa610689e50>, <stevedore.extension.Extension object at 0x7fa610667e10>, <stevedore.extension.Extension object at 0x7fa610667e50>, <stevedore.extension.Extension object at 0x7fa610667e90>, <stevedore.extension.Extension object at 0x7fa610683790>])})

可以看到,第6行的source_name 即為polling.yaml 中的每一組的sourcename,pollsters為對應組中meters所對應的extensions物件的集合

  1. 第8行開始對所有的pollsters進行遍歷,排程外掛採集資料
  2. 每一個外掛採集資料的時候有1個必備元素為:
    • 該外掛採集的資料所對應的物件(resources),比如虛擬機器列表,該列表是通過對應外掛定義的discovery方法獲取
  3. 第9~39行,獲取該外掛所對應的resources,即資料採集物件列表
  4. 第42~46行,呼叫該外掛的get_samples方法,獲取對應的採集資料(sample)
  5. 第49~62行,呼叫指定的notifier方法將資料傳送出去

5. 總結

至此,Compute Agent整體啟動過程,外掛排程過程就完成了。下一篇文章再對單個外掛的排程過程進行分析說明。