Redis哨兵原理總結(四):原始碼分析
目錄
本博文主要總結關於哨兵的一些理論知識,主要關注點有以下幾個方面:
一、哨兵解決了什麼問題?
二、哨兵是如何解決“問題一”的?
三、如何使用哨兵?
四、Redis Sentinel客戶端實現的原理是什麼?Java如何操作Redis Sentinel?
四、Redis Sentinel客戶端實現的原理是什麼?Java如何操作Redis Sentinel?
首先,看JedisSentinelPool的建構函式。
Jedis Version:2.9.0 sentinel建構函式:
/**
 * Convenience constructor: forwards to a six-argument overload (not shown in this excerpt),
 * passing 0 as the additional last argument.
 *
 * Set<String> sentinels: the set of Sentinels, each entry in IP:PORT format.
 */
public JedisSentinelPool(String masterName, Set<String> sentinels, GenericObjectPoolConfig poolConfig, int timeout, String password) {
    this(masterName, sentinels, poolConfig, timeout, password, 0);
}

/**
 * Full constructor: stores the connection settings, asks the Sentinels for the current master
 * and then initialises the pool against that master.
 */
public JedisSentinelPool(String masterName, Set<String> sentinels, GenericObjectPoolConfig poolConfig, int connectionTimeout, int soTimeout, String password, int database, String clientName) {
    // Defaults first; the timeouts and database index are overwritten by the arguments below.
    this.connectionTimeout = 2000;
    this.soTimeout = 2000;
    this.database = 0;
    this.masterListeners = new HashSet();
    this.log = Logger.getLogger(this.getClass().getName());
    this.poolConfig = poolConfig;
    this.connectionTimeout = connectionTimeout;
    this.soTimeout = soTimeout;
    this.password = password;
    this.database = database;
    this.clientName = clientName;
    // All settings are assigned before the Sentinels are queried and the pool is built.
    HostAndPort master = this.initSentinels(sentinels, masterName);
    this.initPool(master);
}
再看,如何初始化哨兵的,initSentinels(sentinels, masterName)。
private HostAndPort initSentinels(Set<String> sentinels, String masterName) { HostAndPort master = null; boolean sentinelAvailable = false; this.log.info("Trying to find master from available Sentinels..."); Iterator var5 = sentinels.iterator(); String sentinel; HostAndPort hap; while(var5.hasNext()) { sentinel = (String)var5.next(); hap = HostAndPort.parseString(sentinel); this.log.fine("Connecting to Sentinel " + hap); Jedis jedis = null; try { jedis = new Jedis(hap.getHost(), hap.getPort()); List<String> masterAddr = jedis.sentinelGetMasterAddrByName(masterName); sentinelAvailable = true; if (masterAddr != null && masterAddr.size() == 2) { master = this.toHostAndPort(masterAddr); this.log.fine("Found Redis master at " + master); break; } this.log.warning("Can not get master addr, master name: " + masterName + ". Sentinel: " + hap + "."); } catch (JedisException var13) { this.log.warning("Cannot get master address from sentinel running @ " + hap + ". Reason: " + var13 + ". Trying next one."); } finally { if (jedis != null) { jedis.close(); } } } if (master == null) { if (sentinelAvailable) { throw new JedisException("Can connect to sentinel, but " + masterName + " seems to be not monitored..."); } else { throw new JedisConnectionException("All sentinels down, cannot determine where is " + masterName + " master is running..."); } } else { this.log.info("Redis master running at " + master + ", starting Sentinel listeners..."); var5 = sentinels.iterator(); while(var5.hasNext()) { sentinel = (String)var5.next(); // 獲取哨兵的IP和PORT hap = HostAndPort.parseString(sentinel); // 建立一個執行緒建立監聽器,對每個哨兵進行監聽 JedisSentinelPool.MasterListener masterListener = new JedisSentinelPool.MasterListener(masterName, hap.getHost(), hap.getPort()); // 設定為守護執行緒 masterListener.setDaemon(true); this.masterListeners.add(masterListener); masterListener.start(); } return master; } }
private HostAndPort initSentinels(Set<String> sentinels, String masterName) { HostAndPort master = null; boolean sentinelAvailable = false; this.log.info("Trying to find master from available Sentinels..."); Iterator var5 = sentinels.iterator(); String sentinel; HostAndPort hap; while(var5.hasNext()) { sentinel = (String)var5.next(); hap = HostAndPort.parseString(sentinel); this.log.fine("Connecting to Sentinel " + hap); Jedis jedis = null; try { jedis = new Jedis(hap.getHost(), hap.getPort()); List<String> masterAddr = jedis.sentinelGetMasterAddrByName(masterName); sentinelAvailable = true; if (masterAddr != null && masterAddr.size() == 2) { master = this.toHostAndPort(masterAddr); this.log.fine("Found Redis master at " + master); break; } this.log.warning("Can not get master addr, master name: " + masterName + ". Sentinel: " + hap + "."); } catch (JedisException var13) { this.log.warning("Cannot get master address from sentinel running @ " + hap + ". Reason: " + var13 + ". Trying next one."); } finally { if (jedis != null) { jedis.close(); } } } if (master == null) { if (sentinelAvailable) { throw new JedisException("Can connect to sentinel, but " + masterName + " seems to be not monitored..."); } else { throw new JedisConnectionException("All sentinels down, cannot determine where is " + masterName + " master is running..."); } } else { this.log.info("Redis master running at " + master + ", starting Sentinel listeners..."); var5 = sentinels.iterator(); while(var5.hasNext()) { sentinel = (String)var5.next(); // 獲取哨兵的IP和PORT hap = HostAndPort.parseString(sentinel); // 建立一個執行緒建立監聽器,對每個哨兵進行監聽 JedisSentinelPool.MasterListener masterListener = new JedisSentinelPool.MasterListener(masterName, hap.getHost(), hap.getPort()); // 設定為守護執行緒 masterListener.setDaemon(true); this.masterListeners.add(masterListener); masterListener.start(); } return master; } }
監聽master切換的監聽器(MasterListener)的原始碼:針對每個哨兵建立一個執行緒,訂閱該哨兵的+switch-master頻道,以便得知master的變更。
protected class MasterListener extends Thread {
protected String masterName;
protected String host;
protected int port;
protected long subscribeRetryWaitTimeMillis;
protected volatile Jedis j;
protected AtomicBoolean running;
protected MasterListener() {
this.subscribeRetryWaitTimeMillis = 5000L;
this.running = new AtomicBoolean(false);
}
public MasterListener(String masterName, String host, int port) {
super(String.format("MasterListener-%s-[%s:%d]", masterName, host, port));
this.subscribeRetryWaitTimeMillis = 5000L;
this.running = new AtomicBoolean(false);
this.masterName = masterName;
this.host = host;
this.port = port;
}
public MasterListener(String masterName, String host, int port, long subscribeRetryWaitTimeMillis) {
this(masterName, host, port);
this.subscribeRetryWaitTimeMillis = subscribeRetryWaitTimeMillis;
}
public void run() {
this.running.set(true);
while(this.running.get()) {
this.j = new Jedis(this.host, this.port);
try {
if (!this.running.get()) {
break;
}
// 訂閱+switch-master頻道。
// 如果哨兵完成了主從切換,會在+switch-master頻道釋出這行資訊:pmaster 172.16.0.149 6379 172.16.0.34 6379
// 然後對訊息“pmaster 172.16.0.149 6379 172.16.0.34 6379”以“ ”分割得到一個String型別的陣列,陣列的第4個元素就是新的master的IP,第5個元素就是新的master的port,第1個元素就是新的master的名稱。
this.j.subscribe(new JedisPubSub() {
public void onMessage(String channel, String message) {
JedisSentinelPool.this.log.fine("Sentinel " + MasterListener.this.host + ":" + MasterListener.this.port + " published: " + message + ".");
String[] switchMasterMsg = message.split(" ");
if (switchMasterMsg.length > 3) {
if (MasterListener.this.masterName.equals(switchMasterMsg[0])) {
JedisSentinelPool.this.initPool(JedisSentinelPool.this.toHostAndPort(Arrays.asList(switchMasterMsg[3], switchMasterMsg[4])));
} else {
JedisSentinelPool.this.log.fine("Ignoring message on +switch-master for master name " + switchMasterMsg[0] + ", our master name is " + MasterListener.this.masterName);
}
} else {
JedisSentinelPool.this.log.severe("Invalid message received on Sentinel " + MasterListener.this.host + ":" + MasterListener.this.port + " on channel +switch-master: " + message);
}
}
}, new String[]{"+switch-master"});
} catch (JedisConnectionException var8) {
if (this.running.get()) {
JedisSentinelPool.this.log.log(Level.SEVERE, "Lost connection to Sentinel at " + this.host + ":" + this.port + ". Sleeping 5000ms and retrying.", var8);
try {
Thread.sleep(this.subscribeRetryWaitTimeMillis);
} catch (InterruptedException var7) {
JedisSentinelPool.this.log.log(Level.SEVERE, "Sleep interrupted: ", var7);
}
} else {
JedisSentinelPool.this.log.fine("Unsubscribing from Sentinel at " + this.host + ":" + this.port);
}
} finally {
this.j.close();
}
}
}
}
Redis訊息訂閱與釋出:
客戶端可以將 Sentinel 看作是一個只提供了訂閱功能的 Redis 伺服器:
你不可以使用 PUBLISH 命令向這個伺服器傳送資訊, 但你可以用 SUBSCRIBE 命令或者 PSUBSCRIBE 命令, 通過訂閱給定的頻道來獲取相應的事件提醒。
一個頻道能夠接收和這個頻道的名字相同的事件。
比如說, 名為 +sdown 的頻道就可以接收所有例項進入主觀下線(SDOWN)狀態的事件。通過執行 PSUBSCRIBE * 命令可以接收所有事件資訊。
+switch-master <master name> <oldip> <oldport> <newip> <newport> :配置變更,主伺服器的 IP 地址和埠已經改變。
這是絕大多數外部使用者都關心的資訊。
可以看出,我們使用Sentinel命令和釋出訂閱兩種機制就能很好的實現和客戶端的整合:使用get-master-addr-by-name和slaves指令可以獲取當前的Master和Slaves的地址和資訊;而當發生故障轉移時,即Master發生切換,可以通過訂閱的+switch-master事件獲得最新的Master資訊。
——————————————————Redis哨兵原理總結完結——————————————————
訂閱與釋出的基本原理:
https://redisbook.readthedocs.io/en/latest/feature/pubsub.html