Use distributed locking
Apply the @needs_lock decorator to control concurrent operations across workflow instances and prevent resource overload.
Prerequisites
- Application SDK installed and configured
- Distributed locking configured
- External resource credentials and rate limit documentation
Apply @needs_lock to activities
- API rate limiting
- Database operations
- File operations
Control concurrent external API calls to prevent rate limit violations and API overload.
- Import the decorator:

```python
from temporalio import activity
from application_sdk.decorators.locks import needs_lock
```

- Determine the appropriate max_locks value based on your API's rate limits:

```python
import httpx

# Example: API allows 10 concurrent requests
@activity.defn
@needs_lock(max_locks=10, lock_name="external_api")
async def call_external_api(request_data: dict) -> dict:
    """Call external API with rate limiting."""
    async with httpx.AsyncClient() as client:
        response = await client.post(
            "https://api.example.com/endpoint",
            json=request_data,
        )
        return response.json()
```

- Call the activity from your workflow with the required timeout:

```python
from datetime import timedelta
from temporalio import workflow

@workflow.defn
class APIWorkflow:
    @workflow.run
    async def run(self, data: dict) -> dict:
        result = await workflow.execute_activity(
            call_external_api,
            args=[data],
            schedule_to_close_timeout=timedelta(minutes=5),
        )
        return result
```
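When an API documents its limit as a request rate rather than a concurrency cap, one rough way to turn it into a max_locks value is Little's law: requests in flight are approximately the request rate multiplied by the average latency. The sketch below applies that heuristic. The function name estimate_max_locks and the headroom parameter are hypothetical and not part of the Application SDK. If the API states a concurrent-request limit directly, as in the example above, use that number instead.

```python
import math


def estimate_max_locks(
    requests_per_second: float,
    avg_latency_seconds: float,
    headroom: float = 0.8,
) -> int:
    """Estimate a concurrency cap from a per-second rate limit.

    Hypothetical helper, not an SDK function. Uses Little's law:
    requests in flight = arrival rate * time each request takes.
    """
    if requests_per_second <= 0 or avg_latency_seconds <= 0:
        raise ValueError("rate and latency must be positive")
    if not 0 < headroom <= 1:
        raise ValueError("headroom must be in (0, 1]")
    in_flight = requests_per_second * avg_latency_seconds * headroom
    # Always allow at least one holder so the activity can make progress.
    return max(1, math.floor(in_flight))


# 20 requests/s at 0.5 s per call, no safety margin -> 10 concurrent calls
print(estimate_max_locks(20, 0.5, headroom=1.0))
```

Treat the result as a starting point and adjust it against the latency and error rates you actually observe.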
Control concurrent database operations to prevent connection pool exhaustion and performance degradation.
- Import the decorator:

```python
from temporalio import activity
from application_sdk.decorators.locks import needs_lock
```

- Set max_locks based on your database connection pool limits:

```python
# Example: Database pool allows 5 heavy queries concurrently
@activity.defn
@needs_lock(max_locks=5, lock_name="analytics_queries")
async def run_heavy_analytics(query: str, params: dict) -> list:
    """Execute resource-intensive analytics query."""
    # get_analytics_db() is not defined here; it stands for your
    # application's own async connection helper.
    async with get_analytics_db() as conn:
        result = await conn.execute(query, params)
        return result.fetchall()
```

- Call the activity from your workflow:

```python
from datetime import timedelta
from temporalio import workflow

@workflow.defn
class AnalyticsWorkflow:
    @workflow.run
    async def run(self, query_config: dict) -> list:
        result = await workflow.execute_activity(
            run_heavy_analytics,
            args=[query_config["query"], query_config["params"]],
            schedule_to_close_timeout=timedelta(minutes=10),
        )
        return result
```
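To make concrete what a cap such as max_locks=5 means, the sketch below runs 20 simulated queries under a limit of 5 and records the peak number running at once. It is a single-process analogy built on asyncio.Semaphore, not how @needs_lock is implemented; the decorator coordinates across workflow instances, which a local semaphore cannot do. The names run_with_cap and fake_query are hypothetical.

```python
import asyncio


async def run_with_cap(max_locks: int, total_tasks: int) -> int:
    """Run simulated queries under a cap and return the peak concurrency seen."""
    semaphore = asyncio.Semaphore(max_locks)
    active = 0
    peak = 0

    async def fake_query() -> None:
        nonlocal active, peak
        # Only max_locks tasks can be inside this block at any moment.
        async with semaphore:
            active += 1
            peak = max(peak, active)
            await asyncio.sleep(0.01)  # stand-in for query execution time
            active -= 1

    await asyncio.gather(*(fake_query() for _ in range(total_tasks)))
    return peak


peak = asyncio.run(run_with_cap(max_locks=5, total_tasks=20))
print(f"peak concurrent queries: {peak}")  # never exceeds max_locks
```

The remaining 15 tasks wait until a slot frees up, which is why the calling workflow needs a timeout long enough to cover waiting as well as execution.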
Control concurrent file operations to prevent I/O bottlenecks and system resource exhaustion.
- Import the decorator:

```python
from temporalio import activity
from application_sdk.decorators.locks import needs_lock
```

- Set max_locks based on your system's I/O capacity:

```python
# Example: System handles 3 concurrent large file operations
@activity.defn
@needs_lock(max_locks=3, lock_name="file_processing")
async def process_large_files(file_paths: list) -> dict:
    """Process large files with I/O rate limiting."""
    results = []
    for path in file_paths:
        # process_file() is not defined here; it stands for your
        # application's own per-file processing coroutine.
        result = await process_file(path)
        results.append(result)
    return {"processed": len(results), "results": results}
```

- Call the activity from your workflow:

```python
from datetime import timedelta
from temporalio import workflow

@workflow.defn
class FileProcessingWorkflow:
    @workflow.run
    async def run(self, files: list) -> dict:
        result = await workflow.execute_activity(
            process_large_files,
            args=[files],
            schedule_to_close_timeout=timedelta(minutes=15),
        )
        return result
```
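Choosing schedule_to_close_timeout for a locked activity involves some arithmetic, because an activity may have to wait for a free slot before it runs. The sketch below gives a rough worst-case budget. It assumes that time spent waiting for a lock counts against the timeout and that every activity takes about the same time to run; both are assumptions to verify for your setup, and retries are ignored. The helper name estimate_schedule_to_close is hypothetical.

```python
import math
from datetime import timedelta


def estimate_schedule_to_close(
    max_locks: int,
    activities_ahead: int,
    run_time: timedelta,
) -> timedelta:
    """Rough timeout budget: waiting time plus this activity's own run.

    Hypothetical helper, not an SDK function.
    """
    if max_locks < 1:
        raise ValueError("max_locks must be at least 1")
    if activities_ahead < 0:
        raise ValueError("activities_ahead cannot be negative")
    # Activities ahead of this one drain in "waves" of max_locks at a time.
    waves_ahead = math.ceil(activities_ahead / max_locks)
    return run_time * (waves_ahead + 1)


# 3 slots, 6 activities already queued, about 5 minutes each
print(estimate_schedule_to_close(3, 6, timedelta(minutes=5)))  # 0:15:00
```

With these inputs the estimate matches the 15-minute timeout used in the workflow above; a larger backlog or slower files would call for a longer timeout or a higher max_locks.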
See also
- @needs_lock decorator: Decorator parameters reference
- Troubleshoot distributed locking: Common issues and solutions