commit fb08d477eb7bf5e678b9cd99b44a435842a7dfbf Author: Clay Gerrard Date: Thu Oct 1 14:28:04 2020 -0500 New proxy logging field for wire status Capture the on-the-wire status code for logging because we change the logged status code sometimes. Closes-Bug: #1896518 Change-Id: I27feabe923a6520e983637a9c68a19ec7174a0df diff --git a/doc/source/logs.rst b/doc/source/logs.rst index 8f53dfd..e70de1f 100644 --- a/doc/source/logs.rst +++ b/doc/source/logs.rst @@ -93,6 +93,9 @@ container The container part extracted from the path of the request. object The object part extracted from the path of the request. (anonymizable) pid PID of the process emitting the log line. +wire_status_int The status sent to the client, which may be different than + the logged response code if there was an error during the + body of the request or a disconnect. =================== ========================================================== In one log line, all of the above fields are space-separated and url-encoded. diff --git a/swift/common/middleware/proxy_logging.py b/swift/common/middleware/proxy_logging.py index fd7b0d7..f774afb 100644 --- a/swift/common/middleware/proxy_logging.py +++ b/swift/common/middleware/proxy_logging.py @@ -74,9 +74,10 @@ bandwidth usage will want to only sum up logs with no swift.source. 
import os import time +from swift.common.middleware.catch_errors import enforce_byte_count from swift.common.swob import Request from swift.common.utils import (get_logger, get_remote_client, - config_true_value, + config_true_value, reiterate, InputProxy, list_from_csv, get_policy_index, split_path, StrAnonymizer, StrFormatTime, LogStringFormatter) @@ -176,7 +177,8 @@ class ProxyLoggingMiddleware(object): 'log_info': '', 'policy_index': '', 'ttfb': '0.05', - 'pid': '42' + 'pid': '42', + 'wire_status_int': '200', } try: self.log_formatter.format(self.log_msg_template, **replacements) @@ -198,7 +200,8 @@ class ProxyLoggingMiddleware(object): return value def log_request(self, req, status_int, bytes_received, bytes_sent, - start_time, end_time, resp_headers=None, ttfb=0): + start_time, end_time, resp_headers=None, ttfb=0, + wire_status_int=None): """ Log a request. @@ -209,6 +212,7 @@ class ProxyLoggingMiddleware(object): :param start_time: timestamp request started :param end_time: timestamp request completed :param resp_headers: dict of the response headers + :param wire_status_int: the on the wire status int """ resp_headers = resp_headers or {} logged_headers = None @@ -277,6 +281,7 @@ class ProxyLoggingMiddleware(object): 'policy_index': policy_index, 'ttfb': ttfb, 'pid': self.pid, + 'wire_status_int': wire_status_int or status_int, } self.access_logger.info( self.log_formatter.format(self.log_msg_template, @@ -352,47 +357,46 @@ class ProxyLoggingMiddleware(object): def my_start_response(status, headers, exc_info=None): start_response_args[0] = (status, list(headers), exc_info) - def status_int_for_logging(client_disconnect=False, start_status=None): + def status_int_for_logging(start_status, client_disconnect=False): # log disconnected clients as '499' status code if client_disconnect or input_proxy.client_disconnect: - ret_status_int = 499 - elif start_status is None: - ret_status_int = int( - start_response_args[0][0].split(' ', 1)[0]) - else: - 
ret_status_int = start_status - return ret_status_int + return 499 + return start_status def iter_response(iterable): - iterator = iter(iterable) - try: - chunk = next(iterator) - while not chunk: - chunk = next(iterator) - except StopIteration: - chunk = b'' + iterator = reiterate(iterable) + content_length = None for h, v in start_response_args[0][1]: - if h.lower() in ('content-length', 'transfer-encoding'): + if h.lower() == 'content-length': + content_length = int(v) + break + elif h.lower() == 'transfer-encoding': break else: - if not chunk: - start_response_args[0][1].append(('Content-Length', '0')) - elif isinstance(iterable, list): + if isinstance(iterator, list): + content_length = sum(len(i) for i in iterator) start_response_args[0][1].append( - ('Content-Length', str(sum(len(i) for i in iterable)))) + ('Content-Length', str(content_length))) + + req = Request(env) + method = self.method_from_req(req) + if method == 'HEAD': + content_length = 0 + if content_length is not None: + iterator = enforce_byte_count(iterator, content_length) + + wire_status_int = int(start_response_args[0][0].split(' ', 1)[0]) resp_headers = dict(start_response_args[0][1]) start_response(*start_response_args[0]) - req = Request(env) # Log timing information for time-to-first-byte (GET requests only) - method = self.method_from_req(req) ttfb = 0.0 if method == 'GET': - status_int = status_int_for_logging() policy_index = get_policy_index(req.headers, resp_headers) - metric_name = self.statsd_metric_name(req, status_int, method) + metric_name = self.statsd_metric_name( + req, wire_status_int, method) metric_name_policy = self.statsd_metric_name_policy( - req, status_int, method, policy_index) + req, wire_status_int, method, policy_index) ttfb = time.time() - start_time if metric_name: self.access_logger.timing( @@ -403,31 +407,33 @@ class ProxyLoggingMiddleware(object): bytes_sent = 0 client_disconnect = False + start_status = wire_status_int try: - while chunk: + for chunk in 
iterator: bytes_sent += len(chunk) yield chunk - chunk = next(iterator) except StopIteration: # iterator was depleted return except GeneratorExit: # generator was closed before we finished client_disconnect = True raise + except Exception: + start_status = 500 + raise finally: - status_int = status_int_for_logging(client_disconnect) + status_int = status_int_for_logging( + start_status, client_disconnect) self.log_request( req, status_int, input_proxy.bytes_received, bytes_sent, start_time, time.time(), resp_headers=resp_headers, - ttfb=ttfb) - close_method = getattr(iterable, 'close', None) - if callable(close_method): - close_method() + ttfb=ttfb, wire_status_int=wire_status_int) + iterator.close() try: iterable = self.app(env, my_start_response) except Exception: req = Request(env) - status_int = status_int_for_logging(start_status=500) + status_int = status_int_for_logging(500) self.log_request( req, status_int, input_proxy.bytes_received, 0, start_time, time.time()) diff --git a/swift/common/middleware/versioned_writes/object_versioning.py b/swift/common/middleware/versioned_writes/object_versioning.py index 508972f..26dcb5c 100644 --- a/swift/common/middleware/versioned_writes/object_versioning.py +++ b/swift/common/middleware/versioned_writes/object_versioning.py @@ -329,16 +329,17 @@ class ObjectContext(ObjectVersioningContext): # do the write put_resp = put_req.get_response(self.app) - drain_and_close(put_resp) close_if_possible(put_req.environ['wsgi.input']) if put_resp.status_int == HTTP_NOT_FOUND: + drain_and_close(put_resp) raise HTTPInternalServerError( request=req, content_type='text/plain', body=b'The versions container does not exist. You may ' b'want to re-enable object versioning.') self._check_response_error(req, put_resp) + drain_and_close(put_resp) put_bytes = byte_counter.bytes_read # N.B. 
this is essentially the same hack that symlink does in # _validate_etag_and_update_sysmeta to deal with SLO @@ -392,12 +393,13 @@ class ObjectContext(ObjectVersioningContext): """ if is_success(resp.status_int): return + body = resp.body drain_and_close(resp) if is_client_error(resp.status_int): # missing container or bad permissions if resp.status_int == 404: raise HTTPPreconditionFailed(request=req) - raise HTTPException(body=resp.body, status=resp.status, + raise HTTPException(body=body, status=resp.status, headers=resp.headers) # could not version the data, bail raise HTTPServiceUnavailable(request=req) diff --git a/swift/common/utils.py b/swift/common/utils.py index 83417ff..6f7c64b 100644 --- a/swift/common/utils.py +++ b/swift/common/utils.py @@ -3981,23 +3981,26 @@ class CloseableChain(object): """ def __init__(self, *iterables): self.iterables = iterables + self.chained_iter = itertools.chain(*self.iterables) def __iter__(self): - return iter(itertools.chain(*(self.iterables))) + return self + + def __next__(self): + return next(self.chained_iter) + + next = __next__ # py2 def close(self): for it in self.iterables: - close_method = getattr(it, 'close', None) - if close_method: - close_method() + close_if_possible(it) def reiterate(iterable): """ - Consume the first item from an iterator, then re-chain it to the rest of - the iterator. This is useful when you want to make sure the prologue to - downstream generators have been executed before continuing. - + Consume the first truthy item from an iterator, then re-chain it to the + rest of the iterator. This is useful when you want to make sure the + prologue to downstream generators have been executed before continuing. 
:param iterable: an iterable object """ if isinstance(iterable, (list, tuple)): @@ -4005,12 +4008,13 @@ def reiterate(iterable): else: iterator = iter(iterable) try: - chunk = '' + chunk = next(iterator) while not chunk: chunk = next(iterator) return CloseableChain([chunk], iterator) except StopIteration: - return [] + close_if_possible(iterable) + return iter([]) class InputProxy(object): @@ -4311,6 +4315,8 @@ def drain_and_close(response_or_app_iter): don't care about the body of an error. """ app_iter = getattr(response_or_app_iter, 'app_iter', response_or_app_iter) + if app_iter is None: # for example, if we used the Response.body property + return for _chunk in app_iter: pass close_if_possible(app_iter) diff --git a/swift/proxy/controllers/obj.py b/swift/proxy/controllers/obj.py index cd35973..67b9a8b 100644 --- a/swift/proxy/controllers/obj.py +++ b/swift/proxy/controllers/obj.py @@ -1063,7 +1063,7 @@ class ECAppIter(object): # cleanup the frag queue feeding coros that may be currently # executing the internal_parts_iters. 
if self.stashed_iter: - self.stashed_iter.close() + close_if_possible(self.stashed_iter) sleep() # Give the per-frag threads a chance to clean up for it in self.internal_parts_iters: close_if_possible(it) @@ -1200,10 +1200,15 @@ class ECAppIter(object): def __iter__(self): if self.stashed_iter is not None: - return iter(self.stashed_iter) + return self else: raise ValueError("Failed to call kickoff() before __iter__()") + def __next__(self): + return next(self.stashed_iter) + + next = __next__ # py2 + def _real_iter(self, req, resp_headers): if not self.range_specs: client_asked_for_range = False diff --git a/test/unit/common/middleware/helpers.py b/test/unit/common/middleware/helpers.py index f299aa6..1eb4323 100644 --- a/test/unit/common/middleware/helpers.py +++ b/test/unit/common/middleware/helpers.py @@ -32,15 +32,22 @@ class LeakTrackingIter(object): def __init__(self, inner_iter, mark_closed, mark_read, key): if isinstance(inner_iter, bytes): inner_iter = (inner_iter, ) - self.inner_iter = inner_iter + self.inner_iter = iter(inner_iter) self.mark_closed = mark_closed self.mark_read = mark_read self.key = key def __iter__(self): - for x in self.inner_iter: - yield x - self.mark_read(self.key) + return self + + def __next__(self): + try: + return next(self.inner_iter) + except StopIteration: + self.mark_read(self.key) + raise + + next = __next__ # for py2 def close(self): self.mark_closed(self.key) diff --git a/test/unit/common/middleware/test_proxy_logging.py b/test/unit/common/middleware/test_proxy_logging.py index 494eec9..6e7b23f 100644 --- a/test/unit/common/middleware/test_proxy_logging.py +++ b/test/unit/common/middleware/test_proxy_logging.py @@ -51,8 +51,10 @@ class FakeApp(object): except ValueError: is_container_or_object_req = False - headers = [('Content-Type', 'text/plain'), - ('Content-Length', str(sum(map(len, self.body))))] + headers = [('Content-Type', 'text/plain')] + if not hasattr(self.body, 'close'): + content_length = sum(map(len, 
self.body)) + headers.append(('Content-Length', str(content_length))) if is_container_or_object_req and self.policy_idx is not None: headers.append(('X-Backend-Storage-Policy-Index', str(self.policy_idx))) @@ -612,13 +614,22 @@ class TestProxyLogging(unittest.TestCase): class CloseableBody(object): def __init__(self): + self.msg = b"CloseableBody" self.closed = False def close(self): self.closed = True def __iter__(self): - return iter(["CloseableBody"]) + return self + + def __next__(self): + if not self.msg: + raise StopIteration + result, self.msg = self.msg, b'' + return result + + next = __next__ # py2 body = CloseableBody() app = proxy_logging.ProxyLoggingMiddleware(FakeApp(body), {}) @@ -682,6 +693,27 @@ class TestProxyLogging(unittest.TestCase): self.assertEqual(log_parts[6], '499') self.assertEqual(log_parts[11], '4') # write length + def test_exploding_body(self): + + def exploding_body(): + yield 'some' + yield 'stuff' + raise Exception('kaboom!') + + app = proxy_logging.ProxyLoggingMiddleware( + FakeApp(exploding_body()), { + 'log_msg_template': '{method} {path} ' + '{status_int} {wire_status_int}', + }) + app.access_logger = FakeLogger() + req = Request.blank('/', environ={'REQUEST_METHOD': 'GET'}) + resp = req.get_response(app) + with self.assertRaises(Exception) as ctx: + resp.body + self.assertEqual('kaboom!', str(ctx.exception)) + log_parts = self._log_parts(app) + self.assertEqual(log_parts, ['GET', '/', '500', '200']) + def test_disconnect_on_readline(self): app = proxy_logging.ProxyLoggingMiddleware(FakeAppReadline(), {}) app.access_logger = FakeLogger() @@ -748,7 +780,7 @@ class TestProxyLogging(unittest.TestCase): app = proxy_logging.ProxyLoggingMiddleware( FakeAppNoContentLengthNoTransferEncoding( # test the "while not chunk: chunk = next(iterator)" - body=['', '', ''], + body=[b'', b'', b''], ), {}) app.access_logger = FakeLogger() req = Request.blank('/', environ={'REQUEST_METHOD': 'GET'}) diff --git a/test/unit/common/test_utils.py 
b/test/unit/common/test_utils.py index de4aef6..ef9913b 100644 --- a/test/unit/common/test_utils.py +++ b/test/unit/common/test_utils.py @@ -1247,6 +1247,23 @@ class TestUtils(unittest.TestCase): else: os.environ.pop('TZ') + def test_drain_and_close(self): + utils.drain_and_close([]) + utils.drain_and_close(iter([])) + drained = [False] + + def gen(): + yield 'x' + yield 'y' + drained[0] = True + + utils.drain_and_close(gen()) + self.assertTrue(drained[0]) + utils.drain_and_close(Response(status=200, body=b'Some body')) + drained = [False] + utils.drain_and_close(Response(status=200, app_iter=gen())) + self.assertTrue(drained[0]) + def test_backwards(self): # Test swift.common.utils.backward