redis 訊息佇列實現
阿新 • • 發佈:2019-07-18
方式一:通過list的阻塞讀取命令,blpop或者brpop
消費者
public class Consumer extends DemoApplicationTests{ @Test public void consume(){ int timeout = 0;//永不超時 String key = "test_que"; //list集合 第一個元素為key值,第二個元素為彈出的元素值;當超時返回[null] while(true){ List<Object> obj = redisTemplate.executePipelined(new RedisCallback<Object>() { @Override public Object doInRedis(RedisConnection connection) throws DataAccessException { //佇列沒有元素會阻塞操作,直到佇列獲取新的元素或超時 return connection.bLPop(timeout,key.getBytes()); } },new StringRedisSerializer()); for(Object o:obj){ System.out.println("---------------"+o); } } } }
生產者
/**
 * Demo producer that enqueues a single message on the Redis list {@code test_que}.
 */
public class Productor extends DemoApplicationTests {

    /**
     * Appends the message {@code "hht2"} to the tail of the list.
     *
     * <p>The message is pushed on the right so that a reader popping from the
     * left (BLPOP) receives messages in the order they were produced (FIFO).
     * Pushing and popping at the same end would turn the list into a stack.
     */
    @Test
    public void generateMsg() {
        String key = "test_que";
        redisTemplate.opsForList().rightPush(key, "hht2");
    }
}
方式二:使用 Spring Boot 實現 Pub/Sub(釋出/訂閱)
依賴包
<dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-data-redis</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> </dependency> <dependency> <groupId>org.apache.commons</groupId> <artifactId>commons-pool2</artifactId> </dependency> </dependencies>
配置類
@Configuration @AutoConfigureAfter(RedisAutoConfiguration.class) @EnableCaching public class RedisConfig extends CachingConfigurerSupport { /** * 配置自定義redisTemplate * @return */ @Bean RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory redisConnectionFactory) { RedisTemplate<String, Object> template = new RedisTemplate<>(); template.setConnectionFactory(redisConnectionFactory); //使用Jackson2JsonRedisSerializer來序列化和反序列化redis的value值 Jackson2JsonRedisSerializer serializer = new Jackson2JsonRedisSerializer(Object.class); ObjectMapper mapper = new ObjectMapper(); mapper.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY); mapper.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL); serializer.setObjectMapper(mapper); template.setValueSerializer(serializer); //使用StringRedisSerializer來序列化和反序列化redis的key值 template.setKeySerializer(new StringRedisSerializer()); template.setHashKeySerializer(new StringRedisSerializer()); template.setHashValueSerializer(serializer); template.afterPropertiesSet(); return template; } /** * 序列化定製 * * @return */ @Bean public Jackson2JsonRedisSerializer<Object> jackson2JsonSerializer() { Jackson2JsonRedisSerializer<Object> jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer<>( Object.class); // 初始化objectmapper ObjectMapper mapper = new ObjectMapper(); mapper.setSerializationInclusion(JsonInclude.Include.NON_NULL); mapper.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL); jackson2JsonRedisSerializer.setObjectMapper(mapper); return jackson2JsonRedisSerializer; } /** * 訊息監聽器,使用MessageAdapter可實現自動化解碼及方法代理 * * @return */ @Bean public MessageListenerAdapter listener(Jackson2JsonRedisSerializer<Object> jackson2JsonRedisSerializer, MessageSubscriber subscriber) { MessageListenerAdapter adapter = new MessageListenerAdapter(subscriber, "onMessage"); adapter.setSerializer(jackson2JsonRedisSerializer); adapter.afterPropertiesSet(); return adapter; } /** * 將訂閱器繫結到容器 * * @param connectionFactory * @param 
listenerAdapter * @return */ @Bean public RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory, MessageListenerAdapter listener) { RedisMessageListenerContainer container = new RedisMessageListenerContainer(); container.setConnectionFactory(connectionFactory); container.addMessageListener(listener, new PatternTopic("/redis/*")); return container; } }
模擬訊息釋出類
/**
 * Simulated publisher: periodically sends a fixed message to the Redis
 * channel {@code /redis/pubsub}.
 */
@Service
public class RedisPubSub {

    private static final Logger logger = LoggerFactory.getLogger(RedisPubSub.class);

    @Autowired
    private RedisTemplate<String, Object> redisTemplate;

    private ChannelTopic topic = new ChannelTopic("/redis/pubsub");

    /**
     * Scheduled job: first run after 5000 ms, then 10000 ms after each
     * completed run. Publishes a fixed message on behalf of "admin".
     */
    @Scheduled(initialDelay = 5000, fixedDelay = 10000)
    private void schedule() {
        logger.info("publish message");
        publish("admin", "hey you must go now!");
    }

    /**
     * Pushes a message to the configured topic.
     *
     * @param publisher name of the sender; only written to the log
     * @param content   message body sent to the channel
     */
    public void publish(String publisher, String content) {
        logger.info("message send {} by {}", content, publisher);
        final String channel = topic.getTopic();
        redisTemplate.convertAndSend(channel, content);
    }
}
模擬訊息接收類
/**
 * Simulated message receiver. {@code RedisConfig} wraps this bean in a
 * {@code MessageListenerAdapter} that looks up the handler by the name
 * {@code "onMessage"}, so the method name must stay in sync with that
 * configuration.
 */
@Component
public class MessageSubscriber {

    // One shared, immutable logger per class, matching the declaration style used in RedisPubSub.
    private static final Logger logger = LoggerFactory.getLogger(MessageSubscriber.class);

    /**
     * Logs a received message.
     *
     * @param message decoded message body
     * @param pattern second argument supplied by the listener adapter; logged as the topic
     */
    public void onMessage(String message, String pattern) {
        logger.info("topic {} received {} ", pattern, message);
    }
}
啟動類
@SpringBootApplication @EnableScheduling public class DemoApplication { public static void main(String[] args) { SpringApplication.run(DemoApplication.class,