
SQLQueryInput reads data from SQL databases by executing SQL queries. It accepts either a SQLAlchemy engine or a connection string and handles async and sync engines automatically. It supports several database connectors: Redshift, sources supported by ConnectorX, and a SQLAlchemy fallback.
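One way to picture the automatic async/sync handling is a dispatch step that awaits async work directly and pushes blocking work onto a thread pool. The sketch below is a minimal, stdlib-only illustration under stated assumptions, not the SDK's implementation: `run_query`, `sync_fetch`, and `async_fetch` are hypothetical names, and plain callables stand in for SQLAlchemy engines.

```python
import asyncio
import inspect
from typing import Any, Callable


async def run_query(fetch: Callable[[], Any]) -> Any:
    """Hypothetical helper: await async fetchers, offload sync ones to a thread."""
    if inspect.iscoroutinefunction(fetch):
        # Async callable: await it on the event loop
        return await fetch()
    loop = asyncio.get_running_loop()
    # Sync callable: None selects the loop's default ThreadPoolExecutor
    return await loop.run_in_executor(None, fetch)


def sync_fetch() -> list:
    # Stand-in for a blocking driver call
    return [("alice", 31)]


async def async_fetch() -> list:
    # Stand-in for an async driver call
    await asyncio.sleep(0)
    return [("bob", 42)]


async def main() -> None:
    print(await run_query(sync_fetch))   # [('alice', 31)]
    print(await run_query(async_fetch))  # [('bob', 42)]


if __name__ == "__main__":
    asyncio.run(main())
```

Offloading the sync path keeps a blocking database driver from stalling the event loop while other coroutines run.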

SQLQueryInput inherits from the base Input class and provides specialized functionality for SQL database queries.

SQLQueryInput (class)
Module: application_sdk.inputs.sql_query
Inheritance chain: Input → SQLQueryInput
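The page states only that SQLQueryInput inherits from Input. Assuming the base class declares the reader methods as an abstract interface, the contract might look like the sketch below. `Input` here is a hypothetical stand-in (only the pandas-style pair of methods is shown), and `ListInput` is a toy subclass that reads from a list rather than a database.

```python
import asyncio
from abc import ABC, abstractmethod
from typing import Any, AsyncIterator, List


class Input(ABC):
    """Hypothetical stand-in for the SDK's base Input class (assumed interface)."""

    @abstractmethod
    async def get_dataframe(self) -> Any:
        """Return the full result set at once."""

    @abstractmethod
    def get_batched_dataframe(self) -> AsyncIterator[Any]:
        """Return the result set as an async iterator of batches."""


class ListInput(Input):
    """Toy subclass that reads from an in-memory list instead of a database."""

    def __init__(self, rows: List[Any], chunk_size: int = 2):
        self.rows = rows
        self.chunk_size = chunk_size

    async def get_dataframe(self) -> List[Any]:
        return list(self.rows)

    async def get_batched_dataframe(self) -> AsyncIterator[List[Any]]:
        # An async generator satisfies the AsyncIterator contract
        for start in range(0, len(self.rows), self.chunk_size):
            yield self.rows[start:start + self.chunk_size]


async def demo() -> None:
    source: Input = ListInput([1, 2, 3, 4, 5])
    print(await source.get_dataframe())                       # [1, 2, 3, 4, 5]
    print([b async for b in source.get_batched_dataframe()])  # [[1, 2], [3, 4], [5]]


if __name__ == "__main__":
    asyncio.run(demo())
```

Callers written against the base interface can then swap SQL, Parquet, or JSON sources without changing how they consume results.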


Methods (5)

__init__

__init__(self, query: str, engine: Union[Engine, str], chunk_size: Optional[int] = 5000)
Initializes SQLQueryInput with a SQL query and either a SQLAlchemy engine or a connection string.
Parameters
  • query (str, required): SQL query string to execute.
  • engine (Union[Engine, str], required): SQLAlchemy engine or connection string.
  • chunk_size (Optional[int], optional): Number of rows per batch. Defaults to 5000.
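Given that `chunk_size` is the number of rows per batch, a 12,000-row result with the default of 5000 would presumably arrive as batches of 5000, 5000, and 2000 rows. The sketch below shows that splitting with the standard DB-API `fetchmany()` call. It assumes an in-memory sqlite3 database in place of a SQLAlchemy engine, and `iter_chunks` is a hypothetical helper, not part of the SDK.

```python
import sqlite3
from typing import Iterator, List, Tuple


def iter_chunks(conn: sqlite3.Connection, query: str, chunk_size: int = 5000) -> Iterator[List[Tuple]]:
    """Hypothetical helper: yield query results in lists of at most chunk_size rows."""
    cursor = conn.execute(query)
    try:
        while True:
            rows = cursor.fetchmany(chunk_size)
            if not rows:  # fetchmany returns an empty list once results are exhausted
                break
            yield rows
    finally:
        cursor.close()


# In-memory table with 12,000 rows to show how chunk_size splits a result set
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE users (id INTEGER, age INTEGER)")
conn.executemany("INSERT INTO users VALUES (?, ?)", [(i, 20 + i % 50) for i in range(12000)])

sizes = [len(batch) for batch in iter_chunks(conn, "SELECT * FROM users", chunk_size=5000)]
print(sizes)  # [5000, 5000, 2000]
```

Only one batch is held in memory at a time, which is the point of choosing a smaller `chunk_size` for large tables.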

get_dataframe

async get_dataframe(self) -> pd.DataFrame
Returns all query results as a single pandas DataFrame. Raises ValueError if engine is a connection string rather than a SQLAlchemy engine, and raises an exception if query execution fails.
Returns
pd.DataFrame - Complete query results
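Because get_dataframe() rejects connection strings, callers that start from a URL need to build an engine first (for example with SQLAlchemy's `create_engine`). The sketch below mimics the documented contract with the standard library only: `get_all_rows` is a hypothetical analogue that raises ValueError for a string and otherwise returns every row in one list, with a sqlite3 connection standing in for a SQLAlchemy engine.

```python
import asyncio
import sqlite3
from typing import List, Tuple, Union


async def get_all_rows(query: str, engine: Union[sqlite3.Connection, str]) -> List[Tuple]:
    """Hypothetical analogue of get_dataframe(): rejects strings, returns all rows at once."""
    if isinstance(engine, str):
        # Mirrors the documented ValueError for connection-string engines
        raise ValueError("a connection object is required, not a connection string")
    # Run the blocking query in a worker thread so the event loop stays free
    return await asyncio.to_thread(lambda: engine.execute(query).fetchall())


async def demo() -> None:
    # check_same_thread=False lets the worker thread use this connection
    conn = sqlite3.connect(":memory:", check_same_thread=False)
    conn.execute("CREATE TABLE users (name TEXT, age INTEGER)")
    conn.executemany("INSERT INTO users VALUES (?, ?)", [("ana", 30), ("ben", 22)])
    print(await get_all_rows("SELECT name FROM users WHERE age > 25", conn))  # [('ana',)]
    try:
        await get_all_rows("SELECT 1", "sqlite:///:memory:")
    except ValueError as exc:
        print(f"ValueError: {exc}")
    conn.close()


if __name__ == "__main__":
    asyncio.run(demo())
```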

get_batched_dataframe

async get_batched_dataframe(self) -> AsyncIterator[pd.DataFrame]
Returns query results as an async iterator of pandas DataFrames, each holding at most chunk_size rows. With an async engine, the query runs in an async session; with a sync engine, it runs in a thread pool executor.
Returns
AsyncIterator[pd.DataFrame] - Iterator yielding batches of results
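For sync engines, batches come from a thread pool executor. A minimal sketch of that pattern, assuming a sqlite3 connection in place of a synchronous SQLAlchemy engine: `batched_rows` (hypothetical) runs each blocking `execute`/`fetchmany` call through `loop.run_in_executor` and yields the rows as an async iterator. `check_same_thread=False` is needed because sqlite3 otherwise refuses a connection used from another thread.

```python
import asyncio
import sqlite3
from concurrent.futures import ThreadPoolExecutor
from typing import AsyncIterator, List, Optional, Tuple


async def batched_rows(
    conn: sqlite3.Connection,
    query: str,
    chunk_size: int = 5000,
    executor: Optional[ThreadPoolExecutor] = None,
) -> AsyncIterator[List[Tuple]]:
    """Hypothetical helper: yield batches from a blocking cursor without blocking the loop."""
    loop = asyncio.get_running_loop()
    # Blocking calls run in the executor; the event loop only awaits their results
    cursor = await loop.run_in_executor(executor, conn.execute, query)
    try:
        while True:
            rows = await loop.run_in_executor(executor, cursor.fetchmany, chunk_size)
            if not rows:
                break
            yield rows
    finally:
        cursor.close()


async def main() -> List[int]:
    conn = sqlite3.connect(":memory:", check_same_thread=False)
    conn.execute("CREATE TABLE users (id INTEGER)")
    conn.executemany("INSERT INTO users VALUES (?)", [(i,) for i in range(7)])
    with ThreadPoolExecutor(max_workers=1) as pool:
        sizes = [len(b) async for b in batched_rows(conn, "SELECT id FROM users", 3, pool)]
    conn.close()
    return sizes


if __name__ == "__main__":
    print(asyncio.run(main()))  # [3, 3, 1]
```

A single-worker executor keeps all calls for one cursor on the same thread and runs them strictly in order.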

get_daft_dataframe

async get_daft_dataframe(self) -> daft.DataFrame
Returns all query results as a single daft DataFrame. When engine is a connection string for a connector that ConnectorX supports, the query is read through ConnectorX.
Returns
daft.DataFrame - Query results as daft DataFrame
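The page does not say how SQLQueryInput picks a reader; it only lists Redshift, ConnectorX-supported sources, and a SQLAlchemy fallback. The hypothetical `choose_reader` below sketches one plausible dispatch on the connection string's URL scheme. The `CONNECTORX_SCHEMES` set is an assumption, so check the ConnectorX documentation for the sources your version supports.

```python
from typing import Any
from urllib.parse import urlparse

# Assumed set of URL schemes routed to ConnectorX (verify against ConnectorX docs)
CONNECTORX_SCHEMES = {"postgresql", "postgres", "mysql", "sqlite", "mssql", "oracle"}


def choose_reader(engine: Any) -> str:
    """Hypothetical dispatch: pick a reading strategy from the engine argument."""
    if not isinstance(engine, str):
        # An engine object rather than a string: use SQLAlchemy
        return "sqlalchemy"
    # Drop any driver suffix, e.g. "postgresql+psycopg2" -> "postgresql"
    scheme = urlparse(engine).scheme.split("+")[0].lower()
    if scheme == "redshift":
        return "redshift"
    if scheme in CONNECTORX_SCHEMES:
        return "connectorx"
    return "sqlalchemy"


print(choose_reader("postgresql://user:pass@localhost/db"))  # connectorx
print(choose_reader("redshift://user:pass@host:5439/db"))    # redshift
print(choose_reader("snowflake://user:pass@account/db"))     # sqlalchemy
```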

get_batched_daft_dataframe

async get_batched_daft_dataframe(self) -> AsyncIterator[daft.DataFrame]
Returns query results as an async iterator of daft DataFrames. Because daft does not support native batched reading, batches are read with pandas internally and then converted to daft.
Returns
AsyncIterator[daft.DataFrame] - Iterator yielding batches as daft DataFrames
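The pandas-then-daft approach amounts to wrapping one async iterator and converting each batch as it passes through. In the sketch below, `convert_batches` is a hypothetical generic adapter; `fake_pandas_batches` and `rows_to_columns` are stand-ins (lists of dicts rather than pandas DataFrames) so the example runs with the standard library alone. In the real class the converter would presumably be something like daft's `from_pandas`, but that is an assumption.

```python
import asyncio
from typing import AsyncIterator, Callable, List, TypeVar

T = TypeVar("T")
U = TypeVar("U")


async def convert_batches(batches: AsyncIterator[T], convert: Callable[[T], U]) -> AsyncIterator[U]:
    """Re-yield each batch from an async iterator after converting it."""
    async for batch in batches:
        yield convert(batch)


async def fake_pandas_batches() -> AsyncIterator[List[dict]]:
    # Stand-in for the pandas batch reader: two batches of row records
    yield [{"id": 1, "age": 30}, {"id": 2, "age": 41}]
    yield [{"id": 3, "age": 27}]


def rows_to_columns(rows: List[dict]) -> dict:
    # Stand-in converter: turn row records into a column mapping
    return {key: [row[key] for row in rows] for key in rows[0]}


async def main() -> list:
    return [b async for b in convert_batches(fake_pandas_batches(), rows_to_columns)]


if __name__ == "__main__":
    print(asyncio.run(main()))
```

Converting lazily, one batch at a time, preserves the memory benefit of batching instead of materializing the whole result before conversion.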

Usage Examples

Using a SQLAlchemy engine

Initialize with a SQLAlchemy engine and load the full result set with get_dataframe():

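```python
import asyncio

from application_sdk.inputs import SQLQueryInput
from sqlalchemy import create_engine


async def main():
    # get_dataframe() requires a SQLAlchemy engine, not a connection string
    engine = create_engine("postgresql://user:pass@localhost/db")
    sql_input = SQLQueryInput(
        query="SELECT * FROM users WHERE age > 25",
        engine=engine,
        chunk_size=5000,
    )
    df = await sql_input.get_dataframe()
    print(df.head())


asyncio.run(main())
```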

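Using a connection string

Pass a connection string to read into daft; get_daft_dataframe() then uses ConnectorX for supported sources. Note that get_dataframe() does not accept a connection string and raises ValueError.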

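```python
import asyncio
from application_sdk.inputs import SQLQueryInput

async def main():
    # A connection string enables ConnectorX for supported sources
    sql_input = SQLQueryInput(
        query="SELECT * FROM users",
        engine="postgresql://user:pass@localhost/db",
        chunk_size=5000,
    )
    daft_df = await sql_input.get_daft_dataframe()

asyncio.run(main())
```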

See also

  • Inputs: Overview of all input classes and common usage patterns
  • ParquetInput: Read data from Parquet files, supporting single files and directories
  • JsonInput: Read data from JSON files in JSONL format
  • Application SDK README: Overview of the Application SDK and its components