/*
 * Decompiled with CFR 0.152.
 */
package org.apache.nifi.processors.standard;

import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import org.apache.nifi.controller.ControllerService;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.processor.Processor;
import org.apache.nifi.processors.standard.MergeRecord;
import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.serialization.SimpleRecordSchema;
import org.apache.nifi.serialization.record.CommaSeparatedRecordReader;
import org.apache.nifi.serialization.record.MockRecordWriter;
import org.apache.nifi.serialization.record.RecordField;
import org.apache.nifi.serialization.record.RecordFieldType;
import org.apache.nifi.serialization.record.RecordSchema;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.condition.EnabledIfSystemProperty;

public class TestMergeRecord {
    private TestRunner runner;
    private CommaSeparatedRecordReader readerService;
    private MockRecordWriter writerService;

    @BeforeEach
    public void setup() throws InitializationException {
        this.runner = TestRunners.newTestRunner((Processor)new MergeRecord());
        this.readerService = new CommaSeparatedRecordReader();
        this.writerService = new MockRecordWriter("header", false, true);
        this.runner.addControllerService("reader", (ControllerService)this.readerService);
        this.runner.enableControllerService((ControllerService)this.readerService);
        this.runner.addControllerService("writer", (ControllerService)this.writerService);
        this.runner.enableControllerService((ControllerService)this.writerService);
        this.runner.setProperty(MergeRecord.RECORD_READER, "reader");
        this.runner.setProperty(MergeRecord.RECORD_WRITER, "writer");
    }

    @Test
    public void testSmallOutputIsFlushed() {
        this.runner.setProperty(MergeRecord.MIN_RECORDS, "1");
        this.runner.setProperty(MergeRecord.MAX_RECORDS, "1");
        this.runner.enqueue("Name, Age\nJohn, 35\nJane, 34");
        this.runner.run(1);
        this.runner.assertTransferCount(MergeRecord.REL_MERGED, 1);
        this.runner.assertTransferCount(MergeRecord.REL_ORIGINAL, 1);
        MockFlowFile mff = (MockFlowFile)this.runner.getFlowFilesForRelationship(MergeRecord.REL_MERGED).get(0);
        mff.assertAttributeEquals("record.count", "2");
        mff.assertContentEquals("header\nJohn,35\nJane,34\n");
        this.runner.getFlowFilesForRelationship(MergeRecord.REL_ORIGINAL).forEach(ff -> Assertions.assertEquals((Object)mff.getAttribute(CoreAttributes.UUID.key()), (Object)ff.getAttribute("merge.uuid")));
    }

    @Test
    public void testMergeSimple() {
        this.runner.enqueue("Name, Age\nJohn, 35");
        this.runner.enqueue("Name, Age\nJane, 34");
        this.runner.run(2);
        this.runner.assertTransferCount(MergeRecord.REL_MERGED, 1);
        this.runner.assertTransferCount(MergeRecord.REL_ORIGINAL, 2);
        MockFlowFile mff = (MockFlowFile)this.runner.getFlowFilesForRelationship(MergeRecord.REL_MERGED).get(0);
        mff.assertAttributeEquals("record.count", "2");
        mff.assertContentEquals("header\nJohn,35\nJane,34\n");
        this.runner.getFlowFilesForRelationship(MergeRecord.REL_ORIGINAL).forEach(ff -> Assertions.assertEquals((Object)mff.getAttribute(CoreAttributes.UUID.key()), (Object)ff.getAttribute("merge.uuid")));
    }

    @Test
    public void testMergeSimpleDifferentWriteSchema() throws InitializationException {
        List<RecordField> writeFields = Collections.singletonList(new RecordField("Name", RecordFieldType.STRING.getDataType()));
        SimpleRecordSchema writeSchema = new SimpleRecordSchema(writeFields);
        this.writerService = new MockRecordWriter("header", false, -1, true, (RecordSchema)writeSchema);
        this.runner.addControllerService("differentWriter", (ControllerService)this.writerService);
        this.runner.enableControllerService((ControllerService)this.writerService);
        this.runner.setProperty(MergeRecord.RECORD_READER, "reader");
        this.runner.setProperty(MergeRecord.RECORD_WRITER, "differentWriter");
        this.runner.enqueue("Name, Age\nJohn, 35");
        this.runner.enqueue("Name, Age\nJane, 34");
        this.runner.run(2);
        this.runner.assertTransferCount(MergeRecord.REL_MERGED, 1);
        this.runner.assertTransferCount(MergeRecord.REL_ORIGINAL, 2);
        MockFlowFile mff = (MockFlowFile)this.runner.getFlowFilesForRelationship(MergeRecord.REL_MERGED).get(0);
        mff.assertAttributeEquals("record.count", "2");
        mff.assertContentEquals("header\nJohn\nJane\n");
        this.runner.getFlowFilesForRelationship(MergeRecord.REL_ORIGINAL).forEach(ff -> Assertions.assertEquals((Object)mff.getAttribute(CoreAttributes.UUID.key()), (Object)ff.getAttribute("merge.uuid")));
    }

    @Test
    public void testDifferentSchema() {
        this.runner.setProperty(MergeRecord.MIN_RECORDS, "2");
        this.runner.enqueue("Name, Age\nJohn, 35");
        this.runner.enqueue("Name, Color\nJane, Red");
        this.runner.run(1, false, true);
        this.runner.assertTransferCount(MergeRecord.REL_MERGED, 0);
        this.runner.assertTransferCount(MergeRecord.REL_ORIGINAL, 0);
        this.runner.enqueue("Name, Age\nJane, 34");
        this.runner.enqueue("Name, Color\nJohn, Blue");
        this.runner.run(1, false, false);
        this.runner.run(1, true, false);
        this.runner.assertTransferCount(MergeRecord.REL_MERGED, 2);
        this.runner.assertTransferCount(MergeRecord.REL_ORIGINAL, 4);
        List mffs = this.runner.getFlowFilesForRelationship(MergeRecord.REL_MERGED);
        Assertions.assertEquals((long)1L, (long)mffs.stream().filter(ff -> "2".equals(ff.getAttribute("record.count"))).filter(ff -> "header\nJohn,35\nJane,34\n".equals(new String(ff.toByteArray()))).count());
        Assertions.assertEquals((long)1L, (long)mffs.stream().filter(ff -> "2".equals(ff.getAttribute("record.count"))).filter(ff -> "header\nJane,Red\nJohn,Blue\n".equals(new String(ff.toByteArray()))).count());
    }

    @Test
    public void testFailureToParse() {
        this.runner.setProperty(MergeRecord.MIN_RECORDS, "3");
        this.readerService.failAfter(2);
        this.runner.enqueue("Name, Age\nJohn, 35");
        this.runner.enqueue("Name, Age\nJane, 34");
        this.runner.enqueue("Name, Age\nJake, 3");
        this.runner.run();
        this.runner.assertAllFlowFilesTransferred(MergeRecord.REL_FAILURE, 3);
    }

    @Test
    public void testDefragment() {
        this.runner.setProperty(MergeRecord.MERGE_STRATEGY, MergeRecord.MERGE_STRATEGY_DEFRAGMENT);
        HashMap<String, String> attr1 = new HashMap<String, String>();
        attr1.put(MergeRecord.FRAGMENT_COUNT_ATTRIBUTE, "2");
        attr1.put(MergeRecord.FRAGMENT_ID_ATTRIBUTE, "1");
        attr1.put(MergeRecord.FRAGMENT_INDEX_ATTRIBUTE, "0");
        HashMap<String, String> attr2 = new HashMap<String, String>();
        attr2.put(MergeRecord.FRAGMENT_COUNT_ATTRIBUTE, "2");
        attr2.put(MergeRecord.FRAGMENT_ID_ATTRIBUTE, "1");
        attr2.put(MergeRecord.FRAGMENT_INDEX_ATTRIBUTE, "1");
        HashMap<String, String> attr3 = new HashMap<String, String>();
        attr3.put(MergeRecord.FRAGMENT_COUNT_ATTRIBUTE, "2");
        attr3.put(MergeRecord.FRAGMENT_ID_ATTRIBUTE, "2");
        attr3.put(MergeRecord.FRAGMENT_INDEX_ATTRIBUTE, "0");
        HashMap<String, String> attr4 = new HashMap<String, String>();
        attr4.put(MergeRecord.FRAGMENT_COUNT_ATTRIBUTE, "2");
        attr4.put(MergeRecord.FRAGMENT_ID_ATTRIBUTE, "3");
        attr4.put(MergeRecord.FRAGMENT_INDEX_ATTRIBUTE, "0");
        HashMap<String, String> attr5 = new HashMap<String, String>();
        attr5.put(MergeRecord.FRAGMENT_COUNT_ATTRIBUTE, "2");
        attr5.put(MergeRecord.FRAGMENT_ID_ATTRIBUTE, "3");
        attr5.put(MergeRecord.FRAGMENT_INDEX_ATTRIBUTE, "1");
        this.runner.enqueue("Name, Age\nJohn, 35", attr1);
        this.runner.enqueue("Name, Age\nJane, 34", attr2);
        this.runner.enqueue("Name, Age\nJay, 24", attr3);
        this.runner.enqueue("Name, Age\nJake, 3", attr4);
        this.runner.enqueue("Name, Age\nJan, 2", attr5);
        this.runner.run(1);
        Assertions.assertEquals((int)1, (int)this.runner.getQueueSize().getObjectCount(), (String)"Fragment id=2 should remain in the incoming connection");
        this.runner.assertTransferCount(MergeRecord.REL_MERGED, 2);
        this.runner.assertTransferCount(MergeRecord.REL_ORIGINAL, 4);
        List mffs = this.runner.getFlowFilesForRelationship(MergeRecord.REL_MERGED);
        Assertions.assertEquals((long)1L, (long)mffs.stream().filter(ff -> "2".equals(ff.getAttribute("record.count"))).filter(ff -> "header\nJohn,35\nJane,34\n".equals(new String(ff.toByteArray()))).count());
        Assertions.assertEquals((long)1L, (long)mffs.stream().filter(ff -> "2".equals(ff.getAttribute("record.count"))).filter(ff -> "header\nJake,3\nJan,2\n".equals(new String(ff.toByteArray()))).count());
    }

    @Test
    public void testDefragmentOverMultipleCalls() {
        this.runner.setProperty(MergeRecord.MERGE_STRATEGY, MergeRecord.MERGE_STRATEGY_DEFRAGMENT);
        HashMap<String, String> attr1 = new HashMap<String, String>();
        attr1.put(MergeRecord.FRAGMENT_COUNT_ATTRIBUTE, "2");
        attr1.put(MergeRecord.FRAGMENT_ID_ATTRIBUTE, "1");
        attr1.put(MergeRecord.FRAGMENT_INDEX_ATTRIBUTE, "0");
        this.runner.enqueue("Name, Age\nJohn, 35", attr1);
        this.runner.run(2);
        Assertions.assertEquals((int)1, (int)this.runner.getQueueSize().getObjectCount(), (String)"Fragment should remain in the incoming connection");
        this.runner.assertTransferCount(MergeRecord.REL_MERGED, 0);
        this.runner.assertTransferCount(MergeRecord.REL_ORIGINAL, 0);
        this.runner.assertTransferCount(MergeRecord.REL_FAILURE, 0);
        HashMap<String, String> attr2 = new HashMap<String, String>();
        attr2.put(MergeRecord.FRAGMENT_COUNT_ATTRIBUTE, "2");
        attr2.put(MergeRecord.FRAGMENT_ID_ATTRIBUTE, "1");
        attr2.put(MergeRecord.FRAGMENT_INDEX_ATTRIBUTE, "1");
        this.runner.enqueue("Name, Age\nJane, 34", attr2);
        this.runner.run(1);
        Assertions.assertEquals((int)0, (int)this.runner.getQueueSize().getObjectCount(), (String)"Fragments should merge");
        this.runner.assertTransferCount(MergeRecord.REL_MERGED, 1);
        this.runner.assertTransferCount(MergeRecord.REL_ORIGINAL, 2);
        this.runner.assertTransferCount(MergeRecord.REL_FAILURE, 0);
        List mffs = this.runner.getFlowFilesForRelationship(MergeRecord.REL_MERGED);
        Assertions.assertEquals((long)1L, (long)mffs.stream().filter(ff -> "2".equals(ff.getAttribute("record.count"))).filter(ff -> "header\nJohn,35\nJane,34\n".equals(new String(ff.toByteArray()))).count());
    }

    @Test
    public void testDefragmentWithMultipleRecords() {
        this.runner.setProperty(MergeRecord.MERGE_STRATEGY, MergeRecord.MERGE_STRATEGY_DEFRAGMENT);
        HashMap<String, String> attr1 = new HashMap<String, String>();
        attr1.put(MergeRecord.FRAGMENT_COUNT_ATTRIBUTE, "2");
        attr1.put(MergeRecord.FRAGMENT_ID_ATTRIBUTE, "1");
        attr1.put(MergeRecord.FRAGMENT_INDEX_ATTRIBUTE, "0");
        attr1.put("record.count", "2");
        HashMap<String, String> attr2 = new HashMap<String, String>();
        attr2.put(MergeRecord.FRAGMENT_COUNT_ATTRIBUTE, "2");
        attr2.put(MergeRecord.FRAGMENT_ID_ATTRIBUTE, "1");
        attr2.put(MergeRecord.FRAGMENT_INDEX_ATTRIBUTE, "1");
        attr2.put("record.count", "2");
        HashMap<String, String> attr3 = new HashMap<String, String>();
        attr3.put(MergeRecord.FRAGMENT_COUNT_ATTRIBUTE, "2");
        attr3.put(MergeRecord.FRAGMENT_ID_ATTRIBUTE, "2");
        attr3.put(MergeRecord.FRAGMENT_INDEX_ATTRIBUTE, "0");
        attr3.put("record.count", "2");
        this.runner.enqueue("Name, Age\nJohn, 35\nJane, 34", attr1);
        this.runner.enqueue("Name, Age\nJake, 3\nJan, 2", attr2);
        this.runner.enqueue("Name, Age\nJay, 24\nJade, 28", attr3);
        this.runner.run(1);
        Assertions.assertEquals((int)1, (int)this.runner.getQueueSize().getObjectCount(), (String)"Fragment id=2 should remain in the incoming connection");
        this.runner.assertTransferCount(MergeRecord.REL_MERGED, 1);
        this.runner.assertTransferCount(MergeRecord.REL_ORIGINAL, 2);
        List mffs = this.runner.getFlowFilesForRelationship(MergeRecord.REL_MERGED);
        Assertions.assertEquals((long)1L, (long)mffs.stream().filter(ff -> "4".equals(ff.getAttribute("record.count"))).filter(ff -> "header\nJohn,35\nJane,34\nJake,3\nJan,2\n".equals(new String(ff.toByteArray()))).count());
    }

    @Test
    public void testDefragmentWithMultipleRecordsOverMultipleCalls() {
        this.runner.setProperty(MergeRecord.MERGE_STRATEGY, MergeRecord.MERGE_STRATEGY_DEFRAGMENT);
        HashMap<String, String> attr1 = new HashMap<String, String>();
        attr1.put(MergeRecord.FRAGMENT_COUNT_ATTRIBUTE, "2");
        attr1.put(MergeRecord.FRAGMENT_ID_ATTRIBUTE, "1");
        attr1.put(MergeRecord.FRAGMENT_INDEX_ATTRIBUTE, "0");
        attr1.put("record.count", "2");
        HashMap<String, String> attr2 = new HashMap<String, String>();
        attr2.put(MergeRecord.FRAGMENT_COUNT_ATTRIBUTE, "2");
        attr2.put(MergeRecord.FRAGMENT_ID_ATTRIBUTE, "1");
        attr2.put(MergeRecord.FRAGMENT_INDEX_ATTRIBUTE, "1");
        attr2.put("record.count", "2");
        this.runner.enqueue("Name, Age\nJohn, 35\nJane, 34", attr1);
        this.runner.run(2);
        Assertions.assertEquals((int)1, (int)this.runner.getQueueSize().getObjectCount(), (String)"Fragment id=1 should remain in the incoming connection");
        this.runner.assertTransferCount(MergeRecord.REL_MERGED, 0);
        this.runner.assertTransferCount(MergeRecord.REL_ORIGINAL, 0);
        this.runner.enqueue("Name, Age\nJake, 3\nJan, 2", attr2);
        this.runner.run(1);
        Assertions.assertEquals((int)0, (int)this.runner.getQueueSize().getObjectCount(), (String)"Fragment id=1 should be merged");
        this.runner.assertTransferCount(MergeRecord.REL_MERGED, 1);
        this.runner.assertTransferCount(MergeRecord.REL_ORIGINAL, 2);
        List mffs = this.runner.getFlowFilesForRelationship(MergeRecord.REL_MERGED);
        Assertions.assertEquals((long)1L, (long)mffs.stream().filter(ff -> "4".equals(ff.getAttribute("record.count"))).filter(ff -> "header\nJohn,35\nJane,34\nJake,3\nJan,2\n".equals(new String(ff.toByteArray()))).count());
    }

    @Test
    public void testMinSize() {
        this.runner.setProperty(MergeRecord.MIN_RECORDS, "2");
        this.runner.setProperty(MergeRecord.MIN_SIZE, "500 B");
        this.runner.enqueue("Name, Age\nJohn, 35");
        this.runner.enqueue("Name, Age\nJane, 34");
        this.runner.run();
        this.runner.assertTransferCount(MergeRecord.REL_MERGED, 0);
        this.runner.assertTransferCount(MergeRecord.REL_ORIGINAL, 0);
        StringBuilder sb = new StringBuilder("Name, Age\n");
        for (int i = 0; i < 100; ++i) {
            sb.append("Person " + i + ", " + i + "\n");
        }
        this.runner.enqueue(sb.toString());
        this.runner.run(2);
        this.runner.assertTransferCount(MergeRecord.REL_MERGED, 1);
        this.runner.assertTransferCount(MergeRecord.REL_ORIGINAL, 3);
    }

    @Test
    public void testValidation() {
        this.runner.setProperty(MergeRecord.MIN_RECORDS, "103");
        this.runner.setProperty(MergeRecord.MAX_RECORDS, "2");
        this.runner.setProperty(MergeRecord.MIN_SIZE, "500 B");
        this.runner.assertNotValid();
        this.runner.setProperty(MergeRecord.MIN_RECORDS, "2");
        this.runner.setProperty(MergeRecord.MAX_RECORDS, "103");
        this.runner.assertValid();
    }

    @Test
    public void testMinRecords() {
        this.runner.setProperty(MergeRecord.MIN_RECORDS, "103");
        this.runner.setProperty(MergeRecord.MIN_SIZE, "500 B");
        this.runner.enqueue("Name, Age\nJohn, 35");
        this.runner.enqueue("Name, Age\nJane, 34");
        StringBuilder sb = new StringBuilder("Name, Age\n");
        for (int i = 0; i < 100; ++i) {
            sb.append("Person " + i + ", " + i + "\n");
        }
        this.runner.enqueue(sb.toString());
        this.runner.run();
        this.runner.assertTransferCount(MergeRecord.REL_MERGED, 0);
        this.runner.assertTransferCount(MergeRecord.REL_ORIGINAL, 0);
        this.runner.enqueue("Name, Age\nJohn, 35");
        this.runner.run(2);
        this.runner.assertTransferCount(MergeRecord.REL_MERGED, 1);
        this.runner.assertTransferCount(MergeRecord.REL_ORIGINAL, 4);
    }

    @Test
    public void testMaxRecords() {
        this.runner.setProperty(MergeRecord.MIN_RECORDS, "5");
        this.runner.setProperty(MergeRecord.MAX_RECORDS, "10");
        for (int i = 0; i < 34; ++i) {
            this.runner.enqueue("Name, Age\nJohn, 35");
        }
        this.runner.run();
        this.runner.assertTransferCount(MergeRecord.REL_MERGED, 3);
        this.runner.assertTransferCount(MergeRecord.REL_ORIGINAL, 30);
        Assertions.assertEquals((int)4, (int)this.runner.getQueueSize().getObjectCount());
        this.runner.getFlowFilesForRelationship(MergeRecord.REL_MERGED).stream().forEach(ff -> ff.assertAttributeEquals("record.count", "10"));
    }

    @Test
    public void testMaxSize() {
        this.runner.setProperty(MergeRecord.MIN_RECORDS, "5");
        this.runner.setProperty(MergeRecord.MAX_SIZE, "100 B");
        for (int i = 0; i < 36; ++i) {
            this.runner.enqueue("Name, Age\nJohnny, 5");
        }
        this.runner.run();
        this.runner.assertTransferCount(MergeRecord.REL_MERGED, 3);
        this.runner.assertTransferCount(MergeRecord.REL_ORIGINAL, 33);
        Assertions.assertEquals((int)3, (int)this.runner.getQueueSize().getObjectCount());
    }

    @Test
    @EnabledIfSystemProperty(named="nifi.test.performance", matches="true", disabledReason="This unit test depends on timing and could potentially cause problems in an automated build environment. However, it can be useful for manual testing")
    public void testTimeout() throws InterruptedException {
        this.runner.setProperty(MergeRecord.MIN_RECORDS, "500");
        this.runner.setProperty(MergeRecord.MAX_BIN_AGE, "500 millis");
        for (int i = 0; i < 100; ++i) {
            this.runner.enqueue("Name, Age\nJohnny, 5");
        }
        this.runner.run(1, false);
        this.runner.assertTransferCount(MergeRecord.REL_MERGED, 0);
        this.runner.assertTransferCount(MergeRecord.REL_ORIGINAL, 0);
        Thread.sleep(750L);
        this.runner.run(1, true, false);
        this.runner.assertTransferCount(MergeRecord.REL_MERGED, 1);
        this.runner.assertTransferCount(MergeRecord.REL_ORIGINAL, 100);
    }

    @Test
    public void testBinCount() {
        this.runner.setProperty(MergeRecord.MIN_RECORDS, "5");
        this.runner.setProperty(MergeRecord.MAX_RECORDS, "10");
        this.runner.setProperty(MergeRecord.MAX_BIN_COUNT, "5");
        this.runner.setProperty(MergeRecord.CORRELATION_ATTRIBUTE_NAME, "correlationId");
        HashMap<String, String> attrs = new HashMap<String, String>();
        for (int i = 0; i < 5; ++i) {
            attrs.put("correlationId", String.valueOf(i));
            this.runner.enqueue("Name, Age\nJohn, 3" + i, attrs);
        }
        this.runner.run(1, false);
        this.runner.assertTransferCount(MergeRecord.REL_MERGED, 0);
        this.runner.assertTransferCount(MergeRecord.REL_ORIGINAL, 0);
        this.runner.assertTransferCount(MergeRecord.REL_FAILURE, 0);
        attrs.put("correlationId", "5");
        this.runner.enqueue("Name, Age\nJohn, 35", attrs);
        Assertions.assertEquals((int)5, (int)((MergeRecord)this.runner.getProcessor()).getBinCount());
        this.runner.run(1, false, false);
        this.runner.assertTransferCount(MergeRecord.REL_MERGED, 1);
        this.runner.assertTransferCount(MergeRecord.REL_ORIGINAL, 1);
        this.runner.assertTransferCount(MergeRecord.REL_FAILURE, 0);
        Assertions.assertEquals((int)5, (int)((MergeRecord)this.runner.getProcessor()).getBinCount());
    }

    @Test
    public void testDefragmentOldestBinFailsWhenTooManyBins() {
        this.runner.setProperty(MergeRecord.MAX_BIN_COUNT, "5");
        this.runner.setProperty(MergeRecord.MERGE_STRATEGY, MergeRecord.MERGE_STRATEGY_DEFRAGMENT);
        HashMap<String, String> attrs = new HashMap<String, String>();
        attrs.put(MergeRecord.FRAGMENT_COUNT_ATTRIBUTE, "5");
        for (int i = 0; i < 5; ++i) {
            attrs.put(MergeRecord.FRAGMENT_ID_ATTRIBUTE, String.valueOf(i));
            this.runner.enqueue("Name, Age\nJohn, 3" + i, attrs);
        }
        this.runner.run(1, false);
        this.runner.assertTransferCount(MergeRecord.REL_MERGED, 0);
        this.runner.assertTransferCount(MergeRecord.REL_ORIGINAL, 0);
        this.runner.assertTransferCount(MergeRecord.REL_FAILURE, 0);
        attrs.put(MergeRecord.FRAGMENT_ID_ATTRIBUTE, "5");
        this.runner.enqueue("Name, Age\nJohn, 35", attrs);
        Assertions.assertEquals((int)5, (int)((MergeRecord)this.runner.getProcessor()).getBinCount());
        this.runner.run(1, false, false);
        this.runner.assertTransferCount(MergeRecord.REL_MERGED, 0);
        this.runner.assertTransferCount(MergeRecord.REL_ORIGINAL, 0);
        this.runner.assertTransferCount(MergeRecord.REL_FAILURE, 1);
        Assertions.assertEquals((int)5, (int)((MergeRecord)this.runner.getProcessor()).getBinCount());
    }

    @Test
    public void testDefragmentExpiredBinFailsOnTimeout() throws InterruptedException {
        this.runner.setProperty(MergeRecord.MAX_BIN_COUNT, "5");
        this.runner.setProperty(MergeRecord.MERGE_STRATEGY, MergeRecord.MERGE_STRATEGY_DEFRAGMENT);
        this.runner.setProperty(MergeRecord.MAX_BIN_AGE, "1 millis");
        HashMap<String, String> attrs = new HashMap<String, String>();
        attrs.put(MergeRecord.FRAGMENT_COUNT_ATTRIBUTE, "5");
        attrs.put(MergeRecord.FRAGMENT_ID_ATTRIBUTE, "0");
        this.runner.enqueue("Name, Age\nJohn, 30", attrs);
        this.runner.run(1, false);
        Thread.sleep(50L);
        this.runner.run(1, false, false);
        this.runner.run(1, true, false);
        this.runner.assertTransferCount(MergeRecord.REL_MERGED, 0);
        this.runner.assertTransferCount(MergeRecord.REL_ORIGINAL, 0);
        this.runner.assertTransferCount(MergeRecord.REL_FAILURE, 1);
    }

    @Test
    public void testMergeWithMinRecordsFromVariableRegistry() {
        this.runner.setVariable("min_records", "3");
        this.runner.setVariable("max_records", "3");
        this.runner.setValidateExpressionUsage(true);
        this.runner.setProperty(MergeRecord.MIN_RECORDS, "${min_records}");
        this.runner.setProperty(MergeRecord.MAX_RECORDS, "3");
        this.runner.enqueue("Name, Age\nJohn, 35");
        this.runner.enqueue("Name, Age\nJane, 34");
        this.runner.enqueue("Name, Age\nAlex, 28");
        this.runner.run(1);
        this.runner.assertTransferCount(MergeRecord.REL_MERGED, 1);
        this.runner.assertTransferCount(MergeRecord.REL_ORIGINAL, 3);
        MockFlowFile mff = (MockFlowFile)this.runner.getFlowFilesForRelationship(MergeRecord.REL_MERGED).get(0);
        mff.assertAttributeEquals("record.count", "3");
        mff.assertContentEquals("header\nJohn,35\nJane,34\nAlex,28\n");
        this.runner.clearTransferState();
        this.runner.setProperty(MergeRecord.MIN_RECORDS, "1");
        this.runner.setProperty(MergeRecord.MAX_RECORDS, "${max_records}");
        this.runner.enqueue("Name, Age\nJohn, 35");
        this.runner.enqueue("Name, Age\nJane, 34");
        this.runner.enqueue("Name, Age\nAlex, 28");
        this.runner.enqueue("Name, Age\nDonna, 48");
        this.runner.enqueue("Name, Age\nJoey, 45");
        this.runner.run(2);
        this.runner.assertTransferCount(MergeRecord.REL_MERGED, 2);
        this.runner.assertTransferCount(MergeRecord.REL_ORIGINAL, 5);
        MockFlowFile mff1 = (MockFlowFile)this.runner.getFlowFilesForRelationship(MergeRecord.REL_MERGED).get(0);
        mff1.assertAttributeEquals("record.count", "3");
        mff1.assertContentEquals("header\nJohn,35\nJane,34\nAlex,28\n");
        MockFlowFile mff2 = (MockFlowFile)this.runner.getFlowFilesForRelationship(MergeRecord.REL_MERGED).get(1);
        mff2.assertAttributeEquals("record.count", "2");
        mff2.assertContentEquals("header\nDonna,48\nJoey,45\n");
        this.runner.clearTransferState();
        this.runner.removeProperty("min_records");
        this.runner.removeProperty("max_records");
    }

    @Test
    public void testNegativeMinAndMaxRecordsValidators() {
        this.runner.setVariable("min_records", "-3");
        this.runner.setVariable("max_records", "-1");
        this.runner.setProperty(MergeRecord.MIN_RECORDS, "${min_records}");
        this.runner.setProperty(MergeRecord.MAX_RECORDS, "3");
        this.runner.assertNotValid();
        this.runner.setProperty(MergeRecord.MIN_RECORDS, "${min_records}");
        this.runner.setProperty(MergeRecord.MAX_RECORDS, "${max_records}");
        this.runner.assertNotValid();
        this.runner.setProperty(MergeRecord.MIN_RECORDS, "3");
        this.runner.setProperty(MergeRecord.MAX_RECORDS, "${max_records}");
        this.runner.assertNotValid();
        this.runner.removeProperty(MergeRecord.MIN_RECORDS);
        this.runner.setProperty(MergeRecord.MAX_RECORDS, "${max_records}");
        this.runner.assertNotValid();
        this.runner.setVariable("min_records", "-1");
        this.runner.setVariable("max_records", "-3");
        this.runner.setProperty(MergeRecord.MIN_RECORDS, "${min_records}");
        this.runner.setProperty(MergeRecord.MAX_RECORDS, "${max_records}");
        this.runner.assertNotValid();
        this.runner.setVariable("min_records", "1");
        this.runner.setVariable("max_records", "5");
        this.runner.setProperty(MergeRecord.MIN_RECORDS, "${min_records}");
        this.runner.setProperty(MergeRecord.MAX_RECORDS, "${max_records}");
        this.runner.assertValid();
        this.runner.removeProperty("min_records");
        this.runner.removeProperty("max_records");
    }
}

