1. 程式人生 > >我在生產專案裡是如何使用Redis釋出訂閱的?(二)Java版程式碼實現(含原始碼)

我在生產專案裡是如何使用Redis釋出訂閱的?(二)Java版程式碼實現(含原始碼)

上篇文章講了在實際專案裡的哪些業務場景用到Redis釋出訂閱,這篇文章就講一下,在Java中如何實現的。

 

圖解程式碼結構

釋出訂閱的理論以及使用場景大家都已經有了大致瞭解了,但是怎麼用程式碼實現釋出訂閱呢?在這裡給大家分享一下實現方式。

 

我們以上篇文章的第三種使用場景為例,先來看一下整體實現類圖吧。

 

解釋一下,這裡我們首先定義一個統一介面`ICacheUpdate`,只有一個`update`方法,我們令`Service`層實現這個方法,執行具體的更新操作。

 

我們再來看`RedisMsgPubSub`,它繼承`redis.clients.jedis.JedisPubSub`,主要重寫其`onMessage()`方法(訂閱的頻道有訊息到來時會觸發這個方法),我們在這個方法裡呼叫`RedisMsgPubSub`的`update`方法執行更新操作。

 

當我們有多個`Service`實現`ICacheUpdate`時,我們就非常迫切地需要一個管理器來集中管理這些`Service`,並且當觸發onMessage方法時要告訴onMessage方法具體呼叫哪個`ICacheUpdate`的實現類,所以我們有了`PubSubManager`。並且我們單獨開啟一個執行緒來維護髮布訂閱,所以管理器繼承了`Thread`類。

 

程式碼實現

具體程式碼:

統一介面

public interface ICacheUpdate {
    public void update();
}

 

Service層

實現ICacheUpdate的update方法,執行具體的更新操作

public class InfoService implements ICacheUpdate {
  private static Logger logger = LoggerFactory.getLogger(InfoService.class);
  @Autowired
  private RedisCache redisCache;
  @Autowired
  private InfoMapper infoMapper;
  /**
   * 按資訊型別分類查詢資訊
   * @return
   */
  public Map<String, List<Map<String, Object>>> selectAllInfo(){
    Map<String, List<Map<String, Object>>> resultMap = new HashMap<String, List<Map<String, Object>>>();
    List<String> infoTypeList = infoMapper.selectInfoType();//資訊表中所有涉及的資訊型別
    logger.info("-------按資訊型別查詢公共資訊開始----"+infoTypeList);
    if(infoTypeList!=null && infoTypeList.size()>0) {
      for (String infoType : infoTypeList) {
        List<Map<String, Object>> result = infoMapper.selectByInfoType(infoType);
        resultMap.put(infoType, result);
      }
    }
    return resultMap;
  }
  @Override
  public void update() {
    //快取首頁資訊
    logger.info("InfoService selectAllInfo 重新整理快取");
    Map<String, List<Map<String, Object>>> resultMap = this.selectAllInfo();
    Set<String> keySet = resultMap.keySet();
    for(String key:keySet){
      List<Map<String, Object>> value = resultMap.get(key);
      redisCache.putObject(GlobalSt.PUBLIC_INFO_ALL+key, value);
    }
  }
}

Redis釋出訂閱的擴充套件類

 作用:

 1、統一管理ICacheUpdate,把所有實現ICacheUpdate介面的類新增到updates容器

 2、重寫onMessage方法,訂閱到訊息後進行重新整理快取的操作

public class RedisMsgPubSub extends JedisPubSub {
    private static Logger logger = LoggerFactory.getLogger(RedisMsgPubSub.class);
    private Map<String , ICacheUpdate> updates = new HashMap<String , ICacheUpdate>();
    //1、由updates統一管理ICacheUpdate
    public boolean addListener(String key , ICacheUpdate update) {
        if(update == null) 
            return false;
      updates.put(key, update);
      return true;
    }
    /**
     * 2、重寫onMessage方法,訂閱到訊息後進行重新整理快取的操作
     * 訂閱頻道收到的訊息
     */
    @Override  
    public void onMessage(String channel, String message) {
        logger.info("RedisMsgPubSub onMessage channel:{},message :{}" ,channel, message);
        ICacheUpdate updater = null;
        if(StringUtil.isNotEmpty(message)) 
            updater = updates.get(message);
        if(updater!=null)
            updater.update();
    }
    //other code...
}

釋出訂閱的管理器

執行的操作:

1、將所有需要重新整理載入的Service類(實現ICacheUpdate介面)新增到RedisMsgPubSub的updates中

2、啟動執行緒訂閱pubsub_config頻道,收到訊息後的五秒後再次訂閱(避免訂閱到一次訊息後結束訂閱)

public class PubSubManager extends Thread{
    private static Logger logger = LoggerFactory.getLogger(PubSubManager.class);

    public static Jedis jedis;
    RedisMsgPubSub msgPubSub = new RedisMsgPubSub();
    //頻道
    public static final String PUNSUB_CONFIG = "pubsub_config";
    //1.將所有需要重新整理載入的Service類(實現ICacheUpdate介面)新增到RedisMsgPubSub的updates中
    public boolean addListener(String key, ICacheUpdate listener){
        return msgPubSub.addListener(key,listener);
    }
    @Override
    public void run(){
        while (true){
            try {
                JedisPool jedisPool = SpringTools.getBean("jedisPool", JedisPool.class);
                if(jedisPool!=null){
                    jedis = jedisPool.getResource();
                    if(jedis!=null){
                        //2.啟動執行緒訂閱pubsub_config頻道 阻塞
                        jedis.subscribe(msgPubSub,PUNSUB_CONFIG);
                    }
                }
            } catch (Exception e) {
                logger.error("redis connect error!");
            } finally {
                if(jedis!=null)
                    jedis.close();
            }
            try {
                //3.收到訊息後的五秒後再次訂閱(避免訂閱到一次訊息後結束訂閱)
                Thread.sleep(5000);
            } catch (InterruptedException e) {
                logger.error("InterruptedException in redis sleep!");
            }
        }
    }
}

到此,Redis的釋出訂閱大致已經實現。我們什麼時候啟用呢?我們可以選擇在啟動專案時完成訂閱和基礎資料的載入,所以我們通過實現`javax.servlet.SevletContextListener`來完成這一操作。然後將監聽器新增到`web.xml`。

CacheInitListener.java

/**
 * 載入系統引數
 */
public class CacheInitListener implements ServletContextListener{
    private static Logger logger = LoggerFactory.getLogger(CacheInitListener.class);
    @Override
    public void contextDestroyed(ServletContextEvent arg0) {
    }
    @Override
    public void contextInitialized(ServletContextEvent arg0) {
        logger.info("---CacheListener初始化開始---");
        init();
        logger.info("---CacheListener初始化結束---");
    }
    public void init() {
        try {
            //獲得管理器
            PubSubManager pubSubManager = SpringTools.getBean("pubSubManager", PubSubManager.class);
            InfoService infoService = SpringTools.getBean("infoService", InfoService.class);
            //新增到管理器
            pubSubManager.addListener("infoService", infoService);
            //other service...
            //啟動執行緒執行訂閱操作
            pubSubManager.start();
            //初始化載入
            loadParamToRedis();
        } catch (Exception e) {
            logger.info(e.getMessage(), e);
        }
    }
    private void loadParamToRedis() {
        InfoService infoService = SpringTools.getBean("infoService", InfoService.class);
        infoService.update();
        //other service...
    }
}

web.xml

<listener>
  <listener-class>com.xxx.listener.CacheInitListener</listener-class>
</listener>

【end】

上篇文章講了在實際專案裡的哪些業務場景用到Redis釋出訂閱,這篇文章就講一下,在Java中如何實現