package com.mapr.streams.tests.producer;

import com.mapr.fs.proto.Marlinserver;
import com.mapr.streams.impl.admin.MStreamDescriptor;
import com.mapr.streams.impl.admin.MarlinAdmin;
import com.mapr.tests.BaseTest;
import com.mapr.tests.annotations.ClusterTest;
import java.util.Locale;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.conf.Configuration;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Category({ClusterTest.class})
/* loaded from: input_file:com/mapr/streams/tests/producer/ProducerMultiTest.class */
public class ProducerMultiTest extends BaseTest {
    private static final Logger _logger = LoggerFactory.getLogger(ProducerMultiTest.class);
    private static final String STREAM = "/jtest-" + ProducerMultiTest.class.getSimpleName();
    private static final int numPartitions = 5;
    private static final int numThreads = 20;
    private static MarlinAdmin madmin;

    /* loaded from: input_file:com/mapr/streams/tests/producer/ProducerMultiTest$CountCallback.class */
    /**
     * Send callback that counts how many completions it has received and how many of
     * them carried an exception.
     *
     * <p>One instance may be shared by several sending threads: the counters are
     * {@link AtomicInteger}s, and the instance's own monitor is used to wake threads
     * blocked in {@link #waitOnCompletion()} once the expected number of completions
     * has been reached.
     */
    public static final class CountCallback implements Callback {
        /** Number of completions after which waiters are released. */
        private final int numTotalCallbacks;
        /** Completions received so far (successful or not). */
        private final AtomicInteger numCallbacks = new AtomicInteger(0);
        /** Completions received so far that carried a non-null exception. */
        private final AtomicInteger numExceptions = new AtomicInteger(0);

        /**
         * @param i the number of completions this callback expects to receive
         */
        public CountCallback(int i) {
            this.numTotalCallbacks = i;
        }

        /**
         * Records one completion and, when it is the last expected one, wakes every
         * thread blocked in {@link #waitOnCompletion()}.
         *
         * @param recordMetadata metadata of the completed send; not inspected here
         * @param exc the failure reported for the send, or {@code null} on success
         */
        public void onCompletion(RecordMetadata recordMetadata, Exception exc) {
            if (exc != null) {
                this.numExceptions.incrementAndGet();
            }
            if (this.numCallbacks.incrementAndGet() == this.numTotalCallbacks) {
                // The monitor is held here, so notifyAll() cannot throw
                // IllegalMonitorStateException and needs no try/catch.
                synchronized (this) {
                    notifyAll();
                }
            }
        }

        /**
         * @return {@code true} if at least one completion carried an exception
         */
        public boolean exceptions() {
            return this.numExceptions.get() > 0;
        }

        /**
         * Blocks until the expected number of completions has been received.
         *
         * <p>The count is re-checked under the monitor before and after every
         * {@code wait()}. A completion that arrived before this call therefore returns
         * immediately instead of blocking forever, and a spurious wakeup does not
         * return early. If the waiting thread is interrupted, the interrupt flag is
         * restored and the method returns without the count having been reached.
         */
        public void waitOnCompletion() {
            synchronized (this) {
                try {
                    while (this.numCallbacks.get() < this.numTotalCallbacks) {
                        wait();
                    }
                } catch (InterruptedException e) {
                    System.out.println("waiting on completion interrupted");
                    // Preserve the interrupt for callers further up the stack.
                    Thread.currentThread().interrupt();
                }
            }
        }

        /**
         * @return {@code true} if exactly the expected number of completions was received
         */
        public boolean success() {
            return this.numCallbacks.get() == this.numTotalCallbacks;
        }
    }

    /* loaded from: input_file:com/mapr/streams/tests/producer/ProducerMultiTest$OneProducer.class */
    /**
     * Runnable that owns a single producer and sends a fixed number of records to every
     * partition of one topic, recording the outcome for later inspection.
     *
     * <p>{@link #success()} and {@link #exceptions()} are plain fields written at the
     * end of {@link #run()}; read them only after the thread running this object has
     * been joined.
     */
    public class OneProducer implements Runnable {
        private final String streamName;
        private final int numPartitions;
        private final KafkaProducer<byte[], byte[]> producer;
        private final int numMsgsPerPartition;
        private final String topicName;
        // Fixed-size zero-filled payloads. NOTE(review): the length is taken from
        // numThreads in this source; that may be an unrelated constant with the same
        // value that was inlined by the compiler - confirm.
        private final byte[] key = new byte[ProducerMultiTest.numThreads];
        private final byte[] value = new byte[ProducerMultiTest.numThreads];
        private boolean success = false;
        private boolean exceptions;

        /**
         * Creates the producer eagerly, so construction cost is paid before the
         * thread is started.
         *
         * @param str stream path; records go to the topic {@code str + ":topicname"}
         * @param i   number of partitions to send to (partition ids {@code 0..i-1})
         * @param i2  number of records to send to each partition
         */
        public OneProducer(String str, int i, int i2) {
            this.streamName = str;
            this.numPartitions = i;
            this.numMsgsPerPartition = i2;
            this.topicName = str + ":topicname";
            Marlinserver.MarlinConfigDefaults defaultInstance = Marlinserver.MarlinConfigDefaults.getDefaultInstance();
            Properties properties = new Properties();
            properties.put("key.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
            properties.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
            properties.put(defaultInstance.getMetadataMaxAge(), 100);
            this.producer = new KafkaProducer<byte[], byte[]>(properties);
        }

        /**
         * Sends {@code numMsgsPerPartition} rounds of one record per partition, flushes,
         * closes the producer, and then records whether all callbacks arrived and
         * whether any of them reported an exception.
         */
        @Override // java.lang.Runnable
        public void run() {
            CountCallback countCallback = new CountCallback(this.numMsgsPerPartition * this.numPartitions);
            try {
                for (int round = 0; round < this.numMsgsPerPartition; round++) {
                    for (int partition = 0; partition < this.numPartitions; partition++) {
                        this.producer.send(
                                new ProducerRecord<byte[], byte[]>(this.topicName, Integer.valueOf(partition), this.key, this.value),
                                countCallback);
                    }
                }
                this.producer.flush();
            } finally {
                // Always release the producer, even when send()/flush() throws;
                // otherwise a failing run leaks it.
                this.producer.close();
            }
            this.success = countCallback.success();
            this.exceptions = countCallback.exceptions();
        }

        /**
         * @return {@code true} if {@link #run()} finished and every expected callback was received
         */
        public boolean success() {
            return this.success;
        }

        /**
         * @return {@code true} if {@link #run()} finished and at least one callback reported an exception
         */
        public boolean exceptions() {
            return this.exceptions;
        }
    }

    /**
     * One-time fixture: creates the shared {@link MarlinAdmin} and makes a best-effort
     * attempt to delete the test stream before any test runs.
     */
    @BeforeClass
    public static void setupTest() throws Exception {
        Configuration conf = new Configuration();
        madmin = new MarlinAdmin(conf);
        // Return value is deliberately unused; the call is kept as-is.
        Marlinserver.MarlinConfigDefaults.getDefaultInstance();
        try {
            madmin.deleteStream(STREAM);
        } catch (Exception deleteFailure) {
            // Best effort only: a failed delete (presumably because the stream does
            // not exist yet) must not abort the test class.
            System.out.println(deleteFailure);
        }
    }

    /**
     * Per-test fixture: creates the test stream with {@code numPartitions} default
     * partitions.
     */
    @Before
    public void setupTable() throws Exception {
        MStreamDescriptor descriptor = new MStreamDescriptor();
        descriptor.setDefaultPartitions(numPartitions);
        madmin.createStream(STREAM, descriptor);
    }

    /**
     * Per-test teardown: deletes the test stream so the next test's {@code @Before}
     * can create it again. Any failure from the delete propagates.
     */
    @After
    public void cleanupTest() throws Exception {
        madmin.deleteStream(STREAM);
    }

    /**
     * Runs {@code numThreads} independent producers while the stream is deleted and
     * recreated underneath them.
     *
     * <p>Expects every producer to have received all of its callbacks, at least one
     * producer to have seen an exception, and no zombie threads to remain.
     */
    @Test
    public void testManyProducersTogetherDeleteStream() throws Exception {
        Thread[] workers = new Thread[numThreads];
        OneProducer[] producers = new OneProducer[numThreads];

        // Build every producer before starting any thread.
        for (int idx = 0; idx < numThreads; idx++) {
            producers[idx] = new OneProducer(STREAM, numPartitions, numPartitions);
            workers[idx] = new Thread(producers[idx]);
        }
        for (Thread worker : workers) {
            worker.start();
        }

        // Pull the stream out from under the running producers, then put it back.
        madmin.deleteStream(STREAM);
        MStreamDescriptor recreated = new MStreamDescriptor();
        recreated.setDefaultPartitions(numPartitions);
        madmin.createStream(STREAM, recreated);

        for (Thread worker : workers) {
            worker.join();
        }

        boolean anyExceptionSeen = false;
        for (OneProducer producer : producers) {
            Assert.assertTrue(producer.success());
            if (producer.exceptions()) {
                anyExceptionSeen = true;
            }
        }
        Assert.assertTrue(anyExceptionSeen);
        Assert.assertFalse(ExistZombieThreads());
    }

    /**
     * Runs {@code numThreads} independent producers concurrently, each sending 10000
     * records to every partition.
     *
     * <p>Expects every producer to have received all of its callbacks with no
     * exception, and no zombie threads to remain.
     */
    @Test
    public void testManyProducersTogether() throws Exception {
        Thread[] workers = new Thread[numThreads];
        OneProducer[] producers = new OneProducer[numThreads];

        // Build every producer before starting any thread.
        for (int idx = 0; idx < numThreads; idx++) {
            producers[idx] = new OneProducer(STREAM, numPartitions, 10000);
            workers[idx] = new Thread(producers[idx]);
        }
        for (Thread worker : workers) {
            worker.start();
        }
        for (Thread worker : workers) {
            worker.join();
        }

        for (OneProducer producer : producers) {
            Assert.assertTrue(producer.success());
            Assert.assertFalse(producer.exceptions());
        }
        Assert.assertFalse(ExistZombieThreads());
    }

    /**
     * Drives a single shared producer and a single shared {@link CountCallback} from
     * 100 threads at once.
     *
     * <p>Expects the callback to have received exactly the expected number of
     * completions with no exception, and no zombie threads to remain. The shared
     * producer is closed when the test ends, whether it passes or fails.
     */
    @Test
    public void testManyProducersTogetherWithSharedObject() throws Exception {
        final int sharedThreads = 100;
        final int msgsPerPartition = 10000;
        // Derived rather than hard-coded so it cannot drift from the values above.
        // Equals the original literal 50000 * 100. Assumes each SendMessagesToProducer
        // issues numPartitions * msgsPerPartition sends with the shared callback -
        // TODO confirm against SendMessagesToProducer.
        final int expectedCallbacks = sharedThreads * numPartitions * msgsPerPartition;

        Marlinserver.MarlinConfigDefaults defaultInstance = Marlinserver.MarlinConfigDefaults.getDefaultInstance();
        Properties properties = new Properties();
        properties.put("key.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
        properties.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
        properties.put(defaultInstance.getMetadataMaxAge(), 100);
        // Left as a raw type: SendMessagesToProducer's parameter type is not visible here.
        KafkaProducer kafkaProducer = new KafkaProducer(properties);
        try {
            CountCallback countCallback = new CountCallback(expectedCallbacks);
            Thread[] workers = new Thread[sharedThreads];
            for (int idx = 0; idx < sharedThreads; idx++) {
                workers[idx] = new Thread(new SendMessagesToProducer(
                        kafkaProducer, countCallback, STREAM + ":sharedtopic", numPartitions, msgsPerPartition));
            }
            for (Thread worker : workers) {
                worker.start();
            }
            for (Thread worker : workers) {
                worker.join();
            }
            Assert.assertTrue(countCallback.success());
            Assert.assertFalse(countCallback.exceptions());
            Assert.assertFalse(ExistZombieThreads());
        } finally {
            // Previously the shared producer was never closed and leaked past the test.
            // Closing happens after the assertions so what they observe is unchanged.
            kafkaProducer.close();
        }
    }

    /**
     * Scans the stack traces of all live threads for frames that mention "marlin".
     *
     * <p>A frame matches when its {@code toString()} form, with every occurrence of
     * the literal {@code com.mapr.streams.tests} replaced by {@code ZOMBIE}, contains
     * "marlin" case-insensitively. Every matching thread has its name, id and full
     * stack trace printed to standard output.
     *
     * @return {@code true} if at least one thread has a matching frame
     */
    public static boolean ExistZombieThreads() {
        boolean zombieFound = false;
        Map<Thread, StackTraceElement[]> allStackTraces = Thread.getAllStackTraces();
        try {
            for (Map.Entry<Thread, StackTraceElement[]> entry : allStackTraces.entrySet()) {
                StackTraceElement[] frames = entry.getValue();
                if (!hasMarlinFrame(frames)) {
                    continue;
                }
                Thread thread = entry.getKey();
                System.out.println("Thread: " + thread.getName() + " ID: " + thread.getId());
                for (StackTraceElement frame : frames) {
                    System.out.println(frame.toString());
                }
                zombieFound = true;
            }
        } catch (RuntimeException e) {
            // Reporting is best effort: a failure while dumping must not fail the
            // caller. Errors (e.g. OutOfMemoryError) are deliberately not swallowed.
            System.out.println("Exception while printing stacktrace " + e);
        }
        return zombieFound;
    }

    /** Returns whether any frame, after masking the test package name, mentions "marlin". */
    private static boolean hasMarlinFrame(StackTraceElement[] frames) {
        for (StackTraceElement frame : frames) {
            // Locale.ROOT keeps the match independent of the default locale; under a
            // Turkish locale, for example, an upper-case 'I' would not lower-case to 'i'.
            String masked = frame.toString()
                    .replace("com.mapr.streams.tests", "ZOMBIE")
                    .toLowerCase(Locale.ROOT);
            if (masked.contains("marlin")) {
                return true;
            }
        }
        return false;
    }
}
