Source code for ceilometer.publisher.messaging

#
# Copyright 2012 New Dream Network, LLC (DreamHost)
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""Publish a sample using the preferred RPC mechanism.
"""

import abc
import itertools
import operator

from oslo_config import cfg
from oslo_log import log
import oslo_messaging
from oslo_utils import encodeutils
from oslo_utils import excutils
import six
import six.moves.urllib.parse as urlparse

from ceilometer.i18n import _, _LE, _LI
from ceilometer import messaging
from ceilometer import publisher
from ceilometer.publisher import utils


# Module-level logger used by the publishers defined in this file.
LOG = log.getLogger(__name__)

# Options declared by this module; they are registered below under the
# "publisher_notifier" configuration group.
NOTIFIER_OPTS = [
    # Topic used by MessagingPublisher.publish_samples() and as the default
    # notifier topic of SampleNotifierPublisher.
    cfg.StrOpt('metering_topic',
               default='metering',
               help='The topic that ceilometer uses for metering '
               'notifications.',
               ),
    # Topic used by MessagingPublisher.publish_events() and as the default
    # notifier topic of EventNotifierPublisher.
    cfg.StrOpt('event_topic',
               default='event',
               help='The topic that ceilometer uses for event '
               'notifications.',
               ),
    # Passed as ``driver=`` to oslo_messaging.Notifier by NotifierPublisher.
    # The former option name 'metering_driver' is declared as deprecated.
    cfg.StrOpt('telemetry_driver',
               default='messagingv2',
               help='The driver that ceilometer uses for metering '
               'notifications.',
               deprecated_name='metering_driver',
               )
]

# Import-time side effect: make the options above readable as
# cfg.CONF.publisher_notifier.<name>.
cfg.CONF.register_opts(NOTIFIER_OPTS,
                       group="publisher_notifier")
# 'host' is declared in ceilometer.service; NotifierPublisher reads
# cfg.CONF.host to build its publisher_id.
cfg.CONF.import_opt('host', 'ceilometer.service')


class DeliveryFailure(Exception):
    """Signal that a payload could not be delivered by a publisher.

    :param message: human readable description, stored in ``args``.
    :param cause: the lower-level exception that triggered this failure,
                  kept on the ``cause`` attribute (``None`` if unknown).
    """

    def __init__(self, message=None, cause=None):
        # Record the originating error, then let Exception store the
        # message exactly as it was given (including None).
        self.cause = cause
        super(DeliveryFailure, self).__init__(message)
def raise_delivery_failure(exc):
    """Raise a DeliveryFailure that carries *exc* as its cause.

    :param exc: the exception reported by the messaging layer; its text
                becomes the DeliveryFailure message.
    :raises DeliveryFailure: via ``excutils.raise_with_cause``.
    """
    excutils.raise_with_cause(DeliveryFailure,
                              encodeutils.exception_to_unicode(exc),
                              cause=exc)


@six.add_metaclass(abc.ABCMeta)
class MessagingPublisher(publisher.PublisherBase):
    """Base publisher that buffers payloads locally and sends them by topic.

    Behaviour is driven by query parameters of the publisher URL:

    * ``per_meter_topic`` - integer flag; when non-zero every sample batch
      is additionally published on ``<metering_topic>.<counter_name>``.
    * ``policy`` - ``default``, ``queue`` or ``drop``; any other value is
      logged and replaced by ``default``.
    * ``max_queue_length`` - upper bound of the local queue, enforced only
      for the ``queue`` policy and only when greater than zero.

    Subclasses implement :meth:`_send`.
    """

    def __init__(self, parsed_url):
        """Read the publishing options from *parsed_url*.

        :param parsed_url: object exposing the URL ``query`` string
                           (presumably a ``urlsplit`` result - confirm
                           against the caller).
        :raises ValueError: if ``per_meter_topic`` or ``max_queue_length``
                            is not an integer literal.
        """
        options = urlparse.parse_qs(parsed_url.query)
        # the value of options is a list of url param values
        # only take care of the latest one if the option
        # is provided more than once
        self.per_meter_topic = bool(int(
            options.get('per_meter_topic', [0])[-1]))

        self.policy = options.get('policy', ['default'])[-1]
        self.max_queue_length = int(options.get(
            'max_queue_length', [1024])[-1])
        # With 0, the default policy re-raises on the first failed send.
        self.max_retry = 0

        # Pending (topic, payload list) pairs, oldest first.
        self.local_queue = []

        if self.policy in ['default', 'queue', 'drop']:
            LOG.info(_LI('Publishing policy set to %s'), self.policy)
        else:
            LOG.warning(_('Publishing policy is unknown (%s) force to '
                          'default'), self.policy)
            self.policy = 'default'

        # Value later handed to the notifier as ``retry=`` by subclasses.
        self.retry = 1 if self.policy in ['queue', 'drop'] else None

    def publish_samples(self, samples):
        """Publish samples on RPC.

        :param samples: Samples from pipeline after transformation.
        """
        meters = [
            utils.meter_message_from_counter(
                sample, cfg.CONF.publisher.telemetry_secret)
            for sample in samples
        ]
        topic = cfg.CONF.publisher_notifier.metering_topic
        self.local_queue.append((topic, meters))

        if self.per_meter_topic:
            # groupby only merges adjacent items, hence the sort on the
            # same key first.
            by_name = operator.itemgetter('counter_name')
            for meter_name, meter_list in itertools.groupby(
                    sorted(meters, key=by_name), by_name):
                meter_list = list(meter_list)
                topic_name = topic + '.' + meter_name
                LOG.debug('Publishing %(m)d samples on %(n)s',
                          {'m': len(meter_list), 'n': topic_name})
                self.local_queue.append((topic_name, meter_list))

        self.flush()

    def flush(self):
        """Try to send everything that is waiting in the local queue."""
        # NOTE(sileht):
        # self.local_queue is emptied before the pending entries are
        # processed; whatever could not be sent is then put back in front
        # of anything another call appended to self.local_queue meanwhile.
        queue = self.local_queue
        self.local_queue = []
        self.local_queue = (self._process_queue(queue, self.policy) +
                            self.local_queue)
        if self.policy == 'queue':
            self._check_queue_length()

    def _check_queue_length(self):
        """Trim the oldest entries once the local queue exceeds its bound.

        Nothing is trimmed when ``max_queue_length`` is zero or negative.
        """
        queue_length = len(self.local_queue)
        if queue_length > self.max_queue_length > 0:
            count = queue_length - self.max_queue_length
            self.local_queue = self.local_queue[count:]
            # Pass the count as a logging argument, like every other log
            # call in this class, so interpolation is deferred to the
            # logger.
            LOG.warning(_("Publisher max local_queue length is exceeded, "
                          "dropping %d oldest samples"), count)

    def _process_queue(self, queue, policy):
        """Send the queued entries in order and return what is left over.

        :param queue: list of ``(topic, payload list)`` pairs; entries are
                      removed from it as they are sent.
        :param policy: ``queue`` returns the unsent remainder on failure,
                       ``drop`` discards it, anything else counts the
                       failure and re-raises once ``max_retry`` is reached.
        :returns: the entries that still have to be sent (possibly empty).
        :raises DeliveryFailure: under the default policy when sending
                                 keeps failing.
        """
        current_retry = 0
        while queue:
            topic, data = queue[0]
            try:
                self._send(topic, data)
            except DeliveryFailure:
                # Number of individual payloads still waiting, including
                # the one that just failed.
                pending = sum(len(m) for __, m in queue)
                if policy == 'queue':
                    LOG.warning(_("Failed to publish %d datapoints, queue "
                                  "them"), pending)
                    return queue
                elif policy == 'drop':
                    LOG.warning(_("Failed to publish %d datapoints, "
                                  "dropping them"), pending)
                    return []
                current_retry += 1
                if current_retry >= self.max_retry:
                    LOG.exception(_LE("Failed to retry to send sample data "
                                      "with max_retry times"))
                    raise
            else:
                queue.pop(0)
        return []

    def publish_events(self, events):
        """Send an event message for publishing

        :param events: events from pipeline after transformation
        """
        ev_list = [utils.message_from_event(
            event, cfg.CONF.publisher.telemetry_secret) for event in events]

        topic = cfg.CONF.publisher_notifier.event_topic
        self.local_queue.append((topic, ev_list))
        self.flush()

    @abc.abstractmethod
    def _send(self, topic, meters):
        """Send the meters to the messaging topic."""
class NotifierPublisher(MessagingPublisher):
    """Publisher that delivers payloads through an oslo.messaging Notifier.

    Two extra URL query parameters are consumed here: ``topic`` (notifier
    topics, defaulting to *default_topic*) and ``driver`` (used as the
    scheme of the transport URL, defaulting to ``rabbit``).
    """

    def __init__(self, parsed_url, default_topic):
        """Create the notifier described by *parsed_url*.

        :param parsed_url: split publisher URL; its query, netloc, path and
                           fragment are read.
        :param default_topic: topic used when the URL carries no ``topic``.
        """
        super(NotifierPublisher, self).__init__(parsed_url)

        query = urlparse.parse_qs(parsed_url.query)
        topics = query.pop('topic', [default_topic])
        scheme = query.pop('driver', ['rabbit'])[0]

        # Without a network location no explicit transport URL is built
        # and None is handed to messaging.get_transport().
        if parsed_url.netloc != '':
            remaining_query = urlparse.urlencode(query, True)
            transport_url = urlparse.urlunsplit([scheme,
                                                 parsed_url.netloc,
                                                 parsed_url.path,
                                                 remaining_query,
                                                 parsed_url.fragment])
        else:
            transport_url = None

        transport = messaging.get_transport(transport_url)
        notifier_driver = cfg.CONF.publisher_notifier.telemetry_driver
        identity = 'telemetry.publisher.%s' % cfg.CONF.host
        self.notifier = oslo_messaging.Notifier(
            transport,
            driver=notifier_driver,
            publisher_id=identity,
            topics=topics,
            retry=self.retry
        )

    def _send(self, event_type, data):
        """Emit *data* as a sample notification of type *event_type*.

        :raises DeliveryFailure: when the notifier reports a
                                 MessageDeliveryFailure.
        """
        try:
            self.notifier.sample({}, event_type=event_type, payload=data)
        except oslo_messaging.MessageDeliveryFailure as failure:
            raise_delivery_failure(failure)
class SampleNotifierPublisher(NotifierPublisher):
    """Notifier publisher whose default topic is the metering topic."""

    def __init__(self, parsed_url):
        default_topic = cfg.CONF.publisher_notifier.metering_topic
        super(SampleNotifierPublisher, self).__init__(parsed_url,
                                                      default_topic)
class EventNotifierPublisher(NotifierPublisher):
    """Notifier publisher whose default topic is the event topic."""

    def __init__(self, parsed_url):
        default_topic = cfg.CONF.publisher_notifier.event_topic
        super(EventNotifierPublisher, self).__init__(parsed_url,
                                                     default_topic)

Project Source