1. 程式人生 > >利用Redis實現非同步訊息佇列優化系統性能 (Redis高階應用)

利用Redis實現非同步訊息佇列優化系統性能 (Redis高階應用)

寫在前面

今天把之前在專案中使用 Redis 做非同步訊息佇列的使用經驗總結一下。首先明確使用目的,因為專案中,我們進行某個操作後可能後續會有一系列的其他耗時操作,但是我們不希望將主執行緒阻塞在此過程中,這時便可將其他操作非同步化。舉個栗子,當你給這篇部落格點贊或評論的時候,部落格系統會儲存你的點贊評論資訊,同時將此操作封裝為事件發給非同步訊息佇列,處理過程中會給我發個私信告訴我誰評論了我,或誰給我點了贊,這個發私信通知我的環節即為非同步操作過程。

具體實現思路

主要分為如下幾個部分:

  1. EventProducer:事件生產者,面向client,使用者進行操作後如果會有後續非同步操作,則會將使用者此次操作的事件加入訊息佇列,fire/push 意思是點燃這個事件,至於它啥時候爆炸(被另一執行緒 handler 處理)主執行緒就不管了,主執行緒只關心 fire/push 操作。
  2. EventModel:事件模型,下圖中的每個顏色小塊代表一種事件模型,裡面可以定義事件所屬的種類、與該事件相關的物件id、具體內容等資訊。即事件是由誰產生的、將要影響到誰、影響的內容是什麼等...
  3. 訊息佇列:具體存放訊息事件的資料結構,可以是Redis list,也可以是Java BlockingQueue等。這裡選擇Redis list實現。
  4. EentConsumer:另開執行緒對訊息佇列中的事件進行處理,我們需要在系統初始化後將所有的Handler註冊進來(建立事件型別與Handler的對映),之後根據相應的事件找到對應的Handler進行處理。
  5. EventHandler:具體處理事件的介面,我們可以實現不同的事件處理業務的具體Handler。

具體過程圖示如下:

程式碼實現

1、定義模型

// 文章
public class Article {
    private int id;
    private String articleName;
    private int ownerID;
    private int likeCount;
    private String content;
    // getter、setter
    ...
}

// 評論
public class Comment {
    private int id;
    private int commentUserID;
    private int articleID;
    private String content;
    // getter、setter
    ...
}

// 私信類
public class MessageLetter {
    private int id;
    private int fromUserID;
    private int toUserID;
    private String content;
    // getter、setter
    ...
}

// 使用者
public class User {
    private int id;
    private String userName;
    private String passWord;
    // getter、setter
    ...
}

 2、定義事件型別、事件模型

首先定義事件型別,評論事件、上傳檔案事件、傳送郵件事件等,這裡以評論事件為例。

public enum  EventType {
    // 這裡以評論事件型別作為演示
    COMMENT,
    LIKE,
    UPLOAD_FILE,
    MAIL
}
package com.bowen.BWQASystem.async;

import java.util.HashMap;
import java.util.Map;

// 事件模型
public class EventModel {
    /*
     * 事件型別,不同的事件型別會觸發呼叫不同的Handler,
     * 一種事件型別可以註冊多個Handler,對應於訊息佇列中
     * 每個事件可能觸發多個Handler非同步去做其他事情。
     **/
    private EventType eventType;
    // 觸發事件使用者ID
    private int triggerID;
    // 事件影響的使用者ID
    private int receiverID;

    // 額外資訊
    private Map<String,String> extraInfo = new HashMap<String,String>();

    // 構造方法
    public EventModel(EventType eventType) {
        this.eventType = eventType;
    }

    public EventType getEventType() {
        return eventType;
    }

    // 所有set方法,均返回當前呼叫此方法的物件,用於鏈式設定屬性
    public EventModel setEventType(EventType eventType) {
        this.eventType = eventType;
        return this;
    }

    public int getTriggerID() {
        return triggerID;
    }

    // 其他setter、getter略
    ...
}

3、定義處理各類事件介面(EventHandler)

public interface EventHandler {
    // 各個具體Handler處理事件過程
    void doHandle(EventModel eventModel);

    /* 一個Handler可以處理多種事件,如評論CommentHandler
     * 可以處理基本的評論事件,
     * 此方法用於註冊handler到對應的事件型別上的時候用
     **/
    List<EventType> getSupportedEvents();
}

4、根據業務需求實現處理事件的Handler

這裡我們需要實現一個Handler,它能夠對使用者在某篇文章評論區下評論或點讚的事件進行處理,此Handler會以私信的形式通知對方。告訴他,他的文章被評論(或點讚了)。

注意:我們每個Handler都需要用@Component註解標註,因為後面在註冊環節會利用 Spring context 在 IOC 容器裡來尋找繼承自 EventHandler 的每一個真實 Handler。

/*
* 對評論、點贊事件執行發私信操作
* */
@Component
public class CommentAndLikeHandler implements EventHandler {

    @Autowired
    MessageLetterService messageLetterService;
    @Autowired
    UserService userService;

    @Override
    public void doHandle(EventModel eventModel) {

        // 1、自動生成一私信
        MessageLetter messageLetter = new MessageLetter();
        // 觸發此事件的使用者id(即評論者id)
        messageLetter.setFromUserID(eventModel.getTriggerID());
        messageLetter.setToUserID(eventModel.getReceiverID());

        //messageLetter.setContent(eventModel.getExtraInfo().get("msgContent"));
        // 評論COMMENT事件
        // 自動發的私信中含有的文章題目、具體評論類容等資訊存在extraInfo這個HashMap裡
        if(eventModel.getEventType() == EventType.COMMENT){
            messageLetter.setContent("使用者" + eventModel.getTriggerID() +
                    "在你文章" + eventModel.getExtraInfo().get("articleName") +
                    "評論中說到" + eventModel.getExtraInfo().get("commentContent"));
            // 發私信給被評論的人
            messageLetterService.addMessageLetter(messageLetter);

//            // 2、目標使用者積分加 1
//            User user = userService.getUserByUserID(String.valueOf(eventModel.getReceiverID()));
//            user.setScore(user.getScore() + 1 );
//            userService.save(user);
        }

        // 點贊LIKE事件
        if(eventModel.getEventType() == EventType.LIKE){
            messageLetter.setContent("使用者" + eventModel.getTriggerID() +
                    "給你文章《" +
                    eventModel.getExtraInfo().get("articleName")
                    + "》點讚了,去瞅瞅");
            messageLetterService.addMessageLetter(messageLetter);
        }
    }

    @Override
    public List<EventType> getSupportedEvents() {
        // 此handler僅處理COMMENT評論事件、LIKE點贊事件
        return Arrays.asList(EventType.COMMENT, EventType.LIKE);
    }
}

5、實現訊息佇列生產者

因為生產者需要將事件 push 到訊息佇列中,在此之前我們需要初始化 Redis 連線池,而在消費端也需要對 Redis 進行操作,故我們將初始化 Redis 寫成一個 Service ,在需要它的地方注入即可。

JedisUtilService.java
@Service
public class JedisUtilService implements InitializingBean {

    // 非同步訊息佇列在Redis中的list的key名
   public static final String EVENT_QUEUE_KEY = "EventQueue";
   private JedisPool jedisPool;

    @Override
    public void afterPropertiesSet() throws Exception {
        jedisPool = new JedisPool("redis://localhost:6379/0");
    }

    /*
     * list的push操作(事件入佇列)
     * @Param String key list的名字,即key
     * @Param String value 將要放入的值value
     */
    public long lpush(String key, String value){
        Jedis jedis = null;
        try {
            jedis = jedisPool.getResource();
            return jedis.lpush(key, value);
        } catch (Exception e){
            return -1;
        } finally {
            if(jedis != null) jedis.close();
        }
    }

    /*
     * list的brpop操作
     * @Param int timeout 超時時間
     * @Param String key list對應的key
     * @reutrn List<String> 返回list的名字key和對應的元素
     */
    public List<String> brpop(int timeout, String key){
        Jedis jedis = null;
        try {
            jedis = jedisPool.getResource();
            return jedis.brpop(timeout, key);
        } catch (Exception e){
            return null;
        } finally {
            if(jedis != null) jedis.close();
        }
    }

}

生產者會在 Controller 層某個業務功能被呼叫時,通過呼叫 Redis api,將相應的事件加入非同步訊息佇列,具體實現如下:

public class EventProducer{
    @Autowired
    JedisUtilService jedisUtilService;

    public boolean pushEvent(EventModel eventModel){
        // 序列化
        String eventJson = JSONObject.toJSONString(eventModel);
        // 加入key為"EventQueue"的list裡
        jedisUtilService.lpush(JedisUtilService.EVENT_QUEUE_KEY, eventJson);
        return true;
    }
}

6、實現非同步訊息佇列消費者

消費者端會開啟一額外執行緒去從非同步訊息佇列中獲取事件,交給對應的 Handler 處理。在此之前我們需要在 所有 Bean 載入完畢後,找到每個 Handler ,併為每個 Handler 註冊到相應的事件上,以便後續處理。

(一個事件型別可以對應多個handler處理,一個handler可以處理多個型別事件)

public class EventConsumer implements InitializingBean,ApplicationContextAware {

    // 用來尋找Handler
    private ApplicationContext applicationContext;
    // 註冊事件與Handler對映
    // (一個事件型別可以對應多個handler處理,一個handler可以處理多個型別事件)
    private Map<EventType, List<EventHandler>> eventConfig = new HashMap<>();

    @Autowired
    JedisUtilService jedisUtilService;

    // 註冊Handler到相應的事件
    @Override
    public void afterPropertiesSet() throws Exception {
        Map<String, EventHandler> handlerBeans = applicationContext.getBeansOfType(EventHandler.class);
        if(handlerBeans != null){
            // 依次遍歷每個找到的Handler(被@Component註解標註的)
            for (Map.Entry<String, EventHandler> entry : handlerBeans.entrySet()) {
                // 獲取每個Handler支援的事件Event列表
                List<EventType> supportedEventTypeList = entry.getValue().getSupportedEvents();
                // 將每一個Handler支援的事件Event註冊到config map中
                for(EventType eventType : supportedEventTypeList){
                    if(!eventConfig.containsKey(eventType)){
                        eventConfig.put(eventType, new ArrayList<EventHandler>());
                    }
                    eventConfig.get(eventType).add(entry.getValue());
                }
            }
        }

        // 消費執行緒池
        ExecutorService executorService = Executors.newCachedThreadPool();
        // 消費非同步訊息佇列
        executorService.submit((Runnable) () ->{
            while (true){
                // 彈出一元素
                List<String> eleList = jedisUtilService.brpop(0, JedisUtilService.EVENT_QUEUE_KEY);
                EventModel eventModel = JSON.parseObject(eleList.get(1), EventModel.class);
                if(eventConfig.containsKey(eventModel.getEventType())) {
                    for (EventHandler eventHandler : eventConfig.get(eventModel.getEventType())){
                        // 執行註冊到該事件的每一個Handler
                        eventHandler.doHandle(eventModel);
                    }
                }
            }
        });

        // 消費其他事件佇列(根據自己業務)
        // 略...
        // executorService.submit(...)
        
    }

    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        this.applicationContext = applicationContext;
    }
}

具體使用

我們直接在評論相關的 Controller 中編寫評論邏輯程式碼,將評論內容儲存後,我們直接生成一事件型別為 COMMENT 型別的事件,並將它加入非同步佇列中,供後臺執行緒去處理後續操作(發私信)。CommentAndLikeHandler 便會對此評論事件進行處理。關於點贊事件的業務邏輯大致類似,這裡就不贅述了,評論業務處理程式碼如下:

@Controller
public class ArticalController {

    @Autowired
    CommentService commentService;
    @Autowired
    ArticleService articleService;
    @Autowired
    UserHolder userHolder;
    @Autowired
    EventProducer eventProducer;
    
    @RequestMapping(path = {"/commentArticle"}, method = RequestMethod.POST)
    public void commentArticle(@RequestParam("articleID") int id,
                               @RequestParam("commentContent") String commentContent){
        // 當前使用者id
        int currentUserID = userHolder.getUsers().getId();
        Comment comment = new Comment();
        comment.setArticleID(id);
        comment.setCommentUserID(currentUserID);
        comment.setContent(commentContent);
        commentService.addComment(comment);

        // 當前文章
        Article article = articleService.findArticleByID(id);
        // 後續生成的私信內容需要這些額外資訊
        HashMap<String, String> articleInfo = new HashMap();
        articleInfo.put("articleName", article.getArticleName());
        articleInfo.put("commentContent", article.getContent());

        // 後續發私信操作加入非同步佇列
        eventProducer.pushEvent(new EventModel(EventType.COMMENT)
                .setTriggerID(currentUserID).setReceiverID(article.getOwnerID())
                .setExtraInfo(articleInfo));
    }

}

總結

1、這裡利用了 Redis 的 list 資料結構,它支援 lpush 新增元素,blpop(從隊尾),brpop(從隊頭,先加入的元素排在隊頭) 等阻塞式獲取元素操作。

2、通過將事件模型物件序列化為 JSON 串的方式,將其儲存至 Redis 資料庫。

3、通過生產者消費者模型實現非同步訊息佇列。

4、當然 Redis 還有其他各個方面的應用,如排行榜、驗證碼(定時失效)、PageView、非同步訊息佇列、實時熱點等等,通過選擇不同的資料結構即能實現。