/*
 * Decompiled with CFR 0.152.
 */
package kafka.api;

import java.io.Serializable;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import kafka.api.BaseProducerSendTest;
import kafka.log.LogConfig$;
import kafka.server.Defaults$;
import kafka.utils.TestUtils$;
import org.apache.kafka.clients.producer.BufferExhaustedException;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.errors.InvalidTimestampException;
import org.apache.kafka.common.errors.RecordTooLargeException;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.record.TimestampType;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.Serializer;
import org.junit.Assert;
import org.junit.Test;
import org.scalactic.source.Position;
import org.scalatest.Assertions$;
import scala.Function0;
import scala.None$;
import scala.Option;
import scala.reflect.ClassTag$;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxesRunTime;

@ScalaSignature(bytes="\u0006\u0005\u00113AAC\u0006\u0001!!)Q\u0003\u0001C\u0001-!)\u0001\u0004\u0001C\u00013!)a\u0007\u0001C\u00013!)\u0001\b\u0001C\u00013!)!\b\u0001C\u00013!)A\b\u0001C\u00013!)a\b\u0001C\u00013!)\u0001\t\u0001C\u00013!)!\t\u0001C\u00013\tI\u0002\u000b\\1j]R,\u0007\u0010\u001e)s_\u0012,8-\u001a:TK:$G+Z:u\u0015\taQ\"A\u0002ba&T\u0011AD\u0001\u0006W\u000647.Y\u0002\u0001'\t\u0001\u0011\u0003\u0005\u0002\u0013'5\t1\"\u0003\u0002\u0015\u0017\t!\")Y:f!J|G-^2feN+g\u000e\u001a+fgR\fa\u0001P5oSRtD#A\f\u0011\u0005I\u0001\u0011a\u0005;fgR<&o\u001c8h'\u0016\u0014\u0018.\u00197ju\u0016\u0014H#\u0001\u000e\u0011\u0005mqR\"\u0001\u000f\u000b\u0003u\tQa]2bY\u0006L!a\b\u000f\u0003\tUs\u0017\u000e\u001e\u0015\u0005\u0005\u0005J#\u0006\u0005\u0002#O5\t1E\u0003\u0002%K\u0005)!.\u001e8ji*\ta%A\u0002pe\u001eL!\u0001K\u0012\u0003\tQ+7\u000f^\u0001\tKb\u0004Xm\u0019;fI\u000e\n1\u0006\u0005\u0002-i5\tQF\u0003\u0002/_\u00051QM\u001d:peNT!\u0001M\u0019\u0002\r\r|W.\\8o\u0015\tq!G\u0003\u00024K\u00051\u0011\r]1dQ\u0016L!!N\u0017\u0003-M+'/[1mSj\fG/[8o\u000bb\u001cW\r\u001d;j_:\f\u0011\u0003^3ti\n\u000bGo\u00195TSj,',\u001a:pQ\t\u0019\u0011%\u0001\u0016uKN$8+\u001a8e\u0007>l\u0007O]3tg\u0016$W*Z:tC\u001e,w+\u001b;i\u0019><\u0017\t\u001d9f]\u0012$\u0016.\\3)\u0005\u0011\t\u0013!\f;fgR\u001cVM\u001c3O_:\u001cu.\u001c9sKN\u001cX\rZ'fgN\fw-Z,ji\"dunZ!qa\u0016tG\rV5nK\"\u0012Q!I\u0001\u0014i\u0016\u001cH/Q;u_\u000e\u0013X-\u0019;f)>\u0004\u0018n\u0019\u0015\u0003\r\u0005\nQ\u0004^3tiN+g\u000eZ,ji\"LeN^1mS\u0012\u001c%/Z1uKRKW.\u001a\u0015\u0003\u000f\u0005\nq\u0003^3ti:{gN\u00117pG.Lgn\u001a)s_\u0012,8-\u001a:)\u0005!\t\u0013A\f;fgR\u001cVM\u001c3SK\u000e|'\u000f\u001a\"bi\u000eDw+\u001b;i\u001b\u0006D(+Z9vKN$8+\u001b>f\u0003:$\u0007*[4iKJD#!C\u0011")
public class PlaintextProducerSendTest
extends BaseProducerSendTest {
    @Test(expected=SerializationException.class)
    public void testWrongSerializer() {
        Properties producerProps = new Properties();
        producerProps.put("bootstrap.servers", this.brokerList());
        producerProps.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        producerProps.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        KafkaProducer<byte[], byte[]> producer = this.registerProducer((KafkaProducer<byte[], byte[]>)new KafkaProducer(producerProps));
        ProducerRecord record = new ProducerRecord(this.topic(), Integer.valueOf(0), (Object)"key".getBytes(), (Object)"value".getBytes());
        producer.send(record);
    }

    @Test
    public void testBatchSizeZero() {
        KafkaProducer<byte[], byte[]> producer = this.createProducer(this.brokerList(), Integer.MAX_VALUE, Integer.MAX_VALUE, 0, this.createProducer$default$5(), this.createProducer$default$6(), this.createProducer$default$7());
        this.sendAndVerify(producer, this.sendAndVerify$default$2(), this.sendAndVerify$default$3());
    }

    @Test
    public void testSendCompressedMessageWithLogAppendTime() {
        String x$1 = this.brokerList();
        int x$5 = this.createProducer$default$4();
        long x$6 = this.createProducer$default$6();
        long x$7 = this.createProducer$default$7();
        KafkaProducer<byte[], byte[]> producer = this.createProducer(x$1, Integer.MAX_VALUE, Integer.MAX_VALUE, x$5, "gzip", x$6, x$7);
        this.sendAndVerifyTimestamp(producer, TimestampType.LOG_APPEND_TIME);
    }

    @Test
    public void testSendNonCompressedMessageWithLogAppendTime() {
        KafkaProducer<byte[], byte[]> producer = this.createProducer(this.brokerList(), Integer.MAX_VALUE, Integer.MAX_VALUE, this.createProducer$default$4(), this.createProducer$default$5(), this.createProducer$default$6(), this.createProducer$default$7());
        this.sendAndVerifyTimestamp(producer, TimestampType.LOG_APPEND_TIME);
    }

    @Test
    public void testAutoCreateTopic() {
        try (KafkaProducer<byte[], byte[]> producer = this.createProducer(this.brokerList(), this.createProducer$default$2(), this.createProducer$default$3(), this.createProducer$default$4(), this.createProducer$default$5(), this.createProducer$default$6(), this.createProducer$default$7());){
            ProducerRecord record = new ProducerRecord(this.topic(), null, (Object)"key".getBytes(), (Object)"value".getBytes());
            Assert.assertEquals((String)"Should have offset 0", (long)0L, (long)((RecordMetadata)producer.send(record).get()).offset());
            TestUtils$.MODULE$.waitUntilLeaderIsElectedOrChanged(this.zkClient(), this.topic(), 0, 30000L, (Option<Object>)None$.MODULE$, (Option<Object>)None$.MODULE$);
        }
    }

    @Test
    public void testSendWithInvalidCreateTime() {
        Properties topicProps = new Properties();
        topicProps.setProperty(LogConfig$.MODULE$.MessageTimestampDifferenceMaxMsProp(), "1000");
        this.createTopic(this.topic(), 1, 2, topicProps);
        try (KafkaProducer<byte[], byte[]> producer = this.createProducer(this.brokerList(), this.createProducer$default$2(), this.createProducer$default$3(), this.createProducer$default$4(), this.createProducer$default$5(), this.createProducer$default$6(), this.createProducer$default$7());){
            try {
                producer.send(new ProducerRecord(this.topic(), Integer.valueOf(0), Long.valueOf(System.currentTimeMillis() - 1001L), (Object)"key".getBytes(), (Object)"value".getBytes())).get();
                Assert.fail((String)"Should throw CorruptedRecordException");
            }
            catch (ExecutionException executionException) {
                Assert.assertTrue((boolean)(executionException.getCause() instanceof InvalidTimestampException));
            }
        }
        String x$1 = this.brokerList();
        int x$3 = this.createProducer$default$2();
        int x$4 = this.createProducer$default$3();
        int x$5 = this.createProducer$default$4();
        long x$6 = this.createProducer$default$6();
        long x$7 = this.createProducer$default$7();
        try (KafkaProducer<byte[], byte[]> compressedProducer = this.createProducer(x$1, x$3, x$4, x$5, "gzip", x$6, x$7);){
            try {
                compressedProducer.send(new ProducerRecord(this.topic(), Integer.valueOf(0), Long.valueOf(System.currentTimeMillis() - 1001L), (Object)"key".getBytes(), (Object)"value".getBytes())).get();
                Assert.fail((String)"Should throw CorruptedRecordException");
            }
            catch (ExecutionException executionException) {
                Assert.assertTrue((boolean)(executionException.getCause() instanceof InvalidTimestampException));
            }
        }
    }

    @Test
    public void testNonBlockingProducer() {
        String x$1 = this.brokerList();
        int x$3 = this.createProducer$default$2();
        int x$4 = this.createProducer$default$3();
        int x$5 = this.createProducer$default$4();
        String x$6 = this.createProducer$default$5();
        long x$7 = this.createProducer$default$7();
        KafkaProducer<byte[], byte[]> producer = this.createProducer(x$1, x$3, x$4, x$5, x$6, 0L, x$7);
        PlaintextProducerSendTest.verifyMetadataNotAvailable$1(this.send$1(producer));
        Future future = this.sendUntilQueued$1(producer);
        this.verifySendSuccess$1(future);
        String x$8 = this.brokerList();
        int x$13 = this.createProducer$default$3();
        String x$14 = this.createProducer$default$5();
        KafkaProducer<byte[], byte[]> producer2 = this.createProducer(x$8, 15000, x$13, 1100, x$14, 0L, 1500L);
        Future future2 = this.sendUntilQueued$1(producer2);
        PlaintextProducerSendTest.verifyBufferExhausted$1(this.send$1(producer2));
        this.verifySendSuccess$1(future2);
    }

    @Test
    public void testSendRecordBatchWithMaxRequestSizeAndHigher() {
        Properties producerProps = new Properties();
        producerProps.put("bootstrap.servers", this.brokerList());
        KafkaProducer<byte[], byte[]> producer = this.registerProducer((KafkaProducer<byte[], byte[]>)new KafkaProducer(producerProps, (Serializer)new ByteArraySerializer(), (Serializer)new ByteArraySerializer()));
        int keyLengthSize = 1;
        int headerLengthSize = 1;
        int valueLengthSize = 3;
        int overhead = 94 + keyLengthSize + headerLengthSize + valueLengthSize;
        int valueSize = Defaults$.MODULE$.MessageMaxBytes() - overhead;
        ProducerRecord record0 = new ProducerRecord(this.topic(), (Object)new byte[0], (Object)new byte[valueSize]);
        Assert.assertEquals((long)((byte[])record0.value()).length, (long)((RecordMetadata)producer.send(record0).get()).serializedValueSize());
        ProducerRecord record1 = new ProducerRecord(this.topic(), (Object)new byte[0], (Object)new byte[valueSize + 1]);
        Assert.assertEquals(RecordTooLargeException.class, ((Throwable)Assertions$.MODULE$.intercept((Function0 & Serializable)() -> (RecordMetadata)producer.send(record1).get(), ClassTag$.MODULE$.apply(ExecutionException.class), new Position("PlaintextProducerSendTest.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 196))).getCause().getClass());
    }

    private final Future send$1(KafkaProducer producer) {
        return producer.send(new ProducerRecord(this.topic(), Integer.valueOf(0), (Object)"key".getBytes(), (Object)new byte[1000]));
    }

    public static final /* synthetic */ boolean $anonfun$testNonBlockingProducer$2(Future future) {
        if (future.isDone()) {
            try {
                future.get();
                return true;
            }
            catch (ExecutionException executionException) {
                return false;
            }
        }
        return true;
    }

    /*
     * WARNING - void declaration
     */
    private final Future sendUntilQueued$1(KafkaProducer producer) {
        long l = 100L;
        long computeUntilTrue_waitTime = 15000L;
        long computeUntilTrue_startTime = System.currentTimeMillis();
        while (true) {
            void computeUntilTrue_pause;
            Future computeUntilTrue_result;
            if (PlaintextProducerSendTest.$anonfun$testNonBlockingProducer$2(computeUntilTrue_result = this.send$1(producer))) {
                return computeUntilTrue_result;
            }
            if (System.currentTimeMillis() > computeUntilTrue_startTime + computeUntilTrue_waitTime) {
                return computeUntilTrue_result;
            }
            Thread.sleep(Math.min(computeUntilTrue_waitTime, (long)computeUntilTrue_pause));
        }
    }

    private final void verifySendSuccess$1(Future future) {
        RecordMetadata recordMetadata = (RecordMetadata)future.get(30L, TimeUnit.SECONDS);
        Assert.assertEquals((Object)this.topic(), (Object)recordMetadata.topic());
        Assert.assertEquals((long)0L, (long)recordMetadata.partition());
        Assert.assertTrue((String)new StringBuilder(15).append("Invalid offset ").append(recordMetadata).toString(), (recordMetadata.offset() >= 0L ? 1 : 0) != 0);
    }

    private static final void verifyMetadataNotAvailable$1(Future future) {
        Assert.assertTrue((boolean)future.isDone());
        Assert.assertEquals(TimeoutException.class, ((Throwable)Assertions$.MODULE$.intercept((Function0 & Serializable)() -> (RecordMetadata)future.get(), ClassTag$.MODULE$.apply(ExecutionException.class), new Position("PlaintextProducerSendTest.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 155))).getCause().getClass());
    }

    private static final void verifyBufferExhausted$1(Future future) {
        Assert.assertTrue((boolean)future.isDone());
        Assert.assertEquals(BufferExhaustedException.class, ((Throwable)Assertions$.MODULE$.intercept((Function0 & Serializable)() -> (RecordMetadata)future.get(), ClassTag$.MODULE$.apply(ExecutionException.class), new Position("PlaintextProducerSendTest.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 160))).getCause().getClass());
    }

    public static final /* synthetic */ Object $anonfun$testNonBlockingProducer$2$adapted(Future future) {
        return BoxesRunTime.boxToBoolean((boolean)PlaintextProducerSendTest.$anonfun$testNonBlockingProducer$2(future));
    }
}

