/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flume.shared.kafka.test;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import org.apache.flume.Event;
import org.apache.flume.event.EventBuilder;
import org.apache.flume.shared.kafka.test.PartitionTestScenario;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.junit.Assert;

/**
 * Static helpers for tests that verify how events are distributed across Kafka partitions:
 * generating a deliberately skewed list of events, reading back what landed in each partition,
 * and comparing the two.
 */
public class KafkaPartitionTestUtil {
    /** Event header key under which {@link #generateSkewedMessageList} stores the target partition id. */
    public static final String PARTITION_HEADER = "partition-header";

    /**
     * Asserts that the records read from each partition match what the given scenario expects.
     *
     * <ul>
     *   <li>{@code PARTITION_ID_HEADER_ONLY} / {@code STATIC_HEADER_AND_PARTITION_ID}: every
     *       partition must hold exactly the bodies of the events recorded for it in
     *       {@code partitionMap}, in the same order.</li>
     *   <li>{@code STATIC_HEADER_ONLY}: partition {@code staticPtn} must hold {@code numMsgs}
     *       records and every other partition none.</li>
     *   <li>{@code NO_PARTITION_HEADERS}: every partition must hold
     *       {@code numMsgs / numPtns} records.</li>
     * </ul>
     * Any other scenario value is not checked.
     *
     * @param scenario     the scenario the messages were produced under
     * @param partitionMap expected events keyed by partition id; its size is taken as the number
     *                     of partitions, and ids {@code 0..size-1} are looked up
     * @param resultsMap   record values actually read, keyed by partition id
     * @param staticPtn    the partition expected to receive everything in the static-header scenario
     * @param numMsgs      total number of messages that were sent
     * @throws IllegalArgumentException if the scenario is {@code NO_PARTITION_HEADERS} and
     *                                  {@code numMsgs} is not a multiple of the partition count
     */
    public static void checkResultsAgainstSkew(PartitionTestScenario scenario, Map<Integer, List<Event>> partitionMap, Map<Integer, List<byte[]>> resultsMap, int staticPtn, int numMsgs) {
        int numPtns = partitionMap.size();
        if (scenario == PartitionTestScenario.NO_PARTITION_HEADERS && numMsgs % numPtns != 0) {
            throw new IllegalArgumentException("This method is not designed to work with scenarios where there is expected to be a non-even distribution of messages");
        }
        for (int ptn = 0; ptn < numPtns; ++ptn) {
            List<Event> expectedResults = partitionMap.get(ptn);
            List<byte[]> actualResults = resultsMap.get(ptn);
            if (scenario == PartitionTestScenario.PARTITION_ID_HEADER_ONLY || scenario == PartitionTestScenario.STATIC_HEADER_AND_PARTITION_ID) {
                // Explicit placement: contents and order must match exactly.
                Assert.assertEquals(expectedResults.size(), actualResults.size());
                for (int idx = 0; idx < expectedResults.size(); ++idx) {
                    Assert.assertArrayEquals(expectedResults.get(idx).getBody(), actualResults.get(idx));
                }
            } else if (scenario == PartitionTestScenario.STATIC_HEADER_ONLY) {
                // Everything goes to the static partition; all others stay empty.
                int expectedCount = ptn == staticPtn ? numMsgs : 0;
                Assert.assertEquals(expectedCount, actualResults.size());
            } else if (scenario == PartitionTestScenario.NO_PARTITION_HEADERS) {
                // Only the per-partition count is checked, not the contents.
                Assert.assertEquals(numMsgs / numPtns, actualResults.size());
            }
        }
    }

    /**
     * Builds {@code numMsgs} events whose bodies are the decimal strings {@code "0"},
     * {@code "1"}, ... and, depending on the scenario, tags them with a
     * {@link #PARTITION_HEADER} so that they are unevenly spread over partitions.
     *
     * <p>For every scenario except {@code NO_PARTITION_HEADERS} each event is also appended to
     * the list in {@code partitionMap} for the partition it is expected to land in:
     * <ul>
     *   <li>{@code STATIC_HEADER_ONLY}: no header is set; the expected partition is
     *       {@code staticPtn}.</li>
     *   <li>otherwise, index divisible by 5: header and expected partition 4.</li>
     *   <li>otherwise, index divisible by 3: header and expected partition 3.</li>
     *   <li>otherwise, {@code STATIC_HEADER_AND_PARTITION_ID}: no header; expected partition
     *       {@code staticPtn}.</li>
     *   <li>otherwise, {@code PARTITION_ID_HEADER_ONLY}: header and expected partition 2.</li>
     * </ul>
     *
     * @param scenario     the scenario to generate messages for
     * @param numMsgs      number of events to create; zero is accepted and yields an empty list
     * @param partitionMap pre-populated map with one (mutable) list per partition id; updated
     *                     in place
     * @param numPtns      number of partitions; must be at least 5 because partitions 2, 3 and 4
     *                     are hard-coded targets
     * @param staticPtn    partition used where the scenario relies on a static partition
     * @return all generated events in creation order
     * @throws IllegalArgumentException if {@code numMsgs} is negative, {@code staticPtn} is not
     *                                  below {@code numPtns}, {@code numPtns} is below 5, or
     *                                  {@code partitionMap} does not have {@code numPtns} entries
     */
    public static List<Event> generateSkewedMessageList(PartitionTestScenario scenario, int numMsgs, Map<Integer, List<Event>> partitionMap, int numPtns, int staticPtn) {
        // Validate before allocating: ArrayList's constructor rejects a negative capacity with
        // its own IllegalArgumentException, which would otherwise mask the message below.
        // NOTE: the message says "greater than zero" but zero has always been accepted.
        if (numMsgs < 0) {
            throw new IllegalArgumentException("Number of messages must be greater than zero");
        }
        if (staticPtn >= numPtns) {
            throw new IllegalArgumentException("The static partition must be less than the number of partitions");
        }
        if (numPtns < 5) {
            throw new IllegalArgumentException("This method is designed to work with at least 5 partitions");
        }
        if (partitionMap.size() != numPtns) {
            throw new IllegalArgumentException("partitionMap has not been correctly initialised");
        }
        List<Event> msgs = new ArrayList<Event>(numMsgs);
        for (int i = 0; i < numMsgs; ++i) {
            Map<String, String> headers = new HashMap<String, String>();
            Integer partition = null;
            if (scenario != PartitionTestScenario.NO_PARTITION_HEADERS) {
                if (scenario == PartitionTestScenario.STATIC_HEADER_ONLY) {
                    partition = staticPtn;
                } else if (i % 5 == 0) {
                    partition = 4;
                    headers.put(PARTITION_HEADER, String.valueOf(partition));
                } else if (i % 3 == 0) {
                    partition = 3;
                    headers.put(PARTITION_HEADER, String.valueOf(partition));
                } else if (scenario == PartitionTestScenario.STATIC_HEADER_AND_PARTITION_ID) {
                    partition = staticPtn;
                } else if (scenario == PartitionTestScenario.PARTITION_ID_HEADER_ONLY) {
                    partition = 2;
                    headers.put(PARTITION_HEADER, String.valueOf(partition));
                }
            }
            Event event = EventBuilder.withBody(String.valueOf(i).getBytes(), headers);
            if (scenario != PartitionTestScenario.NO_PARTITION_HEADERS) {
                partitionMap.get(partition).add(event);
            }
            msgs.add(event);
        }
        return msgs;
    }

    /**
     * Reads the records currently available in each partition of a topic.
     *
     * <p>For every partition id in {@code 0..numPtns-1} a fresh consumer is created, assigned to
     * that single partition, polled once (timeout argument 1000) and closed again. Only the
     * records returned by that single poll are collected.
     *
     * @param topic              topic to read from
     * @param numPtns            number of partitions to read
     * @param consumerProperties configuration for each consumer; presumably configures a
     *                           byte-array value deserializer, since values are returned as
     *                           {@code byte[]} - verify against the caller
     * @return record values per partition id; a partition with no records maps to an empty list
     */
    public static Map<Integer, List<byte[]>> retrieveRecordsFromPartitions(String topic, int numPtns, Properties consumerProperties) {
        Map<Integer, List<byte[]>> resultsMap = new HashMap<Integer, List<byte[]>>();
        for (int i = 0; i < numPtns; ++i) {
            List<byte[]> partitionResults = new ArrayList<byte[]>();
            resultsMap.put(i, partitionResults);
            // The key type is never inspected here, so it is left as Object.
            KafkaConsumer<Object, byte[]> consumer = new KafkaConsumer<Object, byte[]>(consumerProperties);
            try {
                List<TopicPartition> assignment = new ArrayList<TopicPartition>();
                assignment.add(new TopicPartition(topic, i));
                consumer.assign(assignment);
                ConsumerRecords<Object, byte[]> records = consumer.poll(1000L);
                for (ConsumerRecord<Object, byte[]> record : records) {
                    partitionResults.add(record.value());
                }
            } finally {
                // Close even when assign/poll throws so a failing test does not leak the consumer.
                consumer.close();
            }
        }
        return resultsMap;
    }
}

