Advanced Celery Task Throttling with Multiple Parameters

Introduction

Building on our previous guide on parameter-based rate limiting in Celery tasks, this article delves into a more advanced solution where tasks can be throttled based on multiple parameters. This is especially useful when you need fine-grained control over task execution rates across various dimensions, such as user ID and IP address.

Related Reading: If you haven’t read our previous blog post on Celery: Rate Limiting Tasks with Parameters, we recommend checking it out for foundational understanding and context.

Understanding the Enhanced Problem

In many scenarios, rate limiting based on a single parameter might not be sufficient. For example, you might need to throttle tasks based on both the user ID and the IP address to prevent abuse from a single user across different IPs or multiple users from the same IP.

The Enhanced Solution: Multi-Parameter Rate Limiting

Our enhanced solution involves modifying the existing throttling decorator to accept a list of parameters. This ensures tasks with the same combination of parameters do not exceed the allowed rate.

Step-by-Step Breakdown

1. Parsing the Rate

The parse_rate function remains the same as in the previous solution: it converts a rate string (e.g., '5/m') into a tuple of the number of allowed requests and the time period in seconds.

from typing import Tuple


def parse_rate(rate: str) -> Tuple[int, int]:
    """Convert a rate string such as '5/m' into (number of requests, seconds)."""
    num, period = rate.split("/")
    num_requests = int(num)
    if len(period) > 1:
        # Periods such as "10s" carry their own multiplier.
        duration_multiplier = int(period[:-1])
        duration_unit = period[-1]
    else:
        duration_multiplier = 1
        duration_unit = period[-1]
    duration_base = {"s": 1, "m": 60, "h": 3600, "d": 86400}[duration_unit]
    duration = duration_base * duration_multiplier
    return num_requests, duration
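
For example, a few rate strings and the tuples they produce:

parse_rate('5/m')      # -> (5, 60): five tasks per minute
parse_rate('100/10s')  # -> (100, 10): one hundred tasks per ten seconds
parse_rate('1/d')      # -> (1, 86400): one task per day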

2. Throttling Decorator for Multiple Parameters

The throttle_task decorator is modified to accept a list of parameter names whose values are combined into a single composite throttle key.

import functools
import inspect
import logging
from typing import Any, Callable, List, Optional

logger = logging.getLogger(__name__)


def throttle_task(rate: str, keys: Optional[List[str]] = None) -> Callable:
    def decorator_func(func: Callable) -> Callable:
        @functools.wraps(func)
        def wrapper(*args, **kwargs) -> Any:
            # Bind the call's arguments to the function signature so parameter
            # values can be looked up by name.
            sig = inspect.signature(func)
            bound_args = sig.bind(*args, **kwargs)
            task = bound_args.arguments["self"]
            key_values = []
            if keys:
                for key in keys:
                    try:
                        key_values.append(str(bound_args.arguments[key]))
                    except KeyError:
                        raise KeyError(
                            f"Unknown parameter '{key}' in throttle_task "
                            f"decorator of function {task.name}. "
                            f"`keys` parameter must match parameter "
                            f"names from function signature: '{sig}'"
                        )
            # Join the values into a single composite key, e.g. "1:192.168.1.1".
            composite_key = ":".join(key_values)
            # get_task_wait is the helper from the previous post's solution.
            delay = get_task_wait(task, rate, key=composite_key)
            if delay > 0:
                # Don't let throttling retries count against max_retries.
                task.request.retries = task.request.retries - 1
                logger.info(
                    "Throttling task %s (%s) via decorator for %ss",
                    task.name,
                    task.request.id,
                    delay,
                )
                return task.retry(countdown=delay)
            else:
                return func(*args, **kwargs)
        return wrapper
    return decorator_func
Explanation:
  • The decorator now accepts a list of parameter names (keys).
  • It retrieves the values for these parameters and combines them into a composite key.
  • The composite key is used to check and enforce the rate limit.
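  • For instance, with keys=['user_id', 'ip_address'], a call where user_id=1 and ip_address='192.168.1.1' yields the composite key '1:192.168.1.1'.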

3. Rate Checking with Multiple Parameters

The is_rate_okay function is updated to handle the composite key.

from typing import Optional

from celery import Task


def is_rate_okay(task: Task, rate: str = "1/s", key: Optional[str] = None) -> bool:
    # One Redis key per task name plus composite parameter key.
    key = f"celery_throttle:{task.name}{':' + key if key else ''}"
    # make_redis_interface is the Redis-client helper from the previous post.
    r = make_redis_interface("CACHE")
    num_tasks, duration = parse_rate(rate)
    count = r.get(key)
    if count is None:
        # First hit in this window: start the counter and set its expiry.
        r.set(key, 1)
        r.expire(key, duration)
        return True
    else:
        # Strict comparison so no more than num_tasks run per window.
        if int(count) < num_tasks:
            r.incr(key, 1)
            return True
        else:
            return False
Explanation:

The composite key ensures that the rate limit is applied based on the combination of specified parameters.
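For instance, a task registered as tasks.my_task throttled on user_id=1 and ip_address='192.168.1.1' would be tracked under the Redis key celery_throttle:tasks.my_task:1:192.168.1.1, so each combination gets its own counter and expiry window.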

4. Task Rescheduling

The set_for_next_window function remains unchanged, ensuring tasks are rescheduled for the next time window.
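
The throttling decorator also calls get_task_wait, which comes from the previous post and decides how long a throttled task should wait. If you don't have that code at hand, the simplified stand-in below (an assumption for illustration, not the original implementation) returns 0 when the task may run, and otherwise the remaining TTL of the throttle key, i.e. the seconds until the current window closes. It reuses is_rate_okay and make_redis_interface from above.

def get_task_wait(task: Task, rate: str = "1/s", key: Optional[str] = None) -> int:
    """Simplified stand-in for the previous post's get_task_wait /
    set_for_next_window pair."""
    if is_rate_okay(task, rate, key=key):
        # Under the limit: run immediately.
        return 0
    # Over the limit: wait until the throttle key expires, which marks the
    # start of the next window.
    redis_key = f"celery_throttle:{task.name}{':' + key if key else ''}"
    r = make_redis_interface("CACHE")
    ttl = r.ttl(redis_key)
    # ttl is -2 if the key vanished or -1 if it somehow has no expiry;
    # wait at least one second in those edge cases.
    return max(ttl, 1)

Unlike the original helpers, this sketch simply retries every throttled task once the current window expires, which is enough to demonstrate the multi-parameter keying.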

How to Use the Enhanced Throttling Solution

To use this enhanced solution, follow these steps:

1. Define Your Celery Task:

Define the task you want to throttle with multiple parameters.

from celery import Celery

# throttle_task is the decorator defined above; import it from wherever you
# keep your throttling helpers.

app = Celery('tasks', broker='redis://localhost:6379/0')

@app.task(bind=True)
@throttle_task(rate='5/m', keys=['user_id', 'ip_address'])
def my_task(self, user_id, ip_address, data):
    # Task logic here
    pass
Explanation:
  • The @throttle_task decorator is applied to the Celery task.
  • The rate parameter specifies the rate limit (e.g., 5 requests per minute).
  • The keys parameter indicates the arguments (e.g., user_id, ip_address) used to differentiate between tasks.
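  • Note that the decorator order matters: @app.task(bind=True) is listed first (applied last), so Celery registers the already-throttled function and passes the task instance as self, which the wrapper reads via bound_args.arguments['self'].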

2. Run Your Celery Worker:

Start the Celery worker to process tasks.

celery -A tasks worker --loglevel=info

3. Queue Tasks:

Queue tasks with different combinations of parameters to see the throttling in action.

my_task.apply_async(args=[1, '192.168.1.1', 'some data'])
my_task.apply_async(args=[1, '192.168.1.1', 'more data'])
my_task.apply_async(args=[2, '192.168.1.2', 'other data'])

Explanation:

  • Tasks with the same combination of user_id and ip_address will be throttled according to the specified rate limit.
  • Tasks with different combinations will be processed independently.
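
Because the decorator binds arguments against the task's signature, keyword arguments are matched by name just as positional ones are. A hypothetical equivalent of the first call above:

# Keyword-argument form of the first call; the decorator still builds the
# composite key "1:192.168.1.1" from user_id and ip_address.
my_task.apply_async(kwargs={'user_id': 1, 'ip_address': '192.168.1.1', 'data': 'some data'})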

Conclusion

By extending the Celery task throttling solution to handle multiple parameters, you can achieve even more precise control over task execution rates. This approach ensures that your tasks adhere to external rate limits while preventing abuse across multiple dimensions, such as user and IP address.
