Hazelcast - Map、Reduce 和聚合



當您擁有大量資料並且需要多臺機器(即分散式環境)來計算資料時,MapReduce 是一種計算模型,對資料處理很有用。它涉及將資料“對映”到成對的關鍵-值然後“縮減”(即對這些鍵進行分組並對值執行操作)。

鑑於 Hazelcast 是在考慮分散式環境的情況下進行設計的,因此天然就包含實施 Map-Reduce 框架。

讓我們看一個示例來說明如何做到這一點。

例如,假設我們有汽車(品牌和車牌號)及其車主的資訊。

Honda-9235, John
Hyundai-235, Alice
Honda-935, Bob
Mercedes-235, Janice
Honda-925, Catnis
Hyundai-1925, Jane

現在,我們需要找出每種品牌(即現代、本田等)的汽車數量。

示例

讓我們嘗試使用 MapReduce 來找出其結果:

package com.example.demo;

import java.lang.reflect.Array;
import java.util.ArrayList;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicInteger;

import com.hazelcast.core.Hazelcast;
import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.core.ICompletableFuture;
import com.hazelcast.core.IMap;
import com.hazelcast.mapreduce.Context;
import com.hazelcast.mapreduce.Job;
import com.hazelcast.mapreduce.JobTracker;
import com.hazelcast.mapreduce.KeyValueSource;
import com.hazelcast.mapreduce.Mapper;
import com.hazelcast.mapreduce.Reducer;
import com.hazelcast.mapreduce.ReducerFactory;

public class MapReduce {
   public static void main(String[] args) throws ExecutionException,
   InterruptedException {
      try {
         // create two Hazelcast instances
         HazelcastInstance hzMember = Hazelcast.newHazelcastInstance();
         Hazelcast.newHazelcastInstance();
         IMap<String, String> vehicleOwnerMap=hzMember.getMap("vehicleOwnerMap");
         vehicleOwnerMap.put("Honda-9235", "John");
         vehicleOwnerMap.putc"Hyundai-235", "Alice");
         vehicleOwnerMap.put("Honda-935", "Bob");
         vehicleOwnerMap.put("Mercedes-235", "Janice");
         vehicleOwnerMap.put("Honda-925", "Catnis");
         vehicleOwnerMap.put("Hyundai-1925", "Jane");
         KeyValueSource<String, String> kvs=KeyValueSource.fromMap(vehicleOwnerMap);
         JobTracker tracker = hzMember.getJobTracker("vehicleBrandJob");
         Job<String, String> job = tracker.newJob(kvs);
         ICompletableFuture<Map<String, Integer>> myMapReduceFuture =
            job.mapper(new BrandMapper())
            .reducer(new BrandReducerFactory()).submit();
         Map<String, Integer&g; result = myMapReduceFuture.get();
         System.out.println("Final output: " + result);
      } finally {
         Hazelcast.shutdownAll();
      }
   }
   private static class BrandMapper implements Mapper<String, String, String, Integer> {
      @Override
      public void map(String key, String value, Context<String, Integer>
      context) {
         context.emit(key.split("-", 0)[0], 1);
      }
   }
   private static class BrandReducerFactory implements ReducerFactory<String, Integer, Integer> {
      @Override
      public Reducer<Integer, Integer> newReducer(String key) {
         return new BrandReducer();
      }
   }
   private static class BrandReducer extends Reducer<Integer, Integer> {
      private AtomicInteger count = new AtomicInteger(0);
      @Override
      public void reduce(Integer value) {
         count.addAndGet(value);
      }
      @Override
      public Integer finalizeReduce() {
         return count.get();
      }
   }
}

我們來了解一下這段程式碼:

  • 我們建立了 Hazelcast 成員。在示例中,我們只有一個成員,但還可以有許多個成員。
  • 我們使用假資料建立了一個對映並從中建立了一個鍵-值儲存。

  • 我們建立了一個 Map-Reduce 任務,並要求其將鍵-值儲存用作資料。

  • 然後,我們向叢集提交任務並等待其完成。

  • 對映器建立一個鍵,即從原始鍵中提取品牌資訊並將值設定為 1,然後將其資訊作為 K-V 傳送到還原器。

  • 還原器根據鍵(即品牌名稱)對資料進行分組,然後對值進行簡單求和。

輸出

程式碼的輸出為:

Final output: {Mercedes=1, Hyundai=2, Honda=3}
廣告