/*
 * Decompiled with CFR 0.152.
 */
package com.mapr.streams.tests.admin;

import com.mapr.fs.proto.Marlinserver;
import com.mapr.streams.Admin;
import com.mapr.streams.StreamDescriptor;
import com.mapr.streams.Streams;
import com.mapr.streams.TimestampType;
import com.mapr.streams.TopicDescriptor;
import com.mapr.streams.impl.admin.MarlinAdminImpl;
import com.mapr.tests.BaseTest;
import com.mapr.tests.annotations.ClusterTest;
import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import org.apache.hadoop.conf.Configuration;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Ignore;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Category(value={ClusterTest.class})
public class TopicTest
extends BaseTest {
    private static final Logger _logger = LoggerFactory.getLogger(TopicTest.class);
    private static final String STREAM = "/jtest-" + TopicTest.class.getSimpleName();
    private static Admin madmin;

    @BeforeClass
    public static void setupTest() throws Exception {
        Configuration conf = new Configuration();
        madmin = Streams.newAdmin((Configuration)conf);
        try {
            madmin.deleteStream(STREAM);
        }
        catch (Exception exception) {
            // empty catch block
        }
        StreamDescriptor sdesc = Streams.newStreamDescriptor();
        madmin.createStream(STREAM, sdesc);
    }

    @AfterClass
    public static void cleanupTest() throws Exception {
        madmin.deleteStream(STREAM);
    }

    @Test
    public void testCreate() throws IOException {
        String topicName = "testCreate";
        String topicFullName = STREAM + ":" + topicName;
        _logger.info("Verify topic create");
        madmin.createTopic(STREAM, topicName, 4);
        MarlinAdminImpl iadmin = (MarlinAdminImpl)madmin;
        Marlinserver.MarlinTopicMetaEntry mentry = iadmin.getTopicMetaEntry(topicFullName);
        Assert.assertTrue((mentry.getFeedIdsList().size() == 4 ? 1 : 0) != 0);
        Assert.assertTrue((!mentry.getIsDeleted() ? 1 : 0) != 0);
        _logger.info("Test topic create with same name");
        Exception ex = null;
        try {
            madmin.createTopic(STREAM, topicName);
        }
        catch (Exception e) {
            ex = e;
        }
        Assert.assertTrue((ex != null ? 1 : 0) != 0);
        ex = null;
        try {
            madmin.createTopic(STREAM, "test.DotTopic");
        }
        catch (Exception e) {
            ex = e;
        }
        Assert.assertTrue((ex == null ? 1 : 0) != 0);
    }

    @Test
    public void testCreateDesc() throws IOException {
        String topicName = "testCreateDesc";
        _logger.info("Verify topic create with descriptor");
        TopicDescriptor desc = Streams.newTopicDescriptor();
        desc.setPartitions(3);
        desc.setTimestampType(TimestampType.LOG_APPEND_TIME);
        madmin.createTopic(STREAM, topicName, desc);
        TopicDescriptor retrievedDesc = madmin.getTopicDescriptor(STREAM, topicName);
        Assert.assertTrue((retrievedDesc.getPartitions() == 3 ? 1 : 0) != 0);
        Assert.assertTrue((boolean)retrievedDesc.getTimestampType().equals((Object)TimestampType.LOG_APPEND_TIME));
    }

    @Test
    public void testEdit() throws IOException {
        String topicName = "testEdit";
        String topicFullName = STREAM + ":" + topicName;
        madmin.createTopic(STREAM, topicName, 4);
        _logger.info("Verify topic edit");
        madmin.editTopic(STREAM, topicName, 8);
        MarlinAdminImpl iadmin = (MarlinAdminImpl)madmin;
        Marlinserver.MarlinTopicMetaEntry mentry = iadmin.getTopicMetaEntry(topicFullName);
        Assert.assertTrue((mentry.getFeedIdsList().size() == 8 ? 1 : 0) != 0);
        Assert.assertTrue((!mentry.getIsDeleted() ? 1 : 0) != 0);
        _logger.info("Test topic edit with reduced feeds");
        Exception ex = null;
        try {
            madmin.editTopic(STREAM, topicName, 6);
        }
        catch (Exception e) {
            ex = e;
        }
        Assert.assertTrue((ex != null ? 1 : 0) != 0);
    }

    @Test
    public void testEditDesc() throws IOException {
        String topicName = "testEditDesc";
        _logger.info("Verify topic edit with descriptor");
        madmin.createTopic(STREAM, topicName);
        TopicDescriptor desc = Streams.newTopicDescriptor();
        desc.setPartitions(3);
        desc.setTimestampType(TimestampType.LOG_APPEND_TIME);
        madmin.editTopic(STREAM, topicName, desc);
        TopicDescriptor retrievedDesc = madmin.getTopicDescriptor(STREAM, topicName);
        Assert.assertTrue((retrievedDesc.getPartitions() == 3 ? 1 : 0) != 0);
        Assert.assertTrue((boolean)retrievedDesc.getTimestampType().equals((Object)TimestampType.LOG_APPEND_TIME));
    }

    @Test
    public void testDelete() throws IOException {
        String topicName = "testDelete";
        String topicFullName = STREAM + ":" + topicName;
        MarlinAdminImpl iadmin = (MarlinAdminImpl)madmin;
        madmin.createTopic(STREAM, topicName, 4);
        Marlinserver.MarlinTopicMetaEntry mentry = iadmin.getTopicMetaEntry(topicFullName);
        int uniq = mentry.getTopicUniq();
        Assert.assertTrue((uniq >= 1 ? 1 : 0) != 0);
        _logger.info("Verify topic delete");
        madmin.deleteTopic(STREAM, topicName);
        mentry = iadmin.getTopicMetaEntry(topicFullName);
        Assert.assertTrue((mentry.getIsDeleted() ? 1 : 0) != 0);
        _logger.info("Test topic delete on deleted topic");
        Exception ex = null;
        try {
            madmin.deleteTopic(STREAM, topicName);
        }
        catch (Exception e) {
            ex = e;
        }
        Assert.assertTrue((ex != null ? 1 : 0) != 0);
        _logger.info("Verify topic recreate");
        madmin.createTopic(STREAM, topicName, 1);
        mentry = iadmin.getTopicMetaEntry(topicFullName);
        Assert.assertTrue((!mentry.getIsDeleted() ? 1 : 0) != 0);
        Assert.assertTrue((mentry.getFeedIdsList().size() == 1 ? 1 : 0) != 0);
        Assert.assertTrue((mentry.getTopicUniq() > uniq ? 1 : 0) != 0);
    }

    @Test
    public void testAutoOptions() throws IOException {
        String topicName = "testAutoOptions";
        String topicFullName = STREAM + ":" + topicName;
        _logger.info("Test #feeds is picked up from the default value on stream");
        StreamDescriptor sdesc = Streams.newStreamDescriptor();
        sdesc.setDefaultPartitions(3);
        madmin.editStream(STREAM, sdesc);
        madmin.createTopic(STREAM, topicName);
        MarlinAdminImpl iadmin = (MarlinAdminImpl)madmin;
        Marlinserver.MarlinTopicMetaEntry mentry = iadmin.getTopicMetaEntry(topicFullName);
        Assert.assertTrue((!mentry.getIsDeleted() ? 1 : 0) != 0);
        Assert.assertTrue((mentry.getFeedIdsList().size() == 3 ? 1 : 0) != 0);
    }

    @Test
    public void testError() throws IOException {
        _logger.info("Test create with bad topic name");
        Exception ex = null;
        try {
            madmin.createTopic("/xyz", "");
        }
        catch (Exception e) {
            ex = e;
        }
        Assert.assertTrue((ex != null ? 1 : 0) != 0);
        _logger.info("Test create with bad stream name");
        ex = null;
        try {
            madmin.createTopic("", "xyz");
        }
        catch (Exception e) {
            ex = e;
        }
        Assert.assertTrue((ex != null ? 1 : 0) != 0);
        _logger.info("Test create with non-existent stream");
        ex = null;
        try {
            madmin.createTopic("/non-existent", "xyz");
        }
        catch (Exception e) {
            ex = e;
        }
        Assert.assertTrue((ex != null ? 1 : 0) != 0);
        _logger.info("Test edit with non-existent topic");
        ex = null;
        try {
            madmin.editTopic(STREAM, "non-existent", 4);
        }
        catch (Exception e) {
            ex = e;
        }
        Assert.assertTrue((ex != null ? 1 : 0) != 0);
        _logger.info("Test delete with non-existent topic");
        ex = null;
        try {
            madmin.deleteTopic(STREAM, "non-existent");
        }
        catch (Exception e) {
            ex = e;
        }
        Assert.assertTrue((ex != null ? 1 : 0) != 0);
        _logger.info("Test getTopicMetaEntry on non-existent topic");
        ex = null;
        try {
            MarlinAdminImpl iadmin = (MarlinAdminImpl)madmin;
            Marlinserver.MarlinTopicMetaEntry mentry = iadmin.getTopicMetaEntry(STREAM + ":non-existent");
        }
        catch (Exception e) {
            ex = e;
        }
        Assert.assertTrue((ex != null ? 1 : 0) != 0);
    }

    @Test
    public void testTestCount() throws IOException {
        int i;
        List flist;
        String streamName = STREAM + "2";
        String sampleTopic = "testCount5";
        String sampleFullTopic = streamName + ":" + sampleTopic;
        StreamDescriptor sdesc = Streams.newStreamDescriptor();
        madmin.createStream(streamName, sdesc);
        _logger.info("test on empty stream");
        int count = madmin.countTopics(streamName);
        Assert.assertTrue((count == 0 ? 1 : 0) != 0);
        MarlinAdminImpl iadmin = (MarlinAdminImpl)madmin;
        Map map = iadmin.listTopicsForStream(streamName);
        Assert.assertTrue((map.size() == 0 ? 1 : 0) != 0);
        Exception ex = null;
        try {
            flist = iadmin.infoTopic(sampleTopic);
        }
        catch (Exception e) {
            ex = e;
        }
        Assert.assertTrue((ex != null ? 1 : 0) != 0);
        _logger.info("Create 10 topics & test count");
        for (i = 0; i < 10; ++i) {
            madmin.createTopic(streamName, "testCount" + i);
        }
        count = madmin.countTopics(streamName);
        Assert.assertTrue((count == 10 ? 1 : 0) != 0);
        map = iadmin.listTopicsForStream(streamName);
        Assert.assertTrue((map.size() == 10 ? 1 : 0) != 0);
        Assert.assertTrue((boolean)map.containsKey(sampleTopic));
        flist = iadmin.infoTopic(sampleFullTopic);
        Assert.assertTrue((flist.size() == 1 ? 1 : 0) != 0);
        _logger.info("delete 5 topics & test count");
        for (i = 0; i < 5; ++i) {
            madmin.deleteTopic(streamName, "testCount" + i);
        }
        count = madmin.countTopics(streamName);
        _logger.info("count after deletes : " + count);
        Assert.assertTrue((count == 5 ? 1 : 0) != 0);
        map = iadmin.listTopicsForStream(streamName);
        Assert.assertTrue((map.size() == 5 ? 1 : 0) != 0);
        Assert.assertTrue((boolean)map.containsKey(sampleTopic));
        Assert.assertTrue((!map.containsKey(streamName + ":testCount" + 4) ? 1 : 0) != 0);
        flist = iadmin.infoTopic(sampleFullTopic);
        Assert.assertTrue((flist.size() == 1 ? 1 : 0) != 0);
        ex = null;
        try {
            flist = iadmin.infoTopic(streamName + ":testCount" + 4);
        }
        catch (Exception e) {
            ex = e;
        }
        Assert.assertTrue((ex != null ? 1 : 0) != 0);
        _logger.info("recreate 2 topics & test count");
        for (int i2 = 0; i2 < 2; ++i2) {
            madmin.createTopic(streamName, "testCount" + i2);
        }
        count = madmin.countTopics(streamName);
        Assert.assertTrue((count == 7 ? 1 : 0) != 0);
        map = iadmin.listTopicsForStream(streamName);
        Assert.assertTrue((map.size() == 7 ? 1 : 0) != 0);
        madmin.deleteStream(streamName);
    }

    @Test
    public void testFiveThousandTopics() throws IOException {
        String sname = "/jtest-fivethousand";
        int numTopics = 5000;
        try {
            madmin.deleteStream(sname);
        }
        catch (Exception exception) {
            // empty catch block
        }
        madmin.createStream(sname, Streams.newStreamDescriptor());
        Properties props = new Properties();
        Marlinserver.MarlinConfigDefaults cdef = Marlinserver.MarlinConfigDefaults.getDefaultInstance();
        props.put("bootstrap.servers", "127.0.0.1:7222");
        props.put("key.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
        KafkaProducer producer = new KafkaProducer(props);
        byte[] value = new byte[200];
        for (int i = 0; i < numTopics; ++i) {
            String topicFullName = sname + ":" + i;
            ProducerRecord record = new ProducerRecord(topicFullName, (Object)value);
            producer.send(record);
            if (i % 1000 != 0) continue;
            System.out.println("sending " + i);
        }
        producer.flush();
        producer.close();
        int count = madmin.countTopics(sname);
        Assert.assertTrue((count == numTopics ? 1 : 0) != 0);
        madmin.deleteStream(sname);
    }

    @Test
    @Ignore(value="This test takes very, very long")
    public void testMillionTopics() throws IOException {
        String sname = "/jtest-million";
        madmin.createStream(sname, Streams.newStreamDescriptor());
        Properties props = new Properties();
        Marlinserver.MarlinConfigDefaults cdef = Marlinserver.MarlinConfigDefaults.getDefaultInstance();
        props.put("bootstrap.servers", "127.0.0.1:7222");
        props.put("key.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
        KafkaProducer producer = new KafkaProducer(props);
        byte[] value = new byte[200];
        int numTopics = 1000000;
        for (int i = 0; i < numTopics; ++i) {
            String topicFullName = sname + ":" + i;
            ProducerRecord record = new ProducerRecord(topicFullName, (Object)value);
            producer.send(record);
            if (i % 10000 != 0) continue;
            System.out.println("sending " + i);
        }
        producer.flush();
        producer.close();
        int count = madmin.countTopics(sname);
        Assert.assertTrue((count == numTopics ? 1 : 0) != 0);
        madmin.deleteStream(sname);
    }
}

