課程介紹與筆記
1.HDFS shell
1.0查看幫助
hadoop fs -help
1.1上傳
hadoop fs -put <linux上文件><hdfs上的路徑>
1.2查看文件內容
hadoop fs -cat <hdfs上的路徑>
1.3查看文件列表
hadoop fs -ls /
1.4下載文件
hadoop fs -get <hdfs上的路徑><linux上文件>
2.使用java接口操作HDFS
見eclipse工程下的demo
3.hadoop通信機制
不同進程之間的方法進行調用
4.HDFS源碼分析
FileSystem.get —> 通過反射實例化了一個DistributedFileSystem —> new DFSCilent()把他作為自己的成員變量
在DFSClient構造方法里面,調用了createNamenode,使用了RPC機制,得到了一個NameNode的代理對象,就可以和NameNode進行通信了
FileSystem —> DistributedFileSystem —> DFSClient —> NameNode的代理
1.執(zhí)行MR的命令:
hadoop jar <jar在linux的路徑><main方法所在的類的全類名><參數>
例子:
hadoop jar /root/wc1.jar cn.itcast.d3.hadoop.mr.WordCount hdfs://itcast:9000/words /out2
2.MR執(zhí)行流程
(1).客戶端提交一個mr的jar包給JobClient(提交方式:hadoop jar …)
(2).JobClient通過RPC和JobTracker進行通信,返回一個存放jar包的地址(HDFS)和jobId
(3).client將jar包寫入到HDFS當中(path = hdfs上的地址 + jobId)
(4).開始提交任務(任務的描述信息,不是jar, 包括jobid,jar存放的位置,配置信息等等)
(5).JobTracker進行初始化任務
(6).讀取HDFS上的要處理的文件,開始計算輸入分片,每一個分片對應一個MapperTask
(7).TaskTracker通過心跳機制領取任務(任務的描述信息)
(8).下載所需的jar,配置文件等
(9).TaskTracker啟動一個java child子進程,用來執(zhí)行具體的任務(MapperTask或ReducerTask)
(10).將結果寫入到HDFS當中
1.實現分區(qū)的步驟:
1.1先分析一下具體的業(yè)務邏輯,確定大概有多少個分區(qū)
1.2首先書寫一個類,它要繼承org.apache.hadoop.mapreduce.Partitioner這個類
1.3重寫public int getPartition這個方法,根據具體邏輯,讀數據庫或者配置返回相同的數字
1.4在main方法中設置Partioner的類,job.setPartitionerClass(DataPartitioner.class);
1.5設置Reducer的數量,job.setNumReduceTasks(6);
2.排序MR默認是按key2進行排序的,如果想自定義排序規(guī)則,被排序的對象要實現WritableComparable接口,在compareTo方法中實現排序規(guī)則,然后將這個對象當做k2,即可完成排序
3.combiner的作用就是在map端對輸出先做一次合并,以減少傳輸到reducer的數據量。
4.MR啟動流程
start-mapred.sh —> hadoop-daemon.sh —> hadoop —> org.apache.hadoop.mapred.JobTracker
Jobtracker調用順序:main —> startTracker —> new JobTracker 在其構造方法中首先創(chuàng)建一個調度器,接著創(chuàng)建一個RPC的server(interTrackerServer)tasktracker會通過PRC機制與其通信
然后調用offerService方法對外提供服務,在offerService方法中啟動RPC server,初始化jobtracker,調用taskScheduler的start方法 —> eagerTaskInitializationListener調用start方法,
—> 調用jobInitManagerThread的start方法,因為其是一個線程,會調用JobInitManager的run方法 —> jobInitQueue任務隊列去取第一個任務,然后把它丟入線程池中,然后調用—>InitJob的run方法
—> jobTracker的initJob方法 —> JobInProgress的initTasks —> maps = new TaskInProgress[numMapTasks]和reduces = new TaskInProgress[numReduceTasks];
TaskTracker調用順序:main —> new TaskTracker在其構造方法中調用了initialize方法,在initialize方法中調用RPC.waitForProxy得到一個jobtracker的代理對象
接著TaskTracker調用了本身的run方法,—> offerService方法 —> transmitHeartBeat返回值是(HeartbeatResponse)是jobTracker的指令,在transmitHeartBeat方法中InterTrackerProtocol調用了heartbeat將tasktracker的狀態(tài)通過RPC機制發(fā)送給jobTracker,返回值就是JobTracker的指令
heartbeatResponse.getActions()得到具體的指令,然后判斷指令的具體類型,開始執(zhí)行任務
addToTaskQueue啟動類型的指令加入到隊列當中,TaskLauncher又把任務加入到任務隊列當中,—> TaskLauncher的run方法 —> startNewTask方法 —> localizeJob下載資源 —> launchTaskForJob開始加載任務 —> launchTask —> runner.start()啟動線程; —> TaskRunner調用run方法 —> launchJvmAndWait啟動java child進程
1.上傳zk安裝包
2.解壓
3.配置(先在一臺節(jié)點上配置)
3.1添加一個zoo.cfg配置文件
$ZOOKEEPER/conf
mv zoo_sample.cfg zoo.cfg
3.2修改配置文件(zoo.cfg)
dataDir=/itcast/zookeeper-3.4.5/data
server.1=itcast05:2888:3888
server.2=itcast06:2888:3888
server.3=itcast07:2888:3888
3.3在(dataDir=/itcast/zookeeper-3.4.5/data)創(chuàng)建一個myid文件,里面內容是server.N中的N(server.2里面內容為2)
echo “1” > myid
3.4將配置好的zk拷貝到其他節(jié)點
scp -r /itcast/zookeeper-3.4.5/ itcast06:/itcast/
scp -r /itcast/zookeeper-3.4.5/ itcast07:/itcast/
3.5注意:在其他節(jié)點上一定要修改myid的內容
在itcast06應該講myid的內容改為2 (echo “6” > myid)
在itcast07應該講myid的內容改為3 (echo “7” > myid)
4.啟動集群
分別啟動zk
./zkServer.sh start