SQL
SqlApp is the template for SQL metadata extraction. Subclass it to extract databases, schemas, tables, columns, views, and procedures from any SQL source—PostgreSQL, MySQL, Snowflake, Redshift, and others.
How it works
SqlApp runs a two-phase pipeline for each entity type:
- Extract: streams SQL rows verbatim into raw JSONL (raw/<entity>/records.json), with no mapping yet.
- Transform: reads the raw JSONL and runs each record through your map_* function, writing Atlan assets to transformed/<entity>/entities.json.
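The two phases above can be sketched in plain Python. This is an illustration only, not the SDK's implementation: the extract and transform helpers, the dict-based mapper, and the temporary directory are all assumptions made for the example.

```python
import json
import tempfile
from pathlib import Path
from typing import Callable, Iterable


def extract(rows: Iterable[dict], raw_path: Path) -> int:
    """Phase 1: write each source row verbatim as one JSON line."""
    raw_path.parent.mkdir(parents=True, exist_ok=True)
    count = 0
    with raw_path.open("w", encoding="utf-8") as fh:
        for row in rows:
            fh.write(json.dumps(row) + "\n")
            count += 1
    return count


def transform(raw_path: Path, out_path: Path, mapper: Callable[[dict], dict]) -> int:
    """Phase 2: read the raw lines back and run each one through the mapper."""
    out_path.parent.mkdir(parents=True, exist_ok=True)
    count = 0
    with raw_path.open(encoding="utf-8") as src, out_path.open("w", encoding="utf-8") as dst:
        for line in src:
            dst.write(json.dumps(mapper(json.loads(line))) + "\n")
            count += 1
    return count


def map_table(record: dict) -> dict:
    # Hypothetical mapper: a plain dict stands in for an Atlan asset.
    return {"typeName": "Table", "name": record["table_name"]}


base = Path(tempfile.mkdtemp())
raw = base / "raw" / "table" / "records.json"
out = base / "transformed" / "table" / "entities.json"
extracted = extract([{"table_name": "orders"}, {"table_name": "users"}], raw)
transformed = transform(raw, out, map_table)
```

Because the raw file is kept on disk, the transform step can be repeated with a corrected mapper without querying the source again.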
The extract and transform phases each run as separate @task methods, so each phase is an independent checkpoint. If the transform for tables fails, only that task retries—the extract doesn't re-run.
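A rough sketch of why separate checkpoints matter: when each phase is retried on its own, a transient transform failure doesn't repeat the extract. The run_with_retry helper and the attempt counters are hypothetical and stand in for whatever retry policy the @task decorator actually applies.

```python
import asyncio

# Counts how often each task body actually runs.
attempts = {"extract_tables": 0, "transform_tables": 0}


async def extract_tables() -> None:
    attempts["extract_tables"] += 1


async def transform_tables() -> None:
    attempts["transform_tables"] += 1
    if attempts["transform_tables"] == 1:
        # Simulate a transient failure on the first attempt only.
        raise RuntimeError("transient failure")


async def run_with_retry(task, max_attempts: int = 3) -> None:
    """Hypothetical retry wrapper: re-runs only the task it was given."""
    for attempt in range(1, max_attempts + 1):
        try:
            await task()
            return
        except RuntimeError:
            if attempt == max_attempts:
                raise


async def main() -> None:
    await run_with_retry(extract_tables)
    await run_with_retry(transform_tables)


asyncio.run(main())
```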
By default, run() issues the four core extract tasks (databases, schemas, tables, columns) in parallel via asyncio.gather, then runs the four transform tasks in parallel, then uploads.
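The ordering described above (all extracts in parallel, then all transforms in parallel) can be sketched with asyncio.gather. The coroutine names and the calls log here are assumptions for the example and don't mirror the SDK's task signatures.

```python
import asyncio

calls: list[str] = []
ENTITIES = ["databases", "schemas", "tables", "columns"]


async def extract(entity: str) -> str:
    await asyncio.sleep(0)  # yield control, as real I/O would
    calls.append(f"extract:{entity}")
    return entity


async def transform(entity: str) -> str:
    await asyncio.sleep(0)
    calls.append(f"transform:{entity}")
    return entity


async def run() -> list[str]:
    # Phase 1: every extract runs concurrently; gather waits for all of them.
    await asyncio.gather(*(extract(e) for e in ENTITIES))
    # Phase 2: transforms start only after every extract has finished.
    done = await asyncio.gather(*(transform(e) for e in ENTITIES))
    return list(done)


result = asyncio.run(run())
```

asyncio.gather returns results in the order the awaitables were passed, so result lists the entities in their original order even if the tasks finish out of order.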
What you must provide
Three things are required in every SqlApp subclass:
A SQL client
Subclass BaseSQLClient with a DB_CONFIG that describes the connection URL template and required credential fields:
from application_sdk.clients.sql import BaseSQLClient
from application_sdk.clients.models import DatabaseConfig
class PostgresClient(BaseSQLClient):
    DB_CONFIG = DatabaseConfig(
        template="postgresql+psycopg://{username}:{password}@{host}:{port}/{database}",
        required=["username", "password", "host", "port", "database"],
    )
Set sql_client_class on your SqlApp subclass to wire it in:
class PostgresApp(SqlApp):
    sql_client_class = PostgresClient
SQL queries
Set SQL class attributes to supply the queries for each entity type. The framework substitutes filter variables like {normalized_include_regex} and {normalized_exclude_regex} at query time. For the full list of available variables, see application_sdk/common/sql_filters.py in the repository.
class PostgresApp(SqlApp):
    sql_client_class = PostgresClient

    fetch_database_sql = """
        SELECT datname AS database_name
        FROM pg_database
        WHERE datname = current_database();
    """

    fetch_schema_sql = """
        SELECT s.*
        FROM information_schema.schemata s
        WHERE s.schema_name NOT LIKE 'pg_%'
          AND s.schema_name != 'information_schema'
          AND concat(s.catalog_name, '.', s.schema_name) !~ '{normalized_exclude_regex}'
          AND concat(s.catalog_name, '.', s.schema_name) ~ '{normalized_include_regex}';
    """

    fetch_table_sql = """
        SELECT t.*
        FROM information_schema.tables t
        WHERE concat(current_database(), '.', t.table_schema) !~ '{normalized_exclude_regex}'
          AND concat(current_database(), '.', t.table_schema) ~ '{normalized_include_regex}'
          {temp_table_regex_sql};
    """

    fetch_column_sql = """
        SELECT c.*
        FROM information_schema.columns c
        WHERE concat(current_database(), '.', c.table_schema) !~ '{normalized_exclude_regex}'
          AND concat(current_database(), '.', c.table_schema) ~ '{normalized_include_regex}'
          {temp_table_regex_sql};
    """
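To make the placeholder mechanism concrete, here is a minimal sketch of how a query template might be rendered. It assumes plain string replacement; the framework's actual substitution logic isn't shown in this page, and render_query along with the sample regex values are hypothetical.

```python
def render_query(template: str, variables: dict[str, str]) -> str:
    """Hypothetical helper: replace each {name} placeholder with its value.

    str.replace is used instead of str.format so that any other braces
    in the SQL text are left untouched.
    """
    rendered = template
    for name, value in variables.items():
        rendered = rendered.replace("{" + name + "}", value)
    return rendered


template = """
SELECT t.*
FROM information_schema.tables t
WHERE concat(current_database(), '.', t.table_schema) !~ '{normalized_exclude_regex}'
  AND concat(current_database(), '.', t.table_schema) ~ '{normalized_include_regex}'
  {temp_table_regex_sql};
"""

# Illustrative values only: exclude nothing, include everything, no temp-table clause.
query = render_query(
    template,
    {
        "normalized_exclude_regex": "^$",
        "normalized_include_regex": ".*",
        "temp_table_regex_sql": "",
    },
)
```

Rendering a query this way in a unit test is a cheap check that every placeholder your SQL uses is one the framework is expected to supply.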
You can also load SQL from files:
from application_sdk.common.utils import read_sql_files
# Reads app/sql/fetch_databases.sql, app/sql/fetch_schemas.sql, etc.
# Keys are the filenames uppercased without the .sql extension.
SQL = read_sql_files("app/sql")
class PostgresApp(SqlApp):
    fetch_database_sql = SQL.get("FETCH_DATABASES")  # app/sql/fetch_databases.sql
    fetch_schema_sql = SQL.get("FETCH_SCHEMAS")      # app/sql/fetch_schemas.sql
    fetch_table_sql = SQL.get("FETCH_TABLES")        # app/sql/fetch_tables.sql
    fetch_column_sql = SQL.get("FETCH_COLUMNS")      # app/sql/fetch_columns.sql
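The key convention (filename, uppercased, without the .sql extension) can be illustrated with a small stand-in loader. load_sql_dir is a hypothetical function written for this example, not the SDK's read_sql_files, and its behavior may differ from the real helper in details such as whitespace handling or recursion.

```python
import tempfile
from pathlib import Path


def load_sql_dir(directory: str) -> dict[str, str]:
    """Hypothetical stand-in: map UPPERCASED file stems to file contents."""
    return {
        path.stem.upper(): path.read_text(encoding="utf-8").strip()
        for path in sorted(Path(directory).glob("*.sql"))
    }


# Build a throwaway directory with two query files.
sql_dir = Path(tempfile.mkdtemp())
(sql_dir / "fetch_databases.sql").write_text(
    "SELECT datname AS database_name FROM pg_database;\n", encoding="utf-8"
)
(sql_dir / "fetch_schemas.sql").write_text(
    "SELECT * FROM information_schema.schemata;\n", encoding="utf-8"
)

SQL = load_sql_dir(str(sql_dir))
missing = SQL.get("FETCH_TABLES")  # no such file, so .get() returns None
```

With a plain dict, .get() returns None for a missing or misspelled key, so the class attribute would be silently left unset. Indexing with SQL["FETCH_TABLES"] instead raises KeyError at import time, which may be preferable for required queries.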
Asset mappers
Implement a map_* method for each entity type you extract. Each mapper receives a raw record dict and the connection qualified name, and returns a pyatlan asset:
from pyatlan_v9.model.assets import (
    Database, Schema, Table, Column,
    RelatedDatabase, RelatedSchema, RelatedTable,
)

class PostgresApp(SqlApp):
    sql_client_class = PostgresClient
    # ... SQL query attributes ...

    def map_database(self, record: dict, connection_qn: str) -> Database:
        return Database(
            qualified_name=f"{connection_qn}/{record['database_name']}",
            name=record["database_name"],
        )

    def map_schema(self, record: dict, connection_qn: str) -> Schema:
        db_name = record["catalog_name"]
        db_qn = f"{connection_qn}/{db_name}"
        schema_qn = f"{db_qn}/{record['schema_name']}"
        return Schema(
            qualified_name=schema_qn,
            name=record["schema_name"],
            database_name=db_name,
            database_qualified_name=db_qn,
            database=RelatedDatabase(qualified_name=db_qn),
        )

    def map_table(self, record: dict, connection_qn: str) -> Table:
        db_name = record["table_catalog"]
        schema_name = record["table_schema"]
        db_qn = f"{connection_qn}/{db_name}"
        schema_qn = f"{db_qn}/{schema_name}"
        table_qn = f"{schema_qn}/{record['table_name']}"
        return Table(
            qualified_name=table_qn,
            name=record["table_name"],
            database_name=db_name,
            database_qualified_name=db_qn,
            schema_name=schema_name,
            schema_qualified_name=schema_qn,
            atlan_schema=RelatedSchema(qualified_name=schema_qn),
        )

    def map_column(self, record: dict, connection_qn: str) -> Column:
        db_name = record["table_catalog"]
        schema_name = record["table_schema"]
        table_name = record["table_name"]
        db_qn = f"{connection_qn}/{db_name}"
        schema_qn = f"{db_qn}/{schema_name}"
        table_qn = f"{schema_qn}/{table_name}"
        column_qn = f"{table_qn}/{record['column_name']}"
        return Column(
            qualified_name=column_qn,
            name=record["column_name"],
            database_name=db_name,
            database_qualified_name=db_qn,
            schema_name=schema_name,
            schema_qualified_name=schema_qn,
            table_name=table_name,
            table_qualified_name=table_qn,
            table=RelatedTable(qualified_name=table_qn),
        )
All four core mappers (map_database, map_schema, map_table, map_column) raise NotImplementedError if not overridden.
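Since an unimplemented mapper only fails when the transform runs, it can help to catch the omission earlier, for example in a unit test. The sketch below uses a hypothetical BaseApp stand-in rather than SqlApp itself; missing_mappers is an illustrative helper, not part of the SDK.

```python
CORE_MAPPERS = ["map_database", "map_schema", "map_table", "map_column"]


class BaseApp:
    """Hypothetical stand-in for SqlApp: every core mapper raises by default."""

    def map_database(self, record: dict, connection_qn: str) -> dict:
        raise NotImplementedError("map_database must be overridden")

    def map_schema(self, record: dict, connection_qn: str) -> dict:
        raise NotImplementedError("map_schema must be overridden")

    def map_table(self, record: dict, connection_qn: str) -> dict:
        raise NotImplementedError("map_table must be overridden")

    def map_column(self, record: dict, connection_qn: str) -> dict:
        raise NotImplementedError("map_column must be overridden")


class PartialApp(BaseApp):
    # Only one of the four mappers is overridden.
    def map_database(self, record: dict, connection_qn: str) -> dict:
        return {"qualified_name": f"{connection_qn}/{record['database_name']}"}


def missing_mappers(app_cls: type, base_cls: type = BaseApp) -> list[str]:
    """Return the core mappers that app_cls still inherits unchanged."""
    return [
        name
        for name in CORE_MAPPERS
        if getattr(app_cls, name) is getattr(base_cls, name)
    ]


missing = missing_mappers(PartialApp)
```

Here missing lists the three mappers PartialApp has not overridden, so a test can assert that the list is empty before any extraction is attempted.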
Optional
Views and procedures are off by default. Set fetch_view_sql or fetch_procedure_sql to enable them.
Views
Set fetch_view_sql to stream view rows. The framework routes them through your existing map_table—there's no separate map_view method. Inside map_table, inspect the record and return a View instead of Table for view rows:
from pyatlan_v9.model.assets import Table, View, RelatedSchema
class PostgresApp(SqlApp):
    fetch_view_sql = """
        SELECT v.*, 'VIEW' AS table_type
        FROM information_schema.views v
        WHERE concat(current_database(), '.', v.table_schema) !~ '{normalized_exclude_regex}'
          AND concat(current_database(), '.', v.table_schema) ~ '{normalized_include_regex}';
    """

    def map_table(self, record: dict, connection_qn: str) -> Table | View:
        db_name = record["table_catalog"]
        schema_name = record["table_schema"]
        db_qn = f"{connection_qn}/{db_name}"
        schema_qn = f"{db_qn}/{schema_name}"
        entity_qn = f"{schema_qn}/{record['table_name']}"
        cls = View if record.get("table_type") == "VIEW" else Table
        return cls(
            qualified_name=entity_qn,
            name=record["table_name"],
            database_name=db_name,
            database_qualified_name=db_qn,
            schema_name=schema_name,
            schema_qualified_name=schema_qn,
            atlan_schema=RelatedSchema(qualified_name=schema_qn),
        )
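The table-or-view dispatch can be exercised without a database or pyatlan installed. In this sketch, Table and View are simplified dataclass stand-ins for the real asset classes, and the connection qualified name is an illustrative value.

```python
from dataclasses import dataclass
from typing import Union


@dataclass
class Table:
    # Simplified stand-in for the pyatlan Table asset.
    qualified_name: str
    name: str


@dataclass
class View:
    # Simplified stand-in for the pyatlan View asset.
    qualified_name: str
    name: str


def map_table(record: dict, connection_qn: str) -> Union[Table, View]:
    schema_qn = f"{connection_qn}/{record['table_catalog']}/{record['table_schema']}"
    entity_qn = f"{schema_qn}/{record['table_name']}"
    # Rows from fetch_view_sql carry table_type = 'VIEW'; anything else is a table.
    cls = View if record.get("table_type") == "VIEW" else Table
    return cls(qualified_name=entity_qn, name=record["table_name"])


connection_qn = "default/postgres/123"  # illustrative value
base_row = {"table_catalog": "shop", "table_schema": "public"}

table_asset = map_table({**base_row, "table_name": "orders", "table_type": "BASE TABLE"}, connection_qn)
view_asset = map_table({**base_row, "table_name": "daily_sales", "table_type": "VIEW"}, connection_qn)
untyped_asset = map_table({**base_row, "table_name": "legacy"}, connection_qn)
```

Using record.get() rather than indexing means rows that lack a table_type column fall back to Table instead of raising KeyError.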
Procedures
Set fetch_procedure_sql and implement map_procedure returning a Procedure:
from pyatlan_v9.model.assets import Procedure, RelatedSchema
class PostgresApp(SqlApp):
    fetch_procedure_sql = """
        SELECT routine_name AS procedure_name,
               routine_schema AS procedure_schema,
               routine_catalog AS procedure_catalog,
               routine_definition AS procedure_definition
        FROM information_schema.routines
        WHERE routine_type = 'PROCEDURE'
          AND routine_catalog = current_database();
    """

    def map_procedure(self, record: dict, connection_qn: str) -> Procedure:
        db_name = record["procedure_catalog"]
        schema_name = record["procedure_schema"]
        db_qn = f"{connection_qn}/{db_name}"
        schema_qn = f"{db_qn}/{schema_name}"
        proc_qn = f"{schema_qn}/_procedures_/{record['procedure_name']}"
        return Procedure(
            qualified_name=proc_qn,
            name=record["procedure_name"],
            database_name=db_name,
            database_qualified_name=db_qn,
            schema_name=schema_name,
            schema_qualified_name=schema_qn,
            atlan_schema=RelatedSchema(qualified_name=schema_qn),
            definition=record.get("procedure_definition", ""),
        )
Override run()
The default run() extracts and transforms all four core entities in parallel. If your source requires a different order—for example, paginating tables per schema, or sequencing fetches—override run() and call the extract_* and transform_* tasks directly:
import asyncio

from application_sdk.templates.contracts.sql_metadata import (
    ExtractionInput, ExtractionOutput, ExtractionTaskInput,
)

class PostgresApp(SqlApp):
    async def run(self, input: ExtractionInput) -> ExtractionOutput:
        task_input = self.build_task_input(ExtractionTaskInput, input)

        # Sequential extraction when order matters
        await self.extract_databases(task_input)
        await self.extract_schemas(task_input)
        await self.extract_tables(task_input)
        await self.extract_columns(task_input)

        # Transform in parallel
        await asyncio.gather(
            self.transform_databases(task_input),
            self.transform_schemas(task_input),
            self.transform_tables(task_input),
            self.transform_columns(task_input),
        )

        await self.upload_to_atlan(...)
        return ExtractionOutput(...)
Use build_task_input(ExtractionTaskInput, input) to construct the typed input each task expects from the top-level ExtractionInput.
See also
- Apps and tasks—the App and task primitives that SqlApp is built on
- Handlers—the Handler class that pairs with your SqlApp for auth, preflight, and metadata endpoints