Apps and tasks
Every App Framework application is built from two primitives: the app and the task. Understanding the distinction between them—and why it exists—is the foundation for writing apps that are durable, testable, and easy to reason about.
App: Executable
An app is a self-contained unit of execution. It has:
- A typed Input that describes what data the app receives
- A typed Output that describes what data the app produces
- A run() method that orchestrates the work
from application_sdk.app import App, task
from application_sdk.contracts import Input, Output


class ExtractionInput(Input):
    connection_id: str


class ExtractionOutput(Output):
    rows_extracted: int


class MyConnector(App):
    async def run(self, input: ExtractionInput) -> ExtractionOutput:
        ...
The run() method is the plan.
- It receives the Input and decides which portions of it to send to which tasks.
- It decides which tasks to execute, and in what order.
- It builds up the final Output to return from the app.
But run() performs no side effects itself: all I/O is left to tasks.
Limitations in run()
The framework may replay your app's orchestration to rebuild state after a failure—so run() must be deterministic, producing the same result given the same inputs. This means only certain logic is allowed here.
- Allowed
  - Conditional if-else logic: for example, deciding whether to run a particular task given the results of a previous one
  - Parallelism mechanisms: for example, running multiple tasks in parallel with each other
- Forbidden
  - Any I/O (network, files, API calls, SQL, database access, object store access, etc.), even via other SDKs
  - Any side-effects or randomness, including datetime.now() or uuid.uuid4()
- Uncommon escape hatch
  - Any external dependencies: forbidden by default, but can be explicitly passed through when needed (uncommon)
Tasks are exempt from these requirements—they're executed once, and their results are checkpointed.
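One consequence of these rules: when the orchestration needs a random ID or a timestamp, the value can be produced inside a task, so that it is recorded once and reused on replay. The sketch below models this with a toy in-memory journal. It is not the framework's implementation; ToyJournal, new_run_id, and orchestrate are hypothetical names used only for illustration, and real checkpointing is certainly more involved.

```python
import asyncio
import uuid
from typing import Any, Awaitable, Callable


class ToyJournal:
    """Toy stand-in for a checkpoint store (hypothetical, not the SDK's)."""

    def __init__(self) -> None:
        self._results: list[Any] = []
        self._cursor = 0

    def rewind(self) -> None:
        """Simulate a replay: read recorded results from the start again."""
        self._cursor = 0

    async def call(self, fn: Callable[[], Awaitable[Any]]) -> Any:
        """Run fn the first time; on replay, return the recorded result."""
        if self._cursor < len(self._results):
            result = self._results[self._cursor]
        else:
            result = await fn()
            self._results.append(result)
        self._cursor += 1
        return result


async def new_run_id() -> str:
    # Non-deterministic work lives here, inside the task-like function.
    return uuid.uuid4().hex


async def orchestrate(journal: ToyJournal) -> str:
    # The orchestration only awaits; it never calls uuid directly.
    return await journal.call(new_run_id)


async def first_and_replayed() -> tuple[str, str]:
    journal = ToyJournal()
    first = await orchestrate(journal)
    journal.rewind()  # pretend the app was interrupted and replayed
    replayed = await orchestrate(journal)
    return first, replayed


first_id, replayed_id = asyncio.run(first_and_replayed())
```

Both passes through orchestrate see the same ID, because the second pass reads the recorded value instead of calling uuid.uuid4() again. Had the ID been generated directly in the orchestration, each replay would have produced a different value.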
Tasks: Internal units of work
A task is where the real work happens. Decorate any async method on your App subclass with @task, and the framework treats its completion as a durable checkpoint. The framework also automatically retries the task if it fails, for example due to an ephemeral network issue.
class MyConnector(App):
    @task(timeout_seconds=3600, auto_heartbeat_seconds=10)
    async def fetch_data(self, input: ExtractionInput) -> ExtractionOutput:
        # I/O, network calls, database access: all safe here
        return ExtractionOutput(rows_extracted=42)

    async def run(self, input: ExtractionInput) -> ExtractionOutput:
        return await self.fetch_data(input)
Once a task completes, the framework records the result. If the app is interrupted and replayed, completed tasks are skipped—their stored outputs are returned directly without re-executing. This is what makes the framework durable: a failure only loses the progress since the last completed task, not the entire run.
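To make the replay behavior concrete, here is a toy simulation rather than the framework's implementation. A plain dict plays the role of the checkpoint store, and toy_task, fetch, load, and run_once are hypothetical names. Results are keyed by task name only, which is a simplification. The second step fails on its first attempt; on the replay, the first step's stored result is reused instead of being recomputed.

```python
import asyncio
import functools
from typing import Any, Awaitable, Callable

checkpoints: dict[str, Any] = {}  # toy checkpoint store, keyed by task name
executions: dict[str, int] = {}   # how many times each task body really ran


def toy_task(fn: Callable[..., Awaitable[Any]]) -> Callable[..., Awaitable[Any]]:
    """Record a task's result on success and skip its body on later replays."""

    @functools.wraps(fn)
    async def wrapper(*args: Any) -> Any:
        if fn.__name__ in checkpoints:
            return checkpoints[fn.__name__]
        executions[fn.__name__] = executions.get(fn.__name__, 0) + 1
        result = await fn(*args)
        checkpoints[fn.__name__] = result  # only reached if fn succeeded
        return result

    return wrapper


@toy_task
async def fetch() -> int:
    return 1000


@toy_task
async def load(rows: int) -> int:
    if executions["load"] == 1:
        raise ConnectionError("simulated transient failure")
    return rows


async def run_once() -> int:
    return await load(await fetch())


async def run_with_one_replay() -> int:
    try:
        return await run_once()
    except ConnectionError:
        return await run_once()  # replay the orchestration from the top


loaded = asyncio.run(run_with_one_replay())
```

After the replay, executions shows that fetch ran once and load ran twice: only the work since the last completed task was repeated. This models a whole-run replay; the task-level automatic retries described above are a separate mechanism that the sketch does not cover.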
Complete minimal example
from application_sdk.app import App, task
from application_sdk.contracts import Input, Output


class ExtractionInput(Input):
    connection_id: str


class FetchInput(Input):
    connection_id: str


class FetchOutput(Output):
    rows_fetched: int


class TableListInput(Input):
    connection_id: str
    database: str


class TableListOutput(Output):
    table_count: int


class ExtractionOutput(Output):
    total_rows: int
    total_tables: int


class MyConnector(App):
    @task(timeout_seconds=1800, auto_heartbeat_seconds=30)
    async def fetch_rows(self, input: FetchInput) -> FetchOutput:
        # Perform the actual data fetch
        return FetchOutput(rows_fetched=1000)

    @task(timeout_seconds=600)
    async def list_tables(self, input: TableListInput) -> TableListOutput:
        return TableListOutput(table_count=12)

    async def run(self, input: ExtractionInput) -> ExtractionOutput:
        fetch_out = await self.fetch_rows(FetchInput(connection_id=input.connection_id))
        table_out = await self.list_tables(
            TableListInput(connection_id=input.connection_id, database="main")
        )
        return ExtractionOutput(
            total_rows=fetch_out.rows_fetched,
            total_tables=table_out.table_count,
        )
The key points:
- run() calls tasks by awaiting them.
- Each await is a potential checkpoint.
- The outputs of one task can be used as input to another, and as part of the overall output of the app.
Task orchestration
Conditional
Tasks can be conditionally executed using normal Python if/else logic in run(). A task that's never awaited is never executed—and never checkpointed.
async def run(self, input: ExtractionInput) -> ExtractionOutput:
    fetch_out = await self.fetch_rows(FetchInput(connection_id=input.connection_id))
    if fetch_out.rows_fetched > 0:
        transform_out = await self.transform_data(TransformInput(rows=fetch_out.rows_fetched))
        return ExtractionOutput(total_rows=transform_out.rows_processed)
    return ExtractionOutput(total_rows=0)
Sequential
Tasks execute in the order they're awaited, so they'll run sequentially when awaited one after another:
async def run(self, input: ExtractionInput) -> ExtractionOutput:
    db_out = await self.fetch_databases(FetchDbInput(connection_id=input.connection_id))
    schema_out = await self.fetch_schemas(FetchSchemaInput(databases=db_out.databases))
    return ExtractionOutput(count=schema_out.count)
Parallel
Use asyncio.gather to run tasks concurrently. The framework handles scheduling correctly:
import asyncio

async def run(self, input: ExtractionInput) -> ExtractionOutput:
    schema_out, transform_out = await asyncio.gather(
        self.fetch_schemas(FetchSchemaInput(databases=["main", "analytics"])),
        self.transform_data(TransformInput(source=input.connection_id)),
    )
    return ExtractionOutput(count=schema_out.count + transform_out.count)
Parallel tasks each create their own checkpoint independently—if one fails and the app retries, any already-completed sibling tasks aren't re-executed.
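When the number of parallel tasks isn't known up front, the same pattern extends to a dynamic fan-out. The following standalone sketch uses only asyncio; count_tables is a hypothetical stand-in for a @task method, and the table counts are made up. It relies on one documented property of asyncio.gather: results come back in the order the awaitables were passed, regardless of which finishes first.

```python
import asyncio

# Made-up data for illustration; a real task would query the source system.
FAKE_TABLE_COUNTS = {"main": 12, "analytics": 7, "staging": 3}


async def count_tables(database: str) -> int:
    """Hypothetical stand-in for a @task method that does I/O."""
    # Larger databases sleep longer, so completion order differs from call order.
    await asyncio.sleep(0.01 * FAKE_TABLE_COUNTS[database])
    return FAKE_TABLE_COUNTS[database]


async def count_all(databases: list[str]) -> dict[str, int]:
    # Fan out one awaitable per database and wait for all of them.
    counts = await asyncio.gather(*(count_tables(db) for db in databases))
    # gather preserves argument order, so zip pairs each input with its result.
    return dict(zip(databases, counts))


per_database = asyncio.run(count_all(["main", "analytics", "staging"]))
total_tables = sum(per_database.values())
```

Each fanned-out call would count as its own task execution, so a wide fan-out is worth weighing against the task sizing guidance.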
Task sizing
How large does each task need to be? The right size depends on how much progress you can afford to lose if a failure occurs. A task that takes 30 seconds to complete means a worst-case restart cost of 30 seconds; a task that takes 20 minutes means you might lose up to 20 minutes of work.
| Duration | Assessment | Example |
|---|---|---|
| < 1 min | Too granular | Individual API calls—slow to checkpoint and you'll exhaust your task budget |
| 1–5 min | Ideal | Batch of API calls, file processing, a page of records |
| 5–10 min | Acceptable | Large batch, complex transformation |
| > 15 min | Risky | Too much progress at risk—add more task boundaries |
A single app run supports up to approximately 5,000 task executions. For apps that process large datasets, design tasks around natural batch boundaries (pages of results, chunks of records) rather than individual operations.
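As a worked example of this arithmetic, the helpers below estimate how many task executions a paged workload needs and compare that with the approximate 5,000-execution budget. The names plan_batches and fits_budget and the record counts are hypothetical, and the limit is treated as a rough guide rather than a precise constant.

```python
import math

TASK_BUDGET = 5_000  # approximate per-run limit quoted in this section


def plan_batches(total_records: int, batch_size: int) -> list[tuple[int, int]]:
    """Return (offset, limit) pairs, one per task, covering all records."""
    if batch_size <= 0:
        raise ValueError("batch_size must be positive")
    return [
        (offset, min(batch_size, total_records - offset))
        for offset in range(0, total_records, batch_size)
    ]


def fits_budget(total_records: int, batch_size: int) -> bool:
    """True if one task per batch stays within the approximate budget."""
    return math.ceil(total_records / batch_size) <= TASK_BUDGET


# 1,000,000 records at 100 per task needs 10,000 tasks: over budget.
too_granular = fits_budget(1_000_000, 100)
# The same records at 500 per task needs 2,000 tasks: within budget.
coarser = fits_budget(1_000_000, 500)
# A small plan to show the shape of the output, including the short last batch.
batches = plan_batches(1_050, 500)
```

In practice the batch size also has to keep each task's duration in the range the table recommends, and the budget has to leave headroom for the app's other tasks.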
Multiple entry points
By default, an app exposes a single entry point, run(), through which it can be triggered. If your app needs more than one independently triggerable entry point (for example, metadata extraction and query mining), use @entrypoint instead of overriding run():
from application_sdk.app import App, entrypoint, task


class SnowflakeApp(App):
    @entrypoint
    async def extract_metadata(self, input: ExtractionInput) -> ExtractionOutput:
        return await self.fetch_tables(input)

    @entrypoint
    async def mine_queries(self, input: MiningInput) -> MiningOutput:
        return await self.fetch_queries(input)

    @task(timeout_seconds=1800)
    async def fetch_tables(self, input: ExtractionInput) -> ExtractionOutput: ...

    @task(timeout_seconds=3600)
    async def fetch_queries(self, input: MiningInput) -> MiningOutput: ...
Each @entrypoint method becomes an independently triggerable entry point. All entry points share the same @task methods.
When an app has multiple @entrypoint methods, don't also define run()—the @entrypoint decorator replaces it.
Lifecycle hooks
on_complete
on_complete() is called after run() finishes, whether it succeeded or raised an exception. Use it to send notifications, trigger downstream processes, or perform any cleanup that must happen regardless of outcome:
class MyConnector(App):
    async def run(self, input: ExtractionInput) -> ExtractionOutput: ...

    async def on_complete(self) -> None:
        await self.notify_downstream()
        await super().on_complete()  # always call super: preserves built-in cleanup
Always call await super().on_complete() to preserve the framework's built-in file and storage cleanup behavior.
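The contract described here (the hook runs on success and on failure, and subclasses chain to the base implementation) can be modeled in plain Python. This is a toy model under stated assumptions, not the SDK's code: ToyApp, execute, and the events list are hypothetical, and the real framework decides when and how the hook is invoked.

```python
import asyncio


class ToyApp:
    """Toy base class modeling the documented hook contract (hypothetical)."""

    def __init__(self) -> None:
        self.events: list[str] = []

    async def run(self) -> str:
        raise NotImplementedError

    async def on_complete(self) -> None:
        # Stands in for the built-in file and storage cleanup.
        self.events.append("base cleanup")

    async def execute(self) -> None:
        """Hypothetical driver: call the hook whether run() succeeds or not."""
        try:
            await self.run()
        finally:
            await self.on_complete()


class FailingConnector(ToyApp):
    async def run(self) -> str:
        raise RuntimeError("simulated failure")

    async def on_complete(self) -> None:
        self.events.append("notify downstream")
        await super().on_complete()  # keep the base cleanup


async def demo() -> list[str]:
    app = FailingConnector()
    try:
        await app.execute()
    except RuntimeError:
        pass  # the failure still propagates after the hook has run
    return app.events


events = asyncio.run(demo())
```

Even though run() raised, both the subclass's notification and the base cleanup were recorded. Omitting the super().on_complete() call in the subclass would silently drop the "base cleanup" step, which is the failure mode the guidance above warns about.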
Built-in cleanup tasks
The framework provides two cleanup tasks, automatically called as part of await super().on_complete():
- cleanup_files(): removes local temporary files whose paths were tracked via FileReference objects in task outputs.
- cleanup_storage(): removes object storage artifacts uploaded with StorageTier.TRANSIENT. Files marked StorageTier.RETAINED or StorageTier.PERSISTENT are left untouched.
External dependencies in run()
In the unlikely scenario your app imports a third-party library that's used inside run() directly (rather than inside a task), declare it using passthrough_modules:
class MyConnector(App, passthrough_modules=["my_connector", "third_party_lib"]):
    ...
This tells the framework to make the library available within its sandboxed execution environment. Libraries used only inside @task methods don't need to be declared here.
Quick reference
| Decorator / method | Purpose | When to use |
|---|---|---|
| run() | Orchestrates task execution | Define the order and conditions under which tasks run |
| @task | Marks a method as a durable, checkpointed unit of work | Any method that performs I/O, network calls, or other side-effects |
| @entrypoint | Marks a method as an independently-triggerable starting point | When your app needs more than one triggerable starting point |
| on_complete() | Post-run hook (success or failure) | Notifications, downstream triggers, cleanup |
| passthrough_modules | Declares third-party libraries used in the orchestration layer | When run() imports non-standard libraries |
See also
- Inputs and outputs: Typed contracts, payload safety, FileReference, and contract evolution rules
- Handlers: Expose your app over HTTP with a typed handler that validates auth and returns metadata.