/*
 * Decompiled with CFR 0.152.
 */
package org.apache.drill.exec.store.kafka;

import org.apache.drill.categories.KafkaStorageTest;
import org.apache.drill.categories.SlowTest;
import org.apache.drill.exec.store.kafka.KafkaMessageGenerator;
import org.apache.drill.exec.store.kafka.KafkaTestBase;
import org.apache.drill.exec.store.kafka.TestKafkaSuit;
import org.apache.kafka.common.serialization.StringSerializer;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;

/**
 * Tests filter pushdown on the Kafka metadata columns ({@code kafkaMsgOffset},
 * {@code kafkaPartitionId}, {@code kafkaMsgTimestamp}) against a topic populated once in
 * {@link #setup()}.
 *
 * <p>Each test runs a query, checks the number of rows it returns, and then checks that the
 * JSON plan contains a {@code kafkaScanSpec} for the topic whose {@code outputRowCount} equals
 * the expected in-plan row count. The two counts differ whenever the predicate is only partly
 * reflected in the scan's row estimate.
 */
@Category(value={KafkaStorageTest.class, SlowTest.class})
public class KafkaFilterPushdownTest
extends KafkaTestBase {
    /** Topic that every query in this class reads from. */
    private static final String TOPIC_NAME = "drill-pushdown-topic";

    /** Number of partitions the topic is created with. */
    private static final int NUM_PARTITIONS = 5;

    /**
     * Message count handed to the generator. The assertions below expect this many rows per
     * partition (e.g. {@code kafkaPartitionId = 1} returns 10 rows, 50 rows in total).
     */
    private static final int NUM_MESSAGES_PER_PARTITION = 10;

    /**
     * Regex template matched against the JSON plan; the single {@code %s} is replaced by the
     * expected {@code outputRowCount} of the Kafka scan.
     */
    private static final String EXPECTED_PATTERN = "kafkaScanSpec.*\\n.*\"topicName\" : \"" + TOPIC_NAME + "\"\\n(.*\\n)?(.*\\n)?(.*\\n)?.*cost\"(.*\\n)(.*\\n).*outputRowCount\" : (%s.0)";

    /**
     * Creates and populates the topic, then checks that a full scan sees every message.
     *
     * @throws Exception if topic creation, message generation or the sanity query fails
     */
    @BeforeClass
    public static void setup() throws Exception {
        TestKafkaSuit.createTopicHelper(TOPIC_NAME, NUM_PARTITIONS);
        KafkaMessageGenerator generator = new KafkaMessageGenerator(TestKafkaSuit.embeddedKafkaCluster.getKafkaBrokerList(), StringSerializer.class);
        generator.populateJsonMsgWithTimestamps(TOPIC_NAME, NUM_MESSAGES_PER_PARTITION);

        String query = String.format("select * from kafka.`%s`", TOPIC_NAME);
        long expectedTotal = (long) NUM_PARTITIONS * NUM_MESSAGES_PER_PARTITION;
        long actualTotal = testSql(query);
        // JUnit's argument order is (message, expected, actual); keeping it right makes the
        // failure message report the two values the correct way round.
        Assert.assertEquals("Kafka server does not have expected number of messages", expectedTotal, actualTotal);
    }

    @Test
    public void testPushdownOnOffset() throws Exception {
        String predicate1 = "kafkaMsgOffset > 4";
        String predicate2 = "kafkaMsgOffset < 6";
        int expectedRowCount = 5;

        this.verifyPushdown(KafkaFilterPushdownTest.selectWhere(predicate1 + " AND " + predicate2), expectedRowCount, expectedRowCount);
    }

    @Test
    public void testPushdownOnPartition() throws Exception {
        String predicate = "kafkaPartitionId = 1";
        int expectedRowCount = 10;

        this.verifyPushdown(KafkaFilterPushdownTest.selectWhere(predicate), expectedRowCount, expectedRowCount);
    }

    @Test
    public void testPushdownOnTimestamp() throws Exception {
        String predicate = "kafkaMsgTimestamp > 6";
        int expectedRowCount = 20;

        this.verifyPushdown(KafkaFilterPushdownTest.selectWhere(predicate), expectedRowCount, expectedRowCount);
    }

    @Test
    public void testPushdownUnorderedTimestamp() throws Exception {
        String predicate = "kafkaMsgTimestamp = 1";
        int expectedRowCountInPlan = 50;
        int expectedRowCount = 5;

        this.verifyPushdown(KafkaFilterPushdownTest.selectWhere(predicate), expectedRowCount, expectedRowCountInPlan);
    }

    @Test
    public void testPushdownWhenTimestampDoesNotExist() throws Exception {
        String predicate = "kafkaMsgTimestamp = 20";
        int expectedRowCount = 0;

        this.verifyPushdown(KafkaFilterPushdownTest.selectWhere(predicate), expectedRowCount, expectedRowCount);
    }

    @Test
    public void testPushdownWhenPartitionDoesNotExist() throws Exception {
        String predicate = "kafkaPartitionId = 100";
        int expectedRowCount = 0;

        this.verifyPushdown(KafkaFilterPushdownTest.selectWhere(predicate), expectedRowCount, expectedRowCount);
    }

    @Test
    public void testPushdownForEmptyScanSpec() throws Exception {
        String predicate1 = "kafkaMsgTimestamp > 6";
        String predicate2 = "kafkaPartitionId = 100";
        int expectedRowCount = 0;

        this.verifyPushdown(KafkaFilterPushdownTest.selectWhere(predicate1 + " AND " + predicate2), expectedRowCount, expectedRowCount);
    }

    @Test
    public void testPushdownOffsetNoRecordsReturnedWithBoundaryConditions() throws Exception {
        int expectedRowCount = 0;
        // Offsets just outside the range these tests treat as populated (0..9).
        String[] predicates = {
            "kafkaMsgOffset = 10",
            "kafkaMsgOffset = -1",
            "kafkaMsgOffset > 9",
            "kafkaMsgOffset >= 10",
            "kafkaMsgOffset < 0",
            "kafkaMsgOffset <= -1"
        };

        for (String predicate : predicates) {
            this.verifyPushdown(KafkaFilterPushdownTest.selectWhere(predicate), expectedRowCount, expectedRowCount);
        }
    }

    @Test
    public void testPushdownOffsetOneRecordReturnedWithBoundaryConditions() throws Exception {
        // Each predicate is expected to select one offset, i.e. 5 rows across the topic.
        int expectedRowCount = 5;
        String[] predicates = {
            "kafkaMsgOffset = 9",
            "kafkaMsgOffset > 8",
            "kafkaMsgOffset >= 9"
        };

        for (String predicate : predicates) {
            this.verifyPushdown(KafkaFilterPushdownTest.selectWhere(predicate), expectedRowCount, expectedRowCount);
        }
    }

    @Test
    public void testPushdownWithOr() throws Exception {
        String predicate1 = "kafkaMsgTimestamp > 6";
        String predicate2 = "kafkaPartitionId = 1";
        int expectedRowCount = 26;

        this.verifyPushdown(KafkaFilterPushdownTest.selectWhere(predicate1 + " OR " + predicate2), expectedRowCount, expectedRowCount);
    }

    @Test
    public void testPushdownWithOr1() throws Exception {
        String predicate1 = "kafkaMsgTimestamp = 6";
        String predicate2 = "kafkaMsgOffset = 6";
        int expectedRowCountInPlan = 25;
        int expectedRowCount = 10;

        this.verifyPushdown(KafkaFilterPushdownTest.selectWhere(predicate1 + " OR " + predicate2), expectedRowCount, expectedRowCountInPlan);
    }

    @Test
    public void testPushdownWithAndOrCombo() throws Exception {
        String predicate1 = "kafkaMsgTimestamp > 6";
        String predicate2 = "kafkaPartitionId = 1";
        String predicate3 = "kafkaPartitionId = 2";
        int expectedRowCount = 8;

        String whereClause = String.format("%s AND (%s OR %s)", predicate1, predicate2, predicate3);
        this.verifyPushdown(KafkaFilterPushdownTest.selectWhere(whereClause), expectedRowCount, expectedRowCount);
    }

    @Test
    public void testPushdownWithAndOrCombo2() throws Exception {
        String predicate1 = "kafkaMsgTimestamp = 6";
        String predicate2 = "kafkaMsgOffset = 6";
        String predicate3 = "kafkaPartitionId = 1";
        String predicate4 = "kafkaPartitionId = 2";
        int expectedRowCountInPlan = 10;
        int expectedRowCount = 4;

        String whereClause = String.format("(%s OR %s) AND (%s OR %s)", predicate1, predicate2, predicate3, predicate4);
        this.verifyPushdown(KafkaFilterPushdownTest.selectWhere(whereClause), expectedRowCount, expectedRowCountInPlan);
    }

    @Test
    public void testPushdownTimestampWithNonMetaField() throws Exception {
        String predicate1 = "kafkaMsgTimestamp > 6";
        String predicate2 = "boolKey = true";
        int expectedRowCountInPlan = 20;
        int expectedRowCount = 10;

        this.verifyPushdown(KafkaFilterPushdownTest.selectWhere(predicate1 + " AND " + predicate2), expectedRowCount, expectedRowCountInPlan);
    }

    @Test
    public void testNoPushdownOfOffsetWithNonMetadataField() throws Exception {
        String predicate1 = "boolKey = true";
        String predicate2 = "kafkaMsgTimestamp > 6";
        String predicate3 = "kafkaMsgTimestamp < 9";
        int expectedRowCountInPlan = 50;
        int expectedRowCount = 30;

        String whereClause = String.format("%s OR (%s AND %s)", predicate1, predicate2, predicate3);
        this.verifyPushdown(KafkaFilterPushdownTest.selectWhere(whereClause), expectedRowCount, expectedRowCountInPlan);
    }

    /**
     * Builds {@code select * from kafka.`<topic>` where <whereClause>} for the test topic.
     *
     * @param whereClause filter expression placed verbatim after {@code where}
     * @return the complete query text
     */
    private static String selectWhere(String whereClause) {
        return String.format("select * from kafka.`%s` where %s", TOPIC_NAME, whereClause);
    }

    /**
     * Runs the query and verifies both its result size and the scan's planned row count.
     *
     * @param queryString            query to execute and plan
     * @param expectedRowCount       number of rows the query must return
     * @param expectedRowCountInPlan value substituted into {@link #EXPECTED_PATTERN} as the
     *                               scan's {@code outputRowCount}
     * @throws Exception if execution, planning or either verification fails
     */
    private void verifyPushdown(String queryString, int expectedRowCount, int expectedRowCountInPlan) throws Exception {
        this.runKafkaSQLVerifyCount(queryString, expectedRowCount);
        String planPattern = String.format(EXPECTED_PATTERN, expectedRowCountInPlan);
        this.queryBuilder().sql(queryString).jsonPlanMatcher().include(new String[]{planPattern}).match();
    }
}

