大眾點評CAT開源監控系統剖析
轉載:https://www.cnblogs.com/yeahwell/p/cat.html
參考文件:
Spring Cloud Sleuth 整合Zipkin、RabbitMQ 和 (Mysql或Elasticsearch)
1. 介紹
1.1 概述
CAT(Central Application Tracking)基於Java開發的實時監控平臺,主要包括移動端監控,應用側監控,核心網路層監控,系統層監控等。
CAT是一個提供實時監控報警,應用效能分析診斷的工具。
1.2 CAT能做什麼
在此之前,先來想一想對於線上應用我們希望能監控些什麼?可能有如下這些:
- 機器狀態資訊。CPU負載、記憶體資訊、磁碟使用率這些是必需的,另外可能還希望收集Java程序的資料,例如執行緒棧、堆、垃圾回收等資訊,以幫助出現問題時快速debug。
- 請求訪問情況
- 異常情況。譬如快取服務時不時出現無響應,我們希望能夠監控到這種異常,從而做進一步的處理。
- 業務情況。例如訂單量統計,銷售額等等。
CAT支援的監控訊息型別包括:
- Transaction 適合記錄跨越系統邊界的程式訪問行為,比如遠端呼叫,資料庫呼叫,也適合執行時間較長的業務邏輯監控,Transaction用來記錄一段程式碼的執行時間和次數。
- Event 用來記錄一件事發生的次數,比如記錄系統異常,它和transaction相比缺少了時間的統計,開銷比transaction要小。
- Heartbeat 表示程式內定期產生的統計資訊, 如CPU%, MEM%, 連線池狀態, 系統負載等。
- Metric 用於記錄業務指標、指標可能包含對一個指標記錄次數、記錄平均值、記錄總和,業務指標最低統計粒度為1分鐘。
- Trace 用於記錄基本的trace資訊,類似於log4j的info資訊,這些資訊僅用於檢視一些相關資訊
在一個請求處理中可能產生有多種訊息,CAT將其組織成訊息樹的形式。
在處理開始時,預設開始一個型別為URL的Transaction,在這個Transaction中業務本身可以產生子訊息。例如,產生一個數據庫訪問的子Transaction或者一個訂單統計的Metric。結構如下所示:
1.3 分散式監控系統要求
- 方便安裝
- 要求輕量
- 介面儘可能友好
- 監控策略豐富,監控元素多樣化
- 可以巢狀開發
- 佔用伺服器資源小,使用時不過多佔用機器硬體方面資源,對實際業務影響較小
1.4 CAT使用特點
- 非同步化傳輸資料,不太影響正常業務
- 實時監控
- 輕量,部署簡單
- 嵌入簡單
- 有問題跟蹤報表
- 訊息樹形化
- 日誌不落地本地磁碟,較少IO,但很消耗網路資源
- 監控訊息,按照分業務傳輸資料,如業務場景,時間等要求傳輸資料
- 有報警機制
- 可能複雜的訊息儲存和訊息ID查詢看起來麻煩,需要建立查詢索引(目前不考慮這個東東)
- 訊息佇列非同步化傳送
- 開源(這個最重要)
2. CAT設計
2.1 整體設計
2.2 客戶端設計
2.3 服務端設計
2.4 領域建模
3. 模組劃分
3.1 模組說明
3.1.1 client端
cat-client 提供給業務以及中間層埋點的底層SDK。
3.1.2 server端
cat-consumer 用於實時分析從客戶端提供的資料。
cat-home 作為使用者給使用者提供展示的控制端 ,並且cat-home做展示時,通過對cat-consumer的呼叫獲取其他節點的資料,將所有資料彙總展示。
consumer、home以及路由中心都是部署在一起的,每個服務端節點都可以充當任何一個角色。
CAT服務端在整個實時處理中,基本上實現了全非同步化處理:
- 訊息消費基於Netty的NIO實現(Netty-Server);
- 訊息消費到服務端就存放記憶體佇列,然後程式開啟一個執行緒會消費這個訊息做訊息分發(非同步消費處理);
- 每個訊息都會有一批執行緒併發消費各自佇列的資料,以做到訊息處理的隔離。(每報表每執行緒,分別按照自己的規則解析消費這個訊息,並且可以動態控制對某種報表型別的處理執行緒個數);
- 訊息(原始的訊息logView)儲存是先存入本地磁碟,然後非同步上傳到HDFS檔案,這也避免了強依賴HDFS;
4. 設計原理
4.1 cat-client設計
作為一個日誌上報的通用客戶端,考慮點至少有如下這些:
- 為了儘可能減少對業務的影響,需要對訊息進行非同步處理。即業務執行緒將訊息交給CAT客戶端與CAT客戶端上報這兩個過程需要非同步。
- 為了達到實時的目的以及適應高併發的情況,客戶端上報應該基於TCP而非HTTP開發。
- 線上程安全的前提下儘可能的資源低消耗以及低延時。我們知道,執行緒競爭的情況是由於資源共享造成的,要達到執行緒安全通常需要減少資源共享或者加鎖,而這兩點則會導致系統資源冗餘和高延時。
CAT客戶端實現並不複雜,但這些點都考慮到了。它的架構如下所示:
大概步驟為:
- 業務執行緒產生訊息,交給訊息Producer,訊息Producer將訊息存放在該業務執行緒訊息棧中;
- 業務執行緒通知訊息Producer訊息結束時,訊息Producer根據其訊息棧產生訊息樹放置在同步訊息佇列中;
- 訊息上報執行緒監聽訊息佇列,根據訊息樹產生最終的訊息報文上報CAT服務端。
4.1.1 cat-client包結構
└─com ├─dianping │ └─cat │ ├─build │ ├─configuration │ ├─log4j │ ├─message │ │ ├─internal │ │ ├─io │ │ └─spi │ │ ├─codec │ │ └─internal │ ├─servlet │ └─status └─site ├─helper └─lookup └─util
4.1.2 com.dianping.cat.message包介紹
包結構如下:
com.dianping.cat.message中主要包含了internal、io、spi這三個目錄:
- internal目錄包含主要的CAT客戶端內部實現類;
- io目錄包含建立服務端連線、重連、訊息佇列監聽、上報等io實現類;
- spi目錄為上報訊息工具包,包含訊息二進位制編解碼、轉義等實現類。
其uml圖如下所示(可以放大看):
類的功能如下:
- Message為所有上報訊息的抽象,它的子類實現有Transaction、Metric、Event、HeartBeat、Trace這五種。
- MessageProducer封裝了所有介面,業務在使用CAT時只需要通過MessageProducer來操作。
- MessageManager為CAT客戶端核心類,相當於MVC中的Controller。
- Context類儲存訊息上下文。
- TransportManager提供傳送訊息的sender,具體實現有DefaultTransportManager,呼叫其getSender介面返回一個TcpSocketSender。
- TcpSocketSender類負責傳送訊息。
1)Message
上面說到,Message有五類,分別為Transaction、Metric、Event、HeartBeat、Trace。其中Metric、Event、HeartBeat、Trace基本相同,儲存的資料都為一個字串;而Transaction則儲存一個Message列表。換句話說,Transaction的結構為一個遞迴包含的結構,其他結構則為原子性結構。
下面為DefaultTransaction的關鍵資料成員及操作:
public class DefaultTransaction extends AbstractMessage implements Transaction { private List<Message> m_children; private MessageManager m_manager; ... //新增子訊息 public DefaultTransaction addChild(Message message) { ... } //Transaction結束時呼叫此方法 public void complete() { ... m_manager.end(this); //呼叫MessageManager來結束Transaction ... }
值得一提的是,Transaction(或者其他的Message)在建立時自動開始,訊息結束時需要業務方呼叫complete方法,而在complete方法內部則呼叫MessageManager來完成訊息。
2)MessageProducer
MessageProducer對業務方封裝了CAT內部的所有細節,它的主要方法如下:
public void logError(String message, Throwable cause); public void logEvent(String type, String name, String status, String nameValuePairs); public void logHeartbeat(String type, String name, String status, String nameValuePairs); public void logMetric(String name, String status, String nameValuePairs); public void logTrace(String type, String name, String status, String nameValuePairs); ... public Event newEvent(String type, String name); public Event newEvent(Transaction parent, String type, String name); public Heartbeat newHeartbeat(String type, String name); public Metric newMetric(String type, String name); public Transaction newTransaction(String type, String name); public Trace newTrace(String type, String name);
logXXX方法為方法糖(造詞小能手呵呵),這些方法在呼叫時需要傳入訊息資料,方法結束後訊息自動結束。
newXXX方法返回相應的Message,業務方需要呼叫Message方法設定資料,並最終呼叫Message.complete()方法結束訊息。
MessageProducer只是介面封裝,訊息處理主要實現依賴於MessageManager這個類。
3)MessageManager
MessageManager為CAT的核心類,但它只是定義了介面,具體實現為DefaultMessageManager。DefaultMessageManager這個類裡面主要包含了兩個功能類,Context和TransportManager,分別用於儲存上下文和訊息傳輸。TransportManager執行期間為單例物件,而Context則包裝成ThreadLocal為每個執行緒儲存上下文。
我們通過介面來了解DefaultMessageManager的主要功能:
public void add(Message message); public void start(Transaction transaction, boolean forked); public void end(Transaction transaction); public void flush(MessageTree tree);
add()方法用來新增原子性的Message,也就是Metric、Event、HeartBeat、Trace。
start()和end()方法用來開始和結束Transaction這種訊息。
flush()方法用來將當前業務執行緒的所有訊息重新整理到CAT服務端,當然,是非同步的。
4)Context
Context用來儲存訊息上下文,我們可以通過它的主要介面來了解它功能:
public void add(Message message) { if (m_stack.isEmpty()) { MessageTree tree = m_tree.copy(); tree.setMessage(message); flush(tree); } else { Transaction parent = m_stack.peek(); addTransactionChild(message, parent); } }
add方法主要新增原子性訊息,它先判斷該訊息是否有上文訊息(即判斷是否處於一個Transaction中)。如果有則m_stack不為空並且將該訊息新增到上文Transaction的子訊息佇列中;否則直接呼叫flush來將此原子性訊息重新整理到服務端。
public void start(Transaction transaction, boolean forked) { if (!m_stack.isEmpty()) { ... Transaction parent = m_stack.peek(); addTransactionChild(transaction, parent); } else { m_tree.setMessage(transaction); } if (!forked) { m_stack.push(transaction); } }
start方法用來開始Transaction(Transaction是訊息裡比較特殊的一種),如果當前訊息棧為空則證明該Transaction為第一個Transaction,使用訊息樹儲存該訊息,同時將該訊息壓棧;否則將當前Transaction儲存到上文Transaction的子訊息佇列中,同時將該訊息壓棧。
public boolean end(DefaultMessageManager manager, Transaction transaction) { if (!m_stack.isEmpty()) { Transaction current = m_stack.pop(); ... if (m_stack.isEmpty()) { MessageTree tree = m_tree.copy(); m_tree.setMessageId(null); m_tree.setMessage(null); ... manager.flush(tree); //重新整理訊息到CAT服務端 return true; } } return false; }
end方法用來結束Transaction,每次呼叫都會pop訊息棧,如果棧為空則呼叫flush來重新整理訊息到CAT服務端。
綜上,Context的m_stack的結構如下:
Transaction之間是有引用的,因此在end方法中只需要將第一個Transaction(封裝在MessageTree中)通過MessageManager來flush,在拼接訊息時可以根據這個引用關係來找到所有的Transaction :)。
5)TransportManager和TcpSocketSender
這兩個類用來發送訊息到服務端。MessageManager通過TransportManager獲取到MessageSender,呼叫sender.send()方法來發送訊息。 TransportManager和MessageSender關係如下:
TCPSocketSender為MessageSender的具體子類,它裡面主要的資料成員為:
private MessageCodec m_codec; private MessageQueue m_queue = new DefaultMessageQueue(SIZE); private ChannelManager m_manager;
-
MessageCodec:CAT基於TCP傳輸訊息,因此在傳送訊息時需要對字元訊息編碼成位元組流,這個編碼的工作由MessageCodec負責實現。
-
MessageQueue:還記得剛才說業務方在新增訊息時,CAT非同步傳送到服務端嗎?在新增訊息時,訊息會被放置在TCPSocketSender的m_queue中,如果超出queue大小則拋棄訊息。
-
ChannelManager:CAT底層使用netty來實現TCP訊息傳輸,ChannelManager負責維護通訊Channel。通俗的說,維護連線。
TCPSocketSender主要方法為initialize、send和run,分別介紹如下:
public void initialize() { m_manager = new ChannelManager(m_logger, m_serverAddresses, m_queue, m_configManager, m_factory); Threads.forGroup("cat").start(this); Threads.forGroup("cat").start(m_manager); ... }
initialize方法為初始化方法,在執行時主要建立兩個執行緒,一個用來執行自身run方法(TCPSocketSender實現了Runnable介面)監聽訊息佇列;另一個則用來執行ChannelManager維護通訊Channel。
public void send(MessageTree tree) { if (isAtomicMessage(tree)) { boolean result = m_atomicTrees.offer(tree, m_manager.getSample()); if (!result) { logQueueFullInfo(tree); } } else { boolean result = m_queue.offer(tree, m_manager.getSample()); if (!result) { logQueueFullInfo(tree); } } }
send方法被MessageManager呼叫,把訊息放置在訊息佇列中。
public void run() { m_active = true; while (m_active) { ChannelFuture channel = m_manager.channel(); if (channel != null && checkWritable(channel)) { try { MessageTree tree = m_queue.poll(); if (tree != null) { sendInternal(tree); tree.setMessage(null); } } catch (Throwable t) { m_logger.error("Error when sending message over TCP socket!", t); } } else { try { Thread.sleep(5); } catch (Exception e) { // ignore it m_active = false; } } } } private void sendInternal(MessageTree tree) { ChannelFuture future = m_manager.channel(); ByteBuf buf = PooledByteBufAllocator.DEFAULT.buffer(10 * 1024); // 10K m_codec.encode(tree, buf); int size = buf.readableBytes(); Channel channel = future.channel(); channel.writeAndFlush(buf); if (m_statistics != null) { m_statistics.onBytes(size); } }
run方法會一直執行直到程序退出,在迴圈裡先獲取通訊Channel,然後傳送訊息。值得注意的是,sendInternal方法在執行時呼叫m_codec.encode(tree, buf),引數為訊息樹和緩衝區。訊息樹裡面其實只儲存了一個訊息,還記得剛才說的Transaction上下文引用嗎?m_codec在encode的時候會判斷訊息型別是否為Transaction,如果為Transaction則會遞迴獲取子Transaction,否則直接將該訊息編碼。具體實現可以參考原始碼的PlainTextMessageCodec類的encode方法,此處不再贅述。
4.1.3 cat-client 主要類介紹
cat-client的主要入口是cat-client包中的Cat類
Cat類以及Cat的依賴類層級結構如下:
介面層
Cat類以及MessageProducer類。主要功能是為外部提供api,Cat主要作用是與plexus框架做整合,MessageProducer是處理api的主要類
PS:額外說一下,Cat這個專案很【有特色】地用了plexus作為管理容器,初次接觸的時候真是讓人頭大,plexus的基本功能和spring可以說別無二致,但是很多地方的注入竟然都需要手動處理,真是讓人尷尬,雖然作者說spring太重了,plexus的作用已經足夠
訊息處理層
MessageManager以及其內部類Context。主要功能是管理訊息的傳送,Transaction類訊息的歸集,等訊息的管理工作。在MessageManager中,使用了ThreadLocal型別作為當前執行緒訊息管理的上下文,通過這個物件執行緒安全地實現訊息的新增,合併,傳送等等。
PS:MessageManager管理的訊息Message是基於Cat的監控模型建立的,其中最主要的區別是Transaction類和其他訊息不太一樣,Transaction訊息是一個連結串列的模型,每一個訊息後面都連結著下一個訊息,所以MessageManager對Transaction的處理也不同,別的訊息都是放到Context中直接從訊息處理層flush到下一層,Transaction是放到Context的棧中,直到過了預定時間,或者訊息達到規定的最大長度才flush到下一層。
訊息傳輸層
TransportManager以及TcpSocketSender以及ChannelManager。主要功能是把訊息管理層發下來的訊息進行傳送,對於與多個傳送的目的伺服器進行Channel管理,保證有可用伺服器能接受訊息。TransportManager主要功能是根據配置檔案初始化TcpSocketSender,TcpSocketSender主要實現把Message進行編碼(如果是Transaction還會進行合併)並放置到待發送佇列中,再同時由ChannelManager消費佇列中的訊息,將訊息傳送給狀態為active的server端
PS:暫存訊息的佇列用的是LinkedBlockingQueue,實際上LinkedBlockingQueue屬於生產消費者佇列的標配了,因為這個類對於新增和移除的消耗小,執行緒安全,而且達到佇列容量時會成為blocking狀態,所以基本上都會用這個類,或者基於這個類進行擴充套件來實現相關需求。相對來說還有ConcurrentLinkedQueue可以用,和blockingqueue的主要區別是,Concurrent超過主要容量會直接返回false,不會block,所以如果想馬上就返回的可以用Concurrent佇列。
4.1.4 Cat入口類
1)測試用例
//靜態方法獲取Transaction物件 Transaction t=Cat.newTransaction("logTransaction", "logTransaction"); TimeUnit.SECONDS.sleep(30); t.setStatus("0"); t.complete();
2)Cat原始碼
private static Cat s_instance = new Cat(); private static volatile boolean s_init = false; private static void checkAndInitialize() { if (!s_init) { synchronized (s_instance) { if (!s_init) { initialize(new File(getCatHome(), "client.xml")); log("WARN", "Cat is lazy initialized!"); s_init = true; } } } } private Cat() { } public static MessageProducer getProducer() { checkAndInitialize(); return s_instance.m_producer; }
Cat lazy Init
可以看到類載入時已經完成了Cat物件的初始化,記憶體中有且僅有一個Cat Object(static Cat s_instance = new Cat();),但是包含配置資訊的完整的Cat物件並沒有完全初始化完成。呼叫Cat時會先嚐試獲取producer物件,並在獲取之前檢查客戶端配置是否載入完畢(checkAndInitialize)。
checkAndInitialize()通過使用doublecheck來對Cat相關配置填充的單次初始化載入。
cat-client首先會使用plexus(一個比較老的IOC容器)載入配置檔案/META-INF/plexus/plexus.xml,完成IOC容器的初始化。
接著使用../../client.xml檔案完成cat物件的配置資訊填充初始化。並且啟動這四個daemon執行緒,後文詳細說明:
- cat-StatusUpdateTask 用來每秒鐘上報客戶端基本資訊(JVM等資訊)
- cat-merge-atomic-task(訊息合併檢查)
- cat-TcpSocketSender-ChannelManager(NIO 連線服務端檢查)
- cat-TcpSocketSender(訊息傳送服務端)
4.1.5 CatClientModule
由於Cat用了十分low的plexus作為容器,所以在載入Cat類的時候會從靜態方法中載入各個Module,CatClientModule就是Cat client工程中首要Module
public class CatClientModule extends AbstractModule { public static final String ID = "cat-client"; @Override protected void execute(final ModuleContext ctx) throws Exception { ctx.info("Current working directory is " + System.getProperty("user.dir")); // initialize milli-second resolution level timer MilliSecondTimer.initialize(); // tracking thread start/stop,此處增加經典的hook,用於執行緒池關閉的清理工作。 Threads.addListener(new CatThreadListener(ctx)); // warm up Cat Cat.getInstance().setContainer(((DefaultModuleContext) ctx).getContainer()); // bring up TransportManager ctx.lookup(TransportManager.class); ClientConfigManager clientConfigManager = ctx.lookup(ClientConfigManager.class); if (clientConfigManager.isCatEnabled()) { // start status update task StatusUpdateTask statusUpdateTask = ctx.lookup(StatusUpdateTask.class); Threads.forGroup("cat").start(statusUpdateTask); LockSupport.parkNanos(10 * 1000 * 1000L); // wait 10 ms // MmapConsumerTask mmapReaderTask = ctx.lookup(MmapConsumerTask.class); // Threads.forGroup("cat").start(mmapReaderTask); } }
這裡plexusIOC的具體的初始化載入邏輯在org\unidal\framework\foundation-service\2.5.0\foundation-service-2.5.0.jar中,有興趣可以仔細檢視。
當準備工作做完之後,會執行具體的訊息構造:
DefaultMessageProducer.newTransaction(String type, String name)
@Override public Transaction newTransaction(String type, String name) { // this enable CAT client logging cat message without explicit setup if (!m_manager.hasContext()) { //詳細可見下文原始碼,此處就是用ThreadLocal儲存一個Context物件:ctx = new Context(m_domain.getId(), m_hostName, m_domain.getIp()); m_manager.setup(); } if (m_manager.isMessageEnabled()) { DefaultTransaction transaction = new DefaultTransaction(type, name, m_manager); //向Context中填充構造的訊息體:Context.m_tree;Context.m_stack;稍後看看Context這個物件 m_manager.start(transaction, false); return transaction; } else { return NullMessage.TRANSACTION; } }
DefaultMessageManager.start(Transaction transaction, boolean forked)
@Override public void start(Transaction transaction, boolean forked) { Context ctx = getContext();//這裡獲取上文中說到的ThreadLocal中構造的Context物件 if (ctx != null) { ctx.start(transaction, forked); if (transaction instanceof TaggedTransaction) { TaggedTransaction tt = (TaggedTransaction) transaction; m_taggedTransactions.put(tt.getTag(), tt); } } else if (m_firstMessage) { m_firstMessage = false; m_logger.warn("CAT client is not enabled because it's not initialized yet"); } }
DefaultMessageManager.Context.start(Transaction transaction, boolean forked)
public void start(Transaction transaction, boolean forked) { if (!m_stack.isEmpty()) {// { Transaction parent = m_stack.peek(); addTransactionChild(transaction, parent); } } else { m_tree.setMessage(transaction);//在這裡把返回的transaction放在tree上,如果有巢狀結構,後邊繼續在tree上添枝加葉 } if (!forked) { m_stack.push(transaction); } }
這部分程式碼可以看出,
通過ThreadLocal<Context.>,使Context中實際的訊息的構造保證了執行緒安全。
如果當前Context的棧m_stack不為空,那麼接著之前的訊息後邊,將當前訊息構造為一個孩子結點。如果當前訊息之前沒有其他訊息,放入m_stack中,並setMessage.也就是當前訊息時父節點。
至此,訊息體構造完畢。
這裡需要看一下Context類,是DefaultMessageManager包私有的內部類。
Context.java
class Context { private MessageTree m_tree;//初始化的時候構建一個MessageTree private Stack<Transaction> m_stack; private int m_length; private boolean m_traceMode; private long m_totalDurationInMicros; // for truncate message private Set<Throwable> m_knownExceptions; public Context(String domain, String hostName, String ipAddress) { m_tree = new DefaultMessageTree(); m_stack = new Stack<Transaction>(); Thread thread = Thread.currentThread(); String groupName = thread.getThreadGroup().getName(); m_tree.setThreadGroupName(groupName); m_tree.setThreadId(String.valueOf(thread.getId())); m_tree.setThreadName(thread.getName()); m_tree.setDomain(domain); m_tree.setHostName(hostName); m_tree.setIpAddress(ipAddress); m_length = 1; m_knownExceptions = new HashSet<Throwable>(); }
每個執行緒通過使用ThreadLocal構造一個Context物件並存儲。Context主要包含當前的訊息體m_tree,和多個巢狀訊息體填充的棧:m_stack :
再回到我們原來的UnitTest程式碼,
Transaction t=Cat.newTransaction("logTransaction", "logTransaction");
這行程式碼完成了客戶端plexusIOC容器的初始化,cat-client的載入初始化、啟動了四個daemon執行緒,並返回了Transaction物件。
t.setStatus("0");//很簡單,就是這是一個屬性值 t.complete();
訊息完成後,將訊息放入一個佇列中,從而保證非同步上報。
transaction.complete();的具體程式碼如下:
........ public void complete() { try { if (isCompleted()) { // complete() was called more than once DefaultEvent event = new DefaultEvent("cat", "BadInstrument"); event.setStatus("TransactionAlreadyCompleted"); event.complete(); addChild(event); } else { m_durationInMicro = (System.nanoTime() - m_durationStart) / 1000L; setCompleted(true); if (m_manager != null) { m_manager.end(this); } } } catch (Exception e) { // ignore } } ........ @Override public void end(Transaction transaction) { Context ctx = getContext(); if (ctx != null && transaction.isStandalone()) { if (ctx.end(this, transaction)) { m_context.remove(); } } } ........ public boolean end(DefaultMessageManager manager, Transaction transaction) { if (!m_stack.isEmpty()) { Transaction current = m_stack.pop();//Context的成員變數m_stack彈出棧頂元素,LIFO當然是最新的current元素。 if (transaction == current) { m_validator.validate(m_stack.isEmpty() ? null : m_stack.peek(), current); } else { while (transaction != current && !m_stack.empty()) { m_validator.validate(m_stack.peek(), current); current = m_stack.pop(); } } if (m_stack.isEmpty()) {//如果當前執行緒儲存的Context中m_stack無元素 MessageTree tree = m_tree.copy(); m_tree.setMessageId(null);//清理m_tree m_tree.setMessage(null); if (m_totalDurationInMicros > 0) { adjustForTruncatedTransaction((Transaction) tree.getMessage()); } manager.flush(tree);//將訊息放入消費佇列中 return true; } } return false; } ........ public void flush(MessageTree tree) { if (tree.getMessageId() == null) { tree.setMessageId(nextMessageId());//為訊息體生產全域性唯一ID,詳見snowflate演算法 } MessageSender sender = m_transportManager.getSender(); if (sender != null && isMessageEnabled()) { sender.send(tree); reset();//ThreadLocal中儲存的Context清理 } else { m_throttleTimes++; if (m_throttleTimes % 10000 == 0 || m_throttleTimes == 1) { m_logger.info("Cat Message is throttled! Times:" + m_throttleTimes); } } } ........ private Context getContext() { if (Cat.isInitialized()) { Context ctx = m_context.get();//ThreadLocal儲存一個Context物件 if (ctx != null) { return ctx; } else { if (m_domain != null) { ctx = new Context(m_domain.getId(), m_hostName, m_domain.getIp()); } else { ctx = new Context("Unknown", m_hostName, ""); } m_context.set(ctx); return ctx; } } return null; } //TcpSocketSender.send(MessageTree tree) private MessageQueue m_queue = new DefaultMessageQueue(SIZE); private MessageQueue m_atomicTrees = new DefaultMessageQueue(SIZE); @Override public void send(MessageTree tree) { if (isAtomicMessage(tree)) { boolean result = m_atomicTrees.offer(tree, m_manager.getSample()); if (!result) { logQueueFullInfo(tree); } } else { boolean result = m_queue.offer(tree, m_manager.getSample()); if (!result) { logQueueFullInfo(tree); } } }
至此,構造的訊息體放入了阻塞佇列中等待上傳。
總結: 我們可以看到Cat-SDK通過ThreadLocal對訊息進行收集,
收集進來按照時間以及型別構造為Tree結構,在compele()方法中將這個構造的訊息放入一個記憶體佇列中,等待TcpSockekSender這個Daemon執行緒非同步上報給服務端。
4.1.6 cat-TcpSocketSender
訊息上傳服務端,會有一個執行緒cat-TcpSocketSender監聽消費佇列,並消費(上傳服務端)。
通訊上報服務端使用了Netty-Client,並且自定義了訊息協議。
@Override public void run() { m_active = true; while (m_active) { ChannelFuture channel = m_manager.channel(); if (channel != null && checkWritable(channel)) { try { MessageTree tree = m_queue.poll(); if (tree != null) { sendInternal(tree);//netty NIO編碼後TCP傳送到服務端。 tree.setMessage(null); } } catch (Throwable t) { m_logger.error("Error when sending message over TCP socket!", t); } } else { long current = System.currentTimeMillis(); long oldTimestamp = current - HOUR; while (true) { try { MessageTree tree = m_queue.peek(); if (tree != null && tree.getMessage().getTimestamp() < oldTimestamp) { MessageTree discradTree = m_queue.poll(); if (discradTree != null) { m_statistics.onOverflowed(discradTree); } } else { break; } } catch (Exception e) { m_logger.error(e.getMessage(), e); break; } } try { Thread.sleep(5); } catch (Exception e) { // ignore it m_active = false; } } } } private void sendInternal(MessageTree tree) { ChannelFuture future = m_manager.channel(); ByteBuf buf = PooledByteBufAllocator.DEFAULT.buffer(10 * 1024); // 10K System.out.println(tree); m_codec.encode(tree, buf);//編碼後傳送 int size = buf.readableBytes(); Channel channel = future.channel(); channel.writeAndFlush(buf); if (m_statistics != null) { m_statistics.onBytes(size); } }
4.1.7 cat-merge-atomic-task
符合如下邏輯判斷的atomicMessage會放入m_atomicTrees訊息佇列,然後由這個後臺執行緒監聽並消費。
具體程式碼如下:
TcpSocketSender.java
private MessageQueue m_atomicTrees = new DefaultMessageQueue(SIZE); ...... private boolean isAtomicMessage(MessageTree tree) { Message message = tree.getMessage();//從tree上拿去message if (message instanceof Transaction) {//如果這個message實現了Transaction介面,也就是Transaction型別的訊息 String type = message.getType(); if (type.startsWith("Cache.") || "SQL".equals(type)) {//如果以Cache.,SQL開頭的則返回True return true; } else { return false; } } else { return true; } //看到這裡,也就是說,"Cache","SQL"開頭的Transaction訊息,或者非Transaction訊息,認為是atomicMessage. } ...... public void send(MessageTree tree) { if (isAtomicMessage(tree)) {//如果符合atomicMessage boolean result = m_atomicTrees.offer(tree, m_manager.getSample()); if (!result) { logQueueFullInfo(tree);//佇列溢位處理 } } else { boolean result = m_queue.offer(tree, m_manager.getSample()); if (!result) { logQueueFullInfo(tree); } } } ......
public class DefaultMessageQueue implements MessageQueue { private BlockingQueue<MessageTree> m_queue; private AtomicInteger m_count = new AtomicInteger(); public DefaultMessageQueue(int size) { m_queue = new LinkedBlockingQueue<MessageTree>(size); } @Override public boolean offer(MessageTree tree) { return m_queue.offer(tree); } @Override public boolean offer(MessageTree tree, double sampleRatio) { if (tree.isSample() && sampleRatio < 1.0) {//如果這個訊息是sample,並且sampleRation大於1 if (sampleRatio > 0) {//這段邏輯就是按取樣率去剔除一些訊息,只選取其中一部分進行後續的消費上傳。 int count = m_count.incrementAndGet(); if (count % (1 / sampleRatio) == 0) { return offer(tree); } } return false; } else {//不做取樣過濾,放入佇列 return offer(tree); } } @Override public MessageTree peek() { return m_queue.peek(); } @Override public MessageTree poll() { try { return m_queue.poll(5, TimeUnit.MILLISECONDS); } catch (InterruptedException e) { return null; } } @Override public int size() { return m_queue.size(); } }
這個後臺程序的消費動作:
...... private boolean shouldMerge(MessageQueue trees) { MessageTree tree = trees.peek();//獲取對頭元素,非移除 if (tree != null) { long firstTime = tree.getMessage().getTimestamp(); int maxDuration = 1000 * 30; //訊息在30s內生成,或者佇列擠壓訊息超過200,則需要merge if (System.currentTimeMillis() - firstTime > maxDuration || trees.size() >= MAX_CHILD_NUMBER) { return true; } } return false; } ...... @Override public void run() { while (true) { if (shouldMerge(m_atomicTrees)) { MessageTree tree = mergeTree(m_atomicTrees);//把m_atomicTrees佇列中的訊息merge為一條訊息樹 boolean result = m_queue.offer(tree);//放入m_queue佇列,等待cat-TcpSocketSender執行緒正常消費 if (!result) { logQueueFullInfo(tree); } } else { try { Thread.sleep(5); } catch (InterruptedException e) { break; } } } } ..... private MessageTree mergeTree(MessageQueue trees) { int max = MAX_CHILD_NUMBER; DefaultTransaction tran = new DefaultTransaction("_CatMergeTree", "_CatMergeTree", null);//增加merge處理埋點 MessageTree first = trees.poll();//從佇列頭部移除 tran.setStatus(Transaction.SUCCESS); tran.setCompleted(true); tran.addChild(first.getMessage()); tran.setTimestamp(first.getMessage().getTimestamp()); long lastTimestamp = 0; long lastDuration = 0; //這段邏輯就是不停從這個m_atomicTrees佇列頭部拿去messsage,並使用同一個messageId,把佇列中所有的訊息合併為一條Transaction訊息。 while (max >= 0) { MessageTree tree = trees.poll();//接著 從佇列頭部移除 if (tree == null) { tran.setDurationInMillis(lastTimestamp - tran.getTimestamp() + lastDuration); break; } lastTimestamp = tree.getMessage().getTimestamp(); if(tree.getMessage() instanceof DefaultTransaction){ lastDuration = ((DefaultTransaction) tree.getMessage()).getDurationInMillis(); } else { lastDuration = 0; } tran.addChild(tree.getMessage()); m_factory.reuse(tree.getMessageId()); max--; } ((DefaultMessageTree) first).setMessage(tran); return first; }
4.1.8 TcpSocketSender-ChannelManager 後臺執行緒
這個執行緒是通過服務端配置的路由ip,10s輪詢一次,當滿足自旋n(n = m_count % 30)次,去檢查路由服務端ip是否變動,並保證連線正常。典型的拉取配置資訊機制。
1)客戶端跟服務端連線建立,分兩步:
- 初始ChannelMananger的時候 ;
- ChannelManager非同步執行緒,每隔10秒做一次檢查。
初始ChannelMananger的時候
例項化ChannelManager的時候,根據配置的第一個server,從遠端伺服器讀取伺服器列表,如果能讀取到,則順序建立連線,直到建立成功為止;如果不能讀到,則根據本地配置的列表,逐個建立連線,直到成功為止。
ChannelMananger非同步執行緒,每隔10秒做一次檢查
- 檢查Server列表是否變更
每間隔10s,檢查當前channelFuture是否活躍,活躍,則300s檢查一次,不活躍,則執行檢查。檢查的邏輯是:比較本地server列表跟遠端服務提供的列表是否相等,不相等則根據遠端服務提供的server列表順序的重新建立第一個能用的ChannelFuture
- 檢視當前客戶端是否有積壓,或者ChannelFuture是否被關閉
如果有積壓,或者關閉掉了,則關閉當前連線,將activeIndex=-1,表示當前連線不可用。
- 重連預設Server
從0到activeIndex中找一個能連線的server,中心建立一個連線。如果activeIndex為-1,則從整個的server列表中順序的找一個可用的連線建立連線。
2)ChannelManager例項化,建立Netty連線邏輯
客戶端例項化DefaultTransportManager物件時,會按照如下流程先例項化m_tcpSocketSender,接著例項化ChannelManager。ChannelManager管理對服務端的netty連線。 例項化流程如下:
ChannelManager通過ChannelHolder把netty的ChannnelFuture封裝起來。ChannelHolder結構如下:
public static class ChannelHolder { /** * 當前活躍的channelFuture */ private ChannelFuture m_activeFuture; /** * 當前server在m_serverAddresses中的第幾個 */ private int m_activeIndex = -1; /** * 當前活躍的ChannelFuture對應的配置 */ private String m_activeServerConfig; /** * 從配置檔案中讀取的服務端列表 */ private List<InetSocketAddress> m_serverAddresses; /** * 當前活躍的ChannelFutre對應的ip */ private String m_ip; /** * 連線從第一次初始化開始,是否發生過變更 */ private boolean m_connectChanged;
//省略其它的程式碼
}
3)ChannelManager內部非同步執行緒,動態切換Netty連線邏輯。
ChannelManager內部每隔10秒鐘,檢查netty連線。這部分程式碼如下:
public void run() { while (m_active) { /** * make save message id index asyc * 本地儲存index,和 時間戳,防止重啟,導致本地的訊息id重了 */ m_idfactory.saveMark(); /** * 檢查本地初始化的服務列表跟遠端的服務列表是否有差異,如果有差異,則取遠端第一個能建立連線的server,建立一個新的連線,關閉舊的連線 */ checkServerChanged(); ChannelFuture activeFuture = m_activeChannelHolder.getActiveFuture(); List<InetSocketAddress> serverAddresses = m_activeChannelHolder.getServerAddresses(); /** * 檢查當前channelFuture是否有訊息積壓(本地佇列長度超過4990),或者 channelFuture不是開的 */ doubleCheckActiveServer(activeFuture); /** * 從serverAddresses列表裡面,重新順序選一個,重新連線 */ reconnectDefaultServer(activeFuture, serverAddresses); try { Thread.sleep(10 * 1000L); // check every 10 seconds } catch (InterruptedException e) { // ignore } } }
總結:服務端沒有做到負載均衡,連線會慢慢連線到server列表裡面第一個可用的server上。
4.1.9 StatusUpdateTask
CatClientModule在載入過程中會從StatusUpdateTask中啟動一個執行緒來每隔一段時間傳送一個HeartBeatMessage,其中包括了客戶端能拿到的各種資訊,包括CPU,Memory,Disk等等,開發者也可以通過實現StatusExtension介面的方式來實現對於HeartBeatMessage傳送內容的擴充套件。
這個執行緒很簡單,類似傳統的agent,每分鐘上報關於應用的各種資訊(OS、MXBean資訊等等)。而且,在每次執行緒啟動時上報一個Reboot訊息表示重啟動。
其中比較重要的實現資訊收集的是這行程式碼
StatusInfoCollector statusInfoCollector = new StatusInfoCollector(m_statistics, m_jars); status.accept(statusInfoCollector.setDumpLocked(m_manager.isDumpLocked()));
m_statistics包含的是已經發送過資訊的容量,m_jars是通過classLoader載入的jar包名稱,StatusInfoCollector通過大量訪問者模式的程式碼實現了將各種指標set到status中的功能,之後將status封裝到HeartBeatMessage中,按照一般對於message的處理流程,flush到訊息傳輸層中
4.1.10 MessageId的設計
CAT訊息的Message-ID格式applicationName-0a010680-375030-2,CAT訊息一共分為四段:
第一段是應用名applicationName。
第二段是當前這臺機器的IP的16進位制格式:
if (m_ipAddress == null) { String ip = NetworkInterfaceManager.INSTANCE.getLocalHostAddress(); List<String> items = Splitters.by(".").noEmptyItem().split(ip); byte[] bytes = new byte[4]; for (int i = 0; i < 4; i++) { bytes[i] = (byte) Integer.parseInt(items.get(i)); } StringBuilder sb = new StringBuilder(bytes.length / 2); for (byte b : bytes) { //1.一個byte 8位 //2.先獲取高4位的16進位制字元 //3.在獲取低4位的16進位制數 sb.append(Integer.toHexString((b >> 4) & 0x0F));//通常使用0x0f來與一個整數進行&運算,來獲取該整數的最低4個bit位 sb.append(Integer.toHexString(b & 0x0F)); } m_ipAddress = sb.toString();
第三段的375030,是系統當前時間除以小時得到的整點數。
第四段的2,是表示當前這個客戶端在當前小時的順序遞增號(AtomicInteger自增,每小時結束後重置)。
public String getNextId() { String id = m_reusedIds.poll(); if (id != null) { return id; } else { long timestamp = getTimestamp(); if (timestamp != m_timestamp) { m_index = new AtomicInteger(0); m_timestamp = timestamp; } int index = m_index.getAndIncrement(); StringBuilder sb = new StringBuilder(m_domain.length() + 32); sb.append(m_domain); sb.append('-'); sb.append(m_ipAddress); sb.append('-'); sb.append(timestamp); sb.append('-'); sb.append(index); return sb.toString(); }
總之,同一個小時內、同一個domain、同一個ip , messageId的唯一性需要 AtomicInteger保證。
4.2 cat-home設計
4.2.1 服務端初始化
1)Servlet容器載入、啟動
CAT目前是使用war包放入Servlet容器(如:tomcat或者jetty,以下假設使用tomcat容器)中的方式部署啟動。
熟悉servlet容器的同學應該知道,容器啟動時會讀取每個Context(可理解為web工程)中的web.xml然後啟動Servlet等其他元件。
在cat-home模組中的web.xml中可以看到,除了容器預設的Servlet之外,tomcat啟動時會啟動CatServlet、MVC這兩個Servlet(因為load-on-startup>0,也就是會呼叫init方法初始化):
<web-app> <filter>...</filter> <servlet> <servlet-name>cat-servlet</servlet-name> <servlet-class>com.dianping.cat.servlet.CatServlet</servlet-class> <load-on-startup>1</load-on-startup> </servlet> <servlet> <servlet-name>mvc-servlet</servlet-name> <servlet-class>org.unidal.web.MVC</servlet-class> <init-param> <param-name>cat-client-xml</param-name> <param-value>client.xml</param-value> </init-param> <init-param> <param-name>init-modules</param-name> <param-value>false</param-value> </init-param> <load-on-startup>2</load-on-startup> </servlet> <filter-mapping>...</filter-mapping> <servlet-mapping>...</servlet-mapping> <jsp-config>...</jsp-config> </web-app>
2)com.dianping.cat.servlet.CatServlet
按照web.xml中Servlet的載入順序CatServlet會優先於MVC完成初始化。
CatServlet的邏輯基本可以概括為如下兩條線:
CatServlet.init——>CatServlet.initComponents——>DefaultModuleInitializer.execute(...) ——>com.dianping.cat.CatHomeModule.setup(ModuleContext ctx) ——>TCPSocketReceiver(netty伺服器) CatServlet.init——>CatServlet.initComponents——>DefaultModuleInitializer.execute(...) ——>com.dianping.cat.***Module.execute(ModuleContext ctx)(完成各個模組的初始化)
com.dianping.cat.servlet.CatServlet.init(ServletConfig servletConfig)
public void init(ServletConfig config) throws ServletException { super.init(config); try {//1.plexus IOC容器初始化(根據components.xml的設定完成IOC初始化) if (m_container == null) { m_container = ContainerLoader.getDefaultContainer(); } //2.用來列印日誌的m_logger物件例項化(根據plexus.xml設定完成例項化) m_logger = ((DefaultPlexusContainer) m_container).getLoggerManager().getLoggerForComponent( getClass().getName()); //3.初始化CAT-Server必備的元件模組:cat-home\cat-consumer\cat-core initComponents(config); } catch (Exception e) { if (m_logger != null) { m_logger.error("Servlet initializing failed. " + e, e); } else { System.out.println("Servlet initializing failed. " + e