Workflows are Temporal-based orchestration units that sequence and manage the execution of activities to accomplish larger tasks such as metadata extraction or query log mining. They provide reliability, state management, and configuration handling for long-running processes.
WorkflowInterface
Abstract Class (application_sdk.workflows)
Abstract base class that defines the contract for all workflow implementations. All workflows must inherit from this class and implement the required methods. Provides base orchestration logic for retrieving workflow arguments, executing preflight checks, and managing activity execution with retry policies and timeouts.
Properties
- activities_cls: Type[ActivitiesInterface]
- default_heartbeat_timeout: timedelta
- default_start_to_close_timeout: timedelta
Methods
get_activities
@staticmethod get_activities(activities: ActivitiesInterface) -> Sequence[Callable[..., Any]]
Parameters
- activities: ActivitiesInterface
Returns
- Sequence[Callable[..., Any]]: List of activity methods to be executed
run
@workflow.run async run(self, workflow_config: Dict[str, Any]) -> None
Parameters
- workflow_config: Dict[str, Any]
Returns
- None: No return value
Usage Examples
Custom workflow implementation
Create a custom workflow by extending WorkflowInterface and implementing the required methods:
from typing import Any, Dict
from temporalio import workflow
from temporalio.common import RetryPolicy
from application_sdk.workflows import WorkflowInterface
# MyActivities is your ActivitiesInterface subclass, defined elsewhere in your application
@workflow.defn
class CustomWorkflow(WorkflowInterface):
    activities_cls = MyActivities

    @staticmethod
    def get_activities(activities: MyActivities):
        return [
            activities.setup,
            activities.process_data,
            activities.cleanup,
        ]

    @workflow.run
    async def run(self, workflow_config: Dict[str, Any]) -> None:
        # Call base implementation for preflight check
        await super().run(workflow_config)
        # Get workflow arguments
        workflow_args = await workflow.execute_activity_method(
            self.activities_cls.get_workflow_args,
            workflow_config,
            retry_policy=RetryPolicy(maximum_attempts=3),
            # Temporal requires a start-to-close or schedule-to-close timeout
            start_to_close_timeout=self.default_start_to_close_timeout,
        )
        # Custom orchestration logic
        await workflow.execute_activity_method(
            self.activities_cls.process_data,
            args=[workflow_args],
            retry_policy=RetryPolicy(maximum_attempts=6),
            start_to_close_timeout=self.default_start_to_close_timeout,
        )
Workflow registration
Register workflows with the application using setup_workflow
from application_sdk.application import BaseApplication
app = BaseApplication(name="my-app")
await app.setup_workflow(
    workflow_and_activities_classes=[
        (MyWorkflow, MyActivities)
    ]
)
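To illustrate why a workflow and its activities class are registered as a pair, the following sketch shows one way a worker could collect activity callables through get_activities. It does not use the SDK or Temporal: the stand-in classes and the collect_activities helper are hypothetical, and how setup_workflow handles this internally is not described here.

```python
from typing import Any, Callable, List, Sequence, Tuple, Type


class MyActivities:
    """Stand-in activities class (hypothetical, no SDK dependency)."""

    def setup(self) -> str:
        return "setup"

    def process_data(self) -> str:
        return "process_data"


class MyWorkflow:
    """Stand-in workflow class exposing the get_activities contract."""

    activities_cls = MyActivities

    @staticmethod
    def get_activities(activities: MyActivities) -> Sequence[Callable[..., Any]]:
        return [activities.setup, activities.process_data]


def collect_activities(
    pairs: Sequence[Tuple[Type[Any], Type[Any]]],
) -> List[Callable[..., Any]]:
    """Hypothetical helper: gather activity callables for each registered pair."""
    collected: List[Callable[..., Any]] = []
    for workflow_cls, activities_cls in pairs:
        # One activities instance per pair; its bound methods are what a worker would run.
        collected.extend(workflow_cls.get_activities(activities_cls()))
    return collected


registered = collect_activities([(MyWorkflow, MyActivities)])
```

The point of the pairing is that the workflow class decides which methods of the activities instance are exposed, so the two classes have to be supplied together.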
Available workflows
The Application SDK provides base workflow implementations for common patterns:
Metadata extraction workflows
Workflows for extracting metadata from data sources, including databases, schemas, tables, and columns.
See: Metadata extraction workflows for complete documentation.
Base classes:
- MetadataExtractionWorkflow: Base class for metadata extraction workflows
- BaseSQLMetadataExtractionWorkflow: Concrete implementation for SQL metadata extraction
Key features:
- Orchestrates extraction of databases, schemas, tables, columns, and procedures
- Handles batch processing and parallel transformation
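As a generic illustration of batch processing with parallel transformation, the sketch below splits a list of table names into batches and transforms the batches concurrently with asyncio. The function names and record shape are hypothetical, and this is the general pattern only, not the SDK's implementation.

```python
import asyncio
from typing import Any, Dict, Iterator, List, Sequence


def make_batches(items: Sequence[Any], batch_size: int) -> Iterator[Sequence[Any]]:
    """Yield consecutive slices of at most batch_size items."""
    for start in range(0, len(items), batch_size):
        yield items[start:start + batch_size]


async def transform_batch(batch: Sequence[str]) -> List[Dict[str, str]]:
    """Placeholder transformation: wrap each table name in a record."""
    await asyncio.sleep(0)  # yield control, standing in for real I/O
    return [{"name": name, "type": "table"} for name in batch]


async def transform_all(tables: Sequence[str], batch_size: int) -> List[Dict[str, str]]:
    """Transform all batches concurrently and flatten the results in input order."""
    results = await asyncio.gather(
        *(transform_batch(batch) for batch in make_batches(tables, batch_size))
    )
    return [record for batch_result in results for record in batch_result]


records = asyncio.run(transform_all(["orders", "users", "events"], batch_size=2))
```

asyncio.gather returns results in the order the awaitables were passed, so the flattened records keep the original table order even though batches run concurrently.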
Query extraction workflows
Workflows that implement a Miner, which extracts queries from database query logs.
See: Query extraction workflows for complete documentation.
Base classes:
- QueryExtractionWorkflow: Base class for query extraction workflows
- SQLQueryExtractionWorkflow: Concrete implementation for SQL query extraction
Key features:
- Extracts queries from database query logs
- Handles batch processing of query data
- Supports time-range based query extraction
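One common way to support time-range based extraction is to split the overall range into fixed-size windows and process each window as a batch. The sketch below shows that idea with the standard library only. The time_windows helper is hypothetical and is not part of the SDK.

```python
from datetime import datetime, timedelta
from typing import Iterator, Tuple


def time_windows(
    start: datetime, end: datetime, window: timedelta
) -> Iterator[Tuple[datetime, datetime]]:
    """Yield half-open [window_start, window_end) ranges covering start..end."""
    if window <= timedelta(0):
        raise ValueError("window must be positive")
    cursor = start
    while cursor < end:
        # The last window is truncated so it never extends past end.
        window_end = min(cursor + window, end)
        yield cursor, window_end
        cursor = window_end


windows = list(
    time_windows(datetime(2024, 1, 1, 0), datetime(2024, 1, 1, 5), timedelta(hours=2))
)
```

Half-open windows avoid extracting the same query twice when its timestamp falls exactly on a window boundary.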
Workflow execution flow
Base workflow execution
- Receive configuration: Workflow receives workflow_config with workflow_id
- Retrieve arguments: Calls get_workflow_args activity to fetch full configuration from StateStore
- Preflight check: Executes preflight_check activity to validate configuration
- Custom orchestration: Subclasses implement specific activity orchestration logic
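The four steps above can be sketched without Temporal as a plain asyncio sequence. Everything here is a stand-in: FAKE_STATE_STORE, FakeActivities, and run_flow are hypothetical names, and the in-memory dictionary only imitates a lookup by workflow_id.

```python
import asyncio
from typing import Any, Dict, List

# In-memory stand-in for StateStore (hypothetical; the real store is external).
FAKE_STATE_STORE: Dict[str, Dict[str, Any]] = {
    "wf-123": {"workflow_id": "wf-123", "connection": {"name": "dev"}},
}


class FakeActivities:
    """Stand-in activities that record the order in which they are called."""

    def __init__(self) -> None:
        self.calls: List[str] = []

    async def get_workflow_args(self, workflow_config: Dict[str, Any]) -> Dict[str, Any]:
        self.calls.append("get_workflow_args")
        return FAKE_STATE_STORE[workflow_config["workflow_id"]]

    async def preflight_check(self, workflow_args: Dict[str, Any]) -> None:
        self.calls.append("preflight_check")
        if "connection" not in workflow_args:
            raise ValueError("missing connection")

    async def process_data(self, workflow_args: Dict[str, Any]) -> None:
        self.calls.append("process_data")


async def run_flow(activities: FakeActivities, workflow_config: Dict[str, Any]) -> None:
    # Step 1: the flow receives workflow_config, which carries workflow_id.
    # Step 2: retrieve the full arguments using that ID.
    workflow_args = await activities.get_workflow_args(workflow_config)
    # Step 3: validate the configuration before doing any real work.
    await activities.preflight_check(workflow_args)
    # Step 4: custom orchestration, supplied by the subclass in a real workflow.
    await activities.process_data(workflow_args)


fake = FakeActivities()
asyncio.run(run_flow(fake, {"workflow_id": "wf-123"}))
```

In a real workflow each of these awaits would be an activity call scheduled through Temporal rather than a direct method call.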
Activity execution
Activities are executed using workflow.execute_activity_method:
result = await workflow.execute_activity_method(
    self.activities_cls.activity_name,
    args=[arg1, arg2],
    retry_policy=RetryPolicy(maximum_attempts=3, backoff_coefficient=2),
    start_to_close_timeout=self.default_start_to_close_timeout,
    heartbeat_timeout=self.default_heartbeat_timeout
)
Parameters:
- Activity method reference
- Arguments list
- Retry policy (maximum attempts, backoff coefficient)
- Timeouts (start-to-close, heartbeat)
Retry policies
Workflows use Temporal retry policies for activity execution:
retry_policy = RetryPolicy(
    maximum_attempts=6,
    backoff_coefficient=2
)
Common retry policy settings:
- Preflight checks: 2 attempts, backoff coefficient 2
- Data fetching: 6 attempts, backoff coefficient 2
- Configuration retrieval: 3 attempts, backoff coefficient 2
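To make the effect of these settings concrete, the sketch below computes the wait before each retry for a given maximum_attempts and backoff_coefficient. It assumes an initial interval of 1 second and a cap of 100 seconds, which match Temporal's defaults when those fields are not set. The retry_delays helper itself is hypothetical.

```python
from typing import List


def retry_delays(
    maximum_attempts: int,
    backoff_coefficient: float = 2.0,
    initial_interval: float = 1.0,   # assumed: Temporal default of 1 second
    maximum_interval: float = 100.0,  # assumed: default cap of 100x the initial interval
) -> List[float]:
    """Return the wait in seconds before each retry (attempts 2..maximum_attempts)."""
    delays = []
    for retry_number in range(maximum_attempts - 1):
        delay = initial_interval * (backoff_coefficient ** retry_number)
        delays.append(min(delay, maximum_interval))
    return delays


preflight_delays = retry_delays(maximum_attempts=2)
data_fetch_delays = retry_delays(maximum_attempts=6)
```

Under these assumptions, six attempts with a coefficient of 2 wait 1, 2, 4, 8, and 16 seconds between attempts, about 31 seconds of backoff in total, while a two-attempt preflight check retries once after 1 second.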
Timeout configuration
Workflows use default timeouts that can be customized:
from datetime import timedelta
class MyWorkflow(WorkflowInterface):
    default_heartbeat_timeout: timedelta = timedelta(seconds=300)
    default_start_to_close_timeout: timedelta = timedelta(minutes=30)
Timeout types:
- Start-to-close timeout: Maximum time for activity execution
- Heartbeat timeout: Maximum time between activity heartbeats
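The difference between the two timeout types is easier to see with a small simulation. The sketch below is a simplified model, not Temporal's implementation: the hypothetical first_timeout function takes how long an activity would run and when it heartbeats, and reports which timeout would fire first.

```python
from datetime import timedelta
from typing import Optional, Sequence


def first_timeout(
    duration: timedelta,
    heartbeat_offsets: Sequence[timedelta],
    start_to_close_timeout: timedelta,
    heartbeat_timeout: timedelta,
) -> Optional[str]:
    """Return which timeout fires first for a simulated activity, or None."""
    candidates = []
    # Start-to-close fires if the whole execution runs past its limit.
    if duration > start_to_close_timeout:
        candidates.append((start_to_close_timeout, "start_to_close"))
    # Heartbeat fires at the first gap between heartbeats that exceeds its limit.
    checkpoints = (
        [timedelta(0)]
        + sorted(beat for beat in heartbeat_offsets if beat < duration)
        + [duration]
    )
    for previous, current in zip(checkpoints, checkpoints[1:]):
        if current - previous > heartbeat_timeout:
            candidates.append((previous + heartbeat_timeout, "heartbeat"))
            break
    return min(candidates)[1] if candidates else None


HEARTBEAT = timedelta(seconds=300)
START_TO_CLOSE = timedelta(minutes=30)
every_minute = [timedelta(minutes=m) for m in range(1, 45)]

healthy = first_timeout(timedelta(minutes=10), every_minute, START_TO_CLOSE, HEARTBEAT)
silent = first_timeout(timedelta(minutes=10), [], START_TO_CLOSE, HEARTBEAT)
too_long = first_timeout(timedelta(minutes=45), every_minute, START_TO_CLOSE, HEARTBEAT)
```

With the values shown above, a 10-minute activity that never heartbeats is cut off by the heartbeat timeout after 5 minutes, while a 45-minute activity that heartbeats every minute is cut off by the start-to-close timeout at 30 minutes.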
State management
Workflows use StateStore to retrieve and manage configuration:
# Retrieve workflow arguments from StateStore
workflow_args = await workflow.execute_activity_method(
    self.activities_cls.get_workflow_args,
    workflow_config,  # Contains workflow_id
    start_to_close_timeout=self.default_start_to_close_timeout,
)
Workflow configuration structure:
- workflow_id: Unique identifier for the workflow
- credentials: Authentication credentials
- connection: Connection information
- metadata: Metadata filtering and configuration
- Additional workflow-specific parameters
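As a rough picture of this structure, the sketch below types the four named keys with a TypedDict. Only the key names come from the list above. The value shapes, the sample values, and the require_workflow_id helper are assumptions made for illustration.

```python
from typing import Any, Dict, TypedDict


class WorkflowArgs(TypedDict, total=False):
    """Keys named in the configuration structure; value shapes are assumptions."""

    workflow_id: str
    credentials: Dict[str, Any]
    connection: Dict[str, Any]
    metadata: Dict[str, Any]


def require_workflow_id(args: WorkflowArgs) -> str:
    """Hypothetical guard: fail early if the identifier is missing or empty."""
    workflow_id = args.get("workflow_id")
    if not workflow_id:
        raise ValueError("workflow_id is required")
    return workflow_id


# Placeholder values only; real configurations come from StateStore.
example_args: WorkflowArgs = {
    "workflow_id": "wf-123",
    "credentials": {"username": "reader"},
    "connection": {"name": "dev-postgres"},
    "metadata": {"include": ["public"]},
}
```

total=False leaves room for the additional workflow-specific parameters, which vary by workflow and are therefore not typed here.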
Error handling
Workflows handle errors at multiple levels:
Activity errors:
- Retry policies automatically retry failed activities
- Errors are logged with context
- Once retries are exhausted, the activity error propagates to the workflow, which fails unless it handles the error
Workflow errors:
- Workflow execution can be monitored via Temporal UI
- Errors are logged with workflow ID and run ID
- Metrics are recorded for workflow execution time and status
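The interaction between retries, logging, and final failure can be sketched as a plain retry loop. This is a simplified stand-in for what Temporal does on the workflow's behalf. The run_with_retries helper is hypothetical, and it omits backoff delays to keep the example short.

```python
import asyncio
import logging
from typing import Awaitable, Callable, List, TypeVar

logger = logging.getLogger("workflow_sketch")
T = TypeVar("T")


async def run_with_retries(
    activity: Callable[[], Awaitable[T]],
    maximum_attempts: int,
    workflow_id: str,
) -> T:
    """Call activity until it succeeds or maximum_attempts is reached."""
    for attempt in range(1, maximum_attempts + 1):
        try:
            return await activity()
        except Exception as exc:
            # Log each failure with enough context to trace it later.
            logger.warning(
                "activity failed (workflow_id=%s, attempt=%d/%d): %s",
                workflow_id, attempt, maximum_attempts, exc,
            )
            if attempt == maximum_attempts:
                raise  # retries exhausted: the error reaches the caller
    raise ValueError("maximum_attempts must be at least 1")


attempts_seen: List[int] = []


async def flaky() -> str:
    """Fail on the first two calls, then succeed."""
    attempts_seen.append(len(attempts_seen) + 1)
    if len(attempts_seen) < 3:
        raise RuntimeError("transient failure")
    return "ok"


result = asyncio.run(run_with_retries(flaky, maximum_attempts=3, workflow_id="wf-123"))
```

With maximum_attempts=3 the flaky activity succeeds on its last allowed attempt. With maximum_attempts=2 the same activity would raise RuntimeError to the caller.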
Metrics
Workflows automatically record metrics:
- Workflow execution time: Total time for workflow execution
- Workflow status: Success or error status
- Activity execution: Individual activity metrics
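For a sense of what execution time and status metrics capture, the sketch below records both with a context manager. It is an illustration only: the record_execution helper, the in-memory recorded list, and the metric field names are hypothetical and say nothing about how the SDK stores or exports its metrics.

```python
import time
from contextlib import contextmanager
from typing import Dict, Iterator, List

# In-memory sink for the example; a real system would export to a metrics backend.
recorded: List[Dict[str, object]] = []


@contextmanager
def record_execution(name: str) -> Iterator[None]:
    """Record duration and success/error status for the wrapped block."""
    started = time.perf_counter()
    status = "success"
    try:
        yield
    except Exception:
        status = "error"
        raise
    finally:
        recorded.append(
            {
                "name": name,
                "status": status,
                "duration_seconds": time.perf_counter() - started,
            }
        )


with record_execution("workflow"):
    pass  # stands in for successful work

try:
    with record_execution("activity"):
        raise RuntimeError("failed")
except RuntimeError:
    pass  # the metric is still recorded, with status "error"
```

Recording in the finally block means a metric is emitted whether the wrapped work succeeds or raises.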
See also
- Metadata extraction workflows: Complete reference for metadata extraction workflows
- Query extraction workflows: Complete reference for query extraction workflows
- StateStore: Persistent state management for workflows
- Application SDK README: Overview of the Application SDK and its components