Apache Flume - 獲取Twitter資料



使用Flume,我們可以從各種服務獲取資料並將其傳輸到集中式儲存(HDFS和HBase)。本章解釋如何從Twitter服務獲取資料並使用Apache Flume將其儲存到HDFS中。

如Flume架構中所述,Web伺服器生成日誌資料,Flume中的代理會收集這些資料。通道將這些資料緩衝到接收器,接收器最終將其推送到集中式儲存。

在本節提供的示例中,我們將建立一個應用程式,並使用Apache Flume提供的實驗性Twitter源來獲取其中的推文。我們將使用記憶體通道來緩衝這些推文,並使用HDFS接收器將這些推文推送到HDFS中。

Fetch Data

要獲取Twitter資料,我們必須按照以下步驟操作:

  • 建立一個Twitter應用程式
  • 安裝/啟動HDFS
  • 配置Flume

建立Twitter應用程式

為了從Twitter獲取推文,需要建立一個Twitter應用程式。請按照以下步驟建立Twitter應用程式。

步驟1

要建立一個Twitter應用程式,請點選以下連結 https://apps.twitter.com/。登入您的Twitter帳戶。您將看到一個Twitter應用程式管理視窗,您可以在其中建立、刪除和管理Twitter應用程式。

Application Management window

步驟2

點選建立新的應用按鈕。您將被重定向到一個視窗,其中包含一個應用程式表單,您必須在其中填寫您的詳細資訊才能建立應用程式。填寫網站地址時,請提供完整的URL模式,例如 http://example.com。

Create an Application

步驟3

填寫詳細資訊,完成後接受開發者協議,點選頁面底部的建立您的Twitter應用程式按鈕。如果一切順利,將建立一個具有以下所示詳細資訊的應用程式。

Application created

步驟4

在頁面底部的金鑰和訪問令牌選項卡下,您可以看到一個名為建立我的訪問令牌的按鈕。點選它以生成訪問令牌。

Key Access Tokens

步驟5

最後,點選頁面右側頂部的測試OAuth按鈕。這將連結到一個頁面,該頁面顯示您的Consumer key、Consumer secret、Access tokenAccess token secret。複製這些詳細資訊。這些資訊對於配置Flume中的代理很有用。

OAuth Tool

啟動HDFS

由於我們將資料儲存在HDFS中,因此我們需要安裝/驗證Hadoop。啟動Hadoop並在其中建立一個資料夾以儲存Flume資料。在配置Flume之前,請按照以下步驟操作。

步驟1:安裝/驗證Hadoop

安裝 Hadoop。如果您的系統中已安裝Hadoop,請使用Hadoop版本命令驗證安裝,如下所示。

$ hadoop version 

如果您的系統包含Hadoop,並且您已設定路徑變數,則將獲得以下輸出:

Hadoop 2.6.0 
Subversion https://git-wip-us.apache.org/repos/asf/hadoop.git -r 
e3496499ecb8d220fba99dc5ed4c99c8f9e33bb1 
Compiled by jenkins on 2014-11-13T21:10Z 
Compiled with protoc 2.5.0 
From source with checksum 18e43357c8f927c0695f1e9522859d6a 
This command was run using /home/Hadoop/hadoop/share/hadoop/common/hadoop-common-2.6.0.jar

步驟2:啟動Hadoop

瀏覽Hadoop的sbin目錄,並啟動yarn和Hadoop dfs(分散式檔案系統),如下所示。

cd /$Hadoop_Home/sbin/ 
$ start-dfs.sh 
localhost: starting namenode, logging to
   /home/Hadoop/hadoop/logs/hadoop-Hadoop-namenode-localhost.localdomain.out 
localhost: starting datanode, logging to 
   /home/Hadoop/hadoop/logs/hadoop-Hadoop-datanode-localhost.localdomain.out 
Starting secondary namenodes [0.0.0.0] 
starting secondarynamenode, logging to 
   /home/Hadoop/hadoop/logs/hadoop-Hadoop-secondarynamenode-localhost.localdomain.out
  
$ start-yarn.sh 
starting yarn daemons 
starting resourcemanager, logging to 
   /home/Hadoop/hadoop/logs/yarn-Hadoop-resourcemanager-localhost.localdomain.out 
localhost: starting nodemanager, logging to 
   /home/Hadoop/hadoop/logs/yarn-Hadoop-nodemanager-localhost.localdomain.out 

步驟3:在HDFS中建立目錄

在Hadoop DFS中,您可以使用命令mkdir建立目錄。瀏覽它,並在所需的路徑中建立一個名為twitter_data的目錄,如下所示。

$cd /$Hadoop_Home/bin/ 
$ hdfs dfs -mkdir hdfs://:9000/user/Hadoop/twitter_data 

配置Flume

我們必須使用conf資料夾中的配置檔案來配置源、通道和接收器。本章中提供的示例使用Apache Flume提供的名為Twitter 1% Firehose的實驗性源、記憶體通道和HDFS接收器。

Twitter 1% Firehose Source

此源高度實驗性。它使用流式API連線到1%的樣本Twitter Firehose,並持續下載推文,將其轉換為Avro格式,並將Avro事件傳送到下游Flume接收器。

安裝Flume後,我們將預設獲得此源。與此源對應的jar檔案位於lib資料夾中,如下所示。

Twitter Jar Files

設定classpath

classpath變數設定為Flume-env.sh檔案中Flume的lib資料夾,如下所示。

export CLASSPATH=$CLASSPATH:/FLUME_HOME/lib/* 

此源需要諸如Consumer key、Consumer secret、Access tokenAccess token secret之類的Twitter應用程式詳細資訊。配置此源時,您必須為以下屬性提供值:

  • 通道

  • 源型別:org.apache.flume.source.twitter.TwitterSource

  • consumerKey - OAuth消費者金鑰

  • consumerSecret - OAuth消費者金鑰

  • accessToken - OAuth訪問令牌

  • accessTokenSecret - OAuth令牌金鑰

  • maxBatchSize - Twitter批次中應包含的最大Twitter訊息數。預設值為1000(可選)。

  • maxBatchDurationMillis - 關閉批次之前要等待的最大毫秒數。預設值為1000(可選)。

通道

我們使用記憶體通道。要配置記憶體通道,您必須為通道型別提供值。

  • type - 它儲存通道的型別。在我們的示例中,型別為MemChannel

  • Capacity - 儲存在通道中的最大事件數。其預設值為100(可選)。

  • TransactionCapacity - 通道接受或傳送的最大事件數。其預設值為100(可選)。

HDFS接收器

此接收器將資料寫入HDFS。要配置此接收器,您必須提供以下詳細資訊。

  • 通道

  • type - hdfs

  • hdfs.path - 要儲存資料的HDFS中目錄的路徑。

我們可以根據場景提供一些可選值。以下是我們在應用程式中配置的HDFS接收器的可選屬性。

  • fileType - 這是我們的HDFS檔案的所需檔案格式。SequenceFile、DataStreamCompressedStream是此流提供的三種類型。在我們的示例中,我們使用DataStream

  • writeFormat - 可以是文字或可寫入的。

  • batchSize - 在將檔案重新整理到HDFS之前寫入檔案事件的數量。其預設值為100。

  • rollsize - 觸發滾動操作的檔案大小。其預設值為100。

  • rollCount - 在檔案滾動之前寫入檔案中的事件數。其預設值為10。

示例 - 配置檔案

以下是配置檔案示例。複製此內容並將其另存為Flume的conf資料夾中的twitter.conf

# Naming the components on the current agent. 
TwitterAgent.sources = Twitter 
TwitterAgent.channels = MemChannel 
TwitterAgent.sinks = HDFS
  
# Describing/Configuring the source 
TwitterAgent.sources.Twitter.type = org.apache.flume.source.twitter.TwitterSource
TwitterAgent.sources.Twitter.consumerKey = Your OAuth consumer key
TwitterAgent.sources.Twitter.consumerSecret = Your OAuth consumer secret 
TwitterAgent.sources.Twitter.accessToken = Your OAuth consumer key access token 
TwitterAgent.sources.Twitter.accessTokenSecret = Your OAuth consumer key access token secret 
TwitterAgent.sources.Twitter.keywords = tutorials point,java, bigdata, mapreduce, mahout, hbase, nosql
  
# Describing/Configuring the sink 

TwitterAgent.sinks.HDFS.type = hdfs 
TwitterAgent.sinks.HDFS.hdfs.path = hdfs://:9000/user/Hadoop/twitter_data/
TwitterAgent.sinks.HDFS.hdfs.fileType = DataStream 
TwitterAgent.sinks.HDFS.hdfs.writeFormat = Text 
TwitterAgent.sinks.HDFS.hdfs.batchSize = 1000
TwitterAgent.sinks.HDFS.hdfs.rollSize = 0 
TwitterAgent.sinks.HDFS.hdfs.rollCount = 10000 
 
# Describing/Configuring the channel 
TwitterAgent.channels.MemChannel.type = memory 
TwitterAgent.channels.MemChannel.capacity = 10000 
TwitterAgent.channels.MemChannel.transactionCapacity = 100
  
# Binding the source and sink to the channel 
TwitterAgent.sources.Twitter.channels = MemChannel
TwitterAgent.sinks.HDFS.channel = MemChannel 

執行

瀏覽Flume主目錄並執行應用程式,如下所示。

$ cd $FLUME_HOME 
$ bin/flume-ng agent --conf ./conf/ -f conf/twitter.conf 
Dflume.root.logger=DEBUG,console -n TwitterAgent

如果一切順利,推文流式傳輸到HDFS將開始。以下是獲取推文時命令提示符視窗的快照。

Fetching Tweets

驗證HDFS

您可以使用以下URL訪問Hadoop管理Web UI。

https://:50070/ 

點選頁面右側的實用工具下拉選單。您可以看到以下快照中所示的兩個選項。

Verifying HDFS

點選瀏覽檔案系統,然後輸入已儲存推文的HDFS目錄的路徑。在我們的示例中,路徑將為/user/Hadoop/twitter_data/。然後,您可以看到儲存在HDFS中的Twitter日誌檔案列表,如下所示。

Browse the file system
廣告
© . All rights reserved.