HCatalog 快速指南



HCatalog - 簡介

什麼是 HCatalog?

HCatalog 是 Hadoop 的表儲存管理工具。它將 Hive 元儲存的表格資料暴露給其他 Hadoop 應用程式。它使使用者能夠使用不同的資料處理工具(Pig、MapReduce)輕鬆地將資料寫入網格。它確保使用者不必擔心資料儲存的位置或格式。

HCatalog 作為 Hive 的關鍵元件,使使用者能夠以任何格式和任何結構儲存資料。

為什麼選擇 HCatalog?

為合適的任務啟用合適的工具

Hadoop 生態系統包含不同的資料處理工具,例如 Hive、Pig 和 MapReduce。儘管這些工具不需要元資料,但當元資料存在時,它們仍然可以從中受益。共享元資料儲存還使使用者能夠更輕鬆地在工具之間共享資料。使用 MapReduce 或 Pig 載入和規範化資料,然後透過 Hive 進行分析的工作流程非常常見。如果所有這些工具都共享一個元儲存,那麼每個工具的使用者都可以立即訪問使用另一個工具建立的資料。不需要載入或傳輸步驟。

捕獲處理狀態以啟用共享

HCatalog 可以釋出您的分析結果。因此,其他程式設計師可以透過“REST”訪問您的分析平臺。您釋出的模式對其他資料科學家也很有用。其他資料科學家可以使用您的發現作為後續發現的輸入。

將 Hadoop 與所有內容整合

Hadoop 作為處理和儲存環境為企業帶來了許多機會;但是,為了促進採用,它必須與現有工具協同工作並增強現有工具。Hadoop 應作為您分析平臺的輸入,或與您的運營資料儲存和 Web 應用程式整合。組織應該享受 Hadoop 的價值,而不必學習一套全新的工具。REST 服務透過熟悉的 API 和類似 SQL 的語言向企業開放平臺。企業資料管理系統使用 HCatalog 與 Hadoop 平臺更深入地整合。

HCatalog 架構

下圖顯示了 HCatalog 的整體架構。

Architecture

HCatalog 支援讀取和寫入任何格式的檔案,只要可以編寫 **SerDe**(序列化器-反序列化器)即可。預設情況下,HCatalog 支援 RCFile、CSV、JSON、SequenceFile 和 ORC 檔案格式。要使用自定義格式,必須提供 InputFormat、OutputFormat 和 SerDe。

HCatalog 建立在 Hive 元儲存之上,幷包含 Hive 的 DDL。HCatalog 為 Pig 和 MapReduce 提供讀寫介面,並使用 Hive 的命令列介面來發出資料定義和元資料探索命令。

HCatalog - 安裝

所有 Hadoop 子專案(如 Hive、Pig 和 HBase)都支援 Linux 作業系統。因此,您需要在系統上安裝 Linux 版本。HCatalog 於 2013 年 3 月 26 日與 Hive 安裝合併。從 Hive-0.11.0 版本開始,HCatalog 與 Hive 安裝一起提供。因此,請按照以下步驟安裝 Hive,這將自動在您的系統上安裝 HCatalog。

步驟 1:驗證 JAVA 安裝

安裝 Hive 之前,必須在系統上安裝 Java。可以使用以下命令檢查系統上是否已安裝 Java:

$ java –version

如果系統上已安裝 Java,則會看到以下響應:

java version "1.7.0_71"
Java(TM) SE Runtime Environment (build 1.7.0_71-b13)
Java HotSpot(TM) Client VM (build 25.0-b02, mixed mode)

如果系統上未安裝 Java,則需要按照以下步驟操作。

步驟 2:安裝 Java

訪問以下連結下載 Java(JDK <最新版本> - X64.tar.gz):http://www.oracle.com/

然後將下載 **jdk-7u71-linux-x64.tar.gz** 到您的系統。

通常,您會在“下載”資料夾中找到下載的 Java 檔案。驗證並使用以下命令解壓縮 **jdk-7u71-linux-x64.gz** 檔案。

$ cd Downloads/
$ ls
jdk-7u71-linux-x64.gz

$ tar zxf jdk-7u71-linux-x64.gz
$ ls
jdk1.7.0_71 jdk-7u71-linux-x64.gz

要使所有使用者都能使用 Java,必須將其移動到“/usr/local/”位置。開啟 root 許可權,然後鍵入以下命令。

$ su
password:
# mv jdk1.7.0_71 /usr/local/
# exit

要設定 **PATH** 和 **JAVA_HOME** 變數,請將以下命令新增到 **~/.bashrc** 檔案中。

export JAVA_HOME=/usr/local/jdk1.7.0_71
export PATH=PATH:$JAVA_HOME/bin

現在,使用終端中如上所述的命令 **java -version** 驗證安裝。

步驟 3:驗證 Hadoop 安裝

安裝 Hive 之前,必須在系統上安裝 Hadoop。讓我們使用以下命令驗證 Hadoop 安裝:

$ hadoop version

如果系統上已安裝 Hadoop,則會看到以下響應:

Hadoop 2.4.1
Subversion https://svn.apache.org/repos/asf/hadoop/common -r 1529768
Compiled by hortonmu on 2013-10-07T06:28Z
Compiled with protoc 2.5.0
From source with checksum 79e53ce7994d1628b240f09af91e1af4

如果系統上未安裝 Hadoop,則繼續執行以下步驟:

步驟 4:下載 Hadoop

使用以下命令從 Apache 軟體基金會下載並解壓縮 Hadoop 2.4.1。

$ su
password:
# cd /usr/local
# wget http://apache.claz.org/hadoop/common/hadoop-2.4.1/
hadoop-2.4.1.tar.gz
# tar xzf hadoop-2.4.1.tar.gz
# mv hadoop-2.4.1/* to hadoop/
# exit

步驟 5:以偽分散式模式安裝 Hadoop

以下步驟用於以偽分散式模式安裝 **Hadoop 2.4.1**。

設定 Hadoop

您可以透過將以下命令新增到 **~/.bashrc** 檔案中來設定 Hadoop 環境變數。

export HADOOP_HOME=/usr/local/hadoop
export HADOOP_MAPRED_HOME=$HADOOP_HOME
export HADOOP_COMMON_HOME=$HADOOP_HOME
export HADOOP_HDFS_HOME=$HADOOP_HOME
export YARN_HOME=$HADOOP_HOME 
export HADOOP_COMMON_LIB_NATIVE_DIR=$HADOOP_HOME/lib/native
export PATH=$PATH:$HADOOP_HOME/sbin:$HADOOP_HOME/bin

現在將所有更改應用到當前正在執行的系統。

$ source ~/.bashrc

Hadoop 配置

您可以在“$HADOOP_HOME/etc/hadoop”位置找到所有 Hadoop 配置檔案。您需要根據您的 Hadoop 基礎架構對這些配置檔案進行適當的更改。

$ cd $HADOOP_HOME/etc/hadoop

為了使用 Java 開發 Hadoop 程式,您必須透過將 **JAVA_HOME** 值替換為系統中 Java 的位置來重置 **hadoop-env.sh** 檔案中的 Java 環境變數。

export JAVA_HOME=/usr/local/jdk1.7.0_71

以下是您必須編輯以配置 Hadoop 的檔案列表。

core-site.xml

**core-site.xml** 檔案包含諸如 Hadoop 例項使用的埠號、為檔案系統分配的記憶體、儲存資料的記憶體限制以及讀/寫緩衝區的大小等資訊。

開啟 core-site.xml 並將以下屬性新增到 <configuration> 和 </configuration> 標記之間。

<configuration>
   <property>
      <name>fs.default.name</name>
      <value>hdfs://:9000</value>
   </property>
</configuration>

hdfs-site.xml

**hdfs-site.xml** 檔案包含諸如複製資料的值、namenode 路徑和本地檔案系統的 datanode 路徑等資訊。這意味著您要儲存 Hadoop 基礎架構的位置。

讓我們假設以下資料。

dfs.replication (data replication value) = 1

(In the following path /hadoop/ is the user name.
hadoopinfra/hdfs/namenode is the directory created by hdfs file system.)

namenode path = //home/hadoop/hadoopinfra/hdfs/namenode

(hadoopinfra/hdfs/datanode is the directory created by hdfs file system.)
datanode path = //home/hadoop/hadoopinfra/hdfs/datanode

開啟此檔案,並將以下屬性新增到此檔案中的 <configuration>、</configuration> 標記之間。

<configuration>
   <property>
      <name>dfs.replication</name>
      <value>1</value>
   </property> 
   
   <property>
      <name>dfs.name.dir</name>
      <value>file:///home/hadoop/hadoopinfra/hdfs/namenode</value> 
   </property> 

   <property>
      <name>dfs.data.dir</name>
      <value>file:///home/hadoop/hadoopinfra/hdfs/datanode</value> 
   </property>
</configuration>

**注意** - 在上面的檔案中,所有屬性值都是使用者定義的,您可以根據您的 Hadoop 基礎架構進行更改。

yarn-site.xml

此檔案用於將 yarn 配置到 Hadoop 中。開啟 yarn-site.xml 檔案,並將以下屬性新增到此檔案中的 <configuration>、</configuration> 標記之間。

<configuration>
   <property>
      <name>yarn.nodemanager.aux-services</name>
      <value>mapreduce_shuffle</value>
   </property>
</configuration>

mapred-site.xml

此檔案用於指定我們正在使用哪個 MapReduce 框架。預設情況下,Hadoop 包含 yarn-site.xml 的模板。首先,您需要使用以下命令將檔案從 **mapred-site,xml.template** 複製到 **mapred-site.xml** 檔案:

$ cp mapred-site.xml.template mapred-site.xml

開啟 mapred-site.xml 檔案,並將以下屬性新增到此檔案中的 <configuration>、</configuration> 標記之間。

<configuration>
   <property>
      <name>mapreduce.framework.name</name>
      <value>yarn</value>
   </property>
</configuration>

步驟 6:驗證 Hadoop 安裝

以下步驟用於驗證 Hadoop 安裝。

Namenode 設定

使用命令“hdfs namenode -format”設定 namenode,如下所示:

$ cd ~
$ hdfs namenode -format

預期結果如下:

10/24/14 21:30:55 INFO namenode.NameNode: STARTUP_MSG:
/************************************************************
STARTUP_MSG: Starting NameNode
STARTUP_MSG: host = localhost/192.168.1.11
STARTUP_MSG: args = [-format]
STARTUP_MSG: version = 2.4.1
...
...
10/24/14 21:30:56 INFO common.Storage: Storage directory
/home/hadoop/hadoopinfra/hdfs/namenode has been successfully formatted.
10/24/14 21:30:56 INFO namenode.NNStorageRetentionManager: Going to retain 1
images with txid >= 0 10/24/14 21:30:56 INFO util.ExitUtil: Exiting with status 0
10/24/14 21:30:56 INFO namenode.NameNode: SHUTDOWN_MSG:
/************************************************************
SHUTDOWN_MSG: Shutting down NameNode at localhost/192.168.1.11
************************************************************/

驗證 Hadoop DFS

以下命令用於啟動 DFS。執行此命令將啟動您的 Hadoop 檔案系統。

$ start-dfs.sh

預期輸出如下:

10/24/14 21:37:56 Starting namenodes on [localhost]
localhost: starting namenode, logging to
/home/hadoop/hadoop-2.4.1/logs/hadoop-hadoop-namenode-localhost.out localhost:
starting datanode, logging to
   /home/hadoop/hadoop-2.4.1/logs/hadoop-hadoop-datanode-localhost.out
Starting secondary namenodes [0.0.0.0]

驗證 Yarn 指令碼

以下命令用於啟動 Yarn 指令碼。執行此命令將啟動您的 Yarn 守護程式。

$ start-yarn.sh

預期輸出如下:

starting yarn daemons
starting resourcemanager, logging to /home/hadoop/hadoop-2.4.1/logs/
yarn-hadoop-resourcemanager-localhost.out
localhost: starting nodemanager, logging to
   /home/hadoop/hadoop-2.4.1/logs/yarn-hadoop-nodemanager-localhost.out

在瀏覽器上訪問 Hadoop

訪問 Hadoop 的預設埠號為 50070。使用以下 URL 在瀏覽器上獲取 Hadoop 服務。

https://:50070/

Accessing HADOOP

驗證叢集的所有應用程式

訪問叢集所有應用程式的預設埠號為 8088。使用以下網址訪問此服務。

https://:8088/

Cluster

完成 Hadoop 的安裝後,繼續執行下一步,在您的系統上安裝 Hive。

步驟 7:下載 Hive

本教程中我們使用 hive-0.14.0。您可以訪問以下連結下載它:**http://apache.petsads.us/hive/hive-0.14.0/**。讓我們假設它下載到 **/Downloads** 目錄。在這裡,我們為本教程下載名為“**apache-hive-0.14.0-bin.tar.gz**”的 Hive 存檔。以下命令用於驗證下載:

$ cd Downloads
$ ls

下載成功後,您將看到以下響應:

apache-hive-0.14.0-bin.tar.gz

步驟 8:安裝 Hive

以下步驟是安裝 Hive 到您的系統中所需的步驟。讓我們假設 Hive 存檔下載到 **/Downloads** 目錄。

解壓縮和驗證 Hive 存檔

以下命令用於驗證下載並解壓縮 Hive 存檔:

$ tar zxvf apache-hive-0.14.0-bin.tar.gz
$ ls

下載成功後,您將看到以下響應:

apache-hive-0.14.0-bin apache-hive-0.14.0-bin.tar.gz

將檔案複製到 /usr/local/hive 目錄

我們需要從超級使用者“su -”複製檔案。以下命令用於將檔案從解壓縮的目錄複製到“/usr/local/hive”目錄。

$ su -
passwd:
# cd /home/user/Download
# mv apache-hive-0.14.0-bin /usr/local/hive
# exit

設定 Hive 的環境

您可以透過將以下幾行新增到 **~/.bashrc** 檔案中來設定 Hive 環境:

export HIVE_HOME=/usr/local/hive
export PATH=$PATH:$HIVE_HOME/bin
export CLASSPATH=$CLASSPATH:/usr/local/Hadoop/lib/*:.
export CLASSPATH=$CLASSPATH:/usr/local/hive/lib/*:.

以下命令用於執行 ~/.bashrc 檔案。

$ source ~/.bashrc

步驟 9:配置 Hive

要將 Hive 與 Hadoop 配置,您需要編輯 **hive-env.sh** 檔案,該檔案位於 **$HIVE_HOME/conf** 目錄中。以下命令重定向到 Hive **config** 資料夾並複製模板檔案:

$ cd $HIVE_HOME/conf
$ cp hive-env.sh.template hive-env.sh

透過新增以下行來編輯 **hive-env.sh** 檔案:

export HADOOP_HOME=/usr/local/hadoop

至此,Hive 安裝完成。現在您需要一個外部資料庫伺服器來配置元儲存。我們使用 Apache Derby 資料庫。

步驟 10:下載和安裝 Apache Derby

按照以下步驟下載並安裝 Apache Derby:

下載 Apache Derby

使用以下命令下載 Apache Derby。下載需要一些時間。

$ cd ~
$ wget http://archive.apache.org/dist/db/derby/db-derby-10.4.2.0/db-derby-10.4.2.0-bin.tar.gz

使用以下命令驗證下載:

$ ls

下載成功後,您將看到以下響應:

db-derby-10.4.2.0-bin.tar.gz

解壓並驗證 Derby 歸檔檔案

使用以下命令解壓並驗證 Derby 歸檔檔案:

$ tar zxvf db-derby-10.4.2.0-bin.tar.gz
$ ls

下載成功後,您將看到以下響應:

db-derby-10.4.2.0-bin db-derby-10.4.2.0-bin.tar.gz

將檔案複製到 /usr/local/derby 目錄

我們需要以超級使用者身份“su -”進行復制。使用以下命令將檔案從解壓後的目錄複製到 **`/usr/local/derby`** 目錄:

$ su -
passwd:
# cd /home/user
# mv db-derby-10.4.2.0-bin /usr/local/derby
# exit

設定 Derby 環境

您可以透過將以下幾行新增到 **`~/.bashrc`** 檔案中來設定 Derby 環境:

export DERBY_HOME=/usr/local/derby
export PATH=$PATH:$DERBY_HOME/bin
export CLASSPATH=$CLASSPATH:$DERBY_HOME/lib/derby.jar:$DERBY_HOME/lib/derbytools.jar

使用以下命令執行 **`~/.bashrc`** 檔案:

$ source ~/.bashrc

建立元資料儲存目錄

在 $DERBY_HOME 目錄中建立一個名為 **`data`** 的目錄來儲存元資料儲存資料。

$ mkdir $DERBY_HOME/data

Derby 安裝和環境設定已完成。

步驟 11:配置 Hive 元資料儲存

配置元資料儲存意味著指定 Hive 儲存資料庫的位置。您可以透過編輯 **`hive-site.xml`** 檔案來實現,該檔案位於 **`$HIVE_HOME/conf`** 目錄中。首先,使用以下命令複製模板檔案:

$ cd $HIVE_HOME/conf
$ cp hive-default.xml.template hive-site.xml

編輯 **`hive-site.xml`** 檔案,並在 `` 和 `` 標籤之間新增以下幾行:

<property>
   <name>javax.jdo.option.ConnectionURL</name>
   <value>jdbc:derby://:1527/metastore_db;create = true</value>
   <description>JDBC connect string for a JDBC metastore</description>
</property>

建立一個名為 **`jpox.properties`** 的檔案,並在其中新增以下幾行:

javax.jdo.PersistenceManagerFactoryClass = org.jpox.PersistenceManagerFactoryImpl

org.jpox.autoCreateSchema = false
org.jpox.validateTables = false
org.jpox.validateColumns = false
org.jpox.validateConstraints = false

org.jpox.storeManagerType = rdbms
org.jpox.autoCreateSchema = true
org.jpox.autoStartMechanismMode = checked
org.jpox.transactionIsolation = read_committed

javax.jdo.option.DetachAllOnCommit = true
javax.jdo.option.NontransactionalRead = true
javax.jdo.option.ConnectionDriverName = org.apache.derby.jdbc.ClientDriver
javax.jdo.option.ConnectionURL = jdbc:derby://hadoop1:1527/metastore_db;create = true
javax.jdo.option.ConnectionUserName = APP
javax.jdo.option.ConnectionPassword = mine

步驟 12:驗證 Hive 安裝

在執行 Hive 之前,您需要在 HDFS 中建立 **`/tmp`** 資料夾和一個單獨的 Hive 資料夾。這裡我們使用 **`/user/hive/warehouse`** 資料夾。您需要為這些新建立的資料夾設定寫入許可權,如下所示:

chmod g+w

現在在驗證 Hive 之前在 HDFS 中設定它們。使用以下命令:

$ $HADOOP_HOME/bin/hadoop fs -mkdir /tmp
$ $HADOOP_HOME/bin/hadoop fs -mkdir /user/hive/warehouse
$ $HADOOP_HOME/bin/hadoop fs -chmod g+w /tmp
$ $HADOOP_HOME/bin/hadoop fs -chmod g+w /user/hive/warehouse

使用以下命令驗證 Hive 安裝:

$ cd $HIVE_HOME
$ bin/hive

如果 Hive 安裝成功,您將看到以下響應:

Logging initialized using configuration in 
   jar:file:/home/hadoop/hive-0.9.0/lib/hive-common-0.9.0.jar!/
hive-log4j.properties Hive history
   =/tmp/hadoop/hive_job_log_hadoop_201312121621_1494929084.txt
………………….
hive>

您可以執行以下示例命令來顯示所有表:

hive> show tables;
OK Time taken: 2.798 seconds
hive>

步驟 13:驗證 HCatalog 安裝

使用以下命令設定 HCatalog 主目錄的系統變數 **`HCAT_HOME`**:

export HCAT_HOME = $HiVE_HOME/HCatalog

使用以下命令驗證 HCatalog 安裝。

cd $HCAT_HOME/bin
./hcat

如果安裝成功,您將看到以下輸出:

SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
usage: hcat { -e "<query>" | -f "<filepath>" } 
   [ -g "<group>" ] [ -p "<perms>" ] 
   [ -D"<name> = <value>" ]
	
-D <property = value>    use hadoop value for given property
-e <exec>                hcat command given from command line
-f <file>                hcat commands in file
-g <group>               group for the db/table specified in CREATE statement
-h,--help                Print help information
-p <perms>               permissions for the db/table specified in CREATE statement

HCatalog - 命令列介面 (CLI)

HCatalog 命令列介面 (CLI) 可以從命令 **`$HIVE_HOME/HCatalog/bin/hcat`** 呼叫,其中 $HIVE_HOME 是 Hive 的主目錄。**`hcat`** 是用於初始化 HCatalog 伺服器的命令。

使用以下命令初始化 HCatalog 命令列。

cd $HCAT_HOME/bin
./hcat

如果安裝正確,您將獲得以下輸出:

SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
usage: hcat { -e "<query>" | -f "<filepath>" } 
   [ -g "<group>" ] [ -p "<perms>" ] 
   [ -D"<name> = <value>" ]
	
-D <property = value>    use hadoop value for given property
-e <exec>                hcat command given from command line
-f <file>                hcat commands in file
-g <group>               group for the db/table specified in CREATE statement
-h,--help                Print help information
-p <perms>               permissions for the db/table specified in CREATE statement

HCatalog CLI 支援以下命令列選項:

序號 選項 示例及說明
1 -g

hcat -g mygroup ...

要建立的表必須具有組“mygroup”。

2 -p

hcat -p rwxr-xr-x ...

要建立的表必須具有讀、寫和執行許可權。

3 -f

hcat -f myscript.HCatalog ...

myscript.HCatalog 是包含要執行的 DDL 命令的指令碼檔案。

4 -e

hcat -e 'create table mytable(a int);' ...

將以下字串視為 DDL 命令並執行。

5 -D

hcat -Dkey = value ...

將鍵值對作為 Java 系統屬性傳遞給 HCatalog。

6 -

hcat

列印用法資訊。

注意:

  • **`-g`** 和 **`-p`** 選項不是必需的。

  • 一次只能提供 **`-e`** 或 **`-f`** 選項,不能同時提供。

  • 選項的順序無關緊要;您可以按任何順序指定選項。

序號 DDL 命令及說明
1

CREATE TABLE

使用 HCatalog 建立表。如果使用 CLUSTERED BY 子句建立表,則無法使用 Pig 或 MapReduce 向其寫入資料。

2

ALTER TABLE

支援,但 REBUILD 和 CONCATENATE 選項除外。其行為與 Hive 中相同。

3

DROP TABLE

支援。行為與 Hive 相同(刪除完整的表和結構)。

4

CREATE/ALTER/DROP VIEW

支援。行為與 Hive 相同。

**注意**:Pig 和 MapReduce 無法讀取或寫入檢視。

5

SHOW TABLES

顯示錶列表。

6

SHOW PARTITIONS

顯示分割槽列表。

7

建立/刪除索引

支援 CREATE 和 DROP FUNCTION 操作,但建立的函式仍必須在 Pig 中註冊並放置在 MapReduce 的 CLASSPATH 中。

8

DESCRIBE

支援。行為與 Hive 相同。描述結構。

後續章節將解釋上表中的一些命令。

HCatalog - 建立表

本章解釋如何建立表以及如何向表中插入資料。在 HCatalog 中建立表的約定與使用 Hive 建立表非常相似。

建立表語句

Create Table 是一個用於使用 HCatalog 在 Hive 元資料儲存中建立表的語句。其語法和示例如下:

語法

CREATE [TEMPORARY] [EXTERNAL] TABLE [IF NOT EXISTS] [db_name.] table_name
[(col_name data_type [COMMENT col_comment], ...)]
[COMMENT table_comment]
[ROW FORMAT row_format]
[STORED AS file_format]

示例

假設您需要使用 **`CREATE TABLE`** 語句建立一個名為 **`employee`** 的表。下表列出了 **`employee`** 表中的欄位及其資料型別:

序號 欄位名稱 資料型別
1 Eid int
2 Name String
3 Salary Float
4 Designation string

以下資料定義了受支援的欄位,例如 **`Comment`**,行格式化欄位,例如 **`Field terminator`**、**`Lines terminator`** 和 **`Stored File type`**。

COMMENT ‘Employee details’
FIELDS TERMINATED BY ‘\t’
LINES TERMINATED BY ‘\n’
STORED IN TEXT FILE

以下查詢使用上述資料建立一個名為 **`employee`** 的表。

./hcat –e "CREATE TABLE IF NOT EXISTS employee ( eid int, name String, 
   salary String, destination String) \
COMMENT 'Employee details' \
ROW FORMAT DELIMITED \
FIELDS TERMINATED BY ‘\t’ \
LINES TERMINATED BY ‘\n’ \
STORED AS TEXTFILE;"

如果新增 **`IF NOT EXISTS`** 選項,則如果表已存在,HCatalog 將忽略該語句。

成功建立表後,您將看到以下響應:

OK
Time taken: 5.905 seconds

載入資料語句

通常,在 SQL 中建立表後,我們可以使用 Insert 語句插入資料。但在 HCatalog 中,我們使用 LOAD DATA 語句插入資料。

向 HCatalog 中插入資料時,最好使用 LOAD DATA 來儲存批次記錄。載入資料有兩種方式:一種是從 **本地檔案系統**,另一種是從 **Hadoop 檔案系統**。

語法

LOAD DATA 的語法如下:

LOAD DATA [LOCAL] INPATH 'filepath' [OVERWRITE] INTO TABLE tablename
[PARTITION (partcol1=val1, partcol2=val2 ...)]
  • LOCAL 是指定本地路徑的識別符號。它是可選的。
  • OVERWRITE 是可選的,用於覆蓋表中的資料。
  • PARTITION 是可選的。

示例

我們將以下資料插入表中。它是一個名為 **`sample.txt`** 的文字檔案,位於 **`/home/user`** 目錄中。

1201  Gopal        45000    Technical manager
1202  Manisha      45000    Proof reader
1203  Masthanvali  40000    Technical writer
1204  Kiran        40000    Hr Admin
1205  Kranthi      30000    Op Admin

以下查詢將給定的文字載入到表中。

./hcat –e "LOAD DATA LOCAL INPATH '/home/user/sample.txt'
OVERWRITE INTO TABLE employee;"

下載成功後,您將看到以下響應:

OK
Time taken: 15.905 seconds

HCatalog - 修改表

本章解釋如何更改表的屬性,例如更改表名、更改列名、新增列以及刪除或替換列。

更改表語句

您可以使用 ALTER TABLE 語句來更改 Hive 中的表。

語法

根據我們希望修改表中的哪些屬性,該語句採用以下任何語法。

ALTER TABLE name RENAME TO new_name
ALTER TABLE name ADD COLUMNS (col_spec[, col_spec ...])
ALTER TABLE name DROP [COLUMN] column_name
ALTER TABLE name CHANGE column_name new_name new_type
ALTER TABLE name REPLACE COLUMNS (col_spec[, col_spec ...])

下面解釋了一些場景。

重新命名為…語句

以下查詢將表名從 **`employee`** 重新命名為 **`emp`**。

./hcat –e "ALTER TABLE employee RENAME TO emp;"

更改語句

下表包含 **`employee`** 表的欄位,並顯示要更改的欄位(以粗體顯示)。

欄位名稱 從資料型別轉換 更改欄位名稱 轉換為資料型別
eid int eid int
name String ename String
salary Float salary Double
designation String designation String

以下查詢使用上述資料重新命名列名和列資料型別:

./hcat –e "ALTER TABLE employee CHANGE name ename String;"
./hcat –e "ALTER TABLE employee CHANGE salary salary Double;"

新增列語句

以下查詢將名為 **`dept`** 的列新增到 **`employee`** 表。

./hcat –e "ALTER TABLE employee ADD COLUMNS (dept STRING COMMENT 'Department name');"

替換語句

以下查詢刪除 **`employee`** 表中的所有列,並將其替換為 **`emp`** 和 **`name`** 列:

./hcat – e "ALTER TABLE employee REPLACE COLUMNS ( eid INT empid Int, ename STRING name String);"

刪除表語句

本章介紹如何在 HCatalog 中刪除表。當您從元資料儲存中刪除表時,它會刪除表/列資料及其元資料。它可以是普通表(儲存在元資料儲存中)或外部表(儲存在本地檔案系統中);HCatalog 對兩者都以相同的方式處理,無論其型別如何。

語法如下:

DROP TABLE [IF EXISTS] table_name;

以下查詢刪除名為 **`employee`** 的表:

./hcat –e "DROP TABLE IF EXISTS employee;"

成功執行查詢後,您將看到以下響應:

OK
Time taken: 5.3 seconds

HCatalog - 檢視

本章介紹如何在 HCatalog 中建立和管理 **檢視**。資料庫檢視使用 **`CREATE VIEW`** 語句建立。檢視可以從單個表、多個表或另一個檢視建立。

要建立檢視,使用者必須根據具體的實現擁有適當的系統許可權。

建立檢視語句

**`CREATE VIEW`** 建立具有給定名稱的檢視。如果已存在具有相同名稱的表或檢視,則會引發錯誤。您可以使用 **`IF NOT EXISTS`** 來跳過錯誤。

如果沒有提供列名,則檢視列的名稱將自動從 **定義的 SELECT 表示式** 中派生。

**注意**:如果 SELECT 包含未指定別名的標量表達式(例如 x+y),則生成的檢視列名將以 _C0、_C1 等形式生成。

重新命名列時,還可以提供列註釋。註釋不會自動從底層列繼承。

如果檢視的 **定義的 SELECT 表示式** 無效,則 CREATE VIEW 語句將失敗。

語法

CREATE VIEW [IF NOT EXISTS] [db_name.]view_name [(column_name [COMMENT column_comment], ...) ]
[COMMENT view_comment]
[TBLPROPERTIES (property_name = property_value, ...)]
AS SELECT ...;

示例

以下是 employee 表資料。現在讓我們看看如何建立一個名為 **`Emp_Deg_View`** 的檢視,其中包含薪水超過 35,000 的員工的 id、name、Designation 和 salary 欄位。

+------+-------------+--------+-------------------+-------+
|  ID  |    Name     | Salary |    Designation    | Dept  |
+------+-------------+--------+-------------------+-------+
| 1201 |    Gopal    | 45000  | Technical manager |  TP   |
| 1202 |   Manisha   | 45000  | Proofreader       |  PR   |
| 1203 | Masthanvali | 30000  | Technical writer  |  TP   |
| 1204 |    Kiran    | 40000  | Hr Admin          |  HR   |
| 1205 |   Kranthi   | 30000  | Op Admin          | Admin |
+------+-------------+--------+-------------------+-------+

以下是如何根據上述資料建立檢視的命令。

./hcat –e "CREATE VIEW Emp_Deg_View (salary COMMENT ' salary more than 35,000')
   AS SELECT id, name, salary, designation FROM employee WHERE salary ≥ 35000;"

輸出

OK
Time taken: 5.3 seconds

刪除檢視語句

DROP VIEW 刪除指定檢視的元資料。刪除其他檢視引用的檢視時,不會給出警告(依賴檢視將保留為無效,必須由使用者刪除或重新建立)。

語法

DROP VIEW [IF EXISTS] view_name;

示例

以下命令用於刪除名為 **`Emp_Deg_View`** 的檢視。

DROP VIEW Emp_Deg_View;

HCatalog - 顯示錶

您通常希望列出資料庫中的所有表或列出表中的所有列。顯然,每個資料庫都有自己的語法來列出表和列。

**`Show Tables`** 語句顯示所有表的名稱。預設情況下,它列出當前資料庫中的表,或者使用 **`IN`** 子句,在指定的資料庫中。

本章介紹如何在 HCatalog 中列出當前資料庫中的所有表。

顯示錶語句

SHOW TABLES 的語法如下:

SHOW TABLES [IN database_name] ['identifier_with_wildcards'];

以下查詢顯示錶列表:

./hcat –e "Show tables;"

成功執行查詢後,您將看到以下響應:

OK
emp
employee
Time taken: 5.3 seconds

HCatalog - 顯示分割槽

分割槽是表格資料的條件,用於建立單獨的表或檢視。SHOW PARTITIONS 列出給定基表的所有現有分割槽。分割槽按字母順序列出。在 Hive 0.6 之後,也可以指定分割槽規範的部分內容來過濾結果列表。

您可以使用 SHOW PARTITIONS 命令檢視特定表中存在的分割槽。本章介紹如何在 HCatalog 中列出特定表的分割槽。

顯示分割槽語句

語法如下:

SHOW PARTITIONS table_name;

以下查詢刪除名為 **`employee`** 的表:

./hcat –e "Show partitions employee;"

成功執行查詢後,您將看到以下響應:

OK
Designation = IT
Time taken: 5.3 seconds

動態分割槽

HCatalog 將表組織成分割槽。這是一種根據分割槽列的值(例如日期、城市和部門)將表劃分為相關部分的方法。使用分割槽,可以輕鬆查詢資料的一部分。

例如,名為 **`Tab1`** 的表包含員工資料,例如 id、name、dept 和 yoj(即加入年份)。假設您需要檢索 2012 年加入的所有員工的詳細資訊。查詢會搜尋整個表以查詢所需資訊。但是,如果按年份對員工資料進行分割槽並將其儲存在單獨的檔案中,則會縮短查詢處理時間。以下示例顯示如何分割槽檔案及其資料:

以下檔案包含 **`employeedata`** 表。

/tab1/employeedata/file1

id, name,   dept, yoj
1,  gopal,   TP, 2012
2,  kiran,   HR, 2012
3,  kaleel,  SC, 2013
4, Prasanth, SC, 2013

上述資料使用年份劃分為兩個檔案。

/tab1/employeedata/2012/file2

1, gopal, TP, 2012
2, kiran, HR, 2012

/tab1/employeedata/2013/file3

3, kaleel,   SC, 2013
4, Prasanth, SC, 2013

新增分割槽

我們可以透過修改表來向表中新增分割槽。假設我們有一個名為employee的表,其欄位包括Id、Name、Salary、Designation、Dept和yoj。

語法

ALTER TABLE table_name ADD [IF NOT EXISTS] PARTITION partition_spec
[LOCATION 'location1'] partition_spec [LOCATION 'location2'] ...;
partition_spec:
: (p_column = p_col_value, p_column = p_col_value, ...)

以下查詢用於向employee表新增分割槽。

./hcat –e "ALTER TABLE employee ADD PARTITION (year = '2013') location '/2012/part2012';"

重新命名分割槽

您可以使用RENAME-TO命令重新命名分割槽。其語法如下:

./hact –e "ALTER TABLE table_name PARTITION partition_spec RENAME TO PARTITION partition_spec;"

以下查詢用於重新命名分割槽:

./hcat –e "ALTER TABLE employee PARTITION (year=’1203’) RENAME TO PARTITION (Yoj='1203');"

刪除分割槽

用於刪除分割槽的命令語法如下:

./hcat –e "ALTER TABLE table_name DROP [IF EXISTS] PARTITION partition_spec,.
   PARTITION partition_spec,...;"

以下查詢用於刪除分割槽:

./hcat –e "ALTER TABLE employee DROP [IF EXISTS] PARTITION (year=’1203’);"

HCatalog - 索引

建立索引

索引只不過是指向表中特定列的指標。建立索引意味著在表的特定列上建立一個指標。其語法如下:

CREATE INDEX index_name
ON TABLE base_table_name (col_name, ...)
AS 'index.handler.class.name'
[WITH DEFERRED REBUILD]
[IDXPROPERTIES (property_name = property_value, ...)]
[IN TABLE index_table_name]
[PARTITIONED BY (col_name, ...)][
   [ ROW FORMAT ...] STORED AS ...
   | STORED BY ...
]
[LOCATION hdfs_path]
[TBLPROPERTIES (...)]

示例

讓我們來看一個例子來理解索引的概念。使用我們之前用過的同一個employee表,欄位包括Id、Name、Salary、Designation和Dept。在employee表的salary列上建立一個名為index_salary的索引。

以下查詢建立索引:

./hcat –e "CREATE INDEX inedx_salary ON TABLE employee(salary)
AS 'org.apache.hadoop.hive.ql.index.compact.CompactIndexHandler';"

它指向salary列。如果修改了該列,則使用索引值儲存更改。

刪除索引

以下語法用於刪除索引:

DROP INDEX <index_name> ON <table_name>

以下查詢刪除索引index_salary:

./hcat –e "DROP INDEX index_salary ON employee;"

HCatalog - 讀寫器

HCatalog包含一個數據傳輸API,用於在不使用MapReduce的情況下進行並行輸入和輸出。此API使用表和行的基本儲存抽象來讀取Hadoop叢集中的資料並向其中寫入資料。

資料傳輸API主要包含三個類,它們是:

  • HCatReader - 從Hadoop叢集讀取資料。

  • HCatWriter - 將資料寫入Hadoop叢集。

  • DataTransferFactory - 生成reader和writer例項。

此API適用於主從節點設定。讓我們更詳細地討論HCatReaderHCatWriter

HCatReader

HCatReader是HCatalog內部的一個抽象類,它將底層系統(從中檢索記錄)的複雜性抽象出來。

序號 方法名稱及描述
1

Public abstract ReaderContext prepareRead() throws HCatException

這應該在主節點上呼叫以獲取ReaderContext,然後將其序列化併發送到從節點。

2

Public abstract Iterator <HCatRecorder> read() throws HCaException

這應該在從節點上呼叫以讀取HCatRecords。

3

Public Configuration getConf()

它將返回配置類物件。

HCatReader類用於從HDFS讀取資料。讀取是一個兩步過程,第一步發生在外部系統的master節點上。第二步在多個slave節點上並行執行。

讀取在ReadEntity上進行。在開始讀取之前,需要定義一個要從中讀取的ReadEntity。這可以透過ReadEntity.Builder完成。您可以指定資料庫名稱、表名、分割槽和過濾器字串。例如:

ReadEntity.Builder builder = new ReadEntity.Builder();
ReadEntity entity = builder.withDatabase("mydb").withTable("mytbl").build(); 10. 

上面的程式碼片段定義了一個ReadEntity物件(“entity”),包含名為mydb的資料庫中名為mytbl的表,可用於讀取該表的所有行。請注意,此表必須在該操作開始之前存在於HCatalog中。

定義ReadEntity後,使用ReadEntity和叢集配置獲取HCatReader例項:

HCatReader reader = DataTransferFactory.getHCatReader(entity, config);

下一步是從reader獲取ReaderContext,如下所示:

ReaderContext cntxt = reader.prepareRead();

HCatWriter

此抽象類位於HCatalog內部。這是為了便於從外部系統寫入HCatalog。不要嘗試直接例項化它。而是使用DataTransferFactory。

序號 方法名稱及描述
1

Public abstract WriterContext prepareRead() throws HCatException

外部系統應該從主節點精確呼叫一次此方法。它返回一個WriterContext。這應該被序列化併發送到從節點以在那裡構造HCatWriter

2

Public abstract void write(Iterator<HCatRecord> recordItr) throws HCaException

此方法應在從節點上使用以執行寫入操作。recordItr是一個迭代器物件,其中包含要寫入HCatalog的記錄集合。

3

Public abstract void abort(WriterContext cntxt) throws HCatException

此方法應該在主節點上呼叫。此方法的主要目的是在發生故障時進行清理。

4

public abstract void commit(WriterContext cntxt) throws HCatException

此方法應該在主節點上呼叫。此方法的目的是執行元資料提交。

與讀取類似,寫入也是一個兩步過程,第一步發生在主節點上。隨後,第二步在從節點上並行發生。

寫入在WriteEntity上進行,其構造方式類似於讀取:

WriteEntity.Builder builder = new WriteEntity.Builder();
WriteEntity entity = builder.withDatabase("mydb").withTable("mytbl").build();

上面的程式碼建立一個WriteEntity物件entity,可用於寫入名為mydb的資料庫中名為mytbl的表。

建立WriteEntity後,下一步是獲取WriterContext:

HCatWriter writer = DataTransferFactory.getHCatWriter(entity, config);
WriterContext info = writer.prepareWrite();

所有上述步驟都在主節點上發生。主節點然後序列化WriterContext物件,並使其可用於所有從節點。

在從節點上,需要使用WriterContext獲取HCatWriter,如下所示:

HCatWriter writer = DataTransferFactory.getHCatWriter(context);

然後,writer將迭代器作為write方法的引數。

writer.write(hCatRecordItr);

然後,writer迴圈呼叫此迭代器的getNext()並寫出附加到迭代器的所有記錄。

TestReaderWriter.java檔案用於測試HCatreader和HCatWriter類。以下程式演示瞭如何使用HCatReader和HCatWriter API從原始檔讀取資料,然後將其寫入目標檔案。

import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.metastore.api.MetaException;
import org.apache.hadoop.hive.ql.CommandNeedRetryException;
import org.apache.hadoop.mapreduce.InputSplit;

import org.apache.hive.HCatalog.common.HCatException;
import org.apache.hive.HCatalog.data.transfer.DataTransferFactory;
import org.apache.hive.HCatalog.data.transfer.HCatReader;
import org.apache.hive.HCatalog.data.transfer.HCatWriter;
import org.apache.hive.HCatalog.data.transfer.ReadEntity;
import org.apache.hive.HCatalog.data.transfer.ReaderContext;
import org.apache.hive.HCatalog.data.transfer.WriteEntity;
import org.apache.hive.HCatalog.data.transfer.WriterContext;
import org.apache.hive.HCatalog.mapreduce.HCatBaseTest;

import org.junit.Assert;
import org.junit.Test;

public class TestReaderWriter extends HCatBaseTest {
   @Test
   public void test() throws MetaException, CommandNeedRetryException,
      IOException, ClassNotFoundException {
		
      driver.run("drop table mytbl");
      driver.run("create table mytbl (a string, b int)");
		
      Iterator<Entry<String, String>> itr = hiveConf.iterator();
      Map<String, String> map = new HashMap<String, String>();
		
      while (itr.hasNext()) {
         Entry<String, String> kv = itr.next();
         map.put(kv.getKey(), kv.getValue());
      }
		
      WriterContext cntxt = runsInMaster(map);
      File writeCntxtFile = File.createTempFile("hcat-write", "temp");
      writeCntxtFile.deleteOnExit();
		
      // Serialize context.
      ObjectOutputStream oos = new ObjectOutputStream(new FileOutputStream(writeCntxtFile));
      oos.writeObject(cntxt);
      oos.flush();
      oos.close();
		
      // Now, deserialize it.
      ObjectInputStream ois = new ObjectInputStream(new FileInputStream(writeCntxtFile));
      cntxt = (WriterContext) ois.readObject();
      ois.close();
      runsInSlave(cntxt);
      commit(map, true, cntxt);
		
      ReaderContext readCntxt = runsInMaster(map, false);
      File readCntxtFile = File.createTempFile("hcat-read", "temp");
      readCntxtFile.deleteOnExit();
      oos = new ObjectOutputStream(new FileOutputStream(readCntxtFile));
      oos.writeObject(readCntxt);
      oos.flush();
      oos.close();
		
      ois = new ObjectInputStream(new FileInputStream(readCntxtFile));
      readCntxt = (ReaderContext) ois.readObject();
      ois.close();
		
      for (int i = 0; i < readCntxt.numSplits(); i++) {
         runsInSlave(readCntxt, i);
      }
   }
	
   private WriterContext runsInMaster(Map<String, String> config) throws HCatException {
      WriteEntity.Builder builder = new WriteEntity.Builder();
      WriteEntity entity = builder.withTable("mytbl").build();
		
      HCatWriter writer = DataTransferFactory.getHCatWriter(entity, config);
      WriterContext info = writer.prepareWrite();
      return info;
   }
	
   private ReaderContext runsInMaster(Map<String, String> config, 
      boolean bogus) throws HCatException {
      ReadEntity entity = new ReadEntity.Builder().withTable("mytbl").build();
      HCatReader reader = DataTransferFactory.getHCatReader(entity, config);
      ReaderContext cntxt = reader.prepareRead();
      return cntxt;
   }
	
   private void runsInSlave(ReaderContext cntxt, int slaveNum) throws HCatException {
      HCatReader reader = DataTransferFactory.getHCatReader(cntxt, slaveNum);
      Iterator<HCatRecord> itr = reader.read();
      int i = 1;
		
      while (itr.hasNext()) {
         HCatRecord read = itr.next();
         HCatRecord written = getRecord(i++);
			
         // Argh, HCatRecord doesnt implement equals()
         Assert.assertTrue("Read: " + read.get(0) + "Written: " + written.get(0),
         written.get(0).equals(read.get(0)));
			
         Assert.assertTrue("Read: " + read.get(1) + "Written: " + written.get(1),
         written.get(1).equals(read.get(1)));
			
         Assert.assertEquals(2, read.size());
      }
		
      //Assert.assertFalse(itr.hasNext());
   }
	
   private void runsInSlave(WriterContext context) throws HCatException {
      HCatWriter writer = DataTransferFactory.getHCatWriter(context);
      writer.write(new HCatRecordItr());
   }
	
   private void commit(Map<String, String> config, boolean status,
      WriterContext context) throws IOException {
      WriteEntity.Builder builder = new WriteEntity.Builder();
      WriteEntity entity = builder.withTable("mytbl").build();
      HCatWriter writer = DataTransferFactory.getHCatWriter(entity, config);
		
      if (status) {
         writer.commit(context);
      } else {
         writer.abort(context);
      }
   }
	
   private static HCatRecord getRecord(int i) {
      List<Object> list = new ArrayList<Object>(2);
      list.add("Row #: " + i);
      list.add(i);
      return new DefaultHCatRecord(list);
   }
	
   private static class HCatRecordItr implements Iterator<HCatRecord> {
      int i = 0;
		
      @Override
      public boolean hasNext() {
         return i++ < 100 ? true : false;
      }
		
      @Override
      public HCatRecord next() {
         return getRecord(i);
      }
		
      @Override
      public void remove() {
         throw new RuntimeException();
      }
   }
}

上述程式以記錄的形式從HDFS讀取資料,並將記錄資料寫入mytable

HCatalog - 輸入輸出格式

HCatInputFormatHCatOutputFormat介面用於從HDFS讀取資料,並在處理後使用MapReduce作業將結果資料寫入HDFS。讓我們詳細說明輸入和輸出格式介面。

HCatInputFormat

HCatInputFormat用於MapReduce作業從HCatalog管理的表讀取資料。HCatInputFormat公開了一個Hadoop 0.20 MapReduce API,用於讀取資料,就好像它已釋出到表中一樣。

序號 方法名稱及描述
1

public static HCatInputFormat setInput(Job job, String dbName, String tableName)throws IOException

設定作業要使用的輸入。它使用給定的輸入規範查詢元儲存,並將匹配的分割槽序列化到MapReduce任務的作業配置中。

2

public static HCatInputFormat setInput(Configuration conf, String dbName, String tableName) throws IOException

設定作業要使用的輸入。它使用給定的輸入規範查詢元儲存,並將匹配的分割槽序列化到MapReduce任務的作業配置中。

3

public HCatInputFormat setFilter(String filter)throws IOException

在輸入表上設定過濾器。

4

public HCatInputFormat setProperties(Properties properties) throws IOException

設定輸入格式的屬性。

HCatInputFormat API包含以下方法:

  • setInput
  • setOutputSchema
  • getTableSchema

要使用HCatInputFormat讀取資料,首先使用要讀取的表中的必要資訊例項化一個InputJobInfo,然後使用InputJobInfo呼叫setInput

您可以使用setOutputSchema方法包含一個投影模式,以指定輸出欄位。如果沒有指定模式,則將返回表中的所有列。您可以使用getTableSchema方法確定指定輸入表的表模式。

HCatOutputFormat

HCatOutputFormat用於MapReduce作業將資料寫入HCatalog管理的表。HCatOutputFormat公開了一個Hadoop 0.20 MapReduce API,用於將資料寫入表。當MapReduce作業使用HCatOutputFormat寫入輸出時,將使用為表配置的預設OutputFormat,並且作業完成後將新的分割槽釋出到表中。

序號 方法名稱及描述
1

public static void setOutput (Configuration conf, Credentials credentials, OutputJobInfo outputJobInfo) throws IOException

設定有關作業要寫入的輸出的資訊。它查詢元資料伺服器以查詢要用於表的StorageHandler。如果分割槽已釋出,則會引發錯誤。

2

public static void setSchema (Configuration conf, HCatSchema schema) throws IOException

設定寫入分割槽的資料模式。如果未呼叫此方法,則預設使用表模式。

3

public RecordWriter <WritableComparable<?>, HCatRecord > getRecordWriter (TaskAttemptContext context)throws IOException, InterruptedException

獲取作業的記錄寫入器。它使用StorageHandler的預設OutputFormat來獲取記錄寫入器。

4

public OutputCommitter getOutputCommitter (TaskAttemptContext context) throws IOException, InterruptedException

獲取此輸出格式的輸出提交器。它確保正確提交輸出。

HCatOutputFormat API包含以下方法:

  • setOutput
  • setSchema
  • getTableSchema

對HCatOutputFormat的第一次呼叫必須是setOutput;任何其他呼叫都會引發異常,提示輸出格式未初始化。

要寫入的資料模式由setSchema方法指定。您必須呼叫此方法,提供您正在寫入的資料模式。如果您的資料與表模式具有相同的模式,則可以使用HCatOutputFormat.getTableSchema()獲取表模式,然後將其傳遞給setSchema()

示例

以下MapReduce程式從一個表讀取資料(假設該表在第二列(“column 1”)中有一個整數),並計算它找到的每個不同值的例項數。也就是說,它執行等效於“select col1, count(*) from $table group by col1;”的操作。

例如,如果第二列中的值為{1, 1, 1, 3, 3, 5},則程式將生成以下值和計數輸出:

1, 3
3, 2
5, 1

現在讓我們看一下程式程式碼:

import java.io.IOException;
import java.util.Iterator;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;

import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;

import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import org.apache.HCatalog.common.HCatConstants;
import org.apache.HCatalog.data.DefaultHCatRecord;
import org.apache.HCatalog.data.HCatRecord;
import org.apache.HCatalog.data.schema.HCatSchema;

import org.apache.HCatalog.mapreduce.HCatInputFormat;
import org.apache.HCatalog.mapreduce.HCatOutputFormat;
import org.apache.HCatalog.mapreduce.InputJobInfo;
import org.apache.HCatalog.mapreduce.OutputJobInfo;

public class GroupByAge extends Configured implements Tool {

   public static class Map extends Mapper<WritableComparable, 
      HCatRecord, IntWritable, IntWritable> {
      int age;
		
      @Override
      protected void map(
         WritableComparable key, HCatRecord value,
         org.apache.hadoop.mapreduce.Mapper<WritableComparable,
         HCatRecord, IntWritable, IntWritable>.Context context
      )throws IOException, InterruptedException {
         age = (Integer) value.get(1);
         context.write(new IntWritable(age), new IntWritable(1));
      }
   }
	
   public static class Reduce extends Reducer<IntWritable, IntWritable,
      WritableComparable, HCatRecord> {
      @Override
      protected void reduce(
         IntWritable key, java.lang.Iterable<IntWritable> values,
         org.apache.hadoop.mapreduce.Reducer<IntWritable, IntWritable,
         WritableComparable, HCatRecord>.Context context
      )throws IOException ,InterruptedException {
         int sum = 0;
         Iterator<IntWritable> iter = values.iterator();
			
         while (iter.hasNext()) {
            sum++;
            iter.next();
         }
			
         HCatRecord record = new DefaultHCatRecord(2);
         record.set(0, key.get());
         record.set(1, sum);
         context.write(null, record);
      }
   }
	
   public int run(String[] args) throws Exception {
      Configuration conf = getConf();
      args = new GenericOptionsParser(conf, args).getRemainingArgs();
		
      String serverUri = args[0];
      String inputTableName = args[1];
      String outputTableName = args[2];
      String dbName = null;
      String principalID = System
		
      .getProperty(HCatConstants.HCAT_METASTORE_PRINCIPAL);
      if (principalID != null)
      conf.set(HCatConstants.HCAT_METASTORE_PRINCIPAL, principalID);
      Job job = new Job(conf, "GroupByAge");
      HCatInputFormat.setInput(job, InputJobInfo.create(dbName, inputTableName, null));

      // initialize HCatOutputFormat
      job.setInputFormatClass(HCatInputFormat.class);
      job.setJarByClass(GroupByAge.class);
      job.setMapperClass(Map.class);
      job.setReducerClass(Reduce.class);
		
      job.setMapOutputKeyClass(IntWritable.class);
      job.setMapOutputValueClass(IntWritable.class);
      job.setOutputKeyClass(WritableComparable.class);
      job.setOutputValueClass(DefaultHCatRecord.class);
		
      HCatOutputFormat.setOutput(job, OutputJobInfo.create(dbName, outputTableName, null));
      HCatSchema s = HCatOutputFormat.getTableSchema(job);
      System.err.println("INFO: output schema explicitly set for writing:" + s);
      HCatOutputFormat.setSchema(job, s);
      job.setOutputFormatClass(HCatOutputFormat.class);
      return (job.waitForCompletion(true) ? 0 : 1);
   }
	
   public static void main(String[] args) throws Exception {
      int exitCode = ToolRunner.run(new GroupByAge(), args);
      System.exit(exitCode);
   }
}

在編譯上述程式之前,您必須下載一些jar檔案並將其新增到此應用程式的classpath中。您需要下載所有Hive jar檔案和HCatalog jar檔案(HCatalog-core-0.5.0.jar、hive-metastore-0.10.0.jar、libthrift-0.7.0.jar、hive-exec-0.10.0.jar、libfb303-0.7.0.jar、jdo2-api-2.3-ec.jar、slf4j-api-1.6.1.jar)。

使用以下命令將這些jar檔案從本地複製到HDFS,並將其新增到classpath中。

bin/hadoop fs -copyFromLocal $HCAT_HOME/share/HCatalog/HCatalog-core-0.5.0.jar /tmp
bin/hadoop fs -copyFromLocal $HIVE_HOME/lib/hive-metastore-0.10.0.jar /tmp
bin/hadoop fs -copyFromLocal $HIVE_HOME/lib/libthrift-0.7.0.jar /tmp
bin/hadoop fs -copyFromLocal $HIVE_HOME/lib/hive-exec-0.10.0.jar /tmp
bin/hadoop fs -copyFromLocal $HIVE_HOME/lib/libfb303-0.7.0.jar /tmp
bin/hadoop fs -copyFromLocal $HIVE_HOME/lib/jdo2-api-2.3-ec.jar /tmp
bin/hadoop fs -copyFromLocal $HIVE_HOME/lib/slf4j-api-1.6.1.jar /tmp

export LIB_JARS=hdfs:///tmp/HCatalog-core-0.5.0.jar,
hdfs:///tmp/hive-metastore-0.10.0.jar,
hdfs:///tmp/libthrift-0.7.0.jar,
hdfs:///tmp/hive-exec-0.10.0.jar,
hdfs:///tmp/libfb303-0.7.0.jar,
hdfs:///tmp/jdo2-api-2.3-ec.jar,
hdfs:///tmp/slf4j-api-1.6.1.jar

使用以下命令編譯並執行給定程式。

$HADOOP_HOME/bin/hadoop jar GroupByAge tmp/hive

現在,檢查您的輸出目錄(hdfs: user/tmp/hive)以檢視輸出(part_0000、part_0001)。

HCatalog - Loader & Storer

HCatLoaderHCatStorer API與Pig指令碼一起使用,用於讀取和寫入HCatalog管理的表中的資料。這些介面不需要任何HCatalog特定的設定。

最好了解一些Apache Pig指令碼才能更好地理解本章內容。有關更多參考,請閱讀我們的Apache Pig教程。

HCatloader

HCatLoader用於Pig指令碼從HCatalog管理的表讀取資料。使用以下語法使用HCatloader將資料載入到HDFS中。

A = LOAD 'tablename' USING org.apache.HCatalog.pig.HCatLoader();

您必須用單引號指定表名:LOAD 'tablename'。如果您使用的是非預設資料庫,則必須將輸入指定為'dbname.tablename'

Hive元儲存允許您在不指定資料庫的情況下建立表。如果您以這種方式建立表,則資料庫名稱為'default',在為HCatLoader指定表時不需要。

下表包含HCatloader類的重要方法和描述。

序號 方法名稱及描述
1

public InputFormat<?,?> getInputFormat()throws IOException

使用HCatloader類讀取載入資料的輸入格式。

2

public String relativeToAbsolutePath(String location, Path curDir) throws IOException

它返回絕對路徑的字串格式。

3

public void setLocation(String location, Job job) throws IOException

它設定作業可以執行的位置。

4

public Tuple getNext() throws IOException

返回迴圈中的當前元組()。

HCatStorer

HCatStorer用於Pig指令碼將資料寫入HCatalog管理的表。使用以下語法進行儲存操作。

A = LOAD ...
B = FOREACH A ...
...
...
my_processed_data = ...

STORE my_processed_data INTO 'tablename' USING org.apache.HCatalog.pig.HCatStorer();

必須使用單引號指定表名:LOAD 'tablename'。資料庫和表都必須在執行 Pig 指令碼之前建立。如果使用非預設資料庫,則必須將輸入指定為'dbname.tablename'

Hive 元儲存允許您在不指定資料庫的情況下建立表。如果以這種方式建立表,則資料庫名為'default',您不需要在store語句中指定資料庫名。

對於USING子句,您可以使用一個字串引數來表示鍵值對分割槽。當寫入分割槽表且分割槽列不在輸出列中時,這是一個必需的引數。分割槽鍵的值**不應**加引號。

下表包含 HCatStorer 類的重要方法和描述。

序號 方法名稱及描述
1

public OutputFormat getOutputFormat() throws IOException

使用 HCatStorer 類讀取儲存資料的輸出格式。

2

public void setStoreLocation (String location, Job job) throws IOException

設定執行此store應用程式的位置。

3

public void storeSchema (ResourceSchema schema, String arg1, Job job) throws IOException

儲存模式。

4

public void prepareToWrite (RecordWriter writer) throws IOException

它有助於使用 RecordWriter 將資料寫入特定檔案。

5

public void putNext (Tuple tuple) throws IOException

將元組資料寫入檔案。

使用 HCatalog 執行 Pig

Pig 不會自動載入 HCatalog jar 包。要引入必要的 jar 包,您可以使用 Pig 命令中的標誌,或者設定環境變數PIG_CLASSPATHPIG_OPTS,如下所述。

要引入用於與 HCatalog 一起工作的適當 jar 包,只需包含以下標誌:

pig –useHCatalog <Sample pig scripts file>

設定執行的 CLASSPATH

使用以下 CLASSPATH 設定將 HCatalog 與 Apache Pig 同步。

export HADOOP_HOME = <path_to_hadoop_install>
export HIVE_HOME = <path_to_hive_install>
export HCAT_HOME = <path_to_hcat_install>

export PIG_CLASSPATH = $HCAT_HOME/share/HCatalog/HCatalog-core*.jar:\
$HCAT_HOME/share/HCatalog/HCatalog-pig-adapter*.jar:\
$HIVE_HOME/lib/hive-metastore-*.jar:$HIVE_HOME/lib/libthrift-*.jar:\
$HIVE_HOME/lib/hive-exec-*.jar:$HIVE_HOME/lib/libfb303-*.jar:\
$HIVE_HOME/lib/jdo2-api-*-ec.jar:$HIVE_HOME/conf:$HADOOP_HOME/conf:\
$HIVE_HOME/lib/slf4j-api-*.jar

示例

假設我們在 HDFS 中有一個名為student_details.txt的檔案,內容如下所示。

student_details.txt

001, Rajiv,    Reddy,       21, 9848022337, Hyderabad
002, siddarth, Battacharya, 22, 9848022338, Kolkata
003, Rajesh,   Khanna,      22, 9848022339, Delhi
004, Preethi,  Agarwal,     21, 9848022330, Pune
005, Trupthi,  Mohanthy,    23, 9848022336, Bhuwaneshwar
006, Archana,  Mishra,      23, 9848022335, Chennai
007, Komal,    Nayak,       24, 9848022334, trivendram
008, Bharathi, Nambiayar,   24, 9848022333, Chennai

我們還在同一個 HDFS 目錄中有一個名為sample_script.pig的示例指令碼。此檔案包含對student關係執行操作和轉換的語句,如下所示。

student = LOAD 'hdfs://:9000/pig_data/student_details.txt' USING 
   PigStorage(',') as (id:int, firstname:chararray, lastname:chararray,
   phone:chararray, city:chararray);
	
student_order = ORDER student BY age DESC;
STORE student_order INTO 'student_order_table' USING org.apache.HCatalog.pig.HCatStorer();
student_limit = LIMIT student_order 4;
Dump student_limit;
  • 指令碼的第一條語句將名為student_details.txt的檔案中的資料載入為名為student的關係。

  • 指令碼的第二條語句將根據年齡以降序排列關係的元組,並將其儲存為student_order

  • 第三條語句將處理後的資料student_order結果儲存在名為student_order_table的單獨表中。

  • 指令碼的第四條語句將student_order的前四個元組儲存為student_limit

  • 最後,第五條語句將轉儲關係student_limit的內容。

現在讓我們執行sample_script.pig,如下所示。

$./pig -useHCatalog hdfs://:9000/pig_data/sample_script.pig

現在,檢查您的輸出目錄(hdfs: user/tmp/hive)以檢視輸出(part_0000、part_0001)。

廣告
© . All rights reserved.