雖然現代的網絡應用比以往任何時候都更快速、更便捷,但仍有許多情況下,需要把繁重的任務轉移到系統的其他部分執行,而不是在主線程上進行工作。
這些情況中的示例如下:
- 周期性任務 —— 計劃在特定時間間隔內運行的工作。例如,每日、每月的報告生成。
- 第三方工具 —— 應用程序應該快速向用戶返回響應,而不是等待其他任務先完成。例如,發送電子郵件、通知,將更新進度傳遞給內部工具。
- 長時間運行的工作 —— 執行復雜或資源昂貴的工作,并且用戶需要等待工作完成。例如。DAG工作流、基于Map-Reduce的任務、長時間運行的Spark作業等。
那么,如何處理這些情況呢?這時,Celery就派上用場了。
什么是Celery?
Celery是一個開源的任務隊列實現,通常與基于Python/ target=_blank class=infotextkey>Python的網絡框架(如Flask和Django)相結合,在典型的請求-響應周期之外異步執行任務。
因此,Celery本質上是一個基于分布式消息傳遞的任務隊列。執行單元或任務在一個或多個worker上使用多處理、gevent或Eventlet同時執行。這些任務可以同步執行(即等到準備就緒)或異步執行(即在后臺)。
Celery是如何工作的?
Celery是一個分布式任務隊列,基于生產者-消費者模式。
任務隊列是用于跨線程和機器分配工作的機制,本質上是生產者(Web應用程序)和消費者(Celery工作者)之間的消息中介。
Celery通過消息進行交互,代理(broker)在客戶(生產者)和工作者(消費者)之間充當中間人。為了啟動任務,客戶端將消息推送到隊列中,然后代理將該消息傳遞給工作者。
Celery系統可以由多個worker和broker組成,這為高可用性和橫向擴展提供了可能。
簡而言之,Celery客戶端是生產者,它通過消息代理向隊列中添加新的任務。然后,Celery工作者同樣通過消息代理從隊列中獲取新的任務。一旦處理完畢,結果就會存儲在結果后端。
工作實例
下面的例子將使用redisMQ作為消息代理。
設置Redis
在linux/macOS系統上,通過以下命令在本地運行Redis服務器:
$ wget http://download.redis.io/redis-stable.tar.gz
$ tar xvzf redis-stable.tar.gz
$ rm redis-stable.tar.gz
$ cd redis-stable
$ make
設置好Redis后,通過執行以下命令運行Redis服務器:
$ redis-server
該服務器在默認的6379端口運行。
設置應用程序
首先,在本地設置Python項目。
Celery可以通過標準工具如pip或easy_install來安裝。通過以下命令安裝Celery和Redis:
pip install celery redis==4.3.4
現在需要一個Celery實例來運行應用程序,Celery實現任何任務都是以實例開始,比如創建和管理任務等。
在項目中創建一個文件tasks.py:
From celery import Celery
broker_url = 'redi://localhost:6379/0'
App = Celery('tasks',broker = broker_url)
@app.task
def add(x, y):
return x+y
這里定義了一個簡單的任務add(),返回兩個數字的總和。
運行Celery Worker
在終端上,切換到項目位置并用以下命令運行Celery worker:
$ celery -A tasks worker - loglevel=info
關于Celery worker命令行的詳細信息,可以使用help:
$ celery worker - help
調用任務
在Celery中,使用delay()方法來調用任務。
打開項目的另一個終端窗口并運行以下命令:
$ python
這將打開Python命令行。
>> from tasks import add
>> add.delay(1,2)
這將返回一個AsyncResult實例,可以用來檢查任務狀態,獲得其返回值,等待任務完成,也可以在失敗時獲得異常和回溯。
運行add.delay()命令后,任務會被推送到隊列中,然后被worker獲取。這可以在Celery worker終端上進行驗證,可以清楚地看到任務被接收,之后任務成功完成。