/*
 * Decompiled with CFR 0.152.
 */
package kafka.log.remote;

import com.yammer.metrics.core.Gauge;
import com.yammer.metrics.core.MetricName;
import java.io.ByteArrayInputStream;
import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.OpenOption;
import java.nio.file.Path;
import java.nio.file.attribute.FileAttribute;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.Objects;
import java.util.Optional;
import java.util.Properties;
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import kafka.cluster.EndPoint;
import kafka.cluster.Partition;
import kafka.log.LogSegment;
import kafka.log.UnifiedLog;
import kafka.log.remote.RemoteLogManager;
import kafka.server.BrokerTopicStats;
import kafka.server.KafkaConfig;
import kafka.server.StopPartition;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.TopicIdPartition;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.common.errors.ReplicaNotAvailableException;
import org.apache.kafka.common.network.ListenerName;
import org.apache.kafka.common.record.CompressionType;
import org.apache.kafka.common.record.FileRecords;
import org.apache.kafka.common.record.MemoryRecords;
import org.apache.kafka.common.record.SimpleRecord;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.server.log.remote.storage.ClassLoaderAwareRemoteStorageManager;
import org.apache.kafka.server.log.remote.storage.LogSegmentData;
import org.apache.kafka.server.log.remote.storage.NoOpRemoteLogMetadataManager;
import org.apache.kafka.server.log.remote.storage.NoOpRemoteStorageManager;
import org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig;
import org.apache.kafka.server.log.remote.storage.RemoteLogMetadataManager;
import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentId;
import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata;
import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadataUpdate;
import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentState;
import org.apache.kafka.server.log.remote.storage.RemoteStorageException;
import org.apache.kafka.server.log.remote.storage.RemoteStorageManager;
import org.apache.kafka.server.log.remote.storage.RemoteStorageMetrics;
import org.apache.kafka.server.metrics.KafkaMetricsGroup;
import org.apache.kafka.server.metrics.KafkaYammerMetrics;
import org.apache.kafka.storage.internals.checkpoint.InMemoryLeaderEpochCheckpoint;
import org.apache.kafka.storage.internals.checkpoint.LeaderEpochCheckpoint;
import org.apache.kafka.storage.internals.epoch.LeaderEpochFileCache;
import org.apache.kafka.storage.internals.log.EpochEntry;
import org.apache.kafka.storage.internals.log.LazyIndex;
import org.apache.kafka.storage.internals.log.LogConfig;
import org.apache.kafka.storage.internals.log.OffsetIndex;
import org.apache.kafka.storage.internals.log.ProducerStateManager;
import org.apache.kafka.storage.internals.log.TimeIndex;
import org.apache.kafka.storage.internals.log.TransactionIndex;
import org.apache.kafka.test.TestUtils;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.CsvSource;
import org.mockito.ArgumentCaptor;
import org.mockito.ArgumentMatchers;
import org.mockito.InOrder;
import org.mockito.MockedConstruction;
import org.mockito.Mockito;
import org.mockito.verification.VerificationMode;
import scala.Option;
import scala.collection.JavaConverters;

public class RemoteLogManagerTest {
    /** Mock clock handed to every RemoteLogManager created in this class. */
    private final Time time = new MockTime();
    private final int brokerId = 0;
    private final String logDir = TestUtils.tempDirectory("kafka-").toString();
    private final String clusterId = "dummyId";

    // Custom property names and values; testStartup expects exactly these values in the maps
    // passed to the storage/metadata managers' configure() calls.
    private final String remoteLogStorageTestProp = "remote.log.storage.test";
    private final String remoteLogStorageTestVal = "storage.test";
    private final String remoteLogMetadataTestProp = "remote.log.metadata.test";
    private final String remoteLogMetadataTestVal = "metadata.test";
    private final String remoteLogMetadataCommonClientTestProp = "remote.log.metadata.common.client.common.client.test";
    private final String remoteLogMetadataCommonClientTestVal = "common.test";
    private final String remoteLogMetadataProducerTestProp = "remote.log.metadata.producer.producer.test";
    private final String remoteLogMetadataProducerTestVal = "producer.test";
    private final String remoteLogMetadataConsumerTestProp = "remote.log.metadata.consumer.consumer.test";
    private final String remoteLogMetadataConsumerTestVal = "consumer.test";
    private final String remoteLogMetadataTopicPartitionsNum = "1";

    // Mocked plugins; the RemoteLogManager subclasses built in this class return them from
    // createRemoteStorageManager() / createRemoteLogMetadataManager().
    private final RemoteStorageManager remoteStorageManager = Mockito.mock(RemoteStorageManager.class);
    private final RemoteLogMetadataManager remoteLogMetadataManager = Mockito.mock(RemoteLogMetadataManager.class);

    // Assigned in setUp() before each test.
    private RemoteLogManagerConfig remoteLogManagerConfig;
    private BrokerTopicStats brokerTopicStats;
    private RemoteLogManager remoteLogManager;

    private final TopicIdPartition leaderTopicIdPartition = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("Leader", 0));
    private final TopicIdPartition followerTopicIdPartition = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("Follower", 0));
    private final Map<String, Uuid> topicIds = new HashMap<>();
    private final TopicPartition tp = new TopicPartition("TestTopic", 5);

    // Three leader epochs starting at offsets 0, 100 and 200.
    private final EpochEntry epochEntry0 = new EpochEntry(0, 0L);
    private final EpochEntry epochEntry1 = new EpochEntry(1, 100L);
    private final EpochEntry epochEntry2 = new EpochEntry(2, 200L);
    private final List<EpochEntry> totalEpochEntries = Arrays.asList(this.epochEntry0, this.epochEntry1, this.epochEntry2);

    /** In-memory checkpoint: write() stores a copy of the given entries, read() returns the stored list. */
    private final LeaderEpochCheckpoint checkpoint = new LeaderEpochCheckpoint() {
        List<EpochEntry> epochs = Collections.emptyList();

        @Override
        public void write(Collection<EpochEntry> entries) {
            this.epochs = new ArrayList<>(entries);
        }

        @Override
        public List<EpochEntry> read() {
            return this.epochs;
        }
    };

    /** Receives the offset passed to the log-start-offset callback given to the manager in setUp(). */
    private final AtomicLong currentLogStartOffset = new AtomicLong(0L);
    private final UnifiedLog mockLog = Mockito.mock(UnifiedLog.class);

    /**
     * Registers the topic ids of the leader and follower partitions, builds the broker and
     * remote-log-manager configuration, clears the Yammer metrics and creates the
     * {@link RemoteLogManager} under test, wired to the mocked storage and metadata managers.
     *
     * @throws Exception if building the configuration or the manager fails
     */
    @BeforeEach
    void setUp() throws Exception {
        for (TopicIdPartition idPartition : Arrays.asList(this.leaderTopicIdPartition, this.followerTopicIdPartition)) {
            this.topicIds.put(idPartition.topicPartition().topic(), idPartition.topicId());
        }

        Properties brokerProps = kafka.utils.TestUtils.createDummyBrokerConfig();
        brokerProps.setProperty("remote.log.storage.system.enable", "true");
        this.remoteLogManagerConfig = this.createRLMConfig(brokerProps);
        this.brokerTopicStats = new BrokerTopicStats(Optional.of(KafkaConfig.fromProps(brokerProps)));

        kafka.utils.TestUtils.clearYammerMetrics();

        this.remoteLogManager = new RemoteLogManager(
                this.remoteLogManagerConfig,
                0,
                this.logDir,
                "dummyId",
                this.time,
                partition -> Optional.of(this.mockLog),
                // Record the offset handed to the callback so tests can inspect it.
                (partition, newStartOffset) -> this.currentLogStartOffset.set(newStartOffset),
                this.brokerTopicStats) {

            @Override
            public RemoteStorageManager createRemoteStorageManager() {
                return RemoteLogManagerTest.this.remoteStorageManager;
            }

            @Override
            public RemoteLogMetadataManager createRemoteLogMetadataManager() {
                return RemoteLogManagerTest.this.remoteLogMetadataManager;
            }

            // Always reports 0 as the log start offset.
            long findLogStartOffset(TopicIdPartition topicIdPartition, UnifiedLog log) {
                return 0L;
            }
        };
    }

    /**
     * Checks that {@code getLeaderEpochCheckpoint} returns all three epoch entries for the range
     * [0, 300) and only the epoch-1 entry for the range [100, 200).
     */
    @Test
    void testGetLeaderEpochCheckpoint() {
        this.checkpoint.write(this.totalEpochEntries);
        LeaderEpochFileCache epochCache = new LeaderEpochFileCache(this.tp, this.checkpoint);
        Mockito.when(this.mockLog.leaderEpochCache()).thenReturn(Option.apply(epochCache));

        // Full range: every entry is retained.
        InMemoryLeaderEpochCheckpoint fullRange = this.remoteLogManager.getLeaderEpochCheckpoint(this.mockLog, 0L, 300L);
        Assertions.assertEquals(this.totalEpochEntries, fullRange.read());

        // Narrowed range: only epoch 1 remains.
        InMemoryLeaderEpochCheckpoint narrowed = this.remoteLogManager.getLeaderEpochCheckpoint(this.mockLog, 100L, 200L);
        List<EpochEntry> narrowedEntries = narrowed.read();
        Assertions.assertEquals(1, narrowedEntries.size());
        Assertions.assertEquals(this.epochEntry1, narrowedEntries.get(0));
    }

    /**
     * Checks that {@code findHighestRemoteOffset} yields -1 while the metadata manager mock has no
     * stubbed offset, and 200 once the mock reports offset 200 for epoch 2.
     *
     * @throws RemoteStorageException declared by the calls under test
     */
    @Test
    void testFindHighestRemoteOffset() throws RemoteStorageException {
        this.checkpoint.write(this.totalEpochEntries);
        LeaderEpochFileCache epochCache = new LeaderEpochFileCache(this.tp, this.checkpoint);
        Mockito.when(this.mockLog.leaderEpochCache()).thenReturn(Option.apply(epochCache));
        TopicIdPartition tpId = new TopicIdPartition(Uuid.randomUuid(), this.tp);

        long offsetBeforeStub = this.remoteLogManager.findHighestRemoteOffset(tpId, this.mockLog);
        Assertions.assertEquals(-1L, offsetBeforeStub);

        Mockito.when(this.remoteLogMetadataManager.highestOffsetForEpoch(tpId, 2)).thenReturn(Optional.of(200L));
        long offsetAfterStub = this.remoteLogManager.findHighestRemoteOffset(tpId, this.mockLog);
        Assertions.assertEquals(200L, offsetAfterStub);
    }

    /**
     * Checks that with a user-defined metadata-manager config prefix, the prefixed property is
     * exposed under its un-prefixed key and an unrelated {@code remote.log.metadata.y} property is
     * absent from {@code remoteLogMetadataManagerProps()}.
     */
    @Test
    void testRemoteLogMetadataManagerWithUserDefinedConfigs() {
        String key = "key";
        String configPrefix = "config.prefix";
        String prefixedKey = configPrefix + key;

        Properties props = new Properties();
        props.put("remote.log.metadata.manager.impl.prefix", configPrefix);
        props.put(prefixedKey, "world");
        props.put("remote.log.metadata.y", "z");

        Map<String, Object> metadataManagerConfig = this.createRLMConfig(props).remoteLogMetadataManagerProps();

        Assertions.assertEquals(props.get(prefixedKey), metadataManagerConfig.get(key));
        Assertions.assertFalse(metadataManagerConfig.containsKey("remote.log.metadata.y"));
    }

    /**
     * Checks that with a user-defined storage-manager config prefix, the prefixed property is
     * exposed under its un-prefixed key and an unrelated {@code remote.storage.manager.y} property
     * is absent from {@code remoteStorageManagerProps()}.
     */
    @Test
    void testRemoteStorageManagerWithUserDefinedConfigs() {
        String key = "key";
        String configPrefix = "config.prefix";
        String prefixedKey = configPrefix + key;

        Properties props = new Properties();
        props.put("remote.log.storage.manager.impl.prefix", configPrefix);
        props.put(prefixedKey, "world");
        props.put("remote.storage.manager.y", "z");

        Map<String, Object> storageManagerConfig = this.createRLMConfig(props).remoteStorageManagerProps();

        Assertions.assertEquals(props.get(prefixedKey), storageManagerConfig.get(key));
        Assertions.assertFalse(storageManagerConfig.containsKey("remote.storage.manager.y"));
    }

    /**
     * Checks that after {@code onEndPointCreated} and {@code startup}, the metadata manager is
     * configured exactly once with the endpoint's bootstrap server and security protocol, plus the
     * cluster id and broker id.
     */
    @Test
    void testRemoteLogMetadataManagerWithEndpointConfig() {
        String host = "localhost";
        String port = "1234";
        String securityProtocol = "PLAINTEXT";
        EndPoint endPoint = new EndPoint(host, Integer.parseInt(port), new ListenerName(securityProtocol), SecurityProtocol.PLAINTEXT);

        this.remoteLogManager.onEndPointCreated(endPoint);
        this.remoteLogManager.startup();

        ArgumentCaptor<Map<String, Object>> configCaptor = ArgumentCaptor.forClass(Map.class);
        Mockito.verify(this.remoteLogMetadataManager, Mockito.times(1)).configure(configCaptor.capture());
        Map<String, Object> configured = configCaptor.getValue();

        Assertions.assertEquals(host + ":" + port, configured.get("remote.log.metadata.common.client.bootstrap.servers"));
        Assertions.assertEquals(securityProtocol, configured.get("remote.log.metadata.common.client.security.protocol"));
        Assertions.assertEquals("dummyId", configured.get("cluster.id"));
        Assertions.assertEquals(0, configured.get(KafkaConfig.BrokerIdProp()));
    }

    /**
     * Checks that a security protocol set through the
     * {@code rlmm.config.remote.log.metadata.common.client.security.protocol} property ("SSL") is
     * what reaches the metadata manager, even though the endpoint itself is PLAINTEXT.
     *
     * @throws IOException if closing the locally created manager fails
     */
    @Test
    void testRemoteLogMetadataManagerWithEndpointConfigOverridden() throws IOException {
        Properties overrides = new Properties();
        overrides.put("rlmm.config.remote.log.metadata.common.client.security.protocol", "SSL");

        // A dedicated manager is built from the overriding config and closed when the test ends.
        try (RemoteLogManager manager = new RemoteLogManager(
                this.createRLMConfig(overrides),
                0,
                this.logDir,
                "dummyId",
                this.time,
                partition -> Optional.of(this.mockLog),
                (partition, offset) -> { },
                this.brokerTopicStats) {

            @Override
            public RemoteStorageManager createRemoteStorageManager() {
                return RemoteLogManagerTest.this.remoteStorageManager;
            }

            @Override
            public RemoteLogMetadataManager createRemoteLogMetadataManager() {
                return RemoteLogManagerTest.this.remoteLogMetadataManager;
            }
        }) {
            String host = "localhost";
            String port = "1234";
            String securityProtocol = "PLAINTEXT";
            EndPoint endPoint = new EndPoint(host, Integer.parseInt(port), new ListenerName(securityProtocol), SecurityProtocol.PLAINTEXT);

            manager.onEndPointCreated(endPoint);
            manager.startup();

            ArgumentCaptor<Map<String, Object>> configCaptor = ArgumentCaptor.forClass(Map.class);
            Mockito.verify(this.remoteLogMetadataManager, Mockito.times(1)).configure(configCaptor.capture());
            Map<String, Object> configured = configCaptor.getValue();

            Assertions.assertEquals(host + ":" + port, configured.get("remote.log.metadata.common.client.bootstrap.servers"));
            Assertions.assertEquals("SSL", configured.get("remote.log.metadata.common.client.security.protocol"));
            Assertions.assertEquals("dummyId", configured.get("cluster.id"));
            Assertions.assertEquals(0, configured.get(KafkaConfig.BrokerIdProp()));
        }
    }

    /**
     * Checks that {@code startup} configures the storage manager and the metadata manager exactly
     * once each, and that the maps they receive carry the expected broker id, log dir and custom
     * test properties.
     */
    @Test
    void testStartup() {
        this.remoteLogManager.startup();

        // Storage manager configuration.
        ArgumentCaptor<Map<String, Object>> storageCaptor = ArgumentCaptor.forClass(Map.class);
        Mockito.verify(this.remoteStorageManager, Mockito.times(1)).configure(storageCaptor.capture());
        Map<String, Object> storageConfig = storageCaptor.getValue();
        Assertions.assertEquals(0, storageConfig.get("broker.id"));
        Assertions.assertEquals("storage.test", storageConfig.get("remote.log.storage.test"));

        // Metadata manager configuration.
        ArgumentCaptor<Map<String, Object>> metadataCaptor = ArgumentCaptor.forClass(Map.class);
        Mockito.verify(this.remoteLogMetadataManager, Mockito.times(1)).configure(metadataCaptor.capture());
        Map<String, Object> metadataConfig = metadataCaptor.getValue();
        Assertions.assertEquals(0, metadataConfig.get("broker.id"));
        Assertions.assertEquals(this.logDir, metadataConfig.get("log.dir"));
        Assertions.assertEquals("1", metadataConfig.get("remote.log.metadata.topic.num.partitions"));
        Assertions.assertEquals("metadata.test", metadataConfig.get("remote.log.metadata.test"));
        Assertions.assertEquals("consumer.test", metadataConfig.get("remote.log.metadata.consumer.consumer.test"));
        Assertions.assertEquals("producer.test", metadataConfig.get("remote.log.metadata.producer.producer.test"));
        Assertions.assertEquals("common.test", metadataConfig.get("remote.log.metadata.common.client.common.client.test"));
    }

    /**
     * Runs the shared copy scenario with a rolled segment at offset 0, the active segment at 150,
     * a last stable offset of 250 and a log end offset of 300.
     *
     * @throws Exception propagated from the shared scenario
     */
    @Test
    void testCopyLogSegmentsToRemoteShouldCopyExpectedLogSegment() throws Exception {
        this.assertCopyExpectedLogSegmentsToRemote(
                0L,    // oldSegmentStartOffset
                150L,  // nextSegmentStartOffset
                250L,  // lastStableOffset
                300L); // logEndOffset
    }

    /**
     * Runs the shared copy scenario for a log whose last stable offset and log end offset both
     * equal the active segment's base offset (150).
     *
     * @throws Exception propagated from the shared scenario
     */
    @Test
    void testCopyLogSegmentToRemoteForStaleTopic() throws Exception {
        this.assertCopyExpectedLogSegmentsToRemote(
                0L,    // oldSegmentStartOffset
                150L,  // nextSegmentStartOffset
                150L,  // lastStableOffset
                150L); // logEndOffset
    }

    /**
     * Runs one {@code RLMTask.copyLogSegmentsToRemote} pass over a mocked log made of one rolled
     * segment and one active segment, then verifies that the rolled segment's metadata was added,
     * its data was copied once, its metadata was updated once, the highest remote offset was set
     * to the rolled segment's end offset, and the remote-copy metrics were updated.
     *
     * @param oldSegmentStartOffset  base offset of the rolled segment; also the log start offset
     * @param nextSegmentStartOffset base offset of the active segment
     * @param lastStableOffset       value stubbed for {@code mockLog.lastStableOffset()}
     * @param logEndOffset           value stubbed for {@code mockLog.logEndOffset()}
     * @throws Exception if creating the temporary files or running the copy fails
     */
    private void assertCopyExpectedLogSegmentsToRemote(long oldSegmentStartOffset, long nextSegmentStartOffset, long lastStableOffset, long logEndOffset) throws Exception {
        long oldSegmentEndOffset = nextSegmentStartOffset - 1L;
        String topic = this.leaderTopicIdPartition.topic();

        // Leader epoch cache and "nothing copied yet" remote state.
        Mockito.when(this.mockLog.topicPartition()).thenReturn(this.leaderTopicIdPartition.topicPartition());
        this.checkpoint.write(this.totalEpochEntries);
        LeaderEpochFileCache cache = new LeaderEpochFileCache(this.leaderTopicIdPartition.topicPartition(), this.checkpoint);
        Mockito.when(this.mockLog.leaderEpochCache()).thenReturn(Option.apply(cache));
        Mockito.when(this.remoteLogMetadataManager.highestOffsetForEpoch(ArgumentMatchers.any(TopicIdPartition.class), ArgumentMatchers.anyInt())).thenReturn(Optional.of(-1L));

        File tempFile = TestUtils.tempFile();
        File mockProducerSnapshotIndex = TestUtils.tempFile();
        File tempDir = TestUtils.tempDirectory();

        // One rolled segment followed by the active segment.
        LogSegment oldSegment = Mockito.mock(LogSegment.class);
        LogSegment activeSegment = Mockito.mock(LogSegment.class);
        Mockito.when(oldSegment.baseOffset()).thenReturn(oldSegmentStartOffset);
        Mockito.when(activeSegment.baseOffset()).thenReturn(nextSegmentStartOffset);
        // NOTE(review): these run before the code under test, so they only confirm that the
        // stubbing above did not itself call readNextOffset() - confirm whether that is the intent.
        Mockito.verify(oldSegment, Mockito.times(0)).readNextOffset();
        Mockito.verify(activeSegment, Mockito.times(0)).readNextOffset();

        FileRecords fileRecords = Mockito.mock(FileRecords.class);
        Mockito.when(oldSegment.log()).thenReturn(fileRecords);
        Mockito.when(fileRecords.file()).thenReturn(tempFile);
        Mockito.when(fileRecords.sizeInBytes()).thenReturn(10);
        Mockito.when(oldSegment.readNextOffset()).thenReturn(nextSegmentStartOffset);

        Mockito.when(this.mockLog.activeSegment()).thenReturn(activeSegment);
        Mockito.when(this.mockLog.logStartOffset()).thenReturn(oldSegmentStartOffset);
        Mockito.when(this.mockLog.logSegments(ArgumentMatchers.anyLong(), ArgumentMatchers.anyLong())).thenReturn(JavaConverters.collectionAsScalaIterable(Arrays.asList(oldSegment, activeSegment)));

        ProducerStateManager mockStateManager = Mockito.mock(ProducerStateManager.class);
        Mockito.when(this.mockLog.producerStateManager()).thenReturn(mockStateManager);
        Mockito.when(mockStateManager.fetchSnapshot(ArgumentMatchers.anyLong())).thenReturn(Optional.of(mockProducerSnapshotIndex));
        Mockito.when(this.mockLog.lastStableOffset()).thenReturn(lastStableOffset);
        Mockito.when(this.mockLog.logEndOffset()).thenReturn(logEndOffset);

        // Real index objects backed by files in a temp directory.
        LazyIndex<OffsetIndex> idx = LazyIndex.forOffset(UnifiedLog.offsetIndexFile(tempDir, oldSegmentStartOffset, ""), oldSegmentStartOffset, 1000);
        LazyIndex<TimeIndex> timeIdx = LazyIndex.forTime(UnifiedLog.timeIndexFile(tempDir, oldSegmentStartOffset, ""), oldSegmentStartOffset, 1500);
        File txnFile = UnifiedLog.transactionIndexFile(tempDir, oldSegmentStartOffset, "");
        txnFile.createNewFile();
        TransactionIndex txnIndex = new TransactionIndex(oldSegmentStartOffset, txnFile);
        Mockito.when(oldSegment.lazyTimeIndex()).thenReturn(timeIdx);
        Mockito.when(oldSegment.lazyOffsetIndex()).thenReturn(idx);
        Mockito.when(oldSegment.txnIndex()).thenReturn(txnIndex);

        // Metadata add/update complete immediately; the copy returns no custom metadata.
        CompletableFuture<Void> dummyFuture = CompletableFuture.completedFuture(null);
        Mockito.when(this.remoteLogMetadataManager.addRemoteLogSegmentMetadata(ArgumentMatchers.any(RemoteLogSegmentMetadata.class))).thenReturn(dummyFuture);
        Mockito.when(this.remoteLogMetadataManager.updateRemoteLogSegmentMetadata(ArgumentMatchers.any(RemoteLogSegmentMetadataUpdate.class))).thenReturn(dummyFuture);
        Mockito.when(this.remoteStorageManager.copyLogSegmentData(ArgumentMatchers.any(RemoteLogSegmentMetadata.class), ArgumentMatchers.any(LogSegmentData.class))).thenReturn(Optional.empty());

        // All remote-copy metrics start at zero.
        Assertions.assertEquals(0L, this.brokerTopicStats.topicStats(topic).remoteCopyRequestRate().count());
        Assertions.assertEquals(0L, this.brokerTopicStats.topicStats(topic).remoteCopyBytesRate().count());
        Assertions.assertEquals(0L, this.brokerTopicStats.topicStats(topic).failedRemoteCopyRequestRate().count());
        Assertions.assertEquals(0L, this.brokerTopicStats.allTopicsStats().remoteCopyRequestRate().count());
        Assertions.assertEquals(0L, this.brokerTopicStats.allTopicsStats().remoteCopyBytesRate().count());
        Assertions.assertEquals(0L, this.brokerTopicStats.allTopicsStats().failedRemoteCopyRequestRate().count());

        // RLMTask is an inner class of RemoteLogManager, so it is created through the outer instance.
        RemoteLogManager.RLMTask task = this.remoteLogManager.new RLMTask(this.leaderTopicIdPartition, 128);
        task.convertToLeader(2);
        task.copyLogSegmentsToRemote(this.mockLog);

        // The segment metadata was added with the expected offsets and leader epochs.
        ArgumentCaptor<RemoteLogSegmentMetadata> remoteLogSegmentMetadataArg = ArgumentCaptor.forClass(RemoteLogSegmentMetadata.class);
        Mockito.verify(this.remoteLogMetadataManager).addRemoteLogSegmentMetadata(remoteLogSegmentMetadataArg.capture());
        TreeMap<Integer, Long> expectedLeaderEpochs = new TreeMap<>();
        expectedLeaderEpochs.put(this.epochEntry0.epoch, this.epochEntry0.startOffset);
        expectedLeaderEpochs.put(this.epochEntry1.epoch, this.epochEntry1.startOffset);
        this.verifyRemoteLogSegmentMetadata(remoteLogSegmentMetadataArg.getValue(), oldSegmentStartOffset, oldSegmentEndOffset, expectedLeaderEpochs);

        // The segment data was copied once, using the same metadata object.
        ArgumentCaptor<RemoteLogSegmentMetadata> remoteLogSegmentMetadataArg2 = ArgumentCaptor.forClass(RemoteLogSegmentMetadata.class);
        ArgumentCaptor<LogSegmentData> logSegmentDataArg = ArgumentCaptor.forClass(LogSegmentData.class);
        Mockito.verify(this.remoteStorageManager, Mockito.times(1)).copyLogSegmentData(remoteLogSegmentMetadataArg2.capture(), logSegmentDataArg.capture());
        Assertions.assertEquals(remoteLogSegmentMetadataArg.getValue(), remoteLogSegmentMetadataArg2.getValue());
        this.verifyLogSegmentData(logSegmentDataArg.getValue(), idx, timeIdx, txnIndex, tempFile, mockProducerSnapshotIndex, Arrays.asList(this.epochEntry0, this.epochEntry1));

        // The metadata was updated once after the copy.
        ArgumentCaptor<RemoteLogSegmentMetadataUpdate> remoteLogSegmentMetadataUpdateArg = ArgumentCaptor.forClass(RemoteLogSegmentMetadataUpdate.class);
        Mockito.verify(this.remoteLogMetadataManager, Mockito.times(1)).updateRemoteLogSegmentMetadata(remoteLogSegmentMetadataUpdateArg.capture());
        this.verifyRemoteLogSegmentMetadataUpdate(remoteLogSegmentMetadataUpdateArg.getValue());

        // The log was told the highest remote offset is the rolled segment's end offset.
        ArgumentCaptor<Long> argument = ArgumentCaptor.forClass(Long.class);
        Mockito.verify(this.mockLog, Mockito.times(1)).updateHighestOffsetInRemoteStorage(argument.capture());
        Assertions.assertEquals(oldSegmentEndOffset, argument.getValue());

        // One successful copy request of 10 bytes, no failures.
        Assertions.assertEquals(1L, this.brokerTopicStats.topicStats(topic).remoteCopyRequestRate().count());
        Assertions.assertEquals(10L, this.brokerTopicStats.topicStats(topic).remoteCopyBytesRate().count());
        Assertions.assertEquals(0L, this.brokerTopicStats.topicStats(topic).failedRemoteCopyRequestRate().count());
        Assertions.assertEquals(1L, this.brokerTopicStats.allTopicsStats().remoteCopyRequestRate().count());
        Assertions.assertEquals(10L, this.brokerTopicStats.allTopicsStats().remoteCopyBytesRate().count());
        Assertions.assertEquals(0L, this.brokerTopicStats.allTopicsStats().failedRemoteCopyRequestRate().count());
    }

    /**
     * Checks the copy path when the storage manager returns custom metadata twice as large as the
     * task's limit: the copied segment data must be deleted, the task must be cancelled, the
     * segment metadata must never be updated, and the copy must be counted as one failed request
     * with zero bytes copied.
     *
     * @throws Exception if creating the temporary files or running the copy fails
     */
    @Test
    void testCustomMetadataSizeExceedsLimit() throws Exception {
        long oldSegmentStartOffset = 0L;
        long nextSegmentStartOffset = 150L;
        long lastStableOffset = 150L;
        long logEndOffset = 150L;
        String topic = this.leaderTopicIdPartition.topic();

        // Leader epoch cache and "nothing copied yet" remote state.
        Mockito.when(this.mockLog.topicPartition()).thenReturn(this.leaderTopicIdPartition.topicPartition());
        this.checkpoint.write(this.totalEpochEntries);
        LeaderEpochFileCache cache = new LeaderEpochFileCache(this.leaderTopicIdPartition.topicPartition(), this.checkpoint);
        Mockito.when(this.mockLog.leaderEpochCache()).thenReturn(Option.apply(cache));
        Mockito.when(this.remoteLogMetadataManager.highestOffsetForEpoch(ArgumentMatchers.any(TopicIdPartition.class), ArgumentMatchers.anyInt())).thenReturn(Optional.of(-1L));

        File tempFile = TestUtils.tempFile();
        File mockProducerSnapshotIndex = TestUtils.tempFile();
        File tempDir = TestUtils.tempDirectory();

        // One rolled segment followed by the active segment.
        LogSegment oldSegment = Mockito.mock(LogSegment.class);
        LogSegment activeSegment = Mockito.mock(LogSegment.class);
        Mockito.when(oldSegment.baseOffset()).thenReturn(oldSegmentStartOffset);
        Mockito.when(activeSegment.baseOffset()).thenReturn(nextSegmentStartOffset);
        // NOTE(review): these run before the code under test, so they only confirm that the
        // stubbing above did not itself call readNextOffset() - confirm whether that is the intent.
        Mockito.verify(oldSegment, Mockito.times(0)).readNextOffset();
        Mockito.verify(activeSegment, Mockito.times(0)).readNextOffset();

        FileRecords fileRecords = Mockito.mock(FileRecords.class);
        Mockito.when(oldSegment.log()).thenReturn(fileRecords);
        Mockito.when(fileRecords.file()).thenReturn(tempFile);
        Mockito.when(fileRecords.sizeInBytes()).thenReturn(10);
        Mockito.when(oldSegment.readNextOffset()).thenReturn(nextSegmentStartOffset);

        Mockito.when(this.mockLog.activeSegment()).thenReturn(activeSegment);
        Mockito.when(this.mockLog.logStartOffset()).thenReturn(oldSegmentStartOffset);
        Mockito.when(this.mockLog.logSegments(ArgumentMatchers.anyLong(), ArgumentMatchers.anyLong())).thenReturn(JavaConverters.collectionAsScalaIterable(Arrays.asList(oldSegment, activeSegment)));

        ProducerStateManager mockStateManager = Mockito.mock(ProducerStateManager.class);
        Mockito.when(this.mockLog.producerStateManager()).thenReturn(mockStateManager);
        Mockito.when(mockStateManager.fetchSnapshot(ArgumentMatchers.anyLong())).thenReturn(Optional.of(mockProducerSnapshotIndex));
        Mockito.when(this.mockLog.lastStableOffset()).thenReturn(lastStableOffset);
        Mockito.when(this.mockLog.logEndOffset()).thenReturn(logEndOffset);

        // Real index objects backed by files in a temp directory.
        LazyIndex<OffsetIndex> idx = LazyIndex.forOffset(UnifiedLog.offsetIndexFile(tempDir, oldSegmentStartOffset, ""), oldSegmentStartOffset, 1000);
        LazyIndex<TimeIndex> timeIdx = LazyIndex.forTime(UnifiedLog.timeIndexFile(tempDir, oldSegmentStartOffset, ""), oldSegmentStartOffset, 1500);
        File txnFile = UnifiedLog.transactionIndexFile(tempDir, oldSegmentStartOffset, "");
        txnFile.createNewFile();
        TransactionIndex txnIndex = new TransactionIndex(oldSegmentStartOffset, txnFile);
        Mockito.when(oldSegment.lazyTimeIndex()).thenReturn(timeIdx);
        Mockito.when(oldSegment.lazyOffsetIndex()).thenReturn(idx);
        Mockito.when(oldSegment.txnIndex()).thenReturn(txnIndex);

        // The storage manager returns custom metadata twice as large as the task's limit.
        int customMetadataSizeLimit = 128;
        RemoteLogSegmentMetadata.CustomMetadata customMetadata = new RemoteLogSegmentMetadata.CustomMetadata(new byte[customMetadataSizeLimit * 2]);
        CompletableFuture<Void> dummyFuture = CompletableFuture.completedFuture(null);
        Mockito.when(this.remoteLogMetadataManager.addRemoteLogSegmentMetadata(ArgumentMatchers.any(RemoteLogSegmentMetadata.class))).thenReturn(dummyFuture);
        Mockito.when(this.remoteStorageManager.copyLogSegmentData(ArgumentMatchers.any(RemoteLogSegmentMetadata.class), ArgumentMatchers.any(LogSegmentData.class))).thenReturn(Optional.of(customMetadata));

        // RLMTask is an inner class of RemoteLogManager, so it is created through the outer instance.
        RemoteLogManager.RLMTask task = this.remoteLogManager.new RLMTask(this.leaderTopicIdPartition, customMetadataSizeLimit);
        task.convertToLeader(2);
        task.copyLogSegmentsToRemote(this.mockLog);

        ArgumentCaptor<RemoteLogSegmentMetadata> remoteLogSegmentMetadataArg = ArgumentCaptor.forClass(RemoteLogSegmentMetadata.class);
        Mockito.verify(this.remoteLogMetadataManager).addRemoteLogSegmentMetadata(remoteLogSegmentMetadataArg.capture());
        RemoteLogSegmentMetadata addedMetadata = remoteLogSegmentMetadataArg.getValue();

        // The delete call must receive the added metadata with the COPY_SEGMENT_FINISHED update applied.
        RemoteLogSegmentMetadataUpdate expectedMetadataUpdate = new RemoteLogSegmentMetadataUpdate(addedMetadata.remoteLogSegmentId(), this.time.milliseconds(), Optional.of(customMetadata), RemoteLogSegmentState.COPY_SEGMENT_FINISHED, 0);
        RemoteLogSegmentMetadata expectedDeleteMetadata = addedMetadata.createWithUpdates(expectedMetadataUpdate);
        Mockito.verify(this.remoteStorageManager, Mockito.times(1)).deleteLogSegmentData(ArgumentMatchers.eq(expectedDeleteMetadata));

        // The task stops itself and never publishes a metadata update.
        Assertions.assertTrue(task.isCancelled());
        Mockito.verify(this.remoteLogMetadataManager, Mockito.never()).updateRemoteLogSegmentMetadata(ArgumentMatchers.any(RemoteLogSegmentMetadataUpdate.class));

        // One copy request, zero bytes copied, one failure.
        Assertions.assertEquals(1L, this.brokerTopicStats.topicStats(topic).remoteCopyRequestRate().count());
        Assertions.assertEquals(0L, this.brokerTopicStats.topicStats(topic).remoteCopyBytesRate().count());
        Assertions.assertEquals(1L, this.brokerTopicStats.topicStats(topic).failedRemoteCopyRequestRate().count());
        Assertions.assertEquals(1L, this.brokerTopicStats.allTopicsStats().remoteCopyRequestRate().count());
        Assertions.assertEquals(0L, this.brokerTopicStats.allTopicsStats().remoteCopyBytesRate().count());
        Assertions.assertEquals(1L, this.brokerTopicStats.allTopicsStats().failedRemoteCopyRequestRate().count());
    }

    /**
     * Checks the {@code RemoteLogManagerTasksAvgIdlePercent} gauge: it must read 1.0 before
     * {@code onLeadershipChange} is called and be below 1.0 afterwards, while
     * {@code copyLogSegmentData} is stubbed to block on a latch.
     *
     * @throws Exception if any of the file or mock set-up steps fails
     */
    @Test
    void testRemoteLogManagerTasksAvgIdlePercentMetrics() throws Exception {
        long oldSegmentStartOffset = 0L;
        long nextSegmentStartOffset = 150L;

        // Leader epoch cache and remote metadata lookups used by the copy path.
        Mockito.when((Object)this.mockLog.topicPartition()).thenReturn((Object)this.leaderTopicIdPartition.topicPartition());
        this.checkpoint.write(this.totalEpochEntries);
        LeaderEpochFileCache cache = new LeaderEpochFileCache(this.leaderTopicIdPartition.topicPartition(), this.checkpoint);
        Mockito.when((Object)this.mockLog.leaderEpochCache()).thenReturn((Object)Option.apply((Object)cache));
        Mockito.when((Object)this.remoteLogMetadataManager.highestOffsetForEpoch((TopicIdPartition)ArgumentMatchers.any(TopicIdPartition.class), ArgumentMatchers.anyInt())).thenReturn(Optional.of(0L));

        // One rolled segment (base offset 0) followed by the active segment (base offset 150).
        File tempFile = TestUtils.tempFile();
        File mockProducerSnapshotIndex = TestUtils.tempFile();
        File tempDir = TestUtils.tempDirectory();
        LogSegment oldSegment = (LogSegment)Mockito.mock(LogSegment.class);
        LogSegment activeSegment = (LogSegment)Mockito.mock(LogSegment.class);
        Mockito.when((Object)oldSegment.baseOffset()).thenReturn((Object)oldSegmentStartOffset);
        Mockito.when((Object)activeSegment.baseOffset()).thenReturn((Object)nextSegmentStartOffset);
        FileRecords fileRecords = (FileRecords)Mockito.mock(FileRecords.class);
        Mockito.when((Object)oldSegment.log()).thenReturn((Object)fileRecords);
        Mockito.when((Object)fileRecords.file()).thenReturn((Object)tempFile);
        Mockito.when((Object)fileRecords.sizeInBytes()).thenReturn((Object)10);
        Mockito.when((Object)oldSegment.readNextOffset()).thenReturn((Object)nextSegmentStartOffset);
        Mockito.when((Object)this.mockLog.activeSegment()).thenReturn((Object)activeSegment);
        Mockito.when((Object)this.mockLog.logStartOffset()).thenReturn((Object)oldSegmentStartOffset);
        Mockito.when((Object)this.mockLog.logSegments(ArgumentMatchers.anyLong(), ArgumentMatchers.anyLong())).thenReturn((Object)JavaConverters.collectionAsScalaIterable(Arrays.asList(oldSegment, activeSegment)));
        ProducerStateManager mockStateManager = (ProducerStateManager)Mockito.mock(ProducerStateManager.class);
        Mockito.when((Object)this.mockLog.producerStateManager()).thenReturn((Object)mockStateManager);
        Mockito.when((Object)mockStateManager.fetchSnapshot(ArgumentMatchers.anyLong())).thenReturn(Optional.of(mockProducerSnapshotIndex));
        Mockito.when((Object)this.mockLog.lastStableOffset()).thenReturn((Object)250L);

        // Index files backing the rolled segment.
        LazyIndex idx = LazyIndex.forOffset((File)UnifiedLog.offsetIndexFile((File)tempDir, (long)oldSegmentStartOffset, (String)""), (long)oldSegmentStartOffset, (int)1000);
        LazyIndex timeIdx = LazyIndex.forTime((File)UnifiedLog.timeIndexFile((File)tempDir, (long)oldSegmentStartOffset, (String)""), (long)oldSegmentStartOffset, (int)1500);
        File txnFile = UnifiedLog.transactionIndexFile((File)tempDir, (long)oldSegmentStartOffset, (String)"");
        txnFile.createNewFile();
        TransactionIndex txnIndex = new TransactionIndex(oldSegmentStartOffset, txnFile);
        Mockito.when((Object)oldSegment.lazyTimeIndex()).thenReturn((Object)timeIdx);
        Mockito.when((Object)oldSegment.lazyOffsetIndex()).thenReturn((Object)idx);
        Mockito.when((Object)oldSegment.txnIndex()).thenReturn((Object)txnIndex);

        // Metadata add/update calls complete immediately.
        CompletableFuture<Object> dummyFuture = new CompletableFuture<Object>();
        dummyFuture.complete(null);
        Mockito.when((Object)this.remoteLogMetadataManager.addRemoteLogSegmentMetadata((RemoteLogSegmentMetadata)ArgumentMatchers.any(RemoteLogSegmentMetadata.class))).thenReturn(dummyFuture);
        Mockito.when((Object)this.remoteLogMetadataManager.updateRemoteLogSegmentMetadata((RemoteLogSegmentMetadataUpdate)ArgumentMatchers.any(RemoteLogSegmentMetadataUpdate.class))).thenReturn(dummyFuture);

        // Any copy attempt parks on this latch until the test releases it.
        CountDownLatch latch = new CountDownLatch(1);
        ((RemoteStorageManager)Mockito.doAnswer(ans -> {
            latch.await();
            return null;
        }).when((Object)this.remoteStorageManager)).copyLogSegmentData((RemoteLogSegmentMetadata)ArgumentMatchers.any(RemoteLogSegmentMetadata.class), (LogSegmentData)ArgumentMatchers.any(LogSegmentData.class));
        Partition mockLeaderPartition = this.mockPartition(this.leaderTopicIdPartition);
        Partition mockFollowerPartition = this.mockPartition(this.followerTopicIdPartition);
        try {
            Assertions.assertEquals(1.0, this.yammerMetricValue("RemoteLogManagerTasksAvgIdlePercent"));
            this.remoteLogManager.onLeadershipChange(Collections.singleton(mockLeaderPartition), Collections.singleton(mockFollowerPartition), this.topicIds);
            Assertions.assertTrue(this.yammerMetricValue("RemoteLogManagerTasksAvgIdlePercent") < 1.0);
        } finally {
            // Release the latch even when an assertion fails, so a copy call parked on it
            // is never left blocked after the test has finished.
            latch.countDown();
        }
    }

    /**
     * Returns the value of the first metric in the default yammer registry whose MBean name
     * contains {@code name}. The metric is cast to a {@link Gauge} and its value to a
     * {@link Double}.
     *
     * @param name substring to look for in the registered MBean names
     * @return the matching gauge's current value
     * @throws java.util.NoSuchElementException if no registered metric matches {@code name}
     */
    private double yammerMetricValue(String name) {
        Gauge gauge = (Gauge)KafkaYammerMetrics.defaultRegistry().allMetrics().entrySet().stream()
            .filter(e -> ((MetricName)e.getKey()).getMBeanName().contains(name))
            .findFirst()
            // Same exception type as a bare Optional.get(), but the message names the missing metric.
            .orElseThrow(() -> new java.util.NoSuchElementException("No yammer metric with an MBean name containing '" + name + "' is registered"))
            .getValue();
        return (Double)gauge.value();
    }

    /**
     * Exercises {@code RLMTask.copyLogSegmentsToRemote} when {@code copyLogSegmentData} is stubbed
     * to throw a {@link RuntimeException}. The test asserts that the copy-request and
     * failed-copy-request counters (per topic and for all topics) go from 0 to 1, that the copy
     * was attempted exactly once, and that {@code updateHighestOffsetInRemoteStorage} is never
     * called on the log.
     */
    @Test
    void testMetricsUpdateOnCopyLogSegmentsFailure() throws Exception {
        long oldSegmentStartOffset = 0L;
        long nextSegmentStartOffset = 150L;
        // Leader epoch cache and remote metadata lookups used by the copy path.
        Mockito.when((Object)this.mockLog.topicPartition()).thenReturn((Object)this.leaderTopicIdPartition.topicPartition());
        this.checkpoint.write(this.totalEpochEntries);
        LeaderEpochFileCache cache = new LeaderEpochFileCache(this.leaderTopicIdPartition.topicPartition(), this.checkpoint);
        Mockito.when((Object)this.mockLog.leaderEpochCache()).thenReturn((Object)Option.apply((Object)cache));
        Mockito.when((Object)this.remoteLogMetadataManager.highestOffsetForEpoch((TopicIdPartition)ArgumentMatchers.any(TopicIdPartition.class), ArgumentMatchers.anyInt())).thenReturn(Optional.of(0L));
        // One rolled segment (base offset 0) followed by the active segment (base offset 150).
        File tempFile = TestUtils.tempFile();
        File mockProducerSnapshotIndex = TestUtils.tempFile();
        File tempDir = TestUtils.tempDirectory();
        LogSegment oldSegment = (LogSegment)Mockito.mock(LogSegment.class);
        LogSegment activeSegment = (LogSegment)Mockito.mock(LogSegment.class);
        Mockito.when((Object)oldSegment.baseOffset()).thenReturn((Object)oldSegmentStartOffset);
        Mockito.when((Object)activeSegment.baseOffset()).thenReturn((Object)nextSegmentStartOffset);
        FileRecords fileRecords = (FileRecords)Mockito.mock(FileRecords.class);
        Mockito.when((Object)oldSegment.log()).thenReturn((Object)fileRecords);
        Mockito.when((Object)fileRecords.file()).thenReturn((Object)tempFile);
        Mockito.when((Object)fileRecords.sizeInBytes()).thenReturn((Object)10);
        Mockito.when((Object)oldSegment.readNextOffset()).thenReturn((Object)nextSegmentStartOffset);
        Mockito.when((Object)this.mockLog.activeSegment()).thenReturn((Object)activeSegment);
        Mockito.when((Object)this.mockLog.logStartOffset()).thenReturn((Object)oldSegmentStartOffset);
        Mockito.when((Object)this.mockLog.logSegments(ArgumentMatchers.anyLong(), ArgumentMatchers.anyLong())).thenReturn((Object)JavaConverters.collectionAsScalaIterable(Arrays.asList(oldSegment, activeSegment)));
        ProducerStateManager mockStateManager = (ProducerStateManager)Mockito.mock(ProducerStateManager.class);
        Mockito.when((Object)this.mockLog.producerStateManager()).thenReturn((Object)mockStateManager);
        Mockito.when((Object)mockStateManager.fetchSnapshot(ArgumentMatchers.anyLong())).thenReturn(Optional.of(mockProducerSnapshotIndex));
        Mockito.when((Object)this.mockLog.lastStableOffset()).thenReturn((Object)250L);
        // Index files backing the rolled segment.
        LazyIndex idx = LazyIndex.forOffset((File)UnifiedLog.offsetIndexFile((File)tempDir, (long)oldSegmentStartOffset, (String)""), (long)oldSegmentStartOffset, (int)1000);
        LazyIndex timeIdx = LazyIndex.forTime((File)UnifiedLog.timeIndexFile((File)tempDir, (long)oldSegmentStartOffset, (String)""), (long)oldSegmentStartOffset, (int)1500);
        File txnFile = UnifiedLog.transactionIndexFile((File)tempDir, (long)oldSegmentStartOffset, (String)"");
        txnFile.createNewFile();
        TransactionIndex txnIndex = new TransactionIndex(oldSegmentStartOffset, txnFile);
        Mockito.when((Object)oldSegment.lazyTimeIndex()).thenReturn((Object)timeIdx);
        Mockito.when((Object)oldSegment.lazyOffsetIndex()).thenReturn((Object)idx);
        Mockito.when((Object)oldSegment.txnIndex()).thenReturn((Object)txnIndex);
        // Adding segment metadata succeeds immediately; the data copy itself always throws.
        CompletableFuture<Object> dummyFuture = new CompletableFuture<Object>();
        dummyFuture.complete(null);
        Mockito.when((Object)this.remoteLogMetadataManager.addRemoteLogSegmentMetadata((RemoteLogSegmentMetadata)ArgumentMatchers.any(RemoteLogSegmentMetadata.class))).thenReturn(dummyFuture);
        ((RemoteStorageManager)Mockito.doThrow((Throwable[])new Throwable[]{new RuntimeException()}).when((Object)this.remoteStorageManager)).copyLogSegmentData((RemoteLogSegmentMetadata)ArgumentMatchers.any(RemoteLogSegmentMetadata.class), (LogSegmentData)ArgumentMatchers.any(LogSegmentData.class));
        // Baseline: no copy requests or failures recorded before the task runs.
        Assertions.assertEquals((long)0L, (long)this.brokerTopicStats.topicStats(this.leaderTopicIdPartition.topic()).remoteCopyRequestRate().count());
        Assertions.assertEquals((long)0L, (long)this.brokerTopicStats.topicStats(this.leaderTopicIdPartition.topic()).failedRemoteCopyRequestRate().count());
        Assertions.assertEquals((long)0L, (long)this.brokerTopicStats.allTopicsStats().remoteCopyRequestRate().count());
        Assertions.assertEquals((long)0L, (long)this.brokerTopicStats.allTopicsStats().failedRemoteCopyRequestRate().count());
        RemoteLogManager remoteLogManager = this.remoteLogManager;
        Objects.requireNonNull(remoteLogManager);
        RemoteLogManager.RLMTask task = new RemoteLogManager.RLMTask(remoteLogManager, this.leaderTopicIdPartition, 128);
        task.convertToLeader(2);
        task.copyLogSegmentsToRemote(this.mockLog);
        // The copy was attempted once and the log's highest remote offset was not advanced.
        ((RemoteStorageManager)Mockito.verify((Object)this.remoteStorageManager, (VerificationMode)Mockito.times((int)1))).copyLogSegmentData((RemoteLogSegmentMetadata)ArgumentMatchers.any(RemoteLogSegmentMetadata.class), (LogSegmentData)ArgumentMatchers.any(LogSegmentData.class));
        ((UnifiedLog)Mockito.verify((Object)this.mockLog, (VerificationMode)Mockito.times((int)0))).updateHighestOffsetInRemoteStorage(ArgumentMatchers.anyLong());
        // Both the request counter and the failure counter were bumped exactly once.
        Assertions.assertEquals((long)1L, (long)this.brokerTopicStats.topicStats(this.leaderTopicIdPartition.topic()).remoteCopyRequestRate().count());
        Assertions.assertEquals((long)1L, (long)this.brokerTopicStats.topicStats(this.leaderTopicIdPartition.topic()).failedRemoteCopyRequestRate().count());
        Assertions.assertEquals((long)1L, (long)this.brokerTopicStats.allTopicsStats().remoteCopyRequestRate().count());
        Assertions.assertEquals((long)1L, (long)this.brokerTopicStats.allTopicsStats().failedRemoteCopyRequestRate().count());
    }

    /**
     * After {@code convertToFollower()}, calling {@code copyLogSegmentsToRemote} must not add or
     * update remote segment metadata, must not copy segment data, and must not touch the log's
     * highest remote offset.
     */
    @Test
    void testCopyLogSegmentsToRemoteShouldNotCopySegmentForFollower() throws Exception {
        long rolledBaseOffset = 0L;
        long activeBaseOffset = 150L;

        // Two mocked segments: a rolled one and the active one.
        LogSegment rolledSegment = (LogSegment)Mockito.mock(LogSegment.class);
        LogSegment currentSegment = (LogSegment)Mockito.mock(LogSegment.class);
        Mockito.when((Object)rolledSegment.baseOffset()).thenReturn((Object)rolledBaseOffset);
        Mockito.when((Object)currentSegment.baseOffset()).thenReturn((Object)activeBaseOffset);

        // Leader epoch cache backed by the shared checkpoint.
        this.checkpoint.write(this.totalEpochEntries);
        LeaderEpochFileCache epochCache = new LeaderEpochFileCache(this.leaderTopicIdPartition.topicPartition(), this.checkpoint);
        Mockito.when((Object)this.mockLog.leaderEpochCache()).thenReturn((Object)Option.apply((Object)epochCache));
        Mockito.when((Object)this.remoteLogMetadataManager.highestOffsetForEpoch((TopicIdPartition)ArgumentMatchers.any(TopicIdPartition.class), ArgumentMatchers.anyInt())).thenReturn(Optional.of(0L));

        // Log-level stubs.
        Mockito.when((Object)this.mockLog.activeSegment()).thenReturn((Object)currentSegment);
        Mockito.when((Object)this.mockLog.logStartOffset()).thenReturn((Object)rolledBaseOffset);
        Mockito.when((Object)this.mockLog.logSegments(ArgumentMatchers.anyLong(), ArgumentMatchers.anyLong())).thenReturn((Object)JavaConverters.collectionAsScalaIterable(Arrays.asList(rolledSegment, currentSegment)));
        Mockito.when((Object)this.mockLog.lastStableOffset()).thenReturn((Object)250L);

        RemoteLogManager manager = this.remoteLogManager;
        Objects.requireNonNull(manager);
        RemoteLogManager.RLMTask followerTask = new RemoteLogManager.RLMTask(manager, this.leaderTopicIdPartition, 128);
        followerTask.convertToFollower();
        followerTask.copyLogSegmentsToRemote(this.mockLog);

        // Nothing may have reached the metadata manager, the storage manager or the log.
        VerificationMode neverCalled = (VerificationMode)Mockito.never();
        ((RemoteLogMetadataManager)Mockito.verify((Object)this.remoteLogMetadataManager, neverCalled)).addRemoteLogSegmentMetadata((RemoteLogSegmentMetadata)ArgumentMatchers.any(RemoteLogSegmentMetadata.class));
        ((RemoteStorageManager)Mockito.verify((Object)this.remoteStorageManager, neverCalled)).copyLogSegmentData((RemoteLogSegmentMetadata)ArgumentMatchers.any(RemoteLogSegmentMetadata.class), (LogSegmentData)ArgumentMatchers.any(LogSegmentData.class));
        ((RemoteLogMetadataManager)Mockito.verify((Object)this.remoteLogMetadataManager, neverCalled)).updateRemoteLogSegmentMetadata((RemoteLogSegmentMetadataUpdate)ArgumentMatchers.any(RemoteLogSegmentMetadataUpdate.class));
        ((UnifiedLog)Mockito.verify((Object)this.mockLog, neverCalled)).updateHighestOffsetInRemoteStorage(ArgumentMatchers.anyLong());
    }

    /**
     * Runs a leader {@code RLMTask} while {@code highestOffsetForEpoch} is stubbed to throw
     * {@link ReplicaNotAvailableException}. The test asserts that no segment metadata is added or
     * updated, no segment data is copied, the log's highest remote offset is not updated, and the
     * copy-request and failed-copy-request counters stay at 0.
     */
    @Test
    void testRLMTaskDoesNotUploadSegmentsWhenRemoteLogMetadataManagerIsNotInitialized() throws Exception {
        long oldSegmentStartOffset = 0L;
        long nextSegmentStartOffset = 150L;
        Mockito.when((Object)this.mockLog.topicPartition()).thenReturn((Object)this.leaderTopicIdPartition.topicPartition());
        this.checkpoint.write(this.totalEpochEntries);
        LeaderEpochFileCache cache = new LeaderEpochFileCache(this.leaderTopicIdPartition.topicPartition(), this.checkpoint);
        Mockito.when((Object)this.mockLog.leaderEpochCache()).thenReturn((Object)Option.apply((Object)cache));
        // Every lookup of the highest remote offset fails with ReplicaNotAvailableException.
        Mockito.when((Object)this.remoteLogMetadataManager.highestOffsetForEpoch((TopicIdPartition)ArgumentMatchers.any(TopicIdPartition.class), ArgumentMatchers.anyInt())).thenThrow(new Throwable[]{new ReplicaNotAvailableException("Remote log metadata cache is not initialized for partition: " + this.leaderTopicIdPartition)});
        // One rolled segment (base offset 0) followed by the active segment (base offset 150).
        LogSegment oldSegment = (LogSegment)Mockito.mock(LogSegment.class);
        LogSegment activeSegment = (LogSegment)Mockito.mock(LogSegment.class);
        Mockito.when((Object)oldSegment.baseOffset()).thenReturn((Object)oldSegmentStartOffset);
        Mockito.when((Object)activeSegment.baseOffset()).thenReturn((Object)nextSegmentStartOffset);
        Mockito.when((Object)this.mockLog.activeSegment()).thenReturn((Object)activeSegment);
        Mockito.when((Object)this.mockLog.logStartOffset()).thenReturn((Object)oldSegmentStartOffset);
        Mockito.when((Object)this.mockLog.logSegments(ArgumentMatchers.anyLong(), ArgumentMatchers.anyLong())).thenReturn((Object)JavaConverters.collectionAsScalaIterable(Arrays.asList(oldSegment, activeSegment)));
        Mockito.when((Object)this.mockLog.lastStableOffset()).thenReturn((Object)250L);
        // Baseline: no copy requests or failures recorded before the task runs.
        Assertions.assertEquals((long)0L, (long)this.brokerTopicStats.topicStats(this.leaderTopicIdPartition.topic()).remoteCopyRequestRate().count());
        Assertions.assertEquals((long)0L, (long)this.brokerTopicStats.topicStats(this.leaderTopicIdPartition.topic()).failedRemoteCopyRequestRate().count());
        Assertions.assertEquals((long)0L, (long)this.brokerTopicStats.allTopicsStats().remoteCopyRequestRate().count());
        Assertions.assertEquals((long)0L, (long)this.brokerTopicStats.allTopicsStats().failedRemoteCopyRequestRate().count());
        RemoteLogManager remoteLogManager = this.remoteLogManager;
        Objects.requireNonNull(remoteLogManager);
        RemoteLogManager.RLMTask task = new RemoteLogManager.RLMTask(remoteLogManager, this.leaderTopicIdPartition, 128);
        task.convertToLeader(0);
        // Unlike the sibling tests, this drives the task through run() rather than copyLogSegmentsToRemote().
        task.run();
        ((RemoteLogMetadataManager)Mockito.verify((Object)this.remoteLogMetadataManager, (VerificationMode)Mockito.never())).addRemoteLogSegmentMetadata((RemoteLogSegmentMetadata)ArgumentMatchers.any(RemoteLogSegmentMetadata.class));
        ((RemoteStorageManager)Mockito.verify((Object)this.remoteStorageManager, (VerificationMode)Mockito.never())).copyLogSegmentData((RemoteLogSegmentMetadata)ArgumentMatchers.any(RemoteLogSegmentMetadata.class), (LogSegmentData)ArgumentMatchers.any(LogSegmentData.class));
        ((RemoteLogMetadataManager)Mockito.verify((Object)this.remoteLogMetadataManager, (VerificationMode)Mockito.never())).updateRemoteLogSegmentMetadata((RemoteLogSegmentMetadataUpdate)ArgumentMatchers.any(RemoteLogSegmentMetadataUpdate.class));
        ((UnifiedLog)Mockito.verify((Object)this.mockLog, (VerificationMode)Mockito.never())).updateHighestOffsetInRemoteStorage(ArgumentMatchers.anyLong());
        // The counters are expected to be unchanged after the run.
        Assertions.assertEquals((long)0L, (long)this.brokerTopicStats.topicStats(this.leaderTopicIdPartition.topic()).remoteCopyRequestRate().count());
        Assertions.assertEquals((long)0L, (long)this.brokerTopicStats.topicStats(this.leaderTopicIdPartition.topic()).failedRemoteCopyRequestRate().count());
        Assertions.assertEquals((long)0L, (long)this.brokerTopicStats.allTopicsStats().remoteCopyRequestRate().count());
        Assertions.assertEquals((long)0L, (long)this.brokerTopicStats.allTopicsStats().failedRemoteCopyRequestRate().count());
    }

    /**
     * Asserts that {@code remoteLogSegmentMetadata} belongs to the leader partition, covers the
     * given offset range, carries exactly the expected leader epochs, was produced by broker 0 and
     * is in the {@code COPY_SEGMENT_STARTED} state.
     *
     * @param remoteLogSegmentMetadata metadata to check
     * @param oldSegmentStartOffset expected start offset
     * @param oldSegmentEndOffset expected end offset
     * @param expectedLeaderEpochs expected epoch-to-start-offset mapping; may hold any number of entries
     */
    private void verifyRemoteLogSegmentMetadata(RemoteLogSegmentMetadata remoteLogSegmentMetadata, long oldSegmentStartOffset, long oldSegmentEndOffset, Map<Integer, Long> expectedLeaderEpochs) {
        Assertions.assertEquals((Object)this.leaderTopicIdPartition, (Object)remoteLogSegmentMetadata.remoteLogSegmentId().topicIdPartition());
        Assertions.assertEquals((long)oldSegmentStartOffset, (long)remoteLogSegmentMetadata.startOffset());
        Assertions.assertEquals((long)oldSegmentEndOffset, (long)remoteLogSegmentMetadata.endOffset());
        NavigableMap<?, ?> leaderEpochs = remoteLogSegmentMetadata.segmentLeaderEpochs();
        // The size check comes first so a count mismatch gives a short, obvious message.
        Assertions.assertEquals((int)expectedLeaderEpochs.size(), (int)leaderEpochs.size());
        // Compare the whole mapping: this does not depend on the iteration order of the expected
        // map and works for one entry as well as for many, whereas pulling exactly two entries
        // from an iterator fails for a single-entry map and only checks the first and last entries.
        Assertions.assertEquals((Object)expectedLeaderEpochs, (Object)leaderEpochs);
        Assertions.assertEquals((int)0, (int)remoteLogSegmentMetadata.brokerId());
        Assertions.assertEquals((Object)RemoteLogSegmentState.COPY_SEGMENT_STARTED, (Object)remoteLogSegmentMetadata.state());
    }

    /**
     * Asserts that the update targets the leader partition, was issued by broker 0 and marks the
     * segment as {@code COPY_SEGMENT_FINISHED}.
     *
     * @param remoteLogSegmentMetadataUpdate update to check
     */
    private void verifyRemoteLogSegmentMetadataUpdate(RemoteLogSegmentMetadataUpdate remoteLogSegmentMetadataUpdate) {
        Object updatedPartition = remoteLogSegmentMetadataUpdate.remoteLogSegmentId().topicIdPartition();
        Assertions.assertEquals((Object)this.leaderTopicIdPartition, updatedPartition);

        int issuingBroker = remoteLogSegmentMetadataUpdate.brokerId();
        Assertions.assertEquals(0, issuingBroker);

        Object newState = remoteLogSegmentMetadataUpdate.state();
        Assertions.assertEquals((Object)RemoteLogSegmentState.COPY_SEGMENT_FINISHED, newState);
    }

    /**
     * Asserts that {@code logSegmentData} points at the expected index, segment and snapshot
     * files, and that its leader epoch index equals the byte buffer produced by writing
     * {@code expectedLeaderEpoch} to an in-memory checkpoint.
     *
     * @param logSegmentData data to check
     * @param idx expected offset index
     * @param timeIdx expected time index
     * @param txnIndex expected transaction index
     * @param tempFile expected log segment file
     * @param mockProducerSnapshotIndex expected producer snapshot file
     * @param expectedLeaderEpoch epoch entries the leader epoch index is expected to encode
     * @throws IOException declared by the original signature for the index and checkpoint calls
     */
    private void verifyLogSegmentData(LogSegmentData logSegmentData, LazyIndex idx, LazyIndex timeIdx, TransactionIndex txnIndex, File tempFile, File mockProducerSnapshotIndex, List<EpochEntry> expectedLeaderEpoch) throws IOException {
        Assertions.assertEquals((Object)idx.file().getAbsolutePath(), (Object)logSegmentData.offsetIndex().toAbsolutePath().toString());
        Assertions.assertEquals((Object)timeIdx.file().getAbsolutePath(), (Object)logSegmentData.timeIndex().toAbsolutePath().toString());
        // Fail with a readable message instead of a bare NoSuchElementException when the
        // transaction index is missing.
        Optional<?> maybeTxnIndexPath = logSegmentData.transactionIndex();
        Assertions.assertTrue(maybeTxnIndexPath.isPresent(), "LogSegmentData should carry a transaction index");
        // Both sides are absolute paths, matching the other file comparisons in this method.
        Assertions.assertEquals((Object)txnIndex.file().getAbsolutePath(), (Object)((Path)maybeTxnIndexPath.get()).toAbsolutePath().toString());
        Assertions.assertEquals((Object)tempFile.getAbsolutePath(), (Object)logSegmentData.logSegment().toAbsolutePath().toString());
        Assertions.assertEquals((Object)mockProducerSnapshotIndex.getAbsolutePath(), (Object)logSegmentData.producerSnapshotIndex().toAbsolutePath().toString());
        InMemoryLeaderEpochCheckpoint inMemoryLeaderEpochCheckpoint = new InMemoryLeaderEpochCheckpoint();
        inMemoryLeaderEpochCheckpoint.write(expectedLeaderEpoch);
        Assertions.assertEquals((Object)inMemoryLeaderEpochCheckpoint.readAsByteBuffer(), (Object)logSegmentData.leaderEpochIndex());
    }

    /**
     * Builds a {@code RemoteLogManager} whose {@code createRemoteStorageManager()} is overridden to
     * hand back a mocked {@code ClassLoaderAwareRemoteStorageManager}, and checks that
     * {@code storageManager()} returns that same instance.
     */
    @Test
    void testGetClassLoaderAwareRemoteStorageManager() throws Exception {
        final ClassLoaderAwareRemoteStorageManager stubbedStorageManager = (ClassLoaderAwareRemoteStorageManager)Mockito.mock(ClassLoaderAwareRemoteStorageManager.class);
        try (RemoteLogManager manager = new RemoteLogManager(this.remoteLogManagerConfig, 0, this.logDir, "dummyId", this.time, partition -> Optional.empty(), (partition, logStartOffset) -> {}, this.brokerTopicStats){

            public RemoteStorageManager createRemoteStorageManager() {
                return stubbedStorageManager;
            }
        }) {
            Object exposedStorageManager = manager.storageManager();
            Assertions.assertEquals((Object)stubbedStorageManager, exposedStorageManager);
        }
    }

    /**
     * Asserts that fetching remote segment metadata (epoch 0, offset 0) does not throw for any of
     * the given partitions.
     *
     * @param topicIdPartitions partitions to probe, in order
     */
    private void verifyInCache(TopicIdPartition ... topicIdPartitions) {
        for (TopicIdPartition partition : topicIdPartitions) {
            Assertions.assertDoesNotThrow(() -> this.remoteLogManager.fetchRemoteLogSegmentMetadata(partition.topicPartition(), 0, 0L));
        }
    }

    /**
     * Asserts that fetching remote segment metadata (epoch 0, offset 0) throws a
     * {@link KafkaException} for every one of the given partitions.
     *
     * @param topicIdPartitions partitions to probe, in order
     */
    private void verifyNotInCache(TopicIdPartition ... topicIdPartitions) {
        for (TopicIdPartition partition : topicIdPartitions) {
            Assertions.assertThrows(KafkaException.class, () -> this.remoteLogManager.fetchRemoteLogSegmentMetadata(partition.topicPartition(), 0, 0L));
        }
    }

    /**
     * Walks the cache probes through a full lifecycle: both partitions fail the lookup before the
     * leadership change, both succeed after it, and each fails again once
     * {@code stopPartitions} has been called for it.
     */
    @Test
    void testTopicIdCacheUpdates() throws RemoteStorageException {
        Partition leader = this.mockPartition(this.leaderTopicIdPartition);
        Partition follower = this.mockPartition(this.followerTopicIdPartition);
        Mockito.when((Object)this.remoteLogMetadataManager.remoteLogSegmentMetadata((TopicIdPartition)ArgumentMatchers.any(TopicIdPartition.class), ArgumentMatchers.anyInt(), ArgumentMatchers.anyLong())).thenReturn(Optional.empty());

        // Before the leadership change: neither partition resolves.
        this.verifyNotInCache(this.followerTopicIdPartition, this.leaderTopicIdPartition);

        // After the leadership change: both resolve and the metadata manager was notified once.
        this.remoteLogManager.onLeadershipChange(Collections.singleton(leader), Collections.singleton(follower), this.topicIds);
        ((RemoteLogMetadataManager)Mockito.verify((Object)this.remoteLogMetadataManager, (VerificationMode)Mockito.times((int)1))).onPartitionLeadershipChanges(Collections.singleton(this.leaderTopicIdPartition), Collections.singleton(this.followerTopicIdPartition));
        this.verifyInCache(this.followerTopicIdPartition, this.leaderTopicIdPartition);

        // Stop the leader partition only.
        StopPartition stopLeader = new StopPartition(this.leaderTopicIdPartition.topicPartition(), true, true);
        this.remoteLogManager.stopPartitions(Collections.singleton(stopLeader), (tp, ex) -> {});
        this.verifyNotInCache(this.leaderTopicIdPartition);
        this.verifyInCache(this.followerTopicIdPartition);

        // Stop the follower partition as well.
        StopPartition stopFollower = new StopPartition(this.followerTopicIdPartition.topicPartition(), true, true);
        this.remoteLogManager.stopPartitions(Collections.singleton(stopFollower), (tp, ex) -> {});
        this.verifyNotInCache(this.leaderTopicIdPartition, this.followerTopicIdPartition);
    }

    /**
     * After a leadership change covering both partitions, fetching segment metadata for each
     * partition must be forwarded to the metadata manager with the matching
     * {@code TopicIdPartition}.
     */
    @Test
    void testFetchRemoteLogSegmentMetadata() throws RemoteStorageException {
        Partition leader = this.mockPartition(this.leaderTopicIdPartition);
        Partition follower = this.mockPartition(this.followerTopicIdPartition);
        this.remoteLogManager.onLeadershipChange(Collections.singleton(leader), Collections.singleton(follower), this.topicIds);

        TopicPartition leaderPartition = this.leaderTopicIdPartition.topicPartition();
        this.remoteLogManager.fetchRemoteLogSegmentMetadata(leaderPartition, 10, 100L);
        TopicPartition followerPartition = this.followerTopicIdPartition.topicPartition();
        this.remoteLogManager.fetchRemoteLogSegmentMetadata(followerPartition, 20, 200L);

        RemoteLogMetadataManager verifiedForLeader = (RemoteLogMetadataManager)Mockito.verify((Object)this.remoteLogMetadataManager);
        verifiedForLeader.remoteLogSegmentMetadata((TopicIdPartition)ArgumentMatchers.eq((Object)this.leaderTopicIdPartition), ArgumentMatchers.anyInt(), ArgumentMatchers.anyLong());
        RemoteLogMetadataManager verifiedForFollower = (RemoteLogMetadataManager)Mockito.verify((Object)this.remoteLogMetadataManager);
        verifiedForFollower.remoteLogSegmentMetadata((TopicIdPartition)ArgumentMatchers.eq((Object)this.followerTopicIdPartition), ArgumentMatchers.anyInt(), ArgumentMatchers.anyLong());
    }

    /**
     * Using a spy, checks that {@code onLeadershipChange} calls
     * {@code doHandleLeaderOrFollowerPartitions} for a follower-only change and, after the spy is
     * reset, for a leader-only change.
     */
    @Test
    void testOnLeadershipChangeWillInvokeHandleLeaderOrFollowerPartitions() {
        RemoteLogManager spiedManager = (RemoteLogManager)Mockito.spy((Object)this.remoteLogManager);

        // Follower-only change.
        Partition follower = this.mockPartition(this.followerTopicIdPartition);
        spiedManager.onLeadershipChange(Collections.emptySet(), Collections.singleton(follower), this.topicIds);
        ((RemoteLogManager)Mockito.verify((Object)spiedManager)).doHandleLeaderOrFollowerPartitions((TopicIdPartition)ArgumentMatchers.eq((Object)this.followerTopicIdPartition), (Consumer)ArgumentMatchers.any(Consumer.class));

        // Clear the recorded interactions before the second scenario.
        Mockito.reset((Object[])new RemoteLogManager[]{spiedManager});

        // Leader-only change.
        Partition leader = this.mockPartition(this.leaderTopicIdPartition);
        spiedManager.onLeadershipChange(Collections.singleton(leader), Collections.emptySet(), this.topicIds);
        ((RemoteLogManager)Mockito.verify((Object)spiedManager)).doHandleLeaderOrFollowerPartitions((TopicIdPartition)ArgumentMatchers.eq((Object)this.leaderTopicIdPartition), (Consumer)ArgumentMatchers.any(Consumer.class));
    }

    /**
     * Builds an uncompressed batch of three records starting at {@code initialOffset}, with
     * timestamps {@code timestamp - 1}, {@code timestamp + 1} and {@code timestamp + 2}.
     *
     * @param timestamp reference timestamp the three record timestamps are derived from
     * @param initialOffset offset passed to {@code MemoryRecords.withRecords} as the initial offset
     * @param partitionLeaderEpoch leader epoch passed to {@code MemoryRecords.withRecords}
     * @return the in-memory records
     */
    private MemoryRecords records(long timestamp, long initialOffset, int partitionLeaderEpoch) {
        // Encode the payloads with an explicit charset so the bytes do not depend on the
        // platform default encoding of the machine running the test.
        java.nio.charset.Charset utf8 = java.nio.charset.StandardCharsets.UTF_8;
        SimpleRecord[] batch = new SimpleRecord[]{
            new SimpleRecord(timestamp - 1L, "first message".getBytes(utf8)),
            new SimpleRecord(timestamp + 1L, "second message".getBytes(utf8)),
            new SimpleRecord(timestamp + 2L, "third message".getBytes(utf8))
        };
        return MemoryRecords.withRecords((long)initialOffset, (CompressionType)CompressionType.NONE, (Integer)partitionLeaderEpoch, (SimpleRecord[])batch);
    }

    /**
     * A fresh task is not a leader, becomes one after {@code convertToLeader} and stops being one
     * after {@code convertToFollower}.
     */
    @Test
    void testRLMTaskShouldSetLeaderEpochCorrectly() {
        RemoteLogManager manager = this.remoteLogManager;
        Objects.requireNonNull(manager);
        RemoteLogManager.RLMTask rlmTask = new RemoteLogManager.RLMTask(manager, this.leaderTopicIdPartition, 128);

        boolean leaderInitially = rlmTask.isLeader();
        Assertions.assertFalse(leaderInitially);

        rlmTask.convertToLeader(1);
        boolean leaderAfterPromotion = rlmTask.isLeader();
        Assertions.assertTrue(leaderAfterPromotion);

        rlmTask.convertToFollower();
        boolean leaderAfterDemotion = rlmTask.isLeader();
        Assertions.assertFalse(leaderAfterDemotion);
    }

    /**
     * With a finished remote segment whose leader epochs match the local epoch cache, lookups at
     * {@code ts} and {@code ts + 2} resolve to offsets {@code startOffset + 1} and
     * {@code startOffset + 2}, and a lookup at {@code ts + 3} yields nothing.
     */
    @Test
    void testFindOffsetByTimestamp() throws IOException, RemoteStorageException {
        TopicPartition partition = this.leaderTopicIdPartition.topicPartition();
        long baseTimestamp = this.time.milliseconds();
        long segmentStartOffset = 120L;
        int leaderEpoch = 10;

        // Local leader epoch history.
        LeaderEpochFileCache epochCache = new LeaderEpochFileCache(partition, this.checkpoint);
        epochCache.assign(4, 99L);
        epochCache.assign(5, 99L);
        epochCache.assign(leaderEpoch, segmentStartOffset);
        epochCache.assign(12, 500L);

        // Epochs recorded on the remote segment.
        TreeMap<Integer, Long> segmentEpochs = new TreeMap<Integer, Long>();
        segmentEpochs.put(leaderEpoch, segmentStartOffset);

        this.doTestFindOffsetByTimestamp(baseTimestamp, segmentStartOffset, leaderEpoch, segmentEpochs, RemoteLogSegmentState.COPY_SEGMENT_FINISHED);

        Optional atBaseTimestamp = this.remoteLogManager.findOffsetByTimestamp(partition, baseTimestamp, segmentStartOffset, epochCache);
        FileRecords.TimestampAndOffset expectedAtBase = new FileRecords.TimestampAndOffset(baseTimestamp + 1L, segmentStartOffset + 1L, Optional.of(leaderEpoch));
        Assertions.assertEquals(Optional.of(expectedAtBase), (Object)atBaseTimestamp);

        Optional atPlusTwo = this.remoteLogManager.findOffsetByTimestamp(partition, baseTimestamp + 2L, segmentStartOffset, epochCache);
        FileRecords.TimestampAndOffset expectedAtPlusTwo = new FileRecords.TimestampAndOffset(baseTimestamp + 2L, segmentStartOffset + 2L, Optional.of(leaderEpoch));
        Assertions.assertEquals(Optional.of(expectedAtPlusTwo), (Object)atPlusTwo);

        Optional atPlusThree = this.remoteLogManager.findOffsetByTimestamp(partition, baseTimestamp + 3L, segmentStartOffset, epochCache);
        Assertions.assertEquals(Optional.empty(), (Object)atPlusThree);
    }

    /**
     * When the remote segment additionally carries epoch {@code targetLeaderEpoch - 1} at
     * {@code startOffset - 1}, which the local epoch cache was never assigned, every timestamp
     * lookup is expected to come back empty.
     */
    @Test
    void testFindOffsetByTimestampWithInvalidEpochSegments() throws IOException, RemoteStorageException {
        TopicPartition partition = this.leaderTopicIdPartition.topicPartition();
        long baseTimestamp = this.time.milliseconds();
        long segmentStartOffset = 120L;
        int leaderEpoch = 10;

        // Local leader epoch history.
        LeaderEpochFileCache epochCache = new LeaderEpochFileCache(partition, this.checkpoint);
        epochCache.assign(4, 99L);
        epochCache.assign(5, 99L);
        epochCache.assign(leaderEpoch, segmentStartOffset);
        epochCache.assign(12, 500L);

        // Epochs recorded on the remote segment, including one the local cache does not have.
        TreeMap<Integer, Long> segmentEpochs = new TreeMap<Integer, Long>();
        segmentEpochs.put(leaderEpoch - 1, segmentStartOffset - 1L);
        segmentEpochs.put(leaderEpoch, segmentStartOffset);

        this.doTestFindOffsetByTimestamp(baseTimestamp, segmentStartOffset, leaderEpoch, segmentEpochs, RemoteLogSegmentState.COPY_SEGMENT_FINISHED);

        Optional atBaseTimestamp = this.remoteLogManager.findOffsetByTimestamp(partition, baseTimestamp, segmentStartOffset, epochCache);
        Assertions.assertEquals(Optional.empty(), (Object)atBaseTimestamp);

        Optional atPlusTwo = this.remoteLogManager.findOffsetByTimestamp(partition, baseTimestamp + 2L, segmentStartOffset, epochCache);
        Assertions.assertEquals(Optional.empty(), (Object)atPlusTwo);

        Optional atPlusThree = this.remoteLogManager.findOffsetByTimestamp(partition, baseTimestamp + 3L, segmentStartOffset, epochCache);
        Assertions.assertEquals(Optional.empty(), (Object)atPlusThree);
    }

    /**
     * When the remote segment is still in the {@code COPY_SEGMENT_STARTED} state, a timestamp
     * lookup is expected to come back empty even though its leader epochs match the local cache.
     */
    @Test
    void testFindOffsetByTimestampWithSegmentNotReady() throws IOException, RemoteStorageException {
        TopicPartition partition = this.leaderTopicIdPartition.topicPartition();
        long baseTimestamp = this.time.milliseconds();
        long segmentStartOffset = 120L;
        int leaderEpoch = 10;

        // Local leader epoch history.
        LeaderEpochFileCache epochCache = new LeaderEpochFileCache(partition, this.checkpoint);
        epochCache.assign(4, 99L);
        epochCache.assign(5, 99L);
        epochCache.assign(leaderEpoch, segmentStartOffset);
        epochCache.assign(12, 500L);

        // Epochs recorded on the remote segment.
        TreeMap<Integer, Long> segmentEpochs = new TreeMap<Integer, Long>();
        segmentEpochs.put(leaderEpoch, segmentStartOffset);

        this.doTestFindOffsetByTimestamp(baseTimestamp, segmentStartOffset, leaderEpoch, segmentEpochs, RemoteLogSegmentState.COPY_SEGMENT_STARTED);

        Optional lookupResult = this.remoteLogManager.findOffsetByTimestamp(partition, baseTimestamp, segmentStartOffset, epochCache);
        Assertions.assertEquals(Optional.empty(), (Object)lookupResult);
    }

    /**
     * Prepares the mocked remote-storage fixture shared by the find-offset-by-timestamp tests.
     *
     * <p>A single mocked segment spanning {@code [startOffset, startOffset + 2]} with a max
     * timestamp of {@code ts + 2} is listed by the metadata manager for
     * {@code targetLeaderEpoch} only. Index and log-segment fetches are stubbed on the storage
     * manager, the local log end offset is stubbed to 600, and finally the remote log manager is
     * told it leads {@code leaderTopicIdPartition}.
     *
     * @param ts                 base timestamp used for the segment and its records
     * @param startOffset        start offset reported by the mocked segment
     * @param targetLeaderEpoch  the only epoch for which the segment is listed
     * @param validSegmentEpochs value returned by the segment's {@code segmentLeaderEpochs()}
     * @param state              value returned by the segment's {@code state()}
     * @throws IOException            if the partition directory or the index files cannot be created
     * @throws RemoteStorageException declared by the stubbed remote storage calls
     */
    private void doTestFindOffsetByTimestamp(long ts, long startOffset, int targetLeaderEpoch, TreeMap<Integer, Long> validSegmentEpochs, RemoteLogSegmentState state) throws IOException, RemoteStorageException {
        TopicPartition partition = this.leaderTopicIdPartition.topicPartition();
        RemoteLogSegmentId segmentId = new RemoteLogSegmentId(this.leaderTopicIdPartition, Uuid.randomUuid());

        RemoteLogSegmentMetadata mockedSegment = Mockito.mock(RemoteLogSegmentMetadata.class);
        Mockito.when(mockedSegment.remoteLogSegmentId()).thenReturn(segmentId);
        Mockito.when(mockedSegment.maxTimestampMs()).thenReturn(ts + 2L);
        Mockito.when(mockedSegment.startOffset()).thenReturn(startOffset);
        Mockito.when(mockedSegment.endOffset()).thenReturn(startOffset + 2L);
        Mockito.when(mockedSegment.segmentLeaderEpochs()).thenReturn(validSegmentEpochs);
        Mockito.when(mockedSegment.state()).thenReturn(state);

        // On-disk scratch area backing the index streams served below.
        File partitionDir = new File(this.logDir, partition.toString());
        Files.createDirectory(partitionDir.toPath());
        File txnIndexFile = new File(partitionDir, "txn-index" + UnifiedLog.TxnIndexFileSuffix());
        txnIndexFile.createNewFile();

        Mockito.when(this.remoteStorageManager.fetchIndex(ArgumentMatchers.any(RemoteLogSegmentMetadata.class), ArgumentMatchers.any(RemoteStorageManager.IndexType.class))).thenAnswer(invocation -> {
            RemoteLogSegmentMetadata requested = (RemoteLogSegmentMetadata)invocation.getArgument(0);
            RemoteStorageManager.IndexType requestedType = (RemoteStorageManager.IndexType)invocation.getArgument(1);
            int maxEntries = (int)(requested.endOffset() - requested.startOffset());
            // Both index objects are always constructed, whichever type was requested.
            File offsetIndexFile = new File(partitionDir, String.valueOf(requested.startOffset()) + UnifiedLog.IndexFileSuffix());
            OffsetIndex offsetIndex = new OffsetIndex(offsetIndexFile, requested.startOffset(), maxEntries * 8);
            File timeIndexFile = new File(partitionDir, String.valueOf(requested.startOffset()) + UnifiedLog.TimeIndexFileSuffix());
            TimeIndex timeIndex = new TimeIndex(timeIndexFile, requested.startOffset(), maxEntries * 12);

            final File selected;
            switch (requestedType) {
                case OFFSET:
                    selected = offsetIndex.file();
                    break;
                case TIMESTAMP:
                    selected = timeIndex.file();
                    break;
                case TRANSACTION:
                    selected = txnIndexFile;
                    break;
                default:
                    // Any other index type gets no stream.
                    return null;
            }
            return Files.newInputStream(selected.toPath());
        });

        Mockito.when(this.remoteLogMetadataManager.listRemoteLogSegments(ArgumentMatchers.eq(this.leaderTopicIdPartition), ArgumentMatchers.anyInt())).thenAnswer(invocation -> {
            int requestedEpoch = (Integer)invocation.getArgument(1);
            if (requestedEpoch != targetLeaderEpoch) {
                return Collections.emptyIterator();
            }
            return Collections.singleton(mockedSegment).iterator();
        });

        Mockito.when(this.remoteStorageManager.fetchLogSegment(mockedSegment, 0)).thenAnswer(invocation -> {
            byte[] segmentBytes = this.records(ts, startOffset, targetLeaderEpoch).buffer().array();
            return new ByteArrayInputStream(segmentBytes);
        });
        Mockito.when(this.mockLog.logEndOffset()).thenReturn(600L);

        this.remoteLogManager.onLeadershipChange(
                Collections.singleton(this.mockPartition(this.leaderTopicIdPartition)),
                Collections.emptySet(),
                this.topicIds);
    }

    /**
     * Closes the remote log manager twice and verifies that the storage manager and then the
     * metadata manager were each closed exactly once, in that order.
     */
    @Test
    void testIdempotentClose() throws IOException {
        for (int attempt = 0; attempt < 2; attempt++) {
            this.remoteLogManager.close();
        }
        InOrder closeOrder = Mockito.inOrder(this.remoteStorageManager, this.remoteLogMetadataManager);
        closeOrder.verify(this.remoteStorageManager, Mockito.times(1)).close();
        closeOrder.verify(this.remoteLogMetadataManager, Mockito.times(1)).close();
    }

    /**
     * Verifies that closing a {@link RemoteLogManager} removes every gauge it registered.
     *
     * <p>Construction of {@code KafkaMetricsGroup} is intercepted so that the groups created
     * while building the manager can be inspected. The first constructed group is treated as the
     * manager's own group and the second as the thread-pool group (NOTE(review): this relies on
     * construction order inside {@code RemoteLogManager}; confirm if that class changes).
     * For each group the test expects one {@code newGauge} call per expected metric name, one
     * {@code removeMetric} call per name, and no other interactions.
     *
     * @throws IOException declared by {@code RemoteLogManager.close()}
     */
    @Test
    public void testRemoveMetricsOnClose() throws IOException {
        try (MockedConstruction<KafkaMetricsGroup> mockMetricsGroupCtor = Mockito.mockConstruction(KafkaMetricsGroup.class)) {
            RemoteLogManager remoteLogManager = new RemoteLogManager(this.remoteLogManagerConfig, 0, this.logDir, "dummyId", this.time, tp -> Optional.of(this.mockLog), (topicPartition, offset) -> {}, this.brokerTopicStats){

                public RemoteStorageManager createRemoteStorageManager() {
                    return RemoteLogManagerTest.this.remoteStorageManager;
                }

                public RemoteLogMetadataManager createRemoteLogMetadataManager() {
                    return RemoteLogManagerTest.this.remoteLogMetadataManager;
                }
            };
            // Closing is what should trigger the metric removal checked below.
            remoteLogManager.close();

            KafkaMetricsGroup mockRlmMetricsGroup = mockMetricsGroupCtor.constructed().get(0);
            KafkaMetricsGroup mockThreadPoolMetricsGroup = mockMetricsGroupCtor.constructed().get(1);

            List<String> remoteLogManagerMetricNames = Collections.singletonList(RemoteStorageMetrics.REMOTE_LOG_MANAGER_TASKS_AVG_IDLE_PERCENT_METRIC.getName());
            // Typed as Set<String>: with a raw Set the forEach lambda parameter would be Object
            // and could not be handed to removeMetric.
            Set<String> remoteStorageThreadPoolMetricNames = RemoteStorageMetrics.REMOTE_STORAGE_THREAD_POOL_METRICS;

            Mockito.verify(mockRlmMetricsGroup, Mockito.times(remoteLogManagerMetricNames.size())).newGauge(ArgumentMatchers.anyString(), ArgumentMatchers.any());
            remoteLogManagerMetricNames.forEach(metricName -> Mockito.verify(mockRlmMetricsGroup).removeMetric(metricName));

            Mockito.verify(mockThreadPoolMetricsGroup, Mockito.times(remoteStorageThreadPoolMetricNames.size())).newGauge(ArgumentMatchers.anyString(), ArgumentMatchers.any());
            remoteStorageThreadPoolMetricNames.forEach(metricName -> Mockito.verify(mockThreadPoolMetricsGroup).removeMetric(metricName));

            Mockito.verifyNoMoreInteractions(mockRlmMetricsGroup);
            Mockito.verifyNoMoreInteractions(mockThreadPoolMetricsGroup);
        }
    }

    /**
     * Builds a {@code COPY_SEGMENT_FINISHED} segment metadata for partition 0 of topic
     * {@code "topic"} with freshly generated ids.
     *
     * @param startOffset   first offset of the segment
     * @param endOffset     last offset of the segment
     * @param segmentEpochs leader-epoch map attached to the segment
     * @return the new metadata; the remaining constructor arguments are fixed placeholder values
     */
    private static RemoteLogSegmentMetadata createRemoteLogSegmentMetadata(long startOffset, long endOffset, Map<Integer, Long> segmentEpochs) {
        TopicIdPartition topicIdPartition = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("topic", 0));
        RemoteLogSegmentId segmentId = new RemoteLogSegmentId(topicIdPartition, Uuid.randomUuid());
        return new RemoteLogSegmentMetadata(
                segmentId,
                startOffset,
                endOffset,
                100000L,
                1,
                100000L,
                1000,
                Optional.empty(),
                RemoteLogSegmentState.COPY_SEGMENT_FINISHED,
                segmentEpochs);
    }

    /**
     * Feeds {@code buildFilteredLeaderEpochMap} a lineage in which several consecutive epochs
     * share a start offset and expects only the highest epoch of each such run to remain.
     */
    @Test
    public void testBuildFilteredLeaderEpochMap() {
        // Index = leader epoch, value = start offset of that epoch.
        long[] startOffsetByEpoch = {0L, 0L, 0L, 30L, 40L, 60L, 60L, 70L, 70L};
        TreeMap<Integer, Long> unfiltered = new TreeMap<Integer, Long>();
        for (int epoch = 0; epoch < startOffsetByEpoch.length; epoch++) {
            unfiltered.put(epoch, startOffsetByEpoch[epoch]);
        }

        TreeMap<Integer, Long> expected = new TreeMap<Integer, Long>();
        expected.put(2, 0L);
        expected.put(3, 30L);
        expected.put(4, 40L);
        expected.put(6, 60L);
        expected.put(8, 70L);

        Assertions.assertEquals(expected, RemoteLogManager.buildFilteredLeaderEpochMap(unfiltered));
    }

    /**
     * Exercises {@code isRemoteSegmentWithinLeaderEpochs} against one leader lineage
     * (epochs 0-5 and 7, log end offset 90) with five segments expected to be accepted and six
     * expected to be rejected.
     */
    @Test
    public void testRemoteSegmentWithinLeaderEpochs() {
        final long logEndOffset = 90L;

        TreeMap<Integer, Long> leaderLineage = new TreeMap<Integer, Long>();
        leaderLineage.put(0, 0L);
        leaderLineage.put(1, 10L);
        leaderLineage.put(2, 20L);
        leaderLineage.put(3, 30L);
        leaderLineage.put(4, 40L);
        leaderLineage.put(5, 50L);
        leaderLineage.put(7, 70L);

        // --- accepted segments ---

        // Spans epochs 1..3, starting part-way into epoch 1.
        TreeMap<Integer, Long> epochsOneToThree = new TreeMap<Integer, Long>();
        epochsOneToThree.put(1, 15L);
        epochsOneToThree.put(2, 20L);
        epochsOneToThree.put(3, 30L);
        Assertions.assertTrue(RemoteLogManager.isRemoteSegmentWithinLeaderEpochs(createRemoteLogSegmentMetadata(15L, 35L, epochsOneToThree), logEndOffset, leaderLineage));

        // Entirely inside epoch 1.
        TreeMap<Integer, Long> onlyEpochOne = new TreeMap<Integer, Long>();
        onlyEpochOne.put(1, 15L);
        Assertions.assertTrue(RemoteLogManager.isRemoteSegmentWithinLeaderEpochs(createRemoteLogSegmentMetadata(15L, 19L, onlyEpochOne), logEndOffset, leaderLineage));

        // Entirely inside the first epoch of the lineage.
        TreeMap<Integer, Long> onlyEpochZero = new TreeMap<Integer, Long>();
        onlyEpochZero.put(0, 0L);
        Assertions.assertTrue(RemoteLogManager.isRemoteSegmentWithinLeaderEpochs(createRemoteLogSegmentMetadata(0L, 5L, onlyEpochZero), logEndOffset, leaderLineage));

        // Entirely inside the last epoch of the lineage.
        TreeMap<Integer, Long> onlyEpochSeven = new TreeMap<Integer, Long>();
        onlyEpochSeven.put(7, 70L);
        Assertions.assertTrue(RemoteLogManager.isRemoteSegmentWithinLeaderEpochs(createRemoteLogSegmentMetadata(70L, 75L, onlyEpochSeven), logEndOffset, leaderLineage));

        // Spans epochs 1..2 and ends on the last offset before epoch 3 starts.
        TreeMap<Integer, Long> epochsOneToTwo = new TreeMap<Integer, Long>();
        epochsOneToTwo.put(1, 15L);
        epochsOneToTwo.put(2, 20L);
        Assertions.assertTrue(RemoteLogManager.isRemoteSegmentWithinLeaderEpochs(createRemoteLogSegmentMetadata(15L, 29L, epochsOneToTwo), logEndOffset, leaderLineage));

        // --- rejected segments ---

        // Contains epoch 6, which the leader lineage does not have.
        TreeMap<Integer, Long> withUnknownEpoch = new TreeMap<Integer, Long>();
        withUnknownEpoch.put(5, 55L);
        withUnknownEpoch.put(6, 60L);
        withUnknownEpoch.put(7, 70L);
        Assertions.assertFalse(RemoteLogManager.isRemoteSegmentWithinLeaderEpochs(createRemoteLogSegmentMetadata(55L, 85L, withUnknownEpoch), logEndOffset, leaderLineage));

        // Jumps from epoch 2 to epoch 4 although the leader lineage has epoch 3 in between.
        TreeMap<Integer, Long> withSkippedEpoch = new TreeMap<Integer, Long>();
        withSkippedEpoch.put(1, 15L);
        withSkippedEpoch.put(2, 20L);
        withSkippedEpoch.put(4, 40L);
        Assertions.assertFalse(RemoteLogManager.isRemoteSegmentWithinLeaderEpochs(createRemoteLogSegmentMetadata(15L, 45L, withSkippedEpoch), logEndOffset, leaderLineage));

        // Segment carrying the full leader lineage: end offset past, then equal to, the log end offset.
        Assertions.assertFalse(RemoteLogManager.isRemoteSegmentWithinLeaderEpochs(createRemoteLogSegmentMetadata(15L, 95L, leaderLineage), logEndOffset, leaderLineage));
        Assertions.assertFalse(RemoteLogManager.isRemoteSegmentWithinLeaderEpochs(createRemoteLogSegmentMetadata(15L, 90L, leaderLineage), logEndOffset, leaderLineage));

        // Epoch 1 starts at offset 5 in the segment but at offset 10 in the leader lineage.
        TreeMap<Integer, Long> startsTooEarly = new TreeMap<Integer, Long>();
        startsTooEarly.put(1, 5L);
        startsTooEarly.put(2, 20L);
        Assertions.assertFalse(RemoteLogManager.isRemoteSegmentWithinLeaderEpochs(createRemoteLogSegmentMetadata(5L, 25L, startsTooEarly), logEndOffset, leaderLineage));

        // Last segment epoch is 2, yet the end offset (35) lies beyond where epoch 3 starts (30).
        TreeMap<Integer, Long> endsTooLate = new TreeMap<Integer, Long>();
        endsTooLate.put(1, 15L);
        endsTooLate.put(2, 20L);
        Assertions.assertFalse(RemoteLogManager.isRemoteSegmentWithinLeaderEpochs(createRemoteLogSegmentMetadata(15L, 35L, endsTooLate), logEndOffset, leaderLineage));
    }

    /**
     * With three local segments (base offsets 5, 10, 15) and an upper bound of 20, expects
     * {@code candidateLogSegments} to return only the first two, each paired with the base
     * offset of the segment that follows it; the last segment is left out.
     */
    @Test
    public void testCandidateLogSegmentsSkipsActiveSegment() {
        UnifiedLog mockedLog = Mockito.mock(UnifiedLog.class);
        LogSegment first = Mockito.mock(LogSegment.class);
        LogSegment second = Mockito.mock(LogSegment.class);
        LogSegment active = Mockito.mock(LogSegment.class);
        Mockito.when(first.baseOffset()).thenReturn(5L);
        Mockito.when(second.baseOffset()).thenReturn(10L);
        Mockito.when(active.baseOffset()).thenReturn(15L);
        Mockito.when(mockedLog.logSegments(5L, Long.MAX_VALUE))
                .thenReturn(JavaConverters.collectionAsScalaIterable(Arrays.asList(first, second, active)));

        RemoteLogManager.RLMTask task = new RemoteLogManager.RLMTask(Objects.requireNonNull(this.remoteLogManager), this.leaderTopicIdPartition, 128);

        List<RemoteLogManager.EnrichedLogSegment> expected = Arrays.asList(
                new RemoteLogManager.EnrichedLogSegment(first, 10L),
                new RemoteLogManager.EnrichedLogSegment(second, 15L));
        Assertions.assertEquals(expected, task.candidateLogSegments(mockedLog, 5L, 20L));
    }

    /**
     * With four local segments (base offsets 5, 10, 15, 20) and an upper bound of 15, expects
     * {@code candidateLogSegments} to return only the first two segments, each paired with the
     * base offset of the segment that follows it.
     */
    @Test
    public void testCandidateLogSegmentsSkipsSegmentsAfterLastStableOffset() {
        UnifiedLog mockedLog = Mockito.mock(UnifiedLog.class);
        LogSegment first = Mockito.mock(LogSegment.class);
        LogSegment second = Mockito.mock(LogSegment.class);
        LogSegment third = Mockito.mock(LogSegment.class);
        LogSegment active = Mockito.mock(LogSegment.class);
        Mockito.when(first.baseOffset()).thenReturn(5L);
        Mockito.when(second.baseOffset()).thenReturn(10L);
        Mockito.when(third.baseOffset()).thenReturn(15L);
        Mockito.when(active.baseOffset()).thenReturn(20L);
        Mockito.when(mockedLog.logSegments(5L, Long.MAX_VALUE))
                .thenReturn(JavaConverters.collectionAsScalaIterable(Arrays.asList(first, second, third, active)));

        RemoteLogManager.RLMTask task = new RemoteLogManager.RLMTask(Objects.requireNonNull(this.remoteLogManager), this.leaderTopicIdPartition, 128);

        List<RemoteLogManager.EnrichedLogSegment> expected = Arrays.asList(
                new RemoteLogManager.EnrichedLogSegment(first, 10L),
                new RemoteLogManager.EnrichedLogSegment(second, 15L));
        Assertions.assertEquals(expected, task.candidateLogSegments(mockedLog, 5L, 15L));
    }

    /**
     * Each listed argument pair must make the {@code RetentionSizeData} constructor throw
     * {@link IllegalArgumentException}.
     */
    @Test
    public void testRemoteSizeData() {
        List<Supplier<Object>> invalidConstructions = Arrays.<Supplier<Object>>asList(
                () -> new RemoteLogManager.RetentionSizeData(10L, 0L),
                () -> new RemoteLogManager.RetentionSizeData(10L, -1L),
                () -> new RemoteLogManager.RetentionSizeData(-1L, 10L),
                () -> new RemoteLogManager.RetentionSizeData(-1L, -1L),
                () -> new RemoteLogManager.RetentionSizeData(-1L, 0L));
        for (Supplier<Object> construction : invalidConstructions) {
            Assertions.assertThrows(IllegalArgumentException.class, construction::get);
        }
    }

    /**
     * Each listed argument pair must make the {@code RetentionTimeData} constructor throw
     * {@link IllegalArgumentException}.
     */
    @Test
    public void testRemoteSizeTime() {
        List<Supplier<Object>> invalidConstructions = Arrays.<Supplier<Object>>asList(
                () -> new RemoteLogManager.RetentionTimeData(-1L, 10L),
                () -> new RemoteLogManager.RetentionTimeData(10L, -1L));
        for (Supplier<Object> construction : invalidConstructions) {
            Assertions.assertThrows(IllegalArgumentException.class, construction::get);
        }
    }

    /**
     * Stops a leader and a follower partition whose {@code StopPartition} entries are built with
     * the flags {@code (true, false)} and verifies that both tasks are gone, the metadata manager
     * is notified once, and no segment data is deleted or metadata updated.
     */
    @Test
    public void testStopPartitionsWithoutDeletion() throws RemoteStorageException {
        BiConsumer<TopicPartition, Throwable> failOnError = (topicPartition, throwable) -> Assertions.fail("shouldn't be called");

        Set<StopPartition> toStop = new HashSet<StopPartition>();
        toStop.add(new StopPartition(this.leaderTopicIdPartition.topicPartition(), true, false));
        toStop.add(new StopPartition(this.followerTopicIdPartition.topicPartition(), true, false));

        this.remoteLogManager.onLeadershipChange(
                Collections.singleton(this.mockPartition(this.leaderTopicIdPartition)),
                Collections.singleton(this.mockPartition(this.followerTopicIdPartition)),
                this.topicIds);
        // Both partitions have a task before stopping ...
        Assertions.assertNotNull(this.remoteLogManager.task(this.leaderTopicIdPartition));
        Assertions.assertNotNull(this.remoteLogManager.task(this.followerTopicIdPartition));

        this.remoteLogManager.stopPartitions(toStop, failOnError);

        // ... and none afterwards.
        Assertions.assertNull(this.remoteLogManager.task(this.leaderTopicIdPartition));
        Assertions.assertNull(this.remoteLogManager.task(this.followerTopicIdPartition));
        Mockito.verify(this.remoteLogMetadataManager, Mockito.times(1)).onStopPartitions(ArgumentMatchers.any());
        Mockito.verify(this.remoteStorageManager, Mockito.times(0)).deleteLogSegmentData((RemoteLogSegmentMetadata)ArgumentMatchers.any());
        Mockito.verify(this.remoteLogMetadataManager, Mockito.times(0)).updateRemoteLogSegmentMetadata((RemoteLogSegmentMetadataUpdate)ArgumentMatchers.any());
    }

    /**
     * Stops a leader and a follower partition whose {@code StopPartition} entries are built with
     * the flags {@code (true, true)}. The metadata manager lists 5 segments for the leader and 3
     * for the follower; the test expects 8 {@code deleteLogSegmentData} calls and 16
     * {@code updateRemoteLogSegmentMetadata} calls, with the error handler never invoked.
     */
    @Test
    public void testStopPartitionsWithDeletion() throws RemoteStorageException {
        BiConsumer<TopicPartition, Throwable> failOnError = (topicPartition, ex) -> Assertions.fail("shouldn't be called: " + ex);

        Set<StopPartition> toStop = new HashSet<StopPartition>();
        toStop.add(new StopPartition(this.leaderTopicIdPartition.topicPartition(), true, true));
        toStop.add(new StopPartition(this.followerTopicIdPartition.topicPartition(), true, true));

        this.remoteLogManager.onLeadershipChange(
                Collections.singleton(this.mockPartition(this.leaderTopicIdPartition)),
                Collections.singleton(this.mockPartition(this.followerTopicIdPartition)),
                this.topicIds);
        Assertions.assertNotNull(this.remoteLogManager.task(this.leaderTopicIdPartition));
        Assertions.assertNotNull(this.remoteLogManager.task(this.followerTopicIdPartition));

        // Remote segments that the stop call is expected to remove.
        Mockito.when(this.remoteLogMetadataManager.listRemoteLogSegments(ArgumentMatchers.eq(this.leaderTopicIdPartition)))
                .thenReturn(this.listRemoteLogSegmentMetadata(this.leaderTopicIdPartition, 5, 100, 1024, RemoteLogSegmentState.DELETE_SEGMENT_FINISHED).iterator());
        Mockito.when(this.remoteLogMetadataManager.listRemoteLogSegments(ArgumentMatchers.eq(this.followerTopicIdPartition)))
                .thenReturn(this.listRemoteLogSegmentMetadata(this.followerTopicIdPartition, 3, 100, 1024, RemoteLogSegmentState.DELETE_SEGMENT_FINISHED).iterator());

        // Metadata updates complete immediately.
        CompletableFuture<Void> alreadyDone = new CompletableFuture<Void>();
        alreadyDone.complete(null);
        Mockito.when(this.remoteLogMetadataManager.updateRemoteLogSegmentMetadata((RemoteLogSegmentMetadataUpdate)ArgumentMatchers.any())).thenReturn(alreadyDone);

        this.remoteLogManager.stopPartitions(toStop, failOnError);

        Assertions.assertNull(this.remoteLogManager.task(this.leaderTopicIdPartition));
        Assertions.assertNull(this.remoteLogManager.task(this.followerTopicIdPartition));
        Mockito.verify(this.remoteLogMetadataManager, Mockito.times(1)).onStopPartitions(ArgumentMatchers.any());
        Mockito.verify(this.remoteStorageManager, Mockito.times(8)).deleteLogSegmentData((RemoteLogSegmentMetadata)ArgumentMatchers.any());
        Mockito.verify(this.remoteLogMetadataManager, Mockito.times(16)).updateRemoteLogSegmentMetadata((RemoteLogSegmentMetadataUpdate)ArgumentMatchers.any());
    }

    /**
     * With leader epochs 0, 1 and 2 checkpointed and two remote segments (offsets 500-539 and
     * 540-700) listed only for epoch 1, expects {@code findLogStartOffset} to return 500.
     */
    @Test
    public void testFindLogStartOffset() throws RemoteStorageException, IOException {
        ArrayList<EpochEntry> epochEntries = new ArrayList<EpochEntry>();
        epochEntries.add(new EpochEntry(0, 0L));
        epochEntries.add(new EpochEntry(1, 250L));
        epochEntries.add(new EpochEntry(2, 550L));
        this.checkpoint.write(epochEntries);

        LeaderEpochFileCache epochCache = new LeaderEpochFileCache(this.leaderTopicIdPartition.topicPartition(), this.checkpoint);
        Mockito.when(this.mockLog.leaderEpochCache()).thenReturn(Option.apply(epochCache));

        long now = this.time.milliseconds();
        int segmentSize = 1024;
        RemoteLogSegmentMetadata earlierSegment = new RemoteLogSegmentMetadata(new RemoteLogSegmentId(this.leaderTopicIdPartition, Uuid.randomUuid()), 500L, 539L, now, 0, now, segmentSize, this.truncateAndGetLeaderEpochs(epochEntries, 500L, 539L));
        RemoteLogSegmentMetadata laterSegment = new RemoteLogSegmentMetadata(new RemoteLogSegmentId(this.leaderTopicIdPartition, Uuid.randomUuid()), 540L, 700L, now, 0, now, segmentSize, this.truncateAndGetLeaderEpochs(epochEntries, 540L, 700L));
        List<RemoteLogSegmentMetadata> remoteSegments = Arrays.asList(earlierSegment, laterSegment);

        // Only epoch 1 has remote segments; every other epoch lists nothing.
        Mockito.when(this.remoteLogMetadataManager.listRemoteLogSegments(ArgumentMatchers.eq(this.leaderTopicIdPartition), ArgumentMatchers.anyInt())).thenAnswer(invocation -> {
            int requestedEpoch = (Integer)invocation.getArgument(1);
            if (requestedEpoch != 1) {
                return Collections.emptyIterator();
            }
            return remoteSegments.iterator();
        });

        try (RemoteLogManager manager = new RemoteLogManager(this.remoteLogManagerConfig, 0, this.logDir, "dummyId", this.time, tp -> Optional.of(this.mockLog), (topicPartition, offset) -> {}, this.brokerTopicStats){

            public RemoteLogMetadataManager createRemoteLogMetadataManager() {
                return RemoteLogManagerTest.this.remoteLogMetadataManager;
            }
        }) {
            Assertions.assertEquals(500L, (long)manager.findLogStartOffset(this.leaderTopicIdPartition, this.mockLog));
        }
    }

    /**
     * When the metadata manager lists no remote segments for any epoch, expects
     * {@code findLogStartOffset} to return the stubbed local log start offset (250).
     */
    @Test
    public void testFindLogStartOffsetFallbackToLocalLogStartOffsetWhenRemoteIsEmpty() throws RemoteStorageException, IOException {
        List<EpochEntry> epochEntries = new ArrayList<EpochEntry>();
        epochEntries.add(new EpochEntry(1, 250L));
        epochEntries.add(new EpochEntry(2, 550L));
        this.checkpoint.write(epochEntries);

        LeaderEpochFileCache epochCache = new LeaderEpochFileCache(this.leaderTopicIdPartition.topicPartition(), this.checkpoint);
        Mockito.when(this.mockLog.leaderEpochCache()).thenReturn(Option.apply(epochCache));
        Mockito.when(this.mockLog.localLogStartOffset()).thenReturn(250L);
        // Remote storage is empty for every epoch.
        Mockito.when(this.remoteLogMetadataManager.listRemoteLogSegments(ArgumentMatchers.eq(this.leaderTopicIdPartition), ArgumentMatchers.anyInt()))
                .thenReturn(Collections.emptyIterator());

        try (RemoteLogManager manager = new RemoteLogManager(this.remoteLogManagerConfig, 0, this.logDir, "dummyId", this.time, tp -> Optional.of(this.mockLog), (topicPartition, offset) -> {}, this.brokerTopicStats){

            public RemoteLogMetadataManager createRemoteLogMetadataManager() {
                return RemoteLogManagerTest.this.remoteLogMetadataManager;
            }
        }) {
            Assertions.assertEquals(250L, (long)manager.findLogStartOffset(this.leaderTopicIdPartition, this.mockLog));
        }
    }

    /**
     * Runs {@code copyLogSegmentsToRemote} on a freshly promoted leader task and expects the
     * log-start-offset callback to receive 600, the start offset of the single remote segment
     * that the metadata manager lists for epoch 2.
     */
    @Test
    public void testLogStartOffsetUpdatedOnStartup() throws RemoteStorageException, IOException, InterruptedException {
        List<EpochEntry> epochEntries = new ArrayList<EpochEntry>();
        epochEntries.add(new EpochEntry(1, 250L));
        epochEntries.add(new EpochEntry(2, 550L));
        this.checkpoint.write(epochEntries);

        LeaderEpochFileCache epochCache = new LeaderEpochFileCache(this.leaderTopicIdPartition.topicPartition(), this.checkpoint);
        Mockito.when(this.mockLog.leaderEpochCache()).thenReturn(Option.apply(epochCache));

        RemoteLogSegmentMetadata remoteSegment = Mockito.mock(RemoteLogSegmentMetadata.class);
        Mockito.when(remoteSegment.startOffset()).thenReturn(600L);
        Mockito.when(this.remoteLogMetadataManager.listRemoteLogSegments(ArgumentMatchers.eq(this.leaderTopicIdPartition), ArgumentMatchers.anyInt())).thenAnswer(invocation -> {
            int requestedEpoch = (Integer)invocation.getArgument(1);
            if (requestedEpoch != 2) {
                return Collections.emptyIterator();
            }
            return Collections.singletonList(remoteSegment).iterator();
        });

        // Records whatever offset the manager reports through its callback.
        AtomicLong reportedLogStartOffset = new AtomicLong(0L);
        try (RemoteLogManager manager = new RemoteLogManager(this.remoteLogManagerConfig, 0, this.logDir, "dummyId", this.time, tp -> Optional.of(this.mockLog), (topicPartition, offset) -> reportedLogStartOffset.set((long)offset), this.brokerTopicStats){

            public RemoteLogMetadataManager createRemoteLogMetadataManager() {
                return RemoteLogManagerTest.this.remoteLogMetadataManager;
            }
        }) {
            RemoteLogManager.RLMTask task = new RemoteLogManager.RLMTask(Objects.requireNonNull(manager), this.leaderTopicIdPartition, 128);
            task.convertToLeader(4);
            task.copyLogSegmentsToRemote(this.mockLog);
            Assertions.assertEquals(600L, reportedLogStartOffset.get());
        }
    }

    /**
     * Runs {@code cleanupExpiredRemoteLogSegments} with either {@code retention.bytes = 0} or
     * {@code retention.ms = 0} (the other set to -1) and expects both listed remote segments to
     * be deleted and the current log start offset to become 200.
     *
     * @param retentionSize value stored under {@code retention.bytes}
     * @param retentionMs   value stored under {@code retention.ms}
     */
    @ParameterizedTest(name="testDeletionOnRetentionBreachedSegments retentionSize={0} retentionMs={1}")
    @CsvSource(value={"0, -1", "-1, 0"})
    public void testDeletionOnRetentionBreachedSegments(long retentionSize, long retentionMs) throws RemoteStorageException, ExecutionException, InterruptedException {
        Map<String, Long> retentionProps = new HashMap<String, Long>();
        retentionProps.put("retention.bytes", retentionSize);
        retentionProps.put("retention.ms", retentionMs);
        LogConfig logConfig = new LogConfig(retentionProps);
        Mockito.when(this.mockLog.config()).thenReturn(logConfig);

        List<EpochEntry> epochEntries = Collections.singletonList(this.epochEntry0);
        this.checkpoint.write(epochEntries);
        LeaderEpochFileCache epochCache = new LeaderEpochFileCache(this.tp, this.checkpoint);
        Mockito.when(this.mockLog.leaderEpochCache()).thenReturn(Option.apply(epochCache));
        Mockito.when(this.mockLog.topicPartition()).thenReturn(this.leaderTopicIdPartition.topicPartition());
        Mockito.when(this.mockLog.logEndOffset()).thenReturn(200L);

        List<RemoteLogSegmentMetadata> remoteSegments = this.listRemoteLogSegmentMetadata(this.leaderTopicIdPartition, 2, 100, 1024, epochEntries, RemoteLogSegmentState.COPY_SEGMENT_FINISHED);
        Mockito.when(this.remoteLogMetadataManager.listRemoteLogSegments(this.leaderTopicIdPartition)).thenReturn(remoteSegments.iterator());
        // A fresh iterator on every per-epoch listing call.
        Mockito.when(this.remoteLogMetadataManager.listRemoteLogSegments(this.leaderTopicIdPartition, 0)).thenAnswer(invocation -> remoteSegments.iterator());
        Mockito.when(this.remoteLogMetadataManager.updateRemoteLogSegmentMetadata(ArgumentMatchers.any(RemoteLogSegmentMetadataUpdate.class))).thenReturn(CompletableFuture.runAsync(() -> {}));

        RemoteLogManager.RLMTask task = new RemoteLogManager.RLMTask(Objects.requireNonNull(this.remoteLogManager), this.leaderTopicIdPartition, 128);
        task.convertToLeader(0);
        task.cleanupExpiredRemoteLogSegments();

        Assertions.assertEquals(200L, (long)this.currentLogStartOffset.get());
        for (int index = 0; index < 2; index++) {
            Mockito.verify(this.remoteStorageManager).deleteLogSegmentData(remoteSegments.get(index));
        }
    }

    /**
     * Two-phase cleanup scenario with {@code retention.ms = 0}.
     *
     * <p>Phase 1: a leader task whose first metadata update cancels the task itself; the test
     * expects the first remote segment to be deleted and the second one left untouched.
     * Phase 2: a second task, created for {@code followerTopicIdPartition} and promoted to leader
     * with epoch 1, is given iterators already advanced past the first segment; afterwards the
     * test expects exactly one delete call for each of the two segments overall.
     */
    @Test
    public void testDeleteRetentionMsBeingCancelledBeforeSecondDelete() throws RemoteStorageException, ExecutionException, InterruptedException {
        // ---- Phase 1: leader task that gets cancelled during cleanup ----
        RemoteLogManager remoteLogManager = this.remoteLogManager;
        Objects.requireNonNull(remoteLogManager);
        RemoteLogManager.RLMTask leaderTask = new RemoteLogManager.RLMTask(remoteLogManager, this.leaderTopicIdPartition, 128);
        leaderTask.convertToLeader(0);
        Mockito.when((Object)this.mockLog.topicPartition()).thenReturn((Object)this.leaderTopicIdPartition.topicPartition());
        Mockito.when((Object)this.mockLog.logEndOffset()).thenReturn((Object)200L);
        List<EpochEntry> epochEntries = Collections.singletonList(this.epochEntry0);
        List<RemoteLogSegmentMetadata> metadataList = this.listRemoteLogSegmentMetadata(this.leaderTopicIdPartition, 2, 100, 1024, epochEntries, RemoteLogSegmentState.COPY_SEGMENT_FINISHED);
        Mockito.when((Object)this.remoteLogMetadataManager.listRemoteLogSegments(this.leaderTopicIdPartition)).thenReturn(metadataList.iterator());
        // thenAnswer (not thenReturn) so every per-epoch listing call gets a fresh iterator.
        Mockito.when((Object)this.remoteLogMetadataManager.listRemoteLogSegments(this.leaderTopicIdPartition, 0)).thenAnswer(ans -> metadataList.iterator());
        this.checkpoint.write(epochEntries);
        LeaderEpochFileCache cache = new LeaderEpochFileCache(this.tp, this.checkpoint);
        Mockito.when((Object)this.mockLog.leaderEpochCache()).thenReturn((Object)Option.apply((Object)cache));
        // Size-based retention is set to -1 and time-based retention to 0.
        HashMap<String, Long> logProps = new HashMap<String, Long>();
        logProps.put("retention.bytes", -1L);
        logProps.put("retention.ms", 0L);
        LogConfig mockLogConfig = new LogConfig(logProps);
        Mockito.when((Object)this.mockLog.config()).thenReturn((Object)mockLogConfig);
        // The first metadata update cancels the running task before returning its future.
        Mockito.when((Object)this.remoteLogMetadataManager.updateRemoteLogSegmentMetadata((RemoteLogSegmentMetadataUpdate)ArgumentMatchers.any(RemoteLogSegmentMetadataUpdate.class))).thenAnswer(answer -> {
            leaderTask.cancel();
            return CompletableFuture.runAsync(() -> {});
        });
        leaderTask.cleanupExpiredRemoteLogSegments();
        Assertions.assertEquals((long)200L, (long)this.currentLogStartOffset.get());
        // Only the first segment was deleted; the second was never touched.
        ((RemoteStorageManager)Mockito.verify((Object)this.remoteStorageManager)).deleteLogSegmentData(metadataList.get(0));
        ((RemoteStorageManager)Mockito.verify((Object)this.remoteStorageManager, (VerificationMode)Mockito.never())).deleteLogSegmentData(metadataList.get(1));
        // ---- Phase 2: a new task finishes the cleanup ----
        RemoteLogManager remoteLogManager2 = this.remoteLogManager;
        Objects.requireNonNull(remoteLogManager2);
        RemoteLogManager.RLMTask newLeaderTask = new RemoteLogManager.RLMTask(remoteLogManager2, this.followerTopicIdPartition, 128);
        newLeaderTask.convertToLeader(1);
        // Each iterator is advanced past the first segment, so only the second one is listed.
        Iterator<RemoteLogSegmentMetadata> firstIterator = metadataList.iterator();
        firstIterator.next();
        Iterator<RemoteLogSegmentMetadata> secondIterator = metadataList.iterator();
        secondIterator.next();
        Iterator<RemoteLogSegmentMetadata> thirdIterator = metadataList.iterator();
        thirdIterator.next();
        Mockito.when((Object)this.remoteLogMetadataManager.listRemoteLogSegments(this.followerTopicIdPartition)).thenReturn(firstIterator);
        // Consecutive per-epoch listing calls receive the second and then the third iterator.
        Mockito.when((Object)this.remoteLogMetadataManager.listRemoteLogSegments(this.followerTopicIdPartition, 0)).thenReturn(secondIterator).thenReturn(thirdIterator);
        // Re-stub the update so it no longer cancels anything.
        Mockito.when((Object)this.remoteLogMetadataManager.updateRemoteLogSegmentMetadata((RemoteLogSegmentMetadataUpdate)ArgumentMatchers.any(RemoteLogSegmentMetadataUpdate.class))).thenAnswer(answer -> CompletableFuture.runAsync(() -> {}));
        newLeaderTask.cleanupExpiredRemoteLogSegments();
        Assertions.assertEquals((long)200L, (long)this.currentLogStartOffset.get());
        // Across both phases each segment was deleted exactly once (verify defaults to times(1)).
        ((RemoteStorageManager)Mockito.verify((Object)this.remoteStorageManager)).deleteLogSegmentData(metadataList.get(0));
        ((RemoteStorageManager)Mockito.verify((Object)this.remoteStorageManager)).deleteLogSegmentData(metadataList.get(1));
    }

    /**
     * Checks that remote segments are removed once the configured {@code retention.bytes} is exceeded.
     *
     * <p>The retention size is chosen as the bytes of {@code segmentCount - deletableSegmentCount}
     * remote segments plus the stubbed local log size, while {@code retention.ms} is set to -1. The
     * cleanup run and the deletion assertions are delegated to {@code verifyDeleteLogSegment}.
     *
     * @param segmentCount number of remote segment metadata entries to generate
     * @param deletableSegmentCount number of segments the cleanup is expected to delete
     */
    @ParameterizedTest(name="testDeleteLogSegmentDueToRetentionSizeBreach segmentCount={0} deletableSegmentCount={1}")
    @CsvSource(value={"50, 0", "50, 1", "50, 23", "50, 50"})
    public void testDeleteLogSegmentDueToRetentionSizeBreach(int segmentCount, int deletableSegmentCount) throws RemoteStorageException, ExecutionException, InterruptedException {
        final int recordsPerSegment = 100;
        final int segmentSize = 1024;
        final long localLogSegmentsSize = 512L;

        // Leader epoch history backing the mocked log; the checkpoint must be written before the cache is built on it.
        List<EpochEntry> leaderEpochHistory = Arrays.asList(
                new EpochEntry(0, 0L),
                new EpochEntry(1, 20L),
                new EpochEntry(3, 50L),
                new EpochEntry(4, 100L));
        this.checkpoint.write(leaderEpochHistory);
        LeaderEpochFileCache epochCache = new LeaderEpochFileCache(this.tp, this.checkpoint);
        int currentLeaderEpoch = leaderEpochHistory.get(leaderEpochHistory.size() - 1).epoch;

        // Size-based retention only: keep exactly the segments that are not expected to be deleted.
        long survivingSegmentCount = (long) segmentCount - (long) deletableSegmentCount;
        long retentionSize = survivingSegmentCount * (long) segmentSize + localLogSegmentsSize;
        Map<String, Long> retentionProps = new HashMap<>();
        retentionProps.put("retention.bytes", retentionSize);
        retentionProps.put("retention.ms", -1L);
        LogConfig logConfig = new LogConfig(retentionProps);

        // The local log starts right after the last remote segment and holds a single further offset.
        long localLogStartOffset = (long) segmentCount * (long) recordsPerSegment;
        long logEndOffset = localLogStartOffset + 1L;

        Mockito.when(this.mockLog.config()).thenReturn(logConfig);
        Mockito.when(this.mockLog.leaderEpochCache()).thenReturn(Option.apply(epochCache));
        Mockito.when(this.mockLog.localLogStartOffset()).thenReturn(localLogStartOffset);
        Mockito.when(this.mockLog.logEndOffset()).thenReturn(logEndOffset);
        Mockito.when(this.mockLog.onlyLocalLogSegmentsSize()).thenReturn(localLogSegmentsSize);

        List<RemoteLogSegmentMetadata> remoteSegments = this.listRemoteLogSegmentMetadata(
                this.leaderTopicIdPartition,
                segmentCount,
                recordsPerSegment,
                segmentSize,
                leaderEpochHistory,
                RemoteLogSegmentState.COPY_SEGMENT_FINISHED);
        this.verifyDeleteLogSegment(remoteSegments, deletableSegmentCount, currentLeaderEpoch);
    }

    /**
     * Checks that remote segments are removed once the configured {@code retention.ms} is exceeded.
     *
     * <p>Size-based retention is switched off ({@code retention.bytes = -1}) and {@code retention.ms}
     * is set to 1. The segment metadata comes from {@code listRemoteLogSegmentMetadataByTime}, which
     * stamps the first {@code deletableSegmentCount} segments one millisecond earlier than the rest.
     * The cleanup run and the deletion assertions are delegated to {@code verifyDeleteLogSegment}.
     *
     * @param segmentCount number of remote segment metadata entries to generate
     * @param deletableSegmentCount number of segments the cleanup is expected to delete
     */
    @ParameterizedTest(name="testDeleteLogSegmentDueToRetentionTimeBreach segmentCount={0} deletableSegmentCount={1}")
    @CsvSource(value={"50, 0", "50, 1", "50, 23", "50, 50"})
    public void testDeleteLogSegmentDueToRetentionTimeBreach(int segmentCount, int deletableSegmentCount) throws RemoteStorageException, ExecutionException, InterruptedException {
        final int recordsPerSegment = 100;
        final int segmentSize = 1024;
        final long localLogSegmentsSize = 512L;
        final long retentionSize = -1L;

        // Leader epoch history backing the mocked log; the checkpoint must be written before the cache is built on it.
        List<EpochEntry> leaderEpochHistory = Arrays.asList(
                new EpochEntry(0, 0L),
                new EpochEntry(1, 20L),
                new EpochEntry(3, 50L),
                new EpochEntry(4, 100L));
        this.checkpoint.write(leaderEpochHistory);
        LeaderEpochFileCache epochCache = new LeaderEpochFileCache(this.tp, this.checkpoint);
        int currentLeaderEpoch = leaderEpochHistory.get(leaderEpochHistory.size() - 1).epoch;

        // Time-based retention only.
        Map<String, Long> retentionProps = new HashMap<>();
        retentionProps.put("retention.bytes", retentionSize);
        retentionProps.put("retention.ms", 1L);
        LogConfig logConfig = new LogConfig(retentionProps);

        // The local log starts right after the last remote segment and holds a single further offset.
        long localLogStartOffset = (long) segmentCount * (long) recordsPerSegment;
        long logEndOffset = localLogStartOffset + 1L;

        Mockito.when(this.mockLog.config()).thenReturn(logConfig);
        Mockito.when(this.mockLog.leaderEpochCache()).thenReturn(Option.apply(epochCache));
        Mockito.when(this.mockLog.localLogStartOffset()).thenReturn(localLogStartOffset);
        Mockito.when(this.mockLog.logEndOffset()).thenReturn(logEndOffset);
        Mockito.when(this.mockLog.onlyLocalLogSegmentsSize()).thenReturn(localLogSegmentsSize);

        List<RemoteLogSegmentMetadata> remoteSegments = this.listRemoteLogSegmentMetadataByTime(
                this.leaderTopicIdPartition,
                segmentCount,
                deletableSegmentCount,
                recordsPerSegment,
                segmentSize,
                leaderEpochHistory,
                RemoteLogSegmentState.COPY_SEGMENT_FINISHED);
        this.verifyDeleteLogSegment(remoteSegments, deletableSegmentCount, currentLeaderEpoch);
    }

    /**
     * Runs one remote-log cleanup pass as leader and checks which segments were deleted.
     *
     * <p>The metadata manager mock is stubbed to list {@code segmentMetadataList} for the leader
     * partition, and, for the per-epoch listing, to return only the segments whose leader-epoch map
     * contains the requested epoch. After the cleanup, the remote storage manager must have been
     * asked to delete exactly {@code deletableSegmentCount} segments. When that count is positive,
     * the first and last deleted segments must be the first and the
     * {@code deletableSegmentCount}-th entry of the list, and the recorded log start offset must be
     * one past the end offset of the last deleted segment.
     *
     * @param segmentMetadataList remote segment metadata exposed through the metadata manager mock
     * @param deletableSegmentCount number of {@code deleteLogSegmentData} calls expected
     * @param currentLeaderEpoch leader epoch the task is converted to before cleanup
     * @throws RemoteStorageException declared by the mocked remote storage calls
     * @throws ExecutionException propagated from the cleanup under test
     * @throws InterruptedException propagated from the cleanup under test
     */
    private void verifyDeleteLogSegment(List<RemoteLogSegmentMetadata> segmentMetadataList, int deletableSegmentCount, int currentLeaderEpoch) throws RemoteStorageException, ExecutionException, InterruptedException {
        Mockito.when(this.remoteLogMetadataManager.listRemoteLogSegments(this.leaderTopicIdPartition)).thenReturn(segmentMetadataList.iterator());
        // Answer (not thenReturn) so that every per-epoch listing call gets a fresh iterator.
        Mockito.when(this.remoteLogMetadataManager.listRemoteLogSegments(ArgumentMatchers.eq(this.leaderTopicIdPartition), ArgumentMatchers.anyInt())).thenAnswer(invocation -> {
            int leaderEpoch = (Integer) invocation.getArgument(1);
            return segmentMetadataList.stream()
                    .filter(segmentMetadata -> segmentMetadata.segmentLeaderEpochs().containsKey(leaderEpoch))
                    .iterator();
        });
        Mockito.when(this.remoteLogMetadataManager.updateRemoteLogSegmentMetadata(ArgumentMatchers.any(RemoteLogSegmentMetadataUpdate.class)))
                .thenAnswer(answer -> CompletableFuture.runAsync(() -> {}));

        RemoteLogManager remoteLogManager = this.remoteLogManager;
        Objects.requireNonNull(remoteLogManager);
        RemoteLogManager.RLMTask task = new RemoteLogManager.RLMTask(remoteLogManager, this.leaderTopicIdPartition, 128);
        task.convertToLeader(currentLeaderEpoch);
        task.cleanupExpiredRemoteLogSegments();

        ArgumentCaptor<RemoteLogSegmentMetadata> deletedMetadataCapture = ArgumentCaptor.forClass(RemoteLogSegmentMetadata.class);
        Mockito.verify(this.remoteStorageManager, Mockito.times(deletableSegmentCount)).deleteLogSegmentData(deletedMetadataCapture.capture());
        if (deletableSegmentCount > 0) {
            List<RemoteLogSegmentMetadata> deletedMetadataList = deletedMetadataCapture.getAllValues();
            RemoteLogSegmentMetadata expectedEndMetadata = segmentMetadataList.get(deletableSegmentCount - 1);
            Assertions.assertEquals(segmentMetadataList.get(0), deletedMetadataList.get(0));
            Assertions.assertEquals(expectedEndMetadata, deletedMetadataList.get(deletedMetadataList.size() - 1));
            // assertEquals takes (expected, actual): the expected value goes first so failure messages read correctly.
            long expectedLogStartOffset = expectedEndMetadata.endOffset() + 1L;
            long actualLogStartOffset = this.currentLogStartOffset.get();
            Assertions.assertEquals(expectedLogStartOffset, actualLogStartOffset);
        }
    }

    /**
     * Checks that a cleanup pass leaves remote storage untouched when the listed segments are
     * already in {@code DELETE_SEGMENT_FINISHED} state.
     *
     * <p>A dedicated {@link RemoteLogManager} is created whose log-start-offset callback writes into
     * a local {@link AtomicLong}. With {@code retention.ms = 0} and two segments in
     * {@code DELETE_SEGMENT_FINISHED} state, the test asserts that no interaction with the remote
     * storage manager mock took place and that the callback value is still 0.
     */
    @Test
    public void testDeleteRetentionMsOnExpiredSegment() throws RemoteStorageException, IOException {
        AtomicLong logStartOffset = new AtomicLong(0L);
        try (RemoteLogManager manager = new RemoteLogManager(
                this.remoteLogManagerConfig,
                0,
                this.logDir,
                "dummyId",
                this.time,
                ignoredPartition -> Optional.of(this.mockLog),
                (topicPartition, offset) -> logStartOffset.set((long) offset),
                this.brokerTopicStats) {

            public RemoteStorageManager createRemoteStorageManager() {
                return RemoteLogManagerTest.this.remoteStorageManager;
            }

            public RemoteLogMetadataManager createRemoteLogMetadataManager() {
                return RemoteLogManagerTest.this.remoteLogMetadataManager;
            }
        }) {
            RemoteLogManager.RLMTask task = new RemoteLogManager.RLMTask(Objects.requireNonNull(manager), this.leaderTopicIdPartition, 128);
            task.convertToLeader(0);

            // Mocked log: leader partition with log end offset 200.
            Mockito.when(this.mockLog.topicPartition()).thenReturn(this.leaderTopicIdPartition.topicPartition());
            Mockito.when(this.mockLog.logEndOffset()).thenReturn(200L);

            // Two remote segments that are already marked as deleted.
            List<EpochEntry> leaderEpochHistory = Collections.singletonList(this.epochEntry0);
            List<RemoteLogSegmentMetadata> deletedSegments = this.listRemoteLogSegmentMetadata(
                    this.leaderTopicIdPartition, 2, 100, 1024, leaderEpochHistory, RemoteLogSegmentState.DELETE_SEGMENT_FINISHED);
            Mockito.when(this.remoteLogMetadataManager.listRemoteLogSegments(this.leaderTopicIdPartition))
                    .thenReturn(deletedSegments.iterator());
            Mockito.when(this.remoteLogMetadataManager.listRemoteLogSegments(this.leaderTopicIdPartition, 0))
                    .thenReturn(deletedSegments.iterator())
                    .thenReturn(deletedSegments.iterator());

            // The checkpoint must be written before the cache is built on top of it.
            this.checkpoint.write(leaderEpochHistory);
            LeaderEpochFileCache epochCache = new LeaderEpochFileCache(this.tp, this.checkpoint);
            Mockito.when(this.mockLog.leaderEpochCache()).thenReturn(Option.apply(epochCache));

            Map<String, Long> retentionProps = new HashMap<>();
            retentionProps.put("retention.bytes", -1L);
            retentionProps.put("retention.ms", 0L);
            Mockito.when(this.mockLog.config()).thenReturn(new LogConfig(retentionProps));

            Mockito.when(this.remoteLogMetadataManager.updateRemoteLogSegmentMetadata(ArgumentMatchers.any(RemoteLogSegmentMetadataUpdate.class)))
                    .thenAnswer(answer -> CompletableFuture.runAsync(() -> {}));

            task.cleanupExpiredRemoteLogSegments();

            Mockito.verifyNoMoreInteractions(this.remoteStorageManager);
            Assertions.assertEquals(0L, logStartOffset.get());
        } catch (InterruptedException | ExecutionException e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * Convenience overload that generates segment metadata without caller-supplied epoch entries.
     *
     * <p>Delegates to the six-argument overload with an empty epoch-entry list.
     *
     * @param topicIdPartition partition the generated segments belong to
     * @param segmentCount number of metadata entries to generate
     * @param recordsPerSegment number of offsets covered by each segment
     * @param segmentSize size in bytes recorded for each segment
     * @param state state assigned to every generated segment
     * @return the generated metadata, in ascending offset order
     */
    private List<RemoteLogSegmentMetadata> listRemoteLogSegmentMetadata(TopicIdPartition topicIdPartition, int segmentCount, int recordsPerSegment, int segmentSize, RemoteLogSegmentState state) {
        List<EpochEntry> noEpochEntries = Collections.emptyList();
        return this.listRemoteLogSegmentMetadata(topicIdPartition, segmentCount, recordsPerSegment, segmentSize, noEpochEntries, state);
    }

    /**
     * Generates segment metadata in which no segment gets the earlier, "deletable" timestamp.
     *
     * <p>Delegates to {@code listRemoteLogSegmentMetadataByTime} with a deletable segment count of 0.
     *
     * @param topicIdPartition partition the generated segments belong to
     * @param segmentCount number of metadata entries to generate
     * @param recordsPerSegment number of offsets covered by each segment
     * @param segmentSize size in bytes recorded for each segment
     * @param epochEntries leader epoch entries used to derive each segment's epoch map
     * @param state state assigned to every generated segment
     * @return the generated metadata, in ascending offset order
     */
    private List<RemoteLogSegmentMetadata> listRemoteLogSegmentMetadata(TopicIdPartition topicIdPartition, int segmentCount, int recordsPerSegment, int segmentSize, List<EpochEntry> epochEntries, RemoteLogSegmentState state) {
        final int deletableSegmentCount = 0;
        return this.listRemoteLogSegmentMetadataByTime(
                topicIdPartition,
                segmentCount,
                deletableSegmentCount,
                recordsPerSegment,
                segmentSize,
                epochEntries,
                state);
    }

    /**
     * Generates {@code segmentCount} consecutive remote segment metadata entries.
     *
     * <p>Segment {@code i} covers offsets {@code [i * recordsPerSegment, (i + 1) * recordsPerSegment - 1]}.
     * The first {@code deletableSegmentCount} segments are stamped one millisecond before the
     * current value of the test clock; the remaining ones are stamped with the clock value itself.
     * Each segment gets a random id and a leader-epoch map derived from {@code epochEntries}, or
     * from {@code this.totalEpochEntries} when {@code epochEntries} is empty.
     *
     * @param topicIdPartition partition the generated segments belong to
     * @param segmentCount number of metadata entries to generate
     * @param deletableSegmentCount number of leading segments that receive the earlier timestamp
     * @param recordsPerSegment number of offsets covered by each segment
     * @param segmentSize size in bytes recorded for each segment
     * @param epochEntries leader epoch entries used to derive each segment's epoch map
     * @param state state assigned to every generated segment
     * @return the generated metadata, in ascending offset order
     */
    private List<RemoteLogSegmentMetadata> listRemoteLogSegmentMetadataByTime(TopicIdPartition topicIdPartition, int segmentCount, int deletableSegmentCount, int recordsPerSegment, int segmentSize, List<EpochEntry> epochEntries, RemoteLogSegmentState state) {
        List<RemoteLogSegmentMetadata> generated = new ArrayList<>();
        for (int segmentIndex = 0; segmentIndex < segmentCount; segmentIndex++) {
            long timestamp = this.time.milliseconds();
            boolean stampedAsOlder = segmentIndex < deletableSegmentCount;
            if (stampedAsOlder) {
                // The clock is read again here rather than reusing the value above.
                timestamp = this.time.milliseconds() - 1L;
            }

            long firstOffset = (long) segmentIndex * (long) recordsPerSegment;
            long lastOffset = firstOffset + (long) recordsPerSegment - 1L;

            List<EpochEntry> effectiveEpochEntries;
            if (epochEntries.isEmpty()) {
                effectiveEpochEntries = this.totalEpochEntries;
            } else {
                effectiveEpochEntries = epochEntries;
            }
            Map<Integer, Long> segmentLeaderEpochs = this.truncateAndGetLeaderEpochs(effectiveEpochEntries, firstOffset, lastOffset);

            RemoteLogSegmentId segmentId = new RemoteLogSegmentId(topicIdPartition, Uuid.randomUuid());
            generated.add(new RemoteLogSegmentMetadata(
                    segmentId,
                    firstOffset,
                    lastOffset,
                    timestamp,
                    0,
                    timestamp,
                    segmentSize,
                    Optional.empty(),
                    state,
                    segmentLeaderEpochs));
        }
        return generated;
    }

    /**
     * Builds the epoch-to-start-offset map for the offset range of one segment.
     *
     * <p>The entries are written to a throwaway in-memory checkpoint, and a cache built on that
     * checkpoint is truncated from the start at {@code startOffset} and from the end at
     * {@code endOffset}. The result is whatever the checkpoint reads back afterwards.
     * NOTE(review): this relies on the cache writing its truncated state back to the checkpoint;
     * confirm against {@code LeaderEpochFileCache}.
     *
     * @param entries leader epoch entries to start from
     * @param startOffset offset passed to {@code truncateFromStart}
     * @param endOffset offset passed to {@code truncateFromEnd}
     * @return map from leader epoch to that epoch's start offset
     */
    private Map<Integer, Long> truncateAndGetLeaderEpochs(List<EpochEntry> entries, Long startOffset, Long endOffset) {
        InMemoryLeaderEpochCheckpoint scratchCheckpoint = new InMemoryLeaderEpochCheckpoint();
        scratchCheckpoint.write(entries);

        LeaderEpochFileCache scratchCache = new LeaderEpochFileCache(null, (LeaderEpochCheckpoint) scratchCheckpoint);
        scratchCache.truncateFromStart(startOffset.longValue());
        scratchCache.truncateFromEnd(endOffset.longValue());

        return scratchCheckpoint.read()
                .stream()
                .collect(Collectors.toMap(entry -> entry.epoch, entry -> entry.startOffset));
    }

    /**
     * Creates a mocked {@link Partition} for the given topic-id partition.
     *
     * <p>The mock reports the partition's {@link TopicPartition} and topic name, and exposes a
     * mocked {@link UnifiedLog} whose {@code remoteLogEnabled()} returns {@code true}.
     *
     * @param topicIdPartition partition identity the mock should report
     * @return the stubbed partition mock
     */
    private Partition mockPartition(TopicIdPartition topicIdPartition) {
        TopicPartition topicPartition = topicIdPartition.topicPartition();

        Partition partitionMock = Mockito.mock(Partition.class);
        UnifiedLog remoteEnabledLog = Mockito.mock(UnifiedLog.class);

        Mockito.when(partitionMock.topicPartition()).thenReturn(topicPartition);
        Mockito.when(partitionMock.topic()).thenReturn(topicPartition.topic());
        Mockito.when(remoteEnabledLog.remoteLogEnabled()).thenReturn(true);
        Mockito.when(partitionMock.log()).thenReturn(Option.apply(remoteEnabledLog));

        return partitionMock;
    }

    /**
     * Builds a {@link RemoteLogManagerConfig} for the tests from the given properties.
     *
     * <p>The passed {@code props} object is modified in place: remote storage is enabled, the no-op
     * storage and metadata manager classes are configured, and a set of {@code rsm.config.} and
     * {@code rlmm.config.} prefixed test entries is added before the config is parsed.
     *
     * @param props base properties; mutated by this method
     * @return the config parsed against {@code RemoteLogManagerConfig.CONFIG_DEF}
     */
    private RemoteLogManagerConfig createRLMConfig(Properties props) {
        props.put("remote.log.storage.system.enable", true);

        // Manager implementations.
        props.setProperty("remote.log.storage.manager.class.name", NoOpRemoteStorageManager.class.getName());
        props.setProperty("remote.log.metadata.manager.class.name", NoOpRemoteLogMetadataManager.class.getName());

        // Prefixed pass-through entries for the storage manager and the metadata manager.
        props.setProperty("rsm.config.remote.log.storage.test", "storage.test");
        props.setProperty("rlmm.config.remote.log.metadata.topic.num.partitions", "1");
        props.setProperty("rlmm.config.remote.log.metadata.test", "metadata.test");
        props.setProperty("rlmm.config.remote.log.metadata.common.client.common.client.test", "common.test");
        props.setProperty("rlmm.config.remote.log.metadata.consumer.consumer.test", "consumer.test");
        props.setProperty("rlmm.config.remote.log.metadata.producer.producer.test", "producer.test");

        return new RemoteLogManagerConfig(new AbstractConfig(RemoteLogManagerConfig.CONFIG_DEF, props));
    }
}

