/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.runtime.io.checkpointing;

import java.io.Closeable;
import java.io.IOException;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.RejectedExecutionException;
import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.operators.MailboxExecutor;
import org.apache.flink.runtime.checkpoint.channel.InputChannelInfo;
import org.apache.flink.runtime.event.AbstractEvent;
import org.apache.flink.runtime.io.PullingAsyncDataInput;
import org.apache.flink.runtime.io.network.api.CancelCheckpointMarker;
import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
import org.apache.flink.runtime.io.network.api.EndOfData;
import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
import org.apache.flink.runtime.io.network.api.EventAnnouncement;
import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
import org.apache.flink.runtime.io.network.partition.consumer.EndOfChannelStateEvent;
import org.apache.flink.runtime.io.network.partition.consumer.InputChannel;
import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
import org.apache.flink.streaming.runtime.io.checkpointing.CheckpointBarrierHandler;
import org.apache.flink.streaming.runtime.io.checkpointing.UpstreamRecoveryTracker;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.concurrent.FutureUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Internal
public class CheckpointedInputGate
implements PullingAsyncDataInput<BufferOrEvent>,
Closeable {
    private static final Logger LOG = LoggerFactory.getLogger(CheckpointedInputGate.class);
    private final CheckpointBarrierHandler barrierHandler;
    private final UpstreamRecoveryTracker upstreamRecoveryTracker;
    private final InputGate inputGate;
    private final MailboxExecutor mailboxExecutor;
    private boolean isFinished;

    public CheckpointedInputGate(InputGate inputGate, CheckpointBarrierHandler barrierHandler, MailboxExecutor mailboxExecutor) {
        this(inputGate, barrierHandler, mailboxExecutor, UpstreamRecoveryTracker.NO_OP);
    }

    public CheckpointedInputGate(InputGate inputGate, CheckpointBarrierHandler barrierHandler, MailboxExecutor mailboxExecutor, UpstreamRecoveryTracker upstreamRecoveryTracker) {
        this.inputGate = inputGate;
        this.barrierHandler = barrierHandler;
        this.mailboxExecutor = mailboxExecutor;
        this.upstreamRecoveryTracker = upstreamRecoveryTracker;
        this.waitForPriorityEvents(inputGate, mailboxExecutor);
    }

    private void processPriorityEvents() throws IOException, InterruptedException {
        Optional<BufferOrEvent> bufferOrEventOpt;
        boolean hasPriorityEvent = this.inputGate.getPriorityEventAvailableFuture().isDone();
        while (hasPriorityEvent && (bufferOrEventOpt = this.pollNext()).isPresent()) {
            BufferOrEvent bufferOrEvent = bufferOrEventOpt.get();
            Preconditions.checkState(bufferOrEvent.hasPriority(), "Should only poll priority events");
            hasPriorityEvent = bufferOrEvent.morePriorityEvents();
        }
        this.waitForPriorityEvents(this.inputGate, this.mailboxExecutor);
    }

    private void waitForPriorityEvents(InputGate inputGate, MailboxExecutor mailboxExecutor) {
        CompletableFuture<?> priorityEventAvailableFuture = inputGate.getPriorityEventAvailableFuture();
        FutureUtils.assertNoException(priorityEventAvailableFuture.thenRun(() -> {
            try {
                mailboxExecutor.execute(this::processPriorityEvents, "process priority event @ gate %s", inputGate);
            }
            catch (RejectedExecutionException ex) {
                LOG.debug("Ignored RejectedExecutionException in CheckpointedInputGate.waitForPriorityEvents");
            }
        }));
    }

    @Override
    public CompletableFuture<?> getAvailableFuture() {
        return this.inputGate.getAvailableFuture();
    }

    @Override
    public Optional<BufferOrEvent> pollNext() throws IOException, InterruptedException {
        Optional<BufferOrEvent> next = this.inputGate.pollNext();
        if (!next.isPresent()) {
            return this.handleEmptyBuffer();
        }
        BufferOrEvent bufferOrEvent = next.get();
        if (bufferOrEvent.isEvent()) {
            return this.handleEvent(bufferOrEvent);
        }
        if (bufferOrEvent.isBuffer()) {
            this.barrierHandler.addProcessedBytes(bufferOrEvent.getBuffer().getSize());
        }
        return next;
    }

    private Optional<BufferOrEvent> handleEvent(BufferOrEvent bufferOrEvent) throws IOException {
        Class<?> eventClass = bufferOrEvent.getEvent().getClass();
        if (eventClass == CheckpointBarrier.class) {
            CheckpointBarrier checkpointBarrier = (CheckpointBarrier)bufferOrEvent.getEvent();
            this.barrierHandler.processBarrier(checkpointBarrier, bufferOrEvent.getChannelInfo(), false);
        } else if (eventClass == CancelCheckpointMarker.class) {
            this.barrierHandler.processCancellationBarrier((CancelCheckpointMarker)bufferOrEvent.getEvent(), bufferOrEvent.getChannelInfo());
        } else if (eventClass == EndOfData.class) {
            this.inputGate.acknowledgeAllRecordsProcessed(bufferOrEvent.getChannelInfo());
        } else if (eventClass == EndOfPartitionEvent.class) {
            this.barrierHandler.processEndOfPartition(bufferOrEvent.getChannelInfo());
        } else if (eventClass == EventAnnouncement.class) {
            EventAnnouncement eventAnnouncement = (EventAnnouncement)bufferOrEvent.getEvent();
            AbstractEvent announcedEvent = eventAnnouncement.getAnnouncedEvent();
            Preconditions.checkState(announcedEvent instanceof CheckpointBarrier, "Only CheckpointBarrier announcement are currently supported, but found [%s]", announcedEvent);
            CheckpointBarrier announcedBarrier = (CheckpointBarrier)announcedEvent;
            this.barrierHandler.processBarrierAnnouncement(announcedBarrier, eventAnnouncement.getSequenceNumber(), bufferOrEvent.getChannelInfo());
        } else if (bufferOrEvent.getEvent().getClass() == EndOfChannelStateEvent.class) {
            this.upstreamRecoveryTracker.handleEndOfRecovery(bufferOrEvent.getChannelInfo());
        }
        return Optional.of(bufferOrEvent);
    }

    public CompletableFuture<Void> getAllBarriersReceivedFuture(long checkpointId) {
        return this.barrierHandler.getAllBarriersReceivedFuture(checkpointId);
    }

    private Optional<BufferOrEvent> handleEmptyBuffer() {
        if (this.inputGate.isFinished()) {
            this.isFinished = true;
        }
        return Optional.empty();
    }

    @Override
    public boolean isFinished() {
        return this.isFinished;
    }

    @Override
    public PullingAsyncDataInput.EndOfDataStatus hasReceivedEndOfData() {
        return this.inputGate.hasReceivedEndOfData();
    }

    @Override
    public void close() throws IOException {
        this.barrierHandler.close();
    }

    @VisibleForTesting
    long getLatestCheckpointId() {
        return this.barrierHandler.getLatestCheckpointId();
    }

    @VisibleForTesting
    long getAlignmentDurationNanos() {
        return this.barrierHandler.getAlignmentDurationNanos();
    }

    @VisibleForTesting
    long getCheckpointStartDelayNanos() {
        return this.barrierHandler.getCheckpointStartDelayNanos();
    }

    public int getNumberOfInputChannels() {
        return this.inputGate.getNumberOfInputChannels();
    }

    public String toString() {
        return this.barrierHandler.toString();
    }

    public InputChannel getChannel(int channelIndex) {
        return this.inputGate.getChannel(channelIndex);
    }

    public List<InputChannelInfo> getChannelInfos() {
        return this.inputGate.getChannelInfos();
    }

    public boolean allChannelsRecovered() {
        return this.upstreamRecoveryTracker.allChannelsRecovered();
    }

    @VisibleForTesting
    CheckpointBarrierHandler getCheckpointBarrierHandler() {
        return this.barrierHandler;
    }
}

