package org.apache.kafka.connect.mirror;

import java.util.Arrays;
import java.util.Collections;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.OptionalLong;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.connect.util.KafkaBasedLog;
import org.apache.kafka.connect.util.TopicAdmin;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/kafka/connect/mirror/OffsetSyncStore.class */
/**
 * In-memory store of offset syncs read from the offset-syncs topic, used to translate
 * upstream consumer offsets into downstream offsets.
 *
 * <p>For every topic-partition the store keeps a fixed array of {@link #SYNCS_PER_PARTITION}
 * syncs. Slot 0 always holds the most recently applied sync; later slots hold progressively
 * older syncs. Neighbouring slots are kept consistent with two bounds, checked by
 * {@code invariantB} and {@code invariantC}:
 * <ul>
 *   <li>B: {@code syncs[i].upstreamOffset <= syncs[j].upstreamOffset + 2^j - 2^i} for
 *       {@code i < j}, i.e. a sync in slot {@code j} may not be "too old";</li>
 *   <li>C: {@code syncs[i].upstreamOffset >= syncs[j].upstreamOffset + 2^max(i - 2, 0)},
 *       i.e. distinct syncs in neighbouring slots are sufficiently far apart.</li>
 * </ul>
 * Both bounds are trivially satisfied when the two slots reference the same sync object.
 *
 * <p>Arrays stored in the map are never mutated after publication: each update works on a
 * fresh copy which then replaces the previous array.
 */
class OffsetSyncStore implements AutoCloseable {
    private static final Logger log = LoggerFactory.getLogger(OffsetSyncStore.class);

    /** Number of sync slots retained per topic-partition. */
    static final int SYNCS_PER_PARTITION = 64;

    /** Log over the offset-syncs topic; {@code null} when built with the no-arg constructor. */
    private final KafkaBasedLog<byte[], byte[]> backingStore;
    private final Map<TopicPartition, OffsetSync[]> offsetSyncs = new ConcurrentHashMap<>();
    /** Admin client handed to the backing store; {@code null} when built with the no-arg constructor. */
    private final TopicAdmin admin;
    /** Set once {@link #start()} has completed; until then translation is refused. */
    protected volatile boolean readToEnd = false;

    /**
     * Creates a store backed by the offset-syncs topic named in the given configuration.
     *
     * <p>If any step of the setup fails, the consumer and admin client created so far are
     * closed before the failure is rethrown.
     *
     * @param config checkpoint configuration supplying the topic name and client settings
     */
    public OffsetSyncStore(MirrorCheckpointConfig config) {
        Consumer<byte[], byte[]> consumer = null;
        TopicAdmin topicAdmin = null;
        KafkaBasedLog<byte[], byte[]> store;
        try {
            consumer = MirrorUtils.newConsumer(config.offsetSyncsTopicConsumerConfig());
            topicAdmin = new TopicAdmin(
                    config.offsetSyncsTopicAdminConfig(),
                    config.forwardingAdmin(config.offsetSyncsTopicAdminConfig()));
            store = createBackingStore(config, consumer, topicAdmin);
        } catch (Throwable t) {
            Utils.closeQuietly(consumer, "consumer for offset syncs");
            Utils.closeQuietly(topicAdmin, "admin client for offset syncs");
            throw t;
        }
        this.admin = topicAdmin;
        this.backingStore = store;
    }

    /**
     * Creates a store without any backing log or admin client. {@link #start()} and
     * {@link #close()} tolerate the missing backing store.
     */
    OffsetSyncStore() {
        this.admin = null;
        this.backingStore = null;
    }

    /**
     * Builds the log over the offset-syncs topic. The log reuses the supplied consumer and
     * admin client, creates no producer, reads only partition 0, and feeds every consumed
     * record to {@link #handleRecord(ConsumerRecord)}.
     */
    private KafkaBasedLog<byte[], byte[]> createBackingStore(
            MirrorCheckpointConfig config,
            final Consumer<byte[], byte[]> consumer,
            TopicAdmin topicAdmin) {
        return new KafkaBasedLog<byte[], byte[]>(
                config.offsetSyncsTopic(),
                Collections.emptyMap(),
                Collections.emptyMap(),
                () -> topicAdmin,
                (error, consumerRecord) -> handleRecord(consumerRecord),
                Time.SYSTEM,
                ignored -> {
                }) {

            @Override
            protected Producer<byte[], byte[]> createProducer() {
                // This store only reads the topic, so no producer is provided.
                return null;
            }

            @Override
            protected Consumer<byte[], byte[]> createConsumer() {
                return consumer;
            }

            @Override
            protected boolean readPartition(TopicPartition topicPartition) {
                return topicPartition.partition() == 0;
            }
        };
    }

    /**
     * Starts the backing store (if there is one) and then marks the store as ready for
     * translation.
     *
     * <p>NOTE(review): this presumes {@code KafkaBasedLog.start()} returns only after the
     * existing topic contents have been consumed - confirm against KafkaBasedLog.
     */
    public void start() {
        // Mirror close(): the no-arg constructor leaves backingStore null.
        if (backingStore != null) {
            backingStore.start();
        }
        readToEnd = true;
    }

    /**
     * Translates an upstream offset to the corresponding downstream offset.
     *
     * @param group label echoed in the debug logs only (presumably the consumer group id -
     *              confirm against callers)
     * @param sourceTopicPartition upstream topic-partition to translate for
     * @param upstreamOffset upstream offset to translate
     * @return empty if {@link #start()} has not completed or no sync is known for the
     *         partition; {@code -1} if every retained sync is ahead of {@code upstreamOffset};
     *         otherwise the sync's downstream offset, plus one when {@code upstreamOffset}
     *         lies strictly after the sync's upstream offset
     */
    public OptionalLong translateDownstream(String group, TopicPartition sourceTopicPartition, long upstreamOffset) {
        if (!readToEnd) {
            log.debug("translateDownstream({},{},{}): Skipped (initial offset syncs read still in progress)",
                    group, sourceTopicPartition, upstreamOffset);
            return OptionalLong.empty();
        }
        Optional<OffsetSync> offsetSync = latestOffsetSync(sourceTopicPartition, upstreamOffset);
        if (!offsetSync.isPresent()) {
            log.debug("translateDownstream({},{},{}): Skipped (offset sync not found)",
                    group, sourceTopicPartition, upstreamOffset);
            return OptionalLong.empty();
        }
        OffsetSync sync = offsetSync.get();
        if (sync.upstreamOffset() > upstreamOffset) {
            log.debug("translateDownstream({},{},{}): Skipped ({} is ahead of upstream consumer group {})",
                    group, sourceTopicPartition, upstreamOffset, sync, upstreamOffset);
            return OptionalLong.of(-1L);
        }
        // An exact match maps onto the sync itself; anything later maps one past it.
        long upstreamStep = upstreamOffset == sync.upstreamOffset() ? 0L : 1L;
        long downstreamOffset = sync.downstreamOffset() + upstreamStep;
        log.debug("translateDownstream({},{},{}): Translated {} (relative to {})",
                group, sourceTopicPartition, upstreamOffset, downstreamOffset, sync);
        return OptionalLong.of(downstreamOffset);
    }

    /** Stops the backing store (if any) and closes the admin client, ignoring failures. */
    @Override
    public void close() {
        AutoCloseable stopBackingStore = backingStore != null ? backingStore::stop : null;
        Utils.closeQuietly(stopBackingStore, "backing store for offset syncs");
        Utils.closeQuietly(admin, "admin client for offset syncs");
    }

    /**
     * Deserializes a record from the offset-syncs topic and merges it into the sync array
     * of its topic-partition, creating the array on first sight of the partition.
     */
    protected void handleRecord(ConsumerRecord<byte[], byte[]> consumerRecord) {
        OffsetSync offsetSync = OffsetSync.deserializeRecord(consumerRecord);
        offsetSyncs.compute(offsetSync.topicPartition(), (ignored, syncs) ->
                syncs == null ? createInitialSyncs(offsetSync) : updateExistingSyncs(syncs, offsetSync));
    }

    /**
     * Returns a new array with {@code offsetSync} applied; the array passed in is left
     * untouched so that readers holding it keep seeing a consistent state.
     */
    private OffsetSync[] updateExistingSyncs(OffsetSync[] syncs, OffsetSync offsetSync) {
        OffsetSync[] mutableSyncs = Arrays.copyOf(syncs, SYNCS_PER_PARTITION);
        updateSyncArray(mutableSyncs, syncs, offsetSync);
        if (log.isTraceEnabled()) {
            log.trace("New sync {} applied, new state is {}", offsetSync, offsetArrayToString(mutableSyncs));
        }
        return mutableSyncs;
    }

    /**
     * Renders the array as {@code [upstream:downstream,...]}, printing each run of identical
     * (same-object) neighbouring slots only once.
     */
    private String offsetArrayToString(OffsetSync[] syncs) {
        StringBuilder stateString = new StringBuilder();
        stateString.append("[");
        for (int i = 0; i < SYNCS_PER_PARTITION; i++) {
            if (i == 0 || syncs[i] != syncs[i - 1]) {
                if (i != 0) {
                    stateString.append(",");
                }
                stateString.append(syncs[i].upstreamOffset());
                stateString.append(":");
                stateString.append(syncs[i].downstreamOffset());
            }
        }
        stateString.append("]");
        return stateString.toString();
    }

    /** Creates a sync array in which every slot references {@code firstSync}. */
    private OffsetSync[] createInitialSyncs(OffsetSync firstSync) {
        OffsetSync[] syncs = new OffsetSync[SYNCS_PER_PARTITION];
        clearSyncArray(syncs, firstSync);
        return syncs;
    }

    /** Overwrites every slot of {@code syncs} with {@code offsetSync}. */
    private void clearSyncArray(OffsetSync[] syncs, OffsetSync offsetSync) {
        for (int i = 0; i < SYNCS_PER_PARTITION; i++) {
            syncs[i] = offsetSync;
        }
    }

    /**
     * Applies {@code offsetSync} to {@code syncs} in place.
     *
     * @param syncs array to modify; on entry it holds the same contents as {@code original}
     * @param original the array as it was before this update; only read
     * @param offsetSync the newly received sync
     */
    private void updateSyncArray(OffsetSync[] syncs, OffsetSync[] original, OffsetSync offsetSync) {
        long upstreamOffset = offsetSync.upstreamOffset();
        // Before start() has completed only the latest sync is retained.
        boolean onlyLoadLastOffset = !readToEnd;
        // The new sync is behind the newest one we hold: discard the history.
        boolean upstreamRewind = upstreamOffset < syncs[0].upstreamOffset();
        if (onlyLoadLastOffset || upstreamRewind) {
            clearSyncArray(syncs, offsetSync);
            return;
        }
        OffsetSync replacement = offsetSync;
        // Index into the original array of the next candidate to promote into `replacement`.
        int replacementIndex = 0;
        // Slot 0 always takes the newest sync.
        syncs[0] = replacement;
        for (int current = 1; current < SYNCS_PER_PARTITION; current++) {
            int previous = current - 1;

            boolean skipOldValue;
            do {
                OffsetSync oldValue = original[replacementIndex];
                // Is the old value recent enough to live in the current slot?
                boolean isRecent = invariantB(syncs[previous], oldValue, previous, current);
                // Is the old value far enough from the value chosen for the previous slot?
                boolean separatedFromPrevious = invariantC(syncs[previous], oldValue, previous);
                // Is the old value far enough from the value currently in the next slot?
                int next = current + 1;
                boolean separatedFromNext = next >= SYNCS_PER_PARTITION
                        || invariantC(oldValue, syncs[next], current);
                // The following original slot references the same sync object.
                int nextReplacement = replacementIndex + 1;
                boolean duplicate = nextReplacement < SYNCS_PER_PARTITION
                        && oldValue == original[nextReplacement];

                // Only promote an old value that satisfies every bound for this slot.
                boolean promoteOldValueToReplacement = isRecent && separatedFromPrevious && separatedFromNext;
                if (promoteOldValueToReplacement) {
                    replacement = oldValue;
                }
                // Skip candidates that are duplicates or too close to the previous slot.
                skipOldValue = duplicate || !separatedFromPrevious;
                if (promoteOldValueToReplacement || skipOldValue) {
                    replacementIndex++;
                }
                // Keep skipping, but never let the candidate index pass the current slot.
            } while (replacementIndex < current && skipOldValue);

            // Whatever ended up in `replacement` must be valid for the current slot.
            assert invariantB(syncs[previous], replacement, previous, current);
            assert invariantC(syncs[previous], replacement, previous);

            if (invariantB(syncs[previous], syncs[current], previous, current)) {
                // The existing value is still recent enough: keep it and the rest of the tail.
                break;
            }
            // The existing value is now too old for this slot: swap in the replacement.
            syncs[current] = replacement;

            assert invariantB(syncs[previous], syncs[current], previous, current);
            assert invariantC(syncs[previous], syncs[current], previous);
        }
    }

    /**
     * Bound B: {@code iSync} (slot {@code i}) is at most {@code 2^j - 2^i} upstream offsets
     * ahead of {@code jSync} (slot {@code j}). Holds trivially for the same object or when
     * the bound is negative.
     *
     * <p>The shifts use {@code long} operands: slot indexes reach 63, and an {@code int}
     * shift would only use the low five bits of the shift distance.
     */
    private static boolean invariantB(OffsetSync iSync, OffsetSync jSync, int i, int j) {
        long bound = jSync.upstreamOffset() + (1L << j) - (1L << i);
        return iSync == jSync || bound < 0 || iSync.upstreamOffset() <= bound;
    }

    /**
     * Bound C: {@code iSync} (slot {@code i}) is at least {@code 2^max(i - 2, 0)} upstream
     * offsets ahead of {@code jSync}. Holds trivially for the same object; fails when the
     * bound is negative.
     *
     * <p>The shift uses a {@code long} operand for the same reason as in {@code invariantB}.
     */
    private static boolean invariantC(OffsetSync iSync, OffsetSync jSync, int i) {
        long bound = jSync.upstreamOffset() + (1L << Math.max(i - 2, 0));
        return iSync == jSync || (bound >= 0 && iSync.upstreamOffset() >= bound);
    }

    /** Looks up the sync to translate {@code upstreamOffset} with, if the partition is known. */
    private Optional<OffsetSync> latestOffsetSync(TopicPartition topicPartition, long upstreamOffset) {
        return Optional.ofNullable(offsetSyncs.get(topicPartition))
                .map(syncs -> lookupLatestSync(syncs, upstreamOffset));
    }

    /**
     * Returns the first sync (scanning from slot 0) whose upstream offset does not exceed
     * {@code upstreamOffset}, or the sync in the last slot when every slot is ahead of it.
     */
    private OffsetSync lookupLatestSync(OffsetSync[] syncs, long upstreamOffset) {
        for (int i = 0; i < SYNCS_PER_PARTITION; i++) {
            OffsetSync candidate = syncs[i];
            if (candidate.upstreamOffset() <= upstreamOffset) {
                return candidate;
            }
        }
        return syncs[SYNCS_PER_PARTITION - 1];
    }

    /**
     * Returns the sync stored in slot {@code syncIdx} for the given topic-partition.
     *
     * @throws IllegalArgumentException if no syncs are stored for the partition, or if
     *         {@code syncIdx} is not smaller than the number of slots
     */
    OffsetSync syncFor(TopicPartition topicPartition, int syncIdx) {
        OffsetSync[] syncs = offsetSyncs.get(topicPartition);
        if (syncs == null) {
            throw new IllegalArgumentException("No syncs present for " + topicPartition);
        }
        if (syncIdx >= syncs.length) {
            throw new IllegalArgumentException("Requested sync " + (syncIdx + 1) + " for " + topicPartition + " but there are only " + syncs.length + " syncs available for that topic partition");
        }
        return syncs[syncIdx];
    }
}
