1. 程式人生 > >storm 消息確認機制及可靠性

storm 消息確認機制及可靠性

ride 字段名 [] div obj ets pan putc 一次

worker進程死掉

在一個節點上 kill 掉 worker 進程(比方 kill 2509),對 topology 任務沒有影響,由於 Storm 會在其它節點又一次啟動 worker 進程繼續運行 topology 任務

supervisor進程死掉

supervisor進程kill掉 對work進程沒有影響 由於他們是互相獨立的!

nimbus進程死掉(存在HA的問題)

nimbus 假設死掉,整個任務掛掉,存在單點故障問題!(hadoop2 有 HA,而 storm 沒有 HA 高可用)

節點宕機(和supervisor是一樣的)


ack/fail消息確認機制

spout 發送過來的數據,bolt 要確認數據是否收到並反饋給 spout。以下給一個樣例:



import java.util.Map;
import backtype.storm.Config;
import backtype.storm.StormSubmitter;
import backtype.storm.generated.AlreadyAliveException;
import backtype.storm.generated.InvalidTopologyException;
import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;
import backtype.storm.utils.Utils;


/**
 * Demonstrates Storm's ack/fail message-confirmation mechanism: the spout
 * emits each tuple anchored with a message id, and the bolt explicitly acks
 * (or fails) every tuple it receives, which routes back to the spout's
 * {@link DataSourceSpout#ack(Object)} / {@link DataSourceSpout#fail(Object)}.
 */
public class ClusterStormTopologyAck {

    /**
     * Spout that emits an increasing integer once per second. Each tuple is
     * emitted together with a message id (the same integer) so that Storm's
     * reliability tracking can call back into ack()/fail().
     */
    public static class DataSourceSpout extends BaseRichSpout {

        private static final long serialVersionUID = 1L;

        // Stored on open(); not read afterwards in this demo.
        private Map conf;
        private TopologyContext context;
        private SpoutOutputCollector collector;

        // Counter used both as the tuple payload and as its message id.
        int i = 0;

        /**
         * Called exactly once when this spout instance is initialized.
         */
        @Override
        public void open(Map conf, TopologyContext context,
                SpoutOutputCollector collector) {
            this.conf = conf;
            this.context = context;
            this.collector = collector;
        }

        /**
         * Called repeatedly by the framework ("heartbeat" loop); emits one
         * anchored tuple per second.
         */
        @Override
        public void nextTuple() {
            System.err.println("spout :" + i);
            // Emit the value together with a matching message id; anchoring
            // with a msgId is what makes the bolt's ack/fail flow back here.
            // (Equivalent to the emit(new Values(i++), i - 1) idiom, written
            // without relying on side-effect evaluation order.)
            int msgId = i++;
            this.collector.emit(new Values(msgId), msgId);
            Utils.sleep(1000);
        }

        /**
         * Declares the single output field name ("num") for emitted values.
         */
        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("num"));
        }

        /** Invoked by Storm when the tuple with this msgId was fully processed. */
        @Override
        public void ack(Object msgId) {
            System.out.println("運行ACK:" + msgId);
        }

        /** Invoked by Storm when the tuple with this msgId failed downstream. */
        @Override
        public void fail(Object msgId) {
            System.out.println("運行FAIL:" + msgId);
            // TODO: for at-least-once delivery, cache emitted values keyed by
            // msgId and re-emit the failed one here.
        }
    }

    /**
     * Bolt that receives each tuple from the spout and explicitly acks it
     * (or fails it on a processing error), triggering the spout callbacks.
     */
    public static class SumBolt extends BaseRichBolt {

        private static final long serialVersionUID = 1L;

        // Stored on prepare(); not read afterwards in this demo.
        private Map stormConf;
        private TopologyContext context;
        private OutputCollector collector;

        // Running-total placeholder; real aggregation would use it in execute().
        int sum = 0;

        /**
         * Called exactly once when this bolt instance is initialized.
         */
        @Override
        public void prepare(Map stormConf, TopologyContext context,
                OutputCollector collector) {
            this.stormConf = stormConf;
            this.context = context;
            this.collector = collector;
        }

        /**
         * Called once per incoming tuple from the upstream spout/bolt.
         * Acks the tuple on success; fails it if processing throws.
         */
        @Override
        public void execute(Tuple input) {
            // Read outside the try so a missing/mistyped field fails fast
            // instead of being silently converted into a fail() call.
            Integer count = input.getIntegerByField("num");
            try {
                // Real processing would go here, e.g. sum += count.
                this.collector.ack(input);
            } catch (Exception e) {
                // Report failure so the spout's fail(msgId) callback runs
                // and the tuple can be replayed.
                this.collector.fail(input);
            }
        }

        /** This bolt emits nothing downstream, so no fields are declared. */
        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {

        }
    }

    /**
     * Builds the spout -> bolt topology and submits it to the cluster.
     */
    public static void main(String[] args) {
        TopologyBuilder builder = new TopologyBuilder();
        String SPOUT_NAME = DataSourceSpout.class.getSimpleName();
        String BOLT_NAME = SumBolt.class.getSimpleName();
        builder.setSpout(SPOUT_NAME, new DataSourceSpout());
        builder.setBolt(BOLT_NAME, new SumBolt()).shuffleGrouping(SPOUT_NAME);
        Config config = new Config();
        try {
            StormSubmitter.submitTopology(
                    ClusterStormTopologyAck.class.getSimpleName(),
                    config, builder.createTopology());
        } catch (AlreadyAliveException e) {
            e.printStackTrace();
        } catch (InvalidTopologyException e) {
            e.printStackTrace();
        }
    }

}

storm 消息確認機制及可靠性