/*
 * Decompiled with CFR 0.152.
 */
package com.mapr.streams.impl;

import com.mapr.db.Table;
import com.mapr.db.TableDescriptor;
import com.mapr.db.TableSplitInternal;
import com.mapr.db.TabletInfo;
import com.mapr.db.impl.ConditionImpl;
import com.mapr.db.impl.IdCodec;
import com.mapr.db.impl.MapRDBImpl;
import com.mapr.db.impl.TableDescriptorImpl;
import com.mapr.fs.proto.Marlinserver;
import com.mapr.streams.Admin;
import com.mapr.streams.Streams;
import com.mapr.streams.impl.MarlinDocumentStream;
import com.mapr.streams.impl.MarlinSplitterCore;
import com.mapr.streams.impl.admin.MarlinAdminImpl;
import com.mapr.streams.impl.admin.TopicFeedInfo;
import java.io.IOException;
import java.math.BigDecimal;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.hadoop.conf.Configuration;
import org.ojai.Document;
import org.ojai.DocumentConstants;
import org.ojai.DocumentStream;
import org.ojai.FieldPath;
import org.ojai.Value;
import org.ojai.store.DocumentMutation;
import org.ojai.store.DocumentStore;
import org.ojai.store.Query;
import org.ojai.store.QueryCondition;
import org.ojai.store.QueryResult;
import org.ojai.store.exceptions.MultiOpException;
import org.ojai.store.exceptions.StoreException;
import org.ojai.util.EmptyDocumentStream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Read-only {@link DocumentStore} that exposes the messages of a stream as OJAI documents.
 *
 * <p>On construction the store opens the table behind the stream path, verifies that it is a
 * stream, selects the topics to read (explicit names, a regular expression, or all topics) and
 * pre-computes one {@link QueryCondition} per (split, topic) pair. {@link #find()} and its
 * projection variants then scan all of those conditions through a {@link MarlinDocumentStream}.
 *
 * <p>Every mutating operation, every {@code findById} variant and every condition/query based
 * {@code find} throws {@link StoreException}.
 */
public class MessageStore
implements DocumentStore {
    static final Logger LOG = LoggerFactory.getLogger(MessageStore.class);

    /** Configuration key and default for the upper bound on parallel scans. */
    private static final String MAX_SCANNER_THREADS_KEY = "streams.analytics.max_scanner_threads";
    private static final int DEFAULT_MAX_SCANNER_THREADS = 16;

    /** Configuration key and default (100 MiB) for the cache memory handed to the document stream. */
    private static final String CACHE_MEMORY_KEY = "streams.analytics.cache_memory";
    private static final long DEFAULT_CACHE_MEMORY = 100L * 1024L * 1024L;

    /** Table backing the stream; opened during construction and released by {@link #close()}. */
    Table table;
    /** One condition per (split, topic) pair that has to be scanned. */
    List<QueryCondition> findConditions;
    /** Number of scans run in parallel: min(number of conditions, configured maximum). */
    int numParallelScans;
    /** Cache memory budget passed to {@link MarlinDocumentStream}. */
    long maxCacheMemory;

    static Marlinserver.MarlinInternalDefaults mdef = Marlinserver.MarlinInternalDefaults.getDefaultInstance();
    /** Row key built from the feed-id prefix and feed-id format for feed 0. */
    static String messageStartKey = String.format(mdef.getKeyPrefixFeedId() + mdef.getKeyFmtFeedId(), 0);
    /** Restricts tablet lookup to rows whose id is at or after {@link #messageStartKey}. */
    static QueryCondition messageStartCondition = MapRDBImpl.newCondition().is(DocumentConstants.ID_FIELD, QueryCondition.Op.GREATER_OR_EQUAL, messageStartKey).build();
    static final FieldPath PRODUCER_FIELD = FieldPath.parseFrom((String)mdef.getFMsgsProd());
    static final FieldPath MSG_COUNT_FIELD = FieldPath.parseFrom((String)mdef.getFMsgsCount());

    /**
     * Shared constructor logic: opens the table, resolves the topics and builds the scan conditions.
     *
     * @param streamPath path of the stream
     * @param conf       source of the {@code streams.analytics.*} tuning values
     * @param topicRegex optional pattern a topic name must fully match; only consulted when no
     *                   explicit topics are given
     * @param topics     optional explicit topic names; when non-empty they take precedence over
     *                   {@code topicRegex}
     * @throws IOException              if the path does not refer to a stream, or a lower layer fails
     * @throws IllegalArgumentException if {@code topics} contains a {@code null} entry
     */
    private void commonInit(String streamPath, Configuration conf, Pattern topicRegex, List<String> topics) throws IOException {
        // Validate before touching any resource, so a bad argument is always reported as
        // IllegalArgumentException and nothing has to be cleaned up.
        if (topics != null) {
            for (String topic : topics) {
                if (topic == null) {
                    throw new IllegalArgumentException("NULL topic not allowed");
                }
            }
        }

        boolean initialized = false;
        try {
            this.table = MapRDBImpl.getTable(streamPath);
            TableDescriptor desc = this.table.getTableDescriptor();
            assert (desc instanceof TableDescriptorImpl);
            if (!((TableDescriptorImpl)desc).isStream()) {
                throw new IOException("The path " + streamPath + " does not refer to a valid stream");
            }

            List<TopicMeta> topicMetaList = this.loadTopicMeta(streamPath, topicRegex, topics);
            this.findConditions = this.buildFindConditions(topicMetaList);

            int maxParallelScans = conf.getInt(MAX_SCANNER_THREADS_KEY, DEFAULT_MAX_SCANNER_THREADS);
            this.numParallelScans = Math.min(this.findConditions.size(), maxParallelScans);
            this.maxCacheMemory = conf.getLong(CACHE_MEMORY_KEY, DEFAULT_CACHE_MEMORY);
            initialized = true;
        } finally {
            if (!initialized) {
                // The constructor is about to fail, so no caller will ever be able to close().
                this.closeTableAfterFailedInit();
            }
        }
    }

    /**
     * Lists the topics of the stream, keeps the requested ones and returns their metadata sorted
     * by topic name. Topics flagged as deleted are skipped.
     */
    private List<TopicMeta> loadTopicMeta(String streamPath, Pattern topicRegex, List<String> topics) throws IOException {
        HashSet<String> requested = new HashSet<String>();
        if (topics != null) {
            requested.addAll(topics);
        }
        boolean selectByName = !requested.isEmpty();

        ArrayList<TopicMeta> topicMetaList = new ArrayList<TopicMeta>();
        // NOTE(review): a fresh Configuration is used here rather than the caller's conf —
        // kept as in the original; confirm whether that is intentional.
        Admin admin = Streams.newAdmin(new Configuration());
        try {
            MarlinAdminImpl madmin = (MarlinAdminImpl)admin;
            Map<String, List<TopicFeedInfo>> topicMap = madmin.listTopicsForStream(streamPath);

            HashSet<String> topicSet = new HashSet<String>();
            for (String existingTopic : topicMap.keySet()) {
                boolean selected;
                if (selectByName) {
                    selected = requested.contains(existingTopic);
                } else if (topicRegex != null) {
                    selected = topicRegex.matcher(existingTopic).matches();
                } else {
                    selected = true;
                }
                if (selected) {
                    topicSet.add(existingTopic);
                }
            }

            for (String topic : topicSet) {
                Marlinserver.MarlinTopicMetaEntry metaEntry = madmin.getTopicMetaEntry(streamPath + ":" + topic);
                if (metaEntry.getIsDeleted()) {
                    continue;
                }
                LOG.debug("Topic: {}, feeds: {}", topic, metaEntry.getFeedIdsList());
                topicMetaList.add(new TopicMeta(topic, metaEntry.getFeedIdsList()));
            }
        } finally {
            // Release the admin handle even when listing or metadata lookup fails.
            admin.close();
        }
        Collections.sort(topicMetaList);
        return topicMetaList;
    }

    /**
     * Builds one id-range condition for every (split, topic) pair where the topic falls inside
     * the split's topic range and owns the split's feed id.
     *
     * @param sortedTopics topic metadata sorted by topic name; the ordering is what allows the
     *                     inner loop to stop at the first topic past the split's stop topic
     */
    private List<QueryCondition> buildFindConditions(List<TopicMeta> sortedTopics) throws IOException {
        TabletInfo[] tabletInfoArray = this.table.getTabletInfos(messageStartCondition);
        List<?> splits = MarlinSplitterCore.getMarlinSplits((String)this.table.getName(), tabletInfoArray);

        List<QueryCondition> conditions = new ArrayList<QueryCondition>();
        int feedIdOffset = mdef.getKeyPrefixFeedId().length();
        int feedPrefixLength = feedIdOffset + mdef.getKeyWidthFeedId();

        for (Object rawSplit : splits) {
            TableSplitInternal split = (TableSplitInternal)rawSplit;
            String splitStart = IdCodec.decodeString((byte[])split.getStartRow());
            String splitStop = IdCodec.decodeString((byte[])split.getStopRow());

            // The feed id is the hexadecimal field that follows the feed-id key prefix.
            String feedStr = splitStart.substring(feedIdOffset, feedPrefixLength);
            int feedId = Integer.parseInt(feedStr, 16);
            LOG.debug("Split: [{}, {})", splitStart, splitStop);
            LOG.debug("Feed Str: {}, id: {}", feedStr, feedId);
            if (LOG.isDebugEnabled()) {
                LOG.debug("[" + feedPrefixLength + "] " + splitStart.length() + " " + splitStop.length());
            }

            String splitFeedPrefix = splitStart.substring(0, feedPrefixLength);
            // A boundary key longer than the feed prefix carries a topic name: one character
            // after the feed prefix and the final character are dropped (presumably the
            // topic-name prefix and a trailing separator — confirm against the key format).
            String splitStartTopic = null;
            if (splitStart.length() != feedPrefixLength) {
                splitStartTopic = splitStart.substring(feedPrefixLength + 1, splitStart.length() - 1);
                LOG.debug("Split start topic = {}", splitStartTopic);
            }
            String splitStopTopic = null;
            if (splitStop.length() != feedPrefixLength && splitStop.length() != 0) {
                splitStopTopic = splitStop.substring(feedPrefixLength + 1, splitStop.length() - 1);
                LOG.debug("Split stop topic = {}", splitStopTopic);
            }

            // NOTE(review): the stop topic is compared without looking at the stop key's feed id,
            // which looks like it assumes a split never spans two feed ids — confirm against
            // MarlinSplitterCore.
            for (TopicMeta topicMeta : sortedTopics) {
                String topic = topicMeta.getTopic();
                if (splitStartTopic != null && topic.compareTo(splitStartTopic) < 0) {
                    continue;
                }
                if (splitStopTopic != null && topic.compareTo(splitStopTopic) >= 0) {
                    // Topics are sorted, so every remaining topic is past this split as well.
                    break;
                }
                if (!topicMeta.hasFeedId(feedId)) {
                    continue;
                }
                // ';' is the character right after ':', so [start, stop) covers exactly the
                // keys that begin with "<feed prefix><topic prefix><topic>:".
                String topicKeyBase = splitFeedPrefix + mdef.getKeyPrefixTopicName() + topic;
                String topicStartKey = topicKeyBase + ":";
                String topicStopKey = topicKeyBase + ";";
                ConditionImpl findCondition = MapRDBImpl.newCondition().and().is(DocumentConstants.ID_FIELD, QueryCondition.Op.GREATER_OR_EQUAL, topicStartKey).is(DocumentConstants.ID_FIELD, QueryCondition.Op.LESS, topicStopKey).close().build();
                LOG.debug("Find condition: {}", findCondition);
                conditions.add((QueryCondition)findCondition);
            }
        }
        return conditions;
    }

    /** Best-effort close of the table when construction fails; never masks the original failure. */
    private void closeTableAfterFailedInit() {
        if (this.table == null) {
            return;
        }
        try {
            this.table.close();
        } catch (RuntimeException e) {
            LOG.warn("Failed to close table after unsuccessful initialization", e);
        }
    }

    /**
     * Creates a store over the given topics of a stream.
     *
     * @param streamPathInMapRFS path of the stream
     * @param conf               source of the {@code streams.analytics.*} tuning values
     * @param topics             topic names to read; {@code null} or empty selects every topic
     * @throws IOException              if the path does not refer to a stream, or a lower layer fails
     * @throws IllegalArgumentException if any topic name is {@code null}
     */
    public MessageStore(String streamPathInMapRFS, Configuration conf, String ... topics) throws IOException {
        ArrayList<String> topicList = new ArrayList<String>();
        if (topics != null) {
            Collections.addAll(topicList, topics);
        }
        this.commonInit(streamPathInMapRFS, conf, null, topicList);
    }

    /**
     * Creates a store over every topic of a stream whose name fully matches a pattern.
     *
     * @param streamPathInMapRFS path of the stream
     * @param conf               source of the {@code streams.analytics.*} tuning values
     * @param topicRegex         pattern a topic name must match; {@code null} selects every topic
     * @throws IOException if the path does not refer to a stream, or a lower layer fails
     */
    public MessageStore(String streamPathInMapRFS, Configuration conf, Pattern topicRegex) throws IOException {
        this.commonInit(streamPathInMapRFS, conf, topicRegex, null);
    }

    /** Returns the number of scan conditions computed at construction time. */
    public int getNumSplits() {
        return this.findConditions.size();
    }

    /** This store never accepts writes. */
    public boolean isReadOnly() {
        return true;
    }

    public void flush() throws StoreException {
        throw new StoreException("flush() operation not supported on read-only DocumentStore");
    }

    /** Scans all selected messages without any projection. */
    public DocumentStream find() throws StoreException {
        return this.openMessageStream(Collections.<FieldPath>emptyList(), null);
    }

    /** Scans all selected messages, projecting the given paths (parsed as field paths). */
    public DocumentStream find(String ... paths) throws StoreException {
        ArrayList<FieldPath> fieldPaths = new ArrayList<FieldPath>();
        for (String path : paths) {
            fieldPaths.add(FieldPath.parseFrom(path));
        }
        return this.scan(fieldPaths);
    }

    /** Scans all selected messages, projecting the given field paths. */
    public DocumentStream find(FieldPath ... paths) throws StoreException {
        ArrayList<FieldPath> fieldPaths = new ArrayList<FieldPath>();
        Collections.addAll(fieldPaths, paths);
        return this.scan(fieldPaths);
    }

    /**
     * Derives the table-level projection from the requested message paths and starts the scan.
     *
     * <p>Only {@code producer}, {@code key} and {@code value} influence the table projection:
     * if only {@code producer} is requested the producer and message-count fields are read; if
     * none of the three is requested the id and message-count fields are read; otherwise the
     * rows are read without a projection.
     *
     * @throws IllegalArgumentException if {@code paths} contains a {@code null} entry
     */
    private DocumentStream scan(List<FieldPath> paths) throws StoreException {
        boolean projectProducer = false;
        boolean projectKey = false;
        boolean projectValue = false;
        for (FieldPath fieldPath : paths) {
            if (fieldPath == null) {
                throw new IllegalArgumentException("NULL projection path provided");
            }
            String pathString = fieldPath.asPathString();
            if (pathString.equals("producer")) {
                projectProducer = true;
            } else if (pathString.equals("key")) {
                projectKey = true;
            } else if (pathString.equals("value")) {
                projectValue = true;
            }
        }

        ArrayList<FieldPath> jsonPaths = new ArrayList<FieldPath>();
        if (projectProducer && !projectKey && !projectValue) {
            jsonPaths.add(PRODUCER_FIELD);
            jsonPaths.add(MSG_COUNT_FIELD);
        }
        if (!(projectProducer || projectKey || projectValue)) {
            jsonPaths.add(DocumentConstants.ID_FIELD);
            jsonPaths.add(MSG_COUNT_FIELD);
        }
        for (FieldPath jsonPath : jsonPaths) {
            LOG.debug("JSON DB projection: {}", jsonPath);
        }
        return this.openMessageStream(jsonPaths, paths);
    }

    /**
     * Opens one table scan per pre-computed condition and wraps them in a
     * {@link MarlinDocumentStream}.
     *
     * @param dbProjection   fields to read from the table; empty means no projection
     * @param requestedPaths projection handed to the message stream, or {@code null} for none
     * @return the message stream, or an empty stream if the message stream reports an access
     *         exception
     * @throws StoreException if the message stream recorded an exception while being set up
     */
    private DocumentStream openMessageStream(List<FieldPath> dbProjection, List<FieldPath> requestedPaths) throws StoreException {
        ArrayList<DocumentStream> dbDocumentStreams = new ArrayList<DocumentStream>();
        for (QueryCondition cond : this.findConditions) {
            if (dbProjection.isEmpty()) {
                dbDocumentStreams.add(this.table.find(cond));
            } else {
                dbDocumentStreams.add(this.table.find(cond, dbProjection.toArray(new FieldPath[dbProjection.size()])));
            }
        }
        MarlinDocumentStream messageStream = new MarlinDocumentStream(dbDocumentStreams, requestedPaths, this.numParallelScans, this.maxCacheMemory);
        if (messageStream.accessExceptionOccurred) {
            return new EmptyDocumentStream();
        }
        if (messageStream.exception != null) {
            throw messageStream.exception;
        }
        return messageStream;
    }

    // ---- Condition / query based reads: not supported yet. ----

    public DocumentStream find(QueryCondition c) throws StoreException {
        throw new StoreException("scan(QueryCondition) operation not yet supported");
    }

    public DocumentStream find(QueryCondition c, String ... paths) throws StoreException {
        throw new StoreException("scan(QueryCondition, String...) operation not yet supported");
    }

    public DocumentStream find(QueryCondition c, FieldPath ... paths) throws StoreException {
        throw new StoreException("scan(QueryCondition, FieldPath...) operation not yet supported");
    }

    public QueryResult find(Query query) throws StoreException {
        throw new StoreException("find(Query query) operation not yet supported");
    }

    public DocumentStream findQuery(Query query) throws StoreException {
        throw new StoreException("findQuery(Query query) operation not yet supported");
    }

    public DocumentStream findQuery(String queryJSON) throws StoreException {
        throw new StoreException("findQuery(String queryJSON) operation not yet supported");
    }

    /** Closes the underlying table. */
    public void close() throws StoreException {
        this.table.close();
    }

    // ---- Mutating operations: always rejected, the store is read-only. ----

    public void insertOrReplace(Document doc) throws StoreException {
        throw readOnlyStore();
    }

    public void insertOrReplace(Value _id, Document doc) throws StoreException {
        throw readOnlyStore();
    }

    public void insertOrReplace(Document doc, FieldPath fieldAsKey) throws StoreException {
        throw readOnlyStore();
    }

    public void insertOrReplace(Document doc, String fieldAsKey) throws StoreException {
        throw readOnlyStore();
    }

    public void insertOrReplace(DocumentStream stream) throws MultiOpException {
        throw readOnlyStore();
    }

    public void insertOrReplace(DocumentStream stream, FieldPath fieldAsKey) throws MultiOpException {
        throw readOnlyStore();
    }

    public void insertOrReplace(DocumentStream stream, String fieldAsKey) throws MultiOpException {
        throw readOnlyStore();
    }

    public void update(Value _id, DocumentMutation m) throws StoreException {
        throw readOnlyStore();
    }

    public void delete(Value _id) throws StoreException {
        throw readOnlyStore();
    }

    public void delete(Document doc) throws StoreException {
        throw readOnlyStore();
    }

    public void delete(Document doc, FieldPath fieldAsKey) throws StoreException {
        throw readOnlyStore();
    }

    public void delete(Document doc, String fieldAsKey) throws StoreException {
        throw readOnlyStore();
    }

    public void delete(DocumentStream stream) throws MultiOpException {
        throw readOnlyStore();
    }

    public void delete(DocumentStream stream, FieldPath fieldAsKey) throws MultiOpException {
        throw readOnlyStore();
    }

    public void delete(DocumentStream stream, String fieldAsKey) throws MultiOpException {
        throw readOnlyStore();
    }

    public void insert(Value _id, Document doc) throws StoreException {
        throw readOnlyStore();
    }

    public void insert(Document doc) throws StoreException {
        throw readOnlyStore();
    }

    public void insert(Document doc, FieldPath fieldAsKey) throws StoreException {
        throw readOnlyStore();
    }

    public void insert(Document doc, String fieldAsKey) throws StoreException {
        throw readOnlyStore();
    }

    public void insert(DocumentStream stream) throws MultiOpException {
        throw readOnlyStore();
    }

    public void insert(DocumentStream stream, FieldPath fieldAsKey) throws MultiOpException {
        throw readOnlyStore();
    }

    public void insert(DocumentStream stream, String fieldAsKey) throws MultiOpException {
        throw readOnlyStore();
    }

    public void replace(Value _id, Document doc) throws StoreException {
        throw readOnlyStore();
    }

    public void replace(Document doc) throws StoreException {
        throw readOnlyStore();
    }

    public void replace(Document doc, FieldPath fieldAsKey) throws StoreException {
        throw readOnlyStore();
    }

    public void replace(Document doc, String fieldAsKey) throws StoreException {
        throw readOnlyStore();
    }

    public void replace(DocumentStream stream) throws MultiOpException {
        throw readOnlyStore();
    }

    public void replace(DocumentStream stream, FieldPath fieldAsKey) throws MultiOpException {
        throw readOnlyStore();
    }

    public void replace(DocumentStream stream, String fieldAsKey) throws MultiOpException {
        throw readOnlyStore();
    }

    public void increment(Value _id, String field, byte inc) throws StoreException {
        throw readOnlyStore();
    }

    public void increment(Value _id, String field, short inc) throws StoreException {
        throw readOnlyStore();
    }

    public void increment(Value _id, String field, int inc) throws StoreException {
        throw readOnlyStore();
    }

    public void increment(Value _id, String field, long inc) throws StoreException {
        throw readOnlyStore();
    }

    public void increment(Value _id, String field, float inc) throws StoreException {
        throw readOnlyStore();
    }

    public void increment(Value _id, String field, double inc) throws StoreException {
        throw readOnlyStore();
    }

    public void increment(Value _id, String field, BigDecimal inc) throws StoreException {
        throw readOnlyStore();
    }

    public boolean checkAndMutate(Value _id, QueryCondition condition, DocumentMutation m) throws StoreException {
        throw readOnlyStore();
    }

    public boolean checkAndDelete(Value _id, QueryCondition condition) throws StoreException {
        throw readOnlyStore();
    }

    public boolean checkAndReplace(Value _id, QueryCondition condition, Document doc) throws StoreException {
        throw readOnlyStore();
    }

    public String endTrackingWrites() throws StoreException {
        throw readOnlyStore();
    }

    public void beginTrackingWrites() throws StoreException {
        throw readOnlyStore();
    }

    public void beginTrackingWrites(String previousContext) throws StoreException {
        throw readOnlyStore();
    }

    public void clearTrackedWrites() throws StoreException {
        throw readOnlyStore();
    }

    public void insertOrReplace(String _id, Document r) throws StoreException {
        throw readOnlyStore();
    }

    public void update(String _id, DocumentMutation mutation) throws StoreException {
        throw readOnlyStore();
    }

    public void delete(String _id) throws StoreException {
        throw readOnlyStore();
    }

    public void insert(String _id, Document doc) throws StoreException {
        throw readOnlyStore();
    }

    public void replace(String _id, Document doc) throws StoreException {
        throw readOnlyStore();
    }

    public void increment(String _id, String field, byte inc) throws StoreException {
        throw readOnlyStore();
    }

    public void increment(String _id, String field, short inc) throws StoreException {
        throw readOnlyStore();
    }

    public void increment(String _id, String field, int inc) throws StoreException {
        throw readOnlyStore();
    }

    public void increment(String _id, String field, long inc) throws StoreException {
        throw readOnlyStore();
    }

    public void increment(String _id, String field, float inc) throws StoreException {
        throw readOnlyStore();
    }

    public void increment(String _id, String field, double inc) throws StoreException {
        throw readOnlyStore();
    }

    public void increment(String _id, String field, BigDecimal inc) throws StoreException {
        throw readOnlyStore();
    }

    public boolean checkAndMutate(String _id, QueryCondition condition, DocumentMutation mutation) throws StoreException {
        throw readOnlyStore();
    }

    public boolean checkAndDelete(String _id, QueryCondition condition) throws StoreException {
        throw readOnlyStore();
    }

    public boolean checkAndReplace(String _id, QueryCondition condition, Document doc) throws StoreException {
        throw readOnlyStore();
    }

    /** Exception thrown by every mutating operation. */
    private static StoreException readOnlyStore() {
        return new StoreException("Operation not supported on read-only DocumentStore");
    }

    /**
     * Exception thrown by the {@code findById} family.
     * NOTE(review): the message mentions "read-only" although these are read operations;
     * left unchanged in case callers depend on the text.
     */
    private static StoreException unsupportedOperation() {
        return new StoreException("Operation not supported on read-only DocumentStore");
    }

    // ---- Lookup by id: not supported. ----

    public Document findById(String _id) throws StoreException {
        throw unsupportedOperation();
    }

    public Document findById(Value _id) throws StoreException {
        throw unsupportedOperation();
    }

    public Document findById(String _id, String ... fieldPaths) throws StoreException {
        throw unsupportedOperation();
    }

    public Document findById(String _id, FieldPath ... fieldPaths) throws StoreException {
        throw unsupportedOperation();
    }

    public Document findById(Value _id, String ... fieldPaths) throws StoreException {
        throw unsupportedOperation();
    }

    public Document findById(Value _id, FieldPath ... fieldPaths) throws StoreException {
        throw unsupportedOperation();
    }

    public Document findById(String _id, QueryCondition condition) throws StoreException {
        throw unsupportedOperation();
    }

    public Document findById(Value _id, QueryCondition condition) throws StoreException {
        throw unsupportedOperation();
    }

    public Document findById(String _id, QueryCondition c, String ... fieldPaths) throws StoreException {
        throw unsupportedOperation();
    }

    public Document findById(String _id, QueryCondition condition, FieldPath ... fieldPaths) throws StoreException {
        throw unsupportedOperation();
    }

    public Document findById(Value _id, QueryCondition condition, String ... fieldPaths) throws StoreException {
        throw unsupportedOperation();
    }

    public Document findById(Value _id, QueryCondition condition, FieldPath ... fieldPaths) throws StoreException {
        throw unsupportedOperation();
    }

    /**
     * Topic name together with the feed ids reported for it; ordered by topic name.
     * Static because it never touches the enclosing store.
     */
    private static class TopicMeta
    implements Comparable<TopicMeta> {
        private final String topic;
        private final List<Integer> feedIds;

        public TopicMeta(String t, List<Integer> feeds) {
            this.topic = t;
            this.feedIds = feeds;
        }

        public String getTopic() {
            return this.topic;
        }

        /** Returns whether {@code id} is one of this topic's feed ids. */
        public boolean hasFeedId(int id) {
            return this.feedIds.contains(id);
        }

        /** Orders topics by their name, using {@link String#compareTo(String)}. */
        @Override
        public int compareTo(TopicMeta compareMeta) {
            return this.topic.compareTo(compareMeta.getTopic());
        }
    }
}

