/*
 * Decompiled with CFR 0.152.
 */
package org.apache.drill.exec.server;

import com.google.common.base.Preconditions;
import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.commons.math3.util.Pair;
import org.apache.drill.BaseTestQuery;
import org.apache.drill.QueryTestUtil;
import org.apache.drill.SingleRowListener;
import org.apache.drill.common.AutoCloseables;
import org.apache.drill.common.concurrent.ExtendedLatch;
import org.apache.drill.common.config.DrillConfig;
import org.apache.drill.common.exceptions.UserException;
import org.apache.drill.common.types.TypeProtos;
import org.apache.drill.common.util.RepeatTestRule;
import org.apache.drill.exec.ZookeeperHelper;
import org.apache.drill.exec.client.DrillClient;
import org.apache.drill.exec.exception.DrillbitStartupException;
import org.apache.drill.exec.exception.SchemaChangeException;
import org.apache.drill.exec.memory.BufferAllocator;
import org.apache.drill.exec.memory.RootAllocatorFactory;
import org.apache.drill.exec.physical.impl.ScreenCreator;
import org.apache.drill.exec.physical.impl.SingleSenderCreator;
import org.apache.drill.exec.physical.impl.mergereceiver.MergingRecordBatch;
import org.apache.drill.exec.physical.impl.partitionsender.PartitionerDecorator;
import org.apache.drill.exec.physical.impl.unorderedreceiver.UnorderedReceiverBatch;
import org.apache.drill.exec.physical.impl.xsort.ExternalSortBatch;
import org.apache.drill.exec.planner.physical.PlannerSettings;
import org.apache.drill.exec.planner.sql.DrillSqlWorker;
import org.apache.drill.exec.proto.CoordinationProtos;
import org.apache.drill.exec.proto.UserBitShared;
import org.apache.drill.exec.record.BatchSchema;
import org.apache.drill.exec.record.MaterializedField;
import org.apache.drill.exec.record.RecordBatchLoader;
import org.apache.drill.exec.record.VectorWrapper;
import org.apache.drill.exec.rpc.DrillRpcFuture;
import org.apache.drill.exec.rpc.RpcException;
import org.apache.drill.exec.rpc.user.ConnectionThrottle;
import org.apache.drill.exec.rpc.user.QueryDataBatch;
import org.apache.drill.exec.rpc.user.UserResultsListener;
import org.apache.drill.exec.server.Drillbit;
import org.apache.drill.exec.server.RemoteServiceSet;
import org.apache.drill.exec.store.pojo.PojoRecordReader;
import org.apache.drill.exec.testing.Controls;
import org.apache.drill.exec.testing.ControlsInjectionUtil;
import org.apache.drill.exec.util.Pointer;
import org.apache.drill.exec.work.foreman.Foreman;
import org.apache.drill.exec.work.foreman.ForemanException;
import org.apache.drill.exec.work.foreman.ForemanSetupException;
import org.apache.drill.exec.work.fragment.FragmentExecutor;
import org.apache.drill.test.DrillTest;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Ignore;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class TestDrillbitResilience
extends DrillTest {
    // Class-wide logger; used by stopDrillbit and shutdownAllDrillbits.
    private static final Logger logger = LoggerFactory.getLogger(TestDrillbitResilience.class);
    // ZooKeeper helper created in startSomeDrillbits; supplies the DrillConfig used for Drillbits and the client.
    private static ZookeeperHelper zkHelper;
    // Service set shared by every Drillbit and the client; created in startSomeDrillbits.
    private static RemoteServiceSet remoteServiceSet;
    // Registry of running Drillbits keyed by name; assigned in the static initializer.
    private static final Map<String, Drillbit> drillbits;
    // Single client shared by all tests; created in startSomeDrillbits, closed in shutdownAllDrillbits.
    private static DrillClient drillClient;
    // NOTE(review): not referenced in the visible code; presumably the source of the inlined
    // @Repeat(count=3) values -- confirm against the original sources.
    private static final int NUM_RUNS = 3;
    // Default query used by tests that do not supply their own.
    private static final String TEST_QUERY = "select * from sys.memory";
    // Names under which the three Drillbits are registered in the drillbits map.
    private static final String DRILLBIT_ALPHA = "alpha";
    private static final String DRILLBIT_BETA = "beta";
    private static final String DRILLBIT_GAMMA = "gamma";

    /**
     * Starts a new Drillbit using the ZooKeeper helper's configuration and registers it
     * in the {@code drillbits} map under the given name.
     *
     * @param name             registry key for the new Drillbit; must not be registered yet
     * @param remoteServiceSet service set passed to {@code Drillbit.start}
     * @throws IllegalStateException if a Drillbit is already registered under {@code name}
     * @throws RuntimeException      wrapping the {@link DrillbitStartupException} when startup fails
     */
    private static void startDrillbit(String name, RemoteServiceSet remoteServiceSet) {
        final boolean alreadyRegistered = drillbits.containsKey(name);
        if (alreadyRegistered) {
            throw new IllegalStateException("Drillbit named \"" + name + "\" already exists");
        }

        final Drillbit started;
        try {
            started = Drillbit.start(zkHelper.getConfig(), remoteServiceSet);
        } catch (DrillbitStartupException startupFailure) {
            throw new RuntimeException("Failed to start Drillbit \"" + name + "\"", startupFailure);
        }
        drillbits.put(name, started);
    }

    /**
     * Closes the Drillbit registered under the given name. The entry is left in the
     * {@code drillbits} map; a failure while closing is reported on stderr and logged
     * as a warning rather than rethrown.
     *
     * @param name registry key of the Drillbit to close
     * @throws IllegalStateException if no Drillbit is registered under {@code name}
     */
    private static void stopDrillbit(String name) {
        final Drillbit target = drillbits.get(name);
        if (target == null) {
            throw new IllegalStateException("No Drillbit named \"" + name + "\" found");
        }

        try {
            target.close();
        } catch (Exception closeFailure) {
            // Best effort: report the problem but keep going.
            final String description = "Error shutting down Drillbit \"" + name + "\"";
            System.err.println(description + '.');
            logger.warn(description, closeFailure);
        }
    }

    /**
     * Closes every registered Drillbit via {@code stopDrillbit} and then empties the registry.
     */
    private static void stopAllDrillbits() {
        for (final String registeredName : drillbits.keySet()) {
            stopDrillbit(registeredName);
        }
        drillbits.clear();
    }

    /**
     * Looks up the endpoint of the Drillbit registered under the given name.
     *
     * @param name registry key of the Drillbit
     * @return the endpoint reported by that Drillbit's context
     * @throws IllegalStateException if no Drillbit is registered under {@code name}
     */
    private static CoordinationProtos.DrillbitEndpoint getEndpoint(String name) {
        final Drillbit registered = drillbits.get(name);
        if (registered != null) {
            return registered.getContext().getEndpoint();
        }
        throw new IllegalStateException("No Drillbit named \"" + name + "\" found.");
    }

    /**
     * One-time fixture setup: disables the HTTP server via a system property, starts
     * ZooKeeper, starts the alpha, beta and gamma Drillbits on a local service set,
     * creates the shared client and clears any injected controls.
     *
     * @throws Exception if any part of the fixture fails to start
     */
    @BeforeClass
    public static void startSomeDrillbits() throws Exception {
        System.setProperty("drill.exec.http.enabled", "false");

        zkHelper = new ZookeeperHelper(true);
        zkHelper.startZookeeper(1);

        remoteServiceSet = RemoteServiceSet.getLocalServiceSet();
        for (final String name : new String[] {DRILLBIT_ALPHA, DRILLBIT_BETA, DRILLBIT_GAMMA}) {
            startDrillbit(name, remoteServiceSet);
        }

        drillClient = QueryTestUtil.createClient(zkHelper.getConfig(), remoteServiceSet, 1, null);
        clearAllInjections();
    }

    /**
     * One-time fixture teardown: closes the shared client, stops every Drillbit, closes
     * the service set and finally stops ZooKeeper.
     *
     * <p>ZooKeeper is stopped in a {@code finally} block so it is shut down even when an
     * earlier teardown step throws. The helper is null-checked because JUnit still runs
     * this method when {@code @BeforeClass} fails before {@code zkHelper} is assigned; an
     * unguarded call would then raise a {@link NullPointerException} that hides the
     * original setup failure.
     */
    @AfterClass
    public static void shutdownAllDrillbits() {
        try {
            if (drillClient != null) {
                drillClient.close();
                drillClient = null;
            }
            stopAllDrillbits();
            if (remoteServiceSet != null) {
                AutoCloseables.close(remoteServiceSet, logger);
                remoteServiceSet = null;
            }
        } finally {
            if (zkHelper != null) {
                zkHelper.stopZookeeper();
            }
        }
    }

    /**
     * Clears all injected controls through the shared client.
     *
     * @throws NullPointerException if the shared client has not been created yet
     */
    private static void clearAllInjections() {
        Preconditions.checkNotNull(drillClient);
        ControlsInjectionUtil.clearControls(drillClient);
    }

    /**
     * Runs {@code select count(*) from sys.memory} and asserts that the query completes
     * without errors and returns exactly one BIGINT value equal to the number of
     * registered Drillbits.
     *
     * @throws RuntimeException if running the query or waiting for it throws an exception
     */
    private static void assertDrillbitsOk() {
        final SingleRowListener rowListener = new SingleRowListener() {
            private final BufferAllocator bufferAllocator = RootAllocatorFactory.newRoot(zkHelper.getConfig());
            private final RecordBatchLoader loader = new RecordBatchLoader(bufferAllocator);

            @Override
            public void rowArrived(QueryDataBatch queryResultBatch) {
                final UserBitShared.QueryData header = queryResultBatch.getHeader();
                try {
                    loader.load(header.getDef(), queryResultBatch.getData());
                } catch (SchemaChangeException schemaChange) {
                    Assert.fail(schemaChange.toString());
                }

                // Exactly one row with exactly one BIGINT column is expected.
                final long recordCount = loader.getRecordCount();
                Assert.assertEquals(1L, recordCount);

                final BatchSchema schema = loader.getSchema();
                final long fieldCount = schema.getFieldCount();
                Assert.assertEquals(1L, fieldCount);

                final TypeProtos.MinorType columnType = schema.getColumn(0).getType().getMinorType();
                Assert.assertEquals(TypeProtos.MinorType.BIGINT, columnType);

                final VectorWrapper<?> column = (VectorWrapper<?>) loader.iterator().next();
                final Object cell = column.getValueVector().getAccessor().getObject(0);
                Assert.assertTrue(cell instanceof Long);

                // The count must match the number of Drillbits in the registry.
                final long expectedBits = drillbits.size();
                final long reportedBits = ((Long) cell).intValue();
                Assert.assertEquals(expectedBits, reportedBits);

                loader.clear();
            }

            @Override
            public void cleanup() {
                bufferAllocator.close();
            }
        };

        try {
            QueryTestUtil.testWithListener(drillClient, UserBitShared.QueryType.SQL, "select count(*) from sys.memory", rowListener);
            rowListener.waitForCompletion();
            final UserBitShared.QueryResult.QueryState finalState = rowListener.getQueryState();
            final boolean completed = finalState == UserBitShared.QueryResult.QueryState.COMPLETED;
            Assert.assertTrue(String.format("QueryState should be COMPLETED (and not %s).", finalState), completed);
        } catch (Exception queryFailure) {
            throw new RuntimeException("Couldn't query active drillbits", queryFailure);
        }

        final List<UserBitShared.DrillPBError> reportedErrors = rowListener.getErrorList();
        Assert.assertTrue("There should not be any errors when checking if Drillbits are OK.", reportedErrors.isEmpty());
    }

    /**
     * Runs after every test: clears injected controls and then verifies that the
     * Drillbits still answer a query correctly.
     */
    @After
    public void checkDrillbits() {
        clearAllInjections();
        assertDrillbitsOk();
    }

    /**
     * Applies the given controls string through the shared client.
     *
     * @param controls controls definition, as produced by {@code Controls.newBuilder()...build()}
     */
    private static void setControls(String controls) {
        final DrillClient client = drillClient;
        ControlsInjectionUtil.setControls(client, controls);
    }

    /**
     * Sets a session option through the shared client.
     *
     * @param option name of the option
     * @param value  value to assign, in string form
     */
    private static void setSessionOption(String option, String value) {
        final DrillClient client = drillClient;
        ControlsInjectionUtil.setSessionOption(client, option, value);
    }

    /**
     * Asserts that the throwable is a {@link UserException} whose protobuf error wraps an
     * exception of the expected class carrying the expected message.
     *
     * @param throwable      the throwable reported for the query
     * @param exceptionClass class whose name the wrapped exception class must equal
     * @param desc           message the wrapped exception must carry
     */
    private static void assertExceptionMessage(Throwable throwable, Class<? extends Throwable> exceptionClass, String desc) {
        final boolean isUserException = throwable instanceof UserException;
        Assert.assertTrue("Throwable was not of UserException type.", isUserException);

        final UserException userException = (UserException) throwable;
        final UserBitShared.ExceptionWrapper wrapped = userException.getOrCreatePBError(false).getException();

        final String expectedClassName = exceptionClass.getName();
        Assert.assertEquals("Exception class names should match.", expectedClassName, wrapped.getExceptionClass());
        Assert.assertEquals("Exception sites should match.", desc, wrapped.getMessage());
    }

    /**
     * Registers an exception injection for this test class at site "noop" on the beta
     * Drillbit, runs the default query and expects it to complete with no change in
     * allocated memory.
     */
    @Test
    public void settingNoOpInjectionsAndQuery() {
        final long before = countAllocatedMemory();

        final CoordinationProtos.DrillbitEndpoint betaEndpoint = getEndpoint(DRILLBIT_BETA);
        final String controls = Controls.newBuilder()
            .addExceptionOnBit(getClass(), "noop", RuntimeException.class, betaEndpoint)
            .build();
        setControls(controls);

        final WaitUntilCompleteListener listener = new WaitUntilCompleteListener();
        QueryTestUtil.testWithListener(drillClient, UserBitShared.QueryType.SQL, TEST_QUERY, listener);
        final Pair<UserBitShared.QueryResult.QueryState, Exception> outcome = listener.waitForCompletion();
        assertStateCompleted(outcome, UserBitShared.QueryResult.QueryState.COMPLETED);

        final long after = countAllocatedMemory();
        Assert.assertEquals(String.format("We are leaking %d bytes", after - before), before, after);
    }

    /**
     * Injects a {@link ForemanException} at the named {@link Foreman} site and asserts
     * that the default query fails with that exception.
     *
     * @param desc injection site name; also the expected exception message
     */
    private static void testForeman(String desc) {
        final Class<ForemanException> injected = ForemanException.class;
        final String controls = Controls.newBuilder()
            .addException(Foreman.class, desc, injected)
            .build();
        assertFailsWithException(controls, injected, desc);
    }

    /**
     * Injects a failure at the Foreman's "run-try-beginning" site and checks that
     * allocated memory is unchanged afterwards.
     */
    @Test
    @RepeatTestRule.Repeat(count=3)
    public void foreman_runTryBeginning() {
        final long baseline = countAllocatedMemory();
        testForeman("run-try-beginning");
        final long current = countAllocatedMemory();
        final String leakMessage = String.format("We are leaking %d bytes", current - baseline);
        Assert.assertEquals(leakMessage, baseline, current);
    }

    /**
     * Injects a failure at the Foreman's "run-try-end" site and checks that allocated
     * memory is unchanged afterwards. Currently disabled via {@code @Ignore}.
     */
    @Test
    @Ignore
    public void foreman_runTryEnd() {
        final long baseline = countAllocatedMemory();
        testForeman("run-try-end");
        final long current = countAllocatedMemory();
        final String leakMessage = String.format("We are leaking %d bytes", current - baseline);
        Assert.assertEquals(leakMessage, baseline, current);
    }

    /**
     * Fails the test unless the query ended in the expected state and no exception was
     * recorded.
     *
     * @param result        final state paired with the exception recorded by the listener, if any
     * @param expectedState state the query is required to have ended in
     */
    private static void assertStateCompleted(Pair<UserBitShared.QueryResult.QueryState, Exception> result, UserBitShared.QueryResult.QueryState expectedState) {
        final UserBitShared.QueryResult.QueryState observedState = result.getFirst();
        final Exception recorded = result.getSecond();

        final boolean asExpected = observedState == expectedState && recorded == null;
        if (asExpected) {
            return;
        }
        Assert.fail(String.format("Query state is incorrect (expected: %s, actual: %s) AND/OR \nException thrown: %s", expectedState, observedState, recorded == null ? "none." : recorded));
    }

    /**
     * Applies the controls, runs the query with the given listener and asserts that it
     * ends in the CANCELED state with no exception recorded.
     *
     * @param controls controls string to apply before the query is submitted
     * @param listener listener that receives the results (and is expected to trigger the cancel)
     * @param query    SQL text to run
     */
    private static void assertCancelledWithoutException(String controls, WaitUntilCompleteListener listener, String query) {
        setControls(controls);
        QueryTestUtil.testWithListener(drillClient, UserBitShared.QueryType.SQL, query, listener);
        final Pair<UserBitShared.QueryResult.QueryState, Exception> outcome = listener.waitForCompletion();
        assertStateCompleted(outcome, UserBitShared.QueryResult.QueryState.CANCELED);
    }

    /**
     * Same as the three-argument overload, using the default test query.
     *
     * @param controls controls string to apply before the query is submitted
     * @param listener listener that receives the results
     */
    private static void assertCancelledWithoutException(String controls, WaitUntilCompleteListener listener) {
        final String defaultQuery = TEST_QUERY;
        assertCancelledWithoutException(controls, listener, defaultQuery);
    }

    /**
     * Registers a pause at the PojoRecordReader "read-next" site and starts a
     * {@code ResumingThread} as soon as the query id arrives. The query is expected to
     * complete with no change in allocated memory.
     */
    @Test
    public void passThrough() {
        final long before = countAllocatedMemory();

        final WaitUntilCompleteListener listener = new WaitUntilCompleteListener() {
            @Override
            public void queryIdArrived(UserBitShared.QueryId queryId) {
                super.queryIdArrived(queryId);
                // The resuming thread waits on this latch, which is released right after it is started.
                final ExtendedLatch startSignal = new ExtendedLatch(1);
                final ResumingThread resumer = new ResumingThread(queryId, (Pointer<Exception>) this.ex, startSignal);
                resumer.start();
                startSignal.countDown();
            }
        };

        final String controls = Controls.newBuilder()
            .addPause(PojoRecordReader.class, "read-next")
            .build();
        setControls(controls);

        QueryTestUtil.testWithListener(drillClient, UserBitShared.QueryType.SQL, TEST_QUERY, listener);
        final Pair<UserBitShared.QueryResult.QueryState, Exception> outcome = listener.waitForCompletion();
        assertStateCompleted(outcome, UserBitShared.QueryResult.QueryState.COMPLETED);

        final long after = countAllocatedMemory();
        Assert.assertEquals(String.format("We are leaking %d bytes", after - before), before, after);
    }

    /**
     * Registers a pause at the FragmentExecutor "fragment-running" site and calls
     * {@code cancelAndResume()} as soon as the query id arrives; expects a clean
     * cancellation with no change in allocated memory. Currently disabled via
     * {@code @Ignore}.
     */
    @Test
    @Ignore
    public void cancelWhenQueryIdArrives() {
        final long before = countAllocatedMemory();

        final WaitUntilCompleteListener listener = new WaitUntilCompleteListener() {
            @Override
            public void queryIdArrived(UserBitShared.QueryId queryId) {
                super.queryIdArrived(queryId);
                cancelAndResume();
            }
        };

        final String controls = Controls.newBuilder()
            .addPause(FragmentExecutor.class, "fragment-running")
            .build();
        assertCancelledWithoutException(controls, listener);

        final long after = countAllocatedMemory();
        Assert.assertEquals(String.format("We are leaking %d bytes", after - before), before, after);
    }

    /**
     * Registers a pause at the ScreenCreator "sending-data" site and calls
     * {@code cancelAndResume()} when the first data batch arrives; expects a clean
     * cancellation with no change in allocated memory.
     */
    @Test
    @RepeatTestRule.Repeat(count=3)
    public void cancelInMiddleOfFetchingResults() {
        final long before = countAllocatedMemory();

        final WaitUntilCompleteListener listener = new WaitUntilCompleteListener() {
            private boolean cancelRequested = false;

            @Override
            public void dataArrived(QueryDataBatch result, ConnectionThrottle throttle) {
                // Only the first batch triggers the cancel; every batch is released.
                if (!cancelRequested) {
                    check(queryId != null, "Query id should not be null, since we have waited long enough.", new Object[0]);
                    cancelAndResume();
                    cancelRequested = true;
                }
                result.release();
            }
        };

        final String controls = Controls.newBuilder()
            .addPause(ScreenCreator.class, "sending-data", 1)
            .build();
        assertCancelledWithoutException(controls, listener);

        final long after = countAllocatedMemory();
        Assert.assertEquals(String.format("We are leaking %d bytes", after - before), before, after);
    }

    /**
     * Registers a pause at the ScreenCreator "send-complete" site and calls
     * {@code cancelAndResume()} once as many batches have arrived as there are
     * registered Drillbits; expects a clean cancellation with no change in allocated
     * memory.
     */
    @Test
    @RepeatTestRule.Repeat(count=3)
    public void cancelAfterAllResultsProduced() {
        final long before = countAllocatedMemory();

        final WaitUntilCompleteListener listener = new WaitUntilCompleteListener() {
            private int count = 0;

            @Override
            public void dataArrived(QueryDataBatch result, ConnectionThrottle throttle) {
                count += 1;
                final boolean allBatchesSeen = count == drillbits.size();
                if (allBatchesSeen) {
                    check(queryId != null, "Query id should not be null, since we have waited long enough.", new Object[0]);
                    cancelAndResume();
                }
                result.release();
            }
        };

        final String controls = Controls.newBuilder()
            .addPause(ScreenCreator.class, "send-complete")
            .build();
        assertCancelledWithoutException(controls, listener);

        final long after = countAllocatedMemory();
        Assert.assertEquals(String.format("We are leaking %d bytes", after - before), before, after);
    }

    /**
     * Registers a pause at the Foreman "foreman-cleanup" site and calls
     * {@code cancelAndResume()} once as many batches have arrived as there are
     * registered Drillbits; expects a clean cancellation with no change in allocated
     * memory.
     */
    @Test
    @RepeatTestRule.Repeat(count=3)
    public void cancelAfterEverythingIsCompleted() {
        final long before = countAllocatedMemory();

        final WaitUntilCompleteListener listener = new WaitUntilCompleteListener() {
            private int count = 0;

            @Override
            public void dataArrived(QueryDataBatch result, ConnectionThrottle throttle) {
                count += 1;
                final boolean allBatchesSeen = count == drillbits.size();
                if (allBatchesSeen) {
                    check(queryId != null, "Query id should not be null, since we have waited long enough.", new Object[0]);
                    cancelAndResume();
                }
                result.release();
            }
        };

        final String controls = Controls.newBuilder()
            .addPause(Foreman.class, "foreman-cleanup")
            .build();
        assertCancelledWithoutException(controls, listener);

        final long after = countAllocatedMemory();
        Assert.assertEquals(String.format("We are leaking %d bytes", after - before), before, after);
    }

    /**
     * Runs the default query with no controls set by this test and expects it to
     * complete with no change in allocated memory.
     */
    @Test
    public void successfullyCompletes() {
        final long baseline = countAllocatedMemory();

        final WaitUntilCompleteListener listener = new WaitUntilCompleteListener();
        QueryTestUtil.testWithListener(drillClient, UserBitShared.QueryType.SQL, TEST_QUERY, listener);
        final Pair<UserBitShared.QueryResult.QueryState, Exception> outcome = listener.waitForCompletion();
        assertStateCompleted(outcome, UserBitShared.QueryResult.QueryState.COMPLETED);

        final long current = countAllocatedMemory();
        final String leakMessage = String.format("We are leaking %d bytes", current - baseline);
        Assert.assertEquals(leakMessage, baseline, current);
    }

    /**
     * Applies the controls, runs the query and asserts that it ends in the FAILED state
     * with a {@link UserException} wrapping the expected exception class and message.
     *
     * @param controls       controls string to apply before the query is submitted
     * @param exceptionClass class of the exception expected inside the reported error
     * @param exceptionDesc  message expected on that exception
     * @param query          SQL text to run
     */
    private static void assertFailsWithException(String controls, Class<? extends Throwable> exceptionClass, String exceptionDesc, String query) {
        setControls(controls);

        final WaitUntilCompleteListener listener = new WaitUntilCompleteListener();
        QueryTestUtil.testWithListener(drillClient, UserBitShared.QueryType.SQL, query, listener);
        final Pair<UserBitShared.QueryResult.QueryState, Exception> outcome = listener.waitForCompletion();

        final UserBitShared.QueryResult.QueryState finalState = outcome.getFirst();
        final boolean failed = finalState == UserBitShared.QueryResult.QueryState.FAILED;
        Assert.assertTrue(String.format("Query state should be FAILED (and not %s).", finalState), failed);

        assertExceptionMessage(outcome.getSecond(), exceptionClass, exceptionDesc);
    }

    /**
     * Same as the four-argument overload, using the default test query.
     *
     * @param controls       controls string to apply before the query is submitted
     * @param exceptionClass class of the exception expected inside the reported error
     * @param exceptionDesc  message expected on that exception
     */
    private static void assertFailsWithException(String controls, Class<? extends Throwable> exceptionClass, String exceptionDesc) {
        final String defaultQuery = TEST_QUERY;
        assertFailsWithException(controls, exceptionClass, exceptionDesc, defaultQuery);
    }

    /**
     * Injects a {@link ForemanSetupException} at the DrillSqlWorker "sql-parsing" site
     * and expects the default query to fail with it, with no change in allocated memory.
     */
    @Test
    public void failsWhenParsing() {
        final long baseline = countAllocatedMemory();

        final String site = "sql-parsing";
        final Class<ForemanSetupException> injected = ForemanSetupException.class;
        final String controls = Controls.newBuilder()
            .addException(DrillSqlWorker.class, site, injected)
            .build();
        assertFailsWithException(controls, injected, site);

        final long current = countAllocatedMemory();
        Assert.assertEquals(String.format("We are leaking %d bytes", current - baseline), baseline, current);
    }

    /**
     * Injects a {@link ForemanException} at the Foreman "send-fragments" site and
     * expects the default query to fail with it, with no change in allocated memory.
     */
    @Test
    public void failsWhenSendingFragments() {
        final long baseline = countAllocatedMemory();

        final String site = "send-fragments";
        final Class<ForemanException> injected = ForemanException.class;
        final String controls = Controls.newBuilder()
            .addException(Foreman.class, site, injected)
            .build();
        assertFailsWithException(controls, injected, site);

        final long current = countAllocatedMemory();
        Assert.assertEquals(String.format("We are leaking %d bytes", current - baseline), baseline, current);
    }

    /**
     * Injects an {@link IOException} at the FragmentExecutor "fragment-execution" site
     * and expects the default query to fail with it, with no change in allocated memory.
     */
    @Test
    public void failsDuringExecution() {
        final long baseline = countAllocatedMemory();

        final String site = "fragment-execution";
        final Class<IOException> injected = IOException.class;
        final String controls = Controls.newBuilder()
            .addException(FragmentExecutor.class, site, injected)
            .build();
        assertFailsWithException(controls, injected, site);

        final long current = countAllocatedMemory();
        Assert.assertEquals(String.format("We are leaking %d bytes", current - baseline), baseline, current);
    }

    /**
     * Registers a pause at the MergingRecordBatch "waiting-for-data" site and runs the
     * shared cancel-while-waiting-for-data scenario, then checks that allocated memory
     * is unchanged.
     */
    @Test
    @RepeatTestRule.Repeat(count=3)
    public void interruptingBlockedMergingRecordBatch() {
        final long baseline = countAllocatedMemory();

        final String pauseControl = Controls.newBuilder()
            .addPause(MergingRecordBatch.class, "waiting-for-data", 1)
            .build();
        interruptingBlockedFragmentsWaitingForData(pauseControl);

        final long current = countAllocatedMemory();
        Assert.assertEquals(String.format("We are leaking %d bytes", current - baseline), baseline, current);
    }

    /**
     * Registers a pause at the UnorderedReceiverBatch "waiting-for-data" site and runs
     * the shared cancel-while-waiting-for-data scenario, then checks that allocated
     * memory is unchanged.
     */
    @Test
    @RepeatTestRule.Repeat(count=3)
    public void interruptingBlockedUnorderedReceiverBatch() {
        final long baseline = countAllocatedMemory();

        final String pauseControl = Controls.newBuilder()
            .addPause(UnorderedReceiverBatch.class, "waiting-for-data", 1)
            .build();
        interruptingBlockedFragmentsWaitingForData(pauseControl);

        final long current = countAllocatedMemory();
        Assert.assertEquals(String.format("We are leaking %d bytes", current - baseline), baseline, current);
    }

    /**
     * Runs a GROUP BY query over {@code cp.`region.json`} with {@code planner.slice_target}
     * set to 1 and the HASHAGG option set to false, and asserts that the query is
     * cancelled without an exception. Both session options are reset in a
     * {@code finally} block.
     *
     * @param control controls string to apply before the query is submitted
     */
    private static void interruptingBlockedFragmentsWaitingForData(String control) {
        final String hashAggOption = PlannerSettings.HASHAGG.getOptionName();
        try {
            setSessionOption("planner.slice_target", "1");
            setSessionOption(hashAggOption, "false");

            final String query = "SELECT sales_city, COUNT(*) cnt FROM cp.`region.json` GROUP BY sales_city";
            assertCancelledWithoutException(control, new ListenerThatCancelsQueryAfterFirstBatchOfData(), query);
        } finally {
            // Reset the options so later tests are not affected.
            setSessionOption("planner.slice_target", Long.toString(100000L));
            setSessionOption(PlannerSettings.HASHAGG.getOptionName(), PlannerSettings.HASHAGG.getDefault().bool_val.toString());
        }
    }

    /**
     * Runs a GROUP BY query over {@code cp.`region.json`} with {@code planner.slice_target}
     * set to 1, HASHAGG set to true and the partition-sender thread option set to 6. A
     * latch and a pause are registered on PartitionerDecorator; the query must be
     * cancelled without an exception and with no change in allocated memory. All three
     * session options are reset in a {@code finally} block.
     */
    @Test
    @RepeatTestRule.Repeat(count=3)
    public void interruptingPartitionerThreadFragment() {
        try {
            setSessionOption("planner.slice_target", "1");
            setSessionOption(PlannerSettings.HASHAGG.getOptionName(), "true");
            setSessionOption(PlannerSettings.PARTITION_SENDER_SET_THREADS.getOptionName(), "6");

            final long baseline = countAllocatedMemory();

            final String controls = Controls.newBuilder()
                .addLatch(PartitionerDecorator.class, "partitioner-sender-latch")
                .addPause(PartitionerDecorator.class, "wait-for-fragment-interrupt", 1)
                .build();
            final String query = "SELECT sales_city, COUNT(*) cnt FROM cp.`region.json` GROUP BY sales_city";
            assertCancelledWithoutException(controls, new ListenerThatCancelsQueryAfterFirstBatchOfData(), query);

            final long current = countAllocatedMemory();
            Assert.assertEquals(String.format("We are leaking %d bytes", current - baseline), baseline, current);
        } finally {
            // Reset the options so later tests are not affected.
            setSessionOption("planner.slice_target", Long.toString(100000L));
            setSessionOption(PlannerSettings.HASHAGG.getOptionName(), PlannerSettings.HASHAGG.getDefault().bool_val.toString());
            setSessionOption(PlannerSettings.PARTITION_SENDER_SET_THREADS.getOptionName(), Long.toString(PlannerSettings.PARTITION_SENDER_SET_THREADS.getDefault().num_val));
        }
    }

    /**
     * Registers a pause at the SingleSenderRootExec
     * "data-tunnel-send-batch-wait-for-interrupt" site and expects the default query to
     * be cancelled without an exception and with no change in allocated memory.
     * Currently disabled via {@code @Ignore}.
     */
    @Test
    @Ignore
    public void interruptingWhileFragmentIsBlockedInAcquiringSendingTicket() {
        final long baseline = countAllocatedMemory();

        final String pauseControl = Controls.newBuilder()
            .addPause(SingleSenderCreator.SingleSenderRootExec.class, "data-tunnel-send-batch-wait-for-interrupt", 1)
            .build();
        assertCancelledWithoutException(pauseControl, new ListenerThatCancelsQueryAfterFirstBatchOfData());

        final long current = countAllocatedMemory();
        Assert.assertEquals(String.format("We are leaking %d bytes", current - baseline), baseline, current);
    }

    /**
     * Runs the query from {@code queries/tpch/09.sql} (last character stripped) with
     * {@code planner.slice_target} set to 10 and a pause at the ScreenCreator
     * "sending-data" site, cancels it when the first batch arrives, and checks that
     * allocated memory is unchanged. The slice target is reset in a {@code finally} block.
     */
    @Test
    @RepeatTestRule.Repeat(count=3)
    public void memoryLeaksWhenCancelled() {
        setSessionOption("planner.slice_target", "10");
        final long before = countAllocatedMemory();
        try {
            final String controls = Controls.newBuilder()
                .addPause(ScreenCreator.class, "sending-data", 1)
                .build();

            String query = null;
            try {
                final String fileContents = BaseTestQuery.getFile("queries/tpch/09.sql");
                // Drop the final character of the file contents.
                query = fileContents.substring(0, fileContents.length() - 1);
            } catch (IOException readFailure) {
                Assert.fail("Failed to get query file: " + readFailure);
            }

            final WaitUntilCompleteListener listener = new WaitUntilCompleteListener() {
                private volatile boolean cancelRequested = false;

                @Override
                public void dataArrived(QueryDataBatch result, ConnectionThrottle throttle) {
                    // Only the first batch triggers the cancel; every batch is released.
                    if (!cancelRequested) {
                        check(queryId != null, "Query id should not be null, since we have waited long enough.", new Object[0]);
                        cancelAndResume();
                        cancelRequested = true;
                    }
                    result.release();
                }
            };
            assertCancelledWithoutException(controls, listener, query);

            final long after = countAllocatedMemory();
            Assert.assertEquals(String.format("We are leaking %d bytes", after - before), before, after);
        } finally {
            setSessionOption("planner.slice_target", Long.toString(100000L));
        }
    }

    /**
     * Runs the query from {@code queries/tpch/09.sql} (last character stripped) with
     * {@code planner.slice_target} set to 10 and an {@link IOException} injected at the
     * FragmentExecutor "fragment-execution" site, expects the query to fail with it, and
     * checks that allocated memory is unchanged. The slice target is reset in a
     * {@code finally} block. Currently disabled via {@code @Ignore}.
     */
    @Test
    @Ignore
    public void memoryLeaksWhenFailed() {
        setSessionOption("planner.slice_target", "10");
        final long before = countAllocatedMemory();
        try {
            final String site = "fragment-execution";
            final Class<IOException> injected = IOException.class;
            final String controls = Controls.newBuilder()
                .addException(FragmentExecutor.class, site, injected)
                .build();

            String query = null;
            try {
                final String fileContents = BaseTestQuery.getFile("queries/tpch/09.sql");
                // Drop the final character of the file contents.
                query = fileContents.substring(0, fileContents.length() - 1);
            } catch (IOException readFailure) {
                Assert.fail("Failed to get query file: " + readFailure);
            }

            assertFailsWithException(controls, injected, site, query);

            final long after = countAllocatedMemory();
            Assert.assertEquals(String.format("We are leaking %d bytes", after - before), before, after);
        } finally {
            setSessionOption("planner.slice_target", Long.toString(100000L));
        }
    }

    /**
     * Injects a {@link RuntimeException} at the ExternalSortBatch "after-sort" site
     * while running an ORDER BY query over the nation table, expects the query to fail
     * with it, and checks that allocated memory is unchanged.
     */
    @Test
    public void failsAfterMSorterSorting() {
        final String query = "select n_name from cp.`tpch/nation.parquet` order by n_name";
        final Class<RuntimeException> injected = RuntimeException.class;

        final long baseline = countAllocatedMemory();
        final String controls = Controls.newBuilder()
            .addException(ExternalSortBatch.class, "after-sort", injected)
            .build();
        assertFailsWithException(controls, injected, "after-sort", query);

        final long current = countAllocatedMemory();
        Assert.assertEquals(String.format("We are leaking %d bytes", current - baseline), baseline, current);
    }

    /**
     * Injects a {@link RuntimeException} at the ExternalSortBatch "after-setup" site
     * while running an ORDER BY query over the nation table, expects the query to fail
     * with it, and checks that allocated memory is unchanged.
     */
    @Test
    public void failsAfterMSorterSetup() {
        final String query = "select n_name from cp.`tpch/nation.parquet` order by n_name";
        final Class<RuntimeException> injected = RuntimeException.class;

        final long baseline = countAllocatedMemory();
        final String controls = Controls.newBuilder()
            .addException(ExternalSortBatch.class, "after-setup", injected)
            .build();
        assertFailsWithException(controls, injected, "after-setup", query);

        final long current = countAllocatedMemory();
        Assert.assertEquals(String.format("We are leaking %d bytes", current - baseline), baseline, current);
    }

    /**
     * Sums the memory currently allocated by the allocator of every drillbit
     * registered in {@code drillbits}.
     *
     * <p>The method pauses for two seconds before measuring (presumably to give
     * in-flight fragments time to release their buffers -- TODO confirm). If the
     * pause is interrupted, the measurement still proceeds, and the thread's
     * interrupt status is restored so callers can observe the interruption.
     *
     * @return total allocated bytes across all registered drillbits
     */
    private static long countAllocatedMemory() {
        try {
            Thread.sleep(2000L);
        }
        catch (InterruptedException interrupted) {
            // Stay best-effort (do not fail the measurement), but do not lose the
            // interrupt: re-assert the flag that Thread.sleep cleared.
            Thread.currentThread().interrupt();
        }
        long allocated = 0L;
        for (Drillbit drillbit : drillbits.values()) {
            allocated += drillbit.getContext().getAllocator().getAllocatedMemory();
        }
        return allocated;
    }

    /**
     * Package-visible static accessor that returns the {@code zkHelper} field.
     * NOTE(review): the {@code access$000} name and the synthetic marker suggest
     * this is a compiler-generated bridge surfaced by the decompiler -- confirm
     * before relying on it.
     */
    static /* synthetic */ ZookeeperHelper access$000() {
        return zkHelper;
    }

    // Class initializer: creates the initially empty String-to-Drillbit map held in
    // `drillbits`, which countAllocatedMemory() iterates when summing allocator usage.
    static {
        drillbits = new HashMap<String, Drillbit>();
    }

    /**
     * Thread that waits on a latch and then asks {@code drillClient} to resume
     * the given query. An {@link RpcException} raised while waiting for the
     * acknowledgement is stored in the supplied pointer instead of being thrown.
     */
    private static class ResumingThread
    extends Thread {
        private final UserBitShared.QueryId queryId;
        private final Pointer<Exception> ex;
        private final ExtendedLatch latch;

        /**
         * @param queryId id of the query to resume
         * @param ex      holder that receives an RPC failure, if one occurs
         * @param latch   latch this thread waits on before sending the resume request
         */
        public ResumingThread(UserBitShared.QueryId queryId, Pointer<Exception> ex, ExtendedLatch latch) {
            this.queryId = queryId;
            this.ex = ex;
            this.latch = latch;
        }

        @Override
        public void run() {
            // Do nothing until the latch is released.
            latch.awaitUninterruptibly();
            try {
                drillClient.resumeQuery(queryId).checkedGet();
            }
            catch (RpcException rpcFailure) {
                ex.value = rpcFailure;
            }
        }
    }

    /**
     * Thread that asks {@code drillClient} to cancel the given query and waits
     * for the acknowledgement. An {@link RpcException} is stored in the supplied
     * pointer instead of being thrown. Afterwards the optional latch is counted down.
     */
    private static class CancellingThread
    extends Thread {
        private final UserBitShared.QueryId queryId;
        private final Pointer<Exception> ex;
        private final ExtendedLatch latch;

        /**
         * @param queryId id of the query to cancel
         * @param ex      holder that receives an RPC failure, if one occurs
         * @param latch   latch to count down once the cancel attempt is done; may be {@code null}
         */
        public CancellingThread(UserBitShared.QueryId queryId, Pointer<Exception> ex, ExtendedLatch latch) {
            this.queryId = queryId;
            this.ex = ex;
            this.latch = latch;
        }

        @Override
        public void run() {
            try {
                drillClient.cancelQuery(queryId).checkedGet();
            }
            catch (RpcException rpcFailure) {
                ex.value = rpcFailure;
            }
            // No latch means nobody is waiting on this cancellation.
            if (latch == null) {
                return;
            }
            latch.countDown();
        }
    }

    /**
     * Listener that starts a {@link CancellingThread} for its query the first
     * time a data batch arrives. Every batch, including the first, is released.
     */
    private static class ListenerThatCancelsQueryAfterFirstBatchOfData
    extends WaitUntilCompleteListener {
        /** Set once the cancelling thread has been started, so it is started only once. */
        private boolean cancelRequested = false;

        private ListenerThatCancelsQueryAfterFirstBatchOfData() {
        }

        @Override
        public void dataArrived(QueryDataBatch result, ConnectionThrottle throttle) {
            if (cancelRequested) {
                result.release();
                return;
            }
            // A missing query id is recorded through check(); the cancel is still attempted.
            check(queryId != null, "Query id should not be null, since we have waited long enough.");
            final Thread canceller = new CancellingThread(queryId, ex, null);
            canceller.start();
            cancelRequested = true;
            result.release();
        }
    }

    /**
     * Results listener that records the query id, the final query state and any
     * failure, and lets a caller block until the query has completed or its
     * submission has failed.
     */
    private static class WaitUntilCompleteListener
    implements UserResultsListener {
        /** Released by {@link #submissionFailed} or {@link #queryCompleted}. */
        private final ExtendedLatch latch = new ExtendedLatch(1);
        protected UserBitShared.QueryId queryId = null;
        protected volatile Pointer<Exception> ex = new Pointer<Exception>();
        protected volatile UserBitShared.QueryResult.QueryState state = null;

        private WaitUntilCompleteListener() {
        }

        /**
         * Records an {@link IllegalStateException} in {@code ex} when the condition does not hold.
         *
         * @param condition condition expected to be true
         * @param format    {@link String#format} pattern for the failure message
         * @param args      arguments for the pattern
         */
        protected final void check(boolean condition, String format, Object ... args) {
            if (condition) {
                return;
            }
            ex.value = new IllegalStateException(String.format(format, args));
        }

        /**
         * Starts a cancelling thread and a resuming thread for the current query.
         * They share a one-count latch: the resuming thread waits on it and the
         * cancelling thread counts it down after its cancel attempt.
         */
        protected final void cancelAndResume() {
            Preconditions.checkNotNull(queryId);
            final ExtendedLatch cancelDone = new ExtendedLatch(1);
            final Thread canceller = new CancellingThread(queryId, ex, cancelDone);
            canceller.start();
            final Thread resumer = new ResumingThread(queryId, ex, cancelDone);
            resumer.start();
        }

        /** Stores the id assigned to the query. */
        public void queryIdArrived(UserBitShared.QueryId queryId) {
            this.queryId = queryId;
        }

        /** Records the failure, marks the state as FAILED and releases any waiter. */
        public void submissionFailed(UserException ex) {
            this.ex.value = ex;
            state = UserBitShared.QueryResult.QueryState.FAILED;
            latch.countDown();
        }

        /** Records the final state and releases any waiter. */
        public void queryCompleted(UserBitShared.QueryResult.QueryState state) {
            this.state = state;
            latch.countDown();
        }

        /** Releases the incoming batch without inspecting it. */
        public void dataArrived(QueryDataBatch result, ConnectionThrottle throttle) {
            result.release();
        }

        /**
         * Blocks until the latch has been released, then reports the outcome.
         *
         * @return pair of the recorded query state and the recorded exception
         *         (the exception is whatever {@code ex.value} holds at that point)
         */
        public final Pair<UserBitShared.QueryResult.QueryState, Exception> waitForCompletion() {
            latch.awaitUninterruptibly();
            final UserBitShared.QueryResult.QueryState finalState = state;
            final Exception failure = ex.value;
            return new Pair<UserBitShared.QueryResult.QueryState, Exception>(finalState, failure);
        }
    }
}

