springboot整合rabbitMQ之物件傳輸
rabbitMQ的安裝方法網上有很多教程,這裡就不重複了。
在springboot上使用rabbitMQ傳輸字串和物件,本文所給出的例子是在兩個不同的專案之間進行物件和和字串的傳輸。
rabbitMQ的依賴(在兩個專案中一樣的配置):
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency >
pom配置檔案(在兩個專案中一樣的配置):
spring.application.name: demo1 //專案名
spring.rabbitmq.host: 192.168.1.111 //寫自己的ip
spring.rabbitmq.port: 5672
spring.rabbitmq.username: guest
spring.rabbitmq.password: guest
spring.rabbitmq.virtual-host: /
spring.rabbitmq.publisher-confirms: true
spring.rabbitmq.publisher-returns : true
spring.rabbitmq.template.mandatory: true
字元轉的相互傳輸(本例使用的topic型別)
1>. 首先,在生產者(專案A)中寫配置檔案,其中生成佇列queue,交換機exchange並且進行繫結binding
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* @Author:fdh
* @Description:
* @Date: Create in 16:13 2017/12/22
*/
@Configuration
public class senderConfigration {
/**
*@Description: 新建佇列 topic.messages
*@Data:16:14 2017/12/22
*/
@Bean(name = "messages")
public Queue queueMessages(){
return new Queue("topic.messages");
}
/**
*@Description: 定義交換器
*@Data:16:15 2017/12/22
*/
@Bean
public TopicExchange exchange(){
return new TopicExchange("exchange");
}
/**
*@Description: 交換機與訊息佇列進行繫結 佇列messages繫結交換機with topic.messages
*@Data:16:18 2017/12/22
*/
@Bean
Binding bindingExchangeMessages(@Qualifier("messages") Queue queueMessages,TopicExchange exchange){
return BindingBuilder.bind(queueMessages).to(exchange).with("topic.messages");
}
}
2>. 第二步(專案A),生產者把訊息傳送到訊息佇列,
/**
* @Author:fdh
* @Description:
* @Date: Create in 14:15 2017/12/22
*/
@Controller
public class RabbitController {
@Autowired
private AmqpTemplate amqpTemplate;
@RequestMapping("/sendss")
public void send1(){
amqpTemplate.convertAndSend("exchange","topic.messages","hello topic.messages RabbitMQ");
}
}
3>. 接下來,在消費者(專案B)端寫一個監聽器,交換器會根據繫結的routing key(topic.messages)把生產者生產的訊息放到匹配的訊息佇列中,監聽器會監聽相應的訊息佇列來獲取路由到該訊息佇列上的訊息。
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
/**
* @ Author:fdh
* @ Description: 訊息佇列監聽器
* @ Date: Create in 14:19 2017/12/22
*/
@Component
public class Receiver {
@RabbitListener(queues = "topic.messages")
public void process2(String str1) throws ClassNotFoundException{
System.out.println("messages :"+str1);
System.out.println(Thread.currentThread().getName()+"接收到來自topic.message佇列的訊息: "+str1);
}
這樣,一個簡單的字串的傳輸便寫好了,下面開啟剛才定義的mapping: 192.168.1.111:8080/sendss
在消費者端的console視窗便會看到列印的訊息
以上就是一個簡單的傳輸字串的例子了。
2. 下面重點介紹一下消費者和生產者之間物件的傳輸。
物件的傳輸,要現在生產者(A)中進行序列化,即把物件轉化為位元組陣列進行傳輸,在消費者中,再把轉化的位元組陣列反序列化為物件。序列化和反序列化的方法很多,這裡採用的是java的Serializable 介面
1>. 在生產者(專案A)和消費者(專案B)的專案中建立實體類。
!注意!:*新建實體類Boy.java 該實體類在專案A、B中的位置,必須一致,即包名必須一致*,在本專案中,Boy.java 在專案A、B中都是: import com.fengdonghao.shiro.bean.Boy;
實體類也要一致。
package com.fengdonghao.shiro.bean;
import javax.persistence.*;
import java.io.Serializable;
/**
* @Author:fdh
* @Description:
* @Date:Create in11:14 2017/12/16
*/
@Entity
public class Boy implements Serializable{
private static final long serialVersionUID=1L;
@Id
@GeneratedValue
private int id;
private String name;
private int age;
@Override
public String toString() {
return "Boy{" +
"age=" + age +
", id=" + id +
", name='" + name + '\'' +
'}';
}
//此處省略getter 和setter 方法
}
2>. 在生產者(A)中配置 訊息佇列,交換器,並進行繫結binding,和在 例子1中的第一步是一樣的
3>. 在生產者(A)中的RabbitController.java 中另寫一個mapping,如下
@Controller
public class RabbitController {
@Autowired
private AmqpTemplate amqpTemplate;
@ResponseBody
@RequestMapping("/send")
public void send1() throws Exception{
Boy boy=new Boy(15,"tom");
//物件轉化為位元組碼 把物件轉化為位元組碼後,把位元組碼傳輸過去再轉化為物件
byte[] bytes=getBytesFromObject(boy);
System.out.println(bytes);
amqpTemplate.convertAndSend("exchange","topic.messages",bytes);
}
//物件轉化為位元組碼
public byte[] getBytesFromObject(Serializable obj) throws Exception {
if (obj == null) {
return null;
}
ByteArrayOutputStream bo = new ByteArrayOutputStream();
ObjectOutputStream oo = new ObjectOutputStream(bo);
oo.writeObject(obj);
return bo.toByteArray();
}
}
4>. 在消費者(B)中對位元組陣列進行反序列化。
在Receiver中,重新編寫例1重點的監聽器
@Component
public class Receiver {
@RabbitListener(queues = "topic.messages")
public void process2(byte[] bytes) throws Exception{
System.out.println(bytes);
//位元組碼轉化為物件
Boy boy1=(Boy) getObjectFromBytes(bytes);
System.out.println(boy1);
System.out.println("messages :"+boy1.toString());
System.out.println(Thread.currentThread().getName()+"接收到來自topic.message佇列的訊息: "+boy1);
}
//位元組碼轉化為物件
public Object getObjectFromBytes(byte[] objBytes) throws Exception {
if (objBytes == null || objBytes.length == 0) {
return null;
}
ByteArrayInputStream bi = new ByteArrayInputStream(objBytes);
ObjectInputStream oi = new ObjectInputStream(bi);
return oi.readObject();
}
}
驗證mapping: ip:8080/send
結果如下:
有什麼疑問或者問題,在下面評論,一起進步!