package org.apache.kafka.connect.mirror;

import java.util.OptionalLong;
import java.util.Random;
import java.util.stream.LongStream;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.TopicPartition;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;

/* loaded from: input_file:org/apache/kafka/connect/mirror/OffsetSyncStoreTest.class */
public class OffsetSyncStoreTest {
    static TopicPartition tp = new TopicPartition("topic1", 2);

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/kafka/connect/mirror/OffsetSyncStoreTest$FakeOffsetSyncStore.class */
    public static class FakeOffsetSyncStore extends OffsetSyncStore {
        public void start() {
            this.readToEnd = true;
        }

        /* JADX INFO: Access modifiers changed from: package-private */
        public void sync(TopicPartition topicPartition, long j, long j2) {
            OffsetSync offsetSync = new OffsetSync(topicPartition, j, j2);
            handleRecord(new ConsumerRecord("test.offsets.internal", 0, 3L, offsetSync.recordKey(), offsetSync.recordValue()));
        }
    }

    @Test
    public void testOffsetTranslation() {
        FakeOffsetSyncStore fakeOffsetSyncStore = new FakeOffsetSyncStore();
        try {
            fakeOffsetSyncStore.start();
            fakeOffsetSyncStore.sync(tp, 100L, 200L);
            Assertions.assertEquals(OptionalLong.of(201L), fakeOffsetSyncStore.translateDownstream(null, tp, 150L));
            fakeOffsetSyncStore.sync(tp, 150L, 251L);
            Assertions.assertEquals(OptionalLong.of(251L), fakeOffsetSyncStore.translateDownstream(null, tp, 150L));
            Assertions.assertEquals(OptionalLong.of(-1L), fakeOffsetSyncStore.translateDownstream(null, tp, 5L));
            fakeOffsetSyncStore.sync(tp, 200L, 10L);
            Assertions.assertEquals(OptionalLong.of(10L), fakeOffsetSyncStore.translateDownstream(null, tp, 200L));
            fakeOffsetSyncStore.sync(tp, 20L, 20L);
            Assertions.assertEquals(OptionalLong.of(20L), fakeOffsetSyncStore.translateDownstream(null, tp, 20L));
            fakeOffsetSyncStore.close();
        } catch (Throwable th) {
            try {
                fakeOffsetSyncStore.close();
            } catch (Throwable th2) {
                th.addSuppressed(th2);
            }
            throw th;
        }
    }

    @Test
    public void testNoTranslationIfStoreNotStarted() {
        FakeOffsetSyncStore fakeOffsetSyncStore = new FakeOffsetSyncStore();
        try {
            Assertions.assertEquals(OptionalLong.empty(), fakeOffsetSyncStore.translateDownstream(null, tp, 0L));
            Assertions.assertEquals(OptionalLong.empty(), fakeOffsetSyncStore.translateDownstream(null, tp, 100L));
            Assertions.assertEquals(OptionalLong.empty(), fakeOffsetSyncStore.translateDownstream(null, tp, 200L));
            fakeOffsetSyncStore.sync(tp, 100L, 200L);
            Assertions.assertEquals(OptionalLong.empty(), fakeOffsetSyncStore.translateDownstream(null, tp, 0L));
            Assertions.assertEquals(OptionalLong.empty(), fakeOffsetSyncStore.translateDownstream(null, tp, 100L));
            Assertions.assertEquals(OptionalLong.empty(), fakeOffsetSyncStore.translateDownstream(null, tp, 200L));
            fakeOffsetSyncStore.start();
            Assertions.assertEquals(OptionalLong.of(-1L), fakeOffsetSyncStore.translateDownstream(null, tp, 0L));
            Assertions.assertEquals(OptionalLong.of(200L), fakeOffsetSyncStore.translateDownstream(null, tp, 100L));
            Assertions.assertEquals(OptionalLong.of(201L), fakeOffsetSyncStore.translateDownstream(null, tp, 200L));
            fakeOffsetSyncStore.close();
        } catch (Throwable th) {
            try {
                fakeOffsetSyncStore.close();
            } catch (Throwable th2) {
                th.addSuppressed(th2);
            }
            throw th;
        }
    }

    @Test
    public void testNoTranslationIfNoOffsetSync() {
        FakeOffsetSyncStore fakeOffsetSyncStore = new FakeOffsetSyncStore();
        try {
            fakeOffsetSyncStore.start();
            Assertions.assertEquals(OptionalLong.empty(), fakeOffsetSyncStore.translateDownstream(null, tp, 0L));
            fakeOffsetSyncStore.close();
        } catch (Throwable th) {
            try {
                fakeOffsetSyncStore.close();
            } catch (Throwable th2) {
                th.addSuppressed(th2);
            }
            throw th;
        }
    }

    @Test
    public void testPastOffsetTranslation() {
        FakeOffsetSyncStore fakeOffsetSyncStore = new FakeOffsetSyncStore();
        int i = 0;
        while (i <= 1000) {
            try {
                fakeOffsetSyncStore.sync(tp, i, i);
                assertSparseSyncInvariant(fakeOffsetSyncStore, tp);
                i += 10;
            } catch (Throwable th) {
                try {
                    fakeOffsetSyncStore.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
                throw th;
            }
        }
        fakeOffsetSyncStore.start();
        assertSparseSync(fakeOffsetSyncStore, 1000L, -1L);
        while (i <= 10000) {
            fakeOffsetSyncStore.sync(tp, i, i);
            assertSparseSyncInvariant(fakeOffsetSyncStore, tp);
            i += 10;
        }
        assertSparseSync(fakeOffsetSyncStore, 1000L, -1L);
        assertSparseSync(fakeOffsetSyncStore, 4840L, 1000L);
        assertSparseSync(fakeOffsetSyncStore, 6760L, 4840L);
        assertSparseSync(fakeOffsetSyncStore, 8680L, 6760L);
        assertSparseSync(fakeOffsetSyncStore, 9160L, 8680L);
        assertSparseSync(fakeOffsetSyncStore, 9640L, 9160L);
        assertSparseSync(fakeOffsetSyncStore, 9880L, 9640L);
        assertSparseSync(fakeOffsetSyncStore, 9940L, 9880L);
        assertSparseSync(fakeOffsetSyncStore, 9970L, 9940L);
        assertSparseSync(fakeOffsetSyncStore, 9990L, 9970L);
        assertSparseSync(fakeOffsetSyncStore, 10000L, 9990L);
        fakeOffsetSyncStore.sync(tp, 1500L, 11000L);
        assertSparseSyncInvariant(fakeOffsetSyncStore, tp);
        Assertions.assertEquals(OptionalLong.of(-1L), fakeOffsetSyncStore.translateDownstream(null, tp, 1499L));
        Assertions.assertEquals(OptionalLong.of(11000L), fakeOffsetSyncStore.translateDownstream(null, tp, 1500L));
        Assertions.assertEquals(OptionalLong.of(11001L), fakeOffsetSyncStore.translateDownstream(null, tp, 2000L));
        fakeOffsetSyncStore.close();
    }

    @Test
    public void testConsistentlySpacedSyncs() {
        long j = Long.MAX_VALUE / 100;
        long j2 = 1;
        while (true) {
            long j3 = j2;
            if (j3 >= j) {
                return;
            }
            long j4 = 0;
            while (true) {
                long j5 = j4;
                if (j5 < 30) {
                    assertSyncSpacingHasBoundedExpirations(j5, LongStream.generate(() -> {
                        return j3;
                    }).limit(100L), 1);
                    j4 = j5 + 1;
                }
            }
            j2 = (j3 * 2) + 1;
        }
    }

    @Test
    public void testRandomlySpacedSyncs() {
        Random random = new Random(0L);
        long j = 1 << 10;
        for (int i = 1; i < 64 - 10; i++) {
            long j2 = 1 << i;
            int i2 = i + 2;
            assertSyncSpacingHasBoundedExpirations(0L, random.longs(j, 0L, j2), i2);
            assertSyncSpacingHasBoundedExpirations(0L, random.longs(j, 65536L, 65536 + j2), i2);
        }
    }

    @Test
    public void testDroppedSyncsSpacing() {
        long j = 100;
        assertSyncSpacingHasBoundedExpirations(0L, new Random(0L).doubles().mapToLong(d -> {
            return (d < 0.5d ? 2 : 1) * j;
        }).limit(10000L), 2);
    }

    /* JADX WARN: Type inference failed for: r0v7, types: [java.util.PrimitiveIterator$OfLong] */
    private void assertSyncSpacingHasBoundedExpirations(long j, LongStream longStream, int i) {
        FakeOffsetSyncStore fakeOffsetSyncStore = new FakeOffsetSyncStore();
        try {
            fakeOffsetSyncStore.start();
            fakeOffsetSyncStore.sync(tp, j, j);
            ?? it = longStream.iterator();
            long j2 = j;
            int i2 = 1;
            while (it.hasNext()) {
                j2 += it.nextLong();
                Assertions.assertTrue(j2 >= 0, "Test is invalid, offset overflowed");
                fakeOffsetSyncStore.sync(tp, j2, j2);
                Assertions.assertEquals(j2, fakeOffsetSyncStore.syncFor(tp, 0).upstreamOffset());
                Assertions.assertEquals(j, fakeOffsetSyncStore.syncFor(tp, 63).upstreamOffset());
                int countDistinctStoredSyncs = countDistinctStoredSyncs(fakeOffsetSyncStore, tp);
                int i3 = (i2 - countDistinctStoredSyncs) + 1;
                Assertions.assertTrue(i3 <= i, "Store expired too many syncs: " + i3 + " > " + i + " after receiving offset " + j2);
                i2 = countDistinctStoredSyncs;
            }
            fakeOffsetSyncStore.close();
        } catch (Throwable th) {
            try {
                fakeOffsetSyncStore.close();
            } catch (Throwable th2) {
                th.addSuppressed(th2);
            }
            throw th;
        }
    }

    private void assertSparseSync(FakeOffsetSyncStore fakeOffsetSyncStore, long j, long j2) {
        Assertions.assertEquals(OptionalLong.of(j2 == -1 ? j2 : j2 + 1), fakeOffsetSyncStore.translateDownstream(null, tp, j - 1));
        Assertions.assertEquals(OptionalLong.of(j), fakeOffsetSyncStore.translateDownstream(null, tp, j));
        Assertions.assertEquals(OptionalLong.of(j + 1), fakeOffsetSyncStore.translateDownstream(null, tp, j + 1));
        Assertions.assertEquals(OptionalLong.of(j + 1), fakeOffsetSyncStore.translateDownstream(null, tp, j + 2));
    }

    private int countDistinctStoredSyncs(FakeOffsetSyncStore fakeOffsetSyncStore, TopicPartition topicPartition) {
        int i = 1;
        for (int i2 = 1; i2 < 64; i2++) {
            if (fakeOffsetSyncStore.syncFor(topicPartition, i2 - 1) != fakeOffsetSyncStore.syncFor(topicPartition, i2)) {
                i++;
            }
        }
        return i;
    }

    private void assertSparseSyncInvariant(FakeOffsetSyncStore fakeOffsetSyncStore, TopicPartition topicPartition) {
        for (int i = 0; i < 64; i++) {
            for (int i2 = 0; i2 < i; i2++) {
                long upstreamOffset = fakeOffsetSyncStore.syncFor(topicPartition, i).upstreamOffset();
                long upstreamOffset2 = fakeOffsetSyncStore.syncFor(topicPartition, i2).upstreamOffset();
                if (upstreamOffset != upstreamOffset2) {
                    int max = Math.max(i2 - 2, 0);
                    long j = upstreamOffset + (1 << max);
                    if (j >= 0) {
                        Assertions.assertTrue(upstreamOffset2 >= j, "Invariant C(" + i2 + "," + i + "): Upstream offset " + upstreamOffset2 + " at position " + i2 + " should be at least " + j + " (" + upstreamOffset + " + 2^" + max + ")");
                        long j2 = (upstreamOffset + (1 << i)) - (1 << i2);
                        if (j2 >= 0) {
                            Assertions.assertTrue(upstreamOffset2 <= j2, "Invariant B(" + i2 + "," + i + "): Upstream offset " + upstreamOffset2 + " at position " + i2 + " should be no greater than " + j2 + " (" + upstreamOffset + " + 2^" + i + " - 2^" + i2 + ")");
                        }
                    }
                }
            }
        }
    }
}
