/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.state.internals;

import java.io.File;
import java.util.List;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.streams.errors.InvalidStateStoreException;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.internals.MockStreamsMetrics;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.internals.LogicalKeyValueSegment;
import org.apache.kafka.streams.state.internals.LogicalKeyValueSegments;
import org.apache.kafka.streams.state.internals.ThreadCache;
import org.apache.kafka.streams.state.internals.metrics.RocksDBMetricsRecorder;
import org.apache.kafka.test.InternalMockProcessorContext;
import org.apache.kafka.test.MockRecordCollector;
import org.apache.kafka.test.TestUtils;
import org.hamcrest.CoreMatchers;
import org.hamcrest.Matcher;
import org.hamcrest.MatcherAssert;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

public class LogicalKeyValueSegmentsTest {
    private static final long SEGMENT_INTERVAL = 100L;
    private static final long RETENTION_PERIOD = 400L;
    private static final String STORE_NAME = "logical-segments";
    private static final String METRICS_SCOPE = "metrics-scope";
    private static final String DB_FILE_DIR = "rocksdb";
    private InternalMockProcessorContext context;
    private LogicalKeyValueSegments segments;

    @Before
    public void setUp() {
        this.context = new InternalMockProcessorContext(TestUtils.tempDirectory(), Serdes.String(), Serdes.Long(), new MockRecordCollector(), new ThreadCache(new LogContext("testCache "), 0L, (StreamsMetricsImpl)new MockStreamsMetrics(new Metrics())));
        this.segments = new LogicalKeyValueSegments(STORE_NAME, DB_FILE_DIR, 400L, 100L, new RocksDBMetricsRecorder(METRICS_SCOPE, STORE_NAME));
        this.segments.openExisting((ProcessorContext)this.context, 0L);
    }

    @After
    public void tearDown() {
        this.segments.close();
    }

    @Test
    public void shouldGetSegmentIdsFromTimestamp() {
        Assert.assertEquals((long)0L, (long)this.segments.segmentId(0L));
        Assert.assertEquals((long)1L, (long)this.segments.segmentId(100L));
        Assert.assertEquals((long)2L, (long)this.segments.segmentId(200L));
        Assert.assertEquals((long)3L, (long)this.segments.segmentId(300L));
    }

    @Test
    public void shouldCreateSegments() {
        LogicalKeyValueSegment segment1 = (LogicalKeyValueSegment)this.segments.getOrCreateSegmentIfLive(0L, (ProcessorContext)this.context, 0L);
        LogicalKeyValueSegment segment2 = (LogicalKeyValueSegment)this.segments.getOrCreateSegmentIfLive(1L, (ProcessorContext)this.context, 100L);
        LogicalKeyValueSegment segment3 = (LogicalKeyValueSegment)this.segments.getOrCreateSegmentIfLive(2L, (ProcessorContext)this.context, 200L);
        File rocksdbDir = new File(new File(this.context.stateDir(), DB_FILE_DIR), STORE_NAME);
        Assert.assertTrue((boolean)rocksdbDir.isDirectory());
        Assert.assertTrue((boolean)segment1.isOpen());
        Assert.assertTrue((boolean)segment2.isOpen());
        Assert.assertTrue((boolean)segment3.isOpen());
    }

    @Test
    public void shouldNotCreateSegmentThatIsAlreadyExpired() {
        long streamTime = this.updateStreamTimeAndCreateSegment(7);
        Assert.assertNull((Object)this.segments.getOrCreateSegmentIfLive(0L, (ProcessorContext)this.context, streamTime));
    }

    @Test
    public void shouldCreateReservedSegments() {
        LogicalKeyValueSegment reservedSegment1 = this.segments.createReservedSegment(-1L, "reserved-1");
        LogicalKeyValueSegment reservedSegment2 = this.segments.createReservedSegment(-2L, "reserved-2");
        File rocksdbDir = new File(new File(this.context.stateDir(), DB_FILE_DIR), STORE_NAME);
        Assert.assertTrue((boolean)rocksdbDir.isDirectory());
        Assert.assertTrue((boolean)reservedSegment1.isOpen());
        Assert.assertTrue((boolean)reservedSegment2.isOpen());
    }

    @Test
    public void shouldNotCreateReservedSegmentWithNonNegativeId() {
        Assert.assertThrows(IllegalArgumentException.class, () -> this.segments.createReservedSegment(0L, "reserved"));
        Assert.assertThrows(IllegalArgumentException.class, () -> this.segments.createReservedSegment(1L, "reserved"));
    }

    @Test
    public void shouldNotCreateReservedSegmentFromRegularMethod() {
        Assert.assertThrows(IllegalArgumentException.class, () -> this.segments.getOrCreateSegmentIfLive(-1L, (ProcessorContext)this.context, 0L));
        Assert.assertThrows(IllegalArgumentException.class, () -> this.segments.getOrCreateSegment(-1L, (ProcessorContext)this.context));
    }

    @Test
    public void shouldCleanupSegmentsThatHaveExpired() {
        LogicalKeyValueSegment segment1 = (LogicalKeyValueSegment)this.segments.getOrCreateSegmentIfLive(0L, (ProcessorContext)this.context, 0L);
        LogicalKeyValueSegment segment2 = (LogicalKeyValueSegment)this.segments.getOrCreateSegmentIfLive(2L, (ProcessorContext)this.context, 200L);
        LogicalKeyValueSegment segment3 = (LogicalKeyValueSegment)this.segments.getOrCreateSegmentIfLive(3L, (ProcessorContext)this.context, 300L);
        LogicalKeyValueSegment segment4 = (LogicalKeyValueSegment)this.segments.getOrCreateSegmentIfLive(7L, (ProcessorContext)this.context, 700L);
        this.segments.cleanupExpiredSegments(700L);
        List allSegments = this.segments.allSegments(true);
        Assert.assertEquals((long)2L, (long)allSegments.size());
        Assert.assertEquals((Object)segment3, allSegments.get(0));
        Assert.assertEquals((Object)segment4, allSegments.get(1));
    }

    @Test
    public void shouldNotCleanUpReservedSegments() {
        LogicalKeyValueSegment reservedSegment = this.segments.createReservedSegment(-1L, "reserved");
        LogicalKeyValueSegment segment1 = (LogicalKeyValueSegment)this.segments.getOrCreateSegmentIfLive(1L, (ProcessorContext)this.context, 100L);
        LogicalKeyValueSegment segment2 = (LogicalKeyValueSegment)this.segments.getOrCreateSegmentIfLive(2L, (ProcessorContext)this.context, 200L);
        this.segments.cleanupExpiredSegments(600L);
        List allSegments = this.segments.allSegments(true);
        Assert.assertEquals((long)1L, (long)allSegments.size());
        Assert.assertEquals((Object)segment2, allSegments.get(0));
        Assert.assertEquals((Object)reservedSegment, (Object)this.segments.getReservedSegment(-1L));
    }

    @Test
    public void shouldGetSegmentForTimestamp() {
        LogicalKeyValueSegment segment1 = (LogicalKeyValueSegment)this.segments.getOrCreateSegmentIfLive(0L, (ProcessorContext)this.context, 0L);
        LogicalKeyValueSegment segment2 = (LogicalKeyValueSegment)this.segments.getOrCreateSegmentIfLive(1L, (ProcessorContext)this.context, 100L);
        Assert.assertEquals((Object)segment1, (Object)this.segments.getSegmentForTimestamp(0L));
        Assert.assertEquals((Object)segment1, (Object)this.segments.getSegmentForTimestamp(99L));
        Assert.assertEquals((Object)segment2, (Object)this.segments.getSegmentForTimestamp(100L));
        Assert.assertEquals((Object)segment2, (Object)this.segments.getSegmentForTimestamp(199L));
    }

    @Test
    public void shouldGetSegmentsWithinTimeRange() {
        this.segments.createReservedSegment(-1L, "reserved");
        long streamTime = this.updateStreamTimeAndCreateSegment(4);
        this.segments.getOrCreateSegmentIfLive(0L, (ProcessorContext)this.context, streamTime);
        this.segments.getOrCreateSegmentIfLive(2L, (ProcessorContext)this.context, streamTime);
        this.segments.getOrCreateSegmentIfLive(1L, (ProcessorContext)this.context, streamTime);
        this.segments.getOrCreateSegmentIfLive(3L, (ProcessorContext)this.context, streamTime);
        this.segments.getOrCreateSegmentIfLive(4L, (ProcessorContext)this.context, streamTime);
        List segments = this.segments.segments(0L, 200L, true);
        Assert.assertEquals((long)3L, (long)segments.size());
        Assert.assertEquals((long)0L, (long)((LogicalKeyValueSegment)segments.get(0)).id());
        Assert.assertEquals((long)1L, (long)((LogicalKeyValueSegment)segments.get(1)).id());
        Assert.assertEquals((long)2L, (long)((LogicalKeyValueSegment)segments.get(2)).id());
    }

    @Test
    public void shouldGetSegmentsWithinBackwardTimeRange() {
        this.segments.createReservedSegment(-1L, "reserved");
        long streamTime = this.updateStreamTimeAndCreateSegment(4);
        this.segments.getOrCreateSegmentIfLive(0L, (ProcessorContext)this.context, streamTime);
        this.segments.getOrCreateSegmentIfLive(2L, (ProcessorContext)this.context, streamTime);
        this.segments.getOrCreateSegmentIfLive(1L, (ProcessorContext)this.context, streamTime);
        this.segments.getOrCreateSegmentIfLive(3L, (ProcessorContext)this.context, streamTime);
        this.segments.getOrCreateSegmentIfLive(4L, (ProcessorContext)this.context, streamTime);
        List segments = this.segments.segments(0L, 200L, false);
        Assert.assertEquals((long)3L, (long)segments.size());
        Assert.assertEquals((long)2L, (long)((LogicalKeyValueSegment)segments.get(0)).id());
        Assert.assertEquals((long)1L, (long)((LogicalKeyValueSegment)segments.get(1)).id());
        Assert.assertEquals((long)0L, (long)((LogicalKeyValueSegment)segments.get(2)).id());
    }

    @Test
    public void shouldClearSegmentsOnClose() {
        LogicalKeyValueSegment segment = (LogicalKeyValueSegment)this.segments.getOrCreateSegmentIfLive(0L, (ProcessorContext)this.context, 0L);
        LogicalKeyValueSegment reservedSegment = this.segments.createReservedSegment(-1L, "reserved");
        segment.put(new Bytes("k".getBytes()), "v".getBytes());
        reservedSegment.put(new Bytes("k".getBytes()), "v".getBytes());
        KeyValueIterator all1 = segment.all();
        KeyValueIterator all2 = reservedSegment.all();
        Assert.assertTrue((boolean)all1.hasNext());
        Assert.assertTrue((boolean)all2.hasNext());
        this.segments.close();
        MatcherAssert.assertThat((Object)((LogicalKeyValueSegment)this.segments.getSegmentForTimestamp(0L)), (Matcher)CoreMatchers.is((Matcher)CoreMatchers.nullValue()));
        MatcherAssert.assertThat((Object)this.segments.getReservedSegment(-1L), (Matcher)CoreMatchers.is((Matcher)CoreMatchers.nullValue()));
        Assert.assertThrows(InvalidStateStoreException.class, () -> all1.hasNext());
        Assert.assertThrows(InvalidStateStoreException.class, () -> all2.hasNext());
    }

    private long updateStreamTimeAndCreateSegment(int segment) {
        long streamTime = 100L * (long)segment;
        this.segments.getOrCreateSegmentIfLive((long)segment, (ProcessorContext)this.context, streamTime);
        return streamTime;
    }
}

