/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.connect.file;

import java.io.ByteArrayInputStream;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.OpenOption;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.kafka.connect.file.FileStreamSourceTask;
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.source.SourceTaskContext;
import org.apache.kafka.connect.storage.OffsetStorageReader;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;

/**
 * Unit tests for {@link FileStreamSourceTask}.
 *
 * <p>Each test gets a fresh temporary file, a configuration map pointing at it, and a task
 * initialised with a mocked {@link SourceTaskContext}. The tests append bytes to the file and
 * check what {@code poll()} returns and how {@code bufferSize()} evolves.
 */
public class FileStreamSourceTaskTest {
    private static final String TOPIC = "test";

    // Configuration keys passed to the task.
    private static final String FILE_CONFIG = "file";
    private static final String TOPIC_CONFIG = "topic";
    private static final String BATCH_SIZE_CONFIG = "batch.size";

    // Keys of the single-entry source partition / source offset maps asserted on records.
    private static final String FILENAME_FIELD = "filename";
    private static final String POSITION_FIELD = "position";

    // Value handed to the task constructor; the tests observe it as bufferSize() right after start().
    private static final int INITIAL_BUFFER_SIZE = 2;

    private File tempFile;
    private Map<String, String> config;
    private OffsetStorageReader offsetStorageReader;
    private SourceTaskContext context;
    private FileStreamSourceTask task;

    /**
     * Creates the temp file, the default configuration, the mocks and the task under test.
     *
     * @throws IOException if the temporary file cannot be created
     */
    @BeforeEach
    public void setup() throws IOException {
        tempFile = File.createTempFile("file-stream-source-task-test", null);
        config = new HashMap<>();
        config.put(FILE_CONFIG, tempFile.getAbsolutePath());
        config.put(TOPIC_CONFIG, TOPIC);
        config.put(BATCH_SIZE_CONFIG, String.valueOf(2000));
        task = new FileStreamSourceTask(INITIAL_BUFFER_SIZE);
        offsetStorageReader = Mockito.mock(OffsetStorageReader.class);
        context = Mockito.mock(SourceTaskContext.class);
        task.initialize(context);
    }

    /**
     * Removes the temporary file created by {@link #setup()}.
     *
     * @throws IOException if the file exists but cannot be deleted
     */
    @AfterEach
    public void teardown() throws IOException {
        // setup() may have failed before the file was created; nothing to delete in that case.
        if (tempFile != null) {
            Files.deleteIfExists(tempFile.toPath());
        }
    }

    /**
     * Walks through a typical session: nothing is returned for an empty file or a partial line,
     * a record appears once the line is terminated, and mixed {@code \r}, {@code \r\n} and
     * {@code \n} terminators each produce a record with the expected offset.
     */
    @Test
    public void testNormalLifecycle() throws InterruptedException, IOException {
        expectOffsetLookupReturnNone();
        task.start(config);

        // try-with-resources so the file handle is released even when an assertion fails.
        try (OutputStream os = Files.newOutputStream(tempFile.toPath())) {
            // Empty file: nothing to return.
            Assertions.assertNull(task.poll());

            // An unterminated line must not be emitted.
            os.write(utf8("partial line"));
            os.flush();
            Assertions.assertNull(task.poll());

            os.write(utf8(" finished\n"));
            os.flush();
            List<SourceRecord> records = task.poll();
            Assertions.assertEquals(1, records.size());
            Assertions.assertEquals(TOPIC, records.get(0).topic());
            assertRecord(records.get(0), "partial line finished", 22L);
            Assertions.assertNull(task.poll());

            // Mixed line terminators. The trailing "\r" yields no record in this poll.
            os.write(utf8("line1\rline2\r\nline3\nline4\n\r"));
            os.flush();
            records = task.poll();
            Assertions.assertEquals(4, records.size());
            assertRecord(records.get(0), "line1", 28L);
            assertRecord(records.get(1), "line2", 35L);
            assertRecord(records.get(2), "line3", 41L);
            assertRecord(records.get(3), "line4", 47L);

            // Once more text follows, the pending "\r" is reported as an empty line at offset 48.
            os.write(utf8("subsequent text"));
            os.flush();
            records = task.poll();
            Assertions.assertEquals(1, records.size());
            assertRecord(records.get(0), "", 48L);
        }

        task.stop();
        verifyAll();
    }

    /**
     * Writes 10000 lines with a batch size of 5000 and checks that two polls each return exactly
     * one full batch, and that the buffer has grown from its initial size to 128.
     */
    @Test
    public void testBatchSize() throws IOException, InterruptedException {
        expectOffsetLookupReturnNone();
        config.put(BATCH_SIZE_CONFIG, "5000");
        task.start(config);

        try (OutputStream os = Files.newOutputStream(tempFile.toPath())) {
            writeTimesAndFlush(os, 10000,
                    utf8("Neque porro quisquam est qui dolorem ipsum quia dolor sit amet, consectetur, adipisci velit...\n"));

            // No poll has happened yet, so the buffer still has its initial size.
            Assertions.assertEquals(INITIAL_BUFFER_SIZE, task.bufferSize());

            List<SourceRecord> records = task.poll();
            Assertions.assertEquals(5000, records.size());
            Assertions.assertEquals(128, task.bufferSize());

            records = task.poll();
            Assertions.assertEquals(5000, records.size());
            Assertions.assertEquals(128, task.bufferSize());
        }

        task.stop();
        verifyAll();
    }

    /**
     * Feeds batches of progressively longer lines and checks the buffer size reported after each
     * batch, including that it does not shrink again when shorter lines follow a long one.
     */
    @Test
    public void testBufferResize() throws IOException, InterruptedException {
        int batchSize = 1000;
        expectOffsetLookupReturnNone();
        config.put(BATCH_SIZE_CONFIG, Integer.toString(batchSize));
        task.start(config);

        try (OutputStream os = Files.newOutputStream(tempFile.toPath())) {
            Assertions.assertEquals(INITIAL_BUFFER_SIZE, task.bufferSize());
            writeAndAssertBufferSize(batchSize, os, utf8("1\n"), 2);
            writeAndAssertBufferSize(batchSize, os, utf8("3 \n"), 4);
            writeAndAssertBufferSize(batchSize, os, utf8("7     \n"), 8);
            writeAndAssertBufferSize(batchSize, os, utf8("8      \n"), 8);
            writeAndAssertBufferSize(batchSize, os, utf8("9       \n"), 16);

            // A 1025-byte line (1024 '*' characters plus the newline).
            byte[] longLine = new byte[1025];
            Arrays.fill(longLine, (byte) '*');
            longLine[longLine.length - 1] = '\n';
            writeAndAssertBufferSize(batchSize, os, longLine, 2048);

            // Short lines afterwards leave the buffer at its enlarged size.
            writeAndAssertBufferSize(batchSize, os, utf8("9       \n"), 2048);
        }

        task.stop();
        verifyAll();
    }

    /**
     * Writes {@code bytes} to {@code os} {@code batchSize} times, polls once, and asserts that
     * the poll returned {@code batchSize} records whose value is {@code bytes} without its last
     * byte (the line terminator), and that the task then reports {@code expectBufferSize}.
     *
     * @param batchSize        number of lines to write and number of records expected back
     * @param os               stream appending to the file the task is reading
     * @param bytes            one complete line, including a single trailing terminator byte
     * @param expectBufferSize buffer size the task must report after the poll
     */
    private void writeAndAssertBufferSize(int batchSize, OutputStream os, byte[] bytes, int expectBufferSize) throws IOException, InterruptedException {
        writeTimesAndFlush(os, batchSize, bytes);
        List<SourceRecord> records = task.poll();
        Assertions.assertEquals(batchSize, records.size());
        String expectedLine = new String(bytes, 0, bytes.length - 1, StandardCharsets.UTF_8);
        for (SourceRecord record : records) {
            Assertions.assertEquals(expectedLine, record.value());
        }
        Assertions.assertEquals(expectBufferSize, task.bufferSize());
    }

    /**
     * Writes {@code line} to {@code os} {@code times} times and flushes once at the end.
     */
    private void writeTimesAndFlush(OutputStream os, int times, byte[] line) throws IOException {
        for (int i = 0; i < times; ++i) {
            os.write(line);
        }
        os.flush();
    }

    /**
     * With no {@code file} entry in the configuration, the task is expected to serve the line
     * supplied through {@code System.in}.
     */
    @Test
    public void testUsingSystemInputSourceOnMissingFile() throws InterruptedException {
        // System.in is JVM-global state: remember it so later tests are not affected.
        InputStream originalIn = System.in;
        System.setIn(new ByteArrayInputStream(utf8("line\n")));
        try {
            config.remove(FILE_CONFIG);
            task.start(config);
            List<SourceRecord> records = task.poll();
            Assertions.assertEquals(1, records.size());
            Assertions.assertEquals(TOPIC, records.get(0).topic());
            Assertions.assertEquals("line", records.get(0).value());
            task.stop();
        } finally {
            System.setIn(originalIn);
        }
    }

    /**
     * Pointing the task at a file name that does not exist must not produce data: three
     * consecutive polls all return {@code null}.
     */
    @Test
    public void testInvalidFile() throws InterruptedException {
        config.put(FILE_CONFIG, "bogusfilename");
        task.start(config);
        // NOTE(review): presumably the task keeps retrying to open the file on every poll - confirm.
        for (int i = 0; i < 3; ++i) {
            Assertions.assertNull(task.poll());
        }
    }

    /**
     * Stubs the mocked context to hand out the mocked offset reader, and the reader to return
     * {@code null} for any partition map (i.e. no previously stored offset).
     */
    private void expectOffsetLookupReturnNone() {
        Mockito.when(context.offsetStorageReader()).thenReturn(offsetStorageReader);
        Mockito.when(offsetStorageReader.offset(ArgumentMatchers.anyMap())).thenReturn(null);
    }

    /**
     * Verifies (with Mockito's default verification mode) that the task asked the context for
     * the offset reader and looked up an offset through it.
     */
    private void verifyAll() {
        Mockito.verify(context).offsetStorageReader();
        Mockito.verify(offsetStorageReader).offset(ArgumentMatchers.anyMap());
    }

    /**
     * Asserts the value, source partition and source offset of a record read from the temp file.
     *
     * @param record           record returned by {@code poll()}
     * @param expectedValue    expected line content, without terminator
     * @param expectedPosition expected value stored under the {@code position} offset key
     */
    private void assertRecord(SourceRecord record, String expectedValue, long expectedPosition) {
        Assertions.assertEquals(expectedValue, record.value());
        Assertions.assertEquals(Collections.singletonMap(FILENAME_FIELD, tempFile.getAbsolutePath()), record.sourcePartition());
        Assertions.assertEquals(Collections.singletonMap(POSITION_FIELD, expectedPosition), record.sourceOffset());
    }

    /**
     * Encodes test input with a fixed charset so the byte offsets asserted in these tests do not
     * depend on the platform default encoding.
     */
    private static byte[] utf8(String text) {
        return text.getBytes(StandardCharsets.UTF_8);
    }
}

