package src.test.java.com.mapr.streams.tests.compaction;

import com.mapr.fs.proto.Marlinserver;
import com.mapr.streams.Admin;
import com.mapr.streams.StreamDescriptor;
import com.mapr.streams.Streams;
import com.mapr.streams.impl.admin.TopicFeedInfo;
import com.mapr.streams.tests.producer.ProducerMultiTest;
import com.mapr.streams.tests.producer.SendMessagesToProducer;
import com.mapr.streams.tests.producer.SendTimedMessagesToProducer;
import com.mapr.tests.BaseTest;
import com.mapr.tests.annotations.ClusterTest;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Properties;
import org.apache.hadoop.conf.Configuration;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.common.TopicPartition;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Ignore;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Category({ClusterTest.class})
/* loaded from: input_file:src/test/java/com/mapr/streams/tests/compaction/CompactionTest.class */
public class CompactionTest extends BaseTest {
    private static final Logger _logger = LoggerFactory.getLogger(CompactionTest.class);
    private static final String PREFIX = "/jtest-" + CompactionTest.class.getSimpleName() + "-";
    private static Admin madmin;

    @BeforeClass
    public static void setupTest() throws Exception {
        madmin = Streams.newAdmin(new Configuration());
        try {
            madmin.deleteStream(PREFIX + "testbasic");
        } catch (Exception e) {
        }
        try {
            madmin.deleteStream(PREFIX + "testbasicwithstats");
        } catch (Exception e2) {
        }
        try {
            madmin.deleteStream(PREFIX + "testmultipartitions");
        } catch (Exception e3) {
        }
        try {
            madmin.deleteStream(PREFIX + "testmultipartitionswithstats");
        } catch (Exception e4) {
        }
        try {
            madmin.deleteStream(PREFIX + "testnullkeys");
        } catch (Exception e5) {
        }
        try {
            madmin.deleteStream(PREFIX + "testnullkeyswithtombstonedelete");
        } catch (Exception e6) {
        }
        try {
            madmin.deleteStream(PREFIX + "testAvoidFrequentCompaction");
        } catch (Exception e7) {
        }
    }

    @Test
    public void testBasic() throws IOException, InterruptedException {
        String str = PREFIX + "testbasic";
        StreamDescriptor newStreamDescriptor = Streams.newStreamDescriptor();
        madmin.createStream(str, newStreamDescriptor);
        madmin.createTopic(str, "t", 1);
        newStreamDescriptor.setCompact(true);
        madmin.editStream(str, newStreamDescriptor);
        new SendMessagesToProducer(makeKafkaProducer(), new ProducerMultiTest.CountCallback(128 * 1), str + ":t", 1, 128, 10).run();
        madmin.compactTopicNow(str, "t");
        Thread.sleep(70000L);
        ArrayList arrayList = new ArrayList();
        arrayList.add(new TopicPartition(str + ":t", 0));
        KafkaConsumer<byte[], byte[]> makeKafkaConsumer = makeKafkaConsumer();
        makeKafkaConsumer.assign(arrayList);
        Assert.assertEquals(1L, new ArrayList(makeKafkaConsumer.listTopics(str).keySet()).size());
        int i = 0;
        while (true) {
            if (i != 0) {
                break;
            }
            Iterator it = makeKafkaConsumer.poll(100L).iterator();
            while (it.hasNext()) {
                ConsumerRecord consumerRecord = (ConsumerRecord) it.next();
                _logger.info("messageoffset is " + consumerRecord.offset());
                Assert.assertTrue(consumerRecord.offset() == ((long) (128 - 1)));
                i++;
            }
            if (i != 0) {
                Assert.assertTrue(i == 1);
            }
        }
        makeKafkaConsumer.close();
        madmin.deleteStream(str);
    }

    @Test
    public void testThrottle() throws IOException, InterruptedException {
        String str = PREFIX + "testthrottle";
        StreamDescriptor newStreamDescriptor = Streams.newStreamDescriptor();
        madmin.createStream(str, newStreamDescriptor);
        madmin.createTopic(str, "t", 1);
        newStreamDescriptor.setCompact(true);
        newStreamDescriptor.setCompactionThrottleFactor(1L);
        madmin.editStream(str, newStreamDescriptor);
        Assert.assertEquals(1L, madmin.getStreamDescriptor(str).getCompactionThrottleFactor());
        new SendMessagesToProducer(makeKafkaProducer(), new ProducerMultiTest.CountCallback(128 * 1), str + ":t", 1, 128, 10).run();
        Thread.sleep(20000L);
        madmin.compactTopicNow(str, "t");
        Thread.sleep(70000L);
        ArrayList arrayList = new ArrayList();
        arrayList.add(new TopicPartition(str + ":t", 0));
        KafkaConsumer<byte[], byte[]> makeKafkaConsumer = makeKafkaConsumer();
        makeKafkaConsumer.assign(arrayList);
        Assert.assertEquals(1L, new ArrayList(makeKafkaConsumer.listTopics(str).keySet()).size());
        int i = 0;
        while (true) {
            if (i != 0) {
                break;
            }
            Iterator it = makeKafkaConsumer.poll(100L).iterator();
            while (it.hasNext()) {
                ConsumerRecord consumerRecord = (ConsumerRecord) it.next();
                _logger.info("messageoffset is " + consumerRecord.offset());
                Assert.assertTrue(consumerRecord.offset() == ((long) (128 - 1)));
                i++;
            }
            if (i != 0) {
                Assert.assertTrue(i == 1);
            }
        }
        makeKafkaConsumer.close();
        madmin.deleteStream(str);
    }

    @Test
    public void testEventTime() throws IOException, InterruptedException {
        String str = PREFIX + "testeventtime";
        StreamDescriptor newStreamDescriptor = Streams.newStreamDescriptor();
        newStreamDescriptor.setTimeToLiveSec(0L);
        madmin.createStream(str, newStreamDescriptor);
        madmin.createTopic(str, "t", 1);
        newStreamDescriptor.setCompact(true);
        madmin.editStream(str, newStreamDescriptor);
        new SendTimedMessagesToProducer((KafkaProducer) makeKafkaProducer(), new ProducerMultiTest.CountCallback(128 * 1), str + ":t", 1, 128, 0L, 10).run();
        madmin.compactTopicNow(str, "t");
        Thread.sleep(70000L);
        ArrayList arrayList = new ArrayList();
        arrayList.add(new TopicPartition(str + ":t", 0));
        KafkaConsumer<byte[], byte[]> makeKafkaConsumer = makeKafkaConsumer();
        makeKafkaConsumer.assign(arrayList);
        Assert.assertEquals(1L, new ArrayList(makeKafkaConsumer.listTopics(str).keySet()).size());
        int i = 0;
        while (true) {
            if (i != 0) {
                break;
            }
            Iterator it = makeKafkaConsumer.poll(100L).iterator();
            while (it.hasNext()) {
                ConsumerRecord consumerRecord = (ConsumerRecord) it.next();
                _logger.info("messageoffset is " + consumerRecord.offset() + "timestamp " + consumerRecord.timestamp());
                Assert.assertTrue(consumerRecord.offset() == ((long) (128 - 1)) && consumerRecord.timestamp() < 200);
                i++;
            }
            if (i != 0) {
                Assert.assertTrue(i == 1);
            }
        }
        makeKafkaConsumer.close();
        madmin.deleteStream(str);
    }

    @Test
    @Ignore
    public void testBasicWithStats() throws IOException, InterruptedException {
        String str = PREFIX + "testbasicwithstats";
        StreamDescriptor newStreamDescriptor = Streams.newStreamDescriptor();
        madmin.createStream(str, newStreamDescriptor);
        madmin.createTopic(str, "t", 1);
        newStreamDescriptor.setCompact(true);
        madmin.editStream(str, newStreamDescriptor);
        new SendMessagesToProducer(makeKafkaProducer(), new ProducerMultiTest.CountCallback(128 * 1), str + ":t", 1, 128, 10).run();
        Iterator it = madmin.infoTopic(str + ":t").iterator();
        while (it.hasNext()) {
            Marlinserver.LcPartitionStatus logCompactionStatus = ((TopicFeedInfo) it.next()).logCompactionStatus();
            Assert.assertEquals(Marlinserver.LogCompactionStatus.LC_NOT_STARTED, logCompactionStatus.getLcStatus());
            Assert.assertTrue(logCompactionStatus.getLastLcTimeMs() == 0);
            Assert.assertTrue(logCompactionStatus.getLastLcCompactedTimeMs() == 0);
        }
        madmin.compactTopicNow(str, "t");
        Thread.sleep(70000L);
        Iterator it2 = madmin.infoTopic(str + ":t").iterator();
        while (it2.hasNext()) {
            Marlinserver.LcPartitionStatus logCompactionStatus2 = ((TopicFeedInfo) it2.next()).logCompactionStatus();
            Assert.assertEquals(Marlinserver.LogCompactionStatus.LC_DONE_SUCCESS, logCompactionStatus2.getLcStatus());
            Assert.assertTrue(logCompactionStatus2.getLastLcTimeMs() > 0);
            Assert.assertTrue(logCompactionStatus2.getLastLcCompactedTimeMs() > 0);
        }
        ArrayList arrayList = new ArrayList();
        arrayList.add(new TopicPartition(str + ":t", 0));
        KafkaConsumer<byte[], byte[]> makeKafkaConsumer = makeKafkaConsumer();
        makeKafkaConsumer.assign(arrayList);
        int i = 0;
        while (true) {
            if (i != 0) {
                break;
            }
            Iterator it3 = makeKafkaConsumer.poll(100L).iterator();
            while (it3.hasNext()) {
                ConsumerRecord consumerRecord = (ConsumerRecord) it3.next();
                _logger.info("messageoffset is " + consumerRecord.offset());
                Assert.assertTrue(consumerRecord.offset() == ((long) (128 - 1)));
                i++;
            }
            if (i != 0) {
                Assert.assertTrue(i == 1);
            }
        }
        makeKafkaConsumer.close();
        madmin.deleteStream(str);
    }

    @Test
    public void testMultiPartitions() throws IOException, InterruptedException {
        String str = PREFIX + "testmultipartitions";
        StreamDescriptor newStreamDescriptor = Streams.newStreamDescriptor();
        madmin.createStream(str, newStreamDescriptor);
        madmin.createTopic(str, "t", 10);
        newStreamDescriptor.setCompact(true);
        madmin.editStream(str, newStreamDescriptor);
        new SendMessagesToProducer(makeKafkaProducer(), new ProducerMultiTest.CountCallback(128 * 10), str + ":t", 10, 128, 10).run();
        madmin.compactTopicNow(str, "t");
        Thread.sleep(70000L);
        KafkaConsumer<byte[], byte[]> makeKafkaConsumer = makeKafkaConsumer();
        for (int i = 0; i < 10; i++) {
            ArrayList arrayList = new ArrayList();
            arrayList.add(new TopicPartition(str + ":t", i));
            makeKafkaConsumer.assign(arrayList);
            int i2 = 0;
            while (true) {
                if (i2 == 0) {
                    Iterator it = makeKafkaConsumer.poll(100L).iterator();
                    while (it.hasNext()) {
                        ConsumerRecord consumerRecord = (ConsumerRecord) it.next();
                        _logger.info("messageoffset is " + consumerRecord.offset());
                        Assert.assertTrue(consumerRecord.offset() == ((long) (128 - 1)));
                        i2++;
                    }
                    if (i2 != 0) {
                        Assert.assertTrue(i2 == 1);
                    }
                }
            }
        }
        makeKafkaConsumer.close();
        madmin.deleteStream(str);
    }

    @Test
    @Ignore
    public void testMultiPartitionsWithStats() throws IOException, InterruptedException {
        String str = PREFIX + "testmultipartitionswithstats";
        StreamDescriptor newStreamDescriptor = Streams.newStreamDescriptor();
        madmin.createStream(str, newStreamDescriptor);
        madmin.createTopic(str, "t", 10);
        newStreamDescriptor.setCompact(true);
        madmin.editStream(str, newStreamDescriptor);
        new SendMessagesToProducer(makeKafkaProducer(), new ProducerMultiTest.CountCallback(128 * 10), str + ":t", 10, 128, 10).run();
        Iterator it = madmin.infoTopic(str + ":t").iterator();
        while (it.hasNext()) {
            Marlinserver.LcPartitionStatus logCompactionStatus = ((TopicFeedInfo) it.next()).logCompactionStatus();
            Assert.assertEquals(Marlinserver.LogCompactionStatus.LC_NOT_STARTED, logCompactionStatus.getLcStatus());
            Assert.assertTrue(logCompactionStatus.getLastLcTimeMs() == 0);
            Assert.assertTrue(logCompactionStatus.getLastLcCompactedTimeMs() == 0);
        }
        madmin.compactTopicNow(str, "t");
        Thread.sleep(70000L);
        Iterator it2 = madmin.infoTopic(str + ":t").iterator();
        while (it2.hasNext()) {
            Marlinserver.LcPartitionStatus logCompactionStatus2 = ((TopicFeedInfo) it2.next()).logCompactionStatus();
            Assert.assertEquals(Marlinserver.LogCompactionStatus.LC_DONE_SUCCESS, logCompactionStatus2.getLcStatus());
            Assert.assertTrue(logCompactionStatus2.getLastLcTimeMs() > 0);
            Assert.assertTrue(logCompactionStatus2.getLastLcCompactedTimeMs() > 0);
        }
        KafkaConsumer<byte[], byte[]> makeKafkaConsumer = makeKafkaConsumer();
        for (int i = 0; i < 10; i++) {
            ArrayList arrayList = new ArrayList();
            arrayList.add(new TopicPartition(str + ":t", i));
            makeKafkaConsumer.assign(arrayList);
            int i2 = 0;
            while (true) {
                if (i2 == 0) {
                    Iterator it3 = makeKafkaConsumer.poll(100L).iterator();
                    while (it3.hasNext()) {
                        ConsumerRecord consumerRecord = (ConsumerRecord) it3.next();
                        _logger.info("messageoffset is " + consumerRecord.offset());
                        Assert.assertTrue(consumerRecord.offset() == ((long) (128 - 1)));
                        i2++;
                    }
                    if (i2 != 0) {
                        Assert.assertTrue(i2 == 1);
                    }
                }
            }
        }
        makeKafkaConsumer.close();
        madmin.deleteStream(str);
    }

    @Test
    @Ignore
    public void testMultipleTopicsWithStats() throws IOException, InterruptedException {
        String str = PREFIX + "testmultipletopicswithstats";
        StreamDescriptor newStreamDescriptor = Streams.newStreamDescriptor();
        madmin.createStream(str, newStreamDescriptor);
        madmin.createTopic(str, "t1", 10);
        madmin.createTopic(str, "t2", 5);
        madmin.deleteTopic(str, "t1");
        madmin.deleteTopic(str, "t2");
        madmin.createTopic(str, "t1", 10);
        madmin.createTopic(str, "t2", 5);
        newStreamDescriptor.setCompact(true);
        madmin.editStream(str, newStreamDescriptor);
        ProducerMultiTest.CountCallback countCallback = new ProducerMultiTest.CountCallback(128 * 10);
        ProducerMultiTest.CountCallback countCallback2 = new ProducerMultiTest.CountCallback(128 * 5);
        new SendMessagesToProducer(makeKafkaProducer(), countCallback, str + ":t1", 10, 128, 10).run();
        new SendMessagesToProducer(makeKafkaProducer(), countCallback2, str + ":t2", 5, 128, 10).run();
        Iterator it = madmin.infoTopic(str + ":t1").iterator();
        while (it.hasNext()) {
            Marlinserver.LcPartitionStatus logCompactionStatus = ((TopicFeedInfo) it.next()).logCompactionStatus();
            Assert.assertEquals(Marlinserver.LogCompactionStatus.LC_NOT_STARTED, logCompactionStatus.getLcStatus());
            Assert.assertTrue(logCompactionStatus.getLastLcTimeMs() == 0);
            Assert.assertTrue(logCompactionStatus.getLastLcCompactedTimeMs() == 0);
        }
        Iterator it2 = madmin.infoTopic(str + ":t2").iterator();
        while (it2.hasNext()) {
            Marlinserver.LcPartitionStatus logCompactionStatus2 = ((TopicFeedInfo) it2.next()).logCompactionStatus();
            Assert.assertEquals(Marlinserver.LogCompactionStatus.LC_NOT_STARTED, logCompactionStatus2.getLcStatus());
            Assert.assertTrue(logCompactionStatus2.getLastLcTimeMs() == 0);
            Assert.assertTrue(logCompactionStatus2.getLastLcCompactedTimeMs() == 0);
        }
        madmin.compactTopicNow(str, "t1");
        madmin.compactTopicNow(str, "t2");
        Thread.sleep(70000L);
        List infoTopic = madmin.infoTopic(str + ":t1");
        List infoTopic2 = madmin.infoTopic(str + ":t2");
        Iterator it3 = infoTopic.iterator();
        while (it3.hasNext()) {
            Marlinserver.LcPartitionStatus logCompactionStatus3 = ((TopicFeedInfo) it3.next()).logCompactionStatus();
            Assert.assertEquals(Marlinserver.LogCompactionStatus.LC_DONE_SUCCESS, logCompactionStatus3.getLcStatus());
            Assert.assertTrue(logCompactionStatus3.getLastLcTimeMs() > 0);
            Assert.assertTrue(logCompactionStatus3.getLastLcCompactedTimeMs() > 0);
        }
        Iterator it4 = infoTopic2.iterator();
        while (it4.hasNext()) {
            Marlinserver.LcPartitionStatus logCompactionStatus4 = ((TopicFeedInfo) it4.next()).logCompactionStatus();
            Assert.assertEquals(Marlinserver.LogCompactionStatus.LC_DONE_SUCCESS, logCompactionStatus4.getLcStatus());
            Assert.assertTrue(logCompactionStatus4.getLastLcTimeMs() > 0);
            Assert.assertTrue(logCompactionStatus4.getLastLcCompactedTimeMs() > 0);
        }
        KafkaConsumer<byte[], byte[]> makeKafkaConsumer = makeKafkaConsumer();
        for (int i = 0; i < 10; i++) {
            ArrayList arrayList = new ArrayList();
            arrayList.add(new TopicPartition(str + ":t1", i));
            makeKafkaConsumer.assign(arrayList);
            int i2 = 0;
            while (true) {
                if (i2 == 0) {
                    Iterator it5 = makeKafkaConsumer.poll(100L).iterator();
                    while (it5.hasNext()) {
                        ConsumerRecord consumerRecord = (ConsumerRecord) it5.next();
                        _logger.info("messageoffset is " + consumerRecord.offset());
                        Assert.assertTrue(consumerRecord.offset() == ((long) (128 - 1)));
                        i2++;
                    }
                    if (i2 != 0) {
                        Assert.assertTrue(i2 == 1);
                    }
                }
            }
        }
        makeKafkaConsumer.close();
        KafkaConsumer<byte[], byte[]> makeKafkaConsumer2 = makeKafkaConsumer();
        for (int i3 = 0; i3 < 5; i3++) {
            ArrayList arrayList2 = new ArrayList();
            arrayList2.add(new TopicPartition(str + ":t2", i3));
            makeKafkaConsumer2.assign(arrayList2);
            int i4 = 0;
            while (true) {
                if (i4 == 0) {
                    Iterator it6 = makeKafkaConsumer2.poll(100L).iterator();
                    while (it6.hasNext()) {
                        ConsumerRecord consumerRecord2 = (ConsumerRecord) it6.next();
                        _logger.info("messageoffset is " + consumerRecord2.offset());
                        Assert.assertTrue(consumerRecord2.offset() == ((long) (128 - 1)));
                        i4++;
                    }
                    if (i4 != 0) {
                        Assert.assertTrue(i4 == 1);
                    }
                }
            }
        }
        makeKafkaConsumer2.close();
        madmin.deleteTopic(str, "t1");
        madmin.deleteTopic(str, "t2");
        madmin.deleteStream(str);
    }

    @Test
    @Ignore
    public void testDeleteAndRecreateTopicsWithStats() throws IOException, InterruptedException {
        String str = PREFIX + "testdeleteandrecreatetopicswithstats";
        StreamDescriptor newStreamDescriptor = Streams.newStreamDescriptor();
        madmin.createStream(str, newStreamDescriptor);
        madmin.createTopic(str, "t", 10);
        madmin.deleteTopic(str, "t");
        madmin.createTopic(str, "t", 10);
        newStreamDescriptor.setCompact(true);
        madmin.editStream(str, newStreamDescriptor);
        new SendMessagesToProducer(makeKafkaProducer(), new ProducerMultiTest.CountCallback(128 * 10), str + ":t", 10, 128, 10).run();
        Iterator it = madmin.infoTopic(str + ":t").iterator();
        while (it.hasNext()) {
            Marlinserver.LcPartitionStatus logCompactionStatus = ((TopicFeedInfo) it.next()).logCompactionStatus();
            Assert.assertEquals(Marlinserver.LogCompactionStatus.LC_NOT_STARTED, logCompactionStatus.getLcStatus());
            Assert.assertTrue(logCompactionStatus.getLastLcTimeMs() == 0);
            Assert.assertTrue(logCompactionStatus.getLastLcCompactedTimeMs() == 0);
        }
        madmin.compactTopicNow(str, "t");
        Thread.sleep(70000L);
        Iterator it2 = madmin.infoTopic(str + ":t").iterator();
        while (it2.hasNext()) {
            Marlinserver.LcPartitionStatus logCompactionStatus2 = ((TopicFeedInfo) it2.next()).logCompactionStatus();
            Assert.assertEquals(Marlinserver.LogCompactionStatus.LC_DONE_SUCCESS, logCompactionStatus2.getLcStatus());
            Assert.assertTrue(logCompactionStatus2.getLastLcTimeMs() > 0);
            Assert.assertTrue(logCompactionStatus2.getLastLcCompactedTimeMs() > 0);
        }
        KafkaConsumer<byte[], byte[]> makeKafkaConsumer = makeKafkaConsumer();
        for (int i = 0; i < 10; i++) {
            ArrayList arrayList = new ArrayList();
            arrayList.add(new TopicPartition(str + ":t", i));
            makeKafkaConsumer.assign(arrayList);
            int i2 = 0;
            while (true) {
                if (i2 == 0) {
                    Iterator it3 = makeKafkaConsumer.poll(100L).iterator();
                    while (it3.hasNext()) {
                        ConsumerRecord consumerRecord = (ConsumerRecord) it3.next();
                        _logger.info("messageoffset is " + consumerRecord.offset());
                        Assert.assertTrue(consumerRecord.offset() == ((long) (128 - 1)));
                        i2++;
                    }
                    if (i2 != 0) {
                        Assert.assertTrue(i2 == 1);
                    }
                }
            }
        }
        makeKafkaConsumer.close();
        madmin.deleteTopic(str, "t");
        madmin.createTopic(str, "t", 10);
        new SendMessagesToProducer(makeKafkaProducer(), new ProducerMultiTest.CountCallback(128 * 10), str + ":t", 10, 128, 10).run();
        Iterator it4 = madmin.infoTopic(str + ":t").iterator();
        while (it4.hasNext()) {
            Marlinserver.LcPartitionStatus logCompactionStatus3 = ((TopicFeedInfo) it4.next()).logCompactionStatus();
            Assert.assertEquals(Marlinserver.LogCompactionStatus.LC_NOT_STARTED, logCompactionStatus3.getLcStatus());
            Assert.assertTrue(logCompactionStatus3.getLastLcTimeMs() == 0);
            Assert.assertTrue(logCompactionStatus3.getLastLcCompactedTimeMs() == 0);
        }
        madmin.compactTopicNow(str, "t");
        Thread.sleep(70000L);
        madmin.infoTopic(str + ":t");
        KafkaConsumer<byte[], byte[]> makeKafkaConsumer2 = makeKafkaConsumer();
        for (int i3 = 0; i3 < 10; i3++) {
            ArrayList arrayList2 = new ArrayList();
            arrayList2.add(new TopicPartition(str + ":t", i3));
            makeKafkaConsumer2.assign(arrayList2);
            int i4 = 0;
            while (true) {
                if (i4 == 0) {
                    Iterator it5 = makeKafkaConsumer2.poll(100L).iterator();
                    while (it5.hasNext()) {
                        ConsumerRecord consumerRecord2 = (ConsumerRecord) it5.next();
                        _logger.info("messageoffset is " + consumerRecord2.offset());
                        Assert.assertTrue(consumerRecord2.offset() == ((long) (128 - 1)));
                        i4++;
                    }
                    if (i4 != 0) {
                        Assert.assertTrue(i4 == 1);
                    }
                }
            }
        }
        makeKafkaConsumer2.close();
        madmin.deleteTopic(str, "t");
        madmin.deleteStream(str);
    }

    @Test
    public void testNullKeys() throws IOException, InterruptedException {
        String str = PREFIX + "testnullkeys";
        StreamDescriptor newStreamDescriptor = Streams.newStreamDescriptor();
        madmin.createStream(str, newStreamDescriptor);
        madmin.createTopic(str, "t", 1);
        newStreamDescriptor.setCompact(true);
        madmin.editStream(str, newStreamDescriptor);
        new SendMessagesToProducer(makeKafkaProducer(), new ProducerMultiTest.CountCallback(128 * 1), str + ":t", 1, 128, 0).run();
        madmin.compactTopicNow(str, "t");
        Thread.sleep(70000L);
        KafkaConsumer<byte[], byte[]> makeKafkaConsumer = makeKafkaConsumer();
        for (int i = 0; i < 1; i++) {
            ArrayList arrayList = new ArrayList();
            arrayList.add(new TopicPartition(str + ":t", i));
            makeKafkaConsumer.assign(arrayList);
            int i2 = 0;
            while (true) {
                if (i2 == 0) {
                    Iterator it = makeKafkaConsumer.poll(100L).iterator();
                    while (it.hasNext()) {
                        ConsumerRecord consumerRecord = (ConsumerRecord) it.next();
                        _logger.info("messageoffset is " + consumerRecord.offset());
                        Assert.assertTrue(consumerRecord.offset() == ((long) (128 - 1)));
                        i2++;
                    }
                    if (i2 != 0) {
                        Assert.assertTrue(i2 == 1);
                    }
                }
            }
        }
        makeKafkaConsumer.close();
        madmin.deleteStream(str);
    }

    @Test
    public void testNullKeysWithTombstoneDelete() throws IOException, InterruptedException {
        String str = PREFIX + "testnullkeyswithtombstonedelete";
        StreamDescriptor newStreamDescriptor = Streams.newStreamDescriptor();
        newStreamDescriptor.setDeleteRetentionMS(0L);
        madmin.createStream(str, newStreamDescriptor);
        madmin.createTopic(str, "t", 1);
        newStreamDescriptor.setCompact(true);
        madmin.editStream(str, newStreamDescriptor);
        new SendMessagesToProducer(makeKafkaProducer(), new ProducerMultiTest.CountCallback(128 * 1), str + ":t", 1, 128, 0).run();
        madmin.compactTopicNow(str, "t");
        Thread.sleep(70000L);
        KafkaConsumer<byte[], byte[]> makeKafkaConsumer = makeKafkaConsumer();
        for (int i = 0; i < 1; i++) {
            ArrayList arrayList = new ArrayList();
            arrayList.add(new TopicPartition(str + ":t", i));
            makeKafkaConsumer.assign(arrayList);
            ConsumerRecords poll = makeKafkaConsumer.poll(30000L);
            Iterator it = poll.iterator();
            while (it.hasNext()) {
                _logger.info("messageoffset is " + ((ConsumerRecord) it.next()).offset());
            }
            Assert.assertTrue(poll.isEmpty());
        }
        makeKafkaConsumer.close();
        madmin.deleteStream(str);
    }

    @Test
    public void testAvoidFrequentCompaction() throws IOException, InterruptedException {
        String str = PREFIX + "testAvoidFrequentCompaction";
        StreamDescriptor newStreamDescriptor = Streams.newStreamDescriptor();
        madmin.createStream(str, newStreamDescriptor);
        madmin.createTopic(str, "topic0", 1);
        newStreamDescriptor.setCompact(true);
        madmin.editStream(str, newStreamDescriptor);
        ProducerMultiTest.CountCallback countCallback = new ProducerMultiTest.CountCallback(128 * 1);
        ProducerMultiTest.CountCallback countCallback2 = new ProducerMultiTest.CountCallback(128 * 1);
        KafkaProducer<byte[], byte[]> makeKafkaProducer = makeKafkaProducer();
        KafkaProducer<byte[], byte[]> makeKafkaProducer2 = makeKafkaProducer();
        SendMessagesToProducer sendMessagesToProducer = new SendMessagesToProducer(makeKafkaProducer, countCallback, str + ":topic0", 1, 128, 10);
        SendMessagesToProducer sendMessagesToProducer2 = new SendMessagesToProducer(makeKafkaProducer2, countCallback2, str + ":topic0", 1, 128, 10);
        _logger.info("Producing 128 messages.");
        sendMessagesToProducer.run();
        madmin.compactTopicNow(str, "topic0");
        Thread.sleep(70000L);
        ArrayList arrayList = new ArrayList();
        arrayList.add(new TopicPartition(str + ":topic0", 0));
        KafkaConsumer<byte[], byte[]> makeKafkaConsumer = makeKafkaConsumer();
        makeKafkaConsumer.assign(arrayList);
        int i = 0;
        while (true) {
            if (i != 0) {
                break;
            }
            Iterator it = makeKafkaConsumer.poll(100L).iterator();
            while (it.hasNext()) {
                ConsumerRecord consumerRecord = (ConsumerRecord) it.next();
                _logger.info("messageoffset is " + consumerRecord.offset());
                Assert.assertTrue(consumerRecord.offset() == ((long) (128 - 1)));
                i++;
            }
            if (i != 0) {
                Assert.assertTrue(i == 1);
                _logger.info("First round of compaction is successfull.");
            }
        }
        _logger.info("Producing 128 messages again.");
        sendMessagesToProducer2.run();
        Thread.sleep(30000L);
        ArrayList arrayList2 = new ArrayList();
        arrayList2.add(new TopicPartition(str + ":topic0", 0));
        KafkaConsumer<byte[], byte[]> makeKafkaConsumer2 = makeKafkaConsumer();
        makeKafkaConsumer2.assign(arrayList2);
        int i2 = 0;
        while (true) {
            if (i2 != 0) {
                break;
            }
            Iterator it2 = makeKafkaConsumer2.poll(100L).iterator();
            while (it2.hasNext()) {
                i2++;
            }
            if (i2 != 0) {
                Assert.assertTrue(i2 == 128 + 1);
                _logger.info("Second round of compaction has not started yet.");
            }
        }
        Thread.sleep(60000L);
        ArrayList arrayList3 = new ArrayList();
        arrayList3.add(new TopicPartition(str + ":topic0", 0));
        KafkaConsumer<byte[], byte[]> makeKafkaConsumer3 = makeKafkaConsumer();
        makeKafkaConsumer3.assign(arrayList3);
        int i3 = 0;
        while (true) {
            if (i3 != 0) {
                break;
            }
            Iterator it3 = makeKafkaConsumer3.poll(100L).iterator();
            while (it3.hasNext()) {
                Assert.assertTrue(((ConsumerRecord) it3.next()).offset() == ((long) ((2 * 128) - 1)));
                i3++;
            }
            if (i3 != 0) {
                Assert.assertTrue(i3 == 1);
                _logger.info("Second round of compaction is successfull.");
            }
        }
        makeKafkaConsumer.close();
        makeKafkaConsumer2.close();
        makeKafkaConsumer3.close();
        madmin.deleteStream(str);
    }

    private KafkaProducer<byte[], byte[]> makeKafkaProducer() {
        Properties properties = new Properties();
        properties.put("key.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
        properties.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
        return new KafkaProducer<>(properties);
    }

    private KafkaConsumer<byte[], byte[]> makeKafkaConsumer() {
        Properties properties = new Properties();
        properties.put("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        properties.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        properties.put("auto.offset.reset", "earliest");
        properties.put("enable.auto.commit", "false");
        return new KafkaConsumer<>(properties);
    }
}
