1. 程式人生 > >使用MongoDB實現訊息佇列的非同步訊息功能

使用MongoDB實現訊息佇列的非同步訊息功能

一、訊息佇列概述

訊息佇列中介軟體是分散式系統中重要的元件,主要解決應用耦合,非同步訊息,流量削鋒等問題。實現高效能,高可用,可伸縮和最終一致性架構。是大型分散式系統不可缺少的中介軟體。

目前在生產環境,使用較多的訊息佇列有ActiveMQ,RabbitMQ,ZeroMQ,Kafka,MetaMQ,RocketMQ等。

自己實現一個較完善的訊息佇列要考慮高可用、順序和重複訊息、可靠投遞、消費關係解析等等比較複雜的問題,筆者不對這些內容進行闡述,重點結合線程池實現解耦,非同步訊息功能,對其他功能有興趣的話推薦美團技術部落格的一篇文章訊息佇列設計的精髓基本都藏在本文裡了

二.非同步處理的流程

場景說明:使用者註冊後,需要發註冊郵件和註冊簡訊。傳統的做法有兩種
1.序列的方式;2.並行方式。
(1)序列方式:將註冊資訊寫入資料庫成功後,傳送註冊郵件,再發送註冊簡訊。以上三個任務全部完成後,返回給客戶端。

這裡寫圖片描述

(2)並行方式:將註冊資訊寫入資料庫成功後,傳送註冊郵件的同時,傳送註冊簡訊。以上三個任務完成後,返回給客戶端。與序列的差別是,並行的方式可以提高處理的時間。
這裡寫圖片描述

假設三個業務節點每個使用50毫秒鐘,不考慮網路等其他開銷,則序列方式的時間是150毫秒,並行的時間可能是100毫秒。

因為CPU在單位時間內處理的請求數是一定的,假設CPU1秒內吞吐量是100次。則序列方式1秒內CPU可處理的請求量是7次(1000/150)。並行方式處理的請求量是10次(1000/100)。

小結:如以上案例描述,傳統的方式系統的效能(併發量,吞吐量,響應時間)會有瓶頸。如何解決這個問題呢?

引入非同步處理。改造後的架構如下:

這裡寫圖片描述

三.使用MongoDB實現上述的架構

3.1 資料庫定義

如上圖所示,“傳送註冊簡訊“和“傳送註冊郵件“都是需要非同步處理的任務。該任務將均由主執行緒寫入資料庫(“任務隊列表”),同時有另一個執行緒讀取任務隊列表的資料並根據具體的“傳送註冊簡訊“和“傳送註冊郵件“來處理。

任務佇列模型定義
TaskQueue.java

@Document
public class TaskQueue {
    @Id
    private String _id;
    private
String name; //任務名稱。例如:“傳送註冊簡訊“或“傳送註冊郵件“等 private String param; //需要的處理任務的引數 private int status; // 狀態,0:初始 1:處理中 8:處理失敗 9:處理成功 private int retry; // 重試次數,僅對於處理失敗,status:8 private Date created; // 建立時間 private int threadNid; //處理成功該任務的任務處理執行緒唯一標示符 private int priority; // 越小,被執行的概率越大 }

任務處理執行緒模型定義
ThreadInstance.java

@Document
public class ThreadInstance implements Comparable<ThreadInstance>{
    @Id
    private String _id;
    @Indexed(unique=true)
    private int nid; // 序號,唯一索引。唯一索引的意義後續解釋
    private long pid; // 執行緒id
    private int taskCounts; // 當前任務處理執行緒正在處理的任務數,通過心跳來更新
    private int execedTasks; //該任務處理執行緒的總數,通過心跳來更新
    private Date update; // 上次活躍時間,通過心跳來更新
}

3.2非同步處理任務的執行緒池

對應上圖,“傳送註冊簡訊“和“傳送註冊郵件“等需要通過單獨的執行緒來完成。因為可以更快的完成非同步任務,可能需要多個執行緒同時來作為非同步處理執行緒。此處,使用執行緒池來完成。
ThreadPoolExecutor執行緒池的工作原理

執行緒池有三個核心關鍵詞:核心執行緒、工作佇列、最大執行緒和飽和策略

  • 核心執行緒池對應corePoolSize變數的值,如果執行的執行緒小於corePoolSize無論當前是否有空閒執行緒,總是會建立新的執行緒執行任務(這個過程需要獲取全域性鎖)
  • 如果執行的執行緒大於corePoolSize,則將任務加入BlockingQueue–對應工作佇列
  • 如果BlockingQueue已滿,且當前執行緒數尚未超過maximumPoolSize—最大執行緒。則執行緒池繼續建立執行緒。
  • 如果BlockingQueue已滿,並且當前執行緒數超過maximumPoolSize,則根據當前執行緒池已飽和。根據指定的飽和策略來處理。丟擲異常,丟棄等等。
        // 建立一個順序儲存的阻塞佇列,並指定大小為500
        BlockingQueue<Runnable> blockingQueue = new ArrayBlockingQueue<Runnable>(500);
        // 建立執行緒池的飽和策略,AbortPolicy拋異常
        RejectedExecutionHandler handler = new ThreadPoolExecutor.AbortPolicy();
        // 建立執行緒池,執行緒池基本大小5 最大執行緒數為10 執行緒最大空閒時間10分鐘
        ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(5, 10, 10, TimeUnit.MINUTES, blockingQueue,
                handler);

        // 提交5個job
        for (int i = 0; i < 5; i++) {
            final int nid = i; //nid是每個非同步任務處理佇列的唯一識別符號
            threadPoolExecutor.execute(new Runnable() {
                @Override
                public void run() {
                    //不停的從資料庫中獲取“傳送註冊簡訊“和“傳送註冊郵件“等任務並完成
                }
            });
        }

3.3任務的抽象介面和實現

無論是“傳送註冊簡訊“和“傳送註冊郵件“,都屬於被非同步處理都任務,需抽象出一個介面,這樣各個非同步處理執行緒才能方便的執行

需要非同步處理任務的介面:

Job.java

public interface Job {
  /**
   *@params _id如前所述,每個待處理待任務都是mongo中的一條記錄,該引數為mongo主鍵
   * param 該任務需要的引數
   *@return 8:處理失敗 9:處理成功
   */
  public abstract int exec(String _id, String param);
}

傳送註冊簡訊任務
MessageJob.java

public class MessageJob implements Job{
    public int exec(String _id, String param) {
        try {
          //TODO 傳送簡訊的程式碼
          return 9;
        }catch(Exception e) {
          return 8;
        }
    }

}

傳送註冊簡訊任務
EmailJob.java

public class EmailJob implements Job{
    public int exec(String _id, String param) {
        try {
          //TODO 傳送郵件的程式碼
          return 9;
        }catch(Exception e) {
          return 8;
        }
    }

}

3.4非同步任務處理執行緒的實現

這是最為核心的內容,程式碼實現如下:

public class JobTread {
    //當前job執行緒從資料庫中加載出的任務
    private List<TaskQueue> prepareExecTask;
    //當前執行緒正在處理的任務數。該變數需要每格一段時間心跳給ThreadInstance,需要volatile修飾
    private volatile int nowExecTasks = 0;
    //上次心跳到當前心跳到時間中處理的程序數,用做記錄當前執行緒已處理的task總數。該變數需要每格一段時間心跳給ThreadInstance,需要volatile修飾
    private volatile int execedTasks = 0;
    @Autowired
    private ThreadInstanceService processService;
    private Timer timer = new Timer();
    @Autowired
    private TaskQueueService taskQueueService;
    private String[] jobs = { "com.mq.job.jobImpl.EmailJob", "com.mq.job.jobImpl.MessageJob" };
    //當前非同步任務處理的唯一識別符號
    private int nid;
    //儲存所有任務的例項(根據jobs類路徑資料反射)
    private Map<String, Job> jobMap;

    public JobTread() {

    }

    public JobTread(int nid) {
        this.nid = nid;
        this.jobMap = getJobMap();
        openHeartbeat(Thread.currentThread().getId());
    }
    /**
     *根據所有任務的類路徑反射出執行例項並存儲在Map中,key為類名,也是TaskQueue模型中的name
     */
    private Map<String, Job> getJobMap() {
        jobMap = new HashMap<String, Job>();
        // 根據jobs中的類名,反射出Class,根據類名為key,Class為value存入map
        for (int i = 0; i < jobs.length; i++) {
            String classPath = jobs[i];
            try {
                Class<Job> clazz = (Class<Job>) Class.forName(classPath);
                String className[] = clazz.getName().split("\\.");
                jobMap.put(className[className.length - 1], clazz.newInstance());
            } catch (Exception e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
        }
        return jobMap;
    }

    /**
     * 每個Thread載入處理的task
     * @return
     */
    public int loadTask() {

        //從資料庫中peak出一定數量的task
        prepareExecTask = taskQueueService.peaks(nid);
        nowExecTasks= prepareExecTask.size();
        prepareExecTask.stream().forEach(it -> this.exec(it));

        return prepareExecTask.size();
    }
    /**
     *使用Timer開啟當前執行緒對TaskQueue模型的心跳。每一定的時間會將當前執行緒正在處理的任務數和已經處理的任務數記錄到資料庫中
     */
    private void openHeartbeat(long threadId) {
        // 開啟一個定時
        timer.schedule(new TimerTask() {

            @Override
            public void run() {
                // TaskProcess的心跳,隨時更新當前獲取時間和正常處理的task數量
                processService.touch(nid, threadId, nowExecTasks, execedTasks);
                execedTasks = 0;
            }
        }, 0, 1000*30);
    }

    /**
     * 執行一個task
     */
    public void exec(TaskQueue take) {
        if (take==null) {
            return;
        }
        // 獲取Job例項(com.mq.job.jobImpl下的例項)
        Job jobInstance = jobMap.get(take.getName());
        // Job來執行task
        int result = jobInstance.exec(take.get_id(), take.getParam());

        switch (result) {
        case 8:
            // 處理失敗, 置Task.status為8
            taskQueueService.execFail(take.get_id());
            break;
        case 9:
            // 處理成功,置Task.status為9
            taskQueueService.execSuccess(take.get_id());
            break;
        }

        nowExecTasks--;
        execedTasks++;

    }
}

該類主要提供了四個方法:

  • 根據類路徑反射所有的具體的任務類(“傳送註冊簡訊“和“傳送註冊郵件“)的實現,儲存與Map中。key為類名,也是TaskQueue(儲存非同步任務的模型中的name)
  • 從TaskQueue中取出一定數量的任務來處理,並實時維護當前正在處理的任務數和已處理完成的任務數
  • 將當前正在處理的任務數和已處理完成的任務數資訊實時更新到ThreadInstance例項中
  • 具體的任務執行,根據TaskQueue.name從Map中取出任務實現,並執行exec方法,具體返回值標示給任務是否處理成功

四.ThreadInstance模型的DAO方法

ThreadInstance即為非同步任務處理執行緒的例項,即提交給執行緒池的任務例項。該模型的唯一標示符為建立時的序號id,並非執行緒id,因為考慮到重啟時執行緒id可能會改變。故將向執行緒池中提交的序號id(nid)作為唯一索引。

DAO方法僅提供一個touch心跳方法,即在Timer中定時執行,來傳遞當前正在處理的任務數和距離上次心跳後處理的任務總數

public class ThreadInstanceService {
    @Autowired
    private ThreadDao threadDao;
    /**
     *@params nid 當前執行緒---非同步任務處理佇列的唯一標示符
     * threadId執行緒ID
     * taskSize 當前正在處理的任務數
     * execedTasks 距離上次心跳後處理的任務總數
     */
    public void touch(int nid,long threadId, int taskSize, int execedTasks) {
        ThreadInstance thread = threadDao.findAndUpdatByNid(nid,threadId, taskSize, execedTasks);
        //如果當前執行緒還未創建於ThreadInstance,則建立
        if (thread == null) {
             threadDao.create(nid, threadId);
        }
    }

}

public class ThreadDaoImpl implements ThreadDao{
    @Autowired
    private MongoTemplate template;
    /**
     db.ThreadInstance.update({
       nid: nid
     },{
       $set: {
         pid: threadId,
         taskCounts: taskSize,
         update: new Date()
       },
       $inc: {
         execedTasks: execedTasks
       }
    })
     */
    public ThreadInstance findAndUpdatByNid(long nid, long threadId, int taskSize, int execedTasks) {

        return template.findAndModify(Query.query(Criteria.where("nid").is(nid)), new Update().set("pid", threadId).set("update", new Date()).set("taskCounts", taskSize).inc("execedTasks", execedTasks), ThreadInstance.class);
    }
    /**
     db.ThreadInstance.create({
       nid: nid,
       pid: threadId,
       taskCounts: 0,
       execedTasks: 0,
     })

    **/
    @Override
    public void create(int nid, long threadId) {
        template.insert(new ThreadInstance(nid,threadId,0,0));
    }

}

五.ThreadQueue模型的DAO方法

ThreadQueue模型的每一個記錄即為待非同步處理的任務,如需要“傳送註冊簡訊“和“傳送註冊郵件“等。

如前所示,ThreadQueue主要提供如下三個方法:

  • 從執行緒池中獲取一定數量的task
  • 標註當前任務已處理成功
  • 標註當前任務處理失敗

由於從執行緒池中獲取一定數量的任務是多個執行緒同時獲取的,因次可能存在多個執行緒獲取到同一個任務的可能。相信讀者也知道Mongo一個操作是原子性的,因為可以使用findOneAndUpdate,{status:0,{$set:{status:1}}}。這樣固然不會存在多個執行緒獲取到同一個記錄到情況,筆者第一次也是這麼實現的。但當將其放入生產環境後,發現這樣每次僅僅從資料庫中讀一個記錄來處理效率非常非常低。大量的任務被阻塞到資料庫中。
因此,需要一種一次性可以獲取多條記錄,又避免重複獲取的方法。TastQueue中的threadNid欄位就是為此而生

public class TaskQueueService {

    private TaskQueueDao taskQueueDao = new TaskQueueDaoImpl();

    /**
     * 獲取該次peak中的優先順序
     * 返回為1的概率最大,2,3,4,5依次
     * @return
     */
    public int roll() {
        Double random = Math.random();
        for (int i = 1; i <= 5; i++) {
            if (random > 1 / Math.pow(2, i)) {
                return i;
            }
        }
        return 1;
    }

    /**
     * 從執行緒池中獲取一定數量的task
     * 
     * @param threadNid
     *            需要獲取task的threadNid.非執行緒id而是作為唯一索引的nid
     * @return List<TaskQueue>
     */
    public List<TaskQueue> peaks(int threadNid) {

        // 獲取task狀態為0,1(初始化,處理中)。避免將載入到記憶體中task因重啟而丟失
        List<Integer> statuses = new ArrayList<Integer>();
        statuses.add(0);
        statuses.add(1);

        // 獲取task的tag為null和當前執行緒的
        List<Integer> tags = new ArrayList<Integer>();
        tags.add(null);
        tags.add(threadNid);

        //獲取一個優先順序
        int priority = this.roll();

        //獲取出未處理的和當前執行緒正在處理的TaskQueue
        List<TaskQueue> taskQueues = taskQueueDao.peaks(statuses, tags, priority);
        //取出所有的_id。 多個執行緒可能會獲取出相同的任務,均是未處理的
        List<String> ids = taskQueues.stream().map(it -> it.get_id()).collect(Collectors.toList());

        // 將上述獲取到所有未處理的或當前執行緒正在處理的所有執行緒全部更新為當前執行緒正常處理
        // 此處的更新的條件除了$in:ids外,還必須threadNid為null。
        // 對於一個任務來說,無論有多少個執行緒獲取到它。但只能有一個執行緒更新成功“處理中”,即status更新為1,threadNid更新為當前執行緒nid
        int updateSize = taskQueueDao.execing(ids, threadNid);

        // 如果更新出的數量為0,說明所有的task全部是處理中的,返回空
        if (updateSize == 0) {
            return new ArrayList<TaskQueue>();
        }

        // 此時,再次獲取出所有當前執行緒處理中的task,即多個執行緒不會返回相同的任務給上級
        List<Integer> execStatuses = new ArrayList<Integer>();
        execStatuses.add(1);
        List<Integer> execTags = new ArrayList<Integer>();
        execTags.add(threadNid);

        return taskQueueDao.peaks(execStatuses, execTags, priority);
    }

    //更新status為9
    public void execSuccess(String _id) {
        taskQueueDao.success(_id);
    }

    //更新status為8
    public void execFail(String _id) {
        taskQueueDao.fail(_id);
    }
}
public class TaskQueueDaoImpl implements TaskQueueDao{
    @Autowired
    private MongoTemplate template;
    /**
    db.TaskQueue.find({
      status: {$in: statuses},
      threadNid: {$in: threadNids},
      priority: priority,
    }).limit(20)
    **/
    @Override
    public List<TaskQueue> peaks(List<Integer> statuses, List<Integer> threadNids, int priority) {
        return template.find(Query.query(Criteria.where("status").in(statuses).and("threadNid").in(threadNids).and("priority").is(priority)).limit(20), TaskQueue.class);
    }

    /**
    db.TaskQueue.update({_id: _id},{$set:{status: 9}})
    **/
    @Override
    public void success(String _id) {

        template.updateFirst(Query.query(Criteria.where("_id").is(_id)), Update.update("status", 9), TaskQueue.class);
    }

    /**
    db.TaskQueue.update({_id: _id},{$set:{status: 8}})
    **/
    @Override
    public void fail(String _id) {

        template.updateFirst(Query.query(Criteria.where("_id").is(_id)), Update.update("status", 3), TaskQueue.class);
    }

    @Override
    public int execing(List<String> _ids, int threadNid) {

        WriteResult writeResult = template.updateMulti(Query.query(Criteria.where("_id").in(_ids).and("threadNid").is(null)), Update.update("status", 1).set("threadTag", threadNid), TaskQueue.class);
        return writeResult.getN();
    }
}

六.測試

執行如下程式碼在TaskQueue中建立300個任務,然後啟動執行緒池,提交5個執行緒來處理這300個任務。

    public static void testData() {

          new Thread(new Runnable() {

            @Override
            public void run() {
                for(int i=0;i<150;i++){
                    getMongoTemplate().save(new TaskQueue("EmailJob",UUID.randomUUID().toString()+"@sina.com"));
                    getMongoTemplate().save(new TaskQueue("MessageJob","隨機手機號"));

                }
            }
        }).start();
    }
// 提交5個job
        for (int i = 0; i < 5; i++) {
            final int nid = i;
            threadPoolExecutor.execute(new Runnable() {
                @Override
                public void run() {
                    exec(nid);
                }
            });
        }

public static void exec(int nid) {
        JobTread thread = new JobTread(nid);
        while (true) {
            int tasks = thread.loadTask();
            if (tasks == 0) {
                // 休息一會
                try {
                    Thread.sleep(3000);
                } catch (InterruptedException e) {
                    // TODO Auto-generated catch block
                    e.printStackTrace();
                }
            }
        }
    }

此時會在ThreadInstance模型中產生5條記錄,並且實時會更新出每個執行緒正常處理的任務數,和處理的任務總數。當執行完成後,執行如下語句,輸出300:

 db.getCollection('threadInstance').aggregate( [
     {
       $group:
         {
           _id: 'execedTasks',
          count:{$sum:'$execedTasks'}
         }
     }
   ])