package kafka.server;

import java.util.List;
import java.util.Properties;
import java.util.Set;
import kafka.api.LeaderAndIsr$;
import kafka.cluster.Broker;
import kafka.controller.ControllerChannelManager;
import kafka.controller.ControllerChannelManager$;
import kafka.controller.ControllerContext;
import kafka.controller.StateChangeLogger;
import kafka.utils.TestUtils$;
import kafka.zk.ZooKeeperTestHarness;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.network.ListenerName;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.AbstractResponse;
import org.apache.kafka.common.requests.LeaderAndIsrRequest;
import org.apache.kafka.common.requests.LeaderAndIsrResponse;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.apache.kafka.common.utils.Time;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import scala.None$;
import scala.Predef$;
import scala.Predef$ArrowAssoc$;
import scala.Some;
import scala.Tuple2;
import scala.collection.JavaConverters$;
import scala.collection.Map;
import scala.collection.Seq;
import scala.collection.Seq$;
import scala.collection.immutable.$colon;
import scala.collection.immutable.Nil$;
import scala.collection.immutable.StringOps;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;

/* compiled from: LeaderElectionTest.scala */
@ScalaSignature(bytes = "\u0006\u0001U4A!\u0001\u0002\u0001\u000f\t\u0011B*Z1eKJ,E.Z2uS>tG+Z:u\u0015\t\u0019A!\u0001\u0004tKJ4XM\u001d\u0006\u0002\u000b\u0005)1.\u00194lC\u000e\u00011C\u0001\u0001\t!\tIA\"D\u0001\u000b\u0015\tYA!\u0001\u0002{W&\u0011QB\u0003\u0002\u00155>|7*Z3qKJ$Vm\u001d;ICJtWm]:\t\u000b=\u0001A\u0011\u0001\t\u0002\rqJg.\u001b;?)\u0005\t\u0002C\u0001\n\u0001\u001b\u0005\u0011\u0001b\u0002\u000b\u0001\u0005\u0004%\t!F\u0001\nEJ|7.\u001a:JIF*\u0012A\u0006\t\u0003/ii\u0011\u0001\u0007\u0006\u00023\u0005)1oY1mC&\u00111\u0004\u0007\u0002\u0004\u0013:$\bBB\u000f\u0001A\u0003%a#\u0001\u0006ce>\\WM]%ec\u0001Bqa\b\u0001C\u0002\u0013\u0005Q#A\u0005ce>\\WM]%ee!1\u0011\u0005\u0001Q\u0001\nY\t!B\u0019:pW\u0016\u0014\u0018\n\u001a\u001a!\u0011\u001d\u0019\u0003\u00011A\u0005\u0002\u0011\nqa]3sm\u0016\u00148/F\u0001&!\r1c&\r\b\u0003O1r!\u0001K\u0016\u000e\u0003%R!A\u000b\u0004\u0002\rq\u0012xn\u001c;?\u0013\u0005I\u0012BA\u0017\u0019\u0003\u001d\u0001\u0018mY6bO\u0016L!a\f\u0019\u0003\u0007M+\u0017O\u0003\u0002.1A\u0011!CM\u0005\u0003g\t\u00111bS1gW\u0006\u001cVM\u001d<fe\"9Q\u0007\u0001a\u0001\n\u00031\u0014aC:feZ,'o]0%KF$\"a\u000e\u001e\u0011\u0005]A\u0014BA\u001d\u0019\u0005\u0011)f.\u001b;\t\u000fm\"\u0014\u0011!a\u0001K\u0005\u0019\u0001\u0010J\u0019\t\ru\u0002\u0001\u0015)\u0003&\u0003!\u0019XM\u001d<feN\u0004\u0003bB \u0001\u0001\u0004%\t\u0001Q\u0001\u001dgR\fG.Z\"p]R\u0014x\u000e\u001c7fe\u0016\u0003xn\u00195EKR,7\r^3e+\u0005\t\u0005CA\fC\u0013\t\u0019\u0005DA\u0004C_>dW-\u00198\t\u000f\u0015\u0003\u0001\u0019!C\u0001\r\u0006\u00013\u000f^1mK\u000e{g\u000e\u001e:pY2,'/\u00129pG\"$U\r^3di\u0016$w\fJ3r)\t9t\tC\u0004<\t\u0006\u0005\t\u0019A!\t\r%\u0003\u0001\u0015)\u0003B\u0003u\u0019H/\u00197f\u0007>tGO]8mY\u0016\u0014X\t]8dQ\u0012+G/Z2uK\u0012\u0004\u0003\"B&\u0001\t\u0003b\u0015!B:fiV\u0003H#A\u001c)\u0005)s\u0005CA(U\u001b\u0005\u0001&BA)S\u0003\u0015QWO\\5u\u0015\u0005\u0019\u0016aA8sO&\u0011Q\u000b\u0015\u0002\u0007\u0005\u00164wN]3\t\u000b]\u0003A\u0011\t'\u0002\u0011Q,\u0017M\u001d#po:D#AV-\u0011\u0005=S\u0016BA.Q\u0005\u0015\te\r^3s\u0011\u0015i\u0006\u0001\"\u0001M\u0003i!Xm\u001d;MK\u0006$WM]#mK\u000e$\u0018n\u001c8B]\u0012,\u0005o\\2iQ\tav\f\u0005\u0002PA&\u0011\u0011\r\u0015\u0002\u0005)\u0016\u001cH\u000fC\u0003d\u0001\u0011\u0005A*\u0001\u0016uKN$H*Z1eKJ,E.Z2uS>tw+\u001b;i'R\fG.Z\"p]R\u0014x\u000e\u001c7fe\u0016\u0003xn\u00195)\u0005\t|\u0006\"\u00024\u0001\t\u00139\u0017\u0001H:uC2,7i\u001c8ue>dG.\u001a:Fa>\u001c\u0007nQ1mY\n\f7m\u001b\u000b\u0003o!DQ![3A\u0002)\f\u0001B]3ta>t7/\u001a\t\u0003WNl\u0011\u0001\u001c\u0006\u0003[:\f\u0001B]3rk\u0016\u001cHo\u001d\u0006\u0003_B\faaY8n[>t'BA\u0003r\u0015\t\u0011(+\u0001\u0004ba\u0006\u001c\u0007.Z\u0005\u0003i2\u0014\u0001#\u00112tiJ\f7\r\u001e*fgB|gn]3")
/* loaded from: input_file:kafka/server/LeaderElectionTest.class */
public class LeaderElectionTest extends ZooKeeperTestHarness {
    private final int brokerId1 = 0;
    private final int brokerId2 = 1;
    private Seq<KafkaServer> servers = Seq$.MODULE$.empty();
    private boolean staleControllerEpochDetected = false;

    public int brokerId1() {
        return this.brokerId1;
    }

    public int brokerId2() {
        return this.brokerId2;
    }

    public Seq<KafkaServer> servers() {
        return this.servers;
    }

    public void servers_$eq(Seq<KafkaServer> seq) {
        this.servers = seq;
    }

    public boolean staleControllerEpochDetected() {
        return this.staleControllerEpochDetected;
    }

    public void staleControllerEpochDetected_$eq(boolean z) {
        this.staleControllerEpochDetected = z;
    }

    @Override // kafka.zk.ZooKeeperTestHarness
    @Before
    public void setUp() {
        super.setUp();
        Properties createBrokerConfig = TestUtils$.MODULE$.createBrokerConfig(brokerId1(), zkConnect(), false, TestUtils$.MODULE$.createBrokerConfig$default$4(), TestUtils$.MODULE$.createBrokerConfig$default$5(), TestUtils$.MODULE$.createBrokerConfig$default$6(), TestUtils$.MODULE$.createBrokerConfig$default$7(), TestUtils$.MODULE$.createBrokerConfig$default$8(), TestUtils$.MODULE$.createBrokerConfig$default$9(), TestUtils$.MODULE$.createBrokerConfig$default$10(), TestUtils$.MODULE$.createBrokerConfig$default$11(), TestUtils$.MODULE$.createBrokerConfig$default$12(), TestUtils$.MODULE$.createBrokerConfig$default$13(), TestUtils$.MODULE$.createBrokerConfig$default$14(), TestUtils$.MODULE$.createBrokerConfig$default$15(), TestUtils$.MODULE$.createBrokerConfig$default$16(), TestUtils$.MODULE$.createBrokerConfig$default$17(), TestUtils$.MODULE$.createBrokerConfig$default$18());
        Properties createBrokerConfig2 = TestUtils$.MODULE$.createBrokerConfig(brokerId2(), zkConnect(), false, TestUtils$.MODULE$.createBrokerConfig$default$4(), TestUtils$.MODULE$.createBrokerConfig$default$5(), TestUtils$.MODULE$.createBrokerConfig$default$6(), TestUtils$.MODULE$.createBrokerConfig$default$7(), TestUtils$.MODULE$.createBrokerConfig$default$8(), TestUtils$.MODULE$.createBrokerConfig$default$9(), TestUtils$.MODULE$.createBrokerConfig$default$10(), TestUtils$.MODULE$.createBrokerConfig$default$11(), TestUtils$.MODULE$.createBrokerConfig$default$12(), TestUtils$.MODULE$.createBrokerConfig$default$13(), TestUtils$.MODULE$.createBrokerConfig$default$14(), TestUtils$.MODULE$.createBrokerConfig$default$15(), TestUtils$.MODULE$.createBrokerConfig$default$16(), TestUtils$.MODULE$.createBrokerConfig$default$17(), TestUtils$.MODULE$.createBrokerConfig$default$18());
        createBrokerConfig.put("unclean.leader.election.enable", "true");
        createBrokerConfig2.put("unclean.leader.election.enable", "true");
        servers_$eq((Seq) servers().$plus$plus(new $colon.colon(TestUtils$.MODULE$.createServer(KafkaConfig$.MODULE$.fromProps(createBrokerConfig), TestUtils$.MODULE$.createServer$default$2()), new $colon.colon(TestUtils$.MODULE$.createServer(KafkaConfig$.MODULE$.fromProps(createBrokerConfig2), TestUtils$.MODULE$.createServer$default$2()), Nil$.MODULE$)), Seq$.MODULE$.canBuildFrom()));
    }

    @Override // kafka.zk.ZooKeeperTestHarness
    @After
    public void tearDown() {
        TestUtils$.MODULE$.shutdownServers(servers());
        super.tearDown();
    }

    @Test
    public void testLeaderElectionAndEpoch() {
        int unboxToInt = BoxesRunTime.unboxToInt(TestUtils$.MODULE$.createTopic(zkClient(), "new-topic", (Map) Predef$.MODULE$.Map().apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(BoxesRunTime.boxToInteger(0)), Seq$.MODULE$.apply(Predef$.MODULE$.wrapIntArray(new int[]{0, 1})))})), servers()).apply(BoxesRunTime.boxToInteger(0)));
        int unboxToInt2 = BoxesRunTime.unboxToInt(zkClient().getEpochForPartition(new TopicPartition("new-topic", 0)).get());
        debug(() -> {
            return new StringBuilder(14).append("leader Epoch: ").append(unboxToInt2).toString();
        });
        debug(() -> {
            return new StringOps(Predef$.MODULE$.augmentString("Leader is elected to be: %s")).format(Predef$.MODULE$.genericWrapArray(new Object[]{BoxesRunTime.boxToInteger(unboxToInt)}));
        });
        Assert.assertTrue("Leader could be broker 0 or broker 1", unboxToInt == 0 || unboxToInt == 1);
        Assert.assertEquals("First epoch value should be 0", 0L, unboxToInt2);
        ((KafkaServer) servers().last()).shutdown();
        int waitUntilLeaderIsElectedOrChanged = TestUtils$.MODULE$.waitUntilLeaderIsElectedOrChanged(zkClient(), "new-topic", 0, TestUtils$.MODULE$.waitUntilLeaderIsElectedOrChanged$default$4(), unboxToInt == 0 ? None$.MODULE$ : new Some(BoxesRunTime.boxToInteger(unboxToInt)), TestUtils$.MODULE$.waitUntilLeaderIsElectedOrChanged$default$6());
        int unboxToInt3 = BoxesRunTime.unboxToInt(zkClient().getEpochForPartition(new TopicPartition("new-topic", 0)).get());
        debug(() -> {
            return new StringOps(Predef$.MODULE$.augmentString("Leader is elected to be: %s")).format(Predef$.MODULE$.genericWrapArray(new Object[]{BoxesRunTime.boxToInteger(unboxToInt)}));
        });
        debug(() -> {
            return new StringBuilder(14).append("leader Epoch: ").append(unboxToInt3).toString();
        });
        Assert.assertEquals("Leader must move to broker 0", 0L, waitUntilLeaderIsElectedOrChanged);
        if (unboxToInt == waitUntilLeaderIsElectedOrChanged) {
            Assert.assertEquals(new StringBuilder(29).append("Second epoch value should be ").append(unboxToInt2).append(1).toString(), unboxToInt2 + 1, unboxToInt3);
        } else {
            Assert.assertEquals(new StringOps(Predef$.MODULE$.augmentString("Second epoch value should be %d")).format(Predef$.MODULE$.genericWrapArray(new Object[]{BoxesRunTime.boxToInteger(unboxToInt2 + 1)})), unboxToInt2 + 1, unboxToInt3);
        }
        ((KafkaServer) servers().last()).startup();
        ((KafkaServer) servers().head()).shutdown();
        Thread.sleep(zookeeper().tickTime());
        int waitUntilLeaderIsElectedOrChanged2 = TestUtils$.MODULE$.waitUntilLeaderIsElectedOrChanged(zkClient(), "new-topic", 0, TestUtils$.MODULE$.waitUntilLeaderIsElectedOrChanged$default$4(), waitUntilLeaderIsElectedOrChanged == 1 ? None$.MODULE$ : new Some(BoxesRunTime.boxToInteger(waitUntilLeaderIsElectedOrChanged)), TestUtils$.MODULE$.waitUntilLeaderIsElectedOrChanged$default$6());
        int unboxToInt4 = BoxesRunTime.unboxToInt(zkClient().getEpochForPartition(new TopicPartition("new-topic", 0)).get());
        debug(() -> {
            return new StringBuilder(14).append("leader Epoch: ").append(unboxToInt4).toString();
        });
        debug(() -> {
            return new StringOps(Predef$.MODULE$.augmentString("Leader is elected to be: %s")).format(Predef$.MODULE$.genericWrapArray(new Object[]{BoxesRunTime.boxToInteger(waitUntilLeaderIsElectedOrChanged2)}));
        });
        Assert.assertEquals("Leader must return to 1", 1L, waitUntilLeaderIsElectedOrChanged2);
        if (waitUntilLeaderIsElectedOrChanged == waitUntilLeaderIsElectedOrChanged2) {
            Assert.assertEquals(new StringBuilder(29).append("Second epoch value should be ").append(unboxToInt3).toString(), unboxToInt3, unboxToInt4);
        } else {
            Assert.assertEquals(new StringOps(Predef$.MODULE$.augmentString("Second epoch value should be %d")).format(Predef$.MODULE$.genericWrapArray(new Object[]{BoxesRunTime.boxToInteger(unboxToInt3 + 1)})), unboxToInt3 + 1, unboxToInt4);
        }
    }

    @Test
    public void testLeaderElectionWithStaleControllerEpoch() {
        int unboxToInt = BoxesRunTime.unboxToInt(TestUtils$.MODULE$.createTopic(zkClient(), "new-topic", (Map) Predef$.MODULE$.Map().apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(BoxesRunTime.boxToInteger(0)), Seq$.MODULE$.apply(Predef$.MODULE$.wrapIntArray(new int[]{0, 1})))})), servers()).apply(BoxesRunTime.boxToInteger(0)));
        int unboxToInt2 = BoxesRunTime.unboxToInt(zkClient().getEpochForPartition(new TopicPartition("new-topic", 0)).get());
        debug(() -> {
            return new StringBuilder(14).append("leader Epoch: ").append(unboxToInt2).toString();
        });
        debug(() -> {
            return new StringOps(Predef$.MODULE$.augmentString("Leader is elected to be: %s")).format(Predef$.MODULE$.genericWrapArray(new Object[]{BoxesRunTime.boxToInteger(unboxToInt)}));
        });
        Assert.assertTrue("Leader could be broker 0 or broker 1", unboxToInt == 0 || unboxToInt == 1);
        Assert.assertEquals("First epoch value should be 0", 0L, unboxToInt2);
        KafkaConfig fromProps = KafkaConfig$.MODULE$.fromProps(TestUtils$.MODULE$.createBrokerConfig(2, zkConnect(), TestUtils$.MODULE$.createBrokerConfig$default$3(), TestUtils$.MODULE$.createBrokerConfig$default$4(), TestUtils$.MODULE$.createBrokerConfig$default$5(), TestUtils$.MODULE$.createBrokerConfig$default$6(), TestUtils$.MODULE$.createBrokerConfig$default$7(), TestUtils$.MODULE$.createBrokerConfig$default$8(), TestUtils$.MODULE$.createBrokerConfig$default$9(), TestUtils$.MODULE$.createBrokerConfig$default$10(), TestUtils$.MODULE$.createBrokerConfig$default$11(), TestUtils$.MODULE$.createBrokerConfig$default$12(), TestUtils$.MODULE$.createBrokerConfig$default$13(), TestUtils$.MODULE$.createBrokerConfig$default$14(), TestUtils$.MODULE$.createBrokerConfig$default$15(), TestUtils$.MODULE$.createBrokerConfig$default$16(), TestUtils$.MODULE$.createBrokerConfig$default$17(), TestUtils$.MODULE$.createBrokerConfig$default$18()));
        SecurityProtocol securityProtocol = SecurityProtocol.PLAINTEXT;
        ListenerName forSecurityProtocol = ListenerName.forSecurityProtocol(securityProtocol);
        Seq seq = (Seq) servers().map(kafkaServer -> {
            return new Broker(kafkaServer.config().brokerId(), "localhost", TestUtils$.MODULE$.boundPort(kafkaServer, TestUtils$.MODULE$.boundPort$default$2()), forSecurityProtocol, securityProtocol);
        }, Seq$.MODULE$.canBuildFrom());
        Seq seq2 = (Seq) seq.map(broker -> {
            return broker.node(forSecurityProtocol);
        }, Seq$.MODULE$.canBuildFrom());
        ControllerContext controllerContext = new ControllerContext();
        controllerContext.liveBrokers_$eq(seq.toSet());
        Metrics metrics = new Metrics();
        ControllerChannelManager controllerChannelManager = new ControllerChannelManager(controllerContext, fromProps, Time.SYSTEM, metrics, new StateChangeLogger(2, true, None$.MODULE$), ControllerChannelManager$.MODULE$.$lessinit$greater$default$6());
        controllerChannelManager.startup();
        try {
            controllerChannelManager.sendRequest(brokerId2(), ApiKeys.LEADER_AND_ISR, new LeaderAndIsrRequest.Builder(ApiKeys.LEADER_AND_ISR.latestVersion(), 2, 0, (java.util.Map) JavaConverters$.MODULE$.mapAsJavaMapConverter(Predef$.MODULE$.Map().apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(new TopicPartition("new-topic", 0)), new LeaderAndIsrRequest.PartitionState(2, brokerId2(), LeaderAndIsr$.MODULE$.initialLeaderEpoch(), (List) JavaConverters$.MODULE$.seqAsJavaListConverter((Seq) Seq$.MODULE$.apply(Predef$.MODULE$.wrapIntArray(new int[]{brokerId1(), brokerId2()})).map(obj -> {
                return Integer.valueOf(BoxesRunTime.unboxToInt(obj));
            }, Seq$.MODULE$.canBuildFrom())).asJava(), LeaderAndIsr$.MODULE$.initialZKVersion(), (List) JavaConverters$.MODULE$.seqAsJavaListConverter((Seq) Seq$.MODULE$.apply(Predef$.MODULE$.wrapIntArray(new int[]{0, 1})).map(obj2 -> {
                return Integer.valueOf(BoxesRunTime.unboxToInt(obj2));
            }, Seq$.MODULE$.canBuildFrom())).asJava(), false))}))).asJava(), (Set) JavaConverters$.MODULE$.setAsJavaSetConverter(seq2.toSet()).asJava()), abstractResponse -> {
                this.staleControllerEpochCallback(abstractResponse);
                return BoxedUnit.UNIT;
            });
            TestUtils$.MODULE$.waitUntilTrue(() -> {
                return this.staleControllerEpochDetected();
            }, () -> {
                return "Controller epoch should be stale";
            }, TestUtils$.MODULE$.waitUntilTrue$default$3(), TestUtils$.MODULE$.waitUntilTrue$default$4(), TestUtils$.MODULE$.waitUntilTrue$default$5());
            Assert.assertTrue("Stale controller epoch not detected by the broker", staleControllerEpochDetected());
        } finally {
            controllerChannelManager.shutdown();
            metrics.close();
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void staleControllerEpochCallback(AbstractResponse abstractResponse) {
        staleControllerEpochDetected_$eq(Errors.STALE_CONTROLLER_EPOCH.equals(((LeaderAndIsrResponse) abstractResponse).error()));
    }
}
