Source code for ceilometer.publisher.http

#
# Copyright 2016 IBM
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.

from oslo_log import log
from oslo_serialization import jsonutils
import requests
from requests import adapters
from six.moves.urllib import parse as urlparse

from ceilometer.i18n import _LE
from ceilometer import publisher

LOG = log.getLogger(__name__)


[docs]class HttpPublisher(publisher.PublisherBase): """Publisher metering data to a http endpoint The publisher which records metering data into a http endpoint. The endpoint should be configured in ceilometer pipeline configuration file. If the timeout and/or retry_count are not specified, the default timeout and retry_count will be set to 1000 and 2 respectively. To use this publisher for samples, add the following section to the /etc/ceilometer/publisher.yaml file or simply add it to an existing pipeline:: - name: meter_file interval: 600 counters: - "*" transformers: publishers: - http://host:80/path?timeout=1&max_retries=2 To use this publisher for events, the raw message needs to be present in the event. To enable that, ceilometer.conf file will need to have a section like the following: [event] store_raw = info Then in the event_pipeline.yaml file, you can use the publisher in one of the sinks like the following: - name: event_sink transformers: publishers: - http://host:80/path?timeout=1&max_retries=2 Http end point is required for this publisher to work properly. """ def __init__(self, parsed_url): super(HttpPublisher, self).__init__(parsed_url) self.target = parsed_url.geturl() if not parsed_url.hostname: raise ValueError('The hostname of an endpoint for ' 'HttpPublisher is required') # non-numeric port from the url string will cause a ValueError # exception when the port is read. Do a read to make sure the port # is valid, if not, ValueError will be thrown. parsed_url.port self.headers = {'Content-type': 'application/json'} # Handling other configuration options in the query string if parsed_url.query: params = urlparse.parse_qs(parsed_url.query) self.timeout = self._get_param(params, 'timeout', 1) self.max_retries = self._get_param(params, 'max_retries', 2) else: self.timeout = 1 self.max_retries = 2 LOG.debug('HttpPublisher for endpoint %s is initialized!' % self.target) @staticmethod def _get_param(params, name, default_value): try: return int(params.get(name)[-1]) except (ValueError, TypeError): LOG.debug('Default value %(value)s is used for %(name)s' % {'value': default_value, 'name': name}) return default_value def _do_post(self, data): if not data: LOG.debug('Data set is empty!') return session = requests.Session() session.mount(self.target, adapters.HTTPAdapter(max_retries=self.max_retries)) content = ','.join([jsonutils.dumps(item) for item in data]) content = '[' + content + ']' LOG.debug('Data to be posted by HttpPublisher: %s', content) res = session.post(self.target, data=content, headers=self.headers, timeout=self.timeout) if res.status_code >= 300: LOG.error(_LE('Data post failed with status code %s'), res.status_code)
[docs] def publish_samples(self, samples): """Send a metering message for publishing :param samples: Samples from pipeline after transformation """ data = [sample.as_dict() for sample in samples] self._do_post(data)
[docs] def publish_events(self, events): """Send an event message for publishing :param events: events from pipeline after transformation """ data = [evt.as_dict()['raw']['payload'] for evt in events if evt.as_dict().get('raw', {}).get('payload')] self._do_post(data)

Project Source