/*
 * Decompiled with CFR 0.152.
 */
package org.apache.spark.streaming.flume.sink;

import java.io.Serializable;
import java.lang.reflect.Field;
import java.lang.reflect.Method;
import java.net.InetSocketAddress;
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.avro.ipc.NettyTransceiver;
import org.apache.avro.ipc.Transceiver;
import org.apache.avro.ipc.specific.SpecificRequestor;
import org.apache.flume.Channel;
import org.apache.flume.Context;
import org.apache.flume.Transaction;
import org.apache.flume.channel.MemoryChannel;
import org.apache.flume.event.EventBuilder;
import org.apache.spark.streaming.flume.sink.EventBatch;
import org.apache.spark.streaming.flume.sink.SparkFlumeProtocol;
import org.apache.spark.streaming.flume.sink.SparkSink;
import org.apache.spark.streaming.flume.sink.SparkSinkConfig$;
import org.apache.spark.streaming.flume.sink.SparkSinkThreadFactory;
import org.jboss.netty.channel.ChannelFactory;
import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory;
import org.scalactic.Bool;
import org.scalactic.Bool$;
import org.scalactic.Equality$;
import org.scalactic.Prettifier$;
import org.scalactic.TripleEqualsSupport;
import org.scalactic.source.Position;
import org.scalatest.FunSuite;
import org.scalatest.Tag;
import scala.Function0;
import scala.Function1;
import scala.MatchError;
import scala.Predef;
import scala.Predef$;
import scala.Tuple2;
import scala.Tuple3;
import scala.collection.JavaConverters$;
import scala.collection.Seq;
import scala.collection.immutable.IndexedSeq$;
import scala.concurrent.ExecutionContext;
import scala.concurrent.ExecutionContext$;
import scala.concurrent.ExecutionContextExecutorService;
import scala.concurrent.Future$;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;
import scala.runtime.LazyRef;
import scala.runtime.RichInt$;
import scala.runtime.java8.JFunction0;
import scala.runtime.java8.JFunction1;
import scala.util.Failure;
import scala.util.Random$;
import scala.util.Success;
import scala.util.Try;

@ScalaSignature(bytes="\u0006\u0001\u0005Ec\u0001\u0002\b\u0010\u0001qAQa\t\u0001\u0005\u0002\u0011Bqa\n\u0001C\u0002\u0013\u0005\u0001\u0006\u0003\u00040\u0001\u0001\u0006I!\u000b\u0005\ba\u0001\u0011\r\u0011\"\u0001)\u0011\u0019\t\u0004\u0001)A\u0005S!)!\u0007\u0001C\u0001g!)A\b\u0001C\u0005{!9q\rAI\u0001\n\u0013A\u0007bB:\u0001#\u0003%I\u0001\u001e\u0005\u0006m\u0002!Ia\u001e\u0005\u0006y\u0002!I! \u0005\b\u0003\u000b\u0002A\u0011BA$\u0011\u001d\tY\u0005\u0001C\u0005\u0003\u001b\u0012ab\u00159be.\u001c\u0016N\\6Tk&$XM\u0003\u0002\u0011#\u0005!1/\u001b8l\u0015\t\u00112#A\u0003gYVlWM\u0003\u0002\u0015+\u0005I1\u000f\u001e:fC6Lgn\u001a\u0006\u0003-]\tQa\u001d9be.T!\u0001G\r\u0002\r\u0005\u0004\u0018m\u00195f\u0015\u0005Q\u0012aA8sO\u000e\u00011C\u0001\u0001\u001e!\tq\u0012%D\u0001 \u0015\t\u0001\u0013$A\u0005tG\u0006d\u0017\r^3ti&\u0011!e\b\u0002\t\rVt7+^5uK\u00061A(\u001b8jiz\"\u0012!\n\t\u0003M\u0001i\u0011aD\u0001\u000fKZ,g\u000e^:QKJ\u0014\u0015\r^2i+\u0005I\u0003C\u0001\u0016.\u001b\u0005Y#\"\u0001\u0017\u0002\u000bM\u001c\u0017\r\\1\n\u00059Z#aA%oi\u0006yQM^3oiN\u0004VM\u001d\"bi\u000eD\u0007%A\bdQ\u0006tg.\u001a7DCB\f7-\u001b;z\u0003A\u0019\u0007.\u00198oK2\u001c\u0015\r]1dSRL\b%A\u000buKN$X*\u001e7uSBdWmQ8ogVlWM]:\u0015\u0005Q:\u0004C\u0001\u00166\u0013\t14F\u0001\u0003V]&$\b\"\u0002\u001d\u0007\u0001\u0004I\u0014\u0001\u00034bS2\u001cv.\\3\u0011\u0005)R\u0014BA\u001e,\u0005\u001d\u0011un\u001c7fC:\f\u0001$\u001b8ji&\fG.\u001b>f\u0007\"\fgN\\3m\u0003:$7+\u001b8l)\rqT+\u001a\t\u0006U}\n\u0005jS\u0005\u0003\u0001.\u0012a\u0001V;qY\u0016\u001c\u0004C\u0001\"G\u001b\u0005\u0019%B\u0001#F\u0003\u001d\u0019\u0007.\u00198oK2T!AE\f\n\u0005\u001d\u001b%!D'f[>\u0014\u0018p\u00115b]:,G\u000e\u0005\u0002'\u0013&\u0011!j\u0004\u0002\n'B\f'o[*j].\u0004\"\u0001T*\u000e\u00035S!AT(\u0002\u0015\r|gnY;se\u0016tGO\u0003\u0002Q#\u0006!Q\u000f^5m\u0015\u0005\u0011\u0016\u0001\u00026bm\u0006L!\u0001V'\u0003\u001d\r{WO\u001c;E_^tG*\u0019;dQ\"9ak\u0002I\u0001\u0002\u00049\u0016!C8w
KJ\u0014\u0018\u000eZ3t!\u0011AvL\u00192\u000f\u0005ek\u0006C\u0001.,\u001b\u0005Y&B\u0001/\u001c\u0003\u0019a$o\\8u}%\u0011alK\u0001\u0007!J,G-\u001a4\n\u0005\u0001\f'aA'ba*\u0011al\u000b\t\u00031\u000eL!\u0001Z1\u0003\rM#(/\u001b8h\u0011\u001d1w\u0001%AA\u0002%\nABY1uG\"\u001cu.\u001e8uKJ\f!%\u001b8ji&\fG.\u001b>f\u0007\"\fgN\\3m\u0003:$7+\u001b8lI\u0011,g-Y;mi\u0012\nT#A5+\u0005]S7&A6\u0011\u00051\fX\"A7\u000b\u00059|\u0017!C;oG\",7m[3e\u0015\t\u00018&\u0001\u0006b]:|G/\u0019;j_:L!A]7\u0003#Ut7\r[3dW\u0016$g+\u0019:jC:\u001cW-\u0001\u0012j]&$\u0018.\u00197ju\u0016\u001c\u0005.\u00198oK2\fe\u000eZ*j].$C-\u001a4bk2$HEM\u000b\u0002k*\u0012\u0011F[\u0001\naV$XI^3oiN$2\u0001\u000e={\u0011\u0015I(\u00021\u0001B\u0003\t\u0019\u0007\u000eC\u0003|\u0015\u0001\u0007\u0011&A\u0003d_VtG/A\fhKR$&/\u00198tG\u0016Lg/\u001a:B]\u0012\u001cE.[3oiR)a0a\r\u0002DA)q0!\u0003\u0002\u00109!\u0011\u0011AA\u0003\u001d\rQ\u00161A\u0005\u0002Y%\u0019\u0011qA\u0016\u0002\u000fA\f7m[1hK&!\u00111BA\u0007\u0005\r\u0019V-\u001d\u0006\u0004\u0003\u000fY\u0003c\u0002\u0016\u0002\u0012\u0005U\u0011QE\u0005\u0004\u0003'Y#A\u0002+va2,'\u0007\u0005\u0003\u0002\u0018\u0005\u0005RBAA\r\u0015\u0011\tY\"!\b\u0002\u0007%\u00048MC\u0002\u0002 ]\tA!\u0019<s_&!\u00111EA\r\u0005AqU\r\u001e;z)J\fgn]2fSZ,'\u000f\u0005\u0003\u0002(\u00055bb\u0001\u0014\u0002*%\u0019\u00111F\b\u0002%M\u0003\u0018M]6GYVlW\r\u0015:pi>\u001cw\u000e\\\u0005\u0005\u0003_\t\tD\u0001\u0005DC2d'-Y2l\u0015\r\tYc\u0004\u0005\b\u0003kY\u0001\u0019AA\u001c\u0003\u001d\tG\r\u001a:fgN\u0004B!!\u000f\u0002@5\u0011\u00111\b\u0006\u0004\u0003{\t\u0016a\u00018fi&!\u0011\u0011IA\u001e\u0005EIe.\u001a;T_\u000e\\W\r^!eIJ,7o\u001d\u0005\u0006w.\u0001\r!K\u0001\u0015CN\u001cXM\u001d;DQ\u0006tg.\u001a7Jg\u0016k\u0007\u000f^=\u0015\u0007Q\nI\u0005C\u0003E\u0019\u0001\u0007\u0011)A\u000bbm\u0006LG.\u00192mK\u000eC\u0017M\u001c8fYNcw\u000e^:\u0015\u0007%\ny\u0005C\u0003E\u001b\u0001\u0007\u0011\t")
public class SparkSinkSuite
extends FunSuite {
    // Number of events the tests write to the channel per putEvents call; the constructor sets it to 1000.
    private final int eventsPerBatch;
    // Value configured as the memory channel's "capacity"; the constructor sets it to 5000.
    private final int channelCapacity;

    /**
     * Returns the number of events the tests put into the channel in a single
     * {@code putEvents} call.
     *
     * @return the value assigned to the {@code eventsPerBatch} field by the constructor
     */
    public int eventsPerBatch() {
        return eventsPerBatch;
    }

    /**
     * Returns the capacity that {@code initializeChannelAndSink} configures on the memory channel.
     *
     * @return the value assigned to the {@code channelCapacity} field by the constructor
     */
    public int channelCapacity() {
        return channelCapacity;
    }

    /**
     * Drives five concurrent clients against a single sink and checks the channel afterwards.
     *
     * <p>Five batches of {@link #eventsPerBatch()} events are written to the channel, then five
     * transceiver/client pairs each request one batch of up to 1000 events on a five-thread pool.
     * When {@code failSome} is set, every client that observes an odd value from the shared
     * counter sends a NACK instead of an ACK; with five clients that is two of them, which is why
     * 3000 free slots are expected in that case. Otherwise the channel must be completely free.
     *
     * <p>The executor, sink, channel and transceivers are released in a {@code finally} block so
     * that a failed assertion (or any other exception) does not leave them running.
     *
     * @param failSome whether some of the clients should NACK their batch
     */
    public void testMultipleConsumers(boolean failSome) {
        ExecutionContextExecutorService executorContext = ExecutionContext$.MODULE$.fromExecutorService(Executors.newFixedThreadPool(5));
        Tuple3<MemoryChannel, SparkSink, CountDownLatch> tuple3 = this.initializeChannelAndSink((scala.collection.immutable.Map<String, String>)Predef$.MODULE$.Map().empty(), 5);
        if (tuple3 == null) {
            throw new MatchError(tuple3);
        }
        MemoryChannel channel = (MemoryChannel)tuple3._1();
        SparkSink sink = (SparkSink)tuple3._2();
        CountDownLatch latch = (CountDownLatch)tuple3._3();
        channel.start();
        sink.start();
        // Stays null until the clients exist so the finally block knows whether there is anything to close.
        Seq<Tuple2<NettyTransceiver, SparkFlumeProtocol.Callback>> transceiversAndClients = null;
        try {
            RichInt$.MODULE$.to$extension0(Predef$.MODULE$.intWrapper(1), 5).foreach$mVc$sp((Function1)(JFunction1.mcVI.sp & Serializable & scala.Serializable)x$8 -> this.putEvents(channel, this.eventsPerBatch()));
            int port = sink.getPort();
            InetSocketAddress address = new InetSocketAddress("0.0.0.0", port);
            transceiversAndClients = this.getTransceiverAndClient(address, 5);
            CountDownLatch batchCounter = new CountDownLatch(5);
            AtomicInteger counter = new AtomicInteger(0);
            transceiversAndClients.foreach((Function1 & Serializable & scala.Serializable)x -> {
                SparkSinkSuite.$anonfun$testMultipleConsumers$2(this, failSome, counter, executorContext, batchCounter, x);
                return BoxedUnit.UNIT;
            });
            // Wait for all five client callbacks, then give the sink up to a second to count down its own latch.
            batchCounter.await();
            latch.await(1L, TimeUnit.SECONDS);
            if (failSome) {
                TripleEqualsSupport.Equalizer freeSlots = this.convertToEqualizer(BoxesRunTime.boxToInteger((int)this.availableChannelSlots(channel)));
                int expectedFreeSlots = 3000;
                Bool comparison = Bool$.MODULE$.binaryMacroBool((Object)freeSlots, "===", (Object)BoxesRunTime.boxToInteger((int)expectedFreeSlots), freeSlots.$eq$eq$eq((Object)BoxesRunTime.boxToInteger((int)expectedFreeSlots), Equality$.MODULE$.default()), Prettifier$.MODULE$.default());
                this.assertionsHelper().macroAssert(comparison, (Object)"", Prettifier$.MODULE$.default(), new Position("SparkSinkSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 153));
            } else {
                this.assertChannelIsEmpty(channel);
            }
        } finally {
            // Same release order as before, but now also taken on the failure path.
            executorContext.shutdown();
            sink.stop();
            channel.stop();
            if (transceiversAndClients != null) {
                transceiversAndClients.foreach((Function1 & Serializable & scala.Serializable)x -> {
                    SparkSinkSuite.$anonfun$testMultipleConsumers$5(x);
                    return BoxedUnit.UNIT;
                });
            }
        }
    }

    /**
     * Builds a configured (but not yet started) memory channel and Spark sink wired together.
     *
     * <p>The channel gets {@code capacity = channelCapacity()}, {@code transactionCapacity = 1000},
     * {@code keep-alive = 0} and a random ten-character name. {@code overrides} are then layered on
     * top of the channel's {@link Context}; note that they are not added to the sink's context,
     * which only receives the host name {@code "0.0.0.0"} and port {@code 0}.
     *
     * @param overrides    extra settings merged into the channel context after the defaults
     * @param batchCounter initial count of the latch registered on the sink via
     *                     {@code countdownWhenBatchReceived}
     * @return the channel, the sink and the latch, in that order
     */
    private Tuple3<MemoryChannel, SparkSink, CountDownLatch> initializeChannelAndSink(scala.collection.immutable.Map<String, String> overrides, int batchCounter) {
        // Channel side: defaults first, caller-supplied overrides last so they win.
        MemoryChannel memoryChannel = new MemoryChannel();
        Context channelSettings = new Context();
        channelSettings.put("capacity", String.valueOf(this.channelCapacity()));
        channelSettings.put("transactionCapacity", String.valueOf(1000));
        channelSettings.put("keep-alive", String.valueOf(0));
        channelSettings.putAll((Map)JavaConverters$.MODULE$.mapAsJavaMapConverter(overrides).asJava());
        memoryChannel.setName(Random$.MODULE$.nextString(10));
        memoryChannel.configure(channelSettings);

        // Sink side: port 0 is configured here; the tests read the actual port back with getPort().
        SparkSink sparkSink = new SparkSink();
        Context sinkSettings = new Context();
        sinkSettings.put(SparkSinkConfig$.MODULE$.CONF_HOSTNAME(), "0.0.0.0");
        sinkSettings.put(SparkSinkConfig$.MODULE$.CONF_PORT(), String.valueOf(0));
        sparkSink.configure(sinkSettings);
        sparkSink.setChannel((Channel)memoryChannel);

        CountDownLatch batchLatch = new CountDownLatch(batchCounter);
        sparkSink.countdownWhenBatchReceived(batchLatch);
        return new Tuple3((Object)memoryChannel, (Object)sparkSink, (Object)batchLatch);
    }

    /**
     * Supplies the value the tests pass as the first ({@code overrides}) argument of
     * {@code initializeChannelAndSink} when they have no overrides of their own.
     *
     * @return an empty immutable Scala map
     */
    private scala.collection.immutable.Map<String, String> initializeChannelAndSink$default$1() {
        scala.collection.immutable.Map<String, String> noOverrides = Predef$.MODULE$.Map().empty();
        return noOverrides;
    }

    /**
     * Supplies the value the tests pass as the second ({@code batchCounter}) argument of
     * {@code initializeChannelAndSink} when they do not choose one themselves.
     *
     * @return {@code 1}
     */
    private int initializeChannelAndSink$default$2() {
        final int singleBatch = 1;
        return singleBatch;
    }

    /**
     * Writes {@code count} events to the channel inside a single transaction.
     *
     * <p>Event bodies are the UTF-8 bytes of the decimal strings {@code "1"} through
     * {@code String.valueOf(count)}. If a put or the commit fails, the transaction is rolled back
     * before the failure is rethrown, and the transaction is closed in every case so it is never
     * left open on the failure path.
     *
     * @param ch    channel to write to
     * @param count number of events to write
     */
    private void putEvents(MemoryChannel ch, int count) {
        Transaction tx = ch.getTransaction();
        tx.begin();
        try {
            RichInt$.MODULE$.to$extension0(Predef$.MODULE$.intWrapper(1), count).foreach$mVc$sp((Function1)(JFunction1.mcVI.sp & Serializable & scala.Serializable)x -> ch.put(EventBuilder.withBody((byte[])((Object)BoxesRunTime.boxToInteger((int)x)).toString().getBytes(StandardCharsets.UTF_8))));
            tx.commit();
        } catch (RuntimeException | Error e) {
            // Undo the partial batch, then surface the original failure unchanged.
            tx.rollback();
            throw e;
        } finally {
            tx.close();
        }
    }

    /**
     * Creates {@code count} independent transceiver/client pairs connected to {@code address}.
     *
     * @param address socket address the transceivers connect to
     * @param count   number of pairs to create (one per element of the range {@code 1..count})
     * @return the pairs, built by {@code $anonfun$getTransceiverAndClient$1}
     */
    private Seq<Tuple2<NettyTransceiver, SparkFlumeProtocol.Callback>> getTransceiverAndClient(InetSocketAddress address, int count) {
        // The range element itself is ignored by the factory; it only determines how many pairs are made.
        Function1 connect = (Function1 & Serializable & scala.Serializable)x$9 -> SparkSinkSuite.$anonfun$getTransceiverAndClient$1(address, BoxesRunTime.unboxToInt((Object)x$9));
        return (Seq)RichInt$.MODULE$.to$extension0(Predef$.MODULE$.intWrapper(1), count).map(connect, IndexedSeq$.MODULE$.canBuildFrom());
    }

    /**
     * Asserts that the channel holds no events, i.e. that the number of free slots reported by
     * {@code availableChannelSlots} equals {@link #channelCapacity()}.
     *
     * @param channel channel to inspect
     */
    private void assertChannelIsEmpty(MemoryChannel channel) {
        TripleEqualsSupport.Equalizer freeSlots = this.convertToEqualizer(BoxesRunTime.boxToInteger((int)this.availableChannelSlots(channel)));
        Integer fullCapacity = BoxesRunTime.boxToInteger((int)this.channelCapacity());
        Bool comparison = Bool$.MODULE$.binaryMacroBool((Object)freeSlots, "===", (Object)fullCapacity, freeSlots.$eq$eq$eq((Object)fullCapacity, Equality$.MODULE$.default()), Prettifier$.MODULE$.default());
        this.assertionsHelper().macroAssert(comparison, (Object)"", Prettifier$.MODULE$.default(), new Position("SparkSinkSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 209));
    }

    /**
     * Reads the number of free slots in the channel through reflection.
     *
     * <p>The channel's {@code queueRemaining} field is made accessible and read once; the
     * {@code availablePermits} method declared on that object's class is then invoked on the very
     * same object, so the lookup and the call cannot observe two different field values.
     *
     * @param channel channel whose {@code queueRemaining} field is inspected
     * @return the unboxed result of {@code availablePermits()}
     */
    private int availableChannelSlots(MemoryChannel channel) {
        Field queueRemaining = channel.getClass().getDeclaredField("queueRemaining");
        queueRemaining.setAccessible(true);
        Object permitHolder = queueRemaining.get(channel);
        Method availablePermits = permitHolder.getClass().getDeclaredMethod("availablePermits", new Class[0]);
        return BoxesRunTime.unboxToInt((Object)availablePermits.invoke(permitHolder, new Object[0]));
    }

    /**
     * Completion callback for one client future started by {@code testMultipleConsumers}.
     *
     * <p>On success the received batch must contain exactly 1000 events; on failure nothing is
     * checked. In both cases {@code batchCounter$1} is counted down. The count-down on the success
     * path sits in a {@code finally} block so that a failing assertion cannot leave the test
     * blocked forever in its untimed {@code batchCounter.await()}.
     *
     * <p>NOTE(review): an assertion failure here is raised on the callback's thread; whether it
     * is reported as a test failure is not visible from this file.
     *
     * @param $this          the suite, used for its assertion helpers
     * @param batchCounter$1 latch tracking how many client futures have completed
     * @param x0$1           outcome of the client future
     */
    public static final /* synthetic */ void $anonfun$testMultipleConsumers$4(SparkSinkSuite $this, CountDownLatch batchCounter$1, Try x0$1) {
        if (x0$1 instanceof Success) {
            try {
                EventBatch events = (EventBatch)((Success)x0$1).value();
                TripleEqualsSupport.Equalizer batchSize = $this.convertToEqualizer(BoxesRunTime.boxToInteger((int)events.getEvents().size()));
                int expectedBatchSize = 1000;
                Bool comparison = Bool$.MODULE$.binaryMacroBool((Object)batchSize, "===", (Object)BoxesRunTime.boxToInteger((int)expectedBatchSize), batchSize.$eq$eq$eq((Object)BoxesRunTime.boxToInteger((int)expectedBatchSize), Equality$.MODULE$.default()), Prettifier$.MODULE$.default());
                $this.assertionsHelper().macroAssert(comparison, (Object)"", Prettifier$.MODULE$.default(), new Position("SparkSinkSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 142));
            } finally {
                // Always release the waiting test thread, even when the assertion above throws.
                batchCounter$1.countDown();
            }
        } else if (x0$1 instanceof Failure) {
            batchCounter$1.countDown();
        } else {
            throw new MatchError((Object)x0$1);
        }
    }

    /**
     * Starts one asynchronous client round-trip for {@code testMultipleConsumers}.
     *
     * <p>The future asks the client in {@code x._2()} for a batch of up to 1000 events. It ACKs the
     * batch and yields it, unless {@code failSome$1} is set and the shared counter returned an odd
     * value; in that case it NACKs the batch and fails with a {@link RuntimeException}. The
     * counter is only touched when {@code failSome$1} is set. The outcome is handed to
     * {@code $anonfun$testMultipleConsumers$4} on the same execution context.
     *
     * @param $this             the suite, forwarded to the completion callback
     * @param failSome$1        whether some clients should NACK
     * @param counter$1         shared counter deciding which clients NACK
     * @param executorContext$1 execution context running both the future and its callback
     * @param batchCounter$1    latch forwarded to the completion callback
     * @param x                 transceiver/client pair; only the client is used here
     */
    public static final /* synthetic */ void $anonfun$testMultipleConsumers$2(SparkSinkSuite $this, boolean failSome$1, AtomicInteger counter$1, ExecutionContextExecutorService executorContext$1, CountDownLatch batchCounter$1, Tuple2 x) {
        ExecutionContext pool = (ExecutionContext)executorContext$1;
        Function0 fetchAndAcknowledge = (Function0 & Serializable & scala.Serializable)() -> {
            SparkFlumeProtocol.Callback client = (SparkFlumeProtocol.Callback)x._2();
            EventBatch events = client.getEventBatch(1000);
            // Short-circuit keeps the counter untouched unless failures were requested.
            boolean acknowledge = !failSome$1 || counter$1.getAndIncrement() % 2 == 0;
            if (acknowledge) {
                client.ack(events.getSequenceNumber());
                return events;
            }
            client.nack(events.getSequenceNumber());
            throw new RuntimeException("Sending NACK for failure!");
        };
        Function1 onFinished = (Function1 & Serializable & scala.Serializable)x0$1 -> {
            SparkSinkSuite.$anonfun$testMultipleConsumers$4($this, batchCounter$1, (Try)x0$1);
            return BoxedUnit.UNIT;
        };
        Future$.MODULE$.apply(fetchAndAcknowledge, pool).onComplete(onFinished, pool);
    }

    /**
     * Closes the transceiver half of a transceiver/client pair.
     *
     * @param x pair whose first element is the {@code NettyTransceiver} to close
     */
    public static final /* synthetic */ void $anonfun$testMultipleConsumers$5(Tuple2 x) {
        NettyTransceiver transceiver = (NettyTransceiver)x._1();
        transceiver.close();
    }

    /**
     * Slow path of the lazily created channel-factory executor.
     *
     * <p>While holding the monitor of the given {@code LazyRef}, returns the stored executor if
     * one is already present; otherwise creates a cached thread pool whose threads come from a
     * {@code SparkSinkThreadFactory} and stores it in the holder.
     *
     * @param channelFactoryExecutor$lzy$1 holder for the lazily created executor
     * @return the executor stored in the holder
     */
    private static final /* synthetic */ ExecutorService channelFactoryExecutor$lzycompute$1(LazyRef channelFactoryExecutor$lzy$1) {
        synchronized (channelFactoryExecutor$lzy$1) {
            if (channelFactoryExecutor$lzy$1.initialized()) {
                return (ExecutorService)channelFactoryExecutor$lzy$1.value();
            }
            return (ExecutorService)channelFactoryExecutor$lzy$1.initialize((Object)Executors.newCachedThreadPool((ThreadFactory)new SparkSinkThreadFactory("Flume Receiver Channel Thread - %d")));
        }
    }

    /**
     * Returns the lazily created channel-factory executor, creating it on first use.
     *
     * @param channelFactoryExecutor$lzy$1 holder for the lazily created executor
     * @return the stored executor if the holder is initialized, otherwise the result of
     *         {@code channelFactoryExecutor$lzycompute$1}
     */
    private static final ExecutorService channelFactoryExecutor$1(LazyRef channelFactoryExecutor$lzy$1) {
        if (channelFactoryExecutor$lzy$1.initialized()) {
            return (ExecutorService)channelFactoryExecutor$lzy$1.value();
        }
        return SparkSinkSuite.channelFactoryExecutor$lzycompute$1(channelFactoryExecutor$lzy$1);
    }

    /**
     * Slow path of the lazily created client socket channel factory.
     *
     * <p>While holding the monitor of {@code channelFactory$lzy$1}, returns the stored factory if
     * one is already present; otherwise builds a {@code NioClientSocketChannelFactory} that is
     * given the lazily created executor for both of its constructor arguments and stores it.
     *
     * @param channelFactory$lzy$1         holder for the lazily created factory
     * @param channelFactoryExecutor$lzy$1 holder for the lazily created executor
     * @return the factory stored in the holder
     */
    private static final /* synthetic */ NioClientSocketChannelFactory channelFactory$lzycompute$1(LazyRef channelFactory$lzy$1, LazyRef channelFactoryExecutor$lzy$1) {
        synchronized (channelFactory$lzy$1) {
            if (channelFactory$lzy$1.initialized()) {
                return (NioClientSocketChannelFactory)channelFactory$lzy$1.value();
            }
            return (NioClientSocketChannelFactory)channelFactory$lzy$1.initialize((Object)new NioClientSocketChannelFactory((Executor)SparkSinkSuite.channelFactoryExecutor$1(channelFactoryExecutor$lzy$1), (Executor)SparkSinkSuite.channelFactoryExecutor$1(channelFactoryExecutor$lzy$1)));
        }
    }

    /**
     * Returns the lazily created client socket channel factory, creating it on first use.
     *
     * @param channelFactory$lzy$1         holder for the lazily created factory
     * @param channelFactoryExecutor$lzy$1 holder for the lazily created executor, needed only if
     *                                     the factory still has to be built
     * @return the stored factory if the holder is initialized, otherwise the result of
     *         {@code channelFactory$lzycompute$1}
     */
    private static final NioClientSocketChannelFactory channelFactory$1(LazyRef channelFactory$lzy$1, LazyRef channelFactoryExecutor$lzy$1) {
        if (channelFactory$lzy$1.initialized()) {
            return (NioClientSocketChannelFactory)channelFactory$lzy$1.value();
        }
        return SparkSinkSuite.channelFactory$lzycompute$1(channelFactory$lzy$1, channelFactoryExecutor$lzy$1);
    }

    /**
     * Builds one transceiver/client pair connected to {@code address$1}.
     *
     * <p>Each call gets its own pair of lazy holders, so every transceiver ends up with its own
     * channel factory and its own cached thread pool.
     *
     * <p>NOTE(review): nothing in this file shuts that thread pool down; the tests only call
     * {@code close()} on the transceiver. Confirm whether that is enough to release the threads.
     *
     * @param address$1 socket address the transceiver connects to
     * @param x$9       range element from the caller; not used
     * @return a tuple of the {@code NettyTransceiver} and the {@code SparkFlumeProtocol.Callback} client
     */
    public static final /* synthetic */ Tuple2 $anonfun$getTransceiverAndClient$1(InetSocketAddress address$1, int x$9) {
        LazyRef executorHolder = new LazyRef();
        LazyRef factoryHolder = new LazyRef();
        ChannelFactory factory = (ChannelFactory)SparkSinkSuite.channelFactory$1(factoryHolder, executorHolder);
        NettyTransceiver transceiver = new NettyTransceiver(address$1, factory);
        SparkFlumeProtocol.Callback client = (SparkFlumeProtocol.Callback)SpecificRequestor.getClient(SparkFlumeProtocol.Callback.class, (Transceiver)transceiver);
        return new Tuple2((Object)transceiver, (Object)client);
    }

    /**
     * Sets the batch size (1000) and channel capacity (5000) and registers the five tests of this
     * suite: "Success with ack", "Failure with nack", "Failure with timeout", "Multiple consumers"
     * and "Multiple consumers with some failures".
     *
     * <p>The first three tests share one shape: build and start a channel and sink, write one
     * batch of events, connect a single client, request a batch of up to 1000 events, then check
     * the channel's free slots after waiting at most one second on the sink's latch.
     */
    public SparkSinkSuite() {
        this.eventsPerBatch = 1000;
        this.channelCapacity = 5000;
        // Test 1: the client ACKs the batch, after which the channel must be completely free again.
        this.test("Success with ack", (Seq)Predef$.MODULE$.wrapRefArray((Object[])new Tag[0]), (Function0)(JFunction0.mcV.sp & Serializable & scala.Serializable)() -> {
            Tuple3<MemoryChannel, SparkSink, CountDownLatch> tuple3 = this.initializeChannelAndSink(this.initializeChannelAndSink$default$1(), this.initializeChannelAndSink$default$2());
            if (tuple3 == null) {
                throw new MatchError(tuple3);
            }
            MemoryChannel channel = (MemoryChannel)tuple3._1();
            SparkSink sink = (SparkSink)tuple3._2();
            CountDownLatch latch = (CountDownLatch)tuple3._3();
            Tuple3 tuple32 = new Tuple3((Object)channel, (Object)sink, (Object)latch);
            Tuple3 tuple33 = tuple32;
            MemoryChannel channel2 = (MemoryChannel)tuple33._1();
            SparkSink sink2 = (SparkSink)tuple33._2();
            CountDownLatch latch2 = (CountDownLatch)tuple33._3();
            channel2.start();
            sink2.start();
            this.putEvents(channel2, this.eventsPerBatch());
            // The sink was configured with port 0, so the port to connect to is read back from it.
            int port = sink2.getPort();
            InetSocketAddress address = new InetSocketAddress("0.0.0.0", port);
            Tuple2 tuple2 = (Tuple2)this.getTransceiverAndClient(address, 1).apply(0);
            if (tuple2 == null) {
                throw new MatchError((Object)tuple2);
            }
            NettyTransceiver transceiver = (NettyTransceiver)tuple2._1();
            SparkFlumeProtocol.Callback client = (SparkFlumeProtocol.Callback)tuple2._2();
            Tuple2 tuple22 = new Tuple2((Object)transceiver, (Object)client);
            Tuple2 tuple23 = tuple22;
            NettyTransceiver transceiver2 = (NettyTransceiver)tuple23._1();
            SparkFlumeProtocol.Callback client2 = (SparkFlumeProtocol.Callback)tuple23._2();
            EventBatch events = client2.getEventBatch(1000);
            client2.ack(events.getSequenceNumber());
            TripleEqualsSupport.Equalizer $org_scalatest_assert_macro_left = this.convertToEqualizer(BoxesRunTime.boxToInteger((int)events.getEvents().size()));
            int $org_scalatest_assert_macro_right = 1000;
            Bool $org_scalatest_assert_macro_expr = Bool$.MODULE$.binaryMacroBool((Object)$org_scalatest_assert_macro_left, "===", (Object)BoxesRunTime.boxToInteger((int)$org_scalatest_assert_macro_right), $org_scalatest_assert_macro_left.$eq$eq$eq((Object)BoxesRunTime.boxToInteger((int)$org_scalatest_assert_macro_right), Equality$.MODULE$.default()), Prettifier$.MODULE$.default());
            this.assertionsHelper().macroAssert($org_scalatest_assert_macro_expr, (Object)"", Prettifier$.MODULE$.default(), new Position("SparkSinkSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 62));
            // The result of the timed wait is ignored; the channel check below is what decides the test.
            latch2.await(1L, TimeUnit.SECONDS);
            this.assertChannelIsEmpty(channel2);
            sink2.stop();
            channel2.stop();
            transceiver2.close();
        }, new Position("SparkSinkSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 49));
        // Test 2: the client NACKs the batch; 4000 free slots are expected, i.e. 1000 of the 5000 slots are still occupied.
        this.test("Failure with nack", (Seq)Predef$.MODULE$.wrapRefArray((Object[])new Tag[0]), (Function0)(JFunction0.mcV.sp & Serializable & scala.Serializable)() -> {
            Tuple3<MemoryChannel, SparkSink, CountDownLatch> tuple3 = this.initializeChannelAndSink(this.initializeChannelAndSink$default$1(), this.initializeChannelAndSink$default$2());
            if (tuple3 == null) {
                throw new MatchError(tuple3);
            }
            MemoryChannel channel = (MemoryChannel)tuple3._1();
            SparkSink sink = (SparkSink)tuple3._2();
            CountDownLatch latch = (CountDownLatch)tuple3._3();
            Tuple3 tuple32 = new Tuple3((Object)channel, (Object)sink, (Object)latch);
            Tuple3 tuple33 = tuple32;
            MemoryChannel channel2 = (MemoryChannel)tuple33._1();
            SparkSink sink2 = (SparkSink)tuple33._2();
            CountDownLatch latch2 = (CountDownLatch)tuple33._3();
            channel2.start();
            sink2.start();
            this.putEvents(channel2, this.eventsPerBatch());
            int port = sink2.getPort();
            InetSocketAddress address = new InetSocketAddress("0.0.0.0", port);
            Tuple2 tuple2 = (Tuple2)this.getTransceiverAndClient(address, 1).apply(0);
            if (tuple2 == null) {
                throw new MatchError((Object)tuple2);
            }
            NettyTransceiver transceiver = (NettyTransceiver)tuple2._1();
            SparkFlumeProtocol.Callback client = (SparkFlumeProtocol.Callback)tuple2._2();
            Tuple2 tuple22 = new Tuple2((Object)transceiver, (Object)client);
            Tuple2 tuple23 = tuple22;
            NettyTransceiver transceiver2 = (NettyTransceiver)tuple23._1();
            SparkFlumeProtocol.Callback client2 = (SparkFlumeProtocol.Callback)tuple23._2();
            EventBatch events = client2.getEventBatch(1000);
            TripleEqualsSupport.Equalizer $org_scalatest_assert_macro_left = this.convertToEqualizer(BoxesRunTime.boxToInteger((int)events.getEvents().size()));
            int $org_scalatest_assert_macro_right = 1000;
            Bool $org_scalatest_assert_macro_expr = Bool$.MODULE$.binaryMacroBool((Object)$org_scalatest_assert_macro_left, "===", (Object)BoxesRunTime.boxToInteger((int)$org_scalatest_assert_macro_right), $org_scalatest_assert_macro_left.$eq$eq$eq((Object)BoxesRunTime.boxToInteger((int)$org_scalatest_assert_macro_right), Equality$.MODULE$.default()), Prettifier$.MODULE$.default());
            this.assertionsHelper().macroAssert($org_scalatest_assert_macro_expr, (Object)"", Prettifier$.MODULE$.default(), new Position("SparkSinkSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 81));
            client2.nack(events.getSequenceNumber());
            latch2.await(1L, TimeUnit.SECONDS);
            TripleEqualsSupport.Equalizer $org_scalatest_assert_macro_left2 = this.convertToEqualizer(BoxesRunTime.boxToInteger((int)this.availableChannelSlots(channel2)));
            int $org_scalatest_assert_macro_right2 = 4000;
            Bool $org_scalatest_assert_macro_expr2 = Bool$.MODULE$.binaryMacroBool((Object)$org_scalatest_assert_macro_left2, "===", (Object)BoxesRunTime.boxToInteger((int)$org_scalatest_assert_macro_right2), $org_scalatest_assert_macro_left2.$eq$eq$eq((Object)BoxesRunTime.boxToInteger((int)$org_scalatest_assert_macro_right2), Equality$.MODULE$.default()), Prettifier$.MODULE$.default());
            this.assertionsHelper().macroAssert($org_scalatest_assert_macro_expr2, (Object)"", Prettifier$.MODULE$.default(), new Position("SparkSinkSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 84));
            sink2.stop();
            channel2.stop();
            transceiver2.close();
        }, new Position("SparkSinkSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 70));
        // Test 3: the client neither ACKs nor NACKs; after waiting at most one second, 4000 free slots are expected.
        // NOTE(review): the CONF_TRANSACTION_TIMEOUT -> "1" override is handed to initializeChannelAndSink, which adds
        // overrides to the channel's Context only, not to the sink's. Confirm the timeout actually reaches the sink;
        // otherwise this test may pass without exercising a timeout.
        this.test("Failure with timeout", (Seq)Predef$.MODULE$.wrapRefArray((Object[])new Tag[0]), (Function0)(JFunction0.mcV.sp & Serializable & scala.Serializable)() -> {
            Tuple3<MemoryChannel, SparkSink, CountDownLatch> tuple3 = this.initializeChannelAndSink((scala.collection.immutable.Map<String, String>)((scala.collection.immutable.Map)Predef$.MODULE$.Map().apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)SparkSinkConfig$.MODULE$.CONF_TRANSACTION_TIMEOUT()), (Object)((Object)BoxesRunTime.boxToInteger((int)1)).toString())}))), this.initializeChannelAndSink$default$2());
            if (tuple3 == null) {
                throw new MatchError(tuple3);
            }
            MemoryChannel channel = (MemoryChannel)tuple3._1();
            SparkSink sink = (SparkSink)tuple3._2();
            CountDownLatch latch = (CountDownLatch)tuple3._3();
            Tuple3 tuple32 = new Tuple3((Object)channel, (Object)sink, (Object)latch);
            Tuple3 tuple33 = tuple32;
            MemoryChannel channel2 = (MemoryChannel)tuple33._1();
            SparkSink sink2 = (SparkSink)tuple33._2();
            CountDownLatch latch2 = (CountDownLatch)tuple33._3();
            channel2.start();
            sink2.start();
            this.putEvents(channel2, this.eventsPerBatch());
            int port = sink2.getPort();
            InetSocketAddress address = new InetSocketAddress("0.0.0.0", port);
            Tuple2 tuple2 = (Tuple2)this.getTransceiverAndClient(address, 1).apply(0);
            if (tuple2 == null) {
                throw new MatchError((Object)tuple2);
            }
            NettyTransceiver transceiver = (NettyTransceiver)tuple2._1();
            SparkFlumeProtocol.Callback client = (SparkFlumeProtocol.Callback)tuple2._2();
            Tuple2 tuple22 = new Tuple2((Object)transceiver, (Object)client);
            Tuple2 tuple23 = tuple22;
            NettyTransceiver transceiver2 = (NettyTransceiver)tuple23._1();
            SparkFlumeProtocol.Callback client2 = (SparkFlumeProtocol.Callback)tuple23._2();
            EventBatch events = client2.getEventBatch(1000);
            TripleEqualsSupport.Equalizer $org_scalatest_assert_macro_left = this.convertToEqualizer(BoxesRunTime.boxToInteger((int)events.getEvents().size()));
            int $org_scalatest_assert_macro_right = 1000;
            Bool $org_scalatest_assert_macro_expr = Bool$.MODULE$.binaryMacroBool((Object)$org_scalatest_assert_macro_left, "===", (Object)BoxesRunTime.boxToInteger((int)$org_scalatest_assert_macro_right), $org_scalatest_assert_macro_left.$eq$eq$eq((Object)BoxesRunTime.boxToInteger((int)$org_scalatest_assert_macro_right), Equality$.MODULE$.default()), Prettifier$.MODULE$.default());
            this.assertionsHelper().macroAssert($org_scalatest_assert_macro_expr, (Object)"", Prettifier$.MODULE$.default(), new Position("SparkSinkSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 101));
            latch2.await(1L, TimeUnit.SECONDS);
            TripleEqualsSupport.Equalizer $org_scalatest_assert_macro_left2 = this.convertToEqualizer(BoxesRunTime.boxToInteger((int)this.availableChannelSlots(channel2)));
            int $org_scalatest_assert_macro_right2 = 4000;
            Bool $org_scalatest_assert_macro_expr2 = Bool$.MODULE$.binaryMacroBool((Object)$org_scalatest_assert_macro_left2, "===", (Object)BoxesRunTime.boxToInteger((int)$org_scalatest_assert_macro_right2), $org_scalatest_assert_macro_left2.$eq$eq$eq((Object)BoxesRunTime.boxToInteger((int)$org_scalatest_assert_macro_right2), Equality$.MODULE$.default()), Prettifier$.MODULE$.default());
            this.assertionsHelper().macroAssert($org_scalatest_assert_macro_expr2, (Object)"", Prettifier$.MODULE$.default(), new Position("SparkSinkSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 103));
            sink2.stop();
            channel2.stop();
            transceiver2.close();
        }, new Position("SparkSinkSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 90));
        // Tests 4 and 5 delegate to testMultipleConsumers, without and with simulated client failures.
        this.test("Multiple consumers", (Seq)Predef$.MODULE$.wrapRefArray((Object[])new Tag[0]), (Function0)(JFunction0.mcV.sp & Serializable & scala.Serializable)() -> this.testMultipleConsumers(false), new Position("SparkSinkSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 109));
        this.test("Multiple consumers with some failures", (Seq)Predef$.MODULE$.wrapRefArray((Object[])new Tag[0]), (Function0)(JFunction0.mcV.sp & Serializable & scala.Serializable)() -> this.testMultipleConsumers(true), new Position("SparkSinkSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 113));
    }
}

