本篇內(nèi)容介紹了“Python強(qiáng)大的任務(wù)調(diào)度框架Celery怎么使用”的有關(guān)知識(shí),在實(shí)際案例的操作過(guò)程中,不少人都會(huì)遇到這樣的困境,接下來(lái)就讓小編帶領(lǐng)大家學(xué)習(xí)一下如何處理這些情況吧!希望大家仔細(xì)閱讀,能夠?qū)W有所成!
成都創(chuàng)新互聯(lián)公司專注為客戶提供全方位的互聯(lián)網(wǎng)綜合服務(wù),包含不限于成都網(wǎng)站設(shè)計(jì)、成都做網(wǎng)站、九江網(wǎng)絡(luò)推廣、小程序制作、九江網(wǎng)絡(luò)營(yíng)銷、九江企業(yè)策劃、九江品牌公關(guān)、搜索引擎seo、人物專訪、企業(yè)宣傳片、企業(yè)代運(yùn)營(yíng)等,從售前售中售后,我們都將竭誠(chéng)為您服務(wù),您的肯定,是我們最大的嘉獎(jiǎng);成都創(chuàng)新互聯(lián)公司為所有大學(xué)生創(chuàng)業(yè)者提供九江建站搭建服務(wù),24小時(shí)服務(wù)熱線:18982081108,官方網(wǎng)址:chinadenli.net
這次我們來(lái)介紹一下 Python 的一個(gè)第三方模塊 celery,那么 celery 是什么呢?
celery 是一個(gè)靈活且可靠的,處理大量消息的分布式系統(tǒng),可以在多個(gè)節(jié)點(diǎn)之間處理某個(gè)任務(wù);
celery 是一個(gè)專注于實(shí)時(shí)處理的任務(wù)隊(duì)列,支持任務(wù)調(diào)度;
celery 是開(kāi)源的,有很多的使用者;
celery 完全基于 Python 語(yǔ)言編寫;
所以 celery 本質(zhì)上就是一個(gè)任務(wù)調(diào)度框架,類似于 Apache 的 airflow,當(dāng)然 airflow 也是基于 Python 語(yǔ)言編寫。
不過(guò)有一點(diǎn)需要注意,celery 是用來(lái)調(diào)度任務(wù)的,但它本身并不具備存儲(chǔ)任務(wù)的功能,而調(diào)度任務(wù)的時(shí)候肯定是要把任務(wù)存起來(lái)的。因此要使用 celery 的話,還需要搭配一些具備存儲(chǔ)、訪問(wèn)功能的工具,比如:消息隊(duì)列、redis緩存、數(shù)據(jù)庫(kù)等等。官方推薦的是消息隊(duì)列 RabbitMQ,個(gè)人認(rèn)為有些時(shí)候使用 Redis 也是不錯(cuò)的選擇,當(dāng)然我們都會(huì)介紹。
那么 celery 都可以在哪些場(chǎng)景中使用呢?
異步任務(wù):一些耗時(shí)的操作可以交給celery異步執(zhí)行,而不用等著程序處理完才知道結(jié)果。比如:視頻轉(zhuǎn)碼、郵件發(fā)送、消息推送等等;
定時(shí)任務(wù):比如定時(shí)推送消息、定時(shí)爬取數(shù)據(jù)、定時(shí)統(tǒng)計(jì)數(shù)據(jù)等等;
我們看一下 celery 的架構(gòu):
producer:生產(chǎn)者,專門用來(lái)生產(chǎn)任務(wù)(task);
celery beat:任務(wù)調(diào)度器,調(diào)度器進(jìn)程會(huì)讀取配置文件的內(nèi)容,周期性地將配置文件里面到期需要執(zhí)行的任務(wù)發(fā)送給消息隊(duì)列,說(shuō)白了就是生產(chǎn)定時(shí)任務(wù);
broker:任務(wù)隊(duì)列,用于存放生產(chǎn)者和調(diào)度器生產(chǎn)的任務(wù)。一般使用消息隊(duì)列或者 Redis 來(lái)存儲(chǔ),當(dāng)然具有存儲(chǔ)功能的數(shù)據(jù)庫(kù)也是可以的。這一部分是 celery 所不提供的,需要依賴第三方。作用就是接收任務(wù),存進(jìn)隊(duì)列;
worker:任務(wù)的執(zhí)行單元,會(huì)將任務(wù)從隊(duì)列中順序取出并執(zhí)行;
backend:用于在任務(wù)結(jié)束之后保存狀態(tài)信息和結(jié)果,以便查詢,一般是數(shù)據(jù)庫(kù),當(dāng)然只要具備存儲(chǔ)功能都可以作為 backend;
下面我們來(lái)安裝 celery,安裝比較簡(jiǎn)單,直接 pip install celery 即可。這里我本地的 celery 版本是 5.2.7,Python 版本是 3.8.10。
另外,由于 celery 本身不提供任務(wù)存儲(chǔ)的功能,所以這里我們使用 Redis 作為消息隊(duì)列,負(fù)責(zé)存儲(chǔ)任務(wù)。因此你還要在機(jī)器上安裝 Redis,我這里有一臺(tái)云服務(wù)器,已經(jīng)安裝好了。
后續(xù) celery 就會(huì)將任務(wù)存到 broker 里面,當(dāng)然要想實(shí)現(xiàn)這一點(diǎn),就必須還要有能夠操作相應(yīng) broker 的驅(qū)動(dòng)。Python 操作 Redis 的驅(qū)動(dòng)也叫 redis,操作 RabbitMQ 的驅(qū)動(dòng)叫 pika,直接 pip install ... 安裝即可。
我們新建一個(gè)工程,就叫 celery_demo,然后在里面新建一個(gè) app.py 文件。
# 文件名:app.py import time # 這個(gè) Celery 就類似于 flask.Flask # 然后實(shí)例化得到一個(gè)app from celery import Celery # 指定一個(gè) name、以及 broker 的地址、backend 的地址 app = Celery( "satori", # 這里使用我服務(wù)器上的 Redis # broker 用 1 號(hào)庫(kù), backend 用 2 號(hào)庫(kù) broker="redis://:maverick@82.157.146.194:6379/1", backend="redis://:maverick@82.157.146.194:6379/2") # 這里通過(guò) @app.task 對(duì)函數(shù)進(jìn)行裝飾 # 那么之后我們便可調(diào)用 task.delay 創(chuàng)建一個(gè)任務(wù) @app.task def task(name, age): print("準(zhǔn)備執(zhí)行任務(wù)啦") time.sleep(3) return f"name is {name}, age is {age}"
我們說(shuō)執(zhí)行任務(wù)的對(duì)象是 worker,那么我們是不是需要?jiǎng)?chuàng)建一個(gè) worker 呢?顯然是需要的,而創(chuàng)建 worker 可以使用如下命令創(chuàng)建:
注意:在 5.0 之前我們可以寫成 celery worker -A app ...,也就是把所有的參數(shù)都放在子命令 celery worker 的后面。但從 5.0 開(kāi)始這種做法就不允許了,必須寫成 celery -A app worker ...,因?yàn)?-A 變成了一個(gè)全局參數(shù),所以它不應(yīng)該放在 worker 的后面,而是要放在 worker 的前面。
下面執(zhí)行該命令:
以上就前臺(tái)啟動(dòng)了一個(gè) worker,正在等待從隊(duì)列中獲取任務(wù),圖中也顯示了相應(yīng)的信息。然而此時(shí)隊(duì)列中并沒(méi)有任務(wù),所以我們需要在另一個(gè)文件中創(chuàng)建任務(wù)并發(fā)送到隊(duì)列里面去。
import time from app import task # 從 app 導(dǎo)入 task, 創(chuàng)建任務(wù), 但是注意: 不要直接調(diào)用 task # 因?yàn)槟菢拥脑捑驮诒镜貓?zhí)行了, 我們的目的是將任務(wù)發(fā)送到隊(duì)列里面去 # 然后讓監(jiān)聽(tīng)隊(duì)列的 worker 從隊(duì)列里面取任務(wù)并執(zhí)行 # 而 task 被 @app.task 裝飾, 所以它不再是原來(lái)的 task 了 # 我們需要調(diào)用它的 delay 方法 # 調(diào)用 delay 之后, 就會(huì)創(chuàng)建一個(gè)任務(wù) # 然后發(fā)送到隊(duì)列里面去, 也就是我們這里的 Redis # 至于參數(shù), 普通調(diào)用的時(shí)候怎么傳, 在 delay 里面依舊怎么傳 start = time.perf_counter() task.delay("古明地覺(jué)", 17) print( time.perf_counter() - start )# 0.11716766700000003
然后執(zhí)行該文件,發(fā)現(xiàn)只用了 0.12 秒,而 task 里面明明 sleep 了 3 秒。所以說(shuō)明這一步是不會(huì)阻塞的,調(diào)用 task.delay 只是創(chuàng)建一個(gè)任務(wù)并發(fā)送至隊(duì)列。我們?cè)倏匆幌?worker 的輸出信息:
可以看到任務(wù)已經(jīng)被消費(fèi)者接收并且消費(fèi)了,而且調(diào)用 delay 方法是不會(huì)阻塞的,花費(fèi)的那 0.12 秒是用在了其它地方,比如連接 Redis 發(fā)送任務(wù)等等。
另外需要注意,函數(shù)被 @app.task 裝飾之后,可以理解為它就變成了一個(gè)任務(wù)工廠,因?yàn)楸谎b飾了嘛,然后調(diào)用任務(wù)工廠的 delay 方法即可創(chuàng)建任務(wù)并發(fā)送到隊(duì)列里面。我們也可以創(chuàng)建很多個(gè)任務(wù)工廠,但是這些任務(wù)工廠必須要讓 worker 知道,否則不會(huì)生效。所以如果修改了某個(gè)任務(wù)工廠、或者添加、刪除了某個(gè)任務(wù)工廠,那么一定要讓 worker 知道,而做法就是先停止 celery worker 進(jìn)程,然后再重新啟動(dòng)。
如果我們新建了一個(gè)任務(wù)工廠,然后在沒(méi)有重啟 worker 的情況下,就用調(diào)用它的 delay 方法創(chuàng)建任務(wù)、并發(fā)送到隊(duì)列的話,那么會(huì)拋出一個(gè) KeyError,提示找不到相應(yīng)的任務(wù)工廠。
其實(shí)很好理解,因?yàn)榇a已經(jīng)加載到內(nèi)存里面了,光修改了源文件而不重啟是沒(méi)用的。因?yàn)榧虞d到內(nèi)存里面的還是原來(lái)的代碼,不是修改過(guò)后的。
然后我們?cè)賮?lái)看看 Redis 中存儲(chǔ)的信息,1 號(hào)庫(kù)用作 broker,負(fù)責(zé)存儲(chǔ)任務(wù);2 號(hào)庫(kù)用作 backend,負(fù)責(zé)存儲(chǔ)執(zhí)行結(jié)果。我們來(lái)看 2 號(hào)庫(kù):
以上我們就啟動(dòng)了一個(gè) worker 并成功消費(fèi)了隊(duì)列中的任務(wù),并且還從 Redis 里面拿到了執(zhí)行信息。當(dāng)然啦,如果只能通過(guò)查詢 backend 才能拿到信息的話,那 celery 就太不智能了。我們也可以直接從程序中獲取。
Redis(backend)里面存儲(chǔ)了很多關(guān)于任務(wù)的信息,這些信息我們可以直接在程序中獲取。
from app import task res = task.delay("古明地覺(jué)", 17) print(type(res)) """""" # 直接打印,顯示任務(wù)的 id print(res) """ 4bd48a6d-1f0e-45d6-a225-6884067253c3 """ # 獲取狀態(tài), 顯然此刻沒(méi)有執(zhí)行完 # 因此結(jié)果是PENDING, 表示等待狀態(tài) print(res.status) """ PENDING """ # 獲取 id,兩種方式均可 print(res.task_id) print(res.id) """ 4bd48a6d-1f0e-45d6-a225-6884067253c3 4bd48a6d-1f0e-45d6-a225-6884067253c3 """ # 獲取任務(wù)執(zhí)行結(jié)束時(shí)的時(shí)間 # 任務(wù)還沒(méi)有結(jié)束, 所以返回None print(res.date_done) """ None """ # 獲取任務(wù)的返回值, 可以通過(guò) result 或者 get() # 注意: 如果是 result, 那么任務(wù)還沒(méi)有執(zhí)行完的話會(huì)直接返回 None # 如果是 get(), 那么會(huì)阻塞直到任務(wù)完成 print(res.result) print(res.get()) """ None name is 古明地覺(jué), age is 17 """ # 再次查看狀態(tài)和執(zhí)行結(jié)束時(shí)的時(shí)間 # 發(fā)現(xiàn) status 變成SUCCESS # date_done 變成了執(zhí)行結(jié)束時(shí)的時(shí)間 print(res.status) # 但顯示的是 UTC 時(shí)間 print(res.date_done) """ SUCCESS 2022-09-08 06:40:34.525492 """
另外我們說(shuō)結(jié)果需要存儲(chǔ)在 backend 中,如果沒(méi)有配置 backend,那么獲取結(jié)果的時(shí)候會(huì)報(bào)錯(cuò)。至于 backend,因?yàn)樗谴鎯?chǔ)結(jié)果的,所以一般會(huì)保存在數(shù)據(jù)庫(kù)中,因?yàn)橐志没?。我這里為了方便,就還是保存在 Redis 中。
調(diào)用完任務(wù)工廠的 delay 方法之后,會(huì)創(chuàng)建一個(gè)任務(wù)并發(fā)送至隊(duì)列,同時(shí)返回一個(gè) AsyncResult 對(duì)象,基于此對(duì)象我們可以拿到任務(wù)執(zhí)行時(shí)的所有信息。但是 AsyncResult 對(duì)象我們也可以手動(dòng)構(gòu)造,舉個(gè)例子:
import time # 我們不光要導(dǎo)入 task, 還要導(dǎo)入里面的 app from app import app, task # 導(dǎo)入 AsyncResult 這個(gè)類 from celery.result import AsyncResult # 發(fā)送任務(wù)到隊(duì)列當(dāng)中 res = task.delay("古明地覺(jué)", 17) # 傳入任務(wù)的 id 和 app, 創(chuàng)建 AsyncResult 對(duì)象 async_result = AsyncResult(res.id, app=app) # 此時(shí)的這個(gè) res 和 async_result 之間是等價(jià)的 # 兩者都是 AsyncResult 對(duì)象, 它們所擁有的方法也是一樣的 # 下面用誰(shuí)都可以 while True: # 等價(jià)于async_result.state == "SUCCESS" if async_result.successful(): print(async_result.get()) break # 等價(jià)于async_result.state == "FAILURE" elif async_result.failed(): print("任務(wù)執(zhí)行失敗") elif async_result.status == "PENDING": print("任務(wù)正在被執(zhí)行") elif async_result.status == "RETRY": print("任務(wù)執(zhí)行異常正在重試") elif async_result.status == "REJECTED": print("任務(wù)被拒絕接收") elif async_result.status == "REVOKED": print("任務(wù)被取消") else: print("其它的一些狀態(tài)") time.sleep(0.8) """ 任務(wù)正在被執(zhí)行 任務(wù)正在被執(zhí)行 任務(wù)正在被執(zhí)行 任務(wù)正在被執(zhí)行 name is 古明地覺(jué), age is 17 """
以上就是任務(wù)可能出現(xiàn)的一些狀態(tài),通過(guò)輪詢的方式,我們也可以查看任務(wù)是否已經(jīng)執(zhí)行完畢。當(dāng)然 AsyncResult 還有一些別的方法,我們來(lái)看一下:
from app import task res = task.delay("古明地覺(jué)", 17) # 1. ready():查看任務(wù)狀態(tài),返回布爾值。 # 任務(wù)執(zhí)行完成返回 True,否則為 False # 那么問(wèn)題來(lái)了,它和 successful() 有什么區(qū)別呢? # successful() 是在任務(wù)執(zhí)行成功之后返回 True, 否則返回 False # 而 ready() 只要是任務(wù)沒(méi)有處于阻塞狀態(tài)就會(huì)返回 True # 比如執(zhí)行成功、執(zhí)行失敗、被 worker 拒收都看做是已經(jīng) ready 了 print(res.ready()) """ False """ # 2. wait():和之前的 get 一樣, 因?yàn)樵谠创a中寫了: wait = get # 所以調(diào)用哪個(gè)都可以, 不過(guò) wait 可能會(huì)被移除,建議直接用 get 就行 print(res.wait()) print(res.get()) """ name is 古明地覺(jué), age is 17 name is 古明地覺(jué), age is 17 """ # 3. trackback:如果任務(wù)拋出了一個(gè)異常,可以獲取原始的回溯信息 # 執(zhí)行成功就是 None print(res.traceback) """ None """
以上就是獲取任務(wù)執(zhí)行結(jié)果相關(guān)的部分。
celery 的配置不同,所表現(xiàn)出來(lái)的性能也不同,比如序列化的方式、連接隊(duì)列的方式,單線程、多線程、多進(jìn)程等等。那么 celery 都有那些配置呢?
broker_url:broker 的地址,就是類 Celery 里面?zhèn)魅氲?broker 參數(shù)。
result_backend:存儲(chǔ)結(jié)果地址,就是類 Celery 里面?zhèn)魅氲?backend 參數(shù)。
task_serializer:任務(wù)序列化方式,支持以下幾種:
binary:二進(jìn)制序列化方式,pickle 模塊默認(rèn)的序列化方法;
json:支持多種語(yǔ)言,可解決多語(yǔ)言的問(wèn)題,但通用性不高;
xml:標(biāo)簽語(yǔ)言,和 json 定位相似;
msgpack:二進(jìn)制的類 json 序列化,但比 json 更小、更快;
yaml:表達(dá)能力更強(qiáng)、支持的類型更多,但是在 Python里面的性能不如 json;
根據(jù)情況,選擇合適的類型。如果不是跨語(yǔ)言的話,直接選擇 binary 即可,默認(rèn)是 json。
result_serializer:任務(wù)執(zhí)行結(jié)果序列化方式,支持的方式和任務(wù)序列化方式一致。
result_expires:任務(wù)結(jié)果的過(guò)期時(shí)間,單位是秒。
accept_content:指定任務(wù)接受的內(nèi)容序列化類型(序列化),一個(gè)列表,比如:["msgpack", "binary", "json"]。
timezone:時(shí)區(qū),默認(rèn)是 UTC 時(shí)區(qū)。
enable_utc:是否開(kāi)啟 UTC 時(shí)區(qū),默認(rèn)為 True;如果為 False,則使用本地時(shí)區(qū)。
task_publish_retry:發(fā)送消息失敗時(shí)是否重試,默認(rèn)為 True。
worker_concurrency:并發(fā)的 worker 數(shù)量。
worker_prefetch_multiplier:每次 worker 從任務(wù)隊(duì)列中獲取的任務(wù)數(shù)量。
worker_max_tasks_per_child:每個(gè) worker 執(zhí)行多少次就會(huì)被殺掉,默認(rèn)是無(wú)限的。
task_time_limit:?jiǎn)蝹€(gè)任務(wù)執(zhí)行的最大時(shí)間,單位是秒。
task_default_queue :設(shè)置默認(rèn)的隊(duì)列名稱,如果一個(gè)消息不符合其它的隊(duì)列規(guī)則,就會(huì)放在默認(rèn)隊(duì)列里面。如果什么都不設(shè)置的話,數(shù)據(jù)都會(huì)發(fā)送到默認(rèn)的隊(duì)列中。
task_queues :設(shè)置詳細(xì)的隊(duì)列
# 將 RabbitMQ 作為 broker 時(shí)需要使用 task_queues = { # 這是指定的默認(rèn)隊(duì)列 "default": { "exchange": "default", "exchange_type": "direct", "routing_key": "default" }, # 凡是 topic 開(kāi)頭的 routing key # 都會(huì)被放到這個(gè)隊(duì)列 "topicqueue": { "routing_key": "topic.#", "exchange": "topic_exchange", "exchange_type": "topic", }, "task_eeg": { # 設(shè)置扇形交換機(jī) "exchange": "tasks", "exchange_type": "fanout", "binding_key": "tasks", }, }
celery 的配置非常多,不止我們上面說(shuō)的那些,更多配置可以查看官網(wǎng),寫的比較詳細(xì)。
https://docs.celeryq.dev/en/latest/userguide/configuration.html#general-settings
值得一提的是,在 5.0 之前配置項(xiàng)都是大寫的,而從 5.0 開(kāi)始配置項(xiàng)改成小寫了。不過(guò)老的寫法目前仍然支持,只是啟動(dòng)的時(shí)候會(huì)拋警告,并且在 6.0 的時(shí)候不再兼容老的寫法。
官網(wǎng)也很貼心地將老版本的配置和新版本的配置羅列了出來(lái),盡管配置有很多,但并不是每一個(gè)都要用,可以根據(jù)自身的業(yè)務(wù)合理選擇。
然后下面我們就根據(jù)配置文件的方式啟動(dòng) celery,當(dāng)前目錄結(jié)構(gòu)如下:
celery_demo/config.py
broker_url = "redis://:maverick@82.157.146.194:6379/1" result_backend = "redis://:maverick@82.157.146.194:6379" # 寫倆就完事了
celery_demo/tasks/task1.py
celery 可以支持非常多的定時(shí)任務(wù),而不同種類的定時(shí)任務(wù)我們一般都會(huì)寫在不同的模塊中(當(dāng)然這里目前只有一個(gè)),然后再將這些模塊組織在一個(gè)單獨(dú)的目錄中。
當(dāng)前只有一個(gè) task1.py,我們隨便往里面寫點(diǎn)東西,當(dāng)然你也可以創(chuàng)建更多的文件。
def add(x, y): return x + y def sub(x, y): return x - y def mul(x, y): return x * y def div(x, y): return x / y
celery_demo/app.py
from celery import Celery import config from tasks.task1 import ( add, sub, mul, div ) # 指定一個(gè) name 即可 app = Celery("satori") # 其它參數(shù)通過(guò)加載配置文件的方式指定 # 和 flask 非常類似 app.config_from_object(config) # 創(chuàng)建任務(wù)工廠,有了任務(wù)工廠才能創(chuàng)建任務(wù) # 這種方式和裝飾器的方式是等價(jià)的 add = app.task(add) sub = app.task(sub) mul = app.task(mul) div = app.task(div)
然后重新啟動(dòng) worker:
輸出結(jié)果顯示,任務(wù)工廠都已經(jīng)被加載進(jìn)來(lái)了,然后我們創(chuàng)建任務(wù)并發(fā)送至隊(duì)列。
# 在 celery_demo 目錄下 # 將 app.py 里面的任務(wù)工廠導(dǎo)入進(jìn)來(lái) >>> from app import add, sub, mul, div # 然后創(chuàng)建任務(wù)發(fā)送至隊(duì)列,并等待結(jié)果 >>> add.delay(3, 4).get() 7 >>> sub.delay(3, 4).get() -1 >>> mul.delay(3, 4).get() 12 >>> div.delay(3, 4).get() 0.75 >>>
結(jié)果正常返回了,再來(lái)看看 worker 的輸出,
多個(gè)任務(wù)都被執(zhí)行了。
我們?cè)诎l(fā)送任務(wù)到隊(duì)列的時(shí)候,使用的是 delay 方法,里面直接傳遞函數(shù)所需的參數(shù)即可,那么除了函數(shù)需要的參數(shù)之外,還有沒(méi)有其它參數(shù)呢?
首先 delay 方法實(shí)際上是調(diào)用的 apply_async 方法,并且 delay 方法里面只接收函數(shù)的參數(shù),但是 apply_async 接收的參數(shù)就很多了,我們先來(lái)看看它們的函數(shù)原型:
delay 方法的 *args 和 **kwargs 就是函數(shù)的參數(shù),它會(huì)傳遞給 apply_async 的 args 和 kwargs。而其它的參數(shù)就是發(fā)送任務(wù)時(shí)所設(shè)置的一些參數(shù),我們這里重點(diǎn)介紹一下 apply_async 的其它參數(shù)。
countdown:倒計(jì)時(shí),表示任務(wù)延遲多少秒之后再執(zhí)行,參數(shù)為整型;
eta:任務(wù)的開(kāi)始時(shí)間,datetime 類型,如果指定了 countdown,那么這個(gè)參數(shù)就不應(yīng)該再指定;
expires:datetime 或者整型,如果到規(guī)定時(shí)間、或者未來(lái)的多少秒之內(nèi),任務(wù)還沒(méi)有發(fā)送到隊(duì)列被 worker 執(zhí)行,那么該任務(wù)將被丟棄;
shadow:重新指定任務(wù)的名稱,覆蓋 app.py 創(chuàng)建任務(wù)時(shí)日志上所指定的名字;
retry:任務(wù)失敗之后是否重試,bool 類型;
retry_policy:重試所采用的策略,如果指定這個(gè)參數(shù),那么 retry 必須要為 True。參數(shù)類型是一個(gè)字典,里面參數(shù)如下:
max_retries : 最大重試次數(shù),默認(rèn)為 3 次;
interval_start : 重試等待的時(shí)間間隔秒數(shù),默認(rèn)為 0,表示直接重試不等待;
interval_step : 每次重試讓重試間隔增加的秒數(shù),可以是數(shù)字或浮點(diǎn)數(shù),默認(rèn)為 0.2;
interval_max : 重試間隔最大的秒數(shù),即通過(guò) interval_step 增大到多少秒之后, 就不在增加了, 可以是數(shù)字或者浮點(diǎn)數(shù);
routing_key:自定義路由鍵,針對(duì) RabbitMQ;
queue:指定發(fā)送到哪個(gè)隊(duì)列,針對(duì) RabbitMQ;
exchange:指定發(fā)送到哪個(gè)交換機(jī),針對(duì) RabbitMQ;
priority:任務(wù)隊(duì)列的優(yōu)先級(jí),0-9 之間,對(duì)于 RabbitMQ 而言,0是最高級(jí);
serializer:任務(wù)序列化方法,通常不設(shè)置;
compression:壓縮方案,通常有zlib、bzip2;
headers:為任務(wù)添加額外的消息頭;
link:任務(wù)成功執(zhí)行后的回調(diào)方法,是一個(gè)signature對(duì)象,可以用作關(guān)聯(lián)任務(wù);
link_error: 任務(wù)失敗后的回調(diào)方法,是一個(gè)signature對(duì)象;
我們隨便挑幾個(gè)舉例說(shuō)明:
>>> from app import add # 使用 apply_async,要注意參數(shù)的傳遞 # 位置參數(shù)使用元組或者列表,關(guān)鍵字參數(shù)使用字典 # 因?yàn)槭莂rgs和kwargs, 不是 *args和 **kwargs >>> add.apply_async([3], {"y": 4}, ... task_, ... countdown=5).get() 7 >>>
查看一下 worker 的輸出:
注意左邊的時(shí)間,16:25:16 收到的消息,但 5 秒后才執(zhí)行完畢,因?yàn)槲覀儗?countdown 參數(shù)設(shè)置為 5。并且任務(wù)的 id 也被我們修改了。
另外還需要注意一下那些接收時(shí)間的參數(shù),比如 eta。如果我們手動(dòng)指定了eta,那么一定要注意時(shí)區(qū)的問(wèn)題,要保證 celery 所使用的時(shí)區(qū)和你傳遞的 datetime 的時(shí)區(qū)是統(tǒng)一的。
其它的參數(shù)可以自己手動(dòng)測(cè)試一下,這里不細(xì)說(shuō)了,根據(jù)自身的業(yè)務(wù)選擇合適的參數(shù)即可。
之前在創(chuàng)建任務(wù)工廠的時(shí)候,是將函數(shù)導(dǎo)入到 app.py 中,然后通過(guò) add = app.task(add) 的方式手動(dòng)裝飾,因?yàn)橛心男┤蝿?wù)工廠必須要讓 worker 知道,所以一定要在 app.py 里面出現(xiàn)。但是這顯然不夠優(yōu)雅,那么可不可以這么做呢?
# celery_demo/tasks/task1.py from app import app # celery_demo 所在路徑位于 sys.path 中 # 因此這里可以直接 from app import app @app.task def add(x, y): return x + y @app.task def sub(x, y): return x - y # celery_demo/app.py from tasks.task1 import add, sub
按照上面這種做法,理想上可以,但現(xiàn)實(shí)不行,因?yàn)闀?huì)發(fā)生循環(huán)導(dǎo)入。
所以 celery 提供了一個(gè)辦法,我們依舊在 task1.py 中 import app,但在 app.py 中不再使用 import,而是通過(guò) include 加載的方式,我們看一下:
# celery_demo/tasks/task1.py from app import app @app.task def add(x, y): return x + y @app.task def sub(x, y): return x - y # celery_demo/app.py from celery import Celery import config # 通過(guò) include 指定存放任務(wù)的 py 文件 # 注意它和 worker 啟動(dòng)路徑之間的關(guān)系 # 我們是在 celery_demo 目錄下啟動(dòng)的 worker # 所以應(yīng)該寫成 "tasks.task1" # 如果是在 celery_demo 的上一級(jí)目錄啟動(dòng) worker # 那么這里就要指定為 "celery_demo.tasks.task1" # 當(dāng)然啟動(dòng)時(shí)的 -A app 也要換成 -A celery_demo.app app = Celery(__name__, include=["tasks.task1"]) # 如果還有其它文件,比如 task2.py, task3.py # 那么就把 "tasks.task2", "tasks.task3" 加進(jìn)去 app.config_from_object(config)
在 celery_demo 目錄下重新啟動(dòng) worker。
為了方便,我們只保留了兩個(gè)任務(wù)工廠??梢钥吹酱藭r(shí)就成功啟動(dòng)了,并且也更加方便和優(yōu)雅一些。之前是在 task1.py 中定義函數(shù),然后再把 task1.py 中的函數(shù)導(dǎo)入到 app.py 里面,然后手動(dòng)進(jìn)行裝飾。雖然這么做是沒(méi)問(wèn)題的,但很明顯這種做法不適合管理。
所以還是要將 app.py 中的 app 導(dǎo)入到 task1.py 中直接創(chuàng)建任務(wù)工廠,但如果再將 task1.py 中的任務(wù)工廠導(dǎo)入到 app.py 中就會(huì)發(fā)生循環(huán)導(dǎo)入。于是 celery 提供了一個(gè) include 參數(shù),可以在創(chuàng)建 app 的時(shí)候自動(dòng)將里面所有的任務(wù)工廠加載進(jìn)來(lái),然后啟動(dòng)并告訴 worker。
我們來(lái)測(cè)試一下:
# 通過(guò) tasks.task1 導(dǎo)入任務(wù)工廠 # 然后創(chuàng)建任務(wù),發(fā)送至隊(duì)列 >>> from tasks.task1 import add, sub >>> add.delay(11, 22).get() 33 >>> sub.delay(11, 22).get() -11 >>>
查看一下 worker 的輸出:
結(jié)果一切正常。
我們之前通過(guò)對(duì)一個(gè)函數(shù)使用 @app.task 即可將其變成一個(gè)任務(wù)工廠,而這個(gè)任務(wù)工廠就是一個(gè) Task 實(shí)例對(duì)象。而我們?cè)谑褂?@app.task 的時(shí)候,其實(shí)是可以加上很多的參數(shù)的,常用參數(shù)如下:
name:默認(rèn)的任務(wù)名是一個(gè)uuid,我們可以通過(guò) name 參數(shù)指定任務(wù)名,當(dāng)然這個(gè) name 就是 apply_async 的參數(shù) name。如果在 apply_async 中指定了,那么以 apply_async 指定的為準(zhǔn);
bind:一個(gè) bool 值,表示是否和任務(wù)工廠進(jìn)行綁定。如果綁定,任務(wù)工廠會(huì)作為參數(shù)傳遞到方法中;
base:定義任務(wù)的基類,用于定義回調(diào)函數(shù),當(dāng)任務(wù)到達(dá)某個(gè)狀態(tài)時(shí)觸發(fā)不同的回調(diào)函數(shù),默認(rèn)是 Task,所以我們一般會(huì)自己寫一個(gè)類然后繼承 Task;
default_retry_delay:設(shè)置該任務(wù)重試的延遲機(jī)制,當(dāng)任務(wù)執(zhí)行失敗后,會(huì)自動(dòng)重試,單位是秒,默認(rèn)是3分鐘;
serializer:指定序列化的方法;
當(dāng)然 app.task 還有很多不常用的參數(shù),這里就不說(shuō)了,有興趣可以去查看官網(wǎng)或源碼,我們演示一下幾個(gè)常用的參數(shù):
# celery_demo/tasks/task1.py from app import app @app.task(name="你好") def add(x, y): return x + y @app.task(name="我不好", bind=True) def sub(self, x, y): """ 如果 bind=True,則需要多指定一個(gè) self 這個(gè) self 就是對(duì)應(yīng)的任務(wù)工廠 """ # self.request 是一個(gè) celery.task.Context 對(duì)象 # 獲取它的屬性字典,即可拿到該任務(wù)的所有屬性 print(self.request.__dict__) return x - y
其它代碼不變,我們重新啟動(dòng) worker:
然后創(chuàng)建任務(wù)發(fā)送至隊(duì)列,再由 worker 取出執(zhí)行:
>>> from tasks.task1 import add, sub >>> add.delay(111, 222).get() 333 >>> sub.delay(111, 222).get() -111 >>>
執(zhí)行沒(méi)有問(wèn)題,然后看看 worker 的輸出:
創(chuàng)建任務(wù)工廠時(shí),如果指定了 bind=True,那么執(zhí)行任務(wù)時(shí)會(huì)將任務(wù)工廠本身作為第一個(gè)參數(shù)傳過(guò)去。任務(wù)工廠本質(zhì)上就是 Task 實(shí)例對(duì)象,調(diào)用它的 delay 方法即可創(chuàng)建任務(wù)。
所以如果我們?cè)?sub 內(nèi)部繼續(xù)調(diào)用 self.delay(11, 22),會(huì)有什么后果呢?沒(méi)錯(cuò),worker 會(huì)進(jìn)入無(wú)限遞歸。因?yàn)閳?zhí)行任務(wù)的時(shí)候,在任務(wù)的內(nèi)部又創(chuàng)建了任務(wù),所以會(huì)死循環(huán)下去。
當(dāng)然 self 還有很多其它屬性和方法,具體有哪些可以通過(guò) Task 這個(gè)類來(lái)查看。這里面比較重要的是 self.request,它包含了某個(gè)具體任務(wù)的相關(guān)信息,而且信息非常多。
比如當(dāng)前傳遞的參數(shù)是什么,就可以通過(guò) self.request 拿到。當(dāng)然啦,self.request 是一個(gè) Context 對(duì)象,因?yàn)椴煌蝿?wù)獲取 self.request 的結(jié)果肯定是不同的,但 self(任務(wù)工廠)卻只有一個(gè),所以要基于 Context 進(jìn)行隔離。
我們可以通過(guò) __dict__ 拿到 Context 對(duì)象的屬性字典,然后再進(jìn)行操作。
最后再來(lái)說(shuō)一說(shuō) @app.task 里面的 base 參數(shù)。
# celery_demo/tasks/task1.py from celery import app from app import Task class MyTask(Task): """ 自定義一個(gè)類,繼承自celery.Task exc: 失敗時(shí)的錯(cuò)誤的類型; task_id: 任務(wù)的id; args: 任務(wù)函數(shù)的位置參數(shù); kwargs: 任務(wù)函數(shù)的關(guān)鍵字參數(shù); einfo: 失敗時(shí)的異常詳細(xì)信息; retval: 任務(wù)成功執(zhí)行的返回值; """ def on_failure(self, exc, task_id, args, kwargs, einfo): """任務(wù)失敗時(shí)執(zhí)行""" def on_success(self, retval, task_id, args, kwargs): """任務(wù)成功時(shí)執(zhí)行""" print("任務(wù)執(zhí)行成功") def on_retry(self, exc, task_id, args, kwargs, einfo): """任務(wù)重試時(shí)執(zhí)行""" # 使用 @app.task 的時(shí)候,指定 base 即可 # 然后任務(wù)在執(zhí)行的時(shí)候,會(huì)觸發(fā) MyTask 里面的回調(diào)函數(shù) @app.task(name="地靈殿", base=MyTask) def add(x, y): print("加法計(jì)算") return x + y
重新啟動(dòng) worker,然后創(chuàng)建任務(wù)。
指定了 base,任務(wù)在執(zhí)行的時(shí)候會(huì)根據(jù)執(zhí)行狀態(tài)的不同,觸發(fā) MyTask 里面的不同方法。
有時(shí)候我們也可以將執(zhí)行的多個(gè)任務(wù),劃分到一個(gè)組中。
# celery_demo/tasks/task1.py from app import app @app.task() def add(x, y): print("加法計(jì)算") return x + y @app.task() def sub(x, y): print("減法計(jì)算") return x - y @app.task() def mul(x, y): print("乘法計(jì)算") return x * y @app.task() def div(x, y): print("除法計(jì)算") return x // y
老規(guī)矩,重啟 worker,因?yàn)槲覀冃薷牧巳蝿?wù)工廠。
然后來(lái)導(dǎo)入它們,創(chuàng)建任務(wù),并將這些任務(wù)劃分到一個(gè)組中。
>>> from tasks.task1 import add, sub, mul, div >>> from celery import group # 調(diào)用 signature 方法,得到 signature 對(duì)象 # 此時(shí) t1.delay() 和 add.delay(2, 3) 是等價(jià)的 >>> t1 = add.signature(args=(2, 3)) >>> t2 = sub.signature(args=(2, 3)) >>> t3 = mul.signature(args=(2, 3)) >>> t4 = div.signature(args=(4, 2)) # 但是變成 signature 對(duì)象之后, # 我們可以將其放到一個(gè)組里面 >>> gp = group(t1, t2, t3, t4) # 執(zhí)行組任務(wù) # 返回 celery.result.GroupResult 對(duì)象 >>> res = gp() # 每個(gè)組也有一個(gè)唯一 id >>> print("組id:", res.id) 組id: 65f32cc4-b8ce-4bf8-916b-f5cc359a901a # 調(diào)用 get 方法也會(huì)阻塞,知道組里面任務(wù)全部完成 >>> print("組結(jié)果:", res.get()) 組結(jié)果: [5, -1, 6, 2] >>>
可以看到整個(gè)組也是有唯一 id 的,另外 signature 也可以寫成 subtask 或者 s,在源碼里面這幾個(gè)是等價(jià)的。
我們觀察一下 worker 的輸出,任務(wù)是并發(fā)執(zhí)行的,所以哪個(gè)先完成不好說(shuō)。但是調(diào)用組的 get 方法時(shí),里面的返回值順序一定和任務(wù)添加時(shí)候的順序保持一致。
除此之外,celery 還支持將多個(gè)任務(wù)像鏈子一樣串起來(lái),第一個(gè)任務(wù)的輸出會(huì)作為第二個(gè)任務(wù)的輸入,傳遞給下一個(gè)任務(wù)的第一個(gè)參數(shù)。
# celery_demo/tasks/task1.py from app import app @app.task def task1(): l = [] return l @app.task # task1 的返回值會(huì)傳遞給這里的 task1_return def task2(task1_return, value): task1_return.append(value) return task1_return @app.task # task2 的返回值會(huì)傳遞給這里的 task2_return def task3(task2_return, num): return [i + num for i in task2_return] @app.task # task3 的返回值會(huì)傳遞給這里的 task3_return def task4(task3_return): return sum(task3_return)
然后我們看怎么將這些任務(wù)像鏈子一樣串起來(lái)。
>>> from tasks.task1 import * >>> from celery import chain # 將多個(gè) signature 對(duì)象進(jìn)行與運(yùn)算 # 當(dāng)然內(nèi)部肯定重寫了 __or__ 這個(gè)魔法方法 >>> my_chain = chain( ... task1.s() | task2.s(123) | task3.s(5) | task4.s()) >>> # 執(zhí)行任務(wù)鏈 >>> res = my_chain() # 獲取返回值 >>> print(res.get()) 128 >>>
這種鏈?zhǔn)教幚淼膱?chǎng)景非常常見(jiàn),比如 MapReduce。
既然是定時(shí)任務(wù),那么就意味著 worker 要后臺(tái)啟動(dòng),否則一旦遠(yuǎn)程連接斷開(kāi),就停掉了。因此 celery 是支持我們后臺(tái)啟動(dòng)的,并且可以啟動(dòng)多個(gè)。
# 啟動(dòng) worker celery multi start w1 -A app -l info # 可以同時(shí)啟動(dòng)多個(gè) celery multi start w2 w3 -A app -l info # 以上我們就啟動(dòng)了 3 個(gè) worker # 如果想停止的話 celery multi stop w1 w2 w3 -A app -l info
但是注意,這種啟動(dòng)方式在 Windows 上面不支持,因?yàn)?celery 會(huì)默認(rèn)創(chuàng)建兩個(gè)目錄,分別是 /var/log/celery 和 /var/run/celery,顯然這是類 Unix 系統(tǒng)的目錄結(jié)構(gòu)。
顯然啟動(dòng)和關(guān)閉是沒(méi)有問(wèn)題的,不過(guò)為了更好地觀察到輸出,我們還是用之前的方式,選擇前臺(tái)啟動(dòng)。
然后回顧一下 celery 的架構(gòu),里面除了 producer 之外還有一個(gè) celery beat,也就是調(diào)度器。我們調(diào)用任務(wù)工廠的 delay 方法,手動(dòng)將任務(wù)發(fā)送到隊(duì)列,此時(shí)就相當(dāng)于 producer。如果是設(shè)置定時(shí)任務(wù),那么會(huì)由調(diào)度器自動(dòng)將任務(wù)添加到隊(duì)列。
我們?cè)?tasks 目錄里面再創(chuàng)建一個(gè) period_task1.py 文件。
# celery_demo/tasks/period_task1.py from celery.schedules import crontab from app import app from .task1 import task1, task2, task3, task4 @app.on_after_configure.connect def period_task(sender, **kwargs): # 第一個(gè)參數(shù)為 schedule,可以是 float,或者 crontab # crontab 后面會(huì)說(shuō),第二個(gè)參數(shù)是任務(wù),第三個(gè)參數(shù)是名字 sender.add_periodic_task(10.0, task1.s(), name="每10秒執(zhí)行一次") sender.add_periodic_task(15.0, task2.s("task2"), name="每15秒執(zhí)行一次") sender.add_periodic_task(20.0, task3.s(), name="每20秒執(zhí)行一次") sender.add_periodic_task( crontab(hour=18, minute=5, day_of_week=0), task4.s("task4"), name="每個(gè)星期天的18:05運(yùn)行一次" ) # celery_demo/tasks/task1.py from app import app @app.task def task1(): print("我是task1") return "task1你好" @app.task def task2(name): print(f"我是{name}") return f"{name}你好" @app.task def task3(): print("我是task3") return "task3你好" @app.task def task4(name): print(f"我是{name}") return f"{name}你好"
既然使用了定時(shí)任務(wù),那么一定要設(shè)置時(shí)區(qū)。
# celery_demo/config.py broker_url = "redis://:maverick@82.157.146.194:6379/1" result_backend = "redis://:maverick@82.157.146.194:6379/2" # 之前說(shuō)過(guò),celery 默認(rèn)使用 utc 時(shí)間 # 其實(shí)我們是可以手動(dòng)禁用的,然后手動(dòng)指定時(shí)區(qū) enable_utc = False timezone = "Asia/Shanghai"
最后是修改 app.py,將定時(shí)任務(wù)加進(jìn)去。
from celery import Celery import config app = Celery( __name__, include=["tasks.task1", "tasks.period_task1"]) app.config_from_object(config)
下面就來(lái)啟動(dòng)任務(wù),先來(lái)啟動(dòng) worker,生產(chǎn)上應(yīng)該后臺(tái)啟動(dòng),這里為了看到信息,選擇前臺(tái)啟動(dòng)。
tasks.task1 里面的 4 個(gè)任務(wù)工廠都被添加進(jìn)來(lái)了,然后再來(lái)啟動(dòng)調(diào)度器。
調(diào)度器啟動(dòng)之后會(huì)自動(dòng)檢測(cè)定時(shí)任務(wù),如果到時(shí)間了,就發(fā)送到隊(duì)列。而啟動(dòng)調(diào)度器的命令如下:
根據(jù)調(diào)度器的輸出內(nèi)容,我們知道定時(shí)任務(wù)執(zhí)行完了,但很明顯定時(shí)任務(wù)本質(zhì)上也是任務(wù),只不過(guò)有定時(shí)功能,但也要發(fā)到隊(duì)列里面。然后 worker 從隊(duì)列里面取出任務(wù),并執(zhí)行,那么 worker 必然會(huì)有信息輸出。
調(diào)度器啟動(dòng)到現(xiàn)在已經(jīng)有一段時(shí)間了,worker 在終端中輸出了非常多的信息。
此時(shí)我們就成功實(shí)現(xiàn)了定時(shí)任務(wù),并且是通過(guò)定義函數(shù)、打上裝飾器的方式實(shí)現(xiàn)的。除此之外,我們還可以通過(guò)配置的方式實(shí)現(xiàn)。
# celery_demo/tasks/period_task1.py from celery.schedules import crontab from app import app # 此時(shí)也不需顯式導(dǎo)入任務(wù)工廠了 # 直接以字符串的方式指定即可 app.conf.beat_schedule = { # 參數(shù)通過(guò) args 和 kwargs 指定 "每10秒執(zhí)行一次": {"task": "tasks.task1.task1", "schedule": 10.0}, "每15秒執(zhí)行一次": {"task": "tasks.task1.task2", "schedule": 15.0, "args": ("task2",)}, "每20秒執(zhí)行一次": {"task": "tasks.task1.task3", "schedule": 20.0}, "每個(gè)星期天的18:05運(yùn)行一次": {"task": "tasks.task1.task4", "schedule": crontab(hour=18, minute=5, day_of_week=0), "args": ("task4",)} }
需要注意:雖然我們不用顯式導(dǎo)入任務(wù)工廠,但其實(shí)是 celery 自動(dòng)幫我們導(dǎo)入。由于這些任務(wù)工廠都位于 celery_demo/tasks/task1.py 里面,而 worker 也是在 celery_demo 目錄下啟動(dòng)的,所以需要指定為 tasks.task1.task{1234}。
這種啟動(dòng)方式也是可以成功的,貌似還更方便一些,但是會(huì)多出一個(gè)文件,用來(lái)存儲(chǔ)配置信息。
定時(shí)任務(wù)除了指定一個(gè)浮點(diǎn)數(shù)之外(表示每隔多少秒執(zhí)行一次),還可以指定 crontab。關(guān)于 crontab 應(yīng)該都知道是什么,我們?cè)?Linux 上想啟動(dòng)定時(shí)任務(wù)的話,直接 crontab -e 然后添加即可。
而 celery 的 crontab 和 Linux 高度相似,我們看一下函數(shù)原型就知道了。
簡(jiǎn)單解釋一下:
minute:0-59,表示第幾分鐘觸發(fā),* 表示每分鐘觸發(fā)一次;
hour:0-23,表示第幾個(gè)小時(shí)觸發(fā),* 表示每小時(shí)都會(huì)觸發(fā)。比如 minute=2, hour=*,表示每小時(shí)的第二分鐘觸發(fā)一次;
day_of_week:0-6,表示一周的第幾天觸發(fā),0 是星期天,1-6 分別是星期一到星期六,不習(xí)慣的話也可以用字符串 mon,tue,wed,thu,fri,sat,sun 表示;
month_of_year:當(dāng)前年份的第幾個(gè)月;
以上就是這些參數(shù)的含義,并且參數(shù)接收的值還可以是一些特殊的通配符:
*:所有,比如 minute=*,表示每分鐘觸發(fā);
*/a:所有可被 a 整除的時(shí)候觸發(fā);
a-b:a 到 b范圍內(nèi)觸發(fā);
a-b/c:范圍 a-b 且能夠被 c 整除的時(shí)候觸發(fā);
2,10,40:比如 minute=2,10,40 表示第 2、10、40 分鐘的時(shí)候觸發(fā);
通配符之間是可以自由組合的,比如 */3,8-17 就表示能被 3 整除,或范圍處于 8-17 的時(shí)候觸發(fā)。
除此之外,還可以根據(jù)天色來(lái)設(shè)置定時(shí)任務(wù)(有點(diǎn)離譜)。
from celery.schedules import solar app.conf.beat_schedule = { "日落": {"task": "task1", "schedule": solar("sunset", -37.81753, 144.96715) }, }
solar 里面接收三個(gè)參數(shù),分別是 event、lat、lon,后兩個(gè)比較簡(jiǎn)單,表示觀測(cè)者所在的緯度和經(jīng)度。值大于 0,則對(duì)應(yīng)東經(jīng)/北緯,小于 0,則對(duì)應(yīng)西經(jīng)/南緯。
我們重點(diǎn)看第一個(gè)參數(shù) event,可選值如下:
比如代碼中的 "sunset", -37.81753, 144.96715 就表示,當(dāng)站在南緯 37.81753、東經(jīng) 144.96715 的地方觀察,發(fā)現(xiàn)傍晚太陽(yáng)的上邊緣消失在西方地平線上的時(shí)候,觸發(fā)任務(wù)執(zhí)行。
“Python強(qiáng)大的任務(wù)調(diào)度框架Celery怎么使用”的內(nèi)容就介紹到這里了,感謝大家的閱讀。如果想了解更多行業(yè)相關(guān)的知識(shí)可以關(guān)注創(chuàng)新互聯(lián)網(wǎng)站,小編將為大家輸出更多高質(zhì)量的實(shí)用文章!
本文名稱:Python強(qiáng)大的任務(wù)調(diào)度框架Celery怎么使用
瀏覽地址:http://chinadenli.net/article46/joieeg.html
成都網(wǎng)站建設(shè)公司_創(chuàng)新互聯(lián),為您提供定制開(kāi)發(fā)、網(wǎng)站制作、網(wǎng)站導(dǎo)航、微信小程序、手機(jī)網(wǎng)站建設(shè)、虛擬主機(jī)
聲明:本網(wǎng)站發(fā)布的內(nèi)容(圖片、視頻和文字)以用戶投稿、用戶轉(zhuǎn)載內(nèi)容為主,如果涉及侵權(quán)請(qǐng)盡快告知,我們將會(huì)在第一時(shí)間刪除。文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如需處理請(qǐng)聯(lián)系客服。電話:028-86922220;郵箱:631063699@qq.com。內(nèi)容未經(jīng)允許不得轉(zhuǎn)載,或轉(zhuǎn)載時(shí)需注明來(lái)源: 創(chuàng)新互聯(lián)