
Workflows

Workflows in Workbench are directed acyclic graphs (DAGs): networks of dependent processing operations whose edges run in one direction only, with no cycles.

Steps in Workflows are grouped into stages for ease of manipulation and bulk execution.

Job

Bases: ABC

Abstract base class for data processing jobs.

Jobs are classes that encapsulate other Workbench functions and introduce in-built state management to write outputs back to storage.

Attributes:

| Name | Type | Description |
|------|------|-------------|
| `name` | `str` | Human-readable name for the job |
| `call_params` | `Dict` | Parameters to pass to the job's `run` method |

Note

The Job class is expected to undergo significant enhancement in future versions of Workbench, including features such as set-up and retry logic that improve the robustness and applicability of descendant classes.

__init__

__init__(name, call_params=None)

Initialize a job instance.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| `name` | `str` | Human-readable name for the job | *required* |
| `call_params` | `Dict` | Parameters to pass to the job's `run` method. Defaults to `None`. | `None` |

run abstractmethod

run(item, session=None, **kwargs)

Execute the job on an information container.

Must be implemented by concrete job classes to define the specific data processing operations to perform.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| `item` | `InformationContainer` | The information container to process | *required* |
| `session` | `Session` | Session for reading and writing file content, results and more from cloud storage. If not provided, results are returned directly. Defaults to `None`. | `None` |
| `**kwargs` | `Any` | Additional keyword arguments for job execution | `{}` |

Returns:

| Name | Type | Description |
|------|------|-------------|
| `Any` | `Any` | Job results, either written to session storage or returned directly |
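A concrete job might look like the minimal sketch below. `WordCountJob`, the plain-dict stand-ins for `InformationContainer`, and the `session.write` call are illustrative assumptions, not part of the Workbench API:

```python
from abc import ABC, abstractmethod


class Job(ABC):
    """Minimal stand-in for the documented Job base class (illustrative only)."""

    def __init__(self, name, call_params=None):
        self.name = name
        self.call_params = call_params or {}

    @abstractmethod
    def run(self, item, session=None, **kwargs):
        ...


class WordCountJob(Job):
    """Hypothetical concrete job: counts the words in an item's text."""

    def run(self, item, session=None, **kwargs):
        count = len(item["text"].split())
        if session is not None:
            session.write(self.name, count)  # hypothetical write-back to storage
            return None
        return count  # no session: return the result directly


job = WordCountJob(name="word-count")
print(job.run({"text": "one two three"}))  # → 3
```

The key pattern is the branch on `session`: with a session, results go to storage; without one, they are returned to the caller.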

Stage

Class for organizing and executing data processing jobs in Basic mode.

A Stage groups related jobs together and provides methods for executing them individually or sequentially. Stages belong to workflows and represent logical phases of data processing.

Attributes:

| Name | Type | Description |
|------|------|-------------|
| `name` | `str` | Human-readable name for the stage |
| `jobs` | `Dict[str, Job]` | Dictionary mapping job names to job instances |

__init__

__init__(name)

Initialize a stage instance.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| `name` | `str` | Human-readable name for the stage | *required* |

add_job

add_job(job)

Add a job to the stage.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| `job` | `Job` | An instance of Job to be added to the stage | *required* |

run_all_jobs_sequentially

run_all_jobs_sequentially(item, session=None)

Run all jobs in the stage sequentially.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| `item` | `InformationContainer` | The information container to analyze during the job | *required* |
| `session` | `Session` | Used to write results back to storage. If not provided, results are returned in the response. Defaults to `None`. | `None` |

Returns:

| Type | Description |
|------|-------------|
| `Dict[str, Any]` | Dictionary mapping job names to their results |

run_job

run_job(job_name, item, session=None, **kwargs)

Run a specific job by name if it exists in the stage.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| `job_name` | `str` | The name of the job to run | *required* |
| `item` | `InformationContainer` | The information container to analyze during the job | *required* |
| `session` | `Session` | Used to write results back to storage. If not provided, results are returned in the response. Defaults to `None`. | `None` |
| `**kwargs` | `Any` | Additional keyword arguments to pass to the job's `run()` method | `{}` |

Returns:

| Name | Type | Description |
|------|------|-------------|
| `Any` | `Any` | Job results if the job exists and runs successfully, `None` otherwise |
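The Stage interface above can be mimicked with a short self-contained sketch; the `Stage` stand-in and the duck-typed `UpperCaseJob` below are illustrative assumptions, not the real Workbench implementation:

```python
class Stage:
    """Minimal stand-in mirroring the documented Stage interface (illustrative)."""

    def __init__(self, name):
        self.name = name
        self.jobs = {}  # maps job name -> job instance

    def add_job(self, job):
        self.jobs[job.name] = job

    def run_job(self, job_name, item, session=None, **kwargs):
        job = self.jobs.get(job_name)
        if job is None:
            return None  # documented behaviour when the named job is absent
        return job.run(item, session=session, **kwargs)

    def run_all_jobs_sequentially(self, item, session=None):
        # run every job in insertion order, collecting results by job name
        return {name: job.run(item, session=session)
                for name, job in self.jobs.items()}


class UpperCaseJob:
    """Hypothetical duck-typed job used only for this demo."""
    name = "upper"

    def run(self, item, session=None, **kwargs):
        return item["text"].upper()


stage = Stage("preprocess")
stage.add_job(UpperCaseJob())
print(stage.run_all_jobs_sequentially({"text": "hi"}))  # → {'upper': 'HI'}
```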

Workflow

Bases: ABC

Class for constructing and running data processing workflows in Basic mode.

A Workflow organizes data processing Jobs into sequential Stages that can be executed independently or as part of a complete workflow. This provides a structured approach to complex data processing pipelines.

Attributes:

| Name | Type | Description |
|------|------|-------------|
| `name` | `str` | Human-readable name for the workflow |
| `stages` | `Dict[str, Stage]` | Dictionary mapping stage names to stage instances |

__init__

__init__(name)

Initialize a workflow instance.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| `name` | `str` | Human-readable name for the workflow | *required* |

add_stage

add_stage(stage)

Add a stage to the workflow.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| `stage` | `Stage` | An instance of Stage to be added to the workflow | *required* |

run_all_stages_sequentially

run_all_stages_sequentially(item, session=None)

Run all stages in the workflow sequentially.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| `item` | `InformationContainer` | The information container to analyze during the workflow | *required* |
| `session` | `Session` | Used to write results back to storage. If not provided, results are returned in the response. Defaults to `None`. | `None` |

Returns:

| Type | Description |
|------|-------------|
| `Dict[str, Any]` | Dictionary mapping stage names to their results |

run_stage

run_stage(stage_name, item, session=None)

Run a specific stage by name if it exists in the workflow.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| `stage_name` | `str` | The name of the stage to run | *required* |
| `item` | `InformationContainer` | The information container to analyze during the stage | *required* |
| `session` | `Session` | Used to write results back to storage. If not provided, results are returned in the response. Defaults to `None`. | `None` |

Returns:

| Type | Description |
|------|-------------|
| `Dict[str, Any]` | Dictionary mapping job names to their results if stage exists, `None` otherwise |
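Assembling stages into a workflow follows the same pattern one level up. The `Workflow` stand-in and the duck-typed `EchoStage` below are an illustrative sketch, not the real implementation:

```python
class Workflow:
    """Minimal stand-in mirroring the documented Workflow interface (illustrative)."""

    def __init__(self, name):
        self.name = name
        self.stages = {}  # maps stage name -> stage instance

    def add_stage(self, stage):
        self.stages[stage.name] = stage

    def run_stage(self, stage_name, item, session=None):
        stage = self.stages.get(stage_name)
        if stage is None:
            return None  # documented behaviour when the named stage is absent
        return stage.run_all_jobs_sequentially(item, session=session)

    def run_all_stages_sequentially(self, item, session=None):
        # run stages in insertion order, collecting each stage's result dict
        return {name: stage.run_all_jobs_sequentially(item, session=session)
                for name, stage in self.stages.items()}


class EchoStage:
    """Hypothetical duck-typed stage used only for this demo."""
    name = "echo"

    def run_all_jobs_sequentially(self, item, session=None):
        return {"echo-job": item}


wf = Workflow("demo")
wf.add_stage(EchoStage())
print(wf.run_all_stages_sequentially("hello"))  # → {'echo': {'echo-job': 'hello'}}
```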

WorkflowRunner

Executes workflow configurations with parallel processing and dependency management - see Advanced mode.

WorkflowRunner takes a workflow configuration defined as a list of stages with steps, and executes them with support for parallel execution, dependency resolution, conditional execution, and comprehensive logging.

Attributes:

| Name | Type | Description |
|------|------|-------------|
| `base_path` | `str` | Base module path for dynamic class loading |
| `id` | `UUID` | Unique identifier for this runner instance |
| `context` | `Dict[str, Any]` | Context variables available to workflow steps |
| `workflow` | `List[Dict[str, Any]]` | Workflow configuration with stages and steps |
| `max_workers` | `int` | Maximum number of threads for parallel execution |
| `logger` | `Logger` | Logger instance for workflow execution |
| `log_handler` | `AzureTableLogHandler` | Azure Table Storage log handler |

__init__

__init__(workflow=None, max_workers=4, context=None, logger=None)

Initialize a WorkflowRunner instance.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| `workflow` | `List[Dict]` | Workflow configuration with stages and steps. | `None` |
| `max_workers` | `int` | Maximum number of threads for parallel execution. | `4` |
| `context` | `Dict[str, Any]` | Context variables available to workflow steps. | `None` |
| `logger` | `Logger` | Logger instance for workflow execution. If `None`, creates a new logger with Azure Table Storage handler. | `None` |
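As a rough illustration of the configuration shape the runner consumes, the sketch below builds a two-stage workflow as a list of stage dicts. Only the concepts this reference mentions (stages, steps, dependencies, `on_failure`, `$`-prefixed context variables) are grounded; the exact field names, the module paths, and the commented-out `WorkflowRunner` call are assumptions — consult the WWDL guide for the authoritative schema:

```python
# Hypothetical WWDL-style configuration; exact field names are assumptions.
workflow = [
    {
        "stage": "extract",
        "steps": [
            # "$input_path" would be resolved from the runner's context
            {"name": "load", "class": "loaders.BlobLoader",
             "params": {"path": "$input_path"}},
        ],
    },
    {
        "stage": "analyse",
        "steps": [
            # runs only after "load"; a failure here aborts the workflow
            {"name": "summarise", "class": "nlp.Summariser",
             "depends_on": ["load"], "on_failure": "fail"},
        ],
    },
]

# runner = WorkflowRunner(workflow=workflow, max_workers=4,
#                         context={"input_path": "docs/report.pdf"})
# runner.run()
```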

execute_step

execute_step(step_config, stage_name='<unknown stage>')

Execute a single workflow step with retry logic.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| `step_config` | `Dict[str, Any]` | Configuration for the step to execute. See guide to Workbench Workflow Definition Language (WWDL). | *required* |
| `stage_name` | `str` | Name of the stage containing this step. Used to log stage-related run metrics. | `'<unknown stage>'` |

Raises:

| Type | Description |
|------|-------------|
| `Exception` | If step execution fails after all retries (when `on_failure="fail"`) |
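The retry behaviour might look roughly like the sketch below. Only the re-raise-after-all-retries behaviour under `on_failure="fail"` comes from this reference; the `retries`/`delay` parameters, the `"callable"` key, and the policy defaults are assumptions:

```python
import time


def execute_step(step_config, stage_name="<unknown stage>", retries=3, delay=0.0):
    """Illustrative retry loop; the real method derives its retry and
    failure policy from step_config per the WWDL guide."""
    on_failure = step_config.get("on_failure", "fail")
    for attempt in range(1, retries + 1):
        try:
            return step_config["callable"]()  # hypothetical resolved step body
        except Exception:
            if attempt == retries:
                if on_failure == "fail":
                    raise  # re-raise once all retries are exhausted
                return None  # otherwise swallow the error and move on
            time.sleep(delay)  # back off before the next attempt


attempts = {"n": 0}

def flaky():
    attempts["n"] += 1
    if attempts["n"] < 3:
        raise RuntimeError("transient failure")
    return "ok"

print(execute_step({"callable": flaky}))  # → ok
```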

resolve_params

resolve_params(value, parent)

Strings prefixed with `$` reference context variables that were passed into the workflow or generated as intermediate outputs during execution.

This method recursively resolves parameter values with variable substitution. It supports dot notation for retrieving nested values stored within context-variable dictionaries.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| `value` | `Any` | Value to resolve (can be dict, list, string, or other type) | *required* |
| `parent` | `Dict[str, Any]` | Parent context for variable resolution | *required* |

Returns:

| Type | Description |
|------|-------------|
| `Any` | Resolved value with context variable names substituted with their values from the context dictionary |

Examples:

Basic variable substitution:

>>> context = {'user_id': '12345', 'status': 'active'}
>>> resolver.resolve_params('$user_id', context)
'12345'

Nested object resolution with dot notation:

>>> context = {
...     'user': {'profile': {'name': 'John', 'email': 'john@example.com'}},
...     'config': {'timeout': 30}
... }
>>> resolver.resolve_params({
...     'name': '$user.profile.name',
...     'contact': '$user.profile.email',
...     'timeout': '$config.timeout'
... }, context)
{'name': 'John', 'contact': 'john@example.com', 'timeout': 30}
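Under those semantics, a minimal re-implementation looks like this sketch (illustrative only; the real method's edge-case handling may differ):

```python
def resolve_params(value, parent):
    """Recursively substitute $-prefixed strings with values from `parent`,
    walking nested dicts via dot notation (illustrative sketch)."""
    if isinstance(value, dict):
        return {k: resolve_params(v, parent) for k, v in value.items()}
    if isinstance(value, list):
        return [resolve_params(v, parent) for v in value]
    if isinstance(value, str) and value.startswith("$"):
        node = parent
        for key in value[1:].split("."):  # e.g. "$user.profile.name"
            node = node[key]
        return node
    return value  # anything else passes through unchanged


context = {"user": {"profile": {"name": "John"}}, "config": {"timeout": 30}}
print(resolve_params({"name": "$user.profile.name", "timeout": "$config.timeout"},
                     context))  # → {'name': 'John', 'timeout': 30}
```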

run

run()

Execute the complete workflow.

Runs all stages in the workflow sequentially, with each stage's steps executed in parallel based on their dependencies. Records comprehensive metrics and logs throughout execution.

Raises:

| Type | Description |
|------|-------------|
| `ValueError` | If circular dependencies are detected in workflow steps |
| `Exception` | If any stage fails during execution |

topological_sort

topological_sort(steps)

Perform topological sort on workflow steps to detect circular dependencies.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| `steps` | `List[Dict[str, Any]]` | List of step configurations to sort | *required* |

Returns:

| Type | Description |
|------|-------------|
| `List[Dict[str, Any]]` | Topologically sorted list of steps |

Raises:

| Type | Description |
|------|-------------|
| `ValueError` | If circular dependencies or missing steps are detected |
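One standard way to realise this check is Kahn's algorithm, sketched below; the `name` and `depends_on` keys are assumptions about the step schema, not confirmed field names:

```python
from collections import deque


def topological_sort(steps):
    """Order steps so each runs after its dependencies; raise ValueError on
    cycles or references to missing steps (Kahn's algorithm sketch)."""
    by_name = {step["name"]: step for step in steps}
    indegree = {name: 0 for name in by_name}
    dependents = {name: [] for name in by_name}
    for step in steps:
        for dep in step.get("depends_on", []):
            if dep not in by_name:
                raise ValueError(f"step {step['name']!r} depends on missing step {dep!r}")
            indegree[step["name"]] += 1
            dependents[dep].append(step["name"])
    queue = deque(name for name, deg in indegree.items() if deg == 0)
    ordered = []
    while queue:
        name = queue.popleft()
        ordered.append(by_name[name])
        for dependent in dependents[name]:  # unblock steps waiting on this one
            indegree[dependent] -= 1
            if indegree[dependent] == 0:
                queue.append(dependent)
    if len(ordered) != len(steps):
        raise ValueError("circular dependency detected among workflow steps")
    return ordered


steps = [{"name": "b", "depends_on": ["a"]}, {"name": "a"}]
print([step["name"] for step in topological_sort(steps)])  # → ['a', 'b']
```

If every step's dependencies can be satisfied, the sort succeeds; any step left unprocessed at the end must sit on a cycle, which is what triggers the `ValueError`.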

workflow_metrics

workflow_metrics()

Retrieve workflow metrics and write to log partitions.

Retrieves workflow metrics from the session context and writes them to all workflow log partitions (both Message and Metric partitions).