1. 程式人生 > >Storm的StreamID使用樣例(版本1.0.2)

Storm的StreamID使用樣例(版本1.0.2)

alt constant rate fields olt topology next blog for

隨手嘗試了一下StreamID的用法，留個筆記。

==數據樣例==

{
    "Address": "小橋鎮小橋中學對面",
    "CityCode": "511300",
    "CountyCode": "511322",
    "EnterpriseCode": "YUNDA",
    "MailNo": "667748320345",
    "Mobile": "183****5451",
    "Name": "王***",
    "ProvCode": "510000",
    "Weight": "39"
}

==拓撲結構==

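技術分享圖片

（圖片未能顯示時，可參考以下文字描述，對應下方 TestTopology 的設定：）

Spout1 --Stream1--> Count1 --Stream3--> Top
Spout1 --Stream2--> Count2 --Stream4--> Top
Spout1 --Stream2--> Count3 --Stream4--> Top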

==程序源碼==

<Spout1>

package test;

import com.alibaba.fastjson.JSONObject;
import common.constants.Constants;
import common.simulate.DataRandom;
import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;

import java.util.Map;

/**
 * Spout that emits the same JSON record on two named streams,
 * {@code "Stream1"} and {@code "Stream2"}, each with a single field {@code "json"}.
 *
 * <p>Records are obtained from {@link DataRandom#getRandomExpressData()}. The pause
 * between two emissions defaults to 1000 ms and can be overridden through the
 * topology configuration key {@code Constants.SpoutInterval}.
 */
public class Spout1 extends BaseRichSpout {
    private SpoutOutputCollector _collector = null;
    private DataRandom _dataRandom = null;
    /** Pause between two emissions, in milliseconds (passed to {@link Thread#sleep(long)}). */
    private int _timeInterval = 1000;

    /**
     * Declares the two output streams; both carry one field named {@code "json"}.
     *
     * @param declarer Storm declarer used to register the streams
     */
    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declareStream("Stream1", new Fields("json"));
        declarer.declareStream("Stream2", new Fields("json"));
    }

    /**
     * Stores the collector, acquires the data generator and reads the optional
     * emission interval from the configuration.
     *
     * @param conf      topology configuration
     * @param context   topology context (unused)
     * @param collector collector used by {@link #nextTuple()} to emit
     * @throws NumberFormatException if the configured interval is not a valid integer
     */
    @Override
    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
        _collector = collector;
        _dataRandom = DataRandom.getInstance();

        // A missing or null entry keeps the default interval.
        Object rawInterval = conf.get(Constants.SpoutInterval);
        if (rawInterval != null) {
            _timeInterval = parseInterval(rawInterval);
        }
    }

    /**
     * Sleeps for the configured interval, then emits one record on both streams.
     * If the thread is interrupted while sleeping, the interrupt flag is restored
     * and nothing is emitted for this call.
     */
    @Override
    public void nextTuple() {
        try {
            Thread.sleep(_timeInterval);
        } catch (InterruptedException e) {
            // Preserve the interruption for the caller instead of swallowing it.
            Thread.currentThread().interrupt();
            return;
        }

        JSONObject jsonObject = _dataRandom.getRandomExpressData();
        System.out.print("[---Spout1---]jsonObject=" + jsonObject + "\n");

        // Serialize once; both streams carry the identical payload.
        String json = jsonObject.toJSONString();
        _collector.emit("Stream1", new Values(json));
        _collector.emit("Stream2", new Values(json));
    }

    /**
     * Converts a configuration value to an int. Accepts numeric values as well as
     * their string form, since the value may arrive as either type.
     */
    private static int parseInterval(Object rawInterval) {
        if (rawInterval instanceof Number) {
            return ((Number) rawInterval).intValue();
        }
        return Integer.parseInt(rawInterval.toString().trim());
    }
}

<CountBolt1>

package test;

import com.alibaba.fastjson.JSONObject;
import common.constants.Constants;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;

import java.util.HashMap;
import java.util.Map;

/**
 * Bolt that keeps a running count per company and emits every updated count
 * on stream {@code "Stream3"} with the fields {@code ("company", "count")}.
 *
 * <p>The company is read from the {@code Constants.EnterpriseCode} entry of the
 * JSON document carried in the input field {@code "json"}. Counts are held in
 * an in-memory map of this bolt instance.
 */
public class CountBolt1 extends BaseRichBolt {
    private OutputCollector _collector = null;
    private int _taskId = 0;
    private Map<String, Integer> _map = new HashMap<>();

    /**
     * Declares the single output stream {@code "Stream3"}.
     *
     * @param declarer Storm declarer used to register the stream
     */
    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declareStream("Stream3", new Fields("company", "count"));
    }

    /**
     * Stores the collector and remembers this task's id for log output.
     *
     * @param stormConf topology configuration (unused)
     * @param context   topology context providing the task id
     * @param collector collector used to emit and ack
     */
    @Override
    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
        _collector = collector;
        _taskId = context.getThisTaskId();
    }

    /**
     * Increments the count of the tuple's company, emits the new count, acks the
     * input and logs the result to standard output.
     *
     * @param input tuple carrying a JSON string in field {@code "json"}
     */
    @Override
    public void execute(Tuple input) {
        String str = input.getStringByField("json");
        JSONObject jsonObject = JSONObject.parseObject(str);
        String company = jsonObject.getString(Constants.EnterpriseCode);

        Integer previous = _map.get(company);
        int count = (previous == null) ? 1 : previous + 1;
        _map.put(company, count);

        // The emitted tuple is not anchored to the input.
        _collector.emit("Stream3", new Values(company, count));
        // BaseRichBolt does not ack automatically, so the input is acked explicitly.
        _collector.ack(input);

        System.out.print("[---CountBolt1---]" +
                "taskId=" + _taskId + ", company=" + company + ", count=" + count + "\n");
    }
}

<CountBolt2>

package test;

import com.alibaba.fastjson.JSONObject;
import common.constants.Constants;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;

import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

/**
 * Bolt that keeps a running count per province code and emits every updated
 * count on stream {@code "Stream4"} with the fields
 * {@code ("prov", "count", "random")}.
 *
 * <p>The province is read from the {@code Constants.ProvCode} entry of the JSON
 * document carried in the input field {@code "json"}. The {@code "random"}
 * field is a freshly generated {@link UUID} per emitted tuple.
 */
public class CountBolt2 extends BaseRichBolt {
    private OutputCollector _collector = null;
    private int _taskId = 0;
    private Map<String, Integer> _map = new HashMap<>();

    /**
     * Stores the collector and remembers this task's id for log output.
     *
     * @param map             topology configuration (unused)
     * @param topologyContext topology context providing the task id
     * @param outputCollector collector used to emit and ack
     */
    @Override
    public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
        _collector = outputCollector;
        _taskId = topologyContext.getThisTaskId();
    }

    /**
     * Increments the count of the tuple's province, emits the new count together
     * with a random UUID, acks the input and logs the result to standard output.
     *
     * @param tuple tuple carrying a JSON string in field {@code "json"}
     */
    @Override
    public void execute(Tuple tuple) {
        String str = tuple.getStringByField("json");
        JSONObject jsonObject = JSONObject.parseObject(str);
        String prov = jsonObject.getString(Constants.ProvCode);

        Integer previous = _map.get(prov);
        int count = (previous == null) ? 1 : previous + 1;
        _map.put(prov, count);

        // The emitted tuple is not anchored to the input.
        _collector.emit("Stream4", new Values(prov, count, UUID.randomUUID()));
        // BaseRichBolt does not ack automatically, so the input is acked explicitly.
        _collector.ack(tuple);

        System.out.print("[---CountBolt2---]" +
                "taskId=" + _taskId + ", prov=" + prov + ", count=" + count + "\n");
    }

    /**
     * Declares the single output stream {@code "Stream4"}.
     *
     * @param outputFieldsDeclarer Storm declarer used to register the stream
     */
    @Override
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        outputFieldsDeclarer.declareStream("Stream4", new Fields("prov", "count", "random"));
    }
}

<CountBolt3>

package test;

import com.alibaba.fastjson.JSONObject;
import common.constants.Constants;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;

import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

/**
 * Bolt that keeps a running count per city code and emits every updated count
 * on stream {@code "Stream4"} with the fields {@code ("city", "count")}.
 *
 * <p>The city is read from the {@code Constants.CityCode} entry of the JSON
 * document carried in the input field {@code "json"}. Counts are held in an
 * in-memory map of this bolt instance.
 */
public class CountBolt3 extends BaseRichBolt {
    private OutputCollector _collector = null;
    private int _taskId = 0;
    private Map<String, Integer> _map = new HashMap<>();

    /**
     * Stores the collector and remembers this task's id for log output.
     *
     * @param map             topology configuration (unused)
     * @param topologyContext topology context providing the task id
     * @param outputCollector collector used to emit and ack
     */
    @Override
    public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
        _collector = outputCollector;
        _taskId = topologyContext.getThisTaskId();
    }

    /**
     * Increments the count of the tuple's city, emits the new count, acks the
     * input and logs the result to standard output.
     *
     * @param tuple tuple carrying a JSON string in field {@code "json"}
     */
    @Override
    public void execute(Tuple tuple) {
        String str = tuple.getStringByField("json");

        JSONObject jsonObject = JSONObject.parseObject(str);
        String city = jsonObject.getString(Constants.CityCode);

        Integer previous = _map.get(city);
        int count = (previous == null) ? 1 : previous + 1;
        _map.put(city, count);

        // The emitted tuple is not anchored to the input.
        _collector.emit("Stream4", new Values(city, count));
        // BaseRichBolt does not ack automatically, so the input is acked explicitly.
        _collector.ack(tuple);

        System.out.print("[---CountBolt3---]" +
                "taskId=" + _taskId + ", city=" + city + ", count=" + count + "\n");
    }

    /**
     * Declares the single output stream {@code "Stream4"}.
     *
     * @param outputFieldsDeclarer Storm declarer used to register the stream
     */
    @Override
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        outputFieldsDeclarer.declareStream("Stream4", new Fields("city", "count"));
    }
}

<TopBolt>

package test;

import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Tuple;

import java.util.List;
import java.util.Map;

/**
 * Terminal bolt that prints the source stream id and every value of each
 * incoming tuple to standard output. It declares no output streams.
 */
public class TopBolt extends BaseRichBolt {
    private OutputCollector _collector = null;

    /**
     * Stores the collector so that processed tuples can be acked.
     *
     * @param map             topology configuration (unused)
     * @param topologyContext topology context (unused)
     * @param outputCollector collector used to ack
     */
    @Override
    public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
        _collector = outputCollector;
    }

    /**
     * Prints the tuple's source stream id followed by one line per value, then
     * acks the tuple.
     *
     * @param tuple incoming tuple from any subscribed stream
     */
    @Override
    public void execute(Tuple tuple) {
        System.out.print("[---TopBolt---]StreamID=" + tuple.getSourceStreamId() + "\n");
        List<Object> values = tuple.getValues();
        for (Object value : values) {
            System.out.print("[---TopBolt---]value=" + value + "\n");
        }
        // BaseRichBolt does not ack automatically, so the input is acked explicitly.
        _collector.ack(tuple);
    }

    /**
     * Declares nothing: this bolt does not emit.
     *
     * @param outputFieldsDeclarer Storm declarer (unused)
     */
    @Override
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
    }
}

<TestTopology>

package test;

import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.StormSubmitter;
import org.apache.storm.generated.AlreadyAliveException;
import org.apache.storm.generated.AuthorizationException;
import org.apache.storm.generated.InvalidTopologyException;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.tuple.Fields;

/**
 * Builds and submits the stream-id demo topology.
 *
 * <p>Wiring:
 * <pre>
 * Spout1 --Stream1--&gt; Count1 --Stream3--&gt; Top
 * Spout1 --Stream2--&gt; Count2 --Stream4--&gt; Top
 * Spout1 --Stream2--&gt; Count3 --Stream4--&gt; Top
 * </pre>
 */
public class TestTopology {
    private static final String TOPOLOGY_NAME = "TestTopology1";

    /**
     * Entry point.
     *
     * @param args {@code args[0]}: {@code "true"} to submit to a cluster through
     *             {@link StormSubmitter}, anything else to run in a
     *             {@link LocalCluster}; {@code args[1]}: spout emission interval
     *             as a non-negative integer, stored in the topology configuration
     *             under {@code Constants.SpoutInterval}
     * @throws IllegalArgumentException if fewer than two arguments are given or the
     *                                  interval is not a non-negative integer
     * @throws InvalidTopologyException if the cluster rejects the topology
     * @throws AuthorizationException   if the submission is not authorized
     * @throws AlreadyAliveException    if a topology with the same name is running
     */
    public static void main(String[] args)
            throws InvalidTopologyException, AuthorizationException, AlreadyAliveException {
        if (args == null || args.length < 2) {
            throw new IllegalArgumentException(
                    "Usage: TestTopology <submitToCluster:true|false> <spoutInterval>");
        }
        // Fail fast here rather than inside the spout after the topology is submitted.
        // NumberFormatException is itself an IllegalArgumentException.
        if (Integer.parseInt(args[1]) < 0) {
            throw new IllegalArgumentException(
                    "spoutInterval must be non-negative, got: " + args[1]);
        }

        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("Spout1", new Spout1());
        builder.setBolt("Count1", new CountBolt1()).shuffleGrouping("Spout1", "Stream1");
        builder.setBolt("Count2", new CountBolt2()).shuffleGrouping("Spout1", "Stream2");
        builder.setBolt("Count3", new CountBolt3()).shuffleGrouping("Spout1", "Stream2");
        builder.setBolt("Top", new TopBolt())
                .fieldsGrouping("Count1", "Stream3", new Fields("company"))
                .fieldsGrouping("Count2", "Stream4", new Fields("prov"))
                .fieldsGrouping("Count3", "Stream4", new Fields("city"));

        Config config = new Config();
        config.setNumWorkers(1);
        // Kept as the original string; the spout reads this entry from its configuration.
        config.put(common.constants.Constants.SpoutInterval, args[1]);

        if (Boolean.parseBoolean(args[0])) {
            StormSubmitter.submitTopology(TOPOLOGY_NAME, config, builder.createTopology());
        } else {
            LocalCluster localCluster = new LocalCluster();
            localCluster.submitTopology(TOPOLOGY_NAME, config, builder.createTopology());
        }
    }
}

==結果日誌==

[---Spout1---]jsonObject={"CityCode":"511300","CountyCode":"511322","Address":"小橋鎮小橋中學對面","MailNo":"667748320345","ProvCode":"510000","Mobile":"183****5451","EnterpriseCode":"YUNDA","Weight":"39","Name":"王***"}
[---CountBolt1---]taskId=1, company=YUNDA, count=1
[---CountBolt3---]taskId=3, city=511300, count=1
[---CountBolt2---]taskId=2, prov=510000, count=1
[---TopBolt---]StreamID=Stream4
[---TopBolt---]value=510000
[---TopBolt---]value=1
[---TopBolt---]value=99bd1cdb-d5c1-4ac8-b1a1-a4cfffb5a616
[---TopBolt---]StreamID=Stream4
[---TopBolt---]value=511300
[---TopBolt---]value=1
[---TopBolt---]StreamID=Stream3
[---TopBolt---]value=YUNDA
[---TopBolt---]value=1

Storm的StreamID使用樣例(版本1.0.2)