1. 程式人生 > >JMS訊息傳送和接收例項

JMS訊息傳送和接收例項

傳送訊息

不管是將訊息傳送到佇列還是釋出到主題,程式設計的步驟是相同的,差別在於使用不同的JMS物件。具體定義見表:


傳送訊息的過程大體分為以下幾步;

1、獲得一個Weblogic Server上下文的引用;

2、建立連線工廠;

3、使用連線工廠建立一個連線;

4、使用連線建立一個會話;

5、獲取一個目的;

6、使用會話和目的建立訊息的生產者;

7、建立訊息物件;

8、使用連線建立一個需要傳送的訊息型別的例項;

9、使用連線的一個佇列傳送器或主題公佈器,然後使用傳送器或公佈器傳送訊息。

在敲程式碼之前要先匯入需要的JAR包,並且配置JMS伺服器。


注意:

wlfullclient.jar生成方式是,進入weblogic的安裝目錄例如C:\Oracle\Middleware\wlserver_10.3\server\lib,執行 java -jar wljarbuilder.jar就能生成wlfullclient.jar檔案  傳送訊息程式碼:
package com.xu.testDemo;

import java.util.Properties;

import javax.jms.JMSException;
import javax.jms.Queue;
import javax.jms.QueueConnection;
import javax.jms.QueueConnectionFactory;
import javax.jms.QueueSender;
import javax.jms.QueueSession;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.naming.Context;
import javax.naming.InitialContext;
import javax.naming.NamingException;

public class MsgQueueSender {

	/**
	 * @功能:JMS中實現點對點訊息服務--傳送訊息
	 * @作者:
	 * @日期:2012-10-17
	 */

	private QueueSender sender;
	private TextMessage msg;

	public MsgQueueSender(String[] argv) throws NamingException, JMSException {
		/* 初始化上下文物件 */
		String url = "t3://localhost:7001";
		Properties p = new Properties();
		p.put(Context.INITIAL_CONTEXT_FACTORY,
				"weblogic.jndi.WLInitialContextFactory");
		p.put(Context.PROVIDER_URL, url);
		Context ctx = new InitialContext(p);

		/* 建立一個連線工廠 */
		QueueConnectionFactory qConFactory = (QueueConnectionFactory) ctx
				.lookup("weblogic.jms.ConnectionFactory");

		/* 建立一個佇列 */
		Queue messageQueue = (Queue) ctx.lookup("jms/MyMDB");

		/* 建立連線 */
		QueueConnection qCon = qConFactory.createQueueConnection();

		/* 建立一個會話 */
		QueueSession session = qCon.createQueueSession(false,
				Session.AUTO_ACKNOWLEDGE);

		/* 建立一個傳送者 */
		sender = session.createSender(messageQueue);

		/* 建立一個訊息 */
		msg = session.createTextMessage();

	}

	public void runClient(String str) throws JMSException {
		/* 設定訊息,併發送 */
		msg.setText("Hello");
		sender.send(msg);
		msg.setText("Welcome to JMS");
		sender.send(msg);
		msg.setText(str);
		sender.send(msg);
	}

	public static void main(String[] args) throws Exception {
		try {
			MsgQueueSender mqs = new MsgQueueSender(args);
			mqs.runClient("aaa");

		} catch (NamingException e) {
			System.err.println("");
			System.err.println("**請確保已經正確地設定JMS伺服器。在執行之前必須配置JMS伺服器和正確的JMS目的。");
			System.err.println("");
			throw e;

		}
	}

}


接收訊息分為同步與非同步接收。

同步接收程式碼:

package com.xu.testDemo;

import java.util.Properties;

import javax.jms.JMSException;
import javax.jms.Queue;
import javax.jms.QueueConnection;
import javax.jms.QueueConnectionFactory;
import javax.jms.QueueReceiver;
import javax.jms.QueueSession;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.naming.Context;
import javax.naming.InitialContext;
import javax.naming.NamingException;

public class SyncMesConsumer {

	/**
	 * @功能:同步接收訊息例項
	 * @作者:
	 * @日期:2012-10-17
	 */

	private QueueReceiver receiver;
	private TextMessage msg;

	public SyncMesConsumer() throws NamingException, JMSException {
		/* 初始化上下文物件 */
		String url = "t3://localhost:7001";
		Properties p = new Properties();
		p.put(Context.INITIAL_CONTEXT_FACTORY,
				"weblogic.jndi.WLInitialContextFactory");
		p.put(Context.PROVIDER_URL, url);
		Context ctx = new InitialContext(p);

		/* 建立一個連線工廠 */
		QueueConnectionFactory qConFactory = (QueueConnectionFactory) ctx
				.lookup("weblogic.jms.ConnectionFactory");
		/* 建立一個佇列 */
		Queue messageQueue = (Queue) ctx.lookup("jms/MyMDB");
		/* 建立一個連線 */
		QueueConnection qCon = qConFactory.createQueueConnection();
		/* 建立一個會話 */
		QueueSession session = qCon.createQueueSession(false,
				Session.AUTO_ACKNOWLEDGE);

		/* 建立訊息接收者 */
		receiver = session.createReceiver(messageQueue);
		/* 在呼叫此方法之前,訊息傳遞被禁止 */
		qCon.start();

	}

	public void runClient() throws JMSException {
		msg = (TextMessage) receiver.receive();
		System.err.println("Reciverd:" + msg.getText());
		msg = (TextMessage) receiver.receive();
		System.err.println("Reciverd:" + msg.getText());
		msg = (TextMessage) receiver.receive();
		System.err.println("Reciverd:" + msg.getText());
	}

	public static void main(String[] args) throws Exception {
		SyncMesConsumer consumer = new SyncMesConsumer();
		consumer.runClient();

	}

}

程式碼執行結果:

非同步接收程式碼:
package com.xu.testDemo;

import java.util.Properties;

import javax.jms.*;
import javax.naming.Context;
import javax.naming.InitialContext;
import javax.naming.NamingException;

public class AsynMesConsumer implements MessageListener {

	/**
	 * @功能:非同步接收訊息
	 * @作者:
	 * @日期:2012-10-18
	 */
	
	private int EXPECTED_MESSAGE_COUNT = 2;
	private int messageCount = 0;
	private QueueReceiver receiver;
	private TextMessage msg;
	
	public AsynMesConsumer() throws NamingException, JMSException
	{
		/*初始化上下文物件*/
		String url = "t3://localhost:7001";
		Properties p = new Properties();
		p.put(Context.INITIAL_CONTEXT_FACTORY, "weblogic.jndi.WLInitialContextFactory");
		p.put(Context.PROVIDER_URL, url);
		Context ctx = new InitialContext(p);
		
		/*建立連線工廠*/
		QueueConnectionFactory qConFactory = (QueueConnectionFactory)
				ctx.lookup("weblogic.jms.ConnectionFactory");
		/*建立佇列*/
		Queue messageQueue = (Queue) ctx.lookup("jms/MyMDB");
		/*建立連線*/
		QueueConnection qCon = qConFactory.createQueueConnection();
		/*建立一個會話*/
		QueueSession session = qCon.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
		/*建立一個接收者*/
		receiver = session.createReceiver(messageQueue);
		/*設定一個訊息監聽*/
		receiver.setMessageListener(this);
		qCon.start();
		
	}
	@Override
	public void onMessage(Message m) {
		
		try {
			msg = (TextMessage) m;
			System.out.println("Receiver:"+msg.getText());
			
		} catch (JMSException e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		}
		messageCount ++;
	}
	public boolean expectMoreMessage()
	{
		return messageCount < EXPECTED_MESSAGE_COUNT;
	}
	public static void main(String[] args) throws Exception {
		int MAX_TRIES = 10;
		int tryCount = 0;
		AsynMesConsumer consumer = new AsynMesConsumer();
		while(consumer.expectMoreMessage() && (tryCount < MAX_TRIES))
		{
			try{
				Thread.sleep(1000);
			}catch(InterruptedException e)
			{
				e.printStackTrace();
			}	
			tryCount ++;
		}
		

	}
}

執行效果: