1. 程式人生 > cluster模式下storm kill topology時做cleanup的解決方法

cluster模式下storm kill topology時做cleanup的解決方法

public class MySpout extends BaseRichSpout {
    private static final Logger logger = LoggerFactory.getLogger(MySpout.class);
    private SpoutOutputCollector _collector;

    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
        logger.info("shutdown spout open function called");
        _collector = collector;
    }

    public void activate() {
        logger.info("shutdown spout activate function called");
    }

    public void deactivate() {
        logger.info("shutdown deaactivate to spout and bolt");
        try {
            String mes = "shutDown";
            long id = 11111111111111111L;
            _collector.emit("stop", new Values(mes), id);
            //Thread.sleep(1000);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    public void nextTuple() {
        try {
            Thread.sleep(10000);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    public void declareOutputFields(OutputFieldsDeclarer message) {
        message.declareStream("stop", new Fields("stop"));
    }

    public void ack(Object msgId) {
        logger.info("shutDown spout ack, msId " + msgId);
    }

    public void fail(Object msgId) {
        logger.error("shutDown spout fail, msId " + msgId);
    }
}