package org.apache.kafka.connect.storage;

import com.mapr.baseutils.audit.AuditConstants;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.utils.SystemTime;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.runtime.distributed.ClusterConfigState;
import org.apache.kafka.connect.runtime.distributed.ConnectProtocol;
import org.apache.kafka.connect.util.Callback;
import org.apache.kafka.connect.util.ConnectorTaskId;
import org.apache.kafka.connect.util.KafkaBasedLog;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/*  JADX ERROR: NullPointerException in pass: ClassModifier
    java.lang.NullPointerException
    */
/* loaded from: input_file:org/apache/kafka/connect/storage/KafkaConfigStorage.class */
public class KafkaConfigStorage {
    public static final String CONFIG_TOPIC_CONFIG = "config.storage.topic";
    public static final String CONNECTOR_PREFIX = "connector-";
    public static final String TASK_PREFIX = "task-";
    public static final String COMMIT_TASKS_PREFIX = "commit-";
    private static final long READ_TO_END_TIMEOUT_MS = 30000;
    private final Converter converter;
    private final Callback<String> connectorConfigCallback;
    private final Callback<List<ConnectorTaskId>> tasksConfigCallback;
    private String topic;
    private KafkaBasedLog<String, byte[]> configLog;
    private static final Logger log = LoggerFactory.getLogger((Class<?>) KafkaConfigStorage.class);
    public static final Schema CONNECTOR_CONFIGURATION_V0 = SchemaBuilder.struct().field(AuditConstants.PROPERTIES, SchemaBuilder.map(Schema.STRING_SCHEMA, Schema.OPTIONAL_STRING_SCHEMA)).build();
    public static final Schema TASK_CONFIGURATION_V0 = CONNECTOR_CONFIGURATION_V0;
    public static final Schema CONNECTOR_TASKS_COMMIT_V0 = SchemaBuilder.struct().field(ConnectProtocol.TASKS_KEY_NAME, Schema.INT32_SCHEMA).build();
    private Map<String, Integer> connectorTaskCounts = new HashMap();
    private Map<String, Map<String, String>> connectorConfigs = new HashMap();
    private Map<ConnectorTaskId, Map<String, String>> taskConfigs = new HashMap();
    private Set<String> inconsistent = new HashSet();
    private Map<String, Map<ConnectorTaskId, Map<String, String>>> deferredTaskUpdates = new HashMap();
    private final Callback<ConsumerRecord<String, byte[]>> consumedCallback = new Callback<ConsumerRecord<String, byte[]>>() { // from class: org.apache.kafka.connect.storage.KafkaConfigStorage.1
        /*  JADX ERROR: JadxRuntimeException in pass: InlineMethods
            jadx.core.utils.exceptions.JadxRuntimeException: Failed to process method for inline: org.apache.kafka.connect.storage.KafkaConfigStorage.access$302(org.apache.kafka.connect.storage.KafkaConfigStorage, long):long
            	at jadx.core.dex.visitors.InlineMethods.processInvokeInsn(InlineMethods.java:74)
            	at jadx.core.dex.visitors.InlineMethods.visit(InlineMethods.java:49)
            Caused by: jadx.core.utils.exceptions.JadxRuntimeException: Class not yet loaded at codegen stage: org.apache.kafka.connect.storage.KafkaConfigStorage
            	at jadx.core.dex.nodes.ClassNode.reloadAtCodegenStage(ClassNode.java:883)
            	at jadx.core.dex.visitors.InlineMethods.processInvokeInsn(InlineMethods.java:66)
            	... 1 more
            */
        @Override // org.apache.kafka.connect.util.Callback
        public void onCompletion(java.lang.Throwable r7, org.apache.kafka.clients.consumer.ConsumerRecord<java.lang.String, byte[]> r8) {
            /*
                Method dump skipped, instructions count: 959
                To view this dump add '--comments-level debug' option
            */
            throw new UnsupportedOperationException("Method not decompiled: org.apache.kafka.connect.storage.KafkaConfigStorage.AnonymousClass1.onCompletion(java.lang.Throwable, org.apache.kafka.clients.consumer.ConsumerRecord):void");
        }
    };
    private final Object lock = new Object();
    private boolean starting = false;
    private long offset = -1;

    public static String CONNECTOR_KEY(String str) {
        return CONNECTOR_PREFIX + str;
    }

    public static String TASK_KEY(ConnectorTaskId connectorTaskId) {
        return TASK_PREFIX + connectorTaskId.connector() + "-" + connectorTaskId.task();
    }

    public static String COMMIT_TASKS_KEY(String str) {
        return COMMIT_TASKS_PREFIX + str;
    }

    public KafkaConfigStorage(Converter converter, Callback<String> callback, Callback<List<ConnectorTaskId>> callback2) {
        this.converter = converter;
        this.connectorConfigCallback = callback;
        this.tasksConfigCallback = callback2;
    }

    public void configure(Map<String, ?> map) {
        if (map.get(CONFIG_TOPIC_CONFIG) == null) {
            throw new ConnectException("Must specify topic for connector configuration.");
        }
        this.topic = (String) map.get(CONFIG_TOPIC_CONFIG);
        HashMap hashMap = new HashMap();
        hashMap.putAll(map);
        hashMap.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        hashMap.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");
        hashMap.put(ProducerConfig.ACKS_CONFIG, "all");
        HashMap hashMap2 = new HashMap();
        hashMap2.putAll(map);
        hashMap2.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        hashMap2.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        hashMap2.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        this.configLog = createKafkaBasedLog(this.topic, hashMap, hashMap2, this.consumedCallback);
    }

    public void start() {
        log.info("Starting KafkaConfigStorage");
        this.starting = true;
        this.configLog.start();
        this.starting = false;
        log.info("Started KafkaConfigStorage");
    }

    public void stop() {
        log.info("Closing KafkaConfigStorage");
        this.configLog.stop();
        log.info("Closed KafkaConfigStorage");
    }

    public ClusterConfigState snapshot() {
        ClusterConfigState clusterConfigState;
        synchronized (this.lock) {
            clusterConfigState = new ClusterConfigState(this.offset, new HashMap(this.connectorTaskCounts), new HashMap(this.connectorConfigs), new HashMap(this.taskConfigs), new HashSet(this.inconsistent));
        }
        return clusterConfigState;
    }

    public void putConnectorConfig(String str, Map<String, String> map) {
        byte[] fromConnectData;
        if (map == null) {
            fromConnectData = null;
        } else {
            Struct struct = new Struct(CONNECTOR_CONFIGURATION_V0);
            struct.put(AuditConstants.PROPERTIES, map);
            fromConnectData = this.converter.fromConnectData(this.topic, CONNECTOR_CONFIGURATION_V0, struct);
        }
        try {
            this.configLog.send(CONNECTOR_KEY(str), fromConnectData);
            this.configLog.readToEnd().get(30000L, TimeUnit.MILLISECONDS);
        } catch (InterruptedException | ExecutionException | TimeoutException e) {
            log.error("Failed to write connector configuration to Kafka: ", e);
            throw new ConnectException("Error writing connector configuration to Kafka", e);
        }
    }

    public void putTaskConfigs(Map<ConnectorTaskId, Map<String, String>> map) {
        try {
            this.configLog.readToEnd().get(30000L, TimeUnit.MILLISECONDS);
            HashMap hashMap = new HashMap();
            synchronized (this.lock) {
                for (Map.Entry<String, Set<Integer>> entry : taskIdsByConnector(map).entrySet()) {
                    if (!completeTaskIdSet(entry.getValue(), entry.getValue().size())) {
                        log.error("Submitted task configuration contain invalid range of task IDs, ignoring this submission");
                        throw new ConnectException("Error writing task configurations: found some connectors with invalid connectors");
                    }
                    hashMap.put(entry.getKey(), Integer.valueOf(entry.getValue().size()));
                }
            }
            for (Map.Entry<ConnectorTaskId, Map<String, String>> entry2 : map.entrySet()) {
                Struct struct = new Struct(TASK_CONFIGURATION_V0);
                struct.put(AuditConstants.PROPERTIES, entry2.getValue());
                this.configLog.send(TASK_KEY(entry2.getKey()), this.converter.fromConnectData(this.topic, TASK_CONFIGURATION_V0, struct));
            }
            try {
                this.configLog.readToEnd().get(30000L, TimeUnit.MILLISECONDS);
                for (Map.Entry entry3 : hashMap.entrySet()) {
                    Struct struct2 = new Struct(CONNECTOR_TASKS_COMMIT_V0);
                    struct2.put(ConnectProtocol.TASKS_KEY_NAME, entry3.getValue());
                    this.configLog.send(COMMIT_TASKS_KEY((String) entry3.getKey()), this.converter.fromConnectData(this.topic, CONNECTOR_TASKS_COMMIT_V0, struct2));
                }
                this.configLog.readToEnd().get(30000L, TimeUnit.MILLISECONDS);
            } catch (InterruptedException | ExecutionException | TimeoutException e) {
                log.error("Failed to write root configuration to Kafka: ", e);
                throw new ConnectException("Error writing root configuration to Kafka", e);
            }
        } catch (InterruptedException | ExecutionException | TimeoutException e2) {
            log.error("Failed to write root configuration to Kafka: ", e2);
            throw new ConnectException("Error writing root configuration to Kafka", e2);
        }
    }

    public Future<Void> readToEnd() {
        return this.configLog.readToEnd();
    }

    public void readToEnd(Callback<Void> callback) {
        this.configLog.readToEnd(callback);
    }

    private KafkaBasedLog<String, byte[]> createKafkaBasedLog(String str, Map<String, Object> map, Map<String, Object> map2, Callback<ConsumerRecord<String, byte[]>> callback) {
        return new KafkaBasedLog<>(str, map, map2, callback, new SystemTime());
    }

    /* JADX INFO: Access modifiers changed from: private */
    public ConnectorTaskId parseTaskId(String str) {
        String[] split = str.split("-");
        if (split.length < 3) {
            return null;
        }
        try {
            return new ConnectorTaskId(Utils.join(Arrays.copyOfRange(split, 1, split.length - 1), "-"), Integer.parseInt(split[split.length - 1]));
        } catch (NumberFormatException e) {
            return null;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public Map<String, Set<Integer>> taskIdsByConnector(Map<ConnectorTaskId, Map<String, String>> map) {
        HashMap hashMap = new HashMap();
        if (map == null) {
            return hashMap;
        }
        Iterator<Map.Entry<ConnectorTaskId, Map<String, String>>> it = map.entrySet().iterator();
        while (it.hasNext()) {
            ConnectorTaskId key = it.next().getKey();
            if (!hashMap.containsKey(key.connector())) {
                hashMap.put(key.connector(), new TreeSet());
            }
            ((Set) hashMap.get(key.connector())).add(Integer.valueOf(key.task()));
        }
        return hashMap;
    }

    /* JADX INFO: Access modifiers changed from: private */
    public boolean completeTaskIdSet(Set<Integer> set, int i) {
        if (set.size() < i) {
            return false;
        }
        for (int i2 = 0; i2 < i; i2++) {
            if (!set.contains(Integer.valueOf(i2))) {
                return false;
            }
        }
        return true;
    }

    /* JADX INFO: Access modifiers changed from: private */
    public static int intValue(Object obj) {
        if (obj instanceof Integer) {
            return ((Integer) obj).intValue();
        }
        if (obj instanceof Long) {
            return (int) ((Long) obj).longValue();
        }
        throw new ConnectException("Expected integer value to be either Integer or Long");
    }

    /*  JADX ERROR: Failed to decode insn: 0x0002: MOVE_MULTI, method: org.apache.kafka.connect.storage.KafkaConfigStorage.access$302(org.apache.kafka.connect.storage.KafkaConfigStorage, long):long
        java.lang.ArrayIndexOutOfBoundsException: arraycopy: source index -1 out of bounds for object array[6]
        	at java.base/java.lang.System.arraycopy(Native Method)
        	at jadx.plugins.input.java.data.code.StackState.insert(StackState.java:49)
        	at jadx.plugins.input.java.data.code.CodeDecodeState.insert(CodeDecodeState.java:118)
        	at jadx.plugins.input.java.data.code.JavaInsnsRegister.dup2x1(JavaInsnsRegister.java:313)
        	at jadx.plugins.input.java.data.code.JavaInsnData.decode(JavaInsnData.java:46)
        	at jadx.core.dex.instructions.InsnDecoder.lambda$process$0(InsnDecoder.java:54)
        	at jadx.plugins.input.java.data.code.JavaCodeReader.visitInstructions(JavaCodeReader.java:81)
        	at jadx.core.dex.instructions.InsnDecoder.process(InsnDecoder.java:50)
        	at jadx.core.dex.nodes.MethodNode.load(MethodNode.java:156)
        	at jadx.core.dex.nodes.ClassNode.load(ClassNode.java:443)
        	at jadx.core.ProcessClass.process(ProcessClass.java:70)
        	at jadx.core.ProcessClass.generateCode(ProcessClass.java:118)
        	at jadx.core.dex.nodes.ClassNode.generateClassCode(ClassNode.java:400)
        	at jadx.core.dex.nodes.ClassNode.decompile(ClassNode.java:388)
        	at jadx.core.dex.nodes.ClassNode.getCode(ClassNode.java:338)
        */
    static /* synthetic */ long access$302(org.apache.kafka.connect.storage.KafkaConfigStorage r6, long r7) {
        /*
            r0 = r6
            r1 = r7
            // decode failed: arraycopy: source index -1 out of bounds for object array[6]
            r0.offset = r1
            return r-1
        */
        throw new UnsupportedOperationException("Method not decompiled: org.apache.kafka.connect.storage.KafkaConfigStorage.access$302(org.apache.kafka.connect.storage.KafkaConfigStorage, long):long");
    }

    static /* synthetic */ Object access$400(KafkaConfigStorage kafkaConfigStorage) {
        return kafkaConfigStorage.lock;
    }

    static /* synthetic */ Map access$500(KafkaConfigStorage kafkaConfigStorage) {
        return kafkaConfigStorage.connectorConfigs;
    }

    static /* synthetic */ boolean access$600(KafkaConfigStorage kafkaConfigStorage) {
        return kafkaConfigStorage.starting;
    }

    static /* synthetic */ Callback access$700(KafkaConfigStorage kafkaConfigStorage) {
        return kafkaConfigStorage.connectorConfigCallback;
    }

    static /* synthetic */ ConnectorTaskId access$800(KafkaConfigStorage kafkaConfigStorage, String str) {
        return kafkaConfigStorage.parseTaskId(str);
    }

    static /* synthetic */ Map access$900(KafkaConfigStorage kafkaConfigStorage) {
        return kafkaConfigStorage.deferredTaskUpdates;
    }

    static /* synthetic */ int access$1000(Object obj) {
        return intValue(obj);
    }

    static /* synthetic */ Map access$1100(KafkaConfigStorage kafkaConfigStorage, Map map) {
        return kafkaConfigStorage.taskIdsByConnector(map);
    }

    static /* synthetic */ boolean access$1200(KafkaConfigStorage kafkaConfigStorage, Set set, int i) {
        return kafkaConfigStorage.completeTaskIdSet(set, i);
    }

    static /* synthetic */ Set access$1300(KafkaConfigStorage kafkaConfigStorage) {
        return kafkaConfigStorage.inconsistent;
    }

    static /* synthetic */ Map access$1400(KafkaConfigStorage kafkaConfigStorage) {
        return kafkaConfigStorage.taskConfigs;
    }

    static /* synthetic */ Map access$1500(KafkaConfigStorage kafkaConfigStorage) {
        return kafkaConfigStorage.connectorTaskCounts;
    }

    static /* synthetic */ Callback access$1600(KafkaConfigStorage kafkaConfigStorage) {
        return kafkaConfigStorage.tasksConfigCallback;
    }

    static {
    }
}
