/*
 * Decompiled with CFR 0.152.
 */
package com.mapr.streams.tests.kafka.rpc;

import com.mapr.tests.annotations.ClusterTest;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.network.ChannelBuilder;
import org.apache.kafka.common.network.ChannelBuilders;
import org.apache.kafka.common.network.NetworkReceive;
import org.apache.kafka.common.network.Selector;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.requests.MetadataRequest;
import org.apache.kafka.common.requests.MetadataResponse;
import org.apache.kafka.common.requests.RequestHeader;
import org.apache.kafka.common.requests.ResponseHeader;
import org.apache.kafka.common.security.JaasContext;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.experimental.categories.Category;

/**
 * Cluster test that sends raw Kafka {@code Metadata} requests (API version 5) through a
 * plaintext {@link Selector} connection to {@code localhost:9092} and prints the brokers,
 * cluster id, topics and partitions contained in each response.
 *
 * <p>Every test requires an endpoint listening on that address that answers Metadata
 * requests; a failed connect, a failed poll or a missing response fails the test.
 */
@Category(value={ClusterTest.class})
public class MetadataRequestTest {
    private static final String BROKER_HOST = "localhost";
    private static final int BROKER_PORT = 9092;
    /** Socket send and receive buffer size handed to {@link Selector#connect}. */
    private static final int BUFFER_SIZE = 4096;
    /** Metadata API version used for the request header, the request body and response parsing. */
    private static final short METADATA_VERSION = 5;
    /** Number of extra one-second polls performed while waiting for a response. */
    private static final int RESPONSE_WAIT_SECONDS = 10;
    /** Number of concurrent clients started by {@link #testParallelRequests()}. */
    private static final int PARALLEL_CLIENTS = 5;
    /** Upper bound, in milliseconds, on how long the parallel test waits for its workers. */
    private static final long PARALLEL_TIMEOUT_MS = 15000L;
    private static final Time TIME = Time.SYSTEM;

    private ChannelBuilder channelBuilder;
    private Selector selector;
    String node = "1";

    /**
     * Builds the channel builder and selector used by the single-connection tests and
     * starts connecting to the broker under the id stored in {@link #node}.
     *
     * @throws AssertionError if initiating the connection fails with an {@link IOException}
     */
    @Before
    public void setup() {
        this.channelBuilder = newChannelBuilder("MetadataRequestTest");
        this.selector = newSelector(this.channelBuilder);
        try {
            this.selector.connect(this.node, new InetSocketAddress(BROKER_HOST, BROKER_PORT), BUFFER_SIZE, BUFFER_SIZE);
        }
        catch (IOException e) {
            // Fail here instead of letting the test continue with an unconnected selector.
            throw new AssertionError("Could not connect to " + BROKER_HOST + ":" + BROKER_PORT, e);
        }
    }

    /** Closes the selector opened by {@link #setup()} so connections do not leak between tests. */
    @After
    public void tearDown() {
        if (this.selector != null) {
            this.selector.close();
            this.selector = null;
        }
    }

    /** Requests metadata with a {@code null} topic list and prints the response. */
    @Test
    public void testAllTopics() {
        MetadataResponse response = this.requestMetadata(null);
        System.out.println("\nResponse received!\n");
        MetadataRequestTest.printBrokersAndCluster(response);
        MetadataRequestTest.printTopics("", response, false);
    }

    /** Requests metadata for {@code topic0..topic2} and prints the response including error codes. */
    @Test
    public void testSomeTopics() {
        MetadataResponse response = this.requestMetadata(MetadataRequestTest.sampleTopics());
        System.out.println("\nResponse received!\n");
        MetadataRequestTest.printBrokersAndCluster(response);
        MetadataRequestTest.printTopics("", response, true);
    }

    /**
     * Starts {@value #PARALLEL_CLIENTS} worker threads, each with its own selector, and waits
     * up to {@value #PARALLEL_TIMEOUT_MS} ms in total for them to finish. The test fails if a
     * worker is still running at the deadline or recorded a failure.
     */
    @Test
    public void testParallelRequests() {
        List<MetadataRequestThread> workers = new ArrayList<MetadataRequestThread>();
        for (int i = 1; i <= PARALLEL_CLIENTS; i++) {
            workers.add(new MetadataRequestThread("META-" + i, String.valueOf(i)));
        }
        long deadline = System.currentTimeMillis() + PARALLEL_TIMEOUT_MS;
        try {
            for (MetadataRequestThread worker : workers) {
                long remaining = deadline - System.currentTimeMillis();
                // join(0) would wait forever, so only join while time is left.
                if (remaining > 0L) {
                    worker.t.join(remaining);
                }
            }
        }
        catch (InterruptedException e) {
            System.out.println("Main thread Interrupted");
            Thread.currentThread().interrupt();
            throw new AssertionError("Interrupted while waiting for metadata worker threads", e);
        }
        for (MetadataRequestThread worker : workers) {
            Assert.assertFalse(worker.name + " did not finish within " + PARALLEL_TIMEOUT_MS + " ms", worker.t.isAlive());
            if (worker.failure != null) {
                throw new AssertionError(worker.name + " failed", worker.failure);
            }
        }
        System.out.println("Main thread exiting.");
    }

    /**
     * Sends a Metadata request over the shared selector as client {@code "metaclient"}.
     *
     * @param topics topics to ask for; {@code null} is passed through to the request builder
     * @return the parsed response
     * @throws AssertionError if polling the selector fails with an {@link IOException}
     */
    private MetadataResponse requestMetadata(List<String> topics) {
        try {
            return MetadataRequestTest.fetchMetadata(this.selector, this.node, "metaclient", topics);
        }
        catch (IOException e) {
            throw new AssertionError("Metadata request to node " + this.node + " failed", e);
        }
    }

    /** Builds a plaintext client channel builder from a producer config carrying {@code clientId}. */
    private static ChannelBuilder newChannelBuilder(String clientId) {
        Properties props = new Properties();
        props.setProperty("client.id", clientId);
        props.setProperty("bootstrap.servers", BROKER_HOST + ":" + BROKER_PORT);
        props.setProperty("max.in.flight.requests.per.connection", "1");
        ProducerConfig config = new ProducerConfig(ProducerConfig.addSerializerToConfig(props, new StringSerializer(), new StringSerializer()));
        return ChannelBuilders.clientChannelBuilder(SecurityProtocol.PLAINTEXT, JaasContext.Type.CLIENT, config, null, "GSSAPI", true);
    }

    /** Creates a selector on top of {@code builder}, using the same settings for every client. */
    private static Selector newSelector(ChannelBuilder builder) {
        return new Selector(5000L, new Metrics(), TIME, "MetricGroup", builder, new LogContext());
    }

    /** Returns a fresh list holding the three topic names requested by the topic-specific tests. */
    private static List<String> sampleTopics() {
        List<String> topics = new ArrayList<String>();
        topics.add("topic0");
        topics.add("topic1");
        topics.add("topic2");
        return topics;
    }

    /**
     * Sends one Metadata request on {@code sel} and parses the single response.
     *
     * @param sel      selector on which {@code nodeId} has been connected
     * @param nodeId   connection id the request is sent to
     * @param clientId client id placed in the request header
     * @param topics   topics to ask for; {@code null} is passed through to the request builder
     * @return the parsed response
     * @throws IOException if polling the selector fails
     */
    private static MetadataResponse fetchMetadata(Selector sel, String nodeId, String clientId, List<String> topics) throws IOException {
        RequestHeader header = new RequestHeader(ApiKeys.METADATA, METADATA_VERSION, clientId, 1);
        MetadataRequest request = new MetadataRequest.Builder(topics, true).build(METADATA_VERSION);
        sel.send(request.toSend(nodeId, header));
        ByteBuffer payload = MetadataRequestTest.waitForResponse(sel);
        // The header is parsed first so the buffer is positioned at the response body.
        ResponseHeader.parse(payload);
        return MetadataResponse.parse(payload, METADATA_VERSION);
    }

    /**
     * Polls {@code sel} in one-second steps until a receive completes or the wait budget is
     * used up, then asserts that exactly one receive completed.
     *
     * @return payload of the single completed receive
     * @throws IOException if a poll fails
     */
    private static ByteBuffer waitForResponse(Selector sel) throws IOException {
        int waitSeconds = RESPONSE_WAIT_SECONDS;
        do {
            sel.poll(1000L);
        } while (sel.completedReceives().isEmpty() && waitSeconds-- > 0);
        Assert.assertEquals("Expected exactly one completed receive", 1, sel.completedReceives().size());
        NetworkReceive receive = sel.completedReceives().get(0);
        return receive.payload();
    }

    /** Prints every broker of {@code response} to stderr and the cluster id to stdout. */
    private static void printBrokersAndCluster(MetadataResponse response) {
        for (Node broker : response.brokers()) {
            System.err.println("\nBroker id : " + broker.idString() + "\t host: " + broker.host() + "\t port: " + broker.port() + "\t rack: " + broker.rack());
        }
        System.out.println("\nCluster id : " + response.clusterId());
    }

    /**
     * Prints one line per topic and per partition of {@code response}.
     *
     * @param prefix        text placed in front of each "Topic"/"Partition" label
     * @param response      response to print
     * @param includeErrors whether topic and partition error codes are part of the output
     */
    private static void printTopics(String prefix, MetadataResponse response, boolean includeErrors) {
        for (MetadataResponse.TopicMetadata topicMeta : response.topicMetadata()) {
            String topicLine = "\n" + prefix + "Topic: " + topicMeta.topic();
            if (includeErrors) {
                topicLine = topicLine + " err: " + topicMeta.error();
            }
            System.out.println(topicLine);
            for (MetadataResponse.PartitionMetadata pMeta : topicMeta.partitionMetadata()) {
                String partitionLine = prefix + "Partition:" + pMeta.partition();
                if (includeErrors) {
                    partitionLine = partitionLine + " error:" + pMeta.error();
                }
                System.out.println(partitionLine + " Leader: " + pMeta.leaderId());
            }
        }
    }

    /**
     * Worker that opens its own selector, sends one Metadata request for the sample topics
     * and prints the response. The thread is created and started by the constructor.
     */
    class MetadataRequestThread
    implements Runnable {
        String name;
        String node;
        Thread t;
        /** Exception or assertion failure raised inside {@link #run()}, or {@code null} if none. */
        volatile Throwable failure;

        /**
         * @param threadname thread name, also used as the Kafka client id
         * @param id         connection id used with this worker's selector
         */
        MetadataRequestThread(String threadname, String id) {
            this.name = threadname;
            this.node = id;
            this.t = new Thread((Runnable)this, this.name);
            System.out.println("New thread: " + this.t);
            this.t.start();
        }

        @Override
        public void run() {
            Selector tSelector = null;
            try {
                tSelector = MetadataRequestTest.newSelector(MetadataRequestTest.newChannelBuilder(this.name));
                tSelector.connect(this.node, new InetSocketAddress(BROKER_HOST, BROKER_PORT), BUFFER_SIZE, BUFFER_SIZE);
                MetadataResponse response = MetadataRequestTest.fetchMetadata(tSelector, this.node, this.name, MetadataRequestTest.sampleTopics());
                System.out.println("\nResponse received! for " + this.name + "\n");
                MetadataRequestTest.printBrokersAndCluster(response);
                MetadataRequestTest.printTopics(this.name + "- ", response, true);
            }
            catch (Exception | AssertionError e) {
                // Recorded so the test thread can fail; an error thrown here would otherwise
                // only terminate this worker.
                this.failure = e;
                e.printStackTrace();
            }
            finally {
                if (tSelector != null) {
                    tSelector.close();
                }
            }
            System.out.println(this.name + " exiting.");
        }
    }
}

