package org.apache.kafka.streams.state.internals;

import java.time.Duration;
import java.util.LinkedList;
import java.util.Map;
import org.apache.kafka.common.header.internals.RecordHeaders;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
import org.apache.kafka.streams.query.Position;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.StateSerdes;
import org.apache.kafka.streams.state.Stores;
import org.apache.kafka.streams.state.WindowStore;
import org.apache.kafka.streams.state.WindowStoreIterator;
import org.junit.Assert;
import org.junit.Test;

/* loaded from: input_file:org/apache/kafka/streams/state/internals/InMemoryWindowStoreTest.class */
/**
 * Tests for the in-memory window store produced by
 * {@link Stores#inMemoryWindowStore(String, Duration, Duration, boolean)}.
 *
 * <p>The {@code windowStore} and {@code context} fixtures, as well as the {@code windowedPair}
 * helper, are inherited from {@link AbstractWindowBytesStoreTest}.
 *
 * <p>Every store iterator opened by these tests is closed via try-with-resources so that a
 * failing assertion cannot leak it.
 */
public class InMemoryWindowStoreTest extends AbstractWindowBytesStoreTest {
    private static final String STORE_NAME = "InMemoryWindowStore";

    /**
     * Builds the in-memory window store under test.
     *
     * @param retentionPeriod  retention period in milliseconds
     * @param windowSize       window size in milliseconds
     * @param retainDuplicates whether the store keeps duplicate keys
     * @param keySerde         serde for record keys
     * @param valueSerde       serde for record values
     * @return a freshly built window store named {@code "InMemoryWindowStore"}
     */
    @Override
    <K, V> WindowStore<K, V> buildWindowStore(long retentionPeriod,
                                              long windowSize,
                                              boolean retainDuplicates,
                                              Serde<K> keySerde,
                                              Serde<V> valueSerde) {
        return Stores.windowStoreBuilder(
                Stores.inMemoryWindowStore(
                    STORE_NAME,
                    Duration.ofMillis(retentionPeriod),
                    Duration.ofMillis(windowSize),
                    retainDuplicates),
                keySerde,
                valueSerde)
            .build();
    }

    /**
     * Restores three serialized records into an empty store through the test context and checks
     * that {@code fetchAll} over [0, 6] returns exactly those records, in timestamp order.
     */
    @Test
    public void shouldRestore() {
        // The store must start out empty.
        try (KeyValueIterator<?, ?> initial = this.windowStore.all()) {
            Assert.assertFalse(initial.hasNext());
        }

        StateSerdes<Integer, String> serdes = new StateSerdes<>("", Serdes.Integer(), Serdes.String());
        LinkedList<KeyValue<byte[], byte[]>> changeLog = new LinkedList<>();
        changeLog.add(new KeyValue<>(
            WindowKeySchema.toStoreKeyBinary(1, 0L, 0, serdes).get(), serdes.rawValue("one")));
        changeLog.add(new KeyValue<>(
            WindowKeySchema.toStoreKeyBinary(2, 3L, 0, serdes).get(), serdes.rawValue("two")));
        changeLog.add(new KeyValue<>(
            WindowKeySchema.toStoreKeyBinary(3, 6L, 0, serdes).get(), serdes.rawValue("three")));

        this.context.restore(STORE_NAME, changeLog);

        try (KeyValueIterator<?, ?> restored = this.windowStore.fetchAll(0L, 6L)) {
            Assert.assertEquals(windowedPair(1, "one", 0L), restored.next());
            Assert.assertEquals(windowedPair(2, "two", 3L), restored.next());
            Assert.assertEquals(windowedPair(3, "three", 6L), restored.next());
            Assert.assertFalse(restored.hasNext());
        }
    }

    /**
     * Opens two per-key iterators, then writes a record with a much later timestamp, and checks
     * that the already-open iterators still return all four earlier records. Once they are
     * closed, a fresh fetch over the same range is expected to be empty.
     */
    @Test
    public void shouldNotExpireFromOpenIterator() {
        this.windowStore.put(1, "one", 0L);
        this.windowStore.put(1, "two", 10L);
        this.windowStore.put(2, "one", 5L);
        this.windowStore.put(2, "two", 15L);

        try (WindowStoreIterator<?> keyOne = this.windowStore.fetch(1, 0L, 50L);
             WindowStoreIterator<?> keyTwo = this.windowStore.fetch(2, 0L, 50L)) {

            // Written while both iterators are open; the iterators must be unaffected.
            // NOTE(review): 240000 is presumably twice the fixture's retention period - confirm
            // against AbstractWindowBytesStoreTest.
            this.windowStore.put(1, "four", 240000L);

            // The two iterators are advanced interleaved on purpose.
            Assert.assertEquals(new KeyValue<>(0L, "one"), keyOne.next());
            Assert.assertEquals(new KeyValue<>(5L, "one"), keyTwo.next());
            Assert.assertEquals(new KeyValue<>(15L, "two"), keyTwo.next());
            Assert.assertEquals(new KeyValue<>(10L, "two"), keyOne.next());

            Assert.assertFalse(keyOne.hasNext());
            Assert.assertFalse(keyTwo.hasNext());
        }

        // With no iterator open any more, the early records must no longer be visible.
        try (WindowStoreIterator<?> afterClose = this.windowStore.fetch(1, 0L, 50L)) {
            Assert.assertFalse(afterClose.hasNext());
        }
    }

    /**
     * Writes six records for key 1 at 30-second intervals and checks which of them each
     * {@code fetchAll} sees:
     * <ul>
     *   <li>an iterator opened after the fifth put but read after the sixth returns records
     *       two to five;</li>
     *   <li>an iterator opened after the sixth put returns records three to six.</li>
     * </ul>
     */
    @Test
    public void testExpiration() {
        final long step = 30_000L;
        final long oneTime = 0L;
        final long twoTime = oneTime + step;
        final long threeTime = twoTime + step;
        final long fourTime = threeTime + step;
        final long fiveTime = fourTime + step;
        final long sixTime = fiveTime + step;

        this.windowStore.put(1, "one", oneTime);
        this.windowStore.put(1, "two", twoTime);
        this.windowStore.put(1, "three", threeTime);
        this.windowStore.put(1, "four", fourTime);
        this.windowStore.put(1, "five", fiveTime);

        try (KeyValueIterator<?, ?> beforeSixthPut = this.windowStore.fetchAll(0L, fiveTime)) {
            // The sixth record is written while the iterator above is still open.
            this.windowStore.put(1, "six", sixTime);

            // "one" is not expected (already gone when the iterator was opened), nor is "six"
            // (written after the iterator was opened).
            Assert.assertEquals(windowedPair(1, "two", twoTime), beforeSixthPut.next());
            Assert.assertEquals(windowedPair(1, "three", threeTime), beforeSixthPut.next());
            Assert.assertEquals(windowedPair(1, "four", fourTime), beforeSixthPut.next());
            Assert.assertEquals(windowedPair(1, "five", fiveTime), beforeSixthPut.next());
            Assert.assertFalse(beforeSixthPut.hasNext());
        }

        try (KeyValueIterator<?, ?> afterSixthPut = this.windowStore.fetchAll(0L, sixTime)) {
            // A fetch issued after the sixth put no longer sees "two" but does see "six".
            Assert.assertEquals(windowedPair(1, "three", threeTime), afterSixthPut.next());
            Assert.assertEquals(windowedPair(1, "four", fourTime), afterSixthPut.next());
            Assert.assertEquals(windowedPair(1, "five", fiveTime), afterSixthPut.next());
            Assert.assertEquals(windowedPair(1, "six", sixTime), afterSixthPut.next());
            Assert.assertFalse(afterSixthPut.hasNext());
        }
    }

    /**
     * Performs four puts, each under a new record context, and checks that the innermost store's
     * position equals {@code {"": {0: 4}}}.
     */
    @Test
    public void shouldMatchPositionAfterPut() {
        // NOTE(review): relies on the declared type of the inherited windowStore field exposing
        // wrapped(); if it is declared as plain WindowStore, explicit casts are needed here.
        InMemoryWindowStore innermost = this.windowStore.wrapped().wrapped();

        final long timestamp = 60_000L;
        final int putCount = 4;
        for (int key = 0; key < putCount; key++) {
            // Only the second context argument varies (1, 2, 3, 4) - presumably the record
            // offset, since the expected position below ends at 4. The topic is "" and the
            // third argument (presumably the partition) is 0 throughout.
            this.context.setRecordContext(
                new ProcessorRecordContext(0L, key + 1L, 0, "", new RecordHeaders()));
            this.windowStore.put(key, String.valueOf(key), timestamp);
        }

        Position expected = Position.fromMap(
            Utils.mkMap(Utils.mkEntry("", Utils.mkMap(Utils.mkEntry(0, 4L)))));
        Assert.assertEquals(expected, innermost.getPosition());
    }
}
