/*
 * Decompiled with CFR 0.152.
 */
package com.mapr.streams.tests.xcr;

import com.mapr.streams.impl.admin.MStreamDescriptor;
import com.mapr.streams.impl.admin.MarlinAdmin;
import com.mapr.streams.listener.ListenerPerformance;
import com.mapr.streams.producer.ProducerPerformance;
import com.mapr.tests.BaseTest;
import com.mapr.tests.annotations.StressTest;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Category(value={StressTest.class})
public class AsyncReplTest
extends BaseTest {
    private static final Logger _logger = LoggerFactory.getLogger(AsyncReplTest.class);
    private static final String SRCSTREAM = "/jtest-src-" + AsyncReplTest.class.getSimpleName();
    private static final String DSTSTREAM = "/jtest-dst-" + AsyncReplTest.class.getSimpleName();
    private static MarlinAdmin madmin;
    private static final int numParts = 1;
    private static final int numMsgs = 100000;
    private static final int numTopics = 30;

    @BeforeClass
    public static void setupTestClass() throws Exception {
        Configuration conf = new Configuration();
        madmin = new MarlinAdmin(conf);
        try {
            madmin.deleteStream(SRCSTREAM);
        }
        catch (Exception e) {
            // empty catch block
        }
        try {
            madmin.deleteStream(DSTSTREAM);
        }
        catch (Exception exception) {
            // empty catch block
        }
    }

    @Before
    public void setupTest() throws Exception {
        MStreamDescriptor sdesc = new MStreamDescriptor();
        sdesc.setDefaultPartitions(1);
        madmin.createStream(SRCSTREAM, sdesc);
        ProcessBuilder proc = new ProcessBuilder("maprcli", "table", "edit", "-path", SRCSTREAM, "-regionsizemb", "512");
        Process process = proc.start();
        int errorCode = process.waitFor();
        System.out.println("Set region size, completed with return code " + errorCode);
    }

    @After
    public void cleanupTest() throws Exception {
        madmin.deleteStream(SRCSTREAM);
        madmin.deleteStream(DSTSTREAM);
    }

    @Test
    public void testAsyncReplAutoSetupOrder() throws IOException {
        try {
            ProcessBuilder proc = new ProcessBuilder("maprcli", "stream", "replica", "autosetup", "-path", SRCSTREAM, "-replica", DSTSTREAM);
            Process autosetup = proc.start();
            autosetup.waitFor();
            System.out.println("Replica autosetup process exit code: " + autosetup.exitValue());
            this.runProducerAndListener();
        }
        catch (InterruptedException e) {
            System.out.println(e);
            Assert.assertTrue((boolean)false);
        }
    }

    @Test
    public void testAsyncReplManualSetupOrder() throws IOException {
        try {
            MStreamDescriptor sdesc = new MStreamDescriptor();
            sdesc.setDefaultPartitions(1);
            madmin.createStream(DSTSTREAM, sdesc);
            ProcessBuilder proc = new ProcessBuilder("maprcli", "stream", "replica", "add", "-path", SRCSTREAM, "-replica", DSTSTREAM);
            Process replicaAdd = proc.start();
            replicaAdd.waitFor();
            System.out.println("Replica add process exit code: " + replicaAdd.exitValue());
            proc = new ProcessBuilder("maprcli", "stream", "upstream", "add", "-path", DSTSTREAM, "-upstream", SRCSTREAM);
            Process upstreamAdd = proc.start();
            upstreamAdd.waitFor();
            System.out.println("Upstream add process exit code: " + upstreamAdd.exitValue());
            this.runProducerAndListener();
        }
        catch (InterruptedException e) {
            System.out.println(e);
            Assert.assertTrue((boolean)false);
        }
    }

    private void runProducerAndListener() throws InterruptedException {
        Thread.sleep(5000L);
        ProducerRunnable pp = new ProducerRunnable(SRCSTREAM, 100000, 30, 1, 1, 10000L, true, false, false, false, false);
        Thread pt = new Thread(pp);
        pt.start();
        ListenerPerformance lp = new ListenerPerformance(DSTSTREAM, SRCSTREAM, 30, 1, 100000, 1, true, true, false, false, true, null, "AsyncReplTest", false);
        Thread lt = new Thread(lp);
        lt.start();
        pt.join();
        lt.join();
        Assert.assertTrue((boolean)pp.status);
        Assert.assertTrue((boolean)lp.status);
    }

    private static class ProducerRunnable
    implements Runnable {
        public String stream;
        public int nmsgs;
        public int ntopics;
        public int npartitions;
        public int nbatches;
        public long batchsleepms;
        public boolean verifyKeys;
        public boolean mflushers;
        public boolean printProgress;
        public boolean roundrobin;
        public boolean hashkey;
        public boolean status;

        public ProducerRunnable(String stream, int nmsgs, int ntopics, int nparts, int nbatches, long batchSleepMs, boolean verifyKeys, boolean mflushers, boolean print, boolean rr, boolean hash) {
            this.stream = stream;
            this.nmsgs = nmsgs;
            this.ntopics = ntopics;
            this.npartitions = nparts;
            this.nbatches = nbatches;
            this.batchsleepms = batchSleepMs;
            this.verifyKeys = verifyKeys;
            this.mflushers = mflushers;
            this.printProgress = print;
            this.roundrobin = rr;
            this.hashkey = hash;
            this.status = false;
        }

        @Override
        public void run() {
            try {
                this.status = ProducerPerformance.runStressTest(this.stream, this.nmsgs, this.ntopics, this.npartitions, this.nbatches, this.batchsleepms, this.verifyKeys, this.mflushers, this.printProgress, this.roundrobin, this.hashkey);
            }
            catch (Exception e) {
                System.out.println(e);
                this.status = false;
            }
        }
    }
}

