
Ensuring Idempotency of Scheduled Tasks in a Distributed Environment

The API service is deployed across 8 machines, but each scheduled task must run on only one of them: some tasks must not run concurrently, and running the same task on several machines wastes resources anyway. This boils down to a locking problem.

Option 1: Restrict by machine IP

Since the IPs of the 8 machines are known, we can whitelist which machines are allowed to run the scheduled task. For a way to obtain the local server IP, see here (a sketch is also given after the snippet below).

import org.springframework.context.annotation.Lazy;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;

@Component
public class RegularTask {

    @Lazy
    @Scheduled(cron = "0 0/30 * * * ?") // example schedule: every 30 minutes
    public void send() {
        String ip = IPUtil.getLocalIP(); // IP of this server
        // IPs allowed to run the task, kept in a properties file or in the database
        String allowIp = PropertiesUtil.get("RUN_TASK_IP");
        if (allowIp.indexOf(ip) == -1) { // not in the whitelist: return immediately
            return;
        }
        // TODO: do task
    }
}
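The snippet assumes an IPUtil helper, which the original post links to rather than listing. A minimal sketch of such a helper (the class and method names come from the snippet above; the implementation here is an assumption):

import java.net.Inet4Address;
import java.net.InetAddress;
import java.net.NetworkInterface;
import java.net.SocketException;
import java.util.Enumeration;

public final class IPUtil {

    // Returns the first non-loopback IPv4 address of this host, or null if none is found.
    public static String getLocalIP() {
        try {
            Enumeration<NetworkInterface> interfaces = NetworkInterface.getNetworkInterfaces();
            while (interfaces.hasMoreElements()) {
                Enumeration<InetAddress> addresses = interfaces.nextElement().getInetAddresses();
                while (addresses.hasMoreElements()) {
                    InetAddress addr = addresses.nextElement();
                    if (addr instanceof Inet4Address && !addr.isLoopbackAddress()) {
                        return addr.getHostAddress();
                    }
                }
            }
        } catch (SocketException e) {
            // no usable network interface; fall through and return null
        }
        return null;
    }
}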

Option 2: A distributed lock in Redis

Let the distributed instances compete for a lock; whoever grabs it runs the task. Note that many people implement the lock with SETNX (jedis.setnx(key, value)), but that approach is flawed: SETNX and EXPIRE are two separate commands, so a crash between them leaves a lock that never expires. For more on Redis distributed locks, see here.

import org.apache.commons.lang3.RandomStringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Lazy;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;

import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;

@Component
public class RegularTask {

    private static final Logger log = LoggerFactory.getLogger(RegularTask.class);

    @Autowired
    private JedisPool jedisPool;

    @Lazy
    @Scheduled(cron = "0 0/30 * * * ?") // example schedule: every 30 minutes
    public void send() {
        Jedis jedis = null;
        try {
            jedis = jedisPool.getResource();
            // expire after 30 min; the random value identifies the lock holder
            boolean isLock = tryLock(jedis, "lock_ip_key", RandomStringUtils.randomNumeric(16), 1800);
            if (isLock) {
                // TODO: do task
            }
        } catch (Exception e) {
            log.error("regular task run error", e);
        } finally {
            if (jedis != null) {
                jedis.close();
            }
        }
    }

    private static final String LOCKED_SUCCESS = "OK";
    private static final String NX = "NX";          // set only if the key does not exist
    private static final String EXPIRE_TIME = "EX"; // expiry given in seconds

    // SET key value NX EX seconds -- a single atomic command, unlike SETNX + EXPIRE
    public static boolean tryLock(Jedis jedis, String lockKey, String uniqueId, long expireTime) {
        String result = jedis.set(lockKey, uniqueId, NX, EXPIRE_TIME, expireTime);
        return LOCKED_SUCCESS.equals(result);
    }
}
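The uniqueId passed to tryLock only pays off if release checks it; otherwise one instance could delete a lock another instance holds. A minimal release sketch (not in the original post; releaseLock is a hypothetical helper) that checks and deletes atomically via a Lua script:

import java.util.Collections;

import redis.clients.jedis.Jedis;

public final class RedisLockUtil {

    private static final Long RELEASE_SUCCESS = 1L;

    // Delete the lock key only if it still holds our uniqueId; GET + DEL run
    // atomically inside the Lua script, so no other client can slip in between.
    public static boolean releaseLock(Jedis jedis, String lockKey, String uniqueId) {
        String script = "if redis.call('get', KEYS[1]) == ARGV[1] "
                      + "then return redis.call('del', KEYS[1]) else return 0 end";
        Object result = jedis.eval(script,
                Collections.singletonList(lockKey),
                Collections.singletonList(uniqueId));
        return RELEASE_SUCCESS.equals(result);
    }
}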

Option 3: ZooKeeper with the Curator client

Use ZooKeeper for master election: only the elected master (leader) runs the scheduled task. All instances try to create a child node under the same parent node, and ZooKeeper guarantees that exactly one creation succeeds; Curator wraps these operations into ready-made recipes. Curator offers two election recipes, Leader Latch and Leader Election; Leader Latch is used here. For more on the Curator client, see here. First add the curator-recipes dependency:

<dependency>
   <groupId>org.apache.curator</groupId>
   <artifactId>curator-recipes</artifactId>
   <version>2.4.1</version>
</dependency>

import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.leader.LeaderLatch;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.annotation.Lazy;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;

import java.io.IOException;

@Component
public class RegularTask {
    private static final Logger log = LoggerFactory.getLogger(RegularTask.class);
    // ZooKeeper connection string; the ensemble should have at least 3 instances
    private static final String ZOOKEEPER_STR = "10.25.142.55:2181,10.48.24.36:2181,10.48.24.36:2182,10.48.124.36:2181,10.48.125.36:2181";
    private static CuratorFramework curatorFramework;
    private static LeaderLatch leaderLatch;


    static {
        RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
        curatorFramework = CuratorFrameworkFactory.newClient(ZOOKEEPER_STR, retryPolicy);
        curatorFramework.start();
        leaderLatch = new LeaderLatch(curatorFramework, "/regulartask");
        try {
            leaderLatch.start();
        } catch (Exception e) {
            log.error("LeaderLatch start error:{}", e);
        }
    }
    
    @Lazy
    @Scheduled(cron = "0 0/30 * * * ?") // example schedule: every 30 minutes
    public void send() {
        try {
            if (!leaderLatch.hasLeadership()) {
                log.info("current machine is not the leader");
                return;
            }
            // TODO: scheduled task logic
        } catch (Exception e) {
            log.error("regulartask run error", e);
        }
        // Note: do NOT close the latch here. Closing it after a run gives up
        // leadership permanently, so the task would never run again on this node;
        // close it once, on application shutdown (see the sketch below).
    }
}
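Since the latch must stay open for the lifetime of the process, a reasonable place to close it is application shutdown. A minimal sketch, assuming the bean's lifecycle is managed by Spring (the shutdown method is an addition, not part of the original post):

import java.io.IOException;

import javax.annotation.PreDestroy;

// Add to RegularTask: relinquish leadership once, when the application stops,
// instead of closing the latch after every scheduled run.
@PreDestroy
public void shutdown() throws IOException {
    if (leaderLatch != null) {
        leaderLatch.close();
    }
    if (curatorFramework != null) {
        curatorFramework.close();
    }
}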