1. 程式人生 > >大眾點評CAT開源監控系統剖析

大眾點評CAT開源監控系統剖析

 轉載:https://www.cnblogs.com/yeahwell/p/cat.html

參考文件:

大眾點評的實時監控系統分析(一)

CAT_source_analyze

透過CAT,來看分散式實時監控系統的設計與實現

深度剖析開源分散式監控CAT

[分散式監控CAT] Client端原始碼解析

大眾點評Cat--架構分析

大眾點評Cat--Server模組架構分析

Plexus,Spring之外的IoC容器

plexus使用(一)

Spring Cloud Sleuth使用簡介

Spring Cloud Sleuth 整合Zipkin、RabbitMQ 和 (Mysql或Elasticsearch)

Cat監控Druid資料庫連線池

1. 介紹

1.1 概述

CAT(Central Application Tracking)基於Java開發的實時監控平臺,主要包括移動端監控,應用側監控,核心網路層監控,系統層監控等。

CAT是一個提供實時監控報警,應用效能分析診斷的工具。

1.2 CAT能做什麼

在此之前,先來想一想對於線上應用我們希望能監控些什麼?可能有如下這些:

  • 機器狀態資訊。CPU負載、記憶體資訊、磁碟使用率這些是必需的,另外可能還希望收集Java程序的資料,例如執行緒棧、堆、垃圾回收等資訊,以幫助出現問題時快速debug。
  • 請求訪問情況
    。例如請求個數、響應時間、處理狀態,如果有處理過程中的時間分析那就更完美了。
  • 異常情況。譬如快取服務時不時出現無響應,我們希望能夠監控到這種異常,從而做進一步的處理。
  • 業務情況。例如訂單量統計,銷售額等等。

CAT支援的監控訊息型別包括:

  • Transaction 適合記錄跨越系統邊界的程式訪問行為,比如遠端呼叫,資料庫呼叫,也適合執行時間較長的業務邏輯監控,Transaction用來記錄一段程式碼的執行時間和次數。
  • Event 用來記錄一件事發生的次數,比如記錄系統異常,它和transaction相比缺少了時間的統計,開銷比transaction要小。
  • Heartbeat 表示程式內定期產生的統計資訊, 如CPU%, MEM%, 連線池狀態, 系統負載等。
  • Metric 用於記錄業務指標、指標可能包含對一個指標記錄次數、記錄平均值、記錄總和,業務指標最低統計粒度為1分鐘。
  • Trace 用於記錄基本的trace資訊,類似於log4j的info資訊,這些資訊僅用於檢視一些相關資訊

在一個請求處理中可能產生有多種訊息,CAT將其組織成訊息樹的形式。

在處理開始時,預設開始一個型別為URL的Transaction,在這個Transaction中業務本身可以產生子訊息。例如,產生一個數據庫訪問的子Transaction或者一個訂單統計的Metric。結構如下所示:

 

1.3 分散式監控系統要求

  • 方便安裝
  • 要求輕量
  • 介面儘可能友好
  • 監控策略豐富,監控元素多樣化
  • 可以巢狀開發
  • 佔用伺服器資源小,使用時不過多佔用機器硬體方面資源,對實際業務影響較小

1.4 CAT使用特點

  • 非同步化傳輸資料,不太影響正常業務
  • 實時監控
  • 輕量,部署簡單
  • 嵌入簡單
  • 有問題跟蹤報表
  • 訊息樹形化
  • 日誌不落地本地磁碟,較少IO,但很消耗網路資源
  • 監控訊息,按照分業務傳輸資料,如業務場景,時間等要求傳輸資料
  • 有報警機制
  • 可能複雜的訊息儲存和訊息ID查詢看起來麻煩,需要建立查詢索引(目前不考慮這個東東)
  • 訊息佇列非同步化傳送
  • 開源(這個最重要)

2. CAT設計

2.1 整體設計

2.2 客戶端設計

2.3 服務端設計

 

2.4 領域建模

 

3. 模組劃分

 

3.1 模組說明

3.1.1 client端

cat-client 提供給業務以及中間層埋點的底層SDK。

3.1.2 server端

cat-consumer 用於實時分析從客戶端提供的資料。

cat-home 作為使用者給使用者提供展示的控制端 ,並且cat-home做展示時,通過對cat-consumer的呼叫獲取其他節點的資料,將所有資料彙總展示。

consumer、home以及路由中心都是部署在一起的,每個服務端節點都可以充當任何一個角色。

CAT服務端在整個實時處理中,基本上實現了全非同步化處理:

  • 訊息消費基於Netty的NIO實現(Netty-Server);
  • 訊息消費到服務端就存放記憶體佇列,然後程式開啟一個執行緒會消費這個訊息做訊息分發(非同步消費處理);
  • 每個訊息都會有一批執行緒併發消費各自佇列的資料,以做到訊息處理的隔離。(每報表每執行緒,分別按照自己的規則解析消費這個訊息,並且可以動態控制對某種報表型別的處理執行緒個數);
  • 訊息(原始的訊息logView)儲存是先存入本地磁碟,然後非同步上傳到HDFS檔案,這也避免了強依賴HDFS;

4. 設計原理

4.1 cat-client設計

作為一個日誌上報的通用客戶端,考慮點至少有如下這些:

  • 為了儘可能減少對業務的影響,需要對訊息進行非同步處理。即業務執行緒將訊息交給CAT客戶端與CAT客戶端上報這兩個過程需要非同步。
  • 為了達到實時的目的以及適應高併發的情況,客戶端上報應該基於TCP而非HTTP開發。
  • 線上程安全的前提下儘可能的資源低消耗以及低延時。我們知道,執行緒競爭的情況是由於資源共享造成的,要達到執行緒安全通常需要減少資源共享或者加鎖,而這兩點則會導致系統資源冗餘和高延時。

CAT客戶端實現並不複雜,但這些點都考慮到了。它的架構如下所示:

大概步驟為:

  • 業務執行緒產生訊息,交給訊息Producer,訊息Producer將訊息存放在該業務執行緒訊息棧中;
  • 業務執行緒通知訊息Producer訊息結束時,訊息Producer根據其訊息棧產生訊息樹放置在同步訊息佇列中;
  • 訊息上報執行緒監聽訊息佇列,根據訊息樹產生最終的訊息報文上報CAT服務端。

4.1.1 cat-client包結構

複製程式碼

└─com
    ├─dianping
    │  └─cat
    │      ├─build
    │      ├─configuration
    │      ├─log4j
    │      ├─message
    │      │  ├─internal
    │      │  ├─io
    │      │  └─spi
    │      │      ├─codec
    │      │      └─internal
    │      ├─servlet
    │      └─status
    └─site
        ├─helper
        └─lookup
            └─util

複製程式碼

 

4.1.2 com.dianping.cat.message包介紹

包結構如下:

com.dianping.cat.message中主要包含了internal、io、spi這三個目錄:

  • internal目錄包含主要的CAT客戶端內部實現類;
  • io目錄包含建立服務端連線、重連、訊息佇列監聽、上報等io實現類;
  • spi目錄為上報訊息工具包,包含訊息二進位制編解碼、轉義等實現類。

其uml圖如下所示(可以放大看):

類的功能如下:

  • Message為所有上報訊息的抽象,它的子類實現有Transaction、Metric、Event、HeartBeat、Trace這五種。
  • MessageProducer封裝了所有介面,業務在使用CAT時只需要通過MessageProducer來操作。
  • MessageManager為CAT客戶端核心類,相當於MVC中的Controller。
  • Context類儲存訊息上下文。
  • TransportManager提供傳送訊息的sender,具體實現有DefaultTransportManager,呼叫其getSender介面返回一個TcpSocketSender。
  • TcpSocketSender類負責傳送訊息。

1)Message

上面說到,Message有五類,分別為Transaction、Metric、Event、HeartBeat、Trace。其中Metric、Event、HeartBeat、Trace基本相同,儲存的資料都為一個字串;而Transaction則儲存一個Message列表。換句話說,Transaction的結構為一個遞迴包含的結構,其他結構則為原子性結構。

下面為DefaultTransaction的關鍵資料成員及操作:

複製程式碼

public class DefaultTransaction extends AbstractMessage implements Transaction {
    private List<Message> m_children;
    private MessageManager m_manager;
    ...

    //新增子訊息
    public DefaultTransaction addChild(Message message) {
        ...
    }

    //Transaction結束時呼叫此方法
    public void complete() {
        ...
        m_manager.end(this); //呼叫MessageManager來結束Transaction 
        ...
    }

複製程式碼

 

值得一提的是,Transaction(或者其他的Message)在建立時自動開始,訊息結束時需要業務方呼叫complete方法,而在complete方法內部則呼叫MessageManager來完成訊息。

2)MessageProducer

MessageProducer對業務方封裝了CAT內部的所有細節,它的主要方法如下:

複製程式碼

public void logError(String message, Throwable cause);
public void logEvent(String type, String name, String status, String nameValuePairs);
public void logHeartbeat(String type, String name, String status, String nameValuePairs);
public void logMetric(String name, String status, String nameValuePairs);
public void logTrace(String type, String name, String status, String nameValuePairs);
...
public Event newEvent(String type, String name);
public Event newEvent(Transaction parent, String type, String name);
public Heartbeat newHeartbeat(String type, String name);
public Metric newMetric(String type, String name);
public Transaction newTransaction(String type, String name);
public Trace newTrace(String type, String name);

複製程式碼

 

logXXX方法為方法糖(造詞小能手呵呵),這些方法在呼叫時需要傳入訊息資料,方法結束後訊息自動結束。

newXXX方法返回相應的Message,業務方需要呼叫Message方法設定資料,並最終呼叫Message.complete()方法結束訊息。

MessageProducer只是介面封裝,訊息處理主要實現依賴於MessageManager這個類。

3)MessageManager

MessageManager為CAT的核心類,但它只是定義了介面,具體實現為DefaultMessageManager。DefaultMessageManager這個類裡面主要包含了兩個功能類,ContextTransportManager,分別用於儲存上下文和訊息傳輸。TransportManager執行期間為單例物件,而Context則包裝成ThreadLocal為每個執行緒儲存上下文。

我們通過介面來了解DefaultMessageManager的主要功能:

public void add(Message message);
public void start(Transaction transaction, boolean forked);
public void end(Transaction transaction);

public void flush(MessageTree tree);

 

add()方法用來新增原子性的Message,也就是Metric、Event、HeartBeat、Trace。

start()和end()方法用來開始和結束Transaction這種訊息。

flush()方法用來將當前業務執行緒的所有訊息重新整理到CAT服務端,當然,是非同步的。

4)Context

Context用來儲存訊息上下文,我們可以通過它的主要介面來了解它功能:

複製程式碼

public void add(Message message) {
    if (m_stack.isEmpty()) {
         MessageTree tree = m_tree.copy();

         tree.setMessage(message);
         flush(tree);
    } else {
         Transaction parent = m_stack.peek();

         addTransactionChild(message, parent);
     }
 }

複製程式碼

add方法主要新增原子性訊息,它先判斷該訊息是否有上文訊息(即判斷是否處於一個Transaction中)。如果有則m_stack不為空並且將該訊息新增到上文Transaction的子訊息佇列中;否則直接呼叫flush來將此原子性訊息重新整理到服務端。

複製程式碼

public void start(Transaction transaction, boolean forked) {
    if (!m_stack.isEmpty()) {
        ...
        Transaction parent = m_stack.peek();
        addTransactionChild(transaction, parent);
    } else {
        m_tree.setMessage(transaction);
    }

    if (!forked) {
        m_stack.push(transaction);
    }
}

複製程式碼

start方法用來開始Transaction(Transaction是訊息裡比較特殊的一種),如果當前訊息棧為空則證明該Transaction為第一個Transaction,使用訊息樹儲存該訊息,同時將該訊息壓棧;否則將當前Transaction儲存到上文Transaction的子訊息佇列中,同時將該訊息壓棧。

複製程式碼

public boolean end(DefaultMessageManager manager, Transaction transaction) {
if (!m_stack.isEmpty()) {
        Transaction current = m_stack.pop();
        ...
        if (m_stack.isEmpty()) {
            MessageTree tree = m_tree.copy();

            m_tree.setMessageId(null);
            m_tree.setMessage(null);
            ...
            manager.flush(tree); //重新整理訊息到CAT服務端
            return true;
        }
    }

    return false;
}

複製程式碼

end方法用來結束Transaction,每次呼叫都會pop訊息棧,如果棧為空則呼叫flush來重新整理訊息到CAT服務端。

綜上,Context的m_stack的結構如下:

Transaction之間是有引用的,因此在end方法中只需要將第一個Transaction(封裝在MessageTree中)通過MessageManager來flush,在拼接訊息時可以根據這個引用關係來找到所有的Transaction :)。

5)TransportManager和TcpSocketSender

這兩個類用來發送訊息到服務端。MessageManager通過TransportManager獲取到MessageSender,呼叫sender.send()方法來發送訊息。 TransportManager和MessageSender關係如下:

TCPSocketSender為MessageSender的具體子類,它裡面主要的資料成員為:

private MessageCodec m_codec;
private MessageQueue m_queue = new DefaultMessageQueue(SIZE);
private ChannelManager m_manager;
  • MessageCodec:CAT基於TCP傳輸訊息,因此在傳送訊息時需要對字元訊息編碼成位元組流,這個編碼的工作由MessageCodec負責實現。

  • MessageQueue:還記得剛才說業務方在新增訊息時,CAT非同步傳送到服務端嗎?在新增訊息時,訊息會被放置在TCPSocketSender的m_queue中,如果超出queue大小則拋棄訊息。

  • ChannelManager:CAT底層使用netty來實現TCP訊息傳輸,ChannelManager負責維護通訊Channel。通俗的說,維護連線。

TCPSocketSender主要方法為initialize、send和run,分別介紹如下:

複製程式碼

public void initialize() {
    m_manager = new ChannelManager(m_logger, m_serverAddresses, m_queue, m_configManager, m_factory);

    Threads.forGroup("cat").start(this);
    Threads.forGroup("cat").start(m_manager);
    ...
}

複製程式碼

initialize方法為初始化方法,在執行時主要建立兩個執行緒,一個用來執行自身run方法(TCPSocketSender實現了Runnable介面)監聽訊息佇列;另一個則用來執行ChannelManager維護通訊Channel。

複製程式碼

public void send(MessageTree tree) {
    if (isAtomicMessage(tree)) {
        boolean result = m_atomicTrees.offer(tree, m_manager.getSample());

        if (!result) {
            logQueueFullInfo(tree);
        }
    } else {
        boolean result = m_queue.offer(tree, m_manager.getSample());

        if (!result) {
            logQueueFullInfo(tree);
        }
    }
}

複製程式碼

send方法被MessageManager呼叫,把訊息放置在訊息佇列中。

複製程式碼

public void run() {
    m_active = true;

    while (m_active) {
        ChannelFuture channel = m_manager.channel();

        if (channel != null && checkWritable(channel)) {
            try {
                MessageTree tree = m_queue.poll();

                if (tree != null) {
                    sendInternal(tree);
                    tree.setMessage(null);
                }

            } catch (Throwable t) {
                m_logger.error("Error when sending message over TCP socket!", t);
            }
        } else {
            try {
                Thread.sleep(5);
            } catch (Exception e) {
                // ignore it
                m_active = false;
            }
        }
    }
}

private void sendInternal(MessageTree tree) {
    ChannelFuture future = m_manager.channel();
    ByteBuf buf = PooledByteBufAllocator.DEFAULT.buffer(10 * 1024); // 10K

    m_codec.encode(tree, buf);

    int size = buf.readableBytes();
    Channel channel = future.channel();

    channel.writeAndFlush(buf);
    if (m_statistics != null) {
        m_statistics.onBytes(size);
    }
}

複製程式碼

run方法會一直執行直到程序退出,在迴圈裡先獲取通訊Channel,然後傳送訊息。值得注意的是,sendInternal方法在執行時呼叫m_codec.encode(tree, buf),引數為訊息樹緩衝區。訊息樹裡面其實只儲存了一個訊息,還記得剛才說的Transaction上下文引用嗎?m_codec在encode的時候會判斷訊息型別是否為Transaction,如果為Transaction則會遞迴獲取子Transaction,否則直接將該訊息編碼。具體實現可以參考原始碼的PlainTextMessageCodec類的encode方法,此處不再贅述。

4.1.3 cat-client 主要類介紹

cat-client的主要入口是cat-client包中的Cat類

Cat類以及Cat的依賴類層級結構如下:

介面層

Cat類以及MessageProducer類。主要功能是為外部提供api,Cat主要作用是與plexus框架做整合,MessageProducer是處理api的主要類

PS:額外說一下,Cat這個專案很【有特色】地用了plexus作為管理容器,初次接觸的時候真是讓人頭大,plexus的基本功能和spring可以說別無二致,但是很多地方的注入竟然都需要手動處理,真是讓人尷尬,雖然作者說spring太重了,plexus的作用已經足夠

訊息處理層

MessageManager以及其內部類Context。主要功能是管理訊息的傳送,Transaction類訊息的歸集,等訊息的管理工作。在MessageManager中,使用了ThreadLocal型別作為當前執行緒訊息管理的上下文,通過這個物件執行緒安全地實現訊息的新增,合併,傳送等等。

PS:MessageManager管理的訊息Message是基於Cat的監控模型建立的,其中最主要的區別是Transaction類和其他訊息不太一樣,Transaction訊息是一個連結串列的模型,每一個訊息後面都連結著下一個訊息,所以MessageManager對Transaction的處理也不同,別的訊息都是放到Context中直接從訊息處理層flush到下一層,Transaction是放到Context的棧中,直到過了預定時間,或者訊息達到規定的最大長度才flush到下一層。

訊息傳輸層

TransportManager以及TcpSocketSender以及ChannelManager。主要功能是把訊息管理層發下來的訊息進行傳送,對於與多個傳送的目的伺服器進行Channel管理,保證有可用伺服器能接受訊息。TransportManager主要功能是根據配置檔案初始化TcpSocketSender,TcpSocketSender主要實現把Message進行編碼(如果是Transaction還會進行合併)並放置到待發送佇列中,再同時由ChannelManager消費佇列中的訊息,將訊息傳送給狀態為active的server端

PS:暫存訊息的佇列用的是LinkedBlockingQueue,實際上LinkedBlockingQueue屬於生產消費者佇列的標配了,因為這個類對於新增和移除的消耗小,執行緒安全,而且達到佇列容量時會成為blocking狀態,所以基本上都會用這個類,或者基於這個類進行擴充套件來實現相關需求。相對來說還有ConcurrentLinkedQueue可以用,和blockingqueue的主要區別是,Concurrent超過主要容量會直接返回false,不會block,所以如果想馬上就返回的可以用Concurrent佇列。

4.1.4 Cat入口類

1)測試用例

複製程式碼

        //靜態方法獲取Transaction物件
        Transaction t=Cat.newTransaction("logTransaction", "logTransaction");

        TimeUnit.SECONDS.sleep(30);
        t.setStatus("0");
        t.complete();

複製程式碼

2)Cat原始碼

複製程式碼

    private static Cat s_instance = new Cat();
    private static volatile boolean s_init = false;

    private static void checkAndInitialize() {
        if (!s_init) {
            synchronized (s_instance) {
                if (!s_init) {
                    initialize(new File(getCatHome(), "client.xml"));
                    log("WARN", "Cat is lazy initialized!");
                    s_init = true;
                }
            }
        }
    }
    private Cat() {
    }

    public static MessageProducer getProducer() {
        checkAndInitialize();

        return s_instance.m_producer;
    }

複製程式碼

Cat lazy Init

可以看到類載入時已經完成了Cat物件的初始化,記憶體中有且僅有一個Cat Object(static Cat s_instance = new Cat();),但是包含配置資訊的完整的Cat物件並沒有完全初始化完成。呼叫Cat時會先嚐試獲取producer物件,並在獲取之前檢查客戶端配置是否載入完畢(checkAndInitialize)。

checkAndInitialize()通過使用doublecheck來對Cat相關配置填充的單次初始化載入。

cat-client首先會使用plexus(一個比較老的IOC容器)載入配置檔案/META-INF/plexus/plexus.xml,完成IOC容器的初始化。

接著使用../../client.xml檔案完成cat物件的配置資訊填充初始化。並且啟動這四個daemon執行緒,後文詳細說明:

  • cat-StatusUpdateTask 用來每秒鐘上報客戶端基本資訊(JVM等資訊)
  • cat-merge-atomic-task(訊息合併檢查)
  • cat-TcpSocketSender-ChannelManager(NIO 連線服務端檢查)
  • cat-TcpSocketSender(訊息傳送服務端)

4.1.5 CatClientModule

由於Cat用了十分low的plexus作為容器,所以在載入Cat類的時候會從靜態方法中載入各個Module,CatClientModule就是Cat client工程中首要Module

複製程式碼

public class CatClientModule extends AbstractModule {
    public static final String ID = "cat-client";

    @Override
    protected void execute(final ModuleContext ctx) throws Exception {
        ctx.info("Current working directory is " + System.getProperty("user.dir"));

        // initialize milli-second resolution level timer
        MilliSecondTimer.initialize();

        // tracking thread start/stop,此處增加經典的hook,用於執行緒池關閉的清理工作。
        Threads.addListener(new CatThreadListener(ctx));

        // warm up Cat
        Cat.getInstance().setContainer(((DefaultModuleContext) ctx).getContainer());

        // bring up TransportManager
        ctx.lookup(TransportManager.class);

        ClientConfigManager clientConfigManager = ctx.lookup(ClientConfigManager.class);

        if (clientConfigManager.isCatEnabled()) {
            // start status update task
            StatusUpdateTask statusUpdateTask = ctx.lookup(StatusUpdateTask.class);

            Threads.forGroup("cat").start(statusUpdateTask);
            LockSupport.parkNanos(10 * 1000 * 1000L); // wait 10 ms

            // MmapConsumerTask mmapReaderTask = ctx.lookup(MmapConsumerTask.class);
            // Threads.forGroup("cat").start(mmapReaderTask);
        }
    }

複製程式碼

這裡plexusIOC的具體的初始化載入邏輯在org\unidal\framework\foundation-service\2.5.0\foundation-service-2.5.0.jar中,有興趣可以仔細檢視。 
當準備工作做完之後,會執行具體的訊息構造:

DefaultMessageProducer.newTransaction(String type, String name)

複製程式碼

@Override
    public Transaction newTransaction(String type, String name) {
        // this enable CAT client logging cat message without explicit setup
        if (!m_manager.hasContext()) {
            //詳細可見下文原始碼,此處就是用ThreadLocal儲存一個Context物件:ctx = new Context(m_domain.getId(), m_hostName, m_domain.getIp());
            m_manager.setup();

        }

        if (m_manager.isMessageEnabled()) {
            DefaultTransaction transaction = new DefaultTransaction(type, name, m_manager);

//向Context中填充構造的訊息體:Context.m_tree;Context.m_stack;稍後看看Context這個物件
            m_manager.start(transaction, false);
            return transaction;
        } else {
            return NullMessage.TRANSACTION;
        }
    }

複製程式碼

DefaultMessageManager.start(Transaction transaction, boolean forked)

複製程式碼

@Override
    public void start(Transaction transaction, boolean forked) {
        Context ctx = getContext();//這裡獲取上文中說到的ThreadLocal中構造的Context物件

        if (ctx != null) {
            ctx.start(transaction, forked);

            if (transaction instanceof TaggedTransaction) {
                TaggedTransaction tt = (TaggedTransaction) transaction;

                m_taggedTransactions.put(tt.getTag(), tt);
            }
        } else if (m_firstMessage) {
            m_firstMessage = false;
            m_logger.warn("CAT client is not enabled because it's not initialized yet");
        }
    }

複製程式碼

DefaultMessageManager.Context.start(Transaction transaction, boolean forked)

複製程式碼

        public void start(Transaction transaction, boolean forked) {
            if (!m_stack.isEmpty()) {//
                 {
                    Transaction parent = m_stack.peek();
                    addTransactionChild(transaction, parent);
                }
            } else {
                m_tree.setMessage(transaction);//在這裡把返回的transaction放在tree上,如果有巢狀結構,後邊繼續在tree上添枝加葉
            }

            if (!forked) {
                m_stack.push(transaction);
            }
        }

複製程式碼

這部分程式碼可以看出, 

通過ThreadLocal<Context.>,使Context中實際的訊息的構造保證了執行緒安全。

如果當前Context的棧m_stack不為空,那麼接著之前的訊息後邊,將當前訊息構造為一個孩子結點。如果當前訊息之前沒有其他訊息,放入m_stack中,並setMessage.也就是當前訊息時父節點。

至此,訊息體構造完畢。 

這裡需要看一下Context類,是DefaultMessageManager包私有的內部類。

Context.java

複製程式碼

    class Context {
        private MessageTree m_tree;//初始化的時候構建一個MessageTree

        private Stack<Transaction> m_stack;

        private int m_length;

        private boolean m_traceMode;

        private long m_totalDurationInMicros; // for truncate message

        private Set<Throwable> m_knownExceptions;

        public Context(String domain, String hostName, String ipAddress) {
            m_tree = new DefaultMessageTree();
            m_stack = new Stack<Transaction>();

            Thread thread = Thread.currentThread();
            String groupName = thread.getThreadGroup().getName();

            m_tree.setThreadGroupName(groupName);
            m_tree.setThreadId(String.valueOf(thread.getId()));
            m_tree.setThreadName(thread.getName());

            m_tree.setDomain(domain);
            m_tree.setHostName(hostName);
            m_tree.setIpAddress(ipAddress);
            m_length = 1;
            m_knownExceptions = new HashSet<Throwable>();
        }

複製程式碼

每個執行緒通過使用ThreadLocal構造一個Context物件並存儲。Context主要包含當前的訊息體m_tree,和多個巢狀訊息體填充的棧:m_stack :

再回到我們原來的UnitTest程式碼, 

Transaction t=Cat.newTransaction("logTransaction", "logTransaction"); 

這行程式碼完成了客戶端plexusIOC容器的初始化,cat-client的載入初始化、啟動了四個daemon執行緒,並返回了Transaction物件。

t.setStatus("0");//很簡單,就是這是一個屬性值
t.complete();

訊息完成後,將訊息放入一個佇列中,從而保證非同步上報。

transaction.complete();的具體程式碼如下:

複製程式碼

........
    public void complete() {
        try {
            if (isCompleted()) {
                // complete() was called more than once
                DefaultEvent event = new DefaultEvent("cat", "BadInstrument");

                event.setStatus("TransactionAlreadyCompleted");
                event.complete();
                addChild(event);
            } else {
                m_durationInMicro = (System.nanoTime() - m_durationStart) / 1000L;

                setCompleted(true);

                if (m_manager != null) {
                    m_manager.end(this);
                }
            }
        } catch (Exception e) {
            // ignore
        }
    }
........
    @Override
    public void end(Transaction transaction) {
        Context ctx = getContext();

        if (ctx != null && transaction.isStandalone()) {
            if (ctx.end(this, transaction)) {
                m_context.remove();
            }
        }
    }
........

        public boolean end(DefaultMessageManager manager, Transaction transaction) {
            if (!m_stack.isEmpty()) {
                Transaction current = m_stack.pop();//Context的成員變數m_stack彈出棧頂元素,LIFO當然是最新的current元素。

                if (transaction == current) {
                    m_validator.validate(m_stack.isEmpty() ? null : m_stack.peek(), current);
                } else {
                    while (transaction != current && !m_stack.empty()) {
                        m_validator.validate(m_stack.peek(), current);

                        current = m_stack.pop();
                    }
                }

                if (m_stack.isEmpty()) {//如果當前執行緒儲存的Context中m_stack無元素
                    MessageTree tree = m_tree.copy();

                    m_tree.setMessageId(null);//清理m_tree
                    m_tree.setMessage(null);

                    if (m_totalDurationInMicros > 0) {
                        adjustForTruncatedTransaction((Transaction) tree.getMessage());
                    }

                    manager.flush(tree);//將訊息放入消費佇列中
                    return true;
                }
            }

            return false;
        }
........
    public void flush(MessageTree tree) {
        if (tree.getMessageId() == null) {
            tree.setMessageId(nextMessageId());//為訊息體生產全域性唯一ID,詳見snowflate演算法
        }

        MessageSender sender = m_transportManager.getSender();

        if (sender != null && isMessageEnabled()) {
            sender.send(tree);

            reset();//ThreadLocal中儲存的Context清理
        } else {
            m_throttleTimes++;

            if (m_throttleTimes % 10000 == 0 || m_throttleTimes == 1) {
                m_logger.info("Cat Message is throttled! Times:" + m_throttleTimes);
            }
        }
    }
........
    private Context getContext() {
        if (Cat.isInitialized()) {
            Context ctx = m_context.get();//ThreadLocal儲存一個Context物件

            if (ctx != null) {
                return ctx;
            } else {
                if (m_domain != null) {
                    ctx = new Context(m_domain.getId(), m_hostName, m_domain.getIp());
                } else {
                    ctx = new Context("Unknown", m_hostName, "");
                }

                m_context.set(ctx);
                return ctx;
            }
        }

        return null;
    }

//TcpSocketSender.send(MessageTree tree)

    private MessageQueue m_queue = new DefaultMessageQueue(SIZE);

    private MessageQueue m_atomicTrees = new DefaultMessageQueue(SIZE);

    @Override
    public void send(MessageTree tree) {
        if (isAtomicMessage(tree)) {
            boolean result = m_atomicTrees.offer(tree, m_manager.getSample());

            if (!result) {
                logQueueFullInfo(tree);
            }
        } else {
            boolean result = m_queue.offer(tree, m_manager.getSample());

            if (!result) {
                logQueueFullInfo(tree);
            }
        }
    }

複製程式碼

至此,構造的訊息體放入了阻塞佇列中等待上傳。

總結: 我們可以看到Cat-SDK通過ThreadLocal對訊息進行收集, 

收集進來按照時間以及型別構造為Tree結構,在compele()方法中將這個構造的訊息放入一個記憶體佇列中,等待TcpSockekSender這個Daemon執行緒非同步上報給服務端。

 4.1.6 cat-TcpSocketSender

訊息上傳服務端,會有一個執行緒cat-TcpSocketSender監聽消費佇列,並消費(上傳服務端)。

通訊上報服務端使用了Netty-Client,並且自定義了訊息協議。

複製程式碼

    @Override
    public void run() {
        m_active = true;

        while (m_active) {
            ChannelFuture channel = m_manager.channel();

            if (channel != null && checkWritable(channel)) {
                try {
                    MessageTree tree = m_queue.poll();

                    if (tree != null) {
                        sendInternal(tree);//netty NIO編碼後TCP傳送到服務端。
                        tree.setMessage(null);
                    }

                } catch (Throwable t) {
                    m_logger.error("Error when sending message over TCP socket!", t);
                }
            } else {
                long current = System.currentTimeMillis();
                long oldTimestamp = current - HOUR;

                while (true) {
                    try {
                        MessageTree tree = m_queue.peek();

                        if (tree != null && tree.getMessage().getTimestamp() < oldTimestamp) {
                            MessageTree discradTree = m_queue.poll();

                            if (discradTree != null) {
                                m_statistics.onOverflowed(discradTree);
                            }
                        } else {
                            break;
                        }
                    } catch (Exception e) {
                        m_logger.error(e.getMessage(), e);
                        break;
                    }
                }

                try {
                    Thread.sleep(5);
                } catch (Exception e) {
                    // ignore it
                    m_active = false;
                }
            }
        }
    }

    private void sendInternal(MessageTree tree) {
        ChannelFuture future = m_manager.channel();
        ByteBuf buf = PooledByteBufAllocator.DEFAULT.buffer(10 * 1024); // 10K

        System.out.println(tree);

        m_codec.encode(tree, buf);//編碼後傳送

        int size = buf.readableBytes();
        Channel channel = future.channel();

        channel.writeAndFlush(buf);
        if (m_statistics != null) {
            m_statistics.onBytes(size);
        }
    }

複製程式碼

 

4.1.7 cat-merge-atomic-task

符合如下邏輯判斷的atomicMessage會放入m_atomicTrees訊息佇列,然後由這個後臺執行緒監聽並消費。 

具體程式碼如下:

TcpSocketSender.java

複製程式碼

private MessageQueue m_atomicTrees = new DefaultMessageQueue(SIZE);

......

        private boolean isAtomicMessage(MessageTree tree) {
        Message message = tree.getMessage();//從tree上拿去message

        if (message instanceof Transaction) {//如果這個message實現了Transaction介面,也就是Transaction型別的訊息
            String type = message.getType();

            if (type.startsWith("Cache.") || "SQL".equals(type)) {//如果以Cache.,SQL開頭的則返回True
                return true;
            } else {
                return false;
            }
        } else {
            return true;
        }
        //看到這裡,也就是說,"Cache","SQL"開頭的Transaction訊息,或者非Transaction訊息,認為是atomicMessage.
    }

......

public void send(MessageTree tree) {
        if (isAtomicMessage(tree)) {//如果符合atomicMessage
            boolean result = m_atomicTrees.offer(tree, m_manager.getSample());

            if (!result) {
                logQueueFullInfo(tree);//佇列溢位處理
            }
        } else {
            boolean result = m_queue.offer(tree, m_manager.getSample());

            if (!result) {
                logQueueFullInfo(tree);
            }
        }
    }
......

複製程式碼

 

 

複製程式碼

public class DefaultMessageQueue implements MessageQueue {
    private BlockingQueue<MessageTree> m_queue;

    private AtomicInteger m_count = new AtomicInteger();

    public DefaultMessageQueue(int size) {
        m_queue = new LinkedBlockingQueue<MessageTree>(size);
    }

    @Override
    public boolean offer(MessageTree tree) {
        return m_queue.offer(tree);
    }

    @Override
    public boolean offer(MessageTree tree, double sampleRatio) {
        if (tree.isSample() && sampleRatio < 1.0) {//如果這個訊息是sample,並且sampleRation大於1
            if (sampleRatio > 0) {//這段邏輯就是按取樣率去剔除一些訊息,只選取其中一部分進行後續的消費上傳。
                int count = m_count.incrementAndGet();

                if (count % (1 / sampleRatio) == 0) {
                    return offer(tree);
                }
            }
            return false;
        } else {//不做取樣過濾,放入佇列
            return offer(tree);
        }
    }

    @Override
    public MessageTree peek() {
        return m_queue.peek();
    }

    @Override
    public MessageTree poll() {
        try {
            return m_queue.poll(5, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            return null;
        }
    }

    @Override
    public int size() {
        return m_queue.size();
    }
}

複製程式碼

 

這個後臺程序的消費動作:

複製程式碼

......

private boolean shouldMerge(MessageQueue trees) {
        MessageTree tree = trees.peek();//獲取對頭元素,非移除

        if (tree != null) {
            long firstTime = tree.getMessage().getTimestamp();
            int maxDuration = 1000 * 30;
            //訊息在30s內生成,或者佇列擠壓訊息超過200,則需要merge
            if (System.currentTimeMillis() - firstTime > maxDuration || trees.size() >= MAX_CHILD_NUMBER) {
                return true;
            }
        }
        return false;
    }

......

        @Override
        public void run() {
            while (true) {
                if (shouldMerge(m_atomicTrees)) {
                    MessageTree tree = mergeTree(m_atomicTrees);//把m_atomicTrees佇列中的訊息merge為一條訊息樹
                    boolean result = m_queue.offer(tree);//放入m_queue佇列,等待cat-TcpSocketSender執行緒正常消費

                    if (!result) {
                        logQueueFullInfo(tree);
                    }
                } else {
                    try {
                        Thread.sleep(5);
                    } catch (InterruptedException e) {
                        break;
                    }
                }
            }
        }

.....

private MessageTree mergeTree(MessageQueue trees) {
        int max = MAX_CHILD_NUMBER;
        DefaultTransaction tran = new DefaultTransaction("_CatMergeTree", "_CatMergeTree", null);//增加merge處理埋點
        MessageTree first = trees.poll();//從佇列頭部移除

        tran.setStatus(Transaction.SUCCESS);
        tran.setCompleted(true);
        tran.addChild(first.getMessage());
        tran.setTimestamp(first.getMessage().getTimestamp());
        long lastTimestamp = 0;
        long lastDuration = 0;

        //這段邏輯就是不停從這個m_atomicTrees佇列頭部拿去messsage,並使用同一個messageId,把佇列中所有的訊息合併為一條Transaction訊息。
        while (max >= 0) {
            MessageTree tree = trees.poll();//接著 從佇列頭部移除

            if (tree == null) {
                tran.setDurationInMillis(lastTimestamp - tran.getTimestamp() + lastDuration);
                break;
            }
            lastTimestamp = tree.getMessage().getTimestamp();
            if(tree.getMessage() instanceof DefaultTransaction){
                lastDuration = ((DefaultTransaction) tree.getMessage()).getDurationInMillis();
            } else {
                lastDuration = 0;
            }
            tran.addChild(tree.getMessage());
            m_factory.reuse(tree.getMessageId());
            max--;
        }
        ((DefaultMessageTree) first).setMessage(tran);
        return first;
    } 

複製程式碼

4.1.8 TcpSocketSender-ChannelManager 後臺執行緒

這個執行緒是通過服務端配置的路由ip,10s輪詢一次,當滿足自旋n(n = m_count % 30)次,去檢查路由服務端ip是否變動,並保證連線正常。典型的拉取配置資訊機制。

1)客戶端跟服務端連線建立,分兩步:

  • 初始ChannelMananger的時候 ;
  • ChannelManager非同步執行緒,每隔10秒做一次檢查。

初始ChannelMananger的時候

例項化ChannelManager的時候,根據配置的第一個server,從遠端伺服器讀取伺服器列表,如果能讀取到,則順序建立連線,直到建立成功為止;如果不能讀到,則根據本地配置的列表,逐個建立連線,直到成功為止。

ChannelMananger非同步執行緒,每隔10秒做一次檢查

  • 檢查Server列表是否變更

每間隔10s,檢查當前channelFuture是否活躍,活躍,則300s檢查一次,不活躍,則執行檢查。檢查的邏輯是:比較本地server列表跟遠端服務提供的列表是否相等,不相等則根據遠端服務提供的server列表順序的重新建立第一個能用的ChannelFuture

  • 檢視當前客戶端是否有積壓,或者ChannelFuture是否被關閉

如果有積壓,或者關閉掉了,則關閉當前連線,將activeIndex=-1,表示當前連線不可用。

  • 重連預設Server

從0到activeIndex中找一個能連線的server,中心建立一個連線。如果activeIndex為-1,則從整個的server列表中順序的找一個可用的連線建立連線。

2)ChannelManager例項化,建立Netty連線邏輯

客戶端例項化DefaultTransportManager物件時,會按照如下流程先例項化m_tcpSocketSender,接著例項化ChannelManager。ChannelManager管理對服務端的netty連線。 例項化流程如下:

 ChannelManager通過ChannelHolder把netty的ChannnelFuture封裝起來。ChannelHolder結構如下:

複製程式碼

public static class ChannelHolder {

        /**
         * 當前活躍的channelFuture
         */
        private ChannelFuture m_activeFuture;

        /**
         * 當前server在m_serverAddresses中的第幾個
         */
        private int m_activeIndex = -1;

        /**
         * 當前活躍的ChannelFuture對應的配置
         */
        private String m_activeServerConfig;

        /**
         * 從配置檔案中讀取的服務端列表
         */
        private List<InetSocketAddress> m_serverAddresses;

        /**
         * 當前活躍的ChannelFutre對應的ip
         */
        private String m_ip;

        /**
         * 連線從第一次初始化開始,是否發生過變更
         */
        private boolean m_connectChanged;

    //省略其它的程式碼
}

複製程式碼

3)ChannelManager內部非同步執行緒,動態切換Netty連線邏輯。

ChannelManager內部每隔10秒鐘,檢查netty連線。這部分程式碼如下:

複製程式碼

    public void run() {
        while (m_active) {
            /**
             * make save message id index asyc
             * 本地儲存index,和 時間戳,防止重啟,導致本地的訊息id重了
              */
            m_idfactory.saveMark();

            /**
             * 檢查本地初始化的服務列表跟遠端的服務列表是否有差異,如果有差異,則取遠端第一個能建立連線的server,建立一個新的連線,關閉舊的連線
             */
            checkServerChanged();

            ChannelFuture activeFuture = m_activeChannelHolder.getActiveFuture();
            List<InetSocketAddress> serverAddresses = m_activeChannelHolder.getServerAddresses();

            /**
             * 檢查當前channelFuture是否有訊息積壓(本地佇列長度超過4990),或者 channelFuture不是開的
             */
            doubleCheckActiveServer(activeFuture);
            /**
             * 從serverAddresses列表裡面,重新順序選一個,重新連線
             */
            reconnectDefaultServer(activeFuture, serverAddresses);

            try {
                Thread.sleep(10 * 1000L); // check every 10 seconds
            } catch (InterruptedException e) {
                // ignore
            }
        }
    }

複製程式碼

 總結:服務端沒有做到負載均衡,連線會慢慢連線到server列表裡面第一個可用的server上。

4.1.9 StatusUpdateTask

CatClientModule在載入過程中會從StatusUpdateTask中啟動一個執行緒來每隔一段時間傳送一個HeartBeatMessage,其中包括了客戶端能拿到的各種資訊,包括CPU,Memory,Disk等等,開發者也可以通過實現StatusExtension介面的方式來實現對於HeartBeatMessage傳送內容的擴充套件。

這個執行緒很簡單,類似傳統的agent,每分鐘上報關於應用的各種資訊(OS、MXBean資訊等等)。而且,在每次執行緒啟動時上報一個Reboot訊息表示重啟動。

其中比較重要的實現資訊收集的是這行程式碼

StatusInfoCollector statusInfoCollector = new StatusInfoCollector(m_statistics, m_jars);
status.accept(statusInfoCollector.setDumpLocked(m_manager.isDumpLocked()));

m_statistics包含的是已經發送過資訊的容量,m_jars是通過classLoader載入的jar包名稱,StatusInfoCollector通過大量訪問者模式的程式碼實現了將各種指標set到status中的功能,之後將status封裝到HeartBeatMessage中,按照一般對於message的處理流程,flush到訊息傳輸層中

4.1.10 MessageId的設計

CAT訊息的Message-ID格式applicationName-0a010680-375030-2,CAT訊息一共分為四段: 

第一段是應用名applicationName。 

第二段是當前這臺機器的IP的16進位制格式:

複製程式碼

if (m_ipAddress == null) {
            String ip = NetworkInterfaceManager.INSTANCE.getLocalHostAddress();
            List<String> items = Splitters.by(".").noEmptyItem().split(ip);
            byte[] bytes = new byte[4];

            for (int i = 0; i < 4; i++) {
                bytes[i] = (byte) Integer.parseInt(items.get(i));
            }

            StringBuilder sb = new StringBuilder(bytes.length / 2);

            for (byte b : bytes) {
                //1.一個byte 8位
                //2.先獲取高4位的16進位制字元
                //3.在獲取低4位的16進位制數            
                sb.append(Integer.toHexString((b >> 4) & 0x0F));//通常使用0x0f來與一個整數進行&運算,來獲取該整數的最低4個bit位
                sb.append(Integer.toHexString(b & 0x0F));
            }

            m_ipAddress = sb.toString();

複製程式碼

第三段的375030,是系統當前時間除以小時得到的整點數。 
第四段的2,是表示當前這個客戶端在當前小時的順序遞增號(AtomicInteger自增,每小時結束後重置)。

複製程式碼

public String getNextId() {
        String id = m_reusedIds.poll();

        if (id != null) {
            return id;
        } else {
            long timestamp = getTimestamp();

            if (timestamp != m_timestamp) {
                m_index = new AtomicInteger(0);
                m_timestamp = timestamp;
            }

            int index = m_index.getAndIncrement();

            StringBuilder sb = new StringBuilder(m_domain.length() + 32);

            sb.append(m_domain);
            sb.append('-');
            sb.append(m_ipAddress);
            sb.append('-');
            sb.append(timestamp);
            sb.append('-');
            sb.append(index);

            return sb.toString();
        }

複製程式碼

 

總之,同一個小時內、同一個domain、同一個ip , messageId的唯一性需要 AtomicInteger保證。

4.2 cat-home設計

4.2.1 服務端初始化

1)Servlet容器載入、啟動

CAT目前是使用war包放入Servlet容器(如:tomcat或者jetty,以下假設使用tomcat容器)中的方式部署啟動。 

熟悉servlet容器的同學應該知道,容器啟動時會讀取每個Context(可理解為web工程)中的web.xml然後啟動Servlet等其他元件。

在cat-home模組中的web.xml中可以看到,除了容器預設的Servlet之外,tomcat啟動時會啟動CatServlet、MVC這兩個Servlet(因為load-on-startup>0,也就是會呼叫init方法初始化):

複製程式碼

<web-app>

<filter>...</filter>

<servlet>
        <servlet-name>cat-servlet</servlet-name>
        <servlet-class>com.dianping.cat.servlet.CatServlet</servlet-class>
        <load-on-startup>1</load-on-startup>
    </servlet>
    <servlet>
        <servlet-name>mvc-servlet</servlet-name>
        <servlet-class>org.unidal.web.MVC</servlet-class>
        <init-param>
            <param-name>cat-client-xml</param-name>
            <param-value>client.xml</param-value>
        </init-param>
        <init-param>
            <param-name>init-modules</param-name>
            <param-value>false</param-value>
        </init-param>
        <load-on-startup>2</load-on-startup>
    </servlet>

<filter-mapping>...</filter-mapping>
<servlet-mapping>...</servlet-mapping>
<jsp-config>...</jsp-config>

</web-app>

複製程式碼

 

2)com.dianping.cat.servlet.CatServlet

按照web.xml中Servlet的載入順序CatServlet會優先於MVC完成初始化。 

CatServlet的邏輯基本可以概括為如下兩條線:

複製程式碼

CatServlet.init——>CatServlet.initComponents——>DefaultModuleInitializer.execute(...) 
            ——>com.dianping.cat.CatHomeModule.setup(ModuleContext ctx)
                ——>TCPSocketReceiver(netty伺服器)

CatServlet.init——>CatServlet.initComponents——>DefaultModuleInitializer.execute(...) 
        ——>com.dianping.cat.***Module.execute(ModuleContext ctx)(完成各個模組的初始化)

複製程式碼

 

com.dianping.cat.servlet.CatServlet.init(ServletConfig servletConfig)

複製程式碼

public void init(ServletConfig config) throws ServletException {
        super.init(config);

        try {//1.plexus IOC容器初始化(根據components.xml的設定完成IOC初始化)
            if (m_container == null) {
                m_container = ContainerLoader.getDefaultContainer();
            }
            //2.用來列印日誌的m_logger物件例項化(根據plexus.xml設定完成例項化)
            m_logger = ((DefaultPlexusContainer) m_container).getLoggerManager().getLoggerForComponent(
                  getClass().getName());
            //3.初始化CAT-Server必備的元件模組:cat-home\cat-consumer\cat-core
            initComponents(config);
        } catch (Exception e) {
            if (m_logger != null) {
                m_logger.error("Servlet initializing failed. " + e, e);
            } else {
                System.out.println("Servlet initializing failed. " + e