Metadata extraction workflows orchestrate the end-to-end extraction of metadata (databases, schemas, tables, columns, and stored procedures) from SQL databases. They handle batch processing, parallel transformation, and upload to Atlan.
BaseSQLMetadataExtractionWorkflow
Class: BaseSQLMetadataExtractionWorkflow (module: application_sdk.workflows.metadata_extraction.sql)
Orchestrates the complete metadata extraction process from SQL databases. Coordinates activities to fetch metadata in batches, transform it using transformers, and upload it to Atlan. Handles parallel extraction of databases, schemas, tables, columns, and procedures with batch processing and parallel transformation.
Properties
- activities_cls: Type[BaseSQLMetadataExtractionActivities] (default: BaseSQLMetadataExtractionActivities)
- application_name: str
Methods
get_activities
@staticmethod get_activities(activities: BaseSQLMetadataExtractionActivities) -> Sequence[Callable[..., Any]]
Parameters:
- activities: BaseSQLMetadataExtractionActivities
Returns:
- Sequence[Callable[..., Any]] - List of activity methods to be executed
run
@workflow.run async run(self, workflow_config: Dict[str, Any]) -> None
Parameters:
- workflow_config: Dict[str, Any]
Returns:
- None - No return value
fetch_and_transform
async fetch_and_transform(self, fetch_fn: Callable, workflow_args: Dict[str, Any], retry_policy: RetryPolicy) -> None
Parameters:
- fetch_fn: Callable[[Dict[str, Any]], Coroutine[Any, Any, Dict[str, Any] | None]]
- workflow_args: Dict[str, Any]
- retry_policy: RetryPolicy
Returns:
- None - No return value
get_transform_batches
get_transform_batches(self, chunk_count: int, typename: str, partitions: List[int]) -> Tuple[List[List[str]], List[int]]
Parameters:
- chunk_count: int
- typename: str
- partitions: List[int]
Returns:
- Tuple[List[List[str]], List[int]] - Tuple containing the list of batches (each batch is a list of file paths) and the list of starting chunk numbers for each batch (an illustrative sketch follows the method list)
get_fetch_functions
get_fetch_functions(self) -> List[Callable]
Returns:
- List[Callable] - List of fetch functions in execution order
run_exit_activities
async run_exit_activities(self, workflow_args: Dict[str, Any]) -> None
Parameters:
- workflow_args: Dict[str, Any]
Returns:
- None - No return value
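To make the return shape of get_transform_batches concrete, here is an illustrative sketch. The chunk file naming and the one-batch-per-chunk grouping are assumptions for illustration only; only the signature and the shape of the return value mirror the method above.

from typing import List, Tuple

def get_transform_batches_sketch(
    chunk_count: int, typename: str, partitions: List[int]
) -> Tuple[List[List[str]], List[int]]:
    # Hypothetical: one batch per chunk, one file path per partition of that chunk.
    batches: List[List[str]] = []
    chunk_starts: List[int] = []
    for chunk in range(chunk_count):
        partition_count = partitions[chunk] if chunk < len(partitions) else 1
        batches.append(
            [f"{typename}/chunk-{chunk}-part-{part}.json" for part in range(partition_count)]
        )
        chunk_starts.append(chunk)
    return batches, chunk_starts

# Example: 2 chunks of TABLE metadata with 2 and 1 partitions returns
# ([["TABLE/chunk-0-part-0.json", "TABLE/chunk-0-part-1.json"], ["TABLE/chunk-1-part-0.json"]], [0, 1])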
Usage Examples
Basic workflow usage
The workflow is used automatically once it is registered with the application:
from application_sdk.workflows.metadata_extraction.sql import BaseSQLMetadataExtractionWorkflow
from application_sdk.activities.metadata_extraction.sql import BaseSQLMetadataExtractionActivities
# Assumed import paths for the application, client, and handler classes; adjust to your SDK version
from application_sdk.application.metadata_extraction.sql import BaseSQLMetadataExtractionApplication
from application_sdk.clients.sql import SQLClient
from application_sdk.handlers.sql import BaseSQLHandler

app = BaseSQLMetadataExtractionApplication(
    name="postgresql-connector",
    client_class=SQLClient,
    handler_class=BaseSQLHandler,
)

await app.setup_workflow(
    workflow_and_activities_classes=[
        (BaseSQLMetadataExtractionWorkflow, BaseSQLMetadataExtractionActivities)
    ]
)
Custom workflow with additional activities
Extend the workflow to add custom activities
from datetime import timedelta
from typing import Any, Dict

from temporalio import workflow

from application_sdk.workflows.metadata_extraction.sql import BaseSQLMetadataExtractionWorkflow

class CustomMetadataWorkflow(BaseSQLMetadataExtractionWorkflow):
    @staticmethod
    def get_activities(activities):
        # Get base activities (super() cannot be used inside a staticmethod,
        # so call the base class explicitly)
        base_activities = list(BaseSQLMetadataExtractionWorkflow.get_activities(activities))
        # Add custom activity
        base_activities.append(activities.custom_processing)
        return base_activities

    @workflow.run
    async def run(self, workflow_config: Dict[str, Any]) -> None:
        # Call base implementation
        await super().run(workflow_config)
        # Add custom logic
        workflow_args = await workflow.execute_activity_method(
            self.activities_cls.get_workflow_args,
            workflow_config,
            # Temporal requires a timeout when executing activities; values are illustrative
            start_to_close_timeout=timedelta(minutes=5),
        )
        # Execute custom activity
        await workflow.execute_activity_method(
            self.activities_cls.custom_processing,
            args=[workflow_args],
            start_to_close_timeout=timedelta(minutes=30),
        )
Workflow flow
- Configuration retrieval: Gets workflow arguments from StateStore
- Preflight check: Validates database connection and configuration
- Parallel extraction: Fetches databases, schemas, tables, columns, and procedures concurrently
- Transformation: Transforms raw metadata into Atlas entities using transformers
- Upload: Uploads transformed metadata to Atlan (if enabled)
Execution flow
Step-by-step process
- Workflow start: Receives workflow_config with workflow_id
- Base workflow execution: Calls super().run(), which:
  - Retrieves workflow arguments from StateStore
  - Executes the preflight check activity
- Workflow arguments retrieval: Gets the full configuration from StateStore
- Parallel fetch operations: Executes all fetch functions concurrently:
  fetch_and_transforms = [
      self.fetch_and_transform(fetch_function, workflow_args, retry_policy)
      for fetch_function in fetch_functions
  ]
  await asyncio.gather(*fetch_and_transforms)
- Fetch and transform cycle (for each entity type):
  - Execute the fetch activity (for example, fetch_databases)
  - Get statistics (chunk_count, partitions, typename)
  - Create transform batches
  - Execute transform activities in parallel
  - Aggregate results
- Exit activities: Execute upload_to_atlan if enabled
Parallel processing
The workflow uses parallel processing at two levels:
Level 1: Entity types (databases, schemas, tables, columns, procedures)
- All entity types are fetched and transformed concurrently
- Uses asyncio.gather() for parallel execution
Level 2: Transform batches (within each entity type)
- Multiple chunks are transformed in parallel
- Each chunk may have multiple partitions
- Uses asyncio.gather() for parallel transform execution
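A simplified sketch of the two levels, using plain coroutines with hypothetical fetch_entity and transform_batch stand-ins (the real workflow drives Temporal activities rather than local coroutines):

import asyncio
from typing import Any, Dict, List

async def fetch_entity(entity_type: str, workflow_args: Dict[str, Any]) -> List[List[str]]:
    # Hypothetical stand-in for a fetch activity: returns transform batches,
    # each batch being a list of chunk file paths.
    return [[f"{entity_type}/chunk-0.json"], [f"{entity_type}/chunk-1.json"]]

async def transform_batch(batch: List[str]) -> None:
    # Hypothetical stand-in for a transform activity.
    await asyncio.sleep(0)

async def fetch_and_transform(entity_type: str, workflow_args: Dict[str, Any]) -> None:
    batches = await fetch_entity(entity_type, workflow_args)
    # Level 2: transform batches within one entity type run in parallel.
    await asyncio.gather(*(transform_batch(batch) for batch in batches))

async def run_all(workflow_args: Dict[str, Any]) -> None:
    entity_types = ["database", "schema", "table", "column", "procedure"]
    # Level 1: all entity types are fetched and transformed concurrently.
    await asyncio.gather(*(fetch_and_transform(t, workflow_args) for t in entity_types))

asyncio.run(run_all({}))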
Workflow configuration
Workflow configuration is stored in StateStore and retrieved using workflow_id:
workflow_config = {
"workflow_id": "workflow-uuid",
# Additional parameters if needed
}
# Workflow retrieves full configuration from StateStore
workflow_args = {
"credentials": {...},
"connection": {
"connection_name": "production",
"connection_qualified_name": "tenant/connector/1"
},
"metadata": {
"include-filter": '{"^dbengine$":["^public$"]}',
"exclude-filter": "{}",
"temp-table-regex": ""
},
"output_path": "/tmp/output",
"output_prefix": "metadata"
}
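The include-filter and exclude-filter values are JSON strings of regular expressions. Judging from the example values above, the keys match database names and the values list schema-name patterns; the helper below is a hypothetical illustration of that interpretation, not the SDK's own filtering code.

import json
import re

def schema_included(include_filter: str, database: str, schema: str) -> bool:
    # Hypothetical helper: treat keys as database-name regexes and values as
    # lists of schema-name regexes, as the example configuration suggests.
    rules = json.loads(include_filter or "{}")
    if not rules:
        return True  # an empty filter includes everything
    for db_pattern, schema_patterns in rules.items():
        if re.search(db_pattern, database):
            return any(re.search(p, schema) for p in schema_patterns)
    return False

# With the configuration above:
# schema_included('{"^dbengine$":["^public$"]}', "dbengine", "public")   -> True
# schema_included('{"^dbengine$":["^public$"]}', "dbengine", "internal") -> False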
Metrics
The workflow records metrics for monitoring:
Workflow execution time:
- Metric name: workflow_execution_time_seconds
- Type: GAUGE
- Labels: workflow_id, workflow_type: "sql_metadata_extraction", status: "success" | "error"
Activity metrics:
- Individual activities record their own metrics
- Metrics include execution time, record counts, and error counts
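As a point of reference, here is a minimal sketch of recording such a gauge with OpenTelemetry. It assumes an OpenTelemetry meter provider is already configured; the SDK ships its own metrics utilities, so treat this only as an illustration of the metric name and labels listed above.

import time

from opentelemetry import metrics

meter = metrics.get_meter("sql_metadata_extraction")
execution_time = meter.create_gauge("workflow_execution_time_seconds")

start = time.perf_counter()
status = "success"
try:
    ...  # workflow body
except Exception:
    status = "error"
    raise
finally:
    execution_time.set(
        time.perf_counter() - start,
        attributes={
            "workflow_id": "workflow-uuid",
            "workflow_type": "sql_metadata_extraction",
            "status": status,
        },
    )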
Error handling
Activity errors:
- Retry policies automatically retry failed activities
- Maximum of 6 attempts with exponential backoff (see the retry policy sketch after this section)
- Errors are logged with context
Workflow errors:
- Workflow execution failures are logged with workflow ID
- Metrics record error status
- Temporal UI provides visibility into workflow state
Validation errors:
- Invalid statistics (missing typename, zero chunks) are handled gracefully
- Empty results are skipped without error
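A sketch of a retry policy in that spirit, built with the Temporal Python SDK. The interval values are illustrative; the SDK defines its own defaults.

from datetime import timedelta

from temporalio.common import RetryPolicy

# Up to 6 attempts with exponential backoff (intervals are illustrative).
retry_policy = RetryPolicy(
    initial_interval=timedelta(seconds=1),
    backoff_coefficient=2.0,
    maximum_interval=timedelta(minutes=1),
    maximum_attempts=6,
)

# Passed when executing an activity from workflow code, for example:
# await workflow.execute_activity_method(
#     activities.fetch_databases,
#     args=[workflow_args],
#     start_to_close_timeout=timedelta(hours=1),
#     retry_policy=retry_policy,
# )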
Performance considerations
Parallel execution:
- Entity types are processed concurrently for faster execution
- Transform batches are processed in parallel within each entity type
- Uses asyncio.gather() for efficient concurrent execution
Batch processing:
- Metadata is processed in chunks to manage memory
- Each chunk can have multiple partitions for large datasets
- Transform operations are batched for parallel processing
Resource management:
- Activities use timeouts to prevent resource exhaustion
- Heartbeat timeouts maintain activity health
- Retry policies prevent cascading failures
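To illustrate heartbeating during long-running work, here is a minimal Temporal activity sketch. The transform_chunks activity and its argument are hypothetical; only the heartbeat call and the heartbeat_timeout mechanism come from the Temporal Python SDK.

from temporalio import activity

@activity.defn
async def transform_chunks(chunk_files: list[str]) -> int:
    transformed = 0
    for path in chunk_files:
        # ... transform one chunk file (hypothetical work) ...
        transformed += 1
        # Report progress so the heartbeat_timeout configured by the caller
        # is not exceeded while a long batch is processed.
        activity.heartbeat(transformed)
    return transformed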
See also
- Workflows overview: Overview of all workflows
- Query extraction workflows: Workflows for query extraction
- StateStore: Persistent state management for workflows
- Application SDK README: Overview of the Application SDK and its components