1. 程式人生 > >Ceilometer外掛採集資料原理及過程

Ceilometer外掛採集資料原理及過程

本部落格所有文章採用的授權方式為 自由轉載-非商用-非衍生-保持署名 ,轉載請務必註明出處,謝謝。

宣告:
本部落格歡迎轉發,但請註明出處,保留原作者資訊
部落格地址:孟阿龍的部落格
所有內容為本人學習、研究、總結。如有雷同,實屬榮幸


注:

  1. 本文以Openstack Q版本為準分析

目錄

 

1. 概述

2. 相關定義

3. 程式碼邏輯

3.1 獲取resources列表

3.2 AgentManager中discover方法的實現:

3.3 呼叫memory.resident外掛的get_sample方法

3.4 外掛memory.resident的get_sample方法的實現:

3.5 外掛memory.resident的inspect方法

3.6 通過本外掛的insperctor的inspect_method對應的方法獲取指定虛擬機器的監控資料

3.7 資料入庫

3.7.1 呼叫agent的notifier的sample方法,將資料傳送到訊息佇列

3.7.2 notifier的sample方法:

3.8 資料最終入庫:


1. 概述

前邊已經對ceilometer polling-agent 的整體框架進行了分析,本文將以memory.resident這個外掛為例,分析單個外掛採集資料的過程。

2. 相關定義

  1. memory.resident 外掛定義如下:
memory.resident = ceilometer.compute.pollsters.instance_stats:MemoryResidentPollster
  1. polling.yaml 中定義如下,代表該外掛為300s執行一次
---
sources:
    - name: some_pollsters
      interval: 300
      meters:
        - memory.resident

3. 程式碼邏輯

3.1 獲取resources列表

程式碼入口:ceilometer.polling.manager.PollingTask#poll_and_notify

  1 for source_name, pollsters in iter_random(
  2         self.pollster_matches.items()):
  3     for pollster in iter_random(pollsters):
  4         key = Resources.key(source_name, pollster)
  5         candidate_res = list(
  6             self.resources[key].get(discovery_cache))
  7         if not candidate_res and pollster.obj.default_discovery:
  8             candidate_res = self.manager.discover(
  9                 [pollster.obj.default_discovery], discovery_cache)
  1. 在poll_and_notify函式的如上幾行中,就是獲取resources的過程
  2. 第4~6行,都是在cache中獲取資料,假設沒有cache,首次初始化的情況,就直接第8行
  3. 第8行,pollster.obj.default_discovery 是memory.resident這個外掛物件中的default_discovery屬性
    ceilometer.compute.pollsters.GenericComputePollster#default_discovery
    @property
    def default_discovery(self):
        return 'local_instances'
  1. 第8行這裡呼叫self.manager.discover傳入的引數為:
    • ['local_instance']
    • {}
  2. 進而去看self.manager.discover方法

3.2 AgentManager中discover方法的實現:

入口:ceilometer.polling.manager.AgentManager#discover

  1 discover(self, discovery=None, discovery_cache=None):
  2 resources = []
  3 discovery = discovery or []
  4 for url in discovery:
  5     if discovery_cache is not None and url in discovery_cache:
  6         resources.extend(discovery_cache[url])
  7         continue
  8     name, param = self._parse_discoverer(url)
  9     discoverer = self._discoverer(name)
 10     if discoverer:
 11         try:
 12             if discoverer.KEYSTONE_REQUIRED_FOR_SERVICE:
 13                 service_type = getattr(
 14                     self.conf.service_types,
 15                     discoverer.KEYSTONE_REQUIRED_FOR_SERVICE)
 16                 if not keystone_client.get_service_catalog(
 17                         self.keystone).get_endpoints(
 18                             service_type=service_type):
 19                     LOG.warning(
 20                         'Skipping %(name)s, %(service_type)s service '
 21                         'is not registered in keystone',
 22                         {'name': name, 'service_type': service_type})
 23                     continue
 24
 25             discovered = discoverer.discover(self, param)
 26
 27             if self.partition_coordinator:
 28                 discovered = [
 29                     v for v in discovered if self.hashrings[
 30                         self.construct_group_id(discoverer.group_id)
 31                     ].belongs_to_self(six.text_type(v))]
 32
 33             resources.extend(discovered)
 34             if discovery_cache is not None:
 35                 discovery_cache[url] = discovered
 36         except ka_exceptions.ClientException as e:
 37             LOG.error('Skipping %(name)s, keystone issue: '
 38                       '%(exc)s', {'name': name, 'exc': e})
 39         except Exception as err:
 40             LOG.exception('Unable to discover resources: %s', err)
 41     else:
 42         LOG.warning('Unknown discovery extension: %s', name)
 43 return resources
  1. 第8行,解析url,當前url = 'local_instance', 解析之後name='local_instance', param=None
  2. 第9行,根據local_instance這個名字,從self.discoveries 中獲取對應discovery的物件,實際對應的也就是setup.cfg中定義的該外掛物件,該物件實現了一個預設的方法 discover
local_instances = ceilometer.compute.discovery:InstanceDiscovery

    3. 第25行,呼叫該外掛的discover方法,即可獲取到resources列表。當前的local_instance 外掛內部實現方法為呼叫libvirt的介面,獲取當前agent-compute所在主機上的虛擬機器列表,以我當前環境為例,本機上建立了一個名為mengalong的kvm虛擬機器,獲取到的結果如下:

(Pdb) p discovered
[<NovaLikeServer: mengalong>]
(Pdb) p discovered[0].__dict__
{'status': 'active', 'flavor': {'name': 'm1.tiny', 'ram': 512, 'ephemeral': 0, 'vcpus': 1, 'swap': 0, 'disk': 1, 'id': u'1'}, 'hostId': '4e63f5b48c402619503a916fc411c549a6b4337f14c7a379f24778d2', 'OS-EXT-SRV-ATTR:host': 'localhost.localdomain', 'name': 'mengalong', 'tenant_id': '2b45d8c84d24435aa29ec12e6d1426e0', 'image': {'id': 'f12c5ea1-a14c-4f71-b622-30d57c163b62'}, 'architecture': 'x86_64', 'OS-EXT-STS:vm_state': 'running', 'OS-EXT-SRV-ATTR:instance_name': 'instance-00000001', 'user_id': 'bed6963f32454af1ba30093ee415fbfe', 'os_type': 'hvm', 'id': '40b0fcf9-6770-4ebc-95e7-8a3b15ebebd0', 'metadata': {}}

3.3 呼叫memory.resident外掛的get_sample方法

以上兩步獲取到了虛擬機器列表之後,再回到之前的 poll_and_notify方法中,下一步就是呼叫memory.resident 外掛對應的get_sample 方法來採集對應虛擬機器的詳細資料
入口:ceilometer/polling/manager.py:183

    samples = pollster.obj.get_samples(
                        manager=self.manager,
                        cache=cache,
                        resources=polling_resources
                    )

3.4 外掛memory.resident的get_sample方法的實現:

入口:ceilometer.compute.pollsters.GenericComputePollster#get_samples

  1 def get_samples(self, manager, cache, resources):
  2     self._inspection_duration = self._record_poll_time()
  3     for instance in resources:
  4         try:
  5             polled_time, result = self._inspect_cached(
  6                 cache, instance, self._inspection_duration)
  7             if not result:
  8                 continue
  9             for stats in self.aggregate_method(result):
 10                 yield self._stats_to_sample(instance, stats, polled_time)
  1. 第2行,獲取該外掛本次執行時距離上次執行的時間差
  2. 第3行,遍歷虛擬機器列表
  3. 呼叫self._inspect_cached方法獲取指定虛擬機器的相關監控資料

3.5 外掛memory.resident的inspect方法

  1. 在該外掛的父類中,定義了inspector_method = 'inspect_instance'
  2. 在外掛初始化的時候,self.inspector = self._get_inspector(self.conf) 初始化了本外掛對應的inspector
  3. inspector物件初始化的時候,會根據配置檔案中定義的hypervisor_inspector從ceilometer.compute.virt中載入對應的inspector。當前我的環境中hypervisor_inspector=libvirt(具體在ceilometer/compute/virt/inspector.py中定義)

3.6 通過本外掛的insperctor的inspect_method對應的方法獲取指定虛擬機器的監控資料

  1. 本外掛定義的inspect_method='inspect_instance'
  2. 載入的inspector為libvirt對應的物件
libvirt = ceilometer.compute.virt.libvirt.inspector:LibvirtInspector

     3. 呼叫ceilometer.compute.virt.libvirt.inspector.LibvirtInspector#inspect_instance 這個方法獲取到的資料為:

{'cpu_number': 1, 'cpu_time': 561600559525L, 'memory_swap_in': None, 'memory_usage': None, 'memory_bandwidth_total': None, 'memory_bandwidth_local': None, 'cpu_cycles': None, 'cache_references': None, 'cpu_util': None, 'memory_swap_out': None, 'cache_misses': None, 'cpu_l3_cache_usage': None, 'memory_resident': 8L, 'instructions': None}

    4. 本例項中會再獲取返回資料中的 memory_resident 對應的值,層層返回到poll_and_notify方法中,對應的sample結構為:

(Pdb) p sample
<name: memory.resident, volume: 181, resource_id: 40b0fcf9-6770-4ebc-95e7-8a3b15ebebd0, timestamp: None>
(Pdb) p sample.__dict__
{'user_id': 'bed6963f32454af1ba30093ee415fbfe', 'name': 'memory.resident', 'resource_id': '40b0fcf9-6770-4ebc-95e7-8a3b15ebebd0', 'timestamp': None, 'id': '93cad640-920e-11e8-aa10-080027e77a00', 'volume': 181L, 'source': 'openstack', 'monotonic_time': 2703.940111794, 'project_id': '2b45d8c84d24435aa29ec12e6d1426e0', 'type': 'gauge', 'resource_metadata': {'status': 'active', 'disk_gb': 1, 'instance_host': 'localhost.localdomain', 'image': {'id': 'f12c5ea1-a14c-4f71-b622-30d57c163b62'}, 'ephemeral_gb': 0, 'host': '4e63f5b48c402619503a916fc411c549a6b4337f14c7a379f24778d2', 'flavor': {'name': 'm1.tiny', 'ram': 512, 'ephemeral': 0, 'vcpus': 1, 'swap': 0, 'disk': 1, 'id': u'1'}, 'task_state': u'', 'image_ref_url': None, 'memory_mb': 512, 'root_gb': 1, 'display_name': 'mengalong', 'name': 'instance-00000001', 'vcpus': 1, 'instance_id': '40b0fcf9-6770-4ebc-95e7-8a3b15ebebd0', 'instance_type': 'm1.tiny', 'state': 'running', 'image_ref': 'f12c5ea1-a14c-4f71-b622-30d57c163b62', 'architecture': 'x86_64', 'os_type': 'hvm'}, 'unit': 'MB'}

3.7 資料入庫

3.7.1 呼叫agent的notifier的sample方法,將資料傳送到訊息佇列

程式碼入口:ceilometer.polling.manager.PollingTask#_send_notification

    def _send_notification(self, samples):
        self.manager.notifier.sample(
            {},
            'telemetry.polling',
            {'samples': samples}
        )

這裡self.manager.notifier 是oslo_messaging.Notifier的物件

3.7.2 notifier的sample方法:

程式碼入口:oslo_messaging.notify.notifier.Notifier#sample
這就對應的是公共庫oslo.messaging庫中的方法,最終將資料傳送到了訊息佇列的 telemetry.polling 隊裡上

3.8 資料最終入庫:

截止到上邊3.7.2,外掛採集到的sample資料都被髮送到了訊息佇列,之後ceilometer-agent-notification會監聽訊息佇列上的內容,獲取到訊息資料之後,根據對應資料的定義將其傳送到合適的後端入庫。具體過程下一篇文章再繼續分析。