/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.hbase.coprocessor;

import com.google.protobuf.Message;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.IsolationLevel;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.coprocessor.RowProcessorClient;
import org.apache.hadoop.hbase.coprocessor.BaseRowProcessorEndpoint;
import org.apache.hadoop.hbase.coprocessor.CoprocessorService;
import org.apache.hadoop.hbase.coprocessor.protobuf.generated.IncrementCounterProcessorTestProtos;
import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel;
import org.apache.hadoop.hbase.protobuf.generated.RowProcessorProtos;
import org.apache.hadoop.hbase.regionserver.BaseRowProcessor;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.InternalScanner;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.util.ByteStringer;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;

@Category(value={MediumTests.class})
public class TestRowProcessorEndpoint {
    static final Log LOG = LogFactory.getLog(TestRowProcessorEndpoint.class);
    private static final TableName TABLE = TableName.valueOf("testtable");
    private static final byte[] ROW = Bytes.toBytes("testrow");
    private static final byte[] ROW2 = Bytes.toBytes("testrow2");
    private static final byte[] FAM = Bytes.toBytes("friendlist");
    private static final byte[] A = Bytes.toBytes("a");
    private static final byte[] B = Bytes.toBytes("b");
    private static final byte[] C = Bytes.toBytes("c");
    private static final byte[] D = Bytes.toBytes("d");
    private static final byte[] E = Bytes.toBytes("e");
    private static final byte[] F = Bytes.toBytes("f");
    private static final byte[] G = Bytes.toBytes("g");
    private static final byte[] COUNTER = Bytes.toBytes("counter");
    private static final AtomicLong myTimer = new AtomicLong(0L);
    private final AtomicInteger failures = new AtomicInteger(0);
    private static HBaseTestingUtility util = new HBaseTestingUtility();
    private static volatile int expectedCounter = 0;
    private static int rowSize;
    private static int row2Size;
    private static volatile Table table;
    private static volatile boolean swapped;
    private volatile CountDownLatch startSignal;
    private volatile CountDownLatch doneSignal;

    @BeforeClass
    public static void setupBeforeClass() throws Exception {
        Configuration conf = util.getConfiguration();
        conf.setStrings("hbase.coprocessor.region.classes", new String[]{RowProcessorEndpoint.class.getName()});
        conf.setInt("hbase.client.retries.number", 2);
        conf.setLong("hbase.hregion.row.processor.timeout", 1000L);
        util.startMiniCluster();
    }

    @AfterClass
    public static void tearDownAfterClass() throws Exception {
        util.shutdownMiniCluster();
    }

    public void prepareTestData() throws Exception {
        try {
            util.getHBaseAdmin().disableTable(TABLE);
            util.getHBaseAdmin().deleteTable(TABLE);
        }
        catch (Exception exception) {
            // empty catch block
        }
        table = util.createTable(TABLE, FAM);
        Put put = new Put(ROW);
        put.add(FAM, A, Bytes.add(B, C));
        put.add(FAM, B, Bytes.add(D, E, F));
        put.add(FAM, C, G);
        table.put(put);
        rowSize = put.size();
        put = new Put(ROW2);
        put.add(FAM, D, E);
        put.add(FAM, F, G);
        table.put(put);
        row2Size = put.size();
    }

    @Test
    public void testDoubleScan() throws Throwable {
        this.prepareTestData();
        CoprocessorRpcChannel channel = table.coprocessorService(ROW);
        RowProcessorEndpoint.FriendsOfFriendsProcessor processor = new RowProcessorEndpoint.FriendsOfFriendsProcessor(ROW, A);
        RowProcessorProtos.RowProcessorService.BlockingInterface service = RowProcessorProtos.RowProcessorService.newBlockingStub(channel);
        RowProcessorProtos.ProcessRequest request = RowProcessorClient.getRowProcessorPB(processor);
        RowProcessorProtos.ProcessResponse protoResult = service.process(null, request);
        IncrementCounterProcessorTestProtos.FriendsOfFriendsProcessorResponse response = IncrementCounterProcessorTestProtos.FriendsOfFriendsProcessorResponse.parseFrom(protoResult.getRowProcessorResult());
        HashSet<String> result = new HashSet<String>();
        result.addAll(response.getResultList());
        HashSet<String> expected = new HashSet<String>(Arrays.asList("d", "e", "f", "g"));
        Get get = new Get(ROW);
        LOG.debug((Object)("row keyvalues:" + TestRowProcessorEndpoint.stringifyKvs(table.get(get).listCells())));
        Assert.assertEquals(expected, result);
    }

    @Test
    public void testReadModifyWrite() throws Throwable {
        this.prepareTestData();
        this.failures.set(0);
        int numThreads = 100;
        this.concurrentExec(new IncrementRunner(), numThreads);
        Get get = new Get(ROW);
        LOG.debug((Object)("row keyvalues:" + TestRowProcessorEndpoint.stringifyKvs(table.get(get).listCells())));
        int finalCounter = this.incrementCounter(table);
        int failureNumber = this.failures.get();
        if (failureNumber > 0) {
            LOG.debug((Object)("We failed " + failureNumber + " times during test"));
        }
        Assert.assertEquals((long)(numThreads + 1 - failureNumber), (long)finalCounter);
    }

    private int incrementCounter(Table table) throws Throwable {
        CoprocessorRpcChannel channel = table.coprocessorService(ROW);
        RowProcessorEndpoint.IncrementCounterProcessor processor = new RowProcessorEndpoint.IncrementCounterProcessor(ROW);
        RowProcessorProtos.RowProcessorService.BlockingInterface service = RowProcessorProtos.RowProcessorService.newBlockingStub(channel);
        RowProcessorProtos.ProcessRequest request = RowProcessorClient.getRowProcessorPB(processor);
        RowProcessorProtos.ProcessResponse protoResult = service.process(null, request);
        IncrementCounterProcessorTestProtos.IncCounterProcessorResponse response = IncrementCounterProcessorTestProtos.IncCounterProcessorResponse.parseFrom(protoResult.getRowProcessorResult());
        Integer result = response.getResponse();
        return result;
    }

    private void concurrentExec(final Runnable task, int numThreads) throws Throwable {
        this.startSignal = new CountDownLatch(numThreads);
        this.doneSignal = new CountDownLatch(numThreads);
        for (int i = 0; i < numThreads; ++i) {
            new Thread(new Runnable(){

                @Override
                public void run() {
                    try {
                        TestRowProcessorEndpoint.this.startSignal.countDown();
                        TestRowProcessorEndpoint.this.startSignal.await();
                        task.run();
                    }
                    catch (Throwable e) {
                        TestRowProcessorEndpoint.this.failures.incrementAndGet();
                        e.printStackTrace();
                    }
                    TestRowProcessorEndpoint.this.doneSignal.countDown();
                }
            }).start();
        }
        this.doneSignal.await();
    }

    @Test
    public void testMultipleRows() throws Throwable {
        this.prepareTestData();
        this.failures.set(0);
        int numThreads = 100;
        this.concurrentExec(new SwapRowsRunner(), numThreads);
        LOG.debug((Object)("row keyvalues:" + TestRowProcessorEndpoint.stringifyKvs(table.get(new Get(ROW)).listCells())));
        LOG.debug((Object)("row2 keyvalues:" + TestRowProcessorEndpoint.stringifyKvs(table.get(new Get(ROW2)).listCells())));
        int failureNumber = this.failures.get();
        if (failureNumber > 0) {
            LOG.debug((Object)("We failed " + failureNumber + " times during test"));
        }
        if (!swapped) {
            Assert.assertEquals((long)rowSize, (long)table.get(new Get(ROW)).listCells().size());
            Assert.assertEquals((long)row2Size, (long)table.get(new Get(ROW2)).listCells().size());
        } else {
            Assert.assertEquals((long)rowSize, (long)table.get(new Get(ROW2)).listCells().size());
            Assert.assertEquals((long)row2Size, (long)table.get(new Get(ROW)).listCells().size());
        }
    }

    private void swapRows(Table table) throws Throwable {
        CoprocessorRpcChannel channel = table.coprocessorService(ROW);
        RowProcessorEndpoint.RowSwapProcessor processor = new RowProcessorEndpoint.RowSwapProcessor(ROW, ROW2);
        RowProcessorProtos.RowProcessorService.BlockingInterface service = RowProcessorProtos.RowProcessorService.newBlockingStub(channel);
        RowProcessorProtos.ProcessRequest request = RowProcessorClient.getRowProcessorPB(processor);
        service.process(null, request);
    }

    @Test
    public void testTimeout() throws Throwable {
        this.prepareTestData();
        CoprocessorRpcChannel channel = table.coprocessorService(ROW);
        RowProcessorEndpoint.TimeoutProcessor processor = new RowProcessorEndpoint.TimeoutProcessor(ROW);
        RowProcessorProtos.RowProcessorService.BlockingInterface service = RowProcessorProtos.RowProcessorService.newBlockingStub(channel);
        RowProcessorProtos.ProcessRequest request = RowProcessorClient.getRowProcessorPB(processor);
        boolean exceptionCaught = false;
        try {
            service.process(null, request);
        }
        catch (Exception e) {
            exceptionCaught = true;
        }
        Assert.assertTrue((boolean)exceptionCaught);
    }

    static String stringifyKvs(Collection<Cell> kvs) {
        StringBuilder out = new StringBuilder();
        out.append("[");
        if (kvs != null) {
            for (Cell kv : kvs) {
                byte[] col = CellUtil.cloneQualifier(kv);
                byte[] val = CellUtil.cloneValue(kv);
                if (Bytes.equals(col, COUNTER)) {
                    out.append(Bytes.toStringBinary(col) + ":" + Bytes.toInt(val) + " ");
                    continue;
                }
                out.append(Bytes.toStringBinary(col) + ":" + Bytes.toStringBinary(val) + " ");
            }
        }
        out.append("]");
        return out.toString();
    }

    static {
        table = null;
        swapped = false;
    }

    public static class RowProcessorEndpoint<S extends Message, T extends Message>
    extends BaseRowProcessorEndpoint<S, T>
    implements CoprocessorService {
        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        public static void doScan(HRegion region, Scan scan, List<Cell> result) throws IOException {
            try (InternalScanner scanner = null;){
                scan.setIsolationLevel(IsolationLevel.READ_UNCOMMITTED);
                scanner = region.getScanner(scan);
                result.clear();
                scanner.next(result);
            }
        }

        public static class TimeoutProcessor
        extends BaseRowProcessor<IncrementCounterProcessorTestProtos.TimeoutProcessorRequest, IncrementCounterProcessorTestProtos.TimeoutProcessorResponse> {
            byte[] row = new byte[0];

            public TimeoutProcessor() {
            }

            public TimeoutProcessor(byte[] row) {
                this.row = row;
            }

            @Override
            public Collection<byte[]> getRowsToLock() {
                return Collections.singleton(this.row);
            }

            @Override
            public IncrementCounterProcessorTestProtos.TimeoutProcessorResponse getResult() {
                return IncrementCounterProcessorTestProtos.TimeoutProcessorResponse.getDefaultInstance();
            }

            @Override
            public void process(long now, HRegion region, List<Mutation> mutations, WALEdit walEdit) throws IOException {
                try {
                    Thread.sleep(100000L);
                }
                catch (Exception e) {
                    throw new IOException(e);
                }
            }

            @Override
            public boolean readOnly() {
                return true;
            }

            @Override
            public String getName() {
                return "timeout";
            }

            @Override
            public IncrementCounterProcessorTestProtos.TimeoutProcessorRequest getRequestData() throws IOException {
                IncrementCounterProcessorTestProtos.TimeoutProcessorRequest.Builder builder = IncrementCounterProcessorTestProtos.TimeoutProcessorRequest.newBuilder();
                builder.setRow(ByteStringer.wrap(this.row));
                return builder.build();
            }

            @Override
            public void initialize(IncrementCounterProcessorTestProtos.TimeoutProcessorRequest msg) throws IOException {
                this.row = msg.getRow().toByteArray();
            }
        }

        public static class RowSwapProcessor
        extends BaseRowProcessor<IncrementCounterProcessorTestProtos.RowSwapProcessorRequest, IncrementCounterProcessorTestProtos.RowSwapProcessorResponse> {
            byte[] row1 = new byte[0];
            byte[] row2 = new byte[0];

            RowSwapProcessor() {
            }

            RowSwapProcessor(byte[] row1, byte[] row2) {
                this.row1 = row1;
                this.row2 = row2;
            }

            @Override
            public Collection<byte[]> getRowsToLock() {
                ArrayList<byte[]> rows = new ArrayList<byte[]>();
                rows.add(this.row1);
                rows.add(this.row2);
                return rows;
            }

            @Override
            public boolean readOnly() {
                return false;
            }

            @Override
            public IncrementCounterProcessorTestProtos.RowSwapProcessorResponse getResult() {
                return IncrementCounterProcessorTestProtos.RowSwapProcessorResponse.getDefaultInstance();
            }

            @Override
            public void process(long now, HRegion region, List<Mutation> mutations, WALEdit walEdit) throws IOException {
                now = myTimer.getAndIncrement();
                ArrayList<Cell> kvs1 = new ArrayList<Cell>();
                ArrayList<Cell> kvs2 = new ArrayList<Cell>();
                RowProcessorEndpoint.doScan(region, new Scan(this.row1, this.row1), kvs1);
                RowProcessorEndpoint.doScan(region, new Scan(this.row2, this.row2), kvs2);
                if (swapped) {
                    Assert.assertEquals((long)rowSize, (long)kvs2.size());
                    Assert.assertEquals((long)row2Size, (long)kvs1.size());
                } else {
                    Assert.assertEquals((long)rowSize, (long)kvs1.size());
                    Assert.assertEquals((long)row2Size, (long)kvs2.size());
                }
                swapped = !swapped;
                ArrayList<ArrayList<Cell>> kvs = new ArrayList<ArrayList<Cell>>();
                kvs.add(kvs1);
                kvs.add(kvs2);
                byte[][] rows = new byte[][]{this.row1, this.row2};
                for (int i = 0; i < kvs.size(); ++i) {
                    for (Cell kv : (List)kvs.get(i)) {
                        Delete d = new Delete(rows[i]);
                        KeyValue kvDelete = new KeyValue(rows[i], CellUtil.cloneFamily(kv), CellUtil.cloneQualifier(kv), kv.getTimestamp(), KeyValue.Type.Delete);
                        d.addDeleteMarker(kvDelete);
                        Put p = new Put(rows[1 - i]);
                        KeyValue kvAdd = new KeyValue(rows[1 - i], CellUtil.cloneFamily(kv), CellUtil.cloneQualifier(kv), now, CellUtil.cloneValue(kv));
                        p.add(kvAdd);
                        mutations.add(d);
                        walEdit.add(kvDelete);
                        mutations.add(p);
                        walEdit.add(kvAdd);
                    }
                }
            }

            @Override
            public String getName() {
                return "swap";
            }

            @Override
            public IncrementCounterProcessorTestProtos.RowSwapProcessorRequest getRequestData() throws IOException {
                IncrementCounterProcessorTestProtos.RowSwapProcessorRequest.Builder builder = IncrementCounterProcessorTestProtos.RowSwapProcessorRequest.newBuilder();
                builder.setRow1(ByteStringer.wrap(this.row1));
                builder.setRow2(ByteStringer.wrap(this.row2));
                return builder.build();
            }

            @Override
            public void initialize(IncrementCounterProcessorTestProtos.RowSwapProcessorRequest msg) {
                this.row1 = msg.getRow1().toByteArray();
                this.row2 = msg.getRow2().toByteArray();
            }
        }

        public static class FriendsOfFriendsProcessor
        extends BaseRowProcessor<IncrementCounterProcessorTestProtos.FriendsOfFriendsProcessorRequest, IncrementCounterProcessorTestProtos.FriendsOfFriendsProcessorResponse> {
            byte[] row = null;
            byte[] person = null;
            final Set<String> result = new HashSet<String>();

            FriendsOfFriendsProcessor() {
            }

            FriendsOfFriendsProcessor(byte[] row, byte[] person) {
                this.row = row;
                this.person = person;
            }

            @Override
            public Collection<byte[]> getRowsToLock() {
                return Collections.singleton(this.row);
            }

            @Override
            public IncrementCounterProcessorTestProtos.FriendsOfFriendsProcessorResponse getResult() {
                IncrementCounterProcessorTestProtos.FriendsOfFriendsProcessorResponse.Builder builder = IncrementCounterProcessorTestProtos.FriendsOfFriendsProcessorResponse.newBuilder();
                builder.addAllResult(this.result);
                return builder.build();
            }

            @Override
            public boolean readOnly() {
                return true;
            }

            @Override
            public void process(long now, HRegion region, List<Mutation> mutations, WALEdit walEdit) throws IOException {
                ArrayList<Cell> kvs = new ArrayList<Cell>();
                Scan scan = new Scan(this.row, this.row);
                scan.addColumn(FAM, this.person);
                RowProcessorEndpoint.doScan(region, scan, kvs);
                scan = new Scan(this.row, this.row);
                for (Cell kv : kvs) {
                    byte[] friends;
                    byte[] byArray = friends = CellUtil.cloneValue(kv);
                    int n = byArray.length;
                    for (int i = 0; i < n; ++i) {
                        byte f = byArray[i];
                        scan.addColumn(FAM, new byte[]{f});
                    }
                }
                RowProcessorEndpoint.doScan(region, scan, kvs);
                this.result.clear();
                for (Cell kv : kvs) {
                    for (byte b : CellUtil.cloneValue(kv)) {
                        this.result.add((char)b + "");
                    }
                }
            }

            @Override
            public IncrementCounterProcessorTestProtos.FriendsOfFriendsProcessorRequest getRequestData() throws IOException {
                IncrementCounterProcessorTestProtos.FriendsOfFriendsProcessorRequest.Builder builder = IncrementCounterProcessorTestProtos.FriendsOfFriendsProcessorRequest.newBuilder();
                builder.setPerson(ByteStringer.wrap(this.person));
                builder.setRow(ByteStringer.wrap(this.row));
                builder.addAllResult(this.result);
                IncrementCounterProcessorTestProtos.FriendsOfFriendsProcessorRequest f = builder.build();
                return f;
            }

            @Override
            public void initialize(IncrementCounterProcessorTestProtos.FriendsOfFriendsProcessorRequest request) throws IOException {
                this.person = request.getPerson().toByteArray();
                this.row = request.getRow().toByteArray();
                this.result.clear();
                this.result.addAll(request.getResultList());
            }
        }

        public static class IncrementCounterProcessor
        extends BaseRowProcessor<IncrementCounterProcessorTestProtos.IncCounterProcessorRequest, IncrementCounterProcessorTestProtos.IncCounterProcessorResponse> {
            int counter = 0;
            byte[] row = new byte[0];

            IncrementCounterProcessor() {
            }

            IncrementCounterProcessor(byte[] row) {
                this.row = row;
            }

            @Override
            public Collection<byte[]> getRowsToLock() {
                return Collections.singleton(this.row);
            }

            @Override
            public IncrementCounterProcessorTestProtos.IncCounterProcessorResponse getResult() {
                IncrementCounterProcessorTestProtos.IncCounterProcessorResponse.Builder i = IncrementCounterProcessorTestProtos.IncCounterProcessorResponse.newBuilder();
                i.setResponse(this.counter);
                return i.build();
            }

            @Override
            public boolean readOnly() {
                return false;
            }

            @Override
            public void process(long now, HRegion region, List<Mutation> mutations, WALEdit walEdit) throws IOException {
                ArrayList<Cell> kvs = new ArrayList<Cell>();
                Scan scan = new Scan(this.row, this.row);
                scan.addColumn(FAM, COUNTER);
                RowProcessorEndpoint.doScan(region, scan, kvs);
                this.counter = kvs.size() == 0 ? 0 : Bytes.toInt(CellUtil.cloneValue((Cell)kvs.iterator().next()));
                Assert.assertEquals((long)expectedCounter, (long)this.counter);
                ++this.counter;
                expectedCounter = expectedCounter + 1;
                Put p = new Put(this.row);
                KeyValue kv = new KeyValue(this.row, FAM, COUNTER, now, Bytes.toBytes(this.counter));
                p.add(kv);
                mutations.add(p);
                walEdit.add(kv);
                KeyValue metaKv = new KeyValue(this.row, WALEdit.METAFAMILY, Bytes.toBytes("I just increment counter"), Bytes.toBytes(this.counter));
                walEdit.add(metaKv);
            }

            @Override
            public IncrementCounterProcessorTestProtos.IncCounterProcessorRequest getRequestData() throws IOException {
                IncrementCounterProcessorTestProtos.IncCounterProcessorRequest.Builder builder = IncrementCounterProcessorTestProtos.IncCounterProcessorRequest.newBuilder();
                builder.setCounter(this.counter);
                builder.setRow(ByteStringer.wrap(this.row));
                return builder.build();
            }

            @Override
            public void initialize(IncrementCounterProcessorTestProtos.IncCounterProcessorRequest msg) {
                this.row = msg.getRow().toByteArray();
                this.counter = msg.getCounter();
            }
        }
    }

    class SwapRowsRunner
    implements Runnable {
        SwapRowsRunner() {
        }

        @Override
        public void run() {
            try {
                TestRowProcessorEndpoint.this.swapRows(table);
            }
            catch (Throwable e) {
                TestRowProcessorEndpoint.this.failures.incrementAndGet();
                e.printStackTrace();
            }
        }
    }

    class IncrementRunner
    implements Runnable {
        IncrementRunner() {
        }

        @Override
        public void run() {
            try {
                TestRowProcessorEndpoint.this.incrementCounter(table);
            }
            catch (Throwable e) {
                TestRowProcessorEndpoint.this.failures.incrementAndGet();
                e.printStackTrace();
            }
        }
    }
}

