
Redis (12): How Redis Request Forwarding Is Implemented

  There are generally two reasons to forward a request: 1. the server that received it cannot handle it itself and must pass it to the server that can; 2. a routing service is used for load balancing and picks a target instance to forward the request to.

  In cluster mode, a request may land on any Redis server in the cluster. However, not every server actually processes it; only the instance that owns the key according to the Redis slot rules handles the real request.

  This creates a situation where a request arrives at a Redis instance that should not handle it, and the request has to be forwarded somehow.

  So how is this forwarding done?

1. How is command forwarding implemented in cluster mode?

// server.c: the unified command-processing entry point detects cluster mode and handles it here
int processCommand(client *c) {
    ...
    /* If cluster is enabled perform the cluster redirection here.
     * However we don't perform the redirection if:
     * 1) The sender of this command is our master.
     * 2) The command has no key arguments. */
    // In cluster mode, locate the Redis node responsible for the hash slot
    if (server.cluster_enabled &&
        !(c->flags & CLIENT_MASTER) &&
        !(c->flags & CLIENT_LUA &&
          server.lua_caller->flags & CLIENT_MASTER) &&
        !(c->cmd->getkeys_proc == NULL && c->cmd->firstkey == 0))
    {
        int hashslot;

        if (server.cluster->state != CLUSTER_OK) {
            flagTransaction(c);
            clusterRedirectClient(c,NULL,0,CLUSTER_REDIR_DOWN_STATE);
            return C_OK;
        } else {
            int error_code;
            // Look up the Redis node that should serve this command
            clusterNode *n = getNodeByQuery(c,c->cmd,c->argv,c->argc,&hashslot,&error_code);
            // Unless the data belongs to this node, reply that the data lives elsewhere and let the client find the right node
            // So a Redis node never forwards the data itself; it only tells the client to look again
            // The client takes the returned information and re-issues the request to the right node
            if (n == NULL || n != server.cluster->myself) {
                flagTransaction(c);
                clusterRedirectClient(c,n,hashslot,error_code);
                return C_OK;
            }
        }
    }
    ...
}

// cluster.c: find the Redis node that owns the command's key(s)
/* Return the pointer to the cluster node that is able to serve the command.
 * For the function to succeed the command should only target either:
 *
 * 1) A single key (even multiple times like RPOPLPUSH mylist mylist).
 * 2) Multiple keys in the same hash slot, while the slot is stable (no
 *    resharding in progress).
 *
 * On success the function returns the node that is able to serve the request.
 * If the node is not 'myself' a redirection must be performed. The kind of
 * redirection is specified setting the integer passed by reference
 * 'error_code', which will be set to CLUSTER_REDIR_ASK or
 * CLUSTER_REDIR_MOVED.
 *
 * When the node is 'myself' 'error_code' is set to CLUSTER_REDIR_NONE.
 *
 * If the command fails NULL is returned, and the reason of the failure is
 * provided via 'error_code', which will be set to:
 *
 * CLUSTER_REDIR_CROSS_SLOT if the request contains multiple keys that
 * don't belong to the same hash slot.
 *
 * CLUSTER_REDIR_UNSTABLE if the request contains multiple keys
 * belonging to the same slot, but the slot is not stable (in migration or
 * importing state, likely because a resharding is in progress).
 *
 * CLUSTER_REDIR_DOWN_UNBOUND if the request addresses a slot which is
 * not bound to any node. In this case the cluster global state should be
 * already "down" but it is fragile to rely on the update of the global state,
 * so we also handle it here. */
clusterNode *getNodeByQuery(client *c, struct redisCommand *cmd, robj **argv, int argc, int *hashslot, int *error_code) {
    clusterNode *n = NULL;
    robj *firstkey = NULL;
    int multiple_keys = 0;
    multiState *ms, _ms;
    multiCmd mc;
    int i, slot = 0, migrating_slot = 0, importing_slot = 0, missing_keys = 0;

    /* Set error code optimistically for the base case. */
    if (error_code) *error_code = CLUSTER_REDIR_NONE;

    /* We handle all the cases as if they were EXEC commands, so we have
     * a common code path for everything */
    if (cmd->proc == execCommand) {
        /* If CLIENT_MULTI flag is not set EXEC is just going to return an
         * error. */
        if (!(c->flags & CLIENT_MULTI)) return myself;
        ms = &c->mstate;
    } else {
        /* In order to have a single codepath create a fake Multi State
         * structure if the client is not in MULTI/EXEC state, this way
         * we have a single codepath below. */
        ms = &_ms;
        _ms.commands = &mc;
        _ms.count = 1;
        mc.argv = argv;
        mc.argc = argc;
        mc.cmd = cmd;
    }

    /* Check that all the keys are in the same hash slot, and obtain this
     * slot and the node associated. */
    for (i = 0; i < ms->count; i++) {
        struct redisCommand *mcmd;
        robj **margv;
        int margc, *keyindex, numkeys, j;

        mcmd = ms->commands[i].cmd;
        margc = ms->commands[i].argc;
        margv = ms->commands[i].argv;
        // Get all key indexes so each key can be fetched in turn below
        keyindex = getKeysFromCommand(mcmd,margv,margc,&numkeys);
        for (j = 0; j < numkeys; j++) {
            robj *thiskey = margv[keyindex[j]];
            // Compute the hash slot using the CRC16 algorithm
            int thisslot = keyHashSlot((char*)thiskey->ptr,
                                       sdslen(thiskey->ptr));

            if (firstkey == NULL) {
                /* This is the first key we see. Check what is the slot
                 * and node. */
                firstkey = thiskey;
                slot = thisslot;
                n = server.cluster->slots[slot];

                /* Error: If a slot is not served, we are in "cluster down"
                 * state. However the state is yet to be updated, so this was
                 * not trapped earlier in processCommand(). Report the same
                 * error to the client. */
                if (n == NULL) {
                    getKeysFreeResult(keyindex);
                    if (error_code)
                        *error_code = CLUSTER_REDIR_DOWN_UNBOUND;
                    return NULL;
                }

                /* If we are migrating or importing this slot, we need to check
                 * if we have all the keys in the request (the only way we
                 * can safely serve the request, otherwise we return a TRYAGAIN
                 * error). To do so we set the importing/migrating state and
                 * increment a counter for every missing key. */
                if (n == myself &&
                    server.cluster->migrating_slots_to[slot] != NULL)
                {
                    migrating_slot = 1;
                } else if (server.cluster->importing_slots_from[slot] != NULL) {
                    importing_slot = 1;
                }
            } else {
                /* If it is not the first key, make sure it is exactly
                 * the same key as the first we saw. */
                if (!equalStringObjects(firstkey,thiskey)) {
                    if (slot != thisslot) {
                        /* Error: multiple keys from different slots. */
                        getKeysFreeResult(keyindex);
                        if (error_code)
                            *error_code = CLUSTER_REDIR_CROSS_SLOT;
                        return NULL;
                    } else {
                        /* Flag this request as one with multiple different
                         * keys. */
                        multiple_keys = 1;
                    }
                }
            }

            /* Migrating / Importing slot? Count keys we don't have. */
            // Look the key up in db 0; if it is missing, count it as a missing key
            if ((migrating_slot || importing_slot) &&
                lookupKeyRead(&server.db[0],thiskey) == NULL)
            {
                missing_keys++;
            }
        }
        getKeysFreeResult(keyindex);
    }

    /* No key at all in command? then we can serve the request
     * without redirections or errors. */
    if (n == NULL) return myself;

    /* Return the hashslot by reference. */
    if (hashslot) *hashslot = slot;

    /* MIGRATE always works in the context of the local node if the slot
     * is open (migrating or importing state). We need to be able to freely
     * move keys among instances in this case. */
    if ((migrating_slot || importing_slot) && cmd->proc == migrateCommand)
        return myself;

    /* If we don't have all the keys and we are migrating the slot, send
     * an ASK redirection. */
    if (migrating_slot && missing_keys) {
        if (error_code) *error_code = CLUSTER_REDIR_ASK;
        return server.cluster->migrating_slots_to[slot];
    }

    /* If we are receiving the slot, and the client correctly flagged the
     * request as "ASKING", we can serve the request. However if the request
     * involves multiple keys and we don't have them all, the only option is
     * to send a TRYAGAIN error. */
    if (importing_slot &&
        (c->flags & CLIENT_ASKING || cmd->flags & CMD_ASKING))
    {
        if (multiple_keys && missing_keys) {
            if (error_code) *error_code = CLUSTER_REDIR_UNSTABLE;
            return NULL;
        } else {
            return myself;
        }
    }

    /* Handle the read-only client case reading from a slave: if this
     * node is a slave and the request is about an hash slot our master
     * is serving, we can reply without redirection. */
    if (c->flags & CLIENT_READONLY &&
        cmd->flags & CMD_READONLY &&
        nodeIsSlave(myself) &&
        myself->slaveof == n)
    {
        return myself;
    }

    /* Base case: just return the right node. However if this node is not
 * myself, set error_code to MOVED since we need to issue a redirection. */
    if (n != myself && error_code) *error_code = CLUSTER_REDIR_MOVED;
    return n;
}
// cluster.c: compute the hash slot using the CRC16 algorithm
// Special hash-tag syntax: {part_that_is_hashed}rest_of_key_not_hashed
/* We have 16384 hash slots. The hash slot of a given key is obtained
 * as the least significant 14 bits of the crc16 of the key.
 *
 * However if the key contains the {...} pattern, only the part between
 * { and } is hashed. This may be useful in the future to force certain
 * keys to be in the same node (assuming no resharding is in progress). */
unsigned int keyHashSlot(char *key, int keylen) {
    int s, e; /* start-end indexes of { and } */

    for (s = 0; s < keylen; s++)
        if (key[s] == '{') break;

    /* No '{' ? Hash the whole key. This is the base case. */
    if (s == keylen) return crc16(key,keylen) & 0x3FFF;

    /* '{' found? Check if we have the corresponding '}'. */
    for (e = s+1; e < keylen; e++)
        if (key[e] == '}') break;

    /* No '}' or nothing between {} ? Hash the whole key. */
    if (e == keylen || e == s+1) return crc16(key,keylen) & 0x3FFF;

    /* If we are here there is both a { and a } on its right. Hash
     * what is in the middle between { and }. */
    return crc16(key+s+1,e-s-1) & 0x3FFF;
}


// Based on the error code, tell the client that the data is not served by this node
/* Send the client the right redirection code, according to error_code
 * that should be set to one of CLUSTER_REDIR_* macros.
 *
 * If CLUSTER_REDIR_ASK or CLUSTER_REDIR_MOVED error codes
 * are used, then the node 'n' should not be NULL, but should be the
 * node we want to mention in the redirection. Moreover hashslot should
 * be set to the hash slot that caused the redirection. */
void clusterRedirectClient(client *c, clusterNode *n, int hashslot, int error_code) {
    if (error_code == CLUSTER_REDIR_CROSS_SLOT) {
        addReplySds(c,sdsnew("-CROSSSLOT Keys in request don't hash to the same slot\r\n"));
    } else if (error_code == CLUSTER_REDIR_UNSTABLE) {
        /* The request spans multiple keys in the same slot,
         * but the slot is not "stable" currently as there is
         * a migration or import in progress. */
        addReplySds(c,sdsnew("-TRYAGAIN Multiple keys request during rehashing of slot\r\n"));
    } else if (error_code == CLUSTER_REDIR_DOWN_STATE) {
        addReplySds(c,sdsnew("-CLUSTERDOWN The cluster is down\r\n"));
    } else if (error_code == CLUSTER_REDIR_DOWN_UNBOUND) {
        addReplySds(c,sdsnew("-CLUSTERDOWN Hash slot not served\r\n"));
    } else if (error_code == CLUSTER_REDIR_MOVED ||
               error_code == CLUSTER_REDIR_ASK)
    {
        // When the owning node is not this one and the right node is known, reply with its address
        // ASK means the slot is being migrated and it is unknown when the migration will finish, so the redirection is temporary and the client should not refresh its slot cache
        // A MOVED redirection is (relatively) permanent, so the client should refresh its slot cache
        addReplySds(c,sdscatprintf(sdsempty(),
            "-%s %d %s:%d\r\n",
            (error_code == CLUSTER_REDIR_ASK) ? "ASK" : "MOVED",
            hashslot,n->ip,n->port));
    } else {
        serverPanic("getNodeByQuery() unknown error.");
    }
}

  So in Redis cluster mode, request forwarding is not done by the Redis server relaying the request itself; instead, the server replies with a redirection instruction and the client re-issues the request to the target node. That is how command "forwarding" is achieved.
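
  For example, a client that sends GET key1 to the wrong node might receive a reply such as -MOVED 9189 192.168.1.103:8000 (the slot number and address here are purely illustrative) and is then expected to re-issue the command against that address.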

  In fact, this redirection should only happen when the cluster topology changes, for example when nodes are added or removed and Redis rebalances its data. In the normal case, deciding which node a given key should be sent to is entirely the client's responsibility. That is also the strength of the cluster: each data node only handles its own range of data. The client therefore needs to cache the server's slot layout (the slot placement can be obtained with the cluster slots command) so that it can send each operation to the correct node.

 

2. How is command forwarding implemented in master-slave mode?

  In master-slave mode, only the master accepts write requests, while slaves just replicate the master's data. However, when we do read/write splitting, slaves can take read traffic. So if a write request lands on a slave, does that involve request forwarding too? Let's take a look:

// The master/slave check is also done in the unified processCommand()
int processCommand(client *c) {
    ...
    /* Don't accept write commands if this is a read only slave. But
     * accept write commands if this is our master. */
    // A read-only slave only accepts reads; write commands are rejected with an error right away
    if (server.masterhost && server.repl_slave_ro &&
        // except for commands coming from our master, which are replication traffic
        !(c->flags & CLIENT_MASTER) &&
        c->cmd->flags & CMD_WRITE)
    {
        // -READONLY You can't write against a read only slave.
        addReply(c, shared.roslaveerr);
        return C_OK;
    }
    ...
    return C_OK;
}

  So in master-slave mode the server does no forwarding at all. To implement read/write splitting, the client has to handle it itself: it must locate the master and send writes there, while reads can be load-balanced across slaves. This is exactly the job of many database middleware products.
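
  As an illustration only, a bare-bones client-side read/write splitter might look like the following sketch (the addresses are hypothetical, and a real middleware would add connection pooling, slave selection and failover):

import redis.clients.jedis.Jedis;

// Hypothetical minimal read/write splitter: writes go to the master, reads to a slave.
public class ReadWriteSplitter {
    private final Jedis master = new Jedis("192.168.1.103", 7000);  // assumed master address
    private final Jedis slave = new Jedis("192.168.1.103", 7001);   // assumed slave address

    public String set(String key, String value) {
        return master.set(key, value);   // write commands must go to the master
    }

    public String get(String key) {
        return slave.get(key);           // read commands can be served by a slave
    }
}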

 

3. How do you use a Redis cluster?

  A Redis cluster essentially provides sharded data storage (of course, a lot of work goes into making that possible), but locating the data is up to the client. So let's take Jedis as the client and see how a client actually makes use of the cluster. The test case is as follows:

    @Test
    public void testCluster() throws Exception {
        // Set of cluster seed nodes
        Set<HostAndPort> hostAndPortsSet = new HashSet<HostAndPort>();
        // add the nodes
        hostAndPortsSet.add(new HostAndPort("192.168.1.103", 7000));
        hostAndPortsSet.add(new HostAndPort("192.168.1.103", 7001));
        hostAndPortsSet.add(new HostAndPort("192.168.1.103", 8000));
        hostAndPortsSet.add(new HostAndPort("192.168.1.103", 8001));
        hostAndPortsSet.add(new HostAndPort("192.168.1.103", 9000));
        hostAndPortsSet.add(new HostAndPort("192.168.1.103", 9001));

        // Jedis connection pool configuration
        JedisPoolConfig jedisPoolConfig = new JedisPoolConfig();
        // max idle connections, default 8
        jedisPoolConfig.setMaxIdle(5);
        // max total connections, default 8
        jedisPoolConfig.setMaxTotal(10);
        // min idle connections, default 0
        jedisPoolConfig.setMinIdle(0);
        // max milliseconds to wait for a connection when the pool is exhausted (with blockWhenExhausted set); throws on timeout; a negative value blocks indefinitely; default -1
        jedisPoolConfig.setMaxWaitMillis(2000);
        // validate each borrowed connection with validateObject
        jedisPoolConfig.setTestOnBorrow(true);
        // JedisCluster uses a JedisSlotBasedConnectionHandler internally, i.e. it takes care of slot location
        JedisCluster jedis = new JedisCluster(hostAndPortsSet, jedisPoolConfig);
        String key = "key1";
        String value = "Value1";
        jedis.set(key, value);
        System.out.println("set a value to Redis over. " + key + "->" + value);
        value = jedis.get("key1");
        System.out.println("get a value from Redis over. " + key + "->" + value);
        jedis.close();
    }

  That is how Jedis accesses a Redis cluster; once the SDK wraps things up, usage is always simple. The main point is to go through JedisCluster. The big difference from accessing a standalone Redis lies in how a data key is located, which we can look at in detail.

  The class inheritance diagram of JedisCluster is shown below:

  [JedisCluster class inheritance diagram]

  For comparison, here is the class inheritance diagram of Jedis:

  [Jedis class inheritance diagram]

  The interfaces they both implement are: BasicCommands, Closeable, JedisCommands.

  Clearly, operating Redis in cluster mode differs from plain Redis in many ways. However, all we want to explore here is how a key is located, so a single set/get is enough.

    // When JedisCluster is initialized, it loads slot information into a local cache
    // redis.clients.jedis.JedisClusterConnectionHandler#JedisClusterConnectionHandler
  public JedisClusterConnectionHandler(Set<HostAndPort> nodes,
                                       final GenericObjectPoolConfig poolConfig, int connectionTimeout, int soTimeout, String password) {
    this.cache = new JedisClusterInfoCache(poolConfig, connectionTimeout, soTimeout, password);
    // Initializing JedisCluster triggers an initial pull of slot information for later use
    initializeSlotsCache(nodes, poolConfig, password);
  }
  private void initializeSlotsCache(Set<HostAndPort> startNodes, GenericObjectPoolConfig poolConfig, String password) {
    for (HostAndPort hostAndPort : startNodes) {
      Jedis jedis = new Jedis(hostAndPort.getHost(), hostAndPort.getPort());
      if (password != null) {
        jedis.auth(password);
      }
      try {
        // It is enough for any single node to respond successfully
        // Iterating over the nodes is for availability: even if some nodes are down we can still get the information
        cache.discoverClusterNodesAndSlots(jedis);
        break;
      } catch (JedisConnectionException e) {
        // try next nodes
      } finally {
        if (jedis != null) {
          jedis.close();
        }
      }
    }
  }

    // The set operation wraps the Jedis call in a JedisClusterCommand
    // redis.clients.jedis.JedisCluster#set(java.lang.String, java.lang.String)
  @Override
  public String set(final String key, final String value) {
      // connectionHandler is an instance of JedisSlotBasedConnectionHandler
      // default number of attempts: 5
    return new JedisClusterCommand<String>(connectionHandler, maxAttempts) {
      @Override
      public String execute(Jedis connection) {
        return connection.set(key, value);
      }
    }.run(key);
  }
  // redis.clients.jedis.JedisClusterCommand#run(java.lang.String)
  public T run(String key) {
    if (key == null) {
      throw new JedisClusterException("No way to dispatch this command to Redis Cluster.");
    }

    return runWithRetries(SafeEncoder.encode(key), this.maxAttempts, false, false);
  }
  // Access the Redis node with retries; retries happen when the data is not on the node we contacted, the node is migrating its slot, or the node is unreachable
  // redis.clients.jedis.JedisClusterCommand#runWithRetries
  private T runWithRetries(byte[] key, int attempts, boolean tryRandomNode, boolean asking) {
    if (attempts <= 0) {
      throw new JedisClusterMaxRedirectionsException("Too many Cluster redirections?");
    }

    Jedis connection = null;
    try {

      if (asking) {
        // TODO: Pipeline asking with the original command to make it
        // faster....
        connection = askConnection.get();
        connection.asking();

        // if asking success, reset asking flag
        asking = false;
      } else {
        if (tryRandomNode) {
          connection = connectionHandler.getConnection();
        } else {
            // Call connectionHandler.getConnectionFromSlot directly to get a connection to the right node
            // The slot computed here uses the same CRC16 & 0x3FFF scheme as the server, so both sides reach the same conclusion
          connection = connectionHandler.getConnectionFromSlot(JedisClusterCRC16.getSlot(key));
        }
      }

      return execute(connection);

    } catch (JedisNoReachableClusterNodeException jnrcne) {
      throw jnrcne;
    } catch (JedisConnectionException jce) {
      // release current connection before recursion
      releaseConnection(connection);
      connection = null;

      if (attempts <= 1) {
        //We need this because if node is not reachable anymore - we need to finally initiate slots renewing,
        //or we can stuck with cluster state without one node in opposite case.
        //But now if maxAttempts = 1 or 2 we will do it too often. For each time-outed request.
        //TODO make tracking of successful/unsuccessful operations for node - do renewing only
        //if there were no successful responses from this node last few seconds
        this.connectionHandler.renewSlotCache();

        //no more redirections left, throw original exception, not JedisClusterMaxRedirectionsException, because it's not MOVED situation
        throw jce;
      }
        // Connection error: retry with the remaining attempts
      return runWithRetries(key, attempts - 1, tryRandomNode, asking);
    } catch (JedisRedirectionException jre) {
      // if MOVED redirection occurred,
      if (jre instanceof JedisMovedDataException) {
        // it rebuilds cluster's slot cache
        // recommended by Redis cluster specification
        this.connectionHandler.renewSlotCache(connection);
      }

      // release current connection before recursion or renewing
      releaseConnection(connection);
      connection = null;

      if (jre instanceof JedisAskDataException) {
        asking = true;
        askConnection.set(this.connectionHandler.getConnectionFromNode(jre.getTargetNode()));
      } else if (jre instanceof JedisMovedDataException) {
      } else {
        throw new JedisClusterException(jre);
      }
        // After a MOVED/ASK reply (and, for MOVED, a slot-cache refresh), retry the request
      return runWithRetries(key, attempts - 1, false, asking);
    } finally {
      releaseConnection(connection);
    }
  }
  // Compute the hash slot value
  // redis.clients.util.JedisClusterCRC16#getSlot(byte[])
  public static int getSlot(byte[] key) {
    int s = -1;
    int e = -1;
    boolean sFound = false;
    for (int i = 0; i < key.length; i++) {
      if (key[i] == '{' && !sFound) {
        s = i;
        sFound = true;
      }
      if (key[i] == '}' && sFound) {
        e = i;
        break;
      }
    }
    if (s > -1 && e > -1 && e != s + 1) {
      return getCRC16(key, s + 1, e) & (16384 - 1);
    }
    return getCRC16(key) & (16384 - 1);
  }
  // Get the Redis connection for a given hash slot
  @Override
  public Jedis getConnectionFromSlot(int slot) {
      // First look up the slot's connection pool in the local cache (empty if not yet populated)
    JedisPool connectionPool = cache.getSlotPool(slot);
    if (connectionPool != null) {
      // It can't guaranteed to get valid connection because of node
      // assignment
      return connectionPool.getResource();
    } else {
        // Refresh the slot cache: roughly, send cluster slots, get the slot distribution and store it in JedisClusterInfoCache
      renewSlotCache(); //It's abnormal situation for cluster mode, that we have just nothing for slot, try to rediscover state
      connectionPool = cache.getSlotPool(slot);
      // If it is still not found, fall back to a random connection
      // When that random node is queried, the server may reply with the correct node's location
      if (connectionPool != null) {
        return connectionPool.getResource();
      } else {
        //no choice, fallback to new connection to random node
        return getConnection();
      }
    }
  }
    // redis.clients.jedis.JedisClusterConnectionHandler#renewSlotCache()
  public void renewSlotCache() {
    cache.renewClusterSlots(null);
  }
  // redis.clients.jedis.JedisClusterInfoCache#renewClusterSlots
  public void renewClusterSlots(Jedis jedis) {
    //If rediscovering is already in process - no need to start one more same rediscovering, just return
    if (!rediscovering) {
      try {
        w.lock();
        rediscovering = true;

        if (jedis != null) {
          try {
            discoverClusterSlots(jedis);
            return;
          } catch (JedisException e) {
            //try nodes from all pools
          }
        }
        // Iterate over the cluster nodes until one responds successfully
        for (JedisPool jp : getShuffledNodesPool()) {
          try {
            jedis = jp.getResource();
            discoverClusterSlots(jedis);
            return;
          } catch (JedisConnectionException e) {
            // try next nodes
          } finally {
            if (jedis != null) {
              jedis.close();
            }
          }
        }
      } finally {
        rediscovering = false;
        w.unlock();
      }
    }
  }
  
  private void discoverClusterSlots(Jedis jedis) {
    // Send the cluster slots command to get the slot distribution
    List<Object> slots = jedis.clusterSlots();
    this.slots.clear();

    for (Object slotInfoObj : slots) {
      List<Object> slotInfo = (List<Object>) slotInfoObj;

    /* Format: 1) 1) start slot
     *            2) end slot
     *            3) 1) master IP
     *               2) master port
     *               3) node ID
     *            4) 1) replica IP
     *               2) replica port
     *               3) node ID
     *           ... continued until done
     */
      if (slotInfo.size() <= MASTER_NODE_INDEX) {
        continue;
      }

      List<Integer> slotNums = getAssignedSlotArray(slotInfo);

      // hostInfos
      // the third element holds the master's info
      List<Object> hostInfos = (List<Object>) slotInfo.get(MASTER_NODE_INDEX);
      if (hostInfos.isEmpty()) {
        continue;
      }

      // at this time, we just use master, discard slave information
      HostAndPort targetNode = generateHostAndPort(hostInfos);
      // only the master's info is stored
      assignSlotsToNode(slotNums, targetNode);
    }
  }

  private List<Integer> getAssignedSlotArray(List<Object> slotInfo) {
    List<Integer> slotNums = new ArrayList<Integer>();
    // Add each slot in the owned range to the list
    // e.g. 0 ~ 5999
    for (int slot = ((Long) slotInfo.get(0)).intValue(); slot <= ((Long) slotInfo.get(1))
        .intValue(); slot++) {
      slotNums.add(slot);
    }
    return slotNums;
  }
  // Put all of the given slots under targetNode so they can be looked up later
  // redis.clients.jedis.JedisClusterInfoCache#assignSlotsToNode
  public void assignSlotsToNode(List<Integer> targetSlots, HostAndPort targetNode) {
    // The lock here is the writeLock of a ReentrantReadWriteLock
    w.lock();
    try {
        // create (or reuse) the connection pool for this node
      JedisPool targetPool = setupNodeIfNotExist(targetNode);
      // Point every slot in the range at targetNode
      // In the normal case the slots map ends up with 16384 entries
      for (Integer slot : targetSlots) {
        // slots = new HashMap<Integer, JedisPool>();
        slots.put(slot, targetPool);
      }
    } finally {
      w.unlock();
    }
  }
  // redis.clients.jedis.JedisClusterInfoCache#setupNodeIfNotExist(redis.clients.jedis.HostAndPort)
  public JedisPool setupNodeIfNotExist(HostAndPort node) {
    w.lock();
    try {
      String nodeKey = getNodeKey(node);
      JedisPool existingPool = nodes.get(nodeKey);
      if (existingPool != null) return existingPool;

      JedisPool nodePool = new JedisPool(poolConfig, node.getHost(), node.getPort(),
          connectionTimeout, soTimeout, password, 0, null, false, null, null, null);
      nodes.put(nodeKey, nodePool);
      return nodePool;
    } finally {
      w.unlock();
    }
  }
  // Once the slot cache has been refreshed, getting the connection for a slot is trivial
  // redis.clients.jedis.JedisClusterInfoCache#getSlotPool
  public JedisPool getSlotPool(int slot) {
    r.lock();
    try {
      return slots.get(slot);
    } finally {
      r.unlock();
    }
  }

  From the walkthrough above, it is clear how the client handles cluster requests. There are just two steps: 1. fetch the cluster's slot distribution with cluster slots and cache it locally; 2. use that slot distribution to send each request to the corresponding Redis node.
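
  To make those two steps concrete, here is a minimal hand-rolled sketch of what JedisCluster automates, using a plain Jedis connection (the seed address is assumed; MOVED/ASK handling and retries are omitted):

import java.util.List;
import redis.clients.jedis.Jedis;
import redis.clients.util.JedisClusterCRC16;
import redis.clients.util.SafeEncoder;

public class ManualClusterGet {
    public static void main(String[] args) {
        // Step 1: ask any node for the slot map (CLUSTER SLOTS) and keep it locally
        List<Object> slotMap;
        try (Jedis seed = new Jedis("192.168.1.103", 7000)) {
            slotMap = seed.clusterSlots();
        }

        // Step 2: compute the key's slot and send the command to the owning master
        int slot = JedisClusterCRC16.getSlot("key1");
        for (Object slotInfoObj : slotMap) {
            List<Object> slotInfo = (List<Object>) slotInfoObj;
            long start = (Long) slotInfo.get(0);
            long end = (Long) slotInfo.get(1);
            if (slot >= start && slot <= end) {
                List<Object> master = (List<Object>) slotInfo.get(2);
                String host = SafeEncoder.encode((byte[]) master.get(0));
                int port = ((Long) master.get(1)).intValue();
                try (Jedis node = new Jedis(host, port)) {
                    System.out.println(node.get("key1"));
                }
                break;
            }
        }
    }
}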

  There are also some edge cases: what if the slot information held by the client turns out to be wrong? How is the client cache kept consistent with the server?

  In fact, the client guarantees neither the accuracy of the slot information nor consistency with the server; it simply refreshes the cache when an error occurs. JedisClusterCommand#runWithRetries takes care of retrying on errors and refreshing the slot data.

 

4. How is request forwarding usually implemented?

  As we have seen, Redis actually sidesteps the forwarding problem altogether.

  So how is forwarding typically implemented in practice?

  The simplest approach: after receiving the client's request, repackage the data, build a new request to the target address, send it, and wait for the result. When the target server responds, relay the result back to the client. Examples: application gateways and proxy servers.

  Next, respond to the client with a status code (such as 302) and let the client perform the jump on its own. This is exactly the approach Redis takes.
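
  As an illustrative sketch of this second approach (the addresses are hypothetical), an HTTP service can answer with a 302 and a Location header and let the client follow it, much like a MOVED reply:

import com.sun.net.httpserver.HttpServer;
import java.net.InetSocketAddress;

public class RedirectDemo {
    public static void main(String[] args) throws Exception {
        HttpServer server = HttpServer.create(new InetSocketAddress(8080), 0);
        server.createContext("/", exchange -> {
            // Tell the client where the resource actually lives and let it re-request
            exchange.getResponseHeaders().add("Location", "http://other-host:8081" + exchange.getRequestURI());
            exchange.sendResponseHeaders(302, -1);   // 302 with no body
            exchange.close();
        });
        server.start();
    }
}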

  A relatively more involved approach is to wire the two sides together with streams: after accepting the client's request, pipe the data straight to the target server, and when the target responds, write the data straight back into the client's channel. This avoids repackaging large amounts of data, greatly reduces the performance cost of forwarding and improves response time. It is typically used for transferring large files.
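
  A minimal sketch of such a stream-based relay is shown below (the listening port and backend address are hypothetical; a production proxy would add timeouts, connection lifecycle management and non-blocking I/O):

import java.io.InputStream;
import java.io.OutputStream;
import java.net.ServerSocket;
import java.net.Socket;

public class TcpRelay {
    public static void main(String[] args) throws Exception {
        try (ServerSocket listener = new ServerSocket(9000)) {
            while (true) {
                Socket client = listener.accept();
                Socket target = new Socket("target-host", 6379);   // hypothetical backend
                pump(client, target);   // client -> target
                pump(target, client);   // target -> client
            }
        }
    }

    // Copy bytes from one socket to the other on a background thread,
    // without re-parsing or re-encapsulating the payload.
    private static void pump(Socket from, Socket to) {
        new Thread(() -> {
            try (InputStream in = from.getInputStream(); OutputStream out = to.getOutputStream()) {
                byte[] buf = new byte[8192];
                int n;
                while ((n = in.read(buf)) != -1) {
                    out.write(buf, 0, n);
                    out.flush();
                }
            } catch (Exception ignored) {
                // sketch only: real code would close both sockets and log the error
            }
        }).start();
    }
}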
