package org.apache.drill.exec.store.kafka;

import org.apache.drill.categories.KafkaStorageTest;
import org.apache.drill.categories.SlowTest;
import org.apache.kafka.common.serialization.StringSerializer;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;

@Category({KafkaStorageTest.class, SlowTest.class})
/* loaded from: input_file:org/apache/drill/exec/store/kafka/KafkaFilterPushdownTest.class */
public class KafkaFilterPushdownTest extends KafkaTestBase {
    private static final int NUM_PARTITIONS = 5;
    private static final String expectedSubStr = "    \"kafkaScanSpec\" : {\n      \"topicName\" : \"drill-pushdown-topic\"\n    },\n    \"cost\"";

    @BeforeClass
    public static void setup() throws Exception {
        TestKafkaSuit.createTopicHelper(TestQueryConstants.JSON_PUSHDOWN_TOPIC, NUM_PARTITIONS);
        new KafkaMessageGenerator(TestKafkaSuit.embeddedKafkaCluster.getKafkaBrokerList(), StringSerializer.class).populateJsonMsgWithTimestamps(TestQueryConstants.JSON_PUSHDOWN_TOPIC, 10);
        Assert.assertTrue("Kafka server does not have expected number of messages", testSql(String.format(TestQueryConstants.MSG_SELECT_QUERY, TestQueryConstants.JSON_PUSHDOWN_TOPIC)) == 50);
    }

    @Test
    public void testPushdownOnOffset() throws Exception {
        String format = String.format(TestQueryConstants.QUERY_TEMPLATE_AND, TestQueryConstants.JSON_PUSHDOWN_TOPIC, "kafkaMsgOffset > 4", "kafkaMsgOffset < 6");
        runKafkaSQLVerifyCount(format, NUM_PARTITIONS);
        testPhysicalPlan(format, new String[]{String.format(expectedSubStr, Integer.valueOf(NUM_PARTITIONS))});
    }

    @Test
    public void testPushdownOnPartition() throws Exception {
        String format = String.format(TestQueryConstants.QUERY_TEMPLATE_BASIC, TestQueryConstants.JSON_PUSHDOWN_TOPIC, "kafkaPartitionId = 1");
        runKafkaSQLVerifyCount(format, 10);
        testPhysicalPlan(format, new String[]{String.format(expectedSubStr, 10)});
    }

    @Test
    public void testPushdownOnTimestamp() throws Exception {
        String format = String.format(TestQueryConstants.QUERY_TEMPLATE_BASIC, TestQueryConstants.JSON_PUSHDOWN_TOPIC, "kafkaMsgTimestamp > 6");
        runKafkaSQLVerifyCount(format, 20);
        testPhysicalPlan(format, new String[]{String.format(expectedSubStr, 20)});
    }

    @Test
    public void testPushdownUnorderedTimestamp() throws Exception {
        String format = String.format(TestQueryConstants.QUERY_TEMPLATE_BASIC, TestQueryConstants.JSON_PUSHDOWN_TOPIC, "kafkaMsgTimestamp = 1");
        runKafkaSQLVerifyCount(format, NUM_PARTITIONS);
        testPhysicalPlan(format, new String[]{String.format(expectedSubStr, 50)});
    }

    @Test
    public void testPushdownWhenTimestampDoesNotExist() throws Exception {
        String format = String.format(TestQueryConstants.QUERY_TEMPLATE_BASIC, TestQueryConstants.JSON_PUSHDOWN_TOPIC, "kafkaMsgTimestamp = 20");
        runKafkaSQLVerifyCount(format, 0);
        testPhysicalPlan(format, new String[]{String.format(expectedSubStr, 0)});
    }

    @Test
    public void testPushdownWhenPartitionDoesNotExist() throws Exception {
        String format = String.format(TestQueryConstants.QUERY_TEMPLATE_BASIC, TestQueryConstants.JSON_PUSHDOWN_TOPIC, "kafkaPartitionId = 100");
        runKafkaSQLVerifyCount(format, 0);
        testPhysicalPlan(format, new String[]{String.format(expectedSubStr, 0)});
    }

    @Test
    public void testPushdownForEmptyScanSpec() throws Exception {
        String format = String.format(TestQueryConstants.QUERY_TEMPLATE_AND, TestQueryConstants.JSON_PUSHDOWN_TOPIC, "kafkaMsgTimestamp > 6", "kafkaPartitionId = 100");
        runKafkaSQLVerifyCount(format, 0);
        testPhysicalPlan(format, new String[]{String.format(expectedSubStr, 0)});
    }

    @Test
    public void testPushdownOffsetNoRecordsReturnedWithBoundaryConditions() throws Exception {
        String format = String.format(TestQueryConstants.QUERY_TEMPLATE_BASIC, TestQueryConstants.JSON_PUSHDOWN_TOPIC, "kafkaMsgOffset = 10");
        runKafkaSQLVerifyCount(format, 0);
        testPhysicalPlan(format, new String[]{String.format(expectedSubStr, 0)});
        String format2 = String.format(TestQueryConstants.QUERY_TEMPLATE_BASIC, TestQueryConstants.JSON_PUSHDOWN_TOPIC, "kafkaMsgOffset = -1");
        runKafkaSQLVerifyCount(format2, 0);
        testPhysicalPlan(format2, new String[]{String.format(expectedSubStr, 0)});
        String format3 = String.format(TestQueryConstants.QUERY_TEMPLATE_BASIC, TestQueryConstants.JSON_PUSHDOWN_TOPIC, "kafkaMsgOffset > 9");
        runKafkaSQLVerifyCount(format3, 0);
        testPhysicalPlan(format3, new String[]{String.format(expectedSubStr, 0)});
        String format4 = String.format(TestQueryConstants.QUERY_TEMPLATE_BASIC, TestQueryConstants.JSON_PUSHDOWN_TOPIC, "kafkaMsgOffset >= 10");
        runKafkaSQLVerifyCount(format4, 0);
        testPhysicalPlan(format4, new String[]{String.format(expectedSubStr, 0)});
        String format5 = String.format(TestQueryConstants.QUERY_TEMPLATE_BASIC, TestQueryConstants.JSON_PUSHDOWN_TOPIC, "kafkaMsgOffset < 0");
        runKafkaSQLVerifyCount(format5, 0);
        testPhysicalPlan(format5, new String[]{String.format(expectedSubStr, 0)});
        String format6 = String.format(TestQueryConstants.QUERY_TEMPLATE_BASIC, TestQueryConstants.JSON_PUSHDOWN_TOPIC, "kafkaMsgOffset <= -1");
        runKafkaSQLVerifyCount(format6, 0);
        testPhysicalPlan(format6, new String[]{String.format(expectedSubStr, 0)});
    }

    @Test
    public void testPushdownOffsetOneRecordReturnedWithBoundaryConditions() throws Exception {
        String format = String.format(TestQueryConstants.QUERY_TEMPLATE_BASIC, TestQueryConstants.JSON_PUSHDOWN_TOPIC, "kafkaMsgOffset = 9");
        runKafkaSQLVerifyCount(format, NUM_PARTITIONS);
        testPhysicalPlan(format, new String[]{String.format(expectedSubStr, Integer.valueOf(NUM_PARTITIONS))});
        String format2 = String.format(TestQueryConstants.QUERY_TEMPLATE_BASIC, TestQueryConstants.JSON_PUSHDOWN_TOPIC, "kafkaMsgOffset > 8");
        runKafkaSQLVerifyCount(format2, NUM_PARTITIONS);
        testPhysicalPlan(format2, new String[]{String.format(expectedSubStr, Integer.valueOf(NUM_PARTITIONS))});
        String format3 = String.format(TestQueryConstants.QUERY_TEMPLATE_BASIC, TestQueryConstants.JSON_PUSHDOWN_TOPIC, "kafkaMsgOffset >= 9");
        runKafkaSQLVerifyCount(format3, NUM_PARTITIONS);
        testPhysicalPlan(format3, new String[]{String.format(expectedSubStr, Integer.valueOf(NUM_PARTITIONS))});
    }

    @Test
    public void testPushdownWithOr() throws Exception {
        String format = String.format(TestQueryConstants.QUERY_TEMPLATE_OR, TestQueryConstants.JSON_PUSHDOWN_TOPIC, "kafkaMsgTimestamp > 6", "kafkaPartitionId = 1");
        runKafkaSQLVerifyCount(format, 26);
        testPhysicalPlan(format, new String[]{String.format(expectedSubStr, 26)});
    }

    @Test
    public void testPushdownWithOr1() throws Exception {
        String format = String.format(TestQueryConstants.QUERY_TEMPLATE_OR, TestQueryConstants.JSON_PUSHDOWN_TOPIC, "kafkaMsgTimestamp = 6", "kafkaMsgOffset = 6");
        runKafkaSQLVerifyCount(format, 10);
        testPhysicalPlan(format, new String[]{String.format(expectedSubStr, 25)});
    }

    @Test
    public void testPushdownWithAndOrCombo() throws Exception {
        String format = String.format(TestQueryConstants.QUERY_TEMPLATE_AND_OR_PATTERN_1, TestQueryConstants.JSON_PUSHDOWN_TOPIC, "kafkaMsgTimestamp > 6", "kafkaPartitionId = 1", "kafkaPartitionId = 2");
        runKafkaSQLVerifyCount(format, 8);
        testPhysicalPlan(format, new String[]{String.format(expectedSubStr, 8)});
    }

    @Test
    public void testPushdownWithAndOrCombo2() throws Exception {
        String format = String.format(TestQueryConstants.QUERY_TEMPLATE_AND_OR_PATTERN_3, TestQueryConstants.JSON_PUSHDOWN_TOPIC, "kafkaMsgTimestamp = 6", "kafkaMsgOffset = 6", "kafkaPartitionId = 1", "kafkaPartitionId = 2");
        runKafkaSQLVerifyCount(format, 4);
        testPhysicalPlan(format, new String[]{String.format(expectedSubStr, 10)});
    }

    @Test
    public void testPushdownTimestampWithNonMetaField() throws Exception {
        String format = String.format(TestQueryConstants.QUERY_TEMPLATE_AND, TestQueryConstants.JSON_PUSHDOWN_TOPIC, "kafkaMsgTimestamp > 6", "boolKey = true");
        runKafkaSQLVerifyCount(format, 10);
        testPhysicalPlan(format, new String[]{String.format(expectedSubStr, 20)});
    }

    @Test
    public void testNoPushdownOfOffsetWithNonMetadataField() throws Exception {
        String format = String.format(TestQueryConstants.QUERY_TEMPLATE_AND_OR_PATTERN_2, TestQueryConstants.JSON_PUSHDOWN_TOPIC, "boolKey = true", "kafkaMsgTimestamp > 6", "kafkaMsgTimestamp < 9");
        runKafkaSQLVerifyCount(format, 30);
        testPhysicalPlan(format, new String[]{String.format(expectedSubStr, 50)});
    }
}
