package kafka.producer;

import java.io.IOException;
import java.net.SocketTimeoutException;
import java.util.Properties;
import kafka.api.ProducerRequest;
import kafka.api.ProducerResponse;
import kafka.api.ProducerResponseStatus;
import kafka.common.TopicAndPartition;
import kafka.integration.KafkaServerTestHarness;
import kafka.message.ByteBufferMessageSet;
import kafka.message.Message;
import kafka.message.NoCompressionCodec$;
import kafka.server.KafkaConfig;
import kafka.server.KafkaConfig$;
import kafka.server.KafkaServer;
import kafka.utils.TestUtils$;
import kafka.zk.AdminZkClient;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.utils.Time;
import org.junit.Assert;
import org.junit.Test;
import org.scalactic.source.Position;
import scala.Predef$;
import scala.collection.immutable.List;
import scala.collection.immutable.List$;
import scala.collection.immutable.Nil$;
import scala.collection.mutable.Map$;
import scala.reflect.ScalaSignature;

/* compiled from: SyncProducerTest.scala */
@ScalaSignature(bytes = "\u0006\u0001\u0005=b\u0001B\u0001\u0003\u0001\u001d\u0011\u0001cU=oGB\u0013x\u000eZ;dKJ$Vm\u001d;\u000b\u0005\r!\u0011\u0001\u00039s_\u0012,8-\u001a:\u000b\u0003\u0015\tQa[1gW\u0006\u001c\u0001a\u0005\u0002\u0001\u0011A\u0011\u0011\u0002D\u0007\u0002\u0015)\u00111\u0002B\u0001\fS:$Xm\u001a:bi&|g.\u0003\u0002\u000e\u0015\t12*\u00194lCN+'O^3s)\u0016\u001cH\u000fS1s]\u0016\u001c8\u000fC\u0003\u0010\u0001\u0011\u0005\u0001#\u0001\u0004=S:LGO\u0010\u000b\u0002#A\u0011!\u0003A\u0007\u0002\u0005!9A\u0003\u0001b\u0001\n\u0013)\u0012\u0001D7fgN\fw-\u001a\"zi\u0016\u001cX#\u0001\f\u0011\u0007]QB$D\u0001\u0019\u0015\u0005I\u0012!B:dC2\f\u0017BA\u000e\u0019\u0005\u0015\t%O]1z!\t9R$\u0003\u0002\u001f1\t!!)\u001f;f\u0011\u0019\u0001\u0003\u0001)A\u0005-\u0005iQ.Z:tC\u001e,')\u001f;fg\u0002BQA\t\u0001\u0005\u0002\r\nqbZ3oKJ\fG/Z\"p]\u001aLwm]\u000b\u0002IA\u0019QE\u000b\u0017\u000e\u0003\u0019R!a\n\u0015\u0002\u0013%lW.\u001e;bE2,'BA\u0015\u0019\u0003)\u0019w\u000e\u001c7fGRLwN\\\u0005\u0003W\u0019\u0012A\u0001T5tiB\u0011Q\u0006M\u0007\u0002])\u0011q\u0006B\u0001\u0007g\u0016\u0014h/\u001a:\n\u0005Er#aC&bM.\f7i\u001c8gS\u001eDQa\r\u0001\u0005\nQ\na\u0002\u001d:pIV\u001cWMU3rk\u0016\u001cH\u000f\u0006\u00056w\u0011K\u0005K\u0015+W!\t1\u0014(D\u00018\u0015\tAD!A\u0002ba&L!AO\u001c\u0003\u001fA\u0013x\u000eZ;dKJ\u0014V-];fgRDQ\u0001\u0010\u001aA\u0002u\nQ\u0001^8qS\u000e\u0004\"AP!\u000f\u0005]y\u0014B\u0001!\u0019\u0003\u0019\u0001&/\u001a3fM&\u0011!i\u0011\u0002\u0007'R\u0014\u0018N\\4\u000b\u0005\u0001C\u0002\"B#3\u0001\u00041\u0015!\u00039beRLG/[8o!\t9r)\u0003\u0002I1\t\u0019\u0011J\u001c;\t\u000b)\u0013\u0004\u0019A&\u0002\u000f5,7o]1hKB\u0011AJT\u0007\u0002\u001b*\u0011!\nB\u0005\u0003\u001f6\u0013ACQ=uK\n+hMZ3s\u001b\u0016\u001c8/Y4f'\u0016$\b\"B)3\u0001\u00041\u0015\u0001B1dWNDqa\u0015\u001a\u0011\u0002\u0003\u0007a)A\u0004uS6,w.\u001e;\t\u000fU\u0013\u0004\u0013!a\u0001\r\u0006i1m\u001c:sK2\fG/[8o\u0013\u0012Dqa\u0016\u001a\u0011\u0002\u0003\u0007Q(\u0001\u0005dY&,g\u000e^%e\u0011\u0015I\u0006\u0001\"\u0001[\u0003M!Xm\u001d;SK\u0006\u001c\u0007.\u00192mKN+'O^3s)\u0005Y\u0006CA\f]\u0013\ti\u0006D\u0001\u0003V]&$\bF\u0001-`!\t\u0001W-D\u0001b\u0015\t\u00117-A\u0003kk:LGOC\u0001e\u0003\ry'oZ\u0005\u0003M\u0006\u0014A\u0001V3ti\")\u0001\u000e\u0001C\u00015\u00069B/Z:u\u000b6\u0004H/\u001f)s_\u0012,8-\u001a*fcV,7\u000f\u001e\u0015\u0003O~CQa\u001b\u0001\u0005\u0002i\u000bq\u0003^3ti6+7o]1hKNK'0\u001a+p_2\u000b'oZ3)\u0005)|\u0006\"\u00028\u0001\t\u0003Q\u0016A\t;fgRlUm]:bO\u0016\u001c\u0016N_3U_>d\u0015M]4f/&$\b.Q2l5\u0016\u0014x\u000e\u000b\u0002n?\")\u0011\u000f\u0001C\u00015\u0006!C/Z:u!J|G-^2f\u0007>\u0014(/Z2uYf\u0014VmY3jm\u0016\u001c(+Z:q_:\u001cX\r\u000b\u0002q?\")A\u000f\u0001C\u00015\u00061B/Z:u!J|G-^2fe\u000e\u000bg\u000eV5nK>,H\u000f\u000b\u0002t?\")q\u000f\u0001C\u00015\u0006\u0001C/Z:u!J|G-^2f%\u0016\fX/Z:u/&$\bNT8SKN\u0004xN\\:fQ\t1x\fC\u0003{\u0001\u0011\u0005!,A\u000buKN$hj\u001c;F]>,x\r\u001b*fa2L7-Y:)\u0005e|\u0006bB?\u0001#\u0003%IA`\u0001\u0019aJ|G-^2f%\u0016\fX/Z:uI\u0011,g-Y;mi\u0012*T#A@+\u0007\u0019\u000b\ta\u000b\u0002\u0002\u0004A!\u0011QAA\b\u001b\t\t9A\u0003\u0003\u0002\n\u0005-\u0011!C;oG\",7m[3e\u0015\r\ti\u0001G\u0001\u000bC:tw\u000e^1uS>t\u0017\u0002BA\t\u0003\u000f\u0011\u0011#\u001e8dQ\u0016\u001c7.\u001a3WCJL\u0017M\\2f\u0011!\t)\u0002AI\u0001\n\u0013q\u0018\u0001\u00079s_\u0012,8-\u001a*fcV,7\u000f\u001e\u0013eK\u001a\fW\u000f\u001c;%m!I\u0011\u0011\u0004\u0001\u0012\u0002\u0013%\u00111D\u0001\u0019aJ|G-^2f%\u0016\fX/Z:uI\u0011,g-Y;mi\u0012:TCAA\u000fU\ri\u0014\u0011\u0001\u0015\b\u0001\u0005\u0005\u0012qEA\u0016!\r9\u00121E\u0005\u0004\u0003KA\"A\u00033faJ,7-\u0019;fI\u0006\u0012\u0011\u0011F\u0001I)\"L7\u000f\t;fgR\u0004\u0003.Y:!E\u0016,g\u000e\t3faJ,7-\u0019;fI\u0002\ng\u000e\u001a\u0011ji\u0002:\u0018\u000e\u001c7!E\u0016\u0004#/Z7pm\u0016$\u0007%\u001b8!C\u00022W\u000f^;sK\u0002\u0012X\r\\3bg\u0016\f#!!\f\u0002\u0011Ar\u0013\u0007\r\u00181]A\u0002")
/* loaded from: input_file:kafka/producer/SyncProducerTest.class */
public class SyncProducerTest extends KafkaServerTestHarness {
    private final byte[] messageBytes = new byte[2];

    private byte[] messageBytes() {
        return this.messageBytes;
    }

    @Override // kafka.integration.KafkaServerTestHarness
    /* renamed from: generateConfigs, reason: merged with bridge method [inline-methods] */
    public List<KafkaConfig> mo557generateConfigs() {
        return List$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new KafkaConfig[]{KafkaConfig$.MODULE$.fromProps((Properties) TestUtils$.MODULE$.createBrokerConfigs(1, zkConnect(), false, TestUtils$.MODULE$.createBrokerConfigs$default$4(), TestUtils$.MODULE$.createBrokerConfigs$default$5(), TestUtils$.MODULE$.createBrokerConfigs$default$6(), TestUtils$.MODULE$.createBrokerConfigs$default$7(), TestUtils$.MODULE$.createBrokerConfigs$default$8(), TestUtils$.MODULE$.createBrokerConfigs$default$9(), TestUtils$.MODULE$.createBrokerConfigs$default$10(), TestUtils$.MODULE$.createBrokerConfigs$default$11(), TestUtils$.MODULE$.createBrokerConfigs$default$12(), TestUtils$.MODULE$.createBrokerConfigs$default$13(), TestUtils$.MODULE$.createBrokerConfigs$default$14()).head())}));
    }

    private ProducerRequest produceRequest(String str, int i, ByteBufferMessageSet byteBufferMessageSet, int i2, int i3, int i4, String str2) {
        return TestUtils$.MODULE$.produceRequest(str, i, byteBufferMessageSet, i2, i3, i4, str2);
    }

    private int produceRequest$default$5() {
        return SyncProducerConfig$.MODULE$.DefaultAckTimeoutMs();
    }

    private int produceRequest$default$6() {
        return 0;
    }

    private String produceRequest$default$7() {
        return SyncProducerConfig$.MODULE$.DefaultClientId();
    }

    @Test
    public void testReachableServer() {
        SyncProducer syncProducer = new SyncProducer(new SyncProducerConfig(TestUtils$.MODULE$.getSyncProducerConfig(boundPort((KafkaServer) servers().head()))));
        long milliseconds = Time.SYSTEM.milliseconds();
        Assert.assertNotNull(syncProducer.send(produceRequest("test", 0, new ByteBufferMessageSet(NoCompressionCodec$.MODULE$, Predef$.MODULE$.wrapRefArray(new Message[]{new Message(messageBytes())})), 1, produceRequest$default$5(), produceRequest$default$6(), produceRequest$default$7())));
        Assert.assertTrue(Time.SYSTEM.milliseconds() - milliseconds < 12000);
        long milliseconds2 = Time.SYSTEM.milliseconds();
        Assert.assertNotNull(syncProducer.send(produceRequest("test", 0, new ByteBufferMessageSet(NoCompressionCodec$.MODULE$, Predef$.MODULE$.wrapRefArray(new Message[]{new Message(messageBytes())})), 1, produceRequest$default$5(), produceRequest$default$6(), produceRequest$default$7())));
        Assert.assertTrue(Time.SYSTEM.milliseconds() - milliseconds2 < 12000);
        Assert.assertNotNull(syncProducer.send(produceRequest("test", 0, new ByteBufferMessageSet(NoCompressionCodec$.MODULE$, Predef$.MODULE$.wrapRefArray(new Message[]{new Message(messageBytes())})), 1, produceRequest$default$5(), produceRequest$default$6(), produceRequest$default$7())));
    }

    @Test
    public void testEmptyProduceRequest() {
        ProducerResponse send = new SyncProducer(new SyncProducerConfig(TestUtils$.MODULE$.getSyncProducerConfig(boundPort((KafkaServer) servers().head())))).send(new ProducerRequest(0, SyncProducerConfig$.MODULE$.DefaultClientId(), (short) 1, SyncProducerConfig$.MODULE$.DefaultAckTimeoutMs(), Map$.MODULE$.apply(Nil$.MODULE$)));
        Assert.assertTrue(send != null);
        Assert.assertTrue(!send.hasError() && send.status().isEmpty());
    }

    @Test
    public void testMessageSizeTooLarge() {
        SyncProducer syncProducer = new SyncProducer(new SyncProducerConfig(TestUtils$.MODULE$.getSyncProducerConfig(boundPort((KafkaServer) servers().head()))));
        createTopic("test", 1, 1, createTopic$default$4());
        ProducerResponse send = syncProducer.send(produceRequest("test", 0, new ByteBufferMessageSet(NoCompressionCodec$.MODULE$, Predef$.MODULE$.wrapRefArray(new Message[]{new Message(new byte[Predef$.MODULE$.Integer2int(((KafkaConfig) configs().head()).messageMaxBytes()) + 1])})), 1, produceRequest$default$5(), produceRequest$default$6(), produceRequest$default$7()));
        Assert.assertEquals(1L, send.status().count(new SyncProducerTest$$anonfun$testMessageSizeTooLarge$1(this)));
        Assert.assertEquals(Errors.MESSAGE_TOO_LARGE, ((ProducerResponseStatus) send.status().apply(new TopicAndPartition("test", 0))).error());
        Assert.assertEquals(-1L, ((ProducerResponseStatus) send.status().apply(new TopicAndPartition("test", 0))).offset());
        ProducerResponse send2 = syncProducer.send(produceRequest("test", 0, new ByteBufferMessageSet(NoCompressionCodec$.MODULE$, Predef$.MODULE$.wrapRefArray(new Message[]{new Message(new byte[(Predef$.MODULE$.Integer2int(((KafkaConfig) configs().head()).messageMaxBytes()) - 61) - 21])})), 1, produceRequest$default$5(), produceRequest$default$6(), produceRequest$default$7()));
        Assert.assertEquals(1L, send.status().count(new SyncProducerTest$$anonfun$testMessageSizeTooLarge$2(this)));
        Assert.assertEquals(Errors.NONE, ((ProducerResponseStatus) send2.status().apply(new TopicAndPartition("test", 0))).error());
        Assert.assertEquals(0L, ((ProducerResponseStatus) send2.status().apply(new TopicAndPartition("test", 0))).offset());
    }

    @Test
    public void testMessageSizeTooLargeWithAckZero() {
        Properties syncProducerConfig = TestUtils$.MODULE$.getSyncProducerConfig(boundPort((KafkaServer) servers().head()));
        syncProducerConfig.put("request.required.acks", "0");
        SyncProducer syncProducer = new SyncProducer(new SyncProducerConfig(syncProducerConfig));
        AdminZkClient adminZkClient = adminZkClient();
        adminZkClient.createTopic("test", 1, 1, adminZkClient.createTopic$default$4(), adminZkClient.createTopic$default$5());
        TestUtils$.MODULE$.waitUntilLeaderIsElectedOrChanged(zkClient(), "test", 0, TestUtils$.MODULE$.waitUntilLeaderIsElectedOrChanged$default$4(), TestUtils$.MODULE$.waitUntilLeaderIsElectedOrChanged$default$5(), TestUtils$.MODULE$.waitUntilLeaderIsElectedOrChanged$default$6());
        syncProducer.send(produceRequest("test", 0, new ByteBufferMessageSet(NoCompressionCodec$.MODULE$, Predef$.MODULE$.wrapRefArray(new Message[]{new Message(new byte[Predef$.MODULE$.Integer2int(((KafkaConfig) configs().head()).messageMaxBytes()) + 1])})), 0, produceRequest$default$5(), produceRequest$default$6(), produceRequest$default$7()));
        try {
            syncProducer.send(produceRequest("test", 0, new ByteBufferMessageSet(NoCompressionCodec$.MODULE$, Predef$.MODULE$.wrapRefArray(new Message[]{new Message(new byte[Predef$.MODULE$.Integer2int(((KafkaConfig) configs().head()).messageMaxBytes()) + 1])})), 0, produceRequest$default$5(), produceRequest$default$6(), produceRequest$default$7()));
        } catch (IOException unused) {
        }
    }

    @Test
    public void testProduceCorrectlyReceivesResponse() {
        SyncProducer syncProducer = new SyncProducer(new SyncProducerConfig(TestUtils$.MODULE$.getSyncProducerConfig(boundPort((KafkaServer) servers().head()))));
        ProducerRequest produceRequestWithAcks = TestUtils$.MODULE$.produceRequestWithAcks(Predef$.MODULE$.wrapRefArray(new String[]{"topic1", "topic2", "topic3"}), Predef$.MODULE$.wrapIntArray(new int[]{0}), new ByteBufferMessageSet(NoCompressionCodec$.MODULE$, Predef$.MODULE$.wrapRefArray(new Message[]{new Message(messageBytes())})), 1, SyncProducerConfig$.MODULE$.DefaultAckTimeoutMs(), TestUtils$.MODULE$.produceRequestWithAcks$default$6(), SyncProducerConfig$.MODULE$.DefaultClientId());
        ProducerResponse send = syncProducer.send(produceRequestWithAcks);
        Assert.assertNotNull(send);
        Assert.assertEquals(produceRequestWithAcks.correlationId(), send.correlationId());
        Assert.assertEquals(3L, send.status().size());
        send.status().values().foreach(new SyncProducerTest$$anonfun$testProduceCorrectlyReceivesResponse$1(this));
        AdminZkClient adminZkClient = adminZkClient();
        adminZkClient.createTopic("topic1", 1, 1, adminZkClient.createTopic$default$4(), adminZkClient.createTopic$default$5());
        TestUtils$.MODULE$.waitUntilLeaderIsElectedOrChanged(zkClient(), "topic1", 0, TestUtils$.MODULE$.waitUntilLeaderIsElectedOrChanged$default$4(), TestUtils$.MODULE$.waitUntilLeaderIsElectedOrChanged$default$5(), TestUtils$.MODULE$.waitUntilLeaderIsElectedOrChanged$default$6());
        AdminZkClient adminZkClient2 = adminZkClient();
        adminZkClient2.createTopic("topic3", 1, 1, adminZkClient2.createTopic$default$4(), adminZkClient2.createTopic$default$5());
        TestUtils$.MODULE$.waitUntilLeaderIsElectedOrChanged(zkClient(), "topic3", 0, TestUtils$.MODULE$.waitUntilLeaderIsElectedOrChanged$default$4(), TestUtils$.MODULE$.waitUntilLeaderIsElectedOrChanged$default$5(), TestUtils$.MODULE$.waitUntilLeaderIsElectedOrChanged$default$6());
        ProducerResponse send2 = syncProducer.send(produceRequestWithAcks);
        Assert.assertNotNull(send2);
        Assert.assertEquals(produceRequestWithAcks.correlationId(), send2.correlationId());
        Assert.assertEquals(3L, send2.status().size());
        Assert.assertEquals(Errors.NONE, ((ProducerResponseStatus) send2.status().apply(new TopicAndPartition("topic1", 0))).error());
        Assert.assertEquals(Errors.NONE, ((ProducerResponseStatus) send2.status().apply(new TopicAndPartition("topic3", 0))).error());
        Assert.assertEquals(0L, ((ProducerResponseStatus) send2.status().apply(new TopicAndPartition("topic1", 0))).offset());
        Assert.assertEquals(0L, ((ProducerResponseStatus) send2.status().apply(new TopicAndPartition("topic3", 0))).offset());
        Assert.assertEquals(Errors.UNKNOWN_TOPIC_OR_PARTITION, ((ProducerResponseStatus) send2.status().apply(new TopicAndPartition("topic2", 0))).error());
        Assert.assertEquals(-1L, ((ProducerResponseStatus) send2.status().apply(new TopicAndPartition("topic2", 0))).offset());
    }

    @Test
    public void testProducerCanTimeout() {
        KafkaServer kafkaServer = (KafkaServer) servers().head();
        SyncProducer syncProducer = new SyncProducer(new SyncProducerConfig(TestUtils$.MODULE$.getSyncProducerConfig(boundPort(kafkaServer))));
        ProducerRequest produceRequest = produceRequest("topic1", 0, new ByteBufferMessageSet(NoCompressionCodec$.MODULE$, Predef$.MODULE$.wrapRefArray(new Message[]{new Message(messageBytes())})), 1, produceRequest$default$5(), produceRequest$default$6(), produceRequest$default$7());
        kafkaServer.requestHandlerPool().shutdown();
        long milliseconds = Time.SYSTEM.milliseconds();
        try {
            syncProducer.send(produceRequest);
            throw fail("Should have received timeout exception since request handling is stopped.", new Position("SyncProducerTest.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 209));
        } catch (SocketTimeoutException unused) {
            Assert.assertTrue(Time.SYSTEM.milliseconds() - milliseconds >= ((long) 500));
        }
    }

    @Test
    public void testProduceRequestWithNoResponse() {
        Assert.assertTrue(new SyncProducer(new SyncProducerConfig(TestUtils$.MODULE$.getSyncProducerConfig(TestUtils$.MODULE$.boundPort((KafkaServer) servers().head(), TestUtils$.MODULE$.boundPort$default$2())))).send(new ProducerRequest(0, SyncProducerConfig$.MODULE$.DefaultClientId(), (short) 0, SyncProducerConfig$.MODULE$.DefaultAckTimeoutMs(), Map$.MODULE$.apply(Nil$.MODULE$))) == null);
    }

    @Test
    public void testNotEnoughReplicas() {
        Properties syncProducerConfig = TestUtils$.MODULE$.getSyncProducerConfig(boundPort((KafkaServer) servers().head()));
        syncProducerConfig.put("request.required.acks", "-1");
        SyncProducer syncProducer = new SyncProducer(new SyncProducerConfig(syncProducerConfig));
        Properties properties = new Properties();
        properties.put("min.insync.replicas", "2");
        AdminZkClient adminZkClient = adminZkClient();
        adminZkClient.createTopic("minisrtest", 1, 1, properties, adminZkClient.createTopic$default$5());
        TestUtils$.MODULE$.waitUntilLeaderIsElectedOrChanged(zkClient(), "minisrtest", 0, TestUtils$.MODULE$.waitUntilLeaderIsElectedOrChanged$default$4(), TestUtils$.MODULE$.waitUntilLeaderIsElectedOrChanged$default$5(), TestUtils$.MODULE$.waitUntilLeaderIsElectedOrChanged$default$6());
        Assert.assertEquals(Errors.NOT_ENOUGH_REPLICAS, ((ProducerResponseStatus) syncProducer.send(produceRequest("minisrtest", 0, new ByteBufferMessageSet(NoCompressionCodec$.MODULE$, Predef$.MODULE$.wrapRefArray(new Message[]{new Message(messageBytes())})), -1, produceRequest$default$5(), produceRequest$default$6(), produceRequest$default$7())).status().apply(new TopicAndPartition("minisrtest", 0))).error());
    }
}
