/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams;

import java.net.InetSocketAddress;
import java.time.Duration;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiConsumer;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.ListOffsetsResult;
import org.apache.kafka.clients.admin.MockAdminClient;
import org.apache.kafka.clients.consumer.ConsumerGroupMetadata;
import org.apache.kafka.clients.consumer.MockConsumer;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;
import org.apache.kafka.clients.producer.MockProducer;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.internals.KafkaFutureImpl;
import org.apache.kafka.common.metrics.Gauge;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.MetricsReporter;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.utils.LogCaptureAppender;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.streams.KafkaClientSupplier;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyQueryMetadata;
import org.apache.kafka.streams.StoreQueryParameters;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.errors.StreamsNotStartedException;
import org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler;
import org.apache.kafka.streams.errors.TopologyException;
import org.apache.kafka.streams.errors.UnknownStateStoreException;
import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
import org.apache.kafka.streams.internals.StreamsConfigUtils;
import org.apache.kafka.streams.internals.metrics.ClientMetrics;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.processor.StateRestoreListener;
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.processor.internals.GlobalStreamThread;
import org.apache.kafka.streams.processor.internals.StateDirectory;
import org.apache.kafka.streams.processor.internals.StreamThread;
import org.apache.kafka.streams.processor.internals.StreamsMetadataState;
import org.apache.kafka.streams.processor.internals.ThreadMetadataImpl;
import org.apache.kafka.streams.processor.internals.ThreadStateTransitionValidator;
import org.apache.kafka.streams.processor.internals.TopologyMetadata;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.QueryableStoreType;
import org.apache.kafka.streams.state.QueryableStoreTypes;
import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.Stores;
import org.apache.kafka.streams.state.internals.metrics.RocksDBMetricsRecordingTrigger;
import org.apache.kafka.test.MockClientSupplier;
import org.apache.kafka.test.MockMetricsReporter;
import org.apache.kafka.test.MockProcessorSupplier;
import org.apache.kafka.test.TestUtils;
import org.hamcrest.CoreMatchers;
import org.hamcrest.Matcher;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TestName;
import org.junit.rules.Timeout;
import org.junit.runner.RunWith;
import org.mockito.ArgumentCaptor;
import org.mockito.ArgumentMatchers;
import org.mockito.Captor;
import org.mockito.Mock;
import org.mockito.MockSettings;
import org.mockito.MockedConstruction;
import org.mockito.MockedStatic;
import org.mockito.Mockito;
import org.mockito.junit.MockitoJUnitRunner;
import org.mockito.verification.VerificationMode;

@RunWith(value=MockitoJUnitRunner.StrictStubs.class)
public class KafkaStreamsTest {
    // JUnit rule configured with a 600-second timeout.
    @Rule
    public Timeout globalTimeout = Timeout.seconds((long)600L);
    // Number of stream threads configured via "num.stream.threads" in before().
    private static final int NUM_THREADS = 2;
    // Values written to "application.id" and "client.id" in before().
    private static final String APPLICATION_ID = "appId";
    private static final String CLIENT_ID = "test-client";
    private static final Duration DEFAULT_DURATION = Duration.ofSeconds(30L);
    @Rule
    public TestName testName = new TestName();
    // Per-test fixtures; all of them are (re)created in before().
    private MockClientSupplier supplier;
    private MockTime time;
    private Properties props;
    private MockAdminClient adminClient;
    private StateListenerStub streamsStateListener;
    // Returned by the stubbed StreamThread.create(...) on its first call (see prepareStreams()).
    @Mock
    private StreamThread streamThreadOne;
    // Returned by the stubbed StreamThread.create(...) on its second call (see prepareStreams()).
    @Mock
    private StreamThread streamThreadTwo;
    // Captures the listener handed to setStateListener(...) on the mocked stream threads and
    // on the mocked global stream thread; the stubbed start()/shutdown() answers replay state
    // transitions through the captured value.
    @Captor
    private ArgumentCaptor<StreamThread.StateListener> threadStateListenerCapture;
    // Static and construction mocks opened in prepareStreams() and closed in tearDown().
    private MockedStatic<ClientMetrics> clientMetricsMockedStatic;
    private MockedStatic<StreamThread> streamThreadMockedStatic;
    private MockedStatic<StreamsConfigUtils> streamsConfigUtils;
    private MockedConstruction<GlobalStreamThread> globalStreamThreadMockedConstruction;
    private MockedConstruction<Metrics> metricsMockedConstruction;

    /**
     * Creates the per-test fixture: mock time, mock admin client, a mock client supplier
     * bootstrapped against a single fake broker address, the state-listener stub and the
     * streams configuration. Finishes by installing all Mockito stubs via
     * {@link #prepareStreams()}.
     */
    @Before
    public void before() throws Exception {
        time = new MockTime();
        adminClient = new MockAdminClient();

        supplier = new MockClientSupplier();
        final InetSocketAddress bootstrapAddress = new InetSocketAddress("localhost", 9999);
        supplier.setCluster(Cluster.bootstrap(Collections.singletonList(bootstrapAddress)));

        streamsStateListener = new StateListenerStub();

        final Properties config = new Properties();
        config.put("application.id", APPLICATION_ID);
        config.put("client.id", CLIENT_ID);
        config.put("bootstrap.servers", "localhost:2018");
        config.put("metric.reporters", MockMetricsReporter.class.getName());
        config.put("state.dir", TestUtils.tempDirectory().getPath());
        config.put("num.stream.threads", NUM_THREADS);
        props = config;

        prepareStreams();
    }

    /**
     * Releases everything opened for the test: the static mocks, the construction mocks and
     * the mock admin client. Each field is null-checked before it is closed.
     */
    @After
    public void tearDown() {
        // Static mock of ClientMetrics.
        if (clientMetricsMockedStatic != null) {
            clientMetricsMockedStatic.close();
        }
        // Static mock of StreamThread (the create(...) factory).
        if (streamThreadMockedStatic != null) {
            streamThreadMockedStatic.close();
        }
        // Construction mock of GlobalStreamThread.
        if (globalStreamThreadMockedConstruction != null) {
            globalStreamThreadMockedConstruction.close();
        }
        // Construction mock of Metrics.
        if (metricsMockedConstruction != null) {
            metricsMockedConstruction.close();
        }
        // Static mock of StreamsConfigUtils.
        if (streamsConfigUtils != null) {
            streamsConfigUtils.close();
        }
        if (adminClient != null) {
            adminClient.close();
        }
    }

    /**
     * Installs every Mockito stub the tests rely on:
     * <ul>
     *   <li>a construction mock for {@link Metrics} that initialises the configured reporters on
     *       construction and closes them when the mock is closed,</li>
     *   <li>a static mock for {@link ClientMetrics} with fixed version and commit id,</li>
     *   <li>a static mock for {@link StreamThread} whose {@code create(...)} factory returns
     *       {@code streamThreadOne} and then {@code streamThreadTwo},</li>
     *   <li>a static mock for {@link StreamsConfigUtils} with fixed processing mode, EOS flag and
     *       cache size,</li>
     *   <li>a construction mock for {@link GlobalStreamThread} whose {@code start()} and
     *       {@code shutdown()} replay state transitions through the captured state listener.</li>
     * </ul>
     * The mocks opened here are closed in {@link #tearDown()}.
     */
    private void prepareStreams() throws Exception {
        metricsMockedConstruction = Mockito.mockConstruction(Metrics.class, (mock, context) -> {
            Assert.assertEquals(4, context.arguments().size());
            // The second constructor argument is read as the list of metrics reporters.
            @SuppressWarnings("unchecked")
            final List<MetricsReporter> reporters = (List<MetricsReporter>) context.arguments().get(1);
            for (final MetricsReporter reporter : reporters) {
                reporter.init(Collections.emptyList());
            }
            // Closing the mocked Metrics closes the reporters, like the code above initialised them.
            Mockito.doAnswer(invocation -> {
                for (final MetricsReporter reporter : reporters) {
                    reporter.close();
                }
                return null;
            }).when(mock).close();
        });

        clientMetricsMockedStatic = Mockito.mockStatic(ClientMetrics.class);
        clientMetricsMockedStatic.when(ClientMetrics::version).thenReturn("1.56");
        clientMetricsMockedStatic.when(ClientMetrics::commitId).thenReturn("1a2b3c4d5e");
        // NOTE(review): these calls hit the static mock directly, outside when()/verify();
        // they look like leftovers from an earlier mocking style - confirm before removing.
        ClientMetrics.addVersionMetric(ArgumentMatchers.any(StreamsMetricsImpl.class));
        ClientMetrics.addCommitIdMetric(ArgumentMatchers.any(StreamsMetricsImpl.class));
        ClientMetrics.addApplicationIdMetric(ArgumentMatchers.any(StreamsMetricsImpl.class), Mockito.eq(APPLICATION_ID));
        ClientMetrics.addTopologyDescriptionMetric(ArgumentMatchers.any(StreamsMetricsImpl.class), ArgumentMatchers.any());
        ClientMetrics.addStateMetric(ArgumentMatchers.any(StreamsMetricsImpl.class), ArgumentMatchers.any());
        ClientMetrics.addNumAliveStreamThreadMetric(ArgumentMatchers.any(StreamsMetricsImpl.class), ArgumentMatchers.any());

        // The StreamThread factory hands out the two class-level mocks, in order.
        streamThreadMockedStatic = Mockito.mockStatic(StreamThread.class);
        streamThreadMockedStatic.when(() -> StreamThread.create(
            ArgumentMatchers.any(TopologyMetadata.class),
            ArgumentMatchers.any(StreamsConfig.class),
            ArgumentMatchers.any(KafkaClientSupplier.class),
            ArgumentMatchers.any(Admin.class),
            ArgumentMatchers.any(UUID.class),
            ArgumentMatchers.any(String.class),
            ArgumentMatchers.any(StreamsMetricsImpl.class),
            ArgumentMatchers.any(Time.class),
            ArgumentMatchers.any(StreamsMetadataState.class),
            Mockito.anyLong(),
            ArgumentMatchers.any(StateDirectory.class),
            ArgumentMatchers.any(StateRestoreListener.class),
            Mockito.anyInt(),
            ArgumentMatchers.any(Runnable.class),
            ArgumentMatchers.any()
        )).thenReturn(streamThreadOne).thenReturn(streamThreadTwo);

        streamsConfigUtils = Mockito.mockStatic(StreamsConfigUtils.class);
        streamsConfigUtils.when(() -> StreamsConfigUtils.processingMode(ArgumentMatchers.any(StreamsConfig.class)))
            .thenReturn(StreamsConfigUtils.ProcessingMode.AT_LEAST_ONCE);
        streamsConfigUtils.when(() -> StreamsConfigUtils.eosEnabled(ArgumentMatchers.any(StreamsConfig.class)))
            .thenReturn(false);
        // 10 * 1024 * 1024 == 0xA00000
        streamsConfigUtils.when(() -> StreamsConfigUtils.getTotalCacheSize(ArgumentMatchers.any(StreamsConfig.class)))
            .thenReturn(10L * 1024L * 1024L);

        Mockito.when(streamThreadOne.getId()).thenReturn(1L);
        Mockito.when(streamThreadTwo.getId()).thenReturn(2L);
        prepareStreamThread(streamThreadOne, 1, true);
        prepareStreamThread(streamThreadTwo, 2, false);

        final AtomicReference<GlobalStreamThread.State> globalThreadState =
            new AtomicReference<>(GlobalStreamThread.State.CREATED);
        globalStreamThreadMockedConstruction = Mockito.mockConstruction(GlobalStreamThread.class, (mock, context) -> {
            Mockito.when(mock.state()).thenAnswer(invocation -> globalThreadState.get());
            Mockito.doNothing().when(mock).setStateListener(threadStateListenerCapture.capture());

            // start(): CREATED -> RUNNING, reported through the captured listener.
            Mockito.doAnswer(invocation -> {
                globalThreadState.set(GlobalStreamThread.State.RUNNING);
                threadStateListenerCapture.getValue().onChange(
                    mock, GlobalStreamThread.State.RUNNING, GlobalStreamThread.State.CREATED);
                return null;
            }).when(mock).start();

            // shutdown(): close the supplier's clients, then RUNNING -> PENDING_SHUTDOWN -> DEAD.
            Mockito.doAnswer(invocation -> {
                supplier.restoreConsumer.close();
                for (final MockProducer<byte[], byte[]> producer : supplier.producers) {
                    producer.close();
                }
                globalThreadState.set(GlobalStreamThread.State.DEAD);
                final StreamThread.StateListener listener = threadStateListenerCapture.getValue();
                listener.onChange(mock, GlobalStreamThread.State.PENDING_SHUTDOWN, GlobalStreamThread.State.RUNNING);
                listener.onChange(mock, GlobalStreamThread.State.DEAD, GlobalStreamThread.State.PENDING_SHUTDOWN);
                return null;
            }).when(mock).shutdown();

            // stillRunning() is a boolean method, so it must be stubbed with a boolean.
            // The value is computed once, here at stubbing time, not on each call.
            Mockito.when(mock.stillRunning())
                .thenReturn(globalThreadState.get() == GlobalStreamThread.State.RUNNING);
        });
    }

    /**
     * Stubs a mocked {@link StreamThread} so that it mimics a thread's life cycle.
     *
     * @param thread     the mock to configure
     * @param threadId   numeric suffix used in the thread name {@code processId-StreamThread-<id>}
     * @param terminable when {@code false}, {@code join()} is stubbed to sleep for two seconds
     */
    private void prepareStreamThread(StreamThread thread, int threadId, boolean terminable) throws Exception {
        final String threadName = "processId-StreamThread-" + threadId;
        final AtomicReference<StreamThread.State> currentState =
            new AtomicReference<>(StreamThread.State.CREATED);

        Mockito.when(thread.state()).thenAnswer(invocation -> currentState.get());
        Mockito.doNothing().when(thread).setStateListener(threadStateListenerCapture.capture());
        Mockito.when(thread.getStateLock()).thenReturn(new Object());

        // start(): walk the captured listener through
        // CREATED -> STARTING -> PARTITIONS_REVOKED -> PARTITIONS_ASSIGNED -> RUNNING.
        Mockito.doAnswer(invocation -> {
            currentState.set(StreamThread.State.STARTING);
            final StreamThread.StateListener listener = threadStateListenerCapture.getValue();
            listener.onChange(thread, StreamThread.State.STARTING, StreamThread.State.CREATED);
            listener.onChange(thread, StreamThread.State.PARTITIONS_REVOKED, StreamThread.State.STARTING);
            listener.onChange(thread, StreamThread.State.PARTITIONS_ASSIGNED, StreamThread.State.PARTITIONS_REVOKED);
            listener.onChange(thread, StreamThread.State.RUNNING, StreamThread.State.PARTITIONS_ASSIGNED);
            return null;
        }).when(thread).start();

        Mockito.when(thread.getGroupInstanceID()).thenReturn(Optional.empty());
        Mockito.when(thread.threadMetadata()).thenReturn(
            new ThreadMetadataImpl(
                threadName,
                "DEAD",
                "",
                "",
                Collections.emptySet(),
                "",
                Collections.emptySet(),
                Collections.emptySet()));
        Mockito.when(thread.waitOnThreadState(Mockito.isA(StreamThread.State.class), Mockito.anyLong()))
            .thenReturn(true);
        Mockito.when(thread.isThreadAlive()).thenReturn(true);
        Mockito.when(thread.getName()).thenReturn(threadName);

        // shutdown(): close the supplier's clients, then report
        // RUNNING -> PENDING_SHUTDOWN -> DEAD through the captured listener.
        Mockito.doAnswer(invocation -> {
            supplier.consumer.close();
            supplier.restoreConsumer.close();
            for (final MockProducer<byte[], byte[]> mockProducer : supplier.producers) {
                mockProducer.close();
            }
            currentState.set(StreamThread.State.DEAD);
            final StreamThread.StateListener listener = threadStateListenerCapture.getValue();
            listener.onChange(thread, StreamThread.State.PENDING_SHUTDOWN, StreamThread.State.RUNNING);
            listener.onChange(thread, StreamThread.State.DEAD, StreamThread.State.PENDING_SHUTDOWN);
            return null;
        }).when(thread).shutdown();

        if (!terminable) {
            // A non-terminable thread makes join() block for a while.
            Mockito.doAnswer(invocation -> {
                Thread.sleep(2000L);
                return null;
            }).when(thread).join();
        }

        Mockito.when(thread.readOnlyActiveTasks()).thenReturn(Collections.emptySet());
        Mockito.when(thread.readyOnlyAllTasks()).thenReturn(Collections.emptySet());
    }

    /** Closing a client that was never started must leave it in NOT_RUNNING. */
    @Test
    public void testShouldTransitToNotRunningIfCloseRightAfterCreated() {
        final Topology topology = getBuilderWithSource().build();
        try (final KafkaStreams streams = new KafkaStreams(topology, props, supplier, time)) {
            streams.close();

            Assert.assertEquals(KafkaStreams.State.NOT_RUNNING, streams.state());
        }
    }

    /**
     * Drives the mocked stream threads through a rebalance in which the last thread dies and
     * checks that the client returns to RUNNING once the surviving thread is RUNNING again.
     * The expected {@code numChanges} values count the client-level state changes seen by
     * {@code streamsStateListener}.
     */
    @Test
    public void stateShouldTransitToRunningIfNonDeadThreadsBackToRunning() throws InterruptedException {
        try (final KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), props, supplier, time)) {
            streams.setStateListener(streamsStateListener);

            Assert.assertEquals(0, streamsStateListener.numChanges);
            Assert.assertEquals(KafkaStreams.State.CREATED, streams.state());

            streams.start();

            // Waited and asserted once; the original repeated this pair verbatim.
            TestUtils.waitForCondition(() -> streamsStateListener.numChanges == 2, "Streams never started.");
            Assert.assertEquals(KafkaStreams.State.RUNNING, streams.state());

            final StreamThread.StateListener threadListener = threadStateListenerCapture.getValue();

            // Every thread has its partitions revoked: the client goes to REBALANCING.
            for (final StreamThread thread : streams.threads) {
                threadListener.onChange(thread, StreamThread.State.PARTITIONS_REVOKED, StreamThread.State.RUNNING);
            }
            Assert.assertEquals(3, streamsStateListener.numChanges);
            Assert.assertEquals(KafkaStreams.State.REBALANCING, streams.state());

            // Partitions assigned: still REBALANCING, no additional client-level change.
            for (final StreamThread thread : streams.threads) {
                threadListener.onChange(thread, StreamThread.State.PARTITIONS_ASSIGNED, StreamThread.State.PARTITIONS_REVOKED);
            }
            Assert.assertEquals(3, streamsStateListener.numChanges);
            Assert.assertEquals(KafkaStreams.State.REBALANCING, streams.state());

            // The last thread dies; the client state is unaffected.
            final StreamThread dyingThread = streams.threads.get(NUM_THREADS - 1);
            threadListener.onChange(dyingThread, StreamThread.State.PENDING_SHUTDOWN, StreamThread.State.PARTITIONS_ASSIGNED);
            threadListener.onChange(dyingThread, StreamThread.State.DEAD, StreamThread.State.PENDING_SHUTDOWN);
            Assert.assertEquals(3, streamsStateListener.numChanges);
            Assert.assertEquals(KafkaStreams.State.REBALANCING, streams.state());

            // Every other thread goes back to RUNNING, and so does the client.
            for (final StreamThread thread : streams.threads) {
                if (thread != dyingThread) {
                    threadListener.onChange(thread, StreamThread.State.RUNNING, StreamThread.State.PARTITIONS_ASSIGNED);
                }
            }
            Assert.assertEquals(4, streamsStateListener.numChanges);
            Assert.assertEquals(KafkaStreams.State.RUNNING, streams.state());

            streams.close();

            TestUtils.waitForCondition(() -> streamsStateListener.numChanges == 6, "Streams never closed.");
            Assert.assertEquals(KafkaStreams.State.NOT_RUNNING, streams.state());
        }
    }

    /**
     * Closing a never-started client (with a global table in the topology) must log no
     * message containing "ERROR" and must close the consumer, restore consumer and producers.
     */
    @Test
    public void shouldCleanupResourcesOnCloseWithoutPreviousStart() throws Exception {
        final StreamsBuilder streamsBuilder = getBuilderWithSource();
        streamsBuilder.globalTable("anyTopic");

        try (final LogCaptureAppender logAppender = LogCaptureAppender.createAndRegister(KafkaStreams.class);
             final KafkaStreams streams = new KafkaStreams(streamsBuilder.build(), props, supplier, time)) {
            streams.close();

            TestUtils.waitForCondition(
                () -> streams.state() == KafkaStreams.State.NOT_RUNNING,
                "Streams never stopped.");
            MatcherAssert.assertThat(
                logAppender.getMessages(),
                CoreMatchers.not(CoreMatchers.hasItem(Matchers.containsString("ERROR"))));
        }

        Assert.assertTrue(supplier.consumer.closed());
        Assert.assertTrue(supplier.restoreConsumer.closed());
        for (final MockProducer<byte[], byte[]> mockProducer : supplier.producers) {
            Assert.assertTrue(mockProducer.closed());
        }
    }

    /**
     * Shuts down every stream thread individually, then closes the client and checks that it
     * reaches NOT_RUNNING and that the global stream thread reference has been cleared.
     */
    @Test
    public void testStateThreadClose() throws Exception {
        final StreamsBuilder builder = getBuilderWithSource();
        builder.globalTable("anyTopic");

        try (final KafkaStreams streams = new KafkaStreams(builder.build(), props, supplier, time)) {
            Assert.assertEquals(NUM_THREADS, streams.threads.size());
            // Expected value first, so a failure message reads the right way round.
            Assert.assertEquals(KafkaStreams.State.CREATED, streams.state());

            streams.start();
            TestUtils.waitForCondition(
                () -> streams.state() == KafkaStreams.State.RUNNING,
                "Streams never started.");

            for (int i = 0; i < NUM_THREADS; i++) {
                final StreamThread tmpThread = streams.threads.get(i);
                tmpThread.shutdown();
                TestUtils.waitForCondition(
                    () -> tmpThread.state() == StreamThread.State.DEAD,
                    "Thread never stopped.");
                streams.threads.get(i).join();
            }
            TestUtils.waitForCondition(
                () -> streams.metadataForLocalThreads().stream().allMatch(t -> t.threadState().equals("DEAD")),
                "Streams never stopped");

            streams.close();

            TestUtils.waitForCondition(
                () -> streams.state() == KafkaStreams.State.NOT_RUNNING,
                "Streams never stopped.");
            Assert.assertNull(streams.globalStreamThread);
        }
    }

    /**
     * Shuts the global stream thread down underneath a running client and checks that the
     * client moves to PENDING_ERROR, ends in ERROR after close(), and logs a message
     * containing "ERROR".
     */
    @Test
    public void testStateGlobalThreadClose() throws Exception {
        final StreamsBuilder streamsBuilder = getBuilderWithSource();
        streamsBuilder.globalTable("anyTopic");

        try (final LogCaptureAppender logAppender = LogCaptureAppender.createAndRegister(KafkaStreams.class);
             final KafkaStreams streams = new KafkaStreams(streamsBuilder.build(), props, supplier, time)) {
            streams.start();
            TestUtils.waitForCondition(
                () -> streams.state() == KafkaStreams.State.RUNNING,
                "Streams never started.");

            final GlobalStreamThread globalThread = streams.globalStreamThread;
            globalThread.shutdown();
            TestUtils.waitForCondition(
                () -> globalThread.state() == GlobalStreamThread.State.DEAD,
                "Thread never stopped.");
            globalThread.join();

            // A dead global thread puts the client into PENDING_ERROR.
            TestUtils.waitForCondition(
                () -> streams.state() == KafkaStreams.State.PENDING_ERROR,
                "Thread never stopped.");

            streams.close();

            TestUtils.waitForCondition(
                () -> streams.state() == KafkaStreams.State.ERROR,
                "Thread never stopped.");
            MatcherAssert.assertThat(
                logAppender.getMessages(),
                CoreMatchers.hasItem(Matchers.containsString("ERROR")));
        }
    }

    /**
     * Checks that constructing the client initialises the configured {@link MockMetricsReporter}
     * exactly once and that closing the client closes it the same number of times.
     */
    @Test
    public void testInitializesAndDestroysMetricsReporters() {
        final int oldInitCount = MockMetricsReporter.INIT_COUNT.get();

        try (final KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), props, supplier, time)) {
            final int newInitCount = MockMetricsReporter.INIT_COUNT.get();
            final int initDiff = newInitCount - oldInitCount;
            Assert.assertEquals(
                "some reporters including MockMetricsReporter should be initialized by calling on construction",
                1,
                initDiff);

            streams.start();
            final int oldCloseCount = MockMetricsReporter.CLOSE_COUNT.get();
            streams.close();

            // Expected value first, so a failure message reads the right way round.
            Assert.assertEquals(KafkaStreams.State.NOT_RUNNING, streams.state());
            Assert.assertEquals(oldCloseCount + initDiff, MockMetricsReporter.CLOSE_COUNT.get());
        }
    }

    /** A second close() must not close the metrics reporters again. */
    @Test
    public void testCloseIsIdempotent() {
        final Topology topology = getBuilderWithSource().build();
        try (final KafkaStreams streams = new KafkaStreams(topology, props, supplier, time)) {
            streams.close();
            final int closeCountAfterFirstClose = MockMetricsReporter.CLOSE_COUNT.get();

            streams.close();

            Assert.assertEquals(
                "subsequent close() calls should do nothing",
                closeCountAfterFirstClose,
                MockMetricsReporter.CLOSE_COUNT.get());
        }
    }

    /** pause() followed by resume() on a started client toggles isPaused() accordingly. */
    @Test
    public void testPauseResume() {
        final Topology topology = getBuilderWithSource().build();
        try (final KafkaStreams streams = new KafkaStreams(topology, props, supplier, time)) {
            streams.start();

            streams.pause();
            Assert.assertTrue(streams.isPaused());

            streams.resume();
            Assert.assertFalse(streams.isPaused());
        }
    }

    /** A client paused before start() is still paused after start() and can then be resumed. */
    @Test
    public void testStartingPaused() {
        final Topology topology = getBuilderWithSource().build();
        try (final KafkaStreams streams = new KafkaStreams(topology, props, supplier, time)) {
            // Pause first, start afterwards.
            streams.pause();
            streams.start();
            Assert.assertTrue(streams.isPaused());

            streams.resume();
            Assert.assertFalse(streams.isPaused());
        }
    }

    /** Calling pause() or resume() twice in a row leaves isPaused() unchanged the second time. */
    @Test
    public void testShowPauseResumeAreIdempotent() {
        final Topology topology = getBuilderWithSource().build();
        try (final KafkaStreams streams = new KafkaStreams(topology, props, supplier, time)) {
            streams.start();

            // Pausing twice keeps the client paused.
            streams.pause();
            Assert.assertTrue(streams.isPaused());
            streams.pause();
            Assert.assertTrue(streams.isPaused());

            // Resuming twice keeps the client resumed.
            streams.resume();
            Assert.assertFalse(streams.isPaused());
            streams.resume();
            Assert.assertFalse(streams.isPaused());
        }
    }

    /**
     * With one configured stream thread and a RUNNING client, {@code addStreamThread()} must
     * return the name of the new thread and grow the thread list by one.
     */
    @Test
    public void shouldAddThreadWhenRunning() throws InterruptedException {
        props.put("num.stream.threads", 1);

        try (final KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), props, supplier, time)) {
            streams.start();
            final int oldSize = streams.threads.size();

            // The bare 15L used previously is a 15 ms budget; state the unit explicitly and
            // allow 15 seconds so a slow machine cannot fail the wait spuriously.
            TestUtils.waitForCondition(
                () -> streams.state() == KafkaStreams.State.RUNNING,
                TimeUnit.SECONDS.toMillis(15L),
                "wait until running");

            MatcherAssert.assertThat(streams.addStreamThread(), Matchers.equalTo(Optional.of("processId-StreamThread-2")));
            MatcherAssert.assertThat(streams.threads.size(), Matchers.equalTo(oldSize + 1));
        }
    }

    /** addStreamThread() on a client that has not been started returns empty and adds nothing. */
    @Test
    public void shouldNotAddThreadWhenCreated() {
        final Topology topology = getBuilderWithSource().build();
        try (final KafkaStreams streams = new KafkaStreams(topology, props, supplier, time)) {
            final int threadCountBefore = streams.threads.size();

            MatcherAssert.assertThat(streams.addStreamThread(), Matchers.equalTo(Optional.empty()));
            MatcherAssert.assertThat(streams.threads.size(), Matchers.equalTo(threadCountBefore));
        }
    }

    /** addStreamThread() on a closed client returns empty and adds nothing. */
    @Test
    public void shouldNotAddThreadWhenClosed() {
        final Topology topology = getBuilderWithSource().build();
        try (final KafkaStreams streams = new KafkaStreams(topology, props, supplier, time)) {
            final int threadCountBefore = streams.threads.size();

            streams.close();

            MatcherAssert.assertThat(streams.addStreamThread(), Matchers.equalTo(Optional.empty()));
            MatcherAssert.assertThat(streams.threads.size(), Matchers.equalTo(threadCountBefore));
        }
    }

    /**
     * After the global stream thread has been shut down underneath a started client,
     * addStreamThread() returns empty and adds nothing.
     */
    @Test
    public void shouldNotAddThreadWhenError() {
        final StreamsBuilder streamsBuilder = getBuilderWithSource();
        streamsBuilder.globalTable("anyTopic");

        try (final KafkaStreams streams = new KafkaStreams(streamsBuilder.build(), props, supplier, time)) {
            final int threadCountBefore = streams.threads.size();

            streams.start();
            streams.globalStreamThread.shutdown();

            MatcherAssert.assertThat(streams.addStreamThread(), Matchers.equalTo(Optional.empty()));
            MatcherAssert.assertThat(streams.threads.size(), Matchers.equalTo(threadCountBefore));
        }
    }

    /**
     * After the first stream thread has been shut down, metadataForLocalThreads() must
     * contain exactly one entry: the metadata of the second thread.
     */
    @Test
    public void shouldNotReturnDeadThreads() {
        final Topology topology = getBuilderWithSource().build();
        try (final KafkaStreams streams = new KafkaStreams(topology, props, supplier, time)) {
            streams.start();
            streamThreadOne.shutdown();

            final Set<?> liveThreadMetadata = streams.metadataForLocalThreads();

            MatcherAssert.assertThat(liveThreadMetadata.size(), Matchers.equalTo(1));
            MatcherAssert.assertThat(
                (Object) liveThreadMetadata,
                (Matcher) CoreMatchers.hasItem((Object) streamThreadTwo.threadMetadata()));
        }
    }

    /**
     * With two stream threads and a RUNNING client, {@code removeStreamThread()} must return
     * the name of the removed thread and shrink the thread list by one.
     */
    @Test
    public void shouldRemoveThread() throws InterruptedException {
        props.put("num.stream.threads", 2);

        try (final KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), props, supplier, time)) {
            streams.start();
            final int oldSize = streams.threads.size();

            // The bare 15L used previously is a 15 ms budget; state the unit explicitly and
            // allow 15 seconds so a slow machine cannot fail the wait spuriously.
            TestUtils.waitForCondition(
                () -> streams.state() == KafkaStreams.State.RUNNING,
                TimeUnit.SECONDS.toMillis(15L),
                "Kafka Streams client did not reach state RUNNING");

            MatcherAssert.assertThat(streams.removeStreamThread(), Matchers.equalTo(Optional.of("processId-StreamThread-1")));
            MatcherAssert.assertThat(streams.threads.size(), Matchers.equalTo(oldSize - 1));
        }
    }

    /** removeStreamThread() on a client that was never started returns empty and removes nothing. */
    @Test
    public void shouldNotRemoveThreadWhenNotRunning() {
        props.put("num.stream.threads", 1);

        final Topology topology = getBuilderWithSource().build();
        try (final KafkaStreams streams = new KafkaStreams(topology, props, supplier, time)) {
            MatcherAssert.assertThat(streams.removeStreamThread(), Matchers.equalTo(Optional.empty()));
            MatcherAssert.assertThat(streams.threads.size(), Matchers.equalTo(1));
        }
    }

    /**
     * Asserts that a client which was started and then closed rejects a second
     * {@code start()} with an {@link IllegalStateException}.
     */
    @Test
    public void testCannotStartOnceClosed() {
        try (KafkaStreams streams = new KafkaStreams(this.getBuilderWithSource().build(), this.props, (KafkaClientSupplier)this.supplier, (Time)this.time)) {
            streams.start();
            streams.close();
            // assertThrows fails the test on its own when nothing is thrown, so the
            // former fail()/empty-catch construct is not needed.
            Assert.assertThrows(IllegalStateException.class, () -> streams.start());
        }
    }

    /**
     * Asserts that {@code setGlobalStateRestoreListener(null)} is rejected with an
     * {@link IllegalStateException} once the client has been started.
     */
    @Test
    public void shouldNotSetGlobalRestoreListenerAfterStarting() {
        try (KafkaStreams streams = new KafkaStreams(this.getBuilderWithSource().build(), this.props, (KafkaClientSupplier)this.supplier, (Time)this.time)) {
            streams.start();
            // assertThrows replaces the fail()/empty-catch construct and matches the
            // style of the neighbouring tests.
            Assert.assertThrows(IllegalStateException.class, () -> streams.setGlobalStateRestoreListener(null));
        }
    }

    /**
     * Asserts that passing a {@code null} {@code StreamsUncaughtExceptionHandler} to
     * {@code setUncaughtExceptionHandler} after {@code start()} raises
     * {@link IllegalStateException}.
     */
    @Test
    public void shouldThrowExceptionSettingUncaughtExceptionHandlerNotInCreateState() {
        Topology topology = this.getBuilderWithSource().build();
        try (KafkaStreams kafkaStreams = new KafkaStreams(topology, this.props, (KafkaClientSupplier)this.supplier, (Time)this.time)) {
            kafkaStreams.start();
            // A typed null selects the StreamsUncaughtExceptionHandler overload.
            StreamsUncaughtExceptionHandler noHandler = null;
            Assert.assertThrows(IllegalStateException.class, () -> kafkaStreams.setUncaughtExceptionHandler(noHandler));
        }
    }

    /**
     * Starts the client and then asserts that
     * {@code setUncaughtExceptionHandler((StreamsUncaughtExceptionHandler) null)}
     * is rejected with an {@link IllegalStateException}.
     */
    @Test
    public void shouldThrowExceptionSettingStreamsUncaughtExceptionHandlerNotInCreateState() {
        Topology topology = this.getBuilderWithSource().build();
        try (KafkaStreams startedStreams = new KafkaStreams(topology, this.props, (KafkaClientSupplier)this.supplier, (Time)this.time)) {
            startedStreams.start();
            StreamsUncaughtExceptionHandler nullHandler = null;
            Assert.assertThrows(IllegalStateException.class, () -> startedStreams.setUncaughtExceptionHandler(nullHandler));
        }
    }

    /**
     * Asserts that, on a client that has not been started, passing a {@code null}
     * {@code StreamsUncaughtExceptionHandler} raises {@link NullPointerException}.
     */
    @Test
    public void shouldThrowNullPointerExceptionSettingStreamsUncaughtExceptionHandlerIfNull() {
        Topology topology = this.getBuilderWithSource().build();
        try (KafkaStreams unstartedStreams = new KafkaStreams(topology, this.props, (KafkaClientSupplier)this.supplier, (Time)this.time)) {
            StreamsUncaughtExceptionHandler nullHandler = null;
            Assert.assertThrows(NullPointerException.class, () -> unstartedStreams.setUncaughtExceptionHandler(nullHandler));
        }
    }

    /**
     * Asserts that {@code setStateListener(null)} is rejected with an
     * {@link IllegalStateException} once the client has been started.
     */
    @Test
    public void shouldThrowExceptionSettingStateListenerNotInCreateState() {
        try (KafkaStreams streams = new KafkaStreams(this.getBuilderWithSource().build(), this.props, (KafkaClientSupplier)this.supplier, (Time)this.time)) {
            streams.start();
            // assertThrows replaces the fail()/empty-catch construct and matches the
            // style of the neighbouring tests.
            Assert.assertThrows(IllegalStateException.class, () -> streams.setStateListener(null));
        }
    }

    /**
     * Calls {@code cleanUp()} once before {@code start()} and once after
     * {@code close()}; the test passes when none of the calls throws.
     */
    @Test
    public void shouldAllowCleanupBeforeStartAndAfterClose() {
        Topology topology = this.getBuilderWithSource().build();
        KafkaStreams kafkaStreams = new KafkaStreams(topology, this.props, (KafkaClientSupplier)this.supplier, (Time)this.time);
        try {
            kafkaStreams.cleanUp();
            kafkaStreams.start();
        }
        finally {
            // The second cleanUp() has to run after close(), hence the explicit
            // finally block instead of try-with-resources.
            kafkaStreams.close();
            kafkaStreams.cleanUp();
        }
    }

    /**
     * Waits for the client to reach {@code RUNNING} and asserts that {@code cleanUp()}
     * then throws an {@link IllegalStateException} with the message
     * {@code "Cannot clean up while running."}.
     *
     * @throws InterruptedException propagated from {@code TestUtils.waitForCondition}
     */
    @Test
    public void shouldThrowOnCleanupWhileRunning() throws InterruptedException {
        try (KafkaStreams streams = new KafkaStreams(this.getBuilderWithSource().build(), this.props, (KafkaClientSupplier)this.supplier, (Time)this.time)) {
            streams.start();
            TestUtils.waitForCondition(() -> streams.state() == KafkaStreams.State.RUNNING, (String)"Streams never started.");
            // assertThrows returns the caught exception, so the message can be checked
            // without a try/fail/catch construct.
            IllegalStateException expected = Assert.assertThrows(IllegalStateException.class, () -> streams.cleanUp());
            Assert.assertEquals((Object)"Cannot clean up while running.", (Object)expected.getMessage());
        }
    }

    /**
     * Starts the client, pauses it, and asserts that {@code cleanUp()} throws an
     * {@link IllegalStateException} with the message {@code "Cannot clean up while running."}.
     *
     * @throws InterruptedException propagated from {@code TestUtils.waitForCondition}
     */
    @Test
    public void shouldThrowOnCleanupWhilePaused() throws InterruptedException {
        try (KafkaStreams streams = new KafkaStreams(this.getBuilderWithSource().build(), this.props, (KafkaClientSupplier)this.supplier, (Time)this.time)) {
            streams.start();
            TestUtils.waitForCondition(() -> streams.state() == KafkaStreams.State.RUNNING, (String)"Streams never started.");
            streams.pause();
            TestUtils.waitForCondition(() -> streams.isPaused(), (String)"Streams did not pause.");
            // The three-argument assertThrows treats its leading String as the failure
            // message only; it never compares it with the exception message. Check the
            // message explicitly on the returned exception instead.
            IllegalStateException thrown = Assert.assertThrows(IllegalStateException.class, () -> streams.cleanUp());
            Assert.assertEquals((Object)"Cannot clean up while running.", (Object)thrown.getMessage());
        }
    }

    /**
     * Closes a running client with {@code Duration.ZERO} and asserts that the client
     * reports {@code PENDING_SHUTDOWN} both before and after a {@code cleanUp()} call
     * that is rejected with {@link IllegalStateException}.
     *
     * @throws InterruptedException propagated from {@code TestUtils.waitForCondition}
     */
    @Test
    public void shouldThrowOnCleanupWhileShuttingDown() throws InterruptedException {
        KafkaStreams streams = new KafkaStreams(this.getBuilderWithSource().build(), this.props, (KafkaClientSupplier)this.supplier, (Time)this.time);
        streams.start();
        TestUtils.waitForCondition(() -> streams.state() == KafkaStreams.State.RUNNING, (String)"Streams never started.");
        streams.close(Duration.ZERO);
        // Compare the state enum directly: the previous form boxed the comparison to an
        // Integer (0/1) and matched it against Boolean.TRUE, which can never be equal.
        MatcherAssert.assertThat((Object)streams.state(), (Matcher)Matchers.equalTo((Object)KafkaStreams.State.PENDING_SHUTDOWN));
        Assert.assertThrows(IllegalStateException.class, () -> streams.cleanUp());
        MatcherAssert.assertThat((Object)streams.state(), (Matcher)Matchers.equalTo((Object)KafkaStreams.State.PENDING_SHUTDOWN));
    }

    /**
     * Closes a running client through {@code CloseOptions} with a zero timeout and
     * asserts that the client reports {@code PENDING_SHUTDOWN} both before and after a
     * {@code cleanUp()} call that is rejected with {@link IllegalStateException}.
     *
     * <p>The client is built on a spied {@code MockClientSupplier} whose consumer is a
     * mock returning group metadata with the instance id {@code "test-instance-id"}.
     *
     * <p>NOTE(review): the method name says "LeaveGroupFalse" but the options below
     * call {@code leaveGroup(true)}; confirm which of the two is intended.
     *
     * @throws InterruptedException propagated from {@code TestUtils.waitForCondition}
     */
    @Test
    public void shouldThrowOnCleanupWhileShuttingDownStreamClosedWithCloseOptionLeaveGroupFalse() throws InterruptedException {
        MockConsumer mockConsumer = (MockConsumer)Mockito.mock(MockConsumer.class, (MockSettings)Mockito.withSettings().useConstructor(new Object[]{OffsetResetStrategy.EARLIEST}));
        MockClientSupplier mockClientSupplier = (MockClientSupplier)Mockito.spy(MockClientSupplier.class);
        ConsumerGroupMetadata consumerGroupMetadata = (ConsumerGroupMetadata)Mockito.mock(ConsumerGroupMetadata.class);
        Optional<String> groupInstanceId = Optional.of("test-instance-id");
        Mockito.when((Object)consumerGroupMetadata.groupInstanceId()).thenReturn(groupInstanceId);
        Mockito.when((Object)mockConsumer.groupMetadata()).thenReturn((Object)consumerGroupMetadata);
        Mockito.when((Object)mockClientSupplier.getAdmin((Map)ArgumentMatchers.any())).thenReturn((Object)this.adminClient);
        Mockito.when(mockClientSupplier.getConsumer((Map)ArgumentMatchers.any())).thenReturn((Object)mockConsumer);
        try (KafkaStreams streams = new KafkaStreams(this.getBuilderWithSource().build(), this.props, (KafkaClientSupplier)mockClientSupplier, (Time)this.time)) {
            streams.start();
            TestUtils.waitForCondition(() -> streams.state() == KafkaStreams.State.RUNNING, (String)"Streams never started.");
            KafkaStreams.CloseOptions closeOptions = new KafkaStreams.CloseOptions();
            closeOptions.timeout(Duration.ZERO);
            closeOptions.leaveGroup(true);
            streams.close(closeOptions);
            // Compare the state enum directly: the previous form boxed the comparison to
            // an Integer (0/1) and matched it against Boolean.TRUE, which can never be equal.
            MatcherAssert.assertThat((Object)streams.state(), (Matcher)Matchers.equalTo((Object)KafkaStreams.State.PENDING_SHUTDOWN));
            Assert.assertThrows(IllegalStateException.class, () -> streams.cleanUp());
            MatcherAssert.assertThat((Object)streams.state(), (Matcher)Matchers.equalTo((Object)KafkaStreams.State.PENDING_SHUTDOWN));
        }
    }

    /**
     * Closes a running client through {@code CloseOptions} with a zero timeout and
     * asserts that the client reports {@code PENDING_SHUTDOWN} both before and after a
     * {@code cleanUp()} call that is rejected with {@link IllegalStateException}.
     *
     * <p>NOTE(review): the method name says "LeaveGroupTrue" but {@code leaveGroup(...)}
     * is never called on the options here; confirm whether that is intended.
     *
     * @throws InterruptedException propagated from {@code TestUtils.waitForCondition}
     */
    @Test
    public void shouldThrowOnCleanupWhileShuttingDownStreamClosedWithCloseOptionLeaveGroupTrue() throws InterruptedException {
        KafkaStreams streams = new KafkaStreams(this.getBuilderWithSource().build(), this.props, (KafkaClientSupplier)this.supplier, (Time)this.time);
        streams.start();
        TestUtils.waitForCondition(() -> streams.state() == KafkaStreams.State.RUNNING, (String)"Streams never started.");
        KafkaStreams.CloseOptions closeOptions = new KafkaStreams.CloseOptions();
        closeOptions.timeout(Duration.ZERO);
        streams.close(closeOptions);
        // Compare the state enum directly: the previous form boxed the comparison to an
        // Integer (0/1) and matched it against Boolean.TRUE, which can never be equal.
        MatcherAssert.assertThat((Object)streams.state(), (Matcher)Matchers.equalTo((Object)KafkaStreams.State.PENDING_SHUTDOWN));
        Assert.assertThrows(IllegalStateException.class, () -> streams.cleanUp());
        MatcherAssert.assertThat((Object)streams.state(), (Matcher)Matchers.equalTo((Object)KafkaStreams.State.PENDING_SHUTDOWN));
    }

    /**
     * Asserts that {@code metadataForAllStreamsClients()} throws
     * {@code StreamsNotStartedException} before {@code start()} and
     * {@link IllegalStateException} once the client has reached {@code NOT_RUNNING}.
     *
     * @throws InterruptedException propagated from {@code IntegrationTestUtils.waitForApplicationState}
     */
    @Test
    public void shouldNotGetAllTasksWhenNotRunning() throws InterruptedException {
        Topology topology = this.getBuilderWithSource().build();
        try (KafkaStreams kafkaStreams = new KafkaStreams(topology, this.props, (KafkaClientSupplier)this.supplier, (Time)this.time)) {
            // Not started yet.
            Assert.assertThrows(StreamsNotStartedException.class, kafkaStreams::metadataForAllStreamsClients);

            kafkaStreams.start();
            IntegrationTestUtils.waitForApplicationState(Collections.singletonList(kafkaStreams), KafkaStreams.State.RUNNING, DEFAULT_DURATION);
            kafkaStreams.close();
            IntegrationTestUtils.waitForApplicationState(Collections.singletonList(kafkaStreams), KafkaStreams.State.NOT_RUNNING, DEFAULT_DURATION);

            // Closed again.
            Assert.assertThrows(IllegalStateException.class, kafkaStreams::metadataForAllStreamsClients);
        }
    }

    /**
     * Asserts that {@code streamsMetadataForStore("store")} throws
     * {@code StreamsNotStartedException} before {@code start()} and
     * {@link IllegalStateException} once the client has reached {@code NOT_RUNNING}.
     *
     * @throws InterruptedException propagated from {@code IntegrationTestUtils.waitForApplicationState}
     */
    @Test
    public void shouldNotGetAllTasksWithStoreWhenNotRunning() throws InterruptedException {
        Topology topology = this.getBuilderWithSource().build();
        try (KafkaStreams kafkaStreams = new KafkaStreams(topology, this.props, (KafkaClientSupplier)this.supplier, (Time)this.time)) {
            // Not started yet.
            Assert.assertThrows(StreamsNotStartedException.class, () -> kafkaStreams.streamsMetadataForStore("store"));

            kafkaStreams.start();
            IntegrationTestUtils.waitForApplicationState(Collections.singletonList(kafkaStreams), KafkaStreams.State.RUNNING, DEFAULT_DURATION);
            kafkaStreams.close();
            IntegrationTestUtils.waitForApplicationState(Collections.singletonList(kafkaStreams), KafkaStreams.State.NOT_RUNNING, DEFAULT_DURATION);

            // Closed again.
            Assert.assertThrows(IllegalStateException.class, () -> kafkaStreams.streamsMetadataForStore("store"));
        }
    }

    /**
     * Asserts that the serializer-based {@code queryMetadataForKey} overload throws
     * {@code StreamsNotStartedException} before {@code start()} and
     * {@link IllegalStateException} once the client has reached {@code NOT_RUNNING}.
     *
     * @throws InterruptedException propagated from {@code IntegrationTestUtils.waitForApplicationState}
     */
    @Test
    public void shouldNotGetQueryMetadataWithSerializerWhenNotRunningOrRebalancing() throws InterruptedException {
        Topology topology = this.getBuilderWithSource().build();
        try (KafkaStreams kafkaStreams = new KafkaStreams(topology, this.props, (KafkaClientSupplier)this.supplier, (Time)this.time)) {
            // Not started yet.
            Assert.assertThrows(StreamsNotStartedException.class, () -> kafkaStreams.queryMetadataForKey("store", (Object)"key", Serdes.String().serializer()));

            kafkaStreams.start();
            IntegrationTestUtils.waitForApplicationState(Collections.singletonList(kafkaStreams), KafkaStreams.State.RUNNING, DEFAULT_DURATION);
            kafkaStreams.close();
            IntegrationTestUtils.waitForApplicationState(Collections.singletonList(kafkaStreams), KafkaStreams.State.NOT_RUNNING, DEFAULT_DURATION);

            // Closed again.
            Assert.assertThrows(IllegalStateException.class, () -> kafkaStreams.queryMetadataForKey("store", (Object)"key", Serdes.String().serializer()));
        }
    }

    /**
     * Asserts that, right after {@code start()}, the serializer-based
     * {@code queryMetadataForKey("store", "key", ...)} call returns
     * {@code KeyQueryMetadata.NOT_AVAILABLE}.
     */
    @Test
    public void shouldGetQueryMetadataWithSerializerWhenRunningOrRebalancing() {
        Topology topology = this.getBuilderWithSource().build();
        try (KafkaStreams kafkaStreams = new KafkaStreams(topology, this.props, (KafkaClientSupplier)this.supplier, (Time)this.time)) {
            kafkaStreams.start();
            Object actualMetadata = kafkaStreams.queryMetadataForKey("store", (Object)"key", Serdes.String().serializer());
            Assert.assertEquals((Object)KeyQueryMetadata.NOT_AVAILABLE, actualMetadata);
        }
    }

    /**
     * Asserts that the partitioner-based {@code queryMetadataForKey} overload throws
     * {@code StreamsNotStartedException} before {@code start()} and
     * {@link IllegalStateException} once the client has reached {@code NOT_RUNNING}.
     * The partitioner used always returns 0.
     *
     * @throws InterruptedException propagated from {@code IntegrationTestUtils.waitForApplicationState}
     */
    @Test
    public void shouldNotGetQueryMetadataWithPartitionerWhenNotRunningOrRebalancing() throws InterruptedException {
        Topology topology = this.getBuilderWithSource().build();
        try (KafkaStreams kafkaStreams = new KafkaStreams(topology, this.props, (KafkaClientSupplier)this.supplier, (Time)this.time)) {
            // Not started yet.
            Assert.assertThrows(StreamsNotStartedException.class, () -> kafkaStreams.queryMetadataForKey("store", (Object)"key", (topicName, recordKey, recordValue, partitionCount) -> 0));

            kafkaStreams.start();
            IntegrationTestUtils.waitForApplicationState(Collections.singletonList(kafkaStreams), KafkaStreams.State.RUNNING, DEFAULT_DURATION);
            kafkaStreams.close();
            IntegrationTestUtils.waitForApplicationState(Collections.singletonList(kafkaStreams), KafkaStreams.State.NOT_RUNNING, DEFAULT_DURATION);

            // Closed again.
            Assert.assertThrows(IllegalStateException.class, () -> kafkaStreams.queryMetadataForKey("store", (Object)"key", (topicName, recordKey, recordValue, partitionCount) -> 0));
        }
    }

    /**
     * Waits for the client to reach {@code RUNNING} and asserts that requesting a
     * key-value store named {@code "unknown-store"} throws
     * {@code UnknownStateStoreException}.
     *
     * @throws InterruptedException propagated from {@code IntegrationTestUtils.waitForApplicationState}
     */
    @Test
    public void shouldThrowUnknownStateStoreExceptionWhenStoreNotExist() throws InterruptedException {
        Topology topology = this.getBuilderWithSource().build();
        try (KafkaStreams kafkaStreams = new KafkaStreams(topology, this.props, (KafkaClientSupplier)this.supplier, (Time)this.time)) {
            kafkaStreams.start();
            IntegrationTestUtils.waitForApplicationState(Collections.singletonList(kafkaStreams), KafkaStreams.State.RUNNING, DEFAULT_DURATION);
            Assert.assertThrows(
                UnknownStateStoreException.class,
                () -> kafkaStreams.store(StoreQueryParameters.fromNameAndType((String)"unknown-store", (QueryableStoreType)QueryableStoreTypes.keyValueStore())));
        }
    }

    /**
     * Asserts that {@code store(...)} for a key-value store named {@code "store"} throws
     * {@code StreamsNotStartedException} before {@code start()} and
     * {@link IllegalStateException} once the client has reached {@code NOT_RUNNING}.
     *
     * @throws InterruptedException propagated from {@code IntegrationTestUtils.waitForApplicationState}
     */
    @Test
    public void shouldNotGetStoreWhenWhenNotRunningOrRebalancing() throws InterruptedException {
        Topology topology = this.getBuilderWithSource().build();
        try (KafkaStreams kafkaStreams = new KafkaStreams(topology, this.props, (KafkaClientSupplier)this.supplier, (Time)this.time)) {
            // Not started yet.
            Assert.assertThrows(
                StreamsNotStartedException.class,
                () -> kafkaStreams.store(StoreQueryParameters.fromNameAndType((String)"store", (QueryableStoreType)QueryableStoreTypes.keyValueStore())));

            kafkaStreams.start();
            IntegrationTestUtils.waitForApplicationState(Collections.singletonList(kafkaStreams), KafkaStreams.State.RUNNING, DEFAULT_DURATION);
            kafkaStreams.close();
            IntegrationTestUtils.waitForApplicationState(Collections.singletonList(kafkaStreams), KafkaStreams.State.NOT_RUNNING, DEFAULT_DURATION);

            // Closed again.
            Assert.assertThrows(
                IllegalStateException.class,
                () -> kafkaStreams.store(StoreQueryParameters.fromNameAndType((String)"store", (QueryableStoreType)QueryableStoreTypes.keyValueStore())));
        }
    }

    /**
     * Stubs the admin client so that {@code listOffsets(...)} yields a result whose
     * {@code all()} future is already completed with an empty map, then asserts that
     * {@code allLocalStorePartitionLags()} of a started client is empty.
     */
    @Test
    public void shouldReturnEmptyLocalStorePartitionLags() {
        // listOffsets() result backed by an already-completed, empty future
        ListOffsetsResult emptyOffsets = (ListOffsetsResult)Mockito.mock(ListOffsetsResult.class);
        KafkaFutureImpl completedFuture = new KafkaFutureImpl();
        completedFuture.complete(Collections.emptyMap());
        Mockito.when((Object)emptyOffsets.all()).thenReturn((Object)completedFuture);

        // admin client that hands out the stubbed result
        MockAdminClient adminSpy = (MockAdminClient)Mockito.spy(MockAdminClient.class);
        Mockito.when((Object)adminSpy.listOffsets(ArgumentMatchers.anyMap())).thenReturn((Object)emptyOffsets);

        // client supplier that hands out the stubbed admin client
        MockClientSupplier supplierSpy = (MockClientSupplier)Mockito.spy(MockClientSupplier.class);
        Mockito.when((Object)supplierSpy.getAdmin((Map)ArgumentMatchers.any())).thenReturn((Object)adminSpy);

        Topology topology = this.getBuilderWithSource().build();
        try (KafkaStreams kafkaStreams = new KafkaStreams(topology, this.props, (KafkaClientSupplier)supplierSpy, (Time)this.time)) {
            kafkaStreams.start();
            int lagEntries = kafkaStreams.allLocalStorePartitionLags().size();
            Assert.assertEquals((long)0L, (long)lagEntries);
        }
    }

    /**
     * Asserts that {@code close(Duration.ofMillis(10))} returns {@code false}. The
     * client is built with the constructor that takes no {@code Time} argument.
     */
    @Test
    public void shouldReturnFalseOnCloseWhenThreadsHaventTerminated() {
        Topology topology = this.getBuilderWithSource().build();
        try (KafkaStreams kafkaStreams = new KafkaStreams(topology, this.props, (KafkaClientSupplier)this.supplier)) {
            boolean closed = kafkaStreams.close(Duration.ofMillis(10L));
            Assert.assertFalse(closed);
        }
    }

    /**
     * Asserts that {@code close(Duration)} rejects a timeout of -1 ms with an
     * {@link IllegalArgumentException}.
     */
    @Test
    public void shouldThrowOnNegativeTimeoutForClose() {
        Topology topology = this.getBuilderWithSource().build();
        try (KafkaStreams kafkaStreams = new KafkaStreams(topology, this.props, (KafkaClientSupplier)this.supplier, (Time)this.time)) {
            Assert.assertThrows(IllegalArgumentException.class, () -> kafkaStreams.close(Duration.ofMillis(-1L)));
        }
    }

    /**
     * Asserts that {@code close(Duration.ZERO)} returns {@code false} on a client that
     * was constructed but never started.
     */
    @Test
    public void shouldNotBlockInCloseForZeroDuration() {
        Topology topology = this.getBuilderWithSource().build();
        try (KafkaStreams kafkaStreams = new KafkaStreams(topology, this.props, (KafkaClientSupplier)this.supplier, (Time)this.time)) {
            boolean closed = kafkaStreams.close(Duration.ZERO);
            Assert.assertFalse(closed);
        }
    }

    /**
     * Asserts that {@code close(CloseOptions)} with a 10 ms timeout (and no
     * {@code leaveGroup} call) returns {@code false}. The client is built with the
     * constructor that takes no {@code Time} argument.
     */
    @Test
    public void shouldReturnFalseOnCloseWithCloseOptionWithLeaveGroupFalseWhenThreadsHaventTerminated() {
        KafkaStreams.CloseOptions options = new KafkaStreams.CloseOptions();
        options.timeout(Duration.ofMillis(10L));
        Topology topology = this.getBuilderWithSource().build();
        try (KafkaStreams kafkaStreams = new KafkaStreams(topology, this.props, (KafkaClientSupplier)this.supplier)) {
            boolean closed = kafkaStreams.close(options);
            Assert.assertFalse(closed);
        }
    }

    /**
     * Asserts that {@code close(CloseOptions)} rejects options carrying a -1 ms
     * timeout (and no {@code leaveGroup} call) with an {@link IllegalArgumentException}.
     */
    @Test
    public void shouldThrowOnNegativeTimeoutForCloseWithCloseOptionLeaveGroupFalse() {
        KafkaStreams.CloseOptions options = new KafkaStreams.CloseOptions();
        options.timeout(Duration.ofMillis(-1L));
        Topology topology = this.getBuilderWithSource().build();
        try (KafkaStreams kafkaStreams = new KafkaStreams(topology, this.props, (KafkaClientSupplier)this.supplier, (Time)this.time)) {
            Assert.assertThrows(IllegalArgumentException.class, () -> kafkaStreams.close(options));
        }
    }

    /**
     * Asserts that {@code close(CloseOptions)} with a zero timeout (and no
     * {@code leaveGroup} call) returns {@code false}. The client is built with the
     * constructor that takes no {@code Time} argument.
     */
    @Test
    public void shouldNotBlockInCloseWithCloseOptionLeaveGroupFalseForZeroDuration() {
        KafkaStreams.CloseOptions options = new KafkaStreams.CloseOptions();
        options.timeout(Duration.ZERO);
        Topology topology = this.getBuilderWithSource().build();
        try (KafkaStreams kafkaStreams = new KafkaStreams(topology, this.props, (KafkaClientSupplier)this.supplier)) {
            boolean closed = kafkaStreams.close(options);
            Assert.assertFalse(closed);
        }
    }

    /**
     * Asserts that {@code close(CloseOptions)} with a 10 ms timeout and
     * {@code leaveGroup(true)} returns {@code false}.
     *
     * <p>The client is built on a spied {@code MockClientSupplier} whose consumer is a
     * mock returning group metadata with the instance id {@code "test-instance-id"}.
     */
    @Test
    public void shouldReturnFalseOnCloseWithCloseOptionWithLeaveGroupTrueWhenThreadsHaventTerminated() {
        MockConsumer consumerMock = (MockConsumer)Mockito.mock(MockConsumer.class, (MockSettings)Mockito.withSettings().useConstructor(new Object[]{OffsetResetStrategy.EARLIEST}));
        MockClientSupplier supplierSpy = (MockClientSupplier)Mockito.spy(MockClientSupplier.class);
        ConsumerGroupMetadata groupMetadataMock = (ConsumerGroupMetadata)Mockito.mock(ConsumerGroupMetadata.class);
        Optional<String> instanceId = Optional.of("test-instance-id");

        // wire the mocks together
        Mockito.when((Object)groupMetadataMock.groupInstanceId()).thenReturn(instanceId);
        Mockito.when((Object)consumerMock.groupMetadata()).thenReturn((Object)groupMetadataMock);
        Mockito.when((Object)supplierSpy.getAdmin((Map)ArgumentMatchers.any())).thenReturn((Object)this.adminClient);
        Mockito.when(supplierSpy.getConsumer((Map)ArgumentMatchers.any())).thenReturn((Object)consumerMock);

        KafkaStreams.CloseOptions options = new KafkaStreams.CloseOptions();
        options.timeout(Duration.ofMillis(10L));
        options.leaveGroup(true);

        Topology topology = this.getBuilderWithSource().build();
        try (KafkaStreams kafkaStreams = new KafkaStreams(topology, this.props, (KafkaClientSupplier)supplierSpy)) {
            boolean closed = kafkaStreams.close(options);
            Assert.assertFalse(closed);
        }
    }

    /**
     * Asserts that {@code close(CloseOptions)} rejects options carrying a -1 ms
     * timeout and {@code leaveGroup(true)} with an {@link IllegalArgumentException}.
     * The client uses a spied {@code MockClientSupplier} that returns
     * {@code this.adminClient} from {@code getAdmin}.
     */
    @Test
    public void shouldThrowOnNegativeTimeoutForCloseWithCloseOptionLeaveGroupTrue() {
        MockClientSupplier supplierSpy = (MockClientSupplier)Mockito.spy(MockClientSupplier.class);
        Mockito.when((Object)supplierSpy.getAdmin((Map)ArgumentMatchers.any())).thenReturn((Object)this.adminClient);

        KafkaStreams.CloseOptions options = new KafkaStreams.CloseOptions();
        options.timeout(Duration.ofMillis(-1L));
        options.leaveGroup(true);

        Topology topology = this.getBuilderWithSource().build();
        try (KafkaStreams kafkaStreams = new KafkaStreams(topology, this.props, (KafkaClientSupplier)supplierSpy, (Time)this.time)) {
            Assert.assertThrows(IllegalArgumentException.class, () -> kafkaStreams.close(options));
        }
    }

    /**
     * Asserts that {@code close(CloseOptions)} with a zero timeout and
     * {@code leaveGroup(true)} returns {@code false}.
     *
     * <p>The client is built on a spied {@code MockClientSupplier} whose consumer is a
     * mock returning group metadata with the instance id {@code "test-instance-id"}.
     */
    @Test
    public void shouldNotBlockInCloseWithCloseOptionLeaveGroupTrueForZeroDuration() {
        MockConsumer consumerMock = (MockConsumer)Mockito.mock(MockConsumer.class, (MockSettings)Mockito.withSettings().useConstructor(new Object[]{OffsetResetStrategy.EARLIEST}));
        MockClientSupplier supplierSpy = (MockClientSupplier)Mockito.spy(MockClientSupplier.class);
        ConsumerGroupMetadata groupMetadataMock = (ConsumerGroupMetadata)Mockito.mock(ConsumerGroupMetadata.class);
        Optional<String> instanceId = Optional.of("test-instance-id");

        // wire the mocks together
        Mockito.when((Object)groupMetadataMock.groupInstanceId()).thenReturn(instanceId);
        Mockito.when((Object)consumerMock.groupMetadata()).thenReturn((Object)groupMetadataMock);
        Mockito.when((Object)supplierSpy.getAdmin((Map)ArgumentMatchers.any())).thenReturn((Object)this.adminClient);
        Mockito.when(supplierSpy.getConsumer((Map)ArgumentMatchers.any())).thenReturn((Object)consumerMock);

        KafkaStreams.CloseOptions options = new KafkaStreams.CloseOptions();
        options.timeout(Duration.ZERO);
        options.leaveGroup(true);

        Topology topology = this.getBuilderWithSource().build();
        try (KafkaStreams kafkaStreams = new KafkaStreams(topology, this.props, (KafkaClientSupplier)supplierSpy)) {
            boolean closed = kafkaStreams.close(options);
            Assert.assertFalse(closed);
        }
    }

    /**
     * With {@code metrics.recording.level} set to {@code DEBUG}, starts and closes a
     * client while {@link Executors} is statically mocked, then verifies that:
     * <ul>
     *   <li>{@code Executors.newSingleThreadScheduledExecutor(ThreadFactory)} was called twice;</li>
     *   <li>the second executor handed out had a {@code RocksDBMetricsRecordingTrigger}
     *       scheduled at a fixed rate with initial delay 0 and period 1 minute;</li>
     *   <li>{@code shutdownNow()} was called on that executor.</li>
     * </ul>
     */
    @Test
    public void shouldTriggerRecordingOfRocksDBMetricsIfRecordingLevelIsDebug() {
        try (MockedStatic executorsMockedStatic = Mockito.mockStatic(Executors.class)) {
            ScheduledExecutorService cleanupSchedule = (ScheduledExecutorService)Mockito.mock(ScheduledExecutorService.class);
            ScheduledExecutorService rocksDBMetricsRecordingTriggerThread = (ScheduledExecutorService)Mockito.mock(ScheduledExecutorService.class);
            // First call returns the cleanup executor, the second the metrics-trigger executor.
            executorsMockedStatic.when(() -> Executors.newSingleThreadScheduledExecutor((ThreadFactory)ArgumentMatchers.any(ThreadFactory.class))).thenReturn((Object)cleanupSchedule, new Object[]{rocksDBMetricsRecordingTriggerThread});
            // The client is built from getBuilderWithSource(); an unused local
            // StreamsBuilder that used to be created here has been dropped.
            this.props.setProperty("metrics.recording.level", Sensor.RecordingLevel.DEBUG.name());
            try (KafkaStreams streams = new KafkaStreams(this.getBuilderWithSource().build(), this.props, (KafkaClientSupplier)this.supplier, (Time)this.time)) {
                streams.start();
            }
            executorsMockedStatic.verify(() -> Executors.newSingleThreadScheduledExecutor((ThreadFactory)ArgumentMatchers.any(ThreadFactory.class)), Mockito.times((int)2));
            ((ScheduledExecutorService)Mockito.verify((Object)rocksDBMetricsRecordingTriggerThread)).scheduleAtFixedRate((Runnable)ArgumentMatchers.any(RocksDBMetricsRecordingTrigger.class), Mockito.eq((long)0L), Mockito.eq((long)1L), (TimeUnit)((Object)Mockito.eq((Object)((Object)TimeUnit.MINUTES))));
            ((ScheduledExecutorService)Mockito.verify((Object)rocksDBMetricsRecordingTriggerThread)).shutdownNow();
        }
    }

    /**
     * Verifies that constructing a client from {@code (Topology, StreamsConfig)} asks
     * the config for its {@code KafkaClientSupplier}: {@code getKafkaClientSupplier()}
     * is expected to have been called twice on the spied config.
     */
    @Test
    public void shouldGetClientSupplierFromConfigForConstructor() {
        StreamsConfig config = new StreamsConfig((Map)this.props);
        StreamsConfig mockConfig = (StreamsConfig)Mockito.spy((Object)config);
        Mockito.when((Object)mockConfig.getKafkaClientSupplier()).thenReturn((Object)this.supplier);
        // Only construction is under test; try-with-resources closes the client so it
        // is not leaked into later tests.
        try (KafkaStreams ignored = new KafkaStreams(this.getBuilderWithSource().build(), mockConfig)) {
            // nothing to do with the instance itself
        }
        // Presumably one of the two calls is the stubbing invocation above and the
        // other comes from the constructor.
        ((StreamsConfig)Mockito.verify((Object)mockConfig, (VerificationMode)Mockito.times((int)2))).getKafkaClientSupplier();
    }

    /**
     * Verifies that constructing a client from {@code (Topology, StreamsConfig, Time)}
     * asks the config for its {@code KafkaClientSupplier}:
     * {@code getKafkaClientSupplier()} is expected to have been called twice on the
     * spied config.
     */
    @Test
    public void shouldGetClientSupplierFromConfigForConstructorWithTime() {
        StreamsConfig config = new StreamsConfig((Map)this.props);
        StreamsConfig mockConfig = (StreamsConfig)Mockito.spy((Object)config);
        Mockito.when((Object)mockConfig.getKafkaClientSupplier()).thenReturn((Object)this.supplier);
        // Only construction is under test; try-with-resources closes the client so it
        // is not leaked into later tests.
        try (KafkaStreams ignored = new KafkaStreams(this.getBuilderWithSource().build(), mockConfig, (Time)this.time)) {
            // nothing to do with the instance itself
        }
        // Presumably one of the two calls is the stubbing invocation above and the
        // other comes from the constructor.
        ((StreamsConfig)Mockito.verify((Object)mockConfig, (VerificationMode)Mockito.times((int)2))).getKafkaClientSupplier();
    }

    /**
     * Verifies that when a {@code KafkaClientSupplier} is passed to the constructor
     * explicitly, {@code getKafkaClientSupplier()} is never called on the spied config.
     */
    @Test
    public void shouldUseProvidedClientSupplier() {
        StreamsConfig config = new StreamsConfig((Map)this.props);
        StreamsConfig mockConfig = (StreamsConfig)Mockito.spy((Object)config);
        // Only construction is under test; try-with-resources closes the client so it
        // is not leaked into later tests.
        try (KafkaStreams ignored = new KafkaStreams(this.getBuilderWithSource().build(), mockConfig, (KafkaClientSupplier)this.supplier)) {
            // nothing to do with the instance itself
        }
        ((StreamsConfig)Mockito.verify((Object)mockConfig, (VerificationMode)Mockito.times((int)0))).getKafkaClientSupplier();
    }

    /**
     * With {@code metrics.recording.level} set to {@code INFO}, starts and closes a
     * client while {@link Executors} is statically mocked, then verifies that
     * {@code Executors.newSingleThreadScheduledExecutor(ThreadFactory)} was called with
     * Mockito's default verification mode (a single invocation).
     */
    @Test
    public void shouldNotTriggerRecordingOfRocksDBMetricsIfRecordingLevelIsInfo() {
        try (MockedStatic executorsMockedStatic = Mockito.mockStatic(Executors.class)) {
            ScheduledExecutorService cleanupSchedule = (ScheduledExecutorService)Mockito.mock(ScheduledExecutorService.class);
            executorsMockedStatic.when(() -> Executors.newSingleThreadScheduledExecutor((ThreadFactory)ArgumentMatchers.any(ThreadFactory.class))).thenReturn((Object)cleanupSchedule);
            // The client is built from getBuilderWithSource(); an unused local
            // StreamsBuilder that used to be created here has been dropped.
            this.props.setProperty("metrics.recording.level", Sensor.RecordingLevel.INFO.name());
            try (KafkaStreams streams = new KafkaStreams(this.getBuilderWithSource().build(), this.props, (KafkaClientSupplier)this.supplier, (Time)this.time)) {
                streams.start();
            }
            executorsMockedStatic.verify(() -> Executors.newSingleThreadScheduledExecutor((ThreadFactory)ArgumentMatchers.any(ThreadFactory.class)));
        }
    }

    /**
     * With {@code state.cleanup.delay.ms} set to 1 and both {@link Executors} and
     * {@code StateDirectory} construction mocked, starts and closes a client and
     * verifies that the executor handed out had a {@link Runnable} scheduled at a
     * fixed rate (initial delay 1, period 1, milliseconds) and later received
     * {@code shutdownNow()}.
     */
    @Test
    public void shouldCleanupOldStateDirs() {
        try (MockedStatic mockedExecutors = Mockito.mockStatic(Executors.class)) {
            ScheduledExecutorService cleanupExecutor = (ScheduledExecutorService)Mockito.mock(ScheduledExecutorService.class);
            mockedExecutors.when(() -> Executors.newSingleThreadScheduledExecutor((ThreadFactory)ArgumentMatchers.any(ThreadFactory.class))).thenReturn((Object)cleanupExecutor);

            // Every StateDirectory constructed in this scope is a mock whose
            // initializeProcessId() returns a random UUID.
            try (MockedConstruction ignored = Mockito.mockConstruction(StateDirectory.class, (stateDirMock, ctx) -> Mockito.when((Object)stateDirMock.initializeProcessId()).thenReturn((Object)UUID.randomUUID()))) {
                this.props.setProperty("state.cleanup.delay.ms", "1");
                StreamsBuilder tableBuilder = new StreamsBuilder();
                tableBuilder.table("topic", Materialized.as((String)"store"));
                try (KafkaStreams kafkaStreams = new KafkaStreams(tableBuilder.build(), this.props, (KafkaClientSupplier)this.supplier, (Time)this.time)) {
                    kafkaStreams.start();
                }
            }

            ((ScheduledExecutorService)Mockito.verify((Object)cleanupExecutor)).scheduleAtFixedRate((Runnable)ArgumentMatchers.any(Runnable.class), Mockito.eq((long)1L), Mockito.eq((long)1L), (TimeUnit)((Object)Mockito.eq((Object)((Object)TimeUnit.MILLISECONDS))));
            ((ScheduledExecutorService)Mockito.verify((Object)cleanupExecutor)).shutdownNow();
        }
    }

    /**
     * Builds a source -> processor -> sink topology without any state store and hands
     * it to {@code startStreamsAndCheckDirExists(topology, false)}.
     *
     * <p>The processor forwards a record only when its value has even length; the
     * forwarded value is the record key concatenated with the original value.
     */
    @Test
    public void statelessTopologyShouldNotCreateStateDirectory() {
        String uniqueName = IntegrationTestUtils.safeUniqueTestName(this.getClass(), this.testName);
        String sourceTopic = uniqueName + "-input";
        String sinkTopic = uniqueName + "-output";
        Topology statelessTopology = new Topology();
        statelessTopology.addSource("source", Serdes.String().deserializer(), Serdes.String().deserializer(), new String[]{sourceTopic}).addProcessor("process", () -> new Processor<String, String, String, String>(){
            private ProcessorContext ctx;

            public void init(ProcessorContext<String, String> processorContext) {
                this.ctx = processorContext;
            }

            public void process(Record<String, String> rec) {
                // Odd-length values are dropped.
                if (((String)rec.value()).length() % 2 != 0) {
                    return;
                }
                this.ctx.forward(rec.withValue((Object)((String)rec.key() + (String)rec.value())));
            }
        }, new String[]{"source"}).addSink("sink", sinkTopic, (Serializer)new StringSerializer(), (Serializer)new StringSerializer(), new String[]{"process"});
        this.startStreamsAndCheckDirExists(statelessTopology, false);
    }

    /**
     * Builds the stateful topology from {@code getStatefulTopology(...)} with
     * {@code isPersistentStore = false} and hands it to
     * {@code startStreamsAndCheckDirExists(topology, false)}.
     */
    @Test
    public void inMemoryStatefulTopologyShouldNotCreateStateDirectory() {
        String prefix = IntegrationTestUtils.safeUniqueTestName(this.getClass(), this.testName);
        Topology inMemoryTopology = this.getStatefulTopology(
            prefix + "-input",
            prefix + "-output",
            prefix + "-global",
            prefix + "-counts",
            prefix + "-globalStore",
            false);
        this.startStreamsAndCheckDirExists(inMemoryTopology, false);
    }

    /**
     * Builds the stateful topology from {@code getStatefulTopology(...)} with
     * {@code isPersistentStore = true} and hands it to
     * {@code startStreamsAndCheckDirExists(topology, true)}.
     */
    @Test
    public void statefulTopologyShouldCreateStateDirectory() {
        String prefix = IntegrationTestUtils.safeUniqueTestName(this.getClass(), this.testName);
        Topology persistentTopology = this.getStatefulTopology(
            prefix + "-input",
            prefix + "-output",
            prefix + "-global",
            prefix + "-counts",
            prefix + "-globalStore",
            true);
        this.startStreamsAndCheckDirExists(persistentTopology, true);
    }

    /**
     * Asserts that constructing a client from an empty {@code StreamsBuilder} topology
     * throws a {@code TopologyException} with the expected "Invalid topology" message.
     */
    @Test
    public void shouldThrowTopologyExceptionOnEmptyTopology() {
        // assertThrows returns the caught exception, so the message can be checked
        // without a try/fail/catch construct.
        TopologyException e = Assert.assertThrows(
            TopologyException.class,
            () -> new KafkaStreams(new StreamsBuilder().build(), this.props, (KafkaClientSupplier)this.supplier, (Time)this.time));
        MatcherAssert.assertThat((Object)e.getMessage(), (Matcher)Matchers.equalTo((Object)"Invalid topology: Topology has no stream threads and no global threads, must subscribe to at least one source topic or global table."));
    }

    /**
     * Asserts that a client built from a topology containing only a global table on
     * {@code "anyTopic"} has an empty {@code streams.threads}.
     */
    @Test
    public void shouldNotCreateStreamThreadsForGlobalOnlyTopology() {
        StreamsBuilder globalOnlyBuilder = new StreamsBuilder();
        globalOnlyBuilder.globalTable("anyTopic");
        try (KafkaStreams kafkaStreams = new KafkaStreams(globalOnlyBuilder.build(), this.props, (KafkaClientSupplier)this.supplier, (Time)this.time)) {
            int streamThreadCount = kafkaStreams.threads.size();
            MatcherAssert.assertThat((Object)streamThreadCount, (Matcher)Matchers.equalTo((Object)0));
        }
    }

    /**
     * For a topology containing only a global table on {@code "anyTopic"}, asserts that
     * the client has no entries in {@code streams.threads}, starts in {@code CREATED},
     * reaches {@code RUNNING} after {@code start()} and {@code NOT_RUNNING} after
     * {@code close()}.
     *
     * @throws InterruptedException propagated from {@code TestUtils.waitForCondition}
     */
    @Test
    public void shouldTransitToRunningWithGlobalOnlyTopology() throws InterruptedException {
        StreamsBuilder builder = new StreamsBuilder();
        builder.globalTable("anyTopic");
        try (KafkaStreams streams = new KafkaStreams(builder.build(), this.props, (KafkaClientSupplier)this.supplier, (Time)this.time)) {
            MatcherAssert.assertThat((Object)streams.threads.size(), (Matcher)Matchers.equalTo((Object)0));
            // assertEquals takes (expected, actual); the arguments used to be swapped,
            // which produces a misleading failure message.
            Assert.assertEquals((Object)KafkaStreams.State.CREATED, (Object)streams.state());
            streams.start();
            // Note: the failure message is built before waiting, so the state it shows
            // is the one observed at call time.
            TestUtils.waitForCondition(() -> streams.state() == KafkaStreams.State.RUNNING, (String)("Streams never started, state is " + streams.state()));
            streams.close();
            TestUtils.waitForCondition(() -> streams.state() == KafkaStreams.State.NOT_RUNNING, (String)"Streams never stopped.");
        }
    }

    /**
     * Builds a topology containing one local key-value store and one global key-value store.
     *
     * <p>The topology consists of:
     * <ul>
     *   <li>a source node {@code "source"} reading {@code inputTopic} with String deserializers;</li>
     *   <li>a processor node {@code "process"} that, for every record, writes {@code key -> 5L}
     *       into the store named {@code storeName}, forwards the record with value {@code "5"}
     *       and requests a commit;</li>
     *   <li>the String/Long store {@code storeName}, connected to {@code "process"};</li>
     *   <li>a sink node {@code "sink"} writing to {@code outputTopic} with String serializers;</li>
     *   <li>a String/String global store {@code globalStoreName} with logging disabled, fed from
     *       {@code globalTopicName} through a {@code MockProcessorSupplier}.</li>
     * </ul>
     *
     * <p>NOTE(review): the {@code @Deprecated} marker on this private helper presumably exists
     * to silence deprecation warnings for the API it calls - confirm before removing it.
     *
     * @param inputTopic        topic consumed by the {@code "source"} node
     * @param outputTopic       topic written by the {@code "sink"} node
     * @param globalTopicName   topic backing the global store; also the prefix of its processor name
     * @param storeName         name of the local store used by the processor
     * @param globalStoreName   name of the global store
     * @param isPersistentStore {@code true} for persistent stores, {@code false} for in-memory stores
     * @return the assembled topology
     */
    @Deprecated
    private Topology getStatefulTopology(String inputTopic, String outputTopic, String globalTopicName, final String storeName, String globalStoreName, boolean isPersistentStore) {
        KeyValueBytesStoreSupplier localStoreSupplier;
        if (isPersistentStore) {
            localStoreSupplier = Stores.persistentKeyValueStore(storeName);
        } else {
            localStoreSupplier = Stores.inMemoryKeyValueStore(storeName);
        }
        StoreBuilder<KeyValueStore<String, Long>> localStoreBuilder =
            Stores.keyValueStoreBuilder(localStoreSupplier, Serdes.String(), Serdes.Long());

        Topology result = new Topology();
        result.addSource("source", Serdes.String().deserializer(), Serdes.String().deserializer(), inputTopic);
        result.addProcessor("process", () -> new Processor<String, String, String, String>() {
            private ProcessorContext<String, String> processorContext;

            @Override
            public void init(ProcessorContext<String, String> context) {
                this.processorContext = context;
            }

            @Override
            public void process(Record<String, String> record) {
                // The store is looked up on every record rather than cached in init().
                KeyValueStore<String, Long> store = this.processorContext.getStateStore(storeName);
                store.put(record.key(), 5L);
                this.processorContext.forward(record.withValue("5"));
                this.processorContext.commit();
            }
        }, "source");
        result.addStateStore(localStoreBuilder, "process");
        result.addSink("sink", outputTopic, new StringSerializer(), new StringSerializer(), "process");

        KeyValueBytesStoreSupplier globalStoreSupplier;
        if (isPersistentStore) {
            globalStoreSupplier = Stores.persistentKeyValueStore(globalStoreName);
        } else {
            globalStoreSupplier = Stores.inMemoryKeyValueStore(globalStoreName);
        }
        StoreBuilder<KeyValueStore<String, String>> globalStoreBuilder =
            Stores.keyValueStoreBuilder(globalStoreSupplier, Serdes.String(), Serdes.String()).withLoggingDisabled();
        String globalProcessorName = globalTopicName + "-processor";
        result.addGlobalStore(globalStoreBuilder, "global", Serdes.String().deserializer(), Serdes.String().deserializer(), globalTopicName, globalProcessorName, new MockProcessorSupplier());
        return result;
    }

    /**
     * Creates a {@link StreamsBuilder} on which a single stream over the topic
     * {@code "source-topic"} has been declared.
     *
     * @return the new builder
     */
    private StreamsBuilder getBuilderWithSource() {
        final StreamsBuilder sourceOnlyBuilder = new StreamsBuilder();
        // The returned KStream handle is not needed; only the registration on the builder matters.
        sourceOnlyBuilder.stream("source-topic");
        return sourceOnlyBuilder;
    }

    /**
     * Constructs a {@link KafkaStreams} instance for {@code topology} while construction of
     * {@code StateDirectory} is mocked, and verifies how the state directory was created.
     *
     * <p>For every mocked construction the initializer asserts that the constructor received
     * exactly four arguments and that the third one equals {@code shouldFilesExist}.
     * Afterwards the method asserts that at least one {@code StateDirectory} was constructed.
     *
     * @param topology         topology handed to the {@link KafkaStreams} constructor
     * @param shouldFilesExist value expected as the third {@code StateDirectory} constructor argument
     */
    private void startStreamsAndCheckDirExists(Topology topology, boolean shouldFilesExist) {
        try (MockedConstruction<StateDirectory> mockedStateDirectories = Mockito.mockConstruction(StateDirectory.class, (stateDirectory, construction) -> {
                 // Stub the process id so the mocked directory hands back a usable UUID.
                 Mockito.when(stateDirectory.initializeProcessId()).thenReturn(UUID.randomUUID());
                 List<?> constructorArgs = construction.arguments();
                 Assert.assertEquals(4L, constructorArgs.size());
                 Assert.assertEquals(shouldFilesExist, constructorArgs.get(2));
             });
             KafkaStreams ignored = new KafkaStreams(topology, this.props, (KafkaClientSupplier)this.supplier, (Time)this.time)) {
            boolean noneConstructed = mockedStateDirectories.constructed().isEmpty();
            Assert.assertFalse(noneConstructed);
        }
    }

    public static class StateListenerStub
    implements KafkaStreams.StateListener {
        int numChanges = 0;
        KafkaStreams.State oldState;
        KafkaStreams.State newState;
        public Map<KafkaStreams.State, Long> mapStates = new HashMap<KafkaStreams.State, Long>();

        public void onChange(KafkaStreams.State newState, KafkaStreams.State oldState) {
            long prevCount = this.mapStates.containsKey(newState) ? this.mapStates.get(newState) : 0L;
            ++this.numChanges;
            this.oldState = oldState;
            this.newState = newState;
            this.mapStates.put(newState, prevCount + 1L);
        }
    }
}

