RedisLockInterceptor manages distributed locks for activities using Redis. It caps the number of activities that can run concurrently under the same lock name, which prevents resource contention. Activities opt in to locking through the @needs_lock decorator. The interceptor acquires a lock before the activity executes and releases it after the activity completes, even if the activity fails. Lock acquisition follows a retry policy with a configurable interval. Lock release failures are logged but do not fail the workflow.
RedisLockInterceptor is part of the Interceptors system and can be combined with other interceptors.
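The acquire-before, release-after behavior described above can be illustrated with a small in-memory model. This is only a sketch, not the SDK's implementation: `InMemoryLockStore` is a hypothetical stand-in for Redis, and the slot-per-key scheme (`lock_name:0`, `lock_name:1`, ...) is an assumption chosen to show how a limit of N concurrent holders might be enforced.

```python
import asyncio
from typing import Awaitable, Callable, Dict, TypeVar

T = TypeVar("T")


class InMemoryLockStore:
    """Hypothetical stand-in for Redis: a dict with set-if-absent semantics."""

    def __init__(self) -> None:
        self._keys: Dict[str, str] = {}

    def set_if_absent(self, key: str, owner: str) -> bool:
        # Succeeds only when nobody holds the key yet.
        if key in self._keys:
            return False
        self._keys[key] = owner
        return True

    def delete_if_owner(self, key: str, owner: str) -> None:
        # Only the holder may release, so one worker cannot free another's slot.
        if self._keys.get(key) == owner:
            del self._keys[key]


async def acquire_slot(
    store: InMemoryLockStore,
    lock_name: str,
    max_locks: int,
    owner: str,
    retry_interval: float,
) -> str:
    """Try each of the max_locks slots; sleep and retry when all are taken."""
    while True:
        for slot in range(max_locks):
            key = f"{lock_name}:{slot}"
            if store.set_if_absent(key, owner):
                return key
        await asyncio.sleep(retry_interval)


async def run_with_lock(
    store: InMemoryLockStore,
    lock_name: str,
    max_locks: int,
    owner: str,
    fn: Callable[[], Awaitable[T]],
    retry_interval: float = 0.01,
) -> T:
    """Acquire a slot, run fn, and release the slot even if fn raises."""
    key = await acquire_slot(store, lock_name, max_locks, owner, retry_interval)
    try:
        return await fn()
    finally:
        store.delete_if_owner(key, owner)
```

The `try`/`finally` is what guarantees release on failure. A real Redis-backed version would also want an expiry on each key so that a crashed worker cannot hold a slot indefinitely.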
RedisLockInterceptor
Class application_sdk.interceptors.lock
Methods
__init__
__init__(self, activities: Dict[str, Callable])
Parameters
activities: Dict[str, Callable]
Usage Examples
Use distributed locking for activities
Register RedisLockInterceptor and decorate activities with @needs_lock to limit concurrent execution
    from application_sdk.decorators.locks import needs_lock
    from application_sdk.interceptors import RedisLockInterceptor
    from temporalio import activity
    from temporalio.worker import Worker

    # acquire_distributed_lock, release_distributed_lock, MyWorkflow, and client
    # are assumed to be imported or defined elsewhere in your application.


    @needs_lock(lock_name="database_connection", max_locks=3)
    @activity.defn
    async def database_activity():
        # Only 3 instances of this activity can run concurrently
        pass


    worker = Worker(
        client,
        task_queue="my-task-queue",
        workflows=[MyWorkflow],
        activities=[
            database_activity,
            # The lock activities must be registered alongside your own
            acquire_distributed_lock,
            release_distributed_lock,
        ],
        interceptors=[
            # Map activity names to functions so the interceptor can find @needs_lock
            RedisLockInterceptor(activities={
                "database_activity": database_activity,
            })
        ],
    )
Activity timeout requirement
Activities with @needs_lock must specify schedule_to_close_timeout when called from workflows
    from datetime import timedelta

    from temporalio import workflow

    # Both calls below belong inside a workflow's run method.

    # Correct - with timeout
    await workflow.execute_activity(
        database_activity,
        schedule_to_close_timeout=timedelta(minutes=10),
    )

    # Incorrect - missing timeout raises WorkflowError
    await workflow.execute_activity(database_activity)
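The source does not say why the timeout is mandatory. One plausible reason, stated here purely as an assumption, is that a lock needs a bounded lifetime so that a crashed worker cannot hold it forever, and the activity's `schedule_to_close_timeout` is a natural upper bound. The sketch below shows that kind of guard. `lock_ttl_seconds` and `MissingTimeoutError` are hypothetical names, not part of the SDK, which is documented to raise `WorkflowError` instead.

```python
import math
from datetime import timedelta
from typing import Optional


class MissingTimeoutError(ValueError):
    """Hypothetical error type used only in this sketch."""


def lock_ttl_seconds(schedule_to_close_timeout: Optional[timedelta]) -> int:
    """Derive a lock lifetime in whole seconds from the activity timeout.

    Assumption: the lock should never outlive the activity that holds it.
    """
    if schedule_to_close_timeout is None:
        raise MissingTimeoutError(
            "schedule_to_close_timeout is required for activities that use a lock"
        )
    seconds = math.ceil(schedule_to_close_timeout.total_seconds())
    if seconds <= 0:
        raise MissingTimeoutError("schedule_to_close_timeout must be positive")
    return seconds
```

Rounding up with `math.ceil` keeps a sub-second timeout from collapsing to a lifetime of zero.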
Lock configuration
Activities are configured for locking using the @needs_lock decorator:
    from application_sdk.decorators.locks import needs_lock
    from temporalio import activity


    @needs_lock(lock_name="database_connection", max_locks=5)
    @activity.defn
    async def my_activity():
        pass
Decorator parameters:
- lock_name (str): Name of the lock (defaults to the activity name)
- max_locks (int): Maximum number of concurrent executions (default: 5)
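To make the two parameters and their defaults concrete, here is a minimal sketch of how a decorator of this shape could record its configuration on the function for an interceptor to read later. It is not the SDK's `needs_lock`: the name `needs_lock_sketch`, the `_lock_config` attribute, and the `max_locks` validation are all assumptions made for illustration.

```python
from typing import Any, Callable, Dict, Optional


def needs_lock_sketch(
    lock_name: Optional[str] = None, max_locks: int = 5
) -> Callable[[Callable[..., Any]], Callable[..., Any]]:
    """Hypothetical decorator that attaches lock settings to a function."""
    if max_locks < 1:
        raise ValueError("max_locks must be at least 1")

    def decorator(fn: Callable[..., Any]) -> Callable[..., Any]:
        # Fall back to the function name when no lock name is given,
        # mirroring the documented default.
        fn._lock_config = {  # hypothetical attribute name
            "lock_name": lock_name or fn.__name__,
            "max_locks": max_locks,
        }
        return fn

    return decorator


def get_lock_config(fn: Callable[..., Any]) -> Optional[Dict[str, Any]]:
    """Return the attached settings, or None for an undecorated function."""
    return getattr(fn, "_lock_config", None)


@needs_lock_sketch(lock_name="database_connection", max_locks=3)
async def database_activity() -> None:
    pass


@needs_lock_sketch()
async def report_activity() -> None:
    pass


async def plain_activity() -> None:
    pass
```

Because the settings live on the function object, anything that holds a reference to the function can tell whether it needs a lock and with which limit.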
Requirements
- Activities with @needs_lock must be called with the schedule_to_close_timeout parameter
- Lock activities (acquire_distributed_lock, release_distributed_lock) must be registered with the worker
- The activities dictionary must map activity names to functions so that the interceptor can check their decorators
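For the last requirement, writing the name-to-function dictionary by hand can drift out of sync with the list passed to the worker. A small helper can derive it from the same list. The sketch below assumes each activity is registered under its Python function name. If an activity is registered under a custom name, that name would have to be used as the key instead. `build_activity_map` is a hypothetical helper, not an SDK function.

```python
from typing import Any, Callable, Dict, Iterable


def build_activity_map(
    functions: Iterable[Callable[..., Any]]
) -> Dict[str, Callable[..., Any]]:
    """Map each function's __name__ to the function itself.

    Raises ValueError on duplicate names so that one activity cannot
    silently shadow another in the mapping.
    """
    activity_map: Dict[str, Callable[..., Any]] = {}
    for fn in functions:
        name = fn.__name__
        if name in activity_map:
            raise ValueError(f"Duplicate activity name: {name}")
        activity_map[name] = fn
    return activity_map


async def database_activity() -> None:
    pass


async def export_activity() -> None:
    pass


# The same list can feed both the worker's activities and the interceptor.
my_activities = [database_activity, export_activity]
activity_map = build_activity_map(my_activities)
```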
Configuration
Environment variables:
- IS_LOCKING_DISABLED: Set to True to disable locking (useful for testing)
- LOCK_RETRY_INTERVAL_SECONDS: Interval between lock acquisition retries
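As an illustration of how these two variables might be consumed, the sketch below parses them from an environment mapping. The parsing rules (case-insensitive `true`, a float interval) and the fallback interval are assumptions: the source does not state the SDK's default or accepted formats, so `ASSUMED_DEFAULT_RETRY_INTERVAL` is a placeholder value only.

```python
import os
from typing import Any, Dict, Mapping

# Placeholder only; the real default is not given in this document.
ASSUMED_DEFAULT_RETRY_INTERVAL = 60.0


def read_lock_settings(env: Mapping[str, str] = os.environ) -> Dict[str, Any]:
    """Read the locking settings from an environment-like mapping."""
    # Treat any casing of "true" as enabled; everything else means False.
    disabled = env.get("IS_LOCKING_DISABLED", "False").strip().lower() == "true"

    raw_interval = env.get("LOCK_RETRY_INTERVAL_SECONDS")
    if raw_interval is None:
        interval = ASSUMED_DEFAULT_RETRY_INTERVAL
    else:
        interval = float(raw_interval)
    if interval <= 0:
        raise ValueError("LOCK_RETRY_INTERVAL_SECONDS must be positive")

    return {"locking_disabled": disabled, "retry_interval_seconds": interval}
```

Passing a plain dict instead of `os.environ` makes the function easy to exercise in tests, for example `read_lock_settings({"IS_LOCKING_DISABLED": "True"})`.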
See also
- Interceptors: Overview of all interceptors and how to combine them
- EventInterceptor: Track workflow and activity execution events for monitoring
- CleanupInterceptor: Automatically clean up temporary artifacts and activity state
- Distributed locking: Concepts and usage of distributed locking for activities