Query extraction workflows orchestrate the extraction of SQL queries from database query logs and history. They handle batch processing of queries, manage time-range-based extraction, and process query metadata for analysis.
SQLQueryExtractionWorkflow
Class: application_sdk.workflows.query_extraction.sql.SQLQueryExtractionWorkflow

Orchestrates the extraction of SQL queries from database query logs. Processes queries in batches, handles time-range-based extraction, and manages the workflow state for query mining operations. Generates batches of queries to extract based on time ranges and processes them in parallel.
Properties
| Property | Type | Default |
|---|---|---|
| activities_cls | Type[SQLQueryExtractionActivities] | SQLQueryExtractionActivities |
| fetch_queries_sql | str | "" |
| sql_client | BaseSQLClient \| None | None |
| application_name | str | - |
| batch_size | int | 100000 |
| default_heartbeat_timeout | timedelta | timedelta(seconds=300) |

Methods
get_activities
@staticmethod get_activities(activities: SQLQueryExtractionActivities) -> Sequence[Callable[..., Any]]

Parameters
- activities: SQLQueryExtractionActivities

Returns
- Sequence[Callable[..., Any]] - List of activity methods to be executed

run

@workflow.run async run(self, workflow_config: Dict[str, Any]) -> None

Parameters
- workflow_config: Dict[str, Any]

Returns
- None - No return value

Usage Examples
Basic workflow usage
The workflow is automatically used when registered with the application.
from application_sdk.workflows.query_extraction.sql import SQLQueryExtractionWorkflow
from application_sdk.activities.query_extraction.sql import SQLQueryExtractionActivities
app = BaseSQLMetadataExtractionApplication(
name="snowflake-miner",
client_class=SQLClient,
handler_class=BaseSQLHandler
)
await app.setup_workflow(
workflow_and_activities_classes=[
(SQLQueryExtractionWorkflow, SQLQueryExtractionActivities)
]
)
Custom workflow with query configuration
Customize the SQL query and batch size for specific requirements
from datetime import timedelta

from application_sdk.workflows.query_extraction.sql import SQLQueryExtractionWorkflow
class CustomQueryWorkflow(SQLQueryExtractionWorkflow):
fetch_queries_sql = """
SELECT
query_text,
start_time,
end_time,
user_name
FROM query_history
WHERE start_time >= ? AND start_time < ?
ORDER BY start_time
"""
batch_size = 50000 # Smaller batches for faster processing
default_heartbeat_timeout = timedelta(seconds=600) # Longer timeout for large queries
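Registering the customized workflow follows the same pattern as the basic example above; a minimal sketch, assuming app is the BaseSQLMetadataExtractionApplication instance constructed earlier:

```python
from application_sdk.activities.query_extraction.sql import SQLQueryExtractionActivities

# Register the custom workflow class in place of the default one.
await app.setup_workflow(
    workflow_and_activities_classes=[
        (CustomQueryWorkflow, SQLQueryExtractionActivities)
    ]
)
```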
Workflow configuration
Workflow configuration is stored in the StateStore and retrieved using the workflow_id; a sketch of starting the workflow with only this identifier follows the examples below.
Input (workflow_config):
{
"workflow_id": "workflow-uuid"
}
Retrieved from StateStore (workflow_args):
{
"credentials": {...},
"connection": {
"connection_name": "production",
"connection_qualified_name": "tenant/snowflake/1"
},
"metadata": {
"start_time": "2024-01-01T00:00:00Z",
"end_time": "2024-01-02T00:00:00Z",
"batch_size": 100000
},
"output_path": "/tmp/queries",
"output_prefix": "query-logs"
}
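The application normally starts workflow executions for you. Purely as an illustration of the contract above, here is a hedged sketch that starts the workflow with only the workflow_id using the Temporal Python client directly; the endpoint, execution id, and task queue name are assumptions:

```python
from temporalio.client import Client

from application_sdk.workflows.query_extraction.sql import SQLQueryExtractionWorkflow

async def start_query_extraction() -> None:
    client = await Client.connect("localhost:7233")  # assumed Temporal endpoint
    await client.start_workflow(
        SQLQueryExtractionWorkflow.run,
        {"workflow_id": "workflow-uuid"},      # only the id; full config is resolved from the StateStore
        id="sql-query-extraction-run",         # hypothetical workflow execution id
        task_queue="atlan-query-extraction",   # hypothetical task queue name
    )
```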
Batch structure
Each query batch returned by the get_query_batches activity has the following structure (a sketch of generating such batches follows the field table):
{
"sql": "SELECT * FROM query_history WHERE start_time >= ? AND start_time < ?",
"start": "2024-01-01T00:00:00Z", # Start marker
"end": "2024-01-01T01:00:00Z" # End marker
}
| Field | Type | Description |
|---|---|---|
| sql | str | SQL query template with placeholders for time range |
| start | str | Start timestamp or identifier for the batch |
| end | str | End timestamp or identifier for the batch |
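To make the start and end markers concrete, the following is an illustrative sketch, not the SDK's actual get_query_batches implementation, that splits a time range into fixed windows with the shape shown above; the split_into_batches helper and the one-hour step are assumptions:

```python
from datetime import datetime, timedelta
from typing import Dict, List

def split_into_batches(
    sql: str, start: datetime, end: datetime, step: timedelta = timedelta(hours=1)
) -> List[Dict[str, str]]:
    """Split [start, end) into fixed-width windows shaped like the batches above."""
    batches: List[Dict[str, str]] = []
    cursor = start
    while cursor < end:
        window_end = min(cursor + step, end)
        batches.append({
            "sql": sql,
            "start": cursor.strftime("%Y-%m-%dT%H:%M:%SZ"),    # start marker
            "end": window_end.strftime("%Y-%m-%dT%H:%M:%SZ"),  # end marker
        })
        cursor = window_end
    return batches
```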
Activity specifications
The get_activities method returns activity methods from the SQLQueryExtractionActivities class: get_query_batches, fetch_queries, preflight_check, get_workflow_args.
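For context, this is roughly how that pairing is consumed when a Temporal worker is registered. The supported path is app.setup_workflow shown earlier; the sketch below uses the Temporal Python SDK directly, and the activities constructor arguments, endpoint, and task queue name are assumptions:

```python
from temporalio.client import Client
from temporalio.worker import Worker

from application_sdk.activities.query_extraction.sql import SQLQueryExtractionActivities
from application_sdk.workflows.query_extraction.sql import SQLQueryExtractionWorkflow

async def run_worker() -> None:
    client = await Client.connect("localhost:7233")  # assumed Temporal endpoint
    activities = SQLQueryExtractionActivities()      # constructor arguments omitted (assumption)
    worker = Worker(
        client,
        task_queue="query-extraction",               # hypothetical task queue name
        workflows=[SQLQueryExtractionWorkflow],
        activities=SQLQueryExtractionWorkflow.get_activities(activities),
    )
    await worker.run()
```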
SQLQueryExtractionActivities
Class: application_sdk.activities.query_extraction.sql.SQLQueryExtractionActivities

Activities class that implements the actual work for query extraction workflows. Contains methods for generating query batches, fetching queries from database logs, and managing query extraction operations.
Methods
get_query_batches
async get_query_batches(workflow_args: Dict[str, Any]) -> List[Dict[str, str]]

Parameters
- workflow_args: Dict[str, Any]

Returns
- List[Dict[str, str]] - List of batch configurations, each containing sql, start, and end fields (see Batch structure)

fetch_queries

async fetch_queries(activity_args: Dict[str, Any]) -> None

Parameters
- activity_args: Dict[str, Any]

Returns
- None - Queries are stored to output path, no return value

Retry policy
| Parameter | Value |
|---|---|
| Maximum attempts | 6 |
| Backoff type | Exponential |
| Backoff coefficient | 2 |
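Expressed with the Temporal Python SDK, the policy above corresponds roughly to the sketch below; the activity invocation, its argument, and the start-to-close timeout are assumptions, and the exponential backoff with coefficient 2 maps to RetryPolicy.backoff_coefficient:

```python
from datetime import timedelta

from temporalio import workflow
from temporalio.common import RetryPolicy

from application_sdk.activities.query_extraction.sql import SQLQueryExtractionActivities

RETRY_POLICY = RetryPolicy(
    maximum_attempts=6,       # Maximum attempts
    backoff_coefficient=2.0,  # Exponential backoff with coefficient 2
)

# Inside a workflow run method (sketch): attach the retry policy and the default
# heartbeat timeout to an activity invocation.
async def fetch_batch(batch: dict) -> None:
    await workflow.execute_activity_method(
        SQLQueryExtractionActivities.fetch_queries,
        batch,
        retry_policy=RETRY_POLICY,
        heartbeat_timeout=timedelta(seconds=300),   # default_heartbeat_timeout
        start_to_close_timeout=timedelta(hours=1),  # assumed value
    )
```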
Error handling
| Error Type | Behavior |
|---|---|
| Activity error | Retry per policy, log with batch information |
| Workflow error | Log with workflow ID, Temporal UI visibility |
| Batch-level error | Other batches continue processing |
| Query processing error | Log and skip invalid queries, continue batch processing |
Performance configuration
| Property | Default | Description |
|---|---|---|
| batch_size | 100000 | Size of each batch for processing |
| default_heartbeat_timeout | timedelta(seconds=300) | Heartbeat timeout for activities |
| fetch_queries_sql | "" | Custom SQL query (empty uses activity default) |
Execution characteristics
| Aspect | Behavior |
|---|---|
| Batch processing | All batches processed concurrently via asyncio.gather() |
| Time range | Batches generated to cover entire time range from workflow arguments |
| Marker tracking | Start/end markers used in output file naming for resumable extraction |
| Resource limits | Memory limited by batch size, concurrency limited by Temporal worker capacity |
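The concurrency and error-isolation behavior described above can be pictured with the following illustrative sketch (not the SDK's code); the fetch callable and the marker-based file-naming scheme are assumptions:

```python
import asyncio
from typing import Awaitable, Callable, Dict, List

async def process_batches(
    batches: List[Dict[str, str]],
    fetch: Callable[[Dict[str, str], str], Awaitable[None]],
) -> None:
    async def process(batch: Dict[str, str]) -> None:
        # Start/end markers feed into the output file name so extraction is resumable.
        output_file = f"queries_{batch['start']}_{batch['end']}.json"
        await fetch(batch, output_file)

    # All batches are dispatched concurrently; a failure in one batch does not
    # cancel the others because exceptions are returned rather than raised.
    results = await asyncio.gather(*(process(b) for b in batches), return_exceptions=True)
    for batch, result in zip(batches, results):
        if isinstance(result, BaseException):
            print(f"batch {batch['start']}..{batch['end']} failed: {result}")  # log with batch info
```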
Metrics
| Metric Category | Metrics |
|---|---|
| Query extraction | Batches processed, queries per batch, duration per batch, error counts |
| Workflow | Total execution time, batches processed, success/failure status, activity execution times |
See also
- Workflows overview: Overview of all workflows
- Metadata extraction workflows: Workflows for metadata extraction
- StateStore: Persistent state management for workflows
- Application SDK README: Overview of the Application SDK and its components