commit 5f95e1becec55f422bc27cfbfcfef43854c76be7
Author: Clay Gerrard
Date:   Fri Sep 25 09:15:55 2020 -0500

    Use bigger GreenPool for concurrent EC

    We're getting some blockage trying to feed backup requests in
    waterfall EC because the pool_size was limited to the initial batch
    of requests.  This was (un?)fortunately working out in practice
    because there were lots of initial primary fragment requests and some
    would inevitably be quick enough to make room for the pending feeder
    requests.  But when enough of the initial requests were slow (network
    issue at the proxy?) we wouldn't have the expected number of pending
    backup requests in-flight.

    Since concurrent EC should never make extra requests to non-primaries
    (at least not until an existing primary request completes)
    ec_n_unique_fragments makes a reasonable cap for the pool.

    Drive-bys:

     * Don't make concurrent_ec_extra_requests unless you have enabled
       concurrent_gets.
     * Improved mock_http_connect extra requests tracking formatting
     * FakeStatus __repr__'s w/ status code in AssertionErrors

    Change-Id: Iec579ed874ef097c659dc80fff1ba326b6da05e9

diff --git a/swift/proxy/controllers/obj.py b/swift/proxy/controllers/obj.py
index cd924b0..6f506ae 100644
--- a/swift/proxy/controllers/obj.py
+++ b/swift/proxy/controllers/obj.py
@@ -2907,9 +2907,10 @@ class ECObjectController(BaseObjectController):
         safe_iter = GreenthreadSafeIterator(node_iter)
 
         policy_options = self.app.get_policy_options(policy)
-        ec_request_count = policy.ec_ndata + \
-            policy_options.concurrent_ec_extra_requests
-        with ContextPool(ec_request_count) as pool:
+        ec_request_count = policy.ec_ndata
+        if policy_options.concurrent_gets:
+            ec_request_count += policy_options.concurrent_ec_extra_requests
+        with ContextPool(policy.ec_n_unique_fragments) as pool:
             pile = GreenAsyncPile(pool)
             buckets = ECGetResponseCollection(policy)
             node_iter.set_node_provider(buckets.provide_alternate_node)
@@ -2921,7 +2922,7 @@ class ECObjectController(BaseObjectController):
                            self.app.logger.thread_locals)
 
             feeder_q = None
-            if self.app.get_policy_options(policy).concurrent_gets:
+            if policy_options.concurrent_gets:
                 feeder_q = Queue()
                 pool.spawn(self.feed_remaining_primaries, safe_iter, pile, req,
                            partition, policy, buckets, feeder_q,
diff --git a/test/unit/__init__.py b/test/unit/__init__.py
index a412ba8..a9df236 100644
--- a/test/unit/__init__.py
+++ b/test/unit/__init__.py
@@ -838,6 +838,11 @@ class FakeStatus(object):
             self.expect_sleep_list.append(None)
         self.response_sleep = response_sleep
 
+    def __repr__(self):
+        return '%s(%s, expect_status=%r, response_sleep=%s)' % (
+            self.__class__.__name__, self.status,
+            self.expect_status, self.response_sleep)
+
     def get_response_status(self):
         if self.response_sleep is not None:
             eventlet.sleep(self.response_sleep)
@@ -1078,7 +1083,7 @@ def fake_http_connect(*code_iter, **kwargs):
             # the code under test may swallow the StopIteration, so by logging
             # unexpected requests here we allow the test framework to check for
             # them after the connect function has been used.
-            unexpected_requests.append((args, kwargs))
+            unexpected_requests.append((args, ckwargs))
             raise
 
     if 'give_connect' in kwargs:
@@ -1142,8 +1147,8 @@ def mocked_http_conn(*args, **kwargs):
         if left_over_status:
             raise AssertionError('left over status %r' % left_over_status)
         if fake_conn.unexpected_requests:
-            raise AssertionError('unexpected requests %r' %
-                                 fake_conn.unexpected_requests)
+            raise AssertionError('unexpected requests:\n%s' % '\n '.join(
+                '%r' % (req,) for req in fake_conn.unexpected_requests))
 
 
 def make_timestamp_iter(offset=0):
diff --git a/test/unit/proxy/controllers/test_obj.py b/test/unit/proxy/controllers/test_obj.py
index 3f73dd8..ca1469e 100644
--- a/test/unit/proxy/controllers/test_obj.py
+++ b/test/unit/proxy/controllers/test_obj.py
@@ -2602,6 +2602,50 @@ class TestECObjController(ECObjectControllerMixin, unittest.TestCase):
         self.assertEqual(len(log.requests),
                          self.policy.ec_n_unique_fragments)
 
+    def test_ec_concurrent_GET_with_slow_leaders(self):
+        segment_size = self.policy.ec_segment_size
+        test_data = (b'test' * segment_size)[:-289]
+        etag = md5(test_data).hexdigest()
+        ec_archive_bodies = self._make_ec_archive_bodies(test_data)
+        ts = self.ts()
+        headers = []
+        for i, body in enumerate(ec_archive_bodies):
+            headers.append({
+                'X-Object-Sysmeta-Ec-Etag': etag,
+                'X-Object-Sysmeta-Ec-Content-Length': len(body),
+                'X-Object-Sysmeta-Ec-Frag-Index':
+                    self.policy.get_backend_index(i),
+                'X-Backend-Timestamp': ts.internal,
+                'X-Timestamp': ts.normal,
+                'X-Backend-Durable-Timestamp': ts.internal,
+                'X-Backend-Data-Timestamp': ts.internal,
+            })
+
+        req = swift.common.swob.Request.blank('/v1/a/c/o')
+
+        policy_opts = self.app.get_policy_options(self.policy)
+        policy_opts.concurrent_gets = True
+        policy_opts.concurrency_timeout = 0.0
+
+        slow_count = 4
+        status_codes = ([
+            FakeStatus(200, response_sleep=0.2),
+        ] * slow_count) + ([
+            FakeStatus(200, response_sleep=0.1),
+        ] * (self.policy.ec_n_unique_fragments - slow_count))
+        for i in range(slow_count):
+            # poison the super slow requests
+            ec_archive_bodies[i] = ''
+        with mocked_http_conn(*status_codes, body_iter=ec_archive_bodies,
+                              headers=headers) as log:
+            resp = req.get_response(self.app)
+        self.assertEqual(resp.status_int, 200)
+        self.assertEqual(resp.body, test_data, '%r != %r' % (
+            resp.body if len(resp.body) < 60 else '%s...' % resp.body[:60],
+            test_data if len(test_data) < 60 else '%s...' % test_data[:60],
+        ))
+        self.assertEqual(len(log.requests), self.policy.ec_n_unique_fragments)
+
     def test_GET_with_slow_nodes_and_failures(self):
         segment_size = self.policy.ec_segment_size
         test_data = (b'test' * segment_size)[:-289]
@@ -2722,6 +2766,16 @@ class TestECObjController(ECObjectControllerMixin, unittest.TestCase):
         policy_opts = self.app.get_policy_options(self.policy)
         policy_opts.concurrent_ec_extra_requests = self.policy.ec_nparity - 1
         req = swift.common.swob.Request.blank('/v1/a/c/o')
+        # w/o concurrent_gets ec_extra_requests has no effect
+        status_codes = [200] * self.policy.ec_ndata
+        with mocked_http_conn(*status_codes, body_iter=ec_archive_bodies,
+                              headers=headers) as log:
+            resp = req.get_response(self.app)
+        self.assertEqual(resp.status_int, 200)
+        self.assertEqual(len(log.requests), self.policy.ec_ndata)
+        self.assertEqual(resp.body, test_data)
+
+        policy_opts.concurrent_gets = True
         status_codes = [200] * (self.policy.object_ring.replicas - 1)
         with mocked_http_conn(*status_codes, body_iter=ec_archive_bodies,
                               headers=headers) as log:
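
For reviewers unfamiliar with eventlet's pool semantics, the blockage described in the commit message comes from GreenPool.spawn() blocking whenever the pool is already at capacity (ContextPool is a GreenPool subclass): with pool_size equal to the initial batch of fragment requests, the feed_remaining_primaries greenthread itself has to wait for one of those possibly slow requests to finish before it can even start. The standalone sketch below is not part of the change and is not Swift code; the numbers are hypothetical stand-ins for policy.ec_ndata, concurrent_ec_extra_requests and policy.ec_n_unique_fragments, and it only demonstrates the eventlet behaviour that motivates capping the pool at ec_n_unique_fragments.

# Minimal sketch of the pool-size effect, assuming made-up EC numbers.
import time

import eventlet
from eventlet import GreenPool

ec_ndata = 10
concurrent_ec_extra_requests = 2
ec_n_unique_fragments = 14


def show_feeder_blockage(pool_size, initial_requests):
    pool = GreenPool(pool_size)
    # the initial batch of "primary fragment GETs", all of them slow
    for _ in range(initial_requests):
        pool.spawn(eventlet.sleep, 0.3)
    start = time.time()
    # the proxy spawns its feeder after the initial batch; GreenPool.spawn()
    # blocks while the pool is at capacity, so a pool sized to the initial
    # batch can't start the feeder until one of the slow requests finishes
    pool.spawn(eventlet.sleep, 0)
    print('pool_size=%d: feeder spawned after %.2fs' % (
        pool_size, time.time() - start))


# old sizing: pool == initial batch, the feeder waits ~0.3s for a free slot
show_feeder_blockage(ec_ndata + concurrent_ec_extra_requests,
                     ec_ndata + concurrent_ec_extra_requests)
# new sizing: pool == ec_n_unique_fragments, the feeder starts immediately
show_feeder_blockage(ec_n_unique_fragments,
                     ec_ndata + concurrent_ec_extra_requests)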