/*
 * Decompiled with CFR 0.152.
 */
package src.test.java.com.mapr.streams.tests.compaction;

import com.mapr.fs.proto.Marlinserver;
import com.mapr.streams.Admin;
import com.mapr.streams.StreamDescriptor;
import com.mapr.streams.Streams;
import com.mapr.streams.impl.admin.MarlinAdminImpl;
import com.mapr.streams.impl.admin.TopicFeedInfo;
import com.mapr.streams.tests.producer.ProducerMultiTest;
import com.mapr.streams.tests.producer.SendMessagesToProducer;
import com.mapr.streams.tests.producer.SendTimedMessagesToProducer;
import com.mapr.tests.BaseTest;
import com.mapr.tests.annotations.ClusterTest;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import org.apache.hadoop.conf.Configuration;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.common.TopicPartition;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Ignore;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Category(value={ClusterTest.class})
public class CompactionTest
extends BaseTest {
    private static final Logger _logger = LoggerFactory.getLogger(CompactionTest.class);
    private static final String PREFIX = "/jtest-" + CompactionTest.class.getSimpleName() + "-";
    private static Admin madmin;

    @BeforeClass
    public static void setupTest() throws Exception {
        Configuration conf = new Configuration();
        madmin = Streams.newAdmin((Configuration)conf);
        String sname = PREFIX + "testbasic";
        try {
            madmin.deleteStream(sname);
        }
        catch (Exception exception) {
            // empty catch block
        }
        sname = PREFIX + "testbasicwithstats";
        try {
            madmin.deleteStream(sname);
        }
        catch (Exception exception) {
            // empty catch block
        }
        sname = PREFIX + "testmultipartitions";
        try {
            madmin.deleteStream(sname);
        }
        catch (Exception exception) {
            // empty catch block
        }
        sname = PREFIX + "testmultipartitionswithstats";
        try {
            madmin.deleteStream(sname);
        }
        catch (Exception exception) {
            // empty catch block
        }
        sname = PREFIX + "testnullkeys";
        try {
            madmin.deleteStream(sname);
        }
        catch (Exception exception) {
            // empty catch block
        }
        sname = PREFIX + "testnullkeyswithtombstonedelete";
        try {
            madmin.deleteStream(sname);
        }
        catch (Exception exception) {
            // empty catch block
        }
        sname = PREFIX + "testAvoidFrequentCompaction";
        try {
            madmin.deleteStream(sname);
        }
        catch (Exception exception) {
            // empty catch block
        }
    }

    @Test
    public void testBasic() throws IOException, InterruptedException {
        String sname = PREFIX + "testbasic";
        StreamDescriptor sdesc = Streams.newStreamDescriptor();
        madmin.createStream(sname, sdesc);
        int numParts = 1;
        madmin.createTopic(sname, "t", numParts);
        sdesc.setCompact(true);
        madmin.editStream(sname, sdesc);
        int numMsgs = 128;
        ProducerMultiTest.CountCallback cb = new ProducerMultiTest.CountCallback(numMsgs * numParts);
        KafkaProducer<byte[], byte[]> kafkaproducer = this.makeKafkaProducer();
        SendMessagesToProducer producer = new SendMessagesToProducer(kafkaproducer, cb, sname + ":t", numParts, numMsgs, 10);
        producer.run();
        madmin.compactTopicNow(sname, "t");
        Thread.sleep(70000L);
        ArrayList<TopicPartition> tps = new ArrayList<TopicPartition>();
        tps.add(new TopicPartition(sname + ":t", 0));
        KafkaConsumer<byte[], byte[]> kafkaconsumer = this.makeKafkaConsumer();
        kafkaconsumer.assign(tps);
        ArrayList topicsInStream = new ArrayList(kafkaconsumer.listTopics(sname).keySet());
        Assert.assertEquals((long)1L, (long)topicsInStream.size());
        int nPolledMsgs = 0;
        while (nPolledMsgs == 0) {
            ConsumerRecords records = kafkaconsumer.poll(100L);
            for (ConsumerRecord rec : records) {
                _logger.info("messageoffset is " + rec.offset());
                Assert.assertTrue((rec.offset() == (long)(numMsgs - 1) ? 1 : 0) != 0);
                ++nPolledMsgs;
            }
            if (nPolledMsgs == 0) continue;
            Assert.assertTrue((nPolledMsgs == 1 ? 1 : 0) != 0);
            break;
        }
        kafkaconsumer.close();
        madmin.deleteStream(sname);
    }

    @Test
    public void testThrottle() throws IOException, InterruptedException {
        String sname = PREFIX + "testthrottle";
        StreamDescriptor sdesc = Streams.newStreamDescriptor();
        madmin.createStream(sname, sdesc);
        int numParts = 1;
        madmin.createTopic(sname, "t", numParts);
        sdesc.setCompact(true);
        sdesc.setCompactionThrottleFactor(1L);
        madmin.editStream(sname, sdesc);
        StreamDescriptor outDesc = madmin.getStreamDescriptor(sname);
        Assert.assertEquals((long)1L, (long)outDesc.getCompactionThrottleFactor());
        int numMsgs = 128;
        ProducerMultiTest.CountCallback cb = new ProducerMultiTest.CountCallback(numMsgs * numParts);
        KafkaProducer<byte[], byte[]> kafkaproducer = this.makeKafkaProducer();
        SendMessagesToProducer producer = new SendMessagesToProducer(kafkaproducer, cb, sname + ":t", numParts, numMsgs, 10);
        producer.run();
        Thread.sleep(20000L);
        madmin.compactTopicNow(sname, "t");
        Thread.sleep(70000L);
        ArrayList<TopicPartition> tps = new ArrayList<TopicPartition>();
        tps.add(new TopicPartition(sname + ":t", 0));
        KafkaConsumer<byte[], byte[]> kafkaconsumer = this.makeKafkaConsumer();
        kafkaconsumer.assign(tps);
        ArrayList topicsInStream = new ArrayList(kafkaconsumer.listTopics(sname).keySet());
        Assert.assertEquals((long)1L, (long)topicsInStream.size());
        int nPolledMsgs = 0;
        while (nPolledMsgs == 0) {
            ConsumerRecords records = kafkaconsumer.poll(100L);
            for (ConsumerRecord rec : records) {
                _logger.info("messageoffset is " + rec.offset());
                Assert.assertTrue((rec.offset() == (long)(numMsgs - 1) ? 1 : 0) != 0);
                ++nPolledMsgs;
            }
            if (nPolledMsgs == 0) continue;
            Assert.assertTrue((nPolledMsgs == 1 ? 1 : 0) != 0);
            break;
        }
        kafkaconsumer.close();
        madmin.deleteStream(sname);
    }

    @Test
    public void testEventTime() throws IOException, InterruptedException {
        String sname = PREFIX + "testeventtime";
        StreamDescriptor sdesc = Streams.newStreamDescriptor();
        sdesc.setTimeToLiveSec(0L);
        madmin.createStream(sname, sdesc);
        int numParts = 1;
        madmin.createTopic(sname, "t", numParts);
        sdesc.setCompact(true);
        madmin.editStream(sname, sdesc);
        int numMsgs = 128;
        ProducerMultiTest.CountCallback cb = new ProducerMultiTest.CountCallback(numMsgs * numParts);
        KafkaProducer<byte[], byte[]> kafkaproducer = this.makeKafkaProducer();
        SendTimedMessagesToProducer producer = new SendTimedMessagesToProducer(kafkaproducer, cb, sname + ":t", numParts, numMsgs, 0L, 10);
        producer.run();
        madmin.compactTopicNow(sname, "t");
        Thread.sleep(70000L);
        ArrayList<TopicPartition> tps = new ArrayList<TopicPartition>();
        tps.add(new TopicPartition(sname + ":t", 0));
        KafkaConsumer<byte[], byte[]> kafkaconsumer = this.makeKafkaConsumer();
        kafkaconsumer.assign(tps);
        ArrayList topicsInStream = new ArrayList(kafkaconsumer.listTopics(sname).keySet());
        Assert.assertEquals((long)1L, (long)topicsInStream.size());
        int nPolledMsgs = 0;
        while (nPolledMsgs == 0) {
            ConsumerRecords records = kafkaconsumer.poll(100L);
            for (ConsumerRecord rec : records) {
                _logger.info("messageoffset is " + rec.offset() + "timestamp " + rec.timestamp());
                Assert.assertTrue((rec.offset() == (long)(numMsgs - 1) && rec.timestamp() < 200L ? 1 : 0) != 0);
                ++nPolledMsgs;
            }
            if (nPolledMsgs == 0) continue;
            Assert.assertTrue((nPolledMsgs == 1 ? 1 : 0) != 0);
            break;
        }
        kafkaconsumer.close();
        madmin.deleteStream(sname);
    }

    @Test
    @Ignore
    public void testBasicWithStats() throws IOException, InterruptedException {
        String sname = PREFIX + "testbasicwithstats";
        StreamDescriptor sdesc = Streams.newStreamDescriptor();
        madmin.createStream(sname, sdesc);
        int numParts = 1;
        madmin.createTopic(sname, "t", numParts);
        sdesc.setCompact(true);
        madmin.editStream(sname, sdesc);
        int numMsgs = 128;
        ProducerMultiTest.CountCallback cb = new ProducerMultiTest.CountCallback(numMsgs * numParts);
        KafkaProducer<byte[], byte[]> kafkaproducer = this.makeKafkaProducer();
        SendMessagesToProducer producer = new SendMessagesToProducer(kafkaproducer, cb, sname + ":t", numParts, numMsgs, 10);
        producer.run();
        Marlinserver.LcPartitionStatus tpStatus = null;
        List feedList = ((MarlinAdminImpl)madmin).infoTopic(sname + ":t");
        for (TopicFeedInfo tInfo : feedList) {
            tpStatus = tInfo.logCompactionStatus();
            Assert.assertEquals((Object)Marlinserver.LogCompactionStatus.LC_NOT_STARTED, (Object)tpStatus.getLcStatus());
            Assert.assertTrue((tpStatus.getLastLcTimeMs() == 0L ? 1 : 0) != 0);
            Assert.assertTrue((tpStatus.getLastLcCompactedTimeMs() == 0L ? 1 : 0) != 0);
        }
        madmin.compactTopicNow(sname, "t");
        Thread.sleep(70000L);
        feedList = ((MarlinAdminImpl)madmin).infoTopic(sname + ":t");
        for (TopicFeedInfo tInfo : feedList) {
            tpStatus = tInfo.logCompactionStatus();
            Assert.assertEquals((Object)Marlinserver.LogCompactionStatus.LC_DONE_SUCCESS, (Object)tpStatus.getLcStatus());
            Assert.assertTrue((tpStatus.getLastLcTimeMs() > 0L ? 1 : 0) != 0);
            Assert.assertTrue((tpStatus.getLastLcCompactedTimeMs() > 0L ? 1 : 0) != 0);
        }
        ArrayList<TopicPartition> tps = new ArrayList<TopicPartition>();
        tps.add(new TopicPartition(sname + ":t", 0));
        KafkaConsumer<byte[], byte[]> kafkaconsumer = this.makeKafkaConsumer();
        kafkaconsumer.assign(tps);
        int nPolledMsgs = 0;
        while (nPolledMsgs == 0) {
            ConsumerRecords records = kafkaconsumer.poll(100L);
            for (ConsumerRecord rec : records) {
                _logger.info("messageoffset is " + rec.offset());
                Assert.assertTrue((rec.offset() == (long)(numMsgs - 1) ? 1 : 0) != 0);
                ++nPolledMsgs;
            }
            if (nPolledMsgs == 0) continue;
            Assert.assertTrue((nPolledMsgs == 1 ? 1 : 0) != 0);
            break;
        }
        kafkaconsumer.close();
        madmin.deleteStream(sname);
    }

    @Test
    public void testMultiPartitions() throws IOException, InterruptedException {
        String sname = PREFIX + "testmultipartitions";
        StreamDescriptor sdesc = Streams.newStreamDescriptor();
        madmin.createStream(sname, sdesc);
        int numParts = 10;
        madmin.createTopic(sname, "t", numParts);
        sdesc.setCompact(true);
        madmin.editStream(sname, sdesc);
        int numMsgs = 128;
        ProducerMultiTest.CountCallback cb = new ProducerMultiTest.CountCallback(numMsgs * numParts);
        KafkaProducer<byte[], byte[]> kafkaproducer = this.makeKafkaProducer();
        SendMessagesToProducer producer = new SendMessagesToProducer(kafkaproducer, cb, sname + ":t", numParts, numMsgs, 10);
        producer.run();
        madmin.compactTopicNow(sname, "t");
        Thread.sleep(70000L);
        KafkaConsumer<byte[], byte[]> kafkaconsumer = this.makeKafkaConsumer();
        block0: for (int i = 0; i < numParts; ++i) {
            ArrayList<TopicPartition> tps = new ArrayList<TopicPartition>();
            tps.add(new TopicPartition(sname + ":t", i));
            kafkaconsumer.assign(tps);
            int nPolledMsgs = 0;
            while (nPolledMsgs == 0) {
                ConsumerRecords records = kafkaconsumer.poll(100L);
                for (ConsumerRecord rec : records) {
                    _logger.info("messageoffset is " + rec.offset());
                    Assert.assertTrue((rec.offset() == (long)(numMsgs - 1) ? 1 : 0) != 0);
                    ++nPolledMsgs;
                }
                if (nPolledMsgs == 0) continue;
                Assert.assertTrue((nPolledMsgs == 1 ? 1 : 0) != 0);
                continue block0;
            }
        }
        kafkaconsumer.close();
        madmin.deleteStream(sname);
    }

    @Test
    @Ignore
    public void testMultiPartitionsWithStats() throws IOException, InterruptedException {
        String sname = PREFIX + "testmultipartitionswithstats";
        StreamDescriptor sdesc = Streams.newStreamDescriptor();
        madmin.createStream(sname, sdesc);
        int numParts = 10;
        madmin.createTopic(sname, "t", numParts);
        sdesc.setCompact(true);
        madmin.editStream(sname, sdesc);
        int numMsgs = 128;
        ProducerMultiTest.CountCallback cb = new ProducerMultiTest.CountCallback(numMsgs * numParts);
        KafkaProducer<byte[], byte[]> kafkaproducer = this.makeKafkaProducer();
        SendMessagesToProducer producer = new SendMessagesToProducer(kafkaproducer, cb, sname + ":t", numParts, numMsgs, 10);
        producer.run();
        Marlinserver.LcPartitionStatus tpStatus = null;
        List feedList = ((MarlinAdminImpl)madmin).infoTopic(sname + ":t");
        for (TopicFeedInfo tInfo : feedList) {
            tpStatus = tInfo.logCompactionStatus();
            Assert.assertEquals((Object)Marlinserver.LogCompactionStatus.LC_NOT_STARTED, (Object)tpStatus.getLcStatus());
            Assert.assertTrue((tpStatus.getLastLcTimeMs() == 0L ? 1 : 0) != 0);
            Assert.assertTrue((tpStatus.getLastLcCompactedTimeMs() == 0L ? 1 : 0) != 0);
        }
        madmin.compactTopicNow(sname, "t");
        Thread.sleep(70000L);
        feedList = ((MarlinAdminImpl)madmin).infoTopic(sname + ":t");
        for (TopicFeedInfo tInfo : feedList) {
            tpStatus = tInfo.logCompactionStatus();
            Assert.assertEquals((Object)Marlinserver.LogCompactionStatus.LC_DONE_SUCCESS, (Object)tpStatus.getLcStatus());
            Assert.assertTrue((tpStatus.getLastLcTimeMs() > 0L ? 1 : 0) != 0);
            Assert.assertTrue((tpStatus.getLastLcCompactedTimeMs() > 0L ? 1 : 0) != 0);
        }
        KafkaConsumer<byte[], byte[]> kafkaconsumer = this.makeKafkaConsumer();
        block2: for (int i = 0; i < numParts; ++i) {
            ArrayList<TopicPartition> tps = new ArrayList<TopicPartition>();
            tps.add(new TopicPartition(sname + ":t", i));
            kafkaconsumer.assign(tps);
            int nPolledMsgs = 0;
            while (nPolledMsgs == 0) {
                ConsumerRecords records = kafkaconsumer.poll(100L);
                for (ConsumerRecord rec : records) {
                    _logger.info("messageoffset is " + rec.offset());
                    Assert.assertTrue((rec.offset() == (long)(numMsgs - 1) ? 1 : 0) != 0);
                    ++nPolledMsgs;
                }
                if (nPolledMsgs == 0) continue;
                Assert.assertTrue((nPolledMsgs == 1 ? 1 : 0) != 0);
                continue block2;
            }
        }
        kafkaconsumer.close();
        madmin.deleteStream(sname);
    }

    @Test
    @Ignore
    public void testMultipleTopicsWithStats() throws IOException, InterruptedException {
        String sname = PREFIX + "testmultipletopicswithstats";
        StreamDescriptor sdesc = Streams.newStreamDescriptor();
        madmin.createStream(sname, sdesc);
        int numParts1 = 10;
        int numParts2 = 5;
        madmin.createTopic(sname, "t1", numParts1);
        madmin.createTopic(sname, "t2", numParts2);
        madmin.deleteTopic(sname, "t1");
        madmin.deleteTopic(sname, "t2");
        madmin.createTopic(sname, "t1", numParts1);
        madmin.createTopic(sname, "t2", numParts2);
        sdesc.setCompact(true);
        madmin.editStream(sname, sdesc);
        int numMsgs = 128;
        ProducerMultiTest.CountCallback cb1 = new ProducerMultiTest.CountCallback(numMsgs * numParts1);
        ProducerMultiTest.CountCallback cb2 = new ProducerMultiTest.CountCallback(numMsgs * numParts2);
        KafkaProducer<byte[], byte[]> kafkaproducer1 = this.makeKafkaProducer();
        SendMessagesToProducer producer1 = new SendMessagesToProducer(kafkaproducer1, cb1, sname + ":t1", numParts1, numMsgs, 10);
        producer1.run();
        KafkaProducer<byte[], byte[]> kafkaproducer2 = this.makeKafkaProducer();
        SendMessagesToProducer producer2 = new SendMessagesToProducer(kafkaproducer2, cb2, sname + ":t2", numParts2, numMsgs, 10);
        producer2.run();
        Marlinserver.LcPartitionStatus tpStatus = null;
        List feedList1 = ((MarlinAdminImpl)madmin).infoTopic(sname + ":t1");
        for (Object tInfo : feedList1) {
            tpStatus = tInfo.logCompactionStatus();
            Assert.assertEquals((Object)Marlinserver.LogCompactionStatus.LC_NOT_STARTED, (Object)tpStatus.getLcStatus());
            Assert.assertTrue((tpStatus.getLastLcTimeMs() == 0L ? 1 : 0) != 0);
            Assert.assertTrue((tpStatus.getLastLcCompactedTimeMs() == 0L ? 1 : 0) != 0);
        }
        List feedList2 = ((MarlinAdminImpl)madmin).infoTopic(sname + ":t2");
        for (TopicFeedInfo tInfo : feedList2) {
            tpStatus = tInfo.logCompactionStatus();
            Assert.assertEquals((Object)Marlinserver.LogCompactionStatus.LC_NOT_STARTED, (Object)tpStatus.getLcStatus());
            Assert.assertTrue((tpStatus.getLastLcTimeMs() == 0L ? 1 : 0) != 0);
            Assert.assertTrue((tpStatus.getLastLcCompactedTimeMs() == 0L ? 1 : 0) != 0);
        }
        madmin.compactTopicNow(sname, "t1");
        madmin.compactTopicNow(sname, "t2");
        Thread.sleep(70000L);
        feedList1 = ((MarlinAdminImpl)madmin).infoTopic(sname + ":t1");
        feedList2 = ((MarlinAdminImpl)madmin).infoTopic(sname + ":t2");
        for (TopicFeedInfo tInfo : feedList1) {
            tpStatus = tInfo.logCompactionStatus();
            Assert.assertEquals((Object)Marlinserver.LogCompactionStatus.LC_DONE_SUCCESS, (Object)tpStatus.getLcStatus());
            Assert.assertTrue((tpStatus.getLastLcTimeMs() > 0L ? 1 : 0) != 0);
            Assert.assertTrue((tpStatus.getLastLcCompactedTimeMs() > 0L ? 1 : 0) != 0);
        }
        for (TopicFeedInfo tInfo : feedList2) {
            tpStatus = tInfo.logCompactionStatus();
            Assert.assertEquals((Object)Marlinserver.LogCompactionStatus.LC_DONE_SUCCESS, (Object)tpStatus.getLcStatus());
            Assert.assertTrue((tpStatus.getLastLcTimeMs() > 0L ? 1 : 0) != 0);
            Assert.assertTrue((tpStatus.getLastLcCompactedTimeMs() > 0L ? 1 : 0) != 0);
        }
        KafkaConsumer<byte[], byte[]> kafkaconsumer1 = this.makeKafkaConsumer();
        block4: for (int i = 0; i < numParts1; ++i) {
            ArrayList<TopicPartition> tps = new ArrayList<TopicPartition>();
            tps.add(new TopicPartition(sname + ":t1", i));
            kafkaconsumer1.assign(tps);
            int nPolledMsgs = 0;
            while (nPolledMsgs == 0) {
                ConsumerRecords records = kafkaconsumer1.poll(100L);
                for (ConsumerRecord rec : records) {
                    _logger.info("messageoffset is " + rec.offset());
                    Assert.assertTrue((rec.offset() == (long)(numMsgs - 1) ? 1 : 0) != 0);
                    ++nPolledMsgs;
                }
                if (nPolledMsgs == 0) continue;
                Assert.assertTrue((nPolledMsgs == 1 ? 1 : 0) != 0);
                continue block4;
            }
        }
        kafkaconsumer1.close();
        KafkaConsumer<byte[], byte[]> kafkaconsumer2 = this.makeKafkaConsumer();
        block7: for (int i = 0; i < numParts2; ++i) {
            ArrayList<TopicPartition> tps = new ArrayList<TopicPartition>();
            tps.add(new TopicPartition(sname + ":t2", i));
            kafkaconsumer2.assign(tps);
            int nPolledMsgs = 0;
            while (nPolledMsgs == 0) {
                ConsumerRecords records = kafkaconsumer2.poll(100L);
                for (ConsumerRecord rec : records) {
                    _logger.info("messageoffset is " + rec.offset());
                    Assert.assertTrue((rec.offset() == (long)(numMsgs - 1) ? 1 : 0) != 0);
                    ++nPolledMsgs;
                }
                if (nPolledMsgs == 0) continue;
                Assert.assertTrue((nPolledMsgs == 1 ? 1 : 0) != 0);
                continue block7;
            }
        }
        kafkaconsumer2.close();
        madmin.deleteTopic(sname, "t1");
        madmin.deleteTopic(sname, "t2");
        madmin.deleteStream(sname);
    }

    @Test
    @Ignore
    public void testDeleteAndRecreateTopicsWithStats() throws IOException, InterruptedException {
        ConsumerRecords records;
        int nPolledMsgs;
        ArrayList<TopicPartition> tps;
        String sname = PREFIX + "testdeleteandrecreatetopicswithstats";
        StreamDescriptor sdesc = Streams.newStreamDescriptor();
        madmin.createStream(sname, sdesc);
        int numParts = 10;
        madmin.createTopic(sname, "t", numParts);
        madmin.deleteTopic(sname, "t");
        madmin.createTopic(sname, "t", numParts);
        sdesc.setCompact(true);
        madmin.editStream(sname, sdesc);
        int numMsgs = 128;
        ProducerMultiTest.CountCallback cb = new ProducerMultiTest.CountCallback(numMsgs * numParts);
        KafkaProducer<byte[], byte[]> kafkaproducer = this.makeKafkaProducer();
        SendMessagesToProducer producer = new SendMessagesToProducer(kafkaproducer, cb, sname + ":t", numParts, numMsgs, 10);
        producer.run();
        Marlinserver.LcPartitionStatus tpStatus = null;
        List feedList = ((MarlinAdminImpl)madmin).infoTopic(sname + ":t");
        for (TopicFeedInfo tInfo : feedList) {
            tpStatus = tInfo.logCompactionStatus();
            Assert.assertEquals((Object)Marlinserver.LogCompactionStatus.LC_NOT_STARTED, (Object)tpStatus.getLcStatus());
            Assert.assertTrue((tpStatus.getLastLcTimeMs() == 0L ? 1 : 0) != 0);
            Assert.assertTrue((tpStatus.getLastLcCompactedTimeMs() == 0L ? 1 : 0) != 0);
        }
        madmin.compactTopicNow(sname, "t");
        Thread.sleep(70000L);
        feedList = ((MarlinAdminImpl)madmin).infoTopic(sname + ":t");
        for (TopicFeedInfo tInfo : feedList) {
            tpStatus = tInfo.logCompactionStatus();
            Assert.assertEquals((Object)Marlinserver.LogCompactionStatus.LC_DONE_SUCCESS, (Object)tpStatus.getLcStatus());
            Assert.assertTrue((tpStatus.getLastLcTimeMs() > 0L ? 1 : 0) != 0);
            Assert.assertTrue((tpStatus.getLastLcCompactedTimeMs() > 0L ? 1 : 0) != 0);
        }
        KafkaConsumer<byte[], byte[]> kafkaconsumer = this.makeKafkaConsumer();
        block2: for (int i = 0; i < numParts; ++i) {
            tps = new ArrayList<TopicPartition>();
            tps.add(new TopicPartition(sname + ":t", i));
            kafkaconsumer.assign(tps);
            nPolledMsgs = 0;
            while (nPolledMsgs == 0) {
                records = kafkaconsumer.poll(100L);
                for (ConsumerRecord rec : records) {
                    _logger.info("messageoffset is " + rec.offset());
                    Assert.assertTrue((rec.offset() == (long)(numMsgs - 1) ? 1 : 0) != 0);
                    ++nPolledMsgs;
                }
                if (nPolledMsgs == 0) continue;
                Assert.assertTrue((nPolledMsgs == 1 ? 1 : 0) != 0);
                continue block2;
            }
        }
        kafkaconsumer.close();
        madmin.deleteTopic(sname, "t");
        madmin.createTopic(sname, "t", numParts);
        kafkaproducer = this.makeKafkaProducer();
        cb = new ProducerMultiTest.CountCallback(numMsgs * numParts);
        producer = new SendMessagesToProducer(kafkaproducer, cb, sname + ":t", numParts, numMsgs, 10);
        producer.run();
        feedList = ((MarlinAdminImpl)madmin).infoTopic(sname + ":t");
        for (TopicFeedInfo tInfo : feedList) {
            tpStatus = tInfo.logCompactionStatus();
            Assert.assertEquals((Object)Marlinserver.LogCompactionStatus.LC_NOT_STARTED, (Object)tpStatus.getLcStatus());
            Assert.assertTrue((tpStatus.getLastLcTimeMs() == 0L ? 1 : 0) != 0);
            Assert.assertTrue((tpStatus.getLastLcCompactedTimeMs() == 0L ? 1 : 0) != 0);
        }
        madmin.compactTopicNow(sname, "t");
        Thread.sleep(70000L);
        feedList = ((MarlinAdminImpl)madmin).infoTopic(sname + ":t");
        kafkaconsumer = this.makeKafkaConsumer();
        block6: for (int i = 0; i < numParts; ++i) {
            tps = new ArrayList();
            tps.add(new TopicPartition(sname + ":t", i));
            kafkaconsumer.assign(tps);
            nPolledMsgs = 0;
            while (nPolledMsgs == 0) {
                records = kafkaconsumer.poll(100L);
                for (ConsumerRecord rec : records) {
                    _logger.info("messageoffset is " + rec.offset());
                    Assert.assertTrue((rec.offset() == (long)(numMsgs - 1) ? 1 : 0) != 0);
                    ++nPolledMsgs;
                }
                if (nPolledMsgs == 0) continue;
                Assert.assertTrue((nPolledMsgs == 1 ? 1 : 0) != 0);
                continue block6;
            }
        }
        kafkaconsumer.close();
        madmin.deleteTopic(sname, "t");
        madmin.deleteStream(sname);
    }

    @Test
    public void testNullKeys() throws IOException, InterruptedException {
        String sname = PREFIX + "testnullkeys";
        StreamDescriptor sdesc = Streams.newStreamDescriptor();
        madmin.createStream(sname, sdesc);
        int numParts = 1;
        madmin.createTopic(sname, "t", numParts);
        sdesc.setCompact(true);
        madmin.editStream(sname, sdesc);
        int numMsgs = 128;
        ProducerMultiTest.CountCallback cb = new ProducerMultiTest.CountCallback(numMsgs * numParts);
        KafkaProducer<byte[], byte[]> kafkaproducer = this.makeKafkaProducer();
        SendMessagesToProducer producer = new SendMessagesToProducer(kafkaproducer, cb, sname + ":t", numParts, numMsgs, 0);
        producer.run();
        madmin.compactTopicNow(sname, "t");
        Thread.sleep(70000L);
        KafkaConsumer<byte[], byte[]> kafkaconsumer = this.makeKafkaConsumer();
        block0: for (int i = 0; i < numParts; ++i) {
            ArrayList<TopicPartition> tps = new ArrayList<TopicPartition>();
            tps.add(new TopicPartition(sname + ":t", i));
            kafkaconsumer.assign(tps);
            int nPolledMsgs = 0;
            while (nPolledMsgs == 0) {
                ConsumerRecords records = kafkaconsumer.poll(100L);
                for (ConsumerRecord rec : records) {
                    _logger.info("messageoffset is " + rec.offset());
                    Assert.assertTrue((rec.offset() == (long)(numMsgs - 1) ? 1 : 0) != 0);
                    ++nPolledMsgs;
                }
                if (nPolledMsgs == 0) continue;
                Assert.assertTrue((nPolledMsgs == 1 ? 1 : 0) != 0);
                continue block0;
            }
        }
        kafkaconsumer.close();
        madmin.deleteStream(sname);
    }

    @Test
    public void testNullKeysWithTombstoneDelete() throws IOException, InterruptedException {
        String sname = PREFIX + "testnullkeyswithtombstonedelete";
        StreamDescriptor sdesc = Streams.newStreamDescriptor();
        sdesc.setDeleteRetentionMS(0L);
        madmin.createStream(sname, sdesc);
        int numParts = 1;
        madmin.createTopic(sname, "t", numParts);
        sdesc.setCompact(true);
        madmin.editStream(sname, sdesc);
        int numMsgs = 128;
        ProducerMultiTest.CountCallback cb = new ProducerMultiTest.CountCallback(numMsgs * numParts);
        KafkaProducer<byte[], byte[]> kafkaproducer = this.makeKafkaProducer();
        SendMessagesToProducer producer = new SendMessagesToProducer(kafkaproducer, cb, sname + ":t", numParts, numMsgs, 0);
        producer.run();
        madmin.compactTopicNow(sname, "t");
        Thread.sleep(70000L);
        KafkaConsumer<byte[], byte[]> kafkaconsumer = this.makeKafkaConsumer();
        for (int i = 0; i < numParts; ++i) {
            ArrayList<TopicPartition> tps = new ArrayList<TopicPartition>();
            tps.add(new TopicPartition(sname + ":t", i));
            kafkaconsumer.assign(tps);
            ConsumerRecords records = kafkaconsumer.poll(30000L);
            for (ConsumerRecord rec : records) {
                _logger.info("messageoffset is " + rec.offset());
            }
            Assert.assertTrue((boolean)records.isEmpty());
        }
        kafkaconsumer.close();
        madmin.deleteStream(sname);
    }

    @Test
    public void testAvoidFrequentCompaction() throws IOException, InterruptedException {
        String sname = PREFIX + "testAvoidFrequentCompaction";
        StreamDescriptor sdesc = Streams.newStreamDescriptor();
        madmin.createStream(sname, sdesc);
        int numParts = 1;
        madmin.createTopic(sname, "topic0", numParts);
        sdesc.setCompact(true);
        madmin.editStream(sname, sdesc);
        int numMsgs = 128;
        ProducerMultiTest.CountCallback cb1 = new ProducerMultiTest.CountCallback(numMsgs * numParts);
        ProducerMultiTest.CountCallback cb2 = new ProducerMultiTest.CountCallback(numMsgs * numParts);
        KafkaProducer<byte[], byte[]> kafkaproducer1 = this.makeKafkaProducer();
        KafkaProducer<byte[], byte[]> kafkaproducer2 = this.makeKafkaProducer();
        SendMessagesToProducer producer1 = new SendMessagesToProducer(kafkaproducer1, cb1, sname + ":topic0", numParts, numMsgs, 10);
        SendMessagesToProducer producer2 = new SendMessagesToProducer(kafkaproducer2, cb2, sname + ":topic0", numParts, numMsgs, 10);
        _logger.info("Producing " + numMsgs + " messages.");
        producer1.run();
        madmin.compactTopicNow(sname, "topic0");
        Thread.sleep(70000L);
        ArrayList<TopicPartition> tps1 = new ArrayList<TopicPartition>();
        tps1.add(new TopicPartition(sname + ":topic0", 0));
        KafkaConsumer<byte[], byte[]> kafkaconsumer1 = this.makeKafkaConsumer();
        kafkaconsumer1.assign(tps1);
        int nPolledMsgs = 0;
        while (nPolledMsgs == 0) {
            ConsumerRecords records = kafkaconsumer1.poll(100L);
            for (ConsumerRecord rec : records) {
                _logger.info("messageoffset is " + rec.offset());
                Assert.assertTrue((rec.offset() == (long)(numMsgs - 1) ? 1 : 0) != 0);
                ++nPolledMsgs;
            }
            if (nPolledMsgs == 0) continue;
            Assert.assertTrue((nPolledMsgs == 1 ? 1 : 0) != 0);
            _logger.info("First round of compaction is successfull.");
            break;
        }
        _logger.info("Producing " + numMsgs + " messages again.");
        producer2.run();
        Thread.sleep(30000L);
        ArrayList<TopicPartition> tps2 = new ArrayList<TopicPartition>();
        tps2.add(new TopicPartition(sname + ":topic0", 0));
        KafkaConsumer<byte[], byte[]> kafkaconsumer2 = this.makeKafkaConsumer();
        kafkaconsumer2.assign(tps2);
        nPolledMsgs = 0;
        while (nPolledMsgs == 0) {
            ConsumerRecords records = kafkaconsumer2.poll(100L);
            for (ConsumerRecord rec : records) {
                ++nPolledMsgs;
            }
            if (nPolledMsgs == 0) continue;
            Assert.assertTrue((nPolledMsgs == numMsgs + 1 ? 1 : 0) != 0);
            _logger.info("Second round of compaction has not started yet.");
            break;
        }
        Thread.sleep(60000L);
        ArrayList<TopicPartition> tps3 = new ArrayList<TopicPartition>();
        tps3.add(new TopicPartition(sname + ":topic0", 0));
        KafkaConsumer<byte[], byte[]> kafkaconsumer3 = this.makeKafkaConsumer();
        kafkaconsumer3.assign(tps3);
        nPolledMsgs = 0;
        while (nPolledMsgs == 0) {
            ConsumerRecords records = kafkaconsumer3.poll(100L);
            for (ConsumerRecord rec : records) {
                Assert.assertTrue((rec.offset() == (long)(2 * numMsgs - 1) ? 1 : 0) != 0);
                ++nPolledMsgs;
            }
            if (nPolledMsgs == 0) continue;
            Assert.assertTrue((nPolledMsgs == 1 ? 1 : 0) != 0);
            _logger.info("Second round of compaction is successfull.");
            break;
        }
        kafkaconsumer1.close();
        kafkaconsumer2.close();
        kafkaconsumer3.close();
        madmin.deleteStream(sname);
    }

    private KafkaProducer<byte[], byte[]> makeKafkaProducer() {
        Properties props = new Properties();
        props.put("key.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
        KafkaProducer kafkaproducer = new KafkaProducer(props);
        return kafkaproducer;
    }

    private KafkaConsumer<byte[], byte[]> makeKafkaConsumer() {
        Properties listenerProps = new Properties();
        listenerProps.put("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        listenerProps.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        listenerProps.put("auto.offset.reset", "earliest");
        listenerProps.put("enable.auto.commit", "false");
        KafkaConsumer kafkaconsumer = new KafkaConsumer(listenerProps);
        return kafkaconsumer;
    }
}

