package com.mapr.streams.tests.xcr;

import com.mapr.streams.Admin;
import com.mapr.streams.StreamDescriptor;
import com.mapr.streams.Streams;
import com.mapr.streams.listener.ListenerPerformance;
import com.mapr.streams.producer.ProducerPerformance;
import com.mapr.tests.BaseTest;
import com.mapr.tests.annotations.StressTest;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Category({StressTest.class})
/* loaded from: input_file:com/mapr/streams/tests/xcr/AsyncReplTest.class */
public class AsyncReplTest extends BaseTest {
    /** SLF4J logger bound to this test class. */
    private static final Logger _logger = LoggerFactory.getLogger(AsyncReplTest.class);
    /** Path of the source stream: {@code /jtest-src-} followed by this class's simple name. */
    private static final String SRCSTREAM = "/jtest-src-" + AsyncReplTest.class.getSimpleName();
    /** Path of the replica stream: {@code /jtest-dst-} followed by this class's simple name. */
    private static final String DSTSTREAM = "/jtest-dst-" + AsyncReplTest.class.getSimpleName();
    /** Admin handle shared by all tests; assigned once in {@code setupTestClass()}. */
    private static Admin madmin;
    /**
     * Default partition count applied to the streams this class creates.
     * NOTE(review): the decompiled call sites also pass this constant in positions that
     * look unrelated to partitions (e.g. the batch count); the original source probably
     * used a literal 1 there - confirm before changing this value.
     */
    private static final int numParts = 1;
    /** Message count handed to both the producer and the listener. */
    private static final int numMsgs = 100000;
    /** Topic count handed to both the producer and the listener. */
    private static final int numTopics = 30;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/mapr/streams/tests/xcr/AsyncReplTest$ProducerRunnable.class */
    /**
     * Runnable wrapper around {@code ProducerPerformance.runStressTest}.
     *
     * <p>The constructor stores every setting in a public field; {@link #run()} forwards the
     * fields, in declaration order, to the stress test and records its boolean result in
     * {@link #status}.
     */
    public static class ProducerRunnable implements Runnable {
        public String stream;
        public int nmsgs;
        public int ntopics;
        public int npartitions;
        public int nbatches;
        public long batchsleepms;
        public boolean verifyKeys;
        public boolean mflushers;
        public boolean printProgress;
        public boolean roundrobin;
        public boolean hashkey;
        /** Result of the last {@link #run()}; stays {@code false} until a run succeeds. */
        public boolean status;

        /**
         * Captures the stress-test settings. Each argument is stored in the field of the
         * same name and later passed unchanged to {@code ProducerPerformance.runStressTest}.
         */
        public ProducerRunnable(String stream, int nmsgs, int ntopics, int npartitions, int nbatches,
                long batchsleepms, boolean verifyKeys, boolean mflushers, boolean printProgress,
                boolean roundrobin, boolean hashkey) {
            this.stream = stream;
            this.nmsgs = nmsgs;
            this.ntopics = ntopics;
            this.npartitions = npartitions;
            this.nbatches = nbatches;
            this.batchsleepms = batchsleepms;
            this.verifyKeys = verifyKeys;
            this.mflushers = mflushers;
            this.printProgress = printProgress;
            this.roundrobin = roundrobin;
            this.hashkey = hashkey;
        }

        /**
         * Runs the producer stress test and stores its outcome in {@link #status}.
         * Any {@link Exception} is printed to standard output and recorded as a failure.
         */
        @Override
        public void run() {
            boolean outcome = false;
            try {
                outcome = ProducerPerformance.runStressTest(
                        stream, nmsgs, ntopics, npartitions, nbatches, batchsleepms,
                        verifyKeys, mflushers, printProgress, roundrobin, hashkey);
            } catch (Exception e) {
                System.out.println(e);
            }
            status = outcome;
        }
    }

    /**
     * Creates the shared {@link Admin} and makes a best-effort attempt to delete both test
     * streams, so that leftovers from an earlier run do not interfere with this one.
     *
     * <p>A failed delete is not an error here, but it is logged at debug level rather than
     * silently discarded, so an unexpected cause remains visible.
     *
     * @throws Exception if the admin handle cannot be created
     */
    @BeforeClass
    public static void setupTestClass() throws Exception {
        madmin = Streams.newAdmin(new Configuration());
        for (String stream : new String[] {SRCSTREAM, DSTSTREAM}) {
            try {
                madmin.deleteStream(stream);
            } catch (Exception ignored) {
                // Best-effort cleanup: the delete is allowed to fail, so only record why.
                _logger.debug("Pre-test delete of " + stream + " did not succeed", ignored);
            }
        }
    }

    /**
     * Creates the source stream with {@code numParts} default partitions and then runs
     * {@code maprcli table edit -path <source> -regionsizemb 512} on it.
     *
     * <p>The command's exit code is printed but not enforced, as before.
     *
     * @throws Exception if stream creation fails, the command cannot be started, or the
     *     wait for it is interrupted
     */
    @Before
    public void setupTest() throws Exception {
        StreamDescriptor descriptor = Streams.newStreamDescriptor();
        descriptor.setDefaultPartitions(numParts);
        madmin.createStream(SRCSTREAM, descriptor);

        // inheritIO(): without it the child's stdout/stderr go to pipes nobody reads, so a
        // chatty maprcli could block on a full pipe while we sit in waitFor(). Inheriting
        // the streams also puts maprcli's own diagnostics into the test output.
        int exitCode = new ProcessBuilder("maprcli", "table", "edit", "-path", SRCSTREAM, "-regionsizemb", "512")
                .inheritIO()
                .start()
                .waitFor();
        System.out.println("Set region size, completed with return code " + exitCode);
    }

    /**
     * Deletes the source and replica streams after each test.
     *
     * <p>Both deletes are always attempted: a failure on the source stream must not leave
     * the replica stream behind for the next test. If any delete fails, the first failure
     * is rethrown once both have been tried, with a later failure attached as suppressed.
     *
     * @throws Exception the first delete failure, if any
     */
    @After
    public void cleanupTest() throws Exception {
        Exception failure = null;
        for (String stream : new String[] {SRCSTREAM, DSTSTREAM}) {
            try {
                madmin.deleteStream(stream);
            } catch (Exception e) {
                if (failure == null) {
                    failure = e;
                } else {
                    failure.addSuppressed(e);
                }
            }
        }
        if (failure != null) {
            throw failure;
        }
    }

    /**
     * Sets up replication from the source stream to the replica stream with
     * {@code maprcli stream replica autosetup}, then runs the producer/listener check.
     *
     * <p>The test fails immediately if the command exits non-zero, instead of carrying on
     * against a replica that was never configured.
     * NOTE(review): this test never creates the replica stream itself, so autosetup
     * presumably creates it - confirm against the maprcli documentation.
     *
     * @throws IOException if the maprcli process cannot be started
     */
    @Test
    public void testAsyncReplAutoSetupOrder() throws IOException {
        try {
            // inheritIO() keeps the child from blocking on an undrained output pipe and
            // surfaces maprcli's own messages in the test output.
            int exitCode = new ProcessBuilder("maprcli", "stream", "replica", "autosetup", "-path", SRCSTREAM, "-replica", DSTSTREAM)
                    .inheritIO()
                    .start()
                    .waitFor();
            System.out.println("Replica autosetup process exit code: " + exitCode);
            Assert.assertEquals("maprcli stream replica autosetup exited with a non-zero code", 0, exitCode);
            runProducerAndListener();
        } catch (InterruptedException e) {
            // Restore the interrupt flag so the test runner can still observe it.
            Thread.currentThread().interrupt();
            _logger.error("Interrupted during replica autosetup test", e);
            Assert.fail("Interrupted during replica autosetup test: " + e);
        }
    }

    /**
     * Sets up replication by hand, then runs the producer/listener check. The steps are:
     * create the replica stream, register it as a replica of the source
     * ({@code maprcli stream replica add}), and register the source as its upstream
     * ({@code maprcli stream upstream add}).
     *
     * <p>Each maprcli step must exit with code 0; otherwise the test fails at that step.
     *
     * @throws IOException if stream creation fails or a maprcli process cannot be started
     */
    @Test
    public void testAsyncReplManualSetupOrder() throws IOException {
        try {
            StreamDescriptor descriptor = Streams.newStreamDescriptor();
            descriptor.setDefaultPartitions(numParts);
            madmin.createStream(DSTSTREAM, descriptor);

            runMaprcliStep("Replica add",
                    "maprcli", "stream", "replica", "add", "-path", SRCSTREAM, "-replica", DSTSTREAM);
            runMaprcliStep("Upstream add",
                    "maprcli", "stream", "upstream", "add", "-path", DSTSTREAM, "-upstream", SRCSTREAM);

            runProducerAndListener();
        } catch (InterruptedException e) {
            // Restore the interrupt flag so the test runner can still observe it.
            Thread.currentThread().interrupt();
            _logger.error("Interrupted during manual replica setup test", e);
            Assert.fail("Interrupted during manual replica setup test: " + e);
        }
    }

    /**
     * Runs one maprcli command, prints its exit code, and fails the test on a non-zero code.
     *
     * @param label short description used in the printed line and the failure message
     * @param command the full command line, program name first
     * @throws IOException if the process cannot be started
     * @throws InterruptedException if the wait for the process is interrupted
     */
    private static void runMaprcliStep(String label, String... command) throws IOException, InterruptedException {
        // inheritIO() keeps the child from blocking on an undrained output pipe and
        // surfaces maprcli's own messages in the test output.
        int exitCode = new ProcessBuilder(command).inheritIO().start().waitFor();
        System.out.println(label + " process exit code: " + exitCode);
        Assert.assertEquals(label + " exited with a non-zero code", 0, exitCode);
    }

    /**
     * Drives one producer against the source stream and one listener constructed with the
     * replica and source stream paths, each on its own thread, and asserts that both
     * finished with a {@code true} status.
     *
     * @throws InterruptedException if the initial sleep or either join is interrupted
     */
    private void runProducerAndListener() throws InterruptedException {
        // Fixed five-second pause before any traffic starts.
        // NOTE(review): presumably gives the replication setup time to settle - confirm.
        Thread.sleep(5000L);

        // Named locals for the positional arguments of ProducerRunnable's constructor.
        final int batchCount = numParts;
        final long batchSleepMs = 10000L;
        final boolean verifyKeys = true;
        final boolean multipleFlushers = false;
        final boolean printProgress = false;
        final boolean roundRobin = false;
        final boolean hashKey = false;

        ProducerRunnable producerTask = new ProducerRunnable(
                SRCSTREAM, numMsgs, numTopics, numParts, batchCount, batchSleepMs,
                verifyKeys, multipleFlushers, printProgress, roundRobin, hashKey);
        Thread producerThread = new Thread(producerTask);
        producerThread.start();

        ListenerPerformance listenerTask = new ListenerPerformance(
                DSTSTREAM, SRCSTREAM, numTopics, numParts, numMsgs, numParts,
                true, true, false, false, true, null, "AsyncReplTest", false);
        Thread listenerThread = new Thread(listenerTask);
        listenerThread.start();

        // Wait for both sides to finish before reading their status fields.
        producerThread.join();
        listenerThread.join();

        Assert.assertTrue(producerTask.status);
        Assert.assertTrue(listenerTask.status);
    }
}
