Skip to main content

Use distributed locking

Apply the @needs_lock decorator to control concurrent operations across workflow instances and prevent resource overload.

Prerequisites

Apply @needs_lock to activities

Control concurrent external API calls to prevent rate limit violations and API overload.

  1. Import the decorator:

    from temporalio import activity
    from application_sdk.decorators.locks import needs_lock
  2. Determine the appropriate max_locks value based on your API's rate limits:

    # Example: API allows 10 concurrent requests
    @activity.defn
    @needs_lock(max_locks=10, lock_name="external_api")
    async def call_external_api(request_data: dict) -> dict:
    """Call external API with rate limiting."""
    async with httpx.AsyncClient() as client:
    response = await client.post(
    "https://api.example.com/endpoint",
    json=request_data
    )
    return response.json()
  3. Call the activity from your workflow with required timeout:

    from datetime import timedelta
    from temporalio import workflow

    @workflow.defn
    class APIWorkflow:
    @workflow.run
    async def run(self, data: dict) -> dict:
    result = await workflow.execute_activity(
    call_external_api,
    args=[data],
    schedule_to_close_timeout=timedelta(minutes=5)
    )
    return result

See also