QueryBasedTransformer transforms raw metadata DataFrames into structured entities using SQL queries defined in YAML templates. It executes SQL transformations on raw dataframes using the daft engine and automatically creates nested structures from flat dataframes with dot notation. This transformer is ideal when you need SQL-based transformations with flexible template definitions.
# QueryBasedTransformer

`application_sdk.transformers.query`

Transforms metadata using SQL queries defined in YAML templates. The transformer uses YAML files to define SQL queries for each asset type, executes those queries on raw DataFrames using daft's SQL engine, then automatically groups columns with dot notation into nested structs.
## Methods

### `__init__`

`__init__(self, connector_name, tenant_id)`

**Parameters**

| Parameter | Type |
|---|---|
| `connector_name` | `str` |
| `tenant_id` | `str` |

### `transform_metadata`

`transform_metadata(self, typename, dataframe, workflow_id, workflow_run_id, entity_class_definitions=None, **kwargs)`

**Parameters**

| Parameter | Type |
|---|---|
| `typename` | `str` |
| `dataframe` | `daft.DataFrame` |
| `workflow_id` | `str` |
| `workflow_run_id` | `str` |
| `entity_class_definitions` | `Optional[Dict[str, str]]` |
| `**kwargs` | `dict` |

**Returns**

`Optional[daft.DataFrame]` - Transformed DataFrame with nested structures, or `None` if the input DataFrame is empty

### `generate_sql_query`

`generate_sql_query(self, yaml_path, dataframe, default_attributes)`

**Parameters**

| Parameter | Type |
|---|---|
| `yaml_path` | `str` |
| `dataframe` | `daft.DataFrame` |
| `default_attributes` | `Dict[str, Any]` |

**Returns**

`Tuple[str, List[str]]` - Tuple of (SQL query string, list of literal columns)

### `prepare_template_and_attributes`

`prepare_template_and_attributes(self, dataframe, workflow_id, workflow_run_id, connection_qualified_name=None, connection_name=None, entity_sql_template_path=None)`

**Parameters**

| Parameter | Type |
|---|---|
| `dataframe` | `daft.DataFrame` |
| `workflow_id` | `str` |
| `workflow_run_id` | `str` |
| `connection_qualified_name` | `Optional[str]` |
| `connection_name` | `Optional[str]` |
| `entity_sql_template_path` | `Optional[str]` |

**Returns**

`Tuple[daft.DataFrame, str]` - Tuple of (DataFrame with default attributes, SQL template string)

### `get_grouped_dataframe_by_prefix`

`get_grouped_dataframe_by_prefix(self, dataframe)`

**Parameters**

| Parameter | Type |
|---|---|
| `dataframe` | `daft.DataFrame` |

**Returns**

`daft.DataFrame` - DataFrame with columns grouped into nested structs

## Usage Examples
### Basic transformation

Initialize the transformer and transform table metadata using the default YAML templates:

```python
from application_sdk.transformers.query import QueryBasedTransformer

# Initialize transformer
transformer = QueryBasedTransformer(
    connector_name="postgresql-connector",
    tenant_id="tenant-123"
)

# Transform metadata
transformed_df = transformer.transform_metadata(
    typename="TABLE",
    dataframe=raw_table_df,
    workflow_id="extract-tables",
    workflow_run_id="run-001",
    connection_qualified_name="tenant/postgresql/1",
    connection_name="production"
)
```
### Processing multiple entity types

Transform different entity types in sequence:

```python
# Transform different entity types
databases_df = transformer.transform_metadata(
    typename="DATABASE",
    dataframe=raw_databases_df,
    workflow_id="workflow-123",
    workflow_run_id="run-456",
    connection_qualified_name=connection_qn,
    connection_name=connection_name
)

tables_df = transformer.transform_metadata(
    typename="TABLE",
    dataframe=raw_tables_df,
    workflow_id="workflow-123",
    workflow_run_id="run-456",
    connection_qualified_name=connection_qn,
    connection_name=connection_name
)

columns_df = transformer.transform_metadata(
    typename="COLUMN",
    dataframe=raw_columns_df,
    workflow_id="workflow-123",
    workflow_run_id="run-456",
    connection_qualified_name=connection_qn,
    connection_name=connection_name
)
```
### Using custom templates

Use custom YAML templates by providing custom template path mappings:

```python
from application_sdk.transformers.query import QueryBasedTransformer
from application_sdk.transformers.common.utils import get_yaml_query_template_path_mappings

# Get custom template mappings
custom_mappings = get_yaml_query_template_path_mappings(
    custom_templates_path="./custom_templates",
    assets=["TABLE", "COLUMN"]
)

# Initialize transformer
transformer = QueryBasedTransformer(
    connector_name="my-connector",
    tenant_id="tenant-123"
)

# Use custom templates
transformed_df = transformer.transform_metadata(
    typename="TABLE",
    dataframe=raw_df,
    workflow_id="workflow-123",
    workflow_run_id="run-456",
    entity_class_definitions=custom_mappings,
    connection_qualified_name="tenant/connector/1"
)
```
## Default template mappings

Asset types are automatically mapped to YAML template files when using default templates:

| Asset Type | Template File | Location |
|---|---|---|
| TABLE | table.yaml | application_sdk/transformers/query/templates/ |
| COLUMN | column.yaml | application_sdk/transformers/query/templates/ |
| DATABASE | database.yaml | application_sdk/transformers/query/templates/ |
| SCHEMA | schema.yaml | application_sdk/transformers/query/templates/ |
| FUNCTION | function.yaml | application_sdk/transformers/query/templates/ |
| EXTRAS-PROCEDURE | extras-procedure.yaml | application_sdk/transformers/query/templates/ |
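The mapping follows a simple convention: lowercase the asset type and append `.yaml`. A minimal sketch of that lookup (`TEMPLATE_DIR` and `resolve_template_path` are illustrative names, not SDK API):

```python
from pathlib import Path

# Hypothetical base directory; the SDK ships its default templates
# under application_sdk/transformers/query/templates/.
TEMPLATE_DIR = Path("application_sdk/transformers/query/templates")

def resolve_template_path(typename: str) -> Path:
    """Map an asset type such as 'TABLE' or 'EXTRAS-PROCEDURE'
    to its YAML template file by lowercasing the name."""
    return TEMPLATE_DIR / f"{typename.lower()}.yaml"
```

For example, `resolve_template_path("EXTRAS-PROCEDURE")` resolves to `extras-procedure.yaml` in the templates directory.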
## Template structure

YAML templates define column mappings using nested dictionaries with dot notation for column paths.

### Template format

```yaml
table: Table
columns:
  attributes:
    name:
      source_query: table_name
    qualifiedName:
      source_query: concat(connection_qualified_name, '/', table_catalog, '/', table_schema, '/', table_name)
      source_columns: [connection_qualified_name, table_catalog, table_schema, table_name]
  customAttributes:
    table_type:
      source_query: table_type
  typeName:
    source_query: |
      CASE
        WHEN table_type = 'VIEW' THEN 'View'
        ELSE 'Table'
      END
    source_columns: [table_type]
  status:
    source_query: "'ACTIVE'"
```
### Column definition fields

| Field | Type | Required | Description |
|---|---|---|---|
| source_query | str | Yes | SQL expression or literal value |
| source_columns | List[str] | No | Source columns to validate before generating the SQL query. If a listed column is missing, the column mapping is skipped with a warning. |
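The `source_columns` check behaves like a guard applied before each mapping is emitted. An illustrative sketch of that skip-with-warning behavior (`select_valid_mappings` is a hypothetical helper, not an SDK function):

```python
import logging

logger = logging.getLogger(__name__)

def select_valid_mappings(columns_in_df: set, definitions: dict) -> dict:
    """Keep only mappings whose declared source_columns all exist in
    the DataFrame; skipped mappings are logged as warnings."""
    valid = {}
    for target, definition in definitions.items():
        required = definition.get("source_columns", [])
        missing = [c for c in required if c not in columns_in_df]
        if missing:
            logger.warning("Skipping %s: missing columns %s", target, missing)
            continue
        valid[target] = definition
    return valid
```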
### Column expression types

| Type | Example | Notes |
|---|---|---|
| Direct column reference | `source_query: table_name` | Direct column mapping |
| SQL expression | `source_query: concat(a, '/', b)` | SQL function or expression |
| String literal | `source_query: "'ACTIVE'"` | Quoted string value |
| Boolean/Number literal | `source_query: true` | Unquoted boolean or number |
| Conditional expression | `source_query: \|` block scalar containing `CASE WHEN ... END` | Multi-line SQL CASE statement |
## Default attributes

The `prepare_template_and_attributes` method adds these default attributes to all transformations:

| Attribute | Source | Type |
|---|---|---|
| connection_qualified_name | `**kwargs` | str |
| connection_name | `**kwargs` | str |
| tenant_id | Initialization parameter | str |
| workflow_id | Method parameter | str |
| workflow_run_id | Method parameter | str |
| connector_name | Initialization parameter | str |

Default attributes are available in all SQL expressions within YAML templates.
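The default attribute set can be pictured as a plain dict merged from initializer state, method parameters, and `**kwargs`. An illustrative sketch (the helper name is hypothetical; the SDK injects these values into the DataFrame before running the template SQL):

```python
def build_default_attributes(connector_name: str, tenant_id: str,
                             workflow_id: str, workflow_run_id: str,
                             **kwargs) -> dict:
    """Combine initializer state, method parameters, and kwargs into
    the attribute dict available to every SQL expression."""
    return {
        "connector_name": connector_name,
        "tenant_id": tenant_id,
        "workflow_id": workflow_id,
        "workflow_run_id": workflow_run_id,
        # These two arrive through **kwargs on transform_metadata
        "connection_qualified_name": kwargs.get("connection_qualified_name"),
        "connection_name": kwargs.get("connection_name"),
    }
```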
## Nested structure creation

The `get_grouped_dataframe_by_prefix` method groups flat columns with dot notation into nested structs.

Input format (flat columns):

```text
attributes.name
attributes.qualifiedName
attributes.description
customAttributes.table_type
typeName
status
```

Output format (nested structs):

```json
{
  "attributes": {
    "name": "...",
    "qualifiedName": "...",
    "description": "..."
  },
  "customAttributes": {
    "table_type": "..."
  },
  "typeName": "...",
  "status": "..."
}
```

Column name handling: columns with dots are automatically quoted in SQL (for example, `attributes.name` becomes `"attributes.name"`).
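The grouping step operates on column names. In plain Python, the same dot-notation grouping over a single record looks roughly like this (the SDK does it with daft struct expressions over whole columns, not per-row dicts):

```python
def nest_by_prefix(flat: dict) -> dict:
    """Group flat dot-notation keys into nested dicts, e.g.
    'attributes.name' -> {'attributes': {'name': ...}}.
    Keys without a dot are kept at the top level."""
    nested: dict = {}
    for key, value in flat.items():
        if "." in key:
            prefix, leaf = key.split(".", 1)
            nested.setdefault(prefix, {})[leaf] = value
        else:
            nested[key] = value
    return nested
```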
## Error handling

| Error Type | Behavior |
|---|---|
| Template loading error | Raises exception, logs template path |
| Missing source column | Skips column mapping, logs warning |
| SQL execution error | Raises exception, logs SQL query and typename |
| Empty DataFrame | Returns `None` |
## Performance characteristics

| Aspect | Behavior |
|---|---|
| Evaluation | Lazy evaluation (daft DataFrames) |
| Column processing | Only processes columns that exist in the DataFrame |
| Struct building | Uses daft expressions (not row-by-row) |
| Template loading | On-demand per transformation (consider caching for high-volume scenarios) |
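If per-transformation template loading becomes a bottleneck, the file reads can be memoized by path with the standard library; one possible approach (illustrative, not built into the SDK):

```python
import functools

@functools.lru_cache(maxsize=None)
def load_template_cached(yaml_path: str) -> str:
    """Read a template file once per path; repeat calls for the same
    path hit the in-process cache. (In practice you would also parse
    the YAML here, since parsing cost dominates for small files.)"""
    with open(yaml_path, encoding="utf-8") as f:
        return f.read()
```

Note that caching by path assumes templates do not change while the process runs; call `load_template_cached.cache_clear()` if they can.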
## See also

- Transformers: overview of all transformers and the TransformerInterface
- Atlas transformer: convert metadata into Atlas entities using pyatlan library classes
- Application SDK README: overview of the Application SDK and its components