> Source: Pixabay
Apache Spark是一種開放源代碼的分布式計算引擎,目前是用于內(nèi)存中批處理驅(qū)動的數(shù)據(jù)處理的最受歡迎的框架(它還支持實時數(shù)據(jù)流傳輸)。 得益于其先進的查詢優(yōu)化器,DAG調(diào)度程序和執(zhí)行引擎,Spark能夠非常高效地處理和分析大型數(shù)據(jù)集。 但是,在沒有仔細調(diào)整的情況下運行Spark作業(yè)仍會導致性能下降。
在博客文章中,我將分享一些Spark性能調(diào)優(yōu)的技巧,以幫助您解決和加快運行緩慢的Spark作業(yè)。
(本文提到的所有功能均來自PySpark,您可以使用Spark API文檔找到與Scala / JAVA等效的功能。)
分區(qū)不均勻
當數(shù)據(jù)集最初由Spark加載并成為彈性分布式數(shù)據(jù)集(RDD)時,所有數(shù)據(jù)均勻地分布在分區(qū)之間。 但是,在用戶對其應用某些類型的數(shù)據(jù)操作之后,這些分區(qū)可能會變得不均勻。 例如,groupByKey操作可能導致分區(qū)偏斜,因為一個鍵可能比另一個鍵包含更多的記錄。 此外,由于Spark的DataFrameWriter允許使用partitionBy將分區(qū)數(shù)據(jù)寫入磁盤,因此磁盤上的分區(qū)也可能不均勻。
在DataFrame中重新平衡偏斜的分區(qū)將極大地提高Spark在DataFrame上的處理性能。 您可以使用getNumPartitions函數(shù)檢查DataFrame中的分區(qū)數(shù),并通過運行簡單的Spark作業(yè)來查找每個分區(qū)中的記錄數(shù),例如:
from pyspark.sql.functions import spark_partition_id
df.withColumn("partition_id", spark_partition_id())
.groupBy("partition_id")
.count()
.show()
如果發(fā)現(xiàn)DataFrame的分區(qū)大小高度不均勻,請在對它進行任何分析之前,使用重新分區(qū)或合并函數(shù)對DataFrame進行重新分區(qū)。 還建議在將數(shù)據(jù)寫回磁盤之前,先對內(nèi)存中的數(shù)據(jù)進行分區(qū)。 RDD模塊也支持這些重新分區(qū)功能。
堅持RDD的缺點
由于惰性執(zhí)行原理,除非用戶明確調(diào)用操作來收集結(jié)果,否則Spark不會對數(shù)據(jù)集執(zhí)行任何實際的轉(zhuǎn)換。 此外,如果用戶希望對中間結(jié)果應用其他轉(zhuǎn)換,Spark將需要從頭開始重新計算所有內(nèi)容。 為了允許用戶更有效地重用日期,Spark可以使用持久性或緩存功能將數(shù)據(jù)緩存在內(nèi)存和/或磁盤中。
但是,緩存并不總是一個好主意。 Spark緩存數(shù)據(jù)集后,Catalyst優(yōu)化器優(yōu)化進一步轉(zhuǎn)換的能力將受到限制,因為它不再能夠改善源數(shù)據(jù)級別的修剪。 例如,如果將過濾器應用于在源數(shù)據(jù)庫中建立索引的列,則Catalyst將無法利用索引來提高性能。
因此,僅當緩存數(shù)據(jù)將在以后多次重用時才建議使用緩存數(shù)據(jù)。 迭代探索數(shù)據(jù)集或調(diào)整ML模型時。

> Source: Pixabay
基于成本的優(yōu)化器(CBO)
基于成本的優(yōu)化器(CBO)通過向Catalyst提供其他表級統(tǒng)計信息,可以加快Spark SQL作業(yè)的速度,這對于連接許多數(shù)據(jù)集的作業(yè)特別有用。 使用者可以通過將spark.sql.cbo.enabled設置為true(默認值)來啟用CBO。
為了充分利用CBO,用戶需要保持列級和表級統(tǒng)計信息都是最新的,從而使CBO可以使用準確的估算來優(yōu)化查詢計劃。 為此,在對表運行SQL查詢之前,請使用ANALYZE TABLE命令收集統(tǒng)計信息。 記住在修改表之后再次分析表,以確保統(tǒng)計信息是最新的。
廣播Join
除了啟用CBO,在Spark中優(yōu)化連接數(shù)據(jù)集的另一種方法是使用廣播聯(lián)接。 在無序連接中,兩個表中的記錄都將通過網(wǎng)絡傳輸給執(zhí)行器,當一個表比另一個大得多時,這是次優(yōu)的。 在廣播聯(lián)接中,較小的表將被發(fā)送給執(zhí)行程序,以與較大的表聯(lián)接,從而避免了通過網(wǎng)絡發(fā)送大量數(shù)據(jù)的情況。
用戶可以通過spark.sql.autoBroadcastJoinThreshold配置控制廣播聯(lián)接,指示要廣播的表的最大大小。 此外,即使表的大小大于spark.sql.autoBroadcastJoinThreshold,也可以使用廣播提示來告訴Spark廣播表:
from pyspark.sql.functions import broadcast
broadcast(spark.table("tbl_a")).join(spark.table("tbl_b"), "key")
垃圾收集(GC)
由于所有Spark作業(yè)都占用大量內(nèi)存,因此確保有效進行垃圾收集非常重要-我們希望產(chǎn)生較少的內(nèi)存"垃圾"以減少GC時間。 要了解您的Spark作業(yè)是否在GC中花費過多時間,請在Spark UI中檢查"任務反序列化時間"和" GC時間"。
例如,由于Spark需要反序列化更多對象,因此使用用戶定義函數(shù)(UDF)和lambda函數(shù)將導致更長的GC時間。 還建議避免創(chuàng)建中間對象并將不必要的RDD緩存到JVM堆。
TL; DR:
· 使用重新分區(qū)或合并來重新平衡不均勻的分區(qū)。
· 僅當數(shù)據(jù)將被多次重用時才保留數(shù)據(jù)。
· 使用ANALYZE TABLE命令可以維護CBO的最新統(tǒng)計信息。
· 為小表啟用廣播連接以加快連接速度。
· 通過使用較少的UDF并避免緩存大對象來優(yōu)化GC。
(本文翻譯自Xinran Waibel的文章《Apache Spark Optimization Toolkit》,參考:https://towardsdatascience.com/apache-spark-optimization-toolkit-17cf3e491992)