Source code for keystone.tests.unit.test_v3_credential

# Copyright 2013 OpenStack Foundation
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.

import hashlib
import json
import uuid

from keystoneclient.contrib.ec2 import utils as ec2_utils
from oslo_config import cfg
from six.moves import http_client
from testtools import matchers

from keystone.common import utils
from keystone.contrib.ec2 import controllers
from keystone import exception
from keystone.tests import unit
from keystone.tests.unit import test_v3


CONF = cfg.CONF
CRED_TYPE_EC2 = controllers.CRED_TYPE_EC2


[docs]class CredentialBaseTestCase(test_v3.RestfulTestCase): def _create_dict_blob_credential(self): blob, credential = unit.new_ec2_credential(user_id=self.user['id'], project_id=self.project_id) # Store the blob as a dict *not* JSON ref bug #1259584 # This means we can test the dict->json workaround, added # as part of the bugfix for backwards compatibility works. credential['blob'] = blob credential_id = credential['id'] # Create direct via the DB API to avoid validation failure self.credential_api.create_credential(credential_id, credential) return json.dumps(blob), credential_id
[docs]class CredentialTestCase(CredentialBaseTestCase): """Test credential CRUD."""
[docs] def setUp(self): super(CredentialTestCase, self).setUp() self.credential = unit.new_credential_ref(user_id=self.user['id'], project_id=self.project_id) self.credential_api.create_credential( self.credential['id'], self.credential)
[docs] def test_credential_api_delete_credentials_for_project(self): self.credential_api.delete_credentials_for_project(self.project_id) # Test that the credential that we created in .setUp no longer exists # once we delete all credentials for self.project_id self.assertRaises(exception.CredentialNotFound, self.credential_api.get_credential, credential_id=self.credential['id'])
[docs] def test_credential_api_delete_credentials_for_user(self): self.credential_api.delete_credentials_for_user(self.user_id) # Test that the credential that we created in .setUp no longer exists # once we delete all credentials for self.user_id self.assertRaises(exception.CredentialNotFound, self.credential_api.get_credential, credential_id=self.credential['id'])
[docs] def test_list_credentials(self): """Call ``GET /credentials``.""" r = self.get('/credentials') self.assertValidCredentialListResponse(r, ref=self.credential)
[docs] def test_list_credentials_filtered_by_user_id(self): """Call ``GET /credentials?user_id={user_id}``.""" credential = unit.new_credential_ref(user_id=uuid.uuid4().hex) self.credential_api.create_credential(credential['id'], credential) r = self.get('/credentials?user_id=%s' % self.user['id']) self.assertValidCredentialListResponse(r, ref=self.credential) for cred in r.result['credentials']: self.assertEqual(self.user['id'], cred['user_id'])
[docs] def test_list_credentials_filtered_by_type(self): """Call ``GET /credentials?type={type}``.""" # The type ec2 was chosen, instead of a random string, # because the type must be in the list of supported types ec2_credential = unit.new_credential_ref(user_id=uuid.uuid4().hex, project_id=self.project_id, type=CRED_TYPE_EC2) ec2_resp = self.credential_api.create_credential( ec2_credential['id'], ec2_credential) # The type cert was chosen for the same reason as ec2 r = self.get('/credentials?type=cert') # Testing the filter for two different types self.assertValidCredentialListResponse(r, ref=self.credential) for cred in r.result['credentials']: self.assertEqual('cert', cred['type']) r_ec2 = self.get('/credentials?type=ec2') self.assertThat(r_ec2.result['credentials'], matchers.HasLength(1)) cred_ec2 = r_ec2.result['credentials'][0] self.assertValidCredentialListResponse(r_ec2, ref=ec2_resp) self.assertEqual(CRED_TYPE_EC2, cred_ec2['type']) self.assertEqual(ec2_credential['id'], cred_ec2['id'])
[docs] def test_list_credentials_filtered_by_type_and_user_id(self): """Call ``GET /credentials?user_id={user_id}&type={type}``.""" user1_id = uuid.uuid4().hex user2_id = uuid.uuid4().hex # Creating credentials for two different users credential_user1_ec2 = unit.new_credential_ref(user_id=user1_id, type=CRED_TYPE_EC2) credential_user1_cert = unit.new_credential_ref(user_id=user1_id) credential_user2_cert = unit.new_credential_ref(user_id=user2_id) self.credential_api.create_credential( credential_user1_ec2['id'], credential_user1_ec2) self.credential_api.create_credential( credential_user1_cert['id'], credential_user1_cert) self.credential_api.create_credential( credential_user2_cert['id'], credential_user2_cert) r = self.get('/credentials?user_id=%s&type=ec2' % user1_id) self.assertValidCredentialListResponse(r, ref=credential_user1_ec2) self.assertThat(r.result['credentials'], matchers.HasLength(1)) cred = r.result['credentials'][0] self.assertEqual(CRED_TYPE_EC2, cred['type']) self.assertEqual(user1_id, cred['user_id'])
[docs] def test_create_credential(self): """Call ``POST /credentials``.""" ref = unit.new_credential_ref(user_id=self.user['id']) r = self.post( '/credentials', body={'credential': ref}) self.assertValidCredentialResponse(r, ref)
[docs] def test_get_credential(self): """Call ``GET /credentials/{credential_id}``.""" r = self.get( '/credentials/%(credential_id)s' % { 'credential_id': self.credential['id']}) self.assertValidCredentialResponse(r, self.credential)
[docs] def test_update_credential(self): """Call ``PATCH /credentials/{credential_id}``.""" ref = unit.new_credential_ref(user_id=self.user['id'], project_id=self.project_id) del ref['id'] r = self.patch( '/credentials/%(credential_id)s' % { 'credential_id': self.credential['id']}, body={'credential': ref}) self.assertValidCredentialResponse(r, ref)
[docs] def test_delete_credential(self): """Call ``DELETE /credentials/{credential_id}``.""" self.delete( '/credentials/%(credential_id)s' % { 'credential_id': self.credential['id']})
[docs] def test_create_ec2_credential(self): """Call ``POST /credentials`` for creating ec2 credential.""" blob, ref = unit.new_ec2_credential(user_id=self.user['id'], project_id=self.project_id) r = self.post('/credentials', body={'credential': ref}) self.assertValidCredentialResponse(r, ref) # Assert credential id is same as hash of access key id for # ec2 credentials access = blob['access'].encode('utf-8') self.assertEqual(hashlib.sha256(access).hexdigest(), r.result['credential']['id']) # Create second ec2 credential with the same access key id and check # for conflict. self.post( '/credentials', body={'credential': ref}, expected_status=http_client.CONFLICT)
[docs] def test_get_ec2_dict_blob(self): """Ensure non-JSON blob data is correctly converted.""" expected_blob, credential_id = self._create_dict_blob_credential() r = self.get( '/credentials/%(credential_id)s' % { 'credential_id': credential_id}) # use json.loads to transform the blobs back into Python dictionaries # to avoid problems with the keys being in different orders. self.assertEqual(json.loads(expected_blob), json.loads(r.result['credential']['blob']))
[docs] def test_list_ec2_dict_blob(self): """Ensure non-JSON blob data is correctly converted.""" expected_blob, credential_id = self._create_dict_blob_credential() list_r = self.get('/credentials') list_creds = list_r.result['credentials'] list_ids = [r['id'] for r in list_creds] self.assertIn(credential_id, list_ids) # use json.loads to transform the blobs back into Python dictionaries # to avoid problems with the keys being in different orders. for r in list_creds: if r['id'] == credential_id: self.assertEqual(json.loads(expected_blob), json.loads(r['blob']))
[docs] def test_create_non_ec2_credential(self): """Test creating non-ec2 credential. Call ``POST /credentials``. """ blob, ref = unit.new_cert_credential(user_id=self.user['id']) r = self.post('/credentials', body={'credential': ref}) self.assertValidCredentialResponse(r, ref) # Assert credential id is not same as hash of access key id for # non-ec2 credentials access = blob['access'].encode('utf-8') self.assertNotEqual(hashlib.sha256(access).hexdigest(), r.result['credential']['id'])
[docs] def test_create_ec2_credential_with_missing_project_id(self): """Test Creating ec2 credential with missing project_id. Call ``POST /credentials``. """ _, ref = unit.new_ec2_credential(user_id=self.user['id'], project_id=None) # Assert bad request status when missing project_id self.post( '/credentials', body={'credential': ref}, expected_status=http_client.BAD_REQUEST)
[docs] def test_create_ec2_credential_with_invalid_blob(self): """Test creating ec2 credential with invalid blob. Call ``POST /credentials``. """ ref = unit.new_credential_ref(user_id=self.user['id'], project_id=self.project_id, blob='{"abc":"def"d}', type=CRED_TYPE_EC2) # Assert bad request status when request contains invalid blob response = self.post( '/credentials', body={'credential': ref}, expected_status=http_client.BAD_REQUEST) self.assertValidErrorResponse(response)
[docs] def test_create_credential_with_admin_token(self): # Make sure we can create credential with the static admin token ref = unit.new_credential_ref(user_id=self.user['id']) r = self.post( '/credentials', body={'credential': ref}, token=self.get_admin_token()) self.assertValidCredentialResponse(r, ref)
[docs]class TestCredentialTrustScoped(test_v3.RestfulTestCase): """Test credential with trust scoped token."""
[docs] def setUp(self): super(TestCredentialTrustScoped, self).setUp() self.trustee_user = unit.new_user_ref(domain_id=self.domain_id) password = self.trustee_user['password'] self.trustee_user = self.identity_api.create_user(self.trustee_user) self.trustee_user['password'] = password self.trustee_user_id = self.trustee_user['id']
[docs] def config_overrides(self): super(TestCredentialTrustScoped, self).config_overrides() self.config_fixture.config(group='trust', enabled=True)
[docs] def test_trust_scoped_ec2_credential(self): """Test creating trust scoped ec2 credential. Call ``POST /credentials``. """ # Create the trust ref = unit.new_trust_ref( trustor_user_id=self.user_id, trustee_user_id=self.trustee_user_id, project_id=self.project_id, impersonation=True, expires=dict(minutes=1), role_ids=[self.role_id]) del ref['id'] r = self.post('/OS-TRUST/trusts', body={'trust': ref}) trust = self.assertValidTrustResponse(r) # Get a trust scoped token auth_data = self.build_authentication_request( user_id=self.trustee_user['id'], password=self.trustee_user['password'], trust_id=trust['id']) r = self.v3_create_token(auth_data) self.assertValidProjectScopedTokenResponse(r, self.user) trust_id = r.result['token']['OS-TRUST:trust']['id'] token_id = r.headers.get('X-Subject-Token') # Create the credential with the trust scoped token blob, ref = unit.new_ec2_credential(user_id=self.user['id'], project_id=self.project_id) r = self.post('/credentials', body={'credential': ref}, token=token_id) # We expect the response blob to contain the trust_id ret_ref = ref.copy() ret_blob = blob.copy() ret_blob['trust_id'] = trust_id ret_ref['blob'] = json.dumps(ret_blob) self.assertValidCredentialResponse(r, ref=ret_ref) # Assert credential id is same as hash of access key id for # ec2 credentials access = blob['access'].encode('utf-8') self.assertEqual(hashlib.sha256(access).hexdigest(), r.result['credential']['id']) # Create second ec2 credential with the same access key id and check # for conflict. self.post( '/credentials', body={'credential': ref}, token=token_id, expected_status=http_client.CONFLICT)
[docs]class TestCredentialEc2(CredentialBaseTestCase): """Test v3 credential compatibility with ec2tokens."""
[docs] def setUp(self): super(TestCredentialEc2, self).setUp()
def _validate_signature(self, access, secret): """Test signature validation with the access/secret provided.""" signer = ec2_utils.Ec2Signer(secret) params = {'SignatureMethod': 'HmacSHA256', 'SignatureVersion': '2', 'AWSAccessKeyId': access} request = {'host': 'foo', 'verb': 'GET', 'path': '/bar', 'params': params} signature = signer.generate(request) # Now make a request to validate the signed dummy request via the # ec2tokens API. This proves the v3 ec2 credentials actually work. sig_ref = {'access': access, 'signature': signature, 'host': 'foo', 'verb': 'GET', 'path': '/bar', 'params': params} r = self.post( '/ec2tokens', body={'ec2Credentials': sig_ref}, expected_status=http_client.OK) self.assertValidTokenResponse(r)
[docs] def test_ec2_credential_signature_validate(self): """Test signature validation with a v3 ec2 credential.""" blob, ref = unit.new_ec2_credential(user_id=self.user['id'], project_id=self.project_id) r = self.post('/credentials', body={'credential': ref}) self.assertValidCredentialResponse(r, ref) # Assert credential id is same as hash of access key id access = blob['access'].encode('utf-8') self.assertEqual(hashlib.sha256(access).hexdigest(), r.result['credential']['id']) cred_blob = json.loads(r.result['credential']['blob']) self.assertEqual(blob, cred_blob) self._validate_signature(access=cred_blob['access'], secret=cred_blob['secret'])
[docs] def test_ec2_credential_signature_validate_legacy(self): """Test signature validation with a legacy v3 ec2 credential.""" cred_json, _ = self._create_dict_blob_credential() cred_blob = json.loads(cred_json) self._validate_signature(access=cred_blob['access'], secret=cred_blob['secret'])
def _get_ec2_cred_uri(self): return '/users/%s/credentials/OS-EC2' % self.user_id def _get_ec2_cred(self): uri = self._get_ec2_cred_uri() r = self.post(uri, body={'tenant_id': self.project_id}) return r.result['credential']
[docs] def test_ec2_create_credential(self): """Test ec2 credential creation.""" ec2_cred = self._get_ec2_cred() self.assertEqual(self.user_id, ec2_cred['user_id']) self.assertEqual(self.project_id, ec2_cred['tenant_id']) self.assertIsNone(ec2_cred['trust_id']) self._validate_signature(access=ec2_cred['access'], secret=ec2_cred['secret']) uri = '/'.join([self._get_ec2_cred_uri(), ec2_cred['access']]) self.assertThat(ec2_cred['links']['self'], matchers.EndsWith(uri))
[docs] def test_ec2_get_credential(self): ec2_cred = self._get_ec2_cred() uri = '/'.join([self._get_ec2_cred_uri(), ec2_cred['access']]) r = self.get(uri) self.assertDictEqual(ec2_cred, r.result['credential']) self.assertThat(ec2_cred['links']['self'], matchers.EndsWith(uri))
[docs] def test_ec2_cannot_get_non_ec2_credential(self): access_key = uuid.uuid4().hex cred_id = utils.hash_access_key(access_key) non_ec2_cred = unit.new_credential_ref( user_id=self.user_id, project_id=self.project_id) non_ec2_cred['id'] = cred_id self.credential_api.create_credential(cred_id, non_ec2_cred) uri = '/'.join([self._get_ec2_cred_uri(), access_key]) # if access_key is not found, ec2 controller raises Unauthorized # exception self.get(uri, expected_status=http_client.UNAUTHORIZED)
[docs] def test_ec2_list_credentials(self): """Test ec2 credential listing.""" self._get_ec2_cred() uri = self._get_ec2_cred_uri() r = self.get(uri) cred_list = r.result['credentials'] self.assertEqual(1, len(cred_list)) self.assertThat(r.result['links']['self'], matchers.EndsWith(uri)) # non-EC2 credentials won't be fetched non_ec2_cred = unit.new_credential_ref( user_id=self.user_id, project_id=self.project_id) non_ec2_cred['type'] = uuid.uuid4().hex self.credential_api.create_credential(non_ec2_cred['id'], non_ec2_cred) r = self.get(uri) cred_list_2 = r.result['credentials'] # still one element because non-EC2 credentials are not returned. self.assertEqual(1, len(cred_list_2)) self.assertEqual(cred_list[0], cred_list_2[0])
[docs] def test_ec2_delete_credential(self): """Test ec2 credential deletion.""" ec2_cred = self._get_ec2_cred() uri = '/'.join([self._get_ec2_cred_uri(), ec2_cred['access']]) cred_from_credential_api = ( self.credential_api .list_credentials_for_user(self.user_id, type=CRED_TYPE_EC2)) self.assertEqual(1, len(cred_from_credential_api)) self.delete(uri) self.assertRaises(exception.CredentialNotFound, self.credential_api.get_credential, cred_from_credential_api[0]['id'])

Project Source