package com.mapr.streams.tests.listener;

import com.google.protobuf.ByteString;
import com.mapr.fs.proto.Marlinserver;
import com.mapr.streams.Admin;
import com.mapr.streams.StreamDescriptor;
import com.mapr.streams.Streams;
import com.mapr.streams.impl.listener.MarlinListener;
import com.mapr.tests.BaseTest;
import com.mapr.tests.annotations.ClusterTest;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.hadoop.conf.Configuration;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.mapr.GenericHFactory;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.Deserializer;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Category({ClusterTest.class})
/* loaded from: input_file:com/mapr/streams/tests/listener/ListenerJoin.class */
public class ListenerJoin extends BaseTest {
    private static final Logger _logger = LoggerFactory.getLogger(ListenerJoin.class);
    private static final String STREAM = "/jtest-" + ListenerJoin.class.getSimpleName();
    private static Admin madmin;
    private static final int numPartitions = 10;

    /* loaded from: input_file:com/mapr/streams/tests/listener/ListenerJoin$Joiner.class */
    public class Joiner implements Runnable {
        private MarlinListener listener;
        boolean joinComplete;
        boolean rejoinNeeded;
        private Marlinserver.JoinGroupDesc desc;
        private Marlinserver.WorkerState ws;
        ByteString bstrMetadata;
        Marlinserver.JoinGroupResponse resp;
        private Map<String, Marlinserver.WorkerState> wsMap;
        Marlinserver.JoinGroupInfo joinInfo;
        KafkaProducer producer;
        KafkaConsumer consumer;
        boolean leaveEarly;
        public JoinTestCallback cb = new JoinTestCallback();
        final Lock lock = new ReentrantLock();
        final Condition condition = this.lock.newCondition();
        final String groupId = "testgroup";
        final String syncTopic = ListenerJoin.STREAM + ":__mapr__testgroup_assignment";
        private Properties props = new Properties();

        /* loaded from: input_file:com/mapr/streams/tests/listener/ListenerJoin$Joiner$JoinTestCallback.class */
        public class JoinTestCallback implements MarlinListener.MarlinJoinCallback {
            public JoinTestCallback() {
            }

            public void onJoin(Marlinserver.JoinGroupInfo joinGroupInfo) {
                if (joinGroupInfo.getGroupLeaderId().equalsIgnoreCase(Joiner.this.resp.getMemberId())) {
                    try {
                        System.out.println("leader id is " + joinGroupInfo.getGroupLeaderId());
                        Joiner.this.wsMap = new HashMap();
                        for (int i = 0; i < joinGroupInfo.getMembersCount(); i++) {
                            Joiner.this.wsMap.put(joinGroupInfo.getMembers(i).getMemberId(), Marlinserver.WorkerState.newBuilder().mergeFrom(joinGroupInfo.getMembers(i).getMemberMetadata()).build());
                        }
                        Joiner.this.producer = new KafkaProducer(Joiner.this.props);
                        Marlinserver.GroupAssignment.Builder groupGenerationId = Marlinserver.GroupAssignment.newBuilder().setGroupGenerationId(joinGroupInfo.getGroupGenerationId());
                        String url = Joiner.this.ws.getUrl();
                        for (Map.Entry<String, Marlinserver.WorkerState> entry : Joiner.this.wsMap.entrySet()) {
                            groupGenerationId.addMemberState(Marlinserver.MemberState.newBuilder().setMemberId(entry.getKey()).setMemberAssignment(Marlinserver.WorkerAssignment.newBuilder().setLeaderURL(url).setLeaderId(joinGroupInfo.getGroupLeaderId()).build().toByteString()).build());
                            System.out.println("Member " + entry.getKey() + " url is " + entry.getValue().getUrl() + " offset is " + entry.getValue().getOffset());
                        }
                        Marlinserver.GroupAssignment build = groupGenerationId.build();
                        System.out.println("producing generation id " + joinGroupInfo.getGroupGenerationId());
                        Joiner.this.producer.send(new ProducerRecord(Joiner.this.syncTopic, Long.valueOf(joinGroupInfo.getGroupGenerationId()), build.toByteArray()));
                        Joiner.this.producer.close();
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }
                Joiner.this.joinInfo = joinGroupInfo;
                Joiner.this.lock.lock();
                Joiner.this.joinComplete = true;
                Joiner.this.condition.signal();
                Joiner.this.lock.unlock();
            }

            public void onRejoin(Marlinserver.JoinGroupInfo joinGroupInfo) {
                Joiner.this.lock.lock();
                Joiner.this.rejoinNeeded = true;
                Joiner.this.condition.signal();
                Joiner.this.lock.unlock();
            }
        }

        public Joiner(boolean z, int i) {
            this.props.put("group.id", "testgroup");
            this.props.put("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
            this.props.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
            this.props.put("key.serializer", "org.apache.kafka.common.serialization.LongSerializer");
            this.props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
            this.props.put("auto.offset.reset", "earliest");
            this.props.put("streams.consumer.default.stream", ListenerJoin.STREAM);
            this.listener = new MarlinListener((ConsumerConfig) new GenericHFactory().getImplementorInstance("org.apache.kafka.clients.consumer.ConsumerConfig", new Object[]{this.props}, new Class[]{Map.class}), (Deserializer) null, (Deserializer) null);
            this.joinComplete = false;
            this.rejoinNeeded = false;
            this.ws = Marlinserver.WorkerState.newBuilder().setUrl("worker_url_" + i).setOffset(1000L).build();
            this.bstrMetadata = this.ws.toByteString();
            this.desc = Marlinserver.JoinGroupDesc.newBuilder().setProtocolType("connect").addMemberProtocols(Marlinserver.MemberProtocol.newBuilder().setProtocol("someprotocol").setMemberMetadata(this.bstrMetadata).build()).build();
            this.leaveEarly = z;
            this.props.remove("group.id");
            this.props.put("key.deserializer", "org.apache.kafka.common.serialization.LongDeserializer");
            this.consumer = new KafkaConsumer(this.props);
        }

        /* JADX WARN: Code restructure failed: missing block: B:37:0x019a, code lost:
        
            r6.lock.lock();
         */
        /* JADX WARN: Code restructure failed: missing block: B:39:0x01a7, code lost:
        
            if (r6.rejoinNeeded != false) goto L70;
         */
        /* JADX WARN: Code restructure failed: missing block: B:41:0x01bb, code lost:
        
            if (r6.condition.await(60, java.util.concurrent.TimeUnit.SECONDS) != false) goto L49;
         */
        /* JADX WARN: Code restructure failed: missing block: B:44:0x01be, code lost:
        
            java.lang.System.out.println("rejoin time expired");
         */
        /* JADX WARN: Code restructure failed: missing block: B:48:0x01cf, code lost:
        
            return;
         */
        /* JADX WARN: Code restructure failed: missing block: B:54:0x01f4, code lost:
        
            r12 = move-exception;
         */
        /* JADX WARN: Code restructure failed: missing block: B:57:0x0201, code lost:
        
            throw r12;
         */
        /* JADX WARN: Code restructure failed: missing block: B:59:0x01e0, code lost:
        
            java.lang.System.out.println("interrupted");
         */
        @Override // java.lang.Runnable
        /*
            Code decompiled incorrectly, please refer to instructions dump.
            To view partially-correct add '--show-bad-code' argument
        */
        public void run() {
            /*
                Method dump skipped, instructions count: 517
                To view this dump add '--comments-level debug' option
            */
            throw new UnsupportedOperationException("Method not decompiled: com.mapr.streams.tests.listener.ListenerJoin.Joiner.run():void");
        }
    }

    @BeforeClass
    public static void setupTestClass() throws Exception {
        madmin = Streams.newAdmin(new Configuration());
        try {
            madmin.deleteStream(STREAM);
        } catch (Exception e) {
        }
        StreamDescriptor newStreamDescriptor = Streams.newStreamDescriptor();
        newStreamDescriptor.setDefaultPartitions(numPartitions);
        madmin.createStream(STREAM, newStreamDescriptor);
    }

    @AfterClass
    public static void cleanupTestClass() throws Exception {
        madmin.deleteStream(STREAM);
    }

    @Test
    public void testJoin() throws IOException {
        Thread[] threadArr = new Thread[5];
        int i = 0;
        while (i < 3) {
            threadArr[i] = new Thread(new Joiner(false, i));
            threadArr[i].start();
            i++;
        }
        threadArr[i] = new Thread(new Joiner(true, i));
        threadArr[i].start();
        threadArr[i + 1] = new Thread(new Joiner(false, i + 1));
        try {
            Thread.sleep(5000L);
        } catch (InterruptedException e) {
            System.out.println("interrupted");
        }
        threadArr[i + 1].start();
        for (int i2 = 0; i2 < 5; i2++) {
            try {
                threadArr[i2].join();
            } catch (InterruptedException e2) {
                e2.printStackTrace();
                return;
            }
        }
    }
}
