/*
 * Decompiled with CFR 0.152.
 */
package org.apache.drill.exec.store.kafka;

import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import org.apache.drill.categories.KafkaStorageTest;
import org.apache.drill.categories.SlowTest;
import org.apache.drill.exec.rpc.RpcException;
import org.apache.drill.exec.store.kafka.KafkaTestBase;
import org.apache.drill.shaded.guava.com.google.common.collect.Maps;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.Deserializer;
import org.junit.Assert;
import org.junit.Test;
import org.junit.experimental.categories.Category;

@Category(value={KafkaStorageTest.class, SlowTest.class})
public class KafkaQueriesTest
extends KafkaTestBase {
    @Test
    public void testSqlQueryOnInvalidTopic() throws Exception {
        String queryString = String.format("select * from kafka.`%s`", "invalid-topic");
        try {
            KafkaQueriesTest.testBuilder().sqlQuery(queryString).unOrdered().baselineRecords(Collections.emptyList()).build().run();
            Assert.fail((String)"Test passed though topic does not exist.");
        }
        catch (RpcException re) {
            Assert.assertTrue((boolean)re.getMessage().contains("DATA_READ ERROR: Table 'invalid-topic' does not exist"));
        }
    }

    @Test
    public void testResultCount() throws Exception {
        String queryString = String.format("select * from kafka.`%s`", "drill-json-topic");
        this.runKafkaSQLVerifyCount(queryString, 10);
    }

    @Test
    public void testPartitionMinOffset() throws Exception {
        Map<TopicPartition, Long> startOffsetsMap = this.fetchOffsets(-2);
        String queryString = String.format("select MIN(kafkaMsgOffset) as minOffset from kafka.`%s`", "drill-json-topic");
        KafkaQueriesTest.testBuilder().sqlQuery(queryString).unOrdered().baselineColumns(new String[]{"minOffset"}).baselineValues(new Object[]{startOffsetsMap.get(new TopicPartition("drill-json-topic", 0))}).go();
    }

    @Test
    public void testPartitionMaxOffset() throws Exception {
        Map<TopicPartition, Long> endOffsetsMap = this.fetchOffsets(-1);
        String queryString = String.format("select MAX(kafkaMsgOffset) as maxOffset from kafka.`%s`", "drill-json-topic");
        KafkaQueriesTest.testBuilder().sqlQuery(queryString).unOrdered().baselineColumns(new String[]{"maxOffset"}).baselineValues(new Object[]{endOffsetsMap.get(new TopicPartition("drill-json-topic", 0)) - 1L}).go();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private Map<TopicPartition, Long> fetchOffsets(int flag) {
        HashMap offsetsMap;
        block7: {
            KafkaConsumer kafkaConsumer = new KafkaConsumer(storagePluginConfig.getKafkaConsumerProps(), (Deserializer)new ByteArrayDeserializer(), (Deserializer)new ByteArrayDeserializer());
            offsetsMap = Maps.newHashMap();
            kafkaConsumer.subscribe(Collections.singletonList("drill-json-topic"));
            kafkaConsumer.poll(0L);
            Set assignments = kafkaConsumer.assignment();
            try {
                if (flag == -2) {
                    kafkaConsumer.seekToBeginning((Collection)assignments);
                    for (TopicPartition topicPartition : assignments) {
                        offsetsMap.put(topicPartition, kafkaConsumer.position(topicPartition));
                    }
                    break block7;
                }
                if (flag == -1) {
                    kafkaConsumer.seekToEnd((Collection)assignments);
                    for (TopicPartition topicPartition : assignments) {
                        offsetsMap.put(topicPartition, kafkaConsumer.position(topicPartition));
                    }
                    break block7;
                }
                throw new RuntimeException(String.format("Unsupported flag %d", flag));
            }
            finally {
                kafkaConsumer.close();
            }
        }
        return offsetsMap;
    }

    @Test
    public void testPhysicalPlanSubmission() throws Exception {
        String query = String.format("select * from kafka.`%s`", "drill-json-topic");
        KafkaQueriesTest.testPhysicalPlanExecutionBasedOnQuery((String)query);
    }
}

