/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.clients.producer.internals;

import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import org.apache.kafka.clients.KafkaClient;
import org.apache.kafka.clients.Metadata;
import org.apache.kafka.clients.MockClient;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.clients.producer.internals.FutureRecordMetadata;
import org.apache.kafka.clients.producer.internals.RecordAccumulator;
import org.apache.kafka.clients.producer.internals.Sender;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.metrics.KafkaMetric;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.protocol.types.Struct;
import org.apache.kafka.common.record.CompressionType;
import org.apache.kafka.common.requests.ProduceResponse;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.test.TestUtils;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

/**
 * Tests for {@link Sender}. Each test appends records to a shared
 * {@link RecordAccumulator}, drives the sender manually with
 * {@code run(nowMs)} against a {@link MockClient} and a {@link MockTime},
 * and then inspects in-flight requests, record futures and metrics.
 */
public class SenderTest {
    private static final int MAX_REQUEST_SIZE = 1024 * 1024;
    private static final short ACKS_ALL = -1;
    private static final int MAX_RETRIES = 0;
    private static final String CLIENT_ID = "clientId";
    private static final String METRIC_GROUP = "producer-metrics";
    private static final double EPS = 1.0E-4;
    private static final int MAX_BLOCK_TIMEOUT = 1000;
    private static final int REQUEST_TIMEOUT = 1000;

    // NOTE: initializer order matters below; later fields are built from earlier ones.
    private TopicPartition tp = new TopicPartition("test", 0);
    private MockTime time = new MockTime();
    private MockClient client = new MockClient(this.time);
    private int batchSize = 16 * 1024;
    private Metadata metadata = new Metadata(0L, Long.MAX_VALUE);
    private Cluster cluster = TestUtils.singletonCluster("test", 1);
    private Metrics metrics = new Metrics(this.time);
    // The same map instance is handed to the accumulator and reused to build
    // the MetricName lookups in testQuotaMetrics; it is populated in setup().
    Map<String, String> metricTags = new LinkedHashMap<String, String>();
    private RecordAccumulator accumulator = new RecordAccumulator(
            this.batchSize,
            1024 * 1024,
            CompressionType.NONE,
            0L,
            0L,
            this.metrics,
            this.time,
            this.metricTags);
    private Sender sender = new Sender(
            this.client,
            this.metadata,
            this.accumulator,
            MAX_REQUEST_SIZE,
            ACKS_ALL,
            MAX_RETRIES,
            this.metrics,
            this.time,
            CLIENT_ID,
            REQUEST_TIMEOUT);

    /**
     * Updates the metadata with the test cluster at the current mock time and
     * registers the {@code client-id} tag used for metric lookups.
     */
    @Before
    public void setup() {
        this.metadata.update(this.cluster, this.time.milliseconds());
        this.metricTags.put("client-id", CLIENT_ID);
    }

    /** Closes the metrics registry shared by the accumulator and the default sender. */
    @After
    public void tearDown() {
        this.metrics.close();
    }

    /**
     * Appends one record, runs the sender until a single produce request is in
     * flight, answers it successfully, and checks that the record future
     * completes with the offset carried by the response.
     */
    @Test
    public void testSimple() throws Exception {
        long offset = 0L;
        Future<RecordMetadata> future = appendRecord();

        // Two passes are required before the produce request shows up as in flight.
        this.sender.run(this.time.milliseconds());
        this.sender.run(this.time.milliseconds());
        Assert.assertEquals("We should have a single produce request in flight.", 1, this.client.inFlightRequestCount());

        this.client.respond(produceResponse(this.tp, offset, Errors.NONE.code(), 0));
        this.sender.run(this.time.milliseconds());
        // Expect zero in-flight requests; this must not be tied to the record offset.
        Assert.assertEquals("All requests completed.", 0, this.client.inFlightRequestCount());

        this.sender.run(this.time.milliseconds());
        Assert.assertTrue("Request should be completed", future.isDone());
        Assert.assertEquals(offset, future.get().offset());
    }

    /**
     * Sends three produce requests whose responses carry throttle times of
     * 100, 200 and 300 ms, then verifies the {@code produce-throttle-time-avg}
     * and {@code produce-throttle-time-max} metrics report 200 and 300.
     */
    @Test
    public void testQuotaMetrics() throws Exception {
        final long offset = 0L;
        for (int i = 1; i <= 3; ++i) {
            appendRecord();
            this.sender.run(this.time.milliseconds());
            this.client.respond(produceResponse(this.tp, offset, Errors.NONE.code(), 100 * i));
            this.sender.run(this.time.milliseconds());
        }

        Map<MetricName, KafkaMetric> allMetrics = this.metrics.metrics();
        KafkaMetric avgMetric = allMetrics.get(new MetricName("produce-throttle-time-avg", METRIC_GROUP, "", this.metricTags));
        KafkaMetric maxMetric = allMetrics.get(new MetricName("produce-throttle-time-max", METRIC_GROUP, "", this.metricTags));
        Assert.assertEquals(200.0, avgMetric.value(), EPS);
        Assert.assertEquals(300.0, maxMetric.value(), EPS);
    }

    /**
     * Exercises a sender configured with one retry. First a request is
     * disconnected once and must still complete successfully afterwards; then
     * a second request is disconnected {@code maxRetries + 1} times and must
     * complete with a network exception.
     */
    @Test
    public void testRetries() throws Exception {
        int maxRetries = 1;
        try (Metrics m = new Metrics()) {
            // Dedicated sender (with its own metrics registry) so the retry count
            // differs from the class-level sender; it shares client and accumulator.
            Sender retryingSender = new Sender(
                    this.client,
                    this.metadata,
                    this.accumulator,
                    MAX_REQUEST_SIZE,
                    ACKS_ALL,
                    maxRetries,
                    m,
                    this.time,
                    CLIENT_ID,
                    REQUEST_TIMEOUT);

            // --- retry that eventually succeeds ---
            Future<RecordMetadata> future = appendRecord();
            retryingSender.run(this.time.milliseconds());
            retryingSender.run(this.time.milliseconds());
            String id = this.client.requests().peek().request().destination();
            Node node = new Node(Integer.parseInt(id), "localhost", 0);
            Assert.assertEquals(1, this.client.inFlightRequestCount());
            Assert.assertTrue("Client ready status should be true", this.client.isReady(node, 0L));

            this.client.disconnect(id);
            Assert.assertEquals(0, this.client.inFlightRequestCount());
            Assert.assertFalse("Client ready status should be false", this.client.isReady(node, 0L));

            // Three passes before the request is in flight again
            // (presumably: observe the failure, reconnect, resend).
            retryingSender.run(this.time.milliseconds());
            retryingSender.run(this.time.milliseconds());
            retryingSender.run(this.time.milliseconds());
            Assert.assertEquals(1, this.client.inFlightRequestCount());

            long offset = 0L;
            this.client.respond(produceResponse(this.tp, offset, Errors.NONE.code(), 0));
            retryingSender.run(this.time.milliseconds());
            Assert.assertTrue("Request should have retried and completed", future.isDone());
            Assert.assertEquals(offset, future.get().offset());

            // --- retries exhausted ---
            future = appendRecord();
            retryingSender.run(this.time.milliseconds());
            for (int i = 0; i < maxRetries + 1; ++i) {
                this.client.disconnect(this.client.requests().peek().request().destination());
                retryingSender.run(this.time.milliseconds());
                retryingSender.run(this.time.milliseconds());
                retryingSender.run(this.time.milliseconds());
            }
            retryingSender.run(this.time.milliseconds());
            completedWithError(future, Errors.NETWORK_EXCEPTION);
        }
    }

    /**
     * Appends a fixed key/value record for {@link #tp} to the accumulator with
     * no callback and a maximum block time of {@link #MAX_BLOCK_TIMEOUT}.
     *
     * @return the future tracking the appended record
     */
    private Future<RecordMetadata> appendRecord() throws InterruptedException {
        byte[] key = "key".getBytes(StandardCharsets.UTF_8);
        byte[] value = "value".getBytes(StandardCharsets.UTF_8);
        return this.accumulator.append(this.tp, key, value, null, MAX_BLOCK_TIMEOUT).future;
    }

    /**
     * Asserts that {@code future} is done and that {@code get()} fails with an
     * {@link ExecutionException} whose cause has the same class as the
     * exception associated with {@code error}.
     *
     * @param future the record future expected to have failed
     * @param error the error whose exception class the cause must match
     */
    private void completedWithError(Future<RecordMetadata> future, Errors error) throws Exception {
        Assert.assertTrue("Request should be completed", future.isDone());
        try {
            future.get();
            Assert.fail("Should have thrown an exception.");
        } catch (ExecutionException e) {
            Assert.assertEquals(error.exception().getClass(), e.getCause().getClass());
        }
    }

    /**
     * Builds the {@link Struct} form of a {@link ProduceResponse} that holds a
     * single partition response.
     *
     * @param tp the partition the response refers to
     * @param offset the offset placed in the partition response
     * @param error the error code, narrowed to {@code short}
     * @param throttleTimeMs the throttle time placed in the response
     * @return the serialized response struct
     */
    private Struct produceResponse(TopicPartition tp, long offset, int error, int throttleTimeMs) {
        ProduceResponse.PartitionResponse partitionResponse =
                new ProduceResponse.PartitionResponse((short) error, offset);
        Map<TopicPartition, ProduceResponse.PartitionResponse> responses =
                Collections.singletonMap(tp, partitionResponse);
        return new ProduceResponse(responses, throttleTimeMs).toStruct();
    }
}

