/*
 * Decompiled with CFR 0.152.
 */
package org.apache.nifi.processors.standard;

import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import org.apache.nifi.controller.AbstractControllerService;
import org.apache.nifi.controller.ControllerService;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.json.JsonRecordSetWriter;
import org.apache.nifi.json.JsonTreeReader;
import org.apache.nifi.json.JsonTreeRowRecordReader;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.Processor;
import org.apache.nifi.processors.standard.ForkRecord;
import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.schema.access.SchemaAccessUtils;
import org.apache.nifi.schema.access.SchemaNotFoundException;
import org.apache.nifi.serialization.MalformedRecordException;
import org.apache.nifi.serialization.RecordReader;
import org.apache.nifi.serialization.RecordReaderFactory;
import org.apache.nifi.serialization.SimpleRecordSchema;
import org.apache.nifi.serialization.record.DataType;
import org.apache.nifi.serialization.record.MockRecordWriter;
import org.apache.nifi.serialization.record.RecordField;
import org.apache.nifi.serialization.record.RecordFieldType;
import org.apache.nifi.serialization.record.RecordSchema;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.condition.DisabledOnOs;
import org.junit.jupiter.api.condition.OS;

@DisabledOnOs(value={OS.WINDOWS}, disabledReason="Pretty printing is not portable as these fail on windows")
public class TestForkRecord {
    private final String dateFormat = RecordFieldType.DATE.getDefaultFormat();
    private final String timeFormat = RecordFieldType.TIME.getDefaultFormat();
    private final String timestampFormat = RecordFieldType.TIMESTAMP.getDefaultFormat();

    private List<RecordField> getDefaultFields() {
        ArrayList<RecordField> fields = new ArrayList<RecordField>();
        fields.add(new RecordField("id", RecordFieldType.INT.getDataType()));
        fields.add(new RecordField("name", RecordFieldType.STRING.getDataType()));
        fields.add(new RecordField("address", RecordFieldType.STRING.getDataType()));
        fields.add(new RecordField("city", RecordFieldType.STRING.getDataType()));
        fields.add(new RecordField("state", RecordFieldType.STRING.getDataType()));
        fields.add(new RecordField("zipCode", RecordFieldType.STRING.getDataType()));
        fields.add(new RecordField("country", RecordFieldType.STRING.getDataType()));
        return fields;
    }

    private RecordSchema getAccountSchema() {
        ArrayList<RecordField> accountFields = new ArrayList<RecordField>();
        accountFields.add(new RecordField("id", RecordFieldType.INT.getDataType()));
        accountFields.add(new RecordField("balance", RecordFieldType.DOUBLE.getDataType()));
        return new SimpleRecordSchema(accountFields);
    }

    private RecordSchema getAccountWithTransactionSchema() {
        ArrayList<RecordField> accountFields = new ArrayList<RecordField>();
        accountFields.add(new RecordField("id", RecordFieldType.INT.getDataType()));
        accountFields.add(new RecordField("balance", RecordFieldType.DOUBLE.getDataType()));
        DataType transactionRecordType = RecordFieldType.RECORD.getRecordDataType(this.getTransactionSchema());
        DataType transactionsType = RecordFieldType.ARRAY.getArrayDataType(transactionRecordType);
        accountFields.add(new RecordField("transactions", transactionsType));
        return new SimpleRecordSchema(accountFields);
    }

    private RecordSchema getTransactionSchema() {
        ArrayList<RecordField> transactionFields = new ArrayList<RecordField>();
        transactionFields.add(new RecordField("id", RecordFieldType.INT.getDataType()));
        transactionFields.add(new RecordField("amount", RecordFieldType.DOUBLE.getDataType()));
        return new SimpleRecordSchema(transactionFields);
    }

    @Test
    public void testForkExtractSimpleWithoutParentFields() throws IOException, MalformedRecordException, InitializationException {
        TestRunner runner = TestRunners.newTestRunner((Processor)new ForkRecord());
        DataType accountRecordType = RecordFieldType.RECORD.getRecordDataType(this.getAccountSchema());
        DataType accountsType = RecordFieldType.ARRAY.getArrayDataType(accountRecordType);
        List<RecordField> fields = this.getDefaultFields();
        fields.add(new RecordField("accounts", accountsType));
        SimpleRecordSchema schema = new SimpleRecordSchema(fields);
        JsonRecordReader readerService = new JsonRecordReader((RecordSchema)schema);
        CustomRecordWriter writerService = new CustomRecordWriter("header", false, this.getAccountSchema());
        runner.addControllerService("reader", (ControllerService)readerService);
        runner.enableControllerService((ControllerService)readerService);
        runner.addControllerService("writer", (ControllerService)writerService);
        runner.enableControllerService((ControllerService)writerService);
        runner.setProperty(ForkRecord.RECORD_READER, "reader");
        runner.setProperty(ForkRecord.RECORD_WRITER, "writer");
        runner.setProperty(ForkRecord.MODE, ForkRecord.MODE_EXTRACT);
        runner.setProperty("my-path", "/accounts");
        runner.enqueue(new File("src/test/resources/TestForkRecord/single-element-nested-array.json").toPath());
        runner.run(1);
        runner.assertTransferCount(ForkRecord.REL_ORIGINAL, 1);
        runner.assertTransferCount(ForkRecord.REL_FORK, 1);
        MockFlowFile mff = (MockFlowFile)runner.getFlowFilesForRelationship(ForkRecord.REL_FORK).get(0);
        mff.assertAttributeEquals("record.count", "2");
        mff.assertContentEquals("header\n42,4750.89\n43,48212.38\n");
    }

    @Test
    public void testForkExtractSimpleWithParentFields() throws IOException, MalformedRecordException, InitializationException {
        TestRunner runner = TestRunners.newTestRunner((Processor)new ForkRecord());
        DataType accountRecordType = RecordFieldType.RECORD.getRecordDataType(this.getAccountSchema());
        DataType accountsType = RecordFieldType.ARRAY.getArrayDataType(accountRecordType);
        List<RecordField> fields = this.getDefaultFields();
        fields.add(new RecordField("accounts", accountsType));
        SimpleRecordSchema schema = new SimpleRecordSchema(fields);
        List<RecordField> fieldsWrite = this.getDefaultFields();
        fieldsWrite.add(new RecordField("balance", RecordFieldType.DOUBLE.getDataType()));
        SimpleRecordSchema schemaWrite = new SimpleRecordSchema(fieldsWrite);
        JsonRecordReader readerService = new JsonRecordReader((RecordSchema)schema);
        CustomRecordWriter writerService = new CustomRecordWriter("header", false, (RecordSchema)schemaWrite);
        runner.addControllerService("reader", (ControllerService)readerService);
        runner.enableControllerService((ControllerService)readerService);
        runner.addControllerService("writer", (ControllerService)writerService);
        runner.enableControllerService((ControllerService)writerService);
        runner.setProperty(ForkRecord.RECORD_READER, "reader");
        runner.setProperty(ForkRecord.RECORD_WRITER, "writer");
        runner.setProperty(ForkRecord.MODE, ForkRecord.MODE_EXTRACT);
        runner.setProperty(ForkRecord.INCLUDE_PARENT_FIELDS, "true");
        runner.setProperty("my-path", "/accounts");
        runner.enqueue(new File("src/test/resources/TestForkRecord/single-element-nested-array.json").toPath());
        runner.run(1);
        runner.assertTransferCount(ForkRecord.REL_ORIGINAL, 1);
        runner.assertTransferCount(ForkRecord.REL_FORK, 1);
        MockFlowFile mff = (MockFlowFile)runner.getFlowFilesForRelationship(ForkRecord.REL_FORK).get(0);
        mff.assertAttributeEquals("record.count", "2");
        mff.assertContentEquals("header\n42,John Doe,123 My Street,My City,MS,11111,USA,4750.89\n43,John Doe,123 My Street,My City,MS,11111,USA,48212.38\n");
    }

    @Test
    public void testForkExtractNotAnArray() throws IOException, MalformedRecordException, InitializationException {
        TestRunner runner = TestRunners.newTestRunner((Processor)new ForkRecord());
        DataType accountRecordType = RecordFieldType.RECORD.getRecordDataType(this.getAccountSchema());
        DataType accountsType = RecordFieldType.ARRAY.getArrayDataType(accountRecordType);
        List<RecordField> fields = this.getDefaultFields();
        fields.add(new RecordField("accounts", accountsType));
        SimpleRecordSchema schema = new SimpleRecordSchema(fields);
        List<RecordField> fieldsWrite = this.getDefaultFields();
        fieldsWrite.add(new RecordField("balance", RecordFieldType.DOUBLE.getDataType()));
        SimpleRecordSchema schemaWrite = new SimpleRecordSchema(fieldsWrite);
        JsonRecordReader readerService = new JsonRecordReader((RecordSchema)schema);
        CustomRecordWriter writerService = new CustomRecordWriter("header", false, (RecordSchema)schemaWrite);
        runner.addControllerService("reader", (ControllerService)readerService);
        runner.enableControllerService((ControllerService)readerService);
        runner.addControllerService("writer", (ControllerService)writerService);
        runner.enableControllerService((ControllerService)writerService);
        runner.setProperty(ForkRecord.RECORD_READER, "reader");
        runner.setProperty(ForkRecord.RECORD_WRITER, "writer");
        runner.setProperty(ForkRecord.MODE, ForkRecord.MODE_EXTRACT);
        runner.setProperty(ForkRecord.INCLUDE_PARENT_FIELDS, "true");
        runner.setProperty("my-path", "/country");
        runner.enqueue(new File("src/test/resources/TestForkRecord/single-element-nested-array.json").toPath());
        runner.run(1);
        runner.assertTransferCount(ForkRecord.REL_ORIGINAL, 1);
        runner.assertTransferCount(ForkRecord.REL_FORK, 1);
        MockFlowFile mff = (MockFlowFile)runner.getFlowFilesForRelationship(ForkRecord.REL_FORK).get(0);
        mff.assertAttributeEquals("record.count", "0");
    }

    @Test
    public void testForkExtractNotAnArrayOfRecords() throws IOException, MalformedRecordException, InitializationException {
        TestRunner runner = TestRunners.newTestRunner((Processor)new ForkRecord());
        DataType accountRecordType = RecordFieldType.STRING.getDataType();
        DataType accountsType = RecordFieldType.ARRAY.getArrayDataType(accountRecordType);
        List<RecordField> fields = this.getDefaultFields();
        fields.add(new RecordField("accounts", accountsType));
        SimpleRecordSchema schema = new SimpleRecordSchema(fields);
        List<RecordField> fieldsWrite = this.getDefaultFields();
        fieldsWrite.add(new RecordField("balance", RecordFieldType.DOUBLE.getDataType()));
        SimpleRecordSchema schemaWrite = new SimpleRecordSchema(fieldsWrite);
        JsonRecordReader readerService = new JsonRecordReader((RecordSchema)schema);
        CustomRecordWriter writerService = new CustomRecordWriter("header", false, (RecordSchema)schemaWrite);
        runner.addControllerService("reader", (ControllerService)readerService);
        runner.enableControllerService((ControllerService)readerService);
        runner.addControllerService("writer", (ControllerService)writerService);
        runner.enableControllerService((ControllerService)writerService);
        runner.setProperty(ForkRecord.RECORD_READER, "reader");
        runner.setProperty(ForkRecord.RECORD_WRITER, "writer");
        runner.setProperty(ForkRecord.MODE, ForkRecord.MODE_EXTRACT);
        runner.setProperty(ForkRecord.INCLUDE_PARENT_FIELDS, "true");
        runner.setProperty("my-path", "/accounts");
        runner.enqueue(new File("src/test/resources/TestForkRecord/single-element-nested-array-strings.json").toPath());
        runner.run(1);
        runner.assertTransferCount(ForkRecord.REL_ORIGINAL, 1);
        runner.assertTransferCount(ForkRecord.REL_FORK, 1);
        MockFlowFile mff = (MockFlowFile)runner.getFlowFilesForRelationship(ForkRecord.REL_FORK).get(0);
        mff.assertAttributeEquals("record.count", "0");
    }

    @Test
    public void testForkExtractComplexWithParentFields() throws IOException, MalformedRecordException, InitializationException {
        TestRunner runner = TestRunners.newTestRunner((Processor)new ForkRecord());
        DataType accountRecordType = RecordFieldType.RECORD.getRecordDataType(this.getAccountWithTransactionSchema());
        DataType accountsType = RecordFieldType.ARRAY.getArrayDataType(accountRecordType);
        List<RecordField> fields = this.getDefaultFields();
        fields.add(new RecordField("accounts", accountsType));
        SimpleRecordSchema schema = new SimpleRecordSchema(fields);
        List<RecordField> fieldsWrite = this.getDefaultFields();
        fieldsWrite.add(new RecordField("balance", RecordFieldType.DOUBLE.getDataType()));
        fieldsWrite.add(new RecordField("amount", RecordFieldType.DOUBLE.getDataType()));
        SimpleRecordSchema schemaWrite = new SimpleRecordSchema(fieldsWrite);
        JsonRecordReader readerService = new JsonRecordReader((RecordSchema)schema);
        CustomRecordWriter writerService = new CustomRecordWriter("header", false, (RecordSchema)schemaWrite);
        runner.addControllerService("reader", (ControllerService)readerService);
        runner.enableControllerService((ControllerService)readerService);
        runner.addControllerService("writer", (ControllerService)writerService);
        runner.enableControllerService((ControllerService)writerService);
        runner.setProperty(ForkRecord.RECORD_READER, "reader");
        runner.setProperty(ForkRecord.RECORD_WRITER, "writer");
        runner.setProperty(ForkRecord.MODE, ForkRecord.MODE_EXTRACT);
        runner.setProperty(ForkRecord.INCLUDE_PARENT_FIELDS, "true");
        runner.setProperty("my-path", "/accounts[*]/transactions");
        runner.enqueue(new File("src/test/resources/TestForkRecord/single-element-nested-nested-array.json").toPath());
        runner.run(1);
        runner.assertTransferCount(ForkRecord.REL_ORIGINAL, 1);
        runner.assertTransferCount(ForkRecord.REL_FORK, 1);
        MockFlowFile mff = (MockFlowFile)runner.getFlowFilesForRelationship(ForkRecord.REL_FORK).get(0);
        mff.assertAttributeEquals("record.count", "4");
        mff.assertContentEquals("header\n5,John Doe,123 My Street,My City,MS,11111,USA,4750.89,150.31\n6,John Doe,123 My Street,My City,MS,11111,USA,4750.89,-15.31\n7,John Doe,123 My Street,My City,MS,11111,USA,48212.38,36.78\n8,John Doe,123 My Street,My City,MS,11111,USA,48212.38,-21.34\n");
    }

    @Test
    public void testForkExtractComplexWithoutParentFields() throws IOException, MalformedRecordException, InitializationException {
        TestRunner runner = TestRunners.newTestRunner((Processor)new ForkRecord());
        DataType accountRecordType = RecordFieldType.RECORD.getRecordDataType(this.getAccountWithTransactionSchema());
        DataType accountsType = RecordFieldType.ARRAY.getArrayDataType(accountRecordType);
        List<RecordField> fields = this.getDefaultFields();
        fields.add(new RecordField("accounts", accountsType));
        SimpleRecordSchema schema = new SimpleRecordSchema(fields);
        ArrayList<RecordField> fieldsWrite = new ArrayList<RecordField>();
        fieldsWrite.add(new RecordField("id", RecordFieldType.INT.getDataType()));
        fieldsWrite.add(new RecordField("amount", RecordFieldType.DOUBLE.getDataType()));
        SimpleRecordSchema schemaWrite = new SimpleRecordSchema(fieldsWrite);
        JsonRecordReader readerService = new JsonRecordReader((RecordSchema)schema);
        CustomRecordWriter writerService = new CustomRecordWriter("header", false, (RecordSchema)schemaWrite);
        runner.addControllerService("reader", (ControllerService)readerService);
        runner.enableControllerService((ControllerService)readerService);
        runner.addControllerService("writer", (ControllerService)writerService);
        runner.enableControllerService((ControllerService)writerService);
        runner.setProperty(ForkRecord.RECORD_READER, "reader");
        runner.setProperty(ForkRecord.RECORD_WRITER, "writer");
        runner.setProperty(ForkRecord.MODE, ForkRecord.MODE_EXTRACT);
        runner.setProperty(ForkRecord.INCLUDE_PARENT_FIELDS, "false");
        runner.setProperty("my-path", "/accounts[*]/transactions");
        runner.enqueue(new File("src/test/resources/TestForkRecord/single-element-nested-nested-array.json").toPath());
        runner.run(1);
        runner.assertTransferCount(ForkRecord.REL_ORIGINAL, 1);
        runner.assertTransferCount(ForkRecord.REL_FORK, 1);
        MockFlowFile mff = (MockFlowFile)runner.getFlowFilesForRelationship(ForkRecord.REL_FORK).get(0);
        mff.assertAttributeEquals("record.count", "4");
        mff.assertContentEquals("header\n5,150.31\n6,-15.31\n7,36.78\n8,-21.34\n");
    }

    @Test
    public void testForkExtractComplexWithParentFieldsAndNull() throws IOException, MalformedRecordException, InitializationException {
        TestRunner runner = TestRunners.newTestRunner((Processor)new ForkRecord());
        DataType accountRecordType = RecordFieldType.RECORD.getRecordDataType(this.getAccountWithTransactionSchema());
        DataType accountsType = RecordFieldType.ARRAY.getArrayDataType(accountRecordType);
        List<RecordField> fields = this.getDefaultFields();
        fields.add(new RecordField("accounts", accountsType));
        SimpleRecordSchema schema = new SimpleRecordSchema(fields);
        List<RecordField> fieldsWrite = this.getDefaultFields();
        fieldsWrite.add(new RecordField("balance", RecordFieldType.DOUBLE.getDataType()));
        fieldsWrite.add(new RecordField("amount", RecordFieldType.DOUBLE.getDataType()));
        SimpleRecordSchema schemaWrite = new SimpleRecordSchema(fieldsWrite);
        JsonRecordReader readerService = new JsonRecordReader((RecordSchema)schema);
        CustomRecordWriter writerService = new CustomRecordWriter("header", false, (RecordSchema)schemaWrite);
        runner.addControllerService("reader", (ControllerService)readerService);
        runner.enableControllerService((ControllerService)readerService);
        runner.addControllerService("writer", (ControllerService)writerService);
        runner.enableControllerService((ControllerService)writerService);
        runner.setProperty(ForkRecord.RECORD_READER, "reader");
        runner.setProperty(ForkRecord.RECORD_WRITER, "writer");
        runner.setProperty(ForkRecord.MODE, ForkRecord.MODE_EXTRACT);
        runner.setProperty(ForkRecord.INCLUDE_PARENT_FIELDS, "true");
        runner.setProperty("my-path", "/accounts[*]/transactions");
        runner.enqueue(new File("src/test/resources/TestForkRecord/two-elements-nested-nested-array-null.json").toPath());
        runner.run(1);
        runner.assertTransferCount(ForkRecord.REL_ORIGINAL, 1);
        runner.assertTransferCount(ForkRecord.REL_FORK, 1);
        MockFlowFile mff = (MockFlowFile)runner.getFlowFilesForRelationship(ForkRecord.REL_FORK).get(0);
        mff.assertAttributeEquals("record.count", "4");
        mff.assertContentEquals("header\n5,John Doe,123 My Street,My City,MS,11111,USA,4750.89,150.31\n6,John Doe,123 My Street,My City,MS,11111,USA,4750.89,-15.31\n7,John Doe,123 My Street,My City,MS,11111,USA,48212.38,36.78\n8,John Doe,123 My Street,My City,MS,11111,USA,48212.38,-21.34\n");
    }

    @Test
    public void testSplitMode() throws InitializationException, IOException {
        String expectedOutput = null;
        TestRunner runner = TestRunners.newTestRunner((Processor)new ForkRecord());
        JsonTreeReader jsonReader = new JsonTreeReader();
        runner.addControllerService("record-reader", (ControllerService)jsonReader);
        String inputSchemaText = new String(Files.readAllBytes(Paths.get("src/test/resources/TestForkRecord/schema/schema.avsc", new String[0])));
        String outputSchemaText = new String(Files.readAllBytes(Paths.get("src/test/resources/TestForkRecord/schema/schema.avsc", new String[0])));
        runner.setProperty((ControllerService)jsonReader, SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, SchemaAccessUtils.SCHEMA_TEXT_PROPERTY);
        runner.setProperty((ControllerService)jsonReader, SchemaAccessUtils.SCHEMA_TEXT, inputSchemaText);
        runner.enableControllerService((ControllerService)jsonReader);
        runner.setProperty(ForkRecord.RECORD_READER, "record-reader");
        JsonRecordSetWriter jsonWriter = new JsonRecordSetWriter();
        runner.addControllerService("record-writer", (ControllerService)jsonWriter);
        runner.setProperty((ControllerService)jsonWriter, SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, SchemaAccessUtils.SCHEMA_TEXT_PROPERTY);
        runner.setProperty((ControllerService)jsonWriter, SchemaAccessUtils.SCHEMA_TEXT, outputSchemaText);
        runner.setProperty((ControllerService)jsonWriter, "Pretty Print JSON", "true");
        runner.setProperty((ControllerService)jsonWriter, "Schema Write Strategy", "full-schema-attribute");
        runner.enableControllerService((ControllerService)jsonWriter);
        runner.setProperty(ForkRecord.RECORD_WRITER, "record-writer");
        runner.setProperty(ForkRecord.MODE, ForkRecord.MODE_SPLIT);
        runner.setProperty("my-path", "/address");
        runner.enqueue(Paths.get("src/test/resources/TestForkRecord/input/complex-input-json.json", new String[0]));
        runner.run();
        runner.assertTransferCount(ForkRecord.REL_ORIGINAL, 1);
        runner.assertTransferCount(ForkRecord.REL_FORK, 1);
        expectedOutput = new String(Files.readAllBytes(Paths.get("src/test/resources/TestForkRecord/output/split-address.json", new String[0])));
        ((MockFlowFile)runner.getFlowFilesForRelationship(ForkRecord.REL_FORK).get(0)).assertContentEquals(expectedOutput);
        ((MockFlowFile)runner.getFlowFilesForRelationship(ForkRecord.REL_FORK).get(0)).assertAttributeEquals("record.count", "5");
        runner.clearTransferState();
        runner.setProperty("my-path", "/bankAccounts[*]/last5Transactions");
        runner.enqueue(Paths.get("src/test/resources/TestForkRecord/input/complex-input-json.json", new String[0]));
        runner.run();
        runner.assertTransferCount(ForkRecord.REL_ORIGINAL, 1);
        runner.assertTransferCount(ForkRecord.REL_FORK, 1);
        expectedOutput = new String(Files.readAllBytes(Paths.get("src/test/resources/TestForkRecord/output/split-transactions.json", new String[0])));
        ((MockFlowFile)runner.getFlowFilesForRelationship(ForkRecord.REL_FORK).get(0)).assertContentEquals(expectedOutput);
        ((MockFlowFile)runner.getFlowFilesForRelationship(ForkRecord.REL_FORK).get(0)).assertAttributeEquals("record.count", "6");
    }

    @Test
    public void testExtractMode() throws InitializationException, IOException {
        String expectedOutput = null;
        TestRunner runner = TestRunners.newTestRunner((Processor)new ForkRecord());
        JsonTreeReader jsonReader = new JsonTreeReader();
        runner.addControllerService("record-reader", (ControllerService)jsonReader);
        String inputSchemaText = new String(Files.readAllBytes(Paths.get("src/test/resources/TestForkRecord/schema/schema.avsc", new String[0])));
        String outputSchemaText = new String(Files.readAllBytes(Paths.get("src/test/resources/TestForkRecord/schema/extract-schema.avsc", new String[0])));
        runner.setProperty((ControllerService)jsonReader, SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, SchemaAccessUtils.SCHEMA_TEXT_PROPERTY);
        runner.setProperty((ControllerService)jsonReader, SchemaAccessUtils.SCHEMA_TEXT, inputSchemaText);
        runner.enableControllerService((ControllerService)jsonReader);
        runner.setProperty(ForkRecord.RECORD_READER, "record-reader");
        JsonRecordSetWriter jsonWriter = new JsonRecordSetWriter();
        runner.addControllerService("record-writer", (ControllerService)jsonWriter);
        runner.setProperty((ControllerService)jsonWriter, SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, SchemaAccessUtils.SCHEMA_TEXT_PROPERTY);
        runner.setProperty((ControllerService)jsonWriter, SchemaAccessUtils.SCHEMA_TEXT, outputSchemaText);
        runner.setProperty((ControllerService)jsonWriter, "Pretty Print JSON", "true");
        runner.setProperty((ControllerService)jsonWriter, "Schema Write Strategy", "full-schema-attribute");
        runner.enableControllerService((ControllerService)jsonWriter);
        runner.setProperty(ForkRecord.RECORD_WRITER, "record-writer");
        runner.setProperty(ForkRecord.MODE, ForkRecord.MODE_EXTRACT);
        runner.setProperty(ForkRecord.INCLUDE_PARENT_FIELDS, "true");
        runner.setProperty("my-path", "/bankAccounts[*]/last5Transactions");
        runner.enqueue(Paths.get("src/test/resources/TestForkRecord/input/complex-input-json.json", new String[0]));
        runner.run();
        runner.assertTransferCount(ForkRecord.REL_ORIGINAL, 1);
        runner.assertTransferCount(ForkRecord.REL_FORK, 1);
        expectedOutput = new String(Files.readAllBytes(Paths.get("src/test/resources/TestForkRecord/output/extract-transactions.json", new String[0])));
        ((MockFlowFile)runner.getFlowFilesForRelationship(ForkRecord.REL_FORK).get(0)).assertContentEquals(expectedOutput);
        ((MockFlowFile)runner.getFlowFilesForRelationship(ForkRecord.REL_FORK).get(0)).assertAttributeEquals("record.count", "6");
    }

    private class CustomRecordWriter
    extends MockRecordWriter {
        RecordSchema schema;

        public CustomRecordWriter(String header, boolean quoteValues, RecordSchema schema) {
            super(header, quoteValues);
            this.schema = schema;
        }

        public RecordSchema getSchema(Map<String, String> variables, RecordSchema readSchema) throws SchemaNotFoundException, IOException {
            return this.schema;
        }
    }

    private class JsonRecordReader
    extends AbstractControllerService
    implements RecordReaderFactory {
        RecordSchema schema;

        public JsonRecordReader(RecordSchema schema) {
            this.schema = schema;
        }

        public RecordReader createRecordReader(FlowFile flowFile, InputStream in, ComponentLog logger) throws MalformedRecordException, IOException, SchemaNotFoundException {
            return new JsonTreeRowRecordReader(in, logger, this.schema, TestForkRecord.this.dateFormat, TestForkRecord.this.timeFormat, TestForkRecord.this.timestampFormat);
        }

        public RecordReader createRecordReader(Map<String, String> variables, InputStream in, long inputLength, ComponentLog logger) throws MalformedRecordException, IOException, SchemaNotFoundException {
            return new JsonTreeRowRecordReader(in, logger, this.schema, TestForkRecord.this.dateFormat, TestForkRecord.this.timeFormat, TestForkRecord.this.timestampFormat);
        }
    }
}

