Architecture (advanced)
You don't need to understand this to build apps. This page is for those who want to know what the framework does under the hood—or who are debugging unexpected behavior.
Two runtimes
The App Framework is built on two underlying systems:
Temporal: durability
Temporal is the execution engine behind App.run() and every @entrypoint method, each of which runs as a separate Temporal workflow, and behind every @task method, which runs as a Temporal activity. It provides:
- Automatic retry: failed tasks are retried according to the policy configured on @task, without any retry code in your app.
- State persistence: the result of every completed task is checkpointed. If the worker pod restarts mid-extraction, execution resumes from the last completed task, not from scratch.
- Replay: if a workflow needs to reconstruct its state (after a worker crash, for example), Temporal replays the run() or @entrypoint method using the recorded task results. This replay requirement is why these methods must be deterministic; more on that below.
You never import from temporalio directly. The App base class and @task decorator apply all necessary Temporal decorators under the hood at class definition time.
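To make "at class definition time" concrete, here is a minimal sketch of the general Python pattern, assuming a decorator that only marks methods and a base class whose `__init_subclass__` hook finds and wraps them when a subclass is defined. The names `task`, `App`, `registered_tasks`, and `_wrap_as_activity` are hypothetical stand-ins; the real framework applies Temporal's own decorators, which this sketch does not import.

```python
import asyncio
import functools
from typing import Any, Callable


def task(fn: Callable[..., Any]) -> Callable[..., Any]:
    """Hypothetical decorator: only marks the method; wrapping happens later."""
    setattr(fn, "_is_task", True)
    return fn


def _wrap_as_activity(fn: Callable[..., Any]) -> Callable[..., Any]:
    # Stand-in for applying the execution engine's activity decorator
    @functools.wraps(fn)
    async def wrapper(*args: Any, **kwargs: Any) -> Any:
        return await fn(*args, **kwargs)

    wrapper.activity_name = fn.__qualname__
    return wrapper


class App:
    """Hypothetical base class that post-processes marked methods."""

    def __init_subclass__(cls, **kwargs: Any) -> None:
        super().__init_subclass__(**kwargs)
        cls.registered_tasks = []
        # Runs once, when the subclass statement executes (i.e. at import time)
        for name, attr in list(vars(cls).items()):
            if getattr(attr, "_is_task", False):
                cls.registered_tasks.append(name)
                setattr(cls, name, _wrap_as_activity(attr))


class MyApp(App):
    @task
    async def fetch(self, x: int) -> int:
        return x * 2


print(MyApp.registered_tasks)  # ['fetch']
print(asyncio.run(MyApp().fetch(3)))  # 6
```

Because the work happens in `__init_subclass__`, app authors only write `@task`; nothing engine-specific appears in their class body.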
Dapr: infrastructure abstraction
Dapr is the infrastructure sidecar that runs alongside your app container. It provides a language-agnostic interface to:
- Secret store: retrieve credentials at runtime without hardcoding them in your image.
- State store: persist workflow state across runs (used by incremental extractors and checkpointing).
- Pub/sub: publish and subscribe to events between services.
- Bindings: invoke external services and triggers.
App code never calls Dapr directly. The self.context interface in @task methods and Handler methods talks to Dapr through a Protocol-based abstraction layer. The Dapr implementation is an internal detail; the interface is what your code depends on.
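A minimal sketch of the Protocol pattern using Python's `typing.Protocol`. `SecretStore`, `InMemorySecretStore`, `EnvSecretStore`, and `build_auth_header` are hypothetical names, and the `get_secret(name) -> str` signature is an assumption rather than the SDK's documented interface. Neither backend inherits from the Protocol; any class with a matching method satisfies it structurally.

```python
import os
from typing import Protocol


class SecretStore(Protocol):
    """The shape app code depends on (signature is an assumption)."""

    def get_secret(self, name: str) -> str: ...


class InMemorySecretStore:
    """Test double: secrets come from a dict."""

    def __init__(self, secrets: dict[str, str]) -> None:
        self._secrets = secrets

    def get_secret(self, name: str) -> str:
        return self._secrets[name]


class EnvSecretStore:
    """Alternative backend: secrets come from environment variables."""

    def get_secret(self, name: str) -> str:
        # Assumed naming rule for this sketch: "api-key" -> API_KEY
        return os.environ[name.upper().replace("-", "_")]


def build_auth_header(store: SecretStore) -> str:
    # App-side code depends only on the Protocol, never on a specific backend
    return f"Bearer {store.get_secret('api-key')}"


print(build_auth_header(InMemorySecretStore({"api-key": "test-secret"})))  # Bearer test-secret
```

Swapping `EnvSecretStore()` in for the in-memory store requires no change to `build_auth_header`, which is the property the framework relies on when it replaces Dapr with mocks.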
Object storage (uploading and downloading extraction artifacts) bypasses Dapr entirely and talks directly to S3, GCS, or Azure Blob via the obstore library. This avoids two Dapr limitations for large files: the gRPC message size cap, and Dapr's non-streaming gRPC implementation, which would materialize entire files in memory instead of streaming them through the connection.
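The difference between streaming and materializing is easy to see in plain Python. This sketch copies a binary stream in fixed-size chunks, so peak memory stays near `chunk_size` however large the file is. It illustrates the principle only and does not use obstore's API; `stream_copy` and the 8 MiB chunk size are illustrative choices.

```python
import hashlib
import io
from typing import BinaryIO

CHUNK_SIZE = 8 * 1024 * 1024  # 8 MiB per chunk (illustrative value)


def stream_copy(src: BinaryIO, dst: BinaryIO, chunk_size: int = CHUNK_SIZE) -> str:
    """Copy src to dst one chunk at a time and return the SHA-256 of the data.

    Only one chunk is held in memory at a time, unlike src.read() with no
    size argument, which would load the whole file at once.
    """
    digest = hashlib.sha256()
    while chunk := src.read(chunk_size):
        dst.write(chunk)
        digest.update(chunk)
    return digest.hexdigest()


source = io.BytesIO(b"a" * 1000)
destination = io.BytesIO()
checksum = stream_copy(source, destination, chunk_size=64)
```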
Component map
| App Framework concept | What it is under the hood |
|---|---|
| App.run() method | Temporal workflow |
| @entrypoint method | Temporal workflow |
| @task method | Temporal activity |
| Handler class | FastAPI service |
| self.context | Dapr sidecar (via Protocol interface) |
| Secret store (for example, self.context.get_secret()) | Dapr secret component |
| State store (for example, self.context.save_state()) | Dapr state component |
| Object storage (upload/download) | Direct S3/GCS/Azure access via obstore (no Dapr) |
| MockStateStore, MockSecretStore in tests | In-memory implementations of the same Protocol interfaces |
The Protocol-based design means the framework can swap Temporal or Dapr for another engine without touching any app code. It also means unit tests never need a Dapr sidecar or Temporal server running—inject the mock implementations from application_sdk.testing and everything works in plain Python.
Why run() must be deterministic
Everything in this section applies equally to @entrypoint methods—each one is its own Temporal workflow and subject to the same determinism requirements as run().
When Temporal needs to rebuild the state of a running workflow (after a worker pod restart, for example), it replays the run() method. During replay, Temporal feeds the recorded task results back into run() instead of executing the tasks again. This lets the workflow resume from exactly where it left off.
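For replay to work correctly, run() must produce the same sequence of task calls every time it's executed with the same input. If run() calls datetime.now() or uuid.uuid4(), those calls return different values on replay than they did during the original execution. When those values influence which tasks run, the replayed sequence diverges from the recorded history, and Temporal detects the divergence and raises an error.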
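A toy model makes the failure mode concrete. This is not Temporal's implementation: `ReplayEngine`, `run`, and the task names are hypothetical, and the engine only checks that each replayed task call matches the recorded history by name. The workflow branches on a clock value, so replaying with the recorded value succeeds while replaying with a clock that has moved on diverges.

```python
from typing import Callable


class NondeterminismError(Exception):
    """Raised when replay issues a different task than the history recorded."""


class ReplayEngine:
    """Toy replay engine: hands back recorded results instead of re-running tasks."""

    def __init__(self, history: list[tuple[str, object]]) -> None:
        self.history = history  # (task_name, recorded_result) in call order
        self.position = 0

    def call_task(self, name: str) -> object:
        recorded_name, result = self.history[self.position]
        if name != recorded_name:
            raise NondeterminismError(f"history has {recorded_name!r}, replay called {name!r}")
        self.position += 1
        return result


def run(engine: ReplayEngine, now: Callable[[], int]) -> object:
    # The branch depends on the clock, so the clock must replay identically
    task = "full_sync" if now() % 2 == 0 else "incremental_sync"
    return engine.call_task(task)


# The original execution saw now() == 10, called full_sync, and recorded 42
history = [("full_sync", 42)]

# Feeding back the recorded clock value reproduces the same task sequence
replayed = run(ReplayEngine(history), now=lambda: 10)  # 42, taken from history

# A wall clock that has moved on takes the other branch and diverges
try:
    run(ReplayEngine(history), now=lambda: 11)
except NondeterminismError as exc:
    print(exc)  # history has 'full_sync', replay called 'incremental_sync'
```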
The App base class provides deterministic alternatives:
```python
async def run(self, input: ExtractionInput) -> ExtractionOutput:
    # Use self.now() instead of datetime.now()
    run_timestamp = self.now()
    # Use self.uuid() instead of uuid.uuid4()
    batch_id = self.uuid()
    result = await self.fetch_data(
        FetchInput(connection_id=input.connection_id, batch_id=str(batch_id))
    )
    return ExtractionOutput(rows_extracted=result.count)
```
self.now() and self.uuid() are backed by Temporal's deterministic clock and UUID generator, so they return the same values on every replay.
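Temporal's Python SDK exposes `workflow.now()` and `workflow.uuid4()` for this purpose. The underlying idea can be sketched with a seeded generator: as long as the seed is recorded with the workflow, every replay regenerates the same UUIDs in the same order. `DeterministicIds` and the `"run-123"` seed are hypothetical; this is a conceptual model, not Temporal's code.

```python
import random
import uuid


class DeterministicIds:
    """Toy deterministic UUID source seeded from a recorded value."""

    def __init__(self, seed: str) -> None:
        # str seeds hash deterministically, independent of PYTHONHASHSEED
        self._rng = random.Random(seed)

    def next_uuid(self) -> uuid.UUID:
        # 128 random bits, with the version-4 and variant bits set by uuid.UUID
        return uuid.UUID(int=self._rng.getrandbits(128), version=4)


original = DeterministicIds("run-123")
replay = DeterministicIds("run-123")
first_run = [original.next_uuid() for _ in range(3)]
replayed = [replay.next_uuid() for _ in range(3)]
print(first_run == replayed)  # True
```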
The rule: run() is a plan—compose and sequence your tasks, but do no I/O and generate no random values. All side effects (API calls, database queries, file reads and writes) belong in @task methods, which don't need to be deterministic.
Sandboxing
The execution engine sandboxes the orchestration layer to enforce determinism. Code running in run() executes inside a restricted environment where certain operations are blocked.
Third-party libraries used in run() must be declared as passthrough_modules on the App class. Passthrough modules are excluded from the sandbox and pass through unchanged:
```python
class MyConnector(App, passthrough_modules=["my_connector", "third_party_lib"]):
    async def run(self, input: ExtractionInput) -> ExtractionOutput:
        # third_party_lib is accessible here because it's declared as a passthrough
        ...
```
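The `passthrough_modules=[...]` syntax is an ordinary Python class keyword argument. A minimal sketch of how a base class can receive it, assuming `__init_subclass__` and assuming subclasses inherit their parent's declarations. `App` here is a hypothetical stand-in, and wiring the result into the engine's sandbox configuration is left out.

```python
from typing import Any, Sequence


class App:
    """Hypothetical base that accepts passthrough_modules as a class keyword."""

    passthrough_modules: tuple[str, ...] = ()

    def __init_subclass__(cls, passthrough_modules: Sequence[str] = (), **kwargs: Any) -> None:
        super().__init_subclass__(**kwargs)
        # Parent's modules first, then this class's own; duplicates dropped, order kept
        combined = [*cls.passthrough_modules, *passthrough_modules]
        cls.passthrough_modules = tuple(dict.fromkeys(combined))


class BaseConnector(App, passthrough_modules=["my_connector"]):
    pass


class MyConnector(BaseConnector, passthrough_modules=["third_party_lib", "my_connector"]):
    pass


print(MyConnector.passthrough_modules)  # ('my_connector', 'third_party_lib')
```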
@task methods run outside the sandbox entirely. They can import any library without declaring passthrough modules.
If you see a SandboxedCodeException or an unexpected ImportError inside run(), the most likely fix is to add the failing module to passthrough_modules.
Payload limit
Temporal serializes task arguments and return values as JSON before persisting them and routing them between processes. This is what makes durable checkpointing possible—but it also means there is a hard limit: payloads larger than approximately 2 MB are rejected.
The App Framework enforces this at class definition time: Input and Output subclasses are inspected when Python imports your module. Forbidden types (Any, bytes, and unbounded list or dict) raise a PayloadSafetyError immediately, before any workflow or task runs:
```python
from typing import Annotated

class FetchOutput(Output):  # rejected at import time with PayloadSafetyError
    rows: list[str]  # unbounded list not allowed
    data: bytes  # bytes not allowed

class FetchOutput(Output):  # accepted
    rows: Annotated[list[str], MaxItems(10_000)]  # bounded list
    data: FileReference  # large data stored externally
```
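A minimal sketch of how a check like this can run at class definition time, assuming `__init_subclass__` and `typing.get_type_hints(..., include_extras=True)` to inspect annotations. `SafeOutput` is a hypothetical base, and `MaxItems` and `PayloadSafetyError` are redefined here as local stand-ins because their real definitions aren't shown on this page.

```python
import typing
from dataclasses import dataclass
from typing import Annotated, Any


class PayloadSafetyError(TypeError):
    """Stand-in: raised when a contract field could produce an unbounded payload."""


@dataclass(frozen=True)
class MaxItems:
    """Stand-in marker that bounds a list or dict field."""

    limit: int


def _check_field(name: str, hint: Any) -> None:
    bounded = False
    if typing.get_origin(hint) is Annotated:
        hint, *extras = typing.get_args(hint)
        bounded = any(isinstance(extra, MaxItems) for extra in extras)
    if hint is Any:
        raise PayloadSafetyError(f"{name}: Any not allowed")
    if hint is bytes:
        raise PayloadSafetyError(f"{name}: bytes not allowed")
    # list[str] -> list, dict[str, int] -> dict, bare list -> list
    container = typing.get_origin(hint) or hint
    if container in (list, dict) and not bounded:
        raise PayloadSafetyError(f"{name}: unbounded {container.__name__} not allowed")


class SafeOutput:
    """Hypothetical base whose subclasses are validated when they are defined."""

    def __init_subclass__(cls, **kwargs: Any) -> None:
        super().__init_subclass__(**kwargs)
        for name, hint in typing.get_type_hints(cls, include_extras=True).items():
            _check_field(name, hint)


class GoodOutput(SafeOutput):
    rows: Annotated[list[str], MaxItems(10_000)]
    name: str


try:
    class BadOutput(SafeOutput):
        rows: list[str]
except PayloadSafetyError as exc:
    print(exc)  # rows: unbounded list not allowed
```

Since the check runs inside the class statement, a bad contract fails as soon as its module is imported, well before any workflow is scheduled.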
For task results that are genuinely large (parquet files, large record sets), use FileReference. The framework uploads the file to object storage automatically when the task completes, and downloads it automatically when the next task receives it as input. Only a lightweight reference (storage path and checksum) crosses the Temporal payload boundary:
```python
from application_sdk.contracts.types import FileReference

class FetchOutput(Output):
    results: FileReference  # uploaded to object storage on task completion

class ProcessInput(Input):
    results: FileReference  # downloaded from object storage before the task runs
```
If the two tasks run back-to-back on the same worker, the file is already on disk and the download is skipped (verified via SHA-256 sidecar comparison).
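A minimal sketch of checksum-based reuse, assuming the sidecar is a small text file next to the data that holds its SHA-256 hex digest. The `.sha256` file layout and the helper names `write_with_sidecar` and `needs_download` are hypothetical; the framework's actual on-disk format isn't described here.

```python
import hashlib
import tempfile
from pathlib import Path


def _sidecar(local_path: Path) -> Path:
    # Hypothetical layout: results.parquet -> results.parquet.sha256
    return local_path.with_name(local_path.name + ".sha256")


def write_with_sidecar(local_path: Path, data: bytes) -> str:
    """Write data and record its SHA-256 next to it; return the digest."""
    local_path.write_bytes(data)
    checksum = hashlib.sha256(data).hexdigest()
    _sidecar(local_path).write_text(checksum)
    return checksum


def needs_download(local_path: Path, expected_sha256: str) -> bool:
    """False only when a local copy exists and its recorded checksum matches."""
    sidecar = _sidecar(local_path)
    if not local_path.exists() or not sidecar.exists():
        return True
    return sidecar.read_text().strip() != expected_sha256


with tempfile.TemporaryDirectory() as tmp:
    local = Path(tmp) / "results.parquet"
    first_time = needs_download(local, "0" * 64)  # True: nothing on disk yet
    checksum = write_with_sidecar(local, b"example rows")
    reused = not needs_download(local, checksum)  # True: local copy matches
    stale = needs_download(local, "0" * 64)  # True: checksum differs
```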
As a rule of thumb: if you can't confidently bound a field in your contract to a small, fixed size, write its content to a file and use a FileReference field instead.
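To see how quickly an unbounded field reaches the limit, estimate its serialized size. This sketch measures a value's UTF-8 JSON encoding and compares it to 2 MiB, taken here as the approximate threshold; real payloads carry some extra encoding metadata, so treat the number as a lower bound. `payload_size` and `PAYLOAD_LIMIT_BYTES` are illustrative names.

```python
import json

# Approximate per-payload limit described above, taken as 2 MiB
PAYLOAD_LIMIT_BYTES = 2 * 1024 * 1024


def payload_size(value: object) -> int:
    """Size of value once serialized to UTF-8 JSON (a rough payload estimate)."""
    return len(json.dumps(value).encode("utf-8"))


# Each 30-character string costs 32 bytes quoted, plus 2 bytes of ", " separator
small = ["x" * 30 for _ in range(10_000)]
large = ["x" * 30 for _ in range(100_000)]

print(payload_size(small))  # 340000
print(payload_size(large) < PAYLOAD_LIMIT_BYTES)  # False
```

A list of short strings crosses the limit at roughly 60,000 rows, which is why unbounded lists belong in a file behind a FileReference.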
Deployment topology
Production
```text
Handler Deployment (always-on, minimum 1 replica)
├── FastAPI on port 8000
├── /auth, /preflight, /metadata, /health endpoints
└── Accepts HTTP requests from the Atlan UI and platform

Worker Deployment (scales 0 → N based on queue depth)
├── Execution engine worker
├── Health endpoint on port 8081
└── Executes run() and @task methods from the task queue
```
The Handler pod is always on because the Atlan UI expects immediate HTTP responses for authentication checks, preflight validation, and metadata discovery. The Worker pod scales to zero when the task queue is empty, so an idle app consumes no worker compute; only the Handler pod keeps running.
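The page doesn't specify the scaling policy. A common shape for queue-depth scaling is proportional: roughly one replica per fixed number of queued tasks, capped at a maximum, with an empty queue mapping to zero. `desired_worker_replicas` and its parameters are hypothetical and only illustrate how "0 → N" follows from queue depth.

```python
import math


def desired_worker_replicas(queue_depth: int, tasks_per_worker: int, max_replicas: int) -> int:
    """Replica count for a queue-depth-driven autoscaler (illustrative policy).

    An empty queue maps to zero replicas, which is what lets idle workers scale to zero.
    """
    if queue_depth <= 0:
        return 0
    return min(max_replicas, math.ceil(queue_depth / tasks_per_worker))


print([desired_worker_replicas(d, tasks_per_worker=10, max_replicas=5) for d in (0, 25, 1000)])  # [0, 3, 5]
```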
Both deployments use the same container image, distinguished only by the --mode flag:
```shell
application-sdk --mode handler  # Handler pod
application-sdk --mode worker   # Worker pod
```
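How one image can serve every role is easy to sketch with `argparse`: the flag selects which components start, and combined mode (used for local development) starts both. `parse_mode`, `components_for`, and the component names are hypothetical; this is not the CLI's actual implementation.

```python
import argparse


def parse_mode(argv: list[str]) -> str:
    parser = argparse.ArgumentParser(prog="application-sdk")
    parser.add_argument("--mode", choices=["handler", "worker", "combined"], required=True)
    return parser.parse_args(argv).mode


def components_for(mode: str) -> set[str]:
    # handler: HTTP service only; worker: engine worker only; combined: both
    return {
        "handler": {"http_server"},
        "worker": {"engine_worker"},
        "combined": {"http_server", "engine_worker"},
    }[mode]


print(sorted(components_for(parse_mode(["--mode", "combined"]))))  # ['engine_worker', 'http_server']
```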
Local development
run_dev_combined() starts both the handler and the worker in a single process. This is equivalent to --mode combined and is the recommended approach for local development and integration tests:
```python
import asyncio

from application_sdk.main import run_dev_combined
from app.connector import MyExtractor

if __name__ == "__main__":
    asyncio.run(run_dev_combined(MyExtractor))
```
Infrastructure abstraction
Because Dapr and the execution engine are accessed through Protocol interfaces, the framework can swap them out without any changes to app code. This isn't just theoretical—it's how testing works:
```python
from application_sdk.infrastructure import InfrastructureContext, set_infrastructure
from application_sdk.testing import MockSecretStore, MockStateStore

# MyConnector and FetchInput come from your app's own modules


async def test_fetch_data() -> None:
    # Inject in-memory mocks: no Dapr sidecar needed
    ctx = InfrastructureContext(
        secret_store=MockSecretStore({"api-key": "test-secret"}),
        state_store=MockStateStore(),
    )
    set_infrastructure(ctx)

    # Call @task methods directly: self.context.get_secret() and
    # self.context.save_state() hit the in-memory mocks.
    # fetch_data is awaited, so run this under an async test runner or asyncio.run().
    connector = MyConnector()
    output = await connector.fetch_data(FetchInput(connection_id="test"))
```
The InfrastructureContext is a frozen dataclass held in a module-level singleton. It's set once at startup by application_sdk.main and can be read anywhere via get_infrastructure(). A module-level variable is used instead of a context variable because HTTP request handlers run in isolated contextvars.Context instances: a context variable set at startup isn't visible there, so handlers would silently receive None.
The complete Protocol-to-implementation mapping:
| Protocol | Production implementation | Test implementation |
|---|---|---|
| StateStore | Dapr state store | MockStateStore |
| SecretStore | Dapr secret store | MockSecretStore, EnvironmentSecretStore |
| PubSub | Dapr pub/sub | MockPubSub |
| Binding | Dapr binding | MockBinding |
| CapacityPool | Redis-backed pool | LocalCapacityPool |
Further reading
- Apps and tasks: the App, @task, Input, and Output primitives
- Configuration: the environment variables that connect your app to the execution engine and infrastructure runtime
- Test your apps: how to use the mock infrastructure implementations in your tests