# Use distributed locking

Apply the `@needs_lock` decorator to control concurrent operations across workflow instances and prevent resource overload.
## Prerequisites
- Application SDK installed and configured
- Distributed locking configured
- External resource credentials and rate limit documentation
## Apply @needs_lock to activities

The decorator applies in three common scenarios:
- API rate limiting
- Database operations
- File operations
### API rate limiting

Control concurrent external API calls to prevent rate limit violations and API overload.
- Import the decorator:

  ```python
  from temporalio import activity
  from application_sdk.decorators.locks import needs_lock
  ```

- Determine the appropriate `max_locks` value based on your API's rate limits:

  ```python
  import httpx  # HTTP client used in this example

  # Example: API allows 10 concurrent requests
  @activity.defn
  @needs_lock(max_locks=10, lock_name="external_api")
  async def call_external_api(request_data: dict) -> dict:
      """Call external API with rate limiting."""
      async with httpx.AsyncClient() as client:
          response = await client.post(
              "https://api.example.com/endpoint",
              json=request_data
          )
          return response.json()
  ```
- Call the activity from your workflow with the required timeout:

  ```python
  from datetime import timedelta
  from temporalio import workflow

  @workflow.defn
  class APIWorkflow:
      @workflow.run
      async def run(self, data: dict) -> dict:
          result = await workflow.execute_activity(
              call_external_api,
              args=[data],
              schedule_to_close_timeout=timedelta(minutes=5)
          )
          return result
  ```
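To run the example end to end, register the workflow and activity with a worker. The following is a minimal sketch using the standard Temporal Python worker, assuming a local Temporal server and a hypothetical task queue name (`api-task-queue`); adjust both for your deployment.

```python
import asyncio

from temporalio.client import Client
from temporalio.worker import Worker

async def main():
    # Assumes a Temporal server reachable at the default local address
    client = await Client.connect("localhost:7233")
    worker = Worker(
        client,
        task_queue="api-task-queue",  # hypothetical task queue name
        workflows=[APIWorkflow],
        activities=[call_external_api],
    )
    await worker.run()

if __name__ == "__main__":
    asyncio.run(main())
```

Start `APIWorkflow` from a client against the same task queue; the distributed-locking backend itself is configured separately (see Prerequisites).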
### Database operations

Control concurrent database operations to prevent connection pool exhaustion and performance degradation.
- Import the decorator:

  ```python
  from temporalio import activity
  from application_sdk.decorators.locks import needs_lock
  ```

- Set `max_locks` based on your database connection pool limits:

  ```python
  # Example: Database pool allows 5 heavy queries concurrently
  @activity.defn
  @needs_lock(max_locks=5, lock_name="analytics_queries")
  async def run_heavy_analytics(query: str, params: dict) -> list:
      """Execute resource-intensive analytics query."""
      # get_analytics_db is your application's async connection helper
      async with get_analytics_db() as conn:
          result = await conn.execute(query, params)
          return result.fetchall()
  ```
- Call the activity from your workflow:

  ```python
  from datetime import timedelta
  from temporalio import workflow

  @workflow.defn
  class AnalyticsWorkflow:
      @workflow.run
      async def run(self, query_config: dict) -> list:
          result = await workflow.execute_activity(
              run_heavy_analytics,
              args=[query_config["query"], query_config["params"]],
              schedule_to_close_timeout=timedelta(minutes=10)
          )
          return result
  ```
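Because the lock is shared across workflow instances, a workflow can also fan out many executions at once and let `max_locks` cap actual concurrency. The sketch below is illustrative rather than part of the SDK docs: it reuses `run_heavy_analytics` from above, `BatchAnalyticsWorkflow` is a hypothetical name, and it assumes executions beyond the limit wait for a lock rather than fail, so the schedule-to-close timeout is budgeted to cover queueing time.

```python
import asyncio
from datetime import timedelta

from temporalio import workflow

@workflow.defn
class BatchAnalyticsWorkflow:
    @workflow.run
    async def run(self, query_configs: list) -> list:
        # Start every query at once; the shared "analytics_queries" lock
        # caps how many run concurrently across all workflow instances.
        handles = [
            workflow.execute_activity(
                run_heavy_analytics,
                args=[cfg["query"], cfg["params"]],
                # Generous budget: assumes queued executions wait for a lock
                schedule_to_close_timeout=timedelta(minutes=30),
            )
            for cfg in query_configs
        ]
        return list(await asyncio.gather(*handles))
```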
### File operations

Control concurrent file operations to prevent I/O bottlenecks and system resource exhaustion.
- Import the decorator:

  ```python
  from temporalio import activity
  from application_sdk.decorators.locks import needs_lock
  ```

- Set `max_locks` based on your system's I/O capacity:

  ```python
  # Example: System handles 3 concurrent large file operations
  @activity.defn
  @needs_lock(max_locks=3, lock_name="file_processing")
  async def process_large_files(file_paths: list) -> dict:
      """Process large files with I/O rate limiting."""
      results = []
      for path in file_paths:
          # process_file is your application's file-handling helper
          result = await process_file(path)
          results.append(result)
      return {"processed": len(results), "results": results}
  ```
- Call the activity from your workflow:

  ```python
  from datetime import timedelta
  from temporalio import workflow

  @workflow.defn
  class FileProcessingWorkflow:
      @workflow.run
      async def run(self, files: list) -> dict:
          result = await workflow.execute_activity(
              process_large_files,
              args=[files],
              schedule_to_close_timeout=timedelta(minutes=15)
          )
          return result
  ```
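A finer-grained variant, sketched under the same assumptions: run one activity per file so the `file_processing` lock throttles individual files instead of whole batches, and one slow file cannot hold a lock slot for an entire batch. `process_one_file` and `PerFileWorkflow` are hypothetical names introduced here; `process_file` is the same helper used above.

```python
import asyncio
from datetime import timedelta

from temporalio import activity, workflow
from application_sdk.decorators.locks import needs_lock

@activity.defn
@needs_lock(max_locks=3, lock_name="file_processing")
async def process_one_file(path: str) -> dict:
    """Process a single file under the shared I/O lock (hypothetical variant)."""
    return await process_file(path)

@workflow.defn
class PerFileWorkflow:
    @workflow.run
    async def run(self, files: list) -> dict:
        # One activity per file; at most three files process at once
        results = list(await asyncio.gather(*[
            workflow.execute_activity(
                process_one_file,
                args=[path],
                schedule_to_close_timeout=timedelta(minutes=15),
            )
            for path in files
        ]))
        return {"processed": len(results), "results": results}
```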
## See also
- @needs_lock decorator: Decorator parameters reference
- Troubleshoot distributed locking: Common issues and solutions