package org.apache.hadoop.hdfs.server.federation.store.driver.impl;

import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.hadoop.classification.VisibleForTesting;
import org.apache.hadoop.hdfs.server.federation.metrics.StateStoreMetrics;
import org.apache.hadoop.hdfs.server.federation.store.StateStoreUnavailableException;
import org.apache.hadoop.hdfs.server.federation.store.StateStoreUtils;
import org.apache.hadoop.hdfs.server.federation.store.driver.StateStoreOperationResult;
import org.apache.hadoop.hdfs.server.federation.store.records.BaseRecord;
import org.apache.hadoop.hdfs.server.federation.store.records.Query;
import org.apache.hadoop.hdfs.server.federation.store.records.QueryResult;
import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.hadoop.util.Time;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreFileBaseImpl.class */
public abstract class StateStoreFileBaseImpl extends StateStoreSerializableImpl {
    private static final String TMP_MARK = ".tmp";
    private boolean initialized = false;
    private ExecutorService concurrentStoreAccessPool;
    private static final Logger LOG = LoggerFactory.getLogger(StateStoreFileBaseImpl.class);
    private static final long OLD_TMP_RECORD_MS = TimeUnit.SECONDS.toMillis(10);
    private static final Pattern OLD_TMP_RECORD_PATTERN = Pattern.compile(".+\\.(\\d+)\\.tmp");

    protected abstract <T extends BaseRecord> BufferedReader getReader(String str);

    @VisibleForTesting
    public abstract <T extends BaseRecord> BufferedWriter getWriter(String str);

    protected abstract boolean exists(String str);

    protected abstract boolean mkdir(String str);

    protected abstract boolean rename(String str, String str2);

    protected abstract boolean remove(String str);

    protected abstract List<String> getChildren(String str);

    protected abstract String getRootDir();

    protected abstract int getConcurrentFilesAccessNumThreads();

    public void setInitialized(boolean z) {
        this.initialized = z;
    }

    @Override // org.apache.hadoop.hdfs.server.federation.store.driver.StateStoreDriver
    public boolean initDriver() {
        String rootDir = getRootDir();
        try {
            if (rootDir == null) {
                LOG.error("Invalid root directory, unable to initialize driver.");
                return false;
            }
            if (!exists(rootDir) && !mkdir(rootDir)) {
                LOG.error("Cannot create State Store root directory {}", rootDir);
                return false;
            }
            setInitialized(true);
            int concurrentFilesAccessNumThreads = getConcurrentFilesAccessNumThreads();
            if (concurrentFilesAccessNumThreads <= 1) {
                LOG.info("File based state store will be accessed serially");
                return true;
            }
            this.concurrentStoreAccessPool = new ThreadPoolExecutor(concurrentFilesAccessNumThreads, concurrentFilesAccessNumThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue(), new ThreadFactoryBuilder().setNameFormat("state-store-file-based-concurrent-%d").setDaemon(true).build());
            LOG.info("File based state store will be accessed concurrently with {} max threads", Integer.valueOf(concurrentFilesAccessNumThreads));
            return true;
        } catch (Exception e) {
            LOG.error("Cannot initialize filesystem using root directory {}", rootDir, e);
            return false;
        }
    }

    @Override // org.apache.hadoop.hdfs.server.federation.store.driver.StateStoreDriver
    public void close() throws Exception {
        if (this.concurrentStoreAccessPool != null) {
            this.concurrentStoreAccessPool.shutdown();
            LOG.info("Concurrent store access pool is terminated: {}", Boolean.valueOf(this.concurrentStoreAccessPool.awaitTermination(5L, TimeUnit.SECONDS)));
            this.concurrentStoreAccessPool = null;
        }
    }

    @Override // org.apache.hadoop.hdfs.server.federation.store.driver.StateStoreDriver
    public <T extends BaseRecord> boolean initRecordStorage(String str, Class<T> cls) {
        String str2 = getRootDir() + "/" + str;
        try {
            if (exists(str2)) {
                return true;
            }
            LOG.info("{} data directory doesn't exist, creating it", str2);
            if (mkdir(str2)) {
                return true;
            }
            LOG.error("Cannot create data directory {}", str2);
            return false;
        } catch (Exception e) {
            LOG.error("Cannot create data directory {}", str2, e);
            return false;
        }
    }

    @Override // org.apache.hadoop.hdfs.server.federation.store.driver.StateStoreRecordOperations
    public <T extends BaseRecord> QueryResult<T> get(Class<T> cls) throws IOException {
        verifyDriverReady();
        long monotonicNow = Time.monotonicNow();
        StateStoreMetrics metrics = getMetrics();
        List synchronizedList = Collections.synchronizedList(new ArrayList());
        try {
            String pathForClass = getPathForClass(cls);
            List<String> children = getChildren(pathForClass);
            ArrayList arrayList = new ArrayList();
            children.forEach(str -> {
                arrayList.add(() -> {
                    return getRecordsFromFileAndRemoveOldTmpRecords(cls, synchronizedList, pathForClass, str);
                });
            });
            if (this.concurrentStoreAccessPool != null) {
                Iterator it = this.concurrentStoreAccessPool.invokeAll(arrayList).iterator();
                while (it.hasNext()) {
                    ((Future) it.next()).get();
                }
            } else {
                arrayList.forEach(callable -> {
                    try {
                        callable.call();
                    } catch (Exception e) {
                        LOG.error("Failed to retrieve record using file operations.", e);
                        throw new RuntimeException(e);
                    }
                });
            }
            if (metrics != null) {
                metrics.addRead(Time.monotonicNow() - monotonicNow);
            }
            return new QueryResult<>(synchronizedList, getTime());
        } catch (Exception e) {
            if (metrics != null) {
                metrics.addFailure(Time.monotonicNow() - monotonicNow);
            }
            String str2 = "Cannot fetch records for " + cls.getSimpleName();
            LOG.error(str2, e);
            throw new IOException(str2, e);
        }
    }

    /* JADX WARN: Multi-variable type inference failed */
    private <T extends BaseRecord> Void getRecordsFromFileAndRemoveOldTmpRecords(Class<T> cls, List<T> list, String str, String str2) throws IOException {
        String str3 = str + "/" + str2;
        if (!str2.endsWith(TMP_MARK)) {
            list.add(getRecord(str3, cls));
            return null;
        }
        LOG.debug("There is a temporary file {} in {}", str2, str);
        if (!isOldTempRecord(str2)) {
            return null;
        }
        LOG.warn("Removing {} as it's an old temporary record", str2);
        remove(str3);
        return null;
    }

    @VisibleForTesting
    public static boolean isOldTempRecord(String str) {
        if (!str.endsWith(TMP_MARK)) {
            return false;
        }
        Matcher matcher = OLD_TMP_RECORD_PATTERN.matcher(str);
        if (matcher.find()) {
            return Time.now() - Long.parseLong(matcher.group(1)) > OLD_TMP_RECORD_MS;
        }
        return false;
    }

    private <T extends BaseRecord> T getRecord(String str, Class<T> cls) throws IOException {
        BufferedReader reader = getReader(str);
        while (true) {
            try {
                String readLine = reader.readLine();
                if (readLine == null) {
                    if (reader != null) {
                        reader.close();
                    }
                    throw new IOException("Cannot read " + str + " for record " + cls.getSimpleName());
                }
                if (!readLine.startsWith("#") && readLine.length() > 0) {
                    try {
                        T t = (T) newRecord(readLine, cls, false);
                        if (reader != null) {
                            reader.close();
                        }
                        return t;
                    } catch (Exception e) {
                        LOG.error("Cannot parse line {} in file {}", new Object[]{readLine, str, e});
                    }
                }
            } catch (Throwable th) {
                if (reader != null) {
                    try {
                        reader.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                }
                throw th;
            }
        }
    }

    private <T extends BaseRecord> String getPathForClass(Class<T> cls) {
        String recordName = StateStoreUtils.getRecordName(cls);
        StringBuilder sb = new StringBuilder();
        sb.append(getRootDir());
        if (sb.charAt(sb.length() - 1) != '/') {
            sb.append("/");
        }
        sb.append(recordName);
        return sb.toString();
    }

    @Override // org.apache.hadoop.hdfs.server.federation.store.driver.StateStoreDriver
    public boolean isDriverReady() {
        return this.initialized;
    }

    @Override // org.apache.hadoop.hdfs.server.federation.store.driver.StateStoreRecordOperations
    public <T extends BaseRecord> StateStoreOperationResult putAll(List<T> list, boolean z, boolean z2) throws StateStoreUnavailableException {
        verifyDriverReady();
        if (list.isEmpty()) {
            return StateStoreOperationResult.getDefaultSuccessResult();
        }
        long monotonicNow = Time.monotonicNow();
        StateStoreMetrics metrics = getMetrics();
        HashMap hashMap = new HashMap();
        List synchronizedList = Collections.synchronizedList(new ArrayList());
        AtomicBoolean atomicBoolean = new AtomicBoolean(true);
        for (T t : list) {
            String pathForClass = getPathForClass(t.getClass());
            String primaryKey = getPrimaryKey(t);
            String str = pathForClass + "/" + primaryKey;
            if (!exists(str)) {
                hashMap.put(str, t);
            } else if (z) {
                t.setDateModified(getTime());
                hashMap.put(str, t);
            } else if (z2) {
                LOG.error("Attempt to insert record {} that already exists", str);
                synchronizedList.add(getOriginalPrimaryKey(primaryKey));
                atomicBoolean.set(false);
            } else {
                LOG.debug("Not updating {}", t);
            }
        }
        ArrayList arrayList = new ArrayList();
        hashMap.entrySet().forEach(entry -> {
            arrayList.add(() -> {
                return writeRecordToFile(atomicBoolean, entry, synchronizedList);
            });
        });
        if (this.concurrentStoreAccessPool != null) {
            List list2 = null;
            try {
                list2 = this.concurrentStoreAccessPool.invokeAll(arrayList);
            } catch (InterruptedException e) {
                atomicBoolean.set(false);
                LOG.error("Failed to put record concurrently.", e);
            }
            if (list2 != null) {
                Iterator it = list2.iterator();
                while (it.hasNext()) {
                    try {
                        ((Future) it.next()).get();
                    } catch (InterruptedException | ExecutionException e2) {
                        atomicBoolean.set(false);
                        LOG.error("Failed to retrieve results from concurrent record put runs.", e2);
                    }
                }
            }
        } else {
            arrayList.forEach(callable -> {
                try {
                    callable.call();
                } catch (Exception e3) {
                    atomicBoolean.set(false);
                    LOG.error("Failed to put record.", e3);
                }
            });
        }
        long monotonicNow2 = Time.monotonicNow();
        if (metrics != null) {
            if (atomicBoolean.get()) {
                metrics.addWrite(monotonicNow2 - monotonicNow);
            } else {
                metrics.addFailure(monotonicNow2 - monotonicNow);
            }
        }
        return new StateStoreOperationResult(synchronizedList, atomicBoolean.get());
    }

    private <T extends BaseRecord> Void writeRecordToFile(AtomicBoolean atomicBoolean, Map.Entry<String, T> entry, List<String> list) {
        String key = entry.getKey();
        T value = entry.getValue();
        String primaryKey = getPrimaryKey(value);
        String str = key + "." + Time.now() + TMP_MARK;
        boolean z = true;
        try {
            BufferedWriter writer = getWriter(str);
            try {
                writer.write(serializeString(value));
                if (writer != null) {
                    writer.close();
                }
            } finally {
            }
        } catch (IOException e) {
            LOG.error("Cannot write {}", str, e);
            z = false;
            list.add(getOriginalPrimaryKey(primaryKey));
            atomicBoolean.set(false);
        }
        if (!z || rename(str, key)) {
            return null;
        }
        LOG.error("Failed committing record into {}", key);
        list.add(getOriginalPrimaryKey(primaryKey));
        atomicBoolean.set(false);
        return null;
    }

    @Override // org.apache.hadoop.hdfs.server.federation.store.driver.StateStoreRecordOperations
    public <T extends BaseRecord> int remove(Class<T> cls, Query<T> query) throws StateStoreUnavailableException {
        verifyDriverReady();
        if (query == null) {
            return 0;
        }
        long monotonicNow = Time.monotonicNow();
        StateStoreMetrics metrics = getMetrics();
        int i = 0;
        try {
            boolean z = true;
            Iterator it = StateStoreUtils.filterMultiple(query, get(cls).getRecords()).iterator();
            while (it.hasNext()) {
                String str = getPathForClass(cls) + "/" + getPrimaryKey((BaseRecord) it.next());
                if (remove(str)) {
                    i++;
                } else {
                    LOG.error("Cannot remove record {}", str);
                    z = false;
                }
            }
            if (!z) {
                LOG.error("Cannot remove records {} query {}", cls, query);
                if (metrics != null) {
                    metrics.addFailure(Time.monotonicNow() - monotonicNow);
                }
            }
        } catch (IOException e) {
            LOG.error("Cannot remove records {} query {}", new Object[]{cls, query, e});
            if (metrics != null) {
                metrics.addFailure(Time.monotonicNow() - monotonicNow);
            }
        }
        if (i > 0 && metrics != null) {
            metrics.addRemove(Time.monotonicNow() - monotonicNow);
        }
        return i;
    }

    @Override // org.apache.hadoop.hdfs.server.federation.store.driver.StateStoreRecordOperations
    public <T extends BaseRecord> boolean removeAll(Class<T> cls) throws StateStoreUnavailableException {
        verifyDriverReady();
        long monotonicNow = Time.monotonicNow();
        StateStoreMetrics metrics = getMetrics();
        boolean z = true;
        String pathForClass = getPathForClass(cls);
        Iterator<String> it = getChildren(pathForClass).iterator();
        while (it.hasNext()) {
            if (!remove(pathForClass + "/" + it.next())) {
                z = false;
            }
        }
        if (metrics != null) {
            long monotonicNow2 = Time.monotonicNow() - monotonicNow;
            if (z) {
                metrics.addRemove(monotonicNow2);
            } else {
                metrics.addFailure(monotonicNow2);
            }
        }
        return z;
    }
}
