# Copyright 2013 OpenStack Foundation
# All Rights Reserved.
#
#    Licensed under the Apache License, Version 2.0 (the "License"); you may
#    not use this file except in compliance with the License. You may obtain
#    a copy of the License at
#
#         http://www.apache.org/licenses/LICENSE-2.0
#
#    Unless required by applicable law or agreed to in writing, software
#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
#    License for the specific language governing permissions and limitations
#    under the License.
from oslo_serialization import jsonutils as json
from tempest.api.identity import base
from tempest import config
from tempest.lib.common.utils import data_utils
from tempest.lib import decorators
# Shared tempest configuration object; the tests below read
# ``resource_name_prefix`` from it when generating resource names.
CONF = config.CONF
class CredentialsTestJSON(base.BaseIdentityV3AdminTest):
    """Test keystone credentials"""

    # NOTE: force_tenant_isolation is true in the base class by default but
    # overridden to false here to allow test execution for clouds using the
    # pre-provisioned credentials provider.
    force_tenant_isolation = False

    @classmethod
    def resource_setup(cls):
        """Create two projects and look up the primary user for the tests.

        The project ids are stored in ``cls.projects`` and each project is
        registered for deletion through ``addClassResourceCleanup``.
        ``cls.creds_list`` holds the keys expected in a credential body
        (first list) and in its blob (second list).
        """
        super(CredentialsTestJSON, cls).resource_setup()
        cls.projects = []
        cls.creds_list = [['project_id', 'user_id', 'id'],
                          ['access', 'secret']]
        prefix = CONF.resource_name_prefix
        for _ in range(2):
            project = cls.projects_client.create_project(
                data_utils.rand_name(name='project', prefix=prefix),
                description=data_utils.rand_name(
                    name='project-desc', prefix=prefix))['project']
            cls.addClassResourceCleanup(
                cls.projects_client.delete_project, project['id'])
            cls.projects.append(project['id'])
        cls.user_body = cls.users_client.show_user(
            cls.os_primary.credentials.user_id)['user']

    def _delete_credential(self, cred_id):
        """Delete the credential identified by ``cred_id``."""
        self.creds_client.delete_credential(cred_id)

    @staticmethod
    def _ec2_blob(access, secret):
        """Return the JSON text of a blob with the given access/secret."""
        return '{"access": "%s", "secret": "%s"}' % (access, secret)

    def _create_ec2_credential(self):
        """Create an 'ec2' credential in the first project and return it.

        The credential belongs to the primary user, gets randomly named
        access/secret values, and is registered for deletion on test
        cleanup.
        """
        prefix = CONF.resource_name_prefix
        blob = self._ec2_blob(
            data_utils.rand_name(name='Access', prefix=prefix),
            data_utils.rand_name(name='Secret', prefix=prefix))
        cred = self.creds_client.create_credential(
            user_id=self.user_body['id'], project_id=self.projects[0],
            blob=blob, type='ec2')['credential']
        self.addCleanup(self._delete_credential, cred['id'])
        return cred

    @decorators.attr(type='smoke')
    @decorators.idempotent_id('7cd59bf9-bda4-4c72-9467-d21cab278355')
    def test_credentials_create_get_update_delete(self):
        """Test creating, getting, updating, deleting of credentials"""
        prefix = CONF.resource_name_prefix
        cred = self._create_ec2_credential()
        for key in self.creds_list[0]:
            self.assertIn(key, cred)
        # NOTE(review): the blob is not decoded here, so if the client
        # returns it as JSON text this is a substring check - confirm that
        # is intended.
        for key in self.creds_list[1]:
            self.assertIn(key, cred['blob'])

        # Update the credential with a new blob and move it to the second
        # project.
        new_keys = [data_utils.rand_name(name='NewAccess', prefix=prefix),
                    data_utils.rand_name(name='NewSecret', prefix=prefix)]
        blob = self._ec2_blob(new_keys[0], new_keys[1])
        update_body = self.creds_client.update_credential(
            cred['id'], blob=blob, project_id=self.projects[1],
            type='ec2')['credential']
        update_body['blob'] = json.loads(update_body['blob'])
        self.assertEqual(cred['id'], update_body['id'])
        self.assertEqual(self.projects[1], update_body['project_id'])
        self.assertEqual(self.user_body['id'], update_body['user_id'])
        self.assertEqual(new_keys[0], update_body['blob']['access'])
        self.assertEqual(new_keys[1], update_body['blob']['secret'])

        # A subsequent show must return the same values as the update did.
        get_body = self.creds_client.show_credential(cred['id'])['credential']
        get_body['blob'] = json.loads(get_body['blob'])
        for key in self.creds_list[0]:
            self.assertEqual(update_body[key], get_body[key])
        for key in self.creds_list[1]:
            self.assertEqual(update_body['blob'][key],
                             get_body['blob'][key])

    @decorators.idempotent_id('13202c00-0021-42a1-88d4-81b44d448aab')
    def test_credentials_list_delete(self):
        """Test that created credentials appear in the credentials list

        Deletion is only exercised through the cleanups registered when
        each credential is created.
        """
        created_cred_ids = []
        for _ in range(2):
            created_cred_ids.append(self._create_ec2_credential()['id'])
        creds = self.creds_client.list_credentials()['credentials']
        fetched_cred_ids = {fetched['id'] for fetched in creds}
        missing_creds = [c for c in created_cred_ids
                         if c not in fetched_cred_ids]
        self.assertEmpty(missing_creds,
                         "Failed to find cred %s in fetched list" %
                         ', '.join(missing_creds))