/*
 * Decompiled with CFR 0.152.
 */
package com.mapr.db.cdc.tests;

import com.mapr.db.Table;
import com.mapr.db.cdc.tests.MixConsumer;
import com.mapr.db.cdc.tests.SimpleProducer;
import com.mapr.db.cdc.tests.TestCDPSBinaryWithCluster;
import com.mapr.db.cdc.tests.TestCDPSJSONWithCluster;
import com.mapr.db.cdc.tests.TestCDPSUtil;
import com.mapr.db.impl.AdminImpl;
import com.mapr.db.impl.MapRDBImpl;
import com.mapr.tests.BaseTest;
import com.mapr.tests.annotations.ClusterTest;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Cluster test that starts a group of {@link MixConsumer} instances subscribed to a mix of
 * change-log topics (fed from binary and JSON tables) and plain stream topics, then waits
 * until every subscribed topic shows up in a shared "read topics" map.
 *
 * <p>All tables and streams are created under {@link #dir}. The test needs a running cluster.
 */
@Category(value={ClusterTest.class})
public class TestCDPSOpenFormatConsumerGrp
extends BaseTest {
    private static final Logger _logger = LoggerFactory.getLogger(TestCDPSOpenFormatConsumerGrp.class);
    /** Parent path under which every table and stream used by this test is created. */
    public static String dir = "/tmp/";
    private static final AdminImpl testAdmin = (AdminImpl)MapRDBImpl.newAdmin();

    /**
     * Third argument handed to {@code TestCDPSUtil.replaceStreamTable}, consumed in order:
     * first by the change-log streams, then by the plain streams.
     */
    private static final int[] FEED_COUNTS = new int[]{10, 20, 30, 40, 2};
    /** Number of leading entries of {@code nTopics} that are produced to {@code streams[0]}. */
    private static final int FIRST_STREAM_TOPIC_COUNT = 5;
    /** Maximum number of sleep/re-check rounds while waiting for all topics to be read. */
    private static final int MAX_WAIT_ROUNDS = 30;
    /** Pause between two checks of the read-topics map. */
    private static final long WAIT_INTERVAL_SECONDS = 5L;
    /** How long to wait for the consumer threads to finish after shutdown is requested. */
    private static final long EXECUTOR_TERMINATION_TIMEOUT_MS = 5000L;

    @BeforeClass
    public static void startupBeforeClass() throws Exception {
        System.out.println("--- On single node cluster without other workload, these tests takes total about 3 minutes, please wait ---\nTestCDPSOpenFormatConsumerGrp#testMixConsumer ------ 3 minutes\n");
    }

    @AfterClass
    public static void cleanupAfterClass() throws Exception {
        _logger.info("Done!");
    }

    /**
     * Calls {@code TestCDPSBinaryWithCluster.createTable} for every name, prefixed with {@link #dir}.
     *
     * @param bOFTopics binary table names (also used as topic names by the caller)
     * @throws Exception if table creation fails
     */
    public static void prepareBinTables(String[] bOFTopics) throws Exception {
        for (String bt : bOFTopics) {
            TestCDPSBinaryWithCluster.createTable(dir + bt);
        }
    }

    /**
     * For every name, obtains a JSON table through {@code TestCDPSUtil.replaceMcfJsonTable},
     * records it in {@code jmtList} and writes the rows "row1" and "row2" into it.
     *
     * @param jOFTopics JSON table names (also used as topic names by the caller)
     * @param jmtList   output list that receives the created tables, in input order
     * @throws Exception if a table cannot be created or written
     */
    public static void prepareJsonTables(String[] jOFTopics, List<Table> jmtList) throws Exception {
        for (String jt : jOFTopics) {
            Table jMcfTable = TestCDPSUtil.replaceMcfJsonTable(testAdmin, dir + jt);
            jmtList.add(jMcfTable);
            TestCDPSJSONWithCluster.putSimpleMap("row1", jMcfTable);
            TestCDPSJSONWithCluster.putSimpleMap("row2", jMcfTable);
        }
    }

    /**
     * Creates the change-log and plain streams, wires each source table to its change-log
     * topic, and produces an initial batch to the plain topics.
     *
     * <p>The method indexes its arguments directly, so it requires at least 4 entries in
     * {@code bOFTopics}, 5 in {@code jOFTopics}, 6 in {@code nTopics}, 3 in {@code chglogs}
     * and 2 in {@code streams}.
     *
     * @param bOFTopics binary table/topic names
     * @param jOFTopics JSON table/topic names
     * @param nTopics   plain topic names
     * @param chglogs   change-log stream names (created with the boolean flag set to {@code true})
     * @param streams   plain stream names (created with the boolean flag set to {@code false})
     * @throws IllegalArgumentException if more streams are requested than {@code FEED_COUNTS} has entries
     * @throws Exception if any cluster operation fails
     */
    public static void prepareStreams(String[] bOFTopics, String[] jOFTopics, String[] nTopics, String[] chglogs, String[] streams) throws Exception {
        // Fail before touching the cluster instead of with an index error half-way through creation.
        if (chglogs.length + streams.length > FEED_COUNTS.length) {
            throw new IllegalArgumentException("At most " + FEED_COUNTS.length + " streams are supported, got " + (chglogs.length + streams.length));
        }
        int idx = 0;
        for (String chg : chglogs) {
            TestCDPSUtil.replaceStreamTable(dir + chg, true, FEED_COUNTS[idx]);
            ++idx;
        }
        for (String st : streams) {
            TestCDPSUtil.replaceStreamTable(dir + st, false, FEED_COUNTS[idx]);
            ++idx;
        }
        // chglogs[0] receives binary tables only, chglogs[1] JSON tables only, chglogs[2] a mix of both.
        TestCDPSUtil.setupCDPSReplica(dir + bOFTopics[0], dir + chglogs[0], dir + chglogs[0] + ":" + bOFTopics[0]);
        TestCDPSUtil.setupCDPSReplica(dir + bOFTopics[1], dir + chglogs[0], dir + chglogs[0] + ":" + bOFTopics[1]);
        TestCDPSUtil.setupCDPSReplica(dir + jOFTopics[0], dir + chglogs[1], dir + chglogs[1] + ":" + jOFTopics[0]);
        TestCDPSUtil.setupCDPSReplica(dir + jOFTopics[1], dir + chglogs[1], dir + chglogs[1] + ":" + jOFTopics[1]);
        TestCDPSUtil.setupCDPSReplica(dir + jOFTopics[2], dir + chglogs[1], dir + chglogs[1] + ":" + jOFTopics[2]);
        TestCDPSUtil.setupCDPSReplica(dir + bOFTopics[2], dir + chglogs[2], dir + chglogs[2] + ":" + bOFTopics[2]);
        TestCDPSUtil.setupCDPSReplica(dir + jOFTopics[3], dir + chglogs[2], dir + chglogs[2] + ":" + jOFTopics[3]);
        TestCDPSUtil.setupCDPSReplica(dir + bOFTopics[3], dir + chglogs[2], dir + chglogs[2] + ":" + bOFTopics[3]);
        TestCDPSUtil.setupCDPSReplica(dir + jOFTopics[4], dir + chglogs[2], dir + chglogs[2] + ":" + jOFTopics[4]);
        produceToPlainTopics(nTopics, streams);
    }

    /**
     * Generates a second round of activity on every source: calls
     * {@code TestCDPSBinaryWithCluster.deleteRecFromTable} on each binary table, writes the
     * rows "row3" and "row4" into each JSON table, and produces again to the plain topics.
     *
     * @param bOFTopics binary table names
     * @param jmtList   JSON tables previously collected by {@link #prepareJsonTables}
     * @param nTopics   plain topic names (at least 6)
     * @param streams   plain stream names (at least 2)
     * @throws Exception if any cluster operation fails
     */
    public static void addMoreData(String[] bOFTopics, List<Table> jmtList, String[] nTopics, String[] streams) throws Exception {
        for (String bt : bOFTopics) {
            TestCDPSBinaryWithCluster.deleteRecFromTable(dir + bt);
        }
        for (Table jMcfTable : jmtList) {
            TestCDPSJSONWithCluster.putSimpleMap("row3", jMcfTable);
            TestCDPSJSONWithCluster.putSimpleMap("row4", jMcfTable);
        }
        produceToPlainTopics(nTopics, streams);
    }

    /**
     * Runs {@code SimpleProducer.testSimpleProducer} against the first five plain topics on
     * {@code streams[0]} and against the sixth plain topic on {@code streams[1]}.
     */
    private static void produceToPlainTopics(String[] nTopics, String[] streams) throws Exception {
        // NOTE(review): the producer's int return code is ignored here, as it was before;
        // confirm whether a non-zero value signals failure and should fail the test.
        for (int i = 0; i < FIRST_STREAM_TOPIC_COUNT; ++i) {
            SimpleProducer.testSimpleProducer(dir + streams[0] + ":" + nTopics[i], 2);
        }
        SimpleProducer.testSimpleProducer(dir + streams[1] + ":" + nTopics[FIRST_STREAM_TOPIC_COUNT], 2);
    }

    /**
     * Starts 20 consumers in one group over all 15 topics, adds more data, and passes only if
     * every topic appears in the shared read-topics map within the wait budget.
     */
    @Test
    public void testMixConsumer() throws Exception {
        int consumerCount = 20;
        String groupId = "mixGrp1";
        String[] bOFTopics = new String[]{"bOFT1", "bOFT2", "bOFT3", "bOFT4"};
        String[] jOFTopics = new String[]{"jOFT1", "jOFT2", "jOFT3", "jOFT4", "jOFT5"};
        String[] nTopics = new String[]{"nT1", "nT2", "nT3", "nT4", "nT5", "nT6"};
        String[] chglogs = new String[]{"bOFChg1", "jOFChg1", "mOFChg1"};
        String[] streams = new String[]{"nOFChg1", "nOFChg2"};
        // Built from the class-level dir so these paths always match the ones the helpers create.
        String[] fullTopics = new String[]{
            dir + chglogs[0] + ":" + bOFTopics[0],
            dir + chglogs[0] + ":" + bOFTopics[1],
            dir + chglogs[1] + ":" + jOFTopics[0],
            dir + chglogs[1] + ":" + jOFTopics[1],
            dir + chglogs[1] + ":" + jOFTopics[2],
            dir + chglogs[2] + ":" + bOFTopics[2],
            dir + chglogs[2] + ":" + jOFTopics[3],
            dir + chglogs[2] + ":" + bOFTopics[3],
            dir + chglogs[2] + ":" + jOFTopics[4],
            dir + streams[0] + ":" + nTopics[0],
            dir + streams[0] + ":" + nTopics[1],
            dir + streams[0] + ":" + nTopics[2],
            dir + streams[0] + ":" + nTopics[3],
            dir + streams[0] + ":" + nTopics[4],
            dir + streams[1] + ":" + nTopics[5]
        };
        List<Table> jMTList = new ArrayList<Table>();
        TestCDPSOpenFormatConsumerGrp.prepareBinTables(bOFTopics);
        TestCDPSOpenFormatConsumerGrp.prepareJsonTables(jOFTopics, jMTList);
        TestCDPSOpenFormatConsumerGrp.prepareStreams(bOFTopics, jOFTopics, nTopics, chglogs, streams);

        ExecutorService executor = Executors.newFixedThreadPool(consumerCount);
        List<MixConsumer> consumers = new ArrayList<MixConsumer>(consumerCount);
        // NOTE(review): this plain HashMap is handed to every consumer and also read from this
        // thread; confirm MixConsumer synchronizes its accesses, or switch to a concurrent map
        // if the MixConsumer constructor accepts one.
        HashMap<String, Boolean> readTopics = new HashMap<String, Boolean>();
        List<String> missing;
        try {
            for (int i = 0; i < consumerCount; ++i) {
                MixConsumer consumer = new MixConsumer(i, groupId, Arrays.asList(fullTopics), readTopics);
                consumers.add(consumer);
                executor.submit(consumer);
            }
            TestCDPSOpenFormatConsumerGrp.addMoreData(bOFTopics, jMTList, nTopics, streams);
            missing = TestCDPSOpenFormatConsumerGrp.awaitAllTopicsRead(fullTopics, readTopics);
        } finally {
            // Always release the consumers and their threads, even when setup or waiting failed.
            TestCDPSOpenFormatConsumerGrp.stopConsumers(consumers, executor);
        }

        // Logged after the consumers were asked to stop, so the map is no longer being filled
        // while it is iterated (provided they terminated within the timeout).
        _logger.info("Final Fetched topics :");
        for (Map.Entry<String, Boolean> entry : readTopics.entrySet()) {
            _logger.info("{}:{}", entry.getKey(), entry.getValue());
        }
        Assert.assertTrue("Topics never fetched by the consumer group: " + missing, missing.isEmpty());
    }

    /**
     * Polls {@code readTopics} until every entry of {@code fullTopics} has a non-null value or
     * the wait budget is used up. The map is re-checked after each sleep, including the last one.
     *
     * @return the topics still missing when waiting stopped; empty when all were read
     */
    private static List<String> awaitAllTopicsRead(String[] fullTopics, Map<String, Boolean> readTopics) throws InterruptedException {
        List<String> missing = TestCDPSOpenFormatConsumerGrp.findMissingTopics(fullTopics, readTopics);
        for (int waitCount = 0; waitCount < MAX_WAIT_ROUNDS && !missing.isEmpty(); ++waitCount) {
            for (String ft : missing) {
                _logger.info("wait count : {}, Missing topic :{}", waitCount, ft);
            }
            TimeUnit.SECONDS.sleep(WAIT_INTERVAL_SECONDS);
            missing = TestCDPSOpenFormatConsumerGrp.findMissingTopics(fullTopics, readTopics);
        }
        return missing;
    }

    /** Returns, in input order, the topics that have no non-null value in {@code readTopics}. */
    private static List<String> findMissingTopics(String[] fullTopics, Map<String, Boolean> readTopics) {
        List<String> missing = new ArrayList<String>();
        for (String ft : fullTopics) {
            if (readTopics.get(ft) == null) {
                missing.add(ft);
            }
        }
        return missing;
    }

    /**
     * Asks every consumer to shut down, then shuts the executor down and waits briefly for its
     * threads to finish, forcing termination if they do not.
     */
    private static void stopConsumers(List<MixConsumer> consumers, ExecutorService executor) {
        for (MixConsumer consumer : consumers) {
            try {
                consumer.shutdown();
            }
            catch (RuntimeException e) {
                // Keep going: one failing consumer must not prevent the others from stopping,
                // and must not mask an exception already propagating from the test body.
                _logger.warn("Failed to shut down a consumer", e);
            }
        }
        executor.shutdown();
        try {
            if (!executor.awaitTermination(EXECUTOR_TERMINATION_TIMEOUT_MS, TimeUnit.MILLISECONDS)) {
                _logger.warn("Consumers did not terminate within {} ms, forcing shutdown", EXECUTOR_TERMINATION_TIMEOUT_MS);
                executor.shutdownNow();
            }
        }
        catch (InterruptedException e) {
            executor.shutdownNow();
            // Restore the interrupt flag so the test runner can observe it.
            Thread.currentThread().interrupt();
        }
    }
}

