/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.processor.internals;

import java.time.Duration;
import java.util.Collection;
import java.util.Collections;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientTestUtils;
import org.apache.kafka.clients.admin.ListConsumerGroupOffsetsOptions;
import org.apache.kafka.clients.admin.ListConsumerGroupOffsetsResult;
import org.apache.kafka.clients.admin.ListConsumerGroupOffsetsSpec;
import org.apache.kafka.clients.admin.ListOffsetsOptions;
import org.apache.kafka.clients.admin.ListOffsetsResult;
import org.apache.kafka.clients.admin.MockAdminClient;
import org.apache.kafka.clients.admin.OffsetSpec;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.MockConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.utils.LogCaptureAppender;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.processor.StateRestoreListener;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.internals.ProcessorStateManager;
import org.apache.kafka.streams.processor.internals.StoreChangelogReader;
import org.apache.kafka.streams.processor.internals.Task;
import org.apache.kafka.test.MockStateRestoreListener;
import org.apache.kafka.test.StreamsTestUtils;
import org.easymock.EasyMock;
import org.easymock.EasyMockRule;
import org.easymock.EasyMockSupport;
import org.easymock.Mock;
import org.easymock.MockType;
import org.hamcrest.Matcher;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

@RunWith(value=Parameterized.class)
public class StoreChangelogReaderTest
extends EasyMockSupport {
    @Rule
    public EasyMockRule rule = new EasyMockRule((Object)this);
    @Mock(type=MockType.NICE)
    private ProcessorStateManager stateManager;
    @Mock(type=MockType.NICE)
    private ProcessorStateManager activeStateManager;
    @Mock(type=MockType.NICE)
    private ProcessorStateManager standbyStateManager;
    @Mock(type=MockType.NICE)
    private ProcessorStateManager.StateStoreMetadata storeMetadata;
    @Mock(type=MockType.NICE)
    private ProcessorStateManager.StateStoreMetadata storeMetadataOne;
    @Mock(type=MockType.NICE)
    private ProcessorStateManager.StateStoreMetadata storeMetadataTwo;
    @Mock(type=MockType.NICE)
    private StateStore store;
    @Parameterized.Parameter
    public Task.TaskType type;
    private final String storeName = "store";
    private final String topicName = "topic";
    private final LogContext logContext = new LogContext("test-reader ");
    private final TopicPartition tp = new TopicPartition("topic", 0);
    private final TopicPartition tp1 = new TopicPartition("one", 0);
    private final TopicPartition tp2 = new TopicPartition("two", 0);
    private final StreamsConfig config = new StreamsConfig((Map)StreamsTestUtils.getStreamsConfig("test-reader"));
    private final MockTime time = new MockTime();
    private final MockStateRestoreListener callback = new MockStateRestoreListener();
    private final KafkaException kaboom = new KafkaException("KABOOM!");
    private final MockStateRestoreListener exceptionCallback = new MockStateRestoreListener(){

        @Override
        public void onRestoreStart(TopicPartition tp, String store, long stOffset, long edOffset) {
            throw StoreChangelogReaderTest.this.kaboom;
        }

        @Override
        public void onBatchRestored(TopicPartition tp, String store, long bedOffset, long numRestored) {
            throw StoreChangelogReaderTest.this.kaboom;
        }

        @Override
        public void onRestoreEnd(TopicPartition tp, String store, long totalRestored) {
            throw StoreChangelogReaderTest.this.kaboom;
        }
    };
    private final MockConsumer<byte[], byte[]> consumer = new MockConsumer(OffsetResetStrategy.EARLIEST);
    private final MockAdminClient adminClient = new MockAdminClient();
    private final StoreChangelogReader changelogReader = new StoreChangelogReader((Time)this.time, this.config, this.logContext, (Admin)this.adminClient, this.consumer, (StateRestoreListener)this.callback);

    @Parameterized.Parameters
    public static Object[] data() {
        return new Object[]{Task.TaskType.STANDBY, Task.TaskType.ACTIVE};
    }

    @Before
    public void setUp() {
        EasyMock.expect((Object)this.stateManager.storeMetadata(this.tp)).andReturn((Object)this.storeMetadata).anyTimes();
        EasyMock.expect((Object)this.stateManager.taskType()).andReturn((Object)this.type).anyTimes();
        EasyMock.expect((Object)this.activeStateManager.storeMetadata(this.tp)).andReturn((Object)this.storeMetadata).anyTimes();
        EasyMock.expect((Object)this.activeStateManager.taskType()).andReturn((Object)Task.TaskType.ACTIVE).anyTimes();
        EasyMock.expect((Object)this.standbyStateManager.storeMetadata(this.tp)).andReturn((Object)this.storeMetadata).anyTimes();
        EasyMock.expect((Object)this.standbyStateManager.taskType()).andReturn((Object)Task.TaskType.STANDBY).anyTimes();
        EasyMock.expect((Object)this.storeMetadata.changelogPartition()).andReturn((Object)this.tp).anyTimes();
        EasyMock.expect((Object)this.storeMetadata.store()).andReturn((Object)this.store).anyTimes();
        EasyMock.expect((Object)this.store.name()).andReturn((Object)"store").anyTimes();
    }

    @After
    public void tearDown() {
        EasyMock.reset((Object[])new Object[]{this.stateManager, this.activeStateManager, this.standbyStateManager, this.storeMetadata, this.storeMetadataOne, this.storeMetadataTwo, this.store});
    }

    @Test
    public void shouldNotRegisterSameStoreMultipleTimes() {
        EasyMock.replay((Object[])new Object[]{this.stateManager, this.storeMetadata});
        this.changelogReader.register(this.tp, this.stateManager);
        Assert.assertEquals((Object)StoreChangelogReader.ChangelogState.REGISTERED, (Object)this.changelogReader.changelogMetadata(this.tp).state());
        Assert.assertNull((Object)this.changelogReader.changelogMetadata(this.tp).endOffset());
        Assert.assertEquals((long)0L, (long)this.changelogReader.changelogMetadata(this.tp).totalRestored());
        Assert.assertThrows(IllegalStateException.class, () -> this.changelogReader.register(this.tp, this.stateManager));
    }

    @Test
    public void shouldNotRegisterStoreWithoutMetadata() {
        EasyMock.replay((Object[])new Object[]{this.stateManager, this.storeMetadata});
        Assert.assertThrows(IllegalStateException.class, () -> this.changelogReader.register(new TopicPartition("ChangelogWithoutStoreMetadata", 0), this.stateManager));
    }

    @Test
    public void shouldSupportUnregisterChangelogBeforeInitialization() {
        Map mockTasks = (Map)this.mock(Map.class);
        EasyMock.expect((Object)((Task)mockTasks.get(null))).andReturn((Object)((Task)this.mock(Task.class))).anyTimes();
        EasyMock.expect((Object)this.storeMetadata.offset()).andReturn((Object)9L).anyTimes();
        EasyMock.expect((Object)this.stateManager.changelogOffsets()).andReturn(Collections.singletonMap(this.tp, 10L));
        EasyMock.replay((Object[])new Object[]{mockTasks, this.stateManager, this.storeMetadata, this.store});
        this.adminClient.updateEndOffsets(Collections.singletonMap(this.tp, 100L));
        StoreChangelogReader changelogReader = new StoreChangelogReader((Time)this.time, this.config, this.logContext, (Admin)this.adminClient, this.consumer, (StateRestoreListener)this.callback);
        changelogReader.register(this.tp, this.stateManager);
        if (this.type == Task.TaskType.STANDBY) {
            changelogReader.transitToUpdateStandby();
        }
        changelogReader.unregister(Collections.singleton(this.tp));
        Assert.assertEquals(Collections.emptySet(), (Object)this.consumer.assignment());
        Assert.assertNull((Object)this.callback.restoreTopicPartition);
        Assert.assertNull((Object)this.callback.storeNameCalledStates.get("restore_start"));
        Assert.assertNull((Object)this.callback.storeNameCalledStates.get("restore_suspended"));
        Assert.assertNull((Object)this.callback.storeNameCalledStates.get("restore_batch"));
    }

    @Test
    public void shouldSupportUnregisterChangelogBeforeCompletion() {
        Map mockTasks = (Map)this.mock(Map.class);
        EasyMock.expect((Object)((Task)mockTasks.get(null))).andReturn((Object)((Task)this.mock(Task.class))).anyTimes();
        EasyMock.expect((Object)mockTasks.containsKey(null)).andReturn((Object)true).anyTimes();
        EasyMock.expect((Object)this.storeMetadata.offset()).andReturn((Object)9L).anyTimes();
        EasyMock.expect((Object)this.stateManager.changelogOffsets()).andReturn(Collections.singletonMap(this.tp, 10L));
        EasyMock.replay((Object[])new Object[]{mockTasks, this.stateManager, this.storeMetadata, this.store});
        this.adminClient.updateEndOffsets(Collections.singletonMap(this.tp, 100L));
        StoreChangelogReader changelogReader = new StoreChangelogReader((Time)this.time, this.config, this.logContext, (Admin)this.adminClient, this.consumer, (StateRestoreListener)this.callback);
        changelogReader.register(this.tp, this.stateManager);
        if (this.type == Task.TaskType.STANDBY) {
            changelogReader.transitToUpdateStandby();
        }
        changelogReader.restore(mockTasks);
        Assert.assertEquals((long)0L, (long)changelogReader.changelogMetadata(this.tp).totalRestored());
        Assert.assertEquals(Collections.emptySet(), (Object)changelogReader.completedChangelogs());
        Assert.assertEquals((long)10L, (long)this.consumer.position(this.tp));
        Assert.assertEquals(Collections.emptySet(), (Object)this.consumer.paused());
        Assert.assertEquals(Collections.singleton(this.tp), (Object)this.consumer.assignment());
        changelogReader.unregister(Collections.singleton(this.tp));
        Assert.assertEquals(Collections.emptySet(), (Object)this.consumer.assignment());
        if (this.type == Task.TaskType.ACTIVE) {
            Assert.assertEquals((Object)this.tp, (Object)this.callback.restoreTopicPartition);
            Assert.assertEquals((Object)"store", (Object)this.callback.storeNameCalledStates.get("restore_start"));
            Assert.assertEquals((Object)"store", (Object)this.callback.storeNameCalledStates.get("restore_suspended"));
        } else {
            Assert.assertNull((Object)this.callback.restoreTopicPartition);
            Assert.assertNull((Object)this.callback.storeNameCalledStates.get("restore_start"));
            Assert.assertNull((Object)this.callback.storeNameCalledStates.get("restore_suspended"));
        }
        Assert.assertNull((Object)this.callback.storeNameCalledStates.get("restore_batch"));
    }

    @Test
    public void shouldSupportUnregisterChangelogAfterCompletion() {
        Map mockTasks = (Map)this.mock(Map.class);
        EasyMock.expect((Object)((Task)mockTasks.get(null))).andReturn((Object)((Task)this.mock(Task.class))).anyTimes();
        EasyMock.expect((Object)mockTasks.containsKey(null)).andReturn((Object)true).anyTimes();
        EasyMock.expect((Object)this.storeMetadata.offset()).andReturn((Object)9L).anyTimes();
        EasyMock.expect((Object)this.stateManager.changelogOffsets()).andReturn(Collections.singletonMap(this.tp, 10L));
        EasyMock.replay((Object[])new Object[]{mockTasks, this.stateManager, this.storeMetadata, this.store});
        this.adminClient.updateEndOffsets(Collections.singletonMap(this.tp, 10L));
        StoreChangelogReader changelogReader = new StoreChangelogReader((Time)this.time, this.config, this.logContext, (Admin)this.adminClient, this.consumer, (StateRestoreListener)this.callback);
        changelogReader.register(this.tp, this.stateManager);
        if (this.type == Task.TaskType.STANDBY) {
            changelogReader.transitToUpdateStandby();
        }
        changelogReader.restore(mockTasks);
        Assert.assertEquals((long)0L, (long)changelogReader.changelogMetadata(this.tp).totalRestored());
        Assert.assertEquals((long)10L, (long)this.consumer.position(this.tp));
        Assert.assertEquals(Collections.singleton(this.tp), (Object)this.consumer.assignment());
        if (this.type == Task.TaskType.ACTIVE) {
            Assert.assertEquals(Collections.singleton(this.tp), (Object)changelogReader.completedChangelogs());
            Assert.assertEquals(Collections.singleton(this.tp), (Object)this.consumer.paused());
        } else {
            Assert.assertEquals(Collections.emptySet(), (Object)changelogReader.completedChangelogs());
            Assert.assertEquals(Collections.emptySet(), (Object)this.consumer.paused());
        }
        changelogReader.unregister(Collections.singleton(this.tp));
        Assert.assertEquals(Collections.emptySet(), (Object)this.consumer.assignment());
        if (this.type == Task.TaskType.ACTIVE) {
            Assert.assertEquals((Object)this.tp, (Object)this.callback.restoreTopicPartition);
            Assert.assertEquals((Object)"store", (Object)this.callback.storeNameCalledStates.get("restore_start"));
            Assert.assertEquals((Object)"store", (Object)this.callback.storeNameCalledStates.get("restore_end"));
            Assert.assertNull((Object)this.callback.storeNameCalledStates.get("restore_suspended"));
            Assert.assertNull((Object)this.callback.storeNameCalledStates.get("restore_batch"));
        }
    }

    @Test
    public void shouldInitializeChangelogAndCheckForCompletion() {
        Map mockTasks = (Map)this.mock(Map.class);
        EasyMock.expect((Object)((Task)mockTasks.get(null))).andReturn((Object)((Task)this.mock(Task.class))).anyTimes();
        EasyMock.expect((Object)mockTasks.containsKey(null)).andReturn((Object)true).anyTimes();
        EasyMock.expect((Object)this.storeMetadata.offset()).andReturn((Object)9L).anyTimes();
        EasyMock.replay((Object[])new Object[]{mockTasks, this.stateManager, this.storeMetadata, this.store});
        this.adminClient.updateEndOffsets(Collections.singletonMap(this.tp, 10L));
        StoreChangelogReader changelogReader = new StoreChangelogReader((Time)this.time, this.config, this.logContext, (Admin)this.adminClient, this.consumer, (StateRestoreListener)this.callback);
        changelogReader.register(this.tp, this.stateManager);
        changelogReader.restore(mockTasks);
        Assert.assertEquals((Object)(this.type == Task.TaskType.ACTIVE ? StoreChangelogReader.ChangelogState.COMPLETED : StoreChangelogReader.ChangelogState.RESTORING), (Object)changelogReader.changelogMetadata(this.tp).state());
        Assert.assertEquals((Object)(this.type == Task.TaskType.ACTIVE ? Long.valueOf(10L) : null), (Object)changelogReader.changelogMetadata(this.tp).endOffset());
        Assert.assertEquals((long)0L, (long)changelogReader.changelogMetadata(this.tp).totalRestored());
        Assert.assertEquals(this.type == Task.TaskType.ACTIVE ? Collections.singleton(this.tp) : Collections.emptySet(), (Object)changelogReader.completedChangelogs());
        Assert.assertEquals((long)10L, (long)this.consumer.position(this.tp));
        Assert.assertEquals(Collections.singleton(this.tp), (Object)this.consumer.paused());
        if (this.type == Task.TaskType.ACTIVE) {
            Assert.assertEquals((Object)this.tp, (Object)this.callback.restoreTopicPartition);
            Assert.assertEquals((Object)"store", (Object)this.callback.storeNameCalledStates.get("restore_start"));
            Assert.assertEquals((Object)"store", (Object)this.callback.storeNameCalledStates.get("restore_end"));
            Assert.assertNull((Object)this.callback.storeNameCalledStates.get("restore_batch"));
        }
    }

    @Test
    public void shouldTriggerRestoreListenerWithOffsetZeroIfPositionThrowsTimeoutException() {
        if (this.type == Task.TaskType.ACTIVE) {
            Map mockTasks = (Map)this.mock(Map.class);
            EasyMock.expect((Object)((Task)mockTasks.get(null))).andReturn((Object)((Task)this.mock(Task.class))).anyTimes();
            EasyMock.expect((Object)mockTasks.containsKey(null)).andReturn((Object)true).anyTimes();
            EasyMock.expect((Object)this.stateManager.changelogOffsets()).andReturn(Collections.singletonMap(this.tp, 5L));
            EasyMock.replay((Object[])new Object[]{mockTasks, this.stateManager, this.storeMetadata, this.store});
            this.adminClient.updateEndOffsets(Collections.singletonMap(this.tp, 10L));
            MockConsumer<byte[], byte[]> consumer = new MockConsumer<byte[], byte[]>(OffsetResetStrategy.EARLIEST){

                public long position(TopicPartition partition) {
                    throw new TimeoutException("KABOOM!");
                }
            };
            consumer.updateBeginningOffsets(Collections.singletonMap(this.tp, 5L));
            StoreChangelogReader changelogReader = new StoreChangelogReader((Time)this.time, this.config, this.logContext, (Admin)this.adminClient, (Consumer)consumer, (StateRestoreListener)this.callback);
            changelogReader.register(this.tp, this.stateManager);
            changelogReader.restore(mockTasks);
            MatcherAssert.assertThat((Object)this.callback.restoreStartOffset, (Matcher)Matchers.equalTo((Object)0L));
        }
    }

    @Test
    public void shouldPollWithRightTimeout() {
        TaskId taskId = new TaskId(0, 0);
        EasyMock.expect((Object)this.storeMetadata.offset()).andReturn(null).andReturn((Object)9L).anyTimes();
        EasyMock.expect((Object)this.stateManager.changelogOffsets()).andReturn(Collections.singletonMap(this.tp, 5L));
        EasyMock.expect((Object)this.stateManager.taskId()).andReturn((Object)taskId).anyTimes();
        EasyMock.replay((Object[])new Object[]{this.stateManager, this.storeMetadata, this.store});
        this.consumer.updateBeginningOffsets(Collections.singletonMap(this.tp, 5L));
        this.adminClient.updateEndOffsets(Collections.singletonMap(this.tp, 11L));
        StoreChangelogReader changelogReader = new StoreChangelogReader((Time)this.time, this.config, this.logContext, (Admin)this.adminClient, this.consumer, (StateRestoreListener)this.callback);
        changelogReader.register(this.tp, this.stateManager);
        if (this.type == Task.TaskType.STANDBY) {
            changelogReader.transitToUpdateStandby();
        }
        changelogReader.restore(Collections.singletonMap(taskId, (Task)this.mock(Task.class)));
        if (this.type == Task.TaskType.ACTIVE) {
            Assert.assertEquals((Object)Duration.ofMillis(this.config.getLong("poll.ms")), (Object)this.consumer.lastPollTimeout());
        } else {
            Assert.assertEquals((Object)Duration.ZERO, (Object)this.consumer.lastPollTimeout());
        }
    }

    @Test
    public void shouldPollWithRightTimeoutWithStateUpdater() {
        TaskId taskId = new TaskId(0, 0);
        EasyMock.expect((Object)this.storeMetadata.offset()).andReturn(null).andReturn((Object)9L).anyTimes();
        EasyMock.expect((Object)this.stateManager.changelogOffsets()).andReturn(Collections.singletonMap(this.tp, 5L));
        EasyMock.expect((Object)this.stateManager.taskId()).andReturn((Object)taskId).anyTimes();
        EasyMock.replay((Object[])new Object[]{this.stateManager, this.storeMetadata, this.store});
        this.consumer.updateBeginningOffsets(Collections.singletonMap(this.tp, 5L));
        this.adminClient.updateEndOffsets(Collections.singletonMap(this.tp, 11L));
        Properties properties = new Properties();
        properties.put("__state.updater.enabled__", (Object)true);
        StreamsConfig config = new StreamsConfig((Map)StreamsTestUtils.getStreamsConfig("test-reader", properties));
        StoreChangelogReader changelogReader = new StoreChangelogReader((Time)this.time, config, this.logContext, (Admin)this.adminClient, this.consumer, (StateRestoreListener)this.callback);
        changelogReader.register(this.tp, this.stateManager);
        if (this.type == Task.TaskType.STANDBY) {
            changelogReader.transitToUpdateStandby();
        }
        changelogReader.restore(Collections.singletonMap(taskId, (Task)this.mock(Task.class)));
        Assert.assertEquals((Object)Duration.ofMillis(config.getLong("poll.ms")), (Object)this.consumer.lastPollTimeout());
    }

    @Test
    public void shouldRestoreFromPositionAndCheckForCompletion() {
        TaskId taskId = new TaskId(0, 0);
        EasyMock.expect((Object)this.storeMetadata.offset()).andReturn((Object)5L).anyTimes();
        EasyMock.expect((Object)this.stateManager.changelogOffsets()).andReturn(Collections.singletonMap(this.tp, 5L));
        EasyMock.expect((Object)this.stateManager.taskId()).andReturn((Object)taskId).anyTimes();
        EasyMock.replay((Object[])new Object[]{this.stateManager, this.storeMetadata, this.store});
        this.adminClient.updateEndOffsets(Collections.singletonMap(this.tp, 10L));
        StoreChangelogReader changelogReader = new StoreChangelogReader((Time)this.time, this.config, this.logContext, (Admin)this.adminClient, this.consumer, (StateRestoreListener)this.callback);
        changelogReader.register(this.tp, this.stateManager);
        if (this.type == Task.TaskType.STANDBY) {
            changelogReader.transitToUpdateStandby();
        }
        changelogReader.restore(Collections.singletonMap(taskId, (Task)this.mock(Task.class)));
        Assert.assertEquals((Object)StoreChangelogReader.ChangelogState.RESTORING, (Object)changelogReader.changelogMetadata(this.tp).state());
        Assert.assertEquals((long)0L, (long)changelogReader.changelogMetadata(this.tp).totalRestored());
        Assert.assertTrue((boolean)changelogReader.completedChangelogs().isEmpty());
        Assert.assertEquals((long)6L, (long)this.consumer.position(this.tp));
        Assert.assertEquals(Collections.emptySet(), (Object)this.consumer.paused());
        if (this.type == Task.TaskType.ACTIVE) {
            Assert.assertEquals((long)10L, (long)changelogReader.changelogMetadata(this.tp).endOffset());
            Assert.assertEquals((Object)this.tp, (Object)this.callback.restoreTopicPartition);
            Assert.assertEquals((Object)"store", (Object)this.callback.storeNameCalledStates.get("restore_start"));
            Assert.assertNull((Object)this.callback.storeNameCalledStates.get("restore_end"));
            Assert.assertNull((Object)this.callback.storeNameCalledStates.get("restore_batch"));
        } else {
            Assert.assertNull((Object)changelogReader.changelogMetadata(this.tp).endOffset());
        }
        this.consumer.addRecord(new ConsumerRecord("topic", 0, 6L, (Object)"key".getBytes(), (Object)"value".getBytes()));
        this.consumer.addRecord(new ConsumerRecord("topic", 0, 7L, (Object)"key".getBytes(), (Object)"value".getBytes()));
        this.consumer.addRecord(new ConsumerRecord("topic", 0, 8L, null, (Object)"value".getBytes()));
        this.consumer.addRecord(new ConsumerRecord("topic", 0, 9L, (Object)"key".getBytes(), (Object)"value".getBytes()));
        this.consumer.addRecord(new ConsumerRecord("topic", 0, 11L, (Object)"key".getBytes(), (Object)"value".getBytes()));
        changelogReader.restore(Collections.singletonMap(taskId, (Task)this.mock(Task.class)));
        Assert.assertEquals((long)12L, (long)this.consumer.position(this.tp));
        if (this.type == Task.TaskType.ACTIVE) {
            Assert.assertEquals((Object)StoreChangelogReader.ChangelogState.COMPLETED, (Object)changelogReader.changelogMetadata(this.tp).state());
            Assert.assertEquals((long)3L, (long)changelogReader.changelogMetadata(this.tp).totalRestored());
            Assert.assertEquals((long)1L, (long)changelogReader.changelogMetadata(this.tp).bufferedRecords().size());
            Assert.assertEquals(Collections.singleton(this.tp), (Object)changelogReader.completedChangelogs());
            Assert.assertEquals(Collections.singleton(this.tp), (Object)this.consumer.paused());
            Assert.assertEquals((Object)"store", (Object)this.callback.storeNameCalledStates.get("restore_batch"));
            Assert.assertEquals((Object)"store", (Object)this.callback.storeNameCalledStates.get("restore_end"));
        } else {
            Assert.assertEquals((Object)StoreChangelogReader.ChangelogState.RESTORING, (Object)changelogReader.changelogMetadata(this.tp).state());
            Assert.assertEquals((long)4L, (long)changelogReader.changelogMetadata(this.tp).totalRestored());
            Assert.assertEquals((long)0L, (long)changelogReader.changelogMetadata(this.tp).bufferedRecords().size());
            Assert.assertEquals(Collections.emptySet(), (Object)changelogReader.completedChangelogs());
            Assert.assertEquals(Collections.emptySet(), (Object)this.consumer.paused());
        }
    }

    @Test
    public void shouldRestoreFromBeginningAndCheckCompletion() {
        TaskId taskId = new TaskId(0, 0);
        EasyMock.expect((Object)this.storeMetadata.offset()).andReturn(null).andReturn((Object)9L).anyTimes();
        EasyMock.expect((Object)this.stateManager.changelogOffsets()).andReturn(Collections.singletonMap(this.tp, 5L));
        EasyMock.expect((Object)this.stateManager.taskId()).andReturn((Object)taskId).anyTimes();
        EasyMock.replay((Object[])new Object[]{this.stateManager, this.storeMetadata, this.store});
        this.consumer.updateBeginningOffsets(Collections.singletonMap(this.tp, 5L));
        this.adminClient.updateEndOffsets(Collections.singletonMap(this.tp, 11L));
        StoreChangelogReader changelogReader = new StoreChangelogReader((Time)this.time, this.config, this.logContext, (Admin)this.adminClient, this.consumer, (StateRestoreListener)this.callback);
        changelogReader.register(this.tp, this.stateManager);
        if (this.type == Task.TaskType.STANDBY) {
            changelogReader.transitToUpdateStandby();
        }
        changelogReader.restore(Collections.singletonMap(taskId, (Task)this.mock(Task.class)));
        Assert.assertEquals((Object)StoreChangelogReader.ChangelogState.RESTORING, (Object)changelogReader.changelogMetadata(this.tp).state());
        Assert.assertEquals((long)0L, (long)changelogReader.changelogMetadata(this.tp).totalRestored());
        Assert.assertEquals((long)5L, (long)this.consumer.position(this.tp));
        Assert.assertEquals(Collections.emptySet(), (Object)this.consumer.paused());
        if (this.type == Task.TaskType.ACTIVE) {
            Assert.assertEquals((long)11L, (long)changelogReader.changelogMetadata(this.tp).endOffset());
            Assert.assertEquals((Object)this.tp, (Object)this.callback.restoreTopicPartition);
            Assert.assertEquals((Object)"store", (Object)this.callback.storeNameCalledStates.get("restore_start"));
            Assert.assertNull((Object)this.callback.storeNameCalledStates.get("restore_end"));
            Assert.assertNull((Object)this.callback.storeNameCalledStates.get("restore_batch"));
        } else {
            Assert.assertNull((Object)changelogReader.changelogMetadata(this.tp).endOffset());
        }
        this.consumer.addRecord(new ConsumerRecord("topic", 0, 6L, (Object)"key".getBytes(), (Object)"value".getBytes()));
        this.consumer.addRecord(new ConsumerRecord("topic", 0, 7L, (Object)"key".getBytes(), (Object)"value".getBytes()));
        this.consumer.addRecord(new ConsumerRecord("topic", 0, 8L, null, (Object)"value".getBytes()));
        this.consumer.addRecord(new ConsumerRecord("topic", 0, 9L, (Object)"key".getBytes(), (Object)"value".getBytes()));
        changelogReader.restore(Collections.singletonMap(taskId, (Task)this.mock(Task.class)));
        Assert.assertEquals((Object)StoreChangelogReader.ChangelogState.RESTORING, (Object)changelogReader.changelogMetadata(this.tp).state());
        Assert.assertEquals((long)3L, (long)changelogReader.changelogMetadata(this.tp).totalRestored());
        Assert.assertEquals((long)0L, (long)changelogReader.changelogMetadata(this.tp).bufferedRecords().size());
        Assert.assertEquals((long)0L, (long)changelogReader.changelogMetadata(this.tp).bufferedLimitIndex());
        this.consumer.seek(this.tp, 11L);
        changelogReader.restore(Collections.singletonMap(taskId, (Task)this.mock(Task.class)));
        Assert.assertEquals((long)11L, (long)this.consumer.position(this.tp));
        Assert.assertEquals((long)3L, (long)changelogReader.changelogMetadata(this.tp).totalRestored());
        if (this.type == Task.TaskType.ACTIVE) {
            Assert.assertEquals((Object)StoreChangelogReader.ChangelogState.COMPLETED, (Object)changelogReader.changelogMetadata(this.tp).state());
            Assert.assertEquals((long)3L, (long)changelogReader.changelogMetadata(this.tp).totalRestored());
            Assert.assertEquals(Collections.singleton(this.tp), (Object)changelogReader.completedChangelogs());
            Assert.assertEquals(Collections.singleton(this.tp), (Object)this.consumer.paused());
            Assert.assertEquals((Object)"store", (Object)this.callback.storeNameCalledStates.get("restore_batch"));
            Assert.assertEquals((Object)"store", (Object)this.callback.storeNameCalledStates.get("restore_end"));
        } else {
            Assert.assertEquals((Object)StoreChangelogReader.ChangelogState.RESTORING, (Object)changelogReader.changelogMetadata(this.tp).state());
            Assert.assertEquals(Collections.emptySet(), (Object)changelogReader.completedChangelogs());
            Assert.assertEquals(Collections.emptySet(), (Object)this.consumer.paused());
        }
    }

    @Test
    public void shouldCheckCompletionIfPositionLargerThanEndOffset() {
        Map mockTasks = (Map)this.mock(Map.class);
        EasyMock.expect((Object)((Task)mockTasks.get(null))).andReturn((Object)((Task)this.mock(Task.class))).anyTimes();
        EasyMock.expect((Object)mockTasks.containsKey(null)).andReturn((Object)true).anyTimes();
        EasyMock.expect((Object)this.storeMetadata.offset()).andReturn((Object)5L).anyTimes();
        EasyMock.replay((Object[])new Object[]{mockTasks, this.activeStateManager, this.storeMetadata, this.store});
        this.adminClient.updateEndOffsets(Collections.singletonMap(this.tp, 0L));
        StoreChangelogReader changelogReader = new StoreChangelogReader((Time)this.time, this.config, this.logContext, (Admin)this.adminClient, this.consumer, (StateRestoreListener)this.callback);
        changelogReader.register(this.tp, this.activeStateManager);
        changelogReader.restore(mockTasks);
        Assert.assertEquals((Object)StoreChangelogReader.ChangelogState.COMPLETED, (Object)changelogReader.changelogMetadata(this.tp).state());
        Assert.assertEquals((long)0L, (long)changelogReader.changelogMetadata(this.tp).endOffset());
        Assert.assertEquals((long)0L, (long)changelogReader.changelogMetadata(this.tp).totalRestored());
        Assert.assertEquals(Collections.singleton(this.tp), (Object)changelogReader.completedChangelogs());
        Assert.assertEquals((long)6L, (long)this.consumer.position(this.tp));
        Assert.assertEquals(Collections.singleton(this.tp), (Object)this.consumer.paused());
        Assert.assertEquals((Object)this.tp, (Object)this.callback.restoreTopicPartition);
        Assert.assertEquals((Object)"store", (Object)this.callback.storeNameCalledStates.get("restore_start"));
        Assert.assertEquals((Object)"store", (Object)this.callback.storeNameCalledStates.get("restore_end"));
        Assert.assertNull((Object)this.callback.storeNameCalledStates.get("restore_batch"));
    }

    @Test
    public void shouldRequestPositionAndHandleTimeoutException() {
        TaskId taskId = new TaskId(0, 0);
        Task mockTask = (Task)this.mock(Task.class);
        mockTask.clearTaskTimeout();
        mockTask.maybeInitTaskTimeoutOrThrow(EasyMock.anyLong(), (Exception)EasyMock.anyObject());
        EasyMock.expectLastCall();
        mockTask.recordRestoration((Time)EasyMock.anyObject(), EasyMock.anyLong(), EasyMock.anyBoolean());
        EasyMock.expectLastCall();
        EasyMock.expect((Object)this.storeMetadata.offset()).andReturn((Object)10L).anyTimes();
        EasyMock.expect((Object)this.activeStateManager.changelogOffsets()).andReturn(Collections.singletonMap(this.tp, 10L));
        EasyMock.expect((Object)this.activeStateManager.taskId()).andReturn((Object)taskId).anyTimes();
        EasyMock.replay((Object[])new Object[]{mockTask, this.activeStateManager, this.storeMetadata, this.store});
        final AtomicBoolean clearException = new AtomicBoolean(false);
        MockConsumer<byte[], byte[]> consumer = new MockConsumer<byte[], byte[]>(OffsetResetStrategy.EARLIEST){

            public long position(TopicPartition partition) {
                if (clearException.get()) {
                    return 10L;
                }
                throw new TimeoutException("KABOOM!");
            }
        };
        this.adminClient.updateEndOffsets(Collections.singletonMap(this.tp, 10L));
        StoreChangelogReader changelogReader = new StoreChangelogReader((Time)this.time, this.config, this.logContext, (Admin)this.adminClient, (Consumer)consumer, (StateRestoreListener)this.callback);
        changelogReader.register(this.tp, this.activeStateManager);
        changelogReader.restore(Collections.singletonMap(taskId, mockTask));
        Assert.assertEquals((Object)StoreChangelogReader.ChangelogState.RESTORING, (Object)changelogReader.changelogMetadata(this.tp).state());
        Assert.assertTrue((boolean)changelogReader.completedChangelogs().isEmpty());
        Assert.assertEquals((long)10L, (long)changelogReader.changelogMetadata(this.tp).endOffset());
        EasyMock.verify((Object[])new Object[]{mockTask});
        clearException.set(true);
        EasyMock.resetToDefault((Object[])new Object[]{mockTask});
        mockTask.clearTaskTimeout();
        EasyMock.expectLastCall();
        EasyMock.replay((Object[])new Object[]{mockTask});
        changelogReader.restore(Collections.singletonMap(taskId, mockTask));
        Assert.assertEquals((Object)StoreChangelogReader.ChangelogState.COMPLETED, (Object)changelogReader.changelogMetadata(this.tp).state());
        Assert.assertEquals((long)10L, (long)changelogReader.changelogMetadata(this.tp).endOffset());
        Assert.assertEquals(Collections.singleton(this.tp), (Object)changelogReader.completedChangelogs());
        Assert.assertEquals((long)10L, (long)consumer.position(this.tp));
        EasyMock.verify((Object[])new Object[]{mockTask});
    }

    @Test
    public void shouldThrowIfPositionFail() {
        TaskId taskId = new TaskId(0, 0);
        EasyMock.expect((Object)this.activeStateManager.taskId()).andReturn((Object)taskId);
        EasyMock.expect((Object)this.storeMetadata.offset()).andReturn((Object)10L).anyTimes();
        EasyMock.replay((Object[])new Object[]{this.activeStateManager, this.storeMetadata, this.store});
        MockConsumer<byte[], byte[]> consumer = new MockConsumer<byte[], byte[]>(OffsetResetStrategy.EARLIEST){

            public long position(TopicPartition partition) {
                throw StoreChangelogReaderTest.this.kaboom;
            }
        };
        this.adminClient.updateEndOffsets(Collections.singletonMap(this.tp, 10L));
        StoreChangelogReader changelogReader = new StoreChangelogReader((Time)this.time, this.config, this.logContext, (Admin)this.adminClient, (Consumer)consumer, (StateRestoreListener)this.callback);
        changelogReader.register(this.tp, this.activeStateManager);
        StreamsException thrown = (StreamsException)Assert.assertThrows(StreamsException.class, () -> changelogReader.restore(Collections.singletonMap(taskId, (Task)this.mock(Task.class))));
        Assert.assertEquals((Object)this.kaboom, (Object)thrown.getCause());
    }

    @Test
    public void shouldRequestEndOffsetsAndHandleTimeoutException() {
        TaskId taskId = new TaskId(0, 0);
        Task mockTask = (Task)this.niceMock(Task.class);
        mockTask.maybeInitTaskTimeoutOrThrow(EasyMock.anyLong(), (Exception)EasyMock.anyObject());
        EasyMock.expectLastCall();
        EasyMock.expect((Object)this.storeMetadata.offset()).andReturn((Object)5L).anyTimes();
        EasyMock.expect((Object)this.activeStateManager.changelogOffsets()).andReturn(Collections.singletonMap(this.tp, 5L));
        EasyMock.expect((Object)this.activeStateManager.taskId()).andReturn((Object)taskId).anyTimes();
        EasyMock.replay((Object[])new Object[]{mockTask, this.activeStateManager, this.storeMetadata, this.store});
        final AtomicBoolean functionCalled = new AtomicBoolean(false);
        MockAdminClient adminClient = new MockAdminClient(){

            public ListOffsetsResult listOffsets(Map<TopicPartition, OffsetSpec> topicPartitionOffsets, ListOffsetsOptions options) {
                if (functionCalled.get()) {
                    return super.listOffsets(topicPartitionOffsets, options);
                }
                functionCalled.set(true);
                throw new TimeoutException("KABOOM!");
            }
        };
        adminClient.updateEndOffsets(Collections.singletonMap(this.tp, 10L));
        MockConsumer<byte[], byte[]> consumer = new MockConsumer<byte[], byte[]>(OffsetResetStrategy.EARLIEST){

            public Map<TopicPartition, OffsetAndMetadata> committed(Set<TopicPartition> partitions) {
                throw new AssertionError((Object)"Should not trigger this function");
            }
        };
        StoreChangelogReader changelogReader = new StoreChangelogReader((Time)this.time, this.config, this.logContext, (Admin)adminClient, (Consumer)consumer, (StateRestoreListener)this.callback);
        changelogReader.register(this.tp, this.activeStateManager);
        changelogReader.restore(Collections.singletonMap(taskId, mockTask));
        Assert.assertEquals((Object)StoreChangelogReader.ChangelogState.REGISTERED, (Object)changelogReader.changelogMetadata(this.tp).state());
        Assert.assertNull((Object)changelogReader.changelogMetadata(this.tp).endOffset());
        Assert.assertTrue((boolean)functionCalled.get());
        EasyMock.verify((Object[])new Object[]{mockTask});
        EasyMock.resetToDefault((Object[])new Object[]{mockTask});
        mockTask.clearTaskTimeout();
        mockTask.recordRestoration((Time)EasyMock.anyObject(), EasyMock.anyLong(), EasyMock.anyBoolean());
        EasyMock.expectLastCall();
        EasyMock.replay((Object[])new Object[]{mockTask});
        changelogReader.restore(Collections.singletonMap(taskId, mockTask));
        Assert.assertEquals((Object)StoreChangelogReader.ChangelogState.RESTORING, (Object)changelogReader.changelogMetadata(this.tp).state());
        Assert.assertEquals((long)10L, (long)changelogReader.changelogMetadata(this.tp).endOffset());
        Assert.assertEquals((long)6L, (long)consumer.position(this.tp));
        EasyMock.verify((Object[])new Object[]{mockTask});
    }

    @Test
    public void shouldThrowIfEndOffsetsFail() {
        TaskId taskId = new TaskId(0, 0);
        EasyMock.expect((Object)this.storeMetadata.offset()).andReturn((Object)10L).anyTimes();
        EasyMock.expect((Object)this.activeStateManager.taskId()).andReturn((Object)taskId).anyTimes();
        EasyMock.replay((Object[])new Object[]{this.activeStateManager, this.storeMetadata, this.store});
        MockAdminClient adminClient = new MockAdminClient(){

            public ListOffsetsResult listOffsets(Map<TopicPartition, OffsetSpec> topicPartitionOffsets, ListOffsetsOptions options) {
                throw StoreChangelogReaderTest.this.kaboom;
            }
        };
        adminClient.updateEndOffsets(Collections.singletonMap(this.tp, 0L));
        StoreChangelogReader changelogReader = new StoreChangelogReader((Time)this.time, this.config, this.logContext, (Admin)adminClient, this.consumer, (StateRestoreListener)this.callback);
        changelogReader.register(this.tp, this.activeStateManager);
        StreamsException thrown = (StreamsException)Assert.assertThrows(StreamsException.class, () -> changelogReader.restore(Collections.singletonMap(taskId, (Task)this.mock(Task.class))));
        Assert.assertEquals((Object)this.kaboom, (Object)thrown.getCause());
    }

    @Test
    public void shouldRequestCommittedOffsetsAndHandleTimeoutException() {
        TaskId taskId = new TaskId(0, 0);
        Task mockTask = (Task)this.mock(Task.class);
        if (this.type == Task.TaskType.ACTIVE) {
            mockTask.clearTaskTimeout();
        }
        mockTask.maybeInitTaskTimeoutOrThrow(EasyMock.anyLong(), (Exception)EasyMock.anyObject());
        EasyMock.expectLastCall();
        EasyMock.expect((Object)this.stateManager.changelogAsSource(this.tp)).andReturn((Object)true).anyTimes();
        EasyMock.expect((Object)this.storeMetadata.offset()).andReturn((Object)5L).anyTimes();
        EasyMock.expect((Object)this.stateManager.changelogOffsets()).andReturn(Collections.singletonMap(this.tp, 5L));
        EasyMock.expect((Object)this.stateManager.taskId()).andReturn((Object)taskId).anyTimes();
        EasyMock.replay((Object[])new Object[]{mockTask, this.stateManager, this.storeMetadata, this.store});
        final AtomicBoolean functionCalled = new AtomicBoolean(false);
        MockAdminClient adminClient = new MockAdminClient(){

            public synchronized ListConsumerGroupOffsetsResult listConsumerGroupOffsets(Map<String, ListConsumerGroupOffsetsSpec> groupSpecs, ListConsumerGroupOffsetsOptions options) {
                if (functionCalled.get()) {
                    return super.listConsumerGroupOffsets(groupSpecs, options);
                }
                functionCalled.set(true);
                return AdminClientTestUtils.listConsumerGroupOffsetsResult((String)groupSpecs.keySet().iterator().next(), (KafkaException)new TimeoutException("KABOOM!"));
            }
        };
        adminClient.updateEndOffsets(Collections.singletonMap(this.tp, 20L));
        adminClient.updateConsumerGroupOffsets(Collections.singletonMap(this.tp, 10L));
        StoreChangelogReader changelogReader = new StoreChangelogReader((Time)this.time, this.config, this.logContext, (Admin)adminClient, this.consumer, (StateRestoreListener)this.callback);
        changelogReader.register(this.tp, this.stateManager);
        changelogReader.restore(Collections.singletonMap(taskId, mockTask));
        Assert.assertEquals((Object)(this.type == Task.TaskType.ACTIVE ? StoreChangelogReader.ChangelogState.REGISTERED : StoreChangelogReader.ChangelogState.RESTORING), (Object)changelogReader.changelogMetadata(this.tp).state());
        if (this.type == Task.TaskType.ACTIVE) {
            Assert.assertNull((Object)changelogReader.changelogMetadata(this.tp).endOffset());
        } else {
            Assert.assertEquals((long)0L, (long)changelogReader.changelogMetadata(this.tp).endOffset());
        }
        Assert.assertTrue((boolean)functionCalled.get());
        EasyMock.verify((Object[])new Object[]{mockTask});
        EasyMock.resetToDefault((Object[])new Object[]{mockTask});
        if (this.type == Task.TaskType.ACTIVE) {
            mockTask.clearTaskTimeout();
            mockTask.clearTaskTimeout();
            EasyMock.expectLastCall();
            mockTask.recordRestoration((Time)EasyMock.anyObject(), EasyMock.anyLong(), EasyMock.anyBoolean());
            EasyMock.expectLastCall();
        }
        EasyMock.replay((Object[])new Object[]{mockTask});
        changelogReader.restore(Collections.singletonMap(taskId, mockTask));
        Assert.assertEquals((Object)StoreChangelogReader.ChangelogState.RESTORING, (Object)changelogReader.changelogMetadata(this.tp).state());
        Assert.assertEquals((long)(this.type == Task.TaskType.ACTIVE ? 10L : 0L), (long)changelogReader.changelogMetadata(this.tp).endOffset());
        Assert.assertEquals((long)6L, (long)this.consumer.position(this.tp));
        EasyMock.verify((Object[])new Object[]{mockTask});
    }

    @Test
    public void shouldThrowIfCommittedOffsetsFail() {
        TaskId taskId = new TaskId(0, 0);
        EasyMock.expect((Object)this.stateManager.taskId()).andReturn((Object)taskId);
        EasyMock.expect((Object)this.stateManager.changelogAsSource(this.tp)).andReturn((Object)true).anyTimes();
        EasyMock.expect((Object)this.storeMetadata.offset()).andReturn((Object)10L).anyTimes();
        EasyMock.replay((Object[])new Object[]{this.stateManager, this.storeMetadata, this.store});
        MockAdminClient adminClient = new MockAdminClient(){

            public synchronized ListConsumerGroupOffsetsResult listConsumerGroupOffsets(Map<String, ListConsumerGroupOffsetsSpec> groupSpecs, ListConsumerGroupOffsetsOptions options) {
                throw StoreChangelogReaderTest.this.kaboom;
            }
        };
        adminClient.updateEndOffsets(Collections.singletonMap(this.tp, 10L));
        StoreChangelogReader changelogReader = new StoreChangelogReader((Time)this.time, this.config, this.logContext, (Admin)adminClient, this.consumer, (StateRestoreListener)this.callback);
        changelogReader.register(this.tp, this.stateManager);
        StreamsException thrown = (StreamsException)Assert.assertThrows(StreamsException.class, () -> changelogReader.restore(Collections.singletonMap(taskId, (Task)this.mock(Task.class))));
        Assert.assertEquals((Object)this.kaboom, (Object)thrown.getCause());
    }

    @Test
    public void shouldThrowIfUnsubscribeFail() {
        EasyMock.replay((Object[])new Object[]{this.stateManager, this.storeMetadata, this.store});
        MockConsumer<byte[], byte[]> consumer = new MockConsumer<byte[], byte[]>(OffsetResetStrategy.EARLIEST){

            public void unsubscribe() {
                throw StoreChangelogReaderTest.this.kaboom;
            }
        };
        StoreChangelogReader changelogReader = new StoreChangelogReader((Time)this.time, this.config, this.logContext, (Admin)this.adminClient, (Consumer)consumer, (StateRestoreListener)this.callback);
        StreamsException thrown = (StreamsException)Assert.assertThrows(StreamsException.class, () -> ((StoreChangelogReader)changelogReader).clear());
        Assert.assertEquals((Object)this.kaboom, (Object)thrown.getCause());
    }

    @Test
    public void shouldOnlyRestoreStandbyChangelogInUpdateStandbyState() {
        Map mockTasks = (Map)this.mock(Map.class);
        EasyMock.expect((Object)((Task)mockTasks.get(null))).andReturn((Object)((Task)this.mock(Task.class))).anyTimes();
        EasyMock.expect((Object)mockTasks.containsKey(null)).andReturn((Object)true).anyTimes();
        EasyMock.replay((Object[])new Object[]{mockTasks, this.standbyStateManager, this.storeMetadata, this.store});
        this.consumer.updateBeginningOffsets(Collections.singletonMap(this.tp, 5L));
        this.changelogReader.register(this.tp, this.standbyStateManager);
        this.changelogReader.restore(mockTasks);
        Assert.assertNull((Object)this.callback.restoreTopicPartition);
        Assert.assertNull((Object)this.callback.storeNameCalledStates.get("restore_start"));
        Assert.assertEquals((Object)StoreChangelogReader.ChangelogState.RESTORING, (Object)this.changelogReader.changelogMetadata(this.tp).state());
        Assert.assertNull((Object)this.changelogReader.changelogMetadata(this.tp).endOffset());
        Assert.assertEquals((long)0L, (long)this.changelogReader.changelogMetadata(this.tp).totalRestored());
        this.consumer.addRecord(new ConsumerRecord("topic", 0, 6L, (Object)"key".getBytes(), (Object)"value".getBytes()));
        this.consumer.addRecord(new ConsumerRecord("topic", 0, 7L, (Object)"key".getBytes(), (Object)"value".getBytes()));
        this.consumer.addRecord(new ConsumerRecord("topic", 0, 8L, null, (Object)"value".getBytes()));
        this.consumer.addRecord(new ConsumerRecord("topic", 0, 9L, (Object)"key".getBytes(), (Object)"value".getBytes()));
        this.consumer.addRecord(new ConsumerRecord("topic", 0, 10L, (Object)"key".getBytes(), (Object)"value".getBytes()));
        this.consumer.addRecord(new ConsumerRecord("topic", 0, 11L, (Object)"key".getBytes(), (Object)"value".getBytes()));
        this.changelogReader.restore(mockTasks);
        Assert.assertEquals((Object)StoreChangelogReader.ChangelogState.RESTORING, (Object)this.changelogReader.changelogMetadata(this.tp).state());
        Assert.assertEquals((long)0L, (long)this.changelogReader.changelogMetadata(this.tp).totalRestored());
        Assert.assertTrue((boolean)this.changelogReader.changelogMetadata(this.tp).bufferedRecords().isEmpty());
        Assert.assertEquals(Collections.singleton(this.tp), (Object)this.consumer.paused());
        this.changelogReader.transitToUpdateStandby();
        this.changelogReader.restore(mockTasks);
        Assert.assertEquals((Object)StoreChangelogReader.ChangelogState.RESTORING, (Object)this.changelogReader.changelogMetadata(this.tp).state());
        Assert.assertEquals((long)5L, (long)this.changelogReader.changelogMetadata(this.tp).totalRestored());
        Assert.assertTrue((boolean)this.changelogReader.changelogMetadata(this.tp).bufferedRecords().isEmpty());
    }

    @Test
    public void shouldNotUpdateLimitForNonSourceStandbyChangelog() {
        Map mockTasks = (Map)this.mock(Map.class);
        EasyMock.expect((Object)((Task)mockTasks.get(null))).andReturn((Object)((Task)this.mock(Task.class))).anyTimes();
        EasyMock.expect((Object)mockTasks.containsKey(null)).andReturn((Object)true).anyTimes();
        EasyMock.expect((Object)this.standbyStateManager.changelogAsSource(this.tp)).andReturn((Object)false).anyTimes();
        EasyMock.replay((Object[])new Object[]{mockTasks, this.standbyStateManager, this.storeMetadata, this.store});
        MockAdminClient adminClient = new MockAdminClient(){

            public synchronized ListConsumerGroupOffsetsResult listConsumerGroupOffsets(Map<String, ListConsumerGroupOffsetsSpec> groupSpecs, ListConsumerGroupOffsetsOptions options) {
                throw new AssertionError((Object)"Should not try to fetch committed offsets");
            }
        };
        Properties properties = new Properties();
        properties.put("commit.interval.ms", (Object)100L);
        StreamsConfig config = new StreamsConfig((Map)StreamsTestUtils.getStreamsConfig("test-reader", properties));
        StoreChangelogReader changelogReader = new StoreChangelogReader((Time)this.time, config, this.logContext, (Admin)adminClient, this.consumer, (StateRestoreListener)this.callback);
        changelogReader.transitToUpdateStandby();
        this.consumer.updateBeginningOffsets(Collections.singletonMap(this.tp, 5L));
        changelogReader.register(this.tp, this.standbyStateManager);
        Assert.assertNull((Object)changelogReader.changelogMetadata(this.tp).endOffset());
        Assert.assertEquals((long)0L, (long)changelogReader.changelogMetadata(this.tp).totalRestored());
        changelogReader.restore(mockTasks);
        Assert.assertNull((Object)this.callback.restoreTopicPartition);
        Assert.assertNull((Object)this.callback.storeNameCalledStates.get("restore_start"));
        Assert.assertEquals((Object)StoreChangelogReader.ChangelogState.RESTORING, (Object)changelogReader.changelogMetadata(this.tp).state());
        Assert.assertNull((Object)changelogReader.changelogMetadata(this.tp).endOffset());
        Assert.assertEquals((long)0L, (long)changelogReader.changelogMetadata(this.tp).totalRestored());
        this.consumer.addRecord(new ConsumerRecord("topic", 0, 5L, (Object)"key".getBytes(), (Object)"value".getBytes()));
        this.consumer.addRecord(new ConsumerRecord("topic", 0, 6L, (Object)"key".getBytes(), (Object)"value".getBytes()));
        this.consumer.addRecord(new ConsumerRecord("topic", 0, 7L, (Object)"key".getBytes(), (Object)"value".getBytes()));
        this.consumer.addRecord(new ConsumerRecord("topic", 0, 8L, null, (Object)"value".getBytes()));
        this.consumer.addRecord(new ConsumerRecord("topic", 0, 9L, (Object)"key".getBytes(), (Object)"value".getBytes()));
        this.consumer.addRecord(new ConsumerRecord("topic", 0, 10L, (Object)"key".getBytes(), (Object)"value".getBytes()));
        this.consumer.addRecord(new ConsumerRecord("topic", 0, 11L, (Object)"key".getBytes(), (Object)"value".getBytes()));
        changelogReader.restore(mockTasks);
        Assert.assertEquals((Object)StoreChangelogReader.ChangelogState.RESTORING, (Object)changelogReader.changelogMetadata(this.tp).state());
        Assert.assertNull((Object)changelogReader.changelogMetadata(this.tp).endOffset());
        Assert.assertEquals((long)6L, (long)changelogReader.changelogMetadata(this.tp).totalRestored());
        Assert.assertEquals((long)0L, (long)changelogReader.changelogMetadata(this.tp).bufferedRecords().size());
        Assert.assertEquals((long)0L, (long)changelogReader.changelogMetadata(this.tp).bufferedLimitIndex());
        Assert.assertNull((Object)this.callback.storeNameCalledStates.get("restore_end"));
        Assert.assertNull((Object)this.callback.storeNameCalledStates.get("restore_batch"));
    }

    @Test
    public void shouldRestoreToLimitInStandbyState() {
        Map mockTasks = (Map)this.mock(Map.class);
        EasyMock.expect((Object)((Task)mockTasks.get(null))).andReturn((Object)((Task)this.mock(Task.class))).anyTimes();
        EasyMock.expect((Object)mockTasks.containsKey(null)).andReturn((Object)true).anyTimes();
        EasyMock.expect((Object)this.standbyStateManager.changelogAsSource(this.tp)).andReturn((Object)true).anyTimes();
        EasyMock.replay((Object[])new Object[]{mockTasks, this.standbyStateManager, this.storeMetadata, this.store});
        long now = this.time.milliseconds();
        Properties properties = new Properties();
        properties.put("commit.interval.ms", (Object)100L);
        StreamsConfig config = new StreamsConfig((Map)StreamsTestUtils.getStreamsConfig("test-reader", properties));
        StoreChangelogReader changelogReader = new StoreChangelogReader((Time)this.time, config, this.logContext, (Admin)this.adminClient, this.consumer, (StateRestoreListener)this.callback);
        changelogReader.transitToUpdateStandby();
        this.consumer.updateBeginningOffsets(Collections.singletonMap(this.tp, 5L));
        this.adminClient.updateConsumerGroupOffsets(Collections.singletonMap(this.tp, 7L));
        changelogReader.register(this.tp, this.standbyStateManager);
        Assert.assertEquals((long)0L, (long)changelogReader.changelogMetadata(this.tp).endOffset());
        Assert.assertEquals((long)0L, (long)changelogReader.changelogMetadata(this.tp).totalRestored());
        changelogReader.restore(mockTasks);
        Assert.assertNull((Object)this.callback.restoreTopicPartition);
        Assert.assertNull((Object)this.callback.storeNameCalledStates.get("restore_start"));
        Assert.assertEquals((Object)StoreChangelogReader.ChangelogState.RESTORING, (Object)changelogReader.changelogMetadata(this.tp).state());
        Assert.assertEquals((long)7L, (long)changelogReader.changelogMetadata(this.tp).endOffset());
        Assert.assertEquals((long)0L, (long)changelogReader.changelogMetadata(this.tp).totalRestored());
        this.consumer.addRecord(new ConsumerRecord("topic", 0, 5L, (Object)"key".getBytes(), (Object)"value".getBytes()));
        this.consumer.addRecord(new ConsumerRecord("topic", 0, 6L, (Object)"key".getBytes(), (Object)"value".getBytes()));
        this.consumer.addRecord(new ConsumerRecord("topic", 0, 7L, (Object)"key".getBytes(), (Object)"value".getBytes()));
        this.consumer.addRecord(new ConsumerRecord("topic", 0, 8L, null, (Object)"value".getBytes()));
        this.consumer.addRecord(new ConsumerRecord("topic", 0, 9L, (Object)"key".getBytes(), (Object)"value".getBytes()));
        this.consumer.addRecord(new ConsumerRecord("topic", 0, 10L, (Object)"key".getBytes(), (Object)"value".getBytes()));
        this.consumer.addRecord(new ConsumerRecord("topic", 0, 11L, (Object)"key".getBytes(), (Object)"value".getBytes()));
        changelogReader.restore(mockTasks);
        Assert.assertEquals((Object)StoreChangelogReader.ChangelogState.RESTORING, (Object)changelogReader.changelogMetadata(this.tp).state());
        Assert.assertEquals((long)7L, (long)changelogReader.changelogMetadata(this.tp).endOffset());
        Assert.assertEquals((long)2L, (long)changelogReader.changelogMetadata(this.tp).totalRestored());
        Assert.assertEquals((long)4L, (long)changelogReader.changelogMetadata(this.tp).bufferedRecords().size());
        Assert.assertEquals((long)0L, (long)changelogReader.changelogMetadata(this.tp).bufferedLimitIndex());
        Assert.assertNull((Object)this.callback.storeNameCalledStates.get("restore_end"));
        Assert.assertNull((Object)this.callback.storeNameCalledStates.get("restore_batch"));
        this.adminClient.updateConsumerGroupOffsets(Collections.singletonMap(this.tp, 10L));
        this.time.setCurrentTimeMs(now + 100L);
        changelogReader.restore(mockTasks);
        Assert.assertEquals((long)7L, (long)changelogReader.changelogMetadata(this.tp).endOffset());
        Assert.assertEquals((long)2L, (long)changelogReader.changelogMetadata(this.tp).totalRestored());
        Assert.assertEquals((long)4L, (long)changelogReader.changelogMetadata(this.tp).bufferedRecords().size());
        Assert.assertEquals((long)0L, (long)changelogReader.changelogMetadata(this.tp).bufferedLimitIndex());
        this.time.setCurrentTimeMs(now + 101L);
        changelogReader.restore(mockTasks);
        Assert.assertEquals((long)10L, (long)changelogReader.changelogMetadata(this.tp).endOffset());
        Assert.assertEquals((long)2L, (long)changelogReader.changelogMetadata(this.tp).totalRestored());
        Assert.assertEquals((long)4L, (long)changelogReader.changelogMetadata(this.tp).bufferedRecords().size());
        Assert.assertEquals((long)2L, (long)changelogReader.changelogMetadata(this.tp).bufferedLimitIndex());
        changelogReader.restore(mockTasks);
        Assert.assertEquals((long)10L, (long)changelogReader.changelogMetadata(this.tp).endOffset());
        Assert.assertEquals((long)4L, (long)changelogReader.changelogMetadata(this.tp).totalRestored());
        Assert.assertEquals((long)2L, (long)changelogReader.changelogMetadata(this.tp).bufferedRecords().size());
        Assert.assertEquals((long)0L, (long)changelogReader.changelogMetadata(this.tp).bufferedLimitIndex());
        this.adminClient.updateConsumerGroupOffsets(Collections.singletonMap(this.tp, 15L));
        this.time.setCurrentTimeMs(now + 201L);
        changelogReader.restore(mockTasks);
        Assert.assertEquals((long)10L, (long)changelogReader.changelogMetadata(this.tp).endOffset());
        Assert.assertEquals((long)4L, (long)changelogReader.changelogMetadata(this.tp).totalRestored());
        Assert.assertEquals((long)2L, (long)changelogReader.changelogMetadata(this.tp).bufferedRecords().size());
        Assert.assertEquals((long)0L, (long)changelogReader.changelogMetadata(this.tp).bufferedLimitIndex());
        this.time.setCurrentTimeMs(now + 202L);
        changelogReader.enforceRestoreActive();
        changelogReader.restore(mockTasks);
        Assert.assertEquals((long)10L, (long)changelogReader.changelogMetadata(this.tp).endOffset());
        Assert.assertEquals((long)4L, (long)changelogReader.changelogMetadata(this.tp).totalRestored());
        Assert.assertEquals((long)2L, (long)changelogReader.changelogMetadata(this.tp).bufferedRecords().size());
        Assert.assertEquals((long)0L, (long)changelogReader.changelogMetadata(this.tp).bufferedLimitIndex());
        changelogReader.transitToUpdateStandby();
        changelogReader.restore(mockTasks);
        Assert.assertEquals((long)15L, (long)changelogReader.changelogMetadata(this.tp).endOffset());
        Assert.assertEquals((long)4L, (long)changelogReader.changelogMetadata(this.tp).totalRestored());
        Assert.assertEquals((long)2L, (long)changelogReader.changelogMetadata(this.tp).bufferedRecords().size());
        Assert.assertEquals((long)2L, (long)changelogReader.changelogMetadata(this.tp).bufferedLimitIndex());
        changelogReader.restore(mockTasks);
        Assert.assertEquals((long)15L, (long)changelogReader.changelogMetadata(this.tp).endOffset());
        Assert.assertEquals((long)6L, (long)changelogReader.changelogMetadata(this.tp).totalRestored());
        Assert.assertEquals((long)0L, (long)changelogReader.changelogMetadata(this.tp).bufferedRecords().size());
        Assert.assertEquals((long)0L, (long)changelogReader.changelogMetadata(this.tp).bufferedLimitIndex());
        this.consumer.addRecord(new ConsumerRecord("topic", 0, 12L, (Object)"key".getBytes(), (Object)"value".getBytes()));
        this.consumer.addRecord(new ConsumerRecord("topic", 0, 13L, (Object)"key".getBytes(), (Object)"value".getBytes()));
        this.consumer.addRecord(new ConsumerRecord("topic", 0, 14L, (Object)"key".getBytes(), (Object)"value".getBytes()));
        this.consumer.addRecord(new ConsumerRecord("topic", 0, 15L, (Object)"key".getBytes(), (Object)"value".getBytes()));
        changelogReader.restore(mockTasks);
        Assert.assertEquals((long)15L, (long)changelogReader.changelogMetadata(this.tp).endOffset());
        Assert.assertEquals((long)9L, (long)changelogReader.changelogMetadata(this.tp).totalRestored());
        Assert.assertEquals((long)1L, (long)changelogReader.changelogMetadata(this.tp).bufferedRecords().size());
        Assert.assertEquals((long)0L, (long)changelogReader.changelogMetadata(this.tp).bufferedLimitIndex());
    }

    @Test
    public void shouldRestoreMultipleChangelogs() {
        Map mockTasks = (Map)this.mock(Map.class);
        EasyMock.expect((Object)((Task)mockTasks.get(null))).andReturn((Object)((Task)this.mock(Task.class))).anyTimes();
        EasyMock.expect((Object)mockTasks.containsKey(null)).andReturn((Object)true).anyTimes();
        EasyMock.expect((Object)this.storeMetadataOne.changelogPartition()).andReturn((Object)this.tp1).anyTimes();
        EasyMock.expect((Object)this.storeMetadataOne.store()).andReturn((Object)this.store).anyTimes();
        EasyMock.expect((Object)this.storeMetadataTwo.changelogPartition()).andReturn((Object)this.tp2).anyTimes();
        EasyMock.expect((Object)this.storeMetadataTwo.store()).andReturn((Object)this.store).anyTimes();
        EasyMock.expect((Object)this.storeMetadata.offset()).andReturn((Object)0L).anyTimes();
        EasyMock.expect((Object)this.storeMetadataOne.offset()).andReturn((Object)0L).anyTimes();
        EasyMock.expect((Object)this.storeMetadataTwo.offset()).andReturn((Object)0L).anyTimes();
        EasyMock.expect((Object)this.activeStateManager.storeMetadata(this.tp1)).andReturn((Object)this.storeMetadataOne).anyTimes();
        EasyMock.expect((Object)this.activeStateManager.storeMetadata(this.tp2)).andReturn((Object)this.storeMetadataTwo).anyTimes();
        EasyMock.expect((Object)this.activeStateManager.changelogOffsets()).andReturn((Object)Utils.mkMap((Map.Entry[])new Map.Entry[]{Utils.mkEntry((Object)this.tp, (Object)5L), Utils.mkEntry((Object)this.tp1, (Object)5L), Utils.mkEntry((Object)this.tp2, (Object)5L)})).anyTimes();
        EasyMock.replay((Object[])new Object[]{mockTasks, this.activeStateManager, this.storeMetadata, this.store, this.storeMetadataOne, this.storeMetadataTwo});
        this.setupConsumer(10L, this.tp);
        this.setupConsumer(5L, this.tp1);
        this.setupConsumer(3L, this.tp2);
        this.changelogReader.register(this.tp, this.activeStateManager);
        this.changelogReader.register(this.tp1, this.activeStateManager);
        this.changelogReader.register(this.tp2, this.activeStateManager);
        this.changelogReader.restore(mockTasks);
        Assert.assertEquals((Object)StoreChangelogReader.ChangelogState.RESTORING, (Object)this.changelogReader.changelogMetadata(this.tp).state());
        Assert.assertEquals((Object)StoreChangelogReader.ChangelogState.RESTORING, (Object)this.changelogReader.changelogMetadata(this.tp1).state());
        Assert.assertEquals((Object)StoreChangelogReader.ChangelogState.RESTORING, (Object)this.changelogReader.changelogMetadata(this.tp2).state());
        this.changelogReader.unregister(Collections.singletonList(this.tp));
        Assert.assertNull((Object)this.changelogReader.changelogMetadata(this.tp));
        Assert.assertFalse((boolean)this.changelogReader.isEmpty());
        Assert.assertEquals((Object)StoreChangelogReader.ChangelogState.RESTORING, (Object)this.changelogReader.changelogMetadata(this.tp1).state());
        Assert.assertEquals((Object)StoreChangelogReader.ChangelogState.RESTORING, (Object)this.changelogReader.changelogMetadata(this.tp2).state());
        this.changelogReader.clear();
        Assert.assertTrue((boolean)this.changelogReader.isEmpty());
        Assert.assertNull((Object)this.changelogReader.changelogMetadata(this.tp1));
        Assert.assertNull((Object)this.changelogReader.changelogMetadata(this.tp2));
        Assert.assertEquals((Object)this.changelogReader.state(), (Object)StoreChangelogReader.ChangelogReaderState.ACTIVE_RESTORING);
    }

    @Test
    public void shouldTransitState() {
        TaskId taskId = new TaskId(0, 0);
        EasyMock.expect((Object)this.storeMetadataOne.changelogPartition()).andReturn((Object)this.tp1).anyTimes();
        EasyMock.expect((Object)this.storeMetadataOne.store()).andReturn((Object)this.store).anyTimes();
        EasyMock.expect((Object)this.storeMetadataTwo.changelogPartition()).andReturn((Object)this.tp2).anyTimes();
        EasyMock.expect((Object)this.storeMetadataTwo.store()).andReturn((Object)this.store).anyTimes();
        EasyMock.expect((Object)this.storeMetadata.offset()).andReturn((Object)5L).anyTimes();
        EasyMock.expect((Object)this.storeMetadataOne.offset()).andReturn((Object)5L).anyTimes();
        EasyMock.expect((Object)this.storeMetadataTwo.offset()).andReturn((Object)5L).anyTimes();
        EasyMock.expect((Object)this.standbyStateManager.storeMetadata(this.tp1)).andReturn((Object)this.storeMetadataOne).anyTimes();
        EasyMock.expect((Object)this.standbyStateManager.storeMetadata(this.tp2)).andReturn((Object)this.storeMetadataTwo).anyTimes();
        EasyMock.expect((Object)this.activeStateManager.changelogOffsets()).andReturn(Collections.singletonMap(this.tp, 5L));
        EasyMock.expect((Object)this.activeStateManager.taskId()).andReturn((Object)taskId).anyTimes();
        EasyMock.expect((Object)this.standbyStateManager.taskId()).andReturn((Object)taskId).anyTimes();
        EasyMock.replay((Object[])new Object[]{this.activeStateManager, this.standbyStateManager, this.storeMetadata, this.store, this.storeMetadataOne, this.storeMetadataTwo});
        this.adminClient.updateEndOffsets(Collections.singletonMap(this.tp, 10L));
        this.adminClient.updateEndOffsets(Collections.singletonMap(this.tp1, 10L));
        this.adminClient.updateEndOffsets(Collections.singletonMap(this.tp2, 10L));
        StoreChangelogReader changelogReader = new StoreChangelogReader((Time)this.time, this.config, this.logContext, (Admin)this.adminClient, this.consumer, (StateRestoreListener)this.callback);
        Assert.assertEquals((Object)StoreChangelogReader.ChangelogReaderState.ACTIVE_RESTORING, (Object)changelogReader.state());
        changelogReader.register(this.tp, this.activeStateManager);
        changelogReader.register(this.tp1, this.standbyStateManager);
        changelogReader.register(this.tp2, this.standbyStateManager);
        Assert.assertEquals((Object)StoreChangelogReader.ChangelogState.REGISTERED, (Object)changelogReader.changelogMetadata(this.tp).state());
        Assert.assertEquals((Object)StoreChangelogReader.ChangelogState.REGISTERED, (Object)changelogReader.changelogMetadata(this.tp1).state());
        Assert.assertEquals((Object)StoreChangelogReader.ChangelogState.REGISTERED, (Object)changelogReader.changelogMetadata(this.tp2).state());
        Assert.assertEquals(Collections.emptySet(), (Object)this.consumer.assignment());
        changelogReader.restore(Collections.singletonMap(taskId, (Task)this.mock(Task.class)));
        Assert.assertEquals((Object)StoreChangelogReader.ChangelogState.RESTORING, (Object)changelogReader.changelogMetadata(this.tp).state());
        Assert.assertEquals((Object)StoreChangelogReader.ChangelogState.RESTORING, (Object)changelogReader.changelogMetadata(this.tp1).state());
        Assert.assertEquals((Object)StoreChangelogReader.ChangelogState.RESTORING, (Object)changelogReader.changelogMetadata(this.tp2).state());
        Assert.assertEquals((Object)Utils.mkSet((Object[])new TopicPartition[]{this.tp, this.tp1, this.tp2}), (Object)this.consumer.assignment());
        Assert.assertEquals((Object)Utils.mkSet((Object[])new TopicPartition[]{this.tp1, this.tp2}), (Object)this.consumer.paused());
        Assert.assertEquals((Object)StoreChangelogReader.ChangelogReaderState.ACTIVE_RESTORING, (Object)changelogReader.state());
        changelogReader.enforceRestoreActive();
        Assert.assertEquals((Object)StoreChangelogReader.ChangelogReaderState.ACTIVE_RESTORING, (Object)changelogReader.state());
        changelogReader.transitToUpdateStandby();
        Assert.assertEquals((Object)StoreChangelogReader.ChangelogReaderState.STANDBY_UPDATING, (Object)changelogReader.state());
        Assert.assertEquals((Object)StoreChangelogReader.ChangelogState.RESTORING, (Object)changelogReader.changelogMetadata(this.tp).state());
        Assert.assertEquals((Object)StoreChangelogReader.ChangelogState.RESTORING, (Object)changelogReader.changelogMetadata(this.tp1).state());
        Assert.assertEquals((Object)StoreChangelogReader.ChangelogState.RESTORING, (Object)changelogReader.changelogMetadata(this.tp2).state());
        Assert.assertEquals((Object)Utils.mkSet((Object[])new TopicPartition[]{this.tp, this.tp1, this.tp2}), (Object)this.consumer.assignment());
        Assert.assertEquals(Collections.emptySet(), (Object)this.consumer.paused());
        Assert.assertThrows(IllegalStateException.class, () -> ((StoreChangelogReader)changelogReader).transitToUpdateStandby());
        changelogReader.unregister(Collections.singletonList(this.tp));
        changelogReader.register(this.tp, this.activeStateManager);
        Assert.assertThrows(IllegalStateException.class, () -> changelogReader.restore(Collections.singletonMap(taskId, (Task)this.mock(Task.class))));
        Assert.assertEquals((Object)StoreChangelogReader.ChangelogState.RESTORING, (Object)changelogReader.changelogMetadata(this.tp).state());
        Assert.assertEquals((Object)StoreChangelogReader.ChangelogState.RESTORING, (Object)changelogReader.changelogMetadata(this.tp1).state());
        Assert.assertEquals((Object)StoreChangelogReader.ChangelogState.RESTORING, (Object)changelogReader.changelogMetadata(this.tp2).state());
        Assert.assertEquals((Object)Utils.mkSet((Object[])new TopicPartition[]{this.tp, this.tp1, this.tp2}), (Object)this.consumer.assignment());
        Assert.assertEquals(Collections.emptySet(), (Object)this.consumer.paused());
        Assert.assertEquals((Object)StoreChangelogReader.ChangelogReaderState.STANDBY_UPDATING, (Object)changelogReader.state());
        changelogReader.enforceRestoreActive();
        Assert.assertEquals((Object)StoreChangelogReader.ChangelogReaderState.ACTIVE_RESTORING, (Object)changelogReader.state());
        Assert.assertEquals((Object)Utils.mkSet((Object[])new TopicPartition[]{this.tp, this.tp1, this.tp2}), (Object)this.consumer.assignment());
        Assert.assertEquals((Object)Utils.mkSet((Object[])new TopicPartition[]{this.tp1, this.tp2}), (Object)this.consumer.paused());
    }

    @Test
    public void shouldTransitStateBackToActiveRestoringAfterRemovingLastTask() {
        StoreChangelogReader changelogReader = new StoreChangelogReader((Time)this.time, this.config, this.logContext, (Admin)this.adminClient, this.consumer, (StateRestoreListener)this.callback);
        EasyMock.expect((Object)this.standbyStateManager.storeMetadata(this.tp1)).andReturn((Object)this.storeMetadataOne).anyTimes();
        EasyMock.expect((Object)this.storeMetadataOne.changelogPartition()).andReturn((Object)this.tp1).anyTimes();
        EasyMock.expect((Object)this.storeMetadataOne.store()).andReturn((Object)this.store).anyTimes();
        EasyMock.replay((Object[])new Object[]{this.standbyStateManager, this.store, this.storeMetadataOne});
        changelogReader.register(this.tp1, this.standbyStateManager);
        changelogReader.transitToUpdateStandby();
        changelogReader.unregister((Collection)Utils.mkSet((Object[])new TopicPartition[]{this.tp1}));
        Assert.assertTrue((boolean)changelogReader.isEmpty());
        Assert.assertEquals((Object)StoreChangelogReader.ChangelogReaderState.ACTIVE_RESTORING, (Object)changelogReader.state());
    }

    @Test
    public void shouldThrowIfRestoreCallbackThrows() {
        TaskId taskId = new TaskId(0, 0);
        EasyMock.expect((Object)this.storeMetadata.offset()).andReturn((Object)5L).anyTimes();
        EasyMock.expect((Object)this.activeStateManager.taskId()).andReturn((Object)taskId).anyTimes();
        EasyMock.replay((Object[])new Object[]{this.activeStateManager, this.storeMetadata, this.store});
        this.adminClient.updateEndOffsets(Collections.singletonMap(this.tp, 10L));
        StoreChangelogReader changelogReader = new StoreChangelogReader((Time)this.time, this.config, this.logContext, (Admin)this.adminClient, this.consumer, (StateRestoreListener)this.exceptionCallback);
        changelogReader.register(this.tp, this.activeStateManager);
        StreamsException thrown = (StreamsException)Assert.assertThrows(StreamsException.class, () -> changelogReader.restore(Collections.singletonMap(taskId, (Task)this.mock(Task.class))));
        Assert.assertEquals((Object)this.kaboom, (Object)thrown.getCause());
        this.consumer.addRecord(new ConsumerRecord("topic", 0, 6L, (Object)"key".getBytes(), (Object)"value".getBytes()));
        this.consumer.addRecord(new ConsumerRecord("topic", 0, 7L, (Object)"key".getBytes(), (Object)"value".getBytes()));
        thrown = (StreamsException)Assert.assertThrows(StreamsException.class, () -> changelogReader.restore(Collections.singletonMap(taskId, (Task)this.mock(Task.class))));
        Assert.assertEquals((Object)this.kaboom, (Object)thrown.getCause());
        this.consumer.seek(this.tp, 10L);
        thrown = (StreamsException)Assert.assertThrows(StreamsException.class, () -> changelogReader.restore(Collections.singletonMap(taskId, (Task)this.mock(Task.class))));
        Assert.assertEquals((Object)this.kaboom, (Object)thrown.getCause());
    }

    @Test
    public void shouldNotThrowOnUnknownRevokedPartition() {
        LogCaptureAppender.setClassLoggerToDebug(StoreChangelogReader.class);
        try (LogCaptureAppender appender = LogCaptureAppender.createAndRegister(StoreChangelogReader.class);){
            this.changelogReader.unregister(Collections.singletonList(new TopicPartition("unknown", 0)));
            MatcherAssert.assertThat((Object)appender.getMessages(), (Matcher)Matchers.hasItem((Object)"test-reader Changelog partition unknown-0 could not be found, it could be already cleaned up during the handling of task corruption and never restore again"));
        }
    }

    private void setupConsumer(long messages, TopicPartition topicPartition) {
        this.assignPartition(messages, topicPartition);
        this.addRecords(messages, topicPartition);
        this.consumer.assign(Collections.emptyList());
    }

    private void addRecords(long messages, TopicPartition topicPartition) {
        int i = 0;
        while ((long)i < messages) {
            this.consumer.addRecord(new ConsumerRecord(topicPartition.topic(), topicPartition.partition(), (long)i, (Object)new byte[0], (Object)new byte[0]));
            ++i;
        }
    }

    private void assignPartition(long messages, TopicPartition topicPartition) {
        this.consumer.updatePartitions(topicPartition.topic(), Collections.singletonList(new PartitionInfo(topicPartition.topic(), topicPartition.partition(), null, null, null)));
        this.consumer.updateBeginningOffsets(Collections.singletonMap(topicPartition, 0L));
        this.consumer.updateEndOffsets(Collections.singletonMap(topicPartition, Math.max(0L, messages) + 1L));
        this.adminClient.updateEndOffsets(Collections.singletonMap(topicPartition, Math.max(0L, messages) + 1L));
        this.consumer.assign(Collections.singletonList(topicPartition));
    }
}

