package org.apache.drill.exec.store.kafka;

import org.apache.drill.categories.KafkaStorageTest;
import org.apache.drill.categories.SlowTest;
import org.apache.kafka.common.serialization.StringSerializer;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;

@Category({KafkaStorageTest.class, SlowTest.class})
/* loaded from: input_file:org/apache/drill/exec/store/kafka/KafkaFilterPushdownTest.class */
public class KafkaFilterPushdownTest extends KafkaTestBase {
    private static final int NUM_PARTITIONS = 5;
    private static final String EXPECTED_PATTERN = "kafkaScanSpec.*\\n.*\"topicName\" : \"drill-pushdown-topic\"\\n(.*\\n)?(.*\\n)?(.*\\n)?.*cost\"(.*\\n)(.*\\n).*outputRowCount\" : (%s.0)";

    @BeforeClass
    public static void setup() throws Exception {
        TestKafkaSuite.createTopicHelper(TestQueryConstants.JSON_PUSHDOWN_TOPIC, NUM_PARTITIONS);
        new KafkaMessageGenerator(TestKafkaSuite.embeddedKafkaCluster.getKafkaBrokerList(), StringSerializer.class).populateJsonMsgWithTimestamps(TestQueryConstants.JSON_PUSHDOWN_TOPIC, 10);
        Assert.assertEquals("Kafka server does not have expected number of messages", testSql(String.format(TestQueryConstants.MSG_SELECT_QUERY, TestQueryConstants.JSON_PUSHDOWN_TOPIC)), 50L);
    }

    @Test
    public void testPushdownOnOffset() throws Exception {
        String format = String.format(TestQueryConstants.QUERY_TEMPLATE_AND, TestQueryConstants.JSON_PUSHDOWN_TOPIC, "kafkaMsgOffset > 4", "kafkaMsgOffset < 6");
        runKafkaSQLVerifyCount(format, NUM_PARTITIONS);
        queryBuilder().sql(format).jsonPlanMatcher().include(new String[]{String.format(EXPECTED_PATTERN, Integer.valueOf(NUM_PARTITIONS))}).match();
    }

    @Test
    public void testPushdownOnPartition() throws Exception {
        String format = String.format(TestQueryConstants.QUERY_TEMPLATE_BASIC, TestQueryConstants.JSON_PUSHDOWN_TOPIC, "kafkaPartitionId = 1");
        runKafkaSQLVerifyCount(format, 10);
        queryBuilder().sql(format).jsonPlanMatcher().include(new String[]{String.format(EXPECTED_PATTERN, 10)}).match();
    }

    @Test
    public void testPushdownOnTimestamp() throws Exception {
        String format = String.format(TestQueryConstants.QUERY_TEMPLATE_BASIC, TestQueryConstants.JSON_PUSHDOWN_TOPIC, "kafkaMsgTimestamp > 6");
        runKafkaSQLVerifyCount(format, 20);
        queryBuilder().sql(format).jsonPlanMatcher().include(new String[]{String.format(EXPECTED_PATTERN, 20)}).match();
    }

    @Test
    public void testPushdownUnorderedTimestamp() throws Exception {
        String format = String.format(TestQueryConstants.QUERY_TEMPLATE_BASIC, TestQueryConstants.JSON_PUSHDOWN_TOPIC, "kafkaMsgTimestamp = 1");
        runKafkaSQLVerifyCount(format, NUM_PARTITIONS);
        queryBuilder().sql(format).jsonPlanMatcher().include(new String[]{String.format(EXPECTED_PATTERN, 50)}).match();
    }

    @Test
    public void testPushdownWhenTimestampDoesNotExist() throws Exception {
        String format = String.format(TestQueryConstants.QUERY_TEMPLATE_BASIC, TestQueryConstants.JSON_PUSHDOWN_TOPIC, "kafkaMsgTimestamp = 20");
        runKafkaSQLVerifyCount(format, 0);
        queryBuilder().sql(format).jsonPlanMatcher().include(new String[]{String.format(EXPECTED_PATTERN, 0)}).match();
    }

    @Test
    public void testPushdownWhenPartitionDoesNotExist() throws Exception {
        String format = String.format(TestQueryConstants.QUERY_TEMPLATE_BASIC, TestQueryConstants.JSON_PUSHDOWN_TOPIC, "kafkaPartitionId = 100");
        runKafkaSQLVerifyCount(format, 0);
        queryBuilder().sql(format).jsonPlanMatcher().include(new String[]{String.format(EXPECTED_PATTERN, 0)}).match();
    }

    @Test
    public void testPushdownForEmptyScanSpec() throws Exception {
        String format = String.format(TestQueryConstants.QUERY_TEMPLATE_AND, TestQueryConstants.JSON_PUSHDOWN_TOPIC, "kafkaMsgTimestamp > 6", "kafkaPartitionId = 100");
        runKafkaSQLVerifyCount(format, 0);
        queryBuilder().sql(format).jsonPlanMatcher().include(new String[]{String.format(EXPECTED_PATTERN, 0)}).match();
    }

    @Test
    public void testPushdownOffsetNoRecordsReturnedWithBoundaryConditions() throws Exception {
        String format = String.format(TestQueryConstants.QUERY_TEMPLATE_BASIC, TestQueryConstants.JSON_PUSHDOWN_TOPIC, "kafkaMsgOffset = 10");
        runKafkaSQLVerifyCount(format, 0);
        queryBuilder().sql(format).jsonPlanMatcher().include(new String[]{String.format(EXPECTED_PATTERN, 0)}).match();
        String format2 = String.format(TestQueryConstants.QUERY_TEMPLATE_BASIC, TestQueryConstants.JSON_PUSHDOWN_TOPIC, "kafkaMsgOffset = -1");
        runKafkaSQLVerifyCount(format2, 0);
        queryBuilder().sql(format2).jsonPlanMatcher().include(new String[]{String.format(EXPECTED_PATTERN, 0)}).match();
        String format3 = String.format(TestQueryConstants.QUERY_TEMPLATE_BASIC, TestQueryConstants.JSON_PUSHDOWN_TOPIC, "kafkaMsgOffset > 9");
        runKafkaSQLVerifyCount(format3, 0);
        queryBuilder().sql(format3).jsonPlanMatcher().include(new String[]{String.format(EXPECTED_PATTERN, 0)}).match();
        String format4 = String.format(TestQueryConstants.QUERY_TEMPLATE_BASIC, TestQueryConstants.JSON_PUSHDOWN_TOPIC, "kafkaMsgOffset >= 10");
        runKafkaSQLVerifyCount(format4, 0);
        queryBuilder().sql(format4).jsonPlanMatcher().include(new String[]{String.format(EXPECTED_PATTERN, 0)}).match();
        String format5 = String.format(TestQueryConstants.QUERY_TEMPLATE_BASIC, TestQueryConstants.JSON_PUSHDOWN_TOPIC, "kafkaMsgOffset < 0");
        runKafkaSQLVerifyCount(format5, 0);
        queryBuilder().sql(format5).jsonPlanMatcher().include(new String[]{String.format(EXPECTED_PATTERN, 0)}).match();
        String format6 = String.format(TestQueryConstants.QUERY_TEMPLATE_BASIC, TestQueryConstants.JSON_PUSHDOWN_TOPIC, "kafkaMsgOffset <= -1");
        runKafkaSQLVerifyCount(format6, 0);
        queryBuilder().sql(format6).jsonPlanMatcher().include(new String[]{String.format(EXPECTED_PATTERN, 0)}).match();
    }

    @Test
    public void testPushdownOffsetOneRecordReturnedWithBoundaryConditions() throws Exception {
        String format = String.format(TestQueryConstants.QUERY_TEMPLATE_BASIC, TestQueryConstants.JSON_PUSHDOWN_TOPIC, "kafkaMsgOffset = 9");
        runKafkaSQLVerifyCount(format, NUM_PARTITIONS);
        queryBuilder().sql(format).jsonPlanMatcher().include(new String[]{String.format(EXPECTED_PATTERN, Integer.valueOf(NUM_PARTITIONS))}).match();
        String format2 = String.format(TestQueryConstants.QUERY_TEMPLATE_BASIC, TestQueryConstants.JSON_PUSHDOWN_TOPIC, "kafkaMsgOffset > 8");
        runKafkaSQLVerifyCount(format2, NUM_PARTITIONS);
        queryBuilder().sql(format2).jsonPlanMatcher().include(new String[]{String.format(EXPECTED_PATTERN, Integer.valueOf(NUM_PARTITIONS))}).match();
        String format3 = String.format(TestQueryConstants.QUERY_TEMPLATE_BASIC, TestQueryConstants.JSON_PUSHDOWN_TOPIC, "kafkaMsgOffset >= 9");
        runKafkaSQLVerifyCount(format3, NUM_PARTITIONS);
        queryBuilder().sql(format3).jsonPlanMatcher().include(new String[]{String.format(EXPECTED_PATTERN, Integer.valueOf(NUM_PARTITIONS))}).match();
    }

    @Test
    public void testPushdownWithOr() throws Exception {
        String format = String.format(TestQueryConstants.QUERY_TEMPLATE_OR, TestQueryConstants.JSON_PUSHDOWN_TOPIC, "kafkaMsgTimestamp > 6", "kafkaPartitionId = 1");
        runKafkaSQLVerifyCount(format, 26);
        queryBuilder().sql(format).jsonPlanMatcher().include(new String[]{String.format(EXPECTED_PATTERN, 26)}).match();
    }

    @Test
    public void testPushdownWithOr1() throws Exception {
        String format = String.format(TestQueryConstants.QUERY_TEMPLATE_OR, TestQueryConstants.JSON_PUSHDOWN_TOPIC, "kafkaMsgTimestamp = 6", "kafkaMsgOffset = 6");
        runKafkaSQLVerifyCount(format, 10);
        queryBuilder().sql(format).jsonPlanMatcher().include(new String[]{String.format(EXPECTED_PATTERN, 25)}).match();
    }

    @Test
    public void testPushdownWithAndOrCombo() throws Exception {
        String format = String.format(TestQueryConstants.QUERY_TEMPLATE_AND_OR_PATTERN_1, TestQueryConstants.JSON_PUSHDOWN_TOPIC, "kafkaMsgTimestamp > 6", "kafkaPartitionId = 1", "kafkaPartitionId = 2");
        runKafkaSQLVerifyCount(format, 8);
        queryBuilder().sql(format).jsonPlanMatcher().include(new String[]{String.format(EXPECTED_PATTERN, 8)}).match();
    }

    @Test
    public void testPushdownWithAndOrCombo2() throws Exception {
        String format = String.format(TestQueryConstants.QUERY_TEMPLATE_AND_OR_PATTERN_3, TestQueryConstants.JSON_PUSHDOWN_TOPIC, "kafkaMsgTimestamp = 6", "kafkaMsgOffset = 6", "kafkaPartitionId = 1", "kafkaPartitionId = 2");
        runKafkaSQLVerifyCount(format, 4);
        queryBuilder().sql(format).jsonPlanMatcher().include(new String[]{String.format(EXPECTED_PATTERN, 10)}).match();
    }

    @Test
    public void testPushdownTimestampWithNonMetaField() throws Exception {
        String format = String.format(TestQueryConstants.QUERY_TEMPLATE_AND, TestQueryConstants.JSON_PUSHDOWN_TOPIC, "kafkaMsgTimestamp > 6", "boolKey = true");
        runKafkaSQLVerifyCount(format, 10);
        queryBuilder().sql(format).jsonPlanMatcher().include(new String[]{String.format(EXPECTED_PATTERN, 20)}).match();
    }

    @Test
    public void testNoPushdownOfOffsetWithNonMetadataField() throws Exception {
        String format = String.format(TestQueryConstants.QUERY_TEMPLATE_AND_OR_PATTERN_2, TestQueryConstants.JSON_PUSHDOWN_TOPIC, "boolKey = true", "kafkaMsgTimestamp > 6", "kafkaMsgTimestamp < 9");
        runKafkaSQLVerifyCount(format, 30);
        queryBuilder().sql(format).jsonPlanMatcher().include(new String[]{String.format(EXPECTED_PATTERN, 50)}).match();
    }
}
