/*
 * Decompiled with CFR 0.152.
 */
package org.apache.nifi.processors.kafka.pubsub;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.DataUnit;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.util.FlowFileFilters;
import org.apache.nifi.processors.kafka.pubsub.PublisherLease;

/**
 * Static helpers used when publishing FlowFiles to Kafka: selecting the FlowFiles to
 * publish from a {@link ProcessSession}, and forwarding the consumer offsets recorded in
 * FlowFile attributes to a {@link PublisherLease}.
 */
public class PublishKafkaUtil {
    /** FlowFile attribute holding the topic name. */
    private static final String ATTR_TOPIC = "kafka.topic";
    /** FlowFile attribute holding the partition number. */
    private static final String ATTR_PARTITION = "kafka.partition";
    /** FlowFile attribute holding the maximum offset; preferred over {@link #ATTR_OFFSET}. */
    private static final String ATTR_MAX_OFFSET = "kafka.max.offset";
    /** FlowFile attribute holding the offset; used only when {@link #ATTR_MAX_OFFSET} is unusable. */
    private static final String ATTR_OFFSET = "kafka.offset";
    /** FlowFile attribute holding the leader epoch (optional). */
    private static final String ATTR_LEADER_EPOCH = "kafka.leader.epoch";
    /** FlowFile attribute holding the consumer group id. */
    private static final String ATTR_CONSUMER_GROUP_ID = "kafka.consumer.id";
    /** FlowFile attribute whose value {@code "false"} triggers polling of all available FlowFiles. */
    private static final String ATTR_OFFSETS_COMMITTED = "kafka.consumer.offsets.committed";

    /** Size argument (in MB) passed to the size-based filter for the initial poll. */
    private static final double INITIAL_BATCH_SIZE_MB = 1.0;
    /** Count argument passed to the size-based filter for the initial poll. */
    private static final int INITIAL_BATCH_COUNT = 500;
    /** Number of FlowFiles requested per {@code session.get(int)} call when draining. */
    private static final int DRAIN_BATCH_COUNT = 10000;

    /**
     * Polls an initial batch of FlowFiles from the session through a size-based filter.
     * If any FlowFile in that batch carries the attribute
     * {@code kafka.consumer.offsets.committed} with the exact value {@code "false"}, the
     * session is additionally drained and the combined list is returned; otherwise the
     * initial batch is returned as-is.
     *
     * @param session the session to poll FlowFiles from
     * @return the FlowFiles obtained from the session; empty when the initial poll yields nothing
     */
    public static List<FlowFile> pollFlowFiles(ProcessSession session) {
        List<FlowFile> initialFlowFiles = session.get(
                FlowFileFilters.newSizeBasedFilter(INITIAL_BATCH_SIZE_MB, DataUnit.MB, INITIAL_BATCH_COUNT));
        if (initialFlowFiles.isEmpty()) {
            return initialFlowFiles;
        }

        for (FlowFile flowFile : initialFlowFiles) {
            // Only an explicit "false" counts; a missing attribute or any other value does not.
            if ("false".equals(flowFile.getAttribute(ATTR_OFFSETS_COMMITTED))) {
                return pollAllFlowFiles(session, initialFlowFiles);
            }
        }
        return initialFlowFiles;
    }

    /**
     * Keeps requesting FlowFiles from the session, {@value #DRAIN_BATCH_COUNT} at a time,
     * until a request comes back empty.
     *
     * @param session the session to drain
     * @param initialFlowFiles FlowFiles already obtained; they come first in the result
     * @return a new list holding {@code initialFlowFiles} followed by everything polled here
     */
    private static List<FlowFile> pollAllFlowFiles(ProcessSession session, List<FlowFile> initialFlowFiles) {
        List<FlowFile> polled = new ArrayList<FlowFile>(initialFlowFiles);
        List<FlowFile> batch = session.get(DRAIN_BATCH_COUNT);
        while (!batch.isEmpty()) {
            polled.addAll(batch);
            batch = session.get(DRAIN_BATCH_COUNT);
        }
        return polled;
    }

    /**
     * Reads topic, partition, offset, leader epoch and consumer group id from the FlowFile's
     * attributes and passes them to {@link PublisherLease#ackConsumerOffsets}. When the
     * topic, partition, offset or consumer group id is missing (or a numeric attribute is
     * not parseable), a warning is logged and the lease is not called.
     *
     * @param lease the lease that receives the offsets
     * @param flowFile the FlowFile whose attributes describe the consumed offsets
     * @param logger the logger used for warnings
     */
    public static void addConsumerOffsets(PublisherLease lease, FlowFile flowFile, ComponentLog logger) {
        String topic = flowFile.getAttribute(ATTR_TOPIC);
        Long partition = getNumericAttribute(flowFile, ATTR_PARTITION, logger);

        // Prefer the max offset; fall back to the plain offset when it is absent or non-numeric.
        Long maxOffset = getNumericAttribute(flowFile, ATTR_MAX_OFFSET, logger);
        if (maxOffset == null) {
            maxOffset = getNumericAttribute(flowFile, ATTR_OFFSET, logger);
        }

        Long epoch = getNumericAttribute(flowFile, ATTR_LEADER_EPOCH, logger);
        String consumerGroupId = flowFile.getAttribute(ATTR_CONSUMER_GROUP_ID);

        // The leader epoch is optional: it is not part of this check and is passed on as null
        // when unavailable. The warning text still lists it alongside the required attributes.
        if (topic == null || partition == null || maxOffset == null || consumerGroupId == null) {
            logger.warn("Cannot commit consumer offsets because at least one of the following FlowFile attributes is missing from {}: {}",
                    new Object[]{flowFile, Arrays.asList(ATTR_TOPIC, ATTR_PARTITION, ATTR_MAX_OFFSET + " (or " + ATTR_OFFSET + ")", ATTR_LEADER_EPOCH, ATTR_CONSUMER_GROUP_ID)});
            return;
        }

        // Partition and epoch are parsed as long and narrowed to int via intValue().
        Integer leaderEpoch = epoch == null ? null : Integer.valueOf(epoch.intValue());
        lease.ackConsumerOffsets(topic, partition.intValue(), maxOffset, leaderEpoch, consumerGroupId);
    }

    /**
     * Parses a FlowFile attribute as a {@code long}.
     *
     * @param flowFile the FlowFile to read from
     * @param attributeName the name of the attribute to parse
     * @param logger the logger used to warn about a non-numeric value
     * @return the parsed value, or {@code null} when the attribute is absent or cannot be
     *         parsed by {@link Long#parseLong(String)} (a warning is logged in the latter case)
     */
    private static Long getNumericAttribute(FlowFile flowFile, String attributeName, ComponentLog logger) {
        String attributeValue = flowFile.getAttribute(attributeName);
        if (attributeValue == null) {
            return null;
        }
        try {
            return Long.parseLong(attributeValue);
        } catch (NumberFormatException nfe) {
            logger.warn("Expected a numeric value for attribute '{}' but found non-numeric value for {}", new Object[]{attributeName, flowFile});
            return null;
        }
    }
}

