package org.elasticsearch.xpack.ml.inference;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.action.support.WriteRequest;
import org.elasticsearch.action.support.master.MasterNodeRequest;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.client.OriginSettingClient;
import org.elasticsearch.cluster.ClusterChangedEvent;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.routing.IndexRoutingTable;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.component.LifecycleListener;
import org.elasticsearch.core.CheckedConsumer;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.indices.InvalidAliasNameException;
import org.elasticsearch.script.Script;
import org.elasticsearch.script.ScriptType;
import org.elasticsearch.threadpool.Scheduler;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.xcontent.ToXContent;
import org.elasticsearch.xcontent.XContentBuilder;
import org.elasticsearch.xcontent.XContentFactory;
import org.elasticsearch.xpack.core.ml.MlMetadata;
import org.elasticsearch.xpack.core.ml.MlStatsIndex;
import org.elasticsearch.xpack.core.ml.inference.trainedmodel.InferenceStats;
import org.elasticsearch.xpack.core.ml.job.messages.Messages;
import org.elasticsearch.xpack.core.ml.job.persistence.ElasticsearchMappings;
import org.elasticsearch.xpack.ml.MachineLearning;
import org.elasticsearch.xpack.ml.utils.persistence.ResultsPersisterService;

/* loaded from: input_file:org/elasticsearch/xpack/ml/inference/TrainedModelStatsService.class */
public class TrainedModelStatsService {
    private static final Logger logger = LogManager.getLogger(TrainedModelStatsService.class);
    private static final TimeValue PERSISTENCE_INTERVAL = TimeValue.timeValueSeconds(1);
    private static final String STATS_UPDATE_SCRIPT_TEMPLATE = "    ctx._source.{0} += params.{0};\n    ctx._source.{1} += params.{1};\n    ctx._source.{2} += params.{2};\n    ctx._source.{3} += params.{3};\n    ctx._source.{4} = params.{4};";
    private static final String STATS_UPDATE_SCRIPT = Messages.getMessage(STATS_UPDATE_SCRIPT_TEMPLATE, new Object[]{InferenceStats.MISSING_ALL_FIELDS_COUNT.getPreferredName(), InferenceStats.INFERENCE_COUNT.getPreferredName(), InferenceStats.FAILURE_COUNT.getPreferredName(), InferenceStats.CACHE_MISS_COUNT.getPreferredName(), InferenceStats.TIMESTAMP.getPreferredName()});
    private static final ToXContent.Params FOR_INTERNAL_STORAGE_PARAMS = new ToXContent.MapParams(Collections.singletonMap("for_internal_storage", "true"));
    private final Map<String, InferenceStats> statsQueue = new ConcurrentHashMap();
    private final ResultsPersisterService resultsPersisterService;
    private final OriginSettingClient client;
    private final IndexNameExpressionResolver indexNameExpressionResolver;
    private final ThreadPool threadPool;
    private volatile Scheduler.Cancellable scheduledFuture;
    private volatile boolean stopped;
    private volatile ClusterState clusterState;

    public TrainedModelStatsService(ResultsPersisterService resultsPersisterService, OriginSettingClient originSettingClient, IndexNameExpressionResolver indexNameExpressionResolver, ClusterService clusterService, ThreadPool threadPool) {
        this.resultsPersisterService = resultsPersisterService;
        this.client = originSettingClient;
        this.indexNameExpressionResolver = indexNameExpressionResolver;
        this.threadPool = threadPool;
        clusterService.addLifecycleListener(new LifecycleListener() { // from class: org.elasticsearch.xpack.ml.inference.TrainedModelStatsService.1
            public void beforeStart() {
                TrainedModelStatsService.this.start();
            }

            public void beforeStop() {
                TrainedModelStatsService.this.stop();
            }
        });
        clusterService.addListener(this::setClusterState);
    }

    void setClusterState(ClusterChangedEvent clusterChangedEvent) {
        this.clusterState = clusterChangedEvent.state();
    }

    public void queueStats(InferenceStats inferenceStats, boolean z) {
        if (inferenceStats.hasStats()) {
            this.statsQueue.compute(InferenceStats.docId(inferenceStats.getModelId(), inferenceStats.getNodeId()), (str, inferenceStats2) -> {
                return inferenceStats2 == null ? inferenceStats : InferenceStats.accumulator(inferenceStats).merge(inferenceStats2).currentStats(inferenceStats.getTimeStamp());
            });
        }
        if (z) {
            this.threadPool.executor(MachineLearning.UTILITY_THREAD_POOL_NAME).execute(this::updateStats);
        }
    }

    void stop() {
        logger.debug("About to stop TrainedModelStatsService");
        this.stopped = true;
        this.statsQueue.clear();
        Scheduler.Cancellable cancellable = this.scheduledFuture;
        if (cancellable != null) {
            cancellable.cancel();
        }
    }

    private boolean shouldStop() {
        return this.stopped || MlMetadata.getMlMetadata(this.clusterState).isResetMode() || MlMetadata.getMlMetadata(this.clusterState).isUpgradeMode();
    }

    void start() {
        logger.debug("About to start TrainedModelStatsService");
        this.stopped = false;
        this.scheduledFuture = this.threadPool.scheduleWithFixedDelay(this::updateStats, PERSISTENCE_INTERVAL, MachineLearning.UTILITY_THREAD_POOL_NAME);
    }

    void updateStats() {
        if (this.clusterState == null || this.statsQueue.isEmpty() || this.stopped) {
            return;
        }
        if (MlMetadata.getMlMetadata(this.clusterState).isUpgradeMode()) {
            logger.debug("Model stats not persisted as ml upgrade mode is enabled");
            return;
        }
        if (MlMetadata.getMlMetadata(this.clusterState).isResetMode()) {
            logger.debug("Model stats not persisted as ml reset_mode is enabled");
            return;
        }
        if (!verifyIndicesExistAndPrimaryShardsAreActive(this.clusterState, this.indexNameExpressionResolver)) {
            try {
                logger.debug("About to create the stats index as it does not exist yet");
                createStatsIndexIfNecessary();
            } catch (Exception e) {
                if (!(e instanceof InvalidAliasNameException)) {
                    logger.error("failure creating ml stats index for storing model stats", e);
                    return;
                }
            }
        }
        ArrayList arrayList = new ArrayList(this.statsQueue.size());
        Iterator it = new HashSet(this.statsQueue.keySet()).iterator();
        while (it.hasNext()) {
            InferenceStats remove = this.statsQueue.remove((String) it.next());
            if (remove != null) {
                arrayList.add(remove);
            }
        }
        if (arrayList.isEmpty()) {
            return;
        }
        BulkRequest bulkRequest = new BulkRequest();
        Stream filter = arrayList.stream().map(TrainedModelStatsService::buildUpdateRequest).filter((v0) -> {
            return Objects.nonNull(v0);
        });
        Objects.requireNonNull(bulkRequest);
        filter.forEach(bulkRequest::add);
        bulkRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
        if (bulkRequest.requests().isEmpty() || shouldStop()) {
            return;
        }
        String str = (String) arrayList.stream().map((v0) -> {
            return v0.getModelId();
        }).collect(Collectors.joining(","));
        try {
            this.resultsPersisterService.bulkIndexWithRetry(bulkRequest, str, () -> {
                return Boolean.valueOf(!shouldStop());
            }, str2 -> {
            });
        } catch (ElasticsearchException e2) {
            logger.warn(() -> {
                return new ParameterizedMessage("failed to store stats for [{}]", str);
            }, e2);
        }
    }

    static boolean verifyIndicesExistAndPrimaryShardsAreActive(ClusterState clusterState, IndexNameExpressionResolver indexNameExpressionResolver) {
        IndexRoutingTable index;
        String[] concreteIndexNames = indexNameExpressionResolver.concreteIndexNames(clusterState, IndicesOptions.LENIENT_EXPAND_OPEN_HIDDEN, new String[]{MlStatsIndex.writeAlias()});
        if (concreteIndexNames.length == 0) {
            return false;
        }
        for (String str : concreteIndexNames) {
            if (!clusterState.metadata().hasIndex(str) || (index = clusterState.getRoutingTable().index(str)) == null || !index.allPrimaryShardsActive()) {
                return false;
            }
        }
        return true;
    }

    private void createStatsIndexIfNecessary() {
        PlainActionFuture plainActionFuture = new PlainActionFuture();
        OriginSettingClient originSettingClient = this.client;
        ClusterState clusterState = this.clusterState;
        IndexNameExpressionResolver indexNameExpressionResolver = this.indexNameExpressionResolver;
        TimeValue timeValue = MasterNodeRequest.DEFAULT_MASTER_NODE_TIMEOUT;
        CheckedConsumer checkedConsumer = bool -> {
            ElasticsearchMappings.addDocMappingIfMissing(MlStatsIndex.writeAlias(), MlStatsIndex::wrappedMapping, this.client, this.clusterState, MasterNodeRequest.DEFAULT_MASTER_NODE_TIMEOUT, plainActionFuture);
        };
        Objects.requireNonNull(plainActionFuture);
        MlStatsIndex.createStatsIndexAndAliasIfNecessary(originSettingClient, clusterState, indexNameExpressionResolver, timeValue, ActionListener.wrap(checkedConsumer, plainActionFuture::onFailure));
        plainActionFuture.actionGet();
        logger.debug("Created stats index");
    }

    static UpdateRequest buildUpdateRequest(InferenceStats inferenceStats) {
        try {
            XContentBuilder jsonBuilder = XContentFactory.jsonBuilder();
            try {
                HashMap hashMap = new HashMap();
                hashMap.put(InferenceStats.FAILURE_COUNT.getPreferredName(), Long.valueOf(inferenceStats.getFailureCount()));
                hashMap.put(InferenceStats.MISSING_ALL_FIELDS_COUNT.getPreferredName(), Long.valueOf(inferenceStats.getMissingAllFieldsCount()));
                hashMap.put(InferenceStats.TIMESTAMP.getPreferredName(), Long.valueOf(inferenceStats.getTimeStamp().toEpochMilli()));
                hashMap.put(InferenceStats.INFERENCE_COUNT.getPreferredName(), Long.valueOf(inferenceStats.getInferenceCount()));
                hashMap.put(InferenceStats.CACHE_MISS_COUNT.getPreferredName(), Long.valueOf(inferenceStats.getCacheMissCount()));
                inferenceStats.toXContent(jsonBuilder, FOR_INTERNAL_STORAGE_PARAMS);
                UpdateRequest updateRequest = new UpdateRequest();
                updateRequest.upsert(jsonBuilder).index(MlStatsIndex.writeAlias()).retryOnConflict(3).id(InferenceStats.docId(inferenceStats.getModelId(), inferenceStats.getNodeId())).script(new Script(ScriptType.INLINE, "painless", STATS_UPDATE_SCRIPT, hashMap)).setRequireAlias(true);
                if (jsonBuilder != null) {
                    jsonBuilder.close();
                }
                return updateRequest;
            } finally {
            }
        } catch (IOException e) {
            logger.error(() -> {
                return new ParameterizedMessage("[{}] [{}] failed to serialize stats for update.", inferenceStats.getModelId(), inferenceStats.getNodeId());
            }, e);
            return null;
        }
    }
}
