/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.hbase.procedure2.store.wal;

import java.io.IOException;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.Option;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseCommonTestingUtility;
import org.apache.hadoop.hbase.procedure2.Procedure;
import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility;
import org.apache.hadoop.hbase.procedure2.store.ProcedureStore;
import org.apache.hadoop.hbase.procedure2.store.wal.WALProcedureStore;
import org.apache.hadoop.hbase.procedure2.util.ByteSlot;
import org.apache.hadoop.hbase.procedure2.util.StringUtils;
import org.apache.hadoop.hbase.util.AbstractHBaseTool;

/**
 * Command-line tool that measures how long it takes to write a configurable number of
 * procedures (one insert followed by one update each) to a {@link WALProcedureStore}
 * from a configurable number of parallel threads.
 *
 * <p>Options: {@code -threads}, {@code -procs}, {@code -wals}, {@code -size} and
 * {@code -sync} (one of {@code hflush}, {@code hsync}, {@code nosync}).
 */
public class ProcedureWALPerformanceEvaluation
extends AbstractHBaseTool {
    protected static final HBaseCommonTestingUtility UTIL = new HBaseCommonTestingUtility();
    public static int DEFAULT_NUM_THREADS = 20;
    public static Option NUM_THREADS_OPTION = new Option("threads", true, "Number of parallel threads which will write insert/updates/deletes to WAL. Default: " + DEFAULT_NUM_THREADS);
    public static int DEFAULT_NUM_PROCS = 1000000;
    public static Option NUM_PROCS_OPTION = new Option("procs", true, "Total number of procedures. Each procedure writes one insert and one update. Default: " + DEFAULT_NUM_PROCS);
    public static int DEFAULT_NUM_WALS = 0;
    public static Option NUM_WALS_OPTION = new Option("wals", true, "Number of WALs to write. If -ve or 0, uses hbase.procedure.store.wal.roll.threshold conf to roll the logs. Default: " + DEFAULT_NUM_WALS);
    public static int DEFAULT_STATE_SIZE = 1024;
    public static Option STATE_SIZE_OPTION = new Option("size", true, "Size of serialized state in bytes to write on update. Default: " + DEFAULT_STATE_SIZE + "bytes");
    public static Option SYNC_OPTION = new Option("sync", true, "Type of sync to use when writing WAL contents to file system. Accepted values: hflush, hsync, nosync. Default: hflush");
    public static String DEFAULT_SYNC_OPTION = "hflush";
    public int numThreads;
    public long numProcs;
    /** Number of procedures after which a worker forces a log roll; only lowered when {@code numWals > 0}. */
    public long numProcsPerWal = Long.MAX_VALUE;
    public int numWals;
    public String syncType;
    public int stateSize;
    /** Payload attached to every test procedure; allocated in {@link #processOptions}. */
    static byte[] serializedState;
    private WALProcedureStore store;
    /** Source of procedure ids; shared by all workers so each id is written exactly once. */
    private AtomicLong procIds = new AtomicLong(0L);
    /** Set when any worker fails so that the remaining workers stop early. */
    private AtomicBoolean workersFailed = new AtomicBoolean(false);
    /** Upper bound, in seconds, on how long {@link #doWork()} waits for all workers. */
    private static final int WORKER_THREADS_TIMEOUT_SEC = 600;

    /**
     * Pushes the parsed options into the configuration and derives
     * {@link #numProcsPerWal} when an explicit number of WALs was requested.
     */
    private void setupConf() {
        this.conf.setBoolean("hbase.procedure.store.wal.use.hsync", "hsync".equals(this.syncType));
        if (this.numWals > 0) {
            // Rolling is driven by the workers, so disable the size-based threshold.
            this.conf.setLong("hbase.procedure.store.wal.roll.threshold", Long.MAX_VALUE);
            // Clamp to 1: when numWals > numProcs the integer division yields 0 and the
            // workers' "procId % numProcsPerWal" would throw ArithmeticException.
            this.numProcsPerWal = Math.max(1L, this.numProcs / (long)this.numWals);
        }
    }

    /**
     * Creates a fresh log directory under the test data dir, then creates, starts and
     * loads the procedure store selected by {@link #syncType}.
     *
     * @throws IOException if the file system or the store reports a failure
     */
    private void setupProcedureStore() throws IOException {
        Path testDir = UTIL.getDataTestDir();
        FileSystem fs = testDir.getFileSystem(this.conf);
        Path logDir = new Path(testDir, "proc-logs");
        System.out.println("Logs directory : " + logDir.toString());
        fs.delete(logDir, true);
        this.store = "nosync".equals(this.syncType) ? new NoSyncWalProcedureStore(this.conf, fs, logDir) : ProcedureTestingUtility.createWalStore(this.conf, fs, logDir);
        this.store.start(this.numThreads);
        this.store.recoverLease();
        this.store.load((ProcedureStore.ProcedureLoader)new ProcedureTestingUtility.LoadCounter());
        this.printNewestLog();
    }

    /** Prints the last entry of the store's active log list. */
    private void printNewestLog() {
        System.out.println("Starting new log : " + this.store.getActiveLogs().get(this.store.getActiveLogs().size() - 1));
    }

    /**
     * Stops the store and deletes its log directory. Does nothing when the store was
     * never created, so it is safe to call after a failed setup.
     */
    private void tearDownProcedureStore() {
        if (this.store == null) {
            return;
        }
        this.store.stop(false);
        try {
            this.store.getFileSystem().delete(this.store.getWALDir(), true);
        }
        catch (IOException e) {
            System.err.println("Error: Couldn't delete log dir. You can delete it manually to free up disk space. Location: " + this.store.getWALDir().toString());
            e.printStackTrace();
        }
    }

    /**
     * Reads the tool options from the parsed command line, validates them and applies
     * them to the configuration.
     *
     * @param cmd parsed command line
     * @throws IllegalArgumentException if the thread count is not positive, the state
     *         size is negative, or the sync type is not one of hsync, hflush, nosync
     */
    @Override
    public void processOptions(CommandLine cmd) {
        this.numThreads = this.getOptionAsInt(cmd, NUM_THREADS_OPTION.getOpt(), DEFAULT_NUM_THREADS);
        this.numProcs = this.getOptionAsInt(cmd, NUM_PROCS_OPTION.getOpt(), DEFAULT_NUM_PROCS);
        this.numWals = this.getOptionAsInt(cmd, NUM_WALS_OPTION.getOpt(), DEFAULT_NUM_WALS);
        this.syncType = cmd.getOptionValue(SYNC_OPTION.getOpt(), DEFAULT_SYNC_OPTION);
        this.stateSize = this.getOptionAsInt(cmd, STATE_SIZE_OPTION.getOpt(), DEFAULT_STATE_SIZE);

        // Explicit checks instead of 'assert': assertions are off unless the JVM is
        // started with -ea, which would let invalid input through silently.
        if (this.numThreads <= 0) {
            throw new IllegalArgumentException("threads argument must be positive, got: " + this.numThreads);
        }
        if (this.stateSize < 0) {
            throw new IllegalArgumentException("size argument must not be negative, got: " + this.stateSize);
        }
        boolean knownSyncType = "hsync".equals(this.syncType) || "hflush".equals(this.syncType) || "nosync".equals(this.syncType);
        if (!knownSyncType) {
            throw new IllegalArgumentException("sync argument can only accept one of these three values: hsync, hflush, nosync");
        }

        serializedState = new byte[this.stateSize];
        this.setupConf();
    }

    /** Registers the command-line options understood by this tool. */
    @Override
    public void addOptions() {
        this.addOption(NUM_THREADS_OPTION);
        this.addOption(NUM_PROCS_OPTION);
        this.addOption(NUM_WALS_OPTION);
        this.addOption(SYNC_OPTION);
        this.addOption(STATE_SIZE_OPTION);
    }

    /**
     * Runs the benchmark: sets up the store, starts {@link #numThreads} workers, waits
     * for them (bounded by {@link #WORKER_THREADS_TIMEOUT_SEC}) and prints a summary.
     * The store is torn down exactly once on every exit path and the worker pool is
     * always shut down.
     *
     * @return 0 on success, 1 if setup failed, a worker failed, or waiting for the
     *         workers threw (including timeout)
     */
    @Override
    public int doWork() {
        ExecutorService executor = null;
        boolean interrupted = false;
        try {
            this.setupProcedureStore();
            executor = Executors.newFixedThreadPool(this.numThreads);
            Future<?>[] futures = new Future<?>[this.numThreads];
            long start = System.currentTimeMillis();
            for (int i = 0; i < this.numThreads; ++i) {
                futures[i] = executor.submit(new Worker(start));
            }
            // No further tasks will be submitted; already-submitted workers keep running.
            executor.shutdown();

            long deadline = start + TimeUnit.SECONDS.toMillis(WORKER_THREADS_TIMEOUT_SEC);
            boolean failure = false;
            try {
                for (Future<?> future : futures) {
                    long timeout = deadline - System.currentTimeMillis();
                    // Workers return 1 to signal failure.
                    failure |= future.get(timeout, TimeUnit.MILLISECONDS).equals(1);
                }
            }
            catch (Exception e) {
                if (e instanceof InterruptedException) {
                    interrupted = true;
                }
                // Ask the remaining workers to stop at their next iteration.
                this.workersFailed.set(true);
                System.err.println("Exception in worker thread.");
                e.printStackTrace();
                return 1;
            }
            if (failure) {
                return 1;
            }
            long timeTaken = System.currentTimeMillis() - start;
            System.out.println("******************************************");
            System.out.println("Num threads    : " + this.numThreads);
            System.out.println("Num procedures : " + this.numProcs);
            System.out.println("Sync type      : " + this.syncType);
            System.out.println("Time taken     : " + (float)timeTaken / 1000.0f + "sec");
            System.out.println("******************************************");
            return 0;
        }
        catch (IOException e) {
            e.printStackTrace();
            return 1;
        }
        finally {
            if (executor != null) {
                // Interrupts workers still running (timeout / failure path) so their
                // non-daemon threads do not outlive the tool; no-op when all finished.
                executor.shutdownNow();
            }
            this.tearDownProcedureStore();
            if (interrupted) {
                // Restore the interrupt status only after teardown has run.
                Thread.currentThread().interrupt();
            }
        }
    }

    /**
     * Entry point: runs the tool with the testing utility's configuration.
     *
     * @param args command-line arguments
     * @throws IOException declared for callers; see {@code run} for actual failures
     */
    public static void main(String[] args) throws IOException {
        ProcedureWALPerformanceEvaluation tool = new ProcedureWALPerformanceEvaluation();
        tool.setConf(UTIL.getConfiguration());
        tool.run(args);
    }

    /**
     * Store variant used for the {@code nosync} option: lease recovery is a no-op and
     * {@link #syncSlots} only adds up slot sizes without touching the stream.
     */
    public static class NoSyncWalProcedureStore
    extends WALProcedureStore {
        public NoSyncWalProcedureStore(Configuration conf, FileSystem fs, Path logDir) {
            super(conf, fs, logDir, new WALProcedureStore.LeaseRecovery(){

                @Override
                public void recoverFileLease(FileSystem fs, Path path) throws IOException {
                    // Intentionally empty: no lease recovery for the no-sync store.
                }
            });
        }

        /**
         * Sums the sizes of {@code count} slots starting at {@code offset}; nothing is
         * written to {@code stream}.
         *
         * @return total size of the considered slots
         */
        @Override
        protected long syncSlots(FSDataOutputStream stream, ByteSlot[] slots, int offset, int count) throws IOException {
            long totalSynced = 0L;
            for (int i = 0; i < count; ++i) {
                totalSynced += (long)slots[offset + i].size();
            }
            return totalSynced;
        }
    }

    /**
     * Benchmark worker: repeatedly claims the next procedure id and writes an insert
     * and an update for it until all ids are consumed or another worker has failed.
     */
    class Worker
    implements Callable<Integer> {
        /** Benchmark start time in epoch millis, used for progress output. */
        final long start;

        public Worker(long start) {
            this.start = start;
        }

        /**
         * @return 0 when the id space is exhausted, 1 when this or another worker failed
         * @throws IOException declared by the original signature
         */
        @Override
        public Integer call() throws IOException {
            while (true) {
                if (ProcedureWALPerformanceEvaluation.this.workersFailed.get()) {
                    return 1;
                }
                long procId = ProcedureWALPerformanceEvaluation.this.procIds.getAndIncrement();
                if (procId >= ProcedureWALPerformanceEvaluation.this.numProcs) break;
                if (procId != 0L && procId % 10000L == 0L) {
                    long ms = System.currentTimeMillis() - this.start;
                    System.out.println("Wrote " + procId + " procedures in " + StringUtils.humanTimeDiff((long)ms));
                }
                try {
                    // Force a roll every numProcsPerWal procedures (only effective when -wals > 0).
                    if (procId > 0L && procId % ProcedureWALPerformanceEvaluation.this.numProcsPerWal == 0L) {
                        ProcedureWALPerformanceEvaluation.this.store.rollWriterForTesting();
                        ProcedureWALPerformanceEvaluation.this.printNewestLog();
                    }
                }
                catch (IOException ioe) {
                    ProcedureWALPerformanceEvaluation.this.workersFailed.set(true);
                    System.err.println("Exception when rolling log file. Current procId = " + procId);
                    ioe.printStackTrace();
                    return 1;
                }
                ProcedureTestingUtility.TestProcedure proc = new ProcedureTestingUtility.TestProcedure(procId);
                proc.setData(serializedState);
                try {
                    ProcedureWALPerformanceEvaluation.this.store.insert((Procedure)proc, null);
                    ProcedureWALPerformanceEvaluation.this.store.update((Procedure)proc);
                }
                catch (RuntimeException e) {
                    // Let the other workers stop promptly; the exception still reaches
                    // the caller through Future.get().
                    ProcedureWALPerformanceEvaluation.this.workersFailed.set(true);
                    throw e;
                }
            }
            return 0;
        }
    }
}

