Source code for keystone.tests.unit.test_sql_upgrade

# Copyright 2012 OpenStack Foundation
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""
Test for SQL migration extensions.

To run these tests against a live database:

1. Modify the file ``keystone/tests/unit/config_files/backend_sql.conf`` to use
   the connection for your live database.
2. Set up a blank, live database
3. Run the tests using::

    tox -e py27 -- keystone.tests.unit.test_sql_upgrade

WARNING::

    Your database will be wiped.

    Do not do this against a database with valuable data as
    all data will be lost.
"""

import json
import uuid

import migrate
from migrate.versioning import api as versioning_api
from migrate.versioning import repository
import mock
from oslo_db import exception as db_exception
from oslo_db.sqlalchemy import migration
from oslo_db.sqlalchemy import test_base
from sqlalchemy.engine import reflection
import sqlalchemy.exc
from testtools import matchers

from keystone.common import sql
from keystone.common.sql import migration_helpers
import keystone.conf
from keystone.credential.providers import fernet as credential_fernet
from keystone.tests import unit
from keystone.tests.unit import default_fixtures
from keystone.tests.unit import ksfixtures
from keystone.tests.unit.ksfixtures import database


CONF = keystone.conf.CONF

# NOTE(morganfainberg): This should be updated when each DB migration collapse
# is done to mirror the expected structure of the DB in the format of
# { <DB_TABLE_NAME>: [<COLUMN>, <COLUMN>, ...], ... }
INITIAL_TABLE_STRUCTURE = {
    'credential': [
        'id', 'user_id', 'project_id', 'blob', 'type', 'extra',
    ],
    'domain': [
        'id', 'name', 'enabled', 'extra',
    ],
    'endpoint': [
        'id', 'legacy_endpoint_id', 'interface', 'region_id', 'service_id',
        'url', 'enabled', 'extra',
    ],
    'group': [
        'id', 'domain_id', 'name', 'description', 'extra',
    ],
    'policy': [
        'id', 'type', 'blob', 'extra',
    ],
    'project': [
        'id', 'name', 'extra', 'description', 'enabled', 'domain_id',
        'parent_id',
    ],
    'role': [
        'id', 'name', 'extra',
    ],
    'service': [
        'id', 'type', 'extra', 'enabled',
    ],
    'token': [
        'id', 'expires', 'extra', 'valid', 'trust_id', 'user_id',
    ],
    'trust': [
        'id', 'trustor_user_id', 'trustee_user_id', 'project_id',
        'impersonation', 'deleted_at', 'expires_at', 'remaining_uses', 'extra',
    ],
    'trust_role': [
        'trust_id', 'role_id',
    ],
    'user': [
        'id', 'name', 'extra', 'password', 'enabled', 'domain_id',
        'default_project_id',
    ],
    'user_group_membership': [
        'user_id', 'group_id',
    ],
    'region': [
        'id', 'description', 'parent_region_id', 'extra',
    ],
    'assignment': [
        'type', 'actor_id', 'target_id', 'role_id', 'inherited',
    ],
    'id_mapping': [
        'public_id', 'domain_id', 'local_id', 'entity_type',
    ],
    'whitelisted_config': [
        'domain_id', 'group', 'option', 'value',
    ],
    'sensitive_config': [
        'domain_id', 'group', 'option', 'value',
    ],
}

LEGACY_REPO = 'migrate_repo'
EXPAND_REPO = 'expand_repo'
DATA_MIGRATION_REPO = 'data_migration_repo'
CONTRACT_REPO = 'contract_repo'


# Test migration_helpers.get_init_version separately to ensure it works before
# using in the SqlUpgrade tests.
[docs]class MigrationHelpersGetInitVersionTests(unit.TestCase): @mock.patch.object(repository, 'Repository')
[docs] def test_get_init_version_no_path(self, repo): migrate_versions = mock.MagicMock() # make a version list starting with zero. `get_init_version` will # return None for this value. migrate_versions.versions.versions = list(range(0, 5)) repo.return_value = migrate_versions # os.path.isdir() is called by `find_migrate_repo()`. Mock it to avoid # an exception. with mock.patch('os.path.isdir', return_value=True): # since 0 is the smallest version expect None version = migration_helpers.get_init_version() self.assertIsNone(version) # check that the default path was used as the first argument to the # first invocation of repo. Cannot match the full path because it is # based on where the test is run. param = repo.call_args_list[0][0][0] self.assertTrue(param.endswith('/sql/' + LEGACY_REPO))
@mock.patch.object(repository, 'Repository')
[docs] def test_get_init_version_with_path_initial_version_0(self, repo): migrate_versions = mock.MagicMock() # make a version list starting with zero. `get_init_version` will # return None for this value. migrate_versions.versions.versions = list(range(0, 5)) repo.return_value = migrate_versions # os.path.isdir() is called by `find_migrate_repo()`. Mock it to avoid # an exception. with mock.patch('os.path.isdir', return_value=True): path = '/keystone/' + LEGACY_REPO + '/' # since 0 is the smallest version expect None version = migration_helpers.get_init_version(abs_path=path) self.assertIsNone(version)
@mock.patch.object(repository, 'Repository')
[docs] def test_get_init_version_with_path(self, repo): initial_version = 10 migrate_versions = mock.MagicMock() migrate_versions.versions.versions = list(range(initial_version + 1, initial_version + 5)) repo.return_value = migrate_versions # os.path.isdir() is called by `find_migrate_repo()`. Mock it to avoid # an exception. with mock.patch('os.path.isdir', return_value=True): path = '/keystone/' + LEGACY_REPO + '/' version = migration_helpers.get_init_version(abs_path=path) self.assertEqual(initial_version, version)
[docs]class MigrationRepository(object): def __init__(self, engine, repo_name): self.repo_name = repo_name self.repo_path = migration_helpers.find_migrate_repo( package=sql, repo_name=self.repo_name) self.min_version = ( migration_helpers.get_init_version(abs_path=self.repo_path)) self.schema_ = versioning_api.ControlledSchema.create( engine, self.repo_path, self.min_version) self.max_version = self.schema_.repository.version().version
[docs] def upgrade(self, version=None, current_schema=None): version = version or self.max_version err = '' upgrade = True version = versioning_api._migrate_version( self.schema_, version, upgrade, err) if not current_schema: current_schema = self.schema_ changeset = current_schema.changeset(version) for ver, change in changeset: self.schema_.runchange(ver, change, changeset.step) if self.schema_.version != version: raise Exception( 'Actual version (%s) of %s does not equal expected ' 'version (%s)' % ( self.schema_.version, self.repo_name, version))
@property
[docs] def version(self): with sql.session_for_read() as session: return migration.db_version( session.get_bind(), self.repo_path, self.min_version)
[docs]class SqlMigrateBase(test_base.DbTestCase):
[docs] def setUp(self): super(SqlMigrateBase, self).setUp() # Set keystone's connection URL to be the test engine's url. database.initialize_sql_session(self.engine.url) # Override keystone's context manager to be oslo.db's global context # manager. sql.core._TESTING_USE_GLOBAL_CONTEXT_MANAGER = True self.addCleanup(setattr, sql.core, '_TESTING_USE_GLOBAL_CONTEXT_MANAGER', False) self.addCleanup(sql.cleanup) self.repos = { LEGACY_REPO: MigrationRepository(self.engine, LEGACY_REPO), EXPAND_REPO: MigrationRepository(self.engine, EXPAND_REPO), DATA_MIGRATION_REPO: MigrationRepository( self.engine, DATA_MIGRATION_REPO), CONTRACT_REPO: MigrationRepository(self.engine, CONTRACT_REPO)}
[docs] def upgrade(self, *args, **kwargs): """Upgrade the legacy migration repository.""" self.repos[LEGACY_REPO].upgrade(*args, **kwargs)
[docs] def expand(self, *args, **kwargs): """Expand database schema.""" self.repos[EXPAND_REPO].upgrade(*args, **kwargs)
[docs] def migrate(self, *args, **kwargs): """Migrate data.""" self.repos[DATA_MIGRATION_REPO].upgrade(*args, **kwargs)
[docs] def contract(self, *args, **kwargs): """Contract database schema.""" self.repos[CONTRACT_REPO].upgrade(*args, **kwargs)
@property
[docs] def metadata(self): """A collection of tables and their associated schemas.""" return sqlalchemy.MetaData(self.engine)
[docs] def select_table(self, name): table = sqlalchemy.Table(name, self.metadata, autoload=True) s = sqlalchemy.select([table]) return s
[docs] def assertTableExists(self, table_name): try: self.select_table(table_name) except sqlalchemy.exc.NoSuchTableError: raise AssertionError('Table "%s" does not exist' % table_name)
[docs] def assertTableDoesNotExist(self, table_name): """Assert that a given table exists cannot be selected by name.""" # Switch to a different metadata otherwise you might still # detect renamed or dropped tables try: sqlalchemy.Table(table_name, self.metadata, autoload=True) except sqlalchemy.exc.NoSuchTableError: pass else: raise AssertionError('Table "%s" already exists' % table_name)
[docs] def calc_table_row_count(self, table_name): """Return the number of rows in the table.""" t = sqlalchemy.Table(table_name, self.metadata, autoload=True) session = self.sessionmaker() row_count = session.query( sqlalchemy.func.count('*')).select_from(t).scalar() return row_count
[docs] def assertTableCountsMatch(self, table1_name, table2_name): table1_count = self.calc_table_row_count(table1_name) table2_count = self.calc_table_row_count(table2_name) if table1_count != table2_count: raise AssertionError('Table counts do not match: {0} ({1}), {2} ' '({3})'.format(table1_name, table1_count, table2_name, table2_count))
[docs] def assertTableColumns(self, table_name, expected_cols): """Assert that the table contains the expected set of columns.""" table = self.select_table(table_name) actual_cols = [col.name for col in table.columns] # Check if the columns are equal, but allow for a different order, # which might occur after an upgrade followed by a downgrade self.assertItemsEqual(expected_cols, actual_cols, '%s table' % table_name)
[docs] def insert_dict(self, session, table_name, d, table=None): """Naively inserts key-value pairs into a table, given a dictionary.""" if table is None: this_table = sqlalchemy.Table(table_name, self.metadata, autoload=True) else: this_table = table insert = this_table.insert().values(**d) session.execute(insert)
[docs]class SqlLegacyRepoUpgradeTests(SqlMigrateBase):
[docs] def test_blank_db_to_start(self): self.assertTableDoesNotExist('user')
[docs] def test_start_version_db_init_version(self): self.assertEqual( self.repos[LEGACY_REPO].min_version, self.repos[LEGACY_REPO].version, 'DB is not at version %s' % ( self.repos[LEGACY_REPO].min_version) )
[docs] def test_upgrade_add_initial_tables(self): self.upgrade(self.repos[LEGACY_REPO].min_version + 1) self.check_initial_table_structure()
[docs] def check_initial_table_structure(self): for table in INITIAL_TABLE_STRUCTURE: self.assertTableColumns(table, INITIAL_TABLE_STRUCTURE[table])
[docs] def test_kilo_squash(self): self.upgrade(67) # In 053 the size of ID and parent region ID columns were changed table = sqlalchemy.Table('region', self.metadata, autoload=True) self.assertEqual(255, table.c.id.type.length) self.assertEqual(255, table.c.parent_region_id.type.length) table = sqlalchemy.Table('endpoint', self.metadata, autoload=True) self.assertEqual(255, table.c.region_id.type.length) # In 054 an index was created for the actor_id of the assignment table table = sqlalchemy.Table('assignment', self.metadata, autoload=True) index_data = [(idx.name, list(idx.columns.keys())) for idx in table.indexes] self.assertIn(('ix_actor_id', ['actor_id']), index_data) # In 055 indexes were created for user and trust IDs in the token table table = sqlalchemy.Table('token', self.metadata, autoload=True) index_data = [(idx.name, list(idx.columns.keys())) for idx in table.indexes] self.assertIn(('ix_token_user_id', ['user_id']), index_data) self.assertIn(('ix_token_trust_id', ['trust_id']), index_data) # In 062 the role ID foreign key was removed from the assignment table if self.engine.name == "mysql": self.assertFalse(self.does_fk_exist('assignment', 'role_id')) # In 064 the domain ID FK was removed from the group and user tables if self.engine.name != 'sqlite': # sqlite does not support FK deletions (or enforcement) self.assertFalse(self.does_fk_exist('group', 'domain_id')) self.assertFalse(self.does_fk_exist('user', 'domain_id')) # In 067 the role ID index was removed from the assignment table if self.engine.name == "mysql": self.assertFalse(self.does_index_exist('assignment', 'assignment_role_id_fkey'))
[docs] def test_insert_assignment_inherited_pk(self): ASSIGNMENT_TABLE_NAME = 'assignment' INHERITED_COLUMN_NAME = 'inherited' ROLE_TABLE_NAME = 'role' self.upgrade(72) # Check that the 'inherited' column is not part of the PK self.assertFalse(self.does_pk_exist(ASSIGNMENT_TABLE_NAME, INHERITED_COLUMN_NAME)) session = self.sessionmaker() role = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex} self.insert_dict(session, ROLE_TABLE_NAME, role) # Create both inherited and noninherited role assignments inherited = {'type': 'UserProject', 'actor_id': uuid.uuid4().hex, 'target_id': uuid.uuid4().hex, 'role_id': role['id'], 'inherited': True} noninherited = inherited.copy() noninherited['inherited'] = False # Create another inherited role assignment as a spoiler spoiler = inherited.copy() spoiler['actor_id'] = uuid.uuid4().hex self.insert_dict(session, ASSIGNMENT_TABLE_NAME, inherited) self.insert_dict(session, ASSIGNMENT_TABLE_NAME, spoiler) # Since 'inherited' is not part of the PK, we can't insert noninherited self.assertRaises(db_exception.DBDuplicateEntry, self.insert_dict, session, ASSIGNMENT_TABLE_NAME, noninherited) session.close() self.upgrade(73) session = self.sessionmaker() # Check that the 'inherited' column is now part of the PK self.assertTrue(self.does_pk_exist(ASSIGNMENT_TABLE_NAME, INHERITED_COLUMN_NAME)) # The noninherited role assignment can now be inserted self.insert_dict(session, ASSIGNMENT_TABLE_NAME, noninherited) assignment_table = sqlalchemy.Table(ASSIGNMENT_TABLE_NAME, self.metadata, autoload=True) assignments = session.query(assignment_table).all() for assignment in (inherited, spoiler, noninherited): self.assertIn((assignment['type'], assignment['actor_id'], assignment['target_id'], assignment['role_id'], assignment['inherited']), assignments)
[docs] def does_pk_exist(self, table, pk_column): """Check whether a column is primary key on a table.""" inspector = reflection.Inspector.from_engine(self.engine) pk_columns = inspector.get_pk_constraint(table)['constrained_columns'] return pk_column in pk_columns
[docs] def does_fk_exist(self, table, fk_column): inspector = reflection.Inspector.from_engine(self.engine) for fk in inspector.get_foreign_keys(table): if fk_column in fk['constrained_columns']: return True return False
[docs] def does_index_exist(self, table_name, index_name): table = sqlalchemy.Table(table_name, self.metadata, autoload=True) return index_name in [idx.name for idx in table.indexes]
[docs] def does_constraint_exist(self, table_name, constraint_name): table = sqlalchemy.Table(table_name, self.metadata, autoload=True) return constraint_name in [con.name for con in table.constraints]
[docs] def test_endpoint_policy_upgrade(self): self.assertTableDoesNotExist('policy_association') self.upgrade(81) self.assertTableColumns('policy_association', ['id', 'policy_id', 'endpoint_id', 'service_id', 'region_id'])
@mock.patch.object(migration_helpers, 'get_db_version', return_value=1)
[docs] def test_endpoint_policy_already_migrated(self, mock_ep): # By setting the return value to 1, the migration has already been # run, and there's no need to create the table again self.upgrade(81) mock_ep.assert_called_once_with(extension='endpoint_policy', engine=mock.ANY) # It won't exist because we are mocking it, but we can verify # that 081 did not create the table self.assertTableDoesNotExist('policy_association')
[docs] def test_create_federation_tables(self): self.identity_provider = 'identity_provider' self.federation_protocol = 'federation_protocol' self.service_provider = 'service_provider' self.mapping = 'mapping' self.remote_ids = 'idp_remote_ids' self.assertTableDoesNotExist(self.identity_provider) self.assertTableDoesNotExist(self.federation_protocol) self.assertTableDoesNotExist(self.service_provider) self.assertTableDoesNotExist(self.mapping) self.assertTableDoesNotExist(self.remote_ids) self.upgrade(82) self.assertTableColumns(self.identity_provider, ['id', 'description', 'enabled']) self.assertTableColumns(self.federation_protocol, ['id', 'idp_id', 'mapping_id']) self.assertTableColumns(self.mapping, ['id', 'rules']) self.assertTableColumns(self.service_provider, ['id', 'description', 'enabled', 'auth_url', 'relay_state_prefix', 'sp_url']) self.assertTableColumns(self.remote_ids, ['idp_id', 'remote_id']) federation_protocol = sqlalchemy.Table(self.federation_protocol, self.metadata, autoload=True) self.assertFalse(federation_protocol.c.mapping_id.nullable) sp_table = sqlalchemy.Table(self.service_provider, self.metadata, autoload=True) self.assertFalse(sp_table.c.auth_url.nullable) self.assertFalse(sp_table.c.sp_url.nullable)
@mock.patch.object(migration_helpers, 'get_db_version', return_value=8)
[docs] def test_federation_already_migrated(self, mock_federation): # By setting the return value to 8, the migration has already been # run, and there's no need to create the table again. self.upgrade(82) mock_federation.assert_any_call(extension='federation', engine=mock.ANY) # It won't exist because we are mocking it, but we can verify # that 082 did not create the table. self.assertTableDoesNotExist('identity_provider') self.assertTableDoesNotExist('federation_protocol') self.assertTableDoesNotExist('mapping') self.assertTableDoesNotExist('service_provider') self.assertTableDoesNotExist('idp_remote_ids')
[docs] def test_create_oauth_tables(self): consumer = 'consumer' request_token = 'request_token' access_token = 'access_token' self.assertTableDoesNotExist(consumer) self.assertTableDoesNotExist(request_token) self.assertTableDoesNotExist(access_token) self.upgrade(83) self.assertTableColumns(consumer, ['id', 'description', 'secret', 'extra']) self.assertTableColumns(request_token, ['id', 'request_secret', 'verifier', 'authorizing_user_id', 'requested_project_id', 'role_ids', 'consumer_id', 'expires_at']) self.assertTableColumns(access_token, ['id', 'access_secret', 'authorizing_user_id', 'project_id', 'role_ids', 'consumer_id', 'expires_at'])
@mock.patch.object(migration_helpers, 'get_db_version', return_value=5)
[docs] def test_oauth1_already_migrated(self, mock_oauth1): # By setting the return value to 5, the migration has already been # run, and there's no need to create the table again. self.upgrade(83) mock_oauth1.assert_any_call(extension='oauth1', engine=mock.ANY) # It won't exist because we are mocking it, but we can verify # that 083 did not create the table. self.assertTableDoesNotExist('consumer') self.assertTableDoesNotExist('request_token') self.assertTableDoesNotExist('access_token')
[docs] def test_create_revoke_table(self): self.assertTableDoesNotExist('revocation_event') self.upgrade(84) self.assertTableColumns('revocation_event', ['id', 'domain_id', 'project_id', 'user_id', 'role_id', 'trust_id', 'consumer_id', 'access_token_id', 'issued_before', 'expires_at', 'revoked_at', 'audit_chain_id', 'audit_id'])
@mock.patch.object(migration_helpers, 'get_db_version', return_value=2)
[docs] def test_revoke_already_migrated(self, mock_revoke): # By setting the return value to 2, the migration has already been # run, and there's no need to create the table again. self.upgrade(84) mock_revoke.assert_any_call(extension='revoke', engine=mock.ANY) # It won't exist because we are mocking it, but we can verify # that 084 did not create the table. self.assertTableDoesNotExist('revocation_event')
[docs] def test_project_is_domain_upgrade(self): self.upgrade(74) self.assertTableColumns('project', ['id', 'name', 'extra', 'description', 'enabled', 'domain_id', 'parent_id', 'is_domain'])
[docs] def test_implied_roles_upgrade(self): self.upgrade(87) self.assertTableColumns('implied_role', ['prior_role_id', 'implied_role_id']) self.assertTrue(self.does_fk_exist('implied_role', 'prior_role_id')) self.assertTrue(self.does_fk_exist('implied_role', 'implied_role_id'))
[docs] def test_add_config_registration(self): config_registration = 'config_register' self.upgrade(74) self.assertTableDoesNotExist(config_registration) self.upgrade(75) self.assertTableColumns(config_registration, ['type', 'domain_id'])
[docs] def test_endpoint_filter_upgrade(self): def assert_tables_columns_exist(): self.assertTableColumns('project_endpoint', ['endpoint_id', 'project_id']) self.assertTableColumns('endpoint_group', ['id', 'name', 'description', 'filters']) self.assertTableColumns('project_endpoint_group', ['endpoint_group_id', 'project_id']) self.assertTableDoesNotExist('project_endpoint') self.upgrade(85) assert_tables_columns_exist()
@mock.patch.object(migration_helpers, 'get_db_version', return_value=2)
[docs] def test_endpoint_filter_already_migrated(self, mock_endpoint_filter): # By setting the return value to 2, the migration has already been # run, and there's no need to create the table again. self.upgrade(85) mock_endpoint_filter.assert_any_call(extension='endpoint_filter', engine=mock.ANY) # It won't exist because we are mocking it, but we can verify # that 085 did not create the table. self.assertTableDoesNotExist('project_endpoint') self.assertTableDoesNotExist('endpoint_group') self.assertTableDoesNotExist('project_endpoint_group')
[docs] def test_add_trust_unique_constraint_upgrade(self): self.upgrade(86) inspector = reflection.Inspector.from_engine(self.engine) constraints = inspector.get_unique_constraints('trust') constraint_names = [constraint['name'] for constraint in constraints] self.assertIn('duplicate_trust_constraint', constraint_names)
[docs] def test_add_domain_specific_roles(self): """Check database upgraded successfully for domain specific roles. The following items need to be checked: - The domain_id column has been added - That it has been added to the uniqueness constraints - Existing roles have their domain_id columns set to the specific string of '<<null>>' """ NULL_DOMAIN_ID = '<<null>>' self.upgrade(87) session = self.sessionmaker() role_table = sqlalchemy.Table('role', self.metadata, autoload=True) # Add a role before we upgrade, so we can check that its new domain_id # attribute is handled correctly role_id = uuid.uuid4().hex self.insert_dict(session, 'role', {'id': role_id, 'name': uuid.uuid4().hex}) session.close() self.upgrade(88) session = self.sessionmaker() self.assertTableColumns('role', ['id', 'name', 'domain_id', 'extra']) # Check the domain_id has been added to the uniqueness constraint inspector = reflection.Inspector.from_engine(self.engine) constraints = inspector.get_unique_constraints('role') constraint_columns = [ constraint['column_names'] for constraint in constraints if constraint['name'] == 'ixu_role_name_domain_id'] self.assertIn('domain_id', constraint_columns[0]) # Now check our role has its domain_id attribute set correctly role_table = sqlalchemy.Table('role', self.metadata, autoload=True) cols = [role_table.c.domain_id] filter = role_table.c.id == role_id statement = sqlalchemy.select(cols).where(filter) role_entry = session.execute(statement).fetchone() self.assertEqual(NULL_DOMAIN_ID, role_entry[0])
[docs] def test_add_root_of_all_domains(self): NULL_DOMAIN_ID = '<<keystone.domain.root>>' self.upgrade(89) session = self.sessionmaker() domain_table = sqlalchemy.Table( 'domain', self.metadata, autoload=True) query = session.query(domain_table).filter_by(id=NULL_DOMAIN_ID) domain_from_db = query.one() self.assertIn(NULL_DOMAIN_ID, domain_from_db) project_table = sqlalchemy.Table( 'project', self.metadata, autoload=True) query = session.query(project_table).filter_by(id=NULL_DOMAIN_ID) project_from_db = query.one() self.assertIn(NULL_DOMAIN_ID, project_from_db) session.close()
[docs] def test_add_local_user_and_password_tables(self): local_user_table = 'local_user' password_table = 'password' self.upgrade(89) self.assertTableDoesNotExist(local_user_table) self.assertTableDoesNotExist(password_table) self.upgrade(90) self.assertTableColumns(local_user_table, ['id', 'user_id', 'domain_id', 'name']) self.assertTableColumns(password_table, ['id', 'local_user_id', 'password'])
[docs] def test_migrate_data_to_local_user_and_password_tables(self): def get_expected_users(): expected_users = [] for test_user in default_fixtures.USERS: user = {} user['id'] = uuid.uuid4().hex user['name'] = test_user['name'] user['domain_id'] = test_user['domain_id'] user['password'] = test_user['password'] user['enabled'] = True user['extra'] = json.dumps(uuid.uuid4().hex) user['default_project_id'] = uuid.uuid4().hex expected_users.append(user) return expected_users def add_users_to_db(expected_users, user_table): for user in expected_users: ins = user_table.insert().values( {'id': user['id'], 'name': user['name'], 'domain_id': user['domain_id'], 'password': user['password'], 'enabled': user['enabled'], 'extra': user['extra'], 'default_project_id': user['default_project_id']}) ins.execute() def get_users_from_db(user_table, local_user_table, password_table): sel = ( sqlalchemy.select([user_table.c.id, user_table.c.enabled, user_table.c.extra, user_table.c.default_project_id, local_user_table.c.name, local_user_table.c.domain_id, password_table.c.password]) .select_from(user_table.join(local_user_table, user_table.c.id == local_user_table.c.user_id) .join(password_table, local_user_table.c.id == password_table.c.local_user_id)) ) user_rows = sel.execute() users = [] for row in user_rows: users.append( {'id': row['id'], 'name': row['name'], 'domain_id': row['domain_id'], 'password': row['password'], 'enabled': row['enabled'], 'extra': row['extra'], 'default_project_id': row['default_project_id']}) return users user_table_name = 'user' local_user_table_name = 'local_user' password_table_name = 'password' # populate current user table self.upgrade(90) user_table = sqlalchemy.Table( user_table_name, self.metadata, autoload=True) expected_users = get_expected_users() add_users_to_db(expected_users, user_table) # upgrade to migration and test self.upgrade(91) self.assertTableCountsMatch(user_table_name, local_user_table_name) self.assertTableCountsMatch(local_user_table_name, password_table_name) user_table = sqlalchemy.Table( user_table_name, self.metadata, autoload=True) local_user_table = sqlalchemy.Table( local_user_table_name, self.metadata, autoload=True) password_table = sqlalchemy.Table( password_table_name, self.metadata, autoload=True) actual_users = get_users_from_db(user_table, local_user_table, password_table) self.assertItemsEqual(expected_users, actual_users)
[docs] def test_migrate_user_with_null_password_to_password_tables(self): USER_TABLE_NAME = 'user' LOCAL_USER_TABLE_NAME = 'local_user' PASSWORD_TABLE_NAME = 'password' self.upgrade(90) user_ref = unit.new_user_ref(uuid.uuid4().hex) user_ref.pop('password') # pop extra attribute which doesn't recognized by SQL expression # layer. user_ref.pop('email') session = self.sessionmaker() self.insert_dict(session, USER_TABLE_NAME, user_ref) self.upgrade(91) # migration should be successful. self.assertTableCountsMatch(USER_TABLE_NAME, LOCAL_USER_TABLE_NAME) # no new entry was added to the password table because the # user doesn't have a password. rows = self.calc_table_row_count(PASSWORD_TABLE_NAME) self.assertEqual(0, rows)
[docs] def test_migrate_user_skip_user_already_exist_in_local_user(self): USER_TABLE_NAME = 'user' LOCAL_USER_TABLE_NAME = 'local_user' self.upgrade(90) user1_ref = unit.new_user_ref(uuid.uuid4().hex) # pop extra attribute which doesn't recognized by SQL expression # layer. user1_ref.pop('email') user2_ref = unit.new_user_ref(uuid.uuid4().hex) user2_ref.pop('email') session = self.sessionmaker() self.insert_dict(session, USER_TABLE_NAME, user1_ref) self.insert_dict(session, USER_TABLE_NAME, user2_ref) user_id = user1_ref.pop('id') user_name = user1_ref.pop('name') domain_id = user1_ref.pop('domain_id') local_user_ref = {'user_id': user_id, 'name': user_name, 'domain_id': domain_id} self.insert_dict(session, LOCAL_USER_TABLE_NAME, local_user_ref) self.upgrade(91) # migration should be successful and user2_ref has been migrated to # `local_user` table. self.assertTableCountsMatch(USER_TABLE_NAME, LOCAL_USER_TABLE_NAME)
[docs] def test_implied_roles_fk_on_delete_cascade(self): if self.engine.name == 'sqlite': self.skipTest('sqlite backend does not support foreign keys') self.upgrade(92) session = self.sessionmaker() ROLE_TABLE_NAME = 'role' role_table = sqlalchemy.Table(ROLE_TABLE_NAME, self.metadata, autoload=True) IMPLIED_ROLE_TABLE_NAME = 'implied_role' implied_role_table = sqlalchemy.Table( IMPLIED_ROLE_TABLE_NAME, self.metadata, autoload=True) def _create_three_roles(): id_list = [] for _ in range(3): new_role_fields = { 'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex, } self.insert_dict(session, ROLE_TABLE_NAME, new_role_fields, table=role_table) id_list.append(new_role_fields['id']) return id_list role_id_list = _create_three_roles() implied_role_fields = { 'prior_role_id': role_id_list[0], 'implied_role_id': role_id_list[1], } self.insert_dict(session, IMPLIED_ROLE_TABLE_NAME, implied_role_fields, table=implied_role_table) implied_role_fields = { 'prior_role_id': role_id_list[0], 'implied_role_id': role_id_list[2], } self.insert_dict(session, IMPLIED_ROLE_TABLE_NAME, implied_role_fields, table=implied_role_table) # assert that there are two roles implied by role 0. implied_roles = session.query(implied_role_table).filter_by( prior_role_id=role_id_list[0]).all() self.assertThat(implied_roles, matchers.HasLength(2)) session.execute( role_table.delete().where(role_table.c.id == role_id_list[0])) # assert the cascade deletion is effective. implied_roles = session.query(implied_role_table).filter_by( prior_role_id=role_id_list[0]).all() self.assertThat(implied_roles, matchers.HasLength(0))
[docs] def test_domain_as_project_upgrade(self): def _populate_domain_and_project_tables(session): # Three domains, with various different attributes self.domains = [{'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex, 'enabled': True, 'extra': {'description': uuid.uuid4().hex, 'another_attribute': True}}, {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex, 'enabled': True, 'extra': {'description': uuid.uuid4().hex}}, {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex, 'enabled': False}] # Four projects, two top level, two children self.projects = [] self.projects.append(unit.new_project_ref( domain_id=self.domains[0]['id'], parent_id=None)) self.projects.append(unit.new_project_ref( domain_id=self.domains[0]['id'], parent_id=self.projects[0]['id'])) self.projects.append(unit.new_project_ref( domain_id=self.domains[1]['id'], parent_id=None)) self.projects.append(unit.new_project_ref( domain_id=self.domains[1]['id'], parent_id=self.projects[2]['id'])) for domain in self.domains: this_domain = domain.copy() if 'extra' in this_domain: this_domain['extra'] = json.dumps(this_domain['extra']) self.insert_dict(session, 'domain', this_domain) for project in self.projects: self.insert_dict(session, 'project', project) def _check_projects(projects): def _assert_domain_matches_project(project): for domain in self.domains: if project.id == domain['id']: self.assertEqual(domain['name'], project.name) self.assertEqual(domain['enabled'], project.enabled) if domain['id'] == self.domains[0]['id']: self.assertEqual(domain['extra']['description'], project.description) self.assertEqual({'another_attribute': True}, json.loads(project.extra)) elif domain['id'] == self.domains[1]['id']: self.assertEqual(domain['extra']['description'], project.description) self.assertEqual({}, json.loads(project.extra)) # We had domains 3 we created, which should now be projects acting # as domains, To this we add the 4 original projects, plus the root # of all domains row. self.assertEqual(8, projects.count()) project_ids = [] for project in projects: if project.is_domain: self.assertEqual(NULL_DOMAIN_ID, project.domain_id) self.assertIsNone(project.parent_id) else: self.assertIsNotNone(project.domain_id) self.assertIsNotNone(project.parent_id) project_ids.append(project.id) for domain in self.domains: self.assertIn(domain['id'], project_ids) for project in self.projects: self.assertIn(project['id'], project_ids) # Now check the attributes of the domains came across OK for project in projects: _assert_domain_matches_project(project) NULL_DOMAIN_ID = '<<keystone.domain.root>>' self.upgrade(92) session = self.sessionmaker() _populate_domain_and_project_tables(session) self.upgrade(93) proj_table = sqlalchemy.Table('project', self.metadata, autoload=True) projects = session.query(proj_table) _check_projects(projects)
[docs] def test_add_federated_user_table(self): federated_user_table = 'federated_user' self.upgrade(93) self.assertTableDoesNotExist(federated_user_table) self.upgrade(94) self.assertTableColumns(federated_user_table, ['id', 'user_id', 'idp_id', 'protocol_id', 'unique_id', 'display_name'])
[docs] def test_add_int_pkey_to_revocation_event_table(self): REVOCATION_EVENT_TABLE_NAME = 'revocation_event' self.upgrade(94) revocation_event_table = sqlalchemy.Table(REVOCATION_EVENT_TABLE_NAME, self.metadata, autoload=True) # assert id column is a string (before) self.assertEqual('VARCHAR(64)', str(revocation_event_table.c.id.type)) self.upgrade(95) revocation_event_table = sqlalchemy.Table(REVOCATION_EVENT_TABLE_NAME, self.metadata, autoload=True) # assert id column is an integer (after) self.assertIsInstance(revocation_event_table.c.id.type, sql.Integer)
def _add_unique_constraint_to_role_name(self, constraint_name='ixu_role_name'): role_table = sqlalchemy.Table('role', self.metadata, autoload=True) migrate.UniqueConstraint(role_table.c.name, name=constraint_name).create() def _drop_unique_constraint_to_role_name(self, constraint_name='ixu_role_name'): role_table = sqlalchemy.Table('role', self.metadata, autoload=True) migrate.UniqueConstraint(role_table.c.name, name=constraint_name).drop() def _add_unique_constraint_to_user_name_domainid( self, constraint_name='ixu_role_name'): user_table = sqlalchemy.Table('user', self.metadata, autoload=True) migrate.UniqueConstraint(user_table.c.name, user_table.c.domain_id, name=constraint_name).create() def _add_name_domain_id_columns_to_user(self): user_table = sqlalchemy.Table('user', self.metadata, autoload=True) column_name = sqlalchemy.Column('name', sql.String(255)) column_domain_id = sqlalchemy.Column('domain_id', sql.String(64)) user_table.create_column(column_name) user_table.create_column(column_domain_id) def _drop_unique_constraint_to_user_name_domainid( self, constraint_name='ixu_user_name_domain_id'): user_table = sqlalchemy.Table('user', self.metadata, autoload=True) migrate.UniqueConstraint(user_table.c.name, user_table.c.domain_id, name=constraint_name).drop()
[docs] def test_migration_88_drops_unique_constraint(self): self.upgrade(87) if self.engine.name == 'mysql': self.assertTrue(self.does_index_exist('role', 'ixu_role_name')) else: self.assertTrue(self.does_constraint_exist('role', 'ixu_role_name')) self.upgrade(88) if self.engine.name == 'mysql': self.assertFalse(self.does_index_exist('role', 'ixu_role_name')) else: self.assertFalse(self.does_constraint_exist('role', 'ixu_role_name'))
[docs] def test_migration_88_inconsistent_constraint_name(self): self.upgrade(87) self._drop_unique_constraint_to_role_name() constraint_name = uuid.uuid4().hex self._add_unique_constraint_to_role_name( constraint_name=constraint_name) if self.engine.name == 'mysql': self.assertTrue(self.does_index_exist('role', constraint_name)) self.assertFalse(self.does_index_exist('role', 'ixu_role_name')) else: self.assertTrue(self.does_constraint_exist('role', constraint_name)) self.assertFalse(self.does_constraint_exist('role', 'ixu_role_name')) self.upgrade(88) if self.engine.name == 'mysql': self.assertFalse(self.does_index_exist('role', constraint_name)) self.assertFalse(self.does_index_exist('role', 'ixu_role_name')) else: self.assertFalse(self.does_constraint_exist('role', constraint_name)) self.assertFalse(self.does_constraint_exist('role', 'ixu_role_name'))
[docs] def test_migration_91_drops_unique_constraint(self): self.upgrade(90) if self.engine.name == 'mysql': self.assertTrue(self.does_index_exist('user', 'ixu_user_name_domain_id')) else: self.assertTrue(self.does_constraint_exist( 'user', 'ixu_user_name_domain_id')) self.upgrade(91) if self.engine.name == 'mysql': self.assertFalse(self.does_index_exist( 'user', 'ixu_user_name_domain_id')) else: self.assertFalse(self.does_constraint_exist( 'user', 'ixu_user_name_domain_id'))
[docs] def test_migration_91_inconsistent_constraint_name(self): self.upgrade(90) self._drop_unique_constraint_to_user_name_domainid() constraint_name = uuid.uuid4().hex self._add_unique_constraint_to_user_name_domainid( constraint_name=constraint_name) if self.engine.name == 'mysql': self.assertTrue(self.does_index_exist('user', constraint_name)) self.assertFalse(self.does_index_exist( 'user', 'ixu_user_name_domain_id')) else: self.assertTrue(self.does_constraint_exist('user', constraint_name)) self.assertFalse(self.does_constraint_exist( 'user', 'ixu_user_name_domain_id')) self.upgrade(91) if self.engine.name == 'mysql': self.assertFalse(self.does_index_exist('user', constraint_name)) self.assertFalse(self.does_index_exist( 'user', 'ixu_user_name_domain_id')) else: self.assertFalse(self.does_constraint_exist('user', constraint_name)) self.assertFalse(self.does_constraint_exist( 'user', 'ixu_user_name_domain_id'))
[docs] def test_migration_96(self): self.upgrade(95) if self.engine.name == 'mysql': self.assertFalse(self.does_index_exist('role', 'ixu_role_name')) else: self.assertFalse(self.does_constraint_exist('role', 'ixu_role_name')) self.upgrade(96) if self.engine.name == 'mysql': self.assertFalse(self.does_index_exist('role', 'ixu_role_name')) else: self.assertFalse(self.does_constraint_exist('role', 'ixu_role_name'))
[docs] def test_migration_96_constraint_exists(self): self.upgrade(95) self._add_unique_constraint_to_role_name() if self.engine.name == 'mysql': self.assertTrue(self.does_index_exist('role', 'ixu_role_name')) else: self.assertTrue(self.does_constraint_exist('role', 'ixu_role_name')) self.upgrade(96) if self.engine.name == 'mysql': self.assertFalse(self.does_index_exist('role', 'ixu_role_name')) else: self.assertFalse(self.does_constraint_exist('role', 'ixu_role_name'))
[docs] def test_migration_97(self): self.upgrade(96) if self.engine.name == 'mysql': self.assertFalse(self.does_index_exist( 'user', 'ixu_user_name_domain_id')) else: self.assertFalse(self.does_constraint_exist( 'user', 'ixu_user_name_domain_id')) self.upgrade(97) if self.engine.name == 'mysql': self.assertFalse(self.does_index_exist( 'user', 'ixu_user_name_domain_id')) else: self.assertFalse(self.does_constraint_exist( 'user', 'ixu_user_name_domain_id'))
[docs] def test_migration_97_constraint_exists(self): self.upgrade(96) self._add_name_domain_id_columns_to_user() self._add_unique_constraint_to_user_name_domainid( constraint_name='ixu_user_name_domain_id') if self.engine.name == 'mysql': self.assertTrue(self.does_index_exist( 'user', 'ixu_user_name_domain_id')) else: self.assertTrue(self.does_constraint_exist( 'user', 'ixu_user_name_domain_id')) self.upgrade(97) if self.engine.name == 'mysql': self.assertFalse(self.does_index_exist( 'user', 'ixu_user_name_domain_id')) else: self.assertFalse(self.does_constraint_exist( 'user', 'ixu_user_name_domain_id'))
[docs] def test_migration_97_inconsistent_constraint_exists(self): self.upgrade(96) constraint_name = uuid.uuid4().hex self._add_name_domain_id_columns_to_user() self._add_unique_constraint_to_user_name_domainid( constraint_name=constraint_name) if self.engine.name == 'mysql': self.assertTrue(self.does_index_exist('user', constraint_name)) else: self.assertTrue(self.does_constraint_exist('user', constraint_name)) self.upgrade(97) if self.engine.name == 'mysql': self.assertFalse(self.does_index_exist('user', constraint_name)) else: self.assertFalse(self.does_constraint_exist('user', constraint_name))
[docs] def test_migration_101(self): self.upgrade(100) if self.engine.name == 'mysql': self.assertFalse(self.does_index_exist('role', 'ixu_role_name')) else: self.assertFalse(self.does_constraint_exist('role', 'ixu_role_name')) self.upgrade(101) if self.engine.name == 'mysql': self.assertFalse(self.does_index_exist('role', 'ixu_role_name')) else: self.assertFalse(self.does_constraint_exist('role', 'ixu_role_name'))
[docs] def test_migration_101_constraint_exists(self): self.upgrade(100) self._add_unique_constraint_to_role_name() if self.engine.name == 'mysql': self.assertTrue(self.does_index_exist('role', 'ixu_role_name')) else: self.assertTrue(self.does_constraint_exist('role', 'ixu_role_name')) self.upgrade(101) if self.engine.name == 'mysql': self.assertFalse(self.does_index_exist('role', 'ixu_role_name')) else: self.assertFalse(self.does_constraint_exist('role', 'ixu_role_name'))
[docs] def test_drop_domain_table(self): self.upgrade(101) self.assertTableExists('domain') self.upgrade(102) self.assertTableDoesNotExist('domain')
[docs] def test_add_nonlocal_user_table(self): nonlocal_user_table = 'nonlocal_user' self.upgrade(102) self.assertTableDoesNotExist(nonlocal_user_table) self.upgrade(103) self.assertTableColumns(nonlocal_user_table, ['domain_id', 'name', 'user_id'])
[docs] def test_migration_104(self): self.upgrade(103) if self.engine.name == 'mysql': self.assertFalse(self.does_index_exist( 'user', 'ixu_user_name_domain_id')) else: self.assertFalse(self.does_constraint_exist( 'user', 'ixu_user_name_domain_id')) self.upgrade(104) if self.engine.name == 'mysql': self.assertFalse(self.does_index_exist( 'user', 'ixu_user_name_domain_id')) else: self.assertFalse(self.does_constraint_exist( 'user', 'ixu_user_name_domain_id'))
[docs] def test_migration_104_constraint_exists(self): self.upgrade(103) self._add_name_domain_id_columns_to_user() self._add_unique_constraint_to_user_name_domainid( constraint_name='ixu_user_name_domain_id') if self.engine.name == 'mysql': self.assertTrue(self.does_index_exist( 'user', 'ixu_user_name_domain_id')) else: self.assertTrue(self.does_constraint_exist( 'user', 'ixu_user_name_domain_id')) self.upgrade(104) if self.engine.name == 'mysql': self.assertFalse(self.does_index_exist( 'user', 'ixu_user_name_domain_id')) else: self.assertFalse(self.does_constraint_exist( 'user', 'ixu_user_name_domain_id'))
[docs] def test_migration_104_inconsistent_constraint_exists(self): self.upgrade(103) constraint_name = uuid.uuid4().hex self._add_name_domain_id_columns_to_user() self._add_unique_constraint_to_user_name_domainid( constraint_name=constraint_name) if self.engine.name == 'mysql': self.assertTrue(self.does_index_exist('user', constraint_name)) else: self.assertTrue(self.does_constraint_exist('user', constraint_name)) self.upgrade(104) if self.engine.name == 'mysql': self.assertFalse(self.does_index_exist('user', constraint_name)) else: self.assertFalse(self.does_constraint_exist('user', constraint_name))
[docs] def test_migration_105_add_password_date_columns(self): def add_user_model_record(session): # add a user user = {'id': uuid.uuid4().hex} self.insert_dict(session, 'user', user) # add a local user local_user = { 'id': 1, 'user_id': user['id'], 'domain_id': 'default', 'name': uuid.uuid4().hex } self.insert_dict(session, 'local_user', local_user) # add a password password = { 'local_user_id': local_user['id'], 'password': uuid.uuid4().hex } self.insert_dict(session, 'password', password) self.upgrade(104) session = self.sessionmaker() password_name = 'password' # columns before self.assertTableColumns(password_name, ['id', 'local_user_id', 'password']) # add record and verify table count is greater than zero add_user_model_record(session) password_table = sqlalchemy.Table(password_name, self.metadata, autoload=True) cnt = session.query(password_table).count() self.assertGreater(cnt, 0) self.upgrade(105) # columns after self.assertTableColumns(password_name, ['id', 'local_user_id', 'password', 'created_at', 'expires_at']) password_table = sqlalchemy.Table(password_name, self.metadata, autoload=True) # verify created_at is not null null_created_at_cnt = ( session.query(password_table).filter_by(created_at=None).count()) self.assertEqual(null_created_at_cnt, 0) # verify expires_at is null null_expires_at_cnt = ( session.query(password_table).filter_by(expires_at=None).count()) self.assertGreater(null_expires_at_cnt, 0)
[docs] def test_migration_106_allow_password_column_to_be_nullable(self): password_table_name = 'password' self.upgrade(105) password_table = sqlalchemy.Table(password_table_name, self.metadata, autoload=True) self.assertFalse(password_table.c.password.nullable) self.upgrade(106) password_table = sqlalchemy.Table(password_table_name, self.metadata, autoload=True) self.assertTrue(password_table.c.password.nullable)
[docs] def test_migration_107_add_user_date_columns(self): user_table = 'user' self.upgrade(106) self.assertTableColumns(user_table, ['id', 'extra', 'enabled', 'default_project_id']) self.upgrade(107) self.assertTableColumns(user_table, ['id', 'extra', 'enabled', 'default_project_id', 'created_at', 'last_active_at'])
[docs] def test_migration_108_add_failed_auth_columns(self): self.upgrade(107) table_name = 'local_user' self.assertTableColumns(table_name, ['id', 'user_id', 'domain_id', 'name']) self.upgrade(108) self.assertTableColumns(table_name, ['id', 'user_id', 'domain_id', 'name', 'failed_auth_count', 'failed_auth_at'])
[docs] def test_migration_109_add_password_self_service_column(self): password_table = 'password' self.upgrade(108) self.assertTableColumns(password_table, ['id', 'local_user_id', 'password', 'created_at', 'expires_at']) self.upgrade(109) self.assertTableColumns(password_table, ['id', 'local_user_id', 'password', 'created_at', 'expires_at', 'self_service'])
[docs]class MySQLOpportunisticUpgradeTestCase(SqlLegacyRepoUpgradeTests): FIXTURE = test_base.MySQLOpportunisticFixture
[docs]class PostgreSQLOpportunisticUpgradeTestCase(SqlLegacyRepoUpgradeTests): FIXTURE = test_base.PostgreSQLOpportunisticFixture
[docs]class SqlExpandSchemaUpgradeTests(SqlMigrateBase):
[docs] def setUp(self): # Make sure the main repo is fully upgraded for this release since the # expand phase is only run after such an upgrade super(SqlExpandSchemaUpgradeTests, self).setUp() self.upgrade()
[docs] def test_start_version_db_init_version(self): self.assertEqual( self.repos[EXPAND_REPO].min_version, self.repos[EXPAND_REPO].version)
[docs]class MySQLOpportunisticExpandSchemaUpgradeTestCase( SqlExpandSchemaUpgradeTests): FIXTURE = test_base.MySQLOpportunisticFixture
[docs]class PostgreSQLOpportunisticExpandSchemaUpgradeTestCase( SqlExpandSchemaUpgradeTests): FIXTURE = test_base.PostgreSQLOpportunisticFixture
[docs]class SqlDataMigrationUpgradeTests(SqlMigrateBase):
[docs] def setUp(self): # Make sure the legacy and expand repos are fully upgraded, since the # data migration phase is only run after these are upgraded super(SqlDataMigrationUpgradeTests, self).setUp() self.upgrade() self.expand()
[docs] def test_start_version_db_init_version(self): self.assertEqual( self.repos[DATA_MIGRATION_REPO].min_version, self.repos[DATA_MIGRATION_REPO].version)
[docs]class MySQLOpportunisticDataMigrationUpgradeTestCase( SqlDataMigrationUpgradeTests): FIXTURE = test_base.MySQLOpportunisticFixture
[docs]class PostgreSQLOpportunisticDataMigrationUpgradeTestCase( SqlDataMigrationUpgradeTests): FIXTURE = test_base.PostgreSQLOpportunisticFixture
[docs]class SqlContractSchemaUpgradeTests(SqlMigrateBase, unit.TestCase):
[docs] def setUp(self): # Make sure the legacy, expand and data migration repos are fully # upgraded, since the contract phase is only run after these are # upgraded. super(SqlContractSchemaUpgradeTests, self).setUp() self.useFixture( ksfixtures.KeyRepository( self.config_fixture, 'credential', credential_fernet.MAX_ACTIVE_KEYS ) ) self.upgrade() self.expand() self.migrate()
[docs] def test_start_version_db_init_version(self): self.assertEqual( self.repos[CONTRACT_REPO].min_version, self.repos[CONTRACT_REPO].version)
[docs]class MySQLOpportunisticContractSchemaUpgradeTestCase( SqlContractSchemaUpgradeTests): FIXTURE = test_base.MySQLOpportunisticFixture
[docs]class PostgreSQLOpportunisticContractSchemaUpgradeTestCase( SqlContractSchemaUpgradeTests): FIXTURE = test_base.PostgreSQLOpportunisticFixture
[docs]class VersionTests(SqlMigrateBase):
[docs] def test_core_initial(self): """Get the version before migrated, it's the initial DB version.""" self.assertEqual( self.repos[LEGACY_REPO].min_version, self.repos[LEGACY_REPO].version)
[docs] def test_core_max(self): """When get the version after upgrading, it's the new version.""" self.upgrade() self.assertEqual( self.repos[LEGACY_REPO].max_version, self.repos[LEGACY_REPO].version)
[docs] def test_assert_not_schema_downgrade(self): self.upgrade() self.assertRaises( db_exception.DbMigrationError, migration_helpers._sync_common_repo, self.repos[LEGACY_REPO].max_version - 1)
[docs] def test_these_are_not_the_migrations_you_are_looking_for(self): """Keystone has shifted to rolling upgrades. New database migrations should no longer land in the legacy migration repository. Instead, new database migrations should be divided into three discrete steps: schema expansion, data migration, and schema contraction. These migrations live in a new set of database migration repositories, called ``expand_repo``, ``data_migration_repo``, and ``contract_repo``. For more information, see "Database Migrations" here: http://docs.openstack.org/developer/keystone/developing.html """ # Note to reviewers: this version number should never change. self.assertEqual(109, self.repos[LEGACY_REPO].max_version)
[docs]class FullMigration(SqlMigrateBase, unit.TestCase): """Test complete orchestration between all database phases."""
[docs] def setUp(self): super(FullMigration, self).setUp() # Upgrade the legacy repository self.upgrade()
[docs] def test_migration_002_password_created_at_not_nullable(self): # upgrade each repository to 001 self.expand(1) self.migrate(1) self.contract(1) password = sqlalchemy.Table('password', self.metadata, autoload=True) self.assertTrue(password.c.created_at.nullable) # upgrade each repository to 002 self.expand(2) self.migrate(2) self.contract(2) password = sqlalchemy.Table('password', self.metadata, autoload=True) if self.engine.name != 'sqlite': self.assertFalse(password.c.created_at.nullable)
[docs] def test_migration_003_migrate_unencrypted_credentials(self): self.useFixture( ksfixtures.KeyRepository( self.config_fixture, 'credential', credential_fernet.MAX_ACTIVE_KEYS ) ) session = self.sessionmaker() credential_table_name = 'credential' # upgrade each repository to 002 self.expand(2) self.migrate(2) self.contract(2) # populate the credential table with some sample credentials credentials = list() for i in range(5): credential = {'id': uuid.uuid4().hex, 'blob': uuid.uuid4().hex, 'user_id': uuid.uuid4().hex, 'type': 'cert'} credentials.append(credential) self.insert_dict(session, credential_table_name, credential) # verify the current schema self.assertTableColumns( credential_table_name, ['id', 'user_id', 'project_id', 'type', 'blob', 'extra'] ) # upgrade expand repo to 003 to add new columns self.expand(3) # verify encrypted_blob and key_hash columns have been added and verify # the original blob column is still there self.assertTableColumns( credential_table_name, ['id', 'user_id', 'project_id', 'type', 'blob', 'extra', 'key_hash', 'encrypted_blob'] ) # verify triggers by making sure we can't write to the credential table credential = {'id': uuid.uuid4().hex, 'blob': uuid.uuid4().hex, 'user_id': uuid.uuid4().hex, 'type': 'cert'} self.assertRaises(db_exception.DBError, self.insert_dict, session, credential_table_name, credential) # upgrade migrate repo to 003 to migrate existing credentials self.migrate(3) # make sure we've actually updated the credential with the # encrypted blob and the corresponding key hash credential_table = sqlalchemy.Table( credential_table_name, self.metadata, autoload=True ) for credential in credentials: filter = credential_table.c.id == credential['id'] cols = [credential_table.c.key_hash, credential_table.c.blob, credential_table.c.encrypted_blob] q = sqlalchemy.select(cols).where(filter) result = session.execute(q).fetchone() self.assertIsNotNone(result.encrypted_blob) self.assertIsNotNone(result.key_hash) # verify the original blob column is still populated self.assertEqual(result.blob, credential['blob']) # verify we can't make any writes to the credential table credential = {'id': uuid.uuid4().hex, 'blob': uuid.uuid4().hex, 'user_id': uuid.uuid4().hex, 'key_hash': uuid.uuid4().hex, 'type': 'cert'} self.assertRaises(db_exception.DBError, self.insert_dict, session, credential_table_name, credential) # upgrade contract repo to 003 to remove triggers and blob column self.contract(3) # verify the new schema doesn't have a blob column anymore self.assertTableColumns( credential_table_name, ['id', 'user_id', 'project_id', 'type', 'extra', 'key_hash', 'encrypted_blob'] ) # verify that the triggers are gone by writing to the database credential = {'id': uuid.uuid4().hex, 'encrypted_blob': uuid.uuid4().hex, 'key_hash': uuid.uuid4().hex, 'user_id': uuid.uuid4().hex, 'type': 'cert'} self.insert_dict(session, credential_table_name, credential)
[docs] def test_migration_004_reset_password_created_at(self): # upgrade each repository to 003 and test self.expand(3) self.migrate(3) self.contract(3) password = sqlalchemy.Table('password', self.metadata, autoload=True) # postgresql returns 'TIMESTAMP WITHOUT TIME ZONE' self.assertTrue( str(password.c.created_at.type).startswith('TIMESTAMP')) # upgrade each repository to 004 and test self.expand(4) self.migrate(4) self.contract(4) password = sqlalchemy.Table('password', self.metadata, autoload=True) # type would still be TIMESTAMP with postgresql if self.engine.name == 'postgresql': self.assertTrue( str(password.c.created_at.type).startswith('TIMESTAMP')) else: self.assertEqual('DATETIME', str(password.c.created_at.type)) self.assertFalse(password.c.created_at.nullable)
[docs]class MySQLOpportunisticFullMigration(FullMigration): FIXTURE = test_base.MySQLOpportunisticFixture
[docs]class PostgreSQLOpportunisticFullMigration(FullMigration): FIXTURE = test_base.PostgreSQLOpportunisticFixture

Project Source