1. 程式人生 > >spring boot redis做為訊息佇列

spring boot redis做為訊息佇列

SpringBoot基於Redis快速實現訊息佇列

    1.常用訊息佇列工具

           目前常用的訊息佇列大概有三種類型,RabbitMQ等AMQP系列,kafka,Redis等key value系列,他們的使用場景分別是:

           1,rabbitMq:相對重量級的併發的情況,比如資料的非同步處理 任務的序列執行等。

           2,kafka:基於Pull的模式來處理,集體很高的吞吐量,一般用來進行 日誌的儲存和收集。

           3,redis:輕量級高併發,實時性要求高的情況,比如快取,秒殺,及時的資料分析(ELK日誌分析框架,使用的就是redis)

    2.springBoot基於redis整合訊息佇列

          實現:

          1,maven依賴:

<dependency>  
    <groupId>org.springframework.boot</groupId>  
    <artifactId>spring-boot-starter-data-redis</artifactId>  
</dependency>
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-test</artifactId>
    <scope>test</scope>
</dependency> 

          2,redis的配置

spring:
  redis:
    database: 1 #索引(預設為0)
    host: localhost #地址
    port: 6379 #埠號
    #password:  #連線密碼(預設空)
    pool:
      max-idle: 8 #連線池中的最大空閒連線
      min-idle: 0 #連線池中的最小空閒連線
      max-active: 8 #連線池最大連線數(使用負值表示沒有限制)
      max-wait: -1 #連線池最大阻塞等待時間(使用負值表示沒有限制)
   #sentinel:
      #master: mymaster # 哨兵監聽的Redis server的名稱
      #nodes:
      127.0.0.1:26379,127.0.0.1:26479,127.0.0.1:26579 #哨兵的配置列表
    timeout: 0 #連線超時時間(毫秒)

            3,註冊監聽者和傳送訊息

                1,一個連線工廠(connection factory)

                2,一個訊息監聽者容器(message listener container)

                3,一個redis的模版(redis template)

          我們通過redis模版來發送訊息,同時將receiver註冊給訊息監聽者容器。連線工廠將兩者連線起來,使得他們可以通過redis伺服器通訊,如何連線呢?我們將連線工廠例項分別注入監聽者容器和redis模版中即可。

        首先是連線工廠,使用springBoot預設的RedisConnectionFactory,是jedisRedis庫提供的JedisConnectionFactory實現。

@Bean
    RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory,
            MessageListenerAdapter listenerAdapter) {
 
        RedisMessageListenerContainer container = new RedisMessageListenerContainer();
        container.setConnectionFactory(connectionFactory);
        container.addMessageListener(listenerAdapter, new PatternTopic("chat"));
 
        return container;

        也可以自己建立連線工廠:

         首先建立一個jedis連線池:         

@Bean
    JedisPoolConfig jedisPoolConfig() {
        JedisPoolConfig JedisPoolConfig = new JedisPoolConfig();
        JedisPoolConfig.setMaxTotal(maxTotal);
        JedisPoolConfig.setMinIdle(minIdel);
        JedisPoolConfig.setMaxWaitMillis(maxWaitMillis);
        JedisPoolConfig.setTestOnBorrow(testOnBorrow);
        JedisPoolConfig.setTestOnReturn(testOnReturn);
        JedisPoolConfig.setTimeBetweenEvictionRunsMillis(timeBetweenEvictionRunsMillis);
        JedisPoolConfig.setTestWhileIdle(testWhileIdle);
        JedisPoolConfig.setNumTestsPerEvictionRun(numTestsPerEvictionRun);
        JedisPoolConfig.setMinEvictableIdleTimeMillis(minEvictableIdleTimeMillis);
        return JedisPoolConfig;
    }

          然後使用連線池來建立一個連線工廠:          

 @Bean
    JedisConnectionFactory jedisConnectionFactory() {
        JedisConnectionFactory jedisConnectionFactory = new JedisConnectionFactory();
        jedisConnectionFactory.setPoolConfig(jedisPoolConfig());
        jedisConnectionFactory.setHostName(hostName);
        jedisConnectionFactory.setPort(port);
        jedisConnectionFactory.setPassword(password);
        jedisConnectionFactory.setUsePool(true);
        jedisConnectionFactory.setDatabase(dbIndex);
        return jedisConnectionFactory;
    }

          《對應的屬性值都是從property配置檔案中讀取出來的》

          然後是redisTemplate

    @Bean
    @Scope("prototype")
    RedisTemplate<String, Object> redisTemplate() {
        RedisTemplate<String, Object> redisTemplate = new RedisTemplate<>();
        redisTemplate.setConnectionFactory(jedisConnectionFactory());
        redisTemplate.setDefaultSerializer(new StringRedisSerializer());
        return redisTemplate;
    }

        最後是redis的監聽者

          採用的方式為實現 MessageListener

public class RedisMessageListener implements MessageListener {


    private GeneratorProcessor generatorProcessor;


    public RedisMessageListener(GeneratorProcessor generatorProcessor) {
        super();
        this.generatorProcessor = generatorProcessor;
    }


    @Override
    public void onMessage(final Message message, final byte[] pattern) {
        String msg = message.toString();
        if (msg.startsWith(RedisConstans.REDIS_TASK_ADD_SCORE)) {
            //積分處理
            String messageJson = msg.substring(RedisConstans.REDIS_TASK_ADD_SCORE.length(), msg.length());
            generatorProcessor.calculate(messageJson);
        }
    }
}

            訊息的釋出者 實現類            

public class RedisPublisherImpl implements IRedisPublisher {

    private RedisTemplate<String, Object> redisTemplate;
    private String channel;

    public RedisPublisherImpl(RedisTemplate<String, Object> redisTemplate, String channel) {
        super();
        this.redisTemplate = redisTemplate;
        this.channel = channel;
    }

    @Override
    public void sendMessage(String msg) {
        redisTemplate.convertAndSend(channel, msg);
    }
}

           介面

public interface IRedisPublisher {
    public void sendMessage(String msg);
}