This reference provides common implementation patterns for using output classes to write data to various destinations. Each pattern includes code examples showing how to implement specific functionality, from automatic chunking and batch processing to error handling and workflow integration.
Writing with automatic chunking
All output classes automatically handle chunking when data exceeds size limits. Configure buffer_size and chunk_size parameters to control how data is split:
from application_sdk.outputs import ParquetOutput
# Initialize with size limits
parquet_output = ParquetOutput(
    output_path="/tmp/output",
    output_suffix="data",
    buffer_size=5000,   # Rows per buffer
    chunk_size=100000   # Max records per chunk
)
# Write large DataFrame - automatically chunks if needed
await parquet_output.write_dataframe(large_df)
The output class estimates file size based on a DataFrame sample, then splits large DataFrames into chunks based on buffer_size and max_file_size_bytes settings. Each chunk is written as a separate file, and files are automatically uploaded to object stores when size limits are reached.
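To confirm how a large write was split, you can inspect the statistics afterwards (described further in Statistics and monitoring below). A minimal sketch, reusing the parquet_output instance from the example above:
# Inspect how the write was split into chunks
stats = await parquet_output.get_statistics(typename="table")
print(f"{stats.total_record_count} records written across {stats.chunk_count} chunks")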
Writing batched data
Process large datasets in batches without loading everything into memory. Use batched methods to stream data from inputs to outputs:
from application_sdk.outputs import JsonOutput
from application_sdk.inputs import ParquetInput
# Initialize input and output
parquet_input = ParquetInput(
    path="s3://bucket/large-dataset/",
    chunk_size=50000
)
json_output = JsonOutput(
    output_path="/tmp/output",
    output_suffix="data"
)
# Process batches from input to output
async for batch_df in parquet_input.get_batched_dataframe():
    await json_output.write_batched_dataframe(batch_df)
# Get final statistics
stats = await json_output.get_statistics(typename="table")
print(f"Wrote {stats.total_record_count} records in {stats.chunk_count} chunks")
Batched methods handle both AsyncGenerator and Generator types, skip empty DataFrames automatically, and process each batch through the appropriate write method.
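Because Generator inputs are also accepted, a synchronous generator of DataFrames can be streamed through the same method. A minimal sketch, assuming write_batched_dataframe accepts the generator object itself; generate_batches is a hypothetical batch producer:
import pandas as pd
def generate_batches():
    # Hypothetical generator yielding pandas DataFrames in 50,000-row batches
    for start in range(0, 150000, 50000):
        yield pd.DataFrame({"id": range(start, start + 50000)})
# Empty DataFrames yielded by the generator are skipped automatically
await json_output.write_batched_dataframe(generate_batches())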
Writing to object stores
All outputs automatically upload to object stores when files are written. Configure output_path and output_suffix to set the local staging path and the object store destination:
from application_sdk.outputs import ParquetOutput
# Files are automatically uploaded to object store
parquet_output = ParquetOutput(
    output_path="/tmp/output",          # Local staging path
    output_suffix="s3://bucket/prefix"  # Object store destination
)
await parquet_output.write_dataframe(df)
# File is written locally, then uploaded to S3
The output class writes files locally first, then uploads them to the specified object store when size limits are reached or when the write operation completes. Set retain_local_copy=True to keep local files after upload.
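For example, to keep the staged files on disk after they are uploaded, pass the retain_local_copy flag mentioned above. A minimal sketch, assuming the flag is accepted as a constructor parameter:
parquet_output = ParquetOutput(
    output_path="/tmp/output",           # Local staging path
    output_suffix="s3://bucket/prefix",  # Object store destination
    retain_local_copy=True               # Keep local files after upload (assumed constructor parameter)
)
await parquet_output.write_dataframe(df)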
Convert between dataframe types
Output classes support both pandas and daft DataFrames. Use write_dataframe() for pandas DataFrames and write_daft_dataframe() for daft DataFrames:
from application_sdk.outputs import ParquetOutput
import pandas as pd
import daft
# Write pandas DataFrame
pandas_df = pd.DataFrame({"col1": [1, 2, 3], "col2": ["a", "b", "c"]})
parquet_output = ParquetOutput(
    output_path="/tmp/output",
    output_suffix="pandas_data"
)
await parquet_output.write_dataframe(pandas_df)
# Write daft DataFrame
daft_df = daft.from_pandas(pandas_df)
parquet_output_daft = ParquetOutput(
    output_path="/tmp/output",
    output_suffix="daft_data"
)
await parquet_output_daft.write_daft_dataframe(daft_df)
When using write_dataframe() with a pandas DataFrame, the output class internally handles the conversion. For daft DataFrames, use write_daft_dataframe() directly for better performance and access to daft-specific features like Hive partitioning.
Output-specific patterns
Different output types provide specialized features. The following sections cover patterns specific to each output type:
- ParquetOutput
- JsonOutput
- IcebergOutput
Using Hive partitioning
ParquetOutput supports Hive-style partitioning for efficient data organization. Partitioning organizes data into directory structures based on column values, improving query performance and data management:
from application_sdk.outputs import ParquetOutput, WriteMode
import daft
parquet_output = ParquetOutput(
    output_path="/tmp/output",
    output_suffix="partitioned_data"
)
# Write with partitioning by year and month
await parquet_output.write_daft_dataframe(
    daft_df,
    partition_cols=["year", "month"],
    write_mode=WriteMode.OVERWRITE
)
# Creates directory structure:
# /tmp/output/partitioned_data/year=2024/month=01/file1.parquet
# /tmp/output/partitioned_data/year=2024/month=02/file2.parquet
Hive partitioning creates a directory structure where each partition column becomes a directory level. This enables efficient partition pruning during queries, where only relevant partitions are read instead of scanning entire datasets.
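Any reader that understands the Hive layout can take advantage of this pruning by filtering on the partition columns. A minimal sketch using pandas with the pyarrow engine (not part of the SDK) to read back only one month of the data written above:
import pandas as pd
# Only the year=2024/month=01 partition directory is scanned; other partitions are skipped
january_df = pd.read_parquet(
    "/tmp/output/partitioned_data",
    filters=[("year", "==", 2024), ("month", "==", 1)]
)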
Partition column selection
Choose partition columns based on your query patterns. Common strategies include:
- Time-based partitioning: Use date or timestamp columns (year, month, day) for time-series data
- Category-based partitioning: Use categorical columns (region, department) for filtering by category
- Hierarchical partitioning: Combine multiple columns (year/month/day) for multi-level organization
# Time-based partitioning
await parquet_output.write_daft_dataframe(
    daft_df,
    partition_cols=["year", "month", "day"],
    write_mode=WriteMode.APPEND
)
# Category-based partitioning
await parquet_output.write_daft_dataframe(
    daft_df,
    partition_cols=["region", "department"],
    write_mode=WriteMode.APPEND
)
Write modes with partitioning
ParquetOutput supports three write modes when using Hive partitioning:
- APPEND: Add new data to existing partitions
- OVERWRITE: Replace all data in the output path
- OVERWRITE_PARTITIONS: Replace only the partitions being written
from application_sdk.outputs import ParquetOutput, WriteMode
# Overwrite specific partitions only
await parquet_output.write_daft_dataframe(
    daft_df,
    partition_cols=["year", "month"],
    write_mode=WriteMode.OVERWRITE_PARTITIONS
)
Using consolidation
Consolidation enables efficient buffered writing of large datasets by accumulating DataFrames and merging them into optimized files. This approach reduces file fragmentation and improves read performance:
from application_sdk.outputs import ParquetOutput
parquet_output = ParquetOutput(
    output_path="/tmp/output",
    output_suffix="data",
    use_consolidation=True,
    chunk_size=100000
)
# Consolidation automatically merges batches into optimized files
await parquet_output.write_batched_dataframe(batched_df)
When use_consolidation=True, ParquetOutput:
- Accumulates DataFrames into temporary folders instead of writing immediately
- Uses Daft to merge accumulated DataFrames into optimized files when threshold is reached
- Automatically cleans up temporary files after consolidation
- Provides better memory efficiency for large datasets
Consolidation workflow
The consolidation process works as follows:
# Initialize with consolidation enabled
parquet_output = ParquetOutput(
    output_path="/tmp/output",
    output_suffix="consolidated_data",
    use_consolidation=True,
    chunk_size=100000,
    buffer_size=5000
)
# Write multiple batches - they accumulate in temp folders
for batch in batched_dataframes:
    await parquet_output.write_batched_dataframe(batch)
# When threshold is reached, batches are automatically consolidated
# into optimized Parquet files
Consolidation is particularly useful when:
- Processing large datasets in small batches
- Reducing file fragmentation from many small files
- Improving query performance by creating larger, optimized files
- Managing memory usage for large-scale data processing
JSONL format
JsonOutput writes data in JSONL format (one JSON object per line), which is efficient for streaming and processing large datasets:
from application_sdk.outputs import JsonOutput
json_output = JsonOutput(
    output_path="/tmp/output",
    output_suffix="data",
    chunk_size=50000,
    buffer_size=5000
)
# Each row becomes a JSON object on its own line
await json_output.write_dataframe(df)
Null field processing
JsonOutput provides flexible null field processing to control how null values are handled in the output:
from application_sdk.outputs import JsonOutput
json_output = JsonOutput(
    output_path="/tmp/output",
    output_suffix="data"
)
# Preserve specific fields even if null, convert others to empty dicts
await json_output.write_daft_dataframe(
    daft_df,
    preserve_fields=["required_field", "identity_cycle"],
    null_to_empty_dict_fields=["attributes", "customAttributes"]
)
Datetime conversion
JsonOutput automatically converts datetime objects to epoch timestamps (milliseconds) for JSON compatibility:
from application_sdk.outputs import JsonOutput
json_output = JsonOutput(
    output_path="/tmp/output",
    output_suffix="data"
)
# Datetime columns are automatically converted to epoch timestamps
await json_output.write_daft_dataframe(daft_df)
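The conversion is equivalent to taking the Unix timestamp in milliseconds, so a value such as 2024-01-15 12:00:00 UTC is written as 1705320000000:
import pandas as pd
# Equivalent conversion for a single timestamp
ts = pd.Timestamp("2024-01-15 12:00:00", tz="UTC")
epoch_ms = int(ts.timestamp() * 1000)  # 1705320000000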
Writing to Iceberg tables
Create and write to Iceberg tables with automatic schema inference:
from application_sdk.outputs import IcebergOutput
from pyiceberg.catalog import load_catalog
# Load catalog
catalog = load_catalog("my_catalog")
# Initialize output - table is created if it doesn't exist
iceberg_output = IcebergOutput(
    iceberg_catalog=catalog,
    iceberg_namespace="analytics",
    iceberg_table="events",
    mode="append"
)
# Write DataFrame - schema is inferred automatically
await iceberg_output.write_dataframe(df)
# Get statistics
stats = await iceberg_output.get_statistics(typename="table")
print(f"Wrote {stats.total_record_count} records")
Automatic table creation
IcebergOutput automatically creates tables if they don't exist, inferring the schema from the DataFrame:
from application_sdk.outputs import IcebergOutput
# Table "my_table" will be created automatically if it doesn't exist
iceberg_output = IcebergOutput(
    iceberg_catalog=catalog,
    iceberg_namespace="default",
    iceberg_table="my_table",  # String table name - will be created if needed
    mode="append"
)
await iceberg_output.write_dataframe(df)
Using existing table objects
You can also use an existing PyIceberg Table object if you need more control:
from pyiceberg.table import Table
# Load existing table
table = catalog.load_table("default.my_table")
iceberg_output = IcebergOutput(
    iceberg_catalog=catalog,
    iceberg_namespace="default",
    iceberg_table=table,  # Existing table object
    mode="overwrite"
)
await iceberg_output.write_dataframe(df)
Write modes
IcebergOutput supports two write modes:
- append: Add data to existing table (default)
- overwrite: Replace all data in the table
from application_sdk.outputs import IcebergOutput
# Append mode (default)
iceberg_output = IcebergOutput(
    iceberg_catalog=catalog,
    iceberg_namespace="analytics",
    iceberg_table="events",
    mode="append"
)
# Overwrite mode
iceberg_output_overwrite = IcebergOutput(
    iceberg_catalog=catalog,
    iceberg_namespace="analytics",
    iceberg_table="events",
    mode="overwrite"
)
Using path generation
Generate file paths for output chunks with support for query extraction markers using the path_gen() method:
from application_sdk.outputs import ParquetOutput
parquet_output = ParquetOutput(
    output_path="/tmp/output",
    output_suffix="data"
)
# Generate path for chunk
path = parquet_output.path_gen(
    chunk_count=5,
    chunk_part=2,
    start_marker="query_start",
    end_marker="query_end"
)
# Returns: /tmp/output/data_chunk_2_query_start_query_end.parquet
The path_gen() method constructs file paths based on chunk information and optional markers, which is useful for tracking query extraction boundaries or organizing output files.
Processing null fields
Use the process_null_fields() method to clean data before writing. This method recursively removes null values from dictionaries and lists:
from application_sdk.outputs import JsonOutput
json_output = JsonOutput(
    output_path="/tmp/output",
    output_suffix="data"
)
# Clean data with null field processing
cleaned_data = json_output.process_null_fields(
    obj={"field1": "value", "field2": None, "nested": {"key": None}},
    preserve_fields=["required_field", "identity_cycle"],
    null_to_empty_dict_fields=["attributes", "customAttributes"]
)
# Returns: {"field1": "value", "nested": {"key": None}, "attributes": {}}
The method preserves specified fields even if null, converts nulls to empty dictionaries for specified fields, and removes null values from all other fields. This is particularly useful for JsonOutput when writing structured data.
Error handling
All output methods raise exceptions on failure. Wrap calls in try-except blocks to handle errors gracefully:
from application_sdk.outputs import ParquetOutput
from application_sdk.common.error_codes import IOError
try:
    parquet_output = ParquetOutput(
        output_path="/tmp/output",
        output_suffix="data"
    )
    await parquet_output.write_dataframe(df)
except IOError as e:
    print(f"IO error: {e}")
    # Handle file system or object store errors
except ValueError as e:
    print(f"Invalid configuration: {e}")
    # Handle configuration errors
except Exception as e:
    print(f"Unexpected error: {e}")
    # Handle other errors
Common exceptions include IOError for file system and object store issues, ValueError for invalid configurations, and TypeError for incorrect DataFrame types.
Statistics and monitoring
All outputs track statistics and record metrics for monitoring. Retrieve statistics after writing data:
from application_sdk.outputs import JsonOutput
json_output = JsonOutput(
    output_path="/tmp/output",
    output_suffix="data"
)
# Write data
await json_output.write_dataframe(df)
# Get statistics
stats = await json_output.get_statistics(typename="table")
print(f"Total records: {stats.total_record_count}")
print(f"Chunks written: {stats.chunk_count}")
print(f"Partitions: {stats.partitions}")
The get_statistics() method returns an ActivityStatistics object containing total record count, chunk count, and partition information. Use the typename parameter to filter statistics by entity type.
Workflow integration
Integrate outputs into metadata extraction workflows by using outputs within fetch and transform functions:
from application_sdk.outputs import ParquetOutput
from application_sdk.inputs import SQLQueryInput
async def fetch_and_transform_databases(workflow_args):
    # Fetch data from source
    sql_input = SQLQueryInput(
        query="SELECT * FROM information_schema.databases",
        engine=workflow_args["engine"]
    )
    df = await sql_input.get_dataframe()
    # Transform data
    transformed_df = transform_database_metadata(df)
    # Write to output
    parquet_output = ParquetOutput(
        output_path=workflow_args["output_path"],
        output_suffix=workflow_args["output_prefix"],
        typename="database"
    )
    await parquet_output.write_dataframe(transformed_df)
    # Return statistics for workflow tracking
    return await parquet_output.get_statistics(typename="database")
Outputs integrate seamlessly with workflow patterns, allowing you to write transformed metadata to various destinations while maintaining statistics for workflow monitoring and reporting.
See also
- Outputs: Base Output class and all output implementations
- ParquetOutput: Write data to Parquet files with chunking and Hive partitioning
- JsonOutput: Write data to JSON files in JSONL format
- IcebergOutput: Write data to Apache Iceberg tables with automatic table creation
- Inputs: Read data from various sources to use with outputs