/*
 * Decompiled with CFR 0.152.
 */
package org.opensearch.index.engine;

import java.io.IOException;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.function.BiFunction;
import org.apache.lucene.document.Document;
import org.apache.lucene.document.Field;
import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.index.IndexReader;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.IndexableField;
import org.apache.lucene.index.StoredFields;
import org.apache.lucene.index.Term;
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.search.Query;
import org.apache.lucene.search.ScoreDoc;
import org.apache.lucene.search.TopDocs;
import org.opensearch.ExceptionsHelper;
import org.opensearch.OpenSearchException;
import org.opensearch.action.admin.indices.streamingingestion.state.ShardIngestionState;
import org.opensearch.cluster.block.ClusterBlockLevel;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.cluster.metadata.IngestionSource;
import org.opensearch.common.Nullable;
import org.opensearch.common.lease.Releasable;
import org.opensearch.common.lucene.Lucene;
import org.opensearch.common.util.concurrent.ReleasableLock;
import org.opensearch.core.common.Strings;
import org.opensearch.index.IngestionConsumerFactory;
import org.opensearch.index.IngestionShardConsumer;
import org.opensearch.index.IngestionShardPointer;
import org.opensearch.index.VersionType;
import org.opensearch.index.engine.DeleteVersionValue;
import org.opensearch.index.engine.Engine;
import org.opensearch.index.engine.EngineConfig;
import org.opensearch.index.engine.EngineCreationFailureException;
import org.opensearch.index.engine.EngineException;
import org.opensearch.index.engine.IndexVersionValue;
import org.opensearch.index.engine.IngestionEngineException;
import org.opensearch.index.engine.InternalEngine;
import org.opensearch.index.engine.VersionConflictEngineException;
import org.opensearch.index.engine.VersionValue;
import org.opensearch.index.mapper.DocumentMapperForType;
import org.opensearch.index.mapper.ParseContext;
import org.opensearch.index.mapper.ParsedDocument;
import org.opensearch.index.translog.NoOpTranslogManager;
import org.opensearch.index.translog.Translog;
import org.opensearch.index.translog.TranslogDeletionPolicy;
import org.opensearch.index.translog.TranslogManager;
import org.opensearch.index.translog.TranslogStats;
import org.opensearch.index.translog.listener.CompositeTranslogEventListener;
import org.opensearch.indices.pollingingest.DefaultStreamPoller;
import org.opensearch.indices.pollingingest.IngestionErrorStrategy;
import org.opensearch.indices.pollingingest.IngestionSettings;
import org.opensearch.indices.pollingingest.PollingIngestStats;
import org.opensearch.indices.pollingingest.StreamPoller;

public class IngestionEngine
extends InternalEngine {
    // Poller currently attached to this engine. It is assigned in initializeStreamPoller, so it
    // stays null until start() has run; it is non-final because resetStreamPoller re-runs
    // initializeStreamPoller, which installs a fresh instance.
    private StreamPoller streamPoller;
    // Factory used to create the shard consumer and to parse pointer strings back into pointers.
    private final IngestionConsumerFactory ingestionConsumerFactory;
    // Resolved once, at construction time, from the engine config's document-mapper supplier.
    private final DocumentMapperForType documentMapperForType;

    /**
     * Creates an ingestion engine on top of the given engine configuration.
     *
     * <p>The stream poller is not created here; it is only set up once {@link #start()} is called.
     *
     * @param engineConfig             configuration forwarded to the parent engine; also supplies the document mapper
     * @param ingestionConsumerFactory factory for the shard consumer, must not be {@code null}
     * @throws NullPointerException if {@code ingestionConsumerFactory} is {@code null}
     */
    public IngestionEngine(EngineConfig engineConfig, IngestionConsumerFactory ingestionConsumerFactory) {
        super(engineConfig);
        Objects.requireNonNull(ingestionConsumerFactory);
        this.ingestionConsumerFactory = ingestionConsumerFactory;
        this.documentMapperForType = engineConfig.getDocumentMapperForTypeSupplier().get();
        registerDynamicIndexSettingsHandlers();
    }

    /**
     * Starts ingestion by initializing the stream poller without any reset overrides.
     */
    public void start() {
        final StreamPoller.ResetState noResetStateOverride = null;
        final String noResetValueOverride = null;
        final IngestionShardPointer noStartPointerOverride = null;
        initializeStreamPoller(noResetStateOverride, noResetValueOverride, noStartPointerOverride);
    }

    /**
     * Creates, registers and starts a new {@link StreamPoller} for this shard.
     *
     * <p>How the start position is resolved:
     * <ul>
     *   <li>If all three overrides are supplied (non-null reset state, non-empty reset value and non-null start
     *       pointer), the poller starts from {@code startPointerOverride} using the overriding reset state and
     *       value. A partial set of overrides is ignored.</li>
     *   <li>Otherwise, if the commit data read from the index writer contains a {@code batch_start} entry, that
     *       value is parsed into the start pointer and the reset state becomes
     *       {@link StreamPoller.ResetState#NONE}.</li>
     *   <li>Otherwise the reset state and value configured on the ingestion source are used and the start
     *       pointer stays {@code null}.</li>
     * </ul>
     * Whenever a start pointer was resolved, the pointers already stored in the index beyond it are collected
     * through {@link #fetchPersistedOffsets} and handed to the new poller.
     *
     * @param resetStateOverride   reset state to force, or {@code null}
     * @param resetValueOverride   reset value to force, or {@code null}/empty
     * @param startPointerOverride start pointer to force, or {@code null}
     * @throws EngineCreationFailureException if reading the persisted pointers fails with an {@link IOException}
     */
    private void initializeStreamPoller(
        @Nullable StreamPoller.ResetState resetStateOverride,
        @Nullable String resetValueOverride,
        @Nullable IngestionShardPointer startPointerOverride
    ) {
        IndexMetadata indexMetadata = engineConfig.getIndexSettings().getIndexMetadata();
        assert indexMetadata != null;
        IngestionSource ingestionSource = Objects.requireNonNull(indexMetadata.getIngestionSource());
        ingestionConsumerFactory.initialize(ingestionSource.params());

        String clientId = engineConfig.getIndexSettings().getNodeName()
            + "-"
            + engineConfig.getIndexSettings().getIndex().getName()
            + "-"
            + engineConfig.getShardId().getId();
        // The factory field is a raw type, so the cast keeps the assignment valid whatever the erased return type is.
        IngestionShardConsumer ingestionShardConsumer = (IngestionShardConsumer) ingestionConsumerFactory.createShardConsumer(
            clientId,
            engineConfig.getShardId().getId()
        );
        logger.info("created ingestion consumer for shard [{}]", engineConfig.getShardId());

        Map<String, String> commitData = IngestionEngine.commitDataAsMap(indexWriter);
        StreamPoller.ResetState resetState = ingestionSource.getPointerInitReset().getType();
        String resetValue = ingestionSource.getPointerInitReset().getValue();
        IngestionShardPointer startPointer = null;
        // Declared as Set (not HashSet) because fetchPersistedOffsets returns a Set.
        Set<IngestionShardPointer> persistedPointers = new HashSet<>();

        // Overrides only take effect when the complete triple is provided.
        final boolean forceResetPoller = resetStateOverride != null
            && !Strings.isNullOrEmpty(resetValueOverride)
            && startPointerOverride != null;

        if (commitData.containsKey("batch_start") || forceResetPoller) {
            if (forceResetPoller) {
                startPointer = startPointerOverride;
            } else {
                String batchStartStr = commitData.get("batch_start");
                startPointer = ingestionConsumerFactory.parsePointerFromString(batchStartStr);
                // Resuming from a committed pointer: the configured reset must not be applied.
                resetState = StreamPoller.ResetState.NONE;
            }
            try (Engine.Searcher searcher = acquireSearcher("restore_offset", Engine.SearcherScope.INTERNAL)) {
                persistedPointers = fetchPersistedOffsets(Lucene.wrapAllDocsLive(searcher.getDirectoryReader()), startPointer);
                logger.debug("recovered persisted pointers: {}", persistedPointers);
            } catch (IOException e) {
                throw new EngineCreationFailureException(config().getShardId(), "failed to restore offset", e);
            }
        }

        if (forceResetPoller) {
            resetState = resetStateOverride;
            resetValue = resetValueOverride;
        }

        IngestionErrorStrategy ingestionErrorStrategy = IngestionErrorStrategy.create(
            ingestionSource.getErrorStrategy(),
            ingestionSource.getType()
        );
        StreamPoller.State initialPollerState = indexMetadata.getIngestionStatus().isPaused()
            ? StreamPoller.State.PAUSED
            : StreamPoller.State.NONE;

        streamPoller = new DefaultStreamPoller(
            startPointer,
            persistedPointers,
            ingestionShardConsumer,
            this,
            resetState,
            resetValue,
            ingestionErrorStrategy,
            initialPollerState,
            ingestionSource.getMaxPollSize(),
            ingestionSource.getPollTimeout(),
            ingestionSource.getNumProcessorThreads(),
            ingestionSource.getBlockingQueueSize()
        );
        registerStreamPollerListener();
        streamPoller.start();
    }

    /**
     * Registers the current stream poller as a listener on the cluster applier service and seeds the poller's
     * write-block flag from the current cluster state. Does nothing when the engine config has no cluster
     * applier service.
     */
    private void registerStreamPollerListener() {
        if (engineConfig.getClusterApplierService() == null) {
            return;
        }
        engineConfig.getClusterApplierService().addListener(streamPoller);
        String indexName = engineConfig.getIndexSettings().getIndex().getName();
        boolean writeBlocked = engineConfig.getClusterApplierService()
            .state()
            .blocks()
            .indexBlocked(ClusterBlockLevel.WRITE, indexName);
        streamPoller.setWriteBlockEnabled(writeBlocked);
    }

    /**
     * Removes the current stream poller from the cluster applier service's listeners. Does nothing when the
     * engine config has no cluster applier service.
     */
    private void unregisterStreamPollerListener() {
        if (engineConfig.getClusterApplierService() == null) {
            return;
        }
        engineConfig.getClusterApplierService().removeListener(streamPoller);
    }

    /**
     * Collects the pointers stored in the {@code _offset} field of every document matched by the
     * "greater than {@code batchStart}" range query, then forces a blocking internal-scope refresh.
     *
     * @param directoryReader reader to search
     * @param batchStart      pointer that builds the range query
     * @return the set of pointers parsed from the matching documents
     * @throws IOException if searching or loading stored fields fails
     */
    protected Set<IngestionShardPointer> fetchPersistedOffsets(DirectoryReader directoryReader, IngestionShardPointer batchStart) throws IOException {
        IndexSearcher offsetSearcher = new IndexSearcher(directoryReader);
        offsetSearcher.setQueryCache(null);
        Query offsetRangeQuery = batchStart.newRangeQueryGreaterThan("_offset");
        ScoreDoc[] hits = offsetSearcher.search(offsetRangeQuery, Integer.MAX_VALUE).scoreDocs;

        Set<IngestionShardPointer> pointers = new HashSet<>();
        StoredFields storedFields = offsetSearcher.getIndexReader().storedFields();
        for (ScoreDoc hit : hits) {
            String pointerStr = storedFields.document(hit.doc).get("_offset");
            pointers.add((IngestionShardPointer) ingestionConsumerFactory.parsePointerFromString(pointerStr));
        }

        refresh("restore_offset", Engine.SearcherScope.INTERNAL, true);
        return pointers;
    }

    /**
     * Always rejects the operation: this engine does not accept push-based index requests.
     * Index operations are applied through {@link #indexInternal} instead.
     *
     * @throws IngestionEngineException always
     */
    @Override
    public Engine.IndexResult index(Engine.Index index) throws IOException {
        throw new IngestionEngineException("push-based indexing is not supported in ingestion engine, use streaming source instead");
    }

    /**
     * Applies an index operation to Lucene while holding the engine read lock and the per-document version-map lock.
     *
     * <p>The document version is validated only when the operation's auto-generated-id timestamp is {@code -1}
     * (and, inside the validation, only for external versioning). For external versioning the version is written
     * into the parsed document, and the version map is updated after a successful write.
     *
     * @param index        the index operation to apply
     * @param isCreateMode when {@code true} the documents are appended rather than soft-updated
     * @throws VersionConflictEngineException if version validation fails; logged at debug level and rethrown
     * @throws IOException                    on write failure, after {@code maybeFailEngine} has been consulted
     */
    public void indexInternal(Engine.Index index, boolean isCreateMode) throws IOException {
        assert Objects.equals(index.uid().field(), "_id") : index.uid().field();
        try (
            ReleasableLock ignoredReadLock = readLock.acquire();
            Releasable ignoredDocLock = versionMap.acquireLock(index.uid().bytes())
        ) {
            ensureOpen();
            lastWriteNanos = index.startTime();
            final boolean externalVersioning = index.versionType() == VersionType.EXTERNAL;

            if (index.getAutoGeneratedIdTimestamp() == -1L) {
                validateDocumentVersion(index);
            }
            if (externalVersioning) {
                index.parsedDoc().version().setLongValue(index.version());
            }

            Engine.IndexResult outcome = indexIntoLucene(index, isCreateMode);
            if (externalVersioning && outcome.getResultType() == Engine.Result.Type.SUCCESS) {
                IndexVersionValue versionValue = new IndexVersionValue(
                    Translog.EMPTY_TRANSLOG_LOCATION,
                    index.version(),
                    index.seqNo(),
                    index.primaryTerm()
                );
                versionMap.maybePutIndexUnderLock(index.uid().bytes(), versionValue);
            }
        } catch (VersionConflictEngineException e) {
            logger.debug("Version conflict encountered when processing index operation", e);
            throw e;
        } catch (IOException | RuntimeException e) {
            try {
                maybeFailEngine("index", e);
            } catch (Exception inner) {
                e.addSuppressed(inner);
            }
            throw e;
        }
    }

    /**
     * Writes the operation's documents to the index writer: appended when in create mode or when the
     * auto-generated-id timestamp is not {@code -1}, otherwise soft-updated by the operation's uid.
     *
     * @return an index result built from the operation's version, primary term and sequence number
     * @throws IOException if the index writer fails
     */
    private Engine.IndexResult indexIntoLucene(Engine.Index index, boolean isCreateMode) throws IOException {
        final boolean appendOnly = isCreateMode || index.getAutoGeneratedIdTimestamp() != -1L;
        if (appendOnly) {
            addDocs(index.docs(), indexWriter);
        } else {
            updateDocs(index.uid(), index.docs(), indexWriter);
        }
        return new Engine.IndexResult(index.version(), index.primaryTerm(), index.seqNo(), true);
    }

    /**
     * Adds the given documents to the writer, using the block API only when there is more than one document.
     *
     * @throws IOException if the index writer fails
     */
    private void addDocs(List<ParseContext.Document> docs, IndexWriter indexWriter) throws IOException {
        if (docs.size() <= 1) {
            indexWriter.addDocument(docs.get(0));
            return;
        }
        indexWriter.addDocuments(docs);
    }

    /**
     * Soft-updates the documents identified by {@code uid}, marking replaced documents with the soft-deletes
     * field. The block API is used only when there is more than one document.
     *
     * @throws IOException if the index writer fails
     */
    private void updateDocs(Term uid, List<ParseContext.Document> docs, IndexWriter indexWriter) throws IOException {
        if (docs.size() <= 1) {
            indexWriter.softUpdateDocument(uid, docs.get(0), softDeletesField);
            return;
        }
        indexWriter.softUpdateDocuments(uid, docs, softDeletesField);
    }

    /**
     * Always rejects the operation: this engine does not accept push-based delete requests.
     * Delete operations are applied through {@link #deleteInternal} instead.
     *
     * @throws IngestionEngineException always
     */
    @Override
    public Engine.DeleteResult delete(Engine.Delete delete) throws IOException {
        throw new IngestionEngineException("push-based deletion is not supported in ingestion engine, use streaming source instead");
    }

    /**
     * Applies a delete operation by soft-updating the document with a delete tombstone, while holding the
     * engine read lock and the per-document version-map lock.
     *
     * <p>The version is validated first (only enforced for external versioning). For external versioning the
     * tombstone carries the operation's version and a delete entry is recorded in the version map.
     *
     * @param delete the delete operation to apply
     * @throws VersionConflictEngineException if version validation fails; logged at debug level and rethrown
     * @throws IOException                    on write failure, after {@code maybeFailEngine} has been consulted
     */
    public void deleteInternal(Engine.Delete delete) throws IOException {
        versionMap.enforceSafeAccess();
        assert Objects.equals(delete.uid().field(), "_id") : delete.uid().field();
        lastWriteNanos = delete.startTime();
        try (
            ReleasableLock ignoredReadLock = readLock.acquire();
            Releasable ignoredDocLock = versionMap.acquireLock(delete.uid().bytes())
        ) {
            ensureOpen();
            validateDocumentVersion(delete);

            ParsedDocument tombstone = engineConfig.getTombstoneDocSupplier().newDeleteTombstoneDoc(delete.id());
            final boolean externalVersioning = delete.versionType() == VersionType.EXTERNAL;
            if (externalVersioning) {
                tombstone.version().setLongValue(delete.version());
            }

            assert tombstone.docs().size() == 1 : "Tombstone doc should have single doc [" + tombstone + "]";
            ParseContext.Document tombstoneDoc = tombstone.docs().get(0);
            assert tombstoneDoc.getField("_tombstone") != null
                : "Delete tombstone document but _tombstone field is not set [" + tombstoneDoc + " ]";
            tombstoneDoc.add(softDeletesField);
            indexWriter.softUpdateDocument(delete.uid(), tombstoneDoc, softDeletesField);

            if (externalVersioning) {
                DeleteVersionValue deleteVersion = new DeleteVersionValue(
                    delete.version(),
                    delete.seqNo(),
                    delete.primaryTerm(),
                    engineConfig.getThreadPool().relativeTimeInMillis()
                );
                versionMap.putDeleteUnderLock(delete.uid().bytes(), deleteVersion);
            }
        } catch (VersionConflictEngineException e) {
            logger.debug("Version conflict encountered when processing deletes", e);
            throw e;
        } catch (IOException | RuntimeException e) {
            try {
                maybeFailEngine("delete", e);
            } catch (Exception inner) {
                e.addSuppressed(inner);
            }
            throw e;
        }
        maybePruneDeletes();
    }

    /**
     * Checks that the engine is open and returns a no-op result carrying the operation's primary term and
     * sequence number; nothing is written.
     */
    @Override
    public Engine.NoOpResult noOp(Engine.NoOp noOp) throws IOException {
        ensureOpen();
        return new Engine.NoOpResult(noOp.primaryTerm(), noOp.seqNo());
    }

    /**
     * Resolves the get request through a searcher acquired with the external scope.
     */
    @Override
    public Engine.GetResult get(Engine.Get get, BiFunction<String, Engine.SearcherScope, Engine.Searcher> searcherFactory) throws EngineException {
        final Engine.SearcherScope scope = Engine.SearcherScope.EXTERNAL;
        return getFromSearcher(get, searcherFactory, scope);
    }

    /**
     * Prunes version-map tombstones older than the index's GC-deletes interval, measured against the thread
     * pool's relative clock, and records the time of this pruning run.
     */
    @Override
    protected void pruneDeletedTombstones() {
        final long nowMillis = engineConfig.getThreadPool().relativeTimeInMillis();
        final long pruneCutoffMillis = nowMillis - engineConfig.getIndexSettings().getGcDeletesInMillis();
        // Long.MAX_VALUE as the second bound, as in the original call.
        versionMap.pruneTombstones(pruneCutoffMillis, Long.MAX_VALUE);
        lastDeleteVersionPruneTimeMSec = nowMillis;
    }

    /**
     * Always returns the empty translog snapshot; every argument is ignored.
     */
    @Override
    public Translog.Snapshot newChangesSnapshot(String source, long fromSeqNo, long toSeqNo, boolean requiredFullRange, boolean accurateCount) throws IOException {
        return Translog.EMPTY_TRANSLOG_SNAPSHOT;
    }

    /**
     * Commits the writer with this engine's live commit data. Alongside the checkpoint, sequence-number and UUID
     * entries, the poller's current batch start pointer is stored under {@code batch_start} when it is non-null.
     *
     * <p>On any {@link Exception} the engine is failed and the exception rethrown. An {@link AssertionError} is
     * turned into an {@link EngineException} (and the engine failed) only when its stack trace comes from
     * {@code IndexWriter.filesExist}; other assertion errors propagate unchanged.
     *
     * @param writer       the index writer to commit
     * @param translogUUID value stored under {@code translog_uuid}
     * @throws IOException if the commit fails
     */
    @Override
    protected void commitIndexWriter(IndexWriter writer, String translogUUID) throws IOException {
        try {
            final long localCheckpoint = localCheckpointTracker.getProcessedCheckpoint();
            writer.setLiveCommitData(() -> {
                Map<String, String> commitData = new HashMap<>(7);
                commitData.put("translog_uuid", translogUUID);
                commitData.put("local_checkpoint", Long.toString(localCheckpoint));
                commitData.put("max_seq_no", Long.toString(localCheckpointTracker.getMaxSeqNo()));
                commitData.put("max_unsafe_auto_id_timestamp", Long.toString(maxUnsafeAutoIdTimestamp.get()));
                commitData.put("history_uuid", historyUUID);
                commitData.put("min_retained_seq_no", Long.toString(softDeletesPolicy.getMinRetainedSeqNo()));
                // Read the pointer exactly once: streamPoller is a mutable field (replaced on poller reset),
                // so checking one read and dereferencing a second one could hit a null pointer.
                IngestionShardPointer batchStartPointer = streamPoller.getBatchStartPointer();
                if (batchStartPointer != null) {
                    commitData.put("batch_start", batchStartPointer.asString());
                } else {
                    logger.warn("ignore null batch start pointer");
                }
                String currentForceMergeUUID = forceMergeUUID;
                if (currentForceMergeUUID != null) {
                    commitData.put("force_merge_uuid", currentForceMergeUUID);
                }
                logger.trace("committing writer with commit data [{}]", commitData);
                return commitData.entrySet().iterator();
            });
            shouldPeriodicallyFlushAfterBigMerge.set(false);
            writer.commit();
        } catch (Exception ex) {
            try {
                failEngine("lucene commit failed", ex);
            } catch (Exception inner) {
                ex.addSuppressed(inner);
            }
            throw ex;
        } catch (AssertionError e) {
            // Only assertion failures raised from IndexWriter.filesExist are converted into an engine failure.
            if (ExceptionsHelper.stackTrace(e).contains("org.apache.lucene.index.IndexWriter.filesExist")) {
                EngineException engineException = new EngineException(shardId, "failed to commit engine", e);
                try {
                    failEngine("lucene commit failed", engineException);
                } catch (Exception inner) {
                    engineException.addSuppressed(inner);
                }
                throw engineException;
            }
            throw e;
        }
    }

    /**
     * Intentionally empty: this engine takes no action when throttling is activated.
     */
    @Override
    public void activateThrottling() {
    }

    /**
     * Intentionally empty: this engine takes no action when throttling is deactivated.
     */
    @Override
    public void deactivateThrottling() {
    }

    /**
     * Intentionally empty. Note that deleteInternal still calls this method after every delete,
     * so with this override that call has no effect.
     */
    @Override
    public void maybePruneDeletes() {
    }

    /**
     * Closes the stream poller (when one was created), removes it from the cluster applier listeners and then
     * closes the parent engine.
     *
     * <p>The steps are chained with {@code finally} so that a failure while closing the poller or removing the
     * listener cannot skip the parent engine's {@code close()}.
     *
     * @throws IOException if closing the poller or the parent engine fails
     */
    @Override
    public void close() throws IOException {
        try {
            if (this.streamPoller != null) {
                this.streamPoller.close();
            }
        } finally {
            try {
                this.unregisterStreamPollerListener();
            } finally {
                super.close();
            }
        }
    }

    /**
     * @return the document mapper obtained from the engine config when this engine was constructed
     */
    public DocumentMapperForType getDocumentMapperForType() {
        return documentMapperForType;
    }

    /**
     * Supplies a {@link NoOpTranslogManager} backed by fresh translog stats and the empty translog snapshot.
     * The deletion policy and event listener arguments are not used.
     */
    @Override
    protected TranslogManager createTranslogManager(String translogUUID, TranslogDeletionPolicy translogDeletionPolicy, CompositeTranslogEventListener translogEventListener) throws IOException {
        final TranslogStats emptyStats = new TranslogStats();
        final Translog.Snapshot emptySnapshot = Translog.EMPTY_TRANSLOG_SNAPSHOT;
        return new NoOpTranslogManager(shardId, readLock, this::ensureOpen, emptyStats, emptySnapshot, translogUUID, true);
    }

    /**
     * @return the commit data of this engine's index writer, as a map
     */
    protected Map<String, String> commitDataAsMap() {
        final IndexWriter writer = indexWriter;
        return IngestionEngine.commitDataAsMap(writer);
    }

    /**
     * @return the statistics reported by the current stream poller
     */
    @Override
    public PollingIngestStats pollingIngestStats() {
        PollingIngestStats pollerStats = streamPoller.getStats();
        return pollerStats;
    }

    /**
     * Subscribes {@link #updateErrorHandlingStrategy} to updates of the ingestion error strategy index setting.
     */
    private void registerDynamicIndexSettingsHandlers() {
        engineConfig.getIndexSettings()
            .getScopedSettings()
            .addSettingsUpdateConsumer(IndexMetadata.INGESTION_SOURCE_ERROR_STRATEGY_SETTING, this::updateErrorHandlingStrategy);
    }

    /**
     * Builds an error strategy for the new setting value and the ingestion source's type, then installs it on
     * the current stream poller.
     *
     * @param errorStrategy the updated error strategy setting value
     */
    private void updateErrorHandlingStrategy(IngestionErrorStrategy.ErrorStrategy errorStrategy) {
        IngestionSource source = engineConfig.getIndexSettings().getIndexMetadata().getIngestionSource();
        streamPoller.updateErrorStrategy(IngestionErrorStrategy.create(errorStrategy, source.getType()));
    }

    /**
     * Checks an externally-versioned operation against the currently resolved document version.
     * Operations using any other version type are accepted without a lookup.
     *
     * <p>When no version can be resolved, the document is treated as not found with a current version of
     * {@code -1}.
     *
     * @param operation the index or delete operation to validate
     * @throws VersionConflictEngineException if the operation's version type reports a write conflict
     * @throws IOException                    if resolving the current version fails
     */
    private void validateDocumentVersion(Engine.Operation operation) throws IOException {
        if (operation.versionType() != VersionType.EXTERNAL) {
            return;
        }
        versionMap.enforceSafeAccess();
        final VersionValue resolved = resolveDocVersion(operation, false);
        final long existingVersion = resolved == null ? -1L : resolved.version;
        final boolean missingOrDeleted = resolved == null || resolved.isDelete();

        boolean conflict = operation.versionType().isVersionConflictForWrites(existingVersion, operation.version(), missingOrDeleted);
        if (conflict) {
            throw new VersionConflictEngineException(shardId, operation, existingVersion, missingOrDeleted);
        }
    }

    /**
     * Applies the given ingestion settings: resets the poller when both a reset state and a reset value are
     * present, then pauses or resumes it when a pause flag is present.
     *
     * @param ingestionSettings settings to apply; absent (null) parts are skipped
     */
    public void updateIngestionSettings(IngestionSettings ingestionSettings) {
        final boolean hasResetRequest = ingestionSettings.getResetState() != null && ingestionSettings.getResetValue() != null;
        if (hasResetRequest) {
            resetStreamPoller(ingestionSettings.getResetState(), ingestionSettings.getResetValue());
        }

        final boolean hasPauseFlag = ingestionSettings.getIsPaused() != null;
        if (hasPauseFlag) {
            updateIngestionState(ingestionSettings);
        }
    }

    /**
     * Pauses the poller when the settings' pause flag is {@code true}, resumes it otherwise.
     * The flag must be non-null; the caller checks this before calling.
     */
    private void updateIngestionState(IngestionSettings ingestionSettings) {
        final boolean shouldPause = ingestionSettings.getIsPaused();
        if (!shouldPause) {
            streamPoller.resume();
            return;
        }
        streamPoller.pause();
    }

    /**
     * Replaces the current (paused) poller with a new one positioned according to the reset request, then
     * flushes the engine.
     *
     * <p>The start pointer is derived from the current consumer by offset or by timestamp (milliseconds);
     * for any other reset state it stays {@code null}.
     *
     * @param resetState how {@code resetValue} is to be interpreted
     * @param resetValue the offset, or the timestamp in milliseconds as a decimal string
     * @throws IllegalStateException if the poller is not paused
     * @throws OpenSearchException   if the reset fails, or if the flush after a successful reset fails
     */
    private void resetStreamPoller(StreamPoller.ResetState resetState, String resetValue) {
        if (streamPoller.isPaused() == false) {
            throw new IllegalStateException("Cannot reset consumer when poller is not paused");
        }

        try {
            refresh("reset poller", Engine.SearcherScope.INTERNAL, true);

            IngestionShardPointer newStartPointer = null;
            if (resetState == StreamPoller.ResetState.RESET_BY_OFFSET) {
                newStartPointer = streamPoller.getConsumer().pointerFromOffset(resetValue);
            } else if (resetState == StreamPoller.ResetState.RESET_BY_TIMESTAMP) {
                long timestampMillis = Long.parseLong(resetValue);
                newStartPointer = streamPoller.getConsumer().pointerFromTimestampMillis(timestampMillis);
            }

            // Tear down the old poller before a new one is created in its place.
            streamPoller.close();
            unregisterStreamPollerListener();
            initializeStreamPoller(resetState, resetValue, newStartPointer);
        } catch (Exception e) {
            throw new OpenSearchException("Failed to reset stream poller", e);
        }

        try {
            flush(true, true);
        } catch (Exception e) {
            throw new OpenSearchException("Exception during flush. Poller successfully reset, but reset value might not be persisted.", e);
        }
    }

    /**
     * Builds a snapshot of this shard's ingestion state from the current poller: index name, shard id, poller
     * state, error strategy name, paused flag, write-block flag and the batch start pointer rendered with
     * {@code toString()} (an empty string when the pointer is {@code null}).
     */
    public ShardIngestionState getIngestionState() {
        final IngestionShardPointer batchStart = streamPoller.getBatchStartPointer();
        final String batchStartText;
        if (batchStart == null) {
            batchStartText = "";
        } else {
            batchStartText = batchStart.toString();
        }
        return new ShardIngestionState(
            engineConfig.getIndexSettings().getIndex().getName(),
            engineConfig.getShardId().getId(),
            streamPoller.getState().toString(),
            streamPoller.getErrorStrategy().getName(),
            streamPoller.isPaused(),
            streamPoller.isWriteBlockEnabled(),
            batchStartText
        );
    }
}

