Hadoop - MapReduce



MapReduce是一個框架,使用它可以編寫應用程式,以可靠的方式在大型商用硬體叢集上並行處理海量資料。

什麼是 MapReduce?

MapReduce 是一種基於 Java 的分散式計算處理技術和程式模型。MapReduce 演算法包含兩個重要的任務:Map 和 Reduce。Map 接收一組資料並將其轉換為另一組資料,其中單個元素被分解成元組(鍵/值對)。其次是 Reduce 任務,它接收 Map 的輸出作為輸入,並將這些資料元組組合成更小的一組元組。正如 MapReduce 名稱的順序所暗示的那樣,Reduce 任務總是在 Map 作業之後執行。

MapReduce 的主要優勢在於它易於將資料處理擴充套件到多個計算節點。在 MapReduce 模型下,資料處理原語稱為 Mapper 和 Reducer。將資料處理應用程式分解為MapperReducer有時並非易事。但是,一旦我們以 MapReduce 的形式編寫了一個應用程式,將應用程式擴充套件到在叢集中的數百、數千甚至數萬臺機器上執行僅僅是配置更改的問題。這種簡單的可擴充套件性吸引了許多程式設計師使用 MapReduce 模型。

演算法

  • 通常,MapReduce 正規化基於將計算傳送到資料所在的位置!

  • MapReduce 程式分三個階段執行:Map 階段、Shuffle 階段和 Reduce 階段。

    • Map 階段 - Map 或 Mapper 的工作是處理輸入資料。通常,輸入資料以檔案或目錄的形式存在,並存儲在 Hadoop 檔案系統 (HDFS) 中。輸入檔案逐行傳遞給 Mapper 函式。Mapper 處理資料並建立多個小的資料塊。

    • Reduce 階段 - 此階段是Shuffle階段和Reduce階段的組合。Reducer 的工作是處理來自 Mapper 的資料。處理後,它會生成一組新的輸出,並將儲存在 HDFS 中。

  • 在 MapReduce 作業期間,Hadoop 將 Map 和 Reduce 任務傳送到叢集中的相應伺服器。

  • 框架管理所有資料傳遞細節,例如發出任務、驗證任務完成以及在節點之間複製資料。

  • 大部分計算發生在具有本地磁碟資料的節點上,這減少了網路流量。

  • 給定任務完成後,叢集收集並減少資料以形成適當的結果,並將其傳送回 Hadoop 伺服器。

MapReduce Algorithm

輸入和輸出(Java 視角)

MapReduce 框架操作的是<鍵,值>對,也就是說,框架將作業的輸入視為一組<鍵,值>對,並生成一組<鍵,值>對作為作業的輸出,這些對可能是不同型別的。

鍵和值類應以序列化方式由框架處理,因此需要實現 Writable 介面。此外,鍵類必須實現 Writable-Comparable 介面,以方便框架進行排序。MapReduce 作業的輸入和輸出型別 - (輸入) <k1, v1> → map → <k2, v2> → reduce → <k3, v3>(輸出)。

輸入 輸出
Map <k1, v1> 列表 (<k2, v2>)
Reduce <k2, 列表(v2)> 列表 (<k3, v3>)

術語

  • 有效載荷 - 應用程式實現 Map 和 Reduce 函式,並構成作業的核心。

  • Mapper - Mapper 將輸入鍵/值對對映到一組中間鍵/值對。

  • NameNode - 管理 Hadoop 分散式檔案系統 (HDFS) 的節點。

  • DataNode - 在進行任何處理之前,資料預先存在於其中的節點。

  • MasterNode - JobTracker 執行的節點,接受來自客戶端的作業請求。

  • SlaveNode - Map 和 Reduce 程式執行的節點。

  • JobTracker - 排程作業並將分配的作業跟蹤到 Task Tracker。

  • Task Tracker - 跟蹤任務並將狀態報告給 JobTracker。

  • 作業 - 程式是在資料集上執行 Mapper 和 Reducer 的過程。

  • 任務 - 在資料片上執行 Mapper 或 Reducer 的過程。

  • 任務嘗試 - 在 SlaveNode 上執行任務的特定嘗試例項。

示例場景

以下是關於某個組織電力消耗的資料。它包含各個年份的月度電力消耗和年度平均值。

一月 二月 三月 四月 五月 六月 七月 八月 九月 十月 十一月 十二月 平均值
1979 23 23 2 43 24 25 26 26 26 26 25 26 25
1980 26 27 28 28 28 30 31 31 31 30 30 30 29
1981 31 32 32 32 33 34 35 36 36 34 34 34 34
1984 39 38 39 39 39 41 42 43 40 39 38 38 40
1985 38 39 39 39 39 41 41 41 00 40 39 39 45

如果上述資料作為輸入給出,我們必須編寫應用程式來處理它並生成結果,例如查詢使用量最大的年份、使用量最小的年份等等。對於具有有限記錄數量的程式設計師來說,這是一個簡單的任務。他們只需編寫邏輯來生成所需的輸出,並將資料傳遞給編寫的應用程式。

但是,想想代表某個州自成立以來所有大型工業的電力消耗的資料。

當我們編寫應用程式來處理此類海量資料時,

  • 它們將花費大量時間來執行。

  • 當我們將資料從源移動到網路伺服器等等時,將會有大量的網路流量。

為了解決這些問題,我們有 MapReduce 框架。

輸入資料

上述資料儲存為sample.txt並作為輸入給出。輸入檔案如下所示。

1979   23   23   2   43   24   25   26   26   26   26   25   26  25 
1980   26   27   28  28   28   30   31   31   31   30   30   30  29 
1981   31   32   32  32   33   34   35   36   36   34   34   34  34 
1984   39   38   39  39   39   41   42   43   40   39   38   38  40 
1985   38   39   39  39   39   41   41   41   00   40   39   39  45 

示例程式

以下是使用 MapReduce 框架對示例資料進行處理的程式。

package hadoop; 

import java.util.*; 

import java.io.IOException; 
import java.io.IOException; 

import org.apache.hadoop.fs.Path; 
import org.apache.hadoop.conf.*; 
import org.apache.hadoop.io.*; 
import org.apache.hadoop.mapred.*; 
import org.apache.hadoop.util.*; 

public class ProcessUnits {
   //Mapper class 
   public static class E_EMapper extends MapReduceBase implements 
   Mapper<LongWritable ,/*Input key Type */ 
   Text,                /*Input value Type*/ 
   Text,                /*Output key Type*/ 
   IntWritable>        /*Output value Type*/ 
   {
      //Map function 
      public void map(LongWritable key, Text value, 
      OutputCollector<Text, IntWritable> output,   
      
      Reporter reporter) throws IOException { 
         String line = value.toString(); 
         String lasttoken = null; 
         StringTokenizer s = new StringTokenizer(line,"\t"); 
         String year = s.nextToken(); 
         
         while(s.hasMoreTokens()) {
            lasttoken = s.nextToken();
         }
         int avgprice = Integer.parseInt(lasttoken); 
         output.collect(new Text(year), new IntWritable(avgprice)); 
      } 
   }
   
   //Reducer class 
   public static class E_EReduce extends MapReduceBase implements Reducer< Text, IntWritable, Text, IntWritable > {
   
      //Reduce function 
      public void reduce( Text key, Iterator <IntWritable> values, 
      OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException { 
         int maxavg = 30; 
         int val = Integer.MIN_VALUE; 
            
         while (values.hasNext()) { 
            if((val = values.next().get())>maxavg) { 
               output.collect(key, new IntWritable(val)); 
            } 
         }
      } 
   }

   //Main function 
   public static void main(String args[])throws Exception { 
      JobConf conf = new JobConf(ProcessUnits.class); 
      
      conf.setJobName("max_eletricityunits"); 
      conf.setOutputKeyClass(Text.class);
      conf.setOutputValueClass(IntWritable.class); 
      conf.setMapperClass(E_EMapper.class); 
      conf.setCombinerClass(E_EReduce.class); 
      conf.setReducerClass(E_EReduce.class); 
      conf.setInputFormat(TextInputFormat.class); 
      conf.setOutputFormat(TextOutputFormat.class); 
      
      FileInputFormat.setInputPaths(conf, new Path(args[0])); 
      FileOutputFormat.setOutputPath(conf, new Path(args[1])); 
      
      JobClient.runJob(conf); 
   } 
} 

將上述程式儲存為ProcessUnits.java。程式的編譯和執行解釋如下。

Process Units 程式的編譯和執行

讓我們假設我們在 Hadoop 使用者的主目錄中(例如 /home/hadoop)。

按照以下步驟編譯和執行上述程式。

步驟 1

以下命令用於建立目錄以儲存編譯後的 Java 類。

$ mkdir units 

步驟 2

下載Hadoop-core-1.2.1.jar,它用於編譯和執行 MapReduce 程式。訪問以下連結mvnrepository.com下載 jar 包。讓我們假設下載的資料夾是/home/hadoop/

步驟 3

以下命令用於編譯ProcessUnits.java程式併為程式建立一個 jar 包。

$ javac -classpath hadoop-core-1.2.1.jar -d units ProcessUnits.java 
$ jar -cvf units.jar -C units/ . 

步驟 4

以下命令用於在 HDFS 中建立一個輸入目錄。

$HADOOP_HOME/bin/hadoop fs -mkdir input_dir 

步驟 5

以下命令用於將名為sample.txt的輸入檔案複製到 HDFS 的輸入目錄中。

$HADOOP_HOME/bin/hadoop fs -put /home/hadoop/sample.txt input_dir 

步驟 6

以下命令用於驗證輸入目錄中的檔案。

$HADOOP_HOME/bin/hadoop fs -ls input_dir/ 

步驟 7

以下命令用於透過從輸入目錄獲取輸入檔案來執行 Eleunit_max 應用程式。

$HADOOP_HOME/bin/hadoop jar units.jar hadoop.ProcessUnits input_dir output_dir 

等待一段時間直到檔案執行完畢。執行後,如下所示,輸出將包含輸入拆分的數量、Map 任務的數量、Reducer 任務的數量等。

INFO mapreduce.Job: Job job_1414748220717_0002 
completed successfully 
14/10/31 06:02:52 
INFO mapreduce.Job: Counters: 49 
   File System Counters 
 
FILE: Number of bytes read = 61 
FILE: Number of bytes written = 279400 
FILE: Number of read operations = 0 
FILE: Number of large read operations = 0   
FILE: Number of write operations = 0 
HDFS: Number of bytes read = 546 
HDFS: Number of bytes written = 40 
HDFS: Number of read operations = 9 
HDFS: Number of large read operations = 0 
HDFS: Number of write operations = 2 Job Counters 


   Launched map tasks = 2  
   Launched reduce tasks = 1 
   Data-local map tasks = 2  
   Total time spent by all maps in occupied slots (ms) = 146137 
   Total time spent by all reduces in occupied slots (ms) = 441   
   Total time spent by all map tasks (ms) = 14613 
   Total time spent by all reduce tasks (ms) = 44120 
   Total vcore-seconds taken by all map tasks = 146137 
   Total vcore-seconds taken by all reduce tasks = 44120 
   Total megabyte-seconds taken by all map tasks = 149644288 
   Total megabyte-seconds taken by all reduce tasks = 45178880 
   
Map-Reduce Framework 
 
   Map input records = 5  
   Map output records = 5   
   Map output bytes = 45  
   Map output materialized bytes = 67  
   Input split bytes = 208 
   Combine input records = 5  
   Combine output records = 5 
   Reduce input groups = 5  
   Reduce shuffle bytes = 6  
   Reduce input records = 5  
   Reduce output records = 5  
   Spilled Records = 10  
   Shuffled Maps  = 2  
   Failed Shuffles = 0  
   Merged Map outputs = 2  
   GC time elapsed (ms) = 948  
   CPU time spent (ms) = 5160  
   Physical memory (bytes) snapshot = 47749120  
   Virtual memory (bytes) snapshot = 2899349504  
   Total committed heap usage (bytes) = 277684224
     
File Output Format Counters 
 
   Bytes Written = 40 

步驟 8

以下命令用於驗證輸出資料夾中的結果檔案。

$HADOOP_HOME/bin/hadoop fs -ls output_dir/ 

步驟 9

以下命令用於檢視Part-00000檔案中的輸出。此檔案由 HDFS 生成。

$HADOOP_HOME/bin/hadoop fs -cat output_dir/part-00000 

以下是 MapReduce 程式生成的輸出。

1981    34 
1984    40 
1985    45 

步驟 10

以下命令用於將輸出資料夾從 HDFS 複製到本地檔案系統進行分析。

$HADOOP_HOME/bin/hadoop fs -cat output_dir/part-00000/bin/hadoop dfs get output_dir /home/hadoop 

重要命令

所有 Hadoop 命令都由$HADOOP_HOME/bin/hadoop命令呼叫。執行沒有任何引數的 Hadoop 指令碼將列印所有命令的描述。

用法 - hadoop [--config confdir] COMMAND

下表列出了可用的選項及其描述。

序號 選項和描述
1

namenode -format

格式化 DFS 檔案系統。

2

secondarynamenode

執行 DFS 次要 NameNode。

3

namenode

執行 DFS NameNode。

4

datanode

執行 DFS DataNode。

5

dfsadmin

執行 DFS 管理員客戶端。

6

mradmin

執行 Map-Reduce 管理員客戶端。

7

fsck

執行 DFS 檔案系統檢查實用程式。

8

fs

執行通用檔案系統使用者客戶端。

9

balancer

執行叢集平衡實用程式。

10

oiv

將離線 fsimage 檢視器應用於 fsimage。

11

fetchdt

從 NameNode 獲取委託令牌。

12

jobtracker

執行 MapReduce 作業跟蹤器節點。

13

pipes

執行 Pipes 作業。

14

tasktracker

執行 MapReduce 任務跟蹤器節點。

15

historyserver

作為獨立守護程序執行作業歷史伺服器。

16

job

操作 MapReduce 作業。

17

queue

獲取有關 JobQueue 的資訊。

18

version

列印版本。

19

jar <jar>

執行 jar 檔案。

20

distcp <srcurl> <desturl>

遞迴複製檔案或目錄。

21

distcp2 <srcurl> <desturl>

DistCp 版本 2。

22

archive -archiveName NAME -p <父路徑> <源>* <目標>

建立 Hadoop 歸檔。

23

classpath

列印獲取 Hadoop jar 和所需庫所需的類路徑。

24

daemonlog

獲取/設定每個守護程式的日誌級別

如何與 MapReduce 作業互動

用法 - hadoop job [GENERIC_OPTIONS]

以下是 Hadoop 作業中可用的通用選項。

序號 GENERIC_OPTION 和描述
1

-submit <job-file>

提交作業。

2

-status <job-id>

列印 Map 和 Reduce 完成百分比以及所有作業計數器。

3

-counter <job-id> <group-name> <countername>

列印計數器值。

4

-kill <job-id>

殺死作業。

5

-events <job-id> <fromevent-#> <#-of-events>

列印作業跟蹤器接收的給定範圍內的事件詳細資訊。

6

-history [all] <jobOutputDir> - history < jobOutputDir>

列印作業詳細資訊、失敗和被殺死的提示詳細資訊。可以透過指定 [all] 選項來檢視有關作業的更多詳細資訊,例如為每個任務完成的成功任務和任務嘗試。

7

-list[all]

顯示所有作業。-list 僅顯示尚未完成的作業。

8

-kill-task <task-id>

殺死任務。被殺死的任務不會計入失敗的嘗試。

9

-fail-task <task-id>

使任務失敗。失敗的任務計入失敗的嘗試。

10

-set-priority <job-id> <priority>

更改作業的優先順序。允許的優先順序值為 VERY_HIGH、HIGH、NORMAL、LOW、VERY_LOW

檢視作業狀態

$ $HADOOP_HOME/bin/hadoop job -status <JOB-ID> 
e.g. 
$ $HADOOP_HOME/bin/hadoop job -status job_201310191043_0004 

檢視作業輸出目錄的歷史記錄

$ $HADOOP_HOME/bin/hadoop job -history <DIR-NAME> 
e.g. 
$ $HADOOP_HOME/bin/hadoop job -history /user/expert/output 

殺死作業

$ $HADOOP_HOME/bin/hadoop job -kill <JOB-ID> 
e.g. 
$ $HADOOP_HOME/bin/hadoop job -kill job_201310191043_0004 
廣告