Storm框架:如何消費RabbitMq訊息(程式碼案例)
阿新 • • 發佈:2018-11-01
1、定義拓撲topology
public class MessageTopology { public static void main(String[] args) throws Exception { //組裝topology TopologyBuilder topologyBuilder = new TopologyBuilder(); topologyBuilder.setSpout("RabbitmqSpout", new RabbitmqSpout()); topologyBuilder.setBolt("FilterBolt", new FilterBolt()).shuffleGrouping("RabbitmqSpout"); Config conf = new Config (); try { if (args.length > 0) { StormSubmitter.submitTopology(args[0], conf, topologyBuilder.createTopology()); } else { LocalCluster localCluster = new LocalCluster(); localCluster.submitTopology("messageTopology", conf, topologyBuilder.createTopology()); } } catch (AlreadyAliveException e) { e.printStackTrace(); } } }
2、定義資料來源RabbitmqSpout
RabbitmqSpout繼承自org.apache.storm.topology.IRichSpout介面,實現對應的方法:open(),close(),activate(),deactivate(),nextTuple(),ack(),fail()。
unconfirmedMap
物件儲存了MQ所有發射出去等待確認的訊息唯一標識deliveryTag,當storm系統回撥ack、fail方法後進行MQ訊息的成功確認或失敗重回佇列操作(Storm系統回撥方法會在bolt操作中主動呼叫ack、fail方法時觸發)。
public class RabbitmqSpout implements IRichSpout { private final Logger LOGGER = LoggerFactory.getLogger(RabbitmqSpout.class); private Map map; private TopologyContext topologyContext; private SpoutOutputCollector spoutOutputCollector; private Connection connection; private Channel channel; private static final String QUEUE_NAME = "message_queue"; private final Map<String, Long> unconfirmedMap = Collections.synchronizedMap(new HashMap<String, Long>()); //連線mq服務 private void connect() throws IOException, TimeoutException { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("127.0.0.1"); factory.setUsername("admin"); factory.setPassword("admin"); factory.setVirtualHost("/"); connection = factory.newConnection(); channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME, true, false, false, null); } @Override public void open(Map map, TopologyContext topologyContext, SpoutOutputCollector spoutOutputCollector) { this.map = map; this.topologyContext = topologyContext; this.spoutOutputCollector = spoutOutputCollector; try { this.connect(); } catch (IOException e) { e.printStackTrace(); } catch (TimeoutException e) { e.printStackTrace(); } } @Override public void close() { try { channel.close(); connection.close(); } catch (IOException e) { e.printStackTrace(); } catch (TimeoutException e) { e.printStackTrace(); } } @Override public void nextTuple() { try { GetResponse response = channel.basicGet(QUEUE_NAME, false); if (response == null) { Utils.sleep(3000); } else { AMQP.BasicProperties props = response.getProps(); String messageId = UUID.randomUUID().toString(); Long deliveryTag = response.getEnvelope().getDeliveryTag(); String body = new String(response.getBody()); unconfirmedMap.put(messageId, deliveryTag); LOGGER.info("RabbitmqSpout: {}, {}, {}, {}", body, messageId, deliveryTag, props); this.spoutOutputCollector.emit(new Values(body), messageId); } } catch (IOException e) { e.printStackTrace(); } } @Override public void ack(Object o) { String messageId = o.toString(); Long deliveryTag = unconfirmedMap.get(messageId); LOGGER.info("ack: {}, {}, {}\n\n", messageId, deliveryTag, unconfirmedMap.size()); try { unconfirmedMap.remove(messageId); channel.basicAck(deliveryTag, false); } catch (IOException e) { e.printStackTrace(); } } @Override public void fail(Object o) { String messageId = o.toString(); Long deliveryTag = unconfirmedMap.get(messageId); LOGGER.info("fail: {}, {}, {}\n\n", messageId, deliveryTag, unconfirmedMap.size()); try { unconfirmedMap.remove(messageId); channel.basicNack(deliveryTag, false, true); } catch (IOException e) { e.printStackTrace(); } } @Override public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) { outputFieldsDeclarer.declare(new Fields("body")); } @Override public Map<String, Object> getComponentConfiguration() { return null; } @Override public void activate() { } @Override public void deactivate() { } }
3、定義資料流處理FilterBolt
public class FilterBolt implements IRichBolt { private final Logger LOGGER = LoggerFactory.getLogger(FilterBolt.class); private Map map; private TopologyContext topologyContext; private OutputCollector outputCollector; @Override public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) { this.map = map; this.topologyContext = topologyContext; this.outputCollector = outputCollector; } @Override public void execute(Tuple tuple) { String value = tuple.getStringByField("body"); LOGGER.info("FilterBolt:{}", value); outputCollector.ack(tuple); } @Override public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) { outputFieldsDeclarer.declare(new Fields("body")); } @Override public Map<String, Object> getComponentConfiguration() { return null; } @Override public void cleanup() { } }