JMS訊息中介軟體原理及ActiveMQ在企業中的應用(接上篇)
阿新 • • 發佈:2019-01-28
程式碼實現:傳送訊息---》接收訊息---》伺服器配置
//1 傳送訊息(接收回覆訊息)
/**
 * Publishes a property-only JMS message to the topic bound in JNDI and prints
 * any text reply that arrives on the reply topic.
 *
 * <p>All JNDI names are resolved relative to {@code java:comp/env}.</p>
 */
public class SenderMessageService {

    /** JNDI name of the connection factory used for publishing. */
    private static final String CONNECTION_FACTORY_JNDI = "jms/NormalConnectionFactory";

    /** JNDI name of the topic the request message is published to. */
    private static final String REQUEST_TOPIC_JNDI = "jms/topic/MyTopic";

    /**
     * Name handed to {@code Session.createTopic} for the reply destination.
     * NOTE(review): this is a provider-specific topic name, not a JNDI lookup,
     * so replies travel on a topic literally named like this, which may differ
     * from the destination bound under the same JNDI name. Confirm intent.
     */
    private static final String REPLY_TOPIC_NAME = "jms/topic/MyTopic";

    /**
     * Publishes one persistent message carrying a single property and
     * subscribes for text replies. The original author recommends saving the
     * message to a database before publishing.
     *
     * <p>On success the connection is intentionally left open so the reply
     * listener keeps receiving. NOTE(review): nothing in this class closes it
     * afterwards, so every successful call keeps one connection alive.</p>
     *
     * <p>Naming and JMS failures are printed, not rethrown; in that case the
     * connection is closed before returning.</p>
     *
     * @param type   name of the message property; must not be null or empty
     * @param object value of the message property
     * @throws IllegalArgumentException if {@code type} is null or empty
     */
    public void publish(String type, Object object) {
        // Fail before any connection is opened; the provider would reject the
        // property name anyway, but only after resources had been allocated.
        if (type == null || type.length() == 0) {
            throw new IllegalArgumentException("Property name cannot be empty or null");
        }

        Connection connection = null;
        boolean replyListenerActive = false;
        try {
            InitialContext initCtx = new InitialContext();
            // 1: container-managed environment naming context
            Context envContext = (Context) initCtx.lookup("java:comp/env");
            // 2: connection factory
            ConnectionFactory connectionFactory = (ConnectionFactory) envContext
                    .lookup(CONNECTION_FACTORY_JNDI);
            // 3: connection
            connection = connectionFactory.createConnection();
            // 4: non-transacted, auto-acknowledging session
            Session jmsSession = connection.createSession(false,
                    Session.AUTO_ACKNOWLEDGE);
            // 5, 6: producer bound to the request topic, persistent delivery
            MessageProducer producer = jmsSession
                    .createProducer((Destination) envContext.lookup(REQUEST_TOPIC_JNDI));
            producer.setDeliveryMode(DeliveryMode.PERSISTENT);

            // Subscribe to the reply topic BEFORE sending: a non-durable topic
            // subscriber only sees messages published after it was created, so
            // subscribing after the send could miss a fast reply.
            Topic replyTopic = jmsSession.createTopic(REPLY_TOPIC_NAME);
            MessageConsumer replyConsumer = jmsSession.createConsumer(replyTopic);

            // 7: build the request
            Message message = jmsSession.createMessage();
            message.setObjectProperty(type, object);
            // 8: tell the receiver where to answer
            message.setJMSReplyTo(replyTopic);
            producer.send(message);

            // 9: the listener is registered and delivery started only after
            // the send, so this thread no longer uses the session once the
            // provider's delivery thread takes it over.
            replyConsumer.setMessageListener(new MessageListener() {
                public void onMessage(Message reply) {
                    if (reply instanceof TextMessage) {
                        String messageReceived = null;
                        try {
                            messageReceived = ((TextMessage) reply).getText();
                        } catch (JMSException e) {
                            e.printStackTrace();
                        }
                        System.out
                                .println("reply message received from customer1:"
                                        + messageReceived);
                    }
                }
            });
            connection.start();
            replyListenerActive = true;
        } catch (NamingException e) {
            e.printStackTrace();
        } catch (JMSException e) {
            e.printStackTrace();
        } finally {
            // Keep the connection only when the reply listener is running;
            // otherwise it would leak.
            if (!replyListenerActive) {
                closeQuietly(connection);
            }
        }
    }

    /** Closes the connection if it is non-null, printing any close failure. */
    private static void closeQuietly(Connection connection) {
        if (connection == null) {
            return;
        }
        try {
            connection.close();
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }
}
//2 接收訊息(傳送回覆)
import javax.servlet.*;
import javax.servlet.http.*;
import javax.naming.*;
import javax.jms.*;
import com.brightmart.MessageAction;
import com.brightmart.SM;
import com.util.mail.TestSendMail;
// 初始化jms連線,建立topic監聽器;指定接收訊息時候,做的對應處理
/**
 * Servlet that opens a durable JMS topic subscription when it is initialised
 * and reacts to incoming messages: an "email" property triggers a mail plus a
 * text reply, a "message" property triggers a short message (SM).
 */
public class JMSListener extends HttpServlet implements MessageListener {
    private static final long serialVersionUID = 3963233366687996777L;

    /**
     * Connection that backs the durable subscription, kept so that
     * {@link #destroy()} can close it. Transient because the servlet base
     * class is serializable and a live connection is not meaningful state.
     */
    private transient Connection listenerConnection;

    /**
     * Opens the JMS connection and registers this servlet as listener of a
     * durable subscription named "MySub" on the topic bound at
     * {@code jms/topic/MyTopic}. A durable subscriber requires a client ID on
     * the connection; "MyClient" is used.
     *
     * <p>Naming and JMS failures are printed and initialisation still
     * completes; the servlet then simply receives no messages.</p>
     *
     * @param config servlet configuration supplied by the container
     * @throws ServletException if the superclass initialisation fails
     */
    public void init(ServletConfig config) throws ServletException {
        // Required when overriding init(ServletConfig); otherwise
        // getServletConfig()/getServletContext() are unusable.
        super.init(config);

        Connection connection = null;
        boolean started = false;
        try {
            InitialContext initCtx = new InitialContext();
            Context envContext = (Context) initCtx.lookup("java:comp/env"); // 1
            ConnectionFactory connectionFactory = (ConnectionFactory) envContext
                    .lookup("jms/FailoverConnectionFactory"); // 2
            connection = connectionFactory.createConnection(); // 3
            connection.setClientID("MyClient");
            // 4: non-transacted; messages are acknowledged explicitly in onMessage
            Session jmsSession = connection.createSession(false,
                    Session.CLIENT_ACKNOWLEDGE);
            // 5: durable subscriber (a plain consumer is deliberately not used here)
            TopicSubscriber consumer = jmsSession.createDurableSubscriber(
                    (Topic) envContext.lookup("jms/topic/MyTopic"), "MySub");
            consumer.setMessageListener(this);
            connection.start();
            listenerConnection = connection;
            started = true;
        } catch (NamingException e) {
            e.printStackTrace();
        } catch (JMSException e) {
            e.printStackTrace();
        } finally {
            // Do not leave a half-configured connection (and its client ID) open.
            if (!started) {
                closeQuietly(connection);
            }
        }
    }

    /**
     * Closes the subscription connection so the fixed client ID is released
     * when the servlet is taken out of service.
     */
    public void destroy() {
        closeQuietly(listenerConnection);
        listenerConnection = null;
        super.destroy();
    }

    /**
     * Handles one incoming message.
     *
     * <ul>
     * <li>"email" property present: sends a mail to that address, acknowledges
     * the message and sends a text reply to its reply-to destination, if any.</li>
     * <li>otherwise "message" property present: submits a short message to that
     * destination ID and acknowledges.</li>
     * <li>otherwise: prints a notice and does not acknowledge.</li>
     * </ul>
     *
     * <p>The property values are cast to {@code String}; a non-String value
     * raises {@code ClassCastException}, as before.</p>
     *
     * @param message the delivered message; null is ignored
     */
    public void onMessage(Message message) {
        System.out.println("message in coustomer1.");
        if (message == null) {
            return;
        }
        try {
            if (message.getObjectProperty("email") != null) {
                String emailAddress = (String) message
                        .getObjectProperty("email");
                TestSendMail sendMail = new TestSendMail();
                sendMail.sendMail(emailAddress);
                message.acknowledge();
                System.out
                        .println("customer1 RECEIVED a email type message");
                // The reply text previously read "ustomer1"; aligned with the
                // log line above.
                sendReply(message.getJMSReplyTo(),
                        "customer1 RECEIVED a email type message");
            } else if (message.getObjectProperty("message") != null) {
                MessageAction m = new MessageAction();
                SM sm = new SM();
                sm.setDestTermId((String) message.getObjectProperty("message"));
                sm.setMsgContent("分散式JMS-ActiveMQ系統測試");
                m.addSM(sm);
                message.acknowledge();
            } else {
                System.out.println("接收普通訊息,不做任何處理!");
            }
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }

    /**
     * Sends a text message to the given destination over a short-lived
     * connection, which is always closed afterwards. Does nothing when there
     * is no reply destination or no connection could be obtained.
     */
    private void sendReply(Destination replyTo, String text) throws JMSException {
        if (replyTo == null) {
            // Sender did not ask for a reply.
            return;
        }
        Connection replyConnection = getConnection();
        if (replyConnection == null) {
            // getConnection() has already printed the cause.
            return;
        }
        try {
            Session replySession = replyConnection.createSession(false,
                    Session.CLIENT_ACKNOWLEDGE);
            MessageProducer replyProducer = replySession.createProducer(replyTo);
            replyProducer.send(replySession.createTextMessage(text));
        } finally {
            // Closing the connection also closes its session and producer.
            closeQuietly(replyConnection);
        }
    }

    /**
     * Creates a new connection from the factory bound at
     * {@code jms/FailoverConnectionFactory}. The caller owns the returned
     * connection and must close it.
     *
     * @return a new connection, or null if the lookup or creation failed
     *         (the failure is printed)
     */
    public Connection getConnection() {
        try {
            InitialContext initCtx = new InitialContext();
            Context envContext = (Context) initCtx.lookup("java:comp/env");
            ConnectionFactory connectionFactory = (ConnectionFactory) envContext
                    .lookup("jms/FailoverConnectionFactory");
            return connectionFactory.createConnection();
        } catch (Exception e) {
            e.printStackTrace();
            return null;
        }
    }

    /** Closes the connection if it is non-null, printing any close failure. */
    private static void closeQuietly(Connection connection) {
        if (connection == null) {
            return;
        }
        try {
            connection.close();
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }
}
//3 context.xml
Context中新增配置:
<!-- Connection factory looked up by JMSListener. Uses the failover transport.
     The ampersand between the URL options must be written as &amp; inside an
     XML attribute value, otherwise the file is not well-formed. -->
<Resource
    name="jms/FailoverConnectionFactory"
    auth="Container"
    type="org.apache.activemq.ActiveMQConnectionFactory"
    description="JMS Connection Factory"
    factory="org.apache.activemq.jndi.JNDIReferenceFactory"
    brokerURL="failover:(tcp://localhost:61616)?initialReconnectDelay=100&amp;maxReconnectAttempts=5"
    brokerName="localhost"
    useEmbeddedBroker="false"/>
<!-- Connection factory looked up by SenderMessageService. Plain TCP transport. -->
<Resource
    name="jms/NormalConnectionFactory"
    auth="Container"
    type="org.apache.activemq.ActiveMQConnectionFactory"
    description="JMS Connection Factory"
    factory="org.apache.activemq.jndi.JNDIReferenceFactory"
    brokerURL="tcp://localhost:61616"
    brokerName="localhost"
    useEmbeddedBroker="false"/>
<!-- Topic looked up by both classes under jms/topic/MyTopic. -->
<Resource name="jms/topic/MyTopic"
    auth="Container"
    type="org.apache.activemq.command.ActiveMQTopic"
    factory="org.apache.activemq.jndi.JNDIReferenceFactory"
    physicalName="MY.TEST.FOO"/>
<!-- Queue binding; not referenced by the Java code shown in this article. -->
<Resource name="jms/queue/MyQueue"
    auth="Container"
    type="org.apache.activemq.command.ActiveMQQueue"
    factory="org.apache.activemq.jndi.JNDIReferenceFactory"
    physicalName="MY.TEST.FOO.QUEUE"/>
4.傳送簡訊或郵件
請參考本部落格前兩篇文章
注:
1.系統下載ActiveMQ,並執行;
2.ActiveMQ需要融合web伺服器,如可以配置tomcat伺服器的context.xml;
3.在專案中,需要引入ActiveMQ的jar.
//1 傳送訊息(接收回覆訊息)
/**
 * Publishes a property-only JMS message to the topic bound in JNDI and prints
 * any text reply that arrives on the reply topic.
 *
 * <p>All JNDI names are resolved relative to {@code java:comp/env}.</p>
 */
public class SenderMessageService {

    /** JNDI name of the connection factory used for publishing. */
    private static final String CONNECTION_FACTORY_JNDI = "jms/NormalConnectionFactory";

    /** JNDI name of the topic the request message is published to. */
    private static final String REQUEST_TOPIC_JNDI = "jms/topic/MyTopic";

    /**
     * Name handed to {@code Session.createTopic} for the reply destination.
     * NOTE(review): this is a provider-specific topic name, not a JNDI lookup,
     * so replies travel on a topic literally named like this, which may differ
     * from the destination bound under the same JNDI name. Confirm intent.
     */
    private static final String REPLY_TOPIC_NAME = "jms/topic/MyTopic";

    /**
     * Publishes one persistent message carrying a single property and
     * subscribes for text replies. The original author recommends saving the
     * message to a database before publishing.
     *
     * <p>On success the connection is intentionally left open so the reply
     * listener keeps receiving. NOTE(review): nothing in this class closes it
     * afterwards, so every successful call keeps one connection alive.</p>
     *
     * <p>Naming and JMS failures are printed, not rethrown; in that case the
     * connection is closed before returning.</p>
     *
     * @param type   name of the message property; must not be null or empty
     * @param object value of the message property
     * @throws IllegalArgumentException if {@code type} is null or empty
     */
    public void publish(String type, Object object) {
        // Fail before any connection is opened; the provider would reject the
        // property name anyway, but only after resources had been allocated.
        if (type == null || type.length() == 0) {
            throw new IllegalArgumentException("Property name cannot be empty or null");
        }

        Connection connection = null;
        boolean replyListenerActive = false;
        try {
            InitialContext initCtx = new InitialContext();
            // 1: container-managed environment naming context
            Context envContext = (Context) initCtx.lookup("java:comp/env");
            // 2: connection factory
            ConnectionFactory connectionFactory = (ConnectionFactory) envContext
                    .lookup(CONNECTION_FACTORY_JNDI);
            // 3: connection
            connection = connectionFactory.createConnection();
            // 4: non-transacted, auto-acknowledging session
            Session jmsSession = connection.createSession(false,
                    Session.AUTO_ACKNOWLEDGE);
            // 5, 6: producer bound to the request topic, persistent delivery
            MessageProducer producer = jmsSession
                    .createProducer((Destination) envContext.lookup(REQUEST_TOPIC_JNDI));
            producer.setDeliveryMode(DeliveryMode.PERSISTENT);

            // Subscribe to the reply topic BEFORE sending: a non-durable topic
            // subscriber only sees messages published after it was created, so
            // subscribing after the send could miss a fast reply.
            Topic replyTopic = jmsSession.createTopic(REPLY_TOPIC_NAME);
            MessageConsumer replyConsumer = jmsSession.createConsumer(replyTopic);

            // 7: build the request
            Message message = jmsSession.createMessage();
            message.setObjectProperty(type, object);
            // 8: tell the receiver where to answer
            message.setJMSReplyTo(replyTopic);
            producer.send(message);

            // 9: the listener is registered and delivery started only after
            // the send, so this thread no longer uses the session once the
            // provider's delivery thread takes it over.
            replyConsumer.setMessageListener(new MessageListener() {
                public void onMessage(Message reply) {
                    if (reply instanceof TextMessage) {
                        String messageReceived = null;
                        try {
                            messageReceived = ((TextMessage) reply).getText();
                        } catch (JMSException e) {
                            e.printStackTrace();
                        }
                        System.out
                                .println("reply message received from customer1:"
                                        + messageReceived);
                    }
                }
            });
            connection.start();
            replyListenerActive = true;
        } catch (NamingException e) {
            e.printStackTrace();
        } catch (JMSException e) {
            e.printStackTrace();
        } finally {
            // Keep the connection only when the reply listener is running;
            // otherwise it would leak.
            if (!replyListenerActive) {
                closeQuietly(connection);
            }
        }
    }

    /** Closes the connection if it is non-null, printing any close failure. */
    private static void closeQuietly(Connection connection) {
        if (connection == null) {
            return;
        }
        try {
            connection.close();
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }
}
//2 接收訊息(傳送回覆)
import javax.servlet.*;
import javax.servlet.http.*;
import javax.naming.*;
import javax.jms.*;
import com.brightmart.MessageAction;
import com.brightmart.SM;
import com.util.mail.TestSendMail;
// 初始化jms連線,建立topic監聽器;指定接收訊息時候,做的對應處理
/**
 * Servlet that opens a durable JMS topic subscription when it is initialised
 * and reacts to incoming messages: an "email" property triggers a mail plus a
 * text reply, a "message" property triggers a short message (SM).
 */
public class JMSListener extends HttpServlet implements MessageListener {
    private static final long serialVersionUID = 3963233366687996777L;

    /**
     * Connection that backs the durable subscription, kept so that
     * {@link #destroy()} can close it. Transient because the servlet base
     * class is serializable and a live connection is not meaningful state.
     */
    private transient Connection listenerConnection;

    /**
     * Opens the JMS connection and registers this servlet as listener of a
     * durable subscription named "MySub" on the topic bound at
     * {@code jms/topic/MyTopic}. A durable subscriber requires a client ID on
     * the connection; "MyClient" is used.
     *
     * <p>Naming and JMS failures are printed and initialisation still
     * completes; the servlet then simply receives no messages.</p>
     *
     * @param config servlet configuration supplied by the container
     * @throws ServletException if the superclass initialisation fails
     */
    public void init(ServletConfig config) throws ServletException {
        // Required when overriding init(ServletConfig); otherwise
        // getServletConfig()/getServletContext() are unusable.
        super.init(config);

        Connection connection = null;
        boolean started = false;
        try {
            InitialContext initCtx = new InitialContext();
            Context envContext = (Context) initCtx.lookup("java:comp/env"); // 1
            ConnectionFactory connectionFactory = (ConnectionFactory) envContext
                    .lookup("jms/FailoverConnectionFactory"); // 2
            connection = connectionFactory.createConnection(); // 3
            connection.setClientID("MyClient");
            // 4: non-transacted; messages are acknowledged explicitly in onMessage
            Session jmsSession = connection.createSession(false,
                    Session.CLIENT_ACKNOWLEDGE);
            // 5: durable subscriber (a plain consumer is deliberately not used here)
            TopicSubscriber consumer = jmsSession.createDurableSubscriber(
                    (Topic) envContext.lookup("jms/topic/MyTopic"), "MySub");
            consumer.setMessageListener(this);
            connection.start();
            listenerConnection = connection;
            started = true;
        } catch (NamingException e) {
            e.printStackTrace();
        } catch (JMSException e) {
            e.printStackTrace();
        } finally {
            // Do not leave a half-configured connection (and its client ID) open.
            if (!started) {
                closeQuietly(connection);
            }
        }
    }

    /**
     * Closes the subscription connection so the fixed client ID is released
     * when the servlet is taken out of service.
     */
    public void destroy() {
        closeQuietly(listenerConnection);
        listenerConnection = null;
        super.destroy();
    }

    /**
     * Handles one incoming message.
     *
     * <ul>
     * <li>"email" property present: sends a mail to that address, acknowledges
     * the message and sends a text reply to its reply-to destination, if any.</li>
     * <li>otherwise "message" property present: submits a short message to that
     * destination ID and acknowledges.</li>
     * <li>otherwise: prints a notice and does not acknowledge.</li>
     * </ul>
     *
     * <p>The property values are cast to {@code String}; a non-String value
     * raises {@code ClassCastException}, as before.</p>
     *
     * @param message the delivered message; null is ignored
     */
    public void onMessage(Message message) {
        System.out.println("message in coustomer1.");
        if (message == null) {
            return;
        }
        try {
            if (message.getObjectProperty("email") != null) {
                String emailAddress = (String) message
                        .getObjectProperty("email");
                TestSendMail sendMail = new TestSendMail();
                sendMail.sendMail(emailAddress);
                message.acknowledge();
                System.out
                        .println("customer1 RECEIVED a email type message");
                // The reply text previously read "ustomer1"; aligned with the
                // log line above.
                sendReply(message.getJMSReplyTo(),
                        "customer1 RECEIVED a email type message");
            } else if (message.getObjectProperty("message") != null) {
                MessageAction m = new MessageAction();
                SM sm = new SM();
                sm.setDestTermId((String) message.getObjectProperty("message"));
                sm.setMsgContent("分散式JMS-ActiveMQ系統測試");
                m.addSM(sm);
                message.acknowledge();
            } else {
                System.out.println("接收普通訊息,不做任何處理!");
            }
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }

    /**
     * Sends a text message to the given destination over a short-lived
     * connection, which is always closed afterwards. Does nothing when there
     * is no reply destination or no connection could be obtained.
     */
    private void sendReply(Destination replyTo, String text) throws JMSException {
        if (replyTo == null) {
            // Sender did not ask for a reply.
            return;
        }
        Connection replyConnection = getConnection();
        if (replyConnection == null) {
            // getConnection() has already printed the cause.
            return;
        }
        try {
            Session replySession = replyConnection.createSession(false,
                    Session.CLIENT_ACKNOWLEDGE);
            MessageProducer replyProducer = replySession.createProducer(replyTo);
            replyProducer.send(replySession.createTextMessage(text));
        } finally {
            // Closing the connection also closes its session and producer.
            closeQuietly(replyConnection);
        }
    }

    /**
     * Creates a new connection from the factory bound at
     * {@code jms/FailoverConnectionFactory}. The caller owns the returned
     * connection and must close it.
     *
     * @return a new connection, or null if the lookup or creation failed
     *         (the failure is printed)
     */
    public Connection getConnection() {
        try {
            InitialContext initCtx = new InitialContext();
            Context envContext = (Context) initCtx.lookup("java:comp/env");
            ConnectionFactory connectionFactory = (ConnectionFactory) envContext
                    .lookup("jms/FailoverConnectionFactory");
            return connectionFactory.createConnection();
        } catch (Exception e) {
            e.printStackTrace();
            return null;
        }
    }

    /** Closes the connection if it is non-null, printing any close failure. */
    private static void closeQuietly(Connection connection) {
        if (connection == null) {
            return;
        }
        try {
            connection.close();
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }
}
//3 context.xml
Context中新增配置:
<!-- Connection factory looked up by JMSListener. Uses the failover transport.
     The ampersand between the URL options must be written as &amp; inside an
     XML attribute value, otherwise the file is not well-formed. -->
<Resource
    name="jms/FailoverConnectionFactory"
    auth="Container"
    type="org.apache.activemq.ActiveMQConnectionFactory"
    description="JMS Connection Factory"
    factory="org.apache.activemq.jndi.JNDIReferenceFactory"
    brokerURL="failover:(tcp://localhost:61616)?initialReconnectDelay=100&amp;maxReconnectAttempts=5"
    brokerName="localhost"
    useEmbeddedBroker="false"/>
<!-- Connection factory looked up by SenderMessageService. Plain TCP transport. -->
<Resource
    name="jms/NormalConnectionFactory"
    auth="Container"
    type="org.apache.activemq.ActiveMQConnectionFactory"
    description="JMS Connection Factory"
    factory="org.apache.activemq.jndi.JNDIReferenceFactory"
    brokerURL="tcp://localhost:61616"
    brokerName="localhost"
    useEmbeddedBroker="false"/>
<!-- Topic looked up by both classes under jms/topic/MyTopic. -->
<Resource name="jms/topic/MyTopic"
    auth="Container"
    type="org.apache.activemq.command.ActiveMQTopic"
    factory="org.apache.activemq.jndi.JNDIReferenceFactory"
    physicalName="MY.TEST.FOO"/>
<!-- Queue binding; not referenced by the Java code shown in this article. -->
<Resource name="jms/queue/MyQueue"
    auth="Container"
    type="org.apache.activemq.command.ActiveMQQueue"
    factory="org.apache.activemq.jndi.JNDIReferenceFactory"
    physicalName="MY.TEST.FOO.QUEUE"/>
4.傳送簡訊或郵件
請參考本部落格前兩篇文章
注:
1.系統下載ActiveMQ,並執行;
2.ActiveMQ需要融合web伺服器,如可以配置tomcat伺服器的context.xml;
3.在專案中,需要引入ActiveMQ的jar.