/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.tools;

import java.io.BufferedReader;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.PrintStream;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.OptionalInt;
import java.util.OptionalLong;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.clients.admin.AbortTransactionResult;
import org.apache.kafka.clients.admin.AbortTransactionSpec;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.DescribeProducersOptions;
import org.apache.kafka.clients.admin.DescribeProducersResult;
import org.apache.kafka.clients.admin.DescribeTopicsResult;
import org.apache.kafka.clients.admin.DescribeTransactionsResult;
import org.apache.kafka.clients.admin.ListTopicsOptions;
import org.apache.kafka.clients.admin.ListTopicsResult;
import org.apache.kafka.clients.admin.ListTransactionsOptions;
import org.apache.kafka.clients.admin.ListTransactionsResult;
import org.apache.kafka.clients.admin.ProducerState;
import org.apache.kafka.clients.admin.TopicDescription;
import org.apache.kafka.clients.admin.TransactionDescription;
import org.apache.kafka.clients.admin.TransactionListing;
import org.apache.kafka.clients.admin.TransactionState;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.TopicPartitionInfo;
import org.apache.kafka.common.errors.TransactionalIdNotFoundException;
import org.apache.kafka.common.internals.KafkaFutureImpl;
import org.apache.kafka.common.utils.Exit;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.tools.ToolsTestUtils;
import org.apache.kafka.tools.TransactionsCommand;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
import org.mockito.Mockito;

/**
 * Unit tests for the {@code TransactionsCommand} command-line tool.
 *
 * <p>Each test runs the command in-process through {@link #execute(String[])} with a
 * Mockito-mocked {@link Admin} client, a {@link MockTime} clock and a {@link PrintStream}
 * that captures the output in memory. The process exit procedure is replaced before every
 * test and restored afterwards so that the exit status requested by the command can be
 * asserted instead of terminating the JVM.
 */
public class TransactionsCommandTest {
    private static final String BOOTSTRAP_SERVER = "localhost:9092";

    private final ToolsTestUtils.MockExitProcedure exitProcedure = new ToolsTestUtils.MockExitProcedure();
    private final ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
    private final PrintStream out = new PrintStream(outputStream);
    private final MockTime time = new MockTime();
    private final Admin admin = Mockito.mock(Admin.class);

    @BeforeEach
    public void setupExitProcedure() {
        Exit.setExitProcedure(exitProcedure);
    }

    @AfterEach
    public void resetExitProcedure() {
        Exit.resetExitProcedure();
    }

    @Test
    public void testDescribeProducersTopicRequired() throws Exception {
        assertCommandFailure(new String[]{
            "--bootstrap-server", BOOTSTRAP_SERVER,
            "describe-producers",
            "--partition", "0"
        });
    }

    @Test
    public void testDescribeProducersPartitionRequired() throws Exception {
        assertCommandFailure(new String[]{
            "--bootstrap-server", BOOTSTRAP_SERVER,
            "describe-producers",
            "--topic", "foo"
        });
    }

    @Test
    public void testDescribeProducersLeader() throws Exception {
        TopicPartition topicPartition = new TopicPartition("foo", 5);
        String[] args = new String[]{
            "--bootstrap-server", BOOTSTRAP_SERVER,
            "describe-producers",
            "--topic", topicPartition.topic(),
            "--partition", String.valueOf(topicPartition.partition())
        };

        testDescribeProducers(topicPartition, args, new DescribeProducersOptions());
    }

    @Test
    public void testDescribeProducersSpecificReplica() throws Exception {
        TopicPartition topicPartition = new TopicPartition("foo", 5);
        int brokerId = 5;
        String[] args = new String[]{
            "--bootstrap-server", BOOTSTRAP_SERVER,
            "describe-producers",
            "--topic", topicPartition.topic(),
            "--partition", String.valueOf(topicPartition.partition()),
            "--broker-id", String.valueOf(brokerId)
        };

        testDescribeProducers(topicPartition, args, new DescribeProducersOptions().brokerId(brokerId));
    }

    /**
     * Runs {@code describe-producers} against a stubbed partition holding two producer states
     * and verifies the printed table.
     *
     * @param topicPartition  the partition whose producers are described
     * @param args            the full command line to execute
     * @param expectedOptions the options the command is expected to pass to
     *                        {@code Admin.describeProducers}; the stub only matches this value
     */
    private void testDescribeProducers(
        TopicPartition topicPartition,
        String[] args,
        DescribeProducersOptions expectedOptions
    ) throws Exception {
        DescribeProducersResult describeResult = Mockito.mock(DescribeProducersResult.class);
        KafkaFuture<DescribeProducersResult.PartitionProducerState> describeFuture = KafkaFuture.completedFuture(
            new DescribeProducersResult.PartitionProducerState(Arrays.asList(
                new ProducerState(12345L, 15, 1300, 1599509565L, OptionalInt.of(20), OptionalLong.of(990L)),
                new ProducerState(98765L, 30, 2300, 1599509599L, OptionalInt.empty(), OptionalLong.empty())
            )));

        Mockito.when(describeResult.partitionResult(topicPartition)).thenReturn(describeFuture);
        Mockito.when(admin.describeProducers(Collections.singleton(topicPartition), expectedOptions))
            .thenReturn(describeResult);

        execute(args);
        assertNormalExit();

        List<List<String>> table = readOutputAsTable();
        assertEquals(3, table.size());

        List<String> expectedHeaders = TransactionsCommand.DescribeProducersCommand.HEADERS;
        assertEquals(expectedHeaders, table.get(0));

        // Rows are compared as a set, so their order in the output does not matter.
        // The second producer has no coordinator epoch or start offset; the expected
        // output renders those as "-1" and "None" respectively.
        Set<List<String>> expectedRows = Utils.mkSet(
            Arrays.asList("12345", "15", "20", "1300", "1599509565", "990"),
            Arrays.asList("98765", "30", "-1", "2300", "1599509599", "None")
        );
        assertEquals(expectedRows, new HashSet<>(table.subList(1, table.size())));
    }

    @Test
    public void testListTransactions() throws Exception {
        String[] args = new String[]{
            "--bootstrap-server", BOOTSTRAP_SERVER,
            "list"
        };

        Map<Integer, Collection<TransactionListing>> transactions = new HashMap<>();
        transactions.put(0, Arrays.asList(
            new TransactionListing("foo", 12345L, TransactionState.ONGOING),
            new TransactionListing("bar", 98765L, TransactionState.PREPARE_ABORT)
        ));
        transactions.put(1, Collections.singletonList(
            new TransactionListing("baz", 13579L, TransactionState.COMPLETE_COMMIT)
        ));

        expectListTransactions(transactions);

        execute(args);
        assertNormalExit();

        List<List<String>> table = readOutputAsTable();
        assertEquals(4, table.size());

        List<String> expectedHeaders = TransactionsCommand.ListTransactionsCommand.HEADERS;
        assertEquals(expectedHeaders, table.get(0));

        // Row order is not asserted; the second column is the broker id key from the map above.
        Set<List<String>> expectedRows = Utils.mkSet(
            Arrays.asList("foo", "0", "12345", "Ongoing"),
            Arrays.asList("bar", "0", "98765", "PrepareAbort"),
            Arrays.asList("baz", "1", "13579", "CompleteCommit")
        );
        assertEquals(expectedRows, new HashSet<>(table.subList(1, table.size())));
    }

    @Test
    public void testDescribeTransactionsTransactionalIdRequired() throws Exception {
        assertCommandFailure(new String[]{
            "--bootstrap-server", BOOTSTRAP_SERVER,
            "describe"
        });
    }

    @Test
    public void testDescribeTransaction() throws Exception {
        String transactionalId = "foo";
        String[] args = new String[]{
            "--bootstrap-server", BOOTSTRAP_SERVER,
            "describe",
            "--transactional-id", transactionalId
        };

        DescribeTransactionsResult describeResult = Mockito.mock(DescribeTransactionsResult.class);

        int coordinatorId = 5;
        long transactionStartTime = time.milliseconds();

        KafkaFuture<TransactionDescription> describeFuture = KafkaFuture.completedFuture(
            new TransactionDescription(
                coordinatorId,
                TransactionState.ONGOING,
                12345L,
                15,
                10000L,
                OptionalLong.of(transactionStartTime),
                Collections.singleton(new TopicPartition("bar", 0))
            ));

        Mockito.when(describeResult.description(transactionalId)).thenReturn(describeFuture);
        Mockito.when(admin.describeTransactions(Collections.singleton(transactionalId))).thenReturn(describeResult);

        // Advance the mock clock by 5 seconds; the expected row below contains "5000"
        // right after the transaction start time.
        time.sleep(5000L);

        execute(args);
        assertNormalExit();

        List<List<String>> table = readOutputAsTable();
        assertEquals(2, table.size());

        List<String> expectedHeaders = TransactionsCommand.DescribeTransactionsCommand.HEADERS;
        assertEquals(expectedHeaders, table.get(0));

        List<String> expectedRow = Arrays.asList(
            String.valueOf(coordinatorId),
            transactionalId,
            "12345",
            "15",
            "Ongoing",
            "10000",
            String.valueOf(transactionStartTime),
            "5000",
            "bar-0"
        );
        assertEquals(expectedRow, table.get(1));
    }

    // NOTE(review): the five tests below are named "testDescribeTransactions..." but exercise
    // argument validation of the "abort" sub-command. The names are kept unchanged.

    @Test
    public void testDescribeTransactionsStartOffsetOrProducerIdRequired() throws Exception {
        assertCommandFailure(new String[]{
            "--bootstrap-server", BOOTSTRAP_SERVER,
            "abort",
            "--topic", "foo",
            "--partition", "0"
        });
    }

    @Test
    public void testDescribeTransactionsTopicRequired() throws Exception {
        assertCommandFailure(new String[]{
            "--bootstrap-server", BOOTSTRAP_SERVER,
            "abort",
            "--partition", "0",
            "--start-offset", "9990"
        });
    }

    @Test
    public void testDescribeTransactionsPartitionRequired() throws Exception {
        assertCommandFailure(new String[]{
            "--bootstrap-server", BOOTSTRAP_SERVER,
            "abort",
            "--topic", "foo",
            "--start-offset", "9990"
        });
    }

    @Test
    public void testDescribeTransactionsProducerEpochRequiredWithProducerId() throws Exception {
        assertCommandFailure(new String[]{
            "--bootstrap-server", BOOTSTRAP_SERVER,
            "abort",
            "--topic", "foo",
            "--partition", "0",
            "--producer-id", "12345"
        });
    }

    @Test
    public void testDescribeTransactionsCoordinatorEpochRequiredWithProducerId() throws Exception {
        assertCommandFailure(new String[]{
            "--bootstrap-server", BOOTSTRAP_SERVER,
            "abort",
            "--topic", "foo",
            "--partition", "0",
            "--producer-id", "12345",
            "--producer-epoch", "15"
        });
    }

    @Test
    public void testNewBrokerAbortTransaction() throws Exception {
        TopicPartition topicPartition = new TopicPartition("foo", 5);
        long startOffset = 9173L;
        long producerId = 12345L;
        short producerEpoch = 15;
        int coordinatorEpoch = 76;

        // Only the start offset is given on the command line; the producer id, producer epoch
        // and coordinator epoch of the expected abort spec come from the stubbed producer state.
        String[] args = new String[]{
            "--bootstrap-server", BOOTSTRAP_SERVER,
            "abort",
            "--topic", topicPartition.topic(),
            "--partition", String.valueOf(topicPartition.partition()),
            "--start-offset", String.valueOf(startOffset)
        };

        DescribeProducersResult describeResult = Mockito.mock(DescribeProducersResult.class);
        KafkaFuture<DescribeProducersResult.PartitionProducerState> describeFuture = KafkaFuture.completedFuture(
            new DescribeProducersResult.PartitionProducerState(Collections.singletonList(
                new ProducerState(producerId, producerEpoch, 1300, 1599509565L,
                    OptionalInt.of(coordinatorEpoch), OptionalLong.of(startOffset))
            )));

        AbortTransactionResult abortTransactionResult = Mockito.mock(AbortTransactionResult.class);
        KafkaFuture<Void> abortFuture = KafkaFuture.completedFuture(null);
        AbortTransactionSpec expectedAbortSpec = new AbortTransactionSpec(
            topicPartition, producerId, producerEpoch, coordinatorEpoch);

        Mockito.when(describeResult.partitionResult(topicPartition)).thenReturn(describeFuture);
        Mockito.when(admin.describeProducers(Collections.singleton(topicPartition))).thenReturn(describeResult);

        Mockito.when(abortTransactionResult.all()).thenReturn(abortFuture);
        Mockito.when(admin.abortTransaction(expectedAbortSpec)).thenReturn(abortTransactionResult);

        execute(args);
        assertNormalExit();
    }

    @ParameterizedTest
    @ValueSource(ints = {29, -1})
    public void testOldBrokerAbortTransactionWithUnknownCoordinatorEpoch(int coordinatorEpoch) throws Exception {
        TopicPartition topicPartition = new TopicPartition("foo", 5);
        long producerId = 12345L;
        short producerEpoch = 15;

        String[] args = new String[]{
            "--bootstrap-server", BOOTSTRAP_SERVER,
            "abort",
            "--topic", topicPartition.topic(),
            "--partition", String.valueOf(topicPartition.partition()),
            "--producer-id", String.valueOf(producerId),
            "--producer-epoch", String.valueOf(producerEpoch),
            "--coordinator-epoch", String.valueOf(coordinatorEpoch)
        };

        AbortTransactionResult abortTransactionResult = Mockito.mock(AbortTransactionResult.class);
        KafkaFuture<Void> abortFuture = KafkaFuture.completedFuture(null);

        // A negative coordinator epoch on the command line is expected to reach the admin
        // client as 0; non-negative values are expected unchanged.
        int expectedCoordinatorEpoch = Math.max(coordinatorEpoch, 0);
        AbortTransactionSpec expectedAbortSpec = new AbortTransactionSpec(
            topicPartition, producerId, producerEpoch, expectedCoordinatorEpoch);

        Mockito.when(abortTransactionResult.all()).thenReturn(abortFuture);
        Mockito.when(admin.abortTransaction(expectedAbortSpec)).thenReturn(abortTransactionResult);

        execute(args);
        assertNormalExit();
    }

    @Test
    public void testFindHangingRequiresEitherBrokerIdOrTopic() throws Exception {
        assertCommandFailure(new String[]{
            "--bootstrap-server", BOOTSTRAP_SERVER,
            "find-hanging"
        });
    }

    @Test
    public void testFindHangingRequiresTopicIfPartitionIsSpecified() throws Exception {
        assertCommandFailure(new String[]{
            "--bootstrap-server", BOOTSTRAP_SERVER,
            "find-hanging",
            "--broker-id", "0",
            "--partition", "5"
        });
    }

    /**
     * Stubs {@code Admin.listTransactions} for default {@link ListTransactionsOptions}.
     *
     * @param listingsByBroker the listings to return, keyed by broker id
     */
    private void expectListTransactions(Map<Integer, Collection<TransactionListing>> listingsByBroker) {
        expectListTransactions(new ListTransactionsOptions(), listingsByBroker);
    }

    /**
     * Stubs {@code Admin.listTransactions(options)} so that {@code allByBrokerId()} yields the
     * given map and {@code all()} yields the concatenation of all of its values.
     *
     * @param options          the options the stub matches on
     * @param listingsByBroker the listings to return, keyed by broker id
     */
    private void expectListTransactions(
        ListTransactionsOptions options,
        Map<Integer, Collection<TransactionListing>> listingsByBroker
    ) {
        ListTransactionsResult listResult = Mockito.mock(ListTransactionsResult.class);
        Mockito.when(admin.listTransactions(options)).thenReturn(listResult);

        List<TransactionListing> allListings = new ArrayList<>();
        listingsByBroker.values().forEach(allListings::addAll);

        Mockito.when(listResult.all()).thenReturn(KafkaFuture.completedFuture(allListings));
        Mockito.when(listResult.allByBrokerId()).thenReturn(KafkaFuture.completedFuture(listingsByBroker));
    }

    /**
     * Stubs {@code Admin.describeProducers} (default options) for a single partition so that
     * {@code all()} returns one producer state built from the given values. The last sequence
     * of that state is fixed at 500.
     */
    private void expectDescribeProducers(
        TopicPartition topicPartition,
        long producerId,
        short producerEpoch,
        long lastTimestamp,
        OptionalInt coordinatorEpoch,
        OptionalLong txnStartOffset
    ) {
        DescribeProducersResult.PartitionProducerState partitionProducerState =
            new DescribeProducersResult.PartitionProducerState(Collections.singletonList(
                new ProducerState(producerId, producerEpoch, 500, lastTimestamp, coordinatorEpoch, txnStartOffset)
            ));

        DescribeProducersResult result = Mockito.mock(DescribeProducersResult.class);
        Mockito.when(result.all()).thenReturn(
            KafkaFuture.completedFuture(Collections.singletonMap(topicPartition, partitionProducerState)));

        Mockito.when(admin.describeProducers(Collections.singletonList(topicPartition), new DescribeProducersOptions()))
            .thenReturn(result);
    }

    /**
     * Stubs {@code Admin.describeTransactions} for exactly the key set of {@code descriptions},
     * answering both per-id {@code description(id)} lookups and {@code all()}.
     *
     * @param descriptions the transaction descriptions to return, keyed by transactional id
     */
    private void expectDescribeTransactions(Map<String, TransactionDescription> descriptions) {
        DescribeTransactionsResult result = Mockito.mock(DescribeTransactionsResult.class);
        descriptions.forEach((transactionalId, description) ->
            Mockito.when(result.description(transactionalId))
                .thenReturn(KafkaFuture.completedFuture(description)));
        Mockito.when(result.all()).thenReturn(KafkaFuture.completedFuture(descriptions));
        Mockito.when(admin.describeTransactions(descriptions.keySet())).thenReturn(result);
    }

    /**
     * Stubs {@code Admin.listTopics} to return the given names. The stub matches options with
     * {@code listInternal(true)}.
     */
    private void expectListTopics(Set<String> topics) {
        ListTopicsResult result = Mockito.mock(ListTopicsResult.class);
        Mockito.when(result.names()).thenReturn(KafkaFuture.completedFuture(topics));
        ListTopicsOptions listOptions = new ListTopicsOptions().listInternal(true);
        Mockito.when(admin.listTopics(listOptions)).thenReturn(result);
    }

    /**
     * Stubs {@code Admin.describeTopics} for the list of topic names in {@code descriptions}.
     *
     * @param descriptions the topic descriptions to return, keyed by topic name
     */
    private void expectDescribeTopics(Map<String, TopicDescription> descriptions) {
        DescribeTopicsResult result = Mockito.mock(DescribeTopicsResult.class);
        Mockito.when(result.allTopicNames()).thenReturn(KafkaFuture.completedFuture(descriptions));
        Mockito.when(admin.describeTopics(new ArrayList<>(descriptions.keySet()))).thenReturn(result);
    }

    /**
     * Builds the topic fixture shared by the find-hanging lookup tests: a non-internal topic
     * with partition 0 on brokers {0, 1} (leader 0) and partition 1 on brokers {1, 5} (leader 1).
     */
    private static TopicDescription twoPartitionTopicDescription(String topic) {
        Node node0 = new Node(0, "localhost", 9092);
        Node node1 = new Node(1, "localhost", 9093);
        Node node5 = new Node(5, "localhost", 9097);

        TopicPartitionInfo partition0 = new TopicPartitionInfo(
            0, node0, Arrays.asList(node0, node1), Arrays.asList(node0, node1));
        TopicPartitionInfo partition1 = new TopicPartitionInfo(
            1, node1, Arrays.asList(node1, node5), Arrays.asList(node1, node5));

        return new TopicDescription(topic, false, Arrays.asList(partition0, partition1));
    }

    @Test
    public void testFindHangingLookupTopicPartitionsForBroker() throws Exception {
        int brokerId = 5;
        String[] args = new String[]{
            "--bootstrap-server", BOOTSTRAP_SERVER,
            "find-hanging",
            "--broker-id", String.valueOf(brokerId)
        };

        String topic = "foo";
        expectListTopics(Collections.singleton(topic));
        expectDescribeTopics(Collections.singletonMap(topic, twoPartitionTopicDescription(topic)));

        // Broker 5 is a replica of partition 1 only, so the stub matches a describeProducers
        // call for just that partition, directed at broker 5.
        DescribeProducersResult result = Mockito.mock(DescribeProducersResult.class);
        Mockito.when(result.all()).thenReturn(KafkaFuture.completedFuture(Collections.emptyMap()));
        Mockito.when(admin.describeProducers(
            Collections.singletonList(new TopicPartition(topic, 1)),
            new DescribeProducersOptions().brokerId(brokerId)
        )).thenReturn(result);

        execute(args);
        assertNormalExit();
        assertNoHangingTransactions();
    }

    @Test
    public void testFindHangingLookupTopicAndBrokerId() throws Exception {
        int brokerId = 5;
        String topic = "foo";
        String[] args = new String[]{
            "--bootstrap-server", BOOTSTRAP_SERVER,
            "find-hanging",
            "--broker-id", String.valueOf(brokerId),
            "--topic", topic
        };

        // The topic is given explicitly, so no listTopics stub is set up here.
        expectDescribeTopics(Collections.singletonMap(topic, twoPartitionTopicDescription(topic)));

        // Broker 5 is a replica of partition 1 only.
        DescribeProducersResult result = Mockito.mock(DescribeProducersResult.class);
        Mockito.when(result.all()).thenReturn(KafkaFuture.completedFuture(Collections.emptyMap()));
        Mockito.when(admin.describeProducers(
            Collections.singletonList(new TopicPartition(topic, 1)),
            new DescribeProducersOptions().brokerId(brokerId)
        )).thenReturn(result);

        execute(args);
        assertNormalExit();
        assertNoHangingTransactions();
    }

    @Test
    public void testFindHangingLookupTopicPartitionsForTopic() throws Exception {
        String topic = "foo";
        String[] args = new String[]{
            "--bootstrap-server", BOOTSTRAP_SERVER,
            "find-hanging",
            "--topic", topic
        };

        expectDescribeTopics(Collections.singletonMap(topic, twoPartitionTopicDescription(topic)));

        // Without a broker id, both partitions are expected in a single describeProducers
        // call with default options.
        DescribeProducersResult result = Mockito.mock(DescribeProducersResult.class);
        Mockito.when(result.all()).thenReturn(KafkaFuture.completedFuture(Collections.emptyMap()));
        Mockito.when(admin.describeProducers(
            Arrays.asList(new TopicPartition(topic, 0), new TopicPartition(topic, 1)),
            new DescribeProducersOptions()
        )).thenReturn(result);

        execute(args);
        assertNormalExit();
        assertNoHangingTransactions();
    }

    /**
     * Asserts that the captured output consists solely of the find-hanging header row.
     */
    private void assertNoHangingTransactions() throws Exception {
        List<List<String>> table = readOutputAsTable();
        assertEquals(1, table.size());

        List<String> expectedHeaders = TransactionsCommand.FindHangingTransactionsCommand.HEADERS;
        assertEquals(expectedHeaders, table.get(0));
    }

    @Test
    public void testFindHangingSpecifiedTopicPartition() throws Exception {
        TopicPartition topicPartition = new TopicPartition("foo", 5);
        String[] args = new String[]{
            "--bootstrap-server", BOOTSTRAP_SERVER,
            "find-hanging",
            "--topic", topicPartition.topic(),
            "--partition", String.valueOf(topicPartition.partition())
        };

        long producerId = 132L;
        short producerEpoch = 5;
        // The producer's last timestamp equals the current mock time; the test expects
        // that no hanging transaction is reported for it.
        long lastTimestamp = time.milliseconds();
        OptionalInt coordinatorEpoch = OptionalInt.of(19);
        OptionalLong txnStartOffset = OptionalLong.of(29384L);

        expectDescribeProducers(
            topicPartition, producerId, producerEpoch, lastTimestamp, coordinatorEpoch, txnStartOffset);

        execute(args);
        assertNormalExit();
        assertNoHangingTransactions();
    }

    @Test
    public void testFindHangingNoMappedTransactionalId() throws Exception {
        TopicPartition topicPartition = new TopicPartition("foo", 5);
        String[] args = new String[]{
            "--bootstrap-server", BOOTSTRAP_SERVER,
            "find-hanging",
            "--topic", topicPartition.topic(),
            "--partition", String.valueOf(topicPartition.partition())
        };

        long producerId = 132L;
        short producerEpoch = 5;
        // The producer was last active 60 minutes before the current mock time.
        long lastTimestamp = time.milliseconds() - TimeUnit.MINUTES.toMillis(60L);
        int coordinatorEpoch = 19;
        long txnStartOffset = 29384L;

        expectDescribeProducers(
            topicPartition, producerId, producerEpoch, lastTimestamp,
            OptionalInt.of(coordinatorEpoch), OptionalLong.of(txnStartOffset));

        // No transactional id is listed for this producer id.
        expectListTransactions(
            new ListTransactionsOptions().filterProducerIds(Collections.singleton(producerId)),
            Collections.singletonMap(1, Collections.emptyList()));

        expectDescribeTransactions(Collections.emptyMap());

        execute(args);
        assertNormalExit();

        assertHangingTransaction(
            topicPartition, producerId, producerEpoch, coordinatorEpoch, txnStartOffset, lastTimestamp);
    }

    @Test
    public void testFindHangingWithNoTransactionDescription() throws Exception {
        TopicPartition topicPartition = new TopicPartition("foo", 5);
        String[] args = new String[]{
            "--bootstrap-server", BOOTSTRAP_SERVER,
            "find-hanging",
            "--topic", topicPartition.topic(),
            "--partition", String.valueOf(topicPartition.partition())
        };

        long producerId = 132L;
        short producerEpoch = 5;
        long lastTimestamp = time.milliseconds() - TimeUnit.MINUTES.toMillis(60L);
        int coordinatorEpoch = 19;
        long txnStartOffset = 29384L;

        expectDescribeProducers(
            topicPartition, producerId, producerEpoch, lastTimestamp,
            OptionalInt.of(coordinatorEpoch), OptionalLong.of(txnStartOffset));

        String transactionalId = "bar";
        TransactionListing listing = new TransactionListing(transactionalId, producerId, TransactionState.ONGOING);

        expectListTransactions(
            new ListTransactionsOptions().filterProducerIds(Collections.singleton(producerId)),
            Collections.singletonMap(1, Collections.singletonList(listing)));

        // The transactional id is listed, but describing it fails with
        // TransactionalIdNotFoundException.
        DescribeTransactionsResult result = Mockito.mock(DescribeTransactionsResult.class);
        Mockito.when(result.description(transactionalId))
            .thenReturn(failedFuture(new TransactionalIdNotFoundException(transactionalId + " not found")));
        Mockito.when(admin.describeTransactions(Collections.singleton(transactionalId))).thenReturn(result);

        execute(args);
        assertNormalExit();

        assertHangingTransaction(
            topicPartition, producerId, producerEpoch, coordinatorEpoch, txnStartOffset, lastTimestamp);
    }

    /**
     * Creates a future that has already completed exceptionally with {@code e}.
     */
    private <T> KafkaFuture<T> failedFuture(Exception e) {
        KafkaFutureImpl<T> future = new KafkaFutureImpl<>();
        future.completeExceptionally(e);
        return future;
    }

    @Test
    public void testFindHangingDoesNotFilterByTransactionInProgressWithDifferentPartitions() throws Exception {
        TopicPartition topicPartition = new TopicPartition("foo", 5);
        String[] args = new String[]{
            "--bootstrap-server", BOOTSTRAP_SERVER,
            "find-hanging",
            "--topic", topicPartition.topic(),
            "--partition", String.valueOf(topicPartition.partition())
        };

        long producerId = 132L;
        short producerEpoch = 5;
        long lastTimestamp = time.milliseconds() - TimeUnit.MINUTES.toMillis(60L);
        int coordinatorEpoch = 19;
        long txnStartOffset = 29384L;

        expectDescribeProducers(
            topicPartition, producerId, producerEpoch, lastTimestamp,
            OptionalInt.of(coordinatorEpoch), OptionalLong.of(txnStartOffset));

        String transactionalId = "bar";
        TransactionListing listing = new TransactionListing(transactionalId, producerId, TransactionState.ONGOING);

        expectListTransactions(
            new ListTransactionsOptions().filterProducerIds(Collections.singleton(producerId)),
            Collections.singletonMap(1, Collections.singletonList(listing)));

        // The ongoing transaction covers foo-10 only, not the inspected partition foo-5.
        TransactionDescription description = new TransactionDescription(
            1,
            TransactionState.ONGOING,
            producerId,
            producerEpoch,
            60000L,
            OptionalLong.of(time.milliseconds()),
            Collections.singleton(new TopicPartition("foo", 10))
        );

        expectDescribeTransactions(Collections.singletonMap(transactionalId, description));

        execute(args);
        assertNormalExit();

        assertHangingTransaction(
            topicPartition, producerId, producerEpoch, coordinatorEpoch, txnStartOffset, lastTimestamp);
    }

    /**
     * Asserts that the captured output is the find-hanging header followed by exactly one row
     * describing the given hanging transaction. The expected duration column is the whole
     * number of minutes between {@code lastTimestamp} and the current mock time.
     */
    private void assertHangingTransaction(
        TopicPartition topicPartition,
        long producerId,
        short producerEpoch,
        int coordinatorEpoch,
        long txnStartOffset,
        long lastTimestamp
    ) throws Exception {
        List<List<String>> table = readOutputAsTable();
        assertEquals(2, table.size());

        List<String> expectedHeaders = TransactionsCommand.FindHangingTransactionsCommand.HEADERS;
        assertEquals(expectedHeaders, table.get(0));

        long durationMinutes = TimeUnit.MILLISECONDS.toMinutes(time.milliseconds() - lastTimestamp);

        List<String> expectedRow = Arrays.asList(
            topicPartition.topic(),
            String.valueOf(topicPartition.partition()),
            String.valueOf(producerId),
            String.valueOf(producerEpoch),
            String.valueOf(coordinatorEpoch),
            String.valueOf(txnStartOffset),
            String.valueOf(lastTimestamp),
            String.valueOf(durationMinutes)
        );
        assertEquals(expectedRow, table.get(1));
    }

    @Test
    public void testFindHangingFilterByTransactionInProgressWithSamePartition() throws Exception {
        TopicPartition topicPartition = new TopicPartition("foo", 5);
        String[] args = new String[]{
            "--bootstrap-server", BOOTSTRAP_SERVER,
            "find-hanging",
            "--topic", topicPartition.topic(),
            "--partition", String.valueOf(topicPartition.partition())
        };

        long producerId = 132L;
        short producerEpoch = 5;
        long lastTimestamp = time.milliseconds() - TimeUnit.MINUTES.toMillis(60L);
        int coordinatorEpoch = 19;
        long txnStartOffset = 29384L;

        expectDescribeProducers(
            topicPartition, producerId, producerEpoch, lastTimestamp,
            OptionalInt.of(coordinatorEpoch), OptionalLong.of(txnStartOffset));

        String transactionalId = "bar";
        TransactionListing listing = new TransactionListing(transactionalId, producerId, TransactionState.ONGOING);

        expectListTransactions(
            new ListTransactionsOptions().filterProducerIds(Collections.singleton(producerId)),
            Collections.singletonMap(1, Collections.singletonList(listing)));

        // The ongoing transaction includes the inspected partition itself.
        TransactionDescription description = new TransactionDescription(
            1,
            TransactionState.ONGOING,
            producerId,
            producerEpoch,
            60000L,
            OptionalLong.of(lastTimestamp),
            Collections.singleton(topicPartition)
        );

        expectDescribeTransactions(Collections.singletonMap(transactionalId, description));

        execute(args);
        assertNormalExit();
        assertNoHangingTransactions();
    }

    /**
     * Runs {@code TransactionsCommand} with the given arguments, handing it the mocked admin
     * client, the capturing print stream and the mock clock.
     */
    private void execute(String[] args) throws Exception {
        TransactionsCommand.execute(args, ns -> admin, out, time);
    }

    /**
     * Parses everything written to the captured output so far into rows of
     * whitespace-separated cells, one row per output line.
     *
     * @return the rows in output order; empty if nothing was printed
     */
    private List<List<String>> readOutputAsTable() throws IOException {
        List<List<String>> table = new ArrayList<>();
        ByteArrayInputStream inputStream = new ByteArrayInputStream(outputStream.toByteArray());

        // The reader uses the platform default charset, matching how the PrintStream
        // field above encodes the output.
        try (BufferedReader reader = new BufferedReader(new InputStreamReader(inputStream))) {
            List<String> row;
            while ((row = readRow(reader)) != null) {
                table.add(row);
            }
        }
        return table;
    }

    /**
     * Reads the next line and splits it on runs of whitespace.
     *
     * @return the cells of the next line, or {@code null} at end of input
     */
    private List<String> readRow(BufferedReader reader) throws IOException {
        String line = reader.readLine();
        if (line == null) {
            return null;
        }
        return Arrays.asList(line.split("\\s+"));
    }

    /**
     * Asserts that the command invoked the exit procedure with status 0.
     */
    private void assertNormalExit() {
        Assertions.assertTrue(exitProcedure.hasExited());
        assertEquals(0, exitProcedure.statusCode());
    }

    /**
     * Executes the command and asserts that it invoked the exit procedure with status 1.
     */
    private void assertCommandFailure(String[] args) throws Exception {
        execute(args);
        Assertions.assertTrue(exitProcedure.hasExited());
        assertEquals(1, exitProcedure.statusCode());
    }

    /** Shorthand for {@link Assertions#assertEquals(Object, Object)}. */
    private static void assertEquals(Object expected, Object actual) {
        Assertions.assertEquals(expected, actual);
    }

    /** Shorthand for {@link Assertions#assertEquals(int, int)}. */
    private static void assertEquals(int expected, int actual) {
        Assertions.assertEquals(expected, actual);
    }
}

