MapReduce - 分割槽器



分割槽器在處理輸入資料集時就像一個條件。分割槽階段發生在 Map 階段之後,Reduce 階段之前。

分割槽器的數量等於 Reducer 的數量。這意味著分割槽器將根據 Reducer 的數量劃分資料。因此,來自單個分割槽器的資料由單個 Reducer 處理。

分割槽器

分割槽器對中間 Map 輸出的鍵值對進行分割槽。它使用使用者定義的條件對資料進行分割槽,該條件就像一個雜湊函式。分割槽的總數與作業的 Reducer 任務數相同。讓我們舉一個例子來了解分割槽器是如何工作的。

MapReduce 分割槽器實現

為了方便起見,讓我們假設我們有一個名為 Employee 的小表,其中包含以下資料。我們將使用此示例資料作為我們的輸入資料集來演示分割槽器的工作原理。

ID 姓名 年齡 性別 薪資
1201 gopal 45 50,000
1202 manisha 40 50,000
1203 khalil 34 30,000
1204 prasanth 30 30,000
1205 kiran 20 40,000
1206 laxmi 25 35,000
1207 bhavya 20 15,000
1208 reshma 19 15,000
1209 kranthi 22 22,000
1210 Satish 24 25,000
1211 Krishna 25 25,000
1212 Arshad 28 20,000
1213 lavanya 18 8,000

我們必須編寫一個應用程式來處理輸入資料集,以查詢不同年齡組(例如,20歲以下,21到30歲之間,30歲以上)中按性別劃分的最高薪資員工。

輸入資料

以上資料儲存在“/home/hadoop/hadoopPartitioner”目錄下的input.txt檔案中,並作為輸入提供。

1201 gopal 45 50000
1202 manisha 40 51000
1203 khaleel 34 30000
1204 prasanth 30 31000
1205 kiran 20 40000
1206 laxmi 25 35000
1207 bhavya 20 15000
1208 reshma 19 14000
1209 kranthi 22 22000
1210 Satish 24 25000
1211 Krishna 25 26000
1212 Arshad 28 20000
1213 lavanya 18 8000

根據給定的輸入,以下是程式的演算法解釋。

Map 任務

Map 任務接受鍵值對作為輸入,而我們有文字檔案中的文字資料。此 Map 任務的輸入如下:

輸入 - 鍵將是諸如“任何特殊鍵 + 檔名 + 行號”之類的模式(例如:key = @input1),而值將是該行中的資料(例如:value = 1201 \t gopal \t 45 \t Male \t 50000)。

方法 - 此 Map 任務的操作如下:

  • 讀取(記錄資料),它作為輸入值從引數列表中的字串中獲取。

  • 使用 split 函式,分離性別並存儲在字串變數中。

String[] str = value.toString().split("\t", -3);
String gender=str[3];
  • 將性別資訊和記錄資料作為輸出鍵值對從 Map 任務傳送到分割槽任務

context.write(new Text(gender), new Text(value));
  • 對文字檔案中的所有記錄重複以上所有步驟。

輸出 - 您將獲得性別數據和記錄資料值作為鍵值對。

分割槽器任務

分割槽器任務接受來自 Map 任務的鍵值對作為其輸入。分割槽意味著將資料劃分為段。根據給定的分割槽條件標準,輸入的鍵值對資料可以根據年齡標準分為三個部分。

輸入 - 鍵值對集合中的所有資料。

key = 記錄中性別的欄位值。

value = 該性別的整個記錄資料值。

方法 - 分割槽邏輯的過程如下。

  • 從輸入鍵值對中讀取年齡欄位值。
String[] str = value.toString().split("\t");
int age = Integer.parseInt(str[2]);
  • 根據以下條件檢查年齡值。

    • 年齡小於或等於 20
    • 年齡大於 20 且小於或等於 30。
    • 年齡大於 30。
if(age<=20)
{
   return 0;
}
else if(age>20 && age<=30)
{
   return 1 % numReduceTasks;
}
else
{
   return 2 % numReduceTasks;
}

輸出 - 鍵值對的所有資料被分割成三個鍵值對集合。Reducer 在每個集合上單獨工作。

Reduce 任務

分割槽器任務的數量等於 Reducer 任務的數量。這裡我們有三個分割槽器任務,因此我們有三個 Reducer 任務要執行。

輸入 - Reducer 將使用不同的鍵值對集合執行三次。

key = 記錄中性別的欄位值。

value = 該性別的整個記錄資料。

方法 - 以下邏輯將應用於每個集合。

  • 讀取每個記錄的薪資欄位值。
String [] str = val.toString().split("\t", -3);
Note: str[4] have the salary field value.
  • 將薪資與 max 變數進行比較。如果 str[4] 是最大薪資,則將 str[4] 分配給 max,否則跳過此步驟。

if(Integer.parseInt(str[4])>max)
{
   max=Integer.parseInt(str[4]);
}
  • 對每個鍵集合重複步驟 1 和 2(Male & Female 是鍵集合)。執行完這三個步驟後,您將找到 Male 鍵集合中的一個最大薪資和 Female 鍵集合中的一個最大薪資。

context.write(new Text(key), new IntWritable(max));

輸出 - 最終,您將獲得一組鍵值對資料,它們包含在三個不同年齡組的集合中。它分別包含每個年齡組中來自 Male 集合的最大薪資和來自 Female 集合的最大薪資。

執行 Map、分割槽器和 Reduce 任務後,鍵值對資料的三個集合將儲存在三個不同的檔案中作為輸出。

所有這三個任務都被視為 MapReduce 作業。這些作業的以下要求和規範應在配置中指定:

  • 作業名稱
  • 鍵和值的輸入和輸出格式
  • Map、Reduce 和分割槽器任務的各個類
Configuration conf = getConf();

//Create Job
Job job = new Job(conf, "topsal");
job.setJarByClass(PartitionerExample.class);

// File Input and Output paths
FileInputFormat.setInputPaths(job, new Path(arg[0]));
FileOutputFormat.setOutputPath(job,new Path(arg[1]));

//Set Mapper class and Output format for key-value pair.
job.setMapperClass(MapClass.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(Text.class);

//set partitioner statement
job.setPartitionerClass(CaderPartitioner.class);

//Set Reducer class and Input/Output format for key-value pair.
job.setReducerClass(ReduceClass.class);

//Number of Reducer tasks.
job.setNumReduceTasks(3);

//Input and Output format for data
job.setInputFormatClass(TextInputFormat.class);
job.setOutputFormatClass(TextOutputFormat.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);

示例程式

以下程式展示瞭如何在 MapReduce 程式中為給定條件實現分割槽器。

package partitionerexample;

import java.io.*;

import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.fs.*;

import org.apache.hadoop.mapreduce.lib.input.*;
import org.apache.hadoop.mapreduce.lib.output.*;

import org.apache.hadoop.util.*;

public class PartitionerExample extends Configured implements Tool
{
   //Map class
	
   public static class MapClass extends Mapper<LongWritable,Text,Text,Text>
   {
      public void map(LongWritable key, Text value, Context context)
      {
         try{
            String[] str = value.toString().split("\t", -3);
            String gender=str[3];
            context.write(new Text(gender), new Text(value));
         }
         catch(Exception e)
         {
            System.out.println(e.getMessage());
         }
      }
   }
   
   //Reducer class
	
   public static class ReduceClass extends Reducer<Text,Text,Text,IntWritable>
   {
      public int max = -1;
      public void reduce(Text key, Iterable <Text> values, Context context) throws IOException, InterruptedException
      {
         max = -1;
			
         for (Text val : values)
         {
            String [] str = val.toString().split("\t", -3);
            if(Integer.parseInt(str[4])>max)
            max=Integer.parseInt(str[4]);
         }
			
         context.write(new Text(key), new IntWritable(max));
      }
   }
   
   //Partitioner class
	
   public static class CaderPartitioner extends
   Partitioner < Text, Text >
   {
      @Override
      public int getPartition(Text key, Text value, int numReduceTasks)
      {
         String[] str = value.toString().split("\t");
         int age = Integer.parseInt(str[2]);
         
         if(numReduceTasks == 0)
         {
            return 0;
         }
         
         if(age<=20)
         {
            return 0;
         }
         else if(age>20 && age<=30)
         {
            return 1 % numReduceTasks;
         }
         else
         {
            return 2 % numReduceTasks;
         }
      }
   }
   
   @Override
   public int run(String[] arg) throws Exception
   {
      Configuration conf = getConf();
		
      Job job = new Job(conf, "topsal");
      job.setJarByClass(PartitionerExample.class);
		
      FileInputFormat.setInputPaths(job, new Path(arg[0]));
      FileOutputFormat.setOutputPath(job,new Path(arg[1]));
		
      job.setMapperClass(MapClass.class);
		
      job.setMapOutputKeyClass(Text.class);
      job.setMapOutputValueClass(Text.class);
      
      //set partitioner statement
		
      job.setPartitionerClass(CaderPartitioner.class);
      job.setReducerClass(ReduceClass.class);
      job.setNumReduceTasks(3);
      job.setInputFormatClass(TextInputFormat.class);
		
      job.setOutputFormatClass(TextOutputFormat.class);
      job.setOutputKeyClass(Text.class);
      job.setOutputValueClass(Text.class);
		
      System.exit(job.waitForCompletion(true)? 0 : 1);
      return 0;
   }
   
   public static void main(String ar[]) throws Exception
   {
      int res = ToolRunner.run(new Configuration(), new PartitionerExample(),ar);
      System.exit(0);
   }
}

將上述程式碼儲存為“/home/hadoop/hadoopPartitioner”中的PartitionerExample.java。下面給出程式的編譯和執行。

編譯和執行

讓我們假設我們在 Hadoop 使用者的主目錄中(例如,/home/hadoop)。

按照以下步驟編譯和執行上述程式。

步驟 1 - 下載 Hadoop-core-1.2.1.jar,用於編譯和執行 MapReduce 程式。您可以從mvnrepository.com下載該 jar 檔案。

讓我們假設下載的資料夾是“/home/hadoop/hadoopPartitioner”

步驟 2 - 以下命令用於編譯程式PartitionerExample.java併為程式建立 jar 檔案。

$ javac -classpath hadoop-core-1.2.1.jar -d ProcessUnits.java
$ jar -cvf PartitionerExample.jar -C .

步驟 3 - 使用以下命令在 HDFS 中建立輸入目錄。

$HADOOP_HOME/bin/hadoop fs -mkdir input_dir

步驟 4 - 使用以下命令將名為input.txt的輸入檔案複製到 HDFS 的輸入目錄中。

$HADOOP_HOME/bin/hadoop fs -put /home/hadoop/hadoopPartitioner/input.txt input_dir

步驟 5 - 使用以下命令驗證輸入目錄中的檔案。

$HADOOP_HOME/bin/hadoop fs -ls input_dir/

步驟 6 - 使用以下命令執行 Top salary 應用程式,並從輸入目錄中獲取輸入檔案。

$HADOOP_HOME/bin/hadoop jar PartitionerExample.jar partitionerexample.PartitionerExample input_dir/input.txt output_dir

等待一段時間,直到檔案執行完畢。執行後,輸出包含許多輸入拆分、Map 任務和 Reducer 任務。

15/02/04 15:19:51 INFO mapreduce.Job: Job job_1423027269044_0021 completed successfully
15/02/04 15:19:52 INFO mapreduce.Job: Counters: 49

File System Counters

   FILE: Number of bytes read=467
   FILE: Number of bytes written=426777
   FILE: Number of read operations=0
   FILE: Number of large read operations=0
   FILE: Number of write operations=0
	
   HDFS: Number of bytes read=480
   HDFS: Number of bytes written=72
   HDFS: Number of read operations=12
   HDFS: Number of large read operations=0
   HDFS: Number of write operations=6
	
Job Counters

   Launched map tasks=1
   Launched reduce tasks=3
	
   Data-local map tasks=1
	
   Total time spent by all maps in occupied slots (ms)=8212
   Total time spent by all reduces in occupied slots (ms)=59858
   Total time spent by all map tasks (ms)=8212
   Total time spent by all reduce tasks (ms)=59858
	
   Total vcore-seconds taken by all map tasks=8212
   Total vcore-seconds taken by all reduce tasks=59858
	
   Total megabyte-seconds taken by all map tasks=8409088
   Total megabyte-seconds taken by all reduce tasks=61294592
	
Map-Reduce Framework

   Map input records=13
   Map output records=13
   Map output bytes=423
   Map output materialized bytes=467
	
   Input split bytes=119
	
   Combine input records=0
   Combine output records=0
	
   Reduce input groups=6
   Reduce shuffle bytes=467
   Reduce input records=13
   Reduce output records=6
	
   Spilled Records=26
   Shuffled Maps =3
   Failed Shuffles=0
   Merged Map outputs=3
   GC time elapsed (ms)=224
   CPU time spent (ms)=3690
	
   Physical memory (bytes) snapshot=553816064
   Virtual memory (bytes) snapshot=3441266688
	
   Total committed heap usage (bytes)=334102528
	
Shuffle Errors

   BAD_ID=0
   CONNECTION=0
   IO_ERROR=0
	
   WRONG_LENGTH=0
   WRONG_MAP=0
   WRONG_REDUCE=0
	
File Input Format Counters

   Bytes Read=361
	
File Output Format Counters

   Bytes Written=72

步驟 7 - 使用以下命令驗證輸出資料夾中的結果檔案。

$HADOOP_HOME/bin/hadoop fs -ls output_dir/

您將在三個檔案中找到輸出,因為您在程式中使用了三個分割槽器和三個 Reducer。

步驟 8 - 使用以下命令檢視Part-00000檔案中的輸出。此檔案由 HDFS 生成。

$HADOOP_HOME/bin/hadoop fs -cat output_dir/part-00000

Part-00000 中的輸出

Female   15000
Male     40000

使用以下命令檢視Part-00001檔案中的輸出。

$HADOOP_HOME/bin/hadoop fs -cat output_dir/part-00001

Part-00001 中的輸出

Female   35000
Male    31000

使用以下命令檢視Part-00002檔案中的輸出。

$HADOOP_HOME/bin/hadoop fs -cat output_dir/part-00002

Part-00002 中的輸出

Female  51000
Male   50000
廣告

© . All rights reserved.