/*
 * Decompiled with CFR 0.152.
 */
package org.apache.spark.streaming.flume;

import java.io.Serializable;
import java.lang.reflect.Field;
import java.lang.reflect.Method;
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import org.apache.flume.Context;
import org.apache.flume.Transaction;
import org.apache.flume.channel.AbstractChannel;
import org.apache.flume.channel.MemoryChannel;
import org.apache.flume.conf.Configurables;
import org.apache.flume.event.EventBuilder;
import org.apache.spark.streaming.flume.sink.SparkSink;
import org.apache.spark.streaming.flume.sink.SparkSinkConfig$;
import scala.Function1;
import scala.Predef$;
import scala.collection.Seq;
import scala.collection.mutable.ArrayBuffer;
import scala.collection.mutable.ArrayBuffer$;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;
import scala.runtime.IntRef;
import scala.runtime.RichInt$;
import scala.runtime.java8.JFunction1;

@ScalaSignature(bytes="\u0006\u0001\u0005-c!\u0002\u000e\u001c\u0001m)\u0003\"\u0002\u0017\u0001\t\u0003q\u0003bB\u0019\u0001\u0005\u0004%IA\r\u0005\u0007m\u0001\u0001\u000b\u0011B\u001a\t\u000f]\u0002!\u0019!C\u0001e!1\u0001\b\u0001Q\u0001\nMBq!\u000f\u0001C\u0002\u0013%!\u0007\u0003\u0004;\u0001\u0001\u0006Ia\r\u0005\bw\u0001\u0011\r\u0011\"\u00033\u0011\u0019a\u0004\u0001)A\u0005g!)Q\b\u0001C\u0001e!9a\b\u0001b\u0001\n\u0013y\u0004BB(\u0001A\u0003%\u0001\tC\u0004Q\u0001\t\u0007I\u0011B)\t\re\u0003\u0001\u0015!\u0003S\u0011\u0015Q\u0006\u0001\"\u0001\\\u0011\u0015a\u0006\u0001\"\u0001^\u0011\u0015Q\u0007\u0001\"\u0001l\u0011\u0015y\u0007\u0001\"\u0001q\u0011\u0019\t\u0019\u0002\u0001C\u0001W\"9\u0011Q\u0003\u0001\u0005\n\u0005]\u0001BBA\u000e\u0001\u0011\u00051N\u0002\u0004\u0002\u001e\u0001!\u0011q\u0004\u0005\t\u0017Z\u0011\t\u0011)A\u0005\u0011\"1AF\u0006C\u0001\u0003\u007fAq!a\u0012\u0017\t\u0003\nIEA\u000bQ_2d\u0017N\\4GYVlW\rV3tiV#\u0018\u000e\\:\u000b\u0005qi\u0012!\u00024mk6,'B\u0001\u0010 \u0003%\u0019HO]3b[&twM\u0003\u0002!C\u0005)1\u000f]1sW*\u0011!eI\u0001\u0007CB\f7\r[3\u000b\u0003\u0011\n1a\u001c:h'\t\u0001a\u0005\u0005\u0002(U5\t\u0001FC\u0001*\u0003\u0015\u00198-\u00197b\u0013\tY\u0003F\u0001\u0004B]f\u0014VMZ\u0001\u0007y%t\u0017\u000e\u001e \u0004\u0001Q\tq\u0006\u0005\u00021\u00015\t1$\u0001\u0006cCR\u001c\u0007nQ8v]R,\u0012a\r\t\u0003OQJ!!\u000e\u0015\u0003\u0007%sG/A\u0006cCR\u001c\u0007nQ8v]R\u0004\u0013AD3wK:$8\u000fU3s\u0005\u0006$8\r[\u0001\u0010KZ,g\u000e^:QKJ\u0014\u0015\r^2iA\u0005)Bo\u001c;bY\u00163XM\u001c;t!\u0016\u00148\t[1o]\u0016d\u0017A\u0006;pi\u0006dWI^3oiN\u0004VM]\"iC:tW\r\u001c\u0011\u0002\u001f\rD\u0017M\u001c8fY\u000e\u000b\u0007/Y2jif\f\u0001c\u00195b]:,GnQ1qC\u000eLG/\u001f\u0011\u0002\u001d\u001d,G\u000fV8uC2,e/\u001a8ug\u0006A1\r[1o]\u0016d7/F\u0001A!\r\te\tS\u0007\u0002\u0005*\u00111\tR\u0001\b[V$\u0018M\u00197f\u0015\t)\u0005&\u0001\u0006d_2dWm\u0019;j_:L!a\u0012\"\u0003\u0017\u0005\u0013(/Y=Ck\u001a4WM\u001d\t\u0003\u00136k\u0011A\u0013\u0006\u0003\u00172\u000bqa\u00195b]:,GN\u0003\u0002\u001dC%\u0011aJ\u0013\u0002\u000e\u001b\u0016lwN]=DQ\u0006tg.\u001a7\u0002\u0013\rD\u0017M\u001c8fYN\u0004\u0013!B:j].\u001cX#\u0001*\u0011\u0007\u000535\u000b\u0005\u0002U/6\tQK\u0003\u0002W7\u0005!1/\u001b8l\u0013\tAVKA\u0005Ta\u0006\u00148nU5oW\u000611/\u001b8lg\u0002\nqb\u001d;beR\u001c\u0016N\\4mKNKgn\u001b\u000b\u0002g\u0005\u00112\u000f^1si6+H\u000e^5qY\u0016\u001c\u0016N\\6t)\u0005q\u0006cA0hg9\u0011\u0001-\u001a\b\u0003C\u0012l\u0011A\u0019\u0006\u0003G6\na\u0001\u0010:p_Rt\u0014\"A\u0015\n\u0005\u0019D\u0013a\u00029bG.\fw-Z\u0005\u0003Q&\u00141aU3r\u0015\t1\u0007&A\u0014tK:$G)\u0019;b\u0003:$WI\\:ve\u0016\fE\u000e\u001c#bi\u0006D\u0015m\u001d\"fK:\u0014VmY3jm\u0016$G#\u00017\u0011\u0005\u001dj\u0017B\u00018)\u0005\u0011)f.\u001b;\u0002\u0019\u0005\u001c8/\u001a:u\u001fV$\b/\u001e;\u0015\t1\f\u0018Q\u0002\u0005\u0006eJ\u0001\ra]\u0001\u000e_V$\b/\u001e;IK\u0006$WM]:\u0011\u0007QL80D\u0001v\u0015\t1x/\u0001\u0003vi&d'\"\u0001=\u0002\t)\fg/Y\u0005\u0003uV\u0014A\u0001T5tiB!A\u000f @\u007f\u0013\tiXOA\u0002NCB\u00042a`A\u0004\u001d\u0011\t\t!a\u0001\u0011\u0005\u0005D\u0013bAA\u0003Q\u00051\u0001K]3eK\u001aLA!!\u0003\u0002\f\t11\u000b\u001e:j]\u001eT1!!\u0002)\u0011\u001d\tyA\u0005a\u0001\u0003#\tAb\\;uaV$(i\u001c3jKN\u00042\u0001^=\u007f\u0003Y\t7o]3si\u000eC\u0017M\u001c8fYN\f%/Z#naRL\u0018\u0001F1tg\u0016\u0014Ho\u00115b]:,G.S:F[B$\u0018\u0010F\u0002m\u00033AQa\u0013\u000bA\u0002!\u000bQa\u00197pg\u0016\u0014A\u0002\u0016=o'V\u0014W.\u001b;uKJ\u001cRAFA\u0011\u0003[\u0001B!a\t\u0002*5\u0011\u0011Q\u0005\u0006\u0004\u0003O9\u0018\u0001\u00027b]\u001eLA!a\u000b\u0002&\t1qJ\u00196fGR\u0004b!a\f\u00026\u0005eRBAA\u0019\u0015\r\t\u0019$^\u0001\u000bG>t7-\u001e:sK:$\u0018\u0002BA\u001c\u0003c\u0011\u0001bQ1mY\u0006\u0014G.\u001a\t\u0005\u0003G\tY$\u0003\u0003\u0002>\u0005\u0015\"\u0001\u0002,pS\u0012$B!!\u0011\u0002FA\u0019\u00111\t\f\u000e\u0003\u0001AQa\u0013\rA\u0002!\u000bAaY1mYR\u0011\u0011\u0011\b")
public class PollingFlumeTestUtils {
    private final int org$apache$spark$streaming$flume$PollingFlumeTestUtils$$batchCount;
    private final int eventsPerBatch;
    private final int totalEventsPerChannel = this.org$apache$spark$streaming$flume$PollingFlumeTestUtils$$batchCount() * this.eventsPerBatch();
    private final int channelCapacity;
    private final ArrayBuffer<MemoryChannel> channels = new ArrayBuffer();
    private final ArrayBuffer<SparkSink> sinks = new ArrayBuffer();

    public int org$apache$spark$streaming$flume$PollingFlumeTestUtils$$batchCount() {
        return this.org$apache$spark$streaming$flume$PollingFlumeTestUtils$$batchCount;
    }

    public int eventsPerBatch() {
        return this.eventsPerBatch;
    }

    private int totalEventsPerChannel() {
        return this.totalEventsPerChannel;
    }

    private int channelCapacity() {
        return this.channelCapacity;
    }

    public int getTotalEvents() {
        return this.totalEventsPerChannel() * this.channels().size();
    }

    private ArrayBuffer<MemoryChannel> channels() {
        return this.channels;
    }

    private ArrayBuffer<SparkSink> sinks() {
        return this.sinks;
    }

    public int startSingleSink() {
        this.channels().clear();
        this.sinks().clear();
        Context context = new Context();
        context.put("capacity", ((Object)BoxesRunTime.boxToInteger((int)this.channelCapacity())).toString());
        context.put("transactionCapacity", "1000");
        context.put("keep-alive", "0");
        MemoryChannel channel = new MemoryChannel();
        Configurables.configure((Object)channel, context);
        SparkSink sink = new SparkSink();
        context.put(SparkSinkConfig$.MODULE$.CONF_HOSTNAME(), "localhost");
        context.put(SparkSinkConfig$.MODULE$.CONF_PORT(), String.valueOf(0));
        Configurables.configure((Object)sink, context);
        sink.setChannel(channel);
        sink.start();
        this.channels().$plus$eq((Object)channel);
        this.sinks().$plus$eq((Object)sink);
        return sink.getPort();
    }

    public Seq<Object> startMultipleSinks() {
        this.channels().clear();
        this.sinks().clear();
        Context context = new Context();
        context.put("capacity", ((Object)BoxesRunTime.boxToInteger((int)this.channelCapacity())).toString());
        context.put("transactionCapacity", "1000");
        context.put("keep-alive", "0");
        MemoryChannel channel = new MemoryChannel();
        Configurables.configure((Object)channel, context);
        MemoryChannel channel2 = new MemoryChannel();
        Configurables.configure((Object)channel2, context);
        SparkSink sink = new SparkSink();
        context.put(SparkSinkConfig$.MODULE$.CONF_HOSTNAME(), "localhost");
        context.put(SparkSinkConfig$.MODULE$.CONF_PORT(), String.valueOf(0));
        Configurables.configure((Object)sink, context);
        sink.setChannel(channel);
        sink.start();
        SparkSink sink2 = new SparkSink();
        context.put(SparkSinkConfig$.MODULE$.CONF_HOSTNAME(), "localhost");
        context.put(SparkSinkConfig$.MODULE$.CONF_PORT(), String.valueOf(0));
        Configurables.configure((Object)sink2, context);
        sink2.setChannel(channel2);
        sink2.start();
        this.sinks().$plus$eq((Object)sink);
        this.sinks().$plus$eq((Object)sink2);
        this.channels().$plus$eq((Object)channel);
        this.channels().$plus$eq((Object)channel2);
        return (Seq)this.sinks().map((Function1 & Serializable & scala.Serializable)x$1 -> BoxesRunTime.boxToInteger((int)x$1.getPort()), ArrayBuffer$.MODULE$.canBuildFrom());
    }

    public void sendDataAndEnsureAllDataHasBeenReceived() {
        ExecutorService executor = Executors.newCachedThreadPool();
        ExecutorCompletionService executorCompletion = new ExecutorCompletionService(executor);
        CountDownLatch latch = new CountDownLatch(this.org$apache$spark$streaming$flume$PollingFlumeTestUtils$$batchCount() * this.channels().size());
        this.sinks().foreach((Function1 & Serializable & scala.Serializable)x$2 -> {
            x$2.countdownWhenBatchReceived(latch);
            return BoxedUnit.UNIT;
        });
        this.channels().foreach((Function1 & Serializable & scala.Serializable)channel -> executorCompletion.submit(new TxnSubmitter(this, (MemoryChannel)channel)));
        RichInt$.MODULE$.until$extension0(Predef$.MODULE$.intWrapper(0), this.channels().size()).foreach((Function1 & Serializable & scala.Serializable)i -> executorCompletion.take());
        latch.await(15L, TimeUnit.SECONDS);
    }

    public void assertOutput(List<Map<String, String>> outputHeaders, List<String> outputBodies) {
        Predef$.MODULE$.require(outputHeaders.size() == outputBodies.size());
        int eventSize = outputHeaders.size();
        if (eventSize != this.totalEventsPerChannel() * this.channels().size()) {
            throw new AssertionError((Object)new StringBuilder(26).append("Expected ").append(this.totalEventsPerChannel() * this.channels().size()).append(" events, but was ").append(eventSize).toString());
        }
        IntRef counter = IntRef.create((int)0);
        RichInt$.MODULE$.until$extension0(Predef$.MODULE$.intWrapper(0), this.channels().size()).foreach$mVc$sp((Function1)(JFunction1.mcVI.sp & Serializable & scala.Serializable)k -> RichInt$.MODULE$.until$extension0(Predef$.MODULE$.intWrapper(0), this.totalEventsPerChannel()).foreach$mVc$sp((Function1)(JFunction1.mcVI.sp & Serializable & scala.Serializable)i -> {
            String eventBodyToVerify = new StringBuilder(1).append(((AbstractChannel)this.channels().apply(k)).getName()).append("-").append(i).toString();
            Map<String, String> eventHeaderToVerify = Collections.singletonMap(new StringBuilder(5).append("test-").append(i).toString(), "header");
            boolean found = false;
            for (int j = 0; j < eventSize && !found; ++j) {
                String string = eventBodyToVerify;
                Object e = outputBodies.get(j);
                if (string != null ? !string.equals(e) : e != null) continue;
                Map<String, String> map = eventHeaderToVerify;
                Object e2 = outputHeaders.get(j);
                if (map != null ? !((Object)map).equals(e2) : e2 != null) continue;
                found = true;
                ++counter$1.elem;
            }
        }));
        if (counter.elem != this.totalEventsPerChannel() * this.channels().size()) {
            throw new AssertionError((Object)new StringBuilder(30).append("111 Expected ").append(this.totalEventsPerChannel() * this.channels().size()).append(" events, but was ").append(counter.elem).toString());
        }
    }

    public void assertChannelsAreEmpty() {
        this.channels().foreach((Function1 & Serializable & scala.Serializable)channel -> {
            this.assertChannelIsEmpty(channel);
            return BoxedUnit.UNIT;
        });
    }

    private void assertChannelIsEmpty(MemoryChannel channel) {
        Field queueRemaining = channel.getClass().getDeclaredField("queueRemaining");
        queueRemaining.setAccessible(true);
        Method m = queueRemaining.get(channel).getClass().getDeclaredMethod("availablePermits", new Class[0]);
        if (BoxesRunTime.unboxToInt((Object)m.invoke(queueRemaining.get(channel), new Object[0])) != this.channelCapacity()) {
            throw new AssertionError((Object)new StringBuilder(21).append("Channel ").append(channel.getName()).append(" is not empty").toString());
        }
    }

    public void close() {
        this.sinks().foreach((Function1 & Serializable & scala.Serializable)x$3 -> {
            x$3.stop();
            return BoxedUnit.UNIT;
        });
        this.sinks().clear();
        this.channels().foreach((Function1 & Serializable & scala.Serializable)x$4 -> {
            x$4.stop();
            return BoxedUnit.UNIT;
        });
        this.channels().clear();
    }

    public PollingFlumeTestUtils() {
        this.org$apache$spark$streaming$flume$PollingFlumeTestUtils$$batchCount = 5;
        this.eventsPerBatch = 100;
        this.channelCapacity = 5000;
    }

    public class TxnSubmitter
    implements Callable<Void> {
        private final MemoryChannel channel;
        public final /* synthetic */ PollingFlumeTestUtils $outer;

        @Override
        public Void call() {
            IntRef t = IntRef.create((int)0);
            RichInt$.MODULE$.until$extension0(Predef$.MODULE$.intWrapper(0), this.org$apache$spark$streaming$flume$PollingFlumeTestUtils$TxnSubmitter$$$outer().org$apache$spark$streaming$flume$PollingFlumeTestUtils$$batchCount()).foreach$mVc$sp((Function1)(JFunction1.mcVI.sp & Serializable & scala.Serializable)i -> {
                Transaction tx = $this.channel.getTransaction();
                tx.begin();
                RichInt$.MODULE$.until$extension0(Predef$.MODULE$.intWrapper(0), this.org$apache$spark$streaming$flume$PollingFlumeTestUtils$TxnSubmitter$$$outer().eventsPerBatch()).foreach$mVc$sp((Function1)(JFunction1.mcVI.sp & Serializable & scala.Serializable)j -> {
                    $this.channel.put(EventBuilder.withBody(new StringBuilder(1).append($this.channel.getName()).append("-").append(t$1.elem).toString().getBytes(StandardCharsets.UTF_8), Collections.singletonMap(new StringBuilder(5).append("test-").append(t$1.elem).toString(), "header")));
                    ++t$1.elem;
                });
                tx.commit();
                tx.close();
                Thread.sleep(500L);
            });
            return null;
        }

        public /* synthetic */ PollingFlumeTestUtils org$apache$spark$streaming$flume$PollingFlumeTestUtils$TxnSubmitter$$$outer() {
            return this.$outer;
        }

        public TxnSubmitter(PollingFlumeTestUtils $outer, MemoryChannel channel) {
            this.channel = channel;
            if ($outer == null) {
                throw null;
            }
            this.$outer = $outer;
        }
    }
}

