1. 程式人生 > >elasticsearch index 之 engine

elasticsearch index 之 engine

exce query 索引 analyze size public .post blog 存儲

elasticsearch對於索引中的數據操作如讀寫get等接口都封裝在engine中,同時engine還封裝了索引的讀寫控制,如流量、錯誤處理等。engine是離lucene最近的一部分。

engine的實現結構如下所示:

技術分享

engine接口有三個實現類,主要邏輯都在InternalEngine中。ShadowEngine之實現了engine接口的部分讀方法,主要用於對於索引的讀操作。shardFSEngine在InternalEngine的基礎上實現了recovery方法,它的功能跟InternalEngine基本相同只是它的recovery過程有區別,不會對Translog和index進行快照存儲。

Engine類定義了一些index操作的主要方法和內部類,方法如create,index等。內部類如index,delete等。這些方法的實現是在子類中,這些方法的參數是這些內部類。首先看一下它的方法:

 public abstract void create(Create create) throws EngineException;

    public abstract void index(Index index) throws EngineException;

    public abstract void delete(Delete delete) throws EngineException;

    
public abstract void delete(DeleteByQuery delete) throws EngineException;

這些抽象方法都在子類中實現,它們的參數都是一類,這些都是Engine的內部類,這些內部類類似於實體類,沒有相關邏輯只是由很多filed及get方法構成。如Create和Index都繼承自IndexOperation,它們所有信息都存儲到IndexOperation的相關Field中,IndexOperation如下所示:

 public static abstract class IndexingOperation implements Operation {

        
private final DocumentMapper docMapper; private final Term uid; private final ParsedDocument doc; private long version; private final VersionType versionType; private final Origin origin; private final boolean canHaveDuplicates; private final long startTime; private long endTime; ……………… }

無論是Index還是Create,相關數據和配置都在doc中,根據doc和docMapper就能夠獲取本次操作的所有信息,另外的一些字段如version,uid都是在類初始化時構建。這樣傳給實際方法的是一個class,在方法內部根據需求獲取到相應的數據,如index方法的實現:

    private void innerIndex(Index index) throws IOException {
        synchronized (dirtyLock(index.uid())) {
            final long currentVersion;
            VersionValue versionValue = versionMap.getUnderLock(index.uid().bytes());
            if (versionValue == null) {
                currentVersion = loadCurrentVersionFromIndex(index.uid());
            } else {
                if (engineConfig.isEnableGcDeletes() && versionValue.delete() && (engineConfig.getThreadPool().estimatedTimeInMillis() - versionValue.time()) > engineConfig.getGcDeletesInMillis()) {
                    currentVersion = Versions.NOT_FOUND; // deleted, and GC
                } else {
                    currentVersion = versionValue.version();
                }
            }

            long updatedVersion;
            long expectedVersion = index.version();
            if (index.versionType().isVersionConflictForWrites(currentVersion, expectedVersion)) {
                if (index.origin() == Operation.Origin.RECOVERY) {
                    return;
                } else {
                    throw new VersionConflictEngineException(shardId, index.type(), index.id(), currentVersion, expectedVersion);
                }
            }
            updatedVersion = index.versionType().updateVersion(currentVersion, expectedVersion);

            index.updateVersion(updatedVersion);
            if (currentVersion == Versions.NOT_FOUND) {
                // document does not exists, we can optimize for create
                index.created(true);
                if (index.docs().size() > 1) {
                    indexWriter.addDocuments(index.docs(), index.analyzer());
                } else {
                    indexWriter.addDocument(index.docs().get(0), index.analyzer());
                }
            } else {
                if (versionValue != null) {
                    index.created(versionValue.delete()); // we have a delete which is not GC‘ed...
                }
                if (index.docs().size() > 1) {
                    indexWriter.updateDocuments(index.uid(), index.docs(), index.analyzer());//獲取IndexOperation中doc中字段更新索引
                } else {
                    indexWriter.updateDocument(index.uid(), index.docs().get(0), index.analyzer());
                }
            }
            Translog.Location translogLocation = translog.add(new Translog.Index(index));//寫translog

            versionMap.putUnderLock(index.uid().bytes(), new VersionValue(updatedVersion, translogLocation));

            indexingService.postIndexUnderLock(index);
        }
    }

這就是Engine中create、index這些方法的實現方式。後面分析索引過程中會有更加詳細說明。Engine中還有獲取索引狀態(元數據)及索引操作的方法如merge。這些方法也是在子類中調用lucene的相關接口,跟create,index,get很類似。因為沒有深入Engine的方法實現,因此這裏的分析比較簡單,後面的分析會涉及這裏面很多方法。

總結:這裏只是從結構上對indexEngine進行了簡單說明,它裏面的方法是es對lucene索引操作方法的封裝,只是增加了一下處理方面的邏輯如寫translog,異常處理等。它的操作對象是shard,es所有對shard的寫操作都是通過Engine來實現,後面的分析會有所體現。

elasticsearch index 之 engine