1. 程式人生 > >Spring Boot使用Redis進行訊息的釋出訂閱

Spring Boot使用Redis進行訊息的釋出訂閱

今天來學習如何利用Spring Data對Redis的支援來實現訊息的釋出訂閱機制。釋出訂閱是一種典型的非同步通訊模型,可以讓訊息的釋出者和訂閱者充分解耦。在我們的例子中,我們將使用StringRedisTemplate來發佈一個字串訊息,同時基於MessageListenerAdapter使用一個POJO來訂閱和響應該訊息。

提示

事實上,RedisRedis 不僅提供一個NoSQL資料庫,同時提供了一套訊息系統。

環境準備

開發環境:

  • IDE+Java環境(JDK 1.7或以上版本)
  • Maven 3.0+(Eclipse和Idea IntelliJ內建,如果使用IDE並且不使用命令列工具可以不安裝)

pom.xml

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>

  <groupId>com.tianmaing</groupId>
  <artifactId>redis-message</artifactId>
  <version>0.0.1-SNAPSHOT</version>
  <packaging>jar</packaging>

  <name>redis-message</name>
  <description>Demo of message processing by redis</description>

  <parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>1.2.5.RELEASE</version>
    <relativePath/>
  </parent>

  <properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <java.version>1.8</java.version>
  </properties>

  <dependencies>
    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter</artifactId>
    </dependency>  
    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-redis</artifactId>
    </dependency>
  </dependencies>

  <build>
    <plugins>
      <plugin>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-maven-plugin</artifactId>
      </plugin>
    </plugins>
  </build>

</project>

通過配置spring-boot-starter-redis依賴,把Spring Boot對Redis的相關支援引入進來。

建立Redis訊息的接收者

在任何一個基於訊息的應用中,都有訊息釋出者和訊息接收者(或者稱為訊息訂閱者)。建立訊息的接收者,我們只需一個普通POJO,在POJO中定義一個接收訊息的方法即可:

package com.tianmaying.springboot.redisdemo;

import java.util.concurrent.CountDownLatch;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;

public class Receiver {
    private static final Logger LOGGER = LoggerFactory.getLogger(Receiver.class);

    private CountDownLatch latch;

    @Autowired
    public Receiver(CountDownLatch latch) {
        this.latch = latch;
    }

    public void receiveMessage(String message) {
        LOGGER.info("Received <" + message + ">");
        latch.countDown();
    }
}

這個Receiver類將會被註冊為一個訊息監聽者時。處理訊息的方法我們可以任意命名,我們有相當大的靈活性。

我們給Receiver的建構函式通過@AutoWired標註注入了一個CountDownLatch例項,當接收到訊息時,呼叫cutDown()方法。

註冊監聽者和傳送訊息

Spring Data Redis提供基於Redis傳送和接收訊息的所有需要的元件,我們只需要配置好三個東西:

  • 一個連線工廠(connection factory)
  • 一個訊息監聽者容器(message listener container)
  • 一個Redis的模板(redis template)

我們將通過Redis模板來發送訊息,同時將Receiver註冊給訊息監聽者容器。連線工廠將兩者連線起來,使得它們可以通過Redis伺服器通訊。如何連線呢? 我們將連線工廠例項分別注入到監聽者容器和Redis模板中即可。

package com.tianmaying.springboot.redisdemo;

import java.util.concurrent.CountDownLatch;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.data.redis.listener.PatternTopic;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import org.springframework.data.redis.listener.adapter.MessageListenerAdapter;

@SpringBootApplication
public class App {

    private static final Logger LOGGER = LoggerFactory.getLogger(App.class);

    @Bean
    RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory,
            MessageListenerAdapter listenerAdapter) {

        RedisMessageListenerContainer container = new RedisMessageListenerContainer();
        container.setConnectionFactory(connectionFactory);
        container.addMessageListener(listenerAdapter, new PatternTopic("chat"));

        return container;
    }

    @Bean
    MessageListenerAdapter listenerAdapter(Receiver receiver) {
        return new MessageListenerAdapter(receiver, "receiveMessage");
    }

    @Bean
    Receiver receiver(CountDownLatch latch) {
        return new Receiver(latch);
    }

    @Bean
    CountDownLatch latch() {
        return new CountDownLatch(1);
    }

    @Bean
    StringRedisTemplate template(RedisConnectionFactory connectionFactory) {
        return new StringRedisTemplate(connectionFactory);
    }

    public static void main(String[] args) throws InterruptedException {

        ApplicationContext ctx = SpringApplication.run(App.class, args);

        StringRedisTemplate template = ctx.getBean(StringRedisTemplate.class);
        CountDownLatch latch = ctx.getBean(CountDownLatch.class);

        LOGGER.info("Sending message...");
        template.convertAndSend("chat", "Hello from Redis!");

        latch.await();

        System.exit(0);
    }
}

連線工程我們使用Spring Boot預設的RedisConnectionFactory,是Jedis Redis庫提供的JedisConnectionFactory實現。

我們將在listenerAdapter方法中定義的Bean註冊為一個訊息監聽者,它將監聽chat主題的訊息。

因為Receiver類是一個POJO,要將它包裝在一個訊息監聽者介面卡(實現了MessageListener介面),這樣才能被監聽者容器RedisMessageListenerContainer的addMessageListener方法新增到連線工廠中。有了這個介面卡,當一個訊息到達時,就會呼叫receiveMesage()`方法進行響應。

就這麼簡單,配置好連線工廠和訊息監聽者容器,你就可以監聽訊息啦!

傳送訊息就更簡單了,我們使用StringRedisTemplate來發送鍵和值均為字串的訊息。在main()方法中我們建立一個Spring應用的Context,初始化訊息監聽者容器,開始監聽訊息。然後獲取StringRedisTemplate的例項,往chat主題傳送一個訊息。我們看到,訊息可以被成功的接收到並打印出來,搞定!