Input
Abstract Class (application_sdk.inputs)
Input classes provide a unified interface for reading data from various sources in the Application SDK. Input is the abstract base class that defines the interface all input implementations must follow. It provides common functionality for downloading files from object stores and defines abstract methods for data retrieval. All input classes must implement the get_dataframe(), get_batched_dataframe(), get_daft_dataframe(), and get_batched_daft_dataframe() methods.
Methods
get_dataframe
async get_dataframe(self) -> pd.DataFrame
Returns: pd.DataFrame - Complete data as a pandas DataFrame
get_batched_dataframe
async get_batched_dataframe(self) -> AsyncIterator[pd.DataFrame]
Returns: AsyncIterator[pd.DataFrame] - Iterator yielding batches of pandas DataFrames
get_daft_dataframe
async get_daft_dataframe(self) -> daft.DataFrame
Returns: daft.DataFrame - Complete data as a daft DataFrame
get_batched_daft_dataframe
async get_batched_daft_dataframe(self) -> AsyncIterator[daft.DataFrame]
Returns: AsyncIterator[daft.DataFrame] - Iterator yielding batches of daft DataFrames
download_files
async download_files(self) -> List[str]
Returns: List[str] - List of file paths (local or downloaded)
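A concrete subclass supplies these methods for its particular source. The following is a minimal sketch only: the CsvInput name, its constructor, and the pandas/daft CSV readers are illustrative assumptions, not part of the SDK.
import daft
import pandas as pd
from typing import AsyncIterator
from application_sdk.inputs import Input

class CsvInput(Input):
    """Hypothetical input that reads a local CSV file."""

    def __init__(self, path: str, chunk_size: int = 100_000):
        self.path = path
        self.chunk_size = chunk_size

    async def get_dataframe(self) -> pd.DataFrame:
        # Load the whole file into a single pandas DataFrame
        return pd.read_csv(self.path)

    async def get_batched_dataframe(self) -> AsyncIterator[pd.DataFrame]:
        # Yield one DataFrame per chunk_size rows
        for chunk in pd.read_csv(self.path, chunksize=self.chunk_size):
            yield chunk

    async def get_daft_dataframe(self) -> daft.DataFrame:
        # daft reads lazily; nothing is materialized until the caller collects
        return daft.read_csv(self.path)

    async def get_batched_daft_dataframe(self) -> AsyncIterator[daft.DataFrame]:
        # For this sketch, the single lazy daft DataFrame is yielded as one batch
        yield daft.read_csv(self.path)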
Input implementations
The Application SDK provides four concrete implementations of the base Input class, each optimized for different data sources and formats. All implementations inherit the common functionality from the base class, including automatic file downloading from object stores, batch processing capabilities, and unified DataFrame interfaces.
SQLQueryInput (SQL Database)
Reads data from SQL databases by executing SQL queries. Supports both SQLAlchemy engines and connection strings, with automatic handling of async and sync operations.
ParquetInput (Columnar Format)
Reads data from Parquet files, supporting both single files and directories containing multiple Parquet files. Automatically handles local and object store paths.
JsonInput (JSONL Format)
Reads data from JSON files, supporting both single files and directories containing multiple JSON files. Supports JSONL (JSON Lines) format, where each line is a separate JSON object (see the usage sketch after this list).
IcebergInput (Table Format)
Reads data from Apache Iceberg tables using daft. Provides support for reading Iceberg table data as DataFrames with lazy evaluation.
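For example, a directory of JSONL files could be read with JsonInput as sketched below; the path and file_names parameters are assumed to mirror ParquetInput, so check the JsonInput reference for the exact constructor:
from application_sdk.inputs import JsonInput
# Constructor arguments here mirror ParquetInput and are assumptions, not verified API
json_input = JsonInput(
    path="s3://my-bucket/events/",
    file_names=["events-1.json", "events-2.json"]
)
df = await json_input.get_dataframe()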
Usage patterns
Reading from object stores
All file-based inputs (ParquetInput, JsonInput) automatically handle downloading files from object stores when files aren't available locally:
from application_sdk.inputs import ParquetInput
# Files will be automatically downloaded from S3 if not local
parquet_input = ParquetInput(
path="s3://my-bucket/data/",
file_names=["file1.parquet", "file2.parquet"]
)
# Files are downloaded automatically on first access
df = await parquet_input.get_dataframe()
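If you only need the local file paths rather than a DataFrame, download_files() (listed in the methods above) returns them directly:
# Download the files if they are not already local and return their paths
file_paths = await parquet_input.download_files()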
Processing large datasets in batches
Use batched methods to process large datasets without loading everything into memory:
from application_sdk.inputs import ParquetInput
parquet_input = ParquetInput(
path="s3://bucket/large-dataset/",
chunk_size=50000 # Process 50k rows at a time
)
# Process in batches to avoid memory issues
async for batch_df in parquet_input.get_batched_dataframe():
# Process each batch
processed = transform_data(batch_df)
await save_results(processed)
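The daft variants follow the same pattern. A sketch reusing the same hypothetical transform_data and save_results helpers:
# Each batch is a daft DataFrame; to_pandas() materializes it when needed
async for daft_batch in parquet_input.get_batched_daft_dataframe():
    processed = transform_data(daft_batch.to_pandas())
    await save_results(processed)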
Combining multiple data sources
You can combine data from different input sources:
from application_sdk.inputs import SQLQueryInput, ParquetInput
import pandas as pd
# Read from SQL
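# db_engine below is assumed to be a previously created SQLAlchemy engine; a connection string is also supported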
sql_input = SQLQueryInput(
query="SELECT * FROM users",
engine=db_engine
)
sql_df = await sql_input.get_dataframe()
# Read from Parquet
parquet_input = ParquetInput(path="data/additional.parquet")
parquet_df = await parquet_input.get_dataframe()
# Combine DataFrames
combined_df = pd.concat([sql_df, parquet_df], ignore_index=True)
Error handling
All input methods raise exceptions on failure. Wrap calls in try-except blocks:
from application_sdk.inputs import ParquetInput
from application_sdk.common.error_codes import IOError
try:
parquet_input = ParquetInput(path="data/missing.parquet")
df = await parquet_input.get_dataframe()
except IOError as e:
print(f"Failed to read files: {e}")
except Exception as e:
print(f"Unexpected error: {e}")
See also
- Outputs: Write data to various destinations including Parquet files, JSON files, and Iceberg tables
- Application SDK README: Overview of the Application SDK and its components
- App structure: Standardized folder structure for Atlan applications
- StateStore: Persistent state management for workflows and credentials