AtlasTransformer

Class
Module: application_sdk.transformers.atlas

Converts raw metadata into Atlas entities using pyatlan library classes. Metadata is processed row by row: for each entity type (DATABASE, SCHEMA, TABLE, etc.), the matching entity class definition builds a properly structured Atlas entity with relationships and qualified names, which is then enriched with workflow metadata.

Methods

__init__

__init__(self, connector_name, tenant_id, current_epoch='0', connection_qualified_name=None)
Initialize the transformer with connector name and tenant ID. Optionally specify epoch timestamp and connection qualified name.
Parameters:

  • connector_name (str, required): Name of the connector
  • tenant_id (str, required): ID of the tenant
  • current_epoch (str, optional): Current epoch timestamp for versioning (default: '0')
  • connection_qualified_name (str, optional): Qualified name for the connection (default: None)

transform_metadata

transform_metadata(self, typename, dataframe, workflow_id, workflow_run_id, entity_class_definitions=None, **kwargs)
Transforms a DataFrame of raw metadata into Atlas entities. Each row is transformed with the entity class registered for typename and enriched with workflow metadata; the result is returned as a DataFrame of transformed entities. Invalid rows are logged as warnings and skipped.
Parameters:

  • typename (str, required): Type identifier (e.g., 'DATABASE', 'SCHEMA', 'TABLE', 'COLUMN')
  • dataframe (daft.DataFrame, required): Raw metadata as daft DataFrame
  • workflow_id (str, required): Workflow identifier
  • workflow_run_id (str, required): Workflow run identifier
  • entity_class_definitions (Optional[Dict[str, Type[Any]]], optional): Custom entity class definitions
  • **kwargs (dict, optional): Additional keyword arguments, including connection (dict) with connection_name and connection_qualified_name

Returns:

  • daft.DataFrame: Transformed DataFrame with Atlas entities as dictionaries
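
The skip-and-continue behavior described above can be sketched in plain Python. This is a minimal illustration, not the SDK's implementation: `transform_rows` and `demo_transform_row` are hypothetical names, and a plain list of dictionaries stands in for the daft DataFrame.

```python
import logging
from typing import Any, Callable, Dict, Iterable, List, Optional

logger = logging.getLogger(__name__)

Row = Dict[str, Any]


def transform_rows(
    rows: Iterable[Row],
    transform_row: Callable[[Row], Optional[Row]],
) -> List[Row]:
    """Apply transform_row to every row, skipping rows that fail or yield None."""
    transformed: List[Row] = []
    for index, row in enumerate(rows):
        try:
            entity = transform_row(row)
        except Exception as exc:
            # A row-level failure is logged and does not stop the loop.
            logger.warning("Skipping row %d: %s", index, exc)
            continue
        if entity is None:
            logger.warning("Skipping row %d: no entity produced", index)
            continue
        transformed.append(entity)
    return transformed


def demo_transform_row(row: Row) -> Optional[Row]:
    """Hypothetical stand-in for a per-row transformer."""
    if not row.get("table_name"):
        return None
    return {"attributes": {"name": row["table_name"]}}


rows = [{"table_name": "orders"}, {"table_name": ""}, {"table_name": "users"}]
result = transform_rows(rows, demo_transform_row)
```

Here the second row is dropped with a warning and the other two are kept, so one bad row does not discard the batch.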

transform_row

transform_row(self, typename, data, workflow_id, workflow_run_id, entity_class_definitions=None, **kwargs)
Transforms a single row of metadata into an Atlas entity. The method looks up the entity class for typename, calls get_attributes() on that class to parse the row, enriches the result with workflow metadata, instantiates the pyatlan class, and returns the entity as a dictionary.
Parameters:

  • typename (str, required): Type identifier for the entity
  • data (Dict[str, Any], required): Metadata dictionary for the row
  • workflow_id (str, required): Workflow identifier
  • workflow_run_id (str, required): Workflow run identifier
  • entity_class_definitions (Optional[Dict[str, Type[Any]]], optional): Custom entity class definitions
  • **kwargs (dict, optional): Additional keyword arguments, including connection_name and connection_qualified_name

Returns:

  • Optional[Dict[str, Any]]: Transformed entity as dictionary, or None if transformation fails
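
The sequence of steps (look up the class, parse attributes, enrich, return a dictionary or None) might look roughly like the sketch below. Everything here is an assumption made for illustration: `DemoTable`, `ENTITY_CLASSES`, `transform_row_sketch`, and the shape of the returned dictionary are hypothetical and do not reproduce the SDK's classes or output format.

```python
from typing import Any, Dict, Optional, Type


class DemoTable:
    """Hypothetical stand-in for an entity class; not the SDK's Table."""

    @classmethod
    def get_attributes(cls, data: Dict[str, Any]) -> Dict[str, Any]:
        # Raises KeyError when a required field is missing.
        return {"attributes": {"name": data["table_name"]}, "custom_attributes": {}}


# Assumed lookup table from type identifier to entity class.
ENTITY_CLASSES: Dict[str, Type[Any]] = {"TABLE": DemoTable}


def transform_row_sketch(
    typename: str,
    data: Dict[str, Any],
    workflow_id: str,
    workflow_run_id: str,
    entity_class_definitions: Optional[Dict[str, Type[Any]]] = None,
) -> Optional[Dict[str, Any]]:
    definitions = entity_class_definitions or ENTITY_CLASSES
    entity_class = definitions.get(typename)
    if entity_class is None:
        return None  # unknown entity type
    try:
        parsed = entity_class.get_attributes(data)
    except (KeyError, ValueError):
        return None  # the row could not be parsed
    attributes = dict(parsed["attributes"])
    # Enrichment with the workflow tracking fields.
    attributes["last_sync_workflow_name"] = workflow_id
    attributes["last_sync_run"] = workflow_run_id
    return {
        "type": entity_class.__name__,
        "attributes": attributes,
        "custom_attributes": parsed["custom_attributes"],
    }


entity = transform_row_sketch("TABLE", {"table_name": "orders"}, "workflow-123", "run-456")
```

Returning None instead of raising keeps the decision about skipping a row with the caller.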

_enrich_entity_with_metadata

_enrich_entity_with_metadata(self, workflow_id, workflow_run_id, data)
Enriches an entity with workflow metadata and connection information. Adds status, tenant_id, workflow tracking fields, connection information, and source metadata when available.
Parameters:

  • workflow_id (str, required): Workflow identifier
  • workflow_run_id (str, required): Workflow run identifier
  • data (Dict[str, Any], required): Entity data dictionary

Returns:

  • dict: Dictionary with attributes and custom_attributes keys containing enriched metadata
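
To make the return shape concrete, the sketch below builds a dictionary with the two documented keys. It is an illustration under stated assumptions: `enrich_sketch` is a hypothetical function, the "ACTIVE" status value, the millisecond timestamp, and the placement of source_id under custom_attributes are guesses, and `tenant_id` is passed in explicitly rather than read from the transformer instance.

```python
import time
from typing import Any, Dict, Optional


def enrich_sketch(
    workflow_id: str,
    workflow_run_id: str,
    data: Dict[str, Any],
    tenant_id: str,
    now_ms: Optional[int] = None,
) -> Dict[str, Dict[str, Any]]:
    """Return enrichment fields split into attributes and custom_attributes."""
    attributes: Dict[str, Any] = {
        "status": "ACTIVE",  # assumed value
        "tenant_id": tenant_id,
        "last_sync_workflow_name": workflow_id,
        "last_sync_run": workflow_run_id,
        # Assumed unit: milliseconds since the Unix epoch.
        "last_sync_run_at": now_ms if now_ms is not None else int(time.time() * 1000),
    }
    # Connection information is copied only when the row carries it.
    for key in ("connection_name", "connection_qualified_name"):
        if data.get(key):
            attributes[key] = data[key]
    custom_attributes: Dict[str, Any] = {}
    if data.get("source_id"):
        custom_attributes["source_id"] = data["source_id"]  # assumed placement
    return {"attributes": attributes, "custom_attributes": custom_attributes}


enriched = enrich_sketch(
    "workflow-123",
    "run-456",
    {"connection_name": "production"},
    tenant_id="tenant-123",
    now_ms=0,
)
```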

Usage Examples

Basic transformation

Initialize transformer and transform metadata for tables

from application_sdk.transformers.atlas import AtlasTransformer

# Initialize transformer
transformer = AtlasTransformer(
    connector_name="postgresql-connector",
    tenant_id="tenant-123"
)

# Transform metadata
transformed_df = transformer.transform_metadata(
    typename="TABLE",
    dataframe=raw_table_df,
    workflow_id="extract-tables",
    workflow_run_id="run-001",
    connection={
        "connection_name": "production",
        "connection_qualified_name": "tenant/postgresql/1"
    }
)

Processing multiple entity types

Transform different entity types in sequence

# Transform different entity types
databases_df = transformer.transform_metadata(
    typename="DATABASE",
    dataframe=raw_databases_df,
    workflow_id="workflow-123",
    workflow_run_id="run-456",
    connection=connection_info
)

schemas_df = transformer.transform_metadata(
    typename="SCHEMA",
    dataframe=raw_schemas_df,
    workflow_id="workflow-123",
    workflow_run_id="run-456",
    connection=connection_info
)

tables_df = transformer.transform_metadata(
    typename="TABLE",
    dataframe=raw_tables_df,
    workflow_id="workflow-123",
    workflow_run_id="run-456",
    connection=connection_info
)

Error handling

The transformer handles row-level errors without aborting the run: invalid rows are logged as warnings and skipped. Wrap the call in try/except to catch failures that affect the whole transformation.

import logging

logger = logging.getLogger(__name__)

try:
    transformed_df = transformer.transform_metadata(
        typename="TABLE",
        dataframe=raw_df,
        workflow_id="workflow-123",
        workflow_run_id="run-456",
        connection=connection_info
    )
except Exception as e:
    # Reached only when the transformation as a whole fails
    logger.error(f"Transformation failed: {e}")
    # Handle error

Default entity classes

The transformer includes default entity class definitions:

| Entity Type | Class | Description |
| --- | --- | --- |
| DATABASE | Database | Database entities |
| SCHEMA | Schema | Schema entities |
| TABLE | Table | Table entities |
| VIEW | Table | View entities (uses Table class) |
| MATERIALIZED VIEW | Table | Materialized view entities |
| COLUMN | Column | Column entities |
| FUNCTION | Function | Function entities |
| PROCEDURE | Procedure | Stored procedure entities |
| TAG_REF | TagAttachment | Tag attachment entities |
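
A mapping passed as entity_class_definitions has the type Dict[str, Type[Any]]. The sketch below shows one way to assemble such a mapping by copying a set of defaults and overriding a single entry. The classes are hypothetical placeholders, and because this page does not say whether a custom mapping is merged with or replaces the defaults, the merge is done explicitly here.

```python
from typing import Any, Dict, Type


class DemoTable:
    """Hypothetical placeholder for a default table entity class."""


class DemoColumn:
    """Hypothetical placeholder for a default column entity class."""


class AuditedTable(DemoTable):
    """Hypothetical custom class that would replace the default for TABLE."""


DEFAULTS: Dict[str, Type[Any]] = {
    "TABLE": DemoTable,
    "VIEW": DemoTable,  # views reuse the table class, as in the table above
    "COLUMN": DemoColumn,
}


def with_overrides(
    defaults: Dict[str, Type[Any]],
    overrides: Dict[str, Type[Any]],
) -> Dict[str, Type[Any]]:
    """Return a new mapping; the defaults dictionary is left untouched."""
    merged = dict(defaults)
    merged.update(overrides)
    return merged


custom_definitions = with_overrides(DEFAULTS, {"TABLE": AuditedTable})
```

The resulting dictionary could then be supplied through the entity_class_definitions parameter of transform_metadata.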

Entity classes

Entity classes are defined in application_sdk.transformers.atlas.sql and extend pyatlan asset classes.

Database

Transforms database metadata into Database entities.

Required fields:

  • database_name: Name of the database
  • connection_qualified_name: Connection qualified name

Attributes created:

  • qualified_name: Built from connection and database name
  • name: Database name
  • connection_qualified_name: Connection reference

Schema

Transforms schema metadata into Schema entities.

Required fields:

  • schema_name: Name of the schema
  • database_name: Name of the parent database
  • connection_qualified_name: Connection qualified name

Attributes created:

  • qualified_name: Built from connection, database, and schema name
  • name: Schema name
  • database_qualified_name: Parent database reference

Table

Transforms table metadata into Table entities.

Required fields:

  • table_name: Name of the table
  • table_schema: Name of the parent schema
  • table_catalog: Name of the parent database
  • connection_qualified_name: Connection qualified name

Attributes created:

  • qualified_name: Built from connection, database, schema, and table name
  • name: Table name
  • schema_qualified_name: Parent schema reference
  • database_qualified_name: Parent database reference
  • Additional table-specific attributes (row_count, column_count, etc.)

Column

Transforms column metadata into Column entities.

Required fields:

  • column_name: Name of the column
  • table_name: Name of the parent table
  • table_schema: Name of the parent schema
  • table_catalog: Name of the parent database
  • connection_qualified_name: Connection qualified name

Attributes created:

  • qualified_name: Built from connection, database, schema, table, and column name
  • name: Column name
  • table_qualified_name: Parent table reference
  • Additional column-specific attributes (data_type, nullable, etc.)
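
The qualified names for Database, Schema, Table, and Column are each described as built from the connection plus the names of the parent objects. A minimal sketch of that hierarchy follows, assuming the parts are joined with "/" (the separator is an assumption, as is the helper name `build_qualified_name`).

```python
def build_qualified_name(connection_qualified_name: str, *parts: str) -> str:
    """Join the connection qualified name and object names with '/' (assumed separator)."""
    if not connection_qualified_name:
        raise ValueError("connection_qualified_name is required")
    if any(not part for part in parts):
        raise ValueError("every name in the hierarchy must be non-empty")
    return "/".join((connection_qualified_name, *parts))


connection = "tenant/postgresql/1"
database_qn = build_qualified_name(connection, "sales")
schema_qn = build_qualified_name(connection, "sales", "public")
table_qn = build_qualified_name(connection, "sales", "public", "orders")
column_qn = build_qualified_name(connection, "sales", "public", "orders", "id")
```

Under this assumption each child's qualified name starts with its parent's, which is what lets table_qualified_name or schema_qualified_name act as a parent reference.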

Function

Transforms function metadata into Function entities.

Required fields:

  • function_name: Name of the function
  • function_definition: Source code of the function
  • function_catalog: Database containing the function
  • function_schema: Schema containing the function
  • connection_qualified_name: Connection qualified name

Procedure

Transforms stored procedure metadata into Procedure entities.

Required fields:

  • procedure_name: Name of the procedure
  • procedure_definition: Source code of the procedure
  • procedure_catalog: Database containing the procedure
  • procedure_schema: Schema containing the procedure
  • connection_qualified_name: Connection qualified name
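
Since rows that lack required fields are skipped during transformation, it can help to check raw rows beforehand. The sketch below is a hypothetical pre-check, not part of the SDK: `REQUIRED_FIELDS` simply restates the Function and Procedure lists above, and `missing_fields` is an illustrative helper.

```python
from typing import Any, Dict, List, Tuple

# Required fields copied from the Function and Procedure sections above.
REQUIRED_FIELDS: Dict[str, Tuple[str, ...]] = {
    "FUNCTION": (
        "function_name",
        "function_definition",
        "function_catalog",
        "function_schema",
        "connection_qualified_name",
    ),
    "PROCEDURE": (
        "procedure_name",
        "procedure_definition",
        "procedure_catalog",
        "procedure_schema",
        "connection_qualified_name",
    ),
}


def missing_fields(typename: str, row: Dict[str, Any]) -> List[str]:
    """Return the required fields that are absent, None, or empty in row."""
    return [
        field
        for field in REQUIRED_FIELDS[typename]
        if row.get(field) in (None, "")
    ]


procedure_row = {
    "procedure_name": "refresh_totals",
    "procedure_catalog": "sales",
    "procedure_schema": "public",
    "connection_qualified_name": "tenant/postgresql/1",
}
missing = missing_fields("PROCEDURE", procedure_row)
```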

Entity enrichment

All entities are automatically enriched with:

Workflow metadata

  • last_sync_workflow_name: Workflow identifier
  • last_sync_run: Workflow run identifier
  • last_sync_run_at: Timestamp of last sync

Connection metadata

  • connection_name: Name of the connection
  • connector_name: Connector type derived from qualified name
  • connection_qualified_name: Full connection qualified name
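
The connector type is described as derived from the qualified name. A sketch of one plausible derivation is shown below, assuming the three-segment layout seen in the usage example ("tenant/postgresql/1") with the connector in the second segment; the function name and the layout are assumptions.

```python
from typing import Optional


def connector_name_from_qualified_name(connection_qualified_name: str) -> Optional[str]:
    """Return the second '/'-separated segment, or None if the layout does not match."""
    parts = connection_qualified_name.split("/")
    if len(parts) < 3 or not parts[1]:
        return None
    return parts[1]


connector = connector_name_from_qualified_name("tenant/postgresql/1")
```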

Source metadata (when available)

  • description: Processed from remarks or comment
  • source_created_by: Original creator
  • source_created_at: Creation timestamp
  • source_updated_at: Last update timestamp
  • source_id: Source system identifier
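
The description is said to be processed from remarks or comment. The sketch below shows one way such a fallback could work; the precedence (remarks first, then comment), the whitespace stripping, and the name `resolve_description` are all assumptions.

```python
from typing import Any, Dict, Optional


def resolve_description(row: Dict[str, Any]) -> Optional[str]:
    """Return the first non-blank value among 'remarks' and 'comment', stripped."""
    for key in ("remarks", "comment"):  # assumed precedence
        value = row.get(key)
        if isinstance(value, str) and value.strip():
            return value.strip()
    return None


description = resolve_description({"remarks": "  ", "comment": " Daily order totals "})
```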
