/*
 * Decompiled with CFR 0.152.
 */
package com.mapr.streams.listener;

import com.mapr.streams.listener.Listener;
import java.io.IOException;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.header.Header;

public class ListenerV10
extends Listener {
    public ListenerV10(String streamName, int numStreams, int numTopics, int numSlowTopics, int numPartitions, int numExpectedMsgs, boolean verifyKeys, boolean keysInOrder, boolean allowDuplicateKeys, boolean isTracingEnabled, boolean printStats, Listener.TED_ACTION action, boolean topicSubscription, String groupId, int maxPartitionFetchSize, int maxFetchSize, int maxPollRecords, boolean verifyHdrKey, boolean verifyHdrVal) {
        super(streamName, numStreams, numTopics, numSlowTopics, numPartitions, numExpectedMsgs, verifyKeys, keysInOrder, allowDuplicateKeys, isTracingEnabled, printStats, action, topicSubscription, groupId, maxPartitionFetchSize, maxFetchSize, maxPollRecords, verifyHdrKey, verifyHdrVal);
    }

    public static boolean runTestWithPollOptions(String streamName, int numStreams, int numTopics, int numSlowTopics, int numPartitions, int numExpectedMsgs, int maxPartitionFetchSize, int maxFetchSize, int maxPollRecords, boolean verifyHdrKey, boolean verifyHdrVal) throws IOException {
        ListenerV10 lp = new ListenerV10(streamName, numStreams, numTopics, numSlowTopics, numPartitions, numExpectedMsgs, true, true, false, false, false, Listener.TED_ACTION.kNone, false, null, maxPartitionFetchSize, maxFetchSize, maxPollRecords, verifyHdrKey, verifyHdrVal);
        lp.runWithException();
        return lp.status;
    }

    @Override
    protected long getHeadersSize(ConsumerRecord<byte[], byte[]> rec) throws IOException {
        long headersSize = 0L;
        Header[] headers = rec.headers().toArray();
        for (int i = 0; i < headers.length; ++i) {
            headersSize += (long)headers[i].key().length();
            if (headers[i].value() == null) continue;
            headersSize += (long)headers[i].value().length;
        }
        return headersSize;
    }

    @Override
    protected void verifyHeaders(ConsumerRecord<byte[], byte[]> rec) throws IOException {
        Header[] headers = rec.headers().toArray();
        if (this.verifyHeaderKey || this.verifyHeaderVal) {
            for (int i = 0; i < headers.length; ++i) {
                int hdrIdx = this.verifyHeader(headers[i], i, rec);
                if (hdrIdx == i) continue;
                throw new IOException("Headers recieved in out of order");
            }
        }
    }

    private int verifyHeader(Header header, int index, ConsumerRecord<byte[], byte[]> rec) throws IOException {
        int keyIdx = 0;
        if (this.verifyHeaderKey) {
            keyIdx = this.verifyHeaderKey(header.key(), index, rec);
        }
        int valIdx = 0;
        if (this.verifyHeaderVal) {
            if (header.value() == null) {
                throw new IOException("Received null header value");
            }
            valIdx = this.verifyHeaderVal(header.value(), index, rec);
        }
        if (this.verifyHeaderKey && this.verifyHeaderVal && keyIdx != valIdx) {
            throw new IOException("Header key idx " + keyIdx + " not matching with Header val Idx " + valIdx);
        }
        return keyIdx;
    }

    private int verifyHeaderVal(byte[] value, int index, ConsumerRecord<byte[], byte[]> rec) throws IOException {
        String valStr = new String(value, "UTF-8");
        String[] tokens = valStr.split(":");
        if (tokens.length != 6) {
            throw new IOException("Header Value " + valStr + " not of correct format");
        }
        int partition = Integer.parseInt(tokens[3]);
        if (this.isTracingEnabled) {
            System.out.println("Header key " + valStr + " ntokens " + tokens.length);
        }
        if (tokens[1].length() != this.streamName.length() + 1 || !tokens[1].startsWith(this.streamName) || Integer.parseInt(tokens[1].substring(this.streamName.length())) >= this.numStreams) {
            throw new IOException("StreamName in header val " + tokens[0] + " didnot match intial " + this.streamName + " and numstreams " + this.numStreams);
        }
        String recTopic = rec.topic();
        String[] recTopicTokens = recTopic.split(":");
        if (recTopicTokens.length != 2 || !tokens[2].equals(recTopicTokens[1])) {
            throw new IOException("Topic in header val " + tokens[2] + " mismatched. Expected " + recTopicTokens[1]);
        }
        if (partition != rec.partition()) {
            throw new IOException("Partition in header val " + partition + " mismatched. Expected " + rec.partition());
        }
        if (!tokens[0].equals("hVal")) {
            throw new IOException("Header val prefix mismatch");
        }
        int HdrSeq = Integer.parseInt(tokens[5]);
        if (HdrSeq != index) {
            throw new IOException("Header val sequence number mismatch");
        }
        return HdrSeq;
    }

    private int verifyHeaderKey(String keyStr, int index, ConsumerRecord<byte[], byte[]> rec) throws IOException {
        String[] tokens = keyStr.split(":");
        if (tokens.length != 6) {
            throw new IOException("Header key " + keyStr + " not of correct format");
        }
        int partition = Integer.parseInt(tokens[3]);
        if (this.isTracingEnabled) {
            System.out.println("Header val " + keyStr + " ntokens " + tokens.length);
        }
        if (tokens[1].length() != this.streamName.length() + 1 || !tokens[1].startsWith(this.streamName) || Integer.parseInt(tokens[1].substring(this.streamName.length())) >= this.numStreams) {
            throw new IOException("StreamName in header key " + tokens[0] + " didnot match intial " + this.streamName + " and numstreams " + this.numStreams);
        }
        String recTopic = rec.topic();
        String[] recTopicTokens = recTopic.split(":");
        if (recTopicTokens.length != 2 || !tokens[2].equals(recTopicTokens[1])) {
            throw new IOException("Topic in header key " + tokens[2] + " mismatched. Expected " + recTopicTokens[1]);
        }
        if (partition != rec.partition()) {
            throw new IOException("Partition in header key " + partition + " mismatched. Expected " + rec.partition());
        }
        if (!tokens[0].equals("hKey")) {
            throw new IOException("Header key prefix mismatch");
        }
        int HdrSeq = Integer.parseInt(tokens[5]);
        if (HdrSeq != index) {
            throw new IOException("Header key sequence number mismatch");
        }
        return HdrSeq;
    }
}

