package kafka.server;

import java.util.Properties;
import kafka.cluster.BrokerEndPoint;
import kafka.cluster.Replica;
import kafka.log.LogManager;
import kafka.server.epoch.LeaderEpochCache;
import kafka.server.epoch.util.ReplicaFetcherMockBlockingSend;
import kafka.utils.TestUtils$;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.internals.PartitionStates;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.AbstractRequest;
import org.apache.kafka.common.requests.EpochEndOffset;
import org.apache.kafka.common.utils.SystemTime;
import org.easymock.Capture;
import org.easymock.CaptureType;
import org.easymock.EasyMock;
import org.easymock.IExpectationSetters;
import org.junit.Assert;
import org.junit.Test;
import scala.None$;
import scala.Predef$;
import scala.Predef$ArrowAssoc$;
import scala.Some;
import scala.Tuple2;
import scala.collection.IterableLike;
import scala.collection.JavaConverters$;
import scala.collection.Map;
import scala.collection.Map$;
import scala.collection.MapLike;
import scala.collection.Seq;
import scala.collection.Seq$;
import scala.collection.TraversableLike;
import scala.collection.TraversableOnce;
import scala.collection.mutable.Buffer$;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;
import scala.runtime.RichInt$;

/* compiled from: ReplicaFetcherThreadTest.scala */
@ScalaSignature(bytes = "\u0006\u0001\r4A!\u0001\u0002\u0001\u000f\tA\"+\u001a9mS\u000e\fg)\u001a;dQ\u0016\u0014H\u000b\u001b:fC\u0012$Vm\u001d;\u000b\u0005\r!\u0011AB:feZ,'OC\u0001\u0006\u0003\u0015Y\u0017MZ6b\u0007\u0001\u0019\"\u0001\u0001\u0005\u0011\u0005%aQ\"\u0001\u0006\u000b\u0003-\tQa]2bY\u0006L!!\u0004\u0006\u0003\r\u0005s\u0017PU3g\u0011\u0015y\u0001\u0001\"\u0001\u0011\u0003\u0019a\u0014N\\5u}Q\t\u0011\u0003\u0005\u0002\u0013\u00015\t!\u0001C\u0004\u0015\u0001\t\u0007I\u0011B\u000b\u0002\tQ\f\u0004\u000fM\u000b\u0002-A\u0011qcH\u0007\u00021)\u0011\u0011DG\u0001\u0007G>lWn\u001c8\u000b\u0005\u0015Y\"B\u0001\u000f\u001e\u0003\u0019\t\u0007/Y2iK*\ta$A\u0002pe\u001eL!\u0001\t\r\u0003\u001dQ{\u0007/[2QCJ$\u0018\u000e^5p]\"1!\u0005\u0001Q\u0001\nY\tQ\u0001^\u0019qa\u0001Bq\u0001\n\u0001C\u0002\u0013%Q#\u0001\u0003ucA\f\u0004B\u0002\u0014\u0001A\u0003%a#A\u0003ucA\f\u0004\u0005C\u0004)\u0001\t\u0007I\u0011B\u000b\u0002\tQ\u0014\u0004/\r\u0005\u0007U\u0001\u0001\u000b\u0011\u0002\f\u0002\u000bQ\u0014\u0004/\r\u0011\t\u000b1\u0002A\u0011A\u0017\u0002wMDw.\u001e7e\u001d>$\u0018j]:vK2+\u0017\rZ3s\u000bB|7\r\u001b*fcV,7\u000f^%g\u0013:$XM\u001d2s_.,'OV3sg&|gNQ3m_^\f\u0014\u0007F\u0001/!\tIq&\u0003\u00021\u0015\t!QK\\5uQ\tY#\u0007\u0005\u00024m5\tAG\u0003\u00026;\u0005)!.\u001e8ji&\u0011q\u0007\u000e\u0002\u0005)\u0016\u001cH\u000fC\u0003:\u0001\u0011\u0005Q&A\u0013tQ>,H\u000e\u001a%b]\u0012dW-\u0012=dKB$\u0018n\u001c8Ge>l'\t\\8dW&twmU3oI\"\u0012\u0001H\r\u0005\u0006y\u0001!\t!L\u0001'g\"|W\u000f\u001c3GKR\u001c\u0007\u000eT3bI\u0016\u0014X\t]8dQ>sg)\u001b:ti\u001a+Go\u00195P]2L\bFA\u001e3\u0011\u0015y\u0004\u0001\"\u0001.\u0003Q\u001a\bn\\;mIR\u0013XO\\2bi\u0016$vn\u00144gg\u0016$8\u000b]3dS\u001aLW\rZ%o\u000bB|7\r[(gMN,GOU3ta>t7/\u001a\u0015\u0003}IBQA\u0011\u0001\u0005\u00025\n1h\u001d5pk2$GK];oG\u0006$X\rV8IS\u001eDw+\u0019;fe6\f'o[%g\u0019\u0016\fG-\u001a:SKR,(O\\:V]\u0012,g-\u001b8fI>3gm]3uQ\t\t%\u0007C\u0003F\u0001\u0011\u0005Q&A\u0019tQ>,H\u000e\u001a)pY2Le\u
000eZ3gS:LG/\u001a7z\u0013\u001adU-\u00193feJ+G/\u001e:og\u0006s\u00170\u0012=dKB$\u0018n\u001c8)\u0005\u0011\u0013\u0004\"\u0002%\u0001\t\u0003i\u0013aK:i_VdG-T8wKB\u000b'\u000f^5uS>t7oT;u\u001f\u001a$&/\u001e8dCRLgn\u001a'pON#\u0018\r^3)\u0005\u001d\u0013\u0004\"B&\u0001\t\u0003i\u0013\u0001O:i_VdGMR5mi\u0016\u0014\b+\u0019:uSRLwN\\:NC\u0012,G*Z1eKJ$UO]5oO2+\u0017\rZ3s\u000bB|7\r\u001b*fcV,7\u000f\u001e\u0015\u0003\u0015JBQA\u0014\u0001\u0005\u0002=\u000bAa\u001d;vER\u0019\u0001\u000b\u00180\u0011\u0007E#f+D\u0001S\u0015\t\u0019V$\u0001\u0005fCNLXn\\2l\u0013\t)&KA\nJ\u000bb\u0004Xm\u0019;bi&|gnU3ui\u0016\u00148\u000f\u0005\u0002X56\t\u0001L\u0003\u0002Z\t\u000591\r\\;ti\u0016\u0014\u0018BA.Y\u0005\u001d\u0011V\r\u001d7jG\u0006DQ!X'A\u0002Y\u000bqA]3qY&\u001c\u0017\rC\u0003`\u001b\u0002\u0007\u0001-\u0001\bsKBd\u0017nY1NC:\fw-\u001a:\u0011\u0005I\t\u0017B\u00012\u0003\u00059\u0011V\r\u001d7jG\u0006l\u0015M\\1hKJ\u0004")
/* loaded from: input_file:kafka/server/ReplicaFetcherThreadTest.class */
public class ReplicaFetcherThreadTest {
    // Topic-partition fixtures shared by the tests in this class.
    /** Partition 0 of "topic1". */
    private final TopicPartition t1p0 = new TopicPartition("topic1", 0);
    /** Partition 1 of "topic1". */
    private final TopicPartition t1p1 = new TopicPartition("topic1", 1);
    /** Partition 1 of "topic2". */
    private final TopicPartition t2p1 = new TopicPartition("topic2", 1);

    /** Returns the fixture for partition 0 of "topic1". */
    private TopicPartition t1p0() {
        return t1p0;
    }

    /** Returns the fixture for partition 1 of "topic1". */
    private TopicPartition t1p1() {
        return t1p1;
    }

    /** Returns the fixture for partition 1 of "topic2". */
    private TopicPartition t2p1() {
        return t2p1;
    }

    /**
     * With the inter-broker protocol version and the log message format version both set to
     * "0.10.2", {@code fetchEpochsFromLeader} must answer every requested partition with
     * {@code Errors.NONE} and an end offset of -1.
     *
     * <p>The fetcher thread is built with null replica/quota managers and no BlockingSend
     * override ({@code None}).
     */
    @Test
    public void shouldNotIssueLeaderEpochRequestIfInterbrokerVersionBelow11() {
        TestUtils$ testUtils = TestUtils$.MODULE$;
        Properties brokerProps = testUtils.createBrokerConfig(
                1, "localhost:1234",
                testUtils.createBrokerConfig$default$3(), testUtils.createBrokerConfig$default$4(),
                testUtils.createBrokerConfig$default$5(), testUtils.createBrokerConfig$default$6(),
                testUtils.createBrokerConfig$default$7(), testUtils.createBrokerConfig$default$8(),
                testUtils.createBrokerConfig$default$9(), testUtils.createBrokerConfig$default$10(),
                testUtils.createBrokerConfig$default$11(), testUtils.createBrokerConfig$default$12(),
                testUtils.createBrokerConfig$default$13(), testUtils.createBrokerConfig$default$14(),
                testUtils.createBrokerConfig$default$15(), testUtils.createBrokerConfig$default$16(),
                testUtils.createBrokerConfig$default$17());
        brokerProps.put(KafkaConfig$.MODULE$.InterBrokerProtocolVersionProp(), "0.10.2");
        brokerProps.put(KafkaConfig$.MODULE$.LogMessageFormatVersionProp(), "0.10.2");

        Predef$ predef = Predef$.MODULE$;
        Predef$ArrowAssoc$ arrow = Predef$ArrowAssoc$.MODULE$;

        // Expected reply: no error, offset -1, for both partitions.
        Object expected = Map$.MODULE$.apply(predef.wrapRefArray(new Tuple2[]{
                arrow.$minus$greater$extension(predef.ArrowAssoc(t1p0()), new EpochEndOffset(Errors.NONE, -1L)),
                arrow.$minus$greater$extension(predef.ArrowAssoc(t1p1()), new EpochEndOffset(Errors.NONE, -1L))}));

        ReplicaFetcherThread fetcher = new ReplicaFetcherThread(
                "bob", 0, new BrokerEndPoint(0, "localhost", 1000),
                KafkaConfig$.MODULE$.fromProps(brokerProps),
                (ReplicaManager) null, new Metrics(), new SystemTime(),
                (ReplicationQuotaManager) null, None$.MODULE$);

        // Ask for epoch 0 on both partitions.
        Object actual = fetcher.fetchEpochsFromLeader(Map$.MODULE$.apply(predef.wrapRefArray(new Tuple2[]{
                arrow.$minus$greater$extension(predef.ArrowAssoc(t1p0()), BoxesRunTime.boxToInteger(0)),
                arrow.$minus$greater$extension(predef.ArrowAssoc(t1p1()), BoxesRunTime.boxToInteger(0))})));

        Assert.assertEquals("results from leader epoch request should have undefined offset", expected, actual);
    }

    /**
     * When the BlockingSend mock throws a {@link NullPointerException} from {@code sendRequest},
     * {@code fetchEpochsFromLeader} must still return one entry per requested partition, each
     * carrying {@code Errors.UNKNOWN_SERVER_ERROR} and an end offset of -1.
     *
     * <p>The mock is strict and expects exactly one {@code sendRequest} call, which is verified
     * at the end.
     */
    @Test
    public void shouldHandleExceptionFromBlockingSend() {
        TestUtils$ testUtils = TestUtils$.MODULE$;
        KafkaConfig config = KafkaConfig$.MODULE$.fromProps(testUtils.createBrokerConfig(
                1, "localhost:1234",
                testUtils.createBrokerConfig$default$3(), testUtils.createBrokerConfig$default$4(),
                testUtils.createBrokerConfig$default$5(), testUtils.createBrokerConfig$default$6(),
                testUtils.createBrokerConfig$default$7(), testUtils.createBrokerConfig$default$8(),
                testUtils.createBrokerConfig$default$9(), testUtils.createBrokerConfig$default$10(),
                testUtils.createBrokerConfig$default$11(), testUtils.createBrokerConfig$default$12(),
                testUtils.createBrokerConfig$default$13(), testUtils.createBrokerConfig$default$14(),
                testUtils.createBrokerConfig$default$15(), testUtils.createBrokerConfig$default$16(),
                testUtils.createBrokerConfig$default$17()));
        BrokerEndPoint leader = new BrokerEndPoint(0, "localhost", 1000);

        // A network client whose only permitted call blows up.
        BlockingSend failingSend = (BlockingSend) EasyMock.createMock(BlockingSend.class);
        EasyMock.expect(failingSend.sendRequest((AbstractRequest.Builder) EasyMock.anyObject()))
                .andThrow(new NullPointerException())
                .once();
        EasyMock.replay(failingSend);

        Predef$ predef = Predef$.MODULE$;
        Predef$ArrowAssoc$ arrow = Predef$ArrowAssoc$.MODULE$;

        Object expected = Map$.MODULE$.apply(predef.wrapRefArray(new Tuple2[]{
                arrow.$minus$greater$extension(predef.ArrowAssoc(t1p0()), new EpochEndOffset(Errors.UNKNOWN_SERVER_ERROR, -1L)),
                arrow.$minus$greater$extension(predef.ArrowAssoc(t1p1()), new EpochEndOffset(Errors.UNKNOWN_SERVER_ERROR, -1L))}));

        ReplicaFetcherThread fetcher = new ReplicaFetcherThread(
                "bob", 0, leader, config,
                (ReplicaManager) null, new Metrics(), new SystemTime(),
                (ReplicationQuotaManager) null, new Some(failingSend));

        Object actual = fetcher.fetchEpochsFromLeader(Map$.MODULE$.apply(predef.wrapRefArray(new Tuple2[]{
                arrow.$minus$greater$extension(predef.ArrowAssoc(t1p0()), BoxesRunTime.boxToInteger(0)),
                arrow.$minus$greater$extension(predef.ArrowAssoc(t1p1()), BoxesRunTime.boxToInteger(0))})));

        Assert.assertEquals("results from leader epoch request should have undefined offset", expected, actual);
        EasyMock.verify(failingSend);
    }

    /**
     * Runs the fetcher loop three times and checks, via the counters on the mock network client,
     * that the epoch request count stays at 1 while the fetch count rises 1, 2, 3.
     *
     * <p>The strict LogManager mock expects {@code truncateTo} twice; that expectation is
     * verified at the end.
     */
    @Test
    public void shouldFetchLeaderEpochOnFirstFetchOnly() {
        TestUtils$ testUtils = TestUtils$.MODULE$;
        KafkaConfig config = KafkaConfig$.MODULE$.fromProps(testUtils.createBrokerConfig(
                1, "localhost:1234",
                testUtils.createBrokerConfig$default$3(), testUtils.createBrokerConfig$default$4(),
                testUtils.createBrokerConfig$default$5(), testUtils.createBrokerConfig$default$6(),
                testUtils.createBrokerConfig$default$7(), testUtils.createBrokerConfig$default$8(),
                testUtils.createBrokerConfig$default$9(), testUtils.createBrokerConfig$default$10(),
                testUtils.createBrokerConfig$default$11(), testUtils.createBrokerConfig$default$12(),
                testUtils.createBrokerConfig$default$13(), testUtils.createBrokerConfig$default$14(),
                testUtils.createBrokerConfig$default$15(), testUtils.createBrokerConfig$default$16(),
                testUtils.createBrokerConfig$default$17()));

        // Collaborators: only LogManager and ReplicaManager are strict mocks.
        ReplicationQuotaManager quota = (ReplicationQuotaManager) EasyMock.createNiceMock(ReplicationQuotaManager.class);
        LeaderEpochCache leaderEpochs = (LeaderEpochCache) EasyMock.createNiceMock(LeaderEpochCache.class);
        LogManager logManager = (LogManager) EasyMock.createMock(LogManager.class);
        Replica replica = (Replica) EasyMock.createNiceMock(Replica.class);
        ReplicaManager replicaManager = (ReplicaManager) EasyMock.createMock(ReplicaManager.class);

        // Stub the replica: epoch cache present, log end offset 0, latest epoch 5.
        EasyMock.expect(replica.epochs()).andReturn(new Some(leaderEpochs)).anyTimes();
        EasyMock.expect(replica.logEndOffset())
                .andReturn(new LogOffsetMetadata(
                        0L,
                        LogOffsetMetadata$.MODULE$.$lessinit$greater$default$2(),
                        LogOffsetMetadata$.MODULE$.$lessinit$greater$default$3()))
                .anyTimes();
        EasyMock.expect(BoxesRunTime.boxToInteger(leaderEpochs.latestEpoch())).andReturn(BoxesRunTime.boxToInteger(5));
        EasyMock.expect(replicaManager.logManager()).andReturn(logManager).anyTimes();
        stub(replica, replicaManager);

        // truncateTo returns Unit, so the expectation is attached to the preceding recorded call.
        logManager.truncateTo((Map) EasyMock.anyObject());
        EasyMock.expect(BoxedUnit.UNIT).times(2);
        EasyMock.replay(new Object[]{leaderEpochs, replicaManager, logManager, quota, replica});

        Predef$ predef = Predef$.MODULE$;
        Predef$ArrowAssoc$ arrow = Predef$ArrowAssoc$.MODULE$;

        // Canned leader reply: end offset 1 for both partitions.
        java.util.Map offsetsReply = (java.util.Map) JavaConverters$.MODULE$.mapAsJavaMapConverter(
                Map$.MODULE$.apply(predef.wrapRefArray(new Tuple2[]{
                        arrow.$minus$greater$extension(predef.ArrowAssoc(t1p0()), new EpochEndOffset(1L)),
                        arrow.$minus$greater$extension(predef.ArrowAssoc(t1p1()), new EpochEndOffset(1L))}))).asJava();

        BrokerEndPoint leader = new BrokerEndPoint(0, "localhost", 1000);
        // Keep a handle on the mock network client: the assertions below read its counters.
        // (The decompiled source referenced an undeclared "r0" here.)
        ReplicaFetcherMockBlockingSend mockNetwork =
                new ReplicaFetcherMockBlockingSend(offsetsReply, leader, new SystemTime());
        ReplicaFetcherThread fetcher = new ReplicaFetcherThread(
                "bob", 0, leader, config, replicaManager,
                new Metrics(), new SystemTime(), quota, new Some(mockNetwork));
        fetcher.addPartitions(Map$.MODULE$.apply(predef.wrapRefArray(new Tuple2[]{
                arrow.$minus$greater$extension(predef.ArrowAssoc(t1p0()), BoxesRunTime.boxToLong(0L)),
                arrow.$minus$greater$extension(predef.ArrowAssoc(t1p1()), BoxesRunTime.boxToLong(0L))})));

        // Pass 1: one epoch request, one fetch.
        fetcher.doWork();
        Assert.assertEquals(1L, mockNetwork.epochFetchCount());
        Assert.assertEquals(1L, mockNetwork.fetchCount());

        // Pass 2: no further epoch request, second fetch.
        fetcher.doWork();
        Assert.assertEquals(1L, mockNetwork.epochFetchCount());
        Assert.assertEquals(2L, mockNetwork.fetchCount());

        // Pass 3: still a single epoch request, third fetch.
        fetcher.doWork();
        Assert.assertEquals(1L, mockNetwork.epochFetchCount());
        Assert.assertEquals(3L, mockNetwork.fetchCount());

        EasyMock.verify(new Object[]{logManager});
    }

    /**
     * With the leader replying end offset 156 for t1p0 and 172 for t2p1, one pass of the fetcher
     * loop must hand those offsets to {@code LogManager.truncateTo}.
     *
     * <p>All captured {@code truncateTo} arguments are merged into one map before asserting.
     */
    @Test
    public void shouldTruncateToOffsetSpecifiedInEpochOffsetResponse() {
        Capture truncations = EasyMock.newCapture(CaptureType.ALL);

        TestUtils$ testUtils = TestUtils$.MODULE$;
        Seq configs = (Seq) testUtils.createBrokerConfigs(
                1, "localhost:1234",
                testUtils.createBrokerConfigs$default$3(), testUtils.createBrokerConfigs$default$4(),
                testUtils.createBrokerConfigs$default$5(), testUtils.createBrokerConfigs$default$6(),
                testUtils.createBrokerConfigs$default$7(), testUtils.createBrokerConfigs$default$8(),
                testUtils.createBrokerConfigs$default$9(), testUtils.createBrokerConfigs$default$10(),
                testUtils.createBrokerConfigs$default$11(), testUtils.createBrokerConfigs$default$12(),
                testUtils.createBrokerConfigs$default$13())
                .map(props -> KafkaConfig$.MODULE$.fromProps(props), Seq$.MODULE$.canBuildFrom());

        ReplicationQuotaManager quota = (ReplicationQuotaManager) EasyMock.createNiceMock(ReplicationQuotaManager.class);
        LeaderEpochCache leaderEpochs = (LeaderEpochCache) EasyMock.createMock(LeaderEpochCache.class);
        LogManager logManager = (LogManager) EasyMock.createMock(LogManager.class);
        Replica replica = (Replica) EasyMock.createNiceMock(Replica.class);
        ReplicaManager replicaManager = (ReplicaManager) EasyMock.createMock(ReplicaManager.class);

        // Record every truncateTo argument; two calls are expected.
        logManager.truncateTo((Map) EasyMock.capture(truncations));
        EasyMock.expect(BoxedUnit.UNIT).times(2);

        // Stub the replica: epoch cache present, log end offset 200, latest epoch 5.
        EasyMock.expect(replica.epochs()).andReturn(new Some(leaderEpochs)).anyTimes();
        EasyMock.expect(replica.logEndOffset())
                .andReturn(new LogOffsetMetadata(
                        200,
                        LogOffsetMetadata$.MODULE$.$lessinit$greater$default$2(),
                        LogOffsetMetadata$.MODULE$.$lessinit$greater$default$3()))
                .anyTimes();
        EasyMock.expect(BoxesRunTime.boxToInteger(leaderEpochs.latestEpoch())).andReturn(BoxesRunTime.boxToInteger(5)).anyTimes();
        EasyMock.expect(replicaManager.logManager()).andReturn(logManager).anyTimes();
        stub(replica, replicaManager);
        EasyMock.replay(leaderEpochs, replicaManager, logManager, quota, replica);

        Predef$ predef = Predef$.MODULE$;
        Predef$ArrowAssoc$ arrow = Predef$ArrowAssoc$.MODULE$;

        // Canned leader reply with a distinct end offset per partition.
        java.util.Map offsetsReply = (java.util.Map) JavaConverters$.MODULE$.mapAsJavaMapConverter(
                Map$.MODULE$.apply(predef.wrapRefArray(new Tuple2[]{
                        arrow.$minus$greater$extension(predef.ArrowAssoc(t1p0()), new EpochEndOffset(156L)),
                        arrow.$minus$greater$extension(predef.ArrowAssoc(t2p1()), new EpochEndOffset(172L))}))).asJava();

        BrokerEndPoint leader = new BrokerEndPoint(0, "localhost", 1000);
        ReplicaFetcherThread fetcher = new ReplicaFetcherThread(
                "bob", 0, leader, (KafkaConfig) configs.apply(0), replicaManager,
                new Metrics(), new SystemTime(), quota,
                new Some(new ReplicaFetcherMockBlockingSend(offsetsReply, leader, new SystemTime())));
        fetcher.addPartitions(Map$.MODULE$.apply(predef.wrapRefArray(new Tuple2[]{
                arrow.$minus$greater$extension(predef.ArrowAssoc(t1p0()), BoxesRunTime.boxToLong(0L)),
                arrow.$minus$greater$extension(predef.ArrowAssoc(t2p1()), BoxesRunTime.boxToLong(0L))})));

        fetcher.doWork();

        // Flatten all captured truncateTo maps into a single partition -> offset map.
        TraversableLike capturedCalls =
                (TraversableLike) JavaConverters$.MODULE$.asScalaBufferConverter(truncations.getValues()).asScala();
        TraversableOnce flattened =
                (TraversableOnce) capturedCalls.flatMap(call -> call.toSeq(), Buffer$.MODULE$.canBuildFrom());
        scala.collection.immutable.Map truncationPoints = flattened.toMap(Predef$.MODULE$.$conforms());

        Assert.assertEquals(156L, BoxesRunTime.unboxToLong(truncationPoints.apply(t1p0())));
        Assert.assertEquals(172L, BoxesRunTime.unboxToLong(truncationPoints.apply(t2p1())));
    }

    /**
     * With the leader replying end offset -1 for t1p0, and the replica's high watermark stubbed
     * at 100 (log end offset 300), one pass of the fetcher loop must call
     * {@code LogManager.truncateTo} with offset 100 for t1p0.
     */
    @Test
    public void shouldTruncateToHighWatermarkIfLeaderReturnsUndefinedOffset() {
        Capture truncation = EasyMock.newCapture(CaptureType.ALL);

        TestUtils$ testUtils = TestUtils$.MODULE$;
        Seq configs = (Seq) testUtils.createBrokerConfigs(
                1, "localhost:1234",
                testUtils.createBrokerConfigs$default$3(), testUtils.createBrokerConfigs$default$4(),
                testUtils.createBrokerConfigs$default$5(), testUtils.createBrokerConfigs$default$6(),
                testUtils.createBrokerConfigs$default$7(), testUtils.createBrokerConfigs$default$8(),
                testUtils.createBrokerConfigs$default$9(), testUtils.createBrokerConfigs$default$10(),
                testUtils.createBrokerConfigs$default$11(), testUtils.createBrokerConfigs$default$12(),
                testUtils.createBrokerConfigs$default$13())
                .map(props -> KafkaConfig$.MODULE$.fromProps(props), Seq$.MODULE$.canBuildFrom());

        ReplicationQuotaManager quota = (ReplicationQuotaManager) EasyMock.createNiceMock(ReplicationQuotaManager.class);
        LeaderEpochCache leaderEpochs = (LeaderEpochCache) EasyMock.createNiceMock(LeaderEpochCache.class);
        LogManager logManager = (LogManager) EasyMock.createMock(LogManager.class);
        Replica replica = (Replica) EasyMock.createNiceMock(Replica.class);
        ReplicaManager replicaManager = (ReplicaManager) EasyMock.createMock(ReplicaManager.class);

        // High watermark sits at 100, below the log end offset of 300 stubbed further down.
        EasyMock.expect(replica.highWatermark())
                .andReturn(new LogOffsetMetadata(
                        100,
                        LogOffsetMetadata$.MODULE$.$lessinit$greater$default$2(),
                        LogOffsetMetadata$.MODULE$.$lessinit$greater$default$3()))
                .anyTimes();

        // Exactly one truncateTo call is expected; its argument is captured.
        logManager.truncateTo((Map) EasyMock.capture(truncation));
        EasyMock.expect(BoxedUnit.UNIT).once();

        EasyMock.expect(replica.epochs()).andReturn(new Some(leaderEpochs)).anyTimes();
        EasyMock.expect(replica.logEndOffset())
                .andReturn(new LogOffsetMetadata(
                        300,
                        LogOffsetMetadata$.MODULE$.$lessinit$greater$default$2(),
                        LogOffsetMetadata$.MODULE$.$lessinit$greater$default$3()))
                .anyTimes();
        EasyMock.expect(BoxesRunTime.boxToInteger(leaderEpochs.latestEpoch())).andReturn(BoxesRunTime.boxToInteger(5));
        EasyMock.expect(replicaManager.logManager()).andReturn(logManager).anyTimes();
        stub(replica, replicaManager);
        EasyMock.replay(leaderEpochs, replicaManager, logManager, quota, replica);

        Predef$ predef = Predef$.MODULE$;
        Predef$ArrowAssoc$ arrow = Predef$ArrowAssoc$.MODULE$;

        // Canned leader reply: offset -1 for the only partition.
        java.util.Map offsetsReply = (java.util.Map) JavaConverters$.MODULE$.mapAsJavaMapConverter(
                Map$.MODULE$.apply(predef.wrapRefArray(new Tuple2[]{
                        arrow.$minus$greater$extension(predef.ArrowAssoc(t1p0()), new EpochEndOffset(-1L))}))).asJava();

        BrokerEndPoint leader = new BrokerEndPoint(0, "localhost", 1000);
        ReplicaFetcherThread fetcher = new ReplicaFetcherThread(
                "bob", 0, leader, (KafkaConfig) configs.apply(0), replicaManager,
                new Metrics(), new SystemTime(), quota,
                new Some(new ReplicaFetcherMockBlockingSend(offsetsReply, leader, new SystemTime())));
        fetcher.addPartitions(Map$.MODULE$.apply(predef.wrapRefArray(new Tuple2[]{
                arrow.$minus$greater$extension(predef.ArrowAssoc(t1p0()), BoxesRunTime.boxToLong(0L))})));

        fetcher.doWork();

        MapLike truncatedTo = (MapLike) truncation.getValue();
        Assert.assertEquals(100, BoxesRunTime.unboxToLong(truncatedTo.get(t1p0()).get()));
    }

    /**
     * While the leader's epoch reply carries errors (NOT_LEADER_FOR_PARTITION for t1p0,
     * UNKNOWN_SERVER_ERROR for t1p1), every captured {@code truncateTo} call over the passes
     * for the range 0 to 3 must receive an empty map. Once the reply for t1p0 is replaced with
     * end offset 156, the next pass must truncate t1p0 to 156.
     *
     * <p>NOTE(review): the partitions added to the thread are t1p0 and t2p1, while the canned
     * replies are keyed by t1p0 and t1p1 - confirm the mismatch is intended.
     */
    @Test
    public void shouldPollIndefinitelyIfLeaderReturnsAnyException() {
        Capture truncations = EasyMock.newCapture(CaptureType.ALL);

        TestUtils$ testUtils = TestUtils$.MODULE$;
        Seq configs = (Seq) testUtils.createBrokerConfigs(
                1, "localhost:1234",
                testUtils.createBrokerConfigs$default$3(), testUtils.createBrokerConfigs$default$4(),
                testUtils.createBrokerConfigs$default$5(), testUtils.createBrokerConfigs$default$6(),
                testUtils.createBrokerConfigs$default$7(), testUtils.createBrokerConfigs$default$8(),
                testUtils.createBrokerConfigs$default$9(), testUtils.createBrokerConfigs$default$10(),
                testUtils.createBrokerConfigs$default$11(), testUtils.createBrokerConfigs$default$12(),
                testUtils.createBrokerConfigs$default$13())
                .map(props -> KafkaConfig$.MODULE$.fromProps(props), Seq$.MODULE$.canBuildFrom());

        ReplicationQuotaManager quota = (ReplicationQuotaManager) EasyMock.createNiceMock(ReplicationQuotaManager.class);
        LeaderEpochCache leaderEpochs = (LeaderEpochCache) EasyMock.createNiceMock(LeaderEpochCache.class);
        LogManager logManager = (LogManager) EasyMock.createMock(LogManager.class);
        Replica replica = (Replica) EasyMock.createNiceMock(Replica.class);
        ReplicaManager replicaManager = (ReplicaManager) EasyMock.createMock(ReplicaManager.class);

        EasyMock.expect(replica.highWatermark())
                .andReturn(new LogOffsetMetadata(
                        100,
                        LogOffsetMetadata$.MODULE$.$lessinit$greater$default$2(),
                        LogOffsetMetadata$.MODULE$.$lessinit$greater$default$3()))
                .anyTimes();

        // Any number of truncateTo calls is allowed; every argument is captured.
        logManager.truncateTo((Map) EasyMock.capture(truncations));
        EasyMock.expect(BoxedUnit.UNIT).anyTimes();

        EasyMock.expect(replica.epochs()).andReturn(new Some(leaderEpochs)).anyTimes();
        EasyMock.expect(replica.logEndOffset())
                .andReturn(new LogOffsetMetadata(
                        300,
                        LogOffsetMetadata$.MODULE$.$lessinit$greater$default$2(),
                        LogOffsetMetadata$.MODULE$.$lessinit$greater$default$3()))
                .anyTimes();
        EasyMock.expect(BoxesRunTime.boxToInteger(leaderEpochs.latestEpoch())).andReturn(BoxesRunTime.boxToInteger(5));
        EasyMock.expect(replicaManager.logManager()).andReturn(logManager).anyTimes();
        stub(replica, replicaManager);
        EasyMock.replay(leaderEpochs, replicaManager, logManager, quota, replica);

        Predef$ predef = Predef$.MODULE$;
        Predef$ArrowAssoc$ arrow = Predef$ArrowAssoc$.MODULE$;

        // Backed by a mutable Scala map so the reply can be swapped later via put().
        java.util.Map offsetsReply = (java.util.Map) JavaConverters$.MODULE$.mutableMapAsJavaMapConverter(
                scala.collection.mutable.Map$.MODULE$.apply(predef.wrapRefArray(new Tuple2[]{
                        arrow.$minus$greater$extension(predef.ArrowAssoc(t1p0()), new EpochEndOffset(Errors.NOT_LEADER_FOR_PARTITION, -1L)),
                        arrow.$minus$greater$extension(predef.ArrowAssoc(t1p1()), new EpochEndOffset(Errors.UNKNOWN_SERVER_ERROR, -1L))}))).asJava();

        BrokerEndPoint leader = new BrokerEndPoint(0, "localhost", 1000);
        ReplicaFetcherThread fetcher = new ReplicaFetcherThread(
                "bob", 0, leader, (KafkaConfig) configs.apply(0), replicaManager,
                new Metrics(), new SystemTime(), quota,
                new Some(new ReplicaFetcherMockBlockingSend(offsetsReply, leader, new SystemTime())));
        fetcher.addPartitions(Map$.MODULE$.apply(predef.wrapRefArray(new Tuple2[]{
                arrow.$minus$greater$extension(predef.ArrowAssoc(t1p0()), BoxesRunTime.boxToLong(0L)),
                arrow.$minus$greater$extension(predef.ArrowAssoc(t2p1()), BoxesRunTime.boxToLong(0L))})));

        // Run the loop repeatedly while the leader keeps answering with errors.
        RichInt$.MODULE$.to$extension0(predef.intWrapper(0), 3).foreach$mVc$sp(pass -> {
            fetcher.doWork();
        });

        // Every truncateTo call so far must have been handed an empty map.
        IterableLike callsWhileFailing =
                (IterableLike) JavaConverters$.MODULE$.asScalaBufferConverter(truncations.getValues()).asScala();
        callsWhileFailing.foreach(captured -> {
            $anonfun$shouldPollIndefinitelyIfLeaderReturnsAnyException$3(captured);
            return BoxedUnit.UNIT;
        });

        // The leader now answers t1p0 with a real end offset.
        offsetsReply.put(t1p0(), new EpochEndOffset(156L));
        fetcher.doWork();

        TraversableLike allCalls =
                (TraversableLike) JavaConverters$.MODULE$.asScalaBufferConverter(truncations.getValues()).asScala();
        MapLike latestCall = (MapLike) allCalls.last();
        Assert.assertEquals(156L, BoxesRunTime.unboxToLong(latestCall.get(t1p0()).get()));
    }

    /**
     * After {@code addPartitions}, every partition state must report {@code truncatingLog};
     * after one pass of the fetcher loop the test asserts that this no longer holds for all
     * of them.
     *
     * <p>NOTE(review): the second assertion is {@code assertFalse(forall(...))}, which only
     * proves that at least one partition left the truncating state - confirm whether "none"
     * was intended.
     */
    @Test
    public void shouldMovePartitionsOutOfTruncatingLogState() {
        TestUtils$ testUtils = TestUtils$.MODULE$;
        KafkaConfig config = KafkaConfig$.MODULE$.fromProps(testUtils.createBrokerConfig(
                1, "localhost:1234",
                testUtils.createBrokerConfig$default$3(), testUtils.createBrokerConfig$default$4(),
                testUtils.createBrokerConfig$default$5(), testUtils.createBrokerConfig$default$6(),
                testUtils.createBrokerConfig$default$7(), testUtils.createBrokerConfig$default$8(),
                testUtils.createBrokerConfig$default$9(), testUtils.createBrokerConfig$default$10(),
                testUtils.createBrokerConfig$default$11(), testUtils.createBrokerConfig$default$12(),
                testUtils.createBrokerConfig$default$13(), testUtils.createBrokerConfig$default$14(),
                testUtils.createBrokerConfig$default$15(), testUtils.createBrokerConfig$default$16(),
                testUtils.createBrokerConfig$default$17()));

        // All collaborators are nice mocks here: unexpected calls are tolerated.
        ReplicationQuotaManager quota = (ReplicationQuotaManager) EasyMock.createNiceMock(ReplicationQuotaManager.class);
        LeaderEpochCache leaderEpochs = (LeaderEpochCache) EasyMock.createNiceMock(LeaderEpochCache.class);
        LogManager logManager = (LogManager) EasyMock.createNiceMock(LogManager.class);
        Replica replica = (Replica) EasyMock.createNiceMock(Replica.class);
        ReplicaManager replicaManager = (ReplicaManager) EasyMock.createNiceMock(ReplicaManager.class);

        EasyMock.expect(replica.epochs()).andReturn(new Some(leaderEpochs)).anyTimes();
        EasyMock.expect(replica.logEndOffset())
                .andReturn(new LogOffsetMetadata(
                        0L,
                        LogOffsetMetadata$.MODULE$.$lessinit$greater$default$2(),
                        LogOffsetMetadata$.MODULE$.$lessinit$greater$default$3()))
                .anyTimes();
        EasyMock.expect(BoxesRunTime.boxToInteger(leaderEpochs.latestEpoch())).andReturn(BoxesRunTime.boxToInteger(5));
        EasyMock.expect(replicaManager.logManager()).andReturn(logManager).anyTimes();
        stub(replica, replicaManager);
        EasyMock.replay(leaderEpochs, replicaManager, logManager, quota, replica);

        Predef$ predef = Predef$.MODULE$;
        Predef$ArrowAssoc$ arrow = Predef$ArrowAssoc$.MODULE$;

        // Canned leader reply: end offset 1 for both partitions.
        java.util.Map offsetsReply = (java.util.Map) JavaConverters$.MODULE$.mapAsJavaMapConverter(
                Map$.MODULE$.apply(predef.wrapRefArray(new Tuple2[]{
                        arrow.$minus$greater$extension(predef.ArrowAssoc(t1p0()), new EpochEndOffset(1L)),
                        arrow.$minus$greater$extension(predef.ArrowAssoc(t1p1()), new EpochEndOffset(1L))}))).asJava();

        BrokerEndPoint leader = new BrokerEndPoint(0, "localhost", 1000);
        ReplicaFetcherThread fetcher = new ReplicaFetcherThread(
                "bob", 0, leader, config, replicaManager,
                new Metrics(), new SystemTime(), quota,
                new Some(new ReplicaFetcherMockBlockingSend(offsetsReply, leader, new SystemTime())));
        fetcher.addPartitions(Map$.MODULE$.apply(predef.wrapRefArray(new Tuple2[]{
                arrow.$minus$greater$extension(predef.ArrowAssoc(t1p0()), BoxesRunTime.boxToLong(0L)),
                arrow.$minus$greater$extension(predef.ArrowAssoc(t1p1()), BoxesRunTime.boxToLong(0L))})));

        // Before any work: every partition is flagged as truncating.
        IterableLike statesBefore =
                (IterableLike) JavaConverters$.MODULE$.asScalaBufferConverter(fetcher.partitionStates().partitionStates()).asScala();
        Assert.assertTrue(statesBefore.forall(state -> {
            return BoxesRunTime.boxToBoolean($anonfun$shouldMovePartitionsOutOfTruncatingLogState$1(state));
        }));

        fetcher.doWork();

        // After one pass: it is no longer true that all partitions are truncating.
        IterableLike statesAfter =
                (IterableLike) JavaConverters$.MODULE$.asScalaBufferConverter(fetcher.partitionStates().partitionStates()).asScala();
        Assert.assertFalse(statesAfter.forall(state -> {
            return BoxesRunTime.boxToBoolean($anonfun$shouldMovePartitionsOutOfTruncatingLogState$2(state));
        }));
    }

    /**
     * Installs a callback on the mock network client that removes t1p0 from the fetcher thread
     * when the epoch request is made. After one pass, the map handed to
     * {@code LogManager.truncateTo} must have no entry for t1p0 and must hold 49 for t1p1.
     */
    @Test
    public void shouldFilterPartitionsMadeLeaderDuringLeaderEpochRequest() {
        TestUtils$ testUtils = TestUtils$.MODULE$;
        KafkaConfig config = KafkaConfig$.MODULE$.fromProps(testUtils.createBrokerConfig(
                1, "localhost:1234",
                testUtils.createBrokerConfig$default$3(), testUtils.createBrokerConfig$default$4(),
                testUtils.createBrokerConfig$default$5(), testUtils.createBrokerConfig$default$6(),
                testUtils.createBrokerConfig$default$7(), testUtils.createBrokerConfig$default$8(),
                testUtils.createBrokerConfig$default$9(), testUtils.createBrokerConfig$default$10(),
                testUtils.createBrokerConfig$default$11(), testUtils.createBrokerConfig$default$12(),
                testUtils.createBrokerConfig$default$13(), testUtils.createBrokerConfig$default$14(),
                testUtils.createBrokerConfig$default$15(), testUtils.createBrokerConfig$default$16(),
                testUtils.createBrokerConfig$default$17()));
        Capture truncation = EasyMock.newCapture(CaptureType.ALL);

        ReplicationQuotaManager quota = (ReplicationQuotaManager) EasyMock.createNiceMock(ReplicationQuotaManager.class);
        LeaderEpochCache leaderEpochs = (LeaderEpochCache) EasyMock.createNiceMock(LeaderEpochCache.class);
        LogManager logManager = (LogManager) EasyMock.createNiceMock(LogManager.class);
        Replica replica = (Replica) EasyMock.createNiceMock(Replica.class);
        ReplicaManager replicaManager = (ReplicaManager) EasyMock.createNiceMock(ReplicaManager.class);

        // Capture the single expected truncateTo argument.
        logManager.truncateTo((Map) EasyMock.capture(truncation));
        EasyMock.expect(BoxedUnit.UNIT).once();

        EasyMock.expect(replica.epochs()).andReturn(new Some(leaderEpochs)).anyTimes();
        EasyMock.expect(replica.logEndOffset())
                .andReturn(new LogOffsetMetadata(
                        100,
                        LogOffsetMetadata$.MODULE$.$lessinit$greater$default$2(),
                        LogOffsetMetadata$.MODULE$.$lessinit$greater$default$3()))
                .anyTimes();
        EasyMock.expect(BoxesRunTime.boxToInteger(leaderEpochs.latestEpoch())).andReturn(BoxesRunTime.boxToInteger(5));
        EasyMock.expect(replicaManager.logManager()).andReturn(logManager).anyTimes();
        stub(replica, replicaManager);
        EasyMock.replay(leaderEpochs, replicaManager, logManager, quota, replica);

        Predef$ predef = Predef$.MODULE$;
        Predef$ArrowAssoc$ arrow = Predef$ArrowAssoc$.MODULE$;

        // Canned leader reply: 52 for t1p0, 49 for t1p1.
        java.util.Map offsetsReply = (java.util.Map) JavaConverters$.MODULE$.mapAsJavaMapConverter(
                Map$.MODULE$.apply(predef.wrapRefArray(new Tuple2[]{
                        arrow.$minus$greater$extension(predef.ArrowAssoc(t1p0()), new EpochEndOffset(52L)),
                        arrow.$minus$greater$extension(predef.ArrowAssoc(t1p1()), new EpochEndOffset(49L))}))).asJava();

        BrokerEndPoint leader = new BrokerEndPoint(0, "localhost", 1000);
        ReplicaFetcherMockBlockingSend mockNetwork =
                new ReplicaFetcherMockBlockingSend(offsetsReply, leader, new SystemTime());
        ReplicaFetcherThread fetcher = new ReplicaFetcherThread(
                "bob", 0, leader, config, replicaManager,
                new Metrics(), new SystemTime(), quota, new Some(mockNetwork));
        fetcher.addPartitions(Map$.MODULE$.apply(predef.wrapRefArray(new Tuple2[]{
                arrow.$minus$greater$extension(predef.ArrowAssoc(t1p0()), BoxesRunTime.boxToLong(0L)),
                arrow.$minus$greater$extension(predef.ArrowAssoc(t1p1()), BoxesRunTime.boxToLong(0L))})));

        // Drop t1p0 from the fetcher while the epoch request is in flight.
        TopicPartition removedPartition = t1p0();
        mockNetwork.setEpochRequestCallback(() -> {
            fetcher.removePartitions(Predef$.MODULE$.Set().apply(Predef$.MODULE$.wrapRefArray(new TopicPartition[]{removedPartition})));
        });

        fetcher.doWork();

        // The removed partition must be absent; the remaining one keeps the leader's offset.
        Assert.assertEquals(None$.MODULE$, ((MapLike) truncation.getValue()).get(removedPartition));
        Assert.assertEquals(49L, BoxesRunTime.unboxToLong(((MapLike) truncation.getValue()).get(t1p1()).get()));
    }

    /**
     * Records, for each of t1p0, t1p1 and t2p1 (in that order), that {@code getReplica} returns
     * {@code Some(replica)} and {@code getReplicaOrException} returns {@code replica}, any
     * number of times.
     *
     * @param replica        the replica every lookup should resolve to
     * @param replicaManager the mock to record the expectations on (must be in record state)
     * @return the expectation setter of the last recorded call, {@code getReplicaOrException(t2p1)}
     */
    public IExpectationSetters<Replica> stub(Replica replica, ReplicaManager replicaManager) {
        IExpectationSetters<Replica> lastExpectation = null;
        for (TopicPartition partition : new TopicPartition[]{t1p0(), t1p1(), t2p1()}) {
            EasyMock.expect(replicaManager.getReplica(partition)).andReturn(new Some(replica)).anyTimes();
            lastExpectation = EasyMock.expect(replicaManager.getReplicaOrException(partition)).andReturn(replica).anyTimes();
        }
        return lastExpectation;
    }

    /** Lambda body: asserts that a captured {@code truncateTo} argument is an empty map. */
    public static final /* synthetic */ void $anonfun$shouldPollIndefinitelyIfLeaderReturnsAnyException$3(Map map) {
        long entryCount = map.size();
        Assert.assertEquals(0L, entryCount);
    }

    /** Lambda body: reports whether the partition's fetch state is flagged as truncating. */
    public static final /* synthetic */ boolean $anonfun$shouldMovePartitionsOutOfTruncatingLogState$1(PartitionStates.PartitionState partitionState) {
        PartitionFetchState fetchState = (PartitionFetchState) partitionState.value();
        return fetchState.truncatingLog();
    }

    /** Lambda body: reports whether the partition's fetch state is flagged as truncating. */
    public static final /* synthetic */ boolean $anonfun$shouldMovePartitionsOutOfTruncatingLogState$2(PartitionStates.PartitionState partitionState) {
        PartitionFetchState fetchState = (PartitionFetchState) partitionState.value();
        return fetchState.truncatingLog();
    }
}
