關于Hadoop平臺,網上有很多的資料,但是比較零碎,為了方便大家對這個平臺有著充分的了解,筆者在此系統的介紹一下這個平臺。
1、什么是Hadoop?
(1)Hadoop是一個開源的框架,可編寫和運行分布式應用處理大規模數據,是專為離線和大規模數據分析而設計的,并不適合那種對幾個記錄隨機讀寫的在線事務處理模式。Hadoop=HDFS(文件系統,數據存儲技術相關)+ Mapreduce(數據處理),Hadoop的數據來源可以是任何形式,在處理半結構化和非結構化數據上與關系型數據庫相比有更好的性能,具有更靈活的處理能力,不管任何數據形式最終會轉化為key/value,key/value是基本數據單元。用函數式變成Mapreduce代替SQL,SQL是查詢語句,而Mapreduce則是使用腳本和代碼,而對于適用于關系型數據庫,習慣SQL的Hadoop有開源工具hive代替。
(2)Hadoop就是一個分布式計算的解決方案.
hadoop能做什么?
hadoop擅長日志分析,facebook就用Hive來進行日志分析,2009年時facebook就有非編程人員的30%的人使用HiveQL進行數據分析;淘寶搜索中的自定義篩選也使用的Hive;利用Pig還可以做高級的數據處理,包括Twitter、LinkedIn上用于發現您可能認識的人,可以實現類似Amazon.com的協同過濾的推薦效果。淘寶的商品推薦也是!在Yahoo!的40%的Hadoop作業是用pig運行的,包括垃圾郵件的識別和過濾,還有用戶特征建模。(2012年8月25新更新,天貓的推薦系統是hive,少量嘗試mahout!)
下面舉例說明:
設想一下這樣的應用場景. 我有一個100M 的數據庫備份的sql 文件.我現在想在不導入到數據庫的情況下直接用grep操作通過正則過濾出我想要的內容。例如:某個表中 含有相同關鍵字的記錄那么有幾種方式,一種是直接用linux的命令 grep 還有一種就是通過編程來讀取文件,然后對每行數據進行正則匹配得到結果好了 現在是100M 的數據庫備份.上述兩種方法都可以輕松應對.
那么如果是1G , 1T 甚至 1PB 的數據呢 ,上面2種方法還能行得通嗎? 答案是不能.畢竟單臺服務器的性能總有其上限.那么對于這種 超大數據文件怎么得到我們想要的結果呢?
有種方法 就是分布式計算, 分布式計算的核心就在于 利用分布式算法把運行在單臺機器上的程序擴展到多臺機器上并行運行.從而使數據處理能力成倍增加.但是這種分布式計算一般對編程人員要求很高,而且對服務器也有要求.導致了成本變得非常高.
Haddop 就是為了解決這個問題誕生的.Haddop 可以很輕易的把 很多linux的廉價pc 組成 分布式結點,然后編程人員也不需要知道分布式算法之類,只需要根據mapreduce的規則定義好接口方法,剩下的就交給Haddop. 它會自動把相關的計算分布到各個結點上去,然后得出結果.
例如上述的例子 : Hadoop 要做的事 首先把 1PB的數據文件導入到 HDFS中, 然后編程人員定義好 map和reduce, 也就是把文件的行定義為key,每行的內容定義為value , 然后進行正則匹配,匹配成功則把結果 通過reduce聚合起來返回.Hadoop 就會把這個程序分布到N 個結點去并行的操作.
那么原本可能需要計算好幾天,在有了足夠多的結點之后就可以把時間縮小到幾小時之內.
這也就是所謂的 大數據云計算了.如果還是不懂的話再舉個簡單的例子
比如1億個1 相加 得出計算結果, 我們很輕易知道結果是 1億.但是計算機不知道.那么單臺計算機處理的方式做一個一億次的循環每次結果+1
那么分布式的處理方式則變成 我用 1萬臺 計算機,每個計算機只需要計算 1萬個 1 相加 然后再有一臺計算機把 1萬臺計算機得到的結果再相加
從而得到最后的結果.
理論上講, 計算速度就提高了 1萬倍. 當然上面可能是一個不恰當的例子.但所謂分布式,大數據,云計算 大抵也就是這么回事了。
2、基本工作原理
Hadoop核心
Hadoop的核心就是HDFS和MapReduce,而兩者只是理論基礎,不是具體可使用的高級應用,Hadoop旗下有很多經典子項目,比如HBase、Hive等,這些都是基于HDFS和MapReduce發展出來的。要想了解Hadoop,就必須知道HDFS和MapReduce是什么。
HDFS
HDFS(Hadoop Distributed File System,Hadoop分布式文件系統),它是一個高度容錯性的系統,適合部署在廉價的機器上。HDFS能提供高吞吐量的數據訪問,適合那些有著超大數據集(large data set)的應用程序。
HDFS的設計特點是:
1、大數據文件,非常適合上T級別的大文件或者一堆大數據文件的存儲,如果文件只有幾個G甚至更小就沒啥意思了。
2、文件分塊存儲,HDFS會將一個完整的大文件平均分塊存儲到不同計算器上,它的意義在于讀取文件時可以同時從多個主機取不同區塊的文件,多主機讀取比單主機讀取效率要高得多得都。
3、流式數據訪問,一次寫入多次讀寫,這種模式跟傳統文件不同,它不支持動態改變文件內容,而是要求讓文件一次寫入就不做變化,要變化也只能在文件末添加內容。
4、廉價硬件,HDFS可以應用在普通PC機上,這種機制能夠讓給一些公司用幾十臺廉價的計算機就可以撐起一個大數據集群。
5、硬件故障,HDFS認為所有計算機都可能會出問題,為了防止某個主機失效讀取不到該主機的塊文件,它將同一個文件塊副本分配到其它某幾個主機上,如果其中一臺主機失效,可以迅速找另一塊副本取文件。
HDFS的關鍵元素:
Block:將一個文件進行分塊,通常是64M。
NameNode:保存整個文件系統的目錄信息、文件信息及分塊信息,這是由唯一一臺主機專門保存,當然這臺主機如果出錯,NameNode就失效了。在Hadoop2.*開始支持activity-standy模式----如果主NameNode失效,啟動備用主機運行NameNode。
DataNode:分布在廉價的計算機上,用于存儲Block塊文件。
MapReduce
通俗說MapReduce是一套從海量·源數據提取分析元素最后返回結果集的編程模型,將文件分布式存儲到硬盤是第一步,而從海量數據中提取分析我們需要的內容就是MapReduce做的事了。
下面以一個計算海量數據最大值為例:一個銀行有上億儲戶,銀行希望找到存儲金額最高的金額是多少,按照傳統的計算方式,我們會這樣:
JAVA代碼
Long moneys[] ...
Long max = 0L;
for(int i=0;i<moneys.length;i++){
if(moneys[i]>max){
max = moneys[i];
}
}
如果計算的數組長度少的話,這樣實現是不會有問題的,還是面對海量數據的時候就會有問題。
MapReduce會這樣做:首先數字是分布存儲在不同塊中的,以某幾個塊為一個Map,計算出Map中最大的值,然后將每個Map中的最大值做Reduce操作,Reduce再取最大值給用戶。
MapReduce的基本原理就是:將大的數據分析分成小塊逐個分析,最后再將提取出來的數據匯總分析,最終獲得我們想要的內容。當然怎么分塊分析,怎么做Reduce操作非常復雜,Hadoop已經提供了數據分析的實現,我們只需要編寫簡單的需求命令即可達成我們想要的數據。
總結
總的來說Hadoop適合應用于大數據存儲和大數據分析的應用,適合于服務器幾千臺到幾萬臺的集群運行,支持PB級的存儲容量。
Hadoop典型應用有:搜索、日志處理、推薦系統、數據分析、視頻圖像分析、數據保存等。
但要知道,Hadoop的使用范圍遠小于SQL或Python之類的腳本語言,所以不要盲目使用Hadoop,看完這篇試讀文章,我知道Hadoop不適用于我們的項目。不過Hadoop作為大數據的熱門詞,我覺得一個狂熱的編程愛好者值得去學習了解,或許你下一個歸宿就需要Hadoop人才,不是嗎。
3、下載&安裝
Hadoop基于Java的,因此,必須先裝JDK,如何安裝,自行查找教程。
1)下載
http://hadoop.Apache.org/ hadoop官網
點擊download找到最新版軟件進行下載:https://www.apache.org/dyn/closer.cgi/hadoop/common/hadoop-3.1.2/hadoop-3.1.2.tar.gz
上面的連接可能版本不全,最全的版本在這https://mirrors.tuna.tsinghua.edu.cn/apache/hadoop/common/,清華大學的下載源。
選擇鏡像站點進行下載,大概300多M這樣。
解壓到某一路徑,不含中文以及特殊符號:如 D:1_Program_File,解壓過程可能報某些錯:
無視之。
三、配置環境變量
添加HADOOP_HOME配置:自己安裝hadoop路徑,我的是D:hadoop-3.0.3
在Path中添加如下:自己安裝hadoop路徑/bin,如:D:/hadoop-3.0.3/bin
四、hadoop需要jdk支持,jdk路徑不能有空格,如有空格,可以這樣,如:”D:Program Files"Javajdk1.8.0_25
五、hadoop路徑下創建data用于數據存儲,再在data下創建datanode目錄和namenode目錄
六、hadoop配置
四個hadoop路徑/etc/hadoop/core-site.xml,etc/hadoop/mapred-site.xml,etc/hadoop/hdfs-site.xml,etc/hadoop/yarn-site.xml
1.core-site.xml
<configuration> <property> <name>fs.default.name</name> <value>hdfs://localhost:9000</value> </property> </configuration> myeclipse上配置hadoop時,localhost需寫成自己的IP
2.mapred-site.xml
<configuration> <property> <name>mapreduce.framework.name</name> <value>yarn</value> </property> </configuration>
3.hdfs-site.xml(先創建路徑data/snn、data/namenode、data/datanode)
<configuration> <!-- 這個參數設置為1,因為是單機版hadoop --> <property> <name>dfs.replication</name> <value>1</value> </property> <property> <name>dfs.permissions</name> <value>false</value> </property> <property> <name>dfs.namenode.name.dir</name> <value>/D:/1_Program_File/hadoop-3.0.2/data/namenode</value> </property> <property> <name>fs.checkpoint.dir</name> <value>/D:/1_Program_File/hadoop-3.0.2/data/snn</value> </property> <property> <name>fs.checkpoint.edits.dir</name> <value>/D:/1_Program_File/hadoop-3.0.2/data/snn</value> </property> <property> <name>dfs.datanode.data.dir</name> <value>/D:/1_Program_File/hadoop-3.0.2/data/datanode</value> </property> </configuration>
4.yarn-site.xml
<configuration> <!-- Site specific YARN configuration properties --> <property> <name>yarn.nodemanager.aux-services</name> <value>mapreduce_shuffle</value> </property> <property> <name>yarn.nodemanager.auxservices.mapreduce.shuffle.class</name> <value>org.apache.hadoop.mapred.ShuffleHandler</value> </property> </configuration>
七、修改D:/hadoop-3.0.3/etc/hadoop/hadoop-env.cmd配置,找到"set JAVA_HOME=%JAVA_HOME%"替換為"set JAVA_HOME="D:Program Files"Javajdk1.8.0_25"
八、winutils中對應的hadoop版本中的bin替換自己hadoop安裝目錄下的bin
找到對應的版本下的bin替換hadoop中的bin,配置完成!
九、啟動服務
1.cmd中,D:hadoop-3.0.3bin> hdfs namenode -format
執行后,data下的namenode和datanode下會有current等文件,我當時安裝的是hadoop3.1.1,用的winutils中的hadoop3.0.0,datanode總是沒有啟動沒有數據,換成hadoop3.0.3,使用wintuils的hadoop3.0.0后,就可以了。
2.D:hadoop-3.0.3sbin啟動start-all.cmd服務,會看到
Hadoop Namenode
Hadoop datanode
YARN Resourc Manager
如果報錯:
2019-08-03 11:54:23,239 ERROR namenode.NameNode: Failed to start namenode. java.lang.UnsatisfiedLinkError: org.apache.hadoop.io.nativeio.NativeIO$windows.a ccess0(Ljava/lang/String;I)Z at org.apache.hadoop.io.nativeio.NativeIO$Windows.access0(Native Method) at org.apache.hadoop.io.nativeio.NativeIO$Windows.access(NativeIO.java:6 06) at org.apache.hadoop.fs.FileUtil.canWrite(FileUtil.java:971) at org.apache.hadoop.hdfs.server.common.Storage$StorageDirectory.analyze Storage(Storage.java:614) at org.apache.hadoop.hdfs.server.common.Storage$StorageDirectory.analyze Storage(Storage.java:574)
1、系統環境變量配置HADOOP_HOME ,并且添加進path 變量里;
2、HADOOP_HOMEbin 里是否有hadoop.dll 和 winutils.exe 這兩個文件
3、C: windowsSystem32 里是否有hadoop.dll 文件 ,記得重啟電腦噢!!!
如果出現其他錯誤比如version %1等,請確認winutils的是32位還是64位版本,是否與你電腦匹配。
十、HDFS應用
1、通過http://127.0.0.1:8088/即可查看集群所有節點狀態:
2、訪問http://localhost:9870/即可查看文件管理頁面:
a.進入文件系統
b.創建目錄
c.上傳成功
注:在之前的版本中文件管理的端口是50070,在3.0.0中替換為了9870端口
d.使用hadoop命令進行文件操作
mkdir命令創建目錄:hadoop fs -mkdir hdfs://ip:9000/user
put命令上傳文件:hadoop fs -put D:/a.txt hdfs://ip:9000/user/
ls命令查看指定目錄文件列表:hadoop fs -ls hdfs://ip:9000/user/
4、第一個程序
值得注意的是,配置的時候,需要給Hadoop權限才能正確執行。最簡單的辦法就是講hadoop以及其目錄下所有文件都歸在一個組中。
chown -R hadoop:hadoop hadoop文件夾
就可以了。
配置完成之后,我們我們還需要什么?
1.需要在HDFS中保存有文件。
2.需要一個程序jar包,我們前面說過,JobTracker接收jar包就會分解job為mapTask和reduceTask。mapTask會讀取HDFS中的文件來執行。
我們來看目標。
我們輸入兩個文件,file1和file2。交給hadoop執行之后,會返回file1和file2文件中的單詞的計數。
我們說過,hadoop返回的是<key,value>的鍵值對的形式。
所以結果如下:也就是把單詞以及單詞的個數返回
school 1
hello 3
world 2
...
所以我們首先創建兩個文件:
file1和file2。
隨便填點東西在里面,文件中的內容是用來計數。單詞之間用空格分隔,當然這是不一定的,如何區分單詞是在后面jar包中的map程序中分辨的。
我們寫好了這兩個文件之后,要將文件提交到HDFS中。如何提交呢?
提交之前,首先要確保hadoop已經運行起來了,查看jps可以看到hadoop的進程。
首先我們在hadoop的HDFS中創建一個文件夾。打開cmd,輸入
hdfs dfs -mkdir /test
這樣就可以在HDFS根目錄下創建一個input_wordcount的文件夾。
其實Hadoop的HDFS命令行非常接近Shell,只需要使用hdfs dfs -后面寫上shell命令就可以對應執操作HDFS文件系統了。例如:
hdfs dfs -ls /
查看根目錄下的文件。
創建文件夾之后,我們就可以提交我們寫的兩個file文件。
hdfs dfs -put input/* /test
如果報錯:
org.apache.hadoop.ipc.RemoteException(java.io.IOException): File /input/file1.txt._COPYING_ could only be written to 0 of the 1 minReplication nodes. There a 0 datanode(s) running and no node(s) are excluded in this operation. at org.apache.hadoop.hdfs.server.blockmanagement.BlockManager.chooseTarget4NewBlock(BlockManager.java:2099) at org.apache.hadoop.hdfs.server.namenode.FSDirWriteFileOp.chooseTargetForNewBlock(FSDirWriteFileOp.java:287) at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getAdditionalBlock(FSNamesystem.java:2658) at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.addBlock(NameNodeRpcServer.java:866) at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.addBlock(ClientNamenodeProtocolServerSideTranslatorPB.java:550) at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java) at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:523)
使用命令查看報告:hadoop dfsadmin -report
發現磁盤都是空的,這個問題一般是由于使用hadoop namenode -format 格式化多次,導致spaceID不一致造成的,這語句可不能隨便執行的,解決方法如下[1]:
stop-all.cmd hdfs namenode -format start-all.cmd
發現還是無法解決,然后看到日志窗口有如下異常:
java.io.IOException: Incompatible clusterIDs in D:tmphadoop-Administratordfs data: namenode clusterID = CID-636ec898-037d-4196-b096-3f53e7d172fb; datanode cl usterID = CID-9031b022-9e2e-4e46-9b46-855159d45f53 at org.apache.hadoop.hdfs.server.datanode.DataStorage.doTransition(DataS torage.java:719) at org.apache.hadoop.hdfs.server.datanode.DataStorage.loadStorageDirecto ry(DataStorage.java:284) at org.apache.hadoop.hdfs.server.datanode.DataStorage.loadDataStorage(Da taStorage.java:397) at org.apache.hadoop.hdfs.server.datanode.DataStorage.addStorageLocation
于是刪掉 D:tmphadoop-Administrator下面的文件,重新執行方法[1]。問題還是沒解決,查看data目錄下面,竟然沒有datanode文件夾!最后發現etc下面的文件沒有按照上述流程配置好,將沒配置好的xml文件按照上述流程配置好之后,清空data目錄以及D:tmphadoop-Administrator,然后重新執行方法[1],問題解決:
這里我兩個file文件都放在test目錄下,所以直接使用正則表達式都提交上去即可,提交到根目錄文件夾下。然后我們查看根目錄,查看是否提交完成。
D:1_Program_Filehadoop-3.0.2sbin>hdfs dfs -ls / The filename, directory name, or volume label syntax is incorrect. Found 3 items -rw-r--r-- 1 Administrator supergroup 23 2019-08-03 13:08 /file1.txt -rw-r--r-- 1 Administrator supergroup 28 2019-08-03 13:08 /file2.txt drwxr-xr-x - Administrator supergroup 0 2019-08-03 13:10 /test
提交成功了。第一個要求完成了,接下來我們就需要一個程序jar包。
打開IDE或者myeclipse工具。創建一個java程序,我在這里創建一個maven項目。
首先我們需要導入依賴包:
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>com.hadoop.demo</groupId> <artifactId>HadoopDemo</artifactId> <version>0.0.1-SNAPSHOT</version> <packaging>war</packaging> <dependencies> <!-- https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-common --> <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-common</artifactId> <version>2.6.0</version> </dependency> <!-- https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-mapreduce-client-core --> <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-mapreduce-client-core</artifactId> <version>2.6.0</version> </dependency> </dependencies> <!-- fastjson --> </project>
然后我們創建一個WordCount類。
在這個類里,首先我們要創建一個Map方法,需要繼承MApper類:
public static class WordCountMap extends Mapper<LongWritable, Text, Text, IntWritable> { private final IntWritable one = new IntWritable(1); private Text word = new Text(); public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { String line = value.toString(); StringTokenizer token = new StringTokenizer(line); while (token.hasMoreTokens()) { word.set(token.nextToken()); context.write(word, one); } } }
Mapper<LongWritable, Text, Text, IntWritable>是什么意思呢?
前面兩個類參數是輸入,后面兩個是輸出。
也就是WordCOuntMap方法接收LongWritable,Text的參數,返回<Text, IntWriatable>鍵值對。
需要重寫map方法,可以看到Context對象即為返回結果,內部其實是<Text, IntWriatable>鍵值對。
這里需要注意的是,value的值,value默認是一行數據,你文件中有多少行,map函數就會被調用多少次。
這我們就看懂了吧,首先拿到一行的數據,使用StringTokenizer根據空格分割字符串,得到token。遍歷token并寫入context中返回即可。
然后我們需要編寫reduce方法:同樣的,reduce方法繼承reduce類。
public static class WordCountReduce extends Reducer<Text, IntWritable, Text, IntWritable> { public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException { int sum = 0; for (IntWritable val : values) { sum += val.get(); } context.write(key, new IntWritable(sum)); } }
wordCountReduce方法接收<Text, IntWritable>鍵值對,將鍵值對組合起來,結果寫入另外一個鍵值對中,返回即可。
其中最重要是重寫reduce方法,同樣的context也是返回的結果。
這里需要注意的是,reduce方法是什么時候調用的呢?是在所有mapTask都被執行完成之后,reduceTask啟動了才調用。
所有reduce方法中接收到的是所有map返回的參數。所以我們簡單的求和寫入context中就可以了。
最后我們編寫main方法作為入口,調用兩個函數。
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
Job job = new Job(conf);
job.setJarByClass(WordCount.class);
job.setJobName("wordcount");
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
job.setMapperClass(WordCountMap.class);
job.setReducerClass(WordCountReduce.class);
job.setInputFormatClass(TextInputFormat.class);
job.setOutputFormatClass(TextOutputFormat.class);
FileInputFormat.addInputPath(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
job.waitForCompletion(true);
}
這里我們主要是告訴JobTracker,告訴他去調用什么就可以了。
類都編寫好了之后`,我們需要的是jar包,所以我們將程序打包為jar包。
拿到jar包之后,我們需要將jar包作為作業提交給Hadoop執行。怎么做呢?
hadoop jar WordCount.jar WordCount input_wordcount output_wordcount
hadoop jar WordCount.jar WordCount這里提交jar包,并且告訴主類在哪。
后面兩個都是我們自定義的參數了。會在main中獲取到,即輸入參數為input_wordcount。輸出參數為output_wordcount
執行完成之后可以看到。
hdfs dfs -ls
Found 2 items
drwxr-xr-x - haoye supergroup 0 2017-05-06 20:34 input_wordcount
drwxr-xr-x - haoye supergroup 0 2017-05-06 20:40 output_wordcount
hdfs dfs -ls output_wordcount
Found 2 items
-rw-r--r-- 3 haoye supergroup 0 2017-05-06 20:40 output_wordcount/_SUCCESS
-rw-r--r-- 3 haoye supergroup 83 2017-05-06 20:40 output_wordcount/part-r-00000
其中part-r-00000為結果文件。
我們可以查看它的內容
hdfs dfs -cat output_wordcount/part-r-00000
api 1
file 3
free 2
hadoop 7
hello 3
home 1
java 2
new 2
school 1
system 1
world 2
得到結果了吧。
對于hadoop來說,執行任務需要操作HDFS,需要job對應的jar包。而jar包中需要編寫mapTask和ReduceTask對應的方法。交給jobTracker執行就可以了。十分的方便。