ParquetOutput writes data to Parquet files with support for chunking, consolidation, Hive partitioning, and automatic object store uploads. It inherits from the base Output class and provides specialized functionality for columnar data storage.

ParquetOutput

Class
Module: application_sdk.outputs.parquet
Inheritance chain: Output

Writes data to Parquet files with support for chunking, consolidation, Hive partitioning, and automatic object store uploads.

Methods (5)

__init__

__init__(self, output_path: str, output_suffix: str = '', typename: Optional[str] = None, chunk_size: Optional[int] = 100000, buffer_size: int = 5000, chunk_start: Optional[int] = None, start_marker: Optional[str] = None, end_marker: Optional[str] = None, retain_local_copy: bool = False, use_consolidation: bool = False)
Initialize ParquetOutput with output path and configuration options.
Parameters
  • output_path (str, required): Base path where Parquet files are written
  • output_suffix (str, optional): Suffix for output files (default: '')
  • typename (Optional[str], optional): Type name of the entity (e.g., 'database', 'schema', 'table')
  • chunk_size (Optional[int], optional): Maximum records per chunk (default: 100000)
  • buffer_size (int, optional): Number of rows per buffer (default: 5000)
  • chunk_start (Optional[int], optional): Starting index for chunk numbering
  • start_marker (Optional[str], optional): Start marker for query extraction
  • end_marker (Optional[str], optional): End marker for query extraction
  • retain_local_copy (bool, optional): Whether to retain the local copy after upload (default: False)
  • use_consolidation (bool, optional): Enable consolidation for efficient buffered writing (default: False)

write_dataframe

async
async write_dataframe(self, dataframe: pd.DataFrame) -> None
Writes a pandas DataFrame to Parquet files with automatic chunking and size management. Estimates file size based on DataFrame sample, splits into chunks when max_file_size_bytes is exceeded, uses Snappy compression, automatically uploads to object store, and records metrics for monitoring.
Parameters
  • dataframe (pd.DataFrame, required): The pandas DataFrame to write
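
To picture the size-based splitting, here is a minimal, illustrative sketch; it is not the SDK's actual implementation, and the sample size and byte limit below are assumptions.

import pandas as pd

MAX_FILE_SIZE_BYTES = 256 * 1024 * 1024  # assumed limit for this sketch

def split_by_estimated_size(df: pd.DataFrame, max_bytes: int = MAX_FILE_SIZE_BYTES):
    # Estimate bytes per row from a small sample, then yield row slices
    # whose estimated size stays under max_bytes.
    sample = df.head(min(len(df), 1000))
    bytes_per_row = max(sample.memory_usage(deep=True).sum() / max(len(sample), 1), 1)
    rows_per_file = max(int(max_bytes / bytes_per_row), 1)
    for start in range(0, len(df), rows_per_file):
        yield df.iloc[start:start + rows_per_file]

# Each slice is then written with Snappy compression, e.g.:
#   chunk.to_parquet("chunk_0.parquet", compression="snappy")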

write_batched_dataframe

async
async write_batched_dataframe(self, batch_df: pd.DataFrame) -> None
Writes batched pandas DataFrames with optional consolidation support. When use_consolidation=True, accumulates DataFrames into temporary folders, uses Daft to merge into optimized files when threshold is reached, automatically cleans up temporary files, and is more memory-efficient for large datasets.
Parameters
  • batch_df (pd.DataFrame, required): Batched pandas DataFrame to write
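
The consolidation flow can be sketched roughly as follows, assuming buffered chunks land in a temporary folder of Parquet files; the folder paths and function name are placeholders, not SDK internals.

import daft

def consolidate_chunks(temp_dir: str, final_dir: str) -> None:
    # Merge all buffered chunk files into fewer, larger Parquet files.
    merged = daft.read_parquet(f"{temp_dir}/*.parquet")
    merged.write_parquet(final_dir)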

write_daft_dataframe

async
async write_daft_dataframe(self, dataframe: daft.DataFrame, partition_cols: Optional[List] = None, write_mode: Union[WriteMode, str] = WriteMode.APPEND, morsel_size: int = 100000) -> None
Writes a daft DataFrame to Parquet files with support for Hive partitioning and write modes. Automatically splits large DataFrames into multiple files based on max_file_size_bytes, supports Hive-style directory structure with partitioning, uses Daft's native file size management, and handles overwrite operations by deleting existing prefix in object store.
Parameters
  • dataframe (daft.DataFrame, required): The DataFrame to write
  • partition_cols (Optional[List], optional): Column names or expressions for Hive partitioning
  • write_mode (Union[WriteMode, str], optional): Write mode (APPEND, OVERWRITE, OVERWRITE_PARTITIONS)
  • morsel_size (int, optional): Rows per morsel for local execution (default: 100000)
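
As a rough sketch of what these options mean at the Daft level (the SDK layers object store upload and prefix cleanup on top of this; the data and path below are placeholders):

import daft

df = daft.from_pydict({"year": [2024, 2024], "month": [1, 2], "value": [10, 20]})
df.write_parquet(
    "/tmp/output/partitioned_data",
    partition_cols=["year", "month"],  # Hive-style layout: .../year=2024/month=1/...
    write_mode="overwrite",            # replace any existing data under this path
)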

get_full_path

get_full_path(self) -> str
Returns the full output path including suffix and typename.
Returns
  • str: Full output directory path
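
For example, assuming the path is assembled from output_path, output_suffix, and typename in that order (the exact join logic may differ):

from application_sdk.outputs import ParquetOutput

parquet_output = ParquetOutput(
    output_path="/tmp/output",
    output_suffix="data",
    typename="table",
)
print(parquet_output.get_full_path())  # e.g. "/tmp/output/data/table"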

Usage Examples

Basic initialization

Initialize ParquetOutput with basic configuration

from application_sdk.outputs import ParquetOutput

parquet_output = ParquetOutput(
    output_path="/tmp/output",
    output_suffix="data",
    typename="table",
    chunk_size=100000,
    buffer_size=5000
)

# df is a pandas DataFrame prepared by the caller; call from within an async function.
await parquet_output.write_dataframe(df)

With consolidation

Use consolidation for efficient buffered writing of large datasets

parquet_output = ParquetOutput(
    output_path="/tmp/output",
    output_suffix="data",
    use_consolidation=True,
    chunk_size=100000
)

# batched_df is a pandas DataFrame batch, e.g. one batch from a chunked read.
await parquet_output.write_batched_dataframe(batched_df)

With Hive partitioning

Write with Hive-style partitioning for efficient data organization

from application_sdk.outputs import ParquetOutput, WriteMode
import daft

parquet_output = ParquetOutput(
    output_path="/tmp/output",
    output_suffix="partitioned_data"
)

# daft_df is a daft.DataFrame; the partition columns must exist in its schema.
await parquet_output.write_daft_dataframe(
    daft_df,
    partition_cols=["year", "month"],
    write_mode=WriteMode.OVERWRITE
)
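
With this call, the files are laid out in Hive-style directories such as .../partitioned_data/year=2024/month=1/*.parquet, which lets downstream engines prune partitions at read time.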

Usage patterns

For detailed usage patterns including Hive partitioning, consolidation, and other ParquetOutput-specific features, see Output usage patterns and select the ParquetOutput tab.

See also

  • Outputs: Base Output class and common usage patterns for all output types
  • JsonOutput: Write data to JSON files in JSONL format
  • IcebergOutput: Write data to Apache Iceberg tables with automatic table creation