1. 程式人生 > >JMS傳送和接收例項-釋出/訂閱模式

JMS傳送和接收例項-釋出/訂閱模式

傳送訊息

不管是將訊息傳送到佇列還是釋出到主題,程式設計的步驟是相同的,差別在於使用不同的JMS物件。具體定義見表:

傳送訊息的過程大體分為以下幾步;

1、獲得一個Weblogic Server上下文的引用;

2、建立連線工廠;

3、使用連線工廠建立一個連線;

4、使用連線建立一個會話;

5、獲取一個目的;

6、使用會話和目的建立訊息的生產者;

7、建立訊息物件;

8、使用連線建立一個需要傳送的訊息型別的例項;

9、使用連線的一個佇列傳送器或主題公佈器,然後使用傳送器或公佈器傳送訊息。

在敲程式碼之前要先匯入需要的JAR包,並且配置JMS伺服器。

注意:

wlfullclient.jar生成方式是,進入weblogic的安裝目錄例如C:\Oracle\Middleware\wlserver_10.3\server\lib,執行 java -jar wljarbuilder.jar就能生成wlfullclient.jar檔案

傳送訊息程式碼:

package com.xu.Pub2Sub;

import java.util.Properties;

import javax.jms.JMSException;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;
import javax.jms.TopicConnection;
import javax.jms.TopicConnectionFactory;
import javax.jms.TopicPublisher;
import javax.jms.TopicSession;
import javax.naming.Context;
import javax.naming.InitialContext;
import javax.naming.NamingException;

public class MsgTopicProducer {

	/**
	 * @功能:主題傳送者
	 * @作者:
	 * @日期:2012-10-18
	 */
	
	private TopicPublisher publisher;
	private TextMessage msg;
	
	public MsgTopicProducer(String[] args) throws NamingException, JMSException
	{
		/*初始化上下文物件*/
		String url = "t3://localhost:7001";
		Properties p = new Properties();
		p.put(Context.INITIAL_CONTEXT_FACTORY, "weblogic.jndi.WLInitialContextFactory");
		p.put(Context.PROVIDER_URL, url);
		Context ctx = new InitialContext(p);
		
		/*建立一個連線工廠*/
		TopicConnectionFactory tConFactory = (TopicConnectionFactory)
				ctx.lookup("weblogic.jms.ConnectionFactory");
		/*建立一個主題*/
		Topic messageTopic = (Topic) ctx.lookup("jms/MyTopic");
		/*建立一個連線*/
		TopicConnection tCon = tConFactory.createTopicConnection();
		/*建立一個會話*/
		TopicSession session = tCon.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
		
		/*建立一個主題釋出者,併發送訊息*/
		publisher = session.createPublisher(messageTopic);
		msg = session.createTextMessage();
		
	}
	public void runClient() throws JMSException
	{
		msg.setText("Hello");
		publisher.publish(msg);
		msg.setText("Welcome to JMS!");
		publisher.publish(msg);
		System.out.println("成功!");
	}
	
	public static void main(String[] args) throws NamingException, JMSException {
		
		MsgTopicProducer mp = new MsgTopicProducer(args);
		mp.runClient();
	}

}



同步接收者程式碼:

package com.xu.Pub2Sub;

import java.util.Properties;

import javax.jms.JMSException;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;
import javax.jms.TopicConnection;
import javax.jms.TopicConnectionFactory;
import javax.jms.TopicSession;
import javax.jms.TopicSubscriber;
import javax.naming.Context;
import javax.naming.InitialContext;
import javax.naming.NamingException;


public class SyncMessageTopicReceiver {

	/**
	 * @功能:釋出-訂閱訊息服務   同步接收者
	 * @作者:
	 * @日期:2012-10-18
	 */
	
	private TopicSubscriber subscriber ;
	private TextMessage msg;
	
	public SyncMessageTopicReceiver() throws NamingException, JMSException
	{
		/*初始化上下文物件*/
		String url = "t3://localhost:7001";
		Properties p = new Properties();
		p.put(Context.INITIAL_CONTEXT_FACTORY, "weblogic.jndi.WLInitialContextFactory");
		p.put(Context.PROVIDER_URL, url);
		Context ctx = new InitialContext(p);
		
		/*建立主題連線工廠*/
		TopicConnectionFactory tConFactory = (TopicConnectionFactory) 
				ctx.lookup("weblogic.jms.ConnectionFactory");
		/*建立一個主題*/
		Topic messageTopic = (Topic) ctx.lookup("jms/MyTopic");
		/*建立連線*/
		TopicConnection tCon = tConFactory.createTopicConnection();
		/*建立會話*/
		TopicSession session = tCon.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
		/*建立接收者*/
		subscriber = session.createSubscriber(messageTopic);
		tCon.start();	
	}
	public void runClient() throws JMSException
	{
		msg = (TextMessage) subscriber.receive();
		System.out.println("Receiver:"+msg.getText());
		msg = (TextMessage) subscriber.receive();
		System.out.println("Receiver:"+msg.getText());
		
	}
	public static void main(String[] args) throws NamingException, JMSException {
		SyncMessageTopicReceiver receiver = new SyncMessageTopicReceiver();
		receiver.runClient();
	}

}

非同步接收者程式碼:

package com.xu.Pub2Sub;

import java.util.Properties;

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;
import javax.jms.TopicConnection;
import javax.jms.TopicConnectionFactory;
import javax.jms.TopicSession;
import javax.jms.TopicSubscriber;
import javax.naming.Context;
import javax.naming.InitialContext;
import javax.naming.NamingException;

public class AsyncMessageTopicReceiver implements MessageListener {

	/**
	 * @功能:釋出-訂閱訊息服務 非同步接收訊息
	 * @作者:
	 * @日期:2012-10-18
	 */

	private int EXPECTED_MESSAGE_COUNT = 10;
	private int messageCount = 0;
	private TopicSubscriber subscriber;

	public AsyncMessageTopicReceiver() throws NamingException, JMSException {
		/* 初始化上下文物件 */
		String url = "t3://localhost:7001";
		Properties p = new Properties();
		p.put(Context.INITIAL_CONTEXT_FACTORY,
				"weblogic.jndi.WLInitialContextFactory");
		p.put(Context.PROVIDER_URL, url);
		Context ctx = new InitialContext(p);
		/*建立主題連線工廠*/
		TopicConnectionFactory tConFactory = (TopicConnectionFactory) 
				ctx.lookup("weblogic.jms.ConnectionFactory");
		/*建立一個主題*/
		Topic messageTopic = (Topic) ctx.lookup("jms/MyTopic");
		/*建立連線*/
		TopicConnection tCon = tConFactory.createTopicConnection();
		/*建立會話*/
		TopicSession session = tCon.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
		/*建立接收者*/
		subscriber = session.createSubscriber(messageTopic);
		/* 設定監聽 */
		subscriber.setMessageListener(this);
		tCon.start();

	}

	public boolean expectMoreMessage() {
		return messageCount < EXPECTED_MESSAGE_COUNT;
	}

	@Override
	public void onMessage(Message message) {
		System.out.println("onMessage");
		try {
			TextMessage msg = (TextMessage) message;
			System.out.println("Receiver:" + msg.getText());
		} catch (JMSException e1) {
			// TODO Auto-generated catch block
			e1.printStackTrace();
		}

		messageCount++;
	}

	public static void main(String[] args) throws NamingException, JMSException {
		int MAX_TRIES = 30;
		int tryCount = 0;
		AsyncMessageTopicReceiver receiver = new AsyncMessageTopicReceiver();
		while (receiver.expectMoreMessage() && (tryCount < MAX_TRIES)) {
			
			try {
				System.out.println(tryCount);
				Thread.sleep(1000);
			} catch (InterruptedException e) {
				// TODO Auto-generated catch block
				e.printStackTrace();
			}
			tryCount++;
		}

	}
}