1. 程式人生 > >Lettuce之RedisClusterClient使用以及源碼分析

Lettuce之RedisClusterClient使用以及源碼分析

一致性 mys sco hand pre eas red top future

Redis 集群的數據分片

redis集群並沒有使用一致性hash算法而引入了哈希槽概念,Redis 集群有16384個哈希槽,每個key通過CRC16校驗後對16384取模來決定放置哪個槽.集群的每個節點負責一部分hash槽.也就是說如果key是不變的對應的slot也是不變的

可以通過cluster info 命名查看

cluster info
cluster_state:ok
cluster_slots_assigned:16384
cluster_slots_ok:16384
cluster_slots_pfail:0
cluster_slots_fail:0
cluster_known_nodes:12

通過cluster nodes命令查看當前節點以及該節點分配的slot,如下圖可以發現當前redis集群有12個節點,每個節點大約管理1365個slot

xx.xxx.xxx.xx:6959> cluster nodes
45abb8663c0cdb25ed17c29521bf6fda98e913ea xx.xxx.xxx.xx:6961 master - 0 1529229636724 11 connected 13653-15018
e40080f32a3fb89e34b7622038ce490682428fdf xx.xxx.xxx.xx:6960 master - 0 1529229633723 10 connected 12288-13652
a749bba5614680dea9f47e3c8fe595aa8be71a2c xx.xxx.xxx.xx:6954 master - 0 1529229639230 4 connected 4096-5460
1096e2a8737401b66c7d4ee0addcb10d7ff14088 xx.xxx.xxx.xx:6952 master - 0 1529229636224 2 connected 1365-2730
fbc76f3481271241c1a89fabeb5139905e1ec2a6 xx.xxx.xxx.xx:6962 master - 0 1529229638230 12 connected 15019-16383
85601fa67820a5af0de0cc21d102d72575709ec6 xx.xxx.xxx.xx:6959 myself,master - 0 0 9 connected 10923-12287
c00d86999c98f97d697f3a2b33ba26fbf50e46eb xx.xxx.xxx.xx:6955 master - 0 1529229634724 5 connected 5461-6826
0b09a5c4c9e9158520389dd2672bd711d55085c6 xx.xxx.xxx.xx:6953 master - 0 1529229637227 3 connected 2731-4095
9f26d208fa8772449d5c322eb63786a1cf9937e0 xx.xxx.xxx.xx:6958 master - 0 1529229635224 8 connected 9557-10922
274294a88758fcb674e1a0292db0e36a66a0bf48 xx.xxx.xxx.xx:6951 master - 0 1529229634223 1 connected 0-1364
369780bdf56d483a0f0a92cb2baab786844051f3 xx.xxx.xxx.xx:6957 master - 0 1529229640232 7 connected 8192-9556
71ed0215356c664cc56d4579684e86a83dba3a92 xx.xxx.xxx.xx:6956 master - 0 1529229635724 6 connected 6827-8191

請求重定向

由於每個節點只負責部分slot,以及slot可能從一個節點遷移到另一節點,造成客戶端有可能會向錯誤的節點發起請求。因此需要有一種機制來對其進行發現和修正,這就是請求重定向。有兩種不同的重定向場景:

  • MOVED

聲明的是slot所有權的轉移,收到的客戶端需要更新其key-node映射關系

  • ASK

申明的是一種臨時的狀態,所有權還並沒有轉移,客戶端並不更新其映射關系。前面的加的ASKING命令也是申明其理解當前的這種臨時狀態

通過集群查詢數據key為test的值

xx.xxx.xxx.xx:6959> get test
(error) MOVED 6918 xx.xxx.xx.xxx:6956  

  此時返回的結果表示該key在6956這個實例上,通過這個實例可以獲取到緩存值

xx.xxx.xx.xxx:6956> get test
"cluster"

  通過上文的示例可以發現獲取緩存值的過程需要訪問cluster兩次,既然key到slot值的算法是已知的,如果可以通過key直接計算slot,在通過每個節點的管理的slot範圍就可以知道這個key對應哪個節點了,這樣不就可以一次獲取到了嗎?其實lettuce中就是這樣處理的.

Lettuce使用

  @Bean(name="clusterRedisURI")
    RedisURI clusterRedisURI(){
        return RedisURI.builder().withHost("xx.xx.xxx.xx").withPort(6954).build();
    }

    @Bean
    ClusterClientOptions clusterClientOptions(){
        return ClusterClientOptions.builder().autoReconnect(true).maxRedirects(1).build();
    }

    @Bean
    RedisClusterClient redisClusterClient(ClientResources clientResources, ClusterClientOptions clusterClientOptions, RedisURI clusterRedisURI){
        RedisClusterClient redisClusterClient= RedisClusterClient.create(clientResources,clusterRedisURI);
        redisClusterClient.setOptions(clusterClientOptions);
        return redisClusterClient;
    }
    /**
     * 集群模式
     */
    @Bean(destroyMethod = "close")
    StatefulRedisClusterConnection<String,String> statefulRedisClusterConnection(RedisClusterClient redisClusterClient){
        return  redisClusterClient.connect();
    }

  

Lettuce相關源碼

在創建連接時就會主動發現集群圖譜信息

  <K, V> StatefulRedisClusterConnectionImpl<K, V> connectClusterImpl(RedisCodec<K, V> codec) {
         //如果分區信息為null則初始化分區信息
        if (partitions == null) {
            initializePartitions();
        }
        //如果需要就激活拓撲刷新
        activateTopologyRefreshIfNeeded();

  

 protected void initializePartitions() {
        this.partitions = loadPartitions();
    }

  

  protected Partitions loadPartitions() {
        //獲取拓撲刷新信息,
        Iterable<RedisURI> topologyRefreshSource = getTopologyRefreshSource();

        String message = "Cannot retrieve initial cluster partitions from initial URIs " + topologyRefreshSource;
        try {
            //加載拓撲信息
            Map<RedisURI, Partitions> partitions = refresh.loadViews(topologyRefreshSource, useDynamicRefreshSources());

  

 public Map<RedisURI, Partitions> loadViews(Iterable<RedisURI> seed, boolean discovery) {

        //獲取超時時間,默認60秒
        long commandTimeoutNs = getCommandTimeoutNs(seed);

        Connections connections = null;
        try {
            //獲取所有種子連接
            connections = getConnections(seed).get(commandTimeoutNs, TimeUnit.NANOSECONDS);

            Requests requestedTopology = connections.requestTopology();
            Requests requestedClients = connections.requestClients();
            //獲取節點拓撲視圖
            NodeTopologyViews nodeSpecificViews = getNodeSpecificViews(requestedTopology, requestedClients, commandTimeoutNs);

            if (discovery) {//是否查找額外節點
                //獲取集群節點
                Set<RedisURI> allKnownUris = nodeSpecificViews.getClusterNodes();
                //排除種子節點,得到需要發現節點
                Set<RedisURI> discoveredNodes = difference(allKnownUris, toSet(seed));
                //如果需要發現節點不為空
                if (!discoveredNodes.isEmpty()) {
                    //需要發現節點連接
                    Connections discoveredConnections = getConnections(discoveredNodes).optionalGet(commandTimeoutNs,
                            TimeUnit.NANOSECONDS);
                    //合並連接
                    connections = connections.mergeWith(discoveredConnections);
                    //合並請求
                    requestedTopology = requestedTopology.mergeWith(discoveredConnections.requestTopology());
                    requestedClients = requestedClients.mergeWith(discoveredConnections.requestClients());
                    //獲取節點視圖
                    nodeSpecificViews = getNodeSpecificViews(requestedTopology, requestedClients, commandTimeoutNs);
                    //返回uri對應分區信息
                    return nodeSpecificViews.toMap();
                }
            }

            return nodeSpecificViews.toMap();
        } catch (InterruptedException e) {

            Thread.currentThread().interrupt();
            throw new RedisCommandInterruptedException(e);
        } finally {
            if (connections != null) {
                connections.close();
            }
        }
    }

   這樣在創建connection的時候就已經知道集群中的所有有效節點.根據之前的文章可以知道對於集群命令的處理是在ClusterDistributionChannelWriter中處理的.其中有一些信息在初始化writer的時候就初始化了

class ClusterDistributionChannelWriter implements RedisChannelWriter {
    //默認寫入器
    private final RedisChannelWriter defaultWriter;
    //集群事件監聽器
    private final ClusterEventListener clusterEventListener;
    private final int executionLimit;
    //集群連接提供器
    private ClusterConnectionProvider clusterConnectionProvider;
    //異步集群連接提供器
    private AsyncClusterConnectionProvider asyncClusterConnectionProvider;
    //是否關閉
    private boolean closed = false;
    //分區信息
    private volatile Partitions partitions;

  寫命令的處理如下,會根據key計算出slot,進而找到這個slot對應的node,直接訪問這個node,這樣可以有效減少訪問cluster次數

public <K, V, T> RedisCommand<K, V, T> write(RedisCommand<K, V, T> command) {

        LettuceAssert.notNull(command, "Command must not be null");
        //如果連接已經關閉則拋出異常
        if (closed) {
            throw new RedisException("Connection is closed");
        }
        //如果是集群命令且命令沒有處理完畢
        if (command instanceof ClusterCommand && !command.isDone()) {
            //類型轉換, 轉換為ClusterCommand
            ClusterCommand<K, V, T> clusterCommand = (ClusterCommand<K, V, T>) command;
            if (clusterCommand.isMoved() || clusterCommand.isAsk()) {

                HostAndPort target;
                boolean asking;
                //如果集群命令已經遷移,此時通過ClusterCommand中到重試操作進行到此
                if (clusterCommand.isMoved()) {
                    //獲取命令遷移目標節點
                    target = getMoveTarget(clusterCommand.getError());
                    //觸發遷移事件
                    clusterEventListener.onMovedRedirection();
                    asking = false;
                } else {//如果是ask
                    target = getAskTarget(clusterCommand.getError());
                    asking = true;
                    clusterEventListener.onAskRedirection();
                }

                command.getOutput().setError((String) null);
                //連接遷移後的目標節點
                CompletableFuture<StatefulRedisConnection<K, V>> connectFuture = asyncClusterConnectionProvider
                        .getConnectionAsync(ClusterConnectionProvider.Intent.WRITE, target.getHostText(), target.getPort());
                //成功建立連接,則向該節點發送命令
                if (isSuccessfullyCompleted(connectFuture)) {
                    writeCommand(command, asking, connectFuture.join(), null);
                } else {
                    connectFuture.whenComplete((connection, throwable) -> writeCommand(command, asking, connection, throwable));
                }

                return command;
            }
        }
        //不是集群命令就是RedisCommand,第一個請求命令就是非ClusterCommand
         //將當前命令包裝為集群命令
        ClusterCommand<K, V, T> commandToSend = getCommandToSend(command);
        //獲取命令參數
        CommandArgs<K, V> args = command.getArgs();

        //排除集群路由的cluster命令
        if (args != null && !CommandType.CLIENT.equals(commandToSend.getType())) {
            //獲取第一個編碼後的key
            ByteBuffer encodedKey = args.getFirstEncodedKey();
            //如果encodedKey不為null
            if (encodedKey != null) {
                //獲取slot值
                int hash = getSlot(encodedKey);
                //根據命令類型獲取命令意圖 是讀還是寫
                ClusterConnectionProvider.Intent intent = getIntent(command.getType());
                //根據意圖和slot獲取連接
                CompletableFuture<StatefulRedisConnection<K, V>> connectFuture = ((AsyncClusterConnectionProvider) clusterConnectionProvider)
                        .getConnectionAsync(intent, hash);
                //如果成功獲取連接
                if (isSuccessfullyCompleted(connectFuture)) {
                    writeCommand(commandToSend, false, connectFuture.join(), null);
                } else {//如果連接尚未處理完,或有異常,則添加完成處理器
                    connectFuture.whenComplete((connection, throwable) -> writeCommand(commandToSend, false, connection,
                            throwable));
                }

                return commandToSend;
            }
        }

        writeCommand(commandToSend, defaultWriter);

        return commandToSend;
    }

  但是如果計算出的slot因為集群擴展導致這個slot已經不在這個節點上lettuce是如何處理的呢?通過查閱ClusterCommand源碼可以發現在complete方法中對於該問題進行了處理;如果響應是MOVED則會繼續訪問MOVED目標節點,這個重定向的此時可以指定的,默認為5次,通過上文的配置可以發現,在配置中只允許一次重定向

 @Override
    public void complete() {
        //如果響應是MOVED或ASK
        if (isMoved() || isAsk()) {
            //如果最大重定向次數大於當前重定向次數則可以進行重定向
            boolean retryCommand = maxRedirections > redirections;
            //重定向次數自增
            redirections++;

            if (retryCommand) {
                try {
                    //重定向
                    retry.write(this);
                } catch (Exception e) {
                    completeExceptionally(e);
                }
                return;
            }
        }
        super.complete();
        completed = true;
    }

  如果是ask向重定向目標發送命令前需要同步發送asking

 private static <K, V> void writeCommand(RedisCommand<K, V, ?> command, boolean asking,
            StatefulRedisConnection<K, V> connection, Throwable throwable) {

        if (throwable != null) {
            command.completeExceptionally(throwable);
            return;
        }

        try {
            //如果需要asking則發送asking
            if (asking) {
                connection.async().asking();
            }
            //發送命令
            writeCommand(command, ((RedisChannelHandler<K, V>) connection).getChannelWriter());
        } catch (Exception e) {
            command.completeExceptionally(e);
        }
    }

  

  

Lettuce之RedisClusterClient使用以及源碼分析