My blogger

My way, my world

0%

asyncio+celery

目前python asyncio的生态已经可以支撑大部分业务开发了,但异步任务这块,celery一是还没支持asyncio(记得5.0版本是计划支持的)二是还没看到好用的代替品,所以依赖异步任务的场景还是需要集成celery。

event loop

在考虑celery前,asyncio是自带异步任务的,一些小的异步任务可以通过ensure_feature丢到event loop里,比如starlette就支持在response上挂backgroud task

celery

因为celery不支持asyncio,所以worker在调用task需要使用loop.run_until_complete来运行asyncio代码,这样实际上worker在跟broker交互时是syncio,实际执行任务时是asyncio,性能上不会比同步代码差,而且执行任务过程可以不用再拆分大task到小task来实现并发(直接利用loop运行异步的小task),一段简单的包装代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
from celery import shared_task


def task(
*task_args, **task_kwargs
) -> typing.Callable[[typing.Callable], typing.Callable]:
def decorator(func: typing.Callable):
@wraps(func)
def wrapper(*args, **kwargs):
if asyncio.iscoroutinefunction(func):
return asyncio.get_event_loop().run_until_complete(
func(*args, **kwargs)
)
else:
return func(*args, **kwargs)

# setattr(wrapper, "origion_func", func)
return shared_task(*task_args, **task_kwargs)(wrapper)

return decorator


@task()
async def hello():
await asyncio.sleep(1)

hello.delay()

坑一

由于celery worker是fork出来的子进程,然而event_loop是不支持fork的,如果在全局代码里生成了event_loop则子进程无法使用fork的event_loop,这时可以去掉全局代码的event_loop,也可以通过celery loader的hook重新生成event_loop:

1
2
3
4
5
6
7
class CeleryLoader(AppLoader):
def on_worker_process_init(self):
# uvloop.install()
asyncio.set_event_loop(asyncio.new_event_loop())
async_run(init_db())

celery = Celery("celery", loader=CeleryLoader)

坑二

如果不通过delay而是直接调用task函数的话(比如单元测试),由于是无法在一个running状态的loop里再调用run_until_complete的,最直接的解决方案是拿到注册的task的原始函数:

1
2
3
4
5
6
async def run_task(t, *args, **kwargs):
f = t._get_current_object().run.origion_func # 在task装饰器里注入origin_func变量,见上文
if asyncio.iscoroutinefunction(f):
return await f(*args, **kwargs)
else:
return f(*args, **kwargs)

总结

通过一点点的妥协,我们是可以在asyncio的生态享受到celery的便捷的,包括celery-beater等等都没问题。希望celery早日正式加入asyncio生态。