django - Is there any way to define task quota in celery?
I have these requirements:
- I have a few heavy, resource-consuming tasks: exporting different reports requires big, complex queries and sub-queries.
- There are a lot of users.
- I have built the project in Django, and I queue the tasks using Celery.
- I want to restrict each user to 10 report requests per minute. The idea is that a user can put in hundreds of requests within 10 minutes, but I want Celery to execute only 10 tasks per user, so that every user gets a turn.
Is there any way Celery can do this?
thanks
Celery has a setting to control the rate_limit (http://celery.readthedocs.org/en/latest/userguide/tasks.html#task.rate_limit), which means the maximum number of task executions allowed within a time frame. You could set it to '100/m' (one hundred per minute), meaning the system allows at most 100 executions of that task per minute. It is important to notice that this setting is not per user: it applies per task type, per time frame. Have you thought about this approach instead of limiting per user?
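To illustrate what a per-time-frame limit like '100/m' means, here is a minimal pure-Python sliding-window limiter sketch. It is only an illustration of the semantics, not how Celery implements rate_limit internally, and the class name is hypothetical:

```python
import time
from collections import deque


class SlidingWindowLimiter:
    """Allow at most `limit` events per `window` seconds,
    mirroring the semantics of a '100/m'-style rate limit."""

    def __init__(self, limit, window=60.0):
        self.limit = limit
        self.window = window
        self._events = deque()  # timestamps of allowed events

    def allow(self, now=None):
        """Return True if an event may run now, recording it if so."""
        now = time.monotonic() if now is None else now
        # Drop events that have fallen out of the window.
        while self._events and now - self._events[0] >= self.window:
            self._events.popleft()
        if len(self._events) < self.limit:
            self._events.append(now)
            return True
        return False
```

Note that, like Celery's rate_limit, this limits the task type as a whole; to limit per user you would need one such window per user key.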
In order to have a 'rate_limit' per task and user pair, you would have to implement it yourself. I think (I am not sure) you could use a task router or a signal, depending on your needs. Task routers (http://celery.readthedocs.org/en/latest/userguide/routing.html#routers) allow you to route tasks to a specific queue by applying custom logic. Signals (http://celery.readthedocs.org/en/latest/userguide/signals.html) allow you to execute code at a few well-defined points in a task's scheduling cycle.
An example of the router's logic could be:
    def route_task(task, args, kwargs):
        if task == 'a':
            user_id = args[0]  # in this task, user_id is the first arg
            qty = get_task_qty('a', user_id)
            if qty > limit_for_a:
                return None  # over the limit: do not route the task
        elif task == 'b':
            user_id = args[2]  # in this task, user_id is the third arg
            qty = get_task_qty('b', user_id)
            if qty > limit_for_b:
                return None
        return {'queue': 'default'}
With the approach above, every time a task starts you would increment by 1, in some shared place (for example Redis), a counter for the user_id/task_type pair, and every time a task finishes you would decrement the value in the same place.
It seems kind of complex, hard to maintain, and with a few failure points to me.
The other approach, which I think would fit better, is to implement a kind of 'distributed semaphore' (similar to a distributed lock) per user and task, so that each task that needs to limit the number of running instances would use it.
The idea is that, every time a task that should have 'concurrency control' starts, it checks whether there is a resource available; if not, it returns.
You can imagine the idea with the code below:
    @shared_task
    def my_task_a(user_id, arg1, arg2):
        resource_key = 'my_task_a_{}'.format(user_id)
        if not SemaphoreManager.is_available_resource(resource_key):
            # no resources available: abort
            return
        # someone else may grab the resource between the check and the
        # acquire, so acquire() must be atomic and can still fail
        if not SemaphoreManager.acquire(resource_key):
            return
        try:
            pass  # execute your code here
        finally:
            SemaphoreManager.release(resource_key)
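The SemaphoreManager above is not an existing library; you would have to build it yourself. A minimal in-process sketch of the idea follows (a real implementation would keep the counts in Redis so the limit is shared across all workers; all names are hypothetical):

```python
import threading
from collections import defaultdict


class SemaphoreManager:
    """Per-key concurrency limiter: at most `limit` holders of each key
    at once. In-process only; a distributed version would use atomic
    Redis operations instead of a local lock."""

    def __init__(self, limit=10):
        self.limit = limit
        self._active = defaultdict(int)
        self._lock = threading.Lock()

    def is_available_resource(self, key):
        with self._lock:
            return self._active[key] < self.limit

    def acquire(self, key):
        """Atomically take one slot for `key`; False if none are free."""
        with self._lock:
            if self._active[key] < self.limit:
                self._active[key] += 1
                return True
            return False

    def release(self, key):
        with self._lock:
            if self._active[key] > 0:
                self._active[key] -= 1
```

With `limit=10` and a key like 'my_task_a_{user_id}', this is exactly the "10 concurrent report tasks per user" behaviour the question asks for, within a single process.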
It's hard to say which approach you should take, because it depends on your application.
Hope this helps you!
good luck!