# Copyright 2011, OpenStack Foundation
# Copyright 2012, Red Hat, Inc.
# Copyright 2013 IBM Corp.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import abc
import glance_store
from oslo_config import cfg
from oslo_log import log as logging
import oslo_messaging
from oslo_utils import encodeutils
from oslo_utils import excutils
import six
import webob
from glance.common import exception
from glance.common import timeutils
from glance.domain import proxy as domain_proxy
from glance.i18n import _, _LE
notifier_opts = [
cfg.StrOpt('default_publisher_id', default="image.localhost",
help='Default publisher_id for outgoing notifications.'),
cfg.ListOpt('disabled_notifications', default=[],
help='List of disabled notifications. A notification can be '
'given either as a notification type to disable a single '
'event, or as a notification group prefix to disable all '
'events within a group. Example: if this config option '
'is set to ["image.create", "metadef_namespace"], then '
'"image.create" notification will not be sent after '
'image is created and none of the notifications for '
'metadefinition namespaces will be sent.'),
]
CONF = cfg.CONF
CONF.register_opts(notifier_opts)
LOG = logging.getLogger(__name__)
_ALIASES = {
'glance.openstack.common.rpc.impl_kombu': 'rabbit',
'glance.openstack.common.rpc.impl_qpid': 'qpid',
'glance.openstack.common.rpc.impl_zmq': 'zmq',
}
[docs]def set_defaults(control_exchange='glance'):
oslo_messaging.set_transport_defaults(control_exchange)
[docs]def get_transport():
return oslo_messaging.get_transport(CONF, aliases=_ALIASES)
[docs]class Notifier(object):
"""Uses a notification strategy to send out messages about events."""
def __init__(self):
publisher_id = CONF.default_publisher_id
self._transport = get_transport()
self._notifier = oslo_messaging.Notifier(self._transport,
publisher_id=publisher_id)
[docs] def warn(self, event_type, payload):
self._notifier.warn({}, event_type, payload)
[docs] def info(self, event_type, payload):
self._notifier.info({}, event_type, payload)
[docs] def error(self, event_type, payload):
self._notifier.error({}, event_type, payload)
def _get_notification_group(notification):
return notification.split('.', 1)[0]
def _is_notification_enabled(notification):
disabled_notifications = CONF.disabled_notifications
notification_group = _get_notification_group(notification)
notifications = (notification, notification_group)
for disabled_notification in disabled_notifications:
if disabled_notification in notifications:
return False
return True
def _send_notification(notify, notification_type, payload):
if _is_notification_enabled(notification_type):
notify(notification_type, payload)
def _format_metadef_object_property(name, metadef_property):
return {
'name': name,
'type': metadef_property.type or None,
'title': metadef_property.title or None,
'description': metadef_property.description or None,
'default': metadef_property.default or None,
'minimum': metadef_property.minimum or None,
'maximum': metadef_property.maximum or None,
'enum': metadef_property.enum or None,
'pattern': metadef_property.pattern or None,
'minLength': metadef_property.minLength or None,
'maxLength': metadef_property.maxLength or None,
'confidential': metadef_property.confidential or None,
'items': metadef_property.items or None,
'uniqueItems': metadef_property.uniqueItems or None,
'minItems': metadef_property.minItems or None,
'maxItems': metadef_property.maxItems or None,
'additionalItems': metadef_property.additionalItems or None,
}
[docs]class NotificationBase(object):
[docs] def get_payload(self, obj):
return {}
[docs] def send_notification(self, notification_id, obj, extra_payload=None):
payload = self.get_payload(obj)
if extra_payload is not None:
payload.update(extra_payload)
_send_notification(self.notifier.info, notification_id, payload)
@six.add_metaclass(abc.ABCMeta)
[docs]class NotificationProxy(NotificationBase):
def __init__(self, repo, context, notifier):
self.repo = repo
self.context = context
self.notifier = notifier
super_class = self.get_super_class()
super_class.__init__(self, repo)
@abc.abstractmethod
[docs] def get_super_class(self):
pass
@six.add_metaclass(abc.ABCMeta)
[docs]class NotificationRepoProxy(NotificationBase):
def __init__(self, repo, context, notifier):
self.repo = repo
self.context = context
self.notifier = notifier
proxy_kwargs = {'context': self.context, 'notifier': self.notifier}
proxy_class = self.get_proxy_class()
super_class = self.get_super_class()
super_class.__init__(self, repo, proxy_class, proxy_kwargs)
@abc.abstractmethod
[docs] def get_super_class(self):
pass
@abc.abstractmethod
[docs] def get_proxy_class(self):
pass
@six.add_metaclass(abc.ABCMeta)
[docs]class NotificationFactoryProxy(object):
def __init__(self, factory, context, notifier):
kwargs = {'context': context, 'notifier': notifier}
proxy_class = self.get_proxy_class()
super_class = self.get_super_class()
super_class.__init__(self, factory, proxy_class, kwargs)
@abc.abstractmethod
[docs] def get_super_class(self):
pass
@abc.abstractmethod
[docs] def get_proxy_class(self):
pass
[docs]class ImageProxy(NotificationProxy, domain_proxy.Image):
[docs] def get_super_class(self):
return domain_proxy.Image
[docs] def get_payload(self, obj):
return format_image_notification(obj)
def _format_image_send(self, bytes_sent):
return {
'bytes_sent': bytes_sent,
'image_id': self.repo.image_id,
'owner_id': self.repo.owner,
'receiver_tenant_id': self.context.tenant,
'receiver_user_id': self.context.user,
}
def _get_chunk_data_iterator(self, data, chunk_size=None):
sent = 0
for chunk in data:
yield chunk
sent += len(chunk)
if sent != (chunk_size or self.repo.size):
notify = self.notifier.error
else:
notify = self.notifier.info
try:
_send_notification(notify, 'image.send',
self._format_image_send(sent))
except Exception as err:
msg = (_LE("An error occurred during image.send"
" notification: %(err)s") % {'err': err})
LOG.error(msg)
[docs] def get_data(self, offset=0, chunk_size=None):
# Due to the need of evaluating subsequent proxies, this one
# should return a generator, the call should be done before
# generator creation
data = self.repo.get_data(offset=offset, chunk_size=chunk_size)
return self._get_chunk_data_iterator(data, chunk_size=chunk_size)
[docs] def set_data(self, data, size=None):
self.send_notification('image.prepare', self.repo)
notify_error = self.notifier.error
try:
self.repo.set_data(data, size)
except glance_store.StorageFull as e:
msg = (_("Image storage media is full: %s") %
encodeutils.exception_to_unicode(e))
_send_notification(notify_error, 'image.upload', msg)
raise webob.exc.HTTPRequestEntityTooLarge(explanation=msg)
except glance_store.StorageWriteDenied as e:
msg = (_("Insufficient permissions on image storage media: %s")
% encodeutils.exception_to_unicode(e))
_send_notification(notify_error, 'image.upload', msg)
raise webob.exc.HTTPServiceUnavailable(explanation=msg)
except ValueError as e:
msg = (_("Cannot save data for image %(image_id)s: %(error)s") %
{'image_id': self.repo.image_id,
'error': encodeutils.exception_to_unicode(e)})
_send_notification(notify_error, 'image.upload', msg)
raise webob.exc.HTTPBadRequest(
explanation=encodeutils.exception_to_unicode(e))
except exception.Duplicate as e:
msg = (_("Unable to upload duplicate image data for image"
"%(image_id)s: %(error)s") %
{'image_id': self.repo.image_id,
'error': encodeutils.exception_to_unicode(e)})
_send_notification(notify_error, 'image.upload', msg)
raise webob.exc.HTTPConflict(explanation=msg)
except exception.Forbidden as e:
msg = (_("Not allowed to upload image data for image %(image_id)s:"
" %(error)s")
% {'image_id': self.repo.image_id,
'error': encodeutils.exception_to_unicode(e)})
_send_notification(notify_error, 'image.upload', msg)
raise webob.exc.HTTPForbidden(explanation=msg)
except exception.NotFound as e:
exc_str = encodeutils.exception_to_unicode(e)
msg = (_("Image %(image_id)s could not be found after upload."
" The image may have been deleted during the upload:"
" %(error)s") % {'image_id': self.repo.image_id,
'error': exc_str})
_send_notification(notify_error, 'image.upload', msg)
raise webob.exc.HTTPNotFound(explanation=exc_str)
except webob.exc.HTTPError as e:
with excutils.save_and_reraise_exception():
msg = (_("Failed to upload image data for image %(image_id)s"
" due to HTTP error: %(error)s") %
{'image_id': self.repo.image_id,
'error': encodeutils.exception_to_unicode(e)})
_send_notification(notify_error, 'image.upload', msg)
except Exception as e:
with excutils.save_and_reraise_exception():
msg = (_("Failed to upload image data for image %(image_id)s "
"due to internal error: %(error)s") %
{'image_id': self.repo.image_id,
'error': encodeutils.exception_to_unicode(e)})
_send_notification(notify_error, 'image.upload', msg)
else:
self.send_notification('image.upload', self.repo)
self.send_notification('image.activate', self.repo)
[docs]class ImageMemberProxy(NotificationProxy, domain_proxy.ImageMember):
[docs] def get_super_class(self):
return domain_proxy.ImageMember
[docs]class ImageFactoryProxy(NotificationFactoryProxy, domain_proxy.ImageFactory):
[docs] def get_super_class(self):
return domain_proxy.ImageFactory
[docs] def get_proxy_class(self):
return ImageProxy
[docs]class ImageRepoProxy(NotificationRepoProxy, domain_proxy.Repo):
[docs] def get_super_class(self):
return domain_proxy.Repo
[docs] def get_proxy_class(self):
return ImageProxy
[docs] def get_payload(self, obj):
return format_image_notification(obj)
[docs] def save(self, image, from_state=None):
super(ImageRepoProxy, self).save(image, from_state=from_state)
self.send_notification('image.update', image)
[docs] def add(self, image):
super(ImageRepoProxy, self).add(image)
self.send_notification('image.create', image)
[docs] def remove(self, image):
super(ImageRepoProxy, self).remove(image)
self.send_notification('image.delete', image, extra_payload={
'deleted': True, 'deleted_at': timeutils.isotime()
})
[docs]class ImageMemberRepoProxy(NotificationBase, domain_proxy.MemberRepo):
def __init__(self, repo, image, context, notifier):
self.repo = repo
self.image = image
self.context = context
self.notifier = notifier
proxy_kwargs = {'context': self.context, 'notifier': self.notifier}
proxy_class = self.get_proxy_class()
super_class = self.get_super_class()
super_class.__init__(self, image, repo, proxy_class, proxy_kwargs)
[docs] def get_super_class(self):
return domain_proxy.MemberRepo
[docs] def get_proxy_class(self):
return ImageMemberProxy
[docs] def get_payload(self, obj):
return format_image_member_notification(obj)
[docs] def save(self, member, from_state=None):
super(ImageMemberRepoProxy, self).save(member, from_state=from_state)
self.send_notification('image.member.update', member)
[docs] def add(self, member):
super(ImageMemberRepoProxy, self).add(member)
self.send_notification('image.member.create', member)
[docs] def remove(self, member):
super(ImageMemberRepoProxy, self).remove(member)
self.send_notification('image.member.delete', member, extra_payload={
'deleted': True, 'deleted_at': timeutils.isotime()
})
[docs]class TaskProxy(NotificationProxy, domain_proxy.Task):
[docs] def get_super_class(self):
return domain_proxy.Task
[docs] def get_payload(self, obj):
return format_task_notification(obj)
[docs] def begin_processing(self):
super(TaskProxy, self).begin_processing()
self.send_notification('task.processing', self.repo)
[docs] def succeed(self, result):
super(TaskProxy, self).succeed(result)
self.send_notification('task.success', self.repo)
[docs] def fail(self, message):
super(TaskProxy, self).fail(message)
self.send_notification('task.failure', self.repo)
[docs] def run(self, executor):
super(TaskProxy, self).run(executor)
self.send_notification('task.run', self.repo)
[docs]class TaskFactoryProxy(NotificationFactoryProxy, domain_proxy.TaskFactory):
[docs] def get_super_class(self):
return domain_proxy.TaskFactory
[docs] def get_proxy_class(self):
return TaskProxy
[docs]class TaskRepoProxy(NotificationRepoProxy, domain_proxy.TaskRepo):
[docs] def get_super_class(self):
return domain_proxy.TaskRepo
[docs] def get_proxy_class(self):
return TaskProxy
[docs] def get_payload(self, obj):
return format_task_notification(obj)
[docs] def add(self, task):
result = super(TaskRepoProxy, self).add(task)
self.send_notification('task.create', task)
return result
[docs] def remove(self, task):
result = super(TaskRepoProxy, self).remove(task)
self.send_notification('task.delete', task, extra_payload={
'deleted': True, 'deleted_at': timeutils.isotime()
})
return result
[docs]class TaskStubProxy(NotificationProxy, domain_proxy.TaskStub):
[docs] def get_super_class(self):
return domain_proxy.TaskStub
[docs]class TaskStubRepoProxy(NotificationRepoProxy, domain_proxy.TaskStubRepo):
[docs] def get_super_class(self):
return domain_proxy.TaskStubRepo
[docs] def get_proxy_class(self):
return TaskStubProxy