1. 程式人生 > >redis原始碼解讀總結(redis一致性雜湊實現)

redis原始碼解讀總結(redis一致性雜湊實現)

最近工作中一直在用redis進行快取功能的實現,redis的原始碼雖然只有一萬多行,但是確實值得研究一下,以下個人的一點研究和看法(本來打算用圖表示,實在找不到一種好的畫圖工具來描述,因此就用文字描述了),希望能跟各位共勉之。
一、1.構建JedisShardInfo列表List<JedisShardInfo> jedisShardInfoList,其中JedisShardInfo包含了伺服器IP,埠等資訊,其原始碼如下:

public JedisShardInfo(String host, int port, String name) {
this(host, port, 2000, name);
}


2.構建GenericObjectPoolConfig poolConfig這個裡面主要設定快取池的配置:最大連線數,最大等待時間等配置資訊。
3.基於1和2去構建ShardedJedisPool物件,主要原始碼如下:

public ShardedJedisPool(final GenericObjectPoolConfig poolConfig,
List<JedisShardInfo> shards) {
this(poolConfig, shards, Hashing.MURMUR_HASH);
}


構建此物件的過程中,同時構建了一個ShardedJedisFactory物件,這個物件很重要,後續會有詳細講解,構建此物件原始碼如下:

public ShardedJedisPool(final GenericObjectPoolConfig poolConfig,
List<JedisShardInfo> shards, Hashing algo, Pattern keyTagPattern) {
super(poolConfig, new ShardedJedisFactory(shards, algo, keyTagPattern));
}


呼叫父類建構函式:

public Pool(final GenericObjectPoolConfig poolConfig,
PooledObjectFactory<T> factory) {
initPool(poolConfig, factory);
}


呼叫初始化池initPool()方法,下面的程式碼只留關鍵原始碼:

public void initPool(final GenericObjectPoolConfig poolConfig,
PooledObjectFactory<T> factory) {
this.internalPool = new GenericObjectPool<T>(factory, poolConfig);
}


上面的程式碼將之前說的一個非常重要的ShardedJedisFactory物件,裝載到了GenericObjectPool類裡面。
以上的所有資訊都是為了後面的重要操作,像各種資料結構的操作,做鋪墊。
二、在"一"的基礎上進行以下操作,上面已經構建了ShardedJedisPool
1.呼叫ShardedJedisPool的getResource方法返回ShardedJedis進行get或者set操作

public T getResource() {
return internalPool.borrowObject();
}


此處的internalPool物件即為1中最後的GenericObjectPool這個類的物件例項,最終會呼叫到此類中的create方法

private PooledObject<T> create() throws Exception {
final PooledObject<T> p;
try {
p = factory.makeObject();
} catch (Exception e) {
createCount.decrementAndGet();
throw e;
}
return p;
}


上面說過ShardedJedisFactory這個物件非常重要就在於此,factory.makeObject()操作中的factory就是
ShardedJedisFactory這個類,之前已經裝載進去了,上面有說明;makeObject這個方法也是這個類裡面最重要的一個方法

@Override
public PooledObject<ShardedJedis> makeObject() throws Exception {
ShardedJedis jedis = new ShardedJedis(shards, algo, keyTagPattern);
return new DefaultPooledObject<ShardedJedis>(jedis);
}

makeObject這個方法(1)第一步就是構建了一個ShardedJedis物件,就是呼叫getResource()方法返回的物件,這個物件包含對資料結構的各種操作,
通過建構函式ShardedJedis(shards, algo, keyTagPattern)呼叫父類的構造方法,然後呼叫initialize方法,這個初始化方法就是redis中使用一致性雜湊的
地方所在,迴圈shards(此處的shards就是上面的jedisShardInfoList這個變數,比如說list的size為4,相當於有四臺伺服器),每一個shards設定160個
節點,n臺伺服器就會有n*160個節點,節點的key值就是通過Hashing計算出來,這些節點其實就是已經被均勻的分部到一個閉合的
環中了,這樣伺服器多話就會增加利用率;nodes設定完之後,就會將n個shards與對應的jedis設定到MAP中去了:

public Sharded(List<S> shards, Hashing algo, Pattern tagPattern) {
this.algo = algo;
this.tagPattern = tagPattern;
initialize(shards);
}

private void initialize(List<S> shards) {
nodes = new TreeMap<Long, S>();

for (int i = 0; i != shards.size(); ++i) {
final S shardInfo = shards.get(i);
if (shardInfo.getName() == null)
for (int n = 0; n < 160 * shardInfo.getWeight(); n++) {
nodes.put(this.algo.hash("SHARD-" + i + "-NODE-" + n),
shardInfo);
}
else
for (int n = 0; n < 160 * shardInfo.getWeight(); n++) {
nodes.put(
this.algo.hash(shardInfo.getName() + "*"
+ shardInfo.getWeight() + n), shardInfo);
}
resources.put(shardInfo, shardInfo.createResource());
}
}


上面的初始化動作做完之後這樣在後面做快取的時候或者從快取中取值的時候都會首先通過key呼叫Hashing類中相同的演算法計算出一致性雜湊值,計算出
雜湊值之後,根據TreeMap的tailMap方法從上面的nodes中獲取到大於或者等於這個雜湊值的SortedMap,然後獲取第一個key以及對應的shardInfo,
最終會從resources中獲取到jedis:

public R getShard(String key) {
return resources.get(getShardInfo(key));
}

public S getShardInfo(byte[] key) {
SortedMap<Long, S> tail = nodes.tailMap(algo.hash(key));
if (tail.isEmpty()) {
return nodes.get(nodes.firstKey());
}
return tail.get(tail.firstKey());
}
public String set(String key, String value) {
Jedis j = getShard(key);
return j.set(key, value);
}

(2)第二步構建了DefaultPooledObject物件,通過此物件返回ShardedJedis,這個物件就是第一步要做的事情。

2.通過上面獲取到的ShardedJedis進行資料的快取或者獲取,現舉一例進行簡單說明之,呼叫set方法,首先根據上面說的一致性雜湊演算法獲取到jedis
jedis裡面定義了一個客戶端Client:



呼叫Jedis的set方法,首先完成客戶端的連線,然後傳送命令,進行資料的快取:

protected Connection sendCommand(final Command cmd, final byte[]... args) {
connect();
Protocol.sendCommand(outputStream, cmd, args);
pipelinedCommands++;
return this;
}

public void connect() {
if (!isConnected()) {
try {
socket = new Socket();
// ->@wjw_add
socket.setReuseAddress(true);
socket.setKeepAlive(true); // Will monitor the TCP connection is
// valid
socket.setTcpNoDelay(true); // Socket buffer Whetherclosed, to
// ensure timely delivery of data
socket.setSoLinger(true, 0); // Control calls close () method,
// the underlying socket is closed
// immediately
// <[email protected]_add

socket.connect(new InetSocketAddress(host, port), timeout);
socket.setSoTimeout(timeout);
outputStream = new RedisOutputStream(socket.getOutputStream());
inputStream = new RedisInputStream(socket.getInputStream());
} catch (IOException ex) {
throw new JedisConnectionException(ex);
}
}
}

public static void sendCommand(final RedisOutputStream os,
final Command command, final byte[]... args) {
sendCommand(os, command.raw, args);
}

private static void sendCommand(final RedisOutputStream os,
final byte[] command, final byte[]... args) {
try {
os.write(ASTERISK_BYTE);
os.writeIntCrLf(args.length + 1);
os.write(DOLLAR_BYTE);
os.writeIntCrLf(command.length);
os.write(command);
os.writeCrLf();

for (final byte[] arg : args) {
os.write(DOLLAR_BYTE);
os.writeIntCrLf(arg.length);
os.write(arg);
os.writeCrLf();
}
} catch (IOException e) {
throw new JedisConnectionException(e);
}
}

三、總結:以上就是本人對redis原始碼的一點簡單理解,還需要後續的深入學習使用,歡迎各位拍磚。