# Source code for keystone.tests.unit.resource.test_core

# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.

import copy
import uuid

import fixtures
import mock
from testtools import matchers

import keystone.conf
from keystone import exception
from keystone.tests import unit
from keystone.tests.unit.ksfixtures import database


CONF = keystone.conf.CONF


class TestResourceManagerNoFixtures(unit.SQLDriverOverrides, unit.TestCase):
    """Resource manager tests run on top of the SQL database fixture.

    ``setUp`` installs the database fixture and loads the backends; every
    test creates the domains and projects it needs by itself.
    """

    def setUp(self):
        """Install the database fixture, then load the backends."""
        super(TestResourceManagerNoFixtures, self).setUp()
        db_fixture = database.Database(self.sql_driver_version_overrides)
        self.useFixture(db_fixture)
        self.load_backends()

    def test_ensure_default_domain_exists(self):
        """ensure_default_domain_exists creates a missing default domain."""
        default_id = CONF.identity.default_domain_id

        # Precondition: there is no default domain yet.
        self.assertRaises(exception.DomainNotFound,
                          self.resource_api.get_domain,
                          default_id)

        self.resource_api.ensure_default_domain_exists()

        fetched = self.resource_api.get_domain(default_id)
        wanted = {
            'id': default_id,
            'name': 'Default',
            'enabled': True,
            'description': 'Domain created automatically to support V2.0 '
                           'operations.',
        }
        self.assertEqual(wanted, fetched)

    def test_ensure_default_domain_exists_already_exists(self):
        """An existing default domain is left as it was created."""
        default_id = CONF.identity.default_domain_id
        domain_name = uuid.uuid4().hex
        domain_description = uuid.uuid4().hex

        self.resource_api.create_domain(
            default_id,
            {
                'id': default_id,
                'name': domain_name,
                'description': domain_description,
            })

        self.resource_api.ensure_default_domain_exists()

        fetched = self.resource_api.get_domain(default_id)
        wanted = {
            'id': default_id,
            'name': domain_name,
            'enabled': True,
            'description': domain_description,
        }
        self.assertEqual(wanted, fetched)

    def test_ensure_default_domain_exists_fails(self):
        """An unexpected error from create_domain reaches the caller."""
        failing_create = fixtures.MockPatchObject(
            self.resource_api, 'create_domain',
            side_effect=exception.UnexpectedError)
        self.useFixture(failing_create)

        self.assertRaises(exception.UnexpectedError,
                          self.resource_api.ensure_default_domain_exists)

    def test_update_project_name_conflict(self):
        """Renaming a project to a sibling's name raises Conflict."""
        default_id = CONF.identity.default_domain_id
        domain_name = uuid.uuid4().hex
        domain_description = uuid.uuid4().hex
        domain = self.resource_api.create_domain(
            default_id,
            {
                'id': default_id,
                'name': domain_name,
                'description': domain_description,
            })

        first_ref = unit.new_project_ref(domain_id=domain['id'],
                                         name=uuid.uuid4().hex)
        self.resource_api.create_project(first_ref['id'], first_ref)

        second_ref = unit.new_project_ref(domain_id=domain['id'],
                                          name=uuid.uuid4().hex)
        second = self.resource_api.create_project(second_ref['id'],
                                                  second_ref)

        # Both projects live in the same domain, so the names must differ.
        self.assertRaises(exception.Conflict,
                          self.resource_api.update_project,
                          second['id'],
                          {'name': first_ref['name']})
class DomainConfigDriverTests(object):
    """Driver-level domain config tests, written as a mixin.

    The methods rely on the host test case for ``self.driver`` (the domain
    config driver under test) and for the assertion helpers
    (``assertEqual``, ``assertRaises``, ``assertThat``, ``assertIn``).
    Every scenario runs twice: once for whitelisted options
    (``sensitive=False``) and once for sensitive ones (``sensitive=True``).
    """

    def _domain_config_crud(self, sensitive):
        """Create, read, update and delete a single config option."""
        domain = uuid.uuid4().hex
        group = uuid.uuid4().hex
        option = uuid.uuid4().hex
        value = uuid.uuid4().hex
        config = {'group': group, 'option': option, 'value': value,
                  'sensitive': sensitive}

        self.driver.create_config_options(domain, [config])

        res = self.driver.get_config_option(
            domain, group, option, sensitive)
        # The driver result is compared without the 'sensitive' flag.
        config.pop('sensitive')
        self.assertEqual(config, res)

        value = uuid.uuid4().hex
        config = {'group': group, 'option': option, 'value': value,
                  'sensitive': sensitive}
        self.driver.update_config_options(domain, [config])

        res = self.driver.get_config_option(
            domain, group, option, sensitive)
        config.pop('sensitive')
        self.assertEqual(config, res)

        self.driver.delete_config_options(domain, group, option)
        self.assertRaises(exception.DomainConfigNotFound,
                          self.driver.get_config_option,
                          domain, group, option, sensitive)
        # ...and silent if we try to delete it again
        self.driver.delete_config_options(domain, group, option)

    def test_whitelisted_domain_config_crud(self):
        self._domain_config_crud(sensitive=False)

    def test_sensitive_domain_config_crud(self):
        self._domain_config_crud(sensitive=True)

    def _list_domain_config(self, sensitive):
        """Test listing by combination of domain, group & option."""
        config1 = {'group': uuid.uuid4().hex, 'option': uuid.uuid4().hex,
                   'value': uuid.uuid4().hex, 'sensitive': sensitive}
        # Put config2 in the same group as config1
        config2 = {'group': config1['group'], 'option': uuid.uuid4().hex,
                   'value': uuid.uuid4().hex, 'sensitive': sensitive}
        config3 = {'group': uuid.uuid4().hex, 'option': uuid.uuid4().hex,
                   'value': 100, 'sensitive': sensitive}
        domain = uuid.uuid4().hex

        self.driver.create_config_options(
            domain, [config1, config2, config3])

        # Listed entries are compared without the 'sensitive' flag.
        for config in [config1, config2, config3]:
            config.pop('sensitive')

        # Try listing all items from a domain
        res = self.driver.list_config_options(
            domain, sensitive=sensitive)
        self.assertThat(res, matchers.HasLength(3))
        for res_entry in res:
            self.assertIn(res_entry, [config1, config2, config3])

        # Try listing by domain and group
        res = self.driver.list_config_options(
            domain, group=config1['group'], sensitive=sensitive)
        self.assertThat(res, matchers.HasLength(2))
        for res_entry in res:
            self.assertIn(res_entry, [config1, config2])

        # Try listing by domain, group and option
        res = self.driver.list_config_options(
            domain, group=config2['group'],
            option=config2['option'], sensitive=sensitive)
        self.assertThat(res, matchers.HasLength(1))
        self.assertEqual(config2, res[0])

    def test_list_whitelisted_domain_config_crud(self):
        self._list_domain_config(False)

    def test_list_sensitive_domain_config_crud(self):
        self._list_domain_config(True)

    def _delete_domain_configs(self, sensitive):
        """Test deleting by combination of domain, group & option."""
        config1 = {'group': uuid.uuid4().hex, 'option': uuid.uuid4().hex,
                   'value': uuid.uuid4().hex, 'sensitive': sensitive}
        # Put config2 and config3 in the same group as config1
        config2 = {'group': config1['group'], 'option': uuid.uuid4().hex,
                   'value': uuid.uuid4().hex, 'sensitive': sensitive}
        config3 = {'group': config1['group'], 'option': uuid.uuid4().hex,
                   'value': uuid.uuid4().hex, 'sensitive': sensitive}
        config4 = {'group': uuid.uuid4().hex, 'option': uuid.uuid4().hex,
                   'value': uuid.uuid4().hex, 'sensitive': sensitive}
        domain = uuid.uuid4().hex

        self.driver.create_config_options(
            domain, [config1, config2, config3, config4])

        for config in [config1, config2, config3, config4]:
            config.pop('sensitive')

        # Try deleting by domain, group and option.  The return value of
        # delete_config_options is not inspected; only the follow-up
        # listing is checked.
        self.driver.delete_config_options(
            domain, group=config2['group'], option=config2['option'])
        res = self.driver.list_config_options(domain, sensitive=sensitive)
        self.assertThat(res, matchers.HasLength(3))
        for res_entry in res:
            self.assertIn(res_entry, [config1, config3, config4])

        # Try deleting by domain and group
        self.driver.delete_config_options(domain, group=config4['group'])
        res = self.driver.list_config_options(domain, sensitive=sensitive)
        self.assertThat(res, matchers.HasLength(2))
        for res_entry in res:
            self.assertIn(res_entry, [config1, config3])

        # Try deleting all items from a domain
        self.driver.delete_config_options(domain)
        res = self.driver.list_config_options(domain, sensitive=sensitive)
        self.assertThat(res, matchers.HasLength(0))

    def test_delete_whitelisted_domain_configs(self):
        self._delete_domain_configs(False)

    def test_delete_sensitive_domain_configs(self):
        self._delete_domain_configs(True)

    def _create_domain_config_twice(self, sensitive):
        """Test create the same option twice just overwrites."""
        config = {'group': uuid.uuid4().hex, 'option': uuid.uuid4().hex,
                  'value': uuid.uuid4().hex, 'sensitive': sensitive}
        domain = uuid.uuid4().hex

        self.driver.create_config_options(domain, [config])
        # Same group/option, new value: the second create must win.
        config['value'] = uuid.uuid4().hex
        self.driver.create_config_options(domain, [config])
        res = self.driver.get_config_option(
            domain, config['group'], config['option'], sensitive)
        config.pop('sensitive')
        self.assertEqual(config, res)

    def test_create_whitelisted_domain_config_twice(self):
        self._create_domain_config_twice(False)

    def test_create_sensitive_domain_config_twice(self):
        self._create_domain_config_twice(True)
class DomainConfigTests(object):
    """Manager-level domain config tests, written as a mixin.

    The methods rely on the host test case for ``self.resource_api``,
    ``self.domain_config_api``, ``self.addCleanup`` and the assertion
    helpers.  ``setUp`` here does not chain to a parent ``setUp``.
    NOTE(review): presumably the concrete test case calls it explicitly
    after its own setup - confirm against the subclasses.
    """

    def setUp(self):
        """Create a fresh domain for the test and schedule its removal."""
        self.domain = unit.new_domain_ref()
        self.resource_api.create_domain(self.domain['id'], self.domain)
        self.addCleanup(self.clean_up_domain)

    def clean_up_domain(self):
        """Disable and delete the domain created in ``setUp``."""
        # NOTE(henry-nash): Deleting the domain will also delete any domain
        # configs for this domain.
        self.domain['enabled'] = False
        self.resource_api.update_domain(self.domain['id'], self.domain)
        self.resource_api.delete_domain(self.domain['id'])
        del self.domain

    def test_create_domain_config_including_sensitive_option(self):
        config = {'ldap': {'url': uuid.uuid4().hex,
                           'user_tree_dn': uuid.uuid4().hex,
                           'password': uuid.uuid4().hex}}
        self.domain_config_api.create_config(self.domain['id'], config)

        # password is sensitive, so check that the whitelisted portion and
        # the sensitive piece have been stored in the appropriate locations.
        res = self.domain_config_api.get_config(self.domain['id'])
        config_whitelisted = copy.deepcopy(config)
        config_whitelisted['ldap'].pop('password')
        self.assertEqual(config_whitelisted, res)
        res = self.domain_config_api.driver.get_config_option(
            self.domain['id'], 'ldap', 'password', sensitive=True)
        self.assertEqual(config['ldap']['password'], res['value'])

        # Finally, use the non-public API to get back the whole config
        res = self.domain_config_api.get_config_with_sensitive_info(
            self.domain['id'])
        self.assertEqual(config, res)

    def test_get_partial_domain_config(self):
        config = {'ldap': {'url': uuid.uuid4().hex,
                           'user_tree_dn': uuid.uuid4().hex,
                           'password': uuid.uuid4().hex},
                  'identity': {'driver': uuid.uuid4().hex}}
        self.domain_config_api.create_config(self.domain['id'], config)

        res = self.domain_config_api.get_config(self.domain['id'],
                                                group='identity')
        config_partial = copy.deepcopy(config)
        config_partial.pop('ldap')
        self.assertEqual(config_partial, res)
        res = self.domain_config_api.get_config(
            self.domain['id'], group='ldap', option='user_tree_dn')
        self.assertEqual({'user_tree_dn': config['ldap']['user_tree_dn']},
                         res)
        # ...but we should fail to get a sensitive option
        self.assertRaises(exception.DomainConfigNotFound,
                          self.domain_config_api.get_config,
                          self.domain['id'],
                          group='ldap', option='password')

    def test_delete_partial_domain_config(self):
        config = {'ldap': {'url': uuid.uuid4().hex,
                           'user_tree_dn': uuid.uuid4().hex,
                           'password': uuid.uuid4().hex},
                  'identity': {'driver': uuid.uuid4().hex}}
        self.domain_config_api.create_config(self.domain['id'], config)

        self.domain_config_api.delete_config(
            self.domain['id'], group='identity')
        config_partial = copy.deepcopy(config)
        config_partial.pop('identity')
        config_partial['ldap'].pop('password')
        res = self.domain_config_api.get_config(self.domain['id'])
        self.assertEqual(config_partial, res)

        self.domain_config_api.delete_config(
            self.domain['id'], group='ldap', option='url')
        config_partial = copy.deepcopy(config_partial)
        config_partial['ldap'].pop('url')
        res = self.domain_config_api.get_config(self.domain['id'])
        self.assertEqual(config_partial, res)

    def test_get_options_not_in_domain_config(self):
        self.assertRaises(exception.DomainConfigNotFound,
                          self.domain_config_api.get_config,
                          self.domain['id'])
        config = {'ldap': {'url': uuid.uuid4().hex}}

        self.domain_config_api.create_config(self.domain['id'], config)

        self.assertRaises(exception.DomainConfigNotFound,
                          self.domain_config_api.get_config,
                          self.domain['id'],
                          group='identity')
        self.assertRaises(exception.DomainConfigNotFound,
                          self.domain_config_api.get_config,
                          self.domain['id'],
                          group='ldap', option='user_tree_dn')

    def test_get_sensitive_config(self):
        config = {'ldap': {'url': uuid.uuid4().hex,
                           'user_tree_dn': uuid.uuid4().hex,
                           'password': uuid.uuid4().hex},
                  'identity': {'driver': uuid.uuid4().hex}}
        # Nothing stored yet: the sensitive getter yields an empty dict.
        res = self.domain_config_api.get_config_with_sensitive_info(
            self.domain['id'])
        self.assertEqual({}, res)
        self.domain_config_api.create_config(self.domain['id'], config)
        res = self.domain_config_api.get_config_with_sensitive_info(
            self.domain['id'])
        self.assertEqual(config, res)

    def test_update_partial_domain_config(self):
        config = {'ldap': {'url': uuid.uuid4().hex,
                           'user_tree_dn': uuid.uuid4().hex,
                           'password': uuid.uuid4().hex},
                  'identity': {'driver': uuid.uuid4().hex}}
        self.domain_config_api.create_config(self.domain['id'], config)

        # Try updating a group.  The return value of this update is not
        # checked here; the stored config is read back instead.
        new_config = {'ldap': {'url': uuid.uuid4().hex,
                               'user_filter': uuid.uuid4().hex}}
        self.domain_config_api.update_config(
            self.domain['id'], new_config, group='ldap')
        expected_config = copy.deepcopy(config)
        expected_config['ldap']['url'] = new_config['ldap']['url']
        expected_config['ldap']['user_filter'] = (
            new_config['ldap']['user_filter'])
        expected_full_config = copy.deepcopy(expected_config)
        expected_config['ldap'].pop('password')
        res = self.domain_config_api.get_config(self.domain['id'])
        self.assertEqual(expected_config, res)
        # The sensitive option should still exist
        res = self.domain_config_api.get_config_with_sensitive_info(
            self.domain['id'])
        self.assertEqual(expected_full_config, res)

        # Try updating a single whitelisted option
        self.domain_config_api.delete_config(self.domain['id'])
        self.domain_config_api.create_config(self.domain['id'], config)
        new_config = {'url': uuid.uuid4().hex}
        res = self.domain_config_api.update_config(
            self.domain['id'], new_config, group='ldap', option='url')

        # Make sure whitelisted and full config is updated
        expected_whitelisted_config = copy.deepcopy(config)
        expected_whitelisted_config['ldap']['url'] = new_config['url']
        expected_full_config = copy.deepcopy(expected_whitelisted_config)
        expected_whitelisted_config['ldap'].pop('password')
        self.assertEqual(expected_whitelisted_config, res)
        res = self.domain_config_api.get_config(self.domain['id'])
        self.assertEqual(expected_whitelisted_config, res)
        res = self.domain_config_api.get_config_with_sensitive_info(
            self.domain['id'])
        self.assertEqual(expected_full_config, res)

        # Try updating a single sensitive option
        self.domain_config_api.delete_config(self.domain['id'])
        self.domain_config_api.create_config(self.domain['id'], config)
        new_config = {'password': uuid.uuid4().hex}
        res = self.domain_config_api.update_config(
            self.domain['id'], new_config, group='ldap', option='password')
        # The whitelisted config should not have changed...
        expected_whitelisted_config = copy.deepcopy(config)
        expected_full_config = copy.deepcopy(config)
        expected_whitelisted_config['ldap'].pop('password')
        self.assertEqual(expected_whitelisted_config, res)
        res = self.domain_config_api.get_config(self.domain['id'])
        self.assertEqual(expected_whitelisted_config, res)
        expected_full_config['ldap']['password'] = new_config['password']
        res = self.domain_config_api.get_config_with_sensitive_info(
            self.domain['id'])
        # ...but the sensitive piece should have.
        self.assertEqual(expected_full_config, res)

    def test_update_invalid_partial_domain_config(self):
        config = {'ldap': {'url': uuid.uuid4().hex,
                           'user_tree_dn': uuid.uuid4().hex,
                           'password': uuid.uuid4().hex},
                  'identity': {'driver': uuid.uuid4().hex}}
        # An extra group, when specifying one group should fail
        self.assertRaises(exception.InvalidDomainConfig,
                          self.domain_config_api.update_config,
                          self.domain['id'], config, group='ldap')
        # An extra option, when specifying one option should fail
        self.assertRaises(exception.InvalidDomainConfig,
                          self.domain_config_api.update_config,
                          self.domain['id'], config['ldap'],
                          group='ldap', option='url')

        # Now try the right number of groups/options, but just not
        # ones that are in the config provided
        config = {'ldap': {'user_tree_dn': uuid.uuid4().hex}}
        self.assertRaises(exception.InvalidDomainConfig,
                          self.domain_config_api.update_config,
                          self.domain['id'], config, group='identity')
        self.assertRaises(exception.InvalidDomainConfig,
                          self.domain_config_api.update_config,
                          self.domain['id'], config['ldap'], group='ldap',
                          option='url')

        # Now some valid groups/options, but just not ones that are in the
        # existing config
        config = {'ldap': {'user_tree_dn': uuid.uuid4().hex}}
        self.domain_config_api.create_config(self.domain['id'], config)
        config_wrong_group = {'identity': {'driver': uuid.uuid4().hex}}
        self.assertRaises(exception.DomainConfigNotFound,
                          self.domain_config_api.update_config,
                          self.domain['id'], config_wrong_group,
                          group='identity')
        config_wrong_option = {'url': uuid.uuid4().hex}
        self.assertRaises(exception.DomainConfigNotFound,
                          self.domain_config_api.update_config,
                          self.domain['id'], config_wrong_option,
                          group='ldap', option='url')

        # And finally just some bad groups/options
        bad_group = uuid.uuid4().hex
        config = {bad_group: {'user': uuid.uuid4().hex}}
        self.assertRaises(exception.InvalidDomainConfig,
                          self.domain_config_api.update_config,
                          self.domain['id'], config, group=bad_group,
                          option='user')
        bad_option = uuid.uuid4().hex
        config = {'ldap': {bad_option: uuid.uuid4().hex}}
        self.assertRaises(exception.InvalidDomainConfig,
                          self.domain_config_api.update_config,
                          self.domain['id'], config, group='ldap',
                          option=bad_option)

    def test_create_invalid_domain_config(self):
        self.assertRaises(exception.InvalidDomainConfig,
                          self.domain_config_api.create_config,
                          self.domain['id'], {})
        config = {uuid.uuid4().hex: uuid.uuid4().hex}
        self.assertRaises(exception.InvalidDomainConfig,
                          self.domain_config_api.create_config,
                          self.domain['id'], config)
        config = {uuid.uuid4().hex: {uuid.uuid4().hex: uuid.uuid4().hex}}
        self.assertRaises(exception.InvalidDomainConfig,
                          self.domain_config_api.create_config,
                          self.domain['id'], config)
        config = {'ldap': {uuid.uuid4().hex: uuid.uuid4().hex}}
        self.assertRaises(exception.InvalidDomainConfig,
                          self.domain_config_api.create_config,
                          self.domain['id'], config)
        # Try an option that IS in the standard conf, but neither whitelisted
        # or marked as sensitive
        config = {'identity': {'user_tree_dn': uuid.uuid4().hex}}
        self.assertRaises(exception.InvalidDomainConfig,
                          self.domain_config_api.create_config,
                          self.domain['id'], config)

    def test_delete_invalid_partial_domain_config(self):
        config = {'ldap': {'url': uuid.uuid4().hex}}
        self.domain_config_api.create_config(self.domain['id'], config)
        # Try deleting a group not in the config
        self.assertRaises(exception.DomainConfigNotFound,
                          self.domain_config_api.delete_config,
                          self.domain['id'], group='identity')
        # Try deleting an option not in the config
        self.assertRaises(exception.DomainConfigNotFound,
                          self.domain_config_api.delete_config,
                          self.domain['id'],
                          group='ldap', option='user_tree_dn')

    def test_sensitive_substitution_in_domain_config(self):
        # Create a config that contains a whitelisted option that requires
        # substitution of a sensitive option.
        config = {'ldap': {'url': 'my_url/%(password)s',
                           'user_tree_dn': uuid.uuid4().hex,
                           'password': uuid.uuid4().hex},
                  'identity': {'driver': uuid.uuid4().hex}}
        self.domain_config_api.create_config(self.domain['id'], config)

        # Read back the config with the internal method and ensure that the
        # substitution has taken place.
        res = self.domain_config_api.get_config_with_sensitive_info(
            self.domain['id'])
        expected_url = (
            config['ldap']['url'] % {'password': config['ldap']['password']})
        self.assertEqual(expected_url, res['ldap']['url'])

    def test_invalid_sensitive_substitution_in_domain_config(self):
        """Check that invalid substitutions raise warnings."""
        mock_log = mock.Mock()

        invalid_option_config = {
            'ldap': {'user_tree_dn': uuid.uuid4().hex,
                     'password': uuid.uuid4().hex},
            'identity': {'driver': uuid.uuid4().hex}}

        for invalid_option in ['my_url/%(passssword)s',
                               'my_url/%(password',
                               'my_url/%(password)',
                               'my_url/%(password)d']:
            # Forget calls recorded for the previous option; otherwise the
            # warning check below is satisfied by the first iteration alone.
            mock_log.reset_mock()

            invalid_option_config['ldap']['url'] = invalid_option
            self.domain_config_api.create_config(
                self.domain['id'], invalid_option_config)

            with mock.patch('keystone.resource.core.LOG', mock_log):
                res = self.domain_config_api.get_config_with_sensitive_info(
                    self.domain['id'])
            mock_log.warning.assert_any_call(mock.ANY)
            # The option is handed back unsubstituted.
            self.assertEqual(
                invalid_option_config['ldap']['url'], res['ldap']['url'])

    def test_escaped_sequence_in_domain_config(self):
        """Check that escaped '%(' doesn't get interpreted."""
        mock_log = mock.Mock()

        escaped_option_config = {
            'ldap': {'url': 'my_url/%%(password)s',
                     'user_tree_dn': uuid.uuid4().hex,
                     'password': uuid.uuid4().hex},
            'identity': {'driver': uuid.uuid4().hex}}

        self.domain_config_api.create_config(
            self.domain['id'], escaped_option_config)

        with mock.patch('keystone.resource.core.LOG', mock_log):
            res = self.domain_config_api.get_config_with_sensitive_info(
                self.domain['id'])
        # The invalid-substitution test in this class expects the warning
        # to arrive through ``warning``.  A Mock creates ``warn`` on first
        # access, so checking only ``warn`` could never fail.
        self.assertFalse(mock_log.warning.called)
        self.assertFalse(mock_log.warn.called)
        # The escaping '%' should have been removed
        self.assertEqual('my_url/%(password)s', res['ldap']['url'])

    @unit.skip_if_cache_disabled('domain_config')
    def test_cache_layer_get_sensitive_config(self):
        config = {'ldap': {'url': uuid.uuid4().hex,
                           'user_tree_dn': uuid.uuid4().hex,
                           'password': uuid.uuid4().hex},
                  'identity': {'driver': uuid.uuid4().hex}}
        self.domain_config_api.create_config(self.domain['id'], config)
        # cache the result
        res = self.domain_config_api.get_config_with_sensitive_info(
            self.domain['id'])
        self.assertEqual(config, res)

        # delete, bypassing domain config manager api
        self.domain_config_api.delete_config_options(self.domain['id'])
        # The cached value is still served...
        self.assertDictEqual(
            res, self.domain_config_api.get_config_with_sensitive_info(
                self.domain['id']))
        # ...until the cache entry is explicitly invalidated.
        self.domain_config_api.get_config_with_sensitive_info.invalidate(
            self.domain_config_api, self.domain['id'])
        self.assertDictEqual(
            {},
            self.domain_config_api.get_config_with_sensitive_info(
                self.domain['id']))

    def test_delete_domain_deletes_configs(self):
        """Test domain deletion clears the domain configs."""
        domain = unit.new_domain_ref()
        self.resource_api.create_domain(domain['id'], domain)
        config = {'ldap': {'url': uuid.uuid4().hex,
                           'user_tree_dn': uuid.uuid4().hex,
                           'password': uuid.uuid4().hex}}
        self.domain_config_api.create_config(domain['id'], config)

        # Now delete the domain
        domain['enabled'] = False
        self.resource_api.update_domain(domain['id'], domain)
        self.resource_api.delete_domain(domain['id'])

        # Check domain configs have also been deleted
        self.assertRaises(
            exception.DomainConfigNotFound,
            self.domain_config_api.get_config,
            domain['id'])

        # The get_config_with_sensitive_info does not throw an exception if
        # the config is empty, it just returns an empty dict
        self.assertDictEqual(
            {},
            self.domain_config_api.get_config_with_sensitive_info(
                domain['id']))

    def test_config_registration(self):
        # Local is deliberately not called ``type`` so the builtin is not
        # shadowed; the API keyword argument is still ``type``.
        reg_type = uuid.uuid4().hex
        self.domain_config_api.obtain_registration(
            self.domain['id'], reg_type)
        self.domain_config_api.release_registration(
            self.domain['id'], type=reg_type)

        # Make sure that once someone has it, nobody else can get it.
        # This includes the domain who already has it.
        self.domain_config_api.obtain_registration(
            self.domain['id'], reg_type)
        self.assertFalse(
            self.domain_config_api.obtain_registration(
                self.domain['id'], reg_type))

        # Make sure we can read who does have it
        self.assertEqual(
            self.domain['id'],
            self.domain_config_api.read_registration(reg_type))

        # Make sure releasing it is silent if the domain specified doesn't
        # have the registration
        domain2 = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex}
        self.resource_api.create_domain(domain2['id'], domain2)
        self.domain_config_api.release_registration(
            domain2['id'], type=reg_type)

        # If nobody has the type registered, then trying to read it should
        # raise ConfigRegistrationNotFound
        self.domain_config_api.release_registration(
            self.domain['id'], type=reg_type)
        self.assertRaises(exception.ConfigRegistrationNotFound,
                          self.domain_config_api.read_registration,
                          reg_type)

        # Finally check multiple registrations are cleared if you free the
        # registration without specifying the type
        reg_type2 = uuid.uuid4().hex
        self.domain_config_api.obtain_registration(
            self.domain['id'], reg_type)
        self.domain_config_api.obtain_registration(
            self.domain['id'], reg_type2)
        self.domain_config_api.release_registration(self.domain['id'])
        self.assertRaises(exception.ConfigRegistrationNotFound,
                          self.domain_config_api.read_registration,
                          reg_type)
        self.assertRaises(exception.ConfigRegistrationNotFound,
                          self.domain_config_api.read_registration,
                          reg_type2)

# Project Source