# Source code for glance.tests.unit.test_auth

# Copyright 2011 OpenStack Foundation
# Copyright 2013 IBM Corp.
# All Rights Reserved.
#
#    Licensed under the Apache License, Version 2.0 (the "License"); you may
#    not use this file except in compliance with the License. You may obtain
#    a copy of the License at
#
#         http://www.apache.org/licenses/LICENSE-2.0
#
#    Unless required by applicable law or agreed to in writing, software
#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
#    License for the specific language governing permissions and limitations
#    under the License.

from oslo_serialization import jsonutils
from oslotest import moxstubout
from six.moves import http_client as http
import webob

from glance.api import authorization
from glance.common import auth
from glance.common import exception
from glance.common import timeutils
import glance.domain
from glance.tests.unit import utils as unittest_utils
from glance.tests import utils


# Tenant identifiers used in these tests both as the tenant of a request
# context and as the owner of images and tasks.
TENANT1 = '6838eb7b-6ded-434a-882c-b344c77fe8df'
TENANT2 = '2c014f32-55eb-467d-8fcb-4bd706012f81'

# UUID1 is the id of the fixture image in TestImmutableImage; UUID2 is the
# replacement id that the "change id" tests try (and must fail) to assign.
UUID1 = 'c80a1a6c-bd1f-41c5-90ee-81afedb1d58d'
UUID2 = 'a85abd86-55b3-4d5b-b0b4-5d0a6e6042fc'


class FakeResponse(object):
    """
    Simple class that masks the inconsistency between
    webob.Response.status_int and httplib.Response.status

    The wrapped object only needs a ``headers`` mapping and a ``status_int``
    attribute; header values are read with item access and the status code
    through the ``status`` property.
    """

    def __init__(self, resp):
        # Kept public so tests can still reach the underlying response.
        self.resp = resp

    def __getitem__(self, key):
        """Return the header stored under *key*, or None when it is absent."""
        return self.resp.headers.get(key)

    @property
    def status(self):
        """The wrapped response's ``status_int`` value."""
        return self.resp.status_int
class V2Token(object):
    """Builder for the fake Keystone v2.0 token body used by the auth tests.

    The token starts with an empty service catalog; ``add_service`` and
    ``add_service_no_type`` append entries to it in place.
    """

    def __init__(self):
        # base_token builds a brand-new dict on every access, so each
        # instance owns (and may freely mutate) its own token.
        self.tok = self.base_token

    def add_service_no_type(self):
        """Append a catalog entry that has a name but no ``type`` key."""
        self.tok['access']['serviceCatalog'].append({
            "name": "glance_no_type",
            "endpoints": [self.base_endpoint],
        })

    def add_service(self, s_type, region_list=None):
        """Append a service of type *s_type* named "glance" to the catalog.

        :param s_type: value stored under the entry's ``type`` key
        :param region_list: regions to create endpoints for, one endpoint
                            per region; when None or empty the service gets a
                            single endpoint without a ``region`` key
        """
        if region_list:
            # base_endpoint returns a fresh dict per access, so every region
            # gets its own endpoint object.
            endpoints = [dict(self.base_endpoint, region=region)
                         for region in region_list]
        else:
            endpoints = [self.base_endpoint]

        self.tok['access']['serviceCatalog'].append({
            "type": s_type,
            "name": "glance",
            "endpoints": endpoints,
        })

    @property
    def token(self):
        """The token dict built so far."""
        return self.tok

    @property
    def base_endpoint(self):
        """A new endpoint dict whose three URLs all point at localhost:9292."""
        return {
            "adminURL": "http://localhost:9292",
            "internalURL": "http://localhost:9292",
            "publicURL": "http://localhost:9292"
        }

    @property
    def base_token(self):
        """A new token dict with an empty service catalog."""
        return {
            "access": {
                "token": {
                    "expires": "2010-11-23T16:40:53.321584",
                    "id": "5c7f8799-2e54-43e4-851b-31f81871b6c",
                    "tenant": {"id": "1", "name": "tenant-ok"}
                },
                "serviceCatalog": [
                ],
                "user": {
                    "id": "2",
                    "roles": [{
                        "tenantId": "1",
                        "id": "1",
                        "name": "Admin"
                    }],
                    "name": "joeadmin"
                }
            }
        }
class TestKeystoneAuthPlugin(utils.BaseTestCase):
    """Test that the Keystone auth plugin works properly"""

    def setUp(self):
        super(TestKeystoneAuthPlugin, self).setUp()
        mox_fixture = self.useFixture(moxstubout.MoxStubout())
        self.stubs = mox_fixture.stubs

    def _assert_auth_raises(self, expected, creds, failure_msg):
        """Require that building and authenticating a plugin raises *expected*.

        Both the construction of the KeystoneStrategy and the call to
        ``authenticate()`` are inside the guarded region.  Any other
        exception propagates unchanged; when nothing is raised the test
        fails with *failure_msg*.
        """
        try:
            plugin = auth.KeystoneStrategy(creds)
            plugin.authenticate()
        except expected:
            return
        self.fail(failure_msg)

    def _stub_bad_request(self):
        """Stub KeystoneStrategy._do_request to always answer 400."""
        def fake_do_request(*args, **kwargs):
            resp = webob.Response()
            resp.status = http.BAD_REQUEST
            return FakeResponse(resp), ""

        self.stubs.Set(auth.KeystoneStrategy, '_do_request', fake_do_request)

    def test_get_plugin_from_strategy_keystone(self):
        strategy = auth.get_plugin_from_strategy('keystone')
        self.assertIsInstance(strategy, auth.KeystoneStrategy)
        self.assertTrue(strategy.configure_via_auth)

    def test_get_plugin_from_strategy_keystone_configure_via_auth_false(self):
        strategy = auth.get_plugin_from_strategy('keystone',
                                                 configure_via_auth=False)
        self.assertIsInstance(strategy, auth.KeystoneStrategy)
        self.assertFalse(strategy.configure_via_auth)

    def test_required_creds(self):
        """
        Test that plugin created without required
        credential pieces raises an exception
        """
        bad_creds = [
            {},  # missing everything
            {
                'username': 'user1',
                'strategy': 'keystone',
                'password': 'pass'
            },  # missing auth_url
            {
                'password': 'pass',
                'strategy': 'keystone',
                'auth_url': 'http://localhost/v1'
            },  # missing username
            {
                'username': 'user1',
                'strategy': 'keystone',
                'auth_url': 'http://localhost/v1'
            },  # missing password
            {
                'username': 'user1',
                'password': 'pass',
                'auth_url': 'http://localhost/v1'
            },  # missing strategy
            {
                'username': 'user1',
                'password': 'pass',
                'strategy': 'keystone',
                'auth_url': 'http://localhost/v2.0/'
            },  # v2.0: missing tenant
            {
                'username': None,
                'password': 'pass',
                'auth_url': 'http://localhost/v2.0/'
            },  # None parameter
            {
                'username': 'user1',
                'password': 'pass',
                'auth_url': 'http://localhost/v2.0/',
                'tenant': None
            }  # None tenant
        ]
        for creds in bad_creds:
            self._assert_auth_raises(
                exception.MissingCredentialError, creds,
                "Failed to raise correct exception when supplying "
                "bad credentials: %r" % creds)

    def test_invalid_auth_url_v1(self):
        """
        Test that a 400 during authenticate raises exception.AuthBadRequest
        """
        self._stub_bad_request()

        bad_creds = {
            'username': 'user1',
            'auth_url': 'http://localhost/badauthurl/',
            'password': 'pass',
            'strategy': 'keystone',
            'region': 'RegionOne'
        }

        plugin = auth.KeystoneStrategy(bad_creds)
        self.assertRaises(exception.AuthBadRequest, plugin.authenticate)

    def test_invalid_auth_url_v2(self):
        """
        Test that a 400 during authenticate raises exception.AuthBadRequest
        """
        self._stub_bad_request()

        bad_creds = {
            'username': 'user1',
            'auth_url': 'http://localhost/badauthurl/v2.0/',
            'password': 'pass',
            'tenant': 'tenant1',
            'strategy': 'keystone',
            'region': 'RegionOne'
        }

        plugin = auth.KeystoneStrategy(bad_creds)
        self.assertRaises(exception.AuthBadRequest, plugin.authenticate)

    def test_v1_auth(self):
        """Test v1 auth code paths"""
        def fake_do_request(cls, url, method, headers=None, body=None):
            # A v1 request must never be sent to a v2.0 URL.
            if "2.0" in url:
                self.fail("Invalid v1.0 token path (%s)" % url)
            headers = headers or {}

            resp = webob.Response()

            if (headers.get('X-Auth-User') != 'user1' or
                    headers.get('X-Auth-Key') != 'pass'):
                resp.status = http.UNAUTHORIZED
            else:
                resp.status = http.OK
                resp.headers.update({"x-image-management-url": "example.com"})

            return FakeResponse(resp), ""

        self.stubs.Set(auth.KeystoneStrategy, '_do_request', fake_do_request)

        unauthorized_creds = [
            {
                'username': 'wronguser',
                'auth_url': 'http://localhost/badauthurl/',
                'strategy': 'keystone',
                'region': 'RegionOne',
                'password': 'pass'
            },  # wrong username
            {
                'username': 'user1',
                'auth_url': 'http://localhost/badauthurl/',
                'strategy': 'keystone',
                'region': 'RegionOne',
                'password': 'badpass'
            },  # bad password...
        ]

        for creds in unauthorized_creds:
            self._assert_auth_raises(
                exception.NotAuthenticated, creds,
                "Failed to raise NotAuthenticated when supplying "
                "bad credentials: %r" % creds)

        no_strategy_creds = {
            'username': 'user1',
            'auth_url': 'http://localhost/redirect/',
            'password': 'pass',
            'region': 'RegionOne'
        }

        self._assert_auth_raises(
            exception.MissingCredentialError, no_strategy_creds,
            "Failed to raise MissingCredentialError when "
            "supplying no strategy: %r" % no_strategy_creds)

        good_creds = [
            {
                'username': 'user1',
                'auth_url': 'http://localhost/redirect/',
                'password': 'pass',
                'strategy': 'keystone',
                'region': 'RegionOne'
            }
        ]

        for creds in good_creds:
            plugin = auth.KeystoneStrategy(creds)
            self.assertIsNone(plugin.authenticate())
            self.assertEqual("example.com", plugin.management_url)

        # Assert it does not update management_url via auth response
        for creds in good_creds:
            plugin = auth.KeystoneStrategy(creds, configure_via_auth=False)
            self.assertIsNone(plugin.authenticate())
            self.assertIsNone(plugin.management_url)

    def test_v2_auth(self):
        """Test v2 auth code paths"""
        # fake_do_request reads this name at call time, so rebinding it
        # further down swaps the catalog that subsequent requests see.
        mock_token = V2Token()
        mock_token.add_service('image', ['RegionOne'])

        def fake_do_request(cls, url, method, headers=None, body=None):
            if (not url.rstrip('/').endswith('v2.0/tokens') or
                    url.count("2.0") != 1):
                self.fail("Invalid v2.0 token path (%s)" % url)

            creds = jsonutils.loads(body)['auth']
            username = creds['passwordCredentials']['username']
            password = creds['passwordCredentials']['password']
            tenant = creds['tenantName']
            resp = webob.Response()

            if (username != 'user1' or password != 'pass' or
                    tenant != 'tenant-ok'):
                # The request body is echoed back on the failure path.
                resp.status = http.UNAUTHORIZED
            else:
                resp.status = http.OK
                body = mock_token.token

            return FakeResponse(resp), jsonutils.dumps(body)

        self.stubs.Set(auth.KeystoneStrategy, '_do_request', fake_do_request)

        unauthorized_creds = [
            {
                'username': 'wronguser',
                'auth_url': 'http://localhost/v2.0',
                'password': 'pass',
                'tenant': 'tenant-ok',
                'strategy': 'keystone',
                'region': 'RegionOne'
            },  # wrong username
            {
                'username': 'user1',
                'auth_url': 'http://localhost/v2.0',
                'password': 'badpass',
                'tenant': 'tenant-ok',
                'strategy': 'keystone',
                'region': 'RegionOne'
            },  # bad password...
            {
                'username': 'user1',
                'auth_url': 'http://localhost/v2.0',
                'password': 'pass',
                'tenant': 'carterhayes',
                'strategy': 'keystone',
                'region': 'RegionOne'
            },  # bad tenant...
        ]

        for creds in unauthorized_creds:
            self._assert_auth_raises(
                exception.NotAuthenticated, creds,
                "Failed to raise NotAuthenticated when supplying "
                "bad credentials: %r" % creds)

        no_region_creds = {
            'username': 'user1',
            'tenant': 'tenant-ok',
            'auth_url': 'http://localhost/redirect/v2.0/',
            'password': 'pass',
            'strategy': 'keystone'
        }

        plugin = auth.KeystoneStrategy(no_region_creds)
        self.assertIsNone(plugin.authenticate())
        self.assertEqual('http://localhost:9292', plugin.management_url)

        # Add another image service, with a different region
        mock_token.add_service('image', ['RegionTwo'])

        self._assert_auth_raises(
            exception.RegionAmbiguity, no_region_creds,
            "Failed to raise RegionAmbiguity when no region present "
            "and multiple regions exist: %r" % no_region_creds)

        wrong_region_creds = {
            'username': 'user1',
            'tenant': 'tenant-ok',
            'auth_url': 'http://localhost/redirect/v2.0/',
            'password': 'pass',
            'strategy': 'keystone',
            'region': 'NonExistentRegion'
        }

        self._assert_auth_raises(
            exception.NoServiceEndpoint, wrong_region_creds,
            "Failed to raise NoServiceEndpoint when supplying "
            "wrong region: %r" % wrong_region_creds)

        no_strategy_creds = {
            'username': 'user1',
            'tenant': 'tenant-ok',
            'auth_url': 'http://localhost/redirect/v2.0/',
            'password': 'pass',
            'region': 'RegionOne'
        }

        self._assert_auth_raises(
            exception.MissingCredentialError, no_strategy_creds,
            "Failed to raise MissingCredentialError when "
            "supplying no strategy: %r" % no_strategy_creds)

        bad_strategy_creds = {
            'username': 'user1',
            'tenant': 'tenant-ok',
            'auth_url': 'http://localhost/redirect/v2.0/',
            'password': 'pass',
            'region': 'RegionOne',
            'strategy': 'keypebble'
        }

        self._assert_auth_raises(
            exception.BadAuthStrategy, bad_strategy_creds,
            "Failed to raise BadAuthStrategy when supplying "
            "bad auth strategy: %r" % bad_strategy_creds)

        mock_token = V2Token()
        mock_token.add_service('image', ['RegionOne', 'RegionTwo'])

        good_creds_list = [
            {
                'username': 'user1',
                'auth_url': 'http://localhost/v2.0/',
                'password': 'pass',
                'tenant': 'tenant-ok',
                'strategy': 'keystone',
                'region': 'RegionOne'
            },  # auth_url with trailing '/'
            {
                'username': 'user1',
                'auth_url': 'http://localhost/v2.0',
                'password': 'pass',
                'tenant': 'tenant-ok',
                'strategy': 'keystone',
                'region': 'RegionOne'
            },  # auth_url without trailing '/'
            {
                'username': 'user1',
                'auth_url': 'http://localhost/v2.0',
                'password': 'pass',
                'tenant': 'tenant-ok',
                'strategy': 'keystone',
                'region': 'RegionTwo'
            }  # Second region
        ]

        for creds in good_creds_list:
            plugin = auth.KeystoneStrategy(creds)
            self.assertIsNone(plugin.authenticate())
            self.assertEqual('http://localhost:9292', plugin.management_url)

        ambiguous_region_creds = {
            'username': 'user1',
            'auth_url': 'http://localhost/v2.0/',
            'password': 'pass',
            'tenant': 'tenant-ok',
            'strategy': 'keystone',
            'region': 'RegionOne'
        }

        mock_token = V2Token()
        # Add two identical services
        mock_token.add_service('image', ['RegionOne'])
        mock_token.add_service('image', ['RegionOne'])

        self._assert_auth_raises(
            exception.RegionAmbiguity, ambiguous_region_creds,
            "Failed to raise RegionAmbiguity when "
            "non-unique regions exist: %r" % ambiguous_region_creds)

        mock_token = V2Token()
        mock_token.add_service('bad-image', ['RegionOne'])

        good_creds = {
            'username': 'user1',
            'auth_url': 'http://localhost/v2.0/',
            'password': 'pass',
            'tenant': 'tenant-ok',
            'strategy': 'keystone',
            'region': 'RegionOne'
        }

        self._assert_auth_raises(
            exception.NoServiceEndpoint, good_creds,
            "Failed to raise NoServiceEndpoint when bad service "
            "type encountered")

        mock_token = V2Token()
        mock_token.add_service_no_type()

        self._assert_auth_raises(
            exception.NoServiceEndpoint, good_creds,
            "Failed to raise NoServiceEndpoint when service with "
            "no type encountered")

        # With configure_via_auth disabled, authenticate() must succeed even
        # though the catalog holds no usable image endpoint.
        try:
            plugin = auth.KeystoneStrategy(good_creds,
                                           configure_via_auth=False)
            plugin.authenticate()
        except exception.NoServiceEndpoint:
            self.fail("NoServiceEndpoint was raised when authenticate "
                      "should not check for endpoint.")
class TestEndpoints(utils.BaseTestCase):
    """Exercise auth.get_endpoint against a one-service catalog."""

    def setUp(self):
        super(TestEndpoints, self).setUp()

        object_store_endpoint = {
            'adminURL': 'http://localhost:8080/',
            'region': 'RegionOne',
            'internalURL': 'http://internalURL/',
            'publicURL': 'http://publicURL/',
        }
        object_store_service = {
            'endpoint_links': [],
            'endpoints': [object_store_endpoint],
            'type': 'object-store',
            'name': 'Object Storage Service',
        }
        self.service_catalog = [object_store_service]

    def test_get_endpoint_with_custom_server_type(self):
        # Only the service type is given; the public URL is what comes back.
        found = auth.get_endpoint(self.service_catalog,
                                  service_type='object-store')
        self.assertEqual('http://publicURL/', found)

    def test_get_endpoint_with_custom_endpoint_type(self):
        lookup = {
            'service_type': 'object-store',
            'endpoint_type': 'internalURL',
        }
        found = auth.get_endpoint(self.service_catalog, **lookup)
        self.assertEqual('http://internalURL/', found)

    def test_get_endpoint_raises_with_invalid_service_type(self):
        self.assertRaises(exception.NoServiceEndpoint,
                          auth.get_endpoint,
                          self.service_catalog,
                          service_type='foo')

    def test_get_endpoint_raises_with_invalid_endpoint_type(self):
        lookup = {
            'service_type': 'object-store',
            'endpoint_type': 'foo',
        }
        self.assertRaises(exception.NoServiceEndpoint,
                          auth.get_endpoint,
                          self.service_catalog,
                          **lookup)

    def test_get_endpoint_raises_with_invalid_endpoint_region(self):
        lookup = {
            'service_type': 'object-store',
            'endpoint_region': 'foo',
            'endpoint_type': 'internalURL',
        }
        self.assertRaises(exception.NoServiceEndpoint,
                          auth.get_endpoint,
                          self.service_catalog,
                          **lookup)
class TestImageMutability(utils.BaseTestCase):
    """Check authorization.is_image_mutable for tenant/owner combinations."""

    def setUp(self):
        super(TestImageMutability, self).setUp()
        self.image_factory = glance.domain.ImageFactory()

    def _is_mutable(self, tenant, owner, is_admin=False):
        """Return is_image_mutable() for a new image owned by *owner*, as
        seen from a request context of *tenant*.
        """
        # NOTE(review): glance.context is not imported explicitly in this
        # file; presumably another glance import loads it -- confirm.
        context = glance.context.RequestContext(tenant=tenant,
                                                is_admin=is_admin)
        image = self.image_factory.new_image(owner=owner)
        return authorization.is_image_mutable(context, image)

    def test_admin_everything_mutable(self):
        tenant_owner_pairs = [
            (None, None),
            (None, TENANT1),
            (TENANT1, None),
            (TENANT1, TENANT1),
            (TENANT1, TENANT2),
        ]
        for tenant, owner in tenant_owner_pairs:
            self.assertTrue(self._is_mutable(tenant, owner, is_admin=True))

    def test_no_tenant_nothing_mutable(self):
        for owner in (None, TENANT1):
            self.assertFalse(self._is_mutable(None, owner))

    def test_regular_user(self):
        for owner in (None, TENANT2):
            self.assertFalse(self._is_mutable(TENANT1, owner))
        self.assertTrue(self._is_mutable(TENANT1, TENANT1))
class TestImmutableImage(utils.BaseTestCase):
    """Every mutation attempted through ImmutableImageProxy must raise
    exception.Forbidden, while reading data stays allowed.
    """

    def setUp(self):
        super(TestImmutableImage, self).setUp()
        image_factory = glance.domain.ImageFactory()
        self.context = glance.context.RequestContext(tenant=TENANT1)
        image = image_factory.new_image(
            image_id=UUID1,
            name='Marvin',
            owner=TENANT1,
            disk_format='raw',
            container_format='bare',
            extra_properties={'foo': 'bar'},
            tags=['ping', 'pong'],
        )
        self.image = authorization.ImmutableImageProxy(image, self.context)

    def _test_change(self, attr, value):
        """Assert that both assigning and deleting *attr* are forbidden."""
        self.assertRaises(exception.Forbidden,
                          setattr, self.image, attr, value)
        self.assertRaises(exception.Forbidden,
                          delattr, self.image, attr)

    def test_change_id(self):
        self._test_change('image_id', UUID2)

    def test_change_name(self):
        self._test_change('name', 'Freddie')

    def test_change_owner(self):
        self._test_change('owner', TENANT2)

    def test_change_min_disk(self):
        self._test_change('min_disk', 100)

    def test_change_min_ram(self):
        self._test_change('min_ram', 1024)

    def test_change_disk_format(self):
        self._test_change('disk_format', 'vhd')

    def test_change_container_format(self):
        self._test_change('container_format', 'ova')

    def test_change_visibility(self):
        self._test_change('visibility', 'public')

    def test_change_status(self):
        self._test_change('status', 'active')

    def test_change_created_at(self):
        self._test_change('created_at', timeutils.utcnow())

    def test_change_updated_at(self):
        self._test_change('updated_at', timeutils.utcnow())

    def test_change_locations(self):
        self._test_change('locations', ['http://a/b/c'])
        # Every in-place list mutator exposed by the locations attribute
        # must be rejected as well.
        self.assertRaises(exception.Forbidden,
                          self.image.locations.append, 'http://a/b/c')
        self.assertRaises(exception.Forbidden,
                          self.image.locations.extend, ['http://a/b/c'])
        self.assertRaises(exception.Forbidden,
                          self.image.locations.insert, 'foo')
        self.assertRaises(exception.Forbidden,
                          self.image.locations.pop)
        self.assertRaises(exception.Forbidden,
                          self.image.locations.remove, 'foo')
        self.assertRaises(exception.Forbidden,
                          self.image.locations.reverse)
        self.assertRaises(exception.Forbidden,
                          self.image.locations.sort)
        self.assertRaises(exception.Forbidden,
                          self.image.locations.__delitem__, 0)
        self.assertRaises(exception.Forbidden,
                          self.image.locations.__delslice__, 0, 2)
        self.assertRaises(exception.Forbidden,
                          self.image.locations.__setitem__, 0, 'foo')
        self.assertRaises(exception.Forbidden,
                          self.image.locations.__setslice__,
                          0, 2, ['foo', 'bar'])
        self.assertRaises(exception.Forbidden,
                          self.image.locations.__iadd__, 'foo')
        self.assertRaises(exception.Forbidden,
                          self.image.locations.__imul__, 2)

    def test_change_size(self):
        self._test_change('size', 32)

    def test_change_tags(self):
        self.assertRaises(exception.Forbidden,
                          delattr, self.image, 'tags')
        self.assertRaises(exception.Forbidden,
                          setattr, self.image, 'tags', ['king', 'kong'])
        self.assertRaises(exception.Forbidden, self.image.tags.pop)
        self.assertRaises(exception.Forbidden, self.image.tags.clear)
        self.assertRaises(exception.Forbidden, self.image.tags.add, 'king')
        self.assertRaises(exception.Forbidden, self.image.tags.remove, 'ping')
        self.assertRaises(exception.Forbidden,
                          self.image.tags.update, {'king', 'kong'})
        self.assertRaises(exception.Forbidden,
                          self.image.tags.intersection_update, set())
        self.assertRaises(exception.Forbidden,
                          self.image.tags.difference_update, set())
        self.assertRaises(exception.Forbidden,
                          self.image.tags.symmetric_difference_update, set())

    def test_change_properties(self):
        self.assertRaises(exception.Forbidden,
                          delattr, self.image, 'extra_properties')
        self.assertRaises(exception.Forbidden,
                          setattr, self.image, 'extra_properties', {})
        self.assertRaises(exception.Forbidden,
                          self.image.extra_properties.__delitem__, 'foo')
        self.assertRaises(exception.Forbidden,
                          self.image.extra_properties.__setitem__, 'foo', 'b')
        self.assertRaises(exception.Forbidden,
                          self.image.extra_properties.__setitem__, 'z', 'j')
        self.assertRaises(exception.Forbidden,
                          self.image.extra_properties.pop)
        self.assertRaises(exception.Forbidden,
                          self.image.extra_properties.popitem)
        self.assertRaises(exception.Forbidden,
                          self.image.extra_properties.setdefault, 'p', 'j')
        self.assertRaises(exception.Forbidden,
                          self.image.extra_properties.update, {})

    def test_delete(self):
        self.assertRaises(exception.Forbidden, self.image.delete)

    def test_set_data(self):
        self.assertRaises(exception.Forbidden,
                          self.image.set_data, 'blah', 4)

    def test_deactivate_image(self):
        self.assertRaises(exception.Forbidden, self.image.deactivate)

    def test_reactivate_image(self):
        self.assertRaises(exception.Forbidden, self.image.reactivate)

    def test_get_data(self):
        class FakeImage(object):
            def get_data(self):
                return 'tiddlywinks'

        # Reading data goes straight through to the wrapped image.
        image = authorization.ImmutableImageProxy(FakeImage(), self.context)
        self.assertEqual('tiddlywinks', image.get_data())
class TestImageFactoryProxy(utils.BaseTestCase):
    """Owner handling of authorization.ImageFactoryProxy.new_image."""

    def setUp(self):
        super(TestImageFactoryProxy, self).setUp()
        self.context = glance.context.RequestContext(tenant=TENANT1)
        self.image_factory = authorization.ImageFactoryProxy(
            glance.domain.ImageFactory(), self.context)

    def test_default_owner_is_set(self):
        created = self.image_factory.new_image()
        self.assertEqual(TENANT1, created.owner)

    def test_wrong_owner_cannot_be_set(self):
        new_image = self.image_factory.new_image
        self.assertRaises(exception.Forbidden, new_image, owner=TENANT2)

    def test_cannot_set_owner_to_none(self):
        new_image = self.image_factory.new_image
        self.assertRaises(exception.Forbidden, new_image, owner=None)

    def test_admin_can_set_any_owner(self):
        self.context.is_admin = True
        created = self.image_factory.new_image(owner=TENANT2)
        self.assertEqual(TENANT2, created.owner)

    def test_admin_can_set_owner_to_none(self):
        self.context.is_admin = True
        created = self.image_factory.new_image(owner=None)
        self.assertIsNone(created.owner)

    def test_admin_still_gets_default_tenant(self):
        self.context.is_admin = True
        created = self.image_factory.new_image()
        self.assertEqual(TENANT1, created.owner)
class TestImageRepoProxy(utils.BaseTestCase):
    """Images fetched through authorization.ImageRepoProxy: the context
    tenant's own image is returned as is, the others reject changes.
    """

    class ImageRepoStub(object):
        """In-memory stand-in for an image repo, backed by a fixture list."""

        def __init__(self, fixtures):
            self.fixtures = fixtures

        def get(self, image_id):
            """Return the fixture with *image_id*; ValueError if none has it."""
            for candidate in self.fixtures:
                if candidate.image_id == image_id:
                    return candidate
            # Reached only when the whole list was scanned without a match.
            raise ValueError(image_id)

        def list(self, *args, **kwargs):
            """Return all fixtures; arguments are accepted and ignored."""
            return self.fixtures

    def setUp(self):
        super(TestImageRepoProxy, self).setUp()
        factory = glance.domain.ImageFactory()
        own_image = factory.new_image(owner=TENANT1)
        foreign_public_image = factory.new_image(owner=TENANT2,
                                                 visibility='public')
        foreign_image = factory.new_image(owner=TENANT2)
        self.fixtures = [own_image, foreign_public_image, foreign_image]
        self.context = glance.context.RequestContext(tenant=TENANT1)
        self.image_repo = authorization.ImageRepoProxy(
            self.ImageRepoStub(self.fixtures), self.context)

    def test_get_mutable_image(self):
        fetched = self.image_repo.get(self.fixtures[0].image_id)
        self.assertEqual(fetched.image_id, self.fixtures[0].image_id)

    def test_get_immutable_image(self):
        fetched = self.image_repo.get(self.fixtures[1].image_id)
        self.assertRaises(exception.Forbidden,
                          setattr, fetched, 'name', 'Vince')

    def test_list(self):
        listed = self.image_repo.list()
        self.assertEqual(listed[0].image_id, self.fixtures[0].image_id)
        self.assertRaises(exception.Forbidden,
                          setattr, listed[1], 'name', 'Wally')
        self.assertRaises(exception.Forbidden,
                          setattr, listed[2], 'name', 'Calvin')
class TestImmutableTask(utils.BaseTestCase):
    """Attribute changes and state transitions attempted through
    authorization.ImmutableTaskProxy must raise exception.Forbidden.
    """

    def setUp(self):
        super(TestImmutableTask, self).setUp()
        factory = glance.domain.TaskFactory()
        self.context = glance.context.RequestContext(tenant=TENANT2)
        wrapped = factory.new_task('import', TENANT2)
        self.task = authorization.ImmutableTaskProxy(wrapped)

    def _test_change(self, attr, value):
        """Assert that both assigning and deleting *attr* are forbidden."""
        self.assertRaises(exception.Forbidden,
                          setattr, self.task, attr, value)
        self.assertRaises(exception.Forbidden,
                          delattr, self.task, attr)

    def test_change_id(self):
        self._test_change('task_id', UUID2)

    def test_change_type(self):
        self._test_change('type', 'fake')

    def test_change_status(self):
        self._test_change('status', 'success')

    def test_change_owner(self):
        self._test_change('owner', 'fake')

    def test_change_expires_at(self):
        self._test_change('expires_at', 'fake')

    def test_change_created_at(self):
        self._test_change('created_at', 'fake')

    def test_change_updated_at(self):
        self._test_change('updated_at', 'fake')

    def test_begin_processing(self):
        self.assertRaises(exception.Forbidden, self.task.begin_processing)

    def test_succeed(self):
        self.assertRaises(exception.Forbidden, self.task.succeed, 'result')

    def test_fail(self):
        self.assertRaises(exception.Forbidden, self.task.fail, 'message')
class TestImmutableTaskStub(utils.BaseTestCase):
    """Attribute changes attempted through
    authorization.ImmutableTaskStubProxy must raise exception.Forbidden.
    """

    def setUp(self):
        super(TestImmutableTaskStub, self).setUp()
        factory = glance.domain.TaskFactory()
        self.context = glance.context.RequestContext(tenant=TENANT2)
        wrapped = factory.new_task('import', TENANT2)
        self.task = authorization.ImmutableTaskStubProxy(wrapped)

    def _test_change(self, attr, value):
        """Assert that both assigning and deleting *attr* are forbidden."""
        self.assertRaises(exception.Forbidden,
                          setattr, self.task, attr, value)
        self.assertRaises(exception.Forbidden,
                          delattr, self.task, attr)

    def test_change_id(self):
        self._test_change('task_id', UUID2)

    def test_change_type(self):
        self._test_change('type', 'fake')

    def test_change_status(self):
        self._test_change('status', 'success')

    def test_change_owner(self):
        self._test_change('owner', 'fake')

    def test_change_expires_at(self):
        self._test_change('expires_at', 'fake')

    def test_change_created_at(self):
        self._test_change('created_at', 'fake')

    def test_change_updated_at(self):
        self._test_change('updated_at', 'fake')
class TestTaskFactoryProxy(utils.BaseTestCase):
    """Owner handling of authorization.TaskFactoryProxy.new_task."""

    def setUp(self):
        super(TestTaskFactoryProxy, self).setUp()
        factory = glance.domain.TaskFactory()
        self.context = glance.context.RequestContext(tenant=TENANT1)
        self.context_owner_is_none = glance.context.RequestContext()
        self.task_factory = authorization.TaskFactoryProxy(factory,
                                                           self.context)
        self.task_type = 'import'
        self.task_input = '{"loc": "fake"}'
        self.owner = 'foo'
        self.request1 = unittest_utils.get_fake_request(tenant=TENANT1)
        self.request2 = unittest_utils.get_fake_request(tenant=TENANT2)

    def _assert_new_task_forbidden(self, owner):
        """Assert that new_task() with *owner* raises exception.Forbidden."""
        self.assertRaises(exception.Forbidden,
                          self.task_factory.new_task,
                          task_type=self.task_type,
                          task_input=self.task_input,
                          owner=owner)

    def test_task_create_default_owner(self):
        request_owner = self.request1.context.owner
        created = self.task_factory.new_task(task_type=self.task_type,
                                             owner=request_owner)
        self.assertEqual(TENANT1, created.owner)

    def test_task_create_wrong_owner(self):
        self._assert_new_task_forbidden(self.owner)

    def test_task_create_owner_as_None(self):
        self._assert_new_task_forbidden(None)

    def test_task_create_admin_context_owner_as_None(self):
        # Even an admin context may not create a task without an owner.
        self.context.is_admin = True
        self._assert_new_task_forbidden(None)
class TestTaskRepoProxy(utils.BaseTestCase):
    """Tasks fetched through authorization.TaskRepoProxy and
    authorization.TaskStubRepoProxy, using ownerless fixtures.
    """

    class TaskRepoStub(object):
        """In-memory stand-in for a task repo that supports ``get``."""

        def __init__(self, fixtures):
            self.fixtures = fixtures

        def get(self, task_id):
            """Return the fixture with *task_id*; ValueError if none has it."""
            for candidate in self.fixtures:
                if candidate.task_id == task_id:
                    return candidate
            # Reached only when the whole list was scanned without a match.
            raise ValueError(task_id)

    class TaskStubRepoStub(object):
        """In-memory stand-in for a task-stub repo that supports ``list``."""

        def __init__(self, fixtures):
            self.fixtures = fixtures

        def list(self, *args, **kwargs):
            """Return all fixtures; arguments are accepted and ignored."""
            return self.fixtures

    def setUp(self):
        super(TestTaskRepoProxy, self).setUp()
        factory = glance.domain.TaskFactory()
        # Three tasks of type 'import', all created without an owner.
        self.fixtures = [factory.new_task('import', None) for _ in range(3)]
        self.context = glance.context.RequestContext(tenant=TENANT1)
        self.task_repo = authorization.TaskRepoProxy(
            self.TaskRepoStub(self.fixtures), self.context)
        self.task_stub_repo = authorization.TaskStubRepoProxy(
            self.TaskStubRepoStub(self.fixtures), self.context)

    def test_get_mutable_task(self):
        fetched = self.task_repo.get(self.fixtures[0].task_id)
        self.assertEqual(fetched.task_id, self.fixtures[0].task_id)

    def test_get_immutable_task(self):
        fetched = self.task_repo.get(self.fixtures[1].task_id)
        self.assertRaises(exception.Forbidden,
                          setattr, fetched, 'input', 'foo')

    def test_list(self):
        listed = self.task_stub_repo.list()
        self.assertEqual(listed[0].task_id, self.fixtures[0].task_id)
        self.assertRaises(exception.Forbidden,
                          setattr, listed[1], 'owner', 'foo')
        self.assertRaises(exception.Forbidden,
                          setattr, listed[2], 'owner', 'foo')