/*
 * Decompiled with CFR 0.152.
 */
package org.apache.drill.exec.store.kafka;

import io.confluent.kafka.serializers.KafkaAvroDeserializer;
import java.time.Duration;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import org.apache.drill.categories.KafkaStorageTest;
import org.apache.drill.categories.SlowTest;
import org.apache.drill.common.exceptions.UserException;
import org.apache.drill.exec.rpc.RpcException;
import org.apache.drill.exec.store.kafka.KafkaMessageGenerator;
import org.apache.drill.exec.store.kafka.KafkaStoragePluginConfig;
import org.apache.drill.exec.store.kafka.KafkaTestBase;
import org.apache.drill.exec.store.kafka.TestKafkaSuite;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.junit.Assert;
import org.junit.FixMethodOrder;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runners.MethodSorters;

@FixMethodOrder(value=MethodSorters.JVM)
@Category(value={KafkaStorageTest.class, SlowTest.class})
public class KafkaQueriesTest
extends KafkaTestBase {
    @Test
    public void testSqlQueryOnInvalidTopic() throws Exception {
        String queryString = String.format("select * from kafka.`%s`", "invalid-topic");
        try {
            this.testBuilder().sqlQuery(queryString).unOrdered().baselineRecords(Collections.emptyList()).go();
            Assert.fail((String)"Test passed though topic does not exist.");
        }
        catch (RpcException re) {
            Assert.assertTrue((boolean)re.getMessage().contains("DATA_READ ERROR: Table 'invalid-topic' does not exist"));
        }
    }

    @Test
    public void testResultLimit() throws Exception {
        String queryString = String.format("select * from kafka.`%s` limit 3", "drill-json-topic");
        this.queryBuilder().sql(queryString).planMatcher().include(new String[]{"Scan", "records=3"}).match();
    }

    @Test
    public void testResultCount() {
        String queryString = String.format("select * from kafka.`%s`", "drill-json-topic");
        this.runKafkaSQLVerifyCount(queryString, 10);
    }

    @Test
    public void testAvroResultCount() {
        try {
            client.alterSession("store.kafka.record.reader", (Object)"org.apache.drill.exec.store.kafka.decoders.AvroMessageReader");
            KafkaStoragePluginConfig config = (KafkaStoragePluginConfig)cluster.drillbit().getContext().getStorage().getStoredConfig("kafka");
            config.getKafkaConsumerProps().put("key.deserializer", KafkaAvroDeserializer.class.getName());
            String queryString = String.format("select * from kafka.`%s`", "drill-avro-topic");
            this.runKafkaSQLVerifyCount(queryString, 10);
        }
        finally {
            client.resetSession("store.kafka.record.reader");
        }
    }

    @Test
    public void testPartitionMinOffset() throws Exception {
        Map<TopicPartition, Long> startOffsetsMap = this.fetchOffsets(-2);
        String queryString = String.format("select MIN(kafkaMsgOffset) as minOffset from kafka.`%s`", "drill-json-topic");
        this.testBuilder().sqlQuery(queryString).unOrdered().baselineColumns(new String[]{"minOffset"}).baselineValues(new Object[]{startOffsetsMap.get(new TopicPartition("drill-json-topic", 0))}).go();
    }

    @Test
    public void testPartitionMaxOffset() throws Exception {
        Map<TopicPartition, Long> endOffsetsMap = this.fetchOffsets(-1);
        String queryString = String.format("select MAX(kafkaMsgOffset) as maxOffset from kafka.`%s`", "drill-json-topic");
        this.testBuilder().sqlQuery(queryString).unOrdered().baselineColumns(new String[]{"maxOffset"}).baselineValues(new Object[]{endOffsetsMap.get(new TopicPartition("drill-json-topic", 0)) - 1L}).go();
    }

    @Test
    public void testInformationSchema() throws Exception {
        String query = "select * from information_schema.`views`";
        this.queryBuilder().sql(query).run();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private Map<TopicPartition, Long> fetchOffsets(int flag) throws InterruptedException {
        KafkaConsumer kafkaConsumer = null;
        try {
            kafkaConsumer = new KafkaConsumer(storagePluginConfig.getKafkaConsumerProps(), (Deserializer)new ByteArrayDeserializer(), (Deserializer)new ByteArrayDeserializer());
            HashMap<TopicPartition, Long> offsetsMap = new HashMap<TopicPartition, Long>();
            kafkaConsumer.subscribe(Collections.singletonList("drill-json-topic"));
            kafkaConsumer.poll(Duration.ofSeconds(5L));
            Set<TopicPartition> assignments = this.waitForConsumerAssignment((Consumer)kafkaConsumer);
            if (flag == -2) {
                kafkaConsumer.seekToBeginning(assignments);
                for (TopicPartition topicPartition : assignments) {
                    offsetsMap.put(topicPartition, kafkaConsumer.position(topicPartition));
                }
            } else if (flag == -1) {
                kafkaConsumer.seekToEnd(assignments);
                for (TopicPartition topicPartition : assignments) {
                    offsetsMap.put(topicPartition, kafkaConsumer.position(topicPartition));
                }
            } else {
                throw new RuntimeException(String.format("Unsupported flag %d", flag));
            }
            HashMap<TopicPartition, Long> hashMap = offsetsMap;
            TestKafkaSuite.embeddedKafkaCluster.registerToClose((AutoCloseable)kafkaConsumer);
            return hashMap;
        }
        catch (Throwable throwable) {
            TestKafkaSuite.embeddedKafkaCluster.registerToClose((AutoCloseable)kafkaConsumer);
            throw throwable;
        }
    }

    private Set<TopicPartition> waitForConsumerAssignment(Consumer consumer) throws InterruptedException {
        long timeout;
        Set assignments = consumer.assignment();
        long waitingForAssigmentTimeout = 5000L;
        for (timeout = 0L; assignments.isEmpty() && timeout < waitingForAssigmentTimeout; timeout += 500L) {
            Thread.sleep(500L);
            assignments = consumer.assignment();
        }
        if (timeout >= waitingForAssigmentTimeout) {
            Assert.fail((String)("Consumer assignment wasn't completed within the timeout " + waitingForAssigmentTimeout));
        }
        return assignments;
    }

    @Test
    public void testPhysicalPlanSubmission() throws Exception {
        String query = String.format("select * from kafka.`%s`", "drill-json-topic");
        String plan = this.queryBuilder().sql(query).explainJson();
        this.queryBuilder().physical(plan).run();
    }

    @Test
    public void testPhysicalPlanSubmissionAvro() throws Exception {
        try {
            client.alterSession("store.kafka.record.reader", (Object)"org.apache.drill.exec.store.kafka.decoders.AvroMessageReader");
            String query = String.format("select * from kafka.`%s`", "drill-avro-topic");
            String plan = this.queryBuilder().sql(query).explainJson();
            this.queryBuilder().physical(plan).run();
        }
        finally {
            client.resetSession("store.kafka.record.reader");
        }
    }

    @Test
    public void testOneMessageTopic() throws Exception {
        String topicName = "topicWithOneMessage";
        TestKafkaSuite.createTopicHelper(topicName, 1);
        KafkaMessageGenerator generator = new KafkaMessageGenerator(TestKafkaSuite.embeddedKafkaCluster.getKafkaBrokerList(), StringSerializer.class);
        generator.populateMessages(topicName, "{\"index\": 1}");
        this.testBuilder().sqlQuery("select index from kafka.`%s`", new Object[]{topicName}).unOrdered().baselineColumns(new String[]{"index"}).baselineValues(new Object[]{1L}).go();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testMalformedRecords() throws Exception {
        String topicName = "topicWithMalFormedMessages";
        TestKafkaSuite.createTopicHelper(topicName, 1);
        try {
            KafkaMessageGenerator generator = new KafkaMessageGenerator(TestKafkaSuite.embeddedKafkaCluster.getKafkaBrokerList(), StringSerializer.class);
            generator.populateMessages(topicName, "Test");
            client.alterSession("store.kafka.reader.skip_invalid_records", (Object)false);
            try {
                this.queryBuilder().sql("select * from kafka.`%s`", new Object[]{topicName}).run();
                Assert.fail();
            }
            catch (UserException userException) {
                // empty catch block
            }
            client.alterSession("store.kafka.reader.skip_invalid_records", (Object)true);
            this.testBuilder().sqlQuery("select * from kafka.`%s`", new Object[]{topicName}).expectsEmptyResultSet();
            generator.populateMessages(topicName, "{\"index\": 1}", "", "   ", "{Invalid}", "{\"index\": 2}");
            this.testBuilder().sqlQuery("select index from kafka.`%s`", new Object[]{topicName}).unOrdered().baselineColumns(new String[]{"index"}).baselineValues(new Object[]{1L}).baselineValues(new Object[]{2L}).go();
        }
        finally {
            client.resetSession("store.kafka.reader.skip_invalid_records");
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testNanInf() throws Exception {
        String topicName = "topicWithNanInf";
        TestKafkaSuite.createTopicHelper(topicName, 1);
        try {
            KafkaMessageGenerator generator = new KafkaMessageGenerator(TestKafkaSuite.embeddedKafkaCluster.getKafkaBrokerList(), StringSerializer.class);
            generator.populateMessages(topicName, "{\"nan_col\":NaN, \"inf_col\":Infinity}");
            client.alterSession("store.kafka.reader.allow_nan_inf", (Object)false);
            try {
                this.queryBuilder().sql("select nan_col, inf_col from kafka.`%s`", new Object[]{topicName}).run();
                Assert.fail();
            }
            catch (UserException userException) {
                // empty catch block
            }
            client.alterSession("store.kafka.reader.allow_nan_inf", (Object)true);
            this.testBuilder().sqlQuery("select nan_col, inf_col from kafka.`%s`", new Object[]{topicName}).unOrdered().baselineColumns(new String[]{"nan_col", "inf_col"}).baselineValues(new Object[]{Double.NaN, Double.POSITIVE_INFINITY}).go();
        }
        finally {
            client.resetSession("store.kafka.reader.allow_nan_inf");
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testEscapeAnyChar() throws Exception {
        String topicName = "topicWithEscapeAnyChar";
        TestKafkaSuite.createTopicHelper(topicName, 1);
        try {
            KafkaMessageGenerator generator = new KafkaMessageGenerator(TestKafkaSuite.embeddedKafkaCluster.getKafkaBrokerList(), StringSerializer.class);
            generator.populateMessages(topicName, "{\"name\": \"AB\\\"\\C\"}");
            client.alterSession("store.kafka.reader.allow_escape_any_char", (Object)false);
            try {
                this.queryBuilder().sql("select name from kafka.`%s`", new Object[]{topicName}).run();
                Assert.fail();
            }
            catch (UserException userException) {
                // empty catch block
            }
            client.alterSession("store.kafka.reader.allow_escape_any_char", (Object)true);
            this.testBuilder().sqlQuery("select name from kafka.`%s`", new Object[]{topicName}).unOrdered().baselineColumns(new String[]{"name"}).baselineValues(new Object[]{"AB\"C"}).go();
        }
        finally {
            client.resetSession("store.kafka.reader.allow_escape_any_char");
        }
    }
}

