SQLQueryInput reads data from SQL databases by executing SQL queries. It accepts either a SQLAlchemy engine or a connection string and handles async and sync operations automatically. It supports several database connectors, including Redshift, sources supported by ConnectorX, and a SQLAlchemy fallback.
SQLQueryInput inherits from the base Input class and provides specialized functionality for SQL database queries.
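This page does not spell out how the "automatic handling of async and sync operations" works. One common pattern, shown here as an assumption and not as the SDK's implementation, is to run a blocking DB-API query in a worker thread with `asyncio.to_thread` so the calling coroutine does not block the event loop. The sketch uses the standard-library `sqlite3` driver, and `run_query` is a hypothetical helper.

```python
import asyncio
import sqlite3


def _fetch_all(db_path: str, query: str) -> list[tuple]:
    """Blocking helper: open a connection, run the query, return all rows."""
    conn = sqlite3.connect(db_path)
    try:
        return conn.execute(query).fetchall()
    finally:
        conn.close()


async def run_query(db_path: str, query: str) -> list[tuple]:
    """Hypothetical async wrapper: offload the blocking call to a worker thread."""
    return await asyncio.to_thread(_fetch_all, db_path, query)


rows = asyncio.run(run_query(":memory:", "SELECT 1 UNION ALL SELECT 2 ORDER BY 1"))
print(rows)  # [(1,), (2,)]
```

Because the connection is opened and used inside the same worker thread, this also stays within `sqlite3`'s default same-thread check.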
SQLQueryInput
Class in application_sdk.inputs.sql_query; base class: Input.
Methods (5)

__init__
__init__(self, query: str, engine: Union[Engine, str], chunk_size: Optional[int] = 5000)
Parameters
- query (str): SQL query to execute.
- engine (Union[Engine, str]): SQLAlchemy engine or database connection string.
- chunk_size (Optional[int]): Batch size used when reading results in chunks. Defaults to 5000.

get_dataframe
async get_dataframe(self) -> pd.DataFrame
Returns
- pd.DataFrame: Complete query results.

get_batched_dataframe
async get_batched_dataframe(self) -> AsyncIterator[pd.DataFrame]
Returns
- AsyncIterator[pd.DataFrame]: Iterator yielding batches of results.

get_daft_dataframe
async get_daft_dataframe(self) -> daft.DataFrame
Returns
- daft.DataFrame: Query results as a daft DataFrame.

get_batched_daft_dataframe
async get_batched_daft_dataframe(self) -> AsyncIterator[daft.DataFrame]
Returns
- AsyncIterator[daft.DataFrame]: Iterator yielding batches as daft DataFrames.

Usage Examples
Using a SQLAlchemy engine
Initialize with a SQLAlchemy engine and load the complete result set with get_dataframe.
```python
import asyncio

from application_sdk.inputs import SQLQueryInput
from sqlalchemy import create_engine

engine = create_engine("postgresql://user:pass@localhost/db")

async def main():
    sql_input = SQLQueryInput(
        query="SELECT * FROM users WHERE age > 25",
        engine=engine,
        chunk_size=5000,
    )
    # Load the complete result set into one pandas DataFrame
    df = await sql_input.get_dataframe()

asyncio.run(main())
```
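For large result sets, get_batched_dataframe yields results in batches instead of one DataFrame, and chunk_size (default 5000) presumably controls the batch size. The sketch below assumes batches of chunk_size rows and illustrates the general async-generator pattern with `sqlite3`'s `Cursor.fetchmany`; `iter_batches` is a hypothetical helper, not part of the SDK. Under that assumption, a 12,000-row result with chunk_size=5000 arrives as batches of 5000, 5000, and 2000 rows.

```python
import asyncio
import sqlite3
from typing import AsyncIterator


async def iter_batches(
    conn: sqlite3.Connection, query: str, chunk_size: int = 5000
) -> AsyncIterator[list[tuple]]:
    """Hypothetical helper: yield query results in lists of at most chunk_size rows."""
    cursor = conn.execute(query)
    try:
        while True:
            # fetchmany returns an empty list once the result set is exhausted
            batch = cursor.fetchmany(chunk_size)
            if not batch:
                break
            yield batch
            # Let other tasks run between batches (each fetch itself still blocks)
            await asyncio.sleep(0)
    finally:
        cursor.close()


async def main() -> list[int]:
    conn = sqlite3.connect(":memory:")
    conn.execute("CREATE TABLE users (id INTEGER)")
    conn.executemany("INSERT INTO users VALUES (?)", [(i,) for i in range(12_000)])
    sizes = [len(b) async for b in iter_batches(conn, "SELECT id FROM users", 5000)]
    conn.close()
    return sizes


print(asyncio.run(main()))  # [5000, 5000, 2000]
```

Only one batch is held in memory at a time, which is the usual reason to prefer a batched reader over loading the full result.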
Using a connection string
Use a connection string for daft operations with ConnectorX support.
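```python
import asyncio
from application_sdk.inputs import SQLQueryInput

async def main():
    sql_input = SQLQueryInput(
        query="SELECT * FROM users",
        engine="postgresql://user:pass@localhost/db",  # connection string, not an Engine
        chunk_size=5000,
    )
    daft_df = await sql_input.get_daft_dataframe()

asyncio.run(main())
```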
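The summary says connection strings may be served by Redshift-specific handling, ConnectorX, or a SQLAlchemy fallback, but the routing rules are not documented here. The sketch below is purely hypothetical: it dispatches on the connection string's URL scheme, and both the `choose_backend` name and the `CONNECTORX_SCHEMES` set are illustrative assumptions, not the SDK's actual logic.

```python
from urllib.parse import urlparse

# Assumption: an illustrative subset of schemes treated as ConnectorX-capable.
CONNECTORX_SCHEMES = {"postgresql", "postgres", "mysql", "mssql", "sqlite"}


def choose_backend(connection: str) -> str:
    """Hypothetical router: pick a backend name from a connection string's scheme."""
    # Strip a driver suffix, e.g. "postgresql+psycopg2" -> "postgresql"
    scheme = urlparse(connection).scheme.split("+", 1)[0].lower()
    if scheme == "redshift":
        return "redshift"
    if scheme in CONNECTORX_SCHEMES:
        return "connectorx"
    return "sqlalchemy"  # fallback for any other scheme


print(choose_backend("postgresql://user:pass@localhost/db"))  # connectorx
```

Keeping the SQLAlchemy path as the default means an unrecognized scheme still has a route, at the cost of the faster ConnectorX reader.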
See also
- Inputs: Overview of all input classes and common usage patterns
- ParquetInput: Read data from Parquet files, supporting single files and directories
- JsonInput: Read data from JSON files in JSONL format
- Application SDK README: Overview of the Application SDK and its components