1. 程式人生 > >輕量級分散式延時任務處理元件easyTask-L-入門篇

輕量級分散式延時任務處理元件easyTask-L-入門篇

  今天給大家介紹一款新武器。我自研的一個java元件easyTask-L。這個是做啥的呢?我之前研發了一款單機版本的easyTask,這次是要介紹另外一款easyTask-L。區別就是後者支援分散式環境,任務資料支援多個備份,具備了真正意義上的高可用。同時它又是輕量級的分散式應用,原因是因為它還不是一個獨立的中介軟體,它需要一個宿主程式才能使用。做成獨立的中介軟體是我後面要繼續做的一個版本。

  元件開源地址:https://github.com/liuche51/easyTask-L

  廢話不多說,先來介紹下easyTask-L元件的特性。

                 

  高可用:因為我們是分散式leader-follow叢集,每個任務多有多個備份資料,所以可靠性非常高

  秒級觸發:我們是採用時鐘秒級分片的資料結構,支援秒級觸發任務。不早也不遲

  分散式:元件支援分散式

  高併發:支援多執行緒同時提交任務,支援多執行緒同時執行任務

  資料一致性:使用TCC事務機制,保障資料在叢集中的強一致性

  海量任務:節點可以儲存非常多的任務,只要記憶體和磁碟足夠。觸發效率也是極高。需要配置好分派任務執行緒池和執行任務執行緒池大小即可

  開源:元件完全在GitHub上開源。任何人都可以隨意使用,在不侵犯著作權情況下

  易使用:無需獨立部署叢集,嵌入式開發。不過多的依賴於第三方中介軟體,除了zookeeper。

  easyTask-L元件的整體架構如下:

  整體採用分散式設計,leader-follow風格。叢集中每一個節點都是leader,同時也可能是其他某個節點的follow。每個leader都有若干個follow。leader上提交的新任務都會強制同步到follow中,刪除任務同時也會強制刪除follow中的備份任務。叢集中所有節點都會在zookeeper中註冊並維持心跳。

  easyTask-L元件的核心“環形佇列”的設計架構如下:

  環形佇列在之前單機版的easyTask中也講過,原理都是類似的。客戶端提交任務,服務端先將任務進行持久化,再新增上環形佇列這個資料結構中去,等待時間片輪詢的到來。不同的是這裡的持久化機制,改成了分散式儲存了。不僅leader自己儲存起來,還要同步儲存到其follow中去。刪除一個任務也是類似的過程。

  任務新增時會計算其觸發所屬的時間分片槽,等環形佇列的始終秒針到達時會判斷任務是否可以被執行了。如果可以執行了,則分派任務執行緒池將其丟入執行任務執行緒池等待執行。只要執行任務執行緒池執行緒數足夠,任務將立即得到執行。

   大概的原理清晰了,接下來就是寫個HelloWorld程式了!

  easyTask-L不是一箇中間件,所以需要一個宿主程式。建議在微服務框架如:dubbo、spring-cloud中使用此元件,並建立一個獨立的專門用於處理延時任務的服務模組。這樣可以使服務儘可能少的頻繁更新重啟。保持叢集的穩定性。下面我將以一個springboot應用為例來給大家演示如何使用easyTask-L元件

  第一步:引入jar包

 

   如果你是Maven專案,可以使用如下方式配置引入jar包。這可以讓專案自動引入easyTask-L中依賴的其他第三方jar包。最新版本請在maven中央倉庫中查詢。請在pom.xml中加入以下引用

 <dependency>
       <groupId>com.github.liuche51</groupId>
       <artifactId>easyTask-L</artifactId>
       <version>1.0.1</version>
 </dependency>

  第二步:配置啟動環形佇列

  這裡以springboot應用為例,在application.yml中做如下配置

server:
   port: 8081
spring:
   application:
      name: easyTask-L
easyTaskL:
   zkAddress: 127.0.0.1:2181
   taskStorePath: C:/db/node1
   serverPort: 2021
   sQLlitePoolSize: 5
   backupCount: 2
   dispatchPool:
      corePoolSize: 5
      maximumPoolSize: 50
   workPool:
      corePoolSize: 5
      maximumPoolSize: 50

  新建一個啟動配置類EasyTaskLConf.java

 1 package com.github.liuche51.easyTaskL.config;
 2 
 3 import com.github.liuche51.easyTask.core.AnnularQueue;
 4 import com.github.liuche51.easyTask.core.EasyTaskConfig;
 5 import org.slf4j.Logger;
 6 import org.slf4j.LoggerFactory;
 7 import org.springframework.beans.factory.annotation.Value;
 8 import org.springframework.context.annotation.Bean;
 9 import org.springframework.context.annotation.Configuration;
10 
11 import java.util.concurrent.LinkedBlockingQueue;
12 import java.util.concurrent.ThreadPoolExecutor;
13 
14 @Configuration
15 public class EasyTaskLConf {
16     private static Logger log = LoggerFactory.getLogger(EasyTaskLConf.class);
17     @Value("${easyTaskL.zkAddress}")
18     private String zkAddress;
19     @Value("${easyTaskL.taskStorePath}")
20     private String taskStorePath;
21     @Value("${easyTaskL.serverPort}")
22     private int serverPort;
23     @Value("${easyTaskL.sQLlitePoolSize}")
24     private int sQLlitePoolSize;
25     @Value("${easyTaskL.backupCount}")
26     private int backupCount;
27     @Value("${easyTaskL.dispatchPool.corePoolSize}")
28     private int dispatchCorePoolSize;
29     @Value("${easyTaskL.dispatchPool.maximumPoolSize}")
30     private int dispatchMaximumPoolSize;
31     @Value("${easyTaskL.workPool.corePoolSize}")
32     private int workPoolCorePoolSize;
33     @Value("${easyTaskL.workPool.maximumPoolSize}")
34     private int workPoolMaximumPoolSize;
35     @Bean
36     public AnnularQueue initAnnularQueue(){
37         try {
38             EasyTaskConfig config =new EasyTaskConfig();
39             config.setTaskStorePath(taskStorePath);
40             config.setServerPort(serverPort);
41             config.setSQLlitePoolSize(sQLlitePoolSize);
42             //config.setBackupCount(backupCount);
43             config.setZkAddress(zkAddress);
44             AnnularQueue annularQueue = AnnularQueue.getInstance();
45             config.setDispatchs(new ThreadPoolExecutor(dispatchCorePoolSize, dispatchMaximumPoolSize, 1000, java.util.concurrent.TimeUnit.MILLISECONDS,
46                     new LinkedBlockingQueue<Runnable>()));
47             config.setWorkers(new ThreadPoolExecutor(workPoolCorePoolSize, workPoolMaximumPoolSize, 1000, java.util.concurrent.TimeUnit.MILLISECONDS,
48                     new LinkedBlockingQueue<Runnable>()));
49             annularQueue.start(config);
50             return annularQueue;
51         }catch (Exception e){
52             log.error("",e);
53              return null;
54         }
55     }
56 
57 }
EasyTaskLConf.java

  第三步:建立延時任務處理類

package com.github.liuche51.easyTaskL.task;
import com.github.liuche51.easyTask.dto.Task;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.time.ZonedDateTime;
import java.time.format.DateTimeFormatter;
import java.util.*;
import java.util.concurrent.ConcurrentLinkedQueue;

public class CusTask1 extends Task implements Runnable {
    private static Logger log = LoggerFactory.getLogger(CusTask1.class);
    @Override
    public void run() {
        Map<String, String> param = getParam();
        if (param != null && param.size() > 0) {
            log.info("任務1已執行!姓名:{} 生日:{} 年齡:{} 執行緒ID:{}", param.get("name"), param.get("birthday"), param.get("age"), param.get("threadid"));
        }
    }
}

  第四步:向環形佇列中新增任務

  新建一個Controller,增加以下Action方法。

@RequestMapping("/once")
@ResponseBody
public String once(@RequestParam("name") String name, @RequestParam("time") int time) {
	CusTask1 task1 = new CusTask1();
	task1.setEndTimestamp(ZonedDateTime.now().plusSeconds(time).toInstant().toEpochMilli());
	Map<String, String> param = new HashMap<String, String>() {
		{
			put("name", name);
			put("birthday", "1996-1-1");
			put("age", "28");
			put("threadid", String.valueOf(Thread.currentThread().getId()));
		}
	};
	task1.setParam(param);
	return AnnularQueue.getInstance().submitAllowWait(task1);
}

  完整的demo可以使用Git克隆我的一個開源專案:https://gitee.com/liuche/DubboServer.git  找到子專案easyTask-L-demo