package com.mapr.streams.tests.producer;

import com.mapr.fs.proto.Marlinserver;
import com.mapr.streams.Admin;
import com.mapr.streams.StreamDescriptor;
import com.mapr.streams.Streams;
import com.mapr.tests.BaseTest;
import com.mapr.tests.annotations.ClusterTest;
import java.io.IOException;
import java.util.Properties;
import java.util.concurrent.Future;
import org.apache.hadoop.conf.Configuration;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Category({ClusterTest.class})
/* loaded from: input_file:com/mapr/streams/tests/producer/ProducerPartitionerTest.class */
public class ProducerPartitionerTest extends BaseTest {
    private static final String TOPIC = "testtopic";
    private static Admin madmin;
    private static KafkaProducer producer;
    private static Properties props;
    private static final Logger _logger = LoggerFactory.getLogger(ProducerPartitionerTest.class);
    private static final String STREAM = "/jtest-" + ProducerPartitionerTest.class.getSimpleName();
    public static int msgValueLength = 200;
    public static final byte[] value = new byte[msgValueLength];
    public static final byte[] key = "abc".getBytes();
    public static int numPartitions = 10;

    /* loaded from: input_file:com/mapr/streams/tests/producer/ProducerPartitionerTest$TestCallback.class */
    private static final class TestCallback implements Callback {
        private boolean error;
        private int expectedFeedID;
        private Exception exceptionReceived;
        private RecordMetadata metadataReceived;
        private boolean checkfeedID = false;
        private boolean callbackCompleted = false;

        public TestCallback(boolean z) {
            this.error = z;
        }

        public TestCallback(int i, boolean z) {
            this.expectedFeedID = i;
            this.error = z;
        }

        public void onCompletion(RecordMetadata recordMetadata, Exception exc) {
            this.exceptionReceived = exc;
            this.metadataReceived = recordMetadata;
            synchronized (this) {
                this.callbackCompleted = true;
                try {
                    notifyAll();
                } catch (Exception e) {
                    System.out.println(e);
                }
            }
        }

        public int partitionid() {
            return this.metadataReceived.partition();
        }

        public boolean verify() {
            synchronized (this) {
                if (!this.callbackCompleted) {
                    try {
                        wait();
                    } catch (Exception e) {
                        System.out.println(e);
                    }
                }
            }
            boolean z = true;
            if (this.error && this.exceptionReceived == null) {
                System.out.println("Did not get exception when expected");
                z = false;
            } else if (!this.error && this.exceptionReceived != null) {
                System.out.println("Received exception " + this.exceptionReceived + " but expected none");
                z = false;
            }
            if (this.checkfeedID && this.expectedFeedID != this.metadataReceived.partition()) {
                System.out.println("Received partition " + this.metadataReceived.partition() + " but expected " + this.expectedFeedID);
                z = false;
            }
            return z;
        }
    }

    @BeforeClass
    public static void setupTest() throws Exception {
        madmin = Streams.newAdmin(new Configuration());
        try {
            madmin.deleteStream(STREAM);
        } catch (Exception e) {
        }
    }

    @Before
    public void setupTable() throws Exception {
        StreamDescriptor newStreamDescriptor = Streams.newStreamDescriptor();
        newStreamDescriptor.setDefaultPartitions(numPartitions);
        newStreamDescriptor.setAutoCreateTopics(true);
        madmin.createStream(STREAM, newStreamDescriptor);
    }

    @After
    public void cleanupTest() throws Exception {
        madmin.deleteStream(STREAM);
    }

    @Test
    public void testSendWithDefaultPartitioner() throws IOException {
        Marlinserver.MarlinConfigDefaults defaultInstance = Marlinserver.MarlinConfigDefaults.getDefaultInstance();
        props = new Properties();
        props.put("key.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
        props.put(defaultInstance.getParallelFlushersPerPartition(), true);
        props.put(defaultInstance.getMetadataMaxAge(), 100);
        props.put(defaultInstance.getBufferTime(), 500);
        KafkaProducer kafkaProducer = new KafkaProducer(props);
        ProducerRecord producerRecord = new ProducerRecord(STREAM + ":" + TOPIC, 1, key, value);
        TestCallback testCallback = new TestCallback(1, false);
        try {
            kafkaProducer.send(producerRecord, testCallback).get();
        } catch (Exception e) {
            System.out.println(e);
        }
        Assert.assertTrue(testCallback.verify());
        ProducerRecord producerRecord2 = new ProducerRecord(STREAM + ":" + TOPIC, key, value);
        TestCallback testCallback2 = new TestCallback(false);
        try {
            kafkaProducer.send(producerRecord2, testCallback2).get();
        } catch (Exception e2) {
            System.out.println(e2);
        }
        Assert.assertTrue(testCallback2.verify());
        ProducerRecord producerRecord3 = new ProducerRecord(STREAM + ":" + TOPIC, key, value);
        TestCallback testCallback3 = new TestCallback(testCallback2.partitionid(), false);
        try {
            kafkaProducer.send(producerRecord3, testCallback3).get();
        } catch (Exception e3) {
            System.out.println(e3);
        }
        Assert.assertTrue(testCallback3.verify());
        kafkaProducer.close();
        kafkaProducer.flush();
        kafkaProducer.close();
    }

    @Test
    public void testSendWithCustomPartitioner() throws IOException {
        Marlinserver.MarlinConfigDefaults defaultInstance = Marlinserver.MarlinConfigDefaults.getDefaultInstance();
        props = new Properties();
        props.put("key.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
        props.put(defaultInstance.getParallelFlushersPerPartition(), true);
        props.put(defaultInstance.getMetadataMaxAge(), 100);
        props.put(defaultInstance.getBufferTime(), 500);
        props.put(defaultInstance.getPartitioner(), "com.mapr.streams.tests.producer.TestPartitioner");
        KafkaProducer kafkaProducer = new KafkaProducer(props);
        for (int i = 1; i < numPartitions; i++) {
            ProducerRecord producerRecord = new ProducerRecord(STREAM + ":" + TOPIC, key, value);
            TestCallback testCallback = new TestCallback(i, false);
            try {
                kafkaProducer.send(producerRecord, testCallback).get();
            } catch (Exception e) {
                System.out.println(e);
            }
            Assert.assertTrue(testCallback.verify());
        }
        ProducerRecord producerRecord2 = new ProducerRecord(STREAM + ":" + TOPIC, key, value);
        TestCallback testCallback2 = new TestCallback(-1, true);
        try {
            kafkaProducer.send(producerRecord2, testCallback2).get();
        } catch (Exception e2) {
            System.out.println(e2);
        }
        Assert.assertTrue(testCallback2.verify());
        kafkaProducer.close();
        kafkaProducer.flush();
        kafkaProducer.close();
    }

    @Test
    public void testSendWithKeyBasedPartitioner() throws IOException {
        Marlinserver.MarlinConfigDefaults defaultInstance = Marlinserver.MarlinConfigDefaults.getDefaultInstance();
        props = new Properties();
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put(defaultInstance.getParallelFlushersPerPartition(), true);
        props.put(defaultInstance.getMetadataMaxAge(), 100);
        props.put(defaultInstance.getBufferTime(), 500);
        props.put(defaultInstance.getPartitioner(), "com.mapr.streams.tests.producer.KeyBasedPartitioner");
        KafkaProducer kafkaProducer = new KafkaProducer(props);
        Future[] futureArr = new Future[1000];
        Future[] futureArr2 = new Future[1000];
        for (int i = 0; i < 1000; i++) {
            ProducerRecord producerRecord = new ProducerRecord(STREAM + ":" + TOPIC, "abc", "this is a value");
            ProducerRecord producerRecord2 = new ProducerRecord(STREAM + ":" + TOPIC, "123456789", "this is a value");
            futureArr[i] = kafkaProducer.send(producerRecord);
            futureArr2[i] = kafkaProducer.send(producerRecord2);
        }
        kafkaProducer.flush();
        kafkaProducer.close();
        long j = -1;
        long j2 = -1;
        for (int i2 = 0; i2 < 1000; i2++) {
            try {
                Future future = futureArr[i2];
                Assert.assertTrue(j < ((RecordMetadata) future.get()).offset());
                Assert.assertTrue(0 == ((RecordMetadata) future.get()).partition());
                j = ((RecordMetadata) future.get()).offset();
                Future future2 = futureArr2[i2];
                Assert.assertTrue(j2 < ((RecordMetadata) future2.get()).offset());
                Assert.assertTrue(1 == ((RecordMetadata) future2.get()).partition());
                j2 = ((RecordMetadata) future2.get()).offset();
            } catch (Exception e) {
                System.out.println(e);
                Assert.assertTrue(false);
                return;
            }
        }
    }
}
