1. 程式人生 > >Spring-data-redis: pub/sub訊息訂閱

Spring-data-redis: pub/sub訊息訂閱

 Redis中pub/sub特性,可以用來實現類似與JMS的“topic”功能,只不過這些訊息無法被持久化而已。spring-data-redis元件中對pub/sub提供了類似JMS的程式設計模式,我們通過例項來展示如何使用。

    需要注意的是,在redis中訊息的訂閱端(subscribe)需要獨佔連結,那麼訊息接收將是阻塞的。

    程式碼例項中,使用了“連線池”/“訊息非同步接受”“訊息併發處理”,請根據需要調整相關引數。

    1) Redis中"pub/sub"的訊息,為"即發即失",server不會儲存訊息,如果publish的訊息,沒有任何client處於"subscribe"狀態,訊息將會被丟棄.如果client在subcribe時,連結斷開後重連,那麼此期間的訊息也將丟失.Redis server將會"盡力"將訊息傳送給處於subscribe狀態的client,但是仍不會保證每條訊息都能被正確接收.

    2) 如果期望pub/sub的訊息時持久的,那麼需要藉助額外的功能.參見"pub/sub持久化訂閱"

一.配置檔案

Java程式碼  收藏程式碼
  1. <beans xmlns="http://www.springframework.org/schema/beans"   
  2. xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"  
  3. xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd"
     default-autowire="byName">  
  4.     <bean id="jedisPoolConfig" class="redis.clients.jedis.JedisPoolConfig">  
  5.         <property name="maxActive" value="32"></property>  
  6.         <property name="maxIdle" value="6"></property>  
  7.         <property name="maxWait" value="15000"
    ></property>  
  8.         <property name="minEvictableIdleTimeMillis" value="300000"></property>  
  9.         <property name="numTestsPerEvictionRun" value="3"></property>  
  10.         <property name="timeBetweenEvictionRunsMillis" value="60000"></property>  
  11.         <property name="whenExhaustedAction" value="1"></property>  
  12.     </bean>  
  13.     <bean id="jedisConnectionFactory" class="org.springframework.data.redis.connection.jedis.JedisConnectionFactory" destroy-method="destroy">  
  14.         <property name="poolConfig" ref="jedisPoolConfig"></property>  
  15.         <property name="hostName" value="127.0.0.1"></property>  
  16.         <property name="port" value="6379"></property>  
  17.         <property name="password" value="0123456"></property>  
  18.         <property name="timeout" value="15000"></property>  
  19.         <property name="usePool" value="true"></property>  
  20.     </bean>  
  21.     <bean id="jedisTemplate" class="org.springframework.data.redis.core.RedisTemplate">  
  22.         <property name="connectionFactory" ref="jedisConnectionFactory"></property>  
  23.         <property name="defaultSerializer">  
  24.             <bean class="org.springframework.data.redis.serializer.StringRedisSerializer"/>  
  25.         </property>  
  26.     </bean>  
  27.     <bean id="topicMessageListener" class="com.sample.redis.sdr.TopicMessageListener">  
  28.         <property name="redisTemplate" ref="jedisTemplate"></property>  
  29.     </bean>  
  30.     <bean id="topicContainer" class="org.springframework.data.redis.listener.RedisMessageListenerContainer" destroy-method="destroy">  
  31.         <property name="connectionFactory" ref="jedisConnectionFactory"/>  
  32.         <property name="taskExecutor"><!-- 此處有個奇怪的問題,無法正確使用其他型別的Executor -->  
  33.             <bean class="org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler">  
  34.                 <property name="poolSize" value="3"></property>  
  35.             </bean>  
  36.         </property>  
  37.         <property name="messageListeners">  
  38.             <map>  
  39.                 <entry key-ref="topicMessageListener">  
  40.                     <bean class="org.springframework.data.redis.listener.ChannelTopic">  
  41.                         <constructor-arg value="user:topic"/>  
  42.                     </bean>  
  43.                 </entry>  
  44.             </map>  
  45.         </property>  
  46.     </bean>  
  47. </beans>  

二.訊息釋出(pub):

Java程式碼  收藏程式碼
  1. String channel = "user:topic";  
  2. //其中channel必須為string,而且“序列化”策略也是StringSerializer  
  3. //訊息內容,將會根據配置檔案中指定的valueSerializer進行序列化  
  4. //本例中,預設全部採用StringSerializer  
  5. //那麼在訊息的subscribe端也要對“發序列化”保持一致。  
  6. redisTemplate.convertAndSend(channel, "from app 1");  

三.訊息接收(subscribe):

   1) TopicMessageListener類:

Java程式碼  收藏程式碼
  1. public class TopicMessageListener implements MessageListener {  
  2.     private RedisTemplate redisTemplate;  
  3.     public void setRedisTemplate(RedisTemplate redisTemplate) {  
  4.         this.redisTemplate = redisTemplate;  
  5.     }  
  6.     @Override  
  7.     public void onMessage(Message message, byte[] pattern) {  
  8.         byte[] body = message.getBody();//請使用valueSerializer  
  9.         byte[] channel = message.getChannel();  
  10.         //請參考配置檔案,本例中key,value的序列化方式均為string。  
  11.         //其中key必須為stringSerializer。和redisTemplate.convertAndSend對應  
  12.         String itemValue = (String)redisTemplate.getValueSerializer().deserialize(body);  
  13.         String topic = (String)redisTemplate.getStringSerializer().deserialize(channel);  
  14.         //...  
  15.     }  
  16. }  

   2) 你會發現上述程式設計風格非常像JMS。需要注意的是訊息體的反序列化。