補習系列(13)-springboot redis 與釋出訂閱
目錄
一、訂閱釋出
訂閱釋出是一種常見的設計模式,常見於訊息系統的場景。
如下面的圖:
[圖來自百科]
訊息釋出者是訊息載體的生產者,其通過某些主題來向 排程中心 傳送訊息;
而訊息訂閱者會事先向 排程中心 訂閱其 "感興趣" 的主題,隨後會獲得新訊息。
在這裡, 排程中心 是一個負責訊息控制中轉的邏輯實體,可以是訊息佇列如ActiveMQ,也可以是Web服務等等。
常見應用
- 微博,每個使用者的粉絲都是該使用者的訂閱者,當用戶發完微博,所有粉絲都將收到他的動態;
- 新聞,資訊站點通常有多個頻道,每個頻道就是一個主題,使用者可以通過主題來做訂閱(如RSS),這樣當新聞釋出時,訂閱者可以獲得更新。
二、Redis 與訂閱釋出
Redis 支援 (pub/sub) 的訂閱釋出能力,客戶端可以通過channel(頻道)來實現訊息的釋出及接收。
- 客戶端通過 SUBSCRIBE 命令訂閱 channel;
- 客戶端通過PUBLISH 命令向channel 傳送訊息;
而後,訂閱 channel的客戶端可實時收到訊息。
除了簡單的SUBSCRIBE/PUBLISH命令之外,Redis還支援訂閱某一個模式的主題(正則表示式),
如下:
PSUBSCRIBE/topic/cars/*
於是,我們可以利用這點實現相對複雜的訂閱能力,比如:
- 在電商平臺中訂閱多個品類的商品促銷資訊;
- 智慧家居場景,APP可以訂閱所有房間的裝置訊息。
...
儘管如此,Redis pub/sub 機制存在一些缺點:
- 訊息無法持久化,存在丟失風險;
- 沒有類似 RabbitMQ的ACK機制;
- 由於是廣播機制,無法通過新增worker 提升消費能力;
因此,Redis 的訂閱釋出建議用於 實時且可靠性要求不高 的場景。
三、SpringBoot 與訂閱釋出
接下來,看一下SpringBoot 怎麼實現訂閱釋出的功能。
spring-boot-starter-data-redis幫我們實現了Jedis的引入,pom 依賴如下:
<!-- redis --> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-data-redis</artifactId> <version>${spring-boot.version}</version> </dependency>
在 application.properties 中指定配置
# redis 連線配置 spring.redis.database=0 spring.redis.host=127.0.0.1 spring.redis.password= spring.redis.port=6379 spring.redis.ssl=false # 連線池最大數 spring.redis.pool.max-active=10 # 空閒連線最大數 spring.redis.pool.max-idle=10 # 獲取連線最大等待時間(s) spring.redis.pool.max-wait=600000
A. 訊息模型
訊息模型描述了訂閱釋出的資料物件,這要求生產者與消費者都能理解
以下面的POJO為例:
public static class SimpleMessage { private String publisher; private String content; private Date createTime;
在SimpleMessage類中,我們聲明瞭幾個欄位:
欄位名 | 說明 |
---|---|
publisher | 釋出者 |
content | 文字內容 |
createTime | 建立時間 |
B. 序列化
如下的程式碼採用了JSON 作為序列化方式:
@Configuration public class RedisConfig { private static final Logger logger = LoggerFactory.getLogger(RedisConfig.class); /** * 序列化定製 * * @return */ @Bean public Jackson2JsonRedisSerializer<Object> jackson2JsonSerializer() { Jackson2JsonRedisSerializer<Object> jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer<>( Object.class); // 初始化objectmapper ObjectMapper mapper = new ObjectMapper(); mapper.setSerializationInclusion(Include.NON_NULL); mapper.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL); jackson2JsonRedisSerializer.setObjectMapper(mapper); return jackson2JsonRedisSerializer; } /** * 操作模板 * * @param connectionFactory * @param jackson2JsonRedisSerializer * @return */ @Bean public RedisTemplate<String, Object> redisTemplate(JedisConnectionFactory connectionFactory, Jackson2JsonRedisSerializer<Object> jackson2JsonRedisSerializer) { RedisTemplate<String, Object> template = new RedisTemplate<String, Object>(); template.setConnectionFactory(connectionFactory); // 設定key/hashkey序列化 RedisSerializer<String> stringSerializer = new StringRedisSerializer(); template.setKeySerializer(stringSerializer); template.setHashKeySerializer(stringSerializer); // 設定值序列化 template.setValueSerializer(jackson2JsonRedisSerializer); template.setHashValueSerializer(jackson2JsonRedisSerializer); template.afterPropertiesSet(); return template; }
C. 釋出訊息
訊息釋出,需要先指定一個ChannelTopic物件,隨後通過RedisTemplate方法操作。
@Service public class RedisPubSub { private static final Logger logger = LoggerFactory.getLogger(RedisPubSub.class); @Autowired private RedisTemplate<String, Object> redisTemplate; private ChannelTopic topic = new ChannelTopic("/redis/pubsub"); @Scheduled(initialDelay = 5000, fixedDelay = 10000) private void schedule() { logger.info("publish message"); publish("admin", "hey you must go now!"); } /** * 推送訊息 * * @param publisher * @param message */ public void publish(String publisher, String content) { logger.info("message send {} by {}", content, publisher); SimpleMessage pushMsg = new SimpleMessage(); pushMsg.setContent(content); pushMsg.setCreateTime(new Date()); pushMsg.setPublisher(publisher); redisTemplate.convertAndSend(topic.getTopic(), pushMsg); }
上述程式碼使用一個定時器(@Schedule)來做釋出,為了保證執行需要在主類中啟用定時器註解:
@EnableScheduling @SpringBootApplication public class BootSampleRedis{ ... }
D. 接收訊息
定義一個訊息接收處理的Bean:
@Component public static class MessageSubscriber { public void onMessage(SimpleMessage message, String pattern) { logger.info("topic {} received {} ", pattern, JsonUtil.toJson(message)); } }
接下來,利用 MessageListenerAdapter 可將訊息通知到Bean方法:
/** * 訊息監聽器,使用MessageAdapter可實現自動化解碼及方法代理 * * @return */ @Bean public MessageListenerAdapter listener(Jackson2JsonRedisSerializer<Object> jackson2JsonRedisSerializer, MessageSubscriber subscriber) { MessageListenerAdapter adapter = new MessageListenerAdapter(subscriber, "onMessage"); adapter.setSerializer(jackson2JsonRedisSerializer); adapter.afterPropertiesSet(); return adapter; }
最後,關聯到訊息釋出的Topic:
/** * 將訂閱器繫結到容器 * * @param connectionFactory * @param listenerAdapter * @return */ @Bean public RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory, MessageListenerAdapter listener) { RedisMessageListenerContainer container = new RedisMessageListenerContainer(); container.setConnectionFactory(connectionFactory); container.addMessageListener(listener, new PatternTopic("/redis/*")); return container; }
執行結果
啟動程式,從控制檯可輸出:
.RedisPubSub : publish message .RedisPubSub : message send hey you must go now! by admin .RedisPubSub : topic /redis/* received {"publisher":"admin","content":"hey you must go now!","createTime":1543418694007}
這樣,我們便完成了訂閱釋出功能。
小結
訊息訂閱釋出是分散式系統中的常用手段,也經常用來實現系統解耦、效能優化等目的;
當前小節結合SpringBoot 演示了 Redis訂閱釋出(pub/sub)的實現,在部分場景下可以參考使用。
歡迎繼續關注"美碼師的補習系列-springboot篇" ,期待更多精彩內容^-^