HCatalog - 輸入輸出格式



HCatInputFormatHCatOutputFormat 介面用於從 HDFS 讀取資料,並在處理後使用 MapReduce 作業將結果資料寫入 HDFS。讓我們詳細說明輸入和輸出格式介面。

HCatInputFormat

HCatInputFormat 用於 MapReduce 作業,從 HCatalog 管理的表中讀取資料。HCatInputFormat 提供了一個 Hadoop 0.20 MapReduce API,用於讀取資料,就像資料已釋出到表中一樣。

序號 方法名稱和描述
1

public static HCatInputFormat setInput(Job job, String dbName, String tableName) throws IOException

設定作業使用的輸入。它使用給定的輸入規範查詢元資料儲存,並將匹配的分割槽序列化到 MapReduce 任務的作業配置中。

2

public static HCatInputFormat setInput(Configuration conf, String dbName, String tableName) throws IOException

設定作業使用的輸入。它使用給定的輸入規範查詢元資料儲存,並將匹配的分割槽序列化到 MapReduce 任務的作業配置中。

3

public HCatInputFormat setFilter(String filter) throws IOException

在輸入表上設定過濾器。

4

public HCatInputFormat setProperties(Properties properties) throws IOException

設定輸入格式的屬性。

HCatInputFormat API 包含以下方法:

  • setInput
  • setOutputSchema
  • getTableSchema

要使用 HCatInputFormat 讀取資料,首先使用被讀取表的必要資訊例項化一個 InputJobInfo,然後使用 InputJobInfo 呼叫 setInput

您可以使用 setOutputSchema 方法包含一個 投影模式,以指定輸出欄位。如果沒有指定模式,則將返回表中的所有列。您可以使用 getTableSchema 方法確定指定輸入表的表模式。

HCatOutputFormat

HCatOutputFormat 用於 MapReduce 作業,將資料寫入 HCatalog 管理的表。HCatOutputFormat 提供了一個 Hadoop 0.20 MapReduce API,用於將資料寫入表。當 MapReduce 作業使用 HCatOutputFormat 寫入輸出時,將使用為表配置的預設 OutputFormat,並且作業完成後,新分割槽將釋出到表中。

序號 方法名稱和描述
1

public static void setOutput (Configuration conf, Credentials credentials, OutputJobInfo outputJobInfo) throws IOException

設定作業寫入輸出的資訊。它查詢元資料伺服器以查詢要用於表的 StorageHandler。如果分割槽已釋出,則會丟擲錯誤。

2

public static void setSchema (Configuration conf, HCatSchema schema) throws IOException

設定寫入分割槽的資料模式。如果沒有呼叫此方法,則預設使用表模式。

3

public RecordWriter , HCatRecord > getRecordWriter (TaskAttemptContext context) throws IOException, InterruptedException

獲取作業的記錄寫入器。它使用 StorageHandler 的預設 OutputFormat 獲取記錄寫入器。

4

public OutputCommitter getOutputCommitter (TaskAttemptContext context) throws IOException, InterruptedException

獲取此輸出格式的輸出提交器。它確保正確提交輸出。

HCatOutputFormat API 包含以下方法:

  • setOutput
  • setSchema
  • getTableSchema

對 HCatOutputFormat 的第一次呼叫必須是 setOutput;任何其他呼叫都會丟擲異常,指出輸出格式未初始化。

寫入資料的模式由 setSchema 方法指定。必須呼叫此方法,提供正在寫入的資料模式。如果您的資料與表模式具有相同的模式,則可以使用 HCatOutputFormat.getTableSchema() 獲取表模式,然後將其傳遞給 setSchema()

示例

以下 MapReduce 程式從一個表讀取資料(假設該表在第二列“column 1”中有一個整數),並計算找到的每個不同值的例項數。也就是說,它執行等效於“select col1, count(*) from $table group by col1;”的操作。

例如,如果第二列中的值為 {1, 1, 1, 3, 3, 5},則程式將生成以下值和計數輸出:

1, 3
3, 2
5, 1

現在讓我們來看一下程式程式碼:

import java.io.IOException;
import java.util.Iterator;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;

import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;

import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import org.apache.HCatalog.common.HCatConstants;
import org.apache.HCatalog.data.DefaultHCatRecord;
import org.apache.HCatalog.data.HCatRecord;
import org.apache.HCatalog.data.schema.HCatSchema;

import org.apache.HCatalog.mapreduce.HCatInputFormat;
import org.apache.HCatalog.mapreduce.HCatOutputFormat;
import org.apache.HCatalog.mapreduce.InputJobInfo;
import org.apache.HCatalog.mapreduce.OutputJobInfo;

public class GroupByAge extends Configured implements Tool {

   public static class Map extends Mapper<WritableComparable, 
      HCatRecord, IntWritable, IntWritable> {
      int age;
		
      @Override
      protected void map(
         WritableComparable key, HCatRecord value,
         org.apache.hadoop.mapreduce.Mapper<WritableComparable,
         HCatRecord, IntWritable, IntWritable>.Context context
      )throws IOException, InterruptedException {
         age = (Integer) value.get(1);
         context.write(new IntWritable(age), new IntWritable(1));
      }
   }
	
   public static class Reduce extends Reducer<IntWritable, IntWritable,
      WritableComparable, HCatRecord> {
      @Override
      protected void reduce(
         IntWritable key, java.lang.Iterable<IntWritable> values,
         org.apache.hadoop.mapreduce.Reducer<IntWritable, IntWritable,
         WritableComparable, HCatRecord>.Context context
      )throws IOException ,InterruptedException {
         int sum = 0;
         Iterator<IntWritable> iter = values.iterator();
			
         while (iter.hasNext()) {
            sum++;
            iter.next();
         }
			
         HCatRecord record = new DefaultHCatRecord(2);
         record.set(0, key.get());
         record.set(1, sum);
         context.write(null, record);
      }
   }
	
   public int run(String[] args) throws Exception {
      Configuration conf = getConf();
      args = new GenericOptionsParser(conf, args).getRemainingArgs();
		
      String serverUri = args[0];
      String inputTableName = args[1];
      String outputTableName = args[2];
      String dbName = null;
      String principalID = System
		
      .getProperty(HCatConstants.HCAT_METASTORE_PRINCIPAL);
      if (principalID != null)
      conf.set(HCatConstants.HCAT_METASTORE_PRINCIPAL, principalID);
      Job job = new Job(conf, "GroupByAge");
      HCatInputFormat.setInput(job, InputJobInfo.create(dbName, inputTableName, null));

      // initialize HCatOutputFormat
      job.setInputFormatClass(HCatInputFormat.class);
      job.setJarByClass(GroupByAge.class);
      job.setMapperClass(Map.class);
      job.setReducerClass(Reduce.class);
		
      job.setMapOutputKeyClass(IntWritable.class);
      job.setMapOutputValueClass(IntWritable.class);
      job.setOutputKeyClass(WritableComparable.class);
      job.setOutputValueClass(DefaultHCatRecord.class);
		
      HCatOutputFormat.setOutput(job, OutputJobInfo.create(dbName, outputTableName, null));
      HCatSchema s = HCatOutputFormat.getTableSchema(job);
      System.err.println("INFO: output schema explicitly set for writing:" + s);
      HCatOutputFormat.setSchema(job, s);
      job.setOutputFormatClass(HCatOutputFormat.class);
      return (job.waitForCompletion(true) ? 0 : 1);
   }
	
   public static void main(String[] args) throws Exception {
      int exitCode = ToolRunner.run(new GroupByAge(), args);
      System.exit(exitCode);
   }
}

在編譯上述程式之前,您必須下載一些jar包並將它們新增到此應用程式的類路徑中。您需要下載所有 Hive jar 包和 HCatalog jar 包(HCatalog-core-0.5.0.jar、hive-metastore-0.10.0.jar、libthrift-0.7.0.jar、hive-exec-0.10.0.jar、libfb303-0.7.0.jar、jdo2-api-2.3-ec.jar、slf4j-api-1.6.1.jar)。

使用以下命令將這些jar檔案從本地複製到HDFS,並將它們新增到類路徑

bin/hadoop fs -copyFromLocal $HCAT_HOME/share/HCatalog/HCatalog-core-0.5.0.jar /tmp
bin/hadoop fs -copyFromLocal $HIVE_HOME/lib/hive-metastore-0.10.0.jar /tmp
bin/hadoop fs -copyFromLocal $HIVE_HOME/lib/libthrift-0.7.0.jar /tmp
bin/hadoop fs -copyFromLocal $HIVE_HOME/lib/hive-exec-0.10.0.jar /tmp
bin/hadoop fs -copyFromLocal $HIVE_HOME/lib/libfb303-0.7.0.jar /tmp
bin/hadoop fs -copyFromLocal $HIVE_HOME/lib/jdo2-api-2.3-ec.jar /tmp
bin/hadoop fs -copyFromLocal $HIVE_HOME/lib/slf4j-api-1.6.1.jar /tmp

export LIB_JARS=hdfs:///tmp/HCatalog-core-0.5.0.jar,
hdfs:///tmp/hive-metastore-0.10.0.jar,
hdfs:///tmp/libthrift-0.7.0.jar,
hdfs:///tmp/hive-exec-0.10.0.jar,
hdfs:///tmp/libfb303-0.7.0.jar,
hdfs:///tmp/jdo2-api-2.3-ec.jar,
hdfs:///tmp/slf4j-api-1.6.1.jar

使用以下命令編譯並執行給定的程式。

$HADOOP_HOME/bin/hadoop jar GroupByAge tmp/hive

現在,檢查您的輸出目錄(hdfs: user/tmp/hive)以檢視輸出(part_0000,part_0001)。

廣告
© . All rights reserved.