1. 程式人生 > >Storm實時計算網站pv

Storm實時計算網站pv

PVBolt1進行多併發區域性彙總,PVSumBolt單執行緒進行全域性彙總

(1)建立資料輸入源PVSpout

package storm.test;

import java.io.BufferedReader;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.Map;

import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.
storm.task.TopologyContext; import org.apache.storm.topology.IRichSpout; import org.apache.storm.topology.OutputFieldsDeclarer; import org.apache.storm.tuple.Fields; import org.apache.storm.tuple.Values; public class PVSpout implements IRichSpout{ private static final long serialVersionUID = 1
L; private SpoutOutputCollector collector ; private BufferedReader reader; @SuppressWarnings("rawtypes") @Override public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) { this.collector = collector; try { reader = new BufferedReader
(new InputStreamReader(new FileInputStream("F:\\test\\websit.log"),"UTF-8")); } catch (Exception e) { e.printStackTrace(); } } @Override public void close() { try { if (reader != null) { reader.close(); } } catch (IOException e) { e.printStackTrace(); } } @Override public void activate() { } @Override public void deactivate() { } private String str; @Override public void nextTuple() { try { while((str = reader.readLine()) != null){ collector.emit(new Values(str)); Thread.sleep(500); } } catch (Exception e) { } } @Override public void ack(Object msgId) { } @Override public void fail(Object msgId) { } @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("log")); } @Override public Map<String, Object> getComponentConfiguration() { return null; } }

(2)建立資料處理PVBolt1

package storm.test;

import java.util.Map;

import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.IRichBolt;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;

/**
 * Bolt that keeps a running page-view count for this bolt instance and emits
 * it, together with the id of the executing thread, after every counted line.
 */
public class PVBolt1 implements IRichBolt {

    private static final long serialVersionUID = 1L;
    private OutputCollector collector;
    private long pv = 0;

    /** Stores the collector used to emit the partial counts. */
    @SuppressWarnings("rawtypes")
    @Override
    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
    }

    /**
     * Counts one page view for a tab-separated log line whose second field
     * (the session id) is present, then emits {@code (threadId, pv)}.
     *
     * <p>Lines that are {@code null} or have fewer than two tab-separated
     * fields are skipped: they are not counted and nothing is emitted.
     * Previously such a line raised {@code ArrayIndexOutOfBoundsException}.
     *
     * @param input tuple whose first value is the raw log line
     */
    @Override
    public void execute(Tuple input) {
        // Raw log line passed in from upstream.
        String logline = input.getString(0);
        if (logline == null) {
            return;
        }

        // The session id is the second tab-separated field; split() never
        // yields null elements, so the length check is the real guard.
        String[] fields = logline.split("\t");
        if (fields.length < 2) {
            return;
        }

        pv++;

        long threadId = Thread.currentThread().getId();

        // Publish the updated running count.
        collector.emit(new Values(threadId, pv));

        System.err.println("threadid:" + threadId + "  pv:" + pv);
    }

    /** Declares the two output fields: the thread id and the running count. */
    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("thireadID", "pv"));

    }

    @Override
    public void cleanup() {

    }

    @Override
    public Map<String, Object> getComponentConfiguration() {
        return null;
    }
}

(3)建立PVSumBolt

package storm.test;

import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.IRichBolt;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.tuple.Tuple;

/**
 * Bolt that remembers the most recent count reported for each thread id and
 * prints the sum of those counts after every incoming tuple.
 */
public class PVSumBolt implements IRichBolt {

    private static final long serialVersionUID = 1L;

    /** Latest count received, keyed by the thread id carried in the tuple. */
    private Map<Long, Long> pvByThread = new HashMap<>();

    @SuppressWarnings("rawtypes")
    @Override
    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {

    }

    /**
     * Records the count for the tuple's thread id (replacing any earlier
     * value for that id) and prints the total over all recorded ids.
     *
     * @param input tuple holding the thread id at index 0 and the count at index 1
     */
    @Override
    public void execute(Tuple input) {
        pvByThread.put(input.getLong(0), input.getLong(1));

        long total = 0;
        for (Long partial : pvByThread.values()) {
            total += partial;
        }

        System.err.println("pv_all:" + total);
    }

    @Override
    public void cleanup() {
    }

    /** This bolt emits nothing, so no output fields are declared. */
    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {

    }

    @Override
    public Map<String, Object> getComponentConfiguration() {
        return null;
    }
}

(4)驅動

package storm.test;

import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.StormSubmitter;
import org.apache.storm.topology.TopologyBuilder;

/**
 * Entry point that wires PVSpout, PVBolt1 and PVSumBolt into a topology and
 * submits it, either to a cluster or to an in-process local cluster.
 */
public class PVMain {

    /**
     * Submits the topology.
     *
     * @param args when non-empty, {@code args[0]} is the topology name passed to
     *             {@code StormSubmitter}; when empty, the topology is started on
     *             a {@code LocalCluster} under the name {@code "pvtopology"}
     */
    public static void main(String[] args) {
        TopologyBuilder topology = wireTopology();

        Config stormConfig = new Config();
        stormConfig.setNumWorkers(2);

        if (args.length == 0) {
            // No topology name given: run in local mode.
            LocalCluster localCluster = new LocalCluster();
            localCluster.submitTopology("pvtopology", stormConfig, topology.createTopology());
            return;
        }

        try {
            StormSubmitter.submitTopology(args[0], stormConfig, topology.createTopology());
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /**
     * Builds the spout/bolt graph: PVSpout (parallelism hint 1) feeds PVBolt1
     * (hint 4), which feeds PVSumBolt (hint 1), both via shuffle grouping.
     */
    private static TopologyBuilder wireTopology() {
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("PVSpout", new PVSpout(), 1);
        builder.setBolt("PVBolt1", new PVBolt1(), 4).shuffleGrouping("PVSpout");
        builder.setBolt("PVSumBolt", new PVSumBolt(), 1).shuffleGrouping("PVBolt1");
        return builder;
    }
}

結果(注意:以下輸出的格式與上方程式碼中的列印語句「threadid:… pv:…」及「pv_all:…」不一致):

134pv_num : 9
sumnum ==>33
128pv_num : 7
sumnum ==>34
134pv_num : 10
sumnum ==>35
128pv_num : 8
sumnum ==>36
128pv_num : 9
sumnum ==>37
128pv_num : 10