Source code for ceilometer.publisher.http

#
# Copyright 2016 IBM
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.

from oslo_log import log
from oslo_serialization import jsonutils
from oslo_utils import strutils
import requests
from requests import adapters
from six.moves.urllib import parse as urlparse

from ceilometer.i18n import _LE
from ceilometer import publisher

LOG = log.getLogger(__name__)


[docs]class HttpPublisher(publisher.ConfigPublisherBase): """Publish metering data to a http endpoint This publisher pushes metering data to a specified http endpoint. The endpoint should be configured in ceilometer pipeline configuration file. If the `timeout` and/or `max_retries` are not specified, the default `timeout` and `max_retries` will be set to 5 and 2 respectively. Additional parameters are: - ssl can be enabled by setting `verify_ssl` - batching can be configured by `batch` - connection pool size configured using `poolsize` To use this publisher for samples, add the following section to the /etc/ceilometer/pipeline.yaml file or simply add it to an existing pipeline:: - name: meter_file interval: 600 counters: - "*" transformers: publishers: - http://host:80/path?timeout=1&max_retries=2&batch=False In the event_pipeline.yaml file, you can use the publisher in one of the sinks like the following: - name: event_sink transformers: publishers: - http://host:80/path?timeout=1&max_retries=2 """ def __init__(self, conf, parsed_url): super(HttpPublisher, self).__init__(conf, parsed_url) self.target = parsed_url.geturl() if not parsed_url.hostname: raise ValueError('The hostname of an endpoint for ' 'HttpPublisher is required') # non-numeric port from the url string will cause a ValueError # exception when the port is read. Do a read to make sure the port # is valid, if not, ValueError will be thrown. parsed_url.port self.headers = {'Content-type': 'application/json'} # Handling other configuration options in the query string params = urlparse.parse_qs(parsed_url.query) self.timeout = self._get_param(params, 'timeout', 5, int) self.max_retries = self._get_param(params, 'max_retries', 2, int) self.poster = ( self._do_post if strutils.bool_from_string(self._get_param( params, 'batch', True)) else self._individual_post) try: self.verify_ssl = strutils.bool_from_string( self._get_param(params, 'verify_ssl', None), strict=True) except ValueError: self.verify_ssl = (self._get_param(params, 'verify_ssl', None) or True) self.raw_only = strutils.bool_from_string( self._get_param(params, 'raw_only', False)) pool_size = self._get_param(params, 'poolsize', 10, int) kwargs = {'max_retries': self.max_retries, 'pool_connections': pool_size, 'pool_maxsize': pool_size} self.session = requests.Session() # FIXME(gordc): support https in addition to http self.session.mount(self.target, adapters.HTTPAdapter(**kwargs)) LOG.debug('HttpPublisher for endpoint %s is initialized!' % self.target) @staticmethod def _get_param(params, name, default_value, cast=None): try: return cast(params.get(name)[-1]) if cast else params.get(name)[-1] except (ValueError, TypeError): LOG.debug('Default value %(value)s is used for %(name)s' % {'value': default_value, 'name': name}) return default_value def _individual_post(self, data): for d in data: self._do_post(d) def _do_post(self, data): if not data: LOG.debug('Data set is empty!') return data = jsonutils.dumps(data) LOG.trace('Message: %s', data) try: res = self.session.post(self.target, data=data, headers=self.headers, timeout=self.timeout, verify=self.verify_ssl) res.raise_for_status() LOG.debug('Message posting to %s: status code %d.', self.target, res.status_code) except requests.exceptions.HTTPError: LOG.exception(_LE('Status Code: %(code)s. ' 'Failed to dispatch message: %(data)s') % {'code': res.status_code, 'data': data})
[docs] def publish_samples(self, samples): """Send a metering message for publishing :param samples: Samples from pipeline after transformation """ self.poster([sample.as_dict() for sample in samples])
[docs] def publish_events(self, events): """Send an event message for publishing :param events: events from pipeline after transformation """ if self.raw_only: data = [evt.as_dict()['raw']['payload'] for evt in events if evt.as_dict().get('raw', {}).get('payload')] else: data = [event.serialize() for event in events] self.poster(data)

Project Source