Types

Note

Even though these types are made for public consumption and usage should be encouraged/easily possible it should be noted that these may be moved out to new libraries at various points in the future. If you are using these types without using the rest of this library it is strongly encouraged that you be a vocal proponent of getting these made into isolated libraries (as using these types in this manner is not the expected and/or desired usage).

Entity

class taskflow.types.entity.Entity(kind, name, metadata)[source]

Bases: object

Entity object that identifies some resource/item/other.

Variables
  • kindimmutable type/kind that identifies this entity (typically unique to a library/application)

  • Entity.nameimmutable name that can be used to uniquely identify this entity among many other entities

  • metadataimmutable dictionary of metadata that is associated with this entity (and typically has keys/values that further describe this entity)

Failure

class taskflow.types.failure.Failure(exc_info=None, **kwargs)[source]

Bases: object

An immutable object that represents failure.

Failure objects encapsulate exception information so that they can be re-used later to re-raise, inspect, examine, log, print, serialize, deserialize…

One example where they are depended upon is in the WBE engine. When a remote worker throws an exception, the WBE based engine will receive that exception and desire to reraise it to the user/caller of the WBE based engine for appropriate handling (this matches the behavior of non-remote engines). To accomplish this a failure object (or a to_dict() form) would be sent over the WBE channel and the WBE based engine would deserialize it and use this objects reraise() method to cause an exception that contains similar/equivalent information as the original exception to be reraised, allowing the user (or the WBE engine itself) to then handle the worker failure/exception as they desire.

For those who are curious, here are a few reasons why the original exception itself may not be reraised and instead a reraised wrapped failure exception object will be instead. These explanations are only applicable when a failure object is serialized and deserialized (when it is retained inside the python process that the exception was created in the the original exception can be reraised correctly without issue).

  • Traceback objects are not serializable/recreatable, since they contain references to stack frames at the location where the exception was raised. When a failure object is serialized and sent across a channel and recreated it is not possible to restore the original traceback and originating stack frames.

  • The original exception type can not be guaranteed to be found, workers can run code that is not accessible/available when the failure is being deserialized. Even if it was possible to use pickle safely it would not be possible to find the originating exception or associated code in this situation.

  • The original exception type can not be guaranteed to be constructed in a correct manner. At the time of failure object creation the exception has already been created and the failure object can not assume it has knowledge (or the ability) to recreate the original type of the captured exception (this is especially hard if the original exception was created via a complex process via some custom exception constructor).

  • The original exception type can not be guaranteed to be constructed in a safe manner. Importing foreign exception types dynamically can be problematic when not done correctly and in a safe manner; since failure objects can capture any exception it would be unsafe to try to import those exception types namespaces and modules on the receiver side dynamically (this would create similar issues as the pickle module in python has where foreign modules can be imported, causing those modules to have code ran when this happens, and this can cause issues and side-effects that the receiver would not have intended to have caused).

TODO(harlowja): use parts of 17911 and the backport at https://pypi.org/project/traceback2/ to (hopefully) simplify the methods and contents of this object…

BASE_EXCEPTIONS = ('BaseException', 'Exception')

Root exceptions of all other python exceptions.

See: https://docs.python.org/2/library/exceptions.html

SCHEMA = {'$ref': '#/definitions/cause', 'definitions': {'cause': {'additionalProperties': True, 'properties': {'causes': {'items': {'$ref': '#/definitions/cause'}, 'type': 'array'}, 'exc_args': {'minItems': 0, 'type': 'array'}, 'exc_type_names': {'items': {'type': 'string'}, 'minItems': 1, 'type': 'array'}, 'exception_str': {'type': 'string'}, 'traceback_str': {'type': 'string'}, 'version': {'minimum': 0, 'type': 'integer'}}, 'required': ['exception_str', 'traceback_str', 'exc_type_names'], 'type': 'object'}}}

Expected failure schema (in json schema format).

classmethod from_exception(exception)[source]

Creates a failure object from a exception instance.

classmethod validate(data)[source]

Validate input data matches expected failure dict format.

matches(other)[source]

Checks if another object is equivalent to this object.

Returns

checks if another object is equivalent to this object

Return type

boolean

property exception

Exception value, or none if exception value is not present.

Exception value may be lost during serialization.

property exception_str

String representation of exception.

property exception_args

Tuple of arguments given to the exception constructor.

property exc_info

Exception info tuple or none.

See: https://docs.python.org/2/library/sys.html#sys.exc_info for what

the contents of this tuple are (if none, then no contents can be examined).

property traceback_str

Exception traceback as string.

static reraise_if_any(failures)[source]

Re-raise exceptions if argument is not empty.

If argument is empty list/tuple/iterator, this method returns None. If argument is converted into a list with a single Failure object in it, that failure is reraised. Else, a WrappedFailure exception is raised with the failure list as causes.

reraise()[source]

Re-raise captured exception.

check(*exc_classes)[source]

Check if any of exc_classes caused the failure.

Arguments of this method can be exception types or type names (stings). If captured exception is instance of exception of given type, the corresponding argument is returned. Else, None is returned.

property causes

Tuple of all inner failure causes of this failure.

NOTE(harlowja): Does not include the current failure (only returns connected causes of this failure, if any). This property is really only useful on 3.x or newer versions of python as older versions do not have associated causes (the tuple will always be empty on 2.x versions of python).

Refer to PEP 3134 and PEP 409 and PEP 415 for what this is examining to find failure causes.

pformat(traceback=False)[source]

Pretty formats the failure object into a string.

classmethod from_dict(data)[source]

Converts this from a dictionary to a object.

to_dict(include_args=True)[source]

Converts this object to a dictionary.

Parameters

include_args – boolean indicating whether to include the exception args in the output.

copy()[source]

Copies this object.

Graph

class taskflow.types.graph.Graph(incoming_graph_data=None, name='')[source]

Bases: networkx.classes.graph.Graph

A graph subclass with useful utility functions.

freeze()[source]

Freezes the graph so that no more mutations can occur.

export_to_dot()[source]

Exports the graph to a dot format (requires pydot library).

pformat()[source]

Pretty formats your graph into a string.

add_edge(u, v, attr_dict=None, **attr)[source]

Add an edge between u and v.

add_node(n, attr_dict=None, **attr)[source]

Add a single node n and update node attributes.

fresh_copy()[source]

Return a fresh copy graph with the same data structure.

A fresh copy has no nodes, edges or graph attributes. It is the same data structure as the current graph. This method is typically used to create an empty version of the graph.

class taskflow.types.graph.DiGraph(incoming_graph_data=None, name='')[source]

Bases: networkx.classes.digraph.DiGraph

A directed graph subclass with useful utility functions.

freeze()[source]

Freezes the graph so that no more mutations can occur.

get_edge_data(u, v, default=None)[source]

Returns a copy of the edge attribute dictionary between (u, v).

NOTE(harlowja): this differs from the networkx get_edge_data() as that function does not return a copy (but returns a reference to the actual edge data).

topological_sort()[source]

Return a list of nodes in this graph in topological sort order.

pformat()[source]

Pretty formats your graph into a string.

This pretty formatted string representation includes many useful details about your graph, including; name, type, frozeness, node count, nodes, edge count, edges, graph density and graph cycles (if any).

export_to_dot()[source]

Exports the graph to a dot format (requires pydot library).

is_directed_acyclic()[source]

Returns if this graph is a DAG or not.

no_successors_iter()[source]

Returns an iterator for all nodes with no successors.

no_predecessors_iter()[source]

Returns an iterator for all nodes with no predecessors.

bfs_predecessors_iter(n)[source]

Iterates breadth first over all predecessors of a given node.

This will go through the nodes predecessors, then the predecessor nodes predecessors and so on until no more predecessors are found.

NOTE(harlowja): predecessor cycles (if they exist) will not be iterated over more than once (this prevents infinite iteration).

add_edge(u, v, attr_dict=None, **attr)[source]

Add an edge between u and v.

add_node(n, attr_dict=None, **attr)[source]

Add a single node n and update node attributes.

fresh_copy()[source]

Return a fresh copy graph with the same data structure.

A fresh copy has no nodes, edges or graph attributes. It is the same data structure as the current graph. This method is typically used to create an empty version of the graph.

class taskflow.types.graph.OrderedDiGraph(incoming_graph_data=None, name='')[source]

Bases: taskflow.types.graph.DiGraph

A directed graph subclass with useful utility functions.

This derivative retains node, edge, insertion and iteration ordering (so that the iteration order matches the insertion order).

node_dict_factory

alias of collections.OrderedDict

adjlist_outer_dict_factory

alias of collections.OrderedDict

adjlist_inner_dict_factory

alias of collections.OrderedDict

edge_attr_dict_factory

alias of collections.OrderedDict

fresh_copy()[source]

Return a fresh copy graph with the same data structure.

A fresh copy has no nodes, edges or graph attributes. It is the same data structure as the current graph. This method is typically used to create an empty version of the graph.

class taskflow.types.graph.OrderedGraph(incoming_graph_data=None, name='')[source]

Bases: taskflow.types.graph.Graph

A graph subclass with useful utility functions.

This derivative retains node, edge, insertion and iteration ordering (so that the iteration order matches the insertion order).

node_dict_factory

alias of collections.OrderedDict

adjlist_outer_dict_factory

alias of collections.OrderedDict

adjlist_inner_dict_factory

alias of collections.OrderedDict

edge_attr_dict_factory

alias of collections.OrderedDict

fresh_copy()[source]

Return a fresh copy graph with the same data structure.

A fresh copy has no nodes, edges or graph attributes. It is the same data structure as the current graph. This method is typically used to create an empty version of the graph.

taskflow.types.graph.merge_graphs(graph, *graphs, **kwargs)[source]

Merges a bunch of graphs into a new graph.

If no additional graphs are provided the first graph is returned unmodified otherwise the merged graph is returned.

Notifier

class taskflow.types.notifier.Listener(callback, args=None, kwargs=None, details_filter=None)[source]

Bases: object

Immutable helper that represents a notification listener/target.

property callback

Callback (can not be none) to call with event + details.

property details_filter

Callback (may be none) to call to discard events + details.

property kwargs

Dictionary of keyword arguments to use in future calls.

property args

Tuple of positional arguments to use in future calls.

__call__(event_type, details)[source]

Activate the target callback with the given event + details.

NOTE(harlowja): if a details filter callback exists and it returns a falsey value when called with the provided details, then the target callback will not be called.

is_equivalent(callback, details_filter=None)[source]

Check if the callback is same

Parameters
  • callback – callback used for comparison

  • details_filter – callback used for comparison

Returns

false if not the same callback, otherwise true

Return type

boolean

class taskflow.types.notifier.Notifier[source]

Bases: object

A notification (pub/sub like) helper class.

It is intended to be used to subscribe to notifications of events occurring as well as allow a entity to post said notifications to any associated subscribers without having either entity care about how this notification occurs.

Not thread-safe when a single notifier is mutated at the same time by multiple threads. For example having multiple threads call into register() or reset() at the same time could potentially end badly. It is thread-safe when only notify() calls or other read-only actions (like calling into is_registered()) are occurring at the same time.

RESERVED_KEYS = ('details',)

Keys that can not be used in callbacks arguments

ANY = '*'

Kleene star constant that is used to receive all notifications

is_registered(event_type, callback, details_filter=None)[source]

Check if a callback is registered.

Returns

checks if the callback is registered

Return type

boolean

reset()[source]

Forget all previously registered callbacks.

notify(event_type, details)[source]

Notify about event occurrence.

All callbacks registered to receive notifications about given event type will be called. If the provided event type can not be used to emit notifications (this is checked via the can_be_registered() method) then it will silently be dropped (notification failures are not allowed to cause or raise exceptions).

Parameters
  • event_type – event type that occurred

  • details (dictionary) – additional event details dictionary passed to callback keyword argument with the same name

register(event_type, callback, args=None, kwargs=None, details_filter=None)[source]

Register a callback to be called when event of a given type occurs.

Callback will be called with provided args and kwargs and when event type occurs (or on any event if event_type equals to ANY). It will also get additional keyword argument, details, that will hold event details provided to the notify() method (if a details filter callback is provided then the target callback will only be triggered if the details filter callback returns a truthy value).

Parameters
  • event_type – event type input

  • callback – function callback to be registered.

  • args (list) – non-keyworded arguments

  • kwargs (dictionary) – key-value pair arguments

deregister(event_type, callback, details_filter=None)[source]

Remove a single listener bound to event event_type.

Parameters

event_type – deregister listener bound to event_type

deregister_event(event_type)[source]

Remove a group of listeners bound to event event_type.

Parameters

event_type – deregister listeners bound to event_type

listeners_iter()[source]

Return an iterator over the mapping of event => listeners bound.

NOTE(harlowja): Each listener in the yielded (event, listeners) tuple is an instance of the Listener type, which itself wraps a provided callback (and its details filter callback, if any).

can_be_registered(event_type)[source]

Checks if the event can be registered/subscribed to.

can_trigger_notification(event_type)[source]

Checks if the event can trigger a notification.

Parameters

event_type – event that needs to be verified

Returns

whether the event can trigger a notification

Return type

boolean

class taskflow.types.notifier.RestrictedNotifier(watchable_events, allow_any=True)[source]

Bases: taskflow.types.notifier.Notifier

A notification class that restricts events registered/triggered.

NOTE(harlowja): This class unlike Notifier restricts and disallows registering callbacks for event types that are not declared when constructing the notifier.

events_iter()[source]

Returns iterator of events that can be registered/subscribed to.

NOTE(harlowja): does not include back the ANY event type as that meta-type is not a specific event but is a capture-all that does not imply the same meaning as specific event types.

can_be_registered(event_type)[source]

Checks if the event can be registered/subscribed to.

Parameters

event_type – event that needs to be verified

Returns

whether the event can be registered/subscribed to

Return type

boolean

taskflow.types.notifier.register_deregister(notifier, event_type, callback=None, args=None, kwargs=None, details_filter=None)[source]

Context manager that registers a callback, then deregisters on exit.

NOTE(harlowja): if the callback is none, then this registers nothing, which

is different from the behavior of the register method which will not accept none as it is not callable…

Sets

class taskflow.types.sets.OrderedSet(iterable=None)[source]

Bases: collections.abc.Set, collections.abc.Hashable

A read-only hashable set that retains insertion/initial ordering.

It should work in all existing places that frozenset is used.

See: https://mail.python.org/pipermail/python-ideas/2009-May/004567.html for an idea thread that may eventually (someday) result in this (or similar) code being included in the mainline python codebase (although the end result of that thread is somewhat discouraging in that regard).

copy()[source]

Return a shallow copy of a set.

intersection(*sets)[source]

Return the intersection of two or more sets as a new set.

(i.e. elements that are common to all of the sets.)

issuperset(other)[source]

Report whether this set contains another set.

issubset(other)[source]

Report whether another set contains this set.

difference(*sets)[source]

Return the difference of two or more sets as a new set.

(i.e. all elements that are in this set but not the others.)

union(*sets)[source]

Return the union of sets as a new set.

(i.e. all elements that are in either set.)

Timing

class taskflow.types.timing.Timeout(value, event_factory=<class 'threading.Event'>)[source]

Bases: object

An object which represents a timeout.

This object has the ability to be interrupted before the actual timeout is reached.

property value

Immutable value of the internally used timeout.

interrupt()[source]

Forcefully set the timeout (releases any waiters).

is_stopped()[source]

Returns if the timeout has been interrupted.

wait()[source]

Block current thread (up to timeout) and wait until interrupted.

reset()[source]

Reset so that interruption (and waiting) can happen again.

taskflow.types.timing.convert_to_timeout(value=None, default_value=None, event_factory=<class 'threading.Event'>)[source]

Converts a given value to a timeout instance (and returns it).

Does nothing if the value provided is already a timeout instance.

Tree

exception taskflow.types.tree.FrozenNode[source]

Bases: Exception

Exception raised when a frozen node is modified.

class taskflow.types.tree.Node(item, **kwargs)[source]

Bases: object

A n-ary node class that can be used to create tree structures.

STARTING_PREFIX = ''

Default string prefix used in pformat().

EMPTY_SPACE_SEP = ' '

Default string used to create empty space used in pformat().

HORIZONTAL_CONN = '__'

Default string used to horizontally connect a node to its parent (used in pformat().).

VERTICAL_CONN = '|'

Default string used to vertically connect a node to its parent (used in pformat()).

LINE_SEP = '\n'

Default line separator used in pformat().

add(child)[source]

Adds a child to this node (appends to left of existing children).

NOTE(harlowja): this will also set the childs parent to be this node.

empty()[source]

Returns if the node is a leaf node.

path_iter(include_self=True)[source]

Yields back the path from this node to the root node.

find_first_match(matcher, only_direct=False, include_self=True)[source]

Finds the first node that matching callback returns true.

This will search not only this node but also any children nodes (in depth first order, from right to left) and finally if nothing is matched then None is returned instead of a node object.

Parameters
  • matcher – callback that takes one positional argument (a node) and returns true if it matches desired node or false if not.

  • only_direct – only look at current node and its direct children (implies that this does not search using depth first).

  • include_self – include the current node during searching.

Returns

the node that matched (or None)

find(item, only_direct=False, include_self=True)[source]

Returns the first node for an item if it exists in this node.

This will search not only this node but also any children nodes (in depth first order, from right to left) and finally if nothing is matched then None is returned instead of a node object.

Parameters
  • item – item to look for.

  • only_direct – only look at current node and its direct children (implies that this does not search using depth first).

  • include_self – include the current node during searching.

Returns

the node that matched provided item (or None)

disassociate()[source]

Removes this node from its parent (if any).

Returns

occurrences of this node that were removed from its parent.

remove(item, only_direct=False, include_self=True)[source]

Removes a item from this nodes children.

This will search not only this node but also any children nodes and finally if nothing is found then a value error is raised instead of the normally returned removed node object.

Parameters
  • item – item to lookup.

  • only_direct – only look at current node and its direct children (implies that this does not search using depth first).

  • include_self – include the current node during searching.

pformat(stringify_node=None, linesep='\n', vertical_conn='|', horizontal_conn='__', empty_space=' ', starting_prefix='')[source]

Formats this node + children into a nice string representation.

Example:

>>> from taskflow.types import tree
>>> yahoo = tree.Node("CEO")
>>> yahoo.add(tree.Node("Infra"))
>>> yahoo[0].add(tree.Node("Boss"))
>>> yahoo[0][0].add(tree.Node("Me"))
>>> yahoo.add(tree.Node("Mobile"))
>>> yahoo.add(tree.Node("Mail"))
>>> print(yahoo.pformat())
CEO
|__Infra
|  |__Boss
|     |__Me
|__Mobile
|__Mail
child_count(only_direct=True)[source]

Returns how many children this node has.

This can be either only the direct children of this node or inclusive of all children nodes of this node (children of children and so-on).

NOTE(harlowja): it does not account for the current node in this count.

reverse_iter()[source]

Iterates over the direct children of this node (left->right).

index(item)[source]

Finds the child index of a given item, searches in added order.

dfs_iter(include_self=False, right_to_left=True)[source]

Depth first iteration (non-recursive) over the child nodes.

bfs_iter(include_self=False, right_to_left=False)[source]

Breadth first iteration (non-recursive) over the child nodes.

to_digraph()[source]

Converts this node + its children into a ordered directed graph.

The graph returned will have the same structure as the this node and its children (and tree node metadata will be translated into graph node metadata).

Returns

a directed graph

Return type

taskflow.types.graph.OrderedDiGraph