Ceilometer Compute Agent 原理和程式碼分析
本部落格所有文章採用的授權方式為 自由轉載-非商用-非衍生-保持署名 ,轉載請務必註明出處,謝謝。
宣告:
本部落格歡迎轉發,但請註明出處,保留原作者資訊
部落格地址:孟阿龍的部落格
所有內容為本人學習、研究、總結。如有雷同,實屬榮幸
注:
- 本文以Openstack Q版本為準分析
- 本文內容較長,通過本文,你可以從程式碼層面瞭解到polling-agent的整個啟動執行過程,其中也包含了部分程式碼的細節分析
目錄
1. 背景:
前邊我們說過,Ceilometer中的agent主要是兩個,Polling Agent 和 Notification Agent,而Polling Agent中,主要又包含兩個:Compute Agent + Central Agent, 本文就來分析Polling Agent的程式碼基本原理。通過對程式碼的分析,來進一步瞭解Polling Agent的工作機制。
2. 服務啟動命令
本文測試環境是通過packstack在虛擬機器環境中安裝的一套 all-in-one Openstack環境。
Polling Agent的啟動命令如下:
/usr/bin/ceilometer-polling --logfile /var/log/ceilometer/polling.log
3. 程序啟動基本流程
啟動程式碼的入口在
ceilometer.cmd.polling.main
def main(): conf = cfg.ConfigOpts() conf.register_cli_opts(CLI_OPTS) service.prepare_service(conf=conf) sm = cotyledon.ServiceManager() sm.add(create_polling_service, args=(conf,)) oslo_config_glue.setup(sm, conf) sm.run()
從這裡可以看到,整體的基本過程為:
- 初始化配置檔案,註冊預設的namespaces配置項
- 呼叫service.prepare_service方法,初始化基礎配置項,主要包含:keystone認證、日誌配置項、配置檔案等
- 啟動程序框架
- 通過sm.add(create_polling_service,args=(conf,))初始化程序例項AgentManager
- 通過sm.run() 啟動資料採集程序
以上,這裡邊最主要的在於2,4,5這三步,接下來分別分析。
4. 程式碼分析
4.1 配置初始化:
配置初始化的過程主要在service.prepare_service()這個方法中,這裡我們只簡單說明一下基礎配置項如何進行初始化
在ceilometer.service.prepare_service函式中:
def prepare_service(argv=None, config_files=None, conf=None):
if argv is None:
argv = sys.argv
if conf is None:
conf = cfg.ConfigOpts()
...
for group, options in opts.list_opts():
conf.register_opts(list(options),
group=None if group == "DEFAULT" else group)
...
conf(argv[1:], project='ceilometer', validate_default_values=True,
version=version.version_info.version_string(),
default_config_files=config_files)
這裡基本的過程為:
- 初始化一個oslo_config的例項conf
- 對ceilometer.opts.list_opts中定義的各類引數進行遍歷初始化預設值(具體初始化的方法,這裡不做贅述
- 最終的效果是在conf這個例項中,獲取list_opts所列出來的各類引數項以及他們的預設值初始化到conf中
- 呼叫conf(argv[1:],project='ceilometer',......) 方法,用argv[1]中指定配置檔案的配置項對conf中的預設值進行覆蓋完成配置項初始化
4.2. AgentManager初始化
4.2.1 AgentManager初始化入口:
程式碼路徑:ceilometer.cmd.polling.create_polling_service
def create_polling_service(worker_id, conf):
return manager.AgentManager(worker_id,
conf,
conf.polling_namespaces)
這個函式中,呼叫AgentManager進行初始化,傳入三個引數:
worker_id: 這個引數在哪裡初始化的還沒搞清楚
conf:即上一步初始化的配置物件
conf.polling_namespaces: 使用預設的配置,即:['compute', 'central']
4.2.2 AgentManager初始化過程
程式碼路徑:ceilometer.polling.manager.AgentManager#init
1 def __init__(self, worker_id, conf, namespaces=None):
2 namespaces = namespaces or ['compute', 'central']
3 group_prefix = conf.polling.partitioning_group_prefix
4
5 super(AgentManager, self).__init__(worker_id)
6
7 self.conf = conf
8
9 if type(namespaces) is not list:
10 namespaces = [namespaces]
11
12 # we'll have default ['compute', 'central'] here if no namespaces will
13 # be passed
14 extensions = (self._extensions('poll', namespace, self.conf).extensions
15 for namespace in namespaces)
16 # get the extensions from pollster builder
17 extensions_fb = (self._extensions_from_builder('poll', namespace)
18 for namespace in namespaces)
19
20 self.extensions = list(itertools.chain(*list(extensions))) + list(
21 itertools.chain(*list(extensions_fb)))
22
23 if not self.extensions:
24 LOG.warning('No valid pollsters can be loaded from %s '
25 'namespaces', namespaces)
26
27 discoveries = (self._extensions('discover', namespace,
28 self.conf).extensions
29 for namespace in namespaces)
30 self.discoveries = list(itertools.chain(*list(discoveries)))
31 self.polling_periodics = None
32
33 self.hashrings = None
34 self.partition_coordinator = None
35 if self.conf.coordination.backend_url:
36 # XXX uuid4().bytes ought to work, but it requires ascii for now
37 coordination_id = str(uuid.uuid4()).encode('ascii')
38 self.partition_coordinator = coordination.get_coordinator(
39 self.conf.coordination.backend_url, coordination_id)
40
41 # Compose coordination group prefix.
42 # We'll use namespaces as the basement for this partitioning.
43 namespace_prefix = '-'.join(sorted(namespaces))
44 self.group_prefix = ('%s-%s' % (namespace_prefix, group_prefix)
45 if group_prefix else namespace_prefix)
46
47 self.notifier = oslo_messaging.Notifier(
48 messaging.get_transport(self.conf),
49 driver=self.conf.publisher_notifier.telemetry_driver,
50 publisher_id="ceilometer.polling")
51
52 self._keystone = None
53 self._keystone_last_exception = None
- 第2行,初始化namespaces為 ['compute', 'central']
- 第3行,是用於partition功能的,暫時忽略
- 第14~21行,載入資料採集外掛,載入了ceilometer.poll.compute, ceilometer.poll.central, ceilometer.builder.poll.central 這三個namespaces下對應的外掛,具體包含哪些外掛可以去看setup.cfg中的定義
- 載入完成之後,self.extensions 是一個list,其中是多個元素,每個元素就是一個外掛物件
(Pdb) p self.extensions[1]
<stevedore.extension.Extension object at 0x7f1cc9ef9310>
(Pdb) p self.extensions[1].__dict__
{'obj': <ceilometer.network.services.fwaas.FirewallPollster object at 0x7f1cc9ef92d0>, 'entry_point': EntryPoint.parse('network.services.firewall = ceilometer.network.services.fwaas:FirewallPollster'), 'name': 'network.services.firewall', 'plugin': <class 'ceilometer.network.services.fwaas.FirewallPollster'>}
- 第30行,載入discovery外掛,載入了ceilometer.discover.central,ceilometer.discover.central 這兩個namespaces下對應的外掛。discovery外掛在資料採集的時候會用到,這裡不展開,後續單獨介紹。載入完成後,self.discoveries 和 self.extensions的型別一樣,都是stevedore.extension.Extension 物件。
- 第47~50行,初始化self.notifier 物件,這個物件主要是用來和訊息佇列進行互動的,預設訊息佇列使用的是rabbitmq。該物件的作用主要是未來在資料採集完成之後,會呼叫這裡self.notifier的介面將資料傳送到rabbitmq。該物件的主要變數是:
(Pdb) p self.notifier.__dict__
{'_serializer': <oslo_messaging.serializer.NoOpSerializer object at 0x7f1cc9aa3710>, '_driver_mgr': <stevedore.named.NamedExtensionManager object at 0x7f1cc9aa3890>, 'retry': -1, '_driver_names': ['messagingv2'], '_topics': ['notifications'], 'publisher_id': 'ceilometer.polling', 'transport': <oslo_messaging.transport.NotificationTransport object at 0x7f1cc9b26550>}
(Pdb) p self.notifier._driver_mgr
<stevedore.named.NamedExtensionManager object at 0x7f1cc9aa3890>
(Pdb) p self.notifier._driver_mgr.__dict__
{'_extensions_by_name_cache': None, '_names': ['messagingv2'], 'namespace': 'oslo.messaging.notify.drivers', '_on_load_failure_callback': None, 'extensions': [<stevedore.extension.Extension object at 0x7f1cc9aa3c10>], 'propagate_map_exceptions': False, '_name_order': False, '_missing_names': set([])}
- self.notifier.topics = ['notifications']
- self.notifier._driver_names = 'messagingv2'(定義在ceilometer/publisher/messaging.py 中的 telemetry_driver這個變數),代表的是ceilometer向訊息佇列傳送訊息時使用的驅動型別
- self.notifier.publisher_id = 'ceilometer.polling'
- self.notifier._driver_mgr 是從oslo.messaging.notify.drivers 中載入名稱為messagingv2對應的外掛
- self.notifier.transport 是從oslo.messaging.drivers 中載入名稱為 rabbit對應的外掛。因為ceilometer.conf 中 transport_url定義是:transport_url=rabbit://guest:[email protected]:5672/
4.3. AgentManager服務啟動
4.3.1 AgentManager 服務啟動入口:
程式碼路徑: ceilometer.polling.manager.AgentManager#run
def run(self):
super(AgentManager, self).run()
self.polling_manager = PollingManager(self.conf)
if self.partition_coordinator:
self.partition_coordinator.start()
self.join_partitioning_groups()
self.start_polling_tasks()
這裡主要是第三行和最後一行,進入程序啟動的具體過程,其中:
第3行,初始化polling_manager
第4行,啟動週期任務
4.3.2 初始化polling_manager
- 入口:ceilometer/polling/manager.py:385
self.polling_manager = PollingManager(self.conf)
通過PollingManager+conf檔案初始化polling_manager
- 在ceilometer/polling/manager.py:54中定義了polling.cfg_file 的初始化為 polling.yaml,該配置檔案對應的格式如下,其中定義了不同外掛的採集週期,這裡的meters中的名稱需要和extensions中載入的外掛名對應,只有在這裡定義了採集週期的外掛,最終才會週期性排程
{"sources": [{"name": source_1,
"interval": interval_time,
"meters" : ["meter_1", "meter_2"],
"resources": ["resource_uri1", "resource_uri2"],
},
{"name": source_2,
"interval": interval_time,
"meters" : ["meter_3"],
},
]}
}
- 根據載入的配置檔案,初始化sources,
入口:ceilometer.polling.manager.PollingSource#init
super(PollingManager, self).__init__(conf)
cfg = self.load_config(conf.polling.cfg_file)
self.sources = []
if 'sources' not in cfg:
raise PollingException("sources required", cfg)
for s in cfg.get('sources'):
self.sources.append(PollingSource(s))
- 按照polling.yaml 中的分組,對sources進行初始化,最終對應的就是self.polling_manager.sources
- self.polling_manager.sources中的每一個物件就是一個PollingSource,每一個PollingSource關鍵的引數:
- meters: 即為上邊polling.yaml 中每一組source中的meter
- interval:即為polling.yaml 中的interval
(Pdb) p self.polling_manager.sources
[<ceilometer.polling.manager.PollingSource object at 0x7f71a2f48150>]
(Pdb) p self.polling_manager.sources[0].__dict__
{'name': 'some_pollsters', 'cfg': {'interval': 300, 'meters': ['cpu', 'cpu_l3_cache', 'memory.usage', 'network.incoming.bytes', 'network.incoming.packets', 'network.outgoing.bytes', 'network.outgoing.packets', 'disk.device.read.bytes', 'disk.device.read.requests', 'disk.device.write.bytes', 'disk.device.write.requests', 'hardware.cpu.util', 'hardware.memory.used', 'hardware.memory.total', 'hardware.memory.buffer', 'hardware.memory.cached', 'hardware.memory.swap.avail', 'hardware.memory.swap.total', 'hardware.system_stats.io.outgoing.blocks', 'hardware.system_stats.io.incoming.blocks', 'hardware.network.ip.incoming.datagrams', 'hardware.network.ip.outgoing.datagrams'], 'name': 'some_pollsters'}, 'interval': 300, 'meters': ['cpu', 'cpu_l3_cache', 'memory.usage', 'network.incoming.bytes', 'network.incoming.packets', 'network.outgoing.bytes', 'network.outgoing.packets', 'disk.device.read.bytes', 'disk.device.read.requests', 'disk.device.write.bytes', 'disk.device.write.requests', 'hardware.cpu.util', 'hardware.memory.used', 'hardware.memory.total', 'hardware.memory.buffer', 'hardware.memory.cached', 'hardware.memory.swap.avail', 'hardware.memory.swap.total', 'hardware.system_stats.io.outgoing.blocks', 'hardware.system_stats.io.incoming.blocks', 'hardware.network.ip.incoming.datagrams', 'hardware.network.ip.outgoing.datagrams'], 'resources': [], 'discovery': []}
4.3.3 AgentManager服務啟動過程:
啟動入口:ceilometer.polling.manager.AgentManager#start_polling_tasks
1 def start_polling_tasks(self):
2 data = self.setup_polling_tasks()
3
4 # Don't start useless threads if no task will run
5 if not data:
6 return
7
8 # One thread per polling tasks is enough
9 self.polling_periodics = periodics.PeriodicWorker.create(
10 [], executor_factory=lambda:
11 futures.ThreadPoolExecutor(max_workers=len(data)))
12
13 for interval, polling_task in data.items():
14
15 @periodics.periodic(spacing=interval, run_immediately=True)
16 def task(running_task):
17 self.interval_task(running_task)
18
19 self.polling_periodics.add(task, polling_task)
20
21 utils.spawn_thread(self.polling_periodics.start, allow_empty=True)
服務啟動的具體過程主要是如下兩步:
- 通過setup_polling_tasks()獲取需要加入週期任務的外掛列表
- 按照設定的外掛執行週期進行分組,同一週期的外掛加入到相同的執行佇列中啟動執行
4.3.3.1 獲取週期任務列表
程式碼入口:ceilometer.polling.manager.AgentManager#setup_polling_tasks
1 def setup_polling_tasks(self):
2 polling_tasks = {}
3 for source in self.polling_manager.sources:
4 for pollster in self.extensions:
5 if source.support_meter(pollster.name):
6 polling_task = polling_tasks.get(source.get_interval())
7 if not polling_task:
8 polling_task = PollingTask(self)
9 polling_tasks[source.get_interval()] = polling_task
10 polling_task.add(pollster, source)
11 return polling_tasks
- 遍歷self.polling_manager.sources中的source
- 遍歷self.extensions,檢視如果對應外掛在對應source的meters列表中,代表該外掛需要定期執行,獲取該外掛定義的執行週期,將其加入到polling_tasks中
- 最終polling_tasks格式如下:
(Pdb) p data
{300: <ceilometer.polling.manager.PollingTask object at 0x7fa6103394d0>}
(Pdb) p data[300]
<ceilometer.polling.manager.PollingTask object at 0x7fa6103394d0>
(Pdb) p data[300].__dict__
{'manager': <ceilometer.polling.manager.AgentManager object at 0x7fa610fb47d0>, '_telemetry_secret': '2e30d044a69343e0', 'pollster_matches': defaultdict(<type 'set'>, {'some_pollsters': set([<stevedore.extension.Extension object at 0x7fa610689850>, <stevedore.extension.Extension object at 0x7fa610697090>, <stevedore.extension.Extension object at 0x7fa61066c0d0>, <stevedore.extension.Extension object at 0x7fa61066c150>, <stevedore.extension.Extension object at 0x7fa610690190>, <stevedore.extension.Extension object at 0x7fa61066c190>, <stevedore.extension.Extension object at 0x7fa610690050>, <stevedore.extension.Extension object at 0x7fa61066ca50>, <stevedore.extension.Extension object at 0x7fa610690290>, <stevedore.extension.Extension object at 0x7fa61066c2d0>, <stevedore.extension.Extension object at 0x7fa610690890>, <stevedore.extension.Extension object at 0x7fa610733bd0>, <stevedore.extension.Extension object at 0x7fa610667c10>, <stevedore.extension.Extension object at 0x7fa610667490>, <stevedore.extension.Extension object at 0x7fa6106970d0>, <stevedore.extension.Extension object at 0x7fa610689550>, <stevedore.extension.Extension object at 0x7fa610667dd0>, <stevedore.extension.Extension object at 0x7fa610689e50>, <stevedore.extension.Extension object at 0x7fa610667e10>, <stevedore.extension.Extension object at 0x7fa610667e50>, <stevedore.extension.Extension object at 0x7fa610667e90>, <stevedore.extension.Extension object at 0x7fa610683790>])}), 'resources': defaultdict(<function <lambda> at 0x7fa610330b90>, {'some_pollsters-cpu_l3_cache': <ceilometer.polling.manager.Resources object at 0x7fa610339110>, 'some_pollsters-disk.device.read.bytes': <ceilometer.polling.manager.Resources object at 0x7fa6103398d0>, 'some_pollsters-hardware.memory.cached': <ceilometer.polling.manager.Resources object at 0x7fa610340310>, 'some_pollsters-disk.device.write.requests': <ceilometer.polling.manager.Resources object at 0x7fa6103401d0>, 'some_pollsters-hardware.cpu.util': <ceilometer.polling.manager.Resources object at 0x7fa6103404d0>, 'some_pollsters-memory.usage': <ceilometer.polling.manager.Resources object at 0x7fa610339790>, 'some_pollsters-hardware.system_stats.io.outgoing.blocks': <ceilometer.polling.manager.Resources object at 0x7fa610340410>, 'some_pollsters-hardware.memory.buffer': <ceilometer.polling.manager.Resources object at 0x7fa610340150>, 'some_pollsters-hardware.memory.swap.total': <ceilometer.polling.manager.Resources object at 0x7fa610340490>, 'some_pollsters-network.incoming.bytes': <ceilometer.polling.manager.Resources object at 0x7fa610339c10>, 'some_pollsters-cpu': <ceilometer.polling.manager.Resources object at 0x7fa610340190>, 'some_pollsters-disk.device.write.bytes': <ceilometer.polling.manager.Resources object at 0x7fa610340290>, 'some_pollsters-hardware.network.ip.outgoing.datagrams': <ceilometer.polling.manager.Resources object at 0x7fa610340210>, 'some_pollsters-disk.device.read.requests': <ceilometer.polling.manager.Resources object at 0x7fa610339a10>, 'some_pollsters-hardware.memory.swap.avail': <ceilometer.polling.manager.Resources object at 0x7fa610340090>, 'some_pollsters-network.incoming.packets': <ceilometer.polling.manager.Resources object at 0x7fa610339d10>, 'some_pollsters-hardware.system_stats.io.incoming.blocks': <ceilometer.polling.manager.Resources object at 0x7fa610340590>, 'some_pollsters-network.outgoing.bytes': <ceilometer.polling.manager.Resources object at 0x7fa610339a50>, 'some_pollsters-hardware.network.ip.incoming.datagrams': <ceilometer.polling.manager.Resources object at 0x7fa610340350>, 'some_pollsters-hardware.memory.total': <ceilometer.polling.manager.Resources object at 0x7fa610340250>, 'some_pollsters-network.outgoing.packets': <ceilometer.polling.manager.Resources object at 0x7fa6103400d0>, 'some_pollsters-hardware.memory.used': <ceilometer.polling.manager.Resources object at 0x7fa6103403d0>}), '_batch': True}
4.3.3.2 啟動週期性任務
程式碼入口: ceilometer.polling.manager.AgentManager#start_polling_tasks
1 def start_polling_tasks(self):
2 data = self.setup_polling_tasks()
3
4 # Don't start useless threads if no task will run
5 if not data:
6 return
7
8 # One thread per polling tasks is enough
9 self.polling_periodics = periodics.PeriodicWorker.create(
10 [], executor_factory=lambda:
11 futures.ThreadPoolExecutor(max_workers=len(data)))
12
13 for interval, polling_task in data.items():
14
15 @periodics.periodic(spacing=interval, run_immediately=True)
16 def task(running_task):
17 self.interval_task(running_task)
18
19 self.polling_periodics.add(task, polling_task)
20
21 utils.spawn_thread(self.polling_periodics.start, allow_empty=True)
- 第9~11行,根據data中任務組的個數,建立N個執行緒池大小
- 第13~19行,遍歷data中的任務,呼叫將其加入到執行緒池中的執行緒排程任務中,主要在於self.interval_task,在self.interval_task 中實際呼叫的是task.poll_and_notify, 這裡的task就是上邊的 ceilometer.polling.manager.PollingTask 物件.
- 第21行,啟動執行緒的週期排程,最終程序框架中的多個執行緒就週期性的呼叫對應執行緒繫結的PollingTask 中的 poll_and_notify 方法,排程外掛週期性採集資料
4.3.3.3 週期性採集任務
程式碼入口:ceilometer.polling.manager.PollingTask#poll_and_notify
1 def poll_and_notify(self):
2 """Polling sample and notify."""
3 cache = {}
4 discovery_cache = {}
5 poll_history = {}
6 for source_name, pollsters in iter_random(
7 self.pollster_matches.items()):
8 for pollster in iter_random(pollsters):
9 key = Resources.key(source_name, pollster)
10 candidate_res = list(
11 self.resources[key].get(discovery_cache))
12 if not candidate_res and pollster.obj.default_discovery:
13 candidate_res = self.manager.discover(
14 [pollster.obj.default_discovery], discovery_cache)
15
16 # Remove duplicated resources and black resources. Using
17 # set() requires well defined __hash__ for each resource.
18 # Since __eq__ is defined, 'not in' is safe here.
19 polling_resources = []
20 black_res = self.resources[key].blacklist
21 history = poll_history.get(pollster.name, [])
22 for x in candidate_res:
23 if x not in history:
24 history.append(x)
25 if x not in black_res:
26 polling_resources.append(x)
27 poll_history[pollster.name] = history
28
29 # If no resources, skip for this pollster
30 if not polling_resources:
31 p_context = 'new ' if history else ''
32 LOG.debug("Skip pollster %(name)s, no %(p_context)s"
33 "resources found this cycle",
34 {'name': pollster.name, 'p_context': p_context})
35 continue
36
37 LOG.info("Polling pollster %(poll)s in the context of "
38 "%(src)s",
39 dict(poll=pollster.name, src=source_name))
40 try:
41 polling_timestamp = timeutils.utcnow().isoformat()
42 samples = pollster.obj.get_samples(
43 manager=self.manager,
44 cache=cache,
45 resources=polling_resources
46 )
47 sample_batch = []
48
49 for sample in samples:
50 # Note(yuywz): Unify the timestamp of polled samples
51 sample.set_timestamp(polling_timestamp)
52 sample_dict = (
53 publisher_utils.meter_message_from_counter(
54 sample, self._telemetry_secret
55 ))
56 if self._batch:
57 sample_batch.append(sample_dict)
58 else:
59 self._send_notification([sample_dict])
60
61 if sample_batch:
62 self._send_notification(sample_batch)
63
64 except plugin_base.PollsterPermanentError as err:
65 LOG.error(
66 'Prevent pollster %(name)s from '
67 'polling %(res_list)s on source %(source)s anymore!',
68 dict(name=pollster.name,
69 res_list=str(err.fail_res_list),
70 source=source_name))
71 self.resources[key].blacklist.extend(err.fail_res_list)
72 except Exception as err:
73 LOG.error(
74 'Continue after error from %(name)s: %(error)s'
75 % ({'name': pollster.name, 'error': err}),
76 exc_info=True)
- 第6~7 行,對上文的data中的pollster_matches進行遍歷,其格式如下:
(Pdb) p data[300].pollster_matches
defaultdict(<type 'set'>, {'some_pollsters': set([<stevedore.extension.Extension object at 0x7fa610689850>, <stevedore.extension.Extension object at 0x7fa610697090>, <stevedore.extension.Extension object at 0x7fa61066c0d0>, <stevedore.extension.Extension object at 0x7fa61066c150>, <stevedore.extension.Extension object at 0x7fa610690190>, <stevedore.extension.Extension object at 0x7fa61066c190>, <stevedore.extension.Extension object at 0x7fa610690050>, <stevedore.extension.Extension object at 0x7fa61066ca50>, <stevedore.extension.Extension object at 0x7fa610690290>, <stevedore.extension.Extension object at 0x7fa61066c2d0>, <stevedore.extension.Extension object at 0x7fa610690890>, <stevedore.extension.Extension object at 0x7fa610733bd0>, <stevedore.extension.Extension object at 0x7fa610667c10>, <stevedore.extension.Extension object at 0x7fa610667490>, <stevedore.extension.Extension object at 0x7fa6106970d0>, <stevedore.extension.Extension object at 0x7fa610689550>, <stevedore.extension.Extension object at 0x7fa610667dd0>, <stevedore.extension.Extension object at 0x7fa610689e50>, <stevedore.extension.Extension object at 0x7fa610667e10>, <stevedore.extension.Extension object at 0x7fa610667e50>, <stevedore.extension.Extension object at 0x7fa610667e90>, <stevedore.extension.Extension object at 0x7fa610683790>])})
可以看到,第6行的source_name 即為polling.yaml 中的每一組的sourcename,pollsters為對應組中meters所對應的extensions物件的集合
- 第8行開始對所有的pollsters進行遍歷,排程外掛採集資料
- 每一個外掛採集資料的時候有1個必備元素為:
- 該外掛採集的資料所對應的物件(resources),比如虛擬機器列表,該列表是通過對應外掛定義的discovery方法獲取
- 第9~39行,獲取該外掛所對應的resources,即資料採集物件列表
- 第42~46行,呼叫該外掛的get_samples方法,獲取對應的採集資料(sample)
- 第49~62行,呼叫指定的notifier方法將資料傳送出去
5. 總結
至此,Compute Agent整體啟動過程,外掛排程過程就完成了。下一篇文章再對單個外掛的排程過程進行分析說明。