Inputs and outputs
Every task boundary in the App Framework is a typed contract: a single Input object in, a single Output object out. These contracts are validated at startup—before your app runs a single task—so schema errors and unsafe types are caught at import time rather than in production.
Why typed contracts matter
The App Framework serializes Input and Output objects as it passes data between tasks. This means:
- Unsafe types fail fast. If a contract field uses a type the serialization layer can't safely handle, the framework raises a PayloadSafetyError when your class is imported, not when a user runs a job.
- Contract evolution is safe by design. Single-object contracts can gain new optional fields without breaking callers. Multi-parameter signatures can't: adding a parameter is a breaking change for every existing caller.
- Contracts are self-documenting. Named fields on a Pydantic model are readable at a glance; positional tuples aren't.
Define inputs and outputs
Input and Output are Pydantic base classes. Subclass them to define your contracts:
from application_sdk.contracts import Input, Output

class ExtractionInput(Input):
    connection_id: str
    max_records: int = 1000  # optional with default

class ExtractionOutput(Output):
    rows_extracted: int
    checkpoint_path: str = ""  # optional with default
Every run() method and every @task method must accept exactly one Input and return exactly one Output. This one-in, one-out pattern is what enables safe contract evolution over time.
For more on using these contracts with tasks and the run() method, see Apps and tasks.
Payload safety
The framework's serialization layer passes task outputs to subsequent tasks as serialized data. Certain Python types are unsafe in this context:
Unsafe types
| Forbidden type | Risk |
|---|---|
| Any | Bypasses type validation entirely |
| bytes / bytearray | Binary data corrupts under JSON serialization |
| Unbounded list[T] | No upper bound on size, which risks truncation or out-of-memory errors |
| Unbounded dict[K, V] | Same as unbounded list |
Using any of these types in a contract field raises PayloadSafetyError when the class is imported.
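To make "raises when the class is imported" concrete, here is a minimal sketch of how a base class can reject forbidden field types at class-creation time. It is not the framework's implementation: SketchContract and SketchPayloadSafetyError are hypothetical stand-ins, and the check covers only Any, bytes, and bytearray, not unbounded collections.

```python
from typing import Any, get_type_hints


class SketchPayloadSafetyError(TypeError):
    """Hypothetical stand-in for the framework's PayloadSafetyError."""


# Only the scalar cases from the table above; unbounded list/dict are not checked here.
FORBIDDEN = (Any, bytes, bytearray)


class SketchContract:
    """Hypothetical base class that inspects field types when a subclass is defined."""

    def __init_subclass__(cls, **kwargs):
        super().__init_subclass__(**kwargs)
        for name, annotation in get_type_hints(cls).items():
            if annotation in FORBIDDEN:
                raise SketchPayloadSafetyError(
                    f"{cls.__name__}.{name} uses forbidden type {annotation!r}"
                )


class GoodInput(SketchContract):
    connection_id: str  # accepted: str is not in FORBIDDEN


try:
    # The class statement itself raises, before any instance is created.
    class BadInput(SketchContract):
        blob: bytes
except SketchPayloadSafetyError as exc:
    error_message = str(exc)
```

Because the check runs inside the class statement, importing a module that defines BadInput would fail immediately, which is the fail-fast behavior described above.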
Safe alternatives
| Need | Safe alternative |
|---|---|
| List with a known upper bound | Annotated[list[str], MaxItems(1000)] |
| Large binary or file data | FileReference (stored in object storage between tasks) |
| Enum values | SerializableEnum |
| Large structured output | FileReference pointing to a Parquet or JSON file |
from typing import Annotated
from application_sdk.contracts import Input, Output
from application_sdk.contracts.types import MaxItems, FileReference, SerializableEnum, BoundedList, BoundedDict

class Status(SerializableEnum):
    PENDING = "pending"
    COMPLETE = "complete"

class ProcessInput(Input):
    items: Annotated[list[str], MaxItems(5000)]
    status: Status = Status.PENDING

class ProcessOutput(Output):
    results: FileReference  # large data, stored in object storage
    processed_count: int

class BatchInput(Input):
    record_ids: BoundedList[str, 10_000]  # list[str] with MaxItems(10_000)
    metadata: BoundedDict[str, str, 500]  # dict[str, str] with MaxItems(500)
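As a rough illustration of how an Annotated bound can be read back and enforced, the sketch below uses only the standard library. MaxItemsSketch and check_bounds are hypothetical names, not SDK APIs, and the framework's own enforcement may work differently.

```python
from dataclasses import dataclass
from typing import Annotated, get_type_hints


@dataclass(frozen=True)
class MaxItemsSketch:
    """Hypothetical marker standing in for the SDK's MaxItems."""
    limit: int


def check_bounds(obj) -> None:
    """Raise ValueError if any annotated collection on obj exceeds its bound."""
    hints = get_type_hints(type(obj), include_extras=True)
    for name, hint in hints.items():
        # Annotated[...] hints expose their extra arguments as __metadata__.
        for meta in getattr(hint, "__metadata__", ()):
            if isinstance(meta, MaxItemsSketch) and len(getattr(obj, name)) > meta.limit:
                raise ValueError(f"{name} has more than {meta.limit} items")


@dataclass
class ProcessInputSketch:
    items: Annotated[list[str], MaxItemsSketch(3)]


check_bounds(ProcessInputSketch(items=["a", "b", "c"]))  # within the bound

try:
    check_bounds(ProcessInputSketch(items=["a", "b", "c", "d"]))
    bound_error = None
except ValueError as exc:
    bound_error = str(exc)
```

The bound travels with the type annotation, so it can be checked wherever the contract is validated rather than in each task body.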
Passing data: FileReference
The framework's serialization layer has a size limit of approximately 2 MB per task payload. For data larger than this—Parquet files, CSV exports, large JSON blobs—use FileReference.
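One rough way to judge whether a payload is near that limit is to measure its size as JSON. This sketch assumes UTF-8 JSON encoding and a limit of exactly 2 MiB; both are assumptions, since the text gives only an approximate figure, and needs_file_reference is a hypothetical helper.

```python
import json

# Approximate limit quoted in the text; the exact value is an assumption.
PAYLOAD_LIMIT_BYTES = 2 * 1024 * 1024


def serialized_size(payload: dict) -> int:
    """Return the size in bytes of the payload encoded as UTF-8 JSON."""
    return len(json.dumps(payload).encode("utf-8"))


def needs_file_reference(payload: dict, limit: int = PAYLOAD_LIMIT_BYTES) -> bool:
    """Return True when the serialized payload would exceed the limit."""
    return serialized_size(payload) > limit


small = {"rows_extracted": 42, "checkpoint_path": ""}
large = {"rows": ["x" * 100] * 30_000}  # roughly 3 MB once serialized
```

Here needs_file_reference(small) is False and needs_file_reference(large) is True, so the second payload is a candidate for a FileReference.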
A FileReference holds a pointer to a file stored in object storage between tasks. The framework uploads the file automatically when the producing task returns its output, and downloads it automatically before the consuming task starts.
from application_sdk.contracts import Input, Output
from application_sdk.contracts.types import FileReference

# App and task are imported as described in Apps and tasks. FetchInput,
# ProcessOutput, write_parquet, and read_parquet are assumed to be defined elsewhere.

class FetchOutput(Output):
    results: FileReference  # uploaded automatically when this output leaves the task

class ProcessInput(Input):
    results: FileReference  # downloaded automatically before this task runs

class MyConnector(App):
    @task
    async def fetch(self, input: FetchInput) -> FetchOutput:
        data = ...  # placeholder: the records fetched from the source system
        write_parquet(data, "/tmp/results.parquet")
        return FetchOutput(results=FileReference.from_local("/tmp/results.parquet"))

    @task
    async def process(self, input: ProcessInput) -> ProcessOutput:
        data = read_parquet(input.results.local_path)
        ...

    async def run(self, input: FetchInput) -> ProcessOutput:
        fetch_out = await self.fetch(input)
        return await self.process(ProcessInput(results=fetch_out.results))
The local_path attribute on a received FileReference gives you the path to the downloaded file. You don't manage the upload or download lifecycle—the framework handles both.
Resumable task contract
If a long-running task can't be split into smaller tasks, but you still want to track its progress and resume where it left off after a failure, use HeartbeatDetails to save that progress.
from application_sdk.contracts import HeartbeatDetails

class BatchProgress(HeartbeatDetails):
    last_processed_id: str
    records_done: int

class MyConnector(App):
    @task(heartbeat_timeout_seconds=60)
    async def process_batches(self, input: BatchInput) -> BatchOutput:
        # Returns None on the first attempt, or the last saved progress on a retry.
        prev = await self.task_context.get_heartbeat_details(BatchProgress)
        start_id = prev.last_processed_id if prev else None
        for batch in get_batches(start_from=start_id):
            process(batch)
            # Save progress so a retry can resume after this batch.
            await self.task_context.heartbeat(
                BatchProgress(last_processed_id=batch.id, records_done=batch.count)
            )
        ...  # build and return the BatchOutput here
On a retry, get_heartbeat_details() returns the last BatchProgress that was saved before the failure. If no heartbeat was saved yet (first attempt or fresh start), it returns None.
HeartbeatDetails subclasses follow the same Pydantic rules and payload safety constraints as Input and Output.
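The resume behavior can be simulated without the framework. The sketch below is synchronous and uses a hypothetical in-memory store in place of the task context, so it shows only the checkpoint-and-resume logic, not the real API.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass(frozen=True)
class BatchProgressSketch:
    last_processed_id: int
    records_done: int


class InMemoryHeartbeatStore:
    """Hypothetical stand-in for the framework's heartbeat storage."""

    def __init__(self) -> None:
        self._details: Optional[BatchProgressSketch] = None

    def heartbeat(self, details: BatchProgressSketch) -> None:
        self._details = details

    def get_heartbeat_details(self) -> Optional[BatchProgressSketch]:
        return self._details


def process_batches(store, batch_ids, fail_at=None):
    """Process batches in order, resuming after the last saved heartbeat."""
    prev = store.get_heartbeat_details()
    done = prev.records_done if prev else 0
    start = batch_ids.index(prev.last_processed_id) + 1 if prev else 0
    processed = []
    for batch_id in batch_ids[start:]:
        if batch_id == fail_at:
            raise RuntimeError(f"simulated failure at batch {batch_id}")
        processed.append(batch_id)
        done += 1
        store.heartbeat(BatchProgressSketch(batch_id, done))
    return processed


store = InMemoryHeartbeatStore()
batches = [1, 2, 3, 4, 5]

try:
    process_batches(store, batches, fail_at=3)  # first attempt fails at batch 3
except RuntimeError:
    pass

resumed = process_batches(store, batches)  # retry resumes after batch 2
```

The retry processes only batches 3, 4, and 5, because the last heartbeat saved before the failure recorded batch 2.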
Contract evolution rules
Contracts must evolve in a backwards-compatible way. The framework may be running old versions of your app alongside new versions—an in-flight run that started with the old contract must continue to work after a deployment.
The rule is simple: always add, never remove or rename.
| Change | Safe? | Notes |
|---|---|---|
| Add a new field with a default value | Yes | Old callers omit the field; the default fills in |
| Add a new field without a default | No | Old callers can't provide it—deserialization fails |
| Remove a field | No | Old callers still send it; new code silently drops the value, so behavior changes without an error |
| Rename a field | No | Equivalent to removing the old field and adding a new one |
| Change a field's type | No | Old serialized values may not deserialize to the new type |
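The first two rows of the table can be reproduced with plain dataclasses and JSON. This is a sketch under the assumption that deserialization amounts to constructing the class from the payload's keys; the class names are illustrative, and Pydantic's actual error types differ.

```python
import json
from dataclasses import asdict, dataclass


@dataclass
class ExtractionInputV1:
    connection_id: str
    max_records: int = 1000


@dataclass
class SafeExtractionInput:
    connection_id: str
    max_records: int = 1000
    retry_on_failure: bool = True  # new field with a default


@dataclass
class UnsafeExtractionInput:
    connection_id: str
    timeout_seconds: int  # new field without a default
    max_records: int = 1000


# Payload serialized by a caller that only knows the original contract.
old_payload = json.loads(json.dumps(asdict(ExtractionInputV1("conn-1", max_records=50))))

safe = SafeExtractionInput(**old_payload)  # the default fills in the new field

try:
    UnsafeExtractionInput(**old_payload)
    unsafe_error = None
except TypeError as exc:
    unsafe_error = str(exc)  # the old payload cannot supply timeout_seconds
```

The old payload loads cleanly into SafeExtractionInput, while UnsafeExtractionInput rejects it because the required field is missing.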
# Version 1
class ExtractionInput(Input):
    connection_id: str
    max_records: int = 1000

# Version 2: safe evolution
class ExtractionInput(Input):
    connection_id: str
    max_records: int = 1000
    retry_on_failure: bool = True  # new field with default (safe)
    timeout_seconds: int = 300  # new field with default (safe)
If you need to change behavior in a way that requires an incompatible contract change, create a new contract class (ExtractionInputV2) and migrate callers explicitly rather than modifying the existing class in place.
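A possible shape for that migration, sketched with dataclasses: the original class stays untouched, and an explicit converter maps old inputs to the new contract. The page_size and max_pages fields and the to_v2 helper are hypothetical, chosen only to illustrate an incompatible change.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class ExtractionInput:
    connection_id: str
    max_records: int = 1000


@dataclass(frozen=True)
class ExtractionInputV2:
    connection_id: str
    # Hypothetical incompatible change: the record limit becomes page size plus page count.
    page_size: int
    max_pages: int


def to_v2(old: ExtractionInput, page_size: int = 100) -> ExtractionInputV2:
    """Explicitly translate a V1 input into the V2 shape."""
    max_pages = -(-old.max_records // page_size)  # ceiling division
    return ExtractionInputV2(old.connection_id, page_size, max_pages)


migrated = to_v2(ExtractionInput(connection_id="conn-1", max_records=250))
```

Callers that still build ExtractionInput keep working, and each one can move to ExtractionInputV2 on its own schedule.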
Quick tips
- Define contracts close to the tasks that use them—co-location makes the boundary obvious.
- Prefer specific types over broad ones: a nested Pydantic model is more self-documenting and safer than a dict[str, Any].
- Use FileReference for any structured data that might exceed a few hundred kilobytes to avoid hitting the serialization size limit.
- Always add new fields with default values. This is the single most important contract evolution habit.
- Run mypy, pyright, ty, or a similar type checker on your app code: because Input and Output are Pydantic models, type errors across the run() → @task → run() call chain are fully statically checkable.
See also
- Apps and tasks: How Input and Output fit into the App and task lifecycle
- Credentials: Securely pass credentials to your app using the framework's credential model