Source code for hypothesis.core

# coding=utf-8
#
# This file is part of Hypothesis, which may be found at
# https://github.com/HypothesisWorks/hypothesis-python
#
# Most of this work is copyright (C) 2013-2017 David R. MacIver
# (david@drmaciver.com), but it contains contributions by others. See
# CONTRIBUTING.rst for a full list of people who may hold copyright, and
# consult the git log if you need to determine who owns an individual
# contribution.
#
# This Source Code Form is subject to the terms of the Mozilla Public License,
# v. 2.0. If a copy of the MPL was not distributed with this file, You can
# obtain one at http://mozilla.org/MPL/2.0/.
#
# END HEADER

"""This module provides the core primitives of Hypothesis, such as given."""


from __future__ import division, print_function, absolute_import

import time
import functools
import traceback
from random import Random
from collections import namedtuple

from hypothesis.errors import Flaky, Timeout, NoSuchExample, \
    Unsatisfiable, InvalidArgument, FailedHealthCheck, \
    UnsatisfiedAssumption, HypothesisDeprecationWarning
from hypothesis.control import BuildContext
from hypothesis._settings import settings as Settings
from hypothesis._settings import Phase, Verbosity, HealthCheck
from hypothesis.executors import new_style_executor, \
    default_new_style_executor
from hypothesis.reporting import report, verbose_report, current_verbosity
from hypothesis.statistics import note_engine_for_statistics
from hypothesis.internal.compat import str_to_bytes, getfullargspec
from hypothesis.internal.escalation import \
    escalate_hypothesis_internal_error
from hypothesis.internal.reflection import nicerepr, arg_string, \
    impersonate, function_digest, fully_qualified_name, \
    define_function_signature, convert_positional_arguments, \
    get_pretty_function_description
from hypothesis.internal.conjecture.data import Status, StopTest, \
    ConjectureData
from hypothesis.searchstrategy.strategies import SearchStrategy
from hypothesis.internal.conjecture.engine import ExitReason, \
    ConjectureRunner


def new_random():
    import random
    return random.Random(random.getrandbits(128))


def test_is_flaky(test, expected_repr):
    @functools.wraps(test)
    def test_or_flaky(*args, **kwargs):
        text_repr = arg_string(test, args, kwargs)
        raise Flaky(
            (
                'Hypothesis %s(%s) produces unreliable results: Falsified'
                ' on the first call but did not on a subsequent one'
            ) % (test.__name__, text_repr,))
    return test_or_flaky


Example = namedtuple('Example', ('args', 'kwargs'))


[docs]def example(*args, **kwargs): """A decorator to that ensures a specific example is always tested.""" if args and kwargs: raise InvalidArgument( 'Cannot mix positional and keyword arguments for examples' ) if not (args or kwargs): raise InvalidArgument( 'An example must provide at least one argument' ) def accept(test): if not hasattr(test, 'hypothesis_explicit_examples'): test.hypothesis_explicit_examples = [] test.hypothesis_explicit_examples.append(Example(tuple(args), kwargs)) return test return accept
def reify_and_execute( search_strategy, test, print_example=False, is_final=False, ): def run(data): with BuildContext(data, is_final=is_final): import random as rnd_module rnd_module.seed(0) args, kwargs = data.draw(search_strategy) if print_example: report( lambda: 'Falsifying example: %s(%s)' % ( test.__name__, arg_string(test, args, kwargs))) elif current_verbosity() >= Verbosity.verbose: report( lambda: 'Trying example: %s(%s)' % ( test.__name__, arg_string(test, args, kwargs))) return test(*args, **kwargs) return run def seed(seed): """seed: Start the test execution from a specific seed. May be any hashable object. No exact meaning for seed is provided other than that for a fixed seed value Hypothesis will try the same actions (insofar as it can given external sources of non- determinism. e.g. timing and hash randomization). Overrides the derandomize setting if it is present. """ def accept(test): test._hypothesis_internal_use_seed = seed return test return accept class WithRunner(SearchStrategy): def __init__(self, base, runner): assert runner is not None self.base = base self.runner = runner def do_draw(self, data): data.hypothesis_runner = self.runner return self.base.do_draw(data) def is_invalid_test( name, original_argspec, generator_arguments, generator_kwargs ): def invalid(message): def wrapped_test(*arguments, **kwargs): raise InvalidArgument(message) return wrapped_test if not (generator_arguments or generator_kwargs): return invalid( 'given must be called with at least one argument') if (generator_arguments and (original_argspec.varargs or original_argspec.varkw)): return invalid( 'varargs or keywords are not supported with positional ' 'arguments to @given' ) if ( len(generator_arguments) > len(original_argspec.args) ): return invalid(( 'Too many positional arguments for %s() (got %d but' ' expected at most %d') % ( name, len(generator_arguments), len(original_argspec.args))) if generator_arguments and generator_kwargs: return invalid( 'cannot mix positional and keyword arguments to @given' ) extra_kwargs = [ k for k in generator_kwargs if k not in original_argspec.args + original_argspec.kwonlyargs ] if extra_kwargs and not original_argspec.varkw: return invalid( '%s() got an unexpected keyword argument %r' % ( name, extra_kwargs[0] )) for a in original_argspec.args: if isinstance(a, list): # pragma: no cover return invalid(( 'Cannot decorate function %s() because it has ' 'destructuring arguments') % ( name, )) if original_argspec.defaults or original_argspec.kwonlydefaults: return invalid('Cannot apply @given to a function with defaults.') missing = [repr(kw) for kw in original_argspec.kwonlyargs if kw not in generator_kwargs] if missing: raise InvalidArgument('Missing required kwarg{}: {}'.format( 's' if len(missing) > 1 else '', ', '.join(missing))) def execute_explicit_examples( test_runner, test, wrapped_test, settings, arguments, kwargs ): original_argspec = getfullargspec(test) for example in reversed(getattr( wrapped_test, 'hypothesis_explicit_examples', () )): example_kwargs = dict(original_argspec.kwonlydefaults or {}) if example.args: if len(example.args) > len(original_argspec.args): raise InvalidArgument( 'example has too many arguments for test. ' 'Expected at most %d but got %d' % ( len(original_argspec.args), len(example.args))) example_kwargs.update(dict(zip( original_argspec.args[-len(example.args):], example.args ))) else: example_kwargs.update(example.kwargs) if Phase.explicit not in settings.phases: continue example_kwargs.update(kwargs) # Note: Test may mutate arguments and we can't rerun explicit # examples, so we have to calculate the failure message at this # point rather than than later. example_string = '%s(%s)' % ( test.__name__, arg_string(test, arguments, example_kwargs) ) try: with BuildContext(None) as b: if settings.verbosity >= Verbosity.verbose: report('Trying example: ' + example_string) test_runner( None, lambda data: test(*arguments, **example_kwargs) ) except BaseException: traceback.print_exc() report('Falsifying example: ' + example_string) for n in b.notes: report(n) raise def fail_health_check(settings, message, label): # Tell pytest to omit the body of this function from tracebacks # http://doc.pytest.org/en/latest/example/simple.html#writing-well-integrated-assertion-helpers __tracebackhide__ = True if label in settings.suppress_health_check: return message += ( '\nSee https://hypothesis.readthedocs.io/en/latest/health' 'checks.html for more information about this. ' ) message += ( 'If you want to disable just this health check, add %s ' 'to the suppress_health_check settings for this test.' ) % (label,) raise FailedHealthCheck(message) def perform_health_checks(random, settings, test_runner, search_strategy): # Tell pytest to omit the body of this function from tracebacks __tracebackhide__ = True if not settings.perform_health_check: return if not Settings.default.perform_health_check: return health_check_random = Random(random.getrandbits(128)) # We "pre warm" the health check with one draw to give it some # time to calculate any cached data. This prevents the case # where the first draw of the health check takes ages because # of loading unicode data the first time. data = ConjectureData( max_length=settings.buffer_size, draw_bytes=lambda data, n, distribution: distribution(health_check_random, n) ) with Settings(settings, verbosity=Verbosity.quiet): try: test_runner(data, reify_and_execute( search_strategy, lambda *args, **kwargs: None, )) except BaseException: pass count = 0 overruns = 0 filtered_draws = 0 start = time.time() while ( count < 10 and time.time() < start + 1 and filtered_draws < 50 and overruns < 20 ): try: data = ConjectureData( max_length=settings.buffer_size, draw_bytes=lambda data, n, distribution: distribution(health_check_random, n) ) with Settings(settings, verbosity=Verbosity.quiet): test_runner(data, reify_and_execute( search_strategy, lambda *args, **kwargs: None, )) count += 1 except UnsatisfiedAssumption: filtered_draws += 1 except StopTest: if data.status == Status.INVALID: filtered_draws += 1 else: assert data.status == Status.OVERRUN overruns += 1 except InvalidArgument: raise except Exception: escalate_hypothesis_internal_error() if ( HealthCheck.exception_in_generation in settings.suppress_health_check ): raise report(traceback.format_exc()) if test_runner is default_new_style_executor: fail_health_check( settings, 'An exception occurred during data ' 'generation in initial health check. ' 'This indicates a bug in the strategy. ' 'This could either be a Hypothesis bug or ' "an error in a function you've passed to " 'it to construct your data.', HealthCheck.exception_in_generation, ) else: fail_health_check( settings, 'An exception occurred during data ' 'generation in initial health check. ' 'This indicates a bug in the strategy. ' 'This could either be a Hypothesis bug or ' 'an error in a function you\'ve passed to ' 'it to construct your data. Additionally, ' 'you have a custom executor, which means ' 'that this could be your executor failing ' 'to handle a function which returns None. ', HealthCheck.exception_in_generation, ) if overruns >= 20 or ( not count and overruns > 0 ): fail_health_check(settings, ( 'Examples routinely exceeded the max allowable size. ' '(%d examples overran while generating %d valid ones)' '. Generating examples this large will usually lead to' ' bad results. You should try setting average_size or ' 'max_size parameters on your collections and turning ' 'max_leaves down on recursive() calls.') % ( overruns, count ), HealthCheck.data_too_large) if filtered_draws >= 50 or ( not count and filtered_draws > 0 ): fail_health_check(settings, ( 'It looks like your strategy is filtering out a lot ' 'of data. Health check found %d filtered examples but ' 'only %d good ones. This will make your tests much ' 'slower, and also will probably distort the data ' 'generation quite a lot. You should adapt your ' 'strategy to filter less. This can also be caused by ' 'a low max_leaves parameter in recursive() calls') % ( filtered_draws, count ), HealthCheck.filter_too_much) runtime = time.time() - start if runtime > 1.0 or count < 10: fail_health_check(settings, ( 'Data generation is extremely slow: Only produced ' '%d valid examples in %.2f seconds (%d invalid ones ' 'and %d exceeded maximum size). Try decreasing ' "size of the data you're generating (with e.g." 'average_size or max_leaves parameters).' ) % (count, runtime, filtered_draws, overruns), HealthCheck.too_slow, ) def get_random_for_wrapped_test(test, wrapped_test): settings = wrapped_test._hypothesis_internal_use_settings if wrapped_test._hypothesis_internal_use_seed is not None: return Random( wrapped_test._hypothesis_internal_use_seed) elif settings.derandomize: return Random(function_digest(test)) else: return new_random() def process_arguments_to_given( wrapped_test, arguments, kwargs, generator_arguments, generator_kwargs, argspec, test, settings ): import hypothesis.strategies as sd selfy = None arguments, kwargs = convert_positional_arguments( wrapped_test, arguments, kwargs) # If the test function is a method of some kind, the bound object # will be the first named argument if there are any, otherwise the # first vararg (if any). if argspec.args: selfy = kwargs.get(argspec.args[0]) elif arguments: selfy = arguments[0] test_runner = new_style_executor(selfy) arguments = tuple(arguments) given_specifier = sd.tuples( sd.just(arguments), sd.fixed_dictionaries(generator_kwargs).map( lambda args: dict(args, **kwargs) ) ) search_strategy = given_specifier if selfy is not None: search_strategy = WithRunner(search_strategy, selfy) search_strategy.validate() return arguments, kwargs, test_runner, search_strategy class StateForActualGivenExecution(object): def __init__(self, test_runner, search_strategy, test, settings, random): self.test_runner = test_runner self.search_strategy = search_strategy self.test = test self.settings = settings self.at_least_one_success = False self.last_exception = None self.repr_for_last_exception = None self.falsifying_example = None self.random = random def evaluate_test_data(self, data): try: result = self.test_runner(data, reify_and_execute( self.search_strategy, self.test, )) if result is not None and self.settings.perform_health_check: fail_health_check(self.settings, ( 'Tests run under @given should return None, but ' '%s returned %r instead.' ) % (self.test.__name__, result), HealthCheck.return_value) self.at_least_one_success = True return False except UnsatisfiedAssumption: data.mark_invalid() except ( HypothesisDeprecationWarning, FailedHealthCheck, StopTest, ): raise except Exception: escalate_hypothesis_internal_error() self.last_exception = traceback.format_exc() verbose_report(self.last_exception) data.mark_interesting() def run(self): # Tell pytest to omit the body of this function from tracebacks __tracebackhide__ = True database_key = str_to_bytes(fully_qualified_name(self.test)) start_time = time.time() runner = ConjectureRunner( self.evaluate_test_data, settings=self.settings, random=self.random, database_key=database_key, ) runner.run() note_engine_for_statistics(runner) run_time = time.time() - start_time timed_out = ( self.settings.timeout > 0 and run_time >= self.settings.timeout ) if runner.last_data is None: return if runner.last_data.status == Status.INTERESTING: self.falsifying_example = runner.last_data.buffer if self.settings.database is not None: self.settings.database.save( database_key, self.falsifying_example ) else: if runner.valid_examples < min( self.settings.min_satisfying_examples, self.settings.max_examples, ) and not ( runner.exit_reason == ExitReason.finished and self.at_least_one_success ): if timed_out: raise Timeout(( 'Ran out of time before finding a satisfying ' 'example for ' '%s. Only found %d examples in ' + '%.2fs.' ) % ( get_pretty_function_description(self.test), runner.valid_examples, run_time )) else: raise Unsatisfiable(( 'Unable to satisfy assumptions of hypothesis ' '%s. Only %d examples considered ' 'satisfied assumptions' ) % ( get_pretty_function_description(self.test), runner.valid_examples,)) if self.falsifying_example is None: return assert self.last_exception is not None try: with self.settings: self.test_runner( ConjectureData.for_buffer(self.falsifying_example), reify_and_execute( self.search_strategy, self.test, print_example=True, is_final=True )) except (UnsatisfiedAssumption, StopTest): report(traceback.format_exc()) raise Flaky( 'Unreliable assumption: An example which satisfied ' 'assumptions on the first run now fails it.' ) report( 'Failed to reproduce exception. Expected: \n' + self.last_exception, ) filter_message = ( 'Unreliable test data: Failed to reproduce a failure ' 'and then when it came to recreating the example in ' 'order to print the test data with a flaky result ' 'the example was filtered out (by e.g. a ' 'call to filter in your strategy) when we didn\'t ' 'expect it to be.' ) try: self.test_runner( ConjectureData.for_buffer(self.falsifying_example), reify_and_execute( self.search_strategy, test_is_flaky(self.test, self.repr_for_last_exception), print_example=True, is_final=True )) except (UnsatisfiedAssumption, StopTest): raise Flaky(filter_message)
[docs]def given(*given_arguments, **given_kwargs): """A decorator for turning a test function that accepts arguments into a randomized test. This is the main entry point to Hypothesis. """ def run_test_with_generator(test): generator_arguments = tuple(given_arguments) generator_kwargs = dict(given_kwargs) original_argspec = getfullargspec(test) check_invalid = is_invalid_test( test.__name__, original_argspec, generator_arguments, generator_kwargs) if check_invalid is not None: return check_invalid arguments = original_argspec.args for name, strategy in zip(arguments[-len(generator_arguments):], generator_arguments): generator_kwargs[name] = strategy argspec = original_argspec._replace( args=[a for a in arguments if a not in generator_kwargs], defaults=None, ) annots = {k: v for k, v in argspec.annotations.items() if k in (argspec.args + argspec.kwonlyargs)} annots['return'] = None argspec = argspec._replace(annotations=annots) @impersonate(test) @define_function_signature( test.__name__, test.__doc__, argspec ) def wrapped_test(*arguments, **kwargs): # Tell pytest to omit the body of this function from tracebacks __tracebackhide__ = True settings = wrapped_test._hypothesis_internal_use_settings random = get_random_for_wrapped_test(test, wrapped_test) processed_args = process_arguments_to_given( wrapped_test, arguments, kwargs, generator_arguments, generator_kwargs, argspec, test, settings ) arguments, kwargs, test_runner, search_strategy = processed_args execute_explicit_examples( test_runner, test, wrapped_test, settings, arguments, kwargs ) if settings.max_examples <= 0: return if not ( Phase.reuse in settings.phases or Phase.generate in settings.phases ): return perform_health_checks( random, settings, test_runner, search_strategy) state = StateForActualGivenExecution( test_runner, search_strategy, test, settings, random) state.run() for attr in dir(test): if attr[0] != '_' and not hasattr(wrapped_test, attr): setattr(wrapped_test, attr, getattr(test, attr)) wrapped_test.is_hypothesis_test = True wrapped_test._hypothesis_internal_use_seed = getattr( test, '_hypothesis_internal_use_seed', None ) wrapped_test._hypothesis_internal_use_settings = getattr( test, '_hypothesis_internal_use_settings', None ) or Settings.default return wrapped_test return run_test_with_generator
[docs]def find(specifier, condition, settings=None, random=None, database_key=None): """Returns the minimal example from the given strategy ``specifier`` that matches the predicate function ``condition``.""" settings = settings or Settings( max_examples=2000, min_satisfying_examples=0, max_shrinks=2000, ) if database_key is None and settings.database is not None: database_key = function_digest(condition) if not isinstance(specifier, SearchStrategy): raise InvalidArgument( 'Expected SearchStrategy but got %r of type %s' % ( specifier, type(specifier).__name__ )) specifier.validate() search = specifier random = random or new_random() successful_examples = [0] last_data = [None] def template_condition(data): with BuildContext(data): try: data.is_find = True result = data.draw(search) data.note(result) success = condition(result) except UnsatisfiedAssumption: data.mark_invalid() if success: successful_examples[0] += 1 if settings.verbosity == Verbosity.verbose: if not successful_examples[0]: report(lambda: u'Trying example %s' % ( nicerepr(result), )) elif success: if successful_examples[0] == 1: report(lambda: u'Found satisfying example %s' % ( nicerepr(result), )) else: report(lambda: u'Shrunk example to %s' % ( nicerepr(result), )) last_data[0] = data if success and not data.frozen: data.mark_interesting() start = time.time() runner = ConjectureRunner( template_condition, settings=settings, random=random, database_key=database_key, ) runner.run() note_engine_for_statistics(runner) run_time = time.time() - start if runner.last_data.status == Status.INTERESTING: data = ConjectureData.for_buffer(runner.last_data.buffer) with BuildContext(data): return data.draw(search) if ( runner.valid_examples <= settings.min_satisfying_examples and runner.exit_reason != ExitReason.finished ): if settings.timeout > 0 and run_time > settings.timeout: raise Timeout(( 'Ran out of time before finding enough valid examples for ' '%s. Only %d valid examples found in %.2f seconds.' ) % ( get_pretty_function_description(condition), runner.valid_examples, run_time)) else: raise Unsatisfiable(( 'Unable to satisfy assumptions of ' '%s. Only %d examples considered satisfied assumptions' ) % ( get_pretty_function_description(condition), runner.valid_examples,)) raise NoSuchExample(get_pretty_function_description(condition))