Hello world

Note

Full source located at hello_world.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
import logging
import os
import sys

logging.basicConfig(level=logging.ERROR)

top_dir = os.path.abspath(os.path.join(os.path.dirname(__file__),
                                       os.pardir,
                                       os.pardir))
sys.path.insert(0, top_dir)

import futurist

from taskflow import engines
from taskflow.patterns import linear_flow as lf
from taskflow.patterns import unordered_flow as uf
from taskflow import task


# INTRO: This is the de facto hello world equivalent for taskflow; it shows
# how an overly simplistic workflow can be created that runs using different
# engines using different styles of execution (all can be used to run in
# parallel if a workflow is provided that is parallelizable).

class PrinterTask(task.Task):
    """Task that prints the ``output`` value handed to ``execute()``.

    :param name: name given to the task (forwarded to the base class)
    :param show_name: when true, prefix the printed value with the task name
    :param inject: forwarded unchanged to the base class constructor
    """

    def __init__(self, name, show_name=True, inject=None):
        super(PrinterTask, self).__init__(name, inject=inject)
        self._show_name = show_name

    def execute(self, output):
        # Anonymous mode: emit only the value itself and stop.
        if not self._show_name:
            print(output)
            return
        # Named mode: emit "<task name>: <value>".
        print("%s: %s" % (self.name, output))


# This will be the work that we want done, which for this example is just to
# print 'hello world' (like a song) using different tasks and different
# execution models.
song = lf.Flow("beats")

# Unordered flows, when run, can be run in parallel; and a chorus is everyone
# singing at once of course!
hi_chorus = uf.Flow('hello')
world_chorus = uf.Flow('world')
# Every singer contributes one task to each chorus; the task name is built
# from the singer's name and the injected 'output' is that singer's lyric.
for (name, hello, world) in [('bob', 'hello', 'world'),
                             ('joe', 'hellooo', 'worllllld'),
                             ('sue', "helloooooo!", 'wooorllld!')]:
    hi_chorus.add(PrinterTask("%s@hello" % name,
                              # This will show up to the execute() method of
                              # the task as the argument named 'output' (which
                              # will allow us to print the lyric we want).
                              inject={'output': hello}))
    world_chorus.add(PrinterTask("%s@world" % name,
                                 inject={'output': world}))

# The composition starts with the conductor and then runs in sequence with
# the chorus running in parallel, but no matter what the 'hello' chorus must
# always run before the 'world' chorus (otherwise the world will fall apart).
# The conductor tasks use show_name=False so only "*ding*"/"*dong*" is printed.
song.add(PrinterTask("conductor@begin",
                     show_name=False, inject={'output': "*ding*"}),
         hi_chorus,
         world_chorus,
         PrinterTask("conductor@end",
                     show_name=False, inject={'output': "*dong*"}))

# Run in parallel using eventlet green threads...
try:
    executor = futurist.GreenThreadPoolExecutor()
except RuntimeError:
    # No eventlet currently active, skip running with it...
    pass
else:
    # Only reached when the green executor could be created; the 'with' block
    # takes care of releasing the executor once the engine has finished.
    print("-- Running in parallel using eventlet --")
    with executor:
        e = engines.load(song, executor=executor, engine='parallel')
        e.run()


# Run in parallel using real threads...
# NOTE(review): max_workers=1 presumably limits the pool to a single worker,
# so the chorus tasks may not actually overlap here -- confirm against the
# futurist documentation before relying on this for real parallelism.
with futurist.ThreadPoolExecutor(max_workers=1) as executor:
    print("-- Running in parallel using threads --")
    e = engines.load(song, executor=executor, engine='parallel')
    e.run()


# Run in parallel using external processes...
with futurist.ProcessPoolExecutor(max_workers=1) as executor:
    print("-- Running in parallel using processes --")
    e = engines.load(song, executor=executor, engine='parallel')
    e.run()


# Run serially (aka, if the workflow could have been run in parallel, it will
# not be when run in this mode)...
print("-- Running serially --")
e = engines.load(song, engine='serial')
e.run()

Passing values from and to tasks

Note

Full source located at simple_linear_pass.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
import logging
import os
import sys

logging.basicConfig(level=logging.ERROR)

self_dir = os.path.abspath(os.path.dirname(__file__))
top_dir = os.path.abspath(os.path.join(os.path.dirname(__file__),
                                       os.pardir,
                                       os.pardir))
sys.path.insert(0, top_dir)
sys.path.insert(0, self_dir)

from taskflow import engines
from taskflow.patterns import linear_flow
from taskflow import task

# INTRO: This example shows how a task (in a linear/serial workflow) can
# produce an output that can be then consumed/used by a downstream task.


class TaskA(task.Task):
    """Producer half of the example: announces itself and returns 'a'."""

    # Name attached to the value returned by execute(); TaskB below asks for
    # an argument with this same name.
    default_provides = 'a'

    def execute(self):
        banner = "Executing '%s'" % (self.name)
        print(banner)
        return 'a'


class TaskB(task.Task):
    """Consumer half of the example: prints the value it receives as ``a``."""

    def execute(self, a):
        banner = "Executing '%s'" % (self.name)
        print(banner)
        received = "Got input '%s'" % (a)
        print(received)


# Build a two step linear flow; TaskA is added first so that the value it
# provides ('a') is produced before TaskB asks for it.
print("Constructing...")
wf = linear_flow.Flow("pass-from-to")
wf.add(TaskA('a'), TaskB('b'))

print("Loading...")
e = engines.load(wf)

# The compile, prepare and run stages are invoked one by one here so that
# each stage can be announced separately.
print("Compiling...")
e.compile()

print("Preparing...")
e.prepare()

print("Running...")
e.run()

print("Done...")

Using listeners

Note

Full source located at echo_listener.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
import logging
import os
import sys

logging.basicConfig(level=logging.DEBUG)

top_dir = os.path.abspath(os.path.join(os.path.dirname(__file__),
                                       os.pardir,
                                       os.pardir))
sys.path.insert(0, top_dir)

from taskflow import engines
from taskflow.listeners import logging as logging_listener
from taskflow.patterns import linear_flow as lf
from taskflow import task

# INTRO: This example walks through a miniature workflow which will do a
# simple echo operation; during this execution a listener is associated with
# the engine to receive all notifications about what the flow has performed,
# this example dumps that output to the stdout for viewing (at debug level
# to show all the information which is possible).


class Echo(task.Task):
    """Minimal task whose only work is printing its own name."""

    def execute(self):
        print(self.name)


# Generate the work to be done (but don't do it yet); three Echo tasks named
# 'a', 'b' and 'c' are added to a linear flow in that order.
wf = lf.Flow('abc')
wf.add(Echo('a'))
wf.add(Echo('b'))
wf.add(Echo('c'))

# This will associate the listener with the engine (the listener
# will automatically register for notifications with the engine and deregister
# when the context is exited). Logging was configured at DEBUG level at the
# top of this example so that the listener's output is actually shown.
e = engines.load(wf)
with logging_listener.DynamicLoggingListener(e):
    e.run()

Using listeners (to watch a phone call)

Note

Full source located at simple_linear_listening.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
import logging
import os
import sys

logging.basicConfig(level=logging.ERROR)

top_dir = os.path.abspath(os.path.join(os.path.dirname(__file__),
                                       os.pardir,
                                       os.pardir))
sys.path.insert(0, top_dir)

import taskflow.engines
from taskflow.patterns import linear_flow as lf
from taskflow import task
from taskflow.types import notifier

ANY = notifier.Notifier.ANY

# INTRO: In this example we create two tasks (this time as functions instead
# of task subclasses as in the simple_linear.py example), each of which ~calls~
# a given ~phone~ number (provided as a function input) in a linear fashion
# (one after the other).
#
# For a workflow which is serial this shows an extremely simple way
# of structuring your tasks (the code that does the work) into a linear
# sequence (the flow) and then passing the work off to an engine, with some
# initial data to be run in a reliable manner.
#
# This example shows a basic usage of the taskflow structures without involving
# the complexity of persistence. Using the structures that taskflow provides
# via tasks and flows makes it possible for you to easily at a later time
# hook in a persistence layer (and then gain the functionality that offers)
# when you decide the complexity of adding that layer in is 'worth it' for your
# application's usage pattern (which some applications may not need).
#
# It **also** adds on to the simple_linear.py example by adding a set of
# callback functions which the engine will call when a flow state transition
# or task state transition occurs. These types of functions are useful for
# updating task or flow progress, or for debugging, sending notifications to
# external systems, or for other yet unknown future usage that you may create!


def call_jim(context):
    """Pretend to phone jim, then show the context that was handed in.

    :param context: mapping whose items are printed ordered by key
    """
    print("Calling jim.")
    # Order the items by key so the printed output is stable.
    ordered_items = sorted(context.items(), key=lambda item: item[0])
    print("Context = %s" % (ordered_items,))


def call_joe(context):
    """Pretend to phone joe, then show the context that was handed in.

    :param context: mapping whose items are printed ordered by key
    """
    print("Calling joe.")
    # Order the items by key so the printed output is stable.
    ordered_items = sorted(context.items(), key=lambda item: item[0])
    print("Context = %s" % (ordered_items,))


def flow_watch(state, details):
    """Print the state a flow has transitioned to.

    :param state: the state value to report
    :param details: accepted to match the callback signature; not used
    """
    report = 'Flow => %s' % state
    print(report)


def task_watch(state, details):
    """Print the state a task has transitioned to.

    :param state: the state value to report
    :param details: mapping from which 'task_name' is read (None is printed
                    when that key is absent)
    """
    task_name = details.get('task_name')
    print('Task %s => %s' % (task_name, state))


# Wrap your functions into a task type that knows how to treat your functions
# as tasks. There was previous work done to just allow a function to be
# directly passed, but in python 3.0 there is no easy way to capture an
# instance method, so this wrapping approach was decided upon instead which
# can attach to instance methods (if that's desired).
flow = lf.Flow("Call-them")
flow.add(task.FunctorTask(execute=call_jim))
flow.add(task.FunctorTask(execute=call_joe))

# Now load (but do not run) the flow using the provided initial data. The
# 'context' key matches the parameter name of call_jim() and call_joe().
engine = taskflow.engines.load(flow, store={
    'context': {
        "joe_number": 444,
        "jim_number": 555,
    }
})

# This is where we attach our callback functions to the 2 different
# notification objects that an engine exposes. The usage of an ANY (kleene
# star) here means that we want to be notified on all state changes, if you
# want to restrict to a specific state change, just register that instead.
engine.notifier.register(ANY, flow_watch)
engine.atom_notifier.register(ANY, task_watch)

# And now run!
engine.run()

Dumping an in-memory backend

Note

Full source located at dump_memory_backend.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
import logging
import os
import sys

logging.basicConfig(level=logging.ERROR)

self_dir = os.path.abspath(os.path.dirname(__file__))
top_dir = os.path.abspath(os.path.join(os.path.dirname(__file__),
                                       os.pardir,
                                       os.pardir))
sys.path.insert(0, top_dir)
sys.path.insert(0, self_dir)

from taskflow import engines
from taskflow.patterns import linear_flow as lf
from taskflow import task

# INTRO: in this example we create a dummy flow with a dummy task, and run
# it using an in-memory backend and pre/post run we dump out the contents
# of the in-memory backend's tree structure (which can be quite useful to
# look at for debugging or other analysis).


class PrintTask(task.Task):
    """Dummy task that only announces that it is running."""

    def execute(self):
        announcement = "Running '%s'" % self.name
        print(announcement)

# Make a little flow and run it...
f = lf.Flow('root')
for alpha in ['a', 'b', 'c']:
    f.add(PrintTask(alpha))

e = engines.load(f)
e.compile()
e.prepare()

# After prepare the storage layer + backend can now be accessed safely...
backend = e.storage.backend

# First dump: the backend contents as they are before anything has run.
print("----------")
print("Before run")
print("----------")
print(backend.memory.pformat())
print("----------")

e.run()

# Second dump: walk every path below the root and print it, together with its
# stored value when that value is truthy (paths with empty values are printed
# alone).
print("---------")
print("After run")
print("---------")
for path in backend.memory.ls_r(backend.memory.root_path, absolute=True):
    value = backend.memory[path]
    if value:
        print("%s -> %s" % (path, value))
    else:
        print("%s" % (path))

Making phone calls

Note

Full source located at simple_linear.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
import logging
import os
import sys

logging.basicConfig(level=logging.ERROR)

top_dir = os.path.abspath(os.path.join(os.path.dirname(__file__),
                                       os.pardir,
                                       os.pardir))
sys.path.insert(0, top_dir)

import taskflow.engines
from taskflow.patterns import linear_flow as lf
from taskflow import task

# INTRO: In this example we create two tasks, each of which ~calls~ a given
# ~phone~ number (provided as a function input) in a linear fashion (one after
# the other). For a workflow which is serial this shows an extremely simple
# way of structuring your tasks (the code that does the work) into a linear
# sequence (the flow) and then passing the work off to an engine, with some
# initial data to be run in a reliable manner.
#
# NOTE(harlowja): This example shows a basic usage of the taskflow structures
# without involving the complexity of persistence. Using the structures that
# taskflow provides via tasks and flows makes it possible for you to easily at
# a later time hook in a persistence layer (and then gain the functionality
# that offers) when you decide the complexity of adding that layer in
# is 'worth it' for your application's usage pattern (which certain
# applications may not need).


class CallJim(task.Task):
    """Task that pretends to phone jim on the number it is given."""

    def execute(self, jim_number, *args, **kwargs):
        announcement = "Calling jim %s." % jim_number
        print(announcement)


class CallJoe(task.Task):
    """Task that pretends to phone joe on the number it is given."""

    def execute(self, joe_number, *args, **kwargs):
        announcement = "Calling joe %s." % joe_number
        print(announcement)


# Create your flow and associated tasks (the work to be done); jim is added
# before joe in this linear flow.
flow = lf.Flow('simple-linear').add(
    CallJim(),
    CallJoe()
)

# Now run that flow using the provided initial data (store below); the store
# keys match the 'jim_number' / 'joe_number' parameters of the execute()
# methods above.
taskflow.engines.run(flow, store=dict(joe_number=444,
                                      jim_number=555))

Making phone calls (automatically reverting)

Note

Full source located at reverting_linear.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
import logging
import os
import sys

logging.basicConfig(level=logging.ERROR)

top_dir = os.path.abspath(os.path.join(os.path.dirname(__file__),
                                       os.pardir,
                                       os.pardir))
sys.path.insert(0, top_dir)

import taskflow.engines
from taskflow.patterns import linear_flow as lf
from taskflow import task

# INTRO: In this example we create three tasks, each of which ~calls~ a given
# number (provided as a function input), one of those tasks *fails* calling a
# given number (the suzzie calling); this causes the workflow to enter the
# reverting process, which activates the revert methods of the previous two
# phone ~calls~.
#
# This simulated calling makes it appear like all three calls occur or all
# three don't occur (transaction-like capabilities). No persistence layer is
# used here so reverting and executing will *not* be tolerant of process
# failure.


class CallJim(task.Task):
    """Task that pretends to phone jim and can apologize when reverted."""

    def execute(self, jim_number, *args, **kwargs):
        announcement = "Calling jim %s." % jim_number
        print(announcement)

    def revert(self, jim_number, *args, **kwargs):
        # Undo step: call the same number again to take the call back.
        apology = "Calling %s and apologizing." % jim_number
        print(apology)


class CallJoe(task.Task):
    """Task that pretends to phone joe and can apologize when reverted."""

    def execute(self, joe_number, *args, **kwargs):
        announcement = "Calling joe %s." % joe_number
        print(announcement)

    def revert(self, joe_number, *args, **kwargs):
        # Undo step: call the same number again to take the call back.
        apology = "Calling %s and apologizing." % joe_number
        print(apology)


class CallSuzzie(task.Task):
    """Task that always fails, used to push the flow into reverting."""

    def execute(self, suzzie_number, *args, **kwargs):
        # The number is never used; the call fails unconditionally.
        raise IOError("Suzzie not home right now.")


# Create your flow and associated tasks (the work to be done); the failing
# CallSuzzie task is added last, after the two calls that can be reverted.
flow = lf.Flow('simple-linear').add(
    CallJim(),
    CallJoe(),
    CallSuzzie()
)

try:
    # Now run that flow using the provided initial data (store below).
    taskflow.engines.run(flow, store=dict(joe_number=444,
                                          jim_number=555,
                                          suzzie_number=666))
except Exception as e:
    # NOTE(harlowja): This exception will be the exception that came out of the
    # 'CallSuzzie' task instead of a different exception, this is useful since
    # typically surrounding code wants to handle the original exception and not
    # a wrapped or altered one.
    #
    # *WARNING* If this flow was multi-threaded and multiple active tasks threw
    # exceptions then the above exception would be wrapped into a combined
    # exception (the object has methods to iterate over the contained
    # exceptions). See: exceptions.py and the class 'WrappedFailure' to look at
    # how to deal with multiple tasks failing while running.
    #
    # You will also note that this is not a problem in this case since no
    # parallelism is involved; this is ensured by the usage of a linear flow
    # and the default engine type which is 'serial' vs being 'parallel'.
    print("Flow failed: %s" % e)

Building a car

Note

Full source located at build_a_car.

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
import logging
import os
import sys


logging.basicConfig(level=logging.ERROR)

top_dir = os.path.abspath(os.path.join(os.path.dirname(__file__),
                                       os.pardir,
                                       os.pardir))
sys.path.insert(0, top_dir)


import taskflow.engines
from taskflow.patterns import graph_flow as gf
from taskflow.patterns import linear_flow as lf
from taskflow import task
from taskflow.types import notifier

ANY = notifier.Notifier.ANY

import example_utils as eu  # noqa


# INTRO: This example shows how a graph flow and linear flow can be used
# together to execute dependent & non-dependent tasks by going through the
# steps required to build a simplistic car (an assembly line if you will). It
# also shows how raw functions can be wrapped into a task object instead of
# being forced to use the more *heavy* task base class. This is useful in
# scenarios where pre-existing code has functions that you easily want to
# plug-in to taskflow, without requiring a large amount of code changes.


def build_frame():
    """Return the frame material, always the string 'steel'."""
    return 'steel'


def build_engine():
    """Return the engine make, always the string 'honda'."""
    return 'honda'


def build_doors():
    """Return the number of doors built, always the string '2'."""
    return '2'


def build_wheels():
    """Return the number of wheels built, always the string '4'."""
    return '4'


# These just return true to indicate success, they would in the real world
# do more than just that.

def install_engine(frame, engine):
    """Placeholder install step; the arguments are unused, returns True."""
    return True


def install_doors(frame, windows_installed, doors):
    """Placeholder install step; the arguments are unused, returns True."""
    return True


def install_windows(frame, doors):
    """Placeholder install step; the arguments are unused, returns True."""
    return True


def install_wheels(frame, engine, engine_installed, wheels):
    """Placeholder install step; the arguments are unused, returns True."""
    return True


def trash(**kwargs):
    """Announce that the car's pieces are being discarded.

    Any keyword arguments are accepted and ignored.
    """
    eu.print_wrapped("Throwing away pieces of car!")


def startup(**kwargs):
    """First step of the assembly line; accepts anything and returns True."""
    # If you want to see the rollback function being activated try uncommenting
    # the following line.
    #
    # raise ValueError("Car not verified")
    return True


def verify(spec, **kwargs):
    """Check every produced part against the ordered specification.

    Each keyword argument is looked up by name in ``spec``; checking stops
    at the first produced value that differs from its spec entry.

    :param spec: mapping of part name to the expected value
    :param kwargs: produced values, keyed by the same part names
    :returns: True when every produced value equals its spec entry
    :raises Exception: when a produced value differs from the spec
    :raises KeyError: when a keyword has no entry in ``spec``
    """
    mismatched = any(spec[part] != built for part, built in kwargs.items())
    if mismatched:
        # If the car is not what we ordered throw away the car (trigger
        # reversion).
        raise Exception("Car doesn't match spec!")
    return True


# These two functions connect into the state transition notification emission
# points that the engine outputs, they can be used to log state transitions
# that are occurring, or they can be used to suspend the engine (or perform
# other useful activities).
def flow_watch(state, details):
    """Print the state a flow has transitioned to.

    :param state: the state value to report
    :param details: accepted to match the callback signature; not used
    """
    report = 'Flow => %s' % state
    print(report)


def task_watch(state, details):
    """Print the state a task has transitioned to.

    :param state: the state value to report
    :param details: mapping from which 'task_name' is read (None is printed
                    when that key is absent)
    """
    task_name = details.get('task_name')
    print('Task %s => %s' % (task_name, state))


# The outer flow is linear: startup, then the parts graph, then verification.
# startup is paired with trash as its revert callback.
flow = lf.Flow("make-auto").add(
    task.FunctorTask(startup, revert=trash, provides='ran'),
    # A graph flow allows automatic dependency based ordering, the ordering
    # is determined by analyzing the symbols required and provided and ordering
    # execution based on a functioning order (if one exists).
    gf.Flow("install-parts").add(
        task.FunctorTask(build_frame, provides='frame'),
        task.FunctorTask(build_engine, provides='engine'),
        task.FunctorTask(build_doors, provides='doors'),
        task.FunctorTask(build_wheels, provides='wheels'),
        # These *_installed outputs allow for other tasks to depend on certain
        # actions being performed (aka the components were installed), another
        # way to do this is to link() the tasks manually instead of creating
        # an 'artificial' data dependency that accomplishes the same goal the
        # manual linking would result in.
        task.FunctorTask(install_engine, provides='engine_installed'),
        task.FunctorTask(install_doors, provides='doors_installed'),
        task.FunctorTask(install_windows, provides='windows_installed'),
        task.FunctorTask(install_wheels, provides='wheels_installed')),
    # verify() only declares 'spec' and **kwargs, so the values it should
    # compare against the spec are listed explicitly here.
    task.FunctorTask(verify, requires=['frame',
                                       'engine',
                                       'doors',
                                       'wheels',
                                       'engine_installed',
                                       'doors_installed',
                                       'windows_installed',
                                       'wheels_installed']))

# This dictionary will be provided to the tasks as a specification for what
# the tasks should produce, in this example this specification will influence
# what those tasks do and what output they create. Different tasks depend on
# different information from this specification, all of which will be provided
# automatically by the engine to those tasks.
# NOTE(review): among the functions shown in this example only verify() takes
# a 'spec' argument.
spec = {
    "frame": 'steel',
    "engine": 'honda',
    "doors": '2',
    "wheels": '4',
    # These are used to compare the result product, a car without the pieces
    # installed is not a car after all.
    "engine_installed": True,
    "doors_installed": True,
    "windows_installed": True,
    "wheels_installed": True,
}


# A copy of the spec is stored so that the later change to 'spec' below does
# not alter what this first engine was given.
engine = taskflow.engines.load(flow, store={'spec': spec.copy()})

# This registers all (ANY) state transitions to trigger a call to the
# flow_watch function for flow state transitions, and registers the
# same all (ANY) state transitions for task state transitions.
engine.notifier.register(ANY, flow_watch)
engine.atom_notifier.register(ANY, task_watch)

eu.print_wrapped("Building a car")
engine.run()

# Alter the specification and ensure that the reverting logic gets triggered:
# the build_doors function always builds '2' doors (not the 5 now requested),
# so the verification task will find that the car that is produced does not
# match the desired spec.
spec['doors'] = 5

engine = taskflow.engines.load(flow, store={'spec': spec.copy()})
engine.notifier.register(ANY, flow_watch)
engine.atom_notifier.register(ANY, task_watch)

eu.print_wrapped("Building a wrong car that doesn't match specification")
try:
    engine.run()
except Exception as e:
    # The failure raised by verify() is reported instead of ending the script.
    eu.print_wrapped("Flow failed: %s" % e)

Iterating over the alphabet (using processes)

Note

Full source located at alphabet_soup.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
import fractions
import functools
import logging
import os
import string
import sys
import time

logging.basicConfig(level=logging.ERROR)

self_dir = os.path.abspath(os.path.dirname(__file__))
top_dir = os.path.abspath(os.path.join(os.path.dirname(__file__),
                                       os.pardir,
                                       os.pardir))
sys.path.insert(0, top_dir)
sys.path.insert(0, self_dir)

from taskflow import engines
from taskflow import exceptions
from taskflow.patterns import linear_flow
from taskflow import task


# In this example we show how a simple linear set of tasks can be executed
# using local processes (and not threads or remote workers) with minimal (if
# any) modification to those tasks to make them safe to run in this mode.
#
# This is useful since it allows further scaling up your workflows when thread
# execution starts to become a bottleneck (which it can start to be due to the
# GIL in python). It also offers an intermediary scalable runner that can be
# used when the scale and/or setup of remote workers is not desirable.


def progress_printer(task, event_type, details):
    """Print the completion percentage reported for ``task``.

    This callback, attached to each task, will be called in the local
    process (not the child processes)...

    :param task: object whose ``name`` attribute identifies the task
    :param event_type: accepted to match the callback signature; not used
    :param details: mapping holding the completion fraction under the
                    'progress' key
    :raises KeyError: when ``details`` has no 'progress' entry
    """
    # Read (rather than pop) the value so that the details mapping handed to
    # this callback is left untouched for whoever else may look at it.
    progress = int(details['progress'] * 100.0)
    print("Task '%s' reached %d%% completion" % (task.name, progress))


class AlphabetTask(task.Task):
    """Task that reports four progress steps, pausing after each one."""

    # Delay (in seconds) between each progress part.
    _DELAY = 0.1

    # This task will run in X main stages (each with a different progress
    # report that will be delivered back to the running process...). The
    # initial 0% and 100% are triggered automatically by the engine when
    # a task is started and finished (so that's why those are not emitted
    # here); what remains is 1/5, 2/5, 3/5 and 4/5.
    _PROGRESS_PARTS = [fractions.Fraction(numerator, 5)
                       for numerator in range(1, 5)]

    def execute(self):
        for fraction_done in self._PROGRESS_PARTS:
            self.update_progress(fraction_done)
            time.sleep(self._DELAY)


print("Constructing...")
soup = linear_flow.Flow("alphabet-soup")
# One task per lowercase letter; each task gets its own progress callback with
# the task itself pre-bound as the callback's first argument.
for letter in string.ascii_lowercase:
    abc = AlphabetTask(letter)
    abc.notifier.register(task.EVENT_UPDATE_PROGRESS,
                          functools.partial(progress_printer, abc))
    soup.add(abc)
try:
    print("Loading...")
    e = engines.load(soup, engine='parallel', executor='processes')
    print("Compiling...")
    e.compile()
    print("Preparing...")
    e.prepare()
    print("Running...")
    e.run()
    print("Done...")
except exceptions.NotImplementedError as e:
    # Raised by one of the steps above; the message is printed rather than
    # letting the example crash.
    print(e)

Watching execution timing

Note

Full source located at timing_listener.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
import logging
import os
import random
import sys
import time

logging.basicConfig(level=logging.ERROR)

top_dir = os.path.abspath(os.path.join(os.path.dirname(__file__),
                                       os.pardir,
                                       os.pardir))
sys.path.insert(0, top_dir)

from taskflow import engines
from taskflow.listeners import timing
from taskflow.patterns import linear_flow as lf
from taskflow import task

# INTRO: in this example we will attach a listener to an engine
# and have variable run time tasks run and show how the listener will print
# out how long those tasks took (when they started and when they finished).
#
# This shows how timing metrics can be gathered (or attached onto an engine)
# after a workflow has been constructed, making it easy to gather metrics
# dynamically for situations where this kind of information is applicable (or
# even adding this information on at a later point in the future when your
# application starts to slow down).


class VariableTask(task.Task):
    """Task that sleeps for a random duration picked when it is created."""

    def __init__(self, name):
        super(VariableTask, self).__init__(name)
        # random.random() yields a float in [0.0, 1.0); it is used below as
        # the number of seconds to sleep.
        self._sleepy_time = random.random()

    def execute(self):
        pause = self._sleepy_time
        time.sleep(pause)


# Three tasks, each with its own randomly chosen sleep time, in one linear
# flow.
f = lf.Flow('root')
f.add(VariableTask('a'), VariableTask('b'), VariableTask('c'))
e = engines.load(f)
# The listener is attached only for the duration of the 'with' block, which
# is where the engine is run.
with timing.PrintingDurationListener(e):
    e.run()

Distance calculator

Note

Full source located at distance_calculator.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
import collections
import math
import os
import sys

top_dir = os.path.abspath(os.path.join(os.path.dirname(__file__),
                                       os.pardir,
                                       os.pardir))
sys.path.insert(0, top_dir)

from taskflow import engines
from taskflow.patterns import linear_flow
from taskflow import task

# INTRO: This shows how to use a task's/atom's ability to take requirements
# from its execute function's default parameters and shows how to provide
# those via different methods when needed, to influence those parameters to,
# in this case, calculate the distance between two points in 2D space.

# A 2D point, with its coordinates available as the fields ``x`` and ``y``.
Point = collections.namedtuple("Point", ["x", "y"])


def is_near(val, expected, tolerance=0.001):
    """Return True when ``val`` lies within ``tolerance`` of ``expected``.

    :param val: the value to check
    :param expected: the value ``val`` should be close to
    :param tolerance: largest accepted distance from ``expected``
                      (inclusive on both sides)
    :returns: True if ``expected - tolerance <= val <= expected + tolerance``,
              False otherwise (including when any argument is NaN)
    """
    # Floats don't really provide equality, so test membership in the closed
    # interval around the expected value instead. This is written as a
    # positive range check (rather than two "outside the range" rejections)
    # so that a NaN, which compares False against everything, is reported as
    # *not* near instead of slipping through both rejections.
    return (expected - tolerance) <= val <= (expected + tolerance)


class DistanceTask(task.Task):
    """Computes the straight line distance between two 2D points."""
    # See: http://en.wikipedia.org/wiki/Distance#Distance_in_Euclidean_space

    default_provides = 'distance'

    def execute(self, a=Point(0, 0), b=Point(0, 0)):
        # Both points default to the origin when they are not supplied.
        delta_x = b.x - a.x
        delta_y = b.y - a.y
        squared_sum = math.pow(delta_x, 2) + math.pow(delta_y, 2)
        return math.sqrt(squared_sum)


if __name__ == '__main__':
    def _report(outcome, target):
        # Show the raw engine results and then whether the computed
        # distance is close enough to the value we expected to get.
        print(outcome)
        print("%s is near-enough to %s: %s" % (outcome['distance'],
                                               target,
                                               is_near(outcome['distance'],
                                                       target)))

    # Here the execute() method's points are left at (or overridden from)
    # their origin defaults by what is put into the store at execution
    # time, which in turn influences what distance gets calculated.
    any_distance = linear_flow.Flow("origin").add(DistanceTask())

    # Nothing overridden; both points are at the origin.
    results = engines.run(any_distance)
    _report(results, 0.0)

    # Only 'a' overridden; 'b' stays at the origin.
    results = engines.run(any_distance, store={'a': Point(1, 1)})
    _report(results, 1.4142)

    results = engines.run(any_distance, store={'a': Point(10, 10)})
    _report(results, 14.14199)

    # Both points overridden.
    results = engines.run(any_distance,
                          store={'a': Point(5, 5), 'b': Point(10, 10)})
    _report(results, 7.07106)

    # Here the optional argument is instead overridden when the task is
    # created (via 'inject'), so it does not need to be sent in through the
    # 'store' argument on every run; the starting point 'a' is fixed
    # at (10, 10) instead of (0, 0).
    ten_distance = linear_flow.Flow("ten")
    ten_distance.add(DistanceTask(inject={'a': Point(10, 10)}))

    results = engines.run(ten_distance, store={'b': Point(10, 10)})
    _report(results, 0.0)

    results = engines.run(ten_distance)
    _report(results, 14.14199)

Table multiplier (in parallel)

Note

Full source located at parallel_table_multiply

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
import csv
import logging
import os
import random
import sys

logging.basicConfig(level=logging.ERROR)

top_dir = os.path.abspath(os.path.join(os.path.dirname(__file__),
                                       os.pardir,
                                       os.pardir))
sys.path.insert(0, top_dir)

import futurist
from six.moves import range as compat_range

from taskflow import engines
from taskflow.patterns import unordered_flow as uf
from taskflow import task

# INTRO: This example walks through a miniature workflow which does a parallel
# table modification where each row in the table gets adjusted by a thread, or
# green thread (if eventlet is available) in parallel and then the result
# is reformed into a new table and some verifications are performed on it
# to ensure everything went as expected.


# Factor that every value in every row of the table gets multiplied by.
MULTIPLER = 10


class RowMultiplier(task.Task):
    """Scales every value of one input row, producing a new output row."""

    def __init__(self, name, index, row, multiplier):
        super(RowMultiplier, self).__init__(name=name)
        # The index records which position in the source table the row
        # came from.
        self.index, self.multiplier, self.row = index, multiplier, row

    def execute(self):
        scaled = []
        for value in self.row:
            scaled.append(value * self.multiplier)
        return scaled


def make_flow(table):
    """Build an unordered flow holding one RowMultiplier task per row."""
    # An unordered flow is used on purpose: its tasks have no dependencies
    # on each other, and things without dependencies can simply be ran at
    # the same time (limited in concurrency by the executor, or by the max
    # workers of that executor...)
    root = uf.Flow("root")
    row_tasks = [RowMultiplier("m-%s" % position, position, cells, MULTIPLER)
                 for position, cells in enumerate(table)]
    for row_task in row_tasks:
        root.add(row_task)
    # NOTE(harlowja): nothing has ran at this point; the above only defines
    # what should be done (without doing it), the engine created in the
    # main() function is what will actually perform this work...
    return root


def main():
    """Multiply a table in parallel and check that every row was computed.

    The table is read from the CSV file named by the single command line
    argument (empty cells become 0.0); when exactly one argument is not
    given a randomly sized table of random values is generated instead.

    :returns: 0 when a computed row was found for every input row, else 1
              (intended to be used as the process exit code)
    """
    if len(sys.argv) == 2:
        # The csv module wants a binary mode file on python 2, but on
        # python 3 it needs a text mode file (opened with newline='' so the
        # reader can do its own newline handling); a file opened with 'rb'
        # makes csv.reader fail there.
        if sys.version_info[0] < 3:
            fh = open(sys.argv[1], 'rb')
        else:
            fh = open(sys.argv[1], 'r', newline='')
        with fh:
            tbl = [[float(r) if r else 0.0 for r in row]
                   for row in csv.reader(fh)]
    else:
        # Make some random table out of thin air...
        cols = random.randint(1, 100)
        rows = random.randint(1, 100)
        tbl = [[random.random() for _j in compat_range(0, cols)]
               for _i in compat_range(0, rows)]

    # Generate the work to be done.
    f = make_flow(tbl)

    # Now run it (using the specified executor)...
    try:
        executor = futurist.GreenThreadPoolExecutor(max_workers=5)
    except RuntimeError:
        # No eventlet currently active, use real threads instead.
        executor = futurist.ThreadPoolExecutor(max_workers=5)
    try:
        e = engines.load(f, engine='parallel', executor=executor)
        # Print each state the engine yields while it runs.
        for st in e.run_iter():
            print(st)
    finally:
        # Always release the executor, even when the run blows up.
        executor.shutdown()

    # Put the computed rows back into the order of the rows they came from;
    # each task remembers the index of its source row, so sorting on that
    # index replaces searching the whole flow once per row.
    computed_tbl = [e.storage.get(t.name)
                    for t in sorted(f, key=lambda t: t.index)]

    # Do some basic validation (which causes the return code of this process
    # to be different if things were not as expected...)
    if len(computed_tbl) != len(tbl):
        return 1
    return 0


# Use the value returned by main() as the exit code of the process.
if __name__ == "__main__":
    sys.exit(main())

Linear equation solver (explicit dependencies)

Note

Full source located at calculate_linear.

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
import logging
import os
import sys

logging.basicConfig(level=logging.ERROR)

top_dir = os.path.abspath(os.path.join(os.path.dirname(__file__),
                                       os.pardir,
                                       os.pardir))
sys.path.insert(0, top_dir)

import taskflow.engines
from taskflow.patterns import linear_flow as lf
from taskflow import task


# INTRO: In this example a linear flow is used to group four tasks to calculate
# a value. A single added task is used twice, showing how this can be done
# and the twice added task takes in different bound values. In the first case
# it uses default parameters ('x' and 'y') and in the second case arguments
# are bound with ('z', 'd') keys from the engines internal storage mechanism.
#
# A multiplier task uses a binding that another task also provides, but this
# example explicitly shows that 'z' parameter is bound with 'a' key
# This shows that if a task depends on a key named the same as a key provided
# from another task the name can be remapped to take the desired key from a
# different origin.


# This task provides some values as a result of its execution, this can be
# useful when you want to provide values from a static set to other tasks that
# depend on those values existing before those tasks can run.
#
# NOTE(harlowja): this usage is *deprecated* in favor of a simpler mechanism
# that just provides those values on engine running by prepopulating the
# storage backend before your tasks are ran (which accomplishes a similar goal
# in a more uniform manner).
class Provider(task.Task):
    """Task whose result is the positional values given at creation."""

    def __init__(self, name, *args, **kwargs):
        # Any keyword arguments are handed through to the base task as-is.
        super(Provider, self).__init__(name=name, **kwargs)
        self._provide = args

    def execute(self):
        static_values = self._provide
        return static_values


# This task adds two input variables and returns the result.
#
# Note that since this task does not have a revert() function (since addition
# is a stateless operation) there are no side-effects that this function needs
# to undo if some later operation fails.
class Adder(task.Task):
    """Adds its two inputs together."""

    def execute(self, x, y):
        total = x + y
        return total


# This task multiplies an input variable by a multiplier and returns the
# result.
#
# Note that since this task does not have a revert() function (since
# multiplication is a stateless operation) and there are no side-effects that
# this function needs to undo if some later operation fails.
class Multiplier(task.Task):
    """Multiplies its input by a factor that is fixed at creation."""

    def __init__(self, name, multiplier, provides=None, rebind=None):
        base_kwargs = {'name': name, 'provides': provides, 'rebind': rebind}
        super(Multiplier, self).__init__(**base_kwargs)
        self._multiplier = multiplier

    def execute(self, z):
        factor = self._multiplier
        return z * factor


# Note here that the ordering is established so that the correct sequences
# of operations occurs where the adding and multiplying is done according
# to the expected and typical mathematical model. A graph flow could also be
# used here to automatically infer & ensure the correct ordering.
flow = lf.Flow('root').add(
    # Provide the initial values for other tasks to depend on.
    #
    # x = 2, y = 3, d = 5
    Provider("provide-adder", 2, 3, 5, provides=('x', 'y', 'd')),
    # z = x+y = 5
    Adder("add-1", provides='z'),
    # a = z+d = 10
    Adder("add-2", provides='a', rebind=['z', 'd']),
    # Calculate 'r = a*3 = 30'
    #
    # Note here that the 'z' argument of the execute() function will not be
    # bound to the 'z' value provided by the 'add-1' task above; instead,
    # because of the rebind, the 'z' argument will be taken from the 'a'
    # value provided by the 'add-2' task listed above.
    Multiplier("multi", 3, provides='r', rebind={'z': 'a'})
)

# The result here will be all results (from all tasks), which are stored in
# an in-memory storage location that backs this engine (since the engine was
# not configured with persistent storage).
results = taskflow.engines.run(flow)
print(results)

Linear equation solver (inferred dependencies)

Source: graph_flow.py

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
import logging
import os
import sys

logging.basicConfig(level=logging.ERROR)

top_dir = os.path.abspath(os.path.join(os.path.dirname(__file__),
                                       os.pardir,
                                       os.pardir))
sys.path.insert(0, top_dir)

import taskflow.engines
from taskflow.patterns import graph_flow as gf
from taskflow.patterns import linear_flow as lf
from taskflow import task


# In this example there are complex *inferred* dependencies between tasks that
# are used to perform a simple set of linear equations.
#
# As you will see below the tasks just define what they require as input
# and produce as output (named values). Then the user doesn't care about
# ordering the tasks (in this case the tasks calculate pieces of the overall
# equation).
#
# As you will notice a graph flow resolves dependencies automatically using
# the tasks' symbol requirements and provided symbol values, so no ordering
# dependency has to be manually created.
#
# Also notice that flows of any types can be nested into a graph flow; showing
# that subflow dependencies (and associated ordering) will be inferred too.


class Adder(task.Task):
    """Produces the sum of the two values bound to ``x`` and ``y``."""

    def execute(self, x, y):
        summed = x + y
        return summed


# NOTE: the tasks below are deliberately not listed in dependency order (for
# example 'add5' needs 'x3', which is provided by 'add3' listed after it);
# each task only names the values it requires (rebind) and provides.
flow = gf.Flow('root').add(
    lf.Flow('nested_linear').add(
        # x2 = y3+y4 = 12
        Adder("add2", provides='x2', rebind=['y3', 'y4']),
        # x1 = y1+y2 = 4
        Adder("add1", provides='x1', rebind=['y1', 'y2'])
    ),
    # x5 = x1+x3 = 20
    Adder("add5", provides='x5', rebind=['x1', 'x3']),
    # x3 = x1+x2 = 16
    Adder("add3", provides='x3', rebind=['x1', 'x2']),
    # x4 = x2+y5 = 21
    Adder("add4", provides='x4', rebind=['x2', 'y5']),
    # x6 = x5+x4 = 41
    Adder("add6", provides='x6', rebind=['x5', 'x4']),
    # x7 = x6+x6 = 82
    Adder("add7", provides='x7', rebind=['x6', 'x6']))

# Provide the initial variable inputs using a storage dictionary; these are
# the 'y' values, which none of the tasks in the flow provide themselves.
store = {
    "y1": 1,
    "y2": 3,
    "y3": 5,
    "y4": 7,
    "y5": 9,
}

# These are the values that the flow is expected to produce (by name).
expected = [
    ('x1', 4),
    ('x2', 12),
    ('x3', 16),
    ('x4', 21),
    ('x5', 20),
    ('x6', 41),
    ('x7', 82),
]
# How many produced values did not match what was expected.
unexpected = 0

# Run the very same flow with each engine type, checking what each one made.
for (engine_kind, banner) in [('serial', "Single threaded engine result %s"),
                              ('parallel', "Multi threaded engine result %s")]:
    result = taskflow.engines.run(
        flow, engine=engine_kind, store=store)

    print(banner % result)
    for (name, value) in expected:
        actual = result.get(name)
        if actual != value:
            sys.stderr.write("%s != %s\n" % (actual, value))
            unexpected += 1

# Any mismatch makes the process exit with a failure code.
if unexpected:
    sys.exit(1)

Linear equation solver (in parallel)

Note

Full source located at calculate_in_parallel

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
import logging
import os
import sys

logging.basicConfig(level=logging.ERROR)

top_dir = os.path.abspath(os.path.join(os.path.dirname(__file__),
                                       os.pardir,
                                       os.pardir))
sys.path.insert(0, top_dir)

import taskflow.engines
from taskflow.patterns import linear_flow as lf
from taskflow.patterns import unordered_flow as uf
from taskflow import task

# INTRO: These examples show how a linear flow and an unordered flow can be
# used together to execute calculations in parallel and then use the
# result for the next task/s. The adder task is used for all calculations
# and argument bindings are used to set correct parameters for each task.


# This task provides some values as a result of its execution, this can be
# useful when you want to provide values from a static set to other tasks that
# depend on those values existing before those tasks can run.
#
# NOTE(harlowja): this usage is *deprecated* in favor of a simpler mechanism
# that provides those values on engine running by prepopulating the storage
# backend before your tasks are ran (which accomplishes a similar goal in a
# more uniform manner).
class Provider(task.Task):
    """Hands back, when executed, the fixed values it was created with."""

    def __init__(self, name, *args, **kwargs):
        # Keyword arguments are forwarded to the base task unchanged.
        super(Provider, self).__init__(name=name, **kwargs)
        self._provide = args

    def execute(self):
        fixed_values = self._provide
        return fixed_values


# This task adds two input variables and returns the result of that addition.
#
# Note that since this task does not have a revert() function (since addition
# is a stateless operation) there are no side-effects that this function needs
# to undo if some later operation fails.
class Adder(task.Task):
    """Returns the result of adding ``x`` and ``y``."""

    def execute(self, x, y):
        added = x + y
        return added


flow = lf.Flow('root').add(
    # Provide the initial values for other tasks to depend on.
    #
    # x1 = 2, y1 = 3, x2 = 5, y2 = 8
    Provider("provide-adder", 2, 3, 5, 8,
             provides=('x1', 'y1', 'x2', 'y2')),
    # Note here that we define the flow that contains the 2 adders to be an
    # unordered flow since the order in which these execute does not matter,
    # another way to solve this would be to use a graph_flow pattern, which
    # also can run in parallel (since they have no ordering dependencies).
    uf.Flow('adders').add(
        # Calculate 'z1 = x1+y1 = 5'
        #
        # Rebind here means that the execute() function x argument will be
        # satisfied from a previous output named 'x1', and the y argument
        # of execute() will be populated from the previous output named 'y1'
        #
        # The output (result of adding) will be mapped into a variable named
        # 'z1' which can then be referred to and depended on by other tasks.
        Adder(name="add", provides='z1', rebind=['x1', 'y1']),
        # z2 = x2+y2 = 13
        Adder(name="add-2", provides='z2', rebind=['x2', 'y2']),
    ),
    # r = z1+z2 = 18
    Adder(name="sum-1", provides='r', rebind=['z1', 'z2']))


# The result here will be all results (from all tasks), which are stored in
# an in-memory storage location that backs this engine (since the engine was
# not configured with persistent storage).
result = taskflow.engines.run(flow, engine='parallel')
print(result)

Creating a volume (in parallel)

Note

Full source located at create_parallel_volume

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
import contextlib
import logging
import os
import random
import sys
import time

logging.basicConfig(level=logging.ERROR)

top_dir = os.path.abspath(os.path.join(os.path.dirname(__file__),
                                       os.pardir,
                                       os.pardir))
sys.path.insert(0, top_dir)

from oslo_utils import reflection

from taskflow import engines
from taskflow.listeners import printing
from taskflow.patterns import unordered_flow as uf
from taskflow import task

# INTRO: These examples show how unordered_flow can be used to create a large
# number of fake volumes in parallel (or serially, depending on a constant that
# can be easily changed).


@contextlib.contextmanager
def show_time(name):
    """Context manager that prints how long the wrapped block took to run.

    :param name: label included in the printed message

    The elapsed time is printed when the block is left, whether the block
    finished normally or raised (an exception still propagates afterwards).
    """
    start = time.time()
    try:
        yield
    finally:
        # Report from a finally clause so that a block that fails is still
        # timed instead of silently skipping the message.
        end = time.time()
        print(" -- %s took %0.3f seconds" % (name, end - start))


# This affects how many volumes to create and how much time to *simulate*
# passing for that volume to be created.
# Upper bound (in seconds) on the simulated creation time of one volume.
MAX_CREATE_TIME = 3
# How many volumes (and therefore volume creating tasks) to make.
VOLUME_COUNT = 5

# This will be used to determine if all the volumes are created in parallel
# or whether the volumes are created serially (in an undefined order since
# an unordered flow is used). Note that there is a disconnection between the
# ordering and the concept of parallelism (since unordered items can still be
# ran in a serial ordering). A typical use-case for offering both is to allow
# for debugging using a serial approach, while when running at a larger scale
# one would likely want to use the parallel approach.
#
# If you switch this flag from serial to parallel you can see the overall
# time difference that this causes.
SERIAL = False
# Name of the engine type to load, chosen by the flag above.
engine = 'serial' if SERIAL else 'parallel'


class VolumeCreator(task.Task):
    """Pretends to create a single volume (by sleeping for a while)."""

    def __init__(self, volume_id):
        # The task name is composed of the name of this class and the id of
        # the volume being created. A task's name uniquely identifies that
        # task in storage, so it needs to stay relevant and identifiable if
        # the task is recreated for a later resumption (if applicable).
        #
        # UUIDs are *not* used since they would differ for every task that
        # gets created and so can not be tied back to a previous task's
        # state on resumption. A name derived from the volume id can be, so
        # that the volume create can be resumed/reverted, and it is also
        # much easier to use for audit and tracking purposes.
        task_name = "%s-%s" % (reflection.get_callable_name(self), volume_id)
        super(VolumeCreator, self).__init__(name=task_name)
        self._volume_id = volume_id

    def execute(self):
        volume_id = self._volume_id
        print("Making volume %s" % (volume_id))
        # Simulate the creation taking some (random, but bounded) time.
        simulated_seconds = random.random() * MAX_CREATE_TIME
        time.sleep(simulated_seconds)
        print("Finished making volume %s" % (volume_id))


# The volumes are assumed to have no ordering dependency on each other, so
# an unordered flow holds all of the volume creating tasks.
flow = uf.Flow("volume-maker")
for i in range(VOLUME_COUNT):
    volume_id = "vol-%s" % i
    flow.add(VolumeCreator(volume_id=volume_id))


# Show how much time the overall engine loading and running takes (the
# label that is printed is the name of the flow, in title case).
with show_time(name=flow.name.title()):
    eng = engines.load(flow, engine=engine)
    # This context manager automatically adds (and automatically removes) a
    # helpful set of state transition notification printing helper utilities
    # that show you exactly what transitions the engine is going through
    # while running the various volume create tasks.
    with printing.PrintingListener(eng):
        eng.run()

Summation mapper(s) and reducer (in parallel)

Note

Full source located at simple_map_reduce

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
import logging
import os
import sys

logging.basicConfig(level=logging.ERROR)

self_dir = os.path.abspath(os.path.dirname(__file__))
top_dir = os.path.abspath(os.path.join(os.path.dirname(__file__),
                                       os.pardir,
                                       os.pardir))
sys.path.insert(0, top_dir)
sys.path.insert(0, self_dir)

# INTRO: These examples show a simplistic map/reduce implementation where
# a set of mapper(s) will sum a series of input numbers (in parallel) and
# return their individual summed result. A reducer will then use those
# produced values and perform a final summation and this result will then be
# printed (and verified to ensure the calculation was as expected).

import six

from taskflow import engines
from taskflow.patterns import linear_flow
from taskflow.patterns import unordered_flow
from taskflow import task


class SumMapper(task.Task):
    """Mapper that adds up the inputs handed to it."""

    def execute(self, inputs):
        chunk_total = sum(inputs)
        return chunk_total


class TotalReducer(task.Task):
    """Reducer that folds all mapped (summed) outputs into a single value."""

    def execute(self, *args, **kwargs):
        # Only keyword arguments named like a mapper output take part in
        # the total; any other kwargs that were passed in are ignored.
        mapped_sums = (amount for (key, amount) in six.iteritems(kwargs)
                       if key.startswith('reduction_'))
        return sum(mapped_sums)


def chunk_iter(chunk_size, upperbound):
    """Yields back chunk size pieces from zero to upperbound - 1.

    Every yielded piece is a list of consecutive integers. When upperbound
    is not an exact multiple of chunk_size the last piece is shorter than
    the others (instead of being left out), so the pieces always cover
    every number in the range.

    :param chunk_size: how many numbers to put in each piece (at least 1)
    :param upperbound: exclusive end of the range of numbers to split up
    :raises ValueError: on first iteration, if chunk_size is less than 1
    """
    if chunk_size < 1:
        raise ValueError("chunk_size must be at least 1 (not %s)"
                         % chunk_size)
    for start in range(0, upperbound, chunk_size):
        # Clamp the end of the piece so the last one stops at upperbound.
        yield list(range(start, min(start + chunk_size, upperbound)))


# Upper bound (exclusive) of the numbers to sum for example purposes...
UPPER_BOUND = 10000

# How many mappers we want to have.
SPLIT = 10

# How big of a chunk we want to give each mapper (a whole number, since
# floor division is used).
CHUNK_SIZE = UPPER_BOUND // SPLIT

# The workflow that gets composed below (and ran afterwards).
w = linear_flow.Flow("root")

# All the mappers go into an unordered flow so they can run in parallel.
store = {}
provided = []
mappers = unordered_flow.Flow('map')
for i, chunk in enumerate(chunk_iter(CHUNK_SIZE, UPPER_BOUND)):
    mapper_name = 'mapper_%s' % i
    reduction_name = "reduction_%s" % i
    # The numbers this mapper has to sum are stored under its own name.
    store[mapper_name] = chunk
    # The reducer consumes the output of every mapper, so the name each
    # output is provided under is recorded for it.
    provided.append(reduction_name)
    mapper = SumMapper(name=mapper_name,
                       rebind={'inputs': mapper_name},
                       provides=reduction_name)
    mappers.add(mapper)
w.add(mappers)

# The reducer goes last (so it runs after all of the mappers).
reducer = TotalReducer('reducer', requires=provided)
w.add(reducer)

# Now go! (the store holds, under each mapper's name, the chunk of numbers
# that mapper will sum).
e = engines.load(w, engine='parallel', store=store, max_workers=4)
print("Running a parallel engine with options: %s" % e.options)
e.run()

# Now get the result the reducer created.
total = e.storage.get('reducer')
print("Calculated result = %s" % total)

# Calculate it manually to verify that it worked (exiting with a failure
# code when the two totals do not agree)...
calc_total = sum(range(0, UPPER_BOUND))
if calc_total != total:
    sys.exit(1)

Sharing a thread pool executor (in parallel)

Note

Full source located at share_engine_thread

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
import logging
import os
import random
import sys
import time

logging.basicConfig(level=logging.ERROR)

top_dir = os.path.abspath(os.path.join(os.path.dirname(__file__),
                                       os.pardir,
                                       os.pardir))
sys.path.insert(0, top_dir)

import futurist
import six

from taskflow import engines
from taskflow.patterns import unordered_flow as uf
from taskflow import task
from taskflow.utils import threading_utils as tu

# INTRO: in this example we create 2 dummy flows, each with 2 dummy tasks,
# and run them using a shared thread pool executor to show how a single
# executor can be used with more than one engine (sharing the execution
# thread pool between them); this allows for saving resources and reusing
# threads in situations where this is beneficial.


class DelayedTask(task.Task):
    """Task that reports the thread it runs in and then sleeps a while."""

    def __init__(self, name):
        super(DelayedTask, self).__init__(name=name)
        # How long to sleep is decided once, when the task is created.
        delay = random.random()
        self._wait_for = delay

    def execute(self):
        running_in = tu.get_ident()
        print("Running '%s' in thread '%s'" % (self.name, running_in))
        time.sleep(self._wait_for)


# Two independent unordered flows, each holding two delayed tasks.
f1 = uf.Flow("f1")
f1.add(DelayedTask("f1-1"), DelayedTask("f1-2"))

f2 = uf.Flow("f2")
f2.add(DelayedTask("f2-1"), DelayedTask("f2-2"))

# Run them all using the same futures (thread-pool based) executor...
with futurist.ThreadPoolExecutor() as ex:
    e1 = engines.load(f1, engine='parallel', executor=ex)
    e2 = engines.load(f2, engine='parallel', executor=ex)
    iters = [e1.run_iter(), e2.run_iter()]
    while iters:
        # Run a single 'step' of each iterator, forcing each engine to perform
        # some work, then yield, and repeat until each iterator is consumed
        # and there is no more engine work to be done.
        #
        # Each round iterates over a fresh copy (so we can remove from the
        # source list while looping); that way an iterator that is already
        # finished is never stepped again, and removing it can not fail.
        for it in list(iters):
            try:
                six.next(it)
            except StopIteration:
                iters.remove(it)

Storing & emitting a bill

Note

Full source located at fake_billing

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
import json
import logging
import os
import sys
import time

logging.basicConfig(level=logging.ERROR)

top_dir = os.path.abspath(os.path.join(os.path.dirname(__file__),
                                       os.pardir,
                                       os.pardir))
sys.path.insert(0, top_dir)

from oslo_utils import uuidutils

from taskflow import engines
from taskflow.listeners import printing
from taskflow.patterns import graph_flow as gf
from taskflow.patterns import linear_flow as lf
from taskflow import task
from taskflow.utils import misc

# INTRO: This example walks through a miniature workflow which simulates
# the reception of an API request, creation of a database entry, driver
# activation (which invokes a 'fake' webservice) and final completion.
#
# This example also shows how a function/object (in this case the url sending)
# that occurs during driver activation can update the progress of a task
# without being aware of the internals of how to do this by associating a
# callback that the url sending can update as the sending progresses from 0.0%
# complete to 100% complete.


class DB(object):
    """Stand-in for a database handle; it only prints what it is asked."""

    def query(self, sql):
        message = "Querying with: %s" % (sql)
        print(message)


class UrlCaller(object):
    """Fake url sender that simulates a slow, progress reporting send."""

    def __init__(self):
        # The pause taken for each piece of data that is 'sent' is
        # _send_time / _chunks seconds.
        self._send_time = 0.5
        self._chunks = 25

    def send(self, url, data, status_cb=None):
        """Pretend to send ``data`` to ``url``, one element at a time.

        :param url: where the data would be sent (nothing is really sent)
        :param data: sized payload; one pause is taken per element of it
        :param status_cb: optional callable that is called after each
                          element with the fraction (of the whole payload)
                          sent so far; the last call receives 1.0
        """
        sleep_time = float(self._send_time) / self._chunks
        total = len(data)
        for sent in range(1, total + 1):
            time.sleep(sleep_time)
            # As we send the data, each chunk we 'fake' send will progress
            # the sending progress that much further to 100%; the count of
            # chunks already sent is reported so that the final chunk
            # reports completion (and not one chunk short of it).
            if status_cb:
                status_cb(float(sent) / total)


# Since engines save the output of tasks to an optional persistent storage
# backend, resources have to be dealt with in a slightly different manner since
# resources are transient and can *not* be persisted (or serialized). For tasks
# that require access to a set of resources it is a common pattern to provide
# an object (in this case this object) on construction of those tasks via the
# task constructor.
class ResourceFetcher(object):
    """Creates the db/url handles on first use and then reuses them."""

    def __init__(self):
        self._db_handle = self._url_handle = None

    @property
    def db_handle(self):
        handle = self._db_handle
        if handle is None:
            # First access; make the handle and remember it for next time.
            handle = self._db_handle = DB()
        return handle

    @property
    def url_handle(self):
        handle = self._url_handle
        if handle is None:
            # First access; make the handle and remember it for next time.
            handle = self._url_handle = UrlCaller()
        return handle


class ExtractInputRequest(task.Task):
    """Turns an incoming request object into a plain dictionary."""

    def __init__(self, resources):
        super(ExtractInputRequest, self).__init__(provides="parsed_request")
        self._resources = resources

    def execute(self, request):
        parsed = {}
        parsed['user'] = request.user
        parsed['user_id'] = misc.as_int(request.id)
        # Each parsed request gets its own freshly generated identifier.
        parsed['request_id'] = uuidutils.generate_uuid()
        return parsed


class MakeDBEntry(task.Task):
    """Records the parsed request via the db handle (and undoes it)."""

    def __init__(self, resources):
        super(MakeDBEntry, self).__init__()
        self._resources = resources

    def execute(self, parsed_request):
        """Issue the (fake) insert statement for ``parsed_request``."""
        self._resources.db_handle.query(
            "INSERT %s INTO mydb" % (parsed_request))

    def revert(self, result, parsed_request):
        """Issue the (fake) delete statement for ``parsed_request``."""
        self._resources.db_handle.query(
            "DELETE %s FROM mydb IF EXISTS" % (parsed_request))


class ActivateDriver(task.Task):
    """Sends the parsed request to a url while reporting its progress.

    The task is constructed with ``provides='sent_to'``.
    """

    def __init__(self, resources):
        super(ActivateDriver, self).__init__(provides='sent_to')
        self._url = "http://blahblah.com"
        self._resources = resources

    def execute(self, parsed_request):
        """Send ``parsed_request`` (as json) and return the target url."""
        print("Sending billing data to %s" % (self._url))
        sender = self._resources.url_handle
        payload = json.dumps(parsed_request)
        # Our update_progress method is handed to the sender as its status
        # callback, so the progress of the send becomes the progress of this
        # task; that lets downstream systems see what the task is doing.
        sender.send(self._url, payload, status_cb=self.update_progress)
        return self._url

    def update_progress(self, progress, **kwargs):
        """Forward ``progress`` to the parent class and also print it."""
        super(ActivateDriver, self).update_progress(progress, **kwargs)
        percent = progress * 100
        print("%s is %0.2f%% done" % (self.name, percent))


class DeclareSuccess(task.Task):
    """Prints the closing messages once the data has been sent."""

    def execute(self, sent_to):
        """Announce completion and where the data was sent (``sent_to``)."""
        print("Done!")
        summary = "All data processed and sent to %s" % (sent_to)
        print(summary)


class DummyUser(object):
    """Minimal request-like object exposing ``user`` and ``id`` attributes."""

    def __init__(self, user, id_):
        # ``id_`` avoids shadowing the builtin; it is stored as ``id``.
        self.id = id_
        self.user = user


# Resources (db handles and similar) of course can *not* be persisted, so we
# need to make sure that we pass this resource fetcher to the tasks'
# constructors so that the tasks have access to any needed resources (the
# resources are lazily loaded so that they are only created when they are
# used).
resources = ResourceFetcher()
flow = lf.Flow("initialize-me")

# 1. First we extract the api request into a usable format.
# 2. Then we go ahead and make a database entry for our request.
flow.add(ExtractInputRequest(resources), MakeDBEntry(resources))

# 3. Then we activate our payment method and finally declare success.
sub_flow = gf.Flow("after-initialize")
sub_flow.add(ActivateDriver(resources), DeclareSuccess())
flow.add(sub_flow)

# Initially populate the storage with the following request object;
# prepopulating this allows the tasks that depend on the 'request' variable
# to start processing (in this case this is the ExtractInputRequest task).
store = {
    'request': DummyUser(user="bob", id_="1.35"),
}
eng = engines.load(flow, engine='serial', store=store)

# This context manager automatically adds (and automatically removes) a
# helpful set of state transition notification printing helper utilities
# that show you exactly what transitions the engine is going through
# while running the various billing related tasks.
with printing.PrintingListener(eng):
    eng.run()

Suspending a workflow & resuming

Note

Full source located at resume_from_backend

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
import contextlib
import logging
import os
import sys

logging.basicConfig(level=logging.ERROR)

self_dir = os.path.abspath(os.path.dirname(__file__))
top_dir = os.path.abspath(os.path.join(os.path.dirname(__file__),
                                       os.pardir,
                                       os.pardir))
sys.path.insert(0, top_dir)
sys.path.insert(0, self_dir)

from oslo_utils import uuidutils

import taskflow.engines
from taskflow.patterns import linear_flow as lf
from taskflow.persistence import models
from taskflow import task

import example_utils as eu  # noqa

# INTRO: In this example linear_flow is used to group three tasks, one of
# which will suspend the future work the engine may do. This suspended engine
# is then discarded and the workflow is reloaded from the persisted data and
# then the workflow is resumed from where it was suspended. This allows you to
# see how to start an engine, have a task stop the engine from doing future
# work (if a multi-threaded engine is being used, then the currently active
# work is not preempted) and then resume the work later.
#
# Usage:
#
#   With a filesystem directory as backend
#
#     python taskflow/examples/resume_from_backend.py
#
#   With ZooKeeper as backend
#
#     python taskflow/examples/resume_from_backend.py \
#       zookeeper://127.0.0.1:2181/taskflow/resume_from_backend/


# UTILITY FUNCTIONS #########################################


def print_task_states(flowdetail, msg):
    """Print ``msg``, the flow's name/state and one line per task detail."""
    eu.print_wrapped(msg)
    print("Flow '%s' state: %s" % (flowdetail.name, flowdetail.state))
    rows = [(td.name, td.version, td.state, td.results) for td in flowdetail]
    # Sorted output keeps test validation from being confused by whatever
    # order the flow detail happens to yield its items in.
    rows.sort()
    for row in rows:
        print(" %s==%s: %s, result=%s" % row)


def find_flow_detail(backend, lb_id, fd_id):
    """Look up a flow detail inside a logbook stored in ``backend``.

    :param backend: object whose ``get_connection()`` returns a connection
                    offering ``get_logbook()`` and ``close()``
    :param lb_id: identifier of the logbook to fetch
    :param fd_id: identifier of the flow detail to find in that logbook
    :returns: whatever the logbook's ``find(fd_id)`` returns
    """
    # Close the connection once the lookup is done (even if it raises) so
    # that it is not leaked; this mirrors how the rest of this example uses
    # backend connections.
    with contextlib.closing(backend.get_connection()) as conn:
        lb = conn.get_logbook(lb_id)
        return lb.find(fd_id)


# CREATE FLOW ###############################################


class InterruptTask(task.Task):
    """Task whose only job is to suspend the engine that is running it."""

    def execute(self):
        # DO NOT TRY THIS AT HOME
        # ``engine`` is not a parameter or attribute: it is looked up as a
        # module-level global at call time, so the script must have bound
        # that name before this task executes.
        engine.suspend()


class TestTask(task.Task):
    """Task that announces itself and then reports ``'ok'``."""

    def execute(self):
        announcement = 'executing %s' % self
        print(announcement)
        return 'ok'


def flow_factory():
    """Build the three-task linear flow: 'first', 'boom' then 'second'."""
    flow = lf.Flow('resume from backend example')
    return flow.add(TestTask(name='first'),
                    InterruptTask(name='boom'),
                    TestTask(name='second'))


# INITIALIZE PERSISTENCE ####################################

with eu.get_backend() as backend:

    # Create a place where the persistence information will be stored: a
    # logbook holding a single flow detail, saved through a connection that
    # is closed as soon as the save is done.
    book = models.LogBook("example")
    flow_detail = models.FlowDetail("resume from backend example",
                                    uuid=uuidutils.generate_uuid())
    book.add(flow_detail)
    with contextlib.closing(backend.get_connection()) as conn:
        conn.save_logbook(book)

    # CREATE AND RUN THE FLOW: FIRST ATTEMPT ####################

    # NOTE: the name ``engine`` matters here; InterruptTask.execute() looks
    # it up as a module-level global in order to call suspend() on it.
    flow = flow_factory()
    engine = taskflow.engines.load(flow, flow_detail=flow_detail,
                                   book=book, backend=backend)

    print_task_states(flow_detail, "At the beginning, there is no state")
    eu.print_wrapped("Running")
    engine.run()
    print_task_states(flow_detail, "After running")

    # RE-CREATE, RESUME, RUN ####################################

    eu.print_wrapped("Resuming and running again")

    # NOTE(harlowja): reload the flow detail from backend, this will allow us
    # to resume the flow from its suspended state, but first we need to search
    # for the right flow details in the correct logbook where things are
    # stored.
    #
    # We could avoid re-loading the engine and just do engine.run() again, but
    # this example shows how another process may unsuspend a given flow and
    # start it again for situations where this is useful to do (say the
    # process running the above flow crashes).
    flow2 = flow_factory()
    flow_detail_2 = find_flow_detail(backend, book.uuid, flow_detail.uuid)
    engine2 = taskflow.engines.load(flow2,
                                    flow_detail=flow_detail_2,
                                    backend=backend, book=book)
    engine2.run()
    print_task_states(flow_detail_2, "At the end")

Creating a virtual machine (resumable)

Note

Full source located at resume_vm_boot

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
import contextlib
import hashlib
import logging
import os
import random
import sys
import time

logging.basicConfig(level=logging.ERROR)

self_dir = os.path.abspath(os.path.dirname(__file__))
top_dir = os.path.abspath(os.path.join(os.path.dirname(__file__),
                                       os.pardir,
                                       os.pardir))
sys.path.insert(0, top_dir)
sys.path.insert(0, self_dir)

import futurist
from oslo_utils import uuidutils

from taskflow import engines
from taskflow import exceptions as exc
from taskflow.patterns import graph_flow as gf
from taskflow.patterns import linear_flow as lf
from taskflow.persistence import models
from taskflow import task

import example_utils as eu  # noqa

# INTRO: These examples show how a hierarchy of flows can be used to create a
# vm in a reliable & resumable manner using taskflow + a miniature version of
# what nova does while booting a vm.


@contextlib.contextmanager
def slow_down(how_long=0.5):
    """Context manager that optionally pauses after its body has run.

    Yields ``how_long``; on exit (even when the body raised) it prints a
    prompt and sleeps for ``how_long`` seconds, but only when at least one
    command line argument was given.
    """
    try:
        yield how_long
    finally:
        # Only bother to do this if user input was provided.
        user_args = sys.argv[1:]
        if user_args:
            print("** Ctrl-c me please!!! **")
            time.sleep(how_long)


class PrintText(task.Task):
    """Just inserts some text print outs in a workflow."""

    def __init__(self, print_what, no_slow=False):
        # The task name carries the first 8 hex characters of the md5 of
        # the text to print.
        digest = hashlib.md5(print_what.encode('utf-8')).hexdigest()
        super(PrintText, self).__init__(name="Print: %s" % (digest[0:8]))
        self._no_slow = no_slow
        self._text = print_what

    def execute(self):
        """Print the text, wrapped in ``slow_down()`` unless ``no_slow``."""
        if not self._no_slow:
            with slow_down():
                eu.print_wrapped(self._text)
            return
        eu.print_wrapped(self._text)


class DefineVMSpec(task.Task):
    """Defines a vm specification to be."""

    def __init__(self, name):
        super(DefineVMSpec, self).__init__(provides='vm_spec', name=name)

    def execute(self):
        """Return the fixed specification dictionary used by this example."""
        return dict(type='kvm', disks=2, vcpu=1, ips=1, volumes=3)


class LocateImages(task.Task):
    """Locates where the vm images are."""

    def __init__(self, name):
        super(LocateImages, self).__init__(provides='image_locations',
                                           name=name)

    def execute(self, vm_spec):
        """Map one image url to a local path per disk in ``vm_spec``."""
        return dict(
            ("http://www.yahoo.com/images/%s" % (disk), "/tmp/%s.img" % (disk))
            for disk in range(vm_spec['disks']))


class DownloadImages(task.Task):
    """Downloads all the vm images."""

    def __init__(self, name):
        super(DownloadImages, self).__init__(provides='download_paths',
                                             name=name)

    def execute(self, image_locations):
        """Pretend to download each image; return the sorted local paths."""
        for source in image_locations:
            target = image_locations[source]
            with slow_down(1):
                print("Downloading from %s => %s" % (source, target))
        local_paths = list(image_locations.values())
        local_paths.sort()
        return local_paths


class CreateNetworkTpl(task.Task):
    """Generates the network settings file to be placed in the images."""

    # Template filled in with (device index, ip address).
    SYSCONFIG_CONTENTS = """DEVICE=eth%s
BOOTPROTO=static
IPADDR=%s
ONBOOT=yes"""

    def __init__(self, name):
        super(CreateNetworkTpl, self).__init__(provides='network_settings',
                                               name=name)

    def execute(self, ips):
        """Return one rendered settings string per entry of ``ips``."""
        template = self.SYSCONFIG_CONTENTS
        return [template % (index, address)
                for index, address in enumerate(ips)]


class AllocateIP(task.Task):
    """Allocates the ips for the given vm."""

    def __init__(self, name):
        super(AllocateIP, self).__init__(provides='ips', name=name)

    def execute(self, vm_spec):
        """Return a random 192.168.0.x address per ip asked for in the spec.

        A spec without an ``'ips'`` entry yields an empty list.
        """
        wanted = vm_spec.get('ips', 0)
        return ["192.168.0.%s" % (random.randint(1, 254))
                for _unused in range(wanted)]


class WriteNetworkSettings(task.Task):
    """Writes all the network settings into the downloaded images."""

    def execute(self, download_paths, network_settings):
        """Print the mount and write steps for every image and setting."""
        for mount_index, image_path in enumerate(download_paths):
            with slow_down(1):
                print("Mounting %s to /tmp/%s" % (image_path, mount_index))
            for device_index, contents in enumerate(network_settings):
                target = ("/tmp/etc/sysconfig/network-scripts/ifcfg-eth%s"
                          % (device_index))
                with slow_down(1):
                    print("Writing to %s" % (target))
                    print(contents)


class BootVM(task.Task):
    """Fires off the vm boot operation."""

    def execute(self, vm_spec):
        """Announce the boot and print the spec the vm was created from."""
        print("Starting vm!")
        with slow_down(1):
            created = "Created: %s" % (vm_spec)
            print(created)


class AllocateVolumes(task.Task):
    """Allocates the volumes for the vm."""

    def execute(self, vm_spec):
        """Return device names /dev/vda1.. for each volume in the spec."""
        allocated = []
        for number in range(vm_spec['volumes']):
            with slow_down(1):
                allocated.append("/dev/vda%s" % (number + 1))
                print("Allocated volume %s" % allocated[-1])
        return allocated


class FormatVolumes(task.Task):
    """Formats the volumes for the vm."""

    def execute(self, volumes):
        """Print a before/after line (with a pause between) per volume."""
        for volume in volumes:
            print("Formatting volume %s" % volume)
            # Nothing is actually formatted; the empty body just triggers
            # the pause that slow_down() applies on exit.
            with slow_down(1):
                pass
            print("Formatted volume %s" % volume)


def create_flow():
    """Assemble and return the nested (mini-nova) vm creation flow."""
    # This does all the image stuff.
    img_maker = gf.Flow("img-maker").add(
        LocateImages("locate_images"),
        DownloadImages("download_images"),
    )
    # This does all the network stuff.
    net_maker = gf.Flow("net-maker").add(
        AllocateIP("get_my_ips"),
        CreateNetworkTpl("fetch_net_settings"),
        WriteNetworkSettings("write_net_settings"),
    )
    # This does all the volume stuff.
    volume_maker = gf.Flow("volume-maker").add(
        AllocateVolumes("allocate_my_volumes", provides='volumes'),
        FormatVolumes("volume_formatter"),
    )
    vm_maker = lf.Flow('vm-maker').add(
        # First create a specification for the final vm to-be.
        DefineVMSpec("define_spec"),
        img_maker,
        net_maker,
        volume_maker,
        # Finally boot it all.
        BootVM("boot-it"),
    )
    # Setup the set of things to do, bracketed by progress messages.
    return lf.Flow("root").add(
        PrintText("Starting vm creation.", no_slow=True),
        vm_maker,
        # Ya it worked!
        PrintText("Finished vm create.", no_slow=True),
        PrintText("Instance is running!", no_slow=True))

eu.print_wrapped("Initializing")

# Setup the persistence & resumption layer.
with eu.get_backend() as backend:

    # Try to find a previously passed in tracking id (expected to look like
    # '<book uuid>+<flow uuid>'); anything missing or malformed is treated
    # as if no tracking id was given.
    try:
        book_id, flow_id = sys.argv[2].split("+", 1)
        if not uuidutils.is_uuid_like(book_id):
            book_id = None
        if not uuidutils.is_uuid_like(flow_id):
            flow_id = None
    except (IndexError, ValueError):
        book_id = None
        flow_id = None

    # Set up how we want our engine to run, serial, parallel...
    try:
        executor = futurist.GreenThreadPoolExecutor(max_workers=5)
    except RuntimeError:
        # No eventlet installed, just let the default be used instead.
        executor = None

    # Create/fetch a logbook that will track the workflows work.
    book = None
    flow_detail = None
    if all([book_id, flow_id]):
        # Try to find in a prior logbook and flow detail...
        with contextlib.closing(backend.get_connection()) as conn:
            try:
                book = conn.get_logbook(book_id)
                flow_detail = book.find(flow_id)
            except exc.NotFound:
                pass
    if flow_detail is None:
        # Nothing to resume: either no (valid) tracking id was given, the
        # logbook does not exist, or the logbook exists but does not contain
        # the requested flow detail (find() came back empty). In every one
        # of those cases start a brand new run instead of handing a missing
        # flow detail to load_from_detail().
        book = models.LogBook("vm-boot")
        with contextlib.closing(backend.get_connection()) as conn:
            conn.save_logbook(book)
        engine = engines.load_from_factory(create_flow,
                                           backend=backend, book=book,
                                           engine='parallel',
                                           executor=executor)
        print("!! Your tracking id is: '%s+%s'" % (book.uuid,
                                                   engine.storage.flow_uuid))
        print("!! Please submit this on later runs for tracking purposes")
    else:
        # Attempt to load from a previously partially completed flow.
        engine = engines.load_from_detail(flow_detail, backend=backend,
                                          engine='parallel', executor=executor)

    # Make me my vm please!
    eu.print_wrapped('Running')
    engine.run()

# How to use.
#
# 1. $ python me.py "sqlite:////tmp/nova.db"
# 2. ctrl-c before this finishes
# 3. Find the tracking id (search for 'Your tracking id is')
# 4. $ python me.py "sqlite:////tmp/nova.db" "$tracking_id"
# 5. Watch it pick up where it left off.
# 6. Profit!

Creating a volume (resumable)

Note

Full source located at resume_volume_create

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
import contextlib
import hashlib
import logging
import os
import random
import sys
import time

logging.basicConfig(level=logging.ERROR)

self_dir = os.path.abspath(os.path.dirname(__file__))
top_dir = os.path.abspath(os.path.join(os.path.dirname(__file__),
                                       os.pardir,
                                       os.pardir))
sys.path.insert(0, top_dir)
sys.path.insert(0, self_dir)

from oslo_utils import uuidutils

from taskflow import engines
from taskflow.patterns import graph_flow as gf
from taskflow.patterns import linear_flow as lf
from taskflow.persistence import models
from taskflow import task

import example_utils  # noqa

# INTRO: These examples show how a hierarchy of flows can be used to create a
# pseudo-volume in a reliable & resumable manner using taskflow + a miniature
# version of what cinder does while creating a volume (very miniature).


@contextlib.contextmanager
def slow_down(how_long=0.5):
    """Context manager that pauses after its body has run.

    Yields ``how_long``; on exit (even when the body raised) it prints a
    prompt and then sleeps for ``how_long`` seconds.
    """
    pause = how_long
    try:
        yield pause
    finally:
        print("** Ctrl-c me please!!! **")
        time.sleep(pause)


def find_flow_detail(backend, book_id, flow_id):
    """Fetch the logbook ``book_id`` and look up ``flow_id`` inside it.

    The connection obtained from ``backend`` is closed before returning.
    """
    # NOTE(harlowja): a logbook tracks flows and a flow detail tracks an
    # individual flow, so both ids are needed to know exactly which flow
    # should be resumed; without them there is nothing we could resume.
    connection = backend.get_connection()
    with contextlib.closing(connection):
        return connection.get_logbook(book_id).find(flow_id)


class PrintText(task.Task):
    """Prints a piece of text framed by dashed lines of the same length."""

    def __init__(self, print_what, no_slow=False):
        # The task name carries the first 8 hex characters of the md5 of
        # the text to print.
        digest = hashlib.md5(print_what.encode('utf-8')).hexdigest()
        super(PrintText, self).__init__(name="Print: %s" % (digest[0:8]))
        self._no_slow = no_slow
        self._text = print_what

    def execute(self):
        """Print the framed text, inside ``slow_down()`` unless ``no_slow``."""
        if self._no_slow:
            self._emit()
        else:
            with slow_down():
                self._emit()

    def _emit(self):
        # Dashed rule, the text itself, then the dashed rule again.
        for _ in range(1):
            print("-" * (len(self._text)))
            print(self._text)
            print("-" * (len(self._text)))


class CreateSpecForVolumes(task.Task):
    """Produces a randomly sized list of volume specifications."""

    def execute(self):
        """Return between 1 and 10 disk specs located at /dev/vda1.. ."""
        how_many = random.randint(1, 10)
        return [{'type': 'disk', 'location': "/dev/vda%s" % (n + 1)}
                for n in range(how_many)]


class PrepareVolumes(task.Task):
    """Walks over the volume specs printing (slow) preparation messages."""

    def execute(self, volume_specs):
        """Print the prepare, break and certified messages for each spec."""
        for spec in volume_specs:
            with slow_down():
                print("Dusting off your hard drive %s" % (spec))
            with slow_down():
                print("Taking a well deserved break.")
            print("Your drive %s has been certified." % (spec))


# Setup the set of things to do (mini-cinder).
# The root flow is linear: a start message, the 'maker' graph flow (build the
# volume specs, print a message, prepare the volumes) and a finish message.
flow = lf.Flow("root").add(
    PrintText("Starting volume create", no_slow=True),
    gf.Flow('maker').add(
        CreateSpecForVolumes("volume_specs", provides='volume_specs'),
        PrintText("I need a nap, it took me a while to build those specs."),
        PrepareVolumes(),
    ),
    PrintText("Finished volume create", no_slow=True))

# Setup the persistence & resumption layer.
with example_utils.get_backend() as backend:
    # The tracking id is expected as the second command line argument in the
    # form '<book id>+<flow id>'; a missing or unsplittable argument is
    # treated as "no tracking id".
    try:
        book_id, flow_id = sys.argv[2].split("+", 1)
    except (IndexError, ValueError):
        book_id = None
        flow_id = None

    if not all([book_id, flow_id]):
        # If no 'tracking id' (think a fedex or ups tracking id) is provided
        # then we create one by creating a logbook (where flow details are
        # stored) and creating a flow detail (where flow and task state is
        # stored). The combination of these 2 objects' unique ids (uuids)
        # allows the users of taskflow to reassociate the workflows that were
        # potentially running (and which may have partially completed) back
        # with taskflow so that those workflows can be resumed (or reverted)
        # after a process/thread/engine has failed in some way.
        book = models.LogBook('resume-volume-create')
        flow_detail = models.FlowDetail("root", uuid=uuidutils.generate_uuid())
        book.add(flow_detail)
        with contextlib.closing(backend.get_connection()) as conn:
            conn.save_logbook(book)
        print("!! Your tracking id is: '%s+%s'" % (book.uuid,
                                                   flow_detail.uuid))
        print("!! Please submit this on later runs for tracking purposes")
    else:
        # A tracking id was given: look the flow detail up in the backend.
        flow_detail = find_flow_detail(backend, book_id, flow_id)

    # Load and run.
    engine = engines.load(flow,
                          flow_detail=flow_detail,
                          backend=backend, engine='serial')
    engine.run()

# How to use.
#
# 1. $ python me.py "sqlite:////tmp/cinder.db"
# 2. ctrl-c before this finishes
# 3. Find the tracking id (search for 'Your tracking id is')
# 4. $ python me.py "sqlite:////tmp/cinder.db" "$tracking_id"
# 5. Profit!

Running engines via iteration

Note

Full source located at run_by_iter

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
import logging
import os
import sys

import six

logging.basicConfig(level=logging.ERROR)

self_dir = os.path.abspath(os.path.dirname(__file__))
top_dir = os.path.abspath(os.path.join(os.path.dirname(__file__),
                                       os.pardir,
                                       os.pardir))
sys.path.insert(0, top_dir)
sys.path.insert(0, self_dir)


from taskflow import engines
from taskflow.patterns import linear_flow as lf
from taskflow import task


# INTRO: This example shows how to run a set of flows at the same time, each
# running in its own engine, using a single thread of control to iterate over
# each engine (which causes that engine to advance to its next state during
# each iteration).


class EchoTask(task.Task):
    """Prints the character it is given and returns the following one."""

    def execute(self, value):
        print(value)
        successor = chr(ord(value) + 1)
        return successor


def make_alphabet_flow(i):
    """Build a linear flow named ``alphabet_<i>`` with one task per letter.

    There is one ``EchoTask`` for each letter 'A' through 'Z'. Each task has
    its ``value`` argument rebound to its own letter, and every task except
    the one for 'Z' is declared as providing the next letter.
    """
    flow = lf.Flow("alphabet_%s" % (i))
    first, last = ord('A'), ord('Z')
    for code in range(first, last + 1):
        letter = chr(code)
        options = {
            'name': "echoer_%s" % letter,
            'rebind': {'value': letter},
        }
        if code != last:
            options['provides'] = chr(code + 1)
        flow.add(EchoTask(**options))
    return flow


# Adjust this number to change how many engines/flows run at once.
flow_count = 1

# Build each flow exactly once (building it twice and throwing the first copy
# away just wastes work).
flows = []
for i in range(0, flow_count):
    flows.append(make_alphabet_flow(i + 1))

# Load an engine per flow, seed its storage with the first letter and keep
# the iterator that drives that engine.
engine_iters = []
for f in flows:
    e = engines.load(f)
    e.compile()
    e.storage.inject({'A': 'A'})
    e.prepare()
    engine_iters.append(e.run_iter())

# Advance every engine in turn, printing whatever its iterator yields, until
# all of the iterators are exhausted.
while engine_iters:
    # Iterate over a copy since finished iterators are removed from the list
    # while we are looping.
    for it in list(engine_iters):
        try:
            print(six.next(it))
        except StopIteration:
            engine_iters.remove(it)

Controlling retries using a retry controller

Note

Full source located at retry_flow

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
import logging
import os
import sys

logging.basicConfig(level=logging.ERROR)

top_dir = os.path.abspath(os.path.join(os.path.dirname(__file__),
                                       os.pardir,
                                       os.pardir))
sys.path.insert(0, top_dir)

import taskflow.engines
from taskflow.patterns import linear_flow as lf
from taskflow import retry
from taskflow import task

# INTRO: In this example we create a retry controller that receives a phone
# directory and tries different phone numbers. The next task tries to call Jim
# using the given number. If it is not Jim's number, the task raises an
# exception and the retry controller takes the next number from the phone
# directory and retries the call.
#
# This example shows a basic usage of retry controllers in a flow.
# Retry controllers allow a failed subflow to be reverted and retried with
# new parameters.


class CallJim(task.Task):
    """Tries to reach Jim on a number; only 555 is accepted as his."""

    def execute(self, jim_number):
        """Call ``jim_number``, raising ``Exception`` if it is not 555."""
        print("Calling jim %s." % jim_number)
        if jim_number != 555:
            raise Exception("Wrong number!")
        print("Hello Jim!")

    def revert(self, jim_number, **kwargs):
        """Print an apology for having called the wrong number."""
        print("Wrong number, apologizing.")


# Create your flow and associated tasks (the work to be done); the retry
# controller is bound to 'phone_directory' and provides 'jim_number'.
retry_controller = retry.ParameterizedForEach(rebind=['phone_directory'],
                                              provides='jim_number')
flow = lf.Flow('retrying-linear', retry=retry_controller).add(CallJim())

# Now run that flow using the provided initial data (store below).
phone_directory = [333, 444, 555, 666]
taskflow.engines.run(flow, store={'phone_directory': phone_directory})

Distributed execution (simple)

Note

Full source located at wbe_simple_linear

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
import json
import logging
import os
import sys
import tempfile

top_dir = os.path.abspath(os.path.join(os.path.dirname(__file__),
                                       os.pardir,
                                       os.pardir))
sys.path.insert(0, top_dir)

from taskflow import engines
from taskflow.engines.worker_based import worker
from taskflow.patterns import linear_flow as lf
from taskflow.tests import utils
from taskflow.utils import threading_utils

import example_utils  # noqa

# INTRO: This example walks through a miniature workflow which shows how to
# start up a number of workers (these workers will process task execution and
# reversion requests using any provided input data) and then use an engine
# that creates a set of *capable* tasks and flows (the engine can not create
# tasks that the workers are not able to run, this will end in failure) that
# those workers will run and then executes that workflow seamlessly using the
# workers to perform the actual execution.
#
# NOTE(harlowja): this example simulates the expected larger number of workers
# by using a set of threads (which in this example simulate the remote workers
# that would typically be running on other external machines).

# A filesystem can also be used as the queue transport (useful as simple
# transport type that does not involve setting up a larger mq system). If this
# is false then the memory transport is used instead, both work in standalone
# setups.
USE_FILESYSTEM = False
BASE_SHARED_CONF = dict(exchange='taskflow')

# Until https://github.com/celery/kombu/issues/398 is resolved it is not
# recommended to run many worker threads in this example due to the types
# of errors mentioned in that issue.
MEMORY_WORKERS = 2
FILE_WORKERS = 1

# These are the tasks the worker can execute, they *must* be importable;
# typically this list is used to restrict what workers may execute to a
# smaller set of *allowed* tasks that are known to be safe (one would not
# want to allow all python code to be executed).
_ALLOWED_TASKS = [
    'taskflow.tests.utils:TaskOneArgOneReturn',
    'taskflow.tests.utils:TaskMultiArgOneReturn',
]
WORKER_CONF = {'tasks': _ALLOWED_TASKS}


def run(engine_options):
    """Runs a two task linear flow using the worker-based engine.

    :param engine_options: keyword options forwarded to ``engines.load``
    :returns: whatever ``fetch_all()`` on the engine storage gives back once
              the run has completed
    """
    tasks = (
        utils.TaskOneArgOneReturn(provides='result1'),
        utils.TaskMultiArgOneReturn(provides='result2'),
    )
    flow = lf.Flow('simple-linear').add(*tasks)
    # Initial symbols made available to the tasks.
    initial_values = {'x': 111, 'y': 222, 'z': 333}
    engine = engines.load(flow, store=initial_values,
                          engine='worker-based', **engine_options)
    engine.run()
    return engine.storage.fetch_all()


if __name__ == "__main__":
    logging.basicConfig(level=logging.ERROR)

    # Select the transport (and how many workers to run with it); the
    # resulting settings are merged into the worker and engine configuration
    # so that both of those use it correctly.
    tmp_path = None
    if USE_FILESYSTEM:
        worker_count = FILE_WORKERS
        tmp_path = tempfile.mkdtemp(prefix='wbe-example-')
        transport = 'filesystem'
        transport_options = {
            'data_folder_in': tmp_path,
            'data_folder_out': tmp_path,
            'polling_interval': 0.1,
        }
    else:
        worker_count = MEMORY_WORKERS
        transport = 'memory'
        transport_options = {
            'polling_interval': 0.1,
        }
    shared_conf = dict(BASE_SHARED_CONF,
                       transport=transport,
                       transport_options=transport_options)
    worker_conf = dict(WORKER_CONF, **shared_conf)
    engine_options = dict(shared_conf)
    workers = []
    worker_topics = []

    try:
        # Start the threads that simulate actual remote workers (each one
        # gets its own topic).
        print('Running %s workers.' % (worker_count))
        for number in range(1, worker_count + 1):
            topic = 'worker-%s' % number
            worker_topics.append(topic)
            w = worker.Worker(topic=topic, **worker_conf)
            runner = threading_utils.daemon_thread(w.run)
            runner.start()
            w.wait()
            workers.append((runner, w.stop))

        # Now use those workers to do something.
        print('Executing some work.')
        engine_options['topics'] = worker_topics
        result = run(engine_options)
        print('Execution finished.')
        # Keys are sorted so that the test examples can work correctly
        # even when the keys change order (which will happen in various
        # python versions).
        print("Result = %s" % json.dumps(result, sort_keys=True))
    finally:
        # And cleanup (most recently started worker first).
        print('Stopping workers.')
        for runner, stopper in reversed(workers):
            stopper()
            runner.join()
        del workers[:]
        if tmp_path:
            example_utils.rm_path(tmp_path)

Distributed notification (simple)

Note

Full source located at wbe_event_sender

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
import logging
import os
import string
import sys
import time

top_dir = os.path.abspath(os.path.join(os.path.dirname(__file__),
                                       os.pardir,
                                       os.pardir))
sys.path.insert(0, top_dir)

from six.moves import range as compat_range

from taskflow import engines
from taskflow.engines.worker_based import worker
from taskflow.patterns import linear_flow as lf
from taskflow import task
from taskflow.types import notifier
from taskflow.utils import threading_utils

# Shorthand for the notifier's ANY constant; run() uses it as the event type
# when registering event_receiver on the task's notifier.
ANY = notifier.Notifier.ANY

# INTRO: These examples show how to use a remote worker's event notification
# attribute to proxy back task event notifications to the controlling process.
#
# In this case a simple set of events is triggered by a worker running a
# task (simulated to be remote by using a kombu memory transport and threads).
# Those events that the 'remote worker' produces will then be proxied back to
# the task that the engine is running 'remotely', and then they will be emitted
# back to the original callbacks that exist in the originating engine
# process/thread. This creates a one-way *notification* channel that can
# transparently be used in-process, outside-of-process using remote workers and
# so-on that allows tasks to signal to its controlling process some sort of
# action that has occurred that the task may need to tell others about (for
# example to trigger some type of response when the task reaches 50% done...).


def event_receiver(event_type, details):
    """Callback that prints the event type and details it was given."""
    lines = (
        ("Recieved event '%s'", event_type),
        ("Details = %s", details),
    )
    # Each line is formatted right before it is printed.
    for template, value in lines:
        print(template % value)


class EventReporter(task.Task):
    """Task that gets run 'remotely' and emits one event per letter."""

    # The event types that are emitted (the uppercase ascii letters).
    EVENTS = tuple(string.ascii_uppercase)
    # How long to sleep (in seconds) after each notification.
    EVENT_DELAY = 0.1

    def execute(self):
        """Notifies every event in order, sleeping after each one.

        The details sent with each event hold the events that are left
        (including the one currently being sent) under the 'leftover' key.
        """
        for position, event in enumerate(self.EVENTS):
            self.notifier.notify(event, {'leftover': self.EVENTS[position:]})
            time.sleep(self.EVENT_DELAY)


# Transport settings that get merged into both the worker and the engine
# configuration (see the __main__ block); the memory transport is used here.
BASE_SHARED_CONF = {
    'exchange': 'taskflow',
    'transport': 'memory',
    'transport_options': {
        'polling_interval': 0.1,
    },
}

# Until https://github.com/celery/kombu/issues/398 is resolved it is not
# recommended to run many worker threads in this example due to the types
# of errors mentioned in that issue.
#
# How many worker threads the __main__ block starts.
MEMORY_WORKERS = 1
WORKER_CONF = {
    'tasks': [
        # Used to locate which tasks we can run (we don't want to allow
        # arbitrary code/tasks to be ran by any worker since that would
        # open up a variety of vulnerabilities).
        '%s:EventReporter' % (__name__),
    ],
}


def run(engine_options):
    """Runs a single EventReporter task using the worker-based engine.

    The ``event_receiver`` callback is registered (for ANY event) on the
    task's notifier before the engine is loaded and ran.

    :param engine_options: keyword options forwarded to ``engines.load``
    """
    reporter = EventReporter()
    reporter.notifier.register(ANY, event_receiver)
    engine = engines.load(lf.Flow('event-reporter').add(reporter),
                          engine='worker-based', **engine_options)
    engine.run()


if __name__ == "__main__":
    logging.basicConfig(level=logging.ERROR)

    # Setup our transport configuration and merge it into the worker and
    # engine configuration so that both of those objects use it correctly.
    worker_conf = dict(WORKER_CONF)
    worker_conf.update(BASE_SHARED_CONF)
    engine_options = dict(BASE_SHARED_CONF)
    # Holds (thread, stop callback) pairs for every started worker.
    workers = []

    # These topics will be used to request worker information on; those
    # workers will respond with their capabilities which the executing engine
    # will use to match pending tasks to a matched worker, this will cause
    # the task to be sent for execution, and the engine will wait until it
    # is finished (a response is received) and then the engine will either
    # continue with other tasks, do some retry/failure resolution logic or
    # stop (and potentially re-raise the remote workers failure)...
    worker_topics = []

    try:
        # Create a set of worker threads to simulate actual remote workers...
        print('Running %s workers.' % (MEMORY_WORKERS))
        for i in compat_range(0, MEMORY_WORKERS):
            # Give each one its own unique topic name so that they can
            # correctly communicate with the engine (they will all share the
            # same exchange).
            worker_conf['topic'] = 'worker-%s' % (i + 1)
            worker_topics.append(worker_conf['topic'])
            w = worker.Worker(**worker_conf)
            runner = threading_utils.daemon_thread(w.run)
            runner.start()
            # Block until the worker reports that it is up before starting
            # the next one (or using it).
            w.wait()
            workers.append((runner, w.stop))

        # Now use those workers to do something; run() has no return value
        # (the events are reported via the event_receiver callback) so there
        # is nothing to capture here.
        print('Executing some work.')
        engine_options['topics'] = worker_topics
        run(engine_options)
        print('Execution finished.')
    finally:
        # And cleanup (this happens even when the run above failed).
        print('Stopping workers.')
        while workers:
            r, stopper = workers.pop()
            stopper()
            r.join()

Distributed mandelbrot (complex)

Note

Full source located at wbe_mandelbrot

Output

Generated mandelbrot fractal

Code

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
import logging
import math
import os
import sys

top_dir = os.path.abspath(os.path.join(os.path.dirname(__file__),
                                       os.pardir,
                                       os.pardir))
sys.path.insert(0, top_dir)

from six.moves import range as compat_range

from taskflow import engines
from taskflow.engines.worker_based import worker
from taskflow.patterns import unordered_flow as uf
from taskflow import task
from taskflow.utils import threading_utils

# INTRO: This example walks through a workflow that will in parallel compute
# a mandelbrot result set (using X 'remote' workers) and then combine their
# results together to form a final mandelbrot fractal image. It shows a usage
# of taskflow to perform a well-known embarrassingly parallel problem that has
# the added benefit of also being an elegant visualization.
#
# NOTE(harlowja): this example simulates the expected larger number of workers
# by using a set of threads (which in this example simulate the remote workers
# that would typically be running on other external machines).
#
# NOTE(harlowja): to have it produce an image run (after installing pillow):
#
# $ python taskflow/examples/wbe_mandelbrot.py output.png

# Settings shared by the workers and the engine (create_fractal() adds the
# transport settings to a copy of this).
BASE_SHARED_CONF = {
    'exchange': 'taskflow',
}
# How many worker threads create_fractal() starts.
WORKERS = 2
WORKER_CONF = {
    # These are the tasks the worker can execute, they *must* be importable,
    # typically this list is used to restrict what workers may execute to
    # a smaller set of *allowed* tasks that are known to be safe (one would
    # not want to allow all python code to be executed).
    'tasks': [
        '%s:MandelCalculator' % (__name__),
    ],
}
# Base engine configuration (create_fractal() adds the shared settings and
# the worker topics to a copy of this).
ENGINE_CONF = {
    'engine': 'worker-based',
}

# Mandelbrot & image settings...
#
# NOTE: the calculation code unpacks IMAGE_SIZE as (height, width).
IMAGE_SIZE = (512, 512)
# How many row chunks the image is split into (one task per chunk).
CHUNK_COUNT = 8
# Iteration cap handed to the tasks (as part of 'mandelbrot_config').
MAX_ITERATIONS = 25


class MandelCalculator(task.Task):
    """Task that computes the mandelbrot iteration counts for some rows."""

    def execute(self, image_config, mandelbrot_config, chunk):
        """Returns the iteration counts of every pixel in a chunk of rows.

        For each row in ``[chunk[0], chunk[1])`` a list is produced that
        holds (for each column) the number of iterations that were performed
        before the computation "escaped" (or the maximum iteration count if
        it never did, in which case the point is a candidate for membership
        in the mandelbrot set).

        :param image_config: dict whose 'size' is unpacked as (height, width)
        :param mandelbrot_config: (min_x, max_x, min_y, max_y, max_iters)
        :param chunk: sequence whose first two items are the start row and
                      the (exclusive) end row
        :returns: list of rows, each row being a list of iteration counts
        """
        min_x, max_x, min_y, max_y, max_iters = mandelbrot_config
        height, width = image_config['size']
        pixel_size_x = (max_x - min_x) / width
        pixel_size_y = (max_y - min_y) / height

        # Parts borrowed from (credit to mark harris and benoit mandelbrot).
        #
        # http://nbviewer.ipython.org/gist/harrism/f5707335f40af9463c43
        def escape_count(real, imag):
            c = complex(real, imag)
            z = 0.0j
            for iteration in compat_range(max_iters):
                z = z * z + c
                # Squared magnitude check.
                if (z.real * z.real + z.imag * z.imag) >= 4:
                    return iteration
            return max_iters

        first_row, end_row = chunk[0], chunk[1]
        block = []
        for y in compat_range(first_row, end_row):
            imag = min_y + y * pixel_size_y
            block.append([escape_count(min_x + x * pixel_size_x, imag)
                          for x in compat_range(0, width)])
        return block


def calculate(engine_conf):
    """Calculates the mandelbrot points by scattering row chunks to workers.

    :param engine_conf: engine configuration that is given to
                        ``engines.load``
    :returns: list of ``((x, y), color)`` tuples (ordered by row, then by
              column) where color is the iteration count of that pixel
    """
    # Subdivide the work into X pieces, then request each worker to calculate
    # one of those chunks and then later we will write these chunks out to
    # an image bitmap file.

    # An unordered flow is used here since the mandelbrot calculation is an
    # example of an embarrassingly parallel computation that we can scatter
    # across as many workers as possible.
    flow = uf.Flow("mandelbrot")

    # These symbols will be automatically given to tasks as input to their
    # execute method, in this case these are constants used in the mandelbrot
    # calculation.
    store = {
        'mandelbrot_config': [-2.0, 1.0, -1.0, 1.0, MAX_ITERATIONS],
        'image_config': {
            'size': IMAGE_SIZE,
        }
    }

    # We need the task names to be in the right order so that we can extract
    # the final results in the right order (we don't care about the order when
    # executing).
    task_names = []

    # Compose our workflow.
    height, _width = IMAGE_SIZE
    chunk_size = int(math.ceil(height / float(CHUNK_COUNT)))
    for i in compat_range(0, CHUNK_COUNT):
        chunk_name = 'chunk_%s' % i
        task_name = "calculation_%s" % i
        # Break the calculation up into chunk size pieces; both bounds are
        # clamped to the image height so that (when the height is not an
        # exact multiple of the chunk count) the trailing chunks become
        # shorter, or empty, instead of running past the last row.
        first_row = min(i * chunk_size, height)
        rows = [first_row, min(first_row + chunk_size, height)]
        flow.add(
            MandelCalculator(task_name,
                             # This ensures the storage symbol with name
                             # 'chunk_name' is sent into the tasks local
                             # symbol 'chunk'. This is how we give each
                             # calculator its own correct sequence of rows
                             # to work on.
                             rebind={'chunk': chunk_name}))
        store[chunk_name] = rows
        task_names.append(task_name)

    # Now execute it.
    eng = engines.load(flow, store=store, engine_conf=engine_conf)
    eng.run()

    # Gather all the results and order them for further processing.
    gather = []
    for name in task_names:
        gather.extend(eng.storage.get(name))
    # Flatten the rows into ((x, y), color) points.
    points = []
    for y, row in enumerate(gather):
        for x, color in enumerate(row):
            points.append(((x, y), color))
    return points


def write_image(results, output_filename=None):
    """Prints a summary of the results and optionally writes them as an image.

    :param results: sequence of ``((x, y), color)`` tuples
    :param output_filename: where to save the image; when not provided (or
                            empty) only the summary is printed
    :raises RuntimeError: if an image was requested but Pillow could not
                          be imported
    """
    print("Gathered %s results that represents a mandelbrot"
          " image (using %s chunks that are computed jointly"
          " by %s workers)." % (len(results), CHUNK_COUNT, WORKERS))
    if not output_filename:
        return

    # Pillow (the PIL fork) saves us from writing our own image writer...
    try:
        from PIL import Image
    except ImportError as e:
        # To currently get this (may change in the future),
        # $ pip install Pillow
        raise RuntimeError("Pillow is required to write image files: %s" % e)

    # Limit to 255, find the max and normalize to that...
    color_max = 0
    for _point, color in results:
        color_max = max(color, color_max)

    # The calculation code unpacks IMAGE_SIZE as (height, width) while Pillow
    # expects sizes as (width, height); swap them so that images that are not
    # square get the right dimensions.
    height, width = IMAGE_SIZE

    # Use gray scale since we don't really have other colors.
    img = Image.new('L', (width, height), "black")
    pixels = img.load()
    for (x, y), color in results:
        if color_max == 0:
            # Avoids dividing by zero when every color is zero.
            color = 0
        else:
            color = int((float(color) / color_max) * 255.0)
        pixels[x, y] = color
    img.save(output_filename)


def create_fractal():
    """Starts the workers, calculates the fractal and then writes it out.

    The output filename is taken from the first command line argument (when
    there is one).
    """
    logging.basicConfig(level=logging.ERROR)

    # The transport configuration gets merged into the worker and engine
    # configuration so that both of those use it correctly.
    shared_conf = dict(BASE_SHARED_CONF,
                       transport='memory',
                       transport_options={
                           'polling_interval': 0.1,
                       })
    output_filename = sys.argv[1] if len(sys.argv) >= 2 else None
    worker_conf = dict(WORKER_CONF, **shared_conf)
    engine_conf = dict(ENGINE_CONF, **shared_conf)
    started = []
    topics = []

    print('Calculating your mandelbrot fractal of size %sx%s.' % IMAGE_SIZE)
    try:
        # Start the threads that simulate actual remote workers (each one
        # gets its own topic).
        print('Running %s workers.' % (WORKERS))
        for number in compat_range(1, WORKERS + 1):
            topic = 'calculator_%s' % number
            topics.append(topic)
            w = worker.Worker(topic=topic, **worker_conf)
            runner = threading_utils.daemon_thread(w.run)
            runner.start()
            w.wait()
            started.append((runner, w.stop))

        # Now use those workers to do something.
        engine_conf['topics'] = topics
        results = calculate(engine_conf)
        print('Execution finished.')
    finally:
        # And cleanup (most recently started worker first).
        print('Stopping workers.')
        for runner, stopper in reversed(started):
            stopper()
            runner.join()
    print("Writing image...")
    write_image(results, output_filename=output_filename)


# Only create the fractal when this file is executed as a script.
if __name__ == "__main__":
    create_fractal()

Jobboard producer/consumer (simple)

Note

Full source located at jobboard_produce_consume_colors

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
import collections
import contextlib
import logging
import os
import random
import sys
import threading
import time

logging.basicConfig(level=logging.ERROR)

top_dir = os.path.abspath(os.path.join(os.path.dirname(__file__),
                                       os.pardir,
                                       os.pardir))
sys.path.insert(0, top_dir)

import six
from six.moves import range as compat_range
from zake import fake_client

from taskflow import exceptions as excp
from taskflow.jobs import backends
from taskflow.utils import threading_utils

# In this example we show how a jobboard can be used to post work for other
# entities to work on. This example creates a set of jobs using one producer
# thread (typically this would be split across many machines) and then has
# other worker threads (each with their own jobboard) select work using a
# given filter [red/blue] and then perform that work (consuming or abandoning
# the job after it has been completed or has failed).

# Things to note:
# - No persistence layer is used (or logbook), just the job details are used
#   to determine if a job should be selected by a worker or not.
# - This example runs in a single process (this is expected to be atypical
#   but this example shows that it can be done if needed, for testing...)
# - The iterjobs(), claim(), consume()/abandon() worker workflow.
# - The post() producer workflow.

# Jobboard configuration; every producer and worker (and the final verifier)
# is given a copy of this.
SHARED_CONF = {
    'path': "/taskflow/jobs",
    'board': 'zookeeper',
}

# How many workers and producers of work will be created (as threads).
PRODUCERS = 3
WORKERS = 5

# How many units of work each producer will create.
PRODUCER_UNITS = 10

# How many units of work are expected to be produced (used so workers can
# know when to stop running and shutdown, typically there would not be such
# a value but we have to limit this example's execution time to be less than
# infinity).
EXPECTED_UNITS = PRODUCER_UNITS * PRODUCERS

# Delay between producing/consuming more work.
WORKER_DELAY, PRODUCER_DELAY = (0.5, 0.5)

# To ensure threads don't trample other threads output.
STDOUT_LOCK = threading.Lock()


def dispatch_work(job):
    """Stand-in for performing the work of a job (it only sleeps)."""
    # This is where the jobs contained work *would* be done
    time.sleep(1.0)


def safe_print(name, message, prefix=""):
    """Prints ``name: message`` while holding the stdout lock.

    When a (non-empty) prefix is provided it is put in front of the name,
    separated from it by a space.
    """
    line = "%s: %s" % (name, message)
    if prefix:
        line = "%s %s" % (prefix, line)
    with STDOUT_LOCK:
        print(line)


def worker(ident, client, consumed):
    """Claims and consumes jobs until the expected amount was consumed.

    :param ident: identifier used to build the name of this worker
    :param client: client that is given to the jobboard backend
    :param consumed: shared collection that every consumed job is appended
                     to (its length decides when to stop)
    """
    name = "W-%s" % (ident)
    safe_print(name, "started")
    claimed_jobs = consumed_jobs = abandoned_jobs = 0
    # A personal board is created (using the same client so that it works in
    # the same process) and then scanned for jobs that we want to perform.
    with backends.backend(name, SHARED_CONF.copy(), client=client) as board:
        while len(consumed) != EXPECTED_UNITS:
            favorite_color = random.choice(['blue', 'red'])
            for job in board.iterjobs(ensure_fresh=True, only_unclaimed=True):
                if job.details.get('color') != favorite_color:
                    # Not the color we want during this scan, skip it.
                    continue
                safe_print(name, "'%s' [attempting claim]" % (job))
                try:
                    board.claim(job, name)
                    claimed_jobs += 1
                    safe_print(name, "'%s' [claimed]" % (job))
                except (excp.NotFound, excp.UnclaimableJob):
                    safe_print(name, "'%s' [claim unsuccessful]" % (job))
                    continue
                # The claim worked, so do the work and then either consume
                # the job or (if anything went wrong) abandon it.
                try:
                    dispatch_work(job)
                    board.consume(job, name)
                    safe_print(name, "'%s' [consumed]" % (job))
                    consumed_jobs += 1
                    consumed.append(job)
                except Exception:
                    board.abandon(job, name)
                    abandoned_jobs += 1
                    safe_print(name, "'%s' [abandoned]" % (job))
            time.sleep(WORKER_DELAY)
    summary = ("finished (claimed %s jobs, consumed %s jobs,"
               " abandoned %s jobs)" % (claimed_jobs, consumed_jobs,
                                        abandoned_jobs))
    safe_print(name, summary, prefix=">>>")


def producer(ident, client):
    """Posts PRODUCER_UNITS randomly colored jobs, pausing after each one.

    :param ident: identifier used to build the name of this producer
    :param client: client that is given to the jobboard backend
    """
    name = "P-%s" % (ident)
    safe_print(name, "started")
    # A personal board is created (using the same client so that it works in
    # the same process) and the jobs we want some entity to perform are
    # posted on it.
    with backends.backend(name, SHARED_CONF.copy(), client=client) as board:
        for unit in compat_range(0, PRODUCER_UNITS):
            job_name = "%s-%s" % (name, unit)
            color = random.choice(['red', 'blue'])
            job = board.post(job_name, book=None, details={'color': color})
            safe_print(name, "'%s' [posted]" % (job))
            time.sleep(PRODUCER_DELAY)
    safe_print(name, "finished", prefix=">>>")


def main():
    """Runs the producer and worker threads and then verifies the outcome.

    :returns: 0 when no jobs are left on the board and the expected number
              of jobs was consumed, otherwise 1
    """
    if six.PY3:
        # TODO(harlowja): Hack to make eventlet work right, remove when the
        # following is fixed: https://github.com/eventlet/eventlet/issues/230
        from taskflow.utils import eventlet_utils as _eu  # noqa
        try:
            import eventlet as _eventlet  # noqa
        except ImportError:
            pass
    with contextlib.closing(fake_client.FakeClient()) as c:
        started = []

        def launch(target, *args):
            # Creates, records and starts a daemon thread.
            t = threading_utils.daemon_thread(target, *args)
            started.append(t)
            t.start()

        for number in compat_range(1, PRODUCERS + 1):
            launch(producer, number, c)
        consumed = collections.deque()
        for number in compat_range(1, WORKERS + 1):
            launch(worker, number, c, consumed)
        # Wait for all of them (most recently started first).
        for t in reversed(started):
            t.join()
        # At the end there should be nothing leftover, let's verify that.
        board = backends.fetch('verifier', SHARED_CONF.copy(), client=c)
        board.connect()
        with contextlib.closing(board):
            all_done = (board.job_count == 0
                        and len(consumed) == EXPECTED_UNITS)
            return 0 if all_done else 1


# When executed as a script the return value of main() is used as the exit
# status of the process.
if __name__ == "__main__":
    sys.exit(main())

Conductor simulating a CI pipeline

Note

Full source located at tox_conductor

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
import contextlib
import itertools
import logging
import os
import shutil
import socket
import sys
import tempfile
import threading
import time

logging.basicConfig(level=logging.ERROR)

top_dir = os.path.abspath(os.path.join(os.path.dirname(__file__),
                                       os.pardir,
                                       os.pardir))
sys.path.insert(0, top_dir)

from oslo_utils import timeutils
from oslo_utils import uuidutils
import six
from zake import fake_client

from taskflow.conductors import backends as conductors
from taskflow import engines
from taskflow.jobs import backends as boards
from taskflow.patterns import linear_flow
from taskflow.persistence import backends as persistence
from taskflow.persistence import models
from taskflow import task
from taskflow.utils import threading_utils

# INTRO: This example shows how a worker/producer can post desired work (jobs)
# to a jobboard and a conductor can consume that work (jobs) from that jobboard
# and execute those jobs in a reliable & async manner (for example, if the
# conductor were to crash then the job will be released back onto the jobboard
# and another conductor can attempt to finish it, from wherever that job last
# left off).
#
# In this example an in-memory jobboard (and in-memory storage) is created
# and used to simulate how this would be done at a larger scale (it is an
# example after all).

# Restrict how long this example runs for (this is the duration given to the
# stop watch that main() waits on).
RUN_TIME = 5
# How long the reviewer thread sleeps after posting each review.
REVIEW_CREATION_DELAY = 0.5
# Given to the conductor as its wait timeout.
SCAN_DELAY = 0.1
# Default name prefix of the reviewer and the conductor (fully qualified
# host name and process id).
NAME = "%s_%s" % (socket.getfqdn(), os.getpid())

# This won't really use zookeeper but will use a local version of it using
# the zake library that mimics an actual zookeeper cluster using threads and
# an in-memory data structure.
JOBBOARD_CONF = {
    'board': 'zookeeper://localhost?path=/taskflow/tox/jobs',
}


class RunReview(task.Task):
    """Dummy task that 'clones' a review and 'runs' tox (it only prints)."""

    def _clone_review(self, review, temp_dir):
        # Only reports what would be cloned (and where to).
        message = "Cloning review '%s' into %s" % (review['id'], temp_dir)
        print(message)

    def _run_tox(self, temp_dir):
        # Only reports where tox would be ran.
        message = "Running tox in %s" % temp_dir
        print(message)

    def execute(self, review, temp_dir):
        """Clones the review into the temporary dir and runs tox there."""
        self._clone_review(review, temp_dir)
        self._run_tox(temp_dir)


class MakeTempDir(task.Task):
    """Task that creates a temporary dir (and removes it when reverted).

    The location of the created dir is provided (as 'temp_dir') so that
    other tasks can use it as they see fit.
    """

    default_provides = 'temp_dir'

    def execute(self):
        """Creates the temporary dir and returns its location."""
        return tempfile.mkdtemp()

    def revert(self, *args, **kwargs):
        """Removes the dir that execute returned (if there is one)."""
        created = kwargs.get(task.REVERT_RESULT)
        if not created:
            return
        shutil.rmtree(created)


class CleanResources(task.Task):
    """Task that cleans up any workflow resources."""

    def execute(self, temp_dir):
        """Reports and then removes the temporary dir (and its contents)."""
        message = "Removing %s" % temp_dir
        print(message)
        shutil.rmtree(temp_dir)


def review_iter():
    """Makes reviews (never-ending iterator/generator).

    Each review is a dict with a single 'id' key; the ids start at zero and
    increase by one for every review that is produced.
    """
    # Iterating over the counter directly replaces the manual 'while True'
    # loop that advanced it by hand.
    for review_id in itertools.count(0):
        yield {
            'id': review_id,
        }


# The reason this is at the module namespace level is important, since it must
# be accessible from a conductor dispatching an engine, if it was a lambda
# function for example, it would not be reimportable and the conductor would
# be unable to reference it when creating the workflow to run.
def create_review_workflow():
    """Factory method used to create a review workflow to run."""
    # The steps run in order: make the temporary dir, run the review in it
    # and then clean it up.
    steps = [
        MakeTempDir(name="maker"),
        RunReview(name="runner"),
        CleanResources(name="cleaner"),
    ]
    flow = linear_flow.Flow("tester")
    flow.add(*steps)
    return flow


def generate_reviewer(client, saver, name=NAME):
    """Creates a review producer thread with the given name prefix.

    :param client: client that is given to the jobboard
    :param saver: persistence backend (used by the jobboard and to save the
                  logbook of each posted review)
    :param name: name prefix ('_reviewer' gets appended to it)
    :returns: tuple of the (unstarted) thread and a callback that makes the
              thread stop posting reviews
    """
    real_name = "%s_reviewer" % name
    no_more = threading.Event()
    jb = boards.fetch(real_name, JOBBOARD_CONF,
                      client=client, persistence=saver)

    def make_save_book(saver, review_id):
        # Record what we want to happen (sometime in the future).
        book = models.LogBook("book_%s" % review_id)
        detail = models.FlowDetail("flow_%s" % review_id,
                                   uuidutils.generate_uuid())
        book.add(detail)
        # The factory method we want to be called (in the future) gets
        # associated with the book, so that the conductor will be able to
        # call into that factory to retrieve the workflow objects that
        # represent the work.
        #
        # The args and kwargs (both empty here) *can* be used to save any
        # specific parameters into the factory when it is being called to
        # create the workflow objects (typically used to tell a factory how
        # to create a unique workflow that represents this review).
        engines.save_factory_details(detail, create_review_workflow,
                                     (), {})
        with contextlib.closing(saver.get_connection()) as conn:
            conn.save_logbook(book)
        return book

    def run():
        """Periodically publishes 'fake' reviews to analyze."""
        jb.connect()
        reviews = review_iter()
        with contextlib.closing(jb):
            while not no_more.is_set():
                review = six.next(reviews)
                review_id = review['id']
                print("Posting review '%s'" % review_id)
                jb.post("%s_%s" % (real_name, review_id),
                        book=make_save_book(saver, review_id),
                        details={
                            'store': {
                                'review': review,
                            },
                        })
                time.sleep(REVIEW_CREATION_DELAY)

    # The thread is returned unstarted, along with a callback that can be
    # used to shut that thread down (to avoid running forever).
    thread = threading_utils.daemon_thread(target=run)
    return (thread, no_more.set)


def generate_conductor(client, saver, name=NAME):
    """Creates a conductor thread with the given name prefix.

    :param client: client that is given to the jobboard
    :param saver: persistence backend that is given to the jobboard
    :param name: name prefix ('_conductor' gets appended to it to form the
                 name of the conductor)
    :returns: tuple of the (unstarted) thread and a callback that stops
              the conductor
    """
    # NOTE: the board is fetched using the name prefix itself (only the
    # conductor gets the suffixed name).
    jb = boards.fetch(name, JOBBOARD_CONF,
                      client=client, persistence=saver)
    conductor = conductors.fetch("blocking", "%s_conductor" % name, jb,
                                 engine='parallel', wait_timeout=SCAN_DELAY)

    def run():
        jb.connect()
        with contextlib.closing(jb):
            conductor.run()

    # The thread is returned unstarted, along with a callback that can be
    # used to shut that thread down (to avoid running forever).
    thread = threading_utils.daemon_thread(target=run)
    return (thread, conductor.stop)


def main():
    """Run the reviewer and conductor threads for RUN_TIME, then stop them."""
    # Both entities must use the same backend so that data can be shared.
    saver = persistence.fetch({'connection': 'memory'})
    with contextlib.closing(saver.get_connection()) as conn:
        # Make sure any needed backend setup (data directories, schema
        # upgrades and so on) exists before the backend is used.
        conn.upgrade()
    reviewer_client = fake_client.FakeClient()
    # The second client reuses the first client's storage so that the
    # zookeeper features work across both clients.
    conductor_client = fake_client.FakeClient(storage=reviewer_client.storage)
    entities = [
        generate_reviewer(reviewer_client, saver),
        generate_conductor(conductor_client, saver),
    ]
    for thread, _stop in entities:
        thread.start()
    try:
        watch = timeutils.StopWatch(duration=RUN_TIME)
        watch.start()
        while not watch.expired():
            time.sleep(0.1)
    finally:
        # Shut things down in the opposite order to which they were started.
        for thread, stop in reversed(entities):
            stop()
            thread.join()


# Only run the example when this file is executed as a script.
if __name__ == '__main__':
    main()

Conductor running 99 bottles of beer song requests

Note

Full source located at 99_bottles.

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
import contextlib
import functools
import logging
import os
import sys
import time
import traceback

from kazoo import client

top_dir = os.path.abspath(os.path.join(os.path.dirname(__file__),
                                       os.pardir,
                                       os.pardir))
sys.path.insert(0, top_dir)

from taskflow.conductors import backends as conductor_backends
from taskflow import engines
from taskflow.jobs import backends as job_backends
from taskflow.patterns import linear_flow as lf
from taskflow.persistence import backends as persistence_backends
from taskflow.persistence import logbook
from taskflow import task
from taskflow.types import timing

from oslo_utils import uuidutils

# Instructions!
#
# 1. Install zookeeper (or change host listed below)
# 2. Download this example, place in file '99_bottles.py'
# 3. Run `python 99_bottles.py p` to place a song request onto the jobboard
# 4. Run `python 99_bottles.py c` a few times (in different shells)
# 5. On demand kill previously listed processes created in (4) and watch
#    the work resume on another process (and repeat)
# 6. Keep enough workers alive to eventually finish the song (if desired).

# Process id of this run; used to build the conductor/poster names.
ME = os.getpid()
# Address of the zookeeper server referenced by the jobboard configuration.
ZK_HOST = "localhost:2181"
# Jobboard configuration handed to the job backend fetch calls.
JB_CONF = {
    'hosts': ZK_HOST,
    'board': 'zookeeper',
    'path': '/taskflow/99-bottles-demo',
}
# Persistence backend location used by both the poster and the conductor.
PERSISTENCE_URI = r"sqlite:////tmp/bottles.db"
# Seconds slept by the take-down and pass-around tasks respectively.
TAKE_DOWN_DELAY = 1.0
PASS_AROUND_DELAY = 3.0
# Bottle count given to the flow factory when a song is posted.
HOW_MANY_BOTTLES = 99


class TakeABottleDown(task.Task):
    """Task that sings 'Take one down' and hands back one fewer bottle."""

    def execute(self, bottles_left):
        """Write the lyric, pause for TAKE_DOWN_DELAY, return the new count."""
        out = sys.stdout
        out.write('Take one down, ')
        out.flush()
        time.sleep(TAKE_DOWN_DELAY)
        remaining = bottles_left - 1
        return remaining


class PassItAround(task.Task):
    """Task that sings 'pass it around' and then pauses."""

    def execute(self):
        """Write the lyric and sleep for PASS_AROUND_DELAY seconds."""
        out = sys.stdout
        out.write('pass it around, ')
        out.flush()
        time.sleep(PASS_AROUND_DELAY)


class Conclusion(task.Task):
    """Task that finishes a verse by announcing how many bottles remain."""

    def execute(self, bottles_left):
        """Write the closing line of the verse for ``bottles_left``."""
        closing_line = '%s bottles of beer on the wall...\n' % bottles_left
        sys.stdout.write(closing_line)
        sys.stdout.flush()


def make_bottles(count):
    """Build the linear 'bottle-song' flow that counts down from ``count``.

    This is the factory that generates the workflow; it is also the function
    called to regenerate the workflow on resumption, so that work can
    continue from where it last left off.
    """

    def verse(number, **take_kwargs):
        # One verse is three tasks: take a bottle, pass it, announce the rest.
        return (
            TakeABottleDown("take-bottle-%s" % number,
                            provides='bottles_left', **take_kwargs),
            PassItAround("pass-%s-around" % number),
            Conclusion("next-bottles-%s" % (number - 1)),
        )

    song = lf.Flow("bottle-song")

    # Only the first verse is seeded with the starting count; each later
    # take-bottle task gets 'bottles_left' from what was provided before it.
    song.add(*verse(count, inject={'bottles_left': count}))
    for number in range(count - 1, 0, -1):
        song.add(*verse(number))

    return song


def run_conductor(only_run_once=False):
    """Run a blocking conductor that consumes jobs from the jobboard.

    The conductor keeps consuming until it is stopped (via ctrl-c or another
    kill signal); when ``only_run_once`` is true it is asked to stop as soon
    as a 'running_end' event is received.
    """
    # Stopwatches of in-flight events, keyed by the event name with its
    # '_start' suffix removed.
    active_watches = {}

    # Triggered by the conductor as it does various activities with engines;
    # echoes each event and reports how long every '<name>_start' /
    # '<name>_end' pair took (useful for debugging, watching, or figuring
    # out where to optimize).
    def on_conductor_event(cond, event, details):
        print("Event '%s' has been received..." % event)
        print("Details = %s" % details)
        if event.endswith("_start"):
            started = timing.StopWatch()
            started.start()
            active_watches[event[:-len("_start")]] = started
        if event.endswith("_end"):
            finished = event[:-len("_end")]
            try:
                watch = active_watches.pop(finished)
                watch.stop()
                print("It took %0.3f seconds for event '%s' to finish"
                      % (watch.elapsed(), finished))
            except KeyError:
                # No matching '_start' was recorded; nothing to report.
                pass
        if only_run_once and event == 'running_end':
            cond.stop()

    print("Starting conductor with pid: %s" % ME)
    conductor_name = "conductor-%s" % ME
    backend = persistence_backends.fetch(PERSISTENCE_URI)
    with contextlib.closing(backend):
        with contextlib.closing(backend.get_connection()) as conn:
            conn.upgrade()
        board = job_backends.fetch(conductor_name, JB_CONF,
                                   persistence=backend)
        board.connect()
        with contextlib.closing(board):
            cond = conductor_backends.fetch('blocking', conductor_name, board,
                                            persistence=backend)
            # Bind the conductor as the callback's first argument.
            listener = functools.partial(on_conductor_event, cond)
            cond.notifier.register(cond.notifier.ANY, listener)
            # Run forever, and kill -9 or ctrl-c me...
            try:
                cond.run()
            finally:
                cond.stop()
                cond.wait()


def run_poster():
    """Post a single song job onto the jobboard and then finish."""
    print("Starting poster with pid: %s" % ME)
    poster_name = "poster-%s" % ME
    song_name = "song-from-%s" % poster_name
    backend = persistence_backends.fetch(PERSISTENCE_URI)
    with contextlib.closing(backend):
        with contextlib.closing(backend.get_connection()) as conn:
            conn.upgrade()
        board = job_backends.fetch(poster_name, JB_CONF,
                                   persistence=backend)
        board.connect()
        with contextlib.closing(board):
            # Record, in the persistence backend, the unit of work we want
            # completed along with the factory that can be called to create
            # the tasks that the work unit needs.
            book = logbook.LogBook("post-from-%s" % poster_name)
            flow_detail = logbook.FlowDetail(song_name,
                                             uuidutils.generate_uuid())
            book.add(flow_detail)
            with contextlib.closing(backend.get_connection()) as conn:
                conn.save_logbook(book)
            engines.save_factory_details(flow_detail, make_bottles,
                                         [HOW_MANY_BOTTLES], {},
                                         backend=backend)
            # Post, and be done with it!
            posted = board.post(song_name, book=book)
            print("Posted: %s" % posted)
            print("Goodbye...")


def main_local():
    """Post one song and conduct it once, using very short delays.

    This local mode is typically what runs during unit testing, when all the
    examples are checked to still function correctly.
    """
    global TAKE_DOWN_DELAY, PASS_AROUND_DELAY, JB_CONF
    # Make everything go much faster (so that this finishes quickly).
    TAKE_DOWN_DELAY = PASS_AROUND_DELAY = 0.01
    # Append a fresh uuid to the jobboard path used by this run.
    unique_suffix = uuidutils.generate_uuid()
    JB_CONF['path'] = "-".join((JB_CONF['path'], unique_suffix))
    run_poster()
    run_conductor(only_run_once=True)


def check_for_zookeeper(timeout=1):
    """Probe the configured zookeeper server and report whether it responds.

    :param timeout: seconds to wait for the probe client to start
    :returns: ``True`` if the client started, ``False`` if starting timed
              out (a message and the traceback are written to stderr)
    """
    sys.stderr.write("Testing for the existence of a zookeeper server...\n")
    sys.stderr.write("Please wait....\n")
    # Probe the host the jobboard is configured to use (ZK_HOST) instead of
    # the kazoo default, so that changing ZK_HOST is honored by this check.
    with contextlib.closing(client.KazooClient(hosts=ZK_HOST)) as test_client:
        try:
            test_client.start(timeout=timeout)
        except test_client.handler.timeout_exception:
            sys.stderr.write("Zookeeper is needed for running this example!\n")
            traceback.print_exc()
            return False
        else:
            test_client.stop()
            return True


def main():
    """Command line entry point for the example.

    With no arguments the poster and a run-once conductor are executed
    locally. With ``p`` a song is posted, with ``c`` a conductor is run; a
    trailing ``v`` turns on verbose logging. Anything else prints a usage
    line to stderr. Nothing is run if zookeeper cannot be reached.
    """
    logging.basicConfig(level=logging.ERROR)
    if not check_for_zookeeper():
        return
    if len(sys.argv) == 1:
        main_local()
    elif sys.argv[1] in ('p', 'c'):
        if sys.argv[-1] == "v":
            # A second basicConfig() call does nothing once the root logger
            # has handlers (it was configured above), so lower the root
            # logger's level directly to actually enable verbose output.
            logging.getLogger().setLevel(5)
        if sys.argv[1] == 'p':
            run_poster()
        else:
            run_conductor()
    else:
        sys.stderr.write("%s p|c (v?)\n" % os.path.basename(sys.argv[0]))


# Only run the example when this file is executed as a script.
if __name__ == '__main__':
    main()