
Workflows contain the Temporal-based orchestration logic that sequences and manages the execution of activities to accomplish larger tasks such as metadata extraction or query log mining. They provide reliability, state management, and configuration handling for long-running processes.

WorkflowInterface

Abstract class (module: application_sdk.workflows)

Abstract base class that defines the contract for all workflow implementations. All workflows must inherit from this class and implement the required methods. Provides base orchestration logic for retrieving workflow arguments, executing preflight checks, and managing activity execution with retry policies and timeouts.

Properties

activities_cls: Type[ActivitiesInterface] (required)
The activities class containing the activity methods for this workflow.

default_heartbeat_timeout: timedelta
Default heartbeat timeout for activity execution (default: from the HEARTBEAT_TIMEOUT constant).

default_start_to_close_timeout: timedelta
Default start-to-close timeout for activity execution (default: from the START_TO_CLOSE_TIMEOUT constant).

Methods

get_activities (static method)

@staticmethod
get_activities(activities: ActivitiesInterface) -> Sequence[Callable[..., Any]]

Returns the sequence of activities to be executed by the workflow. Must be implemented by subclasses to define the activity execution order.
Parameters
activities: ActivitiesInterface (required)
Instance of the activities class.
Returns
Sequence[Callable[..., Any]]: list of activity methods to be executed.

run (async method)

@workflow.run
async def run(self, workflow_config: Dict[str, Any]) -> None

Entry point for workflow execution; must be decorated with @workflow.run. The base implementation retrieves the full workflow arguments from the StateStore using the get_workflow_args activity, then executes the preflight_check activity with a retry policy. Subclasses typically call await super().run(workflow_config) first and then add their own orchestration logic.
Parameters
workflow_config: Dict[str, Any] (required)
Initial configuration dictionary; must contain workflow_id.
Returns
None

Usage Examples

Custom workflow implementation

Create a custom workflow by extending WorkflowInterface and implementing the required methods.

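```python
from typing import Any, Callable, Dict, Sequence

from application_sdk.workflows import WorkflowInterface
from temporalio import workflow
from temporalio.common import RetryPolicy

from my_app.activities import MyActivities  # hypothetical module: your ActivitiesInterface subclass


@workflow.defn
class CustomWorkflow(WorkflowInterface):
    activities_cls = MyActivities

    @staticmethod
    def get_activities(activities: MyActivities) -> Sequence[Callable[..., Any]]:
        # Activity methods used by this workflow, in execution order
        return [
            activities.setup,
            activities.process_data,
            activities.cleanup,
        ]

    @workflow.run
    async def run(self, workflow_config: Dict[str, Any]) -> None:
        # Call base implementation for argument retrieval and preflight check
        await super().run(workflow_config)

        # Get workflow arguments for use in this workflow's own activities.
        # Temporal requires a start-to-close (or schedule-to-close) timeout.
        workflow_args = await workflow.execute_activity_method(
            self.activities_cls.get_workflow_args,
            workflow_config,
            retry_policy=RetryPolicy(maximum_attempts=3),
            start_to_close_timeout=self.default_start_to_close_timeout,
        )

        # Custom orchestration logic
        await workflow.execute_activity_method(
            self.activities_cls.process_data,
            args=[workflow_args],
            retry_policy=RetryPolicy(maximum_attempts=6),
            start_to_close_timeout=self.default_start_to_close_timeout,
            heartbeat_timeout=self.default_heartbeat_timeout,
        )
```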

Workflow registration

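Register workflows with the application using setup_workflow, pairing each workflow class with its activities class.

```python
import asyncio

from application_sdk.application import BaseApplication

# Your own WorkflowInterface / ActivitiesInterface subclasses (hypothetical module paths)
from my_app.activities import MyActivities
from my_app.workflows import MyWorkflow


async def main() -> None:
    app = BaseApplication(name="my-app")

    # Each tuple pairs a workflow class with the activities class it orchestrates
    await app.setup_workflow(
        workflow_and_activities_classes=[(MyWorkflow, MyActivities)]
    )


asyncio.run(main())
```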

Available workflows

The Application SDK provides base workflow implementations for common patterns:

Metadata extraction workflows

Workflows for extracting metadata from data sources, including databases, schemas, tables, and columns.

See: Metadata extraction workflows for complete documentation.

Base classes:

  • MetadataExtractionWorkflow: Base class for metadata extraction workflows
  • BaseSQLMetadataExtractionWorkflow: Concrete implementation for SQL metadata extraction

Key features:

  • Orchestrates extraction of databases, schemas, tables, columns, and procedures
  • Handles batch processing and parallel transformation
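Batch processing with parallel transformation can be pictured with a stdlib-only sketch. This is not the SDK's implementation: `chunk`, `transform_batch`, and the `qualified_name` field are hypothetical, and in a real workflow each batch would more likely be handled by a separate activity than by a local coroutine.

```python
import asyncio
from typing import Any, Dict, Iterator, List, Sequence


def chunk(rows: Sequence[Dict[str, Any]], batch_size: int) -> Iterator[List[Dict[str, Any]]]:
    """Split extracted rows into consecutive batches of at most batch_size rows."""
    if batch_size <= 0:
        raise ValueError("batch_size must be positive")
    for start in range(0, len(rows), batch_size):
        yield list(rows[start:start + batch_size])


async def transform_batch(batch: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
    """Hypothetical transformation: tag each raw row with a qualified name."""
    await asyncio.sleep(0)  # stands in for I/O-bound work such as writing output
    return [
        {**row, "qualified_name": f"{row['database']}.{row['schema']}.{row['table']}"}
        for row in batch
    ]


async def transform_all(rows: Sequence[Dict[str, Any]], batch_size: int) -> List[Dict[str, Any]]:
    """Transform all batches concurrently; asyncio.gather keeps results in batch order."""
    results = await asyncio.gather(*(transform_batch(b) for b in chunk(rows, batch_size)))
    return [row for batch in results for row in batch]


rows = [{"database": "db", "schema": "public", "table": f"t{i}"} for i in range(5)]
transformed = asyncio.run(transform_all(rows, batch_size=2))
print([r["qualified_name"] for r in transformed])
```

Because `asyncio.gather` returns results in the order the awaitables were passed, the flattened output preserves the original row order even though batches run concurrently.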

Query extraction workflows

Workflows for implementing Miner applications, which extract queries from database query logs.

See: Query extraction workflows for complete documentation.

Base classes:

  • QueryExtractionWorkflow: Base class for query extraction workflows
  • SQLQueryExtractionWorkflow: Concrete implementation for SQL query extraction

Key features:

  • Extracts queries from database query logs
  • Handles batch processing of query data
  • Supports time-range based query extraction
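Time-range based extraction typically means slicing the requested period into smaller windows so that each batch of query-log rows stays manageable. The helper below is a hypothetical, stdlib-only sketch of that idea; `split_time_range` is not an SDK function, and the SDK's actual windowing parameters may differ.

```python
from datetime import datetime, timedelta
from typing import List, Tuple


def split_time_range(
    start: datetime, end: datetime, window: timedelta
) -> List[Tuple[datetime, datetime]]:
    """Split [start, end) into consecutive half-open windows no longer than `window`."""
    if window <= timedelta(0):
        raise ValueError("window must be positive")
    windows = []
    cursor = start
    while cursor < end:
        upper = min(cursor + window, end)  # the last window may be shorter
        windows.append((cursor, upper))
        cursor = upper
    return windows


batches = split_time_range(
    datetime(2024, 1, 1, 0, 0), datetime(2024, 1, 1, 2, 30), timedelta(hours=1)
)
for lower, upper in batches:
    print(lower.isoformat(), "->", upper.isoformat())
```

Half-open windows make adjacent batches share a boundary without double-counting queries that start exactly on it.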

Workflow execution flow

Base workflow execution

  1. Receive configuration: the workflow receives a workflow_config containing workflow_id
  2. Retrieve arguments: it calls the get_workflow_args activity to fetch the full configuration from the StateStore
  3. Preflight check: it executes the preflight_check activity to validate the configuration
  4. Custom orchestration: subclasses implement their specific activity orchestration logic
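To make the ordering of these steps concrete, here is a plain `asyncio` simulation. It deliberately avoids Temporal and the SDK: `STATE_STORE`, `get_workflow_args`, `preflight_check`, and `run` are simplified stand-ins named after the activities above, and the way stored and incoming configuration are merged is an assumption.

```python
import asyncio
from typing import Any, Dict, List

# Hypothetical in-memory stand-in for the StateStore, keyed by workflow_id
STATE_STORE: Dict[str, Dict[str, Any]] = {
    "wf-123": {"workflow_id": "wf-123", "connection": {"host": "db.example.com"}},
}
calls: List[str] = []


async def get_workflow_args(workflow_config: Dict[str, Any]) -> Dict[str, Any]:
    calls.append("get_workflow_args")
    # Step 2: expand the minimal config into the full stored configuration
    return {**STATE_STORE[workflow_config["workflow_id"]], **workflow_config}


async def preflight_check(workflow_args: Dict[str, Any]) -> None:
    calls.append("preflight_check")
    # Step 3: fail fast if the configuration is unusable
    if "connection" not in workflow_args:
        raise ValueError("missing connection details")


async def run(workflow_config: Dict[str, Any]) -> Dict[str, Any]:
    # Step 1: the workflow receives only a small config containing workflow_id
    workflow_args = await get_workflow_args(workflow_config)
    await preflight_check(workflow_args)
    # Step 4: subclass-specific orchestration would continue from here
    calls.append("custom_orchestration")
    return workflow_args


args = asyncio.run(run({"workflow_id": "wf-123"}))
print(calls)  # ['get_workflow_args', 'preflight_check', 'custom_orchestration']
```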

Activity execution

Activities are executed using workflow.execute_activity_method:

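```python
from temporalio import workflow
from temporalio.common import RetryPolicy

# Inside a workflow's run method (activity_name, arg1 and arg2 are placeholders):
result = await workflow.execute_activity_method(
    self.activities_cls.activity_name,
    args=[arg1, arg2],
    retry_policy=RetryPolicy(maximum_attempts=3, backoff_coefficient=2),
    start_to_close_timeout=self.default_start_to_close_timeout,
    heartbeat_timeout=self.default_heartbeat_timeout,
)
```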

Parameters:

  • Activity method reference
  • Arguments list
  • Retry policy (maximum attempts, backoff coefficient)
  • Timeouts (start-to-close, heartbeat)

Retry policies

Workflows use Temporal retry policies for activity execution:

```python
from temporalio.common import RetryPolicy

retry_policy = RetryPolicy(
    maximum_attempts=6,
    backoff_coefficient=2,
)
```

Common retry policy settings:

  • Preflight checks: 2 attempts, backoff coefficient 2
  • Data fetching: 6 attempts, backoff coefficient 2
  • Configuration retrieval: 3 attempts, backoff coefficient 2
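These settings translate into concrete waits between attempts. The sketch below models Temporal's exponential backoff, where the wait before retry n is `initial_interval * backoff_coefficient ** (n - 1)`, capped at `maximum_interval`. It uses Temporal's defaults of a 1-second initial interval and a cap of 100× that interval, and assumes the SDK does not override them (the text above does not say). `retry_delays` is a hypothetical helper, not part of `temporalio`.

```python
from datetime import timedelta
from typing import List, Optional


def retry_delays(
    maximum_attempts: int,
    backoff_coefficient: float = 2.0,
    initial_interval: timedelta = timedelta(seconds=1),
    maximum_interval: Optional[timedelta] = None,
) -> List[timedelta]:
    """Waits between attempts under exponential backoff (one fewer than attempts)."""
    if maximum_attempts < 1:
        # In Temporal, 0 means unlimited attempts; this model only covers finite limits
        raise ValueError("maximum_attempts must be >= 1")
    if maximum_interval is None:
        maximum_interval = initial_interval * 100  # Temporal's default cap
    return [
        min(initial_interval * (backoff_coefficient ** retry), maximum_interval)
        for retry in range(maximum_attempts - 1)
    ]


for name, attempts in [("preflight", 2), ("config retrieval", 3), ("data fetching", 6)]:
    print(name, [w.total_seconds() for w in retry_delays(attempts)])
```

Under these assumptions, the data-fetching setting (6 attempts, coefficient 2) waits 1, 2, 4, 8, and 16 seconds, so a persistently failing fetch gives up after roughly 31 seconds of backoff plus the time spent in the attempts themselves.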

Timeout configuration

Workflows use default timeouts that can be customized:

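```python
from datetime import timedelta

from application_sdk.workflows import WorkflowInterface


class MyWorkflow(WorkflowInterface):
    # Class-level defaults, picked up by activity calls that pass self.default_*
    default_heartbeat_timeout: timedelta = timedelta(seconds=300)
    default_start_to_close_timeout: timedelta = timedelta(minutes=30)
```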

Timeout types:

  • Start-to-close timeout: Maximum time for activity execution
  • Heartbeat timeout: Maximum time between activity heartbeats
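The interaction between the two timeouts can be checked with a small, stdlib-only model. `first_timeout` is a hypothetical helper; it assumes both timeouts apply to a single activity attempt and that the heartbeat clock starts when the attempt starts. Its defaults mirror the 300-second and 30-minute values from the class example above.

```python
from datetime import timedelta
from typing import List, Optional, Tuple


def first_timeout(
    heartbeats: List[float],
    finished_at: float,
    heartbeat_timeout: timedelta = timedelta(seconds=300),
    start_to_close_timeout: timedelta = timedelta(minutes=30),
) -> Optional[str]:
    """Return which timeout would fire first for one attempt, or None.

    heartbeats and finished_at are seconds since the attempt started;
    heartbeats are assumed to occur before finished_at.
    """
    hb_limit = heartbeat_timeout.total_seconds()
    stc_limit = start_to_close_timeout.total_seconds()
    deadlines: List[Tuple[float, str]] = []

    last_signal = 0.0  # the attempt's start counts as the first signal
    for t in sorted(heartbeats) + [finished_at]:
        if t - last_signal > hb_limit:
            deadlines.append((last_signal + hb_limit, "heartbeat"))
            break
        last_signal = t

    if finished_at > stc_limit:
        deadlines.append((stc_limit, "start_to_close"))

    return min(deadlines)[1] if deadlines else None


print(first_timeout(list(range(60, 601, 60)), finished_at=620))     # steady heartbeats
print(first_timeout([60], finished_at=500))                         # heartbeats stop
print(first_timeout(list(range(200, 2001, 200)), finished_at=2100)) # runs too long
```

The takeaway: a heartbeat timeout detects a stalled activity long before a generous start-to-close timeout would, while the start-to-close timeout still bounds activities that keep heartbeating but never finish.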

State management

Workflows use StateStore to retrieve and manage configuration:

```python
# Inside run(): retrieve workflow arguments from the StateStore
workflow_args = await workflow.execute_activity_method(
    self.activities_cls.get_workflow_args,
    workflow_config,  # contains workflow_id
    start_to_close_timeout=self.default_start_to_close_timeout,
)
```

Workflow configuration structure:

  • workflow_id: Unique identifier for the workflow
  • credentials: Authentication credentials
  • connection: Connection information
  • metadata: Metadata filtering and configuration
  • Additional workflow-specific parameters
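The structure above can be written down as a typed dictionary, which makes the difference between the minimal `workflow_config` (only `workflow_id` is required) and the full arguments retrieved from the StateStore explicit. `WorkflowConfig` and `missing_keys` are hypothetical, not SDK types, and the value types chosen for `credentials`, `connection`, and `metadata` are assumptions.

```python
from typing import Any, Dict, List, TypedDict


class _RequiredConfig(TypedDict):
    workflow_id: str


class WorkflowConfig(_RequiredConfig, total=False):
    # Assumed shapes: the documentation does not pin these to specific types
    credentials: Dict[str, Any]
    connection: Dict[str, Any]
    metadata: Dict[str, Any]


EXPECTED_KEYS = ("credentials", "connection", "metadata")


def missing_keys(config: Dict[str, Any]) -> List[str]:
    """List the documented top-level keys absent from a full configuration."""
    required = [] if config.get("workflow_id") else ["workflow_id"]
    return required + [key for key in EXPECTED_KEYS if key not in config]


full: WorkflowConfig = {
    "workflow_id": "wf-123",
    "credentials": {"username": "svc_user"},
    "connection": {"host": "db.example.com", "port": 5432},
    "metadata": {},
}
print(missing_keys(full))                       # []
print(missing_keys({"workflow_id": "wf-123"}))  # ['credentials', 'connection', 'metadata']
```

A minimal config like `{"workflow_id": "wf-123"}` is exactly what a workflow starts with; the remaining keys only appear after the get_workflow_args step.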

Error handling

Workflows handle errors at multiple levels:

Activity errors:

  • Retry policies automatically retry failed activities
  • Errors are logged with context
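  • The workflow continues once an attempt succeeds; if all attempts allowed by the retry policy fail, the error is raised in the workflow, which fails unless it handles the error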
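Temporal schedules these retries itself, so workflow code never contains a loop like this. Purely to illustrate the semantics (retry until an attempt succeeds or `maximum_attempts` is reached, then surface the error to the caller), here is a stdlib-only model. `call_with_retries` and `flaky` are hypothetical, backoff waits are omitted, and in Temporal the error would reach the workflow wrapped in an `ActivityError`.

```python
import asyncio
from typing import Awaitable, Callable, TypeVar

T = TypeVar("T")


async def call_with_retries(activity: Callable[[], Awaitable[T]], maximum_attempts: int) -> T:
    """Retry `activity` until it succeeds or attempts run out (no backoff, for brevity)."""
    for attempt in range(1, maximum_attempts + 1):
        try:
            return await activity()
        except Exception as exc:
            print(f"attempt {attempt} failed: {exc}")  # stands in for contextual logging
            if attempt == maximum_attempts:
                raise  # retries exhausted: the error propagates to the caller
    # Only reached when maximum_attempts < 1
    raise ValueError("maximum_attempts must be at least 1")


def flaky(failures: int) -> Callable[[], Awaitable[str]]:
    """Build a hypothetical activity that fails `failures` times, then succeeds."""
    state = {"calls": 0}

    async def activity() -> str:
        state["calls"] += 1
        if state["calls"] <= failures:
            raise ConnectionError("source unavailable")
        return "ok"

    return activity


print(asyncio.run(call_with_retries(flaky(2), maximum_attempts=3)))  # succeeds on attempt 3
try:
    asyncio.run(call_with_retries(flaky(5), maximum_attempts=3))
except ConnectionError:
    print("the workflow would fail here unless it handles the error")
```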

Workflow errors:

  • Workflow execution can be monitored via Temporal UI
  • Errors are logged with workflow ID and run ID
  • Metrics are recorded for workflow execution time and status

Metrics

Workflows automatically record metrics:

  • Workflow execution time: Total time for workflow execution
  • Workflow status: Success or error status
  • Activity execution: Individual activity metrics
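The SDK records these metrics for you, so nothing like the snippet below needs to be added to a workflow. It is a hypothetical, stdlib-only sketch of what "execution time plus success or error status" amounts to: `timed` wraps a coroutine and records a metric in a `finally` block so that failures are captured too. The `WorkflowMetric` fields are assumptions, not the SDK's metric schema.

```python
import asyncio
import time
from dataclasses import dataclass
from typing import Any, Awaitable, Callable, List


@dataclass
class WorkflowMetric:
    name: str
    status: str        # "success" or "error"
    duration_s: float  # wall-clock execution time in seconds


recorded: List[WorkflowMetric] = []


async def timed(name: str, body: Callable[[], Awaitable[Any]]) -> Any:
    """Run `body`, recording its duration and outcome even if it raises."""
    start = time.perf_counter()
    status = "error"
    try:
        result = await body()
        status = "success"
        return result
    finally:
        recorded.append(WorkflowMetric(name, status, time.perf_counter() - start))


async def ok() -> str:
    await asyncio.sleep(0.01)
    return "done"


async def broken() -> None:
    raise RuntimeError("extraction failed")


asyncio.run(timed("extract", ok))
try:
    asyncio.run(timed("extract", broken))
except RuntimeError:
    pass
print([(m.name, m.status) for m in recorded])
```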

See also