/*
 * Decompiled with CFR 0.152.
 */
package org.apache.nifi.processors.standard;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.file.Paths;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.zip.ZipInputStream;
import org.apache.avro.Schema;
import org.apache.avro.file.DataFileReader;
import org.apache.avro.file.DataFileWriter;
import org.apache.avro.file.SeekableByteArrayInput;
import org.apache.avro.file.SeekableInput;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.DatumReader;
import org.apache.avro.io.DatumWriter;
import org.apache.commons.compress.archivers.tar.TarArchiveEntry;
import org.apache.commons.compress.archivers.tar.TarArchiveInputStream;
import org.apache.commons.io.IOUtils;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.flowfile.attributes.StandardFlowFileMediaType;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.Processor;
import org.apache.nifi.processors.standard.MergeContent;
import org.apache.nifi.processors.standard.merge.AttributeStrategyUtil;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.MockProcessContext;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;

public class TestMergeContent {
    /**
     * Bin-packs a 1026-byte FlowFile and a 6144-byte FlowFile with a maximum bin size of
     * "5 KB" and two entries per bin.
     * <p>
     * Expects exactly one merged FlowFile of 6144 bytes, one FlowFile on the original
     * relationship whose {@code merge.uuid} attribute equals the merged FlowFile's UUID,
     * nothing on failure, and a non-empty input queue afterwards.
     */
    @Test
    public void testFlowFileLargerThanBin() {
        final TestRunner testRunner = TestRunners.newTestRunner(new MergeContent());
        testRunner.setProperty(MergeContent.MERGE_STRATEGY, MergeContent.MERGE_STRATEGY_BIN_PACK);
        testRunner.setProperty(MergeContent.MIN_ENTRIES, "2");
        testRunner.setProperty(MergeContent.MAX_ENTRIES, "2");
        testRunner.setProperty(MergeContent.MIN_SIZE, "1 KB");
        testRunner.setProperty(MergeContent.MAX_SIZE, "5 KB");

        testRunner.enqueue(new byte[1026]);
        testRunner.enqueue(new byte[6144]);
        testRunner.run(2);

        testRunner.assertTransferCount(MergeContent.REL_ORIGINAL, 1);
        testRunner.assertTransferCount(MergeContent.REL_MERGED, 1);
        testRunner.assertTransferCount(MergeContent.REL_FAILURE, 0);

        final MockFlowFile merged = testRunner.getFlowFilesForRelationship(MergeContent.REL_MERGED).get(0);
        final MockFlowFile original = testRunner.getFlowFilesForRelationship(MergeContent.REL_ORIGINAL).get(0);

        // The original must point back at the bundle it was merged into.
        Assertions.assertEquals(merged.getAttribute(CoreAttributes.UUID.key()), original.getAttribute("merge.uuid"));
        Assertions.assertEquals(6144L, merged.getSize());
        testRunner.assertQueueNotEmpty();
    }

    /**
     * Merges three Avro FlowFiles that share the {@code user.avsc} schema.
     * <p>
     * Expects one merged FlowFile with MIME type {@code application/avro-binary} that
     * contains the records "Alyssa", "Ben" and "John", three originals and no failures.
     *
     * @throws IOException if the schema cannot be read or a record cannot be (de)serialized
     */
    @Test
    public void testSimpleAvroConcat() throws IOException {
        final TestRunner testRunner = TestRunners.newTestRunner(new MergeContent());
        testRunner.setProperty(MergeContent.MAX_ENTRIES, "3");
        testRunner.setProperty(MergeContent.MIN_ENTRIES, "3");
        testRunner.setProperty(MergeContent.MERGE_FORMAT, MergeContent.MERGE_FORMAT_AVRO);

        final Schema userSchema = new Schema.Parser().parse(new File("src/test/resources/TestMergeContent/user.avsc"));

        final GenericRecord alyssa = new GenericData.Record(userSchema);
        alyssa.put("name", "Alyssa");
        alyssa.put("favorite_number", 256);

        final GenericRecord ben = new GenericData.Record(userSchema);
        ben.put("name", "Ben");
        ben.put("favorite_number", 7);
        ben.put("favorite_color", "red");

        final GenericRecord john = new GenericData.Record(userSchema);
        john.put("name", "John");
        john.put("favorite_number", 5);
        john.put("favorite_color", "blue");

        // One datum writer is shared by all three serializations.
        final DatumWriter<GenericRecord> writer = new GenericDatumWriter<>(userSchema);
        final byte[] alyssaBytes = serializeAvroRecord(userSchema, alyssa, writer).toByteArray();
        final byte[] benBytes = serializeAvroRecord(userSchema, ben, writer).toByteArray();
        final byte[] johnBytes = serializeAvroRecord(userSchema, john, writer).toByteArray();

        testRunner.enqueue(alyssaBytes);
        testRunner.enqueue(benBytes);
        testRunner.enqueue(johnBytes);
        testRunner.run();

        testRunner.assertQueueEmpty();
        testRunner.assertTransferCount(MergeContent.REL_MERGED, 1);
        testRunner.assertTransferCount(MergeContent.REL_FAILURE, 0);
        testRunner.assertTransferCount(MergeContent.REL_ORIGINAL, 3);

        final MockFlowFile merged = testRunner.getFlowFilesForRelationship(MergeContent.REL_MERGED).get(0);
        merged.assertAttributeEquals(CoreAttributes.MIME_TYPE.key(), "application/avro-binary");

        // Read the merged payload back and index the records by their "name" field.
        final Map<String, GenericRecord> usersByName =
                getGenericRecordMap(testRunner.getContentAsByteArray(merged), userSchema, "name");
        Assertions.assertEquals(3, usersByName.size());
        Assertions.assertTrue(usersByName.containsKey("Alyssa"));
        Assertions.assertTrue(usersByName.containsKey("Ben"));
        Assertions.assertTrue(usersByName.containsKey("John"));
    }

    /**
     * Merges three Avro FlowFiles where the second one uses the {@code place.avsc} schema
     * while the other two use {@code user.avsc}.
     * <p>
     * Expects one merged FlowFile holding the "Alyssa" and "John" records, one FlowFile on
     * failure holding the "Some Place" record, and three originals.
     *
     * @throws IOException if a schema cannot be read or a record cannot be (de)serialized
     */
    @Test
    public void testAvroConcatWithDifferentSchemas() throws IOException {
        final TestRunner testRunner = TestRunners.newTestRunner(new MergeContent());
        testRunner.setProperty(MergeContent.MAX_ENTRIES, "3");
        testRunner.setProperty(MergeContent.MIN_ENTRIES, "3");
        testRunner.setProperty(MergeContent.MERGE_FORMAT, MergeContent.MERGE_FORMAT_AVRO);

        final Schema userSchema = new Schema.Parser().parse(new File("src/test/resources/TestMergeContent/user.avsc"));
        final Schema placeSchema = new Schema.Parser().parse(new File("src/test/resources/TestMergeContent/place.avsc"));

        final GenericRecord alyssa = new GenericData.Record(userSchema);
        alyssa.put("name", "Alyssa");
        alyssa.put("favorite_number", 256);

        final GenericRecord place = new GenericData.Record(placeSchema);
        place.put("name", "Some Place");

        final GenericRecord john = new GenericData.Record(userSchema);
        john.put("name", "John");
        john.put("favorite_number", 5);
        john.put("favorite_color", "blue");

        // The writer is constructed with the user schema; each call passes its own schema along.
        final DatumWriter<GenericRecord> writer = new GenericDatumWriter<>(userSchema);
        final byte[] alyssaBytes = serializeAvroRecord(userSchema, alyssa, writer).toByteArray();
        final byte[] placeBytes = serializeAvroRecord(placeSchema, place, writer).toByteArray();
        final byte[] johnBytes = serializeAvroRecord(userSchema, john, writer).toByteArray();

        testRunner.enqueue(alyssaBytes);
        testRunner.enqueue(placeBytes);
        testRunner.enqueue(johnBytes);
        testRunner.run();

        testRunner.assertQueueEmpty();
        testRunner.assertTransferCount(MergeContent.REL_MERGED, 1);
        testRunner.assertTransferCount(MergeContent.REL_FAILURE, 1);
        testRunner.assertTransferCount(MergeContent.REL_ORIGINAL, 3);

        final MockFlowFile merged = testRunner.getFlowFilesForRelationship(MergeContent.REL_MERGED).get(0);
        merged.assertAttributeEquals(CoreAttributes.MIME_TYPE.key(), "application/avro-binary");

        final Map<String, GenericRecord> usersByName =
                getGenericRecordMap(testRunner.getContentAsByteArray(merged), userSchema, "name");
        Assertions.assertEquals(2, usersByName.size());
        Assertions.assertTrue(usersByName.containsKey("Alyssa"));
        Assertions.assertTrue(usersByName.containsKey("John"));

        // The FlowFile with the other schema must arrive on failure with its record intact.
        final MockFlowFile failed = testRunner.getFlowFilesForRelationship(MergeContent.REL_FAILURE).get(0);
        final Map<String, GenericRecord> placesByName =
                getGenericRecordMap(testRunner.getContentAsByteArray(failed), placeSchema, "name");
        Assertions.assertEquals(1, placesByName.size());
        Assertions.assertTrue(placesByName.containsKey("Some Place"));
    }

    /**
     * Merges three Avro FlowFiles that share a schema but carry different Avro file
     * metadata, with the metadata strategy set to
     * {@link MergeContent#METADATA_STRATEGY_DO_NOT_MERGE}.
     * <p>
     * Expects one merged FlowFile containing only the "Alyssa" record, two FlowFiles on
     * failure and three originals.
     *
     * @throws IOException if the schema cannot be read or a record cannot be (de)serialized
     */
    @Test
    public void testAvroConcatWithDifferentMetadataDoNotMerge() throws IOException {
        TestRunner runner = TestRunners.newTestRunner(new MergeContent());
        runner.setProperty(MergeContent.MAX_ENTRIES, "3");
        runner.setProperty(MergeContent.MIN_ENTRIES, "3");
        runner.setProperty(MergeContent.MERGE_FORMAT, MergeContent.MERGE_FORMAT_AVRO);
        runner.setProperty(MergeContent.METADATA_STRATEGY, MergeContent.METADATA_STRATEGY_DO_NOT_MERGE);

        Schema schema = new Schema.Parser().parse(new File("src/test/resources/TestMergeContent/user.avsc"));

        // Plain maps are used instead of double-brace initialization, which would create an
        // anonymous HashMap subclass per map that also references this test instance.
        GenericRecord user1 = new GenericData.Record(schema);
        user1.put("name", "Alyssa");
        user1.put("favorite_number", 256);
        Map<String, String> userMeta1 = new HashMap<>();
        userMeta1.put("test_metadata1", "Test 1");

        GenericRecord user2 = new GenericData.Record(schema);
        user2.put("name", "Ben");
        user2.put("favorite_number", 7);
        user2.put("favorite_color", "red");
        Map<String, String> userMeta2 = new HashMap<>();
        userMeta2.put("test_metadata1", "Test 2");

        GenericRecord user3 = new GenericData.Record(schema);
        user3.put("name", "John");
        user3.put("favorite_number", 5);
        user3.put("favorite_color", "blue");
        Map<String, String> userMeta3 = new HashMap<>();
        userMeta3.put("test_metadata1", "Test 1");
        userMeta3.put("test_metadata2", "Test");

        DatumWriter<GenericRecord> datumWriter = new GenericDatumWriter<>(schema);
        ByteArrayOutputStream out1 = serializeAvroRecord(schema, user1, datumWriter, userMeta1);
        ByteArrayOutputStream out2 = serializeAvroRecord(schema, user2, datumWriter, userMeta2);
        ByteArrayOutputStream out3 = serializeAvroRecord(schema, user3, datumWriter, userMeta3);
        runner.enqueue(out1.toByteArray());
        runner.enqueue(out2.toByteArray());
        runner.enqueue(out3.toByteArray());
        runner.run();

        runner.assertQueueEmpty();
        runner.assertTransferCount(MergeContent.REL_MERGED, 1);
        runner.assertTransferCount(MergeContent.REL_FAILURE, 2);
        runner.assertTransferCount(MergeContent.REL_ORIGINAL, 3);

        MockFlowFile bundle = runner.getFlowFilesForRelationship(MergeContent.REL_MERGED).get(0);
        bundle.assertAttributeEquals(CoreAttributes.MIME_TYPE.key(), "application/avro-binary");

        byte[] data = runner.getContentAsByteArray(bundle);
        Map<String, GenericRecord> users = getGenericRecordMap(data, schema, "name");
        Assertions.assertEquals(1, users.size());
        Assertions.assertTrue(users.containsKey("Alyssa"));
    }

    /**
     * Merges three Avro FlowFiles that share a schema but carry different Avro file
     * metadata, with the metadata strategy set to
     * {@link MergeContent#METADATA_STRATEGY_IGNORE}.
     * <p>
     * Expects one merged FlowFile containing all three records ("Alyssa", "Ben", "John"),
     * three originals and no failures.
     *
     * @throws IOException if the schema cannot be read or a record cannot be (de)serialized
     */
    @Test
    public void testAvroConcatWithDifferentMetadataIgnore() throws IOException {
        TestRunner runner = TestRunners.newTestRunner(new MergeContent());
        runner.setProperty(MergeContent.MAX_ENTRIES, "3");
        runner.setProperty(MergeContent.MIN_ENTRIES, "3");
        runner.setProperty(MergeContent.MERGE_FORMAT, MergeContent.MERGE_FORMAT_AVRO);
        runner.setProperty(MergeContent.METADATA_STRATEGY, MergeContent.METADATA_STRATEGY_IGNORE);

        Schema schema = new Schema.Parser().parse(new File("src/test/resources/TestMergeContent/user.avsc"));

        // Plain maps are used instead of double-brace initialization, which would create an
        // anonymous HashMap subclass per map that also references this test instance.
        GenericRecord user1 = new GenericData.Record(schema);
        user1.put("name", "Alyssa");
        user1.put("favorite_number", 256);
        Map<String, String> userMeta1 = new HashMap<>();
        userMeta1.put("test_metadata1", "Test 1");

        GenericRecord user2 = new GenericData.Record(schema);
        user2.put("name", "Ben");
        user2.put("favorite_number", 7);
        user2.put("favorite_color", "red");
        Map<String, String> userMeta2 = new HashMap<>();
        userMeta2.put("test_metadata1", "Test 2");

        GenericRecord user3 = new GenericData.Record(schema);
        user3.put("name", "John");
        user3.put("favorite_number", 5);
        user3.put("favorite_color", "blue");
        Map<String, String> userMeta3 = new HashMap<>();
        userMeta3.put("test_metadata1", "Test 1");
        userMeta3.put("test_metadata2", "Test");

        DatumWriter<GenericRecord> datumWriter = new GenericDatumWriter<>(schema);
        ByteArrayOutputStream out1 = serializeAvroRecord(schema, user1, datumWriter, userMeta1);
        ByteArrayOutputStream out2 = serializeAvroRecord(schema, user2, datumWriter, userMeta2);
        ByteArrayOutputStream out3 = serializeAvroRecord(schema, user3, datumWriter, userMeta3);
        runner.enqueue(out1.toByteArray());
        runner.enqueue(out2.toByteArray());
        runner.enqueue(out3.toByteArray());
        runner.run();

        runner.assertQueueEmpty();
        runner.assertTransferCount(MergeContent.REL_MERGED, 1);
        runner.assertTransferCount(MergeContent.REL_FAILURE, 0);
        runner.assertTransferCount(MergeContent.REL_ORIGINAL, 3);

        MockFlowFile bundle = runner.getFlowFilesForRelationship(MergeContent.REL_MERGED).get(0);
        bundle.assertAttributeEquals(CoreAttributes.MIME_TYPE.key(), "application/avro-binary");

        byte[] data = runner.getContentAsByteArray(bundle);
        Map<String, GenericRecord> users = getGenericRecordMap(data, schema, "name");
        Assertions.assertEquals(3, users.size());
        Assertions.assertTrue(users.containsKey("Alyssa"));
        Assertions.assertTrue(users.containsKey("Ben"));
        Assertions.assertTrue(users.containsKey("John"));
    }

    /**
     * Merges three Avro FlowFiles that share a schema but carry different Avro file
     * metadata, with the metadata strategy set to
     * {@link MergeContent#METADATA_STRATEGY_USE_FIRST}.
     * <p>
     * Expects one merged FlowFile containing all three records ("Alyssa", "Ben", "John"),
     * three originals and no failures.
     *
     * @throws IOException if the schema cannot be read or a record cannot be (de)serialized
     */
    @Test
    public void testAvroConcatWithDifferentMetadataUseFirst() throws IOException {
        TestRunner runner = TestRunners.newTestRunner(new MergeContent());
        runner.setProperty(MergeContent.MAX_ENTRIES, "3");
        runner.setProperty(MergeContent.MIN_ENTRIES, "3");
        runner.setProperty(MergeContent.MERGE_FORMAT, MergeContent.MERGE_FORMAT_AVRO);
        runner.setProperty(MergeContent.METADATA_STRATEGY, MergeContent.METADATA_STRATEGY_USE_FIRST);

        Schema schema = new Schema.Parser().parse(new File("src/test/resources/TestMergeContent/user.avsc"));

        // Plain maps are used instead of double-brace initialization, which would create an
        // anonymous HashMap subclass per map that also references this test instance.
        GenericRecord user1 = new GenericData.Record(schema);
        user1.put("name", "Alyssa");
        user1.put("favorite_number", 256);
        Map<String, String> userMeta1 = new HashMap<>();
        userMeta1.put("test_metadata1", "Test 1");

        GenericRecord user2 = new GenericData.Record(schema);
        user2.put("name", "Ben");
        user2.put("favorite_number", 7);
        user2.put("favorite_color", "red");
        Map<String, String> userMeta2 = new HashMap<>();
        userMeta2.put("test_metadata1", "Test 2");

        GenericRecord user3 = new GenericData.Record(schema);
        user3.put("name", "John");
        user3.put("favorite_number", 5);
        user3.put("favorite_color", "blue");
        Map<String, String> userMeta3 = new HashMap<>();
        userMeta3.put("test_metadata1", "Test 1");
        userMeta3.put("test_metadata2", "Test");

        DatumWriter<GenericRecord> datumWriter = new GenericDatumWriter<>(schema);
        ByteArrayOutputStream out1 = serializeAvroRecord(schema, user1, datumWriter, userMeta1);
        ByteArrayOutputStream out2 = serializeAvroRecord(schema, user2, datumWriter, userMeta2);
        ByteArrayOutputStream out3 = serializeAvroRecord(schema, user3, datumWriter, userMeta3);
        runner.enqueue(out1.toByteArray());
        runner.enqueue(out2.toByteArray());
        runner.enqueue(out3.toByteArray());
        runner.run();

        runner.assertQueueEmpty();
        runner.assertTransferCount(MergeContent.REL_MERGED, 1);
        runner.assertTransferCount(MergeContent.REL_FAILURE, 0);
        runner.assertTransferCount(MergeContent.REL_ORIGINAL, 3);

        MockFlowFile bundle = runner.getFlowFilesForRelationship(MergeContent.REL_MERGED).get(0);
        bundle.assertAttributeEquals(CoreAttributes.MIME_TYPE.key(), "application/avro-binary");

        byte[] data = runner.getContentAsByteArray(bundle);
        Map<String, GenericRecord> users = getGenericRecordMap(data, schema, "name");
        Assertions.assertEquals(3, users.size());
        Assertions.assertTrue(users.containsKey("Alyssa"));
        Assertions.assertTrue(users.containsKey("Ben"));
        Assertions.assertTrue(users.containsKey("John"));
    }

    /**
     * Merges three Avro FlowFiles that share a schema but carry different Avro file
     * metadata, with the metadata strategy set to
     * {@link MergeContent#METADATA_STRATEGY_ALL_COMMON}.
     * <p>
     * Expects one merged FlowFile containing the "Alyssa" and "John" records, one FlowFile
     * on failure and three originals.
     *
     * @throws IOException if the schema cannot be read or a record cannot be (de)serialized
     */
    @Test
    public void testAvroConcatWithDifferentMetadataKeepCommon() throws IOException {
        TestRunner runner = TestRunners.newTestRunner(new MergeContent());
        runner.setProperty(MergeContent.MAX_ENTRIES, "3");
        runner.setProperty(MergeContent.MIN_ENTRIES, "3");
        runner.setProperty(MergeContent.MERGE_FORMAT, MergeContent.MERGE_FORMAT_AVRO);
        runner.setProperty(MergeContent.METADATA_STRATEGY, MergeContent.METADATA_STRATEGY_ALL_COMMON);

        Schema schema = new Schema.Parser().parse(new File("src/test/resources/TestMergeContent/user.avsc"));

        // Plain maps are used instead of double-brace initialization, which would create an
        // anonymous HashMap subclass per map that also references this test instance.
        GenericRecord user1 = new GenericData.Record(schema);
        user1.put("name", "Alyssa");
        user1.put("favorite_number", 256);
        Map<String, String> userMeta1 = new HashMap<>();
        userMeta1.put("test_metadata1", "Test 1");

        GenericRecord user2 = new GenericData.Record(schema);
        user2.put("name", "Ben");
        user2.put("favorite_number", 7);
        user2.put("favorite_color", "red");
        Map<String, String> userMeta2 = new HashMap<>();
        userMeta2.put("test_metadata1", "Test 2");

        GenericRecord user3 = new GenericData.Record(schema);
        user3.put("name", "John");
        user3.put("favorite_number", 5);
        user3.put("favorite_color", "blue");
        Map<String, String> userMeta3 = new HashMap<>();
        userMeta3.put("test_metadata1", "Test 1");
        userMeta3.put("test_metadata2", "Test");

        DatumWriter<GenericRecord> datumWriter = new GenericDatumWriter<>(schema);
        ByteArrayOutputStream out1 = serializeAvroRecord(schema, user1, datumWriter, userMeta1);
        ByteArrayOutputStream out2 = serializeAvroRecord(schema, user2, datumWriter, userMeta2);
        ByteArrayOutputStream out3 = serializeAvroRecord(schema, user3, datumWriter, userMeta3);
        runner.enqueue(out1.toByteArray());
        runner.enqueue(out2.toByteArray());
        runner.enqueue(out3.toByteArray());
        runner.run();

        runner.assertQueueEmpty();
        runner.assertTransferCount(MergeContent.REL_MERGED, 1);
        runner.assertTransferCount(MergeContent.REL_FAILURE, 1);
        runner.assertTransferCount(MergeContent.REL_ORIGINAL, 3);

        MockFlowFile bundle = runner.getFlowFilesForRelationship(MergeContent.REL_MERGED).get(0);
        bundle.assertAttributeEquals(CoreAttributes.MIME_TYPE.key(), "application/avro-binary");

        byte[] data = runner.getContentAsByteArray(bundle);
        Map<String, GenericRecord> users = getGenericRecordMap(data, schema, "name");
        Assertions.assertEquals(2, users.size());
        Assertions.assertTrue(users.containsKey("Alyssa"));
        Assertions.assertTrue(users.containsKey("John"));
    }

    /**
     * Reads every record from an in-memory Avro data file and indexes the records by the
     * string form of one of their fields.
     *
     * @param data   bytes of an Avro data file
     * @param schema schema handed to the datum reader
     * @param key    name of the record field whose {@code toString()} value becomes the map key;
     *               a later record with the same key replaces an earlier one
     * @return map from key-field value to record
     * @throws IOException if the data cannot be read as an Avro data file
     */
    private Map<String, GenericRecord> getGenericRecordMap(byte[] data, Schema schema, String key) throws IOException {
        final DatumReader<GenericRecord> datumReader = new GenericDatumReader<>(schema);
        final Map<String, GenericRecord> records = new HashMap<>();
        // try-with-resources so the reader and its input are closed even if reading fails part-way.
        try (SeekableByteArrayInput input = new SeekableByteArrayInput(data);
             DataFileReader<GenericRecord> dataFileReader = new DataFileReader<>(input, datumReader)) {
            while (dataFileReader.hasNext()) {
                final GenericRecord record = dataFileReader.next();
                records.put(record.get(key).toString(), record);
            }
        }
        return records;
    }

    /**
     * Serializes a single record into an in-memory Avro data file without any extra file
     * metadata. Delegates to the four-argument overload with {@code null} metadata.
     *
     * @param schema      schema to create the data file with
     * @param user2       record to write
     * @param datumWriter datum writer used by the data file writer
     * @return stream holding the bytes of the written data file
     * @throws IOException if writing fails
     */
    private ByteArrayOutputStream serializeAvroRecord(Schema schema, GenericRecord user2, DatumWriter<GenericRecord> datumWriter) throws IOException {
        final Map<String, String> noMetadata = null;
        return serializeAvroRecord(schema, user2, datumWriter, noMetadata);
    }

    /**
     * Serializes a single record into an in-memory Avro data file, optionally attaching
     * file-level metadata entries.
     *
     * @param schema      schema to create the data file with
     * @param user2       record to write
     * @param datumWriter datum writer used by the data file writer
     * @param metadata    metadata key/value pairs to set on the file before it is created,
     *                    or {@code null} for none
     * @return stream holding the bytes of the written data file
     * @throws IOException if writing fails
     */
    private ByteArrayOutputStream serializeAvroRecord(Schema schema, GenericRecord user2, DatumWriter<GenericRecord> datumWriter, Map<String, String> metadata) throws IOException {
        final ByteArrayOutputStream out = new ByteArrayOutputStream();
        // try-with-resources guarantees the writer is closed (and flushed) even when
        // create/append throws.
        try (DataFileWriter<GenericRecord> dataFileWriter = new DataFileWriter<>(datumWriter)) {
            if (metadata != null) {
                // Metadata is set before create(), matching the original call order.
                metadata.forEach((metaKey, metaValue) -> dataFileWriter.setMeta(metaKey, metaValue));
            }
            dataFileWriter.create(schema, out);
            dataFileWriter.append(user2);
        }
        return out;
    }

    /**
     * Binary-concatenates the FlowFiles produced by {@code createFlowFiles} with a maximum
     * bin age of "1 sec".
     * <p>
     * Expects one merged FlowFile with content "Hello, World!" and MIME type
     * {@code application/plain-text}, three originals whose {@code merge.uuid} attribute
     * equals the merged FlowFile's UUID, and no failures.
     *
     * @throws IOException if the expected content cannot be encoded
     */
    @Test
    public void testSimpleBinaryConcat() throws IOException {
        final TestRunner testRunner = TestRunners.newTestRunner(new MergeContent());
        testRunner.setProperty(MergeContent.MAX_BIN_AGE, "1 sec");
        testRunner.setProperty(MergeContent.MERGE_FORMAT, MergeContent.MERGE_FORMAT_CONCAT);

        createFlowFiles(testRunner);
        testRunner.run(2);

        testRunner.assertQueueEmpty();
        testRunner.assertTransferCount(MergeContent.REL_MERGED, 1);
        testRunner.assertTransferCount(MergeContent.REL_FAILURE, 0);
        testRunner.assertTransferCount(MergeContent.REL_ORIGINAL, 3);

        final MockFlowFile merged = testRunner.getFlowFilesForRelationship(MergeContent.REL_MERGED).get(0);
        merged.assertContentEquals("Hello, World!".getBytes("UTF-8"));
        merged.assertAttributeEquals(CoreAttributes.MIME_TYPE.key(), "application/plain-text");

        // Every original must reference the bundle it ended up in.
        for (final MockFlowFile original : testRunner.getFlowFilesForRelationship(MergeContent.REL_ORIGINAL)) {
            Assertions.assertEquals(merged.getAttribute(CoreAttributes.UUID.key()), original.getAttribute("merge.uuid"));
        }
    }

    /**
     * Binary-concatenates the FlowFiles produced by {@code createFlowFiles} with the
     * maximum bin count set to "1".
     * <p>
     * Expects one merged FlowFile with content "Hello, World!" and MIME type
     * {@code application/plain-text}, three originals and no failures.
     */
    @Test
    public void testSimpleBinaryConcatSingleBin() {
        final TestRunner testRunner = TestRunners.newTestRunner(new MergeContent());
        testRunner.setProperty(MergeContent.MAX_BIN_AGE, "1 sec");
        testRunner.setProperty(MergeContent.MERGE_FORMAT, MergeContent.MERGE_FORMAT_CONCAT);
        testRunner.setProperty(MergeContent.MAX_BIN_COUNT, "1");

        createFlowFiles(testRunner);
        testRunner.run(2);

        testRunner.assertQueueEmpty();
        testRunner.assertTransferCount(MergeContent.REL_MERGED, 1);
        testRunner.assertTransferCount(MergeContent.REL_FAILURE, 0);
        testRunner.assertTransferCount(MergeContent.REL_ORIGINAL, 3);

        final MockFlowFile merged = testRunner.getFlowFilesForRelationship(MergeContent.REL_MERGED).get(0);
        merged.assertContentEquals("Hello, World!");
        merged.assertAttributeEquals(CoreAttributes.MIME_TYPE.key(), "application/plain-text");
    }

    /**
     * Binary-concatenates the FlowFiles produced by {@code createFlowFiles} using the text
     * delimiter strategy with header "@", demarcator "#" and footer "$".
     * <p>
     * Expects one merged FlowFile with content "@Hello#, #World!$" and MIME type
     * {@code application/plain-text}, three originals and no failures.
     *
     * @throws IOException if the expected content cannot be encoded
     */
    @Test
    public void testSimpleBinaryConcatWithTextDelimiters() throws IOException {
        final TestRunner testRunner = TestRunners.newTestRunner(new MergeContent());
        testRunner.setProperty(MergeContent.MAX_BIN_AGE, "1 sec");
        testRunner.setProperty(MergeContent.MERGE_FORMAT, MergeContent.MERGE_FORMAT_CONCAT);
        testRunner.setProperty(MergeContent.DELIMITER_STRATEGY, MergeContent.DELIMITER_STRATEGY_TEXT);
        testRunner.setProperty(MergeContent.HEADER, "@");
        testRunner.setProperty(MergeContent.DEMARCATOR, "#");
        testRunner.setProperty(MergeContent.FOOTER, "$");

        createFlowFiles(testRunner);
        testRunner.run(2);

        testRunner.assertQueueEmpty();
        testRunner.assertTransferCount(MergeContent.REL_MERGED, 1);
        testRunner.assertTransferCount(MergeContent.REL_FAILURE, 0);
        testRunner.assertTransferCount(MergeContent.REL_ORIGINAL, 3);

        final MockFlowFile merged = testRunner.getFlowFilesForRelationship(MergeContent.REL_MERGED).get(0);
        final byte[] expectedContent = "@Hello#, #World!$".getBytes("UTF-8");
        merged.assertContentEquals(expectedContent);
        merged.assertAttributeEquals(CoreAttributes.MIME_TYPE.key(), "application/plain-text");
    }

    /**
     * Binary-concatenates the FlowFiles produced by {@code createFlowFiles} using the text
     * delimiter strategy with only the header ("@") configured.
     * <p>
     * Expects one merged FlowFile with content "@Hello, World!" and MIME type
     * {@code application/plain-text}, three originals and no failures.
     *
     * @throws IOException if the expected content cannot be encoded
     */
    @Test
    public void testSimpleBinaryConcatWithTextDelimitersHeaderOnly() throws IOException {
        final TestRunner testRunner = TestRunners.newTestRunner(new MergeContent());
        testRunner.setProperty(MergeContent.MAX_BIN_AGE, "1 sec");
        testRunner.setProperty(MergeContent.MERGE_FORMAT, MergeContent.MERGE_FORMAT_CONCAT);
        testRunner.setProperty(MergeContent.DELIMITER_STRATEGY, MergeContent.DELIMITER_STRATEGY_TEXT);
        testRunner.setProperty(MergeContent.HEADER, "@");

        createFlowFiles(testRunner);
        testRunner.run(2);

        testRunner.assertQueueEmpty();
        testRunner.assertTransferCount(MergeContent.REL_MERGED, 1);
        testRunner.assertTransferCount(MergeContent.REL_FAILURE, 0);
        testRunner.assertTransferCount(MergeContent.REL_ORIGINAL, 3);

        final MockFlowFile merged = testRunner.getFlowFilesForRelationship(MergeContent.REL_MERGED).get(0);
        final byte[] expectedContent = "@Hello, World!".getBytes("UTF-8");
        merged.assertContentEquals(expectedContent);
        merged.assertAttributeEquals(CoreAttributes.MIME_TYPE.key(), "application/plain-text");
    }

    /**
     * Binary-concatenates three FlowFiles using the filename delimiter strategy, where the
     * header, demarcator and footer properties are expressions ({@code ${header}},
     * {@code ${demarcator}}, {@code ${footer}}) that reference FlowFile attributes holding
     * paths under {@code src/test/resources/TestMergeContent}.
     * <p>
     * Expects one merged FlowFile with content "(|)Hello***, ***World!___" and MIME type
     * {@code application/plain-text}, three originals and no failures.
     *
     * @throws IOException          if test content cannot be encoded
     * @throws InterruptedException declared by the original signature
     */
    @Test
    public void testSimpleBinaryConcatWithFileDelimiters() throws IOException, InterruptedException {
        final TestRunner testRunner = TestRunners.newTestRunner(new MergeContent());
        testRunner.setProperty(MergeContent.MAX_BIN_AGE, "1 sec");
        testRunner.setProperty(MergeContent.MERGE_FORMAT, MergeContent.MERGE_FORMAT_CONCAT);
        testRunner.setProperty(MergeContent.DELIMITER_STRATEGY, MergeContent.DELIMITER_STRATEGY_FILENAME);
        testRunner.setProperty(MergeContent.HEADER, "${header}");
        testRunner.setProperty(MergeContent.DEMARCATOR, "${demarcator}");
        testRunner.setProperty(MergeContent.FOOTER, "${footer}");

        // Every FlowFile carries the same attributes, including the delimiter file paths.
        final Map<String, String> flowFileAttributes = new HashMap<>();
        flowFileAttributes.put(CoreAttributes.MIME_TYPE.key(), "application/plain-text");
        flowFileAttributes.put("header", "src/test/resources/TestMergeContent/head");
        flowFileAttributes.put("demarcator", "src/test/resources/TestMergeContent/demarcate");
        flowFileAttributes.put("footer", "src/test/resources/TestMergeContent/foot");

        testRunner.enqueue("Hello".getBytes("UTF-8"), flowFileAttributes);
        testRunner.enqueue(", ".getBytes("UTF-8"), flowFileAttributes);
        testRunner.enqueue("World!".getBytes("UTF-8"), flowFileAttributes);
        testRunner.run(2);

        testRunner.assertQueueEmpty();
        testRunner.assertTransferCount(MergeContent.REL_MERGED, 1);
        testRunner.assertTransferCount(MergeContent.REL_FAILURE, 0);
        testRunner.assertTransferCount(MergeContent.REL_ORIGINAL, 3);

        final MockFlowFile merged = testRunner.getFlowFilesForRelationship(MergeContent.REL_MERGED).get(0);
        final byte[] expectedContent = "(|)Hello***, ***World!___".getBytes("UTF-8");
        merged.assertContentEquals(expectedContent);
        merged.assertAttributeEquals(CoreAttributes.MIME_TYPE.key(), "application/plain-text");
    }

    /**
     * Binary-concatenates three FlowFiles with the delimiter strategy set to
     * {@link MergeContent#DELIMITER_STRATEGY_NONE} while header, demarcator and footer
     * values are still configured.
     * <p>
     * Expects one merged FlowFile with content "FirstSecondThird" (none of the configured
     * delimiters appear) and MIME type {@code application/plain-text}, three originals and
     * no failures.
     */
    @Test
    void testSimpleBinaryConcatNoDelimiters() {
        final TestRunner testRunner = TestRunners.newTestRunner(new MergeContent());
        testRunner.setProperty(MergeContent.MAX_BIN_AGE, "1 sec");
        testRunner.setProperty(MergeContent.MERGE_FORMAT, MergeContent.MERGE_FORMAT_CONCAT);
        testRunner.setProperty(MergeContent.DELIMITER_STRATEGY, MergeContent.DELIMITER_STRATEGY_NONE);
        testRunner.setProperty(MergeContent.HEADER, "aHeader");
        testRunner.setProperty(MergeContent.DEMARCATOR, "; ");
        testRunner.setProperty(MergeContent.FOOTER, "aFooter");

        final Map<String, String> flowFileAttributes = new HashMap<>();
        flowFileAttributes.put(CoreAttributes.MIME_TYPE.key(), "application/plain-text");

        testRunner.enqueue("First", flowFileAttributes);
        testRunner.enqueue("Second", flowFileAttributes);
        testRunner.enqueue("Third", flowFileAttributes);
        testRunner.run(2);

        testRunner.assertQueueEmpty();
        testRunner.assertTransferCount(MergeContent.REL_MERGED, 1);
        testRunner.assertTransferCount(MergeContent.REL_FAILURE, 0);
        testRunner.assertTransferCount(MergeContent.REL_ORIGINAL, 3);

        final MockFlowFile merged = testRunner.getFlowFilesForRelationship(MergeContent.REL_MERGED).get(0);
        merged.assertContentEquals("FirstSecondThird");
        merged.assertAttributeEquals(CoreAttributes.MIME_TYPE.key(), "application/plain-text");
    }

    /**
     * Configures the text delimiter strategy with empty header, demarcator and footer
     * values and validates the processor configuration.
     * <p>
     * Expects exactly three validation results, each of whose string form contains
     * "cannot be empty".
     */
    @Test
    public void testTextDelimitersValidation() {
        TestRunner runner = TestRunners.newTestRunner(new MergeContent());
        runner.setProperty(MergeContent.MAX_BIN_AGE, "1 sec");
        runner.setProperty(MergeContent.MERGE_FORMAT, MergeContent.MERGE_FORMAT_CONCAT);
        runner.setProperty(MergeContent.DELIMITER_STRATEGY, MergeContent.DELIMITER_STRATEGY_TEXT);
        runner.setProperty(MergeContent.HEADER, "");
        runner.setProperty(MergeContent.DEMARCATOR, "");
        runner.setProperty(MergeContent.FOOTER, "");

        // Parameterized so the loop below can iterate ValidationResult elements; a raw
        // Collection only yields Object.
        Collection<ValidationResult> results = new HashSet<>();
        ProcessContext context = runner.getProcessContext();
        if (context instanceof MockProcessContext) {
            results = ((MockProcessContext) context).validate();
        }

        Assertions.assertEquals(3, results.size());
        for (ValidationResult vr : results) {
            Assertions.assertTrue(vr.toString().contains("cannot be empty"));
        }
    }

    /**
     * Configures the filename delimiter strategy with header, demarcator and footer all
     * pointing at a file path that is expected not to exist, then validates the processor
     * configuration.
     * <p>
     * Expects exactly three validation results, each of whose string form reports that the
     * file does not exist.
     */
    @Test
    public void testFileDelimitersValidation() {
        final String doesNotExistFile = "src/test/resources/TestMergeContent/does_not_exist";
        TestRunner runner = TestRunners.newTestRunner(new MergeContent());
        runner.setProperty(MergeContent.MAX_BIN_AGE, "1 sec");
        runner.setProperty(MergeContent.MERGE_FORMAT, MergeContent.MERGE_FORMAT_CONCAT);
        runner.setProperty(MergeContent.DELIMITER_STRATEGY, MergeContent.DELIMITER_STRATEGY_FILENAME);
        runner.setProperty(MergeContent.HEADER, doesNotExistFile);
        runner.setProperty(MergeContent.DEMARCATOR, doesNotExistFile);
        runner.setProperty(MergeContent.FOOTER, doesNotExistFile);

        // Parameterized so the loop below can iterate ValidationResult elements; a raw
        // Collection only yields Object.
        Collection<ValidationResult> results = new HashSet<>();
        ProcessContext context = runner.getProcessContext();
        if (context instanceof MockProcessContext) {
            results = ((MockProcessContext) context).validate();
        }

        Assertions.assertEquals(3, results.size());
        // File.toString() is used so the expected path uses the platform's separator.
        final String expectedMessage = "is invalid because File " + new File(doesNotExistFile).toString() + " does not exist";
        for (ValidationResult vr : results) {
            Assertions.assertTrue(vr.toString().contains(expectedMessage));
        }
    }

    /**
     * Concatenates the FlowFiles produced by {@code createFlowFiles} together with one extra,
     * empty FlowFile whose MIME type is "application/zip". The merged FlowFile is expected to
     * contain "Hello, World!" and to carry the MIME type "application/octet-stream".
     */
    @Test
    public void testMimeTypeIsOctetStreamIfConflictingWithBinaryConcat() throws IOException {
        TestRunner testRunner = TestRunners.newTestRunner(new MergeContent());
        testRunner.setProperty(MergeContent.MAX_BIN_AGE, "1 sec");
        testRunner.setProperty(MergeContent.MERGE_FORMAT, MergeContent.MERGE_FORMAT_CONCAT);

        createFlowFiles(testRunner);

        // Extra empty FlowFile with a different MIME type.
        Map<String, String> zipAttributes = new HashMap<String, String>();
        zipAttributes.put(CoreAttributes.MIME_TYPE.key(), "application/zip");
        testRunner.enqueue(new byte[0], zipAttributes);

        testRunner.run(2);

        testRunner.assertQueueEmpty();
        testRunner.assertTransferCount(MergeContent.REL_MERGED, 1);
        testRunner.assertTransferCount(MergeContent.REL_FAILURE, 0);
        testRunner.assertTransferCount(MergeContent.REL_ORIGINAL, 4);

        List<MockFlowFile> mergedFlowFiles = testRunner.getFlowFilesForRelationship(MergeContent.REL_MERGED);
        MockFlowFile merged = mergedFlowFiles.get(0);
        merged.assertContentEquals("Hello, World!".getBytes("UTF-8"));
        merged.assertAttributeEquals(CoreAttributes.MIME_TYPE.key(), "application/octet-stream");
    }

    /**
     * Enqueues 49 correlation groups of five empty FlowFiles each while bins require exactly
     * ten entries and at most 50 bins are allowed. Nothing is expected to be transferred after
     * the first run. Two more FlowFiles are then added (one without a correlation id, one with
     * id "abc"), after which one merged FlowFile and five originals are expected.
     */
    @Test
    public void testOldestBinIsExpired() {
        TestRunner testRunner = TestRunners.newTestRunner(new MergeContent());
        testRunner.setProperty(MergeContent.MAX_BIN_AGE, "1 day");
        testRunner.setProperty(MergeContent.MAX_BIN_COUNT, "50");
        testRunner.setProperty(MergeContent.MIN_ENTRIES, "10");
        testRunner.setProperty(MergeContent.MAX_ENTRIES, "10");
        testRunner.setProperty(MergeContent.MERGE_FORMAT, MergeContent.MERGE_FORMAT_CONCAT);
        testRunner.setProperty(MergeContent.CORRELATION_ATTRIBUTE_NAME, "correlationId");

        Map<String, String> flowFileAttributes = new HashMap<String, String>();
        int groupId = 0;
        while (groupId < 49) {
            flowFileAttributes.put("correlationId", String.valueOf(groupId));
            for (int copy = 0; copy < 5; copy++) {
                testRunner.enqueue(new byte[0], flowFileAttributes);
            }
            groupId++;
        }

        testRunner.run();

        // No group has reached ten entries, so nothing should have left the processor.
        testRunner.assertQueueNotEmpty();
        testRunner.assertTransferCount(MergeContent.REL_MERGED, 0);
        testRunner.assertTransferCount(MergeContent.REL_FAILURE, 0);
        testRunner.assertTransferCount(MergeContent.REL_ORIGINAL, 0);

        flowFileAttributes.remove("correlationId");
        testRunner.clearTransferState();
        testRunner.run(1, false, true);

        // One FlowFile without a correlation id, then one with a brand-new id.
        testRunner.enqueue(new byte[0], flowFileAttributes);
        flowFileAttributes.put("correlationId", "abc");
        testRunner.enqueue(new byte[0], flowFileAttributes);

        testRunner.run();

        testRunner.assertTransferCount(MergeContent.REL_MERGED, 1);
        testRunner.assertTransferCount(MergeContent.REL_FAILURE, 0);
        testRunner.assertTransferCount(MergeContent.REL_ORIGINAL, 5);
    }

    /**
     * With a minimum bin size of "20 KB" configured, a single run over the FlowFiles produced
     * by {@code createFlowFiles} is expected to transfer nothing to any relationship.
     */
    @Test
    public void testSimpleBinaryConcatWaitsForMin() {
        TestRunner testRunner = TestRunners.newTestRunner(new MergeContent());
        testRunner.setProperty(MergeContent.MERGE_FORMAT, MergeContent.MERGE_FORMAT_CONCAT);
        testRunner.setProperty(MergeContent.MIN_SIZE, "20 KB");

        createFlowFiles(testRunner);
        testRunner.run();

        // Nothing merged, nothing failed, nothing released as original.
        testRunner.assertTransferCount(MergeContent.REL_MERGED, 0);
        testRunner.assertTransferCount(MergeContent.REL_FAILURE, 0);
        testRunner.assertTransferCount(MergeContent.REL_ORIGINAL, 0);
    }

    /**
     * Merges the FlowFiles produced by {@code createFlowFiles} using the ZIP merge format and
     * reads the result back as a ZIP stream. The first three entries are expected to contain
     * "Hello", ", " and "World!" in that order, and the bundle's MIME type is expected to be
     * "application/zip".
     */
    @Test
    public void testZip() throws IOException {
        TestRunner testRunner = TestRunners.newTestRunner(new MergeContent());
        testRunner.setProperty(MergeContent.MAX_BIN_AGE, "1 sec");
        testRunner.setProperty(MergeContent.MERGE_FORMAT, MergeContent.MERGE_FORMAT_ZIP);

        createFlowFiles(testRunner);
        testRunner.run(2);

        testRunner.assertQueueEmpty();
        testRunner.assertTransferCount(MergeContent.REL_MERGED, 1);
        testRunner.assertTransferCount(MergeContent.REL_FAILURE, 0);
        testRunner.assertTransferCount(MergeContent.REL_ORIGINAL, 3);

        List<MockFlowFile> mergedFlowFiles = testRunner.getFlowFilesForRelationship(MergeContent.REL_MERGED);
        MockFlowFile zipBundle = mergedFlowFiles.get(0);
        byte[] zipBytes = testRunner.getContentAsByteArray(zipBundle);

        try (ByteArrayInputStream byteStream = new ByteArrayInputStream(zipBytes);
             ZipInputStream zipStream = new ZipInputStream(byteStream)) {
            // Only the first entry is explicitly checked for presence.
            Assertions.assertNotNull(zipStream.getNextEntry());
            Assertions.assertArrayEquals("Hello".getBytes("UTF-8"), IOUtils.toByteArray(zipStream));

            zipStream.getNextEntry();
            Assertions.assertArrayEquals(", ".getBytes("UTF-8"), IOUtils.toByteArray(zipStream));

            zipStream.getNextEntry();
            Assertions.assertArrayEquals("World!".getBytes("UTF-8"), IOUtils.toByteArray(zipStream));
        }

        zipBundle.assertAttributeEquals(CoreAttributes.MIME_TYPE.key(), "application/zip");
    }

    /**
     * Enqueues three FlowFiles that all share the filename "duplicate-filename.txt" and merges
     * them with the ZIP merge format. One merged FlowFile, two failures and three originals are
     * expected.
     */
    @Test
    public void testZipException() throws IOException {
        TestRunner testRunner = TestRunners.newTestRunner(new MergeContent());
        testRunner.setProperty(MergeContent.MAX_BIN_AGE, "1 sec");
        testRunner.setProperty(MergeContent.MERGE_FORMAT, MergeContent.MERGE_FORMAT_ZIP);

        Map<String, String> sharedAttributes = new HashMap<String, String>();
        sharedAttributes.put(CoreAttributes.MIME_TYPE.key(), "application/plain-text");
        sharedAttributes.put("filename", "duplicate-filename.txt");

        // Every FlowFile carries the very same filename.
        String[] payloads = {"Hello", ", ", "World!"};
        for (String payload : payloads) {
            testRunner.enqueue(payload.getBytes("UTF-8"), sharedAttributes);
        }

        testRunner.run(2);

        testRunner.assertQueueEmpty();
        testRunner.assertTransferCount(MergeContent.REL_MERGED, 1);
        testRunner.assertTransferCount(MergeContent.REL_FAILURE, 2);
        testRunner.assertTransferCount(MergeContent.REL_ORIGINAL, 3);
    }

    /**
     * Merges three FlowFiles with a short, a longer and a very long filename using the TAR merge
     * format, then reads the archive back. Entry names and contents are expected to match the
     * inputs in enqueue order, and the bundle's MIME type is expected to be "application/tar".
     */
    @Test
    public void testTar() throws IOException {
        String shortName = "AShortFileName";
        String longerName = "ALongerrrFileName";
        String veryLongName = "AReallyLongggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggggFileName";

        TestRunner testRunner = TestRunners.newTestRunner(new MergeContent());
        testRunner.setProperty(MergeContent.MAX_BIN_AGE, "1 sec");
        testRunner.setProperty(MergeContent.MERGE_FORMAT, MergeContent.MERGE_FORMAT_TAR);

        Map<String, String> flowFileAttributes = new HashMap<String, String>();
        flowFileAttributes.put(CoreAttributes.MIME_TYPE.key(), "application/plain-text");

        flowFileAttributes.put(CoreAttributes.FILENAME.key(), shortName);
        testRunner.enqueue("Hello".getBytes("UTF-8"), flowFileAttributes);

        flowFileAttributes.put(CoreAttributes.FILENAME.key(), longerName);
        testRunner.enqueue(", ".getBytes("UTF-8"), flowFileAttributes);

        flowFileAttributes.put(CoreAttributes.FILENAME.key(), veryLongName);
        testRunner.enqueue("World!".getBytes("UTF-8"), flowFileAttributes);

        testRunner.run(2);

        testRunner.assertQueueEmpty();
        testRunner.assertTransferCount(MergeContent.REL_MERGED, 1);
        testRunner.assertTransferCount(MergeContent.REL_FAILURE, 0);
        testRunner.assertTransferCount(MergeContent.REL_ORIGINAL, 3);

        List<MockFlowFile> mergedFlowFiles = testRunner.getFlowFilesForRelationship(MergeContent.REL_MERGED);
        MockFlowFile tarBundle = mergedFlowFiles.get(0);
        byte[] tarBytes = testRunner.getContentAsByteArray(tarBundle);

        try (ByteArrayInputStream byteStream = new ByteArrayInputStream(tarBytes);
             TarArchiveInputStream tarStream = new TarArchiveInputStream(byteStream)) {
            TarArchiveEntry current = tarStream.getNextEntry();
            Assertions.assertNotNull(current);
            Assertions.assertEquals(shortName, current.getName());
            Assertions.assertArrayEquals("Hello".getBytes("UTF-8"), IOUtils.toByteArray(tarStream));

            current = tarStream.getNextEntry();
            Assertions.assertEquals(longerName, current.getName());
            Assertions.assertArrayEquals(", ".getBytes("UTF-8"), IOUtils.toByteArray(tarStream));

            current = tarStream.getNextEntry();
            Assertions.assertEquals(veryLongName, current.getName());
            Assertions.assertArrayEquals("World!".getBytes("UTF-8"), IOUtils.toByteArray(tarStream));
        }

        tarBundle.assertAttributeEquals(CoreAttributes.MIME_TYPE.key(), "application/tar");
    }

    /**
     * Merges two files from the TestUnpackContent resources with the FlowFile Stream v3 merge
     * format (bins of exactly two entries). One merged FlowFile, no failures and two originals
     * are expected, and the merged FlowFile's MIME type must be the version 3 media type.
     */
    @Test
    public void testFlowFileStream() throws IOException {
        TestRunner testRunner = TestRunners.newTestRunner(new MergeContent());
        testRunner.setProperty(MergeContent.MAX_BIN_AGE, "1 sec");
        testRunner.setProperty(MergeContent.MIN_ENTRIES, "2");
        testRunner.setProperty(MergeContent.MAX_ENTRIES, "2");
        testRunner.setProperty(MergeContent.MERGE_FORMAT, MergeContent.MERGE_FORMAT_FLOWFILE_STREAM_V3);

        Map<String, String> pathAttributes = new HashMap<String, String>();
        pathAttributes.put("path", "folder");

        String[] resourceFiles = {
            "src/test/resources/TestUnpackContent/folder/cal.txt",
            "src/test/resources/TestUnpackContent/folder/date.txt"
        };
        for (String resourceFile : resourceFiles) {
            testRunner.enqueue(Paths.get(resourceFile), pathAttributes);
        }

        testRunner.run();

        testRunner.assertTransferCount(MergeContent.REL_MERGED, 1);
        testRunner.assertTransferCount(MergeContent.REL_FAILURE, 0);
        testRunner.assertTransferCount(MergeContent.REL_ORIGINAL, 2);

        List<MockFlowFile> mergedFlowFiles = testRunner.getFlowFilesForRelationship(MergeContent.REL_MERGED);
        String expectedMediaType = StandardFlowFileMediaType.VERSION_3.getMediaType();
        mergedFlowFiles.get(0).assertAttributeEquals(CoreAttributes.MIME_TYPE.key(), expectedMediaType);
    }

    /**
     * Enqueues four fragments (indices 1 to 4, fragment count 4) sharing fragment id "1" under
     * the defragment strategy. A single merged FlowFile containing
     * "A Man A Plan A Canal Panama" is expected.
     */
    @Test
    public void testDefragment() throws IOException {
        TestRunner testRunner = TestRunners.newTestRunner(new MergeContent());
        testRunner.setProperty(MergeContent.MERGE_STRATEGY, MergeContent.MERGE_STRATEGY_DEFRAGMENT);
        testRunner.setProperty(MergeContent.MAX_BIN_AGE, "1 min");

        Map<String, String> fragmentAttributes = new HashMap<String, String>();
        fragmentAttributes.put(MergeContent.FRAGMENT_ID_ATTRIBUTE, "1");
        fragmentAttributes.put(MergeContent.FRAGMENT_COUNT_ATTRIBUTE, "4");

        // Fragments are enqueued in order; the fragment index is 1-based.
        String[] pieces = {"A Man ", "A Plan ", "A Canal ", "Panama"};
        for (int position = 0; position < pieces.length; position++) {
            fragmentAttributes.put(MergeContent.FRAGMENT_INDEX_ATTRIBUTE, String.valueOf(position + 1));
            testRunner.enqueue(pieces[position].getBytes("UTF-8"), fragmentAttributes);
        }

        testRunner.run();

        testRunner.assertTransferCount(MergeContent.REL_MERGED, 1);
        List<MockFlowFile> mergedFlowFiles = testRunner.getFlowFilesForRelationship(MergeContent.REL_MERGED);
        mergedFlowFiles.get(0).assertContentEquals("A Man A Plan A Canal Panama".getBytes("UTF-8"));
    }

    /**
     * Same four-fragment scenario as the plain defragment test, except that the fragment count
     * attribute is only present on the last fragment. A single merged FlowFile containing
     * "A Man A Plan A Canal Panama" is expected.
     */
    @Test
    public void testDefragmentWithFragmentCountOnLastFragmentOnly() throws IOException {
        TestRunner testRunner = TestRunners.newTestRunner(new MergeContent());
        testRunner.setProperty(MergeContent.MERGE_STRATEGY, MergeContent.MERGE_STRATEGY_DEFRAGMENT);
        testRunner.setProperty(MergeContent.MAX_BIN_AGE, "1 min");

        Map<String, String> fragmentAttributes = new HashMap<String, String>();
        fragmentAttributes.put(MergeContent.FRAGMENT_ID_ATTRIBUTE, "1");

        String[] pieces = {"A Man ", "A Plan ", "A Canal ", "Panama"};
        int lastPosition = pieces.length - 1;
        for (int position = 0; position <= lastPosition; position++) {
            fragmentAttributes.put(MergeContent.FRAGMENT_INDEX_ATTRIBUTE, String.valueOf(position + 1));
            if (position == lastPosition) {
                // Only the final fragment announces the total number of fragments.
                fragmentAttributes.put(MergeContent.FRAGMENT_COUNT_ATTRIBUTE, "4");
            }
            testRunner.enqueue(pieces[position].getBytes("UTF-8"), fragmentAttributes);
        }

        testRunner.run();

        testRunner.assertTransferCount(MergeContent.REL_MERGED, 1);
        List<MockFlowFile> mergedFlowFiles = testRunner.getFlowFilesForRelationship(MergeContent.REL_MERGED);
        mergedFlowFiles.get(0).assertContentEquals("A Man A Plan A Canal Panama".getBytes("UTF-8"));
    }

    /**
     * Enqueues three fragments sharing one fragment id where only the second fragment carries
     * the fragment count attribute ("3"). A single merged FlowFile whose content is the three
     * payloads in index order is expected.
     */
    @Test
    public void testDefragmentWithFragmentCountOnMiddleFragment() throws IOException {
        TestRunner runner = TestRunners.newTestRunner(new MergeContent());
        runner.setProperty(MergeContent.MERGE_STRATEGY, MergeContent.MERGE_STRATEGY_DEFRAGMENT);
        runner.setProperty(MergeContent.MAX_BIN_AGE, "1 min");
        String fragmentId = "Fragment Id";

        // Plain typed maps replace the raw-cast, double-brace-initialised anonymous HashMap
        // subclasses; the attribute contents are unchanged.
        Map<String, String> firstAttributes = new HashMap<String, String>();
        firstAttributes.put(MergeContent.FRAGMENT_ID_ATTRIBUTE, fragmentId);
        firstAttributes.put(MergeContent.FRAGMENT_INDEX_ATTRIBUTE, "1");
        runner.enqueue("Fragment 1 without count ".getBytes("UTF-8"), firstAttributes);

        // Only the middle fragment declares the total fragment count.
        Map<String, String> secondAttributes = new HashMap<String, String>();
        secondAttributes.put(MergeContent.FRAGMENT_ID_ATTRIBUTE, fragmentId);
        secondAttributes.put(MergeContent.FRAGMENT_INDEX_ATTRIBUTE, "2");
        secondAttributes.put(MergeContent.FRAGMENT_COUNT_ATTRIBUTE, "3");
        runner.enqueue("Fragment 2 with count ".getBytes("UTF-8"), secondAttributes);

        Map<String, String> thirdAttributes = new HashMap<String, String>();
        thirdAttributes.put(MergeContent.FRAGMENT_ID_ATTRIBUTE, fragmentId);
        thirdAttributes.put(MergeContent.FRAGMENT_INDEX_ATTRIBUTE, "3");
        runner.enqueue("Fragment 3 without count".getBytes("UTF-8"), thirdAttributes);

        runner.run();

        runner.assertTransferCount(MergeContent.REL_MERGED, 1);
        MockFlowFile assembled = runner.getFlowFilesForRelationship(MergeContent.REL_MERGED).get(0);
        assembled.assertContentEquals("Fragment 1 without count Fragment 2 with count Fragment 3 without count".getBytes("UTF-8"));
    }

    /**
     * Enqueues three fragments sharing one fragment id where the first two declare conflicting
     * fragment counts ("2" and "3") and the third declares none. No merged or original FlowFiles
     * are expected; all three fragments are expected on the failure relationship.
     */
    @Test
    public void testDefragmentWithDifferentFragmentCounts() throws IOException {
        TestRunner runner = TestRunners.newTestRunner(new MergeContent());
        runner.setProperty(MergeContent.MERGE_STRATEGY, MergeContent.MERGE_STRATEGY_DEFRAGMENT);
        runner.setProperty(MergeContent.MAX_BIN_AGE, "1 min");
        String fragmentId = "Fragment Id";

        // Plain typed maps replace the raw-cast, double-brace-initialised anonymous HashMap
        // subclasses; the attribute contents are unchanged.
        Map<String, String> firstAttributes = new HashMap<String, String>();
        firstAttributes.put(MergeContent.FRAGMENT_ID_ATTRIBUTE, fragmentId);
        firstAttributes.put(MergeContent.FRAGMENT_INDEX_ATTRIBUTE, "1");
        firstAttributes.put(MergeContent.FRAGMENT_COUNT_ATTRIBUTE, "2");
        runner.enqueue("Fragment 1 with count ".getBytes("UTF-8"), firstAttributes);

        // Declares a different count than the first fragment.
        Map<String, String> secondAttributes = new HashMap<String, String>();
        secondAttributes.put(MergeContent.FRAGMENT_ID_ATTRIBUTE, fragmentId);
        secondAttributes.put(MergeContent.FRAGMENT_INDEX_ATTRIBUTE, "2");
        secondAttributes.put(MergeContent.FRAGMENT_COUNT_ATTRIBUTE, "3");
        runner.enqueue("Fragment 2 with count ".getBytes("UTF-8"), secondAttributes);

        Map<String, String> thirdAttributes = new HashMap<String, String>();
        thirdAttributes.put(MergeContent.FRAGMENT_ID_ATTRIBUTE, fragmentId);
        thirdAttributes.put(MergeContent.FRAGMENT_INDEX_ATTRIBUTE, "3");
        runner.enqueue("Fragment 3 without count".getBytes("UTF-8"), thirdAttributes);

        runner.run();

        runner.assertTransferCount(MergeContent.REL_MERGED, 0);
        runner.assertTransferCount(MergeContent.REL_FAILURE, 3);
        runner.assertTransferCount(MergeContent.REL_ORIGINAL, 0);
    }

    /**
     * Enqueues a four-fragment set in which fragment index 2 appears twice. After the first run
     * one merged FlowFile ("A Man A Plan A Canal Panama") and no failures are expected. After
     * waiting longer than the one-second maximum bin age and running again, one failure and no
     * merged FlowFile are expected.
     */
    @Test
    public void testDefragmentDuplicateFragment() throws IOException, InterruptedException {
        TestRunner testRunner = TestRunners.newTestRunner(new MergeContent());
        testRunner.setProperty(MergeContent.MERGE_STRATEGY, MergeContent.MERGE_STRATEGY_DEFRAGMENT);
        testRunner.setProperty(MergeContent.MAX_BIN_AGE, "1 sec");

        Map<String, String> fragmentAttributes = new HashMap<String, String>();
        fragmentAttributes.put(MergeContent.FRAGMENT_ID_ATTRIBUTE, "1");
        fragmentAttributes.put(MergeContent.FRAGMENT_COUNT_ATTRIBUTE, "4");

        // Index "2" is deliberately enqueued twice.
        String[] indices = {"1", "2", "2", "3", "4"};
        String[] pieces = {"A Man ", "A Plan ", "A Plan ", "A Canal ", "Panama"};
        for (int position = 0; position < pieces.length; position++) {
            fragmentAttributes.put(MergeContent.FRAGMENT_INDEX_ATTRIBUTE, indices[position]);
            testRunner.enqueue(pieces[position].getBytes("UTF-8"), fragmentAttributes);
        }

        testRunner.run(1, false);

        testRunner.assertTransferCount(MergeContent.REL_FAILURE, 0);
        testRunner.assertTransferCount(MergeContent.REL_MERGED, 1);
        List<MockFlowFile> mergedFlowFiles = testRunner.getFlowFilesForRelationship(MergeContent.REL_MERGED);
        mergedFlowFiles.get(0).assertContentEquals("A Man A Plan A Canal Panama".getBytes("UTF-8"));

        testRunner.clearTransferState();

        // Wait past the configured maximum bin age before the second run.
        Thread.sleep(1100L);
        testRunner.run();

        testRunner.assertTransferCount(MergeContent.REL_FAILURE, 1);
        testRunner.assertTransferCount(MergeContent.REL_MERGED, 0);
    }

    /**
     * Enqueues a four-fragment set while the maximum number of entries is configured as "3".
     * No failures and a single merged FlowFile containing "A Man A Plan A Canal Panama" are
     * expected.
     */
    @Test
    public void testDefragmentWithTooManyFragments() throws IOException {
        TestRunner testRunner = TestRunners.newTestRunner(new MergeContent());
        testRunner.setProperty(MergeContent.MERGE_STRATEGY, MergeContent.MERGE_STRATEGY_DEFRAGMENT);
        testRunner.setProperty(MergeContent.MAX_ENTRIES, "3");

        Map<String, String> fragmentAttributes = new HashMap<String, String>();
        fragmentAttributes.put(MergeContent.FRAGMENT_ID_ATTRIBUTE, "1");
        fragmentAttributes.put(MergeContent.FRAGMENT_COUNT_ATTRIBUTE, "4");

        // Four fragments, one more than the configured maximum number of entries.
        String[] pieces = {"A Man ", "A Plan ", "A Canal ", "Panama"};
        for (int position = 0; position < pieces.length; position++) {
            fragmentAttributes.put(MergeContent.FRAGMENT_INDEX_ATTRIBUTE, String.valueOf(position + 1));
            testRunner.enqueue(pieces[position].getBytes("UTF-8"), fragmentAttributes);
        }

        testRunner.run();

        testRunner.assertTransferCount(MergeContent.REL_FAILURE, 0);
        testRunner.assertTransferCount(MergeContent.REL_MERGED, 1);
        List<MockFlowFile> mergedFlowFiles = testRunner.getFlowFilesForRelationship(MergeContent.REL_MERGED);
        mergedFlowFiles.get(0).assertContentEquals("A Man A Plan A Canal Panama".getBytes("UTF-8"));
    }

    /**
     * Enqueues only four fragments of a set that declares a fragment count of 5, with a maximum
     * bin age of "2 secs". After an initial run, a three-second wait and a second run, all four
     * fragments are expected on the failure relationship.
     */
    @Test
    public void testDefragmentWithTooFewFragments() throws IOException {
        TestRunner runner = TestRunners.newTestRunner(new MergeContent());
        runner.setProperty(MergeContent.MERGE_STRATEGY, MergeContent.MERGE_STRATEGY_DEFRAGMENT);
        runner.setProperty(MergeContent.MAX_BIN_AGE, "2 secs");
        HashMap<String, String> attributes = new HashMap<String, String>();
        attributes.put(MergeContent.FRAGMENT_ID_ATTRIBUTE, "1");
        attributes.put(MergeContent.FRAGMENT_COUNT_ATTRIBUTE, "5");
        attributes.put(MergeContent.FRAGMENT_INDEX_ATTRIBUTE, "1");
        runner.enqueue("A Man ".getBytes("UTF-8"), attributes);
        attributes.put(MergeContent.FRAGMENT_INDEX_ATTRIBUTE, "2");
        runner.enqueue("A Plan ".getBytes("UTF-8"), attributes);
        attributes.put(MergeContent.FRAGMENT_INDEX_ATTRIBUTE, "3");
        runner.enqueue("A Canal ".getBytes("UTF-8"), attributes);
        attributes.put(MergeContent.FRAGMENT_INDEX_ATTRIBUTE, "4");
        runner.enqueue("Panama".getBytes("UTF-8"), attributes);
        runner.run(1, false);

        // Wait longer than the maximum bin age. An interrupt is no longer swallowed and retried
        // with a fresh full-length sleep: the interrupt status is restored and the test fails.
        try {
            Thread.sleep(3000L);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            Assertions.fail("Interrupted while waiting for the bin to exceed its maximum age", e);
        }

        runner.run(1);
        runner.assertTransferCount(MergeContent.REL_FAILURE, 4);
    }

    /**
     * Enqueues a four-fragment set in the order 1, 3, 4, 2. A single merged FlowFile containing
     * "A Man A Plan A Canal Panama" is expected, and every FlowFile on the original relationship
     * must carry a "merge.uuid" attribute equal to the merged FlowFile's UUID.
     */
    @Test
    public void testDefragmentOutOfOrder() throws IOException {
        TestRunner testRunner = TestRunners.newTestRunner(new MergeContent());
        testRunner.setProperty(MergeContent.MERGE_STRATEGY, MergeContent.MERGE_STRATEGY_DEFRAGMENT);
        testRunner.setProperty(MergeContent.MAX_BIN_AGE, "1 min");

        Map<String, String> fragmentAttributes = new HashMap<String, String>();
        fragmentAttributes.put(MergeContent.FRAGMENT_ID_ATTRIBUTE, "1");
        fragmentAttributes.put(MergeContent.FRAGMENT_COUNT_ATTRIBUTE, "4");

        // Arrival order differs from the fragment index order.
        String[] indices = {"1", "3", "4", "2"};
        String[] pieces = {"A Man ", "A Canal ", "Panama", "A Plan "};
        for (int position = 0; position < pieces.length; position++) {
            fragmentAttributes.put(MergeContent.FRAGMENT_INDEX_ATTRIBUTE, indices[position]);
            testRunner.enqueue(pieces[position].getBytes("UTF-8"), fragmentAttributes);
        }

        testRunner.run();

        testRunner.assertTransferCount(MergeContent.REL_MERGED, 1);
        List<MockFlowFile> mergedFlowFiles = testRunner.getFlowFilesForRelationship(MergeContent.REL_MERGED);
        MockFlowFile merged = mergedFlowFiles.get(0);
        merged.assertContentEquals("A Man A Plan A Canal Panama".getBytes("UTF-8"));

        for (MockFlowFile original : testRunner.getFlowFilesForRelationship(MergeContent.REL_ORIGINAL)) {
            Assertions.assertEquals(merged.getAttribute(CoreAttributes.UUID.key()), original.getAttribute("merge.uuid"));
        }
    }

    /**
     * Interleaves the fragments of two sets (id "1" with four fragments, id "TWO" with three)
     * in the input queue. Two merged FlowFiles are expected: the first containing
     * "A Man A Plan A Canal Panama", the second containing "No x in Nixon".
     */
    @Test
    public void testDefragmentMultipleMingledSegments() throws IOException {
        TestRunner testRunner = TestRunners.newTestRunner(new MergeContent());
        testRunner.setProperty(MergeContent.MERGE_STRATEGY, MergeContent.MERGE_STRATEGY_DEFRAGMENT);
        testRunner.setProperty(MergeContent.MAX_BIN_AGE, "1 min");

        Map<String, String> panamaAttrs = new HashMap<String, String>();
        panamaAttrs.put(MergeContent.FRAGMENT_ID_ATTRIBUTE, "1");
        panamaAttrs.put(MergeContent.FRAGMENT_COUNT_ATTRIBUTE, "4");

        Map<String, String> nixonAttrs = new HashMap<String, String>();
        nixonAttrs.put(MergeContent.FRAGMENT_ID_ATTRIBUTE, "TWO");
        nixonAttrs.put(MergeContent.FRAGMENT_COUNT_ATTRIBUTE, "3");

        // First two fragments of set "1".
        panamaAttrs.put(MergeContent.FRAGMENT_INDEX_ATTRIBUTE, "1");
        testRunner.enqueue("A Man ".getBytes("UTF-8"), panamaAttrs);
        panamaAttrs.put(MergeContent.FRAGMENT_INDEX_ATTRIBUTE, "2");
        testRunner.enqueue("A Plan ".getBytes("UTF-8"), panamaAttrs);

        // All three fragments of set "TWO" arrive in between.
        nixonAttrs.put(MergeContent.FRAGMENT_INDEX_ATTRIBUTE, "1");
        testRunner.enqueue("No x ".getBytes("UTF-8"), nixonAttrs);
        nixonAttrs.put(MergeContent.FRAGMENT_INDEX_ATTRIBUTE, "2");
        testRunner.enqueue("in ".getBytes("UTF-8"), nixonAttrs);
        nixonAttrs.put(MergeContent.FRAGMENT_INDEX_ATTRIBUTE, "3");
        testRunner.enqueue("Nixon".getBytes("UTF-8"), nixonAttrs);

        // Remaining fragments of set "1".
        panamaAttrs.put(MergeContent.FRAGMENT_INDEX_ATTRIBUTE, "3");
        testRunner.enqueue("A Canal ".getBytes("UTF-8"), panamaAttrs);
        panamaAttrs.put(MergeContent.FRAGMENT_INDEX_ATTRIBUTE, "4");
        testRunner.enqueue("Panama".getBytes("UTF-8"), panamaAttrs);

        testRunner.run(1);

        testRunner.assertTransferCount(MergeContent.REL_MERGED, 2);
        List<MockFlowFile> mergedFlowFiles = testRunner.getFlowFilesForRelationship(MergeContent.REL_MERGED);
        mergedFlowFiles.get(0).assertContentEquals("A Man A Plan A Canal Panama".getBytes("UTF-8"));
        mergedFlowFiles.get(1).assertContentEquals("No x in Nixon".getBytes("UTF-8"));
    }

    /**
     * Runs the four-fragment defragment scenario using the "segment.*" attribute names instead
     * of the fragment attribute constants. A single merged FlowFile containing
     * "A Man A Plan A Canal Panama" is expected, with its filename equal to the value of
     * "segment.original.filename".
     */
    @Test
    public void testDefragmentOldStyleAttributes() throws IOException {
        TestRunner testRunner = TestRunners.newTestRunner(new MergeContent());
        testRunner.setProperty(MergeContent.MERGE_STRATEGY, MergeContent.MERGE_STRATEGY_DEFRAGMENT);
        testRunner.setProperty(MergeContent.MAX_BIN_AGE, "1 min");

        Map<String, String> segmentAttributes = new HashMap<String, String>();
        segmentAttributes.put("segment.identifier", "1");
        segmentAttributes.put("segment.count", "4");
        segmentAttributes.put("segment.original.filename", "originalfilename");

        // Segments are enqueued in order; the segment index is 1-based.
        String[] pieces = {"A Man ", "A Plan ", "A Canal ", "Panama"};
        for (int position = 0; position < pieces.length; position++) {
            segmentAttributes.put("segment.index", String.valueOf(position + 1));
            testRunner.enqueue(pieces[position].getBytes("UTF-8"), segmentAttributes);
        }

        testRunner.run();

        testRunner.assertTransferCount(MergeContent.REL_MERGED, 1);
        List<MockFlowFile> mergedFlowFiles = testRunner.getFlowFilesForRelationship(MergeContent.REL_MERGED);
        MockFlowFile merged = mergedFlowFiles.get(0);
        merged.assertContentEquals("A Man A Plan A Canal Panama".getBytes("UTF-8"));
        merged.assertAttributeEquals(CoreAttributes.FILENAME.key(), "originalfilename");
    }

    /**
     * Feeds a four-fragment set to the processor one fragment at a time, running the processor
     * after each enqueue. After the last run a single merged FlowFile containing
     * "A Man A Plan A Canal Panama" is expected.
     */
    @Test
    public void testDefragmentMultipleOnTriggers() throws IOException {
        TestRunner testRunner = TestRunners.newTestRunner(new MergeContent());
        testRunner.setProperty(MergeContent.MERGE_STRATEGY, MergeContent.MERGE_STRATEGY_DEFRAGMENT);

        Map<String, String> fragmentAttributes = new HashMap<String, String>();
        fragmentAttributes.put(MergeContent.FRAGMENT_ID_ATTRIBUTE, "1");
        fragmentAttributes.put(MergeContent.FRAGMENT_COUNT_ATTRIBUTE, "4");

        // One enqueue followed by one run per fragment.
        String[] pieces = {"A Man ", "A Plan ", "A Canal ", "Panama"};
        for (int position = 0; position < pieces.length; position++) {
            fragmentAttributes.put(MergeContent.FRAGMENT_INDEX_ATTRIBUTE, String.valueOf(position + 1));
            testRunner.enqueue(pieces[position].getBytes("UTF-8"), fragmentAttributes);
            testRunner.run();
        }

        testRunner.assertTransferCount(MergeContent.REL_MERGED, 1);
        List<MockFlowFile> mergedFlowFiles = testRunner.getFlowFilesForRelationship(MergeContent.REL_MERGED);
        mergedFlowFiles.get(0).assertContentEquals("A Man A Plan A Canal Panama".getBytes("UTF-8"));
    }

    /**
     * Bin-packs four FlowFiles correlated on the "attr" attribute: three with value "b" and one
     * with value "c". Two merged FlowFiles are expected, in either order: one with attr "b"
     * containing "A Man A Plan Panama" and one with attr "c" containing "A Canal ".
     */
    @Test
    public void testMergeBasedOnCorrelation() throws IOException, InterruptedException {
        TestRunner testRunner = TestRunners.newTestRunner(new MergeContent());
        testRunner.setProperty(MergeContent.MERGE_STRATEGY, MergeContent.MERGE_STRATEGY_BIN_PACK);
        testRunner.setProperty(MergeContent.MAX_BIN_AGE, "1 min");
        testRunner.setProperty(MergeContent.CORRELATION_ATTRIBUTE_NAME, "attr");
        testRunner.setProperty(MergeContent.MAX_ENTRIES, "3");
        testRunner.setProperty(MergeContent.MIN_ENTRIES, "1");

        Map<String, String> correlationAttributes = new HashMap<String, String>();
        correlationAttributes.put("attr", "b");
        testRunner.enqueue("A Man ".getBytes("UTF-8"), correlationAttributes);
        testRunner.enqueue("A Plan ".getBytes("UTF-8"), correlationAttributes);

        correlationAttributes.put("attr", "c");
        testRunner.enqueue("A Canal ".getBytes("UTF-8"), correlationAttributes);

        correlationAttributes.put("attr", "b");
        testRunner.enqueue("Panama".getBytes("UTF-8"), correlationAttributes);

        testRunner.run(2);

        testRunner.assertTransferCount(MergeContent.REL_MERGED, 2);
        List<MockFlowFile> mergedFlowFiles = testRunner.getFlowFilesForRelationship(MergeContent.REL_MERGED);
        MockFlowFile firstMerged = mergedFlowFiles.get(0);
        MockFlowFile secondMerged = mergedFlowFiles.get(1);
        String firstAttr = firstMerged.getAttribute("attr");
        String secondAttr = secondMerged.getAttribute("attr");

        // The two bundles may come out in either order; branch on which one is first.
        if (!"c".equals(firstAttr)) {
            Assertions.assertEquals("b", firstAttr);
            Assertions.assertEquals("c", secondAttr);
            firstMerged.assertContentEquals("A Man A Plan Panama", "UTF-8");
            secondMerged.assertContentEquals("A Canal ", "UTF-8");
        } else {
            Assertions.assertEquals("b", secondAttr);
            firstMerged.assertContentEquals("A Canal ", "UTF-8");
            secondMerged.assertContentEquals("A Man A Plan Panama", "UTF-8");
        }
    }

    /**
     * Enqueues 50 empty FlowFiles while bins require exactly 500 entries and the maximum bin age
     * is "2 sec". No merged FlowFile is expected after the first five iterations; after a
     * three-second wait and another run, exactly one merged FlowFile is expected.
     */
    @Test
    public void testMaxBinAge() throws InterruptedException {
        TestRunner testRunner = TestRunners.newTestRunner(new MergeContent());
        testRunner.setProperty(MergeContent.MERGE_STRATEGY, MergeContent.MERGE_STRATEGY_BIN_PACK);
        testRunner.setProperty(MergeContent.MAX_BIN_AGE, "2 sec");
        testRunner.setProperty(MergeContent.CORRELATION_ATTRIBUTE_NAME, "attr");
        testRunner.setProperty(MergeContent.MAX_ENTRIES, "500");
        testRunner.setProperty(MergeContent.MIN_ENTRIES, "500");

        // Far fewer FlowFiles than the configured minimum number of entries.
        int remaining = 50;
        while (remaining > 0) {
            testRunner.enqueue(new byte[0]);
            remaining--;
        }

        testRunner.run(5, false);
        testRunner.assertTransferCount(MergeContent.REL_MERGED, 0);

        testRunner.clearTransferState();

        // Wait longer than the configured maximum bin age before running again.
        Thread.sleep(3000L);
        testRunner.run();
        testRunner.assertTransferCount(MergeContent.REL_MERGED, 1);
    }

    /**
     * Merges two one-byte FlowFiles with the "all unique" attribute strategy. The merged
     * FlowFile is expected to keep "abc" (same value on both inputs), "hello" and "world"
     * (each present on only one input), and to have no "xyz" attribute (conflicting values).
     */
    @Test
    public void testUniqueAttributes() {
        TestRunner testRunner = TestRunners.newTestRunner(new MergeContent());
        testRunner.setProperty(AttributeStrategyUtil.ATTRIBUTE_STRATEGY, AttributeStrategyUtil.ATTRIBUTE_STRATEGY_ALL_UNIQUE);
        testRunner.setProperty(MergeContent.MAX_SIZE, "2 B");
        testRunner.setProperty(MergeContent.MIN_SIZE, "2 B");

        Map<String, String> firstAttributes = new HashMap<String, String>();
        firstAttributes.put("abc", "xyz");
        firstAttributes.put("xyz", "123");
        firstAttributes.put("hello", "good-bye");
        testRunner.enqueue(new byte[1], firstAttributes);

        Map<String, String> secondAttributes = new HashMap<String, String>();
        secondAttributes.put("abc", "xyz");
        secondAttributes.put("xyz", "321");
        secondAttributes.put("world", "aaa");
        testRunner.enqueue(new byte[1], secondAttributes);

        testRunner.run();

        testRunner.assertTransferCount(MergeContent.REL_MERGED, 1);
        List<MockFlowFile> mergedFlowFiles = testRunner.getFlowFilesForRelationship(MergeContent.REL_MERGED);
        MockFlowFile merged = mergedFlowFiles.get(0);

        merged.assertAttributeEquals("abc", "xyz");
        merged.assertAttributeEquals("hello", "good-bye");
        merged.assertAttributeEquals("world", "aaa");
        // "xyz" had different values on the two inputs.
        merged.assertAttributeNotExists("xyz");
    }

    /**
     * Merges two one-byte FlowFiles with the "all common" attribute strategy. The merged
     * FlowFile is expected to keep only "abc" (same value on both inputs) and to lack "hello",
     * "world" and "xyz". The two originals and the merged FlowFile must have three distinct
     * UUIDs.
     */
    @Test
    public void testCommonAttributesOnly() {
        TestRunner testRunner = TestRunners.newTestRunner(new MergeContent());
        testRunner.setProperty(AttributeStrategyUtil.ATTRIBUTE_STRATEGY, AttributeStrategyUtil.ATTRIBUTE_STRATEGY_ALL_COMMON);
        testRunner.setProperty(MergeContent.MAX_SIZE, "2 B");
        testRunner.setProperty(MergeContent.MIN_SIZE, "2 B");

        Map<String, String> firstAttributes = new HashMap<String, String>();
        firstAttributes.put("abc", "xyz");
        firstAttributes.put("xyz", "123");
        firstAttributes.put("hello", "good-bye");
        testRunner.enqueue(new byte[1], firstAttributes);

        Map<String, String> secondAttributes = new HashMap<String, String>();
        secondAttributes.put("abc", "xyz");
        secondAttributes.put("xyz", "321");
        secondAttributes.put("world", "aaa");
        testRunner.enqueue(new byte[1], secondAttributes);

        testRunner.run();

        testRunner.assertTransferCount(MergeContent.REL_MERGED, 1);
        List<MockFlowFile> mergedFlowFiles = testRunner.getFlowFilesForRelationship(MergeContent.REL_MERGED);
        MockFlowFile merged = mergedFlowFiles.get(0);

        merged.assertAttributeEquals("abc", "xyz");
        merged.assertAttributeNotExists("hello");
        merged.assertAttributeNotExists("world");
        merged.assertAttributeNotExists("xyz");

        // Collect the UUIDs of both originals plus the merged FlowFile; all must differ.
        String uuidKey = CoreAttributes.UUID.key();
        Set<String> distinctUuids = new HashSet<String>();
        for (MockFlowFile original : testRunner.getFlowFilesForRelationship(MergeContent.REL_ORIGINAL)) {
            distinctUuids.add(original.getAttribute(uuidKey));
        }
        distinctUuids.add(merged.getAttribute(uuidKey));
        Assertions.assertEquals(3, distinctUuids.size());
    }

    /**
     * Configures a one-second max bin age and the concatenation merge format,
     * enqueues the three flow files produced by {@code createFlowFiles}, and runs
     * the processor twice. Asserts an empty queue, one merged flow file, no
     * failures and three originals; the merged flow file must contain
     * "Hello, World!", have a "merge.count" of "3" and carry a "merge.bin.age"
     * attribute.
     *
     * @throws IOException declared for the charset lookup and content comparison calls
     */
    @Test
    public void testCountAttribute() throws IOException {
        TestRunner testRunner = TestRunners.newTestRunner(new MergeContent());
        testRunner.setProperty(MergeContent.MAX_BIN_AGE, "1 sec");
        testRunner.setProperty(MergeContent.MERGE_FORMAT, MergeContent.MERGE_FORMAT_CONCAT);

        createFlowFiles(testRunner);
        testRunner.run(2);

        testRunner.assertQueueEmpty();
        testRunner.assertTransferCount(MergeContent.REL_MERGED, 1);
        testRunner.assertTransferCount(MergeContent.REL_FAILURE, 0);
        testRunner.assertTransferCount(MergeContent.REL_ORIGINAL, 3);

        List<MockFlowFile> mergedFlowFiles = testRunner.getFlowFilesForRelationship(MergeContent.REL_MERGED);
        MockFlowFile merged = mergedFlowFiles.get(0);
        byte[] expectedContent = "Hello, World!".getBytes("UTF-8");
        merged.assertContentEquals(expectedContent);
        merged.assertAttributeEquals("merge.count", "3");
        merged.assertAttributeExists("merge.bin.age");
    }

    /**
     * Enqueues 17 flow files (content is the index followed by a newline) with
     * min and max entries both set to 5 and a max bin count of 3, then runs the
     * processor five times. Asserts that three merged flow files and fifteen
     * originals are transferred and that two flow files remain in the queue.
     */
    @Test
    public void testLeavesSmallBinUnmerged() {
        TestRunner testRunner = TestRunners.newTestRunner(new MergeContent());
        testRunner.setProperty(MergeContent.MIN_ENTRIES, "5");
        testRunner.setProperty(MergeContent.MAX_ENTRIES, "5");
        testRunner.setProperty(MergeContent.MAX_BIN_COUNT, "3");

        int index = 0;
        while (index < 17) {
            testRunner.enqueue(index + "\n");
            index++;
        }

        testRunner.run(5);

        testRunner.assertTransferCount(MergeContent.REL_MERGED, 3);
        testRunner.assertTransferCount(MergeContent.REL_ORIGINAL, 15);
        int remainingInQueue = testRunner.getQueueSize().getObjectCount();
        Assertions.assertEquals(2, remainingInQueue);
    }

    /**
     * Enqueues three flow files with contents "Hello", ", " and "World!" (in that
     * order) on the given runner. All three share one attribute map holding the
     * MIME type "application/plain-text" and a fragment index of "1".
     *
     * @param testRunner runner whose input queue receives the flow files
     */
    private void createFlowFiles(TestRunner testRunner) {
        String mimeTypeKey = CoreAttributes.MIME_TYPE.key();

        Map<String, String> sharedAttributes = new HashMap<>();
        sharedAttributes.put(mimeTypeKey, "application/plain-text");
        sharedAttributes.put(MergeContent.FRAGMENT_INDEX_ATTRIBUTE, "1");

        testRunner.enqueue("Hello", sharedAttributes);
        testRunner.enqueue(", ", sharedAttributes);
        testRunner.enqueue("World!", sharedAttributes);
    }
}

