package kafka.producer;

import java.util.List;
import java.util.Properties;
import java.util.concurrent.LinkedBlockingQueue;
import kafka.api.PartitionMetadata;
import kafka.api.PartitionMetadata$;
import kafka.api.ProducerRequest;
import kafka.api.ProducerResponse;
import kafka.api.ProducerResponse$;
import kafka.api.ProducerResponseStatus;
import kafka.api.TopicMetadata;
import kafka.api.TopicMetadata$;
import kafka.cluster.BrokerEndPoint;
import kafka.common.ErrorMapping$;
import kafka.common.FailedToSendMessageException;
import kafka.common.QueueFullException;
import kafka.common.TopicAndPartition;
import kafka.javaapi.producer.Producer;
import kafka.message.ByteBufferMessageSet;
import kafka.message.Message;
import kafka.message.NoCompressionCodec$;
import kafka.producer.async.DefaultEventHandler;
import kafka.producer.async.EventHandler;
import kafka.producer.async.ProducerSendThread;
import kafka.serializer.DefaultEncoder;
import kafka.serializer.Encoder;
import kafka.serializer.NullEncoder;
import kafka.serializer.NullEncoder$;
import kafka.serializer.StringEncoder;
import kafka.serializer.StringEncoder$;
import kafka.server.KafkaConfig;
import kafka.utils.FixedValuePartitioner;
import kafka.utils.FixedValuePartitioner$;
import kafka.utils.IntEncoder;
import kafka.utils.IntEncoder$;
import kafka.utils.TestUtils$;
import org.easymock.EasyMock;
import org.junit.Assert;
import org.junit.Test;
import scala.MatchError;
import scala.None$;
import scala.Predef$;
import scala.Predef$ArrowAssoc$;
import scala.Some;
import scala.Tuple2;
import scala.collection.GenTraversableOnce;
import scala.collection.JavaConversions$;
import scala.collection.Map;
import scala.collection.Map$;
import scala.collection.Seq;
import scala.collection.Seq$;
import scala.collection.TraversableLike;
import scala.collection.TraversableOnce;
import scala.collection.immutable.List$;
import scala.collection.mutable.ArrayBuffer;
import scala.collection.mutable.ArrayBuffer$;
import scala.collection.mutable.HashMap;
import scala.package$;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;
import scala.runtime.RichInt$;

/* compiled from: AsyncProducerTest.scala */
@ScalaSignature(bytes = "\u0006\u0001\u0005Ud\u0001B\u0001\u0003\u0001\u001d\u0011\u0011#Q:z]\u000e\u0004&o\u001c3vG\u0016\u0014H+Z:u\u0015\t\u0019A!\u0001\u0005qe>$WoY3s\u0015\u0005)\u0011!B6bM.\f7\u0001A\n\u0003\u0001!\u0001\"!\u0003\u0007\u000e\u0003)Q\u0011aC\u0001\u0006g\u000e\fG.Y\u0005\u0003\u001b)\u0011a!\u00118z%\u00164\u0007\"B\b\u0001\t\u0003\u0001\u0012A\u0002\u001fj]&$h\bF\u0001\u0012!\t\u0011\u0002!D\u0001\u0003\u0011\u001d!\u0002A1A\u0005\u0002U\tQ\u0001\u001d:paN,\u0012A\u0006\t\u0004/iaR\"\u0001\r\u000b\u0005eQ\u0011AC2pY2,7\r^5p]&\u00111\u0004\u0007\u0002\u0004'\u0016\f\bCA\u000f#\u001b\u0005q\"BA\u0010!\u0003\u0011)H/\u001b7\u000b\u0003\u0005\nAA[1wC&\u00111E\b\u0002\u000b!J|\u0007/\u001a:uS\u0016\u001c\bBB\u0013\u0001A\u0003%a#\u0001\u0004qe>\u00048\u000f\t\u0005\bO\u0001\u0011\r\u0011\"\u0001)\u0003\u001d\u0019wN\u001c4jON,\u0012!\u000b\t\u0004/iQ\u0003CA\u0016/\u001b\u0005a#BA\u0017\u0005\u0003\u0019\u0019XM\u001d<fe&\u0011q\u0006\f\u0002\f\u0017\u000647.Y\"p]\u001aLw\r\u0003\u00042\u0001\u0001\u0006I!K\u0001\tG>tg-[4tA!91\u0007\u0001b\u0001\n\u0003!\u0014A\u00032s_.,'\u000fT5tiV\tQ\u0007\u0005\u00027s9\u0011\u0011bN\u0005\u0003q)\ta\u0001\u0015:fI\u00164\u0017B\u0001\u001e<\u0005\u0019\u0019FO]5oO*\u0011\u0001H\u0003\u0005\u0007{\u0001\u0001\u000b\u0011B\u001b\u0002\u0017\t\u0014xn[3s\u0019&\u001cH\u000f\t\u0005\u0006\u007f\u0001!\t\u0001Q\u0001\u0016i\u0016\u001cH\u000f\u0015:pIV\u001cWM])vKV,7+\u001b>f)\u0005\t\u0005CA\u0005C\u0013\t\u0019%B\u0001\u0003V]&$\bF\u0001 
F!\t15*D\u0001H\u0015\tA\u0015*A\u0003kk:LGOC\u0001K\u0003\ry'oZ\u0005\u0003\u0019\u001e\u0013A\u0001V3ti\")a\n\u0001C\u0001\u0001\u00061B/Z:u!J|G-^2f\u0003\u001a$XM]\"m_N,G\r\u000b\u0002N\u000b\")\u0011\u000b\u0001C\u0001\u0001\u0006iA/Z:u\u0005\u0006$8\r[*ju\u0016D#\u0001U#\t\u000bQ\u0003A\u0011\u0001!\u0002)Q,7\u000f^)vKV,G+[7f\u000bb\u0004\u0018N]3eQ\t\u0019V\tC\u0003X\u0001\u0011\u0005\u0001)A\u000fuKN$\b+\u0019:uSRLwN\\!oI\u000e{G\u000e\\1uK\u00163XM\u001c;tQ\t1V\tC\u0003[\u0001\u0011\u0005\u0001)A\nuKN$8+\u001a:jC2L'0Z#wK:$8\u000f\u000b\u0002Z\u000b\")Q\f\u0001C\u0001\u0001\u0006!B/Z:u\u0013:4\u0018\r\\5e!\u0006\u0014H/\u001b;j_:D#\u0001X#\t\u000b\u0001\u0004A\u0011\u0001!\u0002\u0019Q,7\u000f\u001e(p\u0005J|7.\u001a:)\u0005}+\u0005\"B2\u0001\t\u0003\u0001\u0015a\u0006;fgRLenY8na\u0006$\u0018N\u00197f\u000b:\u001cw\u000eZ3sQ\t\u0011W\tC\u0003g\u0001\u0011\u0005\u0001)A\u000buKN$(+\u00198e_6\u0004\u0016M\u001d;ji&|g.\u001a:)\u0005\u0015,\u0005\"B5\u0001\t\u0003\u0001\u0015\u0001\u0007;fgR4\u0015-\u001b7fIN+g\u000e\u001a*fiJLHj\\4jG\"\u0012\u0001.\u0012\u0005\u0006Y\u0002!\t\u0001Q\u0001\u0011i\u0016\u001cHOS1wCB\u0013x\u000eZ;dKJD#a[#\t\u000b=\u0004A\u0011\u0001!\u00021Q,7\u000f^%om\u0006d\u0017\u000eZ\"p]\u001aLw-\u001e:bi&|g\u000e\u000b\u0002o\u000b\")!\u000f\u0001C\u0001g\u0006qq-\u001a;Qe>$WoY3ECR\fGc\u0001;\u0002\u0006A\u0019Q/`@\u000f\u0005Y\\hBA<{\u001b\u0005A(BA=\u0007\u0003\u0019a$o\\8u}%\t1\"\u0003\u0002}\u0015\u00059\u0001/Y2lC\u001e,\u0017BA\u000e\u007f\u0015\ta(\u0002E\u0003\u0013\u0003\u0003)T'C\u0002\u0002\u0004\t\u0011AbS3zK\u0012lUm]:bO\u0016Dq!a\u0002r\u0001\u0004\tI!A\u0004o\u000bZ,g\u000e^:\u0011\u0007%\tY!C\u0002\u0002\u000e)\u00111!\u00138u\u0011\u001d\t\t\u0002\u0001C\u0005\u0003'\t\u0001cZ3u)>\u0004\u0018nY'fi\u0006$\u0017\r^1\u0015\u0019\u0005U\u0011\u0011EA\u0013\u0003S\ti#!\r\u0011\t\u0005]\u0011QD\u0007\u0003\u00033Q1!a\u0007\u0005\u0003\r\t\u0007/[\u0005\u0005\u0003?\tIBA\u0007U_BL7-T3uC\u0012\fG/\u0019\u0005\b\u0003G\ty\u00011\u00
016\u0003\u0015!x\u000e]5d\u0011!\t9#a\u0004A\u0002\u0005%\u0011!\u00039beRLG/[8o\u0011!\tY#a\u0004A\u0002\u0005%\u0011\u0001\u00032s_.,'/\u00133\t\u000f\u0005=\u0012q\u0002a\u0001k\u0005Q!M]8lKJDun\u001d;\t\u0011\u0005M\u0012q\u0002a\u0001\u0003\u0013\t!B\u0019:pW\u0016\u0014\bk\u001c:u\u0011\u001d\t\t\u0002\u0001C\u0005\u0003o!B\"!\u0006\u0002:\u0005m\u0012qHA!\u0003\u0007Bq!a\t\u00026\u0001\u0007Q\u0007\u0003\u0005\u0002(\u0005U\u0002\u0019AA\u001f!\u0011)X0!\u0003\t\u0011\u0005-\u0012Q\u0007a\u0001\u0003\u0013Aq!a\f\u00026\u0001\u0007Q\u0007\u0003\u0005\u00024\u0005U\u0002\u0019AA\u0005\u0011\u001d\t9\u0005\u0001C\u0001\u0003\u0013\nQ\"\\3tg\u0006<Wm\u001d+p'\u0016$H\u0003BA&\u0003/\u0002B!!\u0014\u0002T5\u0011\u0011q\n\u0006\u0004\u0003#\"\u0011aB7fgN\fw-Z\u0005\u0005\u0003+\nyE\u0001\u000bCsR,')\u001e4gKJlUm]:bO\u0016\u001cV\r\u001e\u0005\t\u00033\n)\u00051\u0001\u0002\\\u0005AQ.Z:tC\u001e,7\u000fE\u0002v{VBq!a\u0012\u0001\t\u0003\ty\u0006\u0006\u0004\u0002L\u0005\u0005\u0014\u0011\u000f\u0005\t\u0003G\ni\u00061\u0001\u0002f\u0005\u00191.Z=\u0011\u000b%\t9'a\u001b\n\u0007\u0005%$BA\u0003BeJ\f\u0017\u0010E\u0002\n\u0003[J1!a\u001c\u000b\u0005\u0011\u0011\u0015\u0010^3\t\u0011\u0005e\u0013Q\fa\u0001\u0003g\u0002B!^?\u0002f\u0001")
/* loaded from: input_file:kafka/producer/AsyncProducerTest.class */
public class AsyncProducerTest {
    /**
     * Broker property sets shared by the tests: a one-element sequence produced by
     * {@code TestUtils.createBrokerConfig} called with 1, "127.0.0.1:1" and 65534 as its
     * first, second and fifth arguments and the helper's own defaults for the rest.
     * NOTE(review): the meaning of each positional argument is defined in TestUtils,
     * which is not visible here — confirm before changing any of the values.
     */
    private final Seq<Properties> props = Seq$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new Properties[]{TestUtils$.MODULE$.createBrokerConfig(1, "127.0.0.1:1", TestUtils$.MODULE$.createBrokerConfig$default$3(), TestUtils$.MODULE$.createBrokerConfig$default$4(), 65534, TestUtils$.MODULE$.createBrokerConfig$default$6(), TestUtils$.MODULE$.createBrokerConfig$default$7(), TestUtils$.MODULE$.createBrokerConfig$default$8(), TestUtils$.MODULE$.createBrokerConfig$default$9(), TestUtils$.MODULE$.createBrokerConfig$default$10(), TestUtils$.MODULE$.createBrokerConfig$default$11(), TestUtils$.MODULE$.createBrokerConfig$default$12(), TestUtils$.MODULE$.createBrokerConfig$default$13(), TestUtils$.MODULE$.createBrokerConfig$default$14())}));
    /**
     * One element per entry of {@link #props()}, produced by the compiler-generated function
     * AsyncProducerTest$$anonfun$1 (body not visible here; presumably it builds a KafkaConfig
     * from each Properties object — verify).
     */
    private final Seq<KafkaConfig> configs = (Seq) props().map(new AsyncProducerTest$$anonfun$1(this), Seq$.MODULE$.canBuildFrom());
    /**
     * Comma-separated string built by mapping every entry of {@link #configs()} through the
     * compiler-generated function AsyncProducerTest$$anonfun$2 (body not visible here). The
     * tests pass this value as the "metadata.broker.list" producer property.
     */
    private final String brokerList = ((TraversableOnce) configs().map(new AsyncProducerTest$$anonfun$2(this), Seq$.MODULE$.canBuildFrom())).mkString(",");

    /** Returns the broker property sets created when this test class was constructed. */
    public Seq<Properties> props() {
        return props;
    }

    /** Returns the broker configurations derived from {@link #props()} at construction time. */
    public Seq<KafkaConfig> configs() {
        return configs;
    }

    /** Returns the comma-separated broker list string the tests use as "metadata.broker.list". */
    public String brokerList() {
        return brokerList;
    }

    /**
     * Sends 12 messages through an async producer configured with
     * "queue.buffering.max.messages" = 10 and "queue.enqueue.timeout.ms" = 0, using an event
     * handler that sleeps 500 ms per call, and expects the send to be rejected with
     * {@link QueueFullException}. The producer is always closed afterwards.
     */
    @Test
    public void testProducerQueueSize() {
        // Handler that only sleeps, so handing a batch over takes at least 500 ms.
        EventHandler<String, String> slowHandler = new EventHandler<String, String>(this) { // from class: kafka.producer.AsyncProducerTest$$anon$1
            public void handle(Seq<KeyedMessage<String, String>> events) {
                Thread.sleep(500L);
            }

            public void close() {
            }
        };

        Properties producerProps = new Properties();
        producerProps.put("serializer.class", "kafka.serializer.StringEncoder");
        producerProps.put("metadata.broker.list", brokerList());
        producerProps.put("producer.type", "async");
        producerProps.put("queue.buffering.max.messages", "10");
        producerProps.put("batch.num.messages", "1");
        producerProps.put("queue.enqueue.timeout.ms", "0");
        ProducerConfig asyncConfig = new ProducerConfig(producerProps);

        Seq<KeyedMessage<String, String>> messages = getProduceData(12);
        Producer asyncProducer = new Producer(asyncConfig, slowHandler);
        try {
            asyncProducer.send(messages);
            Assert.fail("Queue should be full");
        } catch (QueueFullException expected) {
            // This is the outcome the test is looking for.
        } finally {
            asyncProducer.close();
        }
    }

    /**
     * Closes a producer obtained from {@code TestUtils.createProducer} and then checks that a
     * subsequent send is rejected with {@code ProducerClosedException}.
     */
    @Test
    public void testProduceAfterClosed() {
        Seq<KeyedMessage<String, String>> messages = getProduceData(10);
        Producer closedProducer = TestUtils$.MODULE$.createProducer(
                brokerList(),
                StringEncoder.class.getName(),
                TestUtils$.MODULE$.createProducer$default$3(),
                TestUtils$.MODULE$.createProducer$default$4(),
                TestUtils$.MODULE$.createProducer$default$5());
        closedProducer.close();

        try {
            closedProducer.send(messages);
            Assert.fail("should complain that producer is already closed");
        } catch (ProducerClosedException expected) {
            // Sending after close() is supposed to fail this way.
        }
    }

    /**
     * Pushes ten messages through a {@link ProducerSendThread} created with batch size 5 and
     * a queue time of {@code Integer.MAX_VALUE}, and verifies that the handler is called first
     * with the first five messages and then with the last five.
     */
    @Test
    public void testBatchSize() {
        Seq<KeyedMessage<String, String>> messages = getProduceData(10);

        // Strict mock: both the arguments and the order of the two handle() calls are checked.
        DefaultEventHandler mockHandler = (DefaultEventHandler) EasyMock.createStrictMock(DefaultEventHandler.class);
        mockHandler.handle((Seq) messages.take(5));
        EasyMock.expectLastCall();
        mockHandler.handle((Seq) messages.takeRight(5));
        EasyMock.expectLastCall();
        EasyMock.replay(new Object[]{mockHandler});

        LinkedBlockingQueue queue = new LinkedBlockingQueue(10);
        ProducerSendThread sendThread = new ProducerSendThread(
                "thread1", queue, mockHandler, (long) Integer.MAX_VALUE, 5, "");
        sendThread.start();

        // The generated closure receives the queue; presumably it enqueues each message — verify.
        messages.foreach(new AsyncProducerTest$$anonfun$testBatchSize$1(this, queue));

        sendThread.shutdown();
        EasyMock.verify(new Object[]{mockHandler});
    }

    /**
     * Hands two messages to a {@link ProducerSendThread} whose batch size (5) is larger than
     * the number of messages, waits slightly longer than the configured queue time, and
     * verifies that the handler was nevertheless called once with both messages.
     *
     * <p>The send thread is shut down in a {@code finally} block so that a failed
     * verification (or any earlier exception) does not leave the thread running.
     */
    @Test
    public void testQueueTimeExpired() {
        // Queue time given to the send thread; the test sleeps this long plus a 100 ms margin.
        final long queueTimeMs = 200L;

        Seq<KeyedMessage<String, String>> produceData = getProduceData(2);
        DefaultEventHandler defaultEventHandler = (DefaultEventHandler) EasyMock.createStrictMock(DefaultEventHandler.class);
        defaultEventHandler.handle(produceData);
        EasyMock.expectLastCall();
        EasyMock.replay(new Object[]{defaultEventHandler});

        LinkedBlockingQueue linkedBlockingQueue = new LinkedBlockingQueue(10);
        ProducerSendThread producerSendThread = new ProducerSendThread("thread1", linkedBlockingQueue, defaultEventHandler, queueTimeMs, 5, "");
        producerSendThread.start();
        try {
            produceData.foreach(new AsyncProducerTest$$anonfun$testQueueTimeExpired$1(this, linkedBlockingQueue));
            Thread.sleep(queueTimeMs + 100);
            EasyMock.verify(new Object[]{defaultEventHandler});
        } finally {
            // Always stop the sender thread, even when verification fails.
            producerSendThread.shutdown();
        }
    }

    /**
     * Checks {@code DefaultEventHandler.partitionAndCollate} against a hand-built expected
     * result: five messages across "topic1" and "topic2" are partitioned with a modulo
     * partitioner over two-partition topic metadata, and the result must equal a two-level
     * map keyed first by an int (0 or 1) and then by {@link TopicAndPartition}.
     */
    @Test
    public void testPartitionAndCollateEvents() {
        // Input events. Two of them use the four-argument KeyedMessage constructor; presumably
        // the third argument there is an explicit partition key that overrides the key — verify.
        ArrayBuffer arrayBuffer = new ArrayBuffer();
        arrayBuffer.append(Predef$.MODULE$.wrapRefArray(new KeyedMessage[]{new KeyedMessage("topic1", BoxesRunTime.boxToInteger(0), new Message("msg1".getBytes()))}));
        arrayBuffer.append(Predef$.MODULE$.wrapRefArray(new KeyedMessage[]{new KeyedMessage("topic2", BoxesRunTime.boxToInteger(-99), BoxesRunTime.boxToInteger(1), new Message("msg2".getBytes()))}));
        arrayBuffer.append(Predef$.MODULE$.wrapRefArray(new KeyedMessage[]{new KeyedMessage("topic1", BoxesRunTime.boxToInteger(2), new Message("msg3".getBytes()))}));
        arrayBuffer.append(Predef$.MODULE$.wrapRefArray(new KeyedMessage[]{new KeyedMessage("topic1", BoxesRunTime.boxToInteger(-101), BoxesRunTime.boxToInteger(3), new Message("msg4".getBytes()))}));
        arrayBuffer.append(Predef$.MODULE$.wrapRefArray(new KeyedMessage[]{new KeyedMessage("topic2", BoxesRunTime.boxToInteger(4), new Message("msg5".getBytes()))}));
        Properties properties = new Properties();
        properties.put("metadata.broker.list", brokerList());
        // Two broker endpoints (ids 0 and 1); partition 0 lists the first in its Some(...)
        // argument and partition 1 the second. Both topics share the same partition metadata.
        BrokerEndPoint brokerEndPoint = new BrokerEndPoint(0, "localhost", 9092);
        BrokerEndPoint brokerEndPoint2 = new BrokerEndPoint(1, "localhost", 9093);
        PartitionMetadata partitionMetadata = new PartitionMetadata(0, new Some(brokerEndPoint), List$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new BrokerEndPoint[]{brokerEndPoint, brokerEndPoint2})), PartitionMetadata$.MODULE$.$lessinit$greater$default$4(), PartitionMetadata$.MODULE$.$lessinit$greater$default$5());
        PartitionMetadata partitionMetadata2 = new PartitionMetadata(1, new Some(brokerEndPoint2), List$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new BrokerEndPoint[]{brokerEndPoint, brokerEndPoint2})), PartitionMetadata$.MODULE$.$lessinit$greater$default$4(), PartitionMetadata$.MODULE$.$lessinit$greater$default$5());
        TopicMetadata topicMetadata = new TopicMetadata("topic1", List$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new PartitionMetadata[]{partitionMetadata, partitionMetadata2})), TopicMetadata$.MODULE$.$lessinit$greater$default$3());
        TopicMetadata topicMetadata2 = new TopicMetadata("topic2", List$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new PartitionMetadata[]{partitionMetadata, partitionMetadata2})), TopicMetadata$.MODULE$.$lessinit$greater$default$3());
        // Topic-name -> metadata map passed to the handler as its last constructor argument.
        HashMap hashMap = new HashMap();
        hashMap.put("topic1", topicMetadata);
        hashMap.put("topic2", topicMetadata2);
        // Partitioner under test: the unboxed int value modulo the second argument.
        Partitioner partitioner = new Partitioner(this) { // from class: kafka.producer.AsyncProducerTest$$anon$2
            public int partition(Object obj, int i) {
                return BoxesRunTime.unboxToInt(obj) % i;
            }
        };
        ProducerConfig producerConfig = new ProducerConfig(properties);
        DefaultEventHandler defaultEventHandler = new DefaultEventHandler(producerConfig, partitioner, (Encoder) null, new IntEncoder(IntEncoder$.MODULE$.$lessinit$greater$default$1()), new ProducerPool(producerConfig), hashMap);
        // Expected contents for ("topic1", 0) and ("topic1", 1) respectively.
        ArrayBuffer apply = ArrayBuffer$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new KeyedMessage[]{new KeyedMessage("topic1", BoxesRunTime.boxToInteger(0), new Message("msg1".getBytes())), new KeyedMessage("topic1", BoxesRunTime.boxToInteger(2), new Message("msg3".getBytes()))}));
        ArrayBuffer apply2 = ArrayBuffer$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new KeyedMessage[]{new KeyedMessage("topic1", BoxesRunTime.boxToInteger(-101), BoxesRunTime.boxToInteger(3), new Message("msg4".getBytes()))}));
        // Expected result: Some(Map(0 -> {topic1/0: msg1, msg3; topic2/0: msg5},
        //                           1 -> {topic1/1: msg4;       topic2/1: msg2})).
        // The outer int keys are presumably broker ids — verify against partitionAndCollate.
        Assert.assertEquals(new Some(Map$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.any2ArrowAssoc(BoxesRunTime.boxToInteger(0)), Map$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.any2ArrowAssoc(new TopicAndPartition("topic1", 0)), apply), Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.any2ArrowAssoc(new TopicAndPartition("topic2", 0)), ArrayBuffer$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new KeyedMessage[]{new KeyedMessage("topic2", BoxesRunTime.boxToInteger(4), new Message("msg5".getBytes()))})))}))), Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.any2ArrowAssoc(BoxesRunTime.boxToInteger(1)), Map$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.any2ArrowAssoc(new TopicAndPartition("topic1", 1)), apply2), Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.any2ArrowAssoc(new TopicAndPartition("topic2", 1)), ArrayBuffer$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new KeyedMessage[]{new KeyedMessage("topic2", BoxesRunTime.boxToInteger(-99), BoxesRunTime.boxToInteger(1), new Message("msg2".getBytes()))})))})))}))), defaultEventHandler.partitionAndCollate(arrayBuffer));
    }

    /**
     * Serializes the same five-element input twice with a {@link DefaultEventHandler} using
     * {@link StringEncoder}s — once as the original sequence and once wrapped via
     * {@code Stream().apply(...)} — and checks that both mapped results match the input.
     */
    @Test
    public void testSerializeEvents() {
        Seq produceData = (Seq) TestUtils$.MODULE$.getMsgStrings(5)
                .map(new AsyncProducerTest$$anonfun$3(this), Seq$.MODULE$.canBuildFrom());

        Properties producerProps = new Properties();
        producerProps.put("metadata.broker.list", brokerList());
        ProducerConfig config = new ProducerConfig(producerProps);

        TopicMetadata topic1Metadata = getTopicMetadata("topic1", 0, 0, "localhost", 9092);
        HashMap topicPartitionInfos = new HashMap();
        topicPartitionInfos.put("topic1", topic1Metadata);

        DefaultEventHandler handler = new DefaultEventHandler(
                config,
                (Partitioner) null,
                new StringEncoder(StringEncoder$.MODULE$.$lessinit$greater$default$1()),
                new StringEncoder(StringEncoder$.MODULE$.$lessinit$greater$default$1()),
                new ProducerPool(config),
                topicPartitionInfos);

        // Results of serializing the plain sequence and the Stream-wrapped input, each mapped
        // back through a generated function (bodies not visible here).
        Seq fromSeq = (Seq) handler.serialize(produceData)
                .map(new AsyncProducerTest$$anonfun$4(this), Seq$.MODULE$.canBuildFrom());
        Seq fromStream = (Seq) handler.serialize(package$.MODULE$.Stream().apply(produceData))
                .map(new AsyncProducerTest$$anonfun$5(this), Seq$.MODULE$.canBuildFrom());

        TestUtils$.MODULE$.checkEquals(produceData.iterator(), fromSeq.iterator());
        TestUtils$.MODULE$.checkEquals(produceData.iterator(), fromStream.iterator());
    }

    /**
     * Verifies that {@code DefaultEventHandler.partitionAndCollate} does not throw when the
     * handler is configured with a {@code NegativePartitioner}.
     *
     * <p>If anything is thrown, the test fails with an {@link AssertionError} that carries the
     * original throwable as its cause, so the real failure shows up in the test report.
     */
    @Test
    public void testInvalidPartition() {
        ArrayBuffer arrayBuffer = new ArrayBuffer();
        arrayBuffer.append(Predef$.MODULE$.wrapRefArray(new KeyedMessage[]{new KeyedMessage("topic1", "key1", new Message("msg1".getBytes()))}));
        Properties properties = new Properties();
        properties.put("metadata.broker.list", brokerList());
        ProducerConfig producerConfig = new ProducerConfig(properties);
        TopicMetadata topicMetadata = getTopicMetadata("topic1", 0, 0, "localhost", 9092);
        HashMap hashMap = new HashMap();
        hashMap.put("topic1", topicMetadata);
        try {
            new DefaultEventHandler(producerConfig, new NegativePartitioner(NegativePartitioner$.MODULE$.$lessinit$greater$default$1()), (Encoder) null, (Encoder) null, new ProducerPool(producerConfig), hashMap).partitionAndCollate(arrayBuffer);
        } catch (Throwable th) {
            // Same failure type and message as Assert.fail(...), but keeps the root cause.
            throw new AssertionError("Should not throw any exception", th);
        }
    }

    /**
     * Handles one message for "topic1" whose topic metadata has an empty partition list and
     * expects {@link FailedToSendMessageException}.
     */
    @Test
    public void testNoBroker() {
        Properties producerProps = new Properties();
        producerProps.put("metadata.broker.list", brokerList());
        ProducerConfig config = new ProducerConfig(producerProps);

        // Metadata for "topic1" with no partitions at all.
        TopicMetadata emptyTopicMetadata = new TopicMetadata(
                "topic1", Seq$.MODULE$.empty(), TopicMetadata$.MODULE$.$lessinit$greater$default$3());
        HashMap topicPartitionInfos = new HashMap();
        topicPartitionInfos.put("topic1", emptyTopicMetadata);

        ProducerPool pool = new ProducerPool(config);
        ArrayBuffer events = new ArrayBuffer();
        events.append(Predef$.MODULE$.wrapRefArray(new KeyedMessage[]{new KeyedMessage("topic1", "msg1")}));

        try {
            new DefaultEventHandler(
                    config,
                    (Partitioner) null,
                    new StringEncoder(StringEncoder$.MODULE$.$lessinit$greater$default$1()),
                    new StringEncoder(StringEncoder$.MODULE$.$lessinit$greater$default$1()),
                    pool,
                    topicPartitionInfos).handle(events);
            Assert.fail("Should fail with FailedToSendMessageException");
        } catch (FailedToSendMessageException expected) {
            // The send is supposed to fail this way.
        }
    }

    /**
     * Creates a producer whose second and third {@code createProducer} arguments are both
     * {@link DefaultEncoder}'s class name, sends String-typed test data and expects a
     * {@link ClassCastException}. The producer is closed in every case.
     */
    @Test
    public void testIncompatibleEncoder() {
        Properties overrides = new Properties();
        overrides.put("message.send.max.retries", "0");

        String encoderClassName = DefaultEncoder.class.getName();
        Producer mismatchedProducer = TestUtils$.MODULE$.createProducer(
                brokerList(),
                encoderClassName,
                encoderClassName,
                TestUtils$.MODULE$.createProducer$default$4(),
                overrides);
        try {
            mismatchedProducer.send(getProduceData(1));
            Assert.fail("Should fail with ClassCastException due to incompatible Encoder");
        } catch (ClassCastException expected) {
            // Expected: the configured encoder cannot handle the String payload.
        } finally {
            mismatchedProducer.close();
        }
    }

    /**
     * Runs {@code partitionAndCollate} on three keyless messages with no partitioner configured
     * and requires a {@code Some} result, whose map entries are then filtered and visited by
     * two compiler-generated functions (bodies not visible here). A {@code None} result fails
     * the test; anything else raises {@link MatchError}.
     */
    @Test
    public void testRandomPartitioner() {
        Properties producerProps = new Properties();
        producerProps.put("metadata.broker.list", brokerList());
        ProducerConfig config = new ProducerConfig(producerProps);

        TopicMetadata topic1Metadata = getTopicMetadata("topic1", 0, 0, "localhost", 9092);
        TopicMetadata topic2Metadata = getTopicMetadata("topic2", 0, 0, "localhost", 9092);
        HashMap topicPartitionInfos = new HashMap();
        topicPartitionInfos.put("topic1", topic1Metadata);
        topicPartitionInfos.put("topic2", topic2Metadata);

        DefaultEventHandler handler = new DefaultEventHandler(
                config, (Partitioner) null, (Encoder) null, (Encoder) null, new ProducerPool(config), topicPartitionInfos);

        ArrayBuffer events = new ArrayBuffer();
        events.append(Predef$.MODULE$.wrapRefArray(new KeyedMessage[]{new KeyedMessage("topic1", new Message("msg1".getBytes()))}));
        events.append(Predef$.MODULE$.wrapRefArray(new KeyedMessage[]{new KeyedMessage("topic2", new Message("msg2".getBytes()))}));
        events.append(Predef$.MODULE$.wrapRefArray(new KeyedMessage[]{new KeyedMessage("topic1", new Message("msg3".getBytes()))}));

        Some collated = handler.partitionAndCollate(events);
        if (!(collated instanceof Some)) {
            // Not a Some: it must be None (test failure); any other value is a match error.
            None$ none = None$.MODULE$;
            boolean isNone = (none == null) ? collated == null : none.equals(collated);
            if (!isNone) {
                throw new MatchError(collated);
            }
            Assert.fail("Failed to collate requests by topic, partition");
            return;
        }

        ((Map) collated.x())
                .withFilter(new AsyncProducerTest$$anonfun$testRandomPartitioner$1(this))
                .foreach(new AsyncProducerTest$$anonfun$testRandomPartitioner$2(this));
    }

    /**
     * Drives {@code DefaultEventHandler.handle} against a mocked {@code SyncProducer} whose
     * three expected sends respectively throw, return a partial failure and return success,
     * then verifies that every recorded expectation on the sync producer and on the mocked
     * {@code ProducerPool} was met.
     */
    @Test
    public void testFailedSendRetryLogic() {
        Properties properties = new Properties();
        properties.put("metadata.broker.list", brokerList());
        properties.put("request.required.acks", "1");
        properties.put("serializer.class", StringEncoder.class.getName().toString());
        properties.put("key.serializer.class", NullEncoder.class.getName().toString());
        properties.put("producer.num.retries", BoxesRunTime.boxToInteger(3).toString());
        ProducerConfig producerConfig = new ProducerConfig(properties);
        // "topic1" with partitions 0 and 1, built by the Seq overload of getTopicMetadata.
        TopicMetadata topicMetadata = getTopicMetadata("topic1", (Seq<Object>) Predef$.MODULE$.wrapIntArray(new int[]{0, 1}), 0, "localhost", 9092);
        HashMap hashMap = new HashMap();
        hashMap.put("topic1", topicMetadata);
        Seq<String> msgStrings = TestUtils$.MODULE$.getMsgStrings(2);
        // Three expected requests. The first two cover partitions 0 and 1 and differ only in
        // their sixth argument (11 vs 17); the third targets partition 0 only (sixth arg 21).
        // NOTE(review): the sixth argument is presumably the correlation id — confirm in TestUtils.
        ProducerRequest produceRequestWithAcks = TestUtils$.MODULE$.produceRequestWithAcks(List$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new String[]{"topic1"})), List$.MODULE$.apply(Predef$.MODULE$.wrapIntArray(new int[]{0, 1})), messagesToSet(msgStrings), 1, TestUtils$.MODULE$.produceRequestWithAcks$default$5(), 11, TestUtils$.MODULE$.produceRequestWithAcks$default$7());
        ProducerRequest produceRequestWithAcks2 = TestUtils$.MODULE$.produceRequestWithAcks(List$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new String[]{"topic1"})), List$.MODULE$.apply(Predef$.MODULE$.wrapIntArray(new int[]{0, 1})), messagesToSet(msgStrings), 1, TestUtils$.MODULE$.produceRequestWithAcks$default$5(), 17, TestUtils$.MODULE$.produceRequestWithAcks$default$7());
        // Response to the second request: partition 0 reports NotLeaderForPartition, partition 1 NoError.
        ProducerResponse producerResponse = new ProducerResponse(0, Map$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{new Tuple2(new TopicAndPartition("topic1", 0), new ProducerResponseStatus(ErrorMapping$.MODULE$.NotLeaderForPartitionCode(), 0L)), new Tuple2(new TopicAndPartition("topic1", 1), new ProducerResponseStatus(ErrorMapping$.MODULE$.NoError(), 0L))})), ProducerResponse$.MODULE$.apply$default$3(), ProducerResponse$.MODULE$.apply$default$4());
        ProducerRequest produceRequest = TestUtils$.MODULE$.produceRequest("topic1", 0, messagesToSet(msgStrings), 1, TestUtils$.MODULE$.produceRequest$default$5(), 21, TestUtils$.MODULE$.produceRequest$default$7());
        // Response to the third request: partition 0 reports NoError.
        ProducerResponse producerResponse2 = new ProducerResponse(0, Map$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{new Tuple2(new TopicAndPartition("topic1", 0), new ProducerResponseStatus(ErrorMapping$.MODULE$.NoError(), 0L))})), ProducerResponse$.MODULE$.apply$default$3(), ProducerResponse$.MODULE$.apply$default$4());
        // Mocked sync producer: config() may be called any number of times; the first send
        // throws, the second returns the partial-failure response, the third returns success.
        SyncProducer syncProducer = (SyncProducer) EasyMock.createMock(SyncProducer.class);
        EasyMock.expect(syncProducer.config()).andReturn(EasyMock.anyObject()).anyTimes();
        EasyMock.expect(syncProducer.send(produceRequestWithAcks)).andThrow(new RuntimeException());
        EasyMock.expect(syncProducer.send(produceRequestWithAcks2)).andReturn(producerResponse);
        EasyMock.expect(syncProducer.send(produceRequest)).andReturn(producerResponse2);
        EasyMock.replay(new Object[]{syncProducer});
        // Mocked pool: getProducer(0) is expected exactly four times, followed by the recorded close().
        ProducerPool producerPool = (ProducerPool) EasyMock.createMock(ProducerPool.class);
        EasyMock.expect(producerPool.getProducer(0)).andReturn(syncProducer).times(4);
        producerPool.close();
        // Decompiled form of expecting the Unit-returning close() call recorded just above.
        EasyMock.expect(BoxedUnit.UNIT);
        EasyMock.replay(new Object[]{producerPool});
        DefaultEventHandler defaultEventHandler = new DefaultEventHandler(producerConfig, new FixedValuePartitioner(FixedValuePartitioner$.MODULE$.$lessinit$greater$default$1()), new StringEncoder(StringEncoder$.MODULE$.$lessinit$greater$default$1()), new NullEncoder(NullEncoder$.MODULE$.$lessinit$greater$default$1()), producerPool, hashMap);
        // Input: msgStrings mapped through two generated functions (both given "topic1") and concatenated.
        defaultEventHandler.handle((Seq) ((TraversableLike) msgStrings.map(new AsyncProducerTest$$anonfun$6(this, "topic1"), Seq$.MODULE$.canBuildFrom())).$plus$plus((GenTraversableOnce) msgStrings.map(new AsyncProducerTest$$anonfun$7(this, "topic1"), Seq$.MODULE$.canBuildFrom()), Seq$.MODULE$.canBuildFrom()));
        defaultEventHandler.close();
        EasyMock.verify(new Object[]{syncProducer});
        EasyMock.verify(new Object[]{producerPool});
    }

    /**
     * Wraps a mocked producer in another {@code Producer} and checks that sending a single
     * message and sending a {@link List} of messages through the wrapper reach the mock as
     * the two recorded {@code send} calls.
     */
    @Test
    public void testJavaProducer() {
        Seq scalaMessages = (Seq) TestUtils$.MODULE$.getMsgStrings(5)
                .map(new AsyncProducerTest$$anonfun$8(this, "topic1"), Seq$.MODULE$.canBuildFrom());
        List javaMessages = JavaConversions$.MODULE$.seqAsJavaList(scalaMessages);

        // Record the two calls the wrapper is expected to forward: head only, then everything.
        Producer mockDelegate = (Producer) EasyMock.createMock(Producer.class);
        mockDelegate.send(Predef$.MODULE$.wrapRefArray(new KeyedMessage[]{(KeyedMessage) scalaMessages.head()}));
        EasyMock.expectLastCall();
        mockDelegate.send(scalaMessages);
        EasyMock.expectLastCall();
        EasyMock.replay(new Object[]{mockDelegate});

        Producer wrapper = new Producer(mockDelegate);
        wrapper.send((KeyedMessage) javaMessages.get(0));
        wrapper.send(javaMessages);

        EasyMock.verify(new Object[]{mockDelegate});
    }

    /**
     * Builds a {@code ProducerConfig} from properties that set only "serializer.class" and
     * "producer.type" (no "metadata.broker.list") and expects an
     * {@link IllegalArgumentException}.
     */
    @Test
    public void testInvalidConfiguration() {
        Properties incompleteProps = new Properties();
        incompleteProps.put("serializer.class", "kafka.serializer.StringEncoder");
        incompleteProps.put("producer.type", "async");

        try {
            new ProducerConfig(incompleteProps);
            Assert.fail("should complain about wrong config");
        } catch (IllegalArgumentException expected) {
            // Construction is supposed to be rejected.
        }
    }

    /**
     * Builds test input by running a compiler-generated closure once for every index in
     * {@code [0, i)} against a fresh buffer, then returns that buffer.
     * NOTE(review): the closure body is not visible here; presumably it appends one
     * KeyedMessage per index — verify.
     *
     * @param i exclusive upper bound of the index range
     * @return the buffer handed to the closure
     */
    public Seq<KeyedMessage<String, String>> getProduceData(int i) {
        ArrayBuffer produced = new ArrayBuffer();
        RichInt$.MODULE$
                .until$extension0(Predef$.MODULE$.intWrapper(0), i)
                .foreach$mVc$sp(new AsyncProducerTest$$anonfun$getProduceData$1(this, produced));
        return produced;
    }

    /**
     * Single-partition convenience overload: wraps {@code i} in a one-element list and
     * delegates to the sequence-based {@code getTopicMetadata}.
     *
     * @param str  first argument forwarded to the sequence overload (used there as the topic name)
     * @param i    the single partition id
     * @param i2   broker id forwarded unchanged
     * @param str2 broker host forwarded unchanged
     * @param i3   broker port forwarded unchanged
     */
    private TopicMetadata getTopicMetadata(String str, int i, int i2, String str2, int i3) {
        Seq<Object> singlePartition = (Seq<Object>) List$.MODULE$.apply(Predef$.MODULE$.wrapIntArray(new int[]{i}));
        return getTopicMetadata(str, singlePartition, i2, str2, i3);
    }

    /**
     * Creates a {@link TopicMetadata} for {@code str} whose second constructor argument is
     * {@code seq} mapped through a compiler-generated function that receives one shared
     * {@link BrokerEndPoint} built from {@code (i, str2, i2)}.
     * NOTE(review): the mapping function's body is not visible here; presumably it builds one
     * PartitionMetadata per partition id — verify.
     *
     * @param str  topic name passed to the TopicMetadata constructor
     * @param seq  values to map (the callers in this class pass partition ids)
     * @param i    first BrokerEndPoint constructor argument
     * @param str2 second BrokerEndPoint constructor argument
     * @param i2   third BrokerEndPoint constructor argument
     */
    private TopicMetadata getTopicMetadata(String str, Seq<Object> seq, int i, String str2, int i2) {
        BrokerEndPoint broker = new BrokerEndPoint(i, str2, i2);
        Seq mapped = (Seq) seq.map(
                new AsyncProducerTest$$anonfun$getTopicMetadata$1(this, broker), Seq$.MODULE$.canBuildFrom());
        return new TopicMetadata(str, mapped, TopicMetadata$.MODULE$.$lessinit$greater$default$3());
    }

    /**
     * Builds an uncompressed {@link ByteBufferMessageSet} from {@code seq} after mapping each
     * element through a compiler-generated function (body not visible here).
     *
     * @param seq the strings to convert
     * @return a message set created with {@code NoCompressionCodec}
     */
    public ByteBufferMessageSet messagesToSet(Seq<String> seq) {
        NoCompressionCodec$ codec = NoCompressionCodec$.MODULE$;
        Seq converted = (Seq) seq.map(
                new AsyncProducerTest$$anonfun$messagesToSet$1(this), Seq$.MODULE$.canBuildFrom());
        return new ByteBufferMessageSet(codec, converted);
    }

    /**
     * Builds an uncompressed {@link ByteBufferMessageSet} from {@code seq} after mapping each
     * element through a compiler-generated function that also receives {@code bArr}.
     * NOTE(review): the function body is not visible here; presumably {@code bArr} is used as
     * the key of every message — verify.
     *
     * @param bArr byte array handed to the mapping function
     * @param seq  the byte arrays to convert
     * @return a message set created with {@code NoCompressionCodec}
     */
    public ByteBufferMessageSet messagesToSet(byte[] bArr, Seq<byte[]> seq) {
        NoCompressionCodec$ codec = NoCompressionCodec$.MODULE$;
        Seq converted = (Seq) seq.map(
                new AsyncProducerTest$$anonfun$messagesToSet$2(this, bArr), Seq$.MODULE$.canBuildFrom());
        return new ByteBufferMessageSet(codec, converted);
    }
}
