1. 程式人生 > >redis-cluster主流客戶端驅動不支援pipeline,該怎麼解決。

redis-cluster主流客戶端驅動不支援pipeline,該怎麼解決。

redis-cluster,是基於redis的基礎上開發分散式快取資料庫系統。

1、redis-cluster叢集的特點:

1)無中心節點(share-nothing)架構,各個節點對等,每個節點儲存叢集資料的一部分。

2)叢集資料分佈在所有master節點上的16384個slots中,無冗餘。

3)通過redis配置slave節點,完成資料備份儲存以及提供failover節點。

4)redis cluster提供HA方案。節點之間通過gossip協議進行通訊,交換節點的狀態資訊;如果超過半數以上的master節點與故障節點通訊超時,則認為該節點是故障節點,則通過投票協議完成選舉,實現redis 主從節點自動切換(auto-failover

)。

5)直接ASK 轉向/MOVED 轉向機制.

6)redis cluster支援線上擴容、遷移和升級。

2、redis-cluster不足之處:

1)任意master掛掉,且當前master沒有slave時,叢集進入fail狀態;

2)如果叢集中超過半數的master故障,不管故障master是否有存活的salve,叢集都會進入fail狀態;

3)目前主流的redis-cluster客戶端均不支援pipeline操作(這節所講的重點)

3、如何實現redis-cluster的pipeline操作?

redis是一個cs模式的tcp-server,使用和http類似的請求響應協議,一個client可以通過一個socket連結傳送多個請求命令。每次client發出請求命令後通常會阻塞等待server處理請求,server端處理完請求命令後給客戶端返回響應報文。如果我們傳送四次請求,需要8個tcp報文才能完成,其中如果包含報文傳送時間,網路延時,這極大的浪費了redis server的處理資料的能力。具體通訊過程如下:

client: set f1 1

server:OK

client: set f2 1

server:OK

client: set f3 1

server:OK

client: set f4 1

server:OK

如果我們利用redis的pipeline操作,我們則可以把多個命令打包成一個請求報文一次性發送給redis服務端,這樣就不需要等待單條命令執行結果的返回,而redis服務端會處理完多條命令後會將多條命令的處理結果打包到一起返回給客戶端。通訊過程如下

client: set f1 1

client: set f2 1

client: set f3 1

client: set f4 1


server:OK

server:OK

server:OK

server:OK

但是目前redis-cluster客戶端並不支援pipeline操作,客戶端在初始化redis叢集資料的時候,使用的是JedisCluster而不是Jedis物件,而JedisCluster 並沒有發現提供有像Jedis一樣的獲取Pipeline物件的 pipelined()方法.

 private static JedisCluster jc;
    
    private final static String REDIS_CLUSTER_HOSTS = "redis_cluster_hosts";
    
    static {
        String redis_cluster_hosts = Config.getConfigValue(REDIS_CLUSTER_HOSTS);
        String[] redis_hosts = redis_cluster_hosts.split(",");
        Set<HostAndPort> jedisClusterNodes = new HashSet<HostAndPort>();
        for(String redisNode:redis_hosts){
            String[] hostPort = redisNode.split(":");
            HostAndPort hostAndPort = new HostAndPort(hostPort[0], Integer.parseInt(hostPort[1]));
            jedisClusterNodes.add(hostAndPort);
        }
        jc = new JedisCluster(jedisClusterNodes);
    }

根據redis-cluster的特性,叢集資料分佈在所有master節點上的16384個slots中,redis客戶端在儲存key的時候,首先是計算key儲存的slot,再通過叢集中solt與節點的對映,插入到對應的slot的master節點中。

如果我們需要支援批量操作,第一步,我們需要通過下面程式碼得到slotHostMap對映:

Map<String, JedisPool> nodeMap = jc.getClusterNodes();
String anyHostAndPortStr = nodeMap.keySet().iterator().next();
TreeMap<Long, String> slotHostMap = getSlotHostMap(anyHostAndPortStr);
/**
 * 獲取slot與host對應關係
 * @param anyHostAndPortStr
 * @return
 */
private static TreeMap<Long, String> getSlotHostMap(String anyHostAndPortStr) {
    TreeMap<Long, String> tree = new TreeMap<Long, String>();
    String parts[] = anyHostAndPortStr.split(":");
    HostAndPort anyHostAndPort = new HostAndPort(parts[0], Integer.parseInt(parts[1]));
    Jedis jedis = null;
    try{
        jedis = new Jedis(anyHostAndPort.getHost(), anyHostAndPort.getPort());
        List<Object> list = jedis.clusterSlots();
        for (Object object : list) {
            List<Object> list1 = (List<Object>) object;
            List<Object> master = (List<Object>) list1.get(2);
            String hostAndPort = new String((byte[]) master.get(0)) + ":" + master.get(1);
            tree.put((Long) list1.get(0), hostAndPort);
            tree.put((Long) list1.get(1), hostAndPort);
        }
    }catch(Exception e){
        logger.error("getSlotHostMap error",e);
    }
    return tree;
}

通過上面的程式碼,我們可以得到每個master節點上分佈的slot。這樣我們就可以通過key計算出slot,再通過slot找到對應儲存的節點。
  int slot = JedisClusterCRC16.getSlot(key); 
        //獲取到對應的Jedis物件
        Map.Entry<Long, String> entry = slotHostMap.lowerEntry(Long.valueOf(slot));
        Jedis jedis = nodeMap.get(entry.getValue()).getResource();

找到了Jedis物件,那我們就可以通過jedis.pipelined()建立pipeline物件實現pipeline了。

在我們暴露給其他開發人員使用的批量操作方法中,是支援多個key批量操作,在計算每個key儲存的slot之後,我們需要對通過節點上的key進行分類,然後分批通過pipeline操作同一個redis節點。

 public static String batchSet(Map<String,String> keyValues){
        logger.error("keymap size={}",keyValues.size());
        //通過key獲取到相應的node,再通過node把key做分類
        Map<String, Map<String, String>> todoMap = getNodeKeyMap(keyValues);
        try {
            //todoMap儲存了node與key之間的一對多的對映關係
            for(String key:todoMap.keySet()){
                Jedis jedis = nodeMap.get(key).getResource();
                Map<String, String> kv = todoMap.get(key);
                if(kv.size() > 0){
                    Pipeline p = jedis.pipelined();
                    for(String _key:kv.keySet()){
                        p.set(_key, kv.get(_key));
                    }
                    p.sync();
                }
            }
        } catch (Exception e) {
          logger.error("batchSet error",e);
        }
        return null;
    }

以上的一系列操作,最終的目的還是為了獲取到單個master節點的Jedis物件,獲取到pipeline物件。

休息之餘總結出來的方法,如果大家有更好的方法,歡迎一起探討。qq:1549690699