package org.apache.kafka.clients.consumer.internals;

import java.io.DataOutputStream;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import org.apache.kafka.clients.ApiVersions;
import org.apache.kafka.clients.ClientRequest;
import org.apache.kafka.clients.ClientUtils;
import org.apache.kafka.clients.Metadata;
import org.apache.kafka.clients.MockClient;
import org.apache.kafka.clients.NetworkClient;
import org.apache.kafka.clients.NodeApiVersions;
import org.apache.kafka.clients.RequestCompletionHandler;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
import org.apache.kafka.clients.consumer.OffsetOutOfRangeException;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.MetricNameTemplate;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.InvalidTopicException;
import org.apache.kafka.common.errors.RecordTooLargeException;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.errors.TopicAuthorizationException;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.internals.RecordHeader;
import org.apache.kafka.common.metrics.KafkaMetric;
import org.apache.kafka.common.metrics.MetricConfig;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.network.NetworkReceive;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.record.BufferSupplier;
import org.apache.kafka.common.record.CompressionType;
import org.apache.kafka.common.record.ControlRecordType;
import org.apache.kafka.common.record.DefaultRecordBatch;
import org.apache.kafka.common.record.EndTransactionMarker;
import org.apache.kafka.common.record.LegacyRecord;
import org.apache.kafka.common.record.MemoryRecords;
import org.apache.kafka.common.record.MemoryRecordsBuilder;
import org.apache.kafka.common.record.Record;
import org.apache.kafka.common.record.RecordBatch;
import org.apache.kafka.common.record.SimpleRecord;
import org.apache.kafka.common.record.TimestampType;
import org.apache.kafka.common.requests.AbstractRequest;
import org.apache.kafka.common.requests.AbstractResponse;
import org.apache.kafka.common.requests.ApiVersionsResponse;
import org.apache.kafka.common.requests.FetchRequest;
import org.apache.kafka.common.requests.FetchResponse;
import org.apache.kafka.common.requests.IsolationLevel;
import org.apache.kafka.common.requests.ListOffsetRequest;
import org.apache.kafka.common.requests.ListOffsetResponse;
import org.apache.kafka.common.requests.MetadataRequest;
import org.apache.kafka.common.requests.MetadataResponse;
import org.apache.kafka.common.requests.ResponseHeader;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.utils.ByteBufferOutputStream;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.test.DelayedReceive;
import org.apache.kafka.test.MockSelector;
import org.apache.kafka.test.TestUtils;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

/* loaded from: input_file:org/apache/kafka/clients/consumer/internals/FetcherTest.class */
public class FetcherTest {
    // Small tolerance value (1.0E-4); presumably for floating-point metric assertions - confirm at usage sites.
    private static final double EPSILON = 1.0E-4d;
    // Record fixtures populated in setup(): three records (base offset 1), two records (base offset 4),
    // and a batch built without appending any record.
    private MemoryRecords records;
    private MemoryRecords nextRecords;
    private MemoryRecords emptyRecords;
    // Listener passed to SubscriptionState.subscribe(...) by tests that use topic subscription.
    private ConsumerRebalanceListener listener = new NoOpConsumerRebalanceListener();
    private String topicName = "test";
    private String groupId = "test-group";
    // Metric group name: "consumer" + groupId + "-fetch-manager-metrics".
    private final String metricGroup = "consumer" + this.groupId + "-fetch-manager-metrics";
    // The two partitions (0 and 1) of the test topic used throughout the tests.
    private TopicPartition tp0 = new TopicPartition(this.topicName, 0);
    private TopicPartition tp1 = new TopicPartition(this.topicName, 1);
    // Fetch-related settings; presumably forwarded to createFetcher(...) (defined outside this excerpt) - confirm there.
    private int minBytes = 1;
    private int maxBytes = Integer.MAX_VALUE;
    private int maxWaitMs = 0;
    private int fetchSize = 1000;
    private long retryBackoffMs = 100;
    private long requestTimeoutMs = 30000;
    // Mock clock shared by the metadata update, mock client, network client and metrics below.
    private MockTime time = new MockTime(1);
    private Metadata metadata = new Metadata(0, Long.MAX_VALUE, true);
    private MockClient client = new MockClient(this.time, this.metadata);
    // Test cluster created for topicName with 2 partitions; 'node' is the first node of that cluster.
    private Cluster cluster = TestUtils.singletonCluster(this.topicName, 2);
    private Node node = (Node) this.cluster.nodes().get(0);
    private Metrics metrics = new Metrics(this.time);
    FetcherMetricsRegistry metricsRegistry = new FetcherMetricsRegistry("consumer" + this.groupId);
    // Two subscription states: one resetting to EARLIEST, one with no automatic reset (NONE).
    private SubscriptionState subscriptions = new SubscriptionState(OffsetResetStrategy.EARLIEST);
    private SubscriptionState subscriptionsNoAutoReset = new SubscriptionState(OffsetResetStrategy.NONE);
    private ConsumerNetworkClient consumerClient = new ConsumerNetworkClient(new LogContext(), this.client, this.metadata, this.time, 100, 1000, Integer.MAX_VALUE);
    // One fetcher per subscription state, each with its own Metrics instance; released in teardown().
    private Fetcher<byte[], byte[]> fetcher = createFetcher(this.subscriptions, this.metrics);
    private Metrics fetcherMetrics = new Metrics(this.time);
    private Fetcher<byte[], byte[]> fetcherNoAutoReset = createFetcher(this.subscriptionsNoAutoReset, this.fetcherMetrics);

    /**
     * Updates the metadata with the test cluster, points the mock client at the first cluster
     * node and builds the record fixtures shared by the tests: {@code records} (three records,
     * base offset 1), {@code nextRecords} (two records, base offset 4) and {@code emptyRecords}
     * (a batch with nothing appended).
     */
    @Before
    public void setup() throws Exception {
        metadata.update(cluster, Collections.emptySet(), time.milliseconds());
        client.setNode(node);

        MemoryRecordsBuilder firstBatch = MemoryRecords.builder(ByteBuffer.allocate(1024), CompressionType.NONE, TimestampType.CREATE_TIME, 1L);
        for (String value : new String[]{"value-1", "value-2", "value-3"}) {
            firstBatch.append(0L, "key".getBytes(), value.getBytes());
        }
        records = firstBatch.build();

        MemoryRecordsBuilder secondBatch = MemoryRecords.builder(ByteBuffer.allocate(1024), CompressionType.NONE, TimestampType.CREATE_TIME, 4L);
        for (String value : new String[]{"value-4", "value-5"}) {
            secondBatch.append(0L, "key".getBytes(), value.getBytes());
        }
        nextRecords = secondBatch.build();

        MemoryRecordsBuilder emptyBatch = MemoryRecords.builder(ByteBuffer.allocate(1024), CompressionType.NONE, TimestampType.CREATE_TIME, 0L);
        emptyRecords = emptyBatch.build();
    }

    /**
     * Releases the resources created by the field initializers: both fetchers and both
     * {@link Metrics} instances. Each resource is closed exactly once; the fetchers are closed
     * before the metrics instances they were created with.
     */
    @After
    public void teardown() {
        this.fetcher.close();
        // Previously never closed, while fetcherMetrics was closed twice instead.
        this.fetcherNoAutoReset.close();
        this.metrics.close();
        this.fetcherMetrics.close();
    }

    /**
     * Sends a fetch for tp0 from offset 0, answers it with the three-record fixture and checks
     * that the records come back with offsets 1, 2, 3 and that the position moves to 4.
     */
    @Test
    public void testFetchNormal() {
        subscriptions.assignFromUser(Collections.singleton(tp0));
        subscriptions.seek(tp0, 0L);

        // One request is sent; nothing is completed until the response has been polled.
        Assert.assertEquals(1L, fetcher.sendFetches());
        Assert.assertFalse(fetcher.hasCompletedFetches());

        client.prepareResponse(fullFetchResponse(tp0, records, Errors.NONE, 100L, 0));
        consumerClient.poll(0L);
        Assert.assertTrue(fetcher.hasCompletedFetches());

        Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> recordsByPartition = fetcher.fetchedRecords();
        Assert.assertTrue(recordsByPartition.containsKey(tp0));

        List<ConsumerRecord<byte[], byte[]>> fetched = recordsByPartition.get(tp0);
        Assert.assertEquals(3L, fetched.size());
        Assert.assertEquals(4L, subscriptions.position(tp0).longValue());

        long expectedOffset = 1L;
        for (ConsumerRecord<byte[], byte[]> record : fetched) {
            Assert.assertEquals(expectedOffset, record.offset());
            expectedOffset++;
        }
    }

    /**
     * Builds a buffer holding one record (written through an idempotent builder) followed by an
     * ABORT end-transaction marker, and checks that only the single record with key "key" is
     * returned while the position advances to 2.
     */
    @Test
    public void testFetcherIgnoresControlRecords() {
        subscriptions.assignFromUser(Collections.singleton(tp0));
        subscriptions.seek(tp0, 0L);

        Assert.assertEquals(1L, fetcher.sendFetches());
        Assert.assertFalse(fetcher.hasCompletedFetches());

        ByteBuffer buffer = ByteBuffer.allocate(1024);

        // Data batch: a single record with a null value.
        MemoryRecordsBuilder dataBatch = MemoryRecords.idempotentBuilder(buffer, CompressionType.NONE, 0L, 1L, (short) 0, 0);
        dataBatch.append(0L, "key".getBytes(), (byte[]) null);
        dataBatch.close();

        // Control batch written directly after the data batch in the same buffer.
        EndTransactionMarker abortMarker = new EndTransactionMarker(ControlRecordType.ABORT, 0);
        MemoryRecords.writeEndTransactionalMarker(buffer, 1L, time.milliseconds(), 0, 1L, (short) 0, abortMarker);
        buffer.flip();

        client.prepareResponse(fullFetchResponse(tp0, MemoryRecords.readableRecords(buffer), Errors.NONE, 100L, 0));
        consumerClient.poll(0L);
        Assert.assertTrue(fetcher.hasCompletedFetches());

        Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> recordsByPartition = fetcher.fetchedRecords();
        Assert.assertTrue(recordsByPartition.containsKey(tp0));

        List<ConsumerRecord<byte[], byte[]>> fetched = recordsByPartition.get(tp0);
        Assert.assertEquals(1L, fetched.size());
        Assert.assertEquals(2L, subscriptions.position(tp0).longValue());

        ConsumerRecord<byte[], byte[]> onlyRecord = fetched.get(0);
        Assert.assertArrayEquals("key".getBytes(), onlyRecord.key());
    }

    /**
     * Answers a fetch for tp0 with a NOT_LEADER_FOR_PARTITION error and checks that the fetch is
     * reported as completed but yields no records for tp0.
     */
    @Test
    public void testFetchError() {
        subscriptions.assignFromUser(Collections.singleton(tp0));
        subscriptions.seek(tp0, 0L);

        int requestsSent = fetcher.sendFetches();
        Assert.assertEquals(1L, requestsSent);
        Assert.assertFalse(fetcher.hasCompletedFetches());

        client.prepareResponse(fullFetchResponse(tp0, records, Errors.NOT_LEADER_FOR_PARTITION, 100L, 0));
        consumerClient.poll(0L);
        Assert.assertTrue(fetcher.hasCompletedFetches());

        Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> recordsByPartition = fetcher.fetchedRecords();
        Assert.assertFalse(recordsByPartition.containsKey(tp0));
    }

    /**
     * Creates a request matcher that accepts a fetch request only when it contains the given
     * partition and the fetch offset requested for that partition equals {@code j}.
     *
     * @param topicPartition partition that must be present in the fetch request
     * @param j              fetch offset expected for that partition
     * @return matcher for use with {@code MockClient.prepareResponse}
     */
    private MockClient.RequestMatcher matchesOffset(final TopicPartition topicPartition, final long j) {
        return new MockClient.RequestMatcher() {
            @Override
            public boolean matches(AbstractRequest abstractRequest) {
                Map<TopicPartition, FetchRequest.PartitionData> requested = ((FetchRequest) abstractRequest).fetchData();
                if (!requested.containsKey(topicPartition)) {
                    return false;
                }
                return requested.get(topicPartition).fetchOffset == j;
            }
        };
    }

    /**
     * Uses a deserializer that fails on every second call and checks that
     * {@code fetchedRecords()} raises a {@link SerializationException} repeatedly without
     * advancing the position of tp0 past offset 1.
     */
    @Test
    public void testFetchedRecordsRaisesOnSerializationErrors() {
        // The same instance serves as key and value deserializer, so odd-numbered calls
        // (the value of the first record) are the ones that throw.
        ByteArrayDeserializer deserializer = new ByteArrayDeserializer() {
            private int calls = 0;

            // Must override deserialize(...): under any other name the fetcher never calls it
            // and no SerializationException is raised.
            @Override
            public byte[] deserialize(String topic, byte[] data) {
                int call = calls++;
                if (call % 2 != 1) {
                    return data;
                }
                Assert.assertEquals("value-1", new String(data, StandardCharsets.UTF_8));
                throw new SerializationException();
            }
        };
        Fetcher<byte[], byte[]> failingFetcher = createFetcher(this.subscriptions, new Metrics(this.time), deserializer, deserializer);

        this.subscriptions.assignFromUser(Collections.singleton(this.tp0));
        this.subscriptions.seek(this.tp0, 1L);
        this.client.prepareResponse(matchesOffset(this.tp0, 1L), (AbstractResponse) fullFetchResponse(this.tp0, this.records, Errors.NONE, 100L, 0));
        Assert.assertEquals(1L, failingFetcher.sendFetches());
        this.consumerClient.poll(0L);

        // The failure must be repeatable: the fetcher stays blocked on the same record.
        for (int i = 0; i < 2; i++) {
            try {
                failingFetcher.fetchedRecords();
                Assert.fail("fetchedRecords should have raised");
            } catch (SerializationException e) {
                Assert.assertEquals(1L, this.subscriptions.position(this.tp0).longValue());
            }
        }
    }

    /**
     * Hand-writes five legacy (magic 1) entries into a buffer: offsets 0, 2 and 4 are written
     * with the computed checksum, offset 1 is written with a checksum that is off by one, and
     * offset 3 consists only of an offset and a size field of 1. The test then checks that the
     * fetcher returns the first record, repeatedly raises on offsets 1 and 3, can consume
     * offset 2 after a seek, and raises when seeking to offset 4.
     */
    @Test
    public void testParseCorruptedRecord() throws Exception {
        ByteBuffer buffer = ByteBuffer.allocate(1024);
        DataOutputStream out = new DataOutputStream(new ByteBufferOutputStream(buffer));

        final byte magic = (byte) 1;
        final long timestamp = 500L;
        byte[] key = "foo".getBytes();
        byte[] value = "baz".getBytes();
        int size = LegacyRecord.recordSize(magic, key.length, value.length);
        byte attributes = LegacyRecord.computeAttributes(magic, CompressionType.NONE, TimestampType.CREATE_TIME);
        long crc = LegacyRecord.computeChecksum(magic, attributes, timestamp, key, value);

        // offset 0: written with the computed checksum
        out.writeLong(0L);
        out.writeInt(size);
        LegacyRecord.write(out, magic, crc, attributes, timestamp, key, value);

        // offset 1: written with a checksum that is off by one
        out.writeLong(1L);
        out.writeInt(size);
        LegacyRecord.write(out, magic, crc + 1, attributes, timestamp, key, value);

        // offset 2: written with the computed checksum
        out.writeLong(2L);
        out.writeInt(size);
        LegacyRecord.write(out, magic, crc, attributes, timestamp, key, value);

        // offset 3: only an offset and a size of 1, no record body
        out.writeLong(3L);
        out.writeInt(1);

        // offset 4: written with the computed checksum
        out.writeLong(4L);
        out.writeInt(size);
        LegacyRecord.write(out, magic, crc, attributes, timestamp, key, value);

        buffer.flip();

        subscriptions.assignFromUser(Collections.singleton(tp0));
        subscriptions.seek(tp0, 0L);
        Assert.assertEquals(1L, fetcher.sendFetches());
        client.prepareResponse(fullFetchResponse(tp0, MemoryRecords.readableRecords(buffer), Errors.NONE, 100L, 0));
        consumerClient.poll(0L);

        // Only the first record is returned; the position stops in front of offset 1.
        List<ConsumerRecord<byte[], byte[]>> firstFetch = fetcher.fetchedRecords().get(tp0);
        Assert.assertEquals(1L, firstFetch.size());
        Assert.assertEquals(1L, subscriptions.position(tp0).longValue());

        ensureBlockOnRecord(1L);
        seekAndConsumeRecord(buffer, 2L);
        ensureBlockOnRecord(3L);
        try {
            seekAndConsumeRecord(buffer, 4L);
            Assert.fail("Should have thrown exception when fail to retrieve a record from iterator.");
        } catch (KafkaException ignored) {
            // expected: the exception itself is the outcome under test
        }
        ensureBlockOnRecord(4L);
    }

    /**
     * Calls {@code fetchedRecords()} twice and requires each call to raise a
     * {@link KafkaException} while the position of tp0 stays at {@code j}.
     *
     * @param j offset at which the position of tp0 is expected to remain
     */
    private void ensureBlockOnRecord(long j) {
        int attempt = 0;
        while (attempt < 2) {
            try {
                fetcher.fetchedRecords();
                Assert.fail("fetchedRecords should have raised KafkaException");
            } catch (KafkaException expected) {
                long position = subscriptions.position(tp0).longValue();
                Assert.assertEquals(j, position);
            }
            attempt++;
        }
    }

    /**
     * Seeks tp0 to {@code j}, calls {@code fetchedRecords()} once, sends a new fetch answered
     * with the given buffer and checks that exactly one record with offset {@code j} is
     * returned and that the position advances to {@code j + 1}.
     *
     * @param byteBuffer buffer whose content is served as the fetch response
     * @param j          offset to seek to and expected offset of the returned record
     */
    private void seekAndConsumeRecord(ByteBuffer byteBuffer, long j) {
        subscriptions.seek(tp0, j);
        // Result intentionally unused; the call is kept exactly where the original made it.
        fetcher.fetchedRecords();

        Assert.assertEquals(1L, fetcher.sendFetches());
        MemoryRecords payload = MemoryRecords.readableRecords(byteBuffer);
        client.prepareResponse(fullFetchResponse(tp0, payload, Errors.NONE, 100L, 0));
        consumerClient.poll(0L);

        List<ConsumerRecord<byte[], byte[]>> fetched = fetcher.fetchedRecords().get(tp0);
        Assert.assertEquals(1L, fetched.size());
        ConsumerRecord<byte[], byte[]> record = fetched.get(0);
        Assert.assertEquals(j, record.offset());
        Assert.assertEquals(j + 1, subscriptions.position(tp0).longValue());
    }

    /**
     * Builds a magic-2 batch with one record, overwrites four bytes at buffer position 17 with
     * "beef", and checks that {@code fetchedRecords()} raises a {@link KafkaException} on two
     * consecutive calls while the position of tp0 stays at 0.
     */
    @Test
    public void testInvalidDefaultRecordBatch() {
        ByteBuffer buffer = ByteBuffer.allocate(1024);
        ByteBufferOutputStream out = new ByteBufferOutputStream(buffer);

        MemoryRecordsBuilder batch = new MemoryRecordsBuilder(out, (byte) 2, CompressionType.NONE, TimestampType.CREATE_TIME, 0L, 10L, 0L, (short) 0, 0, false, false, 0, 1024);
        batch.append(10L, "key".getBytes(), "value".getBytes());
        batch.close();
        buffer.flip();

        // Overwrite part of the batch so that it no longer reads as valid.
        buffer.position(17);
        buffer.put("beef".getBytes());
        buffer.position(0);

        subscriptions.assignFromUser(Collections.singleton(tp0));
        subscriptions.seek(tp0, 0L);
        Assert.assertEquals(1L, fetcher.sendFetches());
        client.prepareResponse(fullFetchResponse(tp0, MemoryRecords.readableRecords(buffer), Errors.NONE, 100L, 0));
        consumerClient.poll(0L);

        // The failure has to be repeatable without moving the position.
        int attempt = 0;
        while (attempt < 2) {
            try {
                fetcher.fetchedRecords();
                Assert.fail("fetchedRecords should have raised KafkaException");
            } catch (KafkaException expected) {
                Assert.assertEquals(0L, subscriptions.position(tp0).longValue());
            }
            attempt++;
        }
    }

    /**
     * Builds a magic-2 batch of three records, overwrites the int at buffer index 32 and checks
     * that {@code fetchedRecords()} raises a {@link KafkaException} with the position of tp0
     * still at 0.
     */
    @Test
    public void testParseInvalidRecordBatch() throws Exception {
        SimpleRecord[] simpleRecords = new SimpleRecord[]{
            new SimpleRecord(1L, "a".getBytes(), "1".getBytes()),
            new SimpleRecord(2L, "b".getBytes(), "2".getBytes()),
            new SimpleRecord(3L, "c".getBytes(), "3".getBytes())
        };
        MemoryRecords batch = MemoryRecords.withRecords((byte) 2, 0L, CompressionType.NONE, TimestampType.CREATE_TIME, simpleRecords);
        ByteBuffer buffer = batch.buffer();

        // Flip bits at index 32 so the batch no longer matches what was written.
        buffer.putInt(32, buffer.get(32) ^ 87238423);

        subscriptions.assignFromUser(Collections.singleton(tp0));
        subscriptions.seek(tp0, 0L);
        Assert.assertEquals(1L, fetcher.sendFetches());
        client.prepareResponse(fullFetchResponse(tp0, MemoryRecords.readableRecords(buffer), Errors.NONE, 100L, 0));
        consumerClient.poll(0L);

        try {
            fetcher.fetchedRecords();
            Assert.fail("fetchedRecords should have raised");
        } catch (KafkaException expected) {
            // the position must not have moved
            Assert.assertEquals(0L, subscriptions.position(tp0).longValue());
        }
    }

    /**
     * Fetches three records: one without headers, one with a single "headerKey" header and one
     * with two "headerKey" headers. Checks that {@code lastHeader("headerKey")} is null for the
     * first record, "headerValue" for the second and "headerValue2" for the third.
     */
    @Test
    public void testHeaders() {
        Fetcher<byte[], byte[]> headerFetcher = createFetcher(subscriptions, new Metrics(time));

        MemoryRecordsBuilder batch = MemoryRecords.builder(ByteBuffer.allocate(1024), CompressionType.NONE, TimestampType.CREATE_TIME, 1L);
        batch.append(0L, "key".getBytes(), "value-1".getBytes());

        Header[] singleHeader = new Header[]{new RecordHeader("headerKey", "headerValue".getBytes(StandardCharsets.UTF_8))};
        batch.append(0L, "key".getBytes(), "value-2".getBytes(), singleHeader);

        Header[] repeatedKeyHeaders = new Header[]{
            new RecordHeader("headerKey", "headerValue".getBytes(StandardCharsets.UTF_8)),
            new RecordHeader("headerKey", "headerValue2".getBytes(StandardCharsets.UTF_8))
        };
        batch.append(0L, "key".getBytes(), "value-3".getBytes(), repeatedKeyHeaders);
        MemoryRecords withHeaders = batch.build();

        subscriptions.assignFromUser(Collections.singleton(tp0));
        subscriptions.seek(tp0, 1L);
        AbstractResponse response = fullFetchResponse(tp0, withHeaders, Errors.NONE, 100L, 0);
        client.prepareResponse(matchesOffset(tp0, 1L), response);
        Assert.assertEquals(1L, headerFetcher.sendFetches());
        consumerClient.poll(0L);

        List<ConsumerRecord<byte[], byte[]>> fetched = headerFetcher.fetchedRecords().get(tp0);
        Assert.assertEquals(3L, fetched.size());

        // Record 1: no header at all.
        Assert.assertNull(fetched.get(0).headers().lastHeader("headerKey"));

        // Record 2: a single header.
        Header second = fetched.get(1).headers().lastHeader("headerKey");
        Assert.assertEquals("headerValue", new String(second.value(), StandardCharsets.UTF_8));
        Assert.assertEquals("headerKey", second.key());

        // Record 3: two headers with the same key; the last one is reported.
        Header third = fetched.get(2).headers().lastHeader("headerKey");
        Assert.assertEquals("headerValue2", new String(third.value(), StandardCharsets.UTF_8));
        Assert.assertEquals("headerKey", third.key());
    }

    /**
     * Uses a fetcher created with a third argument of 2 and checks that the three-record
     * fixture is handed out as two records followed by one, that no new fetch is sent while
     * records are still pending, and that the follow-up fetch from offset 4 returns the
     * two-record fixture.
     */
    @Test
    public void testFetchMaxPollRecords() {
        Fetcher<byte[], byte[]> limitedFetcher = createFetcher(subscriptions, new Metrics(time), 2);

        subscriptions.assignFromUser(Collections.singleton(tp0));
        subscriptions.seek(tp0, 1L);

        AbstractResponse firstResponse = fullFetchResponse(tp0, records, Errors.NONE, 100L, 0);
        client.prepareResponse(matchesOffset(tp0, 1L), firstResponse);
        AbstractResponse secondResponse = fullFetchResponse(tp0, nextRecords, Errors.NONE, 100L, 0);
        client.prepareResponse(matchesOffset(tp0, 4L), secondResponse);

        // First round: two of the three records.
        Assert.assertEquals(1L, limitedFetcher.sendFetches());
        consumerClient.poll(0L);
        List<ConsumerRecord<byte[], byte[]>> firstRound = limitedFetcher.fetchedRecords().get(tp0);
        Assert.assertEquals(2L, firstRound.size());
        Assert.assertEquals(3L, subscriptions.position(tp0).longValue());
        Assert.assertEquals(1L, firstRound.get(0).offset());
        Assert.assertEquals(2L, firstRound.get(1).offset());

        // Second round: no request is sent, the remaining record is returned.
        Assert.assertEquals(0L, limitedFetcher.sendFetches());
        consumerClient.poll(0L);
        List<ConsumerRecord<byte[], byte[]>> secondRound = limitedFetcher.fetchedRecords().get(tp0);
        Assert.assertEquals(1L, secondRound.size());
        Assert.assertEquals(4L, subscriptions.position(tp0).longValue());
        Assert.assertEquals(3L, secondRound.get(0).offset());

        // Third round: a new fetch goes out and returns the next batch.
        Assert.assertTrue(limitedFetcher.sendFetches() > 0);
        consumerClient.poll(0L);
        List<ConsumerRecord<byte[], byte[]>> thirdRound = limitedFetcher.fetchedRecords().get(tp0);
        Assert.assertEquals(2L, thirdRound.size());
        Assert.assertEquals(6L, subscriptions.position(tp0).longValue());
        Assert.assertEquals(4L, thirdRound.get(0).offset());
        Assert.assertEquals(5L, thirdRound.get(1).offset());
    }

    /**
     * Returns two of three fetched tp0 records, then switches the assignment to tp1 and checks
     * that the next {@code fetchedRecords()} call contains nothing for tp0 and the two tp1
     * records at offsets 4 and 5.
     */
    @Test
    public void testFetchAfterPartitionWithFetchedRecordsIsUnassigned() {
        Fetcher<byte[], byte[]> limitedFetcher = createFetcher(subscriptions, new Metrics(time), 2);

        subscriptions.assignFromUser(Collections.singleton(tp0));
        subscriptions.seek(tp0, 1L);
        AbstractResponse tp0Response = fullFetchResponse(tp0, records, Errors.NONE, 100L, 0);
        client.prepareResponse(matchesOffset(tp0, 1L), tp0Response);
        Assert.assertEquals(1L, limitedFetcher.sendFetches());
        consumerClient.poll(0L);

        List<ConsumerRecord<byte[], byte[]>> tp0Records = limitedFetcher.fetchedRecords().get(tp0);
        Assert.assertEquals(2L, tp0Records.size());
        Assert.assertEquals(3L, subscriptions.position(tp0).longValue());
        Assert.assertEquals(1L, tp0Records.get(0).offset());
        Assert.assertEquals(2L, tp0Records.get(1).offset());

        // Replace the assignment while one tp0 record is still pending in the fetcher.
        subscriptions.assignFromUser(Collections.singleton(tp1));
        AbstractResponse tp1Response = fullFetchResponse(tp1, nextRecords, Errors.NONE, 100L, 0);
        client.prepareResponse(matchesOffset(tp1, 4L), tp1Response);
        subscriptions.seek(tp1, 4L);
        Assert.assertEquals(1L, limitedFetcher.sendFetches());
        consumerClient.poll(0L);

        Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> recordsByPartition = limitedFetcher.fetchedRecords();
        Assert.assertNull(recordsByPartition.get(tp0));

        List<ConsumerRecord<byte[], byte[]>> tp1Records = recordsByPartition.get(tp1);
        Assert.assertEquals(2L, tp1Records.size());
        Assert.assertEquals(6L, subscriptions.position(tp1).longValue());
        Assert.assertEquals(4L, tp1Records.get(0).offset());
        Assert.assertEquals(5L, tp1Records.get(1).offset());
    }

    /**
     * Fetches a batch whose records carry the explicit offsets 15, 20 and 30 and checks that
     * all three are returned with those offsets and that the position ends at 31.
     */
    @Test
    public void testFetchNonContinuousRecords() {
        long[] offsets = {15L, 20L, 30L};
        String[] values = {"value-1", "value-2", "value-3"};

        MemoryRecordsBuilder batch = MemoryRecords.builder(ByteBuffer.allocate(1024), CompressionType.NONE, TimestampType.CREATE_TIME, 0L);
        for (int i = 0; i < offsets.length; i++) {
            batch.appendWithOffset(offsets[i], 0L, "key".getBytes(), values[i].getBytes());
        }
        MemoryRecords sparseRecords = batch.build();

        subscriptions.assignFromUser(Collections.singleton(tp0));
        subscriptions.seek(tp0, 0L);
        Assert.assertEquals(1L, fetcher.sendFetches());
        client.prepareResponse(fullFetchResponse(tp0, sparseRecords, Errors.NONE, 100L, 0));
        consumerClient.poll(0L);

        List<ConsumerRecord<byte[], byte[]>> fetched = fetcher.fetchedRecords().get(tp0);
        Assert.assertEquals(3L, fetched.size());
        Assert.assertEquals(31L, subscriptions.position(tp0).longValue());
        Assert.assertEquals(15L, fetched.get(0).offset());
        Assert.assertEquals(20L, fetched.get(1).offset());
        Assert.assertEquals(30L, fetched.get(2).offset());
    }

    /**
     * Configures the mock client with FETCH API versions 2 to 2, fetches an incomplete record
     * and checks that {@code fetchedRecords()} raises a {@link RecordTooLargeException} with the
     * expected message prefix while the position stays at 0. The node API versions are reset in
     * a finally block.
     */
    @Test
    public void testFetchRequestWhenRecordTooLarge() {
        try {
            ApiVersionsResponse.ApiVersion fetchV2Only = new ApiVersionsResponse.ApiVersion(ApiKeys.FETCH.id, (short) 2, (short) 2);
            client.setNodeApiVersions(NodeApiVersions.create(Collections.singletonList(fetchV2Only)));
            makeFetchRequestWithIncompleteRecord();
            try {
                fetcher.fetchedRecords();
                Assert.fail("RecordTooLargeException should have been raised");
            } catch (RecordTooLargeException expected) {
                String message = expected.getMessage();
                Assert.assertTrue(message.startsWith("There are some messages at [Partition=Offset]: "));
                Assert.assertEquals(0L, subscriptions.position(tp0).longValue());
            }
        } finally {
            // Restore the default API versions on the mock client.
            client.setNodeApiVersions(NodeApiVersions.create());
        }
    }

    /**
     * Fetches an incomplete record with the default node API versions and checks that
     * {@code fetchedRecords()} raises a {@link KafkaException} whose message starts with
     * "Failed to make progress reading messages", leaving the position of tp0 at 0.
     */
    @Test
    public void testFetchRequestInternalError() {
        makeFetchRequestWithIncompleteRecord();
        try {
            this.fetcher.fetchedRecords();
            // The message used to name RecordTooLargeException, which is not what this test expects.
            Assert.fail("KafkaException should have been raised");
        } catch (KafkaException e) {
            Assert.assertTrue(e.getMessage().startsWith("Failed to make progress reading messages"));
            Assert.assertEquals(0L, this.subscriptions.position(this.tp0).longValue());
        }
    }

    /**
     * Assigns tp0 at offset 0, sends one fetch and answers it with a response whose record
     * payload is just eight zero bytes, then checks that the fetch is reported as completed.
     */
    private void makeFetchRequestWithIncompleteRecord() {
        subscriptions.assignFromUser(Collections.singleton(tp0));
        subscriptions.seek(tp0, 0L);
        Assert.assertEquals(1L, fetcher.sendFetches());
        Assert.assertFalse(fetcher.hasCompletedFetches());

        byte[] eightZeroBytes = new byte[]{0, 0, 0, 0, 0, 0, 0, 0};
        MemoryRecords partialRecord = MemoryRecords.readableRecords(ByteBuffer.wrap(eightZeroBytes));
        client.prepareResponse(fullFetchResponse(tp0, partialRecord, Errors.NONE, 100L, 0));
        consumerClient.poll(0L);

        Assert.assertTrue(fetcher.hasCompletedFetches());
    }

    /**
     * Answers a fetch for tp0 with TOPIC_AUTHORIZATION_FAILED and checks that
     * {@code fetchedRecords()} raises a {@link TopicAuthorizationException} listing exactly the
     * test topic.
     */
    @Test
    public void testUnauthorizedTopic() {
        subscriptions.assignFromUser(Collections.singleton(tp0));
        subscriptions.seek(tp0, 0L);

        int requestsSent = fetcher.sendFetches();
        Assert.assertEquals(1L, requestsSent);
        client.prepareResponse(fullFetchResponse(tp0, records, Errors.TOPIC_AUTHORIZATION_FAILED, 100L, 0));
        consumerClient.poll(0L);

        try {
            fetcher.fetchedRecords();
            Assert.fail("fetchedRecords should have thrown");
        } catch (TopicAuthorizationException denied) {
            Assert.assertEquals(Collections.singleton(topicName), denied.unauthorizedTopics());
        }
    }

    /**
     * Subscribes to the test topic, sends a fetch for tp0, then calls
     * {@code assignFromSubscribed} again before the response arrives and checks that
     * {@code fetchedRecords()} returns nothing.
     */
    @Test
    public void testFetchDuringRebalance() {
        subscriptions.subscribe(Collections.singleton(topicName), listener);
        subscriptions.assignFromSubscribed(Collections.singleton(tp0));
        subscriptions.seek(tp0, 0L);

        Assert.assertEquals(1L, fetcher.sendFetches());

        // Second assignment call while the fetch is still in flight.
        subscriptions.assignFromSubscribed(Collections.singleton(tp0));

        client.prepareResponse(fullFetchResponse(tp0, records, Errors.NONE, 100L, 0));
        consumerClient.poll(0L);

        Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> recordsByPartition = fetcher.fetchedRecords();
        Assert.assertTrue(recordsByPartition.isEmpty());
    }

    /**
     * Pauses tp0 after a fetch for it has been sent and checks that the records of the
     * response are not returned for tp0.
     */
    @Test
    public void testInFlightFetchOnPausedPartition() {
        subscriptions.assignFromUser(Collections.singleton(tp0));
        subscriptions.seek(tp0, 0L);

        Assert.assertEquals(1L, fetcher.sendFetches());
        // Pause while the request is still outstanding.
        subscriptions.pause(tp0);

        client.prepareResponse(fullFetchResponse(tp0, records, Errors.NONE, 100L, 0));
        consumerClient.poll(0L);

        List<ConsumerRecord<byte[], byte[]>> tp0Records = fetcher.fetchedRecords().get(tp0);
        Assert.assertNull(tp0Records);
    }

    /**
     * Pauses tp0 before fetching and checks that {@code sendFetches()} reports no request and
     * that the mock client has recorded none.
     */
    @Test
    public void testFetchOnPausedPartition() {
        subscriptions.assignFromUser(Collections.singleton(tp0));
        subscriptions.seek(tp0, 0L);
        subscriptions.pause(tp0);

        int requestsSent = fetcher.sendFetches();
        Assert.assertFalse(requestsSent > 0);
        Assert.assertTrue(client.requests().isEmpty());
    }

    /**
     * Answers a fetch with NOT_LEADER_FOR_PARTITION and checks that no records are returned and
     * that {@code metadata.timeToNextUpdate(...)} is 0 afterwards.
     */
    @Test
    public void testFetchNotLeaderForPartition() {
        subscriptions.assignFromUser(Collections.singleton(tp0));
        subscriptions.seek(tp0, 0L);

        Assert.assertEquals(1L, fetcher.sendFetches());
        client.prepareResponse(fullFetchResponse(tp0, records, Errors.NOT_LEADER_FOR_PARTITION, 100L, 0));
        consumerClient.poll(0L);

        int partitionsWithRecords = fetcher.fetchedRecords().size();
        Assert.assertEquals(0L, partitionsWithRecords);

        long untilNextUpdate = metadata.timeToNextUpdate(time.milliseconds());
        Assert.assertEquals(0L, untilNextUpdate);
    }

    /**
     * Answers a fetch with UNKNOWN_TOPIC_OR_PARTITION and checks that no records are returned
     * and that {@code metadata.timeToNextUpdate(...)} is 0 afterwards.
     */
    @Test
    public void testFetchUnknownTopicOrPartition() {
        subscriptions.assignFromUser(Collections.singleton(tp0));
        subscriptions.seek(tp0, 0L);

        Assert.assertEquals(1L, fetcher.sendFetches());
        client.prepareResponse(fullFetchResponse(tp0, records, Errors.UNKNOWN_TOPIC_OR_PARTITION, 100L, 0));
        consumerClient.poll(0L);

        int partitionsWithRecords = fetcher.fetchedRecords().size();
        Assert.assertEquals(0L, partitionsWithRecords);

        long untilNextUpdate = metadata.timeToNextUpdate(time.milliseconds());
        Assert.assertEquals(0L, untilNextUpdate);
    }

    /**
     * Answers a fetch with OFFSET_OUT_OF_RANGE on the auto-resetting subscription state and
     * checks that no records are returned, that an offset reset is flagged for tp0 and that
     * tp0 no longer has a position.
     */
    @Test
    public void testFetchOffsetOutOfRange() {
        this.subscriptions.assignFromUser(Collections.singleton(this.tp0));
        this.subscriptions.seek(this.tp0, 0L);
        Assert.assertEquals(1L, this.fetcher.sendFetches());
        this.client.prepareResponse(fullFetchResponse(this.tp0, this.records, Errors.OFFSET_OUT_OF_RANGE, 100L, 0));
        this.consumerClient.poll(0L);
        Assert.assertEquals(0L, this.fetcher.fetchedRecords().size());
        Assert.assertTrue(this.subscriptions.isOffsetResetNeeded(this.tp0));
        // assertNull states the intent directly instead of comparing against a cast null.
        Assert.assertNull(this.subscriptions.position(this.tp0));
    }

    /**
     * Seeks tp0 to offset 1 after a fetch from offset 0 has been sent, then delivers an
     * OFFSET_OUT_OF_RANGE response for that fetch and checks that no reset is flagged and the
     * position stays at 1.
     */
    @Test
    public void testStaleOutOfRangeError() {
        subscriptions.assignFromUser(Collections.singleton(tp0));
        subscriptions.seek(tp0, 0L);

        Assert.assertEquals(1L, fetcher.sendFetches());
        client.prepareResponse(fullFetchResponse(tp0, records, Errors.OFFSET_OUT_OF_RANGE, 100L, 0));

        // Move the position before the response for the old offset is processed.
        subscriptions.seek(tp0, 1L);
        consumerClient.poll(0L);

        int partitionsWithRecords = fetcher.fetchedRecords().size();
        Assert.assertEquals(0L, partitionsWithRecords);
        Assert.assertFalse(subscriptions.isOffsetResetNeeded(tp0));
        Assert.assertEquals(1L, subscriptions.position(tp0).longValue());
    }

    /**
     * Using the no-auto-reset fixtures, receives {@code OFFSET_OUT_OF_RANGE} for
     * {@code tp0}, seeks to offset 2, and checks that the next
     * {@code fetchedRecords()} call returns an empty result.
     */
    @Test
    public void testFetchedRecordsAfterSeek() {
        final TopicPartition partition = this.tp0;
        this.subscriptionsNoAutoReset.assignFromUser(Collections.singleton(partition));
        this.subscriptionsNoAutoReset.seek(partition, 0L);

        Assert.assertTrue(this.fetcherNoAutoReset.sendFetches() > 0);
        this.client.prepareResponse(
                fullFetchResponse(partition, this.records, Errors.OFFSET_OUT_OF_RANGE, 100L, 0));
        this.consumerClient.poll(0L);
        Assert.assertFalse(this.subscriptionsNoAutoReset.isOffsetResetNeeded(partition));

        // Seek before draining the fetched data.
        this.subscriptionsNoAutoReset.seek(partition, 2L);
        Assert.assertEquals(0L, this.fetcherNoAutoReset.fetchedRecords().size());
    }

    /**
     * Using the no-auto-reset fixtures, receives {@code OFFSET_OUT_OF_RANGE} for
     * {@code tp0} and checks that two consecutive {@code fetchedRecords()} calls
     * each throw an {@link OffsetOutOfRangeException} naming only {@code tp0}.
     */
    @Test
    public void testFetchOffsetOutOfRangeException() {
        final TopicPartition partition = this.tp0;
        this.subscriptionsNoAutoReset.assignFromUser(Collections.singleton(partition));
        this.subscriptionsNoAutoReset.seek(partition, 0L);

        this.fetcherNoAutoReset.sendFetches();
        this.client.prepareResponse(
                fullFetchResponse(partition, this.records, Errors.OFFSET_OUT_OF_RANGE, 100L, 0));
        this.consumerClient.poll(0L);
        Assert.assertFalse(this.subscriptionsNoAutoReset.isOffsetResetNeeded(partition));

        // The exception is expected on every call, not only the first.
        for (int attempt = 0; attempt < 2; attempt++) {
            try {
                this.fetcherNoAutoReset.fetchedRecords();
                Assert.fail("Should have thrown OffsetOutOfRangeException");
            } catch (OffsetOutOfRangeException outOfRange) {
                Assert.assertTrue(outOfRange.offsetOutOfRangePartitions().containsKey(partition));
                Assert.assertEquals(outOfRange.offsetOutOfRangePartitions().size(), 1L);
            }
        }
    }

    /**
     * Fetches from two partitions where {@code tp1} returns records and
     * {@code tp0} returns {@code OFFSET_OUT_OF_RANGE}, then verifies that three
     * records are collected, the {@code tp1} position reaches 4, and exactly one
     * {@link OffsetOutOfRangeException} naming only {@code tp0} is raised.
     */
    @Test
    public void testFetchPositionAfterException() {
        this.subscriptionsNoAutoReset.assignFromUser(Utils.mkSet(new TopicPartition[]{this.tp0, this.tp1}));
        this.subscriptionsNoAutoReset.seek(this.tp0, 1L);
        this.subscriptionsNoAutoReset.seek(this.tp1, 1L);
        Assert.assertEquals(1L, this.fetcherNoAutoReset.sendFetches());

        // LinkedHashMap keeps tp1 (data) ahead of tp0 (error) in the response.
        LinkedHashMap partitions = new LinkedHashMap();
        partitions.put(this.tp1, new FetchResponse.PartitionData(Errors.NONE, 100L, -1L, -1L, (List) null, this.records));
        partitions.put(this.tp0, new FetchResponse.PartitionData(Errors.OFFSET_OUT_OF_RANGE, 100L, -1L, -1L, (List) null, MemoryRecords.EMPTY));
        this.client.prepareResponse(new FetchResponse(Errors.NONE, new LinkedHashMap(partitions), 0, 0));
        this.consumerClient.poll(0L);

        List<Object> fetchedRecords = new ArrayList<Object>();
        List<OffsetOutOfRangeException> exceptions = new ArrayList<OffsetOutOfRangeException>();

        // The first drain is expected to succeed; it is deliberately outside the try.
        for (Object batch : this.fetcherNoAutoReset.fetchedRecords().values()) {
            fetchedRecords.addAll((List<?>) batch);
        }
        Assert.assertEquals(fetchedRecords.size(), this.subscriptionsNoAutoReset.position(this.tp1).longValue() - 1);

        // A later drain may surface the out-of-range error; record it for the checks below.
        try {
            for (Object batch : this.fetcherNoAutoReset.fetchedRecords().values()) {
                fetchedRecords.addAll((List<?>) batch);
            }
        } catch (OffsetOutOfRangeException e) {
            exceptions.add(e);
        }

        Assert.assertEquals(4L, this.subscriptionsNoAutoReset.position(this.tp1).longValue());
        Assert.assertEquals(3L, fetchedRecords.size());
        Assert.assertEquals(1L, exceptions.size());

        // Bind the captured exception to a local (the original referenced an undeclared name here).
        OffsetOutOfRangeException outOfRange = exceptions.get(0);
        Assert.assertTrue(outOfRange.offsetOutOfRangePartitions().containsKey(this.tp0));
        Assert.assertEquals(1L, outOfRange.offsetOutOfRangePartitions().size());
    }

    /**
     * Builds a dedicated fetcher over the no-auto-reset subscriptions, drains part
     * of a successful {@code tp0} fetch, then receives {@code OFFSET_OUT_OF_RANGE}
     * for a newly assigned {@code tp1}. Verifies that the remaining {@code tp0}
     * record is still returned and that seeking {@code tp1} before the next drain
     * results in an empty result rather than an exception.
     */
    @Test
    public void testSeekBeforeException() {
        // The third argument (2) presumably caps records per fetchedRecords() call,
        // which would explain the 2-then-1 sizes asserted below - confirm against createFetcher.
        Fetcher<byte[], byte[]> limitedFetcher = createFetcher(this.subscriptionsNoAutoReset, new Metrics(this.time), 2);

        this.subscriptionsNoAutoReset.assignFromUser(Utils.mkSet(new TopicPartition[]{this.tp0}));
        this.subscriptionsNoAutoReset.seek(this.tp0, 1L);
        Assert.assertEquals(1L, limitedFetcher.sendFetches());
        this.client.prepareResponse(fullFetchResponse(this.tp0, this.records, Errors.NONE, 100L, 0));
        this.consumerClient.poll(0L);
        Assert.assertEquals(2L, ((List) limitedFetcher.fetchedRecords().get(this.tp0)).size());

        // Add tp1 to the assignment and let its fetch fail with OFFSET_OUT_OF_RANGE.
        this.subscriptionsNoAutoReset.assignFromUser(Utils.mkSet(new TopicPartition[]{this.tp0, this.tp1}));
        this.subscriptionsNoAutoReset.seek(this.tp1, 1L);
        Assert.assertEquals(1L, limitedFetcher.sendFetches());
        HashMap partitions = new HashMap();
        partitions.put(this.tp1, new FetchResponse.PartitionData(Errors.OFFSET_OUT_OF_RANGE, 100L, -1L, -1L, (List) null, MemoryRecords.EMPTY));
        this.client.prepareResponse(new FetchResponse(Errors.NONE, new LinkedHashMap(partitions), 0, 0));
        this.consumerClient.poll(0L);
        Assert.assertEquals(1L, ((List) limitedFetcher.fetchedRecords().get(this.tp0)).size());

        // Seek tp1 before draining again; the drain is expected to return nothing.
        this.subscriptionsNoAutoReset.seek(this.tp1, 10L);
        Assert.assertEquals(0L, limitedFetcher.fetchedRecords().size());
    }

    /**
     * Queues a fetch response with the second {@code prepareResponse} argument set
     * to {@code true} (presumably "disconnected") and checks that no records are
     * returned while {@code tp0} stays fetchable at position 0 with no reset pending.
     */
    @Test
    public void testFetchDisconnected() {
        final TopicPartition partition = this.tp0;
        this.subscriptions.assignFromUser(Collections.singleton(partition));
        this.subscriptions.seek(partition, 0L);

        Assert.assertEquals(1L, this.fetcher.sendFetches());
        this.client.prepareResponse(
                (AbstractResponse) fullFetchResponse(partition, this.records, Errors.NONE, 100L, 0), true);
        this.consumerClient.poll(0L);

        Assert.assertEquals(0L, this.fetcher.fetchedRecords().size());
        // Subscription state for the partition must be untouched.
        Assert.assertFalse(this.subscriptions.isOffsetResetNeeded(partition));
        Assert.assertTrue(this.subscriptions.isFetchable(partition));
        Assert.assertEquals(0L, this.subscriptions.position(partition).longValue());
    }

    /**
     * With {@code tp0} already positioned at offset 5, calls
     * {@code resetOffsetsIfNeeded()} and checks that no request is in flight and
     * the position is unchanged.
     */
    @Test
    public void testUpdateFetchPositionNoOpWithPositionSet() {
        final TopicPartition partition = this.tp0;
        this.subscriptions.assignFromUser(Collections.singleton(partition));
        this.subscriptions.seek(partition, 5L);

        this.fetcher.resetOffsetsIfNeeded();

        Assert.assertFalse(this.client.hasInFlightRequests());
        Assert.assertTrue(this.subscriptions.isFetchable(partition));
        Assert.assertEquals(5L, this.subscriptions.position(partition).longValue());
    }

    /**
     * Requests an offset reset for {@code tp0} without naming a strategy, answers
     * the list-offset request matched by {@code listOffsetRequestMatcher(-2L)},
     * and checks that the position becomes 5.
     */
    @Test
    public void testUpdateFetchPositionResetToDefaultOffset() {
        final TopicPartition partition = this.tp0;
        this.subscriptions.assignFromUser(Collections.singleton(partition));
        this.subscriptions.requestOffsetReset(partition);

        this.client.prepareResponse(
                listOffsetRequestMatcher(-2L),
                (AbstractResponse) listOffsetResponse(Errors.NONE, 1L, 5L));
        this.fetcher.resetOffsetsIfNeeded();
        this.consumerClient.pollNoWakeup();

        Assert.assertFalse(this.subscriptions.isOffsetResetNeeded(partition));
        Assert.assertTrue(this.subscriptions.isFetchable(partition));
        Assert.assertEquals(5L, this.subscriptions.position(partition).longValue());
    }

    /**
     * Requests a {@code LATEST} offset reset for {@code tp0}, answers the
     * list-offset request matched by {@code listOffsetRequestMatcher(-1L)}, and
     * checks that the position becomes 5.
     */
    @Test
    public void testUpdateFetchPositionResetToLatestOffset() {
        final TopicPartition partition = this.tp0;
        this.subscriptions.assignFromUser(Collections.singleton(partition));
        this.subscriptions.requestOffsetReset(partition, OffsetResetStrategy.LATEST);

        this.client.prepareResponse(
                listOffsetRequestMatcher(-1L),
                (AbstractResponse) listOffsetResponse(Errors.NONE, 1L, 5L));
        this.fetcher.resetOffsetsIfNeeded();
        this.consumerClient.pollNoWakeup();

        Assert.assertFalse(this.subscriptions.isOffsetResetNeeded(partition));
        Assert.assertTrue(this.subscriptions.isFetchable(partition));
        Assert.assertEquals(5L, this.subscriptions.position(partition).longValue());
    }

    /**
     * For every {@code IsolationLevel}, builds a fetcher configured with that
     * level, triggers a {@code LATEST} offset reset, and only answers the
     * list-offset request if it carries the same isolation level. The reset must
     * then complete with position 5.
     */
    @Test
    public void testListOffsetsSendsIsolationLevel() {
        for (final IsolationLevel isolationLevel : IsolationLevel.values()) {
            Fetcher isolatedFetcher = createFetcher(this.subscriptions, new Metrics(), new ByteArrayDeserializer(), new ByteArrayDeserializer(), Integer.MAX_VALUE, isolationLevel);
            this.subscriptions.assignFromUser(Collections.singleton(this.tp0));
            this.subscriptions.requestOffsetReset(this.tp0, OffsetResetStrategy.LATEST);

            // Accept the request only when its isolation level is the one under test.
            MockClient.RequestMatcher sameIsolationLevel = new MockClient.RequestMatcher() {
                @Override
                public boolean matches(AbstractRequest abstractRequest) {
                    ListOffsetRequest listOffsetRequest = (ListOffsetRequest) abstractRequest;
                    return listOffsetRequest.isolationLevel() == isolationLevel;
                }
            };
            this.client.prepareResponse(sameIsolationLevel, (AbstractResponse) listOffsetResponse(Errors.NONE, 1L, 5L));

            isolatedFetcher.resetOffsetsIfNeeded();
            this.consumerClient.pollNoWakeup();

            Assert.assertFalse(this.subscriptions.isOffsetResetNeeded(this.tp0));
            Assert.assertTrue(this.subscriptions.isFetchable(this.tp0));
            Assert.assertEquals(5L, this.subscriptions.position(this.tp0).longValue());
        }
    }

    /**
     * Requests an {@code EARLIEST} offset reset for {@code tp0}, answers the
     * list-offset request matched by {@code listOffsetRequestMatcher(-2L)}, and
     * checks that the position becomes 5.
     */
    @Test
    public void testUpdateFetchPositionResetToEarliestOffset() {
        final TopicPartition partition = this.tp0;
        this.subscriptions.assignFromUser(Collections.singleton(partition));
        this.subscriptions.requestOffsetReset(partition, OffsetResetStrategy.EARLIEST);

        this.client.prepareResponse(
                listOffsetRequestMatcher(-2L),
                (AbstractResponse) listOffsetResponse(Errors.NONE, 1L, 5L));
        this.fetcher.resetOffsetsIfNeeded();
        this.consumerClient.pollNoWakeup();

        Assert.assertFalse(this.subscriptions.isOffsetResetNeeded(partition));
        Assert.assertTrue(this.subscriptions.isFetchable(partition));
        Assert.assertEquals(5L, this.subscriptions.position(partition).longValue());
    }

    /**
     * Fails the first list-offset request with {@code NOT_LEADER_FOR_PARTITION},
     * applies a prepared metadata update, waits {@code retryBackoffMs}, and checks
     * that a second reset attempt succeeds with position 5.
     */
    @Test
    public void testResetOffsetsMetadataRefresh() {
        final TopicPartition partition = this.tp0;
        this.subscriptions.assignFromUser(Collections.singleton(partition));
        this.subscriptions.requestOffsetReset(partition, OffsetResetStrategy.LATEST);

        // First attempt: the response carries NOT_LEADER_FOR_PARTITION.
        this.client.prepareResponse(
                listOffsetRequestMatcher(-1L),
                listOffsetResponse(Errors.NOT_LEADER_FOR_PARTITION, 1L, 5L),
                false);
        this.fetcher.resetOffsetsIfNeeded();
        this.consumerClient.pollNoWakeup();
        Assert.assertFalse(this.subscriptions.hasValidPosition(partition));

        // The prepared metadata update must be consumed by the next poll.
        this.client.prepareMetadataUpdate(this.cluster, Collections.emptySet());
        this.consumerClient.pollNoWakeup();
        Assert.assertFalse(this.client.hasPendingMetadataUpdates());

        // Second attempt after the backoff interval has elapsed.
        this.time.sleep(this.retryBackoffMs);
        this.client.prepareResponse(
                listOffsetRequestMatcher(-1L),
                (AbstractResponse) listOffsetResponse(Errors.NONE, 1L, 5L));
        this.fetcher.resetOffsetsIfNeeded();
        this.consumerClient.pollNoWakeup();

        Assert.assertFalse(this.subscriptions.isOffsetResetNeeded(partition));
        Assert.assertTrue(this.subscriptions.isFetchable(partition));
        Assert.assertEquals(5L, this.subscriptions.position(partition).longValue());
    }

    /**
     * Queues the first list-offset response with the trailing
     * {@code prepareResponse} flag set to {@code true} (presumably "disconnected"),
     * checks that an immediate retry sends nothing, and that a retry after
     * {@code retryBackoffMs} succeeds with position 5.
     */
    @Test
    public void testUpdateFetchPositionDisconnect() {
        final TopicPartition partition = this.tp0;
        this.subscriptions.assignFromUser(Collections.singleton(partition));
        this.subscriptions.requestOffsetReset(partition, OffsetResetStrategy.LATEST);

        // First attempt: response queued with the trailing flag set to true.
        this.client.prepareResponse(
                listOffsetRequestMatcher(-1L),
                listOffsetResponse(Errors.NONE, 1L, 5L),
                true);
        this.fetcher.resetOffsetsIfNeeded();
        this.consumerClient.pollNoWakeup();
        Assert.assertFalse(this.subscriptions.hasValidPosition(partition));

        // The prepared metadata update must be consumed by the next poll.
        this.client.prepareMetadataUpdate(this.cluster, Collections.emptySet());
        this.consumerClient.pollNoWakeup();
        Assert.assertFalse(this.client.hasPendingMetadataUpdates());

        // Retrying before the backoff elapses must not send a request.
        this.fetcher.resetOffsetsIfNeeded();
        this.consumerClient.pollNoWakeup();
        Assert.assertFalse(this.client.hasInFlightRequests());
        Assert.assertFalse(this.subscriptions.hasValidPosition(partition));

        // Retry after the backoff interval.
        this.time.sleep(this.retryBackoffMs);
        this.client.prepareResponse(
                listOffsetRequestMatcher(-1L),
                (AbstractResponse) listOffsetResponse(Errors.NONE, 1L, 5L));
        this.fetcher.resetOffsetsIfNeeded();
        this.consumerClient.pollNoWakeup();

        Assert.assertFalse(this.subscriptions.isOffsetResetNeeded(partition));
        Assert.assertTrue(this.subscriptions.isFetchable(partition));
        Assert.assertEquals(5L, this.subscriptions.position(partition).longValue());
    }

    /**
     * Reassigns from {@code tp0} to {@code tp1} while a reset request for
     * {@code tp0} is in flight, then delivers the response and checks that it is
     * consumed and that {@code tp0} is not assigned.
     */
    @Test
    public void testAssignmentChangeWithInFlightReset() {
        final TopicPartition original = this.tp0;
        final TopicPartition replacement = this.tp1;
        this.subscriptions.assignFromUser(Collections.singleton(original));
        this.subscriptions.requestOffsetReset(original, OffsetResetStrategy.LATEST);

        // Send the reset request and leave it unanswered.
        this.fetcher.resetOffsetsIfNeeded();
        this.consumerClient.pollNoWakeup();
        Assert.assertFalse(this.subscriptions.hasValidPosition(original));
        Assert.assertTrue(this.client.hasInFlightRequests());

        // Change the assignment, then let the response arrive.
        this.subscriptions.assignFromUser(Collections.singleton(replacement));
        this.client.respond(listOffsetResponse(Errors.NONE, 1L, 5L));
        this.consumerClient.pollNoWakeup();

        Assert.assertFalse(this.client.hasPendingResponses());
        Assert.assertFalse(this.client.hasInFlightRequests());
        Assert.assertFalse(this.subscriptions.isAssigned(original));
    }

    /**
     * Seeks {@code tp0} to offset 237 while a reset request is in flight, then
     * delivers the response and checks that the position is still 237.
     */
    @Test
    public void testSeekWithInFlightReset() {
        final TopicPartition partition = this.tp0;
        this.subscriptions.assignFromUser(Collections.singleton(partition));
        this.subscriptions.requestOffsetReset(partition, OffsetResetStrategy.LATEST);

        // Send the reset request and leave it unanswered.
        this.fetcher.resetOffsetsIfNeeded();
        this.consumerClient.pollNoWakeup();
        Assert.assertFalse(this.subscriptions.hasValidPosition(partition));
        Assert.assertTrue(this.client.hasInFlightRequests());

        // Seek explicitly, then let the response (offset 5) arrive.
        this.subscriptions.seek(partition, 237L);
        this.client.respond(listOffsetResponse(Errors.NONE, 1L, 5L));
        this.consumerClient.pollNoWakeup();

        Assert.assertFalse(this.client.hasPendingResponses());
        Assert.assertFalse(this.client.hasInFlightRequests());
        Assert.assertEquals(237L, this.subscriptions.position(partition).longValue());
    }

    /**
     * Switches the requested reset strategy from {@code LATEST} to
     * {@code EARLIEST} while the first reset request is in flight, then delivers
     * the response and checks that a reset is still needed with strategy
     * {@code EARLIEST}.
     */
    @Test
    public void testChangeResetWithInFlightReset() {
        final TopicPartition partition = this.tp0;
        this.subscriptions.assignFromUser(Collections.singleton(partition));
        this.subscriptions.requestOffsetReset(partition, OffsetResetStrategy.LATEST);

        // Send the LATEST reset request and leave it unanswered.
        this.fetcher.resetOffsetsIfNeeded();
        this.consumerClient.pollNoWakeup();
        Assert.assertFalse(this.subscriptions.hasValidPosition(partition));
        Assert.assertTrue(this.client.hasInFlightRequests());

        // Change the strategy, then let the response to the first request arrive.
        this.subscriptions.requestOffsetReset(partition, OffsetResetStrategy.EARLIEST);
        this.client.respond(listOffsetResponse(Errors.NONE, 1L, 5L));
        this.consumerClient.pollNoWakeup();

        Assert.assertFalse(this.client.hasPendingResponses());
        Assert.assertFalse(this.client.hasInFlightRequests());
        Assert.assertTrue(this.subscriptions.isOffsetResetNeeded(partition));
        Assert.assertEquals(OffsetResetStrategy.EARLIEST, this.subscriptions.resetStrategy(partition));
    }

    /**
     * Requests the same {@code LATEST} reset a second time while the first request
     * is in flight, then delivers the response and checks that the reset completes
     * with position 5.
     */
    @Test
    public void testIdempotentResetWithInFlightReset() {
        final TopicPartition partition = this.tp0;
        this.subscriptions.assignFromUser(Collections.singleton(partition));
        this.subscriptions.requestOffsetReset(partition, OffsetResetStrategy.LATEST);

        // Send the reset request and leave it unanswered.
        this.fetcher.resetOffsetsIfNeeded();
        this.consumerClient.pollNoWakeup();
        Assert.assertFalse(this.subscriptions.hasValidPosition(partition));
        Assert.assertTrue(this.client.hasInFlightRequests());

        // Repeat the identical reset request, then let the response arrive.
        this.subscriptions.requestOffsetReset(partition, OffsetResetStrategy.LATEST);
        this.client.respond(listOffsetResponse(Errors.NONE, 1L, 5L));
        this.consumerClient.pollNoWakeup();

        Assert.assertFalse(this.client.hasInFlightRequests());
        Assert.assertFalse(this.subscriptions.isOffsetResetNeeded(partition));
        Assert.assertEquals(5L, this.subscriptions.position(partition).longValue());
    }

    /**
     * Fails the first list-offset request with {@code TOPIC_AUTHORIZATION_FAILED}
     * and checks that the next {@code resetOffsetsIfNeeded()} call throws
     * {@link TopicAuthorizationException} for the topic of {@code tp0}, that an
     * immediate retry sends nothing, and that a retry after
     * {@code retryBackoffMs} succeeds with position 5.
     */
    @Test
    public void testRestOffsetsAuthorizationFailure() {
        final TopicPartition partition = this.tp0;
        this.subscriptions.assignFromUser(Collections.singleton(partition));
        this.subscriptions.requestOffsetReset(partition, OffsetResetStrategy.LATEST);

        // First attempt: the response carries an authorization failure.
        this.client.prepareResponse(
                listOffsetRequestMatcher(-1L),
                listOffsetResponse(Errors.TOPIC_AUTHORIZATION_FAILED, -1L, -1L),
                false);
        this.fetcher.resetOffsetsIfNeeded();
        this.consumerClient.pollNoWakeup();
        Assert.assertFalse(this.subscriptions.hasValidPosition(partition));

        // The failure is expected to surface on the following call.
        try {
            this.fetcher.resetOffsetsIfNeeded();
            Assert.fail("Expected authorization error to be raised");
        } catch (TopicAuthorizationException authFailure) {
            Assert.assertEquals(Collections.singleton(partition.topic()), authFailure.unauthorizedTopics());
        }

        // Retrying before the backoff elapses must not send a request.
        this.fetcher.resetOffsetsIfNeeded();
        this.consumerClient.pollNoWakeup();
        Assert.assertFalse(this.client.hasInFlightRequests());
        Assert.assertFalse(this.subscriptions.hasValidPosition(partition));

        // Retry after the backoff interval.
        this.time.sleep(this.retryBackoffMs);
        this.client.prepareResponse(
                listOffsetRequestMatcher(-1L),
                (AbstractResponse) listOffsetResponse(Errors.NONE, 1L, 5L));
        this.fetcher.resetOffsetsIfNeeded();
        this.consumerClient.pollNoWakeup();

        Assert.assertFalse(this.subscriptions.isOffsetResetNeeded(partition));
        Assert.assertTrue(this.subscriptions.isFetchable(partition));
        Assert.assertEquals(5L, this.subscriptions.position(partition).longValue());
    }

    /**
     * Pauses {@code tp0}, requests a {@code LATEST} reset, answers the list-offset
     * request, and checks that the partition gets position 10 while remaining
     * not fetchable.
     */
    @Test
    public void testUpdateFetchPositionOfPausedPartitionsRequiringOffsetReset() {
        final TopicPartition partition = this.tp0;
        this.subscriptions.assignFromUser(Collections.singleton(partition));
        this.subscriptions.pause(partition);
        this.subscriptions.requestOffsetReset(partition, OffsetResetStrategy.LATEST);

        this.client.prepareResponse(
                listOffsetRequestMatcher(-1L),
                (AbstractResponse) listOffsetResponse(Errors.NONE, 1L, 10L));
        this.fetcher.resetOffsetsIfNeeded();
        this.consumerClient.pollNoWakeup();

        Assert.assertFalse(this.subscriptions.isOffsetResetNeeded(partition));
        // Still paused, so not fetchable even though the position is valid.
        Assert.assertFalse(this.subscriptions.isFetchable(partition));
        Assert.assertTrue(this.subscriptions.hasValidPosition(partition));
        Assert.assertEquals(10L, this.subscriptions.position(partition).longValue());
    }

    /**
     * Requests a reset for {@code tp0}, pauses it, and runs
     * {@code resetOffsetsIfNeeded()} without preparing any response. The reset
     * must remain pending and the partition must have no valid position.
     */
    @Test
    public void testUpdateFetchPositionOfPausedPartitionsWithoutAValidPosition() {
        final TopicPartition partition = this.tp0;
        this.subscriptions.assignFromUser(Collections.singleton(partition));
        this.subscriptions.requestOffsetReset(partition);
        this.subscriptions.pause(partition);

        this.fetcher.resetOffsetsIfNeeded();
        this.consumerClient.pollNoWakeup();

        Assert.assertTrue(this.subscriptions.isOffsetResetNeeded(partition));
        Assert.assertFalse(this.subscriptions.isFetchable(partition));
        Assert.assertFalse(this.subscriptions.hasValidPosition(partition));
    }

    /**
     * Seeks {@code tp0} to offset 10, pauses it, and checks that
     * {@code resetOffsetsIfNeeded()} leaves the position at 10 with no reset
     * pending and the partition not fetchable.
     */
    @Test
    public void testUpdateFetchPositionOfPausedPartitionsWithAValidPosition() {
        final TopicPartition partition = this.tp0;
        this.subscriptions.assignFromUser(Collections.singleton(partition));
        this.subscriptions.seek(partition, 10L);
        this.subscriptions.pause(partition);

        this.fetcher.resetOffsetsIfNeeded();

        Assert.assertFalse(this.subscriptions.isOffsetResetNeeded(partition));
        Assert.assertFalse(this.subscriptions.isFetchable(partition));
        Assert.assertTrue(this.subscriptions.hasValidPosition(partition));
        Assert.assertEquals(10L, this.subscriptions.position(partition).longValue());
    }

    /**
     * Prepares a successful metadata response and checks that
     * {@code getAllTopicMetadata} returns as many entries as the fixture cluster
     * has topics.
     */
    @Test
    public void testGetAllTopics() {
        this.client.prepareResponse(newMetadataResponse(this.topicName, Errors.NONE));

        final int expectedTopicCount = this.cluster.topics().size();
        final int actualTopicCount = this.fetcher.getAllTopicMetadata(5000L).size();
        Assert.assertEquals(expectedTopicCount, actualTopicCount);
    }

    /**
     * Queues a {@code null} response with the trailing flag set to {@code true}
     * (presumably "disconnected") followed by a successful metadata response, and
     * checks that {@code getAllTopicMetadata} still returns one entry per cluster
     * topic.
     */
    @Test
    public void testGetAllTopicsDisconnect() {
        // First queued response: null body with the flag set.
        this.client.prepareResponse((AbstractResponse) null, true);
        // Second queued response: a normal metadata reply.
        this.client.prepareResponse(newMetadataResponse(this.topicName, Errors.NONE));

        final int expectedTopicCount = this.cluster.topics().size();
        final int actualTopicCount = this.fetcher.getAllTopicMetadata(5000L).size();
        Assert.assertEquals(expectedTopicCount, actualTopicCount);
    }

    /**
     * Calls {@code getAllTopicMetadata} with a timeout argument of 50 without
     * preparing any response in this method; a {@link TimeoutException} is expected.
     */
    @Test(expected = TimeoutException.class)
    public void testGetAllTopicsTimeout() {
        final long timeout = 50L;
        this.fetcher.getAllTopicMetadata(timeout);
    }

    /**
     * Prepares a metadata response with {@code TOPIC_AUTHORIZATION_FAILED} and
     * checks that {@code getAllTopicMetadata} throws
     * {@link TopicAuthorizationException} listing exactly the fixture topic.
     */
    @Test
    public void testGetAllTopicsUnauthorized() {
        this.client.prepareResponse(
                newMetadataResponse(this.topicName, Errors.TOPIC_AUTHORIZATION_FAILED));
        try {
            this.fetcher.getAllTopicMetadata(10L);
            Assert.fail();
        } catch (TopicAuthorizationException authFailure) {
            Assert.assertEquals(Collections.singleton(this.topicName), authFailure.unauthorizedTopics());
        }
    }

    /**
     * Prepares a metadata response with {@code INVALID_TOPIC_EXCEPTION}; the
     * subsequent {@code getTopicMetadata} call is expected to throw
     * {@link InvalidTopicException}.
     */
    @Test(expected = InvalidTopicException.class)
    public void testGetTopicMetadataInvalidTopic() {
        this.client.prepareResponse(
                newMetadataResponse(this.topicName, Errors.INVALID_TOPIC_EXCEPTION));

        MetadataRequest.Builder request =
                new MetadataRequest.Builder(Collections.singletonList(this.topicName), true);
        this.fetcher.getTopicMetadata(request, 5000L);
    }

    /**
     * Prepares a metadata response with {@code UNKNOWN_TOPIC_OR_PARTITION} and
     * checks that the result of {@code getTopicMetadata} has no entry for the topic.
     */
    @Test
    public void testGetTopicMetadataUnknownTopic() {
        this.client.prepareResponse(
                newMetadataResponse(this.topicName, Errors.UNKNOWN_TOPIC_OR_PARTITION));

        MetadataRequest.Builder request =
                new MetadataRequest.Builder(Collections.singletonList(this.topicName), true);
        Assert.assertNull(this.fetcher.getTopicMetadata(request, 5000L).get(this.topicName));
    }

    /**
     * Queues a {@code LEADER_NOT_AVAILABLE} metadata response followed by a
     * successful one and checks that {@code getTopicMetadata} ends up returning
     * an entry for the topic.
     */
    @Test
    public void testGetTopicMetadataLeaderNotAvailable() {
        // First reply reports the error; the second one succeeds.
        this.client.prepareResponse(newMetadataResponse(this.topicName, Errors.LEADER_NOT_AVAILABLE));
        this.client.prepareResponse(newMetadataResponse(this.topicName, Errors.NONE));

        MetadataRequest.Builder request =
                new MetadataRequest.Builder(Collections.singletonList(this.topicName), true);
        Assert.assertTrue(this.fetcher.getTopicMetadata(request, 5000L).containsKey(this.topicName));
    }

    /**
     * Drives a real {@link NetworkClient} over a {@code MockSelector}, feeds it an
     * ApiVersions response built with 400 and three fetch responses built with
     * 100, 200 and 300 as their last argument, and checks that the fetch
     * throttle-time metrics report an average of 250 and a maximum of 400.
     *
     * @throws Exception if any of the client or serialization calls fail
     */
    @Test
    public void testQuotaMetrics() throws Exception {
        MockSelector selector = new MockSelector(this.time);
        Sensor throttleTimeSensor = Fetcher.throttleTimeSensor(this.metrics, this.metricsRegistry);
        Node node = (Node) TestUtils.singletonCluster("test", 1).nodes().get(0);
        NetworkClient networkClient = new NetworkClient(selector, this.metadata, "mock", Integer.MAX_VALUE, 1000L, 1000L, 65536, 65536, 1000, this.time, true, new ApiVersions(), throttleTimeSensor, new LogContext());
        try {
            // Deliver the ApiVersions response and poll until the node is ready.
            selector.delayedReceive(new DelayedReceive(node.idString(), new NetworkReceive(node.idString(), ApiVersionsResponse.createApiVersionsResponse(400, (byte) 2).serialize(ApiKeys.API_VERSIONS.latestVersion(), new ResponseHeader(0)))));
            while (!networkClient.ready(node, this.time.milliseconds())) {
                networkClient.poll(1L, this.time.milliseconds());
            }
            selector.clear();

            // Three request/response round trips; the last response argument is 100 * i.
            for (int i = 1; i <= 3; i++) {
                ClientRequest request = networkClient.newClientRequest(node.idString(), FetchRequest.Builder.forConsumer(100, 100, new LinkedHashMap()), this.time.milliseconds(), true, (RequestCompletionHandler) null);
                networkClient.send(request, this.time.milliseconds());
                networkClient.poll(1L, this.time.milliseconds());
                selector.completeReceive(new NetworkReceive(node.idString(), fullFetchResponse(this.tp0, this.nextRecords, Errors.NONE, i, 100 * i).serialize(ApiKeys.FETCH.latestVersion(), new ResponseHeader(request.correlationId()))));
                networkClient.poll(1L, this.time.milliseconds());
                selector.clear();
            }

            Map allMetrics = this.metrics.metrics();
            KafkaMetric avgMetric = (KafkaMetric) allMetrics.get(this.metrics.metricInstance(this.metricsRegistry.fetchThrottleTimeAvg, new String[0]));
            KafkaMetric maxMetric = (KafkaMetric) allMetrics.get(this.metrics.metricInstance(this.metricsRegistry.fetchThrottleTimeMax, new String[0]));
            Assert.assertEquals(250.0d, avgMetric.value(), EPSILON);
            Assert.assertEquals(400.0d, maxMetric.value(), EPSILON);
        } finally {
            // Close the client even when an assertion or exception aborts the test.
            networkClient.close();
        }
    }

    /**
     * Checks the {@code recordsLagMax} metric and the tagged per-partition
     * {@code records-lag} metric: negative infinity before any fetch, 100 after an
     * empty fetch, 197 after a three-record fetch, and removal of the
     * per-partition metric after {@code unsubscribe()}.
     */
    @Test
    public void testFetcherMetrics() {
        this.subscriptions.assignFromUser(Collections.singleton(this.tp0));
        this.subscriptions.seek(this.tp0, 0L);

        MetricName maxLagName = this.metrics.metricInstance(this.metricsRegistry.recordsLagMax, new String[0]);
        Map<String, String> tags = new HashMap<String, String>();
        tags.put("topic", this.tp0.topic());
        tags.put("partition", String.valueOf(this.tp0.partition()));
        MetricName partitionLagName = this.metrics.metricName("records-lag", this.metricGroup, tags);

        Map allMetrics = this.metrics.metrics();
        KafkaMetric maxLag = (KafkaMetric) allMetrics.get(maxLagName);

        // Before any fetch the max-lag metric reports negative infinity.
        Assert.assertEquals(Double.NEGATIVE_INFINITY, maxLag.value(), EPSILON);

        // Empty fetch with 100L passed to fetchRecords: both lag metrics read 100.
        fetchRecords(this.tp0, MemoryRecords.EMPTY, Errors.NONE, 100L, 0);
        Assert.assertEquals(100.0d, maxLag.value(), EPSILON);
        KafkaMetric partitionLag = (KafkaMetric) allMetrics.get(partitionLagName);
        Assert.assertEquals(100.0d, partitionLag.value(), EPSILON);

        // Three records at offsets 0..2; encode with an explicit charset so the
        // payload does not depend on the platform default.
        MemoryRecordsBuilder builder = MemoryRecords.builder(ByteBuffer.allocate(1024), CompressionType.NONE, TimestampType.CREATE_TIME, 0L);
        for (int i = 0; i < 3; i++) {
            builder.appendWithOffset(i, -1L, "key".getBytes(StandardCharsets.UTF_8), ("value-" + i).getBytes(StandardCharsets.UTF_8));
        }
        fetchRecords(this.tp0, builder.build(), Errors.NONE, 200L, 0);
        Assert.assertEquals(197.0d, maxLag.value(), EPSILON);
        Assert.assertEquals(197.0d, partitionLag.value(), EPSILON);

        // Unsubscribing must remove the per-partition metric.
        this.subscriptions.unsubscribe();
        Assert.assertFalse(allMetrics.containsKey(partitionLagName));
    }

    /**
     * Replaces the fixture fetcher with one created for
     * {@code IsolationLevel.READ_COMMITTED} and checks the lag metrics (max,
     * tagged per-partition, and the {@code <tp>.records-lag} name): 50 after an
     * empty fetch called with 100L/50L, 147 after a three-record fetch called with
     * 200L/150L, and removal of both per-partition metrics after
     * {@code unsubscribe()}.
     */
    @Test
    public void testReadCommittedLagMetric() {
        Metrics localMetrics = new Metrics();
        this.fetcher = createFetcher(this.subscriptions, localMetrics, new ByteArrayDeserializer(), new ByteArrayDeserializer(), Integer.MAX_VALUE, IsolationLevel.READ_COMMITTED);
        this.subscriptions.assignFromUser(Collections.singleton(this.tp0));
        this.subscriptions.seek(this.tp0, 0L);

        MetricName maxLagName = localMetrics.metricInstance(this.metricsRegistry.recordsLagMax, new String[0]);
        Map<String, String> tags = new HashMap<String, String>();
        tags.put("topic", this.tp0.topic());
        tags.put("partition", String.valueOf(this.tp0.partition()));
        MetricName partitionLagName = localMetrics.metricName("records-lag", this.metricGroup, tags);
        MetricName prefixedLagName = localMetrics.metricName(this.tp0 + ".records-lag", this.metricGroup);

        Map allMetrics = localMetrics.metrics();
        KafkaMetric maxLag = (KafkaMetric) allMetrics.get(maxLagName);

        // Before any fetch the max-lag metric reports negative infinity.
        Assert.assertEquals(Double.NEGATIVE_INFINITY, maxLag.value(), EPSILON);

        // Empty fetch: the asserted lag (50) tracks the 50L argument, not the 100L one.
        fetchRecords(this.tp0, MemoryRecords.EMPTY, Errors.NONE, 100L, 50L, 0);
        Assert.assertEquals(50.0d, maxLag.value(), EPSILON);
        KafkaMetric partitionLag = (KafkaMetric) allMetrics.get(partitionLagName);
        Assert.assertEquals(50.0d, partitionLag.value(), EPSILON);
        Assert.assertEquals(50.0d, ((KafkaMetric) allMetrics.get(prefixedLagName)).value(), EPSILON);

        // Three records at offsets 0..2; encode with an explicit charset so the
        // payload does not depend on the platform default.
        MemoryRecordsBuilder builder = MemoryRecords.builder(ByteBuffer.allocate(1024), CompressionType.NONE, TimestampType.CREATE_TIME, 0L);
        for (int i = 0; i < 3; i++) {
            builder.appendWithOffset(i, -1L, "key".getBytes(StandardCharsets.UTF_8), ("value-" + i).getBytes(StandardCharsets.UTF_8));
        }
        fetchRecords(this.tp0, builder.build(), Errors.NONE, 200L, 150L, 0);
        Assert.assertEquals(147.0d, maxLag.value(), EPSILON);
        Assert.assertEquals(147.0d, partitionLag.value(), EPSILON);

        // Unsubscribing must remove both per-partition metrics.
        this.subscriptions.unsubscribe();
        Assert.assertFalse(allMetrics.containsKey(partitionLagName));
        Assert.assertFalse(allMetrics.containsKey(prefixedLagName));
    }

    /**
     * Fetches three records from each of two single-partition topics
     * ({@code foo} and {@code bar}) in one response and checks that
     * {@code fetchSizeAvg} equals the summed record sizes and
     * {@code recordsPerRequestAvg} equals 6.
     */
    @Test
    public void testFetchResponseMetrics() {
        TopicPartition fooPartition = new TopicPartition("foo", 0);
        TopicPartition barPartition = new TopicPartition("bar", 0);

        Map<String, Integer> partitionCounts = new HashMap<String, Integer>();
        partitionCounts.put("foo", 1);
        partitionCounts.put("bar", 1);
        this.metadata.update(TestUtils.clusterWith(1, partitionCounts), Collections.emptySet(), this.time.milliseconds());
        this.subscriptions.assignFromUser(Utils.mkSet(new TopicPartition[]{fooPartition, barPartition}));

        int expectedBytes = 0;
        LinkedHashMap fetchPartitionData = new LinkedHashMap();
        for (TopicPartition partition : Utils.mkSet(new TopicPartition[]{fooPartition, barPartition})) {
            this.subscriptions.seek(partition, 0L);

            // Three records at offsets 0..2; encode with an explicit charset so the
            // payload does not depend on the platform default.
            MemoryRecordsBuilder builder = MemoryRecords.builder(ByteBuffer.allocate(1024), CompressionType.NONE, TimestampType.CREATE_TIME, 0L);
            for (int offset = 0; offset < 3; offset++) {
                builder.appendWithOffset(offset, -1L, "key".getBytes(StandardCharsets.UTF_8), ("value-" + offset).getBytes(StandardCharsets.UTF_8));
            }
            MemoryRecords partitionRecords = builder.build();
            for (Record record : partitionRecords.records()) {
                expectedBytes += record.sizeInBytes();
            }
            fetchPartitionData.put(partition, new FetchResponse.PartitionData(Errors.NONE, 15L, -1L, 0L, (List) null, partitionRecords));
        }

        Assert.assertEquals(1L, this.fetcher.sendFetches());
        this.client.prepareResponse(new FetchResponse(Errors.NONE, fetchPartitionData, 0, 0));
        this.consumerClient.poll(0L);

        Map fetched = this.fetcher.fetchedRecords();
        Assert.assertEquals(3L, ((List) fetched.get(fooPartition)).size());
        Assert.assertEquals(3L, ((List) fetched.get(barPartition)).size());

        Map allMetrics = this.metrics.metrics();
        KafkaMetric fetchSizeAverage = (KafkaMetric) allMetrics.get(this.metrics.metricInstance(this.metricsRegistry.fetchSizeAvg, new String[0]));
        KafkaMetric recordsCountAverage = (KafkaMetric) allMetrics.get(this.metrics.metricInstance(this.metricsRegistry.recordsPerRequestAvg, new String[0]));
        Assert.assertEquals(expectedBytes, fetchSizeAverage.value(), EPSILON);
        Assert.assertEquals(6.0d, recordsCountAverage.value(), EPSILON);
    }

    /**
     * Positions {@code tp0} at offset 1, fetches three records at offsets 0..2,
     * and checks that {@code fetchSizeAvg} counts only the records at offset 1 or
     * later and that {@code recordsPerRequestAvg} equals 2.
     */
    @Test
    public void testFetchResponseMetricsPartialResponse() {
        this.subscriptions.assignFromUser(Collections.singleton(this.tp0));
        this.subscriptions.seek(this.tp0, 1L);

        Map allMetrics = this.metrics.metrics();
        KafkaMetric fetchSizeAverage = (KafkaMetric) allMetrics.get(this.metrics.metricInstance(this.metricsRegistry.fetchSizeAvg, new String[0]));
        KafkaMetric recordsCountAverage = (KafkaMetric) allMetrics.get(this.metrics.metricInstance(this.metricsRegistry.recordsPerRequestAvg, new String[0]));

        // Three records at offsets 0..2; encode with an explicit charset so the
        // payload does not depend on the platform default.
        MemoryRecordsBuilder builder = MemoryRecords.builder(ByteBuffer.allocate(1024), CompressionType.NONE, TimestampType.CREATE_TIME, 0L);
        for (int offset = 0; offset < 3; offset++) {
            builder.appendWithOffset(offset, -1L, "key".getBytes(StandardCharsets.UTF_8), ("value-" + offset).getBytes(StandardCharsets.UTF_8));
        }
        MemoryRecords fetchedBatch = builder.build();

        // Only records at or beyond the seek position (offset 1) count toward the expectation.
        int expectedBytes = 0;
        for (Record record : fetchedBatch.records()) {
            if (record.offset() >= 1) {
                expectedBytes += record.sizeInBytes();
            }
        }

        fetchRecords(this.tp0, fetchedBatch, Errors.NONE, 100L, 0);
        Assert.assertEquals(expectedBytes, fetchSizeAverage.value(), EPSILON);
        Assert.assertEquals(2.0d, recordsCountAverage.value(), EPSILON);
    }

    /**
     * Returns three records for {@code tp0} and {@code OFFSET_OUT_OF_RANGE} for
     * {@code tp1} in one response, and checks that {@code fetchSizeAvg} equals the
     * size of the {@code tp0} records and {@code recordsPerRequestAvg} equals 3.
     */
    @Test
    public void testFetchResponseMetricsWithOnePartitionError() {
        this.subscriptions.assignFromUser(Utils.mkSet(new TopicPartition[]{this.tp0, this.tp1}));
        this.subscriptions.seek(this.tp0, 0L);
        this.subscriptions.seek(this.tp1, 0L);

        Map allMetrics = this.metrics.metrics();
        KafkaMetric fetchSizeAverage = (KafkaMetric) allMetrics.get(this.metrics.metricInstance(this.metricsRegistry.fetchSizeAvg, new String[0]));
        KafkaMetric recordsCountAverage = (KafkaMetric) allMetrics.get(this.metrics.metricInstance(this.metricsRegistry.recordsPerRequestAvg, new String[0]));

        // Three records at offsets 0..2; encode with an explicit charset so the
        // payload does not depend on the platform default.
        MemoryRecordsBuilder builder = MemoryRecords.builder(ByteBuffer.allocate(1024), CompressionType.NONE, TimestampType.CREATE_TIME, 0L);
        for (int offset = 0; offset < 3; offset++) {
            builder.appendWithOffset(offset, -1L, "key".getBytes(StandardCharsets.UTF_8), ("value-" + offset).getBytes(StandardCharsets.UTF_8));
        }
        MemoryRecords goodRecords = builder.build();

        // tp0 carries data; tp1 carries only an error.
        HashMap partitions = new HashMap();
        partitions.put(this.tp0, new FetchResponse.PartitionData(Errors.NONE, 100L, -1L, 0L, (List) null, goodRecords));
        partitions.put(this.tp1, new FetchResponse.PartitionData(Errors.OFFSET_OUT_OF_RANGE, 100L, -1L, 0L, (List) null, MemoryRecords.EMPTY));

        Assert.assertEquals(1L, this.fetcher.sendFetches());
        this.client.prepareResponse(new FetchResponse(Errors.NONE, new LinkedHashMap(partitions), 0, 0));
        this.consumerClient.poll(0L);
        this.fetcher.fetchedRecords();

        int expectedBytes = 0;
        for (Record record : goodRecords.records()) {
            expectedBytes += record.sizeInBytes();
        }
        Assert.assertEquals(expectedBytes, fetchSizeAverage.value(), EPSILON);
        Assert.assertEquals(3.0d, recordsCountAverage.value(), EPSILON);
    }

    /**
     * Checks the fetch metrics when tp1 is re-seeked (to offset 5) after the fetch was sent, while the
     * response still carries data for both partitions: the expected values cover the three tp0 records only.
     */
    @Test
    public void testFetchResponseMetricsWithOnePartitionAtTheWrongOffset() {
        this.subscriptions.assignFromUser(Utils.mkSet(new TopicPartition[]{this.tp0, this.tp1}));
        this.subscriptions.seek(this.tp0, 0L);
        this.subscriptions.seek(this.tp1, 0L);

        Map registeredMetrics = this.metrics.metrics();
        KafkaMetric fetchSizeAverage = (KafkaMetric) registeredMetrics.get(this.metrics.metricInstance(this.metricsRegistry.fetchSizeAvg, new String[0]));
        KafkaMetric recordsCountAverage = (KafkaMetric) registeredMetrics.get(this.metrics.metricInstance(this.metricsRegistry.recordsPerRequestAvg, new String[0]));

        // Send the fetch first, then move tp1 so that its in-flight data no longer matches the position.
        Assert.assertEquals(1L, this.fetcher.sendFetches());
        this.subscriptions.seek(this.tp1, 5L);

        MemoryRecordsBuilder recordsBuilder = MemoryRecords.builder(ByteBuffer.allocate(1024), CompressionType.NONE, TimestampType.CREATE_TIME, 0L);
        int nextOffset = 0;
        while (nextOffset < 3) {
            recordsBuilder.appendWithOffset(nextOffset, -1L, "key".getBytes(), ("value-" + nextOffset).getBytes());
            nextOffset++;
        }
        MemoryRecords tp0Records = recordsBuilder.build();

        HashMap partitionResponses = new HashMap();
        partitionResponses.put(this.tp0, new FetchResponse.PartitionData(Errors.NONE, 100L, -1L, 0L, (List) null, tp0Records));
        partitionResponses.put(this.tp1, new FetchResponse.PartitionData(Errors.NONE, 100L, -1L, 0L, (List) null, MemoryRecords.withRecords(CompressionType.NONE, new SimpleRecord[]{new SimpleRecord("val".getBytes())})));

        this.client.prepareResponse(new FetchResponse(Errors.NONE, new LinkedHashMap(partitionResponses), 0, 0));
        this.consumerClient.poll(0L);
        this.fetcher.fetchedRecords();

        int expectedBytes = 0;
        for (Record current : tp0Records.records()) {
            expectedBytes += current.sizeInBytes();
        }
        Assert.assertEquals(expectedBytes, fetchSizeAverage.value(), EPSILON);
        Assert.assertEquals(3.0d, recordsCountAverage.value(), EPSILON);
    }

    /**
     * Rebuilds the metrics and fetcher with a "client-id" tag, performs one fetch, and then verifies that
     * the templates derived from every registered metric (excluding the "kafka-metrics-count" group)
     * equal the templates reported by the metrics registry.
     */
    @Test
    public void testFetcherMetricsTemplates() throws Exception {
        this.metrics.close();
        Map<String, String> clientTags = Collections.singletonMap("client-id", "clientA");
        this.metrics = new Metrics(new MetricConfig().tags(clientTags));
        this.metricsRegistry = new FetcherMetricsRegistry(clientTags.keySet(), "consumer" + this.groupId);
        this.fetcher.close();
        this.fetcher = createFetcher(this.subscriptions, this.metrics);

        // Run a single successful fetch so that the per-fetch metrics get registered.
        this.subscriptions.assignFromUser(Collections.singleton(this.tp0));
        this.subscriptions.seek(this.tp0, 0L);
        Assert.assertEquals(1L, this.fetcher.sendFetches());
        this.client.prepareResponse(fullFetchResponse(this.tp0, this.records, Errors.NONE, 100L, 0));
        this.consumerClient.poll(0L);
        Assert.assertTrue(this.fetcher.hasCompletedFetches());
        Assert.assertTrue(this.fetcher.fetchedRecords().containsKey(this.tp0));

        Fetcher.throttleTimeSensor(this.metrics, this.metricsRegistry);

        // Turn each concrete metric name back into its template form.
        HashSet observedTemplates = new HashSet();
        for (MetricName registered : this.metrics.metrics().keySet()) {
            String templatedName = registered.name().replaceAll(this.tp0.toString(), "{topic}-{partition}");
            if (registered.group().equals("kafka-metrics-count")) {
                continue;
            }
            observedTemplates.add(new MetricNameTemplate(templatedName, registered.group(), "", registered.tags().keySet()));
        }

        HashSet declaredTemplates = new HashSet(this.metricsRegistry.getAllTemplates());
        TestUtils.checkEquals(observedTemplates, declaredTemplates, "metrics", "templates");
    }

    /**
     * Convenience overload: delegates to the six-argument {@code fetchRecords}, passing {@code -1L}
     * for the extra {@code long} argument.
     */
    private Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> fetchRecords(TopicPartition topicPartition, MemoryRecords memoryRecords, Errors errors, long j, int i) {
        final long defaultSecondLong = -1L;
        Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> fetched =
                fetchRecords(topicPartition, memoryRecords, errors, j, defaultSecondLong, i);
        return fetched;
    }

    /**
     * Sends one fetch (asserting exactly one request went out), stages a full fetch response built from
     * the arguments, polls the consumer client once, and returns whatever the fetcher then hands back.
     */
    private Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> fetchRecords(TopicPartition topicPartition, MemoryRecords memoryRecords, Errors errors, long j, long j2, int i) {
        // Exactly one fetch request is expected to be sent.
        Assert.assertEquals(1L, this.fetcher.sendFetches());

        // Stage the response for that request and let the client process it.
        this.client.prepareResponse(fullFetchResponse(topicPartition, memoryRecords, errors, j, j2, i));
        this.consumerClient.poll(0L);

        Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> fetched = this.fetcher.fetchedRecords();
        return fetched;
    }

    /**
     * Expects {@code offsetsByTimes} to throw a {@link TimeoutException} for partition 2 of the test
     * topic when called with a timeout of 100.
     */
    @Test
    public void testGetOffsetsForTimesTimeout() {
        Map<TopicPartition, Long> timestampsToSearch =
                Collections.singletonMap(new TopicPartition(this.topicName, 2), 1000L);
        try {
            this.fetcher.offsetsByTimes(timestampsToSearch, 100L);
            Assert.fail("Should throw timeout exception.");
        } catch (TimeoutException expected) {
            // This is the outcome the test is looking for.
        }
    }

    /**
     * Exercises {@code offsetsByTimes}: an empty request yields an empty result, then the unknown-offset
     * scenario and a series of first-attempt error combinations are run through the shared helpers.
     */
    @Test
    public void testGetOffsetsForTimes() {
        // An empty lookup must come back empty.
        Map emptyResult = this.fetcher.offsetsByTimes(new HashMap(), 100L);
        Assert.assertTrue(emptyResult.isEmpty());

        testGetOffsetsForTimesWithUnknownOffset();

        // No errors; the first case expects no entry (null) for the first partition.
        testGetOffsetsForTimesWithError(Errors.NONE, Errors.NONE, -1L, 100L, null, 100L);
        testGetOffsetsForTimesWithError(Errors.NONE, Errors.NONE, 10L, 100L, 10L, 100L);

        // Errors on the first attempt for one or both partitions.
        testGetOffsetsForTimesWithError(Errors.NOT_LEADER_FOR_PARTITION, Errors.INVALID_REQUEST, 10L, 100L, 10L, 100L);
        testGetOffsetsForTimesWithError(Errors.NONE, Errors.NOT_LEADER_FOR_PARTITION, 10L, 100L, 10L, 100L);
        testGetOffsetsForTimesWithError(Errors.NOT_LEADER_FOR_PARTITION, Errors.NONE, 10L, 100L, 10L, 100L);
        testGetOffsetsForTimesWithError(Errors.UNKNOWN_TOPIC_OR_PARTITION, Errors.NONE, 10L, 100L, 10L, 100L);

        // The unsupported-message-format case expects no entry (null) for the first partition.
        testGetOffsetsForTimesWithError(Errors.UNSUPPORTED_FOR_MESSAGE_FORMAT, Errors.NONE, 10L, 100L, null, 100L);
        testGetOffsetsForTimesWithError(Errors.BROKER_NOT_AVAILABLE, Errors.NONE, 10L, 100L, 10L, 100L);
    }

    /**
     * Stages a list-offset response in which both partitions report errors
     * (NOT_LEADER_FOR_PARTITION and UNKNOWN_TOPIC_OR_PARTITION) and expects
     * {@code offsetsByTimes} with a zero timeout to end in a {@link TimeoutException}.
     */
    @Test(expected = TimeoutException.class)
    public void testBatchedListOffsetsMetadataErrors() {
        HashMap erroredPartitions = new HashMap();
        erroredPartitions.put(this.tp0, new ListOffsetResponse.PartitionData(Errors.NOT_LEADER_FOR_PARTITION, -1L, -1L));
        erroredPartitions.put(this.tp1, new ListOffsetResponse.PartitionData(Errors.UNKNOWN_TOPIC_OR_PARTITION, -1L, -1L));
        this.client.prepareResponse(new ListOffsetResponse(0, erroredPartitions));

        HashMap timestampsToSearch = new HashMap();
        timestampsToSearch.put(this.tp0, -2L);
        timestampsToSearch.put(this.tp1, -2L);

        this.fetcher.offsetsByTimes(timestampsToSearch, 0L);
    }

    /**
     * With a READ_COMMITTED fetcher, a batch of two transactional records followed by an abort marker
     * (and listed as an aborted transaction in the response) must not be returned for tp0.
     */
    @Test
    public void testSkippingAbortedTransactions() {
        Fetcher readCommittedFetcher = createFetcher(this.subscriptions, new Metrics(), new ByteArrayDeserializer(), new ByteArrayDeserializer(), Integer.MAX_VALUE, IsolationLevel.READ_COMMITTED);
        ByteBuffer buffer = ByteBuffer.allocate(1024);

        // Two records from producer 1 starting at offset 0, then the abort marker right after them.
        int nextOffset = 0;
        nextOffset += appendTransactionalRecords(buffer, 1L, nextOffset,
                new SimpleRecord(this.time.milliseconds(), "key".getBytes(), "value".getBytes()),
                new SimpleRecord(this.time.milliseconds(), "key".getBytes(), "value".getBytes()));
        abortTransaction(buffer, 1L, nextOffset);
        buffer.flip();

        ArrayList abortedTransactions = new ArrayList();
        abortedTransactions.add(new FetchResponse.AbortedTransaction(1L, 0L));
        MemoryRecords fetchedData = MemoryRecords.readableRecords(buffer);

        this.subscriptions.assignFromUser(Collections.singleton(this.tp0));
        this.subscriptions.seek(this.tp0, 0L);
        Assert.assertEquals(1L, readCommittedFetcher.sendFetches());
        Assert.assertFalse(readCommittedFetcher.hasCompletedFetches());

        this.client.prepareResponse(fullFetchResponseWithAbortedTransactions(fetchedData, abortedTransactions, Errors.NONE, 100L, 100L, 0));
        this.consumerClient.poll(0L);
        Assert.assertTrue(readCommittedFetcher.hasCompletedFetches());

        Assert.assertFalse(readCommittedFetcher.fetchedRecords().containsKey(this.tp0));
    }

    /**
     * With a READ_COMMITTED fetcher, two transactional records followed by a commit marker must both be
     * returned for tp0. The request matcher additionally verifies that the outgoing fetch request
     * carries the READ_COMMITTED isolation level.
     */
    @Test
    public void testReturnCommittedTransactions() {
        Fetcher readCommittedFetcher = createFetcher(this.subscriptions, new Metrics(), new ByteArrayDeserializer(), new ByteArrayDeserializer(), Integer.MAX_VALUE, IsolationLevel.READ_COMMITTED);
        ByteBuffer buffer = ByteBuffer.allocate(1024);

        // Two records from producer 1 starting at offset 0, then the commit marker right after them.
        int nextOffset = 0;
        nextOffset += appendTransactionalRecords(buffer, 1L, nextOffset,
                new SimpleRecord(this.time.milliseconds(), "key".getBytes(), "value".getBytes()),
                new SimpleRecord(this.time.milliseconds(), "key".getBytes(), "value".getBytes()));
        commitTransaction(buffer, 1L, nextOffset);
        buffer.flip();

        // Nothing was aborted, so the aborted-transaction list stays empty.
        ArrayList abortedTransactions = new ArrayList();
        MemoryRecords fetchedData = MemoryRecords.readableRecords(buffer);

        this.subscriptions.assignFromUser(Collections.singleton(this.tp0));
        this.subscriptions.seek(this.tp0, 0L);
        Assert.assertEquals(1L, readCommittedFetcher.sendFetches());
        Assert.assertFalse(readCommittedFetcher.hasCompletedFetches());

        this.client.prepareResponse(new MockClient.RequestMatcher() {
            @Override
            public boolean matches(AbstractRequest abstractRequest) {
                Assert.assertEquals(IsolationLevel.READ_COMMITTED, ((FetchRequest) abstractRequest).isolationLevel());
                return true;
            }
        }, (AbstractResponse) fullFetchResponseWithAbortedTransactions(fetchedData, abortedTransactions, Errors.NONE, 100L, 100L, 0));
        this.consumerClient.poll(0L);
        Assert.assertTrue(readCommittedFetcher.hasCompletedFetches());

        // Drain the fetcher once and keep the result, so both assertions inspect the same map.
        Map fetchedRecords = readCommittedFetcher.fetchedRecords();
        Assert.assertTrue(fetchedRecords.containsKey(this.tp0));
        Assert.assertEquals(2L, ((List) fetchedRecords.get(this.tp0)).size());
    }

    /**
     * Interleaves committed and aborted transactions from two producers (ids 1 and 2) in one buffer and
     * verifies that a READ_COMMITTED fetcher returns exactly the records keyed
     * "commit1-1", "commit1-2" and "commit2-1".
     */
    @Test
    public void testReadCommittedWithCommittedAndAbortedTransactions() {
        Fetcher readCommittedFetcher = createFetcher(this.subscriptions, new Metrics(), new ByteArrayDeserializer(), new ByteArrayDeserializer(), Integer.MAX_VALUE, IsolationLevel.READ_COMMITTED);
        ByteBuffer buffer = ByteBuffer.allocate(1024);
        ArrayList abortedTransactions = new ArrayList();

        // Offsets 0-3: producer 1 writes two records and commits; producer 2 starts a transaction at offset 2.
        appendTransactionalRecords(buffer, 1L, 0L, new SimpleRecord("commit1-1".getBytes(), "value".getBytes()), new SimpleRecord("commit1-2".getBytes(), "value".getBytes()));
        appendTransactionalRecords(buffer, 2L, 2L, new SimpleRecord("abort2-1".getBytes(), "value".getBytes()));
        commitTransaction(buffer, 1L, 3L);

        // Offsets 4-5: producer 2 writes once more and aborts; its transaction began at offset 2.
        appendTransactionalRecords(buffer, 2L, 4L, new SimpleRecord("abort2-2".getBytes(), "value".getBytes()));
        abortTransaction(buffer, 2L, 5L);
        abortedTransactions.add(new FetchResponse.AbortedTransaction(2L, 2L));

        // Offsets 6-10: producer 1 aborts a transaction begun at offset 6, producer 2 commits one record.
        appendTransactionalRecords(buffer, 1L, 6L, new SimpleRecord("abort1-1".getBytes(), "value".getBytes()));
        appendTransactionalRecords(buffer, 2L, 7L, new SimpleRecord("commit2-1".getBytes(), "value".getBytes()));
        appendTransactionalRecords(buffer, 1L, 8L, new SimpleRecord("abort1-2".getBytes(), "value".getBytes()));
        abortTransaction(buffer, 1L, 9L);
        abortedTransactions.add(new FetchResponse.AbortedTransaction(1L, 6L));
        commitTransaction(buffer, 2L, 10L);

        buffer.flip();
        MemoryRecords fetchedData = MemoryRecords.readableRecords(buffer);

        this.subscriptions.assignFromUser(Collections.singleton(this.tp0));
        this.subscriptions.seek(this.tp0, 0L);
        Assert.assertEquals(1L, readCommittedFetcher.sendFetches());
        Assert.assertFalse(readCommittedFetcher.hasCompletedFetches());

        this.client.prepareResponse(fullFetchResponseWithAbortedTransactions(fetchedData, abortedTransactions, Errors.NONE, 100L, 100L, 0));
        this.consumerClient.poll(0L);
        Assert.assertTrue(readCommittedFetcher.hasCompletedFetches());

        Map recordsByPartition = readCommittedFetcher.fetchedRecords();
        Assert.assertTrue(recordsByPartition.containsKey(this.tp0));

        // Collect the keys of everything that was returned for tp0.
        List returned = (List) recordsByPartition.get(this.tp0);
        HashSet returnedKeys = new HashSet();
        for (Object element : returned) {
            byte[] rawKey = (byte[]) ((ConsumerRecord) element).key();
            returnedKeys.add(new String(rawKey, StandardCharsets.UTF_8));
        }
        Assert.assertEquals(Utils.mkSet(new String[]{"commit1-1", "commit1-2", "commit2-1"}), returnedKeys);
    }

    /**
     * Writes two aborted records followed by two consecutive abort markers, then two records and a commit
     * marker, all from producer 1. A READ_COMMITTED fetcher must return only "commit1-1" and "commit1-2".
     */
    @Test
    public void testMultipleAbortMarkers() {
        Fetcher readCommittedFetcher = createFetcher(this.subscriptions, new Metrics(), new ByteArrayDeserializer(), new ByteArrayDeserializer(), Integer.MAX_VALUE, IsolationLevel.READ_COMMITTED);
        ByteBuffer buffer = ByteBuffer.allocate(1024);

        // Each helper returns how many offsets it consumed, so the running offset is advanced by that amount.
        int nextOffset = 0;
        nextOffset += appendTransactionalRecords(buffer, 1L, nextOffset,
                new SimpleRecord(this.time.milliseconds(), "abort1-1".getBytes(), "value".getBytes()),
                new SimpleRecord(this.time.milliseconds(), "abort1-2".getBytes(), "value".getBytes()));
        nextOffset += abortTransaction(buffer, 1L, nextOffset);
        // A second abort marker directly after the first one.
        nextOffset += abortTransaction(buffer, 1L, nextOffset);
        nextOffset += appendTransactionalRecords(buffer, 1L, nextOffset,
                new SimpleRecord(this.time.milliseconds(), "commit1-1".getBytes(), "value".getBytes()),
                new SimpleRecord(this.time.milliseconds(), "commit1-2".getBytes(), "value".getBytes()));
        commitTransaction(buffer, 1L, nextOffset);
        buffer.flip();

        ArrayList abortedTransactions = new ArrayList();
        abortedTransactions.add(new FetchResponse.AbortedTransaction(1L, 0L));
        MemoryRecords fetchedData = MemoryRecords.readableRecords(buffer);

        this.subscriptions.assignFromUser(Collections.singleton(this.tp0));
        this.subscriptions.seek(this.tp0, 0L);
        Assert.assertEquals(1L, readCommittedFetcher.sendFetches());
        Assert.assertFalse(readCommittedFetcher.hasCompletedFetches());

        this.client.prepareResponse(fullFetchResponseWithAbortedTransactions(fetchedData, abortedTransactions, Errors.NONE, 100L, 100L, 0));
        this.consumerClient.poll(0L);
        Assert.assertTrue(readCommittedFetcher.hasCompletedFetches());

        Map recordsByPartition = readCommittedFetcher.fetchedRecords();
        Assert.assertTrue(recordsByPartition.containsKey(this.tp0));
        Assert.assertEquals(((List) recordsByPartition.get(this.tp0)).size(), 2L);

        List returned = (List) recordsByPartition.get(this.tp0);
        HashSet expectedKeys = new HashSet(Arrays.asList("commit1-1", "commit1-2"));
        HashSet returnedKeys = new HashSet();
        for (Object element : returned) {
            byte[] rawKey = (byte[]) ((ConsumerRecord) element).key();
            returnedKeys.add(new String(rawKey, StandardCharsets.UTF_8));
        }
        Assert.assertTrue(returnedKeys.equals(expectedKeys));
    }

    /**
     * The buffer begins with an abort marker (offset 5) that has no preceding data, followed by three
     * records at offsets 6-8 and a commit marker. A READ_COMMITTED fetcher must return offsets 6, 7 and 8.
     */
    @Test
    public void testReadCommittedAbortMarkerWithNoData() {
        Fetcher readCommittedFetcher = createFetcher(this.subscriptions, new Metrics(), new StringDeserializer(), new StringDeserializer(), Integer.MAX_VALUE, IsolationLevel.READ_COMMITTED);
        ByteBuffer buffer = ByteBuffer.allocate(1024);

        abortTransaction(buffer, 1L, 5L);
        appendTransactionalRecords(buffer, 1L, 6L,
                new SimpleRecord("6".getBytes(), (byte[]) null),
                new SimpleRecord("7".getBytes(), (byte[]) null),
                new SimpleRecord("8".getBytes(), (byte[]) null));
        commitTransaction(buffer, 1L, 9L);
        buffer.flip();

        this.subscriptions.assignFromUser(Collections.singleton(this.tp0));
        this.subscriptions.seek(this.tp0, 0L);
        Assert.assertEquals(1L, readCommittedFetcher.sendFetches());

        ArrayList abortedTransactions = new ArrayList();
        abortedTransactions.add(new FetchResponse.AbortedTransaction(1L, 0L));
        MemoryRecords fetchedData = MemoryRecords.readableRecords(buffer);
        this.client.prepareResponse(fullFetchResponseWithAbortedTransactions(fetchedData, abortedTransactions, Errors.NONE, 100L, 100L, 0));
        this.consumerClient.poll(0L);
        Assert.assertTrue(readCommittedFetcher.hasCompletedFetches());

        Map recordsByPartition = readCommittedFetcher.fetchedRecords();
        Assert.assertTrue(recordsByPartition.containsKey(this.tp0));

        List returned = (List) recordsByPartition.get(this.tp0);
        Assert.assertEquals(3L, returned.size());
        Assert.assertEquals(Arrays.asList(6L, 7L, 8L), collectRecordOffsets(returned));
    }

    /**
     * Filters the last record (the one with a null key) out of a four-record batch, fetches the filtered
     * output, and verifies that the three remaining records are returned while the consumer position
     * still ends up at 4.
     */
    @Test
    public void testUpdatePositionWithLastRecordMissingFromBatch() {
        MemoryRecords unfiltered = MemoryRecords.withRecords(CompressionType.NONE, new SimpleRecord[]{
            new SimpleRecord("0".getBytes(), "v".getBytes()),
            new SimpleRecord("1".getBytes(), "v".getBytes()),
            new SimpleRecord("2".getBytes(), "v".getBytes()),
            new SimpleRecord((byte[]) null, "value".getBytes())});

        // Retain only the records that carry a key; the filter asks for DELETE_EMPTY batch retention.
        MemoryRecords.FilterResult filtered = unfiltered.filterTo(this.tp0, new MemoryRecords.RecordFilter() {
            @Override
            protected MemoryRecords.RecordFilter.BatchRetention checkBatchRetention(RecordBatch recordBatch) {
                return MemoryRecords.RecordFilter.BatchRetention.DELETE_EMPTY;
            }

            @Override
            protected boolean shouldRetainRecord(RecordBatch recordBatch, Record record) {
                return record.key() != null;
            }
        }, ByteBuffer.allocate(1024), Integer.MAX_VALUE, BufferSupplier.NO_CACHING);
        filtered.output.flip();
        MemoryRecords compacted = MemoryRecords.readableRecords(filtered.output);

        this.subscriptions.assignFromUser(Collections.singleton(this.tp0));
        this.subscriptions.seek(this.tp0, 0L);
        Assert.assertEquals(1L, this.fetcher.sendFetches());
        this.client.prepareResponse(fullFetchResponse(this.tp0, compacted, Errors.NONE, 100L, 0));
        this.consumerClient.poll(0L);
        Assert.assertTrue(this.fetcher.hasCompletedFetches());

        Map recordsByPartition = this.fetcher.fetchedRecords();
        Assert.assertTrue(recordsByPartition.containsKey(this.tp0));

        List returned = (List) recordsByPartition.get(this.tp0);
        Assert.assertEquals(3L, returned.size());
        for (int index = 0; index < 3; index++) {
            ConsumerRecord consumed = (ConsumerRecord) returned.get(index);
            String actualKey = new String((byte[]) consumed.key());
            Assert.assertEquals(Integer.toString(index), actualKey);
        }

        // The position moves past the filtered-out record as well.
        Assert.assertEquals(4L, this.subscriptions.position(this.tp0).longValue());
    }

    /**
     * Fetches a buffer that contains only an empty batch header and verifies that no records are
     * returned while the consumer position advances to 55 (one past the 54 written into the header).
     */
    @Test
    public void testUpdatePositionOnEmptyBatch() {
        // The expected position is one past this value, which is written into the header below.
        final long headerOffset = 54L;

        ByteBuffer buffer = ByteBuffer.allocate(61);
        DefaultRecordBatch.writeEmptyHeader(buffer, (byte) 2, 1L, (short) 0, 1, 37L, headerOffset, 7, TimestampType.CREATE_TIME, System.currentTimeMillis(), false, false);
        buffer.flip();
        MemoryRecords emptyBatch = MemoryRecords.readableRecords(buffer);

        this.subscriptions.assignFromUser(Collections.singleton(this.tp0));
        this.subscriptions.seek(this.tp0, 0L);
        Assert.assertEquals(1L, this.fetcher.sendFetches());
        this.client.prepareResponse(fullFetchResponse(this.tp0, emptyBatch, Errors.NONE, 100L, 0));
        this.consumerClient.poll(0L);
        Assert.assertTrue(this.fetcher.hasCompletedFetches());

        Assert.assertTrue(this.fetcher.fetchedRecords().isEmpty());
        Assert.assertEquals(headerOffset + 1, this.subscriptions.position(this.tp0).longValue());
    }

    /**
     * Builds a buffer with gaps between offsets, holding transactions from three producers, and
     * verifies that a READ_COMMITTED fetcher returns only offsets 3, 4, 30, 31 and 32, i.e. the
     * records written by producer 3.
     */
    @Test
    public void testReadCommittedWithCompactedTopic() {
        Fetcher readCommittedFetcher = createFetcher(this.subscriptions, new Metrics(), new StringDeserializer(), new StringDeserializer(), Integer.MAX_VALUE, IsolationLevel.READ_COMMITTED);
        ByteBuffer buffer = ByteBuffer.allocate(1024);

        // Producer 3: offsets 3-4.
        appendTransactionalRecords(buffer, 3L, 3L,
                new SimpleRecord("3".getBytes(), "value".getBytes()),
                new SimpleRecord("4".getBytes(), "value".getBytes()));
        // Producer 2: offsets 15-17.
        appendTransactionalRecords(buffer, 2L, 15L,
                new SimpleRecord("15".getBytes(), "value".getBytes()),
                new SimpleRecord("16".getBytes(), "value".getBytes()),
                new SimpleRecord("17".getBytes(), "value".getBytes()));
        // Producer 1: offsets 22-23.
        appendTransactionalRecords(buffer, 1L, 22L,
                new SimpleRecord("22".getBytes(), "value".getBytes()),
                new SimpleRecord("23".getBytes(), "value".getBytes()));
        // Producer 2 aborts at offset 28.
        abortTransaction(buffer, 2L, 28L);
        // Producer 3: offsets 30-32, committed at offset 35.
        appendTransactionalRecords(buffer, 3L, 30L,
                new SimpleRecord("30".getBytes(), "value".getBytes()),
                new SimpleRecord("31".getBytes(), "value".getBytes()),
                new SimpleRecord("32".getBytes(), "value".getBytes()));
        commitTransaction(buffer, 3L, 35L);
        // Producer 1: offsets 39-40, with no end marker in this buffer.
        appendTransactionalRecords(buffer, 1L, 39L,
                new SimpleRecord("39".getBytes(), "value".getBytes()),
                new SimpleRecord("40".getBytes(), "value".getBytes()));
        buffer.flip();

        this.subscriptions.assignFromUser(Collections.singleton(this.tp0));
        this.subscriptions.seek(this.tp0, 0L);
        Assert.assertEquals(1L, readCommittedFetcher.sendFetches());

        // The response lists producers 2 and 1 as having aborted transactions.
        ArrayList abortedTransactions = new ArrayList();
        abortedTransactions.add(new FetchResponse.AbortedTransaction(2L, 6L));
        abortedTransactions.add(new FetchResponse.AbortedTransaction(1L, 0L));
        MemoryRecords fetchedData = MemoryRecords.readableRecords(buffer);
        this.client.prepareResponse(fullFetchResponseWithAbortedTransactions(fetchedData, abortedTransactions, Errors.NONE, 100L, 100L, 0));
        this.consumerClient.poll(0L);
        Assert.assertTrue(readCommittedFetcher.hasCompletedFetches());

        Map recordsByPartition = readCommittedFetcher.fetchedRecords();
        Assert.assertTrue(recordsByPartition.containsKey(this.tp0));

        List returned = (List) recordsByPartition.get(this.tp0);
        Assert.assertEquals(5L, returned.size());
        Assert.assertEquals(Arrays.asList(3L, 4L, 30L, 31L, 32L), collectRecordOffsets(returned));
    }

    /**
     * With a READ_UNCOMMITTED fetcher, records belonging to an aborted transaction are still returned
     * for tp0 even though the response lists that transaction as aborted.
     */
    @Test
    public void testReturnAbortedTransactionsinUncommittedMode() {
        Fetcher readUncommittedFetcher = createFetcher(this.subscriptions, new Metrics(), new ByteArrayDeserializer(), new ByteArrayDeserializer(), Integer.MAX_VALUE, IsolationLevel.READ_UNCOMMITTED);
        ByteBuffer buffer = ByteBuffer.allocate(1024);

        // Two records from producer 1 starting at offset 0, then the abort marker right after them.
        int nextOffset = 0;
        nextOffset += appendTransactionalRecords(buffer, 1L, nextOffset,
                new SimpleRecord(this.time.milliseconds(), "key".getBytes(), "value".getBytes()),
                new SimpleRecord(this.time.milliseconds(), "key".getBytes(), "value".getBytes()));
        abortTransaction(buffer, 1L, nextOffset);
        buffer.flip();

        ArrayList abortedTransactions = new ArrayList();
        abortedTransactions.add(new FetchResponse.AbortedTransaction(1L, 0L));
        MemoryRecords fetchedData = MemoryRecords.readableRecords(buffer);

        this.subscriptions.assignFromUser(Collections.singleton(this.tp0));
        this.subscriptions.seek(this.tp0, 0L);
        Assert.assertEquals(1L, readUncommittedFetcher.sendFetches());
        Assert.assertFalse(readUncommittedFetcher.hasCompletedFetches());

        this.client.prepareResponse(fullFetchResponseWithAbortedTransactions(fetchedData, abortedTransactions, Errors.NONE, 100L, 100L, 0));
        this.consumerClient.poll(0L);
        Assert.assertTrue(readUncommittedFetcher.hasCompletedFetches());

        Assert.assertTrue(readUncommittedFetcher.fetchedRecords().containsKey(this.tp0));
    }

    /**
     * With a READ_COMMITTED fetcher, an aborted transaction's records must not be returned, yet the
     * consumer position must still advance past the aborted records and their abort marker.
     */
    @Test
    public void testConsumerPositionUpdatedWhenSkippingAbortedTransactions() {
        Fetcher readCommittedFetcher = createFetcher(this.subscriptions, new Metrics(), new ByteArrayDeserializer(), new ByteArrayDeserializer(), Integer.MAX_VALUE, IsolationLevel.READ_COMMITTED);
        ByteBuffer buffer = ByteBuffer.allocate(1024);

        // Track the running offset explicitly: each helper returns the number of offsets it consumed,
        // and the abort marker has to be written at the offset right after the appended records.
        long currentOffset = 0L;
        currentOffset += appendTransactionalRecords(buffer, 1L, currentOffset,
                new SimpleRecord(this.time.milliseconds(), "abort1-1".getBytes(), "value".getBytes()),
                new SimpleRecord(this.time.milliseconds(), "abort1-2".getBytes(), "value".getBytes()));
        currentOffset += abortTransaction(buffer, 1L, currentOffset);
        buffer.flip();

        ArrayList abortedTransactions = new ArrayList();
        abortedTransactions.add(new FetchResponse.AbortedTransaction(1L, 0L));
        MemoryRecords fetchedData = MemoryRecords.readableRecords(buffer);

        this.subscriptions.assignFromUser(Collections.singleton(this.tp0));
        this.subscriptions.seek(this.tp0, 0L);
        Assert.assertEquals(1L, readCommittedFetcher.sendFetches());
        Assert.assertFalse(readCommittedFetcher.hasCompletedFetches());

        this.client.prepareResponse(fullFetchResponseWithAbortedTransactions(fetchedData, abortedTransactions, Errors.NONE, 100L, 100L, 0));
        this.consumerClient.poll(0L);
        Assert.assertTrue(readCommittedFetcher.hasCompletedFetches());

        // None of the aborted records come back, but the position still moves to the end of the data.
        Assert.assertFalse(readCommittedFetcher.fetchedRecords().containsKey(this.tp0));
        Assert.assertEquals(currentOffset, this.subscriptions.position(this.tp0).longValue());
    }

    /**
     * Walks a fetcher through several consecutive fetch responses for tp0 and tp1 and checks, after each
     * step, which records are returned and where the consumer positions stand. tp1 never receives data
     * in this test, so its position is expected to stay at 1 throughout.
     */
    @Test
    public void testConsumingViaIncrementalFetchRequests() {
        // NOTE(review): the trailing 2 presumably caps the records returned per fetchedRecords() call - confirm against createFetcher.
        Fetcher<byte[], byte[]> createFetcher = createFetcher(this.subscriptions, new Metrics(this.time), 2);
        this.subscriptions.assignFromUser(new HashSet(Arrays.asList(this.tp0, this.tp1)));
        this.subscriptions.seek(this.tp0, 0L);
        this.subscriptions.seek(this.tp1, 1L);
        // First response: data for tp0 (this.records) and an empty record set for tp1.
        LinkedHashMap linkedHashMap = new LinkedHashMap();
        linkedHashMap.put(this.tp0, new FetchResponse.PartitionData(Errors.NONE, 2L, 2L, 0L, (List) null, this.records));
        linkedHashMap.put(this.tp1, new FetchResponse.PartitionData(Errors.NONE, 100L, -1L, 0L, (List) null, this.emptyRecords));
        // NOTE(review): the last constructor argument (123) looks like a fetch session id - confirm against FetchResponse.
        this.client.prepareResponse(new FetchResponse(Errors.NONE, linkedHashMap, 0, 123));
        Assert.assertEquals(1L, createFetcher.sendFetches());
        Assert.assertFalse(createFetcher.hasCompletedFetches());
        this.consumerClient.poll(0L);
        Assert.assertTrue(createFetcher.hasCompletedFetches());
        // First drain: two tp0 records (offsets 1 and 2); tp0 position moves to 3.
        Map fetchedRecords = createFetcher.fetchedRecords();
        Assert.assertFalse(fetchedRecords.containsKey(this.tp1));
        List list = (List) fetchedRecords.get(this.tp0);
        Assert.assertEquals(2L, list.size());
        Assert.assertEquals(3L, this.subscriptions.position(this.tp0).longValue());
        Assert.assertEquals(1L, this.subscriptions.position(this.tp1).longValue());
        Assert.assertEquals(1L, ((ConsumerRecord) list.get(0)).offset());
        Assert.assertEquals(2L, ((ConsumerRecord) list.get(1)).offset());
        // No new fetch is sent here; the second drain yields the one remaining tp0 record (offset 3).
        Assert.assertEquals(0L, createFetcher.sendFetches());
        Map fetchedRecords2 = createFetcher.fetchedRecords();
        Assert.assertFalse(fetchedRecords2.containsKey(this.tp1));
        List list2 = (List) fetchedRecords2.get(this.tp0);
        Assert.assertEquals(1L, list2.size());
        Assert.assertEquals(3L, ((ConsumerRecord) list2.get(0)).offset());
        Assert.assertEquals(4L, this.subscriptions.position(this.tp0).longValue());
        // Second response: no partitions at all; nothing is returned and positions are unchanged.
        this.client.prepareResponse(new FetchResponse(Errors.NONE, new LinkedHashMap(), 0, 123));
        Assert.assertEquals(1L, createFetcher.sendFetches());
        this.consumerClient.poll(0L);
        Assert.assertTrue(createFetcher.fetchedRecords().isEmpty());
        Assert.assertEquals(4L, this.subscriptions.position(this.tp0).longValue());
        Assert.assertEquals(1L, this.subscriptions.position(this.tp1).longValue());
        // Third response: only tp0, carrying this.nextRecords.
        LinkedHashMap linkedHashMap2 = new LinkedHashMap();
        linkedHashMap2.put(this.tp0, new FetchResponse.PartitionData(Errors.NONE, 100L, 4L, 0L, (List) null, this.nextRecords));
        // NOTE(review): the response built on the next line is discarded immediately; it looks like dead code - confirm and remove.
        new FetchResponse(Errors.NONE, new LinkedHashMap(linkedHashMap), 0, 0);
        this.client.prepareResponse(new FetchResponse(Errors.NONE, linkedHashMap2, 0, 123));
        Assert.assertEquals(1L, createFetcher.sendFetches());
        this.consumerClient.poll(0L);
        // Final drain: two tp0 records (offsets 4 and 5); tp0 position moves to 6.
        Map fetchedRecords3 = createFetcher.fetchedRecords();
        Assert.assertFalse(fetchedRecords3.containsKey(this.tp1));
        List list3 = (List) fetchedRecords3.get(this.tp0);
        Assert.assertEquals(2L, list3.size());
        Assert.assertEquals(6L, this.subscriptions.position(this.tp0).longValue());
        Assert.assertEquals(1L, this.subscriptions.position(this.tp1).longValue());
        Assert.assertEquals(4L, ((ConsumerRecord) list3.get(0)).offset());
        Assert.assertEquals(5L, ((ConsumerRecord) list3.get(1)).offset());
    }

    /**
     * Appends the given records to {@code byteBuffer} as one batch created through
     * {@code MemoryRecords.builder}, using {@code j2}, {@code j} and {@code i} as builder arguments
     * (callers in this class pass the producer id as {@code j} and the base offset as {@code j2}).
     *
     * @return the number of records appended
     */
    private int appendTransactionalRecords(ByteBuffer byteBuffer, long j, long j2, int i, SimpleRecord... simpleRecordArr) {
        // NOTE(review): the literal `true` is presumably the "transactional" flag of the builder - confirm.
        MemoryRecordsBuilder batchBuilder = MemoryRecords.builder(byteBuffer, (byte) 2, CompressionType.NONE, TimestampType.CREATE_TIME, j2, this.time.milliseconds(), j, (short) 0, i, true, -1);
        for (int index = 0; index < simpleRecordArr.length; index++) {
            batchBuilder.append(simpleRecordArr[index]);
        }
        batchBuilder.build();
        return simpleRecordArr.length;
    }

    /**
     * Convenience overload: delegates to the five-argument variant, passing {@code j2} narrowed to an
     * {@code int} as the extra integer argument.
     *
     * @return the number of records appended
     */
    private int appendTransactionalRecords(ByteBuffer byteBuffer, long j, long j2, SimpleRecord... simpleRecordArr) {
        final int narrowedOffset = (int) j2;
        return appendTransactionalRecords(byteBuffer, j, j2, narrowedOffset, simpleRecordArr);
    }

    /**
     * Writes a COMMIT end-transaction marker into {@code byteBuffer} via
     * {@code MemoryRecords.writeEndTransactionalMarker}, passing {@code j2} and {@code j} through.
     *
     * @return always 1 (callers add it to their running offset)
     */
    private int commitTransaction(ByteBuffer byteBuffer, long j, long j2) {
        long timestamp = this.time.milliseconds();
        EndTransactionMarker commitMarker = new EndTransactionMarker(ControlRecordType.COMMIT, 0);
        MemoryRecords.writeEndTransactionalMarker(byteBuffer, j2, timestamp, 0, j, (short) 0, commitMarker);
        return 1;
    }

    /**
     * Writes an ABORT end-transaction marker into {@code byteBuffer} via
     * {@code MemoryRecords.writeEndTransactionalMarker}, passing {@code j2} and {@code j} through.
     *
     * @return always 1 (callers add it to their running offset)
     */
    private int abortTransaction(ByteBuffer byteBuffer, long j, long j2) {
        long timestamp = this.time.milliseconds();
        EndTransactionMarker abortMarker = new EndTransactionMarker(ControlRecordType.ABORT, 0);
        MemoryRecords.writeEndTransactionalMarker(byteBuffer, j2, timestamp, 0, j, (short) 0, abortMarker);
        return 1;
    }

    /**
     * Runs one {@code offsetsByTimes} lookup for "topic2"-0 and tp1. The first pair of staged responses
     * carries the given error codes; a second pair with {@code Errors.NONE} is staged after them.
     *
     * @param errors  error code of the first staged response for "topic2"-0
     * @param errors2 error code of the first staged response for tp1
     * @param j       value used for both long fields of the "topic2"-0 responses
     * @param j2      value used for both long fields of the tp1 responses
     * @param l       expected timestamp and offset for "topic2"-0, or {@code null} if no entry is expected
     * @param l2      expected timestamp and offset for tp1, or {@code null} if no entry is expected
     */
    private void testGetOffsetsForTimesWithError(Errors errors, Errors errors2, long j, long j2, Long l, Long l2) {
        this.client.reset();
        TopicPartition secondTopicPartition = new TopicPartition("topic2", 0);

        // Start from a bootstrap-only cluster, then stage a metadata update with the two-node cluster.
        this.metadata.update(Cluster.bootstrap(ClientUtils.parseAndValidateAddresses(Collections.singletonList("1.1.1.1:1111"))), Collections.emptySet(), this.time.milliseconds());
        HashMap partitionCounts = new HashMap();
        partitionCounts.put(this.topicName, 2);
        partitionCounts.put("topic2", 1);
        this.cluster = TestUtils.clusterWith(2, partitionCounts);
        this.client.prepareMetadataUpdate(this.cluster, Collections.emptySet(), true);

        // First round of responses carries the requested errors; the second round succeeds.
        this.client.prepareResponseFrom(listOffsetResponse(secondTopicPartition, errors, j, j), this.cluster.leaderFor(secondTopicPartition));
        this.client.prepareResponseFrom(listOffsetResponse(this.tp1, errors2, j2, j2), this.cluster.leaderFor(this.tp1));
        this.client.prepareResponseFrom(listOffsetResponse(secondTopicPartition, Errors.NONE, j, j), this.cluster.leaderFor(secondTopicPartition));
        this.client.prepareResponseFrom(listOffsetResponse(this.tp1, Errors.NONE, j2, j2), this.cluster.leaderFor(this.tp1));

        HashMap timestampsToSearch = new HashMap();
        timestampsToSearch.put(secondTopicPartition, 0L);
        timestampsToSearch.put(this.tp1, 0L);
        Map lookupResult = this.fetcher.offsetsByTimes(timestampsToSearch, Long.MAX_VALUE);

        if (l != null) {
            OffsetAndTimestamp firstFound = (OffsetAndTimestamp) lookupResult.get(secondTopicPartition);
            Assert.assertEquals(l.longValue(), firstFound.timestamp());
            Assert.assertEquals(l.longValue(), ((OffsetAndTimestamp) lookupResult.get(secondTopicPartition)).offset());
        } else {
            Assert.assertNull(lookupResult.get(secondTopicPartition));
        }

        if (l2 != null) {
            OffsetAndTimestamp secondFound = (OffsetAndTimestamp) lookupResult.get(this.tp1);
            Assert.assertEquals(l2.longValue(), secondFound.timestamp());
            Assert.assertEquals(l2.longValue(), ((OffsetAndTimestamp) lookupResult.get(this.tp1)).offset());
        } else {
            Assert.assertNull(lookupResult.get(this.tp1));
        }
    }

    /**
     * Stages a list-offset response for tp0 whose partition data is (NONE, -1, -1) and verifies that the
     * lookup result contains tp0 as a key mapped to {@code null}.
     */
    private void testGetOffsetsForTimesWithUnknownOffset() {
        this.client.reset();

        Cluster singleNodeCluster = TestUtils.clusterWith(1, this.topicName, 1);
        this.metadata.update(singleNodeCluster, Collections.emptySet(), this.time.milliseconds());

        HashMap responseData = new HashMap();
        responseData.put(this.tp0, new ListOffsetResponse.PartitionData(Errors.NONE, -1L, -1L));
        ListOffsetResponse response = new ListOffsetResponse(0, responseData);
        this.client.prepareResponseFrom(response, singleNodeCluster.leaderFor(this.tp0));

        HashMap timestampsToSearch = new HashMap();
        timestampsToSearch.put(this.tp0, 0L);
        Map lookupResult = this.fetcher.offsetsByTimes(timestampsToSearch, Long.MAX_VALUE);

        // The partition must be present in the result, but without a value.
        Assert.assertTrue(lookupResult.containsKey(this.tp0));
        Assert.assertNull(lookupResult.get(this.tp0));
    }

    /**
     * Builds a matcher that accepts a request only when it is a {@link ListOffsetRequest} whose
     * timestamp entry for {@code tp0} equals {@code j}.
     *
     * <p>A request of any other type, or a list-offset request that has no entry for {@code tp0},
     * is reported as a non-match instead of failing with a {@link ClassCastException} or a
     * {@link NullPointerException} from unboxing a missing value.
     *
     * @param j the timestamp the request must carry for {@code tp0}
     * @return a matcher implementing the check described above
     */
    private MockClient.RequestMatcher listOffsetRequestMatcher(final long j) {
        return new MockClient.RequestMatcher() {
            @Override
            public boolean matches(AbstractRequest abstractRequest) {
                if (!(abstractRequest instanceof ListOffsetRequest)) {
                    return false;
                }
                Long requestedTimestamp =
                        ((ListOffsetRequest) abstractRequest).partitionTimestamps().get(FetcherTest.this.tp0);
                // A null entry means tp0 was not part of the request; never unbox it.
                return requestedTimestamp != null && requestedTimestamp.longValue() == j;
            }
        };
    }

    /**
     * Convenience overload that builds a list-offset response for {@code tp0}.
     *
     * @param errors error value stored in the partition data
     * @param j      first numeric value forwarded to the partition data
     * @param j2     second numeric value forwarded to the partition data
     * @return the response produced by the partition-taking overload
     */
    private ListOffsetResponse listOffsetResponse(Errors errors, long j, long j2) {
        final TopicPartition defaultPartition = this.tp0;
        return listOffsetResponse(defaultPartition, errors, j, j2);
    }

    /**
     * Builds a list-offset response holding exactly one entry.
     *
     * @param topicPartition key of the single entry
     * @param errors         error value stored in the partition data
     * @param j              first numeric value passed to the partition data
     *                       (presumably the timestamp; confirm against ListOffsetResponse.PartitionData)
     * @param j2             second numeric value passed to the partition data
     *                       (presumably the offset; confirm against ListOffsetResponse.PartitionData)
     * @return a response wrapping a mutable single-entry map
     */
    private ListOffsetResponse listOffsetResponse(TopicPartition topicPartition, Errors errors, long j, long j2) {
        Map<TopicPartition, ListOffsetResponse.PartitionData> responseData = new HashMap<>();
        responseData.put(topicPartition, new ListOffsetResponse.PartitionData(errors, j, j2));
        return new ListOffsetResponse(responseData);
    }

    /**
     * Builds a fetch response with a single entry for {@code tp0} that carries the given
     * aborted-transaction list and records.
     *
     * @param memoryRecords records placed in the partition data
     * @param list          aborted transactions placed in the partition data
     * @param errors        partition-level error; the response-level error is always {@link Errors#NONE}
     * @param j             passed as the THIRD partition-data argument
     * @param j2            passed as the SECOND partition-data argument
     * @param i             third argument of the response constructor
     *                      (presumably the throttle time; confirm against FetchResponse)
     * @return the assembled response
     */
    private FetchResponse fullFetchResponseWithAbortedTransactions(MemoryRecords memoryRecords, List<FetchResponse.AbortedTransaction> list, Errors errors, long j, long j2, int i) {
        // Argument order is intentional here: j2 precedes j.
        FetchResponse.PartitionData partitionData =
                new FetchResponse.PartitionData(errors, j2, j, 0L, list, memoryRecords);
        LinkedHashMap<TopicPartition, FetchResponse.PartitionData> responseData =
                new LinkedHashMap<>(Collections.singletonMap(this.tp0, partitionData));
        return new FetchResponse(Errors.NONE, responseData, i, 0);
    }

    /**
     * Convenience overload of the six-argument variant that supplies {@code -1} for the
     * second numeric argument.
     *
     * @param topicPartition partition the response is built for
     * @param memoryRecords  records placed in the response
     * @param errors         partition-level error
     * @param j              first numeric argument, forwarded unchanged
     * @param i              trailing int argument, forwarded unchanged
     * @return the response produced by the six-argument overload
     */
    private FetchResponse fullFetchResponse(TopicPartition topicPartition, MemoryRecords memoryRecords, Errors errors, long j, int i) {
        final long defaultSecondValue = -1L;
        return fullFetchResponse(topicPartition, memoryRecords, errors, j, defaultSecondValue, i);
    }

    /**
     * Builds a fetch response with a single entry for the given partition and no
     * aborted-transaction list ({@code null} is passed in that position).
     *
     * @param topicPartition key of the single entry
     * @param memoryRecords  records placed in the partition data
     * @param errors         partition-level error; the response-level error is always {@link Errors#NONE}
     * @param j              second partition-data argument
     * @param j2             third partition-data argument
     * @param i              third argument of the response constructor
     *                       (presumably the throttle time; confirm against FetchResponse)
     * @return the assembled response
     */
    private FetchResponse fullFetchResponse(TopicPartition topicPartition, MemoryRecords memoryRecords, Errors errors, long j, long j2, int i) {
        final List<FetchResponse.AbortedTransaction> noAbortedTransactions = null;
        FetchResponse.PartitionData partitionData =
                new FetchResponse.PartitionData(errors, j, j2, 0L, noAbortedTransactions, memoryRecords);
        LinkedHashMap<TopicPartition, FetchResponse.PartitionData> responseData =
                new LinkedHashMap<>(Collections.singletonMap(topicPartition, partitionData));
        return new FetchResponse(Errors.NONE, responseData, i, 0);
    }

    /**
     * Builds a metadata response describing one topic, using the nodes of the test's
     * current {@code cluster}.
     *
     * <p>Partition metadata is copied from the cluster only when {@code errors} is
     * {@link Errors#NONE}; for any other error the topic is reported with an empty
     * partition list.
     *
     * @param str    topic name
     * @param errors topic-level error placed in the topic metadata
     * @return the assembled response
     */
    private MetadataResponse newMetadataResponse(String str, Errors errors) {
        List<MetadataResponse.PartitionMetadata> partitions = new ArrayList<>();
        if (errors == Errors.NONE) {
            for (PartitionInfo info : this.cluster.partitionsForTopic(str)) {
                MetadataResponse.PartitionMetadata partition = new MetadataResponse.PartitionMetadata(
                        Errors.NONE,
                        info.partition(),
                        info.leader(),
                        Arrays.asList(info.replicas()),
                        Arrays.asList(info.inSyncReplicas()),
                        Arrays.asList(info.offlineReplicas()));
                partitions.add(partition);
            }
        }
        MetadataResponse.TopicMetadata topicMetadata =
                new MetadataResponse.TopicMetadata(errors, str, false, partitions);
        return new MetadataResponse(this.cluster.nodes(), (String) null, -1, Arrays.asList(topicMetadata));
    }

    /**
     * Creates a byte-array fetcher with {@link IsolationLevel#READ_UNCOMMITTED} and a fresh
     * {@link ByteArrayDeserializer} for each of the two deserializer slots.
     *
     * @param subscriptionState subscription state handed to the fetcher
     * @param metrics           metrics instance handed to the fetcher
     * @param i                 int forwarded to the full factory overload
     *                          (presumably the max poll records; confirm against Fetcher)
     * @return the new fetcher
     */
    private Fetcher<byte[], byte[]> createFetcher(SubscriptionState subscriptionState, Metrics metrics, int i) {
        ByteArrayDeserializer keyDeserializer = new ByteArrayDeserializer();
        ByteArrayDeserializer valueDeserializer = new ByteArrayDeserializer();
        return createFetcher(
                subscriptionState, metrics, keyDeserializer, valueDeserializer, i, IsolationLevel.READ_UNCOMMITTED);
    }

    /**
     * Creates a byte-array fetcher, passing {@link Integer#MAX_VALUE} as the int argument
     * of the three-argument overload.
     *
     * @param subscriptionState subscription state handed to the fetcher
     * @param metrics           metrics instance handed to the fetcher
     * @return the new fetcher
     */
    private Fetcher<byte[], byte[]> createFetcher(SubscriptionState subscriptionState, Metrics metrics) {
        final int unbounded = Integer.MAX_VALUE;
        return createFetcher(subscriptionState, metrics, unbounded);
    }

    /**
     * Creates a fetcher with caller-supplied deserializers, passing {@link Integer#MAX_VALUE}
     * as the int argument and {@link IsolationLevel#READ_UNCOMMITTED} as the isolation level
     * of the full factory overload.
     *
     * @param subscriptionState subscription state handed to the fetcher
     * @param metrics           metrics instance handed to the fetcher
     * @param deserializer      deserializer for type {@code K}
     * @param deserializer2     deserializer for type {@code V}
     * @return the new fetcher
     */
    private <K, V> Fetcher<K, V> createFetcher(SubscriptionState subscriptionState, Metrics metrics, Deserializer<K> deserializer, Deserializer<V> deserializer2) {
        final int unbounded = Integer.MAX_VALUE;
        return createFetcher(
                subscriptionState, metrics, deserializer, deserializer2, unbounded, IsolationLevel.READ_UNCOMMITTED);
    }

    /**
     * Full factory: constructs a {@link Fetcher} from the test's shared fixtures
     * (consumer client, byte/wait limits, metadata, metrics registry, clock, timeouts)
     * plus the arguments supplied here.
     *
     * @param subscriptionState subscription state handed to the fetcher
     * @param metrics           metrics instance handed to the fetcher
     * @param deserializer      deserializer for type {@code K}
     * @param deserializer2     deserializer for type {@code V}
     * @param i                 int passed right after {@code fetchSize}
     *                          (presumably the max poll records; confirm against Fetcher)
     * @param isolationLevel    isolation level passed as the last constructor argument
     * @return the new fetcher
     */
    private <K, V> Fetcher<K, V> createFetcher(SubscriptionState subscriptionState, Metrics metrics, Deserializer<K> deserializer, Deserializer<V> deserializer2, int i, IsolationLevel isolationLevel) {
        LogContext logContext = new LogContext();
        return new Fetcher<>(
                logContext,
                this.consumerClient,
                this.minBytes,
                this.maxBytes,
                this.maxWaitMs,
                this.fetchSize,
                i,
                true, // NOTE(review): presumably the CRC-check flag; confirm against the Fetcher constructor
                deserializer,
                deserializer2,
                this.metadata,
                subscriptionState,
                metrics,
                this.metricsRegistry,
                this.time,
                this.retryBackoffMs,
                this.requestTimeoutMs,
                isolationLevel);
    }

    /**
     * Extracts the offset of every record, preserving the order of the input list.
     *
     * @param list records to read offsets from
     * @return a new mutable list with one offset per input record
     */
    private <T> List<Long> collectRecordOffsets(List<ConsumerRecord<T, T>> list) {
        List<Long> offsets = new ArrayList<>(list.size());
        for (ConsumerRecord<T, T> consumerRecord : list) {
            offsets.add(consumerRecord.offset());
        }
        return offsets;
    }
}
