/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.hbase.util;

import java.io.IOException;
import java.util.PriorityQueue;
import java.util.Set;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.MultiThreadedAction;
import org.apache.hadoop.hbase.util.test.LoadTestDataGenerator;

public abstract class MultiThreadedWriterBase
extends MultiThreadedAction {
    private static final Log LOG = LogFactory.getLog(MultiThreadedWriterBase.class);
    protected BlockingQueue<Long> wroteKeys;
    protected AtomicLong nextKeyToWrite = new AtomicLong();
    protected AtomicLong wroteUpToKey = new AtomicLong();
    protected Set<Long> failedKeySet = new ConcurrentSkipListSet<Long>();
    protected AtomicLong wroteKeyQueueSize = new AtomicLong();
    protected boolean trackWroteKeys;

    public MultiThreadedWriterBase(LoadTestDataGenerator dataGen, Configuration conf, TableName tableName, String actionLetter) throws IOException {
        super(dataGen, conf, tableName, actionLetter);
        this.wroteKeys = this.createWriteKeysQueue(conf);
    }

    protected BlockingQueue<Long> createWriteKeysQueue(Configuration conf) {
        return new ArrayBlockingQueue<Long>(10000);
    }

    @Override
    public void start(long startKey, long endKey, int numThreads) throws IOException {
        super.start(startKey, endKey, numThreads);
        this.nextKeyToWrite.set(startKey);
        this.wroteUpToKey.set(startKey - 1L);
        if (this.trackWroteKeys) {
            new Thread((Runnable)new WroteKeysTracker(), "MultiThreadedWriterBase-WroteKeysTracker-" + System.currentTimeMillis()).start();
            this.numThreadsWorking.incrementAndGet();
        }
    }

    protected String getRegionDebugInfoSafe(Table table, byte[] rowKey) {
        HRegionLocation cached = null;
        HRegionLocation real = null;
        try {
            cached = this.connection.getRegionLocation(this.tableName, rowKey, false);
            real = this.connection.getRegionLocation(this.tableName, rowKey, true);
        }
        catch (Throwable throwable) {
            // empty catch block
        }
        String result = "no information can be obtained";
        if (cached != null) {
            result = "cached: " + cached.toString();
        }
        if (real != null && real.getServerName() != null) {
            if (cached != null && cached.getServerName() != null && real.equals(cached)) {
                result = result + "; cache is up to date";
            } else {
                result = cached != null ? result + "; " : "";
                result = result + "real: " + real.toString();
            }
        }
        return result;
    }

    public int getNumWriteFailures() {
        return this.failedKeySet.size();
    }

    public long wroteUpToKey() {
        return this.wroteUpToKey.get();
    }

    public boolean failedToWriteKey(long k) {
        return this.failedKeySet.contains(k);
    }

    @Override
    protected String progressInfo() {
        StringBuilder sb = new StringBuilder();
        MultiThreadedWriterBase.appendToStatus(sb, "wroteUpTo", this.wroteUpToKey.get());
        MultiThreadedWriterBase.appendToStatus(sb, "wroteQSize", this.wroteKeyQueueSize.get());
        return sb.toString();
    }

    public void setTrackWroteKeys(boolean enable) {
        this.trackWroteKeys = enable;
    }

    private class WroteKeysTracker
    implements Runnable {
        private WroteKeysTracker() {
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        @Override
        public void run() {
            Thread.currentThread().setName(this.getClass().getSimpleName());
            try {
                long expectedKey = MultiThreadedWriterBase.this.startKey;
                PriorityQueue<Long> sortedKeys = new PriorityQueue<Long>();
                while (expectedKey < MultiThreadedWriterBase.this.endKey) {
                    Long k;
                    try {
                        k = MultiThreadedWriterBase.this.wroteKeys.poll(1L, TimeUnit.SECONDS);
                    }
                    catch (InterruptedException e) {
                        LOG.info((Object)"Inserted key tracker thread interrupted", (Throwable)e);
                        break;
                    }
                    if (k == null) continue;
                    if (k == expectedKey) {
                        MultiThreadedWriterBase.this.wroteUpToKey.set(k);
                        ++expectedKey;
                    } else {
                        sortedKeys.add(k);
                    }
                    while (!sortedKeys.isEmpty() && (k = (Long)sortedKeys.peek()) == expectedKey) {
                        sortedKeys.poll();
                        MultiThreadedWriterBase.this.wroteUpToKey.set(k);
                        ++expectedKey;
                    }
                    MultiThreadedWriterBase.this.wroteKeyQueueSize.set(MultiThreadedWriterBase.this.wroteKeys.size() + sortedKeys.size());
                }
            }
            catch (Exception ex) {
                LOG.error((Object)"Error in inserted/updaed key tracker", (Throwable)ex);
            }
            finally {
                MultiThreadedWriterBase.this.numThreadsWorking.decrementAndGet();
            }
        }
    }
}

