/*
 * Decompiled with CFR 0.152.
 */
package com.mapr.streams.tests.producer;

import com.mapr.fs.proto.Marlinserver;
import com.mapr.streams.Admin;
import com.mapr.streams.StreamDescriptor;
import com.mapr.streams.Streams;
import com.mapr.tests.BaseTest;
import com.mapr.tests.annotations.ClusterTest;
import java.io.IOException;
import java.util.Properties;
import java.util.concurrent.Future;
import org.apache.hadoop.conf.Configuration;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Category(value={ClusterTest.class})
public class ProducerPartitionerTest
extends BaseTest {
    /** SLF4J logger for this test class. */
    private static final Logger _logger = LoggerFactory.getLogger(ProducerPartitionerTest.class);
    /** Path of the stream that is created before and deleted after every test. */
    private static final String STREAM = "/jtest-" + ProducerPartitionerTest.class.getSimpleName();
    /** Topic name; the tests send to the full topic name {@code STREAM + ":testtopic"}. */
    private static final String TOPIC = "testtopic";
    /** Admin handle, created once in {@code setupTest()} and used to create/delete the stream. */
    private static Admin madmin;
    /** Never assigned in this class; every test method builds its own local producer. */
    private static KafkaProducer producer;
    /** Producer configuration; re-created at the start of every test method. */
    private static Properties props;
    /** Length in bytes of the record value payload (set to 200 in the static initializer). */
    public static int msgValueLength;
    /** Record value used by the byte-array tests: a zero-filled array of {@code msgValueLength} bytes. */
    public static final byte[] value;
    /** Record key used by the byte-array tests: the bytes of {@code "abc"} in the platform default charset. */
    public static final byte[] key;
    /** Default partition count applied to the test stream (set to 10 in the static initializer). */
    public static int numPartitions;

    /**
     * Creates the shared {@link Admin} handle and makes a best-effort attempt to delete
     * the test stream before any test runs.
     *
     * <p>A failure to delete is not fatal (presumably the stream simply does not exist
     * yet), but it is logged rather than silently dropped so that genuine cluster
     * problems remain visible.
     *
     * @throws Exception if the admin handle cannot be created
     */
    @BeforeClass
    public static void setupTest() throws Exception {
        Configuration conf = new Configuration();
        madmin = Streams.newAdmin(conf);
        try {
            madmin.deleteStream(STREAM);
        }
        catch (Exception e) {
            // Deliberately non-fatal: the per-test setup creates the stream afterwards.
            _logger.debug("Ignoring failure to delete stream {} before the tests: {}", STREAM, e.toString());
        }
    }

    /**
     * Creates the test stream before each test, with {@code numPartitions} default
     * partitions and automatic topic creation enabled.
     *
     * @throws Exception if the stream cannot be created
     */
    @Before
    public void setupTable() throws Exception {
        final StreamDescriptor descriptor = Streams.newStreamDescriptor();
        // Topics are never created explicitly by the tests; they rely on auto-creation.
        descriptor.setDefaultPartitions(ProducerPartitionerTest.numPartitions);
        descriptor.setAutoCreateTopics(true);
        ProducerPartitionerTest.madmin.createStream(STREAM, descriptor);
    }

    /**
     * Deletes the test stream after each test; {@code setupTable()} creates it again
     * before the next one.
     *
     * @throws Exception if the admin client fails to delete the stream
     */
    @After
    public void cleanupTest() throws Exception {
        madmin.deleteStream(STREAM);
    }

    /**
     * Sends three records without configuring a partitioner and checks their placement.
     * <ol>
     *   <li>A record with explicit partition 1 must be acknowledged on partition 1.</li>
     *   <li>A keyed record without a partition must be acknowledged without error.</li>
     *   <li>A second record with the same key must land on the partition reported
     *       for the previous record.</li>
     * </ol>
     * The producer is always closed, even when an assertion fails.
     *
     * @throws IOException declared for compatibility with the original signature
     */
    @Test
    public void testSendWithDefaultPartitioner() throws IOException {
        Marlinserver.MarlinConfigDefaults cdef = Marlinserver.MarlinConfigDefaults.getDefaultInstance();
        props = new Properties();
        props.put("key.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
        props.put(cdef.getParallelFlushersPerPartition(), true);
        props.put(cdef.getMetadataMaxAge(), 100);
        props.put(cdef.getBufferTime(), 500);
        final String topic = STREAM + ":" + TOPIC;
        KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(props);
        try {
            // 1) Explicit partition: the record must be stored on partition 1.
            ProducerRecord<byte[], byte[]> record =
                new ProducerRecord<>(topic, Integer.valueOf(1), key, value);
            TestCallback callback1 = new TestCallback(1, false);
            Future<RecordMetadata> future1 = producer.send(record, callback1);
            try {
                future1.get();
            }
            catch (Exception e) {
                // The callback is the arbiter of success; only record the failure here.
                _logger.warn("Send with explicit partition failed", e);
            }
            Assert.assertTrue(callback1.verify());

            // 2) Key only: any partition is acceptable, but no error may occur.
            record = new ProducerRecord<>(topic, key, value);
            TestCallback callback2 = new TestCallback(false);
            Future<RecordMetadata> future2 = producer.send(record, callback2);
            try {
                future2.get();
            }
            catch (Exception e) {
                _logger.warn("First keyed send failed", e);
            }
            Assert.assertTrue(callback2.verify());

            // 3) Same key again: must map to the partition chosen for the previous send.
            record = new ProducerRecord<>(topic, key, value);
            TestCallback callback3 = new TestCallback(callback2.partitionid(), false);
            Future<RecordMetadata> future3 = producer.send(record, callback3);
            try {
                future3.get();
            }
            catch (Exception e) {
                _logger.warn("Second keyed send failed", e);
            }
            Assert.assertTrue(callback3.verify());

            // NOTE(review): the original ended with close(); flush(); close(). That call
            // sequence is preserved on the success path (the final close() runs in the
            // finally block) -- confirm whether flush-after-close is intentional.
            producer.close();
            producer.flush();
        }
        finally {
            // Release the producer even when an assertion above fails.
            producer.close();
        }
    }

    /**
     * Configures {@code com.mapr.streams.tests.producer.TestPartitioner} as the producer's
     * partitioner and checks the partitions reported for a series of sends.
     *
     * <p>For {@code i} from 1 to {@code numPartitions - 1} the i-th send must be
     * acknowledged on partition {@code i} without error. One further send is then
     * expected to complete with an error and report partition -1. (Presumably the
     * partitioner hands out increasing ids and finally an invalid one -- confirm
     * against {@code TestPartitioner}.)
     *
     * <p>The producer is always closed, even when an assertion fails.
     *
     * @throws IOException declared for compatibility with the original signature
     */
    @Test
    public void testSendWithCustomPartitioner() throws IOException {
        Marlinserver.MarlinConfigDefaults cdef = Marlinserver.MarlinConfigDefaults.getDefaultInstance();
        props = new Properties();
        props.put("key.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
        props.put(cdef.getParallelFlushersPerPartition(), true);
        props.put(cdef.getMetadataMaxAge(), 100);
        props.put(cdef.getBufferTime(), 500);
        props.put(cdef.getPartitioner(), "com.mapr.streams.tests.producer.TestPartitioner");
        final String topic = STREAM + ":" + TOPIC;
        KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(props);
        try {
            for (int i = 1; i < numPartitions; ++i) {
                ProducerRecord<byte[], byte[]> record = new ProducerRecord<>(topic, key, value);
                TestCallback callback = new TestCallback(i, false);
                Future<RecordMetadata> future = producer.send(record, callback);
                try {
                    future.get();
                }
                catch (Exception e) {
                    // The callback is the arbiter of success; only record the failure here.
                    _logger.warn("Send #" + i + " failed", e);
                }
                Assert.assertTrue(callback.verify());
            }

            // The next send is expected to fail: error flag set, partition -1.
            ProducerRecord<byte[], byte[]> record = new ProducerRecord<>(topic, key, value);
            TestCallback callback = new TestCallback(-1, true);
            Future<RecordMetadata> future = producer.send(record, callback);
            try {
                future.get();
            }
            catch (Exception e) {
                // An exception is the anticipated outcome for this record.
                _logger.info("Final send completed exceptionally: {}", e.toString());
            }
            Assert.assertTrue(callback.verify());

            // NOTE(review): the original ended with close(); flush(); close(). That call
            // sequence is preserved on the success path (the final close() runs in the
            // finally block) -- confirm whether flush-after-close is intentional.
            producer.close();
            producer.flush();
        }
        finally {
            // Release the producer even when an assertion above fails.
            producer.close();
        }
    }

    /**
     * Configures {@code com.mapr.streams.tests.producer.KeyBasedPartitioner} and sends
     * 1000 records for each of two string keys, interleaved.
     *
     * <p>After flushing and closing the producer, every record with key {@code "abc"}
     * must be reported on partition 0 and every record with key {@code "123456789"} on
     * partition 1, and within each key the offsets must be strictly increasing in
     * send order.
     *
     * @throws IOException declared for compatibility with the original signature
     */
    @Test
    public void testSendWithKeyBasedPartitioner() throws IOException {
        Marlinserver.MarlinConfigDefaults cdef = Marlinserver.MarlinConfigDefaults.getDefaultInstance();
        props = new Properties();
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put(cdef.getParallelFlushersPerPartition(), true);
        props.put(cdef.getMetadataMaxAge(), 100);
        props.put(cdef.getBufferTime(), 500);
        props.put(cdef.getPartitioner(), "com.mapr.streams.tests.producer.KeyBasedPartitioner");

        final int numMsgsPerKey = 1000;
        final String topic = STREAM + ":" + TOPIC;
        final String keyA = "abc";
        final String keyB = "123456789";
        // Named "payload" so it does not shadow the static byte[] field "value".
        final String payload = "this is a value";
        Future<?>[] futuresA = new Future<?>[numMsgsPerKey];
        Future<?>[] futuresB = new Future<?>[numMsgsPerKey];

        KafkaProducer<String, String> producer = new KafkaProducer<>(props);
        try {
            for (int i = 0; i < numMsgsPerKey; ++i) {
                ProducerRecord<String, String> recordA = new ProducerRecord<>(topic, keyA, payload);
                ProducerRecord<String, String> recordB = new ProducerRecord<>(topic, keyB, payload);
                futuresA[i] = producer.send(recordA);
                futuresB[i] = producer.send(recordB);
            }
            producer.flush();
        }
        finally {
            // Close the producer even if a send or the flush throws.
            producer.close();
        }

        long lastOffsetA = -1L;
        long lastOffsetB = -1L;
        final int feedIDA = 0;
        final int feedIDB = 1;
        try {
            for (int i = 0; i < numMsgsPerKey; ++i) {
                RecordMetadata metaA = (RecordMetadata) futuresA[i].get();
                Assert.assertTrue("offsets for key " + keyA + " must be strictly increasing",
                    lastOffsetA < metaA.offset());
                Assert.assertEquals("partition for key " + keyA, feedIDA, metaA.partition());
                lastOffsetA = metaA.offset();

                RecordMetadata metaB = (RecordMetadata) futuresB[i].get();
                Assert.assertTrue("offsets for key " + keyB + " must be strictly increasing",
                    lastOffsetB < metaB.offset());
                Assert.assertEquals("partition for key " + keyB, feedIDB, metaB.partition());
                lastOffsetB = metaB.offset();
            }
        }
        catch (Exception e) {
            if (e instanceof InterruptedException) {
                // Preserve the interrupt status for the test runner.
                Thread.currentThread().interrupt();
            }
            _logger.error("Failed to obtain the result of a send", e);
            Assert.fail("Failed to obtain the result of a send: " + e);
        }
    }

    // Initializes the shared test parameters: stream partition count, record key,
    // and a zero-filled record value of msgValueLength bytes.
    static {
        numPartitions = 10;
        key = "abc".getBytes();
        // The length must be assigned before the payload array is sized from it.
        msgValueLength = 200;
        value = new byte[msgValueLength];
    }

    private static final class TestCallback
    implements Callback {
        private boolean error;
        private int expectedFeedID;
        private Exception exceptionReceived;
        private RecordMetadata metadataReceived;
        private boolean callbackCompleted;
        private boolean checkfeedID;

        public TestCallback(boolean errors) {
            this.checkfeedID = false;
            this.error = errors;
            this.callbackCompleted = false;
        }

        public TestCallback(int feedid, boolean errors) {
            this.checkfeedID = true;
            this.expectedFeedID = feedid;
            this.error = errors;
            this.callbackCompleted = false;
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        public void onCompletion(RecordMetadata metadata, Exception exception) {
            this.exceptionReceived = exception;
            this.metadataReceived = metadata;
            TestCallback testCallback = this;
            synchronized (testCallback) {
                this.callbackCompleted = true;
                try {
                    this.notifyAll();
                }
                catch (Exception e) {
                    System.out.println(e);
                }
            }
        }

        public int partitionid() {
            return this.metadataReceived.partition();
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        public boolean verify() {
            TestCallback testCallback = this;
            synchronized (testCallback) {
                if (!this.callbackCompleted) {
                    try {
                        this.wait();
                    }
                    catch (Exception e) {
                        System.out.println(e);
                    }
                }
            }
            boolean verified = true;
            if (this.error && this.exceptionReceived == null) {
                System.out.println("Did not get exception when expected");
                verified = false;
            } else if (!this.error && this.exceptionReceived != null) {
                System.out.println("Received exception " + this.exceptionReceived + " but expected none");
                verified = false;
            }
            if (this.checkfeedID && this.expectedFeedID != this.metadataReceived.partition()) {
                System.out.println("Received partition " + this.metadataReceived.partition() + " but expected " + this.expectedFeedID);
                verified = false;
            }
            return verified;
        }
    }
}

