- HCatalog CLI 命令
- HCatalog - 建立表
- HCatalog - 修改表
- HCatalog - 檢視
- HCatalog - 顯示錶
- HCatalog - 顯示分割槽
- HCatalog - 索引
- HCatalog API
- HCatalog - 讀寫器
- HCatalog - 輸入輸出格式
- HCatalog - 載入器和儲存器
- HCatalog 有用資源
- HCatalog - 快速指南
- HCatalog - 有用資源
- HCatalog - 討論
HCatalog - 輸入輸出格式
HCatInputFormat 和 HCatOutputFormat 介面用於從 HDFS 讀取資料,並在處理後使用 MapReduce 作業將結果資料寫入 HDFS。讓我們詳細說明輸入和輸出格式介面。
HCatInputFormat
HCatInputFormat 用於 MapReduce 作業,從 HCatalog 管理的表中讀取資料。HCatInputFormat 提供了一個 Hadoop 0.20 MapReduce API,用於讀取資料,就像資料已釋出到表中一樣。
| 序號 | 方法名稱和描述 |
|---|---|
| 1 | public static HCatInputFormat setInput(Job job, String dbName, String tableName) throws IOException 設定作業使用的輸入。它使用給定的輸入規範查詢元資料儲存,並將匹配的分割槽序列化到 MapReduce 任務的作業配置中。 |
| 2 | public static HCatInputFormat setInput(Configuration conf, String dbName, String tableName) throws IOException 設定作業使用的輸入。它使用給定的輸入規範查詢元資料儲存,並將匹配的分割槽序列化到 MapReduce 任務的作業配置中。 |
| 3 | public HCatInputFormat setFilter(String filter) throws IOException 在輸入表上設定過濾器。 |
| 4 | public HCatInputFormat setProperties(Properties properties) throws IOException 設定輸入格式的屬性。 |
HCatInputFormat API 包含以下方法:
- setInput
- setOutputSchema
- getTableSchema
要使用 HCatInputFormat 讀取資料,首先使用被讀取表的必要資訊例項化一個 InputJobInfo,然後使用 InputJobInfo 呼叫 setInput。
您可以使用 setOutputSchema 方法包含一個 投影模式,以指定輸出欄位。如果沒有指定模式,則將返回表中的所有列。您可以使用 getTableSchema 方法確定指定輸入表的表模式。
HCatOutputFormat
HCatOutputFormat 用於 MapReduce 作業,將資料寫入 HCatalog 管理的表。HCatOutputFormat 提供了一個 Hadoop 0.20 MapReduce API,用於將資料寫入表。當 MapReduce 作業使用 HCatOutputFormat 寫入輸出時,將使用為表配置的預設 OutputFormat,並且作業完成後,新分割槽將釋出到表中。
| 序號 | 方法名稱和描述 |
|---|---|
| 1 | public static void setOutput (Configuration conf, Credentials credentials, OutputJobInfo outputJobInfo) throws IOException 設定作業寫入輸出的資訊。它查詢元資料伺服器以查詢要用於表的 StorageHandler。如果分割槽已釋出,則會丟擲錯誤。 |
| 2 | public static void setSchema (Configuration conf, HCatSchema schema) throws IOException 設定寫入分割槽的資料模式。如果沒有呼叫此方法,則預設使用表模式。 |
| 3 | public RecordWriter 獲取作業的記錄寫入器。它使用 StorageHandler 的預設 OutputFormat 獲取記錄寫入器。 |
| 4 | public OutputCommitter getOutputCommitter (TaskAttemptContext context) throws IOException, InterruptedException 獲取此輸出格式的輸出提交器。它確保正確提交輸出。 |
HCatOutputFormat API 包含以下方法:
- setOutput
- setSchema
- getTableSchema
對 HCatOutputFormat 的第一次呼叫必須是 setOutput;任何其他呼叫都會丟擲異常,指出輸出格式未初始化。
寫入資料的模式由 setSchema 方法指定。必須呼叫此方法,提供正在寫入的資料模式。如果您的資料與表模式具有相同的模式,則可以使用 HCatOutputFormat.getTableSchema() 獲取表模式,然後將其傳遞給 setSchema()。
示例
以下 MapReduce 程式從一個表讀取資料(假設該表在第二列“column 1”中有一個整數),並計算找到的每個不同值的例項數。也就是說,它執行等效於“select col1, count(*) from $table group by col1;”的操作。
例如,如果第二列中的值為 {1, 1, 1, 3, 3, 5},則程式將生成以下值和計數輸出:
1, 3 3, 2 5, 1
現在讓我們來看一下程式程式碼:
import java.io.IOException;
import java.util.Iterator;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.HCatalog.common.HCatConstants;
import org.apache.HCatalog.data.DefaultHCatRecord;
import org.apache.HCatalog.data.HCatRecord;
import org.apache.HCatalog.data.schema.HCatSchema;
import org.apache.HCatalog.mapreduce.HCatInputFormat;
import org.apache.HCatalog.mapreduce.HCatOutputFormat;
import org.apache.HCatalog.mapreduce.InputJobInfo;
import org.apache.HCatalog.mapreduce.OutputJobInfo;
public class GroupByAge extends Configured implements Tool {
public static class Map extends Mapper<WritableComparable,
HCatRecord, IntWritable, IntWritable> {
int age;
@Override
protected void map(
WritableComparable key, HCatRecord value,
org.apache.hadoop.mapreduce.Mapper<WritableComparable,
HCatRecord, IntWritable, IntWritable>.Context context
)throws IOException, InterruptedException {
age = (Integer) value.get(1);
context.write(new IntWritable(age), new IntWritable(1));
}
}
public static class Reduce extends Reducer<IntWritable, IntWritable,
WritableComparable, HCatRecord> {
@Override
protected void reduce(
IntWritable key, java.lang.Iterable<IntWritable> values,
org.apache.hadoop.mapreduce.Reducer<IntWritable, IntWritable,
WritableComparable, HCatRecord>.Context context
)throws IOException ,InterruptedException {
int sum = 0;
Iterator<IntWritable> iter = values.iterator();
while (iter.hasNext()) {
sum++;
iter.next();
}
HCatRecord record = new DefaultHCatRecord(2);
record.set(0, key.get());
record.set(1, sum);
context.write(null, record);
}
}
public int run(String[] args) throws Exception {
Configuration conf = getConf();
args = new GenericOptionsParser(conf, args).getRemainingArgs();
String serverUri = args[0];
String inputTableName = args[1];
String outputTableName = args[2];
String dbName = null;
String principalID = System
.getProperty(HCatConstants.HCAT_METASTORE_PRINCIPAL);
if (principalID != null)
conf.set(HCatConstants.HCAT_METASTORE_PRINCIPAL, principalID);
Job job = new Job(conf, "GroupByAge");
HCatInputFormat.setInput(job, InputJobInfo.create(dbName, inputTableName, null));
// initialize HCatOutputFormat
job.setInputFormatClass(HCatInputFormat.class);
job.setJarByClass(GroupByAge.class);
job.setMapperClass(Map.class);
job.setReducerClass(Reduce.class);
job.setMapOutputKeyClass(IntWritable.class);
job.setMapOutputValueClass(IntWritable.class);
job.setOutputKeyClass(WritableComparable.class);
job.setOutputValueClass(DefaultHCatRecord.class);
HCatOutputFormat.setOutput(job, OutputJobInfo.create(dbName, outputTableName, null));
HCatSchema s = HCatOutputFormat.getTableSchema(job);
System.err.println("INFO: output schema explicitly set for writing:" + s);
HCatOutputFormat.setSchema(job, s);
job.setOutputFormatClass(HCatOutputFormat.class);
return (job.waitForCompletion(true) ? 0 : 1);
}
public static void main(String[] args) throws Exception {
int exitCode = ToolRunner.run(new GroupByAge(), args);
System.exit(exitCode);
}
}
在編譯上述程式之前,您必須下載一些jar包並將它們新增到此應用程式的類路徑中。您需要下載所有 Hive jar 包和 HCatalog jar 包(HCatalog-core-0.5.0.jar、hive-metastore-0.10.0.jar、libthrift-0.7.0.jar、hive-exec-0.10.0.jar、libfb303-0.7.0.jar、jdo2-api-2.3-ec.jar、slf4j-api-1.6.1.jar)。
使用以下命令將這些jar檔案從本地複製到HDFS,並將它們新增到類路徑。
bin/hadoop fs -copyFromLocal $HCAT_HOME/share/HCatalog/HCatalog-core-0.5.0.jar /tmp bin/hadoop fs -copyFromLocal $HIVE_HOME/lib/hive-metastore-0.10.0.jar /tmp bin/hadoop fs -copyFromLocal $HIVE_HOME/lib/libthrift-0.7.0.jar /tmp bin/hadoop fs -copyFromLocal $HIVE_HOME/lib/hive-exec-0.10.0.jar /tmp bin/hadoop fs -copyFromLocal $HIVE_HOME/lib/libfb303-0.7.0.jar /tmp bin/hadoop fs -copyFromLocal $HIVE_HOME/lib/jdo2-api-2.3-ec.jar /tmp bin/hadoop fs -copyFromLocal $HIVE_HOME/lib/slf4j-api-1.6.1.jar /tmp export LIB_JARS=hdfs:///tmp/HCatalog-core-0.5.0.jar, hdfs:///tmp/hive-metastore-0.10.0.jar, hdfs:///tmp/libthrift-0.7.0.jar, hdfs:///tmp/hive-exec-0.10.0.jar, hdfs:///tmp/libfb303-0.7.0.jar, hdfs:///tmp/jdo2-api-2.3-ec.jar, hdfs:///tmp/slf4j-api-1.6.1.jar
使用以下命令編譯並執行給定的程式。
$HADOOP_HOME/bin/hadoop jar GroupByAge tmp/hive
現在,檢查您的輸出目錄(hdfs: user/tmp/hive)以檢視輸出(part_0000,part_0001)。