/*
 * Decompiled with CFR 0.152.
 */
package org.apache.drill.exec.store.kafka;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.drill.PlanTestBase;
import org.apache.drill.common.logical.StoragePluginConfig;
import org.apache.drill.exec.exception.SchemaChangeException;
import org.apache.drill.exec.rpc.user.QueryDataBatch;
import org.apache.drill.exec.store.StoragePluginRegistry;
import org.apache.drill.exec.store.kafka.KafkaStoragePluginConfig;
import org.apache.drill.exec.store.kafka.TestKafkaSuit;
import org.apache.drill.exec.store.kafka.cluster.EmbeddedKafkaCluster;
import org.apache.drill.shaded.guava.com.google.common.collect.Maps;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Assume;
import org.junit.BeforeClass;

/**
 * Base class for Kafka storage plugin tests.
 *
 * <p>Before any test in a subclass runs, this class initialises Kafka through
 * {@link TestKafkaSuit} and registers a {@code kafka} storage plugin with the
 * drillbit's plugin registry. It also offers small helpers for running SQL and
 * checking row counts.
 */
public class KafkaTestBase extends PlanTestBase {
    /** Plugin configuration created by {@link #initKafkaStoragePlugin(EmbeddedKafkaCluster)}. */
    protected static KafkaStoragePluginConfig storagePluginConfig;

    /**
     * Class-level setup. The JUnit assumption makes the class be skipped unless
     * {@link TestKafkaSuit#isRunningSuite()} reports {@code true}; otherwise Kafka is
     * initialised and the storage plugin is registered.
     *
     * @throws Exception if Kafka initialisation or plugin registration fails
     */
    @BeforeClass
    public static void setUpBeforeClass() throws Exception {
        Assume.assumeTrue(TestKafkaSuit.isRunningSuite());
        TestKafkaSuit.initKafka();
        initKafkaStoragePlugin(TestKafkaSuit.embeddedKafkaCluster);
    }

    /**
     * Creates (or updates) the {@code kafka} storage plugin so that it points at the
     * given cluster, then sets the session options for the record reader and the
     * poll timeout.
     *
     * @param embeddedKafkaCluster cluster whose broker list is used as {@code bootstrap.servers}
     * @throws Exception if the plugin cannot be registered or a session option cannot be set
     */
    public static void initKafkaStoragePlugin(EmbeddedKafkaCluster embeddedKafkaCluster) throws Exception {
        StoragePluginRegistry pluginRegistry = getDrillbitContext().getStorage();

        // Typed map instead of a raw HashMap so the compiler checks keys and values.
        Map<String, String> kafkaConsumerProps = Maps.newHashMap();
        kafkaConsumerProps.put("bootstrap.servers", embeddedKafkaCluster.getKafkaBrokerList());
        kafkaConsumerProps.put("group.id", "drill-test-consumer");

        storagePluginConfig = new KafkaStoragePluginConfig(kafkaConsumerProps);
        storagePluginConfig.setEnabled(true);
        pluginRegistry.createOrUpdate("kafka", storagePluginConfig, true);

        testNoResult(String.format("alter session set `%s` = '%s'", "store.kafka.record.reader", "org.apache.drill.exec.store.kafka.decoders.JsonMessageReader"));
        testNoResult(String.format("alter session set `%s` = %d", "store.kafka.poll.timeout", 5000));
    }

    /**
     * Runs the given SQL and returns the resulting batches.
     *
     * @param sql query text
     * @return the batches returned by {@code testSqlWithResults}
     * @throws Exception if the query fails
     */
    public List<QueryDataBatch> runKafkaSQLWithResults(String sql) throws Exception {
        return testSqlWithResults(sql);
    }

    /**
     * Runs the given SQL and verifies the number of rows it produced.
     *
     * @param sql query text
     * @param expectedRowCount expected number of rows, or {@code -1} to skip the check
     * @throws Exception if the query fails
     */
    public void runKafkaSQLVerifyCount(String sql, int expectedRowCount) throws Exception {
        logResultAndVerifyRowCount(runKafkaSQLWithResults(sql), expectedRowCount);
    }

    /**
     * Passes the batches to {@code logResult} and asserts that the count it returns
     * equals {@code expectedRowCount}.
     *
     * @param results batches to log
     * @param expectedRowCount expected number of rows, or {@code -1} to skip the check
     * @throws SchemaChangeException propagated from {@code logResult}
     */
    public void logResultAndVerifyRowCount(List<QueryDataBatch> results, int expectedRowCount) throws SchemaChangeException {
        int rowCount = logResult(results);
        // -1 is the sentinel callers use when they only want the results logged.
        if (expectedRowCount != -1) {
            Assert.assertEquals(expectedRowCount, rowCount);
        }
    }

    /**
     * Checks the physical plan of {@code query} against {@code expectedExprInPlan} and
     * then runs the query, asserting on the record count reported by {@code testSql}.
     *
     * @param query query text
     * @param expectedExprInPlan expression handed to {@code testPhysicalPlan}
     * @param expectedRecordCount number of records the query must produce
     * @throws Exception if planning or execution fails
     */
    public void testHelper(String query, String expectedExprInPlan, int expectedRecordCount) throws Exception {
        testPhysicalPlan(query, expectedExprInPlan);
        int actualRecordCount = testSql(query);
        Assert.assertEquals(String.format("Received unexpected number of rows in output: expected=%d, received=%s", expectedRecordCount, actualRecordCount), expectedRecordCount, actualRecordCount);
    }

    /**
     * Class-level teardown. The cluster is torn down only when
     * {@link TestKafkaSuit#isRunningSuite()} reports {@code true}, mirroring the
     * assumption made in {@link #setUpBeforeClass()}.
     *
     * @throws Exception if tearing down the cluster fails
     */
    @AfterClass
    public static void tearDownKafkaTestBase() throws Exception {
        if (TestKafkaSuit.isRunningSuite()) {
            TestKafkaSuit.tearDownCluster();
        }
    }
}

