package com.mapr.ojai.store.impl;

import com.mapr.tests.BaseTest;
import java.util.Iterator;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Test;

/* loaded from: input_file:com/mapr/ojai/store/impl/TestBlockingHandoff.class */
public class TestBlockingHandoff extends BaseTest {
    private static final ExecutorService executorService = new ThreadPoolExecutor(2, 100, 2, TimeUnit.SECONDS, new SynchronousQueue(), new ThreadFactory() { // from class: com.mapr.ojai.store.impl.TestBlockingHandoff.1
        @Override // java.util.concurrent.ThreadFactory
        public Thread newThread(Runnable runnable) {
            Thread thread = new Thread(runnable);
            thread.setName("TestBlockingHandoff-" + thread.getName());
            thread.setDaemon(true);
            return thread;
        }
    });

    /* loaded from: input_file:com/mapr/ojai/store/impl/TestBlockingHandoff$Consumer.class */
    private static class Consumer<T> implements Runnable {
        private final BlockingHandoff<T> handoff;
        final AtomicInteger numConsumed;
        private final CountDownLatch doneLatch;

        public Consumer(BlockingHandoff<T> blockingHandoff, CountDownLatch countDownLatch) {
            this(new AtomicInteger(0), blockingHandoff, countDownLatch);
        }

        public Consumer(AtomicInteger atomicInteger, BlockingHandoff<T> blockingHandoff, CountDownLatch countDownLatch) {
            this.numConsumed = atomicInteger;
            this.handoff = blockingHandoff;
            this.doneLatch = countDownLatch;
        }

        protected boolean verifyValue(T t) {
            return true;
        }

        /* JADX WARN: Multi-variable type inference failed */
        @Override // java.lang.Runnable
        public void run() {
            Iterator it = this.handoff.iterator();
            while (it.hasNext()) {
                try {
                    Object next = it.next();
                    this.numConsumed.incrementAndGet();
                    verifyValue(next);
                } catch (Exception e) {
                    return;
                } finally {
                    this.doneLatch.countDown();
                }
            }
        }
    }

    /* loaded from: input_file:com/mapr/ojai/store/impl/TestBlockingHandoff$IntegerConsumer.class */
    private static class IntegerConsumer extends Consumer<Integer> {
        int lastValue;

        public IntegerConsumer(BlockingHandoff<Integer> blockingHandoff, CountDownLatch countDownLatch) {
            super(blockingHandoff, countDownLatch);
        }

        /* JADX INFO: Access modifiers changed from: protected */
        @Override // com.mapr.ojai.store.impl.TestBlockingHandoff.Consumer
        public boolean verifyValue(Integer num) {
            if (num.intValue() <= this.lastValue) {
                Assert.fail(String.format("values not increasing! (value = %d, lastValue = %d)", num, Integer.valueOf(this.lastValue)));
                return false;
            }
            this.lastValue = num.intValue();
            return true;
        }
    }

    /* loaded from: input_file:com/mapr/ojai/store/impl/TestBlockingHandoff$IntegerProducer.class */
    private static class IntegerProducer extends Producer<Integer> {
        private final int maxValue;
        final AtomicInteger value;

        public IntegerProducer(BlockingHandoff<Integer> blockingHandoff, CountDownLatch countDownLatch, int i, CountDownLatch countDownLatch2) {
            this(blockingHandoff, countDownLatch, countDownLatch2, i, new AtomicInteger(0));
        }

        public IntegerProducer(BlockingHandoff<Integer> blockingHandoff, CountDownLatch countDownLatch, CountDownLatch countDownLatch2, int i, AtomicInteger atomicInteger) {
            super(blockingHandoff, countDownLatch, countDownLatch2);
            this.maxValue = i;
            this.value = atomicInteger;
        }

        @Override // com.mapr.ojai.store.impl.TestBlockingHandoff.Producer
        protected boolean putNext(BlockingHandoff<Integer> blockingHandoff) {
            int incrementAndGet = this.value.incrementAndGet();
            if (incrementAndGet > this.maxValue) {
                return false;
            }
            blockingHandoff.put(Integer.valueOf(incrementAndGet));
            return true;
        }
    }

    /* loaded from: input_file:com/mapr/ojai/store/impl/TestBlockingHandoff$LatchedEos.class */
    private static class LatchedEos<T> implements Runnable {
        private final CountDownLatch latch;
        private final BlockingHandoff<T> handoff;

        public LatchedEos(CountDownLatch countDownLatch, BlockingHandoff<T> blockingHandoff) {
            this.latch = countDownLatch;
            this.handoff = blockingHandoff;
        }

        @Override // java.lang.Runnable
        public void run() {
            try {
                this.latch.await();
                this.handoff.putEos();
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
        }
    }

    /* loaded from: input_file:com/mapr/ojai/store/impl/TestBlockingHandoff$Producer.class */
    private static abstract class Producer<T> implements Runnable {
        private final BlockingHandoff<T> handoff;
        private final CountDownLatch startLatch;
        private final CountDownLatch completionLatch;

        public Producer(BlockingHandoff<T> blockingHandoff, CountDownLatch countDownLatch, CountDownLatch countDownLatch2) {
            this.handoff = blockingHandoff;
            this.startLatch = countDownLatch;
            this.completionLatch = countDownLatch2;
        }

        protected abstract boolean putNext(BlockingHandoff<T> blockingHandoff);

        @Override // java.lang.Runnable
        public void run() {
            TestBlockingHandoff.uncheckedAwait(this.startLatch);
            do {
            } while (putNext(this.handoff));
            this.completionLatch.countDown();
        }
    }

    /* loaded from: input_file:com/mapr/ojai/store/impl/TestBlockingHandoff$TestHandoff.class */
    private static class TestHandoff<T> extends BlockingHandoff<T> {
        public TestHandoff(long j) {
            super(j);
        }

        protected void itemTimeout(long j) {
            throw new RuntimeException("itemTimeout(" + j + ")");
        }

        protected void setup(CountDownLatch countDownLatch) {
            countDownLatch.countDown();
        }

        protected void iteratorTimeout() {
            throw new RuntimeException("iteratorTimeout()");
        }
    }

    @AfterClass
    public static void afterClass() {
        executorService.shutdownNow();
    }

    /* JADX INFO: Access modifiers changed from: private */
    public static void uncheckedAwait(CountDownLatch countDownLatch) {
        try {
            countDownLatch.await();
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
    }

    @Test
    public void testProducerConsumer() {
        TestHandoff testHandoff = new TestHandoff(Long.MAX_VALUE);
        CountDownLatch countDownLatch = new CountDownLatch(1);
        CountDownLatch countDownLatch2 = new CountDownLatch(1);
        IntegerProducer integerProducer = new IntegerProducer(testHandoff, countDownLatch, 1000, countDownLatch2);
        CountDownLatch countDownLatch3 = new CountDownLatch(1);
        IntegerConsumer integerConsumer = new IntegerConsumer(testHandoff, countDownLatch3);
        executorService.execute(integerProducer);
        executorService.execute(new LatchedEos(countDownLatch2, testHandoff));
        executorService.execute(integerConsumer);
        countDownLatch.countDown();
        uncheckedAwait(countDownLatch3);
        Assert.assertEquals(1001L, integerProducer.value.get());
        Assert.assertEquals(1000L, integerConsumer.lastValue);
    }

    @Test
    public void testManyProducers() {
        TestHandoff testHandoff = new TestHandoff(Long.MAX_VALUE);
        CountDownLatch countDownLatch = new CountDownLatch(1);
        CountDownLatch countDownLatch2 = new CountDownLatch(10);
        AtomicInteger atomicInteger = new AtomicInteger(0);
        CountDownLatch countDownLatch3 = new CountDownLatch(1);
        Consumer consumer = new Consumer(testHandoff, countDownLatch3);
        for (int i = 0; i < 10; i++) {
            executorService.execute(new IntegerProducer(testHandoff, countDownLatch, countDownLatch2, 1000, atomicInteger));
        }
        executorService.execute(new LatchedEos(countDownLatch2, testHandoff));
        executorService.execute(consumer);
        countDownLatch.countDown();
        uncheckedAwait(countDownLatch3);
        Assert.assertEquals(1000L, consumer.numConsumed.get());
    }

    @Test
    public void testManyConsumers() {
        TestHandoff testHandoff = new TestHandoff(Long.MAX_VALUE);
        CountDownLatch countDownLatch = new CountDownLatch(1);
        CountDownLatch countDownLatch2 = new CountDownLatch(1);
        IntegerProducer integerProducer = new IntegerProducer(testHandoff, countDownLatch, 1000, countDownLatch2);
        CountDownLatch countDownLatch3 = new CountDownLatch(10);
        AtomicInteger atomicInteger = new AtomicInteger(0);
        executorService.execute(integerProducer);
        executorService.execute(new LatchedEos(countDownLatch2, testHandoff));
        for (int i = 0; i < 10; i++) {
            executorService.execute(new Consumer(atomicInteger, testHandoff, countDownLatch3));
        }
        countDownLatch.countDown();
        uncheckedAwait(countDownLatch3);
        Assert.assertEquals(1000L, atomicInteger.get());
    }

    @Test
    public void testManyMany() {
        TestHandoff testHandoff = new TestHandoff(Long.MAX_VALUE);
        CountDownLatch countDownLatch = new CountDownLatch(1);
        CountDownLatch countDownLatch2 = new CountDownLatch(10);
        AtomicInteger atomicInteger = new AtomicInteger(0);
        CountDownLatch countDownLatch3 = new CountDownLatch(10);
        AtomicInteger atomicInteger2 = new AtomicInteger(0);
        for (int i = 0; i < 10; i++) {
            executorService.execute(new IntegerProducer(testHandoff, countDownLatch, countDownLatch2, 1000, atomicInteger));
        }
        executorService.execute(new LatchedEos(countDownLatch2, testHandoff));
        for (int i2 = 0; i2 < 10; i2++) {
            executorService.execute(new Consumer(atomicInteger2, testHandoff, countDownLatch3));
        }
        countDownLatch.countDown();
        uncheckedAwait(countDownLatch3);
        Assert.assertEquals(1000L, atomicInteger2.get());
    }
}
