1. 程式人生 > >CAT跨語言服務加拿大28平臺搭建鏈監控(七)消息分析器與報表

CAT跨語言服務加拿大28平臺搭建鏈監控(七)消息分析器與報表

active 進行 del 哪些 name 取出 服務監控 想要 微服務

CrossAnalyzer-調用鏈加拿大28平臺搭建論壇:haozbbs.com Q1446595067分析

在分布式環境中,應用是運行在獨立的進程中的,有可能是不同的機器,或者不同的服務器進程。那麽他們如果想要彼此聯系在一起,形成一個調用鏈,在Cat中,CrossAnalyzer會統計不同服務之間調用的情況,包括服務的訪問量,錯誤量,響應時間,QPS等,這裏的服務主要指的是 RPC 服務,在微服務監控中,這是核心。

在講 CrossAnalyzer 的處理邏輯之前,我們先看下客戶端的埋點的一個模擬情況。

一般情況下不同服務會通過幾個ID進行串聯。這種串聯的模式,基本上都是一樣的。在Cat中,我們需要3個ID:

RootId,用於標識唯一的一個調用鏈
ParentId,父Id是誰?誰在調用我
ChildId,我在調用誰?
那麽我們如何傳遞這些ID?Cat為我們提供了一個內部接口 Cat.Context,但是我們需要自己實現Context,在下面代碼中我們首先在before函數中實現了Context 上下文,然後在rpcClient中開啟消息事務,並調用 Cat.logRemoteCallClient(context) 去填充Context的這3個MessageID。當然,該函數還記錄了一個RemoteCall類型的Event消息。

 隨後我們用rpcService函數中開啟新線程模擬遠程RPC服務,並將context上傳到 RPC 服務器,在真實環境中,Context是需要跨進程網絡傳輸,因此需要實現序列化接口。

在rpcService中,我們會調用 Cat.logRemoteCallServer(context) 將從rpcClient傳過來的Context設置到自己的 Transaction 當中。

隨著業務處理邏輯的結束, rpcServer 和 rpcClient 都會分別將自己的消息樹上傳到CAT服務器分析。

需要註意的是,Service的client和app需要和Call的server以及app對應上,要不然圖表是分析不出東西的!

@RunWith(JUnit4.class)
public class AppSimulator extends CatTestCase {
public Map<String, String> maps = new HashMap<String, String>();

public Cat.Context context;

@Before
public void before() {
    context = new Cat.Context() {
        @Override
        public void addProperty(String key, String value) { maps.put(key, value); }

        @Override
        public String getProperty(String key) { return maps.get(key); }
    };
}

@Test
public void simulateHierarchyTransaction() throws Exception {
        ...
        //RPC調用開始
        rpcClient();
        rpcClient2();
        ...
}

protected void rpcClient() {
    //客戶端埋點,Domain為RpcClient,調用服務端提供的Echo服務
    Transaction parent = Cat.newTransaction("Call", "CallServiceEcho");
    Cat.getManager().getThreadLocalMessageTree().setDomain("RpcClient");

    Cat.logEvent("Call.server","localhost");
    Cat.logEvent("Call.app","RpcService");
    Cat.logEvent("Call.port","8888");
    Cat.logRemoteCallClient(context, "RpcClient");

    //開啟新線程模擬遠程RPC服務,將context上傳到 RPC 服務器
    rpcService(context);

    parent.complete();
}

protected void rpcClient2() {
    ...
    //模擬另外一個RpcClient調用Echo服務
    rpcService(context, "RpcClient2");
    ...
}

protected void rpcService(final Cat.Context context, final String clientDomain) {
    Thread thread = new Thread() {
        @Override
        public void run() {
            //服務器埋點,Domain為 RpcService 提供Echo服務
            Transaction child = Cat.newTransaction("Service", "Echo");
            Cat.getManager().getThreadLocalMessageTree().setDomain("RpcService");

            Cat.logEvent("Service.client", localhost); //填客戶端地址
            Cat.logEvent("Service.app", clientDomain);
            Cat.logRemoteCallServer(context);

            //to do your business

            child.complete();
        }
    };

    thread.start();

    try {
        thread.join();
    } catch (InterruptedException e) {
    }
}

}

接下來我們看看CAT服務器端CrossAnalyzer的邏輯。

    我們依然會為每個周期時間內的每個Domain創建一張報表(Cro***eport),然後不同的IP會分配不同的Local對象統計,每個IP又可能會接收來自不同Remote端的調用。

由於這裏一個完整的調用鏈會涉及多個端的多個消息樹,我們首先會根據Transaction的類型來判斷是RpcService還是RpcClient,如果Type等於PigeonService或Service則該消息來自RpcService,如果Type等於 PigeonCall或Call則來自RpcClient。

先來看看RpcService端消息樹的上報處理邏輯,CAT會調用 parsePigeonServerTransaction 函數去填充 CrossInfo 信息,CrossInfo包含的具體內容如下:

 localAddress : RpcService的IP地址

 remoteAddress : 服務調用者(RpcClient)的IP地址,由type="Service.client" 的Event子消息提供,註意,在處理RpcClient的上報時,我們會根據上報信息中的remoteAddress再次統計該RpcService數據,大家可能會疑惑這裏是不是重復統計,事實上他們所處的視角是不一樣的,前者是站在服務提供者的視角來統計我完成這次服務所耗費的時間、資源等,而後者則是站在RpcClient視角去統計自己從發出請求到得到結果所需的時長、資源等等,比如這中間就包含網絡IO的消耗,這些在後續的報表中會有體現。

   app:客戶端的Domain, 由type="Service.app"的Event子消息提供。

   remoteRole:固定為 Pigeon.Client , 表示遠端角色為 Rpc 客戶端。

   detailType: 固定為 PigeonService , 表示自己角色為 Rpc 服務端。

    最後,我們將用CrossInfo信息來更新報表(Cro***eport),我們首先根據 localAddress 即 RpcService的 IP 找到或創建 Local對象,然後根據 remoteAddress+remoteRole 找到或創建 Remote 對象,然後統計服務的訪問量,錯誤量,處理時間,QPS。

   RpcService提供不只一個服務,不同的服務我們按名字分別統計在不同的Name對象裏,比如上面案例,RpcService提供的是Echo服務。

我們再來看看RpcClient端上報處理邏輯,CAT調用parsePigeonClientTransaction函數填充CrossInfo信息,具體如下:

localAddress : RpcClient的IP地址

remoteAddress :服務提供者(RpcService)的地址,由 type="Call.server" 的Event子消息提供。

app:服務提供者的Domain,由type="Call.app" 的Event子消息提供,在統計完RpcClient端數據之後,會通過該屬性獲取服務提供者的CrossInfo。從RpcClient的視角再次統計RpcService的數據。

port:客戶端端口,由 type="Call.port" 的Event子消息提供。

remoteRole:固定為 Pigeon.Server, 表示遠端角色為服務提供者。

detailType: 固定為 PigeonCall , 表示自己角色為服務調用者。

然後,我們將用CrossInfo信息來更新報表(Cro***eport),也是根據 localAddress 找到Local對象,然後根據 remoteAddress+remoteRole 找到 Remote 對象,進行統計。

接著,我們通過convertCrossInfo函數利用RpcClient的CrossInfo信息去生成服務提供者的CrossInfo信息,這裏實際上是為了從RpcClient的視角去統計服務提供者的報表!

public class CrossAnalyzer extends AbstractMessageAnalyzer<Cro***eport> implements LogEnabled {
private void processTransaction(Cro***eport report, MessageTree tree, Transaction t) {
CrossInfo crossInfo = parseCorssTransaction(t, tree);

    if (crossInfo != null && crossInfo.validate()) {
        updateCro***eport(report, t, crossInfo);

        String targetDomain = crossInfo.getApp();

        if (m_serverConfigManager.isRpcClient(t.getType()) && !DEFAULT.equals(targetDomain)) {
            CrossInfo serverCrossInfo = convertCrossInfo(tree.getDomain(), crossInfo);

            if (serverCrossInfo != null) {
                Cro***eport serverReport = m_reportManager.getHourlyReport(getStartTime(), targetDomain, true);

                updateCro***eport(serverReport, t, serverCrossInfo);
            }
        } else {
            m_errorAppName++;
        }
    }
    ...
}

}
這裏的 serverCrossInfo 被填充了什麽數據:

localAddress : RpcClient 的 remoteAddress。

remoteAddress :RpcClient 的 localAddress + clientPort

app:RpcClient 的 Domain。

remoteRole:固定為 Pigeon.Caller, 表示遠端角色為服務調用者。

detailType: 固定為 PigeonCall

 最後還是用CrossInfo信息來更新報表(Cro***eport)。

最後我們看看我們生成了哪些報表數據,3個報表數據,分別是服務調用方 RpcClient和 RpcClient2,以及服務提供方RpcService。

接下來我們看看服務提供方的remotes數據信息,一共3條數據,第1條記錄是站在RpcService角度統計服務器完成這2次服務所耗費的時間、資源等,後面2條記錄則是站在RpcClient視角去統計自己從發出請求到得到結果所需的時長、資源等等。

第1條記錄 duration 為 0.154ms, 第2,3條記錄 duration 分別為 1072.62ms、1506.38ms, 兩者巨大的時間差一般就是網絡 IO 所需的時間,事實上大多數的服務時間的消耗都是在各種IO上。這類服務統稱為IO密集型。

StorageAnalyzer --數據庫/緩存分析

StorageAnalyzer主要分析一段時間內數據庫、Cache訪問情況:各種操作訪問次數、響應時間、錯誤次數、長時間訪問量等等,當客戶端消息過來,StorageAnalyzer首先會分析事務屬於數據庫操作還是緩存操作,然後進行不同的處理,消息類型如果是SQL則是數據庫操作,如果以Cache.memcached開頭則認為是緩存操作。

我們首先看看數據庫操作的分析過程,下面源碼是客戶端的案例,這是一個獲取cat庫config表全部數據的sql查詢,我們將數據庫操作所有信息都放在一個type="SQL" 的子事務消息中。

@RunWith(JUnit4.class)
public class AppSimulator extends CatTestCase {@Test
br/>@Test
...
Transaction sqlT = cat.newTransaction("SQL", "Select");

    //do your SQL query

    cat.logEvent("SQL.Database", "jdbc:mysql://192.168.20.67:3306/cat");
    cat.logEvent("SQL.Method", "select");
    cat.logEvent("SQL.Statement", "SELECT", SUCCESS, "select * from cat.config");
    sqlT.complete();
    ...
}

}
上面消息上報到服務端之後,分析器將SQL類型子事務取出,調用processSQLTransaction去處理,將結果寫入報表StorageReport

processSQLTransaction 首先通過DatabaseParser提取數據庫的IP和數據庫名稱,該信息由type="SQL.Database"的Event子消息提供,該Event消息上報的是數據庫連接的URL。

接著我們會獲取數據庫操作名,type="SQL.Method" 的Event子消息提供,數據庫操作分4類,分別是select, update, delete, insert,如果不上報,分析器默認客戶端在做select查詢。

最後我們會為周期內的每個數據庫創建一個Storage報表。並將提取信息放入StorageUpdateParam對象,然後將對象交給StorageReportUpdater來更新Storage報表。

public class StorageAnalyzer extends AbstractMessageAnalyzer<StorageReport> implements LogEnabled {@Inject
br/>@Inject

@Inject
private StorageReportUpdater m_updater;

private void processSQLTransaction(MessageTree tree, Transaction t) {
    String databaseName = null;
    String method = "select";
    String ip = null;
    String domain = tree.getDomain();
    List<Message> messages = t.getChildren();

    for (Message message : messages) {
        if (message instanceof Event) {
            String type = message.getType();

            if (type.equals("SQL.Method")) {
                method = message.getName().toLowerCase();
            }
            if (type.equals("SQL.Database")) {
                Database database = m_databaseParser.queryDatabaseName(message.getName());

                if (database != null) {
                    ip = database.getIp();
                    databaseName = database.getName();
                }
            }
        }
    }
    if (databaseName != null && ip != null) {
        String id = querySQLId(databaseName);
        StorageReport report = m_reportManager.getHourlyReport(getStartTime(), id, true);
        StorageUpdateParam param = new StorageUpdateParam();

        param.setId(id).setDomain(domain).setIp(ip).setMethod(method).setTransaction(t)
              .setThreshold(LONG_SQL_THRESHOLD);// .setSqlName(sqlName).setSqlStatement(sqlStatement);
        m_updater.updateStorageReport(report, param);
    }
}

}
數據庫與緩存的報表更新邏輯相同,不同ip地址的數據庫/緩存的統計信息在不同Machine裏面,同時也可能會有不同的Domain訪問同一個數據庫/緩存,每個Domain的訪問都會被單獨統計,每個Domain對數據庫/緩存不同的操作會統計在不同Operation裏,除了當前小時周期的統計匯總外,我們還會用Segment記錄每分鐘的匯總數據。訪問時間超過1秒的數據庫操作(緩存是50ms) 會被認為是長時間訪問記錄。

緩存操作

接下來我們看下緩存的案例,獲取memcached中key="uid_1234567"的值,Storage分析器會判斷Type是否以"Cache.memcached"開頭,如果是,則認為這是一個緩存操作,(這裏代碼我認為有些稍稍不合理,如果我用的是Redis緩存,我希望上報的Type="Cache.Redis",所以我這裏講源碼稍稍做了修改,判斷Type如果以"Cache."開頭,就認為是緩存)。

@RunWith(JUnit4.class)
public class AppSimulator extends CatTestCase {@Test
br/>@Test
...
Transaction cacheT = cat.newTransaction("Cache.memcached", "get:uid_1234567");

    //do your cache operation

    cat.logEvent("Cache.memcached.server", "192.168.20.67:6379");
    cacheT.complete();
    ...
}

}
接下來我們看下Storage分析器的處理邏輯,processCacheTransaction負責分析消息, 事務類型"Cache.memcached"的“Cache.”後面部分將會被提取作為緩存類型,分析器會為每個類型的緩存都創建一個報表,事務名稱":"前面部分會被提取作為操作名稱,一般緩存有 add,get,hGet,mGet,remove等操作,緩存地址將由type="Cache.memcached.server"的Event子消息提供,最後我們還是將domain、ip、method、事務、閾值等消息放入StorageUpdateParam交由StorageReportUpdater來更新報表,更新邏輯與數據庫一致。
public class StorageAnalyzer extends AbstractMessageAnalyzer<StorageReport> implements LogEnabled {@Inject
br/>@Inject

private void processCacheTransaction(MessageTree tree, Transaction t) {
    String cachePrefix = "Cache.";
    String ip = "Default";
    String domain = tree.getDomain();
    String cacheType = t.getType().substring(cachePrefix.length());
    String name = t.getName();
    String method = name.substring(name.lastIndexOf(":") + 1);
    List<Message> messages = t.getChildren();

    for (Message message : messages) {
        if (message instanceof Event) {
            String type = message.getType();

            if (type.equals("Cache.memcached.server")) {
                ip = message.getName();
                int index = ip.indexOf(":");

                if (index > -1) {
                    ip = ip.substring(0, index);
                }
            }
        }
    }
    String id = queryCacheId(cacheType);
    StorageReport report = m_reportManager.getHourlyReport(getStartTime(), id, true);
    StorageUpdateParam param = new StorageUpdateParam();

    param.setId(id).setDomain(domain).setIp(ip).setMethod(method).setTransaction(t)
          .setThreshold(LONG_CACHE_THRESHOLD);
    m_updater.updateStorageReport(report, param);
}

}
StateAnalyzer

主要是分析CAT服務器自身的異常,他在周期任務運行中,不搜集任何數據,而是在周期結束後,對CAT的整體狀況做一個匯總後生成報表,他的報表結構如下。

HeartbeatAnalyzer

分析器HeartbeatAnalyzer用於上報的心跳數據的分析。我們先看看客戶端的收集邏輯,CAT客戶端在初始化CatClientModule的時候,會開啟一個StatusUpdateTask的線程任務,每隔一分鐘去收集客戶端的心跳狀態,通過 Heartbeat 消息上報到客戶端,心跳數據以xml格式存在於Heartbeat消息中。

public class CatClientModule extends AbstractModule {@Override
br/>@Override
...
if (clientConfigManager.isCatEnabled()) {
// start status update task
StatusUpdateTask statusUpdateTask = ctx.lookup(StatusUpdateTask.class);

        Threads.forGroup("cat").start(statusUpdateTask);
        ...
    }
}

}

Cat客戶端會為Heartbeat消息創建一個System類型事務消息,然後將 Heartbeat 消息放入該事務,信息的收集靠StatusInfoCollector來完成,StatusInfoCollector將收集的數據寫入StatusInfo對象,然後StatusUpdateTask將StatusInfo轉化成xml之後放到Heartbeat消息數據段上報。

public class StatusUpdateTask implements Task, Initializable {@Override
br/>@Override
//創建類目錄, 上報CAT客戶端啟動信息
...

    while (m_active) {
        long start = MilliSecondTimer.currentTimeMillis();

        if (m_manager.isCatEnabled()) {
            Transaction t = cat.newTransaction("System", "Status");
            Heartbeat h = cat.newHeartbeat("Heartbeat", m_ipAddress);
            StatusInfo status = new StatusInfo();

            t.addData("dumpLocked", m_manager.isDumpLocked());
            try {
                StatusInfoCollector statusInfoCollector = new StatusInfoCollector(m_statistics, m_jars);

                status.accept(statusInfoCollector.setDumpLocked(m_manager.isDumpLocked()));

                buildExtensionData(status);
                h.addData(status.toString());
                h.setStatus(Message.SUCCESS);
            } catch (Throwable e) {
                h.setStatus(e);
                cat.logError(e);
            } finally {
                h.complete();
            }
            t.setStatus(Message.SUCCESS);
            t.complete();
        }

        //sleep 等待下一次心跳上報
        ...
    }
}

}
我們上報的XML到底包含哪些數據,我們看下StatusInfo的結構,StatusInfo除了包含上報時間戳之外,還有哪些系統狀態信息、附加擴展信息(Extension)會被StatusInfoCollector收集?

1、運行時數據RuntimeInfo:JAVA版本 java.version、用戶名user.name、項目目錄user.dir、java類路徑等等。

2、操作系統信息 OsInfo,同時創建System附加擴展信息。

3、磁盤信息DiskInfo,磁盤的總量、空閑與使用情況,同時創建Disk附加擴展信息

4、內存使用情況MemoryInfo,同時創建垃圾回收擴展信息、JAVA虛擬機堆附加擴展信息

CAT跨語言服務加拿大28平臺搭建鏈監控(七)消息分析器與報表