1. 程式人生 > >RxJava之執行緒切換原理分析

RxJava之執行緒切換原理分析

Scheduler翻譯成中文就是“排程”的意思,在RxJava裡就是執行緒之間的排程,也就是執行緒之間的切換。

 從圖中可以簡單看出,SingleScheduler、ComputationScheduler、IoScheduler、NewThreadScheduler的核心實現就是一個執行緒池,但該執行緒池裡只會有一個執行緒,而newScheduledThreadPool是可以延遲、定時執行的,所以我們可以認為SingleScheduler、ComputationScheduler、IoScheduler、NewThreadScheduler的核心實現就是一個可定時、可延遲執行及輪詢的特殊執行緒,下面簡稱執行緒。該執行緒是通過Observable的scheduleDirect方法來提交任務(基本上都是通過此方法來提交任務)。

    @NonNull
    public Disposable scheduleDirect(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) {
        final Worker w = createWorker();
        final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
        DisposeTask task = new DisposeTask(decoratedRun, w);
        w.schedule
(task, delay, unit); return task; }

createWorker是一個抽象的方法,在SingleScheduler、ComputationScheduler、IoScheduler、NewThreadScheduler中分別有不同的實現。

1、SingleScheduler原理分析

 createWorker在SingleScheduler中的實現如下,在呼叫ScheduledWorker的schedule方法將任務交給執行緒執行。

    public Worker createWorker() {
        //傳入一個有且只有一個執行緒的執行緒池,就是上面說的特殊執行緒
return new ScheduledWorker(executor.get()); } public Disposable schedule(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) { ... tasks.add(sr); try { Future<?> f; //將任務交給執行緒執行 if (delay <= 0L) { f = executor.submit((Callable<Object>)sr); } else { f = executor.schedule((Callable<Object>)sr, delay, unit); } sr.setFuture(f); } catch (RejectedExecutionException ex) { ... }

 SingleScheduler的原理比較簡單,到此就完畢了。但執行緒是在那裡建立的尼?其實在SingleScheduler的構造方法就已經建立了執行緒。由於SingleScheduler是單例實現,所有SingleScheduler中有且僅有一個執行緒。 那麼當多工且有耗時操作時,後面的任務就會等待。

    public SingleScheduler(ThreadFactory threadFactory) {
        this.threadFactory = threadFactory;
        executor.lazySet(createExecutor(threadFactory));
    }
    //建立執行緒池
    static ScheduledExecutorService createExecutor(ThreadFactory threadFactory) {
        return SchedulerPoolFactory.create(threadFactory);
    }
    //是SchedulerPoolFactory中的create方法
    public static ScheduledExecutorService create(ThreadFactory factory) {
        //建立執行緒池
        final ScheduledExecutorService exec = Executors.newScheduledThreadPool(1, factory);
        tryPutIntoPool(PURGE_ENABLED, exec);
        return exec;
    }

2、ComputationScheduler原理分析

 ComputationScheduler就比較麻煩一點了,圖中可以看出ComputationScheduler裡建立了一組執行緒。也是在構造方法中建立的,構造方法中呼叫start方法。

    public ComputationScheduler(ThreadFactory threadFactory) {
        this.threadFactory = threadFactory;
        this.pool = new AtomicReference<FixedSchedulerPool>(NONE);
        start();
    }
    public void start() {
        //MAX_THREADS = cap(Runtime.getRuntime().availableProcessors(), Integer.getInteger(KEY_MAX_THREADS, 0));
        FixedSchedulerPool update = new FixedSchedulerPool(MAX_THREADS, threadFactory);
        //用update替代預設的NONE,NONE中沒有任何執行緒建立
        if (!pool.compareAndSet(NONE, update)) {
            update.shutdown();
        }
    }

 在start中建立了FixedSchedulerPool物件,其中MAX_THREADS是根據Runtime.getRuntime().availableProcessors()計算出來的。來看一下FixedSchedulerPool物件的實現

    static final class FixedSchedulerPool implements SchedulerMultiWorkerSupport {
        final int cores;

        final PoolWorker[] eventLoops;
        long n;

        FixedSchedulerPool(int maxThreads, ThreadFactory threadFactory) {
            // initialize event loops
            this.cores = maxThreads;
            this.eventLoops = new PoolWorker[maxThreads];
            //每個PoolWorker代表一個Executors.newScheduledThreadPool(1, factory)的實現
            for (int i = 0; i < maxThreads; i++) {
                this.eventLoops[i] = new PoolWorker(threadFactory);
            }
        }
        //eventLoops中取出一個執行緒
        public PoolWorker getEventLoop() {
            int c = cores;
            if (c == 0) {
                return SHUTDOWN_WORKER;
            }
            // simple round robin, improvements to come
            return eventLoops[(int)(n++ % c)];
        }
        ...
        @Override
        public void createWorkers(int number, WorkerCallback callback) {
            int c = cores;
            if (c == 0) {
                for (int i = 0; i < number; i++) {
                    callback.onWorker(i, SHUTDOWN_WORKER);
                }
            } else {
                int index = (int)n % c;
                for (int i = 0; i < number; i++) {
                    callback.onWorker(i, new EventLoopWorker(eventLoops[index]));
                    if (++index == c) {
                        index = 0;
                    }
                }
                n = index;
            }
        }
    }

 在FixedSchedulerPool中建立了一組執行緒,當有任務時就取出其中一個執行緒來處理任務,如果任務超出執行緒數,那麼其他任務就會等待。下面來看schedule方法

    public Worker createWorker() {
        //pool.get返回的就是一個FixedSchedulerPool物件
        return new EventLoopWorker(pool.get().getEventLoop());
    }
    public Disposable schedule(@NonNull Runnable action) {
        if (disposed) {
            return EmptyDisposable.INSTANCE;
        }
        return poolWorker.scheduleActual(action, 0, TimeUnit.MILLISECONDS, serial);
    }

 由於poolWorker是NewThreadWorker的子類,所以就呼叫NewThreadWorker的scheduleActual方法。

    static final class PoolWorker extends NewThreadWorker {
        PoolWorker(ThreadFactory threadFactory) {
            super(threadFactory);
        }
    }
   public ScheduledRunnable scheduleActual(final Runnable run, long delayTime, @NonNull TimeUnit unit, @Nullable DisposableContainer parent) {
        Runnable decoratedRun = RxJavaPlugins.onSchedule(run);

        ScheduledRunnable sr = new ScheduledRunnable(decoratedRun, parent);

        if (parent != null) {
            if (!parent.add(sr)) {
                return sr;
            }
        }
        Future<?> f;
        try {
            if (delayTime <= 0) {
                f = executor.submit((Callable<Object>)sr);
            } else {
                f = executor.schedule((Callable<Object>)sr, delayTime, unit);
            }
            sr.setFuture(f);
        } catch (RejectedExecutionException ex) {
            if (parent != null) {
                parent.remove(sr);
            }
            RxJavaPlugins.onError(ex);
        }
        return sr;
    }

 在scheduleActual中也是將任務交給給執行緒執行,由於執行緒最多有MAX_THREADS個,所以當任務超過MAX_THREADS則會等待,直到其他任務執行完畢釋放執行緒。

3、IoScheduler原理解析

 IoScheduler就有點意思,它有對Worker進行了快取,實現了一個池CachedWorkerPoolCachedWorkerPool中有一個佇列ConcurrentLinkedQueue,它會儲存每次使用的執行緒,同時也在CachedWorkerPool中建立一個執行緒來不停的掃描(每隔60s掃描一次),判斷是否有超時的執行緒,如果有就從ConcurrentLinkedQueue中將執行緒移除掉。

   static final class CachedWorkerPool implements Runnable {
        //每個執行緒的存活時間
        private final long keepAliveTime;
        private final ConcurrentLinkedQueue<ThreadWorker> expiringWorkerQueue;
        final CompositeDisposable allWorkers;
        private final ScheduledExecutorService evictorService;
        private final Future<?> evictorTask;
        private final ThreadFactory threadFactory;

        CachedWorkerPool(long keepAliveTime, TimeUnit unit, ThreadFactory threadFactory) {
            this.keepAliveTime = unit != null ? unit.toNanos(keepAliveTime) : 0L;
            this.expiringWorkerQueue = new ConcurrentLinkedQueue<ThreadWorker>();
            this.allWorkers = new CompositeDisposable();
            this.threadFactory = threadFactory;

            ScheduledExecutorService evictor = null;
            Future<?> task = null;
            if (unit != null) {
                //建立一個新的執行緒,每keepAliveTime(預設是60s)執行一次,掃描是否有超時的執行緒
                evictor = Executors.newScheduledThreadPool(1, EVICTOR_THREAD_FACTORY);
                task = evictor.scheduleWithFixedDelay(this, this.keepAliveTime, this.keepAliveTime, TimeUnit.NANOSECONDS);
            }
            evictorService = evictor;
            evictorTask = task;
        }

        @Override
        public void run() {
            //每次掃描執行的方法
            evictExpiredWorkers();
        }

        ThreadWorker get() {
            if (allWorkers.isDisposed()) {
                return SHUTDOWN_THREAD_WORKER;
            }
            //判斷佇列是否為空,不為空則從佇列中取出
            while (!expiringWorkerQueue.isEmpty()) {
                ThreadWorker threadWorker = expiringWorkerQueue.poll();
                if (threadWorker != null) {
                    return threadWorker;
                }
            }
            //如果沒有快取的執行緒,就重新建立
            ThreadWorker w = new ThreadWorker(threadFactory);
            allWorkers.add(w);
            return w;
        }
        //當執行緒加入到佇列中
        void release(ThreadWorker threadWorker) {
            // Refresh expire time before putting worker back in pool
            //更新到期時間
            threadWorker.setExpirationTime(now() + keepAliveTime);
            expiringWorkerQueue.offer(threadWorker);
        }
        //如果執行緒超出存活時間就移除掉
        void evictExpiredWorkers() {
            if (!expiringWorkerQueue.isEmpty()) {
                long currentTimestamp = now();
                //遍歷佇列中的所有執行緒
                for (ThreadWorker threadWorker : expiringWorkerQueue) {
                    if (threadWorker.getExpirationTime() <= currentTimestamp) {
                        if (expiringWorkerQueue.remove(threadWorker)) {
                            allWorkers.remove(threadWorker);
                        }
                    } else {
                        // Queue is ordered with the worker that will expire first in the beginning, so when we
                        // find a non-expired worker we can stop evicting.
                        break;
                    }
                }
            }
        }
        //當前時間
        long now() {
            return System.nanoTime();
        }
        //中斷執行緒
        void shutdown() {
            allWorkers.dispose();
            if (evictorTask != null) {
                evictorTask.cancel(true);
            }
            if (evictorService != null) {
                evictorService.shutdownNow();
            }
        }
    }

 瞭解CachedWorkerPool後,IoScheduler基本上就沒什麼難點了,下面再來看createWorker的實現,首先從CachedWorkerPool中取出一個執行緒,然後在交給這個執行緒即可。

    public Worker createWorker() {
        return new EventLoopWorker(pool.get());
    }
    public Disposable schedule(@NonNull Runnable action, long delayTime, @NonNull TimeUnit unit) {
        if (tasks.isDisposed()) {
            // don't schedule, we are unsubscribed
            return EmptyDisposable.INSTANCE;
        }
        return threadWorker.scheduleActual(action, delayTime, unit, tasks);
        }
    }
    //ThreadWorker繼承與NewThreadWorker
    static final class ThreadWorker extends NewThreadWorker {
        //到期時間,到期後該執行緒就會被移除掉
        private long expirationTime;

        ThreadWorker(ThreadFactory threadFactory) {
            super(threadFactory);
            this.expirationTime = 0L;
        }
        //返回到期時間
        public long getExpirationTime() {
            return expirationTime;
        }
        //設定到期時間
        public void setExpirationTime(long expirationTime) {
            this.expirationTime = expirationTime;
        }
    }

 從上面看出ThreadWorker有一個到期時間,如果到期後就會被銷燬,否則繼續存在。

4、NewThreadScheduler原理解析

NewThreadScheduler就比較簡單,直接來一個任務就建立一個執行緒。

    public Worker createWorker() {
        return new NewThreadWorker(threadFactory);
    }

 這裡直接返回NewThreadWorker,這個類比較簡單。

public class NewThreadWorker extends Scheduler.Worker implements Disposable {
    private final ScheduledExecutorService executor;

    volatile boolean disposed;

    public NewThreadWorker(ThreadFactory threadFactory) {
        //建立一個執行緒,前面已經介紹
        executor = SchedulerPoolFactory.create(threadFactory);
    }

    @NonNull
    @Override
    public Disposable schedule(@NonNull final Runnable run) {
        return schedule(run, 0, null);
    }

    @NonNull
    @Override
    public Disposable schedule(@NonNull final Runnable action, long delayTime, @NonNull TimeUnit unit) {
        if (disposed) {
            return EmptyDisposable.INSTANCE;
        }
        return scheduleActual(action, delayTime, unit, null);
    }

    //提交任務給執行緒池,也就是前面說的特殊執行緒
    public Disposable scheduleDirect(final Runnable run, long delayTime, TimeUnit unit) {
        ScheduledDirectTask task = new ScheduledDirectTask(RxJavaPlugins.onSchedule(run));
        try {
            Future<?> f;
            if (delayTime <= 0L) {
                f = executor.submit(task);
            } else {
                f = executor.schedule(task, delayTime, unit);
            }
            task.setFuture(f);
            return task;
        } catch (RejectedExecutionException ex) {
            RxJavaPlugins.onError(ex);
            return EmptyDisposable.INSTANCE;
        }
    }

    //提交任務給執行緒池,也就是前面說的特殊執行緒
    public Disposable schedulePeriodicallyDirect(Runnable run, long initialDelay, long period, TimeUnit unit) {
        final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
        if (period <= 0L) {

            InstantPeriodicTask periodicWrapper = new InstantPeriodicTask(decoratedRun, executor);
            try {
                Future<?> f;
                if (initialDelay <= 0L) {
                    f = executor.submit(periodicWrapper);
                } else {
                    f = executor.schedule(periodicWrapper, initialDelay, unit);
                }
                periodicWrapper.setFirst(f);
            } catch (RejectedExecutionException ex) {
                RxJavaPlugins.onError(ex);
                return EmptyDisposable.INSTANCE;
            }

            return periodicWrapper;
        }
        ScheduledDirectPeriodicTask task = new ScheduledDirectPeriodicTask(decoratedRun);
        
            
           

相關推薦

RxJava執行切換原理分析

 Scheduler翻譯成中文就是“排程”的意思,在RxJava裡就是執行緒之間的排程,也就是執行緒之間的切換。  從圖中可以簡單看出,SingleScheduler、ComputationScheduler、IoScheduler、NewThreadSchedule

RxJava 執行切換原理

  RxJava的執行緒切換主要涉及到 observeOn(),subscribeOn() 我們來分析一下這兩個方法是怎麼做到切換的。 observeOn()作用於上一個構造好的Observable例項,RxJava設計比較巧妙的地方是,把執行緒切換的操作也封裝成了Ob

RxJava的訊息訂閱和執行切換原理

本文由玉剛說寫作平臺提供寫作贊助,版權歸玉剛說微信公眾號所有 原作者:四月葡萄 版權宣告:未經玉剛說許可,不得以任何形式轉載 1.前言 本文主要是對RxJava的訊息訂閱和執行緒切換進行原始碼分析,相關的使用方式等不作詳細介紹。 本文原始碼基於

RxJava執行切換原理

RxJava在圈子裡越來越火,相信很大的一個原因就是它的執行緒切換。它的執行緒切換可以用優雅來形容,鏈式呼叫,簡單、方便。今天,就讓我們來窺探一下RxJava的執行緒切換原理。本次拆輪子,還是按原樣,通過小例子,研讀RxJava原始碼等來理解整個過程、結構、原理,我們首要的

RxJava2探索-執行切換原理subscribeOn

前言 說起來有點丟人,上週去某公司面試,做足了什麼像java記憶體模型、hashmap原理、設計模式、Android多執行緒、自定義View等等相關的知識準備,然而面試的時候,前面幾個一個沒問!!!自定義view考察了onmeasure和Mnuspace那幾個

java併發程式設計一一執行原理分析(三)

合理的設定執行緒池的大小 接著上一篇探討執行緒留下的尾巴。如果合理的設定執行緒池的大小。 要想合理的配置執行緒池的大小、首先得分析任務的特性,可以從以下幾個角度分析: 1、任務的性質:CPU密集型任務、IO密集型任務、混合型任務等; 2、任務的優先順序:高、中、低; 3、任務的執行時

java併發程式設計一一執行原理分析(二)

2、執行緒池 1、什麼是執行緒池 Java中的執行緒池是運用場景最多的併發框架,幾乎所有需要非同步或併發執行任務的程式都可以使用執行緒池。 在開發工程中,合理的使用執行緒池能夠帶來3個好處。 第一:降低資源的消耗,通過重複利用已建立的執行緒降低執行緒建立和銷燬造成的消耗 第二:提

java併發程式設計一一執行原理分析(一)

1、併發包 1、CountDownLatch(計數器) CountDownLatch 類位於 java.util.concurrent 包下,利用它可以實現類似於計數器的功能。 比如有一個任務A,它要等待其他4個任務執行完成之後才能執行,此時就可以利用CountDownLatch

執行原理分析&鎖的深度化

執行緒池 什麼是執行緒池 Java中的線程池是運用場景最多的並發框架,幾乎所有需要非同步或並發執行任務的程式都可以使用線程池。在開發過程中,合理地使用線程池能夠帶來3個好處。第一:降低資源消耗。通過重複利用已創建的線程降低線程創建和銷毀造成的消耗。第二:提高響應速度。當任務

java併發包&執行原理分析&鎖的深度化

  併發包 同步容器類 Vector與ArrayList區別 1.ArrayList是最常用的List實現類,內部是通過陣列實現的,它允許對元素進行快速隨機訪問。陣列的缺點是每個元素之間不能有間隔,當陣列大小不滿足時需要增加儲存能力,就要講已經有陣列的資料

Java多執行系列--“JUC執行池”05 執行原理(四)

概要 本章介紹執行緒池的拒絕策略。內容包括: 拒絕策略介紹 拒絕策略對比和示例 拒絕策略介紹 執行緒池的拒絕策略,是指當任務新增到執行緒池中被拒絕,而採取的處理措施。 當任務新增到執行緒池中之所以被拒絕,可能是由於:第一,執行緒池異常關閉。第二,任務數量

java併發包&執行原理分析&鎖的深度化

將不安全的map集合轉成安全集合 HashMap HashMap = new HashMap<>(); Collections.synchronizedMap(HashMap); concurrentHasMap 開發推薦使用這種map———執行

執行原理分析整理

作業系統的設計,可以歸結為三點: (1)以多程序形式,允許多個任務同時執行; (2)以多執行緒形式,允許單個任務分成不同的部分執行; (3)提供協調機制,一方面防止程序之間和執行緒之間產生衝突,另一方面允許程序之間和執行緒之間共享資源。 多核、多處理器(多CPU)、多執

執行原理分析&鎖的深度化

執行緒池 什麼是執行緒池 Java中的線程池是運用場景最多的並發框架,幾乎所有需要非同步或並發執行任務的程式都可以使用線程池。在開發過程中,合理地使用線程池能夠帶來3個好處。第一:降低資源消耗

深入理解Java執行原理分析與使用(尤其當執行佇列滿了之後事項)

在這裡借花獻佛了,那別人的東西學一學了。在我們的開發中“池”的概念並不罕見,有資料庫連線池、執行緒池、物件池、常量池等等。下面我們主要針對執行緒池來一步一步揭開執行緒池的面紗。使用執行緒池的好處1、降低資源消耗可以重複利用已建立的執行緒降低執行緒建立和銷燬造成的消耗。2、提高響應速度當任務到達時,任務可以不需

Java多執行執行池深入分析

執行緒池是併發包裡面很重要的一部分,在實際情況中也是使用很多的一個重要元件。 下圖描述的是執行緒池API的一部分。廣義上的完整執行緒池可能還包括Thread/Runnable、Timer/TimerTask等部分。這裡只介紹主要的和高階的API以及架構和原理。 大

Java執行實現原理

 併發不一定依賴多執行緒(如PHP中很常見的多程序併發),但在Java裡談併發,大多數都與執行緒脫不了關係。執行緒是一種比程序更輕量級的排程執行單位,執行緒的引入,可以把一個程序的資源分配和排程執行分開,各個執行緒既可以共享程序資源(記憶體地址、檔案I/O等),又可以獨立排程(執行緒是C

【java併發程式設計】執行原理分析及ThreadPoolExecutor原始碼實現

執行緒池簡介:  多執行緒技術主要解決處理器單元內多個執行緒執行的問題,它可以顯著減少處理器單元的閒置時間,增加處理器單元的吞吐能力。         假設一個伺服器完成一項任務所需時間為:T1 建立執行緒時間,T2 線上程中執行任務的時間,T3 銷燬執行緒時間。    

基於Netty3的RPC架構筆記3執行模型原始碼分析

      隨著使用者量上升,專案的架構也在不斷的升級,由最開始的MVC的垂直架構(傳統專案)到RPC架構(webservice,rest,netty,mina),再到SOA模型(dubbo),再到最近的微服務,又比如Tomcat6之前的IO模型都是BIO 也就是阻塞IO,

Java併發/多執行-CAS原理分析

[toc] # 什麼是CAS CAS 即 compare and swap,比較並交換。 CAS是一種原子操作,同時 CAS 使用樂觀鎖機制。 J.U.C中的很多功能都是建立在 CAS 之上,各種原子類,其底層都用 CAS來實現原子操作。用來解決併發時的安全問題。 # 併發安全問題 ## 舉一個典