/*
 * Decompiled with CFR 0.152.
 */
package org.apache.drill.exec.server;

import com.google.common.base.Stopwatch;
import com.google.common.collect.Lists;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.DrillBuf;
import java.io.IOException;
import java.util.ArrayList;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import mockit.Injectable;
import mockit.NonStrictExpectations;
import org.apache.drill.common.config.DrillConfig;
import org.apache.drill.common.expression.ExpressionPosition;
import org.apache.drill.common.expression.SchemaPath;
import org.apache.drill.common.types.TypeProtos;
import org.apache.drill.common.types.Types;
import org.apache.drill.exec.ExecTest;
import org.apache.drill.exec.exception.FragmentSetupException;
import org.apache.drill.exec.expr.TypeHelper;
import org.apache.drill.exec.memory.BufferAllocator;
import org.apache.drill.exec.ops.FragmentContext;
import org.apache.drill.exec.proto.BitData;
import org.apache.drill.exec.proto.CoordinationProtos;
import org.apache.drill.exec.proto.ExecProtos;
import org.apache.drill.exec.proto.GeneralRPCProtos;
import org.apache.drill.exec.proto.UserBitShared;
import org.apache.drill.exec.record.FragmentWritableBatch;
import org.apache.drill.exec.record.MaterializedField;
import org.apache.drill.exec.record.WritableBatch;
import org.apache.drill.exec.rpc.RpcException;
import org.apache.drill.exec.rpc.RpcOutcomeListener;
import org.apache.drill.exec.rpc.control.WorkEventBus;
import org.apache.drill.exec.rpc.data.AckSender;
import org.apache.drill.exec.rpc.data.DataConnectionManager;
import org.apache.drill.exec.rpc.data.DataResponseHandler;
import org.apache.drill.exec.rpc.data.DataServer;
import org.apache.drill.exec.rpc.data.DataTunnel;
import org.apache.drill.exec.server.BootStrapContext;
import org.apache.drill.exec.vector.Float8Vector;
import org.apache.drill.exec.work.WorkManager;
import org.apache.drill.exec.work.fragment.FragmentManager;
import org.junit.Assert;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Test of the data RPC path: a {@link DataServer} is bound locally, a {@link DataTunnel}
 * is pointed at it, and a series of record batches is sent while the receiving handler
 * deliberately stalls on some of them.
 *
 * <p>The test passes when the slowest acknowledgement observed by the sender took longer
 * than {@link #MIN_EXPECTED_MAX_MILLIS}, i.e. the handler's stall was visible on the
 * sending side.
 */
public class TestBitRpc extends ExecTest {
    static final Logger logger = LoggerFactory.getLogger(TestBitRpc.class);

    /** Number of record batches pushed through the tunnel. */
    private static final int BATCHES_TO_SEND = 40;
    /** Number of records allocated in each vector of a generated batch. */
    private static final int RECORDS_PER_BATCH = 5000;
    /** Number of Float8 vectors placed in a generated batch. */
    private static final int VECTORS_PER_BATCH = 5;
    /** The receiving handler stalls on every batch whose running count is a multiple of this. */
    private static final int STALL_EVERY_N_BATCHES = 10;
    /** Duration of each handler stall, in milliseconds. */
    private static final long STALL_MILLIS = 3000L;
    /** The slowest acknowledgement must have taken longer than this many milliseconds. */
    private static final long MIN_EXPECTED_MAX_MILLIS = 2700L;
    /** Pause at the very end of the test, in milliseconds. */
    private static final long FINAL_PAUSE_MILLIS = 5000L;

    /**
     * Sends {@link #BATCHES_TO_SEND} batches through a data tunnel to a local server whose
     * handler sleeps on every {@link #STALL_EVERY_N_BATCHES}-th batch, then asserts that the
     * largest send-to-acknowledge time recorded exceeds {@link #MIN_EXPECTED_MAX_MILLIS}.
     *
     * @param bee     injected mock; not referenced by the test body
     * @param workBus injected mock that is stubbed to hand out {@code fman}
     * @param fman    injected mock that is stubbed to hand out {@code fcon}
     * @param fcon    injected mock whose allocator is stubbed to the server context's allocator
     * @throws Exception if setup, sending or the final sleep fails
     */
    @Test
    public void testConnectionBackpressure(@Injectable WorkManager.WorkerBee bee, final @Injectable WorkEventBus workBus, final @Injectable FragmentManager fman, final @Injectable FragmentContext fcon) throws Exception {
        final BootStrapContext serverContext = new BootStrapContext(DrillConfig.create());
        BootStrapContext clientContext = new BootStrapContext(DrillConfig.create());

        // Wire the mocks together: bus -> fragment manager -> fragment context -> real allocator.
        new NonStrictExpectations() {
            {
                workBus.getFragmentManagerIfExists((ExecProtos.FragmentHandle) any);
                result = fman;
                workBus.getFragmentManager((ExecProtos.FragmentHandle) any);
                result = fman;
                fman.getFragmentContext();
                result = fcon;
                fcon.getAllocator();
                result = serverContext.getAllocator();
            }
        };

        BitComTestHandler handler = new BitComTestHandler();
        DataServer server = new DataServer(serverContext, workBus, handler);
        // bind() returns the port actually used, which is what the client must connect to.
        // NOTE(review): the 'true' flag presumably lets the server pick another port if 1234
        // is taken -- confirm against DataServer/BasicServer.
        int port = server.bind(1234, true);

        CoordinationProtos.DrillbitEndpoint endpoint = CoordinationProtos.DrillbitEndpoint.newBuilder()
                .setAddress("localhost")
                .setDataPort(port)
                .build();
        DataConnectionManager manager = new DataConnectionManager(endpoint, clientContext);
        DataTunnel tunnel = new DataTunnel(manager);

        // Shared high-water mark updated by every TimingOutcome on acknowledgement.
        AtomicLong max = new AtomicLong(0L);
        for (int i = 0; i < BATCHES_TO_SEND; ++i) {
            long sendStart = System.currentTimeMillis();
            FragmentWritableBatch batch = new FragmentWritableBatch(
                    false, UserBitShared.QueryId.getDefaultInstance(), 1, 1, 1, 1,
                    TestBitRpc.getRandomBatch(serverContext.getAllocator(), RECORDS_PER_BATCH));
            tunnel.sendRecordBatch(new TimingOutcome(max), batch);
            // Time spent inside sendRecordBatch itself for this iteration.
            System.out.println(System.currentTimeMillis() - sendStart);
        }

        long observedMax = max.get();
        System.out.println(String.format("Max time: %d", observedMax));
        Assert.assertTrue(
                "Expected slowest acknowledgement to exceed " + MIN_EXPECTED_MAX_MILLIS
                        + " ms but it was " + observedMax + " ms",
                observedMax > MIN_EXPECTED_MAX_MILLIS);
        // NOTE(review): presumably gives outstanding acknowledgements time to arrive before
        // the test method returns -- confirm.
        Thread.sleep(FINAL_PAUSE_MILLIS);
    }

    /**
     * Builds a writable batch of {@link #VECTORS_PER_BATCH} required FLOAT8 vectors, each
     * allocated for {@code records} values and filled via the vector's test-data generator.
     *
     * @param allocator allocator backing the new vectors
     * @param records   number of values per vector, also passed as the batch's record count
     * @return the batch produced by {@code WritableBatch.getBatchNoHV}
     */
    // The raw Iterable cast mirrors the original call; the element type accepted by
    // getBatchNoHV is not visible in this file, so the conversion is left unchecked.
    @SuppressWarnings({"unchecked", "rawtypes"})
    private static WritableBatch getRandomBatch(BufferAllocator allocator, int records) {
        ArrayList<Float8Vector> vectors = Lists.newArrayList();
        for (int i = 0; i < VECTORS_PER_BATCH; ++i) {
            MaterializedField field = MaterializedField.create(
                    new SchemaPath("a", ExpressionPosition.UNKNOWN),
                    Types.required(TypeProtos.MinorType.FLOAT8));
            Float8Vector vector = (Float8Vector) TypeHelper.getNewVector(field, allocator);
            vector.allocateNew(records);
            vector.getMutator().generateTestData(records);
            vectors.add(vector);
        }
        return WritableBatch.getBatchNoHV(records, (Iterable) vectors, false);
    }

    /**
     * Receiving-side handler that acknowledges every batch, but sleeps for
     * {@link #STALL_MILLIS} before acknowledging every {@link #STALL_EVERY_N_BATCHES}-th one.
     */
    private static class BitComTestHandler implements DataResponseHandler {
        /** Running count of batches handled so far. */
        int v = 0;

        private BitComTestHandler() {
        }

        /** Intentionally a no-op. */
        @Override
        public void informOutOfMemory() {
        }

        /**
         * Counts the incoming batch, stalls if the count is a multiple of
         * {@link #STALL_EVERY_N_BATCHES}, and then sends an OK through {@code sender}.
         * The batch contents themselves are not inspected.
         */
        @Override
        public void handle(FragmentManager manager, BitData.FragmentRecordBatch fragmentBatch, DrillBuf data, AckSender sender) throws FragmentSetupException, IOException {
            ++this.v;
            if (this.v % STALL_EVERY_N_BATCHES == 0) {
                System.out.println("sleeping.");
                try {
                    Thread.sleep(STALL_MILLIS);
                } catch (InterruptedException e) {
                    // Restore the interrupt status so code further up the stack can see it,
                    // then fall through and still acknowledge the batch as before.
                    Thread.currentThread().interrupt();
                    logger.warn("Interrupted while stalling before acknowledging a batch", e);
                }
            }
            sender.sendOk();
        }
    }

    /**
     * Outcome listener that measures the time from its own construction to the successful
     * acknowledgement and raises the shared maximum if this measurement is larger.
     */
    private static class TimingOutcome implements RpcOutcomeListener<GeneralRPCProtos.Ack> {
        /** Shared high-water mark of elapsed milliseconds across all listeners. */
        private final AtomicLong max;
        /** Started when the listener is created, i.e. just before the batch is sent. */
        private final Stopwatch watch = new Stopwatch().start();

        public TimingOutcome(AtomicLong max) {
            this.max = max;
        }

        /** Logs the failure; the shared maximum is left untouched. */
        @Override
        public void failed(RpcException ex) {
            logger.error("Sending a record batch failed", ex);
        }

        /**
         * Prints the elapsed time and publishes it to the shared maximum when it is larger
         * than the value currently stored there.
         */
        @Override
        public void success(GeneralRPCProtos.Ack value, ByteBuf buffer) {
            long elapsedMillis = this.watch.elapsed(TimeUnit.MILLISECONDS);
            System.out.println(String.format("Total time to send: %d, start time %d", elapsedMillis, System.currentTimeMillis() - elapsedMillis));

            // Lock-free "store if greater": retry only while our value is still larger than
            // the current maximum and another thread won the compare-and-set race.
            long current = this.max.get();
            while (current < elapsedMillis && !this.max.compareAndSet(current, elapsedMillis)) {
                current = this.max.get();
            }
        }

        /** Intentionally a no-op. */
        @Override
        public void interrupted(InterruptedException e) {
        }
    }
}

