1. 程式人生 > >spring boot 定時任務基於zookeeper的分布式鎖實現

spring boot 定時任務基於zookeeper的分布式鎖實現

刪除 etl .org stack pub 具體實現 spec 分享 -i

基於ZooKeeper分布式鎖的流程

  • 在zookeeper指定節點(locks)下創建臨時順序節點node_n
  • 獲取locks下所有子節點children

  • 對子節點按節點自增序號從小到大排序

  • 判斷本節點是不是序號最小的子節點(即排序後的第一個):若是,則獲取鎖;若不是,則監聽排序後緊鄰在本節點之前的那個節點的刪除事件

  • 若監聽到該節點被刪除,則回到第二步(重新獲取locks下的所有子節點)再次進行判斷,直到獲取到鎖

具體實現

添加Maven依賴:

技術分享圖片
<?xml version="1.0" encoding="UTF-8"?>
<!-- Maven build descriptor for the "mytiming" scheduled-job application. -->
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.falsh</groupId>
    <artifactId>mytiming</artifactId>
    <version>1.0-SNAPSHOT</version>

    <!-- Dependency versions without an explicit <version> are managed by this parent. -->
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.0.4.RELEASE</version>
        <relativePath /> <!-- lookup parent from repository -->
    </parent>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
        <java.version>1.8</java.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-quartz</artifactId>
        </dependency>
        <!-- Needed for the @Aspect that gates job execution on lock ownership. -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-aop</artifactId>
        </dependency>
        <dependency>
            <groupId>com.alibaba.spring.boot</groupId>
            <artifactId>dubbo-spring-boot-starter</artifactId>
            <version>2.0.0</version>
            <exclusions>
                <exclusion>
                    <groupId>org.springframework</groupId>
                    <artifactId>spring-context</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>org.springframework</groupId>
                    <artifactId>spring-core</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <!-- Provides InterProcessMutex, the ZooKeeper-based distributed lock. -->
        <dependency>
            <groupId>org.apache.curator</groupId>
            <artifactId>curator-recipes</artifactId>
            <version>2.3.0</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
            <!-- Attaches a sources jar to the build. -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-source-plugin</artifactId>
                <executions>
                    <execution>
                        <id>attach-sources</id>
                        <goals>
                            <goal>jar</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>
View Code

代碼:

技術分享圖片
package com.falsh.tss.config;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;

/**
 * Copies ZooKeeper-related settings from the Spring environment into static
 * fields so that code which is not managed by Spring can read them.
 *
 * <p>The fields stay {@code null} until Spring has created this bean and
 * invoked the setters below.
 */
@Component
public class CommonEnv {

    /** ZooKeeper connect string, populated from the {@code zk.address} property. */
    public static String ZK_ADDRESS;

    /** ZooKeeper node path used for the lock, populated from the {@code zk.lock.path} property. */
    public static String ZK_LOCK_PATH;

    /**
     * Receives the resolved {@code zk.address} property and publishes it statically.
     *
     * @param zkAddress value of {@code zk.address}
     */
    @Autowired
    public void setZkAddress(@Value("${zk.address}") String zkAddress) {
        CommonEnv.ZK_ADDRESS = zkAddress;
    }

    /**
     * Receives the resolved {@code zk.lock.path} property and publishes it statically.
     *
     * @param zkLockPath value of {@code zk.lock.path}
     */
    @Autowired
    public void setZkLockPath(@Value("${zk.lock.path}") String zkLockPath) {
        CommonEnv.ZK_LOCK_PATH = zkLockPath;
    }
}
View Code 技術分享圖片
package com.falsh.tss.global;

import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.annotation.Around;
import org.aspectj.lang.annotation.Aspect;
import org.aspectj.lang.annotation.Pointcut;
import org.aspectj.lang.reflect.MethodSignature;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;

import com.falsh.tss.mutex.MutexLock;

/**
 * Around-advice that lets a scheduled job run only on the application
 * instance that currently holds the distributed {@link MutexLock}.
 *
 * <p>Instances that do not hold the lock log a message and skip the job.
 */
@Aspect
@Component
public class ControlExecJobAspect {

    private static final Logger logger = LoggerFactory.getLogger(ControlExecJobAspect.class);

    /**
     * Matches every public method of every type in {@code com.falsh.tss.jobs}
     * and its sub-packages. The package must be the one the job classes are
     * actually declared in, otherwise the advice silently never applies.
     */
    @Pointcut("execution(public * com.falsh.tss.jobs..*(..))")
    public void jobExec() {
    }

    /**
     * Runs the intercepted job only when this process owns the lock, and logs
     * the elapsed time of the run.
     *
     * <p>The advice returns nothing, so any value returned by the intercepted
     * method is discarded; it is meant for {@code void} job methods.
     *
     * @param pjp the intercepted job invocation
     * @throws Throwable any non-{@link Exception} throwable raised by the job
     */
    @Around("jobExec()")
    public void doAround(ProceedingJoinPoint pjp) throws Throwable {
        String jobClass = pjp.getTarget().getClass().getSimpleName();
        String methodName = ((MethodSignature) pjp.getSignature()).getMethod().getName();

        // isInstantiated() is checked first so that merely asking about the
        // lock never triggers creation of a ZooKeeper client.
        if (!MutexLock.isInstantiated() || !MutexLock.isAcquiredLock()) {
            logger.info("-- None lock acquired ! {}.{} --", jobClass, methodName);
            return;
        }

        logger.info("------- start {}.{} --------", jobClass, methodName);
        long start = System.currentTimeMillis();

        try {
            pjp.proceed();
        } catch (Exception e) {
            // A failing job must not propagate into the scheduler; log and continue.
            logger.error("global erorr occured while {}.{}", jobClass, methodName, e);
        }

        long end = System.currentTimeMillis();
        logger.info("------- end {}.{}({}ms) --------", jobClass, methodName, end - start);
    }

}
View Code 技術分享圖片
package com.falsh.tss.jobs;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;

/**
 * Sample scheduled job. Its trigger is the cron expression stored in the
 * {@code cron.job.myJob1} property.
 */
@Component
public class MyJob1 {

    private static final Logger logger = LoggerFactory.getLogger(MyJob1.class);

    /**
     * Logs a message and then sleeps for one second to simulate work.
     *
     * <p>If the thread is interrupted while sleeping, the interrupt flag is
     * restored so that the scheduler's thread can still observe the
     * interruption.
     */
    @Scheduled(cron="${cron.job.myJob1}")
    public void execute() {
        logger.info("我在執行定時任務1.....");
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            // Restore the flag instead of swallowing the interruption.
            Thread.currentThread().interrupt();
            logger.warn("定時任務1被中斷", e);
        }
    }
}
View Code 技術分享圖片
package com.falsh.tss.mutex;

import static com.falsh.tss.config.CommonEnv.ZK_ADDRESS;
import static com.falsh.tss.config.CommonEnv.ZK_LOCK_PATH;

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.locks.InterProcessLock;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;
import org.apache.curator.retry.RetryNTimes;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.InitializingBean;

/**
 * Process-wide holder of a Curator {@link InterProcessMutex} on the node
 * {@code ZK_LOCK_PATH} of the ZooKeeper ensemble at {@code ZK_ADDRESS}.
 *
 * <p>All state is static: the Curator client, the mutex, and a flag telling
 * whether this process currently owns the lock. Use {@link #getInstance()}
 * to create the singleton; calling the public constructor more than once
 * replaces the static client and mutex without closing the previous client.
 *
 * <p>NOTE(review): the flag is only updated by the methods of this class. It
 * is not cleared when the ZooKeeper session is lost, so the process may keep
 * believing it owns the lock after the server has dropped it - confirm
 * whether a connection-state listener is required for your deployment.
 */
public class MutexLock implements InitializingBean, DisposableBean {

    private static CuratorFramework client;
    private static InterProcessLock mutexLock;
    private static volatile boolean acquiredLock;
    private static volatile MutexLock instance;

    /**
     * Creates and starts the Curator client (up to 10 retries, 5000 ms apart)
     * and builds the mutex on {@code ZK_LOCK_PATH}.
     */
    public MutexLock() {
        client = CuratorFrameworkFactory.newClient(ZK_ADDRESS, new RetryNTimes(10, 5000));
        client.start();
        mutexLock = new InterProcessMutex(client, ZK_LOCK_PATH);
    }

    /**
     * Returns the singleton, creating it on first use.
     *
     * @return the shared instance
     */
    public static MutexLock getInstance() {
        if (instance == null) {
            synchronized (MutexLock.class) {
                if (instance == null) { // second check, now under the class lock
                    instance = new MutexLock();
                }
            }
        }
        return instance;
    }

    /**
     * Tells whether {@link #getInstance()} has already created the singleton,
     * without creating it.
     *
     * @return {@code true} if the singleton exists
     */
    public static boolean isInstantiated() {
        return instance != null;
    }

    /**
     * Blocks until the distributed lock is acquired, then marks it as held.
     * The singleton is created first if necessary, so this method no longer
     * fails with a {@code NullPointerException} when called before
     * {@link #getInstance()}.
     *
     * @throws Exception if Curator fails to acquire the lock
     */
    public static void acquireMutexLock() throws Exception {
        getInstance();
        mutexLock.acquire();
        acquiredLock = true;
    }

    /**
     * Releases the distributed lock and marks it as no longer held. Does
     * nothing if the mutex was never created.
     *
     * <p>The flag is cleared before the release so that callers of
     * {@link #isAcquiredLock()} stop treating this process as the owner
     * before another process can obtain the lock. Curator expects the release
     * to be performed by the thread that acquired the mutex.
     *
     * @throws Exception if Curator fails to release the lock
     */
    public static void releaseMutexLock() throws Exception {
        acquiredLock = false;
        if (mutexLock == null) {
            return;
        }
        mutexLock.release();
    }

    /**
     * @return {@code true} while this process is recorded as owning the lock
     */
    public static boolean isAcquiredLock() {
        return acquiredLock;
    }


    /**
     * Clears the ownership flag and closes the Curator client so that its
     * connection does not outlive the application context.
     *
     * @throws Exception if closing the client fails
     */
    @Override
    public void destroy() throws Exception {
        acquiredLock = false;
        if (client != null) {
            client.close();
        }
    }

    /**
     * Nothing to do: the client and mutex are created in the constructor.
     */
    @Override
    public void afterPropertiesSet() throws Exception {

    }
}
View Code 技術分享圖片
package com.falsh.tss;

import com.alibaba.dubbo.spring.boot.annotation.EnableDubboConfiguration;
import com.falsh.tss.mutex.MutexLock;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.scheduling.annotation.EnableScheduling;

/**
 * Application entry point. After the Spring context is up, the main thread
 * competes for the distributed lock and, once it owns it, keeps it for the
 * lifetime of the process; scheduled jobs check that ownership before running.
 *
 * <p>Component scanning starts at {@code com.falsh}, the root of the packages
 * that the configuration, aspect and job classes are declared in.
 */
@EnableScheduling
@EnableDubboConfiguration
@EnableAutoConfiguration
@SpringBootApplication
@ComponentScan("com.falsh")
public class Application {

    private static final Log logger = LogFactory.getLog(Application.class);

    /**
     * Starts Spring, then blocks on the lock and holds it until the thread is
     * interrupted or an error occurs.
     *
     * @param args command-line arguments passed through to Spring Boot
     */
    public static void main(String[] args) {
        SpringApplication.run(Application.class, args);

        boolean interrupted = false;
        try {
            MutexLock.getInstance();
            MutexLock.acquireMutexLock();
            while (true) {
                logger.warn("鎖在我這");
                Thread.sleep(60000);
            }
        } catch (InterruptedException e) {
            // Remember the interruption; the flag is restored after the release
            // below so that it cannot disturb the release itself.
            interrupted = true;
            logger.warn("持有鎖的線程被中斷", e);
        } catch (Exception e) {
            logger.error("獲取鎖失敗", e);
        } finally {
            // Only release what was actually acquired; releasing a mutex that
            // this thread does not own makes Curator throw.
            if (MutexLock.isInstantiated() && MutexLock.isAcquiredLock()) {
                try {
                    MutexLock.releaseMutexLock();
                    logger.warn("釋放了鎖");
                } catch (Exception e) {
                    logger.error("釋放鎖異常", e);
                }
            }
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        }
    }
}
View Code

配置:

application-dev.properties


# Dubbo settings; the registry address uses the zookeeper:// scheme.
# "192.168.x.x" is a placeholder - replace it with the real host.
spring.dubbo.application.name=myjobtss
spring.dubbo.registry.address=zookeeper://192.168.x.x:2181
spring.dubbo.protocol.name=dubbo

# Cron expression referenced by MyJob1 through @Scheduled(cron="${cron.job.myJob1}");
# it fires at second 0 of every minute.
cron.job.myJob1=0 0/1 * * * ?

# ZooKeeper connect string and lock node path, injected into CommonEnv
# (ZK_ADDRESS / ZK_LOCK_PATH) and used by MutexLock to build the mutex.
zk.address=192.168.x.x:2181
zk.lock.path=/myjob-quartz-locks

spring boot 定時任務基於zookeeper的分布式鎖實現