Advanced Celery Task Throttling with Multiple Parameters
Introduction
Building on our previous guide on parameter-based rate limiting in Celery tasks, this article delves into a more advanced solution where tasks can be throttled based on multiple parameters. This is especially useful when you need fine-grained control over task execution rates across various dimensions, such as user ID and IP address.
Related Reading: If you haven’t read our previous blog post, Celery: Rate Limiting Tasks with Parameters, we recommend checking it out for foundational understanding and context.
Understanding the Enhanced Problem
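In many scenarios, rate limiting based on a single parameter is not enough. For example, you might need to throttle tasks based on the combination of user ID and IP address, so that each user and IP pair gets its own rate budget. Keep in mind that a combined key limits each pair independently: capping one user across all of their IPs, or all users behind one IP, still requires a separate limit keyed on that single parameter.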
The Enhanced Solution: Multi-Parameter Rate Limiting
Our enhanced solution modifies the existing throttling decorator to accept a list of parameter names. Calls that share the same combination of values for those parameters then share a single rate limit and cannot exceed the allowed rate.
Step-by-Step Breakdown
1. Parsing the Rate
The parse_rate function remains the same as in the previous solution, converting a rate string into a tuple of allowed requests and time period in seconds.
from typing import Tuple


def parse_rate(rate: str) -> Tuple[int, int]:
    """Convert a rate such as "5/m" or "10/2h" into (num_requests, seconds)."""
    num, period = rate.split("/")
    num_requests = int(num)
    if len(period) > 1:
        # A numeric prefix multiplies the unit, e.g. "2h" means two hours.
        duration_multiplier = int(period[:-1])
        duration_unit = period[-1]
    else:
        duration_multiplier = 1
        duration_unit = period[-1]
    duration_base = {"s": 1, "m": 60, "h": 3600, "d": 86400}[duration_unit]
    duration = duration_base * duration_multiplier
    return num_requests, duration
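As a quick sanity check of the rate format, the snippet below repeats parse_rate in condensed form so it runs on its own, then prints a few conversions. The sample rate strings are illustrative only.

```python
from typing import Tuple


def parse_rate(rate: str) -> Tuple[int, int]:
    """Convert a rate string such as "5/m" or "10/2h" into (requests, seconds)."""
    num, period = rate.split("/")
    # An optional numeric prefix multiplies the unit: "2h" means two hours.
    duration_multiplier = int(period[:-1]) if len(period) > 1 else 1
    duration_unit = period[-1]
    duration_base = {"s": 1, "m": 60, "h": 3600, "d": 86400}[duration_unit]
    return int(num), duration_base * duration_multiplier


for example in ["1/s", "5/m", "10/2h", "100/d"]:
    print(example, "->", parse_rate(example))
# 1/s -> (1, 1)
# 5/m -> (5, 60)
# 10/2h -> (10, 7200)
# 100/d -> (100, 86400)
```

An unknown unit such as "5/w" raises a KeyError from the dictionary lookup, so typos in a rate string fail loudly rather than silently.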
2. Throttling Decorator for Multiple Parameters
The throttle_task decorator is modified to accept a list of parameter names.
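import functools
import inspect
import logging
from typing import Any, Callable, List, Optional

logger = logging.getLogger(__name__)


def throttle_task(rate: str, keys: Optional[List[str]] = None) -> Callable:
    def decorator_func(func: Callable) -> Callable:
        sig = inspect.signature(func)

        @functools.wraps(func)
        def wrapper(*args, **kwargs) -> Any:
            bound_args = sig.bind(*args, **kwargs)
            # Include defaults so omitted parameters can still be used as keys.
            bound_args.apply_defaults()
            # Requires @app.task(bind=True): the task instance arrives as `self`.
            task = bound_args.arguments["self"]
            key_values = []
            for key in keys or []:
                try:
                    key_values.append(str(bound_args.arguments[key]))
                except KeyError:
                    raise KeyError(
                        f"Unknown parameter '{key}' in throttle_task "
                        f"decorator of function {task.name}. "
                        f"`keys` parameter must match parameter "
                        f"names from function signature: '{sig}'"
                    ) from None
            # e.g. user_id=1, ip_address="192.168.1.1" -> "1:192.168.1.1"
            composite_key = ":".join(key_values)
            # get_task_wait is assumed to come from the previous guide;
            # it returns 0 when the task may run now.
            delay = get_task_wait(task, rate, key=composite_key)
            if delay > 0:
                # Offset the increment that retry() applies, so throttling
                # does not count against the task's max_retries.
                task.request.retries = task.request.retries - 1
                logger.info(
                    "Throttling task %s (%s) via decorator for %ss",
                    task.name,
                    task.request.id,
                    delay,
                )
                return task.retry(countdown=delay)
            return func(*args, **kwargs)

        return wrapper

    return decorator_func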
Explanation:
- The decorator now accepts a list of parameter names (keys).
- It retrieves the values for these parameters and combines them into a composite key.
- The composite key is used to check and enforce the rate limit.
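The key-building step can be tried in isolation. The sketch below uses a hypothetical helper, build_composite_key, that binds call arguments to a function signature with inspect.signature(...).bind() and calls BoundArguments.apply_defaults() so that parameters left at their defaults still contribute to the key. The region parameter is an invented example, not part of the task used in this article.

```python
import inspect
from typing import Any, Callable, List


def build_composite_key(func: Callable, keys: List[str], *args: Any, **kwargs: Any) -> str:
    """Hypothetical helper: join the values of `keys` from one call of `func`."""
    bound = inspect.signature(func).bind(*args, **kwargs)
    # Fill in defaults so parameters the caller omitted still have a value.
    bound.apply_defaults()
    return ":".join(str(bound.arguments[key]) for key in keys)


def my_task(self, user_id, ip_address, data, region="us"):
    """Example signature; `region` is an invented parameter with a default."""


keys = ["user_id", "ip_address"]
positional = build_composite_key(my_task, keys, None, 1, "192.168.1.1", "x")
keyword = build_composite_key(
    my_task, keys, None, ip_address="192.168.1.1", user_id=1, data="y"
)
with_default = build_composite_key(my_task, ["user_id", "region"], None, 2, "10.0.0.1", "z")
print(positional, keyword, with_default)  # 1:192.168.1.1 1:192.168.1.1 2:us
```

Because values are looked up by parameter name after binding, positional and keyword calls produce the same key, and the key order follows the order of `keys`, not the order of the call.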
3. Rate Checking with Multiple Parameters
The is_rate_okay function is updated to handle the composite key.
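from typing import Optional

from celery import Task


def is_rate_okay(task: Task, rate: str = "1/s", key: Optional[str] = None) -> bool:
    # One Redis counter per task name plus composite key,
    # e.g. "celery_throttle:<task name>:1:192.168.1.1".
    key = f"celery_throttle:{task.name}{':' + key if key else ''}"
    # make_redis_interface is assumed to be the Redis helper from the previous guide.
    r = make_redis_interface("CACHE")
    num_tasks, duration = parse_rate(rate)
    # INCR is atomic, so two workers cannot both act on the same stale count.
    count = r.incr(key)
    if count == 1:
        # First task in this window: start the window's expiry clock.
        r.expire(key, duration)
    # Allow at most num_tasks tasks per window.
    return count <= num_tasks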
Explanation:
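Because the composite key is appended to the Redis key, each distinct combination of parameter values gets its own counter. Tasks for user 1 on 192.168.1.1 and user 1 on 192.168.1.2, for example, are counted separately.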
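To experiment with the counting logic without a Redis server, the sketch below swaps in a hypothetical InMemoryCounter class that mimics the Redis INCR and EXPIRE commands, plus a hypothetical is_rate_okay_local function implementing a fixed-window counter with the INCR-then-EXPIRE pattern. The injectable clock is an assumption added so the window can be advanced by hand.

```python
import time
from typing import Callable, Dict, Optional, Tuple


class InMemoryCounter:
    """Hypothetical stand-in for the two Redis commands the check needs."""

    def __init__(self, clock: Callable[[], float] = time.monotonic) -> None:
        self._clock = clock
        # key -> (count, expiry timestamp or None if no expiry is set)
        self._data: Dict[str, Tuple[int, Optional[float]]] = {}

    def incr(self, key: str) -> int:
        count, expires_at = self._data.get(key, (0, None))
        if expires_at is not None and self._clock() >= expires_at:
            count, expires_at = 0, None  # the previous window has expired
        count += 1
        self._data[key] = (count, expires_at)
        return count

    def expire(self, key: str, seconds: int) -> None:
        count, _ = self._data[key]
        self._data[key] = (count, self._clock() + seconds)


def is_rate_okay_local(counter: InMemoryCounter, task_name: str, num_tasks: int,
                       duration: int, key: Optional[str] = None) -> bool:
    """Fixed-window check: at most num_tasks calls per key every duration seconds."""
    full_key = f"celery_throttle:{task_name}{':' + key if key else ''}"
    count = counter.incr(full_key)
    if count == 1:
        counter.expire(full_key, duration)  # first call opens a new window
    return count <= num_tasks


now = [0.0]  # fake clock, advanced by hand
counter = InMemoryCounter(clock=lambda: now[0])
results = [is_rate_okay_local(counter, "tasks.my_task", 5, 60, "1:192.168.1.1") for _ in range(6)]
other_pair = is_rate_okay_local(counter, "tasks.my_task", 5, 60, "2:192.168.1.2")
now[0] = 61.0  # move past the 60-second window
after_window = is_rate_okay_local(counter, "tasks.my_task", 5, 60, "1:192.168.1.1")
print(results, other_pair, after_window)
# [True, True, True, True, True, False] True True
```

In this pattern rejected calls still increment the counter, which is harmless for a fixed window because the whole counter disappears when the key expires.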
4. Task Rescheduling
The set_for_next_window function remains unchanged from the previous guide, rescheduling throttled tasks for the next time window.
How to Use the Enhanced Throttling Solution
To use this enhanced solution, follow these steps:
1. Define Your Celery Task:
Define the task you want to throttle with multiple parameters.
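from celery import Celery

# throttle_task is the decorator defined above.
app = Celery('tasks', broker='redis://localhost:6379/0')


# @app.task(bind=True) must be the outermost decorator so that the
# throttling wrapper receives the task instance as `self`.
@app.task(bind=True)
@throttle_task(rate='5/m', keys=['user_id', 'ip_address'])
def my_task(self, user_id, ip_address, data):
    # Task logic here
    pass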
Explanation:
- The @throttle_task decorator is applied to the Celery task. It sits below @app.task(bind=True), because the wrapper reads the task instance from the bound self argument.
- The rate parameter specifies the rate limit (e.g., 5 tasks per minute).
- The keys parameter lists the arguments (e.g., user_id, ip_address) used to differentiate between tasks.
2. Run Your Celery Worker:
Start the Celery worker to process tasks.
celery -A tasks worker --loglevel=info
3. Queue Tasks:
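Queue tasks with different combinations of parameters. With a rate of 5/m, throttling kicks in once more than five tasks with the same user_id and ip_address arrive within the same one-minute window.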
my_task.apply_async(args=[1, '192.168.1.1', 'some data'])
my_task.apply_async(args=[1, '192.168.1.1', 'more data'])
my_task.apply_async(args=[2, '192.168.1.2', 'other data'])
Explanation:
- Tasks with the same combination of user_id and ip_address will be throttled according to the specified rate limit.
- Tasks with different combinations will be processed independently.
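The whole flow can also be simulated locally, without Celery or Redis. In the sketch below, StubTask and Throttled are hypothetical stand-ins for a bound Celery task and for the Retry exception that task.retry() raises, and throttle_sim is a simplified variant of throttle_task that counts calls in a dictionary for a single, never-ending window. It sends seven calls with the same combination against a limit of five, then one call with a different combination.

```python
import functools
import inspect
from collections import defaultdict
from types import SimpleNamespace


class Throttled(Exception):
    """Hypothetical stand-in for the exception Celery's task.retry() raises."""

    def __init__(self, countdown: int) -> None:
        super().__init__(f"retry in {countdown}s")
        self.countdown = countdown


class StubTask:
    """Hypothetical object exposing only what the throttling wrapper reads."""

    name = "tasks.my_task"

    def __init__(self) -> None:
        self.request = SimpleNamespace(id="local-test", retries=0)

    def retry(self, countdown: int):
        raise Throttled(countdown)


# Composite key -> number of calls seen in the current (single) window.
calls_per_key = defaultdict(int)


def throttle_sim(limit: int, keys: list):
    """Simplified throttle_task: counts calls per composite key in memory."""

    def decorator(func):
        sig = inspect.signature(func)

        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            bound = sig.bind(*args, **kwargs)
            bound.apply_defaults()
            task = bound.arguments["self"]
            composite_key = ":".join(str(bound.arguments[k]) for k in keys)
            calls_per_key[composite_key] += 1
            if calls_per_key[composite_key] > limit:
                # Fixed 60-second countdown; the article's code asks get_task_wait.
                return task.retry(countdown=60)
            return func(*args, **kwargs)

        return wrapper

    return decorator


@throttle_sim(limit=5, keys=["user_id", "ip_address"])
def my_task(self, user_id, ip_address, data):
    return f"processed {data!r} for {user_id}@{ip_address}"


task = StubTask()
outcomes = []
for i in range(7):
    try:
        my_task(task, 1, "192.168.1.1", f"batch {i}")
        outcomes.append("ran")
    except Throttled:
        outcomes.append("throttled")

print(outcomes)  # ['ran', 'ran', 'ran', 'ran', 'ran', 'throttled', 'throttled']
other = my_task(task, 2, "192.168.1.2", "other data")
print(other)  # processed 'other data' for 2@192.168.1.2
```

The sixth and seventh calls for user 1 on 192.168.1.1 are throttled, while user 2 on 192.168.1.2 runs immediately because its composite key has its own counter.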
Conclusion
By extending the Celery task throttling solution to handle multiple parameters, you can control task execution rates per combination of parameter values, such as each user and IP address pair. This keeps your tasks within external rate limits while preventing any single combination from flooding the queue.