/*
 * Decompiled with CFR 0.152.
 */
package com.mapr.streams.tests.listener;

import com.mapr.fs.proto.Marlinserver;
import com.mapr.streams.Admin;
import com.mapr.streams.StreamDescriptor;
import com.mapr.streams.Streams;
import com.mapr.tests.BaseTest;
import com.mapr.tests.annotations.ClusterTest;
import java.io.IOException;
import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
import java.util.Properties;
import java.util.concurrent.Future;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Cluster test that produces messages to a stream and then polls that stream while running
 * as a different user, asserting that the other user receives no messages.
 */
@Category(value={ClusterTest.class})
public class ListenerImpersonationTest
extends BaseTest {
    private static final Logger _logger = LoggerFactory.getLogger(ListenerImpersonationTest.class);
    private static final String STREAM = "/jtest-" + ListenerImpersonationTest.class.getSimpleName();
    private static final String IMPERSONATION_STREAM = "/tmp/jtest-ListenerImpersonation";
    private static final String TOPIC = "testtopic";
    /** Full "stream:topic" name shared by the producer and the impersonated consumer. */
    private static final String IMPERSONATION_TOPIC = IMPERSONATION_STREAM + ":" + TOPIC;
    /** User the consumer side runs as. */
    private static final String IMPERSONATED_USER = "m7user1";
    /** Number of messages sent before the impersonated consumer polls. */
    private static final int SEND_ITERATIONS = 2;
    private static Admin madmin;
    private static Admin impersonationMadmin;
    public static int msgValueLength = 200;
    public static final byte[] value = new byte[msgValueLength];
    public static final byte[] key = "abc".getBytes();
    public static int numPartitions = 10;

    /**
     * Creates the shared admin client and removes any {@link #STREAM} left over from an
     * earlier run.
     */
    @BeforeClass
    public static void setupTest() throws Exception {
        Configuration conf = new Configuration();
        madmin = Streams.newAdmin(conf);
        try {
            madmin.deleteStream(STREAM);
        } catch (Exception ignored) {
            // Best effort: the delete is only a pre-clean, so a failure here is not fatal.
            _logger.debug("Pre-test delete of stream " + STREAM + " failed", ignored);
        }
    }

    /** Creates {@link #STREAM} with {@link #numPartitions} default partitions before each test. */
    @Before
    public void setupTable() throws Exception {
        StreamDescriptor sdesc = Streams.newStreamDescriptor();
        sdesc.setDefaultPartitions(numPartitions);
        sdesc.setAutoCreateTopics(true);
        madmin.createStream(STREAM, sdesc);
    }

    /** Deletes {@link #STREAM} after each test. */
    @After
    public void cleanupTest() throws Exception {
        madmin.deleteStream(STREAM);
    }

    /**
     * Builds a remote-user {@link UserGroupInformation} for the given user name.
     *
     * @param user name of the user to create
     * @return the UGI returned by {@code UserGroupInformation.createRemoteUser}
     */
    private static UserGroupInformation createUser(String user) throws IOException {
        return UserGroupInformation.createRemoteUser(user);
    }

    /**
     * Sends one record to partition 1 of {@link #IMPERSONATION_TOPIC} and asserts on the
     * outcome reported to the send callback.
     *
     * @param successIsExpected {@code true} if the callback must verify cleanly,
     *                          {@code false} if verification must fail
     */
    private void testSend(boolean successIsExpected) throws IOException {
        Marlinserver.MarlinConfigDefaults cdef = Marlinserver.MarlinConfigDefaults.getDefaultInstance();
        // Local properties: producer and consumer settings must not share one mutable object.
        Properties producerProps = new Properties();
        producerProps.put("key.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
        producerProps.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
        producerProps.put(cdef.getParallelFlushersPerPartition(), true);
        producerProps.put(cdef.getMetadataMaxAge(), 100);
        producerProps.put(cdef.getBufferTime(), 500);
        KafkaProducer<byte[], byte[]> producer = new KafkaProducer<byte[], byte[]>(producerProps);
        try {
            ProducerRecord<byte[], byte[]> record =
                    new ProducerRecord<byte[], byte[]>(IMPERSONATION_TOPIC, Integer.valueOf(1), key, value);
            TestCallback callback = new TestCallback(1, false);
            Future<RecordMetadata> future = producer.send(record, callback);
            try {
                future.get();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                _logger.error("Interrupted while waiting for send to complete", e);
            } catch (Exception e) {
                // A failed send is reported through the callback and judged by verify() below.
                _logger.error("Send failed", e);
            }
            if (successIsExpected) {
                Assert.assertTrue(callback.verify());
            } else {
                Assert.assertFalse(callback.verify());
            }
        } finally {
            // Flush before closing, and close exactly once even when an assertion fails.
            try {
                producer.flush();
            } finally {
                producer.close();
            }
        }
    }

    /**
     * Recreates {@link #IMPERSONATION_STREAM}, sends {@link #SEND_ITERATIONS} messages to it,
     * then polls the topic as {@link #IMPERSONATED_USER} and asserts that nothing was read.
     *
     * @param isPartOfGroup whether the consumer is configured with a {@code group.id}
     */
    private void testConsumerImpersonation(final boolean isPartOfGroup) throws Exception {
        UserGroupInformation ugi = ListenerImpersonationTest.createUser(IMPERSONATED_USER);
        Configuration conf = new Configuration();
        impersonationMadmin = Streams.newAdmin(conf);
        try {
            impersonationMadmin.deleteStream(IMPERSONATION_STREAM);
        } catch (Exception ignored) {
            // Best effort: the delete is only a pre-clean before the stream is created below.
            _logger.debug("Pre-test delete of stream " + IMPERSONATION_STREAM + " failed", ignored);
        }
        StreamDescriptor sdesc = Streams.newStreamDescriptor();
        sdesc.setDefaultPartitions(numPartitions);
        impersonationMadmin.createStream(IMPERSONATION_STREAM, sdesc);
        for (int i = 0; i < SEND_ITERATIONS; ++i) {
            this.testSend(true);
        }
        try {
            ugi.doAs(new PrivilegedExceptionAction<Void>() {

                @Override
                public Void run() throws Exception {
                    Properties consumerProps = new Properties();
                    consumerProps.put("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
                    consumerProps.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
                    consumerProps.put("fetch.min.bytes", "1");
                    consumerProps.put("auto.offset.reset", "earliest");
                    if (isPartOfGroup) {
                        consumerProps.put("group.id", "4004");
                    }
                    ArrayList<String> topics = new ArrayList<String>();
                    topics.add(IMPERSONATION_TOPIC);
                    KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<byte[], byte[]>(consumerProps);
                    int msgCount = 0;
                    try {
                        consumer.subscribe(topics);
                        _logger.debug("subscribed to topic" + IMPERSONATION_TOPIC);
                        ConsumerRecords<byte[], byte[]> records = consumer.poll(1000L);
                        msgCount = records.count();
                    } catch (Exception e) {
                        // A failure to subscribe or poll leaves msgCount at 0, which the
                        // assertion below accepts.
                        _logger.error("Impersonated consumer failed", e);
                    } finally {
                        // Close exactly once; a second close() may throw and would skip the
                        // assertion below.
                        consumer.close();
                    }
                    _logger.info("Total messages consumed: " + msgCount);
                    Assert.assertEquals(0L, (long) msgCount);
                    return null;
                }
            });
        } catch (Exception e) {
            // Assertion failures are Errors and still propagate; other failures are logged only.
            _logger.error("Impersonated consumer action failed", e);
        }
    }

    /** Runs the impersonation scenario with a consumer that has a {@code group.id}. */
    @Test
    public void testConsumerWithGroup() throws Exception {
        try {
            this.testConsumerImpersonation(true);
        } catch (Exception e) {
            // NOTE(review): exceptions were silently swallowed here; they are now logged but
            // still not rethrown, so existing pass/fail behaviour is unchanged. Consider
            // letting them propagate so setup failures fail the test.
            _logger.error("testConsumerWithGroup raised an exception", e);
        }
    }

    /** Runs the impersonation scenario with a consumer that has no {@code group.id}. */
    @Test
    public void testConsumerWithoutGroup() throws Exception {
        try {
            this.testConsumerImpersonation(false);
        } catch (Exception e) {
            // NOTE(review): exceptions were silently swallowed here; they are now logged but
            // still not rethrown, so existing pass/fail behaviour is unchanged. Consider
            // letting them propagate so setup failures fail the test.
            _logger.error("testConsumerWithoutGroup raised an exception", e);
        }
    }

    /**
     * Send callback that records the completion result and lets the test thread block until
     * it arrives, then checks it against the expected outcome.
     */
    private static final class TestCallback
    implements Callback {
        /** Whether an exception is expected from the send. */
        private boolean error;
        /** Partition the record is expected to land in; only used when {@link #checkfeedID} is set. */
        private int expectedFeedID;
        private Exception exceptionReceived;
        private RecordMetadata metadataReceived;
        private boolean callbackCompleted;
        private boolean checkfeedID;

        /**
         * Creates a callback that only checks for the presence or absence of an exception.
         *
         * @param errors {@code true} if the send is expected to fail
         */
        public TestCallback(boolean errors) {
            this.checkfeedID = false;
            this.error = errors;
            this.callbackCompleted = false;
        }

        /**
         * Creates a callback that also checks the partition reported in the record metadata.
         *
         * @param feedid expected partition
         * @param errors {@code true} if the send is expected to fail
         */
        public TestCallback(int feedid, boolean errors) {
            this.checkfeedID = true;
            this.expectedFeedID = feedid;
            this.error = errors;
            this.callbackCompleted = false;
        }

        /** Stores the completion result and wakes any thread blocked in {@link #verify()}. */
        @Override
        public void onCompletion(RecordMetadata metadata, Exception exception) {
            synchronized (this) {
                this.exceptionReceived = exception;
                this.metadataReceived = metadata;
                this.callbackCompleted = true;
                this.notifyAll();
            }
        }

        /**
         * Blocks until the callback has completed, then compares the result with the
         * expectations given at construction time.
         *
         * @return {@code true} if the outcome matches; {@code false} on any mismatch, if the
         *         wait was interrupted, or if a partition check was requested but no metadata
         *         was delivered
         */
        public boolean verify() {
            Exception received;
            RecordMetadata metadata;
            synchronized (this) {
                // Loop rather than a single check: wait() may return without a notify.
                while (!this.callbackCompleted) {
                    try {
                        this.wait();
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                        _logger.error("Interrupted while waiting for send callback", e);
                        return false;
                    }
                }
                received = this.exceptionReceived;
                metadata = this.metadataReceived;
            }
            boolean verified = true;
            if (this.error && received == null) {
                _logger.error("Did not get exception when expected");
                verified = false;
            } else if (!this.error && received != null) {
                _logger.error("Received exception " + received + " but expected none");
                verified = false;
            }
            if (this.checkfeedID) {
                if (metadata == null) {
                    // Without metadata the partition cannot be checked; report a mismatch
                    // instead of failing with a NullPointerException.
                    _logger.error("Received no record metadata but expected partition " + this.expectedFeedID);
                    verified = false;
                } else if (this.expectedFeedID != metadata.partition()) {
                    _logger.error("Received partition " + metadata.partition() + " but expected " + this.expectedFeedID);
                    verified = false;
                }
            }
            return verified;
        }
    }
}

