1. 程式人生 > >雪花算法中機器id保證全局唯一

雪花算法中機器id保證全局唯一

elong mda druid ping oplog 更新 主鍵 star span

關於分布式id的生成系統, 美團技術團隊之前已經有寫過一篇相關的文章, 詳見 Leaf——美團點評分布式ID生成系統

通常在生產中會用Twitter開源的雪花算法來生成分布式主鍵。雪花算法中的核心就是機器id和數據中心id。數據中心id可以放在配置文件中, 因為一個服務集群通常共用一個配置文件; 而機器id如果也放在配置文件中維護的話, 每個應用就需要一份獨立的配置, 難免會出現機器id重複的問題。

解決方案: 1. 通過啟動參數去指定機器id, 但是這種方式也會有出錯的可能性 2. 每個應用啟動的時候註冊到redis或者zookeeper, 由redis或zookeeper來分配機器id

接下來主要介紹基於redis的實現方式, 一種是註冊的時候設置過期時間, 配置定時器定時去檢查機器id是否過期需要重新分配; 另一種是不設置過期時間, 只依靠在spring容器銷毀的時候去刪除記錄(但是這種方式容易刪除失敗)

實現方式一

<!-- Maven fragment for implementation one: Spring Boot parent plus web, AOP, Redis and Log4j2 logging. -->
<parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>1.5.10.RELEASE</version>
</parent>
<dependencies>
    <!-- Every starter below excludes spring-boot-starter-logging so that Log4j2 (declared further down) is the only logging backend. -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter</artifactId>
        <exclusions>
            <exclusion>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-logging</artifactId>
            </exclusion>
        </exclusions>
    </dependency>

    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
        <scope>test</scope>
        <exclusions>
            <exclusion>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-logging</artifactId>
            </exclusion>
        </exclusions>
    </dependency>

    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-aop</artifactId>
        <exclusions>
            <exclusion>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-logging</artifactId>
            </exclusion>
        </exclusions>
    </dependency>

    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
        <exclusions>
            <exclusion>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-logging</artifactId>
            </exclusion>
        </exclusions>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-data-redis</artifactId>
    </dependency>
    <!-- Logging dependencies: begin -->
    <!-- Logging setup: Log4j2 + SLF4J -->
    <dependency>
        <groupId>org.apache.logging.log4j</groupId>
        <artifactId>log4j-api</artifactId>
    </dependency>
    <dependency>
        <groupId>org.apache.logging.log4j</groupId>
        <artifactId>log4j-core</artifactId>
    </dependency>
    <dependency> <!-- Bridge: routes SLF4J to Log4j2 -->
        <groupId>org.apache.logging.log4j</groupId>
        <artifactId>log4j-slf4j-impl</artifactId>
    </dependency>
    <dependency> <!-- Bridge: routes commons-logging to Log4j2 -->
        <groupId>org.apache.logging.log4j</groupId>
        <artifactId>log4j-jcl</artifactId>
        <!-- NOTE(review): this is the only log4j artifact with an explicit version; the siblings inherit theirs. Confirm 2.2 matches the inherited version. -->
        <version>2.2</version>
    </dependency>
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-api</artifactId>
    </dependency>
    <!-- Logging dependencies: end -->
</dependencies>

redis的配置

/**
 * Spring configuration that publishes a shared Jedis connection pool.
 *
 * @author wang.js on 2019/3/8.
 * @version 1.0
 */
@Configuration
public class RedisConfig {

    /** Redis host, taken from {@code spring.redis.host}. */
    @Value("${spring.redis.host}")
    private String host;

    /** Redis port, taken from {@code spring.redis.port}; 6379 when the property is absent. */
    @Value("${spring.redis.port:6379}")
    private Integer port;

    /**
     * Creates the pool used by every component that talks to Redis.
     *
     * @return a pool bound to the configured host and port
     */
    @Bean
    public JedisPool jedisPool() {
        return new JedisPool(buildPoolConfig(), host, port);
    }

    /**
     * Assembles the fixed pool limits.
     *
     * @return the pool configuration
     */
    private JedisPoolConfig buildPoolConfig() {
        JedisPoolConfig poolConfig = new JedisPoolConfig();
        // At most 50 connections in total, of which at most 10 may sit idle.
        poolConfig.setMaxTotal(50);
        poolConfig.setMaxIdle(10);
        // Borrowers wait up to 3 seconds for a free connection.
        poolConfig.setMaxWaitMillis(3000L);
        // Validate each connection when it is borrowed.
        poolConfig.setTestOnBorrow(true);
        return poolConfig;
    }

}

snowflake算法中機器id的獲取

/**
 * Allocates the snowflake machine (worker) id of this instance through Redis.
 *
 * <p>Each instance claims one Redis key built as {@code bizType + dataCenterId + machineId}
 * whose value is the local IP address. The key is created with {@code SETNX}, so an id that is
 * already held by another instance is never overwritten, and it carries a 24-hour expiry that a
 * background timer renews every 23 hours. When the key is lost (expired or taken over), the
 * timer registers a new id and pushes it into {@link SnowFlakeGenerator}.
 *
 * <p>Ids are restricted to {@code 0..31} because {@link SnowFlakeGenerator} reserves 5 bits for
 * the worker id.
 *
 * <p>NOTE(review): the key concatenates the data center id and the machine id without a
 * separator, so e.g. (1, 12) and (11, 2) map to the same key. The format is kept unchanged for
 * compatibility with keys that are already registered.
 *
 * <p>NOTE(review): ownership is identified by IP address only; several instances behind the
 * same IP cannot be told apart by the renewal check.
 *
 * @author wang.js on 2019/3/8.
 * @version 1.0
 */
@Configuration
public class MachineIdConfig {

    /** Number of usable machine ids; must match the 5 worker-id bits of SnowFlakeGenerator. */
    private static final int MACHINE_ID_COUNT = 32;

    /** Lifetime of a registration key. Redis EXPIRE takes seconds, not milliseconds. */
    private static final int EXPIRE_SECONDS = 60 * 60 * 24;

    /** Delay before the first renewal run. */
    private static final long RENEW_DELAY_MILLIS = 10 * 1000L;

    /** Interval between renewal runs; shorter than the key lifetime. */
    private static final long RENEW_PERIOD_MILLIS = 1000L * 60 * 60 * 23;

    private static final Logger LOGGER = LoggerFactory.getLogger(MachineIdConfig.class);

    @Resource
    private JedisPool jedisPool;

    @Value("${snowflake.datacenter}")
    private Integer dataCenterId;

    /** Prefix of the registration keys, read from {@code snowflake.bizType}. */
    @Value("${snowflake.bizType}")
    private String keyPrefix;

    /**
     * Machine id currently used by this JVM; written by the startup thread and the renewal timer.
     */
    public static volatile Integer machineId;

    /**
     * Local IP address, stored as the value of the registration key.
     */
    private static volatile String localIp;

    /** Renewal timer; {@code null} until the first successful registration and after shutdown. */
    private Timer renewTimer;

    /** Whether the key for {@link #machineId} was successfully claimed by this instance. */
    private boolean registered;

    /**
     * Resolves the IP address of the local host.
     *
     * @return the textual IP address
     * @throws UnknownHostException if the local host name cannot be resolved
     */
    private String getIPAddress() throws UnknownHostException {
        InetAddress address = InetAddress.getLocalHost();
        return address.getHostAddress();
    }

    /**
     * Derives a preferred machine id from the local IP, registers an id in Redis and creates the
     * id generator.
     *
     * @return the generator bound to the registered machine id and the configured data center
     * @throws Exception if the local IP address cannot be determined
     */
    @Bean
    public SnowFlakeGenerator initMachineId() throws Exception {
        localIp = getIPAddress();
        machineId = preferredMachineId(localIp);
        createMachineId();
        LOGGER.info("初始化 machine_id :{}", machineId);

        return new SnowFlakeGenerator(machineId, dataCenterId);
    }

    /**
     * Stops the renewal timer and releases the registration key before the container shuts down.
     * The key is only deleted while it still holds the local IP, so a key that expired and was
     * claimed by another instance is left untouched. Failures are logged, not rethrown, because
     * the key expires on its own.
     */
    @PreDestroy
    public synchronized void destroyMachineId() {
        if (renewTimer != null) {
            renewTimer.cancel();
            renewTimer = null;
        }
        Integer current = machineId;
        if (!registered || current == null) {
            return;
        }
        registered = false;
        try {
            if (checkIsLocalIp(String.valueOf(current))) {
                try (Jedis jedis = jedisPool.getResource()) {
                    jedis.del(redisKey(current));
                }
            }
        } catch (RuntimeException e) {
            LOGGER.warn("failed to release machine id {}; its key will expire on its own", current, e);
        }
    }

    /**
     * Registers a machine id in Redis, starting at the current {@link #machineId} and probing
     * the following ids (wrapping around) until a free one is found.
     *
     * <p>This stays best-effort: when every id is taken or Redis cannot be reached, the problem
     * is logged and an unregistered random id in the valid range is used.
     *
     * @return the machine id now stored in {@link #machineId}
     */
    public synchronized Integer createMachineId() {
        registered = false;
        Integer preferred = machineId;
        int start = preferred == null ? randomId() : normalize(preferred);
        try {
            for (int offset = 0; offset < MACHINE_ID_COUNT; offset++) {
                int candidate = (start + offset) % MACHINE_ID_COUNT;
                if (registerMachine(candidate, localIp)) {
                    machineId = candidate;
                    registered = true;
                    // Idempotent: starts the renewal timer only once.
                    updateExpTimeThread();
                    return machineId;
                }
                LOGGER.info("createMachineId->ip:{},machineId:{}, time:{}", localIp, candidate, new Date());
            }
            LOGGER.error("all {} machine ids of data center {} are taken; using an unregistered random id",
                    MACHINE_ID_COUNT, dataCenterId);
        } catch (RuntimeException e) {
            LOGGER.error("failed to register a machine id in redis; using an unregistered random id", e);
        }
        getRandomMachineId();
        return machineId;
    }

    /**
     * Starts the daemon timer that renews the registration. Does nothing when the timer is
     * already running, so repeated registrations do not leak timer threads.
     */
    private synchronized void updateExpTimeThread() {
        if (renewTimer != null) {
            return;
        }
        renewTimer = new Timer(localIp, true);
        renewTimer.schedule(new TimerTask() {
            @Override
            public void run() {
                try {
                    renewOrReRegister();
                } catch (RuntimeException e) {
                    // An exception escaping run() would terminate the timer thread for good.
                    LOGGER.warn("machine id renewal failed; retrying on the next run", e);
                }
            }
        }, RENEW_DELAY_MILLIS, RENEW_PERIOD_MILLIS);
    }

    /**
     * Body of the renewal task: extends the expiry while this instance still owns its key,
     * otherwise registers a new id and updates the generator's worker id.
     */
    private synchronized void renewOrReRegister() {
        if (renewTimer == null) {
            // destroyMachineId() ran while this task was waiting for the lock.
            return;
        }
        if (registered && checkIsLocalIp(String.valueOf(machineId))) {
            LOGGER.info("更新超時時間 ip:{},machineId:{}, time:{}", localIp, machineId, new Date());
            try (Jedis jedis = jedisPool.getResource()) {
                jedis.expire(redisKey(machineId), EXPIRE_SECONDS);
            }
            return;
        }
        LOGGER.info("重新生成機器ID ip:{},machineId:{}, time:{}", localIp, machineId, new Date());
        createMachineId();
        SnowFlakeGenerator.setWorkerId(machineId);
    }

    /**
     * Replaces {@link #machineId} with a random id in {@code 0..31}.
     */
    public void getRandomMachineId() {
        machineId = randomId();
    }

    /**
     * Advances {@link #machineId} to the next id, wrapping from 31 back to 0.
     */
    public void incMachineId() {
        Integer current = machineId;
        machineId = current == null ? 0 : normalize(current + 1);
    }

    /**
     * Checks whether the registration key of the given id still holds the local IP.
     *
     * @param mechineId machine id whose key is inspected
     * @return {@code true} when the stored value equals the local IP
     */
    private Boolean checkIsLocalIp(String mechineId) {
        try (Jedis jedis = jedisPool.getResource()) {
            String ip = jedis.get(redisKey(mechineId));
            LOGGER.info("checkIsLocalIp->ip:{}", ip);
            return ip != null && ip.equals(localIp);
        }
    }

    /**
     * Tries to claim the key of the given id and applies the expiry on success.
     *
     * @param machineId candidate id in {@code 0..31}
     * @param localIp   value stored under the key
     * @return {@code true} when the key did not exist and now belongs to this instance
     */
    private Boolean registerMachine(Integer machineId, String localIp) {
        String key = redisKey(machineId);
        try (Jedis jedis = jedisPool.getResource()) {
            if (jedis.setnx(key, localIp) > 0) {
                jedis.expire(key, EXPIRE_SECONDS);
                return true;
            }
            // SETNX and EXPIRE are two commands; an owner that died in between leaves a key
            // without expiry (TTL -1). Give it one so the id is eventually released.
            if (jedis.ttl(key) == -1) {
                jedis.expire(key, EXPIRE_SECONDS);
            }
            return false;
        }
    }

    /**
     * Builds the registration key for a machine id.
     *
     * @param id machine id
     * @return prefix, data center id and machine id concatenated
     */
    private String redisKey(Object id) {
        return keyPrefix + dataCenterId + id;
    }

    /**
     * Maps an IP address to the id that is tried first. Dotted IPv4 addresses keep the original
     * derivation (digits parsed as a long, then hashed); any other form, such as IPv6, falls
     * back to the string hash because it cannot be parsed as a number.
     *
     * @param ip textual IP address
     * @return an id in {@code 0..31}
     */
    private static int preferredMachineId(String ip) {
        int hash = ip.indexOf(':') < 0
                ? Long.valueOf(Long.parseLong(ip.replace(".", ""))).hashCode()
                : ip.hashCode();
        return normalize(hash);
    }

    /**
     * Reduces any int, including negative hash codes, to {@code 0..31}.
     *
     * @param value arbitrary value
     * @return the non-negative remainder modulo the id count
     */
    private static int normalize(int value) {
        return ((value % MACHINE_ID_COUNT) + MACHINE_ID_COUNT) % MACHINE_ID_COUNT;
    }

    /**
     * @return a random id in {@code 0..31}
     */
    private static int randomId() {
        return (int) (Math.random() * MACHINE_ID_COUNT);
    }

}

雪花算法(雪花算法百度上很多, 自己可以隨便找一個)

/**
 * 雪花算法
 *
 * @author wang.js on 2019/3/8.
 * @version 1.0
 */
public class SnowFlakeGenerator {

    private final long twepoch = 1288834974657L;
    private final long workerIdBits = 5L;
    private final long datacenterIdBits = 5L;
    private final long maxWorkerId = -1L ^ (-1L << workerIdBits);
    private final long maxDatacenterId = -1L ^ (-1L << datacenterIdBits);
    private final long sequenceBits = 12L;
    private final long workerIdShift = sequenceBits;
    private final long datacenterIdShift = sequenceBits + workerIdBits;
    private final long timestampLeftShift = sequenceBits + workerIdBits + datacenterIdBits;
    private final long sequenceMask = -1L ^ (-1L << sequenceBits);

    private static long workerId;
    private long datacenterId;
    private long sequence = 0L;
    private long lastTimestamp = -1L;

    public SnowFlakeGenerator(long actualWorkId, long datacenterId) {
        if (actualWorkId > maxWorkerId || actualWorkId < 0) {
            throw new IllegalArgumentException(String.format("worker Id can‘t be greater than %d or less than 0", maxWorkerId));
        }
        if (datacenterId > maxDatacenterId || datacenterId < 0) {
            throw new IllegalArgumentException(String.format("datacenter Id can‘t be greater than %d or less than 0", maxDatacenterId));
        }
        workerId = actualWorkId;
        this.datacenterId = datacenterId;
    }

    public static void setWorkerId(long workerId) {
        SnowFlakeGenerator.workerId = workerId;
    }

    public synchronized long nextId() {
        long timestamp = timeGen();
        if (timestamp < lastTimestamp) {
            throw new RuntimeException(String.format("Clock moved backwards.  Refusing to generate id for %d milliseconds", lastTimestamp - timestamp));
        }
        if (lastTimestamp == timestamp) {
            sequence = (sequence + 1) & sequenceMask;
            if (sequence == 0) {
                timestamp = tilNextMillis(lastTimestamp);
            }
        } else {
            sequence = 0L;
        }
        lastTimestamp = timestamp;
        return ((timestamp - twepoch) << timestampLeftShift) | (datacenterId << datacenterIdShift) | (workerId << workerIdShift) | sequence;
    }

    protected long tilNextMillis(long lastTimestamp) {
        long timestamp = timeGen();
        while (timestamp <= lastTimestamp) {
            timestamp = timeGen();
        }
        return timestamp;
    }

    protected long timeGen() {
        return System.currentTimeMillis();
    }
}

測試的controller

/**
 * REST endpoint for trying out the snowflake id generator.
 *
 * @author wang.js on 2019/3/8.
 * @version 1.0
 */
@RestController
@RequestMapping("/snowflake")
public class SnowflakeController {

    /** Generator bean injected by the container. */
    @Resource
    private SnowFlakeGenerator snowFlakeGenerator;

    /**
     * Handles {@code GET /snowflake/get}.
     *
     * @return the next id produced by the injected generator
     */
    @GetMapping("/get")
    public long getDistributeId() {
        final long nextId = snowFlakeGenerator.nextId();
        return nextId;
    }

}

配置文件

# Application settings for implementation one (machine id registered in Redis with an expiry).
server:
  port: 12892
spring:
  redis:
    database: 0
    host: mini7
    # NOTE(review): the RedisConfig class shown in this article reads only spring.redis.host and
    # spring.redis.port; confirm whether these lettuce pool settings are used at all.
    lettuce:
      pool:
        max-active: 8
        max-idle: 8
        max-wait: -1
        min-idle: 0
    port: 6379
    timeout: 10000

snowflake:
  datacenter: 1 # data center id; SnowFlakeGenerator accepts 0..31
  bizType: order_id_ # business type; used as the prefix of the Redis registration key

實現方式二

機器id註冊到redis的時候不設置過期時間, 同時采用sharding-jdbc的分布式主鍵生成組件。

maven依賴

<!-- Maven fragment for implementation two: Spring Boot parent plus web, AOP, Redis, sharding-jdbc, MySQL/Druid and Log4j2 logging. -->
<parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>1.5.10.RELEASE</version>
</parent>

<properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
    <java.version>1.8</java.version>
    <sharding-sphere.version>3.0.0.M4</sharding-sphere.version>
</properties>

<dependencies>
    <!-- Every starter below excludes spring-boot-starter-logging so that Log4j2 (declared further down) is the only logging backend. -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter</artifactId>
        <exclusions>
            <exclusion>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-logging</artifactId>
            </exclusion>
        </exclusions>
    </dependency>

    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
        <scope>test</scope>
        <exclusions>
            <exclusion>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-logging</artifactId>
            </exclusion>
        </exclusions>
    </dependency>

    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-aop</artifactId>
        <exclusions>
            <exclusion>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-logging</artifactId>
            </exclusion>
        </exclusions>
    </dependency>

    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
        <exclusions>
            <exclusion>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-logging</artifactId>
            </exclusion>
        </exclusions>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-data-redis</artifactId>
    </dependency>

    <!-- sharding-jdbc dependencies: begin -->
    <!-- for spring boot -->
    <dependency>
        <groupId>io.shardingsphere</groupId>
        <artifactId>sharding-jdbc-spring-boot-starter</artifactId>
        <version>${sharding-sphere.version}</version>
    </dependency>

    <!-- for spring namespace -->
    <dependency>
        <groupId>io.shardingsphere</groupId>
        <artifactId>sharding-jdbc-spring-namespace</artifactId>
        <version>${sharding-sphere.version}</version>
    </dependency>
    <!-- sharding-jdbc dependencies: end -->

    <dependency>
        <groupId>mysql</groupId>
        <artifactId>mysql-connector-java</artifactId>
        <version>5.1.41</version>
    </dependency>
    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>druid</artifactId>
        <version>1.1.0</version>
    </dependency>

    <!-- Logging dependencies: begin -->
    <!-- Logging setup: Log4j2 + SLF4J -->
    <dependency>
        <groupId>org.apache.logging.log4j</groupId>
        <artifactId>log4j-api</artifactId>
    </dependency>
    <dependency>
        <groupId>org.apache.logging.log4j</groupId>
        <artifactId>log4j-core</artifactId>
    </dependency>
    <dependency> <!-- Bridge: routes SLF4J to Log4j2 -->
        <groupId>org.apache.logging.log4j</groupId>
        <artifactId>log4j-slf4j-impl</artifactId>
    </dependency>
    <dependency> <!-- Bridge: routes commons-logging to Log4j2 -->
        <groupId>org.apache.logging.log4j</groupId>
        <artifactId>log4j-jcl</artifactId>
        <!-- NOTE(review): this is the only log4j artifact with an explicit version; the siblings inherit theirs. Confirm 2.2 matches the inherited version. -->
        <version>2.2</version>
    </dependency>
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-api</artifactId>
    </dependency>
    <!-- Logging dependencies: end -->
</dependencies>

配置文件

# Application settings for implementation two (worker id registered in Redis without expiry, ids from sharding-jdbc).
server:
  port: 12893

# sharding-jdbc data source configuration (database/table sharding)
sharding:
  jdbc:
    datasource:
      ds0:
        type: com.alibaba.druid.pool.DruidDataSource
        driver-class-name: com.mysql.jdbc.Driver
        url: jdbc:mysql://localhost:3306/ds0
        username: root
        # NOTE(review): sample credentials in plain text; replace before any real deployment.
        password: 123456
      names: ds0
spring:
  redis:
    database: 0
    host: mini7
    # NOTE(review): the RedisConfig class shown in this article reads only spring.redis.host and
    # spring.redis.port; confirm whether these lettuce pool settings are used at all.
    lettuce:
      pool:
        max-active: 8
        max-idle: 8
        max-wait: -1
        min-idle: 0
    port: 6379
    timeout: 10000

snowflake:
  datacenter: 1 # data center id; WorkerIdConfig uses it as part of the Redis key
  bizType: sharding_jdbc_id_ # business type; used as the prefix of the Redis registration key

redis的配置

/**
 * Spring configuration that publishes a shared Jedis connection pool.
 *
 * @author wang.js on 2019/3/8.
 * @version 1.0
 */
@Configuration
public class RedisConfig {

    /** Redis host, taken from {@code spring.redis.host}. */
    @Value("${spring.redis.host}")
    private String host;

    /** Redis port, taken from {@code spring.redis.port}; 6379 when the property is absent. */
    @Value("${spring.redis.port:6379}")
    private Integer port;

    /**
     * Creates the pool used by every component that talks to Redis.
     *
     * @return a pool bound to the configured host and port
     */
    @Bean
    public JedisPool jedisPool() {
        return new JedisPool(buildPoolConfig(), host, port);
    }

    /**
     * Assembles the fixed pool limits.
     *
     * @return the pool configuration
     */
    private JedisPoolConfig buildPoolConfig() {
        JedisPoolConfig poolConfig = new JedisPoolConfig();
        // At most 50 connections in total, of which at most 10 may sit idle.
        poolConfig.setMaxTotal(50);
        poolConfig.setMaxIdle(10);
        // Borrowers wait up to 3 seconds for a free connection.
        poolConfig.setMaxWaitMillis(3000L);
        // Validate each connection when it is borrowed.
        poolConfig.setTestOnBorrow(true);
        return poolConfig;
    }

}

機器id的配置

/**
 * Keeps the sharding-jdbc worker id globally unique by registering it in Redis.
 *
 * <p>The worker id is derived from the local IP address and claimed with {@code SETNX} under the
 * key {@code bizType + dataCenterId + workerId}; the key has no expiry and is removed when the
 * container shuts down. A key that already holds the same IP is treated as this host's own
 * registration (e.g. left over from a previous run).
 *
 * @author wang.js on 2019/3/8.
 * @version 1.0
 */
@Component
public class WorkerIdConfig {

    /** Number of worker ids; DefaultKeyGenerator requires {@code 0 <= workerId < 1024}. */
    private static final int WORKER_ID_COUNT = 1024;

    private static final Logger LOGGER = LoggerFactory.getLogger(WorkerIdConfig.class);

    @Resource
    private JedisPool jedisPool;

    @Value("${snowflake.datacenter}")
    private Integer dataCenterId;

    @Value("${snowflake.bizType}")
    private String bizType;

    /**
     * Worker id owned by this instance; only meaningful while {@link #registeredIp} is set.
     */
    private int workerId;

    /** IP stored under the owned key; {@code null} until registration has succeeded. */
    private String registeredIp;

    /**
     * Derives the worker id from the local IP address and registers it in Redis.
     *
     * @return the registered worker id, in {@code 0..1023}
     * @throws UnknownHostException if the local IP address cannot be determined
     * @throws RuntimeException     if the id is already registered by a different IP
     */
    public int getWorkerId() throws UnknownHostException {
        String ipAddress = getIPAddress();
        int candidate = toWorkerId(ipAddress);
        String key = bizType + dataCenterId + candidate;
        try (Jedis jedis = jedisPool.getResource()) {
            boolean owned = jedis.setnx(key, ipAddress) > 0
                    // Key exists: accept it only when it was written by this same IP.
                    || ipAddress.equalsIgnoreCase(jedis.get(key));
            if (!owned) {
                throw new RuntimeException("機器id:" + candidate + "已經存在, 請先清理緩存");
            }
        }
        // Record ownership only after success, so shutdown never deletes a foreign key.
        workerId = candidate;
        registeredIp = ipAddress;
        return workerId;
    }

    /**
     * Removes the registration when the container shuts down. Nothing is deleted when this
     * instance never registered or when the key no longer holds this instance's IP.
     *
     * @throws RuntimeException if the owned key could not be deleted
     */
    @PreDestroy
    public void delWorkerId() {
        if (registeredIp == null) {
            return;
        }
        LOGGER.info("開始銷毀機器id:" + workerId);
        String key = bizType + dataCenterId + workerId;
        try (Jedis jedis = jedisPool.getResource()) {
            if (!registeredIp.equalsIgnoreCase(jedis.get(key))) {
                LOGGER.warn("worker id {} is no longer registered to {}; leaving the key untouched", workerId, registeredIp);
                return;
            }
            Long del = jedis.del(key);
            if (del == 0) {
                throw new RuntimeException("機器id:" + workerId + "刪除失敗");
            }
        } finally {
            registeredIp = null;
        }
    }

    /**
     * Resolves the IP address of the local host.
     *
     * @return the textual IP address
     * @throws UnknownHostException if the local host name cannot be resolved
     */
    private String getIPAddress() throws UnknownHostException {
        InetAddress address = InetAddress.getLocalHost();
        return address.getHostAddress();
    }

    /**
     * Maps an IP address to a worker id. Dotted IPv4 addresses keep the original derivation
     * (digits parsed as a long, then hashed); any other form, such as IPv6, uses the string hash
     * because it cannot be parsed as a number. The result is always non-negative, even when the
     * hash code is negative.
     *
     * @param ip textual IP address
     * @return a worker id in {@code 0..1023}
     */
    private static int toWorkerId(String ip) {
        int hash = ip.indexOf(':') < 0
                ? Long.valueOf(Long.parseLong(ip.replace(".", ""))).hashCode()
                : ip.hashCode();
        return ((hash % WORKER_ID_COUNT) + WORKER_ID_COUNT) % WORKER_ID_COUNT;
    }
}

sharding-jdbc分布式主鍵生成的配置

/**
 * Wires the sharding-jdbc key generator to the worker id registered in Redis.
 *
 * @author wang.js on 2019/3/8.
 * @version 1.0
 */
@Configuration
public class ShardingIdConfig {

    /** Supplies the worker id of this instance. */
    @Resource
    private WorkerIdConfig workerIdConfig;

    /**
     * Sets the generator's static worker id and exposes a generator bean.
     *
     * @return the key generator
     * @throws UnknownHostException if the local IP address cannot be determined
     */
    @Bean
    public DefaultKeyGenerator defaultKeyGenerator() throws UnknownHostException {
        // The worker id must be smaller than 1024.
        final int workerId = workerIdConfig.getWorkerId();
        DefaultKeyGenerator.setWorkerId(workerId);
        return new DefaultKeyGenerator();
    }

}

測試的controller

/**
 * REST endpoint that hands out distributed primary keys.
 *
 * @author wang.js on 2019/3/8.
 * @version 1.0
 */
@RequestMapping("/id")
@RestController
public class GenIdController {

    /** Key generator bean injected by the container. */
    @Resource
    private DefaultKeyGenerator generator;

    /**
     * Handles {@code GET /id/get}.
     *
     * @return the next generated key as a long
     */
    @GetMapping("/get")
    public long get() {
        final Number key = generator.generateKey();
        return key.longValue();
    }

}

從sharding-jdbc的DefaultKeyGenerator源碼中可以看到, workerId必須小於1024(即最大值為1023):

/**
 * Sets the worker id used by the key generator.
 *
 * @param workerId worker id; must satisfy {@code 0 <= workerId < 1024}
 * @throws IllegalArgumentException if the id is out of range
 */
public static void setWorkerId(long workerId) {
    Preconditions.checkArgument(workerId >= 0L && workerId < 1024L);
    // Qualify with the class name: a bare "workerId = workerId" only assigns the parameter to itself.
    DefaultKeyGenerator.workerId = workerId;
}

雪花算法中機器id保證全局唯一