Source code for ceilometer.tests.unit.event.test_endpoint

#
# Copyright 2012 New Dream Network, LLC (DreamHost)
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""Tests for Ceilometer notify daemon."""

import mock
from oslo_config import fixture as fixture_config
import oslo_messaging
from oslo_utils import fileutils
from oslotest import mockpatch
import six
import yaml

from ceilometer.event import endpoint as event_endpoint
from ceilometer import pipeline
from ceilometer import publisher
from ceilometer.publisher import test
from ceilometer.tests import base as tests_base


TEST_NOTICE_CTXT = {
    u'auth_token': u'3d8b13de1b7d499587dfc69b77dc09c2',
    u'is_admin': True,
    u'project_id': u'7c150a59fe714e6f9263774af9688f0e',
    u'quota_class': None,
    u'read_deleted': u'no',
    u'remote_address': u'10.0.2.15',
    u'request_id': u'req-d68b36e0-9233-467f-9afb-d81435d64d66',
    u'roles': [u'admin'],
    u'timestamp': u'2012-05-08T20:23:41.425105',
    u'user_id': u'1e3ce043029547f1a61c1996d1a531a2',
}

TEST_NOTICE_METADATA = {
    u'message_id': u'dae6f69c-00e0-41c0-b371-41ec3b7f4451',
    u'timestamp': u'2012-05-08 20:23:48.028195',
}

TEST_NOTICE_PAYLOAD = {
    u'created_at': u'2012-05-08 20:23:41',
    u'deleted_at': u'',
    u'disk_gb': 0,
    u'display_name': u'testme',
    u'fixed_ips': [{u'address': u'10.0.0.2',
                    u'floating_ips': [],
                    u'meta': {},
                    u'type': u'fixed',
                    u'version': 4}],
    u'image_ref_url': u'http://10.0.2.15:9292/images/UUID',
    u'instance_id': u'9f9d01b9-4a58-4271-9e27-398b21ab20d1',
    u'instance_type': u'm1.tiny',
    u'instance_type_id': 2,
    u'launched_at': u'2012-05-08 20:23:47.985999',
    u'memory_mb': 512,
    u'state': u'active',
    u'state_description': u'',
    u'tenant_id': u'7c150a59fe714e6f9263774af9688f0e',
    u'user_id': u'1e3ce043029547f1a61c1996d1a531a2',
    u'reservation_id': u'1e3ce043029547f1a61c1996d1a531a3',
    u'vcpus': 1,
    u'root_gb': 0,
    u'ephemeral_gb': 0,
    u'host': u'compute-host-name',
    u'availability_zone': u'1e3ce043029547f1a61c1996d1a531a4',
    u'os_type': u'linux?',
    u'architecture': u'x86',
    u'image_ref': u'UUID',
    u'kernel_id': u'1e3ce043029547f1a61c1996d1a531a5',
    u'ramdisk_id': u'1e3ce043029547f1a61c1996d1a531a6',
}


[docs]class TestEventEndpoint(tests_base.BaseTestCase):
[docs] def get_publisher(self, url, namespace=''): fake_drivers = {'test://': test.TestPublisher, 'except://': test.TestPublisher} return fake_drivers[url](url)
def _setup_pipeline(self, publishers): ev_pipeline = yaml.dump({ 'sources': [{ 'name': 'test_event', 'events': ['test.test'], 'sinks': ['test_sink'] }], 'sinks': [{ 'name': 'test_sink', 'publishers': publishers }] }) if six.PY3: ev_pipeline = ev_pipeline.encode('utf-8') ev_pipeline_cfg_file = fileutils.write_to_tempfile( content=ev_pipeline, prefix="event_pipeline", suffix="yaml") self.CONF.set_override('event_pipeline_cfg_file', ev_pipeline_cfg_file) ev_pipeline_mgr = pipeline.setup_event_pipeline() return ev_pipeline_mgr def _setup_endpoint(self, publishers): ev_pipeline_mgr = self._setup_pipeline(publishers) self.endpoint = event_endpoint.EventsNotificationEndpoint( ev_pipeline_mgr) self.endpoint.event_converter = mock.MagicMock() self.endpoint.event_converter.to_event.return_value = mock.MagicMock( event_type='test.test')
[docs] def setUp(self): super(TestEventEndpoint, self).setUp() self.CONF = self.useFixture(fixture_config.Config()).conf self.CONF([]) self.CONF.set_override("connection", "log://", group='database') self.setup_messaging(self.CONF) self.useFixture(mockpatch.PatchObject(publisher, 'get_publisher', side_effect=self.get_publisher)) self.fake_publisher = mock.Mock() self.useFixture(mockpatch.Patch( 'ceilometer.publisher.test.TestPublisher', return_value=self.fake_publisher))
[docs] def test_message_to_event(self): self._setup_endpoint(['test://']) self.endpoint.info([{'ctxt': TEST_NOTICE_CTXT, 'publisher_id': 'compute.vagrant-precise', 'event_type': 'compute.instance.create.end', 'payload': TEST_NOTICE_PAYLOAD, 'metadata': TEST_NOTICE_METADATA}])
[docs] def test_bad_event_non_ack_and_requeue(self): self._setup_endpoint(['test://']) self.fake_publisher.publish_events.side_effect = Exception self.CONF.set_override("ack_on_event_error", False, group="notification") ret = self.endpoint.info([{'ctxt': TEST_NOTICE_CTXT, 'publisher_id': 'compute.vagrant-precise', 'event_type': 'compute.instance.create.end', 'payload': TEST_NOTICE_PAYLOAD, 'metadata': TEST_NOTICE_METADATA}]) self.assertEqual(oslo_messaging.NotificationResult.REQUEUE, ret)
[docs] def test_message_to_event_bad_event(self): self._setup_endpoint(['test://']) self.fake_publisher.publish_events.side_effect = Exception self.CONF.set_override("ack_on_event_error", False, group="notification") message = { 'payload': {'event_type': "foo", 'message_id': "abc"}, 'metadata': {}, 'ctxt': {} } with mock.patch("ceilometer.pipeline.LOG") as mock_logger: ret = self.endpoint.process_notification('info', [message]) self.assertEqual(oslo_messaging.NotificationResult.REQUEUE, ret) exception_mock = mock_logger.exception self.assertIn('Exit after error from publisher', exception_mock.call_args_list[0][0][0])
[docs] def test_message_to_event_bad_event_multi_publish(self): self._setup_endpoint(['test://', 'except://']) self.fake_publisher.publish_events.side_effect = Exception self.CONF.set_override("ack_on_event_error", False, group="notification") message = { 'payload': {'event_type': "foo", 'message_id': "abc"}, 'metadata': {}, 'ctxt': {} } with mock.patch("ceilometer.pipeline.LOG") as mock_logger: ret = self.endpoint.process_notification('info', [message]) self.assertEqual(oslo_messaging.NotificationResult.HANDLED, ret) exception_mock = mock_logger.exception self.assertIn('Continue after error from publisher', exception_mock.call_args_list[0][0][0])

Project Source