/*
 * Decompiled with CFR 0.152.
 */
package org.apache.spark.streaming.flume;

import java.io.Serializable;
import java.net.InetSocketAddress;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import org.apache.avro.ipc.NettyTransceiver;
import org.apache.avro.ipc.Transceiver;
import org.apache.avro.ipc.specific.SpecificRequestor;
import org.apache.spark.internal.Logging;
import org.apache.spark.storage.StorageLevel;
import org.apache.spark.streaming.flume.FlumeBatchFetcher;
import org.apache.spark.streaming.flume.FlumeConnection;
import org.apache.spark.streaming.flume.SparkFlumeEvent;
import org.apache.spark.streaming.flume.sink.SparkFlumeProtocol;
import org.apache.spark.streaming.receiver.Receiver;
import org.jboss.netty.channel.ChannelFactory;
import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory;
import org.slf4j.Logger;
import org.spark_project.guava.util.concurrent.ThreadFactoryBuilder;
import scala.Function0;
import scala.Function1;
import scala.Predef$;
import scala.collection.IterableLike;
import scala.collection.JavaConverters$;
import scala.collection.Seq;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;
import scala.runtime.RichInt$;

@ScalaSignature(bytes="\u0006\u0001\u0005\u001da!\u0002\b\u0010\u0001EI\u0002\u0002\u0003\u0016\u0001\u0005\u0003\u0005\u000b\u0011\u0002\u0017\t\u0011\t\u0003!\u0011!Q\u0001\n\rC\u0001b\u0012\u0001\u0003\u0002\u0003\u0006Ia\u0011\u0005\n\u0011\u0002\u0011\t\u0011)A\u0005\u0013>CQ\u0001\u0015\u0001\u0005\u0002EC\u0001b\u0016\u0001\t\u0006\u0004%\t\u0001\u0017\u0005\tC\u0002A)\u0019!C\u0001E\"A\u0011\u000f\u0001EC\u0002\u0013\u0005\u0001\f\u0003\u0005s\u0001!\u0015\r\u0011\"\u0003t\u0011\u0015Q\b\u0001\"\u0011|\u0011\u0015y\b\u0001\"\u0011|\u0011\u001d\t\t\u0001\u0001C\u0001\u001fMD\u0001\"a\u0001\u0001\t\u0003y\u0011Q\u0001\u0002\u0015\r2,X.\u001a)pY2Lgn\u001a*fG\u0016Lg/\u001a:\u000b\u0005A\t\u0012!\u00024mk6,'B\u0001\n\u0014\u0003%\u0019HO]3b[&twM\u0003\u0002\u0015+\u0005)1\u000f]1sW*\u0011acF\u0001\u0007CB\f7\r[3\u000b\u0003a\t1a\u001c:h'\r\u0001!\u0004\n\t\u00047y\u0001S\"\u0001\u000f\u000b\u0005u\t\u0012\u0001\u0003:fG\u0016Lg/\u001a:\n\u0005}a\"\u0001\u0003*fG\u0016Lg/\u001a:\u0011\u0005\u0005\u0012S\"A\b\n\u0005\rz!aD*qCJ\\g\t\\;nK\u00163XM\u001c;\u0011\u0005\u0015BS\"\u0001\u0014\u000b\u0005\u001d\u001a\u0012\u0001C5oi\u0016\u0014h.\u00197\n\u0005%2#a\u0002'pO\u001eLgnZ\u0001\nC\u0012$'/Z:tKN\u001c\u0001\u0001E\u0002.oir!A\f\u001b\u000f\u0005=\u0012T\"\u0001\u0019\u000b\u0005EZ\u0013A\u0002\u001fs_>$h(C\u00014\u0003\u0015\u00198-\u00197b\u0013\t)d'A\u0004qC\u000e\\\u0017mZ3\u000b\u0003MJ!\u0001O\u001d\u0003\u0007M+\u0017O\u0003\u00026mA\u00111\bQ\u0007\u0002y)\u0011QHP\u0001\u0004]\u0016$(\"A 
\u0002\t)\fg/Y\u0005\u0003\u0003r\u0012\u0011#\u00138fiN{7m[3u\u0003\u0012$'/Z:t\u00031i\u0017\r\u001f\"bi\u000eD7+\u001b>f!\t!U)D\u00017\u0013\t1eGA\u0002J]R\f1\u0002]1sC2dW\r\\5t[\u0006a1\u000f^8sC\u001e,G*\u001a<fYB\u0011!*T\u0007\u0002\u0017*\u0011AjE\u0001\bgR|'/Y4f\u0013\tq5J\u0001\u0007Ti>\u0014\u0018mZ3MKZ,G.\u0003\u0002I=\u00051A(\u001b8jiz\"RAU*U+Z\u0003\"!\t\u0001\t\u000b)*\u0001\u0019\u0001\u0017\t\u000b\t+\u0001\u0019A\"\t\u000b\u001d+\u0001\u0019A\"\t\u000b!+\u0001\u0019A%\u0002-\rD\u0017M\u001c8fY\u001a\u000b7\r^8ss\u0016CXmY;u_J,\u0012!\u0017\t\u00035~k\u0011a\u0017\u0006\u00039v\u000b!bY8oGV\u0014(/\u001a8u\u0015\tqf(\u0001\u0003vi&d\u0017B\u00011\\\u0005=)\u00050Z2vi>\u00148+\u001a:wS\u000e,\u0017AD2iC:tW\r\u001c$bGR|'/_\u000b\u0002GB\u0011Am\\\u0007\u0002K*\u0011amZ\u0001\u0004]&|'B\u00015j\u0003\u0019\u0019xnY6fi*\u0011!n[\u0001\bG\"\fgN\\3m\u0015\taW.A\u0003oKR$\u0018P\u0003\u0002o/\u0005)!NY8tg&\u0011\u0001/\u001a\u0002\u001e\u001d&|7\t\\5f]R\u001cvnY6fi\u000eC\u0017M\u001c8fY\u001a\u000b7\r^8ss\u0006\u0001\"/Z2fSZ,'/\u0012=fGV$xN]\u0001\fG>tg.Z2uS>t7/F\u0001u!\rQVo^\u0005\u0003mn\u00131\u0003T5oW\u0016$'\t\\8dW&tw-U;fk\u0016\u0004\"!\t=\n\u0005e|!a\u0004$mk6,7i\u001c8oK\u000e$\u0018n\u001c8\u0002\u000f=t7\u000b^1siR\tA\u0010\u0005\u0002E{&\u0011aP\u000e\u0002\u0005+:LG/\u0001\u0004p]N#x\u000e]\u0001\u000fO\u0016$8i\u001c8oK\u000e$\u0018n\u001c8t\u0003=9W\r^'bq\n\u000bGo\u00195TSj,W#A\"")
/**
 * Spark Streaming receiver that connects to a fixed list of Flume agent addresses and runs
 * {@code parallelism} {@link FlumeBatchFetcher} tasks against those connections.
 *
 * <p>This source is a Java rendering of a Scala class. The four resources below
 * (two executors, the Netty channel factory and the connection queue) correspond to Scala
 * {@code lazy val}s: each is created on first access under the instance monitor and its
 * "initialised" state is tracked as one bit of {@link #bitmap$0}.
 *
 * <p>The methods delegating to {@link Logging} are the forwarders generated for the mixed-in
 * logging trait; they carry no logic of their own.
 */
public class FlumePollingReceiver
extends Receiver<SparkFlumeEvent>
implements Logging {
    /** Bit of {@link #bitmap$0} set once {@link #channelFactoryExecutor} has been created. */
    private static final byte CHANNEL_FACTORY_EXECUTOR_BIT = 1;
    /** Bit of {@link #bitmap$0} set once {@link #channelFactory} has been created. */
    private static final byte CHANNEL_FACTORY_BIT = 2;
    /** Bit of {@link #bitmap$0} set once {@link #receiverExecutor} has been created. */
    private static final byte RECEIVER_EXECUTOR_BIT = 4;
    /** Bit of {@link #bitmap$0} set once {@link #connections} has been created. */
    private static final byte CONNECTIONS_BIT = 8;
    /** Seconds {@link #onStop()} waits for worker tasks to finish before forcing shutdown. */
    private static final long SHUTDOWN_TIMEOUT_SECONDS = 60L;

    // Lazily created resources; always read them through their accessor methods.
    private ExecutorService channelFactoryExecutor;
    private NioClientSocketChannelFactory channelFactory;
    private ExecutorService receiverExecutor;
    private LinkedBlockingQueue<FlumeConnection> connections;

    private final Seq<InetSocketAddress> addresses;
    private final int maxBatchSize;
    private final int parallelism;
    private transient Logger org$apache$spark$internal$Logging$$log_;
    /** Bit set recording which lazily created fields have been initialised. */
    private volatile byte bitmap$0;

    /**
     * Creates a receiver for the given Flume agent addresses.
     *
     * @param addresses    addresses a connection is opened to in {@link #onStart()}
     * @param maxBatchSize value exposed through {@link #getMaxBatchSize()}
     * @param parallelism  size of the worker thread pool and number of fetcher tasks submitted
     * @param storageLevel storage level handed to the {@link Receiver} superclass
     */
    public FlumePollingReceiver(Seq<InetSocketAddress> addresses, int maxBatchSize, int parallelism, StorageLevel storageLevel) {
        // Java requires the superclass constructor call to be the first statement.
        // NOTE(review): assumes the Receiver constructor does not read these fields - confirm.
        super(storageLevel);
        this.addresses = addresses;
        this.maxBatchSize = maxBatchSize;
        this.parallelism = parallelism;
        Logging.$init$(this);
    }

    // ---------------------------------------------------------------------
    // Forwarders to the Logging trait implementation.
    // ---------------------------------------------------------------------

    public String logName() {
        return Logging.logName$(this);
    }

    public Logger log() {
        return Logging.log$(this);
    }

    public void logInfo(Function0<String> msg) {
        Logging.logInfo$(this, msg);
    }

    public void logDebug(Function0<String> msg) {
        Logging.logDebug$(this, msg);
    }

    public void logTrace(Function0<String> msg) {
        Logging.logTrace$(this, msg);
    }

    public void logWarning(Function0<String> msg) {
        Logging.logWarning$(this, msg);
    }

    public void logError(Function0<String> msg) {
        Logging.logError$(this, msg);
    }

    public void logInfo(Function0<String> msg, Throwable throwable) {
        Logging.logInfo$(this, msg, throwable);
    }

    public void logDebug(Function0<String> msg, Throwable throwable) {
        Logging.logDebug$(this, msg, throwable);
    }

    public void logTrace(Function0<String> msg, Throwable throwable) {
        Logging.logTrace$(this, msg, throwable);
    }

    public void logWarning(Function0<String> msg, Throwable throwable) {
        Logging.logWarning$(this, msg, throwable);
    }

    public void logError(Function0<String> msg, Throwable throwable) {
        Logging.logError$(this, msg, throwable);
    }

    public boolean isTraceEnabled() {
        return Logging.isTraceEnabled$(this);
    }

    public void initializeLogIfNecessary(boolean isInterpreter) {
        Logging.initializeLogIfNecessary$(this, isInterpreter);
    }

    public boolean initializeLogIfNecessary(boolean isInterpreter, boolean silent) {
        return Logging.initializeLogIfNecessary$(this, isInterpreter, silent);
    }

    public boolean initializeLogIfNecessary$default$2() {
        return Logging.initializeLogIfNecessary$default$2$(this);
    }

    public Logger org$apache$spark$internal$Logging$$log_() {
        return this.org$apache$spark$internal$Logging$$log_;
    }

    public void org$apache$spark$internal$Logging$$log__$eq(Logger x$1) {
        this.org$apache$spark$internal$Logging$$log_ = x$1;
    }

    // ---------------------------------------------------------------------
    // Lazily created resources. Each accessor first checks its bit in the
    // volatile bitmap and only enters the synchronized slow path when unset.
    // ---------------------------------------------------------------------

    /** Creates the channel-factory executor if needed, holding the instance monitor. */
    private ExecutorService channelFactoryExecutor$lzycompute() {
        synchronized (this) {
            if ((byte)(this.bitmap$0 & CHANNEL_FACTORY_EXECUTOR_BIT) == 0) {
                this.channelFactoryExecutor = Executors.newCachedThreadPool(
                        new ThreadFactoryBuilder()
                                .setDaemon(true)
                                .setNameFormat("Flume Receiver Channel Thread - %d")
                                .build());
                this.bitmap$0 = (byte)(this.bitmap$0 | CHANNEL_FACTORY_EXECUTOR_BIT);
            }
        }
        return this.channelFactoryExecutor;
    }

    /**
     * Returns the cached daemon thread pool passed to the Netty channel factory, creating it
     * on first use.
     */
    public ExecutorService channelFactoryExecutor() {
        if ((byte)(this.bitmap$0 & CHANNEL_FACTORY_EXECUTOR_BIT) == 0) {
            return this.channelFactoryExecutor$lzycompute();
        }
        return this.channelFactoryExecutor;
    }

    /** Creates the Netty channel factory if needed, holding the instance monitor. */
    private NioClientSocketChannelFactory channelFactory$lzycompute() {
        synchronized (this) {
            if ((byte)(this.bitmap$0 & CHANNEL_FACTORY_BIT) == 0) {
                // The same executor is supplied for both constructor arguments.
                this.channelFactory = new NioClientSocketChannelFactory(this.channelFactoryExecutor(), this.channelFactoryExecutor());
                this.bitmap$0 = (byte)(this.bitmap$0 | CHANNEL_FACTORY_BIT);
            }
        }
        return this.channelFactory;
    }

    /**
     * Returns the Netty client channel factory used for every transceiver, creating it on
     * first use.
     */
    public NioClientSocketChannelFactory channelFactory() {
        if ((byte)(this.bitmap$0 & CHANNEL_FACTORY_BIT) == 0) {
            return this.channelFactory$lzycompute();
        }
        return this.channelFactory;
    }

    /** Creates the worker thread pool if needed, holding the instance monitor. */
    private ExecutorService receiverExecutor$lzycompute() {
        synchronized (this) {
            if ((byte)(this.bitmap$0 & RECEIVER_EXECUTOR_BIT) == 0) {
                this.receiverExecutor = Executors.newFixedThreadPool(
                        this.parallelism,
                        new ThreadFactoryBuilder()
                                .setDaemon(true)
                                .setNameFormat("Flume Receiver Thread - %d")
                                .build());
                this.bitmap$0 = (byte)(this.bitmap$0 | RECEIVER_EXECUTOR_BIT);
            }
        }
        return this.receiverExecutor;
    }

    /**
     * Returns the fixed-size daemon thread pool ({@code parallelism} threads) that runs the
     * fetcher tasks, creating it on first use.
     */
    public ExecutorService receiverExecutor() {
        if ((byte)(this.bitmap$0 & RECEIVER_EXECUTOR_BIT) == 0) {
            return this.receiverExecutor$lzycompute();
        }
        return this.receiverExecutor;
    }

    /** Creates the connection queue if needed, holding the instance monitor. */
    private LinkedBlockingQueue<FlumeConnection> connections$lzycompute() {
        synchronized (this) {
            if ((byte)(this.bitmap$0 & CONNECTIONS_BIT) == 0) {
                this.connections = new LinkedBlockingQueue<>();
                this.bitmap$0 = (byte)(this.bitmap$0 | CONNECTIONS_BIT);
            }
        }
        return this.connections;
    }

    /** Returns the queue of open connections, creating the (empty) queue on first use. */
    private LinkedBlockingQueue<FlumeConnection> connections() {
        if ((byte)(this.bitmap$0 & CONNECTIONS_BIT) == 0) {
            return this.connections$lzycompute();
        }
        return this.connections;
    }

    // ---------------------------------------------------------------------
    // Receiver lifecycle.
    // ---------------------------------------------------------------------

    /**
     * Opens one connection per configured address and then submits {@code parallelism}
     * {@link FlumeBatchFetcher} tasks to the worker pool.
     */
    @Override
    public void onStart() {
        this.addresses.foreach(
                (Function1<InetSocketAddress, Object> & Serializable & scala.Serializable) host ->
                        Boolean.valueOf(FlumePollingReceiver.$anonfun$onStart$1(this, host)));
        for (int i = 0; i < this.parallelism; i++) {
            FlumePollingReceiver.$anonfun$onStart$2(this, i);
        }
    }

    /**
     * Stops the worker pool and releases network resources.
     *
     * <p>The worker pool is asked to shut down and given up to
     * {@value #SHUTDOWN_TIMEOUT_SECONDS} seconds to finish; if it does not finish in time, or
     * the wait is interrupted, the pool is shut down forcibly. Every queued connection's
     * transceiver is then closed and the channel factory's external resources are released.
     * The cleanup steps run even when an earlier step fails, and the calling thread's
     * interrupt status is restored once cleanup has been attempted.
     */
    @Override
    public void onStop() {
        this.logInfo((Function0<String> & Serializable & scala.Serializable) () -> "Shutting down Flume Polling Receiver");
        ExecutorService workers = this.receiverExecutor();
        boolean interrupted = false;
        try {
            workers.shutdown();
            if (!workers.awaitTermination(SHUTDOWN_TIMEOUT_SECONDS, TimeUnit.SECONDS)) {
                workers.shutdownNow();
            }
        } catch (InterruptedException e) {
            // Stop waiting, but still force the workers down and release the connections.
            interrupted = true;
            workers.shutdownNow();
        } finally {
            try {
                for (FlumeConnection connection : this.connections()) {
                    FlumePollingReceiver.$anonfun$onStop$2(connection);
                }
            } finally {
                try {
                    this.channelFactory().releaseExternalResources();
                } finally {
                    if (interrupted) {
                        // Restore the flag only after cleanup so it cannot cut the cleanup short.
                        Thread.currentThread().interrupt();
                    }
                }
            }
        }
    }

    /** Returns the live queue of open connections (not a copy). */
    public LinkedBlockingQueue<FlumeConnection> getConnections() {
        return this.connections();
    }

    /** Returns the {@code maxBatchSize} value supplied at construction. */
    public int getMaxBatchSize() {
        return this.maxBatchSize;
    }

    // ---------------------------------------------------------------------
    // Bodies of the original Scala closures, kept under their compiler-generated names.
    // ---------------------------------------------------------------------

    /**
     * Opens a transceiver to {@code host} on the shared channel factory, creates the
     * {@link SparkFlumeProtocol.Callback} client for it and queues the resulting connection.
     *
     * @return the result of adding the new connection to the queue
     */
    public static final /* synthetic */ boolean $anonfun$onStart$1(FlumePollingReceiver $this, InetSocketAddress host) {
        NettyTransceiver transceiver = new NettyTransceiver(host, (ChannelFactory)$this.channelFactory());
        SparkFlumeProtocol.Callback client = (SparkFlumeProtocol.Callback)SpecificRequestor.getClient(SparkFlumeProtocol.Callback.class, (Transceiver)transceiver);
        return $this.connections().add(new FlumeConnection(transceiver, client));
    }

    /**
     * Logs the start-up message and submits one {@link FlumeBatchFetcher} to the worker pool.
     *
     * @param i index of the worker being started; not used by the body
     * @return the future returned by the executor for the submitted task
     */
    public static final /* synthetic */ Future $anonfun$onStart$2(FlumePollingReceiver $this, int i) {
        $this.logInfo((Function0<String> & Serializable & scala.Serializable) () -> "Starting Flume Polling Receiver worker threads..");
        return $this.receiverExecutor().submit(new FlumeBatchFetcher($this));
    }

    /** Closes the transceiver of the given connection. */
    public static final /* synthetic */ void $anonfun$onStop$2(FlumeConnection x$1) {
        x$1.transceiver().close();
    }
}

