Query extraction workflows orchestrate the extraction of SQL queries from database query logs and history. They handle batch processing of queries, manage time-range based extraction, and process query metadata for analysis.

SQLQueryExtractionWorkflow

Class
Module: application_sdk.workflows.query_extraction.sql
Inheritance chain: QueryExtractionWorkflow

Orchestrates the extraction of SQL queries from database query logs. Processes queries in batches, handles time-range based extraction, and manages the workflow state for query mining operations. Generates batches of queries to extract based on time ranges and processes them in parallel.

Properties

| Property | Type | Default | Description |
| --- | --- | --- | --- |
| activities_cls | Type[SQLQueryExtractionActivities] | SQLQueryExtractionActivities | Activities class for query extraction operations (required) |
| fetch_queries_sql | str | "" | SQL query for fetching queries (can be customized) |
| sql_client | BaseSQLClient \| None | None | SQL client instance (optional) |
| application_name | str | from the APPLICATION_NAME constant | Name of the application |
| batch_size | int | 100000 | Size of each batch for processing |
| default_heartbeat_timeout | timedelta | timedelta(seconds=300) | Heartbeat timeout for activities |

Methods (2)

get_activities

static
@staticmethod get_activities(activities: SQLQueryExtractionActivities) -> Sequence[Callable[..., Any]]
Returns the sequence of activities executed by the workflow: get_query_batches, fetch_queries, preflight_check, and get_workflow_args.
Parameters
activities: SQLQueryExtractionActivities (required)
Instance of the activities class
Returns
Sequence[Callable[..., Any]]: list of activity methods to be executed
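
The pattern behind get_activities can be sketched with stand-in classes. StubActivities and StubWorkflow are hypothetical names used only for illustration; they are not part of the SDK, and the real method bodies are not shown on this page.

```python
from typing import Any, Callable, Dict, List, Sequence


class StubActivities:
    """Hypothetical stand-in for SQLQueryExtractionActivities."""

    async def get_query_batches(self, workflow_args: Dict[str, Any]) -> List[Dict[str, str]]:
        return []

    async def fetch_queries(self, activity_args: Dict[str, Any]) -> None:
        return None

    async def preflight_check(self, workflow_args: Dict[str, Any]) -> None:
        return None

    async def get_workflow_args(self, workflow_config: Dict[str, Any]) -> Dict[str, Any]:
        return {}


class StubWorkflow:
    """Hypothetical stand-in for SQLQueryExtractionWorkflow."""

    @staticmethod
    def get_activities(activities: StubActivities) -> Sequence[Callable[..., Any]]:
        # Return bound methods so the worker can register them as activities.
        return [
            activities.get_query_batches,
            activities.fetch_queries,
            activities.preflight_check,
            activities.get_workflow_args,
        ]


names = [fn.__name__ for fn in StubWorkflow.get_activities(StubActivities())]
```

Because the method is static and receives the activities instance as an argument, a subclass can swap in a different activities class without changing how the list is built.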

run

async
@workflow.run async run(self, workflow_config: Dict[str, Any]) -> None
Orchestrates the query extraction process. Calls the base workflow's run() for the preflight check, retrieves the workflow arguments from StateStore, executes get_query_batches to generate the query batches, creates one fetch activity per batch, and runs all fetch activities concurrently with asyncio.gather(). Activities use a retry policy with a maximum of 6 attempts and a backoff coefficient of 2.
Parameters
workflow_config: Dict[str, Any] (required)
Configuration dictionary containing workflow_id
Returns
None
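
The fan-out step of run() can be sketched with plain asyncio. This is a minimal illustration under stated assumptions, not the SDK implementation: the two stub coroutines stand in for the real activities, run_sketch is a hypothetical name, and in the actual workflow each call would be dispatched through Temporal with the retry policy described above.

```python
import asyncio
from typing import Any, Dict, List, Tuple

FETCHED: List[Tuple[str, str]] = []  # records which batches the stub "fetched"


async def get_query_batches(workflow_args: Dict[str, Any]) -> List[Dict[str, str]]:
    # Stub: the real activity derives batches from the configured time range.
    return [
        {"sql": "SELECT 1", "start": "0", "end": "1"},
        {"sql": "SELECT 1", "start": "1", "end": "2"},
    ]


async def fetch_queries(activity_args: Dict[str, Any]) -> None:
    # Stub: the real activity runs the SQL and writes results to the output path.
    await asyncio.sleep(0)
    FETCHED.append((activity_args["start_marker"], activity_args["end_marker"]))


async def run_sketch(workflow_args: Dict[str, Any]) -> int:
    batches = await get_query_batches(workflow_args)
    # One fetch per batch: workflow arguments plus the batch's SQL and markers.
    tasks = [
        fetch_queries(
            {
                **workflow_args,
                "sql_query": batch["sql"],
                "start_marker": batch["start"],
                "end_marker": batch["end"],
            }
        )
        for batch in batches
    ]
    await asyncio.gather(*tasks)  # run all fetches concurrently
    return len(batches)


processed = asyncio.run(run_sketch({"output_path": "/tmp/queries"}))
```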

Usage Examples

Basic workflow usage

The workflow is used automatically once it is registered with the application.

from application_sdk.workflows.query_extraction.sql import SQLQueryExtractionWorkflow
from application_sdk.activities.query_extraction.sql import SQLQueryExtractionActivities

# BaseSQLMetadataExtractionApplication, SQLClient, and BaseSQLHandler must also
# be imported or defined; their import paths are not shown on this page.
app = BaseSQLMetadataExtractionApplication(
    name="snowflake-miner",
    client_class=SQLClient,
    handler_class=BaseSQLHandler,
)

# Call from within an async function.
await app.setup_workflow(
    workflow_and_activities_classes=[
        (SQLQueryExtractionWorkflow, SQLQueryExtractionActivities)
    ]
)

Custom workflow with query configuration

Customize the SQL query, batch size, and heartbeat timeout for specific requirements.

from datetime import timedelta

from application_sdk.workflows.query_extraction.sql import SQLQueryExtractionWorkflow

class CustomQueryWorkflow(SQLQueryExtractionWorkflow):
    # Query template; the two placeholders receive the batch's time range.
    fetch_queries_sql = """
        SELECT
            query_text,
            start_time,
            end_time,
            user_name
        FROM query_history
        WHERE start_time >= ? AND start_time < ?
        ORDER BY start_time
    """

    batch_size = 50000  # Smaller batches for faster processing
    default_heartbeat_timeout = timedelta(seconds=600)  # Longer timeout for large queries

Workflow configuration

Workflow configuration is stored in StateStore and retrieved using workflow_id.

Input (workflow_config):

{
  "workflow_id": "workflow-uuid"
}

Retrieved from StateStore (workflow_args):

{
  "credentials": {...},
  "connection": {
    "connection_name": "production",
    "connection_qualified_name": "tenant/snowflake/1"
  },
  "metadata": {
    "start_time": "2024-01-01T00:00:00Z",
    "end_time": "2024-01-02T00:00:00Z",
    "batch_size": 100000
  },
  "output_path": "/tmp/queries",
  "output_prefix": "query-logs"
}

Batch structure

Each query batch returned by get_query_batches activity:

{
  "sql": "SELECT * FROM query_history WHERE start_time >= ? AND start_time < ?",
  "start": "2024-01-01T00:00:00Z",  # Start marker
  "end": "2024-01-01T01:00:00Z"  # End marker
}

| Field | Type | Description |
| --- | --- | --- |
| sql | str | SQL query template with placeholders for the time range |
| start | str | Start timestamp or identifier for the batch |
| end | str | End timestamp or identifier for the batch |

Activity specifications

The get_activities method returns activity methods from the SQLQueryExtractionActivities class: get_query_batches, fetch_queries, preflight_check, get_workflow_args.

SQLQueryExtractionActivities

Class
Module: application_sdk.activities.query_extraction.sql

Activities class that implements the actual work for query extraction workflows. Contains methods for generating query batches, fetching queries from database logs, and managing query extraction operations.

Methods (2)

get_query_batches

async
async get_query_batches(workflow_args: Dict[str, Any]) -> List[Dict[str, str]]
Generates batches of queries to extract based on time ranges from workflow arguments. Creates batch configurations with SQL query templates and time range markers.
Parameters
workflow_args: Dict[str, Any] (required)
Workflow arguments dictionary containing time range and configuration
Returns
List[Dict[str, str]]: list of batch configurations, each containing sql, start, and end fields (see Batch structure)
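
One way to produce batches of this shape is to split the configured time range into fixed windows. The sketch below assumes hourly windows and ISO-8601 timestamps with a trailing Z; the actual activity may derive its markers differently (for example, from row counts tied to batch_size), and split_time_range is a hypothetical helper, not an SDK function.

```python
from datetime import datetime, timedelta
from typing import Dict, List

TIMESTAMP_FORMAT = "%Y-%m-%dT%H:%M:%SZ"


def split_time_range(
    sql: str, start_time: str, end_time: str, window: timedelta
) -> List[Dict[str, str]]:
    """Split [start_time, end_time) into consecutive windows of at most `window`."""
    cursor = datetime.strptime(start_time, TIMESTAMP_FORMAT)
    end = datetime.strptime(end_time, TIMESTAMP_FORMAT)
    batches: List[Dict[str, str]] = []
    while cursor < end:
        upper = min(cursor + window, end)  # clamp the last window to the range end
        batches.append(
            {
                "sql": sql,
                "start": cursor.strftime(TIMESTAMP_FORMAT),
                "end": upper.strftime(TIMESTAMP_FORMAT),
            }
        )
        cursor = upper
    return batches


batches = split_time_range(
    "SELECT * FROM query_history WHERE start_time >= ? AND start_time < ?",
    "2024-01-01T00:00:00Z",
    "2024-01-02T00:00:00Z",
    timedelta(hours=1),
)
```

Half-open windows (start inclusive, end exclusive) match the `>= ? AND < ?` predicate in the template, so no query falls into two batches.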

fetch_queries

async
async fetch_queries(activity_args: Dict[str, Any]) -> None
Executes the SQL query to fetch queries from the database logs, processes the results in chunks of batch_size, writes them to the output path tagged with the start and end markers, and records statistics.
Parameters
activity_args: Dict[str, Any] (required)
Activity arguments dictionary containing sql_query (str), start_marker (str), and end_marker (str), plus the workflow arguments
Returns
None: queries are written to the output path
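
Processing results in chunks of batch_size keeps memory use bounded by the chunk rather than by the full result set. A minimal sketch of that chunking step, assuming rows arrive as an iterable; chunked is a hypothetical helper and does not appear in the SDK:

```python
from itertools import islice
from typing import Iterable, Iterator, List, TypeVar

T = TypeVar("T")


def chunked(rows: Iterable[T], batch_size: int) -> Iterator[List[T]]:
    """Yield successive lists of at most batch_size rows."""
    if batch_size <= 0:
        raise ValueError("batch_size must be positive")
    iterator = iter(rows)
    while True:
        chunk = list(islice(iterator, batch_size))
        if not chunk:
            return
        yield chunk


chunks = list(chunked(range(5), 2))
```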

Retry policy

| Parameter | Value |
| --- | --- |
| Maximum attempts | 6 |
| Backoff type | Exponential |
| Backoff coefficient | 2 |
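
To see what this policy means in wait time, the sketch below computes the delay before each retry. Temporal counts the first execution as an attempt, so 6 attempts allow at most 5 retries. The 1-second initial interval is an assumption; this page does not state it.

```python
from typing import List


def retry_delays(
    max_attempts: int = 6,
    backoff_coefficient: float = 2.0,
    initial_interval_s: float = 1.0,  # assumed; not specified on this page
) -> List[float]:
    """Delay in seconds before each retry (attempts 2..max_attempts)."""
    return [
        initial_interval_s * backoff_coefficient**retry_index
        for retry_index in range(max_attempts - 1)
    ]


delays = retry_delays()
total_wait = sum(delays)
```

Under these assumptions a batch that fails every attempt waits 31 seconds in total across its retries before the activity error surfaces.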

Error handling

| Error type | Behavior |
| --- | --- |
| Activity error | Retry per policy, log with batch information |
| Workflow error | Log with workflow ID, visible in the Temporal UI |
| Batch-level error | Other batches continue processing |
| Query processing error | Log and skip invalid queries, continue batch processing |
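
One way to let other batches continue when a single batch fails is to collect exceptions instead of propagating the first one. The sketch below uses asyncio.gather(..., return_exceptions=True) for that; whether the SDK passes this flag is an assumption, and fetch and run_all are hypothetical names.

```python
import asyncio
from typing import List, Tuple


async def fetch(batch_id: int) -> int:
    # Stub activity: batch 1 fails, all others succeed.
    if batch_id == 1:
        raise RuntimeError(f"batch {batch_id} failed")
    return batch_id


async def run_all(batch_ids: List[int]) -> Tuple[List[int], List[Tuple[int, BaseException]]]:
    results = await asyncio.gather(
        *(fetch(batch_id) for batch_id in batch_ids),
        return_exceptions=True,  # failures are returned as values, not raised
    )
    succeeded = [r for r in results if not isinstance(r, BaseException)]
    failed = [
        (batch_id, r)
        for batch_id, r in zip(batch_ids, results)
        if isinstance(r, BaseException)
    ]
    return succeeded, failed


succeeded, failed = asyncio.run(run_all([0, 1, 2]))
```

Keeping the failed batch IDs alongside their exceptions makes it possible to log each failure with its batch information, as the table describes.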

Performance configuration

| Property | Default | Description |
| --- | --- | --- |
| batch_size | 100000 | Size of each batch for processing |
| default_heartbeat_timeout | timedelta(seconds=300) | Heartbeat timeout for activities |
| fetch_queries_sql | "" | Custom SQL query (empty uses the activity default) |

Execution characteristics

| Aspect | Behavior |
| --- | --- |
| Batch processing | All batches processed concurrently via asyncio.gather() |
| Time range | Batches generated to cover the entire time range from the workflow arguments |
| Marker tracking | Start/end markers used in output file naming for resumable extraction |
| Resource limits | Memory limited by batch size, concurrency limited by Temporal worker capacity |
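
Marker-based file naming makes extraction resumable because a batch whose output file already exists can be skipped on a rerun. The sketch below illustrates the idea only: the file name format, the .json extension, and both helper functions are hypothetical, since this page does not document the actual naming scheme.

```python
from pathlib import Path
from typing import Dict, List


def batch_output_path(output_path: str, output_prefix: str, start: str, end: str) -> Path:
    """Build a per-batch file path from its markers (hypothetical naming scheme)."""
    # Colons are replaced so the name is valid on all common file systems.
    safe_start = start.replace(":", "-")
    safe_end = end.replace(":", "-")
    return Path(output_path) / f"{output_prefix}_{safe_start}_{safe_end}.json"


def pending_batches(
    batches: List[Dict[str, str]], output_path: str, output_prefix: str
) -> List[Dict[str, str]]:
    """Return only the batches whose output file does not exist yet."""
    return [
        batch
        for batch in batches
        if not batch_output_path(
            output_path, output_prefix, batch["start"], batch["end"]
        ).exists()
    ]
```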

Metrics

| Metric category | Metrics |
| --- | --- |
| Query extraction | Batches processed, queries per batch, duration per batch, error counts |
| Workflow | Total execution time, batches processed, success/failure status, activity execution times |
