1. 程式人生 > >深入詳解美團點評CAT跨語言服務監控(八)報表持久化

深入詳解美團點評CAT跨語言服務監控(八)報表持久化

週期結束

    我們從訊息分發章節知道,RealtimeConsumer在初始化的時候,會啟動一個執行緒,每隔1秒鐘就去從判斷是否需要開啟或結束一個週期(Period),如下原始碼,如果 value < 0 的時候,就會啟動一個週期結束執行緒,執行緒會呼叫endPeriod函式,找到需要結束的週期,完成周期的結束以及清理工作,並將週期物件從PeriodManager中移除。

public class PeriodManager implements Task {
    private List<Period> m_periods = new ArrayList<Period>();
    
    @Override
    public void run() {
        while (m_active) {
            try {
                long now = System.currentTimeMillis();
                long value = m_strategy.next(now);

                if (value > 0) {
                    startPeriod(value);
                } else if (value < 0) {
                    // last period is over,make it asynchronous
                    Threads.forGroup("cat").start(new EndTaskThread(-value));
                }
            }
            ...
        }
    }
    
    private void endPeriod(long startTime) {
        int len = m_periods.size();
        for (int i = 0; i < len; i++) {
            Period period = m_periods.get(i);
            
            if (period.isIn(startTime)) {
                period.finish();
                m_periods.remove(i);
                break;
            }
        }
    }
    
    private class EndTaskThread implements Task {
        public void run() { endPeriod(m_startTime); }
    }
}

    我們知道,週期是由許多的週期任務(PeriodTask)構成,所以事實上,一個週期的結束,就是週期內所有周期任務的結束,每個週期任務對應著一個任務佇列和一個訊息分析器(MessageAnalyzer),歸根結底是對MessageAnalyzer的結束。

public class PeriodTask implements Task, LogEnabled {
    private MessageAnalyzer m_analyzer;
    
    public void finish() {
        try {
            m_analyzer.doCheckpoint(true);
            m_analyzer.destroy();
        } catch (Exception e) {
            Cat.logError(e);
        }
    }
}

        doCheckpoint 我們似曾相識,在CatHomeModule初始化的最後,我們會向虛擬機器註冊shutdownhook,保證在虛擬機器關閉時,未被正常結束的週期會被RealtimeConsumer結束,RealtimeConsumer.doCheckpoint與上面正常結束週期所做的工作是一樣的,都是呼叫分析器的doCheckpoint方法,唯一的區別是,分析器doCheckpoint函式的傳入的atEnd引數不同,表示週期是否在到期後正常結束的。

Runtime.getRuntime().addShutdownHook(new Thread() {
    @Override
    public void run() {
        consumer.doCheckpoint();
    }
});

分析器的結束 -- 報表持久化

    分析器的結束實際上就是報表的持久化的一個過程,分析器處理訊息的過程中,我們一共形成了9個報表,圖1展示了這9個報表的結構:

    我們知道每個週期的訊息分析器(MessageAnalyzer)的結束都是在doCheckpoint來實現的,實際執行中一共有10種訊息分析器參與訊息分析工作,那麼不同類別的分析器,他的結束邏輯都是一樣的嗎?

    除了幾個特殊的分析器(如Metric、Dump)之外,其它訊息分析器結束邏輯都同下面原始碼,呼叫storeHourlyReports方法儲存報表,所有報表都會被存到檔案,atEnd 和 localMode 引數決定我們是否將報表存到資料庫。

    其中State報表有點特殊,因為State是對CAT本身的監控,在週期任務(PeriodTask)執行過程中並沒有收集任何資料,而是在doCheckpoint的時候對CAT訊息監控情況彙總生成的一個報表,所以呼叫storeHourlyReports之前,他需要首先收集State報表資料。

public class XxxAnalyzer extends AbstractMessageAnalyzer<XxxReport> implements LogEnabled {
    private ReportManager<XxxReport> m_reportManager;

    public synchronized void doCheckpoint(boolean atEnd) {
        if (atEnd && !isLocalMode()) {
            m_reportManager.storeHourlyReports(getStartTime(), StoragePolicy.FILE_AND_DB, m_index);
        } else {
            m_reportManager.storeHourlyReports(getStartTime(), StoragePolicy.FILE, m_index);
        }
    }
}

    我們先詳細剖析storeHourlyReports 的過程,然後再看看幾個特殊的分析器的結束邏輯。storeHourlyReports 首先將該分析器生成的所有報表都取出,然後我們會校驗報表的domain名稱是否合法,不合法的報表將被移除,在序列化之前,我們會呼叫ReportDelegate.beforeSave(...)方法做一些預處理的工作。不同種類的報表,預處理所做的工作是不同的,後續我們分別講解,做完預處理的工作之後,我們就正式持久化了,支援檔案和資料庫兩種持久化方式,我們會根據傳入的序列化策略(StoragePolicy) 來選擇需要進行哪種序列化,一般來說,如果是正常的週期結束,資料會持久化到檔案和資料庫,如果是JVM Shutdown導致的結束,只持久化到檔案,兩種持久化的細節後續我們也會分別詳細講解。

public class DefaultReportManager<T> extends ContainerHolder implements ReportManager<T>, Initializable, LogEnabled {
    @Override
    public void storeHourlyReports(long startTime, StoragePolicy policy, int index) {
        Map<String, T> reports = m_reports.get(startTime);
        ReportBucket bucket = null;

        try {
            if (reports != null) {
                //校驗、移除不合法Domain名字的報表
                ...

                m_reportDelegate.beforeSave(reports);

                if (policy.forFile()) {
                    bucket = m_bucketManager.getReportBucket(startTime, m_name, index);

                    try {
                        storeFile(reports, bucket);
                    } finally {
                        m_bucketManager.closeBucket(bucket);
                    }
                }

                if (policy.forDatabase()) {
                    storeDatabase(startTime, reports);
                }
            }
        } catch (Throwable e) {
            //報告異常
            ...
        } finally {
            cleanup(startTime);
            t.complete();

            if (bucket != null) {
                m_bucketManager.closeBucket(bucket);
            }
        }
    }
}

    報表預處理  

    在繼續講解序列化之前,我們來說一說報表的預處理工作(beforeSave),各報表的預處理邏輯,有相同、也有異同的地方,Top、State報表預處理不做任何事情,其它報表都有處理邏輯,以Transaction報表的預處理工作為例,分為兩個部分。

      第一部分是將所有Transaction報表的domain都收集起來,寫入報表的成員變數 m_domainNames,這樣每個報表都會知道一共都有哪些domain參與監控, 從圖1的報表結構來看,Heartbeat、Event、Problem、Cross報表也包含m_domainNames欄位,事實上這些報表的預處理也確實會收集所有domain。其中Heartbeat和Cross的預處理僅僅包含這部分邏輯。

      第二部分是聚合報表,所謂聚合,就是建立一個命名為ALL的聚合報表,將同一個Domain下不同IP地址的資料彙總起來,寫到報表ALL的同一個Machine物件內,Machine的ip不再是地址,而是Domain名,所有Domain資料都彙總到ALL報表的不同Machine下。因為現在服務端幾乎都是採用叢集,有可能10幾臺機器上執行著同一個專案,這時我們可以通過聚合報表去站在專案角度去看待統計結果,報表的聚合大量採用了訪問者模式。

    也不是所有型別的事務都會參與聚合,配置 all-report-config 會指定哪些事務會參與聚合,如下預設type="URL"的事務,這是因為通常URL是代表一個專案的介面對外服務的最完整鏈路耗時,從以下配置可以看到,除了Transaction訊息之外,Event訊息也會參與聚合,邏輯與Transaction大同小異,在此不再贅述。

<all-config>
    <report id="transaction">
        <type id="URL">
            <name id="*"></name>
        </type>
    </report>
    <report id="event">
        <type id="URL">
            <name id="*"></name>
        </type>
        <type id="SQL">
            <name id="*"></name>
        </type>
    </report>
</all-config>

    Problem有一個獨有的預處理流程,就是通過ProblemReportFilter物件將長時URL訪問(long-url)的記錄總數控制在100條之內,防止長時訪問數量過多,導致報表資料過大。

   Storage報表的預處理只有一個 updateStorageIds 呼叫,他的功能和Transaction預處理第一部分類似,也是讓每個Storage報表都知道目前有哪些資料庫/快取在被訪問、監控,我們知道Storage報表的ID是由 資料庫/快取名 + 型別(SQL/Cache) 組成,updateStorageIds會將所有資料庫/快取名收集,然後寫入StorageReport的成員變數m_ids。

    下面將所有報表持久化的預處理做了一個彙總,放在一個函式內,呈現在以下虛擬碼中:

public class XxxDelegate implements ReportDelegate<XxxReport> {
    @Override  
    public void beforeSave(Map<String, XxxReport> reports) {
        //Top、State不幹任何事直接返回
        return;
        
        //storage 僅有下面 updateStorageIds 步驟,完成後返回。
        for (StorageReport report : reports.values()) {  
            m_reportUpdater.updateStorageIds(report.getId(), reports.keySet(), report);  
        }
        
        //Problem、Transaction、Event、Heartbeat、Cross都有的步驟
        for (XxxReport report : reports.values()) {
            Set<String> domainNames = report.getDomainNames();  
  
            domainNames.clear();  
            domainNames.addAll(reports.keySet());
        }
        
        //報表聚合,Transaction、Event 獨有
        if (reports.size() > 0) {  
            TransactionReport all = createAggregatedReport(reports);  
  
            reports.put(all.getDomain(), all);  
        }
        
        //Problem獨有,控制long-url訊息量。
        ProblemReportFilter problemReportURLFilter = new ProblemReportFilter();  
  
        for (Entry<String, ProblemReport> entry : reports.entrySet()) {  
            ProblemReport report = entry.getValue();  
  
            problemReportURLFilter.visitProblemReport(report);  
        }
    }
}

報表的檔案儲存 -- 重入鎖

    在做完預處理之後,所有報表都將被持久化到檔案,在DefaultReportManager呼叫storeFile儲存檔案之前,我們先呼叫 m_bucketManager.getReportBucket(...) 來建立並初始化ReportBucket,檔案的讀寫相關操作都封裝於ReportBucket裡面,檔案的讀寫同步採用重入鎖(ReentrantLock)保證讀寫安全。

public class LocalReportBucket implements ReportBucket, LogEnabled {
    @Override
    public void initialize(String name, Date timestamp, int index) throws IOException {
        m_baseDir = m_configManager.getHdfsLocalBaseDir("report");
        m_writeLock = new ReentrantLock();
        m_readLock = new ReentrantLock();

        String logicalPath = m_pathBuilder.getReportPath(name, timestamp, index);

        File dataFile = new File(m_baseDir, logicalPath);
        File indexFile = new File(m_baseDir, logicalPath + ".idx");

        if (indexFile.exists()) {
            loadIndexes(indexFile);
        }

        final File dir = dataFile.getParentFile();

        if (!dir.exists() && !dir.mkdirs()) {
            throw new IOException(String.format("Fail to create directory(%s)!", dir));
        }

        m_logicalPath = logicalPath;
        m_writeDataFile = new BufferedOutputStream(new FileOutputStream(dataFile, true), 8192);
        m_writeIndexFile = new BufferedOutputStream(new FileOutputStream(indexFile, true), 8192);
        m_writeDataFileLength = dataFile.length();
        m_readDataFile = new RandomAccessFile(dataFile, "r");
    }
}

      報表儲存基礎路徑(m_baseDir)在配置 server.xml 中指定,每個分析器例項都會最終生成若干個報表,我們會為這個分析器產生的這些報表生成一個數據檔案和報表索引檔案,存於邏輯路徑(logicalPath )下,邏輯路徑以日期/小時/index來劃分,例如:20180604/15/1/report-cross , 20180604為日期, 15為下午3點的週期,1是分析器例項index,之前說過有些分析器處理過程複雜,可能會有多個例項,例如Cross、Event、Problem、Transaction報表, 資料檔名取 m_baseDir + logicalPath, 索引檔案是在資料檔名加上 ".idx" 字尾,如下:

    資料檔案儲存該分析器下所有轉化為xml格式的報表資料,索引檔案是對資料檔案內報表的一個位置索引,比如 report-problem.idx索引檔案內容如下,每一行都記錄一個報表名稱和報表在資料檔案的起始位置。

RpcService	0
Cat	        1388
RpcClient2	4600
RpcClient	5807

    現在再來看看storeFile的邏輯就非常簡單了, 獲取domain,將報表物件轉化為xml資料,最後呼叫storeById將xml寫入資料檔案和索引檔案。

public class DefaultReportManager<T> extends ContainerHolder implements ReportManager<T>, Initializable, LogEnabled {
    private void storeFile(Map<String, T> reports, ReportBucket bucket) {
        for (T report : reports.values()) {
            try {
                String domain = m_reportDelegate.getDomain(report);
                String xml = m_reportDelegate.buildXml(report);

                bucket.storeById(domain, xml);
            } catch (Exception e) {
                Cat.logError(e);
            }
        }
    }
}

報表的資料庫儲存

    如果是正常的週期結束之後,發起的持久化,而不是由於虛擬機器關閉引起的,資料除了被持久化到檔案之外,還會被持久化到資料庫。

    所有的資料庫持久化邏輯都在 storeDatabase(...) 方法中完成,每個分析器中的每個報表的描述資訊,都會被插入資料庫report表中,在程式中,HourlyReport實體與該表對應,如圖3, 報表具體內容會通過m_reportDelegate.buildBinary(report)轉化成二進位制資料,然後插入資料庫report_content 表,在程式中,HourlyReportContent實體與該表對應,如圖4,report_content 表的主鍵來自report的主鍵。

public class DefaultReportManager<T> extends ContainerHolder implements ReportManager<T>, Initializable, LogEnabled {
    @Inject
    private HourlyReportDao m_reportDao;

    @Inject
    private HourlyReportContentDao m_reportContentDao;
    
    private void storeDatabase(long startTime, Map<String, T> reports) {
        Date period = new Date(startTime);
        String ip = NetworkInterfaceManager.INSTANCE.getLocalHostAddress();

        for (T report : reports.values()) {
            try {
                String domain = m_reportDelegate.getDomain(report);
                HourlyReport r = m_reportDao.createLocal();

                r.setName(m_name);
                r.setDomain(domain);
                r.setPeriod(period);
                r.setIp(ip);
                r.setType(1);

                m_reportDao.insert(r);

                int id = r.getId();
                byte[] binaryContent = m_reportDelegate.buildBinary(report);
                HourlyReportContent content = m_reportContentDao.createLocal();

                content.setReportId(id);
                content.setContent(binaryContent);
                m_reportContentDao.insert(content);
                m_reportDelegate.createHourlyTask(report);
            } catch (Throwable e) {
                Cat.getProducer().logError(e);
            }
        }
    }
}
CREATE TABLE `report` (
  `id` int(11) NOT NULL AUTO_INCREMENT,
  `type` tinyint(4) NOT NULL COMMENT '報表型別, 1/xml, 9/binary 預設1',
  `name` varchar(20) NOT NULL COMMENT '報表名稱',
  `ip` varchar(50) DEFAULT NULL COMMENT '報表來自於哪臺機器',
  `domain` varchar(50) NOT NULL COMMENT '報表專案',
  `period` datetime NOT NULL COMMENT '報表時間段',
  `creation_date` datetime NOT NULL COMMENT '報表建立時間',
  PRIMARY KEY (`id`),
  KEY `IX_Domain_Name_Period` (`domain`,`name`,`period`),
  KEY `IX_Name_Period` (`name`,`period`),
  KEY `IX_Period` (`period`)
) ENGINE=InnoDB AUTO_INCREMENT=18497 DEFAULT CHARSET=utf8 ROW_FORMAT=COMPRESSED COMMENT='用於存放實時報表資訊,處理之後的結果';
CREATE TABLE `report_content` (
  `report_id` int(11) NOT NULL COMMENT '報表ID',
  `content` longblob NOT NULL COMMENT '二進位制報表內容',
  `creation_date` datetime NOT NULL COMMENT '建立時間',
  PRIMARY KEY (`report_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8 ROW_FORMAT=COMPRESSED COMMENT='小時報表二進位制內容';

    定時任務生產者

        資料庫的持久化完成標誌著一個完整週期的結束,CAT實時處理報表都是產生小時級別統計,小時級報表中會帶有最低分鐘級別粒度的統計,在資料庫持久化完成之後,我們會呼叫 m_reportDelegate.createHourlyTask(report) 建立一些定時任務,去建立小時模式、天模式、周模式、月模式等等粒度更粗的檢視,為什麼這裡還會建立小時任務,因為在叢集情況下,同一週期下的多張報表可能分散在幾臺CAT伺服器上,這時我們建立小時定時任務去合併報表形成小時檢視。

        但是,針對不同報表、不同domain,建立的定時任務也不同,有些可能小時模式、天模式、周模式、月模式檢視定時任務都有,有些也可能只建立天任務,在解釋完以下幾個domain的描述之後我們看下定時任務的列表:

  • crashLogDomain:客戶端崩潰日誌埋點,預設有:AndroidCrashLog/iOSCrashLog/MerchantAndroidCrashLog/MerchantIOSCrashLog/ApolloAndroidCrashLog/ApolloIOSCrashLog/TVAndroidCrashLog
  • serverFilterDomain:配置serverFilter中過濾的domain,預設有:空/PhoenixAgent/cat-agent/All/FrontEnd/paas/SMS-RECEIVER
  • validateDomain:非ServerFilterLog 和 非CrashLog
  • ALL:聚合報表,Transaction和Event特有。
  • *:所有domain
ReportType 任務名 Domain/Task Hourly Daily Weekly Month
Problem Problem validateDomain
crashLogDomain ×
其它 × × × ×
Transaction Transaction ALL ×
validateDomain
其它 × × × ×
Event Event ALL ×
validateDomain
其它 × × × ×
Top Top * × × × ×
Heartbeat Heartbeat validateDomain × × ×
其它 × × × ×
Cross Cross validateDomain ×
其它 × × × ×
Storage Storage validateDomain ×

State比較特殊,他會建立比較多的定時任務,我們單獨列在下面:

任務名 Hourly Daily Weekly Month
State
appDatabasePruner × × ×
cmdb × × ×
NetTopology × × ×
bug
databaseCapacity
jar × × ×
heavy
utilization
service
dailyNotify × × ×
router × × ×
cachedReport × × ×
system × × ×
app × × ×

    定時任務在 TaskManager.createTask(...) 中生產,這裡就是將需要執行的定時任務插入資料庫task表中,以供消費者(TaskConsumer)到時候去表裡取定時任務然後執行,表字段如下:

TABLE `task` (
  `id` int(11) NOT NULL AUTO_INCREMENT,
  `producer` varchar(20) NOT NULL COMMENT '任務建立者ip',
  `consumer` varchar(20) DEFAULT NULL COMMENT '任務執行者ip',
  `failure_count` tinyint(4) NOT NULL COMMENT '任務失敗次數',
  `report_name` varchar(20) NOT NULL COMMENT '報表名稱, transaction, problem...',
  `report_domain` varchar(50) NOT NULL COMMENT '報表處理的Domain資訊',
  `report_period` datetime NOT NULL COMMENT '報表時間',
  `status` tinyint(4) NOT NULL COMMENT '執行狀態: 1/todo, 2/doing, 3/done 4/failed',
  `task_type` tinyint(4) NOT NULL DEFAULT '1' COMMENT '0表示小時任務,1表示天任務',
  `creation_date` datetime NOT NULL COMMENT '任務建立時間',
  `start_date` datetime DEFAULT NULL COMMENT '開始時間, 這次執行開始時間',
  `end_date` datetime DEFAULT NULL COMMENT '結束時間, 這次執行結束時間',
  PRIMARY KEY (`id`),
  UNIQUE KEY `task_period_domain_name_type` (`report_period`,`report_domain`,`report_name`,`task_type`)
) ENGINE=InnoDB AUTO_INCREMENT=42594 DEFAULT CHARSET=utf8 COMMENT='後臺任務';

    其中producer就是生產定時任務的機器IP,stauts是執行狀態,這裡TaskManager作為生產者插入的記錄狀態都是todo=1,task_type指的任務類別,包含下面4種類別:

  • 0-小時任務,有小時任務需求的在生產者中都會被建立,合併CAT伺服器叢集的多臺機器的小時報表。
  • 1-天任務, 對於有天任務需求的,會在當天建立前一天的天檢視,
  • 2-周任務,有周任務需求的會建立上週六到這週五的周檢視,
  • 3-月任務,有月任務需求的會在每月1號建立上個月的月檢視。

    當然由於task表的report_period, report_domain, report_name, task_type 是聯合唯一鍵,所以,同一個型別、週期、domain、名稱的定時任務,只會插入一條。

public class TaskManager {
    @Inject
    private TaskDao m_taskDao;

    private static final int STATUS_TODO = 1;

    public static final int REPORT_HOUR = 0;
    public static final int REPORT_DAILY = 1;
    public static final int REPORT_WEEK = 2;
    public static final int REPORT_MONTH = 3;
    
    public boolean createTask(Date period, String domain, String name, TaskCreationPolicy prolicy) {
        try {
            if (prolicy.shouldCreateHourlyTask()) {
                createHourlyTask(period, domain, name);
            }

            Calendar cal = Calendar.getInstance();
            cal.setTime(period);

            int hour = cal.get(Calendar.HOUR_OF_DAY);
            cal.add(Calendar.HOUR_OF_DAY, -hour);
            Date currentDay = cal.getTime();

            if (prolicy.shouldCreateDailyTask()) {
                createDailyTask(new Date(currentDay.getTime() - ONE_DAY), domain, name);
            }

            if (prolicy.shouldCreateWeeklyTask()) {
                int dayOfWeek = cal.get(Calendar.DAY_OF_WEEK);
                if (dayOfWeek == 7) {
                    createWeeklyTask(new Date(currentDay.getTime() - 7 * ONE_DAY), domain, name);
                }
            }
            if (prolicy.shouldCreateMonthTask()) {
                int dayOfMonth = cal.get(Calendar.DAY_OF_MONTH);

                if (dayOfMonth == 1) {
                    cal.add(Calendar.MONTH, -1);
                    createMonthlyTask(cal.getTime(), domain, name);
                }
            }
            return true;
        } catch (DalException e) {
            Cat.logError(e);
            return false;
        }
    }
}

定時任務消費者

    我們再次回到CatHomeModule的初始化函式中,有如下一段程式碼,它會讀取server.xml中的配置 job-machine="true",用於指定是否開啟定時任務消費者執行緒。

if (serverConfigManager.isJobMachine()) {
    DefaultTaskConsumer taskConsumer = ctx.lookup(DefaultTaskConsumer.class);

    Threads.forGroup("cat").start(taskConsumer);
}

    執行緒會每隔1分鐘輪訓從資料庫取狀態為todo的定時任務,以及consumer為本機ip,然後狀態為doing的定時任務,即上次處理失敗,需要重試的,將任務狀態都修改為 doing,然後呼叫 processTask處理定時任務,如果處理失敗則間隔一段時間後重試,注意,這裡的間隔會阻塞任務執行緒,超過最大重試次數,狀態標為failed,成功則標為done。

public abstract class TaskConsumer implements org.unidal.helper.Threads.Task {
    @Override
    public void run() {
        String localIp = getLoaclIp();
        while (running) {
            if (checkTime()) {
                Task task = findDoingTask(localIp);
                if (task == null) {
                    task = findTodoTask();
                }
                boolean again = false;
                if (task != null) {
                    task.setConsumer(localIp);
                    if (task.getStatus() == TaskConsumer.STATUS_DOING || updateTodoToDoing(task)) {
                        int retryTimes = 0;
                        while (!processTask(task)) {
                            retryTimes++;
                            if (retryTimes < MAX_TODO_RETRY_TIMES) {
                                taskRetryDuration();
                            } else {
                                updateDoingToFailure(task);
                                again = true;
                                break;
                            }
                        }
                        if (!again) {
                            updateDoingToDone(task);
                        }
                    }
                } else {
                    taskNotFoundDuration(); //sleep 2 min
                }
            } else {
                Thread.sleep(60 * 1000);
            }
        }
        this.stopped = true;
    }
}

    processTask(...)處理的核心是將Task交給ReportFacade去構建檢視,我們可以認為ReportFacade是一個檢視構建工廠,工廠在初始化的時候,從plexus配置中讀取所有的任務構建器(TaskBuilder),並將他們裝入ReportFacade的成員變數m_reportBuilders中,TaskBuilder是一個介面,有4個方法,buildHourlyTask、buildDailyTask、buildWeeklyTask、buildMonthlyTask,我們從圖3可以看到該介面一共有24個實現,當有定時任務交付時,ReportFacade會根據任務名找到具體的任務構建類,然後根據任務是小時、天、周還是月分別呼叫以上4個方法。