Microsoft Azure

AzureAILanguageClient

Basic wrapper around the Azure TextAnalyticsClient.

https://learn.microsoft.com/en-us/python/api/overview/azure/ai-textanalytics-readme?view=azure-python

__init__

__init__(api_key=None, azure_endpoint=None)

Initialize client.

Parameters:

Name Type Description Default
api_key str | None

Credential for using the service. Defaults to environment variable AZURE_AI_LANGUAGE_KEY, then AZURE_FOUNDRY_API_KEY.

None
azure_endpoint str | None

URL of the AI Language resource, for example https://your-resource.cognitiveservices.azure.com/. Defaults to environment variable AZURE_AI_LANGUAGE_ENDPOINT, then AZURE_AI_ENDPOINT.

None
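
The fallback order described for api_key and azure_endpoint (explicit argument first, then each environment variable in turn) can be sketched as follows. This is a minimal illustration, not the wrapper's own code: the helper name resolve_setting and the injected environ mapping are hypothetical, and only the environment variable names come from this page.

```python
import os


def resolve_setting(explicit, env_names, environ=None):
    """Return the explicit value if given, else the first non-empty env var.

    Hypothetical helper for illustration; not part of the documented client.
    """
    environ = os.environ if environ is None else environ
    if explicit:
        return explicit
    for name in env_names:
        value = environ.get(name)
        if value:
            return value
    return None


# An injected mapping keeps the sketch independent of real credentials.
fake_env = {"AZURE_FOUNDRY_API_KEY": "foundry-key"}
api_key = resolve_setting(
    None, ["AZURE_AI_LANGUAGE_KEY", "AZURE_FOUNDRY_API_KEY"], environ=fake_env
)
```

Here AZURE_AI_LANGUAGE_KEY is unset, so the lookup falls through to AZURE_FOUNDRY_API_KEY.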

AzureBlobStorageClient

Wrapper around the Azure BlobServiceClient that implements additional specialist methods.

https://learn.microsoft.com/en-us/python/api/overview/azure/storage-blob-readme?view=azure-python

Attributes:

Name Type Description
organization str

Organization containing the workspace session to connect to. Must match an Azure Blob Storage container name.

workspace str

The workspace containing the session.

session_id str

The id of the session.

directory str

Default directory where new blobs will be created. Concatenation of workspace and session_id.

connection_string str

Credential for connecting to the Azure Blob Storage resource.

url_prefix str

The root URL to the Azure Blob Storage resource.

client BlobServiceClient

Blob service client. Use to access all sub methods.

__init__

__init__(organization, workspace, session_id, connection_string=None, url_prefix=None, connection_pool_maxsize=DEFAULT_CONNECTION_POOL_MAXSIZE)

Initialize client.

Parameters:

Name Type Description Default
organization str

Organization containing the workspace session to connect to. Must match an Azure Blob Storage container name.

required
workspace str

The workspace containing the session. Must match a folder in the parent container.

required
session_id str

The id of the session. Must match a folder in the parent workspace folder in the organization container.

required
connection_string str | None

Credential for connecting to the Azure Blob Storage resource. Defaults to environment variable BLOB_CONNECTION_STRING.

None
url_prefix str | None

The root URL to the Azure Blob Storage resource. Defaults to environment variable BLOB_URL_PREFIX.

None
connection_pool_maxsize int

Per-host HTTP connection-pool size for the underlying blob service client. Set this to at least the concurrency of any parallel operation run through this client so that threads don't contend for the default 10-slot pool.

DEFAULT_CONNECTION_POOL_MAXSIZE

Raises:

Type Description
ValueError

If required parameters are not provided and environment variables are not set.

batch_download_blob_jsons

batch_download_blob_jsons(blob_names, max_workers=4, max_retries=3)

Download multiple JSON blobs in parallel with retry logic.

Parameters:

Name Type Description Default
blob_names list[str]

List of blob paths to download.

required
max_workers int

Number of parallel download threads (default 4 to avoid overwhelming Azure connections).

4
max_retries int

Maximum retry attempts per blob on transient failures.

3

Returns:

Type Description
dict[str, Any]

Dictionary mapping blob_name to parsed JSON content. Failed downloads are logged and excluded from results.
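
The behaviour described for batch_download_blob_jsons (parallel workers, per-blob retries, failures logged and left out of the result) can be sketched with the standard library. This is a hedged illustration only: download_with_retry, batch_download and fake_download are hypothetical names, the sketch treats every exception as transient, and the real method's retry policy may differ.

```python
import logging
import time
from concurrent.futures import ThreadPoolExecutor, as_completed

logger = logging.getLogger(__name__)


def download_with_retry(download, blob_name, max_retries=3, base_delay=0.0):
    """Call download(blob_name), making up to max_retries attempts."""
    for attempt in range(1, max_retries + 1):
        try:
            return download(blob_name)
        except Exception:
            if attempt == max_retries:
                raise  # attempts exhausted: let the caller decide what to do
            time.sleep(base_delay * 2 ** (attempt - 1))


def batch_download(download, blob_names, max_workers=4, max_retries=3):
    """Download blobs in parallel; failures are logged and excluded."""
    results = {}
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        futures = {
            pool.submit(download_with_retry, download, name, max_retries): name
            for name in blob_names
        }
        for future in as_completed(futures):
            name = futures[future]
            try:
                results[name] = future.result()
            except Exception as exc:
                logger.warning("Failed to download %s: %s", name, exc)
    return results


# Stand-in downloader: "bad.json" always fails, everything else succeeds.
def fake_download(name):
    if name == "bad.json":
        raise IOError("simulated failure")
    return {"name": name}


downloaded = batch_download(fake_download, ["a.json", "bad.json", "b.json"])
```

Callers that need to know which blobs failed can compare the keys of the returned dictionary against the requested names.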

download_blob_json

download_blob_json(blob_name)

Download a JSON blob and return its parsed content.

Parameters:

Name Type Description Default
blob_name str

The path to the blob in the container.

required

Returns:

Type Description
Any

The JSON content of the blob, likely a list or dictionary.

get_blob_client

get_blob_client(blob_name)

Return a client for interacting with blob objects.

Parameters:

Name Type Description Default
blob_name str

Name of the blob.

required

Returns:

Type Description
BlobClient

Blob client.

get_signed_url

get_signed_url(blob_name, minutes=720)

Generate a signed URL for an Azure Blob Storage object, valid for the specified duration.

Parameters:

Name Type Description Default
blob_name str

Name of the blob.

required
minutes int

Number of minutes for which the URL remains valid.

720

Returns:

Type Description
str

The signed URL.
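
The default of 720 minutes corresponds to 12 hours. A minimal sketch of the expiry arithmetic, assuming the validity window is measured from the time of the call in UTC (this page does not state how the expiry is anchored); signed_url_expiry is a hypothetical helper, not a method of the client.

```python
from datetime import datetime, timedelta, timezone


def signed_url_expiry(minutes=720, now=None):
    """Return the moment a URL issued at `now` would stop being valid."""
    now = datetime.now(timezone.utc) if now is None else now
    return now + timedelta(minutes=minutes)


issued = datetime(2024, 1, 1, 9, 0, tzinfo=timezone.utc)
expires = signed_url_expiry(now=issued)  # 12 hours after 09:00 UTC
```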

index_blobs

index_blobs(all_blobs, document_ids)

Index a pre-fetched blob list by document ID. Pure, no I/O.

Lets callers that already hold a full LIST (e.g. Session.initialize) build the child-blob index without paying for a second LIST.

Parameters:

Name Type Description Default
all_blobs list[BlobProperties]

Blobs to index, typically from list_all_blobs().

required
document_ids list[str]

Document IDs (full path prefixes) to index against.

required

Returns:

Type Description
dict[str, list[BlobProperties]]

Dictionary mapping document ID to list of child BlobProperties. Blobs that don't match any document ID are not included.
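
A pure, I/O-free index of this kind can be sketched as below. The sketch assumes that a blob is a child of a document when its name starts with the document ID, and that documents without children map to an empty list; neither detail is confirmed by this page. FakeBlob is a hypothetical stand-in for BlobProperties that carries only a name.

```python
from dataclasses import dataclass


@dataclass
class FakeBlob:
    """Stand-in for BlobProperties; only the name attribute is used here."""
    name: str


def index_by_document(all_blobs, document_ids):
    """Group blobs under the first document ID that prefixes their name."""
    index = {doc_id: [] for doc_id in document_ids}
    for blob in all_blobs:
        for doc_id in document_ids:
            if blob.name.startswith(doc_id):
                index[doc_id].append(blob)
                break  # unmatched blobs are simply never added
    return index


blobs = [
    FakeBlob("ws/s1/doc_a/page_1.json"),
    FakeBlob("ws/s1/doc_a/page_2.json"),
    FakeBlob("ws/s1/doc_b/page_1.json"),
    FakeBlob("ws/s1/other/readme.txt"),
]
index = index_by_document(blobs, ["ws/s1/doc_a", "ws/s1/doc_b"])
```

Plain prefix matching would also file "ws/s1/doc_a2/..." under "ws/s1/doc_a", so a real implementation may need to match on a path separator as well.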

list_all_blobs

list_all_blobs(max_retries=3)

List every blob under the session directory in a single API call.

Resilient to transient listing failures (exponential back-off). Raises the last exception once retries are exhausted so callers can fail loud; an empty session simply returns an empty list (no exception).

Parameters:

Name Type Description Default
max_retries int

Maximum retry attempts on transient failures.

3

Returns:

Type Description
list[BlobProperties]

All blobs under {directory}/, unfiltered.
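
The retry behaviour described above (back off exponentially, then raise the last exception once retries are exhausted) can be sketched as follows. The helper call_with_backoff is hypothetical; treating max_retries as the total number of attempts, the 1-second starting delay and the doubling factor are assumptions, since this page only says "exponential back-off".

```python
import time


def call_with_backoff(operation, max_retries=3, initial_delay=1.0, sleep=time.sleep):
    """Run operation(); on failure wait, doubling the delay after each failure."""
    delay = initial_delay
    for attempt in range(1, max_retries + 1):
        try:
            return operation()
        except Exception:
            if attempt == max_retries:
                raise  # retries exhausted: surface the last exception
            sleep(delay)
            delay *= 2


# Stand-in listing call that fails twice, then returns an empty listing.
calls = {"count": 0}


def flaky_list():
    calls["count"] += 1
    if calls["count"] < 3:
        raise ConnectionError("simulated transient failure")
    return []


waits = []  # record the delays instead of actually sleeping
result = call_with_backoff(flaky_list, sleep=waits.append)
```

An empty listing is an ordinary return value here, which matches the note that an empty session does not raise.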

list_all_blobs_indexed

list_all_blobs_indexed(document_ids, max_retries=3)

List all blobs in the session directory and index them by document ID.

This method fetches all blobs in a single API call and builds an index mapping each document ID to its child blobs. This is much more efficient than calling list_blobs_with_prefix() for each document individually.

Parameters:

Name Type Description Default
document_ids list[str]

List of document IDs to build the index for. Each ID should be the full path prefix for that document.

required
max_retries int

Maximum retry attempts on transient failures.

3

Returns:

Type Description
dict[str, list[BlobProperties]]

Dictionary mapping document ID to list of child BlobProperties. Blobs that don't match any document ID are not included. Returns an empty index if listing fails after all retries (allows partial processing for callers that prefer degradation over raising).

list_blobs_in_directory

list_blobs_in_directory(ignore_files=None)

Lists all blobs in the client directory within an Azure Blob container.

Parameters:

Name Type Description Default
ignore_files list[str] | None

Blob names containing any string in this list are excluded from the returned list.

None

Returns:

Type Description
list[BlobProperties]

A list of blobs in the client directory.

list_blobs_with_prefix

list_blobs_with_prefix(prefix, ignore_files=None)

Lists all blobs in the Azure Blob container with the supplied prefix.

Parameters:

Name Type Description Default
prefix str

The prefix to search for.

required
ignore_files list[str] | None

Blob names containing any string in this list are excluded from the returned list.

None

Returns:

Type Description
list[BlobProperties]

A list of blobs with the matching prefix.
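
The ignore_files rule shared by list_blobs_in_directory and list_blobs_with_prefix is a substring test rather than a glob or exact match. A minimal sketch over plain blob names; filter_ignored is a hypothetical helper and the file names are made up for illustration.

```python
def filter_ignored(blob_names, ignore_files=None):
    """Drop any name that contains one of the ignore_files fragments."""
    ignore_files = ignore_files or []
    return [
        name
        for name in blob_names
        if not any(fragment in name for fragment in ignore_files)
    ]


names = ["ws/s1/doc.pdf", "ws/s1/doc.pdf.meta.json", "ws/s1/.DS_Store"]
kept = filter_ignored(names, ignore_files=[".meta.json", ".DS_Store"])
```

Because the match is on substrings, a short fragment such as "doc" would exclude every name in this example.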

upload_blob_json

upload_blob_json(blob_name, blob_content)

Upload JSON content to a blob in the container.

Parameters:

Name Type Description Default
blob_name str

The path to the blob in the container.

required
blob_content str

The JSON-formatted content to be uploaded.

required

Returns:

Type Description
dict[str, Any]

Blob updated property dictionary.

AzureKeyVaultClient

Wrapper around the Azure SecretClient. Uses DefaultAzureCredential for credential, and therefore expects either a managed identity or an identity currently logged into Azure CLI.

https://learn.microsoft.com/en-us/python/api/overview/azure/key-vault?view=azure-python

__init__

__init__(vault_url=None)

Initialize client.

Parameters:

Name Type Description Default
vault_url str | None

URL of the Azure Key Vault resource, e.g. https://your-resource.vault.azure.net/. Defaults to environment variable KEY_VAULT_NAME.

None

Raises:

Type Description
ValueError

If vault_url is not provided and KEY_VAULT_NAME environment variable is not set.

get_secret

get_secret(secret_name)

Retrieve a secret from the Key Vault.

Parameters:

Name Type Description Default
secret_name str

Name of the secret to retrieve

required

Returns:

Type Description
str | None

Retrieved secret value, or None if not found

set_secret

set_secret(secret_name, secret_value)

Set a secret in the Key Vault.

Parameters:

Name Type Description Default
secret_name str

Name of the secret to set

required
secret_value str

Value of the secret

required

Returns:

Type Description
bool

True if operation successful

AzureOpenAIClient

A wrapper around the AzureOpenAI class.

https://github.com/openai/openai-python?tab=readme-ov-file#microsoft-azure-openai

Attributes:

Name Type Description
api_key str

API key for Azure resource. If not provided will default to environment variable AZURE_FOUNDRY_API_KEY, then AZURE_OPENAI_API_KEY.

azure_endpoint str

Your Azure endpoint, including the resource, e.g. https://example-resource.azure.openai.com/. If not provided will default to environment variable AZURE_OPENAI_ENDPOINT.

api_version str

API version for Azure resource. If not provided will default to environment variable OPENAI_API_VERSION, then 2024-10-21.

model str

Model deployment name within the Azure resource. If not provided will default to environment variable AZURE_OPENAI_DEPLOYMENT.

response_format dict[Any, Any] | None

The type of response to request from the client. For example for JSON: { "type": "json_object" }.

client

The AzureOpenAI client. Can be used to access other sub methods.

__del__

__del__()

Destructor to ensure cleanup if close() wasn't called.

__enter__

__enter__()

Context manager entry point.

__exit__

__exit__(exc_type, exc_val, exc_tb)

Context manager exit point - ensures connections are cleaned up.

__init__

__init__(api_key=None, azure_endpoint=os.getenv('AZURE_OPENAI_ENDPOINT'), api_version=None, model=os.getenv('AZURE_OPENAI_DEPLOYMENT'), response_format=None, temperature=0.1, max_connections=50, max_keepalive_connections=20, timeout=600.0)

Initialize the client with connection pooling configuration.

Parameters:

Name Type Description Default
api_key str | None

API key for Azure resource. If not provided will default to environment variable AZURE_FOUNDRY_API_KEY, then AZURE_OPENAI_API_KEY.

None
azure_endpoint str | None

Your Azure endpoint, including the resource, e.g. https://example-resource.azure.openai.com/. If not provided will default to environment variable AZURE_OPENAI_ENDPOINT.

os.getenv('AZURE_OPENAI_ENDPOINT')
api_version str | None

API version for Azure resource. If not provided will default to environment variable OPENAI_API_VERSION, then 2024-10-21.

None
model str | None

Model deployment name within the Azure resource. If not provided will default to environment variable AZURE_OPENAI_DEPLOYMENT.

os.getenv('AZURE_OPENAI_DEPLOYMENT')
response_format dict[Any, Any] | None

The type of response to request from the client. For example for JSON: { "type": "json_object" }.

None
temperature float | None

Sampling temperature for chat completions. Defaults to 0.1 — low enough for near-deterministic factual extraction, with a small amount of stochasticity retained. Pass None to defer to the API default. Pass a higher value (e.g. 0.7) per-instance for creativity-leaning operations such as description generation.

0.1
max_connections int

Maximum number of concurrent connections (default: 50).

50
max_keepalive_connections int

Maximum number of keepalive connections to maintain (default: 20).

20
timeout float

Read/write timeout in seconds (default: 600.0 / 10 minutes, matching OpenAI defaults).

600.0

call_chat

call_chat(messages, max_retries=5, max_completion_tokens=None)

Call the chat completions API.

Parameters:

Name Type Description Default
messages list[dict[str, Any]]

List of dictionary objects specifying the messages to send. Messages must adhere to the prompting standard.

required
max_retries int

Number of times to call the API before raising an error.

5
max_completion_tokens int | None

Maximum number of tokens to generate in the response. If None, uses API default.

None

Returns:

Type Description
str

Response from the chat API.

Raises:

Type Description
RuntimeError

If the number of attempts exceeds max_retries.

call_embedding

call_embedding(batch, max_retries=5)

Call the embeddings API.

Parameters:

Name Type Description Default
batch list[str]

List of strings to embed

required
max_retries int

Number of times to call the API before raising an error.

5

Returns:

Type Description
list[list[float]]

List of embeddings

Raises:

Type Description
RuntimeError

If the number of attempts exceeds max_retries.

close

close()

Explicitly close the OpenAI client and release all connections.

Call this method when you're done using the client to ensure connections are properly cleaned up, especially in high-concurrency scenarios.
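
The close(), __enter__, __exit__ and __del__ methods together form a common resource-cleanup pattern. The sketch below mimics that pattern with a hypothetical stand-in class, PooledClient, so that it runs without any Azure resource. It assumes that __enter__ returns the client itself and that close() is safe to call more than once; this page confirms neither.

```python
class PooledClient:
    """Hypothetical stand-in that mimics the close/context-manager pattern."""

    def __init__(self):
        self.closed = False

    def close(self):
        # Idempotent: a second call is harmless.
        self.closed = True

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.close()
        return False  # do not suppress exceptions raised in the with-block

    def __del__(self):
        # Safety net only; garbage-collection timing is not guaranteed.
        self.close()


with PooledClient() as client:
    inside = client.closed  # still open inside the block
after = client.closed  # closed as soon as the block exits
```

Using the with-statement releases connections at a known point, whereas relying on the destructor leaves the timing to the garbage collector.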

AzureTableLogHandler

Bases: Handler

Client to write workflow logs to an Azure Table Storage resource, adopting a log key and partition strategy for fast search/filtering across organizations, workspaces and sessions.

https://learn.microsoft.com/en-us/python/api/overview/azure/tables?view=azure-python

Attributes:

Name Type Description
table_name

Name of the log table.

host

The hostname of the host running the process.

start_time

When the logger was initialized. Used to set workflow start time.

parent

Parent WorkflowRunner object. Used to access workflow context.

service_client

Client for interacting with Azure Table Storage resource.

table_client

Client for interacting with table in the Azure Table Storage resource.

__init__

__init__(parent, connection_string=os.environ.get('BLOB_CONNECTION_STRING'), table_name='WorkflowLogs', host=socket.gethostname())

Initialize the client.

Parameters:

Name Type Description Default
parent WorkflowRunner

Parent WorkflowRunner object. Used to access workflow context.

required
connection_string str | None

Credential for connecting to Azure Table Storage resource. Defaults to environment variable BLOB_CONNECTION_STRING.

os.environ.get('BLOB_CONNECTION_STRING')
table_name str

Name of the log table.

'WorkflowLogs'
host str

The hostname of the host running the process.

socket.gethostname()

emit

emit(record)

Emits log messages to Table Storage, duplicating across partitions and indexing chronologically:

  1. messages
    • General log messages
  2. {organization}_{workspace}_messages
    • General log messages for a workspace
  3. {organization}_{workspace}_{session_id}_messages
    • General log messages for a session

emit_metrics

emit_metrics(record)

Emits log messages to Table Storage, duplicating across partitions and indexing chronologically:

  1. metrics
    • Metrics on document counts, processing time etc
  2. {organization}_metrics
    • Metrics for an organization
  3. {organization}_{workspace}_metrics
    • Metrics for a workspace
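
The partition fan-out listed for emit and emit_metrics can be sketched as two small functions that expand one record's context into the partition names it is written to. The function names and the sample values are hypothetical; only the naming templates come from this page, and the row-key scheme used for chronological ordering is not shown because the page does not describe it.

```python
def message_partitions(organization, workspace, session_id):
    """Partition names that a general log message is duplicated across."""
    return [
        "messages",
        f"{organization}_{workspace}_messages",
        f"{organization}_{workspace}_{session_id}_messages",
    ]


def metric_partitions(organization, workspace):
    """Partition names that a metrics record is duplicated across."""
    return [
        "metrics",
        f"{organization}_metrics",
        f"{organization}_{workspace}_metrics",
    ]


msg_parts = message_partitions("acme", "tower", "s42")
metric_parts = metric_partitions("acme", "tower")
```

Writing each record to every level lets a query read a single partition whether it is scoped globally or to one organization, workspace or session.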

AzureVectorStorageClient

Wrapper around the Azure SearchClient with specialist methods for HARDR classification.

This client creates a single SearchClient instance that is reused across all calls, making it thread-safe and efficient for concurrent operations.

https://learn.microsoft.com/en-us/python/api/overview/azure/search-documents-readme?view=azure-python

__del__

__del__()

Destructor to ensure cleanup if close() wasn't called.

__enter__

__enter__()

Context manager entry point.

__exit__

__exit__(exc_type, exc_val, exc_tb)

Context manager exit point - ensures connections are cleaned up.

__init__

__init__(index_name='uniclass-prod', endpoint=None, connection_timeout=10.0, read_timeout=120.0, max_pool_size=50)

Initialize the client with a reusable SearchClient connection and connection pooling.

Parameters:

Name Type Description Default
index_name str

The vector index to connect to.

'uniclass-prod'
endpoint str | None

The URL for the Azure AI Search resource. Defaults to environment variable AZURE_SEARCH_ENDPOINT.

None
connection_timeout float

Connection timeout in seconds (default: 10.0).

10.0
read_timeout float

Read timeout in seconds (default: 120.0).

120.0
max_pool_size int

Maximum number of connections in the pool (default: 50).

50

Raises:

Type Description
ValueError

If endpoint is not provided and environment variable is not set.

close

close()

Explicitly close the search client and transport, releasing all connections.

Call this method when you're done using the client to ensure connections are properly cleaned up, especially in high-concurrency scenarios.

create_or_update_index

create_or_update_index(index)

Create or update a search index (idempotent).

Generic provisioning entry point; pass the result of build_fingerprint_index() for the duplicate-detection index. Safe to call repeatedly; run once per environment from a provisioning script/notebook, not on the hot path.

get_document

get_document(key, selected_fields=None)

Fetch a single index document by key, or None if it does not exist.

Used by the duplicate-search layer to load the query document's stored artefacts (hashes, MinHash signature, embeddings) before fanning out the Set A / Set B sub-queries.

neighbours_from_text

neighbours_from_text(text, filter, top=10, vector_fields=None, scoring_profile='default')

Retrieve the top 'n' nearest neighbours to an input text query.

Parameters:

Name Type Description Default
text str

Text to search for

required
filter str

OData filter query to limit the scope of the search. For example, for a Uniclass index, to scope to the Materials table use "subsystem eq 'Materials'".

required
top int

The number of matches to return.

10
vector_fields list[str] | None

The vector fields to include in the search. Must be at least three vector fields. Each vector field is weighted differently in the search results: the first at 2.0, the second at 0.5, and the third at 1.0.

None
scoring_profile str

The name of the scoring profile to apply to the search.

'default'

Returns:

Type Description
list[dict[str, Any]]

List of nearest neighbours. Each item is a dictionary with the following fields:

  • code (str): The ID or reference code for the item
  • title (str): Plain-text descriptor for the item
  • examples (str): Extended description of the item
  • similarity (float): Similarity score
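
Two inputs to neighbours_from_text are easy to get wrong: the OData filter string and the order of vector_fields, which determines the weight each field receives. The sketch below builds both. The helpers odata_eq and weight_vector_fields and the vector field names are hypothetical; the weights 2.0, 0.5 and 1.0 come from the parameter description, and pairing them with only the first three fields is an assumption. Doubling a single quote is how OData string literals escape it.

```python
def odata_eq(field, value):
    """Build an OData equality filter, doubling single quotes in the value."""
    escaped = value.replace("'", "''")
    return f"{field} eq '{escaped}'"


def weight_vector_fields(vector_fields, weights=(2.0, 0.5, 1.0)):
    """Pair the first three vector fields with the documented weights, in order."""
    if len(vector_fields) < 3:
        raise ValueError("at least three vector fields are required")
    return list(zip(vector_fields, weights))


materials_filter = odata_eq("subsystem", "Materials")
# Field names below are made up for illustration.
weighted = weight_vector_fields(["title_vector", "examples_vector", "code_vector"])
```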

search_documents

search_documents(filter, select=None, top=1000)

Return all documents matching an ODATA filter (no scoring).

A plain filtered fetch used for the deterministic Set A tiers — exact hash equality and LSH-band candidate retrieval. search_text="*" with no vector query means results are filter-only.

semantic_search

semantic_search(text, filter, top=10, scoring_profile='default', semantic_configuration='default', vector_search=False, max_retries=3, initial_delay=1.0)

Retrieve the top 'n' semantic search matches with exponential backoff retry logic.

Parameters:

Name Type Description Default
text str

Text to search for

required
filter str

OData filter query to limit the scope of the search. For example, for a Uniclass index, to scope to the Materials table use "subsystem eq 'Materials'".

required
top int

The number of matches to return.

10
scoring_profile str

Profile for weighting search fields and applying boosting

'default'
semantic_configuration str

Describe the title, content, and keywords fields that will be used for semantic ranking, captions, highlights, and answers.

'default'
vector_search bool

Whether to include vector search in the query

False
max_retries int

Maximum number of retry attempts (default 3)

3
initial_delay float

Initial delay in seconds for exponential backoff (default 1.0)

1.0

Returns:

Dictionary of nearest neighbours. Items have the following fields:

  • code (str): The ID or reference code for the item
  • title (str): Plain-text descriptor for the item
  • examples (str): Extended description of the item
  • similarity (float): Similarity score

upsert

upsert(records, max_retries=3, initial_delay=1.0)

Idempotently upsert documents into this client's index with backoff.

Uses merge_or_upload_documents, so records sharing an existing key overwrite rather than duplicate (the fingerprint key is deterministic; see workbench.generators.fingerprint.fingerprint_key).

Parameters:

Name Type Description Default
records list[dict[str, Any]]

Index documents to upsert.

required
max_retries int

Maximum attempts on transient errors.

3
initial_delay float

Initial backoff delay in seconds (doubles each retry).

1.0

Returns:

Type Description
list[Any]

The per-document upload results from Azure AI Search.
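
Why a merge-or-upload operation keyed on a deterministic ID is idempotent can be shown with an in-memory stand-in for the index. This is a sketch of the semantics only, not the Azure SDK call: the function below is a local imitation, and the key field name "id" and the sample records are hypothetical.

```python
def merge_or_upload(index, records, key_field="id"):
    """Merge each record into the entry with the same key, or create it."""
    for record in records:
        key = record[key_field]
        index[key] = {**index.get(key, {}), **record}
    return index


store = {}
merge_or_upload(store, [{"id": "doc-1", "hash": "abc"}])
merge_or_upload(store, [{"id": "doc-1", "hash": "abc"}])  # repeat: no duplicate
merge_or_upload(store, [{"id": "doc-1", "title": "Spec"}])  # adds a field
```

Repeating the same upsert leaves the store unchanged, which is what makes retrying a failed batch safe.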

weighted_vector_search

weighted_vector_search(vector_queries, filter, top=10, k_nearest_neighbors=50, select=None)

Combined multi-field weighted vector search in a single request.

Each (vector, field, weight) becomes a VectorizedQuery (a precomputed embedding, not the VectorizableTextQuery server-side path used by the Uniclass/NRM classifiers). Azure fuses the per-field result sets via Reciprocal Rank Fusion, scaled by the weights — mirroring the multi-field pattern in neighbours_from_text. The fused @search.score (an RRF score, not a raw cosine) is exposed as similarity. Backs the semantic Set B (related) tier.

k_nearest_neighbors (the ANN retrieval depth per field) is held independent of top so the fused ranking is stable: a larger top reveals more of the same ordering rather than re-fusing a different candidate set and reshuffling the head. It is raised to top when top is larger, so the request can always return top results.
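
The two ideas above, raising the retrieval depth to top when needed and fusing per-field rankings by weighted Reciprocal Rank Fusion, can be sketched offline. This is an illustration under stated assumptions, not Azure's implementation: the constant rrf_k=60 and the exact way a weight scales a field's contribution are assumptions, and effective_k and weighted_rrf are hypothetical names.

```python
def effective_k(top, k_nearest_neighbors=50):
    """Retrieval depth per field: fixed, but never smaller than top."""
    return max(k_nearest_neighbors, top)


def weighted_rrf(ranked_lists, weights, rrf_k=60):
    """Fuse rankings: each hit adds weight / (rrf_k + rank) to its document."""
    scores = {}
    for ranking, weight in zip(ranked_lists, weights):
        for rank, doc_id in enumerate(ranking, start=1):
            scores[doc_id] = scores.get(doc_id, 0.0) + weight / (rrf_k + rank)
    return sorted(scores.items(), key=lambda item: item[1], reverse=True)


# Two per-field rankings; the first field carries twice the weight.
fused = weighted_rrf([["a", "b", "c"], ["b", "a", "d"]], weights=[2.0, 1.0])
fused_order = [doc_id for doc_id, _ in fused]
```

The fused values are rank-based scores rather than cosine similarities, so they are meaningful only for ordering results within one query.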

build_fingerprint_index

build_fingerprint_index(name=DEFAULT_DUPLICATES_INDEX, dimensions=DEFAULT_EMBEDDING_DIMENSIONS)

Build the SearchIndex definition for the fingerprint index.

Returned (rather than created) so provisioning scripts, tests and the live client all share one schema. See the duplicate-detection spec §6.2. The deterministic record key is produced by workbench.generators.fingerprint.fingerprint_key.