/*
 * Decompiled with CFR 0.152.
 */
package org.apache.spark.streaming.kafka09;

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.spark.SparkContext;
import org.apache.spark.SparkEnv$;
import org.apache.spark.annotation.Experimental;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.internal.Logging;
import org.apache.spark.rdd.RDD;
import org.apache.spark.streaming.StreamingContext;
import org.apache.spark.streaming.api.java.JavaInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.dstream.InputDStream;
import org.apache.spark.streaming.kafka09.ConsumerStrategy;
import org.apache.spark.streaming.kafka09.DefaultPerPartitionConfig;
import org.apache.spark.streaming.kafka09.DirectKafkaInputDStream;
import org.apache.spark.streaming.kafka09.KafkaRDD;
import org.apache.spark.streaming.kafka09.LocationStrategy;
import org.apache.spark.streaming.kafka09.OffsetRange;
import org.apache.spark.streaming.kafka09.PerPartitionConfig;
import org.apache.spark.streaming.kafka09.PreferBrokers$;
import org.apache.spark.streaming.kafka09.PreferConsistent$;
import org.apache.spark.streaming.kafka09.PreferFixed;
import org.slf4j.Logger;
import scala.Function0;
import scala.Function1;
import scala.MatchError;
import scala.Predef$;
import scala.Serializable;
import scala.StringContext;
import scala.collection.Iterable$;
import scala.collection.IterableLike;
import scala.collection.Seq;
import scala.collection.immutable.StringOps;
import scala.collection.mutable.StringBuilder;
import scala.reflect.ClassTag$;
import scala.runtime.BoxesRunTime;

@Experimental
public final class KafkaUtils$
implements Logging {
    // The singleton instance; assigned inside the private constructor (MODULE$ = this).
    public static final KafkaUtils$ MODULE$;
    // Value returned by eofOffset(); the constructor sets it to -1001.
    private final int eofOffset;
    // Logger storage, read and written only through the org$apache$spark$internal$Logging$$log_ accessor pair below.
    private transient Logger org$apache$spark$internal$Logging$$log_;

    // Runs at class initialization: constructing the instance is what populates MODULE$.
    static {
        new KafkaUtils$();
    }

    /**
     * Returns the logger currently stored in the transient backing field.
     * No null check is performed here; the field is returned as-is.
     */
    public Logger org$apache$spark$internal$Logging$$log_() {
        return this.org$apache$spark$internal$Logging$$log_;
    }

    /**
     * Stores the given logger in the transient backing field.
     *
     * @param x$1 the logger to store (stored without validation)
     */
    public void org$apache$spark$internal$Logging$$log__$eq(Logger x$1) {
        this.org$apache$spark$internal$Logging$$log_ = x$1;
    }

    /**
     * Delegates to the {@code Logging} trait's {@code logName} implementation for this instance.
     *
     * @return whatever name {@code Logging} computes; the rule is not visible in this file
     */
    public String logName() {
        return Logging.class.logName((Logging)this);
    }

    /**
     * Delegates to the {@code Logging} trait's {@code log} implementation for this instance.
     *
     * @return the logger supplied by {@code Logging}
     */
    public Logger log() {
        return Logging.class.log((Logging)this);
    }

    /**
     * Forwards to the {@code Logging} trait's {@code logInfo}.
     *
     * @param msg function supplying the message text; when (or whether) it is invoked is decided by {@code Logging}
     */
    public void logInfo(Function0<String> msg) {
        Logging.class.logInfo((Logging)this, msg);
    }

    /**
     * Forwards to the {@code Logging} trait's {@code logDebug}.
     *
     * @param msg function supplying the message text; when (or whether) it is invoked is decided by {@code Logging}
     */
    public void logDebug(Function0<String> msg) {
        Logging.class.logDebug((Logging)this, msg);
    }

    /**
     * Forwards to the {@code Logging} trait's {@code logTrace}.
     *
     * @param msg function supplying the message text; when (or whether) it is invoked is decided by {@code Logging}
     */
    public void logTrace(Function0<String> msg) {
        Logging.class.logTrace((Logging)this, msg);
    }

    /**
     * Forwards to the {@code Logging} trait's {@code logWarning}.
     *
     * @param msg function supplying the message text; when (or whether) it is invoked is decided by {@code Logging}
     */
    public void logWarning(Function0<String> msg) {
        Logging.class.logWarning((Logging)this, msg);
    }

    /**
     * Forwards to the {@code Logging} trait's {@code logError}.
     *
     * @param msg function supplying the message text; when (or whether) it is invoked is decided by {@code Logging}
     */
    public void logError(Function0<String> msg) {
        Logging.class.logError((Logging)this, msg);
    }

    /**
     * Forwards to the {@code Logging} trait's two-argument {@code logInfo}.
     *
     * @param msg       function supplying the message text
     * @param throwable the throwable passed along with the message
     */
    public void logInfo(Function0<String> msg, Throwable throwable) {
        Logging.class.logInfo((Logging)this, msg, (Throwable)throwable);
    }

    /**
     * Forwards to the {@code Logging} trait's two-argument {@code logDebug}.
     *
     * @param msg       function supplying the message text
     * @param throwable the throwable passed along with the message
     */
    public void logDebug(Function0<String> msg, Throwable throwable) {
        Logging.class.logDebug((Logging)this, msg, (Throwable)throwable);
    }

    /**
     * Forwards to the {@code Logging} trait's two-argument {@code logTrace}.
     *
     * @param msg       function supplying the message text
     * @param throwable the throwable passed along with the message
     */
    public void logTrace(Function0<String> msg, Throwable throwable) {
        Logging.class.logTrace((Logging)this, msg, (Throwable)throwable);
    }

    /**
     * Forwards to the {@code Logging} trait's two-argument {@code logWarning}.
     *
     * @param msg       function supplying the message text
     * @param throwable the throwable passed along with the message
     */
    public void logWarning(Function0<String> msg, Throwable throwable) {
        Logging.class.logWarning((Logging)this, msg, (Throwable)throwable);
    }

    /**
     * Forwards to the {@code Logging} trait's two-argument {@code logError}.
     *
     * @param msg       function supplying the message text
     * @param throwable the throwable passed along with the message
     */
    public void logError(Function0<String> msg, Throwable throwable) {
        Logging.class.logError((Logging)this, msg, (Throwable)throwable);
    }

    /**
     * Delegates to the {@code Logging} trait's {@code isTraceEnabled} for this instance.
     *
     * @return the value reported by {@code Logging}
     */
    public boolean isTraceEnabled() {
        return Logging.class.isTraceEnabled((Logging)this);
    }

    /**
     * Forwards to the {@code Logging} trait's one-argument {@code initializeLogIfNecessary}.
     *
     * @param isInterpreter passed through unchanged; its meaning is defined by {@code Logging}
     */
    public void initializeLogIfNecessary(boolean isInterpreter) {
        Logging.class.initializeLogIfNecessary((Logging)this, (boolean)isInterpreter);
    }

    /**
     * Forwards to the {@code Logging} trait's two-argument {@code initializeLogIfNecessary}.
     *
     * @param isInterpreter passed through unchanged; its meaning is defined by {@code Logging}
     * @param silent        passed through unchanged; its meaning is defined by {@code Logging}
     * @return the boolean returned by {@code Logging}
     */
    public boolean initializeLogIfNecessary(boolean isInterpreter, boolean silent) {
        return Logging.class.initializeLogIfNecessary((Logging)this, (boolean)isInterpreter, (boolean)silent);
    }

    /**
     * Delegates to the {@code Logging} trait's {@code initializeLogIfNecessary$default$2}.
     * NOTE(review): by Scala naming convention this looks like the default for the second
     * parameter of {@code initializeLogIfNecessary}; the value itself comes from {@code Logging}.
     */
    public boolean initializeLogIfNecessary$default$2() {
        return Logging.class.initializeLogIfNecessary$default$2((Logging)this);
    }

    /**
     * Builds a {@code KafkaRDD} over the given offset ranges.
     *
     * <p>The location strategy is resolved to a preferred-host map before anything else happens:
     * {@code PreferConsistent} yields an empty map and {@code PreferFixed} yields its own host map.
     * The caller's {@code kafkaParams} map is copied and the copy is passed through
     * {@link #fixKafkaParams}, so the caller's map is not modified by this method. The
     * {@code offsetRanges} array is cloned before being handed to the RDD.
     *
     * @param sc               Spark context the RDD is created on
     * @param kafkaParams      consumer configuration; copied, never mutated here
     * @param offsetRanges     ranges to read; cloned
     * @param locationStrategy how preferred hosts are chosen
     * @return the new RDD
     * @throws AssertionError if {@code locationStrategy} is {@code PreferBrokers}
     * @throws MatchError     if {@code locationStrategy} is none of the three handled cases (including null)
     */
    @Experimental
    public <K, V> RDD<ConsumerRecord<K, V>> createRDD(SparkContext sc, Map<String, Object> kafkaParams, OffsetRange[] offsetRanges, LocationStrategy locationStrategy) {
        final Map<TopicPartition, String> preferredHosts;
        if (PreferBrokers$.MODULE$.equals(locationStrategy)) {
            throw new AssertionError((Object)"If you want to prefer brokers, you must provide a mapping using PreferFixed A single KafkaRDD does not have a driver consumer and cannot look up brokers for you.");
        } else if (PreferConsistent$.MODULE$.equals(locationStrategy)) {
            preferredHosts = Collections.emptyMap();
        } else if (locationStrategy instanceof PreferFixed) {
            preferredHosts = ((PreferFixed)locationStrategy).hostMap();
        } else {
            // Mirrors a non-exhaustive Scala match: anything unrecognized is rejected.
            throw new MatchError((Object)locationStrategy);
        }

        // Work on a private copy so the overrides applied by fixKafkaParams stay local.
        HashMap<String, Object> executorParams = new HashMap<String, Object>(kafkaParams);
        this.fixKafkaParams(executorParams);
        OffsetRange[] rangesCopy = (OffsetRange[])offsetRanges.clone();
        // The trailing `true` is interpreted by KafkaRDD's constructor (not visible in this file).
        return new KafkaRDD(sc, executorParams, rangesCopy, preferredHosts, true);
    }

    /**
     * Java-API variant of {@code createRDD}: unwraps the {@code JavaSparkContext}, delegates to the
     * {@code SparkContext} overload, and wraps the result in a {@code JavaRDD}.
     *
     * @param jsc              Java Spark context; its underlying {@code sc()} is used
     * @param kafkaParams      consumer configuration, passed through to the delegate
     * @param offsetRanges     ranges to read, passed through to the delegate
     * @param locationStrategy passed through to the delegate
     * @return the delegate's RDD wrapped as a {@code JavaRDD}
     */
    @Experimental
    public <K, V> JavaRDD<ConsumerRecord<K, V>> createRDD(JavaSparkContext jsc, Map<String, Object> kafkaParams, OffsetRange[] offsetRanges, LocationStrategy locationStrategy) {
        RDD<ConsumerRecord<K, V>> underlying = this.createRDD(jsc.sc(), kafkaParams, offsetRanges, locationStrategy);
        return new JavaRDD(underlying, ClassTag$.MODULE$.apply(ConsumerRecord.class));
    }

    /**
     * Creates a direct stream using a {@code DefaultPerPartitionConfig} built from the
     * {@code SparkConf} of {@code ssc}'s Spark context, then delegates to the four-argument overload.
     *
     * @param ssc              streaming context
     * @param locationStrategy passed through to the delegate
     * @param consumerStrategy passed through to the delegate
     * @return the stream produced by the four-argument overload
     */
    @Experimental
    public <K, V> InputDStream<ConsumerRecord<K, V>> createDirectStream(StreamingContext ssc, LocationStrategy locationStrategy, ConsumerStrategy<K, V> consumerStrategy) {
        PerPartitionConfig defaults = new DefaultPerPartitionConfig(ssc.sparkContext().getConf());
        return this.createDirectStream(ssc, locationStrategy, consumerStrategy, defaults);
    }

    /**
     * Creates a direct stream by constructing a {@code DirectKafkaInputDStream} from the four
     * arguments, unchanged.
     *
     * @param ssc                streaming context
     * @param locationStrategy   handed to the stream's constructor
     * @param consumerStrategy   handed to the stream's constructor
     * @param perPartitionConfig handed to the stream's constructor
     * @return the newly constructed stream
     */
    @Experimental
    public <K, V> InputDStream<ConsumerRecord<K, V>> createDirectStream(StreamingContext ssc, LocationStrategy locationStrategy, ConsumerStrategy<K, V> consumerStrategy, PerPartitionConfig perPartitionConfig) {
        InputDStream<ConsumerRecord<K, V>> stream = new DirectKafkaInputDStream<K, V>(ssc, locationStrategy, consumerStrategy, perPartitionConfig);
        return stream;
    }

    /**
     * Java-API variant: unwraps the {@code JavaStreamingContext}, delegates to the three-argument
     * {@code StreamingContext} overload, and wraps the result in a {@code JavaInputDStream}.
     *
     * @param jssc             Java streaming context; its underlying {@code ssc()} is used
     * @param locationStrategy passed through to the delegate
     * @param consumerStrategy passed through to the delegate
     * @return the delegate's stream wrapped as a {@code JavaInputDStream}
     */
    @Experimental
    public <K, V> JavaInputDStream<ConsumerRecord<K, V>> createDirectStream(JavaStreamingContext jssc, LocationStrategy locationStrategy, ConsumerStrategy<K, V> consumerStrategy) {
        InputDStream<ConsumerRecord<K, V>> underlying = this.createDirectStream(jssc.ssc(), locationStrategy, consumerStrategy);
        return new JavaInputDStream(underlying, ClassTag$.MODULE$.apply(ConsumerRecord.class));
    }

    /**
     * Java-API variant: unwraps the {@code JavaStreamingContext}, delegates to the four-argument
     * {@code StreamingContext} overload, and wraps the result in a {@code JavaInputDStream}.
     *
     * @param jssc               Java streaming context; its underlying {@code ssc()} is used
     * @param locationStrategy   passed through to the delegate
     * @param consumerStrategy   passed through to the delegate
     * @param perPartitionConfig passed through to the delegate
     * @return the delegate's stream wrapped as a {@code JavaInputDStream}
     */
    @Experimental
    public <K, V> JavaInputDStream<ConsumerRecord<K, V>> createDirectStream(JavaStreamingContext jssc, LocationStrategy locationStrategy, ConsumerStrategy<K, V> consumerStrategy, PerPartitionConfig perPartitionConfig) {
        InputDStream<ConsumerRecord<K, V>> underlying = this.createDirectStream(jssc.ssc(), locationStrategy, consumerStrategy, perPartitionConfig);
        return new JavaInputDStream(underlying, ClassTag$.MODULE$.apply(ConsumerRecord.class));
    }

    /**
     * Returns the {@code eofOffset} field, which the constructor sets to -1001.
     * NOTE(review): presumably a marker offset related to the
     * "streams.negativeoffset.record.on.eof" setting forced in fixKafkaParams — confirm with callers.
     */
    public int eofOffset() {
        return this.eofOffset;
    }

    /**
     * Rewrites {@code kafkaParams} in place, logging each override before applying it:
     * <ul>
     *   <li>{@code streams.negativeoffset.record.on.eof} is forced to {@code true};</li>
     *   <li>{@code enable.auto.commit} is forced to {@code false};</li>
     *   <li>{@code auto.offset.reset} is forced to {@code "none"};</li>
     *   <li>{@code group.id} becomes {@code "spark-executor-" + <original value>}. A missing value
     *       is logged as an error and still produces {@code "spark-executor-null"};</li>
     *   <li>{@code receive.buffer.bytes} is raised to 65536 when absent or smaller.</li>
     * </ul>
     *
     * <p>The method is not idempotent: every call prepends the group-id prefix again.
     *
     * <p>{@code receive.buffer.bytes} may be supplied as an {@code Integer} or as a numeric
     * {@code String}. Previously a {@code String} value failed with a
     * {@code ClassCastException}; it is now parsed (after trimming) before the comparison.
     *
     * @param kafkaParams mutable consumer configuration, modified in place
     * @throws NumberFormatException if {@code receive.buffer.bytes} is a String that is not an integer
     * @throws ClassCastException    if {@code receive.buffer.bytes} is neither an Integer nor a String
     */
    public void fixKafkaParams(HashMap<String, Object> kafkaParams) {
        this.logWarning((Function0<String>)new Serializable(){
            public static final long serialVersionUID = 0L;

            public final String apply() {
                return new StringContext((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"overriding ", " to true"})).s((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{"streams.negativeoffset.record.on.eof"}));
            }
        });
        kafkaParams.put("streams.negativeoffset.record.on.eof", Predef$.MODULE$.boolean2Boolean(true));
        this.logWarning((Function0<String>)new Serializable(){
            public static final long serialVersionUID = 0L;

            public final String apply() {
                return new StringContext((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"overriding ", " to false for executor"})).s((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{"enable.auto.commit"}));
            }
        });
        kafkaParams.put("enable.auto.commit", Predef$.MODULE$.boolean2Boolean(false));
        this.logWarning((Function0<String>)new Serializable(){
            public static final long serialVersionUID = 0L;

            public final String apply() {
                return new StringContext((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"overriding ", " to none for executor"})).s((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{"auto.offset.reset"}));
            }
        });
        kafkaParams.put("auto.offset.reset", "none");
        Object originalGroupId = kafkaParams.get("group.id");
        if (originalGroupId == null) {
            this.logError((Function0<String>)new Serializable(){
                public static final long serialVersionUID = 0L;

                public final String apply() {
                    return new StringContext((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"", " is null, you should probably set it"})).s((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{"group.id"}));
                }
            });
        }
        // A null original id is only reported above; the prefixed id is still built and stored.
        String groupId = new StringBuilder().append((Object)"spark-executor-").append(originalGroupId).toString();
        this.logWarning((Function0<String>)new Serializable(groupId){
            public static final long serialVersionUID = 0L;
            private final String groupId$1;

            public final String apply() {
                return new StringContext((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"overriding executor ", " to ", ""})).s((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{"group.id", this.groupId$1}));
            }
            {
                this.groupId$1 = groupId$1;
            }
        });
        kafkaParams.put("group.id", groupId);
        Object rbb = kafkaParams.get("receive.buffer.bytes");
        boolean bufferTooSmall;
        if (rbb == null) {
            bufferTooSmall = true;
        } else if (rbb instanceof String) {
            // Config maps are often filled from text sources; parse instead of failing on the cast.
            bufferTooSmall = Integer.parseInt(((String)rbb).trim()) < 65536;
        } else {
            // Any non-Integer value still fails here with ClassCastException, as it always did.
            bufferTooSmall = Predef$.MODULE$.Integer2int((Integer)rbb) < 65536;
        }
        if (bufferTooSmall) {
            this.logWarning((Function0<String>)new Serializable(){
                public static final long serialVersionUID = 0L;

                public final String apply() {
                    return new StringContext((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"overriding ", " to 65536 see KAFKA-3135"})).s((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{"receive.buffer.bytes"}));
                }
            });
            kafkaParams.put("receive.buffer.bytes", Predef$.MODULE$.int2Integer(65536));
        }
    }

    /**
     * Blocks until {@code consumer2} reports a non-empty assignment with at least as many
     * partitions as {@code partitions}, or until the configured timeout elapses.
     *
     * <p>The timeout is read from the Spark configuration key
     * {@code spark.mapr.WaitingForAssignmentTimeout} (default 600000). The assignment is re-checked
     * every 500 ms, and elapsed time is counted in 500 ms steps of sleep, not wall-clock time.
     * On timeout the method logs an error and returns normally; it does not throw.
     *
     * <p>Two defects of the previous version are fixed:
     * <ul>
     *   <li>elapsed time is tracked as a {@code long}; the earlier {@code int} counter overflowed
     *       for timeouts above {@code Integer.MAX_VALUE}, after which the loop could never time out;</li>
     *   <li>the timeout error is logged only if the assignment is still incomplete, not merely
     *       because the elapsed counter reached the limit (for example when assignment completed
     *       during the final sleep, or when the configured timeout is zero or negative).</li>
     * </ul>
     *
     * <p>NOTE(review): {@code Thread.sleep} can throw {@code InterruptedException}, which is neither
     * caught nor declared here; this appears to be decompiled Scala output, where it simply
     * propagates to the caller.
     *
     * @param consumer2  consumer whose {@code assignment()} is polled
     * @param partitions the partitions expected to be assigned; only its size is used
     */
    public <K, V> void waitForConsumerAssignment(KafkaConsumer<K, V> consumer2, Set<TopicPartition> partitions) {
        long waitingForAssigmentTimeout = SparkEnv$.MODULE$.get().conf().getLong("spark.mapr.WaitingForAssignmentTimeout", 600000L);
        long waitedMs = 0L;
        boolean assignmentIncomplete = consumer2.assignment().isEmpty() || consumer2.assignment().size() < partitions.size();
        while (assignmentIncomplete && waitedMs < waitingForAssigmentTimeout) {
            Thread.sleep(500L);
            waitedMs += 500L;
            assignmentIncomplete = consumer2.assignment().isEmpty() || consumer2.assignment().size() < partitions.size();
        }
        // The loop exits either because assignment completed or because time ran out;
        // still being incomplete here therefore means the timeout was reached.
        if (assignmentIncomplete) {
            this.logError((Function0<String>)new Serializable(consumer2, waitingForAssigmentTimeout){
                public static final long serialVersionUID = 0L;
                private final KafkaConsumer consumer$1;
                private final long waitingForAssigmentTimeout$1;

                public final String apply() {
                    return new StringOps(Predef$.MODULE$.augmentString(new StringContext((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"Consumer assignment wasn't completed within the timeout ", ".\n           |Assigned partitions: ", "."})).s((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{BoxesRunTime.boxToLong((long)this.waitingForAssigmentTimeout$1), this.consumer$1.assignment()})))).stripMargin();
                }
                {
                    this.consumer$1 = consumer$1;
                    this.waitingForAssigmentTimeout$1 = waitingForAssigmentTimeout$1;
                }
            });
        }
    }

    /**
     * Reports whether any key of {@code currentOffsets} names a topic that starts with
     * {@code "/"} and also contains {@code ":"}.
     *
     * <p>All keys are first mapped to their topic names; the resulting collection is then
     * searched for a match. The map's values are never read.
     * NOTE(review): the "/…:…" shape is presumably the stream-path topic format this fork
     * targets — confirm against the callers.
     *
     * @param currentOffsets offsets keyed by topic partition; only the keys are inspected
     * @return {@code true} if at least one topic name matches, {@code false} otherwise
     */
    public boolean isStreams(scala.collection.immutable.Map<TopicPartition, Object> currentOffsets) {
        Function1 topicOf = (Function1)new Serializable(){
            public static final long serialVersionUID = 0L;

            public final String apply(TopicPartition partition) {
                return partition.topic();
            }
        };
        Function1 looksLikeStreamTopic = (Function1)new Serializable(){
            public static final long serialVersionUID = 0L;

            public final boolean apply(String topic) {
                if (!topic.startsWith("/")) {
                    return false;
                }
                return topic.contains(":");
            }
        };
        IterableLike topicNames = (IterableLike)currentOffsets.keys().map(topicOf, Iterable$.MODULE$.canBuildFrom());
        return topicNames.exists(looksLikeStreamTopic);
    }

    /**
     * Private constructor; the only visible call site is the static initializer of this class.
     * In order, it publishes this instance as {@code MODULE$}, runs the {@code Logging} trait
     * initializer on it, and sets {@code eofOffset} to -1001.
     */
    private KafkaUtils$() {
        // Published first, so MODULE$ is already non-null while the remaining initialization runs.
        MODULE$ = this;
        Logging.class.$init$((Logging)this);
        this.eofOffset = -1001;
    }
}

