
- MapReduce 教程
- MapReduce - 首頁
- MapReduce - 簡介
- MapReduce - 演算法
- MapReduce - 安裝
- MapReduce - API
- MapReduce - Hadoop 實現
- MapReduce - 分割槽器
- MapReduce - 組合器
- MapReduce - Hadoop 管理
- MapReduce 資源
- MapReduce - 快速指南
- MapReduce - 有用資源
- MapReduce - 討論
MapReduce - Hadoop 實現
MapReduce是一個框架,用於編寫應用程式,以可靠的方式處理大型商品硬體叢集上的海量資料。本章將引導您使用 Java 在 Hadoop 框架中操作 MapReduce。
MapReduce 演算法
通常,MapReduce 範例基於將 map-reduce 程式傳送到實際資料所在的計算機。
在 MapReduce 作業期間,Hadoop 將 Map 和 Reduce 任務傳送到叢集中的相應伺服器。
該框架管理所有資料傳遞的細節,例如發出任務、驗證任務完成以及在節點之間複製叢集中的資料。
大部分計算發生在具有本地磁碟資料的節點上,從而減少了網路流量。
完成給定任務後,叢集會收集並減少資料以形成適當的結果,並將其傳送回 Hadoop 伺服器。

輸入和輸出(Java 視角)
MapReduce 框架操作鍵值對,即該框架將作業的輸入視為一組鍵值對,併產生一組鍵值對作為作業的輸出,這些鍵值對可能屬於不同的型別。
鍵和值類必須由框架序列化,因此需要實現 Writable 介面。此外,鍵類必須實現 WritableComparable 介面,以方便框架進行排序。
MapReduce 作業的輸入和輸出格式均為鍵值對:
(輸入) <k1, v1> -> map -> <k2, v2>-> reduce -> <k3, v3> (輸出)。
輸入 | 輸出 | |
---|---|---|
Map | <k1, v1> | 列表 (<k2, v2>) |
Reduce | <k2, 列表(v2)> | 列表 (<k3, v3>) |
MapReduce 實現
下表顯示了有關組織用電量的資料。該表包括連續五年的月用電量和年平均用電量。
一月 | 二月 | 三月 | 四月 | 五月 | 六月 | 七月 | 八月 | 九月 | 十月 | 十一月 | 十二月 | 平均 | |
---|---|---|---|---|---|---|---|---|---|---|---|---|---|
1979 | 23 | 23 | 2 | 43 | 24 | 25 | 26 | 26 | 26 | 26 | 25 | 26 | 25 |
1980 | 26 | 27 | 28 | 28 | 28 | 30 | 31 | 31 | 31 | 30 | 30 | 30 | 29 |
1981 | 31 | 32 | 32 | 32 | 33 | 34 | 35 | 36 | 36 | 34 | 34 | 34 | 34 |
1984 | 39 | 38 | 39 | 39 | 39 | 41 | 42 | 43 | 40 | 39 | 38 | 38 | 40 |
1985 | 38 | 39 | 39 | 39 | 39 | 41 | 41 | 41 | 00 | 40 | 39 | 39 | 45 |
我們需要編寫應用程式來處理給定表中的輸入資料,以查詢最大用電量年份、最小用電量年份等等。對於具有有限記錄數量的程式設計師來說,這項任務很容易,因為他們只需編寫邏輯來生成所需的輸出,並將資料傳遞到編寫的應用程式即可。
現在讓我們提高輸入資料的規模。假設我們必須分析某個州所有大型工業的用電量。當我們編寫應用程式來處理此類海量資料時,
它們將花費大量時間執行。
當我們將資料從源移動到網路伺服器時,將產生大量的網路流量。
為了解決這些問題,我們有 MapReduce 框架。
輸入資料
以上資料儲存為 **sample.txt** 並作為輸入提供。輸入檔案如下所示。
1979 | 23 | 23 | 2 | 43 | 24 | 25 | 26 | 26 | 26 | 26 | 25 | 26 | 25 |
1980 | 26 | 27 | 28 | 28 | 28 | 30 | 31 | 31 | 31 | 30 | 30 | 30 | 29 |
1981 | 31 | 32 | 32 | 32 | 33 | 34 | 35 | 36 | 36 | 34 | 34 | 34 | 34 |
1984 | 39 | 38 | 39 | 39 | 39 | 41 | 42 | 43 | 40 | 39 | 38 | 38 | 40 |
1985 | 38 | 39 | 39 | 39 | 39 | 41 | 41 | 41 | 00 | 40 | 39 | 39 | 45 |
示例程式
以下示例資料程式使用 MapReduce 框架。
package hadoop; import java.util.*; import java.io.IOException; import java.io.IOException; import org.apache.hadoop.fs.Path; import org.apache.hadoop.conf.*; import org.apache.hadoop.io.*; import org.apache.hadoop.mapred.*; import org.apache.hadoop.util.*; public class ProcessUnits { //Mapper class public static class E_EMapper extends MapReduceBase implements Mapper<LongWritable, /*Input key Type */ Text, /*Input value Type*/ Text, /*Output key Type*/ IntWritable> /*Output value Type*/ { //Map function public void map(LongWritable key, Text value, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException { String line = value.toString(); String lasttoken = null; StringTokenizer s = new StringTokenizer(line,"\t"); String year = s.nextToken(); while(s.hasMoreTokens()){ lasttoken=s.nextToken(); } int avgprice = Integer.parseInt(lasttoken); output.collect(new Text(year), new IntWritable(avgprice)); } } //Reducer class public static class E_EReduce extends MapReduceBase implements Reducer< Text, IntWritable, Text, IntWritable > { //Reduce function public void reduce(Text key, Iterator <IntWritable> values, OutputCollector>Text, IntWritable> output, Reporter reporter) throws IOException { int maxavg=30; int val=Integer.MIN_VALUE; while (values.hasNext()) { if((val=values.next().get())>maxavg) { output.collect(key, new IntWritable(val)); } } } } //Main function public static void main(String args[])throws Exception { JobConf conf = new JobConf(Eleunits.class); conf.setJobName("max_eletricityunits"); conf.setOutputKeyClass(Text.class); conf.setOutputValueClass(IntWritable.class); conf.setMapperClass(E_EMapper.class); conf.setCombinerClass(E_EReduce.class); conf.setReducerClass(E_EReduce.class); conf.setInputFormat(TextInputFormat.class); conf.setOutputFormat(TextOutputFormat.class); FileInputFormat.setInputPaths(conf, new Path(args[0])); FileOutputFormat.setOutputPath(conf, new Path(args[1])); JobClient.runJob(conf); } }
將上述程式儲存到 **ProcessUnits.java**。程式的編譯和執行如下所示。
ProcessUnits 程式的編譯和執行
讓我們假設我們在 Hadoop 使用者的主目錄中(例如 /home/hadoop)。
請按照以下步驟編譯並執行上述程式。
**步驟 1** - 使用以下命令建立一個目錄來儲存已編譯的 Java 類。
$ mkdir units
**步驟 2** - 下載 Hadoop-core-1.2.1.jar,用於編譯和執行 MapReduce 程式。從 mvnrepository.com 下載 jar 包。讓我們假設下載資料夾是 /home/hadoop/。
**步驟 3** - 以下命令用於編譯 **ProcessUnits.java** 程式併為程式建立一個 jar 包。
$ javac -classpath hadoop-core-1.2.1.jar -d units ProcessUnits.java $ jar -cvf units.jar -C units/ .
**步驟 4** - 以下命令用於在 HDFS 中建立一個輸入目錄。
$HADOOP_HOME/bin/hadoop fs -mkdir input_dir
**步驟 5** - 以下命令用於將名為 **sample.txt** 的輸入檔案複製到 HDFS 的輸入目錄中。
$HADOOP_HOME/bin/hadoop fs -put /home/hadoop/sample.txt input_dir
**步驟 6** - 以下命令用於驗證輸入目錄中的檔案
$HADOOP_HOME/bin/hadoop fs -ls input_dir/
**步驟 7** - 以下命令用於執行 Eleunit_max 應用程式,並從輸入目錄中獲取輸入檔案。
$HADOOP_HOME/bin/hadoop jar units.jar hadoop.ProcessUnits input_dir output_dir
等待一段時間直到檔案執行完畢。執行後,輸出包含許多輸入拆分、Map 任務、Reducer 任務等。
INFO mapreduce.Job: Job job_1414748220717_0002 completed successfully 14/10/31 06:02:52 INFO mapreduce.Job: Counters: 49 File System Counters FILE: Number of bytes read=61 FILE: Number of bytes written=279400 FILE: Number of read operations=0 FILE: Number of large read operations=0 FILE: Number of write operations=0 HDFS: Number of bytes read=546 HDFS: Number of bytes written=40 HDFS: Number of read operations=9 HDFS: Number of large read operations=0 HDFS: Number of write operations=2 Job Counters Launched map tasks=2 Launched reduce tasks=1 Data-local map tasks=2 Total time spent by all maps in occupied slots (ms)=146137 Total time spent by all reduces in occupied slots (ms)=441 Total time spent by all map tasks (ms)=14613 Total time spent by all reduce tasks (ms)=44120 Total vcore-seconds taken by all map tasks=146137 Total vcore-seconds taken by all reduce tasks=44120 Total megabyte-seconds taken by all map tasks=149644288 Total megabyte-seconds taken by all reduce tasks=45178880 Map-Reduce Framework Map input records=5 Map output records=5 Map output bytes=45 Map output materialized bytes=67 Input split bytes=208 Combine input records=5 Combine output records=5 Reduce input groups=5 Reduce shuffle bytes=6 Reduce input records=5 Reduce output records=5 Spilled Records=10 Shuffled Maps =2 Failed Shuffles=0 Merged Map outputs=2 GC time elapsed (ms)=948 CPU time spent (ms)=5160 Physical memory (bytes) snapshot=47749120 Virtual memory (bytes) snapshot=2899349504 Total committed heap usage (bytes)=277684224 File Output Format Counters Bytes Written=40
**步驟 8** - 以下命令用於驗證輸出資料夾中的結果檔案。
$HADOOP_HOME/bin/hadoop fs -ls output_dir/
**步驟 9** - 以下命令用於檢視 **Part-00000** 檔案中的輸出。此檔案由 HDFS 生成。
$HADOOP_HOME/bin/hadoop fs -cat output_dir/part-00000
以下是 MapReduce 程式生成的輸出:
1981 | 34 |
1984 | 40 |
1985 | 45 |
**步驟 10** - 以下命令用於將輸出資料夾從 HDFS 複製到本地檔案系統。
$HADOOP_HOME/bin/hadoop fs -cat output_dir/part-00000/bin/hadoop dfs -get output_dir /home/hadoop