/*
 * Decompiled with CFR 0.152.
 */
package com.mapr.streams.tests.producer;

import com.mapr.fs.MapRFileStatus;
import com.mapr.fs.MapRFileSystem;
import com.mapr.fs.proto.Marlinserver;
import com.mapr.streams.Admin;
import com.mapr.streams.StreamDescriptor;
import com.mapr.streams.Streams;
import com.mapr.streams.impl.admin.MarlinAdminImpl;
import com.mapr.tests.BaseTest;
import com.mapr.tests.annotations.ClusterTest;
import java.io.IOException;
import java.net.URI;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.Future;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.PartitionInfo;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Category(value={ClusterTest.class})
public class ProducerErrorTest
extends BaseTest {
    private static final Logger _logger = LoggerFactory.getLogger(ProducerErrorTest.class);
    private static final String STREAM = "/jtest-" + ProducerErrorTest.class.getSimpleName();
    private static final String STREAMFUTURES = STREAM + "-futures";
    private static final String STREAMKEY = STREAM + "-key";
    private static final String STREAMAUTO = STREAM + "-auto";
    private static final String STREAMNA = STREAM + "-noauto";
    private static final String STREAMMETA = STREAM + "-meta";
    private static final String STREAMENOENT = STREAM + "-enoent";
    private static final String STREAMENOENTAUTO = STREAM + "-enoent-auto";
    private static final String STREAMDELETE = STREAM + "-deletestream";
    private static final String TOPIC = "testtopic";
    private static KafkaProducer producer;
    private static Properties props;
    public static int msgValueLength;
    public static final byte[] value;
    public static final byte[] key;

    @BeforeClass
    public static void setupTest() throws Exception {
        Configuration conf = new Configuration();
        Admin madmin = Streams.newAdmin((Configuration)conf);
        Marlinserver.MarlinConfigDefaults cdef = Marlinserver.MarlinConfigDefaults.getDefaultInstance();
        props = new Properties();
        props.put("key.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
        props.put(cdef.getParallelFlushersPerPartition(), (Object)false);
        props.put(cdef.getMetadataMaxAge(), (Object)1000);
        props.put(cdef.getBufferTime(), (Object)500);
        try {
            madmin.deleteStream(STREAMFUTURES);
        }
        catch (Exception exception) {
            // empty catch block
        }
        try {
            madmin.deleteStream(STREAMKEY);
        }
        catch (Exception exception) {
            // empty catch block
        }
        try {
            madmin.deleteStream(STREAMAUTO);
        }
        catch (Exception exception) {
            // empty catch block
        }
        try {
            madmin.deleteStream(STREAMNA);
        }
        catch (Exception exception) {
            // empty catch block
        }
        try {
            madmin.deleteStream(STREAMMETA);
        }
        catch (Exception exception) {
            // empty catch block
        }
        try {
            madmin.deleteStream(STREAMENOENT);
        }
        catch (Exception exception) {
            // empty catch block
        }
        try {
            madmin.deleteStream(STREAMENOENTAUTO);
        }
        catch (Exception exception) {
            // empty catch block
        }
        try {
            madmin.deleteStream(STREAMDELETE);
        }
        catch (Exception exception) {
            // empty catch block
        }
        madmin.close();
    }

    @AfterClass
    public static void cleanupTest() throws Exception {
        Configuration conf = new Configuration();
        Admin madmin = Streams.newAdmin((Configuration)conf);
        try {
            madmin.deleteStream(STREAMFUTURES);
            madmin.deleteStream(STREAMKEY);
            madmin.deleteStream(STREAMAUTO);
            madmin.deleteStream(STREAMNA);
            madmin.deleteStream(STREAMMETA);
            madmin.deleteStream(STREAMENOENT);
            madmin.deleteStream(STREAMENOENTAUTO);
            madmin.deleteStream(STREAMDELETE);
        }
        catch (Exception exception) {
            // empty catch block
        }
        madmin.close();
    }

    @Test
    public void testStreamDelete() throws IOException {
        StreamDescriptor sdesc = Streams.newStreamDescriptor();
        sdesc.setDefaultPartitions(1);
        sdesc.setAutoCreateTopics(false);
        Configuration conf = new Configuration();
        Admin madmin = Streams.newAdmin((Configuration)conf);
        madmin.createStream(STREAMDELETE, sdesc);
        madmin.close();
        KafkaProducer producer = new KafkaProducer(props);
        ProducerRecord record = new ProducerRecord(STREAMDELETE + ":testtopic", Integer.valueOf(0), (Object)key, (Object)value);
        TestCallback callback = new TestCallback(-1, true);
        Future future = producer.send(record, (Callback)callback);
        try {
            future.get();
        }
        catch (Exception e) {
            System.out.println(e);
        }
        Assert.assertTrue((boolean)callback.verify());
        madmin = Streams.newAdmin((Configuration)conf);
        madmin.createTopic(STREAMDELETE, TOPIC, 1);
        madmin.close();
        try {
            Thread.sleep(2100L);
        }
        catch (Exception e) {
            System.out.println(e);
        }
        record = new ProducerRecord(STREAMDELETE + ":testtopic", Integer.valueOf(0), (Object)key, (Object)value);
        callback = new TestCallback(0, false);
        future = producer.send(record, (Callback)callback);
        try {
            future.get();
        }
        catch (Exception e) {
            System.out.println(e);
        }
        Assert.assertTrue((boolean)callback.verify());
        madmin = Streams.newAdmin((Configuration)conf);
        madmin.deleteStream(STREAMDELETE);
        madmin.close();
        record = new ProducerRecord(STREAMDELETE + ":testtopic", Integer.valueOf(0), (Object)key, (Object)value);
        callback = new TestCallback(true);
        future = producer.send(record, (Callback)callback);
        try {
            future.get();
        }
        catch (Exception e) {
            System.out.println("Failed as stream deleted " + e);
        }
        Assert.assertTrue((boolean)callback.verify());
        sdesc = Streams.newStreamDescriptor();
        sdesc.setDefaultPartitions(1);
        sdesc.setAutoCreateTopics(true);
        madmin = Streams.newAdmin((Configuration)conf);
        madmin.createStream(STREAMDELETE, sdesc);
        madmin.close();
        record = new ProducerRecord(STREAMDELETE + ":testtopic", Integer.valueOf(0), (Object)key, (Object)value);
        callback = new TestCallback(false);
        future = producer.send(record, (Callback)callback);
        try {
            future.get();
        }
        catch (Exception e) {
            System.out.println("Failed on new stream " + e);
        }
        try {
            Thread.sleep(2100L);
        }
        catch (Exception e) {
            System.out.println(e);
        }
        record = new ProducerRecord(STREAMDELETE + ":testtopic", Integer.valueOf(0), (Object)key, (Object)value);
        callback = new TestCallback(0, false);
        future = producer.send(record, (Callback)callback);
        try {
            future.get();
        }
        catch (Exception e) {
            System.out.println(e);
        }
        Assert.assertTrue((boolean)callback.verify());
        producer.close();
    }

    @Test
    public void testFuturesAndInvalidTopicNames() throws IOException {
        try {
            ProducerRecord record;
            StreamDescriptor sdesc = Streams.newStreamDescriptor();
            sdesc.setDefaultPartitions(1);
            sdesc.setAutoCreateTopics(true);
            Configuration conf = new Configuration();
            Admin madmin = Streams.newAdmin((Configuration)conf);
            madmin.createStream(STREAMFUTURES, sdesc);
            madmin.close();
            KafkaProducer producer = new KafkaProducer(props);
            String futures = new Future[2];
            for (int i = 0; i < 2; ++i) {
                String key = "key-value" + i;
                String msg = "msg-value" + i;
                record = new ProducerRecord(STREAMFUTURES + ":testtopic", (Object)key.getBytes(), (Object)msg.getBytes());
                futures[i] = producer.send(record);
            }
            producer.flush();
            for (Future future : futures) {
                future.get();
            }
            futures = new Future[5000];
            for (int i = 0; i < 5000; ++i) {
                String key = "key-value" + i;
                String msg = "msg-value" + i;
                record = new ProducerRecord(STREAMFUTURES + ":testtopic", (Object)key.getBytes(), (Object)msg.getBytes());
                futures[i] = producer.send(record);
            }
            producer.flush();
            for (Future future : futures) {
                future.get();
            }
            String clusterName = null;
            try {
                MapRFileSystem fs = new MapRFileSystem();
                fs.initialize(new URI("maprfs:///"), conf);
                MapRFileStatus[] contents = fs.listStatus(new Path("/mapr"));
                clusterName = contents[0].getPath().getName();
            }
            catch (Exception e) {
                System.out.println("Exception " + e + " occurred");
                throw e;
            }
            String fulltopicpath = "/mapr/" + clusterName + STREAMFUTURES + ":fullpath";
            String nonfullpath = STREAMFUTURES + ":fullpath";
            futures = new Future[10000];
            for (int i = 0; i < 5000; ++i) {
                String key = "key-value" + i;
                String msg = "msg-value" + i;
                ProducerRecord record2 = new ProducerRecord(fulltopicpath, (Object)key.getBytes(), (Object)msg.getBytes());
                futures[2 * i] = producer.send(record2);
                record2 = new ProducerRecord(nonfullpath, (Object)key.getBytes(), (Object)msg.getBytes());
                futures[2 * i + 1] = producer.send(record2);
            }
            producer.flush();
            long prevoffset = -1L;
            for (Future future : futures) {
                assert (prevoffset < ((RecordMetadata)future.get()).offset());
                prevoffset = ((RecordMetadata)future.get()).offset();
            }
            Object invalidtopic = STREAMFUTURES + ":@";
            TestCallback testcallback = new TestCallback(-1, true);
            ProducerRecord record3 = new ProducerRecord((String)invalidtopic, Integer.valueOf(0), (Object)"key".getBytes(), (Object)"value".getBytes());
            System.out.println("Sending to invalid topic, " + (String)invalidtopic);
            Future invalidFuture = producer.send(record3, (Callback)testcallback);
            boolean exceptionCaught = false;
            try {
                invalidFuture.get();
            }
            catch (Exception e) {
                exceptionCaught = true;
            }
            Assert.assertTrue((boolean)exceptionCaught);
            Assert.assertTrue((boolean)testcallback.verify());
            invalidtopic = STREAMFUTURES;
            testcallback = new TestCallback(-1, true);
            System.out.println("Sending to invalid topic, " + (String)invalidtopic);
            record3 = new ProducerRecord((String)invalidtopic, Integer.valueOf(0), (Object)"key".getBytes(), (Object)"value".getBytes());
            invalidFuture = producer.send(record3, (Callback)testcallback);
            exceptionCaught = false;
            try {
                invalidFuture.get();
            }
            catch (Exception e) {
                exceptionCaught = true;
            }
            Assert.assertTrue((boolean)exceptionCaught);
            Assert.assertTrue((boolean)testcallback.verify());
            String dottopic = STREAMFUTURES + ":topic.dot";
            testcallback = new TestCallback(0, false);
            System.out.println("Sending to topic with . in its name, " + dottopic);
            record3 = new ProducerRecord(dottopic, Integer.valueOf(0), (Object)"key".getBytes(), (Object)"value".getBytes());
            Future validFuture = producer.send(record3, (Callback)testcallback);
            exceptionCaught = false;
            try {
                validFuture.get();
            }
            catch (Exception e) {
                exceptionCaught = true;
            }
            Assert.assertTrue((!exceptionCaught ? 1 : 0) != 0);
            Assert.assertTrue((boolean)testcallback.verify());
            producer.close();
        }
        catch (Exception e) {
            System.out.println(e);
        }
    }

    @Test
    public void testSendWithKey() throws IOException {
        StreamDescriptor sdesc = Streams.newStreamDescriptor();
        sdesc.setDefaultPartitions(10);
        sdesc.setAutoCreateTopics(true);
        Configuration conf = new Configuration();
        Admin madmin = Streams.newAdmin((Configuration)conf);
        madmin.createStream(STREAMKEY, sdesc);
        madmin.close();
        KafkaProducer producer = new KafkaProducer(props);
        ProducerRecord record = new ProducerRecord(STREAMKEY + ":testtopic", (Object)key, (Object)value);
        TestCallback callback1 = new TestCallback(false);
        Future future1 = producer.send(record, (Callback)callback1);
        try {
            future1.get();
        }
        catch (Exception e) {
            System.out.println(e);
        }
        Assert.assertTrue((boolean)callback1.verify());
        record = new ProducerRecord(STREAMKEY + ":testtopic", (Object)"ab".getBytes(), (Object)value);
        TestCallback callback2 = new TestCallback(false);
        Future future2 = producer.send(record, (Callback)callback2);
        try {
            future2.get();
        }
        catch (Exception e) {
            System.out.println(e);
        }
        Assert.assertTrue((boolean)callback2.verify());
        record = new ProducerRecord(STREAMKEY + ":testtopic", (Object)key, (Object)value);
        TestCallback callback3 = new TestCallback(callback1.partitionid(), false);
        Future future3 = producer.send(record, (Callback)callback3);
        try {
            future3.get();
        }
        catch (Exception e) {
            System.out.println(e);
        }
        Assert.assertTrue((boolean)callback3.verify());
        record = new ProducerRecord(STREAMKEY + ":testtopic", (Object)"ab".getBytes(), (Object)value);
        TestCallback callback4 = new TestCallback(callback2.partitionid(), false);
        Future future4 = producer.send(record, (Callback)callback4);
        try {
            future4.get();
        }
        catch (Exception e) {
            System.out.println(e);
        }
        Assert.assertTrue((boolean)callback4.verify());
        producer.close();
        producer.flush();
        producer.close();
    }

    @Test
    public void testSendMessageAutoCreate() throws IOException {
        StreamDescriptor sdesc = Streams.newStreamDescriptor();
        sdesc.setDefaultPartitions(1);
        sdesc.setAutoCreateTopics(true);
        Configuration conf = new Configuration();
        Admin madmin = Streams.newAdmin((Configuration)conf);
        madmin.createStream(STREAMAUTO, sdesc);
        madmin.close();
        KafkaProducer producer = new KafkaProducer(props);
        producer.close();
        ProducerRecord record = new ProducerRecord(STREAMAUTO + ":testtopic", Integer.valueOf(0), (Object)key, (Object)value);
        Future future = producer.send(record);
        boolean exceptionCaught = false;
        try {
            future.get();
        }
        catch (Exception e) {
            System.out.println(e);
            exceptionCaught = true;
        }
        Assert.assertTrue((boolean)exceptionCaught);
        producer = new KafkaProducer(props);
        record = new ProducerRecord(STREAMAUTO + ":testtopic", Integer.valueOf(0), (Object)key, (Object)value);
        TestCallback callback = new TestCallback(0, false);
        future = producer.send(record, (Callback)callback);
        try {
            future.get();
        }
        catch (Exception e) {
            System.out.println(e);
        }
        Assert.assertTrue((boolean)callback.verify());
        List partList = producer.partitionsFor(STREAMAUTO + ":testtopic");
        Assert.assertTrue((partList.size() == 1 ? 1 : 0) != 0);
        Assert.assertTrue((((PartitionInfo)partList.get(0)).partition() == 0 ? 1 : 0) != 0);
        record = new ProducerRecord(STREAMAUTO + ":testtopic", (Object)key, (Object)value);
        callback = new TestCallback(0, false);
        future = producer.send(record, (Callback)callback);
        try {
            future.get();
        }
        catch (Exception e) {
            System.out.println(e);
        }
        Assert.assertTrue((boolean)callback.verify());
        record = new ProducerRecord(STREAMAUTO + ":testtopic", Integer.valueOf(1), (Object)key, (Object)value);
        callback = new TestCallback(-1, true);
        future = producer.send(record, (Callback)callback);
        try {
            future.get();
        }
        catch (Exception e) {
            System.out.println(e);
        }
        Assert.assertTrue((boolean)callback.verify());
        record = new ProducerRecord(STREAMAUTO + ":testtopic", Integer.valueOf(1), (Object)key, null);
        callback = new TestCallback(-1, true);
        future = producer.send(record, (Callback)callback);
        try {
            future.get();
        }
        catch (Exception e) {
            System.out.println(e);
        }
        Assert.assertTrue((boolean)callback.verify());
        producer.close();
        producer.flush();
        producer.close();
        record = new ProducerRecord(STREAMAUTO + ":testtopic", Integer.valueOf(1), (Object)key, null);
        callback = new TestCallback(-1, true);
        exceptionCaught = false;
        future = producer.send(record, (Callback)callback);
        try {
            future.get();
        }
        catch (Exception e) {
            System.out.println(e);
            exceptionCaught = true;
        }
        Assert.assertTrue((boolean)exceptionCaught);
    }

    @Test
    public void testSendMessageNoAutoCreate() throws IOException {
        KafkaProducer producer = new KafkaProducer(props);
        ProducerRecord record = new ProducerRecord(STREAMNA + ":testtopic", Integer.valueOf(0), (Object)key, (Object)value);
        TestCallback callback = new TestCallback(-1, true);
        Future future = producer.send(record, (Callback)callback);
        try {
            future.get();
        }
        catch (Exception e) {
            System.out.println(e);
        }
        Assert.assertTrue((boolean)callback.verify());
        StreamDescriptor sdesc = Streams.newStreamDescriptor();
        sdesc.setDefaultPartitions(1);
        sdesc.setAutoCreateTopics(false);
        Configuration conf = new Configuration();
        Admin madmin = Streams.newAdmin((Configuration)conf);
        madmin.createStream(STREAMNA, sdesc);
        madmin.close();
        record = new ProducerRecord(STREAMNA + ":testtopic", Integer.valueOf(0), (Object)key, (Object)value);
        callback = new TestCallback(-1, true);
        future = producer.send(record, (Callback)callback);
        try {
            future.get();
        }
        catch (Exception e) {
            System.out.println(e);
        }
        Assert.assertTrue((boolean)callback.verify());
        record = new ProducerRecord(STREAMNA + ":testtopic", (Object)key, (Object)value);
        callback = new TestCallback(-1, true);
        future = producer.send(record, (Callback)callback);
        try {
            future.get();
        }
        catch (Exception e) {
            System.out.println(e);
        }
        Assert.assertTrue((boolean)callback.verify());
        producer.close();
        producer.flush();
        producer.close();
    }

    @Test
    public void testMetadataRefresher() throws IOException {
        StreamDescriptor sdesc = Streams.newStreamDescriptor();
        sdesc.setDefaultPartitions(1);
        sdesc.setAutoCreateTopics(false);
        Configuration conf = new Configuration();
        Admin madmin = Streams.newAdmin((Configuration)conf);
        madmin.createStream(STREAMMETA, sdesc);
        madmin.close();
        KafkaProducer producer = new KafkaProducer(props);
        ProducerRecord record = new ProducerRecord(STREAMMETA + ":testtopic", Integer.valueOf(1), (Object)key, (Object)value);
        TestCallback callback = new TestCallback(-1, true);
        Future future = producer.send(record, (Callback)callback);
        try {
            future.get();
        }
        catch (Exception e) {
            System.out.println(e);
        }
        Assert.assertTrue((boolean)callback.verify());
        madmin = Streams.newAdmin((Configuration)conf);
        madmin.createTopic(STREAMMETA, TOPIC, 1);
        madmin.close();
        callback = new TestCallback(-1, true);
        record = new ProducerRecord(STREAMMETA + ":testtopic", Integer.valueOf(1), (Object)key, (Object)value);
        future = producer.send(record, (Callback)callback);
        try {
            future.get();
        }
        catch (Exception e) {
            System.out.println(e);
        }
        Assert.assertTrue((boolean)callback.verify());
        madmin = Streams.newAdmin((Configuration)conf);
        madmin.editTopic(STREAMMETA, TOPIC, 3);
        madmin.close();
        try {
            Thread.sleep(2100L);
        }
        catch (Exception e) {
            System.out.println(e);
        }
        callback = new TestCallback(1, false);
        record = new ProducerRecord(STREAMMETA + ":testtopic", Integer.valueOf(1), (Object)key, (Object)value);
        future = producer.send(record, (Callback)callback);
        try {
            future.get();
        }
        catch (Exception e) {
            System.out.println(e);
        }
        Assert.assertTrue((boolean)callback.verify());
        madmin = Streams.newAdmin((Configuration)conf);
        madmin.deleteTopic(STREAMMETA, TOPIC);
        madmin.close();
        try {
            Thread.sleep(2100L);
        }
        catch (Exception e) {
            System.out.println(e);
        }
        callback = new TestCallback(-1, true);
        record = new ProducerRecord(STREAMMETA + ":testtopic", Integer.valueOf(1), (Object)key, (Object)value);
        future = producer.send(record, (Callback)callback);
        try {
            future.get();
        }
        catch (Exception e) {
            System.out.println(e);
        }
        Assert.assertTrue((boolean)callback.verify());
        madmin = Streams.newAdmin((Configuration)conf);
        madmin.createTopic(STREAMMETA, TOPIC, 2);
        madmin.close();
        callback = new TestCallback(1, false);
        record = new ProducerRecord(STREAMMETA + ":testtopic", Integer.valueOf(1), (Object)key, (Object)value);
        future = producer.send(record, (Callback)callback);
        try {
            future.get();
        }
        catch (Exception e) {
            System.out.println(e);
        }
        Assert.assertTrue((boolean)callback.verify());
        callback = new TestCallback(-1, true);
        record = new ProducerRecord(STREAMMETA + ":testtopic", Integer.valueOf(2), (Object)key, (Object)value);
        future = producer.send(record, (Callback)callback);
        try {
            future.get();
        }
        catch (Exception e) {
            System.out.println(e);
        }
        Assert.assertTrue((boolean)callback.verify());
        producer.close();
        producer.flush();
        producer.close();
    }

    @Test
    public void testENOENTWithoutAutoCreateFromServer() throws IOException {
        StreamDescriptor sdesc = Streams.newStreamDescriptor();
        sdesc.setDefaultPartitions(1);
        sdesc.setAutoCreateTopics(false);
        Configuration conf = new Configuration();
        Admin madmin = Streams.newAdmin((Configuration)conf);
        madmin.createStream(STREAMENOENT, sdesc);
        madmin.close();
        Marlinserver.MarlinConfigDefaults cdef = Marlinserver.MarlinConfigDefaults.getDefaultInstance();
        Properties myprops = new Properties();
        myprops.put("key.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
        myprops.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
        myprops.put(cdef.getParallelFlushersPerPartition(), (Object)true);
        myprops.put(cdef.getMetadataMaxAge(), (Object)300000);
        myprops.put(cdef.getBufferTime(), (Object)3000);
        madmin = Streams.newAdmin((Configuration)conf);
        madmin.createTopic(STREAMENOENT, TOPIC, 1);
        madmin.close();
        KafkaProducer producer = new KafkaProducer(myprops);
        ProducerRecord record = new ProducerRecord(STREAMENOENT + ":testtopic", Integer.valueOf(0), (Object)key, (Object)value);
        TestCallback callback = new TestCallback(0, false);
        Future future = producer.send(record, (Callback)callback);
        try {
            future.get();
        }
        catch (Exception e) {
            System.out.println(e);
        }
        Assert.assertTrue((boolean)callback.verify());
        madmin = Streams.newAdmin((Configuration)conf);
        madmin.deleteTopic(STREAMENOENT, TOPIC);
        madmin.close();
        try {
            Thread.sleep(2100L);
        }
        catch (Exception e) {
            System.out.println(e);
        }
        callback = new TestCallback(0, true);
        record = new ProducerRecord(STREAMENOENT + ":testtopic", Integer.valueOf(0), (Object)key, (Object)value);
        future = producer.send(record, (Callback)callback);
        try {
            future.get();
        }
        catch (Exception e) {
            System.out.println(e);
        }
        Assert.assertTrue((boolean)callback.verify());
        try {
            Thread.sleep(2100L);
        }
        catch (Exception e) {
            System.out.println(e);
        }
        boolean exceptionCaught = false;
        Marlinserver.MarlinTopicMetaEntry topicEntry = null;
        madmin = Streams.newAdmin((Configuration)conf);
        try {
            MarlinAdminImpl admin = (MarlinAdminImpl)madmin;
            topicEntry = admin.getTopicMetaEntry(STREAMENOENT + ":testtopic");
        }
        catch (Exception e) {
            exceptionCaught = true;
        }
        madmin.close();
        Assert.assertTrue((exceptionCaught || topicEntry.getIsDeleted() ? 1 : 0) != 0);
        callback = new TestCallback(-1, true);
        record = new ProducerRecord(STREAMENOENT + ":testtopic", Integer.valueOf(0), (Object)key, (Object)value);
        future = producer.send(record, (Callback)callback);
        try {
            future.get();
        }
        catch (Exception e) {
            System.out.println(e);
        }
        Assert.assertTrue((boolean)callback.verify());
        producer.flush();
        producer.close();
    }

    @Test
    public void testENOENTWithAutoCreateFromServer() throws IOException {
        StreamDescriptor sdesc = Streams.newStreamDescriptor();
        sdesc.setDefaultPartitions(1);
        sdesc.setAutoCreateTopics(true);
        Configuration conf = new Configuration();
        Admin madmin = Streams.newAdmin((Configuration)conf);
        madmin.createStream(STREAMENOENTAUTO, sdesc);
        madmin.close();
        Marlinserver.MarlinConfigDefaults cdef = Marlinserver.MarlinConfigDefaults.getDefaultInstance();
        Properties myprops = new Properties();
        myprops.put("key.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
        myprops.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
        myprops.put(cdef.getParallelFlushersPerPartition(), (Object)true);
        myprops.put(cdef.getMetadataMaxAge(), (Object)300000);
        myprops.put(cdef.getBufferTime(), (Object)3000);
        KafkaProducer producer = new KafkaProducer(myprops);
        ProducerRecord record = new ProducerRecord(STREAMENOENTAUTO + ":testtopic", Integer.valueOf(0), (Object)key, (Object)value);
        TestCallback callback = new TestCallback(0, false);
        Future future = producer.send(record, (Callback)callback);
        try {
            future.get();
        }
        catch (Exception e) {
            System.out.println(e);
        }
        Assert.assertTrue((boolean)callback.verify());
        boolean exceptionCaught = false;
        Marlinserver.MarlinTopicMetaEntry topicEntryFirst = null;
        madmin = Streams.newAdmin((Configuration)conf);
        MarlinAdminImpl admin = (MarlinAdminImpl)madmin;
        try {
            topicEntryFirst = admin.getTopicMetaEntry(STREAMENOENTAUTO + ":testtopic");
        }
        catch (Exception e) {
            exceptionCaught = true;
        }
        madmin.close();
        Assert.assertFalse((boolean)exceptionCaught);
        madmin = Streams.newAdmin((Configuration)conf);
        madmin.deleteTopic(STREAMENOENTAUTO, TOPIC);
        madmin.close();
        try {
            Thread.sleep(2100L);
        }
        catch (Exception e) {
            System.out.println(e);
        }
        callback = new TestCallback(0, true);
        record = new ProducerRecord(STREAMENOENTAUTO + ":testtopic", Integer.valueOf(0), (Object)key, (Object)value);
        future = producer.send(record, (Callback)callback);
        try {
            future.get();
        }
        catch (Exception e) {
            System.out.println(e);
        }
        Assert.assertTrue((boolean)callback.verify());
        try {
            Thread.sleep(2100L);
        }
        catch (Exception e) {
            System.out.println(e);
        }
        callback = new TestCallback(0, false);
        record = new ProducerRecord(STREAMENOENTAUTO + ":testtopic", Integer.valueOf(0), (Object)key, (Object)value);
        future = producer.send(record, (Callback)callback);
        try {
            future.get();
        }
        catch (Exception e) {
            System.out.println(e);
        }
        Assert.assertTrue((boolean)callback.verify());
        exceptionCaught = false;
        Marlinserver.MarlinTopicMetaEntry topicEntrySecond = null;
        madmin = Streams.newAdmin((Configuration)conf);
        try {
            MarlinAdminImpl iadmin = (MarlinAdminImpl)madmin;
            topicEntrySecond = iadmin.getTopicMetaEntry(STREAMENOENTAUTO + ":testtopic");
        }
        catch (Exception e) {
            exceptionCaught = true;
        }
        madmin.close();
        Assert.assertFalse((boolean)exceptionCaught);
        Assert.assertTrue((topicEntryFirst.getTopicUniq() != topicEntrySecond.getTopicUniq() ? 1 : 0) != 0);
        producer.flush();
        producer.close();
    }

    static {
        msgValueLength = 200;
        value = new byte[msgValueLength];
        key = "abc".getBytes();
    }

    private static final class TestCallback
    implements Callback {
        private boolean error;
        private int expectedFeedID;
        private Exception exceptionReceived;
        private RecordMetadata metadataReceived;
        private boolean callbackCompleted;
        private boolean checkfeedID;

        public TestCallback(boolean errors) {
            this.checkfeedID = false;
            this.error = errors;
            this.callbackCompleted = false;
        }

        public TestCallback(int feedid, boolean errors) {
            this.checkfeedID = true;
            this.expectedFeedID = feedid;
            this.error = errors;
            this.callbackCompleted = false;
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        public void onCompletion(RecordMetadata metadata, Exception exception) {
            this.exceptionReceived = exception;
            this.metadataReceived = metadata;
            TestCallback testCallback = this;
            synchronized (testCallback) {
                this.callbackCompleted = true;
                try {
                    this.notifyAll();
                }
                catch (Exception e) {
                    System.out.println(e);
                }
            }
        }

        public int partitionid() {
            return this.metadataReceived.partition();
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        public boolean verify() {
            TestCallback testCallback = this;
            synchronized (testCallback) {
                if (!this.callbackCompleted) {
                    try {
                        this.wait();
                    }
                    catch (Exception e) {
                        System.out.println(e);
                    }
                }
            }
            boolean verified = true;
            if (this.error && this.exceptionReceived == null) {
                System.out.println("Did not get exception when expected");
                verified = false;
            } else if (!this.error && this.exceptionReceived != null) {
                System.out.println("Received exception " + this.exceptionReceived + " but expected none");
                verified = false;
            }
            if (this.checkfeedID && this.expectedFeedID != this.metadataReceived.partition()) {
                System.out.println("Received partition " + this.metadataReceived.partition() + " but expected " + this.expectedFeedID);
                verified = false;
            }
            return verified;
        }
    }
}

