package org.apache.nifi.processors.kafka.pubsub;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.DataUnit;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.util.FlowFileFilters;

/* loaded from: input_file:org/apache/nifi/processors/kafka/pubsub/PublishKafkaUtil.class */
/**
 * Static helpers for pulling FlowFiles from a {@link ProcessSession} and for acknowledging
 * Kafka consumer offsets that are recorded in FlowFile attributes.
 */
public class PublishKafkaUtil {
    /** FlowFile attribute whose value {@code "false"} triggers draining of the whole queue. */
    private static final String ATTR_OFFSETS_COMMITTED = "kafka.consumer.offsets.committed";
    private static final String ATTR_TOPIC = "kafka.topic";
    private static final String ATTR_PARTITION = "kafka.partition";
    private static final String ATTR_MAX_OFFSET = "kafka.max.offset";
    private static final String ATTR_OFFSET = "kafka.offset";
    private static final String ATTR_LEADER_EPOCH = "kafka.leader.epoch";
    private static final String ATTR_CONSUMER_ID = "kafka.consumer.id";

    /** Arguments handed to the size-based filter for the initial poll. */
    private static final double INITIAL_POLL_MAX_SIZE = 1.0d;
    private static final int INITIAL_POLL_MAX_COUNT = 500;

    /** Argument passed to {@code ProcessSession.get(int)} on each round of a full drain. */
    private static final int DRAIN_BATCH_SIZE = 10000;

    /**
     * Fetches the FlowFiles to handle in one invocation.
     *
     * <p>An initial batch is requested through a size-based filter. If any FlowFile in that batch
     * carries the attribute {@code kafka.consumer.offsets.committed} with the value
     * {@code "false"}, the session is polled repeatedly until it returns nothing more, and the
     * initial batch plus everything fetched afterwards is returned.
     *
     * <p>NOTE(review): presumably the full drain exists so that offsets are not acknowledged ahead
     * of records that are still queued; confirm against the processor documentation.
     *
     * @param processSession the session to pull FlowFiles from
     * @return the initial batch as returned by the session (possibly empty), or a new list
     *         holding the initial batch followed by all remaining FlowFiles
     */
    public static List<FlowFile> pollFlowFiles(ProcessSession processSession) {
        final List<FlowFile> initialFlowFiles = processSession.get(
                FlowFileFilters.newSizeBasedFilter(INITIAL_POLL_MAX_SIZE, DataUnit.MB, INITIAL_POLL_MAX_COUNT));
        if (initialFlowFiles.isEmpty()) {
            return initialFlowFiles;
        }

        for (final FlowFile candidate : initialFlowFiles) {
            // Only the literal "false" counts; a missing attribute is treated as already committed.
            if ("false".equals(candidate.getAttribute(ATTR_OFFSETS_COMMITTED))) {
                return pollAllFlowFiles(processSession, initialFlowFiles);
            }
        }
        return initialFlowFiles;
    }

    /**
     * Copies {@code initialFlowFiles} into a new list and appends every further FlowFile the
     * session yields, polling in batches until the session returns an empty list.
     */
    private static List<FlowFile> pollAllFlowFiles(ProcessSession session, List<FlowFile> initialFlowFiles) {
        final List<FlowFile> collected = new ArrayList<>(initialFlowFiles);
        List<FlowFile> batch = session.get(DRAIN_BATCH_SIZE);
        while (!batch.isEmpty()) {
            collected.addAll(batch);
            batch = session.get(DRAIN_BATCH_SIZE);
        }
        return collected;
    }

    /**
     * Reads topic, partition, offset, leader epoch and consumer id from the FlowFile's attributes
     * and passes them to {@link PublisherLease#ackConsumerOffsets}.
     *
     * <p>The offset is taken from {@code kafka.max.offset}, falling back to {@code kafka.offset}
     * when the former is absent or not numeric. The leader epoch is optional and forwarded as
     * {@code null} when unavailable. If the topic, partition, offset or consumer id cannot be
     * determined, a warning is logged and nothing is acknowledged.
     *
     * @param publisherLease the lease that receives the acknowledgement
     * @param flowFile the FlowFile whose attributes describe the consumed record(s)
     * @param componentLog logger used for warnings about missing or malformed attributes
     */
    public static void addConsumerOffsets(PublisherLease publisherLease, FlowFile flowFile, ComponentLog componentLog) {
        final String topic = flowFile.getAttribute(ATTR_TOPIC);
        final Long partition = getNumericAttribute(flowFile, ATTR_PARTITION, componentLog);

        Long offset = getNumericAttribute(flowFile, ATTR_MAX_OFFSET, componentLog);
        if (offset == null) {
            offset = getNumericAttribute(flowFile, ATTR_OFFSET, componentLog);
        }

        final Long leaderEpoch = getNumericAttribute(flowFile, ATTR_LEADER_EPOCH, componentLog);
        final String consumerId = flowFile.getAttribute(ATTR_CONSUMER_ID);

        final boolean complete = topic != null && partition != null && offset != null && consumerId != null;
        if (!complete) {
            // The explicit Object[] keeps this call valid whether or not warn(...) is declared varargs.
            componentLog.warn("Cannot commit consumer offsets because at least one of the following FlowFile attributes is missing from {}: {}",
                    new Object[] {flowFile, Arrays.asList(ATTR_TOPIC, ATTR_PARTITION, ATTR_MAX_OFFSET + " (or " + ATTR_OFFSET + ")", ATTR_LEADER_EPOCH, ATTR_CONSUMER_ID)});
            return;
        }

        final Integer epochOrNull;
        if (leaderEpoch == null) {
            epochOrNull = null;
        } else {
            epochOrNull = Integer.valueOf(leaderEpoch.intValue());
        }
        publisherLease.ackConsumerOffsets(topic, partition.intValue(), offset.longValue(), epochOrNull, consumerId);
    }

    /**
     * Parses the named attribute as a base-10 {@code long}.
     *
     * @return the parsed value, or {@code null} when the attribute is absent or is not a valid
     *         long (the latter case is also logged as a warning)
     */
    private static Long getNumericAttribute(FlowFile flowFile, String attributeName, ComponentLog logger) {
        final String rawValue = flowFile.getAttribute(attributeName);
        if (rawValue == null) {
            return null;
        }

        try {
            return Long.valueOf(rawValue);
        } catch (NumberFormatException ignored) {
            // Deliberately not rethrown: a malformed attribute is reported and then treated as absent.
            logger.warn("Expected a numeric value for attribute '{}' but found non-numeric value for {}", new Object[] {attributeName, flowFile});
            return null;
        }
    }
}
