package org.elasticsearch.xpack.ml.datafeed.extractor.chunked;

import java.io.IOException;
import java.io.InputStream;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Objects;
import java.util.Optional;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.action.ActionRequestBuilder;
import org.elasticsearch.action.search.SearchAction;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchRequestBuilder;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.search.aggregations.AggregationBuilders;
import org.elasticsearch.search.aggregations.Aggregations;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.xpack.core.ClientHelper;
import org.elasticsearch.xpack.core.ml.datafeed.extractor.DataExtractor;
import org.elasticsearch.xpack.core.ml.datafeed.extractor.ExtractorUtils;
import org.elasticsearch.xpack.core.rollup.action.RollupSearchAction;
import org.elasticsearch.xpack.ml.datafeed.DatafeedTimingStatsReporter;
import org.elasticsearch.xpack.ml.datafeed.extractor.DataExtractorFactory;
import org.elasticsearch.xpack.ml.datafeed.extractor.aggregation.RollupDataExtractorFactory;

/* loaded from: input_file:org/elasticsearch/xpack/ml/datafeed/extractor/chunked/ChunkedDataExtractor.class */
public class ChunkedDataExtractor implements DataExtractor {
    // Logger shared by the extractor and its nested summary helpers.
    private static final Logger LOGGER = LogManager.getLogger(ChunkedDataExtractor.class);
    // Name of the "min" aggregation on the time field issued by the data summary search.
    private static final String EARLIEST_TIME = "earliest_time";
    // Name of the "max" aggregation on the time field issued by the data summary search.
    private static final String LATEST_TIME = "latest_time";
    // Lower bound applied to the chunk span estimated by ScrolledDataSummary (the span is logged in ms).
    private static final long MIN_CHUNK_SPAN = 60000;
    // Client used to build and execute the data summary searches.
    private final Client client;
    // Creates the per-chunk extractor each time the chunk window advances.
    private final DataExtractorFactory dataExtractorFactory;
    // Extraction configuration read throughout this class (start/end, query, time field, chunk span, ...).
    private final ChunkedDataExtractorContext context;
    // Receives the "took" duration of every data summary search.
    private final DatafeedTimingStatsReporter timingStatsReporter;
    // Start of the current chunk; also the lower bound of the data summary time range query.
    private long currentStart;
    // End of the current chunk; extraction continues while this is below context.end.
    private long currentEnd;
    // Width of each chunk, (re)computed by setUpChunkedSearch().
    private long chunkSpan;
    // Extractor for the current chunk; stays null until the first call to advanceTime().
    private DataExtractor currentExtractor;
    // Set by cancel(); once true, hasNext() only reports what the current chunk extractor still holds.
    private boolean isCancelled = false;
    // Builds the scrolled or aggregated data summary used to configure chunking.
    private final DataSummaryFactory dataSummaryFactory = new DataSummaryFactory();

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/elasticsearch/xpack/ml/datafeed/extractor/chunked/ChunkedDataExtractor$AggregatedDataSummary.class */
    /**
     * Data summary for aggregation-based extraction: holds the earliest and latest
     * time values reported by the summary search together with the histogram interval.
     * Infinite time bounds mark a summary without data.
     */
    public static class AggregatedDataSummary implements DataSummary {
        /** Number of histogram intervals covered by one estimated chunk. */
        private static final long INTERVALS_PER_CHUNK = 1000;

        private final double earliestTime;
        private final double latestTime;
        private final long histogramIntervalMillis;

        /**
         * Creates a summary that reports no data (both time bounds are infinite).
         *
         * @param histogramIntervalMillis the histogram interval to keep in the summary
         * @return a summary whose {@link #hasData()} is {@code false}
         */
        static AggregatedDataSummary noDataSummary(long histogramIntervalMillis) {
            final double unbounded = Double.POSITIVE_INFINITY;
            return new AggregatedDataSummary(unbounded, unbounded, histogramIntervalMillis);
        }

        /**
         * @param earliestTime            earliest time value found by the summary search
         * @param latestTime              latest time value found by the summary search
         * @param histogramIntervalMillis histogram interval used to size a chunk
         */
        AggregatedDataSummary(double earliestTime, double latestTime, long histogramIntervalMillis) {
            this.earliestTime = earliestTime;
            this.latestTime = latestTime;
            this.histogramIntervalMillis = histogramIntervalMillis;
        }

        /** Returns a chunk span of a fixed number of histogram intervals. */
        @Override // org.elasticsearch.xpack.ml.datafeed.extractor.chunked.ChunkedDataExtractor.DataSummary
        public long estimateChunk() {
            return INTERVALS_PER_CHUNK * histogramIntervalMillis;
        }

        /** Returns {@code true} only when both time bounds are finite. */
        @Override // org.elasticsearch.xpack.ml.datafeed.extractor.chunked.ChunkedDataExtractor.DataSummary
        public boolean hasData() {
            return !Double.isInfinite(earliestTime) && !Double.isInfinite(latestTime);
        }

        /** Returns the earliest time truncated to a {@code long}. */
        @Override // org.elasticsearch.xpack.ml.datafeed.extractor.chunked.ChunkedDataExtractor.DataSummary
        public long earliestTime() {
            return (long) earliestTime;
        }

        /** Returns the difference between the truncated latest and earliest times. */
        @Override // org.elasticsearch.xpack.ml.datafeed.extractor.chunked.ChunkedDataExtractor.DataSummary
        public long getDataTimeSpread() {
            final long latest = (long) latestTime;
            final long earliest = (long) earliestTime;
            return latest - earliest;
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/elasticsearch/xpack/ml/datafeed/extractor/chunked/ChunkedDataExtractor$DataSummary.class */
    /**
     * Summary of the data that a chunked extraction is about to cover. The enclosing
     * extractor consults it in {@code setUpChunkedSearch()} to decide where to start
     * and how wide each chunk should be.
     */
    public interface DataSummary {
        /**
         * Returns the chunk span to use when the context does not configure one explicitly.
         * The extractor aligns the returned value to a ceiling before using it.
         */
        long estimateChunk();

        /**
         * Returns whether the summary found any data. When {@code false} the extractor
         * moves its current end straight to the context end, which finishes extraction.
         */
        boolean hasData();

        /**
         * Returns the earliest time found. The extractor aligns it to a floor and uses
         * the result as the start of the first chunk.
         */
        long earliestTime();

        /** Returns the difference between the latest and the earliest time found. */
        long getDataTimeSpread();
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/elasticsearch/xpack/ml/datafeed/extractor/chunked/ChunkedDataExtractor$DataSummaryFactory.class */
    public class DataSummaryFactory {
        private DataSummaryFactory() {
        }

        /* JADX INFO: Access modifiers changed from: private */
        public DataSummary buildDataSummary() {
            return ChunkedDataExtractor.this.context.hasAggregations ? newAggregatedDataSummary() : newScrolledDataSummary();
        }

        private DataSummary newScrolledDataSummary() {
            SearchResponse executeSearchRequest = ChunkedDataExtractor.this.executeSearchRequest(rangeSearchRequest());
            ChunkedDataExtractor.LOGGER.debug("[{}] Scrolling Data summary response was obtained", ChunkedDataExtractor.this.context.jobId);
            ChunkedDataExtractor.this.timingStatsReporter.reportSearchDuration(executeSearchRequest.getTook());
            long j = 0;
            long j2 = 0;
            long j3 = executeSearchRequest.getHits().getTotalHits().value;
            if (j3 > 0) {
                Aggregations aggregations = executeSearchRequest.getAggregations();
                j = (long) aggregations.get(ChunkedDataExtractor.EARLIEST_TIME).getValue();
                j2 = (long) aggregations.get(ChunkedDataExtractor.LATEST_TIME).getValue();
            }
            return new ScrolledDataSummary(j, j2, j3);
        }

        private DataSummary newAggregatedDataSummary() {
            SearchResponse executeSearchRequest = ChunkedDataExtractor.this.executeSearchRequest(ChunkedDataExtractor.this.dataExtractorFactory instanceof RollupDataExtractorFactory ? rollupRangeSearchRequest() : rangeSearchRequest());
            ChunkedDataExtractor.LOGGER.debug("[{}] Aggregating Data summary response was obtained", ChunkedDataExtractor.this.context.jobId);
            ChunkedDataExtractor.this.timingStatsReporter.reportSearchDuration(executeSearchRequest.getTook());
            Aggregations aggregations = executeSearchRequest.getAggregations();
            if (aggregations == null) {
                return AggregatedDataSummary.noDataSummary(ChunkedDataExtractor.this.context.histogramInterval.longValue());
            }
            return new AggregatedDataSummary(aggregations.get(ChunkedDataExtractor.EARLIEST_TIME).getValue(), aggregations.get(ChunkedDataExtractor.LATEST_TIME).getValue(), ChunkedDataExtractor.this.context.histogramInterval.longValue());
        }

        private SearchSourceBuilder rangeSearchBuilder() {
            return new SearchSourceBuilder().size(0).query(ExtractorUtils.wrapInTimeRangeQuery(ChunkedDataExtractor.this.context.query, ChunkedDataExtractor.this.context.timeField, ChunkedDataExtractor.this.currentStart, ChunkedDataExtractor.this.context.end)).runtimeMappings(ChunkedDataExtractor.this.context.runtimeMappings).aggregation(AggregationBuilders.min(ChunkedDataExtractor.EARLIEST_TIME).field(ChunkedDataExtractor.this.context.timeField)).aggregation(AggregationBuilders.max(ChunkedDataExtractor.LATEST_TIME).field(ChunkedDataExtractor.this.context.timeField));
        }

        private SearchRequestBuilder rangeSearchRequest() {
            return new SearchRequestBuilder(ChunkedDataExtractor.this.client, SearchAction.INSTANCE).setIndices(ChunkedDataExtractor.this.context.indices).setIndicesOptions(ChunkedDataExtractor.this.context.indicesOptions).setSource(rangeSearchBuilder()).setAllowPartialSearchResults(false).setTrackTotalHits(true);
        }

        private RollupSearchAction.RequestBuilder rollupRangeSearchRequest() {
            return new RollupSearchAction.RequestBuilder(ChunkedDataExtractor.this.client, new SearchRequest().indices(ChunkedDataExtractor.this.context.indices).indicesOptions(ChunkedDataExtractor.this.context.indicesOptions).allowPartialSearchResults(false).source(rangeSearchBuilder()));
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/elasticsearch/xpack/ml/datafeed/extractor/chunked/ChunkedDataExtractor$ScrolledDataSummary.class */
    /**
     * Data summary for scroll-based extraction: holds the earliest and latest time
     * values and the total number of hits reported by the summary search.
     */
    public class ScrolledDataSummary implements DataSummary {
        private final long earliestTime;
        private final long latestTime;
        private final long totalHits;

        /**
         * @param earliestTime earliest time value found by the summary search
         * @param latestTime   latest time value found by the summary search
         * @param totalHits    total number of hits matched by the summary search
         */
        private ScrolledDataSummary(long earliestTime, long latestTime, long totalHits) {
            this.earliestTime = earliestTime;
            this.latestTime = latestTime;
            this.totalHits = totalHits;
        }

        /** Returns the earliest time found. */
        @Override // org.elasticsearch.xpack.ml.datafeed.extractor.chunked.ChunkedDataExtractor.DataSummary
        public long earliestTime() {
            return earliestTime;
        }

        /** Returns the difference between the latest and the earliest time found. */
        @Override // org.elasticsearch.xpack.ml.datafeed.extractor.chunked.ChunkedDataExtractor.DataSummary
        public long getDataTimeSpread() {
            return latestTime - earliestTime;
        }

        /**
         * Estimates a chunk span as the time spread covered by ten scroll pages of hits,
         * bounded below by {@code MIN_CHUNK_SPAN}. When there are no hits or no positive
         * time spread, the remaining range up to the context end is returned instead.
         */
        @Override // org.elasticsearch.xpack.ml.datafeed.extractor.chunked.ChunkedDataExtractor.DataSummary
        public long estimateChunk() {
            final long spread = getDataTimeSpread();
            if (totalHits <= 0 || spread <= 0) {
                return context.end - currentEnd;
            }
            final long estimate = (10 * (context.scrollSize * spread)) / totalHits;
            return Math.max(estimate, MIN_CHUNK_SPAN);
        }

        /** Returns {@code true} when the summary search matched at least one hit. */
        @Override // org.elasticsearch.xpack.ml.datafeed.extractor.chunked.ChunkedDataExtractor.DataSummary
        public boolean hasData() {
            return totalHits > 0;
        }
    }

    public ChunkedDataExtractor(Client client, DataExtractorFactory dataExtractorFactory, ChunkedDataExtractorContext chunkedDataExtractorContext, DatafeedTimingStatsReporter datafeedTimingStatsReporter) {
        this.client = (Client) Objects.requireNonNull(client);
        this.dataExtractorFactory = (DataExtractorFactory) Objects.requireNonNull(dataExtractorFactory);
        this.context = (ChunkedDataExtractorContext) Objects.requireNonNull(chunkedDataExtractorContext);
        this.timingStatsReporter = (DatafeedTimingStatsReporter) Objects.requireNonNull(datafeedTimingStatsReporter);
        this.currentStart = chunkedDataExtractorContext.start;
        this.currentEnd = chunkedDataExtractorContext.start;
    }

    /**
     * Reports whether more data may be available. After cancellation only what the
     * current chunk extractor still holds counts; otherwise any time range remaining
     * before the context end counts as well.
     *
     * @return {@code true} if {@link #next()} may be called
     */
    public boolean hasNext() {
        final boolean extractorHasMore = currentExtractor != null && currentExtractor.hasNext();
        if (isCancelled()) {
            return extractorHasMore;
        }
        return extractorHasMore || currentEnd < context.end;
    }

    /**
     * Returns the next stream of data, configuring the chunked search first if no
     * chunk extractor has been created yet.
     *
     * @return the next stream, or an empty optional when no further data is found
     * @throws NoSuchElementException if {@link #hasNext()} is {@code false}
     * @throws IOException            declared by the underlying stream retrieval
     */
    public Optional<InputStream> next() throws IOException {
        if (hasNext()) {
            if (currentExtractor == null) {
                // First call: work out where the data starts and how wide chunks should be.
                setUpChunkedSearch();
            }
            return getNextStream();
        }
        throw new NoSuchElementException();
    }

    /**
     * (Re)configures chunking from a fresh data summary: positions the chunk window at
     * the floor-aligned earliest time and picks the chunk span (the configured one, or
     * the summary's estimate), aligned to a ceiling. Without data, the current end jumps
     * to the context end so that extraction finishes.
     */
    private void setUpChunkedSearch() {
        final DataSummary summary = dataSummaryFactory.buildDataSummary();
        if (summary.hasData()) {
            currentStart = context.timeAligner.alignToFloor(summary.earliestTime());
            // The window starts out empty; advanceTime() opens the first chunk.
            currentEnd = currentStart;
            if (context.chunkSpan == null) {
                chunkSpan = summary.estimateChunk();
            } else {
                chunkSpan = context.chunkSpan.getMillis();
            }
            chunkSpan = context.timeAligner.alignToCeil(chunkSpan);
            LOGGER.debug(
                "[{}] Chunked search configured: kind = {}, dataTimeSpread = {} ms, chunk span = {} ms",
                context.jobId,
                summary.getClass().getSimpleName(),
                summary.getDataTimeSpread(),
                chunkSpan
            );
            return;
        }
        currentEnd = context.end;
        LOGGER.debug("[{}] Chunked search configured: no data found", context.jobId);
    }

    /**
     * Executes the given search request with the context headers under the "ml" origin.
     *
     * @param actionRequestBuilder the request to execute; must not be {@code null}
     * @return the search response
     * @throws NullPointerException if {@code actionRequestBuilder} is {@code null}
     */
    protected SearchResponse executeSearchRequest(ActionRequestBuilder<SearchRequest, SearchResponse> actionRequestBuilder) {
        // Creating the bound method reference null-checks the builder before the helper is invoked.
        return ClientHelper.executeWithHeaders(context.headers, "ml", client, actionRequestBuilder::get);
    }

    /**
     * Pulls the next non-empty stream, advancing the chunk window whenever the current
     * chunk extractor is missing or exhausted.
     *
     * @return the next stream, or an empty optional once nothing more is available
     * @throws IOException declared by the chunk extractor's {@code next()}
     */
    private Optional<InputStream> getNextStream() throws IOException {
        while (hasNext()) {
            // True when this iteration has to open a new chunk (first search, or previous one finished).
            final boolean startedNewSearch = currentExtractor == null || !currentExtractor.hasNext();
            if (startedNewSearch) {
                advanceTime();
            }

            final Optional<InputStream> stream = currentExtractor.next();
            if (stream.isPresent()) {
                return stream;
            }

            // A brand-new chunk that produced nothing: re-run the data summary so that the
            // window jumps to where the remaining data starts.
            if (startedNewSearch && hasNext()) {
                setUpChunkedSearch();
            }
        }
        return Optional.empty();
    }

    /**
     * Moves the chunk window forward by one chunk span (capped at the context end) and
     * creates the extractor for the new window.
     */
    private void advanceTime() {
        final long nextStart = currentEnd;
        final long nextEnd = Math.min(nextStart + chunkSpan, context.end);
        currentStart = nextStart;
        currentEnd = nextEnd;
        currentExtractor = dataExtractorFactory.newExtractor(nextStart, nextEnd);
        LOGGER.trace("[{}] advances time to [{}, {})", context.jobId, nextStart, nextEnd);
    }

    /**
     * @return {@code true} once {@link #cancel()} has completed
     */
    public boolean isCancelled() {
        return isCancelled;
    }

    /**
     * Cancels the current chunk extractor, if there is one, and then marks this
     * extractor as cancelled.
     */
    public void cancel() {
        final DataExtractor active = currentExtractor;
        if (active != null) {
            active.cancel();
        }
        // Flag is raised only after the chunk extractor has been cancelled.
        isCancelled = true;
    }

    /**
     * @return the end time taken from the extraction context
     */
    public long getEndTime() {
        return context.end;
    }

    /**
     * @return the extraction context this extractor was created with
     */
    ChunkedDataExtractorContext getContext() {
        return context;
    }
}
