
Inputs and outputs

Every task boundary in the App Framework is a typed contract: a single Input object in, a single Output object out. These contracts are validated at startup—before your app runs a single task—so schema errors and unsafe types are caught at import time rather than in production.

Why typed contracts matter

The App Framework serializes Input and Output objects as it passes data between tasks. This means:

  • Unsafe types fail fast. If a contract field uses a type the serialization layer can't safely handle, the framework raises a PayloadSafetyError when your class is imported—not when a user runs a job.
  • Contract evolution is safe by design. Single-object contracts can gain new optional fields without breaking callers. Multi-parameter signatures can't—adding a parameter is a breaking change for every existing caller.
  • Contracts are self-documenting. Named fields on a Pydantic model are readable at a glance; positional tuples aren't.

Define inputs and outputs

Input and Output are Pydantic base classes. Subclass them to define your contracts:

```python
from application_sdk.contracts import Input, Output

class ExtractionInput(Input):
    connection_id: str
    max_records: int = 1000  # optional with default

class ExtractionOutput(Output):
    rows_extracted: int
    checkpoint_path: str = ""  # optional with default
```

Every run() method and every @task method must accept exactly one Input and return exactly one Output. This one-in, one-out pattern is what enables safe contract evolution over time.
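To make the one-in, one-out rule concrete, here is a minimal sketch of how such a signature check could be enforced when a module is imported. It is not the framework's @task implementation: Input and Output are plain stand-in classes rather than the SDK's Pydantic models, and one_in_one_out is a hypothetical decorator built on the standard library's inspect.signature.

```python
import inspect
from dataclasses import dataclass


class Input:
    """Stand-in for the framework's Input base class."""


class Output:
    """Stand-in for the framework's Output base class."""


def one_in_one_out(func):
    """Hypothetical check: func takes exactly one Input and returns one Output."""
    sig = inspect.signature(func)
    params = [p for name, p in sig.parameters.items() if name != "self"]
    if len(params) != 1:
        raise TypeError(
            f"{func.__name__} must take exactly one Input, got {len(params)} parameters"
        )
    annotation = params[0].annotation
    if not (isinstance(annotation, type) and issubclass(annotation, Input)):
        raise TypeError(f"{func.__name__}: parameter must be annotated with an Input subclass")
    returns = sig.return_annotation
    if not (isinstance(returns, type) and issubclass(returns, Output)):
        raise TypeError(f"{func.__name__}: return type must be an Output subclass")
    return func


@dataclass
class ExtractionInput(Input):
    connection_id: str
    max_records: int = 1000


@dataclass
class ExtractionOutput(Output):
    rows_extracted: int


@one_in_one_out  # passes: one Input in, one Output out
async def extract(input: ExtractionInput) -> ExtractionOutput:
    return ExtractionOutput(rows_extracted=0)


try:
    @one_in_one_out  # rejected as soon as the module is imported
    async def extract_positional(connection_id: str, max_records: int) -> ExtractionOutput:
        return ExtractionOutput(rows_extracted=0)
except TypeError as exc:
    print(exc)  # extract_positional must take exactly one Input, got 2 parameters
```

The positional variant is exactly the shape that makes evolution painful: adding a third parameter would break every caller, whereas adding a defaulted field to ExtractionInput would not.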

For more on using these contracts with tasks and the run() method, see Apps and tasks.

Payload safety

The framework's serialization layer passes task outputs to subsequent tasks as serialized data. Certain Python types are unsafe in this context:

Unsafe types

| Forbidden type | Risk |
|---|---|
| Any | Bypasses type validation entirely |
| bytes / bytearray | Binary data corrupts under JSON serialization |
| Unbounded list[T] | No upper bound on size; risks truncation or out-of-memory errors |
| Unbounded dict[K, V] | Same as unbounded list |

Using any of these types in a contract field raises PayloadSafetyError when the class is imported.
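The sketch below shows one way an import-time check like this could work, using only the standard library. PayloadSafetyError and MaxItems are local stand-ins that share the framework's names, and the __init_subclass__ hook is an assumption about the mechanism, not a description of the SDK's code. Because the check runs while the class statement executes, an unsafe field fails as soon as the module is imported.

```python
from typing import Annotated, Any, get_args, get_origin, get_type_hints


class PayloadSafetyError(TypeError):
    """Local stand-in for the framework's PayloadSafetyError."""


class MaxItems:
    """Hypothetical marker: an upper bound on a collection's size."""

    def __init__(self, n: int) -> None:
        self.n = n


def check_type(name: str, tp: Any) -> None:
    """Raise PayloadSafetyError if tp, or anything nested in it, is unsafe."""
    if tp is Any:
        raise PayloadSafetyError(f"{name}: Any bypasses type validation")
    if tp in (bytes, bytearray):
        raise PayloadSafetyError(f"{name}: binary data is not JSON-safe")
    origin = get_origin(tp)
    if origin is Annotated:
        base, *metadata = get_args(tp)
        bounded = any(isinstance(m, MaxItems) for m in metadata)
        if bounded and get_origin(base) in (list, dict):
            # The collection itself is bounded; its element types still need checking.
            for arg in get_args(base):
                check_type(name, arg)
        else:
            check_type(name, base)
        return
    if tp in (list, dict) or origin in (list, dict):
        raise PayloadSafetyError(f"{name}: unbounded collection {tp!r}")
    for arg in get_args(tp):  # e.g. the members of Optional[...] or a Union
        check_type(name, arg)


class Input:
    """Runs the safety check on every subclass when its class statement executes."""

    def __init_subclass__(cls, **kwargs: Any) -> None:
        super().__init_subclass__(**kwargs)
        for field_name, tp in get_type_hints(cls, include_extras=True).items():
            check_type(f"{cls.__name__}.{field_name}", tp)


class SafeInput(Input):
    items: Annotated[list[str], MaxItems(5000)]
    name: str


try:
    class UnsafeInput(Input):
        blob: bytes
except PayloadSafetyError as exc:
    print(exc)  # UnsafeInput.blob: binary data is not JSON-safe
```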

Safe alternatives

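| Need | Safe alternative |
|---|---|
| List with a known upper bound | Annotated[list[str], MaxItems(1000)] or BoundedList[str, 1000] |
| Dict with a known upper bound | BoundedDict[str, str, 500] |
| Large binary or file data | FileReference (stored in object storage between tasks) |
| Enum values | SerializableEnum |
| Large structured output | FileReference pointing to a Parquet or JSON file |
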
```python
from typing import Annotated
from application_sdk.contracts import Input, Output
from application_sdk.contracts.types import MaxItems, FileReference, SerializableEnum, BoundedList, BoundedDict

class Status(SerializableEnum):
    PENDING = "pending"
    COMPLETE = "complete"

class ProcessInput(Input):
    items: Annotated[list[str], MaxItems(5000)]
    status: Status = Status.PENDING

class ProcessOutput(Output):
    results: FileReference  # large data, stored in object storage
    processed_count: int

class BatchInput(Input):
    record_ids: BoundedList[str, 10_000]  # list[str] with MaxItems(10_000)
    metadata: BoundedDict[str, str, 500]  # dict[str, str] with MaxItems(500)
```

Passing data: FileReference

The framework's serialization layer has a size limit of approximately 2 MB per task payload. For data larger than this—Parquet files, CSV exports, large JSON blobs—use FileReference.
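A rough way to judge whether a value belongs in a FileReference is to measure its serialized size. This sketch assumes the payload is sent as compact UTF-8 JSON and reads "approximately 2 MB" as 2 MiB; both are assumptions, and payload_size and should_use_file_reference are hypothetical helpers, not SDK functions.

```python
import json
from typing import Any

# Assumption: "approximately 2 MB" taken as 2 MiB. The framework's exact limit,
# and how it measures a payload, may differ.
PAYLOAD_LIMIT_BYTES = 2 * 1024 * 1024


def payload_size(obj: Any) -> int:
    """Bytes needed to send obj as compact UTF-8 JSON."""
    return len(json.dumps(obj, separators=(",", ":")).encode("utf-8"))


def should_use_file_reference(obj: Any, limit: int = PAYLOAD_LIMIT_BYTES) -> bool:
    """True if obj is too large to pass inline and should be written to a file."""
    return payload_size(obj) > limit


# 100,000 small rows easily exceed the limit once serialized.
rows = [{"id": i, "name": f"table_{i}"} for i in range(100_000)]
print(payload_size(rows), should_use_file_reference(rows))
```

Because row counts from a source system are rarely bounded, checking against a much lower threshold than the hard limit leaves headroom for growth.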

A FileReference holds a pointer to a file stored in object storage between tasks. The framework uploads the file automatically when the producing task returns its output, and downloads it automatically before the consuming task starts.

```python
from application_sdk.contracts import Input, Output
from application_sdk.contracts.types import FileReference

# App, task, FetchInput, ProcessOutput, write_parquet, and read_parquet are
# assumed to be defined or imported elsewhere (see Apps and tasks).

class FetchOutput(Output):
    results: FileReference  # uploaded automatically when this output leaves the task

class ProcessInput(Input):
    results: FileReference  # downloaded automatically before this task runs

class MyConnector(App):
    @task
    async def fetch(self, input: FetchInput) -> FetchOutput:
        data = ...  # placeholder: the rows this task produced
        write_parquet(data, "/tmp/results.parquet")
        return FetchOutput(results=FileReference.from_local("/tmp/results.parquet"))

    @task
    async def process(self, input: ProcessInput) -> ProcessOutput:
        # local_path points at the copy the framework downloaded for this task
        data = read_parquet(input.results.local_path)
        ...

    async def run(self, input: FetchInput) -> ProcessOutput:
        fetch_out = await self.fetch(input)
        return await self.process(ProcessInput(results=fetch_out.results))
```

The local_path attribute on a received FileReference gives you the path to the downloaded file. You don't manage the upload or download lifecycle—the framework handles both.

Resumable task contract

If a long-running task can't be split into smaller tasks, use HeartbeatDetails to record its progress, so that after a failure the retried task can resume where the previous attempt left off.

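```python
from application_sdk.contracts import HeartbeatDetails

class BatchProgress(HeartbeatDetails):
    last_processed_id: str
    records_done: int

# BatchOutput, get_batches, and process are placeholders defined elsewhere.

class MyConnector(App):
    @task(heartbeat_timeout_seconds=60)
    async def process_batches(self, input: BatchInput) -> BatchOutput:
        # None on the first attempt; the last saved progress on a retry
        prev = await self.task_context.get_heartbeat_details(BatchProgress)
        start_id = prev.last_processed_id if prev else None
        records_done = prev.records_done if prev else 0

        # get_batches is assumed to yield only the batches after start_id
        for batch in get_batches(start_from=start_id):
            process(batch)
            records_done += batch.count  # running total across attempts
            # Save progress after each batch so a retry can skip finished work
            await self.task_context.heartbeat(
                BatchProgress(last_processed_id=batch.id, records_done=records_done)
            )

        return BatchOutput(records_processed=records_done)  # hypothetical field
```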

On a retry, get_heartbeat_details() returns the last BatchProgress that was saved before the failure. If no heartbeat was saved yet (first attempt or fresh start), it returns None.

HeartbeatDetails subclasses follow the same Pydantic rules and payload safety constraints as Input and Output.

Contract evolution rules

Contracts must evolve in a backwards-compatible way. The framework may be running old versions of your app alongside new versions—an in-flight run that started with the old contract must continue to work after a deployment.

The rule is simple: always add, never remove or rename.

| Change | Safe? | Notes |
|---|---|---|
| Add a new field with a default value | Yes | Old callers omit the field; the default fills in |
| Add a new field without a default | No | Old callers can't provide it, so deserialization fails |
| Remove a field | No | Old callers still send it; new code silently ignores it, so behavior changes without any error |
| Rename a field | No | Equivalent to removing the old field and adding a new one |
| Change a field's type | No | Old serialized values may not deserialize to the new type |

```python
# Version 1
class ExtractionInput(Input):
    connection_id: str
    max_records: int = 1000

# Version 2: safe evolution
class ExtractionInput(Input):
    connection_id: str
    max_records: int = 1000
    retry_on_failure: bool = True  # new field with default: safe
    timeout_seconds: int = 300  # new field with default: safe
```
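The first two rows of the table can be checked with a small simulation. This sketch uses standard-library dataclasses instead of Pydantic so that it runs without the SDK; from_payload is a hypothetical helper that ignores unknown keys, matching the behavior the table describes for removed fields, and region is a made-up required field. A payload produced by Version 1 still loads into a class that only added a defaulted field, but fails for a class that added a required one.

```python
import json
from dataclasses import asdict, dataclass, fields


def from_payload(cls, payload: str):
    """Build cls from a JSON payload, ignoring keys the class doesn't declare."""
    data = json.loads(payload)
    known = {f.name for f in fields(cls)}
    return cls(**{k: v for k, v in data.items() if k in known})


@dataclass
class ExtractionInputV1:
    connection_id: str
    max_records: int = 1000


@dataclass
class ExtractionInputV2:  # safe: the new field has a default
    connection_id: str
    max_records: int = 1000
    retry_on_failure: bool = True


@dataclass
class ExtractionInputBroken:  # unsafe: the new field is required
    connection_id: str
    region: str  # hypothetical field with no default
    max_records: int = 1000


# A payload serialized by an old caller, before either change was deployed.
old_payload = json.dumps(asdict(ExtractionInputV1(connection_id="conn-1", max_records=500)))

print(from_payload(ExtractionInputV2, old_payload))
# ExtractionInputV2(connection_id='conn-1', max_records=500, retry_on_failure=True)

try:
    from_payload(ExtractionInputBroken, old_payload)
except TypeError as exc:
    print("deserialization failed:", exc)
```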

If you need to change behavior in a way that requires an incompatible contract change, create a new contract class (ExtractionInputV2) and migrate callers explicitly rather than modifying the existing class in place.

Quick tips

  • Define contracts close to the tasks that use them—co-location makes the boundary obvious.
  • Prefer specific types over broad ones: a nested Pydantic model is more self-documenting and safer than a dict[str, Any].
  • Use FileReference for any structured data that could grow beyond a few hundred kilobytes, so payloads stay well under the serialization size limit.
  • Always add new fields with default values—this is the single most important contract evolution habit.
  • Run mypy, pyright, ty, or a similar type checker on your app code. Because Input and Output are Pydantic models, type errors along the call chain from run() into each @task method and back are fully statically checkable.

See also

  • Apps and tasks: How Input and Output fit into the App and task lifecycle
  • Credentials: Securely pass credentials to your app using the framework's credential model