DynamoDB - 表格活動



DynamoDB 流使您可以跟蹤和響應表格專案更改。利用此功能建立一個應用程式,該應用程式透過更新跨源資訊來響應更改。同步大型多使用者系統中成千上萬使用者的 資料。用它來向用戶傳送更新通知。它的應用證明是多種多樣的和大量的。DynamoDB 流是實現此功能的主要工具。

流捕獲包含表格中專案修改的時間排序序列。它們最多儲存此資料 24 小時。應用程式使用它們幾乎即時地檢視原始專案和修改後的專案。

在表格上啟用的流捕獲所有修改。對於任何 CRUD 操作,DynamoDB 都會建立一個流記錄,其中包含已修改專案的 主鍵屬性。您可以為其他資訊配置流,例如之前和之後的影像。

流帶有兩項保證:

  • 每個記錄在流中只出現一次,並且

  • 每個專案修改都會產生與修改順序相同的流記錄。

所有流都即時處理,以便您可以將它們用於應用程式中的相關功能。

管理流

在建立表格時,您可以啟用流。現有表格允許停用流或更改設定。流提供非同步操作功能,這意味著不會影響表格效能。

利用 AWS 管理控制檯可以輕鬆管理流。首先,導航到控制檯,然後選擇**表格**。在“概述”選項卡中,選擇**管理流**。在視窗中,選擇在表格資料修改時新增到流中的資訊。輸入所有設定後,選擇**啟用**。

如果您想停用任何現有流,請選擇**管理流**,然後選擇**停用**。

您還可以利用 CreateTable 和 UpdateTable API 來啟用或更改流。使用引數 StreamSpecification 來配置流。StreamEnabled 指定狀態,啟用為 true,停用為 false。

StreamViewType 指定新增到流中的資訊:KEYS_ONLY、NEW_IMAGE、OLD_IMAGE 和 NEW_AND_OLD_IMAGES。

流讀取

透過連線到端點併發出 API 請求來讀取和處理流。每個流都由流記錄組成,每個記錄都作為擁有流的單個修改存在。流記錄包括一個序列號,顯示釋出順序。記錄屬於稱為分割槽的組。分割槽充當多個記錄的容器,並且還儲存訪問和遍歷記錄所需的資訊。24 小時後,記錄會自動刪除。

這些分割槽會根據需要生成和刪除,並且不會持續很長時間。它們還會自動分成多個新的分割槽,通常是為了響應寫入活動的激增。停用流後,開啟的分割槽將關閉。分割槽之間的層次關係意味著應用程式必須優先處理父分割槽才能確保正確的處理順序。您可以使用 Kinesis 介面卡自動執行此操作。

**注意** - 沒有導致更改的操作不會寫入流記錄。

訪問和處理記錄需要執行以下任務:

  • 確定目標流的 ARN。
  • 確定儲存目標記錄的流分割槽。
  • 訪問分割槽以檢索所需的記錄。

**注意** - 最多應有兩個程序同時讀取一個分割槽。如果超過 2 個程序,則可能會限制源。

可用的流 API 操作包括

  • ListStreams
  • DescribeStream
  • GetShardIterator
  • GetRecords

您可以檢視以下流讀取示例:

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import com.amazonaws.auth.profile.ProfileCredentialsProvider;
import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClient;
import com.amazonaws.services.dynamodbv2.AmazonDynamoDBStreamsClient;

import com.amazonaws.services.dynamodbv2.model.AttributeAction;
import com.amazonaws.services.dynamodbv2.model.AttributeDefinition;
import com.amazonaws.services.dynamodbv2.model.AttributeValue;
import com.amazonaws.services.dynamodbv2.model.AttributeValueUpdate;

import com.amazonaws.services.dynamodbv2.model.CreateTableRequest;
import com.amazonaws.services.dynamodbv2.model.DescribeStreamRequest;
import com.amazonaws.services.dynamodbv2.model.DescribeStreamResult;
import com.amazonaws.services.dynamodbv2.model.DescribeTableResult;

import com.amazonaws.services.dynamodbv2.model.GetRecordsRequest;
import com.amazonaws.services.dynamodbv2.model.GetRecordsResult;
import com.amazonaws.services.dynamodbv2.model.GetShardIteratorRequest;
import com.amazonaws.services.dynamodbv2.model.GetShardIteratorResult;

import com.amazonaws.services.dynamodbv2.model.KeySchemaElement;
import com.amazonaws.services.dynamodbv2.model.KeyType;
import com.amazonaws.services.dynamodbv2.model.ProvisionedThroughput;
import com.amazonaws.services.dynamodbv2.model.Record;

import com.amazonaws.services.dynamodbv2.model.Shard;
import com.amazonaws.services.dynamodbv2.model.ShardIteratorType;
import com.amazonaws.services.dynamodbv2.model.StreamSpecification;
import com.amazonaws.services.dynamodbv2.model.StreamViewType;
import com.amazonaws.services.dynamodbv2.util.Tables;

public class StreamsExample {
   private static AmazonDynamoDBClient dynamoDBClient =  
      new AmazonDynamoDBClient(new ProfileCredentialsProvider());  
   private static AmazonDynamoDBStreamsClient streamsClient =  
      new AmazonDynamoDBStreamsClient(new ProfileCredentialsProvider());  

   public static void main(String args[]) {  
      dynamoDBClient.setEndpoint("InsertDbEndpointHere");   
      streamsClient.setEndpoint("InsertStreamEndpointHere");    
      
      // table creation 
      String tableName = "MyTestingTable";  
      ArrayList<AttributeDefinition> attributeDefinitions =  
         new ArrayList<AttributeDefinition>();  
      
      attributeDefinitions.add(new AttributeDefinition()
         .withAttributeName("ID") 
         .withAttributeType("N"));
         
      ArrayList<KeySchemaElement> keySchema = new 
         ArrayList<KeySchemaElement>(); 
      
      keySchema.add(new KeySchemaElement() 
         .withAttributeName("ID") 
         .withKeyType(KeyType.HASH));                       //Partition key

      StreamSpecification streamSpecification = new StreamSpecification(); 
      streamSpecification.setStreamEnabled(true); 
      streamSpecification.setStreamViewType(StreamViewType.NEW_AND_OLD_IMAGES);  
      CreateTableRequest createTableRequest = new CreateTableRequest() 
         .withTableName(tableName) 
         .withKeySchema(keySchema) 
         .withAttributeDefinitions(attributeDefinitions) 
         .withProvisionedThroughput(new ProvisionedThroughput() 
         .withReadCapacityUnits(1L) 
         .withWriteCapacityUnits(1L))
         .withStreamSpecification(streamSpecification);  
      
      System.out.println("Executing CreateTable for " + tableName); 
      dynamoDBClient.createTable(createTableRequest);  
      System.out.println("Creating " + tableName); 
      
      try { 
         Tables.awaitTableToBecomeActive(dynamoDBClient, tableName); 
      } catch (InterruptedException e) { 
         e.printStackTrace(); 
      } 
         
      // Get the table's stream settings 
      DescribeTableResult describeTableResult =
         dynamoDBClient.describeTable(tableName);  
      
      String myStreamArn = describeTableResult.getTable().getLatestStreamArn(); 
      StreamSpecification myStreamSpec =  
         describeTableResult.getTable().getStreamSpecification();  
      
      System.out.println("Current stream ARN for " + tableName + ": "+ myStreamArn);
      System.out.println("Stream enabled: "+ myStreamSpec.getStreamEnabled()); 
      System.out.println("Update view type: "+ myStreamSpec.getStreamViewType());  
      
      // Add an item 
      int numChanges = 0; 
      System.out.println("Making some changes to table data"); 
      Map<String, AttributeValue> item = new HashMap<String, AttributeValue>(); 
      item.put("ID", new AttributeValue().withN("222")); 
      item.put("Alert", new AttributeValue().withS("item!")); 
      dynamoDBClient.putItem(tableName, item); 
      numChanges++;  
      
      // Update the item         
      Map<String, AttributeValue> key = new HashMap<String, AttributeValue>(); 
      key.put("ID", new AttributeValue().withN("222")); 
      Map<String, AttributeValueUpdate> attributeUpdates =  
      new HashMap<String, AttributeValueUpdate>(); 
      
      attributeUpdates.put("Alert", new AttributeValueUpdate() 
         .withAction(AttributeAction.PUT) 
         .withValue(new AttributeValue().withS("modified item"))); 
      
      dynamoDBClient.updateItem(tableName, key, attributeUpdates); 
      numChanges++;   
      
      // Delete the item         
      dynamoDBClient.deleteItem(tableName, key);  
      numChanges++;
      
      // Get stream shards         
      DescribeStreamResult describeStreamResult =  
      streamsClient.describeStream(new DescribeStreamRequest() 
         .withStreamArn(myStreamArn)); 
      String streamArn =  
         describeStreamResult.getStreamDescription().getStreamArn(); 
      List<Shard> shards =  
         describeStreamResult.getStreamDescription().getShards();  
      
      // Process shards 
      for (Shard shard : shards) { 
         String shardId = shard.getShardId(); 
         System.out.println("Processing " + shardId + " in "+ streamArn);  
         
         // Get shard iterator 
         GetShardIteratorRequest getShardIteratorRequest = new 
            GetShardIteratorRequest() 
            .withStreamArn(myStreamArn) 
            .withShardId(shardId) 
            .withShardIteratorType(ShardIteratorType.TRIM_HORIZON); 
         
         GetShardIteratorResult getShardIteratorResult =  
            streamsClient.getShardIterator(getShardIteratorRequest); 
         String nextItr = getShardIteratorResult.getShardIterator();  
         
         while (nextItr != null && numChanges > 0) { 
            // Read data records with iterator                 
            GetRecordsResult getRecordsResult =  
               streamsClient.getRecords(new GetRecordsRequest(). 
               withShardIterator(nextItr));
               
            List<Record> records = getRecordsResult.getRecords(); 
            System.out.println("Pulling records...");  
               
            for (Record record : records) { 
               System.out.println(record); 
               numChanges--;
            } 
            nextItr = getRecordsResult.getNextShardIterator(); 
         } 
      } 
   } 
}
廣告
© . All rights reserved.