/*
 * Decompiled with CFR 0.152.
 */
package com.mapr.streams.tests.listener;

import com.google.protobuf.ByteString;
import com.mapr.fs.proto.Marlinserver;
import com.mapr.streams.Admin;
import com.mapr.streams.StreamDescriptor;
import com.mapr.streams.Streams;
import com.mapr.streams.impl.listener.MarlinListener;
import com.mapr.tests.BaseTest;
import com.mapr.tests.annotations.ClusterTest;
import java.io.IOException;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.hadoop.conf.Configuration;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.mapr.GenericHFactory;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Category(value={ClusterTest.class})
public class ListenerJoin
extends BaseTest {
    private static final Logger _logger = LoggerFactory.getLogger(ListenerJoin.class);
    private static final String STREAM = "/jtest-" + ListenerJoin.class.getSimpleName();
    private static Admin madmin;
    private static final int numPartitions = 10;

    @BeforeClass
    public static void setupTestClass() throws Exception {
        Configuration conf = new Configuration();
        madmin = Streams.newAdmin((Configuration)conf);
        try {
            madmin.deleteStream(STREAM);
        }
        catch (Exception exception) {
            // empty catch block
        }
        StreamDescriptor sdesc = Streams.newStreamDescriptor();
        sdesc.setDefaultPartitions(10);
        madmin.createStream(STREAM, sdesc);
    }

    @AfterClass
    public static void cleanupTestClass() throws Exception {
        madmin.deleteStream(STREAM);
    }

    @Test
    public void testJoin() throws IOException {
        int i;
        Thread[] threads = new Thread[5];
        for (i = 0; i < 3; ++i) {
            threads[i] = new Thread(new Joiner(false, i));
            threads[i].start();
        }
        threads[i] = new Thread(new Joiner(true, i));
        threads[i].start();
        threads[i + 1] = new Thread(new Joiner(false, i + 1));
        try {
            Thread.sleep(5000L);
        }
        catch (InterruptedException e) {
            System.out.println("interrupted");
        }
        threads[i + 1].start();
        try {
            for (i = 0; i < 5; ++i) {
                threads[i].join();
            }
        }
        catch (InterruptedException e) {
            e.printStackTrace();
            return;
        }
    }

    public class Joiner
    implements Runnable {
        private Properties props;
        private MarlinListener listener;
        public JoinTestCallback cb = new JoinTestCallback();
        final Lock lock = new ReentrantLock();
        final Condition condition = this.lock.newCondition();
        boolean joinComplete;
        boolean rejoinNeeded;
        private Marlinserver.JoinGroupDesc desc;
        private Marlinserver.WorkerState ws;
        ByteString bstrMetadata;
        Marlinserver.JoinGroupResponse resp;
        private Map<String, Marlinserver.WorkerState> wsMap;
        Marlinserver.JoinGroupInfo joinInfo;
        KafkaProducer producer;
        KafkaConsumer consumer;
        final String groupId = "testgroup";
        final String syncTopic = STREAM + ":__mapr__testgroup_assignment";
        boolean leaveEarly;

        public Joiner(boolean l, int i) {
            this.props = new Properties();
            this.props.put("group.id", "testgroup");
            this.props.put("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
            this.props.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
            this.props.put("key.serializer", "org.apache.kafka.common.serialization.LongSerializer");
            this.props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
            this.props.put("auto.offset.reset", "earliest");
            this.props.put("streams.consumer.default.stream", STREAM);
            GenericHFactory configFactory = new GenericHFactory();
            ConsumerConfig config = (ConsumerConfig)configFactory.getImplementorInstance("org.apache.kafka.clients.consumer.ConsumerConfig", new Object[]{this.props}, new Class[]{Map.class});
            this.listener = new MarlinListener(config, null, null);
            this.joinComplete = false;
            this.rejoinNeeded = false;
            this.ws = Marlinserver.WorkerState.newBuilder().setUrl("worker_url_" + i).setOffset(1000L).build();
            this.bstrMetadata = this.ws.toByteString();
            this.desc = Marlinserver.JoinGroupDesc.newBuilder().setProtocolType("connect").addMemberProtocols(Marlinserver.MemberProtocol.newBuilder().setProtocol("someprotocol").setMemberMetadata(this.bstrMetadata).build()).build();
            this.leaveEarly = l;
            this.props.remove("group.id");
            this.props.put("key.deserializer", "org.apache.kafka.common.serialization.LongDeserializer");
            this.consumer = new KafkaConsumer(this.props);
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        @Override
        public void run() {
            block14: while (true) {
                this.joinComplete = false;
                this.rejoinNeeded = false;
                this.resp = this.listener.join(this.desc, (MarlinListener.MarlinJoinCallback)this.cb);
                System.out.println("memberid from join API is " + this.resp.getMemberId());
                this.desc = Marlinserver.JoinGroupDesc.newBuilder().mergeFrom(this.desc).setMemberId(this.resp.getMemberId()).build();
                if (this.leaveEarly) {
                    try {
                        Thread.sleep(10000L);
                        System.out.println("leaver memberid is " + this.resp.getMemberId());
                        this.listener.close();
                    }
                    catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    return;
                }
                try {
                    this.lock.lock();
                    while (!this.joinComplete && !this.rejoinNeeded) {
                        boolean continueWaiting = this.condition.await(60L, TimeUnit.SECONDS);
                        if (continueWaiting) continue;
                        System.out.println("join/rejoin time expired");
                        return;
                    }
                }
                catch (InterruptedException e) {
                    e.printStackTrace();
                    System.out.println("interrupted");
                }
                finally {
                    this.lock.unlock();
                }
                if (!this.joinComplete) continue;
                int i = 0;
                while (true) {
                    this.consumer.subscribe(Arrays.asList(this.syncTopic));
                    ConsumerRecords records = this.consumer.poll(15000L);
                    System.out.println("past consumer poll" + i);
                    Long lastSeen = 0L;
                    for (ConsumerRecord record : records) {
                        System.out.println("consumer record..generation ID " + record.key());
                        lastSeen = (Long)record.key();
                    }
                    this.lock.lock();
                    if (this.rejoinNeeded) {
                        this.lock.unlock();
                        break;
                    }
                    this.lock.unlock();
                    if (this.joinInfo.getGroupGenerationId() == lastSeen.longValue()) break;
                    ++i;
                }
                try {
                    this.lock.lock();
                    while (true) {
                        if (this.rejoinNeeded) continue block14;
                        boolean continueWaiting = this.condition.await(60L, TimeUnit.SECONDS);
                        if (continueWaiting) continue;
                        System.out.println("rejoin time expired");
                        return;
                    }
                }
                catch (InterruptedException e) {
                    System.out.println("interrupted");
                    continue;
                }
                finally {
                    this.lock.unlock();
                    continue;
                }
                break;
            }
        }

        public class JoinTestCallback
        implements MarlinListener.MarlinJoinCallback {
            public void onJoin(Marlinserver.JoinGroupInfo jgi) {
                if (jgi.getGroupLeaderId().equalsIgnoreCase(Joiner.this.resp.getMemberId())) {
                    try {
                        System.out.println("leader id is " + jgi.getGroupLeaderId());
                        Joiner.this.wsMap = new HashMap<String, Marlinserver.WorkerState>();
                        for (int i = 0; i < jgi.getMembersCount(); ++i) {
                            Joiner.this.wsMap.put(jgi.getMembers(i).getMemberId(), ((Marlinserver.WorkerState.Builder)Marlinserver.WorkerState.newBuilder().mergeFrom(jgi.getMembers(i).getMemberMetadata())).build());
                        }
                        Joiner.this.producer = new KafkaProducer(Joiner.this.props);
                        Marlinserver.GroupAssignment.Builder gaBuilder = Marlinserver.GroupAssignment.newBuilder().setGroupGenerationId(jgi.getGroupGenerationId());
                        String leaderUrl = Joiner.this.ws.getUrl();
                        for (Map.Entry<String, Marlinserver.WorkerState> e : Joiner.this.wsMap.entrySet()) {
                            Marlinserver.WorkerAssignment wa = Marlinserver.WorkerAssignment.newBuilder().setLeaderURL(leaderUrl).setLeaderId(jgi.getGroupLeaderId()).build();
                            Marlinserver.MemberState ms = Marlinserver.MemberState.newBuilder().setMemberId(e.getKey()).setMemberAssignment(wa.toByteString()).build();
                            gaBuilder.addMemberState(ms);
                            System.out.println("Member " + e.getKey() + " url is " + e.getValue().getUrl() + " offset is " + e.getValue().getOffset());
                        }
                        Marlinserver.GroupAssignment ga = gaBuilder.build();
                        System.out.println("producing generation id " + jgi.getGroupGenerationId());
                        Joiner.this.producer.send(new ProducerRecord(Joiner.this.syncTopic, (Object)jgi.getGroupGenerationId(), (Object)ga.toByteArray()));
                        Joiner.this.producer.close();
                    }
                    catch (Exception e) {
                        e.printStackTrace();
                    }
                }
                Joiner.this.joinInfo = jgi;
                Joiner.this.lock.lock();
                Joiner.this.joinComplete = true;
                Joiner.this.condition.signal();
                Joiner.this.lock.unlock();
            }

            public void onRejoin(Marlinserver.JoinGroupInfo jgi) {
                Joiner.this.lock.lock();
                Joiner.this.rejoinNeeded = true;
                Joiner.this.condition.signal();
                Joiner.this.lock.unlock();
            }
        }
    }
}

