1. 程式人生 > >深入詳解美團點評CAT跨語言服務監控(七)訊息分析器與報表(二)

深入詳解美團點評CAT跨語言服務監控(七)訊息分析器與報表(二)

CrossAnalyzer-呼叫鏈分析 

    在分散式環境中,應用是執行在獨立的程序中的,有可能是不同的機器,或者不同的伺服器程序。那麼他們如果想要彼此聯絡在一起,形成一個呼叫鏈,在Cat中,CrossAnalyzer會統計不同服務之間呼叫的情況,包括服務的訪問量,錯誤量,響應時間,QPS等,這裡的服務主要指的是 RPC 服務,在微服務監控中,這是核心。

    在講 CrossAnalyzer 的處理邏輯之前,我們先看下客戶端的埋點的一個模擬情況。

    一般情況下不同服務會通過幾個ID進行串聯。這種串聯的模式,基本上都是一樣的。在Cat中,我們需要3個ID:

  • RootId,用於標識唯一的一個呼叫鏈
  • ParentId,父Id是誰?誰在呼叫我
  • ChildId,我在呼叫誰?

      那麼我們如何傳遞這些ID?Cat為我們提供了一個內部介面 Cat.Context,但是我們需要自己實現Context,在下面程式碼中我們首先在before函式中實現了Context 上下文,然後在rpcClient中開啟訊息事務,並呼叫 Cat.logRemoteCallClient(context) 去填充Context的這3個MessageID。當然,該函式還記錄了一個RemoteCall型別的Event訊息。

     隨後我們用rpcService函式中開啟新執行緒模擬遠端RPC服務,並將context上傳到 RPC 伺服器,在真實環境中,Context是需要跨程序網路傳輸,因此需要實現序列化介面。

    在rpcService中,我們會呼叫 Cat.logRemoteCallServer(context) 將從rpcClient傳過來的Context設定到自己的 Transaction 當中。

    隨著業務處理邏輯的結束, rpcServer 和 rpcClient 都會分別將自己的訊息樹上傳到CAT伺服器分析。

    需要注意的是,Service的client和app需要和Call的server以及app對應上,要不然圖表是分析不出東西的!

@RunWith(JUnit4.class)
public class AppSimulator extends CatTestCase {
    public Map<String, String> maps = new HashMap<String, String>();

    public Cat.Context context;

    @Before
    public void before() {
        context = new Cat.Context() {
            @Override
            public void addProperty(String key, String value) { maps.put(key, value); }

            @Override
            public String getProperty(String key) { return maps.get(key); }
        };
    }

    @Test
    public void simulateHierarchyTransaction() throws Exception {
            ...
            //RPC呼叫開始
            rpcClient();
            rpcClient2();
            ...
    }

    protected void rpcClient() {
        //客戶端埋點,Domain為RpcClient,呼叫服務端提供的Echo服務
        Transaction parent = Cat.newTransaction("Call", "CallServiceEcho");
        Cat.getManager().getThreadLocalMessageTree().setDomain("RpcClient");

        Cat.logEvent("Call.server","localhost");
        Cat.logEvent("Call.app","RpcService");
        Cat.logEvent("Call.port","8888");
        Cat.logRemoteCallClient(context, "RpcClient");

        //開啟新執行緒模擬遠端RPC服務,將context上傳到 RPC 伺服器
        rpcService(context);

        parent.complete();
    }
    
    protected void rpcClient2() {
        ...
        //模擬另外一個RpcClient呼叫Echo服務
        rpcService(context, "RpcClient2");
        ...
    }
    
    protected void rpcService(final Cat.Context context, final String clientDomain) {
        Thread thread = new Thread() {
            @Override
            public void run() {
                //伺服器埋點,Domain為 RpcService 提供Echo服務
                Transaction child = Cat.newTransaction("Service", "Echo");
                Cat.getManager().getThreadLocalMessageTree().setDomain("RpcService");

                Cat.logEvent("Service.client", localhost); //填客戶端地址
                Cat.logEvent("Service.app", clientDomain);
                Cat.logRemoteCallServer(context);

                //to do your business

                child.complete();
            }
        };

        thread.start();

        try {
            thread.join();
        } catch (InterruptedException e) {
        }
    }
}

接下來我們看看CAT伺服器端CrossAnalyzer的邏輯。

        我們依然會為每個週期時間內的每個Domain建立一張報表(CrossReport),然後不同的IP會分配不同的Local物件統計,每個IP又可能會接收來自不同Remote端的呼叫。

    由於這裡一個完整的呼叫鏈會涉及多個端的多個訊息樹,我們首先會根據Transaction的型別來判斷是RpcService還是RpcClient,如果Type等於PigeonService或Service則該訊息來自RpcService,如果Type等於 PigeonCall或Call則來自RpcClient。

    先來看看RpcService端訊息樹的上報處理邏輯,CAT會呼叫 parsePigeonServerTransaction 函式去填充 CrossInfo 資訊,CrossInfo包含的具體內容如下:

     localAddress : RpcService的IP地址

     remoteAddress : 服務呼叫者(RpcClient)的IP地址,由type="Service.client" 的Event子訊息提供,注意,在處理RpcClient的上報時,我們會根據上報資訊中的remoteAddress再次統計該RpcService資料,大家可能會疑惑這裡是不是重複統計,事實上他們所處的視角是不一樣的,前者是站在服務提供者的視角來統計我完成這次服務所耗費的時間、資源等,而後者則是站在RpcClient視角去統計自己從發出請求到得到結果所需的時長、資源等等,比如這中間就包含網路IO的消耗,這些在後續的報表中會有體現。

       app:客戶端的Domain, 由type="Service.app"的Event子訊息提供。

       remoteRole:固定為 Pigeon.Client , 表示遠端角色為 Rpc 客戶端。

       detailType: 固定為 PigeonService , 表示自己角色為 Rpc 服務端。

        最後,我們將用CrossInfo資訊來更新報表(CrossReport),我們首先根據 localAddress 即 RpcService的 IP 找到或建立 Local物件,然後根據 remoteAddress+remoteRole 找到或建立 Remote 物件,然後統計服務的訪問量,錯誤量,處理時間,QPS。

       RpcService提供不只一個服務,不同的服務我們按名字分別統計在不同的Name物件裡,比如上面案例,RpcService提供的是Echo服務。

我們再來看看RpcClient端上報處理邏輯,CAT呼叫parsePigeonClientTransaction函式填充CrossInfo資訊,具體如下:

localAddress : RpcClient的IP地址

remoteAddress :服務提供者(RpcService)的地址,由 type="Call.server" 的Event子訊息提供。

app:服務提供者的Domain,由type="Call.app" 的Event子訊息提供,在統計完RpcClient端資料之後,會通過該屬性獲取服務提供者的CrossInfo。從RpcClient的視角再次統計RpcService的資料。

port:客戶端埠,由 type="Call.port" 的Event子訊息提供。

remoteRole:固定為 Pigeon.Server, 表示遠端角色為服務提供者。

detailType: 固定為 PigeonCall , 表示自己角色為服務呼叫者。

    然後,我們將用CrossInfo資訊來更新報表(CrossReport),也是根據 localAddress 找到Local物件,然後根據 remoteAddress+remoteRole 找到 Remote 物件,進行統計。

    接著,我們通過convertCrossInfo函式利用RpcClient的CrossInfo資訊去生成服務提供者的CrossInfo資訊,這裡實際上是為了從RpcClient的視角去統計服務提供者的報表!

public class CrossAnalyzer extends AbstractMessageAnalyzer<CrossReport> implements LogEnabled {
    private void processTransaction(CrossReport report, MessageTree tree, Transaction t) {
        CrossInfo crossInfo = parseCorssTransaction(t, tree);

        if (crossInfo != null && crossInfo.validate()) {
            updateCrossReport(report, t, crossInfo);

            String targetDomain = crossInfo.getApp();

            if (m_serverConfigManager.isRpcClient(t.getType()) && !DEFAULT.equals(targetDomain)) {
                CrossInfo serverCrossInfo = convertCrossInfo(tree.getDomain(), crossInfo);

                if (serverCrossInfo != null) {
                    CrossReport serverReport = m_reportManager.getHourlyReport(getStartTime(), targetDomain, true);

                    updateCrossReport(serverReport, t, serverCrossInfo);
                }
            } else {
                m_errorAppName++;
            }
        }
        ...
    }
}

這裡的 serverCrossInfo 被填充了什麼資料:

localAddress : RpcClient 的 remoteAddress。

remoteAddress :RpcClient 的 localAddress + clientPort

app:RpcClient 的 Domain。

remoteRole:固定為 Pigeon.Caller, 表示遠端角色為服務呼叫者。

detailType: 固定為 PigeonCall 

     最後還是用CrossInfo資訊來更新報表(CrossReport)。

最後我們看看我們生成了哪些報表資料,3個報表資料,分別是服務呼叫方 RpcClient和 RpcClient2,以及服務提供方RpcService。

    接下來我們看看服務提供方的remotes資料資訊,一共3條資料,第1條記錄是站在RpcService角度統計伺服器完成這2次服務所耗費的時間、資源等,後面2條記錄則是站在RpcClient視角去統計自己從發出請求到得到結果所需的時長、資源等等。

    第1條記錄 duration 為 0.154ms, 第2,3條記錄 duration 分別為 1072.62ms、1506.38ms, 兩者巨大的時間差一般就是網路 IO 所需的時間,事實上大多數的服務時間的消耗都是在各種IO上。這類服務統稱為IO密集型。

StorageAnalyzer  --資料庫/快取分析

    StorageAnalyzer主要分析一段時間內資料庫、Cache訪問情況:各種操作訪問次數、響應時間、錯誤次數、長時間訪問量等等,當客戶端訊息過來,StorageAnalyzer首先會分析事務屬於資料庫操作還是快取操作,然後進行不同的處理,訊息型別如果是SQL則是資料庫操作,如果以Cache.memcached開頭則認為是快取操作。

    我們首先看看資料庫操作的分析過程,下面原始碼是客戶端的案例,這是一個獲取cat庫config表全部資料的sql查詢,我們將資料庫操作所有資訊都放在一個type="SQL" 的子事務訊息中。

@RunWith(JUnit4.class)
public class AppSimulator extends CatTestCase {
    @Test
    public void simulateHierarchyTransaction() throws Exception {
        ...
        Transaction sqlT = cat.newTransaction("SQL", "Select");
        
        //do your SQL query
        
        cat.logEvent("SQL.Database", "jdbc:mysql://192.168.20.67:3306/cat");
        cat.logEvent("SQL.Method", "select");
        cat.logEvent("SQL.Statement", "SELECT", SUCCESS, "select * from cat.config");
        sqlT.complete();
        ...
    }
}

    上面訊息上報到服務端之後,分析器將SQL型別子事務取出,呼叫processSQLTransaction去處理,將結果寫入報表StorageReport

    processSQLTransaction 首先通過DatabaseParser提取資料庫的IP和資料庫名稱,該資訊由type="SQL.Database"的Event子訊息提供,該Event訊息上報的是資料庫連線的URL。

    接著我們會獲取資料庫操作名,type="SQL.Method" 的Event子訊息提供,資料庫操作分4類,分別是select, update, delete, insert,如果不上報,分析器預設客戶端在做select查詢。

    最後我們會為週期內的每個資料庫建立一個Storage報表。並將提取資訊放入StorageUpdateParam物件,然後將物件交給StorageReportUpdater來更新Storage報表。

public class StorageAnalyzer extends AbstractMessageAnalyzer<StorageReport> implements LogEnabled {
    @Inject
    private DatabaseParser m_databaseParser;
    
    @Inject
    private StorageReportUpdater m_updater;

    private void processSQLTransaction(MessageTree tree, Transaction t) {
        String databaseName = null;
        String method = "select";
        String ip = null;
        String domain = tree.getDomain();
        List<Message> messages = t.getChildren();

        for (Message message : messages) {
            if (message instanceof Event) {
                String type = message.getType();

                if (type.equals("SQL.Method")) {
                    method = message.getName().toLowerCase();
                }
                if (type.equals("SQL.Database")) {
                    Database database = m_databaseParser.queryDatabaseName(message.getName());

                    if (database != null) {
                        ip = database.getIp();
                        databaseName = database.getName();
                    }
                }
            }
        }
        if (databaseName != null && ip != null) {
            String id = querySQLId(databaseName);
            StorageReport report = m_reportManager.getHourlyReport(getStartTime(), id, true);
            StorageUpdateParam param = new StorageUpdateParam();

            param.setId(id).setDomain(domain).setIp(ip).setMethod(method).setTransaction(t)
                  .setThreshold(LONG_SQL_THRESHOLD);// .setSqlName(sqlName).setSqlStatement(sqlStatement);
            m_updater.updateStorageReport(report, param);
        }
    }
}

     資料庫與快取的報表更新邏輯相同,不同ip地址的資料庫/快取的統計資訊在不同Machine裡面,同時也可能會有不同的Domain訪問同一個資料庫/快取,每個Domain的訪問都會被單獨統計,每個Domain對資料庫/快取不同的操作會統計在不同Operation裡,除了當前小時週期的統計彙總外,我們還會用Segment記錄每分鐘的彙總資料。訪問時間超過1秒的資料庫操作(快取是50ms) 會被認為是長時間訪問記錄。

快取操作

    接下來我們看下快取的案例,獲取memcached中key="uid_1234567"的值,Storage分析器會判斷Type是否以"Cache.memcached"開頭,如果是,則認為這是一個快取操作,(這裡程式碼我認為有些稍稍不合理,如果我用的是Redis快取,我希望上報的Type="Cache.Redis",所以我這裡講原始碼稍稍做了修改,判斷Type如果以"Cache."開頭,就認為是快取)。

@RunWith(JUnit4.class)
public class AppSimulator extends CatTestCase {
    @Test
    public void simulateHierarchyTransaction() throws Exception {
        ...
        Transaction cacheT = cat.newTransaction("Cache.memcached", "get:uid_1234567");
        
        //do your cache operation
        
        cat.logEvent("Cache.memcached.server", "192.168.20.67:6379");
        cacheT.complete();
        ...
    }
}

接下來我們看下Storage分析器的處理邏輯,processCacheTransaction負責分析訊息, 事務型別"Cache.memcached"的“Cache.”後面部分將會被提取作為快取型別,分析器會為每個型別的快取都建立一個報表,事務名稱":"前面部分會被提取作為操作名稱,一般快取有 add,get,hGet,mGet,remove等操作,快取地址將由type="Cache.memcached.server"的Event子訊息提供,最後我們還是將domain、ip、method、事務、閾值等訊息放入StorageUpdateParam交由StorageReportUpdater來更新報表,更新邏輯與資料庫一致。

public class StorageAnalyzer extends AbstractMessageAnalyzer<StorageReport> implements LogEnabled {
    @Inject
    private StorageReportUpdater m_updater;
    
    private void processCacheTransaction(MessageTree tree, Transaction t) {
        String cachePrefix = "Cache.";
        String ip = "Default";
        String domain = tree.getDomain();
        String cacheType = t.getType().substring(cachePrefix.length());
        String name = t.getName();
        String method = name.substring(name.lastIndexOf(":") + 1);
        List<Message> messages = t.getChildren();

        for (Message message : messages) {
            if (message instanceof Event) {
                String type = message.getType();

                if (type.equals("Cache.memcached.server")) {
                    ip = message.getName();
                    int index = ip.indexOf(":");

                    if (index > -1) {
                        ip = ip.substring(0, index);
                    }
                }
            }
        }
        String id = queryCacheId(cacheType);
        StorageReport report = m_reportManager.getHourlyReport(getStartTime(), id, true);
        StorageUpdateParam param = new StorageUpdateParam();

        param.setId(id).setDomain(domain).setIp(ip).setMethod(method).setTransaction(t)
              .setThreshold(LONG_CACHE_THRESHOLD);
        m_updater.updateStorageReport(report, param);
    }
}

StateAnalyzer

    主要是分析CAT伺服器自身的異常,他在週期任務執行中,不蒐集任何資料,而是在週期結束後,對CAT的整體狀況做一個彙總後生成報表,他的報表結構如下。

HeartbeatAnalyzer

    分析器HeartbeatAnalyzer用於上報的心跳資料的分析。我們先看看客戶端的收集邏輯,CAT客戶端在初始化CatClientModule的時候,會開啟一個StatusUpdateTask的執行緒任務,每隔一分鐘去收集客戶端的心跳狀態,通過 Heartbeat 訊息上報到客戶端,心跳資料以xml格式存在於Heartbeat訊息中。

public class CatClientModule extends AbstractModule {
    @Override
    protected void execute(final ModuleContext ctx) throws Exception {
        ...
        if (clientConfigManager.isCatEnabled()) {
            // start status update task
            StatusUpdateTask statusUpdateTask = ctx.lookup(StatusUpdateTask.class);

            Threads.forGroup("cat").start(statusUpdateTask);
            ...
        }
    }
}

    

    Cat客戶端會為Heartbeat訊息建立一個System型別事務訊息,然後將 Heartbeat 訊息放入該事務,資訊的收集靠StatusInfoCollector來完成,StatusInfoCollector將收集的資料寫入StatusInfo物件,然後StatusUpdateTask將StatusInfo轉化成xml之後放到Heartbeat訊息資料段上報。

public class StatusUpdateTask implements Task, Initializable {
    @Override
    public void run() {
        //建立類目錄, 上報CAT客戶端啟動資訊
        ...

        while (m_active) {
            long start = MilliSecondTimer.currentTimeMillis();

            if (m_manager.isCatEnabled()) {
                Transaction t = cat.newTransaction("System", "Status");
                Heartbeat h = cat.newHeartbeat("Heartbeat", m_ipAddress);
                StatusInfo status = new StatusInfo();

                t.addData("dumpLocked", m_manager.isDumpLocked());
                try {
                    StatusInfoCollector statusInfoCollector = new StatusInfoCollector(m_statistics, m_jars);

                    status.accept(statusInfoCollector.setDumpLocked(m_manager.isDumpLocked()));

                    buildExtensionData(status);
                    h.addData(status.toString());
                    h.setStatus(Message.SUCCESS);
                } catch (Throwable e) {
                    h.setStatus(e);
                    cat.logError(e);
                } finally {
                    h.complete();
                }
                t.setStatus(Message.SUCCESS);
                t.complete();
            }
            
            //sleep 等待下一次心跳上報
            ...
        }
    }
}

    我們上報的XML到底包含哪些資料,我們看下StatusInfo的結構,StatusInfo除了包含上報時間戳之外,還有哪些系統狀態資訊、附加擴充套件資訊(Extension)會被StatusInfoCollector收集?

1、執行時資料RuntimeInfo:JAVA版本 java.version、使用者名稱user.name、專案目錄user.dir、java類路徑等等。

2、作業系統資訊 OsInfo,同時建立System附加擴充套件資訊。

3、磁碟資訊DiskInfo,磁碟的總量、空閒與使用情況,同時建立Disk附加擴充套件資訊

4、記憶體使用情況MemoryInfo,同時建立垃圾回收擴充套件資訊、JAVA虛擬機器堆附加擴充套件資訊

5、執行緒資訊,以及 FrameworkThread 附加擴充套件資訊。

6、Cat使用狀態資訊

下面我列一個上報的XML資料案例:

<?xml version="1.0" encoding="utf-8"?>
<status timestamp="2018-05-28 16:23:08.625">
   <runtime start-time="1527495705011" up-time="114212" java-version="1.8.0_40" user-name="CAOHAO1">
      <user-dir>E:\cat\cat-client</user-dir>
      <java-classpath>idea_rt.jar,junit-rt.jar,charsets.jar ... </java-classpath>
   </runtime>
   <os name="Windows 7" arch="amd64" version="6.1" available-processors="4" system-load-average="-1.0" process-time="5881237700" total-physical-memory="8538804224" free-physical-memory="1369870336" committed-virtual-memory="277372928" total-swap-space="17075703808" free-swap-space="4815220736"/>
    
   <disk>
      <disk-volume id="C:\" total="77218181120" free="23651565568" usable="23651565568"/>
      <disk-volume id="D:\" total="461373435904" free="249596563456" usable="249596563456"/>
      ...
   </disk>
   
   <memory max="1897922560" total="128974848" free="118387592" heap-usage="10961920" non-heap-usage="17081736">
      <gc name="PS Scavenge" count="1" time="167"/>
      <gc name="PS MarkSweep" count="0" time="0"/>
   </memory>
   <thread count="11" daemon-count="10" peek-count="11" total-started-count="12" cat-thread-count="0" pigeon-thread-count="0" http-thread-count="0">
      <dump>1: "Attach Listener" Id=5 RUNNABLE ... </dump>
   </thread>
   <message produced="0" overflowed="0" bytes="0"/>
   <extension id="System">
      <extensionDetail id="LoadAverage" value="-1.0"/>
      <extensionDetail id="FreePhysicalMemory" value="1.369247744E9"/>
      <extensionDetail id="FreeSwapSpaceSize" value="4.814426112E9"/>
   </extension>
   <extension id="Disk">
      <extensionDetail id="C:\ Free" value="2.3651565568E10"/>
      <extensionDetail id="D:\ Free" value="2.49596563456E11"/>
      ...
   </extension>
   <extension id="GC">
      <extensionDetail id="PS ScavengeCount" value="1.0"/>
      <extensionDetail id="PS ScavengeTime" value="167.0"/>
      <extensionDetail id="PS MarkSweepCount" value="0.0"/>
      <extensionDetail id="PS MarkSweepTime" value="0.0"/>
   </extension>
   <extension id="JVMHeap">
      <extensionDetail id="Code Cache" value="3707200.0"/>
      <extensionDetail id="Metaspace" value="1.2053E7"/>
      <extensionDetail id="Compressed Class Space" value="1412600.0"/>
      <extensionDetail id="PS Eden Space" value="4805792.0"/>
      <extensionDetail id="PS Survivor Space" value="5214992.0"/>
      <extensionDetail id="PS Old Gen" value="941136.0"/>
   </extension>
   <extension id="FrameworkThread">
      <extensionDetail id="HttpThread" value="0.0"/>
      <extensionDetail id="CatThread" value="0.0"/>
      <extensionDetail id="PigeonThread" value="0.0"/>
      <extensionDetail id="ActiveThread" value="11.0"/>
      <extensionDetail id="StartedThread" value="12.0"/>
   </extension>
   <extension id="CatUsage">
      <extensionDetail id="Produced" value="2.0"/>
      <extensionDetail id="Overflowed" value="0.0"/>
      <extensionDetail id="Bytes" value="1038.0"/>
   </extension>
</status>

      我們再來看看服務端的分析邏輯,HeartbeatAnalyzer會為每個Domain建立一張心跳報表HeartbeatReport,不同IP的機器心跳資料存在於不同Machine物件裡,每分鐘的心跳資料都由一個Period物件儲存;

    HeartbeatAnalyzer首先將XML還原為StatusInfo,然後會用StatusInfo的RuntimeInfo、OsInfo、DiskInfo、MemoryInfo、ThreadInfo、MessageInfo的資訊以及Extensions的動態屬性m_dynamicAttributes去更新Period的m_extensions。

 DumpAnalyzer -- 原始訊息LogView儲存

    DumpAnalyzer 與其它分析器有點不同,它不是為了報表而設計,而是用於原始訊息LogView的儲存,與報表統計不一樣,他的資料量非常大,幾年前美團點評每天處理訊息就達到1000億左右,大小大約100TB單物理機高峰期每秒要處理100MB左右的流量,因為資料量比較大所以儲存整體要求就是批量壓縮以及隨機讀,採用佇列化、非同步化、執行緒池等技術來保證併發。

    當有客戶端訊息過來,DumpAnalyzer會呼叫本地訊息處理器管理類(LocalMessageBucketManager) 的 storeMessage 方法儲存訊息,LocalMessageBucketManager是LogView管理的核心類,我們先看一看 LocalMessageBucketManager 物件的初始化函式 initialize() 的處理邏輯:

1、首先獲取訊息儲存的基礎路徑(m_baseDir),預設是 /data/appdatas/cat/bucket/dump, 在 server.xml 中可以配置,訊息在基礎路徑之內,會根據domain、機器、時間等元素來分門別類的儲存。

2、開啟 BlockDumper 執行緒, 將本地訊息處理器(LocalMessageBucket)、阻塞佇列(BlockingQueue)以及統計資訊的指標傳入BlockDumper 物件,當記憶體訊息塊到達 64K 的時候, 該執行緒會非同步將記憶體訊息塊寫入資料檔案和索引檔案。

3、開啟LogviewUploader執行緒,將自己的指標、本地訊息處理器、HDFS上傳物件(HdfsUploader)以及配置管理器的指標傳入LogviewUploader物件,該用於非同步將檔案上傳到 HDFS, 前提是配置了 hdfs 上傳配置。

4、開啟20個訊息壓縮執行緒(本地模式僅2個執行緒),併為每個執行緒分配一個阻塞佇列,當DumpAnalyzer接收到訊息請求,會將訊息寫入該佇列,MessageGzip會輪訓從佇列取訊息處理,注意這裡雖然有20個佇列,然而正常我們只插入前19個佇列,只有在前面入隊失敗了,訊息將會被插入最後那個佇列,可以認為最後那個佇列是前面佇列的一個備用佇列。

public class LocalMessageBucketManager extends ContainerHolder implements MessageBucketManager, Initializable, LogEnabled {
    @Override
    public void initialize() throws InitializationException {
        m_baseDir = new File(m_configManager.getHdfsLocalBaseDir(ServerConfigManager.DUMP_DIR));

        Threads.forGroup("cat").start(new BlockDumper(m_buckets, m_messageBlocks, m_serverStateManager));
        Threads.forGroup("cat").start(new LogviewUploader(this, m_buckets, m_logviewUploader, m_configManager));

        if (m_configManager.isLocalMode()) {
            m_gzipThreads = 2;
        }

        for (int i = 0; i < m_gzipThreads; i++) {
            LinkedBlockingQueue<MessageItem> messageQueue = new LinkedBlockingQueue<MessageItem>(m_gzipMessageSize);

            m_messageQueues.put(i, messageQueue);
            Threads.forGroup("cat").start(new MessageGzip(messageQueue, i));
        }
        m_last = m_messageQueues.get(m_gzipThreads - 1);
    }
    
    @Override
    public void storeMessage(final MessageTree tree, final MessageId id) {
        boolean errorFlag = true;
        int hash = Math.abs((id.getDomain() + '-' + id.getIpAddress()).hashCode());
        int index = (int) (hash % m_gzipThreads);
        MessageItem item = new MessageItem(tree, id);
        LinkedBlockingQueue<MessageItem> queue = m_messageQueues.get(index % (m_gzipThreads - 1));
        boolean result = queue.offer(item);
        ...
    }
}

    當DumpAnalyzer接收到訊息請求,會呼叫storeMessage(...) 函式處理訊息,如上原始碼,函式會根據domain和客戶端ip將訊息均勻分配到那19個阻塞佇列(LinkedBlockingQueue)中,然後MessageGzip會輪詢從佇列獲取訊息資料,呼叫gzipMessage(item)函式處理,每處理 10000 條訊息,MessageGzip會上報一條Gzip壓縮執行緒監控記錄。

    我們再看看最核心的 gzipMessage(MessageItem item) 函式的處理邏輯,CAT根據日期,週期小時,domain,客戶端地址,服務端地址建立儲存路徑和檔案,包含資料檔案和索引檔案, 例如 20180611/15/Cat-127.0.01-127.0.01、20180611/15/Cat-127.0.01-127.0.01.idx ,從前面可以看出Message-ID的前3段可以確定唯一的索引檔案,每條訊息的儲存由本地訊息處理器(LocalMessageBucket)控制,LocalMessageBucket 的 storeMessage(...)方法會將訊息資訊寫入訊息塊(MessageBlock)物件存放在記憶體中,當MessageBlock資料塊大小達到 64K 時,將記憶體資料(MessageBlock) 放入阻塞佇列 (m_messageBlocks),非同步寫入檔案,並清空記憶體MessageBlock。LocalMessageBucket 有個欄位 m_blockSize 用於記錄訊息塊總大小,注意這裡的 64K 是壓縮前的訊息塊總大小。

public class MessageGzip implements Task {
    private void gzipMessage(MessageItem item) {
        MessageId id = item.getMessageId();
        String name = id.getDomain() + '-' + id.getIpAddress() + '-' + m_localIp;
        String path = m_pathBuilder.getLogviewPath(new Date(id.getTimestamp()), name);
        LocalMessageBucket bucket = m_buckets.get(path);

        if (bucket == null) {
            synchronized (m_buckets) {
                bucket = m_buckets.get(path);
                if (bucket == null) {
                    bucket = (LocalMessageBucket) lookup(MessageBucket.class, LocalMessageBucket.ID);
                    bucket.setBaseDir(m_baseDir);
                    bucket.initialize(path);

                    m_buckets.put(path, bucket);
                }
            }
        }

        DefaultMessageTree tree = (DefaultMessageTree) item.getTree();
        ByteBuf buf = tree.getBuffer();
        MessageBlock block = bucket.storeMessage(buf, id);

        if (block != null) {
            if (!m_messageBlocks.offer(block)) {
                m_serverStateManager.addBlockLoss(1);
            }
        }
    }
}

    從上程式碼可以看出,當 storeMessage(...) 返回不為空的訊息塊(MessageBlock)時,則認為記憶體資料已經達到64K,需要寫入檔案,MessageGzip將訊息塊推入阻塞佇列m_messageBlocks, BlockDumper執行緒會對佇列進行消費, 它在例項化的時候會建立一個執行執行緒池 m_executors,然後 BlockDumper 執行緒輪詢從阻塞佇列取訊息塊(MessageBlock),為每個訊息塊建立一個塊寫入任務(FlushBlockTask),並將任務提交給執行執行緒池執行。FlushBlockTask實際會呼叫BlockDumper的flushBlock(block)函式將MessageBlock寫入檔案。

    最終寫入操作,還是得由LocalMessageBucket的MessageBlockWriter來完成,接下來我們介紹下本地訊息處理器(LocalMessageBucket),它是一個控制訊息資料讀寫的物件,資料在記憶體中的載體是訊息塊(MessageBlock),LocalMessageBucket 在gzipMessage(...)被首次例項化、初始化,初始化過程中會建立一個訊息塊(MessageBlock)、訊息塊讀處理物件(MessageBlockReader)、訊息塊寫處理物件(MessageBlockWriter)、、緩衝區以及緩衝區壓縮流。

     MessageBlock 包含4個資訊:檔案路徑、資料緩衝區、每條ID的序列號、每條訊息資料的大小(壓縮前)。    

     訊息塊讀處理物件負責訊息的讀取操作。  

     訊息塊寫處理物件則負責資料檔案、索引檔案的寫入操作,他會維護一個檔案遊標偏移量,記錄壓縮訊息塊(MessageBlock)在資料檔案中的起始位置,即圖2中塊地址,下面是具體的寫邏輯,先寫索引檔案,CAT先獲取訊息塊中訊息總條數,為每個Message-ID都寫一個索引記錄,每條訊息的索引記錄長度都是48bits,索引根據Message-ID的第四段(序列號)來確定索引的位置,比如訊息Message-ID為ShopWeb-0a010680-375030-2,這條訊息ID對應的索引位置為2*48bits的位置,48bits索引包含32bits的塊地址 和 16bits 的塊內偏移地址,前者記錄壓縮訊息塊(MessageBlock)在資料檔案中的偏移位置,由於訊息塊包含多條訊息,我們需要16bits來記錄訊息在訊息塊中的位置,注意這裡指解壓後的訊息塊。寫完索引檔案再寫入資料檔案,每一段壓縮資料,前4位都是壓縮塊的大小,後面才是訊息塊的實際資料。

public class MessageBlockWriter {
    private RandomAccessFile m_indexFile;
    private RandomAccessFile m_dataFile;
    private int m_blockAddress;
    
    public synchronized void writeBlock(MessageBlock block) throws IOException {
        int len = block.getBlockSize();
        byte[] data = block.getData();
        int blockSize = 0;

        for (int i = 0; i < len; i++) {
            int seq = block.getIndex(i);
            int size = block.getSize(i);

            m_indexFile.seek(seq * 6L);
            m_indexFile.writeInt(m_blockAddress);
            m_indexFile.writeShort(blockSize);
            blockSize += size;
        }

        m_dataFile.writeInt(data.length);
        m_dataFile.write(data);
        m_blockAddress += data.length + 4;
    }
}

    CAT讀取訊息的時候,首先根據Message-ID的前面三段確定唯一的索引檔案,在根據Message-ID第四段確定此Message-ID索引位置,根據索引檔案的48bits讀取資料檔案的內容,然後將資料檔案進行GZIP解壓,在根據塊內偏移地址讀取出真正的訊息內容。

    一定得注意的是,同一臺客戶端機器產生的Message-ID的第四段,即當前小時的順序遞增號,在當前小時內一定不能重複,因為在服務端,CAT會為每個客戶端IP、每個小時的原始訊息儲存都建立一個索引檔案,每條訊息的索引記錄在索引檔案內的偏移位置是由順序遞增號決定的,一旦順序號重複生成,那麼該小時的重複索引資料將會被覆蓋,導致我們無法通過索引找到原始訊息資料。

上傳HDFS

自定義分析器與報表