Metadata extraction workflows orchestrate the extraction of metadata from SQL databases, including databases, schemas, tables, columns, and stored procedures. They handle batch processing, parallel transformation, and upload to Atlan.

BaseSQLMetadataExtractionWorkflow

Class
📁 application_sdk.workflows.metadata_extraction.sql
Inheritance chain: MetadataExtractionWorkflow

Orchestrates the complete metadata extraction process from SQL databases. Coordinates activities to fetch metadata in batches, transform it using transformers, and upload it to Atlan. Handles parallel extraction of databases, schemas, tables, columns, and procedures with batch processing and parallel transformation.

Properties

activities_cls: Type[BaseSQLMetadataExtractionActivities]
Required
Activities class for metadata extraction operations
Default: BaseSQLMetadataExtractionActivities
application_name: str
Name of the application (default: from the APPLICATION_NAME constant)

Methods (6)

get_activities

static
@staticmethod get_activities(activities: BaseSQLMetadataExtractionActivities) -> Sequence[Callable[..., Any]]
Returns the sequence of activities to be executed by the workflow: preflight_check, get_workflow_args, fetch_databases, fetch_schemas, fetch_tables, fetch_columns, fetch_procedures, transform_data, and upload_to_atlan.
Parameters
activities: BaseSQLMetadataExtractionActivities
Required
Instance of the activities class
Returns
Sequence[Callable[..., Any]] - List of activity methods to be executed
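
The returned sequence is what gets registered with the Temporal worker. The application normally wires this up for you (see Usage Examples below), but a hand-rolled registration might look like the following sketch, assuming temporalio's Client/Worker API; the server address, task-queue name, and the no-argument activities construction are placeholder assumptions.

from temporalio.client import Client
from temporalio.worker import Worker

from application_sdk.activities.metadata_extraction.sql import BaseSQLMetadataExtractionActivities
from application_sdk.workflows.metadata_extraction.sql import BaseSQLMetadataExtractionWorkflow


async def start_worker() -> None:
    client = await Client.connect("localhost:7233")  # placeholder address
    activities = BaseSQLMetadataExtractionActivities()  # construction simplified for the sketch
    worker = Worker(
        client,
        task_queue="sql-metadata-extraction",  # placeholder queue name
        workflows=[BaseSQLMetadataExtractionWorkflow],
        activities=BaseSQLMetadataExtractionWorkflow.get_activities(activities),
    )
    await worker.run()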

run

async
@workflow.run async run(self, workflow_config: Dict[str, Any]) -> None
Orchestrates the entire metadata extraction process. Calls the base workflow run() for the preflight check, retrieves workflow arguments from the StateStore, executes the fetch functions in parallel using asyncio.gather() (each fetch function triggers fetch and transform operations), and records workflow execution metrics. Fetch operations use a retry policy with a maximum of 6 attempts and a backoff coefficient of 2.
Parameters
workflow_config: Dict[str, Any]
Required
Configuration dictionary containing workflow_id
Returns
None - No return value
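
The retry behavior described above maps to a standard temporalio RetryPolicy; a minimal sketch with the attempt count and backoff coefficient from the description (the initial interval is an illustrative assumption):

from datetime import timedelta

from temporalio.common import RetryPolicy

retry_policy = RetryPolicy(
    initial_interval=timedelta(seconds=1),  # assumed starting interval
    backoff_coefficient=2,
    maximum_attempts=6,
)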

fetch_and_transform

async
async fetch_and_transform(self, fetch_fn: Callable, workflow_args: Dict[str, Any], retry_policy: RetryPolicy) -> None
Fetches metadata and transforms it in batches. Executes the fetch function to get raw metadata statistics, validates the statistics (chunk_count, partitions, typename), creates transform batches for parallel processing, executes transform activities in parallel using asyncio.gather(), and aggregates record counts from all transform operations.
Parameters
fetch_fn: Callable[[Dict[str, Any]], Coroutine[Any, Any, Dict[str, Any] | None]]
Required
Function to fetch metadata (e.g., fetch_databases, fetch_tables)
workflow_args: Dict[str, Any]
Required
Workflow arguments dictionary
retry_policy: RetryPolicy
Required
Retry policy for activity execution
Returns
None - No return value
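
The fetch-then-transform cycle can be illustrated with a simplified sketch (not the SDK implementation); the transform activity's argument shape and the timeout value are assumptions:

import asyncio
from datetime import timedelta
from typing import Any, Callable, Coroutine, Dict, Optional

from temporalio import workflow
from temporalio.common import RetryPolicy


async def fetch_and_transform_sketch(
    wf,  # a BaseSQLMetadataExtractionWorkflow instance
    fetch_fn: Callable[[Dict[str, Any]], Coroutine[Any, Any, Optional[Dict[str, Any]]]],
    workflow_args: Dict[str, Any],
    retry_policy: RetryPolicy,
) -> None:
    # 1. Fetch raw metadata; the fetch returns statistics about the written chunks.
    statistics = await fetch_fn(workflow_args)
    if not statistics or not statistics.get("typename") or not statistics.get("chunk_count"):
        return  # empty or invalid statistics are skipped without error

    # 2. Group chunk files into batches for parallel transformation.
    batches, chunk_starts = wf.get_transform_batches(
        statistics["chunk_count"], statistics["typename"], statistics["partitions"]
    )

    # 3. Run one transform activity per batch, concurrently.
    transforms = [
        workflow.execute_activity_method(
            wf.activities_cls.transform_data,
            args=[{**workflow_args, "file_names": batch, "chunk_start": start}],  # assumed shape
            retry_policy=retry_policy,
            start_to_close_timeout=timedelta(hours=1),  # assumed timeout
        )
        for batch, start in zip(batches, chunk_starts)
    ]
    await asyncio.gather(*transforms)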

get_transform_batches

get_transform_batches(self, chunk_count: int, typename: str, partitions: List[int]) -> Tuple[List[List[str]], List[int]]
Creates batches for parallel transformation processing. Groups files by chunk number and partition number. Each batch contains files for one chunk: {typename}/chunk-{i}-part{file}.parquet.
Parameters
chunk_count: int
Required
Total number of chunks to process
typename: str
Required
Type name for the chunks (e.g., 'DATABASE', 'TABLE')
partitions: List[int]
Required
List of partition counts for each chunk
Returns
Tuple[List[List[str]], List[int]] - Tuple containing list of batches (each batch is a list of file paths) and list of starting chunk numbers for each batch
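
The batching scheme can be reproduced with a short sketch; the path template comes from the description above, and the batch-per-chunk grouping is inferred from it:

from typing import List, Tuple


def get_transform_batches_sketch(
    chunk_count: int, typename: str, partitions: List[int]
) -> Tuple[List[List[str]], List[int]]:
    batches: List[List[str]] = []
    chunk_starts: List[int] = []
    for i in range(chunk_count):
        # One batch per chunk; a chunk may span several partition files.
        batches.append(
            [f"{typename}/chunk-{i}-part{file}.parquet" for file in range(partitions[i])]
        )
        chunk_starts.append(i)
    return batches, chunk_starts


# Example: 2 chunks of TABLE metadata with 2 and 1 partition files respectively.
batches, starts = get_transform_batches_sketch(2, "TABLE", [2, 1])
# batches == [["TABLE/chunk-0-part0.parquet", "TABLE/chunk-0-part1.parquet"],
#             ["TABLE/chunk-1-part0.parquet"]]
# starts  == [0, 1]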

get_fetch_functions

get_fetch_functions(self) -> List[Callable]
Returns the list of functions for fetching SQL metadata in execution order: fetch_databases, fetch_schemas, fetch_tables, fetch_columns, fetch_procedures.
Returns
List[Callable] - List of fetch functions in execution order
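
Because the fetch list comes from an overridable method, a subclass can trim or extend it. A minimal sketch that skips stored procedures; filtering by __name__ assumes the returned callables keep their fetch_* names:

from typing import Callable, List

from application_sdk.workflows.metadata_extraction.sql import BaseSQLMetadataExtractionWorkflow


class NoProceduresWorkflow(BaseSQLMetadataExtractionWorkflow):
    def get_fetch_functions(self) -> List[Callable]:
        # Keep the base order, dropping only the stored-procedure fetch.
        return [
            fn
            for fn in super().get_fetch_functions()
            if getattr(fn, "__name__", "") != "fetch_procedures"
        ]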

run_exit_activities

async
async run_exit_activities(self, workflow_args: Dict[str, Any]) -> None
Runs exit activities after workflow completion. If ENABLE_ATLAN_UPLOAD is True, executes upload_to_atlan activity. Otherwise, logs that Atlan upload is skipped.
Parameters
workflow_args: Dict[str, Any]
Required
Workflow arguments dictionary
Returns
None - No return value
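
A simplified sketch of this gate is shown below; the module-level flag stands in for the SDK's ENABLE_ATLAN_UPLOAD constant, and the timeout value is an assumption:

from datetime import timedelta
from typing import Any, Dict

from temporalio import workflow

ENABLE_ATLAN_UPLOAD = True  # placeholder for the SDK constant


async def run_exit_activities_sketch(wf, workflow_args: Dict[str, Any]) -> None:
    if ENABLE_ATLAN_UPLOAD:
        await workflow.execute_activity_method(
            wf.activities_cls.upload_to_atlan,
            args=[workflow_args],
            start_to_close_timeout=timedelta(hours=1),  # assumed timeout
        )
    else:
        workflow.logger.info("Atlan upload disabled; skipping upload_to_atlan")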

Usage Examples

Basic workflow usage

The workflow is used automatically once registered with the application:

from application_sdk.workflows.metadata_extraction.sql import BaseSQLMetadataExtractionWorkflow
from application_sdk.activities.metadata_extraction.sql import BaseSQLMetadataExtractionActivities

# Import path for the application class may vary by SDK version;
# SQLClient and BaseSQLHandler are your connector's client and handler classes.
from application_sdk.application.metadata_extraction.sql import BaseSQLMetadataExtractionApplication

app = BaseSQLMetadataExtractionApplication(
    name="postgresql-connector",
    client_class=SQLClient,
    handler_class=BaseSQLHandler,
)

await app.setup_workflow(
    workflow_and_activities_classes=[
        (BaseSQLMetadataExtractionWorkflow, BaseSQLMetadataExtractionActivities)
    ]
)

Custom workflow with additional activities

Extend the workflow to add custom activities

from datetime import timedelta
from typing import Any, Dict

from temporalio import workflow

from application_sdk.workflows.metadata_extraction.sql import BaseSQLMetadataExtractionWorkflow


# Depending on how the subclass is registered, it may also need its own
# @workflow.defn decorator.
class CustomMetadataWorkflow(BaseSQLMetadataExtractionWorkflow):
    @staticmethod
    def get_activities(activities):
        # Get base activities (zero-argument super() is not available in a
        # staticmethod, so call the base class directly)
        base_activities = list(
            BaseSQLMetadataExtractionWorkflow.get_activities(activities)
        )

        # Add custom activity
        base_activities.append(activities.custom_processing)

        return base_activities

    @workflow.run
    async def run(self, workflow_config: Dict[str, Any]) -> None:
        # Call base implementation
        await super().run(workflow_config)

        # Add custom logic
        workflow_args = await workflow.execute_activity_method(
            self.activities_cls.get_workflow_args,
            workflow_config,
            start_to_close_timeout=timedelta(minutes=5),  # illustrative timeout
        )

        # Execute custom activity
        await workflow.execute_activity_method(
            self.activities_cls.custom_processing,
            args=[workflow_args],
            start_to_close_timeout=timedelta(minutes=30),  # illustrative timeout
        )

Workflow flow

  1. Configuration retrieval: Gets workflow arguments from StateStore
  2. Preflight check: Validates database connection and configuration
  3. Parallel extraction: Fetches databases, schemas, tables, columns, and procedures concurrently
  4. Transformation: Transforms raw metadata into Atlas entities using transformers
  5. Upload: Uploads transformed metadata to Atlan (if enabled)

Execution flow

Step-by-step process

  1. Workflow start: Receives workflow_config with workflow_id
  2. Base workflow execution: Calls super().run() which:
    • Retrieves workflow arguments from StateStore
    • Executes preflight check activity
  3. Workflow arguments retrieval: Gets full configuration from StateStore
  4. Parallel fetch operations: Executes all fetch functions concurrently:
    fetch_and_transforms = [
        self.fetch_and_transform(fetch_function, workflow_args, retry_policy)
        for fetch_function in fetch_functions
    ]
    await asyncio.gather(*fetch_and_transforms)
  5. Fetch and transform cycle (for each entity type):
    • Execute fetch activity (for example, fetch_databases)
    • Get statistics (chunk_count, partitions, typename)
    • Create transform batches
    • Execute transform activities in parallel
    • Aggregate results
  6. Exit activities: Execute upload_to_atlan if enabled

Parallel processing

The workflow uses parallel processing at two levels (illustrated in the sketch after the lists below):

Level 1: Entity types (databases, schemas, tables, columns, procedures)

  • All entity types are fetched and transformed concurrently
  • Uses asyncio.gather() for parallel execution

Level 2: Transform batches (within each entity type)

  • Multiple chunks are transformed in parallel
  • Each chunk may have multiple partitions
  • Uses asyncio.gather() for parallel transform execution
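
A stripped-down illustration of how the two levels compose as nested asyncio.gather() calls (entity names and batch counts are illustrative only):

import asyncio


async def transform_batch(typename: str, batch_index: int) -> int:
    # Stands in for one transform activity over a batch of chunk files.
    await asyncio.sleep(0)
    return batch_index


async def process_entity_type(typename: str, batch_count: int) -> None:
    # Level 2: transform batches within one entity type run concurrently.
    await asyncio.gather(*(transform_batch(typename, i) for i in range(batch_count)))


async def main() -> None:
    # Level 1: all entity types are fetched and transformed concurrently.
    await asyncio.gather(
        process_entity_type("DATABASE", 1),
        process_entity_type("SCHEMA", 2),
        process_entity_type("TABLE", 8),
        process_entity_type("COLUMN", 32),
        process_entity_type("PROCEDURE", 1),
    )


asyncio.run(main())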

Workflow configuration

Workflow configuration is stored in the StateStore and retrieved using workflow_id (a start-up sketch follows the example below):

workflow_config = {
    "workflow_id": "workflow-uuid",
    # Additional parameters if needed
}

# Workflow retrieves full configuration from StateStore
workflow_args = {
    "credentials": {...},
    "connection": {
        "connection_name": "production",
        "connection_qualified_name": "tenant/connector/1"
    },
    "metadata": {
        "include-filter": '{"^dbengine$":["^public$"]}',
        "exclude-filter": "{}",
        "temp-table-regex": ""
    },
    "output_path": "/tmp/output",
    "output_prefix": "metadata"
}
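
For context, a run started directly through the Temporal client might look like the sketch below; the server address, workflow ID, and task-queue name are placeholders, and in a real connector the application layer typically starts workflows for you:

from temporalio.client import Client

from application_sdk.workflows.metadata_extraction.sql import BaseSQLMetadataExtractionWorkflow


async def start_run() -> None:
    client = await Client.connect("localhost:7233")  # placeholder address
    await client.start_workflow(
        BaseSQLMetadataExtractionWorkflow.run,
        {"workflow_id": "workflow-uuid"},  # workflow_config; full args live in StateStore
        id="sql-metadata-extraction-run",  # placeholder run ID
        task_queue="sql-metadata-extraction",  # placeholder queue name
    )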

Metrics

The workflow records metrics for monitoring:

Workflow execution time:

  • Metric name: workflow_execution_time_seconds
  • Type: GAUGE
  • Labels: workflow_id, workflow_type: "sql_metadata_extraction", status: "success" | "error"

Activity metrics:

  • Individual activities record their own metrics
  • Metrics include execution time, record counts, and error counts

Error handling

Activity errors:

  • Retry policies automatically retry failed activities
  • Maximum 6 attempts with exponential backoff
  • Errors are logged with context

Workflow errors:

  • Workflow execution failures are logged with workflow ID
  • Metrics record error status
  • Temporal UI provides visibility into workflow state

Validation errors:

  • Invalid statistics (missing typename, zero chunks) are handled gracefully
  • Empty results are skipped without error

Performance considerations

Parallel execution:

  • Entity types are processed concurrently for faster execution
  • Transform batches are processed in parallel within each entity type
  • Uses asyncio.gather() for efficient concurrent execution

Batch processing:

  • Metadata is processed in chunks to manage memory
  • Each chunk can have multiple partitions for large datasets
  • Transform operations are batched for parallel processing

Resource management (see the sketch below):

  • Activities use timeouts to prevent resource exhaustion
  • Heartbeat timeouts maintain activity health
  • Retry policies prevent cascading failures
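
These guardrails map to standard temporalio activity options; a minimal sketch with illustrative values (not the SDK's defaults):

from datetime import timedelta

from temporalio import workflow
from temporalio.common import RetryPolicy


async def call_with_guardrails(activity_method, args):
    # Illustrative values only; the SDK configures its own timeouts and retries.
    return await workflow.execute_activity_method(
        activity_method,
        args=args,
        start_to_close_timeout=timedelta(hours=2),  # hard cap per attempt
        heartbeat_timeout=timedelta(minutes=5),     # detects stalled activities
        retry_policy=RetryPolicy(maximum_attempts=6, backoff_coefficient=2),
    )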

See also