MapReduce - Hadoop 實現



MapReduce是一個框架,用於編寫應用程式,以可靠的方式處理大型商品硬體叢集上的海量資料。本章將引導您使用 Java 在 Hadoop 框架中操作 MapReduce。

MapReduce 演算法

通常,MapReduce 範例基於將 map-reduce 程式傳送到實際資料所在的計算機。

  • 在 MapReduce 作業期間,Hadoop 將 Map 和 Reduce 任務傳送到叢集中的相應伺服器。

  • 該框架管理所有資料傳遞的細節,例如發出任務、驗證任務完成以及在節點之間複製叢集中的資料。

  • 大部分計算發生在具有本地磁碟資料的節點上,從而減少了網路流量。

  • 完成給定任務後,叢集會收集並減少資料以形成適當的結果,並將其傳送回 Hadoop 伺服器。

MapReduce Algorithm

輸入和輸出(Java 視角)

MapReduce 框架操作鍵值對,即該框架將作業的輸入視為一組鍵值對,併產生一組鍵值對作為作業的輸出,這些鍵值對可能屬於不同的型別。

鍵和值類必須由框架序列化,因此需要實現 Writable 介面。此外,鍵類必須實現 WritableComparable 介面,以方便框架進行排序。

MapReduce 作業的輸入和輸出格式均為鍵值對:

(輸入) <k1, v1> -> map -> <k2, v2>-> reduce -> <k3, v3> (輸出)。

輸入 輸出
Map <k1, v1> 列表 (<k2, v2>)
Reduce <k2, 列表(v2)> 列表 (<k3, v3>)

MapReduce 實現

下表顯示了有關組織用電量的資料。該表包括連續五年的月用電量和年平均用電量。

一月 二月 三月 四月 五月 六月 七月 八月 九月 十月 十一月 十二月 平均
1979 23 23 2 43 24 25 26 26 26 26 25 26 25
1980 26 27 28 28 28 30 31 31 31 30 30 30 29
1981 31 32 32 32 33 34 35 36 36 34 34 34 34
1984 39 38 39 39 39 41 42 43 40 39 38 38 40
1985 38 39 39 39 39 41 41 41 00 40 39 39 45

我們需要編寫應用程式來處理給定表中的輸入資料,以查詢最大用電量年份、最小用電量年份等等。對於具有有限記錄數量的程式設計師來說,這項任務很容易,因為他們只需編寫邏輯來生成所需的輸出,並將資料傳遞到編寫的應用程式即可。

現在讓我們提高輸入資料的規模。假設我們必須分析某個州所有大型工業的用電量。當我們編寫應用程式來處理此類海量資料時,

  • 它們將花費大量時間執行。

  • 當我們將資料從源移動到網路伺服器時,將產生大量的網路流量。

為了解決這些問題,我們有 MapReduce 框架。

輸入資料

以上資料儲存為 **sample.txt** 並作為輸入提供。輸入檔案如下所示。

1979 23 23 2 43 24 25 26 26 26 26 25 26 25
1980 26 27 28 28 28 30 31 31 31 30 30 30 29
1981 31 32 32 32 33 34 35 36 36 34 34 34 34
1984 39 38 39 39 39 41 42 43 40 39 38 38 40
1985 38 39 39 39 39 41 41 41 00 40 39 39 45

示例程式

以下示例資料程式使用 MapReduce 框架。

package hadoop;

import java.util.*;
import java.io.IOException;
import java.io.IOException;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapred.*;
import org.apache.hadoop.util.*;

public class ProcessUnits
{
   //Mapper class
   public static class E_EMapper extends MapReduceBase implements
   Mapper<LongWritable,  /*Input key Type */
   Text,                   /*Input value Type*/
   Text,                   /*Output key Type*/
   IntWritable>            /*Output value Type*/
   {
      //Map function
      public void map(LongWritable key, Text value, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException
      {
         String line = value.toString();
         String lasttoken = null;
         StringTokenizer s = new StringTokenizer(line,"\t");
         String year = s.nextToken();
         
         while(s.hasMoreTokens()){
            lasttoken=s.nextToken();
         }
         
         int avgprice = Integer.parseInt(lasttoken);
         output.collect(new Text(year), new IntWritable(avgprice));
      }
   }
   
   //Reducer class
	
   public static class E_EReduce extends MapReduceBase implements
   Reducer< Text, IntWritable, Text, IntWritable >
   {
      //Reduce function
      public void reduce(Text key, Iterator <IntWritable> values, OutputCollector>Text, IntWritable> output, Reporter reporter) throws IOException
      {
         int maxavg=30;
         int val=Integer.MIN_VALUE;
         while (values.hasNext())
         {
            if((val=values.next().get())>maxavg)
            {
               output.collect(key, new IntWritable(val));
            }
         }
      }
   }
	
   //Main function
	
   public static void main(String args[])throws Exception
   {
      JobConf conf = new JobConf(Eleunits.class);
		
      conf.setJobName("max_eletricityunits");
		
      conf.setOutputKeyClass(Text.class);
      conf.setOutputValueClass(IntWritable.class);
		
      conf.setMapperClass(E_EMapper.class);
      conf.setCombinerClass(E_EReduce.class);
      conf.setReducerClass(E_EReduce.class);
		
      conf.setInputFormat(TextInputFormat.class);
      conf.setOutputFormat(TextOutputFormat.class);
		
      FileInputFormat.setInputPaths(conf, new Path(args[0]));
      FileOutputFormat.setOutputPath(conf, new Path(args[1]));
		
      JobClient.runJob(conf);
   }
}

將上述程式儲存到 **ProcessUnits.java**。程式的編譯和執行如下所示。

ProcessUnits 程式的編譯和執行

讓我們假設我們在 Hadoop 使用者的主目錄中(例如 /home/hadoop)。

請按照以下步驟編譯並執行上述程式。

**步驟 1** - 使用以下命令建立一個目錄來儲存已編譯的 Java 類。

$ mkdir units

**步驟 2** - 下載 Hadoop-core-1.2.1.jar,用於編譯和執行 MapReduce 程式。從 mvnrepository.com 下載 jar 包。讓我們假設下載資料夾是 /home/hadoop/。

**步驟 3** - 以下命令用於編譯 **ProcessUnits.java** 程式併為程式建立一個 jar 包。

$ javac -classpath hadoop-core-1.2.1.jar -d units ProcessUnits.java
$ jar -cvf units.jar -C units/ .

**步驟 4** - 以下命令用於在 HDFS 中建立一個輸入目錄。

$HADOOP_HOME/bin/hadoop fs -mkdir input_dir

**步驟 5** - 以下命令用於將名為 **sample.txt** 的輸入檔案複製到 HDFS 的輸入目錄中。

$HADOOP_HOME/bin/hadoop fs -put /home/hadoop/sample.txt input_dir

**步驟 6** - 以下命令用於驗證輸入目錄中的檔案

$HADOOP_HOME/bin/hadoop fs -ls input_dir/

**步驟 7** - 以下命令用於執行 Eleunit_max 應用程式,並從輸入目錄中獲取輸入檔案。

$HADOOP_HOME/bin/hadoop jar units.jar hadoop.ProcessUnits input_dir output_dir

等待一段時間直到檔案執行完畢。執行後,輸出包含許多輸入拆分、Map 任務、Reducer 任務等。

INFO mapreduce.Job: Job job_1414748220717_0002
completed successfully
14/10/31 06:02:52
INFO mapreduce.Job: Counters: 49

File System Counters
   
   FILE: Number of bytes read=61
   FILE: Number of bytes written=279400
   FILE: Number of read operations=0
   FILE: Number of large read operations=0
   FILE: Number of write operations=0

   HDFS: Number of bytes read=546
   HDFS: Number of bytes written=40
   HDFS: Number of read operations=9
   HDFS: Number of large read operations=0
   HDFS: Number of write operations=2 Job Counters
   
   Launched map tasks=2
   Launched reduce tasks=1
   Data-local map tasks=2
	
   Total time spent by all maps in occupied slots (ms)=146137
   Total time spent by all reduces in occupied slots (ms)=441
   Total time spent by all map tasks (ms)=14613
   Total time spent by all reduce tasks (ms)=44120
	
   Total vcore-seconds taken by all map tasks=146137
   Total vcore-seconds taken by all reduce tasks=44120
	
   Total megabyte-seconds taken by all map tasks=149644288
   Total megabyte-seconds taken by all reduce tasks=45178880

Map-Reduce Framework
   
   Map input records=5
	
   Map output records=5
   Map output bytes=45
   Map output materialized bytes=67
	
   Input split bytes=208
   Combine input records=5
   Combine output records=5
	
   Reduce input groups=5
   Reduce shuffle bytes=6
   Reduce input records=5
   Reduce output records=5
	
   Spilled Records=10
   Shuffled Maps =2
   Failed Shuffles=0
   Merged Map outputs=2
	
   GC time elapsed (ms)=948
   CPU time spent (ms)=5160
	
   Physical memory (bytes) snapshot=47749120
   Virtual memory (bytes) snapshot=2899349504
	
   Total committed heap usage (bytes)=277684224

File Output Format Counters

   Bytes Written=40

**步驟 8** - 以下命令用於驗證輸出資料夾中的結果檔案。

$HADOOP_HOME/bin/hadoop fs -ls output_dir/

**步驟 9** - 以下命令用於檢視 **Part-00000** 檔案中的輸出。此檔案由 HDFS 生成。

$HADOOP_HOME/bin/hadoop fs -cat output_dir/part-00000

以下是 MapReduce 程式生成的輸出:

1981 34
1984 40
1985 45

**步驟 10** - 以下命令用於將輸出資料夾從 HDFS 複製到本地檔案系統。

$HADOOP_HOME/bin/hadoop fs -cat output_dir/part-00000/bin/hadoop dfs -get output_dir /home/hadoop
廣告