/*
 * Decompiled with CFR 0.152.
 */
package com.mapr.kafka.eventstreams.impl.listener;

import com.mapr.fs.MapRFileSystem;
import com.mapr.fs.jni.MarlinJniListener;
import com.mapr.fs.jni.NativeData;
import com.mapr.fs.proto.Dbserver;
import com.mapr.kafka.eventstreams.impl.listener.ListenerRecord;
import com.mapr.kafka.eventstreams.impl.listener.MarlinListenerImpl;
import com.mapr.kafka.eventstreams.impl.listener.MarlinListenerImplV10;
import com.mapr.kafka.eventstreams.impl.listener.NativeDataParserV10;
import com.mapr.kafka.eventstreams.impl.tools.MockUtil;
import java.lang.management.ManagementFactory;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Properties;
import javax.management.MBeanServer;
import javax.management.ObjectName;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.header.internals.RecordHeader;
import org.apache.kafka.common.header.internals.RecordHeaders;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.easymock.EasyMock;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.powermock.api.easymock.PowerMock;
import org.powermock.core.classloader.annotations.PowerMockIgnore;
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.core.classloader.annotations.SuppressStaticInitializationFor;
import org.powermock.modules.junit4.PowerMockRunner;

/**
 * Checks the consumer fetch metrics that a {@link MarlinListenerImplV10} publishes over JMX.
 *
 * <p>The MapR file system and the listener's native methods are mocked through {@link MockUtil},
 * and construction of {@link NativeDataParserV10} is intercepted so that every parse call returns
 * the records currently held in {@link #RECORDS_TO_FETCH}. The test then polls records one at a
 * time and compares the JMX attributes of the {@code consumer-fetch-manager-metrics} MBean with
 * sizes computed by {@link #estimatedRecordSize(ListenerRecord)}.
 */
@RunWith(PowerMockRunner.class)
@PowerMockIgnore({"jdk.internal.reflect.*", "javax.management.*", "javax.xml.*", "org.apache.xerces.*", "org.w3c.*", "org.apache.hadoop.fs.FileSystem$Cache$Key"})
@PrepareForTest({MapRFileSystem.class, MarlinJniListener.class, NativeDataParserV10.class, MarlinListenerImplV10.class})
@SuppressStaticInitializationFor({"org.apache.hadoop.conf.Configuration", "org.apache.hadoop.fs.FileSystem", "com.mapr.baseutils.cldbutils.CLDBRpcCommonUtils"})
public class ListenerMerticsTest {
    /** Tolerance used when comparing metric values reported as doubles. */
    private static final double DELTA = 0.1;

    private static final String TOPIC = "/s:t";
    private static final String KEY = "recordKey";
    private static final String VALUE = "recordValue";
    private static final String HEADER_KEY = "hKey";
    private static final String HEADER_VALUE = "hValue";

    /**
     * Backing list of the map returned by the mocked parser. It is shared by reference, so adding
     * or removing a record here changes what the next parse call returns.
     */
    private static final List<ListenerRecord> RECORDS_TO_FETCH = new ArrayList<ListenerRecord>();

    private static final Deserializer<String> DESERIALIZER = new StringDeserializer();
    private static final ConsumerConfig CONFIG = getConsumerConfig(DESERIALIZER);

    private static MBeanServer mBeanServer;
    private static ObjectName consumerFetchMetricsMBean;
    private static MarlinListenerImpl listener;

    /**
     * Installs the mocks and creates a fresh listener before each test.
     *
     * @throws Exception if the MBean name is malformed or mock setup fails
     */
    @Before
    public void setUp() throws Exception {
        mBeanServer = ManagementFactory.getPlatformMBeanServer();
        consumerFetchMetricsMBean = new ObjectName("kafka.consumer:client-id=consumer--1,type=consumer-fetch-manager-metrics");
        MockUtil.mockMapRFileSystem();
        MockUtil.mockListenerNativeMethods();

        HashMap<TopicPartition, List<ListenerRecord>> records = new HashMap<>();
        records.put(new TopicPartition(TOPIC, 0), RECORDS_TO_FETCH);

        NativeDataParserV10 nativeDataParser = EasyMock.mock(NativeDataParserV10.class);
        // The Object cast keeps the expectation independent of the parser's declared return type.
        EasyMock.expect((Object) nativeDataParser.parseListenerRecords(EasyMock.anyBoolean())).andReturn(records).anyTimes();
        // Any parser the listener constructs from native data is replaced by the mock above.
        PowerMock.expectNew(NativeDataParserV10.class, new Class<?>[]{NativeData.class}, EasyMock.anyObject(NativeData.class))
                .andReturn(nativeDataParser)
                .anyTimes();
        PowerMock.replay(nativeDataParser);
        PowerMock.replayAll();

        listener = new MarlinListenerImplV10(CONFIG, null, Dbserver.CDCOpenFormatType.COFT_NONE);
    }

    /**
     * Polls one record with key, value and a header, then one empty record, and verifies the
     * size, total and rate attributes of the fetch metrics MBean.
     *
     * @throws Exception if a JMX attribute cannot be read
     */
    @Test
    public void testFetchMetrics() throws Exception {
        Headers populatedHeaders = new RecordHeaders(new Header[]{new RecordHeader(HEADER_KEY, HEADER_VALUE.getBytes(StandardCharsets.UTF_8))});
        Headers emptyHeaders = new RecordHeaders();
        ListenerRecord record1 = new ListenerRecord(TOPIC, -1, 0L, 0L, 0, KEY.getBytes(StandardCharsets.UTF_8), VALUE.getBytes(StandardCharsets.UTF_8), populatedHeaders, "p");
        ListenerRecord record2 = new ListenerRecord(TOPIC, -1, 0L, 0L, 0, null, null, emptyHeaders, "p");
        int recordSize1 = this.estimatedRecordSize(record1);
        int recordSize2 = this.estimatedRecordSize(record2);

        this.pollRecord(record1);
        this.pollRecord(record2);

        double fetchSizeAvg = getJmxMetric("fetch-size-avg");
        double fetchSizeMax = getJmxMetric("fetch-size-max");
        double bytesConsumedRate = getJmxMetric("bytes-consumed-rate");
        double bytesConsumedTotal = getJmxMetric("bytes-consumed-total");
        double recordsConsumedRate = getJmxMetric("records-consumed-rate");
        double recordsConsumedTotal = getJmxMetric("records-consumed-total");

        // Divide by 2.0 so an odd total is not truncated by integer division.
        Assert.assertEquals((recordSize1 + recordSize2) / 2.0, fetchSizeAvg, DELTA);
        Assert.assertEquals((double) Math.max(recordSize1, recordSize2), fetchSizeMax, DELTA);
        Assert.assertEquals((double) (recordSize1 + recordSize2), bytesConsumedTotal, DELTA);
        Assert.assertEquals(2.0, recordsConsumedTotal, DELTA);
        // Compare as doubles: a boxed Integer 0 is never equal to a boxed Double, which would
        // make an Object-based assertNotEquals pass regardless of the reported rate.
        Assert.assertTrue("bytes-consumed-rate should be positive but was " + bytesConsumedRate, bytesConsumedRate > 0.0);
        Assert.assertTrue("records-consumed-rate should be positive but was " + recordsConsumedRate, recordsConsumedRate > 0.0);
    }

    /**
     * Makes {@code record} the only pending record, polls the listener once, and removes the
     * record again even when the poll throws, so the shared list never leaks state.
     *
     * @param record the record the mocked parser should return for this poll
     */
    private void pollRecord(ListenerRecord record) {
        RECORDS_TO_FETCH.add(record);
        try {
            listener.poll(100L);
        } finally {
            RECORDS_TO_FETCH.remove(record);
        }
    }

    /**
     * Computes the size this test expects the metrics to report for a record: topic name length
     * plus key and value byte lengths plus, for every header, key length and value byte length.
     * A {@code null} topic, key or value contributes zero.
     *
     * @param record the record to measure
     * @return the summed size
     */
    private int estimatedRecordSize(ListenerRecord record) {
        int topicSize = record.topic() == null ? 0 : record.topic().length();
        int keySize = record.key() == null ? 0 : record.key().length;
        int valueSize = record.value() == null ? 0 : record.value().length;
        int headersSize = Arrays.stream(record.headers().toArray())
                .mapToInt(h -> h.key().length() + h.value().length)
                .sum();
        return topicSize + keySize + valueSize + headersSize;
    }

    /**
     * Builds a consumer configuration that uses the given deserializer's class for both keys and
     * values and sets {@code metrics.enabled} to {@code true}.
     *
     * @param deserializer deserializer whose class name is used for key and value
     * @return the consumer configuration
     */
    private static ConsumerConfig getConsumerConfig(Deserializer<?> deserializer) {
        String deserializerClassName = deserializer.getClass().getName();
        Properties props = new Properties();
        props.setProperty("key.deserializer", deserializerClassName);
        props.setProperty("value.deserializer", deserializerClassName);
        props.setProperty("metrics.enabled", Boolean.toString(true));
        return new ConsumerConfig(props);
    }

    /**
     * Reads one attribute of the consumer fetch metrics MBean.
     *
     * @param metric the attribute name, e.g. {@code fetch-size-avg}
     * @return the attribute value
     * @throws Exception if the MBean or attribute is missing or the lookup fails
     */
    private static double getJmxMetric(String metric) throws Exception {
        return (Double) mBeanServer.getAttribute(consumerFetchMetricsMBean, metric);
    }
}

