來源:古明地覺的編程教室
什么是 celery
這次我們來介紹一下 Python/ target=_blank class=infotextkey>Python 的一個第三方模塊 celery,那么 celery 是什么呢?
-
celery 是一個靈活且可靠的,處理大量消息的分布式系統,可以在多個節點之間處理某個任務;
-
celery 是一個專注于實時處理的任務隊列,支持任務調度;
-
celery 是開源的,有很多的使用者;
-
celery 完全基于 Python 語言編寫;
所以 celery 本質上就是一個任務調度框架,類似于 Apache 的 airflow,當然 airflow 也是基于 Python 語言編寫。
不過有一點需要注意,celery 是用來調度任務的,但它本身并不具備存儲任務的功能,而調度任務的時候肯定是要把任務存起來的。因此要使用 celery 的話,還需要搭配一些具備存儲、訪問功能的工具,比如:消息隊列、redis緩存、數據庫等等。官方推薦的是消息隊列 RabbitMQ,個人認為有些時候使用 Redis 也是不錯的選擇,當然我們都會介紹。
那么 celery 都可以在哪些場景中使用呢?
-
異步任務:一些耗時的操作可以交給celery異步執行,而不用等著程序處理完才知道結果。比如:視頻轉碼、郵件發送、消息推送等等;
-
定時任務:比如定時推送消息、定時爬取數據、定時統計數據等等;
celery 的架構
我們看一下 celery 的架構:
-
producer:生產者,專門用來生產任務(task);
-
celery beat:任務調度器,調度器進程會讀取配置文件的內容,周期性地將配置文件里面到期需要執行的任務發送給消息隊列,說白了就是生產定時任務;
-
broker:任務隊列,用于存放生產者和調度器生產的任務。一般使用消息隊列或者 Redis 來存儲,當然具有存儲功能的數據庫也是可以的。這一部分是 celery 所不提供的,需要依賴第三方。作用就是接收任務,存進隊列;
-
worker:任務的執行單元,會將任務從隊列中順序取出并執行;
-
backend:用于在任務結束之后保存狀態信息和結果,以便查詢,一般是數據庫,當然只要具備存儲功能都可以作為 backend;
下面我們來安裝 celery,安裝比較簡單,直接 pip install celery 即可。這里我本地的 celery 版本是 5.2.7,Python 版本是 3.8.10。
另外,由于 celery 本身不提供任務存儲的功能,所以這里我們使用 Redis 作為消息隊列,負責存儲任務。因此你還要在機器上安裝 Redis,我這里有一臺云服務器,已經安裝好了。
后續 celery 就會將任務存到 broker 里面,當然要想實現這一點,就必須還要有能夠操作相應 broker 的驅動。Python 操作 Redis 的驅動也叫 redis,操作 RabbitMQ 的驅動叫 pika,直接 pip install ... 安裝即可。
celery 實現異步任務
我們新建一個工程,就叫 celery_demo,然后在里面新建一個 App.py 文件。
# 文件名:app.py
import time
# 這個 Celery 就類似于 flask.Flask
# 然后實例化得到一個app
from celery import Celery
# 指定一個 name、以及 broker 的地址、backend 的地址
app = Celery(
"satori",
# 這里使用我服務器上的 Redis
# broker 用 1 號庫, backend 用 2 號庫
broker="redis://:maverick@82.157.146.194:6379/1",
backend="redis://:maverick@82.157.146.194:6379/2")
# 這里通過 @app.task 對函數進行裝飾
# 那么之后我們便可調用 task.delay 創建一個任務
@app.task
def task(name, age):
print("準備執行任務啦")
time.sleep(3)
return f"name is {name}, age is {age}"
我們說執行任務的對象是 worker,那么我們是不是需要創建一個 worker 呢?顯然是需要的,而創建 worker 可以使用如下命令創建:
注意:在 5.0 之前我們可以寫成 celery worker -A app ...,也就是把所有的參數都放在子命令 celery worker 的后面。但從 5.0 開始這種做法就不允許了,必須寫成 celery -A app worker ...,因為 -A 變成了一個全局參數,所以它不應該放在 worker 的后面,而是要放在 worker 的前面。
下面執行該命令:
以上就前臺啟動了一個 worker,正在等待從隊列中獲取任務,圖中也顯示了相應的信息。然而此時隊列中并沒有任務,所以我們需要在另一個文件中創建任務并發送到隊列里面去。
import time
from app import task
# 從 app 導入 task, 創建任務, 但是注意: 不要直接調用 task
# 因為那樣的話就在本地執行了, 我們的目的是將任務發送到隊列里面去
# 然后讓監聽隊列的 worker 從隊列里面取任務并執行
# 而 task 被 @app.task 裝飾, 所以它不再是原來的 task 了
# 我們需要調用它的 delay 方法
# 調用 delay 之后, 就會創建一個任務
# 然后發送到隊列里面去, 也就是我們這里的 Redis
# 至于參數, 普通調用的時候怎么傳, 在 delay 里面依舊怎么傳
start = time.perf_counter()
task.delay("古明地覺", 17)
print(
time.perf_counter() - start
) # 0.11716766700000003
然后執行該文件,發現只用了 0.12 秒,而 task 里面明明 sleep 了 3 秒。所以說明這一步是不會阻塞的,調用 task.delay 只是創建一個任務并發送至隊列。我們再看一下 worker 的輸出信息:
可以看到任務已經被消費者接收并且消費了,而且調用 delay 方法是不會阻塞的,花費的那 0.12 秒是用在了其它地方,比如連接 Redis 發送任務等等。
另外需要注意,函數被 @app.task 裝飾之后,可以理解為它就變成了一個任務工廠,因為被裝飾了嘛,然后調用任務工廠的 delay 方法即可創建任務并發送到隊列里面。我們也可以創建很多個任務工廠,但是這些任務工廠必須要讓 worker 知道,否則不會生效。所以如果修改了某個任務工廠、或者添加、刪除了某個任務工廠,那么一定要讓 worker 知道,而做法就是先停止 celery worker 進程,然后再重新啟動。
如果我們新建了一個任務工廠,然后在沒有重啟 worker 的情況下,就用調用它的 delay 方法創建任務、并發送到隊列的話,那么會拋出一個 KeyError,提示找不到相應的任務工廠。
其實很好理解,因為代碼已經加載到內存里面了,光修改了源文件而不重啟是沒用的。因為加載到內存里面的還是原來的代碼,不是修改過后的。
然后我們再來看看 Redis 中存儲的信息,1 號庫用作 broker,負責存儲任務;2 號庫用作 backend,負責存儲執行結果。我們來看 2 號庫:
以上我們就啟動了一個 worker 并成功消費了隊列中的任務,并且還從 Redis 里面拿到了執行信息。當然啦,如果只能通過查詢 backend 才能拿到信息的話,那 celery 就太不智能了。我們也可以直接從程序中獲取。
直接查詢任務執行信息
Redis(backend)里面存儲了很多關于任務的信息,這些信息我們可以直接在程序中獲取。
from app import task
res = task.delay("古明地覺", 17)
print(type(res))
"""
"""
# 直接打印,顯示任務的 id
print(res)
"""
4bd48a6d-1f0e-45d6-a225-6884067253c3
"""
# 獲取狀態, 顯然此刻沒有執行完
# 因此結果是PENDING, 表示等待狀態
print(res.status)
"""
PENDING
"""
# 獲取 id,兩種方式均可
print(res.task_id)
print(res.id)
"""
4bd48a6d-1f0e-45d6-a225-6884067253c3
4bd48a6d-1f0e-45d6-a225-6884067253c3
"""
# 獲取任務執行結束時的時間
# 任務還沒有結束, 所以返回None
print(res.date_done)
"""
None
"""
# 獲取任務的返回值, 可以通過 result 或者 get()
# 注意: 如果是 result, 那么任務還沒有執行完的話會直接返回 None
# 如果是 get(), 那么會阻塞直到任務完成
print(res.result)
print(res.get())
"""
None
name is 古明地覺, age is 17
"""
# 再次查看狀態和執行結束時的時間
# 發現 status 變成SUCCESS
# date_done 變成了執行結束時的時間
print(res.status)
# 但顯示的是 UTC 時間
print(res.date_done)
"""
SUCCESS
2022-09-08 06:40:34.525492
"""
另外我們說結果需要存儲在 backend 中,如果沒有配置 backend,那么獲取結果的時候會報錯。至于 backend,因為它是存儲結果的,所以一般會保存在數據庫中,因為要持久化。我這里為了方便,就還是保存在 Redis 中。
celery.result.AsyncResult 對象
調用完任務工廠的 delay 方法之后,會創建一個任務并發送至隊列,同時返回一個 AsyncResult 對象,基于此對象我們可以拿到任務執行時的所有信息。但是 AsyncResult 對象我們也可以手動構造,舉個例子:
import time
# 我們不光要導入 task, 還要導入里面的 app
from app import app, task
# 導入 AsyncResult 這個類
from celery.result import AsyncResult
# 發送任務到隊列當中
res = task.delay("古明地覺", 17)
# 傳入任務的 id 和 app, 創建 AsyncResult 對象
async_result = AsyncResult(res.id, app=app)
# 此時的這個 res 和 async_result 之間是等價的
# 兩者都是 AsyncResult 對象, 它們所擁有的方法也是一樣的
# 下面用誰都可以
while True:
# 等價于async_result.state == "SUCCESS"
if async_result.successful():
print(async_result.get())
break
# 等價于async_result.state == "FAILURE"
elif async_result.failed():
print("任務執行失敗")
elif async_result.status == "PENDING":
print("任務正在被執行")
elif async_result.status == "RETRY":
print("任務執行異常正在重試")
elif async_result.status == "REJECTED":
print("任務被拒絕接收")
elif async_result.status == "REVOKED":
print("任務被取消")
else:
print("其它的一些狀態")
time.sleep(0.8)
"""
任務正在被執行
任務正在被執行
任務正在被執行
任務正在被執行
name is 古明地覺, age is 17
"""
以上就是任務可能出現的一些狀態,通過輪詢的方式,我們也可以查看任務是否已經執行完畢。當然 AsyncResult 還有一些別的方法,我們來看一下:
from app import task
res = task.delay("古明地覺", 17)
# 1. ready():查看任務狀態,返回布爾值。
# 任務執行完成返回 True,否則為 False
# 那么問題來了,它和 successful() 有什么區別呢?
# successful() 是在任務執行成功之后返回 True, 否則返回 False
# 而 ready() 只要是任務沒有處于阻塞狀態就會返回 True
# 比如執行成功、執行失敗、被 worker 拒收都看做是已經 ready 了
print(res.ready())
"""
False
"""
# 2. wait():和之前的 get 一樣, 因為在源碼中寫了: wait = get
# 所以調用哪個都可以, 不過 wait 可能會被移除,建議直接用 get 就行
print(res.wait())
print(res.get())
"""
name is 古明地覺, age is 17
name is 古明地覺, age is 17
"""
# 3. trackback:如果任務拋出了一個異常,可以獲取原始的回溯信息
# 執行成功就是 None
print(res.traceback)
"""
None
"""
以上就是獲取任務執行結果相關的部分。
celery 的配置
celery 的配置不同,所表現出來的性能也不同,比如序列化的方式、連接隊列的方式,單線程、多線程、多進程等等。那么 celery 都有那些配置呢?
-
broker_url:broker 的地址,就是類 Celery 里面傳入的 broker 參數。
-
result_backend:存儲結果地址,就是類 Celery 里面傳入的 backend 參數。
-
task_serializer:任務序列化方式,支持以下幾種:
-
-
binary:二進制序列化方式,pickle 模塊默認的序列化方法;
-
json:支持多種語言,可解決多語言的問題,但通用性不高;
-
xml:標簽語言,和 json 定位相似;
-
msgpack:二進制的類 json 序列化,但比 json 更小、更快;
-
yaml:表達能力更強、支持的類型更多,但是在 Python里面的性能不如 json;
根據情況,選擇合適的類型。如果不是跨語言的話,直接選擇 binary 即可,默認是 json。
-
-
result_serializer:任務執行結果序列化方式,支持的方式和任務序列化方式一致。
-
result_expires:任務結果的過期時間,單位是秒。
-
accept_content:指定任務接受的內容序列化類型(序列化),一個列表,比如:["msgpack", "binary", "json"]。
-
timezone:時區,默認是 UTC 時區。
-
enable_utc:是否開啟 UTC 時區,默認為 True;如果為 False,則使用本地時區。
-
task_publish_retry:發送消息失敗時是否重試,默認為 True。
-
worker_concurrency:并發的 worker 數量。
-
worker_prefetch_multiplier:每次 worker 從任務隊列中獲取的任務數量。
-
worker_max_tasks_per_child:每個 worker 執行多少次就會被殺掉,默認是無限的。
-
task_time_limit:單個任務執行的最大時間,單位是秒。
-
task_default_queue :設置默認的隊列名稱,如果一個消息不符合其它的隊列規則,就會放在默認隊列里面。如果什么都不設置的話,數據都會發送到默認的隊列中。
-
task_queues :設置詳細的隊列
# 將 RabbitMQ 作為 broker 時需要使用
task_queues = {
# 這是指定的默認隊列
"default": {
"exchange": "default",
"exchange_type": "direct",
"routing_key": "default"
},
# 凡是 topic 開頭的 routing key
# 都會被放到這個隊列
"topicqueue": {
"routing_key": "topic.#",
"exchange": "topic_exchange",
"exchange_type": "topic",
},
"task_eeg": { # 設置扇形交換機
"exchange": "tasks",
"exchange_type": "fanout",
"binding_key": "tasks",
},
celery 的配置非常多,不止我們上面說的那些,更多配置可以查看官網,寫的比較詳細。
https://docs.celeryq.dev/en/latest/userguide/configuration.html#general-settings
值得一提的是,在 5.0 之前配置項都是大寫的,而從 5.0 開始配置項改成小寫了。不過老的寫法目前仍然支持,只是啟動的時候會拋警告,并且在 6.0 的時候不再兼容老的寫法。
官網也很貼心地將老版本的配置和新版本的配置羅列了出來,盡管配置有很多,但并不是每一個都要用,可以根據自身的業務合理選擇。
然后下面我們就根據配置文件的方式啟動 celery,當前目錄結構如下:
celery_demo/config.py
broker_url = "redis://:maverick@82.157.146.194:6379/1"
result_backend = "redis://:maverick@82.157.146.194:6379"
# 寫倆就完事了
celery_demo/tasks/task1.py
celery 可以支持非常多的定時任務,而不同種類的定時任務我們一般都會寫在不同的模塊中(當然這里目前只有一個),然后再將這些模塊組織在一個單獨的目錄中。
當前只有一個 task1.py,我們隨便往里面寫點東西,當然你也可以創建更多的文件。
def add(x, y):
return x + y
def sub(x, y):
return x - y
def mul(x, y):
return x * y
def div(x, y):
return x / y
celery_demo/app.py
from celery import Celery
import config
from tasks.task1 import (
add, sub, mul, div
# 指定一個 name 即可
app = Celery("satori")
# 其它參數通過加載配置文件的方式指定
# 和 flask 非常類似
app.config_from_object(config)
# 創建任務工廠,有了任務工廠才能創建任務
# 這種方式和裝飾器的方式是等價的
add = app.task(add)
sub = app.task(sub)
mul = app.task(mul)
div = app.task(div)
然后重新啟動 worker:
輸出結果顯示,任務工廠都已經被加載進來了,然后我們創建任務并發送至隊列。
# 在 celery_demo 目錄下
# 將 app.py 里面的任務工廠導入進來
>>> from app import add, sub, mul, div
# 然后創建任務發送至隊列,并等待結果
>>> add.delay(3, 4).get()
7
>>> sub.delay(3, 4).get()
-1
>>> mul.delay(3, 4).get()
12
>>> div.delay(3, 4).get()
0.75
結果正常返回了,再來看看 worker 的輸出,
多個任務都被執行了。
發送任務時指定參數
我們在發送任務到隊列的時候,使用的是 delay 方法,里面直接傳遞函數所需的參數即可,那么除了函數需要的參數之外,還有沒有其它參數呢?
首先 delay 方法實際上是調用的 apply_async 方法,并且 delay 方法里面只接收函數的參數,但是 apply_async 接收的參數就很多了,我們先來看看它們的函數原型:
delay 方法的 *args 和 **kwargs 就是函數的參數,它會傳遞給 apply_async 的 args 和 kwargs。而其它的參數就是發送任務時所設置的一些參數,我們這里重點介紹一下 apply_async 的其它參數。
-
countdown:倒計時,表示任務延遲多少秒之后再執行,參數為整型;
-
eta:任務的開始時間,datetime 類型,如果指定了 countdown,那么這個參數就不應該再指定;
-
expires:datetime 或者整型,如果到規定時間、或者未來的多少秒之內,任務還沒有發送到隊列被 worker 執行,那么該任務將被丟棄;
-
shadow:重新指定任務的名稱,覆蓋 app.py 創建任務時日志上所指定的名字;
-
retry:任務失敗之后是否重試,bool 類型;
-
retry_policy:重試所采用的策略,如果指定這個參數,那么 retry 必須要為 True。參數類型是一個字典,里面參數如下:
-
-
max_retries : 最大重試次數,默認為 3 次;
-
interval_start : 重試等待的時間間隔秒數,默認為 0,表示直接重試不等待;
-
interval_step : 每次重試讓重試間隔增加的秒數,可以是數字或浮點數,默認為 0.2;
-
interval_max : 重試間隔最大的秒數,即通過 interval_step 增大到多少秒之后, 就不在增加了, 可以是數字或者浮點數;
-
-
routing_key:自定義路由鍵,針對 RabbitMQ;
-
queue:指定發送到哪個隊列,針對 RabbitMQ;
-
exchange:指定發送到哪個交換機,針對 RabbitMQ;
-
priority:任務隊列的優先級,0-9 之間,對于 RabbitMQ 而言,0是最高級;
-
serializer:任務序列化方法,通常不設置;
-
compression:壓縮方案,通常有zlib、bzip2;
-
headers:為任務添加額外的消息頭;
-
link:任務成功執行后的回調方法,是一個signature對象,可以用作關聯任務;
-
link_error: 任務失敗后的回調方法,是一個signature對象;
我們隨便挑幾個舉例說明:
>>> from app import add
# 使用 apply_async,要注意參數的傳遞
# 位置參數使用元組或者列表,關鍵字參數使用字典
# 因為是args和kwargs, 不是 *args和 **kwargs
>>> add.apply_async([3], {"y": 4},
... task_id="戀戀",
... countdown=5).get()
7
>>>
查看一下 worker 的輸出:
注意左邊的時間,16:25:16 收到的消息,但 5 秒后才執行完畢,因為我們將 countdown 參數設置為 5。并且任務的 id 也被我們修改了。
另外還需要注意一下那些接收時間的參數,比如 eta。如果我們手動指定了eta,那么一定要注意時區的問題,要保證 celery 所使用的時區和你傳遞的 datetime 的時區是統一的。
其它的參數可以自己手動測試一下,這里不細說了,根據自身的業務選擇合適的參數即可。
創建任務工廠的另一種方式
之前在創建任務工廠的時候,是將函數導入到 app.py 中,然后通過 add = app.task(add) 的方式手動裝飾,因為有哪些任務工廠必須要讓 worker 知道,所以一定要在 app.py 里面出現。但是這顯然不夠優雅,那么可不可以這么做呢?
# celery_demo/tasks/task1.py
from app import app
# celery_demo 所在路徑位于 sys.path 中
# 因此這里可以直接 from app import app
@app.task
def add(x, y):
return x + y
@app.task
def sub(x, y):
return x - y
# celery_demo/app.py
from tasks.task1 import add, sub
按照上面這種做法,理想上可以,但現實不行,因為會發生循環導入。
所以 celery 提供了一個辦法,我們依舊在 task1.py 中 import app,但在 app.py 中不再使用 import,而是通過 include 加載的方式,我們看一下:
# celery_demo/tasks/task1.py
from app import app
@app.task
def add(x, y):
return x + y
@app.task
def sub(x, y):
return x - y
# celery_demo/app.py
from celery import Celery
import config
# 通過 include 指定存放任務的 py 文件
# 注意它和 worker 啟動路徑之間的關系
# 我們是在 celery_demo 目錄下啟動的 worker
# 所以應該寫成 "tasks.task1"
# 如果是在 celery_demo 的上一級目錄啟動 worker
# 那么這里就要指定為 "celery_demo.tasks.task1"
# 當然啟動時的 -A app 也要換成 -A celery_demo.app
app = Celery(__name__, include=["tasks.task1"])
# 如果還有其它文件,比如 task2.py, task3.py
# 那么就把 "tasks.task2", "tasks.task3" 加進去
app.config_from_object(config)
在 celery_demo 目錄下重新啟動 worker。
為了方便,我們只保留了兩個任務工廠。可以看到此時就成功啟動了,并且也更加方便和優雅一些。之前是在 task1.py 中定義函數,然后再把 task1.py 中的函數導入到 app.py 里面,然后手動進行裝飾。雖然這么做是沒問題的,但很明顯這種做法不適合管理。
所以還是要將 app.py 中的 app 導入到 task1.py 中直接創建任務工廠,但如果再將 task1.py 中的任務工廠導入到 app.py 中就會發生循環導入。于是 celery 提供了一個 include 參數,可以在創建 app 的時候自動將里面所有的任務工廠加載進來,然后啟動并告訴 worker。
我們來測試一下:
# 通過 tasks.task1 導入任務工廠
# 然后創建任務,發送至隊列
>>> from tasks.task1 import add, sub
>>> add.delay(11, 22).get()
33
>>> sub.delay(11, 22).get()
-11
查看一下 worker 的輸出:
結果一切正常。
Task 對象
我們之前通過對一個函數使用 @app.task 即可將其變成一個任務工廠,而這個任務工廠就是一個 Task 實例對象。而我們在使用 @app.task 的時候,其實是可以加上很多的參數的,常用參數如下:
-
name:默認的任務名是一個uuid,我們可以通過 name 參數指定任務名,當然這個 name 就是 apply_async 的參數 name。如果在 apply_async 中指定了,那么以 apply_async 指定的為準;
-
bind:一個 bool 值,表示是否和任務工廠進行綁定。如果綁定,任務工廠會作為參數傳遞到方法中;
-
base:定義任務的基類,用于定義回調函數,當任務到達某個狀態時觸發不同的回調函數,默認是 Task,所以我們一般會自己寫一個類然后繼承 Task;
-
default_retry_delay:設置該任務重試的延遲機制,當任務執行失敗后,會自動重試,單位是秒,默認是3分鐘;
-
serializer:指定序列化的方法;
當然 app.task 還有很多不常用的參數,這里就不說了,有興趣可以去查看官網或源碼,我們演示一下幾個常用的參數:
# celery_demo/tasks/task1.py
from app import app
@app.task(name="你好")
def add(x, y):
return x + y
@app.task(name="我不好", bind=True)
def sub(self, x, y):
"""
如果 bind=True,則需要多指定一個 self
這個 self 就是對應的任務工廠
"""
# self.request 是一個 celery.task.Context 對象
# 獲取它的屬性字典,即可拿到該任務的所有屬性
print(self.request.__dict__)
return x - y
其它代碼不變,我們重新啟動 worker:
然后創建任務發送至隊列,再由 worker 取出執行:
>>> from tasks.task1 import add, sub
>>> add.delay(111, 222).get()
333
>>> sub.delay(111, 222).get()
-111
執行沒有問題,然后看看 worker 的輸出:
創建任務工廠時,如果指定了 bind=True,那么執行任務時會將任務工廠本身作為第一個參數傳過去。任務工廠本質上就是 Task 實例對象,調用它的 delay 方法即可創建任務。
所以如果我們在 sub 內部繼續調用 self.delay(11, 22),會有什么后果呢?沒錯,worker 會進入無限遞歸。因為執行任務的時候,在任務的內部又創建了任務,所以會死循環下去。
當然 self 還有很多其它屬性和方法,具體有哪些可以通過 Task 這個類來查看。這里面比較重要的是 self.request,它包含了某個具體任務的相關信息,而且信息非常多。
比如當前傳遞的參數是什么,就可以通過 self.request 拿到。當然啦,self.request 是一個 Context 對象,因為不同任務獲取 self.request 的結果肯定是不同的,但 self(任務工廠)卻只有一個,所以要基于 Context 進行隔離。
我們可以通過 __dict__ 拿到 Context 對象的屬性字典,然后再進行操作。
最后再來說一說 @app.task 里面的 base 參數。
# celery_demo/tasks/task1.py
from celery import app
from app import Task
class MyTask(Task):
"""
自定義一個類,繼承自celery.Task
exc: 失敗時的錯誤的類型;
task_id: 任務的id;
args: 任務函數的位置參數;
kwargs: 任務函數的關鍵字參數;
einfo: 失敗時的異常詳細信息;
retval: 任務成功執行的返回值;
"""
def on_failure(self, exc, task_id, args, kwargs, einfo):
"""任務失敗時執行"""
def on_success(self, retval, task_id, args, kwargs):
"""任務成功時執行"""
print("任務執行成功")
def on_retry(self, exc, task_id, args, kwargs, einfo):
"""任務重試時執行"""
# 使用 @app.task 的時候,指定 base 即可
# 然后任務在執行的時候,會觸發 MyTask 里面的回調函數
@app.task(name="地靈殿", base=MyTask)
def add(x, y):
print("加法計算")
return x + y
重新啟動 worker,然后創建任務。
指定了 base,任務在執行的時候會根據執行狀態的不同,觸發 MyTask 里面的不同方法。
自定義任務流
有時候我們也可以將執行的多個任務,劃分到一個組中。
# celery_demo/tasks/task1.py
from app import app
@app.task()
def add(x, y):
print("加法計算")
return x + y
@app.task()
def sub(x, y):
print("減法計算")
return x - y
@app.task()
def mul(x, y):
print("乘法計算")
return x * y
@app.task()
def div(x, y):
print("除法計算")
return x // y
老規矩,重啟 worker,因為我們修改了任務工廠。
然后來導入它們,創建任務,并將這些任務劃分到一個組中。
>>> from tasks.task1 import add, sub, mul, div
>>> from celery import group
# 調用 signature 方法,得到 signature 對象
# 此時 t1.delay() 和 add.delay(2, 3) 是等價的
>>> t1 = add.signature(args=(2, 3))
>>> t2 = sub.signature(args=(2, 3))
>>> t3 = mul.signature(args=(2, 3))
>>> t4 = div.signature(args=(4, 2))
# 但是變成 signature 對象之后,
# 我們可以將其放到一個組里面
>>> gp = group(t1, t2, t3, t4)
# 執行組任務
# 返回 celery.result.GroupResult 對象
>>> res = gp()
# 每個組也有一個唯一 id
>>> print("組id:", res.id)
組id: 65f32cc4-b8ce-4bf8-916b-f5cc359a901a
# 調用 get 方法也會阻塞,知道組里面任務全部完成
>>> print("組結果:", res.get())
組結果: [5, -1, 6, 2]
>>>
可以看到整個組也是有唯一 id 的,另外 signature 也可以寫成 subtask 或者 s,在源碼里面這幾個是等價的。
我們觀察一下 worker 的輸出,任務是并發執行的,所以哪個先完成不好說。但是調用組的 get 方法時,里面的返回值順序一定和任務添加時候的順序保持一致。
除此之外,celery 還支持將多個任務像鏈子一樣串起來,第一個任務的輸出會作為第二個任務的輸入,傳遞給下一個任務的第一個參數。
# celery_demo/tasks/task1.py
from app import app
@app.task
def task1():
l = []
return l
@app.task
# task1 的返回值會傳遞給這里的 task1_return
def task2(task1_return, value):
task1_return.append(value)
return task1_return
@app.task
# task2 的返回值會傳遞給這里的 task2_return
def task3(task2_return, num):
return [i + num for i in task2_return]
@app.task
# task3 的返回值會傳遞給這里的 task3_return
def task4(task3_return):
return sum(task3_return)
然后我們看怎么將這些任務像鏈子一樣串起來。
>>> from tasks.task1 import *
>>> from celery import chain
# 將多個 signature 對象進行與運算
# 當然內部肯定重寫了 __or__ 這個魔法方法
>>> my_chain = chain(
... task1.s() | task2.s(123) | task3.s(5) | task4.s())
# 執行任務鏈
>>> res = my_chain()
# 獲取返回值
>>> print(res.get())
128
這種鏈式處理的場景非常常見,比如 MapReduce。
celery 實現定時任務
既然是定時任務,那么就意味著 worker 要后臺啟動,否則一旦遠程連接斷開,就停掉了。因此 celery 是支持我們后臺啟動的,并且可以啟動多個。
# 啟動 worker
celery multi start w1 -A app -l info
# 可以同時啟動多個
celery multi start w2 w3 -A app -l info
# 以上我們就啟動了 3 個 worker
# 如果想停止的話
celery multi stop w1 w2 w3 -A app -l info
但是注意,這種啟動方式在 windows 上面不支持,因為 celery 會默認創建兩個目錄,分別是 /var/log/celery 和 /var/run/celery,顯然這是類 Unix 系統的目錄結構。
顯然啟動和關閉是沒有問題的,不過為了更好地觀察到輸出,我們還是用之前的方式,選擇前臺啟動。
然后回顧一下 celery 的架構,里面除了 producer 之外還有一個 celery beat,也就是調度器。我們調用任務工廠的 delay 方法,手動將任務發送到隊列,此時就相當于 producer。如果是設置定時任務,那么會由調度器自動將任務添加到隊列。
我們在 tasks 目錄里面再創建一個 period_task1.py 文件。
# celery_demo/tasks/period_task1.py
from celery.schedules import crontab
from app import app
from .task1 import task1, task2, task3, task4
@app.on_after_configure.connect
def period_task(sender, **kwargs):
# 第一個參數為 schedule,可以是 float,或者 crontab
# crontab 后面會說,第二個參數是任務,第三個參數是名字
sender.add_periodic_task(10.0, task1.s(),
name="每10秒執行一次")
sender.add_periodic_task(15.0, task2.s("task2"),
name="每15秒執行一次")
sender.add_periodic_task(20.0, task3.s(),
name="每20秒執行一次")
sender.add_periodic_task(
crontab(hour=18, minute=5, day_of_week=0),
task4.s("task4"),
name="每個星期天的18:05運行一次"
)
# celery_demo/tasks/task1.py
from app import app
@app.task
def task1():
print("我是task1")
return "task1你好"
@app.task
def task2(name):
print(f"我是{name}")
return f"{name}你好"
@app.task
def task3():
print("我是task3")
return "task3你好"
@app.task
def task4(name):
print(f"我是{name}")
return f"{name}你好"
既然使用了定時任務,那么一定要設置時區。
# celery_demo/config.py
broker_url = "redis://:maverick@82.157.146.194:6379/1"
result_backend = "redis://:maverick@82.157.146.194:6379/2"
# 之前說過,celery 默認使用 utc 時間
# 其實我們是可以手動禁用的,然后手動指定時區
enable_utc = False
timezone = "Asia/Shanghai"
最后是修改 app.py,將定時任務加進去。
from celery import Celery
import config
app = Celery(
__name__,
include=["tasks.task1", "tasks.period_task1"])
app.config_from_object(config)
下面就來啟動任務,先來啟動 worker,生產上應該后臺啟動,這里為了看到信息,選擇前臺啟動。
tasks.task1 里面的 4 個任務工廠都被添加進來了,然后再來啟動調度器。
調度器啟動之后會自動檢測定時任務,如果到時間了,就發送到隊列。而啟動調度器的命令如下:
根據調度器的輸出內容,我們知道定時任務執行完了,但很明顯定時任務本質上也是任務,只不過有定時功能,但也要發到隊列里面。然后 worker 從隊列里面取出任務,并執行,那么 worker 必然會有信息輸出。
調度器啟動到現在已經有一段時間了,worker 在終端中輸出了非常多的信息。
此時我們就成功實現了定時任務,并且是通過定義函數、打上裝飾器的方式實現的。除此之外,我們還可以通過配置的方式實現。
# celery_demo/tasks/period_task1.py
from celery.schedules import crontab
from app import app
# 此時也不需顯式導入任務工廠了
# 直接以字符串的方式指定即可
app.conf.beat_schedule = {
# 參數通過 args 和 kwargs 指定
"每10秒執行一次": {"task": "tasks.task1.task1",
"schedule": 10.0},
"每15秒執行一次": {"task": "tasks.task1.task2",
"schedule": 15.0,
"args": ("task2",)},
"每20秒執行一次": {"task": "tasks.task1.task3",
"schedule": 20.0},
"每個星期天的18:05運行一次": {"task": "tasks.task1.task4",
"schedule": crontab(hour=18,
minute=5,
day_of_week=0),
"args": ("task4",)}
}
需要注意:雖然我們不用顯式導入任務工廠,但其實是 celery 自動幫我們導入。由于這些任務工廠都位于 celery_demo/tasks/task1.py 里面,而 worker 也是在 celery_demo 目錄下啟動的,所以需要指定為 tasks.task1.task{1234}。
這種啟動方式也是可以成功的,貌似還更方便一些,但是會多出一個文件,用來存儲配置信息。
crontab 參數
定時任務除了指定一個浮點數之外(表示每隔多少秒執行一次),還可以指定 crontab。關于 crontab 應該都知道是什么,我們在 linux 上想啟動定時任務的話,直接 crontab -e 然后添加即可。
而 celery 的 crontab 和 Linux 高度相似,我們看一下函數原型就知道了。
簡單解釋一下:
-
minute:0-59,表示第幾分鐘觸發,* 表示每分鐘觸發一次;
-
hour:0-23,表示第幾個小時觸發,* 表示每小時都會觸發。比如 minute=2, hour=*,表示每小時的第二分鐘觸發一次;
-
day_of_week:0-6,表示一周的第幾天觸發,0 是星期天,1-6 分別是星期一到星期六,不習慣的話也可以用字符串 mon,tue,wed,thu,fri,sat,sun 表示;
-
month_of_year:當前年份的第幾個月;
以上就是這些參數的含義,并且參數接收的值還可以是一些特殊的通配符:
-
*:所有,比如 minute=*,表示每分鐘觸發;
-
*/a:所有可被 a 整除的時候觸發;
-
a-b:a 到 b范圍內觸發;
-
a-b/c:范圍 a-b 且能夠被 c 整除的時候觸發;
-
2,10,40:比如 minute=2,10,40 表示第 2、10、40 分鐘的時候觸發;
通配符之間是可以自由組合的,比如 */3,8-17 就表示能被 3 整除,或范圍處于 8-17 的時候觸發。
除此之外,還可以根據天色來設置定時任務(有點離譜)。
from celery.schedules import solar
app.conf.beat_schedule = {
"日落": {"task": "task1",
"schedule": solar("sunset",
-37.81753,
144.96715)
},
solar 里面接收三個參數,分別是 event、lat、lon,后兩個比較簡單,表示觀測者所在的緯度和經度。值大于 0,則對應東經/北緯,小于 0,則對應西經/南緯。
我們重點看第一個參數 event,可選值如下:
比如代碼中的 "sunset", -37.81753, 144.96715 就表示,當站在南緯 37.81753、東經 144.96715 的地方觀察,發現傍晚太陽的上邊緣消失在西方地平線上的時候,觸發任務執行。
個人覺得這個功能有點強悍,但估計絕大部分人應該都用不到,可能氣象領域相關的會用的比較多。
小結
以上就是 celery 的使用,另外這里的 broker 和 backend 用的都是 Redis,其實還可以使用 RabbitMQ 和數據庫。
broker_url = "amqp://admin:123456@82.157.146.194:5672//"
result_backend = "MySQL+pymysql://root:123456@82.157.146.194:3306/store"
可以自己測試一下,但不管用的是哪種存儲介質,對于我們使用 celery 而言,都是沒有區別的。
celery 在工作中用的還是比較多的,而且有一個調度工具 Apache airflow,它的核心調度功能也是基于 celery 實現的。