/*
 * Decompiled with CFR 0.152.
 */
package org.apache.drill.exec.testing;

import java.util.concurrent.CountDownLatch;
import org.apache.drill.BaseTestQuery;
import org.apache.drill.common.concurrent.ExtendedLatch;
import org.apache.drill.common.config.DrillConfig;
import org.apache.drill.exec.ZookeeperHelper;
import org.apache.drill.exec.exception.DrillbitStartupException;
import org.apache.drill.exec.ops.QueryContext;
import org.apache.drill.exec.proto.CoordinationProtos;
import org.apache.drill.exec.proto.UserBitShared;
import org.apache.drill.exec.proto.UserProtos;
import org.apache.drill.exec.rpc.user.UserSession;
import org.apache.drill.exec.server.Drillbit;
import org.apache.drill.exec.server.DrillbitContext;
import org.apache.drill.exec.server.RemoteServiceSet;
import org.apache.drill.exec.server.options.OptionManager;
import org.apache.drill.exec.testing.Controls;
import org.apache.drill.exec.testing.ControlsInjectionUtil;
import org.apache.drill.exec.testing.ControlsInjector;
import org.apache.drill.exec.testing.ControlsInjectorFactory;
import org.apache.drill.exec.util.Pointer;
import org.junit.Assert;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Tests for pause injections configured through {@link Controls}.
 *
 * <p>Each test configures a pause at the {@link DummyClass#PAUSES} site, lets a
 * {@link ResumingThread} resume it after {@link #EXPECTED_DURATION_MILLIS}, and checks that
 * the thread which hit the injection site was held for at least that long.
 */
public class TestPauseInjection
extends BaseTestQuery {
    /** Time, in milliseconds, that {@link ResumingThread} waits before calling {@code unpauseAll()}. */
    private static final long EXPECTED_DURATION_MILLIS = 1000L;

    // Built from bits[0] during class initialization, so `bits` must already be populated by
    // then. NOTE(review): `bits` is not declared in this file; presumably inherited from
    // BaseTestQuery - confirm.
    private static final UserSession session = createSession(bits[0].getContext());

    /**
     * Verifies that a pause injected for {@link DummyClass} holds the calling thread until it
     * is resumed by another thread.
     */
    @Test
    public void pauseInjected() {
        final String controls = Controls.newBuilder().addPause(DummyClass.class, DummyClass.PAUSES).build();
        ControlsInjectionUtil.setControls(session, controls);
        assertPausesForExpectedDuration(session, bits[0].getContext());
    }

    /**
     * Verifies that a pause registered for one drillbit's endpoint pauses a query context
     * created on that drillbit, and then runs the same injection site against a second
     * drillbit without any resuming thread.
     *
     * <p>The ZooKeeper test cluster and both drillbits are shut down in {@code finally} blocks
     * so that a failing assertion or a failed second startup does not leak them.
     */
    @Test
    public void pauseOnSpecificBit() {
        final RemoteServiceSet remoteServiceSet = RemoteServiceSet.getLocalServiceSet();
        final ZookeeperHelper zkHelper = new ZookeeperHelper();
        zkHelper.startZookeeper(1);
        try {
            final DrillConfig drillConfig = zkHelper.getConfig();
            final Drillbit drillbit1 = startDrillbit(drillConfig, remoteServiceSet);
            try {
                final Drillbit drillbit2 = startDrillbit(drillConfig, remoteServiceSet);
                try {
                    verifyPauseOnlyOnFirstBit(drillbit1.getContext(), drillbit2.getContext());
                } finally {
                    closeDrillbit(drillbit2);
                }
            } finally {
                closeDrillbit(drillbit1);
            }
        } finally {
            // Stopped last, after both drillbits have been closed.
            zkHelper.stopZookeeper();
        }
    }

    /**
     * Builds a user session for user "foo" with default user properties, backed by the option
     * manager of the given drillbit context.
     *
     * @param drillbitContext context whose option manager the session uses
     * @return the new session
     */
    private static UserSession createSession(DrillbitContext drillbitContext) {
        return UserSession.Builder.newBuilder()
                .withCredentials(UserBitShared.UserCredentials.newBuilder().setUserName("foo").build())
                .withUserProperties(UserProtos.UserProperties.getDefaultInstance())
                .withOptionManager(drillbitContext.getOptionManager())
                .build();
    }

    /**
     * Starts a drillbit, converting a startup failure into an unchecked exception that keeps
     * the original cause.
     *
     * @param drillConfig configuration to start the drillbit with
     * @param remoteServiceSet service set shared by the drillbits of the test
     * @return the started drillbit
     * @throws RuntimeException if the drillbit fails to start
     */
    private static Drillbit startDrillbit(DrillConfig drillConfig, RemoteServiceSet remoteServiceSet) {
        try {
            return Drillbit.start(drillConfig, remoteServiceSet);
        }
        catch (DrillbitStartupException e) {
            throw new RuntimeException("Failed to start two drillbits.", e);
        }
    }

    /**
     * Closes a drillbit started by this test.
     *
     * @param drillbit the drillbit to close
     * @throws RuntimeException if closing fails; the original exception is kept as the cause
     */
    private static void closeDrillbit(Drillbit drillbit) {
        try {
            drillbit.close();
        }
        catch (Exception e) {
            throw new RuntimeException("Failed to close drillbit.", e);
        }
    }

    /**
     * Closes a query context, failing the test if closing throws.
     *
     * @param queryContext the context to close
     */
    private static void closeQueryContext(QueryContext queryContext) {
        try {
            queryContext.close();
        }
        catch (Exception e) {
            Assert.fail("Failed to close query context: " + e);
        }
    }

    /**
     * Registers a pause for the first drillbit's endpoint and exercises the injection site on
     * both drillbits.
     *
     * @param pausedBitContext context of the drillbit the pause is registered for
     * @param otherBitContext context of the drillbit that has no pause registered
     */
    private static void verifyPauseOnlyOnFirstBit(DrillbitContext pausedBitContext, DrillbitContext otherBitContext) {
        final UserSession bitSession = createSession(pausedBitContext);
        final CoordinationProtos.DrillbitEndpoint pausedEndpoint = pausedBitContext.getEndpoint();
        final String controls = Controls.newBuilder()
                .addPauseOnBit(DummyClass.class, DummyClass.PAUSES, pausedEndpoint)
                .build();
        ControlsInjectionUtil.setControls(bitSession, controls);

        assertPausesForExpectedDuration(bitSession, pausedBitContext);

        // No ResumingThread is started for the second drillbit: if the injection site paused
        // here, nothing would resume it and the test would hang instead of passing.
        final QueryContext otherQueryContext = new QueryContext(bitSession, otherBitContext);
        try {
            new DummyClass(otherQueryContext, new ExtendedLatch(1)).pauses();
        } finally {
            closeQueryContext(otherQueryContext);
        }
    }

    /**
     * Creates a query context, hits the injection site while a {@link ResumingThread} is
     * scheduled to resume it, and asserts that the pause lasted at least
     * {@link #EXPECTED_DURATION_MILLIS} and that the resuming thread recorded no exception.
     *
     * <p>The controls must already have been set on {@code userSession}. The query context is
     * closed even when an assertion fails.
     *
     * @param userSession session the query context is created for
     * @param drillbitContext drillbit context the query context is created on
     */
    private static void assertPausesForExpectedDuration(UserSession userSession, DrillbitContext drillbitContext) {
        final ExtendedLatch trigger = new ExtendedLatch(1);
        final Pointer<Exception> ex = new Pointer<>();
        final QueryContext queryContext = new QueryContext(userSession, drillbitContext);
        try {
            new ResumingThread(queryContext, trigger, ex, EXPECTED_DURATION_MILLIS).start();
            final long actualDuration = new DummyClass(queryContext, trigger).pauses();
            Assert.assertTrue(
                    String.format("Test should stop for at least %d milliseconds.", EXPECTED_DURATION_MILLIS),
                    EXPECTED_DURATION_MILLIS <= actualDuration);
            Assert.assertNull("No exception should be thrown.", ex.value);
        } finally {
            closeQueryContext(queryContext);
        }
    }

    /**
     * Thread that waits for a latch, sleeps for a fixed time and then calls
     * {@code unpauseAll()} on the execution controls of a query context.
     */
    private static class ResumingThread
    extends Thread {
        private final QueryContext context;
        private final ExtendedLatch latch;
        private final Pointer<Exception> ex;
        private final long millis;

        /**
         * @param context query context whose execution controls are unpaused
         * @param latch latch this thread waits on before it starts sleeping
         * @param ex holder that receives the exception if the sleep is interrupted
         * @param millis time to sleep, in milliseconds, before unpausing
         */
        public ResumingThread(QueryContext context, ExtendedLatch latch, Pointer<Exception> ex, long millis) {
            this.context = context;
            this.latch = latch;
            this.ex = ex;
            this.millis = millis;
        }

        @Override
        public void run() {
            this.latch.awaitUninterruptibly();
            try {
                Thread.sleep(this.millis);
            }
            catch (InterruptedException e) {
                // Recorded for the test thread to assert on; unpausing still happens below so
                // that the paused thread is not left blocked.
                this.ex.value = e;
            }
            this.context.getExecutionControls().unpauseAll();
        }
    }

    /**
     * Stand-in for production code that contains a pause injection site.
     */
    private static class DummyClass {
        /** Descriptor of the injection site used by the tests. */
        public static final String PAUSES = "<<pauses>>";

        private static final Logger logger = LoggerFactory.getLogger(DummyClass.class);
        private static final ControlsInjector injector = ControlsInjectorFactory.getInjector(DummyClass.class);
        private final QueryContext context;
        private final CountDownLatch latch;

        /**
         * @param context query context whose execution controls are consulted at the site
         * @param latch latch counted down just before the injection site is reached
         */
        public DummyClass(QueryContext context, CountDownLatch latch) {
            this.context = context;
            this.latch = latch;
        }

        /**
         * Counts down the latch and hits the {@link #PAUSES} injection site.
         *
         * @return elapsed wall-clock time in milliseconds, measured from just before the latch
         *         is counted down until {@code injectPause} returns
         */
        public long pauses() {
            // Take the timestamp BEFORE releasing the latch. A thread waiting on the latch may
            // start its sleep the moment it is released; timing from afterwards could report
            // less than the sleep time if this thread is descheduled in between.
            final long startTime = System.currentTimeMillis();
            this.latch.countDown();
            injector.injectPause(this.context.getExecutionControls(), PAUSES, logger);
            final long endTime = System.currentTimeMillis();
            return endTime - startTime;
        }
    }
}

