1. 程式人生 > >大資料系列之實時處理Storm(二)Storm的本地模式,叢集模式

大資料系列之實時處理Storm(二)Storm的本地模式,叢集模式

我們通過編寫一個呼叫日誌的程式來熟悉一下Storm的本地模式和叢集模式,以及熟悉一下Storm的工作流程,主要實現的功能是統計一個電話給另外一個電話撥打的次數。

1.pom.xml

<dependency>
  <groupId>org.apache.storm</groupId>
  <artifactId>storm-core</artifactId>
  <version>1.1.0</version>
</dependency>

2.建立Spout

package com.storm;

import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.IRichSpout;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Random;

/**
 * @author 鄒培賢
 * @Title: ${file_name}
 * @Package ${package_name}
 * @Description: Spout類,負責產生資料流
 * @date 2018/7/2620:22
 */
public class CallLogSpout implements IRichSpout{
    //Spout輸出收集器
    private SpoutOutputCollector collector;
    //是否完成
    private boolean completed = false;
    //上下文
    private TopologyContext context;
    //隨機生成器
    private Random randomGenerator=new Random();
    private Integer idx=0;

    @Override
    public void open(Map map, TopologyContext topologyContext, SpoutOutputCollector spoutOutputCollector) {
        this.context=topologyContext;
        this.collector=spoutOutputCollector;
    }

    @Override
    public void close() {

    }

    @Override
    public void activate() {

    }

    @Override
    public void deactivate() {

    }
/**
  * @Description: 下一個元祖,用來產生資料
  * @param ${tags}
  * @return ${return_type}
  * @throws
  * @author 鄒培賢
  * @date 2018/7/26 20:27
  */
    @Override
    public void nextTuple() {
          if(this.idx <= 1000){
              List<String> mobileNumbers=new ArrayList<String>();
              mobileNumbers.add("1234123401");
              mobileNumbers.add("1234123402");
              mobileNumbers.add("1234123403");
              mobileNumbers.add("1234123404");
              Integer localIdx = 0;
              while(localIdx++ < 100 && this.idx++ <1000){
                  //主叫
                  String caller =mobileNumbers.get(randomGenerator.nextInt(4));
                  //被叫
                  String callee =mobileNumbers.get(randomGenerator.nextInt(4));
                  while(caller == callee){
                      //主叫被叫是同一個
                      //被叫
                       callee =mobileNumbers.get(randomGenerator.nextInt(4));
                  }
                  //模擬通話時長
                  Integer duration = randomGenerator.nextInt(60);
                  this.collector.emit(new Values(caller,callee,duration));

              }
          }
    }

    @Override
    public void ack(Object o) {

    }

    @Override
    public void fail(Object o) {

    }
/**
  * @Description: 定義輸出欄位的描述資訊
  * @param ${tags}
  * @return ${return_type}
  * @throws
  * @author 鄒培賢
  * @date 2018/7/26 20:34
  */
    @Override
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        outputFieldsDeclarer.declare(new Fields("from","to","duration"));
    }

    @Override
    public Map<String, Object> getComponentConfiguration() {
        return null;
    }
}

3.建立CreatorBolt

package com.storm;

import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.IRichBolt;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;

import java.util.Map;

/**
 * @author 鄒培賢
 * @Title: ${file_name}
 * @Package ${package_name}
 * @Description: 建立CallLog日誌的Bolt,用於處理Spout產生的資料
 * @date 2018/7/2620:35
 */
public class CallLogCreatorBolt implements IRichBolt{
    private OutputCollector collector;
    @Override
    public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
        this.collector=outputCollector;
    }
/**
  * @Description:處理Spout輸出的元組資料
  * @param ${tags}
  * @return ${return_type}
  * @throws
  * @author 鄒培賢
  * @date 2018/7/26 20:37
  */
    @Override
    public void execute(Tuple tuple) {
        System.out.println("CallLogCreatorBolt.execute");
        //處理通話記錄
        String from=tuple.getString(0);
        String to=tuple.getString(1);
        Integer duration=tuple.getInteger(2);
        //產生新的tuple
        collector.emit(new Values(from+"-"+to,duration));
    }

    @Override
    public void cleanup() {

    }
/**
  * @Description: 設定輸出欄位的描述
  * @param ${tags}
  * @return ${return_type}
  * @throws
  * @author 鄒培賢
  * @date 2018/7/26 20:40
  */
    @Override
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        outputFieldsDeclarer.declare(new Fields("call","duration"));
    }

    @Override
    public Map<String, Object> getComponentConfiguration() {
        return null;
    }
}

4.建立CounterBolt

package com.storm;

import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.IRichBolt;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;

import java.util.HashMap;
import java.util.Map;

/**
 * @author 鄒培賢
 * @Title: ${file_name}
 * @Package ${package_name}
 * @Description: 通話記錄計數器Bolt,處理上一個Blot輸出的資料
 * @date 2018/7/2620:42
 */
public class CallLogCounterBolt implements IRichBolt{
    private OutputCollector collector;
    private Map<String,Integer> counterMap;
    @Override
    public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
        this.counterMap = new HashMap<String, Integer>();
        this.collector=outputCollector;
    }
/**
  * @Description:處理資料
  * @param ${tags}
  * @return ${return_type}
  * @throws
  * @author 鄒培賢
  * @date 2018/7/26 20:46
  */
    @Override
    public void execute(Tuple tuple) {
        String call =tuple.getString(0);
        Integer duration = tuple.getInteger(1);
        if(!counterMap.containsKey(call)){
            counterMap.put(call,1);
        }else {
            Integer count = counterMap.get(call)+1;
            counterMap.put(call,count);
        }
        collector.ack(tuple);
    }
/**
  * @Description:Storm停掉之後執行
  * @param ${tags}
  * @return ${return_type}
  * @throws
  * @author 鄒培賢
  * @date 2018/7/26 20:49
  */
    @Override
    public void cleanup() {
        for(Map.Entry<String, Integer> entry:counterMap.entrySet()){
            System.out.println("============>"+entry.getKey()+":"+entry.getValue());
        }
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
      outputFieldsDeclarer.declare(new Fields("call"));
    }

    @Override
    public Map<String, Object> getComponentConfiguration() {
        return null;
    }
}

5.APP

  5.1本地模式:

package com.storm;

import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.StormSubmitter;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.tuple.Fields;

/**
 * @author 鄒培賢
 * @Title: ${file_name}
 * @Package ${package_name}
 * @Description: Stormd的Topology(由Spout + bolt連線在一起形成一個拓撲,
 * 形成有向圖,定點就是計算,邊是資料流。)
 * @date 2018/7/2620:52
 */
public class APP {
    public static void main(String[] args) throws Exception {
        TopologyBuilder builder=new TopologyBuilder();
        //設定Spout
        builder.setSpout("spout",new CallLogSpout());
        //設定creator-Bolt
        builder.setBolt("creator-bolt",new CallLogCreatorBolt()).shuffleGrouping("spout");
        //設定counter-Bolt
        builder.setBolt("counter-bolt",new CallLogCounterBolt()).fieldsGrouping("creator-bolt",new Fields("call"));
        Config conf=new Config();
        conf.setDebug(true);
        //本地模式Storm
         LocalCluster cluster=new LocalCluster();
         cluster.submitTopology("LogAnalyserStorm",conf,builder.createTopology());
         Thread.sleep(10000);
         //停止叢集檢視結果
         cluster.shutdown();
     
    }
}

注意:在本地模式除錯的時候移動要將log4j調成info,不然看不出效果

log4j如下:

log4j.rootLogger=info, stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d %p [%c] - %m%n

在本地模式下檢視結果的時候會出現一些錯誤提示,我們先不用管,我們先看一下結果,熟悉一下Storm的流程。

                     

 5.2叢集模式:

5.2.1程式碼:

package com.storm;

import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.StormSubmitter;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.tuple.Fields;

/**
 * @author 鄒培賢
 * @Title: ${file_name}
 * @Package ${package_name}
 * @Description: Stormd的Topology(由Spout + bolt連線在一起形成一個拓撲,
 * 形成有向圖,定點就是計算,邊是資料流。)
 * @date 2018/7/2620:52
 */
public class APP {
    public static void main(String[] args) throws Exception {
        TopologyBuilder builder=new TopologyBuilder();
        //設定Spout
        builder.setSpout("spout",new CallLogSpout());
        //設定creator-Bolt
        builder.setBolt("creator-bolt",new CallLogCreatorBolt()).shuffleGrouping("spout");
        //設定counter-Bolt
        builder.setBolt("counter-bolt",new CallLogCounterBolt()).fieldsGrouping("creator-bolt",new Fields("call"));
        Config conf=new Config();
        conf.setDebug(true);
        //叢集形式
        StormSubmitter.submitTopology("mytop", conf, builder.createTopology());
    }
}

5.2.2打成jar包

5.2.3叢集執行

storm jar /home/zpx/jar/hadoop.jar  com.storm.APP

                                  

5.2.4 webui檢視

5.2.5手動殺死程式

注意:我們之前說過,Storm一旦開啟Topology,就不會停止,一直在實時的執行,我們需要手動殺死,點選上圖的名叫mytop的Topology,進入檢視詳情

5.2.6檢視輸出的資料

cd /home/zpx/soft/storm/logs

看到有個如下所示的資料夾

點進進去

看到有個/mytop-xx/xx/worker.log

然後使用cat檢視一下資料

我們通過上面編寫程式,大體的瞭解了Storm工作的基本過程,類似於MapReduce,只不過我們在這裡編寫的是Spout和Bolt,然後在APP中將其組裝成一個Topology,放在叢集上執行。

相關推薦

資料系列實時計算Spark十三機器學習

1.機器學習簡介 機器學習可能是當下最火的話題了。之前我們所做的一些工作,比如說java開發,安卓等等,其實無非就是在來回的寫方法,呼叫方法,而機器學習說的通俗一點可能就是找函式。要知道,我們現在面對的是巨大的資料量,對於這麼多的資料量,我們不太可能找到一個描述資料的方法或

資料系列實時處理StormStorm本地模式叢集模式

我們通過編寫一個呼叫日誌的程式來熟悉一下Storm的本地模式和叢集模式,以及熟悉一下Storm的工作流程,主要實現的功能是統計一個電話給另外一個電話撥打的次數。 1.pom.xml <dependency> <groupId>org.apach

資料系列實時計算Spark十八Python生成圖表

1.啟動pyspark pyspark --master local[2] 2.

資料系列實時計算Spark十七Python與Hbase整合

1.準備工作(所用到的工具庫會放在最後供下載使用) 1.1.安裝thrift   cmd>pip install thrift   我使用的是Anaconda3,下載下來的包會存放到  /Lib/site-packages/目錄下,如果沒有使用Anaconda3,

資料系列Hadoop知識整理MapReduce的核心Shuffle詳解

1.MapReduce的核心之shuffle詳解 上一篇中我們介紹了MapReduce是什麼,以及MapReduce的執行過程,其中在執行過程中主要分為Map端與Reducer端,MapReduce計算模型主要完成了對映與化簡,在這其中,有一個最重要的過程那就是其核心——s

資料儲存分散式檔案系統

1.Google檔案系統(GFS) 使用一堆廉價的商用計算機支撐大規模資料處理。 GFSClient: 應用程式的訪問介面 Master(主控伺服器):管理節點,在邏輯上只有一個(還有一臺“影子伺服器“,在主控伺服器失效時提供元資料,但並不是完整的熱備伺服器),儲

linux 命令系列目錄處理命令2

mkdir :make directories 建立資料夾        -p 建立遞迴資料夾 eg: mkdir -p  /tmp/test/linux/aaa        建立多個資料夾 eg: mkdir /tmp/test1 /tmp/test2 /tmp/

Appium python自動化測試系列appium環境搭建

ftp 自動化 手動 文件 搭建環境 做到 安裝python reg 成員 ?2.1 基礎環境搭建 當我們學習新的一項技術開始基本都是從環境搭建開始,本書除了第一章節也是的,如果你連最基礎的環境都沒有那麽我們也沒必要去說太多,大概介紹一下: 1、因為appium是支持and

談談-Android-PickerView系列源碼解析

需求 動態 () comm tag 多個 來源 ntc 寬高 前言   WheelView想必大家或多或少都有一定了解, 它是一款3D滾輪控件,效果類似IOS 上面的UIpickerview 。按照國際慣例,先放一張效果圖:   以上是Android-PickerView

【Android 進階】仿抖音系列列表播放視訊

上一篇中,我們實現了仿抖音上下翻頁切換視訊的效果,詳見【Android 進階】仿抖音系列之翻頁上下滑切換視訊(一),這一篇,我們來實現抖音列表播放視訊。 【Android 進階】仿抖音系列之翻頁上下滑切換視訊(一) 【Android 進階】仿抖音系列之列表播放視訊(二)

Android訊息處理機制Handler的本質-Message和Looper到底是什麼?

目錄 Android之訊息處理機制(二) 以下皆為乾貨,比較幹,需要讀者細細理解。  前面(一)已經解釋了Handler的基本機制了,下面來概括一下本質。 一、MessageQueue        MessageQueue其實就

資料分析中的挖掘技術

我們在上一篇文章中給大家介紹了大資料分析技術、資料探勘的意義、資料探勘的技術以及方法還有機器學習的內容。一般來說,大資料分析中的挖掘技術都是比較重要的,在這篇文章中我們給大家介紹一下資料探勘的主要過程以及資料探勘的重點內容。 我們不只在一篇文章中提到過,資料探勘的內容是非常重要的,

資料專案經理成長攻略

在上一次《大資料專案經理成長攻略》文章實踐一段時間來,與很多圈內朋友進行了交流,對前面思考的體系進行了完善,有了新的感悟。   01 更新了知識技能體系 作為資料探勘的專案經理,主要需要具備以下的技能。 在這裡我還是要推薦下我自己建的大資料學習交流qq裙:&nb

給想進入資料行業的朋友的建議

我們在上一篇文章中給大家介紹了大資料中的各個環節的普及內容,大資料的環節有資料的收集、資料的傳輸、資料的轉換、資料的清洗、資料的儲存、資料的二次加工、資料的挖掘、資料的統計、資料的上層應用輸出。今天我們在這篇文章中給大家介紹一下大資料的第一個環節,那就是資料的收集。 在資料的收集階段,資料主要有4種存在

Laravel系列新建laravel專案

新建專案就比較簡單了 開啟git for windows工具; laravel new test(專案名) 專案安裝成功; 進入新建的專案資料夾中; 需要將laravel的依賴包下載下來 依賴包下載完成之後; php artisan key:generate 執行下此

J2EE系列Struts2學習筆記---使用get/set方法自動獲取/設定引數值

上一篇部落格中講了struts2的配置以及實現了一個簡單的helloWorld程式。這一片部落格講一下如何使用get/set函式來自動獲取/設定引數值。 1.開啟eclipse,新建一個名為SecondLearnStruts2Chap02的Dynamic Web Proje

資料結構紅黑樹——插入操作

插入或刪除操作,都有可能改變紅黑樹的平衡性,利用顏色變化與旋轉這兩大法寶就可應對所有情況,將不平衡的紅黑樹變為平衡的紅黑樹。 在進行顏色變化或旋轉的時候,往往要涉及祖孫三代節點:X表示操作的基準節點,P代表X的父節點,G代表X的父節點的父節點。 我們先來大體預覽一下插入的

J2EE系列Hibernate4學習筆記--Hibernate4增刪改查體驗

上一篇部落格中使用hibernate4成功建立了表t_student,並向這個表中添加了一個數據。現在來體驗hibernate4實現資料庫表的增刪改查操作。 一、HibernateUtil封裝 使用hibernate對資料庫操作的時候都要首先獲得一個SessionFacto

死磕 java同步系列ReentrantLock原始碼解析——條件鎖

問題 (1)條件鎖是什麼? (2)條件鎖適用於什麼場景? (3)條件鎖的await()是在其它執行緒signal()的時候喚醒的嗎? 簡介 條件鎖,是指在獲取鎖之後發現當前業務場景自己無法處理,而需要等待某個條件的出現才可以繼續處理時使用的一種鎖。 比如,在阻塞佇列中,當佇列中沒有元素的時候是無法彈出一個元素

Docker系列烹飪披薩

前言 上一篇我們講解了虛擬機器和容器的區別,本節我們來講講Docker中關於Dockerfile、映象、容器等基本概念。Docker是一個在容器內開發、部署、執行應用程式的平臺,Docker本質上是容器化的代名詞,容器對於提高軟體開發和資料科學的安全性,可重複性和可擴充套件性起到了重要作用,擁抱Docker