- Apache Storm 教程
- Apache Storm - 首頁
- Apache Storm - 簡介
- Apache Storm - 核心概念
- Apache Storm - 叢集架構
- Apache Storm - 工作流程
- Storm - 分散式訊息系統
- Apache Storm - 安裝
- Apache Storm - 工作示例
- Apache Storm - Trident
- Twitter 中的 Apache Storm
- 雅虎財經中的 Apache Storm
- Apache Storm - 應用
- Apache Storm 有用資源
- Apache Storm - 快速指南
- Apache Storm - 有用資源
- Apache Storm - 討論
Apache Storm - Trident
Trident 是 Storm 的擴充套件。與 Storm 一樣,Trident 也是由 Twitter 開發的。開發 Trident 的主要原因是在 Storm 之上提供高階抽象,以及提供有狀態流處理和低延遲分散式查詢。
Trident 使用 spout 和 bolt,但這些低階元件在執行之前由 Trident 自動生成。Trident 具有函式、過濾器、聯接、分組和聚合功能。
Trident 將流處理為一系列批次,這些批次稱為事務。通常,這些小批次的大小將以數千或數百萬個元組為單位,具體取決於輸入流。這樣,Trident 就不同於 Storm,Storm 執行逐個元組的處理。
批處理的概念與資料庫事務非常相似。每個事務都分配一個事務 ID。一旦所有處理完成,則認為事務成功。但是,處理其中一個事務的元組失敗將導致整個事務重新傳輸。對於每個批次,Trident 將在事務開始時呼叫 beginCommit,並在事務結束時呼叫 commit。
Trident Topology
Trident API 提供了一個簡單的選項,可以使用“TridentTopology”類建立 Trident topology。基本上,Trident topology 從 spout 接收輸入流,並在流上執行有序的操作序列(過濾器、聚合、分組等)。Storm Tuple 被 Trident Tuple 替換,Bolt 被操作替換。可以如下建立一個簡單的 Trident topology:
TridentTopology topology = new TridentTopology();
Trident 元組
Trident 元組是命名值的列表。TridentTuple 介面是 Trident topology 的資料模型。TridentTuple 介面是 Trident topology 可以處理的基本資料單元。
Trident Spout
Trident spout 類似於 Storm spout,但增加了使用 Trident 功能的選項。實際上,我們仍然可以使用我們在 Storm topology 中使用的 IRichSpout,但它本質上是非事務性的,我們將無法使用 Trident 提供的優勢。
具有使用 Trident 功能的所有功能的基本 spout 是“ITridentSpout”。它支援事務性和不透明事務性語義。其他 spout 包括 IBatchSpout、IPartitionedTridentSpout 和 IOpaquePartitionedTridentSpout。
除了這些通用 spout 之外,Trident 還有許多 trident spout 的示例實現。其中之一是 FeederBatchSpout spout,我們可以使用它輕鬆地傳送命名列表的 trident 元組,而無需擔心批處理、並行性等。
FeederBatchSpout 的建立和資料饋送可以如下所示:
TridentTopology topology = new TridentTopology();
FeederBatchSpout testSpout = new FeederBatchSpout(
ImmutableList.of("fromMobileNumber", "toMobileNumber", “duration”));
topology.newStream("fixed-batch-spout", testSpout)
testSpout.feed(ImmutableList.of(new Values("1234123401", "1234123402", 20)));
Trident 操作
Trident 依賴於“Trident 操作”來處理 trident 元組的輸入流。Trident API 有許多內建操作來處理簡單到複雜的流處理。這些操作範圍從簡單的驗證到 trident 元組的複雜分組和聚合。讓我們瞭解一下最重要和最常用的操作。
過濾器
過濾器是一個用於執行輸入驗證任務的物件。Trident 過濾器獲取 trident 元組欄位的子集作為輸入,並根據是否滿足某些條件返回 true 或 false。如果返回 true,則元組保留在輸出流中;否則,元組將從流中刪除。過濾器將基本上繼承自BaseFilter類並實現isKeep方法。以下是過濾器操作的示例實現:
public class MyFilter extends BaseFilter {
public boolean isKeep(TridentTuple tuple) {
return tuple.getInteger(1) % 2 == 0;
}
}
input
[1, 2]
[1, 3]
[1, 4]
output
[1, 2]
[1, 4]
過濾器函式可以使用“each”方法在 topology 中呼叫。“Fields”類可用於指定輸入(trident 元組的子集)。示例程式碼如下:
TridentTopology topology = new TridentTopology();
topology.newStream("spout", spout)
.each(new Fields("a", "b"), new MyFilter())
函式
函式是一個用於對單個 trident 元組執行簡單操作的物件。它獲取 trident 元組欄位的子集併發出零個或多個新的 trident 元組欄位。
函式基本上繼承自BaseFunction類並實現execute方法。下面給出了一個示例實現:
public class MyFunction extends BaseFunction {
public void execute(TridentTuple tuple, TridentCollector collector) {
int a = tuple.getInteger(0);
int b = tuple.getInteger(1);
collector.emit(new Values(a + b));
}
}
input
[1, 2]
[1, 3]
[1, 4]
output
[1, 2, 3]
[1, 3, 4]
[1, 4, 5]
就像過濾器操作一樣,函式操作可以使用each方法在 topology 中呼叫。示例程式碼如下:
TridentTopology topology = new TridentTopology();
topology.newStream("spout", spout)
.each(new Fields(“a, b"), new MyFunction(), new Fields(“d")));
聚合
聚合是一個用於對輸入批次、分割槽或流執行聚合操作的物件。Trident 有三種類型的聚合。它們如下:
aggregate - 獨立地聚合每個 trident 元組批次。在聚合過程中,元組最初使用全域性分組重新分割槽,以將同一批次的所有分割槽合併到單個分割槽中。
partitionAggregate - 聚合每個分割槽而不是整個 trident 元組批次。分割槽聚合的輸出完全替換輸入元組。分割槽聚合的輸出包含一個單欄位元組。
persistentaggregate - 聚合所有批次中的所有 trident 元組並將結果儲存在記憶體或資料庫中。
TridentTopology topology = new TridentTopology();
// aggregate operation
topology.newStream("spout", spout)
.each(new Fields(“a, b"), new MyFunction(), new Fields(“d”))
.aggregate(new Count(), new Fields(“count”))
// partitionAggregate operation
topology.newStream("spout", spout)
.each(new Fields(“a, b"), new MyFunction(), new Fields(“d”))
.partitionAggregate(new Count(), new Fields(“count"))
// persistentAggregate - saving the count to memory
topology.newStream("spout", spout)
.each(new Fields(“a, b"), new MyFunction(), new Fields(“d”))
.persistentAggregate(new MemoryMapState.Factory(), new Count(), new Fields("count"));
聚合操作可以使用 CombinerAggregator、ReducerAggregator 或通用 Aggregator 介面建立。上面示例中使用的“count”聚合器是內建聚合器之一。它是使用“CombinerAggregator”實現的。實現如下:
public class Count implements CombinerAggregator<Long> {
@Override
public Long init(TridentTuple tuple) {
return 1L;
}
@Override
public Long combine(Long val1, Long val2) {
return val1 + val2;
}
@Override
public Long zero() {
return 0L;
}
}
分組
分組操作是內建操作,可以透過groupBy方法呼叫。groupBy 方法透過對指定欄位執行 partitionBy 來重新分割槽流,然後在每個分割槽內,它將組欄位相等的元組組合在一起。通常,我們將“groupBy”與“persistentAggregate”一起使用以獲取分組聚合。示例程式碼如下:
TridentTopology topology = new TridentTopology();
// persistentAggregate - saving the count to memory
topology.newStream("spout", spout)
.each(new Fields(“a, b"), new MyFunction(), new Fields(“d”))
.groupBy(new Fields(“d”)
.persistentAggregate(new MemoryMapState.Factory(), new Count(), new Fields("count"));
合併和聯接
合併和聯接可以透過分別使用“merge”和“join”方法來完成。合併組合一個或多個流。聯接類似於合併,除了聯接使用來自雙方的 trident 元組欄位來檢查和聯接兩個流。此外,聯接僅在批次級別工作。示例程式碼如下:
TridentTopology topology = new TridentTopology();
topology.merge(stream1, stream2, stream3);
topology.join(stream1, new Fields("key"), stream2, new Fields("x"),
new Fields("key", "a", "b", "c"));
狀態維護
Trident 提供了一種狀態維護機制。狀態資訊可以儲存在 topology 本身中,或者也可以將其儲存在單獨的資料庫中。原因是維護一個狀態,如果任何元組在處理過程中失敗,則重試失敗的元組。這在更新狀態時會產生問題,因為您不確定此元組的狀態是否已以前更新過。如果元組在更新狀態之前失敗,則重試元組將使狀態穩定。但是,如果元組在更新狀態後失敗,則重試相同的元組將再次增加資料庫中的計數並使狀態不穩定。需要執行以下步驟以確保訊息僅處理一次:
以小批次處理元組。
為每個批次分配一個唯一的 ID。如果批次被重試,則為其分配相同的唯一 ID。
狀態更新在批次之間是有序的。例如,第二個批次的狀態更新在第一個批次的狀態更新完成之前是不可能的。
分散式 RPC
分散式 RPC 用於查詢和檢索 Trident topology 的結果。Storm 有一個內建的分散式 RPC 伺服器。分散式 RPC 伺服器接收來自客戶端的 RPC 請求並將其傳遞給 topology。topology 處理請求並將結果傳送到分散式 RPC 伺服器,然後由分散式 RPC 伺服器重定向到客戶端。Trident 的分散式 RPC 查詢像普通的 RPC 查詢一樣執行,只是這些查詢是並行執行的。
何時使用 Trident?
在許多用例中,如果需求是僅處理一次查詢,我們可以透過在 Trident 中編寫 topology 來實現它。另一方面,在 Storm 的情況下,很難實現恰好一次處理。因此,Trident 將對那些需要恰好一次處理的用例有用。Trident 並非適用於所有用例,尤其是高效能用例,因為它增加了 Storm 的複雜性並管理狀態。
Trident 的工作示例
我們將把上一節中完成的呼叫日誌分析器應用程式轉換為 Trident 框架。由於其高階 API,Trident 應用程式將相對容易,與普通 storm 相比。Storm 將基本上需要在 Trident 中執行任何一個函式、過濾器、聚合、GroupBy、聯接和合並操作。最後,我們將使用LocalDRPC類啟動 DRPC 伺服器,並使用LocalDRPC類的execute方法搜尋某些關鍵字。
格式化呼叫資訊
FormatCall 類的目的是格式化包含“呼叫者號碼”和“接收者號碼”的呼叫資訊。完整的程式程式碼如下:
編碼:FormatCall.java
import backtype.storm.tuple.Values;
import storm.trident.operation.BaseFunction;
import storm.trident.operation.TridentCollector;
import storm.trident.tuple.TridentTuple;
public class FormatCall extends BaseFunction {
@Override
public void execute(TridentTuple tuple, TridentCollector collector) {
String fromMobileNumber = tuple.getString(0);
String toMobileNumber = tuple.getString(1);
collector.emit(new Values(fromMobileNumber + " - " + toMobileNumber));
}
}
CSVSplit
CSVSplit 類的目的是根據“逗號 (,)”分割輸入字串併發出字串中的每個單詞。此函式用於解析分散式查詢的輸入引數。完整程式碼如下:
編碼:CSVSplit.java
import backtype.storm.tuple.Values;
import storm.trident.operation.BaseFunction;
import storm.trident.operation.TridentCollector;
import storm.trident.tuple.TridentTuple;
public class CSVSplit extends BaseFunction {
@Override
public void execute(TridentTuple tuple, TridentCollector collector) {
for(String word: tuple.getString(0).split(",")) {
if(word.length() > 0) {
collector.emit(new Values(word));
}
}
}
}
日誌分析器
這是主應用程式。最初,應用程式將初始化 TridentTopology 並使用FeederBatchSpout提供呼叫者資訊。可以使用 TridentTopology 類的newStream方法建立 Trident topology 流。類似地,可以使用 TridentTopology 類的newDRCPStream方法建立 Trident topology DRPC 流。可以使用 LocalDRPC 類建立一個簡單的 DRCP 伺服器。LocalDRPC具有 execute 方法來搜尋某些關鍵字。完整程式碼如下所示。
編碼:LogAnalyserTrident.java
import java.util.*;
import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.LocalDRPC;
import backtype.storm.utils.DRPCClient;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
import storm.trident.TridentState;
import storm.trident.TridentTopology;
import storm.trident.tuple.TridentTuple;
import storm.trident.operation.builtin.FilterNull;
import storm.trident.operation.builtin.Count;
import storm.trident.operation.builtin.Sum;
import storm.trident.operation.builtin.MapGet;
import storm.trident.operation.builtin.Debug;
import storm.trident.operation.BaseFilter;
import storm.trident.testing.FixedBatchSpout;
import storm.trident.testing.FeederBatchSpout;
import storm.trident.testing.Split;
import storm.trident.testing.MemoryMapState;
import com.google.common.collect.ImmutableList;
public class LogAnalyserTrident {
public static void main(String[] args) throws Exception {
System.out.println("Log Analyser Trident");
TridentTopology topology = new TridentTopology();
FeederBatchSpout testSpout = new FeederBatchSpout(ImmutableList.of("fromMobileNumber",
"toMobileNumber", "duration"));
TridentState callCounts = topology
.newStream("fixed-batch-spout", testSpout)
.each(new Fields("fromMobileNumber", "toMobileNumber"),
new FormatCall(), new Fields("call"))
.groupBy(new Fields("call"))
.persistentAggregate(new MemoryMapState.Factory(), new Count(),
new Fields("count"));
LocalDRPC drpc = new LocalDRPC();
topology.newDRPCStream("call_count", drpc)
.stateQuery(callCounts, new Fields("args"), new MapGet(), new Fields("count"));
topology.newDRPCStream("multiple_call_count", drpc)
.each(new Fields("args"), new CSVSplit(), new Fields("call"))
.groupBy(new Fields("call"))
.stateQuery(callCounts, new Fields("call"), new MapGet(),
new Fields("count"))
.each(new Fields("call", "count"), new Debug())
.each(new Fields("count"), new FilterNull())
.aggregate(new Fields("count"), new Sum(), new Fields("sum"));
Config conf = new Config();
LocalCluster cluster = new LocalCluster();
cluster.submitTopology("trident", conf, topology.build());
Random randomGenerator = new Random();
int idx = 0;
while(idx < 10) {
testSpout.feed(ImmutableList.of(new Values("1234123401",
"1234123402", randomGenerator.nextInt(60))));
testSpout.feed(ImmutableList.of(new Values("1234123401",
"1234123403", randomGenerator.nextInt(60))));
testSpout.feed(ImmutableList.of(new Values("1234123401",
"1234123404", randomGenerator.nextInt(60))));
testSpout.feed(ImmutableList.of(new Values("1234123402",
"1234123403", randomGenerator.nextInt(60))));
idx = idx + 1;
}
System.out.println("DRPC : Query starts");
System.out.println(drpc.execute("call_count","1234123401 - 1234123402"));
System.out.println(drpc.execute("multiple_call_count", "1234123401 -
1234123402,1234123401 - 1234123403"));
System.out.println("DRPC : Query ends");
cluster.shutdown();
drpc.shutdown();
// DRPCClient client = new DRPCClient("drpc.server.location", 3772);
}
}
構建和執行應用程式
完整的應用程式包含三個 Java 程式碼。它們如下:
- FormatCall.java
- CSVSplit.java
- LogAnalyerTrident.java
可以使用以下命令構建應用程式:
javac -cp “/path/to/storm/apache-storm-0.9.5/lib/*” *.java
可以使用以下命令執行應用程式:
java -cp “/path/to/storm/apache-storm-0.9.5/lib/*”:. LogAnalyserTrident
輸出
應用程式啟動後,應用程式將輸出有關叢集啟動過程、操作處理、DRPC 伺服器和客戶端資訊以及最終叢集關閉過程的完整詳細資訊。此輸出將顯示在控制檯上,如下所示。
DRPC : Query starts [["1234123401 - 1234123402",10]] DEBUG: [1234123401 - 1234123402, 10] DEBUG: [1234123401 - 1234123403, 10] [[20]] DRPC : Query ends