package org.apache.spark.streaming.kafka010;

import java.util.Arrays;
import java.util.Iterator;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.TopicPartition;
import scala.Predef$;
import scala.Predef$ArrowAssoc$;
import scala.Serializable;
import scala.Tuple2;
import scala.runtime.AbstractFunction1;
import scala.runtime.BoxesRunTime;

/* compiled from: DirectKafkaInputDStream.scala */
/* loaded from: input_file:org/apache/spark/streaming/kafka010/DirectKafkaInputDStream$$anonfun$start$1.class */
public final class DirectKafkaInputDStream$$anonfun$start$1 extends AbstractFunction1<TopicPartition, Tuple2<TopicPartition, Object>> implements Serializable {
    public static final long serialVersionUID = 0;
    private final /* synthetic */ DirectKafkaInputDStream $outer;
    private final Consumer c$2;
    private final long pollTimeout$1;

    public final Tuple2<TopicPartition, Object> apply(TopicPartition topicPartition) {
        Long boxToLong;
        Predef$ArrowAssoc$ predef$ArrowAssoc$ = Predef$ArrowAssoc$.MODULE$;
        Object ArrowAssoc = Predef$.MODULE$.ArrowAssoc(topicPartition);
        long position = this.c$2.position(topicPartition);
        this.$outer.serviceConsumer().assign(Arrays.asList(topicPartition));
        Iterator it = this.$outer.serviceConsumer().poll(this.pollTimeout$1).iterator();
        long offset = it.hasNext() ? ((ConsumerRecord) it.next()).offset() : ((Long) this.c$2.endOffsets(Arrays.asList(topicPartition)).get(topicPartition)).longValue();
        if (position < offset) {
            this.$outer.serviceConsumer().seek(topicPartition, offset);
            boxToLong = BoxesRunTime.boxToLong(offset);
        } else {
            this.$outer.serviceConsumer().seek(topicPartition, position);
            boxToLong = BoxesRunTime.boxToLong(position);
        }
        return predef$ArrowAssoc$.$minus$greater$extension(ArrowAssoc, boxToLong);
    }

    public DirectKafkaInputDStream$$anonfun$start$1(DirectKafkaInputDStream directKafkaInputDStream, Consumer consumer, long j) {
        if (directKafkaInputDStream == null) {
            throw null;
        }
        this.$outer = directKafkaInputDStream;
        this.c$2 = consumer;
        this.pollTimeout$1 = j;
    }
}
