Source code for ceilometer.storage.impl_sqlalchemy

#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.

"""SQLAlchemy storage backend."""

from __future__ import absolute_import
import datetime
import hashlib
import os

from oslo_config import cfg
from oslo_db import api
from oslo_db import exception as dbexc
from oslo_db.sqlalchemy import session as db_session
from oslo_log import log
from oslo_serialization import jsonutils
from oslo_utils import timeutils
import six
import sqlalchemy as sa
from sqlalchemy import and_
from sqlalchemy import distinct
from sqlalchemy import func
from sqlalchemy.orm import aliased
from sqlalchemy.sql.expression import cast

import ceilometer
from ceilometer.i18n import _, _LI
from ceilometer import storage
from ceilometer.storage import base
from ceilometer.storage import models as api_models
from ceilometer.storage.sqlalchemy import models
from ceilometer.storage.sqlalchemy import utils as sql_utils
from ceilometer import utils

LOG = log.getLogger(__name__)


STANDARD_AGGREGATES = dict(
    avg=func.avg(models.Sample.volume).label('avg'),
    sum=func.sum(models.Sample.volume).label('sum'),
    min=func.min(models.Sample.volume).label('min'),
    max=func.max(models.Sample.volume).label('max'),
    count=func.count(models.Sample.volume).label('count')
)

UNPARAMETERIZED_AGGREGATES = dict(
    stddev=func.stddev_pop(models.Sample.volume).label('stddev')
)

PARAMETERIZED_AGGREGATES = dict(
    validate=dict(
        cardinality=lambda p: p in ['resource_id', 'user_id', 'project_id']
    ),
    compute=dict(
        cardinality=lambda p: func.count(
            distinct(getattr(models.Resource, p))
        ).label('cardinality/%s' % p)
    )
)

AVAILABLE_CAPABILITIES = {
    'meters': {'query': {'simple': True,
                         'metadata': True}},
    'resources': {'query': {'simple': True,
                            'metadata': True}},
    'samples': {'query': {'simple': True,
                          'metadata': True,
                          'complex': True}},
    'statistics': {'groupby': True,
                   'query': {'simple': True,
                             'metadata': True},
                   'aggregation': {'standard': True,
                                   'selectable': {
                                       'max': True,
                                       'min': True,
                                       'sum': True,
                                       'avg': True,
                                       'count': True,
                                       'stddev': True,
                                       'cardinality': True}}
                   },
}


AVAILABLE_STORAGE_CAPABILITIES = {
    'storage': {'production_ready': True},
}


def apply_metaquery_filter(session, query, metaquery):
    """Apply provided metaquery filter to existing query.

    :param session: session used for original query
    :param query: Query instance
    :param metaquery: dict with metadata to match on.
    """
    for k, value in six.iteritems(metaquery):
        key = k[9:]  # strip out 'metadata.' prefix
        try:
            _model = sql_utils.META_TYPE_MAP[type(value)]
        except KeyError:
            raise ceilometer.NotImplementedError(
                'Query on %(key)s is of %(value)s '
                'type and is not supported' %
                {"key": k, "value": type(value)})
        else:
            meta_alias = aliased(_model)
            on_clause = and_(models.Resource.internal_id == meta_alias.id,
                             meta_alias.meta_key == key)
            # outer join is needed to support metaquery
            # with or operator on non existent metadata field
            # see: test_query_non_existing_metadata_with_result
            # test case.
            query = query.outerjoin(meta_alias, on_clause)
            query = query.filter(meta_alias.value == value)

    return query


def make_query_from_filter(session, query, sample_filter, require_meter=True):
    """Return a query dictionary based on the settings in the filter.

    :param session: session used for original query
    :param query: Query instance
    :param sample_filter: SampleFilter instance
    :param require_meter: If true and the filter does not have a meter,
                          raise an error.
    """

    if sample_filter.meter:
        query = query.filter(models.Meter.name == sample_filter.meter)
    elif require_meter:
        raise RuntimeError('Missing required meter specifier')
    if sample_filter.source:
        query = query.filter(
            models.Resource.source_id == sample_filter.source)
    if sample_filter.start_timestamp:
        ts_start = sample_filter.start_timestamp
        if sample_filter.start_timestamp_op == 'gt':
            query = query.filter(models.Sample.timestamp > ts_start)
        else:
            query = query.filter(models.Sample.timestamp >= ts_start)
    if sample_filter.end_timestamp:
        ts_end = sample_filter.end_timestamp
        if sample_filter.end_timestamp_op == 'le':
            query = query.filter(models.Sample.timestamp <= ts_end)
        else:
            query = query.filter(models.Sample.timestamp < ts_end)
    if sample_filter.user:
        if sample_filter.user == 'None':
            sample_filter.user = None
        query = query.filter(models.Resource.user_id == sample_filter.user)
    if sample_filter.project:
        if sample_filter.project == 'None':
            sample_filter.project = None
        query = query.filter(
            models.Resource.project_id == sample_filter.project)
    if sample_filter.resource:
        query = query.filter(
            models.Resource.resource_id == sample_filter.resource)
    if sample_filter.message_id:
        query = query.filter(
            models.Sample.message_id == sample_filter.message_id)

    if sample_filter.metaquery:
        query = apply_metaquery_filter(session, query,
                                       sample_filter.metaquery)

    return query


[docs]class Connection(base.Connection): """Put the data into a SQLAlchemy database. Tables:: - meter - meter definition - { id: meter id name: meter name type: meter type unit: meter unit } - resource - resource definition - { internal_id: resource id resource_id: resource uuid user_id: user uuid project_id: project uuid source_id: source id resource_metadata: metadata dictionary metadata_hash: metadata dictionary hash } - sample - the raw incoming data - { id: sample id meter_id: meter id (->meter.id) resource_id: resource id (->resource.internal_id) volume: sample volume timestamp: datetime recorded_at: datetime message_signature: message signature message_id: message uuid } """ CAPABILITIES = utils.update_nested(base.Connection.CAPABILITIES, AVAILABLE_CAPABILITIES) STORAGE_CAPABILITIES = utils.update_nested( base.Connection.STORAGE_CAPABILITIES, AVAILABLE_STORAGE_CAPABILITIES, ) def __init__(self, url): # Set max_retries to 0, since oslo.db in certain cases may attempt # to retry making the db connection retried max_retries ^ 2 times # in failure case and db reconnection has already been implemented # in storage.__init__.get_connection_from_config function options = dict(cfg.CONF.database.items()) options['max_retries'] = 0 # oslo.db doesn't support options defined by Ceilometer for opt in storage.OPTS: options.pop(opt.name, None) self._engine_facade = db_session.EngineFacade(url, **options)
[docs] def upgrade(self): # NOTE(gordc): to minimise memory, only import migration when needed from oslo_db.sqlalchemy import migration path = os.path.join(os.path.abspath(os.path.dirname(__file__)), 'sqlalchemy', 'migrate_repo') engine = self._engine_facade.get_engine() from migrate import exceptions as migrate_exc from migrate.versioning import api from migrate.versioning import repository repo = repository.Repository(path) try: api.db_version(engine, repo) except migrate_exc.DatabaseNotControlledError: models.Base.metadata.create_all(engine) api.version_control(engine, repo, repo.latest) else: migration.db_sync(engine, path)
[docs] def clear(self): engine = self._engine_facade.get_engine() for table in reversed(models.Base.metadata.sorted_tables): engine.execute(table.delete()) engine.dispose()
@staticmethod def _create_meter(conn, name, type, unit): # TODO(gordc): implement lru_cache to improve performance try: meter = models.Meter.__table__ trans = conn.begin_nested() if conn.dialect.name == 'sqlite': trans = conn.begin() with trans: meter_row = conn.execute( sa.select([meter.c.id]) .where(sa.and_(meter.c.name == name, meter.c.type == type, meter.c.unit == unit))).first() meter_id = meter_row[0] if meter_row else None if meter_id is None: result = conn.execute(meter.insert(), name=name, type=type, unit=unit) meter_id = result.inserted_primary_key[0] except dbexc.DBDuplicateEntry: # retry function to pick up duplicate committed object meter_id = Connection._create_meter(conn, name, type, unit) return meter_id @staticmethod def _create_resource(conn, res_id, user_id, project_id, source_id, rmeta): # TODO(gordc): implement lru_cache to improve performance try: res = models.Resource.__table__ m_hash = jsonutils.dumps(rmeta, sort_keys=True) if six.PY3: m_hash = m_hash.encode('utf-8') m_hash = hashlib.md5(m_hash).hexdigest() trans = conn.begin_nested() if conn.dialect.name == 'sqlite': trans = conn.begin() with trans: res_row = conn.execute( sa.select([res.c.internal_id]) .where(sa.and_(res.c.resource_id == res_id, res.c.user_id == user_id, res.c.project_id == project_id, res.c.source_id == source_id, res.c.metadata_hash == m_hash))).first() internal_id = res_row[0] if res_row else None if internal_id is None: result = conn.execute(res.insert(), resource_id=res_id, user_id=user_id, project_id=project_id, source_id=source_id, resource_metadata=rmeta, metadata_hash=m_hash) internal_id = result.inserted_primary_key[0] if rmeta and isinstance(rmeta, dict): meta_map = {} for key, v in utils.dict_to_keyval(rmeta): try: _model = sql_utils.META_TYPE_MAP[type(v)] if meta_map.get(_model) is None: meta_map[_model] = [] meta_map[_model].append( {'id': internal_id, 'meta_key': key, 'value': v}) except KeyError: LOG.warning(_("Unknown metadata type. Key " "(%s) will not be queryable."), key) for _model in meta_map.keys(): conn.execute(_model.__table__.insert(), meta_map[_model]) except dbexc.DBDuplicateEntry: # retry function to pick up duplicate committed object internal_id = Connection._create_resource( conn, res_id, user_id, project_id, source_id, rmeta) return internal_id @api.wrap_db_retry(retry_interval=cfg.CONF.database.retry_interval, max_retries=cfg.CONF.database.max_retries, retry_on_deadlock=True)
[docs] def record_metering_data(self, data): """Write the data to the backend storage system. :param data: a dictionary such as returned by ceilometer.publisher.utils.meter_message_from_counter """ engine = self._engine_facade.get_engine() with engine.begin() as conn: # Record the raw data for the sample. m_id = self._create_meter(conn, data['counter_name'], data['counter_type'], data['counter_unit']) res_id = self._create_resource(conn, data['resource_id'], data['user_id'], data['project_id'], data['source'], data['resource_metadata']) sample = models.Sample.__table__ conn.execute(sample.insert(), meter_id=m_id, resource_id=res_id, timestamp=data['timestamp'], volume=data['counter_volume'], message_signature=data['message_signature'], message_id=data['message_id'])
[docs] def clear_expired_metering_data(self, ttl): """Clear expired data from the backend storage system. Clearing occurs according to the time-to-live. :param ttl: Number of seconds to keep records for. """ # Prevent database deadlocks from occurring by # using separate transaction for each delete session = self._engine_facade.get_session() with session.begin(): end = timeutils.utcnow() - datetime.timedelta(seconds=ttl) sample_q = (session.query(models.Sample) .filter(models.Sample.timestamp < end)) rows = sample_q.delete() LOG.info(_LI("%d samples removed from database"), rows) if not cfg.CONF.database.sql_expire_samples_only: with session.begin(): # remove Meter definitions with no matching samples (session.query(models.Meter) .filter(~models.Meter.samples.any()) .delete(synchronize_session=False)) with session.begin(): resource_q = (session.query(models.Resource.internal_id) .filter(~models.Resource.samples.any())) # mark resource with no matching samples for delete resource_q.update({models.Resource.metadata_hash: "delete_" + cast(models.Resource.internal_id, sa.String)}, synchronize_session=False) # remove metadata of resources marked for delete for table in [models.MetaText, models.MetaBigInt, models.MetaFloat, models.MetaBool]: with session.begin(): resource_q = (session.query(models.Resource.internal_id) .filter(models.Resource.metadata_hash .like('delete_%'))) resource_subq = resource_q.subquery() (session.query(table) .filter(table.id.in_(resource_subq)) .delete(synchronize_session=False)) # remove resource marked for delete with session.begin(): resource_q = (session.query(models.Resource.internal_id) .filter(models.Resource.metadata_hash .like('delete_%'))) resource_q.delete(synchronize_session=False) LOG.info(_LI("Expired residual resource and" " meter definition data"))
[docs] def get_resources(self, user=None, project=None, source=None, start_timestamp=None, start_timestamp_op=None, end_timestamp=None, end_timestamp_op=None, metaquery=None, resource=None, limit=None): """Return an iterable of api_models.Resource instances :param user: Optional ID for user that owns the resource. :param project: Optional ID for project that owns the resource. :param source: Optional source filter. :param start_timestamp: Optional modified timestamp start range. :param start_timestamp_op: Optional start time operator, like gt, ge. :param end_timestamp: Optional modified timestamp end range. :param end_timestamp_op: Optional end time operator, like lt, le. :param metaquery: Optional dict with metadata to match on. :param resource: Optional resource filter. :param limit: Maximum number of results to return. """ if limit == 0: return s_filter = storage.SampleFilter(user=user, project=project, source=source, start_timestamp=start_timestamp, start_timestamp_op=start_timestamp_op, end_timestamp=end_timestamp, end_timestamp_op=end_timestamp_op, metaquery=metaquery, resource=resource) session = self._engine_facade.get_session() # get list of resource_ids has_timestamp = start_timestamp or end_timestamp # NOTE: When sql_expire_samples_only is enabled, there will be some # resources without any sample, in such case we should use inner # join on sample table to avoid wrong result. if cfg.CONF.database.sql_expire_samples_only or has_timestamp: res_q = session.query(distinct(models.Resource.resource_id)).join( models.Sample, models.Sample.resource_id == models.Resource.internal_id) else: res_q = session.query(distinct(models.Resource.resource_id)) res_q = make_query_from_filter(session, res_q, s_filter, require_meter=False) res_q = res_q.limit(limit) if limit else res_q for res_id in res_q.all(): # get max and min sample timestamp value min_max_q = (session.query(func.max(models.Sample.timestamp) .label('max_timestamp'), func.min(models.Sample.timestamp) .label('min_timestamp')) .join(models.Resource, models.Resource.internal_id == models.Sample.resource_id) .filter(models.Resource.resource_id == res_id[0])) min_max_q = make_query_from_filter(session, min_max_q, s_filter, require_meter=False) min_max = min_max_q.first() # get resource details for latest sample res_q = (session.query(models.Resource.resource_id, models.Resource.user_id, models.Resource.project_id, models.Resource.source_id, models.Resource.resource_metadata) .join(models.Sample, models.Sample.resource_id == models.Resource.internal_id) .filter(models.Sample.timestamp == min_max.max_timestamp) .filter(models.Resource.resource_id == res_id[0]) .order_by(models.Sample.id.desc()).limit(1)) res = res_q.first() yield api_models.Resource( resource_id=res.resource_id, project_id=res.project_id, first_sample_timestamp=min_max.min_timestamp, last_sample_timestamp=min_max.max_timestamp, source=res.source_id, user_id=res.user_id, metadata=res.resource_metadata )
[docs] def get_meters(self, user=None, project=None, resource=None, source=None, metaquery=None, limit=None, unique=False): """Return an iterable of api_models.Meter instances :param user: Optional ID for user that owns the resource. :param project: Optional ID for project that owns the resource. :param resource: Optional ID of the resource. :param source: Optional source filter. :param metaquery: Optional dict with metadata to match on. :param limit: Maximum number of results to return. :param unique: If set to true, return only unique meter information. """ if limit == 0: return s_filter = storage.SampleFilter(user=user, project=project, source=source, metaquery=metaquery, resource=resource) # NOTE(gordc): get latest sample of each meter/resource. we do not # filter here as we want to filter only on latest record. session = self._engine_facade.get_session() subq = session.query(func.max(models.Sample.id).label('id')).join( models.Resource, models.Resource.internal_id == models.Sample.resource_id) if unique: subq = subq.group_by(models.Sample.meter_id) else: subq = subq.group_by(models.Sample.meter_id, models.Resource.resource_id) if resource: subq = subq.filter(models.Resource.resource_id == resource) subq = subq.subquery() # get meter details for samples. query_sample = (session.query(models.Sample.meter_id, models.Meter.name, models.Meter.type, models.Meter.unit, models.Resource.resource_id, models.Resource.project_id, models.Resource.source_id, models.Resource.user_id).join( subq, subq.c.id == models.Sample.id) .join(models.Meter, models.Meter.id == models.Sample.meter_id) .join(models.Resource, models.Resource.internal_id == models.Sample.resource_id)) query_sample = make_query_from_filter(session, query_sample, s_filter, require_meter=False) query_sample = query_sample.limit(limit) if limit else query_sample if unique: for row in query_sample.all(): yield api_models.Meter( name=row.name, type=row.type, unit=row.unit, resource_id=None, project_id=None, source=None, user_id=None) else: for row in query_sample.all(): yield api_models.Meter( name=row.name, type=row.type, unit=row.unit, resource_id=row.resource_id, project_id=row.project_id, source=row.source_id, user_id=row.user_id)
@staticmethod def _retrieve_samples(query): samples = query.all() for s in samples: # Remove the id generated by the database when # the sample was inserted. It is an implementation # detail that should not leak outside of the driver. yield api_models.Sample( source=s.source_id, counter_name=s.counter_name, counter_type=s.counter_type, counter_unit=s.counter_unit, counter_volume=s.counter_volume, user_id=s.user_id, project_id=s.project_id, resource_id=s.resource_id, timestamp=s.timestamp, recorded_at=s.recorded_at, resource_metadata=s.resource_metadata, message_id=s.message_id, message_signature=s.message_signature, )
[docs] def get_samples(self, sample_filter, limit=None): """Return an iterable of api_models.Samples. :param sample_filter: Filter. :param limit: Maximum number of results to return. """ if limit == 0: return [] session = self._engine_facade.get_session() query = session.query(models.Sample.timestamp, models.Sample.recorded_at, models.Sample.message_id, models.Sample.message_signature, models.Sample.volume.label('counter_volume'), models.Meter.name.label('counter_name'), models.Meter.type.label('counter_type'), models.Meter.unit.label('counter_unit'), models.Resource.source_id, models.Resource.user_id, models.Resource.project_id, models.Resource.resource_metadata, models.Resource.resource_id).join( models.Meter, models.Meter.id == models.Sample.meter_id).join( models.Resource, models.Resource.internal_id == models.Sample.resource_id).order_by( models.Sample.timestamp.desc()) query = make_query_from_filter(session, query, sample_filter, require_meter=False) if limit: query = query.limit(limit) return self._retrieve_samples(query)
[docs] def query_samples(self, filter_expr=None, orderby=None, limit=None): if limit == 0: return [] session = self._engine_facade.get_session() engine = self._engine_facade.get_engine() query = session.query(models.Sample.timestamp, models.Sample.recorded_at, models.Sample.message_id, models.Sample.message_signature, models.Sample.volume.label('counter_volume'), models.Meter.name.label('counter_name'), models.Meter.type.label('counter_type'), models.Meter.unit.label('counter_unit'), models.Resource.source_id, models.Resource.user_id, models.Resource.project_id, models.Resource.resource_metadata, models.Resource.resource_id).join( models.Meter, models.Meter.id == models.Sample.meter_id).join( models.Resource, models.Resource.internal_id == models.Sample.resource_id) transformer = sql_utils.QueryTransformer(models.FullSample, query, dialect=engine.dialect.name) if filter_expr is not None: transformer.apply_filter(filter_expr) transformer.apply_options(orderby, limit) return self._retrieve_samples(transformer.get_query())
@staticmethod def _get_aggregate_functions(aggregate): if not aggregate: return [f for f in STANDARD_AGGREGATES.values()] functions = [] for a in aggregate: if a.func in STANDARD_AGGREGATES: functions.append(STANDARD_AGGREGATES[a.func]) elif a.func in UNPARAMETERIZED_AGGREGATES: functions.append(UNPARAMETERIZED_AGGREGATES[a.func]) elif a.func in PARAMETERIZED_AGGREGATES['compute']: validate = PARAMETERIZED_AGGREGATES['validate'].get(a.func) if not (validate and validate(a.param)): raise storage.StorageBadAggregate('Bad aggregate: %s.%s' % (a.func, a.param)) compute = PARAMETERIZED_AGGREGATES['compute'][a.func] functions.append(compute(a.param)) else: # NOTE(zqfan): We already have checked at API level, but # still leave it here in case of directly storage calls. msg = _('Invalid aggregation function: %s') % a.func raise storage.StorageBadAggregate(msg) return functions def _make_stats_query(self, sample_filter, groupby, aggregate): select = [ func.min(models.Sample.timestamp).label('tsmin'), func.max(models.Sample.timestamp).label('tsmax'), models.Meter.unit ] select.extend(self._get_aggregate_functions(aggregate)) session = self._engine_facade.get_session() if groupby: group_attributes = [] for g in groupby: if g != 'resource_metadata.instance_type': group_attributes.append(getattr(models.Resource, g)) else: group_attributes.append( getattr(models.MetaText, 'value') .label('resource_metadata.instance_type')) select.extend(group_attributes) query = ( session.query(*select) .join(models.Meter, models.Meter.id == models.Sample.meter_id) .join(models.Resource, models.Resource.internal_id == models.Sample.resource_id) .group_by(models.Meter.unit)) if groupby: for g in groupby: if g == 'resource_metadata.instance_type': query = query.join( models.MetaText, models.Resource.internal_id == models.MetaText.id) query = query.filter( models.MetaText.meta_key == 'instance_type') query = query.group_by(*group_attributes) return make_query_from_filter(session, query, sample_filter) @staticmethod def _stats_result_aggregates(result, aggregate): stats_args = {} if isinstance(result.count, six.integer_types): stats_args['count'] = result.count for attr in ['min', 'max', 'sum', 'avg']: if hasattr(result, attr): stats_args[attr] = getattr(result, attr) if aggregate: stats_args['aggregate'] = {} for a in aggregate: key = '%s%s' % (a.func, '/%s' % a.param if a.param else '') stats_args['aggregate'][key] = getattr(result, key) return stats_args @staticmethod def _stats_result_to_model(result, period, period_start, period_end, groupby, aggregate): stats_args = Connection._stats_result_aggregates(result, aggregate) stats_args['unit'] = result.unit duration = (timeutils.delta_seconds(result.tsmin, result.tsmax) if result.tsmin is not None and result.tsmax is not None else None) stats_args['duration'] = duration stats_args['duration_start'] = result.tsmin stats_args['duration_end'] = result.tsmax stats_args['period'] = period stats_args['period_start'] = period_start stats_args['period_end'] = period_end stats_args['groupby'] = (dict( (g, getattr(result, g)) for g in groupby) if groupby else None) return api_models.Statistics(**stats_args)
[docs] def get_meter_statistics(self, sample_filter, period=None, groupby=None, aggregate=None): """Return an iterable of api_models.Statistics instances. Items are containing meter statistics described by the query parameters. The filter must have a meter value set. """ if groupby: for group in groupby: if group not in ['user_id', 'project_id', 'resource_id', 'resource_metadata.instance_type']: raise ceilometer.NotImplementedError('Unable to group by ' 'these fields') if not period: for res in self._make_stats_query(sample_filter, groupby, aggregate): if res.count: yield self._stats_result_to_model(res, 0, res.tsmin, res.tsmax, groupby, aggregate) return if not (sample_filter.start_timestamp and sample_filter.end_timestamp): res = self._make_stats_query(sample_filter, None, aggregate).first() if not res: # NOTE(liusheng):The 'res' may be NoneType, because no # sample has found with sample filter(s). return query = self._make_stats_query(sample_filter, groupby, aggregate) # HACK(jd) This is an awful method to compute stats by period, but # since we're trying to be SQL agnostic we have to write portable # code, so here it is, admire! We're going to do one request to get # stats by period. We would like to use GROUP BY, but there's no # portable way to manipulate timestamp in SQL, so we can't. for period_start, period_end in base.iter_period( sample_filter.start_timestamp or res.tsmin, sample_filter.end_timestamp or res.tsmax, period): q = query.filter(models.Sample.timestamp >= period_start) q = q.filter(models.Sample.timestamp < period_end) for r in q.all(): if r.count: yield self._stats_result_to_model( result=r, period=int(timeutils.delta_seconds(period_start, period_end)), period_start=period_start, period_end=period_end, groupby=groupby, aggregate=aggregate )

Project Source