目前python asyncio的生态已经可以支撑大部分业务开发了,但异步任务这块,celery一是还没支持asyncio(记得5.0版本是计划支持的)二是还没看到好用的代替品,所以依赖异步任务的场景还是需要集成celery。
event loop
在考虑celery前,asyncio是自带异步任务的,一些小的异步任务可以通过ensure_feature
丢到event loop里,比如starlette就支持在response上挂backgroud task
celery
因为celery不支持asyncio,所以worker在调用task需要使用loop.run_until_complete
来运行asyncio代码,这样实际上worker在跟broker交互时是syncio,实际执行任务时是asyncio,性能上不会比同步代码差,而且执行任务过程可以不用再拆分大task到小task来实现并发(直接利用loop运行异步的小task),一段简单的包装代码:
1 | from celery import shared_task |
坑一
由于celery worker是fork出来的子进程,然而event_loop是不支持fork的,如果在全局代码里生成了event_loop则子进程无法使用fork的event_loop,这时可以去掉全局代码的event_loop,也可以通过celery loader的hook重新生成event_loop:
1 | class CeleryLoader(AppLoader): |
坑二
如果不通过delay而是直接调用task函数的话(比如单元测试),由于是无法在一个running状态的loop里再调用run_until_complete
的,最直接的解决方案是拿到注册的task的原始函数:
1 | async def run_task(t, *args, **kwargs): |
总结
通过一点点的妥协,我们是可以在asyncio的生态享受到celery的便捷的,包括celery-beater等等都没问题。希望celery早日正式加入asyncio生态。