Skip to content

Workflow runner

The WorkflowRunner class is compatible with the Workbench Workflow Definition Language (WWDL) schema.

Instantiating a runner

Provided your run-time environment has the necessary environment variables to connect to an Azure Table Storage resource (for persisting log messages), you can create a workflow runner simply by:

from workbench.workflows import WorkflowRunner
from workbench.bindings import AzureBlobSession

session = AzureBlobSession(organization, workspace, session_id, user_id)

# The workflow can be defined in-line, provided it adheres to the schema
workflow = session.workflow

runner = WorkflowRunner(workflow=workflow)

For more details on setting up environment variables, see Local installation.

Workflow context

Some workflows require additional attributes to run. You can pass these attributes to a workflow runner as context when instantiating the workflow runner or by updating the runner's context dictionary.

context = {
    organization: "example-org",
    workspace:"My Test Workspace",
    session_id: "clever-birds-learn".
    user_id: "waad_..."
    }

runner = WorkflowRunner(workflow=workflow, context=context)

Tip

Constructing your workflow to accept a DocumentVersion or Session object via the context is a clean way of scoping your workflow to operate on a particular file or dataset.

Running a workflow

To run a workflow call:

runner.run()

Iterative Workflows

WorkflowRunner automatically handles stages with for_each definitions, enabling efficient processing of collections.

Execution Model

When a stage contains for_each:

  1. Expansion: The iteration definitions are expanded into individual iteration contexts
  2. Parallel Execution: Iterations are submitted to a thread pool for parallel processing
  3. Context Isolation: Each iteration has its own context - step outputs within an iteration don't affect other iterations
  4. Result Collection: After all iterations complete, step outputs are collected into dictionaries keyed by iteration indices

Configuring Parallelism

The max_workers parameter controls how many iterations run concurrently:

# For workflows with many iterations, increase max_workers
runner = WorkflowRunner(
    workflow=workflow,
    context=context,
    max_workers=8  # Default is 4
)

Choose max_workers based on:

  • Number of CPU cores available
  • I/O vs CPU-bound operations in your steps
  • Memory constraints (each iteration maintains its own context)

Context Isolation

Each iteration receives an isolated copy of the workflow context:

  • Global context: Variables like $connection, $organization are readable
  • Iteration variables: Current item aliases (e.g., $current_sheet) and $iteration metadata
  • Step outputs: Outputs from steps within the iteration are local to that iteration

This isolation means:

  • Iterations can safely run in parallel without conflicts
  • Step A in iteration (0,0) and Step A in iteration (0,1) don't interfere
  • Final results are merged back into the main context after all iterations complete

Error Handling

Error handling in iterative stages follows step-level configuration:

  • Steps with on_failure: "fail" will cause their iteration to fail, but other iterations continue
  • Steps with on_failure: "continue" allow the iteration to proceed despite failures
  • After all iterations complete, collected results include only successful outputs

Metrics

Iterative stages log aggregate metrics:

{
  "iterations": {
    "count": 100,
    "successful": 98,
    "failed": 2
  }
}