package org.apache.kafka.clients.producer.internals;

import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import org.apache.kafka.clients.ApiVersions;
import org.apache.kafka.clients.Metadata;
import org.apache.kafka.clients.MockClient;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.clients.producer.internals.TransactionManager;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.GroupAuthorizationException;
import org.apache.kafka.common.errors.OutOfOrderSequenceException;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.errors.TopicAuthorizationException;
import org.apache.kafka.common.errors.TransactionalIdAuthorizationException;
import org.apache.kafka.common.errors.UnsupportedForMessageFormatException;
import org.apache.kafka.common.errors.UnsupportedVersionException;
import org.apache.kafka.common.internals.ClusterResourceListeners;
import org.apache.kafka.common.metrics.MetricConfig;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.record.CompressionType;
import org.apache.kafka.common.record.MemoryRecords;
import org.apache.kafka.common.record.MutableRecordBatch;
import org.apache.kafka.common.record.Record;
import org.apache.kafka.common.requests.AbstractRequest;
import org.apache.kafka.common.requests.AbstractResponse;
import org.apache.kafka.common.requests.AddOffsetsToTxnRequest;
import org.apache.kafka.common.requests.AddOffsetsToTxnResponse;
import org.apache.kafka.common.requests.AddPartitionsToTxnRequest;
import org.apache.kafka.common.requests.AddPartitionsToTxnResponse;
import org.apache.kafka.common.requests.EndTxnRequest;
import org.apache.kafka.common.requests.EndTxnResponse;
import org.apache.kafka.common.requests.FindCoordinatorRequest;
import org.apache.kafka.common.requests.FindCoordinatorResponse;
import org.apache.kafka.common.requests.InitProducerIdRequest;
import org.apache.kafka.common.requests.InitProducerIdResponse;
import org.apache.kafka.common.requests.ProduceRequest;
import org.apache.kafka.common.requests.ProduceResponse;
import org.apache.kafka.common.requests.TransactionResult;
import org.apache.kafka.common.requests.TxnOffsetCommitRequest;
import org.apache.kafka.common.requests.TxnOffsetCommitResponse;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.test.TestUtils;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

/* loaded from: input_file:org/apache/kafka/clients/producer/internals/TransactionManagerTest.class */
public class TransactionManagerTest {
    // Maximum request size passed to the Sender built in setup().
    private static final int MAX_REQUEST_SIZE = 1048576;
    // NOTE(review): -1 is presumably the "all replicas" acks level expected by Sender — confirm.
    private static final short ACKS_ALL = -1;
    // Retry limit passed to the Sender built in setup().
    private static final int MAX_RETRIES = Integer.MAX_VALUE;
    // Value registered under the "client-id" metric tag in setup().
    private static final String CLIENT_ID = "clientId";
    private static final int MAX_BLOCK_TIMEOUT = 1000;
    private static final int REQUEST_TIMEOUT = 1000;
    // Retry backoff handed to the TransactionManager under test; several tests assert against it.
    private static final long DEFAULT_RETRY_BACKOFF_MS = 100;
    // Identity and timeout of the transactional producer exercised by the tests.
    private final String transactionalId = "foobar";
    private final int transactionTimeoutMs = 1121;
    // Topic name shared by tp0, tp1 and the singleton cluster below.
    private final String topic = "test";
    private TopicPartition tp0 = new TopicPartition("test", 0);
    private TopicPartition tp1 = new TopicPartition("test", 1);
    // Mock clock and network client: tests drive time and broker responses explicitly.
    private MockTime time = new MockTime();
    private MockClient client = new MockClient(this.time);
    private Metadata metadata = new Metadata(0, Long.MAX_VALUE, true, true, new ClusterResourceListeners());
    private ApiVersions apiVersions = new ApiVersions();
    private Cluster cluster = TestUtils.singletonCluster("test", 2);
    // The following four fields are (re)assigned in setup() before every test.
    private RecordAccumulator accumulator = null;
    private Sender sender = null;
    private TransactionManager transactionManager = null;
    private Node brokerNode = null;
    private final LogContext logContext = new LogContext();

    /**
     * Builds a fresh fixture before each test: a transactional
     * {@link TransactionManager}, a {@link RecordAccumulator} and a {@link Sender}
     * wired to the mock client, then seeds the metadata with the test cluster and
     * registers the single broker node with the mock client.
     */
    @Before
    public void setup() {
        // Typed map rather than a raw LinkedHashMap so the metric tags are checked at compile time.
        Map<String, String> metricTags = new LinkedHashMap<>();
        metricTags.put("client-id", CLIENT_ID);
        MetricConfig metricConfig = new MetricConfig().tags(metricTags);
        int batchSize = 16 * 1024;

        this.brokerNode = new Node(0, "localhost", 2211);
        // Use the class-level identity/timeout fields instead of repeating their literal values.
        this.transactionManager = new TransactionManager(this.logContext, this.transactionalId,
                this.transactionTimeoutMs, DEFAULT_RETRY_BACKOFF_MS);

        Metrics metrics = new Metrics(metricConfig, this.time);
        SenderMetricsRegistry senderMetrics = new SenderMetricsRegistry(metrics);

        this.accumulator = new RecordAccumulator(this.logContext, batchSize, 1048576L, CompressionType.NONE,
                0L, 0L, metrics, this.time, this.apiVersions, this.transactionManager);
        this.sender = new Sender(this.logContext, this.client, this.metadata, this.accumulator, true,
                MAX_REQUEST_SIZE, ACKS_ALL, MAX_RETRIES, senderMetrics, this.time, REQUEST_TIMEOUT, 50L,
                this.transactionManager, this.apiVersions);

        this.metadata.update(this.cluster, Collections.emptySet(), this.time.milliseconds());
        this.client.setNode(this.brokerNode);
    }

    /**
     * Once a commit has begun, {@code nextRequestHandler(true)} must yield no handler,
     * while {@code nextRequestHandler(false)} must yield the EndTxn handler.
     * The boolean presumably signals whether incomplete batches remain (per the test name).
     */
    @Test
    public void testEndTxnNotSentIfIncompleteBatches() {
        final long pid = 13131L;
        final short epoch = 1;
        doInitTransactions(pid, epoch);

        // Open a transaction and get tp0 registered with it.
        transactionManager.beginTransaction();
        transactionManager.maybeAddPartitionToTransaction(tp0);
        prepareAddPartitionsToTxn(tp0, Errors.NONE);
        sender.run(time.milliseconds());
        Assert.assertTrue(transactionManager.isPartitionAdded(tp0));

        transactionManager.beginCommit();
        Assert.assertNull(transactionManager.nextRequestHandler(true));

        TransactionManager.TxnRequestHandler handler = transactionManager.nextRequestHandler(false);
        Assert.assertTrue(handler.isEndTxn());
    }

    /**
     * A transactional manager on which initialization was never run must reject
     * the readiness check with an {@link IllegalStateException}.
     */
    @Test(expected = IllegalStateException.class)
    public void testFailIfNotReadyForSendNoProducerId() {
        // The fixture's manager is transactional and has had no init performed on it.
        transactionManager.failIfNotReadyForSend();
    }

    /**
     * A manager created with the no-arg constructor must pass the readiness check
     * without throwing.
     */
    @Test
    public void testFailIfNotReadyForSendIdempotentProducer() {
        TransactionManager idempotentManager = new TransactionManager();
        idempotentManager.failIfNotReadyForSend();
    }

    /**
     * After a fatal error, a manager created with the no-arg constructor must fail
     * the readiness check with a {@link KafkaException}.
     */
    @Test(expected = KafkaException.class)
    public void testFailIfNotReadyForSendIdempotentProducerFatalError() {
        TransactionManager idempotentManager = new TransactionManager();
        KafkaException fatal = new KafkaException();
        idempotentManager.transitionToFatalError(fatal);
        idempotentManager.failIfNotReadyForSend();
    }

    /**
     * With transactions initialized but none begun, the readiness check must
     * throw an {@link IllegalStateException}.
     */
    @Test(expected = IllegalStateException.class)
    public void testFailIfNotReadyForSendNoOngoingTransaction() {
        final long pid = 13131L;
        final short epoch = 1;
        doInitTransactions(pid, epoch);

        // No beginTransaction() call on purpose.
        transactionManager.failIfNotReadyForSend();
    }

    /**
     * Once an open transaction has moved to the abortable-error state, the
     * readiness check must throw a {@link KafkaException}.
     */
    @Test(expected = KafkaException.class)
    public void testFailIfNotReadyForSendAfterAbortableError() {
        final long pid = 13131L;
        final short epoch = 1;
        doInitTransactions(pid, epoch);

        transactionManager.beginTransaction();
        KafkaException abortable = new KafkaException();
        transactionManager.transitionToAbortableError(abortable);

        transactionManager.failIfNotReadyForSend();
    }

    /**
     * Once the initialized manager has moved to the fatal-error state, the
     * readiness check must throw a {@link KafkaException}.
     */
    @Test(expected = KafkaException.class)
    public void testFailIfNotReadyForSendAfterFatalError() {
        final long pid = 13131L;
        final short epoch = 1;
        doInitTransactions(pid, epoch);

        KafkaException fatal = new KafkaException();
        transactionManager.transitionToFatalError(fatal);

        transactionManager.failIfNotReadyForSend();
    }

    /**
     * Follows {@code hasOngoingTransaction()} through a transaction that is aborted
     * cleanly: false before and right after initialization, true from
     * {@code beginTransaction()} onward, and false again once the EndTxn(ABORT)
     * response has been processed.
     */
    @Test
    public void testHasOngoingTransactionSuccessfulAbort() {
        final long pid = 13131L;
        final short epoch = 1;
        final TopicPartition partition = new TopicPartition("foo", 0);

        Assert.assertFalse(transactionManager.hasOngoingTransaction());
        doInitTransactions(pid, epoch);
        Assert.assertFalse(transactionManager.hasOngoingTransaction());

        transactionManager.beginTransaction();
        Assert.assertTrue(transactionManager.hasOngoingTransaction());
        transactionManager.maybeAddPartitionToTransaction(partition);
        Assert.assertTrue(transactionManager.hasOngoingTransaction());

        prepareAddPartitionsToTxn(partition, Errors.NONE);
        sender.run(time.milliseconds());

        // The transaction still counts as ongoing until the abort completes.
        transactionManager.beginAbort();
        Assert.assertTrue(transactionManager.hasOngoingTransaction());

        prepareEndTxnResponse(Errors.NONE, TransactionResult.ABORT, pid, epoch);
        sender.run(time.milliseconds());
        Assert.assertFalse(transactionManager.hasOngoingTransaction());
    }

    /**
     * Follows {@code hasOngoingTransaction()} through a transaction that is committed
     * cleanly: false before and right after initialization, true from
     * {@code beginTransaction()} onward, and false again once the EndTxn(COMMIT)
     * response has been processed.
     */
    @Test
    public void testHasOngoingTransactionSuccessfulCommit() {
        final long pid = 13131L;
        final short epoch = 1;
        final TopicPartition partition = new TopicPartition("foo", 0);

        Assert.assertFalse(transactionManager.hasOngoingTransaction());
        doInitTransactions(pid, epoch);
        Assert.assertFalse(transactionManager.hasOngoingTransaction());

        transactionManager.beginTransaction();
        Assert.assertTrue(transactionManager.hasOngoingTransaction());
        transactionManager.maybeAddPartitionToTransaction(partition);
        Assert.assertTrue(transactionManager.hasOngoingTransaction());

        prepareAddPartitionsToTxn(partition, Errors.NONE);
        sender.run(time.milliseconds());

        // The transaction still counts as ongoing until the commit completes.
        transactionManager.beginCommit();
        Assert.assertTrue(transactionManager.hasOngoingTransaction());

        prepareEndTxnResponse(Errors.NONE, TransactionResult.COMMIT, pid, epoch);
        sender.run(time.milliseconds());
        Assert.assertFalse(transactionManager.hasOngoingTransaction());
    }

    /**
     * An abortable error must not end the transaction by itself:
     * {@code hasOngoingTransaction()} is expected to stay true through the error and
     * through {@code beginAbort()}, turning false only after the EndTxn(ABORT)
     * response has been processed.
     */
    @Test
    public void testHasOngoingTransactionAbortableError() {
        final long pid = 13131L;
        final short epoch = 1;
        final TopicPartition partition = new TopicPartition("foo", 0);

        Assert.assertFalse(transactionManager.hasOngoingTransaction());
        doInitTransactions(pid, epoch);
        Assert.assertFalse(transactionManager.hasOngoingTransaction());

        transactionManager.beginTransaction();
        Assert.assertTrue(transactionManager.hasOngoingTransaction());
        transactionManager.maybeAddPartitionToTransaction(partition);
        Assert.assertTrue(transactionManager.hasOngoingTransaction());

        prepareAddPartitionsToTxn(partition, Errors.NONE);
        sender.run(time.milliseconds());

        KafkaException abortable = new KafkaException();
        transactionManager.transitionToAbortableError(abortable);
        Assert.assertTrue(transactionManager.hasOngoingTransaction());

        transactionManager.beginAbort();
        Assert.assertTrue(transactionManager.hasOngoingTransaction());

        prepareEndTxnResponse(Errors.NONE, TransactionResult.ABORT, pid, epoch);
        sender.run(time.milliseconds());
        Assert.assertFalse(transactionManager.hasOngoingTransaction());
    }

    /**
     * A fatal error is expected to end the transaction immediately:
     * {@code hasOngoingTransaction()} turns false right after the transition, with
     * no abort round trip.
     */
    @Test
    public void testHasOngoingTransactionFatalError() {
        final long pid = 13131L;
        final short epoch = 1;
        final TopicPartition partition = new TopicPartition("foo", 0);

        Assert.assertFalse(transactionManager.hasOngoingTransaction());
        doInitTransactions(pid, epoch);
        Assert.assertFalse(transactionManager.hasOngoingTransaction());

        transactionManager.beginTransaction();
        Assert.assertTrue(transactionManager.hasOngoingTransaction());
        transactionManager.maybeAddPartitionToTransaction(partition);
        Assert.assertTrue(transactionManager.hasOngoingTransaction());

        prepareAddPartitionsToTxn(partition, Errors.NONE);
        sender.run(time.milliseconds());

        KafkaException fatal = new KafkaException();
        transactionManager.transitionToFatalError(fatal);
        Assert.assertFalse(transactionManager.hasOngoingTransaction());
    }

    /**
     * Walks a partition through its add lifecycle: pending (not yet added) after
     * {@code maybeAddPartitionToTransaction}, added (no longer pending) once the
     * AddPartitionsToTxn response is processed, and unchanged when the same
     * partition is offered again.
     */
    @Test
    public void testMaybeAddPartitionToTransaction() {
        final long pid = 13131L;
        final short epoch = 1;
        final TopicPartition partition = new TopicPartition("foo", 0);

        doInitTransactions(pid, epoch);
        transactionManager.beginTransaction();

        // Stage 1: queued but not yet acknowledged.
        transactionManager.maybeAddPartitionToTransaction(partition);
        Assert.assertTrue(transactionManager.hasPartitionsToAdd());
        Assert.assertFalse(transactionManager.isPartitionAdded(partition));
        Assert.assertTrue(transactionManager.isPartitionPendingAdd(partition));

        // Stage 2: a successful response has been processed.
        prepareAddPartitionsToTxn(partition, Errors.NONE);
        sender.run(time.milliseconds());
        Assert.assertFalse(transactionManager.hasPartitionsToAdd());
        Assert.assertTrue(transactionManager.isPartitionAdded(partition));
        Assert.assertFalse(transactionManager.isPartitionPendingAdd(partition));

        // Stage 3: offering the same partition again changes nothing.
        transactionManager.maybeAddPartitionToTransaction(partition);
        Assert.assertFalse(transactionManager.hasPartitionsToAdd());
        Assert.assertTrue(transactionManager.isPartitionAdded(partition));
        Assert.assertFalse(transactionManager.isPartitionPendingAdd(partition));
    }

    /**
     * When the first AddPartitionsToTxn attempt fails with
     * {@code CONCURRENT_TRANSACTIONS}, the next request handler is expected to
     * report a retry backoff of 20 ms rather than the default.
     */
    @Test
    public void testAddPartitionToTransactionOverridesRetryBackoffForConcurrentTransactions() {
        final long pid = 13131L;
        final short epoch = 1;
        final TopicPartition partition = new TopicPartition("foo", 0);

        doInitTransactions(pid, epoch);
        transactionManager.beginTransaction();
        transactionManager.maybeAddPartitionToTransaction(partition);
        Assert.assertTrue(transactionManager.hasPartitionsToAdd());
        Assert.assertFalse(transactionManager.isPartitionAdded(partition));
        Assert.assertTrue(transactionManager.isPartitionPendingAdd(partition));

        prepareAddPartitionsToTxn(partition, Errors.CONCURRENT_TRANSACTIONS);
        sender.run(time.milliseconds());

        TransactionManager.TxnRequestHandler retryHandler = transactionManager.nextRequestHandler(false);
        Assert.assertNotNull(retryHandler);
        Assert.assertEquals(20L, retryHandler.retryBackoffMs());
    }

    /**
     * When AddPartitionsToTxn fails with {@code COORDINATOR_NOT_AVAILABLE}, the
     * next request handler is expected to keep the default retry backoff.
     */
    @Test
    public void testAddPartitionToTransactionRetainsRetryBackoffForRegularRetriableError() {
        final long pid = 13131L;
        final short epoch = 1;
        final TopicPartition partition = new TopicPartition("foo", 0);

        doInitTransactions(pid, epoch);
        transactionManager.beginTransaction();
        transactionManager.maybeAddPartitionToTransaction(partition);
        Assert.assertTrue(transactionManager.hasPartitionsToAdd());
        Assert.assertFalse(transactionManager.isPartitionAdded(partition));
        Assert.assertTrue(transactionManager.isPartitionPendingAdd(partition));

        prepareAddPartitionsToTxn(partition, Errors.COORDINATOR_NOT_AVAILABLE);
        sender.run(time.milliseconds());

        TransactionManager.TxnRequestHandler retryHandler = transactionManager.nextRequestHandler(false);
        Assert.assertNotNull(retryHandler);
        Assert.assertEquals(DEFAULT_RETRY_BACKOFF_MS, retryHandler.retryBackoffMs());
    }

    /**
     * With one partition already added to the transaction, the handler for a second
     * partition is expected to carry the default retry backoff. A
     * {@code CONCURRENT_TRANSACTIONS} response is prepared for that second request,
     * but the sender is not run again before the handler is inspected.
     */
    @Test
    public void testAddPartitionToTransactionRetainsRetryBackoffWhenPartitionsAlreadyAdded() {
        final long pid = 13131L;
        final short epoch = 1;
        final TopicPartition firstPartition = new TopicPartition("foo", 0);

        doInitTransactions(pid, epoch);
        transactionManager.beginTransaction();
        transactionManager.maybeAddPartitionToTransaction(firstPartition);
        Assert.assertTrue(transactionManager.hasPartitionsToAdd());
        Assert.assertFalse(transactionManager.isPartitionAdded(firstPartition));
        Assert.assertTrue(transactionManager.isPartitionPendingAdd(firstPartition));

        prepareAddPartitionsToTxn(firstPartition, Errors.NONE);
        sender.run(time.milliseconds());
        Assert.assertTrue(transactionManager.isPartitionAdded(firstPartition));

        final TopicPartition secondPartition = new TopicPartition("foo", 1);
        transactionManager.maybeAddPartitionToTransaction(secondPartition);
        prepareAddPartitionsToTxn(secondPartition, Errors.CONCURRENT_TRANSACTIONS);

        TransactionManager.TxnRequestHandler pendingHandler = transactionManager.nextRequestHandler(false);
        Assert.assertNotNull(pendingHandler);
        Assert.assertEquals(DEFAULT_RETRY_BACKOFF_MS, pendingHandler.retryBackoffMs());
    }

    /**
     * Adding a partition before transactions have been initialized must throw an
     * {@link IllegalStateException}.
     */
    @Test(expected = IllegalStateException.class)
    public void testMaybeAddPartitionToTransactionBeforeInitTransactions() {
        TopicPartition partition = new TopicPartition("foo", 0);
        transactionManager.maybeAddPartitionToTransaction(partition);
    }

    /**
     * Adding a partition after initialization but before {@code beginTransaction()}
     * must throw an {@link IllegalStateException}.
     */
    @Test(expected = IllegalStateException.class)
    public void testMaybeAddPartitionToTransactionBeforeBeginTransaction() {
        final long pid = 13131L;
        final short epoch = 1;
        doInitTransactions(pid, epoch);

        TopicPartition partition = new TopicPartition("foo", 0);
        transactionManager.maybeAddPartitionToTransaction(partition);
    }

    /**
     * Adding a partition while the transaction is in the abortable-error state must
     * throw a {@link KafkaException}.
     */
    @Test(expected = KafkaException.class)
    public void testMaybeAddPartitionToTransactionAfterAbortableError() {
        final long pid = 13131L;
        final short epoch = 1;
        doInitTransactions(pid, epoch);

        transactionManager.beginTransaction();
        KafkaException abortable = new KafkaException();
        transactionManager.transitionToAbortableError(abortable);

        TopicPartition partition = new TopicPartition("foo", 0);
        transactionManager.maybeAddPartitionToTransaction(partition);
    }

    /**
     * Adding a partition while the manager is in the fatal-error state must throw a
     * {@link KafkaException}.
     */
    @Test(expected = KafkaException.class)
    public void testMaybeAddPartitionToTransactionAfterFatalError() {
        final long pid = 13131L;
        final short epoch = 1;
        doInitTransactions(pid, epoch);

        KafkaException fatal = new KafkaException();
        transactionManager.transitionToFatalError(fatal);

        TopicPartition partition = new TopicPartition("foo", 0);
        transactionManager.maybeAddPartitionToTransaction(partition);
    }

    /**
     * A partition that is still pending addition must not be sendable once the
     * transaction has hit an abortable error.
     */
    @Test
    public void testIsSendToPartitionAllowedWithPendingPartitionAfterAbortableError() {
        final long pid = 13131L;
        final short epoch = 1;
        doInitTransactions(pid, epoch);

        transactionManager.beginTransaction();
        // tp0 is queued, but the sender is never run, so no add request goes out.
        transactionManager.maybeAddPartitionToTransaction(tp0);
        transactionManager.transitionToAbortableError(new KafkaException());

        Assert.assertFalse(transactionManager.isSendToPartitionAllowed(tp0));
        Assert.assertTrue(transactionManager.hasAbortableError());
    }

    /**
     * A partition whose add was handed to the sender without any response prepared
     * must not be sendable once the transaction has hit an abortable error.
     */
    @Test
    public void testIsSendToPartitionAllowedWithInFlightPartitionAddAfterAbortableError() {
        final long pid = 13131L;
        final short epoch = 1;
        doInitTransactions(pid, epoch);

        transactionManager.beginTransaction();
        transactionManager.maybeAddPartitionToTransaction(tp0);

        // No response is prepared, so the add request presumably stays in flight after this run.
        sender.run(time.milliseconds());
        transactionManager.transitionToAbortableError(new KafkaException());

        Assert.assertFalse(transactionManager.isSendToPartitionAllowed(tp0));
        Assert.assertTrue(transactionManager.hasAbortableError());
    }

    /**
     * A partition that is still pending addition must not be sendable once the
     * manager has hit a fatal error.
     */
    @Test
    public void testIsSendToPartitionAllowedWithPendingPartitionAfterFatalError() {
        final long pid = 13131L;
        final short epoch = 1;
        doInitTransactions(pid, epoch);

        transactionManager.beginTransaction();
        // tp0 is queued, but the sender is never run, so no add request goes out.
        transactionManager.maybeAddPartitionToTransaction(tp0);
        transactionManager.transitionToFatalError(new KafkaException());

        Assert.assertFalse(transactionManager.isSendToPartitionAllowed(tp0));
        Assert.assertTrue(transactionManager.hasFatalError());
    }

    /**
     * A partition whose add was handed to the sender without any response prepared
     * must not be sendable once the manager has hit a fatal error.
     */
    @Test
    public void testIsSendToPartitionAllowedWithInFlightPartitionAddAfterFatalError() {
        final long pid = 13131L;
        final short epoch = 1;
        doInitTransactions(pid, epoch);

        transactionManager.beginTransaction();
        transactionManager.maybeAddPartitionToTransaction(tp0);

        // No response is prepared, so the add request presumably stays in flight after this run.
        sender.run(time.milliseconds());
        transactionManager.transitionToFatalError(new KafkaException());

        Assert.assertFalse(transactionManager.isSendToPartitionAllowed(tp0));
        Assert.assertTrue(transactionManager.hasFatalError());
    }

    /**
     * A partition that was successfully added to the transaction is expected to
     * remain sendable after an abortable error.
     */
    @Test
    public void testIsSendToPartitionAllowedWithAddedPartitionAfterAbortableError() {
        final long pid = 13131L;
        final short epoch = 1;
        doInitTransactions(pid, epoch);

        transactionManager.beginTransaction();
        transactionManager.maybeAddPartitionToTransaction(tp0);

        // Complete the add round trip before injecting the error.
        prepareAddPartitionsToTxnResponse(Errors.NONE, tp0, epoch, pid);
        sender.run(time.milliseconds());
        Assert.assertFalse(transactionManager.hasPartitionsToAdd());

        transactionManager.transitionToAbortableError(new KafkaException());

        Assert.assertTrue(transactionManager.isSendToPartitionAllowed(tp0));
        Assert.assertTrue(transactionManager.hasAbortableError());
    }

    /**
     * Even a partition that was successfully added to the transaction must stop
     * being sendable after a fatal error.
     */
    @Test
    public void testIsSendToPartitionAllowedWithAddedPartitionAfterFatalError() {
        final long pid = 13131L;
        final short epoch = 1;
        doInitTransactions(pid, epoch);

        transactionManager.beginTransaction();
        transactionManager.maybeAddPartitionToTransaction(tp0);

        // Complete the add round trip before injecting the error.
        prepareAddPartitionsToTxnResponse(Errors.NONE, tp0, epoch, pid);
        sender.run(time.milliseconds());
        Assert.assertFalse(transactionManager.hasPartitionsToAdd());

        transactionManager.transitionToFatalError(new KafkaException());

        Assert.assertFalse(transactionManager.isSendToPartitionAllowed(tp0));
        Assert.assertTrue(transactionManager.hasFatalError());
    }

    /**
     * Inside an open transaction, a partition that was never offered to the manager
     * must not be sendable.
     */
    @Test
    public void testIsSendToPartitionAllowedWithPartitionNotAdded() {
        final long pid = 13131L;
        final short epoch = 1;
        doInitTransactions(pid, epoch);

        transactionManager.beginTransaction();

        boolean allowed = transactionManager.isSendToPartitionAllowed(tp0);
        Assert.assertFalse(allowed);
    }

    /**
     * Incrementing the sequence number of a partition on a freshly constructed
     * manager, without first reading its sequence number, must throw an
     * {@link IllegalStateException}.
     */
    @Test(expected = IllegalStateException.class)
    public void testInvalidSequenceIncrement() {
        TransactionManager freshManager = new TransactionManager();
        freshManager.incrementSequenceNumber(tp0, 3333);
    }

    /**
     * The sequence number of a partition starts at 0 and advances by the increment
     * passed to {@code incrementSequenceNumber}.
     */
    @Test
    public void testDefaultSequenceNumber() {
        TransactionManager transactionManager = new TransactionManager();

        // JUnit's contract is assertEquals(expected, actual); the original had the two
        // swapped, which makes a failure report the wrong value as "expected".
        Assert.assertEquals(0L, transactionManager.sequenceNumber(this.tp0).intValue());
        transactionManager.incrementSequenceNumber(this.tp0, 3);
        Assert.assertEquals(3L, transactionManager.sequenceNumber(this.tp0).intValue());
    }

    /**
     * {@code resetProducerId()} must bring a partition's sequence number back to 0
     * after it has been advanced.
     */
    @Test
    public void testProducerIdReset() {
        TransactionManager transactionManager = new TransactionManager();

        // JUnit's contract is assertEquals(expected, actual); the original had the two
        // swapped, which makes a failure report the wrong value as "expected".
        Assert.assertEquals(0L, transactionManager.sequenceNumber(this.tp0).intValue());
        transactionManager.incrementSequenceNumber(this.tp0, 3);
        Assert.assertEquals(3L, transactionManager.sequenceNumber(this.tp0).intValue());

        transactionManager.resetProducerId();
        Assert.assertEquals(0L, transactionManager.sequenceNumber(this.tp0).intValue());
    }

    /**
     * End-to-end happy path of one transaction: add a partition, produce a record,
     * send consumer offsets (AddOffsetsToTxn, group coordinator lookup,
     * TxnOffsetCommit) and finally commit. Intermediate state is asserted after
     * each sender pass.
     *
     * @throws InterruptedException declared because {@code RecordAccumulator.append} may block
     */
    @Test
    public void testBasicTransaction() throws InterruptedException {
        final long pid = 13131L;
        final short epoch = 1;
        final String consumerGroupId = "myconsumergroup";

        doInitTransactions(pid, epoch);
        this.transactionManager.beginTransaction();
        this.transactionManager.maybeAddPartitionToTransaction(this.tp0);

        Future<RecordMetadata> responseFuture = this.accumulator.append(this.tp0, this.time.milliseconds(),
                "key".getBytes(), "value".getBytes(), Record.EMPTY_HEADERS, (Callback) null, 1000L).future;
        Assert.assertFalse(responseFuture.isDone());

        prepareAddPartitionsToTxnResponse(Errors.NONE, this.tp0, epoch, pid);
        prepareProduceResponse(Errors.NONE, pid, epoch);
        Assert.assertFalse(this.transactionManager.transactionContainsPartition(this.tp0));
        Assert.assertFalse(this.transactionManager.isSendToPartitionAllowed(this.tp0));

        // First pass: the partition gets added; the record has not been acknowledged yet.
        this.sender.run(this.time.milliseconds());
        Assert.assertTrue(this.transactionManager.transactionContainsPartition(this.tp0));
        Assert.assertTrue(this.transactionManager.isSendToPartitionAllowed(this.tp0));
        Assert.assertFalse(responseFuture.isDone());

        // Second pass: the produce response completes the record future.
        this.sender.run(this.time.milliseconds());
        Assert.assertTrue(responseFuture.isDone());

        // Typed maps replace the raw HashMaps so these calls are checked at compile time.
        Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
        offsets.put(this.tp1, new OffsetAndMetadata(1L));
        TransactionalRequestResult addOffsetsResult =
                this.transactionManager.sendOffsetsToTransaction(offsets, consumerGroupId);
        Assert.assertFalse(this.transactionManager.hasPendingOffsetCommits());

        prepareAddOffsetsToTxnResponse(Errors.NONE, consumerGroupId, pid, epoch);
        this.sender.run(this.time.milliseconds());
        Assert.assertTrue(this.transactionManager.hasPendingOffsetCommits());
        Assert.assertFalse(addOffsetsResult.isCompleted());

        Map<TopicPartition, Errors> txnOffsetCommitResponse = new HashMap<>();
        txnOffsetCommitResponse.put(this.tp1, Errors.NONE);
        prepareFindCoordinatorResponse(Errors.NONE, false, FindCoordinatorRequest.CoordinatorType.GROUP, consumerGroupId);
        prepareTxnOffsetCommitResponse(consumerGroupId, pid, epoch, txnOffsetCommitResponse);

        // assertNull states the intent directly (was assertEquals((Object) null, ...)).
        Assert.assertNull(this.transactionManager.coordinator(FindCoordinatorRequest.CoordinatorType.GROUP));
        this.sender.run(this.time.milliseconds());
        this.sender.run(this.time.milliseconds());
        Assert.assertNotNull(this.transactionManager.coordinator(FindCoordinatorRequest.CoordinatorType.GROUP));
        Assert.assertTrue(this.transactionManager.hasPendingOffsetCommits());

        this.sender.run(this.time.milliseconds());
        Assert.assertFalse(this.transactionManager.hasPendingOffsetCommits());
        Assert.assertTrue(addOffsetsResult.isCompleted());

        this.transactionManager.beginCommit();
        prepareEndTxnResponse(Errors.NONE, TransactionResult.COMMIT, pid, epoch);
        this.sender.run(this.time.milliseconds());
        Assert.assertFalse(this.transactionManager.hasOngoingTransaction());
        Assert.assertFalse(this.transactionManager.isCompleting());
        Assert.assertFalse(this.transactionManager.transactionContainsPartition(this.tp0));
    }

    /**
     * The transaction coordinator lookup must be retried: the first
     * FindCoordinator response is prepared with the boolean flag set to true
     * (presumably a simulated disconnect), the second with it false, after which the
     * broker node must be recorded as the transaction coordinator.
     */
    @Test
    public void testDisconnectAndRetry() {
        final FindCoordinatorRequest.CoordinatorType txnCoordinator = FindCoordinatorRequest.CoordinatorType.TRANSACTION;

        transactionManager.initializeTransactions();

        // First attempt.
        prepareFindCoordinatorResponse(Errors.NONE, true, txnCoordinator, "foobar");
        sender.run(time.milliseconds());

        // Second attempt.
        prepareFindCoordinatorResponse(Errors.NONE, false, txnCoordinator, "foobar");
        sender.run(time.milliseconds());
        sender.run(time.milliseconds());

        Assert.assertEquals(brokerNode, transactionManager.coordinator(txnCoordinator));
    }

    /**
     * An unsupported-version response to the FindCoordinator request must put the
     * manager into a fatal error state whose last error is an
     * {@link UnsupportedVersionException}.
     */
    @Test
    public void testUnsupportedFindCoordinator() {
        this.transactionManager.initializeTransactions();
        this.client.prepareUnsupportedVersionResponse(new MockClient.RequestMatcher() {
            @Override
            public boolean matches(AbstractRequest abstractRequest) {
                FindCoordinatorRequest findCoordinatorRequest = (FindCoordinatorRequest) abstractRequest;
                // assertEquals takes (expected, actual); the original had them swapped.
                Assert.assertEquals(FindCoordinatorRequest.CoordinatorType.TRANSACTION, findCoordinatorRequest.coordinatorType());
                Assert.assertEquals("foobar", findCoordinatorRequest.coordinatorKey());
                return true;
            }
        });

        this.sender.run(this.time.milliseconds());
        this.sender.run(this.time.milliseconds());

        Assert.assertTrue(this.transactionManager.hasFatalError());
        Assert.assertTrue(this.transactionManager.lastError() instanceof UnsupportedVersionException);
    }

    /**
     * After the transaction coordinator has been found, an unsupported-version
     * response to the InitProducerId request must put the manager into a fatal
     * error state whose last error is an {@link UnsupportedVersionException}.
     */
    @Test
    public void testUnsupportedInitTransactions() {
        this.transactionManager.initializeTransactions();
        prepareFindCoordinatorResponse(Errors.NONE, false, FindCoordinatorRequest.CoordinatorType.TRANSACTION, "foobar");
        this.sender.run(this.time.milliseconds());
        this.sender.run(this.time.milliseconds());
        Assert.assertFalse(this.transactionManager.hasError());
        Assert.assertNotNull(this.transactionManager.coordinator(FindCoordinatorRequest.CoordinatorType.TRANSACTION));

        this.client.prepareUnsupportedVersionResponse(new MockClient.RequestMatcher() {
            @Override
            public boolean matches(AbstractRequest abstractRequest) {
                // Bind the cast request to a local; the original referenced an undefined
                // decompiler temporary (r0) here and could not compile.
                InitProducerIdRequest initProducerIdRequest = (InitProducerIdRequest) abstractRequest;
                // assertEquals takes (expected, actual).
                Assert.assertEquals("foobar", initProducerIdRequest.transactionalId());
                Assert.assertEquals(1121L, initProducerIdRequest.transactionTimeoutMs());
                return true;
            }
        });

        this.sender.run(this.time.milliseconds());
        Assert.assertTrue(this.transactionManager.hasFatalError());
        Assert.assertTrue(this.transactionManager.lastError() instanceof UnsupportedVersionException);
    }

    /**
     * A TxnOffsetCommit response carrying {@code UNSUPPORTED_FOR_MESSAGE_FORMAT}
     * must fail the pending send-offsets result with an
     * {@link UnsupportedForMessageFormatException} and leave the manager in the
     * state checked by {@code assertFatalError}.
     */
    @Test
    public void testUnsupportedForMessageFormatInTxnOffsetCommit() {
        final long pid = 13131L;
        final short epoch = 1;
        final String groupId = "consumer";
        final TopicPartition partition = new TopicPartition("foo", 0);

        doInitTransactions(pid, epoch);
        transactionManager.beginTransaction();

        TransactionalRequestResult offsetsResult = transactionManager.sendOffsetsToTransaction(
                Collections.singletonMap(partition, new OffsetAndMetadata(39L)), groupId);

        prepareAddOffsetsToTxnResponse(Errors.NONE, groupId, pid, epoch);
        sender.run(time.milliseconds());
        sender.run(time.milliseconds());

        prepareFindCoordinatorResponse(Errors.NONE, false, FindCoordinatorRequest.CoordinatorType.GROUP, groupId);
        sender.run(time.milliseconds());

        prepareTxnOffsetCommitResponse(groupId, pid, epoch,
                Collections.singletonMap(partition, Errors.UNSUPPORTED_FOR_MESSAGE_FORMAT));
        sender.run(time.milliseconds());

        // Error is visible on the manager ...
        Assert.assertTrue(transactionManager.hasError());
        Assert.assertTrue(transactionManager.lastError() instanceof UnsupportedForMessageFormatException);
        // ... and on the result handed back to the caller.
        Assert.assertTrue(offsetsResult.isCompleted());
        Assert.assertFalse(offsetsResult.isSuccessful());
        Assert.assertTrue(offsetsResult.error() instanceof UnsupportedForMessageFormatException);
        assertFatalError(UnsupportedForMessageFormatException.class);
    }

    /**
     * When the first InitProducerId response is prepared with the boolean flag set
     * to true (presumably a disconnect after the request was sent), the coordinator
     * must be forgotten, looked up again, and the retried InitProducerId must then
     * complete initialization with the expected producer id and epoch.
     */
    @Test
    public void testLookupCoordinatorOnDisconnectAfterSend() {
        final long pid = 13131L;
        final short epoch = 1;
        final FindCoordinatorRequest.CoordinatorType txnCoordinator = FindCoordinatorRequest.CoordinatorType.TRANSACTION;

        TransactionalRequestResult initResult = transactionManager.initializeTransactions();

        // Initial coordinator discovery.
        prepareFindCoordinatorResponse(Errors.NONE, false, txnCoordinator, "foobar");
        sender.run(time.milliseconds());
        sender.run(time.milliseconds());
        Assert.assertEquals(brokerNode, transactionManager.coordinator(txnCoordinator));

        // First InitProducerId attempt.
        prepareInitPidResponse(Errors.NONE, true, pid, epoch);
        sender.run(time.milliseconds());
        Assert.assertNull(transactionManager.coordinator(txnCoordinator));
        Assert.assertFalse(initResult.isCompleted());
        Assert.assertFalse(transactionManager.hasProducerId());

        // Coordinator is rediscovered.
        prepareFindCoordinatorResponse(Errors.NONE, false, txnCoordinator, "foobar");
        sender.run(time.milliseconds());
        Assert.assertEquals(brokerNode, transactionManager.coordinator(txnCoordinator));
        Assert.assertFalse(initResult.isCompleted());

        // Second InitProducerId attempt.
        prepareInitPidResponse(Errors.NONE, false, pid, epoch);
        sender.run(time.milliseconds());
        Assert.assertTrue(initResult.isCompleted());
        Assert.assertTrue(transactionManager.hasProducerId());
        Assert.assertEquals(pid, transactionManager.producerIdAndEpoch().producerId);
        Assert.assertEquals(1L, transactionManager.producerIdAndEpoch().epoch);
    }

    /**
     * Checks that the transaction coordinator is looked up again when the node is disconnected
     * and blacked out before InitProducerId can be sent, and that initialization then completes.
     */
    @Test
    public void testLookupCoordinatorOnDisconnectBeforeSend() {
        final long pid = 13131L;
        final short epoch = 1;
        final FindCoordinatorRequest.CoordinatorType txnCoordinator = FindCoordinatorRequest.CoordinatorType.TRANSACTION;

        TransactionalRequestResult initResult = transactionManager.initializeTransactions();

        // First coordinator lookup succeeds after two sender iterations.
        prepareFindCoordinatorResponse(Errors.NONE, false, txnCoordinator, "foobar");
        sender.run(time.milliseconds());
        sender.run(time.milliseconds());
        Assert.assertEquals(brokerNode, transactionManager.coordinator(txnCoordinator));

        // Disconnect the broker and black it out before the next sender iteration.
        client.disconnect(brokerNode.idString());
        client.blackout(brokerNode, DEFAULT_RETRY_BACKOFF_MS);
        sender.run(time.milliseconds());
        // NOTE(review): 110 ms is presumably chosen to outlast the blackout period -- confirm
        // against the value of DEFAULT_RETRY_BACKOFF_MS.
        time.sleep(110L);

        // The coordinator must have been cleared and nothing may be initialized yet.
        Assert.assertNull(transactionManager.coordinator(txnCoordinator));
        Assert.assertFalse(initResult.isCompleted());
        Assert.assertFalse(transactionManager.hasProducerId());

        // Second lookup restores the coordinator, but initialization is still pending.
        prepareFindCoordinatorResponse(Errors.NONE, false, txnCoordinator, "foobar");
        sender.run(time.milliseconds());
        Assert.assertEquals(brokerNode, transactionManager.coordinator(txnCoordinator));
        Assert.assertFalse(initResult.isCompleted());

        // InitProducerId now succeeds and yields the expected id and epoch.
        prepareInitPidResponse(Errors.NONE, false, pid, epoch);
        sender.run(time.milliseconds());
        Assert.assertTrue(initResult.isCompleted());
        Assert.assertTrue(transactionManager.hasProducerId());
        Assert.assertEquals(pid, transactionManager.producerIdAndEpoch().producerId);
        Assert.assertEquals(epoch, transactionManager.producerIdAndEpoch().epoch);
    }

    /**
     * Checks that a NOT_COORDINATOR error on InitProducerId clears the cached transaction
     * coordinator, triggers a fresh lookup, and that initialization completes on the retry.
     */
    @Test
    public void testLookupCoordinatorOnNotCoordinatorError() {
        final long pid = 13131L;
        final short epoch = 1;
        final FindCoordinatorRequest.CoordinatorType txnCoordinator = FindCoordinatorRequest.CoordinatorType.TRANSACTION;

        TransactionalRequestResult initResult = transactionManager.initializeTransactions();

        // First coordinator lookup succeeds after two sender iterations.
        prepareFindCoordinatorResponse(Errors.NONE, false, txnCoordinator, "foobar");
        sender.run(time.milliseconds());
        sender.run(time.milliseconds());
        Assert.assertEquals(brokerNode, transactionManager.coordinator(txnCoordinator));

        // The broker answers InitProducerId with NOT_COORDINATOR.
        prepareInitPidResponse(Errors.NOT_COORDINATOR, false, pid, epoch);
        sender.run(time.milliseconds());

        // The coordinator must have been cleared and nothing may be initialized yet.
        Assert.assertNull(transactionManager.coordinator(txnCoordinator));
        Assert.assertFalse(initResult.isCompleted());
        Assert.assertFalse(transactionManager.hasProducerId());

        // Second lookup restores the coordinator, but initialization is still pending.
        prepareFindCoordinatorResponse(Errors.NONE, false, txnCoordinator, "foobar");
        sender.run(time.milliseconds());
        Assert.assertEquals(brokerNode, transactionManager.coordinator(txnCoordinator));
        Assert.assertFalse(initResult.isCompleted());

        // The retried InitProducerId succeeds and yields the expected id and epoch.
        prepareInitPidResponse(Errors.NONE, false, pid, epoch);
        sender.run(time.milliseconds());
        Assert.assertTrue(initResult.isCompleted());
        Assert.assertTrue(transactionManager.hasProducerId());
        Assert.assertEquals(pid, transactionManager.producerIdAndEpoch().producerId);
        Assert.assertEquals(epoch, transactionManager.producerIdAndEpoch().epoch);
    }

    /**
     * A TRANSACTIONAL_ID_AUTHORIZATION_FAILED error on FindCoordinator must fail
     * initialization and be reported as a fatal error.
     */
    @Test
    public void testTransactionalIdAuthorizationFailureInFindCoordinator() {
        TransactionalRequestResult initResult = transactionManager.initializeTransactions();
        prepareFindCoordinatorResponse(
                Errors.TRANSACTIONAL_ID_AUTHORIZATION_FAILED,
                false,
                FindCoordinatorRequest.CoordinatorType.TRANSACTION,
                "foobar");
        sender.run(time.milliseconds());
        sender.run(time.milliseconds());

        // The manager records the authorization failure as its last error.
        Assert.assertTrue(transactionManager.hasError());
        Assert.assertTrue(transactionManager.lastError() instanceof TransactionalIdAuthorizationException);

        // One more sender iteration, after which the pending result is completed with the error.
        sender.run(time.milliseconds());
        Assert.assertTrue(initResult.isCompleted());
        Assert.assertFalse(initResult.isSuccessful());
        Assert.assertTrue(initResult.error() instanceof TransactionalIdAuthorizationException);

        assertFatalError(TransactionalIdAuthorizationException.class);
    }

    /**
     * A TRANSACTIONAL_ID_AUTHORIZATION_FAILED error on InitProducerId must fail
     * initialization and be reported as a fatal error.
     */
    @Test
    public void testTransactionalIdAuthorizationFailureInInitProducerId() {
        final long pid = 13131L;
        final FindCoordinatorRequest.CoordinatorType txnCoordinator = FindCoordinatorRequest.CoordinatorType.TRANSACTION;

        TransactionalRequestResult initResult = transactionManager.initializeTransactions();

        // Coordinator discovery itself succeeds.
        prepareFindCoordinatorResponse(Errors.NONE, false, txnCoordinator, "foobar");
        sender.run(time.milliseconds());
        sender.run(time.milliseconds());
        Assert.assertEquals(brokerNode, transactionManager.coordinator(txnCoordinator));

        // InitProducerId is rejected; the response carries epoch -1.
        prepareInitPidResponse(Errors.TRANSACTIONAL_ID_AUTHORIZATION_FAILED, false, pid, (short) -1);
        sender.run(time.milliseconds());

        Assert.assertTrue(transactionManager.hasError());
        Assert.assertTrue(initResult.isCompleted());
        Assert.assertFalse(initResult.isSuccessful());
        Assert.assertTrue(initResult.error() instanceof TransactionalIdAuthorizationException);

        assertFatalError(TransactionalIdAuthorizationException.class);
    }

    /**
     * A GROUP_AUTHORIZATION_FAILED error while looking up the group coordinator for
     * sendOffsetsToTransaction must fail the request with a {@link GroupAuthorizationException}
     * naming the consumer group, and leave the manager in an abortable error state.
     */
    @Test
    public void testGroupAuthorizationFailureInFindCoordinator() {
        final String groupId = "consumer";
        final long pid = 13131L;
        final short epoch = 1;

        doInitTransactions(pid, epoch);
        transactionManager.beginTransaction();
        TransactionalRequestResult sendOffsetsResult = transactionManager.sendOffsetsToTransaction(
                Collections.singletonMap(new TopicPartition("foo", 0), new OffsetAndMetadata(39L)), groupId);

        // AddOffsetsToTxn succeeds.
        prepareAddOffsetsToTxnResponse(Errors.NONE, groupId, pid, epoch);
        sender.run(time.milliseconds());
        sender.run(time.milliseconds());

        // The group coordinator lookup is rejected.
        prepareFindCoordinatorResponse(Errors.GROUP_AUTHORIZATION_FAILED, false,
                FindCoordinatorRequest.CoordinatorType.GROUP, groupId);
        sender.run(time.milliseconds());
        sender.run(time.milliseconds());

        Assert.assertTrue(transactionManager.hasError());
        Assert.assertTrue(transactionManager.lastError() instanceof GroupAuthorizationException);
        Assert.assertTrue(sendOffsetsResult.isCompleted());
        Assert.assertFalse(sendOffsetsResult.isSuccessful());
        Assert.assertTrue(sendOffsetsResult.error() instanceof GroupAuthorizationException);

        // groupId() is specific to GroupAuthorizationException, so the error has to be narrowed
        // explicitly before it can be inspected.
        GroupAuthorizationException authFailure = (GroupAuthorizationException) sendOffsetsResult.error();
        Assert.assertEquals(groupId, authFailure.groupId());

        assertAbortableError(GroupAuthorizationException.class);
    }

    /**
     * A GROUP_AUTHORIZATION_FAILED partition error in the TxnOffsetCommit response must fail
     * sendOffsetsToTransaction with a {@link GroupAuthorizationException} naming the consumer
     * group, and leave the manager in an abortable error state.
     */
    @Test
    public void testGroupAuthorizationFailureInTxnOffsetCommit() {
        final String groupId = "consumer";
        final long pid = 13131L;
        final short epoch = 1;
        final TopicPartition offsetsPartition = new TopicPartition("foo", 0);

        doInitTransactions(pid, epoch);
        transactionManager.beginTransaction();
        TransactionalRequestResult sendOffsetsResult = transactionManager.sendOffsetsToTransaction(
                Collections.singletonMap(offsetsPartition, new OffsetAndMetadata(39L)), groupId);

        // AddOffsetsToTxn succeeds.
        prepareAddOffsetsToTxnResponse(Errors.NONE, groupId, pid, epoch);
        sender.run(time.milliseconds());
        sender.run(time.milliseconds());

        // The group coordinator is found.
        prepareFindCoordinatorResponse(Errors.NONE, false, FindCoordinatorRequest.CoordinatorType.GROUP, groupId);
        sender.run(time.milliseconds());

        // The offset commit itself is rejected for the partition.
        prepareTxnOffsetCommitResponse(groupId, pid, epoch,
                Collections.singletonMap(offsetsPartition, Errors.GROUP_AUTHORIZATION_FAILED));
        sender.run(time.milliseconds());

        Assert.assertTrue(transactionManager.hasError());
        Assert.assertTrue(transactionManager.lastError() instanceof GroupAuthorizationException);
        Assert.assertTrue(sendOffsetsResult.isCompleted());
        Assert.assertFalse(sendOffsetsResult.isSuccessful());
        Assert.assertTrue(sendOffsetsResult.error() instanceof GroupAuthorizationException);

        // groupId() is specific to GroupAuthorizationException, so the error has to be narrowed
        // explicitly before it can be inspected.
        GroupAuthorizationException authFailure = (GroupAuthorizationException) sendOffsetsResult.error();
        Assert.assertEquals(groupId, authFailure.groupId());

        assertAbortableError(GroupAuthorizationException.class);
    }

    /**
     * A TRANSACTIONAL_ID_AUTHORIZATION_FAILED error on AddOffsetsToTxn must fail
     * sendOffsetsToTransaction and be reported as a fatal error.
     */
    @Test
    public void testTransactionalIdAuthorizationFailureInAddOffsetsToTxn() {
        final String groupId = "consumer";
        final long pid = 13131L;
        final short epoch = 1;
        final TopicPartition offsetsPartition = new TopicPartition("foo", 0);

        doInitTransactions(pid, epoch);
        transactionManager.beginTransaction();
        TransactionalRequestResult sendOffsetsResult = transactionManager.sendOffsetsToTransaction(
                Collections.singletonMap(offsetsPartition, new OffsetAndMetadata(39L)), groupId);

        // The very first request of the offsets flow is rejected.
        prepareAddOffsetsToTxnResponse(Errors.TRANSACTIONAL_ID_AUTHORIZATION_FAILED, groupId, pid, epoch);
        sender.run(time.milliseconds());

        Assert.assertTrue(transactionManager.hasError());
        Assert.assertTrue(transactionManager.lastError() instanceof TransactionalIdAuthorizationException);
        Assert.assertTrue(sendOffsetsResult.isCompleted());
        Assert.assertFalse(sendOffsetsResult.isSuccessful());
        Assert.assertTrue(sendOffsetsResult.error() instanceof TransactionalIdAuthorizationException);

        assertFatalError(TransactionalIdAuthorizationException.class);
    }

    /**
     * A TRANSACTIONAL_ID_AUTHORIZATION_FAILED partition error in the TxnOffsetCommit response
     * must fail sendOffsetsToTransaction and be reported as a fatal error.
     */
    @Test
    public void testTransactionalIdAuthorizationFailureInTxnOffsetCommit() {
        final String groupId = "consumer";
        final long pid = 13131L;
        final short epoch = 1;
        final TopicPartition offsetsPartition = new TopicPartition("foo", 0);

        doInitTransactions(pid, epoch);
        transactionManager.beginTransaction();
        TransactionalRequestResult sendOffsetsResult = transactionManager.sendOffsetsToTransaction(
                Collections.singletonMap(offsetsPartition, new OffsetAndMetadata(39L)), groupId);

        // AddOffsetsToTxn succeeds.
        prepareAddOffsetsToTxnResponse(Errors.NONE, groupId, pid, epoch);
        sender.run(time.milliseconds());
        sender.run(time.milliseconds());

        // The group coordinator is found.
        prepareFindCoordinatorResponse(Errors.NONE, false, FindCoordinatorRequest.CoordinatorType.GROUP, groupId);
        sender.run(time.milliseconds());

        // The offset commit is rejected for the partition.
        prepareTxnOffsetCommitResponse(groupId, pid, epoch,
                Collections.singletonMap(offsetsPartition, Errors.TRANSACTIONAL_ID_AUTHORIZATION_FAILED));
        sender.run(time.milliseconds());

        Assert.assertTrue(transactionManager.hasError());
        Assert.assertTrue(transactionManager.lastError() instanceof TransactionalIdAuthorizationException);
        Assert.assertTrue(sendOffsetsResult.isCompleted());
        Assert.assertFalse(sendOffsetsResult.isSuccessful());
        Assert.assertTrue(sendOffsetsResult.error() instanceof TransactionalIdAuthorizationException);

        assertFatalError(TransactionalIdAuthorizationException.class);
    }

    /**
     * When AddPartitionsToTxn reports TOPIC_AUTHORIZATION_FAILED for one partition and
     * OPERATION_NOT_ATTEMPTED for another, neither partition may remain pending or added, the
     * resulting {@link TopicAuthorizationException} must name only the unauthorized topic, and
     * the manager must be in an abortable error state.
     */
    @Test
    public void testTopicAuthorizationFailureInAddPartitions() {
        final long pid = 13131L;
        final short epoch = 1;
        final TopicPartition unauthorizedPartition = new TopicPartition("foo", 0);
        final TopicPartition notAttemptedPartition = new TopicPartition("bar", 0);

        doInitTransactions(pid, epoch);
        transactionManager.beginTransaction();
        transactionManager.maybeAddPartitionToTransaction(unauthorizedPartition);
        transactionManager.maybeAddPartitionToTransaction(notAttemptedPartition);

        // Typed map instead of a raw HashMap so the helper receives checked key/value types.
        Map<TopicPartition, Errors> partitionErrors = new HashMap<>();
        partitionErrors.put(unauthorizedPartition, Errors.TOPIC_AUTHORIZATION_FAILED);
        partitionErrors.put(notAttemptedPartition, Errors.OPERATION_NOT_ATTEMPTED);
        prepareAddPartitionsToTxn(partitionErrors);
        sender.run(time.milliseconds());

        Assert.assertTrue(transactionManager.hasError());
        Assert.assertTrue(transactionManager.lastError() instanceof TopicAuthorizationException);

        // Both partitions are dropped from the pending and the added sets.
        Assert.assertFalse(transactionManager.isPartitionPendingAdd(unauthorizedPartition));
        Assert.assertFalse(transactionManager.isPartitionPendingAdd(notAttemptedPartition));
        Assert.assertFalse(transactionManager.isPartitionAdded(unauthorizedPartition));
        Assert.assertFalse(transactionManager.isPartitionAdded(notAttemptedPartition));
        Assert.assertFalse(transactionManager.hasPartitionsToAdd());

        // unauthorizedTopics() is specific to TopicAuthorizationException, so the error has to
        // be narrowed explicitly before it can be inspected.
        TopicAuthorizationException authFailure = (TopicAuthorizationException) transactionManager.lastError();
        Assert.assertEquals(Collections.singleton(unauthorizedPartition.topic()), authFailure.unauthorizedTopics());

        assertAbortableError(TopicAuthorizationException.class);
    }

    /**
     * After an abortable error that occurs before any partition was added to the transaction,
     * aborting must fail the buffered record, return the manager to the ready state, and allow a
     * new transaction to be started and committed.
     */
    @Test
    public void testRecoveryFromAbortableErrorTransactionNotStarted() throws Exception {
        final long pid = 13131L;
        final short epoch = 1;
        final TopicPartition unauthorizedPartition = new TopicPartition("foo", 0);

        doInitTransactions(pid, epoch);
        transactionManager.beginTransaction();
        transactionManager.maybeAddPartitionToTransaction(unauthorizedPartition);
        FutureRecordMetadata failedFuture = accumulator.append(
                unauthorizedPartition, time.milliseconds(), "key".getBytes(), "value".getBytes(),
                Record.EMPTY_HEADERS, (Callback) null, 1000L).future;

        // Adding the partition is rejected, which is an abortable error.
        prepareAddPartitionsToTxn(Collections.singletonMap(unauthorizedPartition, Errors.TOPIC_AUTHORIZATION_FAILED));
        sender.run(time.milliseconds());
        Assert.assertTrue(transactionManager.hasAbortableError());

        // Abort: the buffered record is failed.
        transactionManager.beginAbort();
        sender.run(time.milliseconds());
        Assert.assertTrue(failedFuture.isDone());
        assertFutureFailed(failedFuture);

        // One more iteration and the manager is clean and ready again.
        sender.run(time.milliseconds());
        Assert.assertTrue(transactionManager.isReady());
        Assert.assertFalse(transactionManager.hasPartitionsToAdd());
        Assert.assertFalse(accumulator.hasIncomplete());

        // A fresh transaction against tp0 works end to end.
        transactionManager.beginTransaction();
        transactionManager.maybeAddPartitionToTransaction(tp0);
        FutureRecordMetadata committedFuture = accumulator.append(
                tp0, time.milliseconds(), "key".getBytes(), "value".getBytes(),
                Record.EMPTY_HEADERS, (Callback) null, 1000L).future;
        prepareAddPartitionsToTxn(Collections.singletonMap(tp0, Errors.NONE));
        sender.run(time.milliseconds());
        Assert.assertTrue(transactionManager.isPartitionAdded(tp0));
        Assert.assertFalse(transactionManager.hasPartitionsToAdd());

        transactionManager.beginCommit();
        prepareProduceResponse(Errors.NONE, pid, epoch);
        sender.run(time.milliseconds());
        Assert.assertTrue(committedFuture.isDone());
        Assert.assertNotNull(committedFuture.get());

        prepareEndTxnResponse(Errors.NONE, TransactionResult.COMMIT, pid, epoch);
        sender.run(time.milliseconds());
        Assert.assertTrue(transactionManager.isReady());
    }

    /**
     * After an abortable error that occurs once a partition has already been added to the
     * transaction, aborting must fail the buffered records, return the manager to the ready
     * state, and allow a new transaction to be started and committed.
     */
    @Test
    public void testRecoveryFromAbortableErrorTransactionStarted() throws Exception {
        final long pid = 13131L;
        final short epoch = 1;
        final TopicPartition unauthorizedPartition = new TopicPartition("foo", 0);

        doInitTransactions(pid, epoch);
        transactionManager.beginTransaction();
        transactionManager.maybeAddPartitionToTransaction(tp0);
        prepareAddPartitionsToTxn(tp0, Errors.NONE);

        // NOTE(review): this first record is appended to the "foo" partition rather than tp0,
        // even though tp0 is the partition being added -- confirm this is intentional.
        FutureRecordMetadata firstFuture = accumulator.append(
                unauthorizedPartition, time.milliseconds(), "key".getBytes(), "value".getBytes(),
                Record.EMPTY_HEADERS, (Callback) null, 1000L).future;
        sender.run(time.milliseconds());
        Assert.assertTrue(transactionManager.isPartitionAdded(tp0));

        // Adding the second partition is rejected, which is an abortable error.
        transactionManager.maybeAddPartitionToTransaction(unauthorizedPartition);
        FutureRecordMetadata secondFuture = accumulator.append(
                unauthorizedPartition, time.milliseconds(), "key".getBytes(), "value".getBytes(),
                Record.EMPTY_HEADERS, (Callback) null, 1000L).future;
        prepareAddPartitionsToTxn(Collections.singletonMap(unauthorizedPartition, Errors.TOPIC_AUTHORIZATION_FAILED));
        sender.run(time.milliseconds());
        Assert.assertTrue(transactionManager.hasAbortableError());
        Assert.assertTrue(transactionManager.isPartitionAdded(tp0));
        Assert.assertFalse(transactionManager.isPartitionAdded(unauthorizedPartition));
        Assert.assertFalse(firstFuture.isDone());
        Assert.assertFalse(secondFuture.isDone());

        // Abort: both buffered records are failed and the manager is clean again.
        prepareEndTxnResponse(Errors.NONE, TransactionResult.ABORT, pid, epoch);
        transactionManager.beginAbort();
        sender.run(time.milliseconds());
        assertFutureFailed(firstFuture);
        assertFutureFailed(secondFuture);
        Assert.assertTrue(transactionManager.isReady());
        Assert.assertFalse(transactionManager.hasPartitionsToAdd());
        Assert.assertFalse(accumulator.hasIncomplete());

        // A fresh transaction against tp0 works end to end.
        transactionManager.beginTransaction();
        transactionManager.maybeAddPartitionToTransaction(tp0);
        FutureRecordMetadata committedFuture = accumulator.append(
                tp0, time.milliseconds(), "key".getBytes(), "value".getBytes(),
                Record.EMPTY_HEADERS, (Callback) null, 1000L).future;
        prepareAddPartitionsToTxn(Collections.singletonMap(tp0, Errors.NONE));
        sender.run(time.milliseconds());
        Assert.assertTrue(transactionManager.isPartitionAdded(tp0));
        Assert.assertFalse(transactionManager.hasPartitionsToAdd());

        transactionManager.beginCommit();
        prepareProduceResponse(Errors.NONE, pid, epoch);
        sender.run(time.milliseconds());
        Assert.assertTrue(committedFuture.isDone());
        Assert.assertNotNull(committedFuture.get());

        prepareEndTxnResponse(Errors.NONE, TransactionResult.COMMIT, pid, epoch);
        sender.run(time.milliseconds());
        Assert.assertTrue(transactionManager.isReady());
    }

    /**
     * An abortable error that is raised while a produce request for an already-added partition is
     * being retried must not prevent that retry from succeeding; after the abort the manager must
     * be ready again and a new transaction must be committable.
     */
    @Test
    public void testRecoveryFromAbortableErrorProduceRequestInRetry() throws Exception {
        final long pid = 13131L;
        final short epoch = 1;
        final TopicPartition unauthorizedPartition = new TopicPartition("foo", 0);

        doInitTransactions(pid, epoch);
        transactionManager.beginTransaction();
        transactionManager.maybeAddPartitionToTransaction(tp0);
        prepareAddPartitionsToTxn(tp0, Errors.NONE);
        FutureRecordMetadata authorizedFuture = accumulator.append(
                tp0, time.milliseconds(), "key".getBytes(), "value".getBytes(),
                Record.EMPTY_HEADERS, (Callback) null, 1000L).future;
        sender.run(time.milliseconds());
        Assert.assertTrue(transactionManager.isPartitionAdded(tp0));

        // The first produce attempt gets REQUEST_TIMED_OUT, so the record stays incomplete.
        accumulator.beginFlush();
        prepareProduceResponse(Errors.REQUEST_TIMED_OUT, pid, epoch);
        sender.run(time.milliseconds());
        Assert.assertFalse(authorizedFuture.isDone());
        Assert.assertTrue(accumulator.hasIncomplete());

        // Adding a second partition is rejected, which is an abortable error.
        transactionManager.maybeAddPartitionToTransaction(unauthorizedPartition);
        FutureRecordMetadata unauthorizedFuture = accumulator.append(
                unauthorizedPartition, time.milliseconds(), "key".getBytes(), "value".getBytes(),
                Record.EMPTY_HEADERS, (Callback) null, 1000L).future;
        prepareAddPartitionsToTxn(Collections.singletonMap(unauthorizedPartition, Errors.TOPIC_AUTHORIZATION_FAILED));
        sender.run(time.milliseconds());
        Assert.assertTrue(transactionManager.hasAbortableError());
        Assert.assertTrue(transactionManager.isPartitionAdded(tp0));
        Assert.assertFalse(transactionManager.isPartitionAdded(unauthorizedPartition));
        Assert.assertFalse(authorizedFuture.isDone());

        // The next produce response succeeds for tp0 while the unauthorized record is failed.
        prepareProduceResponse(Errors.NONE, pid, epoch);
        sender.run(time.milliseconds());
        assertFutureFailed(unauthorizedFuture);
        Assert.assertTrue(authorizedFuture.isDone());
        Assert.assertNotNull(authorizedFuture.get());

        // Abort returns the manager to a clean, ready state.
        prepareEndTxnResponse(Errors.NONE, TransactionResult.ABORT, pid, epoch);
        transactionManager.beginAbort();
        sender.run(time.milliseconds());
        Assert.assertTrue(transactionManager.isReady());
        Assert.assertFalse(transactionManager.hasPartitionsToAdd());
        Assert.assertFalse(accumulator.hasIncomplete());

        // A fresh transaction against tp0 works end to end.
        transactionManager.beginTransaction();
        transactionManager.maybeAddPartitionToTransaction(tp0);
        FutureRecordMetadata committedFuture = accumulator.append(
                tp0, time.milliseconds(), "key".getBytes(), "value".getBytes(),
                Record.EMPTY_HEADERS, (Callback) null, 1000L).future;
        prepareAddPartitionsToTxn(Collections.singletonMap(tp0, Errors.NONE));
        sender.run(time.milliseconds());
        Assert.assertTrue(transactionManager.isPartitionAdded(tp0));
        Assert.assertFalse(transactionManager.hasPartitionsToAdd());

        transactionManager.beginCommit();
        prepareProduceResponse(Errors.NONE, pid, epoch);
        sender.run(time.milliseconds());
        Assert.assertTrue(committedFuture.isDone());
        Assert.assertNotNull(committedFuture.get());

        prepareEndTxnResponse(Errors.NONE, TransactionResult.COMMIT, pid, epoch);
        sender.run(time.milliseconds());
        Assert.assertTrue(transactionManager.isReady());
    }

    /**
     * A TRANSACTIONAL_ID_AUTHORIZATION_FAILED error on AddPartitionsToTxn must be recorded as
     * the last error and reported as fatal.
     */
    @Test
    public void testTransactionalIdAuthorizationFailureInAddPartitions() {
        final long pid = 13131L;
        final short epoch = 1;
        final TopicPartition partition = new TopicPartition("foo", 0);

        doInitTransactions(pid, epoch);
        transactionManager.beginTransaction();
        transactionManager.maybeAddPartitionToTransaction(partition);

        prepareAddPartitionsToTxn(partition, Errors.TRANSACTIONAL_ID_AUTHORIZATION_FAILED);
        sender.run(time.milliseconds());

        Assert.assertTrue(transactionManager.hasError());
        Assert.assertTrue(transactionManager.lastError() instanceof TransactionalIdAuthorizationException);
        assertFatalError(TransactionalIdAuthorizationException.class);
    }

    /**
     * When commit is requested while a partition is still waiting to be added, the partition must
     * be added and the buffered record produced before the commit result completes.
     */
    @Test
    public void testFlushPendingPartitionsOnCommit() throws InterruptedException {
        final long pid = 13131L;
        final short epoch = 1;

        doInitTransactions(pid, epoch);
        transactionManager.beginTransaction();
        transactionManager.maybeAddPartitionToTransaction(tp0);
        FutureRecordMetadata produceFuture = accumulator.append(
                tp0, time.milliseconds(), "key".getBytes(), "value".getBytes(),
                Record.EMPTY_HEADERS, (Callback) null, 1000L).future;
        Assert.assertFalse(produceFuture.isDone());

        // Commit is requested before the partition has been added to the transaction.
        TransactionalRequestResult commitResult = transactionManager.beginCommit();
        Assert.assertFalse(transactionManager.transactionContainsPartition(tp0));

        // First iteration: the partition is added; nothing else is finished yet.
        prepareAddPartitionsToTxnResponse(Errors.NONE, tp0, epoch, pid);
        sender.run(time.milliseconds());
        Assert.assertTrue(transactionManager.transactionContainsPartition(tp0));
        Assert.assertFalse(produceFuture.isDone());
        Assert.assertFalse(commitResult.isCompleted());

        // Second iteration: the record is produced.
        prepareProduceResponse(Errors.NONE, pid, epoch);
        sender.run(time.milliseconds());
        Assert.assertTrue(produceFuture.isDone());

        // The EndTxn response is queued, but until the sender runs the commit is still in progress.
        prepareEndTxnResponse(Errors.NONE, TransactionResult.COMMIT, pid, epoch);
        Assert.assertFalse(commitResult.isCompleted());
        Assert.assertTrue(transactionManager.hasOngoingTransaction());
        Assert.assertTrue(transactionManager.isCompleting());

        // Third iteration: the commit completes and the transaction is over.
        sender.run(time.milliseconds());
        Assert.assertTrue(commitResult.isCompleted());
        Assert.assertFalse(transactionManager.hasOngoingTransaction());
    }

    /**
     * Buffered records must not be completed until every partition registered with the
     * transaction has been added, even when that takes more than one AddPartitionsToTxn round.
     */
    @Test
    public void testMultipleAddPartitionsPerForOneProduce() throws InterruptedException {
        final long pid = 13131L;
        final short epoch = 1;

        doInitTransactions(pid, epoch);
        transactionManager.beginTransaction();
        transactionManager.maybeAddPartitionToTransaction(tp0);
        FutureRecordMetadata firstFuture = accumulator.append(
                tp0, time.milliseconds(), "key".getBytes(), "value".getBytes(),
                Record.EMPTY_HEADERS, (Callback) null, 1000L).future;
        Assert.assertFalse(firstFuture.isDone());

        // First round: tp0 becomes part of the transaction.
        prepareAddPartitionsToTxnResponse(Errors.NONE, tp0, epoch, pid);
        Assert.assertFalse(transactionManager.transactionContainsPartition(tp0));
        sender.run(time.milliseconds());
        Assert.assertTrue(transactionManager.transactionContainsPartition(tp0));

        // tp1 is registered with the transaction; the second record is again appended to tp0.
        transactionManager.maybeAddPartitionToTransaction(tp1);
        FutureRecordMetadata secondFuture = accumulator.append(
                tp0, time.milliseconds(), "key".getBytes(), "value".getBytes(),
                Record.EMPTY_HEADERS, (Callback) null, 1000L).future;
        prepareAddPartitionsToTxnResponse(Errors.NONE, tp1, epoch, pid);
        prepareProduceResponse(Errors.NONE, pid, epoch);
        Assert.assertFalse(transactionManager.transactionContainsPartition(tp1));
        Assert.assertFalse(firstFuture.isDone());
        Assert.assertFalse(secondFuture.isDone());

        // Second round: tp1 is added, but no record has been completed yet.
        sender.run(time.milliseconds());
        Assert.assertTrue(transactionManager.transactionContainsPartition(tp1));
        Assert.assertFalse(firstFuture.isDone());
        Assert.assertFalse(secondFuture.isDone());

        // Final iteration: both records complete.
        sender.run(time.milliseconds());
        Assert.assertTrue(firstFuture.isDone());
        Assert.assertTrue(secondFuture.isDone());
    }

    /**
     * An INVALID_PRODUCER_EPOCH produce response must put the manager into an error state and
     * complete the record future exceptionally; the final {@code get()} is what raises the
     * {@link ExecutionException} this test expects.
     */
    @Test(expected = ExecutionException.class)
    public void testProducerFencedException() throws InterruptedException, ExecutionException {
        final long pid = 13131L;
        final short epoch = 1;

        doInitTransactions(pid, epoch);
        transactionManager.beginTransaction();
        transactionManager.maybeAddPartitionToTransaction(tp0);
        FutureRecordMetadata produceFuture = accumulator.append(
                tp0, time.milliseconds(), "key".getBytes(), "value".getBytes(),
                Record.EMPTY_HEADERS, (Callback) null, 1000L).future;
        Assert.assertFalse(produceFuture.isDone());

        // Queue both responses up front, then drive the sender twice.
        prepareAddPartitionsToTxnResponse(Errors.NONE, tp0, epoch, pid);
        prepareProduceResponse(Errors.INVALID_PRODUCER_EPOCH, pid, epoch);
        sender.run(time.milliseconds());
        sender.run(time.milliseconds());

        Assert.assertTrue(produceFuture.isDone());
        Assert.assertTrue(transactionManager.hasError());

        // Expected to throw.
        produceFuture.get();
    }

    /**
     * A commit that was requested before a produce failure (OUT_OF_ORDER_SEQUENCE_NUMBER) must
     * complete exceptionally, the record future must carry the produce error as its cause, and a
     * subsequent abort must succeed and return the manager to the ready state.
     */
    @Test
    public void testDisallowCommitOnProduceFailure() throws InterruptedException {
        final long pid = 13131L;
        final short epoch = 1;

        doInitTransactions(pid, epoch);
        transactionManager.beginTransaction();
        transactionManager.maybeAddPartitionToTransaction(tp0);
        FutureRecordMetadata produceFuture = accumulator.append(
                tp0, time.milliseconds(), "key".getBytes(), "value".getBytes(),
                Record.EMPTY_HEADERS, (Callback) null, 1000L).future;
        TransactionalRequestResult commitResult = transactionManager.beginCommit();
        Assert.assertFalse(produceFuture.isDone());

        prepareAddPartitionsToTxnResponse(Errors.NONE, tp0, epoch, pid);
        prepareProduceResponse(Errors.OUT_OF_ORDER_SEQUENCE_NUMBER, pid, epoch);

        // After the first iteration the commit is still pending; two more iterations complete it.
        sender.run(time.milliseconds());
        Assert.assertFalse(commitResult.isCompleted());
        sender.run(time.milliseconds());
        sender.run(time.milliseconds());
        Assert.assertTrue(commitResult.isCompleted());

        try {
            commitResult.await();
            Assert.fail("Expected commit to fail after the produce error");
        } catch (KafkaException expected) {
            // Expected: the commit must not succeed once a produce request has failed.
        }

        try {
            produceFuture.get();
            Assert.fail("Expected produce future to raise an exception");
        } catch (ExecutionException produceFailure) {
            Assert.assertTrue(produceFailure.getCause() instanceof OutOfOrderSequenceException);
        }

        // Aborting is still allowed and brings the manager back to the ready state.
        TransactionalRequestResult abortResult = transactionManager.beginAbort();
        prepareEndTxnResponse(Errors.NONE, TransactionResult.ABORT, pid, epoch);
        sender.run(time.milliseconds());
        Assert.assertTrue(abortResult.isCompleted());
        Assert.assertTrue(abortResult.isSuccessful());
        Assert.assertTrue(transactionManager.isReady());
    }

    /**
     * After a produce failure (OUT_OF_ORDER_SEQUENCE_NUMBER) an abort must still be accepted,
     * complete successfully, and return the manager to the ready state.
     */
    @Test
    public void testAllowAbortOnProduceFailure() throws InterruptedException {
        final long pid = 13131L;
        final short epoch = 1;

        doInitTransactions(pid, epoch);
        transactionManager.beginTransaction();
        transactionManager.maybeAddPartitionToTransaction(tp0);
        FutureRecordMetadata produceFuture = accumulator.append(
                tp0, time.milliseconds(), "key".getBytes(), "value".getBytes(),
                Record.EMPTY_HEADERS, (Callback) null, 1000L).future;
        Assert.assertFalse(produceFuture.isDone());

        // All three responses are queued before the sender runs.
        prepareAddPartitionsToTxnResponse(Errors.NONE, tp0, epoch, pid);
        prepareProduceResponse(Errors.OUT_OF_ORDER_SEQUENCE_NUMBER, pid, epoch);
        prepareEndTxnResponse(Errors.NONE, TransactionResult.ABORT, pid, epoch);
        sender.run(time.milliseconds());
        sender.run(time.milliseconds());

        // The abort is requested only after the first two iterations.
        TransactionalRequestResult abortResult = transactionManager.beginAbort();
        sender.run(time.milliseconds());

        Assert.assertTrue(abortResult.isCompleted());
        Assert.assertTrue(abortResult.isSuccessful());
        Assert.assertTrue(transactionManager.isReady());
    }

    /**
     * A produce failure (OUT_OF_ORDER_SEQUENCE_NUMBER) that arrives while an abort is already in
     * progress must not put the manager into an error state; the abort must still complete
     * successfully and leave the manager ready.
     */
    @Test
    public void testAbortableErrorWhileAbortInProgress() throws InterruptedException {
        final long pid = 13131L;
        final short epoch = 1;

        doInitTransactions(pid, epoch);
        transactionManager.beginTransaction();
        transactionManager.maybeAddPartitionToTransaction(tp0);
        FutureRecordMetadata produceFuture = accumulator.append(
                tp0, time.milliseconds(), "key".getBytes(), "value".getBytes(),
                Record.EMPTY_HEADERS, (Callback) null, 1000L).future;
        Assert.assertFalse(produceFuture.isDone());

        prepareAddPartitionsToTxnResponse(Errors.NONE, tp0, epoch, pid);
        sender.run(time.milliseconds());
        sender.run(time.milliseconds());

        // The abort starts cleanly.
        TransactionalRequestResult abortResult = transactionManager.beginAbort();
        Assert.assertTrue(transactionManager.isAborting());
        Assert.assertFalse(transactionManager.hasError());

        // The produce error is delivered during the abort; EndTxn is queued behind it.
        sendProduceResponse(Errors.OUT_OF_ORDER_SEQUENCE_NUMBER, pid, epoch);
        prepareEndTxnResponse(Errors.NONE, TransactionResult.ABORT, pid, epoch);
        sender.run(time.milliseconds());

        // Still aborting and still error-free.
        Assert.assertTrue(transactionManager.isAborting());
        Assert.assertFalse(transactionManager.hasError());

        // The next iteration completes the abort.
        sender.run(time.milliseconds());
        Assert.assertTrue(abortResult.isCompleted());
        Assert.assertTrue(abortResult.isSuccessful());
        Assert.assertTrue(transactionManager.isReady());
    }

    /**
     * Begins a commit while a record is still sitting undrained in the accumulator and checks,
     * sender pass by sender pass, that no transactional request is in flight until the produce
     * response has been delivered and the record future is done.
     */
    @Test
    public void testCommitTransactionWithUnsentProduceRequest() throws Exception {
        doInitTransactions(13131L, (short) 1);
        transactionManager.beginTransaction();
        transactionManager.maybeAddPartitionToTransaction(tp0);
        FutureRecordMetadata sendFuture = accumulator.append(
                tp0, time.milliseconds(), "key".getBytes(), "value".getBytes(),
                Record.EMPTY_HEADERS, (Callback) null, 1000L).future;

        prepareAddPartitionsToTxn(tp0, Errors.NONE);
        sender.run(time.milliseconds());
        Assert.assertTrue(accumulator.hasUndrained());

        // Commit is requested while the batch has not been drained yet.
        transactionManager.beginCommit();
        sender.run(time.milliseconds());
        Assert.assertFalse(accumulator.hasUndrained());
        Assert.assertTrue(accumulator.hasIncomplete());
        Assert.assertFalse(transactionManager.hasInFlightTransactionalRequest());
        Assert.assertFalse(sendFuture.isDone());

        // A further pass without a produce response leaves everything unchanged.
        sender.run(time.milliseconds());
        Assert.assertFalse(accumulator.hasUndrained());
        Assert.assertTrue(accumulator.hasIncomplete());
        Assert.assertFalse(transactionManager.hasInFlightTransactionalRequest());
        Assert.assertFalse(sendFuture.isDone());

        // Once the produce response arrives the record completes ...
        sendProduceResponse(Errors.NONE, 13131L, (short) 1);
        sender.run(time.milliseconds());
        Assert.assertTrue(sendFuture.isDone());
        Assert.assertFalse(accumulator.hasUndrained());
        Assert.assertFalse(accumulator.hasIncomplete());
        Assert.assertFalse(transactionManager.hasInFlightTransactionalRequest());

        // ... and only the following pass puts a transactional request in flight.
        sender.run(time.milliseconds());
        Assert.assertTrue(transactionManager.hasInFlightTransactionalRequest());

        sendEndTxnResponse(Errors.NONE, TransactionResult.COMMIT, 13131L, (short) 1);
        sender.run(time.milliseconds());
        Assert.assertFalse(transactionManager.hasInFlightTransactionalRequest());
        Assert.assertTrue(transactionManager.isReady());
    }

    /**
     * Same scenario as the unsent-produce commit test, except that the batch is flushed and
     * drained before {@code beginCommit()} is called. No transactional request may be in
     * flight until the produce response has been delivered.
     */
    @Test
    public void testCommitTransactionWithInFlightProduceRequest() throws Exception {
        doInitTransactions(13131L, (short) 1);
        transactionManager.beginTransaction();
        transactionManager.maybeAddPartitionToTransaction(tp0);
        FutureRecordMetadata sendFuture = accumulator.append(
                tp0, time.milliseconds(), "key".getBytes(), "value".getBytes(),
                Record.EMPTY_HEADERS, (Callback) null, 1000L).future;

        prepareAddPartitionsToTxn(tp0, Errors.NONE);
        sender.run(time.milliseconds());
        Assert.assertTrue(accumulator.hasUndrained());

        // Flush so that the next sender pass drains the batch before the commit starts.
        accumulator.beginFlush();
        sender.run(time.milliseconds());
        Assert.assertFalse(accumulator.hasUndrained());
        Assert.assertTrue(accumulator.hasIncomplete());
        Assert.assertFalse(transactionManager.hasInFlightTransactionalRequest());

        transactionManager.beginCommit();
        sender.run(time.milliseconds());
        Assert.assertFalse(accumulator.hasUndrained());
        Assert.assertTrue(accumulator.hasIncomplete());
        Assert.assertFalse(transactionManager.hasInFlightTransactionalRequest());
        Assert.assertFalse(sendFuture.isDone());

        // Another pass without a produce response: nothing changes.
        sender.run(time.milliseconds());
        Assert.assertFalse(accumulator.hasUndrained());
        Assert.assertTrue(accumulator.hasIncomplete());
        Assert.assertFalse(transactionManager.hasInFlightTransactionalRequest());
        Assert.assertFalse(sendFuture.isDone());

        sendProduceResponse(Errors.NONE, 13131L, (short) 1);
        sender.run(time.milliseconds());
        Assert.assertTrue(sendFuture.isDone());
        Assert.assertFalse(accumulator.hasUndrained());
        Assert.assertFalse(accumulator.hasIncomplete());
        Assert.assertFalse(transactionManager.hasInFlightTransactionalRequest());

        // Only now does a transactional request go in flight.
        sender.run(time.milliseconds());
        Assert.assertTrue(transactionManager.hasInFlightTransactionalRequest());

        sendEndTxnResponse(Errors.NONE, TransactionResult.COMMIT, 13131L, (short) 1);
        sender.run(time.milliseconds());
        Assert.assertFalse(transactionManager.hasInFlightTransactionalRequest());
        Assert.assertTrue(transactionManager.isReady());
    }

    /**
     * After the manager has been moved to an abortable error and an AddPartitionsToTxn response
     * reports NOT_COORDINATOR, the transaction coordinator is cleared; a subsequent
     * FindCoordinator response must restore it while the abortable error remains set.
     */
    @Test
    public void testFindCoordinatorAllowedInAbortableErrorState() throws InterruptedException {
        final FindCoordinatorRequest.CoordinatorType txnCoordinatorType =
                FindCoordinatorRequest.CoordinatorType.TRANSACTION;

        doInitTransactions(13131L, (short) 1);
        transactionManager.beginTransaction();
        transactionManager.maybeAddPartitionToTransaction(tp0);

        FutureRecordMetadata sendFuture = accumulator.append(
                tp0, time.milliseconds(), "key".getBytes(), "value".getBytes(),
                Record.EMPTY_HEADERS, (Callback) null, 1000L).future;
        Assert.assertFalse(sendFuture.isDone());

        sender.run(time.milliseconds());
        transactionManager.transitionToAbortableError(new KafkaException());
        sendAddPartitionsToTxnResponse(Errors.NOT_COORDINATOR, tp0, (short) 1, 13131L);
        sender.run(time.milliseconds());

        // Coordinator is unknown, error state is kept.
        Assert.assertTrue(transactionManager.hasAbortableError());
        Assert.assertNull(transactionManager.coordinator(txnCoordinatorType));

        prepareFindCoordinatorResponse(Errors.NONE, false, txnCoordinatorType, "foobar");
        sender.run(time.milliseconds());

        // Coordinator is rediscovered even though the abortable error persists.
        Assert.assertEquals(brokerNode, transactionManager.coordinator(txnCoordinatorType));
        Assert.assertTrue(transactionManager.hasAbortableError());
    }

    /**
     * Aborting before any sender pass has run must complete the abort within a single pass
     * and fail the pending record future with a {@link KafkaException} cause.
     */
    @Test
    public void testCancelUnsentAddPartitionsAndProduceOnAbort() throws InterruptedException {
        doInitTransactions(13131L, (short) 1);
        transactionManager.beginTransaction();
        transactionManager.maybeAddPartitionToTransaction(tp0);

        FutureRecordMetadata sendFuture = accumulator.append(
                tp0, time.milliseconds(), "key".getBytes(), "value".getBytes(),
                Record.EMPTY_HEADERS, (Callback) null, 1000L).future;
        Assert.assertFalse(sendFuture.isDone());

        // No responses are prepared: the abort is expected to finish on its own.
        TransactionalRequestResult abortResult = transactionManager.beginAbort();
        sender.run(time.milliseconds());
        Assert.assertTrue(abortResult.isCompleted());
        Assert.assertTrue(abortResult.isSuccessful());
        Assert.assertTrue(transactionManager.isReady());

        try {
            sendFuture.get();
            Assert.fail("Expected produce future to raise an exception");
        } catch (ExecutionException expected) {
            Assert.assertTrue(expected.getCause() instanceof KafkaException);
        }
    }

    /**
     * The first AddPartitionsToTxn response reports UNKNOWN_TOPIC_OR_PARTITION. After
     * {@code beginAbort()}, a successful AddPartitionsToTxn response and an EndTxn(ABORT)
     * response are queued; the abort must complete successfully and the record future must
     * fail with a {@link KafkaException} cause.
     */
    @Test
    public void testAbortResendsAddPartitionErrorIfRetried() throws InterruptedException {
        doInitTransactions(13131L, (short) 1);
        transactionManager.beginTransaction();
        transactionManager.maybeAddPartitionToTransaction(tp0);
        prepareAddPartitionsToTxnResponse(Errors.UNKNOWN_TOPIC_OR_PARTITION, tp0, (short) 1, 13131L);

        FutureRecordMetadata sendFuture = accumulator.append(
                tp0, time.milliseconds(), "key".getBytes(), "value".getBytes(),
                Record.EMPTY_HEADERS, (Callback) null, 1000L).future;
        sender.run(time.milliseconds());
        Assert.assertFalse(sendFuture.isDone());

        TransactionalRequestResult abortResult = transactionManager.beginAbort();

        // Responses for the second AddPartitionsToTxn attempt and for the abort itself.
        prepareAddPartitionsToTxnResponse(Errors.NONE, tp0, (short) 1, 13131L);
        prepareEndTxnResponse(Errors.NONE, TransactionResult.ABORT, 13131L, (short) 1);
        sender.run(time.milliseconds());
        sender.run(time.milliseconds());

        Assert.assertTrue(abortResult.isCompleted());
        Assert.assertTrue(abortResult.isSuccessful());
        Assert.assertTrue(transactionManager.isReady());

        try {
            sendFuture.get();
            Assert.fail("Expected produce future to raise an exception");
        } catch (ExecutionException expected) {
            Assert.assertTrue(expected.getCause() instanceof KafkaException);
        }
    }

    /**
     * The first produce response reports REQUEST_TIMED_OUT. After {@code beginAbort()}, a
     * successful produce response and an EndTxn(ABORT) response are queued; the abort must
     * succeed and the record future must resolve to metadata for {@code tp0}'s topic.
     */
    @Test
    public void testAbortResendsProduceRequestIfRetried() throws Exception {
        doInitTransactions(13131L, (short) 1);
        transactionManager.beginTransaction();
        transactionManager.maybeAddPartitionToTransaction(tp0);
        prepareAddPartitionsToTxnResponse(Errors.NONE, tp0, (short) 1, 13131L);
        prepareProduceResponse(Errors.REQUEST_TIMED_OUT, 13131L, (short) 1);

        FutureRecordMetadata sendFuture = accumulator.append(
                tp0, time.milliseconds(), "key".getBytes(), "value".getBytes(),
                Record.EMPTY_HEADERS, (Callback) null, 1000L).future;
        sender.run(time.milliseconds());
        sender.run(time.milliseconds());
        Assert.assertFalse(sendFuture.isDone());

        TransactionalRequestResult abortResult = transactionManager.beginAbort();

        // Responses for the second produce attempt and for the abort itself.
        prepareProduceResponse(Errors.NONE, 13131L, (short) 1);
        prepareEndTxnResponse(Errors.NONE, TransactionResult.ABORT, 13131L, (short) 1);
        sender.run(time.milliseconds());
        sender.run(time.milliseconds());

        Assert.assertTrue(abortResult.isCompleted());
        Assert.assertTrue(abortResult.isSuccessful());
        Assert.assertTrue(transactionManager.isReady());

        RecordMetadata producedMetadata = (RecordMetadata) sendFuture.get();
        Assert.assertEquals(tp0.topic(), producedMetadata.topic());
    }

    /**
     * An UNKNOWN_TOPIC_OR_PARTITION error on AddPartitionsToTxn must leave the partition out of
     * the transaction; once a successful response follows, the partition is part of the
     * transaction and the pending record completes.
     */
    @Test
    public void testHandlingOfUnknownTopicPartitionErrorOnAddPartitions() throws InterruptedException {
        doInitTransactions(13131L, (short) 1);
        transactionManager.beginTransaction();
        transactionManager.maybeAddPartitionToTransaction(tp0);

        FutureRecordMetadata sendFuture = accumulator.append(
                tp0, time.milliseconds(), "key".getBytes(), "value".getBytes(),
                Record.EMPTY_HEADERS, (Callback) null, 1000L).future;
        Assert.assertFalse(sendFuture.isDone());

        // First attempt fails: partition must not be considered part of the transaction.
        prepareAddPartitionsToTxnResponse(Errors.UNKNOWN_TOPIC_OR_PARTITION, tp0, (short) 1, 13131L);
        sender.run(time.milliseconds());
        Assert.assertFalse(transactionManager.transactionContainsPartition(tp0));

        // Second attempt succeeds, followed by a successful produce response.
        prepareAddPartitionsToTxnResponse(Errors.NONE, tp0, (short) 1, 13131L);
        prepareProduceResponse(Errors.NONE, 13131L, (short) 1);
        sender.run(time.milliseconds());
        Assert.assertTrue(transactionManager.transactionContainsPartition(tp0));

        sender.run(time.milliseconds());
        Assert.assertTrue(sendFuture.isDone());
    }

    /**
     * A TxnOffsetCommit response reporting UNKNOWN_TOPIC_OR_PARTITION must keep the offset
     * commit pending (and the overall result incomplete); a later successful response must
     * complete {@code sendOffsetsToTransaction} successfully.
     */
    @Test
    public void testHandlingOfUnknownTopicPartitionErrorOnTxnOffsetCommit() throws InterruptedException {
        doInitTransactions(13131L, (short) 1);
        transactionManager.beginTransaction();

        Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<TopicPartition, OffsetAndMetadata>();
        offsets.put(tp1, new OffsetAndMetadata(1L));
        TransactionalRequestResult addOffsetsResult =
                transactionManager.sendOffsetsToTransaction(offsets, "myconsumergroup");

        prepareAddOffsetsToTxnResponse(Errors.NONE, "myconsumergroup", 13131L, (short) 1);
        sender.run(time.milliseconds());
        Assert.assertFalse(addOffsetsResult.isCompleted());

        // The same map instance is reused (and mutated below) for both TxnOffsetCommit responses.
        Map<TopicPartition, Errors> txnOffsetCommitErrors = new HashMap<TopicPartition, Errors>();
        txnOffsetCommitErrors.put(tp1, Errors.UNKNOWN_TOPIC_OR_PARTITION);
        prepareFindCoordinatorResponse(Errors.NONE, false, FindCoordinatorRequest.CoordinatorType.GROUP, "myconsumergroup");
        prepareTxnOffsetCommitResponse("myconsumergroup", 13131L, (short) 1, txnOffsetCommitErrors);

        // The group coordinator is unknown until the FindCoordinator round trip has happened.
        Assert.assertNull(transactionManager.coordinator(FindCoordinatorRequest.CoordinatorType.GROUP));
        sender.run(time.milliseconds());
        sender.run(time.milliseconds());
        Assert.assertNotNull(transactionManager.coordinator(FindCoordinatorRequest.CoordinatorType.GROUP));
        Assert.assertTrue(transactionManager.hasPendingOffsetCommits());

        // After the failing response the commit is still pending and the result is not complete.
        sender.run(time.milliseconds());
        Assert.assertTrue(transactionManager.hasPendingOffsetCommits());
        Assert.assertFalse(addOffsetsResult.isCompleted());

        txnOffsetCommitErrors.put(tp1, Errors.NONE);
        prepareTxnOffsetCommitResponse("myconsumergroup", 13131L, (short) 1, txnOffsetCommitErrors);
        sender.run(time.milliseconds());
        Assert.assertTrue(addOffsetsResult.isCompleted());
        Assert.assertTrue(addOffsetsResult.isSuccessful());
    }

    /**
     * Runs the shared partition-level failure check with TOPIC_AUTHORIZATION_FAILED as the
     * AddPartitionsToTxn error.
     */
    @Test
    public void shouldNotAddPartitionsToTransactionWhenTopicAuthorizationFailed() throws Exception {
        final Errors partitionLevelError = Errors.TOPIC_AUTHORIZATION_FAILED;
        verifyAddPartitionsFailsWithPartitionLevelError(partitionLevelError);
    }

    /**
     * When the only request of the transaction (AddPartitionsToTxn) failed with
     * TOPIC_AUTHORIZATION_FAILED, the abort must complete successfully on the next sender pass
     * even though no EndTxn response has been prepared.
     */
    @Test
    public void shouldNotSendAbortTxnRequestWhenOnlyAddPartitionsRequestFailed() throws Exception {
        doInitTransactions(13131L, (short) 1);
        transactionManager.beginTransaction();
        transactionManager.maybeAddPartitionToTransaction(tp0);

        prepareAddPartitionsToTxnResponse(Errors.TOPIC_AUTHORIZATION_FAILED, tp0, (short) 1, 13131L);
        sender.run(time.milliseconds());

        TransactionalRequestResult abortResult = transactionManager.beginAbort();
        Assert.assertFalse(abortResult.isCompleted());

        // No EndTxn response is queued for this pass.
        sender.run(time.milliseconds());
        Assert.assertTrue(abortResult.isCompleted());
        Assert.assertTrue(abortResult.isSuccessful());
    }

    /**
     * When the only request of the transaction (AddOffsetsToTxn) fails with
     * GROUP_AUTHORIZATION_FAILED, the abort must complete successfully and return the manager
     * to the ready state even though no EndTxn response has been prepared.
     */
    @Test
    public void shouldNotSendAbortTxnRequestWhenOnlyAddOffsetsRequestFailed() throws Exception {
        doInitTransactions(13131L, (short) 1);
        transactionManager.beginTransaction();

        Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<TopicPartition, OffsetAndMetadata>();
        offsets.put(tp1, new OffsetAndMetadata(1L));
        transactionManager.sendOffsetsToTransaction(offsets, "myconsumergroup");

        TransactionalRequestResult abortResult = transactionManager.beginAbort();
        prepareAddOffsetsToTxnResponse(Errors.GROUP_AUTHORIZATION_FAILED, "myconsumergroup", 13131L, (short) 1);

        // First pass: the abort is still outstanding.
        sender.run(time.milliseconds());
        Assert.assertFalse(abortResult.isCompleted());

        // Second pass: abort finishes without any EndTxn response having been queued.
        sender.run(time.milliseconds());
        Assert.assertTrue(transactionManager.isReady());
        Assert.assertTrue(abortResult.isCompleted());
        Assert.assertTrue(abortResult.isSuccessful());
    }

    /**
     * When AddOffsetsToTxn fails with UNKNOWN_SERVER_ERROR, the pending abort must complete
     * unsuccessfully and the manager must report a fatal error.
     */
    @Test
    public void shouldFailAbortIfAddOffsetsFailsWithFatalError() throws Exception {
        doInitTransactions(13131L, (short) 1);
        transactionManager.beginTransaction();

        Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<TopicPartition, OffsetAndMetadata>();
        offsets.put(tp1, new OffsetAndMetadata(1L));
        transactionManager.sendOffsetsToTransaction(offsets, "myconsumergroup");

        TransactionalRequestResult abortResult = transactionManager.beginAbort();
        prepareAddOffsetsToTxnResponse(Errors.UNKNOWN_SERVER_ERROR, "myconsumergroup", 13131L, (short) 1);

        // First pass: the abort is still outstanding.
        sender.run(time.milliseconds());
        Assert.assertFalse(abortResult.isCompleted());

        // Second pass: the abort is completed as a failure and the error is fatal.
        sender.run(time.milliseconds());
        Assert.assertTrue(abortResult.isCompleted());
        Assert.assertFalse(abortResult.isSuccessful());
        Assert.assertTrue(transactionManager.hasFatalError());
    }

    /**
     * While partitions have been registered via {@code maybeAddPartitionToTransaction} but no
     * sender pass has run, sending to them is not allowed and draining the accumulator must
     * yield an empty batch list for every node, without putting the manager into an error state.
     */
    @Test
    public void testNoDrainWhenPartitionsPending() throws InterruptedException {
        doInitTransactions(13131L, (short) 1);
        transactionManager.beginTransaction();
        transactionManager.maybeAddPartitionToTransaction(tp0);
        accumulator.append(tp0, time.milliseconds(), "key".getBytes(), "value".getBytes(), Record.EMPTY_HEADERS, (Callback) null, 1000L);
        transactionManager.maybeAddPartitionToTransaction(tp1);
        accumulator.append(tp1, time.milliseconds(), "key".getBytes(), "value".getBytes(), Record.EMPTY_HEADERS, (Callback) null, 1000L);

        Assert.assertFalse(transactionManager.isSendToPartitionAllowed(tp0));
        Assert.assertFalse(transactionManager.isSendToPartitionAllowed(tp1));

        // Two-node cluster: partition 0 of "test" is led by node1, partition 1 by node2.
        Node node1 = new Node(0, "localhost", 1111);
        Node node2 = new Node(1, "localhost", 1112);
        Cluster drainCluster = new Cluster((String) null, Arrays.asList(node1, node2), Arrays.asList(new PartitionInfo("test", 0, node1, (Node[]) null, (Node[]) null), new PartitionInfo("test", 1, node2, (Node[]) null, (Node[]) null)), Collections.emptySet(), Collections.emptySet());
        HashSet<Node> readyNodes = new HashSet<Node>();
        readyNodes.add(node1);
        readyNodes.add(node2);

        Map<Integer, List<ProducerBatch>> drained = accumulator.drain(drainCluster, readyNodes, MAX_RETRIES, time.milliseconds());

        // Both nodes appear in the result, but neither has any batch to send.
        Assert.assertTrue(drained.containsKey(node1.id()));
        Assert.assertTrue(drained.get(node1.id()).isEmpty());
        Assert.assertTrue(drained.containsKey(node2.id()));
        Assert.assertTrue(drained.get(node2.id()).isEmpty());
        Assert.assertFalse(transactionManager.hasError());
    }

    /**
     * A partition that was successfully added to the transaction must stay drainable after a
     * later AddPartitionsToTxn for a different partition fails with TOPIC_AUTHORIZATION_FAILED
     * and leaves the manager with an abortable error.
     */
    @Test
    public void testAllowDrainInAbortableErrorState() throws InterruptedException {
        doInitTransactions(13131L, (short) 1);
        transactionManager.beginTransaction();

        // tp1 is added successfully.
        transactionManager.maybeAddPartitionToTransaction(tp1);
        prepareAddPartitionsToTxn(tp1, Errors.NONE);
        sender.run(time.milliseconds());
        Assert.assertTrue(transactionManager.transactionContainsPartition(tp1));

        // tp0 fails authorization, producing an abortable error.
        transactionManager.maybeAddPartitionToTransaction(tp0);
        prepareAddPartitionsToTxn(tp0, Errors.TOPIC_AUTHORIZATION_FAILED);
        sender.run(time.milliseconds());
        Assert.assertTrue(transactionManager.hasAbortableError());
        Assert.assertTrue(transactionManager.isSendToPartitionAllowed(tp1));

        Node node = new Node(1, "localhost", 1112);
        Cluster drainCluster = new Cluster((String) null, Arrays.asList(node), Arrays.asList(new PartitionInfo("test", 1, node, (Node[]) null, (Node[]) null)), Collections.emptySet(), Collections.emptySet());
        accumulator.append(tp1, time.milliseconds(), "key".getBytes(), "value".getBytes(), Record.EMPTY_HEADERS, (Callback) null, 1000L);

        // Keep the drain result in a local so that both assertions inspect the same map.
        Map<Integer, List<ProducerBatch>> drained = accumulator.drain(drainCluster, Collections.singleton(node), MAX_RETRIES, time.milliseconds());
        Assert.assertTrue(drained.containsKey(node.id()));
        Assert.assertEquals(1L, drained.get(node.id()).size());
        Assert.assertTrue(transactionManager.hasAbortableError());
    }

    /**
     * A record is appended for {@code tp0} without ever calling
     * {@code maybeAddPartitionToTransaction}; draining must then return an entry for the node
     * whose batch list is empty.
     *
     * NOTE(review): despite the method name, no error state is asserted here — confirm whether
     * an additional error assertion was intended.
     */
    @Test
    public void testRaiseErrorWhenNoPartitionsPendingOnDrain() throws InterruptedException {
        doInitTransactions(13131L, (short) 1);
        transactionManager.beginTransaction();
        accumulator.append(tp0, time.milliseconds(), "key".getBytes(), "value".getBytes(), Record.EMPTY_HEADERS, (Callback) null, 1000L);

        Node node = new Node(0, "localhost", 1111);
        Cluster drainCluster = new Cluster((String) null, Arrays.asList(node), Arrays.asList(new PartitionInfo("test", 0, node, (Node[]) null, (Node[]) null)), Collections.emptySet(), Collections.emptySet());
        HashSet<Node> readyNodes = new HashSet<Node>();
        readyNodes.add(node);

        Map<Integer, List<ProducerBatch>> drained = accumulator.drain(drainCluster, readyNodes, MAX_RETRIES, time.milliseconds());
        Assert.assertTrue(drained.containsKey(node.id()));
        Assert.assertTrue(drained.get(node.id()).isEmpty());
    }

    /**
     * A produce attempt that got NOT_LEADER_FOR_PARTITION must still be completed by a later
     * successful produce response, even though the manager was moved to an abortable error in
     * between.
     */
    @Test
    public void resendFailedProduceRequestAfterAbortableError() throws Exception {
        doInitTransactions(13131L, (short) 1);
        transactionManager.beginTransaction();
        transactionManager.maybeAddPartitionToTransaction(tp0);

        FutureRecordMetadata sendFuture = accumulator.append(
                tp0, time.milliseconds(), "key".getBytes(), "value".getBytes(),
                Record.EMPTY_HEADERS, (Callback) null, 1000L).future;

        prepareAddPartitionsToTxnResponse(Errors.NONE, tp0, (short) 1, 13131L);
        prepareProduceResponse(Errors.NOT_LEADER_FOR_PARTITION, 13131L, (short) 1);
        sender.run(time.milliseconds());
        sender.run(time.milliseconds());
        Assert.assertFalse(sendFuture.isDone());

        transactionManager.transitionToAbortableError(new KafkaException());

        // The next pass receives a successful produce response.
        prepareProduceResponse(Errors.NONE, 13131L, (short) 1);
        sender.run(time.milliseconds());
        Assert.assertTrue(sendFuture.isDone());
        Assert.assertNotNull(sendFuture.get());
    }

    /**
     * After the partition has been added to the transaction, the clock is advanced by 10 seconds
     * and the first cluster node is disconnected and blacked out. The next sender pass must
     * fail the record future with a {@link TimeoutException} cause and leave the manager with
     * an abortable error.
     */
    @Test
    public void testTransitionToAbortableErrorOnBatchExpiry() throws InterruptedException, ExecutionException {
        doInitTransactions(13131L, (short) 1);
        transactionManager.beginTransaction();
        transactionManager.maybeAddPartitionToTransaction(tp0);

        FutureRecordMetadata sendFuture = accumulator.append(
                tp0, time.milliseconds(), "key".getBytes(), "value".getBytes(),
                Record.EMPTY_HEADERS, (Callback) null, 1000L).future;
        Assert.assertFalse(sendFuture.isDone());

        prepareAddPartitionsToTxnResponse(Errors.NONE, tp0, (short) 1, 13131L);
        Assert.assertFalse(transactionManager.transactionContainsPartition(tp0));
        Assert.assertFalse(transactionManager.isSendToPartitionAllowed(tp0));

        sender.run(time.milliseconds());
        Assert.assertTrue(transactionManager.transactionContainsPartition(tp0));
        Assert.assertTrue(transactionManager.isSendToPartitionAllowed(tp0));
        Assert.assertFalse(sendFuture.isDone());

        // Let time pass and make the broker unreachable before the next sender pass.
        time.sleep(10000L);
        Node unreachableNode = (Node) cluster.nodes().get(0);
        client.disconnect(unreachableNode.idString());
        client.blackout(unreachableNode, DEFAULT_RETRY_BACKOFF_MS);

        sender.run(time.milliseconds());
        Assert.assertTrue(sendFuture.isDone());
        try {
            sendFuture.get();
            Assert.fail("Expected to get a TimeoutException since the queued ProducerBatch should have been expired");
        } catch (ExecutionException expected) {
            Assert.assertTrue(expected.getCause() instanceof TimeoutException);
        }
        Assert.assertTrue(transactionManager.hasAbortableError());
    }

    /**
     * Two-partition variant of the batch-expiry test: after both partitions have been added to
     * the transaction, the clock is advanced by 10 seconds and the first cluster node is
     * disconnected and blacked out. Both record futures must then fail with a
     * {@link TimeoutException} cause and the manager must report an abortable error.
     */
    @Test
    public void testTransitionToAbortableErrorOnMultipleBatchExpiry() throws InterruptedException, ExecutionException {
        doInitTransactions(13131L, (short) 1);
        transactionManager.beginTransaction();
        transactionManager.maybeAddPartitionToTransaction(tp0);
        transactionManager.maybeAddPartitionToTransaction(tp1);

        FutureRecordMetadata firstFuture = accumulator.append(tp0, time.milliseconds(), "key".getBytes(), "value".getBytes(), Record.EMPTY_HEADERS, (Callback) null, 1000L).future;
        FutureRecordMetadata secondFuture = accumulator.append(tp1, time.milliseconds(), "key".getBytes(), "value".getBytes(), Record.EMPTY_HEADERS, (Callback) null, 1000L).future;
        Assert.assertFalse(firstFuture.isDone());
        Assert.assertFalse(secondFuture.isDone());

        Map<TopicPartition, Errors> partitionErrors = new HashMap<TopicPartition, Errors>();
        partitionErrors.put(tp0, Errors.NONE);
        partitionErrors.put(tp1, Errors.NONE);
        prepareAddPartitionsToTxn(partitionErrors);

        Assert.assertFalse(transactionManager.transactionContainsPartition(tp0));
        Assert.assertFalse(transactionManager.isSendToPartitionAllowed(tp0));

        sender.run(time.milliseconds());
        Assert.assertTrue(transactionManager.transactionContainsPartition(tp0));
        Assert.assertTrue(transactionManager.transactionContainsPartition(tp1));
        // Check both partitions; previously tp1 was asserted twice and tp0 not at all.
        Assert.assertTrue(transactionManager.isSendToPartitionAllowed(tp0));
        Assert.assertTrue(transactionManager.isSendToPartitionAllowed(tp1));
        Assert.assertFalse(firstFuture.isDone());
        Assert.assertFalse(secondFuture.isDone());

        // Let time pass and make the broker unreachable before the next sender pass.
        time.sleep(10000L);
        Node unreachableNode = (Node) cluster.nodes().get(0);
        client.disconnect(unreachableNode.idString());
        client.blackout(unreachableNode, DEFAULT_RETRY_BACKOFF_MS);

        sender.run(time.milliseconds());
        Assert.assertTrue(firstFuture.isDone());
        Assert.assertTrue(secondFuture.isDone());
        try {
            firstFuture.get();
            Assert.fail("Expected to get a TimeoutException since the queued ProducerBatch should have been expired");
        } catch (ExecutionException expected) {
            Assert.assertTrue(expected.getCause() instanceof TimeoutException);
        }
        try {
            secondFuture.get();
            Assert.fail("Expected to get a TimeoutException since the queued ProducerBatch should have been expired");
        } catch (ExecutionException expected) {
            Assert.assertTrue(expected.getCause() instanceof TimeoutException);
        }
        Assert.assertTrue(transactionManager.hasAbortableError());
    }

    /**
     * A commit is started, then the clock is advanced and the broker made unreachable so that
     * the record future fails with a {@link TimeoutException} cause. The commit must complete
     * unsuccessfully with an abortable error while the transaction stays ongoing; a subsequent
     * abort must succeed and clear the transaction's partitions.
     */
    @Test
    public void testDropCommitOnBatchExpiry() throws InterruptedException, ExecutionException {
        doInitTransactions(13131L, (short) 1);
        transactionManager.beginTransaction();
        transactionManager.maybeAddPartitionToTransaction(tp0);

        FutureRecordMetadata sendFuture = accumulator.append(
                tp0, time.milliseconds(), "key".getBytes(), "value".getBytes(),
                Record.EMPTY_HEADERS, (Callback) null, 1000L).future;
        Assert.assertFalse(sendFuture.isDone());

        prepareAddPartitionsToTxnResponse(Errors.NONE, tp0, (short) 1, 13131L);
        Assert.assertFalse(transactionManager.transactionContainsPartition(tp0));
        Assert.assertFalse(transactionManager.isSendToPartitionAllowed(tp0));

        sender.run(time.milliseconds());
        Assert.assertTrue(transactionManager.transactionContainsPartition(tp0));
        Assert.assertTrue(transactionManager.isSendToPartitionAllowed(tp0));
        Assert.assertFalse(sendFuture.isDone());

        TransactionalRequestResult commitResult = transactionManager.beginCommit();

        // Let time pass and make the broker unreachable before the next sender pass.
        time.sleep(10000L);
        Node unreachableNode = (Node) cluster.nodes().get(0);
        client.disconnect(unreachableNode.idString());
        client.blackout(unreachableNode, DEFAULT_RETRY_BACKOFF_MS);

        sender.run(time.milliseconds());
        Assert.assertTrue(sendFuture.isDone());
        try {
            sendFuture.get();
            Assert.fail("Expected to get a TimeoutException since the queued ProducerBatch should have been expired");
        } catch (ExecutionException expected) {
            Assert.assertTrue(expected.getCause() instanceof TimeoutException);
        }

        // The commit is dropped: completed but failed, with the transaction still open.
        sender.run(time.milliseconds());
        Assert.assertTrue(commitResult.isCompleted());
        Assert.assertFalse(commitResult.isSuccessful());
        Assert.assertTrue(transactionManager.hasAbortableError());
        Assert.assertTrue(transactionManager.hasOngoingTransaction());
        Assert.assertFalse(transactionManager.isCompleting());
        Assert.assertTrue(transactionManager.transactionContainsPartition(tp0));

        // Aborting afterwards succeeds and clears the transaction state.
        TransactionalRequestResult abortResult = transactionManager.beginAbort();
        prepareEndTxnResponse(Errors.NONE, TransactionResult.ABORT, 13131L, (short) 1);
        sender.run(time.milliseconds());
        Assert.assertTrue(abortResult.isCompleted());
        Assert.assertTrue(abortResult.isSuccessful());
        Assert.assertFalse(transactionManager.hasOngoingTransaction());
        Assert.assertFalse(transactionManager.transactionContainsPartition(tp0));
    }

    /**
     * The record first receives a NOT_LEADER_FOR_PARTITION produce response. A commit is then
     * started, the clock is advanced and the broker made unreachable so that the record future
     * fails with a {@link TimeoutException} cause. The commit must complete unsuccessfully, and
     * the manager must report a fatal error with no ongoing transaction.
     */
    @Test
    public void testTransitionToFatalErrorWhenRetriedBatchIsExpired() throws InterruptedException, ExecutionException {
        doInitTransactions(13131L, (short) 1);
        transactionManager.beginTransaction();
        transactionManager.maybeAddPartitionToTransaction(tp0);

        FutureRecordMetadata sendFuture = accumulator.append(
                tp0, time.milliseconds(), "key".getBytes(), "value".getBytes(),
                Record.EMPTY_HEADERS, (Callback) null, 1000L).future;
        Assert.assertFalse(sendFuture.isDone());

        prepareAddPartitionsToTxnResponse(Errors.NONE, tp0, (short) 1, 13131L);
        Assert.assertFalse(transactionManager.transactionContainsPartition(tp0));
        Assert.assertFalse(transactionManager.isSendToPartitionAllowed(tp0));

        sender.run(time.milliseconds());
        Assert.assertTrue(transactionManager.transactionContainsPartition(tp0));
        Assert.assertTrue(transactionManager.isSendToPartitionAllowed(tp0));

        // First produce attempt is answered with a NOT_LEADER_FOR_PARTITION error.
        prepareProduceResponse(Errors.NOT_LEADER_FOR_PARTITION, 13131L, (short) 1);
        sender.run(time.milliseconds());
        Assert.assertFalse(sendFuture.isDone());

        TransactionalRequestResult commitResult = transactionManager.beginCommit();

        // Let time pass and make the broker unreachable before the next sender pass.
        time.sleep(10000L);
        Node unreachableNode = (Node) cluster.nodes().get(0);
        client.disconnect(unreachableNode.idString());
        client.blackout(unreachableNode, DEFAULT_RETRY_BACKOFF_MS);

        sender.run(time.milliseconds());
        Assert.assertTrue(sendFuture.isDone());
        try {
            sendFuture.get();
            Assert.fail("Expected to get a TimeoutException since the queued ProducerBatch should have been expired");
        } catch (ExecutionException expected) {
            Assert.assertTrue(expected.getCause() instanceof TimeoutException);
        }

        sender.run(time.milliseconds());
        sender.run(time.milliseconds());
        Assert.assertTrue(commitResult.isCompleted());
        Assert.assertFalse(commitResult.isSuccessful());
        Assert.assertTrue(transactionManager.hasFatalError());
        Assert.assertFalse(transactionManager.hasOngoingTransaction());
    }

    /**
     * Shared scenario: answer the AddPartitionsToTxn request for {@code tp0} with the given
     * partition-level error and assert that the manager reports an error and that {@code tp0}
     * is not part of the transaction.
     *
     * @param errors the partition-level error to return for {@code tp0}
     */
    private void verifyAddPartitionsFailsWithPartitionLevelError(Errors errors) throws InterruptedException {
        doInitTransactions(1L, (short) 1);
        transactionManager.beginTransaction();
        transactionManager.maybeAddPartitionToTransaction(tp0);

        FutureRecordMetadata sendFuture = accumulator.append(
                tp0, time.milliseconds(), "key".getBytes(), "value".getBytes(),
                Record.EMPTY_HEADERS, (Callback) null, 1000L).future;
        Assert.assertFalse(sendFuture.isDone());

        prepareAddPartitionsToTxn(tp0, errors);
        sender.run(time.milliseconds());

        Assert.assertTrue(transactionManager.hasError());
        Assert.assertFalse(transactionManager.transactionContainsPartition(tp0));
    }

    /**
     * Queues an AddPartitionsToTxn response on the mock client. The attached matcher asserts
     * that the request's partitions, compared as a set, equal the key set of {@code map}.
     *
     * @param map per-partition error codes to return in the response
     */
    private void prepareAddPartitionsToTxn(final Map<TopicPartition, Errors> map) {
        this.client.prepareResponse(new MockClient.RequestMatcher() {
            @Override
            public boolean matches(AbstractRequest abstractRequest) {
                AddPartitionsToTxnRequest addPartitionsRequest = (AddPartitionsToTxnRequest) abstractRequest;
                // JUnit convention: expected value first, so failure messages are labelled correctly.
                Assert.assertEquals(
                        new HashSet<TopicPartition>(map.keySet()),
                        new HashSet<TopicPartition>(addPartitionsRequest.partitions()));
                return true;
            }
        }, (AbstractResponse) new AddPartitionsToTxnResponse(0, map));
    }

    /**
     * Single-partition convenience overload: wraps the partition and its error in a singleton
     * map and delegates to {@link #prepareAddPartitionsToTxn(Map)}.
     *
     * @param topicPartition the only partition expected in the request
     * @param errors         error to report for that partition
     */
    private void prepareAddPartitionsToTxn(TopicPartition topicPartition, Errors errors) {
        Map<TopicPartition, Errors> singlePartitionError = Collections.singletonMap(topicPartition, errors);
        prepareAddPartitionsToTxn(singlePartitionError);
    }

    /**
     * Prepares (via {@code MockClient.prepareResponse}) a {@code FindCoordinatorResponse} that
     * names {@code brokerNode} and carries the given error. The matcher asserts that the request
     * asks for the expected coordinator type and key.
     *
     * @param errors          error to place in the response
     * @param z               forwarded as the third argument of {@code prepareResponse}
     *                        (presumably a "disconnected" flag; confirm against MockClient)
     * @param coordinatorType coordinator type the request must carry
     * @param str             coordinator key the request must carry
     */
    private void prepareFindCoordinatorResponse(Errors errors, boolean z, final FindCoordinatorRequest.CoordinatorType coordinatorType, final String str) {
        this.client.prepareResponse(new MockClient.RequestMatcher() {
            @Override
            public boolean matches(AbstractRequest abstractRequest) {
                FindCoordinatorRequest findCoordinatorRequest = (FindCoordinatorRequest) abstractRequest;
                // Expected value first, so assertion failure messages label the values correctly.
                Assert.assertEquals(coordinatorType, findCoordinatorRequest.coordinatorType());
                Assert.assertEquals(str, findCoordinatorRequest.coordinatorKey());
                return true;
            }
        }, new FindCoordinatorResponse(errors, this.brokerNode), z);
    }

    /**
     * Prepares (via {@code MockClient.prepareResponse}) an {@code InitProducerIdResponse}. The
     * matcher asserts that the request carries transactional id {@code "foobar"} and a
     * transaction timeout of {@code 1121}.
     *
     * @param errors error to place in the response
     * @param z      forwarded as the third argument of {@code prepareResponse}
     *               (presumably a "disconnected" flag; confirm against MockClient)
     * @param j      passed to the response constructor (presumably the producer id to hand out)
     * @param s      passed to the response constructor (presumably the producer epoch to hand out)
     */
    private void prepareInitPidResponse(Errors errors, boolean z, long j, short s) {
        this.client.prepareResponse(new MockClient.RequestMatcher() {
            @Override
            public boolean matches(AbstractRequest abstractRequest) {
                // Cast once and reuse; the previous code referred to an undefined identifier.
                InitProducerIdRequest initProducerIdRequest = (InitProducerIdRequest) abstractRequest;
                // Expected value first, so assertion failure messages label the values correctly.
                Assert.assertEquals("foobar", initProducerIdRequest.transactionalId());
                Assert.assertEquals(1121L, initProducerIdRequest.transactionTimeoutMs());
                return true;
            }
        }, new InitProducerIdResponse(0, errors, j, s), z);
    }

    /**
     * Answers a produce request through {@code MockClient.respond} with a response for
     * {@code tp0} carrying the given error.
     *
     * @param errors error to place in the partition response
     * @param j      producer id the request's batch must carry (checked by the matcher)
     * @param s      producer epoch the request's batch must carry (checked by the matcher)
     */
    private void sendProduceResponse(Errors errors, long j, short s) {
        MockClient.RequestMatcher expectedRequest = produceRequestMatcher(j, s);
        AbstractResponse reply = produceResponse(this.tp0, 0L, errors, 0);
        this.client.respond(expectedRequest, reply);
    }

    /**
     * Prepares, through {@code MockClient.prepareResponse}, a produce response for {@code tp0}
     * carrying the given error.
     *
     * @param errors error to place in the partition response
     * @param j      producer id the request's batch must carry (checked by the matcher)
     * @param s      producer epoch the request's batch must carry (checked by the matcher)
     */
    private void prepareProduceResponse(Errors errors, long j, short s) {
        MockClient.RequestMatcher expectedRequest = produceRequestMatcher(j, s);
        AbstractResponse reply = produceResponse(this.tp0, 0L, errors, 0);
        this.client.prepareResponse(expectedRequest, reply);
    }

    /**
     * Builds a matcher for produce requests. It asserts that the request contains records for
     * {@code tp0} consisting of exactly one batch, that the batch is transactional and carries
     * the expected producer id and epoch, and that the request's transactional id is
     * {@code "foobar"}.
     *
     * @param j expected producer id of the batch
     * @param s expected producer epoch of the batch
     * @return a matcher that returns {@code true} once all assertions have passed
     */
    private MockClient.RequestMatcher produceRequestMatcher(final long j, final short s) {
        return new MockClient.RequestMatcher() {
            @Override
            public boolean matches(AbstractRequest abstractRequest) {
                ProduceRequest produceRequest = (ProduceRequest) abstractRequest;
                MemoryRecords records = produceRequest.partitionRecordsOrFail().get(TransactionManagerTest.this.tp0);
                Assert.assertNotNull(records);

                // Typed iterator instead of a raw one: no unchecked downcast on next().
                Iterator<MutableRecordBatch> batches = records.batches().iterator();
                Assert.assertTrue(batches.hasNext());
                MutableRecordBatch batch = batches.next();
                // Exactly one batch is expected for the partition.
                Assert.assertFalse(batches.hasNext());

                Assert.assertTrue(batch.isTransactional());
                Assert.assertEquals(j, batch.producerId());
                Assert.assertEquals(s, batch.producerEpoch());
                Assert.assertEquals("foobar", produceRequest.transactionalId());
                return true;
            }
        };
    }

    /**
     * Prepares, through {@code MockClient.prepareResponse}, an {@code AddPartitionsToTxnResponse}
     * that reports the given error for a single partition.
     *
     * @param errors         error to report for {@code topicPartition}
     * @param topicPartition the only partition the request must contain (checked by the matcher)
     * @param s              producer epoch the request must carry (checked by the matcher)
     * @param j              producer id the request must carry (checked by the matcher)
     */
    private void prepareAddPartitionsToTxnResponse(Errors errors, TopicPartition topicPartition, short s, long j) {
        MockClient.RequestMatcher expectedRequest = addPartitionsRequestMatcher(topicPartition, s, j);
        AbstractResponse reply = new AddPartitionsToTxnResponse(0, Collections.singletonMap(topicPartition, errors));
        this.client.prepareResponse(expectedRequest, reply);
    }

    /**
     * Answers an add-partitions request through {@code MockClient.respond} with an
     * {@code AddPartitionsToTxnResponse} that reports the given error for a single partition.
     *
     * @param errors         error to report for {@code topicPartition}
     * @param topicPartition the only partition the request must contain (checked by the matcher)
     * @param s              producer epoch the request must carry (checked by the matcher)
     * @param j              producer id the request must carry (checked by the matcher)
     */
    private void sendAddPartitionsToTxnResponse(Errors errors, TopicPartition topicPartition, short s, long j) {
        MockClient.RequestMatcher expectedRequest = addPartitionsRequestMatcher(topicPartition, s, j);
        AbstractResponse reply = new AddPartitionsToTxnResponse(0, Collections.singletonMap(topicPartition, errors));
        this.client.respond(expectedRequest, reply);
    }

    /**
     * Builds a matcher for add-partitions requests. It asserts, in order, the producer id, the
     * producer epoch, that the partition list is exactly {@code [topicPartition]}, and that the
     * transactional id is {@code "foobar"}.
     *
     * @param topicPartition the single partition the request must list
     * @param s              expected producer epoch
     * @param j              expected producer id
     * @return a matcher that returns {@code true} once all assertions have passed
     */
    private MockClient.RequestMatcher addPartitionsRequestMatcher(final TopicPartition topicPartition, final short s, final long j) {
        MockClient.RequestMatcher matcher = new MockClient.RequestMatcher() {
            @Override
            public boolean matches(AbstractRequest abstractRequest) {
                AddPartitionsToTxnRequest request = (AddPartitionsToTxnRequest) abstractRequest;
                Assert.assertEquals(j, request.producerId());
                Assert.assertEquals(s, request.producerEpoch());
                Assert.assertEquals(Collections.singletonList(topicPartition), request.partitions());
                Assert.assertEquals("foobar", request.transactionalId());
                return true;
            }
        };
        return matcher;
    }

    /**
     * Prepares, through {@code MockClient.prepareResponse}, an {@code EndTxnResponse} carrying
     * the given error.
     *
     * @param errors            error to place in the response
     * @param transactionResult command the request must carry (checked by the matcher)
     * @param j                 producer id the request must carry (checked by the matcher)
     * @param s                 producer epoch the request must carry (checked by the matcher)
     */
    private void prepareEndTxnResponse(Errors errors, TransactionResult transactionResult, long j, short s) {
        MockClient.RequestMatcher expectedRequest = endTxnMatcher(transactionResult, j, s);
        AbstractResponse reply = new EndTxnResponse(0, errors);
        this.client.prepareResponse(expectedRequest, reply);
    }

    /**
     * Answers an end-transaction request through {@code MockClient.respond} with an
     * {@code EndTxnResponse} carrying the given error.
     *
     * @param errors            error to place in the response
     * @param transactionResult command the request must carry (checked by the matcher)
     * @param j                 producer id the request must carry (checked by the matcher)
     * @param s                 producer epoch the request must carry (checked by the matcher)
     */
    private void sendEndTxnResponse(Errors errors, TransactionResult transactionResult, long j, short s) {
        MockClient.RequestMatcher expectedRequest = endTxnMatcher(transactionResult, j, s);
        AbstractResponse reply = new EndTxnResponse(0, errors);
        this.client.respond(expectedRequest, reply);
    }

    /**
     * Builds a matcher for end-transaction requests. It asserts, in order, that the transactional
     * id is {@code "foobar"}, then the producer id, the producer epoch, and the command.
     *
     * @param transactionResult expected command of the request
     * @param j                 expected producer id
     * @param s                 expected producer epoch
     * @return a matcher that returns {@code true} once all assertions have passed
     */
    private MockClient.RequestMatcher endTxnMatcher(final TransactionResult transactionResult, final long j, final short s) {
        MockClient.RequestMatcher matcher = new MockClient.RequestMatcher() {
            @Override
            public boolean matches(AbstractRequest abstractRequest) {
                EndTxnRequest request = (EndTxnRequest) abstractRequest;
                Assert.assertEquals("foobar", request.transactionalId());
                Assert.assertEquals(j, request.producerId());
                Assert.assertEquals(s, request.producerEpoch());
                Assert.assertEquals(transactionResult, request.command());
                return true;
            }
        };
        return matcher;
    }

    /**
     * Prepares, through {@code MockClient.prepareResponse}, an {@code AddOffsetsToTxnResponse}
     * carrying the given error. The matcher asserts, in order, the consumer group id, that the
     * transactional id is {@code "foobar"}, the producer id, and the producer epoch.
     *
     * @param errors error to place in the response
     * @param str    consumer group id the request must carry
     * @param j      producer id the request must carry
     * @param s      producer epoch the request must carry
     */
    private void prepareAddOffsetsToTxnResponse(Errors errors, final String str, final long j, final short s) {
        MockClient.RequestMatcher expectedRequest = new MockClient.RequestMatcher() {
            @Override
            public boolean matches(AbstractRequest abstractRequest) {
                AddOffsetsToTxnRequest request = (AddOffsetsToTxnRequest) abstractRequest;
                Assert.assertEquals(str, request.consumerGroupId());
                Assert.assertEquals("foobar", request.transactionalId());
                Assert.assertEquals(j, request.producerId());
                Assert.assertEquals(s, request.producerEpoch());
                return true;
            }
        };
        AbstractResponse reply = new AddOffsetsToTxnResponse(0, errors);
        this.client.prepareResponse(expectedRequest, reply);
    }

    /**
     * Prepares, through {@code MockClient.prepareResponse}, a {@code TxnOffsetCommitResponse}
     * carrying the given per-partition errors. The matcher asserts, in order, the consumer group
     * id, the producer id, and the producer epoch of the request.
     *
     * @param str consumer group id the request must carry
     * @param j   producer id the request must carry
     * @param s   producer epoch the request must carry
     * @param map per-partition errors to place in the response
     */
    private void prepareTxnOffsetCommitResponse(final String str, final long j, final short s, Map<TopicPartition, Errors> map) {
        MockClient.RequestMatcher expectedRequest = new MockClient.RequestMatcher() {
            @Override
            public boolean matches(AbstractRequest abstractRequest) {
                TxnOffsetCommitRequest request = (TxnOffsetCommitRequest) abstractRequest;
                Assert.assertEquals(str, request.consumerGroupId());
                Assert.assertEquals(j, request.producerId());
                Assert.assertEquals(s, request.producerEpoch());
                return true;
            }
        };
        AbstractResponse reply = new TxnOffsetCommitResponse(0, map);
        this.client.prepareResponse(expectedRequest, reply);
    }

    /**
     * Builds a {@code ProduceResponse} holding one partition response for the given partition.
     *
     * @param topicPartition partition the response entry is keyed by
     * @param j              second argument of the partition response (presumably the base
     *                       offset; confirm against ProduceResponse.PartitionResponse)
     * @param errors         error placed in the partition response
     * @param i              second argument of the {@code ProduceResponse} constructor
     *                       (presumably the throttle time; confirm against ProduceResponse)
     * @return the assembled response
     */
    private ProduceResponse produceResponse(TopicPartition topicPartition, long j, Errors errors, int i) {
        ProduceResponse.PartitionResponse partitionResponse =
                new ProduceResponse.PartitionResponse(errors, j, -1L, 10L);
        Map<TopicPartition, ProduceResponse.PartitionResponse> responses =
                Collections.singletonMap(topicPartition, partitionResponse);
        return new ProduceResponse(responses, i);
    }

    /**
     * Drives transaction initialization against the mock client: starts initialization, serves a
     * successful find-coordinator response for key {@code "foobar"}, asserts that
     * {@code brokerNode} is the transaction coordinator, serves a successful init-producer-id
     * response, and finally asserts that the transaction manager has a producer id.
     *
     * @param j forwarded to {@code prepareInitPidResponse} (presumably the producer id to hand out)
     * @param s forwarded to {@code prepareInitPidResponse} (presumably the producer epoch to hand out)
     */
    private void doInitTransactions(long j, short s) {
        final FindCoordinatorRequest.CoordinatorType txnCoordinatorType = FindCoordinatorRequest.CoordinatorType.TRANSACTION;

        this.transactionManager.initializeTransactions();
        prepareFindCoordinatorResponse(Errors.NONE, false, txnCoordinatorType, "foobar");

        // Two sender passes are run before the coordinator is asserted to be known.
        for (int pass = 0; pass < 2; pass++) {
            this.sender.run(this.time.milliseconds());
        }
        Assert.assertEquals(this.brokerNode, this.transactionManager.coordinator(txnCoordinatorType));

        prepareInitPidResponse(Errors.NONE, false, j, s);
        this.sender.run(this.time.milliseconds());
        Assert.assertTrue(this.transactionManager.hasProducerId());
    }

    /**
     * Asserts that the transaction manager is in an error state that an abort clears:
     * {@code beginTransaction()} must throw a {@code KafkaException} whose cause is an instance
     * of {@code cls}, the error flag must remain set afterwards, and after {@code beginAbort()}
     * the error flag must be cleared.
     *
     * @param cls expected type of the cause of the raised {@code KafkaException}
     */
    private void assertAbortableError(Class<? extends RuntimeException> cls) {
        KafkaException raised = null;
        try {
            this.transactionManager.beginTransaction();
        } catch (KafkaException e) {
            raised = e;
        }
        if (raised == null) {
            Assert.fail("Should have raised " + cls.getSimpleName());
        } else {
            Assert.assertTrue(cls.isAssignableFrom(raised.getCause().getClass()));
            Assert.assertTrue(this.transactionManager.hasError());
        }

        Assert.assertTrue(this.transactionManager.hasError());
        this.transactionManager.beginAbort();
        Assert.assertFalse(this.transactionManager.hasError());
    }

    /**
     * Asserts that the transaction manager is in an error state that an abort does not clear:
     * with the error flag already set, two consecutive {@code beginAbort()} calls must each throw
     * a {@code KafkaException} whose cause is an instance of {@code cls}, and the error flag must
     * still be set after each attempt.
     *
     * @param cls expected type of the cause of the raised {@code KafkaException}
     */
    private void assertFatalError(Class<? extends RuntimeException> cls) {
        Assert.assertTrue(this.transactionManager.hasError());

        // The second attempt verifies that the first failed abort did not reset the error.
        for (int attempt = 0; attempt < 2; attempt++) {
            try {
                this.transactionManager.beginAbort();
                Assert.fail("Should have raised " + cls.getSimpleName());
            } catch (KafkaException raised) {
                Assert.assertTrue(cls.isAssignableFrom(raised.getCause().getClass()));
                Assert.assertTrue(this.transactionManager.hasError());
            }
        }
    }

    /**
     * Asserts that the given produce future is already done and that retrieving its result
     * throws an {@code ExecutionException}.
     *
     * @param future the produce future expected to have failed
     * @throws InterruptedException if {@code future.get()} is interrupted
     */
    private void assertFutureFailed(Future<RecordMetadata> future) throws InterruptedException {
        Assert.assertTrue(future.isDone());

        boolean threw = false;
        try {
            future.get();
        } catch (ExecutionException expected) {
            // This is the outcome under test; the exception itself is not inspected.
            threw = true;
        }
        if (!threw) {
            Assert.fail("Expected produce future to throw");
        }
    }
}
