/*
 * Decompiled with CFR 0.152.
 */
package com.mapr.kafka.eventstreams.impl.listener;

import com.google.protobuf.ByteString;
import com.mapr.fs.proto.Marlinserver;
import com.mapr.kafka.eventstreams.TopicRefreshListListener;
import com.mapr.kafka.eventstreams.TopicRefreshRegexListener;
import com.mapr.kafka.eventstreams.impl.MarlinCoordinator;
import com.mapr.kafka.eventstreams.impl.listener.MarlinListener;
import com.mapr.kafka.eventstreams.impl.listener.MarlinListenerImpl;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.regex.Pattern;
import org.apache.kafka.clients.consumer.ConsumerGroupMetadata;
import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.internals.ConsumerProtocol;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class MarlinConsumerCoordinator
extends MarlinCoordinator {
    // Listener handed to the constructor; only stored here, it is not read anywhere else in this class.
    private final MarlinListener<?, ?> listener;
    // Listener implementation that receives the topicRefresher*/assign/unsubscribe calls made below.
    private final MarlinListenerImpl listenerimpl;
    // Thread running ConsumerPollThread; started in the constructor and joined in close().
    private final Thread pollThread = new Thread(new ConsumerPollThread());
    // Subscription type, subscribed topics/regex and currently assigned partitions.
    private final SubscriptionState subscriptionState = new SubscriptionState();
    // User rebalance callback set by subscribe(...); null-checked before every invocation.
    private ConsumerRebalanceListener rebalanceCb;
    // First element of the assignor list passed to the constructor.
    private final ConsumerPartitionAssignor assignor;
    // Set to true by close(); read by the poll thread and by unsubscribe().
    private final AtomicBoolean closing = new AtomicBoolean(false);
    // Callback instances handed to listenerimpl.topicRefresherRegex / topicRefresherList.
    private final TopicRefreshCCRegexListener regexRefreshListener = new TopicRefreshCCRegexListener();
    private final TopicRefreshCCListListener listRefreshListener = new TopicRefreshCCListListener();
    // Updated on every onJoin(): true when this member's id equals the group leader id.
    private boolean isLeader = false;
    // Join info parked by onJoin() while the leader waits for the topic-list refresh callback.
    private Marlinserver.JoinGroupInfo leaderJGI;
    // True between a leader onJoin() and the completion of the matching performOnJoin().
    private boolean leaderGroupJoinInProgress = false;
    // Set just before performOnJoin() on the leader path and reset to null right after it.
    private Cluster clusterWithTopicInfo;
    // Written and read under this object's monitor; waitForRejoinCompletion() blocks while it is true.
    private boolean rejoinInProgress = false;
    // Created from the group id in the constructor; returned by groupMetadata().
    private ConsumerGroupMetadata groupMetadata;
    // Stream name returned by generateCoordStream().
    private String internalStream;

    /**
     * Creates the coordinator, starts its poll thread and then runs the
     * superclass {@code init()}.
     *
     * @param listener     listener this coordinator is created for (stored only)
     * @param listenerimpl listener implementation used for refresh, assign and unsubscribe calls
     * @param groupId      group id, passed to the superclass and used for the group metadata
     * @param assignors    partition assignors; only the first element is used
     * @param intStream    stream name later returned by {@link #generateCoordStream()}
     */
    public MarlinConsumerCoordinator(MarlinListener<?, ?> listener, MarlinListenerImpl listenerimpl, String groupId, List<ConsumerPartitionAssignor> assignors, String intStream) {
        super(groupId);
        this.listenerimpl = listenerimpl;
        this.listener = listener;
        this.internalStream = intStream;
        this.assignor = assignors.get(0);
        this.groupMetadata = new ConsumerGroupMetadata(groupId);
        // NOTE(review): the poll thread is started before init() has run and while
        // this object is still being constructed - confirm pollEvent() tolerates that.
        pollThread.start();
        init();
        log.debug("MarlinConsumerCoordinator constructor");
    }

    /**
     * @return the group metadata object created in the constructor
     */
    public ConsumerGroupMetadata groupMetadata() {
        return groupMetadata;
    }

    /**
     * Flags a rejoin as in progress and forwards the request to the superclass.
     * The flag is written while holding this object's monitor; the superclass
     * call happens after the monitor has been released.
     */
    @Override
    public void requestRejoin() {
        synchronized (this) {
            rejoinInProgress = true;
        }
        super.requestRejoin();
    }

    /**
     * Replaces the subscribed topics with the refreshed set and requests a rejoin.
     *
     * @param topics topic names delivered by the regex refresh listener
     */
    private void handleTopicRefresherRegex(Set<String> topics) {
        subscriptionState.updateSubscriptions(topics);
        requestRejoin();
    }

    /**
     * Builds a {@link Cluster} that carries only partition metadata.
     * For every entry, {@code partition()} is treated as an exclusive upper
     * bound: one {@link PartitionInfo} is created for each index below it.
     *
     * @param topicsWithMaxFeeds entries whose partition number bounds the generated indexes
     * @return a cluster with id {@code "mapR"}, no nodes and the generated partitions
     */
    private Cluster createClusterWithTopicMeta(Set<TopicPartition> topicsWithMaxFeeds) {
        Set<PartitionInfo> partitions = new HashSet<>();
        for (TopicPartition entry : topicsWithMaxFeeds) {
            int index = 0;
            while (index < entry.partition()) {
                partitions.add(new PartitionInfo(entry.topic(), index, null, null, null, null));
                index++;
            }
        }
        return new Cluster("mapR", Collections.emptyList(), partitions, Collections.emptySet(), Collections.emptySet());
    }

    /**
     * Handles a topic-list refresh. When a leader join is pending, the parked
     * join is completed with cluster metadata built from the refresh; otherwise
     * a rejoin is requested.
     *
     * @param topicsWithMaxFeeds refreshed entries passed to {@link #createClusterWithTopicMeta(Set)}
     */
    private void handleTopicRefresherList(Set<TopicPartition> topicsWithMaxFeeds) {
        assert (this.isLeader);
        if (!leaderGroupJoinInProgress) {
            requestRejoin();
            return;
        }
        log.debug("Leader join. groupSubscription topic info: {} ", topicsWithMaxFeeds);
        assert (this.leaderJGI != null);
        assert (this.clusterWithTopicInfo == null);
        // The cluster is only needed for the duration of performOnJoin().
        clusterWithTopicInfo = createClusterWithTopicMeta(topicsWithMaxFeeds);
        performOnJoin(leaderJGI);
        leaderGroupJoinInProgress = false;
        leaderJGI = null;
        clusterWithTopicInfo = null;
    }

    /**
     * Subscribes by regular expression. Waits for any rejoin in progress,
     * records the pattern and callback, marks a rejoin as in progress,
     * registers the regex refresher and then blocks until that rejoin completes.
     *
     * @param pattern  topic pattern to subscribe to
     * @param callback rebalance callback, stored for later rebalances
     */
    public void subscribe(Pattern pattern, ConsumerRebalanceListener callback) {
        log.debug("Subscribe with regex, begin : {} ", (Object) pattern.toString());
        waitForRejoinCompletion();
        subscriptionState.subscribe(pattern, SubscriptionType.SUBSCRIBE_REGEX);
        rebalanceCb = callback;
        synchronized (this) {
            rejoinInProgress = true;
        }
        Pattern subscribed = subscriptionState.getSubscribedRegex();
        listenerimpl.topicRefresherRegex(subscribed, regexRefreshListener);
        waitForRejoinCompletion();
        log.debug("Subscribe with regex, end: {}", (Object) pattern.toString());
    }

    /**
     * Subscribes to an explicit list of topics. Waits for any rejoin in
     * progress, records the topics and callback, then requests a rejoin and
     * blocks until it completes.
     *
     * @param topics   topic names to subscribe to
     * @param callback rebalance callback, stored for later rebalances
     * @throws KafkaException declared by the original signature
     */
    public void subscribe(Collection<String> topics, ConsumerRebalanceListener callback) throws KafkaException {
        log.debug("Subscribe with topics, begin : {} ", topics);
        waitForRejoinCompletion();
        subscriptionState.subscribe(topics, SubscriptionType.SUBSCRIBE_LIST);
        rebalanceCb = callback;
        requestRejoinAndWait();
        log.debug("Subscribe with topics, end: {}", topics);
    }

    /**
     * Assigns partitions manually. Waits for any rejoin in progress, records
     * the partitions in the subscription state and forwards them to the
     * listener implementation.
     *
     * @param partitions partitions to assign
     */
    public void assign(Collection<TopicPartition> partitions) {
        log.debug("Assign with topic partitions, begin : {} ", partitions);
        waitForRejoinCompletion();
        subscriptionState.assign(partitions, SubscriptionType.ASSIGN_PARTITIONS);
        listenerimpl.assign(partitions);
        log.debug("Assign with topic partitions, end: {} ", partitions);
    }

    /**
     * Drops the current subscription. Returns immediately when nothing is
     * subscribed. Otherwise it waits for any rejoin in progress, unregisters
     * the regex refresher (regex subscriptions) and the list refresher
     * (leader only), clears the subscription state, requests a rejoin and
     * waits for it unless the coordinator is closing, and finally unsubscribes
     * the listener implementation.
     */
    public void unsubscribe() {
        log.debug("Unsubscribe, begin");
        if (subscriptionState.getSubscriptionType() == SubscriptionType.NONE) {
            return;
        }
        waitForRejoinCompletion();
        // Read the type again: it is only checked for the regex case after the wait.
        boolean regexSubscription = subscriptionState.getSubscriptionType() == SubscriptionType.SUBSCRIBE_REGEX;
        if (regexSubscription) {
            listenerimpl.topicRefresherRegex(null, null);
        }
        if (isLeader) {
            listenerimpl.topicRefresherList(Collections.emptySet(), null);
        }
        subscriptionState.unsubscribe();
        boolean shuttingDown = closing.get();
        if (!shuttingDown) {
            requestRejoinAndWait();
        }
        listenerimpl.unsubscribe();
        log.debug("Unsubscribe, end");
    }

    /**
     * Clears the rejoin-in-progress flag and wakes every thread waiting on
     * this object's monitor.
     */
    private void notifyRejoinCompletion() {
        synchronized (this) {
            rejoinInProgress = false;
            notifyAll();
        }
    }

    /**
     * Requests a rejoin and blocks until it is reported complete. Both steps
     * run while holding this object's monitor.
     */
    private void requestRejoinAndWait() {
        synchronized (this) {
            requestRejoin();
            waitForRejoinCompletion();
        }
    }

    /**
     * Blocks until no rejoin is in progress.
     *
     * <p>The wait is deliberately not abandoned on interruption (callers rely
     * on the rejoin having finished when this returns), but the interrupt is
     * no longer lost: it is remembered and the thread's interrupt status is
     * restored before returning, so code further up the stack can react to it.
     */
    private synchronized void waitForRejoinCompletion() {
        boolean interrupted = false;
        try {
            while (this.rejoinInProgress) {
                try {
                    this.wait();
                }
                catch (InterruptedException e) {
                    // Keep waiting; the flag is restored in the finally block.
                    interrupted = true;
                    log.debug("Interrupted while waiting for rejoin completion; continuing to wait", e);
                }
            }
        }
        finally {
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        }
    }

    /**
     * @return an unmodifiable snapshot of the currently assigned partitions
     */
    public Set<TopicPartition> assignment() {
        return subscriptionState.assignment();
    }

    /**
     * @return an unmodifiable snapshot of the currently subscribed topic names
     */
    public Set<String> subscription() {
        return subscriptionState.subscription();
    }

    /**
     * @return the SLF4J logger registered for {@code MarlinConsumerCoordinator}
     */
    @Override
    protected Logger getLogger() {
        final Class<MarlinConsumerCoordinator> owner = MarlinConsumerCoordinator.class;
        return LoggerFactory.getLogger(owner);
    }

    /**
     * @return a new {@link MarlinConsumerJoinCallback} bound to this coordinator
     */
    @Override
    protected MarlinListener.MarlinJoinCallback getJoinerCallback() {
        MarlinConsumerJoinCallback callback = new MarlinConsumerJoinCallback();
        return callback;
    }

    /**
     * Builds the sync topic name for a group.
     *
     * @param groupId group id embedded in the name
     * @return {@code "__mapr__" + groupId + "_assignment"}
     */
    @Override
    protected String generateSyncTopic(String groupId) {
        return "__mapr__" + groupId + "_assignment";
    }

    /**
     * @return the internal stream name supplied to the constructor
     */
    @Override
    protected String generateCoordStream() {
        return internalStream;
    }

    /**
     * Builds the join-group descriptor for this member. The member metadata
     * is the serialized subscription made of the subscribed topics, the
     * assignor's user data and the currently assigned partitions.
     *
     * @return descriptor with protocol type {@code "consumer"}, this member's
     *         id and one member protocol named after the assignor
     */
    @Override
    protected Marlinserver.JoinGroupDesc generateJoinDesc() {
        Set<String> joinedSubscription = subscriptionState.subscription();
        List<String> topics = new ArrayList<>(joinedSubscription);
        ByteBuffer userData = assignor.subscriptionUserData(joinedSubscription);
        List<TopicPartition> owned = subscriptionState.assignmentList();
        ByteBuffer metadata = ConsumerProtocol.serializeSubscription(new ConsumerPartitionAssignor.Subscription(topics, userData, owned));

        Marlinserver.MemberProtocol protocol = Marlinserver.MemberProtocol.newBuilder()
                .setProtocol(assignor.name())
                .setMemberMetadata(ByteString.copyFrom(metadata))
                .build();
        return Marlinserver.JoinGroupDesc.newBuilder()
                .setProtocolType("consumer")
                .setMemberId(memberId)
                .addMemberProtocols(protocol)
                .build();
    }

    /**
     * Revokes the current assignment: notifies the user callback (when one is
     * set), logging any exception it throws, then clears the assigned
     * partitions and unsubscribes the listener implementation.
     */
    @Override
    protected void revokeAssignments() {
        Set<TopicPartition> revoked = subscriptionState.assignment();
        log.debug("Revoking partition assignments {}", (Object) revoked);
        try {
            if (rebalanceCb != null) {
                rebalanceCb.onPartitionsRevoked(revoked);
            }
        }
        catch (Exception e) {
            // A failing user callback must not stop the revocation itself.
            log.error("User provided listener {} failed on partition revocation", (Object) rebalanceCb.getClass().getName(), (Object) e);
        }
        subscriptionState.clearAssignments();
        listenerimpl.unsubscribe();
    }

    /**
     * Applies the assignment received at the end of a group sync.
     *
     * <p>When nothing is subscribed, the rejoin is simply reported complete.
     * Otherwise every assigned partition is validated against the current
     * subscription, the assignment is stored, the assignor and the user
     * callback are notified, the listener implementation is told to consume
     * the partitions, and the rejoin is reported complete.
     *
     * @param ms           member state carrying the serialized assignment
     * @param generationId group generation (not used here)
     * @throws IllegalArgumentException if a partition of a topic that is not
     *         covered by the subscription was assigned
     */
    @Override
    protected void protocolOnSyncComplete(Marlinserver.MemberState ms, long generationId) {
        assert (this.assignor != null);
        ConsumerPartitionAssignor.Assignment assignment = ConsumerProtocol.deserializeAssignment(ms.getMemberAssignment().asReadOnlyByteBuffer());
        List<TopicPartition> assignments = assignment.partitions();
        // Read the type once so validation and storage see the same value.
        SubscriptionType type = this.subscriptionState.getSubscriptionType();
        if (type == SubscriptionType.NONE) {
            this.notifyRejoinCompletion();
            return;
        }
        assert (type == SubscriptionType.SUBSCRIBE_LIST || type == SubscriptionType.SUBSCRIBE_REGEX);
        // NOTE(review): if validation throws, notifyRejoinCompletion() is never
        // reached and threads blocked in waitForRejoinCompletion() stay blocked -
        // confirm this is intended.
        this.validateAssignedTopics(assignments, type);
        this.subscriptionState.assign(assignments, type);
        this.assignor.onAssignment(assignment, this.groupMetadata);
        // Log a snapshot taken under the state's lock rather than the live set.
        log.info("Setting newly assigned partitions {}", (Object) this.subscriptionState.assignment());
        try {
            if (this.rebalanceCb != null) {
                this.rebalanceCb.onPartitionsAssigned(this.subscriptionState.assignment());
            }
        }
        catch (Exception e) {
            // A failing user callback must not prevent the assignment from taking effect.
            log.error("User provided listener {} failed on partition assignment", (Object) this.rebalanceCb.getClass().getName(), (Object) e);
        }
        this.listenerimpl.assign(assignments);
        this.notifyRejoinCompletion();
    }

    /**
     * Checks that every assigned partition belongs to a subscribed topic.
     * For regex subscriptions only the part after the last {@code ':'} of both
     * the pattern and the topic name is compared.
     */
    private void validateAssignedTopics(List<TopicPartition> assignments, SubscriptionType type) {
        if (type == SubscriptionType.SUBSCRIBE_REGEX) {
            Pattern subscribedRegex = this.subscriptionState.getSubscribedRegex();
            String[] tokens = subscribedRegex.toString().split(":");
            Pattern topicRegex = Pattern.compile(tokens[tokens.length - 1]);
            for (TopicPartition tp : assignments) {
                String[] topicTokens = tp.topic().split(":");
                if (!topicRegex.matcher(topicTokens[topicTokens.length - 1]).matches()) {
                    throw new IllegalArgumentException("Assigned partition " + tp + " for non-subscribed topic regex pattern; Subscription regex is " + subscribedRegex);
                }
            }
            return;
        }
        Set<String> subscriptions = this.subscriptionState.subscription();
        for (TopicPartition tp : assignments) {
            if (!subscriptions.contains(tp.topic())) {
                // Report the subscribed topics; the state object has no toString().
                throw new IllegalArgumentException("Assigned partition " + tp + " for non-subscribed topic; subscription is " + subscriptions);
            }
        }
    }

    /**
     * @return always {@code false}
     */
    @Override
    protected boolean isProtocolRejoinNeeded() {
        return false;
    }

    /**
     * Shuts the coordinator down: marks it as closing, unsubscribes, wakes the
     * poll thread, waits for that thread to finish and then closes the
     * superclass.
     *
     * <p>If the calling thread is interrupted while waiting for the poll
     * thread, the wait is abandoned, the superclass is still closed, and the
     * thread's interrupt status is restored before returning.
     */
    @Override
    public void close() {
        log.debug("close , begin");
        this.closing.set(true);
        this.unsubscribe();
        this.wakeup();
        boolean interrupted = false;
        try {
            this.pollThread.join();
        }
        catch (InterruptedException e) {
            interrupted = true;
            log.warn("Interrupted while waiting for the poll thread to stop", e);
        }
        try {
            super.close();
        }
        finally {
            // Restore the flag only after super.close() so that call is not disturbed by it.
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        }
        log.debug("close , end");
    }

    /**
     * Computes the partition assignment for the whole group (leader only).
     *
     * <p>Deserializes every member's subscription, records the union of the
     * subscribed topics as the group subscription, runs the assignor against
     * {@code clusterWithTopicInfo} and serializes the per-member result. A
     * warning is logged for subscribed topics that end up without any
     * assigned partition.
     *
     * @param leaderId id of the group leader (not used here)
     * @param members  group members with their serialized subscriptions
     * @return serialized assignment per member id
     */
    @Override
    protected Map<String, ByteBuffer> performProtocolAssignment(String leaderId, List<Marlinserver.Member> members) {
        assert (this.isLeader);
        Map<String, ConsumerPartitionAssignor.Subscription> subscriptions = new HashMap<>();
        Set<String> allSubscribedTopics = new HashSet<>();
        for (Marlinserver.Member member : members) {
            ConsumerPartitionAssignor.Subscription subscription = ConsumerProtocol.deserializeSubscription(member.getMemberMetadata().asReadOnlyByteBuffer());
            subscriptions.put(member.getMemberId(), subscription);
            allSubscribedTopics.addAll(subscription.topics());
        }
        this.subscriptionState.setGroupSubscriptions(allSubscribedTopics);
        log.debug("Performing assignment using {} with subscriptions {}", (Object) this.assignor.name(), subscriptions);
        assert (this.clusterWithTopicInfo != null);
        Map<String, ConsumerPartitionAssignor.Assignment> assignment = this.assignor.assign(this.clusterWithTopicInfo, new ConsumerPartitionAssignor.GroupSubscription(subscriptions)).groupAssignment();

        Set<String> assignedTopics = new HashSet<>();
        for (ConsumerPartitionAssignor.Assignment assigned : assignment.values()) {
            for (TopicPartition tp : assigned.partitions()) {
                assignedTopics.add(tp.topic());
            }
        }
        if (!assignedTopics.containsAll(allSubscribedTopics)) {
            Set<String> notAssignedTopics = new HashSet<>(allSubscribedTopics);
            notAssignedTopics.removeAll(assignedTopics);
            log.warn("The following subscribed topics are not assigned to any members: {} ", notAssignedTopics);
        }

        Map<String, ByteBuffer> groupAssignment = new HashMap<>();
        for (Map.Entry<String, ConsumerPartitionAssignor.Assignment> entry : assignment.entrySet()) {
            groupAssignment.put(entry.getKey(), ConsumerProtocol.serializeAssignment(entry.getValue()));
        }
        return groupAssignment;
    }

    /**
     * Collects the union of the topics named in every member's serialized
     * subscription.
     *
     * @param members group members whose metadata holds a serialized subscription
     * @return distinct topic names across all members
     */
    private Collection<String> getAllSubscribedTopics(List<Marlinserver.Member> members) {
        Set<String> topics = new HashSet<>();
        for (Marlinserver.Member current : members) {
            ByteBuffer metadata = current.getMemberMetadata().asReadOnlyByteBuffer();
            topics.addAll(ConsumerProtocol.deserializeSubscription(metadata).topics());
        }
        return topics;
    }

    /**
     * Join callback that decides, on every join, whether this member is the
     * group leader and continues the join accordingly.
     */
    public class MarlinConsumerJoinCallback
    extends MarlinCoordinator.MarlinCoordinatorJoinCallback {
        public MarlinConsumerJoinCallback() {
            super(MarlinConsumerCoordinator.this);
        }

        /**
         * A follower clears its group subscriptions, unregisters the list
         * refresher and completes the join at once. A leader parks the join
         * info and registers the list refresher for the group's topics; the
         * join is then completed from the refresh callback. When the group
         * subscribes to no topics, the leader completes the join directly
         * with an empty cluster.
         *
         * @param jgi join information, including the leader id and the member list
         */
        @Override
        public void onJoin(Marlinserver.JoinGroupInfo jgi) {
            final MarlinConsumerCoordinator coordinator = MarlinConsumerCoordinator.this;
            assert (!coordinator.leaderGroupJoinInProgress);
            assert (coordinator.leaderJGI == null);

            boolean leader = jgi.getGroupLeaderId().equals(coordinator.memberId());
            if (!leader) {
                log.debug(" Group Follower ");
                coordinator.subscriptionState.clearGroupSubscriptions();
                coordinator.listenerimpl.topicRefresherList(Collections.emptyList(), null);
                coordinator.isLeader = false;
                coordinator.performOnJoin(jgi);
                return;
            }

            log.debug(" Group Leader ");
            coordinator.leaderGroupJoinInProgress = true;
            coordinator.leaderJGI = jgi;
            coordinator.isLeader = true;
            Collection<String> groupSubscriptions = coordinator.getAllSubscribedTopics(jgi.getMembersList());
            if (groupSubscriptions.size() > 0) {
                // The join is finished later, from the list refresh callback.
                coordinator.listenerimpl.topicRefresherList(groupSubscriptions, coordinator.listRefreshListener);
                return;
            }
            coordinator.clusterWithTopicInfo = coordinator.createClusterWithTopicMeta(Collections.emptySet());
            coordinator.performOnJoin(jgi);
            coordinator.leaderGroupJoinInProgress = false;
            coordinator.leaderJGI = null;
            coordinator.clusterWithTopicInfo = null;
        }
    }

    /**
     * Body of the poll thread. It repeatedly polls for an event, stops once
     * the coordinator is closing, re-registers the regex refresher after a
     * rejoin event on regex subscriptions, and then ensures the group is active.
     */
    private final class ConsumerPollThread
    implements Runnable {
        @Override
        public void run() {
            final MarlinConsumerCoordinator coordinator = MarlinConsumerCoordinator.this;
            for (;;) {
                coordinator.pollEvent(Long.MAX_VALUE);
                if (coordinator.closing.get()) {
                    break;
                }
                // The rejoin check comes first so the subscription type is only read when needed.
                boolean regexRefreshDue = coordinator.rejoinEventOccured()
                        && coordinator.subscriptionState.getSubscriptionType() == SubscriptionType.SUBSCRIBE_REGEX;
                if (regexRefreshDue) {
                    coordinator.resetRejoinFlags();
                    log.debug("Rejoin event occured, refreshing regex with pattern {} ", (Object) coordinator.subscriptionState.getSubscribedRegex());
                    coordinator.listenerimpl.topicRefresherRegex(coordinator.subscriptionState.getSubscribedRegex(), coordinator.regexRefreshListener);
                }
                coordinator.ensureActiveGroup();
            }
        }
    }

    /**
     * Regex refresh listener that logs the refreshed topics and passes them to
     * the coordinator's regex refresh handling.
     */
    public class TopicRefreshCCRegexListener
    implements TopicRefreshRegexListener {
        /**
         * @param topics topic names delivered by the refresh
         */
        @Override
        public void updatedTopics(Set<String> topics) {
            log.debug("RegexRefresh updated topic info {} ", topics);
            final MarlinConsumerCoordinator coordinator = MarlinConsumerCoordinator.this;
            coordinator.handleTopicRefresherRegex(topics);
        }
    }

    /**
     * List refresh listener that logs the refreshed entries and passes them to
     * the coordinator's list refresh handling.
     */
    public class TopicRefreshCCListListener
    implements TopicRefreshListListener {
        /**
         * @param topicFeeds entries delivered by the refresh
         */
        @Override
        public void updatedTopics(Set<TopicPartition> topicFeeds) {
            log.debug("ListRefresh updated topic info {} ", topicFeeds);
            final MarlinConsumerCoordinator coordinator = MarlinConsumerCoordinator.this;
            coordinator.handleTopicRefresherList(topicFeeds);
        }
    }

    /**
     * Holds the consumer's subscription type, subscribed topics or pattern,
     * the group-wide subscribed topics and the assigned partitions. Every
     * method synchronizes on this instance.
     *
     * <p>The subscription type can only move away from {@code NONE} once;
     * asking for a different type before {@link #unsubscribe()} fails with
     * {@link IllegalStateException}.
     */
    private class SubscriptionState {
        private static final String EXCEPTION_MESSAGE = "Subscription to topics, partitions and pattern are mutually exclusive";
        private SubscriptionType subscriptionType = SubscriptionType.NONE;
        private Pattern subscribedRegex;
        private final Set<String> subscriptions = new HashSet<String>();
        private final Set<String> groupSubscriptions = new HashSet<String>();
        private final Set<TopicPartition> assignments = new HashSet<TopicPartition>();

        public SubscriptionState() {
        }

        /**
         * Adopts {@code type} when nothing is subscribed yet.
         *
         * @throws IllegalStateException if a different type is already active
         */
        private synchronized void setSubscriptionType(SubscriptionType type) {
            if (this.subscriptionType == SubscriptionType.NONE) {
                this.subscriptionType = type;
            } else if (this.subscriptionType != type) {
                throw new IllegalStateException(EXCEPTION_MESSAGE);
            }
        }

        private synchronized SubscriptionType getSubscriptionType() {
            return this.subscriptionType;
        }

        /** Records a pattern subscription after validating the type. */
        private synchronized void subscribe(Pattern pattern, SubscriptionType type) {
            this.setSubscriptionType(type);
            this.subscribedRegex = pattern;
        }

        /**
         * Records a topic-list subscription after validating the type.
         * NOTE(review): topics are added to whatever is already subscribed
         * (only an assert expects the set to be empty) - confirm whether a
         * repeated subscribe should replace the previous topics instead.
         */
        private synchronized void subscribe(Collection<String> topics, SubscriptionType type) {
            assert (this.subscriptions.size() == 0);
            this.setSubscriptionType(type);
            this.subscriptions.addAll(topics);
        }

        /**
         * Stores the assigned partitions. For {@code ASSIGN_PARTITIONS} the
         * previous assignment is replaced; for the other types the set is
         * expected to have been cleared beforehand.
         *
         * <p>The type is validated before anything is modified, so a call
         * rejected with {@link IllegalStateException} leaves the current
         * assignment untouched.
         */
        private synchronized void assign(Collection<TopicPartition> assignments, SubscriptionType type) {
            this.setSubscriptionType(type);
            if (type == SubscriptionType.ASSIGN_PARTITIONS) {
                this.clearAssignments();
            }
            assert (this.assignments.size() == 0);
            this.assignments.addAll(assignments);
        }

        /** Clears topics and pattern and resets the type to {@code NONE}; assignments are kept. */
        private synchronized void unsubscribe() {
            this.subscriptions.clear();
            this.subscribedRegex = null;
            this.subscriptionType = SubscriptionType.NONE;
        }

        /** @return unmodifiable copy of the assigned partitions */
        private synchronized Set<TopicPartition> assignment() {
            return Collections.unmodifiableSet(new HashSet<TopicPartition>(this.assignments));
        }

        /** @return unmodifiable list copy of the assigned partitions */
        private synchronized List<TopicPartition> assignmentList() {
            return Collections.unmodifiableList(new ArrayList<TopicPartition>(this.assignments));
        }

        /** @return unmodifiable copy of the subscribed topic names */
        private synchronized Set<String> subscription() {
            return Collections.unmodifiableSet(new HashSet<String>(this.subscriptions));
        }

        private synchronized void clearAssignments() {
            this.assignments.clear();
        }

        /** Replaces the subscribed topic names with {@code topics}. */
        private synchronized void updateSubscriptions(Collection<String> topics) {
            this.subscriptions.clear();
            this.subscriptions.addAll(topics);
        }

        private synchronized Pattern getSubscribedRegex() {
            return this.subscribedRegex;
        }

        private synchronized void clearGroupSubscriptions() {
            this.groupSubscriptions.clear();
        }

        /** Replaces the group-wide subscribed topics with {@code allSubscribedTopics}. */
        private synchronized void setGroupSubscriptions(Collection<String> allSubscribedTopics) {
            this.groupSubscriptions.clear();
            this.groupSubscriptions.addAll(allSubscribedTopics);
        }
    }

    private static enum SubscriptionType {
        NONE,
        SUBSCRIBE_LIST,
        SUBSCRIBE_REGEX,
        ASSIGN_PARTITIONS;

    }
}

