/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.table.store.file.mergetree;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Comparator;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.runtime.util.MemorySegmentPool;
import org.apache.flink.table.store.CoreOptions;
import org.apache.flink.table.store.file.KeyValue;
import org.apache.flink.table.store.file.compact.CompactManager;
import org.apache.flink.table.store.file.compact.CompactResult;
import org.apache.flink.table.store.file.io.CompactIncrement;
import org.apache.flink.table.store.file.io.DataFileMeta;
import org.apache.flink.table.store.file.io.KeyValueFileWriterFactory;
import org.apache.flink.table.store.file.io.NewFilesIncrement;
import org.apache.flink.table.store.file.io.RollingFileWriter;
import org.apache.flink.table.store.file.memory.MemoryOwner;
import org.apache.flink.table.store.file.mergetree.SortBufferWriteBuffer;
import org.apache.flink.table.store.file.mergetree.WriteBuffer;
import org.apache.flink.table.store.file.mergetree.compact.MergeFunction;
import org.apache.flink.table.store.file.utils.RecordWriter;
import org.apache.flink.table.types.logical.RowType;

/**
 * A {@link RecordWriter} for {@link KeyValue} records that buffers incoming records in a
 * {@link WriteBuffer}, flushes the buffer into data files (and, depending on the configured
 * changelog producer, changelog files) and registers every new data file with a
 * {@link CompactManager}.
 *
 * <p>Files produced since the last {@link #prepareCommit(boolean)} are tracked in a set of pending
 * collections, which are handed out (and reset) as a {@link RecordWriter.CommitIncrement}.
 *
 * <p>{@link #setMemoryPool(MemorySegmentPool)} has to be called before the first record is
 * written: the write buffer is only created there and is dereferenced unconditionally by the
 * write/flush paths.
 */
public class MergeTreeWriter implements RecordWriter<KeyValue>, MemoryOwner {

    private final boolean writeBufferSpillable;
    private final int sortMaxFan;
    private final IOManager ioManager;
    private final RowType keyType;
    private final RowType valueType;
    private final CompactManager compactManager;
    private final Comparator<RowData> keyComparator;
    private final MergeFunction<KeyValue> mergeFunction;
    private final KeyValueFileWriterFactory writerFactory;
    private final boolean commitForceCompact;
    private final CoreOptions.ChangelogProducer changelogProducer;

    /** Data files written by buffer flushes since the last drained increment. */
    private final LinkedHashSet<DataFileMeta> newFiles;

    /** Changelog files written by buffer flushes since the last drained increment. */
    private final LinkedHashSet<DataFileMeta> newFilesChangelog;

    /** Compaction inputs that were not produced by an earlier pending compaction, by file name. */
    private final LinkedHashMap<String, DataFileMeta> compactBefore;

    /** Compaction outputs that have not been consumed by a later pending compaction. */
    private final LinkedHashSet<DataFileMeta> compactAfter;

    /** Changelog files reported by compaction results since the last drained increment. */
    private final LinkedHashSet<DataFileMeta> compactChangelog;

    /** Next sequence number handed out to records that do not carry one. */
    private long newSequenceNumber;

    /** Created lazily in {@link #setMemoryPool(MemorySegmentPool)}; {@code null} until then. */
    private WriteBuffer writeBuffer;

    /**
     * Creates a writer.
     *
     * @param writeBufferSpillable forwarded to the {@link SortBufferWriteBuffer} constructor
     * @param sortMaxFan forwarded to the {@link SortBufferWriteBuffer} constructor
     * @param ioManager forwarded to the {@link SortBufferWriteBuffer} constructor
     * @param compactManager receives every new data file and is asked for compaction results
     * @param maxSequenceNumber the first generated sequence number is {@code maxSequenceNumber + 1}
     * @param keyComparator passed to the write buffer when it is flushed
     * @param mergeFunction passed to the write buffer when it is flushed
     * @param writerFactory creates file writers, deletes files and supplies the key/value row types
     * @param commitForceCompact if {@code true}, {@link #prepareCommit(boolean)} always requests the
     *     compaction result in blocking mode
     * @param changelogProducer changelog files are written during a flush only when this is
     *     {@link CoreOptions.ChangelogProducer#INPUT}
     */
    public MergeTreeWriter(boolean writeBufferSpillable, int sortMaxFan, IOManager ioManager, CompactManager compactManager, long maxSequenceNumber, Comparator<RowData> keyComparator, MergeFunction<KeyValue> mergeFunction, KeyValueFileWriterFactory writerFactory, boolean commitForceCompact, CoreOptions.ChangelogProducer changelogProducer) {
        this.writeBufferSpillable = writeBufferSpillable;
        this.sortMaxFan = sortMaxFan;
        this.ioManager = ioManager;
        this.keyType = writerFactory.keyType();
        this.valueType = writerFactory.valueType();
        this.compactManager = compactManager;
        this.newSequenceNumber = maxSequenceNumber + 1L;
        this.keyComparator = keyComparator;
        this.mergeFunction = mergeFunction;
        this.writerFactory = writerFactory;
        this.commitForceCompact = commitForceCompact;
        this.changelogProducer = changelogProducer;
        this.newFiles = new LinkedHashSet<>();
        this.newFilesChangelog = new LinkedHashSet<>();
        this.compactBefore = new LinkedHashMap<>();
        this.compactAfter = new LinkedHashSet<>();
        this.compactChangelog = new LinkedHashSet<>();
    }

    /** Returns the next sequence number and advances the counter. */
    private long newSequenceNumber() {
        return this.newSequenceNumber++;
    }

    /** Exposes the compact manager for tests. */
    @VisibleForTesting
    CompactManager compactManager() {
        return this.compactManager;
    }

    /**
     * Creates the write buffer on top of the given memory pool. Any previously created buffer
     * reference is replaced.
     *
     * @param memoryPool pool backing the new {@link SortBufferWriteBuffer}
     */
    @Override
    public void setMemoryPool(MemorySegmentPool memoryPool) {
        this.writeBuffer = new SortBufferWriteBuffer(this.keyType, this.valueType, memoryPool, this.writeBufferSpillable, this.sortMaxFan, this.ioManager);
    }

    /**
     * Adds a record to the write buffer, flushing the buffer to files once if it is full.
     *
     * @param kv record to write; a sequence number of {@code -1} is replaced by a generated one
     * @throws RuntimeException if the record does not fit even into a freshly flushed buffer
     * @throws Exception if flushing the buffer fails
     */
    @Override
    public void write(KeyValue kv) throws Exception {
        // -1 is treated as "no sequence number assigned yet".
        final long sequenceNumber =
                kv.sequenceNumber() == -1L ? this.newSequenceNumber() : kv.sequenceNumber();

        if (this.writeBuffer.put(sequenceNumber, kv.valueKind(), kv.key(), kv.value())) {
            return;
        }

        // Buffer rejected the record: flush it and retry exactly once.
        this.flushWriteBuffer(false, false);
        if (!this.writeBuffer.put(sequenceNumber, kv.valueKind(), kv.key(), kv.value())) {
            throw new RuntimeException("Mem table is too small to hold a single element.");
        }
    }

    /**
     * Flushes the write buffer, requests the latest compaction result in blocking mode and
     * triggers a new compaction.
     *
     * @param fullCompaction forwarded to {@link CompactManager#triggerCompaction(boolean)}
     */
    @Override
    public void compact(boolean fullCompaction) throws Exception {
        this.flushWriteBuffer(true, fullCompaction);
    }

    /**
     * Registers the given files with the compact manager. They are not recorded as new files of
     * this writer.
     */
    @Override
    public void addNewFiles(List<DataFileMeta> files) {
        files.forEach(this.compactManager::addNewFile);
    }

    /** Returns the memory occupancy reported by the write buffer. */
    @Override
    public long memoryOccupancy() {
        return this.writeBuffer.memoryOccupancy();
    }

    /**
     * Asks the write buffer to flush its memory; if the buffer reports that it could not, the
     * whole buffer is flushed to files instead.
     */
    @Override
    public void flushMemory() throws Exception {
        if (!this.writeBuffer.flushMemory()) {
            this.flushWriteBuffer(false, false);
        }
    }

    /**
     * Writes the buffered records to new files (if there are any), then collects the latest
     * compaction result and triggers a new compaction.
     *
     * @param waitForLatestCompaction whether to request the compaction result in blocking mode;
     *     overridden to {@code true} when the buffer is non-empty and the compact manager reports
     *     {@code shouldWaitCompaction()}
     * @param forcedFullCompaction forwarded to {@link CompactManager#triggerCompaction(boolean)}
     * @throws Exception if writing, closing the file writers or syncing the compaction fails
     */
    private void flushWriteBuffer(boolean waitForLatestCompaction, boolean forcedFullCompaction) throws Exception {
        boolean waitForCompaction = waitForLatestCompaction;

        if (this.writeBuffer.size() > 0) {
            if (this.compactManager.shouldWaitCompaction()) {
                waitForCompaction = true;
            }

            // NOTE(review): the argument 0 is presumably the target level of the new files —
            // confirm against KeyValueFileWriterFactory.
            final RollingFileWriter<KeyValue, DataFileMeta> changelogWriter =
                    this.changelogProducer == CoreOptions.ChangelogProducer.INPUT
                            ? this.writerFactory.createRollingChangelogFileWriter(0)
                            : null;
            final RollingFileWriter<KeyValue, DataFileMeta> dataWriter =
                    this.writerFactory.createRollingMergeTreeFileWriter(0);

            try {
                this.writeBuffer.forEach(
                        this.keyComparator,
                        this.mergeFunction,
                        changelogWriter == null ? null : changelogWriter::write,
                        dataWriter::write);
            } finally {
                // Nested finally: the data writer must be closed even if closing the changelog
                // writer throws, otherwise it would be leaked.
                try {
                    if (changelogWriter != null) {
                        changelogWriter.close();
                    }
                } finally {
                    dataWriter.close();
                }
            }

            if (changelogWriter != null) {
                this.newFilesChangelog.addAll(changelogWriter.result());
            }
            for (DataFileMeta fileMeta : dataWriter.result()) {
                this.newFiles.add(fileMeta);
                this.compactManager.addNewFile(fileMeta);
            }

            this.writeBuffer.clear();
        }

        this.trySyncLatestCompaction(waitForCompaction);
        this.compactManager.triggerCompaction(forcedFullCompaction);
    }

    /**
     * Flushes the write buffer, collects the latest compaction result and returns everything
     * accumulated since the previous call. The pending collections are reset afterwards.
     *
     * @param blocking whether to request the compaction result in blocking mode; treated as
     *     {@code true} when the writer was created with {@code commitForceCompact}
     * @return the new-files and compaction increments accumulated so far
     */
    @Override
    public RecordWriter.CommitIncrement prepareCommit(boolean blocking) throws Exception {
        this.flushWriteBuffer(false, false);
        // The flush above may have triggered a compaction; pick up its result as well.
        this.trySyncLatestCompaction(blocking || this.commitForceCompact);
        return this.drainIncrement();
    }

    /** Requests the latest compaction result in blocking mode and records it. */
    @Override
    public void sync() throws Exception {
        this.trySyncLatestCompaction(true);
    }

    /**
     * Snapshots the pending collections into an increment and clears them.
     *
     * @return an increment backed by copies of the pending collections
     */
    private RecordWriter.CommitIncrement drainIncrement() {
        final NewFilesIncrement newFilesIncrement =
                new NewFilesIncrement(
                        new ArrayList<>(this.newFiles),
                        new ArrayList<>(this.newFilesChangelog));
        final CompactIncrement compactIncrement =
                new CompactIncrement(
                        new ArrayList<>(this.compactBefore.values()),
                        new ArrayList<>(this.compactAfter),
                        new ArrayList<>(this.compactChangelog));

        this.newFiles.clear();
        this.newFilesChangelog.clear();
        this.compactBefore.clear();
        this.compactAfter.clear();
        this.compactChangelog.clear();

        return new RecordWriter.CommitIncrement() {

            @Override
            public NewFilesIncrement newFilesIncrement() {
                return newFilesIncrement;
            }

            @Override
            public CompactIncrement compactIncrement() {
                return compactIncrement;
            }
        };
    }

    /**
     * Folds a compaction result into the pending compaction collections.
     *
     * <p>An input file that is itself a pending compaction output is removed from
     * {@code compactAfter}; it is deleted unless it is also recorded in {@code compactBefore} or
     * re-appears among the outputs of this result. Any other input file is recorded in
     * {@code compactBefore}.
     *
     * @param result compaction result to record
     */
    private void updateCompactResult(CompactResult result) {
        final Set<String> afterFiles =
                result.after().stream().map(DataFileMeta::fileName).collect(Collectors.toSet());

        for (DataFileMeta file : result.before()) {
            if (this.compactAfter.remove(file)) {
                // The file was produced by an earlier pending compaction and is consumed now.
                // Only delete it if no pending collection still refers to it by name.
                final boolean stillReferenced =
                        this.compactBefore.containsKey(file.fileName())
                                || afterFiles.contains(file.fileName());
                if (!stillReferenced) {
                    this.writerFactory.deleteFile(file.fileName());
                }
            } else {
                this.compactBefore.put(file.fileName(), file);
            }
        }

        this.compactAfter.addAll(result.after());
        this.compactChangelog.addAll(result.changelog());
    }

    /**
     * Asks the compact manager for its result and records it if one is available.
     *
     * @param blocking forwarded to {@link CompactManager#getCompactionResult(boolean)}
     */
    private void trySyncLatestCompaction(boolean blocking) throws Exception {
        final Optional<CompactResult> result = this.compactManager.getCompactionResult(blocking);
        result.ifPresent(this::updateCompactResult);
    }

    /**
     * Cancels the running compaction, syncs its result and deletes every file that is still
     * pending (i.e. was never handed out through {@link #prepareCommit(boolean)}).
     */
    @Override
    public void close() throws Exception {
        this.compactManager.cancelCompaction();
        this.sync();

        final List<DataFileMeta> delete = new ArrayList<>(this.newFiles);
        this.newFiles.clear();

        for (DataFileMeta file : this.newFilesChangelog) {
            this.writerFactory.deleteFile(file.fileName());
        }
        this.newFilesChangelog.clear();

        for (DataFileMeta file : this.compactAfter) {
            // A compaction output whose name is also recorded as a compaction input is kept.
            // NOTE(review): presumably such a file existed before this writer's compactions
            // (e.g. it was only moved between levels) — confirm.
            if (!this.compactBefore.containsKey(file.fileName())) {
                delete.add(file);
            }
        }
        this.compactAfter.clear();

        for (DataFileMeta file : this.compactChangelog) {
            this.writerFactory.deleteFile(file.fileName());
        }
        this.compactChangelog.clear();

        for (DataFileMeta file : delete) {
            this.writerFactory.deleteFile(file.fileName());
        }
    }
}

