/*
 * Decompiled with CFR 0.152.
 */
package org.apache.nifi.processors.standard;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.SideEffectFree;
import org.apache.nifi.annotation.behavior.Stateful;
import org.apache.nifi.annotation.behavior.TriggerSerially;
import org.apache.nifi.annotation.behavior.TriggerWhenEmpty;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.annotation.lifecycle.OnStopped;
import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.state.Scope;
import org.apache.nifi.components.state.StateManager;
import org.apache.nifi.components.state.StateMap;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.ProcessorInitializationContext;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;

@SideEffectFree
@TriggerSerially
@TriggerWhenEmpty
@InputRequirement(value=InputRequirement.Requirement.INPUT_REQUIRED)
@Tags(value={"monitor", "flow", "active", "inactive", "activity", "detection"})
@CapabilityDescription(value="Monitors the flow for activity and sends out an indicator when the flow has not had any data for some specified amount of time and again when the flow's activity is restored")
@WritesAttributes(value={@WritesAttribute(attribute="inactivityStartMillis", description="The time at which Inactivity began, in the form of milliseconds since Epoch"), @WritesAttribute(attribute="inactivityDurationMillis", description="The number of milliseconds that the inactivity has spanned")})
@Stateful(scopes={Scope.CLUSTER, Scope.LOCAL}, description="MonitorActivity stores the last timestamp at each node as state, so that it can examine activity at cluster wide. If 'Copy Attribute' is set to true, then flow file attributes are also persisted. In local scope, it stores last known activity timestamp if the flow is inactive.")
public class MonitorActivity
extends AbstractProcessor {
    // Keys under which the last successful transfer timestamp is kept in the state maps:
    // the "common" key is used with Scope.CLUSTER, the "local" key with Scope.LOCAL.
    public static final String STATE_KEY_COMMON_FLOW_ACTIVITY_INFO = "CommonFlowActivityInfo.lastSuccessfulTransfer";
    public static final String STATE_KEY_LOCAL_FLOW_ACTIVITY_INFO = "LocalFlowActivityInfo.lastSuccessfulTransfer";
    // Allowable values of the 'Monitoring Scope' property (must be declared before MONITORING_SCOPE).
    public static final AllowableValue SCOPE_NODE = new AllowableValue("node");
    public static final AllowableValue SCOPE_CLUSTER = new AllowableValue("cluster");
    // Allowable values of the 'Reporting Node' property (must be declared before REPORTING_NODE).
    public static final AllowableValue REPORT_NODE_ALL = new AllowableValue("all");
    public static final AllowableValue REPORT_NODE_PRIMARY = new AllowableValue("primary");
    // ---- Property descriptors -------------------------------------------------------------
    // Declaration order matters: REPORTING_NODE refers to MONITORING_SCOPE in its dependsOn(...) clause.

    // Inactivity threshold; read as milliseconds in onScheduled and on every onTrigger call.
    public static final PropertyDescriptor THRESHOLD = new PropertyDescriptor.Builder().name("Threshold Duration").description("Determines how much time must elapse before considering the flow to be inactive").required(true).addValidator(StandardValidators.TIME_PERIOD_VALIDATOR).expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY).defaultValue("5 min").build();
    // Controls whether the inactivity indicator is repeated while the flow stays inactive (read in onTrigger).
    public static final PropertyDescriptor CONTINUALLY_SEND_MESSAGES = new PropertyDescriptor.Builder().name("Continually Send Messages").description("If true, will send inactivity indicator continually every Threshold Duration amount of time until activity is restored; if false, will send an indicator only when the flow first becomes inactive").required(true).allowableValues(new String[]{"true", "false"}).defaultValue("false").build();
    // Content template of the 'activity.restored' indicator; evaluated against the indicator FlowFile's attributes.
    public static final PropertyDescriptor ACTIVITY_RESTORED_MESSAGE = new PropertyDescriptor.Builder().name("Activity Restored Message").description("The message that will be the content of FlowFiles that are sent to 'activity.restored' relationship").required(true).expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES).addValidator(StandardValidators.NON_EMPTY_VALIDATOR).defaultValue("Activity restored at time: ${now():format('yyyy/MM/dd HH:mm:ss')} after being inactive for ${inactivityDurationMillis:toNumber():divide(60000)} minutes").build();
    // When true, onTrigger performs no state transition until a successful transfer is known.
    public static final PropertyDescriptor WAIT_FOR_ACTIVITY = new PropertyDescriptor.Builder().name("Wait for Activity").description("When the processor gets started or restarted, if set to true, only send an inactive indicator if there had been activity beforehand. Otherwise send an inactive indicator even if there had not been activity beforehand.").required(true).allowableValues(new String[]{"true", "false"}).defaultValue("false").build();
    // When false, onScheduled tries to restore the last activity timestamp from local state.
    public static final PropertyDescriptor RESET_STATE_ON_RESTART = new PropertyDescriptor.Builder().name("Reset State on Restart").description("When the processor gets started or restarted, if set to true, the initial state will always be active. Otherwise, the last reported flow state will be preserved.").required(true).allowableValues(new String[]{"true", "false"}).defaultValue("true").build();
    // Content template of the 'inactive' indicator; evaluated against the indicator FlowFile's attributes.
    public static final PropertyDescriptor INACTIVITY_MESSAGE = new PropertyDescriptor.Builder().name("Inactivity Message").description("The message that will be the content of FlowFiles that are sent to the 'inactive' relationship").required(true).expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES).addValidator(StandardValidators.NON_EMPTY_VALIDATOR).defaultValue("Lacking activity as of time: ${now():format('yyyy/MM/dd HH:mm:ss')}; flow has been inactive for ${inactivityDurationMillis:toNumber():divide(60000)} minutes").build();
    // Passed to LocalFlowActivityInfo as its saveAttributes flag (see onScheduled).
    public static final PropertyDescriptor COPY_ATTRIBUTES = new PropertyDescriptor.Builder().name("Copy Attributes").description("If true, will copy all flow file attributes from the flow file that resumed activity to the newly created indicator flow file").required(true).allowableValues(new String[]{"true", "false"}).defaultValue("false").build();
    // 'node' or 'cluster'; interpreted by isClusterScope, which falls back to node scope when not clustered.
    public static final PropertyDescriptor MONITORING_SCOPE = new PropertyDescriptor.Builder().name("Monitoring Scope").description("Specify how to determine activeness of the flow. 'node' means that activeness is examined at individual node separately. It can be useful if DFM expects each node should receive flow files in a distributed manner. With 'cluster', it defines the flow is active while at least one node receives flow files actively. If NiFi is running as standalone mode, this should be set as 'node', if it's 'cluster', NiFi logs a warning message and act as 'node' scope.").required(true).allowableValues(new AllowableValue[]{SCOPE_NODE, SCOPE_CLUSTER}).defaultValue(SCOPE_NODE.getValue()).build();
    // 'all' or 'primary'; declared as dependent on MONITORING_SCOPE being 'cluster'.
    public static final PropertyDescriptor REPORTING_NODE = new PropertyDescriptor.Builder().name("Reporting Node").description("Specify which node should send notification flow-files to inactive and activity.restored relationships. With 'all', every node in this cluster send notification flow-files. 'primary' means flow-files will be sent only from a primary node. If NiFi is running as standalone mode, this should be set as 'all', even if it's 'primary', NiFi act as 'all'.").required(true).allowableValues(new AllowableValue[]{REPORT_NODE_ALL, REPORT_NODE_PRIMARY}).dependsOn(MONITORING_SCOPE, new AllowableValue[]{SCOPE_CLUSTER}).defaultValue(REPORT_NODE_ALL.getValue()).build();
    // ---- Relationships ----------------------------------------------------------------------
    // Destination of every incoming FlowFile (see onTrigger).
    public static final Relationship REL_SUCCESS = new Relationship.Builder().name("success").description("All incoming FlowFiles are routed to success").build();
    // Destination of the indicator FlowFiles created by sendInactivityMarker.
    public static final Relationship REL_INACTIVE = new Relationship.Builder().name("inactive").description("This relationship is used to transfer an Inactivity indicator when no FlowFiles are routed to 'success' for Threshold Duration amount of time").build();
    // Destination of the indicator FlowFiles created by sendActivationMarker.
    public static final Relationship REL_ACTIVITY_RESTORED = new Relationship.Builder().name("activity.restored").description("This relationship is used to transfer an Activity Restored indicator when FlowFiles are routing to 'success' following a period of inactivity").build();
    // Unmodifiable descriptor list and relationship set; both are populated in init().
    private List<PropertyDescriptor> properties;
    private Set<Relationship> relationships;
    // Cluster connectivity as last observed by synchronizeState(); used to detect a reconnect.
    private final AtomicBoolean connectedWhenLastTriggered = new AtomicBoolean(false);
    // Time (nowMillis) at which onTriggerInactiveFlow last ran; drives the repeat interval of inactivity messages.
    private final AtomicLong lastInactiveMessage = new AtomicLong();
    // Last-activity timestamp recorded by the most recent state evaluation in onTrigger; reported as the
    // start of inactivity when activity is restored.
    // NOTE(review): the initializer calls the overridable nowMillis() during construction - confirm that
    // subclasses overriding it do not rely on their own fields being initialized at that point.
    private final AtomicLong inactivityStartMillis = new AtomicLong(this.nowMillis());
    // Flow state (active/inactive) recorded by the most recent state evaluation in onTrigger.
    private final AtomicBoolean wasActive = new AtomicBoolean(true);
    // Per-node activity bookkeeping; (re)created in onScheduled().
    private volatile LocalFlowActivityInfo localFlowActivityInfo;

    /**
     * Builds the unmodifiable property-descriptor list and relationship set that this
     * processor exposes through {@link #getSupportedPropertyDescriptors()} and
     * {@link #getRelationships()}.
     *
     * @param context initialization context supplied by the framework; not used here
     */
    protected void init(ProcessorInitializationContext context) {
        // Descriptors are kept in the order in which they are added.
        List<PropertyDescriptor> descriptors = new ArrayList<>();
        Collections.addAll(descriptors,
                THRESHOLD,
                CONTINUALLY_SEND_MESSAGES,
                INACTIVITY_MESSAGE,
                ACTIVITY_RESTORED_MESSAGE,
                WAIT_FOR_ACTIVITY,
                RESET_STATE_ON_RESTART,
                COPY_ATTRIBUTES,
                MONITORING_SCOPE,
                REPORTING_NODE);
        this.properties = Collections.unmodifiableList(descriptors);

        Set<Relationship> supportedRelationships = new HashSet<>();
        Collections.addAll(supportedRelationships, REL_SUCCESS, REL_INACTIVE, REL_ACTIVITY_RESTORED);
        this.relationships = Collections.unmodifiableSet(supportedRelationships);
    }

    /**
     * @return the unmodifiable relationship set assembled in {@code init}
     */
    public Set<Relationship> getRelationships() {
        return relationships;
    }

    /**
     * @return the unmodifiable property-descriptor list assembled in {@code init}
     */
    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
        return properties;
    }

    /**
     * Prepares the local activity bookkeeping when the processor is scheduled.
     *
     * <p>Unless 'Reset State on Restart' is enabled, the last activity timestamp is looked up in
     * local state; when one is found, the previous flow state is restored from it. Otherwise the
     * processor starts out in the active state.
     *
     * @param context the process context providing property values and the state manager
     */
    @OnScheduled
    public void onScheduled(ProcessContext context) {
        // Invoked for its side effect only: warns when 'cluster' scope is configured without clustering.
        this.isClusterScope(context, true);

        final long thresholdMillis = context.getProperty(THRESHOLD).evaluateAttributeExpressions().asTimePeriod(TimeUnit.MILLISECONDS);
        final boolean copyAttributes = context.getProperty(COPY_ATTRIBUTES).asBoolean();
        final boolean resetStateOnRestart = context.getProperty(RESET_STATE_ON_RESTART).asBoolean();

        String persistedTimestamp = null;
        if (!resetStateOnRestart) {
            persistedTimestamp = this.tryLoadLastSuccessfulTransfer(context);
        }

        if (persistedTimestamp == null) {
            // Nothing to restore: start from scratch and assume the flow is active.
            this.localFlowActivityInfo = new LocalFlowActivityInfo(this.getStartupTime(), thresholdMillis, copyAttributes);
            this.wasActive.set(true);
            return;
        }

        // Resume from the persisted timestamp and derive the previous flow state from it.
        this.localFlowActivityInfo = new LocalFlowActivityInfo(this.getStartupTime(), thresholdMillis, copyAttributes, Long.parseLong(persistedTimestamp));
        this.wasActive.set(this.localFlowActivityInfo.isActive());
        this.inactivityStartMillis.set(this.localFlowActivityInfo.getLastActivity());
    }

    /**
     * Clears the cluster-scoped state when the processor is stopped, provided this instance is
     * configured for clustering and currently connected to the cluster.
     *
     * <p>A failure to clear the state is logged and not propagated.
     *
     * @param context the process context providing connectivity information and the state manager
     */
    @OnStopped
    public void onStopped(ProcessContext context) {
        if (this.getNodeTypeProvider().isConfiguredForClustering() && context.isConnectedToCluster()) {
            StateManager stateManager = context.getStateManager();
            try {
                stateManager.clear(Scope.CLUSTER);
            } catch (IOException e) {
                // Separator added so the exception text no longer runs into the message,
                // matching the wording of the other state-related log messages in this class.
                this.getLogger().error("Failed to clear cluster state due to " + e, e);
            }
        }
    }

    /**
     * Routes incoming FlowFiles to 'success', updates the activity bookkeeping and emits an
     * indicator FlowFile when the flow state changes or an inactivity reminder is due.
     *
     * <p>Steps, in order:
     * <ol>
     *   <li>Pull up to 50 FlowFiles; if any arrived, record the activity and transfer them,
     *       otherwise yield.</li>
     *   <li>In cluster scope, synchronize the local view with the cluster-wide state.</li>
     *   <li>Evaluate the flow state and, if a transition is permitted, report it and remember
     *       the new state.</li>
     * </ol>
     *
     * @param context the process context providing property values, connectivity and state
     * @param session the session used to receive, create and transfer FlowFiles
     */
    public void onTrigger(ProcessContext context, ProcessSession session) {
        ComponentLog logger = this.getLogger();
        boolean isClusterScope = this.isClusterScope(context, false);
        boolean isConnectedToCluster = context.isConnectedToCluster();
        boolean wasActive = this.wasActive.get();

        // Typed list instead of a raw List, so no casts are needed below.
        List<FlowFile> flowFiles = session.get(50);
        if (!flowFiles.isEmpty()) {
            boolean firstKnownTransfer = !this.localFlowActivityInfo.hasSuccessfulTransfer();
            boolean flowStateMustBecomeActive = !wasActive || firstKnownTransfer;
            this.localFlowActivityInfo.update(flowFiles.get(0));
            if (isClusterScope && flowStateMustBecomeActive) {
                // Request an immediate sync so the state change is shared on this trigger.
                this.localFlowActivityInfo.forceSync();
            }
            session.transfer(flowFiles, REL_SUCCESS);
            logger.info("Transferred {} FlowFiles to 'success'", new Object[]{flowFiles.size()});
        } else {
            context.yield();
        }

        if (isClusterScope) {
            // While inactive (or about to become inactive), consult the shared state on every trigger.
            if (!wasActive || !this.localFlowActivityInfo.isActive()) {
                this.localFlowActivityInfo.forceSync();
            }
            this.synchronizeState(context);
        }

        long thresholdMillis = context.getProperty(THRESHOLD).evaluateAttributeExpressions().asTimePeriod(TimeUnit.MILLISECONDS);
        boolean continuallySendMessages = context.getProperty(CONTINUALLY_SEND_MESSAGES).asBoolean();
        boolean waitForActivity = context.getProperty(WAIT_FOR_ACTIVITY).asBoolean();
        boolean isActive = this.localFlowActivityInfo.isActive() || !flowFiles.isEmpty();
        long lastActivity = this.localFlowActivityInfo.getLastActivity();
        long inactivityStartMillis = this.inactivityStartMillis.get();
        boolean timeToRepeatInactiveMessage = this.lastInactiveMessage.get() + thresholdMillis <= this.nowMillis();
        // In cluster scope a disconnected node may only report when it has seen FlowFiles itself.
        boolean canReport = !isClusterScope || isConnectedToCluster || !flowFiles.isEmpty();
        // With 'Wait for Activity', no transition happens before the first known transfer.
        boolean canChangeState = !waitForActivity || this.localFlowActivityInfo.hasSuccessfulTransfer();

        if (canReport && canChangeState) {
            if (isActive) {
                this.onTriggerActiveFlow(context, session, wasActive, isClusterScope, inactivityStartMillis);
            } else if (wasActive || continuallySendMessages && timeToRepeatInactiveMessage) {
                this.onTriggerInactiveFlow(context, session, isClusterScope, lastActivity);
            }
            this.wasActive.set(isActive);
            this.inactivityStartMillis.set(lastActivity);
        } else if (!canReport) {
            logger.trace("State transition is blocked, because we are not connected to the cluster.");
        } else {
            // Previously this case was also logged as a cluster-connectivity problem.
            logger.trace("State transition is blocked, because no activity has been seen yet and 'Wait for Activity' is enabled.");
        }
    }

    /**
     * Time source used throughout this processor; protected so subclasses can substitute it.
     *
     * @return the current time in milliseconds since the epoch
     */
    protected long nowMillis() {
        final long currentTimeMillis = System.currentTimeMillis();
        return currentTimeMillis;
    }

    /**
     * Supplies the startup timestamp handed to {@code LocalFlowActivityInfo} in {@code onScheduled}.
     *
     * @return the value of {@link #nowMillis()} at the time of the call
     */
    protected long getStartupTime() {
        final long startupMillis = this.nowMillis();
        return startupMillis;
    }

    /**
     * @return the last successful transfer timestamp held by the local activity info
     */
    protected final long getLastSuccessfulTransfer() {
        final LocalFlowActivityInfo activityInfo = this.localFlowActivityInfo;
        return activityInfo.getLastSuccessfulTransfer();
    }

    /**
     * Reads the persisted last-activity timestamp from the local-scoped state.
     *
     * @param context the process context providing the state manager
     * @return the stored value, or {@code null} when the local state holds no such entry
     * @throws ProcessException if the local state cannot be read
     */
    private String tryLoadLastSuccessfulTransfer(ProcessContext context) {
        final StateManager localStateManager = context.getStateManager();
        try {
            return localStateManager.getState(Scope.LOCAL).get(STATE_KEY_LOCAL_FLOW_ACTIVITY_INFO);
        } catch (IOException e) {
            throw new ProcessException("Failed to load local state due to " + e, e);
        }
    }

    /**
     * Exchanges activity information between this node and the cluster-scoped state.
     *
     * <p>A reconnect to the cluster forces an immediate sync. While disconnected nothing is
     * exchanged. When a sync is due, the local info first adopts a newer shared timestamp and
     * then tries to publish its own; a failed publish is logged at debug level and leaves the
     * next-sync time untouched.
     *
     * @param context the process context providing connectivity information and the state manager
     */
    private void synchronizeState(ProcessContext context) {
        final ComponentLog log = this.getLogger();
        final boolean connected = context.isConnectedToCluster();

        if (this.isReconnectedToCluster(connected)) {
            this.localFlowActivityInfo.forceSync();
            this.connectedWhenLastTriggered.set(true);
        }

        if (!connected) {
            this.connectedWhenLastTriggered.set(false);
            return;
        }
        if (!this.localFlowActivityInfo.syncNeeded()) {
            return;
        }

        final CommonFlowActivityInfo sharedInfo = new CommonFlowActivityInfo(context);
        // Pull first, then push: local state takes over a newer shared timestamp before publishing.
        this.localFlowActivityInfo.update(sharedInfo);
        try {
            sharedInfo.update(this.localFlowActivityInfo);
            this.localFlowActivityInfo.setNextSyncMillis();
        } catch (SaveSharedFlowStateException ex) {
            log.debug("Failed to update common state.", ex);
        }
    }

    /**
     * Handles a trigger on which the flow is considered inactive: sends the inactivity marker if
     * this node is a reporting node, records when this happened, and persists the last activity
     * timestamp to local state.
     *
     * @param context        the process context
     * @param session        the session used to create the marker FlowFile
     * @param isClusterScope whether cluster-wide monitoring is in effect
     * @param lastActivity   timestamp of the last known activity, reported as the inactivity start
     */
    private void onTriggerInactiveFlow(ProcessContext context, ProcessSession session, boolean isClusterScope, long lastActivity) {
        final ComponentLog log = this.getLogger();
        if (this.shouldThisNodeReport(isClusterScope, context)) {
            this.sendInactivityMarker(context, session, lastActivity, log);
        }
        // Recorded even when this node does not report, so the repeat interval stays consistent.
        final long handledAt = this.nowMillis();
        this.lastInactiveMessage.set(handledAt);
        this.setInactivityFlag(context.getStateManager());
    }

    /**
     * Handles a trigger on which the flow is considered active. Only a transition from inactive
     * to active has an effect: the activation marker is sent if this node is a reporting node,
     * and the local state is cleared.
     *
     * @param context               the process context
     * @param session               the session used to create the marker FlowFile
     * @param wasActive             flow state recorded by the previous evaluation
     * @param isClusterScope        whether cluster-wide monitoring is in effect
     * @param inactivityStartMillis timestamp at which the ended inactivity period began
     */
    private void onTriggerActiveFlow(ProcessContext context, ProcessSession session, boolean wasActive, boolean isClusterScope, long inactivityStartMillis) {
        final ComponentLog log = this.getLogger();
        final boolean reportFromThisNode = this.shouldThisNodeReport(isClusterScope, context);
        if (wasActive) {
            // Still active: nothing to report.
            return;
        }
        if (reportFromThisNode) {
            final Map<String, String> lastAttributes = this.localFlowActivityInfo.getLastSuccessfulTransferAttributes();
            this.sendActivationMarker(context, session, lastAttributes, inactivityStartMillis, log);
        }
        this.clearInactivityFlag(context.getStateManager());
    }

    /**
     * Persists the last activity timestamp to the local-scoped state. A failure is logged and
     * not propagated.
     *
     * @param stateManager the state manager to write to
     */
    private void setInactivityFlag(StateManager stateManager) {
        try {
            final String lastActivity = String.valueOf(this.localFlowActivityInfo.getLastActivity());
            final Map<String, String> localState = Collections.singletonMap(STATE_KEY_LOCAL_FLOW_ACTIVITY_INFO, lastActivity);
            stateManager.setState(localState, Scope.LOCAL);
        } catch (IOException e) {
            this.getLogger().error("Failed to set local state due to " + e, e);
        }
    }

    /**
     * Removes everything stored in the local-scoped state.
     *
     * @param stateManager the state manager to clear
     * @throws ProcessException if the local state cannot be cleared
     */
    private void clearInactivityFlag(StateManager stateManager) {
        try {
            stateManager.clear(Scope.LOCAL);
        } catch (IOException ioe) {
            throw new ProcessException("Failed to clear local state due to " + ioe, ioe);
        }
    }

    /**
     * Decides whether cluster-wide monitoring is in effect: 'Monitoring Scope' must be set to
     * 'cluster' and this instance must be configured for clustering.
     *
     * @param context          the process context providing the property value
     * @param logInvalidConfig whether to warn when 'cluster' is configured without clustering
     * @return {@code true} only when cluster scope is both configured and possible
     */
    private boolean isClusterScope(ProcessContext context, boolean logInvalidConfig) {
        final boolean clusterScopeConfigured = SCOPE_CLUSTER.getValue().equals(context.getProperty(MONITORING_SCOPE).getValue());
        if (!clusterScopeConfigured) {
            return false;
        }
        if (this.getNodeTypeProvider().isConfiguredForClustering()) {
            return true;
        }
        if (logInvalidConfig) {
            this.getLogger().warn("NiFi is running as a Standalone mode, but 'cluster' scope is set. Fallback to 'node' scope. Fix configuration to stop this message.");
        }
        return false;
    }

    /**
     * @param isClusterScope whether cluster-wide monitoring is in effect
     * @param context        the process context providing the 'Reporting Node' property
     * @return {@code true} when 'Reporting Node' is 'primary' and cluster scope is in effect
     */
    private boolean shouldReportOnlyOnPrimary(boolean isClusterScope, ProcessContext context) {
        final String reportingNode = context.getProperty(REPORTING_NODE).getValue();
        final boolean primaryConfigured = REPORT_NODE_PRIMARY.getValue().equals(reportingNode);
        return primaryConfigured && isClusterScope;
    }

    /**
     * @param isConnectedToCluster current cluster connectivity
     * @return {@code true} when the node is connected now but was not recorded as connected before
     */
    private boolean isReconnectedToCluster(boolean isConnectedToCluster) {
        final boolean previouslyConnected = this.connectedWhenLastTriggered.get();
        return isConnectedToCluster && !previouslyConnected;
    }

    /**
     * Decides whether this node emits indicator FlowFiles. Outside cluster scope, or when
     * reporting is not restricted to the primary node, every node reports; otherwise only the
     * primary node does.
     *
     * @param isClusterScope whether cluster-wide monitoring is in effect
     * @param context        the process context providing the 'Reporting Node' property
     * @return {@code true} if this node should send indicator FlowFiles
     */
    private boolean shouldThisNodeReport(boolean isClusterScope, ProcessContext context) {
        final boolean primaryOnly = this.shouldReportOnlyOnPrimary(isClusterScope, context);
        if (!isClusterScope || !primaryOnly) {
            return true;
        }
        return this.getNodeTypeProvider().isPrimary();
    }

    /**
     * Creates an inactivity indicator FlowFile, stamps it with the inactivity start and duration,
     * writes the evaluated 'Inactivity Message' as its content and transfers it to 'inactive'.
     *
     * @param context               the process context providing the message template
     * @param session               the session used to create and transfer the FlowFile
     * @param inactivityStartMillis timestamp at which the inactivity began
     * @param logger                logger used to record the transfer
     */
    private void sendInactivityMarker(ProcessContext context, ProcessSession session, long inactivityStartMillis, ComponentLog logger) {
        FlowFile marker = session.create();
        marker = session.putAttribute(marker, "inactivityStartMillis", String.valueOf(inactivityStartMillis));

        final long inactivityDuration = this.nowMillis() - inactivityStartMillis;
        marker = session.putAttribute(marker, "inactivityDurationMillis", String.valueOf(inactivityDuration));

        // The template is evaluated against the marker, so it can reference the attributes set above.
        final String message = context.getProperty(INACTIVITY_MESSAGE).evaluateAttributeExpressions(marker).getValue();
        final byte[] content = message.getBytes(StandardCharsets.UTF_8);
        marker = session.write(marker, out -> out.write(content));

        session.getProvenanceReporter().create(marker);
        session.transfer(marker, REL_INACTIVE);
        logger.info("Transferred {} to 'inactive'", new Object[]{marker});
    }

    /**
     * Creates an activity-restored indicator FlowFile, copies the given attributes (except the
     * UUID) onto it, stamps it with the inactivity start and duration, writes the evaluated
     * 'Activity Restored Message' as its content and transfers it to 'activity.restored'.
     *
     * <p>The supplied attribute map is not modified; a private copy is filtered instead, so the
     * caller may pass a shared or unmodifiable map.
     *
     * @param context               the process context providing the message template
     * @param session               the session used to create and transfer the FlowFile
     * @param attributes            attributes to copy onto the indicator FlowFile
     * @param inactivityStartMillis timestamp at which the ended inactivity period began
     * @param logger                logger used to record the transfer
     */
    private void sendActivationMarker(ProcessContext context, ProcessSession session, Map<String, String> attributes, long inactivityStartMillis, ComponentLog logger) {
        FlowFile activityRestoredFlowFile = session.create();

        // Work on a copy: the caller hands in the live attribute map of the local activity info.
        Map<String, String> attributesToCopy = new HashMap<>(attributes);
        // The indicator keeps its own UUID.
        attributesToCopy.remove(CoreAttributes.UUID.key());
        activityRestoredFlowFile = session.putAllAttributes(activityRestoredFlowFile, attributesToCopy);

        activityRestoredFlowFile = session.putAttribute(activityRestoredFlowFile, "inactivityStartMillis", String.valueOf(inactivityStartMillis));
        activityRestoredFlowFile = session.putAttribute(activityRestoredFlowFile, "inactivityDurationMillis", String.valueOf(this.nowMillis() - inactivityStartMillis));

        // The template is evaluated against the indicator, so it can reference the attributes set above.
        byte[] outBytes = context.getProperty(ACTIVITY_RESTORED_MESSAGE).evaluateAttributeExpressions(activityRestoredFlowFile).getValue().getBytes(StandardCharsets.UTF_8);
        activityRestoredFlowFile = session.write(activityRestoredFlowFile, out -> out.write(outBytes));

        session.getProvenanceReporter().create(activityRestoredFlowFile);
        session.transfer(activityRestoredFlowFile, REL_ACTIVITY_RESTORED);
        logger.info("Transferred {} to 'activity.restored'", new Object[]{activityRestoredFlowFile});
    }

    /**
     * Raised when the cluster-scoped activity state could not be written, either because the
     * state manager failed or because the replace operation was rejected.
     */
    private static class SaveSharedFlowStateException extends ProcessException {

        /**
         * @param message description of the failure
         */
        public SaveSharedFlowStateException(String message) {
            super(message);
        }

        /**
         * @param message description of the failure
         * @param cause   the underlying exception
         */
        public SaveSharedFlowStateException(String message, Throwable cause) {
            super(message, cause);
        }
    }

    /**
     * Snapshot of the cluster-scoped activity state, taken at construction time, together with
     * the ability to publish a newer local timestamp back to that state.
     */
    private static class CommonFlowActivityInfo {
        private final StateManager stateManager;
        // State as read in the constructor; also the expected "old value" for the replace call.
        private final StateMap storedState;
        private final Map<String, String> newState = new HashMap<>();

        /**
         * Loads the current cluster-scoped state.
         *
         * @param context the process context providing the state manager
         * @throws ProcessException if the cluster state cannot be read
         */
        public CommonFlowActivityInfo(ProcessContext context) {
            this.stateManager = context.getStateManager();
            try {
                this.storedState = this.stateManager.getState(Scope.CLUSTER);
            } catch (IOException e) {
                throw new ProcessException("Cannot load common flow activity info.", e);
            }
        }

        /**
         * @return {@code true} if the loaded state contains a last-transfer timestamp
         */
        public boolean hasSuccessfulTransfer() {
            final String storedTimestamp = this.storedState.get(MonitorActivity.STATE_KEY_COMMON_FLOW_ACTIVITY_INFO);
            return storedTimestamp != null;
        }

        /**
         * @return the last-transfer timestamp parsed from the loaded state
         */
        public long getLastSuccessfulTransfer() {
            final String storedTimestamp = this.storedState.get(MonitorActivity.STATE_KEY_COMMON_FLOW_ACTIVITY_INFO);
            return Long.parseLong(storedTimestamp);
        }

        /**
         * @return a new map holding every entry of the loaded state except the timestamp entry
         */
        public Map<String, String> getLastSuccessfulTransferAttributes() {
            final Map<String, String> attributes = new HashMap<>(this.storedState.toMap());
            attributes.remove(MonitorActivity.STATE_KEY_COMMON_FLOW_ACTIVITY_INFO);
            return attributes;
        }

        /**
         * Publishes the local timestamp and attributes to the cluster state when the local
         * timestamp is newer than the loaded one (or none is stored yet).
         *
         * @param localFlowActivityInfo the local activity information to publish
         * @throws SaveSharedFlowStateException if writing fails or the replace is rejected
         */
        public void update(LocalFlowActivityInfo localFlowActivityInfo) {
            if (!localFlowActivityInfo.hasSuccessfulTransfer()) {
                return;
            }

            final long localTimestamp = localFlowActivityInfo.getLastSuccessfulTransfer();
            final boolean sharedIsUpToDate = this.hasSuccessfulTransfer() && localTimestamp <= this.getLastSuccessfulTransfer();
            if (sharedIsUpToDate) {
                return;
            }

            this.newState.putAll(localFlowActivityInfo.getLastSuccessfulTransferAttributes());
            this.newState.put(MonitorActivity.STATE_KEY_COMMON_FLOW_ACTIVITY_INFO, String.valueOf(localTimestamp));

            final boolean replaced;
            try {
                replaced = this.stateManager.replace(this.storedState, this.newState, Scope.CLUSTER);
            } catch (IOException e) {
                throw new SaveSharedFlowStateException("Caught exception while saving state.", e);
            }
            if (replaced) {
                return;
            }
            throw new SaveSharedFlowStateException("Failed to save state. Probably there was a concurrent update.");
        }
    }

    /**
     * Activity bookkeeping of this node: the timestamp (and optionally attributes) of the last
     * successful transfer, plus the schedule for synchronizing with the cluster state. All
     * timestamps come from the enclosing processor's {@code nowMillis()}.
     */
    private class LocalFlowActivityInfo {
        // Marks "no timestamp recorded yet".
        private static final long NO_VALUE = 0L;
        // The sync period is the threshold divided by this value.
        private static final int TIMES_SYNC_WITHIN_THRESHOLD = 3;

        private final long startupTimeMillis;
        private final long thresholdMillis;
        private final boolean saveAttributes;
        private final long syncPeriodMillis;
        private long nextSyncMillis = NO_VALUE;
        private long lastSuccessfulTransfer = NO_VALUE;
        private Map<String, String> lastSuccessfulTransferAttributes = new HashMap<>();

        /**
         * Creates an instance without any known transfer.
         *
         * @param startupTimeMillis reference time used as "last activity" until a transfer is known
         * @param thresholdMillis   inactivity threshold in milliseconds
         * @param saveAttributes    whether FlowFile attributes are remembered on update
         */
        public LocalFlowActivityInfo(long startupTimeMillis, long thresholdMillis, boolean saveAttributes) {
            this.startupTimeMillis = startupTimeMillis;
            this.thresholdMillis = thresholdMillis;
            this.saveAttributes = saveAttributes;
            this.syncPeriodMillis = thresholdMillis / TIMES_SYNC_WITHIN_THRESHOLD;
        }

        /**
         * Creates an instance that starts from a previously recorded transfer timestamp.
         *
         * @param startupTimeMillis             reference time used when no transfer is known
         * @param thresholdMillis               inactivity threshold in milliseconds
         * @param saveAttributes                whether FlowFile attributes are remembered on update
         * @param initialLastSuccessfulTransfer the timestamp to start from
         */
        public LocalFlowActivityInfo(long startupTimeMillis, long thresholdMillis, boolean saveAttributes, long initialLastSuccessfulTransfer) {
            this(startupTimeMillis, thresholdMillis, saveAttributes);
            this.lastSuccessfulTransfer = initialLastSuccessfulTransfer;
        }

        /**
         * @return {@code true} once the scheduled sync time has been reached
         */
        public boolean syncNeeded() {
            final long now = MonitorActivity.this.nowMillis();
            return this.nextSyncMillis <= now;
        }

        /** Schedules the next sync one sync period from now. */
        public void setNextSyncMillis() {
            this.nextSyncMillis = MonitorActivity.this.nowMillis() + this.syncPeriodMillis;
        }

        /** Makes a sync due immediately. */
        public void forceSync() {
            this.nextSyncMillis = MonitorActivity.this.nowMillis();
        }

        /**
         * @return {@code true} while less than the threshold has elapsed since the last activity
         */
        public boolean isActive() {
            return MonitorActivity.this.nowMillis() < this.getLastActivity() + this.thresholdMillis;
        }

        /**
         * @return {@code true} if a transfer timestamp has been recorded
         */
        public boolean hasSuccessfulTransfer() {
            return this.lastSuccessfulTransfer != NO_VALUE;
        }

        /**
         * @return the recorded transfer timestamp, or {@code 0} when none is recorded
         */
        public long getLastSuccessfulTransfer() {
            return this.lastSuccessfulTransfer;
        }

        /**
         * @return the recorded transfer timestamp if there is one, otherwise the startup time
         */
        public long getLastActivity() {
            return this.hasSuccessfulTransfer() ? this.lastSuccessfulTransfer : this.startupTimeMillis;
        }

        /**
         * @return the attribute map remembered for the last transfer (the internal instance)
         */
        public Map<String, String> getLastSuccessfulTransferAttributes() {
            return this.lastSuccessfulTransferAttributes;
        }

        /**
         * Records a transfer happening now, optionally remembering the FlowFile's attributes
         * (without its UUID). Forces a sync when the previous activity is older than one sync period.
         *
         * @param flowFile the FlowFile representing the transfer
         */
        public void update(FlowFile flowFile) {
            final long now = MonitorActivity.this.nowMillis();
            final long sinceLastActivity = now - this.getLastActivity();
            if (sinceLastActivity > this.syncPeriodMillis) {
                this.forceSync();
            }
            this.lastSuccessfulTransfer = now;
            if (!this.saveAttributes) {
                return;
            }
            final Map<String, String> copiedAttributes = new HashMap<>(flowFile.getAttributes());
            copiedAttributes.remove(CoreAttributes.UUID.key());
            this.lastSuccessfulTransferAttributes = copiedAttributes;
        }

        /**
         * Adopts the shared timestamp (and, if enabled, attributes) when it is newer than the
         * locally recorded one.
         *
         * @param commonFlowActivityInfo the cluster-wide activity snapshot
         */
        public void update(CommonFlowActivityInfo commonFlowActivityInfo) {
            if (!commonFlowActivityInfo.hasSuccessfulTransfer()) {
                return;
            }
            final long sharedTimestamp = commonFlowActivityInfo.getLastSuccessfulTransfer();
            if (sharedTimestamp <= this.getLastSuccessfulTransfer()) {
                return;
            }
            this.lastSuccessfulTransfer = sharedTimestamp;
            if (this.saveAttributes) {
                this.lastSuccessfulTransferAttributes = commonFlowActivityInfo.getLastSuccessfulTransferAttributes();
            }
        }
    }
}

