Workflow runner
The WorkflowRunner class is compatible with the Workbench Workflow Definition Language (WWDL) schema.
Instantiating a runner
Provided your runtime environment defines the environment variables needed to connect to an Azure Table Storage resource (used to persist log messages), you can create a workflow runner as follows:
from workbench.workflows import WorkflowRunner
from workbench.bindings import AzureBlobSession
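# organization, workspace, session_id and user_id are placeholders; supply your own values
session = AzureBlobSession(organization, workspace, session_id, user_id)
# Here the workflow is read from the session; it can also be defined inline, provided it adheres to the schema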
workflow = session.workflow
runner = WorkflowRunner(workflow=workflow)
For more details on setting up environment variables, see Local installation.
Workflow context
Some workflows require additional attributes to run. You can pass these attributes to a workflow runner as context when instantiating the workflow runner or by updating the runner's context dictionary.
context = {
    "organization": "example-org",
    "workspace": "My Test Workspace",
    "session_id": "clever-birds-learn",
    "user_id": "waad_..."
}
runner = WorkflowRunner(workflow=workflow, context=context)
Tip
Constructing your workflow to accept a DocumentVersion or Session object via the context is a clean way of scoping your workflow to operate on a particular file or dataset.
Running a workflow
To run a workflow, call:
runner.run()
Iterative Workflows
WorkflowRunner automatically handles stages with for_each definitions, enabling efficient processing of collections.
Execution Model
When a stage contains for_each:
- Expansion: The iteration definitions are expanded into individual iteration contexts
- Parallel Execution: Iterations are submitted to a thread pool for parallel processing
- Context Isolation: Each iteration has its own context, so step outputs within an iteration don't affect other iterations
- Result Collection: After all iterations complete, step outputs are collected into dictionaries keyed by iteration indices
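The four phases above can be sketched with the standard library. This is a minimal illustration under stated assumptions, not WorkflowRunner's actual implementation: the function names (`expand_iterations`, `run_iteration`, `run_for_each`), the representation of steps as plain callables, and the use of index tuples as keys are all hypothetical.

```python
from concurrent.futures import ThreadPoolExecutor
from copy import deepcopy
from itertools import product


def expand_iterations(collections):
    """Expansion: yield (index_tuple, variables) for every combination of items."""
    names = list(collections)
    index_ranges = [range(len(collections[name])) for name in names]
    for indices in product(*index_ranges):
        variables = {name: collections[name][i] for name, i in zip(names, indices)}
        yield indices, variables


def run_iteration(base_context, variables, steps):
    """Context isolation: each iteration works on its own deep copy."""
    context = deepcopy(base_context)
    context.update(variables)
    outputs = {}
    for name, func in steps.items():
        outputs[name] = func(context)
        context[name] = outputs[name]  # visible to later steps in this iteration only
    return outputs


def run_for_each(base_context, collections, steps, max_workers=4):
    """Parallel execution followed by result collection keyed by iteration indices."""
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        futures = {
            indices: pool.submit(run_iteration, base_context, variables, steps)
            for indices, variables in expand_iterations(collections)
        }
        per_iteration = {indices: future.result() for indices, future in futures.items()}

    collected = {}
    for indices, outputs in per_iteration.items():
        for step_name, value in outputs.items():
            collected.setdefault(step_name, {})[indices] = value
    return collected


# Hypothetical usage: two sheets x two rows gives four iterations
collections = {"sheet": ["a", "b"], "row": [1, 2]}
steps = {"label": lambda ctx: f"{ctx['sheet']}{ctx['row']}"}
result = run_for_each({"organization": "example-org"}, collections, steps)
```

Here `result["label"]` is a dictionary with one entry per iteration, keyed by index tuples such as `(0, 1)`.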
Configuring Parallelism
The max_workers parameter controls how many iterations run concurrently:
# For workflows with many iterations, increase max_workers
runner = WorkflowRunner(
workflow=workflow,
context=context,
max_workers=8 # Default is 4
)
Choose max_workers based on:
- Number of CPU cores available
- I/O vs CPU-bound operations in your steps
- Memory constraints (each iteration maintains its own context)
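As a rough starting point, the three considerations above can be combined into a small heuristic. This is only a sketch: `suggest_max_workers` is a hypothetical helper, not part of the workbench package, and the multiplier of 4 for I/O-bound work is an arbitrary assumption that you should tune by measurement.

```python
import os


def suggest_max_workers(iteration_count, io_bound=True, memory_cap=None):
    """Suggest a max_workers value (hypothetical heuristic, not a library function).

    iteration_count: number of iterations the stage will expand into
    io_bound: True if steps mostly wait on network or disk
    memory_cap: optional upper bound derived from how many contexts fit in memory
    """
    cores = os.cpu_count() or 1
    # I/O-bound threads spend most of their time waiting, so oversubscribe the cores
    workers = cores * 4 if io_bound else cores
    if memory_cap is not None:
        workers = min(workers, memory_cap)
    # Never more workers than iterations, and always at least one
    return max(1, min(workers, iteration_count))
```

The returned value can then be passed as `max_workers` when constructing the runner.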
Context Isolation
Each iteration receives an isolated copy of the workflow context:
- Global context: Variables like $connection and $organization are readable
- Iteration variables: Current item aliases (e.g., $current_sheet) and $iteration metadata
- Step outputs: Outputs from steps within the iteration are local to that iteration
This isolation means:
- Iterations can safely run in parallel without conflicts
- Step A in iteration (0,0) and Step A in iteration (0,1) don't interfere
- Final results are merged back into the main context after all iterations complete
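To make the isolation concrete, the sketch below builds two iteration contexts from one global context with `copy.deepcopy` and then merges their outputs back. The helper `make_iteration_context`, the shape of the `iteration` metadata, and the merge format are assumptions made for illustration; the runner's internal representation may differ.

```python
from copy import deepcopy

global_context = {"organization": "example-org", "settings": {"retries": 1}}


def make_iteration_context(base, alias, item, indices):
    """Return an isolated context for one iteration (hypothetical helper)."""
    context = deepcopy(base)  # nested values are copied too, not shared
    context[alias] = item
    context["iteration"] = {"indices": indices}  # assumed metadata shape
    return context


ctx_a = make_iteration_context(global_context, "current_sheet", "Sheet1", (0, 0))
ctx_b = make_iteration_context(global_context, "current_sheet", "Sheet2", (0, 1))

# Changes made in iteration (0, 0) stay local to it
ctx_a["settings"]["retries"] = 5
ctx_a["step_a"] = "rows from Sheet1"
ctx_b["step_a"] = "rows from Sheet2"

# After both iterations finish, merge step outputs back, keyed by iteration indices
merged = deepcopy(global_context)
merged["step_a"] = {
    ctx["iteration"]["indices"]: ctx["step_a"] for ctx in (ctx_a, ctx_b)
}
```

A shallow copy would not be enough here: the nested `settings` dictionary would be shared, and the change to `retries` in one iteration would leak into the other.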
Error Handling
Error handling in iterative stages follows step-level configuration:
- Steps with on_failure: "fail" will cause their iteration to fail, but other iterations continue
- Steps with on_failure: "continue" allow the iteration to proceed despite failures
- After all iterations complete, collected results include only successful outputs
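The behaviour described above can be sketched as follows. This is an illustration, not the runner's code: steps are modelled as dictionaries with hypothetical `name`, `run` and `on_failure` keys, `"fail"` is assumed to be the default, and "successful outputs" is interpreted as the outputs of iterations that did not fail.

```python
def run_iteration(steps, context):
    """Run one iteration's steps, honouring each step's on_failure setting."""
    outputs = {}
    for step in steps:
        try:
            outputs[step["name"]] = step["run"](context)
        except Exception as exc:
            if step.get("on_failure", "fail") == "continue":
                continue  # skip this step's output, keep going
            # on_failure == "fail": stop this iteration only
            return {"ok": False, "outputs": outputs, "error": str(exc)}
    return {"ok": True, "outputs": outputs, "error": None}


def collect_successful(steps, contexts):
    """Run every iteration and keep outputs from the ones that succeeded."""
    results = {indices: run_iteration(steps, ctx) for indices, ctx in contexts.items()}
    return {indices: r["outputs"] for indices, r in results.items() if r["ok"]}


# Hypothetical usage: the second iteration divides by zero and fails
strict_steps = [{"name": "invert", "run": lambda c: 1 // c["x"], "on_failure": "fail"}]
contexts = {(0, 0): {"x": 1}, (0, 1): {"x": 0}}
collected = collect_successful(strict_steps, contexts)

# With on_failure "continue", the same error does not fail the iteration
lenient_steps = [
    {"name": "invert", "run": lambda c: 1 // c["x"], "on_failure": "continue"},
    {"name": "echo", "run": lambda c: c["x"]},
]
lenient_result = run_iteration(lenient_steps, {"x": 0})
```

In this sketch, a failure in iteration `(0, 1)` leaves iteration `(0, 0)` untouched, which mirrors the rule that other iterations continue.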
Metrics
Iterative stages log aggregate metrics:
{
"iterations": {
"count": 100,
"successful": 98,
"failed": 2
}
}
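A structure like the one above could be derived from per-iteration outcomes as shown in this sketch. The function `summarize_iterations` and its input format (a mapping from iteration index to a success flag) are hypothetical; only the shape of the returned dictionary is taken from the example above.

```python
def summarize_iterations(outcomes):
    """Build the aggregate metrics dictionary from {iteration_index: succeeded}."""
    successful = sum(1 for succeeded in outcomes.values() if succeeded)
    return {
        "iterations": {
            "count": len(outcomes),
            "successful": successful,
            "failed": len(outcomes) - successful,
        }
    }


# Hypothetical usage: 100 iterations, two of which failed
outcomes = {i: i not in (3, 7) for i in range(100)}
metrics = summarize_iterations(outcomes)
```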