ActiveMQ(22):Consumer高級特性之消息分組(Message Groups)
一、簡介
Message Groups就是對消息分組,它是Exclusive Consumer功能的增強。
邏輯上,Message Groups 可以看成是一種並發的Exclusive Consumer。跟所有的消息都由唯一的consumer處理不同,JMS 消息屬性JMSXGroupID 被用來區分message group。Message Groups特性保證所有具有相同JMSXGroupID的消息會被分發到相同的consumer(只要這個consumer保持active)。
另外一方面,Message Groups特性也是一種負載均衡的機制。在一個消息被分發到consumer之前,broker首先檢查消息JMSXGroupID屬性。如果存在,那麽broker會檢查是否有某個consumer擁有這個message group。如果沒有,那麽broker會選擇一個consumer,並將它關聯到這個message group。此後,這個consumer會接收這個message group的所有消息,直到:
1:Consumer被關閉
2:Message group被關閉,通過發送一個消息,並設置這個消息的JMSXGroupSeq為-1
二、操作
2.1 創建一個Message Groups
創建一個Message Groups,只需要在message對象上設置屬性即可,如下:
message.setStringProperty("JMSXGroupID","GroupA");
2.2 關閉一個Message Groups
關閉一個Message Groups,只需要在message對象上設置屬性即可,如下:
message.setStringProperty("JMSXGroupID","GroupA"); message.setIntProperty("JMSXGroupSeq", -1);
發送:
public void test4() throws Exception { ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("liuy","123456","failover:(tcp://192.168.175.13:61616,tcp://192.168.175.13:61676)"); Connection connection = connectionFactory.createConnection(); connection.start(); Session session = connection.createSession(Boolean.TRUE,Session.AUTO_ACKNOWLEDGE); Destination destination = session.createQueue("test-queue4"); MessageProducer producer = session.createProducer(destination); for (int i = 0; i < 3; i++) { TextMessage message = session.createTextMessage("messageGroupA--" + i); message.setStringProperty("JMSXGroupID", "GroupA"); producer.send(message); TextMessage message2 = session.createTextMessage("GroupB--" + i); message.setStringProperty("JMSXGroupID", "GroupB"); producer.send(message2); } session.commit(); session.close(); connection.close(); }
接收:
public void test4() throws Exception { ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("liuy","123456","failover:(tcp://192.168.175.13:61616,tcp://192.168.175.13:61676)"); Connection connection = cf.createConnection(); connection.start(); final Session session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE); Destination destination = session.createQueue("test-queue4"); for (int i = 0; i < 2; i++) { MessageConsumer consumer = session.createConsumer(destination); consumer.setMessageListener(new MessageListener() { @Override public void onMessage(Message message) { TextMessage msg = (TextMessage) message; try { System.out.println(consumer + "收到消息:" + msg.getText()); session.commit(); } catch (Exception e) { e.printStackTrace(); } } }); } }
效果:
本文出自 “我愛大金子” 博客,請務必保留此出處http://1754966750.blog.51cto.com/7455444/1924848
ActiveMQ(22):Consumer高級特性之消息分組(Message Groups)