1. 程式人生 > >《商城專案06》--用ActiveMQ實現訊息的傳送和接收

《商城專案06》--用ActiveMQ實現訊息的傳送和接收

一, 使用場景

對商品資訊進行操作的同時, 將資料同步到solr庫, 實現該需求有以下幾種方式:

方式1: 在e3-manager-service新增商品資訊的實現類中直接寫將資料新增到solr庫; <弊端: 負責商品資訊操作的開發人員不一定對solr熟悉, 所以得分離出來寫, 這裡可以直接呼叫寫好的solr介面>

方式2: 單獨寫一solr服務 e3-xxx-service, 實現將商品資訊資料同步到solr庫; <弊端: 服務之間的呼叫耦合性太強, 如果兩服務存在互相呼叫關係, 則應用啟動時, 必定因找不到呼叫的服務而報錯! > 

方式3: 用ActiveMQ訊息佇列中介軟體, 解除服務之間呼叫的耦合性;

 

二,ActiveMQ的下載安裝(windows環境下)

1, 資源下載

連結:https://pan.baidu.com/s/1ZtHxbtfNOnngpYd4Dhpgww 
提取碼:z0ta 
 

2, 解壓使用

2.1 解壓, 自定義 存放路徑

2.2  啟動MQ

直接雙擊啟動會有點問題, 不清楚什麼原因出現閃退

所以這裡採用cmd命令方式啟動:

cmd --> cd xxx\apache-activemq-5.12.0\bin -->activemq.bat start  -->回車

2.3 測試是否啟動成功

(URL:  http://localhost:8161/admin/)

登入: admin/admin

能正確顯示出以下頁面即為ok

 

三, ActiveMQ的測試使用

1, 導包 (activemq-all-5.11.2.jar)

2, 具體測試方法

2.1  Queue模式(一對一)

    //新建服務提供者Queue, 進行訊息傳送	
    @Test
	public void testQueueProducer() throws Exception {
		// 第一步:建立ConnectionFactory物件,需要指定服務端ip及埠號。
		//brokerURL伺服器的ip及埠號
		ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616");
		// 第二步:使用ConnectionFactory物件建立一個Connection物件。
		Connection connection = connectionFactory.createConnection();
		// 第三步:開啟連線,呼叫Connection物件的start方法。
		connection.start();
		// 第四步:使用Connection物件建立一個Session物件。
		//第一個引數:是否開啟事務。true:開啟事務,第二個引數忽略。
		//第二個引數:當第一個引數為false時,才有意義。訊息的應答模式。1、自動應答 2、手動應答。一般是自動應答。
		Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
		// 第五步:使用Session物件建立一個Destination物件(topic、queue),此處建立一個Queue物件。
		//引數:佇列的名稱。
		//Queue queue = session.createQueue("test-queue");
		Queue queue = session.createQueue("spring-queue");
		// 第六步:使用Session物件建立一個Producer物件。
		MessageProducer producer = session.createProducer(queue);
		// 第七步:建立一個Message物件,建立一個TextMessage物件。
		/*TextMessage message = new ActiveMQTextMessage();
		message.setText("hello activeMq,this is my first test.");*/
		TextMessage textMessage = session.createTextMessage("hello activeMq, this is my first test.");
		// 第八步:使用Producer物件傳送訊息。
		producer.send(textMessage);
		// 第九步:關閉資源。
		producer.close();
		session.close();
		connection.close();
	}

 

    //新建服務消費者Consunmer, 對訊息進行接收
    @Test
	public void testQueueConsumer() throws Exception {
		// 第一步:建立一個ConnectionFactory物件。
		ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616");
		// 第二步:從ConnectionFactory物件中獲得一個Connection物件。
		Connection connection = connectionFactory.createConnection();
		// 第三步:開啟連線。呼叫Connection物件的start方法。
		connection.start();
		// 第四步:使用Connection物件建立一個Session物件。
		Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
		// 第五步:使用Session物件建立一個Destination物件。和傳送端保持一致queue,並且佇列的名稱一致。
		//Queue queue = session.createQueue("test-queue");
		Queue queue = session.createQueue("spring-queue");
		// 第六步:使用Session物件建立一個Consumer物件。
		MessageConsumer consumer = session.createConsumer(queue);
		// 第七步:接收訊息。
		consumer.setMessageListener(new MessageListener() {
			@Override
			public void onMessage(Message message) {
				try {
					TextMessage textMessage = (TextMessage) message;
					String text = null;
					//取訊息的內容
					text = textMessage.getText();
					// 第八步:列印訊息。
					System.out.println(text);
				} catch (JMSException e) {
					e.printStackTrace();
				}
			}
		});
		//等待鍵盤輸入
		System.in.read();
		// 第九步:關閉資源
		consumer.close();
		session.close();
		connection.close();
	}

 

2.2 Topic廣播模式(一對多 )

    //新建服務提供者TopicProducer, 進行訊息廣播式傳送
	@Test
	public void testTopicProducer() throws Exception {
		// 第一步:建立ConnectionFactory物件,需要指定服務端ip及埠號。
		// brokerURL伺服器的ip及埠號
		ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616");
		// 第二步:使用ConnectionFactory物件建立一個Connection物件。
		Connection connection = connectionFactory.createConnection();
		// 第三步:開啟連線,呼叫Connection物件的start方法。
		connection.start();
		// 第四步:使用Connection物件建立一個Session物件。
		// 第一個引數:是否開啟事務。true:開啟事務,第二個引數忽略。
		// 第二個引數:當第一個引數為false時,才有意義。訊息的應答模式。1、自動應答2、手動應答。一般是自動應答。
		Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
		// 第五步:使用Session物件建立一個Destination物件(topic、queue),此處建立一個topic物件。
		// 引數:話題的名稱。
		Topic topic = session.createTopic("test-topic");
		// 第六步:使用Session物件建立一個Producer物件。
		MessageProducer producer = session.createProducer(topic);
		// 第七步:建立一個Message物件,建立一個TextMessage物件。
		/*
		 * TextMessage message = new ActiveMQTextMessage(); message.setText(
		 * "hello activeMq,this is my first test.");
		 */
		TextMessage textMessage = session.createTextMessage("hello activeMq,this is my topic test");
		// 第八步:使用Producer物件傳送訊息。
		producer.send(textMessage);
		// 第九步:關閉資源。
		producer.close();
		session.close();
		connection.close();
	}
    //新建服務消費者Consunmer,對訊息進行接收,該方法可以並行啟動多次,相當於同時有多個訊息接收者
    @Test
	public void testTopicConsumer() throws Exception {
		// 第一步:建立一個ConnectionFactory物件。
		ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616");
		// 第二步:從ConnectionFactory物件中獲得一個Connection物件。
		Connection connection = connectionFactory.createConnection();
		// 第三步:開啟連線。呼叫Connection物件的start方法。
		connection.start();
		// 第四步:使用Connection物件建立一個Session物件。
		Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
		// 第五步:使用Session物件建立一個Destination物件。和傳送端保持一致topic,並且話題的名稱一致。
		Topic topic = session.createTopic("test-topic");
		// 第六步:使用Session物件建立一個Consumer物件。
		MessageConsumer consumer = session.createConsumer(topic);
		// 第七步:接收訊息。
		consumer.setMessageListener(new MessageListener() {
			@Override
			public void onMessage(Message message) {
				try {
					TextMessage textMessage = (TextMessage) message;
					String text = null;
					// 取訊息的內容
					text = textMessage.getText();
					// 第八步:列印訊息。
					System.out.println(text);
				} catch (JMSException e) {
					e.printStackTrace();
				}
			}
		});
		System.out.println("topic的消費端01。。。。。");
//		System.out.println("topic的消費端02。。。。。");
		// 等待鍵盤輸入
		System.in.read();
		// 第九步:關閉資源
		consumer.close();
		session.close();
		connection.close();
	}

備註: 

Topic廣播模式需要在訊息接收方法啟動的狀態下, 再進行訊息傳送才能被接收到; 如果沒有訊息接收者的情況下, 傳送的訊息會隨即被銷燬;