/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.webmonitor.threadinfo;

import java.time.Duration;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;
import java.util.function.Function;
import javax.annotation.concurrent.GuardedBy;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex;
import org.apache.flink.runtime.executiongraph.AccessExecutionVertex;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
import org.apache.flink.runtime.taskexecutor.TaskExecutorThreadInfoGateway;
import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
import org.apache.flink.runtime.webmonitor.stats.JobVertexStatsTracker;
import org.apache.flink.runtime.webmonitor.stats.Statistics;
import org.apache.flink.runtime.webmonitor.threadinfo.JobVertexThreadInfoStats;
import org.apache.flink.runtime.webmonitor.threadinfo.ThreadInfoRequestCoordinator;
import org.apache.flink.shaded.guava30.com.google.common.cache.Cache;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class JobVertexThreadInfoTracker<T extends Statistics>
implements JobVertexStatsTracker<T> {
    private static final Logger LOG = LoggerFactory.getLogger(JobVertexThreadInfoTracker.class);

    /** Monitor held while reading or mutating the cache, the pending set and the shut-down flag. */
    private final Object lock = new Object();

    /** Coordinator used to trigger thread info requests. */
    @GuardedBy("lock")
    private final ThreadInfoRequestCoordinator coordinator;

    /** Converts a raw thread info sample into the statistics type stored in the cache. */
    private final Function<JobVertexThreadInfoStats, T> createStatsFn;

    /** Executor on which sample completion callbacks are run. */
    private final ExecutorService executor;

    /** Source of the resource manager gateway used to look up task executor gateways. */
    private final GatewayRetriever<ResourceManagerGateway> resourceManagerGatewayRetriever;

    /** Most recent statistics per job vertex. */
    @GuardedBy("lock")
    private final Cache<Key, T> vertexStatsCache;

    /** Keys for which a sample has been triggered and has not completed yet. */
    @GuardedBy("lock")
    private final Set<Key> pendingStats = new HashSet<>();

    /** Number of samples passed to the coordinator for each request. */
    private final int numSamples;

    /** Age (measured from the statistics' end time) after which a new sample is triggered. */
    private final Duration statsRefreshInterval;

    /** Delay between samples passed to the coordinator for each request. */
    private final Duration delayBetweenSamples;

    /** Maximum stack trace depth passed to the coordinator for each request. */
    private final int maxThreadInfoDepth;

    /** Completed (with {@code null}) once a sample result has been put into the cache. */
    private final CompletableFuture<Void> resultAvailableFuture = new CompletableFuture<>();

    /** Set once {@code shutDown()} has run; every read and write happens while holding the lock. */
    @GuardedBy("lock")
    private boolean shutDown;

    /** Timeout passed to the resource manager when requesting a task executor gateway. */
    private final Time rpcTimeout;

    JobVertexThreadInfoTracker(ThreadInfoRequestCoordinator coordinator, GatewayRetriever<ResourceManagerGateway> resourceManagerGatewayRetriever, Function<JobVertexThreadInfoStats, T> createStatsFn, ScheduledExecutorService executor, Duration cleanUpInterval, int numSamples, Duration statsRefreshInterval, Duration delayBetweenSamples, int maxStackTraceDepth, Time rpcTimeout, Cache<Key, T> vertexStatsCache) {
        this.coordinator = Preconditions.checkNotNull(coordinator, "Thread info samples coordinator");
        this.resourceManagerGatewayRetriever = Preconditions.checkNotNull(resourceManagerGatewayRetriever, "Gateway retriever");
        this.createStatsFn = Preconditions.checkNotNull(createStatsFn, "Create stats function");
        this.executor = Preconditions.checkNotNull(executor, "Scheduled executor");
        this.statsRefreshInterval = Preconditions.checkNotNull(statsRefreshInterval, "Statistics refresh interval");
        this.rpcTimeout = rpcTimeout;
        Preconditions.checkArgument(cleanUpInterval.toMillis() > 0L, "Clean up interval must be greater than 0");
        Preconditions.checkArgument(numSamples >= 1, "Number of samples");
        this.numSamples = numSamples;
        Preconditions.checkArgument(statsRefreshInterval.toMillis() > 0L, "Stats refresh interval must be greater than 0");
        this.delayBetweenSamples = Preconditions.checkNotNull(delayBetweenSamples, "Delay between samples");
        Preconditions.checkArgument(maxStackTraceDepth > 0, "Max stack trace depth must be greater than 0");
        this.maxThreadInfoDepth = maxStackTraceDepth;
        this.vertexStatsCache = Preconditions.checkNotNull(vertexStatsCache, "Vertex stats cache");
        executor.scheduleWithFixedDelay(this::cleanUpVertexStatsCache, cleanUpInterval.toMillis(), cleanUpInterval.toMillis(), TimeUnit.MILLISECONDS);
    }

    /**
     * Returns the cached statistics for the given job vertex, if there are any.
     *
     * <p>A new sample is triggered when nothing is cached for the vertex, or when the cached
     * statistics ended at least one refresh interval ago. The cached (possibly outdated) value is
     * still what gets returned by this call.
     *
     * @param jobId ID of the job the vertex belongs to
     * @param vertex job vertex whose statistics are requested
     * @return the cached statistics, or an empty optional if none are cached
     */
    @Override
    public Optional<T> getVertexStats(JobID jobId, AccessExecutionJobVertex vertex) {
        synchronized (this.lock) {
            Key key = JobVertexThreadInfoTracker.getKey(jobId, vertex);
            // Keep the element type T: widening to Statistics would make the returned Optional
            // an Optional<Statistics>, which is not assignable to Optional<T>.
            T stats = this.vertexStatsCache.getIfPresent(key);
            boolean refreshNeeded =
                    stats == null
                            || System.currentTimeMillis()
                                    >= stats.getEndTime() + this.statsRefreshInterval.toMillis();
            if (refreshNeeded) {
                this.triggerThreadInfoSampleInternal(key, vertex);
            }
            return Optional.ofNullable(stats);
        }
    }

    /**
     * Triggers a thread info sample for the given vertex unless the tracker is shut down or a
     * sample for the same key is already pending.
     *
     * <p>The caller must hold the lock. The key is added to the pending set before the request is
     * issued; the completion callback removes it again. If setting up the request fails
     * synchronously, the key is removed here so that a later call can retry.
     *
     * @param key cache key of the vertex
     * @param vertex job vertex whose tasks are sampled
     */
    private void triggerThreadInfoSampleInternal(Key key, AccessExecutionJobVertex vertex) {
        assert (Thread.holdsLock(this.lock));
        if (this.shutDown) {
            return;
        }
        // Set.add returns false when the key is already present, i.e. a sample is in flight.
        if (!this.pendingStats.add(key)) {
            return;
        }
        try {
            AccessExecutionVertex[] executionVertices = vertex.getTaskVertices();
            if (LOG.isDebugEnabled()) {
                LOG.debug("Triggering thread info sample for tasks: {}", Arrays.toString(executionVertices));
            }
            CompletableFuture<ResourceManagerGateway> gatewayFuture = this.resourceManagerGatewayRetriever.getFuture();
            CompletableFuture<JobVertexThreadInfoStats> sample =
                    gatewayFuture.thenCompose(
                            resourceManagerGateway ->
                                    this.coordinator.triggerThreadInfoRequest(
                                            this.matchExecutionsWithGateways(executionVertices, resourceManagerGateway),
                                            this.numSamples,
                                            this.delayBetweenSamples,
                                            this.maxThreadInfoDepth));
            sample.whenCompleteAsync(new ThreadInfoSampleCompletionCallback(key, vertex), this.executor);
        } catch (RuntimeException | Error e) {
            // No callback will run for this key, so it must not stay marked as pending.
            this.pendingStats.remove(key);
            throw e;
        }
    }

    /**
     * Builds the map from execution attempt to task executor gateway future for every given
     * vertex that is assigned to a task manager and currently in state {@code RUNNING}.
     *
     * <p>Vertices that are unassigned or not running are skipped with a trace log entry; no
     * gateway is requested from the resource manager for them.
     *
     * @param executionVertices vertices to consider
     * @param resourceManagerGateway gateway used to request the task executor gateways
     * @return attempt IDs of the sampled executions mapped to their gateway futures
     */
    private Map<ExecutionAttemptID, CompletableFuture<TaskExecutorThreadInfoGateway>> matchExecutionsWithGateways(AccessExecutionVertex[] executionVertices, ResourceManagerGateway resourceManagerGateway) {
        Map<ExecutionAttemptID, CompletableFuture<TaskExecutorThreadInfoGateway>> executionsWithGateways = new HashMap<>();
        for (AccessExecutionVertex executionVertex : executionVertices) {
            TaskManagerLocation tmLocation = executionVertex.getCurrentAssignedResourceLocation();
            if (tmLocation == null) {
                LOG.trace("ExecutionVertex {} is currently not assigned", executionVertex);
                continue;
            }
            // Read the state once so the check and the log message agree.
            ExecutionState state = executionVertex.getExecutionState();
            if (state != ExecutionState.RUNNING) {
                LOG.trace("{} not running, but {}; not sampling", executionVertex.getTaskNameWithSubtaskIndex(), state);
                continue;
            }
            // Only contact the resource manager for executions that are actually sampled.
            ExecutionAttemptID attemptId = executionVertex.getCurrentExecutionAttempt().getAttemptId();
            CompletableFuture<TaskExecutorThreadInfoGateway> taskExecutorGatewayFuture =
                    resourceManagerGateway.requestTaskExecutorThreadInfoGateway(tmLocation.getResourceID(), this.rpcTimeout);
            executionsWithGateways.put(attemptId, taskExecutorGatewayFuture);
        }
        return executionsWithGateways;
    }

    /**
     * Runs the cache's own maintenance routine. The constructor schedules this method to be
     * invoked periodically.
     */
    @VisibleForTesting
    void cleanUpVertexStatsCache() {
        vertexStatsCache.cleanUp();
    }

    /**
     * Shuts the tracker down: drops all cached statistics, forgets all pending requests and marks
     * the tracker as shut down. Calling it again afterwards has no effect.
     */
    @Override
    public void shutDown() {
        synchronized (this.lock) {
            if (this.shutDown) {
                // Already shut down, nothing left to release.
                return;
            }
            this.vertexStatsCache.invalidateAll();
            this.pendingStats.clear();
            this.shutDown = true;
        }
    }

    /**
     * Exposes the future that signals the availability of a sample result.
     *
     * @return the tracker's result-available future
     */
    @VisibleForTesting
    CompletableFuture<Void> getResultAvailableFuture() {
        return resultAvailableFuture;
    }

    /**
     * Builds the cache key for a job vertex.
     *
     * @param jobId ID of the job the vertex belongs to
     * @param vertex job vertex to build the key for
     * @return key made of the job ID and the vertex's job vertex ID
     */
    private static Key getKey(JobID jobId, AccessExecutionJobVertex vertex) {
        JobVertexID vertexId = vertex.getJobVertexId();
        return new Key(jobId, vertexId);
    }

    /**
     * Completion callback of a thread info sample request. On success it converts the sample and
     * stores it in the cache; in every case it removes the key from the pending set.
     */
    private class ThreadInfoSampleCompletionCallback
            implements BiConsumer<JobVertexThreadInfoStats, Throwable> {
        private final Key key;
        private final AccessExecutionJobVertex vertex;

        ThreadInfoSampleCompletionCallback(Key key, AccessExecutionJobVertex vertex) {
            this.key = key;
            this.vertex = vertex;
        }

        /**
         * Handles the outcome of the sample request while holding the tracker's lock.
         *
         * @param threadInfoStats the gathered sample, or {@code null} if none was produced
         * @param throwable the failure that is logged when no sample was produced
         */
        @Override
        public void accept(JobVertexThreadInfoStats threadInfoStats, Throwable throwable) {
            synchronized (lock) {
                try {
                    if (shutDown) {
                        // Results arriving after shut-down are dropped.
                        return;
                    }
                    if (threadInfoStats == null) {
                        LOG.debug("Failed to gather a thread info sample for {}", vertex.getName(), throwable);
                    } else {
                        T stats = createStatsFn.apply(threadInfoStats);
                        vertexStatsCache.put(key, stats);
                        resultAvailableFuture.complete(null);
                    }
                } catch (Throwable t) {
                    LOG.error("Error during stats completion.", t);
                } finally {
                    // Runs on every path, including the early return, so the key can be sampled again.
                    pendingStats.remove(key);
                }
            }
        }
    }

    /** Cache key identifying a job vertex by its job ID and job vertex ID. */
    static class Key {
        private final JobID jobId;
        private final JobVertexID jobVertexId;

        private Key(JobID jobId, JobVertexID jobVertexId) {
            this.jobId = jobId;
            this.jobVertexId = jobVertexId;
        }

        /**
         * Two keys are equal when they are of the same class and both IDs are equal.
         *
         * @param o object to compare with
         * @return {@code true} if {@code o} is an equal key
         */
        @Override
        public boolean equals(Object o) {
            if (this == o) {
                return true;
            }
            if (o == null || this.getClass() != o.getClass()) {
                return false;
            }
            Key other = (Key) o;
            return Objects.equals(this.jobId, other.jobId)
                    && Objects.equals(this.jobVertexId, other.jobVertexId);
        }

        /**
         * Hash code derived from both IDs, consistent with {@link #equals(Object)}.
         *
         * @return combined hash of the job ID and the job vertex ID
         */
        @Override
        public int hashCode() {
            return Objects.hash(this.jobId, this.jobVertexId);
        }
    }
}

