/*
 * Decompiled with CFR 0.152.
 */
package com.mapr.streams.tests.producer;

import com.mapr.fs.proto.Marlinserver;
import com.mapr.streams.Admin;
import com.mapr.streams.StreamDescriptor;
import com.mapr.streams.Streams;
import com.mapr.streams.tests.producer.SendMessagesToProducer;
import com.mapr.tests.BaseTest;
import com.mapr.tests.annotations.ClusterTest;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.conf.Configuration;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Category(value={ClusterTest.class})
public class ProducerMultiTest
extends BaseTest {
    /** SLF4J logger for this test class. */
    private static final Logger _logger = LoggerFactory.getLogger(ProducerMultiTest.class);
    /** Path of the stream every test operates on: "/jtest-" followed by this class's simple name. */
    private static final String STREAM = "/jtest-" + ProducerMultiTest.class.getSimpleName();
    /** Partition count of the test stream (same value the fixtures pass to setDefaultPartitions). */
    private static final int numPartitions = 5;
    /** Number of producer threads started by the one-producer-per-thread tests. */
    private static final int numThreads = 20;
    /**
     * Streams admin client. Assigned once in {@link #setupTest()} and used by the fixtures and
     * tests to create and delete {@link #STREAM}.
     * NOTE(review): nothing in this class closes it -- confirm whether Admin needs an explicit close.
     */
    private static Admin madmin;

    /**
     * One-time fixture: creates the shared admin client and removes any copy of the test
     * stream that may already exist, so that {@link #setupTable()} can create it fresh.
     *
     * @throws Exception if the admin client cannot be created
     */
    @BeforeClass
    public static void setupTest() throws Exception {
        Configuration conf = new Configuration();
        madmin = Streams.newAdmin(conf);
        try {
            madmin.deleteStream(STREAM);
        }
        catch (Exception e) {
            // Best effort only: a failed delete here must not abort the test class,
            // but record which stream was involved and why the delete failed.
            _logger.info("Ignoring failure to delete stream {} before tests: {}", STREAM, e.toString());
        }
    }

    /**
     * Per-test fixture: creates the test stream with {@code numPartitions} default partitions.
     *
     * @throws Exception if the stream cannot be created
     */
    @Before
    public void setupTable() throws Exception {
        StreamDescriptor descriptor = Streams.newStreamDescriptor();
        descriptor.setDefaultPartitions(numPartitions);
        madmin.createStream(STREAM, descriptor);
    }

    /**
     * Per-test teardown: deletes the test stream.
     *
     * @throws Exception if the delete fails
     */
    @After
    public void cleanupTest() throws Exception {
        madmin.deleteStream(STREAM);
    }

    /**
     * Starts {@code numThreads} independent producers and, while they are running, deletes and
     * re-creates the stream they are writing to.
     *
     * <p>Passes when every producer received one callback per record it sent, at least one
     * producer saw a callback carrying an exception, and no zombie client threads remain.
     *
     * @throws Exception if stream administration fails or a join is interrupted
     */
    @Test
    public void testManyProducersTogetherDeleteStream() throws Exception {
        final int msgsPerPartition = 5;
        Thread[] threads = new Thread[numThreads];
        OneProducer[] workers = new OneProducer[numThreads];

        // Construct every producer before starting any thread.
        for (int idx = 0; idx < numThreads; ++idx) {
            workers[idx] = new OneProducer(STREAM, numPartitions, msgsPerPartition);
            threads[idx] = new Thread(workers[idx]);
        }
        for (Thread thread : threads) {
            thread.start();
        }

        // Delete the stream and re-create it while the producers are still running.
        madmin.deleteStream(STREAM);
        StreamDescriptor descriptor = Streams.newStreamDescriptor();
        descriptor.setDefaultPartitions(numPartitions);
        madmin.createStream(STREAM, descriptor);

        for (Thread thread : threads) {
            thread.join();
        }

        boolean sawException = false;
        for (OneProducer worker : workers) {
            Assert.assertTrue(worker.success());
            sawException |= worker.exceptions();
        }
        Assert.assertTrue(sawException);
        Assert.assertFalse(ProducerMultiTest.ExistZombieThreads());
    }

    /**
     * Starts {@code numThreads} independent producers against the same stream and lets them
     * run to completion undisturbed.
     *
     * <p>Passes when every producer received one callback per record it sent, none of those
     * callbacks carried an exception, and no zombie client threads remain.
     *
     * @throws Exception if a join is interrupted
     */
    @Test
    public void testManyProducersTogether() throws Exception {
        final int msgsPerPartition = 10000;
        Thread[] threads = new Thread[numThreads];
        OneProducer[] workers = new OneProducer[numThreads];

        // Construct every producer before starting any thread.
        for (int idx = 0; idx < numThreads; ++idx) {
            workers[idx] = new OneProducer(STREAM, numPartitions, msgsPerPartition);
            threads[idx] = new Thread(workers[idx]);
        }
        for (Thread thread : threads) {
            thread.start();
        }
        for (Thread thread : threads) {
            thread.join();
        }

        for (OneProducer worker : workers) {
            Assert.assertTrue(worker.success());
            Assert.assertFalse(worker.exceptions());
        }
        Assert.assertFalse(ProducerMultiTest.ExistZombieThreads());
    }

    /**
     * Shares a single {@link KafkaProducer} and a single {@link CountCallback} between 100
     * {@link SendMessagesToProducer} worker threads.
     *
     * <p>Passes when the shared callback was invoked exactly the expected number of times, none
     * of the invocations carried an exception, and no zombie client threads remain.
     *
     * @throws Exception if a join is interrupted
     */
    @Test
    public void testManyProducersTogetherWithSharedObject() throws Exception {
        final int numWorkers = 100;
        // 50000 == 5 * 10000, the two counts handed to each SendMessagesToProducer below --
        // presumably partitions x messages per partition; confirm against SendMessagesToProducer.
        final int callbacksPerWorker = 50000;

        Marlinserver.MarlinConfigDefaults cdef = Marlinserver.MarlinConfigDefaults.getDefaultInstance();
        Properties props = new Properties();
        props.put("key.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
        props.put(cdef.getMetadataMaxAge(), Integer.valueOf(100));

        CountCallback cb = new CountCallback(callbacksPerWorker * numWorkers);
        // Raw type kept on purpose: the parameter type SendMessagesToProducer expects is not visible here.
        KafkaProducer kafkaproducer = new KafkaProducer(props);
        try {
            Thread[] producers = new Thread[numWorkers];
            for (int i = 0; i < numWorkers; ++i) {
                SendMessagesToProducer worker = new SendMessagesToProducer(kafkaproducer, cb, STREAM + ":sharedtopic", 5, 10000, 20, false);
                producers[i] = new Thread(worker);
            }
            for (Thread producer : producers) {
                producer.start();
            }
            for (Thread producer : producers) {
                producer.join();
            }
        }
        finally {
            // Always release the shared producer, even when starting or joining a worker fails;
            // otherwise its client threads would outlive the test.
            kafkaproducer.close();
        }

        Assert.assertTrue(cb.success());
        Assert.assertFalse(cb.exceptions());
        Assert.assertFalse(ProducerMultiTest.ExistZombieThreads());
    }

    /**
     * Scans the stack traces of all live threads for frames whose text contains "marlin"
     * (case-insensitive). Every matching thread is printed to standard output together with
     * its full stack trace.
     *
     * @return {@code true} if at least one thread with such a frame was found
     */
    public static boolean ExistZombieThreads() {
        // Flip to true to dump every thread with a non-empty stack, not only the zombies.
        final boolean traceAllThreads = false;
        boolean foundZombie = false;
        Map<Thread, StackTraceElement[]> snapshot = Thread.getAllStackTraces();
        try {
            for (Map.Entry<Thread, StackTraceElement[]> entry : snapshot.entrySet()) {
                Thread thread = entry.getKey();
                StackTraceElement[] frames = entry.getValue();
                if (frames.length == 0) {
                    continue;
                }

                boolean isZombie = false;
                for (StackTraceElement frame : frames) {
                    String text = frame.toString();
                    // A frame counts only if "marlin" is still present after the test package
                    // name has been substituted out.
                    if (text.toLowerCase().contains("marlin")
                            && text.replace("com.mapr.streams.tests", "ZOMBIE").toLowerCase().contains("marlin")) {
                        isZombie = true;
                        break;
                    }
                }

                if (isZombie || traceAllThreads) {
                    System.out.println("Thread: " + thread.getName() + " ID: " + thread.getId());
                    for (StackTraceElement frame : frames) {
                        System.out.println(frame.toString());
                    }
                }
                if (isZombie) {
                    foundZombie = true;
                }
            }
        }
        catch (Throwable e1) {
            // The scan is diagnostic only; report the problem and return what was found so far.
            System.out.println("Exception while printing stacktrace " + e1);
        }
        return foundZombie;
    }

    public static final class CountCallback
    implements Callback {
        private int numTotalCallbacks;
        private AtomicInteger numCallbacks;
        private AtomicInteger numExceptions;

        public CountCallback(int numMsgs) {
            this.numTotalCallbacks = numMsgs;
            this.numCallbacks = new AtomicInteger(0);
            this.numExceptions = new AtomicInteger(0);
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        public void onCompletion(RecordMetadata metadata, Exception exception) {
            int value;
            if (exception != null) {
                this.numExceptions.getAndIncrement();
            }
            if ((value = this.numCallbacks.getAndIncrement()) + 1 == this.numTotalCallbacks) {
                CountCallback countCallback = this;
                synchronized (countCallback) {
                    try {
                        this.notifyAll();
                    }
                    catch (Exception e) {
                        System.out.println("notify all on completion");
                    }
                }
            }
        }

        public boolean exceptions() {
            return this.numExceptions.get() > 0;
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        public void waitOnCompletion() {
            CountCallback countCallback = this;
            synchronized (countCallback) {
                try {
                    this.wait();
                }
                catch (Exception e) {
                    System.out.println("waiting on completion interrupted");
                }
            }
        }

        public boolean success() {
            return this.numCallbacks.get() == this.numTotalCallbacks;
        }
    }

    /**
     * Runnable that owns its own {@link KafkaProducer} and sends a fixed number of records to
     * every partition of the topic {@code <streamName>:topicname}.
     *
     * <p>The outcome is published through {@link #success()} and {@link #exceptions()}; the
     * backing fields are plain (non-volatile), so read them only after joining the thread that
     * executed {@link #run()}.
     *
     * <p>NOTE(review): nothing here uses the enclosing instance, so this could be a static nested
     * class; it is left as an inner class so existing instantiation sites keep compiling.
     */
    public class OneProducer
    implements Runnable {
        private final int numPartitions;
        private final KafkaProducer<byte[], byte[]> producer;
        private final int numMsgsPerPartition;
        private final String topicName;
        private final byte[] key;
        private final byte[] value;
        /** Set at the end of run(): whether one callback arrived for every record sent. */
        private boolean success;
        /** Set at the end of run(): whether any callback carried an exception. */
        private boolean exceptions;

        /**
         * Creates the producer eagerly, on the constructing thread.
         *
         * @param streamName          path of the stream; the topic used is {@code streamName + ":topicname"}
         * @param numparts            number of partitions to send to (partition ids 0 .. numparts-1)
         * @param numMsgsPerPartition number of records to send to each partition
         */
        public OneProducer(String streamName, int numparts, int numMsgsPerPartition) {
            this.numPartitions = numparts;
            this.numMsgsPerPartition = numMsgsPerPartition;
            this.topicName = streamName + ":topicname";
            // Every record carries the same 20-byte, all-zero key and value.
            this.key = new byte[20];
            this.value = new byte[20];
            this.success = false;
            Marlinserver.MarlinConfigDefaults cdef = Marlinserver.MarlinConfigDefaults.getDefaultInstance();
            Properties props = new Properties();
            props.put("key.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
            props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
            // Metadata max age of 100 -- unit not visible here (presumably milliseconds; confirm).
            props.put(cdef.getMetadataMaxAge(), Integer.valueOf(100));
            this.producer = new KafkaProducer<byte[], byte[]>(props);
        }

        /**
         * Sends {@code numMsgsPerPartition} records to each partition in round-robin order,
         * flushes, closes the producer, and then records the callback totals.
         */
        @Override
        public void run() {
            CountCallback cb = new CountCallback(this.numMsgsPerPartition * this.numPartitions);
            try {
                for (int i = 0; i < this.numMsgsPerPartition; ++i) {
                    for (int j = 0; j < this.numPartitions; ++j) {
                        ProducerRecord<byte[], byte[]> record =
                                new ProducerRecord<byte[], byte[]>(this.topicName, Integer.valueOf(j), this.key, this.value);
                        this.producer.send(record, cb);
                    }
                }
                this.producer.flush();
            }
            finally {
                // Close even when send/flush throws, so a failing producer does not leave
                // its client threads behind.
                this.producer.close();
            }
            this.success = cb.success();
            this.exceptions = cb.exceptions();
        }

        /**
         * @return {@code true} if run() finished and one callback arrived for every record sent
         */
        public boolean success() {
            return this.success;
        }

        /**
         * @return {@code true} if run() finished and at least one callback carried an exception
         */
        public boolean exceptions() {
            return this.exceptions;
        }
    }
}

