這張圖解釋了 Apache Spark DataFrame 寫入 API 的流程。它始于對(duì)寫入數(shù)據(jù)的 API 調(diào)用,支持的格式包括 CSV、JSON 或 Parquet。流程根據(jù)選擇的保存模式(追加、覆蓋、忽略或報(bào)錯(cuò))而分岔。每種模式執(zhí)行必要的檢查和操作,例如分區(qū)和數(shù)據(jù)寫入處理。流程以數(shù)據(jù)的最終寫入或錯(cuò)誤結(jié)束,取決于這些檢查和操作的結(jié)果。
Apache Spark 是一個(gè)開源的分布式計(jì)算系統(tǒng),提供了強(qiáng)大的平臺(tái)用于處理大規(guī)模數(shù)據(jù)。寫入 API 是 Spark 數(shù)據(jù)處理能力的基本組成部分,允許用戶將數(shù)據(jù)從他們的 Spark 應(yīng)用程序?qū)懭牖蜉敵龅讲煌臄?shù)據(jù)源。
一、理解 Spark 寫入 API
1.數(shù)據(jù)源
Spark 支持將數(shù)據(jù)寫入各種數(shù)據(jù)源,包括但不限于:
- 分布式文件系統(tǒng),如 HDFS
- 云存儲(chǔ),如 AWS S3、Azure Blob Storage
- 傳統(tǒng)數(shù)據(jù)庫(包括 SQL 和 NoSQL)
- 大數(shù)據(jù)文件格式(Parquet、Avro、ORC)
2.DataFrameWriter
寫入 API 的核心類是 DataFrameWriter。它提供配置和執(zhí)行寫入操作的功能。通過在 DataFrame 或 Dataset 上調(diào)用 .write 方法獲得 DataFrameWriter。
3.寫入模式
指定 Spark 在寫入數(shù)據(jù)時(shí)應(yīng)如何處理現(xiàn)有數(shù)據(jù)的模式。常見的模式包括:
- Append:將新數(shù)據(jù)添加到現(xiàn)有數(shù)據(jù)中。
- overwrite:用新數(shù)據(jù)覆蓋現(xiàn)有數(shù)據(jù)。
- ignore:如果數(shù)據(jù)已存在,則忽略寫入操作。
- errorIfExists(默認(rèn)):如果數(shù)據(jù)已存在,則拋出錯(cuò)誤。
4.格式規(guī)范
可以使用 .format("formatType") 方法指定輸出數(shù)據(jù)的格式,如 JSON、CSV、Parquet 等。
5.分區(qū)
為了實(shí)現(xiàn)有效的數(shù)據(jù)存儲(chǔ),可以使用 .partitionBy("column") 方法根據(jù)一個(gè)或多個(gè)列對(duì)輸出數(shù)據(jù)進(jìn)行分區(qū)。
6.配置選項(xiàng)
可以使用 .option("key", "value") 方法設(shè)置特定于數(shù)據(jù)源的各種選項(xiàng),如壓縮、CSV 文件的自定義分隔符等。
7.保存數(shù)據(jù)
最后,使用 .save("path") 方法將 DataFrame 寫入指定的路徑。其他方法如 .saveAsTable("tableName") 也可用于不同的寫入場景。
from pyspark.sql import SparkSession
from pyspark.sql import Row
import os
# 初始化 SparkSession
spark = SparkSession.builder
.appName("DataFrameWriterSaveModesExample")
.getOrCreate()
# 示例數(shù)據(jù)
data = [
Row(name="Alice", age=25, country="USA"),
Row(name="Bob", age=30, country="UK")
]
# 附加數(shù)據(jù)用于追加模式
additional_data = [
Row(name="Carlos", age=35, country="SpAIn"),
Row(name="Daisy", age=40, country="Australia")
]
# 創(chuàng)建 DataFrames
df = spark.createDataFrame(data)
additional_df = spark.createDataFrame(additional_data)
# 定義輸出路徑
output_path = "output/csv_save_modes"
# 函數(shù):列出目錄中的文件
def list_files_in_directory(path):
files = os.listdir(path)
return files
# 顯示初始 DataFrame
print("初始 DataFrame:")
df.show()
# 使用覆蓋模式寫入 CSV 格式
df.write.csv(output_path, mode="overwrite", header=True)
print("覆蓋模式后的文件:", list_files_in_directory(output_path))
# 顯示附加 DataFrame
print("附加 DataFrame:")
additional_df.show()
# 使用追加模式寫入 CSV 格式
additional_df.write.csv(output_path, mode="append", header=True)
print("追加模式后的文件:", list_files_in_directory(output_path))
# 使用忽略模式寫入 CSV 格式
additional_df.write.csv(output_path, mode="ignore", header=True)
print("忽略模式后的文件:", list_files_in_directory(output_path))
# 使用 errorIfExists 模式寫入 CSV 格式
try:
additional_df.write.csv(output_path, mode="errorIfExists", header=True)
except Exception as e:
print("errorIfExists 模式中發(fā)生錯(cuò)誤:", e)
# 停止 SparkSession
spark.stop()
二、Spark 架構(gòu)概述
在 Apache Spark 中寫入 DataFrame 遵循一種順序流程。Spark 基于用戶 DataFrame 操作創(chuàng)建邏輯計(jì)劃,優(yōu)化為物理計(jì)劃,并分成階段。系統(tǒng)按分區(qū)處理數(shù)據(jù),對(duì)其進(jìn)行日志記錄以確保可靠性,并帶有定義的分區(qū)和寫入模式寫入到本地存儲(chǔ)。Spark 的架構(gòu)確保在計(jì)算集群中高效管理和擴(kuò)展數(shù)據(jù)寫入任務(wù)。
從 Spark 內(nèi)部架構(gòu)的角度來看,Apache Spark 寫入 API 涉及了解 Spark 如何在幕后管理數(shù)據(jù)處理、分發(fā)和寫入操作。讓我們來詳細(xì)了解:
三、Spark 架構(gòu)概述
- 驅(qū)動(dòng)程序和執(zhí)行器: Spark 采用主從架構(gòu)。驅(qū)動(dòng)節(jié)點(diǎn)運(yùn)行應(yīng)用程序的 main() 函數(shù)并維護(hù)有關(guān) Spark 應(yīng)用程序的信息。執(zhí)行器節(jié)點(diǎn)執(zhí)行數(shù)據(jù)處理和寫入操作。
- DAG 調(diào)度器: 當(dāng)觸發(fā)寫入操作時(shí),Spark 的 DAG(有向無環(huán)圖)調(diào)度器將高級(jí)轉(zhuǎn)換轉(zhuǎn)換為一系列可以在集群中并行執(zhí)行的階段。
- 任務(wù)調(diào)度器: 任務(wù)調(diào)度器在每個(gè)階段內(nèi)啟動(dòng)任務(wù)。這些任務(wù)分布在執(zhí)行器之間。
- 執(zhí)行計(jì)劃和物理計(jì)劃: Spark 使用 Catalyst 優(yōu)化器創(chuàng)建高效的執(zhí)行計(jì)劃。這包括將邏輯計(jì)劃(要做什么)轉(zhuǎn)換為物理計(jì)劃(如何做),考慮到分區(qū)、數(shù)據(jù)本地性和其他因素。
四、在 Spark 內(nèi)部寫入數(shù)據(jù)
(1) 數(shù)據(jù)分布: Spark 中的數(shù)據(jù)分布在分區(qū)中。當(dāng)啟動(dòng)寫入操作時(shí),Spark 首先確定這些分區(qū)中的數(shù)據(jù)布局。
(2) 寫入任務(wù)執(zhí)行: 每個(gè)分區(qū)的數(shù)據(jù)由一個(gè)任務(wù)處理。這些任務(wù)在不同的執(zhí)行器之間并行執(zhí)行。
寫入模式和一致性:
- 對(duì)于 overwrite 和 append 模式,Spark 確保一致性,通過管理數(shù)據(jù)文件的替換或添加來實(shí)現(xiàn)。
- 對(duì)于基于文件的數(shù)據(jù)源,Spark 以分階段的方式寫入數(shù)據(jù),先寫入臨時(shí)位置再提交到最終位置,有助于確保一致性和處理故障。
(3) 格式處理和序列化: 根據(jù)指定的格式(例如,Parquet、CSV),Spark 使用相應(yīng)的序列化器將數(shù)據(jù)轉(zhuǎn)換為所需的格式。執(zhí)行器處理此過程。
(4) 分區(qū)和文件管理:
- 如果指定了分區(qū),則Spark在寫入之前根據(jù)這些分區(qū)對(duì)數(shù)據(jù)進(jìn)行排序和組織。這通常涉及在執(zhí)行器之間移動(dòng)數(shù)據(jù)。
- Spark 試圖最小化每個(gè)分區(qū)創(chuàng)建的文件數(shù)量,以優(yōu)化大文件大小,在分布式文件系統(tǒng)中更有效。
(5) 錯(cuò)誤處理和容錯(cuò): 在寫入操作期間,如果任務(wù)失敗,Spark 可以重試任務(wù),確保容錯(cuò)。但并非所有寫入操作都是完全原子的,特定情況可能需要手動(dòng)干預(yù)以確保數(shù)據(jù)完整性。
(6) 優(yōu)化技術(shù):
- Catalyst 優(yōu)化器: 為效率優(yōu)化寫入計(jì)劃,例如最小化數(shù)據(jù)移動(dòng)。
- Tungsten: Spark 的 Tungsten 引擎優(yōu)化數(shù)據(jù)序列化和反序列化過程中的內(nèi)存和 CPU 使用。
(7) 寫入提交協(xié)議: Spark 使用寫入提交協(xié)議來協(xié)調(diào)特定數(shù)據(jù)源的任務(wù)提交和中止過程,確保對(duì)寫入數(shù)據(jù)的一致視圖。
Spark 的寫入 API 旨在實(shí)現(xiàn)高效和可靠的數(shù)據(jù)寫入,它以復(fù)雜的方式編排任務(wù)分發(fā)、數(shù)據(jù)序列化和文件管理。它利用 Spark 的核心組件,如 DAG 調(diào)度器、任務(wù)調(diào)度器和 Catalyst 優(yōu)化器,有效地執(zhí)行寫入操作。