關鍵概念,安裝和DAG實際示例
> Photo by ?ahin Ye?ilyaprak on Unsplash
Airflow是用于自動化和安排任務和工作流程的工具。 如果您想以數據科學家,數據分析師或數據工程師的身份高效工作,那么擁有一個可以自動執行要定期重復的流程的工具至關重要。 從提取,轉換和加載常規分析報告的數據到自動重新訓練機器學習模型,這可以是任何事情。
Airflow使您可以輕松地自動化主要由Python和SQL編寫的簡單到復雜的流程,并具有豐富的Web UI來可視化,監視和修復可能出現的任何問題。
下面的文章是對該工具的完整介紹。 我已經包含了從虛擬環境中安裝到以簡單易懂的步驟運行第一個dag的所有內容。
我將本教程分為6個部分,以使其易于理解,以便您可以跳過已經熟悉的部分。 包括以下步驟:
· 基本氣流概念。
· 如何在虛擬環境中設置Airflow安裝。
· 運行Airflow Web UI和調度程序。
· 常見的CLI命令列表。
· Web UI導覽。
· 創建一個真實的示例DAG。
1.基本概念
在討論Airflow的安裝和使用之前,我將簡要介紹該工具至關重要的幾個概念。
DAG
該工具的核心是DAG(有向無環圖)的概念。 DAG是您要在工作流中運行的一系列任務。 這可能包括通過SQL查詢提取數據,使用Python執行一些計算,然后將轉換后的數據加載到新表中。 在Airflow中,每個步驟都將作為DAG中的單獨任務編寫。
通過Airflow,您還可以指定任務之間的關系,任何依賴關系(例如,在運行任務之前已將數據加載到表中)以及應按順序運行任務。
DAG用Python編寫,并另存為.py文件。 該工具廣泛使用DAG_ID來協調DAG的運行。
DAG運行
我們具體說明了DAG應何時通過execute_date自動運行。 DAG按照指定的時間表運行(由CRON表達式定義),該時間表可以是每天,每周,每分鐘或幾乎任何其他時間間隔
Operator
Operator將要在每個任務中執行的操作封裝在DAG中。 Airflow有大量的內置操作員,它們可以執行特定任務,其中一些特定于平臺。 此外,可以創建自己的自定義運算符。
2.安裝
我將為您提供在隔離的Pipenv環境中進行Airflow設置的個人設置。 如果您使用其他虛擬環境工具,則步驟可能會有所不同。 此設置的大部分靈感來自于出色的Stackoverflow線程。
對您的Airflow項目使用版本控制是一個好主意,因此第一步是在Github上創建一個存儲庫。 我叫我airflow_sandbox。 使用git clone" git web url"創建到本地環境的存儲庫克隆后。
從終端導航到目錄,例如 cd / path / to / my_airflow_directory。
進入正確的目錄后,我們將安裝pipenv環境以及特定版本的Python,Airflow本身和Flask,這是運行Airflow所必需的依賴項。 為了使一切正常工作,最好為所有安裝指定特定版本。
pipenv install --python=3.7 Flask==1.0.3 Apache-airflow==1.10.3
氣流需要在本地系統上運行一個稱為AIRFLOW_HOME的位置。 如果未指定,則默認為您的路線目錄。 我更喜歡通過在.env文件中指定它來在我正在工作的項目目錄的路徑中設置Airflow。 為此,只需運行以下命令。
echo "AIRFLOW_HOME=${PWD}/airflow" >> .env
接下來,我們初始化pipenv環境。
pipenv shell
氣流需要運行數據庫后端。 默認設置為此使用SQLite數據庫,這對于學習和實驗來說是很好的選擇。 如果您想建立自己的數據庫后端,那么氣流文檔會提供很好的指導。 初始化數據庫類型。
airflow initdb
最后,我們創建一個目錄來存儲我們的dag。
mkdir -p ${AIRFLOW_HOME}/dags/
就是說初始基本設置完成了。 您現在應該具有一個如下所示的項目結構。
3.運行Airflow
Airflow具有出色的Web UI,您可以在其中查看和監視您的問題。 要啟動Web服務器以查看UI,只需運行以下CLI命令。 默認情況下,Airflow將使用端口8080,因為我已經使用該端口運行我指定8081的其他端口。
airflow webserver -p 8081
我們還需要啟動調度程序。
airflow scheduler
現在,如果我們導航到http:// localhost:8081 / admin /?showPaused = True。 我們將看到以下屏幕。
Airflow有一些顯示在用戶界面中的示例dag。 一旦開始創建自己的窗口,您可以通過單擊屏幕底部的"隱藏暫停的DAG"來隱藏它們。
4.基本的CLI命令
讓我們使用這些示例dag來瀏覽一些常見的Airflow CLI命令。
讓我們從教程dag中運行sleep任務。
airflow run tutorial sleep 2020-05-31
我們可以在教程DAG中列出任務。
bash-3.2$ airflow list_tasks tutorial
暫停此DAG。
airflow pause tutorial
取消暫停教程。
airflow unpause tutorial
回填(在過去的日期執行任務)。 指定dag_id(教程),開始日期(-s)和結束日期(-e)。
airflow backfill tutorial -s 2020-05-28 -e 2020-05-30
有關CLI命令的完整列表,請參見文檔中的此頁面。
5. Web UI
我們可以從Web UI監視,檢查和運行任務。 如果返回到Web服務器,我們可以看到我們在教程DAG上運行的CLI命令的效果。 為了便于查看,我隱藏了暫停的dag。
我們可以通過多種方法來檢查DAGS的運行情況。
如果我們選擇樹狀視圖。
我們可以輕松查看哪些任務已運行,正在運行或已失敗。
我們還可以通過單擊小方塊從此處運行,清除或標記特定任務。
如果單擊"渲染"按鈕,我們可以查看已運行的代碼或命令。
通過"代碼"視圖,我們可以查看組成DAG的代碼。
Graph View是一種很好的方式可視化任務的排序或相關方式。
Web UI中的另一個重要區域是Admin。 在這里,您可以定義與其他平臺(如數據庫)的連接,并定義可重用的變量。
6.第一個DAG
我將在此處嘗試提供一個接近實際的DAG示例,以說明至少一種使用Airflow的方法,并介紹隨之而來的一些復雜性。
我將編寫一個Airflow DAG,它首先檢查BigQuery公共數據集中感興趣日期的數據是否存在,然后按日程將數據加載到我自己的私有項目中的表中。
BigQuery有一個免費的使用層,該層允許您每月查詢1TB數據,因此,如果您想自己嘗試一下,則可以以零成本進行。
BigQuery設置
為了同時使用Airflow和BigQuery,我們需要首先完成一些附加的設置步驟。
為了能夠通過Airflow查詢和加載BigQuery中的數據,我們需要首先授予Airflow所需的安全權限。
為此,您需要在google Cloud Platform上創建一個服務帳戶。 這有點像創建一個有權訪問您的帳戶的用戶,但其目的是允許其他平臺訪問。
首先,從Google Cloud Console導航到服務帳戶。 然后單擊創建服務帳戶按鈕。
接下來填寫顯示的表格。
在下一頁上,您需要選擇要授予的訪問級別。 我已選擇所有資源的編輯器,因為這是我的個人帳戶,并且此處沒有存儲任何敏感數據。 如果我更擔心潛在的安全問題,那么我將授予更多的細化權限。
接下來,您需要創建一個私鑰,可以通過選擇創建密鑰來完成。 選擇JSON,因為這是Airflow所需要的。 私鑰將被下載到您需要安全存儲的本地系統。
現在,我們需要返回Airflow Web UI并使用此JSON文件的輸出更新bigquery_default連接。 您還需要添加一個默認的項目ID,如下所示。
我們還需要在Google Pipenv環境中安裝一些Google Cloud依賴項。 我已經安裝了以下內容。
pipenv install google-cloud-storage httplib2 google-api-python-client google-cloud-bigquery pandas_gbq
創建DAG
以下是將執行上述步驟的DAG的代碼。 應將其另存為.py文件在我們之前創建的dags目錄中。
DAG的頂部是必需的進口。 Airflow提供了一系列運營商,可在Google Cloud Platform上執行大多數功能。 我已經導入了BigQueryOperator(用于運行查詢和加載數據)和BigQueryCheckOperator(用于檢查特定日期的數據是否存在)。
在DAG的下一部分中,我們定義dag_args,然后創建DAG,該DAG提供諸如dag_id,start_date和應該多長時間運行一次任務等信息。 Airflow使用CRON表達式定義時間表,有關這些表達式的更多信息,請訪問此頁面。
然后,我們將每個步驟定義為一項任務,我將其定義為變量t1和t2。 這些任務均在工作流程中執行特定步驟。 這些命令的運行順序位于DAG的最底部。
現在,我們可以轉到Web UI并運行DAG。
如果我們轉到BigQuery控制臺,我們還將看到Airflow已創建并加載了數據的表。
本文旨在作為一個完整的介紹,讓您開始使用Airflow創建第一個DAG并開始運行。 有關更詳細的使用指南,請在此處找到Airflow文檔。
可以在此Github存儲庫中找到本文詳細介紹的完整項目的鏈接。
謝謝閱讀!
如果您想加入,我會每月發送一次通訊,請通過此鏈接注冊。 期待成為您學習之旅的一部分!
(本文翻譯自Rebecca Vickery的文章《A Complete Introduction to Apache Airflow》,參考:https://towardsdatascience.com/a-complete-introduction-to-apache-airflow-b7e238a33df)