1. 程式人生 > >redis 叢集部署及分散式鎖的實現

redis 叢集部署及分散式鎖的實現

一、redis叢集的部署

  • 安裝redis
    確保安裝資料夾有redis-trib.rb檔案,通過rudy構建redis叢集
  • 安裝ruby環境
    配置好環境變數,gem install redis 安裝redis依賴

詳細環境安裝教程:點選開啟連結

叢集搭建

      redis叢集最小包含3個主節點,並且每個節點都應該部署在不同的伺服器上,這裡測試建立3個主節點和三個從節點的redis叢集,並部署在本地機器上,分別監聽不同的埠(7000,7001,7100,7101,7200,7201),7000、7100、7200為三個主節點,7001、7101、7201分別為它們對應的從節點。

     在redis安裝目錄建立cluster資料夾,在這個目錄下為這六個redis節點新建六個資料夾,名稱為各自的埠名,再在建好的檔案裡建立如redis_7000.conf配置檔案,內容為:

port 7000
#繫結監聽的IP地址,預設為127.0.0.1
#bind 172.16.10.49
appendonly yes
appendfilename "appendonly_7000.aof"

# // 如果要設定最大記憶體空間,可新增如下兩句
maxmemory 200mb
maxmemory-policy allkeys-lru

cluster-enabled yes
cluster-config-file nodes_7000.conf
cluster-node-timeout 15000
cluster-slave-validity-factor 10
cluster-migration-barrier 1
cluster-require-full-coverage yes

埠號7000都要改成各自對應的埠號,IP地址這裡測試就用預設127.0.0.1,建議使用電腦的本地ip地址。cluster-config-file引數指定了redis叢集節點的配置檔名稱,建立集群后生成,內容如下:

0adf165a965376913d6a4d426784371e6b0d7cf6 127.0.0.1:7101 slave 2ec1abbf3bcd7758281bcc7b6762532084c0b60c 0 1521124105758 5 connected
8d8f817d997b57ccb10049208738d5ebfdd5c2fe 127.0.0.1:7001 master - 0 1521124104759 7 connected 0-5460
9334416471942fd042f704ae8a345ab6c1adcd37 127.0.0.1:7201 slave b7fedf227aec4fc2911bdf7de2566b73e0a88045 0 1521124107358 9 connected
1030b137672c69175e7cf1d99be325329e016422 127.0.0.1:7000 myself,slave 8d8f817d997b57ccb10049208738d5ebfdd5c2fe 0 0 1 connected
b7fedf227aec4fc2911bdf7de2566b73e0a88045 127.0.0.1:7200 master - 0 1521124100754 9 connected 10923-16383
2ec1abbf3bcd7758281bcc7b6762532084c0b60c 127.0.0.1:7100 master - 0 1521124106758 2 connected 5461-10922
vars currentEpoch 9 lastVoteEpoch 0

如果配置集群后,想修改各節點的資訊,如IP地址埠等,可以在這裡修改,但必須每個節點下的都要改。

然後使用如下命令安裝各服務,並啟動

  安裝   redis-server --service-install cluster/7100/redis_7100.conf --service-name redis7100   

  啟動   redis-server --service-start --service-name redis7100                                               

刪除服務 redis-server --service-uninstall --service-name redis7100

現在已經啟動了六個redis服務,但六個redis並沒有聯絡,沒有實現叢集。

建立叢集:

ruby redis-trib.rb create --replicas 1 127.0.0.1:700 0 127.0.0.1:7100 127.0.0.1:7200 127.0.0.1:7001 127.0.0.1:7101 127.0.0.1:7201

使用redis-trib.rb命令建立叢集,ruby可以去掉。命令執行成功,則叢集就建立成功了。

連線叢集:

redis-cli -c -p 7200 -h 127.0.0.1

這裡連線任何一個節點都可以。

redis叢集工作

set keya valuea
1、先對keya 計算值
2、對16384(總槽點數)取餘(得到槽點)
3、通過槽點找到對應的節點
4、在這個節點執行set keya valuea

節點,分為主節點和從節點,一個主節點下可以有多個從節點。槽點值分配在主節點上,redis中的key-value就是
儲存在槽點上。
當主節點掛掉好,選舉一個從節點成為主節點,若該主節點沒有從節點,則叢集處於fail狀態,或有半數以上的主
節點掛掉,叢集也處於fail狀態
主節點會把key—value寫入從節點

二、redis實現分散式鎖

redis分散式的實現原理:

    1、通過setNX操作,如果存在key,不操作;不存在,才會set值,保證鎖的互斥性

    2、value設定鎖的到期時間,當鎖超時時,進行getAndSet操作,先get舊值,再set新值,避免發生死鎖。這裡也可以通過設定key的有效期來避免死鎖,但是setNx和exprise(設定有效期)操作非原子性,可能發生鎖沒有設定有效時間的問題,從而發生死鎖。

實現:

spring boot 通過jdeis連線redsi叢集

redis配置檔案:

default.redis.maxRedirects=3
#連線池中最大連線數。高版本:maxTotal,低版本:maxActive
default.redis.maxTotal=20
#連線池中最大空閒的連線數
default.redis.maxIdle=10
#連線池中最少空閒的連線數
default.redis.minIdle=1
#當連線池資源耗盡時,呼叫者最大阻塞的時間,超時將跑出異常。單位,毫秒數;預設為-1.表示永不超時。高版本:maxWaitMillis,低版本:maxWait
default.redis.maxWaitMillis=3000
#連線空閒的最小時間,達到此值後空閒連線將可能會被移除。負值(-1)表示不移除
default.redis.minEvictableIdleTimeMillis=-1
#對於“空閒連結”檢測執行緒而言,每次檢測的連結資源的個數。預設為3
default.redis.numTestsPerEvictionRun=3
#“空閒連結”檢測執行緒,檢測的週期,毫秒數。如果為負值,表示不執行“檢測執行緒”。預設為-1
default.redis.timeBetweenEvictionRunsMillis=-1
#向呼叫者輸出“連結”資源時,是否檢測是有有效,如果無效則從連線池中移除,並嘗試獲取繼續獲取。預設為false。建議保持預設值
default.redis.testOnBorrow=false
#連超時設定
default.redis.timeout=15000
#是否使用連線池
default.redis.usePool=true
#host&port
# 建議使用實際ip地址建立叢集
default.redis.nodes[0]=127.0.0.1:7000
default.redis.nodes[1]=127.0.0.1:7001
default.redis.nodes[2]=127.0.0.1:7100
default.redis.nodes[3]=127.0.0.1:7101
default.redis.nodes[4]=127.0.0.1:7200
default.redis.nodes[5]=127.0.0.1:7201

通過@ConfigurationProperties註解讀取配置資訊:

@Component
@ConfigurationProperties(prefix = "default.redis")
public class RedisProperties {
    private int maxRedirects;
    private int maxTotal;
    private int maxIdle;
    private int minIdle;
    private int maxWaitMillis;
    private int minEvictableIdleTimeMillis;
    private int numTestsPerEvictionRun;
    private int timeBetweenEvictionRunsMillis;
    private boolean testOnBorrow;
    private int timeout;
    private boolean usePool;
    private List<String> nodes;


    public int getMaxRedirects() {
        return maxRedirects;
    }

    public void setMaxRedirects(int maxRedirects) {
        this.maxRedirects = maxRedirects;
    }

    public int getMaxTotal() {
        return maxTotal;
    }

    public void setMaxTotal(int maxTotal) {
        this.maxTotal = maxTotal;
    }

    public int getMaxIdle() {
        return maxIdle;
    }

    public void setMaxIdle(int maxIdle) {
        this.maxIdle = maxIdle;
    }

    public int getMinIdle() {
        return minIdle;
    }

    public void setMinIdle(int minIdle) {
        this.minIdle = minIdle;
    }

    public int getMaxWaitMillis() {
        return maxWaitMillis;
    }

    public void setMaxWaitMillis(int maxWaitMillis) {
        this.maxWaitMillis = maxWaitMillis;
    }

    public int getMinEvictableIdleTimeMillis() {
        return minEvictableIdleTimeMillis;
    }

    public void setMinEvictableIdleTimeMillis(int minEvictableIdleTimeMillis) {
        this.minEvictableIdleTimeMillis = minEvictableIdleTimeMillis;
    }

    public int getNumTestsPerEvictionRun() {
        return numTestsPerEvictionRun;
    }

    public void setNumTestsPerEvictionRun(int numTestsPerEvictionRun) {
        this.numTestsPerEvictionRun = numTestsPerEvictionRun;
    }

    public int getTimeBetweenEvictionRunsMillis() {
        return timeBetweenEvictionRunsMillis;
    }

    public void setTimeBetweenEvictionRunsMillis(int timeBetweenEvictionRunsMillis) {
        this.timeBetweenEvictionRunsMillis = timeBetweenEvictionRunsMillis;
    }

    public boolean isTestOnBorrow() {
        return testOnBorrow;
    }

    public void setTestOnBorrow(boolean testOnBorrow) {
        this.testOnBorrow = testOnBorrow;
    }

    public int getTimeout() {
        return timeout;
    }

    public void setTimeout(int timeout) {
        this.timeout = timeout;
    }

    public boolean isUsePool() {
        return usePool;
    }

    public void setUsePool(boolean usePool) {
        this.usePool = usePool;
    }

    public List<String> getNodes() {
        return nodes;
    }

    public void setNodes(List<String> nodes) {
        this.nodes = nodes;
    }

}

redis配置類,生成redisTemplate

@Configuration
public class RedisConfig {

    @Autowired
    private RedisProperties redisProperties;

    @Bean
    public RedisClusterConfiguration redisClusterConfiguration() {
        RedisClusterConfiguration redisClusterConfiguration = new RedisClusterConfiguration();
        redisClusterConfiguration.setClusterNodes(getRedisNode());
        redisClusterConfiguration.setMaxRedirects(redisProperties.getMaxRedirects());
        return redisClusterConfiguration;
    }

    @Bean
    public JedisPoolConfig jedisPoolConfig() {
        JedisPoolConfig jedisPoolConfig = new JedisPoolConfig();
        jedisPoolConfig.setMaxIdle(redisProperties.getMaxIdle());
        jedisPoolConfig.setMaxTotal(redisProperties.getMaxTotal());
        jedisPoolConfig.setMinIdle(redisProperties.getMinIdle());
        jedisPoolConfig.setMaxWaitMillis(redisProperties.getMaxWaitMillis());
        jedisPoolConfig.setNumTestsPerEvictionRun(redisProperties.getNumTestsPerEvictionRun());
        jedisPoolConfig.setTimeBetweenEvictionRunsMillis(redisProperties.getTimeBetweenEvictionRunsMillis());
        jedisPoolConfig.setTestOnBorrow(redisProperties.isTestOnBorrow());
        return jedisPoolConfig;
    }

    @Bean
    public JedisConnectionFactory jedisConnectionFactory(RedisClusterConfiguration redisClusterConfiguration, JedisPoolConfig jedisPoolConfig) {
        JedisConnectionFactory jedisConnectionFactory = new JedisConnectionFactory(redisClusterConfiguration, jedisPoolConfig);
        jedisConnectionFactory.setTimeout(redisProperties.getTimeout());
        return jedisConnectionFactory;
    }

    @Bean
    public RedisTemplate redisTemplate(JedisConnectionFactory jedisConnectionFactory) {
        RedisTemplate redisTemplate = new RedisTemplate();
        redisTemplate.setConnectionFactory(jedisConnectionFactory);
        redisTemplate.setKeySerializer(new StringRedisSerializer());
        redisTemplate.setValueSerializer(new StringRedisSerializer());
        return redisTemplate;
    }

    private List<RedisNode> getRedisNode() {
        List<String> nodes = redisProperties.getNodes();
        if (CommonUtils.isNotEmpty(nodes)) {
            List<RedisNode> redisNodes = nodes.stream().map(node -> {
                String[] ss = node.split(":");
                return new RedisNode(ss[0], Integer.valueOf(ss[1]));
            }).collect(Collectors.toList());
            return redisNodes;
        }
        return new ArrayList<>();
    }
}

redis鎖的實現:

@Component
public class RedisLock {

    private static final Logger log = LoggerFactory.getLogger(RedisLock.class);
    /* 預設鎖的有效時間30s */
    private static final int DEFAULT_LOCK_EXPIRSE_MILL_SECONDS = 30 * 1000;
    /* 預設請求鎖等待超時時間10s */
    private static final int DEFAULT_LOCK_WAIT_DEFAULT_TIME_OUT = 10 * 1000;
    /* 預設的輪詢獲取鎖的間隔時間 */
    private static final int DEFAULT_LOOP_WAIT_TIME = 150;
    /* 鎖的key字首 */
    private static final String LOCK_PREFIX = "LOCK_";

    /* 是否獲得鎖的標誌 */
    private boolean lock = false;
    /* 鎖的key */
    private String lockKey;
    /* 鎖的有效時間(ms) */
    private int lockExpirseTimeout;
    /* 請求鎖的阻塞時間(ms) */
    private int lockWaitTimeout;

    @Autowired
    private RedisTemplate<String, Object> redisTemplate;

    public boolean isLock() {
        return lock;
    }

    public void setLock(boolean lock) {
        this.lock = lock;
    }

    public String getLockKey() {
        return lockKey;
    }

    public void setLockKey(String lockKey) {
        this.lockKey = LOCK_PREFIX + lockKey;
    }

    public int getLockExpirseTimeout() {
        return lockExpirseTimeout;
    }

    public void setLockExpirseTimeout(int lockExpirseTimeout) {
        this.lockExpirseTimeout = lockExpirseTimeout;
    }

    public int getLockWaitTimeout() {
        return lockWaitTimeout;
    }

    public void setLockWaitTimeout(int lockWaitTimeout) {
        this.lockWaitTimeout = lockWaitTimeout;
    }

    public void setRedisTemplate(RedisTemplate<String, Object> redisTemplate) {
        this.redisTemplate = redisTemplate;
    }

    public RedisLock() {
    }

    public RedisLock(String lockKey, int lockExpirseTimeout, int lockWaitTimeout) {
        this.lockKey = LOCK_PREFIX + lockKey;
        this.lockExpirseTimeout = lockExpirseTimeout;
        this.lockWaitTimeout = lockWaitTimeout;
    }

    public RedisLock newInstance(String lockKey) {
        RedisLock redisLock = new RedisLock(lockKey, DEFAULT_LOCK_EXPIRSE_MILL_SECONDS, DEFAULT_LOCK_WAIT_DEFAULT_TIME_OUT);
        redisLock.setRedisTemplate(this.redisTemplate);
        return redisLock;
    }

    public RedisLock newInstance(String lockKey, int lockExpirseTimeout, int lockWaitTimeout) {
        if (lockExpirseTimeout == 0 || lockWaitTimeout == 0) {
            lockExpirseTimeout = DEFAULT_LOCK_EXPIRSE_MILL_SECONDS;
            lockWaitTimeout = DEFAULT_LOCK_WAIT_DEFAULT_TIME_OUT;
        }
        RedisLock redisLock = new RedisLock(lockKey, lockExpirseTimeout, lockWaitTimeout);
        redisLock.setRedisTemplate(this.redisTemplate);
        return redisLock;
    }

    public boolean setIfAbsent(String expirseTimeStr) {
        // setIfAbsent通過jedis的setNx實現
        return this.redisTemplate.opsForValue().setIfAbsent(this.lockKey, expirseTimeStr);
    }

    public String getAndSet(String expiresTimeStr) {
        // 獲取原來的值,並設定新的值,原子操作
        return (String) this.redisTemplate.opsForValue().getAndSet(this.lockKey, expiresTimeStr);
    }

    /**
     * 1、獲得當前系統時間,計算鎖的到期時間
     * 2、setNx操作,加鎖
     * 3、如果,加鎖成功,設定鎖的到期時間,返回true;取鎖失敗,取出當前鎖的value(到期時間)
     * 4、如果value不為空而且小於當前系統時間,進行getAndSet操作,重新設定value,並取出舊value;否則,等待間隔時間後,重複步驟2;
     * 5、如果步驟3和4取出的value一樣,加鎖成功,設定鎖的到期時間,返回true;否則,別人加鎖成功,恢復鎖的value,等待間隔時間後,重複步驟2。
     */
    public boolean lock() {
        log.info("{}-----嘗試獲取鎖...", Thread.currentThread().getName());
        int lockWaitMillSeconds = this.lockWaitTimeout;
        // key 的值,表示key的到期時間
        String redisValue = String.valueOf(System.currentTimeMillis() + this.lockExpirseTimeout);
        while (lockWaitMillSeconds > 0) {
            lock = setIfAbsent(redisValue);
            if (lock) {
                // 拿到鎖,設定鎖的有效期,這裡可能因為故障沒有被執行,鎖會一直存在,這時就需要value的有效期去判斷鎖是否失效
                this.redisTemplate.expire(this.lockKey, lockExpirseTimeout, TimeUnit.MILLISECONDS);
                log.info("{}-----獲得鎖", Thread.currentThread().getName());
                return lock;
            } else {
                // 鎖存在,判斷鎖有沒有過期
                String oldValue = (String) this.redisTemplate.opsForValue().get(this.lockKey);
                if (CommonUtils.isNotEmpty(oldValue) && Long.parseLong(oldValue) < System.currentTimeMillis()) {
                    // 鎖的到期時間小於當前時間,說明鎖已失效, 修改value,獲得鎖
                    String currentRedisValue = getAndSet(String.valueOf(lockExpirseTimeout + System.currentTimeMillis()));
                    // 如果兩個值不相等,說明有另外一個執行緒拿到了鎖,阻塞
                    if (currentRedisValue.equals(oldValue)) {
                        // 如果修改的鎖的有效期之前沒被其他執行緒修改,則獲得鎖, 設定鎖的超時時間
                        redisTemplate.expire(this.lockKey, lockExpirseTimeout, TimeUnit.MILLISECONDS);
                        log.info("{}-----獲得鎖", Thread.currentThread().getName());
                        this.lock = true;
                        return this.lock;
                    } else {
                        // 有另外一個執行緒獲得了這個超時的鎖,不修改鎖的value
                        redisTemplate.opsForValue().set(this.lockKey, currentRedisValue);
                    }
                }
            }
            // 減掉固定輪詢獲取鎖的間隔時間
            lockWaitMillSeconds -= DEFAULT_LOOP_WAIT_TIME;
            try {
                log.info("{}-----等待{}ms後,再嘗試獲取鎖...", Thread.currentThread().getName(), DEFAULT_LOOP_WAIT_TIME);
                // 取鎖失敗時,應該在隨機延時後進行重試,避免不同客戶端同時重試導致誰都無法拿到鎖的情況出現,也可以採用等待佇列的方式
                Thread.sleep(DEFAULT_LOOP_WAIT_TIME);
            } catch (InterruptedException e) {
                log.error("redis 同步鎖出現未知異常", e);
            }
        }
        log.info("{}-----請求鎖超時,獲得鎖失敗", Thread.currentThread().getName());
        return false;
    }

    public void unlock() {
        if (lock) {
            this.redisTemplate.delete(this.lockKey);
            this.lock = false;
        }
    }

}

加鎖過程:

1、獲得當前系統時間,計算鎖的到期時間
2、setNx操作,加鎖
3、如果,加鎖成功,設定鎖的到期時間,返回true;取鎖失敗,取出當前鎖的value(到期時間)
4、如果value不為空而且小於當前系統時間,進行getAndSet操作,重新設定value,並取出舊value;否則,等待間隔時間後,重複步驟2;
5、如果步驟3和4取出的value一樣,加鎖成功,設定鎖的到期時間,返回true;否則,別人加鎖成功,恢復鎖的value,等待間隔時間後,重複步驟2。

這裡設定鎖的到期時間,只是為了減少後面複雜邏輯的執行

測試:

測試類
@RunWith(SpringJUnit4ClassRunner.class)
@SpringBootTest(classes = Application.class)
public class TestRedis {
    private CountDownLatch countDownLatch = new CountDownLatch(2);

    @Test
    public void testRedisLock() throws InterruptedException {
        new Thread(new Runnable() {
            @Override
            public void run() {
                RedisLock lock = ((RedisLock) SpringContextUtil.getBean("redisLock")).newInstance("test");
                if (lock.lock()) {
                    System.out.println("work1獲得鎖");
                    System.out.println("work1 工作15s...");
                    try {
                        Thread.sleep(15000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    System.out.println("work1完成工作,釋放鎖");
                    lock.unlock();
                }
                countDownLatch.countDown();
            }
        },"work1").start();
        new Thread(new Runnable() {
            @Override
            public void run() {
                RedisLock lock = ((RedisLock) SpringContextUtil.getBean("redisLock")).newInstance("test");
                if (lock.lock()) {
                    System.out.println("work2獲得鎖");
                    System.out.println("work2 工作5s...");
                    try {
                        Thread.sleep(5000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    System.out.println("work2完成工作,釋放鎖");
                    lock.unlock();
                }
                countDownLatch.countDown();
            }
        }, "work2").start();
        // 等待兩個執行緒完成,才完成主執行緒
        countDownLatch.await();
    }
}

這裡在單元測試中起兩個執行緒work1和work2,work1模擬工作15s,work2模擬工作5s。這裡為了juint的主執行緒不會在兩個work執行緒完成工作之前就停止,用到了CountDownLatch,讓主執行緒在兩個work執行緒完成前等待。

上面測試有兩種結果,第一種,work2先拿到鎖,工作5s,work1等待(預設等待超時時間10s),等待過程中一值嘗試獲取鎖(預設間隔150ms),預設鎖的有效期30s,顯然,5s後work2完成工作釋放鎖,work1獲得鎖,work1和work2都正常完成了工作。結果如下:

2018-03-16 14:16:50,579 5576 [work1] INFO  c.s.component.redis.lock.RedisLock - work1-----嘗試獲取鎖...
2018-03-16 14:16:50,579 5576 [work2] INFO  c.s.component.redis.lock.RedisLock - work2-----嘗試獲取鎖...
2018-03-16 14:16:50,598 5595 [work1] INFO  c.s.component.redis.lock.RedisLock - work1-----等待150ms後,再嘗試獲取鎖...
2018-03-16 14:16:50,599 5596 [work2] INFO  c.s.component.redis.lock.RedisLock - work2-----獲得鎖
work2獲得鎖
work2 工作5s...
2018-03-16 14:16:50,748 5745 [work1] INFO  c.s.component.redis.lock.RedisLock - work1-----等待150ms後,再嘗試獲取鎖...
。。。省略重複嘗試獲取鎖日誌。。。
2018-03-16 14:16:55,571 10568 [work1] INFO  c.s.component.redis.lock.RedisLock - work1-----等待150ms後,再嘗試獲取鎖...
work2完成工作,釋放鎖
2018-03-16 14:16:55,722 10719 [work1] INFO  c.s.component.redis.lock.RedisLock - work1-----獲得鎖
work1獲得鎖
work1 工作15s...
work1完成工作,釋放鎖

第二種結果,work1先拿到鎖,工作15s,work2等待,這裡,因為work1的工作時間超過了預設的等待超時時間10s,所以work2在work1完成工作釋放鎖之前就因為等待超時而獲取鎖失敗,不能完成工作,結果如下:

2018-03-16 14:22:45,292 5448 [work1] INFO  c.s.component.redis.lock.RedisLock - work1-----嘗試獲取鎖...
2018-03-16 14:22:45,292 5448 [work2] INFO  c.s.component.redis.lock.RedisLock - work2-----嘗試獲取鎖...
2018-03-16 14:22:45,308 5464 [work2] INFO  c.s.component.redis.lock.RedisLock - work2-----等待150ms後,再嘗試獲取鎖...
2018-03-16 14:22:45,309 5465 [work1] INFO  c.s.component.redis.lock.RedisLock - work1-----獲得鎖
work1獲得鎖
work1 工作15s...
2018-03-16 14:22:45,459 5615 [work2] INFO  c.s.component.redis.lock.RedisLock - work2-----等待150ms後,再嘗試獲取鎖...
。。。省略重複嘗試獲取鎖日誌。。。
2018-03-16 14:22:55,251 15407 [work2] INFO  c.s.component.redis.lock.RedisLock - work2-----等待150ms後,再嘗試獲取鎖...
2018-03-16 14:22:55,401 15557 [work2] INFO  c.s.component.redis.lock.RedisLock - work2-----請求鎖超時,獲得鎖失敗
。。。這裡還要等待大概5s。。。
work1完成工作,釋放鎖

相關推薦

redis 叢集部署分散式實現

一、redis叢集的部署安裝redis確保安裝資料夾有redis-trib.rb檔案,通過rudy構建redis叢集安裝ruby環境配置好環境變數,gem install redis 安裝redis依賴詳細環境安裝教程:點選開啟連結叢集搭建      redis叢集最小包含3

叢集多JVM分散式實現

#### 基於資料庫表樂觀鎖 (基本廢棄) 要實現分散式鎖,最簡單的⽅方式可能就是直接建立⼀一張鎖表,然後通過操作該表中的資料來實現了了。 當我們要鎖住某個⽅法或資源時,我們就在該表中增加一條記錄,想要釋放鎖的時候就刪除這條記錄。 比如建立這樣一張資料庫表: ``` CREATE TABLE `methodL

基於 Redis分散式實現踩坑案例

關於分散式鎖的實現,目前常用的方案有以下三類:資料庫樂觀鎖;基於分散式快取實現的鎖服務,典型代表有 Redis 和基於 Redis 的 RedLock;基於分散式一致性演算法實現的鎖服務,典型代表有 ZooKeeper、Chubby 和 ETCD。本場 Chat 將介紹基於

【zookeeper】Apache curator的使用zk分散式實現

接上篇,本篇主要講Apache開源的curator的使用,有了curator,利用Java對zookeeper的操作變得極度便捷. 其實在學之前我也有個疑慮,我為啥要學curator,撇開漲薪這些外在的東西,就單技術層面來講,學curator能幫我做些什麼?這就不得不從zookeeper說起,上

dubbo 常用的基於redis分散式實現

  小弟本著先會用在學習原理的原則 先用了dubbo 現在在實際業務中 因為分散式專案做了叢集,需要用的分散式鎖,就用到了基於redis的分散式鎖,廢話不多說,先來程式碼: package com.tiancaibao.utils; import org.slf4j.Logger

jedisLock—redis分散式實現

一、使用分散式鎖要滿足的幾個條件: 系統是一個分散式系統(關鍵是分散式,單機的可以使用ReentrantLock或者synchronized程式碼塊來實現) 共享資源(各個系統訪問同一個資源,資源的載體可能是傳統關係型資料庫或者NoSQL) 同步訪問(即有很多個程序同事

Redis分散式 實現秒殺系統 SET命令實現

基於Redis命令:SET key valueNX EX max-lock-time   可適用於redis單機和redis叢集模式 1.SET命令是原子性操作,NX指令保證只要當key不存在時才會設定value 2.設定的value要有唯一性,來確保鎖不會被誤刪(

【原創】redis庫存操作,分散式的四種實現方式[連載一]--基於zookeeper實現分散式

一、背景 在電商系統中,庫存的概念一定是有的,例如配一些商品的庫存,做商品秒殺活動等,而由於庫存操作頻繁且要求原子性操作,所以絕大多數電商系統都用Redis來實現庫存的加減,最近公司專案做架構升級,以微服務的形式做分散式部署,對庫存的操作也單獨封裝為一個微服務,這樣在高併發情況下,加減庫存時,就會出現超賣等

【原創】redis庫存操作,分散式的四種實現方式[連載二]--基於Redisson實現分散式

一、redisson介紹 redisson實現了分散式和可擴充套件的java資料結構,支援的資料結構有:List, Set, Map, Queue, SortedSet, ConcureentMap, Lock, AtomicLong, CountDownLatch。並且是執行緒安全的,底層使用N

【連載】redis庫存操作,分散式的四種實現方式[三]--基於Redis watch機制實現分散式

一、redis的事務介紹 1、 Redis保證一個事務中的所有命令要麼都執行,要麼都不執行。如果在傳送EXEC命令前客戶端斷線了,則Redis會清空事務佇列,事務中的所有命令都不會執行。而一旦客戶端傳送了EXEC命令,所有的命令就都會被執行,即使此後客戶端斷線也沒關係,因為Redis中已經記錄了所有要執行的

Redis分散式實現簡單秒殺功能

這版秒殺只是解決瞬間訪問過高伺服器壓力過大,請求速度變慢,大大消耗伺服器效能的問題。 主要就是在高併發秒殺的場景下,很多人訪問時並沒有拿到鎖,所以直接跳過了。這樣就處理了多執行緒併發問題的同時也保證了伺服器的效能的穩定。 接下來我們使用redis的分散式鎖來進行枷鎖處理: 我們可以在進入下單的方法後將核

Redis叢集方案實現

之前做了一個Redis的叢集方案,跑了小半年,線上執行的很穩定差不多可以跟大家分享下經驗,前面寫了一篇文章 資料線上服務的一些探索經驗,可以做為背景閱讀應用我們的Redis叢集主要承擔了以下服務:1. 實時推薦2. 使用者畫像3. 誠信分值服務叢集狀況叢集峰值QPS 1W左右

Redis面試題分散式叢集

1. 使用Redis有哪些好處? (1) 速度快,因為資料存在記憶體中,類似於HashMap,HashMap的優勢就是查詢和操作的時間複雜度都是O(1) (2) 支援豐富資料型別,支援string,list,set,sorted set,hash (3)

架構師日記——基於redis分散式實現

很久之前有講過併發程式設計中的鎖 併發程式設計的鎖機制:synchronized和lock 。在單程序的系統中,當存在多個執行緒可以同時改變某個變數時,就需要對變數或程式碼塊做同步,使其在修改這種變數時能夠線性執行消除併發修改變數。而同步的本質是通過鎖來實現的。為了實現多個執行緒在一個時

redis學習之五】基於redis分散式實現

    在單個JVM中,我們可以很方便的用sychronized或者reentrantLock在資源競爭時進行加鎖,保證高併發下資料執行緒安全。但是若是分散式環境下,多個JVM同時對一個資源進行競爭時,我們該如何保證執行緒安全呢?分散式鎖便能實現我們的要求。   &n

redis分散式實現-面試題

1 加鎖(key自定義,value為uuid) Boolean setNX(byte[] key, byte[] value); 2 設定過期時間(key,timeout,unit) public Boolean expire(K key, final long t

基於redis分散式實現“秒殺”

最近在專案中遇到了類似“秒殺”的業務場景,在本篇部落格中,我將用一個非常簡單的demo,闡述實現所謂“秒殺”的基本思路。 業務場景 所謂秒殺,從業務角度看,是短時間內多個使用者“爭搶”資源,這裡的資源在大部分秒殺場景裡是商品;將業務抽象,技術角度看,秒殺就是

分散式(一):基於redis分散式實現

隨著業務越來越複雜,應用服務都會朝著分散式、叢集方向部署,而分散式CAP原則告訴我們,Consistency(一致性)、 Availability(可用性)、Partition tolerance(分割槽容錯性),三者不可得兼。很多場景中,需要使用分散式事務、分散式鎖等技術來

分散式實現--基於zookeeper和redis的兩種方案

通過實現Watch介面,實現process(WatchedEvent event)方法來實施監控,使CountDownLatch來完成監控,在等待鎖的時候使用CountDownLatch來計數,等到後進行countDown,停止等待,繼續執行。以下整體流程基本與上述描述流程一致,只是在監聽的時候使用的是Cou

SpringMVC+Redis實現分散式實現秒殺功能

1.實現分散式鎖的幾種方案 1.Redis實現 (推薦) 2.Zookeeper實現 3.資料庫實現 Redis實現分散式鎖 * * 在叢集等多伺服器中經常使用到同步處理一下業務,這是普通的事務是滿足不了業務需求,需要分散