Source code for keystone.resource.controllers

# Copyright 2013 Metacloud, Inc.
# Copyright 2012 OpenStack Foundation
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.

"""Workflow Logic the Resource service."""

import uuid

from oslo_config import cfg

from keystone.common import controller
from keystone.common import dependency
from keystone.common import validation
from keystone.common import wsgi
from keystone import exception
from keystone.i18n import _
from keystone import notifications
from keystone.resource import schema


CONF = cfg.CONF


@dependency.requires('resource_api')
[docs]class Tenant(controller.V2Controller): @controller.v2_deprecated
[docs] def get_all_projects(self, context, **kw): """Gets a list of all tenants for an admin user.""" self.assert_admin(context) if 'name' in context['query_string']: return self._get_project_by_name(context['query_string']['name']) try: tenant_refs = self.resource_api.list_projects_in_domain( CONF.identity.default_domain_id) except exception.DomainNotFound: # If the default domain doesn't exist then there are no V2 # projects. tenant_refs = [] tenant_refs = [self.v3_to_v2_project(tenant_ref) for tenant_ref in tenant_refs if not tenant_ref.get('is_domain')] params = { 'limit': context['query_string'].get('limit'), 'marker': context['query_string'].get('marker'), } return self.format_project_list(tenant_refs, **params)
def _assert_not_is_domain_project(self, project_id, project_ref=None): # Projects acting as a domain should not be visible via v2 if not project_ref: project_ref = self.resource_api.get_project(project_id) if project_ref.get('is_domain'): raise exception.ProjectNotFound(project_id) @controller.v2_deprecated
[docs] def get_project(self, context, tenant_id): # TODO(termie): this stuff should probably be moved to middleware self.assert_admin(context) ref = self.resource_api.get_project(tenant_id) self._assert_not_is_domain_project(tenant_id, ref) return {'tenant': self.v3_to_v2_project(ref)}
def _get_project_by_name(self, tenant_name): # Projects acting as a domain should not be visible via v2 ref = self.resource_api.get_project_by_name( tenant_name, CONF.identity.default_domain_id) self._assert_not_is_domain_project(ref['id'], ref) return {'tenant': self.v3_to_v2_project(ref)} # CRUD Extension @controller.v2_deprecated
[docs] def create_project(self, context, tenant): tenant_ref = self._normalize_dict(tenant) if 'name' not in tenant_ref or not tenant_ref['name']: msg = _('Name field is required and cannot be empty') raise exception.ValidationError(message=msg) if 'is_domain' in tenant_ref: msg = _('The creation of projects acting as domains is not ' 'allowed in v2.') raise exception.ValidationError(message=msg) self.assert_admin(context) self.resource_api.ensure_default_domain_exists() tenant_ref['id'] = tenant_ref.get('id', uuid.uuid4().hex) initiator = notifications._get_request_audit_info(context) tenant = self.resource_api.create_project( tenant_ref['id'], self._normalize_domain_id(context, tenant_ref), initiator) return {'tenant': self.v3_to_v2_project(tenant)}
@controller.v2_deprecated
[docs] def update_project(self, context, tenant_id, tenant): self.assert_admin(context) self._assert_not_is_domain_project(tenant_id) # Remove domain_id and is_domain if specified - a v2 api caller # should not be specifying that clean_tenant = tenant.copy() clean_tenant.pop('domain_id', None) clean_tenant.pop('is_domain', None) initiator = notifications._get_request_audit_info(context) tenant_ref = self.resource_api.update_project( tenant_id, clean_tenant, initiator) return {'tenant': self.v3_to_v2_project(tenant_ref)}
@controller.v2_deprecated
[docs] def delete_project(self, context, tenant_id): self.assert_admin(context) self._assert_not_is_domain_project(tenant_id) initiator = notifications._get_request_audit_info(context) self.resource_api.delete_project(tenant_id, initiator)
@dependency.requires('resource_api')
[docs]class DomainV3(controller.V3Controller): collection_name = 'domains' member_name = 'domain' def __init__(self): super(DomainV3, self).__init__() self.get_member_from_driver = self.resource_api.get_domain @controller.protected() @validation.validated(schema.domain_create, 'domain')
[docs] def create_domain(self, context, domain): ref = self._assign_unique_id(self._normalize_dict(domain)) initiator = notifications._get_request_audit_info(context) ref = self.resource_api.create_domain(ref['id'], ref, initiator) return DomainV3.wrap_member(context, ref)
@controller.filterprotected('enabled', 'name')
[docs] def list_domains(self, context, filters): hints = DomainV3.build_driver_hints(context, filters) refs = self.resource_api.list_domains(hints=hints) return DomainV3.wrap_collection(context, refs, hints=hints)
@controller.protected()
[docs] def get_domain(self, context, domain_id): ref = self.resource_api.get_domain(domain_id) return DomainV3.wrap_member(context, ref)
@controller.protected() @validation.validated(schema.domain_update, 'domain')
[docs] def update_domain(self, context, domain_id, domain): self._require_matching_id(domain_id, domain) initiator = notifications._get_request_audit_info(context) ref = self.resource_api.update_domain(domain_id, domain, initiator) return DomainV3.wrap_member(context, ref)
@controller.protected()
[docs] def delete_domain(self, context, domain_id): initiator = notifications._get_request_audit_info(context) return self.resource_api.delete_domain(domain_id, initiator)
@dependency.requires('domain_config_api') @dependency.requires('resource_api')
[docs]class DomainConfigV3(controller.V3Controller): member_name = 'config' @controller.protected()
[docs] def create_domain_config(self, context, domain_id, config): self.resource_api.get_domain(domain_id) original_config = ( self.domain_config_api.get_config_with_sensitive_info(domain_id)) ref = self.domain_config_api.create_config(domain_id, config) if original_config: # Return status code 200, since config already existed return wsgi.render_response(body={self.member_name: ref}) else: return wsgi.render_response(body={self.member_name: ref}, status=('201', 'Created'))
@controller.protected()
[docs] def get_domain_config(self, context, domain_id, group=None, option=None): self.resource_api.get_domain(domain_id) ref = self.domain_config_api.get_config(domain_id, group, option) return {self.member_name: ref}
@controller.protected()
[docs] def update_domain_config( self, context, domain_id, config, group, option): self.resource_api.get_domain(domain_id) ref = self.domain_config_api.update_config( domain_id, config, group, option) return wsgi.render_response(body={self.member_name: ref})
[docs] def update_domain_config_group(self, context, domain_id, group, config): self.resource_api.get_domain(domain_id) return self.update_domain_config( context, domain_id, config, group, option=None)
[docs] def update_domain_config_only(self, context, domain_id, config): self.resource_api.get_domain(domain_id) return self.update_domain_config( context, domain_id, config, group=None, option=None)
@controller.protected()
[docs] def delete_domain_config( self, context, domain_id, group=None, option=None): self.resource_api.get_domain(domain_id) self.domain_config_api.delete_config(domain_id, group, option)
@controller.protected()
[docs] def get_domain_config_default(self, context, group=None, option=None): ref = self.domain_config_api.get_config_default(group, option) return {self.member_name: ref}
@dependency.requires('resource_api')
[docs]class ProjectV3(controller.V3Controller): collection_name = 'projects' member_name = 'project' def __init__(self): super(ProjectV3, self).__init__() self.get_member_from_driver = self.resource_api.get_project @controller.protected() @validation.validated(schema.project_create, 'project')
[docs] def create_project(self, context, project): ref = self._assign_unique_id(self._normalize_dict(project)) if not ref.get('is_domain'): ref = self._normalize_domain_id(context, ref) # Our API requires that you specify the location in the hierarchy # unambiguously. This could be by parent_id or, if it is a top level # project, just by providing a domain_id. if not ref.get('parent_id'): ref['parent_id'] = ref.get('domain_id') initiator = notifications._get_request_audit_info(context) try: ref = self.resource_api.create_project(ref['id'], ref, initiator=initiator) except (exception.DomainNotFound, exception.ProjectNotFound) as e: raise exception.ValidationError(e) return ProjectV3.wrap_member(context, ref)
@controller.filterprotected('domain_id', 'enabled', 'name', 'parent_id', 'is_domain')
[docs] def list_projects(self, context, filters): hints = ProjectV3.build_driver_hints(context, filters) # If 'is_domain' has not been included as a query, we default it to # False (which in query terms means '0' if 'is_domain' not in context['query_string']: hints.add_filter('is_domain', '0') refs = self.resource_api.list_projects(hints=hints) return ProjectV3.wrap_collection(context, refs, hints=hints)
def _expand_project_ref(self, context, ref): params = context['query_string'] parents_as_list = 'parents_as_list' in params and ( self.query_filter_is_true(params['parents_as_list'])) parents_as_ids = 'parents_as_ids' in params and ( self.query_filter_is_true(params['parents_as_ids'])) subtree_as_list = 'subtree_as_list' in params and ( self.query_filter_is_true(params['subtree_as_list'])) subtree_as_ids = 'subtree_as_ids' in params and ( self.query_filter_is_true(params['subtree_as_ids'])) # parents_as_list and parents_as_ids are mutually exclusive if parents_as_list and parents_as_ids: msg = _('Cannot use parents_as_list and parents_as_ids query ' 'params at the same time.') raise exception.ValidationError(msg) # subtree_as_list and subtree_as_ids are mutually exclusive if subtree_as_list and subtree_as_ids: msg = _('Cannot use subtree_as_list and subtree_as_ids query ' 'params at the same time.') raise exception.ValidationError(msg) user_id = self.get_auth_context(context).get('user_id') if parents_as_list: parents = self.resource_api.list_project_parents( ref['id'], user_id) ref['parents'] = [ProjectV3.wrap_member(context, p) for p in parents] elif parents_as_ids: ref['parents'] = self.resource_api.get_project_parents_as_ids(ref) if subtree_as_list: subtree = self.resource_api.list_projects_in_subtree( ref['id'], user_id) ref['subtree'] = [ProjectV3.wrap_member(context, p) for p in subtree] elif subtree_as_ids: ref['subtree'] = self.resource_api.get_projects_in_subtree_as_ids( ref['id']) @controller.protected()
[docs] def get_project(self, context, project_id): ref = self.resource_api.get_project(project_id) self._expand_project_ref(context, ref) return ProjectV3.wrap_member(context, ref)
@controller.protected() @validation.validated(schema.project_update, 'project')
[docs] def update_project(self, context, project_id, project): self._require_matching_id(project_id, project) self._require_matching_domain_id( project_id, project, self.resource_api.get_project) initiator = notifications._get_request_audit_info(context) ref = self.resource_api.update_project(project_id, project, initiator=initiator) return ProjectV3.wrap_member(context, ref)
@controller.protected()
[docs] def delete_project(self, context, project_id): initiator = notifications._get_request_audit_info(context) return self.resource_api.delete_project(project_id, initiator=initiator)

Project Source