/*
 * Decompiled with CFR 0.152.
 */
package com.mapr.ojai.store.impl;

import com.mapr.ojai.store.impl.BlockingHandoff;
import com.mapr.tests.BaseTest;
import java.util.Iterator;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Test;

/**
 * Multi-threaded tests for {@link BlockingHandoff}.
 *
 * <p>Each test wires one or more producer tasks, one or more consumer tasks and
 * a {@link LatchedEos} task to a shared handoff, releases the producers with a
 * start latch, waits for the consumers to finish, and then checks how many
 * items were seen.
 */
public class TestBlockingHandoff
extends BaseTest {
    /**
     * Pool that runs the producer, consumer and end-of-stream tasks.
     *
     * <p>A {@link SynchronousQueue} is used so that a submitted task is handed
     * straight to a thread (creating one if needed, up to the maximum) rather
     * than being queued: the tasks block on latches and on each other, so they
     * must really run concurrently. Threads are daemons so a stuck task cannot
     * keep the JVM alive.
     */
    private static final ExecutorService executorService = new ThreadPoolExecutor(2, 100, 2L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(), new ThreadFactory(){

        @Override
        public Thread newThread(Runnable runnable) {
            Thread thread = new Thread(runnable);
            // Prefix the JVM-assigned name so these threads are easy to spot in dumps.
            thread.setName("TestBlockingHandoff-" + thread.getName());
            thread.setDaemon(true);
            return thread;
        }
    });

    /**
     * Stops the shared pool once all tests in this class have run.
     */
    @AfterClass
    public static void afterClass() {
        executorService.shutdownNow();
    }

    /**
     * Waits for {@code latch} to reach zero, converting an interruption into
     * an unchecked exception.
     *
     * @param latch the latch to wait on
     * @throws RuntimeException wrapping the {@link InterruptedException} if the
     *         calling thread is interrupted while waiting; the thread's
     *         interrupt flag is set again before throwing
     */
    private static void uncheckedAwait(CountDownLatch latch) {
        try {
            latch.await();
        }
        catch (InterruptedException ie) {
            // await() cleared the flag; restore it so callers further up can still see it.
            Thread.currentThread().interrupt();
            throw new RuntimeException(ie);
        }
    }

    /**
     * One producer, one consumer: the consumer must see strictly increasing
     * values ending at {@code maxValue}.
     */
    @Test
    public void testProducerConsumer() {
        final int maxValue = 1000;
        TestHandoff<Integer> handoff = new TestHandoff<Integer>(Long.MAX_VALUE);
        CountDownLatch producerStartLatch = new CountDownLatch(1);
        CountDownLatch producerDoneLatch = new CountDownLatch(1);
        IntegerProducer producer = new IntegerProducer(handoff, producerStartLatch, maxValue, producerDoneLatch);
        CountDownLatch consumerDoneLatch = new CountDownLatch(1);
        IntegerConsumer consumer = new IntegerConsumer(handoff, consumerDoneLatch);

        executorService.execute(producer);
        executorService.execute(new LatchedEos<Integer>(producerDoneLatch, handoff));
        executorService.execute(consumer);
        producerStartLatch.countDown();
        TestBlockingHandoff.uncheckedAwait(consumerDoneLatch);

        // The producer stops on the increment that first exceeds maxValue,
        // so its counter ends one past the last value it actually put.
        Assert.assertEquals(maxValue + 1, producer.value.get());
        Assert.assertEquals(maxValue, consumer.lastValue);
    }

    /**
     * Several producers drawing from one shared counter, one consumer: the
     * consumer must see exactly {@code maxValue} items.
     */
    @Test
    public void testManyProducers() {
        final int maxValue = 1000;
        final int numProducers = 10;
        TestHandoff<Integer> handoff = new TestHandoff<Integer>(Long.MAX_VALUE);
        CountDownLatch producerStartLatch = new CountDownLatch(1);
        CountDownLatch producerDoneLatch = new CountDownLatch(numProducers);
        AtomicInteger producerInt = new AtomicInteger(0);
        CountDownLatch consumerDoneLatch = new CountDownLatch(1);
        Consumer<Integer> consumer = new Consumer<Integer>(handoff, consumerDoneLatch);

        for (int i = 0; i < numProducers; ++i) {
            executorService.execute(new IntegerProducer(handoff, producerStartLatch, producerDoneLatch, maxValue, producerInt));
        }
        executorService.execute(new LatchedEos<Integer>(producerDoneLatch, handoff));
        executorService.execute(consumer);
        producerStartLatch.countDown();
        TestBlockingHandoff.uncheckedAwait(consumerDoneLatch);

        Assert.assertEquals(maxValue, consumer.numConsumed.get());
    }

    /**
     * One producer, several consumers sharing one counter: together the
     * consumers must see exactly {@code maxValue} items.
     */
    @Test
    public void testManyConsumers() {
        final int maxValue = 1000;
        final int numConsumers = 10;
        TestHandoff<Integer> handoff = new TestHandoff<Integer>(Long.MAX_VALUE);
        CountDownLatch producerStartLatch = new CountDownLatch(1);
        CountDownLatch producerDoneLatch = new CountDownLatch(1);
        IntegerProducer producer = new IntegerProducer(handoff, producerStartLatch, maxValue, producerDoneLatch);
        CountDownLatch consumerDoneLatch = new CountDownLatch(numConsumers);
        AtomicInteger numConsumed = new AtomicInteger(0);

        executorService.execute(producer);
        executorService.execute(new LatchedEos<Integer>(producerDoneLatch, handoff));
        for (int i = 0; i < numConsumers; ++i) {
            executorService.execute(new Consumer<Integer>(numConsumed, handoff, consumerDoneLatch));
        }
        producerStartLatch.countDown();
        TestBlockingHandoff.uncheckedAwait(consumerDoneLatch);

        Assert.assertEquals(maxValue, numConsumed.get());
    }

    /**
     * Several producers and several consumers: together the consumers must
     * see exactly {@code maxValue} items.
     */
    @Test
    public void testManyMany() {
        final int maxValue = 1000;
        final int numProducers = 10;
        final int numConsumers = 10;
        TestHandoff<Integer> handoff = new TestHandoff<Integer>(Long.MAX_VALUE);
        CountDownLatch producerStartLatch = new CountDownLatch(1);
        CountDownLatch producerDoneLatch = new CountDownLatch(numProducers);
        AtomicInteger producerInt = new AtomicInteger(0);
        CountDownLatch consumerDoneLatch = new CountDownLatch(numConsumers);
        AtomicInteger numConsumed = new AtomicInteger(0);

        for (int i = 0; i < numProducers; ++i) {
            executorService.execute(new IntegerProducer(handoff, producerStartLatch, producerDoneLatch, maxValue, producerInt));
        }
        executorService.execute(new LatchedEos<Integer>(producerDoneLatch, handoff));
        for (int i = 0; i < numConsumers; ++i) {
            executorService.execute(new Consumer<Integer>(numConsumed, handoff, consumerDoneLatch));
        }
        producerStartLatch.countDown();
        TestBlockingHandoff.uncheckedAwait(consumerDoneLatch);

        Assert.assertEquals(maxValue, numConsumed.get());
    }

    /**
     * Consumer that additionally checks each value is strictly greater than
     * the previous one and remembers the last value it accepted.
     */
    private static class IntegerConsumer
    extends Consumer<Integer> {
        /** Last value accepted by {@link #verifyValue(Integer)}; starts at 0. */
        int lastValue;

        public IntegerConsumer(BlockingHandoff<Integer> handoff, CountDownLatch doneLatch) {
            super(handoff, doneLatch);
        }

        /**
         * Fails the check (via {@link Assert#fail(String)}) unless
         * {@code value} is strictly greater than {@link #lastValue}.
         *
         * @param value the value just taken from the handoff
         * @return {@code true} when the value was accepted
         */
        @Override
        protected boolean verifyValue(Integer value) {
            if (value <= this.lastValue) {
                Assert.fail(String.format("values not increasing! (value = %d, lastValue = %d)", value, this.lastValue));
                // Not reached at runtime (fail() throws); required because fail() is declared void.
                return false;
            }
            this.lastValue = value;
            return true;
        }
    }

    /**
     * Producer that puts successive integers taken from an
     * {@link AtomicInteger} until the counter exceeds {@code maxValue}.
     * Several producers may share one counter.
     */
    private static class IntegerProducer
    extends Producer<Integer> {
        private final int maxValue;
        /** Counter the next value is drawn from; possibly shared between producers. */
        final AtomicInteger value;

        /**
         * Creates a producer with its own counter starting at 0.
         * Note the parameter order differs from the five-argument constructor.
         */
        public IntegerProducer(BlockingHandoff<Integer> handoff, CountDownLatch startLatch, int maxValue, CountDownLatch completionLatch) {
            this(handoff, startLatch, completionLatch, maxValue, new AtomicInteger(0));
        }

        /**
         * Creates a producer drawing values from the supplied counter.
         *
         * @param handoff         destination for produced values
         * @param startLatch      producer waits on this before producing
         * @param completionLatch counted down once when the producer stops
         * @param maxValue        largest value that will be put
         * @param atomicInt       counter to increment for each value
         */
        public IntegerProducer(BlockingHandoff<Integer> handoff, CountDownLatch startLatch, CountDownLatch completionLatch, int maxValue, AtomicInteger atomicInt) {
            super(handoff, startLatch, completionLatch);
            this.maxValue = maxValue;
            this.value = atomicInt;
        }

        /**
         * Increments the counter and puts the new value unless it exceeds
         * {@code maxValue}.
         *
         * @return {@code false} once the counter has passed {@code maxValue}
         */
        @Override
        protected boolean putNext(BlockingHandoff<Integer> handoff) {
            int theValue = this.value.incrementAndGet();
            if (theValue > this.maxValue) {
                return false;
            }
            handoff.put(theValue);
            return true;
        }
    }

    /**
     * Task that drains a handoff through its iterator, counting every item
     * and passing each to {@link #verifyValue(Object)}.
     */
    private static class Consumer<T>
    implements Runnable {
        private final BlockingHandoff<T> handoff;
        /** Number of items taken so far; may be shared between consumers. */
        final AtomicInteger numConsumed;
        private final CountDownLatch doneLatch;

        /** Creates a consumer with its own item counter starting at 0. */
        public Consumer(BlockingHandoff<T> handoff, CountDownLatch doneLatch) {
            this(new AtomicInteger(0), handoff, doneLatch);
        }

        /**
         * @param numConsumed counter incremented for each item taken
         * @param handoff     source of items
         * @param doneLatch   counted down once when this consumer stops
         */
        public Consumer(AtomicInteger numConsumed, BlockingHandoff<T> handoff, CountDownLatch doneLatch) {
            this.numConsumed = numConsumed;
            this.handoff = handoff;
            this.doneLatch = doneLatch;
        }

        /**
         * Hook for subclasses to check a consumed value; this implementation
         * accepts everything. The return value is not used by {@link #run()}.
         */
        protected boolean verifyValue(T value) {
            return true;
        }

        /**
         * Consumes until the iterator reports no more items. The done latch is
         * always released, even if obtaining the iterator or iterating throws;
         * such a failure is left to propagate to the executing thread instead
         * of being discarded.
         */
        @Override
        public void run() {
            try {
                // NOTE(review): assumes BlockingHandoff<T>.iterator() yields Iterator<T> - confirm.
                Iterator<T> iter = this.handoff.iterator();
                while (iter.hasNext()) {
                    T value = iter.next();
                    this.numConsumed.incrementAndGet();
                    this.verifyValue(value);
                }
            }
            finally {
                this.doneLatch.countDown();
            }
        }
    }

    /**
     * Task that waits for a start signal and then repeatedly calls
     * {@link #putNext(BlockingHandoff)} until it returns {@code false}.
     */
    private static abstract class Producer<T>
    implements Runnable {
        private final BlockingHandoff<T> handoff;
        private final CountDownLatch startLatch;
        private final CountDownLatch completionLatch;

        /**
         * @param handoff         destination passed to {@link #putNext(BlockingHandoff)}
         * @param startLatch      awaited before the first {@code putNext} call
         * @param completionLatch counted down once when this producer stops
         */
        public Producer(BlockingHandoff<T> handoff, CountDownLatch startLatch, CountDownLatch completionLatch) {
            this.handoff = handoff;
            this.startLatch = startLatch;
            this.completionLatch = completionLatch;
        }

        /**
         * Puts the next item, if any, into {@code var1}.
         *
         * @return {@code true} to be called again, {@code false} when finished
         */
        protected abstract boolean putNext(BlockingHandoff<T> var1);

        /**
         * Runs the produce loop. The completion latch is released in a
         * {@code finally} so that tasks waiting on it (see {@link LatchedEos})
         * are not left blocked forever if producing throws.
         */
        @Override
        public void run() {
            try {
                TestBlockingHandoff.uncheckedAwait(this.startLatch);
                while (this.putNext(this.handoff)) {
                    // putNext() does all the work; loop until it reports completion.
                }
            }
            finally {
                this.completionLatch.countDown();
            }
        }
    }

    /**
     * {@link BlockingHandoff} whose timeout callbacks throw, so that a timeout
     * surfaces as an exception rather than passing silently.
     *
     * <p>NOTE(review): the three methods below look like overrides of hooks
     * declared by BlockingHandoff; that class is not visible here - confirm,
     * and add {@code @Override} if so.
     */
    private static class TestHandoff<T>
    extends BlockingHandoff<T> {
        public TestHandoff(long timeoutMs) {
            super(timeoutMs);
        }

        protected void itemTimeout(long timeoutMs) {
            throw new RuntimeException("itemTimeout(" + timeoutMs + ")");
        }

        /** Signals readiness immediately; there is nothing to set up. */
        protected void setup(CountDownLatch readyLatch) {
            readyLatch.countDown();
        }

        protected void iteratorTimeout() {
            throw new RuntimeException("iteratorTimeout()");
        }
    }

    /**
     * Task that waits for a latch (counted down by the producers as they
     * finish) and then calls {@code putEos()} on the handoff.
     */
    private static class LatchedEos<T>
    implements Runnable {
        private final CountDownLatch latch;
        private final BlockingHandoff<T> handoff;

        /**
         * @param latch   awaited before {@code putEos()} is called
         * @param handoff handoff to signal
         */
        public LatchedEos(CountDownLatch latch, BlockingHandoff<T> handoff) {
            this.latch = latch;
            this.handoff = handoff;
        }

        @Override
        public void run() {
            // Same await-and-wrap behaviour as the producers use.
            TestBlockingHandoff.uncheckedAwait(this.latch);
            this.handoff.putEos();
        }
    }
}

