Taskflow

Flow control for tasks. Task flow example:

>>> import functools
>>> from fairways.taskflow import Chain
>>> def fetch_message(message):
...     message = dict(body=message)
...     return message
>>> def transform_message(data):
...     body = data["body"]
...     if body is None:
...         raise ValueError("You should not send empty messages!")
...     body = body[:5]  # keep only the first five characters
...     return body
>>> def send_message(message):
...     print(message)
...     return message
>>> def handle_error(err_info):
...     failure = err_info
...     print("Message error:", str(failure))
...     return ""
>>> chain = Chain("Example"
...     ).then(fetch_message
...     ).then(transform_message
...     ).then(send_message
...     ).then(functools.partial(print, "Message sent:")
...     ).catch(handle_error
...     ).then(print)
>>> # "Lazy" chain could be called later:
>>> chain("Hello, World!")
Hello
Message sent: Hello
class fairways.taskflow.Chain(name=None)

Holds a sequence of tasks and the logic of their flow. The chain is built first and computed “lazily”, only when it is called.

Parameters: name – Optional name of the chain (can be used for debugging)
catch(method)

Add a global interceptor that catches any Exception

Parameters: method (Callable(Any, **kwargs) -> Any | functools.partial) – Interceptor function
Returns: Self reference for chaining
Return type: taskflow.Chain
>>> chain = Chain().then(lambda _: 1/0).catch(lambda _: "error caught!")
>>> chain("")
'error caught!'
>>> chain = Chain().then(lambda _: 1/0).then(lambda _: "error not caught, step lost!")
>>> chain("")
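The interplay of then and catch can be pictured with a small stand-alone sketch. MiniChain below is a hypothetical stand-in written for illustration only; it is not the fairways implementation and ignores envelopes, middleware and named chains. It only shows the idea that a failed step skips the following then steps until a catch handler is reached.

```python
class MiniChain:
    """Hypothetical stand-in for Chain; not the fairways implementation."""

    def __init__(self):
        self.steps = []  # list of (kind, callable) pairs, in call order

    def then(self, method):
        self.steps.append(("then", method))
        return self  # self reference enables chaining

    def catch(self, method):
        self.steps.append(("catch", method))
        return self

    def __call__(self, data):
        failure = None
        for kind, method in self.steps:
            if kind == "then" and failure is None:
                try:
                    data = method(data)
                except Exception as exc:
                    failure = exc  # following "then" steps are skipped
            elif kind == "catch" and failure is not None:
                data = method(failure)  # handler result becomes the new data
                failure = None
        # In this sketch an uncaught failure leaves data at its last good value.
        return data


chain = (
    MiniChain()
    .then(lambda _: 1 / 0)
    .then(lambda _: "skipped")
    .catch(lambda _: "recovered")
)
result = chain("")  # "recovered"
```

Nothing runs while the chain is being built; the steps are executed only when the chain object is called, which is the "lazy" behaviour described above.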
catch_on(ex_class_or_name, method)

Add a narrow interceptor with a selector. The attached handler is called only for the specified exception

Parameters:
  • ex_class_or_name (Class | str) – Name or class of a target exception
  • method (Callable(Any, **kwargs) -> Any | functools.partial) – Interceptor function
Raises:
  • ValueError – Raised to prevent using “catch_on” with the generic Exception (use “catch” instead)
  • TypeError – Raised when “ex_class_or_name” has invalid type
Returns:

Self reference for chaining

Return type:

taskflow.Chain

>>> chain = Chain().then(lambda _: 1/0).catch_on(ZeroDivisionError, lambda _: "error caught!")
>>> chain("")
'error caught!'
>>> chain = Chain().then(lambda _: 1/0).catch_on(ValueError, lambda _: "error not caught!")
>>> chain("")
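One way to picture the selector is as a predicate built from either a class or a class name. The helper make_selector below is hypothetical and only a sketch; in particular, matching subclasses via isinstance and rejecting the string "Exception" are assumptions, not documented behaviour of catch_on.

```python
def make_selector(ex_class_or_name):
    """Return a predicate telling whether an exception should be handled.

    Hypothetical helper; the matching rules are assumptions for illustration.
    """
    if ex_class_or_name is Exception or ex_class_or_name == "Exception":
        # Mirrors the documented ValueError: the generic case belongs to catch()
        raise ValueError("use catch() for the generic Exception")
    if isinstance(ex_class_or_name, str):
        name = ex_class_or_name
        return lambda exc: type(exc).__name__ == name  # match by class name
    if isinstance(ex_class_or_name, type) and issubclass(ex_class_or_name, BaseException):
        cls = ex_class_or_name
        return lambda exc: isinstance(exc, cls)  # match by class (and subclasses)
    raise TypeError("expected an exception class or its name")


by_class = make_selector(ZeroDivisionError)
by_name = make_selector("ValueError")
```

Here by_class(ZeroDivisionError()) is true and by_class(ValueError()) is false, which corresponds to the two doctest cases above.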
compiled

List of wrapped methods of the chain, with middleware (if any)

Returns: List of callables
Return type: list
on(keypath, method)

Append a new task attached to a selected key or key path of the data. Only that part of the data mapping is passed to the task function, and the function is called only when the key (or key path, for nested mappings) exists. This approach is event-based and allows building flexible chains without “if”: the key (or key path) acts as the event and the task function as its handler. When the key path does not exist, or its value is None, the attached task is not called at all.

Parameters:
  • keypath (str) – Key name (or key path in a form like “a/b/c”)
  • method (Callable(Any, **kwargs) -> Any | functools.partial) – Task function
Returns:

Self reference for chaining

Return type:

taskflow.Chain

>>> chain = Chain().on("a", lambda value: "changed!").then(print)
>>> chain({"a": "not changed"})
{'a': 'changed!'}
>>> chain({"b": "not changed"})
{'b': 'not changed'}
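The key-path dispatch described for on can be sketched in a few lines. apply_on is a hypothetical helper, not part of fairways; it assumes plain dict data, the “/” separator, and in-place replacement of the addressed value.

```python
def apply_on(data, keypath, method, sep="/"):
    """Call method on the value at keypath and store the result in place.

    Hypothetical helper illustrating the dispatch rule only.
    """
    *parents, last = keypath.split(sep)
    node = data
    for key in parents:
        node = node.get(key) if isinstance(node, dict) else None
        if node is None:
            return data  # path missing: the task is not called
    if not isinstance(node, dict) or node.get(last) is None:
        return data  # missing key or None value: the task is not called
    node[last] = method(node[last])  # only this part of the data is passed
    return data


flat = apply_on({"a": "not changed"}, "a", lambda value: "changed!")
untouched = apply_on({"b": "not changed"}, "a", lambda value: "changed!")
nested = apply_on({"a": {"b": 1}}, "a/b", lambda value: value + 1)
```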
then(method)

Append a new task (a global reducer, which has access to the entire structure of the data passed to the chain)

Parameters: method (Callable(Any, **kwargs) -> Any | functools.partial) – Task function
Returns: Self reference for chaining
Return type: taskflow.Chain
>>> chain = Chain().then(lambda data: data).then(print)
>>> chain({'a':1})
{'a': 1}
class fairways.taskflow.ChainAll(name=None)
class fairways.taskflow.ChainAny(name=None)
class fairways.taskflow.Envelope(*, initial_data, failure_data=None)

Container to pass data between tasks

Parameters: initial_data (Any) – Data to pass. Usually this is a Mapping, but another type can be used
clone()

Deep copy of wrapped data and state

Returns: Copy of the envelope
Return type: Envelope
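What "deep copy" buys here can be shown with a hypothetical MiniEnvelope. The class and its state attribute are illustrative assumptions, not the fairways internals; the point is only that changes made to a clone do not leak back into the original.

```python
import copy


class MiniEnvelope:
    """Hypothetical stand-in for Envelope; the attribute names are assumptions."""

    def __init__(self, *, initial_data, failure_data=None):
        self.state = {"data": initial_data, "failure": failure_data}

    def clone(self):
        twin = MiniEnvelope(initial_data=None)
        twin.state = copy.deepcopy(self.state)  # nested objects are copied too
        return twin


original = MiniEnvelope(initial_data={"a": [1]})
twin = original.clone()
twin.state["data"]["a"].append(2)  # does not affect the original
```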
get_ctx(attr_path)

Get an attribute value. Nested mappings are supported: values of nested objects can be addressed using the default separator “/”. Internally, data is stored as a tree with two branches, DATA_ROOT and FAILURE_ROOT, which hold the data of the last successful operation and of the last failed operation respectively. For example, the key “a” of the last successful state is addressed as “data/a”, while the same key of the last failed operation is addressed as “failure/a”.

Parameters: attr_path (str) – Attribute name. Can be a complex path separated with “/”
Returns: Value related to the attribute name.
Return type: Any
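The path addressing can be sketched as a walk over nested mappings. get_by_path and the sample state dict are hypothetical; they assume the two branches are stored under the literal keys "data" and "failure", as the examples in the description suggest.

```python
def get_by_path(tree, attr_path, sep="/"):
    """Walk nested mappings along attr_path (hypothetical helper)."""
    node = tree
    for key in attr_path.split(sep):
        node = node[key]  # raises KeyError if a segment is missing
    return node


# Assumed layout: one branch per outcome of the last operation
state = {"data": {"a": 1, "nested": {"b": 2}}, "failure": {"a": "boom"}}

ok_value = get_by_path(state, "data/a")
deep_value = get_by_path(state, "data/nested/b")
failed_value = get_by_path(state, "failure/a")
```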
get_data(topic=None)

Get the entire data object of the last successful task

Returns: Wrapped object
Return type: Any
get_failure()

Get the entire data object of the last failed task

Returns: Wrapped object
Return type: Any
isfailure

Flag which is True when current state is a failure

Returns: Value
Return type: bool
set_ctx(attr_path, value)

Set an attribute value. Nested mappings are supported: values of nested objects can be addressed using the default separator “/”. Internally, data is stored as a tree with two branches, DATA_ROOT and FAILURE_ROOT, which hold the data of the last successful operation and of the last failed operation respectively. For example, the key “a” of the last successful state is addressed as “data/a”, while the same key of the last failed operation is addressed as “failure/a”.

Parameters:
  • attr_path (str) – Attribute name. Can be complex path separated with “/”
  • value (Any) – New value
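Writing by path is the mirror image of reading by path. set_by_path below is a hypothetical helper; creating missing intermediate mappings on the way is an assumption made for this sketch, and the real set_ctx may behave differently.

```python
def set_by_path(tree, attr_path, value, sep="/"):
    """Store value at attr_path inside nested mappings (hypothetical helper)."""
    *parents, last = attr_path.split(sep)
    node = tree
    for key in parents:
        node = node.setdefault(key, {})  # assumption: create missing levels
    node[last] = value


state = {"data": {}, "failure": {}}
set_by_path(state, "data/a", 1)
set_by_path(state, "data/nested/b", 2)
```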
set_data(topic=None, value=None)

Set the data object of the last successful task

Returns: Wrapped object
Return type: Any
set_failure(failure)

Set failure state

Parameters: failure (Failure) – Failure class instance
Raises: ValueError – Protects from setting failure to None
class fairways.taskflow.Failure(exception, data_before_failure, method_name=None, topic=None, **kwargs)

Generic exception wrapper for task flow: wraps a standard exception with additional metadata.

Parameters:
  • exception (Exception) – Exception instance raised at runtime
  • data_before_failure (Any) – Data wrapped in the envelope before the exception occurred
  • topic (str) – Topic (child path); set when the exception occurs in a reducer attached to a topic
exception fairways.taskflow.SkipFollowing(exit_code, *args, **kwargs)

Special exception class: skips the following steps of the chain up to the nearest catch method

Parameters: exit_code (Any) – User-defined code to distinguish between different cases
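The idea of leaving a sequence of steps early with a user-defined code can be sketched without the library. SkipRest, run and validate are hypothetical names; the except clause plays the role of the nearest catch step, and the real SkipFollowing may carry more state than this sketch shows.

```python
class SkipRest(Exception):
    """Hypothetical stand-in for SkipFollowing."""

    def __init__(self, exit_code, *args, **kwargs):
        super().__init__(exit_code, *args)
        self.exit_code = exit_code  # lets the handler tell cases apart


def run(steps, data):
    """Run steps in order; a SkipRest raised by any step skips the rest."""
    try:
        for step in steps:
            data = step(data)
    except SkipRest as skip:
        return ("skipped", skip.exit_code)
    return ("done", data)


def validate(message):
    if not message:
        raise SkipRest("empty")  # nothing to send, skip the remaining steps
    return message


finished = run([validate, str.upper], "hi")
skipped = run([validate, str.upper], "")
```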
fairways.taskflow.callable_name(c)

Extract name of function / functools.partial
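A helper of this kind can be approximated with the standard library alone. name_of below is a hypothetical sketch and not the source of callable_name; it relies on the func attribute of functools.partial and falls back to the class name for callables that have no __name__.

```python
import functools


def name_of(c):
    """Best-effort name of a function or functools.partial (hypothetical)."""
    if isinstance(c, functools.partial):
        return name_of(c.func)  # unwrap the partial to reach the target
    return getattr(c, "__name__", type(c).__name__)


plain = name_of(print)
wrapped = name_of(functools.partial(print, "Message sent:"))
anonymous = name_of(lambda: None)
```

Such names are handy in debug output, for example to report which step of a chain failed.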