1. 程式人生 > >JMS訊息中介軟體原理及ActiveMQ在企業中的應用(接上篇)

JMS訊息中介軟體原理及ActiveMQ在企業中的應用(接上篇)

程式碼實現:傳送訊息---》接受訊息---》伺服器配置
//1 傳送訊息(接受回覆訊息)
public class SenderMessageService {
//釋出指定訊息到指定地址(在釋出之前,建議將訊息儲存到資料庫)
 public void publish(String type, Object object) {
  try {
   InitialContext initCtx = new InitialContext();
   //1
   Context envContext = (Context) initCtx.lookup("java:comp/env");
   //2
   ConnectionFactory connectionFactory = (ConnectionFactory) envContext
     .lookup("jms/NormalConnectionFactory");
   //3
   Connection connection = connectionFactory.createConnection();
   //4
   Session jmsSession = connection.createSession(false,
     Session.AUTO_ACKNOWLEDGE);
   //5,6 Destination:需指定其對應的主題(subject)名稱
   MessageProducer producer = jmsSession
     .createProducer((Destination) envContext
       .lookup("jms/topic/MyTopic"));
   // 設定持久方式:根據Destination建立MessageProducer物件,同時設定其持久模式
   producer.setDeliveryMode(DeliveryMode.PERSISTENT);
   //7
   Message message = jmsSession.createMessage();
   message.setObjectProperty(type, object);
   Topic topic = jmsSession.createTopic("jms/topic/MyTopic");
   //8
   message.setJMSReplyTo(topic);
   //傳送訊息
   producer.send(message);
   
   //9 接受回覆的訊息
   MessageConsumer consumer = jmsSession.createConsumer(topic);
   consumer.setMessageListener(new MessageListener() {
    public void onMessage(Message message) {
     if (message != null && message instanceof TextMessage) {
      String messageReceived = null;
      try {
       messageReceived = ((TextMessage) message).getText();
      } catch (JMSException e) {
       e.printStackTrace();
      }
      System.out
        .println("reply message received from customer1:"
          + messageReceived);
     }
    }
   });
   connection.start();
   
   // 釋出重新整理帖子訊息
   // testMessage.clearProperties();
   // testMessage.setStringProperty("RefreshThreadId", "331");
   // producer.send(testMessage);
  } catch (NamingException e) {
   e.printStackTrace();
  } catch (JMSException e) {
   e.printStackTrace();
  }
 }
}
//2接受訊息(傳送回覆)
import javax.servlet.*;
import javax.servlet.http.*;
import javax.naming.*;
import javax.jms.*;
import com.brightmart.MessageAction;
import com.brightmart.SM;
import com.util.mail.TestSendMail;
// 初始化jms連線,建立topic監聽器;指定接收訊息時候,做的對應處理
public class JMSListener extends HttpServlet implements MessageListener {
 private static final long serialVersionUID = 3963233366687996777L;
 //初始化jms連線,建立topic監聽器
 public void init(ServletConfig config) throws ServletException {
  try {
   InitialContext initCtx = new InitialContext();
   Context envContext = (Context) initCtx.lookup("java:comp/env");// 1
   // 根據JNDI獲取
   ConnectionFactory connectionFactory = (ConnectionFactory) envContext
     .lookup("jms/FailoverConnectionFactory");// 2
   Connection connection = connectionFactory.createConnection();// 3
   // 給connection設定一個clientId
   connection.setClientID("MyClient");
   // 會話:兩個參,事務和應答模式
   Session jmsSession = connection.createSession(false,
     Session.CLIENT_ACKNOWLEDGE);// 4 AUTO_ACKNOWLEDGE
   // 普通訊息訂閱者,無法接收持久訊息// MessageConsumer consumer =
   // jmsSession.createConsumer((Destination)//envContext.lookup("jms/topic/MyTopic"));
   // 基於Topic建立持久的訊息訂閱者,前提:Connection必須指定一個唯一的clientId,當前為MyClient
   TopicSubscriber consumer = jmsSession.createDurableSubscriber(
     (Topic) envContext.lookup("jms/topic/MyTopic"), "MySub");// 5
   consumer.setMessageListener(this);
   connection.start();
  } catch (NamingException e) {
   e.printStackTrace();
  } catch (JMSException e) {
   e.printStackTrace();
  }
 }
 //接收訊息,做相應處理
 public void onMessage(Message message) {
  System.out.println("message in coustomer1.");
  if (message == null) {
   return;
  }
  try {
   if (message.getObjectProperty("email") != null) {
    String emailAddress = (String) message
      .getObjectProperty("email");
    TestSendMail sendMail = new TestSendMail();
    sendMail.sendMail(emailAddress);
    message.acknowledge();
    Destination d = message.getJMSReplyTo();
    Session sessionn = getConnection().createSession(false,
      Session.CLIENT_ACKNOWLEDGE);
    MessageProducer p = sessionn.createProducer(d);
    TextMessage tm = sessionn
      .createTextMessage("ustomer1 RECEIVED a email type message");
    System.out
      .println("customer1 RECEIVED a email type message");
    p.send(tm);
   } else if (message.getObjectProperty("message") != null) {
    MessageAction m = new MessageAction();
    SM sm = new SM();
    sm.setDestTermId((String) message.getObjectProperty("message"));
    sm.setMsgContent("分散式JMS-ActiveMQ系統測試");
    m.addSM(sm);
    message.acknowledge();
   } else {
    System.out.println("接收普通訊息,不做任何處理!");
   }
  } catch (JMSException e) {
   e.printStackTrace();
  }
 }
 public Connection getConnection() {
  InitialContext initCtx;
  try {
   initCtx = new InitialContext();
   Context envContext = (Context) initCtx.lookup("java:comp/env");
   // 根據JNDI、url、user、password獲取
   ConnectionFactory connectionFactory = (ConnectionFactory) envContext
     .lookup("jms/FailoverConnectionFactory");
   Connection connection = connectionFactory.createConnection();
   return connection;
  } catch (Exception e) {
   e.printStackTrace();
   return null;
  }
 }
}
 
//3 context.xml
Context中新增配置:
<Resource
name="jms/FailoverConnectionFactory"
auth="Container"
type="org.apache.activemq.ActiveMQConnectionFactory"
description="JMS Connection Factory"
factory="org.apache.activemq.jndi.JNDIReferenceFactory"
brokerURL="failover:(tcp://localhost:61616)?initialReconnectDelay=100&amp;maxReconnectAttempts=5"
brokerName="localhost"
useEmbeddedBroker="false"/>
<Resource
name="jms/NormalConnectionFactory"
auth="Container"
type="org.apache.activemq.ActiveMQConnectionFactory"
description="JMS Connection Factory"
factory="org.apache.activemq.jndi.JNDIReferenceFactory"
brokerURL="tcp://localhost:61616"
brokerName="localhost"
useEmbeddedBroker="false"/>
<Resource name="jms/topic/MyTopic"
auth="Container"
type="org.apache.activemq.command.ActiveMQTopic"
factory="org.apache.activemq.jndi.JNDIReferenceFactory"
physicalName="MY.TEST.FOO"/>
<Resource name="jms/queue/MyQueue"
auth="Container"
type="org.apache.activemq.command.ActiveMQQueue"
factory="org.apache.activemq.jndi.JNDIReferenceFactory"
physicalName="MY.TEST.FOO.QUEUE"/>
 
4.傳送簡訊或郵件
請參考本部落格前兩篇文章
注:
1.系統下載ActiveMQ,並允許;
2.ActiveMQ需要融合web伺服器,如可以配置tomcat伺服器的context.xml;
3在專案中,需要引入ActiveMQ的jar.