Apps and tasks

Every App Framework application is built from two primitives: the app and the task. Understanding the distinction between them—and why it exists—is the foundation for writing apps that are durable, testable, and easy to reason about.

App: Executable

An app is a self-contained unit of execution. It has:

  • A typed Input that describes what data the app receives
  • A typed Output that describes what data the app produces
  • A run() method that orchestrates the work

from application_sdk.app import App, task
from application_sdk.contracts import Input, Output

class ExtractionInput(Input):
    connection_id: str

class ExtractionOutput(Output):
    rows_extracted: int

class MyConnector(App):
    async def run(self, input: ExtractionInput) -> ExtractionOutput:
        ...

The run() method is the plan.

  • It receives the Input and decides which portions of it to send to which tasks.
  • It decides which tasks to execute, and in what order.
  • It builds up the final Output to return from the app.

However, run() performs no side effects itself: all I/O is left to tasks.

Limitations in run()

The framework may replay your app's orchestration to rebuild state after a failure—so run() must be deterministic, producing the same result given the same inputs. This means only certain logic is allowed here.

  • Allowed
    • Conditional if-else logic—for example, deciding whether to run a particular task given the results of a previous one
    • Parallelism mechanisms—for example, running multiple tasks in parallel with each other
  • Forbidden
    • Any I/O—network, files, API calls, SQL, database access, object store access, etc.—even via other SDKs
    • Any side effects or non-determinism, including datetime.now() or uuid.uuid4()
  • Escape hatch (uncommon)
    • External dependencies: forbidden by default, but they can be explicitly passed through when needed

Tasks are exempt from these requirements: once a task completes, its result is checkpointed and reused on replay instead of being recomputed.
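
To see why non-deterministic values belong in tasks, here is a toy model of checkpointed replay in plain Python. It does not use the SDK: the `checkpoints` dictionary and the `call_step` helper are hypothetical stand-ins for whatever the framework does internally, assumed here only for illustration.

```python
import asyncio
import uuid

# Hypothetical stand-in for the framework's checkpoint store (not the SDK API).
checkpoints: dict[str, str] = {}

async def call_step(name, step):
    """Run a task-like step once, then serve its recorded result on replay."""
    if name not in checkpoints:
        checkpoints[name] = await step()
    return checkpoints[name]

async def make_run_id() -> str:
    # Non-deterministic work lives in the task-like step.
    return uuid.uuid4().hex

async def run() -> str:
    run_id = await call_step("make_run_id", make_run_id)
    # Pure logic over a recorded value is deterministic, so it is replay-safe.
    return f"export-{run_id[:8]}"

first = asyncio.run(run())
replayed = asyncio.run(run())  # simulated replay against the same checkpoints
print(first == replayed)
```

Calling uuid.uuid4() directly inside run() would instead yield a different value on every replay, so the replayed orchestration could diverge from what was originally recorded.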

Tasks: Internal units of work

A task is where the real work happens. Decorate any async method on your App subclass with @task, and the framework treats its completion as a durable checkpoint. The framework also automatically retries a failed task, for example after a transient network issue.

class MyConnector(App):
    @task(timeout_seconds=3600, auto_heartbeat_seconds=10)
    async def fetch_data(self, input: ExtractionInput) -> ExtractionOutput:
        # I/O, network calls, database access: all safe here
        return ExtractionOutput(rows_extracted=42)

    async def run(self, input: ExtractionInput) -> ExtractionOutput:
        return await self.fetch_data(input)

Once a task completes, the framework records the result. If the app is interrupted and replayed, completed tasks are skipped—their stored outputs are returned directly without re-executing. This is what makes the framework durable: a failure only loses the progress since the last completed task, not the entire run.
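
The skip-on-replay behaviour described above can be sketched with a small simulation. This is a toy model, not the framework's implementation: `ToyRuntime`, its `call` method, and the simulated `ConnectionError` are all hypothetical names chosen for illustration.

```python
import asyncio

class ToyRuntime:
    """Hypothetical checkpoint store, for illustration only (not the SDK API)."""

    def __init__(self):
        self.checkpoints = {}  # step name -> recorded result
        self.executions = []   # every real execution, in order

    async def call(self, name, step):
        if name in self.checkpoints:
            return self.checkpoints[name]  # completed earlier: skip re-execution
        self.executions.append(name)
        result = await step()
        self.checkpoints[name] = result    # recorded only after success
        return result

attempts = {"load": 0}

async def fetch():
    return 1000

async def load():
    attempts["load"] += 1
    if attempts["load"] == 1:
        raise ConnectionError("simulated interruption")
    return "loaded"

async def run(runtime):
    rows = await runtime.call("fetch", fetch)
    status = await runtime.call("load", load)
    return rows, status

runtime = ToyRuntime()
try:
    asyncio.run(run(runtime))
except ConnectionError:
    pass  # interrupted after "fetch" had already been checkpointed

result = asyncio.run(run(runtime))  # replay: "fetch" is served from its checkpoint
print(runtime.executions)
```

In this sketch only the interrupted step runs a second time; the step that completed before the failure is executed exactly once.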

Complete minimal example

from application_sdk.app import App, task
from application_sdk.contracts import Input, Output

class ExtractionInput(Input):
    connection_id: str

class FetchInput(Input):
    connection_id: str

class FetchOutput(Output):
    rows_fetched: int

class TableListInput(Input):
    connection_id: str
    database: str

class TableListOutput(Output):
    table_count: int

class ExtractionOutput(Output):
    total_rows: int
    total_tables: int

class MyConnector(App):
    @task(timeout_seconds=1800, auto_heartbeat_seconds=30)
    async def fetch_rows(self, input: FetchInput) -> FetchOutput:
        # Perform the actual data fetch
        return FetchOutput(rows_fetched=1000)

    @task(timeout_seconds=600)
    async def list_tables(self, input: TableListInput) -> TableListOutput:
        return TableListOutput(table_count=12)

    async def run(self, input: ExtractionInput) -> ExtractionOutput:
        fetch_out = await self.fetch_rows(FetchInput(connection_id=input.connection_id))
        table_out = await self.list_tables(
            TableListInput(connection_id=input.connection_id, database="main")
        )
        return ExtractionOutput(
            total_rows=fetch_out.rows_fetched,
            total_tables=table_out.table_count,
        )

The key points:

  • run() calls tasks by awaiting them.
  • Each awaited task call becomes a checkpoint once the task completes.
  • The outputs of one task can be used as input to another, and as part of the overall output of the app.

Task orchestration

Conditional

Tasks can be conditionally executed using normal Python if/else logic in run(). A task that's never awaited is never executed—and never checkpointed.

async def run(self, input: ExtractionInput) -> ExtractionOutput:
    fetch_out = await self.fetch_rows(FetchInput(connection_id=input.connection_id))

    if fetch_out.rows_fetched > 0:
        transform_out = await self.transform_data(TransformInput(rows=fetch_out.rows_fetched))
        return ExtractionOutput(total_rows=transform_out.rows_processed)

    return ExtractionOutput(total_rows=0)

Sequential

Tasks execute in the order they're awaited, so they'll run sequentially when awaited one after another:

async def run(self, input: ExtractionInput) -> ExtractionOutput:
    db_out = await self.fetch_databases(FetchDbInput(connection_id=input.connection_id))
    schema_out = await self.fetch_schemas(FetchSchemaInput(databases=db_out.databases))
    return ExtractionOutput(count=schema_out.count)

Parallel

Use asyncio.gather to run tasks concurrently. The framework schedules the gathered tasks, and the results come back in the same order as the calls were passed:

import asyncio

async def run(self, input: ExtractionInput) -> ExtractionOutput:
    schema_out, transform_out = await asyncio.gather(
        self.fetch_schemas(FetchSchemaInput(databases=["main", "analytics"])),
        self.transform_data(TransformInput(source=input.connection_id)),
    )
    return ExtractionOutput(count=schema_out.count + transform_out.count)

Parallel tasks each create their own checkpoint independently—if one fails and the app retries, any already-completed sibling tasks aren't re-executed.
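
For readers less familiar with asyncio.gather, the following standalone sketch shows its two relevant properties using plain coroutines rather than SDK tasks: both coroutines start before either finishes, and results are returned in argument order regardless of which finishes first. The coroutine bodies and sleep durations are placeholders, not real task logic.

```python
import asyncio

events = []  # records start/end order to make the concurrency visible

async def fetch_schemas(databases):
    events.append("fetch_schemas:start")
    await asyncio.sleep(0.05)  # placeholder for real I/O
    events.append("fetch_schemas:end")
    return len(databases)

async def transform_data(source):
    events.append("transform_data:start")
    await asyncio.sleep(0.1)   # placeholder for real I/O
    events.append("transform_data:end")
    return len(source)

async def run():
    # Results are unpacked in the order the awaitables were passed in.
    schema_count, transform_count = await asyncio.gather(
        fetch_schemas(["main", "analytics"]),
        transform_data("conn-1"),
    )
    return schema_count + transform_count

total = asyncio.run(run())
print(total, events)
```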

Task sizing

How large should each task be? The right size depends on how much progress you can afford to lose if a failure occurs. A task that takes 30 seconds to complete means a worst-case restart cost of 30 seconds; a task that takes 20 minutes means you might lose up to 20 minutes of work.

| Duration | Assessment | Example |
| --- | --- | --- |
| < 1 min | Too granular | Individual API calls; slow to checkpoint, and you'll exhaust your task budget |
| 1–5 min | Ideal | Batch of API calls, file processing, a page of records |
| 5–10 min | Acceptable | Large batch, complex transformation |
| > 15 min | Risky | Too much progress at risk; add more task boundaries |

Info: A single app run supports up to approximately 5,000 task executions. For apps that process large datasets, design tasks around natural batch boundaries (pages of results, chunks of records) rather than individual operations.
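
As a rough planning aid, the arithmetic behind these guidelines can be sketched as follows. The helper `plan_batches`, the throughput figures, the 180-second target, and the `reserved_tasks` headroom are all hypothetical; only the approximate 5,000-execution limit comes from the text above.

```python
import math

MAX_TASK_EXECUTIONS = 5_000  # approximate per-run limit quoted above

def plan_batches(total_records, records_per_second,
                 target_task_seconds=180, reserved_tasks=100):
    """Pick a batch size near the target duration that fits the task budget.

    reserved_tasks is an assumed safety margin for other tasks in the same run.
    """
    batch_size = max(1, int(records_per_second * target_task_seconds))
    task_count = math.ceil(total_records / batch_size)
    budget = MAX_TASK_EXECUTIONS - reserved_tasks
    if task_count > budget:
        # Too many tasks: widen each batch until the run fits the budget.
        batch_size = math.ceil(total_records / budget)
        task_count = math.ceil(total_records / batch_size)
    return batch_size, task_count

small = plan_batches(10_000_000, records_per_second=500)
large = plan_batches(1_000_000_000, records_per_second=500)
print(small, large)
```

In the second call the batch has to grow to stay within the budget, which at the assumed 500 records per second pushes each task to roughly 6.8 minutes, inside the "Acceptable" band of the table rather than the "Ideal" one.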

Multiple entry points

By default, an app exposes a single entry point, run(), through which it can be triggered. If your app needs more than one independently triggerable entry point (for example, metadata extraction and query mining), use @entrypoint instead of overriding run():

from application_sdk.app import App, entrypoint, task

class SnowflakeApp(App):
    @entrypoint
    async def extract_metadata(self, input: ExtractionInput) -> ExtractionOutput:
        return await self.fetch_tables(input)

    @entrypoint
    async def mine_queries(self, input: MiningInput) -> MiningOutput:
        return await self.fetch_queries(input)

    @task(timeout_seconds=1800)
    async def fetch_tables(self, input: ExtractionInput) -> ExtractionOutput: ...

    @task(timeout_seconds=3600)
    async def fetch_queries(self, input: MiningInput) -> MiningOutput: ...

Each @entrypoint method becomes an independently triggerable entry point. All entry points share the same @task methods.

Warning: When an app has multiple @entrypoint methods, don't also define run(); the @entrypoint decorator replaces it.

Lifecycle hooks

on_complete

on_complete() is called after run() finishes, whether it succeeded or raised an exception. Use it to send notifications, trigger downstream processes, or perform any cleanup that must happen regardless of outcome:

class MyConnector(App):
    async def run(self, input: ExtractionInput) -> ExtractionOutput: ...

    async def on_complete(self) -> None:
        await self.notify_downstream()
        await super().on_complete()  # always call super to preserve built-in cleanup

Info: Always call await super().on_complete() to preserve the framework's built-in file and storage cleanup behavior.
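
The "runs whether run() succeeded or raised" guarantee behaves like a try/finally around the run. The sketch below models that with plain Python; `ToyApp` and `execute` are hypothetical stand-ins, and the real framework may invoke the hook differently.

```python
import asyncio

class ToyApp:
    """Hypothetical base class standing in for the framework's App."""

    def __init__(self):
        self.log = []

    async def run(self, fail: bool) -> str:
        if fail:
            raise RuntimeError("simulated failure")
        return "ok"

    async def on_complete(self) -> None:
        self.log.append("built-in cleanup")

class MyConnector(ToyApp):
    async def on_complete(self) -> None:
        self.log.append("notify downstream")
        await super().on_complete()  # keeps the base-class cleanup in the chain

async def execute(app, fail):
    # Assumed driver: the hook sits in a finally block, so it runs either way.
    try:
        return await app.run(fail)
    finally:
        await app.on_complete()

ok_app = MyConnector()
ok_result = asyncio.run(execute(ok_app, fail=False))

failed_app = MyConnector()
try:
    asyncio.run(execute(failed_app, fail=True))
except RuntimeError:
    pass  # the hook has already run by the time the error surfaces

print(ok_app.log == failed_app.log)
```

Omitting the super() call in this model would drop the "built-in cleanup" entry from the log, which mirrors why the override above must chain to the base implementation.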

Built-in cleanup tasks

The framework provides two cleanup tasks, automatically called as part of await super().on_complete():

  • cleanup_files(): removes local temporary files whose paths were tracked via FileReference objects in task outputs.
  • cleanup_storage(): removes object storage artifacts uploaded with StorageTier.TRANSIENT. Files marked StorageTier.RETAINED or StorageTier.PERSISTENT are left untouched.

External dependencies in run()

In the unlikely scenario that your app uses a third-party library directly inside run() (rather than inside a task), declare it using passthrough_modules:

class MyConnector(App, passthrough_modules=["my_connector", "third_party_lib"]):
    ...

This tells the framework to make the library available within its sandboxed execution environment. Libraries used only inside @task methods don't need to be declared here.

Quick reference

| Decorator / method | Purpose | When to use |
| --- | --- | --- |
| run() | Orchestrates task execution | Define the order and conditions under which tasks run |
| @task | Marks a method as a durable, checkpointed unit of work | Any method that performs I/O, network calls, or other side effects |
| @entrypoint | Marks a method as an independently triggerable starting point | When your app needs more than one triggerable starting point |
| on_complete() | Post-run hook (success or failure) | Notifications, downstream triggers, cleanup |
| passthrough_modules | Declares third-party libraries used in the orchestration layer | When run() imports non-standard libraries |

See also

  • Inputs and outputs: Typed contracts, payload safety, FileReference, and contract evolution rules
  • Handlers: Expose your app over HTTP with a typed handler that validates auth and returns metadata.