spring boot redis做為訊息佇列
SpringBoot基於Redis快速實現訊息佇列
1.常用訊息佇列工具
目前常用的訊息佇列大概有三種類型,RabbitMQ等AMQP系列,kafka,Redis等key value系列,他們的使用場景分別是:
1,rabbitMq:相對重量級的併發的情況,比如資料的非同步處理 任務的序列執行等。
2,kafka:基於Pull的模式來處理,集體很高的吞吐量,一般用來進行 日誌的儲存和收集。
3,redis:輕量級高併發,實時性要求高的情況,比如快取,秒殺,及時的資料分析(ELK日誌分析框架,使用的就是redis)
2.springBoot基於redis整合訊息佇列
實現:
1,maven依賴:
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-data-redis</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> </dependency>
2,redis的配置
spring: redis: database: 1 #索引(預設為0) host: localhost #地址 port: 6379 #埠號 #password: #連線密碼(預設空) pool: max-idle: 8 #連線池中的最大空閒連線 min-idle: 0 #連線池中的最小空閒連線 max-active: 8 #連線池最大連線數(使用負值表示沒有限制) max-wait: -1 #連線池最大阻塞等待時間(使用負值表示沒有限制) #sentinel: #master: mymaster # 哨兵監聽的Redis server的名稱 #nodes: 127.0.0.1:26379,127.0.0.1:26479,127.0.0.1:26579 #哨兵的配置列表 timeout: 0 #連線超時時間(毫秒)
3,註冊監聽者和傳送訊息
1,一個連線工廠(connection factory)
2,一個訊息監聽者容器(message listener container)
3,一個redis的模版(redis template)
我們通過redis模版來發送訊息,同時將receiver註冊給訊息監聽者容器。連線工廠將兩者連線起來,使得他們可以通過redis伺服器通訊,如何連線呢?我們將連線工廠例項分別注入監聽者容器和redis模版中即可。
首先是連線工廠,使用springBoot預設的RedisConnectionFactory,是jedisRedis庫提供的JedisConnectionFactory實現。
@Bean
RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory,
MessageListenerAdapter listenerAdapter) {
RedisMessageListenerContainer container = new RedisMessageListenerContainer();
container.setConnectionFactory(connectionFactory);
container.addMessageListener(listenerAdapter, new PatternTopic("chat"));
return container;
也可以自己建立連線工廠:
首先建立一個jedis連線池:
@Bean
JedisPoolConfig jedisPoolConfig() {
JedisPoolConfig JedisPoolConfig = new JedisPoolConfig();
JedisPoolConfig.setMaxTotal(maxTotal);
JedisPoolConfig.setMinIdle(minIdel);
JedisPoolConfig.setMaxWaitMillis(maxWaitMillis);
JedisPoolConfig.setTestOnBorrow(testOnBorrow);
JedisPoolConfig.setTestOnReturn(testOnReturn);
JedisPoolConfig.setTimeBetweenEvictionRunsMillis(timeBetweenEvictionRunsMillis);
JedisPoolConfig.setTestWhileIdle(testWhileIdle);
JedisPoolConfig.setNumTestsPerEvictionRun(numTestsPerEvictionRun);
JedisPoolConfig.setMinEvictableIdleTimeMillis(minEvictableIdleTimeMillis);
return JedisPoolConfig;
}
然後使用連線池來建立一個連線工廠:
@Bean
JedisConnectionFactory jedisConnectionFactory() {
JedisConnectionFactory jedisConnectionFactory = new JedisConnectionFactory();
jedisConnectionFactory.setPoolConfig(jedisPoolConfig());
jedisConnectionFactory.setHostName(hostName);
jedisConnectionFactory.setPort(port);
jedisConnectionFactory.setPassword(password);
jedisConnectionFactory.setUsePool(true);
jedisConnectionFactory.setDatabase(dbIndex);
return jedisConnectionFactory;
}
《對應的屬性值都是從property配置檔案中讀取出來的》
然後是redisTemplate
@Bean
@Scope("prototype")
RedisTemplate<String, Object> redisTemplate() {
RedisTemplate<String, Object> redisTemplate = new RedisTemplate<>();
redisTemplate.setConnectionFactory(jedisConnectionFactory());
redisTemplate.setDefaultSerializer(new StringRedisSerializer());
return redisTemplate;
}
最後是redis的監聽者
採用的方式為實現 MessageListener
public class RedisMessageListener implements MessageListener {
private GeneratorProcessor generatorProcessor;
public RedisMessageListener(GeneratorProcessor generatorProcessor) {
super();
this.generatorProcessor = generatorProcessor;
}
@Override
public void onMessage(final Message message, final byte[] pattern) {
String msg = message.toString();
if (msg.startsWith(RedisConstans.REDIS_TASK_ADD_SCORE)) {
//積分處理
String messageJson = msg.substring(RedisConstans.REDIS_TASK_ADD_SCORE.length(), msg.length());
generatorProcessor.calculate(messageJson);
}
}
}
訊息的釋出者 實現類
public class RedisPublisherImpl implements IRedisPublisher {
private RedisTemplate<String, Object> redisTemplate;
private String channel;
public RedisPublisherImpl(RedisTemplate<String, Object> redisTemplate, String channel) {
super();
this.redisTemplate = redisTemplate;
this.channel = channel;
}
@Override
public void sendMessage(String msg) {
redisTemplate.convertAndSend(channel, msg);
}
}
介面
public interface IRedisPublisher {
public void sendMessage(String msg);
}