package com.mapr.streams.tests.admin;

import com.mapr.fs.proto.Marlinserver;
import com.mapr.streams.Admin;
import com.mapr.streams.StreamDescriptor;
import com.mapr.streams.Streams;
import com.mapr.streams.TimestampType;
import com.mapr.streams.TopicDescriptor;
import com.mapr.streams.impl.admin.MarlinAdminImpl;
import com.mapr.tests.BaseTest;
import com.mapr.tests.annotations.ClusterTest;
import java.io.IOException;
import java.util.Map;
import java.util.Properties;
import org.apache.hadoop.conf.Configuration;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Ignore;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Category({ClusterTest.class})
/* loaded from: input_file:com/mapr/streams/tests/admin/TopicTest.class */
public class TopicTest extends BaseTest {
    private static final Logger _logger = LoggerFactory.getLogger(TopicTest.class);
    private static final String STREAM = "/jtest-" + TopicTest.class.getSimpleName();
    private static Admin madmin;

    @BeforeClass
    public static void setupTest() throws Exception {
        madmin = Streams.newAdmin(new Configuration());
        try {
            madmin.deleteStream(STREAM);
        } catch (Exception e) {
        }
        madmin.createStream(STREAM, Streams.newStreamDescriptor());
    }

    @AfterClass
    public static void cleanupTest() throws Exception {
        madmin.deleteStream(STREAM);
    }

    @Test
    public void testCreate() throws IOException {
        String str = STREAM + ":" + "testCreate";
        _logger.info("Verify topic create");
        madmin.createTopic(STREAM, "testCreate", 4);
        Marlinserver.MarlinTopicMetaEntry topicMetaEntry = madmin.getTopicMetaEntry(str);
        Assert.assertTrue(topicMetaEntry.getFeedIdsList().size() == 4);
        Assert.assertTrue(!topicMetaEntry.getIsDeleted());
        _logger.info("Test topic create with same name");
        Exception exc = null;
        try {
            madmin.createTopic(STREAM, "testCreate");
        } catch (Exception e) {
            exc = e;
        }
        Assert.assertTrue(exc != null);
        Exception exc2 = null;
        try {
            madmin.createTopic(STREAM, "test.DotTopic");
        } catch (Exception e2) {
            exc2 = e2;
        }
        Assert.assertTrue(exc2 == null);
    }

    @Test
    public void testCreateDesc() throws IOException {
        _logger.info("Verify topic create with descriptor");
        TopicDescriptor newTopicDescriptor = Streams.newTopicDescriptor();
        newTopicDescriptor.setPartitions(3);
        newTopicDescriptor.setTimestampType(TimestampType.LOG_APPEND_TIME);
        madmin.createTopic(STREAM, "testCreateDesc", newTopicDescriptor);
        TopicDescriptor topicDescriptor = madmin.getTopicDescriptor(STREAM, "testCreateDesc");
        Assert.assertTrue(topicDescriptor.getPartitions() == 3);
        Assert.assertTrue(topicDescriptor.getTimestampType().equals(TimestampType.LOG_APPEND_TIME));
    }

    @Test
    public void testEdit() throws IOException {
        String str = STREAM + ":" + "testEdit";
        madmin.createTopic(STREAM, "testEdit", 4);
        _logger.info("Verify topic edit");
        madmin.editTopic(STREAM, "testEdit", 8);
        Marlinserver.MarlinTopicMetaEntry topicMetaEntry = madmin.getTopicMetaEntry(str);
        Assert.assertTrue(topicMetaEntry.getFeedIdsList().size() == 8);
        Assert.assertTrue(!topicMetaEntry.getIsDeleted());
        _logger.info("Test topic edit with reduced feeds");
        Exception exc = null;
        try {
            madmin.editTopic(STREAM, "testEdit", 6);
        } catch (Exception e) {
            exc = e;
        }
        Assert.assertTrue(exc != null);
    }

    @Test
    public void testEditDesc() throws IOException {
        _logger.info("Verify topic edit with descriptor");
        madmin.createTopic(STREAM, "testEditDesc");
        TopicDescriptor newTopicDescriptor = Streams.newTopicDescriptor();
        newTopicDescriptor.setPartitions(3);
        newTopicDescriptor.setTimestampType(TimestampType.LOG_APPEND_TIME);
        madmin.editTopic(STREAM, "testEditDesc", newTopicDescriptor);
        TopicDescriptor topicDescriptor = madmin.getTopicDescriptor(STREAM, "testEditDesc");
        Assert.assertTrue(topicDescriptor.getPartitions() == 3);
        Assert.assertTrue(topicDescriptor.getTimestampType().equals(TimestampType.LOG_APPEND_TIME));
    }

    @Test
    public void testDelete() throws IOException {
        String str = STREAM + ":" + "testDelete";
        MarlinAdminImpl marlinAdminImpl = madmin;
        madmin.createTopic(STREAM, "testDelete", 4);
        int topicUniq = marlinAdminImpl.getTopicMetaEntry(str).getTopicUniq();
        Assert.assertTrue(topicUniq >= 1);
        _logger.info("Verify topic delete");
        madmin.deleteTopic(STREAM, "testDelete");
        Assert.assertTrue(marlinAdminImpl.getTopicMetaEntry(str).getIsDeleted());
        _logger.info("Test topic delete on deleted topic");
        Exception exc = null;
        try {
            madmin.deleteTopic(STREAM, "testDelete");
        } catch (Exception e) {
            exc = e;
        }
        Assert.assertTrue(exc != null);
        _logger.info("Verify topic recreate");
        madmin.createTopic(STREAM, "testDelete", 1);
        Marlinserver.MarlinTopicMetaEntry topicMetaEntry = marlinAdminImpl.getTopicMetaEntry(str);
        Assert.assertTrue(!topicMetaEntry.getIsDeleted());
        Assert.assertTrue(topicMetaEntry.getFeedIdsList().size() == 1);
        Assert.assertTrue(topicMetaEntry.getTopicUniq() > topicUniq);
    }

    @Test
    public void testAutoOptions() throws IOException {
        String str = STREAM + ":" + "testAutoOptions";
        _logger.info("Test #feeds is picked up from the default value on stream");
        StreamDescriptor newStreamDescriptor = Streams.newStreamDescriptor();
        newStreamDescriptor.setDefaultPartitions(3);
        madmin.editStream(STREAM, newStreamDescriptor);
        madmin.createTopic(STREAM, "testAutoOptions");
        Marlinserver.MarlinTopicMetaEntry topicMetaEntry = madmin.getTopicMetaEntry(str);
        Assert.assertTrue(!topicMetaEntry.getIsDeleted());
        Assert.assertTrue(topicMetaEntry.getFeedIdsList().size() == 3);
    }

    @Test
    public void testError() throws IOException {
        _logger.info("Test create with bad topic name");
        Exception exc = null;
        try {
            madmin.createTopic("/xyz", "");
        } catch (Exception e) {
            exc = e;
        }
        Assert.assertTrue(exc != null);
        _logger.info("Test create with bad stream name");
        Exception exc2 = null;
        try {
            madmin.createTopic("", "xyz");
        } catch (Exception e2) {
            exc2 = e2;
        }
        Assert.assertTrue(exc2 != null);
        _logger.info("Test create with non-existent stream");
        Exception exc3 = null;
        try {
            madmin.createTopic("/non-existent", "xyz");
        } catch (Exception e3) {
            exc3 = e3;
        }
        Assert.assertTrue(exc3 != null);
        _logger.info("Test edit with non-existent topic");
        Exception exc4 = null;
        try {
            madmin.editTopic(STREAM, "non-existent", 4);
        } catch (Exception e4) {
            exc4 = e4;
        }
        Assert.assertTrue(exc4 != null);
        _logger.info("Test delete with non-existent topic");
        Exception exc5 = null;
        try {
            madmin.deleteTopic(STREAM, "non-existent");
        } catch (Exception e5) {
            exc5 = e5;
        }
        Assert.assertTrue(exc5 != null);
        _logger.info("Test getTopicMetaEntry on non-existent topic");
        Exception exc6 = null;
        try {
            madmin.getTopicMetaEntry(STREAM + ":non-existent");
        } catch (Exception e6) {
            exc6 = e6;
        }
        Assert.assertTrue(exc6 != null);
    }

    @Test
    public void testTestCount() throws IOException {
        String str = STREAM + "2";
        String str2 = str + ":" + "testCount5";
        madmin.createStream(str, Streams.newStreamDescriptor());
        _logger.info("test on empty stream");
        Assert.assertTrue(madmin.countTopics(str) == 0);
        MarlinAdminImpl marlinAdminImpl = madmin;
        Assert.assertTrue(marlinAdminImpl.listTopicsForStream(str).size() == 0);
        Exception exc = null;
        try {
            marlinAdminImpl.infoTopic("testCount5");
        } catch (Exception e) {
            exc = e;
        }
        Assert.assertTrue(exc != null);
        _logger.info("Create 10 topics & test count");
        for (int i = 0; i < 10; i++) {
            madmin.createTopic(str, "testCount" + i);
        }
        Assert.assertTrue(madmin.countTopics(str) == 10);
        Map listTopicsForStream = marlinAdminImpl.listTopicsForStream(str);
        Assert.assertTrue(listTopicsForStream.size() == 10);
        Assert.assertTrue(listTopicsForStream.containsKey("testCount5"));
        Assert.assertTrue(marlinAdminImpl.infoTopic(str2).size() == 1);
        _logger.info("delete 5 topics & test count");
        for (int i2 = 0; i2 < 5; i2++) {
            madmin.deleteTopic(str, "testCount" + i2);
        }
        int countTopics = madmin.countTopics(str);
        _logger.info("count after deletes : " + countTopics);
        Assert.assertTrue(countTopics == 5);
        Map listTopicsForStream2 = marlinAdminImpl.listTopicsForStream(str);
        Assert.assertTrue(listTopicsForStream2.size() == 5);
        Assert.assertTrue(listTopicsForStream2.containsKey("testCount5"));
        Assert.assertTrue(!listTopicsForStream2.containsKey(str + ":testCount4"));
        Assert.assertTrue(marlinAdminImpl.infoTopic(str2).size() == 1);
        Exception exc2 = null;
        try {
            marlinAdminImpl.infoTopic(str + ":testCount4");
        } catch (Exception e2) {
            exc2 = e2;
        }
        Assert.assertTrue(exc2 != null);
        _logger.info("recreate 2 topics & test count");
        for (int i3 = 0; i3 < 2; i3++) {
            madmin.createTopic(str, "testCount" + i3);
        }
        Assert.assertTrue(madmin.countTopics(str) == 7);
        Assert.assertTrue(marlinAdminImpl.listTopicsForStream(str).size() == 7);
        madmin.deleteStream(str);
    }

    @Test
    public void testFiveThousandTopics() throws IOException {
        try {
            madmin.deleteStream("/jtest-fivethousand");
        } catch (Exception e) {
        }
        madmin.createStream("/jtest-fivethousand", Streams.newStreamDescriptor());
        Properties properties = new Properties();
        Marlinserver.MarlinConfigDefaults.getDefaultInstance();
        properties.put("bootstrap.servers", "127.0.0.1:7222");
        properties.put("key.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
        properties.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
        KafkaProducer kafkaProducer = new KafkaProducer(properties);
        byte[] bArr = new byte[200];
        for (int i = 0; i < 5000; i++) {
            kafkaProducer.send(new ProducerRecord("/jtest-fivethousand" + ":" + i, bArr));
            if (i % 1000 == 0) {
                System.out.println("sending " + i);
            }
        }
        kafkaProducer.flush();
        kafkaProducer.close();
        Assert.assertTrue(madmin.countTopics("/jtest-fivethousand") == 5000);
        madmin.deleteStream("/jtest-fivethousand");
    }

    @Test
    @Ignore("This test takes very, very long")
    public void testMillionTopics() throws IOException {
        madmin.createStream("/jtest-million", Streams.newStreamDescriptor());
        Properties properties = new Properties();
        Marlinserver.MarlinConfigDefaults.getDefaultInstance();
        properties.put("bootstrap.servers", "127.0.0.1:7222");
        properties.put("key.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
        properties.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
        KafkaProducer kafkaProducer = new KafkaProducer(properties);
        byte[] bArr = new byte[200];
        for (int i = 0; i < 1000000; i++) {
            kafkaProducer.send(new ProducerRecord("/jtest-million" + ":" + i, bArr));
            if (i % 10000 == 0) {
                System.out.println("sending " + i);
            }
        }
        kafkaProducer.flush();
        kafkaProducer.close();
        Assert.assertTrue(madmin.countTopics("/jtest-million") == 1000000);
        madmin.deleteStream("/jtest-million");
    }
}
