1. 程式人生 > >深入詳解美團點評CAT跨語言服務監控(二) CAT服務端初始化

深入詳解美團點評CAT跨語言服務監控(二) CAT服務端初始化

Cat模組

Cat-client : cat客戶端,編譯後生成 cat-client-2.0.0.jar ,使用者可以通過它來向cat-home上報統一格式的日誌資訊,可以整合到 mybatis、spring、微服務 dubbo 的監控等等流行框架。 

Cat-consumer: 用於實時分析從客戶端提供的資料。在實際開發和部署中,Cat-consumer和Cat-home是部署在一個JVM內部,每個CAT服務端都可以作為consumer也可以作為home,這樣既能減少整個層級結構,也可以增加系統穩定性。

Cat-core:Cat核心模組

Cat-hadoop : 大資料統計依賴模組。

cat-home:大眾點評CAT伺服器端主程式,編譯安裝之後生成 cat-alpha-2.0.0.war 包部署於servlet容器中,我們用的是Tomcat,war包依賴cat-client.jar、cat-consumer.jar, cat-core.jar, cat-hadoop.jar 包,通過web.xml 配置,看到Cat會啟動 cat-servlet 和 mvc-servlet , mvc-servlet 是一個類似 spring MVC 的框架,用於處理使用者WEB管理平臺請求。cat-servlet是CAT服務端監聽入口,CAT會在這裡開啟監聽埠,接收處理客戶端的日誌記錄請求,本章主要介紹cat-servlet。

<?xml version="1.0" encoding="UTF-8"?>
<web-app xmlns="http://java.sun.com/xml/ns/javaee" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	xsi:schemaLocation="http://java.sun.com/xml/ns/javaee http://java.sun.com/xml/ns/javaee/web-app_2_5.xsd"
	version="2.5">
...
	<servlet>
		<servlet-name>cat-servlet</servlet-name>
		<servlet-class>com.dianping.cat.servlet.CatServlet</servlet-class>
		<load-on-startup>1</load-on-startup>
	</servlet>
	<servlet>
		<servlet-name>mvc-servlet</servlet-name>
		<servlet-class>org.unidal.web.MVC</servlet-class>
		<init-param>
			<param-name>cat-client-xml</param-name>
			<param-value>client.xml</param-value>
		</init-param>
		<init-param>
			<param-name>init-modules</param-name>
			<param-value>false</param-value>
		</init-param>
		<load-on-startup>2</load-on-startup>
	</servlet>
....

Cat-servlet初始化

                                                                        圖1 - 容器初始化類圖

    CatServlet 首先會呼叫父類 AbstractContainerServlet 的init方法做初始化工作, 可以認為這是CatServlet的入口,他主要做了3件事情,首先呼叫基類HttpServlet的init方法對Servlet進行初始化,然後初始化Plexus容器,最後呼叫子類initComponents初始化Module模組。

public abstract class AbstractContainerServlet extends HttpServlet {
    public void init(ServletConfig config) throws ServletException {
        super.init(config);

        try {
            if(this.m_container == null) {
                this.m_container = ContainerLoader.getDefaultContainer();
            }

            this.m_logger = this.m_container.getLogger();
            this.initComponents(config);
        } catch (Exception var3) {
            ...
        }
    }
}

 plexus - IOC容器

 上面講到init(...)方法在初始化完Servlet之後呼叫 ContainerLoader.getDefaultContainer() 初始化plexus容器。

    注:這裡可能大家不太瞭解plexus,它相當於Spring的IoC容器,但是它和Spring框架不同,它並不是一個完整的,擁有各種元件的大型框架,僅僅是一個純粹的IoC容器,它的開發者與Maven的開發者是同一撥人,最初開發Maven的時候,Spring並不成熟,所以Maven的開發者決定使用自己維護的IoC容器Plexus,它與Spring在語法和描述方式稍有不同。在Plexus中,有ROLE的概念,相當於Spring中的一個Bean。支援元件生命週期管理。

   非JAVA開發者不懂IOC容器?簡單來說,IOC容器就相當於一個物件裝載器,物件不是由程式設計師new建立,而是框架在初始化的時候從配置檔案中讀取需要例項化的類資訊,將資訊裝入一個物件裝載器,然後在需要的時候,從物件裝載器中找是否存在該類的資訊,存在則返回類的物件。

        plexus容器是如何工作的呢?就上面的類圖來說,

    a. AbstractContainerServlet 通過容器工廠ContainerLoader 的 getDefaultContainer方法,該方法會建立 MyPlexusContainer 容器,MyPlexusContainer是介面 PlexusContainer 的實現,MyPlexusContainer在建構函式中會建立元件管理器(ComponentManager),可以認為每個類都是容器中的一個元件,ComponentManager就是用來管理這些元件的,包括他的生命週期,元件在Plexus容器配置檔案中配置。

     b.元件管理器(ComponentManager)會建立元件模型管理器(ComponentModelManager)以及元件生命週期管理器(ComponentLifecycle),ComponentModelManager用於儲存Plexus容器配置檔案中的所有component元件資訊,它的loadComponentsFromClasspath()方法會掃描各個jar包中存在的plexus容器配置檔案,如圖2,將xml內容解析之後放入PlexusModel 列表中。

public class ComponentManager {
    private Map<String, ComponentBox<?>> m_components = new HashMap();
    private PlexusContainer m_container;
    private ComponentLifecycle m_lifecycle;
    private ComponentModelManager m_modelManager;
    private LoggerManager m_loggerManager;
    
    public ComponentManager(PlexusContainer container, InputStream in) throws Exception {
        this.m_container = container;
        this.m_modelManager = new ComponentModelManager();
        this.m_lifecycle = new ComponentLifecycle(this);
        if(in != null) {
            this.m_modelManager.loadComponents(in);
        }

        this.m_modelManager.loadComponentsFromClasspath();
        this.m_loggerManager = (LoggerManager)this.lookup(new ComponentKey(LoggerManager.class, (String)null));
        this.register(new ComponentKey(PlexusContainer.class, (String)null), container);
        this.register(new ComponentKey(Logger.class, (String)null), this.m_loggerManager.getLoggerForComponent(""));
    }
}

    我們也可以將我們自己寫的類交給容器管理,只需要將類配置到容器配置檔案中,例如:cat-consumer/src/main/resources/META-INF/plexus/components-cat-consumer.xml, 只要是存在於 META-INF/plexus/ 目錄下,並且檔名以"components-" 開頭的 ".xml" 檔案,都會被 ComponentModelManager 認為是容器配置檔案。

            圖2 - plexus IOC容器類配置檔案

       c.然後就可以通過lookup方法找到類,並在首次使用的時候例項化,並且xml配置中的該類依賴的其它類也會被一併例項化,另外如果類方法實現了 Initializable 介面,建立物件後會執行類的 initialize() 方法做一些初始化的工作。

if(component instanceof Initializable) {
    try {
        ((Initializable)component).initialize();
    } catch (Throwable var5) {
        ComponentModel model = ctx.getComponentModel();
        throw new ComponentLookupException("Error when initializing component!", model.getRole(), model.getHint(), var5);
    }
}

模組的載入 - 模型模式

init(...)函式最後會呼叫CatServlet的initComponents()方法初始化Module模組。

                圖3 - 模組初始化類圖   

initComponents()方法首先建立一個模組上下文 DefaultModuleContext物件,該物件擁有plexus容器的指標,以及server.xml、client.xml配置檔案資訊 服務端配置server.xml中有訊息儲存路徑、HDFS上傳等一系列配置,由於cat-home預設是服務端也是客戶端,也就是說cat-home自身也會被監控,所以我們在這裡看到有client.xml配置,配置檔案所在目錄由環境變數CAT_HOME指定,如果未指定,預設是/data/appdatas/cat。

  隨後CatServlet建立一個模組初始器 DefaultModuleInitializer,並呼叫他的execute(ctx)方法建立並初始化模組。

  注:DefaultModuleInitializer有一個模組管理器DefaultModelManager m_manager, 讀者可能沒有看見m_manager的建立過程,實際上,物件在components-foundation-service.xml配置檔案中配置的,然後在plexus容器例項化類物件的過程中建立的,後面還有很多物件的屬性也是通過plexus容器注入的。比如DefaultModuleManager的m_topLevelModules屬性通過以下配置注入。

<component>
    <role>org.unidal.initialization.ModuleManager</role>
    <implementation>org.unidal.initialization.DefaultModuleManager</implementation>
    <configuration>
        <topLevelModules>cat-home</topLevelModules>
    </configuration>
</component>

    上面XML配置顯示m_topLevelModules 指定為cat-home,這樣DefaultModuleInitializer通過DefaultModelManagergetTopLevelModules()方法獲取的就是CatHomeModule模組物件,可以認為cat-home是一個頂層模組,所有Module都包含getDependencies方法,該方法會找到當前模組所依賴的其他模組,並例項化模組,比如下面cat-home就依賴cat-consumer模組,

public class CatHomeModule extends AbstractModule {
    @Override
    public Module[] getDependencies(ModuleContext ctx) {
        return ctx.getModules(CatConsumerModule.ID);
    }
}

從cat-consumer的getDependencies看出他依賴cat-core模組,cat-core模組又依賴cat-client模組,這樣子我們就從頂層模組引出了所有依賴的其它模組,在例項化模組的同時呼叫模組的setup方法安裝模組。在所有模組安裝完成之後,依次呼叫模組的execute方法完成初始化,但是初始化順序則是按照安裝順序反著來的,cat-client -> cat-core -> cat-consumer -> cat-home ,Modules之間的設計使用了典型的模板模式。

cat-home的setup

在上一章講到模組初始化的時候, 講到setup安裝cat-home模組,對於客戶端的請求的監聽處理,就是在這裡完成的。

@Named(type = Module.class, value = CatHomeModule.ID)
public class CatHomeModule extends AbstractModule {
    @Override
    protected void setup(ModuleContext ctx) throws Exception {
        if (!isInitialized()) {
            File serverConfigFile = ctx.getAttribute("cat-server-config-file");
            ServerConfigManager serverConfigManager = ctx.lookup(ServerConfigManager.class);
            final TcpSocketReceiver messageReceiver = ctx.lookup(TcpSocketReceiver.class);

            serverConfigManager.initialize(serverConfigFile);
            messageReceiver.init();

            Runtime.getRuntime().addShutdownHook(new Thread() {
                @Override
                public void run() {
                    messageReceiver.destory();
                }
            });
        }
    }
}

1、讀取 server.xml 配置,裝進配置管理器(ServerConfigManager)。

2、建立訊息接收器 final TcpSocketReceiver messageReceiver;

3、messageReceiver.init() 初始化服務,採用的經典的 netty reactor 模型。

4、註冊一個JVM關閉的鉤子,在程序掛掉的時候,執行一些清理現場的程式碼。

TcpSocketReceiver--- netty reactor 模式的應用

我們來看看CatHomeModule對TcpSocketReceiver的初始化做了什麼,如下原始碼:

public final class TcpSocketReceiver implements LogEnabled {
    public void init() {
        try {
            startServer(m_port);
        } catch (Throwable e) {
            m_logger.error(e.getMessage(), e);
        }
    }
    
    public synchronized void startServer(int port) throws InterruptedException {
        boolean linux = getOSMatches("Linux") || getOSMatches("LINUX");
        int threads = 24;
        ServerBootstrap bootstrap = new ServerBootstrap();

        m_bossGroup = linux ? new EpollEventLoopGroup(threads) : new NioEventLoopGroup(threads);
        m_workerGroup = linux ? new EpollEventLoopGroup(threads) : new NioEventLoopGroup(threads);
        bootstrap.group(m_bossGroup, m_workerGroup);
        bootstrap.channel(linux ? EpollServerSocketChannel.class : NioServerSocketChannel.class);

        bootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
            @Override
            protected void initChannel(SocketChannel ch) throws Exception {
                ChannelPipeline pipeline = ch.pipeline();

                pipeline.addLast("decode", new MessageDecoder());
            }
        });

        bootstrap.childOption(ChannelOption.SO_REUSEADDR, true);
        bootstrap.childOption(ChannelOption.TCP_NODELAY, true);
        bootstrap.childOption(ChannelOption.SO_KEEPALIVE, true);
        bootstrap.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);

        try {
            m_future = bootstrap.bind(port).sync();
            m_logger.info("start netty server!");
        } catch (Exception e) {
            m_logger.error("Started Netty Server Failed:" + port, e);
        }
    }
}

1、建立EventLoopGroup物件, EventLoopGroup是用來處理IO操作的多執行緒事件迴圈器,m_bossGroup作為一個acceptor負責接收來自客戶端的請求,然後分發給m_workerGroup用來所有的事件event和channel的IO。

2、建立ServerBootstrap物件,ServerBootstrap 是一個啟動Epoll(非Linux為NIO)服務的輔助啟動類,他將設定bossGroup和workerGroup兩個多執行緒時間迴圈器。

3、接下來的channel()方法設定了ServerBootstrap 的 ChannelFactory,這裡傳入的引數是EpollServerSocketChannel.class (非Linux為NioServerSocketChannel.class),也就是說這個ChannelFactory建立的就是EpollServerSocketChannel/NioServerSocketChannel的例項。

    Channel是Netty的核心概念之一,它是Netty網路通訊的主體,他從EventLoopGroup獲得一個EventLoop,並註冊到該EventLoop,channel生命週期內都和該EventLoop在一起,由它負責對網路通訊連線的開啟、關閉、連線和讀寫操作。如果是對於讀寫事件,執行執行緒排程pipeline來處理使用者業務邏輯。

4、接下來bootstrap.childHandler的目的是新增一個handler,用來監聽已經連線的客戶端的Channel的動作和狀態,傳入的 ChannelInitializer重寫了initChannel方法,這個方法在Channel被註冊到EventLoop的時候會被呼叫。

5、initChannel會建立ChannelPipeline物件,並呼叫addLast新增ChannelHandler。有網路請求時,ChannelPipeline會呼叫ChannelHandler來處理,有ChannelInboundHandler和ChannelOutboundHandler兩種,ChannelPipeline會從頭到尾順序呼叫ChannelInboundHandler處理網路請求內容,從尾到頭呼叫ChannelOutboundHandler處理網路請求內容。這也是Netty用來靈活處理網路請求的機制之一,因為使用的時候可以用多個decoder和encoder進行組合,從而適應不同的網路協議。而且這種類似分層的方式可以讓每一個Handler專注於處理自己的任務而不用管上下游,這也是pipeline機制的特點。這跟TCP/IP協議中的五層和七層的分層機制有異曲同工之妙。

      在這裡,ChannelPipeline新增的 ChannelHandler 是MessageDecoder ,MessageDecoder的祖先類實現了ChannelHandler介面,他本質上還是一個Handler,是網路IO事件具體處理類,當客戶端將日誌資料上傳到伺服器之後,會交給MessageDecoder 解碼資料,然後進行後續處理。

6、呼叫 childOption 設定 channel 的引數。

7、最後呼叫bind()方法啟動服務。 

關於netty ,我就講到這裡,網上關於netty框架的文章非常多,大家可以自行去查。

訊息的解碼

    上一章我們講到Netty將接收到的訊息交給 MessageDecoder 去做解碼,解碼是交由PlainTextMessageCodec物件將接收到的位元組碼反序列化為MessageTree物件(所有的訊息都是由訊息樹來組織),具體的解碼邏輯在這裡暫不做詳細闡述,在第三章我們會闡述編碼過程,解碼只是編碼的一個逆過程。

解碼之後呼叫 DefaultMessageHandler 的 handle方法對訊息進行處理,handle方法就幹了一件事情,就是呼叫 m_consumer.consume(tree) 方法去消費訊息樹,在消費模組,CAT實現了佇列化,非同步化,在訊息消費章節會詳細闡述。

    當然netty handler也是支援非同步處理的,我們也可以將 DefaultMessageHandler 像 MessageDecoder那樣向netty註冊handler, 再由netty來做執行緒池分發。

public class MessageDecoder extends ByteToMessageDecoder {
    
    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf buffer, List<Object> out) throws Exception {
        if (buffer.readableBytes() < 4) {
            return;
        }
        buffer.markReaderIndex();
        int length = buffer.readInt();
        
        ...
        
        ByteBuf readBytes = buffer.readBytes(length + 4);
        
        ...
        
        DefaultMessageTree tree = (DefaultMessageTree) m_codec.decode(readBytes);
        
        readBytes.resetReaderIndex();
        tree.setBuffer(readBytes);
        m_handler.handle(tree);
        m_processCount++;
        
        ...
    }
}