package org.apache.drill.exec.store.kafka;

import io.confluent.kafka.serializers.KafkaAvroDeserializer;
import java.time.Duration;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import org.apache.drill.categories.KafkaStorageTest;
import org.apache.drill.categories.SlowTest;
import org.apache.drill.common.exceptions.UserException;
import org.apache.drill.exec.rpc.RpcException;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.junit.Assert;
import org.junit.FixMethodOrder;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runners.MethodSorters;

@FixMethodOrder(MethodSorters.JVM)
@Category({KafkaStorageTest.class, SlowTest.class})
/* loaded from: input_file:org/apache/drill/exec/store/kafka/KafkaQueriesTest.class */
public class KafkaQueriesTest extends KafkaTestBase {
    @Test
    public void testSqlQueryOnInvalidTopic() throws Exception {
        try {
            testBuilder().sqlQuery(String.format(TestQueryConstants.MSG_SELECT_QUERY, TestQueryConstants.INVALID_TOPIC)).unOrdered().baselineRecords(Collections.emptyList()).go();
            Assert.fail("Test passed though topic does not exist.");
        } catch (RpcException e) {
            Assert.assertTrue(e.getMessage().contains("DATA_READ ERROR: Table 'invalid-topic' does not exist"));
        }
    }

    @Test
    public void testResultLimit() throws Exception {
        queryBuilder().sql(String.format(TestQueryConstants.MSG_LIMIT_QUERY, TestQueryConstants.JSON_TOPIC)).planMatcher().include(new String[]{"Scan", "records=3"}).match();
    }

    @Test
    public void testResultCount() {
        runKafkaSQLVerifyCount(String.format(TestQueryConstants.MSG_SELECT_QUERY, TestQueryConstants.JSON_TOPIC), 10);
    }

    @Test
    public void testAvroResultCount() {
        try {
            client.alterSession("store.kafka.record.reader", "org.apache.drill.exec.store.kafka.decoders.AvroMessageReader");
            cluster.drillbit().getContext().getStorage().getStoredConfig("kafka").getKafkaConsumerProps().put("key.deserializer", KafkaAvroDeserializer.class.getName());
            runKafkaSQLVerifyCount(String.format(TestQueryConstants.MSG_SELECT_QUERY, TestQueryConstants.AVRO_TOPIC), 10);
            client.resetSession("store.kafka.record.reader");
        } catch (Throwable th) {
            client.resetSession("store.kafka.record.reader");
            throw th;
        }
    }

    @Test
    public void testPartitionMinOffset() throws Exception {
        testBuilder().sqlQuery(String.format(TestQueryConstants.MIN_OFFSET_QUERY, TestQueryConstants.JSON_TOPIC)).unOrdered().baselineColumns(new String[]{"minOffset"}).baselineValues(new Object[]{fetchOffsets(-2).get(new TopicPartition(TestQueryConstants.JSON_TOPIC, 0))}).go();
    }

    @Test
    public void testPartitionMaxOffset() throws Exception {
        testBuilder().sqlQuery(String.format(TestQueryConstants.MAX_OFFSET_QUERY, TestQueryConstants.JSON_TOPIC)).unOrdered().baselineColumns(new String[]{"maxOffset"}).baselineValues(new Object[]{Long.valueOf(fetchOffsets(-1).get(new TopicPartition(TestQueryConstants.JSON_TOPIC, 0)).longValue() - 1)}).go();
    }

    @Test
    public void testInformationSchema() throws Exception {
        queryBuilder().sql("select * from information_schema.`views`").run();
    }

    private Map<TopicPartition, Long> fetchOffsets(int i) throws InterruptedException {
        try {
            AutoCloseable kafkaConsumer = new KafkaConsumer(storagePluginConfig.getKafkaConsumerProps(), new ByteArrayDeserializer(), new ByteArrayDeserializer());
            HashMap hashMap = new HashMap();
            kafkaConsumer.subscribe(Collections.singletonList(TestQueryConstants.JSON_TOPIC));
            kafkaConsumer.poll(Duration.ofSeconds(5L));
            Set<TopicPartition> waitForConsumerAssignment = waitForConsumerAssignment(kafkaConsumer);
            if (i == -2) {
                kafkaConsumer.seekToBeginning(waitForConsumerAssignment);
                for (TopicPartition topicPartition : waitForConsumerAssignment) {
                    hashMap.put(topicPartition, Long.valueOf(kafkaConsumer.position(topicPartition)));
                }
            } else {
                if (i != -1) {
                    throw new RuntimeException(String.format("Unsupported flag %d", Integer.valueOf(i)));
                }
                kafkaConsumer.seekToEnd(waitForConsumerAssignment);
                for (TopicPartition topicPartition2 : waitForConsumerAssignment) {
                    hashMap.put(topicPartition2, Long.valueOf(kafkaConsumer.position(topicPartition2)));
                }
            }
            TestKafkaSuite.embeddedKafkaCluster.registerToClose(kafkaConsumer);
            return hashMap;
        } catch (Throwable th) {
            TestKafkaSuite.embeddedKafkaCluster.registerToClose(null);
            throw th;
        }
    }

    private Set<TopicPartition> waitForConsumerAssignment(Consumer consumer) throws InterruptedException {
        Set<TopicPartition> assignment = consumer.assignment();
        long j = 0;
        while (assignment.isEmpty() && j < 5000) {
            Thread.sleep(500L);
            j += 500;
            assignment = consumer.assignment();
        }
        if (j >= 5000) {
            Assert.fail("Consumer assignment wasn't completed within the timeout 5000");
        }
        return assignment;
    }

    @Test
    public void testPhysicalPlanSubmission() throws Exception {
        queryBuilder().physical(queryBuilder().sql(String.format(TestQueryConstants.MSG_SELECT_QUERY, TestQueryConstants.JSON_TOPIC)).explainJson()).run();
    }

    @Test
    public void testPhysicalPlanSubmissionAvro() throws Exception {
        try {
            client.alterSession("store.kafka.record.reader", "org.apache.drill.exec.store.kafka.decoders.AvroMessageReader");
            queryBuilder().physical(queryBuilder().sql(String.format(TestQueryConstants.MSG_SELECT_QUERY, TestQueryConstants.AVRO_TOPIC)).explainJson()).run();
            client.resetSession("store.kafka.record.reader");
        } catch (Throwable th) {
            client.resetSession("store.kafka.record.reader");
            throw th;
        }
    }

    @Test
    public void testOneMessageTopic() throws Exception {
        TestKafkaSuite.createTopicHelper("topicWithOneMessage", 1);
        new KafkaMessageGenerator(TestKafkaSuite.embeddedKafkaCluster.getKafkaBrokerList(), StringSerializer.class).populateMessages("topicWithOneMessage", "{\"index\": 1}");
        testBuilder().sqlQuery("select index from kafka.`%s`", new Object[]{"topicWithOneMessage"}).unOrdered().baselineColumns(new String[]{"index"}).baselineValues(new Object[]{1L}).go();
    }

    @Test
    public void testMalformedRecords() throws Exception {
        TestKafkaSuite.createTopicHelper("topicWithMalFormedMessages", 1);
        try {
            KafkaMessageGenerator kafkaMessageGenerator = new KafkaMessageGenerator(TestKafkaSuite.embeddedKafkaCluster.getKafkaBrokerList(), StringSerializer.class);
            kafkaMessageGenerator.populateMessages("topicWithMalFormedMessages", "Test");
            client.alterSession("store.kafka.reader.skip_invalid_records", false);
            try {
                queryBuilder().sql(TestQueryConstants.MSG_SELECT_QUERY, new Object[]{"topicWithMalFormedMessages"}).run();
                Assert.fail();
            } catch (UserException e) {
            }
            client.alterSession("store.kafka.reader.skip_invalid_records", true);
            testBuilder().sqlQuery(TestQueryConstants.MSG_SELECT_QUERY, new Object[]{"topicWithMalFormedMessages"}).expectsEmptyResultSet();
            kafkaMessageGenerator.populateMessages("topicWithMalFormedMessages", "{\"index\": 1}", "", "   ", "{Invalid}", "{\"index\": 2}");
            testBuilder().sqlQuery("select index from kafka.`%s`", new Object[]{"topicWithMalFormedMessages"}).unOrdered().baselineColumns(new String[]{"index"}).baselineValues(new Object[]{1L}).baselineValues(new Object[]{2L}).go();
            client.resetSession("store.kafka.reader.skip_invalid_records");
        } catch (Throwable th) {
            client.resetSession("store.kafka.reader.skip_invalid_records");
            throw th;
        }
    }

    @Test
    public void testNanInf() throws Exception {
        TestKafkaSuite.createTopicHelper("topicWithNanInf", 1);
        try {
            new KafkaMessageGenerator(TestKafkaSuite.embeddedKafkaCluster.getKafkaBrokerList(), StringSerializer.class).populateMessages("topicWithNanInf", "{\"nan_col\":NaN, \"inf_col\":Infinity}");
            client.alterSession("store.kafka.reader.allow_nan_inf", false);
            try {
                queryBuilder().sql("select nan_col, inf_col from kafka.`%s`", new Object[]{"topicWithNanInf"}).run();
                Assert.fail();
            } catch (UserException e) {
            }
            client.alterSession("store.kafka.reader.allow_nan_inf", true);
            testBuilder().sqlQuery("select nan_col, inf_col from kafka.`%s`", new Object[]{"topicWithNanInf"}).unOrdered().baselineColumns(new String[]{"nan_col", "inf_col"}).baselineValues(new Object[]{Double.valueOf(Double.NaN), Double.valueOf(Double.POSITIVE_INFINITY)}).go();
            client.resetSession("store.kafka.reader.allow_nan_inf");
        } catch (Throwable th) {
            client.resetSession("store.kafka.reader.allow_nan_inf");
            throw th;
        }
    }

    @Test
    public void testEscapeAnyChar() throws Exception {
        TestKafkaSuite.createTopicHelper("topicWithEscapeAnyChar", 1);
        try {
            new KafkaMessageGenerator(TestKafkaSuite.embeddedKafkaCluster.getKafkaBrokerList(), StringSerializer.class).populateMessages("topicWithEscapeAnyChar", "{\"name\": \"AB\\\"\\C\"}");
            client.alterSession("store.kafka.reader.allow_escape_any_char", false);
            try {
                queryBuilder().sql("select name from kafka.`%s`", new Object[]{"topicWithEscapeAnyChar"}).run();
                Assert.fail();
            } catch (UserException e) {
            }
            client.alterSession("store.kafka.reader.allow_escape_any_char", true);
            testBuilder().sqlQuery("select name from kafka.`%s`", new Object[]{"topicWithEscapeAnyChar"}).unOrdered().baselineColumns(new String[]{"name"}).baselineValues(new Object[]{"AB\"C"}).go();
            client.resetSession("store.kafka.reader.allow_escape_any_char");
        } catch (Throwable th) {
            client.resetSession("store.kafka.reader.allow_escape_any_char");
            throw th;
        }
    }
}
