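After Pandas 2.0 was released we published several benchmark articles. This time we look beyond Pandas and compare two commonly used parallel DataFrame frameworks built for large-scale data processing: Polars and Dask.
In this article we use two similar scripts, one per library, to run the same extract, transform, load (ETL) process.
What we test
Both scripts do the following:
Extract data from two parquet files. For the small dataset, the variable path1 is “yellow_tripdata/yellow_tripdata_2014-01”; for the medium dataset, path1 is “yellow_tripdata/yellow_tripdata”; for the large dataset, path1 is “yellow_tripdata/yellow_tripdata*.parquet”.
Transform the data: a) join the two DataFrames, b) compute the mean trip distance per PULocationID, c) keep only the rows that match a condition (Borough equal to Queens in the scripts), d) round the values from step b to 2 decimal places, e) rename the column “trip_distance” to “mean_trip_distance”, f) sort by the column “mean_trip_distance” in descending order. In the scripts, step b runs first on the trips table and the join follows.
Save the final result to a new file.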
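To make the transformation steps concrete, here is a minimal pure-Python sketch of the same logic on a handful of rows. The rows, the zone names and the function name `transform` are made up for illustration; only the column names come from the scripts. It is a reference for what the ETL computes, not how Polars or Dask execute it.

```python
from collections import defaultdict

# Toy stand-in for the trips file (values are invented for illustration).
trips = [
    {"PULocationID": 1, "trip_distance": 1.0},
    {"PULocationID": 1, "trip_distance": 2.0},
    {"PULocationID": 1, "trip_distance": 2.0},
    {"PULocationID": 2, "trip_distance": 10.0},
    {"PULocationID": 3, "trip_distance": 7.5},
    {"PULocationID": 3, "trip_distance": 8.5},
]
# Toy stand-in for the zone lookup file (zone names are hypothetical).
zones = [
    {"LocationID": 1, "Borough": "Queens", "Zone": "Zone A"},
    {"LocationID": 2, "Borough": "Manhattan", "Zone": "Zone B"},
    {"LocationID": 3, "Borough": "Queens", "Zone": "Zone C"},
]


def transform(trips, zones):
    # Mean trip_distance per PULocationID.
    sums = defaultdict(float)
    counts = defaultdict(int)
    for row in trips:
        sums[row["PULocationID"]] += row["trip_distance"]
        counts[row["PULocationID"]] += 1
    means = {loc: sums[loc] / counts[loc] for loc in sums}

    # Inner join on PULocationID == LocationID, keeping three columns.
    zone_by_id = {z["LocationID"]: z for z in zones}
    joined = [
        {
            "Borough": zone_by_id[loc]["Borough"],
            "Zone": zone_by_id[loc]["Zone"],
            "trip_distance": mean,
        }
        for loc, mean in means.items()
        if loc in zone_by_id
    ]

    # Keep Queens only, round to 2 decimals, rename the column.
    renamed = [
        {
            "Borough": r["Borough"],
            "Zone": r["Zone"],
            "mean_trip_distance": round(r["trip_distance"], 2),
        }
        for r in joined
        if r["Borough"] == "Queens"
    ]

    # Sort by the mean, largest first.
    return sorted(renamed, key=lambda r: r["mean_trip_distance"], reverse=True)


result = transform(trips, zones)
```

On this toy input the Manhattan zone is dropped and the two Queens zones come back ordered by their mean distance.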
Scripts
1. Polars
Loading the data
def extraction():
    """
    Extract two datasets from parquet files
    """
    path1 = "yellow_tripdata/yellow_tripdata_2014-01.parquet"
    df_trips = pl_read_parquet(path1)
    path2 = "taxi+_zone_lookup.parquet"
    df_zone = pl_read_parquet(path2)
    return df_trips, df_zone

def pl_read_parquet(path):
    """
    Converting parquet file into Polars dataframe
    """
    # scan_parquet is lazy: it returns a LazyFrame and reads nothing yet
    df = pl.scan_parquet(path)
    return df
Transformation functions
def transformation(df_trips, df_zone):
    """
    Proceed to several transformations
    """
    df_trips = mean_test_speed_pl(df_trips)
    df = df_trips.join(df_zone, how="inner", left_on="PULocationID", right_on="LocationID")
    df = df.select(["Borough", "Zone", "trip_distance"])
    df = get_Queens_test_speed_pd(df)
    df = round_column(df, "trip_distance", 2)
    df = rename_column(df, "trip_distance", "mean_trip_distance")
    df = sort_by_columns_desc(df, "mean_trip_distance")
    return df

def mean_test_speed_pl(df_pl):
    """
    Getting Mean per PULocationID
    """
    # Note: Polars 0.19+ renames groupby to group_by
    df_pl = df_pl.groupby("PULocationID").agg(pl.col(["trip_distance"]).mean())
    return df_pl

def get_Queens_test_speed_pd(df_pl):
    """
    Only getting Borough in Queens
    """
    df_pl = df_pl.filter(pl.col("Borough") == "Queens")
    return df_pl

def round_column(df, column, to_round):
    """
    Round numbers on columns
    """
    df = df.with_columns(pl.col(column).round(to_round))
    return df

def rename_column(df, column_old, column_new):
    """
    Renaming columns
    """
    df = df.rename({column_old: column_new})
    return df

def sort_by_columns_desc(df, column):
    """
    Sort by column
    """
    df = df.sort(column, descending=True)
    return df
Saving
def loading_into_parquet(df_pl):
    """
    Save dataframe in parquet
    """
    # collect() is where the lazy query actually runs
    df_pl.collect(streaming=True).write_parquet("yellow_tripdata_pl.parquet")
Remaining code
import polars as pl
import time
def main():
    print("Starting ETL for Polars")
    start_time = time.perf_counter()
    print("Extracting...")
    df_trips, df_zone = extraction()
    end_extract = time.perf_counter()
    time_extract = end_extract - start_time
    print(f"Extraction Parquet end in {round(time_extract, 5)} seconds")
    print("Transforming...")
    df = transformation(df_trips, df_zone)
    end_transform = time.perf_counter()
    time_transformation = end_transform - end_extract
    print(f"Transformation end in {round(time_transformation, 5)} seconds")
    print("Loading...")
    loading_into_parquet(df)
    load_transformation = time.perf_counter() - end_transform
    print(f"Loading end in {round(load_transformation, 5)} seconds")
    print(f"End ETL for Polars in {time.perf_counter() - start_time}")

if __name__ == "__main__":
    main()
2. Dask
The functions do the same as above, so here the code is shown in one piece:
import dask.dataframe as dd
from dask.distributed import Client
import time

def extraction():
    # read_parquet is lazy: it builds a task graph, no data is loaded yet
    path1 = "yellow_tripdata/yellow_tripdata_2014-01.parquet"
    df_trips = dd.read_parquet(path1)
    path2 = "taxi+_zone_lookup.parquet"
    df_zone = dd.read_parquet(path2)
    return df_trips, df_zone

def transformation(df_trips, df_zone):
    df_trips = mean_test_speed_dask(df_trips)
    df = df_trips.merge(df_zone, how="inner", left_on="PULocationID", right_on="LocationID")
    df = df[["Borough", "Zone", "trip_distance"]]
    df = get_Queens_test_speed_dask(df)
    df = round_column(df, "trip_distance", 2)
    df = rename_column(df, "trip_distance", "mean_trip_distance")
    df = sort_by_columns_desc(df, "mean_trip_distance")
    return df

def loading_into_parquet(df_dask):
    # Writing the output triggers the actual computation
    df_dask.to_parquet("yellow_tripdata_dask.parquet", engine="fastparquet")

def mean_test_speed_dask(df_dask):
    # Mean trip_distance per PULocationID
    df_dask = df_dask.groupby("PULocationID").agg({"trip_distance": "mean"})
    return df_dask

def get_Queens_test_speed_dask(df_dask):
    # Keep only rows whose Borough is Queens
    df_dask = df_dask[df_dask["Borough"] == "Queens"]
    return df_dask

def round_column(df, column, to_round):
    df[column] = df[column].round(to_round)
    return df

def rename_column(df, column_old, column_new):
    df = df.rename(columns={column_old: column_new})
    return df

def sort_by_columns_desc(df, column):
    df = df.sort_values(column, ascending=False)
    return df

def main():
    print("Starting ETL for Dask")
    start_time = time.perf_counter()
    client = Client()  # Start Dask Client (its startup is counted in the extraction time)
    df_trips, df_zone = extraction()
    end_extract = time.perf_counter()
    time_extract = end_extract - start_time
    print(f"Extraction Parquet end in {round(time_extract, 5)} seconds")
    print("Transforming...")
    df = transformation(df_trips, df_zone)
    end_transform = time.perf_counter()
    time_transformation = end_transform - end_extract
    print(f"Transformation end in {round(time_transformation, 5)} seconds")
    print("Loading...")
    loading_into_parquet(df)
    load_transformation = time.perf_counter() - end_transform
    print(f"Loading end in {round(load_transformation, 5)} seconds")
    print(f"End ETL for Dask in {time.perf_counter() - start_time}")
    client.close()  # Close Dask Client

if __name__ == "__main__":
    main()
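Both main() functions time the three phases with repeated perf_counter arithmetic. As a possible tidy-up, the same measurement can be wrapped in a small context manager. This is only a sketch: the helper name `timed` and the dictionary `timings` are hypothetical, and the three phases below are trivial stand-ins rather than the real ETL functions.

```python
import time
from contextlib import contextmanager


@contextmanager
def timed(label, results):
    """Time the enclosed block and store the elapsed seconds under `label`."""
    start = time.perf_counter()
    try:
        yield
    finally:
        results[label] = time.perf_counter() - start


timings = {}

# Stand-in phases; in the real scripts these would call
# extraction(), transformation(...) and loading_into_parquet(...).
with timed("extract", timings):
    data = list(range(1000))
with timed("transform", timings):
    data = [x * 2 for x in data]
with timed("load", timings):
    total = sum(data)

for phase, seconds in timings.items():
    print(f"{phase} end in {round(seconds, 5)} seconds")
```

Keep in mind that with lazy frameworks the extract and transform phases mostly build a query plan, so nearly all of the measured time lands in the load phase.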
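Results
1. Small dataset
We use a 164 MB dataset. For us this counts as small, and datasets of this size are very common in day-to-day work.
Below are the results of five runs for each library:
[Figure: Polars, small dataset]
[Figure: Dask, small dataset]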
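The five-run protocol can be scripted along the following lines. This is a hedged sketch, not the harness used for the figures: `benchmark` and `fake_etl` are hypothetical names, and `fake_etl` is a trivial workload standing in for a full ETL run.

```python
import statistics
import time


def benchmark(func, runs=5):
    """Call func `runs` times and summarise the wall-clock durations."""
    durations = []
    for _ in range(runs):
        start = time.perf_counter()
        func()
        durations.append(time.perf_counter() - start)
    return {
        "runs": durations,
        "mean": statistics.mean(durations),
        # stdev needs at least two samples
        "stdev": statistics.stdev(durations) if runs > 1 else 0.0,
    }


def fake_etl():
    # Placeholder workload; replace with a call to the script's main().
    sum(i * i for i in range(10_000))


report = benchmark(fake_etl)
print(f"mean {report['mean']:.5f}s over {len(report['runs'])} runs")
```

Reporting the spread next to the mean helps show whether a difference between the two libraries is larger than the run-to-run noise.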
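2. Medium dataset
We use a 1.1 GB dataset. A dataset at the gigabyte scale still fits entirely in memory, but it is much larger than the small dataset.
[Figure: Polars, medium dataset]
[Figure: Dask, medium dataset]
3. Large dataset
We use an 8 GB dataset. A dataset this large may not fit in memory in one go, so the framework has to handle it.
[Figure: Polars, large dataset]
[Figure: Dask, large dataset]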
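Summary
The results show that both Polars and Dask use lazy evaluation, so the read and transform steps return very quickly and the time they take barely changes with dataset size.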
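Why lazy evaluation makes the first two phases look almost free can be shown with a toy pipeline that only records its steps until collect() is called. This is a conceptual sketch with hypothetical names (`LazyPipeline`, `then`, `load_rows`); it does not reflect how Polars or Dask are implemented internally.

```python
class LazyPipeline:
    """Records steps and runs them only when collect() is called."""

    def __init__(self, source, steps=()):
        self._source = source  # zero-argument callable that produces the rows
        self._steps = tuple(steps)

    def then(self, func):
        # Adding a step is cheap: nothing is read or computed here.
        return LazyPipeline(self._source, self._steps + (func,))

    def collect(self):
        # All the real work happens at this point.
        rows = self._source()
        for step in self._steps:
            rows = step(rows)
        return rows


calls = []


def load_rows():
    calls.append("load")  # lets us observe when the source is actually read
    return [3, 1, 2]


plan = LazyPipeline(load_rows).then(sorted).then(lambda rows: [r * 10 for r in rows])
executed_before_collect = list(calls)  # the source has not been touched yet
result = plan.collect()
```

Building `plan` takes the same time whether the source holds three rows or three billion, which matches the near-constant extraction and transformation timings in the measurements.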
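Both libraries handle medium-sized datasets very well.
Because both Polars and Dask evaluate lazily, the comparison below uses the time for the full ETL (averaged over 5 runs).
Polars won on both the small and the medium dataset. On the large dataset, however, Dask's average time was 26 seconds.
This may be related to Dask's parallel-computing optimizations; the official documentation states that “Dask tasks run three times faster than Spark ETL queries and use less CPU resources”.
The configuration of the test machine is shown above. Dask used more CPU during the computation, which suggests that it parallelizes the work more aggressively.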