package org.apache.kafka.clients.producer.internals;

import java.util.Collections;
import java.util.Deque;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.kafka.clients.ApiVersions;
import org.apache.kafka.clients.ClientRequest;
import org.apache.kafka.clients.Metadata;
import org.apache.kafka.clients.MockClient;
import org.apache.kafka.clients.NetworkClient;
import org.apache.kafka.clients.NodeApiVersions;
import org.apache.kafka.clients.RequestCompletionHandler;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.MetricNameTemplate;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.ClusterAuthorizationException;
import org.apache.kafka.common.errors.NetworkException;
import org.apache.kafka.common.errors.OutOfOrderSequenceException;
import org.apache.kafka.common.errors.RecordTooLargeException;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.errors.TopicAuthorizationException;
import org.apache.kafka.common.errors.UnsupportedForMessageFormatException;
import org.apache.kafka.common.errors.UnsupportedVersionException;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.internals.ClusterResourceListeners;
import org.apache.kafka.common.metrics.KafkaMetric;
import org.apache.kafka.common.metrics.MetricConfig;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.network.NetworkReceive;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.record.CompressionRatioEstimator;
import org.apache.kafka.common.record.CompressionType;
import org.apache.kafka.common.record.MemoryRecords;
import org.apache.kafka.common.record.MutableRecordBatch;
import org.apache.kafka.common.record.Record;
import org.apache.kafka.common.record.RecordBatch;
import org.apache.kafka.common.requests.AbstractRequest;
import org.apache.kafka.common.requests.AbstractResponse;
import org.apache.kafka.common.requests.AddPartitionsToTxnResponse;
import org.apache.kafka.common.requests.ApiVersionsResponse;
import org.apache.kafka.common.requests.FindCoordinatorResponse;
import org.apache.kafka.common.requests.InitProducerIdRequest;
import org.apache.kafka.common.requests.InitProducerIdResponse;
import org.apache.kafka.common.requests.ProduceRequest;
import org.apache.kafka.common.requests.ProduceResponse;
import org.apache.kafka.common.requests.ResponseHeader;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.test.DelayedReceive;
import org.apache.kafka.test.MockSelector;
import org.apache.kafka.test.TestUtils;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

/* loaded from: input_file:org/apache/kafka/clients/producer/internals/SenderTest.class */
public class SenderTest {
    private static final int MAX_REQUEST_SIZE = 1048576;
    private static final short ACKS_ALL = -1;
    private static final String CLIENT_ID = "clientId";
    private static final double EPS = 1.0E-4d;
    private static final int MAX_BLOCK_TIMEOUT = 1000;
    private static final int REQUEST_TIMEOUT = 1000;
    private TopicPartition tp0 = new TopicPartition("test", 0);
    private TopicPartition tp1 = new TopicPartition("test", 1);
    private MockTime time = new MockTime();
    private MockClient client = new MockClient(this.time);
    private int batchSize = 16384;
    private Metadata metadata = new Metadata(0, Long.MAX_VALUE, true, true, new ClusterResourceListeners());
    private ApiVersions apiVersions = new ApiVersions();
    private Cluster cluster = TestUtils.singletonCluster("test", 2);
    private Metrics metrics = null;
    private RecordAccumulator accumulator = null;
    private Sender sender = null;
    private SenderMetricsRegistry senderMetricsRegistry = null;
    private final LogContext logContext = new LogContext();

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/kafka/clients/producer/internals/SenderTest$OffsetAndError.class */
    public class OffsetAndError {
        long offset;
        Errors error;

        OffsetAndError(long j, Errors errors) {
            this.offset = j;
            this.error = errors;
        }
    }

    @Before
    public void setup() {
        this.client.setNode((Node) this.cluster.nodes().get(0));
        setupWithTransactionState(null);
    }

    @After
    public void tearDown() {
        this.metrics.close();
    }

    @Test
    public void testSimple() throws Exception {
        FutureRecordMetadata futureRecordMetadata = this.accumulator.append(this.tp0, 0L, "key".getBytes(), "value".getBytes(), (Header[]) null, (Callback) null, 1000L).future;
        this.sender.run(this.time.milliseconds());
        this.sender.run(this.time.milliseconds());
        Assert.assertEquals("We should have a single produce request in flight.", 1L, this.client.inFlightRequestCount());
        Assert.assertTrue(this.client.hasInFlightRequests());
        this.client.respond(produceResponse(this.tp0, 0L, Errors.NONE, 0));
        this.sender.run(this.time.milliseconds());
        Assert.assertEquals("All requests completed.", 0L, this.client.inFlightRequestCount());
        Assert.assertFalse(this.client.hasInFlightRequests());
        this.sender.run(this.time.milliseconds());
        Assert.assertTrue("Request should be completed", futureRecordMetadata.isDone());
        Assert.assertEquals(0L, ((RecordMetadata) futureRecordMetadata.get()).offset());
    }

    @Test
    public void testMessageFormatDownConversion() throws Exception {
        this.apiVersions.update("0", NodeApiVersions.create());
        FutureRecordMetadata futureRecordMetadata = this.accumulator.append(this.tp0, 0L, "key".getBytes(), "value".getBytes(), (Header[]) null, (Callback) null, 1000L).future;
        this.apiVersions.update("0", NodeApiVersions.create(Collections.singleton(new ApiVersionsResponse.ApiVersion(ApiKeys.PRODUCE.id, (short) 0, (short) 2))));
        this.client.prepareResponse(new MockClient.RequestMatcher() { // from class: org.apache.kafka.clients.producer.internals.SenderTest.1
            @Override // org.apache.kafka.clients.MockClient.RequestMatcher
            public boolean matches(AbstractRequest abstractRequest) {
                MemoryRecords memoryRecords;
                ProduceRequest produceRequest = (ProduceRequest) abstractRequest;
                return produceRequest.version() == 2 && (memoryRecords = (MemoryRecords) produceRequest.partitionRecordsOrFail().get(SenderTest.this.tp0)) != null && memoryRecords.sizeInBytes() > 0 && memoryRecords.hasMatchingMagic((byte) 1);
            }
        }, (AbstractResponse) produceResponse(this.tp0, 0L, Errors.NONE, 0));
        this.sender.run(this.time.milliseconds());
        this.sender.run(this.time.milliseconds());
        Assert.assertTrue("Request should be completed", futureRecordMetadata.isDone());
        Assert.assertEquals(0L, ((RecordMetadata) futureRecordMetadata.get()).offset());
    }

    @Test
    public void testDownConversionForMismatchedMagicValues() throws Exception {
        this.apiVersions.update("0", NodeApiVersions.create());
        FutureRecordMetadata futureRecordMetadata = this.accumulator.append(this.tp0, 0L, "key".getBytes(), "value".getBytes(), (Header[]) null, (Callback) null, 1000L).future;
        this.apiVersions.update("0", NodeApiVersions.create(Collections.singleton(new ApiVersionsResponse.ApiVersion(ApiKeys.PRODUCE.id, (short) 0, (short) 2))));
        FutureRecordMetadata futureRecordMetadata2 = this.accumulator.append(this.tp1, 0L, "key".getBytes(), "value".getBytes(), (Header[]) null, (Callback) null, 1000L).future;
        this.apiVersions.update("0", NodeApiVersions.create());
        ProduceResponse.PartitionResponse partitionResponse = new ProduceResponse.PartitionResponse(Errors.NONE, 0L, -1L, 100L);
        HashMap hashMap = new HashMap();
        hashMap.put(this.tp0, partitionResponse);
        hashMap.put(this.tp1, partitionResponse);
        this.client.prepareResponse(new MockClient.RequestMatcher() { // from class: org.apache.kafka.clients.producer.internals.SenderTest.2
            @Override // org.apache.kafka.clients.MockClient.RequestMatcher
            public boolean matches(AbstractRequest abstractRequest) {
                ProduceRequest produceRequest = (ProduceRequest) abstractRequest;
                if (produceRequest.version() != 2) {
                    return false;
                }
                Map partitionRecordsOrFail = produceRequest.partitionRecordsOrFail();
                if (partitionRecordsOrFail.size() != 2) {
                    return false;
                }
                for (MemoryRecords memoryRecords : partitionRecordsOrFail.values()) {
                    if (memoryRecords == null || memoryRecords.sizeInBytes() == 0 || !memoryRecords.hasMatchingMagic((byte) 1)) {
                        return false;
                    }
                }
                return true;
            }
        }, new ProduceResponse(hashMap, 0));
        this.sender.run(this.time.milliseconds());
        this.sender.run(this.time.milliseconds());
        Assert.assertTrue("Request should be completed", futureRecordMetadata.isDone());
        Assert.assertTrue("Request should be completed", futureRecordMetadata2.isDone());
    }

    @Test
    public void testQuotaMetrics() throws Exception {
        MockSelector mockSelector = new MockSelector(this.time);
        Sensor throttleTimeSensor = Sender.throttleTimeSensor(this.senderMetricsRegistry);
        Node node = (Node) TestUtils.singletonCluster("test", 1).nodes().get(0);
        NetworkClient networkClient = new NetworkClient(mockSelector, this.metadata, "mock", Integer.MAX_VALUE, 1000L, 1000L, 65536, 65536, 1000, this.time, true, new ApiVersions(), throttleTimeSensor, this.logContext);
        mockSelector.delayedReceive(new DelayedReceive(node.idString(), new NetworkReceive(node.idString(), ApiVersionsResponse.createApiVersionsResponse(400, (byte) 2).serialize(ApiKeys.API_VERSIONS.latestVersion(), new ResponseHeader(0)))));
        while (!networkClient.ready(node, this.time.milliseconds())) {
            networkClient.poll(1L, this.time.milliseconds());
        }
        mockSelector.clear();
        for (int i = 1; i <= 3; i++) {
            ClientRequest newClientRequest = networkClient.newClientRequest(node.idString(), ProduceRequest.Builder.forCurrentMagic((short) 1, 1000, Collections.emptyMap()), this.time.milliseconds(), true, (RequestCompletionHandler) null);
            networkClient.send(newClientRequest, this.time.milliseconds());
            networkClient.poll(1L, this.time.milliseconds());
            mockSelector.completeReceive(new NetworkReceive(node.idString(), produceResponse(this.tp0, i, Errors.NONE, 100 * i).serialize(ApiKeys.PRODUCE.latestVersion(), new ResponseHeader(newClientRequest.correlationId()))));
            networkClient.poll(1L, this.time.milliseconds());
            mockSelector.clear();
        }
        Map metrics = this.metrics.metrics();
        KafkaMetric kafkaMetric = (KafkaMetric) metrics.get(this.senderMetricsRegistry.produceThrottleTimeAvg);
        KafkaMetric kafkaMetric2 = (KafkaMetric) metrics.get(this.senderMetricsRegistry.produceThrottleTimeMax);
        Assert.assertEquals(250.0d, kafkaMetric.value(), EPS);
        Assert.assertEquals(400.0d, kafkaMetric2.value(), EPS);
        networkClient.close();
    }

    @Test
    public void testSenderMetricsTemplates() throws Exception {
        this.metrics.close();
        this.metrics = new Metrics(new MetricConfig().tags(Collections.singletonMap("client-id", "clientA")));
        SenderMetricsRegistry senderMetricsRegistry = new SenderMetricsRegistry(this.metrics);
        Sender sender = new Sender(this.logContext, this.client, this.metadata, this.accumulator, false, MAX_REQUEST_SIZE, (short) -1, 1, senderMetricsRegistry, this.time, 1000, 50L, (TransactionManager) null, this.apiVersions);
        this.accumulator.append(this.tp0, 0L, "key".getBytes(), "value".getBytes(), (Header[]) null, (Callback) null, 1000L);
        sender.run(this.time.milliseconds());
        sender.run(this.time.milliseconds());
        this.client.respond(produceResponse(this.tp0, 0L, Errors.NONE, 0));
        sender.run(this.time.milliseconds());
        Sender.throttleTimeSensor(senderMetricsRegistry);
        HashSet hashSet = new HashSet();
        for (MetricName metricName : this.metrics.metrics().keySet()) {
            if (!metricName.group().equals("kafka-metrics-count")) {
                hashSet.add(new MetricNameTemplate(metricName.name(), metricName.group(), "", metricName.tags().keySet()));
            }
        }
        TestUtils.checkEquals(hashSet, new HashSet(senderMetricsRegistry.allTemplates()), "metrics", "templates");
    }

    @Test
    public void testRetries() throws Exception {
        Metrics metrics = new Metrics();
        try {
            Sender sender = new Sender(this.logContext, this.client, this.metadata, this.accumulator, false, MAX_REQUEST_SIZE, (short) -1, 1, new SenderMetricsRegistry(metrics), this.time, 1000, 50L, (TransactionManager) null, this.apiVersions);
            FutureRecordMetadata futureRecordMetadata = this.accumulator.append(this.tp0, 0L, "key".getBytes(), "value".getBytes(), (Header[]) null, (Callback) null, 1000L).future;
            sender.run(this.time.milliseconds());
            sender.run(this.time.milliseconds());
            String destination = this.client.requests().peek().destination();
            Node node = new Node(Integer.parseInt(destination), "localhost", 0);
            Assert.assertEquals(1L, this.client.inFlightRequestCount());
            Assert.assertTrue(this.client.hasInFlightRequests());
            Assert.assertTrue("Client ready status should be true", this.client.isReady(node, 0L));
            this.client.disconnect(destination);
            Assert.assertEquals(0L, this.client.inFlightRequestCount());
            Assert.assertFalse(this.client.hasInFlightRequests());
            Assert.assertFalse("Client ready status should be false", this.client.isReady(node, 0L));
            sender.run(this.time.milliseconds());
            sender.run(this.time.milliseconds());
            sender.run(this.time.milliseconds());
            Assert.assertEquals(1L, this.client.inFlightRequestCount());
            Assert.assertTrue(this.client.hasInFlightRequests());
            this.client.respond(produceResponse(this.tp0, 0L, Errors.NONE, 0));
            sender.run(this.time.milliseconds());
            Assert.assertTrue("Request should have retried and completed", futureRecordMetadata.isDone());
            Assert.assertEquals(0L, ((RecordMetadata) futureRecordMetadata.get()).offset());
            FutureRecordMetadata futureRecordMetadata2 = this.accumulator.append(this.tp0, 0L, "key".getBytes(), "value".getBytes(), (Header[]) null, (Callback) null, 1000L).future;
            sender.run(this.time.milliseconds());
            for (int i = 0; i < 1 + 1; i++) {
                this.client.disconnect(this.client.requests().peek().destination());
                sender.run(this.time.milliseconds());
                sender.run(this.time.milliseconds());
                sender.run(this.time.milliseconds());
            }
            sender.run(this.time.milliseconds());
            assertFutureFailure(futureRecordMetadata2, NetworkException.class);
            metrics.close();
        } catch (Throwable th) {
            metrics.close();
            throw th;
        }
    }

    @Test
    public void testSendInOrder() throws Exception {
        Metrics metrics = new Metrics();
        try {
            Sender sender = new Sender(this.logContext, this.client, this.metadata, this.accumulator, true, MAX_REQUEST_SIZE, (short) -1, 1, new SenderMetricsRegistry(metrics), this.time, 1000, 50L, (TransactionManager) null, this.apiVersions);
            this.metadata.update(TestUtils.clusterWith(2, "test", 2), Collections.emptySet(), this.time.milliseconds());
            TopicPartition topicPartition = new TopicPartition("test", 1);
            this.accumulator.append(topicPartition, 0L, "key1".getBytes(), "value1".getBytes(), (Header[]) null, (Callback) null, 1000L);
            sender.run(this.time.milliseconds());
            sender.run(this.time.milliseconds());
            String destination = this.client.requests().peek().destination();
            Assert.assertEquals(ApiKeys.PRODUCE, this.client.requests().peek().requestBuilder().apiKey());
            Node node = new Node(Integer.parseInt(destination), "localhost", 0);
            Assert.assertEquals(1L, this.client.inFlightRequestCount());
            Assert.assertTrue(this.client.hasInFlightRequests());
            Assert.assertTrue("Client ready status should be true", this.client.isReady(node, 0L));
            this.time.sleep(900L);
            this.accumulator.append(topicPartition, 0L, "key2".getBytes(), "value2".getBytes(), (Header[]) null, (Callback) null, 1000L);
            this.metadata.update(TestUtils.singletonCluster("test", 2), Collections.emptySet(), this.time.milliseconds());
            sender.run(this.time.milliseconds());
            Assert.assertEquals(1L, this.client.inFlightRequestCount());
            Assert.assertTrue(this.client.hasInFlightRequests());
            metrics.close();
        } catch (Throwable th) {
            metrics.close();
            throw th;
        }
    }

    @Test
    public void testAppendInExpiryCallback() throws InterruptedException {
        final AtomicInteger atomicInteger = new AtomicInteger(0);
        final AtomicReference atomicReference = new AtomicReference();
        final byte[] bytes = "key".getBytes();
        final byte[] bytes2 = "value".getBytes();
        Callback callback = new Callback() { // from class: org.apache.kafka.clients.producer.internals.SenderTest.3
            public void onCompletion(RecordMetadata recordMetadata, Exception exc) {
                if (!(exc instanceof TimeoutException)) {
                    if (exc != null) {
                        atomicReference.compareAndSet(null, exc);
                    }
                } else {
                    atomicInteger.incrementAndGet();
                    try {
                        SenderTest.this.accumulator.append(SenderTest.this.tp1, 0L, bytes, bytes2, Record.EMPTY_HEADERS, (Callback) null, 1000L);
                    } catch (InterruptedException e) {
                        throw new RuntimeException("Unexpected interruption", e);
                    }
                }
            }
        };
        for (int i = 0; i < 10; i++) {
            this.accumulator.append(this.tp1, 0L, bytes, bytes2, (Header[]) null, callback, 1000L);
        }
        this.time.sleep(10000L);
        Node node = (Node) this.cluster.nodes().get(0);
        this.client.disconnect(node.idString());
        this.client.blackout(node, 100L);
        this.sender.run(this.time.milliseconds());
        Assert.assertEquals("Callbacks not invoked for expiry", 10, atomicInteger.get());
        Assert.assertNull("Unexpected exception", atomicReference.get());
        Assert.assertTrue(this.accumulator.batches().containsKey(this.tp1));
        Assert.assertEquals(1L, ((Deque) this.accumulator.batches().get(this.tp1)).size());
        Assert.assertEquals(10, ((ProducerBatch) ((Deque) this.accumulator.batches().get(this.tp1)).peekFirst()).recordCount);
    }

    /* JADX WARN: Multi-variable type inference failed */
    /* JADX WARN: Type inference failed for: r0v0, types: [org.apache.kafka.clients.MockClient] */
    /* JADX WARN: Type inference failed for: r4v3, types: [long, org.apache.kafka.clients.MockClient] */
    @Test
    public void testMetadataTopicExpiry() throws Exception {
        ?? r0 = 0;
        this.metadata.update(Cluster.empty(), Collections.emptySet(), this.time.milliseconds());
        FutureRecordMetadata futureRecordMetadata = this.accumulator.append(this.tp0, this.time.milliseconds(), "key".getBytes(), "value".getBytes(), (Header[]) null, (Callback) null, 1000L).future;
        this.sender.run(this.time.milliseconds());
        Assert.assertTrue("Topic not added to metadata", this.metadata.containsTopic(this.tp0.topic()));
        this.metadata.update(this.cluster, Collections.emptySet(), this.time.milliseconds());
        this.sender.run(this.time.milliseconds());
        MockClient mockClient = this.client;
        ?? r4 = 0 + 1;
        r0.respond(produceResponse(this.tp0, 0L, Errors.NONE, 0));
        this.sender.run(this.time.milliseconds());
        Assert.assertEquals("Request completed.", 0L, this.client.inFlightRequestCount());
        Assert.assertFalse(this.client.hasInFlightRequests());
        this.sender.run(this.time.milliseconds());
        Assert.assertTrue("Request should be completed", futureRecordMetadata.isDone());
        Assert.assertTrue("Topic not retained in metadata list", this.metadata.containsTopic(this.tp0.topic()));
        this.time.sleep(300000L);
        this.metadata.update(Cluster.empty(), Collections.emptySet(), this.time.milliseconds());
        Assert.assertFalse("Unused topic has not been expired", this.metadata.containsTopic(this.tp0.topic()));
        FutureRecordMetadata futureRecordMetadata2 = this.accumulator.append(this.tp0, this.time.milliseconds(), "key".getBytes(), "value".getBytes(), (Header[]) null, (Callback) null, 1000L).future;
        this.sender.run(this.time.milliseconds());
        Assert.assertTrue("Topic not added to metadata", this.metadata.containsTopic(this.tp0.topic()));
        this.metadata.update(this.cluster, Collections.emptySet(), this.time.milliseconds());
        this.sender.run(this.time.milliseconds());
        MockClient mockClient2 = this.client;
        long j = r4 + 1;
        r4.respond(produceResponse(this.tp0, r4, Errors.NONE, 0));
        this.sender.run(this.time.milliseconds());
        Assert.assertEquals("Request completed.", 0L, this.client.inFlightRequestCount());
        Assert.assertFalse(this.client.hasInFlightRequests());
        this.sender.run(this.time.milliseconds());
        Assert.assertTrue("Request should be completed", futureRecordMetadata2.isDone());
    }

    @Test
    public void testInitProducerIdRequest() throws Exception {
        TransactionManager transactionManager = new TransactionManager();
        setupWithTransactionState(transactionManager);
        this.client.setNode(new Node(1, "localhost", 33343));
        prepareAndReceiveInitProducerId(343434L, Errors.NONE);
        Assert.assertTrue(transactionManager.hasProducerId());
        Assert.assertEquals(343434L, transactionManager.producerIdAndEpoch().producerId);
        Assert.assertEquals(0L, transactionManager.producerIdAndEpoch().epoch);
    }

    @Test
    public void testClusterAuthorizationExceptionInInitProducerIdRequest() throws Exception {
        TransactionManager transactionManager = new TransactionManager();
        setupWithTransactionState(transactionManager);
        this.client.setNode(new Node(1, "localhost", 33343));
        prepareAndReceiveInitProducerId(343434L, Errors.CLUSTER_AUTHORIZATION_FAILED);
        Assert.assertFalse(transactionManager.hasProducerId());
        Assert.assertTrue(transactionManager.hasError());
        Assert.assertTrue(transactionManager.lastError() instanceof ClusterAuthorizationException);
        assertSendFailure(ClusterAuthorizationException.class);
    }

    @Test
    public void testCanRetryWithoutIdempotence() throws Exception {
        FutureRecordMetadata futureRecordMetadata = this.accumulator.append(this.tp0, 0L, "key".getBytes(), "value".getBytes(), (Header[]) null, (Callback) null, 1000L).future;
        this.sender.run(this.time.milliseconds());
        this.sender.run(this.time.milliseconds());
        Node node = new Node(Integer.parseInt(this.client.requests().peek().destination()), "localhost", 0);
        Assert.assertEquals(1L, this.client.inFlightRequestCount());
        Assert.assertTrue(this.client.hasInFlightRequests());
        Assert.assertTrue("Client ready status should be true", this.client.isReady(node, 0L));
        Assert.assertFalse(futureRecordMetadata.isDone());
        this.client.respond(new MockClient.RequestMatcher() { // from class: org.apache.kafka.clients.producer.internals.SenderTest.4
            @Override // org.apache.kafka.clients.MockClient.RequestMatcher
            public boolean matches(AbstractRequest abstractRequest) {
                Assert.assertFalse(((ProduceRequest) abstractRequest).isIdempotent());
                return true;
            }
        }, (AbstractResponse) produceResponse(this.tp0, -1L, Errors.TOPIC_AUTHORIZATION_FAILED, 0));
        this.sender.run(this.time.milliseconds());
        Assert.assertTrue(futureRecordMetadata.isDone());
        try {
            futureRecordMetadata.get();
        } catch (Exception e) {
            Assert.assertTrue(e.getCause() instanceof TopicAuthorizationException);
        }
    }

    @Test
    public void testIdempotenceWithMultipleInflights() throws Exception {
        TransactionManager transactionManager = new TransactionManager();
        setupWithTransactionState(transactionManager);
        prepareAndReceiveInitProducerId(343434L, Errors.NONE);
        Assert.assertTrue(transactionManager.hasProducerId());
        Assert.assertEquals(0L, transactionManager.sequenceNumber(this.tp0).longValue());
        FutureRecordMetadata futureRecordMetadata = this.accumulator.append(this.tp0, this.time.milliseconds(), "key".getBytes(), "value".getBytes(), (Header[]) null, (Callback) null, 1000L).future;
        this.sender.run(this.time.milliseconds());
        Node node = new Node(Integer.valueOf(this.client.requests().peek().destination()).intValue(), "localhost", 0);
        Assert.assertEquals(1L, this.client.inFlightRequestCount());
        Assert.assertEquals(1L, transactionManager.sequenceNumber(this.tp0).longValue());
        Assert.assertEquals(-1L, transactionManager.lastAckedSequence(this.tp0));
        FutureRecordMetadata futureRecordMetadata2 = this.accumulator.append(this.tp0, this.time.milliseconds(), "key".getBytes(), "value".getBytes(), (Header[]) null, (Callback) null, 1000L).future;
        this.sender.run(this.time.milliseconds());
        Assert.assertEquals(2L, this.client.inFlightRequestCount());
        Assert.assertEquals(2L, transactionManager.sequenceNumber(this.tp0).longValue());
        Assert.assertEquals(-1L, transactionManager.lastAckedSequence(this.tp0));
        Assert.assertFalse(futureRecordMetadata.isDone());
        Assert.assertFalse(futureRecordMetadata2.isDone());
        Assert.assertTrue(this.client.isReady(node, this.time.milliseconds()));
        sendIdempotentProducerResponse(0, this.tp0, Errors.NONE, 0L);
        this.sender.run(this.time.milliseconds());
        Assert.assertEquals(1L, this.client.inFlightRequestCount());
        Assert.assertEquals(0L, transactionManager.lastAckedSequence(this.tp0));
        Assert.assertTrue(futureRecordMetadata.isDone());
        Assert.assertEquals(0L, ((RecordMetadata) futureRecordMetadata.get()).offset());
        Assert.assertFalse(futureRecordMetadata2.isDone());
        sendIdempotentProducerResponse(1, this.tp0, Errors.NONE, 1L);
        this.sender.run(this.time.milliseconds());
        Assert.assertEquals(1L, transactionManager.lastAckedSequence(this.tp0));
        Assert.assertFalse(this.client.hasInFlightRequests());
        Assert.assertTrue(futureRecordMetadata2.isDone());
        Assert.assertEquals(1L, ((RecordMetadata) futureRecordMetadata2.get()).offset());
    }

    @Test
    public void testIdempotenceWithMultipleInflightsRetriedInOrder() throws Exception {
        TransactionManager transactionManager = new TransactionManager();
        setupWithTransactionState(transactionManager);
        prepareAndReceiveInitProducerId(343434L, Errors.NONE);
        Assert.assertTrue(transactionManager.hasProducerId());
        Assert.assertEquals(0L, transactionManager.sequenceNumber(this.tp0).longValue());
        FutureRecordMetadata futureRecordMetadata = this.accumulator.append(this.tp0, this.time.milliseconds(), "key".getBytes(), "value".getBytes(), (Header[]) null, (Callback) null, 1000L).future;
        this.sender.run(this.time.milliseconds());
        Node node = new Node(Integer.valueOf(this.client.requests().peek().destination()).intValue(), "localhost", 0);
        Assert.assertEquals(1L, this.client.inFlightRequestCount());
        Assert.assertEquals(1L, transactionManager.sequenceNumber(this.tp0).longValue());
        Assert.assertEquals(-1L, transactionManager.lastAckedSequence(this.tp0));
        FutureRecordMetadata futureRecordMetadata2 = this.accumulator.append(this.tp0, this.time.milliseconds(), "key".getBytes(), "value".getBytes(), (Header[]) null, (Callback) null, 1000L).future;
        this.sender.run(this.time.milliseconds());
        FutureRecordMetadata futureRecordMetadata3 = this.accumulator.append(this.tp0, this.time.milliseconds(), "key".getBytes(), "value".getBytes(), (Header[]) null, (Callback) null, 1000L).future;
        this.sender.run(this.time.milliseconds());
        Assert.assertEquals(3L, this.client.inFlightRequestCount());
        Assert.assertEquals(3L, transactionManager.sequenceNumber(this.tp0).longValue());
        Assert.assertEquals(-1L, transactionManager.lastAckedSequence(this.tp0));
        Assert.assertFalse(futureRecordMetadata.isDone());
        Assert.assertFalse(futureRecordMetadata2.isDone());
        Assert.assertFalse(futureRecordMetadata3.isDone());
        Assert.assertTrue(this.client.isReady(node, this.time.milliseconds()));
        sendIdempotentProducerResponse(0, this.tp0, Errors.LEADER_NOT_AVAILABLE, -1L);
        this.sender.run(this.time.milliseconds());
        FutureRecordMetadata futureRecordMetadata4 = this.accumulator.append(this.tp0, this.time.milliseconds(), "key".getBytes(), "value".getBytes(), (Header[]) null, (Callback) null, 1000L).future;
        Assert.assertEquals(2L, this.client.inFlightRequestCount());
        Assert.assertEquals(-1L, transactionManager.lastAckedSequence(this.tp0));
        sendIdempotentProducerResponse(1, this.tp0, Errors.OUT_OF_ORDER_SEQUENCE_NUMBER, -1L);
        this.sender.run(this.time.milliseconds());
        sendIdempotentProducerResponse(2, this.tp0, Errors.OUT_OF_ORDER_SEQUENCE_NUMBER, -1L);
        this.sender.run(this.time.milliseconds());
        Assert.assertEquals(-1L, transactionManager.lastAckedSequence(this.tp0));
        Assert.assertEquals(1L, this.client.inFlightRequestCount());
        this.sender.run(this.time.milliseconds());
        Assert.assertEquals(3L, transactionManager.sequenceNumber(this.tp0).longValue());
        Assert.assertEquals(1L, this.client.inFlightRequestCount());
        Assert.assertEquals(-1L, transactionManager.lastAckedSequence(this.tp0));
        sendIdempotentProducerResponse(0, this.tp0, Errors.NONE, 0L);
        this.sender.run(this.time.milliseconds());
        Assert.assertEquals(0L, transactionManager.lastAckedSequence(this.tp0));
        Assert.assertTrue(futureRecordMetadata.isDone());
        Assert.assertEquals(0L, ((RecordMetadata) futureRecordMetadata.get()).offset());
        Assert.assertFalse(this.client.hasInFlightRequests());
        this.sender.run(this.time.milliseconds());
        Assert.assertEquals(1L, this.client.inFlightRequestCount());
        sendIdempotentProducerResponse(1, this.tp0, Errors.NONE, 1L);
        this.sender.run(this.time.milliseconds());
        Assert.assertEquals(1L, transactionManager.lastAckedSequence(this.tp0));
        Assert.assertTrue(futureRecordMetadata2.isDone());
        Assert.assertEquals(1L, ((RecordMetadata) futureRecordMetadata2.get()).offset());
        Assert.assertFalse(this.client.hasInFlightRequests());
        this.sender.run(this.time.milliseconds());
        Assert.assertEquals(1L, this.client.inFlightRequestCount());
        sendIdempotentProducerResponse(2, this.tp0, Errors.NONE, 2L);
        this.sender.run(this.time.milliseconds());
        Assert.assertEquals(2L, transactionManager.lastAckedSequence(this.tp0));
        Assert.assertTrue(futureRecordMetadata3.isDone());
        Assert.assertEquals(2L, ((RecordMetadata) futureRecordMetadata3.get()).offset());
        Assert.assertEquals(1L, this.client.inFlightRequestCount());
        sendIdempotentProducerResponse(3, this.tp0, Errors.NONE, 3L);
        this.sender.run(this.time.milliseconds());
        Assert.assertEquals(3L, transactionManager.lastAckedSequence(this.tp0));
        Assert.assertTrue(futureRecordMetadata4.isDone());
        Assert.assertEquals(3L, ((RecordMetadata) futureRecordMetadata4.get()).offset());
    }

    @Test
    public void testIdempotenceWithMultipleInflightsWhereFirstFailsFatallyAndSequenceOfFutureBatchesIsAdjusted() throws Exception {
        TransactionManager transactionManager = new TransactionManager();
        setupWithTransactionState(transactionManager);
        prepareAndReceiveInitProducerId(343434L, Errors.NONE);
        Assert.assertTrue(transactionManager.hasProducerId());
        Assert.assertEquals(0L, transactionManager.sequenceNumber(this.tp0).longValue());
        FutureRecordMetadata futureRecordMetadata = this.accumulator.append(this.tp0, this.time.milliseconds(), "key".getBytes(), "value".getBytes(), (Header[]) null, (Callback) null, 1000L).future;
        this.sender.run(this.time.milliseconds());
        Node node = new Node(Integer.valueOf(this.client.requests().peek().destination()).intValue(), "localhost", 0);
        Assert.assertEquals(1L, this.client.inFlightRequestCount());
        Assert.assertEquals(1L, transactionManager.sequenceNumber(this.tp0).longValue());
        Assert.assertEquals(-1L, transactionManager.lastAckedSequence(this.tp0));
        FutureRecordMetadata futureRecordMetadata2 = this.accumulator.append(this.tp0, this.time.milliseconds(), "key".getBytes(), "value".getBytes(), (Header[]) null, (Callback) null, 1000L).future;
        this.sender.run(this.time.milliseconds());
        Assert.assertEquals(2L, this.client.inFlightRequestCount());
        Assert.assertEquals(2L, transactionManager.sequenceNumber(this.tp0).longValue());
        Assert.assertEquals(-1L, transactionManager.lastAckedSequence(this.tp0));
        Assert.assertFalse(futureRecordMetadata.isDone());
        Assert.assertFalse(futureRecordMetadata2.isDone());
        Assert.assertTrue(this.client.isReady(node, this.time.milliseconds()));
        sendIdempotentProducerResponse(0, this.tp0, Errors.MESSAGE_TOO_LARGE, -1L);
        this.sender.run(this.time.milliseconds());
        assertFutureFailure(futureRecordMetadata, RecordTooLargeException.class);
        Assert.assertEquals(1L, this.client.inFlightRequestCount());
        Assert.assertEquals(-1L, transactionManager.lastAckedSequence(this.tp0));
        sendIdempotentProducerResponse(1, this.tp0, Errors.OUT_OF_ORDER_SEQUENCE_NUMBER, -1L);
        this.sender.run(this.time.milliseconds());
        Assert.assertEquals(-1L, transactionManager.lastAckedSequence(this.tp0));
        Assert.assertEquals(0L, this.client.inFlightRequestCount());
        this.sender.run(this.time.milliseconds());
        Assert.assertEquals(1L, this.client.inFlightRequestCount());
        Assert.assertEquals(-1L, transactionManager.lastAckedSequence(this.tp0));
        sendIdempotentProducerResponse(0, this.tp0, Errors.NONE, 0L);
        this.sender.run(this.time.milliseconds());
        Assert.assertEquals(0L, transactionManager.lastAckedSequence(this.tp0));
        Assert.assertEquals(0L, this.client.inFlightRequestCount());
        Assert.assertTrue(futureRecordMetadata.isDone());
        Assert.assertEquals(0L, ((RecordMetadata) futureRecordMetadata2.get()).offset());
    }

    @Test
    public void testMustNotRetryOutOfOrderSequenceForNextBatch() throws Exception {
        TransactionManager transactionManager = new TransactionManager();
        setupWithTransactionState(transactionManager);
        prepareAndReceiveInitProducerId(343434L, Errors.NONE);
        Assert.assertTrue(transactionManager.hasProducerId());
        Assert.assertEquals(0L, transactionManager.sequenceNumber(this.tp0).longValue());
        FutureRecordMetadata futureRecordMetadata = this.accumulator.append(this.tp0, this.time.milliseconds(), "key".getBytes(), "value".getBytes(), (Header[]) null, (Callback) null, 1000L).future;
        this.accumulator.append(this.tp0, this.time.milliseconds(), "key".getBytes(), "value".getBytes(), (Header[]) null, (Callback) null, 1000L);
        this.sender.run(this.time.milliseconds());
        Node node = new Node(Integer.valueOf(this.client.requests().peek().destination()).intValue(), "localhost", 0);
        Assert.assertEquals(1L, this.client.inFlightRequestCount());
        Assert.assertEquals(2L, transactionManager.sequenceNumber(this.tp0).longValue());
        Assert.assertEquals(-1L, transactionManager.lastAckedSequence(this.tp0));
        sendIdempotentProducerResponse(0, this.tp0, Errors.NONE, 0L);
        this.sender.run(this.time.milliseconds());
        FutureRecordMetadata futureRecordMetadata2 = this.accumulator.append(this.tp0, this.time.milliseconds(), "key".getBytes(), "value".getBytes(), (Header[]) null, (Callback) null, 1000L).future;
        this.sender.run(this.time.milliseconds());
        Assert.assertEquals(1L, this.client.inFlightRequestCount());
        Assert.assertEquals(3L, transactionManager.sequenceNumber(this.tp0).longValue());
        Assert.assertEquals(1L, transactionManager.lastAckedSequence(this.tp0));
        Assert.assertTrue(futureRecordMetadata.isDone());
        Assert.assertEquals(0L, ((RecordMetadata) futureRecordMetadata.get()).offset());
        Assert.assertFalse(futureRecordMetadata2.isDone());
        Assert.assertTrue(this.client.isReady(node, this.time.milliseconds()));
        sendIdempotentProducerResponse(2, this.tp0, Errors.OUT_OF_ORDER_SEQUENCE_NUMBER, -1L);
        this.sender.run(this.time.milliseconds());
        assertFutureFailure(futureRecordMetadata2, OutOfOrderSequenceException.class);
    }

    @Test
    public void testCorrectHandlingOfOutOfOrderResponses() throws Exception {
        TransactionManager transactionManager = new TransactionManager();
        setupWithTransactionState(transactionManager);
        prepareAndReceiveInitProducerId(343434L, Errors.NONE);
        Assert.assertTrue(transactionManager.hasProducerId());
        Assert.assertEquals(0L, transactionManager.sequenceNumber(this.tp0).longValue());
        FutureRecordMetadata futureRecordMetadata = this.accumulator.append(this.tp0, this.time.milliseconds(), "key".getBytes(), "value".getBytes(), (Header[]) null, (Callback) null, 1000L).future;
        this.sender.run(this.time.milliseconds());
        Node node = new Node(Integer.valueOf(this.client.requests().peek().destination()).intValue(), "localhost", 0);
        Assert.assertEquals(1L, this.client.inFlightRequestCount());
        Assert.assertEquals(1L, transactionManager.sequenceNumber(this.tp0).longValue());
        Assert.assertEquals(-1L, transactionManager.lastAckedSequence(this.tp0));
        FutureRecordMetadata futureRecordMetadata2 = this.accumulator.append(this.tp0, this.time.milliseconds(), "key".getBytes(), "value".getBytes(), (Header[]) null, (Callback) null, 1000L).future;
        this.sender.run(this.time.milliseconds());
        Assert.assertEquals(2L, this.client.inFlightRequestCount());
        Assert.assertEquals(2L, transactionManager.sequenceNumber(this.tp0).longValue());
        Assert.assertEquals(-1L, transactionManager.lastAckedSequence(this.tp0));
        Assert.assertFalse(futureRecordMetadata.isDone());
        Assert.assertFalse(futureRecordMetadata2.isDone());
        Assert.assertTrue(this.client.isReady(node, this.time.milliseconds()));
        ClientRequest peek = this.client.requests().peek();
        this.client.respondToRequest((ClientRequest) this.client.requests().toArray()[1], produceResponse(this.tp0, -1L, Errors.OUT_OF_ORDER_SEQUENCE_NUMBER, ACKS_ALL));
        this.sender.run(this.time.milliseconds());
        Deque deque = (Deque) this.accumulator.batches().get(this.tp0);
        Assert.assertEquals(1L, deque.size());
        Assert.assertEquals(1L, ((ProducerBatch) deque.peekFirst()).baseSequence());
        Assert.assertEquals(1L, this.client.inFlightRequestCount());
        Assert.assertEquals(-1L, transactionManager.lastAckedSequence(this.tp0));
        this.client.respondToRequest(peek, produceResponse(this.tp0, -1L, Errors.NOT_LEADER_FOR_PARTITION, ACKS_ALL));
        this.sender.run(this.time.milliseconds());
        Assert.assertEquals(2L, deque.size());
        Assert.assertEquals(0L, ((ProducerBatch) deque.peekFirst()).baseSequence());
        Assert.assertEquals(1L, ((ProducerBatch) deque.peekLast()).baseSequence());
        Assert.assertEquals(-1L, transactionManager.lastAckedSequence(this.tp0));
        Assert.assertEquals(0L, this.client.inFlightRequestCount());
        Assert.assertFalse(futureRecordMetadata.isDone());
        Assert.assertFalse(futureRecordMetadata2.isDone());
        this.sender.run(this.time.milliseconds());
        Assert.assertEquals(1L, this.client.inFlightRequestCount());
        this.sender.run(this.time.milliseconds());
        Assert.assertEquals(1L, this.client.inFlightRequestCount());
        Assert.assertEquals(-1L, transactionManager.lastAckedSequence(this.tp0));
        sendIdempotentProducerResponse(0, this.tp0, Errors.NONE, 0L);
        this.sender.run(this.time.milliseconds());
        Assert.assertEquals(0L, transactionManager.lastAckedSequence(this.tp0));
        Assert.assertEquals(0L, this.client.inFlightRequestCount());
        Assert.assertTrue(futureRecordMetadata.isDone());
        Assert.assertEquals(0L, ((RecordMetadata) futureRecordMetadata.get()).offset());
        this.sender.run(this.time.milliseconds());
        Assert.assertEquals(1L, this.client.inFlightRequestCount());
        sendIdempotentProducerResponse(1, this.tp0, Errors.NONE, 1L);
        this.sender.run(this.time.milliseconds());
        Assert.assertFalse(this.client.hasInFlightRequests());
        Assert.assertEquals(1L, transactionManager.lastAckedSequence(this.tp0));
        Assert.assertTrue(futureRecordMetadata2.isDone());
        Assert.assertEquals(1L, ((RecordMetadata) futureRecordMetadata2.get()).offset());
    }

    @Test
    public void testCorrectHandlingOfOutOfOrderResponsesWhenSecondSucceeds() throws Exception {
        TransactionManager transactionManager = new TransactionManager();
        setupWithTransactionState(transactionManager);
        prepareAndReceiveInitProducerId(343434L, Errors.NONE);
        Assert.assertTrue(transactionManager.hasProducerId());
        Assert.assertEquals(0L, transactionManager.sequenceNumber(this.tp0).longValue());
        FutureRecordMetadata futureRecordMetadata = this.accumulator.append(this.tp0, this.time.milliseconds(), "key".getBytes(), "value".getBytes(), (Header[]) null, (Callback) null, 1000L).future;
        this.sender.run(this.time.milliseconds());
        Node node = new Node(Integer.valueOf(this.client.requests().peek().destination()).intValue(), "localhost", 0);
        Assert.assertEquals(1L, this.client.inFlightRequestCount());
        FutureRecordMetadata futureRecordMetadata2 = this.accumulator.append(this.tp0, this.time.milliseconds(), "key".getBytes(), "value".getBytes(), (Header[]) null, (Callback) null, 1000L).future;
        this.sender.run(this.time.milliseconds());
        Assert.assertEquals(2L, this.client.inFlightRequestCount());
        Assert.assertFalse(futureRecordMetadata.isDone());
        Assert.assertFalse(futureRecordMetadata2.isDone());
        Assert.assertTrue(this.client.isReady(node, this.time.milliseconds()));
        ClientRequest peek = this.client.requests().peek();
        this.client.respondToRequest((ClientRequest) this.client.requests().toArray()[1], produceResponse(this.tp0, 1L, Errors.NONE, 1));
        this.sender.run(this.time.milliseconds());
        Assert.assertTrue(futureRecordMetadata2.isDone());
        Assert.assertEquals(1L, ((RecordMetadata) futureRecordMetadata2.get()).offset());
        Assert.assertFalse(futureRecordMetadata.isDone());
        Deque deque = (Deque) this.accumulator.batches().get(this.tp0);
        Assert.assertEquals(0L, deque.size());
        Assert.assertEquals(1L, this.client.inFlightRequestCount());
        Assert.assertEquals(1L, transactionManager.lastAckedSequence(this.tp0));
        this.client.respondToRequest(peek, produceResponse(this.tp0, -1L, Errors.REQUEST_TIMED_OUT, ACKS_ALL));
        this.sender.run(this.time.milliseconds());
        Assert.assertEquals(1L, deque.size());
        Assert.assertEquals(0L, ((ProducerBatch) deque.peekFirst()).baseSequence());
        Assert.assertEquals(1L, transactionManager.lastAckedSequence(this.tp0));
        Assert.assertEquals(0L, this.client.inFlightRequestCount());
        this.sender.run(this.time.milliseconds());
        Assert.assertEquals(1L, this.client.inFlightRequestCount());
        Assert.assertEquals(1L, this.client.inFlightRequestCount());
        Assert.assertEquals(1L, transactionManager.lastAckedSequence(this.tp0));
        sendIdempotentProducerResponse(0, this.tp0, Errors.NONE, 0L);
        this.sender.run(this.time.milliseconds());
        Assert.assertEquals(0L, deque.size());
        Assert.assertEquals(1L, transactionManager.lastAckedSequence(this.tp0));
        Assert.assertEquals(0L, this.client.inFlightRequestCount());
        Assert.assertFalse(this.client.hasInFlightRequests());
        Assert.assertTrue(futureRecordMetadata.isDone());
        Assert.assertEquals(0L, ((RecordMetadata) futureRecordMetadata.get()).offset());
    }

    @Test
    public void testExpiryOfUnsentBatchesShouldNotCauseUnresolvedSequences() throws Exception {
        TransactionManager transactionManager = new TransactionManager();
        setupWithTransactionState(transactionManager);
        prepareAndReceiveInitProducerId(343434L, Errors.NONE);
        Assert.assertTrue(transactionManager.hasProducerId());
        Assert.assertEquals(0L, transactionManager.sequenceNumber(this.tp0).longValue());
        FutureRecordMetadata futureRecordMetadata = this.accumulator.append(this.tp0, 0L, "key".getBytes(), "value".getBytes(), (Header[]) null, (Callback) null, 1000L).future;
        Node node = (Node) this.cluster.nodes().get(0);
        this.time.sleep(10000L);
        this.client.disconnect(node.idString());
        this.client.blackout(node, 10L);
        this.sender.run(this.time.milliseconds());
        assertFutureFailure(futureRecordMetadata, TimeoutException.class);
        Assert.assertFalse(transactionManager.hasUnresolvedSequence(this.tp0));
    }

    @Test
    public void testExpiryOfFirstBatchShouldNotCauseUnresolvedSequencesIfFutureBatchesSucceed() throws Exception {
        TransactionManager transactionManager = new TransactionManager();
        setupWithTransactionState(transactionManager);
        prepareAndReceiveInitProducerId(343434L, Errors.NONE);
        Assert.assertTrue(transactionManager.hasProducerId());
        Assert.assertEquals(0L, transactionManager.sequenceNumber(this.tp0).longValue());
        FutureRecordMetadata futureRecordMetadata = this.accumulator.append(this.tp0, this.time.milliseconds(), "key".getBytes(), "value".getBytes(), (Header[]) null, (Callback) null, 1000L).future;
        this.sender.run(this.time.milliseconds());
        FutureRecordMetadata futureRecordMetadata2 = this.accumulator.append(this.tp0, this.time.milliseconds(), "key".getBytes(), "value".getBytes(), (Header[]) null, (Callback) null, 1000L).future;
        this.sender.run(this.time.milliseconds());
        Assert.assertEquals(2L, this.client.inFlightRequestCount());
        sendIdempotentProducerResponse(0, this.tp0, Errors.REQUEST_TIMED_OUT, -1L);
        this.sender.run(this.time.milliseconds());
        Node node = (Node) this.cluster.nodes().get(0);
        this.time.sleep(10000L);
        this.client.disconnect(node.idString());
        this.client.blackout(node, 10L);
        this.sender.run(this.time.milliseconds());
        assertFutureFailure(futureRecordMetadata, TimeoutException.class);
        Assert.assertTrue(transactionManager.hasUnresolvedSequence(this.tp0));
        FutureRecordMetadata futureRecordMetadata3 = this.accumulator.append(this.tp0, this.time.milliseconds(), "key".getBytes(), "value".getBytes(), (Header[]) null, (Callback) null, 1000L).future;
        this.time.sleep(20L);
        Assert.assertFalse(futureRecordMetadata2.isDone());
        this.sender.run(this.time.milliseconds());
        sendIdempotentProducerResponse(1, this.tp0, Errors.NONE, 1L);
        this.sender.run(this.time.milliseconds());
        Assert.assertTrue(futureRecordMetadata2.isDone());
        Assert.assertEquals(1L, ((RecordMetadata) futureRecordMetadata2.get()).offset());
        Deque deque = (Deque) this.accumulator.batches().get(this.tp0);
        Assert.assertEquals(1L, deque.size());
        Assert.assertFalse(((ProducerBatch) deque.peekFirst()).hasSequence());
        Assert.assertFalse(this.client.hasInFlightRequests());
        Assert.assertEquals(2L, transactionManager.sequenceNumber(this.tp0).longValue());
        Assert.assertTrue(transactionManager.hasUnresolvedSequence(this.tp0));
        this.sender.run(this.time.milliseconds());
        Assert.assertFalse(transactionManager.hasUnresolvedSequence(this.tp0));
        Assert.assertTrue(transactionManager.hasProducerId());
        Assert.assertEquals(0L, deque.size());
        Assert.assertEquals(1L, this.client.inFlightRequestCount());
        Assert.assertFalse(futureRecordMetadata3.isDone());
    }

    @Test
    public void testExpiryOfFirstBatchShouldCauseResetIfFutureBatchesFail() throws Exception {
        TransactionManager transactionManager = new TransactionManager();
        setupWithTransactionState(transactionManager);
        prepareAndReceiveInitProducerId(343434L, Errors.NONE);
        Assert.assertTrue(transactionManager.hasProducerId());
        Assert.assertEquals(0L, transactionManager.sequenceNumber(this.tp0).longValue());
        FutureRecordMetadata futureRecordMetadata = this.accumulator.append(this.tp0, this.time.milliseconds(), "key".getBytes(), "value".getBytes(), (Header[]) null, (Callback) null, 1000L).future;
        this.sender.run(this.time.milliseconds());
        FutureRecordMetadata futureRecordMetadata2 = this.accumulator.append(this.tp0, this.time.milliseconds(), "key".getBytes(), "value".getBytes(), (Header[]) null, (Callback) null, 1000L).future;
        this.sender.run(this.time.milliseconds());
        Assert.assertEquals(2L, this.client.inFlightRequestCount());
        sendIdempotentProducerResponse(0, this.tp0, Errors.NOT_LEADER_FOR_PARTITION, -1L);
        this.sender.run(this.time.milliseconds());
        Node node = (Node) this.cluster.nodes().get(0);
        this.time.sleep(10000L);
        this.client.disconnect(node.idString());
        this.client.blackout(node, 10L);
        this.sender.run(this.time.milliseconds());
        assertFutureFailure(futureRecordMetadata, TimeoutException.class);
        Assert.assertTrue(transactionManager.hasUnresolvedSequence(this.tp0));
        FutureRecordMetadata futureRecordMetadata3 = this.accumulator.append(this.tp0, this.time.milliseconds(), "key".getBytes(), "value".getBytes(), (Header[]) null, (Callback) null, 1000L).future;
        this.time.sleep(20L);
        Assert.assertFalse(futureRecordMetadata2.isDone());
        this.sender.run(this.time.milliseconds());
        sendIdempotentProducerResponse(1, this.tp0, Errors.OUT_OF_ORDER_SEQUENCE_NUMBER, 1L);
        this.sender.run(this.time.milliseconds());
        assertFutureFailure(futureRecordMetadata2, OutOfOrderSequenceException.class);
        Deque deque = (Deque) this.accumulator.batches().get(this.tp0);
        Assert.assertEquals(1L, deque.size());
        Assert.assertFalse(((ProducerBatch) deque.peekFirst()).hasSequence());
        Assert.assertFalse(this.client.hasInFlightRequests());
        Assert.assertFalse(transactionManager.hasProducerId());
        Assert.assertFalse(transactionManager.hasUnresolvedSequence(this.tp0));
    }

    @Test
    public void testExpiryOfAllSentBatchesShouldCauseUnresolvedSequences() throws Exception {
        TransactionManager transactionManager = new TransactionManager();
        setupWithTransactionState(transactionManager);
        prepareAndReceiveInitProducerId(343434L, Errors.NONE);
        Assert.assertTrue(transactionManager.hasProducerId());
        Assert.assertEquals(0L, transactionManager.sequenceNumber(this.tp0).longValue());
        FutureRecordMetadata futureRecordMetadata = this.accumulator.append(this.tp0, 0L, "key".getBytes(), "value".getBytes(), (Header[]) null, (Callback) null, 1000L).future;
        this.sender.run(this.time.milliseconds());
        sendIdempotentProducerResponse(0, this.tp0, Errors.NOT_LEADER_FOR_PARTITION, -1L);
        this.sender.run(this.time.milliseconds());
        Assert.assertEquals(1L, transactionManager.sequenceNumber(this.tp0).longValue());
        Node node = (Node) this.cluster.nodes().get(0);
        this.time.sleep(10000L);
        this.client.disconnect(node.idString());
        this.client.blackout(node, 10L);
        this.sender.run(this.time.milliseconds());
        assertFutureFailure(futureRecordMetadata, TimeoutException.class);
        Assert.assertTrue(transactionManager.hasUnresolvedSequence(this.tp0));
        Assert.assertFalse(this.client.hasInFlightRequests());
        Assert.assertEquals(0L, ((Deque) this.accumulator.batches().get(this.tp0)).size());
        Assert.assertTrue(transactionManager.hasProducerId(343434L));
        prepareAndReceiveInitProducerId(343435L, Errors.NONE);
        Assert.assertTrue(transactionManager.hasProducerId(343435L));
    }

    @Test
    public void testResetOfProducerStateShouldAllowQueuedBatchesToDrain() throws Exception {
        TransactionManager transactionManager = new TransactionManager();
        transactionManager.setProducerIdAndEpoch(new ProducerIdAndEpoch(343434L, (short) 0));
        setupWithTransactionState(transactionManager);
        this.client.setNode(new Node(1, "localhost", 33343));
        Sender sender = new Sender(this.logContext, this.client, this.metadata, this.accumulator, true, MAX_REQUEST_SIZE, (short) -1, 10, new SenderMetricsRegistry(new Metrics()), this.time, 1000, 50L, transactionManager, this.apiVersions);
        FutureRecordMetadata futureRecordMetadata = this.accumulator.append(this.tp0, this.time.milliseconds(), "key".getBytes(), "value".getBytes(), (Header[]) null, (Callback) null, 1000L).future;
        FutureRecordMetadata futureRecordMetadata2 = this.accumulator.append(this.tp1, this.time.milliseconds(), "key".getBytes(), "value".getBytes(), (Header[]) null, (Callback) null, 1000L).future;
        sender.run(this.time.milliseconds());
        sender.run(this.time.milliseconds());
        Assert.assertEquals(1L, this.client.inFlightRequestCount());
        LinkedHashMap linkedHashMap = new LinkedHashMap();
        linkedHashMap.put(this.tp1, new OffsetAndError(-1L, Errors.NOT_LEADER_FOR_PARTITION));
        linkedHashMap.put(this.tp0, new OffsetAndError(-1L, Errors.OUT_OF_ORDER_SEQUENCE_NUMBER));
        this.client.respond(produceResponse(linkedHashMap));
        sender.run(this.time.milliseconds());
        Assert.assertTrue(futureRecordMetadata.isDone());
        Assert.assertFalse("Expected transaction state to be reset upon receiving an OutOfOrderSequenceException", transactionManager.hasProducerId());
        prepareAndReceiveInitProducerId(343435L, Errors.NONE);
        Assert.assertEquals(343435L, transactionManager.producerIdAndEpoch().producerId);
        sender.run(this.time.milliseconds());
        Assert.assertFalse(futureRecordMetadata2.isDone());
        this.client.respond(produceResponse(this.tp1, 10L, Errors.NONE, ACKS_ALL));
        sender.run(this.time.milliseconds());
        Assert.assertTrue(futureRecordMetadata2.isDone());
        Assert.assertEquals(10L, ((RecordMetadata) futureRecordMetadata2.get()).offset());
        Assert.assertEquals(0L, transactionManager.sequenceNumber(this.tp1).longValue());
    }

    @Test
    public void testBatchesDrainedWithOldProducerIdShouldFailWithOutOfOrderSequenceOnSubsequentRetry() throws Exception {
        TransactionManager transactionManager = new TransactionManager();
        transactionManager.setProducerIdAndEpoch(new ProducerIdAndEpoch(343434L, (short) 0));
        setupWithTransactionState(transactionManager);
        this.client.setNode(new Node(1, "localhost", 33343));
        Sender sender = new Sender(this.logContext, this.client, this.metadata, this.accumulator, true, MAX_REQUEST_SIZE, (short) -1, 10, new SenderMetricsRegistry(new Metrics()), this.time, 1000, 50L, transactionManager, this.apiVersions);
        FutureRecordMetadata futureRecordMetadata = this.accumulator.append(this.tp0, this.time.milliseconds(), "key".getBytes(), "value".getBytes(), (Header[]) null, (Callback) null, 1000L).future;
        FutureRecordMetadata futureRecordMetadata2 = this.accumulator.append(this.tp1, this.time.milliseconds(), "key".getBytes(), "value".getBytes(), (Header[]) null, (Callback) null, 1000L).future;
        sender.run(this.time.milliseconds());
        sender.run(this.time.milliseconds());
        Assert.assertEquals(1L, this.client.inFlightRequestCount());
        LinkedHashMap linkedHashMap = new LinkedHashMap();
        linkedHashMap.put(this.tp1, new OffsetAndError(-1L, Errors.NOT_LEADER_FOR_PARTITION));
        linkedHashMap.put(this.tp0, new OffsetAndError(-1L, Errors.OUT_OF_ORDER_SEQUENCE_NUMBER));
        this.client.respond(produceResponse(linkedHashMap));
        sender.run(this.time.milliseconds());
        Assert.assertTrue(futureRecordMetadata.isDone());
        Assert.assertFalse("Expected transaction state to be reset upon receiving an OutOfOrderSequenceException", transactionManager.hasProducerId());
        prepareAndReceiveInitProducerId(343435L, Errors.NONE);
        Assert.assertEquals(343435L, transactionManager.producerIdAndEpoch().producerId);
        sender.run(this.time.milliseconds());
        Assert.assertFalse(futureRecordMetadata2.isDone());
        this.client.respond(produceResponse(this.tp1, 0L, Errors.NOT_LEADER_FOR_PARTITION, ACKS_ALL));
        sender.run(this.time.milliseconds());
        Assert.assertTrue(futureRecordMetadata2.isDone());
        try {
            futureRecordMetadata2.get();
            Assert.fail("Should have raised an OutOfOrderSequenceException");
        } catch (Exception e) {
            Assert.assertTrue(e.getCause() instanceof OutOfOrderSequenceException);
        }
    }

    @Test
    public void testCorrectHandlingOfDuplicateSequenceError() throws Exception {
        TransactionManager transactionManager = new TransactionManager();
        setupWithTransactionState(transactionManager);
        prepareAndReceiveInitProducerId(343434L, Errors.NONE);
        Assert.assertTrue(transactionManager.hasProducerId());
        Assert.assertEquals(0L, transactionManager.sequenceNumber(this.tp0).longValue());
        FutureRecordMetadata futureRecordMetadata = this.accumulator.append(this.tp0, this.time.milliseconds(), "key".getBytes(), "value".getBytes(), (Header[]) null, (Callback) null, 1000L).future;
        this.sender.run(this.time.milliseconds());
        Node node = new Node(Integer.valueOf(this.client.requests().peek().destination()).intValue(), "localhost", 0);
        Assert.assertEquals(1L, this.client.inFlightRequestCount());
        Assert.assertEquals(1L, transactionManager.sequenceNumber(this.tp0).longValue());
        Assert.assertEquals(-1L, transactionManager.lastAckedSequence(this.tp0));
        FutureRecordMetadata futureRecordMetadata2 = this.accumulator.append(this.tp0, this.time.milliseconds(), "key".getBytes(), "value".getBytes(), (Header[]) null, (Callback) null, 1000L).future;
        this.sender.run(this.time.milliseconds());
        Assert.assertEquals(2L, this.client.inFlightRequestCount());
        Assert.assertEquals(2L, transactionManager.sequenceNumber(this.tp0).longValue());
        Assert.assertEquals(-1L, transactionManager.lastAckedSequence(this.tp0));
        Assert.assertFalse(futureRecordMetadata.isDone());
        Assert.assertFalse(futureRecordMetadata2.isDone());
        Assert.assertTrue(this.client.isReady(node, this.time.milliseconds()));
        ClientRequest peek = this.client.requests().peek();
        this.client.respondToRequest((ClientRequest) this.client.requests().toArray()[1], produceResponse(this.tp0, 1000L, Errors.NONE, 0));
        this.sender.run(this.time.milliseconds());
        Assert.assertEquals(1000L, transactionManager.lastAckedOffset(this.tp0));
        Assert.assertEquals(1L, transactionManager.lastAckedSequence(this.tp0));
        this.client.respondToRequest(peek, produceResponse(this.tp0, -1L, Errors.DUPLICATE_SEQUENCE_NUMBER, 0));
        this.sender.run(this.time.milliseconds());
        Assert.assertEquals(1L, transactionManager.lastAckedSequence(this.tp0));
        Assert.assertEquals(1000L, transactionManager.lastAckedOffset(this.tp0));
        Assert.assertFalse(this.client.hasInFlightRequests());
        RecordMetadata recordMetadata = (RecordMetadata) futureRecordMetadata.get();
        Assert.assertFalse(recordMetadata.hasOffset());
        Assert.assertEquals(-1L, recordMetadata.offset());
    }

    @Test
    public void testUnknownProducerHandlingWhenRetentionLimitReached() throws Exception {
        TransactionManager transactionManager = new TransactionManager();
        setupWithTransactionState(transactionManager);
        prepareAndReceiveInitProducerId(343434L, Errors.NONE);
        Assert.assertTrue(transactionManager.hasProducerId());
        Assert.assertEquals(0L, transactionManager.sequenceNumber(this.tp0).longValue());
        FutureRecordMetadata futureRecordMetadata = this.accumulator.append(this.tp0, this.time.milliseconds(), "key".getBytes(), "value".getBytes(), (Header[]) null, (Callback) null, 1000L).future;
        this.sender.run(this.time.milliseconds());
        Assert.assertEquals(1L, this.client.inFlightRequestCount());
        Assert.assertEquals(1L, transactionManager.sequenceNumber(this.tp0).longValue());
        Assert.assertEquals(-1L, transactionManager.lastAckedSequence(this.tp0));
        sendIdempotentProducerResponse(0, this.tp0, Errors.NONE, 1000L, 10L);
        this.sender.run(this.time.milliseconds());
        Assert.assertTrue(futureRecordMetadata.isDone());
        Assert.assertEquals(1000L, ((RecordMetadata) futureRecordMetadata.get()).offset());
        Assert.assertEquals(0L, transactionManager.lastAckedSequence(this.tp0));
        Assert.assertEquals(1000L, transactionManager.lastAckedOffset(this.tp0));
        this.accumulator.append(this.tp0, this.time.milliseconds(), "key".getBytes(), "value".getBytes(), (Header[]) null, (Callback) null, 1000L);
        FutureRecordMetadata futureRecordMetadata2 = this.accumulator.append(this.tp0, this.time.milliseconds(), "key".getBytes(), "value".getBytes(), (Header[]) null, (Callback) null, 1000L).future;
        this.sender.run(this.time.milliseconds());
        Assert.assertEquals(3L, transactionManager.sequenceNumber(this.tp0).longValue());
        Assert.assertEquals(0L, transactionManager.lastAckedSequence(this.tp0));
        Assert.assertFalse(futureRecordMetadata2.isDone());
        sendIdempotentProducerResponse(1, this.tp0, Errors.UNKNOWN_PRODUCER_ID, -1L, 1010L);
        this.sender.run(this.time.milliseconds());
        Assert.assertEquals(-1L, transactionManager.lastAckedSequence(this.tp0));
        Assert.assertEquals(2L, transactionManager.sequenceNumber(this.tp0).longValue());
        Assert.assertFalse(futureRecordMetadata2.isDone());
        Assert.assertFalse(this.client.hasInFlightRequests());
        this.sender.run(this.time.milliseconds());
        sendIdempotentProducerResponse(0, this.tp0, Errors.NONE, 1011L, 1010L);
        this.sender.run(this.time.milliseconds());
        Assert.assertEquals(1L, transactionManager.lastAckedSequence(this.tp0));
        Assert.assertEquals(2L, transactionManager.sequenceNumber(this.tp0).longValue());
        Assert.assertFalse(this.client.hasInFlightRequests());
        Assert.assertTrue(futureRecordMetadata2.isDone());
        Assert.assertEquals(1012L, ((RecordMetadata) futureRecordMetadata2.get()).offset());
        Assert.assertEquals(1012L, transactionManager.lastAckedOffset(this.tp0));
    }

    @Test
    public void testUnknownProducerErrorShouldBeRetriedWhenLogStartOffsetIsUnknown() throws Exception {
        TransactionManager transactionManager = new TransactionManager();
        setupWithTransactionState(transactionManager);
        prepareAndReceiveInitProducerId(343434L, Errors.NONE);
        Assert.assertTrue(transactionManager.hasProducerId());
        Assert.assertEquals(0L, transactionManager.sequenceNumber(this.tp0).longValue());
        FutureRecordMetadata futureRecordMetadata = this.accumulator.append(this.tp0, this.time.milliseconds(), "key".getBytes(), "value".getBytes(), (Header[]) null, (Callback) null, 1000L).future;
        this.sender.run(this.time.milliseconds());
        Assert.assertEquals(1L, this.client.inFlightRequestCount());
        Assert.assertEquals(1L, transactionManager.sequenceNumber(this.tp0).longValue());
        Assert.assertEquals(-1L, transactionManager.lastAckedSequence(this.tp0));
        sendIdempotentProducerResponse(0, this.tp0, Errors.NONE, 1000L, 10L);
        this.sender.run(this.time.milliseconds());
        Assert.assertTrue(futureRecordMetadata.isDone());
        Assert.assertEquals(1000L, ((RecordMetadata) futureRecordMetadata.get()).offset());
        Assert.assertEquals(0L, transactionManager.lastAckedSequence(this.tp0));
        Assert.assertEquals(1000L, transactionManager.lastAckedOffset(this.tp0));
        FutureRecordMetadata futureRecordMetadata2 = this.accumulator.append(this.tp0, this.time.milliseconds(), "key".getBytes(), "value".getBytes(), (Header[]) null, (Callback) null, 1000L).future;
        this.sender.run(this.time.milliseconds());
        Assert.assertEquals(2L, transactionManager.sequenceNumber(this.tp0).longValue());
        Assert.assertEquals(0L, transactionManager.lastAckedSequence(this.tp0));
        Assert.assertFalse(futureRecordMetadata2.isDone());
        sendIdempotentProducerResponse(1, this.tp0, Errors.UNKNOWN_PRODUCER_ID, -1L, -1L);
        this.sender.run(this.time.milliseconds());
        Assert.assertEquals(0L, transactionManager.lastAckedSequence(this.tp0));
        Assert.assertEquals(2L, transactionManager.sequenceNumber(this.tp0).longValue());
        Assert.assertFalse(futureRecordMetadata2.isDone());
        Assert.assertFalse(this.client.hasInFlightRequests());
        this.sender.run(this.time.milliseconds());
        sendIdempotentProducerResponse(1, this.tp0, Errors.NONE, 1011L, 1010L);
        this.sender.run(this.time.milliseconds());
        Assert.assertEquals(1L, transactionManager.lastAckedSequence(this.tp0));
        Assert.assertEquals(2L, transactionManager.sequenceNumber(this.tp0).longValue());
        Assert.assertFalse(this.client.hasInFlightRequests());
        Assert.assertTrue(futureRecordMetadata2.isDone());
        Assert.assertEquals(1011L, ((RecordMetadata) futureRecordMetadata2.get()).offset());
        Assert.assertEquals(1011L, transactionManager.lastAckedOffset(this.tp0));
    }

    @Test
    public void testUnknownProducerErrorShouldBeRetriedForFutureBatchesWhenFirstFails() throws Exception {
        TransactionManager transactionManager = new TransactionManager();
        setupWithTransactionState(transactionManager);
        prepareAndReceiveInitProducerId(343434L, Errors.NONE);
        Assert.assertTrue(transactionManager.hasProducerId());
        Assert.assertEquals(0L, transactionManager.sequenceNumber(this.tp0).longValue());
        FutureRecordMetadata futureRecordMetadata = this.accumulator.append(this.tp0, this.time.milliseconds(), "key".getBytes(), "value".getBytes(), (Header[]) null, (Callback) null, 1000L).future;
        this.sender.run(this.time.milliseconds());
        Assert.assertEquals(1L, this.client.inFlightRequestCount());
        Assert.assertEquals(1L, transactionManager.sequenceNumber(this.tp0).longValue());
        Assert.assertEquals(-1L, transactionManager.lastAckedSequence(this.tp0));
        sendIdempotentProducerResponse(0, this.tp0, Errors.NONE, 1000L, 10L);
        this.sender.run(this.time.milliseconds());
        Assert.assertTrue(futureRecordMetadata.isDone());
        Assert.assertEquals(1000L, ((RecordMetadata) futureRecordMetadata.get()).offset());
        Assert.assertEquals(0L, transactionManager.lastAckedSequence(this.tp0));
        Assert.assertEquals(1000L, transactionManager.lastAckedOffset(this.tp0));
        FutureRecordMetadata futureRecordMetadata2 = this.accumulator.append(this.tp0, this.time.milliseconds(), "key".getBytes(), "value".getBytes(), (Header[]) null, (Callback) null, 1000L).future;
        this.sender.run(this.time.milliseconds());
        Assert.assertEquals(2L, transactionManager.sequenceNumber(this.tp0).longValue());
        Assert.assertEquals(0L, transactionManager.lastAckedSequence(this.tp0));
        FutureRecordMetadata futureRecordMetadata3 = this.accumulator.append(this.tp0, this.time.milliseconds(), "key".getBytes(), "value".getBytes(), (Header[]) null, (Callback) null, 1000L).future;
        this.sender.run(this.time.milliseconds());
        Assert.assertEquals(3L, transactionManager.sequenceNumber(this.tp0).longValue());
        Assert.assertEquals(0L, transactionManager.lastAckedSequence(this.tp0));
        Assert.assertFalse(futureRecordMetadata2.isDone());
        Assert.assertFalse(futureRecordMetadata3.isDone());
        Assert.assertEquals(2L, this.client.inFlightRequestCount());
        sendIdempotentProducerResponse(1, this.tp0, Errors.UNKNOWN_PRODUCER_ID, -1L, 1010L);
        this.sender.run(this.time.milliseconds());
        Assert.assertEquals(-1L, transactionManager.lastAckedSequence(this.tp0));
        Assert.assertEquals(2L, transactionManager.sequenceNumber(this.tp0).longValue());
        Assert.assertFalse(futureRecordMetadata2.isDone());
        Assert.assertFalse(futureRecordMetadata3.isDone());
        Assert.assertEquals(1L, this.client.inFlightRequestCount());
        this.sender.run(this.time.milliseconds());
        Assert.assertEquals(2L, this.client.inFlightRequestCount());
        sendIdempotentProducerResponse(2, this.tp0, Errors.UNKNOWN_PRODUCER_ID, -1L, 1010L);
        this.sender.run(this.time.milliseconds());
        Assert.assertEquals(1L, this.client.inFlightRequestCount());
        Assert.assertEquals(-1L, transactionManager.lastAckedSequence(this.tp0));
        Assert.assertEquals(2L, transactionManager.sequenceNumber(this.tp0).longValue());
        sendIdempotentProducerResponse(0, this.tp0, Errors.NONE, 1011L, 1010L);
        this.sender.run(this.time.milliseconds());
        Assert.assertTrue(futureRecordMetadata2.isDone());
        Assert.assertFalse(futureRecordMetadata3.isDone());
        Assert.assertFalse(this.client.hasInFlightRequests());
        Assert.assertEquals(0L, transactionManager.lastAckedSequence(this.tp0));
        Assert.assertEquals(1011L, ((RecordMetadata) futureRecordMetadata2.get()).offset());
        Assert.assertEquals(1011L, transactionManager.lastAckedOffset(this.tp0));
        this.sender.run(this.time.milliseconds());
        Assert.assertEquals(1L, this.client.inFlightRequestCount());
        sendIdempotentProducerResponse(1, this.tp0, Errors.NONE, 1012L, 1010L);
        this.sender.run(this.time.milliseconds());
        Assert.assertFalse(this.client.hasInFlightRequests());
        Assert.assertTrue(futureRecordMetadata3.isDone());
        Assert.assertEquals(1012L, ((RecordMetadata) futureRecordMetadata3.get()).offset());
        Assert.assertEquals(1012L, transactionManager.lastAckedOffset(this.tp0));
    }

    @Test
    public void testShouldRaiseOutOfOrderSequenceExceptionToUserIfLogWasNotTruncated() throws Exception {
        TransactionManager transactionManager = new TransactionManager();
        setupWithTransactionState(transactionManager);
        prepareAndReceiveInitProducerId(343434L, Errors.NONE);
        Assert.assertTrue(transactionManager.hasProducerId());
        Assert.assertEquals(0L, transactionManager.sequenceNumber(this.tp0).longValue());
        FutureRecordMetadata futureRecordMetadata = this.accumulator.append(this.tp0, this.time.milliseconds(), "key".getBytes(), "value".getBytes(), (Header[]) null, (Callback) null, 1000L).future;
        this.sender.run(this.time.milliseconds());
        Assert.assertEquals(1L, this.client.inFlightRequestCount());
        Assert.assertEquals(1L, transactionManager.sequenceNumber(this.tp0).longValue());
        Assert.assertEquals(-1L, transactionManager.lastAckedSequence(this.tp0));
        sendIdempotentProducerResponse(0, this.tp0, Errors.NONE, 1000L, 10L);
        this.sender.run(this.time.milliseconds());
        Assert.assertTrue(futureRecordMetadata.isDone());
        Assert.assertEquals(1000L, ((RecordMetadata) futureRecordMetadata.get()).offset());
        Assert.assertEquals(0L, transactionManager.lastAckedSequence(this.tp0));
        Assert.assertEquals(1000L, transactionManager.lastAckedOffset(this.tp0));
        FutureRecordMetadata futureRecordMetadata2 = this.accumulator.append(this.tp0, this.time.milliseconds(), "key".getBytes(), "value".getBytes(), (Header[]) null, (Callback) null, 1000L).future;
        this.sender.run(this.time.milliseconds());
        Assert.assertEquals(2L, transactionManager.sequenceNumber(this.tp0).longValue());
        Assert.assertEquals(0L, transactionManager.lastAckedSequence(this.tp0));
        Assert.assertFalse(futureRecordMetadata2.isDone());
        sendIdempotentProducerResponse(1, this.tp0, Errors.UNKNOWN_PRODUCER_ID, -1L, 10L);
        this.sender.run(this.time.milliseconds());
        assertFutureFailure(futureRecordMetadata2, OutOfOrderSequenceException.class);
    }

    void sendIdempotentProducerResponse(int i, TopicPartition topicPartition, Errors errors, long j) {
        sendIdempotentProducerResponse(i, topicPartition, errors, j, -1L);
    }

    void sendIdempotentProducerResponse(final int i, TopicPartition topicPartition, Errors errors, long j, long j2) {
        this.client.respond(new MockClient.RequestMatcher() { // from class: org.apache.kafka.clients.producer.internals.SenderTest.5
            @Override // org.apache.kafka.clients.MockClient.RequestMatcher
            public boolean matches(AbstractRequest abstractRequest) {
                ProduceRequest produceRequest = (ProduceRequest) abstractRequest;
                Assert.assertTrue(produceRequest.isIdempotent());
                Iterator it = ((MemoryRecords) produceRequest.partitionRecordsOrFail().get(SenderTest.this.tp0)).batches().iterator();
                RecordBatch recordBatch = (RecordBatch) it.next();
                Assert.assertFalse(it.hasNext());
                Assert.assertEquals(i, recordBatch.baseSequence());
                return true;
            }
        }, (AbstractResponse) produceResponse(topicPartition, j, errors, 0, j2));
    }

    @Test
    public void testClusterAuthorizationExceptionInProduceRequest() throws Exception {
        TransactionManager transactionManager = new TransactionManager();
        setupWithTransactionState(transactionManager);
        this.client.setNode(new Node(1, "localhost", 33343));
        prepareAndReceiveInitProducerId(343434L, Errors.NONE);
        Assert.assertTrue(transactionManager.hasProducerId());
        FutureRecordMetadata futureRecordMetadata = this.accumulator.append(this.tp0, this.time.milliseconds(), "key".getBytes(), "value".getBytes(), (Header[]) null, (Callback) null, 1000L).future;
        this.client.prepareResponse(new MockClient.RequestMatcher() { // from class: org.apache.kafka.clients.producer.internals.SenderTest.6
            @Override // org.apache.kafka.clients.MockClient.RequestMatcher
            public boolean matches(AbstractRequest abstractRequest) {
                return (abstractRequest instanceof ProduceRequest) && ((ProduceRequest) abstractRequest).isIdempotent();
            }
        }, (AbstractResponse) produceResponse(this.tp0, -1L, Errors.CLUSTER_AUTHORIZATION_FAILED, 0));
        this.sender.run(this.time.milliseconds());
        assertFutureFailure(futureRecordMetadata, ClusterAuthorizationException.class);
        Assert.assertTrue(transactionManager.hasFatalError());
        assertSendFailure(ClusterAuthorizationException.class);
    }

    @Test
    public void testCancelInFlightRequestAfterFatalError() throws Exception {
        TransactionManager transactionManager = new TransactionManager();
        setupWithTransactionState(transactionManager);
        this.client.setNode(new Node(1, "localhost", 33343));
        prepareAndReceiveInitProducerId(343434L, Errors.NONE);
        Assert.assertTrue(transactionManager.hasProducerId());
        FutureRecordMetadata futureRecordMetadata = this.accumulator.append(this.tp0, this.time.milliseconds(), "key".getBytes(), "value".getBytes(), (Header[]) null, (Callback) null, 1000L).future;
        this.sender.run(this.time.milliseconds());
        FutureRecordMetadata futureRecordMetadata2 = this.accumulator.append(this.tp1, this.time.milliseconds(), "key".getBytes(), "value".getBytes(), (Header[]) null, (Callback) null, 1000L).future;
        this.sender.run(this.time.milliseconds());
        this.client.respond(new MockClient.RequestMatcher() { // from class: org.apache.kafka.clients.producer.internals.SenderTest.7
            @Override // org.apache.kafka.clients.MockClient.RequestMatcher
            public boolean matches(AbstractRequest abstractRequest) {
                return (abstractRequest instanceof ProduceRequest) && ((ProduceRequest) abstractRequest).isIdempotent();
            }
        }, (AbstractResponse) produceResponse(this.tp0, -1L, Errors.CLUSTER_AUTHORIZATION_FAILED, 0));
        this.sender.run(this.time.milliseconds());
        Assert.assertTrue(transactionManager.hasFatalError());
        assertFutureFailure(futureRecordMetadata, ClusterAuthorizationException.class);
        this.sender.run(this.time.milliseconds());
        assertFutureFailure(futureRecordMetadata2, ClusterAuthorizationException.class);
        this.client.respond(new MockClient.RequestMatcher() { // from class: org.apache.kafka.clients.producer.internals.SenderTest.8
            @Override // org.apache.kafka.clients.MockClient.RequestMatcher
            public boolean matches(AbstractRequest abstractRequest) {
                return (abstractRequest instanceof ProduceRequest) && ((ProduceRequest) abstractRequest).isIdempotent();
            }
        }, (AbstractResponse) produceResponse(this.tp1, 0L, Errors.NONE, 0));
        this.sender.run(this.time.milliseconds());
    }

    @Test
    public void testUnsupportedForMessageFormatInProduceRequest() throws Exception {
        TransactionManager transactionManager = new TransactionManager();
        setupWithTransactionState(transactionManager);
        this.client.setNode(new Node(1, "localhost", 33343));
        prepareAndReceiveInitProducerId(343434L, Errors.NONE);
        Assert.assertTrue(transactionManager.hasProducerId());
        FutureRecordMetadata futureRecordMetadata = this.accumulator.append(this.tp0, this.time.milliseconds(), "key".getBytes(), "value".getBytes(), (Header[]) null, (Callback) null, 1000L).future;
        this.client.prepareResponse(new MockClient.RequestMatcher() { // from class: org.apache.kafka.clients.producer.internals.SenderTest.9
            @Override // org.apache.kafka.clients.MockClient.RequestMatcher
            public boolean matches(AbstractRequest abstractRequest) {
                return (abstractRequest instanceof ProduceRequest) && ((ProduceRequest) abstractRequest).isIdempotent();
            }
        }, (AbstractResponse) produceResponse(this.tp0, -1L, Errors.UNSUPPORTED_FOR_MESSAGE_FORMAT, 0));
        this.sender.run(this.time.milliseconds());
        assertFutureFailure(futureRecordMetadata, UnsupportedForMessageFormatException.class);
        Assert.assertFalse(transactionManager.hasError());
    }

    @Test
    public void testUnsupportedVersionInProduceRequest() throws Exception {
        TransactionManager transactionManager = new TransactionManager();
        setupWithTransactionState(transactionManager);
        this.client.setNode(new Node(1, "localhost", 33343));
        prepareAndReceiveInitProducerId(343434L, Errors.NONE);
        Assert.assertTrue(transactionManager.hasProducerId());
        FutureRecordMetadata futureRecordMetadata = this.accumulator.append(this.tp0, this.time.milliseconds(), "key".getBytes(), "value".getBytes(), (Header[]) null, (Callback) null, 1000L).future;
        this.client.prepareUnsupportedVersionResponse(new MockClient.RequestMatcher() { // from class: org.apache.kafka.clients.producer.internals.SenderTest.10
            @Override // org.apache.kafka.clients.MockClient.RequestMatcher
            public boolean matches(AbstractRequest abstractRequest) {
                return (abstractRequest instanceof ProduceRequest) && ((ProduceRequest) abstractRequest).isIdempotent();
            }
        });
        this.sender.run(this.time.milliseconds());
        assertFutureFailure(futureRecordMetadata, UnsupportedVersionException.class);
        Assert.assertTrue(transactionManager.hasFatalError());
        assertSendFailure(UnsupportedVersionException.class);
    }

    @Test
    public void testSequenceNumberIncrement() throws InterruptedException {
        TransactionManager transactionManager = new TransactionManager();
        transactionManager.setProducerIdAndEpoch(new ProducerIdAndEpoch(343434L, (short) 0));
        setupWithTransactionState(transactionManager);
        this.client.setNode(new Node(1, "localhost", 33343));
        Sender sender = new Sender(this.logContext, this.client, this.metadata, this.accumulator, true, MAX_REQUEST_SIZE, (short) -1, 10, new SenderMetricsRegistry(new Metrics()), this.time, 1000, 50L, transactionManager, this.apiVersions);
        FutureRecordMetadata futureRecordMetadata = this.accumulator.append(this.tp0, this.time.milliseconds(), "key".getBytes(), "value".getBytes(), (Header[]) null, (Callback) null, 1000L).future;
        this.client.prepareResponse(new MockClient.RequestMatcher() { // from class: org.apache.kafka.clients.producer.internals.SenderTest.11
            @Override // org.apache.kafka.clients.MockClient.RequestMatcher
            public boolean matches(AbstractRequest abstractRequest) {
                if (!(abstractRequest instanceof ProduceRequest)) {
                    return false;
                }
                Iterator it = ((MemoryRecords) ((ProduceRequest) abstractRequest).partitionRecordsOrFail().get(SenderTest.this.tp0)).batches().iterator();
                Assert.assertTrue(it.hasNext());
                RecordBatch recordBatch = (RecordBatch) it.next();
                Assert.assertFalse(it.hasNext());
                Assert.assertEquals(0L, recordBatch.baseSequence());
                Assert.assertEquals(343434L, recordBatch.producerId());
                Assert.assertEquals(0L, recordBatch.producerEpoch());
                return true;
            }
        }, (AbstractResponse) produceResponse(this.tp0, 0L, Errors.NONE, 0));
        sender.run(this.time.milliseconds());
        sender.run(this.time.milliseconds());
        sender.run(this.time.milliseconds());
        Assert.assertTrue(futureRecordMetadata.isDone());
        Assert.assertEquals(0L, transactionManager.lastAckedSequence(this.tp0));
        Assert.assertEquals(1L, transactionManager.sequenceNumber(this.tp0).intValue());
    }

    @Test
    public void testAbortRetryWhenProducerIdChanges() throws InterruptedException {
        TransactionManager transactionManager = new TransactionManager();
        transactionManager.setProducerIdAndEpoch(new ProducerIdAndEpoch(343434L, (short) 0));
        setupWithTransactionState(transactionManager);
        this.client.setNode(new Node(1, "localhost", 33343));
        Metrics metrics = new Metrics();
        SenderMetricsRegistry senderMetricsRegistry = new SenderMetricsRegistry(metrics);
        Sender sender = new Sender(this.logContext, this.client, this.metadata, this.accumulator, true, MAX_REQUEST_SIZE, (short) -1, 10, senderMetricsRegistry, this.time, 1000, 50L, transactionManager, this.apiVersions);
        FutureRecordMetadata futureRecordMetadata = this.accumulator.append(this.tp0, this.time.milliseconds(), "key".getBytes(), "value".getBytes(), (Header[]) null, (Callback) null, 1000L).future;
        sender.run(this.time.milliseconds());
        sender.run(this.time.milliseconds());
        String destination = this.client.requests().peek().destination();
        Node node = new Node(Integer.valueOf(destination).intValue(), "localhost", 0);
        Assert.assertEquals(1L, this.client.inFlightRequestCount());
        Assert.assertTrue("Client ready status should be true", this.client.isReady(node, 0L));
        this.client.disconnect(destination);
        Assert.assertEquals(0L, this.client.inFlightRequestCount());
        Assert.assertFalse("Client ready status should be false", this.client.isReady(node, 0L));
        transactionManager.resetProducerId();
        transactionManager.setProducerIdAndEpoch(new ProducerIdAndEpoch(343435L, (short) 0));
        sender.run(this.time.milliseconds());
        sender.run(this.time.milliseconds());
        sender.run(this.time.milliseconds());
        Assert.assertEquals("Expected requests to be aborted after pid change", 0L, this.client.inFlightRequestCount());
        Assert.assertTrue("Expected non-zero value for record send errors", ((KafkaMetric) metrics.metrics().get(senderMetricsRegistry.recordErrorRate)).value() > 0.0d);
        Assert.assertTrue(futureRecordMetadata.isDone());
        Assert.assertEquals(0L, transactionManager.sequenceNumber(this.tp0).intValue());
    }

    @Test
    public void testResetWhenOutOfOrderSequenceReceived() throws InterruptedException {
        TransactionManager transactionManager = new TransactionManager();
        transactionManager.setProducerIdAndEpoch(new ProducerIdAndEpoch(343434L, (short) 0));
        setupWithTransactionState(transactionManager);
        this.client.setNode(new Node(1, "localhost", 33343));
        Sender sender = new Sender(this.logContext, this.client, this.metadata, this.accumulator, true, MAX_REQUEST_SIZE, (short) -1, 10, new SenderMetricsRegistry(new Metrics()), this.time, 1000, 50L, transactionManager, this.apiVersions);
        FutureRecordMetadata futureRecordMetadata = this.accumulator.append(this.tp0, this.time.milliseconds(), "key".getBytes(), "value".getBytes(), (Header[]) null, (Callback) null, 1000L).future;
        sender.run(this.time.milliseconds());
        sender.run(this.time.milliseconds());
        Assert.assertEquals(1L, this.client.inFlightRequestCount());
        this.client.respond(produceResponse(this.tp0, 0L, Errors.OUT_OF_ORDER_SEQUENCE_NUMBER, 0));
        sender.run(this.time.milliseconds());
        Assert.assertTrue(futureRecordMetadata.isDone());
        Assert.assertFalse("Expected transaction state to be reset upon receiving an OutOfOrderSequenceException", transactionManager.hasProducerId());
    }

    @Test
    public void testIdempotentSplitBatchAndSend() throws Exception {
        TopicPartition topicPartition = new TopicPartition("testSplitBatchAndSend", 1);
        TransactionManager transactionManager = new TransactionManager();
        ProducerIdAndEpoch producerIdAndEpoch = new ProducerIdAndEpoch(123456L, (short) 0);
        transactionManager.setProducerIdAndEpoch(producerIdAndEpoch);
        testSplitBatchAndSend(transactionManager, producerIdAndEpoch, topicPartition);
    }

    @Test
    public void testTransactionalSplitBatchAndSend() throws Exception {
        ProducerIdAndEpoch producerIdAndEpoch = new ProducerIdAndEpoch(123456L, (short) 0);
        TopicPartition topicPartition = new TopicPartition("testSplitBatchAndSend", 1);
        TransactionManager transactionManager = new TransactionManager(this.logContext, "testSplitBatchAndSend", 60000, 100L);
        setupWithTransactionState(transactionManager);
        doInitTransactions(transactionManager, producerIdAndEpoch);
        transactionManager.beginTransaction();
        transactionManager.maybeAddPartitionToTransaction(topicPartition);
        this.client.prepareResponse(new AddPartitionsToTxnResponse(0, Collections.singletonMap(topicPartition, Errors.NONE)));
        this.sender.run(this.time.milliseconds());
        testSplitBatchAndSend(transactionManager, producerIdAndEpoch, topicPartition);
    }

    private void testSplitBatchAndSend(TransactionManager transactionManager, ProducerIdAndEpoch producerIdAndEpoch, TopicPartition topicPartition) throws Exception {
        String str = topicPartition.topic();
        CompressionRatioEstimator.setEstimation(str, CompressionType.GZIP, 0.2f);
        Metrics metrics = new Metrics();
        try {
            this.accumulator = new RecordAccumulator(this.logContext, this.batchSize, 1048576L, CompressionType.GZIP, 0L, 0L, metrics, this.time, new ApiVersions(), transactionManager);
            SenderMetricsRegistry senderMetricsRegistry = new SenderMetricsRegistry(metrics);
            Sender sender = new Sender(this.logContext, this.client, this.metadata, this.accumulator, true, MAX_REQUEST_SIZE, (short) -1, 1, senderMetricsRegistry, this.time, 1000, 1000L, transactionManager, new ApiVersions());
            this.metadata.update(TestUtils.clusterWith(2, str, 2), Collections.emptySet(), this.time.milliseconds());
            FutureRecordMetadata futureRecordMetadata = this.accumulator.append(topicPartition, 0L, "key1".getBytes(), new byte[this.batchSize / 2], (Header[]) null, (Callback) null, 1000L).future;
            FutureRecordMetadata futureRecordMetadata2 = this.accumulator.append(topicPartition, 0L, "key2".getBytes(), new byte[this.batchSize / 2], (Header[]) null, (Callback) null, 1000L).future;
            sender.run(this.time.milliseconds());
            sender.run(this.time.milliseconds());
            Assert.assertEquals("The next sequence should be 2", 2L, transactionManager.sequenceNumber(topicPartition).longValue());
            String destination = this.client.requests().peek().destination();
            Assert.assertEquals(ApiKeys.PRODUCE, this.client.requests().peek().requestBuilder().apiKey());
            Node node = new Node(Integer.valueOf(destination).intValue(), "localhost", 0);
            Assert.assertEquals(1L, this.client.inFlightRequestCount());
            Assert.assertTrue("Client ready status should be true", this.client.isReady(node, 0L));
            HashMap hashMap = new HashMap();
            hashMap.put(topicPartition, new ProduceResponse.PartitionResponse(Errors.MESSAGE_TOO_LARGE));
            this.client.respond(new ProduceResponse(hashMap));
            sender.run(this.time.milliseconds());
            Assert.assertEquals("The next sequence should be 2", 2L, transactionManager.sequenceNumber(topicPartition).longValue());
            Assert.assertEquals(CompressionType.GZIP.rate - 0.005f, CompressionRatioEstimator.estimation(str, CompressionType.GZIP), 0.01d);
            sender.run(this.time.milliseconds());
            Assert.assertEquals("The next sequence number should be 2", 2L, transactionManager.sequenceNumber(topicPartition).longValue());
            Assert.assertFalse("The future shouldn't have been done.", futureRecordMetadata.isDone());
            Assert.assertFalse("The future shouldn't have been done.", futureRecordMetadata2.isDone());
            String destination2 = this.client.requests().peek().destination();
            Assert.assertEquals(ApiKeys.PRODUCE, this.client.requests().peek().requestBuilder().apiKey());
            Node node2 = new Node(Integer.valueOf(destination2).intValue(), "localhost", 0);
            Assert.assertEquals(1L, this.client.inFlightRequestCount());
            Assert.assertTrue("Client ready status should be true", this.client.isReady(node2, 0L));
            hashMap.put(topicPartition, new ProduceResponse.PartitionResponse(Errors.NONE, 0L, 0L, 0L));
            this.client.respond(produceRequestMatcher(topicPartition, producerIdAndEpoch, 0, transactionManager.isTransactional()), (AbstractResponse) new ProduceResponse(hashMap));
            sender.run(this.time.milliseconds());
            Assert.assertTrue("The future should have been done.", futureRecordMetadata.isDone());
            Assert.assertEquals("The next sequence number should still be 2", 2L, transactionManager.sequenceNumber(topicPartition).longValue());
            Assert.assertEquals("The last ack'd sequence number should be 0", 0L, transactionManager.lastAckedSequence(topicPartition));
            Assert.assertFalse("The future shouldn't have been done.", futureRecordMetadata2.isDone());
            Assert.assertEquals("Offset of the first message should be 0", 0L, ((RecordMetadata) futureRecordMetadata.get()).offset());
            sender.run(this.time.milliseconds());
            String destination3 = this.client.requests().peek().destination();
            Assert.assertEquals(ApiKeys.PRODUCE, this.client.requests().peek().requestBuilder().apiKey());
            Node node3 = new Node(Integer.valueOf(destination3).intValue(), "localhost", 0);
            Assert.assertEquals(1L, this.client.inFlightRequestCount());
            Assert.assertTrue("Client ready status should be true", this.client.isReady(node3, 0L));
            hashMap.put(topicPartition, new ProduceResponse.PartitionResponse(Errors.NONE, 1L, 0L, 0L));
            this.client.respond(produceRequestMatcher(topicPartition, producerIdAndEpoch, 1, transactionManager.isTransactional()), (AbstractResponse) new ProduceResponse(hashMap));
            sender.run(this.time.milliseconds());
            Assert.assertTrue("The future should have been done.", futureRecordMetadata2.isDone());
            Assert.assertEquals("The next sequence number should be 2", 2L, transactionManager.sequenceNumber(topicPartition).longValue());
            Assert.assertEquals("The last ack'd sequence number should be 1", 1L, transactionManager.lastAckedSequence(topicPartition));
            Assert.assertEquals("Offset of the first message should be 1", 1L, ((RecordMetadata) futureRecordMetadata2.get()).offset());
            Assert.assertTrue("There should be no batch in the accumulator", ((Deque) this.accumulator.batches().get(topicPartition)).isEmpty());
            Assert.assertTrue("There should be a split", ((KafkaMetric) metrics.metrics().get(senderMetricsRegistry.batchSplitRate)).value() > 0.0d);
            metrics.close();
        } catch (Throwable th) {
            try {
                metrics.close();
            } catch (Throwable th2) {
                th.addSuppressed(th2);
            }
            throw th;
        }
    }

    private MockClient.RequestMatcher produceRequestMatcher(final TopicPartition topicPartition, final ProducerIdAndEpoch producerIdAndEpoch, final int i, final boolean z) {
        return new MockClient.RequestMatcher() { // from class: org.apache.kafka.clients.producer.internals.SenderTest.12
            @Override // org.apache.kafka.clients.MockClient.RequestMatcher
            public boolean matches(AbstractRequest abstractRequest) {
                MemoryRecords memoryRecords;
                if (!(abstractRequest instanceof ProduceRequest) || (memoryRecords = (MemoryRecords) ((ProduceRequest) abstractRequest).partitionRecordsOrFail().get(topicPartition)) == null) {
                    return false;
                }
                List list = TestUtils.toList(memoryRecords.batches());
                if (list.isEmpty() || list.size() > 1) {
                    return false;
                }
                MutableRecordBatch mutableRecordBatch = (MutableRecordBatch) list.get(0);
                return mutableRecordBatch.baseOffset() == 0 && mutableRecordBatch.baseSequence() == i && mutableRecordBatch.producerId() == producerIdAndEpoch.producerId && mutableRecordBatch.producerEpoch() == producerIdAndEpoch.epoch && mutableRecordBatch.isTransactional() == z;
            }
        };
    }

    private ProduceResponse produceResponse(TopicPartition topicPartition, long j, Errors errors, int i, long j2) {
        return new ProduceResponse(Collections.singletonMap(topicPartition, new ProduceResponse.PartitionResponse(errors, j, -1L, j2)), i);
    }

    private ProduceResponse produceResponse(Map<TopicPartition, OffsetAndError> map) {
        LinkedHashMap linkedHashMap = new LinkedHashMap();
        for (Map.Entry<TopicPartition, OffsetAndError> entry : map.entrySet()) {
            linkedHashMap.put(entry.getKey(), new ProduceResponse.PartitionResponse(entry.getValue().error, entry.getValue().offset, -1L, -1L));
        }
        return new ProduceResponse(linkedHashMap);
    }

    private ProduceResponse produceResponse(TopicPartition topicPartition, long j, Errors errors, int i) {
        return produceResponse(topicPartition, j, errors, i, -1L);
    }

    private void setupWithTransactionState(TransactionManager transactionManager) {
        LinkedHashMap linkedHashMap = new LinkedHashMap();
        linkedHashMap.put("client-id", CLIENT_ID);
        this.metrics = new Metrics(new MetricConfig().tags(linkedHashMap), this.time);
        this.accumulator = new RecordAccumulator(this.logContext, this.batchSize, 1048576L, CompressionType.NONE, 0L, 0L, this.metrics, this.time, this.apiVersions, transactionManager);
        this.senderMetricsRegistry = new SenderMetricsRegistry(this.metrics);
        this.sender = new Sender(this.logContext, this.client, this.metadata, this.accumulator, false, MAX_REQUEST_SIZE, (short) -1, Integer.MAX_VALUE, this.senderMetricsRegistry, this.time, 1000, 50L, transactionManager, this.apiVersions);
        this.metadata.update(this.cluster, Collections.emptySet(), this.time.milliseconds());
    }

    private void assertSendFailure(Class<? extends RuntimeException> cls) throws Exception {
        FutureRecordMetadata futureRecordMetadata = this.accumulator.append(this.tp0, this.time.milliseconds(), "key".getBytes(), "value".getBytes(), (Header[]) null, (Callback) null, 1000L).future;
        this.sender.run(this.time.milliseconds());
        Assert.assertTrue(futureRecordMetadata.isDone());
        try {
            futureRecordMetadata.get();
            Assert.fail("Future should have raised " + cls.getSimpleName());
        } catch (ExecutionException e) {
            Assert.assertTrue(cls.isAssignableFrom(e.getCause().getClass()));
        }
    }

    private void prepareAndReceiveInitProducerId(long j, Errors errors) {
        short s = 0;
        if (errors != Errors.NONE) {
            s = ACKS_ALL;
        }
        this.client.prepareResponse(new MockClient.RequestMatcher() { // from class: org.apache.kafka.clients.producer.internals.SenderTest.13
            @Override // org.apache.kafka.clients.MockClient.RequestMatcher
            public boolean matches(AbstractRequest abstractRequest) {
                return (abstractRequest instanceof InitProducerIdRequest) && ((InitProducerIdRequest) abstractRequest).transactionalId() == null;
            }
        }, (AbstractResponse) new InitProducerIdResponse(0, errors, j, s));
        this.sender.run(this.time.milliseconds());
    }

    private void doInitTransactions(TransactionManager transactionManager, ProducerIdAndEpoch producerIdAndEpoch) {
        transactionManager.initializeTransactions();
        prepareFindCoordinatorResponse(Errors.NONE);
        this.sender.run(this.time.milliseconds());
        this.sender.run(this.time.milliseconds());
        prepareInitPidResponse(Errors.NONE, producerIdAndEpoch.producerId, producerIdAndEpoch.epoch);
        this.sender.run(this.time.milliseconds());
        Assert.assertTrue(transactionManager.hasProducerId());
    }

    private void prepareFindCoordinatorResponse(Errors errors) {
        this.client.prepareResponse(new FindCoordinatorResponse(errors, (Node) this.cluster.nodes().get(0)));
    }

    private void prepareInitPidResponse(Errors errors, long j, short s) {
        this.client.prepareResponse(new InitProducerIdResponse(0, errors, j, s));
    }

    private void assertFutureFailure(Future<?> future, Class<? extends Exception> cls) throws InterruptedException {
        Assert.assertTrue(future.isDone());
        try {
            future.get();
            Assert.fail("Future should have raised " + cls.getName());
        } catch (ExecutionException e) {
            Class<?> cls2 = e.getCause().getClass();
            Assert.assertTrue("Unexpected cause " + cls2.getName(), cls.isAssignableFrom(cls2));
        }
    }
}
