6.Sentinel原始碼分析—Sentinel是如何動態載入配置限流的?
Sentinel原始碼解析系列:
1.Sentinel原始碼分析—FlowRuleManager載入規則做了什麼?
2. Sentinel原始碼分析—Sentinel是如何進行流量統計的?
3. Sentinel原始碼分析— QPS流量控制是如何實現的?
4.Sentinel原始碼分析— Sentinel是如何做到降級的?
5.Sentinel原始碼分析—Sentinel如何實現自適應限流?
有時候我們做限流的時候並不想直接寫死在程式碼裡面,然後每次要改規則,或者增加規則的時候只能去重啟應用來解決。而是希望能夠動態的更改配置,這樣萬一出現緊急情況還能動態的進行配置修改。例如2018年的雙十一,淘寶的其他服務沒有一點問題,萬萬沒想到在前幾分鐘購物車服務掛了,這個時候就可以緊急限流,對應用進行拯救。
其實看完前面的內容,對動態配置應該是水到渠成的事情,因為所有的配置修改都是通過限流管理器如FlowRuleManager的內部監聽器來實現的,所以只要動態的給監聽器訊號,那麼就可以做到動態的修改配置。
接下來我們來看看Sentinel是怎麼做的。一般的情況下,動態配置常見的實現方式有兩種:
- 拉模式:客戶端主動向某個規則管理中心定期輪詢拉取規則,這個規則中心可以是 RDBMS、檔案,甚至是 VCS 等。這樣做的方式是簡單,缺點是無法及時獲取變更;
- 推模式:規則中心統一推送,客戶端通過註冊監聽器的方式時刻監聽變化,比如使用 Nacos、Zookeeper 等配置中心。這種方式有更好的實時性和一致性保證。
而Sentinel目前兩種都支援:
- Pull-based: 檔案、Consul (since 1.7.0)
- Push-based: ZooKeeper, Redis, Nacos, Apollo
由於支援的方式太多,我這裡只講解兩種,檔案和ZooKeeper,分別對應推拉兩種模式。
Pull-based: 檔案
首先上個例子:
FlowRule.json
[ { "resource": "abc", "controlBehavior": 0, "count": 20.0, "grade": 1, "limitApp": "default", "strategy": 0 }, { "resource": "abc1", "controlBehavior": 0, "count": 20.0, "grade": 1, "limitApp": "default", "strategy": 0 } ]
SimpleFileDataSourceDemo:
public class SimpleFileDataSourceDemo {
private static final String KEY = "abc";
public static void main(String[] args) throws Exception {
SimpleFileDataSourceDemo simpleFileDataSourceDemo = new SimpleFileDataSourceDemo();
simpleFileDataSourceDemo.init();
Entry entry = null;
try {
entry = SphU.entry(KEY);
// dosomething
} catch (BlockException e1) {
// dosomething
} catch (Exception e2) {
// biz exception
} finally {
if (entry != null) {
entry.exit();
}
}
}
private void init() throws Exception {
String flowRulePath = "/Users/luozhiyun/Downloads/test/FlowRule.json";
// Data source for FlowRule
FileRefreshableDataSource<List<FlowRule>> flowRuleDataSource = new FileRefreshableDataSource<>(
flowRulePath, flowRuleListParser);
FlowRuleManager.register2Property(flowRuleDataSource.getProperty());
}
private Converter<String, List<FlowRule>> flowRuleListParser = source -> JSON.parseObject(source,
new TypeReference<List<FlowRule>>() {});
}
這個例子主要就是寫死一個資原始檔,然後讀取資原始檔裡面的內容,再通過自定義的資源解析器來解析檔案的內容後設置規則。
這裡我們主要需要分析FileRefreshableDataSource是怎麼載入檔案然後通過FlowRuleManager註冊的。
FileRefreshableDataSource繼承關係:
FileRefreshableDataSource
private static final int MAX_SIZE = 1024 * 1024 * 4;
private static final long DEFAULT_REFRESH_MS = 3000;
private static final int DEFAULT_BUF_SIZE = 1024 * 1024;
private static final Charset DEFAULT_CHAR_SET = Charset.forName("utf-8");
public FileRefreshableDataSource(String fileName, Converter<String, T> configParser) throws FileNotFoundException {
this(new File(fileName), configParser, DEFAULT_REFRESH_MS, DEFAULT_BUF_SIZE, DEFAULT_CHAR_SET);
}
public FileRefreshableDataSource(File file, Converter<String, T> configParser, long recommendRefreshMs, int bufSize,
Charset charset) throws FileNotFoundException {
super(configParser, recommendRefreshMs);
if (bufSize <= 0 || bufSize > MAX_SIZE) {
throw new IllegalArgumentException("bufSize must between (0, " + MAX_SIZE + "], but " + bufSize + " get");
}
if (file == null || file.isDirectory()) {
throw new IllegalArgumentException("File can't be null or a directory");
}
if (charset == null) {
throw new IllegalArgumentException("charset can't be null");
}
this.buf = new byte[bufSize];
this.file = file;
this.charset = charset;
// If the file does not exist, the last modified will be 0.
this.lastModified = file.lastModified();
firstLoad();
}
FileRefreshableDataSource的構造器裡面會設定各種引數,如:緩衝區大小、字元編碼、檔案上次的修改時間、檔案定時重新整理時間等。
這個方法會呼叫父類的構造器進行初始化,我們再看一下AutoRefreshDataSource做了什麼。
AutoRefreshDataSource
public AutoRefreshDataSource(Converter<S, T> configParser, final long recommendRefreshMs) {
super(configParser);
if (recommendRefreshMs <= 0) {
throw new IllegalArgumentException("recommendRefreshMs must > 0, but " + recommendRefreshMs + " get");
}
this.recommendRefreshMs = recommendRefreshMs;
startTimerService();
}
AutoRefreshDataSource的構造器一開始會呼叫父類的構造器進行初始化,如下:
AbstractDataSource
public AbstractDataSource(Converter<S, T> parser) {
if (parser == null) {
throw new IllegalArgumentException("parser can't be null");
}
this.parser = parser;
this.property = new DynamicSentinelProperty<T>();
}
AbstractDataSource的構造器是為了給兩個變數設值parser和property,其中property是DynamicSentinelProperty的例項。
我們再回到AutoRefreshDataSource中,AutoRefreshDataSource設值完recommendRefreshMs引數後會呼叫startTimerService方法來開啟一個定時的排程任務。
AutoRefreshDataSource#startTimerService
private void startTimerService() {
service = Executors.newScheduledThreadPool(1,
new NamedThreadFactory("sentinel-datasource-auto-refresh-task", true));
service.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
if (!isModified()) {
return;
}
T newValue = loadConfig();
getProperty().updateValue(newValue);
} catch (Throwable e) {
RecordLog.info("loadConfig exception", e);
}
}
}, recommendRefreshMs, recommendRefreshMs, TimeUnit.MILLISECONDS);
}
public SentinelProperty<T> getProperty() {
return property;
}
這個方法裡面會開啟一個執行緒,每3000ms呼叫一次run方法。run方法裡會首先會校驗一下檔案有沒有被修改過,如果有的話就呼叫loadConfig來載入配置,然後呼叫getProperty方法獲取父類設定的property來更新配置。
下來我們依次來講解一下這幾個主要的方法:
isModified方法是一個鉤子,呼叫的是FileRefreshableDataSource的isModified方法:
FileRefreshableDataSource#isModified
protected boolean isModified() {
long curLastModified = file.lastModified();
if (curLastModified != this.lastModified) {
this.lastModified = curLastModified;
return true;
}
return false;
}
isModified每次都會檢視file有沒有被修改,並記錄一下修改的時間。
接著往下是呼叫loadConfig載入檔案:
AbstractDataSource#loadConfig
public T loadConfig() throws Exception {
return loadConfig(readSource());
}
public T loadConfig(S conf) throws Exception {
T value = parser.convert(conf);
return value;
}
FileRefreshableDataSource#readSource
public String readSource() throws Exception {
if (!file.exists()) {
// Will throw FileNotFoundException later.
RecordLog.warn(String.format("[FileRefreshableDataSource] File does not exist: %s", file.getAbsolutePath()));
}
FileInputStream inputStream = null;
try {
inputStream = new FileInputStream(file);
FileChannel channel = inputStream.getChannel();
if (channel.size() > buf.length) {
throw new IllegalStateException(file.getAbsolutePath() + " file size=" + channel.size()
+ ", is bigger than bufSize=" + buf.length + ". Can't read");
}
int len = inputStream.read(buf);
return new String(buf, 0, len, charset);
} finally {
if (inputStream != null) {
try {
inputStream.close();
} catch (Exception ignore) {
}
}
}
}
loadConfig方法的實現還是很清晰的,首先是呼叫readSource通過io流讀取檔案,然後再通過傳入的解析器解析檔案的內容。
接著會呼叫DynamicSentinelProperty的updateValue方法,遍歷監聽器更新配置:
DynamicSentinelProperty#updateValue
public boolean updateValue(T newValue) {
//判斷新的元素和舊元素是否相同
if (isEqual(value, newValue)) {
return false;
}
RecordLog.info("[DynamicSentinelProperty] Config will be updated to: " + newValue);
value = newValue;
for (PropertyListener<T> listener : listeners) {
listener.configUpdate(newValue);
}
return true;
}
當然,還沒載入FlowRuleManager的時候肯定是沒有監聽器的。
講完了FileRefreshableDataSource的父類的載入,我們再回到FileRefreshableDataSource的構造器中。繼續往下走會呼叫firstLoad方法首次載入配置檔案初始化一次。
FileRefreshableDataSource#firstLoad
private void firstLoad() {
try {
T newValue = loadConfig();
getProperty().updateValue(newValue);
} catch (Throwable e) {
RecordLog.info("loadConfig exception", e);
}
}
下面我們再看一下FlowRuleManager是怎麼註冊的。註冊的時候會呼叫register2Property方法進行註冊:
FlowRuleManager#register2Property
public static void register2Property(SentinelProperty<List<FlowRule>> property) {
AssertUtil.notNull(property, "property cannot be null");
synchronized (LISTENER) {
RecordLog.info("[FlowRuleManager] Registering new property to flow rule manager");
currentProperty.removeListener(LISTENER);
property.addListener(LISTENER);
currentProperty = property;
}
}
這個方法實際上就是添加了一個監聽器,然後將FlowRuleManager的currentProperty替換成flowRuleDataSource建立的property。然後flowRuleDataSource裡面的定時執行緒會每隔3秒鐘呼叫一下這個LISTENER的configUpdate方法進行重新整理規則,這樣就實現了動態更新規則。
Push-based:ZooKeeper
我們還是先給出一個例子:
public static void main(String[] args) {
final String remoteAddress = "127.0.0.1:2181";
final String path = "/Sentinel-Demo/SYSTEM-CODE-DEMO-FLOW";
ReadableDataSource<String, List<FlowRule>> flowRuleDataSource = new ZookeeperDataSource<>(remoteAddress, path,
source -> JSON.parseObject(source, new TypeReference<List<FlowRule>>() {}));
FlowRuleManager.register2Property(flowRuleDataSource.getProperty());
}
在這裡我定義了/Sentinel-Demo/SYSTEM-CODE-DEMO-FLOW
這個path,如果這個path內的內容發生了變化,那麼就會重新整理規則。
我們先看一下ZookeeperDataSource的繼承關係:
ZookeeperDataSource
public ZookeeperDataSource(final String serverAddr, final String path, Converter<String, T> parser) {
super(parser);
if (StringUtil.isBlank(serverAddr) || StringUtil.isBlank(path)) {
throw new IllegalArgumentException(String.format("Bad argument: serverAddr=[%s], path=[%s]", serverAddr, path));
}
this.path = path;
init(serverAddr, null);
}
AbstractDataSource
public AbstractDataSource(Converter<S, T> parser) {
if (parser == null) {
throw new IllegalArgumentException("parser can't be null");
}
this.parser = parser;
this.property = new DynamicSentinelProperty<T>();
}
ZookeeperDataSource首先會呼叫父類進行引數的設定,在校驗完之後呼叫init方法進行初始化。
ZookeeperDataSource#init
private void init(final String serverAddr, final List<AuthInfo> authInfos) {
initZookeeperListener(serverAddr, authInfos);
loadInitialConfig();
}
ZookeeperDataSource#initZookeeperListener
private void initZookeeperListener(final String serverAddr, final List<AuthInfo> authInfos) {
try {
//設定監聽
this.listener = new NodeCacheListener() {
@Override
public void nodeChanged() {
try {
T newValue = loadConfig();
RecordLog.info(String.format("[ZookeeperDataSource] New property value received for (%s, %s): %s",
serverAddr, path, newValue));
// Update the new value to the property.
getProperty().updateValue(newValue);
} catch (Exception ex) {
RecordLog.warn("[ZookeeperDataSource] loadConfig exception", ex);
}
}
};
String zkKey = getZkKey(serverAddr, authInfos);
if (zkClientMap.containsKey(zkKey)) {
this.zkClient = zkClientMap.get(zkKey);
} else {
//如果key不存在,那麼就加鎖設值
synchronized (lock) {
if (!zkClientMap.containsKey(zkKey)) {
CuratorFramework zc = null;
//根據不同的條件獲取client
if (authInfos == null || authInfos.size() == 0) {
zc = CuratorFrameworkFactory.newClient(serverAddr, new ExponentialBackoffRetry(SLEEP_TIME, RETRY_TIMES));
} else {
zc = CuratorFrameworkFactory.builder().
connectString(serverAddr).
retryPolicy(new ExponentialBackoffRetry(SLEEP_TIME, RETRY_TIMES)).
authorization(authInfos).
build();
}
this.zkClient = zc;
this.zkClient.start();
Map<String, CuratorFramework> newZkClientMap = new HashMap<>(zkClientMap.size());
newZkClientMap.putAll(zkClientMap);
newZkClientMap.put(zkKey, zc);
zkClientMap = newZkClientMap;
} else {
this.zkClient = zkClientMap.get(zkKey);
}
}
}
//為節點新增watcher
//監聽資料節點的變更,會觸發事件
this.nodeCache = new NodeCache(this.zkClient, this.path);
this.nodeCache.getListenable().addListener(this.listener, this.pool);
this.nodeCache.start();
} catch (Exception e) {
RecordLog.warn("[ZookeeperDataSource] Error occurred when initializing Zookeeper data source", e);
e.printStackTrace();
}
}
這個方法主要就是用來建立client和設值監聽,都是zk的常規操作,不熟悉的,可以去看看Curator是怎麼使用的。
private void loadInitialConfig() {
try {
//呼叫父類的loadConfig方法
T newValue = loadConfig();
if (newValue == null) {
RecordLog.warn("[ZookeeperDataSource] WARN: initial config is null, you may have to check your data source");
}
getProperty().updateValue(newValue);
} catch (Exception ex) {
RecordLog.warn("[ZookeeperDataSource] Error when loading initial config", ex);
}
}
設值完zk的client和監聽後會呼叫一次updateValue,首次載入節點的資訊。
AbstractDataSource
public T loadConfig() throws Exception {
return loadConfig(readSource());
}
public T loadConfig(S conf) throws Exception {
T value = parser.convert(conf);
return value;
}
父類的loadConfig會呼叫子類的readSource讀取配置資訊,然後呼叫parser.convert進行反序列化。
ZookeeperDataSource#readSource
public String readSource() throws Exception {
if (this.zkClient == null) {
throw new IllegalStateException("Zookeeper has not been initialized or error occurred");
}
String configInfo = null;
ChildData childData = nodeCache.getCurrentData();
if (null != childData && childData.getData() != null) {
configInfo = new String(childData.getData());
}
return configInfo;
}
這個方法是用來讀取zk節點裡面的資訊。
最後FlowRuleManager.register2Property的方法就和上面的檔案動態配置的是一樣的了。