# Source code for ironic.tests.unit.conductor.test_task_manager

# coding=utf-8

# Copyright 2013 Hewlett-Packard Development Company, L.P.
# All Rights Reserved.
#
#    Licensed under the Apache License, Version 2.0 (the "License"); you may
#    not use this file except in compliance with the License. You may obtain
#    a copy of the License at
#
#         http://www.apache.org/licenses/LICENSE-2.0
#
#    Unless required by applicable law or agreed to in writing, software
#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
#    License for the specific language governing permissions and limitations
#    under the License.

"""Tests for :class:`ironic.conductor.task_manager`."""

import futurist
import mock
from oslo_context import context as oslo_context
from oslo_utils import uuidutils

from ironic.common import context
from ironic.common import driver_factory
from ironic.common import exception
from ironic.common import fsm
from ironic.common import states
from ironic.conductor import task_manager
from ironic import objects
from ironic.tests import base as tests_base
from ironic.tests.unit.db import base as tests_db_base
from ironic.tests.unit.objects import utils as obj_utils


@mock.patch.object(objects.Node, 'get')
@mock.patch.object(objects.Node, 'release')
@mock.patch.object(objects.Node, 'reserve')
@mock.patch.object(driver_factory, 'build_driver_for_task')
@mock.patch.object(objects.Port, 'list_by_node_id')
@mock.patch.object(objects.Portgroup, 'list_by_node_id')
class TaskManagerTestCase(tests_db_base.DbTestCase):
    """Exercise TaskManager locking, resource loading and spawn_after.

    The class-level patches replace the node reserve/release/get calls, the
    port and portgroup lookups and the driver builder. Each ``test_*``
    method receives the resulting mocks positionally, innermost patch
    first: portgroups, ports, build_driver, reserve, release, node get.
    """

    def setUp(self):
        """Configure a single, zero-delay lock attempt and create a node."""
        super(TaskManagerTestCase, self).setUp()
        self.host = 'test-host'
        self.config(host=self.host)
        self.config(node_locked_retry_attempts=1, group='conductor')
        self.config(node_locked_retry_interval=0, group='conductor')
        self.node = obj_utils.create_test_node(self.context)
        self.future_mock = mock.Mock(spec=['cancel', 'add_done_callback'])

    def _assert_task_resources(self, task, get_portgroups_mock,
                               get_ports_mock, build_driver_mock):
        """Check that ``task`` exposes the context, node and mocked resources.

        Deliberately not prefixed with ``test`` so the class-level patches
        do not inject mock arguments; callers hand over the mocks they got.
        """
        self.assertEqual(self.context, task.context)
        self.assertEqual(self.node, task.node)
        self.assertEqual(get_ports_mock.return_value, task.ports)
        self.assertEqual(get_portgroups_mock.return_value, task.portgroups)
        self.assertEqual(build_driver_mock.return_value, task.driver)

    def test_excl_lock(self, get_portgroups_mock, get_ports_mock,
                       build_driver_mock, reserve_mock, release_mock,
                       node_get_mock):
        """An exclusive task reserves the node and releases it on exit."""
        reserve_mock.return_value = self.node
        with task_manager.TaskManager(self.context, 'fake-node-id') as task:
            self._assert_task_resources(task, get_portgroups_mock,
                                        get_ports_mock, build_driver_mock)
            self.assertFalse(task.shared)
            build_driver_mock.assert_called_once_with(task, driver_name=None)

        reserve_mock.assert_called_once_with(self.context, self.host,
                                             'fake-node-id')
        get_ports_mock.assert_called_once_with(self.context, self.node.id)
        get_portgroups_mock.assert_called_once_with(self.context,
                                                    self.node.id)
        release_mock.assert_called_once_with(self.context, self.host,
                                             self.node.id)
        self.assertFalse(node_get_mock.called)

    def test_excl_lock_with_driver(
            self, get_portgroups_mock, get_ports_mock,
            build_driver_mock, reserve_mock, release_mock,
            node_get_mock):
        """An explicit driver_name is forwarded to the driver builder."""
        reserve_mock.return_value = self.node
        with task_manager.TaskManager(self.context, 'fake-node-id',
                                      driver_name='fake-driver') as task:
            self._assert_task_resources(task, get_portgroups_mock,
                                        get_ports_mock, build_driver_mock)
            self.assertFalse(task.shared)
            build_driver_mock.assert_called_once_with(
                task, driver_name='fake-driver')

        reserve_mock.assert_called_once_with(self.context, self.host,
                                             'fake-node-id')
        get_ports_mock.assert_called_once_with(self.context, self.node.id)
        get_portgroups_mock.assert_called_once_with(self.context,
                                                    self.node.id)
        release_mock.assert_called_once_with(self.context, self.host,
                                             self.node.id)
        self.assertFalse(node_get_mock.called)

    def test_excl_nested_acquire(
            self, get_portgroups_mock, get_ports_mock,
            build_driver_mock, reserve_mock, release_mock,
            node_get_mock):
        """Nested exclusive tasks stay independent and release in reverse."""
        node2 = obj_utils.create_test_node(self.context,
                                           uuid=uuidutils.generate_uuid(),
                                           driver='fake')

        reserve_mock.return_value = self.node
        get_ports_mock.return_value = mock.sentinel.ports1
        get_portgroups_mock.return_value = mock.sentinel.portgroups1
        build_driver_mock.return_value = mock.sentinel.driver1

        with task_manager.TaskManager(self.context, 'node-id1') as task:
            # Re-point the mocks so the inner task loads distinct resources.
            reserve_mock.return_value = node2
            get_ports_mock.return_value = mock.sentinel.ports2
            get_portgroups_mock.return_value = mock.sentinel.portgroups2
            build_driver_mock.return_value = mock.sentinel.driver2
            with task_manager.TaskManager(self.context, 'node-id2') as task2:
                self.assertEqual(self.context, task.context)
                self.assertEqual(self.node, task.node)
                self.assertEqual(mock.sentinel.ports1, task.ports)
                self.assertEqual(mock.sentinel.portgroups1, task.portgroups)
                self.assertEqual(mock.sentinel.driver1, task.driver)
                self.assertFalse(task.shared)
                self.assertEqual(self.context, task2.context)
                self.assertEqual(node2, task2.node)
                self.assertEqual(mock.sentinel.ports2, task2.ports)
                self.assertEqual(mock.sentinel.portgroups2, task2.portgroups)
                self.assertEqual(mock.sentinel.driver2, task2.driver)
                self.assertFalse(task2.shared)

            self.assertEqual([mock.call(task, driver_name=None),
                              mock.call(task2, driver_name=None)],
                             build_driver_mock.call_args_list)

        self.assertEqual([mock.call(self.context, self.host, 'node-id1'),
                          mock.call(self.context, self.host, 'node-id2')],
                         reserve_mock.call_args_list)
        self.assertEqual([mock.call(self.context, self.node.id),
                          mock.call(self.context, node2.id)],
                         get_ports_mock.call_args_list)
        # Portgroups are looked up per node exactly like ports are.
        self.assertEqual([mock.call(self.context, self.node.id),
                          mock.call(self.context, node2.id)],
                         get_portgroups_mock.call_args_list)
        # release should be in reverse order
        self.assertEqual([mock.call(self.context, self.host, node2.id),
                          mock.call(self.context, self.host, self.node.id)],
                         release_mock.call_args_list)
        self.assertFalse(node_get_mock.called)

    def test_excl_lock_exception_then_lock(
            self, get_portgroups_mock, get_ports_mock,
            build_driver_mock, reserve_mock, release_mock,
            node_get_mock):
        """A NodeLocked on the first attempt is retried and then succeeds."""
        retry_attempts = 3
        self.config(node_locked_retry_attempts=retry_attempts,
                    group='conductor')

        # Fail on the first lock attempt, succeed on the second.
        reserve_mock.side_effect = [exception.NodeLocked(node='foo',
                                                         host='foo'),
                                    self.node]

        with task_manager.TaskManager(self.context, 'fake-node-id') as task:
            self.assertFalse(task.shared)

        expected_calls = [mock.call(self.context, self.host,
                                    'fake-node-id')] * 2
        reserve_mock.assert_has_calls(expected_calls)
        self.assertEqual(2, reserve_mock.call_count)

    def test_excl_lock_reserve_exception(
            self, get_portgroups_mock, get_ports_mock,
            build_driver_mock, reserve_mock, release_mock,
            node_get_mock):
        """Persistent NodeLocked is raised after all configured attempts."""
        retry_attempts = 3
        self.config(node_locked_retry_attempts=retry_attempts,
                    group='conductor')
        reserve_mock.side_effect = exception.NodeLocked(node='foo',
                                                        host='foo')

        self.assertRaises(exception.NodeLocked,
                          task_manager.TaskManager,
                          self.context,
                          'fake-node-id')

        reserve_mock.assert_called_with(self.context, self.host,
                                        'fake-node-id')
        self.assertEqual(retry_attempts, reserve_mock.call_count)
        self.assertFalse(get_ports_mock.called)
        self.assertFalse(get_portgroups_mock.called)
        self.assertFalse(build_driver_mock.called)
        self.assertFalse(release_mock.called)
        self.assertFalse(node_get_mock.called)

    def test_excl_lock_get_ports_exception(
            self, get_portgroups_mock, get_ports_mock,
            build_driver_mock, reserve_mock, release_mock,
            node_get_mock):
        """A failing port lookup releases the already-reserved node."""
        reserve_mock.return_value = self.node
        get_ports_mock.side_effect = exception.IronicException('foo')

        self.assertRaises(exception.IronicException,
                          task_manager.TaskManager,
                          self.context,
                          'fake-node-id')

        reserve_mock.assert_called_once_with(self.context, self.host,
                                             'fake-node-id')
        get_ports_mock.assert_called_once_with(self.context, self.node.id)
        self.assertFalse(build_driver_mock.called)
        release_mock.assert_called_once_with(self.context, self.host,
                                             self.node.id)
        self.assertFalse(node_get_mock.called)

    def test_excl_lock_get_portgroups_exception(
            self, get_portgroups_mock, get_ports_mock,
            build_driver_mock, reserve_mock, release_mock,
            node_get_mock):
        """A failing portgroup lookup releases the already-reserved node."""
        reserve_mock.return_value = self.node
        get_portgroups_mock.side_effect = exception.IronicException('foo')

        self.assertRaises(exception.IronicException,
                          task_manager.TaskManager,
                          self.context,
                          'fake-node-id')

        reserve_mock.assert_called_once_with(self.context, self.host,
                                             'fake-node-id')
        get_portgroups_mock.assert_called_once_with(self.context,
                                                    self.node.id)
        self.assertFalse(build_driver_mock.called)
        release_mock.assert_called_once_with(self.context, self.host,
                                             self.node.id)
        self.assertFalse(node_get_mock.called)

    def test_excl_lock_build_driver_exception(
            self, get_portgroups_mock, get_ports_mock,
            build_driver_mock, reserve_mock, release_mock,
            node_get_mock):
        """A failing driver build releases the already-reserved node."""
        reserve_mock.return_value = self.node
        build_driver_mock.side_effect = (
            exception.DriverNotFound(driver_name='foo'))

        self.assertRaises(exception.DriverNotFound,
                          task_manager.TaskManager,
                          self.context,
                          'fake-node-id')

        reserve_mock.assert_called_once_with(self.context, self.host,
                                             'fake-node-id')
        get_ports_mock.assert_called_once_with(self.context, self.node.id)
        get_portgroups_mock.assert_called_once_with(self.context,
                                                    self.node.id)
        build_driver_mock.assert_called_once_with(mock.ANY, driver_name=None)
        release_mock.assert_called_once_with(self.context, self.host,
                                             self.node.id)
        self.assertFalse(node_get_mock.called)

    def test_shared_lock(
            self, get_portgroups_mock, get_ports_mock,
            build_driver_mock, reserve_mock, release_mock,
            node_get_mock):
        """A shared task fetches the node without reserving or releasing."""
        node_get_mock.return_value = self.node
        with task_manager.TaskManager(self.context, 'fake-node-id',
                                      shared=True) as task:
            self._assert_task_resources(task, get_portgroups_mock,
                                        get_ports_mock, build_driver_mock)
            self.assertTrue(task.shared)
            build_driver_mock.assert_called_once_with(task, driver_name=None)

        self.assertFalse(reserve_mock.called)
        self.assertFalse(release_mock.called)
        node_get_mock.assert_called_once_with(self.context, 'fake-node-id')
        get_ports_mock.assert_called_once_with(self.context, self.node.id)
        get_portgroups_mock.assert_called_once_with(self.context,
                                                    self.node.id)

    def test_shared_lock_with_driver(
            self, get_portgroups_mock, get_ports_mock,
            build_driver_mock, reserve_mock, release_mock,
            node_get_mock):
        """A shared task forwards an explicit driver_name to the builder."""
        node_get_mock.return_value = self.node
        with task_manager.TaskManager(self.context,
                                      'fake-node-id',
                                      shared=True,
                                      driver_name='fake-driver') as task:
            self._assert_task_resources(task, get_portgroups_mock,
                                        get_ports_mock, build_driver_mock)
            self.assertTrue(task.shared)
            build_driver_mock.assert_called_once_with(
                task, driver_name='fake-driver')

        self.assertFalse(reserve_mock.called)
        self.assertFalse(release_mock.called)
        node_get_mock.assert_called_once_with(self.context, 'fake-node-id')
        get_ports_mock.assert_called_once_with(self.context, self.node.id)
        get_portgroups_mock.assert_called_once_with(self.context,
                                                    self.node.id)

    def test_shared_lock_node_get_exception(
            self, get_portgroups_mock, get_ports_mock,
            build_driver_mock, reserve_mock, release_mock,
            node_get_mock):
        """NodeNotFound from the node lookup stops before other lookups."""
        node_get_mock.side_effect = exception.NodeNotFound(node='foo')

        self.assertRaises(exception.NodeNotFound,
                          task_manager.TaskManager,
                          self.context,
                          'fake-node-id',
                          shared=True)

        self.assertFalse(reserve_mock.called)
        self.assertFalse(release_mock.called)
        node_get_mock.assert_called_once_with(self.context, 'fake-node-id')
        self.assertFalse(get_ports_mock.called)
        self.assertFalse(get_portgroups_mock.called)
        self.assertFalse(build_driver_mock.called)

    def test_shared_lock_get_ports_exception(
            self, get_portgroups_mock, get_ports_mock,
            build_driver_mock, reserve_mock, release_mock,
            node_get_mock):
        """A failing port lookup on a shared task never touches the lock."""
        node_get_mock.return_value = self.node
        get_ports_mock.side_effect = exception.IronicException('foo')

        self.assertRaises(exception.IronicException,
                          task_manager.TaskManager,
                          self.context,
                          'fake-node-id',
                          shared=True)

        self.assertFalse(reserve_mock.called)
        self.assertFalse(release_mock.called)
        node_get_mock.assert_called_once_with(self.context, 'fake-node-id')
        get_ports_mock.assert_called_once_with(self.context, self.node.id)
        self.assertFalse(build_driver_mock.called)

    def test_shared_lock_get_portgroups_exception(
            self, get_portgroups_mock, get_ports_mock,
            build_driver_mock, reserve_mock, release_mock,
            node_get_mock):
        """A failing portgroup lookup on a shared task skips the lock."""
        node_get_mock.return_value = self.node
        get_portgroups_mock.side_effect = exception.IronicException('foo')

        self.assertRaises(exception.IronicException,
                          task_manager.TaskManager,
                          self.context,
                          'fake-node-id',
                          shared=True)

        self.assertFalse(reserve_mock.called)
        self.assertFalse(release_mock.called)
        node_get_mock.assert_called_once_with(self.context, 'fake-node-id')
        get_portgroups_mock.assert_called_once_with(self.context,
                                                    self.node.id)
        self.assertFalse(build_driver_mock.called)

    def test_shared_lock_build_driver_exception(
            self, get_portgroups_mock, get_ports_mock,
            build_driver_mock, reserve_mock, release_mock,
            node_get_mock):
        """A failing driver build on a shared task never touches the lock."""
        node_get_mock.return_value = self.node
        build_driver_mock.side_effect = (
            exception.DriverNotFound(driver_name='foo'))

        self.assertRaises(exception.DriverNotFound,
                          task_manager.TaskManager,
                          self.context,
                          'fake-node-id',
                          shared=True)

        self.assertFalse(reserve_mock.called)
        self.assertFalse(release_mock.called)
        node_get_mock.assert_called_once_with(self.context, 'fake-node-id')
        get_ports_mock.assert_called_once_with(self.context, self.node.id)
        get_portgroups_mock.assert_called_once_with(self.context,
                                                    self.node.id)
        build_driver_mock.assert_called_once_with(mock.ANY, driver_name=None)

    def test_upgrade_lock(
            self, get_portgroups_mock, get_ports_mock,
            build_driver_mock, reserve_mock, release_mock,
            node_get_mock):
        """upgrade_lock() reserves the node once and flips ``shared``."""
        node_get_mock.return_value = self.node
        reserve_mock.return_value = self.node
        with task_manager.TaskManager(self.context, 'fake-node-id',
                                      shared=True) as task:
            self._assert_task_resources(task, get_portgroups_mock,
                                        get_ports_mock, build_driver_mock)
            self.assertTrue(task.shared)
            self.assertFalse(reserve_mock.called)

            task.upgrade_lock()
            self.assertFalse(task.shared)
            # second upgrade does nothing
            task.upgrade_lock()
            self.assertFalse(task.shared)

        build_driver_mock.assert_called_once_with(mock.ANY, driver_name=None)

        # make sure reserve() was called only once
        reserve_mock.assert_called_once_with(self.context, self.host,
                                             'fake-node-id')
        release_mock.assert_called_once_with(self.context, self.host,
                                             self.node.id)
        node_get_mock.assert_called_once_with(self.context, 'fake-node-id')
        get_ports_mock.assert_called_once_with(self.context, self.node.id)
        get_portgroups_mock.assert_called_once_with(self.context,
                                                    self.node.id)

    def test_spawn_after(
            self, get_portgroups_mock, get_ports_mock,
            build_driver_mock, reserve_mock, release_mock,
            node_get_mock):
        """On a successful spawn, release is handed to the future."""
        spawn_mock = mock.Mock(return_value=self.future_mock)
        task_release_mock = mock.Mock()
        reserve_mock.return_value = self.node

        with task_manager.TaskManager(self.context, 'node-id') as task:
            task.spawn_after(spawn_mock, 1, 2, foo='bar', cat='meow')
            task.release_resources = task_release_mock

        spawn_mock.assert_called_once_with(1, 2, foo='bar', cat='meow')
        self.future_mock.add_done_callback.assert_called_once_with(
            task._thread_release_resources)
        self.assertFalse(self.future_mock.cancel.called)
        # The future is a mock, so its done-callback never fires. This
        # therefore checks that __exit__ itself did not release resources;
        # release is left to the callback registered on the future.
        self.assertFalse(task_release_mock.called)

    def test_spawn_after_exception_while_yielded(
            self, get_portgroups_mock, get_ports_mock,
            build_driver_mock, reserve_mock, release_mock,
            node_get_mock):
        """An exception inside the ``with`` body skips the spawn."""
        spawn_mock = mock.Mock()
        task_release_mock = mock.Mock()
        reserve_mock.return_value = self.node

        def _test_it():
            with task_manager.TaskManager(self.context, 'node-id') as task:
                task.spawn_after(spawn_mock, 1, 2, foo='bar', cat='meow')
                task.release_resources = task_release_mock
                raise exception.IronicException('foo')

        self.assertRaises(exception.IronicException, _test_it)
        self.assertFalse(spawn_mock.called)
        task_release_mock.assert_called_once_with()

    def test_spawn_after_spawn_fails(
            self, get_portgroups_mock, get_ports_mock,
            build_driver_mock, reserve_mock, release_mock,
            node_get_mock):
        """A failing spawn propagates and resources are released."""
        spawn_mock = mock.Mock(side_effect=exception.IronicException('foo'))
        task_release_mock = mock.Mock()
        reserve_mock.return_value = self.node

        def _test_it():
            with task_manager.TaskManager(self.context, 'node-id') as task:
                task.spawn_after(spawn_mock, 1, 2, foo='bar', cat='meow')
                task.release_resources = task_release_mock

        self.assertRaises(exception.IronicException, _test_it)

        spawn_mock.assert_called_once_with(1, 2, foo='bar', cat='meow')
        task_release_mock.assert_called_once_with()

    def test_spawn_after_on_error_hook(
            self, get_portgroups_mock, get_ports_mock,
            build_driver_mock, reserve_mock, release_mock,
            node_get_mock):
        """The spawn error hook gets the exception and its extra args."""
        expected_exception = exception.IronicException('foo')
        spawn_mock = mock.Mock(side_effect=expected_exception)
        task_release_mock = mock.Mock()
        on_error_handler = mock.Mock()
        reserve_mock.return_value = self.node

        def _test_it():
            with task_manager.TaskManager(self.context, 'node-id') as task:
                task.set_spawn_error_hook(on_error_handler, 'fake-argument')
                task.spawn_after(spawn_mock, 1, 2, foo='bar', cat='meow')
                task.release_resources = task_release_mock

        self.assertRaises(exception.IronicException, _test_it)

        spawn_mock.assert_called_once_with(1, 2, foo='bar', cat='meow')
        task_release_mock.assert_called_once_with()
        on_error_handler.assert_called_once_with(expected_exception,
                                                 'fake-argument')

    def test_spawn_after_on_error_hook_exception(
            self, get_portgroups_mock, get_ports_mock,
            build_driver_mock, reserve_mock, release_mock,
            node_get_mock):
        """A failing error hook does not mask the original spawn error."""
        expected_exception = exception.IronicException('foo')
        spawn_mock = mock.Mock(side_effect=expected_exception)
        task_release_mock = mock.Mock()
        # Raise an exception within the on_error handler
        on_error_handler = mock.Mock(side_effect=Exception('unexpected'))
        on_error_handler.__name__ = 'foo_method'
        reserve_mock.return_value = self.node

        def _test_it():
            with task_manager.TaskManager(self.context, 'node-id') as task:
                task.set_spawn_error_hook(on_error_handler, 'fake-argument')
                task.spawn_after(spawn_mock, 1, 2, foo='bar', cat='meow')
                task.release_resources = task_release_mock

        # Make sure the original exception is the one raised
        self.assertRaises(exception.IronicException, _test_it)

        spawn_mock.assert_called_once_with(1, 2, foo='bar', cat='meow')
        task_release_mock.assert_called_once_with()
        on_error_handler.assert_called_once_with(expected_exception,
                                                 'fake-argument')

    @mock.patch.object(states.machine, 'copy')
    def test_init_prepares_fsm(
            self, copy_mock, get_portgroups_mock, get_ports_mock,
            build_driver_mock, reserve_mock, release_mock,
            node_get_mock):
        """The task copies the state machine and initializes it."""
        m = mock.Mock(spec=fsm.FSM)
        reserve_mock.return_value = self.node
        copy_mock.return_value = m
        t = task_manager.TaskManager('fake', 'fake')
        copy_mock.assert_called_once_with()
        self.assertIs(m, t.fsm)
        m.initialize.assert_called_once_with(
            start_state=self.node.provision_state,
            target_state=self.node.target_provision_state)
class TaskManagerStateModelTestCases(tests_base.TestCase):
    """Drive TaskManager state-model methods against spec'd mocks.

    The real ``TaskManager`` functions are bound onto a mocked task and
    called with that mock passed explicitly as ``self``.
    """

    def setUp(self):
        """Build a mocked task that carries a mocked FSM and node."""
        super(TaskManagerStateModelTestCases, self).setUp()
        self.fsm = mock.Mock(spec=fsm.FSM)
        self.node = mock.Mock(spec=objects.Node)
        self.task = mock.Mock(spec=task_manager.TaskManager)
        self.task.fsm = self.fsm
        self.task.node = self.node

    def test_release_clears_resources(self):
        """release_resources() sets every held resource to None."""
        task = self.task
        task.release_resources = task_manager.TaskManager.release_resources
        task.driver = mock.Mock()
        task.ports = mock.Mock()
        task.portgroups = mock.Mock()
        task.shared = True
        task._purpose = 'purpose'
        task._debug_timer = mock.Mock()

        task.release_resources(task)

        self.assertIsNone(task.node)
        self.assertIsNone(task.driver)
        self.assertIsNone(task.ports)
        self.assertIsNone(task.portgroups)
        self.assertIsNone(task.fsm)

    def test_process_event_fsm_raises(self):
        """InvalidState from the FSM propagates; no spawn, no save."""
        self.task.process_event = task_manager.TaskManager.process_event
        self.fsm.process_event.side_effect = exception.InvalidState('test')

        self.assertRaises(
            exception.InvalidState,
            self.task.process_event,
            self.task, 'fake')

        self.assertEqual(0, self.task.spawn_after.call_count)
        self.assertFalse(self.task.node.save.called)

    def test_process_event_sets_callback(self):
        """A callback is scheduled via spawn_after with its arguments."""
        callback = mock.Mock()
        positional = mock.Mock()
        keyword = mock.Mock()
        self.task.process_event = task_manager.TaskManager.process_event

        self.task.process_event(
            self.task, 'fake', callback=callback, call_args=[positional],
            call_kwargs={'mock': keyword})

        self.fsm.process_event.assert_called_once_with('fake',
                                                       target_state=None)
        self.task.spawn_after.assert_called_with(callback, positional,
                                                 mock=keyword)
        self.assertEqual(1, self.task.node.save.call_count)
        self.assertIsNone(self.node.last_error)

    def test_process_event_sets_callback_and_error_handler(self):
        """The error hook is registered with the pre-event node states."""
        positional = mock.Mock()
        callback = mock.Mock()
        error_handler = mock.Mock()
        keyword = mock.Mock()
        provision_state = 'provision_state'
        target_provision_state = 'target'
        self.node.provision_state = provision_state
        self.node.target_provision_state = target_provision_state
        self.task.process_event = task_manager.TaskManager.process_event

        self.task.process_event(
            self.task, 'fake', callback=callback, call_args=[positional],
            call_kwargs={'mock': keyword}, err_handler=error_handler)

        self.task.set_spawn_error_hook.assert_called_once_with(
            error_handler, self.node, provision_state,
            target_provision_state)
        self.fsm.process_event.assert_called_once_with('fake',
                                                       target_state=None)
        self.task.spawn_after.assert_called_with(callback, positional,
                                                 mock=keyword)
        self.assertEqual(1, self.task.node.save.call_count)
        self.assertIsNone(self.node.last_error)
        self.assertNotEqual(provision_state, self.node.provision_state)
        self.assertNotEqual(target_provision_state,
                            self.node.target_provision_state)

    def test_process_event_sets_target_state(self):
        """An explicit target_state is passed through to the FSM."""
        event = 'fake'
        tgt_state = 'target'
        provision_state = 'provision_state'
        target_provision_state = 'target_provision_state'
        self.node.provision_state = provision_state
        self.node.target_provision_state = target_provision_state
        self.task.process_event = task_manager.TaskManager.process_event

        self.task.process_event(self.task, event, target_state=tgt_state)

        self.fsm.process_event.assert_called_once_with(
            event, target_state=tgt_state)
        self.assertEqual(1, self.task.node.save.call_count)
        self.assertNotEqual(provision_state, self.node.provision_state)
        self.assertNotEqual(target_provision_state,
                            self.node.target_provision_state)

    def test_process_event_callback_stable_state(self):
        """With a callback the target state is not reset to NOSTATE."""
        callback = mock.Mock()
        # Binding the real method once is enough; nothing rebinds it.
        self.task.process_event = task_manager.TaskManager.process_event
        for stable_state in states.STABLE_STATES:
            self.node.provision_state = stable_state
            self.node.target_provision_state = 'target'
            self.task.process_event(self.task, 'fake', callback=callback)
            # assert the target state is set when callback is passed
            self.assertNotEqual(states.NOSTATE,
                                self.task.node.target_provision_state)

    def test_process_event_no_callback_stable_state(self):
        """Without a callback the target state is reset to NOSTATE."""
        # Binding the real method once is enough; nothing rebinds it.
        self.task.process_event = task_manager.TaskManager.process_event
        for stable_state in states.STABLE_STATES:
            self.node.provision_state = stable_state
            self.node.target_provision_state = 'target'
            self.task.process_event(self.task, 'fake')
            # assert the target state was cleared when moving to a
            # stable state
            self.assertEqual(states.NOSTATE,
                             self.task.node.target_provision_state)
@task_manager.require_exclusive_lock
def _req_excl_lock_method(*args, **kwargs):
    """Echo the received arguments back as an ``(args, kwargs)`` pair.

    Serves as the decorated target in the exclusive-lock decorator tests.
    """
    received = (args, kwargs)
    return received
class ExclusiveLockDecoratorTestCase(tests_base.TestCase):
    """Check ``require_exclusive_lock`` with the task in either slot."""

    def setUp(self):
        """Prepare a mocked task and argument tuples that contain it."""
        super(ExclusiveLockDecoratorTestCase, self).setUp()
        self.task = mock.Mock(spec=task_manager.TaskManager)
        self.task.context = self.context
        # The task appears first in one tuple and second in the other.
        self.args_task_first = (self.task, 1, 2)
        self.args_task_second = (1, self.task, 2)
        self.kwargs = {'cat': 'meow', 'dog': 'wuff'}

    def test_with_excl_lock_task_first_arg(self):
        """Exclusive task as first argument: call passes through."""
        self.task.shared = False
        got_args, got_kwargs = _req_excl_lock_method(
            *self.args_task_first, **self.kwargs)
        self.assertEqual(self.args_task_first, got_args)
        self.assertEqual(self.kwargs, got_kwargs)

    def test_with_excl_lock_task_second_arg(self):
        """Exclusive task as second argument: call passes through."""
        self.task.shared = False
        got_args, got_kwargs = _req_excl_lock_method(
            *self.args_task_second, **self.kwargs)
        self.assertEqual(self.args_task_second, got_args)
        self.assertEqual(self.kwargs, got_kwargs)

    def test_with_shared_lock_task_first_arg(self):
        """Shared task as first argument: ExclusiveLockRequired."""
        self.task.shared = True
        self.assertRaises(
            exception.ExclusiveLockRequired,
            _req_excl_lock_method,
            *self.args_task_first,
            **self.kwargs)

    def test_with_shared_lock_task_second_arg(self):
        """Shared task as second argument: ExclusiveLockRequired."""
        self.task.shared = True
        self.assertRaises(
            exception.ExclusiveLockRequired,
            _req_excl_lock_method,
            *self.args_task_second,
            **self.kwargs)
class ThreadExceptionTestCase(tests_base.TestCase):
    """Check how ``_write_exception`` records a future's failure."""

    def setUp(self):
        """Create a mocked task/node pair and an exception-only future."""
        super(ThreadExceptionTestCase, self).setUp()
        self.node = mock.Mock(spec=objects.Node)
        self.node.last_error = None
        self.task = mock.Mock(spec=task_manager.TaskManager)
        self.task.node = self.node
        # Use the real implementation; tests pass the mock task as self.
        self.task._write_exception = task_manager.TaskManager._write_exception
        self.future_mock = mock.Mock(spec_set=['exception'])

        def async_method_foo():
            pass

        # The function name is asserted on in the recorded error text.
        self.task._spawn_args = (async_method_foo,)

    def test_set_node_last_error(self):
        """The error text and spawned function name land in last_error."""
        failure = Exception('fiasco')
        self.future_mock.exception.return_value = failure

        self.task._write_exception(self.task, self.future_mock)

        self.node.save.assert_called_once_with()
        self.assertIn('fiasco', self.node.last_error)
        self.assertIn('async_method_foo', self.node.last_error)

    def test_set_node_last_error_exists(self):
        """An existing last_error is kept and the future is not queried."""
        failure = Exception('fiasco')
        self.future_mock.exception.return_value = failure
        self.node.last_error = 'oops'

        self.task._write_exception(self.task, self.future_mock)

        self.assertFalse(self.node.save.called)
        self.assertFalse(self.future_mock.exception.called)
        self.assertEqual('oops', self.node.last_error)

    def test_set_node_last_error_no_error(self):
        """A future without an exception leaves the node untouched."""
        self.future_mock.exception.return_value = None

        self.task._write_exception(self.task, self.future_mock)

        self.assertFalse(self.node.save.called)
        self.future_mock.exception.assert_called_once_with()
        self.assertIsNone(self.node.last_error)

    @mock.patch.object(task_manager.LOG, 'exception', spec_set=True,
                       autospec=True)
    def test_set_node_last_error_cancelled(self, log_mock):
        """A cancelled future is logged but not written to the node."""
        cancelled = futurist.CancelledError()
        self.future_mock.exception.side_effect = cancelled

        self.task._write_exception(self.task, self.future_mock)

        self.assertFalse(self.node.save.called)
        self.future_mock.exception.assert_called_once_with()
        self.assertIsNone(self.node.last_error)
        self.assertTrue(log_mock.called)
@mock.patch.object(oslo_context, 'get_current')
class TaskManagerContextTestCase(tests_base.TestCase):
    """Check ``ensure_thread_contain_context`` with and without a context.

    ``oslo_context.get_current`` is patched for every test method and the
    resulting mock arrives as ``context_get_mock``.
    """

    def setUp(self):
        """Swap the test context for a RequestContext-spec'd mock."""
        super(TaskManagerContextTestCase, self).setUp()
        self.context = mock.Mock(spec=context.RequestContext)

    def test_thread_without_context(self, context_get_mock):
        """A falsy current context triggers update_store()."""
        context_get_mock.return_value = False

        task_manager.ensure_thread_contain_context(self.context)

        store_updated = self.context.update_store.called
        self.assertTrue(store_updated)

    def test_thread_with_context(self, context_get_mock):
        """A truthy current context leaves the store untouched."""
        context_get_mock.return_value = True

        task_manager.ensure_thread_contain_context(self.context)

        store_updated = self.context.update_store.called
        self.assertFalse(store_updated)

# Project Source