1. 程式人生 > storm單詞計數 本地運行

storm單詞計數 本地運行

cep cal txt wordcount ioe ktr ren pos 分割




import java.io.File;
import java.io.IOException;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;


import org.apache.commons.io.FileUtils;


import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;
import cn.crxy.storm.LocalStormTopology.SumBolt;


public class WordcountStormTopology {

public static class DataSourceSpout extends BaseRichSpout{
private Map conf;
private TopologyContext context;
private SpoutOutputCollector collector;

/**
* 在本實例執行的時候被調用一次
*/
public void open(Map conf, TopologyContext context,
SpoutOutputCollector collector) {
this.conf = conf;
this.context = context;
this.collector = collector;
}
/**
* 死循環調用 心跳
*/

public void nextTuple() {
//獲取指定目錄以下全部的文件
Collection<File> files = FileUtils.listFiles(new File("D:\\test"), new String[]{"txt"}, true);
for (File file : files) {
try {
//解析每個文件的每一行
List<String> readLines = FileUtils.readLines(file);

for (String line : readLines) {
//把每一行數據發送出去
this.collector.emit(new Values(line));
}

//重命名 防止多次讀
FileUtils.moveFile(file, new File(file.getAbsolutePath()+System.currentTimeMillis()));
} catch (IOException e) {

e.printStackTrace();
}
}
}
/**
* 聲明字段名稱
*/
public void declareOutputFields(OutputFieldsDeclarer declarer) {
//fields就是field的列表
declarer.declare(new Fields("line"));
}
}

public static class SpiltBolt extends BaseRichBolt{

private Map stormConf;
private TopologyContext context;
private OutputCollector collector;
/**
 * Invoked a single time when the bolt is set up. Keeps the collector (used by
 * execute to emit words) together with the topology config and context.
 */
@Override
public void prepare(Map stormConf, TopologyContext context,
        OutputCollector collector) {
    this.collector = collector;
    this.context = context;
    this.stormConf = stormConf;
}
/**
 * Called for every tuple received from the upstream spout/bolt. Reads the "line"
 * field, splits it on tab characters, emits each non-empty word as its own tuple
 * for the next bolt, and then acks the input tuple.
 */
@Override
public void execute(Tuple input) {
    String line = input.getStringByField("line");
    for (String word : line.split("\t")) {
        // Blank lines and leading/consecutive tabs produce empty tokens; emitting
        // them would pass "" downstream as if it were a word.
        if (word.isEmpty()) {
            continue;
        }
        this.collector.emit(new Values(word));
    }
    // BaseRichBolt does not ack automatically (unlike BaseBasicBolt).
    this.collector.ack(input);
}


storm單詞計數 本地運行