package kafka.api;

import java.util.Properties;
import kafka.server.KafkaConfig;
import kafka.server.KafkaConfig$;
import kafka.server.KafkaServer;
import kafka.utils.ShutdownableThread;
import kafka.utils.TestUtils$;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.internals.ErrorLoggingCallback;
import org.apache.kafka.common.TopicPartition;
import org.junit.Assert;
import org.junit.Test;
import scala.Function3;
import scala.Predef$;
import scala.collection.Seq;
import scala.collection.Seq$;
import scala.collection.immutable.$colon;
import scala.collection.immutable.List;
import scala.collection.immutable.Map;
import scala.collection.immutable.Nil$;
import scala.collection.immutable.Range;
import scala.collection.immutable.Set;
import scala.collection.immutable.StringOps;
import scala.collection.mutable.BufferLike;
import scala.collection.mutable.HashMap;
import scala.collection.mutable.ListBuffer;
import scala.jdk.CollectionConverters$;
import scala.math.Ordering$Int$;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;
import scala.runtime.IntRef;
import scala.runtime.RichInt$;

/* compiled from: TransactionsBounceTest.scala */
@ScalaSignature(bytes = "\u0006\u0001\u0005ee\u0001B\u000e\u001d\u0001\u0005BQA\n\u0001\u0005\u0002\u001dBq!\u000b\u0001C\u0002\u0013%!\u0006\u0003\u00042\u0001\u0001\u0006Ia\u000b\u0005\be\u0001\u0011\r\u0011\"\u0003+\u0011\u0019\u0019\u0004\u0001)A\u0005W!9A\u0007\u0001b\u0001\n\u0013Q\u0003BB\u001b\u0001A\u0003%1\u0006C\u00047\u0001\t\u0007I\u0011B\u001c\t\r\u0001\u0003\u0001\u0015!\u00039\u0011\u001d\t\u0005A1A\u0005\n]BaA\u0011\u0001!\u0002\u0013A\u0004bB\"\u0001\u0005\u0004%\t\u0001\u0012\u0005\u0007\u0017\u0002\u0001\u000b\u0011B#\t\u000b1\u0003A\u0011I'\t\u000bi\u0003A\u0011\u000b\u0016\t\u000bm\u0003A\u0011\u0001/\t\u000b%\u0004A\u0011\u0001/\t\u000b-\u0004A\u0011\u00027\t\u000f\u0005\u001d\u0002\u0001\"\u0003\u0002*!9\u0011q\u0006\u0001\u0005\n\u0005E\u0002\"CA,\u0001E\u0005I\u0011BA-\u0011\u001d\ty\u0007\u0001C\u0005\u0003c2a!a \u0001\t\u0005\u0005\u0005B\u0002\u0014\u0018\t\u0003\ty\t\u0003\u0004\u0002\u0016^!\t\u0005\u0018\u0005\u0007\u0003/;B\u0011\t/\u0003-Q\u0013\u0018M\\:bGRLwN\\:C_Vt7-\u001a+fgRT!!\b\u0010\u0002\u0007\u0005\u0004\u0018NC\u0001 
\u0003\u0015Y\u0017MZ6b\u0007\u0001\u0019\"\u0001\u0001\u0012\u0011\u0005\r\"S\"\u0001\u000f\n\u0005\u0015b\"AF%oi\u0016<'/\u0019;j_:$Vm\u001d;ICJtWm]:\u0002\rqJg.\u001b;?)\u0005A\u0003CA\u0012\u0001\u0003I\u0001(o\u001c3vG\u0016\u0014()\u001e4gKJ\u001c\u0016N_3\u0016\u0003-\u0002\"\u0001L\u0018\u000e\u00035R\u0011AL\u0001\u0006g\u000e\fG.Y\u0005\u0003a5\u00121!\u00138u\u0003M\u0001(o\u001c3vG\u0016\u0014()\u001e4gKJ\u001c\u0016N_3!\u0003U\u0019XM\u001d<fe6+7o]1hK6\u000b\u0007PQ=uKN\fac]3sm\u0016\u0014X*Z:tC\u001e,W*\u0019=CsR,7\u000fI\u0001\u000e]Vl\u0007+\u0019:uSRLwN\\:\u0002\u001d9,X\u000eU1si&$\u0018n\u001c8tA\u0005Yq.\u001e;qkR$v\u000e]5d+\u0005A\u0004CA\u001d?\u001b\u0005Q$BA\u001e=\u0003\u0011a\u0017M\\4\u000b\u0003u\nAA[1wC&\u0011qH\u000f\u0002\u0007'R\u0014\u0018N\\4\u0002\u0019=,H\u000f];u)>\u0004\u0018n\u0019\u0011\u0002\u0015%t\u0007/\u001e;U_BL7-A\u0006j]B,H\u000fV8qS\u000e\u0004\u0013aD8wKJ\u0014\u0018\u000eZ5oOB\u0013x\u000e]:\u0016\u0003\u0015\u0003\"AR%\u000e\u0003\u001dS!\u0001\u0013\u001f\u0002\tU$\u0018\u000e\\\u0005\u0003\u0015\u001e\u0013!\u0002\u0015:pa\u0016\u0014H/[3t\u0003Ayg/\u001a:sS\u0012Lgn\u001a)s_B\u001c\b%A\bhK:,'/\u0019;f\u0007>tg-[4t+\u0005q\u0005cA(S)6\t\u0001K\u0003\u0002R[\u0005Q1m\u001c7mK\u000e$\u0018n\u001c8\n\u0005M\u0003&aA*fcB\u0011Q\u000bW\u0007\u0002-*\u0011qKH\u0001\u0007g\u0016\u0014h/\u001a:\n\u0005e3&aC&bM.\f7i\u001c8gS\u001e\f1B\u0019:pW\u0016\u00148i\\;oi\u0006yA/Z:u/&$\bn\u0012:pkBLE\rF\u0001^!\tac,\u0003\u0002`[\t!QK\\5uQ\t\u0001\u0012\r\u0005\u0002cO6\t1M\u0003\u0002eK\u0006)!.\u001e8ji*\ta-A\u0002pe\u001eL!\u0001[2\u0003\tQ+7\u000f^\u0001\u0016i\u0016\u001cHoV5uQ\u001e\u0013x.\u001e9NKR\fG-\u0019;bQ\t\t\u0012-A\tuKN$(I]8lKJ4\u0015-\u001b7ve\u0016$\"!X7\t\u000b9\u0014\u0002\u0019A8\u0002\r\r|W.\\5u!!a\u0003O]A\u0004\u00037i\u0016BA9.\u0005%1UO\\2uS>t7\u0007\u0005\u0003twvlX\"\u0001;\u000b\u0005U4\u0018\u0001\u00039s_\u0012,8-\u001a:\u000b\u0005]D\u0018aB2mS\u0016tGo\u001d\u0006\u0003?eT!A_3\u0002\r\u0005\u0004\
u0018m\u00195f\u0013\taHOA\u0007LC\u001a\\\u0017\r\u0015:pIV\u001cWM\u001d\t\u0005Yy\f\t!\u0003\u0002��[\t)\u0011I\u001d:bsB\u0019A&a\u0001\n\u0007\u0005\u0015QF\u0001\u0003CsR,\u0007\u0003BA\u0005\u0003/qA!a\u0003\u0002\u0014A\u0019\u0011QB\u0017\u000e\u0005\u0005=!bAA\tA\u00051AH]8pizJ1!!\u0006.\u0003\u0019\u0001&/\u001a3fM&\u0019q(!\u0007\u000b\u0007\u0005UQ\u0006\u0005\u0004\u0002\u001e\u0005\rR0`\u0007\u0003\u0003?Q1!!\tw\u0003!\u0019wN\\:v[\u0016\u0014\u0018\u0002BA\u0013\u0003?\u0011QbS1gW\u0006\u001cuN\\:v[\u0016\u0014\u0018aG2sK\u0006$X\r\u0016:b]N\f7\r^5p]\u0006d\u0007K]8ek\u000e,'\u000fF\u0002s\u0003WAq!!\f\u0014\u0001\u0004\t9!A\bue\u0006t7/Y2uS>t\u0017\r\\%e\u0003i\u0019'/Z1uK\u000e{gn];nKJ\fe\u000eZ*vEN\u001c'/\u001b2f)!\tY\"a\r\u00028\u00055\u0003bBA\u001b)\u0001\u0007\u0011qA\u0001\bOJ|W\u000f]%e\u0011\u001d\tI\u0004\u0006a\u0001\u0003w\ta\u0001^8qS\u000e\u001c\bCBA\u001f\u0003\u000f\n9A\u0004\u0003\u0002@\u0005\rc\u0002BA\u0007\u0003\u0003J\u0011AL\u0005\u0004\u0003\u000bj\u0013a\u00029bG.\fw-Z\u0005\u0005\u0003\u0013\nYE\u0001\u0003MSN$(bAA#[!I\u0011q\n\u000b\u0011\u0002\u0003\u0007\u0011\u0011K\u0001\u000ee\u0016\fGmQ8n[&$H/\u001a3\u0011\u00071\n\u0019&C\u0002\u0002V5\u0012qAQ8pY\u0016\fg.\u0001\u0013de\u0016\fG/Z\"p]N,X.\u001a:B]\u0012\u001cVOY:de&\u0014W\r\n3fM\u0006,H\u000e\u001e\u00134+\t\tYF\u000b\u0003\u0002R\u0005u3FAA0!\u0011\t\t'a\u001b\u000e\u0005\u0005\r$\u0002BA3\u0003O\n\u0011\"\u001e8dQ\u0016\u001c7.\u001a3\u000b\u0007\u0005%T&\u0001\u0006b]:|G/\u0019;j_:LA!!\u001c\u0002d\t\tRO\\2iK\u000e\\W\r\u001a,be&\fgnY3\u0002\u0019\r\u0014X-\u0019;f)>\u0004\u0018nY:\u0015\u0005\u0005M\u0004CBA;\u0003wZ3&\u0004\u0002\u0002x)\u0019\u0011\u0011\u0010)\u0002\u0013%lW.\u001e;bE2,\u0017\u0002BA?\u0003o\u00121!T1q\u0005=\u0011u.\u001e8dKN\u001b\u0007.\u001a3vY\u0016\u00148cA\f\u0002\u0004B!\u0011QQAF\u001b\t\t9IC\u0002\u0002\nz\tQ!\u001e;jYNLA!!$\u0002\b\n\u00112\u000b[;uI><h.\u00192mKRC'/Z1e)\t\t\t\nE\u0002\u0002\u0014^i\u0011\u0001A\u0001\u0007I><vN
]6\u0002\u0011MDW\u000f\u001e3po:\u0004")
/* loaded from: input_file:kafka/api/TransactionsBounceTest.class */
public class TransactionsBounceTest extends IntegrationTestHarness {
    // Base size value (65536); in this class it is only used to derive serverMessageMaxBytes.
    private final int producerBufferSize = 65536;
    // Half of producerBufferSize; the constructor stores it under KafkaConfig's MessageMaxBytesProp.
    private final int serverMessageMaxBytes = producerBufferSize() / 2;
    // Partition count passed to createTopic for both topics; also the number of partitions
    // BounceScheduler waits on after bouncing the brokers. (Name is Scala-mangled because the
    // inner class accesses it.)
    private final int kafka$api$TransactionsBounceTest$$numPartitions = 3;
    // Topic the transactional producer writes to and the verifying consumer reads from.
    private final String kafka$api$TransactionsBounceTest$$outputTopic = "output-topic";
    // Topic that is seeded with numbered records and consumed by the test loop.
    private final String inputTopic = "input-topic";
    // Broker config overrides; populated in the constructor and passed to KafkaConfig.fromProps
    // in mo77generateConfigs().
    private final Properties overridingProps = new Properties();

    /* compiled from: TransactionsBounceTest.scala */
    /* loaded from: input_file:kafka/api/TransactionsBounceTest$BounceScheduler.class */
    /**
     * Thread that bounces the test's brokers: each call to {@link #doWork()} shuts down and
     * restarts every server of the enclosing test, then waits for leadership of every
     * output-topic partition.
     *
     * NOTE(review): how often doWork() is invoked is decided by ShutdownableThread, which is
     * not visible here — presumably in a loop until shutdown() is called; confirm.
     */
    private class BounceScheduler extends ShutdownableThread {
        /** Enclosing test instance (compiler-generated outer reference). */
        public final /* synthetic */ TransactionsBounceTest $outer;

        /**
         * Creates the thread under the name "daemon-broker-bouncer".
         *
         * @param transactionsBounceTest the enclosing test; must not be null
         */
        /* JADX WARN: 'super' call moved to the top of the method (can break code semantics) */
        public BounceScheduler(TransactionsBounceTest transactionsBounceTest) {
            // NOTE(review): the meaning of the second super argument (false) is defined by
            // ShutdownableThread — presumably "not interruptible"; confirm.
            super("daemon-broker-bouncer", false);
            if (transactionsBounceTest == null) {
                throw null;
            }
            this.$outer = transactionsBounceTest;
        }

        /** Returns the enclosing test instance. */
        public /* synthetic */ TransactionsBounceTest kafka$api$TransactionsBounceTest$BounceScheduler$$$outer() {
            return this.$outer;
        }

        /**
         * Bounces every server in turn, then waits for a leader on each partition
         * {@code 0 .. numPartitions-1} of the output topic.
         */
        public void doWork() {
            TransactionsBounceTest test = kafka$api$TransactionsBounceTest$BounceScheduler$$$outer();
            test.servers().foreach(broker -> {
                $anonfun$doWork$1(this, broker);
                return BoxedUnit.UNIT;
            });
            // Same iteration as the Scala range `0 until numPartitions`: nothing when the bound is <= 0.
            int partitionCount = kafka$api$TransactionsBounceTest$BounceScheduler$$$outer().kafka$api$TransactionsBounceTest$$numPartitions();
            for (int partition = 0; partition < partitionCount; partition++) {
                $anonfun$doWork$5(this, partition);
            }
        }

        /** Delegates to the superclass shutdown. */
        public void shutdown() {
            super.shutdown();
        }

        /** Formats {@code template} with the broker id of {@code server}. */
        private static String describeBroker(String template, KafkaServer server) {
            Object[] args = {BoxesRunTime.boxToInteger(server.config().brokerId())};
            return new StringOps(template).format(Predef$.MODULE$.genericWrapArray(args));
        }

        /**
         * Bounces a single broker: shutdown, wait for shutdown, pause 500 ms, startup,
         * pause 500 ms. Progress is reported through lazily-built trace messages.
         */
        public static final /* synthetic */ void $anonfun$doWork$1(BounceScheduler bounceScheduler, KafkaServer kafkaServer) {
            bounceScheduler.trace(() -> describeBroker("Shutting down server : %s", kafkaServer));
            kafkaServer.shutdown();
            kafkaServer.awaitShutdown();
            Thread.sleep(500L);
            bounceScheduler.trace(() -> describeBroker("Server %s shut down. Starting it up again.", kafkaServer));
            kafkaServer.startup();
            bounceScheduler.trace(() -> describeBroker("Restarted server: %s", kafkaServer));
            Thread.sleep(500L);
        }

        /**
         * Waits, via TestUtils.waitUntilLeaderIsElectedOrChanged with its default optional
         * arguments, for partition {@code i} of the output topic and returns that call's result.
         */
        public static final /* synthetic */ int $anonfun$doWork$5(BounceScheduler bounceScheduler, int i) {
            TestUtils$ utils = TestUtils$.MODULE$;
            TransactionsBounceTest test = bounceScheduler.kafka$api$TransactionsBounceTest$BounceScheduler$$$outer();
            return utils.waitUntilLeaderIsElectedOrChanged(
                    test.zkClient(),
                    bounceScheduler.kafka$api$TransactionsBounceTest$BounceScheduler$$$outer().kafka$api$TransactionsBounceTest$$outputTopic(),
                    i,
                    utils.waitUntilLeaderIsElectedOrChanged$default$4(),
                    utils.waitUntilLeaderIsElectedOrChanged$default$5(),
                    utils.waitUntilLeaderIsElectedOrChanged$default$6());
        }
    }

    /** Returns the value of the {@code producerBufferSize} field. */
    private int producerBufferSize() {
        return producerBufferSize;
    }

    /** Returns the value of the {@code serverMessageMaxBytes} field. */
    private int serverMessageMaxBytes() {
        return serverMessageMaxBytes;
    }

    /** Returns the number of partitions used for the test topics (accessor used by BounceScheduler). */
    public int kafka$api$TransactionsBounceTest$$numPartitions() {
        return kafka$api$TransactionsBounceTest$$numPartitions;
    }

    /** Returns the name of the output topic (accessor used by BounceScheduler). */
    public String kafka$api$TransactionsBounceTest$$outputTopic() {
        return kafka$api$TransactionsBounceTest$$outputTopic;
    }

    /** Returns the name of the input topic. */
    private String inputTopic() {
        return inputTopic;
    }

    /** Returns the mutable broker-override properties held by this test instance. */
    public Properties overridingProps() {
        return overridingProps;
    }

    /**
     * Builds the broker configurations for this test.
     *
     * Asks FixedPortTestUtils for {@link #brokerCount()} sets of broker properties and converts
     * each one with {@code KafkaConfig.fromProps(props, overridingProps())}.
     *
     * @return one KafkaConfig per generated properties object
     */
    @Override // kafka.api.IntegrationTestHarness, kafka.integration.KafkaServerTestHarness
    /* renamed from: generateConfigs */
    public Seq<KafkaConfig> mo77generateConfigs() {
        FixedPortTestUtils$ portUtils = FixedPortTestUtils$.MODULE$;
        return (Seq) portUtils
                .createBrokerConfigs(brokerCount(), zkConnect(), true, portUtils.createBrokerConfigs$default$4())
                .map(brokerProps -> KafkaConfig$.MODULE$.fromProps(brokerProps, overridingProps()), Seq$.MODULE$.canBuildFrom());
    }

    /**
     * Number of broker configurations requested in mo77generateConfigs(); fixed at 4 for this test.
     */
    @Override // kafka.api.IntegrationTestHarness
    public int brokerCount() {
        return 4;
    }

    /**
     * Runs the broker-bounce scenario of {@link #testBrokerFailure}, committing consumed
     * offsets through {@code sendOffsetsToTransaction(offsets, groupId)}.
     *
     * The decompiled form carried a full inlined copy of testBrokerFailure; delegating to the
     * shared helper keeps the scenario defined in one place.
     */
    @Test
    public void testWithGroupId() {
        testBrokerFailure((producer, groupId, consumer) -> {
            $anonfun$testWithGroupId$1(producer, groupId, consumer);
            return BoxedUnit.UNIT;
        });
    }

    /**
     * Runs the broker-bounce scenario of {@link #testBrokerFailure}, committing consumed
     * offsets through {@code sendOffsetsToTransaction(offsets, consumer.groupMetadata())}.
     *
     * The decompiled form carried a full inlined copy of testBrokerFailure; delegating to the
     * shared helper keeps the scenario defined in one place.
     */
    @Test
    public void testWithGroupMetadata() {
        testBrokerFailure((producer, groupId, consumer) -> {
            $anonfun$testWithGroupMetadata$1(producer, groupId, consumer);
            return BoxedUnit.UNIT;
        });
    }

    /**
     * Copies 10000 numbered records from the input topic to the output topic in transactions
     * while a {@link BounceScheduler} restarts the brokers, then verifies the output.
     *
     * Each round reads at least min(200, remaining) records, sends them inside a transaction,
     * invokes {@code function3} to commit the consumed offsets, and then either aborts (every
     * third round, starting with the first) and rewinds the consumer to its committed
     * positions, or commits and counts the records as processed. Afterwards a read_committed
     * consumer checks that the output topic holds exactly the values 0..9999, in order within
     * each partition.
     *
     * @param function3 callback invoked once per transaction with (producer, "myGroup", consumer)
     *                  after the records have been sent; it is expected to add the consumer's
     *                  offsets to the open transaction
     */
    private void testBrokerFailure(Function3<KafkaProducer<byte[], byte[]>, String, KafkaConsumer<byte[], byte[]>, BoxedUnit> function3) {
        final int numInputRecords = 10000;
        final int maxRecordsPerTransaction = 200;
        final String consumerGroup = "myGroup";
        final String outputTopic = kafka$api$TransactionsBounceTest$$outputTopic();

        createTopics();
        TestUtils$.MODULE$.seedTopicWithNumberedRecords(inputTopic(), numInputRecords, servers());
        KafkaConsumer<byte[], byte[]> consumer = createConsumerAndSubscribe(consumerGroup, new $colon.colon(inputTopic(), Nil$.MODULE$), createConsumerAndSubscribe$default$3());
        KafkaProducer<byte[], byte[]> producer = createTransactionalProducer("test-txn");
        producer.initTransactions();

        BounceScheduler scheduler = new BounceScheduler(this);
        scheduler.start();
        try {
            int numMessagesProcessed = 0;
            int iteration = 0;
            while (numMessagesProcessed < numInputRecords) {
                // Snapshots for the lazily-evaluated trace lambdas (lambdas need effectively-final locals).
                final int processedSoFar = numMessagesProcessed;
                final int round = iteration;
                final int toRead = Math.min(maxRecordsPerTransaction, numInputRecords - processedSoFar);
                trace(() -> round + ": About to read " + toRead + " messages, processed " + processedSoFar + " so far..");

                final Seq<ConsumerRecord<byte[], byte[]>> records = TestUtils$.MODULE$.pollUntilAtLeastNumRecords(consumer, toRead, TestUtils$.MODULE$.pollUntilAtLeastNumRecords$default$3());
                trace(() -> "Received " + records.size() + " messages, sending them transactionally to " + outputTopic);

                producer.beginTransaction();
                // Every third transaction (rounds 0, 3, 6, ...) is aborted on purpose.
                final boolean shouldAbort = round % 3 == 0;
                records.foreach(record ->
                        producer.send(
                                TestUtils$.MODULE$.producerRecordWithExpectedTransactionStatus(outputTopic, (Integer) null, record.key(), record.value(), !shouldAbort),
                                new ErrorLoggingCallback(outputTopic, record.key(), record.value(), true)));
                trace(() -> "Sent " + records.size() + " messages. Committing offsets.");

                function3.apply(producer, consumerGroup, consumer);
                if (shouldAbort) {
                    trace(() -> "Committed offsets. Aborting transaction of " + records.size() + " messages.");
                    producer.abortTransaction();
                    // Rewind so the aborted records are consumed again in a later round.
                    TestUtils$.MODULE$.resetToCommittedPositions(consumer);
                } else {
                    trace(() -> "Committed offsets. committing transaction of " + records.size() + " messages.");
                    producer.commitTransaction();
                    numMessagesProcessed += records.size();
                }
                iteration++;
            }
        } finally {
            // Stop bouncing exactly once, whether the loop finished or threw. Verification runs
            // outside this block so an assertion failure does not shut the scheduler down again.
            scheduler.shutdown();
        }

        KafkaConsumer<byte[], byte[]> verifyingConsumer = createConsumerAndSubscribe("randomGroup", new $colon.colon(outputTopic, Nil$.MODULE$), true);
        HashMap<TopicPartition, ListBuffer<Object>> valuesByPartition = new HashMap<>();
        Seq<ConsumerRecord<byte[], byte[]>> outputRecords = TestUtils$.MODULE$.pollUntilAtLeastNumRecords(verifyingConsumer, numInputRecords, TestUtils$.MODULE$.pollUntilAtLeastNumRecords$default$3());
        outputRecords.foreach(record -> {
            $anonfun$testBrokerFailure$7(valuesByPartition, record);
            return BoxedUnit.UNIT;
        });

        ListBuffer<Object> allValues = new ListBuffer<>();
        valuesByPartition.values().foreach(partitionValues -> {
            // Asserts per-partition ordering, then accumulates the values.
            $anonfun$testBrokerFailure$9(allValues, partitionValues);
            return BoxedUnit.UNIT;
        });

        Set<Object> actualValues = allValues.toSet();
        Assert.assertEquals(numInputRecords, actualValues.size());
        Set<Object> expectedValues = RichInt$.MODULE$.until$extension0(0, numInputRecords).toSet();
        Assert.assertEquals("Missing messages: " + expectedValues.$minus$minus(actualValues), expectedValues, actualValues);
    }

    /**
     * Creates a producer through the harness's createProducer with acks=all, batch.size=512,
     * idempotence enabled and the given transactional id.
     *
     * @param str the value for the "transactional.id" setting
     * @return the producer returned by createProducer
     */
    private KafkaProducer<byte[], byte[]> createTransactionalProducer(String str) {
        Properties producerProps = new Properties();
        producerProps.setProperty("acks", "all");
        producerProps.setProperty("batch.size", "512");
        producerProps.setProperty("transactional.id", str);
        producerProps.setProperty("enable.idempotence", "true");
        return createProducer(createProducer$default$1(), createProducer$default$2(), producerProps);
    }

    /**
     * Creates a consumer with auto-commit disabled and subscribes it to the given topics.
     *
     * @param str  the consumer "group.id"
     * @param list topics to subscribe to
     * @param z    true for isolation.level=read_committed, false for read_uncommitted
     * @return the subscribed consumer
     */
    private KafkaConsumer<byte[], byte[]> createConsumerAndSubscribe(String str, List<String> list, boolean z) {
        String isolationLevel;
        if (z) {
            isolationLevel = "read_committed";
        } else {
            isolationLevel = "read_uncommitted";
        }

        Properties consumerProps = new Properties();
        consumerProps.setProperty("group.id", str);
        consumerProps.setProperty("enable.auto.commit", "false");
        consumerProps.setProperty("isolation.level", isolationLevel);

        KafkaConsumer<byte[], byte[]> consumer = createConsumer(createConsumer$default$1(), createConsumer$default$2(), consumerProps, createConsumer$default$4());
        java.util.List topicNames = (java.util.List) CollectionConverters$.MODULE$.seqAsJavaListConverter(list).asJava();
        consumer.subscribe(topicNames);
        return consumer;
    }

    /**
     * Compiler-generated default for the third parameter of createConsumerAndSubscribe:
     * false, which that method maps to isolation.level=read_uncommitted.
     */
    private boolean createConsumerAndSubscribe$default$3() {
        return false;
    }

    /**
     * Creates the input and output topics, each with numPartitions partitions and with the
     * MinInSyncReplicasProp topic setting at 2.
     *
     * NOTE(review): the literal 3 passed to createTopic is presumably the replication
     * factor — confirm against the harness's createTopic signature.
     *
     * @return the result of the createTopic call for the output topic
     */
    private Map<Object, Object> createTopics() {
        Properties topicConfig = new Properties();
        topicConfig.setProperty(KafkaConfig$.MODULE$.MinInSyncReplicasProp(), "2");
        int partitions = kafka$api$TransactionsBounceTest$$numPartitions();
        createTopic(inputTopic(), partitions, 3, topicConfig);
        return createTopic(kafka$api$TransactionsBounceTest$$outputTopic(), partitions, 3, topicConfig);
    }

    /**
     * Offset-commit callback for testWithGroupId: sends the consumer's current positions to
     * the producer's transaction, identified by the group id string.
     */
    public static final /* synthetic */ void $anonfun$testWithGroupId$1(KafkaProducer kafkaProducer, String str, KafkaConsumer kafkaConsumer) {
        java.util.Map offsets = (java.util.Map) CollectionConverters$.MODULE$
                .mapAsJavaMapConverter(TestUtils$.MODULE$.consumerPositions(kafkaConsumer))
                .asJava();
        kafkaProducer.sendOffsetsToTransaction(offsets, str);
    }

    /**
     * Offset-commit callback for testWithGroupMetadata: sends the consumer's current positions
     * to the producer's transaction, identified by the consumer's group metadata. The group id
     * string argument is not used.
     */
    public static final /* synthetic */ void $anonfun$testWithGroupMetadata$1(KafkaProducer kafkaProducer, String str, KafkaConsumer kafkaConsumer) {
        java.util.Map offsets = (java.util.Map) CollectionConverters$.MODULE$
                .mapAsJavaMapConverter(TestUtils$.MODULE$.consumerPositions(kafkaConsumer))
                .asJava();
        kafkaProducer.sendOffsetsToTransaction(offsets, kafkaConsumer.groupMetadata());
    }

    /**
     * Records one verified output record: obtains its value via
     * TestUtils.assertCommittedAndGetValue, parses it as an int and appends it to the buffer
     * kept for the record's topic-partition (creating the buffer on first use).
     */
    public static final /* synthetic */ void $anonfun$testBrokerFailure$7(HashMap hashMap, ConsumerRecord consumerRecord) {
        // Checked first so a failing record leaves the map untouched.
        String committedValue = TestUtils$.MODULE$.assertCommittedAndGetValue(consumerRecord);
        TopicPartition partitionKey = new TopicPartition(consumerRecord.topic(), consumerRecord.partition());
        BufferLike partitionValues = (BufferLike) hashMap.getOrElseUpdate(partitionKey, () -> new ListBuffer());
        int recordNumber = new StringOps(committedValue).toInt();
        partitionValues.append(Predef$.MODULE$.wrapIntArray(new int[]{recordNumber}));
    }

    /**
     * Asserts that one partition's values ({@code listBuffer2}) already equal their int-sorted
     * form, then appends them all to the accumulator {@code listBuffer}.
     */
    public static final /* synthetic */ void $anonfun$testBrokerFailure$9(ListBuffer listBuffer, ListBuffer listBuffer2) {
        Object sortedValues = listBuffer2.sorted(Ordering$Int$.MODULE$);
        Assert.assertEquals("Out of order messages detected", listBuffer2, sortedValues);
        listBuffer.appendAll(listBuffer2);
    }

    public TransactionsBounceTest() {
        overridingProps().put(KafkaConfig$.MODULE$.AutoCreateTopicsEnableProp(), Boolean.toString(false));
        overridingProps().put(KafkaConfig$.MODULE$.MessageMaxBytesProp(), Integer.toString(serverMessageMaxBytes()));
        overridingProps().put(KafkaConfig$.MODULE$.ControlledShutdownEnableProp(), Boolean.toString(true));
        overridingProps().put(KafkaConfig$.MODULE$.UncleanLeaderElectionEnableProp(), Boolean.toString(false));
        overridingProps().put(KafkaConfig$.MODULE$.AutoLeaderRebalanceEnableProp(), Boolean.toString(false));
        overridingProps().put(KafkaConfig$.MODULE$.OffsetsTopicPartitionsProp(), Integer.toString(1));
        overridingProps().put(KafkaConfig$.MODULE$.OffsetsTopicReplicationFactorProp(), Integer.toString(3));
        overridingProps().put(KafkaConfig$.MODULE$.MinInSyncReplicasProp(), Integer.toString(2));
        overridingProps().put(KafkaConfig$.MODULE$.TransactionsTopicPartitionsProp(), Integer.toString(1));
        overridingProps().put(KafkaConfig$.MODULE$.TransactionsTopicReplicationFactorProp(), Integer.toString(3));
        overridingProps().put(KafkaConfig$.MODULE$.GroupMinSessionTimeoutMsProp(), "10");
        overridingProps().put(KafkaConfig$.MODULE$.GroupInitialRebalanceDelayMsProp(), "0");
    }

    /**
     * Compiler-generated bridge: runs $anonfun$testWithGroupId$1 and returns BoxedUnit.UNIT so
     * the void callback can be used where an Object-returning function is required.
     */
    public static final /* synthetic */ Object $anonfun$testWithGroupId$1$adapted(KafkaProducer kafkaProducer, String str, KafkaConsumer kafkaConsumer) {
        $anonfun$testWithGroupId$1(kafkaProducer, str, kafkaConsumer);
        return BoxedUnit.UNIT;
    }

    /**
     * Compiler-generated bridge: runs $anonfun$testWithGroupMetadata$1 and returns
     * BoxedUnit.UNIT so the void callback can be used where an Object-returning function is
     * required.
     */
    public static final /* synthetic */ Object $anonfun$testWithGroupMetadata$1$adapted(KafkaProducer kafkaProducer, String str, KafkaConsumer kafkaConsumer) {
        $anonfun$testWithGroupMetadata$1(kafkaProducer, str, kafkaConsumer);
        return BoxedUnit.UNIT;
    }
}
