1. 程式人生 > >(三) RabbitMQ實戰教程(面向Java開發人員)之Spring整合RabbitMQ

(三) RabbitMQ實戰教程(面向Java開發人員)之Spring整合RabbitMQ

Spring整合RabbitMQ

Spring AMQP是對AMQP協議的抽象和封裝,從官方網站上得知它是由兩個專案組成的(spring-amqp和spring-rabbit)。在使用Spring整合RabbitMQ時我們主要關注三個核心介面(MessageListenerContainer、RabbitAdmin以及RabbitTemplate)

RabbitAdmin: 用於宣告交換機 佇列 繫結等
RabbitTemplate: 用於RabbitMQ訊息的傳送和接收
MessageListenerContainer: 監聽容器 為訊息入隊提供非同步處理

使用Spring整合RabbitMQ需要匯入如下依賴

<dependency>
    <groupId>org.springframework.amqp</groupId>
    <artifactId>spring-rabbit</artifactId>
    <version>2.0.2.RELEASE</version>
</dependency>

Spring AMQP手動宣告

1.建立生產者配置類 將RabbitAdmin、RabbitTemplate納入Spring管理

@Configuration
public class
SpringAMQPProducerConfig {
@Bean public ConnectionFactory connectionFactory() { com.rabbitmq.client.ConnectionFactory connectionFactory = new com.rabbitmq.client.ConnectionFactory(); // 配置連線資訊 connectionFactory.setHost("192.168.56.128"); connectionFactory.setPort(5672
); connectionFactory.setVirtualHost("/"); connectionFactory.setUsername("roberto"); connectionFactory.setPassword("roberto"); // 網路異常自動連線恢復 connectionFactory.setAutomaticRecoveryEnabled(true); // 每10秒嘗試重試連線一次 connectionFactory.setNetworkRecoveryInterval(10000); // 設定ConnectionFactory屬性資訊 Map<String, Object> connectionFactoryPropertiesMap = new HashMap(); connectionFactoryPropertiesMap.put("principal", "RobertoHuang"); connectionFactoryPropertiesMap.put("description", "RGP訂單系統V2.0"); connectionFactoryPropertiesMap.put("emailAddress", "[email protected]"); connectionFactory.setClientProperties(connectionFactoryPropertiesMap); CachingConnectionFactory cachingConnectionFactory = new CachingConnectionFactory(connectionFactory); return cachingConnectionFactory; } @Bean public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory) { return new RabbitAdmin(connectionFactory); } @Bean public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) { return new RabbitTemplate(connectionFactory); } }

2.建立生產者啟動類

/**
 * Producer entry point for the manual-declaration demo: declares the
 * exchange and publishes one persistent message to it.
 */
@ComponentScan(basePackages = "roberto.growth.process.rabbitmq.spring.amqp.manual.declare.producer")
public class ProducerApplication {
    /**
     * Boots the Spring context, declares the {@code roberto.order} direct
     * exchange and sends a single message with routing key {@code add}.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext(ProducerApplication.class);

        RabbitAdmin rabbitAdmin = context.getBean(RabbitAdmin.class);
        RabbitTemplate rabbitTemplate = context.getBean(RabbitTemplate.class);

        // Declare the exchange
        rabbitAdmin.declareExchange(new DirectExchange("roberto.order", true, false, new HashMap<>()));

        // Build the message (body, properties)
        MessageProperties messageProperties = new MessageProperties();
        messageProperties.setDeliveryMode(MessageDeliveryMode.PERSISTENT);
        messageProperties.setContentType("UTF-8");
        // Encode explicitly as UTF-8: the consumer decodes with UTF-8, so the
        // platform default charset must not leak into the payload
        byte[] body = "訂單資訊".getBytes(java.nio.charset.StandardCharsets.UTF_8);
        Message message = new Message(body, messageProperties);

        // Publish (exchange name, routing key, message).
        // rabbitTemplate.convertAndSend() is an alternative that supports message post-processing
        rabbitTemplate.send("roberto.order", "add", message);
    }
}

3.建立消費者配置類 將RabbitAdmin納入Spring管理,並在MessageListenerContainer類中定義了訊息消費的邏輯

/**
 * Consumer-side Spring configuration for the manual-declaration demo.
 * Exposes the connection factory, a {@code RabbitAdmin} and a listener
 * container that holds the message consumption logic.
 */
@Configuration
public class SpringAMQPConsumerConfig {
    /**
     * Configures a RabbitMQ client connection factory and wraps it in a
     * {@code CachingConnectionFactory}.
     *
     * @return the connection factory shared by the other beans
     */
    @Bean
    public ConnectionFactory connectionFactory() {
        com.rabbitmq.client.ConnectionFactory rabbitFactory = new com.rabbitmq.client.ConnectionFactory();

        // Broker address and credentials
        rabbitFactory.setHost("192.168.56.128");
        rabbitFactory.setPort(5672);
        rabbitFactory.setVirtualHost("/");
        rabbitFactory.setUsername("roberto");
        rabbitFactory.setPassword("roberto");

        // Recover the connection automatically after a network failure,
        // retrying every 10 seconds
        rabbitFactory.setAutomaticRecoveryEnabled(true);
        rabbitFactory.setNetworkRecoveryInterval(10000);

        // Client properties attached to the connection
        Map<String, Object> clientProperties = new HashMap<>();
        clientProperties.put("principal", "RobertoHuang");
        clientProperties.put("description", "RGP訂單系統V2.0");
        clientProperties.put("emailAddress", "[email protected]");
        rabbitFactory.setClientProperties(clientProperties);

        return new CachingConnectionFactory(rabbitFactory);
    }

    /**
     * @param connectionFactory the connection factory bean
     * @return the admin used to declare exchanges, queues and bindings
     */
    @Bean
    public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory) {
        return new RabbitAdmin(connectionFactory);
    }

    /**
     * Builds the listener container for the {@code roberto.order.add} queue.
     * Auto-startup is disabled, so the caller decides when consumption starts.
     *
     * @param connectionFactory the connection factory bean
     * @return the configured, not yet started, listener container
     */
    @Bean
    public MessageListenerContainer messageListenerContainer(ConnectionFactory connectionFactory) {
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
        container.setConnectionFactory(connectionFactory);
        container.setQueueNames("roberto.order.add");

        // Initial and maximum number of concurrent consumers
        container.setConcurrentConsumers(5);
        container.setMaxConcurrentConsumers(10);

        // Consumer arguments (none for this demo)
        Map<String, Object> consumerArguments = new HashMap<>();
        container.setConsumerArguments(consumerArguments);

        // Fixed consumer tag regardless of the queue name passed in
        container.setConsumerTagStrategy(queue -> "RGP訂單系統ADD處理邏輯消費者");

        // Do not start automatically; the application starts the container itself
        container.setAutoStartup(false);

        // setAfterReceivePostProcessors can be used to add message post-processors
        // container.setAfterReceivePostProcessors();

        container.setMessageListener(new MessageListener() {
            @Override
            public void onMessage(Message message) {
                try {
                    System.out.println(new String(message.getBody(), "UTF-8"));
                    System.out.println(message.getMessageProperties());
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        });
        return container;
    }
}

4.建立消費者啟動類

/**
 * Consumer entry point for the manual-declaration demo: declares the queue,
 * the exchange and their binding by hand, then starts the listener container.
 */
@ComponentScan(basePackages = "roberto.growth.process.rabbitmq.spring.amqp.manual.declare.consumer")
public class ConsumerApplication {
    /**
     * Boots the Spring context, performs the declarations through
     * {@code RabbitAdmin} and only then starts consuming.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext(ConsumerApplication.class);

        RabbitAdmin admin = context.getBean(RabbitAdmin.class);
        MessageListenerContainer listenerContainer = context.getBean(MessageListenerContainer.class);

        // Queue (name, durable, exclusive, auto-delete, arguments)
        Queue orderAddQueue = new Queue("roberto.order.add", true, false, false, new HashMap<>());
        admin.declareQueue(orderAddQueue);

        // Direct exchange (name, durable, auto-delete, arguments)
        DirectExchange orderExchange = new DirectExchange("roberto.order", true, false, new HashMap<>());
        admin.declareExchange(orderExchange);

        // Bind the queue to the exchange with routing key "add"
        Binding orderAddBinding = BindingBuilder
                .bind(new Queue("roberto.order.add"))
                .to(new DirectExchange("roberto.order"))
                .with("add");
        admin.declareBinding(orderAddBinding);

        // Start listening on the queue
        listenerContainer.start();
    }
}

5.依次啟動訊息消費者和生產者 控制檯輸出如下

訂單資訊
MessageProperties [headers={}, contentType=UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, redelivered=false, receivedExchange=roberto.order, receivedRoutingKey=add, deliveryTag=1, consumerTag=RGP訂單系統ADD處理邏輯消費者, consumerQueue=roberto.order.add]

上述程式碼使用Spring整合RabbitMQ,最終實現的效果與上一篇部落格介紹的RabbitMQ Java Client一致

Spring AMQP自動宣告

在上述的Demo中我們是手動使用RabbitAdmin對交換機、佇列和繫結進行宣告的,Spring AMQP還提供了自動宣告交換機、佇列和繫結的方式。我們只需把要自動宣告的元件納入Spring容器中管理即可,自動宣告發生在RabbitMQ第一次建立連線的時候,並且支援單個和批量自動宣告。使用自動宣告需要符合如下條件:

1.需要有連線產生
2.RabbitAdmin必須交由Spring管理,且autoStartup必須為true(預設)
3.如果ConnectionFactory使用的是CachingConnectionFactory,則cacheMode必須要為CacheMode.CHANNEL
4.所有要宣告的元件的shouldDeclare必須為true
5.要宣告的Queue名稱不能以amq.開頭

上述規則定義在RabbitAdmin的afterPropertiesSet方法中,有興趣的同學可以自行閱讀RabbitAdmin原始碼

1.建立生產者配置類 將RabbitAdmin、RabbitTemplate納入Spring管理

/**
 * Producer-side Spring configuration for the auto-declaration demo.
 * Exposes the RabbitMQ connection factory, a {@code RabbitAdmin} and a
 * {@code RabbitTemplate} as beans.
 */
@Configuration
public class SpringAMQPProducerConfig {
    /**
     * Configures a RabbitMQ client connection factory and wraps it in a
     * {@code CachingConnectionFactory}.
     *
     * @return the connection factory shared by the other beans
     */
    @Bean
    public ConnectionFactory connectionFactory() {
        com.rabbitmq.client.ConnectionFactory rabbitFactory = new com.rabbitmq.client.ConnectionFactory();

        // Broker address and credentials
        rabbitFactory.setHost("192.168.56.128");
        rabbitFactory.setPort(5672);
        rabbitFactory.setVirtualHost("/");
        rabbitFactory.setUsername("roberto");
        rabbitFactory.setPassword("roberto");

        // Recover the connection automatically after a network failure,
        // retrying every 10 seconds
        rabbitFactory.setAutomaticRecoveryEnabled(true);
        rabbitFactory.setNetworkRecoveryInterval(10000);

        // Client properties attached to the connection
        Map<String, Object> clientProperties = new HashMap<>();
        clientProperties.put("principal", "RobertoHuang");
        clientProperties.put("description", "RGP訂單系統V2.0");
        clientProperties.put("emailAddress", "[email protected]");
        rabbitFactory.setClientProperties(clientProperties);

        return new CachingConnectionFactory(rabbitFactory);
    }

    /**
     * @param connectionFactory the connection factory bean
     * @return the admin used to declare exchanges, queues and bindings
     */
    @Bean
    public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory) {
        return new RabbitAdmin(connectionFactory);
    }

    /**
     * @param connectionFactory the connection factory bean
     * @return the template used to publish messages
     */
    @Bean
    public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
        return new RabbitTemplate(connectionFactory);
    }
}

2.建立生產者啟動類

/**
 * Producer entry point for the auto-declaration demo: declares the exchange
 * and publishes one persistent message to it.
 */
@ComponentScan(basePackages = "roberto.growth.process.rabbitmq.spring.amqp.auto.declare.producer")
public class ProducerApplication {
    /**
     * Boots the Spring context from this class, so that the component scan of
     * the auto-declare producer package above is the one that takes effect,
     * then sends a single message with routing key {@code add}.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        // Bootstrap from this class, not from the manual-declare demo's ProducerApplication
        AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext(ProducerApplication.class);

        RabbitAdmin rabbitAdmin = context.getBean(RabbitAdmin.class);
        RabbitTemplate rabbitTemplate = context.getBean(RabbitTemplate.class);

        // Declare the exchange
        rabbitAdmin.declareExchange(new DirectExchange("roberto.order", true, false, new HashMap<>()));

        // Build the message (body, properties)
        MessageProperties messageProperties = new MessageProperties();
        messageProperties.setDeliveryMode(MessageDeliveryMode.PERSISTENT);
        messageProperties.setContentType("UTF-8");
        // Encode explicitly as UTF-8: the consumer decodes with UTF-8, so the
        // platform default charset must not leak into the payload
        byte[] body = "訂單資訊".getBytes(java.nio.charset.StandardCharsets.UTF_8);
        Message message = new Message(body, messageProperties);

        // Publish (exchange name, routing key, message).
        // rabbitTemplate.convertAndSend() is an alternative that supports message post-processing
        rabbitTemplate.send("roberto.order", "add", message);
    }
}

3.建立消費者配置類 將RabbitAdmin納入Spring管理,並在MessageListenerContainer類中定義了訊息消費的邏輯,並且在該配置類中宣告交換機,佇列,繫結

/**
 * Consumer-side Spring configuration for the auto-declaration demo.
 * Besides the connection factory, {@code RabbitAdmin} and listener container,
 * it exposes the exchange, queue and binding as beans so they are declared
 * automatically.
 */
@Configuration
public class SpringAMQPConsumerConfig {
    /**
     * Configures a RabbitMQ client connection factory and wraps it in a
     * {@code CachingConnectionFactory}.
     *
     * @return the connection factory shared by the other beans
     */
    @Bean
    public ConnectionFactory connectionFactory() {
        com.rabbitmq.client.ConnectionFactory rabbitFactory = new com.rabbitmq.client.ConnectionFactory();

        // Broker address and credentials
        rabbitFactory.setHost("192.168.56.128");
        rabbitFactory.setPort(5672);
        rabbitFactory.setVirtualHost("/");
        rabbitFactory.setUsername("roberto");
        rabbitFactory.setPassword("roberto");

        // Recover the connection automatically after a network failure,
        // retrying every 10 seconds
        rabbitFactory.setAutomaticRecoveryEnabled(true);
        rabbitFactory.setNetworkRecoveryInterval(10000);

        // Client properties attached to the connection
        Map<String, Object> clientProperties = new HashMap<>();
        clientProperties.put("principal", "RobertoHuang");
        clientProperties.put("description", "RGP訂單系統V2.0");
        clientProperties.put("emailAddress", "[email protected]");
        rabbitFactory.setClientProperties(clientProperties);

        return new CachingConnectionFactory(rabbitFactory);
    }

    /**
     * @param connectionFactory the connection factory bean
     * @return the admin that performs the declarations
     */
    @Bean
    public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory) {
        return new RabbitAdmin(connectionFactory);
    }

    /**
     * Exchange to declare automatically. To declare several at once, expose a
     * {@code List<Exchange>} bean instead.
     *
     * @return the durable, non-auto-delete {@code roberto.order} direct exchange
     */
    @Bean
    public Exchange exchange() {
        return new DirectExchange("roberto.order", true, false, new HashMap<>());
    }

    /**
     * Queue to declare automatically. To declare several at once, expose a
     * {@code List<Queue>} bean instead.
     *
     * @return the durable, non-exclusive, non-auto-delete {@code roberto.order.add} queue
     */
    @Bean
    public Queue queue() {
        return new Queue("roberto.order.add", true, false, false, new HashMap<>());
    }

    /**
     * Binding to declare automatically. To declare several at once, expose a
     * {@code List<Binding>} bean instead.
     *
     * @return the binding of {@code roberto.order.add} to {@code roberto.order} with routing key {@code add}
     */
    @Bean
    public Binding binding() {
        return new Binding("roberto.order.add", Binding.DestinationType.QUEUE, "roberto.order", "add", new HashMap<>());
    }

    /**
     * Builds the listener container for the {@code roberto.order.add} queue
     * with auto-startup enabled.
     *
     * @param connectionFactory the connection factory bean
     * @return the configured listener container
     */
    @Bean
    public MessageListenerContainer messageListenerContainer(ConnectionFactory connectionFactory) {
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
        container.setConnectionFactory(connectionFactory);
        container.setQueueNames("roberto.order.add");

        // Initial and maximum number of concurrent consumers
        container.setConcurrentConsumers(5);
        container.setMaxConcurrentConsumers(10);

        // Consumer arguments (none for this demo)
        Map<String, Object> consumerArguments = new HashMap<>();
        container.setConsumerArguments(consumerArguments);

        // Fixed consumer tag regardless of the queue name passed in
        container.setConsumerTagStrategy(queue -> "RGP訂單系統ADD處理邏輯消費者");

        // setAutoStartup controls when consumption begins; here it starts with the context
        container.setAutoStartup(true);

        // setAfterReceivePostProcessors can be used to add message post-processors
        // container.setAfterReceivePostProcessors();

        container.setMessageListener(new MessageListener() {
            @Override
            public void onMessage(Message message) {
                try {
                    System.out.println(new String(message.getBody(), "UTF-8"));
                    System.out.println(message.getMessageProperties());
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        });
        return container;
    }
}

4.建立消費者啟動類

/**
 * Consumer entry point for the auto-declaration demo. All the work is done by
 * the beans found through the component scan; this class only boots the context.
 */
@ComponentScan(basePackages = "roberto.growth.process.rabbitmq.spring.amqp.auto.declare.consumer")
public class ConsumerApplication {
    /**
     * Registers this class as the configuration source and refreshes the context.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext();
        context.register(ConsumerApplication.class);
        context.refresh();
    }
}

5.依次啟動訊息消費者和生產者 控制檯輸出如下

訂單資訊
MessageProperties [headers={}, contentType=UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, redelivered=false, receivedExchange=roberto.order, receivedRoutingKey=add, deliveryTag=1, consumerTag=RGP訂單系統ADD處理邏輯消費者, consumerQueue=roberto.order.add]

該程式碼與上述程式碼實現的效果一致,只是將交換機,佇列,繫結進行了自動宣告

MessageListenerAdapter

以上兩個Demo在消費訊息處理邏輯時往MessageListenerContainer中傳遞了MessageListener,但是我們有時候已經寫好了消費邏輯對應的類,我們不希望它去實作MessageListener/ChannelAwareMessageListener介面,因為這麼做的話意味著我們需要改變現有程式碼。Spring AMQP提供了訊息處理器適配器(MessageListenerAdapter)的功能,它可以把一個純POJO類適配成一個可以處理訊息的處理器,預設處理訊息的方法為handleMessage,可以通過setDefaultListenerMethod方法進行修改

1.建立生產者配置類

/**
 * Producer-side Spring configuration for the MessageListenerAdapter demo.
 * Exposes the RabbitMQ connection factory, a {@code RabbitAdmin} and a
 * {@code RabbitTemplate} as beans.
 */
@Configuration
public class SpringAMQPProducerConfig {
    /**
     * Configures a RabbitMQ client connection factory and wraps it in a
     * {@code CachingConnectionFactory}.
     *
     * @return the connection factory shared by the other beans
     */
    @Bean
    public ConnectionFactory connectionFactory() {
        com.rabbitmq.client.ConnectionFactory rabbitFactory = new com.rabbitmq.client.ConnectionFactory();

        // Broker address and credentials
        rabbitFactory.setHost("192.168.56.128");
        rabbitFactory.setPort(5672);
        rabbitFactory.setVirtualHost("/");
        rabbitFactory.setUsername("roberto");
        rabbitFactory.setPassword("roberto");

        // Automatic connection recovery with a 10000 ms retry interval
        rabbitFactory.setAutomaticRecoveryEnabled(true);
        rabbitFactory.setNetworkRecoveryInterval(10000);

        // Client properties attached to the connection
        Map<String, Object> clientProperties = new HashMap<>();
        clientProperties.put("principal", "RobertoHuang");
        clientProperties.put("description", "RGP訂單系統V2.0");
        clientProperties.put("emailAddress", "[email protected]");
        rabbitFactory.setClientProperties(clientProperties);

        return new CachingConnectionFactory(rabbitFactory);
    }

    /**
     * @param connectionFactory the connection factory bean
     * @return the admin used to declare exchanges, queues and bindings
     */
    @Bean
    public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory) {
        return new RabbitAdmin(connectionFactory);
    }

    /**
     * @param connectionFactory the connection factory bean
     * @return the template used to publish messages
     */
    @Bean
    public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
        return new RabbitTemplate(connectionFactory);
    }
}

2.建立生產者啟動類

/**
 * Producer entry point for the MessageListenerAdapter demo: declares the
 * exchange and publishes one persistent message to it.
 */
@ComponentScan(basePackages = "roberto.growth.process.rabbitmq.spring.amqp.message.listener.adapte.producer")
public class ProducerApplication {
    /**
     * Boots the Spring context, declares the {@code roberto.order} direct
     * exchange and sends a single message with routing key {@code add}.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext(ProducerApplication.class);

        RabbitAdmin rabbitAdmin = context.getBean(RabbitAdmin.class);
        RabbitTemplate rabbitTemplate = context.getBean(RabbitTemplate.class);

        rabbitAdmin.declareExchange(new DirectExchange("roberto.order", true, false, new HashMap<>()));

        MessageProperties messageProperties = new MessageProperties();
        messageProperties.setDeliveryMode(MessageDeliveryMode.PERSISTENT);
        messageProperties.setContentType("UTF-8");
        // Encode explicitly as UTF-8: the consumer-side handler decodes with
        // UTF-8, so the platform default charset must not leak into the payload
        byte[] body = "訂單資訊".getBytes(java.nio.charset.StandardCharsets.UTF_8);
        Message message = new Message(body, messageProperties);

        rabbitTemplate.send("roberto.order", "add", message);
    }
}

3.建立消費者訊息處理器類,它可以是純POJO類

/**
 * Plain POJO message handler: it implements no messaging interface and is
 * meant to be wrapped by a listener adapter.
 */
public class MessageHandle {
    /**
     * Decodes the raw message body as UTF-8 and prints it to standard output.
     *
     * @param message the raw message body
     */
    public void add(byte[] message) {
        // The charset constant cannot fail to resolve, so no checked exception needs handling
        System.out.println(new String(message, java.nio.charset.StandardCharsets.UTF_8));
    }
}

4.建立消費者配置類 配置自定義訊息處理器(將roberto.order.add佇列使用自定義訊息處理類的add方法進行處理)

/**
 * Consumer-side Spring configuration for the MessageListenerAdapter demo.
 * Messages from {@code roberto.order.add} are routed to the {@code add}
 * method of a plain {@code MessageHandle} object through a listener adapter.
 */
@Configuration
public class SpringAMQPConsumerConfig {
    /**
     * Configures a RabbitMQ client connection factory and wraps it in a
     * {@code CachingConnectionFactory}.
     *
     * @return the connection factory shared by the other beans
     */
    @Bean
    public ConnectionFactory connectionFactory() {
        com.rabbitmq.client.ConnectionFactory rabbitFactory = new com.rabbitmq.client.ConnectionFactory();

        // Broker address and credentials
        rabbitFactory.setHost("192.168.56.128");
        rabbitFactory.setPort(5672);
        rabbitFactory.setVirtualHost("/");
        rabbitFactory.setUsername("roberto");
        rabbitFactory.setPassword("roberto");

        // Automatic connection recovery with a 10000 ms retry interval
        rabbitFactory.setAutomaticRecoveryEnabled(true);
        rabbitFactory.setNetworkRecoveryInterval(10000);

        // Client properties attached to the connection
        Map<String, Object> clientProperties = new HashMap<>();
        clientProperties.put("principal", "RobertoHuang");
        clientProperties.put("description", "RGP訂單系統V2.0");
        clientProperties.put("emailAddress", "[email protected]");
        rabbitFactory.setClientProperties(clientProperties);

        return new CachingConnectionFactory(rabbitFactory);
    }

    /**
     * @param connectionFactory the connection factory bean
     * @return the admin that performs the declarations
     */
    @Bean
    public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory) {
        return new RabbitAdmin(connectionFactory);
    }

    /** @return the durable, non-auto-delete {@code roberto.order} direct exchange */
    @Bean
    public Exchange exchange() {
        return new DirectExchange("roberto.order", true, false, new HashMap<>());
    }

    /** @return the durable, non-exclusive, non-auto-delete {@code roberto.order.add} queue */
    @Bean
    public Queue queue() {
        return new Queue("roberto.order.add", true, false, false, new HashMap<>());
    }

    /** @return the binding of {@code roberto.order.add} to {@code roberto.order} with routing key {@code add} */
    @Bean
    public Binding binding() {
        return new Binding("roberto.order.add", Binding.DestinationType.QUEUE, "roberto.order", "add", new HashMap<>());
    }

    /**
     * Builds the listener container for the {@code roberto.order.add} queue and
     * installs a listener adapter around a {@code MessageHandle} instance.
     *
     * @param connectionFactory the connection factory bean
     * @return the configured listener container
     */
    @Bean
    public MessageListenerContainer messageListenerContainer(ConnectionFactory connectionFactory) {
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
        container.setConnectionFactory(connectionFactory);
        container.setQueueNames("roberto.order.add");

        // Initial and maximum number of concurrent consumers
        container.setConcurrentConsumers(5);
        container.setMaxConcurrentConsumers(10);

        // Consumer arguments (none for this demo)
        Map<String, Object> consumerArguments = new HashMap<>();
        container.setConsumerArguments(consumerArguments);

        // Fixed consumer tag regardless of the queue name passed in
        container.setConsumerTagStrategy(queue -> "RGP訂單系統ADD處理邏輯消費者");

        container.setAutoStartup(true);

        // Wrap the POJO handler in a listener adapter
        MessageListenerAdapter listenerAdapter = new MessageListenerAdapter(new MessageHandle());
        // Method used when no queue-specific mapping applies
        listenerAdapter.setDefaultListenerMethod("handleMessage");
        // Messages from roberto.order.add are handled by the add method
        Map<String, String> queueOrTagToMethodName = new HashMap<>();
        queueOrTagToMethodName.put("roberto.order.add", "add");
        listenerAdapter.setQueueOrTagToMethodName(queueOrTagToMethodName);
        container.setMessageListener(listenerAdapter);

        return container;
    }
}

5.建立消費者啟動類

/**
 * Consumer entry point for the MessageListenerAdapter demo. All the work is
 * done by the beans found through the component scan; this class only boots
 * the context.
 */
@ComponentScan(basePackages = "roberto.growth.process.rabbitmq.spring.amqp.message.listener.adapte.consumer")
public class ConsumerApplication {
    /**
     * Registers this class as the configuration source and refreshes the context.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext();
        context.register(ConsumerApplication.class);
        context.refresh();
    }
}

6.依次啟動訊息消費者和生產者 控制檯輸出如下

訂單資訊

如上Demo說明我們可以將一個純POJO類定義為訊息處理器,並且不用去實作MessageListener/ChannelAwareMessageListener介面。自定義處理器方法的引數預設情況下為byte[]型別,這是由Spring AMQP預設訊息轉換器(SimpleMessageConverter)決定的,接下來我們將介紹Spring AMQP的訊息轉換器功能

Spring AMQP訊息轉換器

在上述例子中我們定義的add(byte[] message)方法的引數是一個位元組陣列,但是有時候我們往RabbitMQ中傳送的是一個JSON物件,我們希望在處理訊息的時候它已經自動幫我們轉為JAVA物件;又或者我們往RabbitMQ中傳送的是一張圖片或其他格式的檔案,我們希望在處理訊息的時候它已經自動幫我們轉成檔案格式,我們可以手動設定MessageConverter來實現如上需求,如果未設定MessageConverter則使用Spring AMQP預設提供的SimpleMessageConverter

以下例子使用MessageConverter實現了當生產者往RabbitMQ傳送不同型別的資料的時候,使用MessageHandle不同的方法進行處理,需要注意的是當生產者在傳送JSON資料的時候,需要指定這個JSON是哪個物件,用於Spring AMQP轉換,規則如下

當傳送普通物件的JSON資料時,需要在訊息的header中增加一個__TypeId__的屬性告知消費者是哪個物件

當傳送List集合物件的JSON資料時,需要在訊息的header中將__TypeId__指定為java.util.List,並且需要額外指定屬性__ContentTypeId__用於告知消費者List集合中的物件型別

當傳送Map集合物件的JSON資料時,需要在訊息的header中將__TypeId__指定為java.util.Map,並且需要額外指定屬性__KeyTypeId__用於告知客戶端Map中key的型別,__ContentTypeId__用於告知客戶端Map中Value的型別

1.建立訂單實體類

/**
 * Order entity carried in the demo messages: a mutable bean with an id and
 * an amount, a no-argument constructor and plain accessors.
 */
public class Order {
    /** Order number. */
    private String orderId;

    /** Order amount. */
    private BigDecimal orderAmount;

    /** Creates an order with both fields unset. */
    public Order() {
    }

    /**
     * Creates an order with the given number and amount.
     *
     * @param orderId     the order number
     * @param orderAmount the order amount
     */
    public Order(String orderId, BigDecimal orderAmount) {
        this.orderId = orderId;
        this.orderAmount = orderAmount;
    }

    /** @return the order number */
    public String getOrderId() {
        return this.orderId;
    }

    /** @param orderId the new order number */
    public void setOrderId(String orderId) {
        this.orderId = orderId;
    }

    /** @return the order amount */
    public BigDecimal getOrderAmount() {
        return this.orderAmount;
    }

    /** @param orderAmount the new order amount */
    public void setOrderAmount(BigDecimal orderAmount) {
        this.orderAmount = orderAmount;
    }
}

2.自定義檔案訊息轉換器

/**
 * Inbound-only message converter that writes the message body to a file in
 * the temporary directory and hands that file to the listener.
 */
public class FileMessageConverter implements MessageConverter {
    /**
     * Outbound conversion is not implemented in this demo.
     *
     * @return always {@code null}
     */
    @Override
    public Message toMessage(Object object, MessageProperties messageProperties) throws MessageConversionException {
        return null;
    }

    /**
     * Copies the message body into a new, randomly named file under
     * {@code java.io.tmpdir}. The file extension is taken from the
     * {@code _extName} header; when the header is absent or empty the file is
     * created without an extension.
     *
     * @param message the incoming message
     * @return the {@link File} holding the message body
     * @throws MessageConversionException if the body cannot be written
     */
    @Override
    public Object fromMessage(Message message) throws MessageConversionException {
        // NOTE(review): _extName comes from the message header unvalidated;
        // restrict it if senders are not trusted.
        String extName = (String) message.getMessageProperties().getHeaders().get("_extName");

        String fileName = UUID.randomUUID().toString();
        if (extName != null && !extName.isEmpty()) {
            fileName = fileName + "." + extName;
        }

        // File(parent, child) inserts the separator itself; java.io.tmpdir is
        // not guaranteed to end with one, so plain concatenation could place
        // the file next to, rather than inside, the temporary directory
        File tempFile = new File(System.getProperty("java.io.tmpdir"), fileName);
        try {
            FileCopyUtils.copy(message.getBody(), tempFile);
        } catch (IOException e) {
            throw new MessageConversionException("FileMessageConverter訊息轉換失敗", e);
        }

        return tempFile;
    }
}

3.自定義字串訊息轉換器

/**
 * Inbound-only message converter that turns the message body into a
 * UTF-8 decoded {@link String}.
 */
public class StringMessageConverter implements MessageConverter {
    /**
     * Outbound conversion is not implemented in this demo.
     *
     * @return always {@code null}
     */
    @Override
    public Message toMessage(Object object, MessageProperties messageProperties) throws MessageConversionException {
        return null;
    }

    /**
     * Decodes the message body as UTF-8.
     *
     * @param message the incoming message
     * @return the body as a {@link String}
     */
    @Override
    public Object fromMessage(Message message) throws MessageConversionException {
        // The charset constant cannot fail to resolve, so no checked exception needs translating
        return new String(message.getBody(), java.nio.charset.StandardCharsets.UTF_8);
    }
}

4.自定義訊息處理器類

/**
 * POJO message handler with one {@code add} overload per payload type. Each
 * overload prints a banner naming itself, followed by the payload details.
 */
public class MessageHandle {
    /**
     * Handles a raw payload: prints the body decoded as UTF-8.
     *
     * @param body the raw message body
     */
    public void add(byte[] body) {
        System.out.println("----------byte[]方法進行處理----------");
        // Print the payload itself rather than the literal text "body"
        System.out.println(new String(body, java.nio.charset.StandardCharsets.UTF_8));
    }

    /**
     * Handles a text payload.
     *
     * @param message the message text
     */
    public void add(String message) {
        System.out.println("----------String方法進行處理----------");
        System.out.println(message);
    }

    /**
     * Handles a file payload: prints its length, name and absolute path.
     *
     * @param file the file holding the payload
     */
    public void add(File file) {
        System.out.println("----------File方法進行處理----------");
        System.out.println(file.length());
        System.out.println(file.getName());
        System.out.println(file.getAbsolutePath());
    }

    /**
     * Handles a single order.
     *
     * @param order the order
     */
    public void add(Order order) {
        System.out.println("----------Order方法進行處理----------");
        System.out.println(order.getOrderId() + "---" + order.getOrderAmount());
    }

    /**
     * Handles a list of orders: prints the size, then each order.
     *
     * @param orderList the orders
     */
    public void add(List<Order> orderList) {
        System.out.println("----------List<Order>方法進行處理----------");
        System.out.println(orderList.size());
        for (Order order : orderList) {
            System.out.println(order.getOrderId() + "---" + order.getOrderAmount());
        }
    }

    /**
     * Handles a map of orders: prints each key followed by its order.
     *
     * @param orderMap the orders by key
     */
    public void add(Map<String, Order> orderMap) {
        System.out.println("----------Map<String, Order>方法進行處理----------");
        for (Map.Entry<String, Order> entry : orderMap.entrySet()) {
            Order order = entry.getValue();
            System.out.println(entry.getKey());
            System.out.println(order.getOrderId() + "---" + order.getOrderAmount());
        }
    }
}

5.建立生產者配置類

/**
 * Producer-side Spring configuration for the message-converter demo.
 * Exposes the RabbitMQ connection factory, a {@code RabbitAdmin} and a
 * {@code RabbitTemplate} as beans.
 */
@Configuration
public class SpringAMQPProducerConfig {
    /**
     * Configures a RabbitMQ client connection factory and wraps it in a
     * {@code CachingConnectionFactory}.
     *
     * @return the connection factory shared by the other beans
     */
    @Bean
    public ConnectionFactory connectionFactory() {
        com.rabbitmq.client.ConnectionFactory rabbitFactory = new com.rabbitmq.client.ConnectionFactory();

        // Broker address and credentials
        rabbitFactory.setHost("192.168.56.128");
        rabbitFactory.setPort(5672);
        rabbitFactory.setVirtualHost("/");
        rabbitFactory.setUsername("roberto");
        rabbitFactory.setPassword("roberto");

        // Automatic connection recovery with a 10000 ms retry interval
        rabbitFactory.setAutomaticRecoveryEnabled(true);
        rabbitFactory.setNetworkRecoveryInterval(10000);

        // Client properties attached to the connection
        Map<String, Object> clientProperties = new HashMap<>();
        clientProperties.put("principal", "RobertoHuang");
        clientProperties.put("description", "RGP訂單系統V2.0");
        clientProperties.put("emailAddress", "[email protected]");
        rabbitFactory.setClientProperties(clientProperties);

        return new CachingConnectionFactory(rabbitFactory);
    }

    /**
     * @param connectionFactory the connection factory bean
     * @return the admin used to declare exchanges, queues and bindings
     */
    @Bean
    public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory) {
        return new RabbitAdmin(connectionFactory);
    }

    /**
     * @param connectionFactory the connection factory bean
     * @return the template used to publish messages
     */
    @Bean
    public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
        return new RabbitTemplate(connectionFactory);
    }
}

6.建立生產者啟動類 傳送多條不同訊息型別訊息

@ComponentScan(basePackages = "roberto.growth.process.rabbitmq.spring.amqp.message.converter.producer")
public class ProducerApplication {
    public static void main(String[] args) throws Exception {
        AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext(ProducerApplication.class);

        RabbitAdmin rabbitAdmin = context.getBean(RabbitAdmin.class);
        RabbitTemplate rabbitTemplate = context.getBean(RabbitTemplate.class);

        rabbitAdmin.declareExchange(new DirectExchange("roberto.order", true, false, new HashMap<>()));

        // 傳送字串
        sendString(rabbitTemplate);
        // 傳送當個物件JSON
        sendSingle(rabbitTemplate);
        // 傳送List集合JSON
        sendList(rabbitTemplate);
        // 傳送Map集合JSON
        sendMap(rabbitTemplate);
        // 傳送圖片
        sendImage(rabbitTemplate);
    }

    public static void sendString(RabbitTemplate rabbitTemplate) {
        // 宣告訊息 (訊息體, 訊息屬性)
        MessageProperties messageProperties = new MessageProperties();
        messageProperties.setDeliveryMode(MessageDeliveryMode.PERSISTENT);
        messageProperties.setContentType("text/plain");
        Message message = new Message("訂單訊息".getBytes(), messageProperties);
        rabbitTemplate.send("roberto.order", "add", message);
    }

    public static void sendSingle(RabbitTemplate rabbitTemplate) throws Exception {
        Order order = new Order("OD0000001", new BigDecimal(888888.888888));
        ObjectMapper objectMapper = new ObjectMapper();

        // 宣告訊息 (訊息體, 訊息屬性)
        MessageProperties messageProperties = new MessageProperties();
        messageProperties.getHeaders().put("__TypeId__", "order");
        messageProperties.setDeliveryMode(MessageDeliveryMode.PERSISTENT);
        messageProperties.setContentType("application/json");
        Message message = new Message(objectMapper.writeValueAsString(order).getBytes(), messageProperties);

        rabbitTemplate.send("roberto.order", "add", message);
    }

    public static void sendList(RabbitTemplate rabbitTemplate) throws Exception {
        Order order = new Order("OD0000001", new BigDecimal(888888.888888));
        Order order2 = new Order("OD0000002", new BigDecimal(888888.888888));
        List<Order> orderList = Arrays.asList(order, order2);

        ObjectMapper objectMapper = new ObjectMapper();

        // 宣告訊息 (訊息體, 訊息屬性)
        MessageProperties messageProperties = new MessageProperties();
        messageProperties.getHeaders().put("__TypeId__", "java.util.List");
        messageProperties.getHeaders().put("__ContentTypeId__", "order");
        messageProperties.setDeliveryMode(MessageDeliveryMode.PERSISTENT);
        messageProperties.setContentType("application/json"