Hadoop - 流式處理



Hadoop streaming 是 Hadoop 發行版附帶的一個實用程式。此實用程式允許您使用任何可執行檔案或指令碼作為 mapper 和/或 reducer 來建立和執行 Map/Reduce 作業。

使用 Python 的示例

對於 Hadoop streaming,我們正在考慮詞頻統計問題。Hadoop 中的任何作業都必須有兩個階段:mapper 和 reducer。我們已經用 python 指令碼編寫了 mapper 和 reducer 的程式碼,以便在 Hadoop 下執行它。也可以用 Perl 和 Ruby 編寫相同的程式碼。

Mapper 階段程式碼

!/usr/bin/python

import sys

# Input takes from standard input for myline in sys.stdin: 
   # Remove whitespace either side 
   myline = myline.strip() 

   # Break the line into words 
   words = myline.split() 

   # Iterate the words list
   for myword in words:
      # Write the results to standard output 
      print '%s\t%s' % (myword, 1)

確保此檔案具有執行許可權 (chmod +x /home/ expert/hadoop-1.2.1/mapper.py)。

Reducer 階段程式碼

#!/usr/bin/python

from operator import itemgetter 
import sys 

current_word = ""
current_count = 0 
word = "" 

# Input takes from standard input for myline in sys.stdin: 
   # Remove whitespace either side 
   myline = myline.strip() 

   # Split the input we got from mapper.py word, 
   count = myline.split('\t', 1) 

   # Convert count variable to integer 
   try: 
      count = int(count) 

   except ValueError: 
      # Count was not a number, so silently ignore this line continue

   if current_word == word: 
   current_count += count 
   else: 
      if current_word: 
         # Write result to standard output print '%s\t%s' % (current_word, current_count) 
   
      current_count = count
      current_word = word

# Do not forget to output the last word if needed! 
if current_word == word: 
   print '%s\t%s' % (current_word, current_count)

將 mapper 和 reducer 程式碼儲存在 Hadoop 主目錄中的 mapper.py 和 reducer.py 中。確保這些檔案具有執行許可權 (chmod +x mapper.py 和 chmod +x reducer.py)。由於 python 對縮排敏感,因此可以從以下連結下載相同的程式碼。

WordCount 程式的執行

$ $HADOOP_HOME/bin/hadoop jar contrib/streaming/hadoop-streaming-1.
2.1.jar \
   -input input_dirs \ 
   -output output_dir \ 
   -mapper <path/mapper.py \ 
   -reducer <path/reducer.py

其中“\”用於行延續,以提高可讀性。

例如,

./bin/hadoop jar contrib/streaming/hadoop-streaming-1.2.1.jar -input myinput -output myoutput -mapper /home/expert/hadoop-1.2.1/mapper.py -reducer /home/expert/hadoop-1.2.1/reducer.py

Streaming 如何工作

在上面的示例中,mapper 和 reducer 都是 python 指令碼,它們從標準輸入讀取輸入並將輸出傳送到標準輸出。該實用程式將建立一個 Map/Reduce 作業,將作業提交到合適的叢集,並監視作業的進度,直到它完成。

當為 mapper 指定指令碼時,每個 mapper 任務在 mapper 初始化時都會啟動該指令碼作為一個單獨的程序。當 mapper 任務執行時,它會將其輸入轉換為行並將這些行饋送到程序的標準輸入 (STDIN)。同時,mapper 從程序的標準輸出 (STDOUT) 收集面向行的輸出,並將每一行轉換為鍵/值對,這些鍵/值對作為 mapper 的輸出收集。預設情況下,一行中第一個製表符字元之前的部分是鍵,其餘部分(不包括製表符字元)是值。如果行中沒有製表符字元,則整行都被視為鍵,而值為 null。但是,可以根據需要自定義此設定。

當為 reducer 指定指令碼時,每個 reducer 任務都會啟動該指令碼作為一個單獨的程序,然後 reducer 初始化。當 reducer 任務執行時,它會將其輸入鍵/值對轉換為行並將這些行饋送到程序的標準輸入 (STDIN)。同時,reducer 從程序的標準輸出 (STDOUT) 收集面向行的輸出,並將每一行轉換為鍵/值對,這些鍵/值對作為 reducer 的輸出收集。預設情況下,一行中第一個製表符字元之前的部分是鍵,其餘部分(不包括製表符字元)是值。但是,可以根據具體需求自定義此設定。

重要命令

引數 選項 描述
-input directory/file-name 必需 mapper 的輸入位置。
-output directory-name 必需 reducer 的輸出位置。
-mapper executable or script or JavaClassName 必需 Mapper 可執行檔案。
-reducer executable or script or JavaClassName 必需 Reducer 可執行檔案。
-file file-name 可選 使 mapper、reducer 或 combiner 可執行檔案在計算節點上本地可用。
-inputformat JavaClassName 可選 您提供的類應返回 Text 類的鍵/值對。如果未指定,則使用 TextInputFormat 作為預設值。
-outputformat JavaClassName 可選 您提供的類應獲取 Text 類的鍵/值對。如果未指定,則使用 TextOutputformat 作為預設值。
-partitioner JavaClassName 可選 確定將哪個鍵傳送到哪個 reducer 的類。
-combiner streamingCommand or JavaClassName 可選 map 輸出的 Combiner 可執行檔案。
-cmdenv name=value 可選 將環境變數傳遞到 streaming 命令。
-inputreader 可選 為了向後相容性:指定記錄讀取器類(而不是輸入格式類)。
-verbose 可選 詳細輸出。
-lazyOutput 可選 延遲建立輸出。例如,如果輸出格式基於 FileOutputFormat,則只有在第一次呼叫 output.collect(或 Context.write)時才會建立輸出檔案。
-numReduceTasks 可選 指定 reducer 的數量。
-mapdebug 可選 map 任務失敗時要呼叫的指令碼。
-reducedebug 可選 reduce 任務失敗時要呼叫的指令碼。
廣告