Input

Abstract Class
📁application_sdk.inputs

Input classes provide a unified interface for reading data from various sources in the Application SDK. Input is an abstract base class that defines the interface all input implementations must follow. It provides common functionality for downloading files from object stores and declares abstract methods for data retrieval. All input classes must implement the get_dataframe(), get_batched_dataframe(), get_daft_dataframe(), and get_batched_daft_dataframe() methods.
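
A minimal sketch of a custom subclass, assuming Input is importable from application_sdk.inputs; the CsvInput class, its constructor, and its internals are illustrative and not part of the SDK:

from typing import AsyncIterator

import daft
import pandas as pd

from application_sdk.inputs import Input


class CsvInput(Input):
    """Hypothetical input that reads a local CSV file."""

    def __init__(self, path: str):
        self.path = path

    async def get_dataframe(self) -> pd.DataFrame:
        # Load the whole file into a single pandas DataFrame
        return pd.read_csv(self.path)

    async def get_batched_dataframe(self) -> AsyncIterator[pd.DataFrame]:
        # Yield fixed-size chunks instead of the full file
        for chunk in pd.read_csv(self.path, chunksize=100_000):
            yield chunk

    async def get_daft_dataframe(self) -> daft.DataFrame:
        # Same data as a daft DataFrame
        return daft.read_csv(self.path)

    async def get_batched_daft_dataframe(self) -> AsyncIterator[daft.DataFrame]:
        # Simplest possible batching: one daft DataFrame per batch
        yield daft.read_csv(self.path)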

Methods (5)

get_dataframe

async
async get_dataframe(self) -> pd.DataFrame
Returns a single pandas DataFrame with all data from the input source. Must be implemented by all input classes.
Returns
pd.DataFrame - Complete data as pandas DataFrame

get_batched_dataframe

async
async get_batched_dataframe(self) -> AsyncIterator[pd.DataFrame]
Returns an async iterator of pandas DataFrames, yielding data in batches. Must be implemented by all input classes.
Returns
AsyncIterator[pd.DataFrame] - Iterator yielding batches of pandas DataFrames

get_daft_dataframe

async
async get_daft_dataframe(self) -> daft.DataFrame
Returns a single daft DataFrame with all data from the input source. Must be implemented by all input classes.
Returns
daft.DataFrame - Complete data as daft DataFrame

get_batched_daft_dataframe

async
async get_batched_daft_dataframe(self) -> AsyncIterator[daft.DataFrame]
Returns an async iterator of daft DataFrames, yielding data in batches. Must be implemented by all input classes.
Returns
AsyncIterator[daft.DataFrame] - Iterator yielding batches of daft DataFrames
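
A short sketch of the daft methods on a concrete input, assuming ParquetInput accepts the path argument shown elsewhere on this page:

from application_sdk.inputs import ParquetInput

parquet_input = ParquetInput(path="s3://my-bucket/data/")

# Single daft DataFrame with all data
daft_df = await parquet_input.get_daft_dataframe()

# Or stream it in daft batches to keep memory bounded
async for batch in parquet_input.get_batched_daft_dataframe():
    batch.show()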

download_files

async
async download_files(self) -> List[str]
Automatically handles file retrieval from object stores when files are not available locally. Checks whether files exist locally at the specified path; if they are not found, attempts to download them from the object store, filters them by file_names if provided, and returns the list of file paths.
Returns
List[str] - List of file paths (local or downloaded)
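
The DataFrame methods call this for you, but it can also be invoked directly to resolve files up front; a sketch, assuming the ParquetInput constructor arguments shown elsewhere on this page:

from application_sdk.inputs import ParquetInput

parquet_input = ParquetInput(
    path="s3://my-bucket/data/",
    file_names=["file1.parquet"]
)

# Paths point at local copies, either pre-existing or
# freshly downloaded from the object store
local_paths = await parquet_input.download_files()
print(local_paths)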

Input implementations

The Application SDK provides four concrete implementations of the base Input class, each optimized for different data sources and formats. All implementations inherit the common functionality from the base class, including automatic file downloading from object stores, batch processing capabilities, and unified DataFrame interfaces.
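
Because every implementation exposes the same interface, consumers can be written against the base class and accept any input; a sketch, assuming Input, ParquetInput, and JsonInput are exported from application_sdk.inputs (the constructor arguments are illustrative):

from application_sdk.inputs import Input, JsonInput, ParquetInput


async def row_count(source: Input) -> int:
    # Works for any concrete Input, since they share the same interface
    df = await source.get_dataframe()
    return len(df)


# The same helper handles both formats
parquet_rows = await row_count(ParquetInput(path="data/events.parquet"))
json_rows = await row_count(JsonInput(path="data/events.json"))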

Usage patterns

Reading from object stores

All file-based inputs (ParquetInput, JsonInput) automatically handle downloading files from object stores when files aren't available locally:

from application_sdk.inputs import ParquetInput

# Files will be automatically downloaded from S3 if not local
parquet_input = ParquetInput(
    path="s3://my-bucket/data/",
    file_names=["file1.parquet", "file2.parquet"]
)

# Files are downloaded automatically on first access
df = await parquet_input.get_dataframe()

Processing large datasets in batches

Use batched methods to process large datasets without loading everything into memory:

from application_sdk.inputs import ParquetInput

parquet_input = ParquetInput(
    path="s3://bucket/large-dataset/",
    chunk_size=50000  # Process 50k rows at a time
)

# Process in batches to avoid memory issues
async for batch_df in parquet_input.get_batched_dataframe():
    # Process each batch
    processed = transform_data(batch_df)
    await save_results(processed)

Combining multiple data sources

You can combine data from different input sources:

from application_sdk.inputs import SQLQueryInput, ParquetInput
import pandas as pd

# Read from SQL
sql_input = SQLQueryInput(
    query="SELECT * FROM users",
    engine=db_engine
)
sql_df = await sql_input.get_dataframe()

# Read from Parquet
parquet_input = ParquetInput(path="data/additional.parquet")
parquet_df = await parquet_input.get_dataframe()

# Combine DataFrames
combined_df = pd.concat([sql_df, parquet_df], ignore_index=True)

Error handling

All input methods raise exceptions on failure. Wrap calls in try-except blocks:

from application_sdk.inputs import ParquetInput
from application_sdk.common.error_codes import IOError

try:
    parquet_input = ParquetInput(path="data/missing.parquet")
    df = await parquet_input.get_dataframe()
except IOError as e:
    print(f"Failed to read files: {e}")
except Exception as e:
    print(f"Unexpected error: {e}")

See also

  • Outputs: Write data to various destinations including Parquet files, JSON files, and Iceberg tables
  • Application SDK README: Overview of the Application SDK and its components
  • App structure: Standardized folder structure for Atlan applications
  • StateStore: Persistent state management for workflows and credentials