1. 程式人生 > >7.Sentinel原始碼分析—Sentinel是怎麼和控制檯通訊的?

7.Sentinel原始碼分析—Sentinel是怎麼和控制檯通訊的?

這裡會介紹:

  1. Sentinel會使用多執行緒的方式實現一個類Reactor的IO模型
  2. Sentinel會使用心跳檢測來觀察控制檯是否正常

Sentinel原始碼解析系列:

1.Sentinel原始碼分析—FlowRuleManager載入規則做了什麼?

2. Sentinel原始碼分析—Sentinel是如何進行流量統計的?

3. Sentinel原始碼分析— QPS流量控制是如何實現的?

4.Sentinel原始碼分析— Sentinel是如何做到降級的?

5.Sentinel原始碼分析—Sentinel如何實現自適應限流?

6.Sentinel原始碼分析—Sentinel是如何動態載入配置限流的?


在看我的這篇文章之前大家可以先看一下官方的這篇文章:https://github.com/alibaba/Sentinel/wiki/%E6%8E%A7%E5%88%B6%E5%8F%B0。介紹了控制檯怎麼使用,以及客戶端要怎麼設定才能被收集資料。

客戶端會在InitExecutor呼叫doInit方法中與控制檯建立通訊,所以我們直接看doInit方法:

InitExecutor#doInit

public static void doInit() {
    //InitExecutor只會初始化一次,並且初始化失敗會退出
    if (!initialized.compareAndSet(false, true)) {
        return;
    }
    try {
        //通過spi載入InitFunc子類
        ServiceLoader<InitFunc> loader = ServiceLoader.load(InitFunc.class);
        List<OrderWrapper> initList = new ArrayList<OrderWrapper>();
        for (InitFunc initFunc : loader) {
            RecordLog.info("[InitExecutor] Found init func: " + initFunc.getClass().getCanonicalName());
            //給所有的initFunc排序,按@InitOrder從小到大進行排序
            //然後封裝成OrderWrapper物件
            insertSorted(initList, initFunc);
        }
        for (OrderWrapper w : initList) {
            w.func.init();
            RecordLog.info(String.format("[InitExecutor] Executing %s with order %d",
                w.func.getClass().getCanonicalName(), w.order));
        }
    } catch (Exception ex) {
        RecordLog.warn("[InitExecutor] WARN: Initialization failed", ex);
        ex.printStackTrace();
    } catch (Error error) {
        RecordLog.warn("[InitExecutor] ERROR: Initialization failed with fatal error", error);
        error.printStackTrace();
    }
}

因為這裡我們引入了sentinel-transport-simple-http模組,所以使用spi載入InitFunc的子類的時候會載入三個子類例項,分別是:CommandCenterInitFunc、HeartbeatSenderInitFunc、MetricCallbackInit。
然後會遍歷loader,根據@InitOrder的大小進行排序,並封裝成OrderWrapper放入到initList中。
所以initList裡面的物件順序是:

  1. CommandCenterInitFunc
  2. HeartbeatSenderInitFunc
  3. MetricCallbackInit
    然後遍歷initList依次呼叫init方法。

所以下面我們來看一下這三個實現類的init方法做了什麼:

CommandCenterInitFunc

CommandCenterInitFunc#init

public void init() throws Exception {
    //獲取commandCenter物件
    CommandCenter commandCenter = CommandCenterProvider.getCommandCenter();

    if (commandCenter == null) {
        RecordLog.warn("[CommandCenterInitFunc] Cannot resolve CommandCenter");
        return;
    }
    //呼叫SimpleHttpCommandCenter的beforeStart方法
    //用來設定CommandHandler的實現類
    commandCenter.beforeStart();
    commandCenter.start();
    RecordLog.info("[CommandCenterInit] Starting command center: "
            + commandCenter.getClass().getCanonicalName());
}

這個方法裡面的所有操作都是針對CommandCenter來進行的,所以我們先來看看CommandCenterProvider這個類。

CommandCenterProvider

static {
    //初始化commandCenter物件
    resolveInstance();
}

private static void resolveInstance() {
    //獲取SpiOrder更大的子類實現類
    CommandCenter resolveCommandCenter = SpiLoader.loadHighestPriorityInstance(CommandCenter.class);

    if (resolveCommandCenter == null) {
        RecordLog.warn("[CommandCenterProvider] WARN: No existing CommandCenter found");
    } else {
        commandCenter = resolveCommandCenter;
        RecordLog.info("[CommandCenterProvider] CommandCenter resolved: " + resolveCommandCenter.getClass()
            .getCanonicalName());
    }
}

CommandCenterProvider會在首次初始化的時候呼叫resolveInstance方法。在resolveInstance方法裡面會呼叫SpiLoader.loadHighestPriorityInstance來獲取CommandCenter,這裡獲取的是SimpleHttpCommandCenter這個例項,loadHighestPriorityInstance方法具體的實現非常簡單,我就不去分析了。
然後將commandCenter賦值SimpleHttpCommandCenter例項。

所以CommandCenterProvider.getCommandCenter()方法返回的是SimpleHttpCommandCenter例項。

然後呼叫SimpleHttpCommandCenter的beforeStart方法。

SimpleHttpCommandCenter#beforeStart

public void beforeStart() throws Exception {
    // Register handlers
    //呼叫CommandHandlerProvider的namedHandlers方法
    //獲取CommandHandler的spi中設定的實現類
    Map<String, CommandHandler> handlers = CommandHandlerProvider.getInstance().namedHandlers();
    //將handlers中的資料設定到handlerMap中
    registerCommands(handlers);
}

這個方法首先會呼叫CommandHandlerProvider的namedHandlers中獲取所有的CommandHandler實現類。

CommandHandlerProvider#namedHandlers

private final ServiceLoader<CommandHandler> serviceLoader = ServiceLoader.load(CommandHandler.class);

public Map<String, CommandHandler> namedHandlers() {
    Map<String, CommandHandler> map = new HashMap<String, CommandHandler>();
    for (CommandHandler handler : serviceLoader) {
        //獲取實現類CommandMapping註解的name屬性
        String name = parseCommandName(handler);
        if (!StringUtil.isEmpty(name)) {
            map.put(name, handler);
        }
    }
    return map;
}

這個類會通過spi先載入CommandHandler的實現類,然後將實現類按註解上面的name屬性放入到map裡面去。
CommandHandler的實現類是用來和控制檯進行互動的處理類,負責處理。
這也是策略模式的一種應用,根據map裡面的不同策略來做不同的處理,例如SendMetricCommandHandler是用來統計呼叫資訊然後傳送給控制檯用的,ModifyRulesCommandHandler是用來做實時修改限流策略的處理的等等。

然後我們再回到CommandCenterInitFunc中,繼續往下走,呼叫commandCenter.start()方法。

SimpleHttpCommandCenter#start

public void start() throws Exception {
    //獲取當前機器的cpu執行緒數
    int nThreads = Runtime.getRuntime().availableProcessors();
    //建立一個cpu執行緒數大小的固定執行緒池,用來做業務執行緒池用
    this.bizExecutor = new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS,
        new ArrayBlockingQueue<Runnable>(10),
        new NamedThreadFactory("sentinel-command-center-service-executor"),
        new RejectedExecutionHandler() {
            @Override
            public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
                CommandCenterLog.info("EventTask rejected");
                throw new RejectedExecutionException();
            }
        });

    Runnable serverInitTask = new Runnable() {
        int port;

        {
            try {
                //獲取port
                port = Integer.parseInt(TransportConfig.getPort());
            } catch (Exception e) {
                port = DEFAULT_PORT;
            }
        }

        @Override
        public void run() {
            boolean success = false;
            //建立一個ServerSocket
            ServerSocket serverSocket = getServerSocketFromBasePort(port);

            if (serverSocket != null) {
                CommandCenterLog.info("[CommandCenter] Begin listening at port " + serverSocket.getLocalPort());
                socketReference = serverSocket;
                executor.submit(new ServerThread(serverSocket));
                success = true;
                port = serverSocket.getLocalPort();
            } else {
                CommandCenterLog.info("[CommandCenter] chooses port fail, http command center will not work");
            }

            if (!success) {
                port = PORT_UNINITIALIZED;
            }

            TransportConfig.setRuntimePort(port);
            //關閉執行緒池
            executor.shutdown();
        }

    };

    new Thread(serverInitTask).start();
}
  1. 這個方法會建立一個固定大小的業務執行緒池
  2. 建立一個serverInitTask,裡面負責建立serverSocket然後用executor去建立一個ServerThread非同步執行serverSocket
  3. executor用完之後會在serverInitTask裡面呼叫executor的shutdown方法去關閉執行緒池

其中executor是一個單執行緒的執行緒池:

private ExecutorService executor = Executors.newSingleThreadExecutor(
    new NamedThreadFactory("sentinel-command-center-executor"));

ServerThread是SimpleHttpCommandCenter的內部類:

public void run() {
    while (true) {
        Socket socket = null;
        try {
              //建立連線
            socket = this.serverSocket.accept();
              //預設的超時時間是3s
            setSocketSoTimeout(socket);
            HttpEventTask eventTask = new HttpEventTask(socket);
            //使用業務執行緒非同步處理
            bizExecutor.submit(eventTask);
        } catch (Exception e) {
            CommandCenterLog.info("Server error", e);
            if (socket != null) {
                try {
                    socket.close();
                } catch (Exception e1) {
                    CommandCenterLog.info("Error when closing an opened socket", e1);
                }
            }
            try {
                // In case of infinite log.
                Thread.sleep(10);
            } catch (InterruptedException e1) {
                // Indicates the task should stop.
                break;
            }
        }
    }
}

run方法會使用構造器傳入的serverSocket建立連線後設置超時時間,封裝成HttpEventTask類,然後使用上面建立的bizExecutor非同步執行任務。

HttpEventTask是Runnable的實現類,所以呼叫bizExecutor的submit的時候會呼叫其中的run方法使用socket與控制檯進行互動。

HttpEventTask#run

public void run() {
          ....
        // Validate the target command.
        //獲取commandName
        String commandName = HttpCommandUtils.getTarget(request);
        if (StringUtil.isBlank(commandName)) {
            badRequest(printWriter, "Invalid command");
            return;
        }
        // Find the matching command handler.
        //根據commandName獲取處理器名字
        CommandHandler<?> commandHandler = SimpleHttpCommandCenter.getHandler(commandName);
        if (commandHandler != null) {
            //呼叫處理器結果,然後返回給控制檯
            CommandResponse<?> response = commandHandler.handle(request);
            handleResponse(response, printWriter, outputStream);
        }  
          ....
    } catch (Throwable e) {
        ....
    } finally {
        ....
    }
}

HttpEventTask的run方法很長,但是很多都是有關輸入輸出流的,我們不關心,所以省略。只需要知道會把request請求最後轉換成一個控制檯發過來的指令,然後通過SimpleHttpCommandCenter呼叫getHandler得到處理器,然後處理資料就行了。

所以這個整個的處理流程就是:

通過這樣的一個處理流程,然後實現了類似reactor的一個處理流程。

SimpleHttpCommandCenter#getHandler

public static CommandHandler getHandler(String commandName) {
    return handlerMap.get(commandName);
}

handlerMap裡面的資料是通過前面我們分析的呼叫beforeStart方法設定進來的。

然後通過commandName獲取對應的控制檯,例如:控制檯傳送過來metric指令,那麼就會對應的呼叫SendMetricCommandHandler的handle方法來處理控制檯的指令。

我們來看看SendMetricCommandHandler是怎麼處理返回統計資料的:

SendMetricCommandHandler#handle

public CommandResponse<String> handle(CommandRequest request) {
    // Note: not thread-safe.
    if (searcher == null) {
        synchronized (lock) {
            //獲取應用名
            String appName = SentinelConfig.getAppName();
            if (appName == null) {
                appName = "";
            }
            if (searcher == null) {
                //用來找metric檔案,
                searcher = new MetricSearcher(MetricWriter.METRIC_BASE_DIR,
                    MetricWriter.formMetricFileName(appName, PidUtil.getPid()));
            }
        }
    }
    //獲取請求的開始結束時間和最大的行數
    String startTimeStr = request.getParam("startTime");
    String endTimeStr = request.getParam("endTime");
    String maxLinesStr = request.getParam("maxLines");
    //用來確定資源
    String identity = request.getParam("identity");
    long startTime = -1;
    int maxLines = 6000;
    if (StringUtil.isNotBlank(startTimeStr)) {
        startTime = Long.parseLong(startTimeStr);
    } else {
        return CommandResponse.ofSuccess("");
    }
    List<MetricNode> list;
    try {
        // Find by end time if set.
        if (StringUtil.isNotBlank(endTimeStr)) {
            long endTime = Long.parseLong(endTimeStr);
            //根據開始結束時間找到統計資料
            list = searcher.findByTimeAndResource(startTime, endTime, identity);
        } else {
            if (StringUtil.isNotBlank(maxLinesStr)) {
                maxLines = Integer.parseInt(maxLinesStr);
            }
            maxLines = Math.min(maxLines, 12000);
            list = searcher.find(startTime, maxLines);
        }
    } catch (Exception ex) {
        return CommandResponse.ofFailure(new RuntimeException("Error when retrieving metrics", ex));
    }
    if (list == null) {
        list = new ArrayList<>();
    }
    //如果identity為空就加入CPU負載和系統負載
    if (StringUtil.isBlank(identity)) {
        addCpuUsageAndLoad(list);
    }
    StringBuilder sb = new StringBuilder();
    for (MetricNode node : list) {
        sb.append(node.toThinString()).append("\n");
    }
    return CommandResponse.ofSuccess(sb.toString());
}

我們在1.Sentinel原始碼分析—FlowRuleManager載入規則做了什麼?裡介紹了Metric統計資訊會在MetricTimerListener的run方法中定時寫入檔案中去。

所以handle方法裡面主要是如何根據請求的開始結束時間,資源名來獲取磁碟的檔案,然後返回磁碟的統計資訊,並記錄一下當前的統計資訊,防止重複傳送統計資料到控制檯。

HeartbeatSenderInitFunc

HeartbeatSenderInitFunc主要是用來做心跳執行緒使用的,定期的和控制檯進行心跳連線。

HeartbeatSenderInitFunc#init

public void init() {
    //獲取HeartbeatSender的實現類
    HeartbeatSender sender = HeartbeatSenderProvider.getHeartbeatSender();
    if (sender == null) {
        RecordLog.warn("[HeartbeatSenderInitFunc] WARN: No HeartbeatSender loaded");
        return;
    }
    //建立一個corepoolsize為2,maximumPoolSize為最大的執行緒池
    initSchedulerIfNeeded();
    //獲取心跳間隔時間,預設10s
    long interval = retrieveInterval(sender);
    //設定間隔心跳時間
    setIntervalIfNotExists(interval);
    //開啟一個定時任務,每隔interval時間傳送一個心跳
    scheduleHeartbeatTask(sender, interval);
}
  1. 首先會呼叫HeartbeatSenderProvider.getHeartbeatSender方法,裡面會根據spi建立例項,返回一個SimpleHttpHeartbeatSender例項。
  2. 呼叫initSchedulerIfNeeded方法建立一個corepoolsize為2的執行緒池
  3. 獲取心跳間隔時間,如果沒有設定,那麼是10s
  4. 呼叫scheduleHeartbeatTask方法開啟一個定時執行緒呼叫。

我們來看看scheduleHeartbeatTask方法:
HeartbeatSenderInitFunc#scheduleHeartbeatTask

private void scheduleHeartbeatTask(/*@NonNull*/ final HeartbeatSender sender, /*@Valid*/ long interval) {
    pool.scheduleAtFixedRate(new Runnable() {
        @Override
        public void run() {
            try {
                sender.sendHeartbeat();
            } catch (Throwable e) {
                RecordLog.warn("[HeartbeatSender] Send heartbeat error", e);
            }
        }
    }, 5000, interval, TimeUnit.MILLISECONDS);
    RecordLog.info("[HeartbeatSenderInit] HeartbeatSender started: "
        + sender.getClass().getCanonicalName());
}

預設的情況,建立的這個定時任務會每隔10s呼叫一次SimpleHttpHeartbeatSender的sendHeartbeat方法。

SimpleHttpHeartbeatSender#sendHeartbeat

public boolean sendHeartbeat() throws Exception {
    if (TransportConfig.getRuntimePort() <= 0) {
        RecordLog.info("[SimpleHttpHeartbeatSender] Runtime port not initialized, won't send heartbeat");
        return false;
    }
    //獲取控制檯的ip和埠等資訊
    InetSocketAddress addr = getAvailableAddress();
    if (addr == null) {
        return false;
    }
    //設定http呼叫的ip和埠,還有訪問的url
    SimpleHttpRequest request = new SimpleHttpRequest(addr, HEARTBEAT_PATH);
    //獲取版本號,埠等資訊
    request.setParams(heartBeat.generateCurrentMessage());
    try {
        //傳送post請求
        SimpleHttpResponse response = httpClient.post(request);
        if (response.getStatusCode() == OK_STATUS) {
            return true;
        }
    } catch (Exception e) {
        RecordLog.warn("[SimpleHttpHeartbeatSender] Failed to send heartbeat to " + addr + " : ", e);
    }
    return false;
}

這個心跳檢測的方法就寫的很簡單了,通過Dcsp.sentinel.dashboard.server預先設定好的ip和埠號傳送post請求到控制檯,然後檢測是否返回200,如果是則說明控制檯正常,否則進行異常處理