1. 程式人生 > >Openfire分析之三:ConnectionManager 連接管理(1)

Openfire分析之三:ConnectionManager 連接管理(1)

max .com exc tco active eat cond hlist 觀察

Openfire是怎麽實現連接請求的?

  XMPPServer.start()方法,完成Openfire的啟動。但是,XMPPServer.start()方法中,並沒有提及如何監聽端口,那麽Openfire是如何接收客戶端的請求?

  因為Openfire的核心功能,是通過Module來管理的,那麽對應的連接管理應該就在Module中。

  查看在XMPPServer.loadModules()方法中,有如下代碼:

//Load this module always last since we don‘t want to start listening for clients
// before the rest of the modules have been started
loadModule(ConnectionManagerImpl.class.getName());

  這個ConnectionManagerImpl類,就是連接的管理模塊,而且註釋中說到,它還是在其他模塊啟動後之後再啟動。

  那麽下面,我們就來重點研究這個module,看看ConnectionManagerImpl如何實現連接監聽,並處理消息響應的。

連接請求監聽

  請求一般都與端口相對應,當客戶端發出連接請求時,服務器要能夠做出響應,首先需要對該請求的端口做監聽。

  ConnectionManagerImpl的繼承關系中,它實現了ConnectionManager接口,在ConnectionManager中,除了定義端口的設定、監聽開關等方法外,還定義一系列默認監聽的端口號:

final int DEFAULT_PORT = 5222;
final int DEFAULT_SSL_PORT = 5223;
final int DEFAULT_COMPONENT_PORT = 5275;
final int DEFAULT_COMPONENT_SSL_PORT = 5276;
final int DEFAULT_SERVER_PORT = 5269;
final int DEFAULT_MULTIPLEX_PORT = 5262;
final int DEFAULT_MULTIPLEX_SSL_PORT = 5263;

  這些端口號,在模塊初始化的時候,被設定到對應的監聽器對象中。

初始化

  ConnectionManagerImpl的初始化,除了自身的構造方法外, 還有module中的initialize()方法(module的概況在第二章有提及)。

  1. 初始化之一:ConnectionManagerImpl的構造方法

  ConnectionManagerImpl的初始化,首先是構造了各類連接監聽器,有如下幾種:

private final ConnectionListener clientListener;
private final ConnectionListener clientSslListener;
private final ConnectionListener boshListener;
private final ConnectionListener boshSslListener;
private final ConnectionListener serverListener;
private final ConnectionListener componentListener;
private final ConnectionListener componentSslListener;
private final ConnectionListener connectionManagerListener; // Also known as ‘multiplexer‘
private final ConnectionListener connectionManagerSslListener; // Also known as ‘multiplexer‘
private final ConnectionListener webAdminListener;
private final ConnectionListener webAdminSslListener;

  所有監聽器都用ConnectionListener進行包裝,以ConnectionType來做區分。這麽處理可以制定一套方法來管理各類ConnectionListener,做到抽象統一。所有的ConnectionListener是模塊啟動時開啟監聽。

  拿其中一種類型——SOCKET_C2S(即客戶端-服務端),來觀察一下它的構造方法,以下的分析也基於這一類型,這一類型是使用最多的。構造如下:

clientListener = new ConnectionListener(
                ConnectionType.SOCKET_C2S,
                ConnectionSettings.Client.PORT,
                DEFAULT_PORT,
                ConnectionSettings.Client.SOCKET_ACTIVE,
                ConnectionSettings.Client.MAX_THREADS,
                ConnectionSettings.Client.MAX_READ_BUFFER,
                ConnectionSettings.Client.TLS_POLICY,
                ConnectionSettings.Client.AUTH_PER_CLIENTCERT_POLICY,
                bindAddress,
                certificateStoreManager.getIdentityStoreConfiguration( ConnectionType.SOCKET_C2S ),
                certificateStoreManager.getTrustStoreConfiguration( ConnectionType.SOCKET_C2S ),
                ConnectionSettings.Client.COMPRESSION_SETTINGS );

  這些參數的意義:

  • ConnectionType.SOCKET_C2S:ConectionType是個枚舉類型,定義了所有connection的類型
  • ConnectionSettings.Client:提供了各個參數在數據庫ofProperty表中的鍵,ConnectionListener構造方法會根據傳入的鍵,從中讀取相應的配置值
  • DEFAULT_PORT:設置監聽的端口,對於C2S連接,openfire默認為5222端口
  • bindAddress: 配置文件中的network.interface,轉化一個InetAddress。InetAddress是Java對IP地址的封裝
  • certificateStoreManager:配置證書信息

  這些參數,設置了連接監聽器的端口號、最大並發數等信息,最後封裝在ConnectionConfiguration對象中,綁定到MINA的適配器NioSocketAcceptor。當MINA收到連接請求時,會根據端口的信息觸發指定的監聽器,進而執行相應的通信業務。

  2. 初始化之二:Module中定義的初始化方法

  這部分比較簡單,檢查了是否需要配置MINA來使用直接緩沖區、或堆緩沖區,並調用IoBuffer做相應的配置。默認是只使用堆內存。 

@Override
public void initialize(XMPPServer server) {
    super.initialize(server);
    
    // Check if we need to configure MINA to use Direct or Heap Buffers
    // Note: It has been reported that heap buffers are 50% faster than direct buffers
    if (JiveGlobals.getBooleanProperty("xmpp.socket.heapBuffer", true)) {
        IoBuffer.setUseDirectBuffer(false);
        IoBuffer.setAllocator(new SimpleBufferAllocator());
    }
} 

  關於緩沖區的使用,稍微提一下:

  • directBuffer:直接緩沖區, 為本地內存,不在Java堆中,不會被JVM回收。申請內存的API:ByteBuffer.allcateDirect(size)
  • heepBuffer:堆緩沖區,在堆中分配,當不再被引用的時候,buffer對象會被回收。申請內存的API:ByteBuffer.allocate(size)
  • 一般情況下:堆緩沖區的性能已經相當高,若無必要,使用堆緩沖區就足夠。

啟動監聽

模塊啟動的start()方法由module中定義,在相應的模塊實現,在XMPPServer中被調用。start()方法的代碼如下:
@Override
public void start() {
    super.start();
    startListeners();
    SocketSendingTracker.getInstance().start();
    CertificateManager.addListener(this);
}

  該方法執行了如下三步操作:

  • 啟動所有監聽,包括各個plugins、ConnectionListener、HTTP client
  • 啟動SocketSendingTracker線程,每隔10秒調用checkHealth檢查連接的Socket的狀態。SocketSendingTracker.start()中,執行checkHealth()做了一件事情:如果某個Socket發送數據的事件大於60秒,或者長時間處於idle狀態(表示長時間沒有接收到客戶端發來的心跳數據包),就調用forceClose將其關閉。
  • CertificateManager用來管理證書、監聽ssl的相關時間。

  我們主要分析startListeners()方法,代碼如下:

private synchronized void startListeners() {

    // Check if plugins have been loaded
    PluginManager pluginManager = XMPPServer.getInstance().getPluginManager();
    if (!pluginManager.isExecuted()) {
        pluginManager.addPluginManagerListener(new PluginManagerListener() {
            public void pluginsMonitored() {
                // Stop listening for plugin events
                XMPPServer.getInstance().getPluginManager().removePluginManagerListener(this);
                // Start listeners
                startListeners();
            }
        });
        return;
    }

    for ( final ConnectionListener listener : getListeners() ) {
        try {
            listener.start();
        } catch ( RuntimeException ex ) {
            Log.error( "An exception occurred while starting listener " + listener, ex );
        }
    }

    // Start the HTTP client listener.
    try {
        HttpBindManager.getInstance().start();
    } catch ( RuntimeException ex ) {
        Log.error( "An exception occurred while starting HTTP Bind listener ", ex );
    }
}

  主要啟動兩個監聽: (1)ConnectionListener (2)HttpBindManager

  這兩個我們分別來看一下。

  1. ConnectionListener.start()方法:

  為了分析方便,這裏只保留關鍵代碼:

public synchronized void start() {
    
        ......
    Log.debug("Starting...");
    if (getType() == ConnectionType.SOCKET_S2S) {
        connectionAcceptor = new LegacyConnectionAcceptor(generateConnectionConfiguration());
    } else {
        connectionAcceptor = new MINAConnectionAcceptor(generateConnectionConfiguration());
    }

    connectionAcceptor.start();
    Log.info("Started.");
}

  該方法中,根據不同的ConnectionType初始化了連接的接受器ConnectionAcceptor並啟動。

  ConnectionAcceptor是個抽像類,被LegacyConnectionAcceptor、MINAConnectionAcceptor實現。

  LegacyConnectionAcceptor僅能用於S2S的連接,且是之前所使用的方式。現在Oppenfire主要用的是MINA框架,這裏我們只研究一下MINAConnectionAcceptor。

  

  MINAConnectionAcceptor構造方法中,根據不同的連接類型,構造不同的ConnectionHandler。

  完成MINAConnectionAcceptor構造之後,執行了MINAConnectionAcceptor.start()方法。

  MINAConnectionAcceptor.start()方法代碼如下:

  
public synchronized void start()
{
    if ( socketAcceptor != null )
    {
        Log.warn( "Unable to start acceptor (it is already started!)" );
        return;
    }

    try
    {
        // Configure the thread pool that is to be used.
        final int initialSize = ( configuration.getMaxThreadPoolSize() / 4 ) + 1;
        final ExecutorFilter executorFilter = new ExecutorFilter( initialSize, configuration.getMaxThreadPoolSize(), 60, TimeUnit.SECONDS );
        final ThreadPoolExecutor eventExecutor = (ThreadPoolExecutor) executorFilter.getExecutor();
        final ThreadFactory threadFactory = new NamedThreadFactory( name + "-thread-", eventExecutor.getThreadFactory(), true, null );
        eventExecutor.setThreadFactory( threadFactory );

        // Construct a new socket acceptor, and configure it.
        socketAcceptor = buildSocketAcceptor();

        if ( JMXManager.isEnabled() )
        {
            configureJMX( socketAcceptor, name );
        }

        final DefaultIoFilterChainBuilder filterChain = socketAcceptor.getFilterChain();
        filterChain.addFirst( ConnectionManagerImpl.EXECUTOR_FILTER_NAME, executorFilter );

        // Add the XMPP codec filter
        filterChain.addAfter( ConnectionManagerImpl.EXECUTOR_FILTER_NAME, ConnectionManagerImpl.XMPP_CODEC_FILTER_NAME, new ProtocolCodecFilter( new XMPPCodecFactory() ) );

        // Kill sessions whose outgoing queues keep growing and fail to send traffic
        filterChain.addAfter( ConnectionManagerImpl.XMPP_CODEC_FILTER_NAME, ConnectionManagerImpl.CAPACITY_FILTER_NAME, new StalledSessionsFilter() );

        // Ports can be configured to start connections in SSL (as opposed to upgrade a non-encrypted socket to an encrypted one, typically using StartTLS)
        if ( configuration.getTlsPolicy() == Connection.TLSPolicy.legacyMode )
        {
            final SslFilter sslFilter = encryptionArtifactFactory.createServerModeSslFilter();
            filterChain.addAfter( ConnectionManagerImpl.EXECUTOR_FILTER_NAME, ConnectionManagerImpl.TLS_FILTER_NAME, sslFilter );
        }

        // Throttle sessions who send data too fast
        if ( configuration.getMaxBufferSize() > 0 )
        {
            socketAcceptor.getSessionConfig().setMaxReadBufferSize( configuration.getMaxBufferSize() );
            Log.debug( "Throttling read buffer for connections to max={} bytes", configuration.getMaxBufferSize() );
        }

        // Start accepting connections
        socketAcceptor.setHandler( connectionHandler );
        socketAcceptor.bind( new InetSocketAddress( configuration.getBindAddress(), configuration.getPort() ) );
    }
    catch ( Exception e )
    {
        System.err.println( "Error starting " + configuration.getPort() + ": " + e.getMessage() );
        Log.error( "Error starting: " + configuration.getPort(), e );
        // Reset for future use.
        if (socketAcceptor != null) {
            try {
                socketAcceptor.unbind();
            } finally {
                socketAcceptor = null;
            }
        }
    }
}

  從上面的代碼可以看到,MINAConnectionAcceptor.start()做了四件事:

  (1)建立線程池

  (2)構建了一個socketAcceptor

  (3)添加xmpp解碼器與編碼器到socketAcceptor

  (4)將connectionHandler註入socketAcceptor並綁定socketAcceptor.bind,

  其中:

ConnectionHandler是連接處理器,MINA接收到的請求最後都轉由ConnectionHandler處理,其內部的處理機制,將在下文另起一節來分析。

XMPPCodecFactory負責解碼接收到的消息、編碼要發送的消息。

  即Openfire中的連接處理模型為:

request->XMPPCodecFactory.XMPPDecoder->ConnectionHandler->XMPPCodecFactory.XMPPEncoder->response
  關於MINA的處理邏輯,這裏簡述一下:

  NioSocketAcceptor是MINA的適配器,MINA中有個過濾器和處理器的概念,過濾器用來過濾數據,處理器用來處理數據。

  總的來說MINA的處理模型就是:

request->過濾器A->過濾器B->處理器->過濾器B->過濾器A->response

  request和response類似serlvet的request和response。

  至此,系統就開始能響應客戶端的連接請求了!!

 

  剛剛分析startListeners()方法時,其中除了啟動ConnetctionListener外,還啟動了另一種監聽:HttpBindManager,沒忘記吧?下來對它也做一下分析。

  2. HttpBindManager.start()方法:

  這部分主要用於啟用7070、7443端口,作為HTTP、HTTPS綁定端口,服務框架用的是Jetty。一般是在Web IM端用到。

  HttpBinManager中綁定監聽了7070端口,並初始化HttpSessionManager。

  HttpSessionManager管理所有通過httpbing連接到openfire的議定,它是一個同步http的雙向流。

  下面簡單跟一下代碼,部分代碼省略掉了

  (1)HttpBindManager

  HttpBindManager構造方法:

private HttpBindManager() {
    ...
    this.httpSessionManager = new HttpSessionManager();
    ....
}

  構造方法中雖然實例化了HttpSessionManager,然而,在HttpBindManager類中並沒有對它做任何操作,只是提供了get方法。HttpSessionManager是在HttpBindServlet中使用的。

  Why?其實好理解,HttpSessionManager顧名思義,Http會話管理,要能管理首先是需要有會話產生,那會話在哪裏產生?

  So,答案就出來了。

  至於為什麽在要HttpBindManager中實例化,因為HttpBindManager中使用了單例,這樣整個會話管理變得統一有序。

  OK,其他不多說,繼續往下走:

  Start()方法中:configureHttpBindServer()函數做了端口綁定、Servlet綁定、以及WEB目錄綁定,然後服務啟動。 
public void start() {
    certificateListener = new CertificateListener();
    CertificateManager.addListener(certificateListener);

    if (!isHttpBindServiceEnabled()) {
        return;
    }
    bindPort = getHttpBindUnsecurePort();
    bindSecurePort = getHttpBindSecurePort();
    configureHttpBindServer(bindPort, bindSecurePort);

    try {
        httpBindServer.start();
        Log.info("HTTP bind service started");
    }
    catch (Exception e) {
        Log.error("Error starting HTTP bind service", e);
    }
}

  configureHttpBindServer():

private synchronized void configureHttpBindServer(int port, int securePort) {
    
    final QueuedThreadPool tp = new QueuedThreadPool(processingThreads);
    tp.setName("Jetty-QTP-BOSH");
    httpBindServer = new Server(tp);
    ....
    createBoshHandler(contexts, "/http-bind");
    createCrossDomainHandler(contexts, "/crossdomain.xml");
    loadStaticDirectory(contexts);

    HandlerCollection collection = new HandlerCollection();
    httpBindServer.setHandler(collection);
    collection.setHandlers(new Handler[] { contexts, new DefaultHandler() });
}

  解釋一下QueuedThreadPool類,該類是jetty的一個線程池,它實現了org.eclipse.jetty.util.thread.ThreadPool接口,並繼承org.eclipse.jetty.util.component.AbstractLifeCycle。

  createBoshHandler():

private void createBoshHandler(ContextHandlerCollection contexts, String boshPath)
{
    ServletContextHandler context = new ServletContextHandler(contexts, boshPath, ServletContextHandler.SESSIONS);
    ......
    context.addServlet(new ServletHolder(new HttpBindServlet()),"/*");
    ......
}

  createCrossDomainHandler():

private void createCrossDomainHandler(ContextHandlerCollection contexts, String crossPath)
{
    ServletContextHandler context = new ServletContextHandler(contexts, crossPath, ServletContextHandler.SESSIONS);
    ......
    context.addServlet(new ServletHolder(new FlashCrossDomainServlet()),"");
}

  loadStaticDirectory():

private void loadStaticDirectory(ContextHandlerCollection contexts) {
    File spankDirectory = new File(JiveGlobals.getHomeDirectory() + File.separator
            + "resources" + File.separator + "spank");
    ......
    WebAppContext context = new WebAppContext(contexts, spankDirectory.getPath(), "/");
    context.setWelcomeFiles(new String[]{"index.html"});
}

  最後在ConnectionManagerImpl中調用HttpBindManager.start()就完成啟動,Openfire與Jetty開始進行連接,關於Jetty的相關機制,這裏就不做延伸了。

  而HttpSessionManager在HttpBindServlet的初始化中開啟,當然在HttpBindServlet被destroy()時,也自然會stop()掉。

  HttpBindServlet.init():

public void init(ServletConfig servletConfig) throws ServletException {
    super.init(servletConfig);
    boshManager = HttpBindManager.getInstance();
    sessionManager = boshManager.getSessionManager();
    sessionManager.start();
}

  (2)HttpSessionManager中所做的工作,就在其start()我們來簡單看一下。

  HttpSessionManager.start():

public void start() {
    Log.info( "Starting instance" );

    this.sessionManager = SessionManager.getInstance();

    final int maxClientPoolSize = JiveGlobals.getIntProperty( "xmpp.client.processing.threads", 8 );
    final int maxPoolSize = JiveGlobals.getIntProperty("xmpp.httpbind.worker.threads", maxClientPoolSize );
    final int keepAlive = JiveGlobals.getIntProperty( "xmpp.httpbind.worker.timeout", 60 );

    sendPacketPool = new ThreadPoolExecutor(getCorePoolSize(maxPoolSize), maxPoolSize, keepAlive, TimeUnit.SECONDS,
            new LinkedBlockingQueue<Runnable>(), // unbounded task queue
            new NamedThreadFactory( "httpbind-worker-", true, null, Thread.currentThread().getThreadGroup(), null )
    );

    sendPacketPool.prestartCoreThread();

    // Periodically check for Sessions that need a cleanup.
    inactivityTask = new HttpSessionReaper();
    TaskEngine.getInstance().schedule( inactivityTask, 30 * JiveConstants.SECOND, 30 * JiveConstants.SECOND );
}

  解釋一下:

  (1)keepAlive,多余空閑線程等待心任務的的最長時間60秒

  (2)ThreadPoolExecutor配置了線程池,池中所保持的線程數和最大線程數均為8個

  (3)newLinkedBlockingQueue<Runnable>(),執行前保持的隊列,此隊列僅保持由execute 方法提交的 Runnable 任務

  (4)NamedThreadFactory,創建新線程的工廠

  (5)sendPacketPool.prestartCoreThread():該方法為啟動核心線程,使其處於等待工作的空閑狀態。僅當執行新任務時,此操作才重寫默認的啟動核心線程策略。

  最後啟動了一個線程來查看哪些會話需要被關閉:

inactivityTask = new HttpSessionReaper();
TaskEngine.getInstance().schedule( inactivityTask, 30 * JiveConstants.SECOND, 30 * JiveConstants.SECOND );

  進入看看HttpSessionReaper.run()方法:

private class HttpSessionReaper extends TimerTask {
    @Override
    public void run() {
        long currentTime = System.currentTimeMillis();
        for (HttpSession session : sessionMap.values()) {
            try {
                long lastActive = currentTime - session.getLastActivity();
                if (Log.isDebugEnabled()) {
                    Log.debug("Session was last active " + lastActive + " ms ago: " + session.getAddress());
                }
                if (lastActive > session.getInactivityTimeout() * JiveConstants.SECOND) {
                    Log.info("Closing idle session: " + session.getAddress());
                    session.close();
                }
            } catch (Exception e) {
                Log.error("Failed to determine idle state for session: " + session, e);
            }
        }
    }
}

  這個線程的意義:定時將一些超時了的閑置狀態的會話清理掉。

  其中:

  session.getLastActivity():這個方法以毫秒為時間單位返回關閉http連接的時間

  getInactivityTimeout():這個方法以秒為單位返回不活躍或被終止會話時間

  至此,Openfire開始能響應Http形式的請求。

  那麽Openfire的整個網絡監聽,就分解完了。

  註意一點的是,上面內容,是以客戶端-服務端模式為例來講解Openfire的連接監聽,但Openfire的ConnetionType並不止這一種,還有server-server等其他類型,分析方法類似,內容也相似,這裏就不再贅述,有興趣的朋友可以自已做了解。

  而Openfire在接收到請求之後,是如何響應的,在下一章講解。

Openfire分析之三:ConnectionManager 連接管理(1)