IcebergOutput writes data to Apache Iceberg tables using daft. It supports table creation, schema inference, and multiple write modes. It inherits from the base Output class and adds functionality specific to writing in the Iceberg table format.
IcebergOutput
Class
Module: application_sdk.outputs.iceberg
Inheritance chain: Output → IcebergOutput
Methods (3)
__init__
__init__(self, iceberg_catalog: Catalog, iceberg_namespace: str, iceberg_table: Union[str, Table], mode: str = 'append', total_record_count: int = 0, chunk_count: int = 0, retain_local_copy: bool = False)
Initializes IcebergOutput with a PyIceberg catalog, a namespace, and a target table. The table can be given either as a table name (string) or as an existing PyIceberg Table object.
Parameters
- iceberg_catalog (Catalog): PyIceberg Catalog object
- iceberg_namespace (str): Iceberg namespace (database name)
- iceberg_table (Union[str, Table]): Table name or PyIceberg Table object
- mode (str): Write mode, either 'append' or 'overwrite' (default: 'append')
- total_record_count (int): Initial total record count (default: 0)
- chunk_count (int): Initial chunk count (default: 0)
- retain_local_copy (bool): Whether to retain a local copy (default: False)
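The constructor accepts the target table in two forms. As a rough sketch, assuming a hypothetical `resolve_target` helper and a stand-in `FakeTable` class (neither is part of application_sdk or PyIceberg), the dispatch between a table name and a table object might look like this. The mode check mirrors the two documented modes; whether IcebergOutput itself rejects other values is not stated here.

```python
from typing import Union

VALID_MODES = ("append", "overwrite")


class FakeTable:
    """Hypothetical stand-in for a PyIceberg Table object."""

    def __init__(self, identifier: str) -> None:
        self.identifier = identifier


def resolve_target(
    namespace: str, table: Union[str, FakeTable], mode: str
) -> Union[str, FakeTable]:
    """Validate the write mode and resolve the table target (illustrative only).

    A string is treated as a table name inside ``namespace`` and turned into a
    fully qualified "namespace.table" identifier; a table object is returned
    unchanged, so no table needs to be created for it.
    """
    if mode not in VALID_MODES:
        raise ValueError(f"mode must be one of {VALID_MODES}, got {mode!r}")
    if isinstance(table, str):
        return f"{namespace}.{table}"
    return table
```

A string resolves to an identifier that still has to be created or loaded at write time, while a table object is used directly.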
write_dataframe (async)
async write_dataframe(self, dataframe: pd.DataFrame) -> None
Writes a pandas DataFrame to the Iceberg table. The DataFrame is converted to a daft DataFrame and passed to write_daft_dataframe(), which performs the write. Metrics are recorded for successful writes, and empty DataFrames are skipped.
Parameters
- dataframe (pd.DataFrame): The pandas DataFrame to write
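The delegation described above can be modelled without pandas or daft. This sketch is hypothetical: rows are plain lists of dicts standing in for DataFrames, and `WriterSketch` is not an SDK class. It shows empty input being skipped before any conversion and non-empty input being handed to the daft-level writer.

```python
import asyncio
from typing import Any, Dict, List

Row = Dict[str, Any]


class WriterSketch:
    """Models the write_dataframe -> write_daft_dataframe delegation (hypothetical)."""

    def __init__(self) -> None:
        self.delegated: List[List[Row]] = []

    async def write_dataframe(self, rows: List[Row]) -> None:
        # Empty input is skipped before any conversion happens
        if not rows:
            return
        # Stand-in for the pandas -> daft conversion (e.g. daft.from_pandas);
        # the SDK's exact conversion call is not documented here
        converted = [dict(row) for row in rows]
        await self.write_daft_dataframe(converted)

    async def write_daft_dataframe(self, rows: List[Row]) -> None:
        # Stand-in for the real Iceberg write; just records what it received
        self.delegated.append(rows)
```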
write_daft_dataframe (async)
async write_daft_dataframe(self, dataframe: daft.DataFrame) -> None
Writes a daft DataFrame to the Iceberg table, creating the table first if needed:
- If the table was given as a name (string), it is created from the DataFrame's inferred schema using create_table_if_not_exists(), so no error is raised when the table already exists.
- If the table was given as a Table object, the existing table is used.
The data is written with daft's write_iceberg() method. After a successful write, chunk_count and total_record_count are updated and metrics are recorded for monitoring. Empty DataFrames are skipped.
Parameters
- dataframe (daft.DataFrame): The daft DataFrame to write
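To illustrate the create-if-not-exists behaviour and the counter updates, here is a synchronous, stdlib-only sketch. `InMemoryCatalog`, `infer_schema`, and `IcebergWriteSketch` are hypothetical stand-ins for the PyIceberg catalog, daft's schema inference, and IcebergOutput; the schema inference here simply reads Python type names from the first row, which is far cruder than daft's.

```python
from typing import Any, Dict, List

Row = Dict[str, Any]


class InMemoryCatalog:
    """Hypothetical catalog used only to illustrate create-if-not-exists."""

    def __init__(self) -> None:
        self.tables: Dict[str, Dict[str, Any]] = {}

    def create_table_if_not_exists(
        self, identifier: str, schema: Dict[str, str]
    ) -> Dict[str, Any]:
        # Return the existing table instead of raising when it already exists
        if identifier not in self.tables:
            self.tables[identifier] = {"schema": schema, "rows": []}
        return self.tables[identifier]


def infer_schema(rows: List[Row]) -> Dict[str, str]:
    # Infer column types from the first row (a deliberate simplification)
    return {name: type(value).__name__ for name, value in rows[0].items()}


class IcebergWriteSketch:
    """Hypothetical writer mirroring the documented steps, not the SDK class."""

    def __init__(
        self, catalog: InMemoryCatalog, identifier: str, mode: str = "append"
    ) -> None:
        self.catalog = catalog
        self.identifier = identifier
        self.mode = mode
        self.chunk_count = 0
        self.total_record_count = 0

    def write(self, rows: List[Row]) -> None:
        if not rows:
            return  # empty chunks are skipped and not counted
        table = self.catalog.create_table_if_not_exists(
            self.identifier, infer_schema(rows)
        )
        if self.mode == "overwrite":
            table["rows"] = list(rows)  # replace existing data
        else:
            table["rows"].extend(rows)  # append to existing data
        # Counter bookkeeping described for write_daft_dataframe
        self.chunk_count += 1
        self.total_record_count += len(rows)
```

Because the table is only created when it is missing, repeated writes with a string table name reuse the same table rather than failing.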
Usage Examples
Initialize with catalog and table name
Create an IcebergOutput from a catalog and a table name. The table is created on the first write if it doesn't already exist.
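import asyncio

import pandas as pd
from pyiceberg.catalog import load_catalog

from application_sdk.outputs.iceberg import IcebergOutput

catalog = load_catalog("my_catalog")  # placeholder catalog name
df = pd.DataFrame({"id": [1, 2], "name": ["a", "b"]})

iceberg_output = IcebergOutput(
    iceberg_catalog=catalog,
    iceberg_namespace="default",
    iceberg_table="my_table",
    mode="append",
)

asyncio.run(iceberg_output.write_dataframe(df))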
Initialize with existing table object
Pass an existing PyIceberg Table object. The table is used as-is and is not created.
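from pyiceberg.catalog import load_catalog

from application_sdk.outputs.iceberg import IcebergOutput

catalog = load_catalog("my_catalog")  # placeholder catalog name
# load_table() returns a pyiceberg.table.Table for an existing table
table = catalog.load_table("default.my_table")

iceberg_output = IcebergOutput(
    iceberg_catalog=catalog,
    iceberg_namespace="default",
    iceberg_table=table,
    mode="overwrite",  # replace existing table data on write
)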
Usage patterns
For detailed usage patterns including writing to Iceberg tables, automatic table creation, using existing table objects, write modes, and other IcebergOutput-specific features, see Output usage patterns and select the IcebergOutput tab.
See also
- Outputs: Base Output class and common usage patterns for all output types
- ParquetOutput: Write data to Parquet files with chunking and Hive partitioning
- JsonOutput: Write data to JSON files in JSONL format