package org.apache.spark.streaming.flume;

import java.util.concurrent.atomic.AtomicReference;
import org.apache.spark.SparkConf;
import org.apache.spark.SparkFunSuite;
import org.apache.spark.storage.StorageLevel$;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.Seconds$;
import org.apache.spark.streaming.StreamingContext;
import org.apache.spark.streaming.TestOutputStream;
import org.apache.spark.streaming.dstream.ReceiverInputDStream;
import org.scalatest.Args;
import org.scalatest.BeforeAndAfter;
import org.scalatest.FunSuiteLike;
import org.scalatest.Status;
import org.scalatest.Tag;
import org.scalatest.concurrent.Eventually$;
import org.scalatest.time.Span$;
import scala.Function0;
import scala.Option;
import scala.Predef$;
import scala.collection.Seq;
import scala.collection.Seq$;
import scala.concurrent.duration.package;
import scala.concurrent.duration.package$;
import scala.reflect.ClassTag$;
import scala.reflect.ScalaSignature;

/* compiled from: FlumePollingStreamSuite.scala */
@ScalaSignature(bytes = "\u0006\u0001\u00154A!\u0001\u0002\u0001\u001b\t9b\t\\;nKB{G\u000e\\5oON#(/Z1n'VLG/\u001a\u0006\u0003\u0007\u0011\tQA\u001a7v[\u0016T!!\u0002\u0004\u0002\u0013M$(/Z1nS:<'BA\u0004\t\u0003\u0015\u0019\b/\u0019:l\u0015\tI!\"\u0001\u0004ba\u0006\u001c\u0007.\u001a\u0006\u0002\u0017\u0005\u0019qN]4\u0004\u0001M!\u0001A\u0004\n\u0019!\ty\u0001#D\u0001\u0007\u0013\t\tbAA\u0007Ta\u0006\u00148NR;o'VLG/\u001a\t\u0003'Yi\u0011\u0001\u0006\u0006\u0003+)\t\u0011b]2bY\u0006$Xm\u001d;\n\u0005]!\"A\u0004\"fM>\u0014X-\u00118e\u0003\u001a$XM\u001d\t\u0003\u001feI!A\u0007\u0004\u0003\u000f1{wmZ5oO\")A\u0004\u0001C\u0001;\u00051A(\u001b8jiz\"\u0012A\b\t\u0003?\u0001i\u0011A\u0001\u0005\bC\u0001\u0011\r\u0011\"\u0001#\u0003-i\u0017\r_!ui\u0016l\u0007\u000f^:\u0016\u0003\r\u0002\"\u0001J\u0014\u000e\u0003\u0015R\u0011AJ\u0001\u0006g\u000e\fG.Y\u0005\u0003Q\u0015\u00121!\u00138u\u0011\u0019Q\u0003\u0001)A\u0005G\u0005aQ.\u0019=BiR,W\u000e\u001d;tA!9A\u0006\u0001b\u0001\n\u0003i\u0013!\u00042bi\u000eDG)\u001e:bi&|g.F\u0001/!\ty\u0003'D\u0001\u0005\u0013\t\tDA\u0001\u0005EkJ\fG/[8o\u0011\u0019\u0019\u0004\u0001)A\u0005]\u0005q!-\u0019;dQ\u0012+(/\u0019;j_:\u0004\u0003bB\u001b\u0001\u0005\u0004%\tAN\u0001\u0005G>tg-F\u00018!\ty\u0001(\u0003\u0002:\r\tI1\u000b]1sW\u000e{gN\u001a\u0005\u0007w\u0001\u0001\u000b\u0011B\u001c\u0002\u000b\r|gN\u001a\u0011\t\u000fu\u0002!\u0019!C\u0001}\u0005)Q\u000f^5mgV\tq\b\u0005\u0002 
\u0001&\u0011\u0011I\u0001\u0002\u0016!>dG.\u001b8h\r2,X.\u001a+fgR,F/\u001b7t\u0011\u0019\u0019\u0005\u0001)A\u0005\u007f\u00051Q\u000f^5mg\u0002BQ!\u0012\u0001\u0005\n\u0019\u000b\u0011\u0003^3ti6+H\u000e^5qY\u0016$\u0016.\\3t)\t9%\n\u0005\u0002%\u0011&\u0011\u0011*\n\u0002\u0005+:LG\u000fC\u0003L\t\u0002\u0007A*\u0001\u0003uKN$\bc\u0001\u0013N\u000f&\u0011a*\n\u0002\n\rVt7\r^5p]BBQ\u0001\u0015\u0001\u0005\nE\u000b\u0001\u0003^3ti\u001acW/\\3Q_2d\u0017N\\4\u0015\u0003\u001dCQa\u0015\u0001\u0005\nE\u000bA\u0004^3ti\u001acW/\\3Q_2d\u0017N\\4Nk2$\u0018\u000e\u001d7f\u0011>\u001cH\u000fC\u0003V\u0001\u0011\u0005a+\u0001\bxe&$X-\u00118e-\u0016\u0014\u0018NZ=\u0015\u0005\u001d;\u0006\"\u0002-U\u0001\u0004I\u0016!C:j].\u0004vN\u001d;t!\rQ&m\t\b\u00037\u0002t!\u0001X0\u000e\u0003uS!A\u0018\u0007\u0002\rq\u0012xn\u001c;?\u0013\u00051\u0013BA1&\u0003\u001d\u0001\u0018mY6bO\u0016L!a\u00193\u0003\u0007M+\u0017O\u0003\u0002bK\u0001")
/* loaded from: input_file:org/apache/spark/streaming/flume/FlumePollingStreamSuite.class */
public class FlumePollingStreamSuite extends SparkFunSuite implements BeforeAndAfter {
    // Retry budget exposed through maxAttempts(); the constructor sets it to 5.
    private final int maxAttempts;
    // Batch interval passed to the StreamingContext in writeAndVerify(); the constructor sets it to Seconds(1).
    private final Duration batchDuration;
    // Spark configuration built in the constructor: master "local[2]", app name taken from
    // getClass().getSimpleName(), and "spark.streaming.clock" set to org.apache.spark.util.ManualClock.
    private final SparkConf conf;
    // Flume test helper; this class uses it to start sinks, send data, check channels and close resources.
    private final PollingFlumeTestUtils utils;
    // Compiler-generated backing fields for the mixed-in BeforeAndAfter trait (note the mangled names).
    // NOTE(review): the two AtomicReference fields are declared final yet are assigned by the
    // *_setter_* methods below; this looks like a decompiler rendering issue -- confirm against the bytecode.
    private final AtomicReference<Option<Function0<Object>>> org$scalatest$BeforeAndAfter$$beforeFunctionAtomic;
    private final AtomicReference<Option<Function0<Object>>> org$scalatest$BeforeAndAfter$$afterFunctionAtomic;
    // Flag read and written through the runHasBeenInvoked accessor pair below.
    private volatile boolean org$scalatest$BeforeAndAfter$$runHasBeenInvoked;

    /**
     * Synthetic accessor for the {@code BeforeAndAfter} "before" holder.
     *
     * @return the atomic reference stored in the field of the same name
     */
    public AtomicReference<Option<Function0<Object>>> org$scalatest$BeforeAndAfter$$beforeFunctionAtomic() {
        // A simple name in expression position resolves to the field, not to this method.
        return org$scalatest$BeforeAndAfter$$beforeFunctionAtomic;
    }

    /**
     * Synthetic accessor for the {@code BeforeAndAfter} "after" holder.
     *
     * @return the atomic reference stored in the field of the same name
     */
    public AtomicReference<Option<Function0<Object>>> org$scalatest$BeforeAndAfter$$afterFunctionAtomic() {
        // A simple name in expression position resolves to the field, not to this method.
        return org$scalatest$BeforeAndAfter$$afterFunctionAtomic;
    }

    /**
     * Synthetic accessor for the volatile {@code runHasBeenInvoked} flag.
     *
     * @return the current value of the flag
     */
    public boolean org$scalatest$BeforeAndAfter$$runHasBeenInvoked() {
        return org$scalatest$BeforeAndAfter$$runHasBeenInvoked;
    }

    /**
     * Synthetic mutator for the volatile {@code runHasBeenInvoked} flag.
     *
     * @param z the new value of the flag
     */
    public void org$scalatest$BeforeAndAfter$$runHasBeenInvoked_$eq(boolean z) {
        org$scalatest$BeforeAndAfter$$runHasBeenInvoked = z;
    }

    /**
     * Synthetic "super" accessor: forwards {@code runTest} to the implementation reached through
     * {@code FunSuiteLike}, passing this suite as the receiver.
     *
     * NOTE(review): {@code FunSuiteLike.class.runTest(...)} looks like the decompiler's rendering of a
     * static call into a Scala trait implementation class -- confirm against the bytecode.
     *
     * @param str  passed through unchanged (presumably the test name -- confirm)
     * @param args passed through unchanged
     * @return the {@link Status} produced by the delegate
     */
    public Status org$scalatest$BeforeAndAfter$$super$runTest(String str, Args args) {
        return FunSuiteLike.class.runTest(this, str, args);
    }

    /**
     * Synthetic "super" accessor: forwards {@code run} to the implementation reached through
     * {@code FunSuiteLike}, passing this suite as the receiver.
     *
     * NOTE(review): {@code FunSuiteLike.class.run(...)} looks like the decompiler's rendering of a
     * static call into a Scala trait implementation class -- confirm against the bytecode.
     *
     * @param option passed through unchanged (raw {@code Option}; presumably an optional test name -- confirm)
     * @param args   passed through unchanged
     * @return the {@link Status} produced by the delegate
     */
    public Status org$scalatest$BeforeAndAfter$$super$run(Option option, Args args) {
        return FunSuiteLike.class.run(this, option, args);
    }

    /**
     * Synthetic trait setter that stores the "before" holder in this suite.
     *
     * @param atomicReference the reference to store (raw type, as emitted by the decompiler)
     */
    public void org$scalatest$BeforeAndAfter$_setter_$org$scalatest$BeforeAndAfter$$beforeFunctionAtomic_$eq(AtomicReference atomicReference) {
        org$scalatest$BeforeAndAfter$$beforeFunctionAtomic = atomicReference;
    }

    /**
     * Synthetic trait setter that stores the "after" holder in this suite.
     *
     * @param atomicReference the reference to store (raw type, as emitted by the decompiler)
     */
    public void org$scalatest$BeforeAndAfter$_setter_$org$scalatest$BeforeAndAfter$$afterFunctionAtomic_$eq(AtomicReference atomicReference) {
        org$scalatest$BeforeAndAfter$$afterFunctionAtomic = atomicReference;
    }

    /**
     * Forwards to the {@code before} implementation of the {@code BeforeAndAfter} mix-in, passing this
     * suite as the receiver.
     *
     * NOTE(review): {@code BeforeAndAfter.class.before(...)} looks like the decompiler's rendering of a
     * static call into a Scala trait implementation class -- confirm against the bytecode.
     *
     * @param function0 passed through unchanged (presumably the code to run before each test -- confirm)
     */
    public void before(Function0<Object> function0) {
        BeforeAndAfter.class.before(this, function0);
    }

    /**
     * Forwards to the {@code after} implementation of the {@code BeforeAndAfter} mix-in, passing this
     * suite as the receiver.
     *
     * NOTE(review): {@code BeforeAndAfter.class.after(...)} looks like the decompiler's rendering of a
     * static call into a Scala trait implementation class -- confirm against the bytecode.
     *
     * @param function0 passed through unchanged (presumably the code to run after each test -- confirm)
     */
    public void after(Function0<Object> function0) {
        BeforeAndAfter.class.after(this, function0);
    }

    /**
     * Forwards to the {@code runTest} implementation of the {@code BeforeAndAfter} mix-in, passing this
     * suite as the receiver.
     *
     * NOTE(review): {@code BeforeAndAfter.class.runTest(...)} looks like the decompiler's rendering of a
     * static call into a Scala trait implementation class -- confirm against the bytecode.
     *
     * @param str  passed through unchanged (presumably the test name -- confirm)
     * @param args passed through unchanged
     * @return the {@link Status} produced by the delegate
     */
    public Status runTest(String str, Args args) {
        return BeforeAndAfter.class.runTest(this, str, args);
    }

    /**
     * Forwards to the {@code run} implementation of the {@code BeforeAndAfter} mix-in, passing this
     * suite as the receiver.
     *
     * NOTE(review): {@code BeforeAndAfter.class.run(...)} looks like the decompiler's rendering of a
     * static call into a Scala trait implementation class -- confirm against the bytecode.
     *
     * @param option passed through unchanged (presumably an optional test name -- confirm)
     * @param args   passed through unchanged
     * @return the {@link Status} produced by the delegate
     */
    public Status run(Option<String> option, Args args) {
        return BeforeAndAfter.class.run(this, option, args);
    }

    /**
     * Returns the retry budget stored in the {@code maxAttempts} field.
     *
     * @return the value of {@code maxAttempts}
     */
    public int maxAttempts() {
        return maxAttempts;
    }

    /**
     * Returns the streaming batch interval stored in the {@code batchDuration} field.
     *
     * @return the value of {@code batchDuration}
     */
    public Duration batchDuration() {
        return batchDuration;
    }

    /**
     * Returns the Spark configuration stored in the {@code conf} field.
     *
     * @return the value of {@code conf}
     */
    public SparkConf conf() {
        return conf;
    }

    /**
     * Returns the Flume test helper stored in the {@code utils} field.
     *
     * @return the value of {@code utils}
     */
    public PollingFlumeTestUtils utils() {
        return utils;
    }

    /**
     * Runs the supplied test body, retrying while it fails with a bind collision.
     *
     * <p>Reconstructed from the decompiler's instruction dump, which replaced the original body with a
     * stub that always threw {@link UnsupportedOperationException}. The logic is:
     * <ol>
     *   <li>Loop while the body has not yet succeeded and the number of failed attempts is below
     *       {@link #maxAttempts()}.</li>
     *   <li>If the body throws an {@link Exception} for which
     *       {@code org.apache.spark.util.Utils$.MODULE$.isBindCollision} returns {@code true}, log a
     *       warning through the compiler-generated closure and count the attempt.</li>
     *   <li>Any other throwable propagates to the caller unchanged.</li>
     *   <li>After the loop, assert that the body succeeded, with the message
     *       {@code "Test failed after N attempts!"}.</li>
     * </ol>
     *
     * @param r11 the test body to execute (parameter name kept from the decompiler output)
     */
    public void org$apache$spark$streaming$flume$FlumePollingStreamSuite$$testMultipleTimes(scala.Function0<scala.runtime.BoxedUnit> r11) {
        boolean testPassed = false;
        int attempts = 0;

        while (!testPassed && attempts < maxAttempts()) {
            try {
                r11.apply$mcV$sp();
                testPassed = true;
            } catch (Exception failure) {
                // Only bind collisions are retried. Everything else is rethrown as-is; non-Exception
                // throwables are never caught here, which matches the bytecode's instanceof guard.
                if (!org.apache.spark.util.Utils$.MODULE$.isBindCollision(failure)) {
                    throw failure;
                }
                logWarning(new FlumePollingStreamSuite$$anonfun$org$apache$spark$streaming$flume$FlumePollingStreamSuite$$testMultipleTimes$1(this, failure));
                attempts++;
            }
        }

        // Same call sequence as the compiled ScalaTest assert macro: build the Bool, then hand it
        // and the interpolated clue to the assertions helper.
        org.scalactic.Bool passedCheck = org.scalactic.Bool$.MODULE$.simpleMacroBool(testPassed, "testPassed");
        assertionsHelper().macroAssert(
                passedCheck,
                new scala.StringContext(
                        Predef$.MODULE$.wrapRefArray(new String[]{"Test failed after ", " attempts!"}))
                        .s(Predef$.MODULE$.genericWrapArray(
                                new Object[]{scala.runtime.BoxesRunTime.boxToInteger(attempts)})));
    }

    /**
     * Single-sink scenario: starts one sink, runs {@link #writeAndVerify(Seq)} against its port, then
     * checks that the channels are empty. The test helper is closed whether or not the scenario succeeds.
     */
    public void org$apache$spark$streaming$flume$FlumePollingStreamSuite$$testFlumePolling() {
        try {
            int sinkPort = utils().startSingleSink();
            // Wrap the single port into a Scala Seq, the shape writeAndVerify expects.
            Seq sinkPorts = (Seq) Seq$.MODULE$.apply(Predef$.MODULE$.wrapIntArray(new int[]{sinkPort}));
            writeAndVerify(sinkPorts);
            utils().assertChannelsAreEmpty();
        } finally {
            utils().close();
        }
    }

    /**
     * Multi-sink scenario: starts several sinks, runs {@link #writeAndVerify(Seq)} against their
     * ports, then checks that the channels are empty. The test helper is closed whether or not the
     * scenario succeeds.
     */
    public void org$apache$spark$streaming$flume$FlumePollingStreamSuite$$testFlumePollingMultipleHost() {
        try {
            Seq<Object> sinkPorts = utils().startMultipleSinks();
            writeAndVerify(sinkPorts);
            utils().assertChannelsAreEmpty();
        } finally {
            utils().close();
        }
    }

    /**
     * Builds a polling Flume stream over the given sinks, starts a streaming context, sends data
     * through the test helper, advances the clock by one batch and then polls the verification
     * closure (timeout 10 seconds, interval 100 milliseconds). The streaming context is always stopped.
     *
     * @param seq sink ports; each element is mapped by a compiler-generated closure before being
     *            handed to {@code createPollingStream} (presumably port to socket address -- confirm)
     */
    public void writeAndVerify(Seq<Object> seq) {
        StreamingContext ssc = new StreamingContext(conf(), batchDuration());

        Seq sinkAddresses = (Seq) seq.map(new FlumePollingStreamSuite$$anonfun$3(this), Seq$.MODULE$.canBuildFrom());
        ReceiverInputDStream flumeStream = FlumeUtils$.MODULE$.createPollingStream(
                ssc,
                sinkAddresses,
                StorageLevel$.MODULE$.MEMORY_AND_DISK(),
                utils().eventsPerBatch(),
                5);

        // Compiler-generated anonymous class instance shared by the output stream and the verification closure.
        FlumePollingStreamSuite$$anon$1 outputCollector = new FlumePollingStreamSuite$$anon$1(this);
        TestOutputStream outputStream =
                new TestOutputStream(flumeStream, outputCollector, ClassTag$.MODULE$.apply(SparkFlumeEvent.class));
        outputStream.register();

        ssc.start();
        try {
            utils().sendDatAndEnsureAllDataHasBeenReceived();
            ssc.scheduler().clock().advance(batchDuration().milliseconds());

            Eventually$ eventually = Eventually$.MODULE$;
            eventually.eventually(
                    eventually.timeout(Span$.MODULE$.convertDurationToSpan(
                            new package.DurationInt(package$.MODULE$.DurationInt(10)).seconds())),
                    eventually.interval(Span$.MODULE$.convertDurationToSpan(
                            new package.DurationInt(package$.MODULE$.DurationInt(100)).milliseconds())),
                    new FlumePollingStreamSuite$$anonfun$writeAndVerify$1(this, outputCollector));
        } finally {
            ssc.stop(ssc.stop$default$1());
        }
    }

    public FlumePollingStreamSuite() {
        BeforeAndAfter.class.$init$(this);
        this.maxAttempts = 5;
        this.batchDuration = Seconds$.MODULE$.apply(1L);
        this.conf = new SparkConf().setMaster("local[2]").setAppName(getClass().getSimpleName()).set("spark.streaming.clock", "org.apache.spark.util.ManualClock");
        this.utils = new PollingFlumeTestUtils();
        test("flume polling test", Predef$.MODULE$.wrapRefArray(new Tag[0]), new FlumePollingStreamSuite$$anonfun$1(this));
        test("flume polling test multiple hosts", Predef$.MODULE$.wrapRefArray(new Tag[0]), new FlumePollingStreamSuite$$anonfun$2(this));
    }
}
