package org.apache.kafka.streams.state.internals;

import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.internals.RecordHeaders;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.StateStoreContext;
import org.apache.kafka.streams.processor.internals.ChangelogRecordDeserializationHelper;
import org.apache.kafka.streams.processor.internals.MockStreamsMetrics;
import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
import org.apache.kafka.streams.query.Position;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.test.InternalMockProcessorContext;
import org.apache.kafka.test.MockRecordCollector;
import org.apache.kafka.test.TestUtils;
import org.hamcrest.CoreMatchers;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mockito;
import org.mockito.junit.MockitoJUnitRunner;

/**
 * Unit tests for {@link ChangeLoggingKeyValueBytesStore}.
 *
 * <p>Each test wraps an {@link InMemoryKeyValueStore} in the store under test and inspects both
 * the wrapped store and the records captured by a {@link MockRecordCollector} to check that
 * writes reach the inner store and are sent to the collector.
 */
@RunWith(MockitoJUnitRunner.StrictStubs.class)
public class ChangeLoggingKeyValueBytesStoreTest {
    private static final String INPUT_TOPIC_NAME = "input-topic";
    private static final Integer INPUT_PARTITION = 0;
    private static final Long INPUT_OFFSET = 100L;

    private final MockRecordCollector collector = new MockRecordCollector();
    private final InMemoryKeyValueStore inner = new InMemoryKeyValueStore("kv");
    private final ChangeLoggingKeyValueBytesStore store = new ChangeLoggingKeyValueBytesStore(inner);
    private final StreamsConfig streamsConfig = streamsConfigMock();
    // Explicit charset so the fixture bytes do not depend on the platform default.
    private final Bytes hi = Bytes.wrap("hi".getBytes(StandardCharsets.UTF_8));
    private final Bytes hello = Bytes.wrap("hello".getBytes(StandardCharsets.UTF_8));
    private final byte[] there = "there".getBytes(StandardCharsets.UTF_8);
    private final byte[] world = "world".getBytes(StandardCharsets.UTF_8);

    private InternalMockProcessorContext context;

    /** Creates a fresh context at time 0 and initialises the store under test with it. */
    @Before
    public void before() {
        context = mockContext();
        context.setTime(0L);
        // The mock context satisfies both init overloads; the cast selects the non-deprecated one.
        store.init((StateStoreContext) context, store);
    }

    /**
     * Builds a processor context backed by a temp directory whose record-collector supplier
     * always hands out {@link #collector}, so tests can inspect what the store sent.
     */
    private InternalMockProcessorContext mockContext() {
        return new InternalMockProcessorContext(
            TestUtils.tempDirectory(),
            Serdes.String(),
            Serdes.Long(),
            new StreamsMetricsImpl(new Metrics(), "mock", "latest", new MockTime()),
            streamsConfig,
            () -> collector,
            new ThreadCache(new LogContext("testCache "), 0L, new MockStreamsMetrics(new Metrics())),
            Time.SYSTEM
        );
    }

    /** Closes the store under test after every test. */
    @After
    public void after() {
        store.close();
    }

    /** The deprecated {@code init(ProcessorContext, StateStore)} overload must reach the inner store. */
    @SuppressWarnings("deprecation")
    @Test
    public void shouldDelegateDeprecatedInit() {
        final InternalMockProcessorContext mockContext = mockContext();
        final KeyValueStore<Bytes, byte[]> innerMock = Mockito.mock(InMemoryKeyValueStore.class);
        final ChangeLoggingKeyValueBytesStore outer = new ChangeLoggingKeyValueBytesStore(innerMock);
        // Cast pins the deprecated ProcessorContext overload; without it the call is ambiguous.
        outer.init((ProcessorContext) mockContext, outer);
        Mockito.verify(innerMock).init((ProcessorContext) mockContext, outer);
    }

    /** The {@code init(StateStoreContext, StateStore)} overload must reach the inner store. */
    @Test
    public void shouldDelegateInit() {
        final InternalMockProcessorContext mockContext = mockContext();
        final KeyValueStore<Bytes, byte[]> innerMock = Mockito.mock(InMemoryKeyValueStore.class);
        final ChangeLoggingKeyValueBytesStore outer = new ChangeLoggingKeyValueBytesStore(innerMock);
        // Cast pins the StateStoreContext overload; without it the call is ambiguous.
        outer.init((StateStoreContext) mockContext, outer);
        Mockito.verify(innerMock).init((StateStoreContext) mockContext, outer);
    }

    /** A put must land in the inner store and produce exactly one collected record. */
    @Test
    public void shouldWriteKeyValueBytesToInnerStoreOnPut() {
        store.put(hi, there);

        MatcherAssert.assertThat(inner.get(hi), CoreMatchers.equalTo(there));
        MatcherAssert.assertThat(collector.collected().size(), CoreMatchers.equalTo(1));
        MatcherAssert.assertThat(collector.collected().get(0).key(), CoreMatchers.equalTo(hi));
        MatcherAssert.assertThat(collector.collected().get(0).value(), CoreMatchers.equalTo(there));
    }

    /** putAll must write every pair to the inner store and collect one record per pair, in order. */
    @Test
    public void shouldWriteAllKeyValueToInnerStoreOnPutAll() {
        store.putAll(Arrays.asList(KeyValue.pair(hi, there), KeyValue.pair(hello, world)));

        MatcherAssert.assertThat(inner.get(hi), CoreMatchers.equalTo(there));
        MatcherAssert.assertThat(inner.get(hello), CoreMatchers.equalTo(world));
        MatcherAssert.assertThat(collector.collected().size(), CoreMatchers.equalTo(2));
        MatcherAssert.assertThat(collector.collected().get(0).key(), CoreMatchers.equalTo(hi));
        MatcherAssert.assertThat(collector.collected().get(0).value(), CoreMatchers.equalTo(there));
        MatcherAssert.assertThat(collector.collected().get(1).key(), CoreMatchers.equalTo(hello));
        MatcherAssert.assertThat(collector.collected().get(1).value(), CoreMatchers.equalTo(world));
    }

    /** A delete must remove the entry from the inner store. */
    @Test
    public void shouldPropagateDelete() {
        store.put(hi, there);
        store.delete(hi);

        MatcherAssert.assertThat(inner.approximateNumEntries(), CoreMatchers.equalTo(0L));
        MatcherAssert.assertThat(inner.get(hi), CoreMatchers.nullValue());
    }

    /** delete must return the value that was stored for the key. */
    @Test
    public void shouldReturnOldValueOnDelete() {
        store.put(hi, there);

        MatcherAssert.assertThat(store.delete(hi), CoreMatchers.equalTo(there));
    }

    /** A delete must be collected as a second record with the same key and a null value. */
    @Test
    public void shouldLogKeyNullOnDelete() {
        store.put(hi, there);

        MatcherAssert.assertThat(store.delete(hi), CoreMatchers.equalTo(there));
        MatcherAssert.assertThat(collector.collected().size(), CoreMatchers.equalTo(2));
        MatcherAssert.assertThat(collector.collected().get(0).key(), CoreMatchers.equalTo(hi));
        MatcherAssert.assertThat(collector.collected().get(0).value(), CoreMatchers.equalTo(there));
        MatcherAssert.assertThat(collector.collected().get(1).key(), CoreMatchers.equalTo(hi));
        MatcherAssert.assertThat(collector.collected().get(1).value(), CoreMatchers.nullValue());
    }

    /** putIfAbsent on a missing key must write to the inner store. */
    @Test
    public void shouldWriteToInnerOnPutIfAbsentNoPreviousValue() {
        store.putIfAbsent(hi, there);

        MatcherAssert.assertThat(inner.get(hi), CoreMatchers.equalTo(there));
    }

    /** putIfAbsent on an existing key must leave the inner store's value untouched. */
    @Test
    public void shouldNotWriteToInnerOnPutIfAbsentWhenValueForKeyExists() {
        store.put(hi, there);
        store.putIfAbsent(hi, world);

        MatcherAssert.assertThat(inner.get(hi), CoreMatchers.equalTo(there));
    }

    /** putIfAbsent on a missing key must produce one collected record. */
    @Test
    public void shouldWriteToChangelogOnPutIfAbsentWhenNoPreviousValue() {
        store.putIfAbsent(hi, there);

        MatcherAssert.assertThat(collector.collected().size(), CoreMatchers.equalTo(1));
        MatcherAssert.assertThat(collector.collected().get(0).key(), CoreMatchers.equalTo(hi));
        MatcherAssert.assertThat(collector.collected().get(0).value(), CoreMatchers.equalTo(there));
    }

    /** putIfAbsent on an existing key must not produce an additional collected record. */
    @Test
    public void shouldNotWriteToChangeLogOnPutIfAbsentWhenValueForKeyExists() {
        store.put(hi, there);
        store.putIfAbsent(hi, world);

        // Only the initial put is expected in the collector.
        MatcherAssert.assertThat(collector.collected().size(), CoreMatchers.equalTo(1));
        MatcherAssert.assertThat(collector.collected().get(0).key(), CoreMatchers.equalTo(hi));
        MatcherAssert.assertThat(collector.collected().get(0).value(), CoreMatchers.equalTo(there));
    }

    /** putIfAbsent on an existing key must return the value already stored. */
    @Test
    public void shouldReturnCurrentValueOnPutIfAbsent() {
        store.put(hi, there);

        MatcherAssert.assertThat(store.putIfAbsent(hi, world), CoreMatchers.equalTo(there));
    }

    /** putIfAbsent on a missing key must return null. */
    @Test
    public void shouldReturnNullOnPutIfAbsentWhenNoPreviousValue() {
        MatcherAssert.assertThat(store.putIfAbsent(hi, there), CoreMatchers.is(CoreMatchers.nullValue()));
    }

    /** get must return the stored value for an existing key. */
    @Test
    public void shouldReturnValueOnGetWhenExists() {
        store.put(hello, world);

        MatcherAssert.assertThat(store.get(hello), CoreMatchers.equalTo(world));
    }

    /** prefixScan must return only the entry whose key matches the prefix. */
    @Test
    public void shouldGetRecordsWithPrefixKey() {
        store.put(hi, there);
        // Second entry keyed by the byte-wise increment of "hi"; the assertions expect it to be excluded.
        store.put(Bytes.increment(hi), world);

        final List<Bytes> keys = new ArrayList<>();
        final List<Bytes> values = new ArrayList<>();
        int numberOfKeysReturned = 0;

        // try-with-resources guarantees the iterator is closed even if an assertion-free step throws.
        try (KeyValueIterator<Bytes, byte[]> keysWithPrefix = store.prefixScan(hi.toString(), new StringSerializer())) {
            while (keysWithPrefix.hasNext()) {
                final KeyValue<Bytes, byte[]> next = keysWithPrefix.next();
                keys.add(next.key);
                // Wrap values so list equality compares contents rather than array identity.
                values.add(Bytes.wrap(next.value));
                numberOfKeysReturned++;
            }
        }

        MatcherAssert.assertThat(numberOfKeysReturned, CoreMatchers.is(1));
        MatcherAssert.assertThat(keys, CoreMatchers.is(Collections.singletonList(hi)));
        MatcherAssert.assertThat(values, CoreMatchers.is(Collections.singletonList(Bytes.wrap(there))));
    }

    /** get must return null for a key that was never written. */
    @Test
    public void shouldReturnNullOnGetWhenDoesntExist() {
        MatcherAssert.assertThat(store.get(hello), CoreMatchers.is(CoreMatchers.nullValue()));
    }

    /**
     * A put made while a record context is set must be collected with a "v" header equal to
     * {@code CHANGELOG_VERSION_HEADER_RECORD_CONSISTENCY} and a "c" header that deserialises to a
     * {@link Position} containing the input topic, partition and offset.
     */
    @Test
    public void shouldLogPositionOnPut() {
        context.setRecordContext(
            new ProcessorRecordContext(-1L, INPUT_OFFSET, INPUT_PARTITION, INPUT_TOPIC_NAME, new RecordHeaders()));
        context.setTime(1L);
        store.put(hi, there);

        MatcherAssert.assertThat(collector.collected().size(), CoreMatchers.equalTo(1));
        MatcherAssert.assertThat(collector.collected().get(0).headers(), CoreMatchers.is(CoreMatchers.notNullValue()));

        // NOTE(review): "v" and "c" look like inlined header-key constants from
        // ChangelogRecordDeserializationHelper - confirm and reference the constants if so.
        final Header versionHeader = collector.collected().get(0).headers().lastHeader("v");
        MatcherAssert.assertThat(versionHeader, CoreMatchers.is(CoreMatchers.notNullValue()));
        MatcherAssert.assertThat(
            versionHeader.equals(ChangelogRecordDeserializationHelper.CHANGELOG_VERSION_HEADER_RECORD_CONSISTENCY),
            CoreMatchers.is(true));

        final Header positionHeader = collector.collected().get(0).headers().lastHeader("c");
        MatcherAssert.assertThat(positionHeader, CoreMatchers.is(CoreMatchers.notNullValue()));

        final Position position = PositionSerde.deserialize(ByteBuffer.wrap(positionHeader.value()));
        MatcherAssert.assertThat(position.getPartitionPositions(INPUT_TOPIC_NAME), CoreMatchers.is(CoreMatchers.notNullValue()));
        MatcherAssert.assertThat(position.getPartitionPositions(INPUT_TOPIC_NAME), Matchers.hasEntry(0, 100L));
    }

    /**
     * Builds a mocked {@link StreamsConfig} whose {@code originals()} carries the
     * {@code __iq.consistency.offset.vector.enabled__} flag set to {@code true}, whose
     * {@code values()} is empty, and whose application id is {@code "add-id"}.
     */
    private StreamsConfig streamsConfigMock() {
        final StreamsConfig config = Mockito.mock(StreamsConfig.class);

        final Map<String, Object> originals = new HashMap<>();
        // NOTE(review): presumably this internal flag is what makes the store emit the position
        // header checked in shouldLogPositionOnPut - confirm against the store implementation.
        originals.put("__iq.consistency.offset.vector.enabled__", true);

        Mockito.when(config.originals()).thenReturn(originals);
        Mockito.when(config.values()).thenReturn(Collections.emptyMap());
        Mockito.when(config.getString(StreamsConfig.APPLICATION_ID_CONFIG)).thenReturn("add-id");
        return config;
    }
}
