1. 程式人生 > >[kafka掃盲]---(7)kafka原始碼閱讀之生產者客戶端緩衝池

[kafka掃盲]---(7)kafka原始碼閱讀之生產者客戶端緩衝池

Author:趙志乾
Date:2018-10-21
Declaration:All Right Reserved!!!

BufferPool.java

1、檔案位置:

該檔案在原始碼中的位置:kafka-2.0.0-src/clients/src/main/java/org/apache/kafka/clients/producer/internals/BufferPool.java,在IDEA匯入的工程中,位於專案main3下。

2、緩衝池

池化的目的是降低建立和銷燬時間,提升執行效率,即將原來的建立和銷燬時間降為從池中獲取和歸還入池的時間。在kafka生產者客戶端中,緩衝池中的記憶體可以分為3部分:正在使用中的記憶體、空閒的池化頁列表、空閒的非池化頁記憶體。其中赤化頁列表中的各頁大小在緩衝池建立時進行配置,而且緩衝池3個部分之間會不斷的轉化。

緩衝池各部分間的轉化:

· 空閒池化頁轉為正在使用中的記憶體:當有執行緒需要分配一塊兒緩衝空間時,如果要分配的緩衝空間大小恰巧等於池化頁大小且空閒池化頁列表非空,則取出一頁進行分配,此時空閒池化頁轉化為正在使用中的記憶體;

·空閒池化頁轉化為空閒的非池化頁記憶體:當有執行緒需要分配一塊兒緩衝空間時,如果要分配的緩衝空間大小不等池化頁大小且“空閒非池化頁記憶體<需分配緩衝空間<=空閒非池化頁+空閒池化頁”,則將部分空閒池化頁歸併到空閒非池化頁記憶體,使得空閒非池化頁記憶體不小於要分配的緩衝空間;

·空閒非池化頁空間轉化為正在使用中的記憶體:當有執行緒需要分配一塊兒緩衝空間時,如果要分配的緩衝空間大小不等於池化頁大小且小於空閒非池化頁大小時,從空閒非池化頁中分配緩衝空間,從而轉化為正在使用中的記憶體;

·正在使用中的記憶體轉化為空閒池化頁:如果緩衝區要退還給緩衝池,且緩衝區大小等於池化頁大小,則將其退還至空閒池化頁列表;

·正在使用中的記憶體轉化為空閒非池化頁:如果緩衝區要退還給緩衝池,且不滿足退還至空閒池化頁列表的條件,則將其退還至空閒非池化頁空間;

3、緩衝池各部分轉化示意圖

4、程式碼邏輯見註釋

package org.apache.kafka.clients.producer.internals;

import java.nio.ByteBuffer;
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.metrics.stats.Meter;
import org.apache.kafka.common.utils.Time;

/*該類的例項用於維護位元組緩衝池,其一些屬性欄位需要由生產者依據實際場景需求進行指定。*/
public class BufferPool {

    static final String WAIT_TIME_SENSOR_NAME = "bufferpool-wait-time";
    //緩衝池總記憶體
    private final long totalMemory;
    //池化頁大小
    private final int poolableSize;
    //重入鎖
    private final ReentrantLock lock;
    //空閒的池化頁佇列
    private final Deque<ByteBuffer> free;
    //排隊的執行緒
    private final Deque<Condition> waiters;
    //非池化頁可用記憶體
    private long nonPooledAvailableMemory;
    private final Metrics metrics;
    private final Time time;
    //等待時間
    private final Sensor waitTime;

    /*建立一個新的緩衝池,所需引數包括:可分配的最大記憶體、池化位元組緩衝大小(free列表中一項的大
小)、例項的一些度量、時間例項、度量的邏輯組名稱*/
    public BufferPool(long memory, int poolableSize, Metrics metrics, Time time, String metricGrpName) {
        this.poolableSize = poolableSize;
        this.lock = new ReentrantLock();
        this.free = new ArrayDeque<>();
        this.waiters = new ArrayDeque<>();
        this.totalMemory = memory;
        this.nonPooledAvailableMemory = memory;
        this.metrics = metrics;
        this.time = time;
        this.waitTime = this.metrics.sensor(WAIT_TIME_SENSOR_NAME);
        MetricName rateMetricName = metrics.metricName("bufferpool-wait-ratio",
                                                   metricGrpName,
                                                   "The fraction of time an appender waits for space allocation.");
        MetricName totalMetricName = metrics.metricName("bufferpool-wait-time-total",
                                                   metricGrpName,
                                                   "The total time an appender waits for space allocation.");
        this.waitTime.add(new Meter(TimeUnit.NANOSECONDS, rateMetricName, totalMetricName));
    }

    /*分配給定大小的緩衝。如果緩衝池被配置未阻塞模式,其沒有足夠記憶體可供分配時,該方法會被阻塞。
所需引數:要分配的緩衝大小(單位為位元組)、阻塞最長時間(單位為毫秒)。*/
    public ByteBuffer allocate(int size, long maxTimeToBlockMs) throws InterruptedException {
        //如果要分配的緩衝大小超過緩衝池總記憶體,則會丟擲不合法引數異常。因為永遠不會成功。
        if (size > this.totalMemory)
            throw new IllegalArgumentException("Attempt to allocate " + size
                                               + " bytes, but there is a hard limit of "
                                               + this.totalMemory
                                               + " on memory allocations.");
        //定義返回引用
        ByteBuffer buffer = null;
        //加鎖
        this.lock.lock();
        try {
            // 如果要分配的緩衝大小等於池化項大小,且空閒池化列表中有可用快取
            if (size == poolableSize && !this.free.isEmpty())
                則直接使用空閒列表張的緩衝
                return this.free.pollFirst();

            //如果當前可用記憶體不小於要分配的緩衝大小
            int freeListSize = freeSize() * this.poolableSize;
            if (this.nonPooledAvailableMemory + freeListSize >= size) {
                /*則儘可能讓未池化記憶體滿足分配條件,當未池化記憶體不夠用時,可通過將空閒的池化緩衝
歸併到未池化記憶體*/
                freeUp(size);
                //扣除要分配的記憶體
                this.nonPooledAvailableMemory -= size;
            } else {
                //當前可用記憶體不能滿足分配條件
                int accumulated = 0;
                Condition moreMemory = this.lock.newCondition();
                try {
                    long remainingTimeToBlockNs = TimeUnit.MILLISECONDS.toNanos(maxTimeToBlockMs);
                    //將其加入等待佇列
                    this.waiters.addLast(moreMemory);
                    //迴圈直至有足夠空間進行分配
                    while (accumulated < size) {
                        long startWaitNs = time.nanoseconds();
                        long timeNs;
                        boolean waitingTimeElapsed;
                        try {
                            waitingTimeElapsed = !moreMemory.await(remainingTimeToBlockNs, TimeUnit.NANOSECONDS);
                        } finally {
                            long endWaitNs = time.nanoseconds();
                            timeNs = Math.max(0L, endWaitNs - startWaitNs);
                            this.waitTime.record(timeNs, time.milliseconds());
                        }

                        if (waitingTimeElapsed) {
                            throw new TimeoutException("Failed to allocate memory within the configured max blocking time " + maxTimeToBlockMs + " ms.");
                        }

                        remainingTimeToBlockNs -= timeNs;

                        //如果沒有迭代累計、待分配緩衝空間大小等於池化頁大小且空閒池化頁列表不為空
                        if (accumulated == 0 && size == this.poolableSize && !this.free.isEmpty()) {
                            //則從空閒池化頁列表進行分配
                            buffer = this.free.pollFirst();
                            accumulated = size;
                        } else {
                            //否則從非池化頁空間分配當前迭代能夠分配的空間大小
                            freeUp(size - accumulated);
                            int got = (int) Math.min(size - accumulated, this.nonPooledAvailableMemory);
                            //進行本次迭代分配空間的扣除
                            this.nonPooledAvailableMemory -= got;
                            //更新累計分配空間值
                            accumulated += got;
                        }
                    }                    
                    accumulated = 0;
                } finally {
                    //迴圈過程中失敗,則退還累計分配的空間
                    this.nonPooledAvailableMemory += accumulated;
                    //移除
                    this.waiters.remove(moreMemory);
                }
            }
        } finally {            
            try {
                //如果有可用空間且排隊執行緒不為空,則通知其他執行緒
                if (!(this.nonPooledAvailableMemory == 0 && this.free.isEmpty()) && !this.waiters.isEmpty())
                    this.waiters.peekFirst().signal();
            } finally {
                
                lock.unlock();
            }
        }
        //如果緩衝分配的不是池化頁
        if (buffer == null)
            //則進行非池化頁空間的實際分配
            return safeAllocateByteBuffer(size);
        else
            return buffer;
    }

    /*分配緩衝。如果分配失敗,則將退還分配的數量,並通知下一個排隊執行緒。*/
    private ByteBuffer safeAllocateByteBuffer(int size) {
        //預設分配失敗
        boolean error = true;
        try {
            //分配緩衝
            ByteBuffer buffer = allocateByteBuffer(size);
            //成功分配,重新整理標誌位,並返回
            error = false;
            return buffer;
        } finally {
            //分配失敗
            if (error) {
                //加鎖
                this.lock.lock();
                try {
                    //退還分配的緩衝
                    this.nonPooledAvailableMemory += size;
                    //如果存在其他排隊執行緒
                    if (!this.waiters.isEmpty())
                        //則通知下一個排隊執行緒進行緩衝分配
                        this.waiters.peekFirst().signal();
                } finally {
                    //釋放鎖
                    this.lock.unlock();
                }
            }
        }
    }

    //該方法用於測試
    protected ByteBuffer allocateByteBuffer(int size) {
        return ByteBuffer.allocate(size);
    }

    /*通過將空閒的池化空間歸入未池化可用空間,來儘可能滿足請求分配的緩衝大小*/
    private void freeUp(int size) {
        //如果要分配的緩衝大小大於未池化可用記憶體,且池化空閒列表有可用記憶體
        while (!this.free.isEmpty() && this.nonPooledAvailableMemory < size)
            //則循序將池化空間歸於未池化空間
            this.nonPooledAvailableMemory += this.free.pollLast().capacity();
    }

    //緩衝空間退還至緩衝池
    public void deallocate(ByteBuffer buffer, int size) {
        //加鎖
        lock.lock();
        try {
            //如果要退還的緩衝空間大小等於池化頁大小,且位元組緩衝空間容量等於池化頁大小
            if (size == this.poolableSize && size == buffer.capacity()) {
                //則清除緩衝空間內容
                buffer.clear();
                //將緩衝空間加入空閒池化頁列表
                this.free.add(buffer);
            } else {
                //否則將其歸還至非池化頁記憶體
                this.nonPooledAvailableMemory += size;
            }
            //如果有其他執行緒因可用記憶體不足在排隊,則進行通知
            Condition moreMem = this.waiters.peekFirst();
            if (moreMem != null)
                moreMem.signal();
        } finally {
            //釋放鎖
            lock.unlock();
        }
    }

    //緩衝退還入池
    public void deallocate(ByteBuffer buffer) {
        deallocate(buffer, buffer.capacity());
    }

    //獲取可用記憶體=未池化的可用記憶體+池化的可用記憶體
    public long availableMemory() {
        lock.lock();
        try {
            return this.nonPooledAvailableMemory + freeSize() * (long) this.poolableSize;
        } finally {
            lock.unlock();
        }
    }

    // 用於測試 
    protected int freeSize() {
        return this.free.size();
    }

    //獲取未被使用的記憶體
    public long unallocatedMemory() {
        lock.lock();
        try {
            return this.nonPooledAvailableMemory;
        } finally {
            lock.unlock();
        }
    }

    //獲取因獲取記憶體而被阻塞等待的執行緒數
    public int queued() {
        lock.lock();
        try {
            return this.waiters.size();
        } finally {
            lock.unlock();
        }
    }

    //池化的空閒緩衝列表單項大小
    public int poolableSize() {
        return this.poolableSize;
    }

    //返回緩衝池管理的總記憶體
    public long totalMemory() {
        return this.totalMemory;
    }

    // 用於測試
    Deque<Condition> waiters() {
        return this.waiters;
    }
}