Celey使用
使用流程:创建Celery包 ==> 创建main.py文件 ==> 在main.py文件中从celery中导入Celery类,创建Celery实例 ==> 进行配置(创建config.py配置文件,配置连接的数据库)==> 创建任务 ==> 注册任务 ==> 编写任务代码 ==> 启动Celery服务 ==> 调用任务
详细流程:
- 安装Celery:
```bash
$ pip install -U Celery
```
- [Celery官方文档](http://docs.celeryproject.org/en/latest/index.html)
> **定义Celery包,并且创建main.py文件**
> **在main.py中创建Celery实例**
> celery_tasks.main.py
```python
# celery启动文件
from celery import Celery
# 创建celery实例
celery_app = Celery('meiduo')
```
> **加载Celery配置**
> celery_tasks.config.py
```python
# 指定消息队列的位置
broker_url= 'amqp://guest:guest@192.168.103.158:5672'
# 根据要连接的数据库来更改ip和端口
```
> celery_tasks.main.py
```python
# celery启动文件
from celery import Celery
# 创建celery实例
celery_app = Celery('meiduo')
# 加载celery配置
celery_app.config_from_object('celery_tasks.config')
```
> **注册任务:celery_tasks.main.py**
```python
# celery启动文件
from celery import Celery
# 创建celery实例
celery_app = Celery('meiduo')
# 加载celery配置
celery_app.config_from_object('celery_tasks.config')
# 自动注册celery任务
celery_app.autodiscover_tasks(['celery_tasks.sms'])
```
> **2.定义任务:celery_tasks.sms.tasks.py**
```python
# bind:保证task对象会作为第一个参数自动传入
# name:异步任务别名
# retry_backoff:异常自动重试的时间间隔 第n次(retry_backoff×2^(n-1))s
# max_retries:异常自动重试次数的上限
@celery_app.task(bind=True, name='函数名', retry_backoff=重复执行的时间间隔)
def 函数名(self, 参数):
try:
要执行的任务
except Exception as e:
logger.error(e)
# 有异常自动重试三次
raise self.retry(exc=e, max_retries=3)
if send_ret != 0:
# 有异常自动重试三次
raise self.retry(exc=Exception('发送短信失败'), max_retries=3)
return 返回值
```
4. 启动Celery服务
```bash
$ celery -A celery_tasks.main worker -l info
# 要根据所在的位置填写对应的文件路径
# 启动Celery就是执行main.py文件,所以才要进行导入配置,应用注册
```
> - `-A`指对应的应用程序, 其参数是项目中 Celery实例的位置。
> - `worker`指这里要启动的worker。
> - `-l`指日志等级,比如`info`等级。
5. 调用任务
```python
任务名.delay(自定义任务的参数)
```
|
|