1. 程式人生 > >訊息中介軟體ActiveMQ(一)HelloWorld入門例項

訊息中介軟體ActiveMQ(一)HelloWorld入門例項

1、JMS訊息傳送模式

  • 在點對點或佇列模型

一個生產者向一個特定的佇列釋出訊息,一個消費者從該佇列中讀取訊息。這裡,生產者知道消費者的佇列,並直接將訊息傳送到消費者的佇列。這種模式被概括為:只有一個消費者將獲得訊息。生產者不需要在接收者消費該訊息期間處於執行狀態,接收者也同樣不需要在訊息傳送時處於執行狀態。每一個成功處理的訊息都由接收者簽收。

  • 釋出者/訂閱者模型:

支援向一個特定的訊息主題釋出訊息。0或多個訂閱者可能對接收來自特定訊息主題的訊息感興趣。在這種模型下,釋出者和訂閱者彼此不知道對方。這種模式好比是匿名公告板。這種模式被概括為:多個消費者可以獲得訊息.在釋出者和訂閱者之間存在時間依賴性。釋出者需要建立一個訂閱(subscription),以便客戶能夠購訂閱。訂閱者必須保持持續的活動狀態以接收訊息,除非訂閱者建立了持久的訂閱。在那種情況下,在訂閱者未連線時釋出的訊息將在訂閱者重新連線時重新發布。

2、JMS應用程式介面

  • ConnectionFactory 介面(連線工廠)

使用者用來建立到JMS提供者的連線的被管物件。JMS客戶通過可移植的介面訪問連線,這樣當下層的實現改變時,程式碼不需要進行修改。 管理員在JNDI名字空間中配置連線工廠,這樣,JMS客戶才能夠查詢到它們。根據訊息型別的不同,使用者將使用佇列連線工廠,或者主題連線工廠。

  • Connection 介面(連線)

連線代表了應用程式和訊息伺服器之間的通訊鏈路。在獲得了連線工廠後,就可以建立一個與JMS提供者的連線。根據不同的連線型別,連線允許使用者建立會話,以傳送和接收佇列和主題到目標。

  • Destination 介面
    (目標)

目標是一個包裝了訊息目標識別符號的被管物件,訊息目標是指訊息釋出和接收的地點,或者是佇列,或者是主題。JMS管理員建立這些物件,然後使用者通過JNDI發現它們。和連線工廠一樣,管理員可以建立兩種型別的目標,點對點模型的佇列,以及釋出者/訂閱者模型的主題。

  • MessageConsumer 介面(訊息消費者)

由會話建立的物件,用於接收發送到目標的訊息。消費者可以同步地(阻塞模式),或非同步(非阻塞)接收佇列和主題型別的訊息。

  • MessageProducer 介面(訊息生產者)

由會話建立的物件,用於傳送訊息到目標。使用者可以建立某個目標的傳送者,也可以建立一個通用的傳送者,在傳送訊息時指定目標。

  • Message 介面(訊息)

是在消費者和生產者之間傳送的物件,也就是說從一個應用程式創送到另一個應用程式。一個訊息有三個主要部分:

訊息頭(必須):包含用於識別和為訊息尋找路由的操作設定。

一組訊息屬性(可選):包含額外的屬性,支援其他提供者和使用者的相容。可以建立定製的欄位和過濾器(訊息選擇器)。

一個訊息體(可選):允許使用者建立五種型別的訊息(文字訊息,對映訊息,位元組訊息,流訊息和物件訊息)。

訊息介面非常靈活,並提供了許多方式來定製訊息的內容。

  • Session 介面(會話)

表示一個單執行緒的上下文,用於傳送和接收訊息。由於會話是單執行緒的,所以訊息是連續的,就是說訊息是按照發送的順序一個一個接收的。會話的好處是它支援事務。如果使用者選擇了事務支援,會話上下文將儲存一組訊息,直到事務被提交才傳送這些訊息。在提交事務之前,使用者可以使用回滾操作取消這些訊息。一個會話允許使用者建立訊息生產者來發送訊息,建立訊息消費者來接收訊息。

3、訊息佇列

把ActiveMQ依賴的jar包新增到工程中:activemq-all-5.12.0.jar

使用maven工程,則新增jar包的依賴:

<dependency>
	<groupId>org.apache.activemq</groupId>
	<artifactId>activemq-all</artifactId>
	<version>5.11.2</version>
</dependency>

3.1、Producer:

public class QueueSender {

	public static void main(String[] args) {
		//建立一個連線工廠
		ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.25.168:61616");
		try {
			//從工廠物件中獲得連線
			Connection connection = connectionFactory.createConnection();
			//開啟連線
			connection.start();
			/*
			connection.createSession(paramA, paramB)
			A)paramA設定為true時:
			paramB的值忽略, acknowledgment mode被jms伺服器設定 SESSION_TRANSACTED 。
			當一個事務被提交的時候,訊息確認就會自動發生。
			B) paramA設定為false時:
			Session.AUTO_ACKNOWLEDGE為自動確認,當客戶成功的從receive方法返回的時候,或者從
			MessageListener.onMessage方法成功返回的時候,會話自動確認客戶收到的訊息。
			Session.CLIENT_ACKNOWLEDGE 為客戶端確認。客戶端接收到訊息後,必須呼叫javax.jms.Message的
			acknowledge方法。jms伺服器才會刪除訊息。(預設是批量確認)
			*/
			//開啟一個回話,第一個引數指定不使用事務,第二個引數指定客戶端接收訊息的確認方式
			Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
			//建立一目的地Queue或者是Topic
			Queue queue = session.createQueue("mytestqueue");
			//建立一個生產者
			MessageProducer producer = session.createProducer(queue);
			//建立message
			TextMessage message = new ActiveMQTextMessage();
			message.setText("hello");
			//傳送訊息
			producer.send(message);
			//關閉
			producer.close();
			session.close();
			connection.close();
		} catch (JMSException e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		}
		
	}
}

3.2、Consumer:

消費者有兩種消費方法:

1、同步消費。通過呼叫消費者的receive方法從目的地中顯式提取訊息。receive方法可以一直阻塞到訊息到達。

2、非同步消費。客戶可以為消費者註冊一個訊息監聽器,以定義在訊息到達時所採取的動作。實現MessageListener介面,在MessageListener()方法中實現訊息的處理邏輯。

3.2.1、同步消費:

public class QueueConsumer {

	public static void main(String[] args) {
		//建立一連線工廠
		ConnectionFactory connectionFactory  = new ActiveMQConnectionFactory("tcp://192.168.25.168:61616");
		try {
			//建立一個連線
			Connection connection = connectionFactory.createConnection();
			//開啟連線
			connection.start();
			//建立一個回話
			Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
			//建立一個目的地Destination 
			Queue queue = session.createQueue("mytestqueue");
			//建立一個消費者
			MessageConsumer consumer = session.createConsumer(queue);
			while(true) {
				//設定接收者接收訊息的時間,為了便於測試,這裡定為100s
				Message message = consumer.receive(100000);
				if (message != null) {
					System.out.println(message);
				} else {
					//超時結束
					break;
				}
				
			}
			consumer.close();
			session.close();
			connection.close();
		} catch (Exception e) {
			e.printStackTrace();
		}
		
	}
}

3.2.2、非同步消費:

public class QueueConsumer {

	public static void main(String[] args) {
		//建立一連線工廠
		ConnectionFactory connectionFactory  = new ActiveMQConnectionFactory("tcp://192.168.25.168:61616");
		try {
			//建立一個連線
			Connection connection = connectionFactory.createConnection();
			//開啟連線
			connection.start();
			//建立一個回話
			Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
			//建立一個目的地Destination 
			Queue queue = session.createQueue("mytestqueue");
			//建立一個消費者
			MessageConsumer consumer = session.createConsumer(queue);
			consumer.setMessageListener(new MessageListener() {
				
				@Override
				public void onMessage(Message message) {
					if (message instanceof TextMessage) {
						String text = "";
						try {
							text = ((TextMessage)message).getText();
						} catch (JMSException e) {
							// TODO Auto-generated catch block
							e.printStackTrace();
						}
						System.out.println(text);
					}
					
				}
			});
			System.in.read();
			//關閉
			consumer.close();
			session.close();
			connection.close();
		} catch (Exception e) {
			e.printStackTrace();
		}
		
	}
}

4、釋出者/訂閱者

4.1、Producer:

public class TopicProducer {

	public static void main(String[] args) {
		//建立連線工廠
		ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.25.168:61616");
		try {
			//建立連線
			Connection connection = connectionFactory.createConnection();
			//開啟連線
			connection.start();
			//建立一個回話
			Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
			//建立一個Destination,queue或者Topic
			Topic topic = session.createTopic("mytopic");
			//建立一個生成者
			MessageProducer producer = session.createProducer(topic);
			//建立一個訊息
			TextMessage textMessage = new ActiveMQTextMessage();
			textMessage.setText("hello my topic");
			//傳送訊息
			producer.send(textMessage);
			//關閉
			producer.close();
			session.close();
			connection.close();
		} catch (JMSException e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		}
	}
}

4.2、Consumer:

public class TopicConsumer {

	public static void main(String[] args) {
		//建立連線工廠
		ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.25.168:61616");
		try {
			//建立連線
			Connection connection = connectionFactory.createConnection();
			connection.start();
			//建立一個會話
			Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
			//建立一個目標
			Destination destination = session.createTopic("mytopic");
			//建立一個消費者
			MessageConsumer consumer = session.createConsumer(destination);
			//接收訊息
			consumer.setMessageListener(new MessageListener() {
				
				@Override
				public void onMessage(Message message) {
					
					System.out.println(message);
					
				}
			});
			//暫停
			System.in.read();
			//關閉
			consumer.close();
			session.close();
			connection.close();
			
		} catch (Exception e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		}
	}
}