程式人生 > MINA原始碼分析---ExecutorFilter執行緒池過濾器

MINA原始碼分析---ExecutorFilter執行緒池過濾器

以下原始碼已加上註釋說明。

/*
 */
package org.apache.mina.filter.executor;

import java.util.EnumSet;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;

import org.apache.mina.core.filterchain.IoFilterAdapter;
import org.apache.mina.core.filterchain.IoFilterChain;
import org.apache.mina.core.filterchain.IoFilterEvent;
import org.apache.mina.core.session.IdleStatus;
import org.apache.mina.core.session.IoEventType;
import org.apache.mina.core.session.IoSession;
import org.apache.mina.core.write.WriteRequest;

/**
 * A filter that forwards I/O events to an {@link Executor} to enforce a certain
 * thread model while allowing the events per session to be processed
 * simultaneously. You can apply various thread models by inserting this filter
 * into an {@link IoFilterChain}.
 *
 * <h2>Life Cycle Management</h2>
 *
 * Please note that this filter doesn't manage the life cycle of an {@link Executor}
 * that was handed to it.
 * If you created this filter using {@link #ExecutorFilter(Executor)} or a similar
 * constructor that accepts an {@link Executor} that you've instantiated, you have
 * full control and responsibility of managing its life cycle (e.g. calling
 * {@link ExecutorService#shutdown()}).
 * <p>
 * If you created this filter using convenience constructors like
 * {@link #ExecutorFilter(int)}, then you can shut down the executor by calling
 * {@link #destroy()} explicitly.
 *
 * <h2>Event Ordering</h2>
 *
 * All convenience constructors of this filter create a new
 * {@link OrderedThreadPoolExecutor} instance.  Therefore, the order of events is
 * maintained like the following:
 * <ul>
 * <li>All event handler methods are called exclusively: only one handler runs
 *     at any given time, and handlers run in order.
 *     (e.g. messageReceived and messageSent can't be invoked at the same time.)</li>
 * <li>The event order is never mixed up.
 *     (e.g. messageReceived is always invoked before sessionClosed or messageSent.)</li>
 * </ul>
 * However, if you specified another {@link Executor} instance in the constructor,
 * the order of events is not maintained at all.  This means more than one event
 * handler method can be invoked at the same time with mixed order.  For example,
 * let's assume that messageReceived, messageSent, and sessionClosed events are
 * fired.
 * <ul>
 * <li>All event handler methods can be called simultaneously.
 *     (e.g. messageReceived and messageSent can be invoked at the same time.)</li>
 * <li>The event order can be mixed up.
 *     (e.g. sessionClosed or messageSent can be invoked before messageReceived
 *           is invoked.)</li>
 * </ul>
 * If you need to maintain the order of events per session, please specify an
 * {@link OrderedThreadPoolExecutor} instance or use the convenience constructors.
 *
 * <h2>Selective Filtering</h2>
 *
 * By default, all event types but <tt>sessionCreated</tt>, <tt>filterWrite</tt>,
 * <tt>filterClose</tt> and <tt>filterSetTrafficMask</tt> are submitted to the
 * underlying executor, which is the most common setting.
 * <p>
 * If you want to submit only a certain set of event types, you can specify them
 * in the constructor.  For example, you could configure a thread pool for
 * write operations for the maximum performance:
 * <pre><code>
 * IoService service = ...;
 * DefaultIoFilterChainBuilder chain = service.getFilterChain();
 *
 * chain.addLast("codec", new ProtocolCodecFilter(...));
 * // Use one thread pool for most events.
 * chain.addLast("executor1", new ExecutorFilter());
 * // and another dedicated thread pool for 'filterWrite' events.
 * chain.addLast("executor2", new ExecutorFilter(IoEventType.WRITE));
 * </code></pre>
 *
 * <h2>Preventing {@link OutOfMemoryError}</h2>
 *
 * Please refer to {@link IoEventQueueThrottle}, which is specified as
 * a parameter of the convenience constructors.
 *
 * @author <a href="http://mina.apache.org">Apache MINA Project</a>
 *
 * @see OrderedThreadPoolExecutor
 * @see UnorderedThreadPoolExecutor
 * @org.apache.xbean.XBean
 */
public class ExecutorFilter extends IoFilterAdapter {
    /** The default maximum pool size used by the convenience constructors. */
    private static final int DEFAULT_MAX_POOL_SIZE = 16;

    /** The number of threads to create at startup. */
    private static final int BASE_THREAD_NUMBER = 0;

    /** The default keep-alive time, in seconds. */
    private static final long DEFAULT_KEEPALIVE_TIME = 30;

    /**
     * Flags telling whether the Executor has been created by this filter
     * or passed in by the caller. Only in the first case is the executor's
     * life cycle managed by this filter (see {@link #destroy()}).
     */
    private static final boolean MANAGEABLE_EXECUTOR = true;
    private static final boolean NOT_MANAGEABLE_EXECUTOR = false;

    /** The event types handled by the executor when the caller specifies none. */
    private static final IoEventType[] DEFAULT_EVENT_SET = new IoEventType[] {
        IoEventType.EXCEPTION_CAUGHT,
        IoEventType.MESSAGE_RECEIVED,
        IoEventType.MESSAGE_SENT,
        IoEventType.SESSION_CLOSED,
        IoEventType.SESSION_IDLE,
        IoEventType.SESSION_OPENED
    };

    /** The set of event types that are handed over to the executor. */
    private EnumSet<IoEventType> eventTypes;

    /** The associated executor. */
    private Executor executor;

    /** Set when the executor was created by this filter and may be shut down by it. */
    private boolean manageableExecutor;

    /**
     * Creates a filter backed by a new {@link OrderedThreadPoolExecutor} with the
     * default maximum pool size, handling the default event set.
     */
    public ExecutorFilter() {
        this(DEFAULT_MAX_POOL_SIZE);
    }

    /**
     * Creates a filter backed by a new {@link OrderedThreadPoolExecutor},
     * handling the default event set.
     *
     * @param maximumPoolSize The maximum pool size
     */
    public ExecutorFilter(int maximumPoolSize) {
        // NOTE(review): a null queue handler relies on OrderedThreadPoolExecutor
        // falling back to its own default handler - confirm for the MINA version in use.
        Executor defaultExecutor = createDefaultExecutor(BASE_THREAD_NUMBER, maximumPoolSize,
            DEFAULT_KEEPALIVE_TIME, TimeUnit.SECONDS, Executors.defaultThreadFactory(), null);

        init(defaultExecutor, MANAGEABLE_EXECUTOR);
    }

    /**
     * Creates a filter backed by a new {@link OrderedThreadPoolExecutor} with the
     * default maximum pool size, handling only the given event types.
     *
     * @param eventTypes The event types to hand over to the executor; the default
     *        set is used when none are given
     * @throws IllegalArgumentException if {@link IoEventType#SESSION_CREATED} is requested
     */
    public ExecutorFilter(IoEventType... eventTypes) {
        // NOTE(review): see the null queue handler remark on ExecutorFilter(int).
        Executor defaultExecutor = createDefaultExecutor(BASE_THREAD_NUMBER, DEFAULT_MAX_POOL_SIZE,
            DEFAULT_KEEPALIVE_TIME, TimeUnit.SECONDS, Executors.defaultThreadFactory(), null);

        init(defaultExecutor, MANAGEABLE_EXECUTOR, eventTypes);
    }

    /**
     * Creates a filter that uses the given executor for the default event set.
     * The caller stays responsible for shutting the executor down.
     *
     * @param executor The executor to submit events to
     * @throws IllegalArgumentException if {@code executor} is {@code null}
     */
    public ExecutorFilter(Executor executor) {
        init(executor, NOT_MANAGEABLE_EXECUTOR);
    }

    /**
     * Creates a filter that uses the given executor for the given event types.
     * The caller stays responsible for shutting the executor down.
     *
     * @param executor The executor to submit events to
     * @param eventTypes The event types to hand over to the executor; the default
     *        set is used when none are given
     * @throws IllegalArgumentException if {@code executor} is {@code null} or
     *         {@link IoEventType#SESSION_CREATED} is requested
     */
    public ExecutorFilter(Executor executor, IoEventType... eventTypes) {
        init(executor, NOT_MANAGEABLE_EXECUTOR, eventTypes);
    }

    /**
     * Creates an {@link OrderedThreadPoolExecutor}.
     *
     * @param corePoolSize The initial pool size
     * @param maximumPoolSize The maximum pool size
     * @param keepAliveTime Default duration for a thread
     * @param unit Time unit used for the keepAlive value
     * @param threadFactory The factory used to create threads
     * @param queueHandler The queue used to store events
     * @return An instance of the created Executor
     */
    private Executor createDefaultExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime,
        TimeUnit unit, ThreadFactory threadFactory, IoEventQueueHandler queueHandler) {
        return new OrderedThreadPoolExecutor(corePoolSize, maximumPoolSize,
            keepAliveTime, unit, threadFactory, queueHandler);
    }

    /**
     * Builds the set of handled event types from an array and stores it in the
     * {@code eventTypes} field. A {@code null} or empty array selects the
     * default event set.
     *
     * @param eventTypes The array of handled events
     * @throws IllegalArgumentException if the array contains
     *         {@link IoEventType#SESSION_CREATED}; the field is reset to
     *         {@code null} in that case
     */
    private void initEventTypes(IoEventType... eventTypes) {
        IoEventType[] handled = eventTypes;

        if ((handled == null) || (handled.length == 0)) {
            handled = DEFAULT_EVENT_SET;
        }

        // EnumSet.of needs a first element; repeating it in the rest is harmless for a set.
        EnumSet<IoEventType> handledSet = EnumSet.of(handled[0], handled);

        // SESSION_CREATED must never be handed over to the executor.
        if (handledSet.contains(IoEventType.SESSION_CREATED)) {
            this.eventTypes = null;
            throw new IllegalArgumentException(IoEventType.SESSION_CREATED
                + " is not allowed.");
        }

        this.eventTypes = handledSet;
    }

    /**
     * Initializes this filter. Called by every constructor.
     *
     * @param executor The underlying {@link Executor} in charge of managing the Thread pool.
     * @param manageableExecutor Tells if the Executor's life cycle can be managed by this filter
     * @param eventTypes The list of events which are handled by the executor
     * @throws IllegalArgumentException if {@code executor} is {@code null} or the
     *         event types are rejected by {@link #initEventTypes(IoEventType...)}
     */
    private void init(Executor executor, boolean manageableExecutor, IoEventType... eventTypes) {
        if (executor == null) {
            throw new IllegalArgumentException("executor");
        }

        initEventTypes(eventTypes);
        this.executor = executor;
        this.manageableExecutor = manageableExecutor;
    }

    /**
     * Shuts down the underlying executor if this filter has been created via
     * a convenience constructor. Executors supplied by the caller are left alone.
     */
    @Override
    public void destroy() {
        // Guard the cast: a plain Executor offers no shutdown() to call.
        if (manageableExecutor && (executor instanceof ExecutorService)) {
            ((ExecutorService) executor).shutdown();
        }
    }

    /**
     * Returns the underlying {@link Executor} instance this filter uses.
     *
     * @return The underlying {@link Executor}
     */
    public final Executor getExecutor() {
        return executor;
    }

    /**
     * Fires the specified event through the underlying executor.
     * <p>
     * This is the key method of the filter: every event type this filter is
     * interested in ends up here and is submitted to the executor instead of
     * being processed by the calling thread.
     * NOTE(review): the original annotation states that IoFilterEvent's run()
     * calls fire(), which dispatches on the event type - not visible in this file.
     *
     * @param event The filtered event
     */
    protected void fireEvent(IoFilterEvent event) {
        executor.execute(event);
    }

    /**
     * {@inheritDoc}
     * <p>
     * If the event type is handled by this filter, the rest of the processing
     * is handed over to the executor; otherwise the event is passed straight to
     * the next filter. All the handlers below follow the same logic.
     */
    @Override
    public final void sessionOpened(NextFilter nextFilter, IoSession session) {
        if (eventTypes.contains(IoEventType.SESSION_OPENED)) {
            fireEvent(new IoFilterEvent(nextFilter, IoEventType.SESSION_OPENED, session, null));
        } else {
            nextFilter.sessionOpened(session);
        }
    }

    /**
     * {@inheritDoc}
     */
    @Override
    public final void sessionClosed(NextFilter nextFilter, IoSession session) {
        if (eventTypes.contains(IoEventType.SESSION_CLOSED)) {
            fireEvent(new IoFilterEvent(nextFilter, IoEventType.SESSION_CLOSED, session, null));
        } else {
            nextFilter.sessionClosed(session);
        }
    }

    /**
     * {@inheritDoc}
     */
    @Override
    public final void sessionIdle(NextFilter nextFilter, IoSession session, IdleStatus status) {
        if (eventTypes.contains(IoEventType.SESSION_IDLE)) {
            fireEvent(new IoFilterEvent(nextFilter, IoEventType.SESSION_IDLE, session, status));
        } else {
            nextFilter.sessionIdle(session, status);
        }
    }

    /**
     * {@inheritDoc}
     */
    @Override
    public final void exceptionCaught(NextFilter nextFilter, IoSession session, Throwable cause) {
        if (eventTypes.contains(IoEventType.EXCEPTION_CAUGHT)) {
            fireEvent(new IoFilterEvent(nextFilter, IoEventType.EXCEPTION_CAUGHT, session, cause));
        } else {
            nextFilter.exceptionCaught(session, cause);
        }
    }

    /**
     * {@inheritDoc}
     */
    @Override
    public final void messageReceived(NextFilter nextFilter, IoSession session, Object message) {
        if (eventTypes.contains(IoEventType.MESSAGE_RECEIVED)) {
            fireEvent(new IoFilterEvent(nextFilter, IoEventType.MESSAGE_RECEIVED, session, message));
        } else {
            nextFilter.messageReceived(session, message);
        }
    }

    /**
     * {@inheritDoc}
     */
    @Override
    public final void messageSent(NextFilter nextFilter, IoSession session, WriteRequest writeRequest) {
        if (eventTypes.contains(IoEventType.MESSAGE_SENT)) {
            fireEvent(new IoFilterEvent(nextFilter, IoEventType.MESSAGE_SENT, session, writeRequest));
        } else {
            nextFilter.messageSent(session, writeRequest);
        }
    }

    /**
     * {@inheritDoc}
     */
    @Override
    public final void filterWrite(NextFilter nextFilter, IoSession session, WriteRequest writeRequest) {
        if (eventTypes.contains(IoEventType.WRITE)) {
            fireEvent(new IoFilterEvent(nextFilter, IoEventType.WRITE, session, writeRequest));
        } else {
            nextFilter.filterWrite(session, writeRequest);
        }
    }

    /**
     * {@inheritDoc}
     */
    @Override
    public final void filterClose(NextFilter nextFilter, IoSession session) throws Exception {
        if (eventTypes.contains(IoEventType.CLOSE)) {
            fireEvent(new IoFilterEvent(nextFilter, IoEventType.CLOSE, session, null));
        } else {
            nextFilter.filterClose(session);
        }
    }
}