1. 程式人生 > >Storm入門之第6章一個實際的例子

Storm入門之第6章一個實際的例子

本章要闡述一個典型的網路分析解決方案,而這類問題通常利用Hadoop批處理作為解決方案。與Hadoop不同的是,基於Storm的方案會實時輸出結果。

我們的這個例子有三個主要元件(見圖6-1)

  • 一個基於Node.js的web應用,用於測試系統
  • 一個Redis伺服器,用於持久化資料
  • 一個Storm拓撲,用於分散式實時處理資料

架構概覽

圖6-1  架構概覽

NOTE:你如果想先把這個例子執行起來,請首先閱讀附錄C

我們已經偽造了簡單的電子商務網站。這個網站只有三個頁面:一個主頁、一個產品頁和一個產品統計頁面。這個應用基於ExpressSocket.io兩個框架實現了向瀏覽器推送內容更新。製作這個應用的目的是為了讓你體驗Storm叢集功能並看到處理結果,但它不是本書的重點,所以我們不會對它的頁面做更詳細描述。

主頁

這個頁面提供了全部有效產品的連結。它從Redis伺服器獲取資料並在頁面上把它們顯示出來。這個頁面的URL是http://localhost:3000/。(見圖6-2,譯者注,圖6-2翻譯如下,全是文字就不製圖了)

有效產品:

DVD播放器(帶環繞立體聲系統)

全高清藍光dvd播放器

媒體播放器(帶USB 2.0介面)

全高清攝像機

防水高清攝像機

防震防水高清攝像機

反射式攝像機

雙核安卓智慧手機(帶64GB SD卡)

普通行動電話

衛星電話

64GB SD卡

32GB SD卡

16GB SD卡

粉紅色智慧手機殼

黑色智慧手機殼

小山羊皮智慧手機殼

圖6-2 首頁

產品頁

產品頁用來顯示指定產品的相關資訊,例如,價格、名稱、分類。這個頁面的URL是:http://localhost:3000/product/:id。(見圖6-3,譯者注:全是文字不再製圖,翻譯如下)

產品頁:32英寸液晶電視

分類:電視機

價格:400

相關分類

圖6-3,產品頁

產品統計頁

這個頁面顯示通過收集使用者瀏覽站點,用Storm叢集計算的統計資訊。可以顯示為如下概要:瀏覽這個產品的使用者,在那些分類下面瀏覽了n次產品。該頁的URL是:http://localhost:3000/product/:id/stats。(見圖6-4,譯者注:全是文字,不再製圖,翻譯如下)

瀏覽了該產品的使用者也瀏覽了以下分類的產品:

1.攝像機

2.播放器

3.手機殼

4.儲存卡

圖6-4. 產品統計檢視

啟動這個Node.js web應用

首先啟動Redis伺服器,然後執行如下命令啟動web應用:

    node webapp/app.js

為了向你演示,這個應用會自動向Redis填充一些產品資料作為樣本。

Storm拓撲

為這個系統搭建Storm拓撲的目標是改進產品統計的實時性。產品統計頁顯示了一個分類計數器列表,用來顯示訪問了其它同類產品的使用者數。這樣可以幫助賣家瞭解他們的使用者需求。拓撲接收瀏覽日誌,並更新產品統計結果(見圖6-5)。

圖6-5  Storm拓撲的輸入與輸出

我們的Storm拓撲有五個元件:一個spout向拓撲提供資料,四個bolt完成統計任務。

UsersNavigationSpout

從使用者瀏覽資料佇列讀取資料傳送給拓撲

GetCategoryBolt

從Redis伺服器讀取產品資訊,向資料流新增產品分類

UserHistoryBolt

讀取使用者以前的產品瀏覽記錄,向下一步分發Product:Category鍵值對,在下一步更新計數器

ProductCategoriesCounterBolt

追蹤使用者瀏覽特定分類下的商品次數

NewsNotifierBolt

通知web應用立即更新使用者介面

下圖展示了拓撲的工作方式(見圖6-6)

package storm.analytics;
...
public class TopologyStarter {
    public static void main(String[] args) {
        Logger.getRootLogger().removeAllAppenders();
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("read-feed", new UsersNavigationSpout(),3);
        builder.setBolt("get-categ", new GetCategoryBolt(),3)
               .shuffleGrouping("read-feed");
        builder.setBolt("user-history", new UserHistoryBolt(),5)
               .fieldsGrouping("get-categ", new Fields("user"));
        builder.setBolt("product-categ-counter", new ProductCategoriesCounterBolt(),5)
               .fieldsGrouping("user-history", new Fields("product"));
        builder.setBolt("news-notifier", new NewsNotifierBolt(),5)
               .shuffleGrouping("product-categ-counter");

        Config conf = new Config();
        conf.setDebug(true);
        conf.put("redis-host",REDIS_HOST);
        conf.put("redis-port",REDIS_PORT);
        conf.put("webserver", WEBSERVER);

        LocalCluster cluster = new LocalCluster();
        cluster.submitTopology("analytics", conf, builder.createTopology());
    }
}

Storm topology
Figure 6-6 Storm拓撲

UsersNavigationSpout

UsersNavigationSpout負責向拓撲提供瀏覽資料。每條瀏覽資料都是一個使用者瀏覽過的產品頁的引用。它們都被web應用儲存在Redis伺服器。我們一會兒就要看到更多資訊。

NOTE:下面的程式碼塊就是相關程式碼。

package storm.analytics;
public class UsersNavigationSpout extends BaseRichSpout {
    Jedis jedis;

    ...

    @Override
    public void nextTuple() {
        String content = jedis.rpop("navigation");
        if(content==null || "nil".equals(content)){
            try { Thread.sleep(300); } catch (InterruptedException e) {}
        } else {
            JSONObject obj=(JSONObject)JSONValue.parse(content);
            String user = obj.get("user").toString();
            String product = obj.get("product").toString();
            String type = obj.get("type").toString();
            HashMap<String, String> map = new HashMap<String, String>();
            map.put("product", product);
            NavigationEntry entry = new NavigationEntry(user, type, map);
            collector.emit(new Values(user, entry));
        }
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("user", "otherdata"));
    }
}

spout首先呼叫jedis.rpop(“navigation”)從Redis刪除並返回”navigation”列表最右邊的元素。如果列表已經是空的,就休眠0.3秒,以免使用忙等待迴圈阻塞伺服器。如果得到一條資料(資料是JSON格式),就解析它,並建立一個包含該資料的NavigationEntry POJO:

  • 瀏覽頁面的使用者
  • 使用者瀏覽的頁面型別
  • 由頁面型別決定的額外頁面資訊。“產品”頁的額外資訊就是使用者瀏覽的產品ID。

spout呼叫collector.emit(new Values(user, entry))分發包含這些資訊的元組。這個元組的內容是拓撲裡下一個bolt的輸入。

GetCategoryBolt

這個bolt非常簡單。它只負責反序列化前面的spout分發的元組內容。如果這是產品頁的資料,就通過ProductsReader類從Redis讀取產品資訊,然後基於輸入的元組再分發一個新的包含具體產品資訊的元組:

  • 使用者
  • 產品
  • 產品類別
package storm.analytics;

public class GetCategoryBolt extends BaseBasicBolt {
    private ProductReader reader;

    ...
    @Override
    public void execute(Tuple input, BasicOutputCollector collector) {
        NavigationEntry entry = (NavigationEntry)input.getValue(1);
        if("PRODUCT".equals(entry.getPageType())){
            try {
                String product = (String)entry.getOtherData().get("product");

                //呼叫產品條目API,得到產品資訊
                Product itm = reader.readItem(product);
                if(itm == null) {
                    return;
                }
                String categ = itm.getCategory();
                collector.emit(new Values(entry.getUserId(), product, categ));
            } catch (Exception ex) {
                System.err.println("Error processing PRODUCT tuple"+ ex);
                ex.printStackTrace();
            }
        }
    }
    ...
}

正如前面所提到的, 使用ProductsReader類讀取產品具體資訊。

package storm.analytics.utilities;
...
public class ProductReader {
    ...
    public Product readItem(String id) throws Exception{
        String content = jedis.get(id);
        if(content == null || ("nil".equals(content))){
            return null;
        }
        Object obj = JSONValue.parse(content);
        JSONObjectproduct = (JSONObject)obj;
        Product i = new Product((Long)product.get("id"),
                                (String)product.get("title"),
                                (Long)product.get("price"),
                                (String)product.get("category"));
        return i;
    }
    ...
}

UserHistoryBolt

UserHistoryBolt是整個應用的核心。它負責持續追蹤每個使用者瀏覽過的產品,並決定應當增加計數的鍵值對。

我們使用Redis儲存使用者的產品瀏覽歷史,同時基於效能方面的考慮,還應該保留一份本地副本。我們把資料訪問細節隱藏在方法getUserNavigationHistory(user)addProductToHistory(user,prodKey)裡,分別用來讀/寫訪問。它們的實現如下

package storm.analytics;
...
public class UserHistoryBolt extends BaseRichBolt{
    @Override
    public void execute(Tuple input) {
        String user = input.getString(0);
        String prod1 = input.getString(1);
        String cat1 = input.getString(2);

        //產品鍵嵌入了產品類別資訊
        String prodKey = prod1+":"+cat1;

        Set productsNavigated = getUserNavigationHistory(user);

        //如果使用者以前瀏覽過->忽略它
        if(!productsNavigated.contains(prodKey)) {
            //否則更新相關條目
            for (String other : productsNavigated) {
                String[] ot = other.split(":");
                String prod2 = ot[0];
                String cat2 = ot[1];
                collector.emit(new Values(prod1, cat2));
                collector.emit(new Values(prod2, cat1));
            }
            addProductToHistory(user, prodKey);
        }
    }
}

需要注意的是,這個bolt的輸出是那些類別計數應當獲得增長的產品。

看一看程式碼。這個bolt維護著一組被每個使用者瀏覽過的產品。值得注意的是,這個集合包含產品:類別鍵值對,而不是隻有產品。這是因為你會在接下來的呼叫中用到類別資訊,而且這樣也比每次從資料庫獲取更高效。這樣做的原因是基於以下考慮,產品可能只有一個類別,而且它在整個產品的生命週期當中不會改變。

讀取了使用者以前瀏覽過的產品集合之後(以及它們的類別),檢查當前產品以前有沒有被瀏覽過。如果瀏覽過,這條瀏覽資料就被忽略了。如果這是首次瀏覽,遍歷使用者瀏覽歷史,並執行collector.emit(new Values(prod1,cat2))分發一個元組,這個元組包含當前產品和所有瀏覽歷史類別。第二個元組包含所有瀏覽歷史產品和當前產品類別,由collectior.emit(new Values(prod2,cat1))。最後,將當前產品和它的類別新增到集合。

比如,假設使用者John有以下瀏覽歷史:

下面是將要處理的瀏覽資料

該使用者沒有瀏覽過產品8,因此你需要處理它。

因此要分發以下元組:

注意,左邊的產品和右邊的類別之間的關係應當作為一個整體遞增。

現在,讓我們看看這個Bolt用到的持久化實現。

public class UserHistoryBolt extends BaseRichBolt{
    ...
    private Set getUserNavigationHistory(String user) {
        Set userHistory = usersNavigatedItems.get(user);
        if(userHistory == null) {
            userHistory = jedis.smembers(buildKey(user));
            if(userHistory == null)
                userHistory = new HashSet();
            usersNavigatedItems.put(user, userHistory);
        }
        return userHistory;
    }
    private void addProductToHistory(String user, String product) {
        Set userHistory = getUserNavigationHistory(user);
        userHistory.add(product);
        jedis.sadd(buildKey(user), product);
    }
    ...
}

getUserNavigationHistory方法返回使用者瀏覽過的產品集。首先,通過usersNavigatedItems.get(user)方法試圖從本地記憶體得到使用者瀏覽歷史,否則,使用jedis.smembers(buildKey(user))從Redis伺服器獲取,並把資料新增到本地資料結構usersNavigatedItems

當用戶瀏覽一個新產品時,呼叫addProductToHistory,通過userHistory.add(product)jedis.sadd(buildKey(user),product)同時更新記憶體資料結構和Redis伺服器。

需要注意的是,當你需要做並行化處理時,只要bolt在記憶體中維護著使用者資料,你就得首先通過使用者做域資料流分組(譯者注:原文是fieldsGrouping,詳細情況請見第三章的域資料流組),這是一件很重要的事情,否則叢集內將會有使用者瀏覽歷史的多個不同步的副本。

ProductCategoriesCounterBolt

該類持續追蹤所有的產品-類別關係。它通過由UsersHistoryBolt分發的產品-類別資料對更新計數。

每個資料對的出現次數儲存在Redis伺服器。基於效能方面的考慮,要使用一個本地讀寫快取,通過一個後臺執行緒向Redis傳送資料。

Bolt會向拓撲的下一個Bolt——NewsNotifierBolt——傳送包含最新記數的元組,這也是最後一個Bolt,它會向終端使用者廣播實時更新的資料。

public class ProductCategoriesCounterBolt extends BaseRichBolt {
    ...
    @Override
    public void execute(){
        String product = input.getString(0);
        String categ = input.getString(1);
        int total = count(product, categ);
        collector.emit(new Values(product, categ, total));
    }
    ...
    private int count(String product, String categ) {
        int count = getProductCategoryCount(categ, product);
        count++;
        storeProductCategoryCount(categ, product, count);
        return count;
    }
    ...
}

這個bolt的持久化工作隱藏在getProductCategoryCountstoreProductCategoryCount兩個方法中。它們的具體實現如下:

package storm.analytics;
...
public class ProductCategoriesCounterBolt extends BaseRichBolt {
    // 條目:分類 -> 計數
    HashMap<String,Integer> counter = new HashMap<String, Integer>();

    //條目:分類 -> 計數
    HashMap<String,Integer> pendingToSave = new HashMap<String,Integer>();

    ...
    public int getProductCategoryCount(String categ, String product) {
        Integer count = counter.get(buildLocalKey(categ, product));
        if(count == null) {
            String sCount = jedis.hget(buildRedisKey(product), categ);
            if(sCount == null || "nil".equals(sCount)) {
                count = 0;
            } else {
                count = Integer.valueOf(sCount);
            }
        }
        return count;
    }
    ...
    private void storeProductCategoryCount(String categ, String product, int count) {
        String key = buildLocalKey(categ, product);
        counter.put(key, count);
        synchronized (pendingToSave) {
            pendingToSave.put(key, count);
        }
    }
    ...
}

方法getProductCategoryCount首先檢查記憶體快取計數器。如果沒有有效令牌,就從Redis伺服器取得資料。

方法storeProductCategoryCount更新計數器快取和pendingToSae緩衝。緩衝資料由下述後臺執行緒持久化。

package storm.analytics;

public class ProductCategoriesCounterBolt extends BaseRichBolt {
...
    private void startDownloaderThread() {
        TimerTask t = startDownloaderThread() {
            @Override
            public void run () {
                HashMap<String, Integer> pendings;
                synchronized (pendingToSave) {
                    pendings = pendingToSave;
                    pendingToSave = new HashMap<String,Integer>();
                }

                for (String key : pendings.keySet) {
                    String[] keys = key.split(":");
                    String product = keys[0];
                    String categ = keys[1];
                    Integer count = pendings.get(key);
                    jedis.hset(buildRedisKey(product), categ, count.toString());
                }
            }
        };
        timer = new Timer("Item categories downloader");
        timer.scheduleAtFixedRate(t, downloadTime, downloadTime);
    }
    ...
}

下載執行緒鎖定pendingToSave, 向Redis傳送資料時會為其它執行緒建立一個新的緩衝。這段程式碼每隔downloadTime毫秒執行一次,這個值可由拓撲配置引數download-time配置。download-time值越大,寫入Redis的次數就越少,因為一對資料的連續計數只會向Redis寫一次。

NewsNotifierBolt

為了讓使用者能夠實時檢視統計結果,由NewsNotifierBolt負責向web應用通知統計結果的變化。通知機制由Apache HttpClient通過HTTP POST訪問由拓撲配置引數指定的URL。POST訊息體是JSON格式。

測試時把這個bolt從拜年中刪除。

package storm.analytics;
...
public class NewsNotifierBolt extends BaseRichBolt {
...
@Override
public void execute(Tuple input) {
String product = input.getString(0);
String categ = input.getString(1);
int visits = input.getInteger(2);</code>

String content = "{\"product\":\"+product+"\",\"categ\":\""+categ+"\",\"visits\":"+visits+"}";
HttpPost post = new HttpPost(webserver);
try {
post.setEntity(new StringEntity(content));
HttpResponse response = client.execute(post);
org.apache.http.util.EntityUtils.consume(response.getEntity());
} catch (Exception e) {
e.printStackTrace();
reconnect();
}
}
...
}

Redis伺服器

Redis是一種先進的、基於記憶體的、支援持久化的鍵值儲存(見http://redis.io)。本例使用它儲存以下資訊:

  • 產品資訊,用來為web站點服務
  • 使用者瀏覽佇列,用來為Storm拓撲提供資料
  • Storm拓撲的中間資料,用於拓撲發生故障時恢復資料
  • Storm拓撲的處理結果,也就是我們期望得到的結果。

產品資訊

Redis伺服器以產品ID作為鍵,以JSON字串作為值儲存著產品資訊。

redis-cli
redis 127.0.0.1:6379&gt; get 15
"{\"title\":\"Kids smartphone cover\",\"category\":\"Covers\",\"price\":30,\"id\":
15}"

使用者瀏覽佇列

使用者瀏覽佇列儲存在Redis中一個鍵為navigation的先進先出佇列中。使用者瀏覽一個產品頁時,伺服器從佇列左側新增使用者瀏覽資料。Storm叢集不斷的從佇列右側獲取並移除資料。

redis 127.0.0.1:6379&gt; llen navigation
(integer) 5
redis 127.0.0.1:6379&gt; lrange navigation 0 4
1) "{\"user\":\"59c34159-0ecb-4ef3-a56b-99150346f8d5\",\"product\":\"1\",\"type\":
\"PRODUCT\"}"
2) "{\"user\":\"59c34159-0ecb-4ef3-a56b-99150346f8d5\",\"product\":\"1\",\"type\":
\"PRODUCT\"}"
3) "{\"user\":\"59c34159-0ecb-4ef3-a56b-99150346f8d5\",\"product\":\"2\",\"type\":
\"PRODUCT\"}"
4) "{\"user\":\"59c34159-0ecb-4ef3-a56b-99150346f8d5\",\"product\":\"3\",\"type\":
\"PRODUCT\"}"
5) "{\"user\":\"59c34159-0ecb-4ef3-a56b-99150346f8d5\",\"product\":\"5\",\"type\":
\"PRODUCT\"}"

中間資料

叢集需要分開儲存每個使用者的歷史資料。為了實現這一點,它在Redis伺服器上儲存著一個包含所有使用者瀏覽過的產品和它們的分類的集合。

redis 127.0.0.1:6379&gt; smembers history:59c34159-0ecb-4ef3-a56b-99150346f8d5
1) "1:Players"
2) "5:Cameras"
3) "2:Players"
4) "3:Cameras"

結果

Storm叢集生成關於使用者瀏覽的有用資料,並把它們的產品ID儲存在一個名為“prodcnt”的Redis hash中。

redis 127.0.0.1:6379&gt; hgetall prodcnt:2
1) "Players"
2) "1"
3) "Cameras"
4) "2"

測試拓撲

使用LocalCluster和一個本地Redis伺服器執行測試(見圖6-7)。向Redis填充產品資料,偽造訪問日誌。我們的斷言會在讀取拓撲向Redis輸出的資料時執行。測試使用者用java和groovy完成。
測試架構
圖6-7. 測試架構

初始化測試

初始化由以下三步組成:
啟動LocalCluster並提交拓撲。初始化在AbstractAnalyticsTest實現,所有測試用例都繼承該類。當初始化多個AbstractAnalyticsTest子類物件時,由一個名為topologyStarted的靜態標誌屬性確定初始化工作只會進行一次。

需要注意的是,sleep語句是為了確保在試圖獲取結果之前LocalCluster已經正確啟動了。

public abstract class AbstractAnalyticsTest extends Assert {
 def jedis
 static topologyStarted = false
 static sync= new Object()
 private void reconnect() {
 jedis = new Jedis(TopologyStarter.REDIS_HOST, TopologyStarter.REDIS_PORT)
 }
 @Before
 public void startTopology(){
 synchronized(sync){
 reconnect()
 if(!topologyStarted){
 jedis.flushAll()
 populateProducts()
 TopologyStarter.testing = true
 TopologyStarter.main(null)
 topologyStarted = true
 sleep 1000
 }
 }
 }
 ...
 public void populateProducts() {
 def testProducts = [
 [id: 0, title:"Dvd player with surround sound system",
 category:"Players", price: 100],
 [id: 1, title:"Full HD Bluray and DVD player",
 category:"Players", price:130],
 [id: 2, title:"Media player with USB 2.0 input",
 category:"Players", price:70],
 ...
 [id: 21, title:"TV Wall mount bracket 50-55 Inches",
 category:"Mounts", price:80]
 ]
 testProducts.each() { product -&gt;
 def val =
 "{ \"title\": \"${product.title}\" , \"category\": \"${product.category}\"," +
 " \"price\": ${product.price}, \"id\": ${product.id} }"
 println val
 jedis.set(product.id.toString(), val.toString())
 }
 }
 ...
}

在AbstractAnalyticsTest中實現一個名為navigate的方法。為了測試不同的場景,我們要模擬使用者瀏覽站點的行為,這一步向Redis的瀏覽佇列(譯者注:就是前文提到的鍵是navigation的佇列)插入瀏覽資料。

public abstract class AbstractAnalyticsTest extends Assert {
 ...
public void navigate(user, product) {
 String nav =
 "{\"user\": \"${user}\", \"product\": \"${product}\", \"type\": \"PRODUCT\"}".toString()
 println "Pushing navigation: ${nav}"
 jedis.lpush('navigation', nav)
 }
 ...
}

實現一個名為getProductCategoryStats的方法,用來讀取指定產品與分類的資料。不同的測試同樣需要斷言統計結果,以便檢查拓撲是否按照期望的那樣執行了。

public abstract class AbstractAnalyticsTest extends Assert {
 ...
 public int getProductCategoryStats(String product, String categ) {
 String count = jedis.hget("prodcnt:${product}", categ)
 if(count == null || "nil".equals(count))
 return 0
 return Integer.valueOf(count)
 }
 ...
}

一個測試用例

下一步,為使用者“1”模擬一些瀏覽記錄,並檢查結果。注意執行斷言之前要給系統留出兩秒鐘處理資料。(記住ProductCategoryCounterBolt維護著一份計數的本地副本,它是在後臺非同步儲存到Redis的。)

package functional
class StatsTest extends AbstractAnalyticsTest {
 @Test
 public void testNoDuplication(){
     navigate("1", "0") // Players
     navigate("1", "1") // Players
     navigate("1", "2") // Players
     navigate("1", "3") // Cameras
     Thread.sleep(2000) // Give two seconds for the system to process the data.
     assertEquals 1, getProductCategoryStats("0", "Cameras")
     assertEquals 1, getProductCategoryStats("1", "Cameras")
     assertEquals 1, getProductCategoryStats("2", "Cameras")
     assertEquals 2, getProductCategoryStats("0", "Players")
     assertEquals 3, getProductCategoryStats("3", "Players")
 }
}

對可擴充套件性和可用性的提示

為了能在一章的篇幅中講明白整個方案,它已經被簡化了。正因如此,一些與可擴充套件性和可用性有關的必要複雜性也被去掉了。這方面主要有兩個問題。
Redis伺服器不只是一個故障的節點,還是效能瓶頸。你能接收的資料最多就是Redis能處理的那些。Redis可以通過分片增強擴充套件性,它的可用性可以通過主從配置得到改進。這都需要修改拓撲和web應用的程式碼實現。
另一個缺點就是web應用不能通過增加伺服器成比例的擴充套件。這是因為當產品統計資料發生變化時,需要通知所有關注它的瀏覽器。這一“通知瀏覽器”的機制通過Socket.io實現,但是它要求監聽器和通知器在同一主機上。這一點只有當GET /product/:id/statsPOST /news滿足以下條件時才能實現,那就是這二者擁有相同的分片標準,確保引用相同產品的請求由相同的伺服器處理。