Workflows
Workflows in Workbench are directed acyclic graphs (DAGs): sequences of dependent processing operations that can only run in one direction, with no closed (infinite) loops.
Steps in Workflows are grouped into stages for ease of manipulation and bulk execution.
WorkflowRunner
Executes workflow configurations with parallel processing and dependency management - see Advanced mode.
WorkflowRunner takes a workflow configuration defined as a list of stages with steps, and executes them with support for parallel execution, dependency resolution, conditional execution, and comprehensive logging.
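For orientation, a workflow configuration of this shape might look like the sketch below. The step fields used here (`stage`, `steps`, `name`, `class`, `params`, `depends_on`) are illustrative assumptions, not the authoritative schema; see the Workbench Workflow Definition Language (WWDL) guide for the actual step configuration format.

```python
# Hypothetical workflow configuration: a list of stages, each containing steps.
# Key names are assumptions for illustration only.
workflow = [
    {
        "stage": "extract",
        "steps": [
            {
                "name": "load_documents",
                "class": "loaders.BlobLoader",          # hypothetical step class
                "params": {"container": "incoming"},
            },
        ],
    },
    {
        "stage": "classify",
        "steps": [
            {
                "name": "classify_parts",
                "class": "classifiers.PartClassifier",       # hypothetical step class
                "params": {"documents": "$load_documents"},  # "$" references context
                "depends_on": ["load_documents"],
            },
        ],
    },
]
```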
Attributes:
| Name | Type | Description |
|---|---|---|
| `base_path` | `str` | Base module path for dynamic class loading |
| `id` | `UUID` | Unique identifier for this runner instance |
| `context` | `dict[str, Any]` | Context variables available to workflow steps |
| `workflow` | `list[dict[str, Any]]` | Workflow configuration with stages and steps |
| `max_workers` | `int` | Maximum number of threads for parallel execution |
| `logger` | `Logger` | Logger instance for workflow execution |
| `log_handler` | `AzureTableLogHandler` | Azure Table Storage log handler |
__init__
__init__(workflow=None, max_workers=4, context=None, logger=None)
Initialize a WorkflowRunner instance.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `workflow` | `list[dict] \| None` | Workflow configuration with stages and steps. | `None` |
| `max_workers` | `int` | Maximum number of threads for parallel execution. | `4` |
| `context` | `dict[str, Any] \| None` | Context variables available to workflow steps. | `None` |
| `logger` | `Logger \| None` | Logger instance for workflow execution. If `None`, creates a new logger with an Azure Table Storage handler. | `None` |
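A minimal instantiation sketch, reusing the illustrative `workflow` list from the earlier example. The import path is an assumption; adjust it to wherever `WorkflowRunner` is exposed in your installation.

```python
from workbench.workflows import WorkflowRunner  # import path is an assumption

runner = WorkflowRunner(
    workflow=workflow,             # illustrative configuration from the earlier sketch
    max_workers=4,                 # threads available for parallel step execution
    context={"user_id": "12345"},  # referenced inside steps as "$user_id"
)
```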
execute_step
execute_step(step_config, stage_name='<unknown stage>', context_override=None)
Execute a single workflow step with retry logic.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `step_config` | `Dict[str, Any]` | Configuration for the step to execute. See the Workbench Workflow Definition Language (WWDL) guide. | required |
| `stage_name` | `str` | Name of the stage containing this step. Used to log stage-related run metrics. | `'<unknown stage>'` |
| `context_override` | `dict[str, Any] \| None` | Optional context dict to use instead of `self.context`. Used for thread-safe iteration execution where each iteration needs isolated context. | `None` |
Raises:
| Type | Description |
|---|---|
| `Exception` | If step execution fails after all retries (when `on_failure="fail"`) |
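A usage sketch for running a single step outside a full `run()`. The `step_config` keys (including `on_failure`) are illustrative assumptions; the real schema is defined by WWDL.

```python
# Hypothetical step configuration; key names are illustrative only.
step_config = {
    "name": "classify_parts",
    "class": "classifiers.PartClassifier",
    "params": {"documents": "$load_documents"},
    "on_failure": "fail",  # assumed failure policy, matching the Raises note above
}

try:
    runner.execute_step(step_config, stage_name="classify")
except Exception as exc:
    print(f"Step failed after all retries: {exc}")
```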
resolve_params
resolve_params(value, parent, context_override=None)
Prefixing a string with `$` references a context variable that was passed into the workflow or produced as an intermediate output during execution.
The function recursively resolves parameter values with variable substitution. It supports dot notation for retrieving nested values stored within context variable dictionaries.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `value` | `Any` | Value to resolve (can be dict, list, string, or other type) | required |
| `parent` | `dict[str, Any]` | Parent context for variable resolution | required |
| `context_override` | `dict[str, Any] \| None` | Optional context dict to use instead of `self.context`. | `None` |
Returns:
| Type | Description |
|---|---|
| `Any` | Resolved value with variable references substituted |
Examples:
Basic variable substitution:
>>> context = {'user_id': '12345', 'status': 'active'}
>>> resolver.resolve_params('$user_id', context)
'12345'
Nested object resolution with dot notation:
>>> context = {
... 'user': {'profile': {'name': 'John', 'email': 'john@example.com'}},
... 'config': {'timeout': 30}
... }
>>> resolver.resolve_params({
... 'name': '$user.profile.name',
... 'contact': '$user.profile.email',
... 'timeout': '$config.timeout'
... }, context)
{'name': 'John', 'contact': 'john@example.com', 'timeout': 30}
Bracket notation for keys with special characters:
>>> context = {'metadata': {'field-1': 'value1', 'Part-2': {'code': 'ABC'}}}
>>> resolver.resolve_params('$metadata[field-1]', context)
'value1'
>>> resolver.resolve_params('$metadata[Part-2].code', context)
'ABC'
Quoted bracket notation:
>>> resolver.resolve_params("$metadata['Part-2'].code", context)
'ABC'
Dynamic bracket notation with variable reference:
>>> context = {'classifier_id': 'Part-2', 'metadata': {'Part-2': {'code': 'ABC'}}}
>>> resolver.resolve_params('$metadata[$classifier_id].code', context)
'ABC'
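The examples above call the real method; the snippet below is only a minimal sketch of the underlying idea (recursive descent plus dot-notation lookup), written for illustration. It is not the library's implementation and omits bracket notation, quoting, and dynamic key references.

```python
from typing import Any

def resolve_params_sketch(value: Any, context: dict[str, Any]) -> Any:
    """Illustrative resolver: walks dicts/lists and substitutes "$"-prefixed strings."""
    if isinstance(value, dict):
        return {k: resolve_params_sketch(v, context) for k, v in value.items()}
    if isinstance(value, list):
        return [resolve_params_sketch(v, context) for v in value]
    if isinstance(value, str) and value.startswith("$"):
        resolved: Any = context
        for part in value[1:].split("."):   # dot notation: "$user.profile.name"
            resolved = resolved[part]
        return resolved
    return value

# resolve_params_sketch("$user.profile.name", {"user": {"profile": {"name": "John"}}})
# -> 'John'
```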
run
run()
Execute the complete workflow.
Runs all stages in the workflow sequentially, with each stage's steps executed in parallel based on their dependencies. Records comprehensive metrics and logs throughout execution.
Raises:
| Type | Description |
|---|---|
| `ValueError` | If circular dependencies are detected in workflow steps |
| `Exception` | If any stage fails during execution |
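A usage sketch, continuing the illustrative runner constructed in the `__init__` example above:

```python
try:
    runner.run()  # stages run sequentially; steps within a stage run in parallel
except ValueError as exc:
    print(f"Workflow definition error (e.g. circular dependencies): {exc}")
except Exception as exc:
    print(f"A stage failed during execution: {exc}")
```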
topological_sort
topological_sort(steps)
Perform topological sort on workflow steps to detect circular dependencies.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `steps` | `list[dict[str, Any]]` | List of step configurations to sort | required |
Returns:
| Type | Description |
|---|---|
| `list[dict[str, Any]]` | Topologically sorted list of steps |
Raises:
| Type | Description |
|---|---|
| `ValueError` | If circular dependencies or missing steps are detected |
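For intuition, the sketch below is a standard Kahn's-algorithm topological sort over step configurations. It assumes each step dict has a `name` and an optional `depends_on` list of step names; those keys and the code itself are illustrative, not the library's implementation.

```python
from collections import deque
from typing import Any

def topological_sort_sketch(steps: list[dict[str, Any]]) -> list[dict[str, Any]]:
    by_name = {step["name"]: step for step in steps}
    indegree = {name: 0 for name in by_name}
    dependents: dict[str, list[str]] = {name: [] for name in by_name}

    # Build the dependency graph, failing fast on references to missing steps.
    for step in steps:
        for dep in step.get("depends_on", []):
            if dep not in by_name:
                raise ValueError(f"Step '{step['name']}' depends on missing step '{dep}'")
            dependents[dep].append(step["name"])
            indegree[step["name"]] += 1

    # Repeatedly emit steps whose dependencies have all been satisfied.
    queue = deque(name for name, deg in indegree.items() if deg == 0)
    ordered: list[dict[str, Any]] = []
    while queue:
        name = queue.popleft()
        ordered.append(by_name[name])
        for child in dependents[name]:
            indegree[child] -= 1
            if indegree[child] == 0:
                queue.append(child)

    if len(ordered) != len(steps):
        raise ValueError("Circular dependencies detected in workflow steps")
    return ordered
```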
workflow_metrics
workflow_metrics()
Retrieve workflow metrics and write them to the log partitions.
Retrieves workflow metrics from the session context and writes them to all workflow log partitions (both the Message and Metric partitions).
generate_mermaid_from_workflow
generate_mermaid_from_workflow(workflow)
Generate a Mermaid flowchart from a workflow definition with support for parallel steps.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `workflow` | `list` | List of workflow stages with steps | required |
Returns:
| Name | Type | Description |
|---|---|---|
| `dict` | `dict` | Object containing the Mermaid diagram code and metadata |
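A usage sketch; the documentation above only states that a dict containing the Mermaid code and metadata is returned, so the `"diagram"` key read below is an assumption.

```python
result = runner.generate_mermaid_from_workflow(workflow)
# The returned dict's exact keys are not documented above; "diagram" is a guess.
print(result.get("diagram", result))
```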