Apache Storm - 例項演示



我們已經學習了 Apache Storm 的核心技術細節,現在是時候編寫一些簡單的場景了。

場景 – 移動通話記錄分析器

移動通話及其時長將作為輸入提供給 Apache Storm,Storm 將處理並分組相同主叫和被叫之間的通話,以及他們的總通話次數。

Spout 建立

Spout 是用於資料生成的元件。基本上,Spout 將實現 IRichSpout 介面。“IRichSpout”介面具有以下重要方法:

  • open − 為 spout 提供執行環境。執行器將執行此方法來初始化 spout。

  • nextTuple − 透過 collector 發出生成的資料。

  • close − 當 spout 將要關閉時呼叫此方法。

  • declareOutputFields − 宣告元組的輸出模式。

  • ack − 確認已處理特定元組。

  • fail − 指定未處理特定元組,並且不應重新處理。

Open

open 方法的簽名如下:

open(Map conf, TopologyContext context, SpoutOutputCollector collector)
  • conf − 為此 spout 提供 Storm 配置。

  • context − 提供關於 spout 在拓撲中的位置、其任務 ID、輸入和輸出資訊的完整資訊。

  • collector − 使我們能夠發出將由 bolt 處理的元組。

nextTuple

nextTuple 方法的簽名如下:

nextTuple()

nextTuple() 從與 ack() 和 fail() 方法相同的迴圈中定期呼叫。如果沒有工作要做,它必須釋放執行緒的控制權,以便其他方法有機會被呼叫。因此,nextTuple 的第一行檢查處理是否已完成。如果是,則應至少睡眠一毫秒以減少處理器上的負載,然後再返回。

close

close 方法的簽名如下:

close()

declareOutputFields

declareOutputFields 方法的簽名如下:

declareOutputFields(OutputFieldsDeclarer declarer)

declarer − 用於宣告輸出流 ID、輸出欄位等。

此方法用於指定元組的輸出模式。

ack

ack 方法的簽名如下:

ack(Object msgId)

此方法確認已處理特定元組。

fail

nextTuple 方法的簽名如下:

ack(Object msgId)

此方法通知未完全處理特定元組。Storm 將重新處理特定元組。

FakeCallLogReaderSpout

在我們的場景中,我們需要收集通話記錄詳細資訊。通話記錄資訊包含:

  • 主叫號碼
  • 被叫號碼
  • 時長

由於我們沒有通話記錄的即時資訊,我們將生成偽造的通話記錄。偽造資訊將使用 Random 類建立。完整的程式程式碼如下所示。

程式碼 − FakeCallLogReaderSpout.java

import java.util.*;
//import storm tuple packages
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;

//import Spout interface packages
import backtype.storm.topology.IRichSpout;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;

//Create a class FakeLogReaderSpout which implement IRichSpout interface 
   to access functionalities
	
public class FakeCallLogReaderSpout implements IRichSpout {
   //Create instance for SpoutOutputCollector which passes tuples to bolt.
   private SpoutOutputCollector collector;
   private boolean completed = false;
	
   //Create instance for TopologyContext which contains topology data.
   private TopologyContext context;
	
   //Create instance for Random class.
   private Random randomGenerator = new Random();
   private Integer idx = 0;

   @Override
   public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
      this.context = context;
      this.collector = collector;
   }

   @Override
   public void nextTuple() {
      if(this.idx <= 1000) {
         List<String> mobileNumbers = new ArrayList<String>();
         mobileNumbers.add("1234123401");
         mobileNumbers.add("1234123402");
         mobileNumbers.add("1234123403");
         mobileNumbers.add("1234123404");

         Integer localIdx = 0;
         while(localIdx++ < 100 && this.idx++ < 1000) {
            String fromMobileNumber = mobileNumbers.get(randomGenerator.nextInt(4));
            String toMobileNumber = mobileNumbers.get(randomGenerator.nextInt(4));
				
            while(fromMobileNumber == toMobileNumber) {
               toMobileNumber = mobileNumbers.get(randomGenerator.nextInt(4));
            }
				
            Integer duration = randomGenerator.nextInt(60);
            this.collector.emit(new Values(fromMobileNumber, toMobileNumber, duration));
         }
      }
   }

   @Override
   public void declareOutputFields(OutputFieldsDeclarer declarer) {
      declarer.declare(new Fields("from", "to", "duration"));
   }

   //Override all the interface methods
   @Override
   public void close() {}

   public boolean isDistributed() {
      return false;
   }

   @Override
   public void activate() {}

   @Override 
   public void deactivate() {}

   @Override
   public void ack(Object msgId) {}

   @Override
   public void fail(Object msgId) {}

   @Override
   public Map<String, Object> getComponentConfiguration() {
      return null;
   }
}

Bolt 建立

Bolt 是一個元件,它將元組作為輸入,處理元組,併產生新的元組作為輸出。Bolt 將實現IRichBolt 介面。在這個程式中,使用了兩個 bolt 類CallLogCreatorBoltCallLogCounterBolt 來執行操作。

IRichBolt 介面具有以下方法:

  • prepare − 為 bolt 提供執行環境。執行器將執行此方法來初始化 spout。

  • execute − 處理單個輸入元組。

  • cleanup − 當 bolt 將要關閉時呼叫。

  • declareOutputFields − 宣告元組的輸出模式。

Prepare

prepare 方法的簽名如下:

prepare(Map conf, TopologyContext context, OutputCollector collector)
  • conf − 為此 bolt 提供 Storm 配置。

  • context − 提供關於 bolt 在拓撲中的位置、其任務 ID、輸入和輸出資訊的完整資訊等。

  • collector − 使我們能夠發出已處理的元組。

execute

execute 方法的簽名如下:

execute(Tuple tuple)

這裡tuple 是要處理的輸入元組。

execute 方法一次處理一個元組。可以透過 Tuple 類的 getValue 方法訪問元組資料。不必立即處理輸入元組。可以處理多個元組並作為單個輸出元組輸出。可以使用 OutputCollector 類發出已處理的元組。

cleanup

cleanup 方法的簽名如下:

cleanup()

declareOutputFields

declareOutputFields 方法的簽名如下:

declareOutputFields(OutputFieldsDeclarer declarer)

這裡的引數declarer 用於宣告輸出流 ID、輸出欄位等。

此方法用於指定元組的輸出模式。

通話記錄建立 Bolt

通話記錄建立 bolt 接收通話記錄元組。通話記錄元組包含主叫號碼、被叫號碼和通話時長。此 bolt 透過組合主叫號碼和被叫號碼來簡單地建立一個新值。新值的格式為“主叫號碼 – 被叫號碼”,並將其命名為新欄位“call”。完整的程式碼如下所示。

程式碼 − CallLogCreatorBolt.java

//import util packages
import java.util.HashMap;
import java.util.Map;

import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;

//import Storm IRichBolt package
import backtype.storm.topology.IRichBolt;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Tuple;

//Create a class CallLogCreatorBolt which implement IRichBolt interface
public class CallLogCreatorBolt implements IRichBolt {
   //Create instance for OutputCollector which collects and emits tuples to produce output
   private OutputCollector collector;

   @Override
   public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
      this.collector = collector;
   }

   @Override
   public void execute(Tuple tuple) {
      String from = tuple.getString(0);
      String to = tuple.getString(1);
      Integer duration = tuple.getInteger(2);
      collector.emit(new Values(from + " - " + to, duration));
   }

   @Override
   public void cleanup() {}

   @Override
   public void declareOutputFields(OutputFieldsDeclarer declarer) {
      declarer.declare(new Fields("call", "duration"));
   }
	
   @Override
   public Map<String, Object> getComponentConfiguration() {
      return null;
   }
}

通話記錄計數 Bolt

通話記錄計數 bolt 接收通話及其時長作為元組。此 bolt 在 prepare 方法中初始化一個字典 (Map) 物件。在execute 方法中,它檢查元組併為元組中的每個新的“call”值在字典物件中建立一個新條目,並在字典物件中設定值為 1。對於字典中已存在的條目,它只遞增其值。簡單來說,此 bolt 將通話及其計數儲存到字典物件中。除了將通話及其計數儲存到字典中,我們還可以將其儲存到資料來源中。完整的程式程式碼如下:

程式碼 − CallLogCounterBolt.java

import java.util.HashMap;
import java.util.Map;

import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.IRichBolt;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Tuple;

public class CallLogCounterBolt implements IRichBolt {
   Map<String, Integer> counterMap;
   private OutputCollector collector;

   @Override
   public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
      this.counterMap = new HashMap<String, Integer>();
      this.collector = collector;
   }

   @Override
   public void execute(Tuple tuple) {
      String call = tuple.getString(0);
      Integer duration = tuple.getInteger(1);
		
      if(!counterMap.containsKey(call)){
         counterMap.put(call, 1);
      }else{
         Integer c = counterMap.get(call) + 1;
         counterMap.put(call, c);
      }
		
      collector.ack(tuple);
   }

   @Override
   public void cleanup() {
      for(Map.Entry<String, Integer> entry:counterMap.entrySet()){
         System.out.println(entry.getKey()+" : " + entry.getValue());
      }
   }

   @Override
   public void declareOutputFields(OutputFieldsDeclarer declarer) {
      declarer.declare(new Fields("call"));
   }
	
   @Override
   public Map<String, Object> getComponentConfiguration() {
      return null;
   }
	
}

建立拓撲

Storm 拓撲基本上是一個 Thrift 結構。TopologyBuilder 類提供簡單易用的方法來建立複雜的拓撲。TopologyBuilder 類具有設定 spout(setSpout) 和設定 bolt(setBolt) 的方法。最後,TopologyBuilder 具有 createTopology 來建立拓撲。使用以下程式碼片段來建立拓撲:

TopologyBuilder builder = new TopologyBuilder();

builder.setSpout("call-log-reader-spout", new FakeCallLogReaderSpout());

builder.setBolt("call-log-creator-bolt", new CallLogCreatorBolt())
   .shuffleGrouping("call-log-reader-spout");

builder.setBolt("call-log-counter-bolt", new CallLogCounterBolt())
   .fieldsGrouping("call-log-creator-bolt", new Fields("call"));

shuffleGroupingfieldsGrouping 方法有助於設定 spout 和 bolt 的流分組。

本地叢集

出於開發目的,我們可以使用“LocalCluster”物件建立一個本地叢集,然後使用“LocalCluster”類的“submitTopology”方法提交拓撲。“submitTopology”的引數之一是“Config”類的例項。“Config”類用於在提交拓撲之前設定配置選項。此配置選項將在執行時與叢集配置合併,並透過 prepare 方法傳送到所有任務(spout 和 bolt)。一旦拓撲提交到叢集,我們將等待 10 秒鐘讓叢集計算提交的拓撲,然後使用“LocalCluster”的“shutdown”方法關閉叢集。完整的程式程式碼如下:

程式碼 − LogAnalyserStorm.java

import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;

//import storm configuration packages
import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.topology.TopologyBuilder;

//Create main class LogAnalyserStorm submit topology.
public class LogAnalyserStorm {
   public static void main(String[] args) throws Exception{
      //Create Config instance for cluster configuration
      Config config = new Config();
      config.setDebug(true);
		
      //
      TopologyBuilder builder = new TopologyBuilder();
      builder.setSpout("call-log-reader-spout", new FakeCallLogReaderSpout());

      builder.setBolt("call-log-creator-bolt", new CallLogCreatorBolt())
         .shuffleGrouping("call-log-reader-spout");

      builder.setBolt("call-log-counter-bolt", new CallLogCounterBolt())
         .fieldsGrouping("call-log-creator-bolt", new Fields("call"));
			
      LocalCluster cluster = new LocalCluster();
      cluster.submitTopology("LogAnalyserStorm", config, builder.createTopology());
      Thread.sleep(10000);
		
      //Stop the topology
		
      cluster.shutdown();
   }
}

構建和執行應用程式

完整的應用程式有四個 Java 程式碼。它們是:

  • FakeCallLogReaderSpout.java
  • CallLogCreaterBolt.java
  • CallLogCounterBolt.java
  • LogAnalyerStorm.java

可以使用以下命令構建應用程式:

javac -cp “/path/to/storm/apache-storm-0.9.5/lib/*” *.java

可以使用以下命令執行應用程式:

java -cp “/path/to/storm/apache-storm-0.9.5/lib/*”:. LogAnalyserStorm

輸出

應用程式啟動後,它將輸出有關叢集啟動過程、spout 和 bolt 處理以及最終叢集關閉過程的完整詳細資訊。在“CallLogCounterBolt”中,我們列印了通話及其計數詳細資訊。此資訊將以如下方式顯示在控制檯上:

1234123402 - 1234123401 : 78
1234123402 - 1234123404 : 88
1234123402 - 1234123403 : 105
1234123401 - 1234123404 : 74
1234123401 - 1234123403 : 81
1234123401 - 1234123402 : 81
1234123403 - 1234123404 : 86
1234123404 - 1234123401 : 63
1234123404 - 1234123402 : 82
1234123403 - 1234123402 : 83
1234123404 - 1234123403 : 86
1234123403 - 1234123401 : 93

非 JVM 語言

Storm 拓撲由 Thrift 介面實現,這使得在任何語言中提交拓撲都變得容易。Storm 支援 Ruby、Python 和許多其他語言。讓我們來看看 Python 繫結。

Python 繫結

Python 是一種通用的解釋型、互動式、面向物件和高階程式語言。Storm 支援 Python 來實現其拓撲。Python 支援發出、錨定、確認和記錄操作。

如您所知,bolt 可以用任何語言定義。用另一種語言編寫的 bolt 作為子程序執行,Storm 透過 stdin/stdout 上的 JSON 訊息與這些子程序通訊。首先,讓我們來看一個支援 Python 繫結的示例 bolt WordCount。

public static class WordCount implements IRichBolt {
   public WordSplit() {
      super("python", "splitword.py");
   }
	
   public void declareOutputFields(OutputFieldsDeclarer declarer) {
      declarer.declare(new Fields("word"));
   }
}

這裡類WordCount實現了IRichBolt介面,並使用指定的python實現的super方法引數“splitword.py”執行。現在建立一個名為“splitword.py”的python實現。

import storm
   class WordCountBolt(storm.BasicBolt):
      def process(self, tup):
         words = tup.values[0].split(" ")
         for word in words:
         storm.emit([word])
WordCountBolt().run()

這是 Python 的示例實現,它計算給定句子中的單詞數。類似地,您也可以與其他支援的語言繫結。

廣告
© . All rights reserved.