關於JedisCluster不支援Pipeline操作的解決方案
阿新 • • 發佈:2018-11-29
一、背景
業務需要,把redis單結點改為叢集,在對程式碼進行測試的時候發現了,原本使用jedis的批量操作pipeline,到了叢集的時候不可用了。報了org.springframework.data.redis.connection.jedis.JedisClusterConnection的錯誤。
即pipeline是不支援在叢集模式下使用的。
於是乎,只能技能PipelineBase介面,重寫這個方法了。
二、程式碼
JedisCluster.java(獲取jedis叢集實體類)
@Bean public JedisCluster getJedisCluster() { String[] serverArray = clusterNodes.split(",");//獲取伺服器陣列 Set<HostAndPort> nodes = new HashSet<>(); for (String ipPort : serverArray) { String[] ipPortPair = ipPort.split(":"); nodes.add(new HostAndPort(ipPortPair[0].trim(), Integer.valueOf(ipPortPair[1].trim()))); } return new JedisCluster(nodes, poolConfig()); }
JedisClusterPipeline.java(重寫的pipeline類)
package com.hqjy.msg.util; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import redis.clients.jedis.*; import redis.clients.jedis.exceptions.JedisMovedDataException; import redis.clients.jedis.exceptions.JedisRedirectionException; import redis.clients.util.JedisClusterCRC16; import redis.clients.util.SafeEncoder; import java.io.Closeable; import java.lang.reflect.Field; import java.util.*; /** * @program: message * @description: JedisClusterPipeLine * @author: Irving Wei * @create: 2018-11-27 10:22 **/ public class JedisClusterPipeline extends PipelineBase implements Closeable { private static final Logger LOGGER = LoggerFactory.getLogger(JedisClusterPipeline.class); // 部分欄位沒有對應的獲取方法,只能採用反射來做 // 你也可以去繼承JedisCluster和JedisSlotBasedConnectionHandler來提供訪問介面 private static final Field FIELD_CONNECTION_HANDLER; private static final Field FIELD_CACHE; static { FIELD_CONNECTION_HANDLER = getField(BinaryJedisCluster.class, "connectionHandler"); FIELD_CACHE = getField(JedisClusterConnectionHandler.class, "cache"); } private JedisSlotBasedConnectionHandler connectionHandler; private JedisClusterInfoCache clusterInfoCache; private Queue<Client> clients = new LinkedList<Client>(); // 根據順序儲存每個命令對應的Client private Map<JedisPool, Jedis> jedisMap = new HashMap<>(); // 用於快取連線 private boolean hasDataInBuf = false; // 是否有資料在快取區 /** * 根據jedisCluster例項生成對應的JedisClusterPipeline * @param * @return */ public static JedisClusterPipeline pipelined(JedisCluster jedisCluster) { JedisClusterPipeline pipeline = new JedisClusterPipeline(); pipeline.setJedisCluster(jedisCluster); return pipeline; } public JedisClusterPipeline() { } public void setJedisCluster(JedisCluster jedis) { connectionHandler = getValue(jedis, FIELD_CONNECTION_HANDLER); clusterInfoCache = getValue(connectionHandler, FIELD_CACHE); } /** * 重新整理叢集資訊,當叢集資訊發生變更時呼叫 * @param * @return */ public void refreshCluster() { connectionHandler.renewSlotCache(); } /** * 同步讀取所有資料. 與syncAndReturnAll()相比,sync()只是沒有對資料做反序列化 */ public void sync() { innerSync(null); } /** * 同步讀取所有資料 並按命令順序返回一個列表 * * @return 按照命令的順序返回所有的資料 */ public List<Object> syncAndReturnAll() { List<Object> responseList = new ArrayList<Object>(); innerSync(responseList); return responseList; } private void innerSync(List<Object> formatted) { HashSet<Client> clientSet = new HashSet<Client>(); try { for (Client client : clients) { // 在sync()呼叫時其實是不需要解析結果資料的,但是如果不呼叫get方法,發生了JedisMovedDataException這樣的錯誤應用是不知道的,因此需要呼叫get()來觸發錯誤。 // 其實如果Response的data屬性可以直接獲取,可以省掉解析資料的時間,然而它並沒有提供對應方法,要獲取data屬性就得用反射,不想再反射了,所以就這樣了 Object data = generateResponse(client.getOne()).get(); if (null != formatted) { formatted.add(data); } // size相同說明所有的client都已經新增,就不用再呼叫add方法了 if (clientSet.size() != jedisMap.size()) { clientSet.add(client); } } } catch (JedisRedirectionException jre) { if (jre instanceof JedisMovedDataException) { // if MOVED redirection occurred, rebuilds cluster's slot cache, // recommended by Redis cluster specification refreshCluster(); } throw jre; } finally { if (clientSet.size() != jedisMap.size()) { // 所有還沒有執行過的client要保證執行(flush),防止放回連線池後後面的命令被汙染 for (Jedis jedis : jedisMap.values()) { if (clientSet.contains(jedis.getClient())) { continue; } flushCachedData(jedis); } } hasDataInBuf = false; close(); } } @Override public void close() { clean(); clients.clear(); for (Jedis jedis : jedisMap.values()) { if (hasDataInBuf) { flushCachedData(jedis); } jedis.close(); } jedisMap.clear(); hasDataInBuf = false; } private void flushCachedData(Jedis jedis) { try { jedis.getClient().getAll(); } catch (RuntimeException ex) { } } @Override protected Client getClient(String key) { byte[] bKey = SafeEncoder.encode(key); return getClient(bKey); } @Override protected Client getClient(byte[] key) { Jedis jedis = getJedis(JedisClusterCRC16.getSlot(key)); Client client = jedis.getClient(); clients.add(client); return client; } private Jedis getJedis(int slot) { JedisPool pool = clusterInfoCache.getSlotPool(slot); // 根據pool從快取中獲取Jedis Jedis jedis = jedisMap.get(pool); if (null == jedis) { jedis = pool.getResource(); jedisMap.put(pool, jedis); } hasDataInBuf = true; return jedis; } private static Field getField(Class<?> cls, String fieldName) { try { Field field = cls.getDeclaredField(fieldName); field.setAccessible(true); return field; } catch (NoSuchFieldException | SecurityException e) { throw new RuntimeException("cannot find or access field '" + fieldName + "' from " + cls.getName(), e); } } @SuppressWarnings({"unchecked" }) private static <T> T getValue(Object obj, Field field) { try { return (T)field.get(obj); } catch (IllegalArgumentException | IllegalAccessException e) { LOGGER.error("get value fail", e); throw new RuntimeException(e); } } }
XXXService.java(使用)
@Autowired private JedisCluster jc; @RedisIdxConnection public synchronized void pipeline(String key, List list) { // 如果要操作的list沒有值,則直接返回 if (!(list.size() > 0)) { return; } JedisClusterPipeline jcp = JedisClusterPipeline.pipelined(jc); jcp.refreshCluster(); List<Object> batchResult = null; try { for (int i = 0; i < list.size(); i++) { // for迴圈的內容根據自己的業務進行修改,核心就是用jcp加到陣列再一起同步到redis實現批量操作 Map map = (Map) list.get(i); String msg = (String) map.get("value"); long time = (long) map.get("score"); String zkey = (String) map.get(Constant.REDIS_KEY); jcp.zadd(zkey, time, msg); } jcp.sync(); batchResult = jcp.syncAndReturnAll(); } finally { jcp.close(); }
有問題歡迎留言討論~~~