/*
 * Decompiled with CFR 0.152.
 */
package com.mapr.kafka.eventstreams.impl.producer;

import com.mapr.fs.MapRFileSystem;
import com.mapr.fs.ShimLoader;
import com.mapr.fs.jni.MarlinJniListener;
import com.mapr.fs.jni.MarlinJniProducer;
import com.mapr.fs.jni.MarlinProducerResult;
import com.mapr.kafka.eventstreams.impl.listener.NativeDataParserV10;
import com.mapr.kafka.eventstreams.impl.producer.MarlinProducerImpl;
import com.mapr.kafka.eventstreams.impl.producer.MarlinProducerImplV10;
import com.mapr.kafka.eventstreams.impl.producer.MarlinProducerResultImplV10;
import com.mapr.kafka.eventstreams.impl.tools.MockUtil;
import java.lang.management.ManagementFactory;
import java.lang.reflect.Method;
import java.nio.charset.StandardCharsets;
import java.util.Properties;
import javax.management.MBeanServer;
import javax.management.ObjectName;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.Serializer;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.powermock.api.easymock.PowerMock;
import org.powermock.core.classloader.annotations.PowerMockIgnore;
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.core.classloader.annotations.SuppressStaticInitializationFor;
import org.powermock.modules.junit4.PowerMockRunner;

/**
 * Checks the send and error metrics that a {@link MarlinProducerImplV10} exposes through the
 * platform MBean server.
 *
 * <p>The native producer layer and {@link MapRFileSystem} are replaced with mocks (see
 * {@link MockUtil}), so the tests drive the producer by calling {@code send} and then invoking
 * {@code handleJniCallbacks} by hand, and afterwards read the resulting attributes of the
 * {@code producer-metrics} and {@code producer-topic-metrics} MBeans.
 */
@RunWith(PowerMockRunner.class)
@PowerMockIgnore({"jdk.internal.reflect.*", "javax.management.*", "javax.xml.*", "org.apache.xerces.*", "org.w3c.*", "org.apache.hadoop.fs.FileSystem$Cache$Key"})
@PrepareForTest({MapRFileSystem.class, MarlinJniProducer.class, NativeDataParserV10.class})
@SuppressStaticInitializationFor({"org.apache.hadoop.conf.Configuration", "org.apache.hadoop.fs.FileSystem", "com.mapr.baseutils.cldbutils.CLDBRpcCommonUtils"})
public class ProducerMetricsTest {
    // Fixture values. All of them are ASCII, so String.length() equals the UTF-8 byte count.
    private static final String TOPIC = "/s:t";
    private static final String KEY = "recordKey";
    private static final String VALUE = "recordValue";
    private static final String HEADER_KEY = "hKey";
    private static final String HEADER_VALUE = "hValue";

    /** Tolerance used when comparing exact counters and sizes read back as doubles. */
    private static final double DELTA = 0.1;

    private static final Serializer<byte[]> SERIALIZER = new ByteArraySerializer();
    private static final ProducerConfig CONFIG = getProducerConfig(SERIALIZER);

    // Per-test state, (re)created in setUp().
    private MarlinProducerImpl producer;
    private MBeanServer mBeanServer;
    private ObjectName producerMetricsMBean;
    private ObjectName producerTopicMetricsMBean;

    @BeforeClass
    public static void staticSetUp() {
        // NOTE(review): the result is discarded; presumably this only forces MarlinJniListener's
        // methods to be resolved before the mocks are installed -- confirm the intent.
        MarlinJniListener.class.getDeclaredMethods();
    }

    /**
     * Installs the mocks and creates a fresh producer with metrics enabled.
     *
     * @throws Exception if an MBean name is malformed or the mocking/producer set-up fails
     */
    @Before
    public void setUp() throws Exception {
        mBeanServer = ManagementFactory.getPlatformMBeanServer();
        producerMetricsMBean = new ObjectName("kafka.producer:client-id=producer-1,type=producer-metrics");
        producerTopicMetricsMBean = new ObjectName("kafka.producer:client-id=producer-1,type=producer-topic-metrics,topic=\"" + TOPIC + "\"");
        MockUtil.mockProducerNativeMethods();
        MockUtil.mockMapRFileSystem();
        PowerMock.suppress(ShimLoader.class.getMethod("load"));
        PowerMock.replayAll();
        producer = new MarlinProducerImplV10(CONFIG);
    }

    /**
     * Closes the producer if the test did not already do so, so that no producer outlives its
     * test.
     *
     * @throws Exception if closing the producer fails
     */
    @After
    public void tearDown() throws Exception {
        closeProducer();
    }

    /**
     * Sends one fully populated record and one record with a null key and value, then verifies
     * the size, total and rate metrics.
     *
     * @throws Exception if sending, closing or reading a JMX attribute fails
     */
    @Test
    public void testSendMetrics() throws Exception {
        byte[] keyBytes = KEY.getBytes(StandardCharsets.UTF_8);
        byte[] valueBytes = VALUE.getBytes(StandardCharsets.UTF_8);
        byte[] headerValueBytes = HEADER_VALUE.getBytes(StandardCharsets.UTF_8);

        ProducerRecord<byte[], byte[]> fullRecord = new ProducerRecord<byte[], byte[]>(TOPIC, keyBytes, valueBytes);
        fullRecord.headers().add(HEADER_KEY, headerValueBytes);
        ProducerRecord<byte[], byte[]> emptyRecord = new ProducerRecord<byte[], byte[]>(TOPIC, null, null);

        // Size model asserted by this test: topic + key + value + header key + header value.
        int fullRecordSize = TOPIC.length() + keyBytes.length + valueBytes.length + HEADER_KEY.length() + headerValueBytes.length;
        int emptyRecordSize = TOPIC.length();
        double expectedTotalBytes = fullRecordSize + emptyRecordSize;

        send(fullRecord);
        send(emptyRecord);
        // The metrics are deliberately read after the producer has been closed.
        closeProducer();

        double recordSendRate = getJmxProducerMetric(producerMetricsMBean, "record-send-rate");
        double recordSendTotal = getJmxProducerMetric(producerMetricsMBean, "record-send-total");
        double recordSizeAvg = getJmxProducerMetric(producerMetricsMBean, "record-size-avg");
        double recordSizeMax = getJmxProducerMetric(producerMetricsMBean, "record-size-max");
        double byteRate = getJmxProducerMetric(producerTopicMetricsMBean, "byte-rate");
        double byteTotal = getJmxProducerMetric(producerTopicMetricsMBean, "byte-total");
        double recordSendRatePerTopic = getJmxProducerMetric(producerTopicMetricsMBean, "record-send-rate");
        double recordSendTotalPerTopic = getJmxProducerMetric(producerTopicMetricsMBean, "record-send-total");

        // Divide by 2.0 so an odd byte total is not truncated by integer division.
        Assert.assertEquals(expectedTotalBytes / 2.0, recordSizeAvg, DELTA);
        Assert.assertEquals((double) fullRecordSize, recordSizeMax, DELTA);
        Assert.assertEquals(expectedTotalBytes, byteTotal, DELTA);
        Assert.assertEquals(2.0, recordSendTotal, DELTA);
        Assert.assertEquals(2.0, recordSendTotalPerTopic, DELTA);
        assertPositive("record-send-rate", recordSendRate);
        assertPositive("byte-rate", byteRate);
        assertPositive("per-topic record-send-rate", recordSendRatePerTopic);
    }

    /**
     * Reports a single failed result to the producer and verifies the error metrics.
     *
     * @throws Exception if reading a JMX attribute fails
     */
    @Test
    public void testErrorMetrics() throws Exception {
        MarlinProducerResult[] results = new MarlinProducerResult[]{new MarlinProducerResultImplV10(TOPIC, -1, null, KEY.length(), VALUE.length())};
        // Same callback as in send(), but with 13 instead of 0 as the last argument
        // (presumably a non-zero error code -- confirm against handleJniCallbacks).
        producer.handleJniCallbacks(1, new long[0], new long[0], results, -1, 13);

        double recordErrorRate = getJmxProducerMetric(producerMetricsMBean, "record-error-rate");
        double recordErrorTotal = getJmxProducerMetric(producerMetricsMBean, "record-error-total");
        double recordErrorRatePerTopic = getJmxProducerMetric(producerTopicMetricsMBean, "record-error-rate");
        double recordErrorTotalPerTopic = getJmxProducerMetric(producerTopicMetricsMBean, "record-error-total");

        Assert.assertEquals(1.0, recordErrorTotal, DELTA);
        Assert.assertEquals(1.0, recordErrorTotalPerTopic, DELTA);
        assertPositive("record-error-rate", recordErrorRate);
        assertPositive("per-topic record-error-rate", recordErrorRatePerTopic);
    }

    /**
     * Builds a producer configuration that uses the given serializer's class for both keys and
     * values and sets {@code metrics.enabled} to {@code true}.
     *
     * @param serializer serializer whose class name is registered for keys and values
     * @return the resulting producer configuration
     */
    private static ProducerConfig getProducerConfig(Serializer<?> serializer) {
        String serializerClassName = serializer.getClass().getName();
        Properties props = new Properties();
        props.setProperty("key.serializer", serializerClassName);
        props.setProperty("value.serializer", serializerClassName);
        props.setProperty("metrics.enabled", Boolean.toString(true));
        return new ProducerConfig(props);
    }

    /**
     * Serializes the record, hands it to the producer and then delivers the matching callback
     * manually, since the native layer that would normally do so is mocked.
     *
     * @param record the record to send; its key and value may be null
     */
    private void send(ProducerRecord<byte[], byte[]> record) {
        byte[] serializedKey = SERIALIZER.serialize(record.topic(), record.key());
        byte[] serializedValue = SERIALIZER.serialize(record.topic(), record.value());
        int keyLength = serializedKey != null ? serializedKey.length : 0;
        int valueLength = serializedValue != null ? serializedValue.length : 0;
        MarlinProducerResult[] results = new MarlinProducerResult[]{new MarlinProducerResultImplV10(TOPIC, -1, null, keyLength, valueLength)};
        producer.send(record, -1, serializedKey, serializedValue, null);
        producer.handleJniCallbacks(1, new long[0], new long[0], results, -1, 0);
    }

    /**
     * Reads a numeric attribute from the given MBean.
     *
     * @param bean   name of the MBean to query
     * @param metric attribute name
     * @return the attribute value
     * @throws Exception if the MBean or attribute cannot be read
     */
    private double getJmxProducerMetric(ObjectName bean, String metric) throws Exception {
        return (Double) mBeanServer.getAttribute(bean, metric);
    }

    /**
     * Closes the current producer at most once; later calls are no-ops until setUp() creates a
     * new producer.
     */
    private void closeProducer() {
        if (producer != null) {
            producer.close();
            producer = null;
        }
    }

    /**
     * Fails unless the value is strictly greater than zero. Comparing primitives matters here:
     * {@code assertNotEquals(Object, Object)} with an {@code Integer} 0 and a {@code Double}
     * can never fail because the two boxed types are never equal.
     *
     * @param metricName name used in the failure message
     * @param value      the metric value that was read
     */
    private static void assertPositive(String metricName, double value) {
        Assert.assertTrue(metricName + " should be greater than zero but was " + value, value > 0.0);
    }
}

