欧美一区二区三区老妇人-欧美做爰猛烈大尺度电-99久久夜色精品国产亚洲a-亚洲福利视频一区二区

Python強(qiáng)大的任務(wù)調(diào)度框架Celery怎么使用

本篇內(nèi)容介紹了“Python強(qiáng)大的任務(wù)調(diào)度框架Celery怎么使用”的有關(guān)知識(shí),在實(shí)際案例的操作過(guò)程中,不少人都會(huì)遇到這樣的困境,接下來(lái)就讓小編帶領(lǐng)大家學(xué)習(xí)一下如何處理這些情況吧!希望大家仔細(xì)閱讀,能夠?qū)W有所成!

成都創(chuàng)新互聯(lián)公司專注為客戶提供全方位的互聯(lián)網(wǎng)綜合服務(wù),包含不限于成都網(wǎng)站設(shè)計(jì)、成都做網(wǎng)站、九江網(wǎng)絡(luò)推廣、小程序制作、九江網(wǎng)絡(luò)營(yíng)銷、九江企業(yè)策劃、九江品牌公關(guān)、搜索引擎seo、人物專訪、企業(yè)宣傳片、企業(yè)代運(yùn)營(yíng)等,從售前售中售后,我們都將竭誠(chéng)為您服務(wù),您的肯定,是我們最大的嘉獎(jiǎng);成都創(chuàng)新互聯(lián)公司為所有大學(xué)生創(chuàng)業(yè)者提供九江建站搭建服務(wù),24小時(shí)服務(wù)熱線:18982081108,官方網(wǎng)址:chinadenli.net

什么是 celery

這次我們來(lái)介紹一下 Python 的一個(gè)第三方模塊 celery,那么 celery 是什么呢?

  • celery 是一個(gè)靈活且可靠的,處理大量消息的分布式系統(tǒng),可以在多個(gè)節(jié)點(diǎn)之間處理某個(gè)任務(wù);

  • celery 是一個(gè)專注于實(shí)時(shí)處理的任務(wù)隊(duì)列,支持任務(wù)調(diào)度;

  • celery 是開(kāi)源的,有很多的使用者;

  • celery 完全基于 Python 語(yǔ)言編寫;

所以 celery 本質(zhì)上就是一個(gè)任務(wù)調(diào)度框架,類似于 Apache 的 airflow,當(dāng)然 airflow 也是基于 Python 語(yǔ)言編寫。

不過(guò)有一點(diǎn)需要注意,celery 是用來(lái)調(diào)度任務(wù)的,但它本身并不具備存儲(chǔ)任務(wù)的功能,而調(diào)度任務(wù)的時(shí)候肯定是要把任務(wù)存起來(lái)的。因此要使用 celery 的話,還需要搭配一些具備存儲(chǔ)、訪問(wèn)功能的工具,比如:消息隊(duì)列、redis緩存、數(shù)據(jù)庫(kù)等等。官方推薦的是消息隊(duì)列 RabbitMQ,個(gè)人認(rèn)為有些時(shí)候使用 Redis 也是不錯(cuò)的選擇,當(dāng)然我們都會(huì)介紹。

那么 celery 都可以在哪些場(chǎng)景中使用呢?

  • 異步任務(wù):一些耗時(shí)的操作可以交給celery異步執(zhí)行,而不用等著程序處理完才知道結(jié)果。比如:視頻轉(zhuǎn)碼、郵件發(fā)送、消息推送等等;

  • 定時(shí)任務(wù):比如定時(shí)推送消息、定時(shí)爬取數(shù)據(jù)、定時(shí)統(tǒng)計(jì)數(shù)據(jù)等等;

celery 的架構(gòu)

Python強(qiáng)大的任務(wù)調(diào)度框架Celery怎么使用

我們看一下 celery 的架構(gòu):

  • producer:生產(chǎn)者,專門用來(lái)生產(chǎn)任務(wù)(task);

  • celery beat:任務(wù)調(diào)度器,調(diào)度器進(jìn)程會(huì)讀取配置文件的內(nèi)容,周期性地將配置文件里面到期需要執(zhí)行的任務(wù)發(fā)送給消息隊(duì)列,說(shuō)白了就是生產(chǎn)定時(shí)任務(wù);

  • broker:任務(wù)隊(duì)列,用于存放生產(chǎn)者和調(diào)度器生產(chǎn)的任務(wù)。一般使用消息隊(duì)列或者 Redis 來(lái)存儲(chǔ),當(dāng)然具有存儲(chǔ)功能的數(shù)據(jù)庫(kù)也是可以的。這一部分是 celery 所不提供的,需要依賴第三方。作用就是接收任務(wù),存進(jìn)隊(duì)列;

  • worker:任務(wù)的執(zhí)行單元,會(huì)將任務(wù)從隊(duì)列中順序取出并執(zhí)行;

  • backend:用于在任務(wù)結(jié)束之后保存狀態(tài)信息和結(jié)果,以便查詢,一般是數(shù)據(jù)庫(kù),當(dāng)然只要具備存儲(chǔ)功能都可以作為 backend;

下面我們來(lái)安裝 celery,安裝比較簡(jiǎn)單,直接 pip install celery 即可。這里我本地的 celery 版本是 5.2.7,Python 版本是 3.8.10。

另外,由于 celery 本身不提供任務(wù)存儲(chǔ)的功能,所以這里我們使用 Redis 作為消息隊(duì)列,負(fù)責(zé)存儲(chǔ)任務(wù)。因此你還要在機(jī)器上安裝 Redis,我這里有一臺(tái)云服務(wù)器,已經(jīng)安裝好了。

后續(xù) celery 就會(huì)將任務(wù)存到 broker 里面,當(dāng)然要想實(shí)現(xiàn)這一點(diǎn),就必須還要有能夠操作相應(yīng) broker 的驅(qū)動(dòng)。Python 操作 Redis 的驅(qū)動(dòng)也叫 redis,操作 RabbitMQ 的驅(qū)動(dòng)叫 pika,直接 pip install ... 安裝即可。

celery 實(shí)現(xiàn)異步任務(wù)

我們新建一個(gè)工程,就叫 celery_demo,然后在里面新建一個(gè) app.py 文件。

# 文件名:app.py
import time
# 這個(gè) Celery 就類似于 flask.Flask
# 然后實(shí)例化得到一個(gè)app
from celery import Celery
# 指定一個(gè) name、以及 broker 的地址、backend 的地址
app = Celery(
 "satori",
 # 這里使用我服務(wù)器上的 Redis
 # broker 用 1 號(hào)庫(kù), backend 用 2 號(hào)庫(kù)
 broker="redis://:maverick@82.157.146.194:6379/1",
 backend="redis://:maverick@82.157.146.194:6379/2")
# 這里通過(guò) @app.task 對(duì)函數(shù)進(jìn)行裝飾
# 那么之后我們便可調(diào)用 task.delay 創(chuàng)建一個(gè)任務(wù)
@app.task
def task(name, age):
 print("準(zhǔn)備執(zhí)行任務(wù)啦")
 time.sleep(3)
 return f"name is {name}, age is {age}"

我們說(shuō)執(zhí)行任務(wù)的對(duì)象是 worker,那么我們是不是需要?jiǎng)?chuàng)建一個(gè) worker 呢?顯然是需要的,而創(chuàng)建 worker 可以使用如下命令創(chuàng)建:

Python強(qiáng)大的任務(wù)調(diào)度框架Celery怎么使用

注意:在 5.0 之前我們可以寫成 celery worker -A app ...,也就是把所有的參數(shù)都放在子命令 celery worker 的后面。但從 5.0 開(kāi)始這種做法就不允許了,必須寫成 celery -A app worker ...,因?yàn)?-A 變成了一個(gè)全局參數(shù),所以它不應(yīng)該放在 worker 的后面,而是要放在 worker 的前面。

下面執(zhí)行該命令:

Python強(qiáng)大的任務(wù)調(diào)度框架Celery怎么使用

以上就前臺(tái)啟動(dòng)了一個(gè) worker,正在等待從隊(duì)列中獲取任務(wù),圖中也顯示了相應(yīng)的信息。然而此時(shí)隊(duì)列中并沒(méi)有任務(wù),所以我們需要在另一個(gè)文件中創(chuàng)建任務(wù)并發(fā)送到隊(duì)列里面去。

import time
from app import task
# 從 app 導(dǎo)入 task, 創(chuàng)建任務(wù), 但是注意: 不要直接調(diào)用 task
# 因?yàn)槟菢拥脑捑驮诒镜貓?zhí)行了, 我們的目的是將任務(wù)發(fā)送到隊(duì)列里面去
# 然后讓監(jiān)聽(tīng)隊(duì)列的 worker 從隊(duì)列里面取任務(wù)并執(zhí)行
# 而 task 被 @app.task 裝飾, 所以它不再是原來(lái)的 task 了
# 我們需要調(diào)用它的 delay 方法
# 調(diào)用 delay 之后, 就會(huì)創(chuàng)建一個(gè)任務(wù)
# 然后發(fā)送到隊(duì)列里面去, 也就是我們這里的 Redis
# 至于參數(shù), 普通調(diào)用的時(shí)候怎么傳, 在 delay 里面依舊怎么傳
start = time.perf_counter()
task.delay("古明地覺(jué)", 17)
print(
 time.perf_counter() - start
)# 0.11716766700000003

然后執(zhí)行該文件,發(fā)現(xiàn)只用了 0.12 秒,而 task 里面明明 sleep 了 3 秒。所以說(shuō)明這一步是不會(huì)阻塞的,調(diào)用 task.delay 只是創(chuàng)建一個(gè)任務(wù)并發(fā)送至隊(duì)列。我們?cè)倏匆幌?worker 的輸出信息:

Python強(qiáng)大的任務(wù)調(diào)度框架Celery怎么使用

可以看到任務(wù)已經(jīng)被消費(fèi)者接收并且消費(fèi)了,而且調(diào)用 delay 方法是不會(huì)阻塞的,花費(fèi)的那 0.12 秒是用在了其它地方,比如連接 Redis 發(fā)送任務(wù)等等。

另外需要注意,函數(shù)被 @app.task 裝飾之后,可以理解為它就變成了一個(gè)任務(wù)工廠,因?yàn)楸谎b飾了嘛,然后調(diào)用任務(wù)工廠的 delay 方法即可創(chuàng)建任務(wù)并發(fā)送到隊(duì)列里面。我們也可以創(chuàng)建很多個(gè)任務(wù)工廠,但是這些任務(wù)工廠必須要讓 worker 知道,否則不會(huì)生效。所以如果修改了某個(gè)任務(wù)工廠、或者添加、刪除了某個(gè)任務(wù)工廠,那么一定要讓 worker 知道,而做法就是先停止 celery worker 進(jìn)程,然后再重新啟動(dòng)。

如果我們新建了一個(gè)任務(wù)工廠,然后在沒(méi)有重啟 worker 的情況下,就用調(diào)用它的 delay 方法創(chuàng)建任務(wù)、并發(fā)送到隊(duì)列的話,那么會(huì)拋出一個(gè) KeyError,提示找不到相應(yīng)的任務(wù)工廠。

  • 其實(shí)很好理解,因?yàn)榇a已經(jīng)加載到內(nèi)存里面了,光修改了源文件而不重啟是沒(méi)用的。因?yàn)榧虞d到內(nèi)存里面的還是原來(lái)的代碼,不是修改過(guò)后的。

然后我們?cè)賮?lái)看看 Redis 中存儲(chǔ)的信息,1 號(hào)庫(kù)用作 broker,負(fù)責(zé)存儲(chǔ)任務(wù);2 號(hào)庫(kù)用作 backend,負(fù)責(zé)存儲(chǔ)執(zhí)行結(jié)果。我們來(lái)看 2 號(hào)庫(kù):

Python強(qiáng)大的任務(wù)調(diào)度框架Celery怎么使用

以上我們就啟動(dòng)了一個(gè) worker 并成功消費(fèi)了隊(duì)列中的任務(wù),并且還從 Redis 里面拿到了執(zhí)行信息。當(dāng)然啦,如果只能通過(guò)查詢 backend 才能拿到信息的話,那 celery 就太不智能了。我們也可以直接從程序中獲取。

直接查詢?nèi)蝿?wù)執(zhí)行信息

Redis(backend)里面存儲(chǔ)了很多關(guān)于任務(wù)的信息,這些信息我們可以直接在程序中獲取。

from app import task
res = task.delay("古明地覺(jué)", 17)
print(type(res))
""""""
# 直接打印,顯示任務(wù)的 id
print(res)
"""
4bd48a6d-1f0e-45d6-a225-6884067253c3
"""
# 獲取狀態(tài), 顯然此刻沒(méi)有執(zhí)行完
# 因此結(jié)果是PENDING, 表示等待狀態(tài)
print(res.status)
"""
PENDING
"""
# 獲取 id,兩種方式均可
print(res.task_id)
print(res.id)
"""
4bd48a6d-1f0e-45d6-a225-6884067253c3
4bd48a6d-1f0e-45d6-a225-6884067253c3
"""
# 獲取任務(wù)執(zhí)行結(jié)束時(shí)的時(shí)間
# 任務(wù)還沒(méi)有結(jié)束, 所以返回None
print(res.date_done)
"""
None
"""
# 獲取任務(wù)的返回值, 可以通過(guò) result 或者 get()
# 注意: 如果是 result, 那么任務(wù)還沒(méi)有執(zhí)行完的話會(huì)直接返回 None
# 如果是 get(), 那么會(huì)阻塞直到任務(wù)完成
print(res.result)
print(res.get())
"""
None
name is 古明地覺(jué), age is 17
"""
# 再次查看狀態(tài)和執(zhí)行結(jié)束時(shí)的時(shí)間
# 發(fā)現(xiàn) status 變成SUCCESS
# date_done 變成了執(zhí)行結(jié)束時(shí)的時(shí)間
print(res.status)
# 但顯示的是 UTC 時(shí)間
print(res.date_done)
"""
SUCCESS
2022-09-08 06:40:34.525492
"""

另外我們說(shuō)結(jié)果需要存儲(chǔ)在 backend 中,如果沒(méi)有配置 backend,那么獲取結(jié)果的時(shí)候會(huì)報(bào)錯(cuò)。至于 backend,因?yàn)樗谴鎯?chǔ)結(jié)果的,所以一般會(huì)保存在數(shù)據(jù)庫(kù)中,因?yàn)橐志没?。我這里為了方便,就還是保存在 Redis 中。

celery.result.AsyncResult 對(duì)象

調(diào)用完任務(wù)工廠的 delay 方法之后,會(huì)創(chuàng)建一個(gè)任務(wù)并發(fā)送至隊(duì)列,同時(shí)返回一個(gè) AsyncResult 對(duì)象,基于此對(duì)象我們可以拿到任務(wù)執(zhí)行時(shí)的所有信息。但是 AsyncResult 對(duì)象我們也可以手動(dòng)構(gòu)造,舉個(gè)例子:

import time
# 我們不光要導(dǎo)入 task, 還要導(dǎo)入里面的 app
from app import app, task
# 導(dǎo)入 AsyncResult 這個(gè)類
from celery.result import AsyncResult
# 發(fā)送任務(wù)到隊(duì)列當(dāng)中
res = task.delay("古明地覺(jué)", 17)
# 傳入任務(wù)的 id 和 app, 創(chuàng)建 AsyncResult 對(duì)象
async_result = AsyncResult(res.id, app=app)
# 此時(shí)的這個(gè) res 和 async_result 之間是等價(jià)的
# 兩者都是 AsyncResult 對(duì)象, 它們所擁有的方法也是一樣的
# 下面用誰(shuí)都可以
while True:
 # 等價(jià)于async_result.state == "SUCCESS"
 if async_result.successful():
 print(async_result.get())
 break
 # 等價(jià)于async_result.state == "FAILURE"
 elif async_result.failed():
 print("任務(wù)執(zhí)行失敗")
 elif async_result.status == "PENDING":
 print("任務(wù)正在被執(zhí)行")
 elif async_result.status == "RETRY":
 print("任務(wù)執(zhí)行異常正在重試")
 elif async_result.status == "REJECTED":
 print("任務(wù)被拒絕接收")
 elif async_result.status == "REVOKED":
 print("任務(wù)被取消")
 else:
 print("其它的一些狀態(tài)")
 time.sleep(0.8)
"""
任務(wù)正在被執(zhí)行
任務(wù)正在被執(zhí)行
任務(wù)正在被執(zhí)行
任務(wù)正在被執(zhí)行
name is 古明地覺(jué), age is 17
"""

以上就是任務(wù)可能出現(xiàn)的一些狀態(tài),通過(guò)輪詢的方式,我們也可以查看任務(wù)是否已經(jīng)執(zhí)行完畢。當(dāng)然 AsyncResult 還有一些別的方法,我們來(lái)看一下:

from app import task
res = task.delay("古明地覺(jué)", 17)
# 1. ready():查看任務(wù)狀態(tài),返回布爾值。
# 任務(wù)執(zhí)行完成返回 True,否則為 False
# 那么問(wèn)題來(lái)了,它和 successful() 有什么區(qū)別呢?
# successful() 是在任務(wù)執(zhí)行成功之后返回 True, 否則返回 False
# 而 ready() 只要是任務(wù)沒(méi)有處于阻塞狀態(tài)就會(huì)返回 True
# 比如執(zhí)行成功、執(zhí)行失敗、被 worker 拒收都看做是已經(jīng) ready 了
print(res.ready())
"""
False
"""
# 2. wait():和之前的 get 一樣, 因?yàn)樵谠创a中寫了: wait = get
# 所以調(diào)用哪個(gè)都可以, 不過(guò) wait 可能會(huì)被移除,建議直接用 get 就行
print(res.wait())
print(res.get())
"""
name is 古明地覺(jué), age is 17
name is 古明地覺(jué), age is 17
"""
# 3. trackback:如果任務(wù)拋出了一個(gè)異常,可以獲取原始的回溯信息
# 執(zhí)行成功就是 None
print(res.traceback)
"""
None
"""

以上就是獲取任務(wù)執(zhí)行結(jié)果相關(guān)的部分。

celery 的配置

celery 的配置不同,所表現(xiàn)出來(lái)的性能也不同,比如序列化的方式、連接隊(duì)列的方式,單線程、多線程、多進(jìn)程等等。那么 celery 都有那些配置呢?

  • broker_url:broker 的地址,就是類 Celery 里面?zhèn)魅氲?broker 參數(shù)。

  • result_backend:存儲(chǔ)結(jié)果地址,就是類 Celery 里面?zhèn)魅氲?backend 參數(shù)。

  • task_serializer:任務(wù)序列化方式,支持以下幾種:

  • binary:二進(jìn)制序列化方式,pickle 模塊默認(rèn)的序列化方法;

  • json:支持多種語(yǔ)言,可解決多語(yǔ)言的問(wèn)題,但通用性不高;

  • xml:標(biāo)簽語(yǔ)言,和 json 定位相似;

  • msgpack:二進(jìn)制的類 json 序列化,但比 json 更小、更快;

  • yaml:表達(dá)能力更強(qiáng)、支持的類型更多,但是在 Python里面的性能不如 json;

  • 根據(jù)情況,選擇合適的類型。如果不是跨語(yǔ)言的話,直接選擇 binary 即可,默認(rèn)是 json。

  • result_serializer:任務(wù)執(zhí)行結(jié)果序列化方式,支持的方式和任務(wù)序列化方式一致。

  • result_expires:任務(wù)結(jié)果的過(guò)期時(shí)間,單位是秒。

  • accept_content:指定任務(wù)接受的內(nèi)容序列化類型(序列化),一個(gè)列表,比如:["msgpack", "binary", "json"]。

  • timezone:時(shí)區(qū),默認(rèn)是 UTC 時(shí)區(qū)。

  • enable_utc:是否開(kāi)啟 UTC 時(shí)區(qū),默認(rèn)為 True;如果為 False,則使用本地時(shí)區(qū)。

  • task_publish_retry:發(fā)送消息失敗時(shí)是否重試,默認(rèn)為 True。

  • worker_concurrency:并發(fā)的 worker 數(shù)量。

  • worker_prefetch_multiplier:每次 worker 從任務(wù)隊(duì)列中獲取的任務(wù)數(shù)量。

  • worker_max_tasks_per_child:每個(gè) worker 執(zhí)行多少次就會(huì)被殺掉,默認(rèn)是無(wú)限的。

  • task_time_limit:?jiǎn)蝹€(gè)任務(wù)執(zhí)行的最大時(shí)間,單位是秒。

  • task_default_queue :設(shè)置默認(rèn)的隊(duì)列名稱,如果一個(gè)消息不符合其它的隊(duì)列規(guī)則,就會(huì)放在默認(rèn)隊(duì)列里面。如果什么都不設(shè)置的話,數(shù)據(jù)都會(huì)發(fā)送到默認(rèn)的隊(duì)列中。

  • task_queues :設(shè)置詳細(xì)的隊(duì)列

# 將 RabbitMQ 作為 broker 時(shí)需要使用
task_queues = {
 # 這是指定的默認(rèn)隊(duì)列
"default": {
 "exchange": "default",
 "exchange_type": "direct",
 "routing_key": "default"
 },
 # 凡是 topic 開(kāi)頭的 routing key
 # 都會(huì)被放到這個(gè)隊(duì)列
"topicqueue": {
 "routing_key": "topic.#",
 "exchange": "topic_exchange",
 "exchange_type": "topic",
 },
 "task_eeg": { # 設(shè)置扇形交換機(jī)
 "exchange": "tasks",
 "exchange_type": "fanout",
 "binding_key": "tasks",
 },
}

celery 的配置非常多,不止我們上面說(shuō)的那些,更多配置可以查看官網(wǎng),寫的比較詳細(xì)。

  • https://docs.celeryq.dev/en/latest/userguide/configuration.html#general-settings

值得一提的是,在 5.0 之前配置項(xiàng)都是大寫的,而從 5.0 開(kāi)始配置項(xiàng)改成小寫了。不過(guò)老的寫法目前仍然支持,只是啟動(dòng)的時(shí)候會(huì)拋警告,并且在 6.0 的時(shí)候不再兼容老的寫法。

Python強(qiáng)大的任務(wù)調(diào)度框架Celery怎么使用

官網(wǎng)也很貼心地將老版本的配置和新版本的配置羅列了出來(lái),盡管配置有很多,但并不是每一個(gè)都要用,可以根據(jù)自身的業(yè)務(wù)合理選擇。

然后下面我們就根據(jù)配置文件的方式啟動(dòng) celery,當(dāng)前目錄結(jié)構(gòu)如下:

Python強(qiáng)大的任務(wù)調(diào)度框架Celery怎么使用

celery_demo/config.py

broker_url = "redis://:maverick@82.157.146.194:6379/1"
result_backend = "redis://:maverick@82.157.146.194:6379"
# 寫倆就完事了

celery_demo/tasks/task1.py

celery 可以支持非常多的定時(shí)任務(wù),而不同種類的定時(shí)任務(wù)我們一般都會(huì)寫在不同的模塊中(當(dāng)然這里目前只有一個(gè)),然后再將這些模塊組織在一個(gè)單獨(dú)的目錄中。

當(dāng)前只有一個(gè) task1.py,我們隨便往里面寫點(diǎn)東西,當(dāng)然你也可以創(chuàng)建更多的文件。

def add(x, y):
return x + y
def sub(x, y):
 return x - y
def mul(x, y):
return x * y
def div(x, y):
 return x / y

celery_demo/app.py

from celery import Celery
import config
from tasks.task1 import (
 add, sub, mul, div
)
# 指定一個(gè) name 即可
app = Celery("satori")
# 其它參數(shù)通過(guò)加載配置文件的方式指定
# 和 flask 非常類似
app.config_from_object(config)
# 創(chuàng)建任務(wù)工廠,有了任務(wù)工廠才能創(chuàng)建任務(wù)
# 這種方式和裝飾器的方式是等價(jià)的
add = app.task(add)
sub = app.task(sub)
mul = app.task(mul)
div = app.task(div)

然后重新啟動(dòng) worker:

Python強(qiáng)大的任務(wù)調(diào)度框架Celery怎么使用

輸出結(jié)果顯示,任務(wù)工廠都已經(jīng)被加載進(jìn)來(lái)了,然后我們創(chuàng)建任務(wù)并發(fā)送至隊(duì)列。

# 在 celery_demo 目錄下
# 將 app.py 里面的任務(wù)工廠導(dǎo)入進(jìn)來(lái)
>>> from app import add, sub, mul, div
# 然后創(chuàng)建任務(wù)發(fā)送至隊(duì)列,并等待結(jié)果
>>> add.delay(3, 4).get()
7
>>> sub.delay(3, 4).get()
-1
>>> mul.delay(3, 4).get()
12
>>> div.delay(3, 4).get()
0.75
>>>

結(jié)果正常返回了,再來(lái)看看 worker 的輸出,

Python強(qiáng)大的任務(wù)調(diào)度框架Celery怎么使用

多個(gè)任務(wù)都被執(zhí)行了。

發(fā)送任務(wù)時(shí)指定參數(shù)

我們?cè)诎l(fā)送任務(wù)到隊(duì)列的時(shí)候,使用的是 delay 方法,里面直接傳遞函數(shù)所需的參數(shù)即可,那么除了函數(shù)需要的參數(shù)之外,還有沒(méi)有其它參數(shù)呢?

首先 delay 方法實(shí)際上是調(diào)用的 apply_async 方法,并且 delay 方法里面只接收函數(shù)的參數(shù),但是 apply_async 接收的參數(shù)就很多了,我們先來(lái)看看它們的函數(shù)原型:

Python強(qiáng)大的任務(wù)調(diào)度框架Celery怎么使用

delay 方法的 *args 和 **kwargs 就是函數(shù)的參數(shù),它會(huì)傳遞給 apply_async 的 args 和 kwargs。而其它的參數(shù)就是發(fā)送任務(wù)時(shí)所設(shè)置的一些參數(shù),我們這里重點(diǎn)介紹一下 apply_async 的其它參數(shù)。

  • countdown:倒計(jì)時(shí),表示任務(wù)延遲多少秒之后再執(zhí)行,參數(shù)為整型;

  • eta:任務(wù)的開(kāi)始時(shí)間,datetime 類型,如果指定了 countdown,那么這個(gè)參數(shù)就不應(yīng)該再指定;

  • expires:datetime 或者整型,如果到規(guī)定時(shí)間、或者未來(lái)的多少秒之內(nèi),任務(wù)還沒(méi)有發(fā)送到隊(duì)列被 worker 執(zhí)行,那么該任務(wù)將被丟棄;

  • shadow:重新指定任務(wù)的名稱,覆蓋 app.py 創(chuàng)建任務(wù)時(shí)日志上所指定的名字;

  • retry:任務(wù)失敗之后是否重試,bool 類型;

  • retry_policy:重試所采用的策略,如果指定這個(gè)參數(shù),那么 retry 必須要為 True。參數(shù)類型是一個(gè)字典,里面參數(shù)如下:

  • max_retries : 最大重試次數(shù),默認(rèn)為 3 次;

  • interval_start : 重試等待的時(shí)間間隔秒數(shù),默認(rèn)為 0,表示直接重試不等待;

  • interval_step : 每次重試讓重試間隔增加的秒數(shù),可以是數(shù)字或浮點(diǎn)數(shù),默認(rèn)為 0.2;

  • interval_max : 重試間隔最大的秒數(shù),即通過(guò) interval_step 增大到多少秒之后, 就不在增加了, 可以是數(shù)字或者浮點(diǎn)數(shù);

  • routing_key:自定義路由鍵,針對(duì) RabbitMQ;

  • queue:指定發(fā)送到哪個(gè)隊(duì)列,針對(duì) RabbitMQ;

  • exchange:指定發(fā)送到哪個(gè)交換機(jī),針對(duì) RabbitMQ;

  • priority:任務(wù)隊(duì)列的優(yōu)先級(jí),0-9 之間,對(duì)于 RabbitMQ 而言,0是最高級(jí);

  • serializer:任務(wù)序列化方法,通常不設(shè)置;

  • compression:壓縮方案,通常有zlib、bzip2;

  • headers:為任務(wù)添加額外的消息頭;

  • link:任務(wù)成功執(zhí)行后的回調(diào)方法,是一個(gè)signature對(duì)象,可以用作關(guān)聯(lián)任務(wù);

  • link_error: 任務(wù)失敗后的回調(diào)方法,是一個(gè)signature對(duì)象;

我們隨便挑幾個(gè)舉例說(shuō)明:

>>> from app import add
# 使用 apply_async,要注意參數(shù)的傳遞
# 位置參數(shù)使用元組或者列表,關(guān)鍵字參數(shù)使用字典
# 因?yàn)槭莂rgs和kwargs, 不是 *args和 **kwargs
>>> add.apply_async([3], {"y": 4},
... task_,
... countdown=5).get()
7
>>>

查看一下 worker 的輸出:

Python強(qiáng)大的任務(wù)調(diào)度框架Celery怎么使用

注意左邊的時(shí)間,16:25:16 收到的消息,但 5 秒后才執(zhí)行完畢,因?yàn)槲覀儗?countdown 參數(shù)設(shè)置為 5。并且任務(wù)的 id 也被我們修改了。

另外還需要注意一下那些接收時(shí)間的參數(shù),比如 eta。如果我們手動(dòng)指定了eta,那么一定要注意時(shí)區(qū)的問(wèn)題,要保證 celery 所使用的時(shí)區(qū)和你傳遞的 datetime 的時(shí)區(qū)是統(tǒng)一的。

其它的參數(shù)可以自己手動(dòng)測(cè)試一下,這里不細(xì)說(shuō)了,根據(jù)自身的業(yè)務(wù)選擇合適的參數(shù)即可。

創(chuàng)建任務(wù)工廠的另一種方式

之前在創(chuàng)建任務(wù)工廠的時(shí)候,是將函數(shù)導(dǎo)入到 app.py 中,然后通過(guò) add = app.task(add) 的方式手動(dòng)裝飾,因?yàn)橛心男┤蝿?wù)工廠必須要讓 worker 知道,所以一定要在 app.py 里面出現(xiàn)。但是這顯然不夠優(yōu)雅,那么可不可以這么做呢?

# celery_demo/tasks/task1.py
from app import app
# celery_demo 所在路徑位于 sys.path 中
# 因此這里可以直接 from app import app
@app.task
def add(x, y):
 return x + y
@app.task
def sub(x, y):
 return x - y
# celery_demo/app.py
from tasks.task1 import add, sub

按照上面這種做法,理想上可以,但現(xiàn)實(shí)不行,因?yàn)闀?huì)發(fā)生循環(huán)導(dǎo)入。

所以 celery 提供了一個(gè)辦法,我們依舊在 task1.py 中 import app,但在 app.py 中不再使用 import,而是通過(guò) include 加載的方式,我們看一下:

# celery_demo/tasks/task1.py
from app import app
@app.task
def add(x, y):
 return x + y
@app.task
def sub(x, y):
 return x - y
# celery_demo/app.py
from celery import Celery
import config
# 通過(guò) include 指定存放任務(wù)的 py 文件
# 注意它和 worker 啟動(dòng)路徑之間的關(guān)系
# 我們是在 celery_demo 目錄下啟動(dòng)的 worker
# 所以應(yīng)該寫成 "tasks.task1"
# 如果是在 celery_demo 的上一級(jí)目錄啟動(dòng) worker
# 那么這里就要指定為 "celery_demo.tasks.task1"
# 當(dāng)然啟動(dòng)時(shí)的 -A app 也要換成 -A celery_demo.app
app = Celery(__name__, include=["tasks.task1"])
# 如果還有其它文件,比如 task2.py, task3.py
# 那么就把 "tasks.task2", "tasks.task3" 加進(jìn)去
app.config_from_object(config)

在 celery_demo 目錄下重新啟動(dòng) worker。

Python強(qiáng)大的任務(wù)調(diào)度框架Celery怎么使用

為了方便,我們只保留了兩個(gè)任務(wù)工廠??梢钥吹酱藭r(shí)就成功啟動(dòng)了,并且也更加方便和優(yōu)雅一些。之前是在 task1.py 中定義函數(shù),然后再把 task1.py 中的函數(shù)導(dǎo)入到 app.py 里面,然后手動(dòng)進(jìn)行裝飾。雖然這么做是沒(méi)問(wèn)題的,但很明顯這種做法不適合管理。

所以還是要將 app.py 中的 app 導(dǎo)入到 task1.py 中直接創(chuàng)建任務(wù)工廠,但如果再將 task1.py 中的任務(wù)工廠導(dǎo)入到 app.py 中就會(huì)發(fā)生循環(huán)導(dǎo)入。于是 celery 提供了一個(gè) include 參數(shù),可以在創(chuàng)建 app 的時(shí)候自動(dòng)將里面所有的任務(wù)工廠加載進(jìn)來(lái),然后啟動(dòng)并告訴 worker。

我們來(lái)測(cè)試一下:

# 通過(guò) tasks.task1 導(dǎo)入任務(wù)工廠
# 然后創(chuàng)建任務(wù),發(fā)送至隊(duì)列
>>> from tasks.task1 import add, sub
>>> add.delay(11, 22).get()
33
>>> sub.delay(11, 22).get()
-11
>>>

查看一下 worker 的輸出:

Python強(qiáng)大的任務(wù)調(diào)度框架Celery怎么使用

結(jié)果一切正常。

Task 對(duì)象

我們之前通過(guò)對(duì)一個(gè)函數(shù)使用 @app.task 即可將其變成一個(gè)任務(wù)工廠,而這個(gè)任務(wù)工廠就是一個(gè) Task 實(shí)例對(duì)象。而我們?cè)谑褂?@app.task 的時(shí)候,其實(shí)是可以加上很多的參數(shù)的,常用參數(shù)如下:

name:默認(rèn)的任務(wù)名是一個(gè)uuid,我們可以通過(guò) name 參數(shù)指定任務(wù)名,當(dāng)然這個(gè) name 就是 apply_async 的參數(shù) name。如果在 apply_async 中指定了,那么以 apply_async 指定的為準(zhǔn);

  • bind:一個(gè) bool 值,表示是否和任務(wù)工廠進(jìn)行綁定。如果綁定,任務(wù)工廠會(huì)作為參數(shù)傳遞到方法中;

  • base:定義任務(wù)的基類,用于定義回調(diào)函數(shù),當(dāng)任務(wù)到達(dá)某個(gè)狀態(tài)時(shí)觸發(fā)不同的回調(diào)函數(shù),默認(rèn)是 Task,所以我們一般會(huì)自己寫一個(gè)類然后繼承 Task;

  • default_retry_delay:設(shè)置該任務(wù)重試的延遲機(jī)制,當(dāng)任務(wù)執(zhí)行失敗后,會(huì)自動(dòng)重試,單位是秒,默認(rèn)是3分鐘;

  • serializer:指定序列化的方法;

當(dāng)然 app.task 還有很多不常用的參數(shù),這里就不說(shuō)了,有興趣可以去查看官網(wǎng)或源碼,我們演示一下幾個(gè)常用的參數(shù):

# celery_demo/tasks/task1.py
from app import app
@app.task(name="你好")
def add(x, y):
 return x + y
@app.task(name="我不好", bind=True)
def sub(self, x, y):
 """
 如果 bind=True,則需要多指定一個(gè) self
 這個(gè) self 就是對(duì)應(yīng)的任務(wù)工廠
 """
 # self.request 是一個(gè) celery.task.Context 對(duì)象
 # 獲取它的屬性字典,即可拿到該任務(wù)的所有屬性
 print(self.request.__dict__)
 return x - y

其它代碼不變,我們重新啟動(dòng) worker:

Python強(qiáng)大的任務(wù)調(diào)度框架Celery怎么使用

然后創(chuàng)建任務(wù)發(fā)送至隊(duì)列,再由 worker 取出執(zhí)行:

>>> from tasks.task1 import add, sub
>>> add.delay(111, 222).get()
333
>>> sub.delay(111, 222).get()
-111
>>>

執(zhí)行沒(méi)有問(wèn)題,然后看看 worker 的輸出:

Python強(qiáng)大的任務(wù)調(diào)度框架Celery怎么使用

創(chuàng)建任務(wù)工廠時(shí),如果指定了 bind=True,那么執(zhí)行任務(wù)時(shí)會(huì)將任務(wù)工廠本身作為第一個(gè)參數(shù)傳過(guò)去。任務(wù)工廠本質(zhì)上就是 Task 實(shí)例對(duì)象,調(diào)用它的 delay 方法即可創(chuàng)建任務(wù)。

所以如果我們?cè)?sub 內(nèi)部繼續(xù)調(diào)用 self.delay(11, 22),會(huì)有什么后果呢?沒(méi)錯(cuò),worker 會(huì)進(jìn)入無(wú)限遞歸。因?yàn)閳?zhí)行任務(wù)的時(shí)候,在任務(wù)的內(nèi)部又創(chuàng)建了任務(wù),所以會(huì)死循環(huán)下去。

當(dāng)然 self 還有很多其它屬性和方法,具體有哪些可以通過(guò) Task 這個(gè)類來(lái)查看。這里面比較重要的是 self.request,它包含了某個(gè)具體任務(wù)的相關(guān)信息,而且信息非常多。

Python強(qiáng)大的任務(wù)調(diào)度框架Celery怎么使用

比如當(dāng)前傳遞的參數(shù)是什么,就可以通過(guò) self.request 拿到。當(dāng)然啦,self.request 是一個(gè) Context 對(duì)象,因?yàn)椴煌蝿?wù)獲取 self.request 的結(jié)果肯定是不同的,但 self(任務(wù)工廠)卻只有一個(gè),所以要基于 Context 進(jìn)行隔離。

我們可以通過(guò) __dict__ 拿到 Context 對(duì)象的屬性字典,然后再進(jìn)行操作。

最后再來(lái)說(shuō)一說(shuō) @app.task 里面的 base 參數(shù)。

# celery_demo/tasks/task1.py
from celery import app
from app import Task
class MyTask(Task):
 """
 自定義一個(gè)類,繼承自celery.Task
 exc: 失敗時(shí)的錯(cuò)誤的類型;
 task_id: 任務(wù)的id;
 args: 任務(wù)函數(shù)的位置參數(shù);
 kwargs: 任務(wù)函數(shù)的關(guān)鍵字參數(shù);
 einfo: 失敗時(shí)的異常詳細(xì)信息;
 retval: 任務(wù)成功執(zhí)行的返回值;
 """
 def on_failure(self, exc, task_id, args, kwargs, einfo):
 """任務(wù)失敗時(shí)執(zhí)行"""
 def on_success(self, retval, task_id, args, kwargs):
 """任務(wù)成功時(shí)執(zhí)行"""
 print("任務(wù)執(zhí)行成功")
 def on_retry(self, exc, task_id, args, kwargs, einfo):
 """任務(wù)重試時(shí)執(zhí)行"""
# 使用 @app.task 的時(shí)候,指定 base 即可
# 然后任務(wù)在執(zhí)行的時(shí)候,會(huì)觸發(fā) MyTask 里面的回調(diào)函數(shù)
@app.task(name="地靈殿", base=MyTask)
def add(x, y):
 print("加法計(jì)算")
 return x + y

重新啟動(dòng) worker,然后創(chuàng)建任務(wù)。

Python強(qiáng)大的任務(wù)調(diào)度框架Celery怎么使用

指定了 base,任務(wù)在執(zhí)行的時(shí)候會(huì)根據(jù)執(zhí)行狀態(tài)的不同,觸發(fā) MyTask 里面的不同方法。

自定義任務(wù)流

有時(shí)候我們也可以將執(zhí)行的多個(gè)任務(wù),劃分到一個(gè)組中。

# celery_demo/tasks/task1.py
from app import app
@app.task()
def add(x, y):
 print("加法計(jì)算")
 return x + y
@app.task()
def sub(x, y):
 print("減法計(jì)算")
 return x - y
@app.task()
def mul(x, y):
 print("乘法計(jì)算")
 return x * y
@app.task()
def div(x, y):
 print("除法計(jì)算")
 return x // y

老規(guī)矩,重啟 worker,因?yàn)槲覀冃薷牧巳蝿?wù)工廠。

然后來(lái)導(dǎo)入它們,創(chuàng)建任務(wù),并將這些任務(wù)劃分到一個(gè)組中。

>>> from tasks.task1 import add, sub, mul, div
>>> from celery import group
# 調(diào)用 signature 方法,得到 signature 對(duì)象
# 此時(shí) t1.delay() 和 add.delay(2, 3) 是等價(jià)的
>>> t1 = add.signature(args=(2, 3))
>>> t2 = sub.signature(args=(2, 3))
>>> t3 = mul.signature(args=(2, 3))
>>> t4 = div.signature(args=(4, 2))
# 但是變成 signature 對(duì)象之后,
# 我們可以將其放到一個(gè)組里面
>>> gp = group(t1, t2, t3, t4)
# 執(zhí)行組任務(wù)
# 返回 celery.result.GroupResult 對(duì)象
>>> res = gp()
# 每個(gè)組也有一個(gè)唯一 id
>>> print("組id:", res.id)
組id: 65f32cc4-b8ce-4bf8-916b-f5cc359a901a
# 調(diào)用 get 方法也會(huì)阻塞,知道組里面任務(wù)全部完成
>>> print("組結(jié)果:", res.get())
組結(jié)果: [5, -1, 6, 2]
>>>

可以看到整個(gè)組也是有唯一 id 的,另外 signature 也可以寫成 subtask 或者 s,在源碼里面這幾個(gè)是等價(jià)的。

Python強(qiáng)大的任務(wù)調(diào)度框架Celery怎么使用

我們觀察一下 worker 的輸出,任務(wù)是并發(fā)執(zhí)行的,所以哪個(gè)先完成不好說(shuō)。但是調(diào)用組的 get 方法時(shí),里面的返回值順序一定和任務(wù)添加時(shí)候的順序保持一致。

除此之外,celery 還支持將多個(gè)任務(wù)像鏈子一樣串起來(lái),第一個(gè)任務(wù)的輸出會(huì)作為第二個(gè)任務(wù)的輸入,傳遞給下一個(gè)任務(wù)的第一個(gè)參數(shù)。

# celery_demo/tasks/task1.py
from app import app
@app.task
def task1():
 l = []
 return l
@app.task
# task1 的返回值會(huì)傳遞給這里的 task1_return
def task2(task1_return, value):
 task1_return.append(value)
 return task1_return
@app.task
# task2 的返回值會(huì)傳遞給這里的 task2_return
def task3(task2_return, num):
 return [i + num for i in task2_return]
@app.task
# task3 的返回值會(huì)傳遞給這里的 task3_return
def task4(task3_return):
 return sum(task3_return)

然后我們看怎么將這些任務(wù)像鏈子一樣串起來(lái)。

>>> from tasks.task1 import *
>>> from celery import chain
# 將多個(gè) signature 對(duì)象進(jìn)行與運(yùn)算
# 當(dāng)然內(nèi)部肯定重寫了 __or__ 這個(gè)魔法方法
>>> my_chain = chain(
... task1.s() | task2.s(123) | task3.s(5) | task4.s())
>>>
# 執(zhí)行任務(wù)鏈
>>> res = my_chain()
# 獲取返回值
>>> print(res.get())
128
>>>

這種鏈?zhǔn)教幚淼膱?chǎng)景非常常見(jiàn),比如 MapReduce。

celery 實(shí)現(xiàn)定時(shí)任務(wù)

既然是定時(shí)任務(wù),那么就意味著 worker 要后臺(tái)啟動(dòng),否則一旦遠(yuǎn)程連接斷開(kāi),就停掉了。因此 celery 是支持我們后臺(tái)啟動(dòng)的,并且可以啟動(dòng)多個(gè)。

# 啟動(dòng) worker
celery multi start w1 -A app -l info
# 可以同時(shí)啟動(dòng)多個(gè)
celery multi start w2 w3 -A app -l info
# 以上我們就啟動(dòng)了 3 個(gè) worker
# 如果想停止的話
celery multi stop w1 w2 w3 -A app -l info

但是注意,這種啟動(dòng)方式在 Windows 上面不支持,因?yàn)?celery 會(huì)默認(rèn)創(chuàng)建兩個(gè)目錄,分別是 /var/log/celery 和 /var/run/celery,顯然這是類 Unix 系統(tǒng)的目錄結(jié)構(gòu)。

Python強(qiáng)大的任務(wù)調(diào)度框架Celery怎么使用

顯然啟動(dòng)和關(guān)閉是沒(méi)有問(wèn)題的,不過(guò)為了更好地觀察到輸出,我們還是用之前的方式,選擇前臺(tái)啟動(dòng)。

然后回顧一下 celery 的架構(gòu),里面除了 producer 之外還有一個(gè) celery beat,也就是調(diào)度器。我們調(diào)用任務(wù)工廠的 delay 方法,手動(dòng)將任務(wù)發(fā)送到隊(duì)列,此時(shí)就相當(dāng)于 producer。如果是設(shè)置定時(shí)任務(wù),那么會(huì)由調(diào)度器自動(dòng)將任務(wù)添加到隊(duì)列。

我們?cè)?tasks 目錄里面再創(chuàng)建一個(gè) period_task1.py 文件。

# celery_demo/tasks/period_task1.py
from celery.schedules import crontab
from app import app
from .task1 import task1, task2, task3, task4
@app.on_after_configure.connect
def period_task(sender, **kwargs):
 # 第一個(gè)參數(shù)為 schedule,可以是 float,或者 crontab
 # crontab 后面會(huì)說(shuō),第二個(gè)參數(shù)是任務(wù),第三個(gè)參數(shù)是名字
 sender.add_periodic_task(10.0, task1.s(),
name="每10秒執(zhí)行一次")
 sender.add_periodic_task(15.0, task2.s("task2"),
name="每15秒執(zhí)行一次")
 sender.add_periodic_task(20.0, task3.s(),
name="每20秒執(zhí)行一次")
 sender.add_periodic_task(
 crontab(hour=18, minute=5, day_of_week=0),
 task4.s("task4"),
 name="每個(gè)星期天的18:05運(yùn)行一次"
 )
# celery_demo/tasks/task1.py
from app import app
@app.task
def task1():
 print("我是task1")
 return "task1你好"
@app.task
def task2(name):
 print(f"我是{name}")
 return f"{name}你好"
@app.task
def task3():
 print("我是task3")
 return "task3你好"
@app.task
def task4(name):
 print(f"我是{name}")
 return f"{name}你好"

既然使用了定時(shí)任務(wù),那么一定要設(shè)置時(shí)區(qū)。

# celery_demo/config.py
broker_url = "redis://:maverick@82.157.146.194:6379/1"
result_backend = "redis://:maverick@82.157.146.194:6379/2"
# 之前說(shuō)過(guò),celery 默認(rèn)使用 utc 時(shí)間
# 其實(shí)我們是可以手動(dòng)禁用的,然后手動(dòng)指定時(shí)區(qū)
enable_utc = False
timezone = "Asia/Shanghai"

最后是修改 app.py,將定時(shí)任務(wù)加進(jìn)去。

from celery import Celery
import config
app = Celery(
 __name__,
 include=["tasks.task1", "tasks.period_task1"])
app.config_from_object(config)

下面就來(lái)啟動(dòng)任務(wù),先來(lái)啟動(dòng) worker,生產(chǎn)上應(yīng)該后臺(tái)啟動(dòng),這里為了看到信息,選擇前臺(tái)啟動(dòng)。

Python強(qiáng)大的任務(wù)調(diào)度框架Celery怎么使用

tasks.task1 里面的 4 個(gè)任務(wù)工廠都被添加進(jìn)來(lái)了,然后再來(lái)啟動(dòng)調(diào)度器。

Python強(qiáng)大的任務(wù)調(diào)度框架Celery怎么使用

調(diào)度器啟動(dòng)之后會(huì)自動(dòng)檢測(cè)定時(shí)任務(wù),如果到時(shí)間了,就發(fā)送到隊(duì)列。而啟動(dòng)調(diào)度器的命令如下:

Python強(qiáng)大的任務(wù)調(diào)度框架Celery怎么使用

根據(jù)調(diào)度器的輸出內(nèi)容,我們知道定時(shí)任務(wù)執(zhí)行完了,但很明顯定時(shí)任務(wù)本質(zhì)上也是任務(wù),只不過(guò)有定時(shí)功能,但也要發(fā)到隊(duì)列里面。然后 worker 從隊(duì)列里面取出任務(wù),并執(zhí)行,那么 worker 必然會(huì)有信息輸出。

Python強(qiáng)大的任務(wù)調(diào)度框架Celery怎么使用

調(diào)度器啟動(dòng)到現(xiàn)在已經(jīng)有一段時(shí)間了,worker 在終端中輸出了非常多的信息。

此時(shí)我們就成功實(shí)現(xiàn)了定時(shí)任務(wù),并且是通過(guò)定義函數(shù)、打上裝飾器的方式實(shí)現(xiàn)的。除此之外,我們還可以通過(guò)配置的方式實(shí)現(xiàn)。

# celery_demo/tasks/period_task1.py
from celery.schedules import crontab
from app import app
# 此時(shí)也不需顯式導(dǎo)入任務(wù)工廠了
# 直接以字符串的方式指定即可
app.conf.beat_schedule = {
 # 參數(shù)通過(guò) args 和 kwargs 指定
 "每10秒執(zhí)行一次": {"task": "tasks.task1.task1",
"schedule": 10.0},
 "每15秒執(zhí)行一次": {"task": "tasks.task1.task2",
"schedule": 15.0,
"args": ("task2",)},
 "每20秒執(zhí)行一次": {"task": "tasks.task1.task3",
"schedule": 20.0},
 "每個(gè)星期天的18:05運(yùn)行一次": {"task": "tasks.task1.task4",
 "schedule": crontab(hour=18,
 minute=5,
 day_of_week=0),
 "args": ("task4",)}
}

需要注意:雖然我們不用顯式導(dǎo)入任務(wù)工廠,但其實(shí)是 celery 自動(dòng)幫我們導(dǎo)入。由于這些任務(wù)工廠都位于 celery_demo/tasks/task1.py 里面,而 worker 也是在 celery_demo 目錄下啟動(dòng)的,所以需要指定為 tasks.task1.task{1234}。

這種啟動(dòng)方式也是可以成功的,貌似還更方便一些,但是會(huì)多出一個(gè)文件,用來(lái)存儲(chǔ)配置信息。

Python強(qiáng)大的任務(wù)調(diào)度框架Celery怎么使用

crontab 參數(shù)

定時(shí)任務(wù)除了指定一個(gè)浮點(diǎn)數(shù)之外(表示每隔多少秒執(zhí)行一次),還可以指定 crontab。關(guān)于 crontab 應(yīng)該都知道是什么,我們?cè)?Linux 上想啟動(dòng)定時(shí)任務(wù)的話,直接 crontab -e 然后添加即可。

而 celery 的 crontab 和 Linux 高度相似,我們看一下函數(shù)原型就知道了。

Python強(qiáng)大的任務(wù)調(diào)度框架Celery怎么使用

簡(jiǎn)單解釋一下:

  • minute:0-59,表示第幾分鐘觸發(fā),* 表示每分鐘觸發(fā)一次;

  • hour:0-23,表示第幾個(gè)小時(shí)觸發(fā),* 表示每小時(shí)都會(huì)觸發(fā)。比如 minute=2, hour=*,表示每小時(shí)的第二分鐘觸發(fā)一次;

  • day_of_week:0-6,表示一周的第幾天觸發(fā),0 是星期天,1-6 分別是星期一到星期六,不習(xí)慣的話也可以用字符串 mon,tue,wed,thu,fri,sat,sun 表示;

  • month_of_year:當(dāng)前年份的第幾個(gè)月;

以上就是這些參數(shù)的含義,并且參數(shù)接收的值還可以是一些特殊的通配符:

  • *:所有,比如 minute=*,表示每分鐘觸發(fā);

  • */a:所有可被 a 整除的時(shí)候觸發(fā);

  • a-b:a 到 b范圍內(nèi)觸發(fā);

  • a-b/c:范圍 a-b 且能夠被 c 整除的時(shí)候觸發(fā);

  • 2,10,40:比如 minute=2,10,40 表示第 2、10、40 分鐘的時(shí)候觸發(fā);

通配符之間是可以自由組合的,比如 */3,8-17 就表示能被 3 整除,或范圍處于 8-17 的時(shí)候觸發(fā)。

除此之外,還可以根據(jù)天色來(lái)設(shè)置定時(shí)任務(wù)(有點(diǎn)離譜)。

from celery.schedules import solar
app.conf.beat_schedule = {
"日落": {"task": "task1",
 "schedule": solar("sunset",
 -37.81753,
144.96715)
 },
}

solar 里面接收三個(gè)參數(shù),分別是 event、lat、lon,后兩個(gè)比較簡(jiǎn)單,表示觀測(cè)者所在的緯度和經(jīng)度。值大于 0,則對(duì)應(yīng)東經(jīng)/北緯,小于 0,則對(duì)應(yīng)西經(jīng)/南緯。

我們重點(diǎn)看第一個(gè)參數(shù) event,可選值如下:

Python強(qiáng)大的任務(wù)調(diào)度框架Celery怎么使用

比如代碼中的 "sunset", -37.81753, 144.96715 就表示,當(dāng)站在南緯 37.81753、東經(jīng) 144.96715 的地方觀察,發(fā)現(xiàn)傍晚太陽(yáng)的上邊緣消失在西方地平線上的時(shí)候,觸發(fā)任務(wù)執(zhí)行。

“Python強(qiáng)大的任務(wù)調(diào)度框架Celery怎么使用”的內(nèi)容就介紹到這里了,感謝大家的閱讀。如果想了解更多行業(yè)相關(guān)的知識(shí)可以關(guān)注創(chuàng)新互聯(lián)網(wǎng)站,小編將為大家輸出更多高質(zhì)量的實(shí)用文章!

本文名稱:Python強(qiáng)大的任務(wù)調(diào)度框架Celery怎么使用
瀏覽地址:http://chinadenli.net/article46/joieeg.html

成都網(wǎng)站建設(shè)公司_創(chuàng)新互聯(lián),為您提供定制開(kāi)發(fā)、網(wǎng)站制作、網(wǎng)站導(dǎo)航微信小程序、手機(jī)網(wǎng)站建設(shè)、虛擬主機(jī)

廣告

聲明:本網(wǎng)站發(fā)布的內(nèi)容(圖片、視頻和文字)以用戶投稿、用戶轉(zhuǎn)載內(nèi)容為主,如果涉及侵權(quán)請(qǐng)盡快告知,我們將會(huì)在第一時(shí)間刪除。文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如需處理請(qǐng)聯(lián)系客服。電話:028-86922220;郵箱:631063699@qq.com。內(nèi)容未經(jīng)允許不得轉(zhuǎn)載,或轉(zhuǎn)載時(shí)需注明來(lái)源: 創(chuàng)新互聯(lián)

成都定制網(wǎng)站建設(shè)
久久中文字人妻熟女小妇| 日韩一区二区免费在线观看| 在线观看免费无遮挡大尺度视频 | 福利视频一区二区在线| 欧美不雅视频午夜福利| 欧美人妻少妇精品久久性色| 91精品日本在线视频| 亚洲黑人精品一区二区欧美| 日韩免费av一区二区三区| 绝望的校花花间淫事2| 欧美熟妇一区二区在线| 黑丝国产精品一区二区| 精品一区二区三区乱码中文| 欧美黄色成人真人视频| 亚洲中文字幕熟女丝袜久久| 国产麻豆一线二线三线| 老司机亚洲精品一区二区| 午夜国产精品福利在线观看| 国产熟女一区二区三区四区| 青青草草免费在线视频| 真实偷拍一区二区免费视频| 天海翼精品久久中文字幕| 国产色偷丝袜麻豆亚洲| 国产精品制服丝袜美腿丝袜| 精品人妻一区二区三区免费| 欧美精品在线观看国产| 99久久国产亚洲综合精品| 日韩精品小视频在线观看| 欧美日韩久久精品一区二区| 中文字幕日韩无套内射| 五月天丁香婷婷狠狠爱| 久久精品国产熟女精品| 国产精品久久男人的天堂| 免费一区二区三区少妇| 人人妻在人人看人人澡| 欧美激情视频一区二区三区| 欧美亚洲另类久久久精品| 免费大片黄在线观看日本| 久热人妻中文字幕一区二区| 狠狠做五月深爱婷婷综合| 九九热这里只有精品哦|