# Source code for ceilometer.tests.unit.agent.test_discovery
#
# Copyright 2014 Red Hat Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""Tests for the ceilometer agent and hardware discovery plugins."""
import mock
from oslo_config import fixture as fixture_config
from oslotest import base
from ceilometer.agent.discovery import endpoint
from ceilometer.agent.discovery import localnode
from ceilometer.hardware import discovery as hardware
class TestEndpointDiscovery(base.BaseTestCase):
    """Exercise ``EndpointDiscovery`` against a mocked keystone catalog."""

    def setUp(self):
        """Create the plugin, a mock manager and the credential overrides."""
        super(TestEndpointDiscovery, self).setUp()
        self.discovery = endpoint.EndpointDiscovery()
        self.manager = mock.MagicMock()
        self.CONF = self.useFixture(fixture_config.Config()).conf
        # Both options belong to the same group; the tests below expect
        # exactly these values to show up in the catalog lookup.
        credential_overrides = (
            ('interface', 'publicURL'),
            ('region_name', 'test-region-name'),
        )
        for option, value in credential_overrides:
            self.CONF.set_override(option, value,
                                   group='service_credentials')
        # Shortcut to the mocked service catalog hanging off the manager.
        auth = self.manager.keystone.session.auth
        self.catalog = auth.get_access.return_value.service_catalog

    def _assert_catalog_queried_once(self, service_type):
        """Assert ``get_urls`` was called exactly once for *service_type*."""
        expected_calls = [
            mock.call(service_type=service_type,
                      interface='publicURL',
                      region_name='test-region-name'),
        ]
        self.assertEqual(expected_calls, self.catalog.get_urls.call_args_list)

    def test_keystone_called(self):
        """An explicit ``param`` is forwarded as the service type."""
        self.discovery.discover(self.manager, param='test-service-type')
        self._assert_catalog_queried_once('test-service-type')

    def test_keystone_called_no_service_type(self):
        """Omitting ``param`` queries the catalog with a ``None`` type."""
        self.discovery.discover(self.manager)
        self._assert_catalog_queried_once(None)

    def test_keystone_called_no_endpoints(self):
        """An empty catalog result yields an empty resource list."""
        self.catalog.get_urls.return_value = []
        discovered = self.discovery.discover(self.manager)
        self.assertEqual([], discovered)
class TestLocalnodeDiscovery(base.BaseTestCase):
    """Exercise ``LocalNodeDiscovery`` with a mocked manager."""

    def setUp(self):
        """Create a mock manager and the plugin under test."""
        super(TestLocalnodeDiscovery, self).setUp()
        self.manager = mock.MagicMock()
        self.discovery = localnode.LocalNodeDiscovery()

    # NOTE: the method name keeps its historical "lockalnode" spelling so
    # the test id stays unchanged.
    def test_lockalnode_discovery(self):
        """Discovery reports the single ``'local_host'`` resource."""
        discovered = self.discovery.discover(self.manager)
        self.assertEqual(['local_host'], discovered)
class TestHardwareDiscovery(base.BaseTestCase):
    """Exercise ``NodesDiscoveryTripleO`` with a mocked nova client."""

    class MockInstance(object):
        """Stand-in for a nova instance carrying the attributes used here."""
        addresses = {'ctlplane': [
            {'addr': '0.0.0.0',
             'OS-EXT-IPS-MAC:mac_addr': '01-23-45-67-89-ab'}
        ]}
        id = 'resource_id'
        image = {'id': 'image_id'}
        flavor = {'id': 'flavor_id'}

    # Resource expected for a MockInstance with no config overrides.
    # NOTE(review): the user/password in the URL are not set by these
    # tests; presumably they are the hardware option defaults - confirm.
    expected = {
        'resource_id': 'resource_id',
        'resource_url': 'snmp://ro_snmp_user:password@0.0.0.0',
        'mac_addr': '01-23-45-67-89-ab',
        'image_id': 'image_id',
        'flavor_id': 'flavor_id',
    }

    # Same resource once the privacy protocol/password overrides are set;
    # they are expected to appear as query parameters on the URL.
    expected_usm = {
        'resource_id': 'resource_id',
        'resource_url': ('snmp://ro_snmp_user:password@0.0.0.0'
                         '?priv_proto=aes192'
                         '&priv_password=priv_pass'),
        'mac_addr': '01-23-45-67-89-ab',
        'image_id': 'image_id',
        'flavor_id': 'flavor_id',
    }

    def setUp(self):
        """Create the plugin with a mocked nova client and a config fixture."""
        super(TestHardwareDiscovery, self).setUp()
        self.discovery = hardware.NodesDiscoveryTripleO()
        self.discovery.nova_cli = mock.MagicMock()
        self.manager = mock.MagicMock()
        self.CONF = self.useFixture(fixture_config.Config()).conf

    def test_hardware_discovery(self):
        """One instance yields exactly one resource matching ``expected``."""
        self.discovery.nova_cli.instance_get_all.return_value = [
            self.MockInstance()]
        resources = self.discovery.discover(self.manager)
        self.assertEqual(1, len(resources))
        self.assertEqual(self.expected, resources[0])

    def test_hardware_discovery_without_flavor(self):
        """An instance with an empty flavor yields no resources."""
        instance = self.MockInstance()
        # Assigned on the instance only; the class-level flavor is untouched.
        instance.flavor = {}
        self.discovery.nova_cli.instance_get_all.return_value = [instance]
        resources = self.discovery.discover(self.manager)
        self.assertEqual(0, len(resources))

    def test_hardware_discovery_usm(self):
        """Privacy overrides are reflected in the resource URL."""
        self.CONF.set_override('readonly_user_priv_proto', 'aes192',
                               group='hardware')
        self.CONF.set_override('readonly_user_priv_password', 'priv_pass',
                               group='hardware')
        self.discovery.nova_cli.instance_get_all.return_value = [
            self.MockInstance()]
        resources = self.discovery.discover(self.manager)
        # Check the length first so an empty result fails with a clear
        # assertion instead of an IndexError on the lookup below.
        self.assertEqual(1, len(resources))
        self.assertEqual(self.expected_usm, resources[0])