IcebergOutput writes data to Apache Iceberg tables using daft. It supports table creation, schema inference, and multiple write modes. It inherits from the base Output class and adds functionality specific to the Iceberg table format.

IcebergOutput

Class
📁 application_sdk.outputs.iceberg
Inheritance chain:
Output

Writes data to Apache Iceberg tables using daft. Supports table creation, schema inference, and multiple write modes.

Methods (3)

__init__

__init__(self, iceberg_catalog: Catalog, iceberg_namespace: str, iceberg_table: Union[str, Table], mode: str = 'append', total_record_count: int = 0, chunk_count: int = 0, retain_local_copy: bool = False)
Initialize IcebergOutput with PyIceberg catalog, namespace, and table. Accepts either a table name (string) or existing Table object.
Parameters
iceberg_catalog: Catalog
Required
PyIceberg Catalog object
iceberg_namespace: str
Required
Iceberg namespace (database name)
iceberg_table: Union[str, Table]
Required
Table name or PyIceberg Table object
mode: str
Optional
Write mode: 'append' or 'overwrite' (default: 'append')
total_record_count: int
Optional
Initial total record count (default: 0)
chunk_count: int
Optional
Initial chunk count (default: 0)
retain_local_copy: bool
Optional
Whether to retain a local copy (default: False)

write_dataframe

async
async write_dataframe(self, dataframe: pd.DataFrame) -> None
Writes a pandas DataFrame to an Iceberg table. Converts the pandas DataFrame to a daft DataFrame and delegates to write_daft_dataframe(), records metrics for successful writes, and skips empty DataFrames.
Parameters
dataframe: pd.DataFrame
Required
The pandas DataFrame to write

write_daft_dataframe

async
async write_daft_dataframe(self, dataframe: daft.DataFrame) -> None
Writes a daft DataFrame to an Iceberg table, creating the table automatically if needed. If a table name (string) was provided, the table is created from the DataFrame's inferred schema via create_table_if_not_exists(), so no error is raised if it already exists; if a Table object was provided, that table is used directly. Writes with Daft's write_iceberg() method, updates chunk_count and total_record_count, records metrics for monitoring, and skips empty DataFrames.
Parameters
dataframe: daft.DataFrame
Required
The daft DataFrame to write
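
If the data is already a daft DataFrame, it can be written directly without the pandas conversion. A minimal sketch, assuming an iceberg_output instance configured as in the usage examples below and hypothetical column names:

import daft

# Hypothetical columns; any daft DataFrame with a schema compatible with the table works
daft_df = daft.from_pydict({"id": [1, 2, 3], "name": ["a", "b", "c"]})

await iceberg_output.write_daft_dataframe(daft_df)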

Usage Examples

Initialize with catalog and table name

Create IcebergOutput with a catalog and table name; the table is created if it doesn't exist.

from application_sdk.outputs.iceberg import IcebergOutput
from pyiceberg.catalog import Catalog

# catalog: an already-configured PyIceberg Catalog instance
iceberg_output = IcebergOutput(
    iceberg_catalog=catalog,
    iceberg_namespace="default",
    iceberg_table="my_table",
    mode="append"
)

# df: the pandas DataFrame to write
await iceberg_output.write_dataframe(df)

Initialize with existing table object

Use existing PyIceberg Table object

from pyiceberg.table import Table

table = catalog.load_table("default.my_table")
iceberg_output = IcebergOutput(
    iceberg_catalog=catalog,
    iceberg_namespace="default",
    iceberg_table=table,
    mode="overwrite"
)
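
With mode="overwrite", writes replace the table's existing contents rather than appending to them. A minimal sketch, assuming df is a hypothetical pandas DataFrame:

# Replaces the existing rows of default.my_table instead of appending
await iceberg_output.write_dataframe(df)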

Usage patterns

For detailed usage patterns including writing to Iceberg tables, automatic table creation, using existing table objects, write modes, and other IcebergOutput-specific features, see Output usage patterns and select the IcebergOutput tab.

See also

  • Outputs: Base Output class and common usage patterns for all output types
  • ParquetOutput: Write data to Parquet files with chunking and Hive partitioning
  • JsonOutput: Write data to JSON files in JSONL format