Demystifying osquery's Monitoring System
osquery is host-information collection software, so constraining its resource usage matters a great deal. If osquery is deployed in production on busy, highly concurrent servers and is querying process, port, or audit information, its memory usage is bound to spike, so putting resource limits on osquery is a must. The Facebook team took this into account and built resource monitoring and resource limiting into osquery; see Daemon control flags for the details.
watchdog
Before explaining the watchdog, we need one basic osquery concept. Take osqueryd as an example: after starting osqueryd, look at its processes:
```
$ ps -ef | grep osqueryd
root  28232      1  0 16:17 ?  00:00:00 /usr/bin/osqueryd --flagfile /etc/osquery/osquery.flags --config_path /etc/osquery/osquery.conf
root  28259  28232  2 16:17 ?  00:00:00 /usr/bin/osqueryd
```
There are two osqueryd-related processes. The one with pid 28232 is the parent of the other `/usr/bin/osqueryd` process. The process with pid 28232 is called the watcher process, and the one with pid 28259 is called the worker process. The worker is what actually executes queries, while the watcher is responsible for monitoring the worker's resource usage.
Daemon control flags documents the common settings; the ones related to resource limits and resource monitoring almost all carry the word watchdog in their names:
- `--disable_watchdog=false`: the default is false, i.e. resource monitoring is enabled by default. If the worker exceeds a threshold, the worker process is restarted.
- `--watchdog_level=0`: the monitoring level, with three settings (0=normal, 1=restrictive, -1=disabled); the default is 0. See the original English documentation for the exact breakdown of each level. It can also be set to -1, which disables resource monitoring entirely. The difference from `disable_watchdog` is that the latter only takes effect for the worker process and does not cover extension processes, whereas `--watchdog_level=-1` takes effect for both the worker process and extensions.
- `--watchdog_memory_limit=0`: sets osquery's memory usage threshold. Since `watchdog_level` already implies a value for this limit, setting `watchdog_memory_limit` overrides the value that `watchdog_level` would provide.
- `--watchdog_utilization_limit=0`: same idea as `watchdog_memory_limit`, but for CPU utilization.
- `--watchdog_delay=60`: after the worker process starts, the watcher waits `watchdog_delay` seconds before it begins monitoring. A freshly started worker inevitably has to initialize resources and therefore uses more memory and CPU at first, so a grace period is needed before the watcher takes over.
- `--enable_extensions_watchdog=false`: whether to monitor extensions; disabled by default. Even when disabled, the watcher still watches for abnormal extension exits, it just does not monitor their resource usage. Set this option to true to enable resource monitoring for extensions.
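Putting these flags together, a daemon flagfile might look like the following. This is only an illustrative sketch: the 500 MB and 20% overrides are made-up example values, not recommendations from the osquery documentation.

```
# /etc/osquery/osquery.flags (illustrative excerpt)
--disable_watchdog=false
--watchdog_level=0
# Example overrides; 500 MB and 20% are arbitrary illustration values.
--watchdog_memory_limit=500
--watchdog_utilization_limit=20
--watchdog_delay=60
--enable_extensions_watchdog=true
```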
watcher.cpp
The monitoring code is located in `osquery/core/watcher.cpp`.
kWatchdogLimits
As mentioned above, `watchdog_level` controls the monitoring level. The concrete thresholds behind each level are hard-coded in osquery's source:
```cpp
using WatchdogLimitMap = std::map<WatchdogLimitType, LimitDefinition>;

struct LimitDefinition {
  size_t normal;
  size_t restrictive;
  size_t disabled;
};

const WatchdogLimitMap kWatchdogLimits = {
    // Maximum MB worker can privately allocate.
    {WatchdogLimitType::MEMORY_LIMIT, {200, 100, 10000}},
    // % of (User + System + Idle) CPU time worker can utilize
    // for LATENCY_LIMIT seconds.
    {WatchdogLimitType::UTILIZATION_LIMIT, {10, 5, 100}},
    // Number of seconds the worker should run, else consider the exit fatal.
    {WatchdogLimitType::RESPAWN_LIMIT, {4, 4, 1000}},
    // If the worker respawns too quickly, backoff on creating additional.
    {WatchdogLimitType::RESPAWN_DELAY, {5, 5, 1}},
    // Seconds of tolerable UTILIZATION_LIMIT sustained latency.
    {WatchdogLimitType::LATENCY_LIMIT, {12, 6, 1000}},
    // How often to poll for performance limit violations.
    {WatchdogLimitType::INTERVAL, {3, 3, 3}},
};

size_t getWorkerLimit(WatchdogLimitType name) {
  if (kWatchdogLimits.count(name) == 0) {
    return 0;
  }

  if (name == WatchdogLimitType::MEMORY_LIMIT &&
      FLAGS_watchdog_memory_limit > 0) {
    return FLAGS_watchdog_memory_limit;
  }

  if (name == WatchdogLimitType::UTILIZATION_LIMIT &&
      FLAGS_watchdog_utilization_limit > 0) {
    return FLAGS_watchdog_utilization_limit;
  }

  auto level = FLAGS_watchdog_level;
  // If no level was provided then use the default (config/switch).
  if (level == -1) {
    return kWatchdogLimits.at(name).disabled;
  }
  if (level == 1) {
    return kWatchdogLimits.at(name).restrictive;
  }
  return kWatchdogLimits.at(name).normal;
}
```
`getWorkerLimit()` decides which value to take from `kWatchdogLimits` based on the limit name passed in and the `FLAGS_watchdog_level` level, with explicit flags taking precedence: a call such as `getWorkerLimit(WatchdogLimitType::UTILIZATION_LIMIT)` returns `FLAGS_watchdog_utilization_limit` if that flag was set, and otherwise the value for the current level. For example, at the restrictive level (`--watchdog_level=1`) `getWorkerLimit(WatchdogLimitType::MEMORY_LIMIT)` returns 100 (MB), while an explicit `--watchdog_memory_limit=500` overrides it and returns 500. The structure of the code above is quite clear.
WatcherRunner
`WatcherRunner` is the entry point of the watcher's monitoring.
```cpp
void WatcherRunner::start() {
  // Hold the current process (watcher) for inspection too.
  auto& watcher = Watcher::get();
  auto self = PlatformProcess::getCurrentProcess();

  // Set worker performance counters to an initial state.
  watcher.resetWorkerCounters(0);
  PerformanceState watcher_state;

  // Enter the watch loop.
  do {
    if (use_worker_ && !watch(watcher.getWorker())) {
      if (watcher.fatesBound()) {
        // A signal has interrupted the watcher.
        break;
      }

      auto status = watcher.getWorkerStatus();
      if (status == EXIT_CATASTROPHIC) {
        Initializer::requestShutdown(EXIT_CATASTROPHIC);
        break;
      }

      if (watcher.workerRestartCount() ==
          getWorkerLimit(WatchdogLimitType::RESPAWN_LIMIT)) {
        // Too many worker restarts.
        Initializer::requestShutdown(EXIT_FAILURE, "Too many worker restarts");
        break;
      }

      // The watcher failed, create a worker.
      createWorker();
    }

    // After inspecting the worker, check the extensions.
    // Extensions may be active even if a worker/watcher is not used.
    watchExtensions();

    if (use_worker_) {
      auto status = isWatcherHealthy(*self, watcher_state);
      if (!status.ok()) {
        Initializer::requestShutdown(
            EXIT_CATASTROPHIC,
            "Watcher has become unhealthy: " + status.getMessage());
        break;
      }
    }

    if (run_once_) {
      // A test harness can end the thread immediately.
      break;
    }

    pause(std::chrono::seconds(getWorkerLimit(WatchdogLimitType::INTERVAL)));
  } while (!interrupted() && ok());
}
```
The whole monitoring routine sits inside one big `do { ... } while (!interrupted() && ok())` loop. Let's break it down step by step:
- `watcher.getWorker()` obtains the worker process;
- `watch(watcher.getWorker())` monitors the worker process; all of the monitoring information is encapsulated in the `watcher` object. (A small aside: `watcher.fatesBound()` checks whether the watcher and worker processes still have a parent-child relationship; if they do not, monitoring is interrupted);
- `auto status = watcher.getWorkerStatus();` obtains the monitoring status, and a different error path is taken depending on that status;
- Entering this branch of the `do {}` body basically means that `watch(watcher.getWorker())` has found a problem with the worker's resource usage, in which case `createWorker();` is finally called to restart the worker process.
From this analysis, `watch()` and `createWorker()` are the most critical functions in the flow above.
createWorker()
```cpp
void WatcherRunner::createWorker() {
  auto& watcher = Watcher::get();

  /**
   * init check
   */

  // Get the complete path of the osquery process binary.
  boost::system::error_code ec;
  auto exec_path = fs::system_complete(fs::path(qd[0]["path"]), ec);
  if (!pathExists(exec_path).ok()) {
    LOG(WARNING) << "osqueryd doesn't exist in: " << exec_path.string();
    return;
  }
  if (!safePermissions(
          exec_path.parent_path().string(), exec_path.string(), true)) {
    // osqueryd binary has become unsafe.
    LOG(ERROR) << RLOG(1382)
               << "osqueryd has unsafe permissions: " << exec_path.string();
    Initializer::requestShutdown(EXIT_FAILURE);
    return;
  }

  auto worker = PlatformProcess::launchWorker(exec_path.string(), argc_, argv_);
  if (worker == nullptr) {
    // Unrecoverable error, cannot create a worker process.
    LOG(ERROR) << "osqueryd could not create a worker process";
    Initializer::shutdown(EXIT_FAILURE);
    return;
  }

  watcher.setWorker(worker);
  watcher.resetWorkerCounters(getUnixTime());
  VLOG(1) << "osqueryd watcher (" << PlatformProcess::getCurrentPid()
          << ") executing worker (" << worker->pid() << ")";
  watcher.worker_status_ = -1;
}
```
- `exec_path = fs::system_complete(fs::path(qd[0]["path"]), ec);` resolves the full path of the `osqueryd` binary;
- `auto worker = PlatformProcess::launchWorker(exec_path.string(), argc_, argv_);` launches `osqueryd` through `launchWorker()`, which is how the new worker is started. This is the same mechanism covered in the startup analysis of the earlier article on osquery dynamic debugging and repackaging; it ultimately calls `::execve(exec_path.c_str(), argv, ::environ);`
- `watcher.setWorker(worker);` and `watcher.resetWorkerCounters(getUnixTime());` record the new worker's pid and reset its start time.
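Since `launchWorker()` ultimately boils down to `::execve()`, it may help to see the bare POSIX pattern behind it. The sketch below is not osquery's `PlatformProcess::launchWorker()`; it is a minimal, hypothetical fork-and-exec illustration, and the binary path and arguments are placeholders.

```cpp
#include <sys/types.h>
#include <sys/wait.h>
#include <unistd.h>
#include <cstdio>

extern char** environ;

// Minimal fork + execve respawn pattern (illustration only).
static pid_t launch_worker(const char* exec_path, char* const argv[]) {
  pid_t pid = ::fork();
  if (pid < 0) {
    return -1;  // fork failed
  }
  if (pid == 0) {
    // Child: replace the process image with the worker binary.
    ::execve(exec_path, argv, ::environ);
    _exit(127);  // only reached if execve failed
  }
  return pid;  // parent (the "watcher") keeps the worker's pid
}

int main() {
  char* argv[] = {const_cast<char*>("/usr/bin/osqueryd"), nullptr};
  pid_t worker = launch_worker("/usr/bin/osqueryd", argv);
  if (worker > 0) {
    std::printf("watcher %d launched worker %d\n", (int)::getpid(), (int)worker);
    int status = 0;
    ::waitpid(worker, &status, 0);  // a real watcher polls instead of blocking
  }
  return 0;
}
```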
watch()
`watch()` is the core of the whole monitoring system. It is responsible for monitoring the worker process's metrics, including the CPU and memory usage discussed above.
```cpp
bool WatcherRunner::watch(const PlatformProcess& child) const {
  int process_status = 0;
  ProcessState result = child.checkStatus(process_status);

  if (Watcher::get().fatesBound()) {
    // A signal was handled while the watcher was watching.
    return false;
  }

  if (!child.isValid() || result == PROCESS_ERROR) {
    // Worker does not exist or never existed.
    return false;
  } else if (result == PROCESS_STILL_ALIVE) {
    // If the inspect finds problems it will stop/restart the worker.
    auto status = isChildSane(child);
    // A delayed watchdog does not stop the worker process.
    if (!status.ok() && getUnixTime() >= delayedTime()) {
      stopChild(child);
      return false;
    }
    return true;
  }

  if (result == PROCESS_EXITED) {
    // If the worker process existed, store the exit code.
    Watcher::get().worker_status_ = process_status;
    return false;
  }
  return true;
}
```
The `const PlatformProcess &child` parameter of `watch()` is the worker process.
- `Watcher::get().fatesBound()`: as in `WatcherRunner::start()`, first check whether the watcher and worker processes still have a parent-child relationship; if not, return `false` immediately;
- `ProcessState result = child.checkStatus(process_status);` checks the worker's current state. If the state is `PROCESS_ERROR` or `PROCESS_EXITED`, `false` is returned directly;
- `auto status = isChildSane(child);`: if the worker is found to be running normally, `isChildSane()` is called to examine its resource usage.
isChildSane()
```cpp
Status WatcherRunner::isChildSane(const PlatformProcess& child) const {
  auto rows = getProcessRow(child.pid());
  if (rows.size() == 0) {
    // Could not find worker process?
    return Status(1, "Cannot find process");
  }

  PerformanceChange change;
  {
    WatcherExtensionsLocker locker;
    auto& state = Watcher::get().getState(child);
    change = getChange(rows[0], state);
  }

  // Only make a decision about the child sanity if it is still the watcher's
  // child. It's possible for the child to die, and its pid reused.
  if (change.parent != PlatformProcess::getCurrentPid()) {
    // The child's parent is not the watcher.
    Watcher::get().reset(child);
    // Do not stop or call the child insane, since it is not our child.
    return Status(0);
  }

  if (exceededCyclesLimit(change)) {
    return Status(1,
                  "Maximum sustainable CPU utilization limit exceeded: " +
                      std::to_string(change.sustained_latency * change.iv));
  }

  // Check if the private memory exceeds a memory limit.
  if (exceededMemoryLimit(change)) {
    return Status(
        1, "Memory limits exceeded: " + std::to_string(change.footprint));
  }

  // The worker is sane, no action needed.
  // Attempt to flush status logs to the well-behaved worker.
  if (use_worker_ && child.pid() == Watcher::get().getWorker().pid()) {
    relayStatusLogs();
  }

  return Status(0);
}
```
- `auto rows = getProcessRow(child.pid());` looks up the worker's information in the `processes` table by its pid (`getProcessRow()` is examined below);
- `auto &state = Watcher::get().getState(child);` retrieves the state previously recorded for this worker. `getState()` looks like this:

  ```cpp
  PerformanceState state_;

  PerformanceState& Watcher::getState(const PlatformProcess& child) {
    if (child == getWorker()) {
      return state_;
    } else {
      return extension_states_[getExtensionPath(child)];
    }
  }
  ```

  `watch.h` defines `PerformanceState`:

  ```cpp
  struct PerformanceState {
    /// A counter of how many intervals the process exceeded performance limits.
    size_t sustained_latency;
    /// The last checked user CPU time.
    size_t user_time;
    /// The last checked system CPU time.
    size_t system_time;
    /// A timestamp when the process/worker was last created.
    size_t last_respawn_time;
    /// The initial (or as close as possible) process image footprint.
    size_t initial_footprint;

    PerformanceState() {
      sustained_latency = 0;
      user_time = 0;
      system_time = 0;
      last_respawn_time = 0;
      initial_footprint = 0;
    }
  };
  ```

  As you can see, `PerformanceState` is a struct storing `sustained_latency`, `user_time`, `system_time`, `last_respawn_time`, and `initial_footprint`.
- `getProcessRow()`:

  ```cpp
  QueryData WatcherRunner::getProcessRow(pid_t pid) const {
    int p = pid;
  #ifdef WIN32
    // WIN32 code ....
  #endif
    return SQL::selectFrom(
        {"parent", "user_time", "system_time", "resident_size"},
        "processes",
        "pid",
        EQUALS,
        INTEGER(p));
  }
  ```

  `getProcessRow(pid_t pid)` simply queries the `processes` table and fetches `parent` (the parent process ID, i.e. PPID), `user_time` (CPU time spent in user mode), `system_time` (CPU time spent in kernel mode), and `resident_size` (the private memory used by the process). The fields returned by `getProcessRow(pid_t pid)` line up with the fields obtained in the previous step via `auto &state = Watcher::get().getState(child);`. What the `processes` table gives us is the worker's real-time resource usage right now, so by comparing the previously recorded usage with the current usage through `change = getChange(rows[0], state);` we can decide whether the process has a problem.
- `change.parent != PlatformProcess::getCurrentPid()`: this can happen when the worker process changed along the way (for instance the child died and its pid was reused). If the comparison shows the child's parent is no longer the watcher, `Watcher::get().reset(child);` is executed to reset the child the watcher is tracking.
- `exceededMemoryLimit(change)` and `exceededCyclesLimit(change)` are then called on the computed change to judge the memory and the CPU situation respectively. Take `exceededMemoryLimit(change)` as an example (a concrete numeric sketch of this memory check follows right after this list):

  ```cpp
  static bool exceededMemoryLimit(const PerformanceChange& change) {
    if (change.footprint == 0) {
      return false;
    }
    return (change.footprint >
            getWorkerLimit(WatchdogLimitType::MEMORY_LIMIT) * 1024 * 1024);
  }
  ```

  `change.footprint > getWorkerLimit(WatchdogLimitType::MEMORY_LIMIT) * 1024 * 1024` compares the diffed footprint against the configured limit; the value behind `WatchdogLimitType::MEMORY_LIMIT` was already covered in kWatchdogLimits above.
- From all of this, the most critical part of `isChildSane(const PlatformProcess &child)` is the comparison itself, i.e. `change = getChange(rows[0], state);`.
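Here is the promised numeric sketch of the memory check. It is standalone illustration code, not osquery's, and it assumes the normal-level `MEMORY_LIMIT` of 200 MB from `kWatchdogLimits`; the footprint values are made up for the example.

```cpp
#include <cstdint>
#include <iostream>

// Simplified stand-in for exceededMemoryLimit(): compare the worker's
// post-launch resident growth ("footprint") against the normal-level
// MEMORY_LIMIT of 200 MB.
static bool exceeded_memory_limit(uint64_t footprint_bytes) {
  const uint64_t limit_mb = 200;  // kWatchdogLimits, normal level
  if (footprint_bytes == 0) {
    return false;
  }
  return footprint_bytes > limit_mb * 1024 * 1024;
}

int main() {
  // Worker grew by 150 MB since its first check: still within limits.
  std::cout << exceeded_memory_limit(150ULL * 1024 * 1024) << "\n";  // 0
  // Worker grew by 250 MB: isChildSane() would return
  // Status(1, "Memory limits exceeded: ...") and the worker gets restarted.
  std::cout << exceeded_memory_limit(250ULL * 1024 * 1024) << "\n";  // 1
  return 0;
}
```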
getChange
```cpp
PerformanceChange getChange(const Row& r, PerformanceState& state) {
  PerformanceChange change;

  // IV is the check interval in seconds, and utilization is set per-second.
  change.iv = std::max(getWorkerLimit(WatchdogLimitType::INTERVAL), 1_sz); // 3

  long long user_time = 0, system_time = 0;
  try {
    change.parent =
        static_cast<pid_t>(tryTo<long long>(r.at("parent")).takeOr(0LL));
    user_time = tryTo<long long>(r.at("user_time")).takeOr(0LL);
    system_time = tryTo<long long>(r.at("system_time")).takeOr(0LL);
    change.footprint = tryTo<long long>(r.at("resident_size")).takeOr(0LL);
  } catch (const std::exception& /* e */) {
    state.sustained_latency = 0;
  }

  // Check the difference of CPU time used since last check.
  auto percent_ul = getWorkerLimit(WatchdogLimitType::UTILIZATION_LIMIT);
  percent_ul = (percent_ul > 100) ? 100 : percent_ul;

  UNSIGNED_BIGINT_LITERAL iv_milliseconds = change.iv * 1000; // 3 * 1000
  // With the defaults on a single-CPU host: cpu_ul = (10 * 3000 * 1) / 100 = 300.
  UNSIGNED_BIGINT_LITERAL cpu_ul =
      (percent_ul * iv_milliseconds * kNumOfCPUs) / 100;

  auto user_time_diff = user_time - state.user_time;
  auto sys_time_diff = system_time - state.system_time;
  auto cpu_utilization_time = user_time_diff + sys_time_diff;

  if (cpu_utilization_time > cpu_ul) {
    state.sustained_latency++;
  } else {
    state.sustained_latency = 0;
  }

  // Update the current CPU time.
  state.user_time = user_time;
  state.system_time = system_time;

  // Check if the sustained difference exceeded the acceptable latency limit.
  change.sustained_latency = state.sustained_latency;

  // Set the memory footprint as the amount of resident bytes allocated
  // since the process image was created (estimate).
  // A more-meaningful check would limit this to writable regions.
  if (state.initial_footprint == 0) {
    state.initial_footprint = change.footprint;
  }

  // Set the measured/limit-applied footprint to the post-launch allocations.
  if (change.footprint < state.initial_footprint) {
    change.footprint = 0;
  } else {
    change.footprint = change.footprint - state.initial_footprint;
  }

  return change;
}
```
The `getChange()` function mainly evaluates memory and CPU usage, but the two pieces of logic are interleaved. To make the analysis easier, let's look at the memory and CPU handling separately.
footprint
```cpp
change.footprint = tryTo<long long>(r.at("resident_size")).takeOr(0LL);
// .....
if (state.initial_footprint == 0) {
  state.initial_footprint = change.footprint;
}

// Set the measured/limit-applied footprint to the post-launch allocations.
if (change.footprint < state.initial_footprint) {
  change.footprint = 0;
} else {
  change.footprint = change.footprint - state.initial_footprint;
}
```
`change.footprint = tryTo<long long>(r.at("resident_size")).takeOr(0LL);` takes the `resident_size` obtained above from the `processes` table query, i.e. the worker's current memory usage, while `state` is the `PerformanceState` struct holding the worker's previously recorded resource information.

On the first check after the worker starts, `state.initial_footprint` is initialized with the current footprint; from then on the function returns `change.footprint = change.footprint - state.initial_footprint`, so `change.footprint` is the growth in resident memory since that initial measurement, roughly the worker's post-launch allocations. One small detail: if `change.footprint < state.initial_footprint`, then `change.footprint` is set to 0, which means the difference is always greater than or equal to zero.
sustained_latency
The CPU-time check is also fairly straightforward.
```cpp
// Default ceiling on CPU time per check interval.
UNSIGNED_BIGINT_LITERAL cpu_ul =
    (percent_ul * iv_milliseconds * kNumOfCPUs) / 100;

// Current user-mode and kernel-mode CPU time of the worker.
user_time = tryTo<long long>(r.at("user_time")).takeOr(0LL);
system_time = tryTo<long long>(r.at("system_time")).takeOr(0LL);

auto user_time_diff = user_time - state.user_time;
auto sys_time_diff = system_time - state.system_time;
// cpu_utilization_time is the total user + system CPU time consumed since the
// previous check.
auto cpu_utilization_time = user_time_diff + sys_time_diff;

// If the consumption exceeds the cpu_ul ceiling, increment sustained_latency.
// state.sustained_latency is used (rather than a fresh counter) because it may
// already be non-zero from earlier intervals that also exceeded the limit.
if (cpu_utilization_time > cpu_ul) {
  state.sustained_latency++;
} else {
  state.sustained_latency = 0;
}

// Copy state.sustained_latency into change.sustained_latency for the later
// CPU-usage verdict.
change.sustained_latency = state.sustained_latency;
```
Finally, once `change = getChange(rows[0], state);` has produced these deltas, `exceededCyclesLimit(change)` and `exceededMemoryLimit(change)` are called to decide whether CPU and memory usage are over the limits; their implementations were briefly covered above. If a problem is found, an error such as `return Status(1, "Memory limits exceeded: " + std::to_string(change.footprint));` is returned; if everything is normal, `Status(0)` is returned.
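To make the CPU bookkeeping concrete, the following standalone sketch reruns the arithmetic from `getChange()` with assumed numbers: the normal level (`UTILIZATION_LIMIT` = 10, `INTERVAL` = 3, `LATENCY_LIMIT` = 12), a single-CPU host, and CPU times in milliseconds (the unit the comparison in `getChange()` implies). The final `exceededCyclesLimit()` behaviour is inferred from the `LATENCY_LIMIT` table entry and the error string in `isChildSane()`, since that function's body is not shown here.

```cpp
#include <cstdint>
#include <iostream>

int main() {
  // Assumed inputs for the example.
  const uint64_t percent_ul = 10;  // UTILIZATION_LIMIT, normal level
  const uint64_t iv = 3;           // INTERVAL in seconds, normal level
  const uint64_t num_cpus = 1;     // kNumOfCPUs on this example host

  // Same formula as getChange(): maximum (user + system) CPU milliseconds the
  // worker may burn per check interval. Here: 10 * 3000 * 1 / 100 = 300 ms.
  const uint64_t cpu_ul = (percent_ul * iv * 1000 * num_cpus) / 100;

  // Suppose the previous check recorded user=5000 / system=2000 and the
  // current check reads user=5350 / system=2050 (milliseconds of CPU time).
  const uint64_t cpu_used = (5350 - 5000) + (2050 - 2000);  // 400 ms

  // 400 > 300, so sustained_latency is incremented for this interval. With
  // LATENCY_LIMIT = 12 seconds and iv = 3 s, roughly four consecutive hot
  // intervals (sustained_latency * iv >= 12) would be enough to trip
  // exceededCyclesLimit() and have the worker restarted.
  std::cout << "cpu_ul=" << cpu_ul << "ms cpu_used=" << cpu_used << "ms\n";
  return 0;
}
```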
watch(const PlatformProcess &child)
As noted earlier, `watch()` is the core of the whole monitoring system and is responsible for monitoring the worker process's metrics. It then decides the next action based on the result of `isChildSane(child)`:
```cpp
auto status = isChildSane(child);
// A delayed watchdog does not stop the worker process.
if (!status.ok() && getUnixTime() >= delayedTime()) {
  // Since the watchdog cannot use the logger plugin the error message
  // should be logged to stderr and to the system log.
  std::stringstream error;
  error << "osqueryd worker (" << child.pid()
        << ") stopping: " << status.getMessage();
  systemLog(error.str());
  LOG(WARNING) << error.str();

  stopChild(child);
  return false;
}
return true;
```
If the worker is found to be using too many resources, `systemLog(error.str());` emits a log message and `stopChild(child);` is called to stop the worker process.
```cpp
void WatcherRunner::stopChild(const PlatformProcess& child) const {
  child.killGracefully();

  // Clean up the defunct (zombie) process.
  if (!child.cleanup()) {
    auto child_pid = child.pid();
    LOG(WARNING) << "osqueryd worker (" << std::to_string(child_pid)
                 << ") could not be stopped. Sending kill signal.";

    child.kill();
    if (!child.cleanup()) {
      auto message = std::string("Watcher cannot stop worker process (") +
                     std::to_string(child_pid) + ").";
      Initializer::requestShutdown(EXIT_CATASTROPHIC, message);
    }
  }
}
```

`killGracefully()` lives in `osquery/core/posix/process.cpp`:

```cpp
bool PlatformProcess::killGracefully() const {
  if (!isValid()) {
    return false;
  }

  // On the semantics of kill(2), see
  // http://man7.org/linux/man-pages/man2/kill.2.html ("In the case of SIGCONT,
  // it suffices when the sending and receiving processes belong to the same
  // session").
  int status = ::kill(nativeHandle(), SIGTERM);
  return (status == 0);
}
```
Tracing the call chain shows that, in the end, it is `::kill(nativeHandle(), SIGTERM)` that terminates the worker process.
start()
```cpp
do {
  if (use_worker_ && !watch(watcher.getWorker())) {
    // .......
    createWorker();
  }
  // .....
} while (!interrupted() && ok());
```
When `watch(watcher.getWorker())` in `WatcherRunner::start()` finds that the worker is in a bad state, `createWorker();` is eventually called to start a fresh worker process (the previous worker has already been killed inside `watch(const PlatformProcess &child)`).
Overall, the detection logic is fairly simple. The only painful part is that, without reading the source, the call relationships between these functions are hard to untangle; the diagram below may help clarify osquery's overall monitoring logic.
Summary
The design and implementation of the agent's circuit-breaker and monitoring mechanisms deserves careful consideration when building a HIDS, because once the agent starts consuming large amounts of resources we must be able to stop its data-collection activity promptly. Analyzing osquery's monitoring mechanism gives us some ideas for implementing monitoring and circuit breaking ourselves.
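As a closing illustration of that idea, here is a minimal, hypothetical watchdog loop for a home-grown agent. It assumes Linux, reads the child's resident memory from `/proc/<pid>/statm`, uses an arbitrary 200 MB limit and 3-second poll interval, and escalates from SIGTERM to SIGKILL in the spirit of osquery's `stopChild()`; it is a sketch of the approach, not production code.

```cpp
#include <sys/types.h>
#include <sys/wait.h>
#include <unistd.h>
#include <csignal>
#include <chrono>
#include <cstdio>
#include <fstream>
#include <string>
#include <thread>

// Resident set size of a child process in bytes, read from /proc/<pid>/statm.
static long long resident_bytes(pid_t pid) {
  std::ifstream statm("/proc/" + std::to_string(pid) + "/statm");
  long long total_pages = 0, resident_pages = 0;
  if (!(statm >> total_pages >> resident_pages)) {
    return -1;
  }
  return resident_pages * ::sysconf(_SC_PAGESIZE);
}

// Poll the child every interval; if it exceeds the memory limit, send SIGTERM
// first and escalate to SIGKILL, then return so the caller can respawn it
// (ideally with a respawn counter and backoff, like RESPAWN_LIMIT/RESPAWN_DELAY).
static void watch_child(pid_t child, long long limit_bytes, int interval_s) {
  for (;;) {
    std::this_thread::sleep_for(std::chrono::seconds(interval_s));

    int status = 0;
    if (::waitpid(child, &status, WNOHANG) != 0) {
      std::printf("worker %d is gone\n", (int)child);
      return;  // child exited on its own (or waitpid failed)
    }

    long long rss = resident_bytes(child);
    if (rss > limit_bytes) {
      std::printf("worker %d over limit (%lld bytes), stopping\n", (int)child, rss);
      ::kill(child, SIGTERM);
      std::this_thread::sleep_for(std::chrono::seconds(3));
      if (::waitpid(child, &status, WNOHANG) == 0) {
        ::kill(child, SIGKILL);  // graceful stop failed, force it
        ::waitpid(child, &status, 0);
      }
      return;
    }
  }
}

int main() {
  pid_t child = ::fork();
  if (child == 0) {
    // Dummy "worker": sleep so the parent has something to watch.
    ::sleep(60);
    return 0;
  }
  watch_child(child, /*limit_bytes=*/200LL * 1024 * 1024, /*interval_s=*/3);
  return 0;
}
```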