1. 程式人生 > >基於redis的訊息佇列的設計及實現

基於redis的訊息佇列的設計及實現

訊息佇列

所謂訊息佇列,就是一個以佇列資料結構為基礎的一個真實存在的實體,如陣列,redis中的佇列集合等等,都可以。

為什麼要使用佇列

主要原因是由於在高併發環境下,由於來不及同步處理,請求往往會發生堵塞,比如說,大量的insert,update之類的請求同時到達MySQL,直接導致無數的行鎖表鎖,甚至最後請求會堆積過多,從而觸發too many connections錯誤。通過使用訊息佇列,我們可以非同步處理請求,從而緩解系統的壓力。
比如說點贊這個功能,這個在高併發的情況下,很容易造成資料庫連線數佔滿,到時整個網站響應緩慢,才是就是想到要解決資料庫的壓力問題,一般就是兩種方案,一是提高資料庫本身的能力(如增加連線數,讀寫分離等),但是資料庫總是有極限的,到達了極限是沒有辦法在提升了的,此時就要考慮第二種方案,釋放資料庫的壓力,將壓力轉移到快取裡面。就拿實際的點贊來說吧,使用者的點贊請求到來,我只是將點贊請求投遞到訊息佇列裡面,後續的點贊請求可以將訊息合併,即只更新點贊數,不產生新的任務,此時有個進行再不斷的輪訓訊息佇列,將點贊訊息消耗,並將值更新到資料庫裡面,這樣就有效的降低了資料庫的壓力,因為在快取層將數個數據庫更新請求合併成一個,大大提高了效率,降低了負載。

Redis實現的訊息佇列

Redis提供了兩種方式來作訊息佇列。 生產者消費模式和釋出訂閱者模式。 
生產者消費模式會讓一個或者多個客戶端監聽訊息佇列,一旦訊息到達,消費者馬上消費,誰先搶到算誰的,如果佇列裡沒有訊息,則消費者繼續監聽。 其實在生產者消費模式中生產者是一堆執行緒,消費者是另一堆執行緒,記憶體緩衝區可以使用List陣列佇列,資料型別只需要定義一個簡單的類就好。關鍵是如何處理多執行緒之間的協作。
釋出訂閱者模式也是一個或多個客戶端訂閱訊息頻道,只要釋出者釋出訊息,所有訂閱者都能收到訊息,訂閱者都是平等的。

這裡使用的是生產者消費模式。

基於Redis的訊息佇列實現的非同步操作原理圖如下:

程式碼實現

1.首先定義事件的型別,使用列舉類,便於取出各種事件

package com.springboot.springboot.async;
 
/**
 * @author WilsonSong
 * @date 2018/6/3
 * 列舉類,就是事件的各種型別
 */
public enum EventType {
    LIKE(0), COMMENT(1), LOGIN(2),MAIL(3);
 
    private int value;
    EventType(int value){
        this.value = value;
    }
    public int getValue(){
        return value;
    }
 
}

 2.定義事件的具體實現類

類裡面很多的實現方法都是返回的是EventModel這個類,是為了以後點讚的時候能夠鏈式的取出與這個事件相關的引數

/**
 * @author WilsonSong
 * @date 2018/6/3
 * 不同的事件肯定是有不同的型別的
 */
public class EventModel {
    //例如,有人評論了一個問題,那type就是評論, actorId就是誰評論的,
    // entityId和entityType就是評論的是那個問題,entityOwnerId就是那個問題關聯的物件
    private EventType type;    //事件的型別
    private int actorId;   //事件的觸發者
    private int entityType;    //觸發事件的載體
    private int entityId;  //和entityType組合成觸發事件的載體  可以使任何一個實體的id,問題,評論,使用者,站內信等等
    private int entityOwnerId;         //載體關聯的物件,當我們給一個人點贊時,系統要給那個人(也就是entityOwnerId)傳送一個站內信,通知那個人他被點讚了。
    public EventModel(){
 
    }
 
    public EventModel(EventType type){
        this.type = type;
    }
 
    //定義可擴充套件的欄位
    private Map<String, String> exts = new HashMap<>();
 
    public EventModel setExts(String key, String value){
         exts.put(key,value);
         return this;
    }
 
    public String getExts(String key){
        return  exts.get(key);
    }
 
    public EventType getType() {
        return type;
    }
 
    //為了能夠實現鏈狀的設定
    public EventModel setType(EventType type) {
        this.type = type;
        return this;      //這個就是為了實現這個xxx.setType().setXX();
    }
 
    public int getActorId() {
        return actorId;
    }
 
    public EventModel setActorId(int actorId) {
        this.actorId = actorId;
        return this;
    }
 
    public int getEntityType() {
        return entityType;
    }
 
    public EventModel setEntityType(int entityType) {
        this.entityType = entityType;
        return this;
    }
 
    public int getEntityId() {
        return entityId;
    }
 
    public EventModel setEntityId(int entityId) {
        this.entityId = entityId;
        return this;
    }
 
    public int getEntityOwnerId() {
        return entityOwnerId;
    }
 
    public EventModel setEntityOwnerId(int entityOwnerId) {
        this.entityOwnerId = entityOwnerId;
        return this;
    }
 
    public Map<String, String> getExts() {
        return exts;
    }
 
    public EventModel setExts(Map<String, String> exts) {
        this.exts = exts;
        return this;
    }
 
}

3.EventProducer的實現--生產者,作用是把事件分發到佇列中

/**
 * @author WilsonSong
 * @date 2018/6/3
 * 事件的入口,用來統一分發事件,就是在佇列中插入
 */
@Service
public class EventProducer {
 
    @Autowired
    JedisAdapter jedisAdapter;
 
    //把事件分發出去  EventProducer
    public boolean fireEvent(EventModel eventModel){
        try{
 
            //序列化,將EventModel 轉換WieJSON的字串
            String json = JSONObject.toJSONString(eventModel);
            String key = RedisKeyUtil.getEventQueueKey();
            jedisAdapter.lpush(key, json);
            return true;
        }catch (Exception e){
            return  false;
        }
    }
 
    //事件的取出與消費
 
 
}

4、Redis的統一封裝  --佇列 

 因為這裡是基於Redis的佇列實現非同步操作,需要對Redis的一些函式重新封裝,並與redis快取進行資料互動

/**
 * @author WilsonSong
 * @date 2018/6/1
 */
@Service
public class JedisAdapter implements InitializingBean {
    private static final Logger logger = LoggerFactory.getLogger(JedisAdapter.class);
 
    private JedisPool pool;
 
    public static void print(int index, Object object) {
        System.out.println(String.format("%d, %s", index, object.toString()));
    }
 
    @Override
    public void afterPropertiesSet() throws Exception {
        pool = new JedisPool("redis://localhost:6379/10");
    }

 
    public long lpush(String key, String value){
        Jedis jedis = null;
        try {
            jedis = pool.getResource();
            return jedis.lpush(key,value);
        }catch (Exception e){
            logger.error("Redis佇列新增異常");
        }finally {
            if (jedis != null){
                jedis.close();
            }
        }
        return 0;
    }
 
    public List<String>  brpop(int timeout, String key){
        Jedis jedis = null;
        try{
            jedis = pool.getResource();
            return jedis.brpop(timeout,key);
        }catch (Exception e){
            logger.error("Redis佇列彈出資料異常");
        }finally {
            if (jedis != null){
                jedis.close();
            }
        }
        return null;
    }

因為Redis是key--value的模式,每一個事件都應該有與其對應的key,為了統一管理並且不產生混淆,定義統一的key的生成 

 

/**
 * @author WilsonSong
 * @date 2018/6/2
 * 為了防止生成的key有衝突
 */
public class RedisKeyUtil {
    private static String SPLIT = ":";
    private static String BIZ_LIKE = "LIKE";
    private static String BIZ_DISLIKE = "DISLIKE";
    private static String BIZ_EVENTQUEUE = "EVENTQUEUE";
 
    //獲取點讚的key
    public static String getLikeKey(int entityType, int entityId){
        return BIZ_LIKE + SPLIT + String.valueOf(entityType) + SPLIT +String.valueOf(entityId);
    }
 
    //獲取點踩的key
    public static String getDislikeKey(int entityType, int entityId){
        return BIZ_DISLIKE +SPLIT + String.valueOf(entityType) + SPLIT + String.valueOf(entityId);
    }
 
    public static  String  getEventQueueKey(){
        return BIZ_EVENTQUEUE;
    }
 
 
 
}

5.EventHandler介面

在消費者與事件之間寫一個handler的介面,實現Consumer和handler之間的互動,因為消費者就是找到哪些EventHandler對當前的事件感興趣

/**
 * @author WilsonSong
 * @date 2018/6/3
 * 用來處理事件的,誰關心這個事件,誰來做這個事件
 */
public interface EventHandler {
 
    void doHander(EventModel model); //誰來處理事件
 
    List<EventType> getSupportEventTypes();  //有哪些關心這些事件的
 
}

 6. EventConsumer的實現---消費者
    建立一個型別為Map<EventType, List<EventHandler>>的map,用於存放所有的Handler,然後將所有的事件註冊到config中,即通過applicationContext獲取實現了EventHandler介面的全部Handler。
    啟動執行緒去不斷的去佇列中查詢事件並用brpop把事件拉出來,通過序列化和反序列化將取出的JSON轉化為EventModel,尋找是否有能處理EventModel的Handler,呼叫每一個對該事件感興趣的EventType的doHandle方法去處理事件
 

**
 * @author WilsonSong
 * @date 2018/6/3
 * 處理佇列中的事件並與各個handler溝通
 * InitializingBean介面的作用在spring 初始化後,執行完所有屬性設定方法(即setXxx)將
 * 自動呼叫 afterPropertiesSet(), 在配置檔案中無須特別的配置
 */
@Service
public class EventConsumer  implements InitializingBean,ApplicationContextAware{
    private static final Logger logger = LoggerFactory.getLogger(EventConsumer.class);
    private Map<EventType, List<EventHandler>> config = new HashMap<>();
    private ApplicationContext applicationContext;             //sping的上下文
 
    @Autowired
    JedisAdapter jedisAdapter;
 
    //這個方法將在所有的屬性被初始化後呼叫
    @Override
    public void afterPropertiesSet() throws Exception {
        //獲取現在有多少個eventHandler初始化了
        Map<String, EventHandler> beans = applicationContext.getBeansOfType(EventHandler.class);
        if (beans != null){
            for (Map.Entry<String,EventHandler> entry : beans.entrySet()){
                List<EventType> eventTypes = entry.getValue().getSupportEventTypes();   //找到那些handler對當前的事件感興趣
 
                for (EventType type : eventTypes){
                    if (!config.containsKey(type)){         //有可能是第一次註冊這個事件,所以就可能初始的時候是null
                        //把handler放到config中
                        config.put(type, new ArrayList<EventHandler>());     //把event註冊到config中
                    }
                    config.get(type).add(entry.getValue());       //把對這些event感興趣的handler新增到config中
                }
            }
        }
 
        //開啟執行緒去找佇列中的事件
        Thread thread = new Thread(new Runnable() {
            @Override
            public void run() {
                while (true){           //一直取
                    String key  = RedisKeyUtil.getEventQueueKey();
                    List<String> events = jedisAdapter.brpop(0,key);  //若佇列中沒有這個事件的話就一直等待
                    for (String message : events){
                        if (message.equals(key)){      //返回的第一個值可能是key,把他先過濾掉,取後面的event
                            continue;
                        }
 
                        //通過JSon的方式反序列化
                        EventModel eventModel = JSON.parseObject(message,EventModel.class);
                        if (!config.containsKey(eventModel.getType())){      //是不是有對這個事件有處理的handler
                            logger.error("不能識別的事件");
                            continue;
                        }
 
                        for (EventHandler handler : config.get(eventModel.getType())){
                            handler.doHander(eventModel);
                        }
                    }
                }
            }
        });
        thread.start();
 
    }
 
    //將config中所有的配置的介面
    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        this.applicationContext = applicationContext;
    }
}

7.處理具體事件的具體的XXXhandler

例如這裡寫的點讚的handler

**
 * @author WilsonSong
 * @date 2018/6/4
 * 處理點贊事件的handler
 */
@Component        //就是把普通的物件在spring容器中初始化
public class LikeHandler implements EventHandler {
 
    @Autowired
    MessageService messageService;
 
    @Autowired
    userService uService;
 
    @Override
    public void doHander(EventModel model) {
        Message message = new Message();
        message.setFrom_id(WendaUtil.SYSTEMCONTROLLER_userId);  //以系統管理員的額身份給你發訊息說誰給你點了贊
        message.setTo_id(model.getEntityOwnerId());      //發給誰,就是那個entity擁有者的id
        message.setCreated_date(new Date());
        User user = uService.getUser(model.getActorId()); //觸發這個事件的使用者id
        message.setContent("使用者" + user.getName() + "讚了你的評論,http://127.0.0.1:8080/question" + model.getExts("questionId"));
        message.setConversationId(message.getConversationId());
 
        messageService.addMessage(message);
 
    }
 
    @Override
    public List<EventType> getSupportEventTypes() {
        return Arrays.asList(EventType.LIKE);        //只需要返回點讚的事件即可
    }
}

 

原文連結:https://blog.csdn.net/WilsonSong1024/article/details/80573611