1. 程式人生 > >使用Storm進行詞頻統計

使用Storm進行詞頻統計

date 業務 out eric args private shuf err base

詞頻統計

1.需求:讀取指定目錄的數據,並且實現單詞計數功能
2.實現方案:
Spout用於讀取指定文件夾(目錄),讀取文件,將文件的每一行發射到Bolt
SplitBolt用於接收Spout發射過來的數據,並拆分,發射到CountBolt
CountBolt接收SplitBolt發送的每一個單詞,進行單詞計數操作
3.拓撲設計:
DataSourceSpout + SplitBolt + CountBolt

代碼如下:

package com.csylh;

import org.apache.commons.io.FileUtils;
import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;

import java.io.File;
import java.io.IOException;
import java.util.*;

/**
 * Description:使用Storm完成詞頻統計功能
 *
 * @author: 留歌36
 * Date:2018/9/4 9:28
 */
public class LocalWordCountStormTopology {
    /**
     * 讀取數據並發送到Bolt上去
     */
    public static class DataSourceSpout extends BaseRichSpout{
        //定義一個發射器
        private SpoutOutputCollector collector;

        /**
         * 初始化方法 只是會被調用一次
         * @param conf  配置參數
         * @param context  上下文
         * @param collector  數據發射器
         */
        @Override
        public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
            //對上面定義的的發射器進行賦初值
            this.collector = collector;
        }

        /**
         * 用於數據的產生
         * 業務:
         * 1.讀取指定目錄的文件夾下的數據
         * 2.把每一行數據發射出去
         */
        @Override
        public void nextTuple() {
//            獲取所有文件,這裏指定文件的後綴
            Collection<File> files = FileUtils.listFiles(new File("E:\\StormText"),new String[]{"txt"},true);
//            循環遍歷每一個文件 ==>  由於這裏指定的是文件夾下面的目錄 所以就是需要進行循環遍歷
            for( File file : files){
                try {
//                    獲取每一個文件的每一行
                    List<String> lines =  FileUtils.readLines(file);
                    for(String line : lines){
//                        把每一行數據發射出去
                        this.collector.emit(new Values(line));
                    }
                    //TODO 數據處理完畢之後 改名  否則的話 會一直執行的
                    FileUtils.moveFile(file,new File(file.getAbsolutePath()+System.currentTimeMillis()));

                } catch (IOException e) {
                    e.printStackTrace();
                }
            }

        }

        /**
         * 聲明輸出字段名稱
         * @param declarer
         */
        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("line"));
        }
    }
    /**
     * 對Spout發送過來的數據進行分割
     */
    public static class SplitBolt extends BaseRichBolt{
        private OutputCollector collector;
        /**
         * 初始化方法  只是會被執行一次
         * @param stormConf
         * @param context
         * @param collector Bolt的發射器,指定下一個Bolt的地址
         */
        @Override
        public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
                this.collector = collector;
        }

        /**
         * 用於獲取Spout發送過來的數據
         * 業務邏輯
         *  spout發送過來的數據是一行一行的line
         *  這裏是需要line進行分割
         *
         * @param input
         */
        @Override
        public void execute(Tuple input) {
            String line = input.getStringByField("line");
            String[] words = line.split(",");

            for(String word : words){
//                這裏把每一個單詞發射出去
                this.collector.emit(new Values(word));
            }
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
                declarer.declare(new Fields("word"));
        }
    }
    /**
     * 詞頻匯總的Bolt
     */
    public static class CountBolt extends BaseRichBolt{
        /**
         * 由於這裏是不需要向外部發射  所以就不需要定義Collector
         * @param stormConf
         * @param context
         * @param collector
         */
        @Override
        public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
        }
        Map<String,Integer> map = new HashMap<String, Integer>();
        /**
         * 業務邏輯
         * 1.獲取每一個單詞
         * 2.對每一個單詞進行匯總
         * 3.輸出結果
         * @param input
         */
        @Override
        public void execute(Tuple input) {
//            獲取每一個單詞
           String word = input.getStringByField("word");
           Integer count =  map.get(word);
           if (count == null){
               count = 0;
           }
            count++;
//           對單詞進行匯總
            map.put(word,count);
//           輸出
            System.out.println("~~~~~~~~~~~~~~~~~~~~~~~");
            Set<Map.Entry<String,Integer>> entrySet = map.entrySet();
            for(Map.Entry<String,Integer> entry :entrySet){
                System.out.println(entry);
            }
        }
        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
        }
    }
    /**
     * 主函數
     * @param args
     */
    public static void main(String[] args) {
//            使用TopologyBuilder根據Spout和Bolt構建Topology
        TopologyBuilder builder = new TopologyBuilder();
//            設置Bolt和Spout  設置Spout和Bolt的關聯關系
        builder.setSpout("DataSourceSpout",new DataSourceSpout());
        builder.setBolt("SplitBolt",new SplitBolt()).shuffleGrouping("DataSourceSpout");
        builder.setBolt("CountBolt",new CountBolt()).shuffleGrouping("SplitBolt");
//            創建一個本地的集群
        LocalCluster cluster = new LocalCluster();
        cluster.submitTopology("LocalWordCountStormTopology",new Config(),builder.createTopology());
    }
}

小結:開發Storm程序的步驟就是:
根據需求 設計實現方案 規劃拓撲

一般是先寫Spout數據產生器 發射數據到Bolt
接著,就是Bolt進行數據處理,如果有多個Bolt,非最後一個Bolt也要寫發射器Collector
最後一個Bolt直接輸出結果或者 輸出到HDFS或者關系型數據庫中
最終需要將Spout和Bolt進行組裝起來(借助TopologyBuilder)

使用Storm進行詞頻統計