
- Hadoop 教程
- Hadoop - 首頁
- Hadoop - 大資料概述
- Hadoop - 大資料解決方案
- Hadoop - 簡介
- Hadoop - 環境設定
- Hadoop - HDFS 概述
- Hadoop - HDFS 操作
- Hadoop - 命令參考
- Hadoop - MapReduce
- Hadoop - 流式處理
- Hadoop - 多節點叢集
- Hadoop 有用資源
- Hadoop - 問題與解答
- Hadoop - 快速指南
- Hadoop - 有用資源
Hadoop - 流式處理
Hadoop streaming 是 Hadoop 發行版附帶的一個實用程式。此實用程式允許您使用任何可執行檔案或指令碼作為 mapper 和/或 reducer 來建立和執行 Map/Reduce 作業。
使用 Python 的示例
對於 Hadoop streaming,我們正在考慮詞頻統計問題。Hadoop 中的任何作業都必須有兩個階段:mapper 和 reducer。我們已經用 python 指令碼編寫了 mapper 和 reducer 的程式碼,以便在 Hadoop 下執行它。也可以用 Perl 和 Ruby 編寫相同的程式碼。
Mapper 階段程式碼
!/usr/bin/python import sys # Input takes from standard input for myline in sys.stdin: # Remove whitespace either side myline = myline.strip() # Break the line into words words = myline.split() # Iterate the words list for myword in words: # Write the results to standard output print '%s\t%s' % (myword, 1)
確保此檔案具有執行許可權 (chmod +x /home/ expert/hadoop-1.2.1/mapper.py)。
Reducer 階段程式碼
#!/usr/bin/python from operator import itemgetter import sys current_word = "" current_count = 0 word = "" # Input takes from standard input for myline in sys.stdin: # Remove whitespace either side myline = myline.strip() # Split the input we got from mapper.py word, count = myline.split('\t', 1) # Convert count variable to integer try: count = int(count) except ValueError: # Count was not a number, so silently ignore this line continue if current_word == word: current_count += count else: if current_word: # Write result to standard output print '%s\t%s' % (current_word, current_count) current_count = count current_word = word # Do not forget to output the last word if needed! if current_word == word: print '%s\t%s' % (current_word, current_count)
將 mapper 和 reducer 程式碼儲存在 Hadoop 主目錄中的 mapper.py 和 reducer.py 中。確保這些檔案具有執行許可權 (chmod +x mapper.py 和 chmod +x reducer.py)。由於 python 對縮排敏感,因此可以從以下連結下載相同的程式碼。
WordCount 程式的執行
$ $HADOOP_HOME/bin/hadoop jar contrib/streaming/hadoop-streaming-1. 2.1.jar \ -input input_dirs \ -output output_dir \ -mapper <path/mapper.py \ -reducer <path/reducer.py
其中“\”用於行延續,以提高可讀性。
例如,
./bin/hadoop jar contrib/streaming/hadoop-streaming-1.2.1.jar -input myinput -output myoutput -mapper /home/expert/hadoop-1.2.1/mapper.py -reducer /home/expert/hadoop-1.2.1/reducer.py
Streaming 如何工作
在上面的示例中,mapper 和 reducer 都是 python 指令碼,它們從標準輸入讀取輸入並將輸出傳送到標準輸出。該實用程式將建立一個 Map/Reduce 作業,將作業提交到合適的叢集,並監視作業的進度,直到它完成。
當為 mapper 指定指令碼時,每個 mapper 任務在 mapper 初始化時都會啟動該指令碼作為一個單獨的程序。當 mapper 任務執行時,它會將其輸入轉換為行並將這些行饋送到程序的標準輸入 (STDIN)。同時,mapper 從程序的標準輸出 (STDOUT) 收集面向行的輸出,並將每一行轉換為鍵/值對,這些鍵/值對作為 mapper 的輸出收集。預設情況下,一行中第一個製表符字元之前的部分是鍵,其餘部分(不包括製表符字元)是值。如果行中沒有製表符字元,則整行都被視為鍵,而值為 null。但是,可以根據需要自定義此設定。
當為 reducer 指定指令碼時,每個 reducer 任務都會啟動該指令碼作為一個單獨的程序,然後 reducer 初始化。當 reducer 任務執行時,它會將其輸入鍵/值對轉換為行並將這些行饋送到程序的標準輸入 (STDIN)。同時,reducer 從程序的標準輸出 (STDOUT) 收集面向行的輸出,並將每一行轉換為鍵/值對,這些鍵/值對作為 reducer 的輸出收集。預設情況下,一行中第一個製表符字元之前的部分是鍵,其餘部分(不包括製表符字元)是值。但是,可以根據具體需求自定義此設定。
重要命令
引數 | 選項 | 描述 |
---|---|---|
-input directory/file-name | 必需 | mapper 的輸入位置。 |
-output directory-name | 必需 | reducer 的輸出位置。 |
-mapper executable or script or JavaClassName | 必需 | Mapper 可執行檔案。 |
-reducer executable or script or JavaClassName | 必需 | Reducer 可執行檔案。 |
-file file-name | 可選 | 使 mapper、reducer 或 combiner 可執行檔案在計算節點上本地可用。 |
-inputformat JavaClassName | 可選 | 您提供的類應返回 Text 類的鍵/值對。如果未指定,則使用 TextInputFormat 作為預設值。 |
-outputformat JavaClassName | 可選 | 您提供的類應獲取 Text 類的鍵/值對。如果未指定,則使用 TextOutputformat 作為預設值。 |
-partitioner JavaClassName | 可選 | 確定將哪個鍵傳送到哪個 reducer 的類。 |
-combiner streamingCommand or JavaClassName | 可選 | map 輸出的 Combiner 可執行檔案。 |
-cmdenv name=value | 可選 | 將環境變數傳遞到 streaming 命令。 |
-inputreader | 可選 | 為了向後相容性:指定記錄讀取器類(而不是輸入格式類)。 |
-verbose | 可選 | 詳細輸出。 |
-lazyOutput | 可選 | 延遲建立輸出。例如,如果輸出格式基於 FileOutputFormat,則只有在第一次呼叫 output.collect(或 Context.write)時才會建立輸出檔案。 |
-numReduceTasks | 可選 | 指定 reducer 的數量。 |
-mapdebug | 可選 | map 任務失敗時要呼叫的指令碼。 |
-reducedebug | 可選 | reduce 任務失敗時要呼叫的指令碼。 |