1. 程式人生 > >Zookeeper原始碼分析之持久化(一)

Zookeeper原始碼分析之持久化(一)

一、前言

  持久化對於資料的儲存至關重要,下面進行詳細分析。

二、持久化總體框架

  持久化的類主要在包org.apache.zookeeper.server.persistence下,此次也主要是對其下的類進行分析,其包下總體的類結構如下圖所示。


  · TxnLog,介面型別,讀取事務性日誌的介面。

  · FileTxnLog,實現TxnLog介面,添加了訪問該事務性日誌的API。

  · Snapshot,介面型別,持久層快照介面。

  · FileSnap,實現Snapshot介面,負責儲存、序列化、反序列化、訪問快照。

  · FileTxnSnapLog,封裝了TxnLog和SnapShot。

  · Util,工具類,提供持久化所需的API。

  下面先來分析TxnLog和FileTxnLog的原始碼。

三、TxnLog原始碼分析

  TxnLog是介面,規定了對日誌的響應操作。 
public interface TxnLog {
    
    /**
     * roll the current
     * log being appended to
     * @throws IOException 
     */
    // 回滾日誌
    void rollLog() throws IOException;
    /**
     * Append a request to the transaction log
     * @param hdr the transaction header
     * @param r the transaction itself
     * returns true iff something appended, otw false 
     * @throws IOException
     */
    // 新增一個請求至事務性日誌
    boolean append(TxnHeader hdr, Record r) throws IOException;

    /**
     * Start reading the transaction logs
     * from a given zxid
     * @param zxid
     * @return returns an iterator to read the 
     * next transaction in the logs.
     * @throws IOException
     */
    // 讀取事務性日誌
    TxnIterator read(long zxid) throws IOException;
    
    /**
     * the last zxid of the logged transactions.
     * @return the last zxid of the logged transactions.
     * @throws IOException
     */
    // 事務性操作的最新zxid
    long getLastLoggedZxid() throws IOException;
    
    /**
     * truncate the log to get in sync with the 
     * leader.
     * @param zxid the zxid to truncate at.
     * @throws IOException 
     */
    // 清空日誌,與Leader保持同步
    boolean truncate(long zxid) throws IOException;
    
    /**
     * the dbid for this transaction log. 
     * @return the dbid for this transaction log.
     * @throws IOException
     */
    // 獲取資料庫的id
    long getDbId() throws IOException;
    
    /**
     * commmit the trasaction and make sure
     * they are persisted
     * @throws IOException
     */
    // 提交事務並進行確認
    void commit() throws IOException;
   
    /** 
     * close the transactions logs
     */
    // 關閉事務性日誌
    void close() throws IOException;
    /**
     * an iterating interface for reading 
     * transaction logs. 
     */
    // 讀取事務日誌的迭代器介面
    public interface TxnIterator {
        /**
         * return the transaction header.
         * @return return the transaction header.
         */
        // 獲取事務頭部
        TxnHeader getHeader();
        
        /**
         * return the transaction record.
         * @return return the transaction record.
         */
        // 獲取事務
        Record getTxn();
     
        /**
         * go to the next transaction record.
         * @throws IOException
         */
        // 下個事務
        boolean next() throws IOException;
        
        /**
         * close files and release the 
         * resources
         * @throws IOException
         */
        // 關閉檔案釋放資源
        void close() throws IOException;
    }
}
  其中,TxnLog除了提供讀寫事務日誌的API外,還提供了一個用於讀取日誌的迭代器介面TxnIterator。

四、FileTxnLog原始碼分析

  對於LogFile而言,其格式可分為如下三部分

  LogFile:

    FileHeader TxnList ZeroPad

  FileHeader格式如下  

  FileHeader: {
    magic 4bytes (ZKLG)
    version 4bytes
    dbid 8bytes
  }

  TxnList格式如下

  TxnList:

    Txn || Txn TxnList

  Txn格式如下

  Txn:

    checksum Txnlen TxnHeader Record 0x42

  Txnlen格式如下

  Txnlen:
    len 4bytes

  TxnHeader格式如下

  TxnHeader: {

    sessionid 8bytes
    cxid 4bytes
    zxid 8bytes
    time 8bytes
    type 4bytes
  }

  ZeroPad格式如下

  ZeroPad:
    0 padded to EOF (filled during preallocation stage)

  瞭解LogFile的格式對於理解原始碼會有很大的幫助。

  4.1 屬性  
public class FileTxnLog implements TxnLog {
    private static final Logger LOG;
    
    // 預分配大小 64M
    static long preAllocSize =  65536 * 1024;
    
    // 魔術數字,預設為1514884167
    public final static int TXNLOG_MAGIC =
        ByteBuffer.wrap("ZKLG".getBytes()).getInt();

    // 版本號
    public final static int VERSION = 2;

    /** Maximum time we allow for elapsed fsync before WARNing */
    // 進行同步時,發出warn之前所能等待的最長時間
    private final static long fsyncWarningThresholdMS;

    // 靜態屬性,確定Logger、預分配空間大小和最長時間
    static {
        LOG = LoggerFactory.getLogger(FileTxnLog.class);

        String size = System.getProperty("zookeeper.preAllocSize");
        if (size != null) {
            try {
                preAllocSize = Long.parseLong(size) * 1024;
            } catch (NumberFormatException e) {
                LOG.warn(size + " is not a valid value for preAllocSize");
            }
        }
        fsyncWarningThresholdMS = Long.getLong("fsync.warningthresholdms", 1000);
    }
    
    // 最大(新)的zxid
    long lastZxidSeen;
    // 儲存資料相關的流
    volatile BufferedOutputStream logStream = null;
    volatile OutputArchive oa;
    volatile FileOutputStream fos = null;

    // log目錄檔案
    File logDir;
    
    // 是否強制同步
    private final boolean forceSync = !System.getProperty("zookeeper.forceSync", "yes").equals("no");;
    
    // 資料庫id
    long dbId;
    
    // 流列表
    private LinkedList<FileOutputStream> streamsToFlush =
        new LinkedList<FileOutputStream>();
    
    // 當前大小
    long currentSize;
    // 寫日誌檔案
    File logFileWrite = null;
}
  4.2. 核心函式 

  1. append函式
    public synchronized boolean append(TxnHeader hdr, Record txn)
        throws IOException
    {
        if (hdr != null) { // 事務頭部不為空
            if (hdr.getZxid() <= lastZxidSeen) { // 事務的zxid小於等於最後的zxid
                LOG.warn("Current zxid " + hdr.getZxid()
                        + " is <= " + lastZxidSeen + " for "
                        + hdr.getType());
            }
            if (logStream==null) { // 日誌流為空
               if(LOG.isInfoEnabled()){
                    LOG.info("Creating new log file: log." +  
                            Long.toHexString(hdr.getZxid()));
               }
               
               // 
               logFileWrite = new File(logDir, ("log." + 
                       Long.toHexString(hdr.getZxid())));
               fos = new FileOutputStream(logFileWrite);
               logStream=new BufferedOutputStream(fos);
               oa = BinaryOutputArchive.getArchive(logStream);
               // 
               FileHeader fhdr = new FileHeader(TXNLOG_MAGIC,VERSION, dbId);
               // 序列化
               fhdr.serialize(oa, "fileheader");
               // Make sure that the magic number is written before padding.
               // 重新整理到磁碟
               logStream.flush();
               
               // 當前通道的大小
               currentSize = fos.getChannel().position();
               // 新增fos
               streamsToFlush.add(fos);
            }
            
            // 填充檔案
            padFile(fos);
            
            // Serializes transaction header and transaction data into a byte buffer.
            // 將事務頭和事務資料序列化成Byte Buffer
            byte[] buf = Util.marshallTxnEntry(hdr, txn);
            if (buf == null || buf.length == 0) { // 為空,丟擲異常
                throw new IOException("Faulty serialization for header " +
                        "and txn");
            }
            // 生成一個驗證演算法
            Checksum crc = makeChecksumAlgorithm();
            // Updates the current checksum with the specified array of bytes
            // 使用Byte陣列來更新當前的Checksum
            crc.update(buf, 0, buf.length);
            // 寫long型別資料
            oa.writeLong(crc.getValue(), "txnEntryCRC");
            // Write the serialized transaction record to the output archive.
            // 將序列化的事務記錄寫入OutputArchive
            Util.writeTxnBytes(oa, buf);
            
            return true;
        }
        return false;
    }
  說明:append函式主要用做向事務日誌中新增一個條目,其大體步驟如下

  ① 檢查TxnHeader是否為空,若不為空,則進入②,否則,直接返回false

  ② 檢查logStream是否為空(初始化為空),若不為空,則進入③,否則,進入⑤

  ③ 初始化寫資料相關的流和FileHeader,並序列化FileHeader至指定檔案,進入④

  ④ 強制重新整理(保證資料存到磁碟),並獲取當前寫入資料的大小。進入⑤

  ⑤ 填充資料,填充0,進入⑥

  ⑥ 將事務頭和事務序列化成ByteBuffer(使用Util.marshallTxnEntry函式),進入⑦

  ⑦ 使用Checksum演算法更新步驟⑥的ByteBuffer。進入⑧

  ⑧ 將更新的ByteBuffer寫入磁碟檔案,返回true

  append間接呼叫了padLog函式,其原始碼如下 
    public static long padLogFile(FileOutputStream f,long currentSize,
            long preAllocSize) throws IOException{
        // 獲取位置
        long position = f.getChannel().position();
        if (position + 4096 >= currentSize) { // 計算後是否大於當前大小
            // 重新設定當前大小,剩餘部分填充0
            currentSize = currentSize + preAllocSize;
            fill.position(0);
            f.getChannel().write(fill, currentSize-fill.remaining());
        }
        return currentSize;
    }
  說明:其主要作用是當檔案大小不滿64MB時,向檔案填充0以達到64MB大小。

  2. getLogFiles函式  
   public static File[] getLogFiles(File[] logDirList,long snapshotZxid) {
        // 按照zxid對檔案進行排序
        List<File> files = Util.sortDataDir(logDirList, "log", true);
        long logZxid = 0;
        // Find the log file that starts before or at the same time as the
        // zxid of the snapshot
        for (File f : files) { // 遍歷檔案
            // 從檔案中獲取zxid
            long fzxid = Util.getZxidFromName(f.getName(), "log");
            if (fzxid > snapshotZxid) { // 跳過大於snapshotZxid的檔案
                continue;
            }
            // the files
            // are sorted with zxid's
            if (fzxid > logZxid) { // 找出檔案中最大的zxid(同時還需要小於等於snapshotZxid)
                logZxid = fzxid;
            }
        }
        // 檔案列表
        List<File> v=new ArrayList<File>(5);
        for (File f : files) { // 再次遍歷檔案
            // 從檔案中獲取zxid
            long fzxid = Util.getZxidFromName(f.getName(), "log");
            if (fzxid < logZxid) { // 跳過小於logZxid的檔案
                continue;
            }
            // 新增
            v.add(f);
        }
        // 轉化成File[] 型別後返回
        return v.toArray(new File[0]);

    }
  說明:該函式的作用是找出剛剛小於或者等於snapshot的所有log檔案。其步驟大致如下。

  ① 對所有log檔案按照zxid進行升序排序,進入②

  ② 遍歷所有log檔案並記錄剛剛小於或等於給定snapshotZxid的log檔案的logZxid,進入③

  ③ 再次遍歷log檔案,新增zxid大於等於步驟②中的logZxid的所有log檔案,進入④

  ④ 轉化後返回

  getLogFiles函式呼叫了sortDataDir,其原始碼如下:
public static List<File> sortDataDir(File[] files, String prefix, boolean ascending)
    {
        if(files==null) 
            return new ArrayList<File>(0);
        // 轉化為列表
        List<File> filelist = Arrays.asList(files);
        // 進行排序,Comparator是關鍵,根據zxid進行排序
        Collections.sort(filelist, new DataDirFileComparator(prefix, ascending));
        return filelist;
    }
  說明:其用於排序log檔案,可以選擇根據zxid進行升序或降序。

  getLogFiles函式間接呼叫了getZxidFromName,其原始碼如下: 
    // 從檔名中解析出zxid
    public static long getZxidFromName(String name, String prefix) {
        long zxid = -1;
        // 對檔名進行分割
        String nameParts[] = name.split("\\.");
        if (nameParts.length == 2 && nameParts[0].equals(prefix)) { // 字首相同
            try {
                // 轉化成長整形
                zxid = Long.parseLong(nameParts[1], 16);
            } catch (NumberFormatException e) {
            }
        }
        return zxid;
    }
  說明:getZxidFromName主要用作從檔名中解析zxid,並且需要從指定的字首開始。

  3. getLastLoggedZxid函式 
    public long getLastLoggedZxid() {
        // 獲取已排好序的所有的log檔案
        File[] files = getLogFiles(logDir.listFiles(), 0);
        // 獲取最大的zxid(最後一個log檔案對應的zxid)
        long maxLog=files.length>0?
                Util.getZxidFromName(files[files.length-1].getName(),"log"):-1;

        // if a log file is more recent we must scan it to find
        // the highest zxid
        // 
        long zxid = maxLog;
        // 迭代器
        TxnIterator itr = null;
        try {
            // 新生FileTxnLog
            FileTxnLog txn = new FileTxnLog(logDir);
            // 開始讀取從給定zxid之後的所有事務
            itr = txn.read(maxLog);
            while (true) { // 遍歷
                if(!itr.next()) // 是否存在下一項
                    break;
                // 獲取事務頭
                TxnHeader hdr = itr.getHeader();
                // 獲取zxid
                zxid = hdr.getZxid();
            }
        } catch (IOException e) {
            LOG.warn("Unexpected exception", e);
        } finally {
            // 關閉迭代器
            close(itr);
        }
        return zxid;
    }
  說明:該函式主要用於獲取記錄在log中的最後一個zxid。其步驟大致如下

  ① 獲取已排好序的所有log檔案,並從最後一個檔案中取出zxid作為候選的最大zxid,進入②

  ② 新生成FileTxnLog並讀取步驟①中zxid之後的所有事務,進入③

  ③ 遍歷所有事務並提取出相應的zxid,最後返回。

  其中getLastLoggedZxid呼叫了read函式,其原始碼如下 
public TxnIterator read(long zxid) throws IOException {
        // 返回事務檔案訪問迭代器
        return new FileTxnIterator(logDir, zxid);
    }
  說明:read函式會生成一個FileTxnIterator,其是TxnLog.TxnIterator的子類,之後在FileTxnIterator建構函式中會呼叫init函式,其原始碼如下 
    void init() throws IOException {
        // 新生成檔案列表
        storedFiles = new ArrayList<File>();
        // 進行排序
        List<File> files = Util.sortDataDir(FileTxnLog.getLogFiles(logDir.listFiles(), 0), "log", false);
        for (File f: files) { // 遍歷檔案
            if (Util.getZxidFromName(f.getName(), "log") >= zxid) { // 新增zxid大於等於指定zxid的檔案
                storedFiles.add(f);
            }
            // add the last logfile that is less than the zxid
            else if (Util.getZxidFromName(f.getName(), "log") < zxid) { // 只新增一個zxid小於指定zxid的檔案,然後退出
                storedFiles.add(f);
                break;
            }
        }
        // go to the next logfile
        // 進入下一個log檔案
        goToNextLog();
        if (!next()) // 不存在下一項,返回
            return;
        while (hdr.getZxid() < zxid) { // 從事務頭中獲取zxid小於給定zxid,直到不存在下一項或者大於給定zxid時退出
            if (!next())
                return;
        }
    }
  說明:init函式用於進行初始化操作,會根據zxid的不同進行不同的初始化操作,在init函式中會呼叫goToNextLog函式,其原始碼如下  
    private boolean goToNextLog() throws IOException {
        if (storedFiles.size() > 0) { // 儲存的檔案列表大於0
            // 取最後一個log檔案
            this.logFile = storedFiles.remove(storedFiles.size()-1);
            // 針對該檔案,建立InputArchive
            ia = createInputArchive(this.logFile);
            // 返回true
            return true;
        }
        return false;
    }
  說明:goToNextLog表示選取下一個log檔案,在init函式中還呼叫了next函式,其原始碼如下  
    public boolean next() throws IOException {
        if (ia == null) { // 為空,返回false
            return false;
        }
        try {
            // 讀取長整形crcValue
            long crcValue = ia.readLong("crcvalue");
            // 通過input archive讀取一個事務條目
            byte[] bytes = Util.readTxnBytes(ia);
            // Since we preallocate, we define EOF to be an
            if (bytes == null || bytes.length==0) { // 對bytes進行判斷
                throw new EOFException("Failed to read " + logFile);
            }
            // EOF or corrupted record
            // validate CRC
            // 驗證CRC
            Checksum crc = makeChecksumAlgorithm();
            // 更新
            crc.update(bytes, 0, bytes.length);
            if (crcValue != crc.getValue()) // 驗證不相等,丟擲異常
                throw new IOException(CRC_ERROR);
            if (bytes == null || bytes.length == 0) // bytes為空,返回false
                return false;
            // 新生成TxnHeader
            hdr = new TxnHeader();
            // 將Txn反序列化,並且將對應的TxnHeader反序列化至hdr,整個Record反序列化至record
            record = SerializeUtils.deserializeTxn(bytes, hdr);
        } catch (EOFException e) { // 丟擲異常
            LOG.debug("EOF excepton " + e);
            // 關閉輸入流
            inputStream.close();
            // 賦值為null
            inputStream = null;
            ia = null;
            hdr = null;
            // this means that the file has ended
            // we should go to the next file
            if (!goToNextLog()) { // 沒有log檔案,則返回false
                return false;
            }
            // if we went to the next log file, we should call next() again
            // 繼續呼叫next
            return next();
        } catch (IOException e) {
            inputStream.close();
            throw e;
        }
        // 返回true
        return true;
    }
  說明:next表示將迭代器移動至下一個事務,方便讀取,next函式的步驟如下。

  ① 讀取事務的crcValue值,用於後續的驗證,進入②

  ② 讀取事務,使用CRC32進行更新並與①中的結果進行比對,若不相同,則丟擲異常,否則,進入③

  ③ 將事務進行反序列化並儲存至相應的屬性中(如事務頭和事務體),會確定具體的事務操作型別。

  ④ 在讀取過程丟擲異常時,會首先關閉流,然後再嘗試呼叫next函式(即進入下一個事務進行讀取)。

  4. commit函式 
    public synchronized void commit() throws IOException {
        if (logStream != null) {
            // 強制刷到磁碟
            logStream.flush();
        }
        for (FileOutputStream log : streamsToFlush) { // 遍歷流
            // 強制刷到磁碟
            log.flush();
            if (forceSync) { // 是否強制同步
                long startSyncNS = System.nanoTime();
                
                log.getChannel().force(false);
                // 計算流式的時間
                long syncElapsedMS =
                    TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startSyncNS);
                if (syncElapsedMS > fsyncWarningThresholdMS) { // 大於閾值時則會警告
                    LOG.warn("fsync-ing the write ahead log in "
                            + Thread.currentThread().getName()
                            + " took " + syncElapsedMS
                            + "ms which will adversely effect operation latency. "
                            + "See the ZooKeeper troubleshooting guide");
                }
            }
        }
        while (streamsToFlush.size() > 1) { // 移除流並關閉
            streamsToFlush.removeFirst().close();
        }
    }
  說明:該函式主要用於提交事務日誌至磁碟,其大致步驟如下

  ① 若日誌流logStream不為空,則強制重新整理至磁碟,進入②

  ② 遍歷需要重新整理至磁碟的所有流streamsToFlush並進行重新整理,進入③

  ③ 判斷是否需要強制性同步,如是,則計算每個流的流式時間並在控制檯給出警告,進入④

  ④ 移除所有流並關閉。

  5. truncate函式 
    public boolean truncate(long zxid) throws IOException {
        FileTxnIterator itr = null;
        try {
            // 獲取迭代器
            itr = new FileTxnIterator(this.logDir, zxid);
            PositionInputStream input = itr.inputStream;
            long pos = input.getPosition();
            // now, truncate at the current position
            // 從當前位置開始清空
            RandomAccessFile raf = new RandomAccessFile(itr.logFile, "rw");
            raf.setLength(pos);
            raf.close();
            while (itr.goToNextLog()) { // 存在下一個log檔案
                if (!itr.logFile.delete()) { // 刪除
                    LOG.warn("Unable to truncate {}", itr.logFile);
                }
            }
        } finally {
            // 關閉迭代器
            close(itr);
        }
        return true;
    }
  說明:該函式用於清空大於給定zxid的所有事務日誌。