/*
 * Decompiled with CFR 0.152.
 */
package org.apache.spark.streaming.flume.sink;

import java.io.Serializable;
import java.net.InetSocketAddress;
import java.util.concurrent.CountDownLatch;
import org.apache.avro.ipc.NettyServer;
import org.apache.avro.ipc.Responder;
import org.apache.avro.ipc.specific.SpecificResponder;
import org.apache.flume.Context;
import org.apache.flume.Sink;
import org.apache.flume.conf.Configurable;
import org.apache.flume.conf.ConfigurationException;
import org.apache.flume.sink.AbstractSink;
import org.apache.spark.streaming.flume.sink.Logging;
import org.apache.spark.streaming.flume.sink.SparkAvroCallbackHandler;
import org.apache.spark.streaming.flume.sink.SparkFlumeProtocol;
import org.apache.spark.streaming.flume.sink.SparkSinkConfig$;
import org.slf4j.Logger;
import scala.Function0;
import scala.Function1;
import scala.None$;
import scala.Option;
import scala.Option$;
import scala.Predef$;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;

@ScalaSignature(bytes="\u0006\u0001\u0005\u0005e\u0001B\u0010!\u00015BQA\u0010\u0001\u0005\u0002}Bq!\u0011\u0001A\u0002\u0013%!\tC\u0004L\u0001\u0001\u0007I\u0011\u0002'\t\rU\u0003\u0001\u0015)\u0003D\u0011\u001d1\u0006\u00011A\u0005\n]Cqa\u0017\u0001A\u0002\u0013%A\f\u0003\u0004_\u0001\u0001\u0006K\u0001\u0017\u0005\b?\u0002\u0001\r\u0011\"\u0003a\u0011\u001da\u0007\u00011A\u0005\n5Daa\u001c\u0001!B\u0013\t\u0007b\u00029\u0001\u0001\u0004%Ia\u0016\u0005\bc\u0002\u0001\r\u0011\"\u0003s\u0011\u0019!\b\u0001)Q\u00051\"9Q\u000f\u0001a\u0001\n\u00139\u0006b\u0002<\u0001\u0001\u0004%Ia\u001e\u0005\u0007s\u0002\u0001\u000b\u0015\u0002-\t\u000fi\u0004\u0001\u0019!C\u0005w\"I\u0011q\u0002\u0001A\u0002\u0013%\u0011\u0011\u0003\u0005\b\u0003+\u0001\u0001\u0015)\u0003}\u0011%\t9\u0002\u0001a\u0001\n\u0013\tI\u0002C\u0005\u0002$\u0001\u0001\r\u0011\"\u0003\u0002&!A\u0011\u0011\u0006\u0001!B\u0013\tY\u0002C\u0005\u0002,\u0001\u0011\r\u0011\"\u0003\u0002.!A\u0011q\b\u0001!\u0002\u0013\ty\u0003C\u0004\u0002B\u0001!\t%a\u0011\t\u000f\u0005\u0015\u0003\u0001\"\u0011\u0002D!9\u0011q\t\u0001\u0005B\u0005%\u0003bBA,\u0001\u0011\u0005\u0013\u0011\f\u0005\t\u0003k\u0002A\u0011\u0001\u0012\u0002x!A\u0011\u0011\u0010\u0001\u0005\u0002\t\nYHA\u0005Ta\u0006\u00148nU5oW*\u0011\u0011EI\u0001\u0005g&t7N\u0003\u0002$I\u0005)a\r\\;nK*\u0011QEJ\u0001\ngR\u0014X-Y7j]\u001eT!a\n\u0015\u0002\u000bM\u0004\u0018M]6\u000b\u0005%R\u0013AB1qC\u000eDWMC\u0001,\u0003\ry'oZ\u0002\u0001'\u0011\u0001a\u0006\u000e\u001d\u0011\u0005=\u0012T\"\u0001\u0019\u000b\u0005\u0005\n$BA\u0012)\u0013\t\u0019\u0004G\u0001\u0007BEN$(/Y2u'&t7\u000e\u0005\u00026m5\t\u0001%\u0003\u00028A\t9Aj\\4hS:<\u0007CA\u001d=\u001b\u0005Q$BA\u001e2\u0003\u0011\u0019wN\u001c4\n\u0005uR$\u0001D\"p]\u001aLw-\u001e:bE2,\u0017A\u0002\u001fj]&$h\bF\u0001A!\t)\u0004!\u0001\u0005q_>d7+\u001b>f+\u0005\u0019\u0005C\u0001#J\u001b\u0005)%B\u0001$H\u0003\u0011a\u0017M\\4\u000b\u0003!\u000bAA[1wC&\u0011!*\u0012\u0002\b\u0013:$XmZ3s\u00031\u0001xn\u001c7TSj,w\fJ3r)\ti5\u000b\u0005\u0002O#6\tqJC\u0001Q\u0003\u0015\u00198-\u00197b\u0013\t\u0011vJ\u0001\u0003V]&$\bb\u0002+\u0004\u0003\u0003\u0005\raQ\u0001\u0004q\u0012\n\u0014!\u00039p_2\u001c\u0016N_3!\u0003I!(/\u00198tC\u000e$\u0018n\u001c8US6,w.\u001e;\u0016\u0003a\u0003\"AT-\n\u0005i{%aA%oi\u00061BO]1og\u0006\u001cG/[8o)&lWm\\;u?\u0012*\u0017\u000f\u0006\u0002N;\"9AKBA\u0001\u0002\u0004A\u0016a\u0005;sC:\u001c\u0018m\u0019;j_:$\u0016.\\3pkR\u0004\u0013\u0001\u00035pgRt\u0017-\\3\u0016\u0003\u0005\u0004\"AY5\u000f\u0005\r<\u0007C\u00013P\u001b\u0005)'B\u00014-\u0003\u0019a$o\\8u}%\u0011\u0001nT\u0001\u0007!J,G-\u001a4\n\u0005)\\'AB*ue&twM\u0003\u0002i\u001f\u0006a\u0001n\\:u]\u0006lWm\u0018\u0013fcR\u0011QJ\u001c\u0005\b)&\t\t\u00111\u0001b\u0003%Awn\u001d;oC6,\u0007%\u0001\u0003q_J$\u0018\u0001\u00039peR|F%Z9\u0015\u00055\u001b\bb\u0002+\r\u0003\u0003\u0005\r\u0001W\u0001\u0006a>\u0014H\u000fI\u0001\u0010E\u0006\u001c7n\u00144g\u0013:$XM\u001d<bY\u0006\u0019\"-Y2l\u001f\u001a4\u0017J\u001c;feZ\fGn\u0018\u0013fcR\u0011Q\n\u001f\u0005\b)>\t\t\u00111\u0001Y\u0003A\u0011\u0017mY6PM\u001aLe\u000e^3sm\u0006d\u0007%A\u0005tKJ4XM](qiV\tA\u0010E\u0002O{~L!A`(\u0003\r=\u0003H/[8o!\u0011\t\t!a\u0003\u000e\u0005\u0005\r!\u0002BA\u0003\u0003\u000f\t1!\u001b9d\u0015\r\tI\u0001K\u0001\u0005CZ\u0014x.\u0003\u0003\u0002\u000e\u0005\r!a\u0003(fiRL8+\u001a:wKJ\fQb]3sm\u0016\u0014x\n\u001d;`I\u0015\fHcA'\u0002\u0014!9AKEA\u0001\u0002\u0004a\u0018AC:feZ,'o\u00149uA\u00059\u0001.\u00198eY\u0016\u0014XCAA\u000e!\u0011qU0!\b\u0011\u0007U\ny\"C\u0002\u0002\"\u0001\u0012\u0001d\u00159be.\feO]8DC2d'-Y2l\u0011\u0006tG\r\\3s\u0003-A\u0017M\u001c3mKJ|F%Z9\u0015\u00075\u000b9\u0003\u0003\u0005U+\u0005\u0005\t\u0019AA\u000e\u0003!A\u0017M\u001c3mKJ\u0004\u0013!\u00042m_\u000e\\\u0017N\\4MCR\u001c\u0007.\u0006\u0002\u00020A!\u0011\u0011GA\u001e\u001b\t\t\u0019D\u0003\u0003\u00026\u0005]\u0012AC2p]\u000e,(O]3oi*\u0019\u0011\u0011H$\u0002\tU$\u0018\u000e\\\u0005\u0005\u0003{\t\u0019D\u0001\bD_VtG\u000fR8x]2\u000bGo\u00195\u0002\u001d\tdwnY6j]\u001ed\u0015\r^2iA\u0005)1\u000f^1siR\tQ*\u0001\u0003ti>\u0004\u0018!C2p]\u001aLw-\u001e:f)\ri\u00151\n\u0005\b\u0003\u001bZ\u0002\u0019AA(\u0003\r\u0019G\u000f\u001f\t\u0005\u0003#\n\u0019&D\u00012\u0013\r\t)&\r\u0002\b\u0007>tG/\u001a=u\u0003\u001d\u0001(o\\2fgN$\"!a\u0017\u0011\t\u0005u\u0013q\u000e\b\u0005\u0003?\nYG\u0004\u0003\u0002b\u0005%d\u0002BA2\u0003Or1\u0001ZA3\u0013\u0005Y\u0013BA\u0015+\u0013\t\u0019\u0003&C\u0002\u0002nE\nAaU5oW&!\u0011\u0011OA:\u0005\u0019\u0019F/\u0019;vg*\u0019\u0011QN\u0019\u0002\u000f\u001d,G\u000fU8siR\t\u0001,\u0001\u000ed_VtG\u000fZ8x]^CWM\u001c\"bi\u000eD'+Z2fSZ,G\rF\u0002N\u0003{Bq!a \u001f\u0001\u0004\ty#A\u0003mCR\u001c\u0007\u000e")
public class SparkSink
extends AbstractSink
implements Logging,
Configurable {
    private Integer poolSize;
    private int transactionTimeout;
    private String hostname;
    private int port;
    private int backOffInterval;
    private Option<NettyServer> serverOpt;
    private Option<SparkAvroCallbackHandler> handler;
    private final CountDownLatch blockingLatch;
    private transient Logger org$apache$spark$streaming$flume$sink$Logging$$_log;

    @Override
    public Logger log() {
        return Logging.log$(this);
    }

    @Override
    public void logInfo(Function0<String> msg) {
        Logging.logInfo$(this, msg);
    }

    @Override
    public void logDebug(Function0<String> msg) {
        Logging.logDebug$(this, msg);
    }

    @Override
    public void logTrace(Function0<String> msg) {
        Logging.logTrace$(this, msg);
    }

    @Override
    public void logWarning(Function0<String> msg) {
        Logging.logWarning$(this, msg);
    }

    @Override
    public void logError(Function0<String> msg) {
        Logging.logError$(this, msg);
    }

    @Override
    public void logInfo(Function0<String> msg, Throwable throwable) {
        Logging.logInfo$(this, msg, throwable);
    }

    @Override
    public void logDebug(Function0<String> msg, Throwable throwable) {
        Logging.logDebug$(this, msg, throwable);
    }

    @Override
    public void logTrace(Function0<String> msg, Throwable throwable) {
        Logging.logTrace$(this, msg, throwable);
    }

    @Override
    public void logWarning(Function0<String> msg, Throwable throwable) {
        Logging.logWarning$(this, msg, throwable);
    }

    @Override
    public void logError(Function0<String> msg, Throwable throwable) {
        Logging.logError$(this, msg, throwable);
    }

    @Override
    public boolean isTraceEnabled() {
        return Logging.isTraceEnabled$(this);
    }

    @Override
    public Logger org$apache$spark$streaming$flume$sink$Logging$$_log() {
        return this.org$apache$spark$streaming$flume$sink$Logging$$_log;
    }

    @Override
    public void org$apache$spark$streaming$flume$sink$Logging$$_log_$eq(Logger x$1) {
        this.org$apache$spark$streaming$flume$sink$Logging$$_log = x$1;
    }

    private Integer poolSize() {
        return this.poolSize;
    }

    private void poolSize_$eq(Integer x$1) {
        this.poolSize = x$1;
    }

    private int transactionTimeout() {
        return this.transactionTimeout;
    }

    private void transactionTimeout_$eq(int x$1) {
        this.transactionTimeout = x$1;
    }

    private String hostname() {
        return this.hostname;
    }

    private void hostname_$eq(String x$1) {
        this.hostname = x$1;
    }

    private int port() {
        return this.port;
    }

    private void port_$eq(int x$1) {
        this.port = x$1;
    }

    private int backOffInterval() {
        return this.backOffInterval;
    }

    private void backOffInterval_$eq(int x$1) {
        this.backOffInterval = x$1;
    }

    private Option<NettyServer> serverOpt() {
        return this.serverOpt;
    }

    private void serverOpt_$eq(Option<NettyServer> x$1) {
        this.serverOpt = x$1;
    }

    private Option<SparkAvroCallbackHandler> handler() {
        return this.handler;
    }

    private void handler_$eq(Option<SparkAvroCallbackHandler> x$1) {
        this.handler = x$1;
    }

    private CountDownLatch blockingLatch() {
        return this.blockingLatch;
    }

    public void start() {
        this.logInfo((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> new StringBuilder(91).append("Starting Spark Sink: ").append(this.getName()).append(" on port: ").append(this.port()).append(" and interface: ").append(this.hostname()).append(" with ").append("pool size: ").append(this.poolSize()).append(" and transaction timeout: ").append(this.transactionTimeout()).append(".").toString());
        this.handler_$eq((Option<SparkAvroCallbackHandler>)Option$.MODULE$.apply((Object)new SparkAvroCallbackHandler(Predef$.MODULE$.Integer2int(this.poolSize()), this.getChannel(), this.transactionTimeout(), this.backOffInterval())));
        SpecificResponder responder = new SpecificResponder(SparkFlumeProtocol.class, this.handler().get());
        this.serverOpt_$eq((Option<NettyServer>)Option$.MODULE$.apply((Object)new NettyServer((Responder)responder, new InetSocketAddress(this.hostname(), this.port()))));
        this.serverOpt().foreach((Function1 & Serializable & scala.Serializable)server -> {
            SparkSink.$anonfun$start$2(this, server);
            return BoxedUnit.UNIT;
        });
        super.start();
    }

    public void stop() {
        this.logInfo((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> new StringBuilder(21).append("Stopping Spark Sink: ").append(this.getName()).toString());
        this.handler().foreach((Function1 & Serializable & scala.Serializable)callbackHandler -> {
            callbackHandler.shutdown();
            return BoxedUnit.UNIT;
        });
        this.serverOpt().foreach((Function1 & Serializable & scala.Serializable)server -> {
            SparkSink.$anonfun$stop$3(this, server);
            return BoxedUnit.UNIT;
        });
        this.blockingLatch().countDown();
        super.stop();
    }

    public void configure(Context ctx) {
        this.hostname_$eq(ctx.getString(SparkSinkConfig$.MODULE$.CONF_HOSTNAME(), SparkSinkConfig$.MODULE$.DEFAULT_HOSTNAME()));
        this.port_$eq(Predef$.MODULE$.Integer2int((Integer)Option$.MODULE$.apply((Object)ctx.getInteger(SparkSinkConfig$.MODULE$.CONF_PORT())).getOrElse((Function0 & Serializable & scala.Serializable)() -> {
            throw new ConfigurationException("The port to bind to must be specified");
        })));
        this.poolSize_$eq(ctx.getInteger(SparkSinkConfig$.MODULE$.THREADS(), Predef$.MODULE$.int2Integer(SparkSinkConfig$.MODULE$.DEFAULT_THREADS())));
        this.transactionTimeout_$eq(Predef$.MODULE$.Integer2int(ctx.getInteger(SparkSinkConfig$.MODULE$.CONF_TRANSACTION_TIMEOUT(), Predef$.MODULE$.int2Integer(SparkSinkConfig$.MODULE$.DEFAULT_TRANSACTION_TIMEOUT()))));
        this.backOffInterval_$eq(Predef$.MODULE$.Integer2int(ctx.getInteger(SparkSinkConfig$.MODULE$.CONF_BACKOFF_INTERVAL(), Predef$.MODULE$.int2Integer(SparkSinkConfig$.MODULE$.DEFAULT_BACKOFF_INTERVAL()))));
        this.logInfo((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> new StringBuilder(98).append("Configured Spark Sink with hostname: ").append(this.hostname()).append(", port: ").append(this.port()).append(", ").append("poolSize: ").append(this.poolSize()).append(", transactionTimeout: ").append(this.transactionTimeout()).append(", ").append("backoffInterval: ").append(this.backOffInterval()).toString());
    }

    public Sink.Status process() {
        this.logInfo((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> "Blocking Sink Runner, sink will continue to run..");
        this.blockingLatch().await();
        return Sink.Status.BACKOFF;
    }

    public int getPort() {
        return BoxesRunTime.unboxToInt((Object)this.serverOpt().map((Function1 & Serializable & scala.Serializable)x$1 -> BoxesRunTime.boxToInteger((int)x$1.getPort())).getOrElse((Function0 & Serializable & scala.Serializable)() -> {
            throw new RuntimeException("Server was not started!");
        }));
    }

    public void countdownWhenBatchReceived(CountDownLatch latch) {
        this.handler().foreach((Function1 & Serializable & scala.Serializable)x$2 -> {
            x$2.countDownWhenBatchAcked(latch);
            return BoxedUnit.UNIT;
        });
    }

    public static final /* synthetic */ void $anonfun$start$2(SparkSink $this, NettyServer server) {
        $this.logInfo((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> new StringBuilder(31).append("Starting Avro server for sink: ").append($this.getName()).toString());
        server.start();
    }

    public static final /* synthetic */ void $anonfun$stop$3(SparkSink $this, NettyServer server) {
        $this.logInfo((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> new StringBuilder(31).append("Stopping Avro Server for sink: ").append($this.getName()).toString());
        server.close();
        server.join();
    }

    public SparkSink() {
        Logging.$init$(this);
        this.poolSize = Predef$.MODULE$.int2Integer(SparkSinkConfig$.MODULE$.DEFAULT_THREADS());
        this.transactionTimeout = SparkSinkConfig$.MODULE$.DEFAULT_TRANSACTION_TIMEOUT();
        this.hostname = SparkSinkConfig$.MODULE$.DEFAULT_HOSTNAME();
        this.port = 0;
        this.backOffInterval = 200;
        this.serverOpt = None$.MODULE$;
        this.handler = None$.MODULE$;
        this.blockingLatch = new CountDownLatch(1);
    }
}

