/*
 * Decompiled with CFR 0.152.
 */
package org.apache.nifi.hbase;

import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.Serializable;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicReference;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.Stateful;
import org.apache.nifi.annotation.behavior.TriggerSerially;
import org.apache.nifi.annotation.behavior.TriggerWhenEmpty;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.configuration.DefaultSchedule;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnRemoved;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.annotation.notification.OnPrimaryNodeStateChange;
import org.apache.nifi.annotation.notification.PrimaryNodeState;
import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.components.state.Scope;
import org.apache.nifi.components.state.StateMap;
import org.apache.nifi.distributed.cache.client.Deserializer;
import org.apache.nifi.distributed.cache.client.DistributedMapCacheClient;
import org.apache.nifi.distributed.cache.client.Serializer;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.hbase.HBaseClientService;
import org.apache.nifi.hbase.VisibilityFetchSupport;
import org.apache.nifi.hbase.io.JsonRowSerializer;
import org.apache.nifi.hbase.scan.Column;
import org.apache.nifi.hbase.scan.ResultCell;
import org.apache.nifi.hbase.util.ObjectSerDe;
import org.apache.nifi.hbase.util.StringSerDe;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.scheduling.SchedulingStrategy;

@TriggerWhenEmpty
@TriggerSerially
@InputRequirement(value=InputRequirement.Requirement.INPUT_FORBIDDEN)
@Tags(value={"hbase", "get", "ingest"})
@CapabilityDescription(value="This Processor polls HBase for any records in the specified table. The processor keeps track of the timestamp of the cells that it receives, so that as new records are pushed to HBase, they will automatically be pulled. Each record is output in JSON format, as {\"row\": \"<row key>\", \"cells\": { \"<column 1 family>:<column 1 qualifier>\": \"<cell 1 value>\", \"<column 2 family>:<column 2 qualifier>\": \"<cell 2 value>\", ... }}. For each record received, a Provenance RECEIVE event is emitted with the format hbase://<table name>/<row key>, where <row key> is the UTF-8 encoded value of the row's key.")
@WritesAttributes(value={@WritesAttribute(attribute="hbase.table", description="The name of the HBase table that the data was pulled from"), @WritesAttribute(attribute="mime.type", description="Set to application/json to indicate that output is JSON")})
@Stateful(scopes={Scope.CLUSTER}, description="After performing a fetching from HBase, stores a timestamp of the last-modified cell that was found. In addition, it stores the ID of the row(s) and the value of each cell that has that timestamp as its modification date. This is stored across the cluster and allows the next fetch to avoid duplicating data, even if this Processor is run on Primary Node only and the Primary Node changes.")
@DefaultSchedule(strategy=SchedulingStrategy.TIMER_DRIVEN, period="1 min")
public class GetHBase
extends AbstractProcessor
implements VisibilityFetchSupport {
    // Accepts a comma-separated list of "family" or "family:qualifier" tokens built from word characters
    // (no whitespace allowed around the commas or colons). Used as the validator for the Columns property.
    static final Pattern COLUMNS_PATTERN = Pattern.compile("\\w+(:\\w+)?(?:,\\w+(:\\w+)?)*");
    // Allowable values of the "Initial Time Range" property.
    static final AllowableValue NONE = new AllowableValue("None", "None");
    static final AllowableValue CURRENT_TIME = new AllowableValue("Current Time", "Current Time");
    // Required controller service used by onTrigger to run the scan and to build provenance transit URIs.
    static final PropertyDescriptor HBASE_CLIENT_SERVICE = new PropertyDescriptor.Builder().name("HBase Client Service").description("Specifies the Controller Service to use for accessing HBase.").required(true).identifiesControllerService(HBaseClientService.class).build();
    // Optional cache service. In this class it is only read by parseColumns (to migrate previously stored
    // state into cluster state) and by onRemoved (to clear that state).
    static final PropertyDescriptor DISTRIBUTED_CACHE_SERVICE = new PropertyDescriptor.Builder().name("Distributed Cache Service").description("Specifies the Controller Service that should be used to maintain state about what has been pulled from HBase so that if a new node begins pulling data, it won't duplicate all of the work that has been done.").required(false).identifiesControllerService(DistributedMapCacheClient.class).build();
    // Character set handed to the JsonRowSerializer in onTrigger.
    static final PropertyDescriptor CHARSET = new PropertyDescriptor.Builder().name("Character Set").description("Specifies which character set is used to encode the data in HBase").required(true).expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY).defaultValue("UTF-8").addValidator(StandardValidators.CHARACTER_SET_VALIDATOR).build();
    // Table that onTrigger scans.
    // NOTE(review): the description says "put data into" although this processor only reads from the table - confirm wording.
    static final PropertyDescriptor TABLE_NAME = new PropertyDescriptor.Builder().name("Table Name").description("The name of the HBase Table to put data into").required(true).expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY).addValidator(StandardValidators.NON_EMPTY_VALIDATOR).build();
    // Optional column restriction; parsed into Column objects by parseColumns. Mutually exclusive with
    // FILTER_EXPRESSION (enforced in customValidate).
    static final PropertyDescriptor COLUMNS = new PropertyDescriptor.Builder().name("Columns").description("A comma-separated list of \"<colFamily>:<colQualifier>\" pairs to return when scanning. To return all columns for a given family, leave off the qualifier such as \"<colFamily1>,<colFamily2>\".").required(false).expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY).addValidator(StandardValidators.createRegexMatchingValidator((Pattern)COLUMNS_PATTERN)).build();
    // Optional filter expression passed through unchanged to HBaseClientService.scan.
    static final PropertyDescriptor FILTER_EXPRESSION = new PropertyDescriptor.Builder().name("Filter Expression").description("An HBase filter expression that will be applied to the scan. This property can not be used when also using the Columns property.").required(false).expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY).addValidator(StandardValidators.NON_EMPTY_VALIDATOR).build();
    // Chooses the minimum timestamp used when no previous ScanResult exists: 0 for "None",
    // System.currentTimeMillis() otherwise (see onTrigger).
    static final PropertyDescriptor INITIAL_TIMERANGE = new PropertyDescriptor.Builder().name("Initial Time Range").description("The time range to use on the first scan of a table. None will pull the entire table on the first scan, Current Time will pull entries from that point forward.").required(true).expressionLanguageSupported(ExpressionLanguageScope.NONE).allowableValues(new AllowableValue[]{NONE, CURRENT_TIME}).defaultValue(NONE.getValue()).build();
    // The only relationship exposed by getRelationships; every FlowFile created in onTrigger is transferred here.
    static final Relationship REL_SUCCESS = new Relationship.Builder().name("success").description("All FlowFiles are routed to this relationship").build();
    // Most recent scan result known to this instance. Reloaded from cluster state at the start of each
    // onTrigger and reset to null by onPropertyModified when the Table Name property changes.
    private final AtomicReference<ScanResult> lastResult = new AtomicReference();
    // Columns to scan; cleared and repopulated in place by parseColumns each time the processor is scheduled.
    private volatile List<Column> columns = new ArrayList<Column>();
    // Set by onPrimaryNodeChange; forces getState(DistributedMapCacheClient) to re-read the cache once, then reset.
    private volatile boolean justElectedPrimaryNode = false;
    // Table name that onTrigger compares against the currently configured one to decide whether cluster
    // state has to be cleared.
    private volatile String previousTable = null;

    /**
     * Returns the relationships this processor can route FlowFiles to.
     *
     * @return an immutable single-element set holding {@code REL_SUCCESS}
     */
    public Set<Relationship> getRelationships() {
        final Set<Relationship> relationships = Collections.singleton(REL_SUCCESS);
        return relationships;
    }

    /**
     * Lists the properties this processor supports, in the order they are presented.
     *
     * @return a new mutable list of the supported property descriptors
     */
    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
        final List<PropertyDescriptor> descriptors = new ArrayList<>(Arrays.asList(
                HBASE_CLIENT_SERVICE,
                DISTRIBUTED_CACHE_SERVICE,
                TABLE_NAME,
                COLUMNS,
                AUTHORIZATIONS,
                FILTER_EXPRESSION,
                INITIAL_TIMERANGE,
                CHARSET));
        return descriptors;
    }

    /**
     * Rejects configurations that set both the Columns and the Filter Expression properties.
     *
     * @param validationContext context used to read the evaluated property values
     * @return a list with one invalid result when both properties are non-blank, otherwise an empty list
     */
    protected Collection<ValidationResult> customValidate(ValidationContext validationContext) {
        final String columnsValue = validationContext.getProperty(COLUMNS).evaluateAttributeExpressions().getValue();
        final String filterValue = validationContext.getProperty(FILTER_EXPRESSION).evaluateAttributeExpressions().getValue();
        final List<ValidationResult> results = new ArrayList<>();

        // Nothing to complain about unless BOTH properties carry a value.
        if (StringUtils.isBlank((CharSequence)columnsValue) || StringUtils.isBlank((CharSequence)filterValue)) {
            return results;
        }

        final ValidationResult conflict = new ValidationResult.Builder()
                .subject(FILTER_EXPRESSION.getDisplayName())
                .input(filterValue)
                .valid(false)
                .explanation("a filter expression can not be used in conjunction with the Columns property")
                .build();
        results.add(conflict);
        return results;
    }

    /**
     * Drops the in-memory scan result whenever the Table Name property is modified; changes to any
     * other property are ignored.
     *
     * @param descriptor the property that was modified
     * @param oldValue the previous value (unused)
     * @param newValue the new value (unused)
     */
    public void onPropertyModified(PropertyDescriptor descriptor, String oldValue, String newValue) {
        final boolean tableChanged = descriptor.equals((Object)TABLE_NAME);
        if (!tableChanged) {
            return;
        }
        this.lastResult.set(null);
    }

    /**
     * Runs when the processor is scheduled. Performs two independent steps:
     * <ol>
     *   <li>If no cluster state exists yet (state version below zero), copies any scan result found via
     *       {@code getState(DistributedMapCacheClient)} into cluster state and then clears the old copy.</li>
     *   <li>Re-parses the Columns property into the shared {@code columns} list.</li>
     * </ol>
     *
     * @param context the process context providing the state manager and property values
     * @throws IOException if reading or writing state fails
     */
    @OnScheduled
    public void parseColumns(ProcessContext context) throws IOException {
        final StateMap clusterState = context.getStateManager().getState(Scope.CLUSTER);
        if (clusterState.getVersion() < 0L) {
            final DistributedMapCacheClient cacheClient = (DistributedMapCacheClient)context.getProperty(DISTRIBUTED_CACHE_SERVICE).asControllerService(DistributedMapCacheClient.class);
            final ScanResult previousState = this.getState(cacheClient);
            if (previousState != null) {
                context.getStateManager().setState(previousState.toFlatMap(), Scope.CLUSTER);
            }
            this.clearState(cacheClient);
        }

        final String columnsValue = context.getProperty(COLUMNS).evaluateAttributeExpressions().getValue();
        final String[] specs;
        if (columnsValue == null || columnsValue.isEmpty()) {
            specs = new String[0];
        } else {
            specs = columnsValue.split(",");
        }

        // The list is refilled in place rather than replaced.
        this.columns.clear();
        for (final String spec : specs) {
            if (!spec.contains(":")) {
                // Family only: a null qualifier is passed to Column.
                this.columns.add(new Column(spec.getBytes(StandardCharsets.UTF_8), null));
                continue;
            }
            final String[] familyAndQualifier = spec.split(":");
            final byte[] family = familyAndQualifier[0].getBytes(StandardCharsets.UTF_8);
            final byte[] qualifier = familyAndQualifier[1].getBytes(StandardCharsets.UTF_8);
            this.columns.add(new Column(family, qualifier));
        }
    }

    /**
     * Records whether this node has just been elected primary node.
     *
     * @param newState the new primary-node state reported by the framework
     */
    @OnPrimaryNodeStateChange
    public void onPrimaryNodeChange(PrimaryNodeState newState) {
        final boolean elected = PrimaryNodeState.ELECTED_PRIMARY_NODE == newState;
        this.justElectedPrimaryNode = elected;
    }

    /**
     * Cleans up state belonging to this processor when it is removed from the flow.
     *
     * <p>{@code clearState} is invoked unconditionally: it deletes the local state file first and only
     * touches the distributed cache when a client is available, so skipping the call when no cache
     * service is configured would leave the local state file behind.
     *
     * @param context the process context used to look up the optional Distributed Cache Service
     */
    @OnRemoved
    public void onRemoved(ProcessContext context) {
        DistributedMapCacheClient client = (DistributedMapCacheClient)context.getProperty(DISTRIBUTED_CACHE_SERVICE).asControllerService(DistributedMapCacheClient.class);
        // client may be null (the property is optional); clearState handles that case itself.
        this.clearState(client);
    }

    /**
     * Scans the configured table and emits one JSON FlowFile per row that has not been emitted before.
     *
     * <p>Flow of one invocation:
     * <ol>
     *   <li>If the table name differs from the one used by the previous invocation, cluster state is
     *       cleared so the timestamp recorded for the old table is not applied to the new one.</li>
     *   <li>The previous {@link ScanResult} is loaded from cluster state; its timestamp (or, if there is
     *       none, 0 / the current time depending on Initial Time Range) becomes the minimum scan time.</li>
     *   <li>Each row returned by the scan is skipped when its newest cell is older than the minimum time,
     *       or when it equals the minimum time and every newest cell is contained in the previous result.
     *       All other rows are serialized, tagged with {@code hbase.table} / {@code mime.type}, reported
     *       as a provenance RECEIVE event and transferred to {@code REL_SUCCESS}.</li>
     *   <li>The session is committed every {@link #getBatchSize()} emitted rows.</li>
     *   <li>After the scan, the newest timestamp and the cells carrying it are written to cluster state
     *       (merged with the previous result when the timestamp did not advance).</li>
     * </ol>
     * An {@link IOException} is logged and the session rolled back; the context is yielded in every case.
     *
     * @param context provides property values and controller services
     * @param session the session used to create FlowFiles and to read/write cluster state
     * @throws ProcessException declared by the processor contract
     */
    public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
        final String tableName = context.getProperty(TABLE_NAME).evaluateAttributeExpressions().getValue();
        final String initialTimeRange = context.getProperty(INITIAL_TIMERANGE).getValue();
        final String filterExpression = context.getProperty(FILTER_EXPRESSION).evaluateAttributeExpressions().getValue();
        final List authorizations = this.getAuthorizations(context, null);
        final HBaseClientService hBaseClientService = (HBaseClientService)context.getProperty(HBASE_CLIENT_SERVICE).asControllerService(HBaseClientService.class);

        // State stored for a different table must not influence the scan of this one.
        final String lastTable = this.previousTable;
        if (lastTable != null && !lastTable.equals(tableName)) {
            try {
                session.clearState(Scope.CLUSTER);
            }
            catch (IOException ioe) {
                this.getLogger().warn("Failed to clear Cluster State", (Throwable)ioe);
            }
        }
        // Remember the table on EVERY run; assigning it only inside the branch above would leave it
        // null forever and the table-change check could never fire.
        this.previousTable = tableName;

        try {
            final Charset charset = Charset.forName(context.getProperty(CHARSET).evaluateAttributeExpressions().getValue());
            final JsonRowSerializer serializer = new JsonRowSerializer(charset);

            // One snapshot of the previous result is used for the whole invocation.
            final ScanResult previousResult = this.getState(session);
            this.lastResult.set(previousResult);

            final long defaultMinTime = initialTimeRange.equals(NONE.getValue()) ? 0L : System.currentTimeMillis();
            final long minTime = previousResult == null ? defaultMinTime : previousResult.getTimestamp();

            // row key -> values of the cells whose timestamp equals the newest timestamp seen so far
            final Map<String, Set<String>> cellsMatchingTimestamp = new HashMap<String, Set<String>>();
            final AtomicReference<Long> rowsPulledHolder = new AtomicReference<Long>(0L);
            final AtomicReference<Long> latestTimestampHolder = new AtomicReference<Long>(minTime);

            hBaseClientService.scan(tableName, this.columns, filterExpression, minTime, authorizations, (rowKey, resultCells) -> {
                final String rowKeyString = new String(rowKey, StandardCharsets.UTF_8);
                final long latestCellTimestamp = latestTimestampOf(resultCells);

                if (latestCellTimestamp < minTime) {
                    this.getLogger().debug("latest cell timestamp for row {} is {}, which is earlier than the minimum time of {}", new Object[]{rowKeyString, latestCellTimestamp, minTime});
                    return;
                }
                if (latestCellTimestamp == minTime && allLatestCellsSeen(resultCells, latestCellTimestamp, previousResult)) {
                    this.getLogger().debug("all cells for row {} have already been seen", new Object[]{rowKeyString});
                    return;
                }

                if (latestCellTimestamp >= latestTimestampHolder.get()) {
                    if (latestCellTimestamp > latestTimestampHolder.get()) {
                        // A newer timestamp supersedes everything recorded for the older one.
                        latestTimestampHolder.set(latestCellTimestamp);
                        cellsMatchingTimestamp.clear();
                    }
                    for (ResultCell cell : resultCells) {
                        if (cell.getTimestamp() != latestCellTimestamp) {
                            continue;
                        }
                        byte[] rowValue = Arrays.copyOfRange(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength() + cell.getRowOffset());
                        byte[] cellValue = Arrays.copyOfRange(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength() + cell.getValueOffset());
                        cellsMatchingTimestamp
                                .computeIfAbsent(new String(rowValue, StandardCharsets.UTF_8), key -> new HashSet<String>())
                                .add(new String(cellValue, StandardCharsets.UTF_8));
                    }
                }

                FlowFile flowFile = session.create();
                flowFile = session.write(flowFile, out -> serializer.serialize(rowKey, resultCells, out));
                final Map<String, String> attributes = new HashMap<String, String>();
                attributes.put("hbase.table", tableName);
                attributes.put("mime.type", "application/json");
                flowFile = session.putAllAttributes(flowFile, attributes);
                session.getProvenanceReporter().receive(flowFile, hBaseClientService.toTransitUri(tableName, rowKeyString));
                session.transfer(flowFile, REL_SUCCESS);
                this.getLogger().debug("Received {} from HBase with row key {}", new Object[]{flowFile, rowKeyString});

                // Commit every getBatchSize() rows so a huge scan does not buffer all FlowFiles.
                // The counter is incremented exactly once per emitted row.
                final long rowsPulled = rowsPulledHolder.get() + 1L;
                rowsPulledHolder.set(rowsPulled);
                if (rowsPulled % (long)this.getBatchSize() == 0L) {
                    session.commitAsync();
                }
            });

            final ScanResult scanResults = new ScanResult(latestTimestampHolder.get(), cellsMatchingTimestamp);
            if (previousResult == null || scanResults.getTimestamp() > previousResult.getTimestamp()) {
                session.setState(scanResults.toFlatMap(), Scope.CLUSTER);
                session.commitAsync(() -> this.updateScanResultsIfNewer(scanResults));
            } else if (scanResults.getTimestamp() == previousResult.getTimestamp()) {
                // Timestamp did not advance: keep the cells already known for it and add the new ones.
                final Map<String, Set<String>> combinedCells = mergeMatchingCells(scanResults.getMatchingCells(), previousResult.getMatchingCells());
                final ScanResult combined = new ScanResult(scanResults.getTimestamp(), combinedCells);
                session.setState(combined.toFlatMap(), Scope.CLUSTER);
                session.commitAsync(() -> this.updateScanResultsIfNewer(combined));
            }
        }
        catch (IOException e) {
            this.getLogger().error("Failed to receive data from HBase due to {}", (Throwable)e);
            session.rollback();
        }
        finally {
            context.yield();
        }
    }

    /** Returns the largest cell timestamp in {@code cells}, or 0 when there are no cells. */
    private static long latestTimestampOf(ResultCell[] cells) {
        long latest = 0L;
        for (ResultCell cell : cells) {
            if (cell.getTimestamp() > latest) {
                latest = cell.getTimestamp();
            }
        }
        return latest;
    }

    /** Returns true when every cell stamped {@code latestTimestamp} is contained in {@code previous}. */
    private static boolean allLatestCellsSeen(ResultCell[] cells, long latestTimestamp, ScanResult previous) {
        for (ResultCell cell : cells) {
            if (cell.getTimestamp() != latestTimestamp) {
                continue;
            }
            if (previous == null || !previous.contains(cell)) {
                return false;
            }
        }
        return true;
    }

    /** Builds a new row -> cell-values map holding independent copies of {@code current} unioned with {@code previous}. */
    private static Map<String, Set<String>> mergeMatchingCells(Map<String, Set<String>> current, Map<String, Set<String>> previous) {
        final Map<String, Set<String>> merged = new HashMap<String, Set<String>>();
        for (Map.Entry<String, Set<String>> entry : current.entrySet()) {
            merged.put(entry.getKey(), new HashSet<String>(entry.getValue()));
        }
        for (Map.Entry<String, Set<String>> entry : previous.entrySet()) {
            merged.computeIfAbsent(entry.getKey(), key -> new HashSet<String>()).addAll(entry.getValue());
        }
        return merged;
    }

    /**
     * Atomically replaces the in-memory last result with {@code scanResult} when none is held yet or
     * when {@code scanResult} carries a strictly greater timestamp; otherwise leaves it unchanged.
     *
     * @param scanResult the candidate result
     */
    private void updateScanResultsIfNewer(ScanResult scanResult) {
        this.lastResult.getAndUpdate(existing -> {
            if (existing == null) {
                return scanResult;
            }
            return scanResult.getTimestamp() > existing.getTimestamp() ? scanResult : existing;
        });
    }

    /**
     * Number of emitted rows after which {@code onTrigger} commits the session mid-scan.
     *
     * @return the batch size, always 500 in this implementation
     */
    protected int getBatchSize() {
        final int rowsPerCommit = 500;
        return rowsPerCommit;
    }

    /**
     * Directory that holds the local state file of this processor.
     *
     * @return the relative path {@code conf/state}
     */
    protected File getStateDir() {
        final File stateDirectory = new File("conf/state");
        return stateDirectory;
    }

    /**
     * Local state file of this processor instance, located inside {@link #getStateDir()}.
     *
     * @return the file named {@code getHBase-<identifier>}
     */
    protected File getStateFile() {
        final String fileName = "getHBase-" + this.getIdentifier();
        return new File(this.getStateDir(), fileName);
    }

    /**
     * Key under which this processor's state is looked up in, and removed from, the distributed cache.
     *
     * @return {@code getHBase-<identifier>-state}
     */
    protected String getKey() {
        final String cacheKey = "getHBase-" + this.getIdentifier() + "-state";
        return cacheKey;
    }

    /**
     * Exposes the columns parsed by {@code parseColumns}.
     *
     * @return the live internal list (not a copy)
     */
    protected List<Column> getColumns() {
        final List<Column> parsedColumns = this.columns;
        return parsedColumns;
    }

    /**
     * Removes the state kept outside of cluster state: the local state file and, when a cache client
     * is supplied, the entry stored under {@link #getKey()} in the distributed cache.
     *
     * @param client the distributed cache client, or {@code null} to only delete the local file
     */
    private void clearState(DistributedMapCacheClient client) {
        final File stateFile = this.getStateFile();
        if (stateFile.exists()) {
            // Best effort: the outcome of the delete is not checked.
            stateFile.delete();
        }

        if (client == null) {
            return;
        }
        try {
            client.remove((Object)this.getKey(), (Serializer)new StringSerDe());
        }
        catch (IOException e) {
            this.getLogger().warn("Processor state was not cleared from distributed cache due to {}", new Object[]{e});
        }
    }

    /**
     * Loads the last scan result from cluster state.
     *
     * @param session the session used to read cluster-scoped state
     * @return the stored result, or {@code null} when the state version is negative or the stored map
     *         cannot be turned into a {@link ScanResult}
     * @throws IOException if the state cannot be read
     */
    private ScanResult getState(ProcessSession session) throws IOException {
        final StateMap clusterState = session.getState(Scope.CLUSTER);
        final boolean neverStored = clusterState.getVersion() < 0L;
        return neverStored ? null : ScanResult.fromFlatMap(clusterState.toMap());
    }

    /**
     * Recovers a scan result from the distributed cache and/or the local state file.
     *
     * <p>Starts from the in-memory last result. When that is missing, or this node was just elected
     * primary, the distributed cache (if a client is given) is consulted and its answer replaces the
     * in-memory value - including replacing it with {@code null} when the cache holds no
     * {@link ScanResult}. Afterwards a local state file, if present, wins when nothing was found so
     * far or when it carries a newer timestamp.
     *
     * @param client the distributed cache client, may be {@code null}
     * @return the recovered result, or {@code null} when none is available
     * @throws IOException if the distributed cache lookup fails
     */
    private ScanResult getState(DistributedMapCacheClient client) throws IOException {
        final StringSerDe keySerializer = new StringSerDe();
        final ObjectSerDe valueDeserializer = new ObjectSerDe();

        ScanResult recovered = this.lastResult.get();
        if (recovered == null || this.justElectedPrimaryNode) {
            if (client != null) {
                final Object cached = client.get((Object)this.getKey(), (Serializer)keySerializer, (Deserializer)valueDeserializer);
                if (cached instanceof ScanResult) {
                    recovered = (ScanResult)cached;
                    this.getLogger().debug("Retrieved state from the distributed cache, previous timestamp was {}", new Object[]{recovered.getTimestamp()});
                } else {
                    recovered = null;
                }
            }
            // The forced cache re-read happens only once per election.
            this.justElectedPrimaryNode = false;
        }

        final File stateFile = this.getStateFile();
        if (!stateFile.exists()) {
            return recovered;
        }

        // NOTE(review): this uses native Java deserialization on a file from the state directory;
        // presumably that directory is trusted - confirm, or consider an ObjectInputFilter.
        try (FileInputStream fileIn = new FileInputStream(stateFile);
             ObjectInputStream objectIn = new ObjectInputStream(fileIn)) {
            final Object persisted = objectIn.readObject();
            if (persisted instanceof ScanResult) {
                final ScanResult localResult = (ScanResult)persisted;
                if (recovered == null || localResult.getTimestamp() > recovered.getTimestamp()) {
                    recovered = localResult;
                    this.getLogger().debug("Using last timestamp from local state because it was newer than the distributed cache, or no value existed in the cache");
                }
            }
        }
        catch (IOException | ClassNotFoundException ioe) {
            this.getLogger().warn("Failed to recover persisted state from {} due to {}. Assuming that state from distributed cache is correct.", new Object[]{stateFile, ioe});
        }
        return recovered;
    }

    /**
     * Snapshot of a scan: the newest cell timestamp that was seen and, per row key, the values of the
     * cells carrying exactly that timestamp.
     *
     * <p>Can be converted to and from a flat {@code Map<String, String>} with these keys:
     * <ul>
     *   <li>{@code timestamp} - the newest timestamp</li>
     *   <li>{@code row.<i>} - the key of the i-th row</li>
     *   <li>{@code row.<i>.<j>} - the value of the j-th recorded cell of the i-th row</li>
     * </ul>
     */
    public static class ScanResult
    implements Serializable {
        private static final long serialVersionUID = 1L;
        // Matches "row.<rowIndex>" (group 3 is null) and "row.<rowIndex>.<cellIndex>".
        private static final Pattern CELL_ID_PATTERN = Pattern.compile(Pattern.quote(StateKeys.ROW_ID_PREFIX) + "(\\d+)(\\.(\\d+))?");

        private final long latestTimestamp;
        private final Map<String, Set<String>> matchingCellHashes;

        /**
         * @param timestamp the newest cell timestamp seen
         * @param cellHashes row key to cell values; the map is stored as given, not copied
         */
        public ScanResult(long timestamp, Map<String, Set<String>> cellHashes) {
            this.latestTimestamp = timestamp;
            this.matchingCellHashes = cellHashes;
        }

        /** @return the newest cell timestamp seen */
        public long getTimestamp() {
            return this.latestTimestamp;
        }

        /** @return the internal row key to cell values map (not a copy) */
        public Map<String, Set<String>> getMatchingCells() {
            return this.matchingCellHashes;
        }

        /**
         * Tells whether the given cell is already recorded in this result.
         *
         * @param cell the cell to look up
         * @return {@code true} only when the cell's timestamp equals this result's timestamp and its
         *         value is recorded under its row key
         */
        public boolean contains(ResultCell cell) {
            if (cell.getTimestamp() != this.latestTimestamp) {
                return false;
            }
            final Set<String> recordedValues = this.matchingCellHashes.get(rowKeyOf(cell));
            return recordedValues != null && recordedValues.contains(valueOf(cell));
        }

        /** Decodes the row-key bytes of {@code cell} as UTF-8. */
        private static String rowKeyOf(ResultCell cell) {
            final int start = cell.getRowOffset();
            final byte[] rowBytes = Arrays.copyOfRange(cell.getRowArray(), start, cell.getRowLength() + start);
            return new String(rowBytes, StandardCharsets.UTF_8);
        }

        /** Decodes the value bytes of {@code cell} as UTF-8. */
        private static String valueOf(ResultCell cell) {
            final int start = cell.getValueOffset();
            final byte[] valueBytes = Arrays.copyOfRange(cell.getValueArray(), start, cell.getValueLength() + start);
            return new String(valueBytes, StandardCharsets.UTF_8);
        }

        /**
         * Flattens this result into string key/value pairs (see the class description for the layout).
         * Row and cell indexes are assigned in iteration order of the underlying collections.
         *
         * @return a new mutable map
         */
        public Map<String, String> toFlatMap() {
            final Map<String, String> flat = new HashMap<>();
            flat.put(StateKeys.TIMESTAMP, String.valueOf(this.latestTimestamp));

            int rowIndex = 0;
            for (final Map.Entry<String, Set<String>> row : this.matchingCellHashes.entrySet()) {
                final String rowKey = StateKeys.ROW_ID_PREFIX + rowIndex;
                flat.put(rowKey, row.getKey());

                int cellIndex = 0;
                for (final String cellValue : row.getValue()) {
                    flat.put(rowKey + "." + cellIndex, cellValue);
                    cellIndex++;
                }
                rowIndex++;
            }
            return flat;
        }

        /**
         * Rebuilds a result from the flat representation produced by {@link #toFlatMap()}.
         *
         * @param map the flat map, may be {@code null}
         * @return the rebuilt result, or {@code null} when {@code map} is {@code null} or has no
         *         {@code timestamp} entry
         * @throws NumberFormatException if the {@code timestamp} entry is not a parsable long
         */
        public static ScanResult fromFlatMap(Map<String, String> map) {
            if (map == null) {
                return null;
            }
            final String timestampValue = map.get(StateKeys.TIMESTAMP);
            if (timestampValue == null) {
                return null;
            }
            final long timestamp = Long.parseLong(timestampValue);

            // First pass: group entries by row index, separating the row key from the cell values.
            final Map<String, Set<String>> cellsByRowIndex = new HashMap<>();
            final Map<String, String> rowKeyByRowIndex = new HashMap<>();
            for (final Map.Entry<String, String> entry : map.entrySet()) {
                final Matcher matcher = CELL_ID_PATTERN.matcher(entry.getKey());
                if (!matcher.matches()) {
                    continue;
                }
                final String rowIndex = matcher.group(1);
                // A set is registered even for a bare "row.<i>" entry, so rows without cells survive.
                final Set<String> cells = cellsByRowIndex.computeIfAbsent(rowIndex, index -> new HashSet<>());
                if (matcher.group(3) == null) {
                    rowKeyByRowIndex.put(rowIndex, entry.getValue());
                } else {
                    cells.add(entry.getValue());
                }
            }

            // Second pass: re-key the cell sets by the actual row key.
            final Map<String, Set<String>> cellsByRowKey = new HashMap<>(cellsByRowIndex.size());
            for (final Map.Entry<String, Set<String>> indexed : cellsByRowIndex.entrySet()) {
                cellsByRowKey.put(rowKeyByRowIndex.get(indexed.getKey()), indexed.getValue());
            }
            return new ScanResult(timestamp, cellsByRowKey);
        }

        /** Key names used by the flat-map representation. */
        public static class StateKeys {
            public static final String TIMESTAMP = "timestamp";
            public static final String ROW_ID_PREFIX = "row.";
        }
    }
}

