package org.apache.kafka.clients.consumer.internals;

import java.io.DataOutputStream;
import java.lang.reflect.Field;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.kafka.clients.ApiVersions;
import org.apache.kafka.clients.ClientDnsLookup;
import org.apache.kafka.clients.ClientRequest;
import org.apache.kafka.clients.ClientUtils;
import org.apache.kafka.clients.FetchSessionHandler;
import org.apache.kafka.clients.Metadata;
import org.apache.kafka.clients.MockClient;
import org.apache.kafka.clients.NetworkClient;
import org.apache.kafka.clients.NodeApiVersions;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.LogTruncationException;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
import org.apache.kafka.clients.consumer.OffsetOutOfRangeException;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;
import org.apache.kafka.clients.consumer.internals.Fetcher;
import org.apache.kafka.clients.consumer.internals.SubscriptionState;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.IsolationLevel;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.MetricNameTemplate;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.InvalidTopicException;
import org.apache.kafka.common.errors.RecordTooLargeException;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.errors.TopicAuthorizationException;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.internals.RecordHeader;
import org.apache.kafka.common.internals.ClusterResourceListeners;
import org.apache.kafka.common.metrics.KafkaMetric;
import org.apache.kafka.common.metrics.MetricConfig;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.network.NetworkReceive;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.record.BufferSupplier;
import org.apache.kafka.common.record.CompressionType;
import org.apache.kafka.common.record.ControlRecordType;
import org.apache.kafka.common.record.DefaultRecordBatch;
import org.apache.kafka.common.record.EndTransactionMarker;
import org.apache.kafka.common.record.LegacyRecord;
import org.apache.kafka.common.record.MemoryRecords;
import org.apache.kafka.common.record.MemoryRecordsBuilder;
import org.apache.kafka.common.record.Record;
import org.apache.kafka.common.record.RecordBatch;
import org.apache.kafka.common.record.SimpleRecord;
import org.apache.kafka.common.record.TimestampType;
import org.apache.kafka.common.requests.AbstractResponse;
import org.apache.kafka.common.requests.ApiVersionsResponse;
import org.apache.kafka.common.requests.EpochEndOffset;
import org.apache.kafka.common.requests.FetchRequest;
import org.apache.kafka.common.requests.FetchResponse;
import org.apache.kafka.common.requests.ListOffsetRequest;
import org.apache.kafka.common.requests.ListOffsetResponse;
import org.apache.kafka.common.requests.MetadataRequest;
import org.apache.kafka.common.requests.MetadataResponse;
import org.apache.kafka.common.requests.OffsetsForLeaderEpochRequest;
import org.apache.kafka.common.requests.OffsetsForLeaderEpochResponse;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.BytesDeserializer;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.utils.ByteBufferOutputStream;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.test.DelayedReceive;
import org.apache.kafka.test.MockSelector;
import org.apache.kafka.test.TestUtils;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mockito;

/* loaded from: input_file:org/apache/kafka/clients/consumer/internals/FetcherTest.class */
public class FetcherTest {
    private static final double EPSILON = 1.0E-4d;
    private SubscriptionState subscriptions;
    private ConsumerMetadata metadata;
    private FetcherMetricsRegistry metricsRegistry;
    private MockClient client;
    private Metrics metrics;
    private ConsumerNetworkClient consumerClient;
    private Fetcher<?, ?> fetcher;
    private MemoryRecords records;
    private MemoryRecords nextRecords;
    private MemoryRecords emptyRecords;
    private MemoryRecords partialRecords;
    private ExecutorService executorService;
    private ConsumerRebalanceListener listener = new NoOpConsumerRebalanceListener();
    private String topicName = "test";
    private String groupId = "test-group";
    private final String metricGroup = "consumer" + this.groupId + "-fetch-manager-metrics";
    private TopicPartition tp0 = new TopicPartition(this.topicName, 0);
    private TopicPartition tp1 = new TopicPartition(this.topicName, 1);
    private TopicPartition tp2 = new TopicPartition(this.topicName, 2);
    private TopicPartition tp3 = new TopicPartition(this.topicName, 3);
    private int validLeaderEpoch = 0;
    private MetadataResponse initialUpdateResponse = TestUtils.metadataUpdateWith(1, Collections.singletonMap(this.topicName, 4));
    private int minBytes = 1;
    private int maxBytes = Integer.MAX_VALUE;
    private int maxWaitMs = 0;
    private int fetchSize = 1000;
    private long retryBackoffMs = 100;
    private long requestTimeoutMs = 30000;
    private MockTime time = new MockTime(1);
    private ApiVersions apiVersions = new ApiVersions();

    @Before
    public void setup() {
        this.records = buildRecords(1L, 3, 1L);
        this.nextRecords = buildRecords(4L, 2, 4L);
        this.emptyRecords = buildRecords(0L, 0, 0L);
        this.partialRecords = buildRecords(4L, 1, 0L);
        this.partialRecords.buffer().putInt(8, 10000);
    }

    private void assignFromUser(Set<TopicPartition> set) {
        this.subscriptions.assignFromUser(set);
        this.client.updateMetadata(this.initialUpdateResponse);
        this.metadata.updateWithCurrentRequestVersion(TestUtils.metadataUpdateWith("dummy", 1, (Map<String, Errors>) Collections.emptyMap(), (Map<String, Integer>) Collections.singletonMap(this.topicName, 4), (Function<TopicPartition, Integer>) topicPartition -> {
            return Integer.valueOf(this.validLeaderEpoch);
        }), false, 0L);
    }

    @After
    public void teardown() throws Exception {
        if (this.metrics != null) {
            this.metrics.close();
        }
        if (this.fetcher != null) {
            this.fetcher.close();
        }
        if (this.executorService != null) {
            this.executorService.shutdownNow();
            Assert.assertTrue(this.executorService.awaitTermination(5L, TimeUnit.SECONDS));
        }
    }

    @Test
    public void testFetchNormal() {
        buildFetcher();
        assignFromUser(Collections.singleton(this.tp0));
        this.subscriptions.seek(this.tp0, 0L);
        Assert.assertEquals(1L, this.fetcher.sendFetches());
        Assert.assertFalse(this.fetcher.hasCompletedFetches());
        this.client.prepareResponse(fullFetchResponse(this.tp0, this.records, Errors.NONE, 100L, 0));
        this.consumerClient.poll(this.time.timer(0L));
        Assert.assertTrue(this.fetcher.hasCompletedFetches());
        Map fetchedRecords = fetchedRecords();
        Assert.assertTrue(fetchedRecords.containsKey(this.tp0));
        List list = (List) fetchedRecords.get(this.tp0);
        Assert.assertEquals(3L, list.size());
        Assert.assertEquals(4L, this.subscriptions.position(this.tp0).offset);
        long j = 1;
        Iterator it = list.iterator();
        while (it.hasNext()) {
            Assert.assertEquals(j, ((ConsumerRecord) it.next()).offset());
            j++;
        }
    }

    @Test
    public void testMissingLeaderEpochInRecords() {
        buildFetcher();
        assignFromUser(Collections.singleton(this.tp0));
        this.subscriptions.seek(this.tp0, 0L);
        MemoryRecordsBuilder builder = MemoryRecords.builder(ByteBuffer.allocate(1024), (byte) 0, CompressionType.NONE, TimestampType.CREATE_TIME, 0L, System.currentTimeMillis(), -1);
        builder.append(0L, "key".getBytes(), "1".getBytes());
        builder.append(0L, "key".getBytes(), "2".getBytes());
        MemoryRecords build = builder.build();
        Assert.assertEquals(1L, this.fetcher.sendFetches());
        Assert.assertFalse(this.fetcher.hasCompletedFetches());
        this.client.prepareResponse(fullFetchResponse(this.tp0, build, Errors.NONE, 100L, 0));
        this.consumerClient.poll(this.time.timer(0L));
        Assert.assertTrue(this.fetcher.hasCompletedFetches());
        Map fetchedRecords = fetchedRecords();
        Assert.assertTrue(fetchedRecords.containsKey(this.tp0));
        Assert.assertEquals(2L, ((List) fetchedRecords.get(this.tp0)).size());
        Iterator it = ((List) fetchedRecords.get(this.tp0)).iterator();
        while (it.hasNext()) {
            Assert.assertEquals(Optional.empty(), ((ConsumerRecord) it.next()).leaderEpoch());
        }
    }

    @Test
    public void testLeaderEpochInConsumerRecord() {
        buildFetcher();
        assignFromUser(Collections.singleton(this.tp0));
        this.subscriptions.seek(this.tp0, 0L);
        Integer num = 1;
        ByteBuffer allocate = ByteBuffer.allocate(1024);
        MemoryRecordsBuilder builder = MemoryRecords.builder(allocate, (byte) 2, CompressionType.NONE, TimestampType.CREATE_TIME, 0L, System.currentTimeMillis(), num.intValue());
        builder.append(0L, "key".getBytes(), num.toString().getBytes());
        builder.append(0L, "key".getBytes(), num.toString().getBytes());
        builder.close();
        Integer valueOf = Integer.valueOf(num.intValue() + 7);
        MemoryRecordsBuilder builder2 = MemoryRecords.builder(allocate, (byte) 2, CompressionType.NONE, TimestampType.CREATE_TIME, 2L, System.currentTimeMillis(), valueOf.intValue());
        builder2.append(0L, "key".getBytes(), valueOf.toString().getBytes());
        builder2.close();
        Integer valueOf2 = Integer.valueOf(valueOf.intValue() + 5);
        MemoryRecordsBuilder builder3 = MemoryRecords.builder(allocate, (byte) 2, CompressionType.NONE, TimestampType.CREATE_TIME, 3L, System.currentTimeMillis(), valueOf2.intValue());
        builder3.append(0L, "key".getBytes(), valueOf2.toString().getBytes());
        builder3.append(0L, "key".getBytes(), valueOf2.toString().getBytes());
        builder3.append(0L, "key".getBytes(), valueOf2.toString().getBytes());
        builder3.close();
        allocate.flip();
        MemoryRecords readableRecords = MemoryRecords.readableRecords(allocate);
        Assert.assertEquals(1L, this.fetcher.sendFetches());
        Assert.assertFalse(this.fetcher.hasCompletedFetches());
        this.client.prepareResponse(fullFetchResponse(this.tp0, readableRecords, Errors.NONE, 100L, 0));
        this.consumerClient.poll(this.time.timer(0L));
        Assert.assertTrue(this.fetcher.hasCompletedFetches());
        Map fetchedRecords = fetchedRecords();
        Assert.assertTrue(fetchedRecords.containsKey(this.tp0));
        Assert.assertEquals(6L, ((List) fetchedRecords.get(this.tp0)).size());
        for (ConsumerRecord consumerRecord : (List) fetchedRecords.get(this.tp0)) {
            Assert.assertEquals(Optional.of(Integer.valueOf(Integer.parseInt(Utils.utf8((byte[]) consumerRecord.value())))), consumerRecord.leaderEpoch());
        }
    }

    @Test
    public void testClearBufferedDataForTopicPartitions() {
        buildFetcher();
        assignFromUser(Collections.singleton(this.tp0));
        this.subscriptions.seek(this.tp0, 0L);
        Assert.assertEquals(1L, this.fetcher.sendFetches());
        Assert.assertFalse(this.fetcher.hasCompletedFetches());
        this.client.prepareResponse(fullFetchResponse(this.tp0, this.records, Errors.NONE, 100L, 0));
        this.consumerClient.poll(this.time.timer(0L));
        Assert.assertTrue(this.fetcher.hasCompletedFetches());
        HashSet hashSet = new HashSet();
        hashSet.add(this.tp1);
        this.fetcher.clearBufferedDataForUnassignedPartitions(hashSet);
        Assert.assertFalse(this.fetcher.hasCompletedFetches());
    }

    @Test
    public void testFetchSkipsBlackedOutNodes() {
        buildFetcher();
        assignFromUser(Collections.singleton(this.tp0));
        this.subscriptions.seek(this.tp0, 0L);
        this.client.blackout((Node) this.initialUpdateResponse.brokers().iterator().next(), 500L);
        Assert.assertEquals(0L, this.fetcher.sendFetches());
        this.time.sleep(500L);
        Assert.assertEquals(1L, this.fetcher.sendFetches());
    }

    @Test
    public void testFetcherIgnoresControlRecords() {
        buildFetcher();
        assignFromUser(Collections.singleton(this.tp0));
        this.subscriptions.seek(this.tp0, 0L);
        Assert.assertEquals(1L, this.fetcher.sendFetches());
        Assert.assertFalse(this.fetcher.hasCompletedFetches());
        ByteBuffer allocate = ByteBuffer.allocate(1024);
        MemoryRecordsBuilder idempotentBuilder = MemoryRecords.idempotentBuilder(allocate, CompressionType.NONE, 0L, 1L, (short) 0, 0);
        idempotentBuilder.append(0L, "key".getBytes(), (byte[]) null);
        idempotentBuilder.close();
        MemoryRecords.writeEndTransactionalMarker(allocate, 1L, this.time.milliseconds(), 0, 1L, (short) 0, new EndTransactionMarker(ControlRecordType.ABORT, 0));
        allocate.flip();
        this.client.prepareResponse(fullFetchResponse(this.tp0, MemoryRecords.readableRecords(allocate), Errors.NONE, 100L, 0));
        this.consumerClient.poll(this.time.timer(0L));
        Assert.assertTrue(this.fetcher.hasCompletedFetches());
        Map fetchedRecords = fetchedRecords();
        Assert.assertTrue(fetchedRecords.containsKey(this.tp0));
        List list = (List) fetchedRecords.get(this.tp0);
        Assert.assertEquals(1L, list.size());
        Assert.assertEquals(2L, this.subscriptions.position(this.tp0).offset);
        Assert.assertArrayEquals("key".getBytes(), (byte[]) ((ConsumerRecord) list.get(0)).key());
    }

    @Test
    public void testFetchError() {
        buildFetcher();
        assignFromUser(Collections.singleton(this.tp0));
        this.subscriptions.seek(this.tp0, 0L);
        Assert.assertEquals(1L, this.fetcher.sendFetches());
        Assert.assertFalse(this.fetcher.hasCompletedFetches());
        this.client.prepareResponse(fullFetchResponse(this.tp0, this.records, Errors.NOT_LEADER_OR_FOLLOWER, 100L, 0));
        this.consumerClient.poll(this.time.timer(0L));
        Assert.assertTrue(this.fetcher.hasCompletedFetches());
        Assert.assertFalse(fetchedRecords().containsKey(this.tp0));
    }

    private MockClient.RequestMatcher matchesOffset(TopicPartition topicPartition, long j) {
        return abstractRequest -> {
            FetchRequest fetchRequest = (FetchRequest) abstractRequest;
            return fetchRequest.fetchData().containsKey(topicPartition) && ((FetchRequest.PartitionData) fetchRequest.fetchData().get(topicPartition)).fetchOffset == j;
        };
    }

    @Test
    public void testFetchedRecordsRaisesOnSerializationErrors() {
        ByteArrayDeserializer byteArrayDeserializer = new ByteArrayDeserializer() { // from class: org.apache.kafka.clients.consumer.internals.FetcherTest.1
            int i = 0;

            /* renamed from: deserialize, reason: merged with bridge method [inline-methods] */
            public byte[] m15deserialize(String str, byte[] bArr) {
                int i = this.i;
                this.i = i + 1;
                if (i % 2 != 1) {
                    return bArr;
                }
                Assert.assertEquals("value-1", new String(bArr, StandardCharsets.UTF_8));
                throw new SerializationException();
            }
        };
        buildFetcher((Deserializer<?>) byteArrayDeserializer, (Deserializer<?>) byteArrayDeserializer);
        assignFromUser(Collections.singleton(this.tp0));
        this.subscriptions.seek(this.tp0, 1L);
        this.client.prepareResponse(matchesOffset(this.tp0, 1L), (AbstractResponse) fullFetchResponse(this.tp0, this.records, Errors.NONE, 100L, 0));
        Assert.assertEquals(1L, this.fetcher.sendFetches());
        this.consumerClient.poll(this.time.timer(0L));
        for (int i = 0; i < 2; i++) {
            try {
                this.fetcher.fetchedRecords();
                Assert.fail("fetchedRecords should have raised");
            } catch (SerializationException e) {
                Assert.assertEquals(1L, this.subscriptions.position(this.tp0).offset);
            }
        }
    }

    @Test
    public void testParseCorruptedRecord() throws Exception {
        buildFetcher();
        assignFromUser(Collections.singleton(this.tp0));
        ByteBuffer allocate = ByteBuffer.allocate(1024);
        DataOutputStream dataOutputStream = new DataOutputStream(new ByteBufferOutputStream(allocate));
        byte[] bytes = "foo".getBytes();
        byte[] bytes2 = "baz".getBytes();
        int recordSize = LegacyRecord.recordSize((byte) 1, bytes.length, bytes2.length);
        long computeChecksum = LegacyRecord.computeChecksum((byte) 1, LegacyRecord.computeAttributes((byte) 1, CompressionType.NONE, TimestampType.CREATE_TIME), 500L, bytes, bytes2);
        dataOutputStream.writeLong(0L);
        dataOutputStream.writeInt(recordSize);
        LegacyRecord.write(dataOutputStream, (byte) 1, computeChecksum, LegacyRecord.computeAttributes((byte) 1, CompressionType.NONE, TimestampType.CREATE_TIME), 500L, bytes, bytes2);
        dataOutputStream.writeLong(0 + 1);
        dataOutputStream.writeInt(recordSize);
        LegacyRecord.write(dataOutputStream, (byte) 1, computeChecksum + 1, LegacyRecord.computeAttributes((byte) 1, CompressionType.NONE, TimestampType.CREATE_TIME), 500L, bytes, bytes2);
        dataOutputStream.writeLong(0 + 2);
        dataOutputStream.writeInt(recordSize);
        LegacyRecord.write(dataOutputStream, (byte) 1, computeChecksum, LegacyRecord.computeAttributes((byte) 1, CompressionType.NONE, TimestampType.CREATE_TIME), 500L, bytes, bytes2);
        dataOutputStream.writeLong(0 + 3);
        dataOutputStream.writeInt(1);
        dataOutputStream.writeLong(0 + 4);
        dataOutputStream.writeInt(recordSize);
        LegacyRecord.write(dataOutputStream, (byte) 1, computeChecksum, LegacyRecord.computeAttributes((byte) 1, CompressionType.NONE, TimestampType.CREATE_TIME), 500L, bytes, bytes2);
        allocate.flip();
        this.subscriptions.seek(this.tp0, 0L);
        Assert.assertEquals(1L, this.fetcher.sendFetches());
        this.client.prepareResponse(fullFetchResponse(this.tp0, MemoryRecords.readableRecords(allocate), Errors.NONE, 100L, 0));
        this.consumerClient.poll(this.time.timer(0L));
        Assert.assertEquals(1L, ((List) this.fetcher.fetchedRecords().get(this.tp0)).size());
        Assert.assertEquals(1L, this.subscriptions.position(this.tp0).offset);
        ensureBlockOnRecord(1L);
        seekAndConsumeRecord(allocate, 2L);
        ensureBlockOnRecord(3L);
        try {
            seekAndConsumeRecord(allocate, 4L);
            Assert.fail("Should have thrown exception when fail to retrieve a record from iterator.");
        } catch (KafkaException e) {
        }
        ensureBlockOnRecord(4L);
    }

    private void ensureBlockOnRecord(long j) {
        for (int i = 0; i < 2; i++) {
            try {
                this.fetcher.fetchedRecords();
                Assert.fail("fetchedRecords should have raised KafkaException");
            } catch (KafkaException e) {
                Assert.assertEquals(j, this.subscriptions.position(this.tp0).offset);
            }
        }
    }

    private void seekAndConsumeRecord(ByteBuffer byteBuffer, long j) {
        this.subscriptions.seek(this.tp0, j);
        this.fetcher.fetchedRecords();
        Assert.assertEquals(1L, this.fetcher.sendFetches());
        this.client.prepareResponse(fullFetchResponse(this.tp0, MemoryRecords.readableRecords(byteBuffer), Errors.NONE, 100L, 0));
        this.consumerClient.poll(this.time.timer(0L));
        List list = (List) fetchedRecords().get(this.tp0);
        Assert.assertEquals(1L, list.size());
        Assert.assertEquals(j, ((ConsumerRecord) list.get(0)).offset());
        Assert.assertEquals(j + 1, this.subscriptions.position(this.tp0).offset);
    }

    @Test
    public void testInvalidDefaultRecordBatch() {
        buildFetcher();
        ByteBuffer allocate = ByteBuffer.allocate(1024);
        MemoryRecordsBuilder memoryRecordsBuilder = new MemoryRecordsBuilder(new ByteBufferOutputStream(allocate), (byte) 2, CompressionType.NONE, TimestampType.CREATE_TIME, 0L, 10L, 0L, (short) 0, 0, false, false, 0, 1024);
        memoryRecordsBuilder.append(10L, "key".getBytes(), "value".getBytes());
        memoryRecordsBuilder.close();
        allocate.flip();
        allocate.position(17);
        allocate.put("beef".getBytes());
        allocate.position(0);
        assignFromUser(Collections.singleton(this.tp0));
        this.subscriptions.seek(this.tp0, 0L);
        Assert.assertEquals(1L, this.fetcher.sendFetches());
        this.client.prepareResponse(fullFetchResponse(this.tp0, MemoryRecords.readableRecords(allocate), Errors.NONE, 100L, 0));
        this.consumerClient.poll(this.time.timer(0L));
        for (int i = 0; i < 2; i++) {
            try {
                this.fetcher.fetchedRecords();
                Assert.fail("fetchedRecords should have raised KafkaException");
            } catch (KafkaException e) {
                Assert.assertEquals(0L, this.subscriptions.position(this.tp0).offset);
            }
        }
    }

    @Test
    public void testParseInvalidRecordBatch() {
        buildFetcher();
        ByteBuffer buffer = MemoryRecords.withRecords((byte) 2, 0L, CompressionType.NONE, TimestampType.CREATE_TIME, new SimpleRecord[]{new SimpleRecord(1L, "a".getBytes(), "1".getBytes()), new SimpleRecord(2L, "b".getBytes(), "2".getBytes()), new SimpleRecord(3L, "c".getBytes(), "3".getBytes())}).buffer();
        buffer.putInt(32, buffer.get(32) ^ 87238423);
        assignFromUser(Collections.singleton(this.tp0));
        this.subscriptions.seek(this.tp0, 0L);
        Assert.assertEquals(1L, this.fetcher.sendFetches());
        this.client.prepareResponse(fullFetchResponse(this.tp0, MemoryRecords.readableRecords(buffer), Errors.NONE, 100L, 0));
        this.consumerClient.poll(this.time.timer(0L));
        try {
            this.fetcher.fetchedRecords();
            Assert.fail("fetchedRecords should have raised");
        } catch (KafkaException e) {
            Assert.assertEquals(0L, this.subscriptions.position(this.tp0).offset);
        }
    }

    @Test
    public void testHeaders() {
        buildFetcher();
        MemoryRecordsBuilder builder = MemoryRecords.builder(ByteBuffer.allocate(1024), CompressionType.NONE, TimestampType.CREATE_TIME, 1L);
        builder.append(0L, "key".getBytes(), "value-1".getBytes());
        builder.append(0L, "key".getBytes(), "value-2".getBytes(), new Header[]{new RecordHeader("headerKey", "headerValue".getBytes(StandardCharsets.UTF_8))});
        builder.append(0L, "key".getBytes(), "value-3".getBytes(), new Header[]{new RecordHeader("headerKey", "headerValue".getBytes(StandardCharsets.UTF_8)), new RecordHeader("headerKey", "headerValue2".getBytes(StandardCharsets.UTF_8))});
        MemoryRecords build = builder.build();
        assignFromUser(Collections.singleton(this.tp0));
        this.subscriptions.seek(this.tp0, 1L);
        this.client.prepareResponse(matchesOffset(this.tp0, 1L), (AbstractResponse) fullFetchResponse(this.tp0, build, Errors.NONE, 100L, 0));
        Assert.assertEquals(1L, this.fetcher.sendFetches());
        this.consumerClient.poll(this.time.timer(0L));
        List list = (List) fetchedRecords().get(this.tp0);
        Assert.assertEquals(3L, list.size());
        Iterator it = list.iterator();
        Assert.assertNull(((ConsumerRecord) it.next()).headers().lastHeader("headerKey"));
        ConsumerRecord consumerRecord = (ConsumerRecord) it.next();
        Assert.assertEquals("headerValue", new String(consumerRecord.headers().lastHeader("headerKey").value(), StandardCharsets.UTF_8));
        Assert.assertEquals("headerKey", consumerRecord.headers().lastHeader("headerKey").key());
        ConsumerRecord consumerRecord2 = (ConsumerRecord) it.next();
        Assert.assertEquals("headerValue2", new String(consumerRecord2.headers().lastHeader("headerKey").value(), StandardCharsets.UTF_8));
        Assert.assertEquals("headerKey", consumerRecord2.headers().lastHeader("headerKey").key());
    }

    @Test
    public void testFetchMaxPollRecords() {
        buildFetcher(2);
        assignFromUser(Collections.singleton(this.tp0));
        this.subscriptions.seek(this.tp0, 1L);
        this.client.prepareResponse(matchesOffset(this.tp0, 1L), (AbstractResponse) fullFetchResponse(this.tp0, this.records, Errors.NONE, 100L, 0));
        this.client.prepareResponse(matchesOffset(this.tp0, 4L), (AbstractResponse) fullFetchResponse(this.tp0, this.nextRecords, Errors.NONE, 100L, 0));
        Assert.assertEquals(1L, this.fetcher.sendFetches());
        this.consumerClient.poll(this.time.timer(0L));
        List list = (List) fetchedRecords().get(this.tp0);
        Assert.assertEquals(2L, list.size());
        Assert.assertEquals(3L, this.subscriptions.position(this.tp0).offset);
        Assert.assertEquals(1L, ((ConsumerRecord) list.get(0)).offset());
        Assert.assertEquals(2L, ((ConsumerRecord) list.get(1)).offset());
        Assert.assertEquals(0L, this.fetcher.sendFetches());
        this.consumerClient.poll(this.time.timer(0L));
        List list2 = (List) fetchedRecords().get(this.tp0);
        Assert.assertEquals(1L, list2.size());
        Assert.assertEquals(4L, this.subscriptions.position(this.tp0).offset);
        Assert.assertEquals(3L, ((ConsumerRecord) list2.get(0)).offset());
        Assert.assertTrue(this.fetcher.sendFetches() > 0);
        this.consumerClient.poll(this.time.timer(0L));
        List list3 = (List) fetchedRecords().get(this.tp0);
        Assert.assertEquals(2L, list3.size());
        Assert.assertEquals(6L, this.subscriptions.position(this.tp0).offset);
        Assert.assertEquals(4L, ((ConsumerRecord) list3.get(0)).offset());
        Assert.assertEquals(5L, ((ConsumerRecord) list3.get(1)).offset());
    }

    @Test
    public void testFetchAfterPartitionWithFetchedRecordsIsUnassigned() {
        buildFetcher(2);
        assignFromUser(Collections.singleton(this.tp0));
        this.subscriptions.seek(this.tp0, 1L);
        this.client.prepareResponse(matchesOffset(this.tp0, 1L), (AbstractResponse) fullFetchResponse(this.tp0, this.records, Errors.NONE, 100L, 0));
        Assert.assertEquals(1L, this.fetcher.sendFetches());
        this.consumerClient.poll(this.time.timer(0L));
        List list = (List) fetchedRecords().get(this.tp0);
        Assert.assertEquals(2L, list.size());
        Assert.assertEquals(3L, this.subscriptions.position(this.tp0).offset);
        Assert.assertEquals(1L, ((ConsumerRecord) list.get(0)).offset());
        Assert.assertEquals(2L, ((ConsumerRecord) list.get(1)).offset());
        assignFromUser(Collections.singleton(this.tp1));
        this.client.prepareResponse(matchesOffset(this.tp1, 4L), (AbstractResponse) fullFetchResponse(this.tp1, this.nextRecords, Errors.NONE, 100L, 0));
        this.subscriptions.seek(this.tp1, 4L);
        Assert.assertEquals(1L, this.fetcher.sendFetches());
        this.consumerClient.poll(this.time.timer(0L));
        Map fetchedRecords = fetchedRecords();
        Assert.assertNull(fetchedRecords.get(this.tp0));
        List list2 = (List) fetchedRecords.get(this.tp1);
        Assert.assertEquals(2L, list2.size());
        Assert.assertEquals(6L, this.subscriptions.position(this.tp1).offset);
        Assert.assertEquals(4L, ((ConsumerRecord) list2.get(0)).offset());
        Assert.assertEquals(5L, ((ConsumerRecord) list2.get(1)).offset());
    }

    @Test
    public void testFetchNonContinuousRecords() {
        buildFetcher();
        MemoryRecordsBuilder builder = MemoryRecords.builder(ByteBuffer.allocate(1024), CompressionType.NONE, TimestampType.CREATE_TIME, 0L);
        builder.appendWithOffset(15L, 0L, "key".getBytes(), "value-1".getBytes());
        builder.appendWithOffset(20L, 0L, "key".getBytes(), "value-2".getBytes());
        builder.appendWithOffset(30L, 0L, "key".getBytes(), "value-3".getBytes());
        MemoryRecords build = builder.build();
        assignFromUser(Collections.singleton(this.tp0));
        this.subscriptions.seek(this.tp0, 0L);
        Assert.assertEquals(1L, this.fetcher.sendFetches());
        this.client.prepareResponse(fullFetchResponse(this.tp0, build, Errors.NONE, 100L, 0));
        this.consumerClient.poll(this.time.timer(0L));
        List list = (List) fetchedRecords().get(this.tp0);
        Assert.assertEquals(3L, list.size());
        Assert.assertEquals(31L, this.subscriptions.position(this.tp0).offset);
        Assert.assertEquals(15L, ((ConsumerRecord) list.get(0)).offset());
        Assert.assertEquals(20L, ((ConsumerRecord) list.get(1)).offset());
        Assert.assertEquals(30L, ((ConsumerRecord) list.get(2)).offset());
    }

    @Test
    public void testFetchRequestWhenRecordTooLarge() {
        try {
            buildFetcher();
            this.client.setNodeApiVersions(NodeApiVersions.create(ApiKeys.FETCH.id, (short) 2, (short) 2));
            makeFetchRequestWithIncompleteRecord();
            try {
                this.fetcher.fetchedRecords();
                Assert.fail("RecordTooLargeException should have been raised");
            } catch (RecordTooLargeException e) {
                Assert.assertTrue(e.getMessage().startsWith("There are some messages at [Partition=Offset]: "));
                Assert.assertEquals(0L, this.subscriptions.position(this.tp0).offset);
            }
        } finally {
            this.client.setNodeApiVersions(NodeApiVersions.create());
        }
    }

    @Test
    public void testFetchRequestInternalError() {
        buildFetcher();
        makeFetchRequestWithIncompleteRecord();
        try {
            this.fetcher.fetchedRecords();
            Assert.fail("RecordTooLargeException should have been raised");
        } catch (KafkaException e) {
            Assert.assertTrue(e.getMessage().startsWith("Failed to make progress reading messages"));
            Assert.assertEquals(0L, this.subscriptions.position(this.tp0).offset);
        }
    }

    private void makeFetchRequestWithIncompleteRecord() {
        assignFromUser(Collections.singleton(this.tp0));
        this.subscriptions.seek(this.tp0, 0L);
        Assert.assertEquals(1L, this.fetcher.sendFetches());
        Assert.assertFalse(this.fetcher.hasCompletedFetches());
        this.client.prepareResponse(fullFetchResponse(this.tp0, MemoryRecords.readableRecords(ByteBuffer.wrap(new byte[]{0, 0, 0, 0, 0, 0, 0, 0})), Errors.NONE, 100L, 0));
        this.consumerClient.poll(this.time.timer(0L));
        Assert.assertTrue(this.fetcher.hasCompletedFetches());
    }

    @Test
    public void testUnauthorizedTopic() {
        buildFetcher();
        assignFromUser(Collections.singleton(this.tp0));
        this.subscriptions.seek(this.tp0, 0L);
        Assert.assertEquals(1L, this.fetcher.sendFetches());
        this.client.prepareResponse(fullFetchResponse(this.tp0, this.records, Errors.TOPIC_AUTHORIZATION_FAILED, 100L, 0));
        this.consumerClient.poll(this.time.timer(0L));
        try {
            this.fetcher.fetchedRecords();
            Assert.fail("fetchedRecords should have thrown");
        } catch (TopicAuthorizationException e) {
            Assert.assertEquals(Collections.singleton(this.topicName), e.unauthorizedTopics());
        }
    }

    @Test
    public void testFetchDuringEagerRebalance() {
        buildFetcher();
        this.subscriptions.subscribe(Collections.singleton(this.topicName), this.listener);
        this.subscriptions.assignFromSubscribed(Collections.singleton(this.tp0));
        this.subscriptions.seek(this.tp0, 0L);
        this.client.updateMetadata(TestUtils.metadataUpdateWith(1, (Map<String, Integer>) Collections.singletonMap(this.topicName, 4), (Function<TopicPartition, Integer>) topicPartition -> {
            return Integer.valueOf(this.validLeaderEpoch);
        }));
        Assert.assertEquals(1L, this.fetcher.sendFetches());
        this.subscriptions.assignFromSubscribed(Collections.emptyList());
        this.subscriptions.assignFromSubscribed(Collections.singleton(this.tp0));
        this.client.prepareResponse(fullFetchResponse(this.tp0, this.records, Errors.NONE, 100L, 0));
        this.consumerClient.poll(this.time.timer(0L));
        Assert.assertTrue(this.fetcher.fetchedRecords().isEmpty());
    }

    @Test
    public void testFetchDuringCooperativeRebalance() {
        buildFetcher();
        this.subscriptions.subscribe(Collections.singleton(this.topicName), this.listener);
        this.subscriptions.assignFromSubscribed(Collections.singleton(this.tp0));
        this.subscriptions.seek(this.tp0, 0L);
        this.client.updateMetadata(TestUtils.metadataUpdateWith(1, (Map<String, Integer>) Collections.singletonMap(this.topicName, 4), (Function<TopicPartition, Integer>) topicPartition -> {
            return Integer.valueOf(this.validLeaderEpoch);
        }));
        Assert.assertEquals(1L, this.fetcher.sendFetches());
        this.subscriptions.assignFromSubscribed(Collections.singleton(this.tp0));
        this.client.prepareResponse(fullFetchResponse(this.tp0, this.records, Errors.NONE, 100L, 0));
        this.consumerClient.poll(this.time.timer(0L));
        Map fetchedRecords = fetchedRecords();
        Assert.assertEquals(1L, fetchedRecords.size());
        Assert.assertEquals(3L, ((List) fetchedRecords.get(this.tp0)).size());
    }

    @Test
    public void testInFlightFetchOnPausedPartition() {
        buildFetcher();
        assignFromUser(Collections.singleton(this.tp0));
        this.subscriptions.seek(this.tp0, 0L);
        Assert.assertEquals(1L, this.fetcher.sendFetches());
        this.subscriptions.pause(this.tp0);
        this.client.prepareResponse(fullFetchResponse(this.tp0, this.records, Errors.NONE, 100L, 0));
        this.consumerClient.poll(this.time.timer(0L));
        Assert.assertNull(this.fetcher.fetchedRecords().get(this.tp0));
    }

    @Test
    public void testFetchOnPausedPartition() {
        buildFetcher();
        assignFromUser(Collections.singleton(this.tp0));
        this.subscriptions.seek(this.tp0, 0L);
        this.subscriptions.pause(this.tp0);
        Assert.assertFalse(this.fetcher.sendFetches() > 0);
        Assert.assertTrue(this.client.requests().isEmpty());
    }

    @Test
    public void testFetchOnCompletedFetchesForPausedAndResumedPartitions() {
        buildFetcher();
        assignFromUser(Collections.singleton(this.tp0));
        this.subscriptions.seek(this.tp0, 0L);
        Assert.assertEquals(1L, this.fetcher.sendFetches());
        this.subscriptions.pause(this.tp0);
        this.client.prepareResponse(fullFetchResponse(this.tp0, this.records, Errors.NONE, 100L, 0));
        this.consumerClient.poll(this.time.timer(0L));
        Map fetchedRecords = fetchedRecords();
        Assert.assertEquals("Should not return any records when partition is paused", 0L, fetchedRecords.size());
        Assert.assertTrue("Should still contain completed fetches", this.fetcher.hasCompletedFetches());
        Assert.assertFalse("Should not have any available (non-paused) completed fetches", this.fetcher.hasAvailableFetches());
        Assert.assertNull(fetchedRecords.get(this.tp0));
        Assert.assertEquals(0L, this.fetcher.sendFetches());
        this.subscriptions.resume(this.tp0);
        Assert.assertTrue("Should have available (non-paused) completed fetches", this.fetcher.hasAvailableFetches());
        this.consumerClient.poll(this.time.timer(0L));
        Map fetchedRecords2 = fetchedRecords();
        Assert.assertEquals("Should return records when partition is resumed", 1L, fetchedRecords2.size());
        Assert.assertNotNull(fetchedRecords2.get(this.tp0));
        Assert.assertEquals(3L, ((List) fetchedRecords2.get(this.tp0)).size());
        this.consumerClient.poll(this.time.timer(0L));
        Assert.assertEquals("Should not return records after previously paused partitions are fetched", 0L, fetchedRecords().size());
        Assert.assertFalse("Should no longer contain completed fetches", this.fetcher.hasCompletedFetches());
    }

    @Test
    public void testFetchOnCompletedFetchesForSomePausedPartitions() {
        buildFetcher();
        assignFromUser(Utils.mkSet(new TopicPartition[]{this.tp0, this.tp1}));
        this.subscriptions.seek(this.tp0, 1L);
        Assert.assertEquals(1L, this.fetcher.sendFetches());
        this.client.prepareResponse(fullFetchResponse(this.tp0, this.records, Errors.NONE, 100L, 0));
        this.consumerClient.poll(this.time.timer(0L));
        this.subscriptions.seek(this.tp1, 1L);
        Assert.assertEquals(1L, this.fetcher.sendFetches());
        this.client.prepareResponse(fullFetchResponse(this.tp1, this.nextRecords, Errors.NONE, 100L, 0));
        this.subscriptions.pause(this.tp0);
        this.consumerClient.poll(this.time.timer(0L));
        Map fetchedRecords = fetchedRecords();
        Assert.assertEquals("Should return completed fetch for unpaused partitions", 1L, fetchedRecords.size());
        Assert.assertTrue("Should still contain completed fetches", this.fetcher.hasCompletedFetches());
        Assert.assertNotNull(fetchedRecords.get(this.tp1));
        Assert.assertNull(fetchedRecords.get(this.tp0));
        Assert.assertEquals("Should return no records for remaining paused partition", 0L, fetchedRecords().size());
        Assert.assertTrue("Should still contain completed fetches", this.fetcher.hasCompletedFetches());
    }

    @Test
    public void testFetchOnCompletedFetchesForAllPausedPartitions() {
        buildFetcher();
        assignFromUser(Utils.mkSet(new TopicPartition[]{this.tp0, this.tp1}));
        this.subscriptions.seek(this.tp0, 1L);
        Assert.assertEquals(1L, this.fetcher.sendFetches());
        this.client.prepareResponse(fullFetchResponse(this.tp0, this.records, Errors.NONE, 100L, 0));
        this.consumerClient.poll(this.time.timer(0L));
        this.subscriptions.seek(this.tp1, 1L);
        Assert.assertEquals(1L, this.fetcher.sendFetches());
        this.client.prepareResponse(fullFetchResponse(this.tp1, this.nextRecords, Errors.NONE, 100L, 0));
        this.subscriptions.pause(this.tp0);
        this.subscriptions.pause(this.tp1);
        this.consumerClient.poll(this.time.timer(0L));
        Assert.assertEquals("Should return no records for all paused partitions", 0L, fetchedRecords().size());
        Assert.assertTrue("Should still contain completed fetches", this.fetcher.hasCompletedFetches());
        Assert.assertFalse("Should not have any available (non-paused) completed fetches", this.fetcher.hasAvailableFetches());
    }

    @Test
    public void testPartialFetchWithPausedPartitions() {
        buildFetcher(2);
        assignFromUser(Utils.mkSet(new TopicPartition[]{this.tp0, this.tp1}));
        this.subscriptions.seek(this.tp0, 1L);
        Assert.assertEquals(1L, this.fetcher.sendFetches());
        this.client.prepareResponse(fullFetchResponse(this.tp0, this.records, Errors.NONE, 100L, 0));
        this.consumerClient.poll(this.time.timer(0L));
        Assert.assertEquals("Should return 2 records from fetch with 3 records", 2L, ((List) fetchedRecords().get(this.tp0)).size());
        Assert.assertFalse("Should have no completed fetches", this.fetcher.hasCompletedFetches());
        this.subscriptions.pause(this.tp0);
        this.consumerClient.poll(this.time.timer(0L));
        Assert.assertEquals("Should return no records for paused partitions", 0L, fetchedRecords().size());
        Assert.assertTrue("Should have 1 entry in completed fetches", this.fetcher.hasCompletedFetches());
        Assert.assertFalse("Should not have any available (non-paused) completed fetches", this.fetcher.hasAvailableFetches());
        this.subscriptions.resume(this.tp0);
        this.consumerClient.poll(this.time.timer(0L));
        Assert.assertEquals("Should return last remaining record", 1L, ((List) fetchedRecords().get(this.tp0)).size());
        Assert.assertFalse("Should have no completed fetches", this.fetcher.hasCompletedFetches());
    }

    @Test
    public void testFetchDiscardedAfterPausedPartitionResumedAndSeekedToNewOffset() {
        buildFetcher();
        assignFromUser(Collections.singleton(this.tp0));
        this.subscriptions.seek(this.tp0, 0L);
        Assert.assertEquals(1L, this.fetcher.sendFetches());
        this.subscriptions.pause(this.tp0);
        this.client.prepareResponse(fullFetchResponse(this.tp0, this.records, Errors.NONE, 100L, 0));
        this.subscriptions.seek(this.tp0, 3L);
        this.subscriptions.resume(this.tp0);
        this.consumerClient.poll(this.time.timer(0L));
        Assert.assertTrue("Should have 1 entry in completed fetches", this.fetcher.hasCompletedFetches());
        Map fetchedRecords = fetchedRecords();
        Assert.assertEquals("Should not return any records because we seeked to a new offset", 0L, fetchedRecords.size());
        Assert.assertNull(fetchedRecords.get(this.tp0));
        Assert.assertFalse("Should have no completed fetches", this.fetcher.hasCompletedFetches());
    }

    @Test
    public void testFetchNotLeaderOrFollower() {
        buildFetcher();
        assignFromUser(Collections.singleton(this.tp0));
        this.subscriptions.seek(this.tp0, 0L);
        Assert.assertEquals(1L, this.fetcher.sendFetches());
        this.client.prepareResponse(fullFetchResponse(this.tp0, this.records, Errors.NOT_LEADER_OR_FOLLOWER, 100L, 0));
        this.consumerClient.poll(this.time.timer(0L));
        Assert.assertEquals(0L, this.fetcher.fetchedRecords().size());
        Assert.assertEquals(0L, this.metadata.timeToNextUpdate(this.time.milliseconds()));
    }

    @Test
    public void testFetchUnknownTopicOrPartition() {
        buildFetcher();
        assignFromUser(Collections.singleton(this.tp0));
        this.subscriptions.seek(this.tp0, 0L);
        Assert.assertEquals(1L, this.fetcher.sendFetches());
        this.client.prepareResponse(fullFetchResponse(this.tp0, this.records, Errors.UNKNOWN_TOPIC_OR_PARTITION, 100L, 0));
        this.consumerClient.poll(this.time.timer(0L));
        Assert.assertEquals(0L, this.fetcher.fetchedRecords().size());
        Assert.assertEquals(0L, this.metadata.timeToNextUpdate(this.time.milliseconds()));
    }

    @Test
    public void testFetchFencedLeaderEpoch() {
        buildFetcher();
        assignFromUser(Collections.singleton(this.tp0));
        this.subscriptions.seek(this.tp0, 0L);
        Assert.assertEquals(1L, this.fetcher.sendFetches());
        this.client.prepareResponse(fullFetchResponse(this.tp0, this.records, Errors.FENCED_LEADER_EPOCH, 100L, 0));
        this.consumerClient.poll(this.time.timer(0L));
        Assert.assertEquals("Should not return any records", 0L, this.fetcher.fetchedRecords().size());
        Assert.assertEquals("Should have requested metadata update", 0L, this.metadata.timeToNextUpdate(this.time.milliseconds()));
    }

    @Test
    public void testFetchUnknownLeaderEpoch() {
        buildFetcher();
        assignFromUser(Collections.singleton(this.tp0));
        this.subscriptions.seek(this.tp0, 0L);
        Assert.assertEquals(1L, this.fetcher.sendFetches());
        this.client.prepareResponse(fullFetchResponse(this.tp0, this.records, Errors.UNKNOWN_LEADER_EPOCH, 100L, 0));
        this.consumerClient.poll(this.time.timer(0L));
        Assert.assertEquals("Should not return any records", 0L, this.fetcher.fetchedRecords().size());
        Assert.assertNotEquals("Should not have requested metadata update", 0L, this.metadata.timeToNextUpdate(this.time.milliseconds()));
    }

    @Test
    public void testEpochSetInFetchRequest() {
        buildFetcher();
        this.subscriptions.assignFromUser(Collections.singleton(this.tp0));
        this.client.updateMetadata(TestUtils.metadataUpdateWith("dummy", 1, (Map<String, Errors>) Collections.emptyMap(), (Map<String, Integer>) Collections.singletonMap(this.topicName, 4), (Function<TopicPartition, Integer>) topicPartition -> {
            return 99;
        }));
        this.subscriptions.seek(this.tp0, 10L);
        Assert.assertEquals(1L, this.fetcher.sendFetches());
        this.client.prepareResponse(abstractRequest -> {
            if (abstractRequest instanceof FetchRequest) {
                ((FetchRequest) abstractRequest).fetchData().values().forEach(partitionData -> {
                    Assert.assertTrue("Expected Fetcher to set leader epoch in request", partitionData.currentLeaderEpoch.isPresent());
                    Assert.assertEquals("Expected leader epoch to match epoch from metadata update", 99L, ((Integer) partitionData.currentLeaderEpoch.get()).longValue());
                });
                return true;
            }
            Assert.fail("Should have seen FetchRequest");
            return false;
        }, (AbstractResponse) fullFetchResponse(this.tp0, this.records, Errors.NONE, 100L, 0));
        this.consumerClient.pollNoWakeup();
    }

    @Test
    public void testFetchOffsetOutOfRange() {
        buildFetcher();
        assignFromUser(Collections.singleton(this.tp0));
        this.subscriptions.seek(this.tp0, 0L);
        Assert.assertEquals(1L, this.fetcher.sendFetches());
        this.client.prepareResponse(fullFetchResponse(this.tp0, this.records, Errors.OFFSET_OUT_OF_RANGE, 100L, 0));
        this.consumerClient.poll(this.time.timer(0L));
        Assert.assertEquals(0L, this.fetcher.fetchedRecords().size());
        Assert.assertTrue(this.subscriptions.isOffsetResetNeeded(this.tp0));
        Assert.assertNull(this.subscriptions.validPosition(this.tp0));
        Assert.assertNull(this.subscriptions.position(this.tp0));
    }

    @Test
    public void testStaleOutOfRangeError() {
        buildFetcher();
        assignFromUser(Collections.singleton(this.tp0));
        this.subscriptions.seek(this.tp0, 0L);
        Assert.assertEquals(1L, this.fetcher.sendFetches());
        this.client.prepareResponse(fullFetchResponse(this.tp0, this.records, Errors.OFFSET_OUT_OF_RANGE, 100L, 0));
        this.subscriptions.seek(this.tp0, 1L);
        this.consumerClient.poll(this.time.timer(0L));
        Assert.assertEquals(0L, this.fetcher.fetchedRecords().size());
        Assert.assertFalse(this.subscriptions.isOffsetResetNeeded(this.tp0));
        Assert.assertEquals(1L, this.subscriptions.position(this.tp0).offset);
    }

    @Test
    public void testFetchedRecordsAfterSeek() {
        buildFetcher(OffsetResetStrategy.NONE, new ByteArrayDeserializer(), new ByteArrayDeserializer(), 2, IsolationLevel.READ_UNCOMMITTED);
        assignFromUser(Collections.singleton(this.tp0));
        this.subscriptions.seek(this.tp0, 0L);
        Assert.assertTrue(this.fetcher.sendFetches() > 0);
        this.client.prepareResponse(fullFetchResponse(this.tp0, this.records, Errors.OFFSET_OUT_OF_RANGE, 100L, 0));
        this.consumerClient.poll(this.time.timer(0L));
        Assert.assertFalse(this.subscriptions.isOffsetResetNeeded(this.tp0));
        this.subscriptions.seek(this.tp0, 2L);
        Assert.assertEquals(0L, this.fetcher.fetchedRecords().size());
    }

    @Test
    public void testFetchOffsetOutOfRangeException() {
        buildFetcher(OffsetResetStrategy.NONE, new ByteArrayDeserializer(), new ByteArrayDeserializer(), 2, IsolationLevel.READ_UNCOMMITTED);
        assignFromUser(Collections.singleton(this.tp0));
        this.subscriptions.seek(this.tp0, 0L);
        this.fetcher.sendFetches();
        this.client.prepareResponse(fullFetchResponse(this.tp0, this.records, Errors.OFFSET_OUT_OF_RANGE, 100L, 0));
        this.consumerClient.poll(this.time.timer(0L));
        Assert.assertFalse(this.subscriptions.isOffsetResetNeeded(this.tp0));
        for (int i = 0; i < 2; i++) {
            OffsetOutOfRangeException assertThrows = Assert.assertThrows(OffsetOutOfRangeException.class, () -> {
                this.fetcher.fetchedRecords();
            });
            Assert.assertEquals(Collections.singleton(this.tp0), assertThrows.offsetOutOfRangePartitions().keySet());
            Assert.assertEquals(0L, ((Long) assertThrows.offsetOutOfRangePartitions().get(this.tp0)).longValue());
        }
    }

    @Test
    public void testFetchPositionAfterException() {
        buildFetcher(OffsetResetStrategy.NONE, new ByteArrayDeserializer(), new ByteArrayDeserializer(), Integer.MAX_VALUE, IsolationLevel.READ_UNCOMMITTED);
        assignFromUser(Utils.mkSet(new TopicPartition[]{this.tp0, this.tp1}));
        this.subscriptions.seek(this.tp0, 1L);
        this.subscriptions.seek(this.tp1, 1L);
        Assert.assertEquals(1L, this.fetcher.sendFetches());
        LinkedHashMap linkedHashMap = new LinkedHashMap();
        linkedHashMap.put(this.tp1, new FetchResponse.PartitionData(Errors.NONE, 100L, -1L, -1L, (List) null, this.records));
        linkedHashMap.put(this.tp0, new FetchResponse.PartitionData(Errors.OFFSET_OUT_OF_RANGE, 100L, -1L, -1L, (List) null, MemoryRecords.EMPTY));
        this.client.prepareResponse(new FetchResponse(Errors.NONE, new LinkedHashMap(linkedHashMap), 0, 0));
        this.consumerClient.poll(this.time.timer(0L));
        ArrayList arrayList = new ArrayList();
        fetchRecordsInto(arrayList);
        Assert.assertEquals(1L, this.subscriptions.position(this.tp0).offset);
        Assert.assertEquals(4L, this.subscriptions.position(this.tp1).offset);
        Assert.assertEquals(3L, arrayList.size());
        OffsetOutOfRangeException assertThrows = Assert.assertThrows(OffsetOutOfRangeException.class, () -> {
            fetchRecordsInto(arrayList);
        });
        Assert.assertEquals(Collections.singleton(this.tp0), assertThrows.offsetOutOfRangePartitions().keySet());
        Assert.assertEquals(1L, ((Long) assertThrows.offsetOutOfRangePartitions().get(this.tp0)).longValue());
        Assert.assertEquals(1L, this.subscriptions.position(this.tp0).offset);
        Assert.assertEquals(4L, this.subscriptions.position(this.tp1).offset);
        Assert.assertEquals(3L, arrayList.size());
    }

    private void fetchRecordsInto(List<ConsumerRecord<byte[], byte[]>> list) {
        Collection values = fetchedRecords().values();
        Objects.requireNonNull(list);
        values.forEach((v1) -> {
            r1.addAll(v1);
        });
    }

    @Test
    public void testCompletedFetchRemoval() {
        buildFetcher(OffsetResetStrategy.NONE, new ByteArrayDeserializer(), new ByteArrayDeserializer(), Integer.MAX_VALUE, IsolationLevel.READ_UNCOMMITTED);
        assignFromUser(Utils.mkSet(new TopicPartition[]{this.tp0, this.tp1, this.tp2, this.tp3}));
        this.subscriptions.seek(this.tp0, 1L);
        this.subscriptions.seek(this.tp1, 1L);
        this.subscriptions.seek(this.tp2, 1L);
        this.subscriptions.seek(this.tp3, 1L);
        Assert.assertEquals(1L, this.fetcher.sendFetches());
        LinkedHashMap linkedHashMap = new LinkedHashMap();
        linkedHashMap.put(this.tp1, new FetchResponse.PartitionData(Errors.NONE, 100L, -1L, -1L, (List) null, this.records));
        linkedHashMap.put(this.tp0, new FetchResponse.PartitionData(Errors.OFFSET_OUT_OF_RANGE, 100L, -1L, -1L, (List) null, MemoryRecords.EMPTY));
        linkedHashMap.put(this.tp2, new FetchResponse.PartitionData(Errors.NONE, 100L, 4L, 0L, (List) null, this.nextRecords));
        linkedHashMap.put(this.tp3, new FetchResponse.PartitionData(Errors.NONE, 100L, 4L, 0L, (List) null, this.partialRecords));
        this.client.prepareResponse(new FetchResponse(Errors.NONE, new LinkedHashMap(linkedHashMap), 0, 0));
        this.consumerClient.poll(this.time.timer(0L));
        ArrayList arrayList = new ArrayList();
        Iterator it = fetchedRecords().values().iterator();
        while (it.hasNext()) {
            arrayList.addAll((List) it.next());
        }
        Assert.assertEquals(arrayList.size(), this.subscriptions.position(this.tp1).offset - 1);
        Assert.assertEquals(4L, this.subscriptions.position(this.tp1).offset);
        Assert.assertEquals(3L, arrayList.size());
        ArrayList arrayList2 = new ArrayList();
        try {
            Iterator it2 = fetchedRecords().values().iterator();
            while (it2.hasNext()) {
                arrayList.addAll((List) it2.next());
            }
        } catch (OffsetOutOfRangeException e) {
            arrayList2.add(e);
        }
        Assert.assertEquals(1L, arrayList2.size());
        Assert.assertTrue(((OffsetOutOfRangeException) arrayList2.get(0)).offsetOutOfRangePartitions().containsKey(this.tp0));
        Assert.assertEquals(r0.offsetOutOfRangePartitions().size(), 1L);
        Iterator it3 = fetchedRecords().values().iterator();
        while (it3.hasNext()) {
            arrayList.addAll((List) it3.next());
        }
        Assert.assertEquals(6L, this.subscriptions.position(this.tp2).offset);
        Assert.assertEquals(5L, arrayList.size());
        ArrayList arrayList3 = new ArrayList();
        for (int i = 1; i <= 3; i++) {
            try {
                Iterator it4 = fetchedRecords().values().iterator();
                while (it4.hasNext()) {
                    arrayList.addAll((List) it4.next());
                }
            } catch (KafkaException e2) {
                arrayList3.add(e2);
            }
        }
        Assert.assertEquals(3, arrayList3.size());
    }

    @Test
    public void testSeekBeforeException() {
        buildFetcher(OffsetResetStrategy.NONE, new ByteArrayDeserializer(), new ByteArrayDeserializer(), 2, IsolationLevel.READ_UNCOMMITTED);
        assignFromUser(Utils.mkSet(new TopicPartition[]{this.tp0}));
        this.subscriptions.seek(this.tp0, 1L);
        Assert.assertEquals(1L, this.fetcher.sendFetches());
        new HashMap().put(this.tp0, new FetchResponse.PartitionData(Errors.NONE, 100L, -1L, -1L, Optional.empty(), (List) null, this.records));
        this.client.prepareResponse(fullFetchResponse(this.tp0, this.records, Errors.NONE, 100L, 0));
        this.consumerClient.poll(this.time.timer(0L));
        Assert.assertEquals(2L, ((List) this.fetcher.fetchedRecords().get(this.tp0)).size());
        this.subscriptions.assignFromUser(Utils.mkSet(new TopicPartition[]{this.tp0, this.tp1}));
        this.subscriptions.seek(this.tp1, 1L);
        Assert.assertEquals(1L, this.fetcher.sendFetches());
        HashMap hashMap = new HashMap();
        hashMap.put(this.tp1, new FetchResponse.PartitionData(Errors.OFFSET_OUT_OF_RANGE, 100L, -1L, -1L, Optional.empty(), (List) null, MemoryRecords.EMPTY));
        this.client.prepareResponse(new FetchResponse(Errors.NONE, new LinkedHashMap(hashMap), 0, 0));
        this.consumerClient.poll(this.time.timer(0L));
        Assert.assertEquals(1L, ((List) this.fetcher.fetchedRecords().get(this.tp0)).size());
        this.subscriptions.seek(this.tp1, 10L);
        Assert.assertEquals(0L, this.fetcher.fetchedRecords().size());
    }

    @Test
    public void testFetchDisconnected() {
        buildFetcher();
        assignFromUser(Collections.singleton(this.tp0));
        this.subscriptions.seek(this.tp0, 0L);
        Assert.assertEquals(1L, this.fetcher.sendFetches());
        this.client.prepareResponse((AbstractResponse) fullFetchResponse(this.tp0, this.records, Errors.NONE, 100L, 0), true);
        this.consumerClient.poll(this.time.timer(0L));
        Assert.assertEquals(0L, this.fetcher.fetchedRecords().size());
        Assert.assertFalse(this.subscriptions.isOffsetResetNeeded(this.tp0));
        Assert.assertTrue(this.subscriptions.isFetchable(this.tp0));
        Assert.assertEquals(0L, this.subscriptions.position(this.tp0).offset);
    }

    @Test
    public void testUpdateFetchPositionNoOpWithPositionSet() {
        buildFetcher();
        assignFromUser(Collections.singleton(this.tp0));
        this.subscriptions.seek(this.tp0, 5L);
        this.fetcher.resetOffsetsIfNeeded();
        Assert.assertFalse(this.client.hasInFlightRequests());
        Assert.assertTrue(this.subscriptions.isFetchable(this.tp0));
        Assert.assertEquals(5L, this.subscriptions.position(this.tp0).offset);
    }

    @Test
    public void testUpdateFetchPositionResetToDefaultOffset() {
        buildFetcher();
        assignFromUser(Collections.singleton(this.tp0));
        this.subscriptions.requestOffsetReset(this.tp0);
        this.client.prepareResponse(listOffsetRequestMatcher(-2L, Optional.of(Integer.valueOf(this.validLeaderEpoch))), (AbstractResponse) listOffsetResponse(Errors.NONE, 1L, 5L));
        this.fetcher.resetOffsetsIfNeeded();
        this.consumerClient.pollNoWakeup();
        Assert.assertFalse(this.subscriptions.isOffsetResetNeeded(this.tp0));
        Assert.assertTrue(this.subscriptions.isFetchable(this.tp0));
        Assert.assertEquals(5L, this.subscriptions.position(this.tp0).offset);
    }

    @Test
    public void testUpdateFetchPositionResetToLatestOffset() {
        buildFetcher();
        assignFromUser(Collections.singleton(this.tp0));
        this.subscriptions.requestOffsetReset(this.tp0, OffsetResetStrategy.LATEST);
        this.client.updateMetadata(this.initialUpdateResponse);
        this.client.prepareResponse(listOffsetRequestMatcher(-1L), (AbstractResponse) listOffsetResponse(Errors.NONE, 1L, 5L));
        this.fetcher.resetOffsetsIfNeeded();
        this.consumerClient.pollNoWakeup();
        Assert.assertFalse(this.subscriptions.isOffsetResetNeeded(this.tp0));
        Assert.assertTrue(this.subscriptions.isFetchable(this.tp0));
        Assert.assertEquals(5L, this.subscriptions.position(this.tp0).offset);
    }

    @Test
    public void testFetchOffsetErrors() {
        buildFetcher();
        assignFromUser(Collections.singleton(this.tp0));
        this.subscriptions.requestOffsetReset(this.tp0, OffsetResetStrategy.LATEST);
        this.client.prepareResponse(listOffsetRequestMatcher(-1L, Optional.of(Integer.valueOf(this.validLeaderEpoch))), listOffsetResponse(Errors.OFFSET_NOT_AVAILABLE, 1L, 5L), false);
        this.fetcher.resetOffsetsIfNeeded();
        this.consumerClient.pollNoWakeup();
        Assert.assertFalse(this.subscriptions.hasValidPosition(this.tp0));
        Assert.assertTrue(this.subscriptions.isOffsetResetNeeded(this.tp0));
        Assert.assertFalse(this.subscriptions.isFetchable(this.tp0));
        this.time.sleep(this.retryBackoffMs);
        this.client.prepareResponse(listOffsetRequestMatcher(-1L, Optional.of(Integer.valueOf(this.validLeaderEpoch))), listOffsetResponse(Errors.LEADER_NOT_AVAILABLE, 1L, 5L), false);
        this.fetcher.resetOffsetsIfNeeded();
        this.consumerClient.pollNoWakeup();
        Assert.assertFalse(this.subscriptions.hasValidPosition(this.tp0));
        Assert.assertTrue(this.subscriptions.isOffsetResetNeeded(this.tp0));
        Assert.assertFalse(this.subscriptions.isFetchable(this.tp0));
        this.time.sleep(this.retryBackoffMs);
        this.client.prepareResponse(listOffsetRequestMatcher(-1L), listOffsetResponse(Errors.NONE, 1L, 5L), false);
        this.fetcher.resetOffsetsIfNeeded();
        this.consumerClient.pollNoWakeup();
        Assert.assertTrue(this.subscriptions.hasValidPosition(this.tp0));
        Assert.assertFalse(this.subscriptions.isOffsetResetNeeded(this.tp0));
        Assert.assertTrue(this.subscriptions.isFetchable(this.tp0));
        Assert.assertEquals(this.subscriptions.position(this.tp0).offset, 5L);
    }

    @Test
    public void testListOffsetSendsReadUncommitted() {
        testListOffsetsSendsIsolationLevel(IsolationLevel.READ_UNCOMMITTED);
    }

    @Test
    public void testListOffsetSendsReadCommitted() {
        testListOffsetsSendsIsolationLevel(IsolationLevel.READ_COMMITTED);
    }

    private void testListOffsetsSendsIsolationLevel(IsolationLevel isolationLevel) {
        buildFetcher(OffsetResetStrategy.EARLIEST, new ByteArrayDeserializer(), new ByteArrayDeserializer(), Integer.MAX_VALUE, isolationLevel);
        assignFromUser(Collections.singleton(this.tp0));
        this.subscriptions.requestOffsetReset(this.tp0, OffsetResetStrategy.LATEST);
        this.client.prepareResponse(abstractRequest -> {
            return ((ListOffsetRequest) abstractRequest).isolationLevel() == isolationLevel;
        }, (AbstractResponse) listOffsetResponse(Errors.NONE, 1L, 5L));
        this.fetcher.resetOffsetsIfNeeded();
        this.consumerClient.pollNoWakeup();
        Assert.assertFalse(this.subscriptions.isOffsetResetNeeded(this.tp0));
        Assert.assertTrue(this.subscriptions.isFetchable(this.tp0));
        Assert.assertEquals(5L, this.subscriptions.position(this.tp0).offset);
    }

    @Test
    public void testResetOffsetsSkipsBlackedOutConnections() {
        buildFetcher();
        assignFromUser(Collections.singleton(this.tp0));
        this.subscriptions.requestOffsetReset(this.tp0, OffsetResetStrategy.EARLIEST);
        this.client.updateMetadata(this.initialUpdateResponse);
        this.client.blackout((Node) this.initialUpdateResponse.brokers().iterator().next(), 500L);
        this.fetcher.resetOffsetsIfNeeded();
        Assert.assertEquals(0L, this.consumerClient.pendingRequestCount());
        this.consumerClient.pollNoWakeup();
        Assert.assertTrue(this.subscriptions.isOffsetResetNeeded(this.tp0));
        Assert.assertEquals(OffsetResetStrategy.EARLIEST, this.subscriptions.resetStrategy(this.tp0));
        this.time.sleep(500L);
        this.client.prepareResponse(listOffsetRequestMatcher(-2L), (AbstractResponse) listOffsetResponse(Errors.NONE, 1L, 5L));
        this.fetcher.resetOffsetsIfNeeded();
        this.consumerClient.pollNoWakeup();
        Assert.assertFalse(this.subscriptions.isOffsetResetNeeded(this.tp0));
        Assert.assertTrue(this.subscriptions.isFetchable(this.tp0));
        Assert.assertEquals(5L, this.subscriptions.position(this.tp0).offset);
    }

    @Test
    public void testUpdateFetchPositionResetToEarliestOffset() {
        buildFetcher();
        assignFromUser(Collections.singleton(this.tp0));
        this.subscriptions.requestOffsetReset(this.tp0, OffsetResetStrategy.EARLIEST);
        this.client.prepareResponse(listOffsetRequestMatcher(-2L, Optional.of(Integer.valueOf(this.validLeaderEpoch))), (AbstractResponse) listOffsetResponse(Errors.NONE, 1L, 5L));
        this.fetcher.resetOffsetsIfNeeded();
        this.consumerClient.pollNoWakeup();
        Assert.assertFalse(this.subscriptions.isOffsetResetNeeded(this.tp0));
        Assert.assertTrue(this.subscriptions.isFetchable(this.tp0));
        Assert.assertEquals(5L, this.subscriptions.position(this.tp0).offset);
    }

    @Test
    public void testResetOffsetsMetadataRefresh() {
        buildFetcher();
        assignFromUser(Collections.singleton(this.tp0));
        this.subscriptions.requestOffsetReset(this.tp0, OffsetResetStrategy.LATEST);
        this.client.prepareResponse(listOffsetRequestMatcher(-1L, Optional.of(Integer.valueOf(this.validLeaderEpoch))), listOffsetResponse(Errors.NOT_LEADER_OR_FOLLOWER, 1L, 5L), false);
        this.fetcher.resetOffsetsIfNeeded();
        this.consumerClient.pollNoWakeup();
        Assert.assertFalse(this.subscriptions.hasValidPosition(this.tp0));
        this.client.prepareMetadataUpdate(this.initialUpdateResponse);
        this.consumerClient.pollNoWakeup();
        Assert.assertFalse(this.client.hasPendingMetadataUpdates());
        this.time.sleep(this.retryBackoffMs);
        this.client.prepareResponse(listOffsetRequestMatcher(-1L), (AbstractResponse) listOffsetResponse(Errors.NONE, 1L, 5L));
        this.fetcher.resetOffsetsIfNeeded();
        this.consumerClient.pollNoWakeup();
        Assert.assertFalse(this.subscriptions.isOffsetResetNeeded(this.tp0));
        Assert.assertTrue(this.subscriptions.isFetchable(this.tp0));
        Assert.assertEquals(5L, this.subscriptions.position(this.tp0).offset);
    }

    @Test
    public void testListOffsetNoUpdateMissingEpoch() {
        buildFetcher();
        this.subscriptions.assignFromUser(Collections.singleton(this.tp0));
        this.client.updateMetadata(TestUtils.metadataUpdateWith("kafka-cluster", 1, (Map<String, Errors>) Collections.emptyMap(), (Map<String, Integer>) Collections.singletonMap(this.topicName, 4), (Function<TopicPartition, Integer>) topicPartition -> {
            return null;
        }));
        this.subscriptions.requestOffsetReset(this.tp0, OffsetResetStrategy.LATEST);
        this.client.prepareResponse(listOffsetRequestMatcher(-1L), (AbstractResponse) listOffsetResponse(this.tp0, Errors.NONE, 1L, 5L, 1));
        this.fetcher.resetOffsetsIfNeeded();
        this.consumerClient.pollNoWakeup();
        Assert.assertFalse(this.subscriptions.isOffsetResetNeeded(this.tp0));
        Assert.assertFalse(this.metadata.updateRequested());
        Assert.assertFalse(this.metadata.lastSeenLeaderEpoch(this.tp0).isPresent());
    }

    @Test
    public void testListOffsetUpdateEpoch() {
        buildFetcher();
        this.subscriptions.assignFromUser(Collections.singleton(this.tp0));
        this.client.updateMetadata(TestUtils.metadataUpdateWith("kafka-cluster", 1, (Map<String, Errors>) Collections.emptyMap(), (Map<String, Integer>) Collections.singletonMap(this.topicName, 4), (Function<TopicPartition, Integer>) topicPartition -> {
            return 1;
        }));
        this.subscriptions.requestOffsetReset(this.tp0, OffsetResetStrategy.LATEST);
        this.client.prepareResponse(listOffsetRequestMatcher(-1L, 1), (AbstractResponse) listOffsetResponse(this.tp0, Errors.NONE, 1L, 5L, 2));
        this.fetcher.resetOffsetsIfNeeded();
        this.consumerClient.pollNoWakeup();
        Assert.assertFalse(this.subscriptions.isOffsetResetNeeded(this.tp0));
        Assert.assertTrue(this.metadata.updateRequested());
        TestUtils.assertOptional(this.metadata.lastSeenLeaderEpoch(this.tp0), num -> {
            Assert.assertEquals(num.intValue(), 2L);
        });
    }

    @Test
    public void testUpdateFetchPositionDisconnect() {
        buildFetcher();
        assignFromUser(Collections.singleton(this.tp0));
        this.subscriptions.requestOffsetReset(this.tp0, OffsetResetStrategy.LATEST);
        this.client.prepareResponse(listOffsetRequestMatcher(-1L, Optional.of(Integer.valueOf(this.validLeaderEpoch))), listOffsetResponse(Errors.NONE, 1L, 5L), true);
        this.fetcher.resetOffsetsIfNeeded();
        this.consumerClient.pollNoWakeup();
        Assert.assertFalse(this.subscriptions.hasValidPosition(this.tp0));
        this.client.prepareMetadataUpdate(this.initialUpdateResponse);
        this.consumerClient.pollNoWakeup();
        Assert.assertFalse(this.client.hasPendingMetadataUpdates());
        this.fetcher.resetOffsetsIfNeeded();
        this.consumerClient.pollNoWakeup();
        Assert.assertFalse(this.client.hasInFlightRequests());
        Assert.assertFalse(this.subscriptions.hasValidPosition(this.tp0));
        this.time.sleep(this.retryBackoffMs);
        this.client.prepareResponse(listOffsetRequestMatcher(-1L), (AbstractResponse) listOffsetResponse(Errors.NONE, 1L, 5L));
        this.fetcher.resetOffsetsIfNeeded();
        this.consumerClient.pollNoWakeup();
        Assert.assertFalse(this.subscriptions.isOffsetResetNeeded(this.tp0));
        Assert.assertTrue(this.subscriptions.isFetchable(this.tp0));
        Assert.assertEquals(5L, this.subscriptions.position(this.tp0).offset);
    }

    @Test
    public void testAssignmentChangeWithInFlightReset() {
        buildFetcher();
        assignFromUser(Collections.singleton(this.tp0));
        this.subscriptions.requestOffsetReset(this.tp0, OffsetResetStrategy.LATEST);
        this.fetcher.resetOffsetsIfNeeded();
        this.consumerClient.pollNoWakeup();
        Assert.assertFalse(this.subscriptions.hasValidPosition(this.tp0));
        Assert.assertTrue(this.client.hasInFlightRequests());
        assignFromUser(Collections.singleton(this.tp1));
        this.client.respond(listOffsetResponse(Errors.NONE, 1L, 5L));
        this.consumerClient.pollNoWakeup();
        Assert.assertFalse(this.client.hasPendingResponses());
        Assert.assertFalse(this.client.hasInFlightRequests());
        Assert.assertFalse(this.subscriptions.isAssigned(this.tp0));
    }

    @Test
    public void testSeekWithInFlightReset() {
        buildFetcher();
        assignFromUser(Collections.singleton(this.tp0));
        this.subscriptions.requestOffsetReset(this.tp0, OffsetResetStrategy.LATEST);
        this.fetcher.resetOffsetsIfNeeded();
        this.consumerClient.pollNoWakeup();
        Assert.assertFalse(this.subscriptions.hasValidPosition(this.tp0));
        Assert.assertTrue(this.client.hasInFlightRequests());
        this.subscriptions.seek(this.tp0, 237L);
        this.client.respond(listOffsetResponse(Errors.NONE, 1L, 5L));
        this.consumerClient.pollNoWakeup();
        Assert.assertFalse(this.client.hasPendingResponses());
        Assert.assertFalse(this.client.hasInFlightRequests());
        Assert.assertEquals(237L, this.subscriptions.position(this.tp0).offset);
    }

    @Test(timeout = 10000)
    public void testEarlierOffsetResetArrivesLate() throws InterruptedException {
        LogContext logContext = new LogContext();
        buildFetcher((SubscriptionState) Mockito.spy(new SubscriptionState(logContext, OffsetResetStrategy.EARLIEST)), logContext);
        assignFromUser(Collections.singleton(this.tp0));
        ExecutorService newSingleThreadExecutor = Executors.newSingleThreadExecutor();
        CountDownLatch countDownLatch = new CountDownLatch(1);
        CountDownLatch countDownLatch2 = new CountDownLatch(1);
        CountDownLatch countDownLatch3 = new CountDownLatch(1);
        CountDownLatch countDownLatch4 = new CountDownLatch(1);
        try {
            ((SubscriptionState) Mockito.doAnswer(invocationOnMock -> {
                countDownLatch.countDown();
                countDownLatch2.await();
                Object callRealMethod = invocationOnMock.callRealMethod();
                countDownLatch3.countDown();
                return callRealMethod;
            }).when(this.subscriptions)).maybeSeekUnvalidated(this.tp0, 0L, OffsetResetStrategy.EARLIEST);
            newSingleThreadExecutor.submit(() -> {
                this.subscriptions.requestOffsetReset(this.tp0, OffsetResetStrategy.EARLIEST);
                this.fetcher.resetOffsetsIfNeeded();
                this.consumerClient.pollNoWakeup();
                this.client.respond(listOffsetResponse(Errors.NONE, 1L, 0L));
                this.consumerClient.pollNoWakeup();
                countDownLatch4.countDown();
            }, Void.class);
            countDownLatch.await();
            this.subscriptions.requestOffsetReset(this.tp0, OffsetResetStrategy.LATEST);
            this.fetcher.resetOffsetsIfNeeded();
            this.consumerClient.pollNoWakeup();
            this.client.respond(listOffsetResponse(Errors.NONE, 1L, 10L));
            countDownLatch2.countDown();
            countDownLatch3.await();
            this.consumerClient.pollNoWakeup();
            countDownLatch4.await();
            Assert.assertEquals(10L, this.subscriptions.position(this.tp0).offset);
            newSingleThreadExecutor.shutdown();
            newSingleThreadExecutor.awaitTermination(10000L, TimeUnit.MILLISECONDS);
        } catch (Throwable th) {
            newSingleThreadExecutor.shutdown();
            newSingleThreadExecutor.awaitTermination(10000L, TimeUnit.MILLISECONDS);
            throw th;
        }
    }

    @Test
    public void testChangeResetWithInFlightReset() {
        buildFetcher();
        assignFromUser(Collections.singleton(this.tp0));
        this.subscriptions.requestOffsetReset(this.tp0, OffsetResetStrategy.LATEST);
        this.fetcher.resetOffsetsIfNeeded();
        this.consumerClient.pollNoWakeup();
        Assert.assertFalse(this.subscriptions.hasValidPosition(this.tp0));
        Assert.assertTrue(this.client.hasInFlightRequests());
        this.subscriptions.requestOffsetReset(this.tp0, OffsetResetStrategy.EARLIEST);
        this.client.respond(listOffsetResponse(Errors.NONE, 1L, 5L));
        this.consumerClient.pollNoWakeup();
        Assert.assertFalse(this.client.hasPendingResponses());
        Assert.assertFalse(this.client.hasInFlightRequests());
        Assert.assertTrue(this.subscriptions.isOffsetResetNeeded(this.tp0));
        Assert.assertEquals(OffsetResetStrategy.EARLIEST, this.subscriptions.resetStrategy(this.tp0));
    }

    @Test
    public void testIdempotentResetWithInFlightReset() {
        buildFetcher();
        assignFromUser(Collections.singleton(this.tp0));
        this.subscriptions.requestOffsetReset(this.tp0, OffsetResetStrategy.LATEST);
        this.fetcher.resetOffsetsIfNeeded();
        this.consumerClient.pollNoWakeup();
        Assert.assertFalse(this.subscriptions.hasValidPosition(this.tp0));
        Assert.assertTrue(this.client.hasInFlightRequests());
        this.subscriptions.requestOffsetReset(this.tp0, OffsetResetStrategy.LATEST);
        this.client.respond(listOffsetResponse(Errors.NONE, 1L, 5L));
        this.consumerClient.pollNoWakeup();
        Assert.assertFalse(this.client.hasInFlightRequests());
        Assert.assertFalse(this.subscriptions.isOffsetResetNeeded(this.tp0));
        Assert.assertEquals(5L, this.subscriptions.position(this.tp0).offset);
    }

    @Test
    public void testRestOffsetsAuthorizationFailure() {
        buildFetcher();
        assignFromUser(Collections.singleton(this.tp0));
        this.subscriptions.requestOffsetReset(this.tp0, OffsetResetStrategy.LATEST);
        this.client.prepareResponse(listOffsetRequestMatcher(-1L, Optional.of(Integer.valueOf(this.validLeaderEpoch))), listOffsetResponse(Errors.TOPIC_AUTHORIZATION_FAILED, -1L, -1L), false);
        this.fetcher.resetOffsetsIfNeeded();
        this.consumerClient.pollNoWakeup();
        Assert.assertFalse(this.subscriptions.hasValidPosition(this.tp0));
        try {
            this.fetcher.resetOffsetsIfNeeded();
            Assert.fail("Expected authorization error to be raised");
        } catch (TopicAuthorizationException e) {
            Assert.assertEquals(Collections.singleton(this.tp0.topic()), e.unauthorizedTopics());
        }
        this.fetcher.resetOffsetsIfNeeded();
        this.consumerClient.pollNoWakeup();
        Assert.assertFalse(this.client.hasInFlightRequests());
        Assert.assertFalse(this.subscriptions.hasValidPosition(this.tp0));
        this.time.sleep(this.retryBackoffMs);
        this.client.prepareResponse(listOffsetRequestMatcher(-1L), (AbstractResponse) listOffsetResponse(Errors.NONE, 1L, 5L));
        this.fetcher.resetOffsetsIfNeeded();
        this.consumerClient.pollNoWakeup();
        Assert.assertFalse(this.subscriptions.isOffsetResetNeeded(this.tp0));
        Assert.assertTrue(this.subscriptions.isFetchable(this.tp0));
        Assert.assertEquals(5L, this.subscriptions.position(this.tp0).offset);
    }

    @Test
    public void testUpdateFetchPositionOfPausedPartitionsRequiringOffsetReset() {
        buildFetcher();
        assignFromUser(Collections.singleton(this.tp0));
        this.subscriptions.pause(this.tp0);
        this.subscriptions.requestOffsetReset(this.tp0, OffsetResetStrategy.LATEST);
        this.client.prepareResponse(listOffsetRequestMatcher(-1L, Optional.of(Integer.valueOf(this.validLeaderEpoch))), (AbstractResponse) listOffsetResponse(Errors.NONE, 1L, 10L));
        this.fetcher.resetOffsetsIfNeeded();
        this.consumerClient.pollNoWakeup();
        Assert.assertFalse(this.subscriptions.isOffsetResetNeeded(this.tp0));
        Assert.assertFalse(this.subscriptions.isFetchable(this.tp0));
        Assert.assertTrue(this.subscriptions.hasValidPosition(this.tp0));
        Assert.assertEquals(10L, this.subscriptions.position(this.tp0).offset);
    }

    @Test
    public void testUpdateFetchPositionOfPausedPartitionsWithoutAValidPosition() {
        buildFetcher();
        assignFromUser(Collections.singleton(this.tp0));
        this.subscriptions.requestOffsetReset(this.tp0);
        this.subscriptions.pause(this.tp0);
        this.fetcher.resetOffsetsIfNeeded();
        this.consumerClient.pollNoWakeup();
        Assert.assertTrue(this.subscriptions.isOffsetResetNeeded(this.tp0));
        Assert.assertFalse(this.subscriptions.isFetchable(this.tp0));
        Assert.assertFalse(this.subscriptions.hasValidPosition(this.tp0));
    }

    @Test
    public void testUpdateFetchPositionOfPausedPartitionsWithAValidPosition() {
        buildFetcher();
        assignFromUser(Collections.singleton(this.tp0));
        this.subscriptions.seek(this.tp0, 10L);
        this.subscriptions.pause(this.tp0);
        this.fetcher.resetOffsetsIfNeeded();
        Assert.assertFalse(this.subscriptions.isOffsetResetNeeded(this.tp0));
        Assert.assertFalse(this.subscriptions.isFetchable(this.tp0));
        Assert.assertTrue(this.subscriptions.hasValidPosition(this.tp0));
        Assert.assertEquals(10L, this.subscriptions.position(this.tp0).offset);
    }

    @Test
    public void testGetAllTopics() {
        buildFetcher();
        assignFromUser(Collections.singleton(this.tp0));
        this.client.prepareResponse(newMetadataResponse(this.topicName, Errors.NONE));
        Assert.assertEquals(this.initialUpdateResponse.topicMetadata().size(), this.fetcher.getAllTopicMetadata(this.time.timer(5000L)).size());
    }

    @Test
    public void testGetAllTopicsDisconnect() {
        buildFetcher();
        assignFromUser(Collections.singleton(this.tp0));
        this.client.prepareResponse((AbstractResponse) null, true);
        this.client.prepareResponse(newMetadataResponse(this.topicName, Errors.NONE));
        Assert.assertEquals(this.initialUpdateResponse.topicMetadata().size(), this.fetcher.getAllTopicMetadata(this.time.timer(5000L)).size());
    }

    @Test(expected = TimeoutException.class)
    public void testGetAllTopicsTimeout() {
        buildFetcher();
        assignFromUser(Collections.singleton(this.tp0));
        this.fetcher.getAllTopicMetadata(this.time.timer(50L));
    }

    @Test
    public void testGetAllTopicsUnauthorized() {
        buildFetcher();
        assignFromUser(Collections.singleton(this.tp0));
        this.client.prepareResponse(newMetadataResponse(this.topicName, Errors.TOPIC_AUTHORIZATION_FAILED));
        try {
            this.fetcher.getAllTopicMetadata(this.time.timer(10L));
            Assert.fail();
        } catch (TopicAuthorizationException e) {
            Assert.assertEquals(Collections.singleton(this.topicName), e.unauthorizedTopics());
        }
    }

    @Test(expected = InvalidTopicException.class)
    public void testGetTopicMetadataInvalidTopic() {
        buildFetcher();
        assignFromUser(Collections.singleton(this.tp0));
        this.client.prepareResponse(newMetadataResponse(this.topicName, Errors.INVALID_TOPIC_EXCEPTION));
        this.fetcher.getTopicMetadata(new MetadataRequest.Builder(Collections.singletonList(this.topicName), true), this.time.timer(5000L));
    }

    @Test
    public void testGetTopicMetadataUnknownTopic() {
        buildFetcher();
        assignFromUser(Collections.singleton(this.tp0));
        this.client.prepareResponse(newMetadataResponse(this.topicName, Errors.UNKNOWN_TOPIC_OR_PARTITION));
        Assert.assertNull(this.fetcher.getTopicMetadata(new MetadataRequest.Builder(Collections.singletonList(this.topicName), true), this.time.timer(5000L)).get(this.topicName));
    }

    @Test
    public void testGetTopicMetadataLeaderNotAvailable() {
        buildFetcher();
        assignFromUser(Collections.singleton(this.tp0));
        this.client.prepareResponse(newMetadataResponse(this.topicName, Errors.LEADER_NOT_AVAILABLE));
        this.client.prepareResponse(newMetadataResponse(this.topicName, Errors.NONE));
        Assert.assertTrue(this.fetcher.getTopicMetadata(new MetadataRequest.Builder(Collections.singletonList(this.topicName), true), this.time.timer(5000L)).containsKey(this.topicName));
    }

    @Test
    public void testGetTopicMetadataOfflinePartitions() {
        buildFetcher();
        assignFromUser(Collections.singleton(this.tp0));
        MetadataResponse newMetadataResponse = newMetadataResponse(this.topicName, Errors.NONE);
        ArrayList arrayList = new ArrayList();
        for (MetadataResponse.TopicMetadata topicMetadata : newMetadataResponse.topicMetadata()) {
            List<MetadataResponse.PartitionMetadata> partitionMetadata = topicMetadata.partitionMetadata();
            ArrayList arrayList2 = new ArrayList();
            for (MetadataResponse.PartitionMetadata partitionMetadata2 : partitionMetadata) {
                arrayList2.add(new MetadataResponse.PartitionMetadata(partitionMetadata2.error, partitionMetadata2.topicPartition, Optional.empty(), Optional.empty(), partitionMetadata2.replicaIds, partitionMetadata2.inSyncReplicaIds, partitionMetadata2.offlineReplicaIds));
            }
            arrayList.add(new MetadataResponse.TopicMetadata(topicMetadata.error(), topicMetadata.topic(), topicMetadata.isInternal(), arrayList2));
        }
        Node controller = newMetadataResponse.controller();
        this.client.prepareResponse(MetadataResponse.prepareResponse(newMetadataResponse.brokers(), newMetadataResponse.clusterId(), controller != null ? controller.id() : -1, arrayList));
        Map topicMetadata2 = this.fetcher.getTopicMetadata(new MetadataRequest.Builder(Collections.singletonList(this.topicName), false), this.time.timer(5000L));
        Assert.assertNotNull(topicMetadata2);
        Assert.assertNotNull(topicMetadata2.get(this.topicName));
        Assert.assertEquals(this.metadata.fetch().partitionCountForTopic(this.topicName).longValue(), ((List) topicMetadata2.get(this.topicName)).size());
    }

    @Test
    public void testQuotaMetrics() {
        buildFetcher();
        MockSelector mockSelector = new MockSelector(this.time);
        Sensor throttleTimeSensor = Fetcher.throttleTimeSensor(this.metrics, this.metricsRegistry);
        Node node = (Node) TestUtils.singletonCluster("test", 1).nodes().get(0);
        NetworkClient networkClient = new NetworkClient(mockSelector, this.metadata, "mock", Integer.MAX_VALUE, 1000L, 1000L, 65536, 65536, 1000, ClientDnsLookup.USE_ALL_DNS_IPS, this.time, true, new ApiVersions(), throttleTimeSensor, new LogContext());
        mockSelector.delayedReceive(new DelayedReceive(node.idString(), new NetworkReceive(node.idString(), ApiVersionsResponse.createApiVersionsResponse(400, (byte) 2).serialize(ApiKeys.API_VERSIONS, ApiKeys.API_VERSIONS.latestVersion(), 0))));
        while (!networkClient.ready(node, this.time.milliseconds())) {
            networkClient.poll(1L, this.time.milliseconds());
            this.time.sleep(networkClient.throttleDelayMs(node, this.time.milliseconds()));
        }
        mockSelector.clear();
        for (int i = 1; i <= 3; i++) {
            FetchRequest.Builder forConsumer = FetchRequest.Builder.forConsumer(100, 100, new LinkedHashMap());
            forConsumer.rackId("");
            ClientRequest newClientRequest = networkClient.newClientRequest(node.idString(), forConsumer, this.time.milliseconds(), true);
            networkClient.send(newClientRequest, this.time.milliseconds());
            networkClient.poll(1L, this.time.milliseconds());
            mockSelector.completeReceive(new NetworkReceive(node.idString(), fullFetchResponse(this.tp0, this.nextRecords, Errors.NONE, i, 100 * i).serialize(ApiKeys.FETCH, ApiKeys.FETCH.latestVersion(), newClientRequest.correlationId())));
            networkClient.poll(1L, this.time.milliseconds());
            this.time.sleep(networkClient.throttleDelayMs(node, this.time.milliseconds()));
            mockSelector.clear();
        }
        Map metrics = this.metrics.metrics();
        KafkaMetric kafkaMetric = (KafkaMetric) metrics.get(this.metrics.metricInstance(this.metricsRegistry.fetchThrottleTimeAvg, new String[0]));
        KafkaMetric kafkaMetric2 = (KafkaMetric) metrics.get(this.metrics.metricInstance(this.metricsRegistry.fetchThrottleTimeMax, new String[0]));
        Assert.assertEquals(250.0d, ((Double) kafkaMetric.metricValue()).doubleValue(), EPSILON);
        Assert.assertEquals(400.0d, ((Double) kafkaMetric2.metricValue()).doubleValue(), EPSILON);
        networkClient.close();
    }

    @Test
    public void testFetcherMetrics() {
        buildFetcher();
        assignFromUser(Collections.singleton(this.tp0));
        this.subscriptions.seek(this.tp0, 0L);
        MetricName metricInstance = this.metrics.metricInstance(this.metricsRegistry.recordsLagMax, new String[0]);
        HashMap hashMap = new HashMap();
        hashMap.put("topic", this.tp0.topic());
        hashMap.put("partition", String.valueOf(this.tp0.partition()));
        MetricName metricName = this.metrics.metricName("records-lag", this.metricGroup, hashMap);
        Map metrics = this.metrics.metrics();
        KafkaMetric kafkaMetric = (KafkaMetric) metrics.get(metricInstance);
        Assert.assertEquals(Double.NaN, ((Double) kafkaMetric.metricValue()).doubleValue(), EPSILON);
        fetchRecords(this.tp0, MemoryRecords.EMPTY, Errors.NONE, 100L, 0);
        Assert.assertEquals(100.0d, ((Double) kafkaMetric.metricValue()).doubleValue(), EPSILON);
        KafkaMetric kafkaMetric2 = (KafkaMetric) metrics.get(metricName);
        Assert.assertEquals(100.0d, ((Double) kafkaMetric2.metricValue()).doubleValue(), EPSILON);
        MemoryRecordsBuilder builder = MemoryRecords.builder(ByteBuffer.allocate(1024), CompressionType.NONE, TimestampType.CREATE_TIME, 0L);
        for (int i = 0; i < 3; i++) {
            builder.appendWithOffset(i, -1L, "key".getBytes(), ("value-" + i).getBytes());
        }
        fetchRecords(this.tp0, builder.build(), Errors.NONE, 200L, 0);
        Assert.assertEquals(197.0d, ((Double) kafkaMetric.metricValue()).doubleValue(), EPSILON);
        Assert.assertEquals(197.0d, ((Double) kafkaMetric2.metricValue()).doubleValue(), EPSILON);
        this.subscriptions.unsubscribe();
        this.fetcher.sendFetches();
        Assert.assertFalse(metrics.containsKey(metricName));
    }

    @Test
    public void testFetcherLeadMetric() {
        buildFetcher();
        assignFromUser(Collections.singleton(this.tp0));
        this.subscriptions.seek(this.tp0, 0L);
        MetricName metricInstance = this.metrics.metricInstance(this.metricsRegistry.recordsLeadMin, new String[0]);
        HashMap hashMap = new HashMap(2);
        hashMap.put("topic", this.tp0.topic());
        hashMap.put("partition", String.valueOf(this.tp0.partition()));
        MetricName metricName = this.metrics.metricName("records-lead", this.metricGroup, "", hashMap);
        Map metrics = this.metrics.metrics();
        KafkaMetric kafkaMetric = (KafkaMetric) metrics.get(metricInstance);
        Assert.assertEquals(Double.NaN, ((Double) kafkaMetric.metricValue()).doubleValue(), EPSILON);
        fetchRecords(this.tp0, MemoryRecords.EMPTY, Errors.NONE, 100L, -1L, 0L, 0);
        Assert.assertEquals(0.0d, ((Double) kafkaMetric.metricValue()).doubleValue(), EPSILON);
        KafkaMetric kafkaMetric2 = (KafkaMetric) metrics.get(metricName);
        Assert.assertEquals(0.0d, ((Double) kafkaMetric2.metricValue()).doubleValue(), EPSILON);
        MemoryRecordsBuilder builder = MemoryRecords.builder(ByteBuffer.allocate(1024), CompressionType.NONE, TimestampType.CREATE_TIME, 0L);
        for (int i = 0; i < 3; i++) {
            builder.appendWithOffset(i, -1L, "key".getBytes(), ("value-" + i).getBytes());
        }
        fetchRecords(this.tp0, builder.build(), Errors.NONE, 200L, -1L, 0L, 0);
        Assert.assertEquals(0.0d, ((Double) kafkaMetric.metricValue()).doubleValue(), EPSILON);
        Assert.assertEquals(3.0d, ((Double) kafkaMetric2.metricValue()).doubleValue(), EPSILON);
        this.subscriptions.unsubscribe();
        this.fetcher.sendFetches();
        Assert.assertFalse(metrics.containsKey(metricName));
    }

    @Test
    public void testReadCommittedLagMetric() {
        buildFetcher(OffsetResetStrategy.EARLIEST, new ByteArrayDeserializer(), new ByteArrayDeserializer(), Integer.MAX_VALUE, IsolationLevel.READ_COMMITTED);
        assignFromUser(Collections.singleton(this.tp0));
        this.subscriptions.seek(this.tp0, 0L);
        MetricName metricInstance = this.metrics.metricInstance(this.metricsRegistry.recordsLagMax, new String[0]);
        HashMap hashMap = new HashMap();
        hashMap.put("topic", this.tp0.topic());
        hashMap.put("partition", String.valueOf(this.tp0.partition()));
        MetricName metricName = this.metrics.metricName("records-lag", this.metricGroup, hashMap);
        Map metrics = this.metrics.metrics();
        KafkaMetric kafkaMetric = (KafkaMetric) metrics.get(metricInstance);
        Assert.assertEquals(Double.NaN, ((Double) kafkaMetric.metricValue()).doubleValue(), EPSILON);
        fetchRecords(this.tp0, MemoryRecords.EMPTY, Errors.NONE, 100L, 50L, 0);
        Assert.assertEquals(50.0d, ((Double) kafkaMetric.metricValue()).doubleValue(), EPSILON);
        KafkaMetric kafkaMetric2 = (KafkaMetric) metrics.get(metricName);
        Assert.assertEquals(50.0d, ((Double) kafkaMetric2.metricValue()).doubleValue(), EPSILON);
        MemoryRecordsBuilder builder = MemoryRecords.builder(ByteBuffer.allocate(1024), CompressionType.NONE, TimestampType.CREATE_TIME, 0L);
        for (int i = 0; i < 3; i++) {
            builder.appendWithOffset(i, -1L, "key".getBytes(), ("value-" + i).getBytes());
        }
        fetchRecords(this.tp0, builder.build(), Errors.NONE, 200L, 150L, 0);
        Assert.assertEquals(147.0d, ((Double) kafkaMetric.metricValue()).doubleValue(), EPSILON);
        Assert.assertEquals(147.0d, ((Double) kafkaMetric2.metricValue()).doubleValue(), EPSILON);
        this.subscriptions.unsubscribe();
        this.fetcher.sendFetches();
        Assert.assertFalse(metrics.containsKey(metricName));
    }

    @Test
    public void testFetchResponseMetrics() {
        buildFetcher();
        TopicPartition topicPartition = new TopicPartition("foo", 0);
        TopicPartition topicPartition2 = new TopicPartition("bar", 0);
        this.subscriptions.assignFromUser(Utils.mkSet(new TopicPartition[]{topicPartition, topicPartition2}));
        HashMap hashMap = new HashMap();
        hashMap.put("foo", 1);
        hashMap.put("bar", 1);
        this.client.updateMetadata(TestUtils.metadataUpdateWith(1, hashMap, (Function<TopicPartition, Integer>) topicPartition3 -> {
            return Integer.valueOf(this.validLeaderEpoch);
        }));
        int i = 0;
        LinkedHashMap linkedHashMap = new LinkedHashMap();
        for (TopicPartition topicPartition4 : Utils.mkSet(new TopicPartition[]{topicPartition, topicPartition2})) {
            this.subscriptions.seek(topicPartition4, 0L);
            MemoryRecordsBuilder builder = MemoryRecords.builder(ByteBuffer.allocate(1024), CompressionType.NONE, TimestampType.CREATE_TIME, 0L);
            for (int i2 = 0; i2 < 3; i2++) {
                builder.appendWithOffset(i2, -1L, "key".getBytes(), ("value-" + i2).getBytes());
            }
            MemoryRecords build = builder.build();
            Iterator it = build.records().iterator();
            while (it.hasNext()) {
                i += ((Record) it.next()).sizeInBytes();
            }
            linkedHashMap.put(topicPartition4, new FetchResponse.PartitionData(Errors.NONE, 15L, -1L, 0L, (List) null, build));
        }
        Assert.assertEquals(1L, this.fetcher.sendFetches());
        this.client.prepareResponse(new FetchResponse(Errors.NONE, linkedHashMap, 0, 0));
        this.consumerClient.poll(this.time.timer(0L));
        Map fetchedRecords = fetchedRecords();
        Assert.assertEquals(3L, ((List) fetchedRecords.get(topicPartition)).size());
        Assert.assertEquals(3L, ((List) fetchedRecords.get(topicPartition2)).size());
        Map metrics = this.metrics.metrics();
        KafkaMetric kafkaMetric = (KafkaMetric) metrics.get(this.metrics.metricInstance(this.metricsRegistry.fetchSizeAvg, new String[0]));
        KafkaMetric kafkaMetric2 = (KafkaMetric) metrics.get(this.metrics.metricInstance(this.metricsRegistry.recordsPerRequestAvg, new String[0]));
        Assert.assertEquals(i, ((Double) kafkaMetric.metricValue()).doubleValue(), EPSILON);
        Assert.assertEquals(6.0d, ((Double) kafkaMetric2.metricValue()).doubleValue(), EPSILON);
    }

    @Test
    public void testFetchResponseMetricsPartialResponse() {
        buildFetcher();
        assignFromUser(Collections.singleton(this.tp0));
        this.subscriptions.seek(this.tp0, 1L);
        Map metrics = this.metrics.metrics();
        KafkaMetric kafkaMetric = (KafkaMetric) metrics.get(this.metrics.metricInstance(this.metricsRegistry.fetchSizeAvg, new String[0]));
        KafkaMetric kafkaMetric2 = (KafkaMetric) metrics.get(this.metrics.metricInstance(this.metricsRegistry.recordsPerRequestAvg, new String[0]));
        MemoryRecordsBuilder builder = MemoryRecords.builder(ByteBuffer.allocate(1024), CompressionType.NONE, TimestampType.CREATE_TIME, 0L);
        for (int i = 0; i < 3; i++) {
            builder.appendWithOffset(i, -1L, "key".getBytes(), ("value-" + i).getBytes());
        }
        MemoryRecords build = builder.build();
        int i2 = 0;
        for (Record record : build.records()) {
            if (record.offset() >= 1) {
                i2 += record.sizeInBytes();
            }
        }
        fetchRecords(this.tp0, build, Errors.NONE, 100L, 0);
        Assert.assertEquals(i2, ((Double) kafkaMetric.metricValue()).doubleValue(), EPSILON);
        Assert.assertEquals(2.0d, ((Double) kafkaMetric2.metricValue()).doubleValue(), EPSILON);
    }

    @Test
    public void testFetchResponseMetricsWithOnePartitionError() {
        buildFetcher();
        assignFromUser(Utils.mkSet(new TopicPartition[]{this.tp0, this.tp1}));
        this.subscriptions.seek(this.tp0, 0L);
        this.subscriptions.seek(this.tp1, 0L);
        Map metrics = this.metrics.metrics();
        KafkaMetric kafkaMetric = (KafkaMetric) metrics.get(this.metrics.metricInstance(this.metricsRegistry.fetchSizeAvg, new String[0]));
        KafkaMetric kafkaMetric2 = (KafkaMetric) metrics.get(this.metrics.metricInstance(this.metricsRegistry.recordsPerRequestAvg, new String[0]));
        MemoryRecordsBuilder builder = MemoryRecords.builder(ByteBuffer.allocate(1024), CompressionType.NONE, TimestampType.CREATE_TIME, 0L);
        for (int i = 0; i < 3; i++) {
            builder.appendWithOffset(i, -1L, "key".getBytes(), ("value-" + i).getBytes());
        }
        MemoryRecords build = builder.build();
        HashMap hashMap = new HashMap();
        hashMap.put(this.tp0, new FetchResponse.PartitionData(Errors.NONE, 100L, -1L, 0L, (List) null, build));
        hashMap.put(this.tp1, new FetchResponse.PartitionData(Errors.OFFSET_OUT_OF_RANGE, 100L, -1L, 0L, (List) null, MemoryRecords.EMPTY));
        Assert.assertEquals(1L, this.fetcher.sendFetches());
        this.client.prepareResponse(new FetchResponse(Errors.NONE, new LinkedHashMap(hashMap), 0, 0));
        this.consumerClient.poll(this.time.timer(0L));
        this.fetcher.fetchedRecords();
        int i2 = 0;
        Iterator it = build.records().iterator();
        while (it.hasNext()) {
            i2 += ((Record) it.next()).sizeInBytes();
        }
        Assert.assertEquals(i2, ((Double) kafkaMetric.metricValue()).doubleValue(), EPSILON);
        Assert.assertEquals(3.0d, ((Double) kafkaMetric2.metricValue()).doubleValue(), EPSILON);
    }

    @Test
    public void testFetchResponseMetricsWithOnePartitionAtTheWrongOffset() {
        buildFetcher();
        assignFromUser(Utils.mkSet(new TopicPartition[]{this.tp0, this.tp1}));
        this.subscriptions.seek(this.tp0, 0L);
        this.subscriptions.seek(this.tp1, 0L);
        Map metrics = this.metrics.metrics();
        KafkaMetric kafkaMetric = (KafkaMetric) metrics.get(this.metrics.metricInstance(this.metricsRegistry.fetchSizeAvg, new String[0]));
        KafkaMetric kafkaMetric2 = (KafkaMetric) metrics.get(this.metrics.metricInstance(this.metricsRegistry.recordsPerRequestAvg, new String[0]));
        Assert.assertEquals(1L, this.fetcher.sendFetches());
        this.subscriptions.seek(this.tp1, 5L);
        MemoryRecordsBuilder builder = MemoryRecords.builder(ByteBuffer.allocate(1024), CompressionType.NONE, TimestampType.CREATE_TIME, 0L);
        for (int i = 0; i < 3; i++) {
            builder.appendWithOffset(i, -1L, "key".getBytes(), ("value-" + i).getBytes());
        }
        MemoryRecords build = builder.build();
        HashMap hashMap = new HashMap();
        hashMap.put(this.tp0, new FetchResponse.PartitionData(Errors.NONE, 100L, -1L, 0L, (List) null, build));
        hashMap.put(this.tp1, new FetchResponse.PartitionData(Errors.NONE, 100L, -1L, 0L, (List) null, MemoryRecords.withRecords(CompressionType.NONE, new SimpleRecord[]{new SimpleRecord("val".getBytes())})));
        this.client.prepareResponse(new FetchResponse(Errors.NONE, new LinkedHashMap(hashMap), 0, 0));
        this.consumerClient.poll(this.time.timer(0L));
        this.fetcher.fetchedRecords();
        int i2 = 0;
        Iterator it = build.records().iterator();
        while (it.hasNext()) {
            i2 += ((Record) it.next()).sizeInBytes();
        }
        Assert.assertEquals(i2, ((Double) kafkaMetric.metricValue()).doubleValue(), EPSILON);
        Assert.assertEquals(3.0d, ((Double) kafkaMetric2.metricValue()).doubleValue(), EPSILON);
    }

    @Test
    public void testFetcherMetricsTemplates() {
        buildFetcher(new MetricConfig().tags(Collections.singletonMap("client-id", "clientA")), OffsetResetStrategy.EARLIEST, new ByteArrayDeserializer(), new ByteArrayDeserializer(), Integer.MAX_VALUE, IsolationLevel.READ_UNCOMMITTED);
        assignFromUser(Collections.singleton(this.tp0));
        this.subscriptions.seek(this.tp0, 0L);
        Assert.assertEquals(1L, this.fetcher.sendFetches());
        this.client.prepareResponse(fullFetchResponse(this.tp0, this.records, Errors.NONE, 100L, 0));
        this.consumerClient.poll(this.time.timer(0L));
        Assert.assertTrue(this.fetcher.hasCompletedFetches());
        Assert.assertTrue(fetchedRecords().containsKey(this.tp0));
        Fetcher.throttleTimeSensor(this.metrics, this.metricsRegistry);
        HashSet hashSet = new HashSet();
        for (MetricName metricName : this.metrics.metrics().keySet()) {
            String replaceAll = metricName.name().replaceAll(this.tp0.toString(), "{topic}-{partition}");
            if (!metricName.group().equals("kafka-metrics-count")) {
                hashSet.add(new MetricNameTemplate(replaceAll, metricName.group(), "", metricName.tags().keySet()));
            }
        }
        TestUtils.checkEquals(hashSet, new HashSet(this.metricsRegistry.getAllTemplates()), "metrics", "templates");
    }

    private Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> fetchRecords(TopicPartition topicPartition, MemoryRecords memoryRecords, Errors errors, long j, int i) {
        return fetchRecords(topicPartition, memoryRecords, errors, j, -1L, i);
    }

    private Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> fetchRecords(TopicPartition topicPartition, MemoryRecords memoryRecords, Errors errors, long j, long j2, int i) {
        Assert.assertEquals(1L, this.fetcher.sendFetches());
        this.client.prepareResponse(fullFetchResponse(topicPartition, memoryRecords, errors, j, j2, i));
        this.consumerClient.poll(this.time.timer(0L));
        return fetchedRecords();
    }

    private Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> fetchRecords(TopicPartition topicPartition, MemoryRecords memoryRecords, Errors errors, long j, long j2, long j3, int i) {
        Assert.assertEquals(1L, this.fetcher.sendFetches());
        this.client.prepareResponse(fetchResponse(topicPartition, memoryRecords, errors, j, j2, j3, i));
        this.consumerClient.poll(this.time.timer(0L));
        return fetchedRecords();
    }

    @Test
    public void testGetOffsetsForTimesTimeout() {
        buildFetcher();
        Assert.assertThrows(TimeoutException.class, () -> {
            this.fetcher.offsetsForTimes(Collections.singletonMap(new TopicPartition(this.topicName, 2), 1000L), this.time.timer(100L));
        });
    }

    @Test
    public void testGetOffsetsForTimes() {
        buildFetcher();
        Assert.assertTrue(this.fetcher.offsetsForTimes(new HashMap(), this.time.timer(100L)).isEmpty());
        testGetOffsetsForTimesWithUnknownOffset();
        testGetOffsetsForTimesWithError(Errors.NONE, Errors.NONE, -1L, 100L, null, 100L);
        testGetOffsetsForTimesWithError(Errors.NONE, Errors.NONE, 10L, 100L, 10L, 100L);
        testGetOffsetsForTimesWithError(Errors.NOT_LEADER_OR_FOLLOWER, Errors.INVALID_REQUEST, 10L, 100L, 10L, 100L);
        testGetOffsetsForTimesWithError(Errors.NONE, Errors.NOT_LEADER_OR_FOLLOWER, 10L, 100L, 10L, 100L);
        testGetOffsetsForTimesWithError(Errors.NOT_LEADER_OR_FOLLOWER, Errors.NONE, 10L, 100L, 10L, 100L);
        testGetOffsetsForTimesWithError(Errors.UNKNOWN_TOPIC_OR_PARTITION, Errors.NONE, 10L, 100L, 10L, 100L);
        testGetOffsetsForTimesWithError(Errors.UNSUPPORTED_FOR_MESSAGE_FORMAT, Errors.NONE, 10L, 100L, null, 100L);
        testGetOffsetsForTimesWithError(Errors.BROKER_NOT_AVAILABLE, Errors.NONE, 10L, 100L, 10L, 100L);
    }

    @Test
    public void testGetOffsetsFencedLeaderEpoch() {
        buildFetcher();
        this.subscriptions.assignFromUser(Collections.singleton(this.tp0));
        this.client.updateMetadata(this.initialUpdateResponse);
        this.subscriptions.requestOffsetReset(this.tp0, OffsetResetStrategy.LATEST);
        this.client.prepareResponse(listOffsetResponse(Errors.FENCED_LEADER_EPOCH, 1L, 5L));
        this.fetcher.resetOffsetsIfNeeded();
        this.consumerClient.pollNoWakeup();
        Assert.assertTrue(this.subscriptions.isOffsetResetNeeded(this.tp0));
        Assert.assertFalse(this.subscriptions.isFetchable(this.tp0));
        Assert.assertFalse(this.subscriptions.hasValidPosition(this.tp0));
        Assert.assertEquals(0L, this.metadata.timeToNextUpdate(this.time.milliseconds()));
    }

    @Test
    public void testGetOffsetByTimeWithPartitionsRetryCouldTriggerMetadataUpdate() {
        List<Errors> asList = Arrays.asList(Errors.NOT_LEADER_OR_FOLLOWER, Errors.REPLICA_NOT_AVAILABLE, Errors.KAFKA_STORAGE_ERROR, Errors.OFFSET_NOT_AVAILABLE, Errors.LEADER_NOT_AVAILABLE, Errors.FENCED_LEADER_EPOCH, Errors.UNKNOWN_LEADER_EPOCH);
        MetadataResponse metadataUpdateWith = TestUtils.metadataUpdateWith("dummy", 3, (Map<String, Errors>) Collections.singletonMap(this.topicName, Errors.NONE), (Map<String, Integer>) Collections.singletonMap(this.topicName, 4), (Function<TopicPartition, Integer>) topicPartition -> {
            return 3;
        });
        Node leaderFor = this.initialUpdateResponse.cluster().leaderFor(this.tp1);
        Node leaderFor2 = metadataUpdateWith.cluster().leaderFor(this.tp1);
        Assert.assertNotEquals(leaderFor, leaderFor2);
        for (Errors errors : asList) {
            buildFetcher();
            this.subscriptions.assignFromUser(Utils.mkSet(new TopicPartition[]{this.tp0, this.tp1}));
            this.client.updateMetadata(this.initialUpdateResponse);
            HashMap hashMap = new HashMap();
            hashMap.put(this.tp0, new ListOffsetResponse.PartitionData(Errors.NONE, 10L, 4L, Optional.empty()));
            hashMap.put(this.tp1, new ListOffsetResponse.PartitionData(errors, -1L, -1L, Optional.empty()));
            this.client.prepareResponseFrom(abstractRequest -> {
                if (!(abstractRequest instanceof ListOffsetRequest)) {
                    return false;
                }
                HashMap hashMap2 = new HashMap();
                hashMap2.put(this.tp0, new ListOffsetRequest.PartitionData(10L, Optional.empty()));
                hashMap2.put(this.tp1, new ListOffsetRequest.PartitionData(10L, Optional.empty()));
                return ((ListOffsetRequest) abstractRequest).partitionTimestamps().equals(hashMap2);
            }, (AbstractResponse) new ListOffsetResponse(hashMap), leaderFor);
            this.client.prepareMetadataUpdate(metadataUpdateWith);
            HashMap hashMap2 = new HashMap(hashMap);
            hashMap2.put(this.tp1, new ListOffsetResponse.PartitionData(Errors.NOT_LEADER_OR_FOLLOWER, -1L, -1L, Optional.empty()));
            this.client.prepareResponseFrom(new ListOffsetResponse(hashMap2), leaderFor);
            this.client.prepareResponseFrom(abstractRequest2 -> {
                if (abstractRequest2 instanceof ListOffsetRequest) {
                    return ((ListOffsetRequest) abstractRequest2).partitionTimestamps().equals(Collections.singletonMap(this.tp1, new ListOffsetRequest.PartitionData(10L, Optional.of(3))));
                }
                return false;
            }, (AbstractResponse) listOffsetResponse(this.tp1, Errors.NONE, 10L, 5L), leaderFor2);
            Assert.assertEquals(Utils.mkMap(new Map.Entry[]{Utils.mkEntry(this.tp0, new OffsetAndTimestamp(4L, 10L)), Utils.mkEntry(this.tp1, new OffsetAndTimestamp(5L, 10L))}), this.fetcher.offsetsForTimes(Utils.mkMap(new Map.Entry[]{Utils.mkEntry(this.tp0, 10L), Utils.mkEntry(this.tp1, 10L)}), this.time.timer(2147483647L)));
            Assert.assertEquals(1L, this.client.numAwaitingResponses());
            this.fetcher.close();
        }
    }

    @Test
    public void testGetOffsetsUnknownLeaderEpoch() {
        buildFetcher();
        this.subscriptions.assignFromUser(Collections.singleton(this.tp0));
        this.subscriptions.requestOffsetReset(this.tp0, OffsetResetStrategy.LATEST);
        this.client.prepareResponse(listOffsetResponse(Errors.UNKNOWN_LEADER_EPOCH, 1L, 5L));
        this.fetcher.resetOffsetsIfNeeded();
        this.consumerClient.pollNoWakeup();
        Assert.assertTrue(this.subscriptions.isOffsetResetNeeded(this.tp0));
        Assert.assertFalse(this.subscriptions.isFetchable(this.tp0));
        Assert.assertFalse(this.subscriptions.hasValidPosition(this.tp0));
        Assert.assertEquals(0L, this.metadata.timeToNextUpdate(this.time.milliseconds()));
    }

    @Test
    public void testGetOffsetsIncludesLeaderEpoch() {
        buildFetcher();
        this.subscriptions.assignFromUser(Collections.singleton(this.tp0));
        this.client.updateMetadata(this.initialUpdateResponse);
        this.client.updateMetadata(TestUtils.metadataUpdateWith("dummy", 1, (Map<String, Errors>) Collections.emptyMap(), (Map<String, Integer>) Collections.singletonMap(this.topicName, 4), (Function<TopicPartition, Integer>) topicPartition -> {
            return 99;
        }));
        this.subscriptions.requestOffsetReset(this.tp0);
        this.fetcher.resetOffsetsIfNeeded();
        this.client.prepareResponse(abstractRequest -> {
            if (!(abstractRequest instanceof ListOffsetRequest)) {
                Assert.fail("Should have seen ListOffsetRequest");
                return false;
            }
            Optional optional = ((ListOffsetRequest.PartitionData) ((ListOffsetRequest) abstractRequest).partitionTimestamps().get(this.tp0)).currentLeaderEpoch;
            Assert.assertTrue("Expected Fetcher to set leader epoch in request", optional.isPresent());
            Assert.assertEquals("Expected leader epoch to match epoch from metadata update", ((Integer) optional.get()).longValue(), 99L);
            return true;
        }, (AbstractResponse) listOffsetResponse(Errors.NONE, 1L, 5L));
        this.consumerClient.pollNoWakeup();
    }

    @Test
    public void testGetOffsetsForTimesWhenSomeTopicPartitionLeadersNotKnownInitially() {
        buildFetcher();
        this.subscriptions.assignFromUser(Utils.mkSet(new TopicPartition[]{this.tp0, this.tp1}));
        TopicPartition topicPartition = new TopicPartition("another-topic", 0);
        this.client.reset();
        MetadataResponse metadataUpdateWith = TestUtils.metadataUpdateWith(3, Collections.singletonMap(this.topicName, 2));
        this.client.updateMetadata(metadataUpdateWith);
        this.client.prepareMetadataUpdate(metadataUpdateWith);
        this.client.prepareResponseFrom(listOffsetResponse(this.tp0, Errors.NONE, 1000L, 11L), this.metadata.fetch().leaderFor(this.tp0));
        this.client.prepareResponseFrom(listOffsetResponse(this.tp1, Errors.NONE, 1000L, 32L), this.metadata.fetch().leaderFor(this.tp1));
        HashMap hashMap = new HashMap();
        hashMap.put(this.topicName, 2);
        hashMap.put("another-topic", 1);
        this.client.prepareMetadataUpdate(TestUtils.metadataUpdateWith(3, hashMap));
        this.client.prepareResponseFrom(listOffsetResponse(topicPartition, Errors.NONE, 1000L, 54L), this.metadata.fetch().leaderFor(topicPartition));
        HashMap hashMap2 = new HashMap();
        hashMap2.put(this.tp0, -1L);
        hashMap2.put(this.tp1, -1L);
        hashMap2.put(topicPartition, -1L);
        Map offsetsForTimes = this.fetcher.offsetsForTimes(hashMap2, this.time.timer(Long.MAX_VALUE));
        Assert.assertNotNull("Expect Fetcher.offsetsForTimes() to return non-null result for " + this.tp0, offsetsForTimes.get(this.tp0));
        Assert.assertNotNull("Expect Fetcher.offsetsForTimes() to return non-null result for " + this.tp1, offsetsForTimes.get(this.tp1));
        Assert.assertNotNull("Expect Fetcher.offsetsForTimes() to return non-null result for " + topicPartition, offsetsForTimes.get(topicPartition));
        Assert.assertEquals(11L, ((OffsetAndTimestamp) offsetsForTimes.get(this.tp0)).offset());
        Assert.assertEquals(32L, ((OffsetAndTimestamp) offsetsForTimes.get(this.tp1)).offset());
        Assert.assertEquals(54L, ((OffsetAndTimestamp) offsetsForTimes.get(topicPartition)).offset());
    }

    @Test
    public void testGetOffsetsForTimesWhenSomeTopicPartitionLeadersDisconnectException() {
        buildFetcher();
        this.subscriptions.assignFromUser(Utils.mkSet(new TopicPartition[]{this.tp0, new TopicPartition("another-topic", 0)}));
        this.client.reset();
        this.client.updateMetadata(TestUtils.metadataUpdateWith(1, Collections.singletonMap(this.topicName, 1)));
        HashMap hashMap = new HashMap();
        hashMap.put(this.topicName, 1);
        hashMap.put("another-topic", 1);
        this.client.prepareMetadataUpdate(TestUtils.metadataUpdateWith(1, hashMap));
        this.client.prepareResponse(listOffsetRequestMatcher(-1L), listOffsetResponse(this.tp0, Errors.NONE, 1000L, 11L), true);
        this.client.prepareResponseFrom(listOffsetResponse(this.tp0, Errors.NONE, 1000L, 11L), this.metadata.fetch().leaderFor(this.tp0));
        HashMap hashMap2 = new HashMap();
        hashMap2.put(this.tp0, -1L);
        Map offsetsForTimes = this.fetcher.offsetsForTimes(hashMap2, this.time.timer(Long.MAX_VALUE));
        Assert.assertNotNull("Expect Fetcher.offsetsForTimes() to return non-null result for " + this.tp0, offsetsForTimes.get(this.tp0));
        Assert.assertEquals(11L, ((OffsetAndTimestamp) offsetsForTimes.get(this.tp0)).offset());
        Assert.assertNotNull(this.metadata.fetch().partitionCountForTopic("another-topic"));
    }

    @Test(expected = TimeoutException.class)
    public void testBatchedListOffsetsMetadataErrors() {
        buildFetcher();
        HashMap hashMap = new HashMap();
        hashMap.put(this.tp0, new ListOffsetResponse.PartitionData(Errors.NOT_LEADER_OR_FOLLOWER, -1L, -1L, Optional.empty()));
        hashMap.put(this.tp1, new ListOffsetResponse.PartitionData(Errors.UNKNOWN_TOPIC_OR_PARTITION, -1L, -1L, Optional.empty()));
        this.client.prepareResponse(new ListOffsetResponse(0, hashMap));
        HashMap hashMap2 = new HashMap();
        hashMap2.put(this.tp0, -2L);
        hashMap2.put(this.tp1, -2L);
        this.fetcher.offsetsForTimes(hashMap2, this.time.timer(0L));
    }

    @Test
    public void testSkippingAbortedTransactions() {
        buildFetcher(OffsetResetStrategy.EARLIEST, new ByteArrayDeserializer(), new ByteArrayDeserializer(), Integer.MAX_VALUE, IsolationLevel.READ_COMMITTED);
        ByteBuffer allocate = ByteBuffer.allocate(1024);
        abortTransaction(allocate, 1L, 0 + appendTransactionalRecords(allocate, 1L, 0, new SimpleRecord(this.time.milliseconds(), "key".getBytes(), "value".getBytes()), new SimpleRecord(this.time.milliseconds(), "key".getBytes(), "value".getBytes())));
        allocate.flip();
        ArrayList arrayList = new ArrayList();
        arrayList.add(new FetchResponse.AbortedTransaction(1L, 0L));
        MemoryRecords readableRecords = MemoryRecords.readableRecords(allocate);
        assignFromUser(Collections.singleton(this.tp0));
        this.subscriptions.seek(this.tp0, 0L);
        Assert.assertEquals(1L, this.fetcher.sendFetches());
        Assert.assertFalse(this.fetcher.hasCompletedFetches());
        this.client.prepareResponse(fullFetchResponseWithAbortedTransactions(readableRecords, arrayList, Errors.NONE, 100L, 100L, 0));
        this.consumerClient.poll(this.time.timer(0L));
        Assert.assertTrue(this.fetcher.hasCompletedFetches());
        Assert.assertFalse(fetchedRecords().containsKey(this.tp0));
    }

    @Test
    public void testReturnCommittedTransactions() {
        buildFetcher(OffsetResetStrategy.EARLIEST, new ByteArrayDeserializer(), new ByteArrayDeserializer(), Integer.MAX_VALUE, IsolationLevel.READ_COMMITTED);
        ByteBuffer allocate = ByteBuffer.allocate(1024);
        commitTransaction(allocate, 1L, 0 + appendTransactionalRecords(allocate, 1L, 0, new SimpleRecord(this.time.milliseconds(), "key".getBytes(), "value".getBytes()), new SimpleRecord(this.time.milliseconds(), "key".getBytes(), "value".getBytes())));
        allocate.flip();
        ArrayList arrayList = new ArrayList();
        MemoryRecords readableRecords = MemoryRecords.readableRecords(allocate);
        assignFromUser(Collections.singleton(this.tp0));
        this.subscriptions.seek(this.tp0, 0L);
        Assert.assertEquals(1L, this.fetcher.sendFetches());
        Assert.assertFalse(this.fetcher.hasCompletedFetches());
        this.client.prepareResponse(abstractRequest -> {
            Assert.assertEquals(IsolationLevel.READ_COMMITTED, ((FetchRequest) abstractRequest).isolationLevel());
            return true;
        }, (AbstractResponse) fullFetchResponseWithAbortedTransactions(readableRecords, arrayList, Errors.NONE, 100L, 100L, 0));
        this.consumerClient.poll(this.time.timer(0L));
        Assert.assertTrue(this.fetcher.hasCompletedFetches());
        Assert.assertTrue(fetchedRecords().containsKey(this.tp0));
        Assert.assertEquals(((List) r0.get(this.tp0)).size(), 2L);
    }

    @Test
    public void testReadCommittedWithCommittedAndAbortedTransactions() {
        buildFetcher(OffsetResetStrategy.EARLIEST, new ByteArrayDeserializer(), new ByteArrayDeserializer(), Integer.MAX_VALUE, IsolationLevel.READ_COMMITTED);
        ByteBuffer allocate = ByteBuffer.allocate(1024);
        ArrayList arrayList = new ArrayList();
        appendTransactionalRecords(allocate, 1L, 0L, new SimpleRecord("commit1-1".getBytes(), "value".getBytes()), new SimpleRecord("commit1-2".getBytes(), "value".getBytes()));
        appendTransactionalRecords(allocate, 2L, 2L, new SimpleRecord("abort2-1".getBytes(), "value".getBytes()));
        commitTransaction(allocate, 1L, 3L);
        appendTransactionalRecords(allocate, 2L, 4L, new SimpleRecord("abort2-2".getBytes(), "value".getBytes()));
        abortTransaction(allocate, 2L, 5L);
        arrayList.add(new FetchResponse.AbortedTransaction(2L, 2L));
        appendTransactionalRecords(allocate, 1L, 6L, new SimpleRecord("abort1-1".getBytes(), "value".getBytes()));
        appendTransactionalRecords(allocate, 2L, 7L, new SimpleRecord("commit2-1".getBytes(), "value".getBytes()));
        appendTransactionalRecords(allocate, 1L, 8L, new SimpleRecord("abort1-2".getBytes(), "value".getBytes()));
        abortTransaction(allocate, 1L, 9L);
        arrayList.add(new FetchResponse.AbortedTransaction(1L, 6L));
        commitTransaction(allocate, 2L, 10L);
        allocate.flip();
        MemoryRecords readableRecords = MemoryRecords.readableRecords(allocate);
        assignFromUser(Collections.singleton(this.tp0));
        this.subscriptions.seek(this.tp0, 0L);
        Assert.assertEquals(1L, this.fetcher.sendFetches());
        Assert.assertFalse(this.fetcher.hasCompletedFetches());
        this.client.prepareResponse(fullFetchResponseWithAbortedTransactions(readableRecords, arrayList, Errors.NONE, 100L, 100L, 0));
        this.consumerClient.poll(this.time.timer(0L));
        Assert.assertTrue(this.fetcher.hasCompletedFetches());
        Map fetchedRecords = fetchedRecords();
        Assert.assertTrue(fetchedRecords.containsKey(this.tp0));
        List list = (List) fetchedRecords.get(this.tp0);
        HashSet hashSet = new HashSet();
        Iterator it = list.iterator();
        while (it.hasNext()) {
            hashSet.add(new String((byte[]) ((ConsumerRecord) it.next()).key(), StandardCharsets.UTF_8));
        }
        Assert.assertEquals(Utils.mkSet(new String[]{"commit1-1", "commit1-2", "commit2-1"}), hashSet);
    }

    @Test
    public void testMultipleAbortMarkers() {
        buildFetcher(OffsetResetStrategy.EARLIEST, new ByteArrayDeserializer(), new ByteArrayDeserializer(), Integer.MAX_VALUE, IsolationLevel.READ_COMMITTED);
        ByteBuffer allocate = ByteBuffer.allocate(1024);
        int appendTransactionalRecords = 0 + appendTransactionalRecords(allocate, 1L, 0, new SimpleRecord(this.time.milliseconds(), "abort1-1".getBytes(), "value".getBytes()), new SimpleRecord(this.time.milliseconds(), "abort1-2".getBytes(), "value".getBytes()));
        int abortTransaction = appendTransactionalRecords + abortTransaction(allocate, 1L, appendTransactionalRecords);
        int abortTransaction2 = abortTransaction + abortTransaction(allocate, 1L, abortTransaction);
        commitTransaction(allocate, 1L, abortTransaction2 + appendTransactionalRecords(allocate, 1L, abortTransaction2, new SimpleRecord(this.time.milliseconds(), "commit1-1".getBytes(), "value".getBytes()), new SimpleRecord(this.time.milliseconds(), "commit1-2".getBytes(), "value".getBytes())));
        allocate.flip();
        ArrayList arrayList = new ArrayList();
        arrayList.add(new FetchResponse.AbortedTransaction(1L, 0L));
        MemoryRecords readableRecords = MemoryRecords.readableRecords(allocate);
        assignFromUser(Collections.singleton(this.tp0));
        this.subscriptions.seek(this.tp0, 0L);
        Assert.assertEquals(1L, this.fetcher.sendFetches());
        Assert.assertFalse(this.fetcher.hasCompletedFetches());
        this.client.prepareResponse(fullFetchResponseWithAbortedTransactions(readableRecords, arrayList, Errors.NONE, 100L, 100L, 0));
        this.consumerClient.poll(this.time.timer(0L));
        Assert.assertTrue(this.fetcher.hasCompletedFetches());
        Map fetchedRecords = fetchedRecords();
        Assert.assertTrue(fetchedRecords.containsKey(this.tp0));
        Assert.assertEquals(((List) fetchedRecords.get(this.tp0)).size(), 2L);
        List list = (List) fetchedRecords.get(this.tp0);
        HashSet hashSet = new HashSet(Arrays.asList("commit1-1", "commit1-2"));
        HashSet hashSet2 = new HashSet();
        Iterator it = list.iterator();
        while (it.hasNext()) {
            hashSet2.add(new String((byte[]) ((ConsumerRecord) it.next()).key(), StandardCharsets.UTF_8));
        }
        Assert.assertEquals(hashSet2, hashSet);
    }

    @Test
    public void testReadCommittedAbortMarkerWithNoData() {
        buildFetcher(OffsetResetStrategy.EARLIEST, new StringDeserializer(), new StringDeserializer(), Integer.MAX_VALUE, IsolationLevel.READ_COMMITTED);
        ByteBuffer allocate = ByteBuffer.allocate(1024);
        abortTransaction(allocate, 1L, 5L);
        appendTransactionalRecords(allocate, 1L, 6L, new SimpleRecord("6".getBytes(), (byte[]) null), new SimpleRecord("7".getBytes(), (byte[]) null), new SimpleRecord("8".getBytes(), (byte[]) null));
        commitTransaction(allocate, 1L, 9L);
        allocate.flip();
        assignFromUser(Collections.singleton(this.tp0));
        this.subscriptions.seek(this.tp0, 0L);
        Assert.assertEquals(1L, this.fetcher.sendFetches());
        ArrayList arrayList = new ArrayList();
        arrayList.add(new FetchResponse.AbortedTransaction(1L, 0L));
        this.client.prepareResponse(fullFetchResponseWithAbortedTransactions(MemoryRecords.readableRecords(allocate), arrayList, Errors.NONE, 100L, 100L, 0));
        this.consumerClient.poll(this.time.timer(0L));
        Assert.assertTrue(this.fetcher.hasCompletedFetches());
        Map fetchedRecords = fetchedRecords();
        Assert.assertTrue(fetchedRecords.containsKey(this.tp0));
        List list = (List) fetchedRecords.get(this.tp0);
        Assert.assertEquals(3L, list.size());
        Assert.assertEquals(Arrays.asList(6L, 7L, 8L), collectRecordOffsets(list));
    }

    @Test
    public void testUpdatePositionWithLastRecordMissingFromBatch() {
        buildFetcher();
        MemoryRecords.FilterResult filterTo = MemoryRecords.withRecords(CompressionType.NONE, new SimpleRecord[]{new SimpleRecord("0".getBytes(), "v".getBytes()), new SimpleRecord("1".getBytes(), "v".getBytes()), new SimpleRecord("2".getBytes(), "v".getBytes()), new SimpleRecord((byte[]) null, "value".getBytes())}).filterTo(this.tp0, new MemoryRecords.RecordFilter() { // from class: org.apache.kafka.clients.consumer.internals.FetcherTest.2
            protected MemoryRecords.RecordFilter.BatchRetention checkBatchRetention(RecordBatch recordBatch) {
                return MemoryRecords.RecordFilter.BatchRetention.DELETE_EMPTY;
            }

            protected boolean shouldRetainRecord(RecordBatch recordBatch, Record record) {
                return record.key() != null;
            }
        }, ByteBuffer.allocate(1024), Integer.MAX_VALUE, BufferSupplier.NO_CACHING);
        filterTo.outputBuffer().flip();
        MemoryRecords readableRecords = MemoryRecords.readableRecords(filterTo.outputBuffer());
        assignFromUser(Collections.singleton(this.tp0));
        this.subscriptions.seek(this.tp0, 0L);
        Assert.assertEquals(1L, this.fetcher.sendFetches());
        this.client.prepareResponse(fullFetchResponse(this.tp0, readableRecords, Errors.NONE, 100L, 0));
        this.consumerClient.poll(this.time.timer(0L));
        Assert.assertTrue(this.fetcher.hasCompletedFetches());
        Map fetchedRecords = fetchedRecords();
        Assert.assertTrue(fetchedRecords.containsKey(this.tp0));
        List list = (List) fetchedRecords.get(this.tp0);
        Assert.assertEquals(3L, list.size());
        for (int i = 0; i < 3; i++) {
            Assert.assertEquals(Integer.toString(i), new String((byte[]) ((ConsumerRecord) list.get(i)).key()));
        }
        Assert.assertEquals(4L, this.subscriptions.position(this.tp0).offset);
    }

    @Test
    public void testUpdatePositionOnEmptyBatch() {
        buildFetcher();
        ByteBuffer allocate = ByteBuffer.allocate(61);
        DefaultRecordBatch.writeEmptyHeader(allocate, (byte) 2, 1L, (short) 0, 1, 37L, 54L, 7, TimestampType.CREATE_TIME, System.currentTimeMillis(), false, false);
        allocate.flip();
        MemoryRecords readableRecords = MemoryRecords.readableRecords(allocate);
        assignFromUser(Collections.singleton(this.tp0));
        this.subscriptions.seek(this.tp0, 0L);
        Assert.assertEquals(1L, this.fetcher.sendFetches());
        this.client.prepareResponse(fullFetchResponse(this.tp0, readableRecords, Errors.NONE, 100L, 0));
        this.consumerClient.poll(this.time.timer(0L));
        Assert.assertTrue(this.fetcher.hasCompletedFetches());
        Assert.assertTrue(fetchedRecords().isEmpty());
        Assert.assertEquals(54 + 1, this.subscriptions.position(this.tp0).offset);
    }

    @Test
    public void testReadCommittedWithCompactedTopic() {
        buildFetcher(OffsetResetStrategy.EARLIEST, new StringDeserializer(), new StringDeserializer(), Integer.MAX_VALUE, IsolationLevel.READ_COMMITTED);
        ByteBuffer allocate = ByteBuffer.allocate(1024);
        appendTransactionalRecords(allocate, 3L, 3L, new SimpleRecord("3".getBytes(), "value".getBytes()), new SimpleRecord("4".getBytes(), "value".getBytes()));
        appendTransactionalRecords(allocate, 2L, 15L, new SimpleRecord("15".getBytes(), "value".getBytes()), new SimpleRecord("16".getBytes(), "value".getBytes()), new SimpleRecord("17".getBytes(), "value".getBytes()));
        appendTransactionalRecords(allocate, 1L, 22L, new SimpleRecord("22".getBytes(), "value".getBytes()), new SimpleRecord("23".getBytes(), "value".getBytes()));
        abortTransaction(allocate, 2L, 28L);
        appendTransactionalRecords(allocate, 3L, 30L, new SimpleRecord("30".getBytes(), "value".getBytes()), new SimpleRecord("31".getBytes(), "value".getBytes()), new SimpleRecord("32".getBytes(), "value".getBytes()));
        commitTransaction(allocate, 3L, 35L);
        appendTransactionalRecords(allocate, 1L, 39L, new SimpleRecord("39".getBytes(), "value".getBytes()), new SimpleRecord("40".getBytes(), "value".getBytes()));
        allocate.flip();
        assignFromUser(Collections.singleton(this.tp0));
        this.subscriptions.seek(this.tp0, 0L);
        Assert.assertEquals(1L, this.fetcher.sendFetches());
        ArrayList arrayList = new ArrayList();
        arrayList.add(new FetchResponse.AbortedTransaction(2L, 6L));
        arrayList.add(new FetchResponse.AbortedTransaction(1L, 0L));
        this.client.prepareResponse(fullFetchResponseWithAbortedTransactions(MemoryRecords.readableRecords(allocate), arrayList, Errors.NONE, 100L, 100L, 0));
        this.consumerClient.poll(this.time.timer(0L));
        Assert.assertTrue(this.fetcher.hasCompletedFetches());
        Map fetchedRecords = fetchedRecords();
        Assert.assertTrue(fetchedRecords.containsKey(this.tp0));
        List list = (List) fetchedRecords.get(this.tp0);
        Assert.assertEquals(5L, list.size());
        Assert.assertEquals(Arrays.asList(3L, 4L, 30L, 31L, 32L), collectRecordOffsets(list));
    }

    @Test
    public void testReturnAbortedTransactionsinUncommittedMode() {
        buildFetcher(OffsetResetStrategy.EARLIEST, new ByteArrayDeserializer(), new ByteArrayDeserializer(), Integer.MAX_VALUE, IsolationLevel.READ_UNCOMMITTED);
        ByteBuffer allocate = ByteBuffer.allocate(1024);
        abortTransaction(allocate, 1L, 0 + appendTransactionalRecords(allocate, 1L, 0, new SimpleRecord(this.time.milliseconds(), "key".getBytes(), "value".getBytes()), new SimpleRecord(this.time.milliseconds(), "key".getBytes(), "value".getBytes())));
        allocate.flip();
        ArrayList arrayList = new ArrayList();
        arrayList.add(new FetchResponse.AbortedTransaction(1L, 0L));
        MemoryRecords readableRecords = MemoryRecords.readableRecords(allocate);
        assignFromUser(Collections.singleton(this.tp0));
        this.subscriptions.seek(this.tp0, 0L);
        Assert.assertEquals(1L, this.fetcher.sendFetches());
        Assert.assertFalse(this.fetcher.hasCompletedFetches());
        this.client.prepareResponse(fullFetchResponseWithAbortedTransactions(readableRecords, arrayList, Errors.NONE, 100L, 100L, 0));
        this.consumerClient.poll(this.time.timer(0L));
        Assert.assertTrue(this.fetcher.hasCompletedFetches());
        Assert.assertTrue(fetchedRecords().containsKey(this.tp0));
    }

    @Test
    public void testConsumerPositionUpdatedWhenSkippingAbortedTransactions() {
        buildFetcher(OffsetResetStrategy.EARLIEST, new ByteArrayDeserializer(), new ByteArrayDeserializer(), Integer.MAX_VALUE, IsolationLevel.READ_COMMITTED);
        ByteBuffer allocate = ByteBuffer.allocate(1024);
        long appendTransactionalRecords = 0 + appendTransactionalRecords(allocate, 1L, 0L, new SimpleRecord(this.time.milliseconds(), "abort1-1".getBytes(), "value".getBytes()), new SimpleRecord(this.time.milliseconds(), "abort1-2".getBytes(), "value".getBytes())) + abortTransaction(allocate, 1L, r0);
        allocate.flip();
        ArrayList arrayList = new ArrayList();
        arrayList.add(new FetchResponse.AbortedTransaction(1L, 0L));
        MemoryRecords readableRecords = MemoryRecords.readableRecords(allocate);
        assignFromUser(Collections.singleton(this.tp0));
        this.subscriptions.seek(this.tp0, 0L);
        Assert.assertEquals(1L, this.fetcher.sendFetches());
        Assert.assertFalse(this.fetcher.hasCompletedFetches());
        this.client.prepareResponse(fullFetchResponseWithAbortedTransactions(readableRecords, arrayList, Errors.NONE, 100L, 100L, 0));
        this.consumerClient.poll(this.time.timer(0L));
        Assert.assertTrue(this.fetcher.hasCompletedFetches());
        Assert.assertFalse(fetchedRecords().containsKey(this.tp0));
        Assert.assertEquals(appendTransactionalRecords, this.subscriptions.position(this.tp0).offset);
    }

    @Test
    public void testConsumingViaIncrementalFetchRequests() {
        buildFetcher(2);
        assignFromUser(new HashSet(Arrays.asList(this.tp0, this.tp1)));
        this.subscriptions.seekValidated(this.tp0, new SubscriptionState.FetchPosition(0L, Optional.empty(), this.metadata.currentLeader(this.tp0)));
        this.subscriptions.seekValidated(this.tp1, new SubscriptionState.FetchPosition(1L, Optional.empty(), this.metadata.currentLeader(this.tp1)));
        LinkedHashMap linkedHashMap = new LinkedHashMap();
        linkedHashMap.put(this.tp0, new FetchResponse.PartitionData(Errors.NONE, 2L, 2L, 0L, (List) null, this.records));
        linkedHashMap.put(this.tp1, new FetchResponse.PartitionData(Errors.NONE, 100L, -1L, 0L, (List) null, this.emptyRecords));
        this.client.prepareResponse(new FetchResponse(Errors.NONE, linkedHashMap, 0, 123));
        Assert.assertEquals(1L, this.fetcher.sendFetches());
        Assert.assertFalse(this.fetcher.hasCompletedFetches());
        this.consumerClient.poll(this.time.timer(0L));
        Assert.assertTrue(this.fetcher.hasCompletedFetches());
        Map fetchedRecords = fetchedRecords();
        Assert.assertFalse(fetchedRecords.containsKey(this.tp1));
        List list = (List) fetchedRecords.get(this.tp0);
        Assert.assertEquals(2L, list.size());
        Assert.assertEquals(3L, this.subscriptions.position(this.tp0).offset);
        Assert.assertEquals(1L, this.subscriptions.position(this.tp1).offset);
        Assert.assertEquals(1L, ((ConsumerRecord) list.get(0)).offset());
        Assert.assertEquals(2L, ((ConsumerRecord) list.get(1)).offset());
        Assert.assertEquals(0L, this.fetcher.sendFetches());
        Map fetchedRecords2 = fetchedRecords();
        Assert.assertFalse(fetchedRecords2.containsKey(this.tp1));
        List list2 = (List) fetchedRecords2.get(this.tp0);
        Assert.assertEquals(1L, list2.size());
        Assert.assertEquals(3L, ((ConsumerRecord) list2.get(0)).offset());
        Assert.assertEquals(4L, this.subscriptions.position(this.tp0).offset);
        this.client.prepareResponse(new FetchResponse(Errors.NONE, new LinkedHashMap(), 0, 123));
        Assert.assertEquals(1L, this.fetcher.sendFetches());
        this.consumerClient.poll(this.time.timer(0L));
        Assert.assertTrue(fetchedRecords().isEmpty());
        Assert.assertEquals(4L, this.subscriptions.position(this.tp0).offset);
        Assert.assertEquals(1L, this.subscriptions.position(this.tp1).offset);
        LinkedHashMap linkedHashMap2 = new LinkedHashMap();
        linkedHashMap2.put(this.tp0, new FetchResponse.PartitionData(Errors.NONE, 100L, 4L, 0L, (List) null, this.nextRecords));
        this.client.prepareResponse(new FetchResponse(Errors.NONE, linkedHashMap2, 0, 123));
        Assert.assertEquals(1L, this.fetcher.sendFetches());
        this.consumerClient.poll(this.time.timer(0L));
        Map fetchedRecords3 = fetchedRecords();
        Assert.assertFalse(fetchedRecords3.containsKey(this.tp1));
        List list3 = (List) fetchedRecords3.get(this.tp0);
        Assert.assertEquals(2L, list3.size());
        Assert.assertEquals(6L, this.subscriptions.position(this.tp0).offset);
        Assert.assertEquals(1L, this.subscriptions.position(this.tp1).offset);
        Assert.assertEquals(4L, ((ConsumerRecord) list3.get(0)).offset());
        Assert.assertEquals(5L, ((ConsumerRecord) list3.get(1)).offset());
    }

    @Test
    public void testFetcherConcurrency() throws Exception {
        HashSet hashSet = new HashSet();
        for (int i = 0; i < 20; i++) {
            hashSet.add(new TopicPartition(this.topicName, i));
        }
        LogContext logContext = new LogContext();
        buildDependencies(new MetricConfig(), Long.MAX_VALUE, new SubscriptionState(logContext, OffsetResetStrategy.EARLIEST), logContext);
        this.fetcher = new Fetcher<byte[], byte[]>(new LogContext(), this.consumerClient, this.minBytes, this.maxBytes, this.maxWaitMs, this.fetchSize, 2 * 20, true, "", new ByteArrayDeserializer(), new ByteArrayDeserializer(), this.metadata, this.subscriptions, this.metrics, this.metricsRegistry, this.time, this.retryBackoffMs, this.requestTimeoutMs, IsolationLevel.READ_UNCOMMITTED, this.apiVersions) { // from class: org.apache.kafka.clients.consumer.internals.FetcherTest.3
            protected FetchSessionHandler sessionHandler(int i2) {
                final FetchSessionHandler sessionHandler = super.sessionHandler(i2);
                if (sessionHandler == null) {
                    return null;
                }
                return new FetchSessionHandler(new LogContext(), i2) { // from class: org.apache.kafka.clients.consumer.internals.FetcherTest.3.1
                    public FetchSessionHandler.Builder newBuilder() {
                        verifySessionPartitions();
                        return sessionHandler.newBuilder();
                    }

                    public boolean handleResponse(FetchResponse<?> fetchResponse) {
                        verifySessionPartitions();
                        return sessionHandler.handleResponse(fetchResponse);
                    }

                    public void handleError(Throwable th) {
                        verifySessionPartitions();
                        sessionHandler.handleError(th);
                    }

                    private void verifySessionPartitions() {
                        try {
                            Field declaredField = FetchSessionHandler.class.getDeclaredField("sessionPartitions");
                            declaredField.setAccessible(true);
                            for (Map.Entry entry : ((LinkedHashMap) declaredField.get(sessionHandler)).entrySet()) {
                                Thread.yield();
                            }
                        } catch (Exception e) {
                            throw new RuntimeException(e);
                        }
                    }
                };
            }
        };
        this.client.updateMetadata(TestUtils.metadataUpdateWith(1, (Map<String, Integer>) Collections.singletonMap(this.topicName, 20), (Function<TopicPartition, Integer>) topicPartition -> {
            return Integer.valueOf(this.validLeaderEpoch);
        }));
        this.fetchSize = 10000;
        assignFromUser(hashSet);
        hashSet.forEach(topicPartition2 -> {
            this.subscriptions.seek(topicPartition2, 0L);
        });
        AtomicInteger atomicInteger = new AtomicInteger(1000);
        this.executorService = Executors.newSingleThreadExecutor();
        Future submit = this.executorService.submit(() -> {
            while (atomicInteger.get() > 0) {
                synchronized (this.consumerClient) {
                    if (!this.client.requests().isEmpty()) {
                        ClientRequest peek = this.client.requests().peek();
                        FetchRequest build = peek.requestBuilder().build();
                        LinkedHashMap linkedHashMap = new LinkedHashMap();
                        for (Map.Entry entry : build.fetchData().entrySet()) {
                            TopicPartition topicPartition3 = (TopicPartition) entry.getKey();
                            long j = ((FetchRequest.PartitionData) entry.getValue()).fetchOffset;
                            linkedHashMap.put(topicPartition3, new FetchResponse.PartitionData(Errors.NONE, j + 2, j + 2, 0L, (List) null, buildRecords(j, 2, j)));
                        }
                        this.client.respondToRequest(peek, new FetchResponse(Errors.NONE, linkedHashMap, 0, 123));
                        this.consumerClient.poll(this.time.timer(0L));
                    }
                }
            }
            return Integer.valueOf(atomicInteger.get());
        });
        Map map = (Map) hashSet.stream().collect(Collectors.toMap(Function.identity(), topicPartition3 -> {
            return 0L;
        }));
        while (atomicInteger.get() > 0 && !submit.isDone()) {
            if (this.fetcher.sendFetches() == 1) {
                synchronized (this.consumerClient) {
                    this.consumerClient.poll(this.time.timer(0L));
                }
            }
            if (this.fetcher.hasCompletedFetches()) {
                Map fetchedRecords = fetchedRecords();
                if (!fetchedRecords.isEmpty()) {
                    atomicInteger.decrementAndGet();
                    fetchedRecords.forEach((topicPartition4, list) -> {
                        Assert.assertEquals(2L, list.size());
                        long longValue = ((Long) map.get(topicPartition4)).longValue();
                        Assert.assertEquals(longValue, ((ConsumerRecord) list.get(0)).offset());
                        Assert.assertEquals(longValue + 1, ((ConsumerRecord) list.get(1)).offset());
                        map.put(topicPartition4, Long.valueOf(longValue + 2));
                    });
                }
            }
        }
        Assert.assertEquals(0, submit.get());
    }

    @Test
    public void testFetcherSessionEpochUpdate() throws Exception {
        buildFetcher(2);
        this.client.updateMetadata(TestUtils.metadataUpdateWith(1, Collections.singletonMap(this.topicName, 1)));
        assignFromUser(Collections.singleton(this.tp0));
        this.subscriptions.seek(this.tp0, 0L);
        AtomicInteger atomicInteger = new AtomicInteger(1000);
        this.executorService = Executors.newSingleThreadExecutor();
        Future submit = this.executorService.submit(() -> {
            long j = 0;
            long j2 = 0;
            while (atomicInteger.get() > 0) {
                synchronized (this.consumerClient) {
                    if (!this.client.requests().isEmpty()) {
                        ClientRequest peek = this.client.requests().peek();
                        int epoch = peek.requestBuilder().build().metadata().epoch();
                        Assert.assertTrue(String.format("Unexpected epoch expected %d got %d", Long.valueOf(j2), Integer.valueOf(epoch)), epoch == 0 || ((long) epoch) == j2);
                        j2++;
                        LinkedHashMap linkedHashMap = new LinkedHashMap();
                        linkedHashMap.put(this.tp0, new FetchResponse.PartitionData(Errors.NONE, j + 2, j + 2, 0L, (List) null, buildRecords(j, 2, j)));
                        j += 2;
                        this.client.respondToRequest(peek, new FetchResponse(Errors.NONE, linkedHashMap, 0, 123));
                        this.consumerClient.poll(this.time.timer(0L));
                    }
                }
            }
            return Integer.valueOf(atomicInteger.get());
        });
        long j = 0;
        while (atomicInteger.get() > 0 && !submit.isDone()) {
            if (this.fetcher.sendFetches() == 1) {
                synchronized (this.consumerClient) {
                    this.consumerClient.poll(this.time.timer(0L));
                }
            }
            if (this.fetcher.hasCompletedFetches()) {
                Map fetchedRecords = fetchedRecords();
                if (!fetchedRecords.isEmpty()) {
                    atomicInteger.decrementAndGet();
                    List list = (List) fetchedRecords.get(this.tp0);
                    Assert.assertEquals(2L, list.size());
                    Assert.assertEquals(j, ((ConsumerRecord) list.get(0)).offset());
                    Assert.assertEquals(j + 1, ((ConsumerRecord) list.get(1)).offset());
                    j += 2;
                }
                Assert.assertTrue(fetchedRecords().isEmpty());
            }
        }
        Assert.assertEquals(0, submit.get());
    }

    @Test
    public void testEmptyControlBatch() {
        buildFetcher(OffsetResetStrategy.EARLIEST, new ByteArrayDeserializer(), new ByteArrayDeserializer(), Integer.MAX_VALUE, IsolationLevel.READ_COMMITTED);
        ByteBuffer allocate = ByteBuffer.allocate(1024);
        DefaultRecordBatch.writeEmptyHeader(allocate, (byte) 2, 1L, (short) 0, -1, 0L, 0L, -1, TimestampType.CREATE_TIME, this.time.milliseconds(), true, true);
        commitTransaction(allocate, 1L, 1 + appendTransactionalRecords(allocate, 1L, 1, new SimpleRecord(this.time.milliseconds(), "key".getBytes(), "value".getBytes()), new SimpleRecord(this.time.milliseconds(), "key".getBytes(), "value".getBytes())));
        allocate.flip();
        ArrayList arrayList = new ArrayList();
        MemoryRecords readableRecords = MemoryRecords.readableRecords(allocate);
        assignFromUser(Collections.singleton(this.tp0));
        this.subscriptions.seek(this.tp0, 0L);
        Assert.assertEquals(1L, this.fetcher.sendFetches());
        Assert.assertFalse(this.fetcher.hasCompletedFetches());
        this.client.prepareResponse(abstractRequest -> {
            Assert.assertEquals(IsolationLevel.READ_COMMITTED, ((FetchRequest) abstractRequest).isolationLevel());
            return true;
        }, (AbstractResponse) fullFetchResponseWithAbortedTransactions(readableRecords, arrayList, Errors.NONE, 100L, 100L, 0));
        this.consumerClient.poll(this.time.timer(0L));
        Assert.assertTrue(this.fetcher.hasCompletedFetches());
        Assert.assertTrue(fetchedRecords().containsKey(this.tp0));
        Assert.assertEquals(((List) r0.get(this.tp0)).size(), 2L);
    }

    private MemoryRecords buildRecords(long j, int i, long j2) {
        MemoryRecordsBuilder builder = MemoryRecords.builder(ByteBuffer.allocate(1024), CompressionType.NONE, TimestampType.CREATE_TIME, j);
        for (int i2 = 0; i2 < i; i2++) {
            builder.append(0L, "key".getBytes(), ("value-" + (j2 + i2)).getBytes());
        }
        return builder.build();
    }

    private int appendTransactionalRecords(ByteBuffer byteBuffer, long j, long j2, int i, SimpleRecord... simpleRecordArr) {
        MemoryRecordsBuilder builder = MemoryRecords.builder(byteBuffer, (byte) 2, CompressionType.NONE, TimestampType.CREATE_TIME, j2, this.time.milliseconds(), j, (short) 0, i, true, -1);
        for (SimpleRecord simpleRecord : simpleRecordArr) {
            builder.append(simpleRecord);
        }
        builder.build();
        return simpleRecordArr.length;
    }

    private int appendTransactionalRecords(ByteBuffer byteBuffer, long j, long j2, SimpleRecord... simpleRecordArr) {
        return appendTransactionalRecords(byteBuffer, j, j2, (int) j2, simpleRecordArr);
    }

    private void commitTransaction(ByteBuffer byteBuffer, long j, long j2) {
        MemoryRecords.writeEndTransactionalMarker(byteBuffer, j2, this.time.milliseconds(), 0, j, (short) 0, new EndTransactionMarker(ControlRecordType.COMMIT, 0));
    }

    private int abortTransaction(ByteBuffer byteBuffer, long j, long j2) {
        MemoryRecords.writeEndTransactionalMarker(byteBuffer, j2, this.time.milliseconds(), 0, j, (short) 0, new EndTransactionMarker(ControlRecordType.ABORT, 0));
        return 1;
    }

    private void testGetOffsetsForTimesWithError(Errors errors, Errors errors2, long j, long j2, Long l, Long l2) {
        this.client.reset();
        TopicPartition topicPartition = new TopicPartition("topic2", 0);
        this.metadata.bootstrap(ClientUtils.parseAndValidateAddresses(Collections.singletonList("1.1.1.1:1111"), ClientDnsLookup.USE_ALL_DNS_IPS));
        HashMap hashMap = new HashMap();
        hashMap.put(this.topicName, 2);
        hashMap.put("topic2", 1);
        MetadataResponse metadataUpdateWith = TestUtils.metadataUpdateWith(2, hashMap);
        Cluster cluster = metadataUpdateWith.cluster();
        this.client.prepareMetadataUpdate(metadataUpdateWith, true);
        this.client.prepareResponseFrom(listOffsetResponse(topicPartition, errors, j, j), cluster.leaderFor(topicPartition));
        this.client.prepareResponseFrom(listOffsetResponse(this.tp1, errors2, j2, j2), cluster.leaderFor(this.tp1));
        this.client.prepareResponseFrom(listOffsetResponse(topicPartition, Errors.NONE, j, j), cluster.leaderFor(topicPartition));
        this.client.prepareResponseFrom(listOffsetResponse(this.tp1, Errors.NONE, j2, j2), cluster.leaderFor(this.tp1));
        HashMap hashMap2 = new HashMap();
        hashMap2.put(topicPartition, 0L);
        hashMap2.put(this.tp1, 0L);
        Map offsetsForTimes = this.fetcher.offsetsForTimes(hashMap2, this.time.timer(Long.MAX_VALUE));
        if (l == null) {
            Assert.assertNull(offsetsForTimes.get(topicPartition));
        } else {
            Assert.assertEquals(l.longValue(), ((OffsetAndTimestamp) offsetsForTimes.get(topicPartition)).timestamp());
            Assert.assertEquals(l.longValue(), ((OffsetAndTimestamp) offsetsForTimes.get(topicPartition)).offset());
        }
        if (l2 == null) {
            Assert.assertNull(offsetsForTimes.get(this.tp1));
        } else {
            Assert.assertEquals(l2.longValue(), ((OffsetAndTimestamp) offsetsForTimes.get(this.tp1)).timestamp());
            Assert.assertEquals(l2.longValue(), ((OffsetAndTimestamp) offsetsForTimes.get(this.tp1)).offset());
        }
    }

    private void testGetOffsetsForTimesWithUnknownOffset() {
        this.client.reset();
        this.client.updateMetadata(TestUtils.metadataUpdateWith(1, Collections.singletonMap(this.topicName, 1)));
        HashMap hashMap = new HashMap();
        hashMap.put(this.tp0, new ListOffsetResponse.PartitionData(Errors.NONE, -1L, -1L, Optional.empty()));
        this.client.prepareResponseFrom(new ListOffsetResponse(0, hashMap), this.metadata.fetch().leaderFor(this.tp0));
        HashMap hashMap2 = new HashMap();
        hashMap2.put(this.tp0, 0L);
        Map offsetsForTimes = this.fetcher.offsetsForTimes(hashMap2, this.time.timer(Long.MAX_VALUE));
        Assert.assertTrue(offsetsForTimes.containsKey(this.tp0));
        Assert.assertNull(offsetsForTimes.get(this.tp0));
    }

    @Test
    public void testSubscriptionPositionUpdatedWithEpoch() {
        MemoryRecordsBuilder builder = MemoryRecords.builder(ByteBuffer.allocate(1024), (byte) 2, CompressionType.NONE, TimestampType.CREATE_TIME, 0L, -1L, -1L, (short) -1, -1, false, 1);
        builder.appendWithOffset(0L, 0L, "key".getBytes(), "value-1".getBytes());
        builder.appendWithOffset(1L, 0L, "key".getBytes(), "value-2".getBytes());
        builder.appendWithOffset(2L, 0L, "key".getBytes(), "value-3".getBytes());
        MemoryRecords build = builder.build();
        buildFetcher();
        assignFromUser(Collections.singleton(this.tp0));
        HashMap hashMap = new HashMap();
        hashMap.put(this.tp0.topic(), 4);
        this.metadata.updateWithCurrentRequestVersion(TestUtils.metadataUpdateWith("dummy", 1, (Map<String, Errors>) Collections.emptyMap(), hashMap, (Function<TopicPartition, Integer>) topicPartition -> {
            return 1;
        }), false, 0L);
        this.subscriptions.seek(this.tp0, 0L);
        Assert.assertEquals(1L, this.fetcher.sendFetches());
        Assert.assertFalse(this.fetcher.hasCompletedFetches());
        this.client.prepareResponse(fullFetchResponse(this.tp0, build, Errors.NONE, 100L, 0));
        this.consumerClient.pollNoWakeup();
        Assert.assertTrue(this.fetcher.hasCompletedFetches());
        Assert.assertTrue(fetchedRecords().containsKey(this.tp0));
        Assert.assertEquals(this.subscriptions.position(this.tp0).offset, 3L);
        TestUtils.assertOptional(this.subscriptions.position(this.tp0).offsetEpoch, num -> {
            Assert.assertEquals(num.intValue(), 1L);
        });
    }

    @Test
    public void testOffsetValidationRequestGrouping() {
        buildFetcher();
        assignFromUser(Utils.mkSet(new TopicPartition[]{this.tp0, this.tp1, this.tp2, this.tp3}));
        this.metadata.updateWithCurrentRequestVersion(TestUtils.metadataUpdateWith("dummy", 3, (Map<String, Errors>) Collections.emptyMap(), (Map<String, Integer>) Collections.singletonMap(this.topicName, 4), (Function<TopicPartition, Integer>) topicPartition -> {
            return 5;
        }), false, 0L);
        for (TopicPartition topicPartition2 : this.subscriptions.assignedPartitions()) {
            this.subscriptions.seekUnvalidated(topicPartition2, new SubscriptionState.FetchPosition(0L, Optional.of(4), new Metadata.LeaderAndEpoch(this.metadata.currentLeader(topicPartition2).leader, Optional.of(4))));
        }
        HashSet hashSet = new HashSet();
        for (Node node : this.metadata.fetch().nodes()) {
            this.apiVersions.update(node.idString(), NodeApiVersions.create());
            Set set = (Set) this.subscriptions.assignedPartitions().stream().filter(topicPartition3 -> {
                return this.metadata.currentLeader(topicPartition3).leader.equals(Optional.of(node));
            }).collect(Collectors.toSet());
            Stream stream = set.stream();
            Objects.requireNonNull(hashSet);
            Assert.assertTrue(stream.noneMatch((v1) -> {
                return r1.contains(v1);
            }));
            Assert.assertTrue(set.size() > 0);
            hashSet.addAll(set);
            this.client.prepareResponseFrom(abstractRequest -> {
                return set.equals(((OffsetsForLeaderEpochRequest) abstractRequest).epochsByTopicPartition().keySet());
            }, new OffsetsForLeaderEpochResponse((Map) set.stream().collect(Collectors.toMap(Function.identity(), topicPartition4 -> {
                return new EpochEndOffset(Errors.NONE, 4, 0L);
            }))), node);
        }
        Assert.assertEquals(this.subscriptions.assignedPartitions(), hashSet);
        this.fetcher.validateOffsetsIfNeeded();
        this.consumerClient.pollNoWakeup();
        Stream stream2 = this.subscriptions.assignedPartitions().stream();
        SubscriptionState subscriptionState = this.subscriptions;
        Objects.requireNonNull(subscriptionState);
        Assert.assertTrue(stream2.noneMatch(subscriptionState::awaitingValidation));
    }

    @Test
    public void testOffsetValidationAwaitsNodeApiVersion() {
        buildFetcher();
        assignFromUser(Collections.singleton(this.tp0));
        HashMap hashMap = new HashMap();
        hashMap.put(this.tp0.topic(), 4);
        this.metadata.updateWithCurrentRequestVersion(TestUtils.metadataUpdateWith("dummy", 1, (Map<String, Errors>) Collections.emptyMap(), hashMap, (Function<TopicPartition, Integer>) topicPartition -> {
            return 1;
        }), false, 0L);
        Node node = (Node) this.metadata.fetch().nodes().get(0);
        Assert.assertFalse(this.client.isConnected(node.idString()));
        this.subscriptions.seekUnvalidated(this.tp0, new SubscriptionState.FetchPosition(20L, Optional.of(1), new Metadata.LeaderAndEpoch(this.metadata.currentLeader(this.tp0).leader, Optional.of(1))));
        Assert.assertFalse(this.client.isConnected(node.idString()));
        Assert.assertTrue(this.subscriptions.awaitingValidation(this.tp0));
        this.fetcher.validateOffsetsIfNeeded();
        Assert.assertTrue(this.subscriptions.awaitingValidation(this.tp0));
        Assert.assertTrue(this.client.isConnected(node.idString()));
        this.apiVersions.update(node.idString(), NodeApiVersions.create());
        HashMap hashMap2 = new HashMap();
        hashMap2.put(this.tp0, new EpochEndOffset(Errors.NONE, 1, 30L));
        this.client.prepareResponseFrom(new OffsetsForLeaderEpochResponse(hashMap2), node);
        this.fetcher.validateOffsetsIfNeeded();
        this.consumerClient.pollNoWakeup();
        Assert.assertFalse(this.subscriptions.awaitingValidation(this.tp0));
        Assert.assertEquals(20L, this.subscriptions.position(this.tp0).offset);
    }

    @Test
    public void testOffsetValidationSkippedForOldBroker() {
        buildFetcher();
        assignFromUser(Collections.singleton(this.tp0));
        HashMap hashMap = new HashMap();
        hashMap.put(this.tp0.topic(), 4);
        this.metadata.updateWithCurrentRequestVersion(TestUtils.metadataUpdateWith("dummy", 1, (Map<String, Errors>) Collections.emptyMap(), hashMap, (Function<TopicPartition, Integer>) topicPartition -> {
            return 1;
        }), false, 0L);
        this.apiVersions.update(((Node) this.metadata.fetch().nodes().get(0)).idString(), NodeApiVersions.create(ApiKeys.OFFSET_FOR_LEADER_EPOCH.id, (short) 0, (short) 2));
        this.subscriptions.seekUnvalidated(this.tp0, new SubscriptionState.FetchPosition(0L, Optional.of(1), new Metadata.LeaderAndEpoch(this.metadata.currentLeader(this.tp0).leader, Optional.of(1))));
        this.metadata.updateWithCurrentRequestVersion(TestUtils.metadataUpdateWith("dummy", 1, (Map<String, Errors>) Collections.emptyMap(), hashMap, (Function<TopicPartition, Integer>) topicPartition2 -> {
            return 2;
        }), false, 0L);
        this.fetcher.validateOffsetsIfNeeded();
        Assert.assertFalse(this.subscriptions.awaitingValidation(this.tp0));
        this.subscriptions.seekUnvalidated(this.tp0, new SubscriptionState.FetchPosition(0L, Optional.of(1), new Metadata.LeaderAndEpoch(this.metadata.currentLeader(this.tp0).leader, Optional.of(1))));
        this.metadata.updateWithCurrentRequestVersion(TestUtils.metadataUpdateWith("dummy", 1, (Map<String, Errors>) Collections.emptyMap(), hashMap, (Function<TopicPartition, Integer>) topicPartition3 -> {
            return 2;
        }), false, 0L);
        Assert.assertEquals(1L, this.fetcher.sendFetches());
        Assert.assertFalse(this.subscriptions.awaitingValidation(this.tp0));
    }

    @Test
    public void testOffsetValidationSkippedForOldResponse() {
        buildFetcher();
        assignFromUser(Collections.singleton(this.tp0));
        HashMap hashMap = new HashMap();
        hashMap.put(this.tp0.topic(), 4);
        this.metadata.updateWithCurrentRequestVersion(TestUtils.metadataUpdateWith("dummy", 1, (Map<String, Errors>) Collections.emptyMap(), hashMap, (Function<TopicPartition, Integer>) topicPartition -> {
            return 1;
        }), false, 0L);
        Node node = (Node) this.metadata.fetch().nodes().get(0);
        Assert.assertFalse(this.client.isConnected(node.idString()));
        this.subscriptions.seekUnvalidated(this.tp0, new SubscriptionState.FetchPosition(20L, Optional.of(1), new Metadata.LeaderAndEpoch(this.metadata.currentLeader(this.tp0).leader, Optional.of(1))));
        Assert.assertFalse(this.client.isConnected(node.idString()));
        Assert.assertTrue(this.subscriptions.awaitingValidation(this.tp0));
        this.metadata.updateWithCurrentRequestVersion(TestUtils.metadataUpdateWith("dummy", 1, (Map<String, Errors>) Collections.emptyMap(), (Map<String, Integer>) hashMap, (short) 8), false, 0L);
        this.fetcher.validateOffsetsIfNeeded();
        Assert.assertFalse(this.subscriptions.awaitingValidation(this.tp0));
    }

    @Test
    public void testOffsetValidationResetOffsetForUndefinedEpochWithDefinedResetPolicy() {
        testOffsetValidationWithGivenEpochOffset(new EpochEndOffset(-1, 0L), OffsetResetStrategy.EARLIEST);
    }

    @Test
    public void testOffsetValidationResetOffsetForUndefinedOffsetWithDefinedResetPolicy() {
        testOffsetValidationWithGivenEpochOffset(new EpochEndOffset(2, -1L), OffsetResetStrategy.EARLIEST);
    }

    @Test
    public void testOffsetValidationResetOffsetForUndefinedEpochWithUndefinedResetPolicy() {
        testOffsetValidationWithGivenEpochOffset(new EpochEndOffset(-1, 0L), OffsetResetStrategy.NONE);
    }

    @Test
    public void testOffsetValidationResetOffsetForUndefinedOffsetWithUndefinedResetPolicy() {
        testOffsetValidationWithGivenEpochOffset(new EpochEndOffset(2, -1L), OffsetResetStrategy.NONE);
    }

    @Test
    public void testOffsetValidationTriggerLogTruncationForBadOffsetWithUndefinedResetPolicy() {
        testOffsetValidationWithGivenEpochOffset(new EpochEndOffset(1, 1L), OffsetResetStrategy.NONE);
    }

    private void testOffsetValidationWithGivenEpochOffset(EpochEndOffset epochEndOffset, OffsetResetStrategy offsetResetStrategy) {
        buildFetcher(offsetResetStrategy);
        assignFromUser(Collections.singleton(this.tp0));
        HashMap hashMap = new HashMap();
        hashMap.put(this.tp0.topic(), 4);
        this.metadata.updateWithCurrentRequestVersion(TestUtils.metadataUpdateWith("dummy", 1, (Map<String, Errors>) Collections.emptyMap(), hashMap, (Function<TopicPartition, Integer>) topicPartition -> {
            return 1;
        }), false, 0L);
        this.apiVersions.update(((Node) this.metadata.fetch().nodes().get(0)).idString(), NodeApiVersions.create());
        this.subscriptions.seekUnvalidated(this.tp0, new SubscriptionState.FetchPosition(5L, Optional.of(1), new Metadata.LeaderAndEpoch(this.metadata.currentLeader(this.tp0).leader, Optional.of(1))));
        this.fetcher.validateOffsetsIfNeeded();
        this.consumerClient.poll(this.time.timer(Duration.ZERO));
        Assert.assertTrue(this.subscriptions.awaitingValidation(this.tp0));
        Assert.assertTrue(this.client.hasInFlightRequests());
        this.client.respond(abstractRequest -> {
            OffsetsForLeaderEpochRequest.PartitionData partitionData = (OffsetsForLeaderEpochRequest.PartitionData) ((OffsetsForLeaderEpochRequest) abstractRequest).epochsByTopicPartition().get(this.tp0);
            return partitionData.currentLeaderEpoch.equals(Optional.of(1)) && partitionData.leaderEpoch == 1;
        }, (AbstractResponse) new OffsetsForLeaderEpochResponse(Collections.singletonMap(this.tp0, epochEndOffset)));
        this.consumerClient.poll(this.time.timer(Duration.ZERO));
        if (offsetResetStrategy != OffsetResetStrategy.NONE) {
            this.fetcher.validateOffsetsIfNeeded();
            Assert.assertFalse(this.subscriptions.awaitingValidation(this.tp0));
            return;
        }
        LogTruncationException assertThrows = Assert.assertThrows(LogTruncationException.class, () -> {
            this.fetcher.validateOffsetsIfNeeded();
        });
        Assert.assertEquals(Collections.singletonMap(this.tp0, 5L), assertThrows.offsetOutOfRangePartitions());
        if (epochEndOffset.hasUndefinedEpochOrOffset()) {
            Assert.assertEquals(Collections.emptyMap(), assertThrows.divergentOffsets());
        } else {
            Assert.assertEquals(Collections.singletonMap(this.tp0, new OffsetAndMetadata(epochEndOffset.endOffset(), Optional.of(Integer.valueOf(epochEndOffset.leaderEpoch())), "")), assertThrows.divergentOffsets());
        }
        Assert.assertTrue(this.subscriptions.awaitingValidation(this.tp0));
    }

    @Test
    public void testOffsetValidationHandlesSeekWithInflightOffsetForLeaderRequest() {
        buildFetcher();
        assignFromUser(Collections.singleton(this.tp0));
        HashMap hashMap = new HashMap();
        hashMap.put(this.tp0.topic(), 4);
        this.metadata.updateWithCurrentRequestVersion(TestUtils.metadataUpdateWith("dummy", 1, (Map<String, Errors>) Collections.emptyMap(), hashMap, (Function<TopicPartition, Integer>) topicPartition -> {
            return 1;
        }), false, 0L);
        this.apiVersions.update(((Node) this.metadata.fetch().nodes().get(0)).idString(), NodeApiVersions.create());
        Metadata.LeaderAndEpoch leaderAndEpoch = new Metadata.LeaderAndEpoch(this.metadata.currentLeader(this.tp0).leader, Optional.of(1));
        this.subscriptions.seekUnvalidated(this.tp0, new SubscriptionState.FetchPosition(0L, Optional.of(1), leaderAndEpoch));
        this.fetcher.validateOffsetsIfNeeded();
        this.consumerClient.poll(this.time.timer(Duration.ZERO));
        Assert.assertTrue(this.subscriptions.awaitingValidation(this.tp0));
        Assert.assertTrue(this.client.hasInFlightRequests());
        this.subscriptions.seekUnvalidated(this.tp0, new SubscriptionState.FetchPosition(5L, Optional.of(1), leaderAndEpoch));
        Assert.assertTrue(this.subscriptions.awaitingValidation(this.tp0));
        this.client.respond(abstractRequest -> {
            OffsetsForLeaderEpochRequest.PartitionData partitionData = (OffsetsForLeaderEpochRequest.PartitionData) ((OffsetsForLeaderEpochRequest) abstractRequest).epochsByTopicPartition().get(this.tp0);
            return partitionData.currentLeaderEpoch.equals(Optional.of(1)) && partitionData.leaderEpoch == 1;
        }, (AbstractResponse) new OffsetsForLeaderEpochResponse(Collections.singletonMap(this.tp0, new EpochEndOffset(0, 0L))));
        this.consumerClient.poll(this.time.timer(Duration.ZERO));
        Assert.assertTrue(this.subscriptions.awaitingValidation(this.tp0));
    }

    @Test
    public void testOffsetValidationFencing() {
        buildFetcher();
        assignFromUser(Collections.singleton(this.tp0));
        HashMap hashMap = new HashMap();
        hashMap.put(this.tp0.topic(), 4);
        this.metadata.updateWithCurrentRequestVersion(TestUtils.metadataUpdateWith("dummy", 1, (Map<String, Errors>) Collections.emptyMap(), hashMap, (Function<TopicPartition, Integer>) topicPartition -> {
            return 1;
        }), false, 0L);
        this.apiVersions.update(((Node) this.metadata.fetch().nodes().get(0)).idString(), NodeApiVersions.create());
        Metadata.LeaderAndEpoch leaderAndEpoch = new Metadata.LeaderAndEpoch(this.metadata.currentLeader(this.tp0).leader, Optional.of(1));
        this.subscriptions.seekValidated(this.tp0, new SubscriptionState.FetchPosition(0L, Optional.of(1), leaderAndEpoch));
        this.metadata.updateWithCurrentRequestVersion(TestUtils.metadataUpdateWith("dummy", 1, (Map<String, Errors>) Collections.emptyMap(), hashMap, (Function<TopicPartition, Integer>) topicPartition2 -> {
            return 2;
        }), false, 0L);
        this.fetcher.validateOffsetsIfNeeded();
        Assert.assertTrue(this.subscriptions.awaitingValidation(this.tp0));
        this.subscriptions.completeValidation(this.tp0);
        this.subscriptions.position(this.tp0, new SubscriptionState.FetchPosition(10L, Optional.of(2), new Metadata.LeaderAndEpoch(leaderAndEpoch.leader, Optional.of(2))));
        this.subscriptions.maybeValidatePositionForCurrentLeader(this.apiVersions, this.tp0, new Metadata.LeaderAndEpoch(leaderAndEpoch.leader, Optional.of(3)));
        HashMap hashMap2 = new HashMap();
        hashMap2.put(this.tp0, new EpochEndOffset(Errors.NONE, 2, 10L));
        this.client.prepareResponse(new OffsetsForLeaderEpochResponse(hashMap2));
        this.consumerClient.pollNoWakeup();
        Assert.assertTrue("Expected validation to fail since leader epoch changed", this.subscriptions.awaitingValidation(this.tp0));
        this.fetcher.validateOffsetsIfNeeded();
        hashMap2.clear();
        hashMap2.put(this.tp0, new EpochEndOffset(Errors.NONE, 3, 10L));
        this.client.prepareResponse(new OffsetsForLeaderEpochResponse(hashMap2));
        this.consumerClient.pollNoWakeup();
        Assert.assertFalse("Expected validation to succeed with latest epoch", this.subscriptions.awaitingValidation(this.tp0));
    }

    @Test
    public void testSkipValidationForOlderApiVersion() {
        buildFetcher();
        assignFromUser(Collections.singleton(this.tp0));
        HashMap hashMap = new HashMap();
        hashMap.put(this.tp0.topic(), 4);
        this.apiVersions.update("0", NodeApiVersions.create(ApiKeys.OFFSET_FOR_LEADER_EPOCH.id, (short) 0, (short) 2));
        this.metadata.updateWithCurrentRequestVersion(TestUtils.metadataUpdateWith("dummy", 1, (Map<String, Errors>) Collections.emptyMap(), hashMap, (Function<TopicPartition, Integer>) topicPartition -> {
            return 1;
        }), false, 0L);
        this.subscriptions.requestOffsetReset(this.tp0, OffsetResetStrategy.LATEST);
        Assert.assertEquals(0L, this.fetcher.sendFetches());
        Assert.assertEquals(0L, this.fetcher.sendFetches());
        this.fetcher.resetOffsetIfNeeded(this.tp0, OffsetResetStrategy.LATEST, new Fetcher.ListOffsetData(100L, 1L, Optional.empty()));
        Assert.assertEquals(1L, this.fetcher.sendFetches());
    }

    @Test
    public void testTruncationDetected() {
        MemoryRecordsBuilder builder = MemoryRecords.builder(ByteBuffer.allocate(1024), (byte) 2, CompressionType.NONE, TimestampType.CREATE_TIME, 0L, -1L, -1L, (short) -1, -1, false, 1);
        builder.appendWithOffset(0L, 0L, "key".getBytes(), "value-1".getBytes());
        builder.appendWithOffset(1L, 0L, "key".getBytes(), "value-2".getBytes());
        builder.appendWithOffset(2L, 0L, "key".getBytes(), "value-3".getBytes());
        MemoryRecords build = builder.build();
        buildFetcher();
        assignFromUser(Collections.singleton(this.tp0));
        HashMap hashMap = new HashMap();
        hashMap.put(this.tp0.topic(), 4);
        this.metadata.updateWithCurrentRequestVersion(TestUtils.metadataUpdateWith("dummy", 1, (Map<String, Errors>) Collections.emptyMap(), hashMap, (Function<TopicPartition, Integer>) topicPartition -> {
            return 2;
        }), false, 0L);
        this.apiVersions.update(((Node) this.metadata.fetch().nodes().get(0)).idString(), NodeApiVersions.create());
        this.subscriptions.seekValidated(this.tp0, new SubscriptionState.FetchPosition(0L, Optional.of(1), new Metadata.LeaderAndEpoch(this.metadata.currentLeader(this.tp0).leader, Optional.of(1))));
        this.fetcher.validateOffsetsIfNeeded();
        Assert.assertEquals(0L, this.fetcher.sendFetches());
        Assert.assertFalse(this.fetcher.hasCompletedFetches());
        Assert.assertTrue(this.subscriptions.awaitingValidation(this.tp0));
        HashMap hashMap2 = new HashMap();
        hashMap2.put(this.tp0, new EpochEndOffset(Errors.NONE, 1, 10L));
        this.client.prepareResponse(new OffsetsForLeaderEpochResponse(hashMap2));
        this.consumerClient.pollNoWakeup();
        Assert.assertFalse(this.subscriptions.awaitingValidation(this.tp0));
        Assert.assertEquals(1L, this.fetcher.sendFetches());
        Assert.assertFalse(this.fetcher.hasCompletedFetches());
        this.client.prepareResponse(fullFetchResponse(this.tp0, build, Errors.NONE, 100L, 0));
        this.consumerClient.pollNoWakeup();
        Assert.assertTrue(this.fetcher.hasCompletedFetches());
        Assert.assertTrue(fetchedRecords().containsKey(this.tp0));
        Assert.assertEquals(this.subscriptions.position(this.tp0).offset, 3L);
        TestUtils.assertOptional(this.subscriptions.position(this.tp0).offsetEpoch, num -> {
            Assert.assertEquals(num.intValue(), 1L);
        });
    }

    @Test
    public void testPreferredReadReplica() {
        buildFetcher(new MetricConfig(), OffsetResetStrategy.EARLIEST, new BytesDeserializer(), new BytesDeserializer(), Integer.MAX_VALUE, IsolationLevel.READ_COMMITTED, Duration.ofMinutes(5L).toMillis());
        this.subscriptions.assignFromUser(Collections.singleton(this.tp0));
        this.client.updateMetadata(TestUtils.metadataUpdateWith(2, (Map<String, Integer>) Collections.singletonMap(this.topicName, 4), (Function<TopicPartition, Integer>) topicPartition -> {
            return Integer.valueOf(this.validLeaderEpoch);
        }));
        this.subscriptions.seek(this.tp0, 0L);
        Assert.assertEquals(this.fetcher.selectReadReplica(this.tp0, Node.noNode(), this.time.milliseconds()).id(), -1L);
        Assert.assertEquals(1L, this.fetcher.sendFetches());
        Assert.assertFalse(this.fetcher.hasCompletedFetches());
        this.client.prepareResponse(fullFetchResponse(this.tp0, this.records, Errors.NONE, 100L, -1L, 0, Optional.of(1)));
        this.consumerClient.poll(this.time.timer(0L));
        Assert.assertTrue(this.fetcher.hasCompletedFetches());
        Assert.assertTrue(fetchedRecords().containsKey(this.tp0));
        Assert.assertEquals(this.fetcher.selectReadReplica(this.tp0, Node.noNode(), this.time.milliseconds()).id(), 1L);
        Assert.assertEquals(1L, this.fetcher.sendFetches());
        Assert.assertFalse(this.fetcher.hasCompletedFetches());
        this.client.prepareResponse(fullFetchResponse(this.tp0, this.records, Errors.NONE, 100L, -1L, 0, Optional.of(2)));
        this.consumerClient.poll(this.time.timer(0L));
        Assert.assertTrue(this.fetcher.hasCompletedFetches());
        fetchedRecords();
        Assert.assertEquals(this.fetcher.selectReadReplica(this.tp0, Node.noNode(), this.time.milliseconds()).id(), -1L);
    }

    @Test
    public void testPreferredReadReplicaOffsetError() {
        buildFetcher(new MetricConfig(), OffsetResetStrategy.EARLIEST, new BytesDeserializer(), new BytesDeserializer(), Integer.MAX_VALUE, IsolationLevel.READ_COMMITTED, Duration.ofMinutes(5L).toMillis());
        this.subscriptions.assignFromUser(Collections.singleton(this.tp0));
        this.client.updateMetadata(TestUtils.metadataUpdateWith(2, (Map<String, Integer>) Collections.singletonMap(this.topicName, 4), (Function<TopicPartition, Integer>) topicPartition -> {
            return Integer.valueOf(this.validLeaderEpoch);
        }));
        this.subscriptions.seek(this.tp0, 0L);
        Assert.assertEquals(1L, this.fetcher.sendFetches());
        Assert.assertFalse(this.fetcher.hasCompletedFetches());
        this.client.prepareResponse(fullFetchResponse(this.tp0, this.records, Errors.NONE, 100L, -1L, 0, Optional.of(1)));
        this.consumerClient.poll(this.time.timer(0L));
        Assert.assertTrue(this.fetcher.hasCompletedFetches());
        fetchedRecords();
        Assert.assertEquals(this.fetcher.selectReadReplica(this.tp0, Node.noNode(), this.time.milliseconds()).id(), 1L);
        Assert.assertEquals(1L, this.fetcher.sendFetches());
        Assert.assertFalse(this.fetcher.hasCompletedFetches());
        this.client.prepareResponse(fullFetchResponse(this.tp0, this.records, Errors.OFFSET_OUT_OF_RANGE, 100L, -1L, 0, Optional.empty()));
        this.consumerClient.poll(this.time.timer(0L));
        Assert.assertTrue(this.fetcher.hasCompletedFetches());
        fetchedRecords();
        Assert.assertEquals(this.fetcher.selectReadReplica(this.tp0, Node.noNode(), this.time.milliseconds()).id(), -1L);
    }

    @Test
    public void testFetchCompletedBeforeHandlerAdded() {
        buildFetcher();
        assignFromUser(Collections.singleton(this.tp0));
        this.subscriptions.seek(this.tp0, 0L);
        this.fetcher.sendFetches();
        this.client.prepareResponse(fullFetchResponse(this.tp0, buildRecords(1L, 1, 1L), Errors.NONE, 100L, 0));
        this.consumerClient.poll(this.time.timer(0L));
        fetchedRecords();
        Metadata.LeaderAndEpoch leaderAndEpoch = this.subscriptions.position(this.tp0).currentLeader;
        Assert.assertTrue(leaderAndEpoch.leader.isPresent());
        Node selectReadReplica = this.fetcher.selectReadReplica(this.tp0, (Node) leaderAndEpoch.leader.get(), this.time.milliseconds());
        AtomicBoolean atomicBoolean = new AtomicBoolean(false);
        this.client.setWakeupHook(() -> {
            if (atomicBoolean.getAndSet(true)) {
                return;
            }
            this.consumerClient.disconnectAsync(selectReadReplica);
            this.consumerClient.poll(this.time.timer(0L));
        });
        Assert.assertEquals(1L, this.fetcher.sendFetches());
        this.consumerClient.disconnectAsync(selectReadReplica);
        this.consumerClient.poll(this.time.timer(0L));
        Assert.assertEquals(1L, this.fetcher.sendFetches());
    }

    @Test
    public void testCorruptMessageError() {
        buildFetcher();
        assignFromUser(Collections.singleton(this.tp0));
        this.subscriptions.seek(this.tp0, 0L);
        Assert.assertEquals(1L, this.fetcher.sendFetches());
        Assert.assertFalse(this.fetcher.hasCompletedFetches());
        this.client.prepareResponse(fullFetchResponse(this.tp0, buildRecords(1L, 1, 1L), Errors.CORRUPT_MESSAGE, 100L, 0));
        this.consumerClient.poll(this.time.timer(0L));
        Assert.assertTrue(this.fetcher.hasCompletedFetches());
        Assert.assertThrows(KafkaException.class, this::fetchedRecords);
    }

    @Test
    public void testBeginningOffsets() {
        buildFetcher();
        assignFromUser(Collections.singleton(this.tp0));
        this.client.prepareResponse(listOffsetResponse(this.tp0, Errors.NONE, -2L, 2L));
        Assert.assertEquals(Collections.singletonMap(this.tp0, 2L), this.fetcher.beginningOffsets(Collections.singleton(this.tp0), this.time.timer(5000L)));
    }

    @Test
    public void testBeginningOffsetsDuplicateTopicPartition() {
        buildFetcher();
        assignFromUser(Collections.singleton(this.tp0));
        this.client.prepareResponse(listOffsetResponse(this.tp0, Errors.NONE, -2L, 2L));
        Assert.assertEquals(Collections.singletonMap(this.tp0, 2L), this.fetcher.beginningOffsets(Arrays.asList(this.tp0, this.tp0), this.time.timer(5000L)));
    }

    @Test
    public void testBeginningOffsetsMultipleTopicPartitions() {
        buildFetcher();
        HashMap hashMap = new HashMap();
        hashMap.put(this.tp0, 2L);
        hashMap.put(this.tp1, 4L);
        hashMap.put(this.tp2, 6L);
        assignFromUser(hashMap.keySet());
        this.client.prepareResponse(listOffsetResponse(Errors.NONE, -2L, hashMap));
        Assert.assertEquals(hashMap, this.fetcher.beginningOffsets(Arrays.asList(this.tp0, this.tp1, this.tp2), this.time.timer(5000L)));
    }

    @Test
    public void testBeginningOffsetsEmpty() {
        buildFetcher();
        Assert.assertEquals(Collections.emptyMap(), this.fetcher.beginningOffsets(Collections.emptyList(), this.time.timer(5000L)));
    }

    @Test
    public void testEndOffsets() {
        buildFetcher();
        assignFromUser(Collections.singleton(this.tp0));
        this.client.prepareResponse(listOffsetResponse(this.tp0, Errors.NONE, -1L, 5L));
        Assert.assertEquals(Collections.singletonMap(this.tp0, 5L), this.fetcher.endOffsets(Collections.singleton(this.tp0), this.time.timer(5000L)));
    }

    @Test
    public void testEndOffsetsDuplicateTopicPartition() {
        buildFetcher();
        assignFromUser(Collections.singleton(this.tp0));
        this.client.prepareResponse(listOffsetResponse(this.tp0, Errors.NONE, -1L, 5L));
        Assert.assertEquals(Collections.singletonMap(this.tp0, 5L), this.fetcher.endOffsets(Arrays.asList(this.tp0, this.tp0), this.time.timer(5000L)));
    }

    @Test
    public void testEndOffsetsMultipleTopicPartitions() {
        buildFetcher();
        HashMap hashMap = new HashMap();
        hashMap.put(this.tp0, 5L);
        hashMap.put(this.tp1, 7L);
        hashMap.put(this.tp2, 9L);
        assignFromUser(hashMap.keySet());
        this.client.prepareResponse(listOffsetResponse(Errors.NONE, -1L, hashMap));
        Assert.assertEquals(hashMap, this.fetcher.endOffsets(Arrays.asList(this.tp0, this.tp1, this.tp2), this.time.timer(5000L)));
    }

    @Test
    public void testEndOffsetsEmpty() {
        buildFetcher();
        Assert.assertEquals(Collections.emptyMap(), this.fetcher.endOffsets(Collections.emptyList(), this.time.timer(5000L)));
    }

    private MockClient.RequestMatcher listOffsetRequestMatcher(long j) {
        return listOffsetRequestMatcher(j, Optional.empty());
    }

    private MockClient.RequestMatcher listOffsetRequestMatcher(long j, Optional<Integer> optional) {
        return abstractRequest -> {
            return ((ListOffsetRequest) abstractRequest).partitionTimestamps().equals(Collections.singletonMap(this.tp0, new ListOffsetRequest.PartitionData(j, optional)));
        };
    }

    private MockClient.RequestMatcher listOffsetRequestMatcher(long j, int i) {
        return abstractRequest -> {
            return ((ListOffsetRequest) abstractRequest).partitionTimestamps().equals(Collections.singletonMap(this.tp0, new ListOffsetRequest.PartitionData(j, Optional.of(Integer.valueOf(i)))));
        };
    }

    private ListOffsetResponse listOffsetResponse(Errors errors, long j, long j2) {
        return listOffsetResponse(this.tp0, errors, j, j2);
    }

    private ListOffsetResponse listOffsetResponse(TopicPartition topicPartition, Errors errors, long j, long j2) {
        return listOffsetResponse(topicPartition, errors, j, j2, null);
    }

    private ListOffsetResponse listOffsetResponse(TopicPartition topicPartition, Errors errors, long j, long j2, Integer num) {
        ListOffsetResponse.PartitionData partitionData = new ListOffsetResponse.PartitionData(errors, j, j2, Optional.ofNullable(num));
        HashMap hashMap = new HashMap();
        hashMap.put(topicPartition, partitionData);
        return new ListOffsetResponse(hashMap);
    }

    private ListOffsetResponse listOffsetResponse(Errors errors, long j, Map<TopicPartition, Long> map) {
        HashMap hashMap = new HashMap();
        for (Map.Entry<TopicPartition, Long> entry : map.entrySet()) {
            hashMap.put(entry.getKey(), new ListOffsetResponse.PartitionData(errors, j, entry.getValue().longValue(), Optional.empty()));
        }
        return new ListOffsetResponse(hashMap);
    }

    private FetchResponse<MemoryRecords> fullFetchResponseWithAbortedTransactions(MemoryRecords memoryRecords, List<FetchResponse.AbortedTransaction> list, Errors errors, long j, long j2, int i) {
        return new FetchResponse<>(Errors.NONE, new LinkedHashMap(Collections.singletonMap(this.tp0, new FetchResponse.PartitionData(errors, j2, j, 0L, list, memoryRecords))), i, 0);
    }

    private FetchResponse<MemoryRecords> fullFetchResponse(TopicPartition topicPartition, MemoryRecords memoryRecords, Errors errors, long j, int i) {
        return fullFetchResponse(topicPartition, memoryRecords, errors, j, -1L, i);
    }

    private FetchResponse<MemoryRecords> fullFetchResponse(TopicPartition topicPartition, MemoryRecords memoryRecords, Errors errors, long j, long j2, int i) {
        return new FetchResponse<>(Errors.NONE, new LinkedHashMap(Collections.singletonMap(topicPartition, new FetchResponse.PartitionData(errors, j, j2, 0L, (List) null, memoryRecords))), i, 0);
    }

    private FetchResponse<MemoryRecords> fullFetchResponse(TopicPartition topicPartition, MemoryRecords memoryRecords, Errors errors, long j, long j2, int i, Optional<Integer> optional) {
        return new FetchResponse<>(Errors.NONE, new LinkedHashMap(Collections.singletonMap(topicPartition, new FetchResponse.PartitionData(errors, j, j2, 0L, optional, (List) null, memoryRecords))), i, 0);
    }

    private FetchResponse<MemoryRecords> fetchResponse(TopicPartition topicPartition, MemoryRecords memoryRecords, Errors errors, long j, long j2, long j3, int i) {
        return new FetchResponse<>(Errors.NONE, new LinkedHashMap(Collections.singletonMap(topicPartition, new FetchResponse.PartitionData(errors, j, j2, j3, (List) null, memoryRecords))), i, 0);
    }

    private MetadataResponse newMetadataResponse(String str, Errors errors) {
        ArrayList arrayList = new ArrayList();
        if (errors == Errors.NONE) {
            this.initialUpdateResponse.topicMetadata().stream().filter(topicMetadata -> {
                return topicMetadata.topic().equals(str);
            }).findFirst().ifPresent(topicMetadata2 -> {
                arrayList.addAll(topicMetadata2.partitionMetadata());
            });
        }
        return MetadataResponse.prepareResponse(new ArrayList(this.initialUpdateResponse.brokers()), this.initialUpdateResponse.clusterId(), this.initialUpdateResponse.controller().id(), Collections.singletonList(new MetadataResponse.TopicMetadata(errors, str, false, arrayList)));
    }

    private <K, V> Map<TopicPartition, List<ConsumerRecord<K, V>>> fetchedRecords() {
        return this.fetcher.fetchedRecords();
    }

    private void buildFetcher(int i) {
        buildFetcher(OffsetResetStrategy.EARLIEST, new ByteArrayDeserializer(), new ByteArrayDeserializer(), i, IsolationLevel.READ_UNCOMMITTED);
    }

    private void buildFetcher() {
        buildFetcher(Integer.MAX_VALUE);
    }

    private void buildFetcher(Deserializer<?> deserializer, Deserializer<?> deserializer2) {
        buildFetcher(OffsetResetStrategy.EARLIEST, deserializer, deserializer2, Integer.MAX_VALUE, IsolationLevel.READ_UNCOMMITTED);
    }

    private void buildFetcher(OffsetResetStrategy offsetResetStrategy) {
        buildFetcher(new MetricConfig(), offsetResetStrategy, new ByteArrayDeserializer(), new ByteArrayDeserializer(), Integer.MAX_VALUE, IsolationLevel.READ_UNCOMMITTED);
    }

    private <K, V> void buildFetcher(OffsetResetStrategy offsetResetStrategy, Deserializer<K> deserializer, Deserializer<V> deserializer2, int i, IsolationLevel isolationLevel) {
        buildFetcher(new MetricConfig(), offsetResetStrategy, deserializer, deserializer2, i, isolationLevel);
    }

    private <K, V> void buildFetcher(MetricConfig metricConfig, OffsetResetStrategy offsetResetStrategy, Deserializer<K> deserializer, Deserializer<V> deserializer2, int i, IsolationLevel isolationLevel) {
        buildFetcher(metricConfig, offsetResetStrategy, deserializer, deserializer2, i, isolationLevel, Long.MAX_VALUE);
    }

    private <K, V> void buildFetcher(MetricConfig metricConfig, OffsetResetStrategy offsetResetStrategy, Deserializer<K> deserializer, Deserializer<V> deserializer2, int i, IsolationLevel isolationLevel, long j) {
        LogContext logContext = new LogContext();
        buildFetcher(metricConfig, deserializer, deserializer2, i, isolationLevel, j, new SubscriptionState(logContext, offsetResetStrategy), logContext);
    }

    private void buildFetcher(SubscriptionState subscriptionState, LogContext logContext) {
        buildFetcher(new MetricConfig(), new ByteArrayDeserializer(), new ByteArrayDeserializer(), Integer.MAX_VALUE, IsolationLevel.READ_UNCOMMITTED, Long.MAX_VALUE, subscriptionState, logContext);
    }

    private <K, V> void buildFetcher(MetricConfig metricConfig, Deserializer<K> deserializer, Deserializer<V> deserializer2, int i, IsolationLevel isolationLevel, long j, SubscriptionState subscriptionState, LogContext logContext) {
        buildDependencies(metricConfig, j, subscriptionState, logContext);
        this.fetcher = new Fetcher<>(new LogContext(), this.consumerClient, this.minBytes, this.maxBytes, this.maxWaitMs, this.fetchSize, i, true, "", deserializer, deserializer2, this.metadata, this.subscriptions, this.metrics, this.metricsRegistry, this.time, this.retryBackoffMs, this.requestTimeoutMs, isolationLevel, this.apiVersions);
    }

    private void buildDependencies(MetricConfig metricConfig, long j, SubscriptionState subscriptionState, LogContext logContext) {
        this.time = new MockTime(1L);
        this.subscriptions = subscriptionState;
        this.metadata = new ConsumerMetadata(0L, j, false, false, this.subscriptions, logContext, new ClusterResourceListeners());
        this.client = new MockClient((Time) this.time, (Metadata) this.metadata);
        this.metrics = new Metrics(metricConfig, this.time);
        this.consumerClient = new ConsumerNetworkClient(logContext, this.client, this.metadata, this.time, 100L, 1000, Integer.MAX_VALUE);
        this.metricsRegistry = new FetcherMetricsRegistry(metricConfig.tags().keySet(), "consumer" + this.groupId);
    }

    private <T> List<Long> collectRecordOffsets(List<ConsumerRecord<T, T>> list) {
        return (List) list.stream().map((v0) -> {
            return v0.offset();
        }).collect(Collectors.toList());
    }
}
