在這個數據驅動的時代,信息的處理和分析變得越來越重要。而在眾多的大數據處理框架中,「Apache Spark」以其獨特的優勢脫穎而出。
本篇文章,我們將一起走進Spark的世界,探索并理解其相關的基礎概念和使用方法。本文主要目標是讓初學者能夠對Spark有一個全面的認識,并能實際應用到各類問題的解決之中。
一、Spark是什么
學習一個東西之前先要知道這個東西是什么。
Spark 是一個開源的大數據處理引擎,它提供了一整套開發 API,包括流計算和機器學習。它支持批處理和流處理。
Spark 的一個顯著特點是它能夠在內存中進行迭代計算,從而加快數據處理速度。盡管 Spark 是用 Scala 開發的,但它也為 JAVA、Scala、Python/ target=_blank class=infotextkey>Python 和 R 等高級編程語言提供了開發接口。
1.Spark組件
Spark提供了6大核心組件:
- Spark Core
- Spark SQL
- Spark Streaming
- Spark MLlib
- Spark GraphX
(1) Spark Core
Spark Core 是 Spark 的基礎,它提供了內存計算的能力,是分布式處理大數據集的基礎。它將分布式數據抽象為彈性分布式數據集(RDD),并為運行在其上的上層組件提供 API。所有 Spark 的上層組件都建立在 Spark Core 的基礎之上。
(2) Spark SQL
Spark SQL 是一個用于處理結構化數據的 Spark 組件。它允許使用 SQL 語句查詢數據。Spark 支持多種數據源,包括 Hive 表、Parquet 和 JSON 等。
(3) Spark Streaming
Spark Streaming 是一個用于處理動態數據流的 Spark 組件。它能夠開發出強大的交互和數據查詢程序。在處理動態數據流時,流數據會被分割成微小的批處理,這些微小批處理將會在 Spark Core 上按時間順序快速執行。
(4) Spark MLlib
Spark MLlib 是 Spark 的機器學習庫。它提供了常用的機器學習算法和實用程序,包括分類、回歸、聚類、協同過濾、降維等。MLlib 還提供了一些底層優化原語和高層流水線 API,可以幫助開發人員更快地創建和調試機器學習流水線。
(5) Spark GraphX
Spark GraphX 是 Spark 的圖形計算庫。它提供了一種分布式圖形處理框架,可以幫助開發人員更快地構建和分析大型圖形。
2.Spark的優勢
Spark 有許多優勢,其中一些主要優勢包括:
- 速度:Spark 基于內存計算,能夠比基于磁盤的計算快很多。對于迭代式算法和交互式數據挖掘任務,這種速度優勢尤為明顯。
- 易用性:Spark 支持多種語言,包括 Java、Scala、Python 和 R。它提供了豐富的內置 API,可以幫助開發人員更快地構建和運行應用程序。
- 通用性:Spark 提供了多種組件,可以支持不同類型的計算任務,包括批處理、交互式查詢、流處理、機器學習和圖形處理等。
- 兼容性:Spark 可以與多種數據源集成,包括 Hadoop 分布式文件系統(HDFS)、Apache Cassandra、Apache HBase 和 Amazon S3 等。
- 容錯性:Spark 提供了彈性分布式數據集(RDD)抽象,可以幫助開發人員更快地構建容錯應用程序。
3.word Count
上手寫一個簡單的代碼例子,下面是一個Word Count的Spark程序:
import org.apache.spark.{SparkConf, SparkContext}
object SparkWordCount {
def mAIn (args:Array [String]): Unit = {
//setMaster("local[9]") 表示在本地運行 Spark 程序,使用 9 個線程。local[*] 表示使用所有可用的處理器核心。
//這種模式通常用于本地測試和開發。
val conf = new SparkConf ().setAppName ("Word Count").setMaster("local[9]");
val sc = new SparkContext (conf);
sc.setLogLevel("ERROR")
val data = List("Hello World", "Hello Spark")
val textFile = sc.parallelize(data)
val wordCounts = textFile.flatMap (line => line.split (" ")).map (
word => (word, 1)).reduceByKey ( (a, b) => a + b)
wordCounts.collect().foreach(println)
}
}
輸出:
(Hello,2)
(World,1)
(Spark,1)
程序首先創建了一個 SparkConf 對象,用來設置應用程序名稱和運行模式。然后,它創建了一個 SparkContext 對象,用來連接到 Spark 集群。
接下來,程序創建了一個包含兩個字符串的列表,并使用 parallelize 方法將其轉換為一個 RDD。然后,它使用 flatMap 方法將每一行文本拆分成單詞,并使用 map 方法將每個單詞映射為一個鍵值對(key-value pair),其中鍵是單詞,值是 1。
最后,程序使用 reduceByKey 方法將具有相同鍵的鍵值對進行合并,并對它們的值進行求和。最終結果是一個包含每個單詞及其出現次數的 RDD。程序使用 collect 方法將結果收集到驅動程序,并使用 foreach 方法打印出來。
二、Spark基本概念
Spark的理論較多,為了更有效地學習Spark,首先來理解下其基本概念。
1.Application
Application指的就是用戶編寫的Spark應用程序。
如下,"Word Count"就是該應用程序的名字。
import org.apache.spark.sql.SparkSession
object WordCount {
def main(args: Array[String]) {
// 創建 SparkSession 對象,它是 Spark Application 的入口
val spark = SparkSession.builder.appName("Word Count").getOrCreate()
// 讀取文本文件并創建 Dataset
val textFile = spark.read.textFile("hdfs://...")
// 使用 flatMap 轉換將文本分割為單詞,并使用 reduceByKey 轉換計算每個單詞的數量
val counts = textFile.flatMap(line => line.split(" "))
.groupByKey(identity)
.count()
// 將結果保存到文本文件中
counts.write.text("hdfs://...")
// 停止 SparkSession
spark.stop()
}
}
2.Driver
Driver 是運行 Spark Application 的進程,它負責創建 SparkSession 和 SparkContext 對象,并將代碼轉換和操作。
它還負責創建邏輯和物理計劃,并與集群管理器協調調度任務。
簡而言之,Spark Application 是使用 Spark API 編寫的程序,而 Spark Driver 是負責運行該程序并與集群管理器協調的進程。
可以將Driver 理解為運行 Spark Application main 方法的進程。
driver的內存大小可以進行設置,配置如下:
# 設置 driver內存大小
driver-memory 1024m
3.Master & Worker
在Spark中,Master是獨立集群的控制者,而Worker是工作者。
一個Spark獨立集群需要啟動一個Master和多個Worker。Worker就是物理節點,Worker上面可以啟動Executor進程。
4.Executor
在每個Worker上為某應用啟動的一個進程,該進程負責運行Task,并且負責將數據存在內存或者磁盤上。
每個任務都有各自獨立的Executor。Executor是一個執行Task的容器。實際上它是一組計算資源(cpu核心、memory)的集合。
一個Worker節點可以有多個Executor。一個Executor可以運行多個Task。
Executor創建成功后,在日志文件會顯示如下信息:
INFO Executor: Starting executor ID [executorId] on host [executorHostname]
5.RDD
RDD(Resilient Distributed Dataset)叫做彈性分布式數據集,是Spark中最基本的數據抽象,它代表一個不可變、可分區、里面的元素可并行計算的集合。
RDD的 Partition 是指數據集的分區。它是數據集中元素的集合,這些元素被分區到集群的節點上,可以并行操作。對于RDD來說,每個分片都會被一個計算任務處理,并決定并行計算的粒度。用戶可以在創建RDD時指定RDD的分片個數,如果沒有指定,那么就會采用默認值。默認值就是程序所分配到的CPU Core的數目。
一個函數會被作用在每一個分區。Spark 中 RDD 的計算是以分片為單位的,compute 函數會被作用到每個分區上。
RDD的每次轉換都會生成一個新的RDD,所以RDD之間就會形成類似于流水線一樣的前后依賴關系。在部分分區數據丟失時,Spark可以通過這個依賴關系重新計算丟失的分區數據,而不是對RDD的所有分區進行重新計算。
6.Job
一個Job包含多個RDD及作用于相應RDD上的各種操作,每個Action的觸發就會生成一個job。用戶提交的Job會提交給DAG Scheduler,Job會被分解成Stage,Stage會被細化成Task。
7.Task
被發送到Executor上的工作單元。每個Task負責計算一個分區的數據。
8.Stage
在 Spark 中,一個作業(Job)會被劃分為多個階段(Stage)。同一個 Stage 可以有多個 Task 并行執行(Task 數=分區數)。
階段之間的劃分是根據數據的依賴關系來確定的。當一個 RDD 的分區依賴于另一個 RDD 的分區時,這兩個 RDD 就屬于同一個階段。當一個 RDD 的分區依賴于多個 RDD 的分區時,這些 RDD 就屬于不同的階段。
上圖中,Stage表示一個可以順滑完成的階段。曲線表示 Shuffle 過程。
如果Stage能夠復用前面的Stage的話,那么會顯示灰色。
9.Shuffle
在 Spark 中,Shuffle 是指在不同階段之間重新分配數據的過程。它通常發生在需要對數據進行聚合或分組操作的時候,例如 reduceByKey 或 groupByKey 等操作。
在 Shuffle 過程中,Spark 會將數據按照鍵值進行分區,并將屬于同一分區的數據發送到同一個計算節點上。這樣,每個計算節點就可以獨立地處理屬于它自己分區的數據。
10.Stage的劃分
Stage的劃分,簡單來說是以寬依賴來劃分的。
對于窄依賴,Partition 的轉換處理在 Stage 中完成計算,不劃分(將窄依賴盡量放在在同一個 Stage 中,可以實現流水線計算)。
對于寬依賴,由于有 Shuffle 的存在,只能在父 RDD 處理完成后,才能開始接下來的計算,也就是說需要劃分 Stage。
Spark 會根據 Shuffle/寬依賴 使用回溯算法來對 DAG 進行 Stage 劃分,從后往前,遇到寬依賴就斷開,遇到窄依賴就把當前的 RDD 加入到當前的 Stage 階段中。
至于什么是窄依賴和寬依賴,下文馬上就會提及。
11.窄依賴 & 寬依賴
(1) 窄依賴
父 RDD 的一個分區只會被子 RDD 的一個分區依賴。比如:map,filter和union,這種依賴稱之為「窄依賴」。
窄依賴的多個分區可以并行計算,并且窄依賴的一個分區的數據如果丟失只需要重新計算對應的分區的數據就可以了。
(2) 寬依賴
指子RDD的分區依賴于父RDD的所有分區,稱之為「寬依賴」。
對于寬依賴,必須等到上一階段計算完成才能計算下一階段。
12.DAG
有向無環圖,其實說白了就是RDD之間的依賴關系圖。
- 開始:通過 SparkContext 創建的 RDD。
- 結束:觸發 Action,一旦觸發 Action 就形成了一個完整的 DAG(有幾個 Action,就有幾個 DAG)。
三、Spark執行流程
Spark的執行流程大致如下:
- 構建Spark Application的運行環境(啟動SparkContext),SparkContext向資源管理器(可以是Standalone、Mesos或YARN)注冊并申請運行Executor資源。
- 資源管理器為Executor分配資源并啟動Executor進程,Executor運行情況將隨著“心跳”發送到資源管理器上。
- SparkContext構建DAG圖,將DAG圖分解成多個Stage,并把每個Stage的TaskSet(任務集)發送給Task Scheduler (任務調度器)。
- Executor向SparkContext申請Task, Task Scheduler將Task發放給Executor,同時,SparkContext將應用程序代碼發放給Executor。
- Task在Executor上運行,把執行結果反饋給Task Scheduler,然后再反饋給DAG Scheduler。
- 當一個階段完成后,Spark 會根據數據依賴關系將結果傳輸給下一個階段,并開始執行下一個階段的任務。
- 最后,當所有階段都完成后,Spark 會將最終結果返回給驅動程序,并完成作業的執行。
四、Spark運行模式
Spark 支持多種運行模式,包括本地模式、獨立模式、Mesos 模式、YARN 模式和 Kube.NETes 模式。
- 本地模式:在本地模式下,Spark 應用程序會在單個機器上運行,不需要連接到集群。這種模式適用于開發和測試,但不適用于生產環境。
- 獨立模式:在獨立模式下,Spark 應用程序會連接到一個獨立的 Spark 集群,并在集群中運行。這種模式適用于小型集群,但不支持動態資源分配。
- Mesos 模式:在 Mesos 模式下,Spark 應用程序會連接到一個 Apache Mesos 集群,并在集群中運行。這種模式支持動態資源分配和細粒度資源共享,目前國內使用較少。
- YARN 模式:在 YARN 模式下,Spark 應用程序會連接到一個 Apache Hadoop YARN 集群,并在集群中運行。這種模式支持動態資源分配和與其他 Hadoop 生態系統組件的集成,Spark在Yarn模式下是不需要Master和Worker的。
- Kubernetes 模式:在 Kubernetes 模式下,Spark 應用程序會連接到一個 Kubernetes 集群,并在集群中運行。這種模式支持動態資源分配和容器化部署。
五、RDD詳解
RDD的概念在Spark中十分重要,上面只是簡單的介紹了一下,下面詳細的對RDD展開介紹。
RDD是“Resilient Distributed Dataset”的縮寫,從全稱就可以了解到RDD的一些典型特性:
- Resilient(彈性):RDD之間會形成有向無環圖(DAG),如果RDD丟失了或者失效了,可以從父RDD重新計算得到。即容錯性。
- Distributed(分布式):RDD的數據是以邏輯分區的形式分布在集群的不同節點的。
- Dataset(數據集):即RDD存儲的數據記錄,可以從外部數據生成RDD,例如Json文件,CSV文件,文本文件,數據庫等。
RDD里面的數據集會被邏輯分成若干個分區,這些分區是分布在集群的不同節點的,基于這樣的特性,RDD才能在集群不同節點并行計算。
1.RDD特性
- 內存計算:Spark RDD運算數據是在內存中進行的,在內存足夠的情況下,不會把中間結果存儲在磁盤,所以計算速度非常高效。
- 惰性求值:所有的轉換操作都是惰性的,也就是說不會立即執行任務,只是把對數據的轉換操作記錄下來而已。只有碰到action操作才會被真正的執行。
- 容錯性:Spark RDD具備容錯特性,在RDD失效或者數據丟失的時候,可以根據DAG從父RDD重新把數據集計算出來,以達到數據容錯的效果。
- 不變性:RDD是進程安全的,因為RDD是不可修改的。它可以在任何時間點被創建和查詢,使得緩存,共享,備份都非常簡單。在計算過程中,是RDD的不可修改特性保證了數據的一致性。
- 持久化:可以調用cache或者persist函數,把RDD緩存在內存、磁盤,下次使用的時候不需要重新計算而是直接使用。
2.RDD操作
RDD支持兩種操作:
- 轉換操作(Transformation)。
- 行動操作(Actions)。
(1) 轉換操作(Transformation)
轉換操作以RDD做為輸入參數,然后輸出一個或者多個RDD。轉換操作不會修改輸入RDD。Map()、Filter()這些都屬于轉換操作。
轉換操作是惰性求值操作,只有在碰到行動操作(Actions)的時候,轉換操作才會真正實行。轉換操作分兩種:「窄依賴」和「寬依賴」。
下面是一些常見的轉換操作:
轉換操作 |
描述 |
map |
將函數應用于 RDD 中的每個元素,并返回一個新的 RDD |
filter |
返回一個新的 RDD,其中包含滿足給定謂詞的元素 |
flatMap |
將函數應用于 RDD 中的每個元素,并將返回的迭代器展平為一個新的 RDD |
union |
返回一個新的 RDD,其中包含兩個 RDD 的元素 |
distinct |
返回一個新的 RDD,其中包含原始 RDD 中不同的元素 |
groupByKey |
將鍵值對 RDD 中具有相同鍵的元素分組到一起,并返回一個新的 RDD |
reduceByKey |
將鍵值對 RDD 中具有相同鍵的元素聚合到一起,并返回一個新的 RDD |
sortByKey |
返回一個新的鍵值對 RDD,其中元素按照鍵排序 |
(2)行動操作(Action)
Action是數據執行部分,其通過執行count,reduce,collect等方法真正執行數據的計算部分。
Action 操作 |
描述 |
reduce |
通過函數聚合 RDD 中的所有元素 |
collect |
將 RDD 中的所有元素返回到驅動程序 |
count |
返回 RDD 中的元素個數 |
first |
返回 RDD 中的第一個元素 |
take |
返回 RDD 中的前 n 個元素 |
takeOrdered |
返回 RDD 中的前 n 個元素,按照自然順序或指定的順序排序 |
saveAsTextFile |
將 RDD 中的元素保存到文本文件中 |
foreach |
將函數應用于 RDD 中的每個元素 |
3.RDD 的創建方式
創建RDD有3種不同方式:
- 從外部存儲系統。
- 從其他RDD。
- 由一個已經存在的 Scala 集合創建。
(1) 從外部存儲系統
由外部存儲系統的數據集創建,包括本地的文件系統,還有所有 Hadoop 支持的數據集,比如 HDFS、Cassandra、HBase 等:
val rdd1 = sc.textFile("hdfs://node1:8020/wordcount/input/words.txt")
(2) 從其他RDD
通過已有的 RDD 經過算子轉換生成新的 RDD:
val rdd2=rdd1.flatMap(_.split(" "))
(3) 由一個已經存在的 Scala 集合創建
val rdd3 = sc.parallelize(Array(1,2,3,4,5,6,7,8))
或者
val rdd4 = sc.makeRDD(List(1,2,3,4,5,6,7,8))
其實makeRDD 方法底層調用了 parallelize 方法:
4.RDD 緩存機制
RDD 緩存是在內存存儲RDD計算結果的一種優化技術。把中間結果緩存起來以便在需要的時候重復使用,這樣才能有效減輕計算壓力,提升運算性能。
要持久化一個RDD,只要調用其cache()或者persist()方法即可。在該RDD第一次被計算出來時,就會直接緩存在每個節點中。而且Spark的持久化機制還是自動容錯的,如果持久化的RDD的任何partition丟失了,那么Spark會自動通過其源RDD,使用transformation操作重新計算該partition。
val rdd1 = sc.textFile("hdfs://node01:8020/words.txt")
val rdd2 = rdd1.flatMap(x=>x.split(" ")).map((_,1)).reduceByKey(_+_)
rdd2.cache //緩存/持久化
rdd2.sortBy(_._2,false).collect//觸發action,會去讀取HDFS的文件,rdd2會真正執行持久化
rdd2.sortBy(_._2,false).collect//觸發action,會去讀緩存中的數據,執行速度會比之前快,因為rdd2已經持久化到內存中了
需要注意的是,在觸發action的時候,才會去執行持久化。
cache()和persist()的區別在于,cache()是persist()的一種簡化方式,cache()的底層就是調用的persist()的無參版本,就是調用persist(MEMORY_ONLY),將數據持久化到內存中。
如果需要從內存中去除緩存,那么可以使用unpersist()方法。
rdd.persist(StorageLevel.MEMORY_ONLY)
rdd.unpersist()
5.存儲級別
RDD存儲級別主要有以下幾種。
級別 |
使用空間 |
CPU時間 |
是否在內存中 |
是否在磁盤上 |
備注 |
MEMORY_ONLY |
高 |
低 |
是 |
否 |
使用未序列化的Java對象格式,將數據保存在內存中。如果內存不夠存放所有的數據,則數據可能就不會進行持久化。 |
MEMORY_ONLY_2 |
高 |
低 |
是 |
否 |
數據存2份 |
MEMORY_ONLY_SER |
低 |
高 |
是 |
否 |
基本含義同MEMORY_ONLY。唯一的區別是,會將RDD中的數據進行序列化。這種方式更加節省內存 |
MEMORY_ONLY_SER_2 |
低 |
高 |
是 |
否 |
數據序列化,數據存2份 |
MEMORY_AND_DISK |
高 |
中等 |
部分 |
部分 |
如果數據在內存中放不下,則溢寫到磁盤 |
MEMORY_AND_DISK_2 |
高 |
中等 |
部分 |
部分 |
數據存2份 |
MEMORY_AND_DISK_SER |
低 |
高 |
部分 |
部分 |
基本含義同MEMORY_AND_DISK。唯一的區別是,會將RDD中的數據進行序列化 |
MEMORY_AND_DISK_SER_2 |
低 |
高 |
部分 |
部分 |
數據存2份 |
DISK_ONLY |
低 |
高 |
否 |
是 |
使用未序列化的Java對象格式,將數據全部寫入磁盤文件中。 |
DISK_ONLY_2 |
低 |
高 |
否 |
是 |
數據存2份 |
OFF_HEAP |
|
|
|
|
這個目前是試驗型選項,類似MEMORY_ONLY_SER,但是數據是存儲在堆外內存的。 |
對于上述任意一種持久化策略,如果加上后綴_2,代表的是將每個持久化的數據,都復制一份副本,并將副本保存到其他節點上。
這種基于副本的持久化機制主要用于進行容錯。假如某個節點掛掉了,節點的內存或磁盤中的持久化數據丟失了,那么后續對RDD計算時還可以使用該數據在其他節點上的副本。如果沒有副本的話,就只能將這些數據從源頭處重新計算一遍了。
6.RDD的血緣關系
血緣關系是指 RDD 之間的依賴關系。當你對一個 RDD 執行轉換操作時,Spark 會生成一個新的 RDD,并記錄這兩個 RDD 之間的依賴關系。這種依賴關系就是血緣關系。
血緣關系可以幫助 Spark 在發生故障時恢復數據。當一個分區丟失時,Spark 可以根據血緣關系重新計算丟失的分區,而不需要從頭開始重新計算整個 RDD。
血緣關系還可以幫助 Spark 優化計算過程。Spark 可以根據血緣關系合并多個連續的窄依賴轉換,減少數據傳輸和通信開銷。
我們可以執行toDebugString打印RDD的依賴關系。
下面是一個簡單的例子:
val conf = new SparkConf().setAppName("Lineage Example").setMaster("local")
val sc = new SparkContext(conf)
val data = sc.parallelize(List(1, 2, 3, 4, 5))
val mappedData = data.map(x => x + 1)
val filteredData = mappedData.filter(x => x % 2 == 0)
println(filteredData.toDebugString)
在這個例子中,我們首先創建了一個包含 5 個元素的 RDD,并對它執行了兩個轉換操作:map 和 filter。然后,我們使用 toDebugString 方法打印了最終 RDD 的血緣關系。
運行這段代碼后,你會看到類似下面的輸出:
(2) MapPartitionsRDD[2] at filter at <console>:26 []
| MapPartitionsRDD[1] at map at <console>:24 []
| ParallelCollectionRDD[0] at parallelize at <console>:22 []
這個輸出表示最終的 RDD 是通過兩個轉換操作(map 和 filter)從原始的 ParallelCollectionRDD 轉換而來的。
六、CheckPoint
CheckPoint可以將RDD從其依賴關系中抽出來,保存到可靠的存儲系統(例如HDFS,S3等), 即它可以將數據和元數據保存到檢查指向目錄中。 因此,在程序發生崩潰的時候,Spark可以恢復此數據,并從停止的任何地方開始。
CheckPoint分為兩類:
- 高可用CheckPoint:容錯性優先。這種類型的檢查點可確保數據永久存儲,如存儲在HDFS或其他分布式文件系統上。 這也意味著數據通常會在網絡中復制,這會降低檢查點的運行速度。
- 本地CheckPoint:性能優先。 RDD持久保存到執行程序中的本地文件系統。 因此,數據寫得更快,但本地文件系統也不是完全可靠的,一旦數據丟失,工作將無法恢復。
開發人員可以使用RDD.checkpoint()方法來設置檢查點。在使用檢查點之前,必須使用SparkContext.setCheckpointDir(directory: String)方法設置檢查點目錄。
下面是一個簡單的例子:
import org.apache.spark.{SparkConf, SparkContext}
object CheckpointExample {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setAppName("Checkpoint Example").setMaster("local")
val sc = new SparkContext(conf)
// 設置 checkpoint 目錄
sc.setCheckpointDir("/tmp/checkpoint")
val data = sc.parallelize(List(1, 2, 3, 4, 5))
val mappedData = data.map(x => x + 1)
val filteredData = mappedData.filter(x => x % 2 == 0)
// 對 RDD 進行 checkpoint
filteredData.checkpoint()
// 觸發 checkpoint
filteredData.count()
}
}
RDD的檢查點機制就好比Hadoop將中間計算值存儲到磁盤,即使計算中出現了故障,我們也可以輕松地從中恢復。通過對 RDD 啟動檢查點機制可以實現容錯和高可用。
Persist VS CheckPoint
- 位置:Persist 和 Cache 只能保存在本地的磁盤和內存中(或者堆外內存–實驗中),而 Checkpoint 可以保存數據到 HDFS 這類可靠的存儲上。
- 生命周期:Cache 和 Persist 的 RDD 會在程序結束后會被清除或者手動調用 unpersist 方法,而 Checkpoint 的 RDD 在程序結束后依然存在,不會被刪除。CheckPoint將RDD持久化到HDFS或本地文件夾,如果不被手動remove掉,是一直存在的,也就是說可以被下一個driver使用,而Persist不能被其他dirver使用。
七、Spark-Submit
1.詳細參數說明
參數名 |
參數說明 |
—master |
master 的地址,提交任務到哪里執行,例如 spark://host:port, yarn, local。具體指可參考下面關于Master_URL的列表 |
—deploy-mode |
在本地 (client) 啟動 driver 或在 cluster 上啟動,默認是 client |
—class |
應用程序的主類,僅針對 java 或 scala 應用 |
—name |
應用程序的名稱 |
—jars |
用逗號分隔的本地 jar 包,設置后,這些 jar 將包含在 driver 和 executor 的 classpath 下 |
—packages |
包含在driver 和executor 的 classpath 中的 jar 的 maven 坐標 |
—exclude-packages |
為了避免沖突 而指定不包含的 package |
—repositories |
遠程 repository |
—conf PROP=VALUE |
指定 spark 配置屬性的值, 例如 -conf spark.executor.extraJavaOptinotallow=”-XX:MaxPermSize=256m” |
—properties-file |
加載的配置文件,默認為 conf/spark-defaults.conf |
—driver-memory |
Driver內存,默認 1G |
—driver-java-options |
傳給 driver 的額外的 Java 選項 |
—driver-library-path |
傳給 driver 的額外的庫路徑 |
—driver-class-path |
傳給 driver 的額外的類路徑 |
—driver-cores |
Driver 的核數,默認是1。在 yarn 或者 standalone 下使用 |
—executor-memory |
每個 executor 的內存,默認是1G |
—total-executor-cores |
所有 executor 總共的核數。僅僅在 mesos 或者 standalone 下使用 |
—num-executors |
啟動的 executor 數量。默認為2。在 yarn 下使用 |
—executor-core |
每個 executor 的核數。在yarn或者standalone下使用 |
2.Master_URL的值
Master URL |
含義 |
local |
使用1個worker線程在本地運行Spark應用程序 |
local[K] |
使用K個worker線程在本地運行Spark應用程序 |
local[*] |
使用所有剩余worker線程在本地運行Spark應用程序 |
spark://HOST:PORT |
連接到Spark Standalone集群,以便在該集群上運行Spark應用程序 |
mesos://HOST:PORT |
連接到Mesos集群,以便在該集群上運行Spark應用程序 |
yarn-client |
以client方式連接到YARN集群,集群的定位由環境變量HADOOP_CONF_DIR定義,該方式driver在client運行。 |
yarn-cluster |
以cluster方式連接到YARN集群,集群的定位由環境變量HADOOP_CONF_DIR定義,該方式driver也在集群中運行。 |
八、Spark 共享變量
一般情況下,當一個傳遞給Spark操作(例如map和reduce)的函數在遠程節點上面運行時,Spark操作實際上操作的是這個函數所用變量的一個獨立副本。
這些變量被復制到每臺機器上,并且這些變量在遠程機器上的所有更新都不會傳遞回驅動程序。通常跨任務的讀寫變量是低效的,所以,Spark提供了兩種共享變量:「廣播變量(broadcast variable)」和「累加器(accumulator)」。
1.廣播變量
廣播變量允許程序員緩存一個只讀的變量在每臺機器上面,而不是每個任務保存一份拷貝。說白了其實就是共享變量。
如果Executor端用到了Driver的變量,如果不使用廣播變量在Executor有多少task就有多少Driver端的變量副本。如果使用廣播變量在每個Executor中只有一份Driver端的變量副本。
一個廣播變量可以通過調用SparkContext.broadcast(v)方法從一個初始變量v中創建。廣播變量是v的一個包裝變量,它的值可以通過value方法訪問,下面的代碼說明了這個過程:
import org.apache.spark.{SparkConf, SparkContext}
object BroadcastExample {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setAppName("Broadcast Example").setMaster("local")
val sc = new SparkContext(conf)
val data = sc.parallelize(List(1, 2, 3, 4, 5))
// 創建一個廣播變量
val factor = sc.broadcast(2)
// 使用廣播變量
val result = data.map(x => x * factor.value)
result.collect().foreach(println)
}
}
廣播變量創建以后,我們就能夠在集群的任何函數中使用它來代替變量v,這樣我們就不需要再次傳遞變量v到每個節點上。另外,為了保證所有的節點得到廣播變量具有相同的值,對象v不能在廣播之后被修改。
2.累加器
累加器是一種只能通過關聯操作進行“加”操作的變量,因此它能夠高效的應用于并行操作中。它們能夠用來實現counters和sums。
一個累加器可以通過調用SparkContext.accumulator(v)方法從一個初始變量v中創建。運行在集群上的任務可以通過add方法或者使用+=操作來給它加值。然而,它們無法讀取這個值。只有驅動程序可以使用value方法來讀取累加器的值。
示例代碼如下:
import org.apache.spark.{SparkConf, SparkContext}
object AccumulatorExample {
def main(args: Array[String]) {
val conf = new SparkConf().setAppName("AccumulatorExample")
val sc = new SparkContext(conf)
val accum = sc.longAccumulator("My Accumulator")
sc.parallelize(Array(1, 2, 3, 4)).foreach(x => accum.add(x))
println(accum.value) // 輸出 10
}
}
這個示例中,我們創建了一個名為 My Accumulator 的累加器,并使用 sc.parallelize(Array(1, 2, 3, 4)).foreach(x => accum.add(x)) 來對其進行累加。最后,我們使用 println(accum.value) 來輸出累加器的值,結果為 10。
我們可以利用子類AccumulatorParam創建自己的累加器類型。AccumulatorParam接口有兩個方法:zero方法為你的數據類型提供一個“0 值”(zero value),addInPlace方法計算兩個值的和。例如,假設我們有一個Vector類代表數學上的向量,我們能夠如下定義累加器:
object VectorAccumulatorParam extends AccumulatorParam[Vector] {
def zero(initialValue: Vector): Vector = {
Vector.zeros(initialValue.size)
}
def addInPlace(v1: Vector, v2: Vector): Vector = {
v1 += v2
}
}
// Then, create an Accumulator of this type:
val vecAccum = sc.accumulator(new Vector(...))(VectorAccumulatorParam)
九、Spark SQL
Spark為結構化數據處理引入了一個稱為Spark SQL的編程模塊。它提供了一個稱為DataFrame的編程抽象,并且可以充當分布式SQL查詢引擎。
1.Spark SQL的特性
- 集成:無縫地將SQL查詢與Spark程序混合。 Spark SQL允許將結構化數據作為Spark中的分布式數據集(RDD)進行查詢,在Python,Scala和Java中集成了API。這種緊密的集成使得可以輕松地運行SQL查詢以及復雜的分析算法。
- Hive兼容性:在現有倉庫上運行未修改的Hive查詢。 Spark SQL重用了Hive前端和MetaStore,提供與現有Hive數據,查詢和UDF的完全兼容性。只需將其與Hive一起安裝即可。
- 標準連接:通過JDBC或ODBC連接。 Spark SQL包括具有行業標準JDBC和ODBC連接的服務器模式。
- 可擴展性:對于交互式查詢和長查詢使用相同的引擎。 Spark SQL利用RDD模型來支持中查詢容錯,使其能夠擴展到大型作業。不要擔心為歷史數據使用不同的引擎。
2.Spark SQL 數據類型
Spark SQL 支持多種數據類型,包括數字類型、字符串類型、二進制類型、布爾類型、日期時間類型和區間類型等。
數字類型包括:
- ByteType:代表一個字節的整數,范圍是 -128 到 127¹²。
- ShortType:代表兩個字節的整數,范圍是 -32768 到 32767¹²。
- IntegerType:代表四個字節的整數,范圍是 -2147483648 到 2147483647¹²。
- LongType:代表八個字節的整數,范圍是 -9223372036854775808 到 9223372036854775807¹²。
- FloatType:代表四字節的單精度浮點數¹²。
- DoubleType:代表八字節的雙精度浮點數¹²。
- DecimalType:代表任意精度的十進制數據,通過內部的 java.math.BigDecimal 支持。BigDecimal 由一個任意精度的整型非標度值和一個 32 位整數組成¹²。
字符串類型包括:
- StringType:代表字符字符串值。
二進制類型包括:
- BinaryType:代表字節序列值。
布爾類型包括:
- BooleanType:代表布爾值。
日期時間類型包括:
- TimestampType:代表包含字段年、月、日、時、分、秒的值,與會話本地時區相關。時間戳值表示絕對時間點。
- DateType:代表包含字段年、月和日的值,不帶時區。
區間類型包括:
- YearMonthIntervalType (startField, endField):表示由以下字段組成的連續子集組成的年月間隔:MONTH(月份),YEAR(年份)。
- DayTimeIntervalType (startField, endField):表示由以下字段組成的連續子集組成的日時間間隔:SECOND(秒),MINUTE(分鐘),HOUR(小時),DAY(天)。
復合類型包括:
- ArrayType (elementType, containsNull):代表由 elementType 類型元素組成的序列值。containsNull 用來指明 ArrayType 中的值是否有 null 值。
- MapType (keyType, valueType, valueContainsNull):表示包括一組鍵值對的值。通過 keyType 表示 key 數據的類型,通過 valueType 表示 value 數據的類型。valueContainsNull 用來指明 MapType 中的值是否有 null 值。
- StructType (fields):表示一個擁有 StructFields (fields) 序列結構的值。
- StructField (name, dataType, nullable):代表 StructType 中的一個字段,字段的名字通過 name 指定,dataType 指定 field 的數據類型,nullable 表示字段的值是否有 null 值。
3.DataFrame
DataFrame 是 Spark 中用于處理結構化數據的一種數據結構。它類似于關系數據庫中的表,具有行和列。每一列都有一個名稱和一個類型,每一行都是一條記錄。
DataFrame 支持多種數據源,包括結構化數據文件、Hive 表、外部數據庫和現有的 RDD。它提供了豐富的操作,包括篩選、聚合、分組、排序等。
DataFrame 的優點在于它提供了一種高級的抽象,使得用戶可以使用類似于 SQL 的語言進行數據處理,而無需關心底層的實現細節。此外,Spark 會自動對 DataFrame 進行優化,以提高查詢性能。
下面是一個使用DataFrame的代碼例子:
import org.apache.spark.sql.SparkSession
val spark = SparkSession.builder.appName("DataFrame Example").getOrCreate()
import spark.implicits._
val data = Seq(
("Alice", 25),
("Bob", 30),
("Charlie", 35)
)
val df = data.toDF("name", "age")
df.show()
在這個示例中,我們首先創建了一個 SparkSession 對象,然后使用 toDF 方法將一個序列轉換為 DataFrame。最后,我們使用 show 方法來顯示 DataFrame 的內容。
4.創建 DataFrame
在 Scala 中,可以通過以下幾種方式創建 DataFrame:
從現有的 RDD 轉換而來。例如:
import org.apache.spark.sql.SparkSession
val spark = SparkSession.builder.appName("Create DataFrame").getOrCreate()
import spark.implicits._
case class Person(name: String, age: Int)
val rdd = spark.sparkContext.parallelize(Seq(Person("Alice", 25), Person("Bob", 30)))
val df = rdd.toDF()
df.show()
從外部數據源讀取。例如,從 JSON 文件中讀取數據并創建 DataFrame:
import org.apache.spark.sql.SparkSession
val spark = SparkSession.builder.appName("Create DataFrame").getOrCreate()
val df = spark.read.json("path/to/json/file")
df.show()
通過編程方式創建。例如,使用 createDataFrame 方法:
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
val spark = SparkSession.builder.appName("Create DataFrame").getOrCreate()
val schema = StructType(
List(
StructField("name", StringType, nullable = true),
StructField("age", IntegerType, nullable = true)
)
)
val data = Seq(Row("Alice", 25), Row("Bob", 30))
val rdd = spark.sparkContext.parallelize(data)
val df = spark.createDataFrame(rdd, schema)
df.show()
5.DSL & SQL
在 Spark 中,可以使用兩種方式對 DataFrame 進行查詢:「DSL(Domain-Specific Language)」和「 SQL」。
DSL 是一種特定領域語言,它提供了一組用于操作 DataFrame 的方法。例如,下面是一個使用 DSL 進行查詢的例子:
import org.apache.spark.sql.SparkSession
val spark = SparkSession.builder.appName("DSL and SQL").getOrCreate()
import spark.implicits._
val df = Seq(
("Alice", 25),
("Bob", 30),
("Charlie", 35)
).toDF("name", "age")
df.select("name", "age")
.filter($"age" > 25)
.show()
SQL 是一種結構化查詢語言,它用于管理關系數據庫系統。在 Spark 中,可以使用 SQL 對 DataFrame 進行查詢。例如,下面是一個使用 SQL 進行查詢的例子:
import org.apache.spark.sql.SparkSession
val spark = SparkSession.builder.appName("DSL and SQL").getOrCreate()
import spark.implicits._
val df = Seq(
("Alice", 25),
("Bob", 30),
("Charlie", 35)
).toDF("name", "age")
df.createOrReplaceTempView("people")
spark.sql("SELECT name, age FROM people WHERE age > 25").show()
DSL 和 SQL 的區別在于語法和風格。DSL 使用方法調用鏈來構建查詢,而 SQL 使用聲明式語言來描述查詢。選擇哪種方式取決于個人喜好和使用場景。
6.Spark SQL 數據源
Spark SQL 支持多種數據源,包括 Parquet、JSON、CSV、JDBC、Hive 等。
下面是示例代碼:
import org.apache.spark.sql.SparkSession
val spark = SparkSession.builder.appName("Data Sources Example").getOrCreate()
// Parquet
val df = spark.read.parquet("path/to/parquet/file")
// JSON
val df = spark.read.json("path/to/json/file")
// CSV
val df = spark.read.option("header", "true").csv("path/to/csv/file")
// JDBC
val df = spark.read
.format("jdbc")
.option("url", "jdbc:MySQL://host:port/database")
.option("dbtable", "table")
.option("user", "username")
.option("password", "password")
.load()
df.show()
7.load & save
在 Spark 中,load 函數用于從外部數據源讀取數據并創建 DataFrame,而 save 函數用于將 DataFrame 保存到外部數據源。
下面是從 Parquet 文件中讀取數據并創建 DataFrame 的示例代碼:
import org.apache.spark.sql.SparkSession
val spark = SparkSession.builder.appName("Load and Save Example").getOrCreate()
val df = spark.read.load("path/to/parquet/file")
df.show()
下面是將 DataFrame 保存到 Parquet 文件的示例代碼:
import org.apache.spark.sql.SparkSession
val spark = SparkSession.builder.appName("Load and Save Example").getOrCreate()
import spark.implicits._
val df = Seq(
("Alice", 25),
("Bob", 30),
("Charlie", 35)
).toDF("name", "age")
df.write.save("path/to/parquet/file")
8.函數
Spark SQL 提供了豐富的內置函數,包括數學函數、字符串函數、日期時間函數、聚合函數等。你可以在 Spark SQL 的官方文檔中查看所有可用的內置函數。
此外,Spark SQL 還支持「自定義函數(User-Defined Function,UDF)」,可以讓用戶編寫自己的函數并在查詢中使用。
下面是一個使用 SQL 語法編寫自定義函數的示例代碼:
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.udf
val spark = SparkSession.builder.appName("UDF Example").getOrCreate()
import spark.implicits._
val df = Seq(
("Alice", 25),
("Bob", 30),
("Charlie", 35)
).toDF("name", "age")
df.createOrReplaceTempView("people")
val square = udf((x: Int) => x * x)
spark.udf.register("square", square)
spark.sql("SELECT name, square(age) FROM people").show()
在這個示例中,我們首先定義了一個名為 square 的自定義函數,它接受一個整數參數并返回它的平方。然后,我們使用 createOrReplaceTempView 方法創建一個臨時視圖,并使用 udf.register 方法注冊自定義函數。
最后,我們使用 spark.sql 方法執行 SQL 查詢,并在查詢中調用自定義函數。
9.DataSet
DataSet 是 Spark 1.6 版本中引入的一種新的數據結構,它提供了 RDD 的強類型和 DataFrame 的查詢優化能力。
10.創建DataSet
在 Scala 中,可以通過以下幾種方式創建 DataSet:
從現有的 RDD 轉換而來。例如:
import org.apache.spark.sql.SparkSession
val spark = SparkSession.builder.appName("Create DataSet").getOrCreate()
import spark.implicits._
case class Person(name: String, age: Int)
val rdd = spark.sparkContext.parallelize(Seq(Person("Alice", 25), Person("Bob", 30)))
val ds = rdd.toDS()
ds.show()
從外部數據源讀取。例如,從 JSON 文件中讀取數據并創建 DataSet:
import org.apache.spark.sql.SparkSession
val spark = SparkSession.builder.appName("Create DataSet").getOrCreate()
import spark.implicits._
case class Person(name: String, age: Long)
val ds = spark.read.json("path/to/json/file").as[Person]
ds.show()
通過編程方式創建。例如,使用 createDataset 方法:
import org.apache.spark.sql.SparkSession
val spark = SparkSession.builder.appName("Create DataSet").getOrCreate()
import spark.implicits._
case class Person(name: String, age: Int)
val data = Seq(Person("Alice", 25), Person("Bob", 30))
val ds = spark.createDataset(data)
ds.show()
11.DataSet VS DataFrame
DataSet 和 DataFrame 都是 Spark 中用于處理結構化數據的數據結構。它們都提供了豐富的操作,包括篩選、聚合、分組、排序等。
它們之間的主要區別在于類型安全性。DataFrame 是一種弱類型的數據結構,它的列只有在運行時才能確定類型。這意味著,在編譯時無法檢測到類型錯誤,只有在運行時才會拋出異常。
而 DataSet 是一種強類型的數據結構,它的類型在編譯時就已經確定。這意味著,如果你試圖對一個不存在的列進行操作,或者對一個列進行錯誤的類型轉換,編譯器就會報錯。
此外,DataSet 還提供了一些額外的操作,例如 map、flatMap、reduce 等。
12.RDD & DataFrame & Dataset 轉化
RDD、DataFrame、Dataset三者有許多共性,有各自適用的場景常常需要在三者之間轉換。
DataFrame/Dataset 轉 RDD:
val rdd1=testDF.rdd
val rdd2=testDS.rdd
RDD 轉 DataSet:
import spark.implicits._
case class Coltest(col1:String,col2:Int)extends Serializable //定義字段名和類型
val testDS = rdd.map {line=>
Coltest(line._1,line._2)
}.toDS
可以注意到,定義每一行的類型(case class)時,已經給出了字段名和類型,后面只要往case class里面添加值即可。
Dataset 轉 DataFrame:
import spark.implicits._
val testDF = testDS.toDF
DataFrame 轉 Dataset:
import spark.implicits._
case class Coltest(col1:String,col2:Int)extends Serializable //定義字段名和類型
val testDS = testDF.as[Coltest]
這種方法就是在給出每一列的類型后,使用as方法,轉成Dataset,這在數據類型在DataFrame需要針對各個字段處理時極為方便。
注意:在使用一些特殊的操作時,一定要加上 import spark.implicits._ 不然toDF、toDS無法使用。
十、Spark Streaming
Spark Streaming 的工作原理是將實時數據流拆分為小批量數據,并使用 Spark 引擎對這些小批量數據進行處理。這種微批處理(Micro-Batch Processing)的方式使得 Spark Streaming 能夠以近乎實時的延遲處理大規模的數據流。
下面是一個簡單的 Spark Streaming 示例代碼:
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
val conf = new SparkConf().setAppName("Spark Streaming Example")
val ssc = new StreamingContext(conf, Seconds(1))
val lines = ssc.socketTextStream("localhost", 9999)
val words = lines.flatMap(_.split(" "))
val pairs = words.map(word => (word, 1))
val wordCounts = pairs.reduceByKey(_ + _)
wordCounts.print()
ssc.start()
ssc.awaitTermination()
我們首先創建了一個 StreamingContext 對象,并指定了批處理間隔為 1 秒。然后,我們使用 socketTextStream 方法從套接字源創建了一個 DStream。接下來,我們對 DStream 進行了一系列操作,包括 flatMap、map 和 reduceByKey。最后,我們使用 print 方法打印出單詞計數的結果。
1.Spark Streaming 優缺點
Spark Streaming 作為一種實時流處理框架,具有以下優點:
- 高性能:Spark Streaming 基于 Spark 引擎,能夠快速處理大規模的數據流。
- 易用性:Spark Streaming 提供了豐富的 API,可以讓開發人員快速構建實時流處理應用。
- 容錯性:Spark Streaming 具有良好的容錯性,能夠在節點故障時自動恢復。
- 集成性:Spark Streaming 能夠與 Spark 生態系統中的其他組件(如 Spark SQL、MLlib 等)無縫集成。
但是,Spark Streaming 也有一些缺點:
- 延遲:由于 Spark Streaming 基于微批處理模型,因此它的延遲相對較高。對于需要極低延遲的應用場景,Spark Streaming 可能不是最佳選擇。
- 復雜性:Spark Streaming 的配置和調優相對復雜,需要一定的經驗和技能。
2.DStream
DStream(離散化流)是 Spark Streaming 中用于表示實時數據流的一種抽象。它由一系列連續的 RDD 組成,每個 RDD 包含一段時間內收集到的數據。
在 Spark Streaming 中,可以通過以下幾種方式創建 DStream:
(1) 從輸入源創建。例如,從套接字源創建 DStream:
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
val conf = new SparkConf().setAppName("DStream Example")
val ssc = new StreamingContext(conf, Seconds(1))
val lines = ssc.socketTextStream("localhost", 9999)
lines.print()
ssc.start()
ssc.awaitTermination()
(2) 通過轉換操作創建。例如,對現有的 DStream 進行 map 操作:
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
val conf = new SparkConf().setAppName("DStream Example")
val ssc = new StreamingContext(conf, Seconds(1))
val lines = ssc.socketTextStream("localhost", 9999)
val words = lines.flatMap(_.split(" "))
words.print()
ssc.start()
ssc.awaitTermination()
(3) 通過連接操作創建。例如,對兩個 DStream 進行 union 操作:
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
val conf = new SparkConf().setAppName("DStream Example")
val ssc = new StreamingContext(conf, Seconds(1))
val lines1 = ssc.socketTextStream("localhost", 9999)
val lines2 = ssc.socketTextStream("localhost", 9998)
val lines = lines1.union(lines2)
lines.print()
ssc.start()
ssc.awaitTermination()
總結:簡單來說 DStream 就是對 RDD 的封裝,你對 DStream 進行操作,就是對 RDD 進行操作。對于 DataFrame/DataSet/DStream 來說本質上都可以理解成 RDD。
3.窗口函數
在 Spark Streaming 中,窗口函數用于對 DStream 中的數據進行窗口化處理。它允許你對一段時間內的數據進行聚合操作。
Spark Streaming 提供了多種窗口函數,包括:
- window:返回一個新的 DStream,它包含了原始 DStream 中指定窗口大小和滑動間隔的數據。
- countByWindow:返回一個新的單元素 DStream,它包含了原始 DStream 中指定窗口大小和滑動間隔的元素個數。
- reduceByWindow:返回一個新的 DStream,它包含了原始 DStream 中指定窗口大小和滑動間隔的元素經過 reduce 函數處理后的結果。
- reduceByKeyAndWindow:類似于 reduceByWindow,但是在進行 reduce 操作之前會先按照 key 進行分組。
下面是一個使用窗口函數的示例代碼:
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
val conf = new SparkConf().setAppName("Window Example")
val ssc = new StreamingContext(conf, Seconds(1))
val lines = ssc.socketTextStream("localhost", 9999)
val words = lines.flatMap(_.split(" "))
val pairs = words.map(word => (word, 1))
val wordCounts = pairs.reduceByKeyAndWindow((a: Int, b: Int) => a + b, Seconds(30), Seconds(10))
wordCounts.print()
ssc.start()
ssc.awaitTermination()
在這個示例中,我們首先創建了一個 DStream,并對其進行了一系列轉換操作。然后,我們使用 reduceByKeyAndWindow 函數對 DStream 進行窗口化處理,指定了窗口大小為 30 秒,滑動間隔為 10 秒。最后,我們使用 print 方法打印出單詞計數的結果。
4.輸出操作
Spark Streaming允許DStream的數據輸出到外部系統,如數據庫或文件系統,輸出的數據可以被外部系統所使用,該操作類似于RDD的輸出操作。Spark Streaming支持以下輸出操作:
- **print() **: 打印DStream中每個RDD的前10個元素到控制臺。
- **saveAsTextFiles(prefix, [suffix] **: 將此DStream中每個RDD的所有元素以文本文件的形式保存。每個批次的數據都會保存在一個單獨的目錄中,目錄名為:prefix-TIME_IN_MS[.suffix]。
- **saveAsObjectFiles(prefix, [suffix])**: 將此DStream中每個RDD的所有元素以Java對象序列化的形式保存。每個批次的數據都會保存在一個單獨的目錄中,目錄名為:prefix-TIME_IN_MS[.suffix]。
- **saveAsHadoopFiles(prefix, [suffix])**:將此DStream中每個RDD的所有元素以Hadoop文件(SequenceFile等)的形式保存。每個批次的數據都會保存在一個單獨的目錄中,目錄名為:prefix-TIME_IN_MS[.suffix]。
- **foreachRDD(func)**:最通用的輸出操作,將函數func應用于DStream中生成的每個RDD。通過此函數,可以將數據寫入任何支持寫入操作的數據源。
十一、Structured Streaming
Structured Streaming 是 Spark 2.0 版本中引入的一種新的流處理引擎。它基于 Spark SQL 引擎,提供了一種聲明式的 API 來處理結構化數據流。
與 Spark Streaming 相比,Structured Streaming 具有以下優點:
- 易用性:Structured Streaming 提供了與 Spark SQL 相同的 API,可以讓開發人員快速構建流處理應用。
- 高性能:Structured Streaming 基于 Spark SQL 引擎,能夠快速處理大規模的數據流。
- 容錯性:Structured Streaming 具有良好的容錯性,能夠在節點故障時自動恢復。
- 端到端一致性:Structured Streaming 提供了端到端一致性保證,能夠確保數據不丟失、不重復。
下面是一個簡單的 Structured Streaming 示例代碼:
import org.apache.spark.sql.SparkSession
val spark = SparkSession.builder.appName("Structured Streaming Example").getOrCreate()
val lines = spark.readStream
.format("socket")
.option("host", "localhost")
.option("port", 9999)
.load()
import spark.implicits._
val words = lines.as[String].flatMap(_.split(" "))
val wordCounts = words.groupBy("value").count()
val query = wordCounts.writeStream
.outputMode("complete")
.format("console")
.start()
query.awaitTermination()
在這個示例中,我們首先創建了一個 SparkSession 對象。然后,我們使用 readStream 方法從套接字源創建了一個 DataFrame。接下來,我們對 DataFrame 進行了一系列操作,包括 flatMap、groupBy 和 count。最后,我們使用 writeStream 方法將結果輸出到控制臺。
Structured Streaming 同樣支持 DSL 和 SQL 語法。
DSL 語法:
import org.apache.spark.sql.SparkSession
val spark = SparkSession.builder.appName("Structured Streaming Example").getOrCreate()
val lines = spark.readStream
.format("socket")
.option("host", "localhost")
.option("port", 9999)
.load()
import spark.implicits._
val words = lines.as[String].flatMap(_.split(" "))
val wordCounts = words.groupBy("value").count()
val query = wordCounts.writeStream
.outputMode("complete")
.format("console")
.start()
query.awaitTermination()
SQL 語法:
import org.apache.spark.sql.SparkSession
val spark = SparkSession.builder.appName("Structured Streaming Example").getOrCreate()
val lines = spark.readStream
.format("socket")
.option("host", "localhost")
.option("port", 9999)
.load()
lines.createOrReplaceTempView("lines")
val wordCounts = spark.sql(
"""
|SELECT value, COUNT(*) as count
|FROM (
| SELECT explode(split(value, ' ')) as value
| FROM lines
|)
|GROUP BY value
""".stripMargin)
val query = wordCounts.writeStream
.outputMode("complete")
.format("console")
.start()
query.awaitTermination()
1.Source
Structured Streaming 支持多種輸入源,包括文件源(如文本文件、Parquet 文件、JSON 文件等)、Kafka、Socket 等。下面是一個使用 Scala 語言從 Kafka 中讀取數據的例子:
import org.apache.spark.sql.SparkSession
val spark = SparkSession.builder.appName("StructuredStreaming").getOrCreate()
// 訂閱一個主題
val df = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("subscribe", "topic1")
.load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
.as[(String, String)]
2.Output
Structured Streaming 支持多種輸出方式,包括控制臺輸出、內存輸出、文件輸出、數據源輸出等。下面是將數據寫入到 Parquet 文件中的例子:
import org.apache.spark.sql.SparkSession
val spark = SparkSession.builder.appName("StructuredStreaming").getOrCreate()
// 從 socket 中讀取數據
val lines = spark
.readStream
.format("socket")
.option("host", "localhost")
.option("port", 9999)
.load()
// 將數據寫入到 Parquet 文件中
lines.writeStream
.format("parquet")
.option("path", "path/to/output/dir")
.option("checkpointLocation", "path/to/checkpoint/dir")
.start()
3.Output Mode
每當結果表更新時,我們都希望將更改后的結果行寫入外部接收器。
Output mode 指定了數據寫入輸出接收器的方式。Structured Streaming 支持以下三種 output mode:
Output Mode |
描述 |
Append |
只將流 DataFrame/Dataset 中的新行寫入接收器。 |
Complete |
每當有更新時,將流 DataFrame/Dataset 中的所有行寫入接收器。 |
Update |
每當有更新時,只將流 DataFrame/Dataset 中更新的行寫入接收器。 |
4.Output Sink
Output sink 指定了數據寫入的位置。Structured Streaming 支持多種輸出接收器,包括文件接收器、Kafka 接收器、Foreach 接收器、控制臺接收器和內存接收器等。下面是一些使用 Scala 語言將數據寫入到不同輸出接收器中的例子:
import org.apache.spark.sql.SparkSession
val spark = SparkSession.builder.appName("StructuredStreaming").getOrCreate()
// 從 socket 中讀取數據
val lines = spark
.readStream
.format("socket")
.option("host", "localhost")
.option("port", 9999)
.load()
// 將數據寫入到 Parquet 文件中
lines.writeStream
.format("parquet")
.option("path", "path/to/output/dir")
.option("checkpointLocation", "path/to/checkpoint/dir")
.start()
// 將數據寫入到 Kafka 中
//selectExpr 是一個 DataFrame 的轉換操作,它允許你使用 SQL 表達式來選擇 DataFrame 中的列。
//selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)") 表示選擇 key 和 value 列,并將它們的類型轉換為字符串類型。
//這是因為 Kafka 接收器要求數據必須是字符串類型或二進制類型。
lines.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("topic", "topic1")
.start()
// 將數據寫入到控制臺中
lines.writeStream
.format("console")
.start()
// 將數據寫入到內存中
lines.writeStream
.format("memory")
.queryName("tableName")
.start()
5.PV,UV統計
下面是用Structured Streaming實現PV,UV統計的例子,我們來感受實戰下:
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
object PVUVExample {
def main(args: Array[String]): Unit = {
val spark = SparkSession.builder.appName("PVUVExample").getOrCreate()
import spark.implicits._
// 假設我們有一個包含用戶ID和訪問的URL的輸入流
val lines = spark.readStream.format("socket").option("host", "localhost").option("port", 9999).load()
val data = lines.as[String].map(line => {
val parts = line.split(",")
(parts(0), parts(1))
}).toDF("user", "url")
// 計算PV
val pv = data.groupBy("url").count().withColumnRenamed("count", "pv")
val pvQuery = pv.writeStream.outputMode("complete").format("console").start()
// 計算UV
val uv = data.dropDuplicates().groupBy("url").count().withColumnRenamed("count", "uv")
val uvQuery = uv.writeStream.outputMode("complete").format("console").start()
pvQuery.awaitTermination()
uvQuery.awaitTermination()
}
}
這段代碼演示了如何使用Structured Streaming對數據進行PV和UV統計。它首先從一個socket源讀取數據,然后使用groupBy和count對數據進行PV統計,最后使用dropDuplicates、groupBy和count對數據進行UV統計。
假設我們在本地啟動了一個socket服務器,并向其發送以下數據:
user1,http://example.com/page1
user2,http://example.com/page1
user1,http://example.com/page2
user3,http://example.com/page1
user2,http://example.com/page2
user3,http://example.com/page2
那么程序將輸出以下結果:
-------------------------------------------
Batch: 0
-------------------------------------------
+--------------------+---+
| url| pv|
+--------------------+---+
|http://example.co...| 3|
|http://example.co...| 3|
+--------------------+---+
-------------------------------------------
Batch: 0
-------------------------------------------
+--------------------+---+
| url| uv|
+--------------------+---+
|http://example.co...| 2|
|http://example.co...| 3|
+--------------------+---+
總結
在此,我們對Spark的基本概念、使用方式以及部分原理進行了簡單的介紹。Spark以其強大的處理能力和靈活性,已經成為大數據處理領域的一個重要工具。然而,這只是冰山一角。Spark的世界里還有許多深度和廣度等待著我們去探索。
作為初學者,你可能會覺得這個領域龐大且復雜。但請記住,每個都是從初學者開始的。不斷的學習和實踐,你將能夠更好的理解和掌握Spark,并將其應用于解決實際問題。這篇文章可能不能涵蓋所有的知識點,但我希望它能帶給你收獲和思考。