
RedisLockInterceptor manages distributed locks for activities using Redis. It limits concurrent execution to a specified number of activities sharing the same lock name, preventing resource contention. Activities must be decorated with the @needs_lock decorator to use locking. The interceptor automatically acquires locks before activity execution and releases them after completion, even if the activity fails. Lock acquisition uses a retry policy with a configurable interval, and lock release failures are logged but do not fail workflows.
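The acquire/execute/release flow described above can be sketched as follows. This is an illustrative outline only, not the SDK's actual implementation: the try_acquire and release callables are hypothetical stand-ins for the SDK's lock activities.

```python
# Hypothetical sketch of the interceptor's flow: retry acquisition at a
# fixed interval, run the activity, and always release the lock.
import asyncio

async def run_with_lock(lock_name, max_locks, retry_interval,
                        execute, try_acquire, release):
    # Retry until one of the max_locks slots for this lock name is free.
    while not await try_acquire(lock_name, max_locks):
        await asyncio.sleep(retry_interval)
    try:
        return await execute()
    finally:
        # Release even if the activity raised. Per the documented behavior,
        # release failures would be logged rather than propagated.
        await release(lock_name)
```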

RedisLockInterceptor is part of the Interceptors system and can be combined with other interceptors.

RedisLockInterceptor

Class
📁application_sdk.interceptors.lock

Manages distributed Redis locks for activities decorated with @needs_lock, limiting concurrent execution to a configurable number of holders per lock name.

Methods

__init__

__init__(self, activities: Dict[str, Callable])
Initialize RedisLockInterceptor with a dictionary mapping activity names to their functions. The interceptor uses this mapping to check for the @needs_lock decorator on each activity.
Parameters
activitiesDict[str, Callable]
Required
Dictionary mapping activity names to activity functions

Usage Examples

Use distributed locking for activities

Register RedisLockInterceptor and decorate activities with @needs_lock to limit concurrent execution

from application_sdk.decorators.locks import needs_lock
from application_sdk.interceptors import RedisLockInterceptor
from temporalio import activity
from temporalio.worker import Worker

@needs_lock(lock_name="database_connection", max_locks=3)
@activity.defn
async def database_activity():
    # Only 3 instances of this activity can run concurrently
    pass

worker = Worker(
    client,
    task_queue="my-task-queue",
    workflows=[MyWorkflow],
    activities=[
        database_activity,
        acquire_distributed_lock,
        release_distributed_lock,
    ],
    interceptors=[
        RedisLockInterceptor(activities={"database_activity": database_activity})
    ],
)

Activity timeout requirement

Activities with @needs_lock must specify schedule_to_close_timeout when called from workflows

from datetime import timedelta

# Correct - with timeout
await workflow.execute_activity(
    database_activity,
    schedule_to_close_timeout=timedelta(minutes=10),
)

# Incorrect - missing timeout raises WorkflowError
await workflow.execute_activity(database_activity)

Lock configuration

Activities are configured for locking using the @needs_lock decorator:

from application_sdk.decorators.locks import needs_lock

@needs_lock(lock_name="database_connection", max_locks=5)
@activity.defn
async def my_activity():
    pass

Decorator parameters:

  • lock_name (str): Name of the lock (defaults to activity name)
  • max_locks (int): Maximum number of concurrent executions (default: 5)
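To make the two parameters concrete, here is a minimal sketch of what a @needs_lock-style decorator could do: attach lock metadata to the function for an interceptor to read later. The attribute names are assumptions for illustration, not the SDK's actual internals.

```python
# Illustrative decorator sketch: record lock_name and max_locks on the
# decorated function. Attribute names here are hypothetical.
from typing import Callable

def needs_lock_sketch(lock_name: str = None, max_locks: int = 5):
    def wrapper(fn: Callable) -> Callable:
        # Default the lock name to the activity (function) name.
        fn.lock_name = lock_name or fn.__name__
        fn.max_locks = max_locks
        return fn
    return wrapper
```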

Requirements

  • Activities with @needs_lock must be called with schedule_to_close_timeout parameter
  • Lock activities (acquire_distributed_lock, release_distributed_lock) must be registered with the worker
  • Activities dictionary must map activity names to functions for interceptor to check decorators

Configuration

Environment variables:

  • IS_LOCKING_DISABLED: Set to True to disable locking (useful for testing)
  • LOCK_RETRY_INTERVAL_SECONDS: Interval between lock acquisition retries
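A sketch of how these settings might be consumed follows. The variable names come from the list above; the parsing logic and default values are illustrative assumptions, not the SDK's actual configuration code.

```python
# Sketch: read the documented environment variables with assumed defaults.
import os

def locking_disabled() -> bool:
    # IS_LOCKING_DISABLED is documented above; "false" default is assumed.
    return os.environ.get("IS_LOCKING_DISABLED", "false").lower() == "true"

def lock_retry_interval() -> float:
    # LOCK_RETRY_INTERVAL_SECONDS is documented above; 5s default is assumed.
    return float(os.environ.get("LOCK_RETRY_INTERVAL_SECONDS", "5"))
```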
