一个简易的celery+sqlite案例
Page content
Celery 是一个简单、灵活且可靠的,处理大量消息的分布式系统,并且提供维护这样一个系统的必需工具。今天我们通过一个案例简单得讲一下celery的快速应用。
环境依赖
python3
sqlite3
用于创建数据库requests_html
请求测试celery
异步任务
预设任务方法 task.py
通过设定broker
参数为sqla+sqlite:///xxx
来已连接sqlite的方式实例化Celery
from datetime import timedelta
from celery.schedules import crontab
import sqlite3
from requests_html import HTMLSession
from celery import Celery
celery_db = "db" # db名称
sqlite3.connect(celery_db) # 新建或连接库
app = Celery("demo", broker=f"sqla+sqlite:///{celery_db}") # 设定celery
app.conf.update(
task_serializer="pickle", result_serializer="pickle", accept_content=["pickle"]
) # 设定为可以传输对象
# 设定定时任务
app.conf.update(
beat_schedule={
"延迟检测": {
"task":"task.check_poc",
"schedule":timedelta(seconds=10),
"args":((0,'http://baidu.com'))
},
"定时检测": {
"task":"task.check_poc",
"schedule":crontab(hour=8,minute=30),
"args":((2,'http://qq.com'))
}
}
)
@app.task() # celery方法写法
def check_poc(index,url):
resp = HTMLSession().get(url) # poc 测试
print(f"[{index}]{url}: {resp.html.find('title', first=True).text}") # 打印
发布任务执行
我们通过若干个网址作为payload
来做测试
from task import check_poc
urls = ["http://qq.com", "http://baidu.com", "http://freebuf.com"] # 测试案例站点
for index,url in enumerate(urls):
check_poc.delay(index,url) # 异步调用
使用效果
最后使用celery worker -A task --pool=solo -l info
命令启动celery
的worker
celery beat -A task -l INFO
启动定时任务
一键bash
也可以通过如下脚本快速重启celery
kill -9 `ps -ef|grep celery |awk '{print $2}'`
nohup celery worker -A task --pool=solo -l INFO >> celery_worker.log 2>&1 &
nohup celery beat -A task -l INFO >> celery_beats.log 2>&1 &