
- Hadoop 教程
- Hadoop - 首頁
- Hadoop - 大資料概覽
- Hadoop - 大資料解決方案
- Hadoop - 簡介
- Hadoop - 環境搭建
- Hadoop - HDFS 概覽
- Hadoop - HDFS 操作
- Hadoop - 命令參考
- Hadoop - MapReduce
- Hadoop - 流式處理
- Hadoop - 多節點叢集
- Hadoop 有用資源
- Hadoop - 問答
- Hadoop - 快速指南
- Hadoop - 有用資源
Hadoop - MapReduce
MapReduce是一個框架,使用它可以編寫應用程式,以可靠的方式在大型商用硬體叢集上並行處理海量資料。
什麼是 MapReduce?
MapReduce 是一種基於 Java 的分散式計算處理技術和程式模型。MapReduce 演算法包含兩個重要的任務:Map 和 Reduce。Map 接收一組資料並將其轉換為另一組資料,其中單個元素被分解成元組(鍵/值對)。其次是 Reduce 任務,它接收 Map 的輸出作為輸入,並將這些資料元組組合成更小的一組元組。正如 MapReduce 名稱的順序所暗示的那樣,Reduce 任務總是在 Map 作業之後執行。
MapReduce 的主要優勢在於它易於將資料處理擴充套件到多個計算節點。在 MapReduce 模型下,資料處理原語稱為 Mapper 和 Reducer。將資料處理應用程式分解為Mapper和Reducer有時並非易事。但是,一旦我們以 MapReduce 的形式編寫了一個應用程式,將應用程式擴充套件到在叢集中的數百、數千甚至數萬臺機器上執行僅僅是配置更改的問題。這種簡單的可擴充套件性吸引了許多程式設計師使用 MapReduce 模型。
演算法
通常,MapReduce 正規化基於將計算傳送到資料所在的位置!
MapReduce 程式分三個階段執行:Map 階段、Shuffle 階段和 Reduce 階段。
Map 階段 - Map 或 Mapper 的工作是處理輸入資料。通常,輸入資料以檔案或目錄的形式存在,並存儲在 Hadoop 檔案系統 (HDFS) 中。輸入檔案逐行傳遞給 Mapper 函式。Mapper 處理資料並建立多個小的資料塊。
Reduce 階段 - 此階段是Shuffle階段和Reduce階段的組合。Reducer 的工作是處理來自 Mapper 的資料。處理後,它會生成一組新的輸出,並將儲存在 HDFS 中。
在 MapReduce 作業期間,Hadoop 將 Map 和 Reduce 任務傳送到叢集中的相應伺服器。
框架管理所有資料傳遞細節,例如發出任務、驗證任務完成以及在節點之間複製資料。
大部分計算發生在具有本地磁碟資料的節點上,這減少了網路流量。
給定任務完成後,叢集收集並減少資料以形成適當的結果,並將其傳送回 Hadoop 伺服器。

輸入和輸出(Java 視角)
MapReduce 框架操作的是<鍵,值>對,也就是說,框架將作業的輸入視為一組<鍵,值>對,並生成一組<鍵,值>對作為作業的輸出,這些對可能是不同型別的。
鍵和值類應以序列化方式由框架處理,因此需要實現 Writable 介面。此外,鍵類必須實現 Writable-Comparable 介面,以方便框架進行排序。MapReduce 作業的輸入和輸出型別 - (輸入) <k1, v1> → map → <k2, v2> → reduce → <k3, v3>(輸出)。
輸入 | 輸出 | |
---|---|---|
Map | <k1, v1> | 列表 (<k2, v2>) |
Reduce | <k2, 列表(v2)> | 列表 (<k3, v3>) |
術語
有效載荷 - 應用程式實現 Map 和 Reduce 函式,並構成作業的核心。
Mapper - Mapper 將輸入鍵/值對對映到一組中間鍵/值對。
NameNode - 管理 Hadoop 分散式檔案系統 (HDFS) 的節點。
DataNode - 在進行任何處理之前,資料預先存在於其中的節點。
MasterNode - JobTracker 執行的節點,接受來自客戶端的作業請求。
SlaveNode - Map 和 Reduce 程式執行的節點。
JobTracker - 排程作業並將分配的作業跟蹤到 Task Tracker。
Task Tracker - 跟蹤任務並將狀態報告給 JobTracker。
作業 - 程式是在資料集上執行 Mapper 和 Reducer 的過程。
任務 - 在資料片上執行 Mapper 或 Reducer 的過程。
任務嘗試 - 在 SlaveNode 上執行任務的特定嘗試例項。
示例場景
以下是關於某個組織電力消耗的資料。它包含各個年份的月度電力消耗和年度平均值。
一月 | 二月 | 三月 | 四月 | 五月 | 六月 | 七月 | 八月 | 九月 | 十月 | 十一月 | 十二月 | 平均值 | |
---|---|---|---|---|---|---|---|---|---|---|---|---|---|
1979 | 23 | 23 | 2 | 43 | 24 | 25 | 26 | 26 | 26 | 26 | 25 | 26 | 25 |
1980 | 26 | 27 | 28 | 28 | 28 | 30 | 31 | 31 | 31 | 30 | 30 | 30 | 29 |
1981 | 31 | 32 | 32 | 32 | 33 | 34 | 35 | 36 | 36 | 34 | 34 | 34 | 34 |
1984 | 39 | 38 | 39 | 39 | 39 | 41 | 42 | 43 | 40 | 39 | 38 | 38 | 40 |
1985 | 38 | 39 | 39 | 39 | 39 | 41 | 41 | 41 | 00 | 40 | 39 | 39 | 45 |
如果上述資料作為輸入給出,我們必須編寫應用程式來處理它並生成結果,例如查詢使用量最大的年份、使用量最小的年份等等。對於具有有限記錄數量的程式設計師來說,這是一個簡單的任務。他們只需編寫邏輯來生成所需的輸出,並將資料傳遞給編寫的應用程式。
但是,想想代表某個州自成立以來所有大型工業的電力消耗的資料。
當我們編寫應用程式來處理此類海量資料時,
它們將花費大量時間來執行。
當我們將資料從源移動到網路伺服器等等時,將會有大量的網路流量。
為了解決這些問題,我們有 MapReduce 框架。
輸入資料
上述資料儲存為sample.txt並作為輸入給出。輸入檔案如下所示。
1979 23 23 2 43 24 25 26 26 26 26 25 26 25 1980 26 27 28 28 28 30 31 31 31 30 30 30 29 1981 31 32 32 32 33 34 35 36 36 34 34 34 34 1984 39 38 39 39 39 41 42 43 40 39 38 38 40 1985 38 39 39 39 39 41 41 41 00 40 39 39 45
示例程式
以下是使用 MapReduce 框架對示例資料進行處理的程式。
package hadoop; import java.util.*; import java.io.IOException; import java.io.IOException; import org.apache.hadoop.fs.Path; import org.apache.hadoop.conf.*; import org.apache.hadoop.io.*; import org.apache.hadoop.mapred.*; import org.apache.hadoop.util.*; public class ProcessUnits { //Mapper class public static class E_EMapper extends MapReduceBase implements Mapper<LongWritable ,/*Input key Type */ Text, /*Input value Type*/ Text, /*Output key Type*/ IntWritable> /*Output value Type*/ { //Map function public void map(LongWritable key, Text value, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException { String line = value.toString(); String lasttoken = null; StringTokenizer s = new StringTokenizer(line,"\t"); String year = s.nextToken(); while(s.hasMoreTokens()) { lasttoken = s.nextToken(); } int avgprice = Integer.parseInt(lasttoken); output.collect(new Text(year), new IntWritable(avgprice)); } } //Reducer class public static class E_EReduce extends MapReduceBase implements Reducer< Text, IntWritable, Text, IntWritable > { //Reduce function public void reduce( Text key, Iterator <IntWritable> values, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException { int maxavg = 30; int val = Integer.MIN_VALUE; while (values.hasNext()) { if((val = values.next().get())>maxavg) { output.collect(key, new IntWritable(val)); } } } } //Main function public static void main(String args[])throws Exception { JobConf conf = new JobConf(ProcessUnits.class); conf.setJobName("max_eletricityunits"); conf.setOutputKeyClass(Text.class); conf.setOutputValueClass(IntWritable.class); conf.setMapperClass(E_EMapper.class); conf.setCombinerClass(E_EReduce.class); conf.setReducerClass(E_EReduce.class); conf.setInputFormat(TextInputFormat.class); conf.setOutputFormat(TextOutputFormat.class); FileInputFormat.setInputPaths(conf, new Path(args[0])); FileOutputFormat.setOutputPath(conf, new Path(args[1])); JobClient.runJob(conf); } }
將上述程式儲存為ProcessUnits.java。程式的編譯和執行解釋如下。
Process Units 程式的編譯和執行
讓我們假設我們在 Hadoop 使用者的主目錄中(例如 /home/hadoop)。
按照以下步驟編譯和執行上述程式。
步驟 1
以下命令用於建立目錄以儲存編譯後的 Java 類。
$ mkdir units
步驟 2
下載Hadoop-core-1.2.1.jar,它用於編譯和執行 MapReduce 程式。訪問以下連結mvnrepository.com下載 jar 包。讓我們假設下載的資料夾是/home/hadoop/。
步驟 3
以下命令用於編譯ProcessUnits.java程式併為程式建立一個 jar 包。
$ javac -classpath hadoop-core-1.2.1.jar -d units ProcessUnits.java $ jar -cvf units.jar -C units/ .
步驟 4
以下命令用於在 HDFS 中建立一個輸入目錄。
$HADOOP_HOME/bin/hadoop fs -mkdir input_dir
步驟 5
以下命令用於將名為sample.txt的輸入檔案複製到 HDFS 的輸入目錄中。
$HADOOP_HOME/bin/hadoop fs -put /home/hadoop/sample.txt input_dir
步驟 6
以下命令用於驗證輸入目錄中的檔案。
$HADOOP_HOME/bin/hadoop fs -ls input_dir/
步驟 7
以下命令用於透過從輸入目錄獲取輸入檔案來執行 Eleunit_max 應用程式。
$HADOOP_HOME/bin/hadoop jar units.jar hadoop.ProcessUnits input_dir output_dir
等待一段時間直到檔案執行完畢。執行後,如下所示,輸出將包含輸入拆分的數量、Map 任務的數量、Reducer 任務的數量等。
INFO mapreduce.Job: Job job_1414748220717_0002 completed successfully 14/10/31 06:02:52 INFO mapreduce.Job: Counters: 49 File System Counters FILE: Number of bytes read = 61 FILE: Number of bytes written = 279400 FILE: Number of read operations = 0 FILE: Number of large read operations = 0 FILE: Number of write operations = 0 HDFS: Number of bytes read = 546 HDFS: Number of bytes written = 40 HDFS: Number of read operations = 9 HDFS: Number of large read operations = 0 HDFS: Number of write operations = 2 Job Counters Launched map tasks = 2 Launched reduce tasks = 1 Data-local map tasks = 2 Total time spent by all maps in occupied slots (ms) = 146137 Total time spent by all reduces in occupied slots (ms) = 441 Total time spent by all map tasks (ms) = 14613 Total time spent by all reduce tasks (ms) = 44120 Total vcore-seconds taken by all map tasks = 146137 Total vcore-seconds taken by all reduce tasks = 44120 Total megabyte-seconds taken by all map tasks = 149644288 Total megabyte-seconds taken by all reduce tasks = 45178880 Map-Reduce Framework Map input records = 5 Map output records = 5 Map output bytes = 45 Map output materialized bytes = 67 Input split bytes = 208 Combine input records = 5 Combine output records = 5 Reduce input groups = 5 Reduce shuffle bytes = 6 Reduce input records = 5 Reduce output records = 5 Spilled Records = 10 Shuffled Maps = 2 Failed Shuffles = 0 Merged Map outputs = 2 GC time elapsed (ms) = 948 CPU time spent (ms) = 5160 Physical memory (bytes) snapshot = 47749120 Virtual memory (bytes) snapshot = 2899349504 Total committed heap usage (bytes) = 277684224 File Output Format Counters Bytes Written = 40
步驟 8
以下命令用於驗證輸出資料夾中的結果檔案。
$HADOOP_HOME/bin/hadoop fs -ls output_dir/
步驟 9
以下命令用於檢視Part-00000檔案中的輸出。此檔案由 HDFS 生成。
$HADOOP_HOME/bin/hadoop fs -cat output_dir/part-00000
以下是 MapReduce 程式生成的輸出。
1981 34 1984 40 1985 45
步驟 10
以下命令用於將輸出資料夾從 HDFS 複製到本地檔案系統進行分析。
$HADOOP_HOME/bin/hadoop fs -cat output_dir/part-00000/bin/hadoop dfs get output_dir /home/hadoop
重要命令
所有 Hadoop 命令都由$HADOOP_HOME/bin/hadoop命令呼叫。執行沒有任何引數的 Hadoop 指令碼將列印所有命令的描述。
用法 - hadoop [--config confdir] COMMAND
下表列出了可用的選項及其描述。
序號 | 選項和描述 |
---|---|
1 | namenode -format 格式化 DFS 檔案系統。 |
2 | secondarynamenode 執行 DFS 次要 NameNode。 |
3 | namenode 執行 DFS NameNode。 |
4 | datanode 執行 DFS DataNode。 |
5 | dfsadmin 執行 DFS 管理員客戶端。 |
6 | mradmin 執行 Map-Reduce 管理員客戶端。 |
7 | fsck 執行 DFS 檔案系統檢查實用程式。 |
8 | fs 執行通用檔案系統使用者客戶端。 |
9 | balancer 執行叢集平衡實用程式。 |
10 | oiv 將離線 fsimage 檢視器應用於 fsimage。 |
11 | fetchdt 從 NameNode 獲取委託令牌。 |
12 | jobtracker 執行 MapReduce 作業跟蹤器節點。 |
13 | pipes 執行 Pipes 作業。 |
14 | tasktracker 執行 MapReduce 任務跟蹤器節點。 |
15 | historyserver 作為獨立守護程序執行作業歷史伺服器。 |
16 | job 操作 MapReduce 作業。 |
17 | queue 獲取有關 JobQueue 的資訊。 |
18 | version 列印版本。 |
19 | jar <jar> 執行 jar 檔案。 |
20 | distcp <srcurl> <desturl> 遞迴複製檔案或目錄。 |
21 | distcp2 <srcurl> <desturl> DistCp 版本 2。 |
22 | archive -archiveName NAME -p <父路徑> <源>* <目標> 建立 Hadoop 歸檔。 |
23 | classpath 列印獲取 Hadoop jar 和所需庫所需的類路徑。 |
24 | daemonlog 獲取/設定每個守護程式的日誌級別 |
如何與 MapReduce 作業互動
用法 - hadoop job [GENERIC_OPTIONS]
以下是 Hadoop 作業中可用的通用選項。
序號 | GENERIC_OPTION 和描述 |
---|---|
1 | -submit <job-file> 提交作業。 |
2 | -status <job-id> 列印 Map 和 Reduce 完成百分比以及所有作業計數器。 |
3 | -counter <job-id> <group-name> <countername> 列印計數器值。 |
4 | -kill <job-id> 殺死作業。 |
5 | -events <job-id> <fromevent-#> <#-of-events> 列印作業跟蹤器接收的給定範圍內的事件詳細資訊。 |
6 | -history [all] <jobOutputDir> - history < jobOutputDir> 列印作業詳細資訊、失敗和被殺死的提示詳細資訊。可以透過指定 [all] 選項來檢視有關作業的更多詳細資訊,例如為每個任務完成的成功任務和任務嘗試。 |
7 | -list[all] 顯示所有作業。-list 僅顯示尚未完成的作業。 |
8 | -kill-task <task-id> 殺死任務。被殺死的任務不會計入失敗的嘗試。 |
9 | -fail-task <task-id> 使任務失敗。失敗的任務計入失敗的嘗試。 |
10 | -set-priority <job-id> <priority> 更改作業的優先順序。允許的優先順序值為 VERY_HIGH、HIGH、NORMAL、LOW、VERY_LOW |
檢視作業狀態
$ $HADOOP_HOME/bin/hadoop job -status <JOB-ID> e.g. $ $HADOOP_HOME/bin/hadoop job -status job_201310191043_0004
檢視作業輸出目錄的歷史記錄
$ $HADOOP_HOME/bin/hadoop job -history <DIR-NAME> e.g. $ $HADOOP_HOME/bin/hadoop job -history /user/expert/output
殺死作業
$ $HADOOP_HOME/bin/hadoop job -kill <JOB-ID> e.g. $ $HADOOP_HOME/bin/hadoop job -kill job_201310191043_0004