/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.table.store.kafka;

import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import javax.annotation.Nullable;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.table.api.TableException;
import org.apache.flink.table.catalog.ResolvedSchema;
import org.apache.flink.table.catalog.UniqueConstraint;
import org.apache.flink.table.connector.sink.DynamicTableSink;
import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.factories.DynamicTableFactory;
import org.apache.flink.table.factories.FactoryUtil;
import org.apache.flink.table.store.CoreOptions;
import org.apache.flink.table.store.kafka.KafkaLogOptions;
import org.apache.flink.table.store.kafka.KafkaLogSinkProvider;
import org.apache.flink.table.store.kafka.KafkaLogSourceProvider;
import org.apache.flink.table.store.log.LogStoreTableFactory;
import org.apache.flink.table.store.shaded.org.apache.kafka.clients.admin.AdminClient;
import org.apache.flink.table.store.shaded.org.apache.kafka.clients.admin.NewTopic;
import org.apache.flink.table.store.shaded.org.apache.kafka.common.errors.TopicExistsException;
import org.apache.flink.table.store.shaded.org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.utils.DataTypeUtils;
import org.apache.flink.util.Preconditions;

public class KafkaLogStoreFactory
implements LogStoreTableFactory {
    public static final String IDENTIFIER = "kafka";
    public static final String KAFKA_PREFIX = "kafka.";

    public String factoryIdentifier() {
        return IDENTIFIER;
    }

    public Set<ConfigOption<?>> requiredOptions() {
        HashSet options = new HashSet();
        options.add(KafkaLogOptions.BOOTSTRAP_SERVERS);
        return options;
    }

    public Set<ConfigOption<?>> optionalOptions() {
        return new HashSet();
    }

    @Override
    public Map<String, String> enrichOptions(DynamicTableFactory.Context context) {
        HashMap<String, String> options = new HashMap<String, String>(context.getCatalogTable().getOptions());
        Preconditions.checkArgument((!options.containsKey(KafkaLogOptions.TOPIC.key()) ? 1 : 0) != 0, (Object)"Managed table can not contain custom topic. You need to remove topic in table options or session config.");
        String topic = context.getObjectIdentifier().asSummaryString();
        options.put(KafkaLogOptions.TOPIC.key(), topic);
        return options;
    }

    private String topic(DynamicTableFactory.Context context) {
        return (String)context.getCatalogTable().getOptions().get(KafkaLogOptions.TOPIC.key());
    }

    @Override
    public void onCreateTable(DynamicTableFactory.Context context, int numBucket, boolean ignoreIfExists) {
        Configuration options = Configuration.fromMap((Map)context.getCatalogTable().getOptions());
        try (AdminClient adminClient = AdminClient.create(KafkaLogStoreFactory.toKafkaProperties((ReadableConfig)options));){
            HashMap<String, String> configs = new HashMap<String, String>();
            options.getOptional(CoreOptions.LOG_RETENTION).ifPresent(retention -> configs.put("retention.ms", String.valueOf(retention.toMillis())));
            NewTopic topicObj = new NewTopic(this.topic(context), Optional.of(numBucket), Optional.empty()).configs(configs);
            adminClient.createTopics(Collections.singleton(topicObj)).all().get();
        }
        catch (InterruptedException | ExecutionException e) {
            if (e.getCause() instanceof TopicExistsException) {
                if (ignoreIfExists) {
                    return;
                }
                throw new TableException(String.format("Failed to create kafka topic. Reason: topic %s exists for table %s. Suggestion: please try `DESCRIBE TABLE %s` to check whether table exists in current catalog. If table exists and the DDL needs to be executed multiple times, please use `CREATE TABLE IF NOT EXISTS` ddl instead. Otherwise, please choose another table name or manually delete the current topic and try again.", this.topic(context), context.getObjectIdentifier().asSerializableString(), context.getObjectIdentifier().asSerializableString()));
            }
            throw new TableException("Error in createTopic", (Throwable)e);
        }
    }

    @Override
    public void onDropTable(DynamicTableFactory.Context context, boolean ignoreIfNotExists) {
        try (AdminClient adminClient = AdminClient.create(KafkaLogStoreFactory.toKafkaProperties(FactoryUtil.createTableFactoryHelper((DynamicTableFactory)this, (DynamicTableFactory.Context)context).getOptions()));){
            adminClient.deleteTopics(Collections.singleton(this.topic(context))).all().get();
        }
        catch (ExecutionException e) {
            if (e.getCause() instanceof UnknownTopicOrPartitionException) {
                if (ignoreIfNotExists) {
                    return;
                }
                throw new TableException(String.format("Failed to delete kafka topic. Reason: topic %s doesn't exist for table %s. Suggestion: please try `DROP TABLE IF EXISTS` ddl instead.", this.topic(context), context.getObjectIdentifier().asSerializableString()));
            }
            throw new TableException("Error in deleteTopic", (Throwable)e);
        }
        catch (InterruptedException e) {
            throw new TableException("Error in deleteTopic", (Throwable)e);
        }
    }

    @Override
    public KafkaLogSourceProvider createSourceProvider(DynamicTableFactory.Context context, DynamicTableSource.Context sourceContext, @Nullable int[][] projectFields) {
        FactoryUtil.TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper((DynamicTableFactory)this, (DynamicTableFactory.Context)context);
        ResolvedSchema schema = context.getCatalogTable().getResolvedSchema();
        DataType physicalType = schema.toPhysicalRowDataType();
        DeserializationSchema primaryKeyDeserializer = null;
        int[] primaryKey = this.getPrimaryKeyIndexes(schema);
        if (primaryKey.length > 0) {
            DataType keyType = DataTypeUtils.projectRow((DataType)physicalType, (int[])primaryKey);
            primaryKeyDeserializer = (DeserializationSchema)LogStoreTableFactory.getKeyDecodingFormat(helper).createRuntimeDecoder(sourceContext, keyType);
        }
        DeserializationSchema valueDeserializer = (DeserializationSchema)LogStoreTableFactory.getValueDecodingFormat(helper).createRuntimeDecoder(sourceContext, physicalType);
        return new KafkaLogSourceProvider(this.topic(context), KafkaLogStoreFactory.toKafkaProperties(helper.getOptions()), physicalType, primaryKey, (DeserializationSchema<RowData>)primaryKeyDeserializer, (DeserializationSchema<RowData>)valueDeserializer, projectFields, (CoreOptions.LogConsistency)((Object)helper.getOptions().get(CoreOptions.LOG_CONSISTENCY)), CoreOptions.startupMode(helper.getOptions()), (Long)helper.getOptions().get(CoreOptions.SCAN_TIMESTAMP_MILLIS));
    }

    @Override
    public KafkaLogSinkProvider createSinkProvider(DynamicTableFactory.Context context, DynamicTableSink.Context sinkContext) {
        FactoryUtil.TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper((DynamicTableFactory)this, (DynamicTableFactory.Context)context);
        ResolvedSchema schema = context.getCatalogTable().getResolvedSchema();
        DataType physicalType = schema.toPhysicalRowDataType();
        SerializationSchema primaryKeySerializer = null;
        int[] primaryKey = this.getPrimaryKeyIndexes(schema);
        if (primaryKey.length > 0) {
            DataType keyType = DataTypeUtils.projectRow((DataType)physicalType, (int[])primaryKey);
            primaryKeySerializer = (SerializationSchema)LogStoreTableFactory.getKeyEncodingFormat(helper).createRuntimeEncoder(sinkContext, keyType);
        }
        SerializationSchema valueSerializer = (SerializationSchema)LogStoreTableFactory.getValueEncodingFormat(helper).createRuntimeEncoder(sinkContext, physicalType);
        return new KafkaLogSinkProvider(this.topic(context), KafkaLogStoreFactory.toKafkaProperties(helper.getOptions()), (SerializationSchema<RowData>)primaryKeySerializer, (SerializationSchema<RowData>)valueSerializer, (CoreOptions.LogConsistency)((Object)helper.getOptions().get(CoreOptions.LOG_CONSISTENCY)), (CoreOptions.LogChangelogMode)((Object)helper.getOptions().get(CoreOptions.LOG_CHANGELOG_MODE)));
    }

    private int[] getPrimaryKeyIndexes(ResolvedSchema schema) {
        List columns = schema.getColumnNames();
        return schema.getPrimaryKey().map(UniqueConstraint::getColumns).map(pkColumns -> pkColumns.stream().mapToInt(columns::indexOf).toArray()).orElseGet(() -> new int[0]);
    }

    public static Properties toKafkaProperties(ReadableConfig options) {
        Properties properties = new Properties();
        Map optionMap = ((Configuration)options).toMap();
        optionMap.keySet().stream().filter(key -> key.startsWith(KAFKA_PREFIX)).forEach(key -> properties.put(key.substring(KAFKA_PREFIX.length()), optionMap.get(key)));
        if (options.get(CoreOptions.LOG_CONSISTENCY) == CoreOptions.LogConsistency.TRANSACTIONAL) {
            properties.setProperty("isolation.level", "read_committed");
        }
        return properties;
    }
}

