- MapReduce 教程
- MapReduce - 首頁
- MapReduce - 簡介
- MapReduce - 演算法
- MapReduce - 安裝
- MapReduce - API
- MapReduce - Hadoop 實現
- MapReduce - 分割槽器
- MapReduce - 合併器
- MapReduce - Hadoop 管理
- MapReduce 資源
- MapReduce - 快速指南
- MapReduce - 有用資源
- MapReduce - 討論
MapReduce - 分割槽器
分割槽器在處理輸入資料集時就像一個條件。分割槽階段發生在 Map 階段之後,Reduce 階段之前。
分割槽器的數量等於 Reducer 的數量。這意味著分割槽器將根據 Reducer 的數量劃分資料。因此,來自單個分割槽器的資料由單個 Reducer 處理。
分割槽器
分割槽器對中間 Map 輸出的鍵值對進行分割槽。它使用使用者定義的條件對資料進行分割槽,該條件就像一個雜湊函式。分割槽的總數與作業的 Reducer 任務數相同。讓我們舉一個例子來了解分割槽器是如何工作的。
MapReduce 分割槽器實現
為了方便起見,讓我們假設我們有一個名為 Employee 的小表,其中包含以下資料。我們將使用此示例資料作為我們的輸入資料集來演示分割槽器的工作原理。
| ID | 姓名 | 年齡 | 性別 | 薪資 |
|---|---|---|---|---|
| 1201 | gopal | 45 | 男 | 50,000 |
| 1202 | manisha | 40 | 女 | 50,000 |
| 1203 | khalil | 34 | 男 | 30,000 |
| 1204 | prasanth | 30 | 男 | 30,000 |
| 1205 | kiran | 20 | 男 | 40,000 |
| 1206 | laxmi | 25 | 女 | 35,000 |
| 1207 | bhavya | 20 | 女 | 15,000 |
| 1208 | reshma | 19 | 女 | 15,000 |
| 1209 | kranthi | 22 | 男 | 22,000 |
| 1210 | Satish | 24 | 男 | 25,000 |
| 1211 | Krishna | 25 | 男 | 25,000 |
| 1212 | Arshad | 28 | 男 | 20,000 |
| 1213 | lavanya | 18 | 女 | 8,000 |
我們必須編寫一個應用程式來處理輸入資料集,以查詢不同年齡組(例如,20歲以下,21到30歲之間,30歲以上)中按性別劃分的最高薪資員工。
輸入資料
以上資料儲存在“/home/hadoop/hadoopPartitioner”目錄下的input.txt檔案中,並作為輸入提供。
| 1201 | gopal | 45 | 男 | 50000 |
| 1202 | manisha | 40 | 女 | 51000 |
| 1203 | khaleel | 34 | 男 | 30000 |
| 1204 | prasanth | 30 | 男 | 31000 |
| 1205 | kiran | 20 | 男 | 40000 |
| 1206 | laxmi | 25 | 女 | 35000 |
| 1207 | bhavya | 20 | 女 | 15000 |
| 1208 | reshma | 19 | 女 | 14000 |
| 1209 | kranthi | 22 | 男 | 22000 |
| 1210 | Satish | 24 | 男 | 25000 |
| 1211 | Krishna | 25 | 男 | 26000 |
| 1212 | Arshad | 28 | 男 | 20000 |
| 1213 | lavanya | 18 | 女 | 8000 |
根據給定的輸入,以下是程式的演算法解釋。
Map 任務
Map 任務接受鍵值對作為輸入,而我們有文字檔案中的文字資料。此 Map 任務的輸入如下:
輸入 - 鍵將是諸如“任何特殊鍵 + 檔名 + 行號”之類的模式(例如:key = @input1),而值將是該行中的資料(例如:value = 1201 \t gopal \t 45 \t Male \t 50000)。
方法 - 此 Map 任務的操作如下:
讀取值(記錄資料),它作為輸入值從引數列表中的字串中獲取。
使用 split 函式,分離性別並存儲在字串變數中。
String[] str = value.toString().split("\t", -3);
String gender=str[3];
將性別資訊和記錄資料值作為輸出鍵值對從 Map 任務傳送到分割槽任務。
context.write(new Text(gender), new Text(value));
對文字檔案中的所有記錄重複以上所有步驟。
輸出 - 您將獲得性別數據和記錄資料值作為鍵值對。
分割槽器任務
分割槽器任務接受來自 Map 任務的鍵值對作為其輸入。分割槽意味著將資料劃分為段。根據給定的分割槽條件標準,輸入的鍵值對資料可以根據年齡標準分為三個部分。
輸入 - 鍵值對集合中的所有資料。
key = 記錄中性別的欄位值。
value = 該性別的整個記錄資料值。
方法 - 分割槽邏輯的過程如下。
- 從輸入鍵值對中讀取年齡欄位值。
String[] str = value.toString().split("\t");
int age = Integer.parseInt(str[2]);
根據以下條件檢查年齡值。
- 年齡小於或等於 20
- 年齡大於 20 且小於或等於 30。
- 年齡大於 30。
if(age<=20)
{
return 0;
}
else if(age>20 && age<=30)
{
return 1 % numReduceTasks;
}
else
{
return 2 % numReduceTasks;
}
輸出 - 鍵值對的所有資料被分割成三個鍵值對集合。Reducer 在每個集合上單獨工作。
Reduce 任務
分割槽器任務的數量等於 Reducer 任務的數量。這裡我們有三個分割槽器任務,因此我們有三個 Reducer 任務要執行。
輸入 - Reducer 將使用不同的鍵值對集合執行三次。
key = 記錄中性別的欄位值。
value = 該性別的整個記錄資料。
方法 - 以下邏輯將應用於每個集合。
- 讀取每個記錄的薪資欄位值。
String [] str = val.toString().split("\t", -3);
Note: str[4] have the salary field value.
將薪資與 max 變數進行比較。如果 str[4] 是最大薪資,則將 str[4] 分配給 max,否則跳過此步驟。
if(Integer.parseInt(str[4])>max)
{
max=Integer.parseInt(str[4]);
}
對每個鍵集合重複步驟 1 和 2(Male & Female 是鍵集合)。執行完這三個步驟後,您將找到 Male 鍵集合中的一個最大薪資和 Female 鍵集合中的一個最大薪資。
context.write(new Text(key), new IntWritable(max));
輸出 - 最終,您將獲得一組鍵值對資料,它們包含在三個不同年齡組的集合中。它分別包含每個年齡組中來自 Male 集合的最大薪資和來自 Female 集合的最大薪資。
執行 Map、分割槽器和 Reduce 任務後,鍵值對資料的三個集合將儲存在三個不同的檔案中作為輸出。
所有這三個任務都被視為 MapReduce 作業。這些作業的以下要求和規範應在配置中指定:
- 作業名稱
- 鍵和值的輸入和輸出格式
- Map、Reduce 和分割槽器任務的各個類
Configuration conf = getConf(); //Create Job Job job = new Job(conf, "topsal"); job.setJarByClass(PartitionerExample.class); // File Input and Output paths FileInputFormat.setInputPaths(job, new Path(arg[0])); FileOutputFormat.setOutputPath(job,new Path(arg[1])); //Set Mapper class and Output format for key-value pair. job.setMapperClass(MapClass.class); job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(Text.class); //set partitioner statement job.setPartitionerClass(CaderPartitioner.class); //Set Reducer class and Input/Output format for key-value pair. job.setReducerClass(ReduceClass.class); //Number of Reducer tasks. job.setNumReduceTasks(3); //Input and Output format for data job.setInputFormatClass(TextInputFormat.class); job.setOutputFormatClass(TextOutputFormat.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(Text.class);
示例程式
以下程式展示瞭如何在 MapReduce 程式中為給定條件實現分割槽器。
package partitionerexample;
import java.io.*;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.fs.*;
import org.apache.hadoop.mapreduce.lib.input.*;
import org.apache.hadoop.mapreduce.lib.output.*;
import org.apache.hadoop.util.*;
public class PartitionerExample extends Configured implements Tool
{
//Map class
public static class MapClass extends Mapper<LongWritable,Text,Text,Text>
{
public void map(LongWritable key, Text value, Context context)
{
try{
String[] str = value.toString().split("\t", -3);
String gender=str[3];
context.write(new Text(gender), new Text(value));
}
catch(Exception e)
{
System.out.println(e.getMessage());
}
}
}
//Reducer class
public static class ReduceClass extends Reducer<Text,Text,Text,IntWritable>
{
public int max = -1;
public void reduce(Text key, Iterable <Text> values, Context context) throws IOException, InterruptedException
{
max = -1;
for (Text val : values)
{
String [] str = val.toString().split("\t", -3);
if(Integer.parseInt(str[4])>max)
max=Integer.parseInt(str[4]);
}
context.write(new Text(key), new IntWritable(max));
}
}
//Partitioner class
public static class CaderPartitioner extends
Partitioner < Text, Text >
{
@Override
public int getPartition(Text key, Text value, int numReduceTasks)
{
String[] str = value.toString().split("\t");
int age = Integer.parseInt(str[2]);
if(numReduceTasks == 0)
{
return 0;
}
if(age<=20)
{
return 0;
}
else if(age>20 && age<=30)
{
return 1 % numReduceTasks;
}
else
{
return 2 % numReduceTasks;
}
}
}
@Override
public int run(String[] arg) throws Exception
{
Configuration conf = getConf();
Job job = new Job(conf, "topsal");
job.setJarByClass(PartitionerExample.class);
FileInputFormat.setInputPaths(job, new Path(arg[0]));
FileOutputFormat.setOutputPath(job,new Path(arg[1]));
job.setMapperClass(MapClass.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(Text.class);
//set partitioner statement
job.setPartitionerClass(CaderPartitioner.class);
job.setReducerClass(ReduceClass.class);
job.setNumReduceTasks(3);
job.setInputFormatClass(TextInputFormat.class);
job.setOutputFormatClass(TextOutputFormat.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
System.exit(job.waitForCompletion(true)? 0 : 1);
return 0;
}
public static void main(String ar[]) throws Exception
{
int res = ToolRunner.run(new Configuration(), new PartitionerExample(),ar);
System.exit(0);
}
}
將上述程式碼儲存為“/home/hadoop/hadoopPartitioner”中的PartitionerExample.java。下面給出程式的編譯和執行。
編譯和執行
讓我們假設我們在 Hadoop 使用者的主目錄中(例如,/home/hadoop)。
按照以下步驟編譯和執行上述程式。
步驟 1 - 下載 Hadoop-core-1.2.1.jar,用於編譯和執行 MapReduce 程式。您可以從mvnrepository.com下載該 jar 檔案。
讓我們假設下載的資料夾是“/home/hadoop/hadoopPartitioner”
步驟 2 - 以下命令用於編譯程式PartitionerExample.java併為程式建立 jar 檔案。
$ javac -classpath hadoop-core-1.2.1.jar -d ProcessUnits.java $ jar -cvf PartitionerExample.jar -C .
步驟 3 - 使用以下命令在 HDFS 中建立輸入目錄。
$HADOOP_HOME/bin/hadoop fs -mkdir input_dir
步驟 4 - 使用以下命令將名為input.txt的輸入檔案複製到 HDFS 的輸入目錄中。
$HADOOP_HOME/bin/hadoop fs -put /home/hadoop/hadoopPartitioner/input.txt input_dir
步驟 5 - 使用以下命令驗證輸入目錄中的檔案。
$HADOOP_HOME/bin/hadoop fs -ls input_dir/
步驟 6 - 使用以下命令執行 Top salary 應用程式,並從輸入目錄中獲取輸入檔案。
$HADOOP_HOME/bin/hadoop jar PartitionerExample.jar partitionerexample.PartitionerExample input_dir/input.txt output_dir
等待一段時間,直到檔案執行完畢。執行後,輸出包含許多輸入拆分、Map 任務和 Reducer 任務。
15/02/04 15:19:51 INFO mapreduce.Job: Job job_1423027269044_0021 completed successfully 15/02/04 15:19:52 INFO mapreduce.Job: Counters: 49 File System Counters FILE: Number of bytes read=467 FILE: Number of bytes written=426777 FILE: Number of read operations=0 FILE: Number of large read operations=0 FILE: Number of write operations=0 HDFS: Number of bytes read=480 HDFS: Number of bytes written=72 HDFS: Number of read operations=12 HDFS: Number of large read operations=0 HDFS: Number of write operations=6 Job Counters Launched map tasks=1 Launched reduce tasks=3 Data-local map tasks=1 Total time spent by all maps in occupied slots (ms)=8212 Total time spent by all reduces in occupied slots (ms)=59858 Total time spent by all map tasks (ms)=8212 Total time spent by all reduce tasks (ms)=59858 Total vcore-seconds taken by all map tasks=8212 Total vcore-seconds taken by all reduce tasks=59858 Total megabyte-seconds taken by all map tasks=8409088 Total megabyte-seconds taken by all reduce tasks=61294592 Map-Reduce Framework Map input records=13 Map output records=13 Map output bytes=423 Map output materialized bytes=467 Input split bytes=119 Combine input records=0 Combine output records=0 Reduce input groups=6 Reduce shuffle bytes=467 Reduce input records=13 Reduce output records=6 Spilled Records=26 Shuffled Maps =3 Failed Shuffles=0 Merged Map outputs=3 GC time elapsed (ms)=224 CPU time spent (ms)=3690 Physical memory (bytes) snapshot=553816064 Virtual memory (bytes) snapshot=3441266688 Total committed heap usage (bytes)=334102528 Shuffle Errors BAD_ID=0 CONNECTION=0 IO_ERROR=0 WRONG_LENGTH=0 WRONG_MAP=0 WRONG_REDUCE=0 File Input Format Counters Bytes Read=361 File Output Format Counters Bytes Written=72
步驟 7 - 使用以下命令驗證輸出資料夾中的結果檔案。
$HADOOP_HOME/bin/hadoop fs -ls output_dir/
您將在三個檔案中找到輸出,因為您在程式中使用了三個分割槽器和三個 Reducer。
步驟 8 - 使用以下命令檢視Part-00000檔案中的輸出。此檔案由 HDFS 生成。
$HADOOP_HOME/bin/hadoop fs -cat output_dir/part-00000
Part-00000 中的輸出
Female 15000 Male 40000
使用以下命令檢視Part-00001檔案中的輸出。
$HADOOP_HOME/bin/hadoop fs -cat output_dir/part-00001
Part-00001 中的輸出
Female 35000 Male 31000
使用以下命令檢視Part-00002檔案中的輸出。
$HADOOP_HOME/bin/hadoop fs -cat output_dir/part-00002
Part-00002 中的輸出
Female 51000 Male 50000