
Workflows

Workflows in Workbench are directed acyclic graphs (DAGs): sequences of dependent processing operations that flow in one direction only, with no cycles (closed loops).

Steps in Workflows are grouped into stages for ease of manipulation and bulk-execution.
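To make the stage/step structure concrete, a workflow configuration might look like the following. This is a hypothetical illustration: the field names (`name`, `steps`, `class`, `params`, `depends_on`) and module paths are assumptions, not the exact Workbench schema.

```python
# Hypothetical two-stage workflow: field names and class paths are
# illustrative assumptions, not the real Workbench schema.
workflow = [
    {
        "name": "ingest",
        "steps": [
            {"name": "load", "class": "loaders.BlobLoader",
             "params": {"path": "raw/"}},
        ],
    },
    {
        "name": "process",
        "steps": [
            {"name": "clean", "class": "transforms.Cleaner",
             "params": {"input": "$load"}},       # "$" references a context variable
            {"name": "score", "class": "models.Scorer",
             "depends_on": ["clean"]},            # runs only after "clean" completes
        ],
    },
]
```

Stages run in order; within a stage, independent steps can run in parallel while `depends_on` edges define the DAG.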

WorkflowRunner

Executes workflow configurations with parallel processing and dependency management - see Advanced mode.

WorkflowRunner takes a workflow configuration defined as a list of stages with steps, and executes them with support for parallel execution, dependency resolution, conditional execution, and comprehensive logging.

Attributes:

| Name | Type | Description |
| --- | --- | --- |
| `base_path` | `str` | Base module path for dynamic class loading |
| `id` | `UUID` | Unique identifier for this runner instance |
| `context` | `dict[str, Any]` | Context variables available to workflow steps |
| `workflow` | `list[dict[str, Any]]` | Workflow configuration with stages and steps |
| `max_workers` | `int` | Maximum number of threads for parallel execution |
| `logger` | `Logger` | Logger instance for workflow execution |
| `log_handler` | `AzureTableLogHandler` | Azure Table Storage log handler |

__init__

__init__(workflow=None, max_workers=4, context=None, logger=None)

Initialize a WorkflowRunner instance.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `workflow` | `list[dict] \| None` | Workflow configuration with stages and steps. | `None` |
| `max_workers` | `int` | Maximum number of threads for parallel execution. | `4` |
| `context` | `dict[str, Any] \| None` | Context variables available to workflow steps. | `None` |
| `logger` | `Logger \| None` | Logger instance for workflow execution. If `None`, creates a new logger with an Azure Table Storage handler. | `None` |

execute_step

execute_step(step_config, stage_name='<unknown stage>', context_override=None)

Execute a single workflow step with retry logic.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `step_config` | `dict[str, Any]` | Configuration for the step to execute. See the guide to the Workbench Workflow Definition Language (WWDL). | *required* |
| `stage_name` | `str` | Name of the stage containing this step. Used to log stage-related run metrics. | `'<unknown stage>'` |
| `context_override` | `dict[str, Any] \| None` | Optional context dict to use instead of `self.context`. Used for thread-safe iteration execution where each iteration needs an isolated context. | `None` |

Raises:

| Type | Description |
| --- | --- |
| `Exception` | If step execution fails after all retries (when `on_failure="fail"`) |
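The retry-with-failure-policy behaviour can be sketched as follows. This is a minimal illustration, not the real `execute_step`: the helper name `execute_with_retry` and the `retries`, `delay`, and `on_failure` parameters are assumptions for the sketch.

```python
import time

def execute_with_retry(step, retries=3, delay=0.0, on_failure="fail"):
    """Sketch of per-step retry handling (names and defaults are assumed).

    Calls `step` up to `retries` times. If every attempt fails and
    on_failure == "fail", the last exception is re-raised; otherwise the
    failure is swallowed and None is returned so the workflow can continue.
    """
    last_exc = None
    for attempt in range(1, retries + 1):
        try:
            return step()
        except Exception as exc:
            last_exc = exc
            if attempt < retries and delay:
                time.sleep(delay)
    if on_failure == "fail":
        raise last_exc
    return None  # on_failure="continue": log-and-move-on semantics
```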

resolve_params

resolve_params(value, parent, context_override=None)

Prefixing a string with "$" references a context variable that was passed into the workflow or generated as an intermediate output during execution.

The function recursively resolves parameter values with variable substitution. It supports dot notation for retrieving nested values stored within context-variable dictionaries.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `value` | `Any` | Value to resolve (can be a dict, list, string, or other type) | *required* |
| `parent` | `dict[str, Any]` | Parent context for variable resolution | *required* |
| `context_override` | `dict[str, Any] \| None` | Optional context dict to use instead of `self.context`. | `None` |

Returns:

| Type | Description |
| --- | --- |
| `Any` | Resolved value, with context variable names substituted with their values from the context dictionary. |

Examples:

Basic variable substitution:

>>> context = {'user_id': '12345', 'status': 'active'}
>>> resolver.resolve_params('$user_id', context)
'12345'

Nested object resolution with dot notation:

>>> context = {
...     'user': {'profile': {'name': 'John', 'email': 'john@example.com'}},
...     'config': {'timeout': 30}
... }
>>> resolver.resolve_params({
...     'name': '$user.profile.name',
...     'contact': '$user.profile.email',
...     'timeout': '$config.timeout'
... }, context)
{'name': 'John', 'contact': 'john@example.com', 'timeout': 30}

Bracket notation for keys with special characters:

>>> context = {'metadata': {'field-1': 'value1', 'Part-2': {'code': 'ABC'}}}
>>> resolver.resolve_params('$metadata[field-1]', context)
'value1'
>>> resolver.resolve_params('$metadata[Part-2].code', context)
'ABC'

Quoted bracket notation:

>>> resolver.resolve_params("$metadata['Part-2'].code", context)
'ABC'

Dynamic bracket notation with variable reference:

>>> context = {'classifier_id': 'Part-2', 'metadata': {'Part-2': {'code': 'ABC'}}}
>>> resolver.resolve_params('$metadata[$classifier_id].code', context)
'ABC'
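A toy resolver reproducing the core of the behaviour shown above can be written in a few lines. This sketch covers only the "$" prefix and dot notation; the bracket, quoted-bracket, and dynamic-bracket forms from the examples are omitted, and the function is standalone rather than a method:

```python
def resolve_params(value, context):
    """Toy "$"-prefix resolver: dot-notation subset only (a sketch,
    not the real implementation)."""
    if isinstance(value, dict):
        return {k: resolve_params(v, context) for k, v in value.items()}
    if isinstance(value, list):
        return [resolve_params(v, context) for v in value]
    if isinstance(value, str) and value.startswith("$"):
        node = context
        for part in value[1:].split("."):  # walk nested dicts: $a.b.c
            node = node[part]
        return node
    return value  # non-"$" values pass through unchanged
```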

run

run()

Execute the complete workflow.

Runs all stages in the workflow sequentially, with each stage's steps executed in parallel based on their dependencies. Records comprehensive metrics and logs throughout execution.

Raises:

| Type | Description |
| --- | --- |
| `ValueError` | If circular dependencies are detected in workflow steps |
| `Exception` | If any stage fails during execution |
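The run loop described above (sequential stages, dependency-aware parallel steps) can be sketched like this. The step/stage field names (`name`, `steps`, `depends_on`) and the two-argument `execute_step` callback are assumptions of the sketch, not the exact Workbench schema:

```python
from concurrent.futures import ThreadPoolExecutor

def run(workflow, execute_step, max_workers=4):
    """Sketch of the run loop (field names are assumed).

    Stages execute strictly in order; within a stage, steps whose
    dependencies are all complete run as a parallel wave.
    """
    for stage in workflow:
        done, pending = set(), list(stage["steps"])
        with ThreadPoolExecutor(max_workers=max_workers) as pool:
            while pending:
                ready = [s for s in pending
                         if set(s.get("depends_on", [])) <= done]
                if not ready:
                    raise ValueError(f"circular dependency in stage {stage['name']!r}")
                futures = {pool.submit(execute_step, s, stage["name"]): s
                           for s in ready}
                for fut, step in futures.items():
                    fut.result()  # re-raises if the step failed
                    done.add(step["name"])
                pending = [s for s in pending if s not in ready]
```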

topological_sort

topological_sort(steps)

Perform topological sort on workflow steps to detect circular dependencies.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `steps` | `list[dict[str, Any]]` | List of step configurations to sort | *required* |

Returns:

| Type | Description |
| --- | --- |
| `list[dict[str, Any]]` | Topologically sorted list of steps |

Raises:

| Type | Description |
| --- | --- |
| `ValueError` | If circular dependencies or missing steps are detected |
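A standard way to implement this is Kahn's algorithm, which orders the steps and detects both missing and circular dependencies as a side effect. This sketch assumes each step config has a unique `name` and an optional `depends_on` list; it is illustrative, not the actual implementation:

```python
from collections import deque

def topological_sort(steps):
    """Kahn's algorithm over step configs (a sketch; field names assumed)."""
    by_name = {s["name"]: s for s in steps}
    indegree = {name: 0 for name in by_name}
    dependents = {name: [] for name in by_name}
    for s in steps:
        for dep in s.get("depends_on", []):
            if dep not in by_name:
                raise ValueError(f"step {s['name']!r} depends on missing step {dep!r}")
            indegree[s["name"]] += 1
            dependents[dep].append(s["name"])
    queue = deque(n for n, d in indegree.items() if d == 0)
    ordered = []
    while queue:
        name = queue.popleft()
        ordered.append(by_name[name])
        for nxt in dependents[name]:
            indegree[nxt] -= 1
            if indegree[nxt] == 0:
                queue.append(nxt)
    if len(ordered) != len(steps):
        raise ValueError("circular dependency detected")  # some steps never unlocked
    return ordered
```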

workflow_metrics

workflow_metrics()

Retrieve workflow metrics and write to log partitions.

Retrieves workflow metrics from the session context and writes to ALL workflow log partitions (Message partitions and Metric partitions).

generate_mermaid_from_workflow

generate_mermaid_from_workflow(workflow)

Generate a Mermaid flowchart from a workflow definition with support for parallel steps.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `workflow` | `list` | List of workflow stages with steps | *required* |

Returns:

| Name | Type | Description |
| --- | --- | --- |
| `dict` | `dict` | Object containing the Mermaid diagram code and metadata |
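The translation from stages and steps to a Mermaid flowchart can be sketched as below. This is a simplified illustration: the real output shape, node styling, and cross-stage edges are unknown, and the `diagram`/`stages` keys in the returned dict are assumptions.

```python
def generate_mermaid_from_workflow(workflow):
    """Sketch: render each stage as a Mermaid subgraph, with
    depends_on edges drawn between steps (field names assumed)."""
    lines = ["flowchart TD"]
    for stage in workflow:
        lines.append(f'  subgraph {stage["name"]}')
        for step in stage["steps"]:
            lines.append(f'    {step["name"]}')
            for dep in step.get("depends_on", []):
                lines.append(f'    {dep} --> {step["name"]}')
        lines.append("  end")
    return {"diagram": "\n".join(lines), "stages": len(workflow)}
```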