> Source: Pixabay
Apache Spark是一種開放源代碼的分布式計算引擎,目前是用于內存中批處理驅動的數據處理的最受歡迎的框架(它還支持實時數據流傳輸)。 得益于其先進的查詢優化器,DAG調度程序和執行引擎,Spark能夠非常高效地處理和分析大型數據集。 但是,在沒有仔細調整的情況下運行Spark作業仍會導致性能下降。
在博客文章中,我將分享一些Spark性能調優的技巧,以幫助您解決和加快運行緩慢的Spark作業。
(本文提到的所有功能均來自PySpark,您可以使用Spark API文檔找到與Scala / JAVA等效的功能。)
分區不均勻
當數據集最初由Spark加載并成為彈性分布式數據集(RDD)時,所有數據均勻地分布在分區之間。 但是,在用戶對其應用某些類型的數據操作之后,這些分區可能會變得不均勻。 例如,groupByKey操作可能導致分區偏斜,因為一個鍵可能比另一個鍵包含更多的記錄。 此外,由于Spark的DataFrameWriter允許使用partitionBy將分區數據寫入磁盤,因此磁盤上的分區也可能不均勻。
在DataFrame中重新平衡偏斜的分區將極大地提高Spark在DataFrame上的處理性能。 您可以使用getNumPartitions函數檢查DataFrame中的分區數,并通過運行簡單的Spark作業來查找每個分區中的記錄數,例如:
from pyspark.sql.functions import spark_partition_id
df.withColumn("partition_id", spark_partition_id())
.groupBy("partition_id")
.count()
.show()
如果發現DataFrame的分區大小高度不均勻,請在對它進行任何分析之前,使用重新分區或合并函數對DataFrame進行重新分區。 還建議在將數據寫回磁盤之前,先對內存中的數據進行分區。 RDD模塊也支持這些重新分區功能。
堅持RDD的缺點
由于惰性執行原理,除非用戶明確調用操作來收集結果,否則Spark不會對數據集執行任何實際的轉換。 此外,如果用戶希望對中間結果應用其他轉換,Spark將需要從頭開始重新計算所有內容。 為了允許用戶更有效地重用日期,Spark可以使用持久性或緩存功能將數據緩存在內存和/或磁盤中。
但是,緩存并不總是一個好主意。 Spark緩存數據集后,Catalyst優化器優化進一步轉換的能力將受到限制,因為它不再能夠改善源數據級別的修剪。 例如,如果將過濾器應用于在源數據庫中建立索引的列,則Catalyst將無法利用索引來提高性能。
因此,僅當緩存數據將在以后多次重用時才建議使用緩存數據。 迭代探索數據集或調整ML模型時。
> Source: Pixabay
基于成本的優化器(CBO)
基于成本的優化器(CBO)通過向Catalyst提供其他表級統計信息,可以加快Spark SQL作業的速度,這對于連接許多數據集的作業特別有用。 使用者可以通過將spark.sql.cbo.enabled設置為true(默認值)來啟用CBO。
為了充分利用CBO,用戶需要保持列級和表級統計信息都是最新的,從而使CBO可以使用準確的估算來優化查詢計劃。 為此,在對表運行SQL查詢之前,請使用ANALYZE TABLE命令收集統計信息。 記住在修改表之后再次分析表,以確保統計信息是最新的。
廣播Join
除了啟用CBO,在Spark中優化連接數據集的另一種方法是使用廣播聯接。 在無序連接中,兩個表中的記錄都將通過網絡傳輸給執行器,當一個表比另一個大得多時,這是次優的。 在廣播聯接中,較小的表將被發送給執行程序,以與較大的表聯接,從而避免了通過網絡發送大量數據的情況。
用戶可以通過spark.sql.autoBroadcastJoinThreshold配置控制廣播聯接,指示要廣播的表的最大大小。 此外,即使表的大小大于spark.sql.autoBroadcastJoinThreshold,也可以使用廣播提示來告訴Spark廣播表:
from pyspark.sql.functions import broadcast
broadcast(spark.table("tbl_a")).join(spark.table("tbl_b"), "key")
垃圾收集(GC)
由于所有Spark作業都占用大量內存,因此確保有效進行垃圾收集非常重要-我們希望產生較少的內存"垃圾"以減少GC時間。 要了解您的Spark作業是否在GC中花費過多時間,請在Spark UI中檢查"任務反序列化時間"和" GC時間"。
例如,由于Spark需要反序列化更多對象,因此使用用戶定義函數(UDF)和lambda函數將導致更長的GC時間。 還建議避免創建中間對象并將不必要的RDD緩存到JVM堆。
TL; DR:
· 使用重新分區或合并來重新平衡不均勻的分區。
· 僅當數據將被多次重用時才保留數據。
· 使用ANALYZE TABLE命令可以維護CBO的最新統計信息。
· 為小表啟用廣播連接以加快連接速度。
· 通過使用較少的UDF并避免緩存大對象來優化GC。
(本文翻譯自Xinran Waibel的文章《Apache Spark Optimization Toolkit》,參考:https://towardsdatascience.com/apache-spark-optimization-toolkit-17cf3e491992)