# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""Implementation of JSON RPC for communication between API and conductors.
This module implementa a subset of JSON RPC 2.0 as defined in
https://www.jsonrpc.org/specification. Main differences:
* No support for batched requests.
* No support for positional arguments passing.
* No JSON RPC 1.0 fallback.
"""
import json
import logging
try:
from keystonemiddleware import auth_token
except ImportError:
auth_token = None
from oslo_config import cfg
try:
import oslo_messaging
except ImportError:
oslo_messaging = None
from oslo_service import service
from oslo_service import wsgi
from oslo_utils import strutils
import webob
from ironic_lib import auth_basic
from ironic_lib.common.i18n import _
from ironic_lib import exception
from ironic_lib import json_rpc
CONF = cfg.CONF
LOG = logging.getLogger(__name__)
_DENY_LIST = {'init_host', 'del_host', 'target', 'iter_nodes'}
def _build_method_map(manager):
"""Build mapping from method names to their bodies.
:param manager: A conductor manager.
:return: dict with mapping
"""
result = {}
for method in dir(manager):
if method.startswith('_') or method in _DENY_LIST:
continue
func = getattr(manager, method)
if not callable(func):
continue
LOG.debug('Adding RPC method %s', method)
result[method] = func
return result
[docs]class JsonRpcError(exception.IronicException):
pass
[docs]class ParseError(JsonRpcError):
code = -32700
_msg_fmt = _("Invalid JSON received by RPC server")
[docs]class InvalidRequest(JsonRpcError):
code = -32600
_msg_fmt = _("Invalid request object received by RPC server")
[docs]class MethodNotFound(JsonRpcError):
code = -32601
_msg_fmt = _("Method %(name)s was not found")
[docs]class InvalidParams(JsonRpcError):
code = -32602
_msg_fmt = _("Params %(params)s are invalid for %(method)s: %(error)s")
[docs]class EmptyContext:
request_id = None
def __init__(self, src):
self.__dict__.update(src)
[docs] def to_dict(self):
return self.__dict__.copy()
[docs]class WSGIService(service.Service):
"""Provides ability to launch JSON RPC as a WSGI application."""
def __init__(self, manager, serializer, context_class=EmptyContext):
"""Create a JSON RPC service.
:param manager: Object from which to expose methods.
:param serializer: A serializer that supports calls serialize_entity
and deserialize_entity.
:param context_class: A context class - a callable accepting a dict
received from network.
"""
self.manager = manager
self.serializer = serializer
self.context_class = context_class
self._method_map = _build_method_map(manager)
auth_strategy = json_rpc.auth_strategy()
if auth_strategy == 'keystone':
conf = dict(CONF.keystone_authtoken)
if auth_token is None:
raise exception.ConfigInvalid(
_("keystonemiddleware is required for keystone "
"authentication"))
app = auth_token.AuthProtocol(self._application, conf)
elif auth_strategy == 'http_basic':
app = auth_basic.BasicAuthMiddleware(
self._application,
cfg.CONF.json_rpc.http_basic_auth_user_file)
else:
app = self._application
self.server = wsgi.Server(CONF, 'ironic-json-rpc', app,
host=CONF.json_rpc.host_ip,
port=CONF.json_rpc.port,
use_ssl=CONF.json_rpc.use_ssl)
def _application(self, environment, start_response):
"""WSGI application for conductor JSON RPC."""
request = webob.Request(environment)
if request.method != 'POST':
body = {'error': {'code': 405,
'message': _('Only POST method can be used')}}
return webob.Response(status_code=405, json_body=body)(
environment, start_response)
if json_rpc.auth_strategy() == 'keystone':
roles = (request.headers.get('X-Roles') or '').split(',')
allowed_roles = CONF.json_rpc.allowed_roles
if set(roles).isdisjoint(allowed_roles):
LOG.debug('Roles %s do not contain any of %s, rejecting '
'request', roles, allowed_roles)
body = {'error': {'code': 403, 'message': _('Forbidden')}}
return webob.Response(status_code=403, json_body=body)(
environment, start_response)
result = self._call(request)
if result is not None:
response = webob.Response(content_type='application/json',
charset='UTF-8',
json_body=result)
else:
response = webob.Response(status_code=204)
return response(environment, start_response)
def _handle_error(self, exc, request_id=None):
"""Generate a JSON RPC 2.0 error body.
:param exc: Exception object.
:param request_id: ID of the request (if any).
:return: dict with response body
"""
if (oslo_messaging is not None
and isinstance(exc, oslo_messaging.ExpectedException)):
exc = exc.exc_info[1]
expected = isinstance(exc, exception.IronicException)
cls = exc.__class__
if expected:
LOG.debug('RPC error %s: %s', cls.__name__, exc)
else:
LOG.exception('Unexpected RPC exception %s', cls.__name__)
response = {
"jsonrpc": "2.0",
"id": request_id,
"error": {
"code": getattr(exc, 'code', 500),
"message": str(exc),
}
}
if expected and not isinstance(exc, JsonRpcError):
# Allow de-serializing the correct class for expected errors.
response['error']['data'] = {
'class': '%s.%s' % (cls.__module__, cls.__name__)
}
return response
def _call(self, request):
"""Process a JSON RPC request.
:param request: ``webob.Request`` object.
:return: dict with response body.
"""
request_id = None
try:
try:
body = json.loads(request.text)
except ValueError:
LOG.error('Cannot parse JSON RPC request as JSON')
raise ParseError()
if not isinstance(body, dict):
LOG.error('JSON RPC request %s is not an object (batched '
'requests are not supported)', body)
raise InvalidRequest()
request_id = body.get('id')
params = body.get('params', {})
if (body.get('jsonrpc') != '2.0'
or not body.get('method')
or not isinstance(params, dict)):
LOG.error('JSON RPC request %s is invalid', body)
raise InvalidRequest()
except Exception as exc:
# We do not treat malformed requests as notifications and return
# a response even when request_id is None. This seems in agreement
# with the examples in the specification.
return self._handle_error(exc, request_id)
try:
method = body['method']
try:
func = self._method_map[method]
except KeyError:
raise MethodNotFound(name=method)
result = self._handle_requests(func, method, params)
if request_id is not None:
return {
"jsonrpc": "2.0",
"result": result,
"id": request_id
}
except Exception as exc:
result = self._handle_error(exc, request_id)
# We treat correctly formed requests without "id" as notifications
# and do not return any errors.
if request_id is not None:
return result
def _handle_requests(self, func, name, params):
"""Convert arguments and call a method.
:param func: Callable object.
:param name: RPC call name for logging.
:param params: Keyword arguments.
:return: call result as JSON.
"""
# TODO(dtantsur): server-side version check?
params.pop('rpc.version', None)
logged_params = strutils.mask_dict_password(params)
try:
context = params.pop('context')
except KeyError:
context = None
else:
# A valid context is required for deserialization
if not isinstance(context, dict):
raise InvalidParams(
_("Context must be a dictionary, if provided"))
context = self.context_class(context)
params = {key: self.serializer.deserialize_entity(context, value)
for key, value in params.items()}
params['context'] = context
LOG.debug('RPC %s with %s', name, logged_params)
try:
result = func(**params)
# FIXME(dtantsur): we could use the inspect module, but
# oslo_messaging.expected_exceptions messes up signatures.
except TypeError as exc:
raise InvalidParams(params=', '.join(params),
method=name, error=exc)
if context is not None:
# Currently it seems that we can serialize even with invalid
# context, but I'm not sure it's guaranteed to be the case.
result = self.serializer.serialize_entity(context, result)
LOG.debug('RPC %s returned %s', name,
strutils.mask_dict_password(result)
if isinstance(result, dict) else result)
return result
[docs] def start(self):
"""Start serving this service using loaded configuration.
:returns: None
"""
self.server.start()
[docs] def stop(self):
"""Stop serving this API.
:returns: None
"""
self.server.stop()
[docs] def wait(self):
"""Wait for the service to stop serving this API.
:returns: None
"""
self.server.wait()
[docs] def reset(self):
"""Reset server greenpool size to default.
:returns: None
"""
self.server.reset()