1. 程式人生 > >Redis哨兵原理總結(四):原始碼分析

Redis哨兵原理總結(四):原始碼分析

目錄

 

本博文主要總結關於哨兵的一些理論知識,主要關注點有以下幾個方面:

一、哨兵解決了什麼問題?

二、哨兵是如何解決“問題一”的?

三、如何使用哨兵?

四、Redis Sentinel客戶端實現的原理是什麼?Java如何操作Redis Sentinel?

四、Redis Sentinel客戶端實現的原理是什麼?Java如何操作Redis Sentinel?

首先,看JedisSentinelPool的建構函式。



Jedis Version:2.9.0

sentinel建構函式:

/**
 * Convenience constructor that forwards to the six-argument overload, passing {@code 0} as the
 * final argument (presumably the database index — confirm against that overload).
 *
 * @param masterName name of the monitored master, forwarded unchanged
 * @param sentinels  the set of sentinels, each entry formatted as {@code IP:PORT}
 * @param poolConfig pool configuration, forwarded unchanged
 * @param timeout    timeout value, forwarded unchanged
 * @param password   password, forwarded unchanged
 */
public JedisSentinelPool(
		String masterName,
		Set<String> sentinels,
		GenericObjectPoolConfig poolConfig,
		int timeout,
		String password) {
	this(masterName, sentinels, poolConfig, timeout, password, 0);
}
	
	
/**
 * Stores the connection settings, resolves the current master through the given sentinels and
 * initialises the pool against that master.
 *
 * <p>Note that {@code initSentinels} also starts one listener thread per sentinel, so threads are
 * already running by the time this constructor returns.
 *
 * @param masterName        name of the master to look up on the sentinels
 * @param sentinels         sentinel addresses, each formatted as {@code IP:PORT}
 * @param poolConfig        pool configuration stored on this instance
 * @param connectionTimeout connection timeout stored on this instance
 * @param soTimeout         socket timeout stored on this instance
 * @param password          password stored on this instance
 * @param database          database index stored on this instance
 * @param clientName        client name stored on this instance
 */
public JedisSentinelPool(String masterName, Set<String> sentinels, GenericObjectPoolConfig poolConfig, int connectionTimeout, int soTimeout, String password, int database, String clientName) {
	// The former default assignments (2000 / 2000 / 0) were dead stores: each field was
	// unconditionally overwritten by the corresponding argument below.
	this.masterListeners = new HashSet<JedisSentinelPool.MasterListener>();
	this.log = Logger.getLogger(this.getClass().getName());
	this.poolConfig = poolConfig;
	this.connectionTimeout = connectionTimeout;
	this.soTimeout = soTimeout;
	this.password = password;
	this.database = database;
	this.clientName = clientName;

	// All settings must be in place before the master is resolved and the pool is built.
	HostAndPort master = this.initSentinels(sentinels, masterName);
	this.initPool(master);
}

 再看如何初始化哨兵:initSentinels(sentinels, masterName)。

/**
 * Queries the sentinels one by one for the address of {@code masterName}, then starts a
 * {@link MasterListener} daemon thread for every sentinel.
 *
 * <p>The lookup stops at the first sentinel that returns a two-element address. Sentinels that
 * throw a {@link JedisException} are logged and skipped.
 *
 * @param sentinels  sentinel addresses, each parsed with {@code HostAndPort.parseString}
 * @param masterName name of the master to resolve
 * @return the address of the current master
 * @throws JedisException           if at least one sentinel answered but none returned a usable
 *                                  address for {@code masterName}
 * @throws JedisConnectionException if no sentinel answered at all
 */
private HostAndPort initSentinels(Set<String> sentinels, String masterName) {
	HostAndPort master = null;
	boolean sentinelAvailable = false;
	this.log.info("Trying to find master from available Sentinels...");

	for (String sentinel : sentinels) {
		HostAndPort hap = HostAndPort.parseString(sentinel);
		this.log.fine("Connecting to Sentinel " + hap);
		Jedis jedis = null;

		try {
			jedis = new Jedis(hap.getHost(), hap.getPort());
			List<String> masterAddr = jedis.sentinelGetMasterAddrByName(masterName);
			// Reaching this line means the sentinel answered, even if the answer is unusable.
			sentinelAvailable = true;

			if (masterAddr == null || masterAddr.size() != 2) {
				this.log.warning("Can not get master addr, master name: " + masterName + ". Sentinel: " + hap + ".");
				continue;
			}

			master = this.toHostAndPort(masterAddr);
			this.log.fine("Found Redis master at " + master);
			break;
		} catch (JedisException e) {
			this.log.warning("Cannot get master address from sentinel running @ " + hap + ". Reason: " + e + ". Trying next one.");
		} finally {
			// Runs on break/continue as well, so the probe connection is always released.
			if (jedis != null) {
				jedis.close();
			}
		}
	}

	if (master == null) {
		if (sentinelAvailable) {
			throw new JedisException("Can connect to sentinel, but " + masterName + " seems to be not monitored...");
		}
		throw new JedisConnectionException("All sentinels down, cannot determine where is " + masterName + " master is running...");
	}

	this.log.info("Redis master running at " + master + ", starting Sentinel listeners...");
	for (String sentinel : sentinels) {
		// Extract the sentinel's host and port.
		HostAndPort hap = HostAndPort.parseString(sentinel);
		// One listener thread per sentinel.
		JedisSentinelPool.MasterListener masterListener = new JedisSentinelPool.MasterListener(masterName, hap.getHost(), hap.getPort());
		// Daemon thread, so it does not keep the JVM alive.
		masterListener.setDaemon(true);
		this.masterListeners.add(masterListener);
		masterListener.start();
	}
	return master;
}

監聽器MasterListener的原始碼:每個哨兵對應一個執行緒,訂閱該哨兵的+switch-master頻道,以感知master的切換。

protected class MasterListener extends Thread {
	protected String masterName;
	protected String host;
	protected int port;
	protected long subscribeRetryWaitTimeMillis;
	protected volatile Jedis j;
	protected AtomicBoolean running;

	protected MasterListener() {
		this.subscribeRetryWaitTimeMillis = 5000L;
		this.running = new AtomicBoolean(false);
	}

	public MasterListener(String masterName, String host, int port) {
		super(String.format("MasterListener-%s-[%s:%d]", masterName, host, port));
		this.subscribeRetryWaitTimeMillis = 5000L;
		this.running = new AtomicBoolean(false);
		this.masterName = masterName;
		this.host = host;
		this.port = port;
	}

	public MasterListener(String masterName, String host, int port, long subscribeRetryWaitTimeMillis) {
		this(masterName, host, port);
		this.subscribeRetryWaitTimeMillis = subscribeRetryWaitTimeMillis;
	}

	public void run() {
		this.running.set(true);

		while(this.running.get()) {
			this.j = new Jedis(this.host, this.port);

			try {
				if (!this.running.get()) {
					break;
				}
				// 訂閱+switch-master頻道。
				// 如果哨兵完成了主從切換,會在+switch-master頻道釋出這行資訊:pmaster 172.16.0.149 6379 172.16.0.34 6379
				// 然後對訊息“pmaster 172.16.0.149 6379 172.16.0.34 6379”以“ ”分割得到一個String型別的陣列,陣列的第4個元素就是新的master的IP,第5個元素就是新的master的port,第1個元素就是新的master的名稱。
				this.j.subscribe(new JedisPubSub() {
					public void onMessage(String channel, String message) {
						JedisSentinelPool.this.log.fine("Sentinel " + MasterListener.this.host + ":" + MasterListener.this.port + " published: " + message + ".");
						String[] switchMasterMsg = message.split(" ");
						if (switchMasterMsg.length > 3) {
							if (MasterListener.this.masterName.equals(switchMasterMsg[0])) {
								JedisSentinelPool.this.initPool(JedisSentinelPool.this.toHostAndPort(Arrays.asList(switchMasterMsg[3], switchMasterMsg[4])));
							} else {
								JedisSentinelPool.this.log.fine("Ignoring message on +switch-master for master name " + switchMasterMsg[0] + ", our master name is " + MasterListener.this.masterName);
							}
						} else {
							JedisSentinelPool.this.log.severe("Invalid message received on Sentinel " + MasterListener.this.host + ":" + MasterListener.this.port + " on channel +switch-master: " + message);
						}
					}
				}, new String[]{"+switch-master"});
			} catch (JedisConnectionException var8) {
				if (this.running.get()) {
					JedisSentinelPool.this.log.log(Level.SEVERE, "Lost connection to Sentinel at " + this.host + ":" + this.port + ". Sleeping 5000ms and retrying.", var8);

					try {
						Thread.sleep(this.subscribeRetryWaitTimeMillis);
					} catch (InterruptedException var7) {
						JedisSentinelPool.this.log.log(Level.SEVERE, "Sleep interrupted: ", var7);
					}
				} else {
					JedisSentinelPool.this.log.fine("Unsubscribing from Sentinel at " + this.host + ":" + this.port);
				}
			} finally {
				this.j.close();
			}
		}
	}
}

 

Redis訊息訂閱與釋出:

客戶端可以將 Sentinel 看作是一個只提供了訂閱功能的 Redis 伺服器:

你不可以使用 PUBLISH 命令向這個伺服器傳送資訊, 但你可以用 SUBSCRIBE 命令或者 PSUBSCRIBE 命令, 通過訂閱給定的頻道來獲取相應的事件提醒。

一個頻道能夠接收和這個頻道的名字相同的事件。

比如說, 名為 +sdown 的頻道就可以接收所有例項進入主觀下線(SDOWN)狀態的事件。通過執行 PSUBSCRIBE * 命令可以接收所有事件資訊。
    +switch-master <master name> <oldip> <oldport> <newip> <newport> :配置變更,主伺服器的 IP 和埠(port)已經改變。

這是絕大多數外部使用者都關心的資訊。

可以看出,我們使用Sentinel命令和釋出訂閱兩種機制就能很好的實現和客戶端的整合:使用get-master-addr-by-name和slaves指令可以獲取當前的Master和Slaves的地址和資訊;而當發生故障轉移時,即Master發生切換,可以通過訂閱的+switch-master事件獲得最新的Master資訊。

——————————————————Redis哨兵原理總結完結——————————————————

訂閱與釋出的基本原理:
https://redisbook.readthedocs.io/en/latest/feature/pubsub.html