Source code for heat.common.wsgi

#
# Copyright 2010 United States Government as represented by the
# Administrator of the National Aeronautics and Space Administration.
# Copyright 2013 IBM Corp.
# All Rights Reserved.
#
#    Licensed under the Apache License, Version 2.0 (the "License"); you may
#    not use this file except in compliance with the License. You may obtain
#    a copy of the License at
#
#         http://www.apache.org/licenses/LICENSE-2.0
#
#    Unless required by applicable law or agreed to in writing, software
#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
#    License for the specific language governing permissions and limitations
#    under the License.

"""Utility methods for working with WSGI servers."""

import abc
import errno
import os
import signal
import sys
import time

import eventlet
from eventlet.green import socket
from eventlet.green import ssl
import eventlet.greenio
import eventlet.wsgi
import functools
from oslo_concurrency import processutils
from oslo_config import cfg
import oslo_i18n as i18n
from oslo_log import log as logging
from oslo_serialization import jsonutils
from oslo_utils import encodeutils
from oslo_utils import importutils
from paste.deploy import loadwsgi
from routes import middleware
import webob.dec
import webob.exc

from heat.api.aws import exception as aws_exception
from heat.common import exception
from heat.common.i18n import _
from heat.common import serializers


LOG = logging.getLogger(__name__)
URL_LENGTH_LIMIT = 50000

api_opts = [
    cfg.IPOpt('bind_host', default='0.0.0.0',
              help=_('Address to bind the server. Useful when '
                     'selecting a particular network interface.'),
              deprecated_group='DEFAULT'),
    cfg.PortOpt('bind_port', default=8004,
                help=_('The port on which the server will listen.'),
                deprecated_group='DEFAULT'),
    cfg.IntOpt('backlog', default=4096,
               help=_("Number of backlog requests "
                      "to configure the socket with."),
               deprecated_group='DEFAULT'),
    cfg.StrOpt('cert_file',
               help=_("Location of the SSL certificate file "
                      "to use for SSL mode."),
               deprecated_group='DEFAULT'),
    cfg.StrOpt('key_file',
               help=_("Location of the SSL key file to use "
                      "for enabling SSL mode."),
               deprecated_group='DEFAULT'),
    cfg.IntOpt('workers', min=0, default=0,
               help=_("Number of workers for Heat service. "
                      "Default value 0 means, that service will start number "
                      "of workers equal number of cores on server."),
               deprecated_group='DEFAULT'),
    cfg.IntOpt('max_header_line', default=16384,
               help=_('Maximum line size of message headers to be accepted. '
                      'max_header_line may need to be increased when using '
                      'large tokens (typically those generated by the '
                      'Keystone v3 API with big service catalogs).')),
    cfg.IntOpt('tcp_keepidle', default=600,
               help=_('The value for the socket option TCP_KEEPIDLE.  This is '
                      'the time in seconds that the connection must be idle '
                      'before TCP starts sending keepalive probes.')),
]
api_group = cfg.OptGroup('heat_api')
cfg.CONF.register_group(api_group)
cfg.CONF.register_opts(api_opts,
                       group=api_group)

api_cfn_opts = [
    cfg.IPOpt('bind_host', default='0.0.0.0',
              help=_('Address to bind the server. Useful when '
                     'selecting a particular network interface.'),
              deprecated_group='DEFAULT'),
    cfg.PortOpt('bind_port', default=8000,
                help=_('The port on which the server will listen.'),
                deprecated_group='DEFAULT'),
    cfg.IntOpt('backlog', default=4096,
               help=_("Number of backlog requests "
                      "to configure the socket with."),
               deprecated_group='DEFAULT'),
    cfg.StrOpt('cert_file',
               help=_("Location of the SSL certificate file "
                      "to use for SSL mode."),
               deprecated_group='DEFAULT'),
    cfg.StrOpt('key_file',
               help=_("Location of the SSL key file to use "
                      "for enabling SSL mode."),
               deprecated_group='DEFAULT'),
    cfg.IntOpt('workers', min=0, default=1,
               help=_("Number of workers for Heat service."),
               deprecated_group='DEFAULT'),
    cfg.IntOpt('max_header_line', default=16384,
               help=_('Maximum line size of message headers to be accepted. '
                      'max_header_line may need to be increased when using '
                      'large tokens (typically those generated by the '
                      'Keystone v3 API with big service catalogs).')),
    cfg.IntOpt('tcp_keepidle', default=600,
               help=_('The value for the socket option TCP_KEEPIDLE.  This is '
                      'the time in seconds that the connection must be idle '
                      'before TCP starts sending keepalive probes.')),
]
api_cfn_group = cfg.OptGroup('heat_api_cfn')
cfg.CONF.register_group(api_cfn_group)
cfg.CONF.register_opts(api_cfn_opts,
                       group=api_cfn_group)

api_cw_opts = [
    cfg.IPOpt('bind_host', default='0.0.0.0',
              help=_('Address to bind the server. Useful when '
                     'selecting a particular network interface.'),
              deprecated_group='DEFAULT',
              deprecated_for_removal=True,
              deprecated_reason='Heat CloudWatch API has been removed.',
              deprecated_since='10.0.0'),
    cfg.PortOpt('bind_port', default=8003,
                help=_('The port on which the server will listen.'),
                deprecated_group='DEFAULT',
                deprecated_for_removal=True,
                deprecated_reason='Heat CloudWatch API has been removed.',
                deprecated_since='10.0.0'),
    cfg.IntOpt('backlog', default=4096,
               help=_("Number of backlog requests "
                      "to configure the socket with."),
               deprecated_group='DEFAULT',
               deprecated_for_removal=True,
               deprecated_reason='Heat CloudWatch API has been removed.',
               deprecated_since='10.0.0'),
    cfg.StrOpt('cert_file',
               help=_("Location of the SSL certificate file "
                      "to use for SSL mode."),
               deprecated_group='DEFAULT',
               deprecated_for_removal=True,
               deprecated_reason='Heat CloudWatch API has been Removed.',
               deprecated_since='10.0.0'),
    cfg.StrOpt('key_file',
               help=_("Location of the SSL key file to use "
                      "for enabling SSL mode."),
               deprecated_group='DEFAULT',
               deprecated_for_removal=True,
               deprecated_reason='Heat CloudWatch API has been Removed.',
               deprecated_since='10.0.0'),
    cfg.IntOpt('workers', min=0, default=1,
               help=_("Number of workers for Heat service."),
               deprecated_group='DEFAULT',
               deprecated_for_removal=True,
               deprecated_reason='Heat CloudWatch API has been Removed.',
               deprecated_since='10.0.0'),
    cfg.IntOpt('max_header_line', default=16384,
               help=_('Maximum line size of message headers to be accepted. '
                      'max_header_line may need to be increased when using '
                      'large tokens (typically those generated by the '
                      'Keystone v3 API with big service catalogs.)'),
               deprecated_for_removal=True,
               deprecated_reason='Heat CloudWatch API has been Removed.',
               deprecated_since='10.0.0'),
    cfg.IntOpt('tcp_keepidle', default=600,
               help=_('The value for the socket option TCP_KEEPIDLE.  This is '
                      'the time in seconds that the connection must be idle '
                      'before TCP starts sending keepalive probes.'),
               deprecated_for_removal=True,
               deprecated_reason='Heat CloudWatch API has been Removed.',
               deprecated_since='10.0.0')
]
api_cw_group = cfg.OptGroup('heat_api_cloudwatch')
cfg.CONF.register_group(api_cw_group)
cfg.CONF.register_opts(api_cw_opts,
                       group=api_cw_group)

wsgi_elt_opts = [
    cfg.BoolOpt('wsgi_keep_alive',
                default=True,
                help=_("If False, closes the client socket connection "
                       "explicitly.")),
    cfg.IntOpt('client_socket_timeout', default=900,
               help=_("Timeout for client connections' socket operations. "
                      "If an incoming connection is idle for this number of "
                      "seconds it will be closed. A value of '0' means "
                      "wait forever.")),
]
wsgi_elt_group = cfg.OptGroup('eventlet_opts')
cfg.CONF.register_group(wsgi_elt_group)
cfg.CONF.register_opts(wsgi_elt_opts,
                       group=wsgi_elt_group)
json_size_opt = cfg.IntOpt('max_json_body_size',
                           default=1048576,
                           help=_('Maximum raw byte size of JSON request body.'
                                  ' Should be larger than max_template_size.'))
cfg.CONF.register_opt(json_size_opt)


[docs]def list_opts(): yield None, [json_size_opt] yield 'heat_api', api_opts yield 'heat_api_cfn', api_cfn_opts yield 'heat_api_cloudwatch', api_cw_opts yield 'eventlet_opts', wsgi_elt_opts
[docs]def get_bind_addr(conf, default_port=None): """Return the host and port to bind to.""" return (conf.bind_host, conf.bind_port or default_port)
[docs]def get_socket(conf, default_port): """Bind socket to bind ip:port in conf. Note: Mostly comes from Swift with a few small changes... :param conf: a cfg.ConfigOpts object :param default_port: port to bind to if none is specified in conf :returns: a socket object as returned from socket.listen or ssl.wrap_socket if conf specifies cert_file """ bind_addr = get_bind_addr(conf, default_port) # TODO(jaypipes): eventlet's greened socket module does not actually # support IPv6 in getaddrinfo(). We need to get around this in the # future or monitor upstream for a fix address_family = [addr[0] for addr in socket.getaddrinfo(bind_addr[0], bind_addr[1], socket.AF_UNSPEC, socket.SOCK_STREAM) if addr[0] in (socket.AF_INET, socket.AF_INET6)][0] cert_file = conf.cert_file key_file = conf.key_file use_ssl = cert_file or key_file if use_ssl and (not cert_file or not key_file): raise RuntimeError(_("When running server in SSL mode, you must " "specify both a cert_file and key_file " "option value in your configuration file")) sock = None retry_until = time.time() + 30 while not sock and time.time() < retry_until: try: sock = eventlet.listen(bind_addr, backlog=conf.backlog, family=address_family) except socket.error as err: if err.errno != errno.EADDRINUSE: raise eventlet.sleep(0.1) if not sock: raise RuntimeError(_("Could not bind to %(bind_addr)s " "after trying for 30 seconds") % {'bind_addr': bind_addr}) return sock
[docs]class Server(object): """Server class to manage multiple WSGI sockets and applications.""" def __init__(self, name, conf, threads=1000): os.umask(0o27) # ensure files are created with the correct privileges self._logger = logging.getLogger("eventlet.wsgi.server") self.name = name self.threads = threads self.children = set() self.stale_children = set() self.running = True self.pgid = os.getpid() self.conf = conf try: os.setpgid(self.pgid, self.pgid) except OSError: self.pgid = 0
[docs] def kill_children(self, *args): """Kills the entire process group.""" LOG.error('SIGTERM received') signal.signal(signal.SIGTERM, signal.SIG_IGN) signal.signal(signal.SIGINT, signal.SIG_IGN) self.running = False os.killpg(0, signal.SIGTERM)
[docs] def hup(self, *args): """Reloads configuration files with zero down time.""" LOG.error('SIGHUP received') signal.signal(signal.SIGHUP, signal.SIG_IGN) raise exception.SIGHUPInterrupt
[docs] def start(self, application, default_port): """Run a WSGI server with the given application. :param application: The application to run in the WSGI server :param default_port: Port to bind to if none is specified in conf """ eventlet.wsgi.MAX_HEADER_LINE = self.conf.max_header_line self.application = application self.default_port = default_port self.configure_socket() self.start_wsgi()
[docs] def start_wsgi(self): workers = self.conf.workers # childs == num of cores if workers == 0: childs_num = processutils.get_worker_count() # launch only one GreenPool without childs elif workers == 1: # Useful for profiling, test, debug etc. self.pool = eventlet.GreenPool(size=self.threads) self.pool.spawn_n(self._single_run, self.application, self.sock) return # childs equal specified value of workers else: childs_num = workers LOG.info("Starting %d workers", workers) signal.signal(signal.SIGTERM, self.kill_children) signal.signal(signal.SIGINT, self.kill_children) signal.signal(signal.SIGHUP, self.hup) rfd, self.writepipe = os.pipe() self.readpipe = eventlet.greenio.GreenPipe(rfd, 'r') while len(self.children) < childs_num: self.run_child()
[docs] def wait_on_children(self): while self.running: try: pid, status = os.wait() if os.WIFEXITED(status) or os.WIFSIGNALED(status): self._remove_children(pid) self._verify_and_respawn_children(pid, status) except OSError as err: if err.errno not in (errno.EINTR, errno.ECHILD): raise except KeyboardInterrupt: LOG.info('Caught keyboard interrupt. Exiting.') os.killpg(0, signal.SIGTERM) break except exception.SIGHUPInterrupt: self.reload() continue eventlet.greenio.shutdown_safe(self.sock) self.sock.close() LOG.debug('Exited')
[docs] def configure_socket(self, old_conf=None, has_changed=None): """Ensure a socket exists and is appropriately configured. This function is called on start up, and can also be called in the event of a configuration reload. When called for the first time a new socket is created. If reloading and either bind_host or bind port have been changed the existing socket must be closed and a new socket opened (laws of physics). In all other cases (bind_host/bind_port have not changed) the existing socket is reused. :param old_conf: Cached old configuration settings (if any) :param has changed: callable to determine if a parameter has changed """ # Do we need a fresh socket? new_sock = (old_conf is None or ( has_changed('bind_host') or has_changed('bind_port'))) # Will we be using https? use_ssl = not (not self.conf.cert_file or not self.conf.key_file) # Were we using https before? old_use_ssl = (old_conf is not None and not ( not old_conf.get('key_file') or not old_conf.get('cert_file'))) # Do we now need to perform an SSL wrap on the socket? wrap_sock = use_ssl is True and (old_use_ssl is False or new_sock) # Do we now need to perform an SSL unwrap on the socket? unwrap_sock = use_ssl is False and old_use_ssl is True if new_sock: self._sock = None if old_conf is not None: self.sock.close() _sock = get_socket(self.conf, self.default_port) _sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) # sockets can hang around forever without keepalive _sock.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1) self._sock = _sock if wrap_sock: self.sock = ssl.wrap_socket(self._sock, certfile=self.conf.cert_file, keyfile=self.conf.key_file) if unwrap_sock: self.sock = self._sock if new_sock and not use_ssl: self.sock = self._sock # Pick up newly deployed certs if old_conf is not None and use_ssl is True and old_use_ssl is True: if has_changed('cert_file'): self.sock.certfile = self.conf.cert_file if has_changed('key_file'): self.sock.keyfile = self.conf.key_file if new_sock or (old_conf is not None and has_changed('tcp_keepidle')): # This option isn't available in the OS X version of eventlet if hasattr(socket, 'TCP_KEEPIDLE'): self.sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPIDLE, self.conf.tcp_keepidle) if old_conf is not None and has_changed('backlog'): self.sock.listen(self.conf.backlog)
def _remove_children(self, pid): if pid in self.children: self.children.remove(pid) LOG.info('Removed dead child %s', pid) elif pid in self.stale_children: self.stale_children.remove(pid) LOG.info('Removed stale child %s', pid) else: LOG.warning('Unrecognised child %s', pid) def _verify_and_respawn_children(self, pid, status): if len(self.stale_children) == 0: LOG.debug('No stale children') if os.WIFEXITED(status) and os.WEXITSTATUS(status) != 0: LOG.error('Not respawning child %d, cannot ' 'recover from termination', pid) if not self.children and not self.stale_children: LOG.info( 'All workers have terminated. Exiting') self.running = False else: if len(self.children) < self.conf.workers: self.run_child()
[docs] def stash_conf_values(self): """Make a copy of some of the current global CONF's settings. Allows determining if any of these values have changed when the config is reloaded. """ conf = {} conf['bind_host'] = self.conf.bind_host conf['bind_port'] = self.conf.bind_port conf['backlog'] = self.conf.backlog conf['key_file'] = self.conf.key_file conf['cert_file'] = self.conf.cert_file return conf
[docs] def reload(self): """Reload and re-apply configuration settings. Existing child processes are sent a SIGHUP signal and will exit after completing existing requests. New child processes, which will have the updated configuration, are spawned. This allows preventing interruption to the service. """ def _has_changed(old, new, param): old = old.get(param) new = getattr(new, param) return (new != old) old_conf = self.stash_conf_values() has_changed = functools.partial(_has_changed, old_conf, self.conf) cfg.CONF.reload_config_files() os.killpg(self.pgid, signal.SIGHUP) self.stale_children = self.children self.children = set() # Ensure any logging config changes are picked up logging.setup(cfg.CONF, self.name) self.configure_socket(old_conf, has_changed) self.start_wsgi()
[docs] def wait(self): """Wait until all servers have completed running.""" try: if self.children: self.wait_on_children() else: self.pool.waitall() except KeyboardInterrupt: pass
[docs] def run_child(self): def child_hup(*args): """Shuts down child processes, existing requests are handled.""" signal.signal(signal.SIGHUP, signal.SIG_IGN) eventlet.wsgi.is_accepting = False self.sock.close() pid = os.fork() if pid == 0: signal.signal(signal.SIGHUP, child_hup) signal.signal(signal.SIGTERM, signal.SIG_DFL) # ignore the interrupt signal to avoid a race whereby # a child worker receives the signal before the parent # and is respawned unnecessarily as a result signal.signal(signal.SIGINT, signal.SIG_IGN) # The child has no need to stash the unwrapped # socket, and the reference prevents a clean # exit on sighup self._sock = None self.run_server() LOG.info('Child %d exiting normally', os.getpid()) # self.pool.waitall() is now called in wsgi's server so # it's safe to exit here sys.exit(0) else: LOG.info('Started child %s', pid) self.children.add(pid)
def _pipe_watcher(self): def _on_timeout_exit(*args): LOG.info('Graceful shutdown timeout exceeded, ' 'instantaneous exiting') os._exit(1) # This will block until the write end is closed when the parent # dies unexpectedly self.readpipe.read(1) LOG.info('Parent process has died unexpectedly, exiting') # allow up to 1 second for sys.exit to gracefully shutdown signal.signal(signal.SIGALRM, _on_timeout_exit) signal.alarm(1) # do the same as child_hup eventlet.wsgi.is_accepting = False self.sock.close() sys.exit(1)
[docs] def run_server(self): """Run a WSGI server.""" eventlet.wsgi.HttpProtocol.default_request_version = "HTTP/1.0" eventlet.hubs.use_hub('poll') eventlet.patcher.monkey_patch(all=False, socket=True) self.pool = eventlet.GreenPool(size=self.threads) socket_timeout = cfg.CONF.eventlet_opts.client_socket_timeout or None # Close write to ensure only parent has it open os.close(self.writepipe) # Create greenthread to watch for parent to close pipe eventlet.spawn_n(self._pipe_watcher) try: eventlet.wsgi.server( self.sock, self.application, custom_pool=self.pool, url_length_limit=URL_LENGTH_LIMIT, log=self._logger, debug=cfg.CONF.debug, keepalive=cfg.CONF.eventlet_opts.wsgi_keep_alive, socket_timeout=socket_timeout) except socket.error as err: if err.errno != errno.EINVAL: raise self.pool.waitall()
def _single_run(self, application, sock): """Start a WSGI server in a new green thread.""" LOG.info("Starting single process server") eventlet.wsgi.server(sock, application, custom_pool=self.pool, url_length_limit=URL_LENGTH_LIMIT, log=self._logger, debug=cfg.CONF.debug)
[docs]class Middleware(object): """Base WSGI middleware wrapper. These classes require an application to be initialized that will be called next. By default the middleware will simply call its wrapped app, or you can override __call__ to customize its behavior. """ def __init__(self, application): self.application = application
[docs] def process_request(self, req): """Called on each request. If this returns None, the next application down the stack will be executed. If it returns a response then that response will be returned and execution will stop here. """ return None
[docs] def process_response(self, response): """Do whatever you'd like to the response.""" return response
@webob.dec.wsgify def __call__(self, req): response = self.process_request(req) if response: return response response = req.get_response(self.application) return self.process_response(response)
[docs]class Debug(Middleware): """Helper class to get information about the request and response. Helper class that can be inserted into any WSGI application chain to get information about the request and response. """ @webob.dec.wsgify def __call__(self, req): print(("*" * 40) + " REQUEST ENVIRON") for key, value in req.environ.items(): print(key, "=", value) print('') resp = req.get_response(self.application) print(("*" * 40) + " RESPONSE HEADERS") for (key, value) in resp.headers.items(): print(key, "=", value) print('') resp.app_iter = self.print_generator(resp.app_iter) return resp
[docs] @staticmethod def print_generator(app_iter): """Prints the contents of a wrapper string iterator when iterated.""" print(("*" * 40) + " BODY") for part in app_iter: sys.stdout.write(part) sys.stdout.flush() yield part print('')
[docs]def debug_filter(app, conf, **local_conf): return Debug(app)
[docs]class DefaultMethodController(object): """Controller that handles the OPTIONS request method. This controller handles the OPTIONS request method and any of the HTTP methods that are not explicitly implemented by the application. """
[docs] def options(self, req, allowed_methods, *args, **kwargs): """Return a response that includes the 'Allow' header. Return a response that includes the 'Allow' header listing the methods that are implemented. A 204 status code is used for this response. """ raise webob.exc.HTTPNoContent(headers=[('Allow', allowed_methods)])
[docs] def reject(self, req, allowed_methods, *args, **kwargs): """Return a 405 method not allowed error. As a convenience, the 'Allow' header with the list of implemented methods is included in the response as well. """ raise webob.exc.HTTPMethodNotAllowed( headers=[('Allow', allowed_methods)])
[docs]class Router(object): """WSGI middleware that maps incoming requests to WSGI apps.""" def __init__(self, mapper): """Create a router for the given routes.Mapper. Each route in `mapper` must specify a 'controller', which is a WSGI app to call. You'll probably want to specify an 'action' as well and have your controller be a wsgi.Controller, who will route the request to the action method. Examples: mapper = routes.Mapper() sc = ServerController() # Explicit mapping of one route to a controller+action mapper.connect(None, "/svrlist", controller=sc, action="list") # Actions are all implicitly defined mapper.resource("server", "servers", controller=sc) # Pointing to an arbitrary WSGI app. You can specify the # {path_info:.*} parameter so the target app can be handed just that # section of the URL. mapper.connect(None, "/v1.0/{path_info:.*}", controller=BlogApp()) """ self.map = mapper self._router = middleware.RoutesMiddleware(self._dispatch, self.map) @webob.dec.wsgify def __call__(self, req): """Route the incoming request to a controller based on self.map. If no match, return a 404. """ return self._router @staticmethod @webob.dec.wsgify def _dispatch(req): """Returns controller after matching the incoming request to a route. Called by self._router after matching the incoming request to a route and putting the information into req.environ. Either returns 404 or the routed WSGI app's response. """ match = req.environ['wsgiorg.routing_args'][1] if not match: return webob.exc.HTTPNotFound() app = match['controller'] return app
[docs]class Request(webob.Request): """Add some OpenStack API-specific logic to the base webob.Request."""
[docs] def best_match_content_type(self): """Determine the requested response content-type.""" supported = ('application/json',) bm = self.accept.best_match(supported) return bm or 'application/json'
[docs] def get_content_type(self, allowed_content_types): """Determine content type of the request body.""" if "Content-Type" not in self.headers: raise exception.InvalidContentType(content_type=None) content_type = self.content_type if content_type not in allowed_content_types: raise exception.InvalidContentType(content_type=content_type) else: return content_type
[docs] def best_match_language(self): """Determines best available locale from the Accept-Language header. :returns: the best language match or None if the 'Accept-Language' header was not available in the request. """ if not self.accept_language: return None all_languages = i18n.get_available_languages('heat') return self.accept_language.best_match(all_languages)
[docs]def is_json_content_type(request): if request.method == 'GET': try: aws_content_type = request.params.get("ContentType") except Exception: aws_content_type = None # respect aws_content_type when both available content_type = aws_content_type or request.content_type else: content_type = request.content_type # bug #1887882 # for back compatible for null or plain content type if not content_type or content_type.startswith('text/plain'): content_type = 'application/json' if (content_type in ('JSON', 'application/json') and request.body.startswith(b'{')): return True return False
[docs]class JSONRequestDeserializer(object):
[docs] def has_body(self, request): """Returns whether a Webob.Request object will possess an entity body. :param request: Webob.Request object """ if (int(request.content_length or 0) > 0 and is_json_content_type(request)): return True return False
[docs] def from_json(self, datastring): try: if len(datastring) > cfg.CONF.max_json_body_size: msg = _('JSON body size (%(len)s bytes) exceeds maximum ' 'allowed size (%(limit)s bytes).' ) % {'len': len(datastring), 'limit': cfg.CONF.max_json_body_size} raise exception.RequestLimitExceeded(message=msg) return jsonutils.loads(datastring) except ValueError as ex: raise webob.exc.HTTPBadRequest(str(ex))
[docs] def default(self, request): if self.has_body(request): return {'body': self.from_json(request.body)} else: return {}
[docs]class Resource(object): """WSGI app that handles (de)serialization and controller dispatch. Reads routing information supplied by RoutesMiddleware and calls the requested action method upon its deserializer, controller, and serializer. Those three objects may implement any of the basic controller action methods (create, update, show, index, delete) along with any that may be specified in the api router. A 'default' method may also be implemented to be used in place of any non-implemented actions. Deserializer methods must accept a request argument and return a dictionary. Controller methods must accept a request argument. Additionally, they must also accept keyword arguments that represent the keys returned by the Deserializer. They may raise a webob.exc exception or return a dict, which will be serialized by requested content type. """ def __init__(self, controller, deserializer, serializer=None): """Initialisation of the WSGI app. :param controller: object that implement methods created by routes lib :param deserializer: object that supports webob request deserialization through controller-like actions :param serializer: object that supports webob response serialization through controller-like actions """ self.controller = controller self.deserializer = deserializer self.serializer = serializer @webob.dec.wsgify(RequestClass=Request) def __call__(self, request): """WSGI method that controls (de)serialization and method dispatch.""" action_args = self.get_action_args(request.environ) action = action_args.pop('action', None) # From reading the boto code, and observation of real AWS api responses # it seems that the AWS api ignores the content-type in the html header # Instead it looks at a "ContentType" GET query parameter # This doesn't seem to be documented in the AWS cfn API spec, but it # would appear that the default response serialization is XML, as # described in the API docs, but passing a query parameter of # ContentType=JSON results in a JSON serialized response... content_type = request.params.get("ContentType") LOG.info("Processing request: %(method)s %(path)s", {'method': request.method, 'path': request.path}) try: deserialized_request = self.dispatch(self.deserializer, action, request) action_args.update(deserialized_request) LOG.debug(('Calling %(controller)s.%(action)s'), {'controller': type(self.controller).__name__, 'action': action}) action_result = self.dispatch(self.controller, action, request, **action_args) except TypeError as err: LOG.error('Exception handling resource: %s', err) msg = _('The server could not comply with the request since ' 'it is either malformed or otherwise incorrect.') err = webob.exc.HTTPBadRequest(msg) http_exc = translate_exception(err, request.best_match_language()) # NOTE(luisg): We disguise HTTP exceptions, otherwise they will be # treated by wsgi as responses ready to be sent back and they # won't make it into the pipeline app that serializes errors raise exception.HTTPExceptionDisguise(http_exc) except webob.exc.HTTPException as err: if isinstance(err, aws_exception.HeatAPIException): # The AWS compatible API's don't use faultwrap, so # we want to detect the HeatAPIException subclasses # and raise rather than wrapping in HTTPExceptionDisguise raise if not isinstance(err, webob.exc.HTTPError): # Some HTTPException are actually not errors, they are # responses ready to be sent back to the users, so we don't # error log, disguise or translate those raise if isinstance(err, webob.exc.HTTPServerError): LOG.error( "Returning %(code)s to user: %(explanation)s", {'code': err.code, 'explanation': err.explanation}) http_exc = translate_exception(err, request.best_match_language()) raise exception.HTTPExceptionDisguise(http_exc) except exception.HeatException as err: raise translate_exception(err, request.best_match_language()) except Exception as err: log_exception(err, sys.exc_info()) raise translate_exception(err, request.best_match_language()) # Here we support either passing in a serializer or detecting it # based on the content type. try: serializer = self.serializer if serializer is None: if content_type == "JSON": serializer = serializers.JSONResponseSerializer() else: serializer = serializers.XMLResponseSerializer() response = webob.Response(request=request) self.dispatch(serializer, action, response, action_result) return response # return unserializable result (typically an exception) except Exception: # Here we should get API exceptions derived from HeatAPIException # these implement get_unserialized_body(), which allow us to get # a dict containing the unserialized error response. # We only need to serialize for JSON content_type, as the # exception body is pre-serialized to the default XML in the # HeatAPIException constructor # If we get something else here (e.g a webob.exc exception), # this will fail, and we just return it without serializing, # which will not conform to the expected AWS error response format if content_type == "JSON": try: err_body = action_result.get_unserialized_body() serializer.default(action_result, err_body) except Exception: LOG.warning("Unable to serialize exception response") return action_result
[docs] def dispatch(self, obj, action, *args, **kwargs): """Find action-specific method on self and call it.""" try: method = getattr(obj, action) except AttributeError: method = getattr(obj, 'default') return method(*args, **kwargs)
[docs] def get_action_args(self, request_environment): """Parse dictionary created by routes library.""" try: args = request_environment['wsgiorg.routing_args'][1].copy() except Exception: return {} try: del args['controller'] except KeyError: pass try: del args['format'] except KeyError: pass return args
[docs]def log_exception(err, exc_info): args = {'exc_info': exc_info} if cfg.CONF.debug else {} LOG.error("Unexpected error occurred serving API: %s", err, **args)
[docs]def translate_exception(exc, locale): """Translates all translatable elements of the given exception.""" if isinstance(exc, exception.HeatException): exc.message = i18n.translate(exc.message, locale) else: err_msg = encodeutils.exception_to_unicode(exc) exc.message = i18n.translate(err_msg, locale) if isinstance(exc, webob.exc.HTTPError): exc.explanation = i18n.translate(exc.explanation, locale) exc.detail = i18n.translate(getattr(exc, 'detail', ''), locale) return exc
[docs]class BasePasteFactory(object, metaclass=abc.ABCMeta): """A base class for paste app and filter factories. Sub-classes must override the KEY class attribute and provide a __call__ method. """ KEY = None def __init__(self, conf): self.conf = conf @abc.abstractmethod def __call__(self, global_conf, **local_conf): return def _import_factory(self, local_conf): """Import an app/filter class. Lookup the KEY from the PasteDeploy local conf and import the class named there. This class can then be used as an app or filter factory. Note we support the <module>:<class> format. Note also that if you do e.g. key = value then ConfigParser returns a value with a leading newline, so we strip() the value before using it. """ class_name = local_conf[self.KEY].replace(':', '.').strip() return importutils.import_class(class_name)
[docs]class AppFactory(BasePasteFactory): """A Generic paste.deploy app factory. This requires heat.app_factory to be set to a callable which returns a WSGI app when invoked. The format of the name is <module>:<callable> e.g. [app:apiv1app] paste.app_factory = heat.common.wsgi:app_factory heat.app_factory = heat.api.cfn.v1:API The WSGI app constructor must accept a ConfigOpts object and a local config dict as its two arguments. """ KEY = 'heat.app_factory' def __call__(self, global_conf, **local_conf): """The actual paste.app_factory protocol method.""" factory = self._import_factory(local_conf) return factory(self.conf, **local_conf)
[docs]class FilterFactory(AppFactory): """A Generic paste.deploy filter factory. This requires heat.filter_factory to be set to a callable which returns a WSGI filter when invoked. The format is <module>:<callable> e.g. [filter:cache] paste.filter_factory = heat.common.wsgi:filter_factory heat.filter_factory = heat.api.middleware.cache:CacheFilter The WSGI filter constructor must accept a WSGI app, a ConfigOpts object and a local config dict as its three arguments. """ KEY = 'heat.filter_factory' def __call__(self, global_conf, **local_conf): """The actual paste.filter_factory protocol method.""" factory = self._import_factory(local_conf) def filter(app): return factory(app, self.conf, **local_conf) return filter
[docs]def setup_paste_factories(conf): """Set up the generic paste app and filter factories. Set things up so that: paste.app_factory = heat.common.wsgi:app_factory and paste.filter_factory = heat.common.wsgi:filter_factory work correctly while loading PasteDeploy configuration. The app factories are constructed at runtime to allow us to pass a ConfigOpts object to the WSGI classes. :param conf: a ConfigOpts object """ global app_factory, filter_factory app_factory = AppFactory(conf) filter_factory = FilterFactory(conf)
[docs]def teardown_paste_factories(): """Reverse the effect of setup_paste_factories().""" global app_factory, filter_factory del app_factory del filter_factory
[docs]def paste_deploy_app(paste_config_file, app_name, conf): """Load a WSGI app from a PasteDeploy configuration. Use deploy.loadapp() to load the app from the PasteDeploy configuration, ensuring that the supplied ConfigOpts object is passed to the app and filter constructors. :param paste_config_file: a PasteDeploy config file :param app_name: the name of the app/pipeline to load from the file :param conf: a ConfigOpts object to supply to the app and its filters :returns: the WSGI app """ setup_paste_factories(conf) try: return loadwsgi.loadapp("config:%s" % paste_config_file, name=app_name) finally: teardown_paste_factories()