1. 程式人生 > >weblogic中使用jms傳送和接受訊息

weblogic中使用jms傳送和接受訊息

一. 開篇語

今天很坑爹啊, 為了搞一下JMS這個玩意, 整了一天了, 光是weblogic的環境就換了兩, 最後總算是搞定了, 寫下這篇日誌, 記錄下學習的心得.

 

二. JMS的思想

所謂的JMS其實就是非同步通訊, 我們可以打個簡單的比方: 現在的手機基本上普及了, 手機它有兩個基本功能, 一個是打電話, 一個是發簡訊. 

打電話是同步的, 必須要保證雙方都開機才能進行通話; 而發簡訊則是非同步的, 接收方不需要保持開機狀態; 

 

SUN公司給我們提供了一組標準被Java API用於企業級的訊息處理, 通過JMS可以在Java程式之間傳送和接受訊息以達到交換資料的目的,

非同步通訊實現了程式之間的鬆耦合的關係.

 

三. 名詞解釋

1. 連線工廠(ConnectionFactory): 用來建立訊息伺服器的connection物件.
2. 連線(Connection): 代表一個與JMS提供者的活動連線.
3. 目的(Destination): 標識訊息的傳送和接收方式, 分為佇列(Queue)和主題(Topic)兩種.
4. 會話(Session): 接收和傳送訊息的會話執行緒.

5. 為了實現JMS獨立於不同供應商MS的專有技術, weblogic JMS採用了受管物件(administratored object)的機制. 受管物件就是由訊息伺服器通過管理介面建立, 程式通過JNDI介面取得這些物件.weblogic 中的兩種受管物件: connection factory, distination. 

 

 

四. 訊息型別

1. StreamMessage: 訊息由序列化的Java物件組成, 必須按照設定時的順序讀取物件.
2. MapMessage: 訊息由key/value對組成, 其中名稱為string型別, 值為Java資料型別. 可以使用列舉順序讀取該訊息的值, 也可以通過名稱無序地獲取值。

3. TextMessage: 訊息的主體為字串, 這是最常用的訊息型別.
4. ObjectMessage: 訊息的主體為序列化的Java物件, 可以是自己定義的序列化的Java物件.
5. BytesMessage: 訊息的主體是二進位制資料.

 

五. 環境準備

1. myeclipse環境

① jre1.4

② j2ee 1.4 libraries

③ 匯入weblogic.jar

 

2. weblogic環境

① 安裝weblogic8.1並整合到myeclipse

     http://blog.csdn.net/zdp072/article/details/26831739

② 配置weblogic伺服器

1)、新建jms連線工廠,工廠名稱為“myJMSConnectionFactory”, JNDI name為"myJMSConnectionFactoryJNDIName" .

2)、定義後備儲存, 並填寫儲存目錄.

3)、新建jms伺服器,伺服器名稱為:“myJMSServer”.

4)、在“myJMSServer”服務下新建目標為“myJMSQueue”佇列, JNDI name為"myJMSQueueJNDIName".

5)、在“myJMSServer”服務下新建目標為“myJMSTopic”主題, JNDI name為"myJMSTopicJNDIName".

 

 

六. 程式碼實現

1. PTP(point to point) - 點對點模式.

   生產者傳送一條訊息到訊息伺服器, 訊息伺服器傳送給一個消費者, 這條訊息不能再發送給其他消費者, 相當於佇列, 先到先得.

   (一個訊息生產者對應一個訊息消費者)

 

  

①. 實體類User

 

/**
 * 實體類
 * @author zhangjim
 */
public class User implements Serializable {
	private static final long serialVersionUID = 1L;
	private String name;
	private int age;

	public int getAge() {
		return age;
	}

	public void setAge(int age) {
		this.age = age;
	}

	public String getName() {
		return name;
	}

	public void setName(String name) {
		this.name = name;
	}
}

 

②. 生產者QueueMsgSender

 

public class QueueMsgSender {

	// Defines the JNDI context factory.
	public final static String JNDI_FACTORY = "weblogic.jndi.WLInitialContextFactory";

	// Defines the JNDI provider url.
	public final static String PROVIDER_URL = "t3://localhost:7001/";

	// Defines the JMS connection factory for the queue.
	public final static String CONNECTION_FACTORY_JNDI_NAME = "myJMSConnectionFactoryJNDIName";

	// Defines the queue, use the queue JNDI name 
	public final static String QUEUE_JNDI_NAME = "myJMSQueueJNDIName";

	private QueueConnectionFactory qconFactory;
	private QueueConnection queueConnection;
	private QueueSession queueSession;
	private QueueSender queueSender;
	private Queue queue;
	private TextMessage textMessage;
	private StreamMessage streamMessage;
	private BytesMessage bytesMessage;
	private MapMessage mapMessage;
	private ObjectMessage objectMessage;
	
	/**
	 * get the context object.
	 * 
	 * @return context object
	 * @throws NamingException if operation cannot be performed
	 */
	private static InitialContext getInitialContext() throws NamingException {
		Hashtable table = new Hashtable();
		table.put(Context.INITIAL_CONTEXT_FACTORY, JNDI_FACTORY); 
		table.put(Context.PROVIDER_URL, PROVIDER_URL);
		InitialContext context = new InitialContext(table);
		return context;
	}

	/**
	 * Creates all the necessary objects for sending messages to a JMS queue.
	 * 
	 * @param ctx JNDI initial context
	 * @param queueName name of queue
	 * @exception NamingException if operation cannot be performed
	 * @exception JMSException if JMS fails to initialize due to internal error
	 */
	public void init(Context ctx, String queueName) throws NamingException, JMSException {
		qconFactory = (QueueConnectionFactory) ctx.lookup(CONNECTION_FACTORY_JNDI_NAME);
		queueConnection = qconFactory.createQueueConnection();
		queueSession = queueConnection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
		queue = (Queue) ctx.lookup(queueName);
		queueSender = queueSession.createSender(queue);

		textMessage = queueSession.createTextMessage();
		streamMessage = queueSession.createStreamMessage();
		bytesMessage = queueSession.createBytesMessage();
		mapMessage = queueSession.createMapMessage();
		objectMessage = queueSession.createObjectMessage();

		queueConnection.start();
	}

	/**
	 * Sends a message to a JMS queue.
	 * 
	 * @param message message to be sent
	 * @exception JMSException if JMS fails to send message due to internal error
	 */
	public void send(String message) throws JMSException {
		// type1: set TextMessage
		textMessage.setText(message);
		
		// type2: set StreamMessage
		streamMessage.writeString(message);
		streamMessage.writeInt(20);

		// type3: set BytesMessage
		byte[] block = message.getBytes();
		bytesMessage.writeBytes(block);

		// type4: set MapMessage
		mapMessage.setString("name", message);

		// type5: set ObjectMessage
		User user = new User();
		user.setName(message);
		user.setAge(30);
		objectMessage.setObject(user);
		
		queueSender.send(objectMessage);
	}

	/**
	 * read the msg from the console, then send it.
	 * 
	 * @param msgSender
	 * @throws IOException if IO fails to send message due to internal error
	 * @throws JMSException if JMS fails to send message due to internal error
	 */
	private static void readAndSend(QueueMsgSender msgSender) throws IOException, JMSException {
		BufferedReader msgStream = new BufferedReader(new InputStreamReader(System.in));
		System.out.println("Enter message(input quit to quit):");  
		String line = null;
		boolean quit = false; 
		do {
			line = msgStream.readLine();
			if (line != null && line.trim().length() != 0) { 
				msgSender.send(line);
				System.out.println("JMS Message Sent: " + line + "\n");
				quit = line.equalsIgnoreCase("quit");
			}
		} while (!quit);

	}
	
	/**
	 * release resources.
	 * 
	 * @exception JMSException if JMS fails to close objects due to internal error
	 */
	public void close() throws JMSException {
		queueSender.close();
		queueSession.close();
		queueConnection.close();
	}
	
	/**
	 * test client.
	 * 
	 * @param args
	 * @throws Exception 
	 */
	public static void main(String[] args) throws Exception {
		InitialContext ctx = getInitialContext(); 
		QueueMsgSender sender = new QueueMsgSender();  
		sender.init(ctx, QUEUE_JNDI_NAME);
		readAndSend(sender);
		sender.close();
	}
}

 

③. 消費者QueueMsgReceiver

 

public class QueueMsgReceiver implements MessageListener {
	// Defines the JNDI context factory.
	public final static String JNDI_FACTORY = "weblogic.jndi.WLInitialContextFactory";

	// Defines the JNDI provider url.
	public final static String PROVIDER_URL = "t3://localhost:7001";

	// Defines the JMS connection factory for the queue.
	public final static String CONNECTION_FACTORY_JNDI_NAME = "myJMSConnectionFactoryJNDIName";

	// Defines the queue, use the queue JNDI name 
	public final static String QUEUE_JNDI_NAME = "myJMSQueueJNDIName";

	private QueueConnectionFactory qconFactory;
	private QueueConnection queueConnection;
	private QueueSession queueSession;
	private QueueReceiver queueReceiver;
	private Queue queue;
	private boolean quit = false;
	
	/**
	 * get the context object.
	 * 
	 * @return context object
	 * @throws NamingException if operation cannot be performed
	 */
	private static InitialContext getInitialContext() throws NamingException {
		Hashtable table = new Hashtable();
		table.put(Context.INITIAL_CONTEXT_FACTORY, JNDI_FACTORY);
		table.put(Context.PROVIDER_URL, PROVIDER_URL);
		InitialContext context = new InitialContext(table);
		return context;
	}
	
	/**
	 * Creates all the necessary objects for receiving messages from a JMS queue.
	 * 
	 * @param ctx JNDI initial context
	 * @param queueName name of queue
	 * @exception NamingException if operation cannot be performed
	 * @exception JMSException if JMS fails to initialize due to internal error
	 */
	public void init(Context ctx, String queueName) throws NamingException, JMSException {
		qconFactory = (QueueConnectionFactory) ctx.lookup(CONNECTION_FACTORY_JNDI_NAME);
		queueConnection = qconFactory.createQueueConnection(); 
		queueSession = queueConnection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
		queue = (Queue) ctx.lookup(queueName);
		queueReceiver = queueSession.createReceiver(queue); 
		queueReceiver.setMessageListener(this);
		
		// second thread: message reveive thread.
		queueConnection.start();  
	}

	/**
	 * implement from MessageListener.
	 * when a message arrived, it will be invoked.
	 * 
	 * @param message message
	 */
	public void onMessage(Message message) {
		try {
			String msgStr = "";  
			int age = 0; 

			if (message instanceof TextMessage) {
				msgStr = ((TextMessage) message).getText();
			} else if (message instanceof StreamMessage) {
				msgStr = ((StreamMessage) message).readString();
				age = ((StreamMessage) message).readInt();
			} else if (message instanceof BytesMessage) {
				byte[] block = new byte[1024];
				((BytesMessage) message).readBytes(block);
				msgStr = String.valueOf(block);
			} else if (message instanceof MapMessage) {
				msgStr = ((MapMessage) message).getString("name");
			} else if (message instanceof ObjectMessage) {
				User user = (User) ((ObjectMessage) message).getObject();
				msgStr = user.getName(); 
				age = user.getAge();
			}

			System.out.println("Message Received: " + msgStr + ", " + age);

			if (msgStr.equalsIgnoreCase("quit")) {
				synchronized (this) {
					quit = true;
					this.notifyAll(); // Notify main thread to quit
				}
			}
		} catch (JMSException e) {
			throw new RuntimeException("error happens", e);
		}
	}

	/**
	 * release resources.
	 * 
	 * @exception JMSException if JMS fails to close objects due to internal error
	 */
	public void close() throws JMSException {
		queueReceiver.close();
		queueSession.close();
		queueConnection.close();
	}

	/**
	 * test client.
	 * first thread(main thread)
	 * 
	 * @param args
	 * @throws Exception 
	 */
	public static void main(String[] args) throws Exception {
		InitialContext ctx = getInitialContext();
		QueueMsgReceiver receiver = new QueueMsgReceiver(); 
		receiver.init(ctx, QUEUE_JNDI_NAME);

		// Wait until a "quit" message has been received.
		synchronized (receiver) {
			while (!receiver.quit) {
				try {
					receiver.wait();
				} catch (InterruptedException e) { 
					throw new RuntimeException("error happens", e);
				}
			}
		}
		receiver.close();
	}
}

 

 

2. publisher and subscriber - 釋出訂閱模式.

    生產者傳送一條訊息到訊息伺服器, 訊息伺服器傳送給正在監聽的所有消費者, 類似廣播.

    (一個訊息生產者對應多個訊息消費者)

 

 

①. 生產者TopicMsgPublisher

 

public class TopicMsgPublisher {

	// Defines the JNDI context factory.
	public final static String JNDI_FACTORY = "weblogic.jndi.WLInitialContextFactory";

	// Defines the JNDI provider url.
	public final static String PROVIDER_URL = "t3://localhost:7001/";

	// Defines the JMS connection factory for the topic.
	public final static String CONNECTION_FACTORY_JNDI_NAME = "myJMSConnectionFactoryJNDIName";

	// Defines the topic, use the topic JNDI name 
	public final static String TOPIC_JNDI_NAME = "myJMSTopicJNDIName"; 

	private TopicConnectionFactory tconFactory;
	private TopicConnection topicConnection;
	private TopicSession topicSession;
	private TopicPublisher topicPublisher; 
	private Topic topic; 
	private TextMessage textMessage;
	private StreamMessage streamMessage;
	private BytesMessage bytesMessage;
	private MapMessage mapMessage;
	private ObjectMessage objectMessage;
	
	/**
	 * get the context object.
	 * 
	 * @return context object
	 * @throws NamingException if operation cannot be performed
	 */
	private static InitialContext getInitialContext() throws NamingException {
		Hashtable table = new Hashtable();
		table.put(Context.INITIAL_CONTEXT_FACTORY, JNDI_FACTORY); 
		table.put(Context.PROVIDER_URL, PROVIDER_URL);
		InitialContext context = new InitialContext(table);
		return context;
	}

	/**
	 * Creates all the necessary objects for sending messages to a JMS topic.
	 * 
	 * @param ctx JNDI initial context
	 * @param topicName name of topic
	 * @exception NamingException if operation cannot be performed
	 * @exception JMSException if JMS fails to initialize due to internal error
	 */
	public void init(Context ctx, String topicName) throws NamingException, JMSException {
		tconFactory = (TopicConnectionFactory) ctx.lookup(CONNECTION_FACTORY_JNDI_NAME);
		topicConnection = tconFactory.createTopicConnection();
		topicSession = topicConnection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
		topic = (Topic) ctx.lookup(topicName);
		topicPublisher  = topicSession.createPublisher(topic);

		textMessage = topicSession.createTextMessage();
		streamMessage = topicSession.createStreamMessage();
		bytesMessage = topicSession.createBytesMessage();
		mapMessage = topicSession.createMapMessage();
		objectMessage = topicSession.createObjectMessage();

		topicConnection.start();
	}

	/**
	 * Sends a message to a JMS topic.
	 * 
	 * @param message message to be sent
	 * @exception JMSException if JMS fails to send message due to internal error
	 */
	public void send(String message) throws JMSException {
		// type1: set TextMessage
		textMessage.setText(message);
		
		// type2: set StreamMessage
		streamMessage.writeString(message);
		streamMessage.writeInt(20);

		// type3: set BytesMessage
		byte[] block = message.getBytes();
		bytesMessage.writeBytes(block);

		// type4: set MapMessage
		mapMessage.setString("name", message);

		// type5: set ObjectMessage
		User user = new User();
		user.setName(message);
		user.setAge(30);
		objectMessage.setObject(user);
		
		topicPublisher.publish(objectMessage);
	}

	/**
	 * read the msg from the console, then send it.
	 * 
	 * @param msgPublisher
	 * @throws IOException if IO fails to send message due to internal error
	 * @throws JMSException if JMS fails to send message due to internal error
	 */
	private static void readAndSend(TopicMsgPublisher msgPublisher) throws IOException, JMSException { 
		BufferedReader msgStream = new BufferedReader(new InputStreamReader(System.in));
		System.out.println("Enter message(input quit to quit):");  
		String line = null;
		boolean quit = false; 
		do {
			line = msgStream.readLine();
			if (line != null && line.trim().length() != 0) { 
				msgPublisher.send(line);
				System.out.println("JMS Message Sent: " + line + "\n");
				quit = line.equalsIgnoreCase("quit");
			}
		} while (!quit);

	}
	
	/**
	 * release resources.
	 * 
	 * @exception JMSException if JMS fails to close objects due to internal error
	 */
	public void close() throws JMSException {
		topicPublisher.close();
		topicSession.close();
		topicConnection.close();
	}
	
	/**
	 * test client.
	 * 
	 * @param args
	 * @throws Exception 
	 */
	public static void main(String[] args) throws Exception {
		InitialContext ctx = getInitialContext(); 
		TopicMsgPublisher publisher = new TopicMsgPublisher();  
		publisher.init(ctx, TOPIC_JNDI_NAME); 
		readAndSend(publisher);
		publisher.close();
	}
}


②. 消費者TopicMsgSubscriber

 

 

public class TopicMsgSubscriber implements MessageListener {
	// Defines the JNDI context factory.
	public final static String JNDI_FACTORY = "weblogic.jndi.WLInitialContextFactory";

	// Defines the JNDI provider url.
	public final static String PROVIDER_URL = "t3://localhost:7001";

	// Defines the JMS connection factory for the topic.
	public final static String CONNECTION_FACTORY_JNDI_NAME = "myJMSConnectionFactoryJNDIName";

	// Defines the topic, use the topic JNDI name 
	public final static String TOPIC_JNDI_NAME = "myJMSTopicJNDIName";

	private TopicConnectionFactory tconFactory;
	private TopicConnection topicConnection;
	private TopicSession topicSession;
	private TopicSubscriber topicSubscriber;
	private Topic topic; 
	private boolean quit = false;
	
	/**
	 * get the context object.
	 * 
	 * @return context object
	 * @throws NamingException if operation cannot be performed
	 */
	private static InitialContext getInitialContext() throws NamingException {
		Hashtable table = new Hashtable();
		table.put(Context.INITIAL_CONTEXT_FACTORY, JNDI_FACTORY);
		table.put(Context.PROVIDER_URL, PROVIDER_URL);
		InitialContext context = new InitialContext(table);
		return context;
	}
	
	/**
	 * Creates all the necessary objects for receiving messages from a JMS topic.
	 * 
	 * @param ctx JNDI initial context
	 * @param topicName name of topic
	 * @exception NamingException if operation cannot be performed
	 * @exception JMSException if JMS fails to initialize due to internal error
	 */
	public void init(Context ctx, String topicName) throws NamingException, JMSException {
		tconFactory = (TopicConnectionFactory) ctx.lookup(CONNECTION_FACTORY_JNDI_NAME);
		topicConnection = tconFactory.createTopicConnection(); 
		topicSession = topicConnection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
		topic = (Topic) ctx.lookup(topicName);
		topicSubscriber = topicSession.createSubscriber(topic); 
		topicSubscriber.setMessageListener(this);
		
		// second thread: message reveive thread.
		topicConnection.start();  
	}

	/**
	 * implement from MessageListener.
	 * when a message arrived, it will be invoked.
	 * 
	 * @param message message
	 */
	public void onMessage(Message message) {
		try {
			String msgStr = "";  
			int age = 0; 

			if (message instanceof TextMessage) {
				msgStr = ((TextMessage) message).getText();
			} else if (message instanceof StreamMessage) {
				msgStr = ((StreamMessage) message).readString();
				age = ((StreamMessage) message).readInt();
			} else if (message instanceof BytesMessage) {
				byte[] block = new byte[1024];
				((BytesMessage) message).readBytes(block);
				msgStr = String.valueOf(block);
			} else if (message instanceof MapMessage) {
				msgStr = ((MapMessage) message).getString("name");
			} else if (message instanceof ObjectMessage) {
				User user = (User) ((ObjectMessage) message).getObject();
				msgStr = user.getName(); 
				age = user.getAge();
			}

			System.out.println("Message subscribed: " + msgStr + ", " + age);

			if (msgStr.equalsIgnoreCase("quit")) {
				synchronized (this) {
					quit = true;
					this.notifyAll(); // Notify main thread to quit
				}
			}
		} catch (JMSException e) {
			throw new RuntimeException("error happens", e);
		}
	}

	/**
	 * release resources.
	 * 
	 * @exception JMSException if JMS fails to close objects due to internal error
	 */
	public void close() throws JMSException {
		topicSubscriber.close();
		topicSession.close();
		topicConnection.close();
	}

	/**
	 * test client.
	 * first thread(main thread)
	 * 
	 * @param args
	 * @throws Exception 
	 */
	public static void main(String[] args) throws Exception {
		InitialContext ctx = getInitialContext();
		TopicMsgSubscriber subscriber = new TopicMsgSubscriber(); 
		subscriber.init(ctx, TOPIC_JNDI_NAME);

		// Wait until a "quit" message has been subscribed.
		synchronized (subscriber) {
			while (!subscriber.quit) {
				try {
					subscriber.wait();
				} catch (InterruptedException e) { 
					throw new RuntimeException("error happens", e);
				}
			}
		}
		subscriber.close();
	}
}

 

七. 總結

 

使用JMS可以把不影響使用者執行結果又比較耗時的任務非同步的扔給JMS伺服器端, 而儘快地把螢幕還給使用者, 且伺服器端能夠多執行緒排隊相應高併發的請求, 在Java世界裡達到最高境界的解耦. 客戶端和伺服器端無須直連, 甚至無需知道對方是誰、在哪裡、有多少人,  只要對流過的資訊作響應就行了, 在企業應用複雜時作用非常明顯. 

 

八. 原始碼下載

 

下載原始碼請點選此連結: http://download.csdn.net/detail/zdp072/7422431