Netty source code reading and analysis: HashedWheelTimer

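Netty is a low-level networking framework that manages a large number of connections, possibly up to millions, and nearly every connection carries some timeout tasks, such as write timeouts and heartbeat checks. Starting a separate Timer for each connection would be inefficient and would waste resources. Netty instead manages this large volume of timed tasks with the timing wheel proposed in the paper "Hashed and hierarchical timing wheels: data structures for the efficient implementation of a timer facility". The implementation lives in the HashedWheelTimer class, and it works as follows: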

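A timing wheel is a circular data structure. Think of it as a clock face divided into slots: each slot represents a fixed span of time and holds a linked list of the timeout tasks that fall due in that span. A pointer advances one slot at a time, and when it reaches a slot, the tasks stored in that slot are executed. Tasks are placed into slots according to a fixed rule, as shown in the figure below: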


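Take the wheel in the figure as an example. Suppose each slot represents 1s, so the wheel covers 8s in total, and suppose the pointer currently points to slot 3. A task that must run in 3s goes into slot 3+3=6; a task that must run in 6s goes into slot (3+6)%8=1. Now let's see how Netty's HashedWheelTimer class implements this algorithm, starting with the constructor: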
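The slot arithmetic above can be checked with a minimal sketch. The class and method names here are hypothetical and are not part of Netty; the sketch only reproduces the modulo rule from the example.

```java
// Hypothetical helper, not part of Netty: maps a delay to a slot on a fixed-size wheel.
final class WheelSlotExample {
    /** Returns the slot for a task due delayTicks ticks after the pointer's current slot. */
    static int slotFor(int currentSlot, int delayTicks, int wheelSize) {
        return (currentSlot + delayTicks) % wheelSize;
    }

    public static void main(String[] args) {
        System.out.println(slotFor(3, 3, 8)); // prints 6
        System.out.println(slotFor(3, 6, 8)); // prints 1
    }
}
```

A delay longer than one full turn wraps onto the same slots, so a real implementation also has to record how many turns remain before the task is due; this sketch ignores that case.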

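public HashedWheelTimer(
            ThreadFactory threadFactory, // used to create the worker thread
            long tickDuration, // duration of one slot, i.e. how often the pointer advances
            TimeUnit unit, // time unit of tickDuration
            int ticksPerWheel, // number of slots in one full turn of the wheel
            boolean leakDetection, // whether to enable memory leak detection
            long maxPendingTimeouts // cap on pending timeouts; a value <= 0 disables the check
        ) {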

        if (threadFactory == null) {
            throw new NullPointerException("threadFactory");
        }
        if (unit == null) {
            throw new NullPointerException("unit");
        }
        if (tickDuration <= 0) {
            throw new IllegalArgumentException("tickDuration must be greater than 0: " + tickDuration);
        }
        if (ticksPerWheel <= 0) {
            throw new IllegalArgumentException("ticksPerWheel must be greater than 0: " + ticksPerWheel);
        }

        // Normalize ticksPerWheel to power of two and initialize the wheel.
        wheel = createWheel(ticksPerWheel); // create the wheel; the slot count is a power of two
        mask = wheel.length - 1; // with a power-of-two slot count, "& mask" replaces the slower % operation

        // Convert tickDuration to nanos.
        this.tickDuration = unit.toNanos(tickDuration); // convert to nanoseconds

        // Prevent overflow: the tick interval must not be so long that
        // tickDuration * wheel.length exceeds Long.MAX_VALUE.
        if (this.tickDuration >= Long.MAX_VALUE / wheel.length) {
            throw new IllegalArgumentException(String.format(
                    "tickDuration: %d (expected: 0 < tickDuration in nanos < %d",
                    tickDuration, Long.MAX_VALUE / wheel.length));
        }
        workerThread = threadFactory.newThread(worker);

        leak = leakDetection || !workerThread.isDaemon() ? leakDetector.track(this) : null;

        this.maxPendingTimeouts = maxPendingTimeouts;

        if (INSTANCE_COUNTER.incrementAndGet() > INSTANCE_COUNT_LIMIT &&
            WARNED_TOO_MANY_INSTANCES.compareAndSet(false, true)) {
            reportTooManyInstances();
        }
    }
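The constructor relies on the slot count being a power of two so that a bit mask can stand in for the remainder operation. The following is a minimal sketch of that idea. `nextPowerOfTwo` and `slotByMask` are hypothetical names chosen for illustration; Netty's own normalization method is not shown in this article and may be written differently.

```java
// Hypothetical illustration of power-of-two sizing and mask-based indexing.
final class MaskExample {
    /** Rounds n up to the nearest power of two; n is assumed to be in 1..2^30. */
    static int nextPowerOfTwo(int n) {
        int p = 1;
        while (p < n) {
            p <<= 1;
        }
        return p;
    }

    /** Slot index via bit mask; only valid when wheelLength is a power of two. */
    static int slotByMask(long tick, int wheelLength) {
        return (int) (tick & (wheelLength - 1));
    }

    public static void main(String[] args) {
        int length = nextPowerOfTwo(10);
        System.out.println(length);                 // prints 16
        System.out.println(slotByMask(35, length)); // prints 3, the same as 35 % 16
    }
}
```

For non-negative ticks and a power-of-two length, `tick & (length - 1)` and `tick % length` give the same result, which is why the wheel size is normalized first.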
Next, the method that creates the timing wheel:
private static HashedWheelBucket[] createWheel(int ticksPerWheel) {
        if (ticksPerWheel <= 0) {
            throw new IllegalArgumentException(
                    "ticksPerWheel must be greater than 0: " + ticksPerWheel);
        }
        if (ticksPerWheel > 1073741824) {
            throw new IllegalArgumentException(
                    "ticksPerWheel may not be greater than 2^30: " + ticksPerWheel);
        }

        ticksPerWheel = normalizeTicksPerWheel(ticksPerWheel); // ensure the slot count is a power of two
        HashedWheelBucket[] wheel = new HashedWheelBucket[ticksPerWheel];
        for (int i = 0; i < wheel.length; i ++) {
            wheel[i] = new HashedWheelBucket();
        }
        return wheel;
    }
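Here we can see that a timing wheel is essentially an array (more precisely a circular array, matching the ring structure), and each element, a HashedWheelBucket, is a linked list: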
private static final class HashedWheelBucket {
        // Used for the linked-list datastructure
        private HashedWheelTimeout head;
        private HashedWheelTimeout tail;
	.....
}
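To make the head/tail structure concrete, here is a minimal sketch of a bucket as a linked list. `BucketExample`, `Node`, `add` and `drain` are hypothetical names for illustration; the sketch uses a singly linked list for brevity and does not claim to match the elided body of HashedWheelBucket.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a wheel bucket; names are illustrative, not Netty's.
final class BucketExample {
    /** One queued task; stands in for HashedWheelTimeout. */
    static final class Node {
        final String name;
        Node next;

        Node(String name) {
            this.name = name;
        }
    }

    private Node head;
    private Node tail;

    /** Appends a task at the tail in O(1). */
    void add(Node node) {
        if (head == null) {
            head = tail = node;
        } else {
            tail.next = node;
            tail = node;
        }
    }

    /** Empties the bucket and returns the task names in insertion order. */
    List<String> drain() {
        List<String> names = new ArrayList<>();
        for (Node n = head; n != null; n = n.next) {
            names.add(n.name);
        }
        head = tail = null;
        return names;
    }

    public static void main(String[] args) {
        BucketExample bucket = new BucketExample();
        bucket.add(new Node("heartbeat"));
        bucket.add(new Node("write-timeout"));
        System.out.println(bucket.drain()); // prints [heartbeat, write-timeout]
    }
}
```

Keeping both a head and a tail reference lets the bucket append in constant time while still being walked from the oldest entry.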
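Next come starting the wheel, stopping it, and adding tasks. Starting first:
// Starts the timing wheel. newTimeout() calls this method automatically when a task is added,
// because running a wheel that holds no tasks would only waste resources.
public void start() {
        // Check the current state: if INIT, start the worker thread and with it the whole wheel;
        // if already STARTED, do nothing; if SHUTDOWN, throw. Several threads may race to start
        // the wheel, so a lock-free CAS is used here.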
	switch (WORKER_STATE_UPDATER.get(this)) {
            case WORKER_STATE_INIT:
                if (WORKER_STATE_UPDATER.compareAndSet(this, WORKER_STATE_INIT, WORKER_STATE_STARTED)) {
                    workerThread.start();
                }
                break;
            case WORKER_STATE_STARTED:
                break;
            case WORKER_STATE_SHUTDOWN:
                throw new IllegalStateException("cannot be started once stopped");
            default:
                throw new Error("Invalid WorkerState");
        }

        // Wait until the startTime is initialized by the worker.
        while (startTime == 0) {
            try {
                startTimeInitialized.await(); // wait for the worker to start
            } catch (InterruptedException ignore) {
                // Ignore - it will be ready very soon.
            }
        }
    }
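The CAS-guarded start can be isolated in a minimal sketch. It uses an AtomicInteger instead of the field updater in the listing, and the class, the `launches` counter and the `race` helper are hypothetical names added to make the effect observable: however many threads call `start()`, the start action runs once.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch of a lock-free, start-once state machine.
final class CasStartExample {
    static final int INIT = 0;
    static final int STARTED = 1;
    static final int SHUTDOWN = 2;

    private final AtomicInteger state = new AtomicInteger(INIT);
    private final AtomicInteger launches = new AtomicInteger();

    void start() {
        switch (state.get()) {
            case INIT:
                // Only the thread that wins the CAS performs the start action.
                if (state.compareAndSet(INIT, STARTED)) {
                    launches.incrementAndGet();
                }
                break;
            case STARTED:
                break;
            case SHUTDOWN:
                throw new IllegalStateException("cannot be started once stopped");
            default:
                throw new AssertionError("invalid state");
        }
    }

    /** Calls start() from several threads and returns how often the start action ran. */
    static int race(int threads) {
        CasStartExample timer = new CasStartExample();
        Thread[] callers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            callers[i] = new Thread(timer::start);
            callers[i].start();
        }
        try {
            for (Thread caller : callers) {
                caller.join();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        }
        return timer.launches.get();
    }

    public static void main(String[] args) {
        System.out.println(race(8)); // prints 1
    }
}
```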
Next, stop:
public Set<Timeout> stop() {
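        // The worker thread must not stop the wheel, i.e. a submitted timer task cannot call this
        // method while it runs. This keeps a misbehaving task from stopping the timer and
        // disabling every other task.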
        if (Thread.currentThread() == workerThread) {
            throw new IllegalStateException(
                    HashedWheelTimer.class.getSimpleName() +
                            ".stop() cannot be called from " +
                            TimerTask.class.getSimpleName());
        }
        // Try to CAS the state from STARTED (1) to SHUTDOWN (2). If that fails, the state can only
        // be INIT (0) or SHUTDOWN (2), so set it to SHUTDOWN (2) unconditionally.
        if (!WORKER_STATE_UPDATER.compareAndSet(this, WORKER_STATE_STARTED, WORKER_STATE_SHUTDOWN)) {
            // workerState can be 0 or 2 at this moment - let it always be 2.
            if (WORKER_STATE_UPDATER.getAndSet(this, WORKER_STATE_SHUTDOWN) != WORKER_STATE_SHUTDOWN) {
                INSTANCE_COUNTER.decrementAndGet();
                if (leak != null) {
                    boolean closed = leak.close(this);
                    assert closed;
                }
            }

            return Collections.emptySet();
        }
	
        try {
            boolean interrupted = false;
            while (workerThread.isAlive()) {
                workerThread.interrupt(); // interrupt the worker thread
                try {
                    workerThread.join(100);
                } catch (InterruptedException ignored) {
                    interrupted = true;
                }
            }

            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        } finally {
            INSTANCE_COUNTER.decrementAndGet();
            if (leak != null) {
                boolean closed = leak.close(this);
                assert closed;
            }
        }
        return worker.unprocessedTimeouts(); // return the tasks that were never processed
    }
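The interrupt-and-join loop in stop() can be tried on its own. In the sketch below, `StopExample`, `startSleepingWorker` and `stopWorker` are hypothetical names, and the worker is just a thread that sleeps until interrupted; it stands in for the timer's worker and does no real work.

```java
// Hypothetical sketch of stopping a worker thread by interrupting and joining it.
final class StopExample {
    /** Starts a worker that sleeps in a loop and exits when interrupted. */
    static Thread startSleepingWorker() {
        Thread worker = new Thread(() -> {
            try {
                while (true) {
                    Thread.sleep(1_000);
                }
            } catch (InterruptedException e) {
                // Interrupted by stopWorker(): fall through and let the thread end.
            }
        });
        worker.start();
        return worker;
    }

    /** Interrupts the worker until it has terminated; returns true once it is no longer alive. */
    static boolean stopWorker(Thread worker) {
        boolean interrupted = false;
        while (worker.isAlive()) {
            worker.interrupt();
            try {
                worker.join(100);
            } catch (InterruptedException e) {
                // Remember the interrupt and keep waiting for the worker.
                interrupted = true;
            }
        }
        if (interrupted) {
            // Restore the caller's interrupt status.
            Thread.currentThread().interrupt();
        }
        return !worker.isAlive();
    }

    public static void main(String[] args) {
        System.out.println(stopWorker(startSleepingWorker())); // prints true
    }
}
```

Retrying the interrupt inside the loop covers a worker that swallows one interrupt, and restoring the caller's interrupt flag afterwards keeps the caller's own cancellation signal intact.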
Next, the method that adds a task:
public Timeout newTimeout(TimerTask task, long delay, TimeUnit unit) {
        if (task == null) {
            throw new NullPointerException("task");
        }
        if (unit == null) {
            throw new NullPointerException("unit");
        }

        long pendingTimeoutsCount = pendingTimeouts.incrementAndGet();

        if (maxPendingTimeouts > 0 && pendingTimeoutsCount > maxPendingTimeouts) {
            pendingTimeouts.decrementAndGet();
            throw new RejectedExecutionException("Number of pending timeouts ("
                + pendingTimeoutsCount + ") is greater than or equal to maximum allowed pending "
                + "timeouts (" + maxPendingTimeouts + ")");
        }

        start();

        // Add the timeout to the timeout queue which will be processed on the next tick.
        // During processing all the queued HashedWheelTimeouts will be added to the correct HashedWheelBucket.
        long deadline = System.nanoTime() + unit.toNanos(delay) - startTime;

        // Guard against overflow.
        if (delay > 0 && deadline < 0) {
            deadline = Long.MAX_VALUE;
        }
        HashedWheelTimeout timeout = new HashedWheelTimeout(this, task, deadline);
        timeouts.add(timeout);
        return timeout;
    }
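In short, the task is wrapped in a HashedWheelTimeout object and placed on the timeouts queue; as the comment in the code notes, the queued timeouts are moved into the correct HashedWheelBucket on the next tick.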
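The deadline computation and its overflow guard can be checked in isolation. The sketch below copies the arithmetic from newTimeout() into a hypothetical static method, with the current time and the start time passed in as parameters so the result is deterministic.

```java
import java.util.concurrent.TimeUnit;

// Hypothetical sketch of the relative-deadline arithmetic with an overflow guard.
final class DeadlineExample {
    /** Deadline in nanoseconds relative to the timer's start time, clamped on overflow. */
    static long deadline(long nowNanos, long startTimeNanos, long delay, TimeUnit unit) {
        long deadline = nowNanos + unit.toNanos(delay) - startTimeNanos;
        // A positive delay can never yield a negative deadline unless the sum overflowed.
        if (delay > 0 && deadline < 0) {
            deadline = Long.MAX_VALUE;
        }
        return deadline;
    }

    public static void main(String[] args) {
        System.out.println(deadline(1_000L, 0L, 1L, TimeUnit.SECONDS)); // prints 1000001000
        System.out.println(deadline(1_000L, 0L, Long.MAX_VALUE, TimeUnit.NANOSECONDS)
                == Long.MAX_VALUE); // prints true
    }
}
```

Storing the deadline relative to the start time keeps the values small, and clamping to Long.MAX_VALUE turns an overflowed sum into "effectively never" instead of a deadline in the past.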

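Having read the Netty source, and the Disruptor source before it, we can see that lock-free design and the JUC (java.util.concurrent) package are used very widely in frameworks.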