package org.apache.kafka.connect.storage;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.utils.SystemTime;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaAndValue;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.errors.DataException;
import org.apache.kafka.connect.runtime.distributed.ClusterConfigState;
import org.apache.kafka.connect.runtime.distributed.ConnectProtocol;
import org.apache.kafka.connect.util.Callback;
import org.apache.kafka.connect.util.ConnectorTaskId;
import org.apache.kafka.connect.util.KafkaBasedLog;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/*  JADX ERROR: NullPointerException in pass: ClassModifier
    java.lang.NullPointerException: Cannot invoke "java.util.List.forEach(java.util.function.Consumer)" because "blocks" is null
    	at jadx.core.utils.BlockUtils.collectAllInsns(BlockUtils.java:1017)
    	at jadx.core.dex.visitors.ClassModifier.removeBridgeMethod(ClassModifier.java:239)
    	at jadx.core.dex.visitors.ClassModifier.removeSyntheticMethods(ClassModifier.java:154)
    	at java.base/java.util.ArrayList.forEach(ArrayList.java:1596)
    	at jadx.core.dex.visitors.ClassModifier.visit(ClassModifier.java:64)
    */
/* loaded from: input_file:org/apache/kafka/connect/storage/KafkaConfigStorage.class */
public class KafkaConfigStorage {
    /** Class logger; assigned in the static initializer. */
    private static final Logger log;
    /** Configuration key whose value names the Kafka topic used to store configs (read by {@code configure}). */
    public static final String CONFIG_TOPIC_CONFIG = "config.storage.topic";
    /** Record-key prefix for connector configuration records. */
    public static final String CONNECTOR_PREFIX = "connector-";
    /** Record-key prefix for individual task configuration records. */
    public static final String TASK_PREFIX = "task-";
    /** Record-key prefix for the record that commits a connector's set of task configurations. */
    public static final String COMMIT_TASKS_PREFIX = "commit-";
    /** Schema of a connector configuration record; built in the static initializer. */
    public static final Schema CONNECTOR_CONFIGURATION_V0;
    /** Schema of a task configuration record; the static initializer aliases it to the connector schema. */
    public static final Schema TASK_CONFIGURATION_V0;
    /** Schema of a task-commit record (carries the task count); built in the static initializer. */
    public static final Schema CONNECTOR_TASKS_COMMIT_V0;
    /** Maximum time, in milliseconds, to wait for a read-to-end of the config log to complete. */
    private static final long READ_TO_END_TIMEOUT_MS = 30000;
    /** Converts between Connect data and the byte[] values stored in the config topic. */
    private final Converter converter;
    /** Notified with the connector name after a connector configuration record has been applied. */
    private final Callback<String> connectorConfigCallback;
    /** Notified with a list of task ids after a task-commit record has been processed. */
    private final Callback<List<ConnectorTaskId>> tasksConfigCallback;
    /** Name of the config topic; set by {@code configure}. */
    private String topic;
    /** Log abstraction over the config topic; created by {@code configure}. */
    private KafkaBasedLog<String, byte[]> configLog;
    /** Decompiler-materialised assertion switch; set in the static initializer and checked in {@code taskIds}. */
    static final /* synthetic */ boolean $assertionsDisabled;
    /** Connector name -> task count taken from the most recently processed commit record. */
    private Map<String, Integer> connectorTaskCounts = new HashMap<>();
    /** Connector name -> connector configuration properties. */
    private Map<String, Map<String, String>> connectorConfigs = new HashMap<>();
    /** Task id -> committed task configuration properties. */
    private Map<ConnectorTaskId, Map<String, String>> taskConfigs = new HashMap<>();
    /** Connectors whose last commit record did not match a complete set of buffered task configs. */
    private Set<String> inconsistent = new HashSet<>();
    /** Connector name -> task configs read from the log but not yet applied by a commit record. */
    private Map<String, Map<ConnectorTaskId, Map<String, String>>> deferredTaskUpdates = new HashMap<>();
    private final Callback<ConsumerRecord<String, byte[]>> consumedCallback = new Callback<ConsumerRecord<String, byte[]>>() { // from class: org.apache.kafka.connect.storage.KafkaConfigStorage.1
        AnonymousClass1() {
        }

        @Override // org.apache.kafka.connect.util.Callback
        public void onCompletion(Throwable th, ConsumerRecord<String, byte[]> consumerRecord) {
            if (th != null) {
                KafkaConfigStorage.log.error("Unexpected in consumer callback for KafkaConfigStorage: ", th);
                return;
            }
            try {
                SchemaAndValue connectData = KafkaConfigStorage.this.converter.toConnectData(KafkaConfigStorage.this.topic, (byte[]) consumerRecord.value());
                KafkaConfigStorage.access$302(KafkaConfigStorage.this, consumerRecord.offset() + 1);
                if (((String) consumerRecord.key()).startsWith(KafkaConfigStorage.CONNECTOR_PREFIX)) {
                    String substring = ((String) consumerRecord.key()).substring(KafkaConfigStorage.CONNECTOR_PREFIX.length());
                    synchronized (KafkaConfigStorage.this.lock) {
                        if (connectData.value() == null) {
                            KafkaConfigStorage.this.connectorConfigs.remove(substring);
                        } else {
                            if (!(connectData.value() instanceof Map)) {
                                KafkaConfigStorage.log.error("Found connector configuration (" + ((String) consumerRecord.key()) + ") in wrong format: " + connectData.value().getClass());
                                return;
                            }
                            Object obj = ((Map) connectData.value()).get("properties");
                            if (!(obj instanceof Map)) {
                                KafkaConfigStorage.log.error("Invalid data for connector config: properties filed should be a Map but is " + obj.getClass());
                                return;
                            }
                            KafkaConfigStorage.this.connectorConfigs.put(substring, (Map) obj);
                        }
                        if (KafkaConfigStorage.this.starting) {
                            return;
                        }
                        KafkaConfigStorage.this.connectorConfigCallback.onCompletion(null, substring);
                        return;
                    }
                }
                if (((String) consumerRecord.key()).startsWith(KafkaConfigStorage.TASK_PREFIX)) {
                    synchronized (KafkaConfigStorage.this.lock) {
                        ConnectorTaskId parseTaskId = KafkaConfigStorage.this.parseTaskId((String) consumerRecord.key());
                        if (parseTaskId == null) {
                            KafkaConfigStorage.log.error("Ignoring task configuration because " + ((String) consumerRecord.key()) + " couldn't be parsed as a task config key");
                            return;
                        }
                        if (!(connectData.value() instanceof Map)) {
                            KafkaConfigStorage.log.error("Ignoring task configuration because it is in the wrong format: " + connectData.value());
                            return;
                        }
                        Object obj2 = ((Map) connectData.value()).get("properties");
                        if (!(obj2 instanceof Map)) {
                            KafkaConfigStorage.log.error("Invalid data for task config: properties filed should be a Map but is " + obj2.getClass());
                            return;
                        }
                        Map map = (Map) KafkaConfigStorage.this.deferredTaskUpdates.get(parseTaskId.connector());
                        if (map == null) {
                            map = new HashMap();
                            KafkaConfigStorage.this.deferredTaskUpdates.put(parseTaskId.connector(), map);
                        }
                        map.put(parseTaskId, (Map) obj2);
                        return;
                    }
                }
                if (!((String) consumerRecord.key()).startsWith(KafkaConfigStorage.COMMIT_TASKS_PREFIX)) {
                    KafkaConfigStorage.log.error("Discarding config update record with invalid key: " + ((String) consumerRecord.key()));
                    return;
                }
                String substring2 = ((String) consumerRecord.key()).substring(KafkaConfigStorage.COMMIT_TASKS_PREFIX.length());
                ArrayList arrayList = new ArrayList();
                synchronized (KafkaConfigStorage.this.lock) {
                    if (!(connectData.value() instanceof Map)) {
                        KafkaConfigStorage.log.error("Ignoring connector tasks configuration commit because it is in the wrong format: " + connectData.value());
                        return;
                    }
                    Map map2 = (Map) KafkaConfigStorage.this.deferredTaskUpdates.get(substring2);
                    int intValue = KafkaConfigStorage.intValue(((Map) connectData.value()).get(ConnectProtocol.TASKS_KEY_NAME));
                    if (KafkaConfigStorage.this.completeTaskIdSet(KafkaConfigStorage.this.taskIds(substring2, map2), intValue)) {
                        if (map2 != null) {
                            KafkaConfigStorage.this.taskConfigs.putAll(map2);
                            arrayList.addAll(KafkaConfigStorage.this.taskConfigs.keySet());
                        }
                        KafkaConfigStorage.this.inconsistent.remove(substring2);
                    } else {
                        KafkaConfigStorage.this.inconsistent.add(substring2);
                    }
                    if (map2 != null) {
                        map2.clear();
                    }
                    KafkaConfigStorage.this.connectorTaskCounts.put(substring2, Integer.valueOf(intValue));
                    if (KafkaConfigStorage.this.starting) {
                        return;
                    }
                    KafkaConfigStorage.this.tasksConfigCallback.onCompletion(null, arrayList);
                }
            } catch (DataException e) {
                KafkaConfigStorage.log.error("Failed to convert config data to Kafka Connect format: ", e);
            }
        }
    };
    // Monitor held while the in-memory config maps are mutated by the consumed-record callback
    // and while they are copied by snapshot().
    private final Object lock = new Object();
    // True only while start() is running configLog.start(); the consumed-record callback
    // skips the user-facing callbacks while this is set.
    private boolean starting = false;
    // Offset of the last consumed record plus one; -1 until the first record is consumed.
    // Passed to ClusterConfigState by snapshot().
    private long offset = -1;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* renamed from: org.apache.kafka.connect.storage.KafkaConfigStorage$1 */
    /* loaded from: input_file:org/apache/kafka/connect/storage/KafkaConfigStorage$1.class */
    public class AnonymousClass1 implements Callback<ConsumerRecord<String, byte[]>> {
        AnonymousClass1() {
        }

        @Override // org.apache.kafka.connect.util.Callback
        public void onCompletion(Throwable th, ConsumerRecord<String, byte[]> consumerRecord) {
            if (th != null) {
                KafkaConfigStorage.log.error("Unexpected in consumer callback for KafkaConfigStorage: ", th);
                return;
            }
            try {
                SchemaAndValue connectData = KafkaConfigStorage.this.converter.toConnectData(KafkaConfigStorage.this.topic, (byte[]) consumerRecord.value());
                KafkaConfigStorage.access$302(KafkaConfigStorage.this, consumerRecord.offset() + 1);
                if (((String) consumerRecord.key()).startsWith(KafkaConfigStorage.CONNECTOR_PREFIX)) {
                    String substring = ((String) consumerRecord.key()).substring(KafkaConfigStorage.CONNECTOR_PREFIX.length());
                    synchronized (KafkaConfigStorage.this.lock) {
                        if (connectData.value() == null) {
                            KafkaConfigStorage.this.connectorConfigs.remove(substring);
                        } else {
                            if (!(connectData.value() instanceof Map)) {
                                KafkaConfigStorage.log.error("Found connector configuration (" + ((String) consumerRecord.key()) + ") in wrong format: " + connectData.value().getClass());
                                return;
                            }
                            Object obj = ((Map) connectData.value()).get("properties");
                            if (!(obj instanceof Map)) {
                                KafkaConfigStorage.log.error("Invalid data for connector config: properties filed should be a Map but is " + obj.getClass());
                                return;
                            }
                            KafkaConfigStorage.this.connectorConfigs.put(substring, (Map) obj);
                        }
                        if (KafkaConfigStorage.this.starting) {
                            return;
                        }
                        KafkaConfigStorage.this.connectorConfigCallback.onCompletion(null, substring);
                        return;
                    }
                }
                if (((String) consumerRecord.key()).startsWith(KafkaConfigStorage.TASK_PREFIX)) {
                    synchronized (KafkaConfigStorage.this.lock) {
                        ConnectorTaskId parseTaskId = KafkaConfigStorage.this.parseTaskId((String) consumerRecord.key());
                        if (parseTaskId == null) {
                            KafkaConfigStorage.log.error("Ignoring task configuration because " + ((String) consumerRecord.key()) + " couldn't be parsed as a task config key");
                            return;
                        }
                        if (!(connectData.value() instanceof Map)) {
                            KafkaConfigStorage.log.error("Ignoring task configuration because it is in the wrong format: " + connectData.value());
                            return;
                        }
                        Object obj2 = ((Map) connectData.value()).get("properties");
                        if (!(obj2 instanceof Map)) {
                            KafkaConfigStorage.log.error("Invalid data for task config: properties filed should be a Map but is " + obj2.getClass());
                            return;
                        }
                        Map map = (Map) KafkaConfigStorage.this.deferredTaskUpdates.get(parseTaskId.connector());
                        if (map == null) {
                            map = new HashMap();
                            KafkaConfigStorage.this.deferredTaskUpdates.put(parseTaskId.connector(), map);
                        }
                        map.put(parseTaskId, (Map) obj2);
                        return;
                    }
                }
                if (!((String) consumerRecord.key()).startsWith(KafkaConfigStorage.COMMIT_TASKS_PREFIX)) {
                    KafkaConfigStorage.log.error("Discarding config update record with invalid key: " + ((String) consumerRecord.key()));
                    return;
                }
                String substring2 = ((String) consumerRecord.key()).substring(KafkaConfigStorage.COMMIT_TASKS_PREFIX.length());
                ArrayList arrayList = new ArrayList();
                synchronized (KafkaConfigStorage.this.lock) {
                    if (!(connectData.value() instanceof Map)) {
                        KafkaConfigStorage.log.error("Ignoring connector tasks configuration commit because it is in the wrong format: " + connectData.value());
                        return;
                    }
                    Map map2 = (Map) KafkaConfigStorage.this.deferredTaskUpdates.get(substring2);
                    int intValue = KafkaConfigStorage.intValue(((Map) connectData.value()).get(ConnectProtocol.TASKS_KEY_NAME));
                    if (KafkaConfigStorage.this.completeTaskIdSet(KafkaConfigStorage.this.taskIds(substring2, map2), intValue)) {
                        if (map2 != null) {
                            KafkaConfigStorage.this.taskConfigs.putAll(map2);
                            arrayList.addAll(KafkaConfigStorage.this.taskConfigs.keySet());
                        }
                        KafkaConfigStorage.this.inconsistent.remove(substring2);
                    } else {
                        KafkaConfigStorage.this.inconsistent.add(substring2);
                    }
                    if (map2 != null) {
                        map2.clear();
                    }
                    KafkaConfigStorage.this.connectorTaskCounts.put(substring2, Integer.valueOf(intValue));
                    if (KafkaConfigStorage.this.starting) {
                        return;
                    }
                    KafkaConfigStorage.this.tasksConfigCallback.onCompletion(null, arrayList);
                }
            } catch (DataException e) {
                KafkaConfigStorage.log.error("Failed to convert config data to Kafka Connect format: ", e);
            }
        }
    }

    /**
     * Builds the config-topic record key under which a connector's configuration is stored.
     *
     * @param str the connector name
     * @return {@code "connector-"} followed by the connector name
     */
    public static String CONNECTOR_KEY(String str) {
        StringBuilder key = new StringBuilder(CONNECTOR_PREFIX);
        key.append(str);
        return key.toString();
    }

    /**
     * Builds the config-topic record key under which a single task's configuration is stored.
     *
     * @param connectorTaskId identifies the connector and the task index
     * @return {@code "task-<connector>-<task>"}
     */
    public static String TASK_KEY(ConnectorTaskId connectorTaskId) {
        StringBuilder key = new StringBuilder(TASK_PREFIX);
        key.append(connectorTaskId.connector());
        key.append("-");
        key.append(connectorTaskId.task());
        return key.toString();
    }

    /**
     * Builds the config-topic record key of the record that commits a connector's task configurations.
     *
     * @param str the connector name
     * @return {@code "commit-"} followed by the connector name
     */
    public static String COMMIT_TASKS_KEY(String str) {
        StringBuilder key = new StringBuilder(COMMIT_TASKS_PREFIX);
        key.append(str);
        return key.toString();
    }

    /**
     * Creates an unconfigured storage instance; {@code configure} must be called before {@code start}.
     *
     * @param converter               converts between Connect data and the bytes stored in the config topic
     * @param connectorConfigCallback invoked with the connector name when a connector config record is applied
     * @param tasksConfigCallback     invoked with a list of task ids when a task-commit record is processed
     */
    public KafkaConfigStorage(Converter converter,
                              Callback<String> connectorConfigCallback,
                              Callback<List<ConnectorTaskId>> tasksConfigCallback) {
        this.tasksConfigCallback = tasksConfigCallback;
        this.connectorConfigCallback = connectorConfigCallback;
        this.converter = converter;
    }

    /**
     * Reads the config topic name from the supplied settings and creates the backing log.
     *
     * <p>The producer and consumer settings each start as a copy of {@code map} and are then
     * overridden with the String/byte[] (de)serializers, {@code acks=all} for the producer and
     * {@code enable.auto.commit=false} for the consumer.
     *
     * @param map worker configuration; must contain {@code config.storage.topic}
     * @throws ConnectException if no config topic is specified
     */
    public void configure(Map<String, ?> map) {
        Object configuredTopic = map.get(CONFIG_TOPIC_CONFIG);
        if (configuredTopic == null) {
            throw new ConnectException("Must specify topic for connector configuration.");
        }
        this.topic = (String) configuredTopic;

        Map<String, Object> producerProps = new HashMap<String, Object>(map);
        producerProps.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        producerProps.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
        producerProps.put("acks", "all");

        Map<String, Object> consumerProps = new HashMap<String, Object>(map);
        consumerProps.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        consumerProps.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        consumerProps.put("enable.auto.commit", false);

        this.configLog = createKafkaBasedLog(this.topic, producerProps, consumerProps, this.consumedCallback);
    }

    /**
     * Starts the backing config log.
     *
     * <p>The {@code starting} flag is raised for the duration of {@code configLog.start()}; while it
     * is set, the consumed-record callback updates in-memory state without notifying the
     * connector/task callbacks. The flag is cleared in a {@code finally} block so that a failed
     * start cannot leave those notifications suppressed permanently.
     */
    public void start() {
        log.info("Starting KafkaConfigStorage");
        this.starting = true;
        try {
            this.configLog.start();
        } finally {
            this.starting = false;
        }
        log.info("Started KafkaConfigStorage");
    }

    /**
     * Stops the backing config log, logging before and after.
     */
    public void stop() {
        log.info("Closing KafkaConfigStorage");
        final KafkaBasedLog<String, byte[]> backingLog = configLog;
        backingLog.stop();
        log.info("Closed KafkaConfigStorage");
    }

    /**
     * Captures the current in-memory configuration state.
     *
     * <p>The top-level maps and the inconsistent-connector set are copied while holding the lock,
     * so later log updates do not alter the returned snapshot's top-level collections.
     *
     * @return a {@link ClusterConfigState} built from the current offset and copies of the state maps
     */
    public ClusterConfigState snapshot() {
        synchronized (lock) {
            long currentOffset = offset;
            Map<String, Integer> taskCounts = new HashMap<String, Integer>(connectorTaskCounts);
            Map<String, Map<String, String>> connectors = new HashMap<String, Map<String, String>>(connectorConfigs);
            Map<ConnectorTaskId, Map<String, String>> tasks = new HashMap<ConnectorTaskId, Map<String, String>>(taskConfigs);
            Set<String> inconsistentConnectors = new HashSet<String>(inconsistent);
            return new ClusterConfigState(currentOffset, taskCounts, connectors, tasks, inconsistentConnectors);
        }
    }

    /**
     * Writes (or deletes) a connector configuration and waits until the log has been read back to its end.
     *
     * @param str the connector name
     * @param map the connector properties, or {@code null} to write a null value (which the
     *            consumed-record callback treats as removal of the connector's config)
     * @throws ConnectException if the read-to-end wait is interrupted, fails or times out; when it
     *                          was interrupted, the thread's interrupt flag is restored first
     */
    public void putConnectorConfig(String str, Map<String, String> map) {
        byte[] serializedConfig = null;
        if (map != null) {
            Struct connectConfig = new Struct(CONNECTOR_CONFIGURATION_V0);
            connectConfig.put("properties", map);
            serializedConfig = this.converter.fromConnectData(this.topic, CONNECTOR_CONFIGURATION_V0, connectConfig);
        }
        try {
            this.configLog.send(CONNECTOR_KEY(str), serializedConfig);
            this.configLog.readToEnd().get(READ_TO_END_TIMEOUT_MS, TimeUnit.MILLISECONDS);
        } catch (InterruptedException | ExecutionException | TimeoutException e) {
            if (e instanceof InterruptedException) {
                // Wrapping the exception must not hide the interruption from code further up the stack.
                Thread.currentThread().interrupt();
            }
            log.error("Failed to write connector configuration to Kafka: ", e);
            throw new ConnectException("Error writing connector configuration to Kafka", e);
        }
    }

    /**
     * Writes the full set of task configurations for a connector, followed by a commit record.
     *
     * <p>Sequence: read the log to its end, send one record per task (task ids are assigned from
     * the list position, starting at 0), read to the end again if any task record was sent, send
     * the commit record carrying the task count, and read to the end once more.
     *
     * @param str  the connector name
     * @param list the task configurations, in task-id order
     * @throws ConnectException if any read-to-end wait is interrupted, fails or times out; when it
     *                          was interrupted, the thread's interrupt flag is restored first
     */
    public void putTaskConfigs(String str, List<Map<String, String>> list) {
        try {
            this.configLog.readToEnd().get(READ_TO_END_TIMEOUT_MS, TimeUnit.MILLISECONDS);

            int taskCount = list.size();
            int index = 0;
            for (Map<String, String> taskConfig : list) {
                Struct connectConfig = new Struct(TASK_CONFIGURATION_V0);
                connectConfig.put("properties", taskConfig);
                byte[] serializedConfig = this.converter.fromConnectData(this.topic, TASK_CONFIGURATION_V0, connectConfig);
                log.debug("Writing configuration for task {} configuration: {}", index, taskConfig);
                this.configLog.send(TASK_KEY(new ConnectorTaskId(str, index)), serializedConfig);
                index++;
            }

            // Only wait for the task records when some were actually written.
            if (taskCount > 0) {
                this.configLog.readToEnd().get(READ_TO_END_TIMEOUT_MS, TimeUnit.MILLISECONDS);
            }

            Struct commit = new Struct(CONNECTOR_TASKS_COMMIT_V0);
            commit.put(ConnectProtocol.TASKS_KEY_NAME, Integer.valueOf(taskCount));
            byte[] serializedCommit = this.converter.fromConnectData(this.topic, CONNECTOR_TASKS_COMMIT_V0, commit);
            log.debug("Writing commit for connector {} with {} tasks.", str, taskCount);
            this.configLog.send(COMMIT_TASKS_KEY(str), serializedCommit);

            this.configLog.readToEnd().get(READ_TO_END_TIMEOUT_MS, TimeUnit.MILLISECONDS);
        } catch (InterruptedException | ExecutionException | TimeoutException e) {
            if (e instanceof InterruptedException) {
                // Wrapping the exception must not hide the interruption from code further up the stack.
                Thread.currentThread().interrupt();
            }
            log.error("Failed to write root configuration to Kafka: ", e);
            throw new ConnectException("Error writing root configuration to Kafka", e);
        }
    }

    /**
     * Requests a read of the config log up to its current end.
     *
     * @return the future returned by the backing log's {@code readToEnd()}
     */
    public Future<Void> readToEnd() {
        final Future<Void> pendingRead = configLog.readToEnd();
        return pendingRead;
    }

    /**
     * Requests a read of the config log up to its current end, handing {@code callback} to the backing log.
     *
     * @param callback passed through unchanged to the backing log's {@code readToEnd(Callback)}
     */
    public void readToEnd(Callback<Void> callback) {
        final KafkaBasedLog<String, byte[]> backingLog = configLog;
        backingLog.readToEnd(callback);
    }

    /**
     * Factory for the backing log, using a fresh {@link SystemTime} as its clock.
     *
     * @param str      the topic name
     * @param map      producer settings
     * @param map2     consumer settings
     * @param callback invoked for each consumed record
     * @return a new {@link KafkaBasedLog} over the given topic
     */
    private KafkaBasedLog<String, byte[]> createKafkaBasedLog(String str, Map<String, Object> map, Map<String, Object> map2, Callback<ConsumerRecord<String, byte[]>> callback) {
        final SystemTime clock = new SystemTime();
        return new KafkaBasedLog<>(str, map, map2, callback, clock);
    }

    /**
     * Parses a task record key of the form {@code <prefix>-<connector>-<task>}.
     *
     * <p>The key is split on {@code '-'}: the first segment is skipped, the last segment is the
     * task number, and everything in between is re-joined with {@code '-'} as the connector name
     * (so connector names may themselves contain dashes).
     *
     * @param str the record key
     * @return the parsed id, or {@code null} if the key has fewer than three segments or the last
     *         segment is not an integer
     */
    public ConnectorTaskId parseTaskId(String str) {
        final String[] segments = str.split("-");
        final int lastIndex = segments.length - 1;
        if (lastIndex < 2) {
            return null;
        }
        try {
            String[] connectorSegments = Arrays.copyOfRange(segments, 1, lastIndex);
            String connectorName = Utils.join(connectorSegments, "-");
            int taskNumber = Integer.parseInt(segments[lastIndex]);
            return new ConnectorTaskId(connectorName, taskNumber);
        } catch (NumberFormatException e) {
            return null;
        }
    }

    /**
     * Collects the task numbers present in a map of task configurations.
     *
     * @param str the connector every key is expected to belong to (checked only when assertions are enabled)
     * @param map task id -> configuration, or {@code null}
     * @return a sorted set of the task numbers; empty when {@code map} is {@code null}
     */
    public Set<Integer> taskIds(String str, Map<ConnectorTaskId, Map<String, String>> map) {
        Set<Integer> ids = new TreeSet<Integer>();
        if (map != null) {
            for (ConnectorTaskId taskId : map.keySet()) {
                // Explicit form of an assert statement, kept because the class declares $assertionsDisabled itself.
                if (!($assertionsDisabled || taskId.connector().equals(str))) {
                    throw new AssertionError();
                }
                ids.add(Integer.valueOf(taskId.task()));
            }
        }
        return ids;
    }

    /**
     * Checks that every task number in {@code 0..i-1} is present in {@code set}.
     *
     * <p>Extra ids beyond {@code i-1} are tolerated; only the presence of the expected ones is checked.
     *
     * @param set the task numbers that are available
     * @param i   the expected number of tasks
     * @return {@code true} if {@code set} contains all of {@code 0..i-1}
     */
    public boolean completeTaskIdSet(Set<Integer> set, int i) {
        if (i > set.size()) {
            return false;
        }
        int expectedId = 0;
        while (expectedId < i) {
            if (!set.contains(Integer.valueOf(expectedId))) {
                return false;
            }
            expectedId++;
        }
        return true;
    }

    /**
     * Extracts an {@code int} from a deserialized numeric value.
     *
     * @param obj an {@link Integer} or a {@link Long}; a {@code Long} is narrowed with the same
     *            truncation as an {@code (int)} cast
     * @return the value as an {@code int}
     * @throws ConnectException if {@code obj} is neither an {@code Integer} nor a {@code Long} (including {@code null})
     */
    public static int intValue(Object obj) {
        boolean supported = obj instanceof Integer || obj instanceof Long;
        if (!supported) {
            throw new ConnectException("Expected integer value to be either Integer or Long");
        }
        return ((Number) obj).intValue();
    }

    /**
     * Accessor through which the consumed-record callback stores the next offset into the
     * private {@code offset} field.
     *
     * <p>The decompiler could not decode this method and emitted a stub that always threw
     * {@link UnsupportedOperationException}. It is reconstructed here from the decoded residue
     * ({@code r0.offset = r1}); returning the assigned value follows the usual shape of a
     * compiler-generated assignment accessor.
     *
     * @param storage   the instance whose offset is updated
     * @param newOffset the value to store
     * @return {@code newOffset}
     */
    static /* synthetic */ long access$302(org.apache.kafka.connect.storage.KafkaConfigStorage storage, long newOffset) {
        storage.offset = newOffset;
        return newOffset;
    }

    static {
        $assertionsDisabled = !KafkaConfigStorage.class.desiredAssertionStatus();
        log = LoggerFactory.getLogger(KafkaConfigStorage.class);
        CONNECTOR_CONFIGURATION_V0 = SchemaBuilder.struct().field("properties", SchemaBuilder.map(Schema.STRING_SCHEMA, Schema.OPTIONAL_STRING_SCHEMA)).build();
        TASK_CONFIGURATION_V0 = CONNECTOR_CONFIGURATION_V0;
        CONNECTOR_TASKS_COMMIT_V0 = SchemaBuilder.struct().field(ConnectProtocol.TASKS_KEY_NAME, Schema.INT32_SCHEMA).build();
    }
}
