Skip to main content

@needs_lock

Interface
📁application_sdk.decorators.locks

Decorator that applies distributed locking to Temporal activities, coordinating resource access across multiple workflow instances to prevent race conditions and implement rate limiting.

Parameters

max_locksint
Required
Maximum number of concurrent executions allowed across all workflow instances. This value determines how many activities can execute simultaneously with the same lock name.
lock_namestr
Required
Unique identifier for the resource group being protected. Activities with the same lock name share the same pool of available slots.

Usage Examples

Control API rate limiting

This example applies @needs_lock to an activity that calls an external API. The decorator limits concurrent executions to 5 activities across all workflow instances, preventing API rate limit violations. When all 5 slots are occupied, additional activities wait and retry based on LOCK_RETRY_INTERVAL.

from temporalio import activity
from application_sdk.decorators.locks import needs_lock

@activity.defn
@needs_lock(max_locks=5, lock_name="example_api")
async def publish_to_api(items: list) -> dict:
"""Publish items to an external API with rate limiting."""
async with get_api_client() as client:
result = await client.publish_items(items)
return {"published": len(items), "result": result}

Invoke locked activity from workflow

This example invokes an activity with @needs_lock from a workflow. The schedule_to_close_timeout is required—the system uses this value to calculate lock TTL and prevent deadlocks. The lock is automatically released after the activity completes or fails.

from datetime import timedelta
from temporalio import workflow

@workflow.defn
class DataPublishWorkflow:
@workflow.run
async def run(self, data: dict) -> dict:
result = await workflow.execute_activity(
publish_to_api,
args=[data["entities"]],
schedule_to_close_timeout=timedelta(minutes=5) # Required
)
return result

See also