This reference provides common implementation patterns for using output classes to write data to various destinations. Each pattern includes code examples showing how to implement specific functionality, from automatic chunking and batch processing to error handling and workflow integration.

Writing with automatic chunking

All output classes automatically handle chunking when data exceeds size limits. Configure buffer_size and chunk_size parameters to control how data is split:

from application_sdk.outputs import ParquetOutput

# Initialize with size limits
parquet_output = ParquetOutput(
    output_path="/tmp/output",
    output_suffix="data",
    buffer_size=5000,     # Rows per buffer
    chunk_size=100000,    # Max records per chunk
)

# Write large DataFrame - automatically chunks if needed
await parquet_output.write_dataframe(large_df)

The output class estimates file size based on a DataFrame sample, then splits large DataFrames into chunks based on buffer_size and max_file_size_bytes settings. Each chunk is written as a separate file, and files are automatically uploaded to object stores when size limits are reached.
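
The exact heuristics live inside the output class; the helper below is only an illustration of the idea. The function names are hypothetical, and max_file_size_bytes simply mirrors the setting described above.

import io

import pandas as pd


def estimate_rows_per_chunk(df: pd.DataFrame, max_file_size_bytes: int, sample_rows: int = 1000) -> int:
    # Serialize a small sample to gauge the on-disk size per row
    sample = df.head(min(sample_rows, len(df)))
    buffer = io.BytesIO()
    sample.to_parquet(buffer)
    bytes_per_row = max(buffer.getbuffer().nbytes / max(len(sample), 1), 1)
    return max(int(max_file_size_bytes / bytes_per_row), 1)


def split_into_chunks(df: pd.DataFrame, max_file_size_bytes: int):
    # Yield size-bounded slices, conceptually what the output class does before writing each chunk
    rows_per_chunk = estimate_rows_per_chunk(df, max_file_size_bytes)
    for start in range(0, len(df), rows_per_chunk):
        yield df.iloc[start : start + rows_per_chunk]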

Writing batched data

Process large datasets in batches without loading everything into memory. Use batched methods to stream data from inputs to outputs:

from application_sdk.outputs import JsonOutput
from application_sdk.inputs import ParquetInput

# Initialize input and output
parquet_input = ParquetInput(
    path="s3://bucket/large-dataset/",
    chunk_size=50000,
)

json_output = JsonOutput(
    output_path="/tmp/output",
    output_suffix="data",
)

# Process batches from input to output
async for batch_df in parquet_input.get_batched_dataframe():
    await json_output.write_batched_dataframe(batch_df)

# Get final statistics
stats = await json_output.get_statistics(typename="table")
print(f"Wrote {stats.total_record_count} records in {stats.chunk_count} chunks")

Batched methods handle both AsyncGenerator and Generator types, skip empty DataFrames automatically, and process each batch through the appropriate write method.
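
Because the batched write methods accept generators as well as individual DataFrames, the same pipeline can also be expressed by handing the input's generator straight to the output. This is a sketch based on the note above; the exact signature may differ in your SDK version.

# Equivalent pipeline: pass the async generator directly to the output
await json_output.write_batched_dataframe(parquet_input.get_batched_dataframe())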

Writing to object stores

All outputs automatically upload to object stores when files are written. Configure the output path and prefix to specify object store destinations:

from application_sdk.outputs import ParquetOutput

# Files are automatically uploaded to object store
parquet_output = ParquetOutput(
    output_path="/tmp/output",              # Local staging path
    output_suffix="s3://bucket/prefix",     # Object store destination
)

await parquet_output.write_dataframe(df)
# File is written locally, then uploaded to S3

The output class writes files locally first, then uploads them to the specified object store when size limits are reached or when the write operation completes. Set retain_local_copy=True to keep local files after upload.
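
For example, to keep the staged files on disk after upload, set the retain_local_copy flag mentioned above (a minimal sketch reusing the configuration from the previous example):

from application_sdk.outputs import ParquetOutput

parquet_output = ParquetOutput(
    output_path="/tmp/output",              # Local staging path
    output_suffix="s3://bucket/prefix",     # Object store destination
    retain_local_copy=True,                 # Keep local files after upload
)

await parquet_output.write_dataframe(df)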

Converting between DataFrame types

Output classes support both pandas and daft DataFrames. Use write_dataframe() for pandas DataFrames and write_daft_dataframe() for daft DataFrames:

from application_sdk.outputs import ParquetOutput
import pandas as pd
import daft

# Write pandas DataFrame
pandas_df = pd.DataFrame({"col1": [1, 2, 3], "col2": ["a", "b", "c"]})
parquet_output = ParquetOutput(
    output_path="/tmp/output",
    output_suffix="pandas_data",
)
await parquet_output.write_dataframe(pandas_df)

# Write daft DataFrame
daft_df = daft.from_pandas(pandas_df)
parquet_output_daft = ParquetOutput(
    output_path="/tmp/output",
    output_suffix="daft_data",
)
await parquet_output_daft.write_daft_dataframe(daft_df)

When using write_dataframe() with a pandas DataFrame, the output class internally handles the conversion. For daft DataFrames, use write_daft_dataframe() directly for better performance and access to daft-specific features like Hive partitioning.

Output-specific patterns

Different output types provide specialized features. The following patterns are specific to each output type:

Using Hive partitioning

ParquetOutput supports Hive-style partitioning for efficient data organization. Partitioning organizes data into directory structures based on column values, improving query performance and data management:

from application_sdk.outputs import ParquetOutput, WriteMode
import daft

parquet_output = ParquetOutput(
    output_path="/tmp/output",
    output_suffix="partitioned_data",
)

# Write with partitioning by year and month
await parquet_output.write_daft_dataframe(
    daft_df,
    partition_cols=["year", "month"],
    write_mode=WriteMode.OVERWRITE,
)

# Creates directory structure:
# /tmp/output/partitioned_data/year=2024/month=01/file1.parquet
# /tmp/output/partitioned_data/year=2024/month=02/file2.parquet

Hive partitioning creates a directory structure where each partition column becomes a directory level. This enables efficient partition pruning during queries, where only relevant partitions are read instead of scanning entire datasets.
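
To see the benefit on the read side, a downstream reader can target only the partitions it needs instead of scanning the whole dataset. The sketch below uses daft to read a single month's partition directory; the path mirrors the layout shown above.

import daft

# Read only the January 2024 partition created in the example above
january_df = daft.read_parquet(
    "/tmp/output/partitioned_data/year=2024/month=01/*.parquet"
)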

Partition column selection

Choose partition columns based on your query patterns. Common strategies include:

  • Time-based partitioning: Use date or timestamp columns (year, month, day) for time-series data
  • Category-based partitioning: Use categorical columns (region, department) for filtering by category
  • Hierarchical partitioning: Combine multiple columns (year/month/day) for multi-level organization

# Time-based partitioning
await parquet_output.write_daft_dataframe(
    daft_df,
    partition_cols=["year", "month", "day"],
    write_mode=WriteMode.APPEND,
)

# Category-based partitioning
await parquet_output.write_daft_dataframe(
    daft_df,
    partition_cols=["region", "department"],
    write_mode=WriteMode.APPEND,
)

Write modes with partitioning

ParquetOutput supports three write modes when using Hive partitioning:

  • APPEND: Add new data to existing partitions
  • OVERWRITE: Replace all data in the output path
  • OVERWRITE_PARTITIONS: Replace only the partitions being written

from application_sdk.outputs import ParquetOutput, WriteMode

# Overwrite specific partitions only
await parquet_output.write_daft_dataframe(
    daft_df,
    partition_cols=["year", "month"],
    write_mode=WriteMode.OVERWRITE_PARTITIONS,
)

Using consolidation

Consolidation enables efficient buffered writing of large datasets by accumulating DataFrames and merging them into optimized files. This approach reduces file fragmentation and improves read performance:

from application_sdk.outputs import ParquetOutput

parquet_output = ParquetOutput(
    output_path="/tmp/output",
    output_suffix="data",
    use_consolidation=True,
    chunk_size=100000,
)

# Consolidation automatically merges batches into optimized files
await parquet_output.write_batched_dataframe(batched_df)

When use_consolidation=True, ParquetOutput:

  1. Accumulates DataFrames into temporary folders instead of writing immediately
  2. Uses Daft to merge accumulated DataFrames into optimized files when the threshold is reached (sketched after this list)
  3. Automatically cleans up temporary files after consolidation
  4. Provides better memory efficiency for large datasets
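
Conceptually, the merge step is "read the many small accumulated files, rewrite them as one optimized dataset". The snippet below is a rough sketch of that idea, not the SDK's internal code, and the temporary folder name is hypothetical.

import daft

# Merge the small accumulated files into a single optimized Parquet dataset
accumulated = daft.read_parquet("/tmp/output/tmp_batches/*.parquet")  # hypothetical temp folder
accumulated.write_parquet("/tmp/output/consolidated_data")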

Consolidation workflow

The consolidation process works as follows:

# Initialize with consolidation enabled
parquet_output = ParquetOutput(
    output_path="/tmp/output",
    output_suffix="consolidated_data",
    use_consolidation=True,
    chunk_size=100000,
    buffer_size=5000,
)

# Write multiple batches - they accumulate in temp folders
for batch in batched_dataframes:
    await parquet_output.write_batched_dataframe(batch)

# When threshold is reached, batches are automatically consolidated
# into optimized Parquet files

Consolidation is particularly useful when:

  • Processing large datasets in small batches
  • Reducing file fragmentation from many small files
  • Improving query performance by creating larger, optimized files
  • Managing memory usage for large-scale data processing

Using path generation

Generate file paths for output chunks with support for query extraction markers using the path_gen() method:

from application_sdk.outputs import ParquetOutput

parquet_output = ParquetOutput(
    output_path="/tmp/output",
    output_suffix="data",
)

# Generate path for chunk
path = parquet_output.path_gen(
    chunk_count=5,
    chunk_part=2,
    start_marker="query_start",
    end_marker="query_end",
)
# Returns: /tmp/output/data_chunk_2_query_start_query_end.parquet

The path_gen() method constructs file paths based on chunk information and optional markers, which is useful for tracking query extraction boundaries or organizing output files.

Processing null fields

Use the process_null_fields() method to clean data before writing. This method recursively removes null values from dictionaries and lists:

from application_sdk.outputs import JsonOutput

json_output = JsonOutput(
    output_path="/tmp/output",
    output_suffix="data",
)

# Clean data with null field processing
cleaned_data = json_output.process_null_fields(
    obj={"field1": "value", "field2": None, "nested": {"key": None}},
    preserve_fields=["required_field", "identity_cycle"],
    null_to_empty_dict_fields=["attributes", "customAttributes"],
)
# Returns: {"field1": "value", "nested": {"key": None}, "attributes": {}}

The method preserves specified fields even if null, converts nulls to empty dictionaries for specified fields, and removes null values from all other fields. This is particularly useful for JsonOutput when writing structured data.

Error handling

All output methods raise exceptions on failure. Wrap calls in try-except blocks to handle errors gracefully:

from application_sdk.outputs import ParquetOutput
from application_sdk.common.error_codes import IOError

try:
    parquet_output = ParquetOutput(
        output_path="/tmp/output",
        output_suffix="data",
    )
    await parquet_output.write_dataframe(df)
except IOError as e:
    print(f"IO error: {e}")
    # Handle file system or object store errors
except ValueError as e:
    print(f"Invalid configuration: {e}")
    # Handle configuration errors
except Exception as e:
    print(f"Unexpected error: {e}")
    # Handle other errors

Common exceptions include IOError for file system and object store issues, ValueError for invalid configurations, and TypeError for incorrect DataFrame types.

Statistics and monitoring

All outputs track statistics and record metrics for monitoring. Retrieve statistics after writing data:

from application_sdk.outputs import JsonOutput

json_output = JsonOutput(
    output_path="/tmp/output",
    output_suffix="data",
)

# Write data
await json_output.write_dataframe(df)

# Get statistics
stats = await json_output.get_statistics(typename="table")
print(f"Total records: {stats.total_record_count}")
print(f"Chunks written: {stats.chunk_count}")
print(f"Partitions: {stats.partitions}")

The get_statistics() method returns an ActivityStatistics object containing total record count, chunk count, and partition information. Use the typename parameter to filter statistics by entity type.

Workflow integration

Integrate outputs into metadata extraction workflows by using outputs within fetch and transform functions:

from application_sdk.outputs import ParquetOutput
from application_sdk.inputs import SQLQueryInput

async def fetch_and_transform_databases(workflow_args):
    # Fetch data from source
    sql_input = SQLQueryInput(
        query="SELECT * FROM information_schema.databases",
        engine=workflow_args["engine"],
    )
    df = await sql_input.get_dataframe()

    # Transform data
    transformed_df = transform_database_metadata(df)

    # Write to output
    parquet_output = ParquetOutput(
        output_path=workflow_args["output_path"],
        output_suffix=workflow_args["output_prefix"],
        typename="database",
    )
    await parquet_output.write_dataframe(transformed_df)

    # Return statistics for workflow tracking
    return await parquet_output.get_statistics(typename="database")

Outputs integrate seamlessly with workflow patterns, allowing you to write transformed metadata to various destinations while maintaining statistics for workflow monitoring and reporting.

See also

  • Outputs: Base Output class and all output implementations
  • ParquetOutput: Write data to Parquet files with chunking and Hive partitioning
  • JsonOutput: Write data to JSON files in JSONL format
  • IcebergOutput: Write data to Apache Iceberg tables with automatic table creation
  • Inputs: Read data from various sources to use with outputs