
QueryBasedTransformer transforms raw metadata DataFrames into structured entities using SQL queries defined in YAML templates. It executes SQL transformations on raw dataframes using the daft engine and automatically creates nested structures from flat dataframes with dot notation. This transformer is ideal when you need SQL-based transformations with flexible template definitions.

QueryBasedTransformer

Class
Module: `application_sdk.transformers.query`

Transforms metadata using SQL queries defined in YAML templates. For each asset type, the transformer loads the corresponding YAML template, executes the generated SQL query on the raw dataframe with daft's SQL engine, and then groups columns with dot notation into nested structs.

Methods (5)

__init__

`__init__(self, connector_name, tenant_id)`

Initialize the transformer with a connector name and tenant ID.

Parameters
- `connector_name` (str, required): Name of the connector
- `tenant_id` (str, required): ID of the tenant

transform_metadata

`transform_metadata(self, typename, dataframe, workflow_id, workflow_run_id, entity_class_definitions=None, **kwargs)`

Transforms metadata using SQL queries from YAML templates. The method loads the YAML template for the given typename, prepares default attributes and the SQL template, generates a SQL query from the template, executes it on the DataFrame with the daft engine, and groups columns with dot notation into nested structs. Returns None if the input DataFrame is empty.

Parameters
- `typename` (str, required): Type identifier (e.g., 'DATABASE', 'SCHEMA', 'TABLE', 'COLUMN')
- `dataframe` (daft.DataFrame, required): Raw metadata as a daft DataFrame
- `workflow_id` (str, required): Workflow identifier
- `workflow_run_id` (str, required): Workflow run identifier
- `entity_class_definitions` (Optional[Dict[str, str]], optional): Custom template path mappings
- `**kwargs` (dict, optional): Additional keyword arguments, including `connection_qualified_name` (str) and `connection_name` (str)

Returns
Optional[daft.DataFrame] - Transformed DataFrame with nested structures, or None if the input is empty

generate_sql_query

`generate_sql_query(self, yaml_path, dataframe, default_attributes)`

Generates a SQL query from a YAML template and a DataFrame. The method loads the YAML template from file, flattens the nested columns dictionary, generates SQL column expressions, and builds a SELECT statement from those expressions.

Parameters
- `yaml_path` (str, required): Path to the YAML template file
- `dataframe` (daft.DataFrame, required): DataFrame to reference for column names
- `default_attributes` (Dict[str, Any], required): Default attributes to add to the SQL query

Returns
Tuple[str, List[str]] - Tuple of (SQL query string, list of literal columns)
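The template-flattening step can be sketched in plain Python. This is an illustrative approximation, not the SDK's implementation; `flatten_columns` is a hypothetical helper that treats any dict carrying a `source_query` key as a leaf:

```python
from typing import Any, Dict

def flatten_columns(columns: Dict[str, Any], prefix: str = "") -> Dict[str, Dict[str, Any]]:
    """Flatten a nested template 'columns' dict into dot-notation paths.

    A leaf is any dict that carries a 'source_query' key; every other
    dict is treated as a nesting level (e.g. 'attributes').
    """
    flat: Dict[str, Dict[str, Any]] = {}
    for key, value in columns.items():
        path = f"{prefix}.{key}" if prefix else key
        if isinstance(value, dict) and "source_query" in value:
            flat[path] = value
        elif isinstance(value, dict):
            flat.update(flatten_columns(value, path))
    return flat

columns = {
    "attributes": {
        "name": {"source_query": "table_name"},
        "qualifiedName": {"source_query": "concat(a, '/', b)"},
    },
    "typeName": {"source_query": "'Table'"},
}
print(sorted(flatten_columns(columns)))
# ['attributes.name', 'attributes.qualifiedName', 'typeName']
```

Each flattened path then becomes one column expression in the generated SELECT statement.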

prepare_template_and_attributes

`prepare_template_and_attributes(self, dataframe, workflow_id, workflow_run_id, connection_qualified_name=None, connection_name=None, entity_sql_template_path=None)`

Prepares the SQL template and default attributes for transformation. Adds default attributes including connection_qualified_name, connection_name, tenant_id, workflow tracking fields, and connector_name.

Parameters
- `dataframe` (daft.DataFrame, required): Input DataFrame
- `workflow_id` (str, required): Workflow identifier
- `workflow_run_id` (str, required): Workflow run identifier
- `connection_qualified_name` (Optional[str], optional): Connection qualified name
- `connection_name` (Optional[str], optional): Connection name
- `entity_sql_template_path` (Optional[str], optional): Path to the SQL template

Returns
Tuple[daft.DataFrame, str] - Tuple of (DataFrame with default attributes, SQL template string)

get_grouped_dataframe_by_prefix

`get_grouped_dataframe_by_prefix(self, dataframe)`

Groups columns with dot notation into nested structs, transforming flat columns such as 'attributes.name' into a nested structure in which 'name' is grouped under an 'attributes' struct.

Parameters
- `dataframe` (daft.DataFrame, required): Flat DataFrame with dot-notation columns

Returns
daft.DataFrame - DataFrame with columns grouped into nested structs

Usage Examples

Basic transformation

Initialize transformer and transform metadata for tables using default YAML templates

```python
from application_sdk.transformers.query import QueryBasedTransformer

# Initialize transformer
transformer = QueryBasedTransformer(
    connector_name="postgresql-connector",
    tenant_id="tenant-123",
)

# Transform metadata
transformed_df = transformer.transform_metadata(
    typename="TABLE",
    dataframe=raw_table_df,
    workflow_id="extract-tables",
    workflow_run_id="run-001",
    connection_qualified_name="tenant/postgresql/1",
    connection_name="production",
)
```

Processing multiple entity types

Transform different entity types in sequence

```python
# Transform different entity types
databases_df = transformer.transform_metadata(
    typename="DATABASE",
    dataframe=raw_databases_df,
    workflow_id="workflow-123",
    workflow_run_id="run-456",
    connection_qualified_name=connection_qn,
    connection_name=connection_name,
)

tables_df = transformer.transform_metadata(
    typename="TABLE",
    dataframe=raw_tables_df,
    workflow_id="workflow-123",
    workflow_run_id="run-456",
    connection_qualified_name=connection_qn,
    connection_name=connection_name,
)

columns_df = transformer.transform_metadata(
    typename="COLUMN",
    dataframe=raw_columns_df,
    workflow_id="workflow-123",
    workflow_run_id="run-456",
    connection_qualified_name=connection_qn,
    connection_name=connection_name,
)
```

Using custom templates

Use custom YAML templates by providing custom template path mappings

```python
from application_sdk.transformers.query import QueryBasedTransformer
from application_sdk.transformers.common.utils import get_yaml_query_template_path_mappings

# Get custom template mappings
custom_mappings = get_yaml_query_template_path_mappings(
    custom_templates_path="./custom_templates",
    assets=["TABLE", "COLUMN"],
)

# Initialize transformer
transformer = QueryBasedTransformer(
    connector_name="my-connector",
    tenant_id="tenant-123",
)

# Use custom templates
transformed_df = transformer.transform_metadata(
    typename="TABLE",
    dataframe=raw_df,
    workflow_id="workflow-123",
    workflow_run_id="run-456",
    entity_class_definitions=custom_mappings,
    connection_qualified_name="tenant/connector/1",
)
```

Default template mappings

Asset types are automatically mapped to YAML template files when using default templates:

| Asset Type | Template File | Location |
|---|---|---|
| TABLE | table.yaml | application_sdk/transformers/query/templates/ |
| COLUMN | column.yaml | application_sdk/transformers/query/templates/ |
| DATABASE | database.yaml | application_sdk/transformers/query/templates/ |
| SCHEMA | schema.yaml | application_sdk/transformers/query/templates/ |
| FUNCTION | function.yaml | application_sdk/transformers/query/templates/ |
| EXTRAS-PROCEDURE | extras-procedure.yaml | application_sdk/transformers/query/templates/ |

Template structure

YAML templates define column mappings using nested dictionaries with dot notation for column paths.

Template format

```yaml
table: Table
columns:
  attributes:
    name:
      source_query: table_name
    qualifiedName:
      source_query: concat(connection_qualified_name, '/', table_catalog, '/', table_schema, '/', table_name)
      source_columns: [connection_qualified_name, table_catalog, table_schema, table_name]
  customAttributes:
    table_type:
      source_query: table_type
  typeName:
    source_query: |
      CASE
        WHEN table_type = 'VIEW' THEN 'View'
        ELSE 'Table'
      END
    source_columns: [table_type]
  status:
    source_query: "'ACTIVE'"
```

Column definition fields

| Field | Type | Required | Description |
|---|---|---|---|
| source_query | str | Yes | SQL expression or literal value |
| source_columns | List[str] | No | Source columns to validate before generating the SQL query; if a listed column is missing, the column mapping is skipped with a warning |

Column expression types

| Type | Example | Notes |
|---|---|---|
| Direct column reference | `source_query: table_name` | Direct column mapping |
| SQL expression | `source_query: concat(a, '/', b)` | SQL function or expression |
| String literal | `source_query: "'ACTIVE'"` | Quoted string value |
| Boolean/Number literal | `source_query: true` | Unquoted boolean or number |
| Conditional expression | `source_query:` with a YAML block scalar (`CASE WHEN ... END`) | Multi-line SQL CASE statement |
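To illustrate how these expression types might assemble into the generated SELECT statement, here is a simplified sketch, not the SDK's actual query generator. `render_select`, `mappings`, and the `df` table name are all hypothetical; note how dotted target paths are double-quoted as aliases:

```python
def render_select(mappings: dict) -> str:
    """Render {target_path: source_query} mappings into a SELECT statement.

    Target paths containing dots are double-quoted so the dot survives as a
    plain column alias rather than being parsed as a table qualifier.
    """
    parts = []
    for target, expr in mappings.items():
        alias = f'"{target}"' if "." in target else target
        parts.append(f"{expr} AS {alias}")
    return "SELECT " + ", ".join(parts) + " FROM df"

mappings = {
    "attributes.name": "table_name",  # direct column reference
    "status": "'ACTIVE'",             # string literal
    "typeName": "CASE WHEN table_type = 'VIEW' THEN 'View' ELSE 'Table' END",
}
print(render_select(mappings))
```

The resulting query keeps one aliased expression per template mapping, which is what later allows the dotted aliases to be regrouped into structs.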

Default attributes

The prepare_template_and_attributes method adds these default attributes to all transformations:

| Attribute | Source | Type |
|---|---|---|
| connection_qualified_name | **kwargs | str |
| connection_name | **kwargs | str |
| tenant_id | Initialization parameter | str |
| workflow_id | Method parameter | str |
| workflow_run_id | Method parameter | str |
| connector_name | Initialization parameter | str |

Default attributes are available in all SQL expressions within YAML templates.

Nested structure creation

The get_grouped_dataframe_by_prefix method groups flat columns with dot notation into nested structs.

Input format (flat columns):

```
attributes.name
attributes.qualifiedName
attributes.description
customAttributes.table_type
typeName
status
```

Output format (nested structs):

```json
{
  "attributes": {
    "name": "...",
    "qualifiedName": "...",
    "description": "..."
  },
  "customAttributes": {
    "table_type": "..."
  },
  "typeName": "...",
  "status": "..."
}
```
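The grouping behavior can be illustrated with plain Python dicts. The SDK performs the equivalent with daft struct expressions, not row-by-row Python; `group_by_prefix` here is a hypothetical one-level sketch:

```python
def group_by_prefix(row: dict) -> dict:
    """Group flat dot-notation keys into nested dicts (one level deep,
    mirroring the attributes/customAttributes grouping shown above)."""
    nested: dict = {}
    for key, value in row.items():
        if "." in key:
            prefix, leaf = key.split(".", 1)
            nested.setdefault(prefix, {})[leaf] = value
        else:
            nested[key] = value  # undotted columns stay at the top level
    return nested

flat = {
    "attributes.name": "orders",
    "attributes.qualifiedName": "tenant/pg/1/db/public/orders",
    "customAttributes.table_type": "BASE TABLE",
    "typeName": "Table",
}
print(group_by_prefix(flat))
```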

Column name handling: Columns with dots are automatically quoted in SQL (for example, attributes.name becomes "attributes.name").

Error handling

| Error Type | Behavior |
|---|---|
| Template loading error | Raises exception, logs template path |
| Missing source column | Skips column mapping, logs warning |
| SQL execution error | Raises exception, logs SQL query and typename |
| Empty DataFrame | Returns None |

Performance characteristics

| Aspect | Behavior |
|---|---|
| Evaluation | Lazy evaluation (daft DataFrames) |
| Column processing | Only processes columns that exist in the DataFrame |
| Struct building | Uses daft expressions (not row-by-row) |
| Template loading | On-demand per transformation (consider caching for high-volume scenarios) |
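Because templates are loaded on demand per transformation, high-volume pipelines might wrap the template loader in a small cache. A hypothetical sketch (not part of the SDK) using `functools.lru_cache`:

```python
import functools
import os
import tempfile

# Hypothetical cached loader; the SDK itself re-reads templates per call.
@functools.lru_cache(maxsize=32)
def load_template(yaml_path: str) -> str:
    """Read the raw template text once per path; later calls hit the cache."""
    with open(yaml_path, "r", encoding="utf-8") as f:
        return f.read()

# Demo: the second call is served from the cache, so the file is read once.
with tempfile.NamedTemporaryFile("w", suffix=".yaml", delete=False) as f:
    f.write("table: Table\n")
    path = f.name
first = load_template(path)
os.remove(path)               # file is gone...
second = load_template(path)  # ...but the cached text is still returned
print(first == second)  # True
```

A path-keyed cache like this assumes templates do not change while the process runs; invalidate or size-limit it if templates are edited in place.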
