作者:高聰 王忠民 陳彥萍
來(lái)源:華章科技
大數(shù)據(jù)的生命周期分為數(shù)據(jù)獲取(data acquisition)、數(shù)據(jù)存儲(chǔ)(data storage)、數(shù)據(jù)分析(data analysis)以及結(jié)果(result),并且將前述大數(shù)據(jù)處理的三代技術(shù)中相關(guān)的工具映射至數(shù)據(jù)獲取、數(shù)據(jù)存儲(chǔ)和數(shù)據(jù)分析三個(gè)環(huán)節(jié)來(lái)進(jìn)行分類討論,詳情如表1-2所示。
▲表1-2 大數(shù)據(jù)處理的典型工具
- 在數(shù)據(jù)獲取階段,通常涉及從多源異構(gòu)的數(shù)據(jù)源獲取數(shù)據(jù),這些數(shù)據(jù)源可能是批處理數(shù)據(jù)源,也有可能是實(shí)時(shí)流數(shù)據(jù)源;
- 在數(shù)據(jù)存儲(chǔ)階段,需要對(duì)前一階段已經(jīng)獲取到的數(shù)據(jù)進(jìn)行存儲(chǔ),以便進(jìn)行后續(xù)的分析與處理,常見的存儲(chǔ)方式有磁盤(disk)形式和無(wú)盤(diskless)形式。
- 在數(shù)據(jù)分析階段,針對(duì)不同的應(yīng)用需求,會(huì)運(yùn)用各類模型和算法來(lái)對(duì)數(shù)據(jù)進(jìn)行分析與處理。
在表1-2中,三代技術(shù)中不同的處理階段所涉及的工具存在重疊。此外,對(duì)于混合計(jì)算技術(shù),其本身同時(shí)涉及批處理技術(shù)和實(shí)時(shí)處理技術(shù),實(shí)現(xiàn)混合計(jì)算模型的技術(shù)也要比單純的批處理技術(shù)和實(shí)時(shí)處理技術(shù)更加復(fù)雜;鑒于混合計(jì)算技術(shù)的上述特點(diǎn),這里不對(duì)在數(shù)據(jù)的獲取、存儲(chǔ)與分析方面所涉及的具體工具做特別的劃分。
01 HDFS
Hadoop分布式文件系統(tǒng)(Hadoop Distributed File System,HDFS)目前是Apache Hadoop項(xiàng)目的一個(gè)子項(xiàng)目,與已有的分布式文件系統(tǒng)有很多相似之處。
此外,作為專門針對(duì)商業(yè)化硬件(commodity hardware)設(shè)計(jì)的文件系統(tǒng),HDFS的獨(dú)特之處也很明顯:首先其具有很高的容錯(cuò)性,其次可以部署在較為廉價(jià)的硬件上,最后能夠提供高吞吐量的應(yīng)用數(shù)據(jù)訪問(wèn)能力。
對(duì)于終端用戶而言,HDFS就是一個(gè)傳統(tǒng)的文件系統(tǒng),具有文件和目錄的創(chuàng)建、修改、刪除等常規(guī)操作。
HDFS采用主/從(Master/Slave)體系結(jié)構(gòu)。單個(gè)HDFS集群僅包含一個(gè)名稱節(jié)點(diǎn)(NameNode),其提供元數(shù)據(jù)服務(wù),管理文件系統(tǒng)的命名空間(namespace),并引導(dǎo)用戶對(duì)文件的訪問(wèn)。此外,單個(gè)HDFS集群可以包含多個(gè)數(shù)據(jù)節(jié)點(diǎn)(DataNode),數(shù)據(jù)節(jié)點(diǎn)負(fù)責(zé)管理與自身相關(guān)聯(lián)的存儲(chǔ)空間。
HDFS對(duì)外給出文件系統(tǒng)的命名空間作為用戶對(duì)數(shù)據(jù)進(jìn)行訪存的接口。
在HDFS內(nèi)部,單個(gè)文件通常被分割成多個(gè)塊(block),這些塊存儲(chǔ)在一系列數(shù)據(jù)節(jié)點(diǎn)上。由名稱節(jié)點(diǎn)在整個(gè)HDFS集群的命名空間上執(zhí)行文件和目錄的打開、讀取和關(guān)閉等操作。文件的塊與數(shù)據(jù)節(jié)點(diǎn)之間的映射也是由名稱節(jié)點(diǎn)管理的。數(shù)據(jù)節(jié)點(diǎn)基于名稱節(jié)點(diǎn)的指令來(lái)實(shí)施塊的創(chuàng)建、復(fù)制和刪除等。
02 Sqoop
Sqoop是一個(gè)在Hadoop和關(guān)系數(shù)據(jù)庫(kù)服務(wù)器之間傳送數(shù)據(jù)的工具,方便大量數(shù)據(jù)的導(dǎo)入導(dǎo)出工作,其支持多種類型的數(shù)據(jù)存儲(chǔ)軟件。
Sqoop的核心功能為數(shù)據(jù)的導(dǎo)入和導(dǎo)出。
- 導(dǎo)入數(shù)據(jù):從諸如MySQL、SQL Server和Oracle等關(guān)系數(shù)據(jù)庫(kù)將數(shù)據(jù)導(dǎo)入到Hadoop下的HDFS、Hive和HBase等數(shù)據(jù)存儲(chǔ)系統(tǒng)。
- 導(dǎo)出數(shù)據(jù):從Hadoop的文件系統(tǒng)中將數(shù)據(jù)導(dǎo)出至關(guān)系數(shù)據(jù)庫(kù)。
Sqoop的一個(gè)顯著特點(diǎn)是可以使用MapReduce將數(shù)據(jù)從傳統(tǒng)的關(guān)系數(shù)據(jù)庫(kù)導(dǎo)入到HDFS中。Sqoop作為一個(gè)通用性的工具,只需要在一個(gè)節(jié)點(diǎn)上安裝,因此安裝和使用十分便捷。
03 Flume
Flume是由Hadoop生態(tài)系統(tǒng)中著名的軟件公司Cloudera于2011年發(fā)布,該軟件能夠支持分布式海量日志的采集、集成與傳輸,以實(shí)時(shí)的方式從數(shù)據(jù)發(fā)送方獲取數(shù)據(jù),并傳輸給數(shù)據(jù)接收方。
Flume具有兩個(gè)顯著的特點(diǎn):可靠性和可擴(kuò)展性。
- 針對(duì)可靠性,其提供了從強(qiáng)到弱的三級(jí)保障,即End-to-end、Store on failure和Best effort。
- 針對(duì)可擴(kuò)展性,其采用三層的體系結(jié)構(gòu),即Agent、Collector和Storage,每層都可以在水平方向上進(jìn)行擴(kuò)展。
Flume以Agent的方式運(yùn)行,單個(gè)Agent包含Source、Channel和Sink三個(gè)組件,由Agent對(duì)數(shù)據(jù)進(jìn)行收集,然后交付給存儲(chǔ)機(jī)制。從多個(gè)數(shù)據(jù)源收集到的日志信息依次經(jīng)過(guò)上述三個(gè)組件,然后存入HDFS或HBase中。因此,通過(guò)Flume可以將數(shù)據(jù)便捷地轉(zhuǎn)交給Hadoop體系結(jié)構(gòu)。
04 Scribe
Scribe是由Facebook開發(fā)的分布式日志系統(tǒng),在Facebook內(nèi)部已經(jīng)得到了廣泛的應(yīng)用。Scribe能夠針對(duì)位于不同數(shù)據(jù)源的日志信息進(jìn)行收集,然后存儲(chǔ)至某個(gè)統(tǒng)一的存儲(chǔ)系統(tǒng),這個(gè)存儲(chǔ)系統(tǒng)可以是網(wǎng)絡(luò)文件系統(tǒng)(Network File System,NFS),也可以是分布式文件系統(tǒng)。
Scribe的體系結(jié)構(gòu)由三部分組成:Scribe Agent、Scribe和Storage。
- 第一部分Scribe Agent為用戶提供接口,用戶使用該接口來(lái)發(fā)送數(shù)據(jù)。
- 第二部分Scribe接收由Scribe Agent發(fā)送來(lái)的數(shù)據(jù),根據(jù)各類數(shù)據(jù)所具有的不同topic再次分發(fā)給不同的實(shí)體。
- 第三部分Storage包含多種存儲(chǔ)系統(tǒng)和介質(zhì)。
Scribe的日志收集行為只包括主動(dòng)寫入的日志,Scribe自身沒有主動(dòng)抓取日志的功能。因此,用戶需要主動(dòng)向Scribe Agent發(fā)送相關(guān)的日志信息。
05 HBase
HBase的全稱為Hadoop Database,是基于谷歌BigTable的開源實(shí)現(xiàn),其使用Hadoop體系結(jié)構(gòu)中的HDFS作為基本的文件系統(tǒng)。谷歌根據(jù)BigTable的理念設(shè)計(jì)實(shí)現(xiàn)了谷歌文件系統(tǒng)GFS,但是該方案未開源。HBase可以稱為BigTable的山寨版,是開源的。
HBase在Hadoop體系結(jié)構(gòu)中的位置介于HDFS和MapReduce之間,其架構(gòu)為主/從形式,內(nèi)部的兩個(gè)核心構(gòu)件為Master和RegionServer。
HBase是建立在HDFS之上的分布式面向列的數(shù)據(jù)庫(kù),能夠針對(duì)海量結(jié)構(gòu)化數(shù)據(jù)實(shí)現(xiàn)隨機(jī)的實(shí)時(shí)訪問(wèn),其設(shè)計(jì)理念和運(yùn)行模式都充分利用了HDFS的高容錯(cuò)性。
由于HBase是面向列的,因此它在數(shù)據(jù)庫(kù)的表中是按照行進(jìn)行排序的。在HBase中,所有的存儲(chǔ)內(nèi)容都是字節(jié),任何要存儲(chǔ)的內(nèi)容都需要先轉(zhuǎn)換成字節(jié)流的形式,此外數(shù)據(jù)庫(kù)的行鍵值按照字節(jié)進(jìn)行排序,同時(shí)形成了索引。
06 MapReduce
MapReduce是Hadoop體系結(jié)構(gòu)中極為重要的核心構(gòu)件之一。作為一個(gè)分布式的并行計(jì)算模型,MapReduce包含的兩個(gè)單詞分別具有特定的含義:“Map”表示“映射”;“Reduce”表示“歸約”。上述兩個(gè)概念的基本理念源于函數(shù)式編程語(yǔ)言(functional programming language)。
與傳統(tǒng)的編程語(yǔ)言不同,函數(shù)式編程語(yǔ)言是一類非馮諾依曼式的程序設(shè)計(jì)語(yǔ)言,其編程范式的抽象程度很高,主要由原始函數(shù)、定義函數(shù)和函數(shù)型構(gòu)成。
MapReduce的這種設(shè)計(jì)思想使分布式并行程序設(shè)計(jì)的難度得以簡(jiǎn)化,用戶將已有的代碼稍加修改就能夠運(yùn)行在分布式環(huán)境下。在實(shí)際應(yīng)用場(chǎng)景中,大多數(shù)情況下收集到的大量多源異構(gòu)數(shù)據(jù)都不具有特定的規(guī)律和特征。
MapReduce的工作過(guò)程能夠在一定程度上將上述數(shù)據(jù)按照某種規(guī)律進(jìn)行歸納和總結(jié)。在“Map”階段,通過(guò)指定的映射函數(shù)提取數(shù)據(jù)的特征,得到的結(jié)果的形式為鍵值對(duì) 。在“Reduce”階段,通過(guò)指定的歸約函數(shù)對(duì)“Map”階段得到的結(jié)果進(jìn)行統(tǒng)計(jì)。對(duì)于不同的具體問(wèn)題,所需要的歸約函數(shù)的個(gè)數(shù)可能千差萬(wàn)別。
總體來(lái)說(shuō),MapReduce具有開發(fā)難度低、擴(kuò)展性強(qiáng)和容錯(cuò)性高三個(gè)顯著特點(diǎn)。盡管其分布式并行計(jì)算模型能大幅度提高海量數(shù)據(jù)的處理速度,但受限于大數(shù)據(jù)的規(guī)模,通常MapReduce的作業(yè)例程的執(zhí)行時(shí)間為分鐘級(jí),隨著數(shù)據(jù)量的增加,耗時(shí)若干天也很普遍。
07 Hive
Hive針對(duì)數(shù)據(jù)倉(cāng)庫(kù)來(lái)提供類似SQL語(yǔ)句的查詢功能,其能夠?qū)⒁越Y(jié)構(gòu)化形式存儲(chǔ)的數(shù)據(jù)映射成數(shù)據(jù)庫(kù)表,主要應(yīng)用場(chǎng)景為多維度數(shù)據(jù)分析和海量結(jié)構(gòu)化數(shù)據(jù)離線分析。Hive的體系結(jié)構(gòu)主要包含用戶接口、元數(shù)據(jù)存儲(chǔ)、解釋器、編譯器、優(yōu)化器和執(zhí)行器。
雖然使用MapReduce也能夠?qū)崿F(xiàn)查詢,但是對(duì)于邏輯復(fù)雜度高的查詢,用戶在實(shí)現(xiàn)時(shí)難度較大。Hive提供類似于SQL的語(yǔ)法接口,降低了學(xué)習(xí)成本,提高了開發(fā)效率。
Hive基于SQL的語(yǔ)法來(lái)定義名為HiveQL或HQL的查詢語(yǔ)言,其支持常規(guī)的索引化和基本的數(shù)據(jù)查詢,更重要的是能夠?qū)⒒赟QL的查詢需求轉(zhuǎn)化為MapReduce的作業(yè)例程。
除了自身具有的功能之外,用戶可以在Hive中編寫自定義函數(shù),具體來(lái)說(shuō)分為三種:
- 用戶自定義函數(shù)(User Defined Function,UDF)
- 用戶自定義聚合函數(shù)(User Defined Aggregation Function,UDAF)
- 用戶自定義表生成函數(shù)(User Defined Table-generating Function,UDTF)
08 Pig
Pig是一個(gè)面向過(guò)程的高級(jí)程序設(shè)計(jì)語(yǔ)言,能夠分析大型數(shù)據(jù)集,并將結(jié)果表示為數(shù)據(jù)流,其內(nèi)置了多種數(shù)據(jù)類型,并且支持元組(tuple)、映射(map)和包(package)等范式。
Pig有兩種工作模式:Local模式和MapReduce模式。
- 在Local模式下,Pig的運(yùn)行獨(dú)立于Hadoop體系結(jié)構(gòu),全部操作均在本地進(jìn)行。
- 在MapReduce模式下,Pig使用了Hadoop集群中的分布式文件系統(tǒng)HDFS。
作為一種程序設(shè)計(jì)語(yǔ)言,Pig能夠?qū)?shù)據(jù)進(jìn)行加載、處理,并且存儲(chǔ)獲得的結(jié)果。Pig和Hive均能夠簡(jiǎn)化Hadoop的常見工作任務(wù)。Hive通常應(yīng)用在靜態(tài)數(shù)據(jù)上,處理例行性的分析任務(wù)。
Pig比Hive在規(guī)模上更加輕量,其與SQL的結(jié)合使得用戶能夠使用比Hive更加簡(jiǎn)潔的代碼來(lái)給出解決方案。與MapReduce相比,Pig在接口方面提供了更高層次的抽象,具有更多的數(shù)據(jù)結(jié)構(gòu)類型。此外,Pig還提供了大量的數(shù)據(jù)變換操作,MapReduce在這方面比較薄弱。
09 Cascading
Cascading是用JAVA語(yǔ)言編寫成的開源庫(kù),能夠脫離MapReduce來(lái)完成對(duì)復(fù)雜數(shù)據(jù)工作流的處理。該開源庫(kù)提供的應(yīng)用程序編程接口定義了復(fù)雜的數(shù)據(jù)流以及將這些數(shù)據(jù)流與后端系統(tǒng)集成的規(guī)則。此外,其還定義了將邏輯數(shù)據(jù)流映射至計(jì)算平臺(tái)并進(jìn)行執(zhí)行的規(guī)則。
針對(duì)數(shù)據(jù)的提取、轉(zhuǎn)換和加載(Extract Transform Load,ETL),Cascading提供了6個(gè)基本操作:
- 復(fù)制(copy)
- 過(guò)濾(filter)
- 合并(merge)
- 計(jì)數(shù)(count)
- 平均(average)
- 結(jié)合(join)
初級(jí)的ETL應(yīng)用程序通常涉及數(shù)據(jù)和文件的復(fù)制,以及不良數(shù)據(jù)的過(guò)濾。針對(duì)多種不同數(shù)據(jù)源的輸入文件,需要對(duì)它們進(jìn)行合并。計(jì)數(shù)和平均是對(duì)數(shù)據(jù)和記錄進(jìn)行處理的常用操作。結(jié)合指的是將不同處理分支中的處理結(jié)果按照給定的規(guī)則進(jìn)行結(jié)合。
10 Spark
與Hadoop類似,Spark也是一個(gè)針對(duì)大數(shù)據(jù)的分布式計(jì)算框架。Spark可以用來(lái)構(gòu)建大規(guī)模、低延遲的數(shù)據(jù)處理應(yīng)用程序。
相對(duì)于Hadoop,Spark的顯著特點(diǎn)是能夠在內(nèi)存中進(jìn)行計(jì)算,因此又稱為通用內(nèi)存并行計(jì)算框架,與MapReduce兼容,其主要構(gòu)件包括SparkCore、SparkSQL、SparkStreaming、MLlib、GraphX、BlinkDB和Tachyon。
Hadoop存在磁盤I/O和序列化等性能瓶頸,在Spark的設(shè)計(jì)理念中,選用內(nèi)存來(lái)存儲(chǔ)Hadoop中存儲(chǔ)在HDFS的中間結(jié)果。Spark兼容HDFS,能夠很好地融入Hadoop體系結(jié)構(gòu),被認(rèn)為是MapReduce的替代品。
根據(jù)Spark官方網(wǎng)站的數(shù)據(jù),Spark的批處理速度比MapReduce提升了近10倍,內(nèi)存中的數(shù)據(jù)分析速度則提升了近100倍。
Spark模型所特有的彈性分布式數(shù)據(jù)集(Resilient Distributed Dataset,RDD)使得針對(duì)數(shù)據(jù)的災(zāi)難恢復(fù)在內(nèi)存和磁盤上都可以實(shí)現(xiàn)。
總體來(lái)說(shuō),Spark的編程模型具有以下四個(gè)特點(diǎn):速度(speed)、簡(jiǎn)易(ease of use)、通用(generality)和兼容(runs everywhere)。
- 在速度方面,Spark使用基于有向無(wú)環(huán)圖(Directed Acyclic Graph,DAG)的作業(yè)調(diào)度算法,采用先進(jìn)的查詢優(yōu)化器和物理執(zhí)行器提高了數(shù)據(jù)的批處理和流式處理的性能。
- 在簡(jiǎn)易方面,Spark支持多種高級(jí)算法,用戶可以使用Java、Scala、Python、R和SQL等語(yǔ)言編寫交互式應(yīng)用程序。
- 在通用方面,Spark提供了大量的通用庫(kù),使用這些庫(kù)可以方便地開發(fā)出針對(duì)不同應(yīng)用場(chǎng)景的統(tǒng)一解決方案,極大地降低了研發(fā)與運(yùn)營(yíng)的成本。
- 在兼容方面,Spark本身能夠方便地與現(xiàn)有的各類開源系統(tǒng)無(wú)縫銜接,例如已有的Hadoop體系結(jié)構(gòu)中的HDFS和Hbase。
11 Shark
作為一個(gè)面向大規(guī)模數(shù)據(jù)的數(shù)據(jù)倉(cāng)庫(kù)工具,Shark最初是基于Hive的代碼進(jìn)行開發(fā)的。Hive在執(zhí)行交互查詢時(shí)需要在私有數(shù)據(jù)倉(cāng)庫(kù)上執(zhí)行非常耗時(shí)的ETL操作,為了彌補(bǔ)這個(gè)性能問(wèn)題,Shark成了Hadoop體系結(jié)構(gòu)中的首個(gè)交互式SQL軟件。
Shark支持Hive包含的查詢語(yǔ)言、元存儲(chǔ)、序列化格式以及自定義函數(shù)。后來(lái),Hadoop體系結(jié)構(gòu)中MapReduce本身的結(jié)構(gòu)限制了Shark的發(fā)展,研究者們中止了Shark的研發(fā),啟動(dòng)了Shark SQL這個(gè)新項(xiàng)目。Shark SQL是基于Spark的一個(gè)組件,提供了針對(duì)結(jié)構(gòu)化數(shù)據(jù)的便捷操作,統(tǒng)一了結(jié)構(gòu)化查詢語(yǔ)言與命令式語(yǔ)言。
Shark在Spark的體系結(jié)構(gòu)中提供了和Hive相同的HiveQL編程接口,因此與Hive兼容。通過(guò)Hive的HQL解析,將HQL轉(zhuǎn)換成Spark上的RDD操作。
12 Kafka
Kafka是一個(gè)分布式流處理平臺(tái)(distributed streaming platform),最初由領(lǐng)英公司開發(fā),使用的編程語(yǔ)言是Java和Scala。
Kafka支持分區(qū)(partition)和副本(replica),針對(duì)消息隊(duì)列進(jìn)行處理。消息傳送功能包含連接服務(wù)(connection service)、消息的路由(routing)、傳送(delivery)、持久性(durability)、安全性(security)和日志記錄(log)。
Kafka的主要應(yīng)用程序接口有如下四類:生產(chǎn)者(producer API)、消費(fèi)者(consumer API)、流(stream API)和連接器(connector API)。
Kafka對(duì)外的接口設(shè)計(jì)理念是基于話題(topic)的,消息生成后被寫入話題中,用戶從話題中讀取消息。單個(gè)的話題由多個(gè)分區(qū)構(gòu)成,當(dāng)系統(tǒng)性能下降時(shí),通常的操作是增加分區(qū)的個(gè)數(shù)。
分區(qū)之間的消息互相獨(dú)立,每個(gè)分區(qū)內(nèi)的消息是有序的。新消息的寫入操作在具體實(shí)現(xiàn)中為相應(yīng)文件內(nèi)容的追加操作,該方式具有較強(qiáng)的性能。由于一個(gè)話題可以包含多個(gè)分區(qū),因此Kafka具有高吞吐量、低延遲的特性。
消息隊(duì)列包含兩個(gè)模型:點(diǎn)對(duì)點(diǎn)(point-to-point)和發(fā)布/訂閱(publish/subscribe)。
- 對(duì)于點(diǎn)對(duì)點(diǎn)模型,消息生成后進(jìn)入隊(duì)列,由用戶從隊(duì)列中取出消息并使用。當(dāng)消息被使用后,其生命周期已經(jīng)結(jié)束,即該消息無(wú)法再次被使用。雖然消息隊(duì)列支持多個(gè)用戶,但一個(gè)消息僅能夠被一個(gè)用戶所使用。
- 對(duì)于發(fā)布/訂閱模型,消息生成后其相關(guān)信息會(huì)被發(fā)布到多個(gè)話題中,只要訂閱了相關(guān)話題的用戶就都可以使用該消息。與點(diǎn)對(duì)點(diǎn)模型不同,在發(fā)布/訂閱模型中一個(gè)消息可以被多個(gè)用戶使用。
13 Kestrel
Kestrel是由推特(Twitter)開發(fā)的開源中間件(middleware),使用的編程語(yǔ)言為Scala,其前身是名為Starling的輕量級(jí)分布式隊(duì)列服務(wù)器,同樣Kestrel也具有輕量化的特點(diǎn)。
Starling支持MemCache協(xié)議,其能夠方便地構(gòu)建網(wǎng)絡(luò)訪問(wèn)隊(duì)列。推特早期使用Starling來(lái)處理大量的隊(duì)列消息,后來(lái)推特將基于Ruby語(yǔ)言的Starling項(xiàng)目進(jìn)行重構(gòu),使用Scala語(yǔ)言將其重新實(shí)現(xiàn),得到Kestrel。
在協(xié)議支持性方面,Kestrel支持三類協(xié)議:MemCache、Text和Thrift,其中MemCache協(xié)議沒有完整地實(shí)現(xiàn),僅支持部分操作。Kestrel本身運(yùn)行在Java虛擬機(jī)(Java Virtual machine,JVM)上,針對(duì)Java的各類優(yōu)化措施均可以使用。
為了改善性能,Kestrel中的隊(duì)列存儲(chǔ)在內(nèi)存中,針對(duì)隊(duì)列的操作日志保存在硬盤中。雖然Kestrel本身是輕量化的,但其具有豐富的配置選項(xiàng),能夠很方便地組成集群,集群中的節(jié)點(diǎn)互相之間是透明的,針對(duì)隊(duì)列中消息獲取的GET協(xié)議支持阻塞獲取和可靠獲取。
阻塞獲取是指用戶可以設(shè)置超時(shí)時(shí)間,在時(shí)間內(nèi)有消息的話即刻返回,如果超時(shí)后還沒有消息就結(jié)束等待。可靠獲取是指隊(duì)列服務(wù)器只有在收到用戶明確的確認(rèn)反饋后,才將相關(guān)的消息從隊(duì)列中永久刪除。
如果用戶使用GET操作從隊(duì)列獲取消息后隊(duì)列服務(wù)器馬上將該消息從隊(duì)列中刪除,那么此后需要用戶來(lái)確保該消息不會(huì)異常丟失,這對(duì)網(wǎng)絡(luò)狀態(tài)和系統(tǒng)運(yùn)行的特定環(huán)境要求較為苛刻。因此,用戶可以采用可靠獲取的方式來(lái)消除上述疑慮。
14 Storm
Storm是使用Java和Clojure編寫而成的分布式實(shí)時(shí)處理系統(tǒng),其雛形是由Nathan Marz和BackType構(gòu)建的,BackType是一家社交數(shù)據(jù)分析公司。2011年,推特收購(gòu)BackType,并將Storm開源。
Storm的主要功能是針對(duì)持續(xù)產(chǎn)生的數(shù)據(jù)流進(jìn)行計(jì)算,進(jìn)而彌補(bǔ)了Hadoop體系結(jié)構(gòu)對(duì)實(shí)時(shí)性支持的缺失。Storm的處理速度快,具有良好的可擴(kuò)展性和容錯(cuò)性,其所處理的數(shù)據(jù)位于內(nèi)存中。
用戶在Storm中設(shè)計(jì)的計(jì)算圖稱為拓?fù)?/strong>(topology),拓?fù)渲邪鞴?jié)點(diǎn)和從節(jié)點(diǎn),且以集群的形式呈現(xiàn)。Storm的主/從體系結(jié)構(gòu)是由兩類節(jié)點(diǎn)實(shí)現(xiàn)的:控制節(jié)點(diǎn)(master node)和工作節(jié)點(diǎn)(worker node),調(diào)度相關(guān)的信息以及主從節(jié)點(diǎn)的重要工作數(shù)據(jù)都是由ZooKeeper集群來(lái)負(fù)責(zé)處理的。
- 控制節(jié)點(diǎn)為主節(jié)點(diǎn),其上運(yùn)行的Nimbus進(jìn)程主要負(fù)責(zé)狀態(tài)監(jiān)測(cè)與資源管理,該進(jìn)程維護(hù)和分析Storm的拓?fù)洌瑫r(shí)收集需要執(zhí)行的任務(wù),然后將收集到的任務(wù)指派給可用的工作節(jié)點(diǎn)。
- 工作節(jié)點(diǎn)為從節(jié)點(diǎn),其上運(yùn)行的Supervisor進(jìn)程包含一個(gè)或多個(gè)工作進(jìn)程(worker),工作進(jìn)程根據(jù)所要處理的任務(wù)量來(lái)配置合理數(shù)量的執(zhí)行器(executor)以便執(zhí)行任務(wù)。Supervisor進(jìn)程監(jiān)聽本地節(jié)點(diǎn)的狀態(tài),根據(jù)實(shí)際情況啟動(dòng)或者結(jié)束工作進(jìn)程。
拓?fù)渲械臄?shù)據(jù)在噴嘴(spout)之間傳遞,噴嘴把從外部數(shù)據(jù)源獲取到的數(shù)據(jù)提供給拓?fù)洌虼耸荢torm中流的來(lái)源。數(shù)據(jù)流中數(shù)據(jù)的格式稱為元組(tuple),具體來(lái)說(shuō)為鍵值對(duì)(key-value pair),元組用來(lái)封裝需要處理的實(shí)際數(shù)據(jù)。
針對(duì)數(shù)據(jù)流的計(jì)算邏輯都是在螺栓(bolt)中執(zhí)行的,具體的處理過(guò)程中除了需要指定消息的生成、分發(fā)和連接,其余的都與傳統(tǒng)應(yīng)用程序類似。
15 Trident
Trident是位于Storm已有的實(shí)時(shí)處理環(huán)境之上更高層的抽象構(gòu)件,提供了狀態(tài)流處理和低延遲的分布式查詢功能,其屏蔽了計(jì)算事務(wù)處理和運(yùn)行狀態(tài)管理的細(xì)節(jié)。此外,還針對(duì)數(shù)據(jù)庫(kù)增加了更新操作的原語(yǔ)。
在Trident中,數(shù)據(jù)流的處理按照批次進(jìn)行,即所謂的事務(wù)。一般來(lái)說(shuō),對(duì)于不同的數(shù)據(jù)源,每個(gè)批次的數(shù)據(jù)量的規(guī)模可達(dá)數(shù)百萬(wàn)個(gè)元組。一個(gè)處理批次稱為一個(gè)事務(wù),當(dāng)所有處理完成之后,認(rèn)為該事務(wù)成功結(jié)束;當(dāng)事務(wù)中的一個(gè)或者多個(gè)元組處理失敗時(shí),整個(gè)事務(wù)需要回滾(rollback),然后重新提交。
Trident的事務(wù)控制包含三個(gè)層次:非事務(wù)控制(non-transactional)、嚴(yán)格的事務(wù)控制(transactional)和不透明的事務(wù)控制(opaque-transactional)。
- 對(duì)于非事務(wù)控制,單個(gè)批次內(nèi)的元組處理可以出現(xiàn)部分處理成功的情況,處理失敗的元組可以在其他批次進(jìn)行重試。
- 對(duì)于嚴(yán)格的事務(wù)控制,單個(gè)批次內(nèi)處理失敗的元組只能在該批次內(nèi)進(jìn)行重試,如果失敗的元組一直無(wú)法成功處理,那么進(jìn)程掛起,即不包含容錯(cuò)機(jī)制。
- 對(duì)于不透明的事務(wù)控制,單個(gè)批次內(nèi)處理失敗的元組可以在其他批次內(nèi)重試一次,其容錯(cuò)機(jī)制規(guī)定重試操作有且僅有一次。
上述針對(duì)消息的可靠性保障機(jī)制使得數(shù)據(jù)的處理有且僅有一次,保證了事務(wù)數(shù)據(jù)的持久性。容錯(cuò)機(jī)制使得失敗的元組在重試環(huán)節(jié)的狀態(tài)更新是冪等的,冪等性是統(tǒng)計(jì)學(xué)中的一個(gè)重要性能指標(biāo),其保證了即使數(shù)據(jù)被多次處理,從處理結(jié)果的角度來(lái)看和處理一次是相同的。
Trident的出現(xiàn)顯著減少了編寫基于Storm的應(yīng)用程序的代碼量,其本身具有函數(shù)、過(guò)濾器、連接、分組和聚合功能。在組件方面,它保留了Spout,將Bolt組件中實(shí)現(xiàn)的處理邏輯映射為一些新的具體操作,例如過(guò)濾、函數(shù)和分組統(tǒng)計(jì)等。
數(shù)據(jù)的狀態(tài)可以保存在拓?fù)鋬?nèi)部存儲(chǔ)當(dāng)中(例如內(nèi)存),也可以保存在外部存儲(chǔ)當(dāng)中(例如磁盤),Trident的應(yīng)用程序接口支持這兩種機(jī)制。
16 S4
S4項(xiàng)目是由雅虎(Yahoo)提出的,作為一個(gè)分布式流處理計(jì)算引擎,其設(shè)計(jì)的初衷是與按點(diǎn)擊數(shù)付費(fèi)的廣告結(jié)合,基于實(shí)時(shí)的計(jì)算來(lái)評(píng)估潛在用戶是否可能對(duì)廣告進(jìn)行點(diǎn)擊。
這里S4是指簡(jiǎn)單的(Simple)、可擴(kuò)展的(Scalable)、流(Streaming)以及系統(tǒng)(System)。在S4項(xiàng)目提出之前,雅虎已經(jīng)擁有了Hadoop,但Hadoop的基本理念是批處理,即利用MapReduce對(duì)已經(jīng)過(guò)存儲(chǔ)的靜態(tài)數(shù)據(jù)進(jìn)行處理。盡管MapReduce的處理速度非常快,但是從本質(zhì)上說(shuō),其無(wú)法處理流數(shù)據(jù)。
S4項(xiàng)目將流數(shù)據(jù)看作事件,其具體的實(shí)現(xiàn)中包含五個(gè)重要構(gòu)件:處理節(jié)點(diǎn)(processing element)、事件(event)、處理節(jié)點(diǎn)容器(Processing Element Container,PEC)、機(jī)器節(jié)點(diǎn)(node)和機(jī)器節(jié)點(diǎn)集群(cluster)。
一個(gè)集群中包含多個(gè)機(jī)器節(jié)點(diǎn),一個(gè)機(jī)器節(jié)點(diǎn)中包含一個(gè)處理節(jié)點(diǎn)容器,一個(gè)處理節(jié)點(diǎn)容器中包含多個(gè)處理節(jié)點(diǎn)。處理節(jié)點(diǎn)對(duì)事件進(jìn)行處理,處理結(jié)果作為新的事件,其能夠被其他處理節(jié)點(diǎn)處理。上述的點(diǎn)擊付費(fèi)廣告的應(yīng)用場(chǎng)景具有很高的實(shí)時(shí)性要求,而Hadoop無(wú)法很好地應(yīng)對(duì)這樣的要求。
具體來(lái)說(shuō),MapReduce所處理的數(shù)據(jù)是保存在分布式文件系統(tǒng)上的,在執(zhí)行數(shù)據(jù)處理任務(wù)之前,MapReduce有一個(gè)數(shù)據(jù)準(zhǔn)備的過(guò)程,需要處理的數(shù)據(jù)會(huì)按照分塊依次進(jìn)行運(yùn)算,不同的數(shù)據(jù)分塊大小可以對(duì)所謂的實(shí)時(shí)性進(jìn)行調(diào)節(jié)。
當(dāng)數(shù)據(jù)塊較小時(shí),可以獲得一定的低延遲性,但是數(shù)據(jù)準(zhǔn)備的過(guò)程就會(huì)變得很長(zhǎng);當(dāng)數(shù)據(jù)塊較大時(shí),數(shù)據(jù)處理的過(guò)程無(wú)法實(shí)現(xiàn)較低的延遲性。諸如S4的流計(jì)算系統(tǒng)所處理的數(shù)據(jù)是實(shí)時(shí)的流數(shù)據(jù),即數(shù)據(jù)源源不斷地從外部數(shù)據(jù)源到達(dá)處理系統(tǒng)。
流計(jì)算處理系統(tǒng)的主要目標(biāo)是在保證給定的準(zhǔn)確度和精確性的前提下以最快的速度完成數(shù)據(jù)的處理。如果流數(shù)據(jù)不能夠被及時(shí)處理,那么其潛在的價(jià)值就會(huì)大打折扣,隨著處理時(shí)間的增長(zhǎng),流數(shù)據(jù)的潛在價(jià)值保持遞減。軟件開發(fā)者能夠根據(jù)不同的場(chǎng)景和需求在S4的上層開發(fā)處理流數(shù)據(jù)的應(yīng)用程序。
17 Spark Streaming
作為Spark的組成部分,Spark Streaming主要針對(duì)流計(jì)算任務(wù),其能夠與Spark的其他構(gòu)件很好地進(jìn)行協(xié)作。
一般來(lái)說(shuō),大數(shù)據(jù)的處理有兩類方式:批處理和流計(jì)算。
- 對(duì)于批處理,任務(wù)執(zhí)行的對(duì)象是預(yù)先保存好的數(shù)據(jù),其任務(wù)頻率可以是每小時(shí)一次,每十小時(shí)一次,也可以是每二十四小時(shí)一次。批處理的典型工具有Spark和MapReduce。
- 對(duì)于流處理,任務(wù)執(zhí)行的對(duì)象是實(shí)時(shí)到達(dá)的、源源不斷的數(shù)據(jù)流。換言之,只要有數(shù)據(jù)到達(dá),那么就一直保持處理。流處理的典型工具有Kafka和Storm。
作為Spark基礎(chǔ)應(yīng)用程序接口的擴(kuò)展,Spark Streaming能夠從眾多第三方應(yīng)用程序獲得數(shù)據(jù),例如Kafka、Flume和Kinesis等。在Spark Streaming中,數(shù)據(jù)的抽象表示是以離散化的形式組織的,即DStreams。DStreams可以用來(lái)表示連續(xù)的數(shù)據(jù)流。
在Spark Streaming的內(nèi)部,DStreams是由若干連續(xù)的彈性數(shù)據(jù)集(Resilient Distributed Dataset,RDD)構(gòu)成的,每個(gè)彈性數(shù)據(jù)集中包含的數(shù)據(jù)都是來(lái)源于確定時(shí)間間隔。Spark Streaming的數(shù)據(jù)處理模式是對(duì)確定時(shí)間間隔內(nèi)的數(shù)據(jù)進(jìn)行批處理。
由于部分中間結(jié)果需要在外存中進(jìn)行存儲(chǔ),因此傳統(tǒng)的批處理系統(tǒng)一般運(yùn)行起來(lái)較為緩慢,但是這樣的處理模式可以具有很高的容錯(cuò)性。
Spark Streaming的數(shù)據(jù)處理模式是基于彈性數(shù)據(jù)集進(jìn)行的,通常將絕大部分中間結(jié)果保存在內(nèi)存中,可以根據(jù)彈性數(shù)據(jù)集之間的互相依賴關(guān)系進(jìn)行高速運(yùn)算。這樣的處理模式也被稱為微批次處理架構(gòu),具體的特點(diǎn)是數(shù)據(jù)處理的粒度較為粗糙,針對(duì)每個(gè)選定的彈性數(shù)據(jù)集進(jìn)行處理,對(duì)于批次內(nèi)包含的數(shù)據(jù)無(wú)法實(shí)現(xiàn)進(jìn)一步的細(xì)分。
18 Lambdoop
2013年,項(xiàng)目負(fù)責(zé)人Rubén Casado在巴塞羅那的NoSQL Matters大會(huì)上發(fā)布了Lambdoop框架。Lambdoop是一個(gè)結(jié)合了實(shí)時(shí)處理和批處理的大數(shù)據(jù)應(yīng)用程序開發(fā)框架,其基于Java語(yǔ)言。
Lambdoop中可供選擇的處理范式(processing paradigm)有三種:非實(shí)時(shí)批處理、實(shí)時(shí)流處理和混合計(jì)算模型。
Lambdoop實(shí)現(xiàn)了一個(gè)基于Lambda的體系結(jié)構(gòu),該結(jié)構(gòu)為軟件開發(fā)者提供了一個(gè)抽象層(abstraction layer),使用與Lambda架構(gòu)類似的方式來(lái)開發(fā)大數(shù)據(jù)相關(guān)的應(yīng)用程序。
對(duì)于使用Lambdoop應(yīng)用程序開發(fā)框架的用戶,軟件開發(fā)者在應(yīng)用程序的開發(fā)過(guò)程中不需要處理不同技術(shù)、參數(shù)配置和數(shù)據(jù)格式等煩瑣的細(xì)節(jié)問(wèn)題,只需要使用必需的應(yīng)用程序接口。
此外,Lambdoop還提供了輔助的軟件工具,例如輸入/輸出驅(qū)動(dòng)、數(shù)據(jù)可視化接口、聚類管理工具以及大量人工智能算法的具體實(shí)現(xiàn)。大多數(shù)已有的大數(shù)據(jù)處理技術(shù)關(guān)注于海量靜態(tài)數(shù)據(jù)的管理,例如前述的Hadoop、Hive和Pig等。此外,學(xué)界和業(yè)界也對(duì)動(dòng)態(tài)數(shù)據(jù)的實(shí)時(shí)處理較為關(guān)注,典型的應(yīng)用軟件有前述的Storm和S4。
由于針對(duì)海量靜態(tài)數(shù)據(jù)的批處理能夠考慮到更多相關(guān)信息,因此相應(yīng)的處理結(jié)果具有更高的可靠性和健壯性,例如訓(xùn)練出更加精確的預(yù)測(cè)模型。遺憾的是,絕大多數(shù)批處理過(guò)程耗時(shí)較長(zhǎng),在對(duì)響應(yīng)時(shí)間要求較高的應(yīng)用領(lǐng)域,批處理是不可行的。
從理論上來(lái)說(shuō),實(shí)時(shí)處理能夠解決上述問(wèn)題,但實(shí)時(shí)處理有一個(gè)重大的缺陷:由于需要保證較小的延遲,實(shí)時(shí)處理所分析的數(shù)據(jù)量是十分有限的。在實(shí)際的生產(chǎn)環(huán)境中,通常需要實(shí)時(shí)處理和批處理兩種方式各自具有的優(yōu)點(diǎn),這對(duì)軟件開發(fā)者來(lái)說(shuō)是一個(gè)挑戰(zhàn)性的難題,同時(shí)這也是Lambdoop的設(shè)計(jì)初衷。
19 SummingBird
SummingBird是由推特于2013年開源的數(shù)據(jù)分析工具,大數(shù)據(jù)時(shí)代的數(shù)據(jù)處理分為批處理和實(shí)時(shí)處理兩大領(lǐng)域,這兩種方式各有利弊,僅采用一種處理方式無(wú)法滿足各類應(yīng)用日益多樣化的需求。
作為能夠處理大規(guī)模數(shù)據(jù)的應(yīng)用軟件,SummingBird的設(shè)計(jì)初衷是將上述兩種處理方式結(jié)合起來(lái),最大限度地獲得批處理技術(shù)提供的容錯(cuò)性和實(shí)時(shí)處理技術(shù)提供的實(shí)時(shí)性,其支持批處理模式(基于Hadoop/MapReduce)、流處理模式(基于Storm)以及混合模式。SummingBird最大的特點(diǎn)是無(wú)縫融合了批處理和流處理。
推特通過(guò)SummingBird整合批處理和流處理來(lái)降低在處理模式之間轉(zhuǎn)換帶來(lái)的開銷,提供近乎原生Scala和Java的方式來(lái)執(zhí)行MapReduce任務(wù)。
SummingBird作業(yè)流程包含兩種形式的數(shù)據(jù):流(stream)和快照(snapshot),前者記錄了數(shù)據(jù)處理的全部歷史,后者為作業(yè)系統(tǒng)在單個(gè)時(shí)間戳上的快照。
簡(jiǎn)單地說(shuō),SummingBird可以認(rèn)為是Hadoop和Storm的結(jié)合,具體包含以下構(gòu)件:
- Producer,即數(shù)據(jù)的抽象,傳遞給指定的平臺(tái)做MapReduce流編譯;
- Platform,即平臺(tái)的實(shí)例,由MapReduce庫(kù)實(shí)現(xiàn),SummingBird提供了平臺(tái)對(duì)Storm和相關(guān)內(nèi)存處理的支持;
- Source,即數(shù)據(jù)源;
- Store,即包含所有鍵值對(duì)的快照;
- Sink,即能夠生成包含Producer具體數(shù)值的非聚合流,Sink是流,不是快照;
- Service,即供用戶在Producer流中的當(dāng)前數(shù)值上執(zhí)行查找合并(lookup join)和左端合并(left join)的操作,合并的連接值可以為其他Store的快照、其他Sink的流和其他異步功能提供的快照或者流;
- Plan,由Platform生成,是MapReduce流的最終實(shí)現(xiàn)。對(duì)于Storm來(lái)說(shuō)Plan是StormTopology的實(shí)例,對(duì)于Memory來(lái)說(shuō)Plan是內(nèi)存中的stream。
關(guān)于作者:高聰,男,1985年11月生,西安電子科技大學(xué)計(jì)算機(jī)科學(xué)與技術(shù)專業(yè)學(xué)士,計(jì)算機(jī)系統(tǒng)結(jié)構(gòu)專業(yè)碩士、博士。自2015年12月至今,在西安郵電大學(xué)計(jì)算機(jī)學(xué)院任教,主要研究方向:數(shù)據(jù)感知與融合、邊緣計(jì)算和無(wú)線傳感器網(wǎng)絡(luò)。
本文摘編自《工業(yè)大數(shù)據(jù)融合體系結(jié)構(gòu)與關(guān)鍵技術(shù)》,經(jīng)出版方授權(quán)發(fā)布。