1. 程式人生 > >ActiveMQ(22):Consumer高級特性之消息分組(Message Groups)

ActiveMQ(22):Consumer高級特性之消息分組(Message Groups)

jms message groups activemq

一、簡介

Message Groups就是對消息分組,它是Exclusive Consumer功能的增強。


邏輯上,Message Groups 可以看成是一種並發的Exclusive Consumer。跟所有的消息都由唯一的consumer處理不同,JMS 消息屬性JMSXGroupID 被用來區分message group。Message Groups特性保證所有具有相同JMSXGroupID的消息會被分發到相同的consumer(只要這個consumer保持active)。


另外一方面,Message Groups特性也是一種負載均衡的機制。在一個消息被分發到consumer之前,broker首先檢查消息JMSXGroupID屬性。如果存在,那麽broker會檢查是否有某個consumer擁有這個message group。如果沒有,那麽broker會選擇一個consumer,並將它關聯到這個message group。此後,這個consumer會接收這個message group的所有消息,直到:

1:Consumer被關閉

2:Message group被關閉,通過發送一個消息,並設置這個消息的JMSXGroupSeq為-1

二、操作

2.1 創建一個Message Groups

創建一個Message Groups,只需要在message對象上設置屬性即可,如下:

message.setStringProperty("JMSXGroupID","GroupA");

2.2 關閉一個Message Groups

關閉一個Message Groups,只需要在message對象上設置屬性即可,如下:

message.setStringProperty("JMSXGroupID","GroupA");
message.setIntProperty("JMSXGroupSeq", -1);

發送:

public void test4() throws Exception {
    ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("liuy","123456","failover:(tcp://192.168.175.13:61616,tcp://192.168.175.13:61676)");
  Connection connection = connectionFactory.createConnection();
  connection.start();
  Session session = connection.createSession(Boolean.TRUE,Session.AUTO_ACKNOWLEDGE);
  Destination destination = session.createQueue("test-queue4");
  MessageProducer producer = session.createProducer(destination);
  for (int i = 0; i < 3; i++) {
      TextMessage message = session.createTextMessage("messageGroupA--" + i);
	message.setStringProperty("JMSXGroupID", "GroupA");
	producer.send(message);
		
	TextMessage message2 = session.createTextMessage("GroupB--" + i);
	message.setStringProperty("JMSXGroupID", "GroupB");
	producer.send(message2);
  }
  session.commit();
  session.close();
    connection.close();
}

接收:

public void test4() throws Exception {
    ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("liuy","123456","failover:(tcp://192.168.175.13:61616,tcp://192.168.175.13:61676)");
		
  Connection connection = cf.createConnection();
  connection.start();
		
  final Session session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);
  Destination destination = session.createQueue("test-queue4");
  for (int i = 0; i < 2; i++) {
      MessageConsumer consumer = session.createConsumer(destination);
    consumer.setMessageListener(new MessageListener() {
        @Override
      public void onMessage(Message message) {
	      TextMessage msg = (TextMessage) message;
	    try {
	        System.out.println(consumer + "收到消息:" + msg.getText());
		  session.commit();
	    } catch (Exception e) {
	        e.printStackTrace();
	    }
	  }
    });
    }
}

效果:

技術分享


本文出自 “我愛大金子” 博客,請務必保留此出處http://1754966750.blog.51cto.com/7455444/1924848

ActiveMQ(22):Consumer高級特性之消息分組(Message Groups)