/*
 * Decompiled with CFR 0.152.
 */
package org.apache.spark.streaming.flume.sink;

import java.io.Serializable;
import java.util.Collections;
import java.util.UUID;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.flume.Channel;
import org.apache.spark.streaming.flume.sink.EventBatch;
import org.apache.spark.streaming.flume.sink.Logging;
import org.apache.spark.streaming.flume.sink.SparkFlumeProtocol;
import org.apache.spark.streaming.flume.sink.SparkSinkThreadFactory;
import org.apache.spark.streaming.flume.sink.SparkSinkUtils$;
import org.apache.spark.streaming.flume.sink.TransactionProcessor;
import org.slf4j.Logger;
import scala.Function0;
import scala.Function1;
import scala.MatchError;
import scala.None$;
import scala.Option;
import scala.Option$;
import scala.Some;
import scala.collection.Seq;
import scala.collection.immutable.Nil$;
import scala.collection.mutable.HashMap;
import scala.collection.mutable.HashMap$;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxedUnit;

@ScalaSignature(bytes="\u0006\u0001\u0005Mf!B\u0012%\u0001\u0019\u0002\u0004\u0002\u0003!\u0001\u0005\u000b\u0007I\u0011\u0001\"\t\u0011%\u0003!\u0011!Q\u0001\n\rC\u0001B\u0013\u0001\u0003\u0006\u0004%\ta\u0013\u0005\t#\u0002\u0011\t\u0011)A\u0005\u0019\"A!\u000b\u0001BC\u0002\u0013\u0005!\t\u0003\u0005T\u0001\t\u0005\t\u0015!\u0003D\u0011!!\u0006A!b\u0001\n\u0003\u0011\u0005\u0002C+\u0001\u0005\u0003\u0005\u000b\u0011B\"\t\u000bY\u0003A\u0011A,\t\u000fu\u0003!\u0019!C\u0001=\"1!\u000e\u0001Q\u0001\n}Cqa\u001b\u0001C\u0002\u0013%A\u000e\u0003\u0004|\u0001\u0001\u0006I!\u001c\u0005\by\u0002\u0011\r\u0011\"\u0003~\u0011\u001d\t\u0019\u0001\u0001Q\u0001\nyD\u0011\"!\u0002\u0001\u0005\u0004%I!a\u0002\t\u0011\u0005U\u0001\u0001)A\u0005\u0003\u0013A\u0011\"a\u0006\u0001\u0001\u0004%I!!\u0007\t\u0013\u0005\u0005\u0002\u00011A\u0005\n\u0005\r\u0002\u0002CA\u0018\u0001\u0001\u0006K!a\u0007\t\u0013\u0005E\u0002\u00011A\u0005\n\u0005e\u0001\"CA\u001a\u0001\u0001\u0007I\u0011BA\u001b\u0011!\tI\u0004\u0001Q!\n\u0005m\u0001\"CA\"\u0001\u0001\u0007I\u0011BA#\u0011%\ti\u0005\u0001a\u0001\n\u0013\ty\u0005\u0003\u0005\u0002T\u0001\u0001\u000b\u0015BA$\u0011\u001d\t)\u0006\u0001C!\u0003/Bq!a\u0019\u0001\t\u0013\t)\u0007C\u0004\u0002\u0004\u0002!\t%!\"\t\u000f\u0005E\u0005\u0001\"\u0011\u0002\u0014\"9\u0011q\u0013\u0001\u0005\n\u0005e\u0005\u0002CAQ\u0001\u0011\u0005A%a)\t\u0011\u0005\u001d\u0006\u0001\"\u0001%\u0003SCq!a,\u0001\t\u0003\t\tL\u0001\rTa\u0006\u00148.\u0011<s_\u000e\u000bG\u000e\u001c2bG.D\u0015M\u001c3mKJT!!\n\u0014\u0002\tMLgn\u001b\u0006\u0003O!\nQA\u001a7v[\u0016T!!\u000b\u0016\u0002\u0013M$(/Z1nS:<'BA\u0016-\u0003\u0015\u0019\b/\u0019:l\u0015\tic&\u0001\u0004ba\u0006\u001c\u0007.\u001a\u0006\u0002_\u0005\u0019qN]4\u0014\t\u0001\t\u0014(\u0010\t\u0003e]j\u0011a\r\u0006\u0003iU\nA\u0001\\1oO*\ta'\u0001\u0003kCZ\f\u0017B\u0001\u001d4\u0005\u0019y%M[3diB\u0011!hO\u0007\u0002I%\u0011A\b\n\u0002\u0013'B\f'o\u001b$mk6,\u0007K]8u_\u000e|G\u000e\u0005\u0002;}%\u0011q\b\n\u0002\b\u0019><w-\u001b8h\u0003\u001d!\bN]3bIN\u001c\u0001!F\u0001D!\t!u)D\u0001F\u0015\u00051\u0015!B:dC2\f\u0017B\u0001%F\u0005\rIe\u000e^\u0001\ti\"\u0014X-\u00193tA\u000591\r[1o]\u0016dW#\u0001'\u0011\u00055{U\"\u0001(\u000b\u0005\u001db\u0013B\u0001)O\u0005\u001d\u0019\u0005.\u00198oK2\f\u0001b\u00195b]:,G\u000eI\u0001\u0013iJ\fgn]1di&|g\u000eV5nK>,H/A\nue\u0006t7/Y2uS>tG+[7f_V$\b%A\bcC\u000e\\wJ\u001a4J]R,'O^1m\u0003A\u0011\u0017mY6PM\u001aLe\u000e^3sm\u0006d\u0007%\u0001\u0004=S:LGO\u0010\u000b\u00061fS6\f\u0018\t\u0003u\u0001AQ\u0001Q\u0005A\u0002\rCQAS\u0005A\u00021CQAU\u0005A\u0002\rCQ\u0001V\u0005A\u0002\r\u000ba\u0003\u001e:b]N\f7\r^5p]\u0016CXmY;u_J|\u0005\u000f^\u000b\u0002?B\u0019A\t\u00192\n\u0005\u0005,%AB(qi&|g\u000e\u0005\u0002dQ6\tAM\u0003\u0002fM\u0006Q1m\u001c8dkJ\u0014XM\u001c;\u000b\u0005\u001d,\u0014\u0001B;uS2L!!\u001b3\u0003\u001f\u0015CXmY;u_J\u001cVM\u001d<jG\u0016\fq\u0003\u001e:b]N\f7\r^5p]\u0016CXmY;u_J|\u0005\u000f\u001e\u0011\u00023M,\u0017/^3oG\u0016tU/\u001c2feR{\u0007K]8dKN\u001cxN]\u000b\u0002[B!an];y\u001b\u0005y'B\u00019r\u0003\u001diW\u000f^1cY\u0016T!A]#\u0002\u0015\r|G\u000e\\3di&|g.\u0003\u0002u_\n9\u0001*Y:i\u001b\u0006\u0004\bC\u0001\u001aw\u0013\t98G\u0001\u0007DQ\u0006\u00148+Z9vK:\u001cW\r\u0005\u0002;s&\u0011!\u0010\n\u0002\u0015)J\fgn]1di&|g\u000e\u0015:pG\u0016\u001c8o\u001c:\u00025M,\u0017/^3oG\u0016tU/\u001c2feR{\u0007K]8dKN\u001cxN\u001d\u0011\u0002\u000fM,\u0017OQ1tKV\ta\u0010\u0005\u00023\u007f&\u0019\u0011\u0011A\u001a\u0003\rM#(/\u001b8h\u0003!\u0019X-\u001d\"bg\u0016\u0004\u0013AC:fc\u000e{WO\u001c;feV\u0011\u0011\u0011\u0002\t\u0005\u0003\u0017\t\t\"\u0004\u0002\u0002\u000e)\u0019\u0011q\u00023\u0002\r\u0005$x.\\5d\u0013\u0011\t\u0019\"!\u0004\u0003\u0015\u0005#x.\\5d\u0019>tw-A\u0006tKF\u001cu.\u001e8uKJ\u0004\u0013aB:u_B\u0004X\rZ\u000b\u0003\u00037\u00012\u0001RA\u000f\u0013\r\ty\"\u0012\u0002\b\u0005>|G.Z1o\u0003-\u0019Ho\u001c9qK\u0012|F%Z9\u0015\t\u0005\u0015\u00121\u0006\t\u0004\t\u0006\u001d\u0012bAA\u0015\u000b\n!QK\\5u\u0011%\ticEA\u0001\u0002\u0004\tY\"A\u0002yIE\n\u0001b\u001d;paB,G\rI\u0001\u0007SN$Vm\u001d;\u0002\u0015%\u001cH+Z:u?\u0012*\u0017\u000f\u0006\u0003\u0002&\u0005]\u0002\"CA\u0017-\u0005\u0005\t\u0019AA\u000e\u0003\u001dI7\u000fV3ti\u0002B3aFA\u001f!\r!\u0015qH\u0005\u0004\u0003\u0003*%\u0001\u0003<pY\u0006$\u0018\u000e\\3\u0002\u0013Q,7\u000f\u001e'bi\u000eDWCAA$!\r\u0019\u0017\u0011J\u0005\u0004\u0003\u0017\"'AD\"pk:$Hi\\<o\u0019\u0006$8\r[\u0001\u000ei\u0016\u001cH\u000fT1uG\"|F%Z9\u0015\t\u0005\u0015\u0012\u0011\u000b\u0005\n\u0003[I\u0012\u0011!a\u0001\u0003\u000f\n!\u0002^3ti2\u000bGo\u00195!\u000359W\r^#wK:$()\u0019;dQR!\u0011\u0011LA0!\rQ\u00141L\u0005\u0004\u0003;\"#AC#wK:$()\u0019;dQ\"1\u0011\u0011M\u000eA\u0002\r\u000b\u0011A\\\u0001\u0010GJ,\u0017\r^3Qe>\u001cWm]:peR1\u0011qMA5\u0003\u0003\u00032\u0001\u00121y\u0011\u001d\tY\u0007\ba\u0001\u0003[\n1a]3r!\u0011\ty'! \u000f\t\u0005E\u0014\u0011\u0010\t\u0004\u0003g*UBAA;\u0015\r\t9(Q\u0001\u0007yI|w\u000e\u001e \n\u0007\u0005mT)\u0001\u0004Qe\u0016$WMZ\u0005\u0005\u0003\u0003\tyHC\u0002\u0002|\u0015Ca!!\u0019\u001d\u0001\u0004\u0019\u0015aA1dWR!\u0011qQAG!\r\u0011\u0014\u0011R\u0005\u0004\u0003\u0017\u001b$\u0001\u0002,pS\u0012Da!a$\u001e\u0001\u0004)\u0018AD:fcV,gnY3Ok6\u0014WM]\u0001\u0005]\u0006\u001c7\u000e\u0006\u0003\u0002\b\u0006U\u0005BBAH=\u0001\u0007Q/A\nd_6\u0004H.\u001a;f)J\fgn]1di&|g\u000e\u0006\u0004\u0002&\u0005m\u0015Q\u0014\u0005\u0007\u0003\u001f{\u0002\u0019A;\t\u000f\u0005}u\u00041\u0001\u0002\u001c\u000591/^2dKN\u001c\u0018!\u0006:f[>4X-\u00118e\u000f\u0016$\bK]8dKN\u001cxN\u001d\u000b\u0005\u0003O\n)\u000b\u0003\u0004\u0002\u0010\u0002\u0002\r!^\u0001\u0018G>,h\u000e\u001e#po:<\u0006.\u001a8CCR\u001c\u0007.Q2lK\u0012$B!!\n\u0002,\"9\u0011QV\u0011A\u0002\u0005\u001d\u0013!\u00027bi\u000eD\u0017\u0001C:ikR$wn\u001e8\u0015\u0005\u0005\u0015\u0002")
public class SparkAvroCallbackHandler
implements SparkFlumeProtocol,
Logging {
    private final int threads;
    private final Channel channel;
    private final int transactionTimeout;
    private final int backOffInterval;
    private final Option<ExecutorService> transactionExecutorOpt;
    private final HashMap<CharSequence, TransactionProcessor> sequenceNumberToProcessor;
    private final String seqBase;
    private final AtomicLong seqCounter;
    private boolean stopped;
    private volatile boolean isTest;
    private CountDownLatch testLatch;
    private transient Logger org$apache$spark$streaming$flume$sink$Logging$$_log;

    @Override
    public Logger log() {
        return Logging.log$(this);
    }

    @Override
    public void logInfo(Function0<String> msg) {
        Logging.logInfo$(this, msg);
    }

    @Override
    public void logDebug(Function0<String> msg) {
        Logging.logDebug$(this, msg);
    }

    @Override
    public void logTrace(Function0<String> msg) {
        Logging.logTrace$(this, msg);
    }

    @Override
    public void logWarning(Function0<String> msg) {
        Logging.logWarning$(this, msg);
    }

    @Override
    public void logError(Function0<String> msg) {
        Logging.logError$(this, msg);
    }

    @Override
    public void logInfo(Function0<String> msg, Throwable throwable) {
        Logging.logInfo$(this, msg, throwable);
    }

    @Override
    public void logDebug(Function0<String> msg, Throwable throwable) {
        Logging.logDebug$(this, msg, throwable);
    }

    @Override
    public void logTrace(Function0<String> msg, Throwable throwable) {
        Logging.logTrace$(this, msg, throwable);
    }

    @Override
    public void logWarning(Function0<String> msg, Throwable throwable) {
        Logging.logWarning$(this, msg, throwable);
    }

    @Override
    public void logError(Function0<String> msg, Throwable throwable) {
        Logging.logError$(this, msg, throwable);
    }

    @Override
    public boolean isTraceEnabled() {
        return Logging.isTraceEnabled$(this);
    }

    @Override
    public Logger org$apache$spark$streaming$flume$sink$Logging$$_log() {
        return this.org$apache$spark$streaming$flume$sink$Logging$$_log;
    }

    @Override
    public void org$apache$spark$streaming$flume$sink$Logging$$_log_$eq(Logger x$1) {
        this.org$apache$spark$streaming$flume$sink$Logging$$_log = x$1;
    }

    public int threads() {
        return this.threads;
    }

    public Channel channel() {
        return this.channel;
    }

    public int transactionTimeout() {
        return this.transactionTimeout;
    }

    public int backOffInterval() {
        return this.backOffInterval;
    }

    public Option<ExecutorService> transactionExecutorOpt() {
        return this.transactionExecutorOpt;
    }

    private HashMap<CharSequence, TransactionProcessor> sequenceNumberToProcessor() {
        return this.sequenceNumberToProcessor;
    }

    private String seqBase() {
        return this.seqBase;
    }

    private AtomicLong seqCounter() {
        return this.seqCounter;
    }

    private boolean stopped() {
        return this.stopped;
    }

    private void stopped_$eq(boolean x$1) {
        this.stopped = x$1;
    }

    private boolean isTest() {
        return this.isTest;
    }

    private void isTest_$eq(boolean x$1) {
        this.isTest = x$1;
    }

    private CountDownLatch testLatch() {
        return this.testLatch;
    }

    private void testLatch_$eq(CountDownLatch x$1) {
        this.testLatch = x$1;
    }

    @Override
    public EventBatch getEventBatch(int n) {
        EventBatch eventBatch;
        this.logDebug((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> "Got getEventBatch call from Spark.");
        String sequenceNumber = new StringBuilder(0).append(this.seqBase()).append(this.seqCounter().incrementAndGet()).toString();
        Option<TransactionProcessor> option = this.createProcessor(sequenceNumber, n);
        if (option instanceof Some) {
            Some some = (Some)option;
            TransactionProcessor processor = (TransactionProcessor)some.value();
            this.transactionExecutorOpt().foreach((Function1 & Serializable & scala.Serializable)x$1 -> x$1.submit(processor));
            EventBatch batch = processor.getEventBatch();
            if (SparkSinkUtils$.MODULE$.isErrorBatch(batch)) {
                this.removeAndGetProcessor(sequenceNumber);
                this.logWarning((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> "Received an error batch - no events were received from channel! ");
            }
            eventBatch = batch;
        } else if (None$.MODULE$.equals(option)) {
            eventBatch = new EventBatch("Spark sink has been stopped!", "", Collections.emptyList());
        } else {
            throw new MatchError(option);
        }
        return eventBatch;
    }

    private Option<TransactionProcessor> createProcessor(String seq, int n) {
        None$ none$;
        HashMap<CharSequence, TransactionProcessor> hashMap = this.sequenceNumberToProcessor();
        synchronized (hashMap) {
            None$ none$2;
            if (!this.stopped()) {
                TransactionProcessor processor = new TransactionProcessor(this.channel(), seq, n, this.transactionTimeout(), this.backOffInterval(), this);
                this.sequenceNumberToProcessor().put((Object)seq, (Object)processor);
                if (this.isTest()) {
                    processor.countDownWhenBatchAcked(this.testLatch());
                }
                none$2 = new Some((Object)processor);
            } else {
                none$2 = None$.MODULE$;
            }
            none$ = none$2;
        }
        return none$;
    }

    @Override
    public Void ack(CharSequence sequenceNumber) {
        this.logDebug((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> new StringBuilder(45).append("Received Ack for batch with sequence number: ").append(sequenceNumber).toString());
        this.completeTransaction(sequenceNumber, true);
        return null;
    }

    @Override
    public Void nack(CharSequence sequenceNumber) {
        this.completeTransaction(sequenceNumber, false);
        this.logInfo((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> "Spark failed to commit transaction. Will reattempt events.");
        return null;
    }

    private void completeTransaction(CharSequence sequenceNumber, boolean success) {
        this.removeAndGetProcessor(sequenceNumber).foreach((Function1 & Serializable & scala.Serializable)processor -> {
            processor.batchProcessed(success);
            return BoxedUnit.UNIT;
        });
    }

    public Option<TransactionProcessor> removeAndGetProcessor(CharSequence sequenceNumber) {
        Option option;
        HashMap<CharSequence, TransactionProcessor> hashMap = this.sequenceNumberToProcessor();
        synchronized (hashMap) {
            option = this.sequenceNumberToProcessor().remove((Object)((Object)sequenceNumber).toString());
        }
        return option;
    }

    public void countDownWhenBatchAcked(CountDownLatch latch) {
        this.testLatch_$eq(latch);
        this.isTest_$eq(true);
    }

    public void shutdown() {
        this.logInfo((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> "Shutting down Spark Avro Callback Handler");
        HashMap<CharSequence, TransactionProcessor> hashMap = this.sequenceNumberToProcessor();
        synchronized (hashMap) {
            this.stopped_$eq(true);
            this.sequenceNumberToProcessor().values().foreach((Function1 & Serializable & scala.Serializable)x$2 -> {
                x$2.shutdown();
                return BoxedUnit.UNIT;
            });
        }
        this.transactionExecutorOpt().foreach((Function1 & Serializable & scala.Serializable)x$3 -> x$3.shutdownNow());
    }

    public SparkAvroCallbackHandler(int threads, Channel channel, int transactionTimeout, int backOffInterval) {
        this.threads = threads;
        this.channel = channel;
        this.transactionTimeout = transactionTimeout;
        this.backOffInterval = backOffInterval;
        Logging.$init$(this);
        this.transactionExecutorOpt = Option$.MODULE$.apply((Object)Executors.newFixedThreadPool(threads, new SparkSinkThreadFactory("Spark Sink Processor Thread - %d")));
        this.sequenceNumberToProcessor = (HashMap)HashMap$.MODULE$.apply((Seq)Nil$.MODULE$);
        this.seqBase = UUID.randomUUID().toString().substring(0, 8);
        this.seqCounter = new AtomicLong(0L);
        this.stopped = false;
        this.isTest = false;
        this.testLatch = null;
    }
}

