Source code for taskflow.formatters

# -*- coding: utf-8 -*-

#    Copyright (C) 2014 Yahoo! Inc. All Rights Reserved.
#
#    Licensed under the Apache License, Version 2.0 (the "License"); you may
#    not use this file except in compliance with the License. You may obtain
#    a copy of the License at
#
#         http://www.apache.org/licenses/LICENSE-2.0
#
#    Unless required by applicable law or agreed to in writing, software
#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
#    License for the specific language governing permissions and limitations
#    under the License.

import functools

from taskflow.engines.action_engine import compiler
from taskflow import exceptions as exc
from taskflow import states
from taskflow.types import tree
from taskflow.utils import misc


def _cached_get(cache, cache_key, atom_name, fetch_func, *args, **kwargs):
    """Tries to get a previously saved value or fetches it and caches it."""
    value, value_found = None, False
    try:
        value, value_found = cache[cache_key][atom_name]
    except KeyError:
        try:
            value = fetch_func(*args, **kwargs)
            value_found = True
        except (exc.StorageFailure, exc.NotFound):
            pass
        cache[cache_key][atom_name] = value, value_found
    return value, value_found


def _fetch_predecessor_tree(graph, atom):
    """Builds a tree of the predecessors of an atom (atom being the root).

    The graph is walked backwards using an explicit stack; each entry on the
    stack pairs the tree node that discovered predecessors get attached to
    with the graph node whose predecessors are to be examined.
    """
    top = tree.Node(atom)
    pending = [(top, atom)]
    while pending:
        attach_to, current = pending.pop()
        for before in graph.predecessors_iter(current):
            before_data = graph.node[before]
            if before_data['kind'] != compiler.FLOW_END:
                # A node worth showing; attach it and then keep on walking
                # backwards from it...
                shown = tree.Node(before, **before_data)
                attach_to.add(shown)
                pending.append((shown, before))
                continue
            # Flow end nodes are not shown; instead queue up what comes
            # before them (against the same parent tree node).
            pending.extend((attach_to, further_back)
                           for further_back
                           in graph.predecessors_iter(before))
    return top


class FailureFormatter(object):
    """Formats a failure and connects it to associated atoms & engine."""

    # Maps an atom intention to the (tree building function, label used in
    # the produced details) pair that is used for that intention.
    _BUILDERS = {
        states.EXECUTE: (_fetch_predecessor_tree, 'predecessors'),
    }

    def __init__(self, engine, hide_inputs_outputs_of=()):
        # Names of atoms whose 'requires' and 'provides' are left out of
        # the formatted output.
        self._hide_inputs_outputs_of = hide_inputs_outputs_of
        self._engine = engine

    def _format_node(self, storage, cache, node):
        """Turns one tree node into its string representation.

        Flow nodes become ``Flow '<name>'`` and atom nodes become
        ``Atom '<name>'`` followed by whichever attributes (intention,
        state, requires, provides) could be fetched; any other kind of
        node results in a ``TypeError`` being raised.
        """
        kind = node.metadata['kind']
        if kind == compiler.FLOW:
            return "Flow '%s'" % (node.item.name)
        if kind not in compiler.ATOMS:
            raise TypeError("Unable to format node, unknown node"
                            " kind '%s' encountered" % kind)
        atom = node.item
        name = atom.name
        details = {}
        fetched, was_found = _cached_get(cache, 'intentions', name,
                                         storage.get_atom_intention, name)
        if was_found:
            details['intention'] = fetched
        fetched, was_found = _cached_get(cache, 'states', name,
                                         storage.get_atom_state, name)
        if was_found:
            details['state'] = fetched
        if name not in self._hide_inputs_outputs_of:
            # Only invoked on a cache miss; the bound arguments are what
            # is then used to populate the cache for this atom.
            mapped_args_fetcher = functools.partial(
                storage.fetch_mapped_args, atom.rebind,
                atom_name=name, optional_args=atom.optional)
            fetched, was_found = _cached_get(cache, 'requires', name,
                                             mapped_args_fetcher)
            if was_found:
                details['requires'] = fetched
            fetched, was_found = _cached_get(cache, 'provides', name,
                                             storage.get_execute_result,
                                             name)
            if was_found:
                details['provides'] = fetched
        if not details:
            return "Atom '%s'" % (name)
        return "Atom '%s' %s" % (name, details)

    def format(self, fail, atom_matcher):
        """Returns a (exc_info, details) tuple about the failure.

        The first element is the ``exc_info`` attribute of the provided
        failure (which will be none for failures that have no such tuple
        attached). The second element is a string; it contains the
        formatted failure (only when the failure has no ``exc_info``)
        followed by the formatted tree of atoms that the builder for the
        matched atoms intention produces (when an atom is matched and a
        builder exists for its intention).
        """
        out = misc.StringIO()
        engine = self._engine
        storage = engine.storage
        compilation = engine.compilation
        if fail.exc_info is None:
            # No 'exc_info' tuple is attached (this happens with remote
            # failures), so fall back to the traceback that the failure
            # itself captured when it was created...
            out.write_nl(fail.pformat(traceback=True))
        if storage is None or compilation is None:
            # Called before the engine was prepared and/or compiled; odd,
            # but then there is nothing more that can be added...
            return (fail.exc_info, out.getvalue())
        hierarchy = compilation.hierarchy
        graph = compilation.execution_graph
        matched = hierarchy.find_first_match(atom_matcher)
        if matched is None:
            return (fail.exc_info, out.getvalue())
        atom = matched.item
        atom_intention = storage.get_atom_intention(atom.name)
        if atom is not None and atom_intention in self._BUILDERS:
            # The same atom can be reached via many paths (depending on
            # how the graph is structured), so remember whatever was
            # already fetched for it...
            cache = {
                'intentions': {},
                'provides': {},
                'requires': {},
                'states': {},
            }
            build_tree, label = self._BUILDERS[atom_intention]
            rooted_tree = build_tree(graph, atom)
            total = rooted_tree.child_count(only_direct=False)
            out.write_nl('%s %s (most recent first):' % (total, label))
            stringify = functools.partial(self._format_node, storage, cache)
            last_position = rooted_tree.child_count(only_direct=True)
            for position, child in enumerate(rooted_tree, 1):
                rendered = child.pformat(stringify_node=stringify,
                                         starting_prefix=" ")
                if position == last_position:
                    # The last direct child gets no trailing newline.
                    out.write(rendered)
                else:
                    out.write_nl(rendered)
        return (fail.exc_info, out.getvalue())

Navigation

Project Source