1. 程式人生 > >探究ElasticSearch中的執行緒池實現

探究ElasticSearch中的執行緒池實現

探究ElasticSearch中的執行緒池實現

ElasticSearch裡面各種操作都是基於執行緒池+回撥實現的,所以這篇文章記錄一下java.util.concurrent涉及執行緒池實現和ElasticSearch中如何自定義自己的執行緒池的。因為我們自己開發寫程式碼,也經常會用到執行緒池,一般很少有機會自己去擴充實現一個自己的執行緒池,比如下面是我經常用的套路,其中SidSearchExceptionHandlerSidSearchRejectExecutionHandler都只是簡單地記錄日誌。

//任務佇列    
private BlockingQueue<Runnable> taskQueue = new LinkedBlockingQueue<>(1024);
//執行緒在執行過程中的異常處理器
private SidSearchExceptionHandler exceptionHandler = new SidSearchExceptionHandler();
//向執行緒池提交任務時,拒絕策略
private SidSearchRejectExecutionHandler rejectExecutionHandler = new SidSearchRejectExecutionHandler();

//藉助Guava包中的ThreadFactoryBuild建立執行緒工廠(主要是方便指定執行緒的名稱,debug起來清晰)
    private ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("audio-%d").setUncaughtExceptionHandler(exceptionHandler).build();

//建立執行緒池
    private ThreadPoolExecutor executor = new ThreadPoolExecutor(nThreads, nThreads, 1, TimeUnit.DAYS, taskQueue, threadFactory, rejectExecutionHandler);

比如下面這個自定義執行緒執行時異常處理策略,線上程執行過程中丟擲異常時,只是簡單地列印日誌:

public class SidSearchExceptionHandler implements Thread.UncaughtExceptionHandler {

    public static final Logger logger = LoggerFactory.getLogger(SidSearchExceptionHandler.class);
    @Override
    public void uncaughtException(Thread t, Throwable e) {
        logger.error("sid search thread pool execution error,thread name:{},cause:{},msg:{}",
                t.getName(), e.getCause(), e.getMessage());
    }
}

因此,看ES自定義的執行緒池實現,看下大神們是如何繼承ThreadPoolExecutor,定義異常處理策略的。

執行緒池基礎知識

1. 定義任務

想要執行:任務、或者叫業務邏輯的載體是:通過定義一個類,implements Runnable介面,Override Runnable介面的run()方法,在run()方法裡面寫業務邏輯處理程式碼(比如將資料寫入到資料庫、向ElasticSearch提交查詢請求……)

2. 提交任務

執行 java.util.concurrent.Executor的 execute(Runnable runnable)方法,就能提交任務,執行緒池中某個具體的執行緒會執行提交的任務。

當所有的任務執行完成後,執行緒池是否要關閉?如果需要執行可返回結果的任務怎麼辦?於是乎ExecutorService 就擴充套件Executor介面:public interface ExecutorService extends Executor ,提供了這些功能。

3. 執行任務

相比於ExecutorServiceThreadPoolExecutor添加了兩個方法,這樣可以在任務執行前和執行完成後做一些處理。

protected void beforeExecute(Thread t, Runnable r) { }
protected void afterExecute(Runnable r, Throwable t) { }

ElasticSearch中的EsThreadPoolExecutor.java就實現了這兩個方法。

而真正的任務執行是在ThreadPoolExecutor的內部類Worker中run()方法實現

    private final class Worker extends AbstractQueuedSynchronizer implements Runnable
    {
    // 接受一個Runnable任務,然後執行ThreadFactory newThread()建立執行緒執行任務
            Worker(Runnable firstTask) {
            setState(-1); // inhibit interrupts until runWorker
            this.firstTask = firstTask;
            this.thread = getThreadFactory().newThread(this);
        }
        
        /** Delegates main run loop to outer runWorker  */
        public void run() {
            runWorker(this);
        }
    }
    

Work implements Runnable,呼叫ThreadPoolExecutor的 final void runWorker(Worker w)執行任務。

來看一下runWorker方法中的部分程式碼:

final void runWorker(Worker w) {
        Thread wt = Thread.currentThread();
        Runnable task = w.firstTask;
        try {
                 //任務執行前做一些處理
                    beforeExecute(wt, task);
                    Throwable thrown = null;
                    try {
                        //任務執行
                        task.run();
                    } catch (RuntimeException x) {
                        thrown = x; throw x;
                    } catch (Error x) {
                        thrown = x; throw x;
                    } catch (Throwable x) {
                        thrown = x; throw new Error(x);
                    } finally {
                        //任務執行完成後做一些處理
                        afterExecute(task, thrown);
                    }

任務是由具體的執行緒來執行的,因此還需要考慮執行緒是如何建立的。ThreadFactory定義了建立執行緒池的方法newThread

public interface ThreadFactory {
    /**
     * Constructs a new {@code Thread}.  Implementations may also initialize
     * priority, name, daemon status, {@code ThreadGroup}, etc.
     *
     * @param r a runnable to be executed by new thread instance
     * @return constructed thread, or {@code null} if the request to
     *         create a thread is rejected
     */
    Thread newThread(Runnable r);
}

Executors工具類裡面定義了具體的工廠類,用來建立執行緒池

    /**
     * The default thread factory
     */
    static class DefaultThreadFactory implements ThreadFactory {
        private static final AtomicInteger poolNumber = new AtomicInteger(1);
        private final ThreadGroup group;
        private final AtomicInteger threadNumber = new AtomicInteger(1);
        private final String namePrefix;

        DefaultThreadFactory() {
            SecurityManager s = System.getSecurityManager();
            group = (s != null) ? s.getThreadGroup() :
                                  Thread.currentThread().getThreadGroup();
            namePrefix = "pool-" +
                          poolNumber.getAndIncrement() +
                         "-thread-";
        }

        public Thread newThread(Runnable r) {
            Thread t = new Thread(group, r,
                                  namePrefix + threadNumber.getAndIncrement(),
                                  0);
            if (t.isDaemon())
                t.setDaemon(false);
            if (t.getPriority() != Thread.NORM_PRIORITY)
                t.setPriority(Thread.NORM_PRIORITY);
            return t;
        }
    }

ElasticSearch 原始碼執行緒池實現

1. EsThreadFactory建立執行緒

EsExecutors的內部類EsThreadFactory

    static class EsThreadFactory implements ThreadFactory {

        final ThreadGroup group;
        final AtomicInteger threadNumber = new AtomicInteger(1);
        final String namePrefix;

        EsThreadFactory(String namePrefix) {
            this.namePrefix = namePrefix;
            SecurityManager s = System.getSecurityManager();
            group = (s != null) ? s.getThreadGroup() :
                    Thread.currentThread().getThreadGroup();
        }

        @Override
        public Thread newThread(Runnable r) {
            Thread t = new Thread(group, r,
                    namePrefix + "[T#" + threadNumber.getAndIncrement() + "]",
                    0);
            t.setDaemon(true);
            return t;
        }

    }

執行緒組、執行緒數量、執行緒名稱

在建立執行緒時,一般會為之指定一個執行緒執行的異常處理策略。驚奇的是EsThreadFactory裡面並沒有顯示地定義執行緒執行時的異常處理策略(可能在其他程式碼中,通過匿名內部類的方式定義了異常處理策略吧)。而是使用ThreadGroup中定義的預設異常處理策略:

public class ThreadGroup implements Thread.UncaughtExceptionHandler {

如果要自定義執行緒執行過程中出現異常的處理策略,只需要 implements Thread.UncaughtExceptionHandler並且重寫它的uncaughtException(Thread t, Throwable e)方法即可。如果未提供執行緒執行過程中出現異常的處理策略,那麼就使用該預設的異常處理策略。

java.lang.ThreadGroup裡面的uncaughtException(Thread t, Throwable e)方法的註釋:

Called by the Java Virtual Machine when a thread in this thread group stops because of an uncaught exception, and the thread does not have a specific Thread.UncaughtExceptionHandler installed.
The uncaughtException method of ThreadGroup does the following:
If this thread group has a parent thread group, the uncaughtException method of that parent is called with the same two arguments.


Otherwise, this method checks to see if there is a Thread.getDefaultUncaughtExceptionHandler    default uncaught exception handler installed, and if so, its uncaughtException method is called with the same two arguments.

如果在建立執行緒工廠的時候指定了UncaughtExceptionHandler,通過Thread.getDefaultUncaughtExceptionHandler 就能獲取到。

//在建立執行緒工廠時呼叫setUncaughtExceptionHandler方法設定一個自定義的:UncaughtExceptionHandler
//若線上程執行過程中出現了異常,那麼 exceptionHandler 物件的uncaughtException(Thread t, Throwable e) 方 //法就會被呼叫
   private ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("audio-%d").setUncaughtExceptionHandler(exceptionHandler).build();
Otherwise, this method determines if the Throwable argument is an instance of ThreadDeath. If so, nothing special is done. Otherwise, a message containing the thread's name, as returned from the thread's Thread.getName method, and a stack backtrace,using the Throwable's Throwable.printStackTrace method, is printed to the System err

當未指定異常處理器時,若引數Throwable e是一個ThreadDeath物件,那麼什麼也不做。

如果引數Throwable e不是一個ThreadDeath物件,那麼就會通過方法Throwable.printStackTrac列印異常

2.EsThreadPoolExecutor 建立執行緒池

public class EsThreadPoolExecutor extends ThreadPoolExecutor {

    private final ThreadContext contextHolder;
    private volatile ShutdownListener listener;
A ThreadContext is a map of string headers and a transient map of keyed objects that are associated with  a thread. It allows to store and retrieve header information across method calls, network calls as well as threads spawned from a thread that has a ThreadContext associated with.

從它的構造方法中可看出,多了個ThreadContext(多了儲存一些執行緒執行上下文資訊的功能)

    EsThreadPoolExecutor(String name, int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
            BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, XRejectedExecutionHandler handler,
            ThreadContext contextHolder) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler);
        this.name = name;
        this.contextHolder = contextHolder;
    }

再看EsThreadPoolExecutor Override ThreadPoolExecutor 的execute()方法:

    @Override
    public void execute(final Runnable command) {
        doExecute(wrapRunnable(command));
    }

    protected void doExecute(final Runnable command) {
        try {
            super.execute(command);
        } catch (EsRejectedExecutionException ex) {
            if (command instanceof AbstractRunnable) {
                // If we are an abstract runnable we can handle the rejection
                // directly and don't need to rethrow it.
                try {
                    ((AbstractRunnable) command).onRejection(ex);
                } finally {
                    ((AbstractRunnable) command).onAfter();

                }
            } else {
                throw ex;
            }
        }
    }

doExecute()先執行super.execute(command);在這裡面有任務拒絕策略的檢查邏輯,如果任務被拒絕了,就會呼叫EsAbortPolicyrejectedExecution()

        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
            if (! isRunning(recheck) && remove(command))
                //呼叫拒絕策略
                reject(command);
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
    /**
     * Invokes the rejected execution handler for the given command.
     * Package-protected for use by ScheduledThreadPoolExecutor.
     */
    final void reject(Runnable command) {
        //handler 就是在new ThreadPoolExecutor物件 時傳遞的 RejectedExecutionHandler物件
        handler.rejectedExecution(command, this);
    }

然後可以在doExecute()裡面多做一些額外的處理:((AbstractRunnable) command).onRejection(ex);任務被拒絕之後發個訊息通知啥的。

ElasticSearch中的拒絕策略實現EsAbortPolicy

public class EsAbortPolicy implements XRejectedExecutionHandler {
    private final CounterMetric rejected = new CounterMetric();

    @Override
    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
        if (r instanceof AbstractRunnable) {
            //判斷任務是否要強制執行
            if (((AbstractRunnable) r).isForceExecution()) {
                BlockingQueue<Runnable> queue = executor.getQueue();
                //建立ThreadPoolExecutor指定的 任務佇列 型別是SizeBlockingQueue
                if (!(queue instanceof SizeBlockingQueue)) {
                    throw new IllegalStateException("forced execution, but expected a size queue");
                }
                try {
                    //儘管任務執行失敗了,還是再一次把它提交到任務佇列,這樣拒絕的任務又可以有執行機會了
                    ((SizeBlockingQueue) queue).forcePut(r);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    throw new IllegalStateException("forced execution, but got interrupted", e);
                }
                return;
            }
        }
        rejected.inc();
        throw new EsRejectedExecutionException("rejected execution of " + r + " on " + executor, executor.isShutdown());
    }

    @Override
    public long rejected() {
        return rejected.count();
    }
}
public interface XRejectedExecutionHandler extends RejectedExecutionHandler {

    /**
     * The number of rejected executions.
     */
    long rejected();
}

XRejectedExecutionHandler統計任務被拒絕的次數。用的是java.util.concurrent.atomic.LongAdder,又發現了一個新的計數器:關於LongAdder與AtomicLong的對比

看完這個實現,是不是下次也可以模仿實現:當向 執行緒池 提交任務被拒絕了,也能夠失敗重試^~^

前面講了這麼多,都是在對比ElasticSearch中的執行緒池與JDK併發包中的執行緒池背後執行的一些原理。ElasticSearch中的自定義執行緒池就是基於JDK併發包中的執行緒池實現的。

下面來正式分析下ElasticSearch原始碼中執行緒池建立流程。

節點啟動過程中,org.elasticsearch.node.Node.java 開始建立執行緒池:

final ThreadPool threadPool = new ThreadPool(settings, executorBuilders.toArray(new ExecutorBuilder[0]));

看ThreadPool原始碼:裡面有很多例項變數,如下:

public class ThreadPool extends AbstractComponent implements Scheduler, Closeable {
    

private Map<String, ExecutorHolder> executors = new HashMap<>();
static final ExecutorService DIRECT_EXECUTOR = EsExecutors.newDirectExecutorService();
private final Map<String, ExecutorBuilder> builders;
private final ThreadContext threadContext;
private final ScheduledThreadPoolExecutor scheduler;

比如說:ExecutorService DIRECT_EXECUTOR = EsExecutors.newDirectExecutorService();就是一個執行緒池。還有一些執行緒池是通過ExecutorBuilder來建立的(Map<String, ExecutorBuilder> builders)

執行緒池型別:ThreadPool的內部類ThreadPoolType

    public enum ThreadPoolType {
        DIRECT("direct"),
        FIXED("fixed"),
        FIXED_AUTO_QUEUE_SIZE("fixed_auto_queue_size"),
        SCALING("scaling");

一個HashMap儲存執行緒池名稱,以及相應的型別。

    static {
        HashMap<String, ThreadPoolType> map = new HashMap<>();
        map.put(Names.SAME, ThreadPoolType.DIRECT);
        map.put(Names.GENERIC, ThreadPoolType.SCALING);
        map.put(Names.LISTENER, ThreadPoolType.FIXED);
        map.put(Names.GET, ThreadPoolType.FIXED);
        map.put(Names.ANALYZE, ThreadPoolType.FIXED);
        map.put(Names.INDEX, ThreadPoolType.FIXED);
        map.put(Names.WRITE, ThreadPoolType.FIXED);
        map.put(Names.SEARCH, ThreadPoolType.FIXED_AUTO_QUEUE_SIZE);
        map.put(Names.MANAGEMENT, ThreadPoolType.SCALING);
        map.put(Names.FLUSH, ThreadPoolType.SCALING);
        map.put(Names.REFRESH, ThreadPoolType.SCALING);
        map.put(Names.WARMER, ThreadPoolType.SCALING);
        map.put(Names.SNAPSHOT, ThreadPoolType.SCALING);
        map.put(Names.FORCE_MERGE, ThreadPoolType.FIXED);
        map.put(Names.FETCH_SHARD_STARTED, ThreadPoolType.SCALING);
        map.put(Names.FETCH_SHARD_STORE, ThreadPoolType.SCALING);
        THREAD_POOL_TYPES = Collections.unmodifiableMap(map);
    }

而真正建立執行緒池的程式碼,是在ThreadPool的構造方法中的for迴圈final ExecutorHolder executorHolder = entry.getValue().build(executorSettings, threadContext);,這行語句的build方法。

        for (@SuppressWarnings("unchecked") final Map.Entry<String, ExecutorBuilder> entry : builders.entrySet()) {
            final ExecutorBuilder.ExecutorSettings executorSettings = entry.getValue().getSettings(settings);
            final ExecutorHolder executorHolder = entry.getValue().build(executorSettings, threadContext);
            if (executors.containsKey(executorHolder.info.getName())) {
                throw new IllegalStateException("duplicate executors with name [" + executorHolder.info.getName() + "] registered");
            }
            logger.debug("created thread pool: {}", entry.getValue().formatInfo(executorHolder.info));
            executors.put(entry.getKey(), executorHolder);
        }

前面列舉類 ThreadPoolType 中有四種類型的執行緒池,對應著上圖的三個ExecutorBuild類,看org.elasticsearch.threadpool.FixedExecutorBuilder的build方法:建立執行緒池需要引數FixedExecutorSettings,需要儲存執行緒上下文 ThreadContext

    @Override
    ThreadPool.ExecutorHolder build(final FixedExecutorSettings settings, final ThreadContext threadContext) {
        int size = settings.size;
        int queueSize = settings.queueSize;
        final ThreadFactory threadFactory = EsExecutors.daemonThreadFactory(EsExecutors.threadName(settings.nodeName, name()));
        final ExecutorService executor =
                EsExecutors.newFixed(settings.nodeName + "/" + name(), size, queueSize, threadFactory, threadContext);
        final String name;
        if ("write".equals(name()) && Booleans.parseBoolean(System.getProperty("es.thread_pool.write.use_bulk_as_display_name", "false"))) {
            name = "bulk";
        } else {
            name = name();
        }
        final ThreadPool.Info info =
            new ThreadPool.Info(name, ThreadPool.ThreadPoolType.FIXED, size, size, null, queueSize < 0 ? null : new SizeValue(queueSize));
        return new ThreadPool.ExecutorHolder(executor, info);
    }

其中的這兩行程式碼:

final ThreadFactory threadFactory = EsExecutors.daemonThreadFactory(EsExecutors.threadName(settings.nodeName, name()));

構建執行緒工廠。

 final ExecutorService executor =
                EsExecutors.newFixed(settings.nodeName + "/" + name(), size, queueSize, threadFactory, threadContext);

構建執行緒池。

至此,ElasticSearch構建執行緒池整個流程就是這樣了。

構建出來的執行緒池被封裝在ThreadPool.ExecutorHolder類中new ThreadPool.ExecutorHolder(executor, info);

final ThreadPool.Info info =
            new ThreadPool.Info(name, ThreadPool.ThreadPoolType.FIXED, size, size, null, queueSize < 0 ? null : new SizeValue(queueSize));

        return new ThreadPool.ExecutorHolder(executor, info);

當所有的執行緒池構造完成後,在節點啟動過程中初始化各種服務時,new 這些物件時,都需要傳一個ThreadPool 引數,各個服務就可以使用執行緒池來執行任務了。org.elasticsearch.node.Node.java 中程式碼:

//構造好各種執行緒池
final ThreadPool threadPool = new ThreadPool(settings, executorBuilders.toArray(new ExecutorBuilder[0]));

//clusterService 用到了threadPool
final ClusterService clusterService = new ClusterService(settings, settingsModule.getClusterSettings(), threadPool,
               ClusterModule.getClusterStateCustomSuppliers(clusterPlugins));


//monitorService 用到了threadPool
final MonitorService monitorService = new MonitorService(settings, nodeEnvironment, threadPool, clusterInfoService);

//actionModule
ActionModule actionModule = new ActionModule(false, settings, clusterModule.getIndexNameExpressionResolver(),
                settingsModule.getIndexScopedSettings(), settingsModule.getClusterSettings(), settingsModule.getSettingsFilter(),
                threadPool, pluginsService.filterPlugins(ActionPlugin.class), client, circuitBreakerService, usageService);

//...在new 很多其他 XXXService時,都需要傳一個ThreadPool引數。

因此,可以說ThreadPool在ElasticSearch各種操作中無處不在。哈哈。

總結

這篇文章寫得有點亂,主要兩個方面:一個是JDK包中原生執行緒池相關功能介紹,然後對比ElasticSearch中如何實現自定義的執行緒池。分析了ElasticSearch中自定義執行緒池任務提交時的拒絕策略和執行緒執行過程中丟擲異常時的異常處理策略。然後大概分析下ElasticSearch中執行緒池的建立流程:從org.elasticsearch.node.Node開始:

主要涉及到以下類:

org.elasticsearch.common.util.concurrent.EsThreadPoolExecutor

org.elasticsearch.threadpool.ExecutorBuilder的三個子類:

  • org.elasticsearch.threadpool.FixedExecutorBuilder
  • org.elasticsearch.threadpool.AutoQueueAdjustingExecutorBuilder
  • org.elasticsearch.threadpool.ScalingExecutorBuilder

org.elasticsearch.threadpool.ThreadPool