Workflows
Workflows in Workbench are directed acyclic graphs (DAGs): sequences of dependent processing operations that flow in one direction only, with no closed (infinite) loops.
Steps in Workflows are grouped into stages for ease of manipulation and bulk-execution.
Job
Bases: ABC
Abstract base class for data processing jobs.
Jobs are classes that encapsulate other Workbench functions and add built-in state management for writing outputs back to storage.
Attributes:

| Name | Type | Description |
|---|---|---|
| name | str | Human-readable name for the job |
| call_params | Dict | Parameters to pass to the job's run method |
Note
The Job class is expected to undergo significant enhancement in future versions of Workbench, including features such as set-up and retry logic that improve the robustness and applicability of descendant classes.
__init__
__init__(name, call_params=None)
Initialize a job instance.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| name | str | Human-readable name for the job | required |
| call_params | Dict | Parameters to pass to the job's run method. Defaults to None. | None |
run
abstractmethod
run(item, session=None, **kwargs)
Execute the job on an information container.
Must be implemented by concrete job classes to define the specific data processing operations to perform.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| item | InformationContainer | The information container to process | required |
| session | Session | Session for reading and writing file content, results and more from cloud storage. If not provided, results are returned directly. Defaults to None. | None |
| **kwargs | Any | Additional keyword arguments for job execution | {} |

Returns:

| Type | Description |
|---|---|
| Any | Job results, either written to session storage or returned directly |
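As an illustration, a concrete job might look like the sketch below. The base class here is a minimal stand-in that mirrors the documented signatures (the real Workbench Job adds state management on top), and WordCountJob, its item handling, and the session.write call are hypothetical.

```python
from abc import ABC, abstractmethod

# Minimal stand-in mirroring the documented Job interface; the real
# Workbench base class adds built-in state management on top of this.
class Job(ABC):
    def __init__(self, name, call_params=None):
        self.name = name                      # human-readable job name
        self.call_params = call_params or {}  # params for run()

    @abstractmethod
    def run(self, item, session=None, **kwargs):
        """Execute the job on an information container."""

# Hypothetical concrete job: counts words in a text-like container.
class WordCountJob(Job):
    def run(self, item, session=None, **kwargs):
        counts = len(str(item).split())
        if session is not None:
            # Assumed session API: write results back to storage.
            session.write(self.name, counts)
            return None
        return counts                         # no session: return directly

job = WordCountJob("word-count")
print(job.run("the quick brown fox"))  # 4
```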
Stage
Class for organizing and executing data processing jobs in Basic mode.
A Stage groups related jobs together and provides methods for executing them individually or sequentially. Stages belong to workflows and represent logical phases of data processing.
Attributes:

| Name | Type | Description |
|---|---|---|
| name | str | Human-readable name for the stage |
| jobs | Dict[str, Job] | Dictionary mapping job names to job instances |
__init__
__init__(name)
Initialize a stage instance.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| name | str | Human-readable name for the stage | required |
add_job
add_job(job)
Add a job to the stage.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| job | Job | An instance of Job to be added to the stage | required |
run_all_jobs_sequentially
run_all_jobs_sequentially(item, session=None)
Run all jobs in the stage sequentially.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| item | InformationContainer | The information container to analyze during the job | required |
| session | Session | Used to write results back to storage. If not provided, results are returned in the response. Defaults to None. | None |

Returns:

| Type | Description |
|---|---|
| Dict[str, Any] | Dictionary mapping job names to their results |
run_job
run_job(job_name, item, session=None, **kwargs)
Run a specific job by name if it exists in the stage.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| job_name | str | The name of the job to run | required |
| item | InformationContainer | The information container to analyze during the job | required |
| session | Session | Used to write results back to storage. If not provided, results are returned in the response. Defaults to None. | None |
| **kwargs | Any | Additional keyword arguments to pass to the job's run() method | {} |

Returns:

| Type | Description |
|---|---|
| Any | Job results if the job exists and runs successfully, None otherwise |
Workflow
Bases: ABC
Class for constructing and running data processing workflows in Basic mode.
A Workflow organizes data processing Jobs into sequential Stages that can be executed independently or as part of a complete workflow. This provides a structured approach to complex data processing pipelines.
Attributes:

| Name | Type | Description |
|---|---|---|
| name | str | Human-readable name for the workflow |
| stages | Dict[str, Stage] | Dictionary mapping stage names to stage instances |
__init__
__init__(name)
Initialize a workflow instance.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| name | str | Human-readable name for the workflow | required |
add_stage
add_stage(stage)
Add a stage to the workflow.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| stage | Stage | An instance of Stage to be added to the workflow | required |
run_all_stages_sequentially
run_all_stages_sequentially(item, session=None)
Run all stages in the workflow sequentially.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| item | InformationContainer | The information container to analyze during the workflow | required |
| session | Session | Used to write results back to storage. If not provided, results are returned in the response. Defaults to None. | None |

Returns:

| Type | Description |
|---|---|
| Dict[str, Any] | Dictionary mapping stage names to their results |
run_stage
run_stage(stage_name, item, session=None)
Run a specific stage by name if it exists in the workflow.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| stage_name | str | The name of the stage to run | required |
| item | InformationContainer | The information container to analyze during the stage | required |
| session | Session | Used to write results back to storage. If not provided, results are returned in the response. Defaults to None. | None |

Returns:

| Type | Description |
|---|---|
| Dict[str, Any] | Dictionary mapping job names to their results if the stage exists, None otherwise |
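Putting the Basic-mode pieces together, stages and a workflow might be assembled as below. The Stage and Workflow classes here are minimal stand-ins that follow the documented interfaces, and the inline jobs (duck-typed objects with a name and a run() method) are hypothetical.

```python
# Minimal stand-ins following the documented Stage/Workflow interfaces.
class Stage:
    def __init__(self, name):
        self.name, self.jobs = name, {}

    def add_job(self, job):
        self.jobs[job.name] = job

    def run_all_jobs_sequentially(self, item, session=None):
        return {n: j.run(item, session=session) for n, j in self.jobs.items()}

class Workflow:
    def __init__(self, name):
        self.name, self.stages = name, {}

    def add_stage(self, stage):
        self.stages[stage.name] = stage

    def run_all_stages_sequentially(self, item, session=None):
        return {n: s.run_all_jobs_sequentially(item, session=session)
                for n, s in self.stages.items()}

# Hypothetical jobs: any object with a name and a run() method works here.
class CharCountJob:
    name = "char-count"
    def run(self, item, session=None, **kwargs):
        return len(str(item))

class UpperCaseJob:
    name = "upper-case"
    def run(self, item, session=None, **kwargs):
        return str(item).upper()

prep = Stage("prepare")
prep.add_job(CharCountJob())
prep.add_job(UpperCaseJob())

wf = Workflow("demo")
wf.add_stage(prep)
print(wf.run_all_stages_sequentially("abc"))
# {'prepare': {'char-count': 3, 'upper-case': 'ABC'}}
```

With no session supplied, results bubble back up as nested dictionaries keyed by stage and job name, matching the documented return shapes.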
WorkflowRunner
Executes workflow configurations with parallel processing and dependency management - see Advanced mode.
WorkflowRunner takes a workflow configuration defined as a list of stages with steps, and executes them with support for parallel execution, dependency resolution, conditional execution, and comprehensive logging.
Attributes:

| Name | Type | Description |
|---|---|---|
| base_path | str | Base module path for dynamic class loading |
| id | UUID | Unique identifier for this runner instance |
| context | Dict[str, Any] | Context variables available to workflow steps |
| workflow | List[Dict[str, Any]] | Workflow configuration with stages and steps |
| max_workers | int | Maximum number of threads for parallel execution |
| logger | Logger | Logger instance for workflow execution |
| log_handler | AzureTableLogHandler | Azure Table Storage log handler |
__init__
__init__(workflow=None, max_workers=4, context=None, logger=None)
Initialize a WorkflowRunner instance.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| workflow | List[Dict] | Workflow configuration with stages and steps. | None |
| max_workers | int | Maximum number of threads for parallel execution. | 4 |
| context | Dict[str, Any] | Context variables available to workflow steps. | None |
| logger | Logger | Logger instance for workflow execution. If None, creates a new logger with an Azure Table Storage handler. | None |
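For orientation, a configuration in the list-of-stages shape the runner expects might look like the sketch below. The authoritative schema is the WWDL guide; the field names used here (stage, steps, class, params, depends_on, on_failure) and the class paths are illustrative assumptions, although the "$"-prefixed context references and the dependency/failure concepts are documented elsewhere on this page.

```python
# Hypothetical workflow configuration: a list of stages, each holding
# steps. Field names and class paths are illustrative assumptions; see
# the WWDL guide for the authoritative schema.
workflow = [
    {
        "stage": "extract",
        "steps": [
            {
                "name": "load",
                "class": "loaders.BlobLoader",      # resolved from base_path
                "params": {"path": "$input_path"},  # "$" -> context variable
            },
        ],
    },
    {
        "stage": "analyze",
        "steps": [
            {
                "name": "summarize",
                "class": "nlp.Summarizer",
                "params": {"text": "$load.text"},   # dot notation into outputs
                "depends_on": ["load"],             # runs after "load"
                "on_failure": "fail",               # abort after retries
            },
        ],
    },
]
```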
execute_step
execute_step(step_config, stage_name='<unknown stage>')
Execute a single workflow step with retry logic.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| step_config | Dict[str, Any] | Configuration for the step to execute. See the guide to Workbench Workflow Definition Language (WWDL). | required |
| stage_name | str | Name of the stage containing this step. Used to log stage-related run metrics. | '&lt;unknown stage&gt;' |

Raises:

| Type | Description |
|---|---|
| Exception | If step execution fails after all retries (when on_failure="fail") |
resolve_params
resolve_params(value, parent)
Strings prefixed with "$" reference context variables that are passed into the workflow or generated as intermediate outputs during execution.
The function recursively resolves parameter values with variable substitution, and supports dot notation for retrieving nested values stored within context variable dictionaries.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| value | Any | Value to resolve (can be dict, list, string, or other type) | required |
| parent | Dict[str, Any] | Parent context for variable resolution | required |

Returns:

| Type | Description |
|---|---|
| Any | Resolved value |
Examples:
Basic variable substitution:
>>> context = {'user_id': '12345', 'status': 'active'}
>>> resolver.resolve_params('$user_id', context)
'12345'
Nested object resolution with dot notation:
>>> context = {
... 'user': {'profile': {'name': 'John', 'email': 'john@example.com'}},
... 'config': {'timeout': 30}
... }
>>> resolver.resolve_params({
... 'name': '$user.profile.name',
... 'contact': '$user.profile.email',
... 'timeout': '$config.timeout'
... }, context)
{'name': 'John', 'contact': 'john@example.com', 'timeout': 30}
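The behaviour shown in the examples above could be implemented along these lines. This is an illustrative sketch rather than the Workbench source, and its error handling (a KeyError on a missing path) is an assumption.

```python
def resolve_params(value, parent):
    # Dicts and lists are resolved recursively, element by element.
    if isinstance(value, dict):
        return {k: resolve_params(v, parent) for k, v in value.items()}
    if isinstance(value, list):
        return [resolve_params(v, parent) for v in value]
    # "$"-prefixed strings are looked up in the parent context, with
    # dot notation descending into nested dictionaries.
    if isinstance(value, str) and value.startswith("$"):
        node = parent
        for key in value[1:].split("."):
            node = node[key]          # KeyError if the path is missing
        return node
    return value                      # anything else passes through

context = {"user": {"profile": {"name": "John"}}, "config": {"timeout": 30}}
print(resolve_params({"name": "$user.profile.name",
                      "timeout": "$config.timeout"}, context))
# {'name': 'John', 'timeout': 30}
```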
run
run()
Execute the complete workflow.
Runs all stages in the workflow sequentially, with each stage's steps executed in parallel based on their dependencies. Records comprehensive metrics and logs throughout execution.
Raises:

| Type | Description |
|---|---|
| ValueError | If circular dependencies are detected in workflow steps |
| Exception | If any stage fails during execution |
topological_sort
topological_sort(steps)
Perform topological sort on workflow steps to detect circular dependencies.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| steps | List[Dict[str, Any]] | List of step configurations to sort | required |

Returns:

| Type | Description |
|---|---|
| List[Dict[str, Any]] | Topologically sorted list of steps |

Raises:

| Type | Description |
|---|---|
| ValueError | If circular dependencies or missing steps are detected |
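One way to realise this check is Kahn's algorithm, sketched below. The step fields assumed here (name, depends_on) are illustrative, since the actual WWDL step schema is defined in its own guide.

```python
from collections import deque

# Kahn's-algorithm sketch of the documented behaviour; assumes each
# step is a dict with a "name" and an optional "depends_on" list.
def topological_sort(steps):
    by_name = {s["name"]: s for s in steps}
    indegree = {name: 0 for name in by_name}
    dependents = {name: [] for name in by_name}
    for step in steps:
        for dep in step.get("depends_on", []):
            if dep not in by_name:
                raise ValueError(f"missing step: {dep}")
            indegree[step["name"]] += 1
            dependents[dep].append(step["name"])
    ready = deque(name for name, deg in indegree.items() if deg == 0)
    ordered = []
    while ready:
        name = ready.popleft()
        ordered.append(by_name[name])
        for nxt in dependents[name]:      # release steps waiting on this one
            indegree[nxt] -= 1
            if indegree[nxt] == 0:
                ready.append(nxt)
    if len(ordered) != len(steps):        # leftover steps => a cycle
        raise ValueError("circular dependency detected in workflow steps")
    return ordered
```

Steps with no remaining unmet dependencies are emitted first; if any step can never be released, the graph contains a cycle and a ValueError is raised, matching the documented contract.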
workflow_metrics
workflow_metrics()
Retrieve workflow metrics and write to log partitions.
Retrieves workflow metrics from the session context and writes them to all workflow log partitions (both Message and Metric partitions).