/*
 * Decompiled with CFR 0.152.
 */
package com.mapr.streams.impl;

import com.google.protobuf.ByteString;
import com.google.protobuf.InvalidProtocolBufferException;
import com.mapr.fs.proto.Marlinserver;
import com.mapr.streams.Admin;
import com.mapr.streams.Streams;
import com.mapr.streams.impl.listener.MarlinListener;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.hadoop.conf.Configuration;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.mapr.GenericHFactory;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.AuthorizationException;
import org.apache.kafka.common.errors.BrokerNotAvailableException;
import org.apache.kafka.common.errors.WakeupException;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.connect.runtime.distributed.ClusterConfigState;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public abstract class MarlinCoordinator {
    // Shared logger. Not final: init() replaces it with the logger returned by getLogger().
    protected static Logger log = LoggerFactory.getLogger(MarlinCoordinator.class);
    // Listener used to issue join requests; created by getJoiner() during init().
    private MarlinListener joiner;
    // Group this coordinator belongs to; fixed at construction time.
    private final String groupId;
    // Sync topic name, produced by generateSyncTopic(groupId) in init().
    private String syncTopic;
    // Coordination stream name, produced by generateCoordStream() in init().
    private String coordStream;
    // "<coordStream>:<syncTopic>", built in initSync(); used for the sync consumer/producer.
    private String streamTopic;
    // Generation id of the most recent join; written under `lock` in performOnJoin().
    protected Long groupGenerationId;
    // Member id value used while this member has no id assigned.
    private static final String UNKNOWN_MEMBER_ID_STR = "";
    // Current member id; set from the join response in handleJoinGroupResponse().
    protected String memberId;
    // Consumer assigned to partition 0 of the sync topic (see initSync()).
    private KafkaConsumer<Long, byte[]> syncReceiver;
    // Producer used by the group leader to publish assignments (see performOnJoin()).
    protected KafkaProducer<Long, byte[]> syncProducer;
    // Poll timeout in milliseconds (15 s) — presumably intended for sync-topic polls; not referenced by name in the visible code.
    private static final long SYNC_POLL_TIMEOUT = 15000L;
    // NOTE(review): not read or written anywhere in the visible code.
    private ClusterConfigState configSnapshot;
    // Guards the event flags below and groupGenerationId.
    private final Lock lock = new ReentrantLock();
    // Signalled whenever one of the event flags changes.
    private final Condition condition = this.lock.newCondition();
    // Set by performOnJoin() once the join callback has been processed.
    private boolean joinComplete;
    // Set when a rejoin is required (onRejoin callback, or an UNKNOWN_MEMBER_ID join status).
    protected boolean rejoinEvent;
    // Set by wakeup(); consumed by poll()/pollEvent().
    private boolean wakeupEvent;
    // Set by requestRejoin(); cleared by resetRejoinFlags() and close().
    private boolean isRejoinRequested;
    // True when assignments must be revoked before the next join attempt.
    private boolean needsJoinPrepare;
    // Callback handed to joiner.join(); obtained from getJoinerCallback() in init().
    private MarlinListener.MarlinJoinCallback joinerCallback;
    // Current retry delay in milliseconds; grown by backoff(), zeroed by resetBackoff().
    private long backoffTimeMs;
    // Upper bound for the retry delay in milliseconds (15 minutes).
    private static final int kMaxBackoffTimeMs = 900000;

    /**
     * Creates a coordinator for the given group in its initial, not-yet-joined state.
     *
     * <p>Only in-memory state is set up here; streams, consumers and producers are
     * created later by {@link #init()}.
     *
     * @param groupId identifier of the group this coordinator participates in
     */
    public MarlinCoordinator(String groupId) {
        // Identity: no member id and generation 0 until the first successful join.
        this.groupId = groupId;
        this.memberId = UNKNOWN_MEMBER_ID_STR;
        this.groupGenerationId = 0L;

        // No events are pending yet.
        this.joinComplete = false;
        this.rejoinEvent = false;
        this.isRejoinRequested = false;
        this.wakeupEvent = false;

        // The first join must revoke assignments first, and starts with no retry delay.
        this.needsJoinPrepare = true;
        this.backoffTimeMs = 0L;

        log.debug("MarlinCoordinator constructor");
    }

    /**
     * Completes initialisation using the subclass hooks.
     *
     * <p>Resolves the sync topic and coordination stream names, builds the join
     * listener, sets up the sync consumer/producer, fetches the join callback and
     * finally swaps the shared logger for the one supplied by {@link #getLogger()}.
     * The statements run in this exact order because later steps read the fields
     * assigned by earlier ones.
     */
    protected void init() {
        syncTopic = generateSyncTopic(groupId);
        coordStream = generateCoordStream();
        joiner = getJoiner(groupId, coordStream);
        initSync();
        joinerCallback = getJoinerCallback();
        // Note: `log` is static, so this replaces the logger for every instance.
        log = getLogger();
    }

    /**
     * Builds the listener used to send join requests for this group.
     *
     * @param groupId     value for the {@code group.id} consumer property
     * @param coordStream value for the {@code streams.consumer.default.stream} property
     * @return a new {@code MarlinListener} configured with byte-array deserializers
     */
    private MarlinListener<?, ?> getJoiner(String groupId, String coordStream) {
        Properties consumerProps = new Properties();
        consumerProps.put("streams.clientside.partition.assignment", "false");
        consumerProps.put("streams.consumer.default.stream", coordStream);
        consumerProps.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        consumerProps.put("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        consumerProps.put("group.id", groupId);

        // ConsumerConfig is instantiated through the factory, passing the properties as a Map.
        GenericHFactory factory = new GenericHFactory();
        Object instance = factory.getImplementorInstance("org.apache.kafka.clients.consumer.ConsumerConfig", new Object[]{consumerProps}, new Class[]{Map.class});
        ConsumerConfig consumerConfig = (ConsumerConfig)instance;
        return new MarlinListener(consumerConfig, null, null);
    }

    /**
     * Creates the sync topic (best effort) and the consumer/producer pair used to
     * exchange group assignments over it.
     *
     * <p>Topic creation failures are logged and otherwise ignored. If building the
     * producer or assigning the partition fails, any client that was already
     * created is closed before the exception is rethrown, so nothing is leaked.
     */
    private void initSync() {
        log.debug("Creating Sync topic {} : ", (Object)this.syncTopic);
        try (Admin madmin = Streams.newAdmin(new Configuration())) {
            madmin.createTopic(this.coordStream, this.syncTopic, 1);
        }
        catch (Exception e) {
            // Deliberately best effort. The exception is passed as the trailing
            // argument so the logger records the stack trace, not only the message.
            log.debug("Sync topic creation. Failed to create admin client: {} ", (Object)e.getMessage(), (Object)e);
        }
        this.streamTopic = this.coordStream + ":" + this.syncTopic;

        Properties props = new Properties();
        props.put("key.deserializer", "org.apache.kafka.common.serialization.LongDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        props.put("key.serializer", "org.apache.kafka.common.serialization.LongSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
        props.put("auto.offset.reset", "latest");
        props.put("streams.clientside.partition.assignment", "false");

        KafkaConsumer<Long, byte[]> receiver = new KafkaConsumer<Long, byte[]>(props);
        KafkaProducer<Long, byte[]> producer = null;
        try {
            producer = new KafkaProducer<Long, byte[]>(props);
            // The sync topic has a single partition (created with 1 above).
            receiver.assign(Arrays.asList(new TopicPartition(this.streamTopic, 0)));
        }
        catch (RuntimeException e) {
            // Release whatever was created; keep the original failure as the primary exception.
            if (producer != null) {
                try {
                    producer.close();
                }
                catch (RuntimeException closeFailure) {
                    e.addSuppressed(closeFailure);
                }
            }
            try {
                receiver.close();
            }
            catch (RuntimeException closeFailure) {
                e.addSuppressed(closeFailure);
            }
            throw e;
        }
        this.syncReceiver = receiver;
        this.syncProducer = producer;
    }

    /**
     * Supplies the logger that {@code init()} installs into the shared {@code log} field.
     */
    protected abstract Logger getLogger();

    /**
     * Supplies the callback that is passed to the join listener on every join attempt.
     */
    protected abstract MarlinListener.MarlinJoinCallback getJoinerCallback();

    /**
     * Returns the name of the sync topic.
     *
     * @param var1 the group id (this is what {@code init()} passes)
     */
    protected abstract String generateSyncTopic(String var1);

    /**
     * Returns the name of the coordination stream that hosts the sync topic.
     */
    protected abstract String generateCoordStream();

    /**
     * Builds the join descriptor sent by {@code ensureActiveGroup()} on each join attempt.
     */
    protected abstract Marlinserver.JoinGroupDesc generateJoinDesc();

    /**
     * Invoked by {@code ensureActiveGroup()} before a join when a join-prepare step is pending.
     */
    protected abstract void revokeAssignments();

    /**
     * Invoked by {@code onSyncComplete()} for the member state whose id equals this member's id.
     *
     * @param var1 the matching member state from the group assignment
     * @param var2 the generation id of that group assignment
     */
    protected abstract void protocolOnSyncComplete(Marlinserver.MemberState var1, long var2);

    /**
     * Computes the assignments for the group; called by {@code performOnJoin()} when
     * this member is the leader.
     *
     * @param var1 the leader id reported in the join info
     * @param var2 the members reported in the join info
     * @return map from member id to that member's assignment; each value is copied
     *         into the published member state
     */
    protected abstract Map<String, ByteBuffer> performProtocolAssignment(String var1, List<Marlinserver.Member> var2);

    /**
     * Delivers a received group assignment to the protocol implementation.
     *
     * <p>Every member state whose id equals {@link #memberId()} is handed to
     * {@link #protocolOnSyncComplete}, after which the next join is flagged as
     * needing a prepare (revoke) step.
     *
     * @param ga the group assignment read from the sync topic
     */
    protected void onSyncComplete(Marlinserver.GroupAssignment ga) {
        log.debug("ga gen id {}", (Object)ga.getGroupGenerationId());
        for (Marlinserver.MemberState state : ga.getMemberStateList()) {
            log.debug("ms id {} ", (Object)state.getMemberId());
            if (this.memberId() != null) {
                log.debug("this id {}", (Object)this.memberId());
            }
            // Only the entry addressed to this member is acted upon.
            if (state.getMemberId().equals(this.memberId())) {
                this.protocolOnSyncComplete(state, ga.getGroupGenerationId());
                this.needsJoinPrepare = true;
            }
        }
    }

    /**
     * Joins (or rejoins) the group until no further rejoin is needed.
     *
     * <p>Each pass clears the rejoin flags, revokes assignments if a prepare step
     * is pending, sends a join request, processes its status, and then blocks until
     * either the join callback completes or a rejoin event arrives. When the join
     * completed, the assignment is fetched through {@code doSync()}.
     */
    public void ensureActiveGroup() {
        while (rejoinNeeded()) {
            resetRejoinFlags();

            if (needsJoinPrepare) {
                revokeAssignments();
                needsJoinPrepare = false;
            }

            final Marlinserver.JoinGroupDesc joinDesc = generateJoinDesc();
            // Must be cleared before the request goes out; the callback sets it again.
            joinComplete = false;
            final Marlinserver.JoinGroupResponse response = joiner.join(joinDesc, joinerCallback);
            log.debug("ensureActiveGroup: joinStatus {}", (Object)response.getJoinStatus());
            handleJoinGroupResponse(response);

            waitForJoinOrRejoinEvent();
            if (joinComplete) {
                doSync();
            }
        }
    }

    /**
     * Reacts to the status carried by a join response.
     *
     * <ul>
     *   <li>{@code STATUS_OK}: adopts the returned member id and resets the backoff.</li>
     *   <li>{@code UNKNOWN_MEMBER_ID}: forgets the member id and raises a rejoin event.</li>
     *   <li>{@code FUNCTION_UNAVAILABLE} / {@code STREAM_AUTHORIZATION_FAILED}: throws.</li>
     *   <li>anything else: logs the failure and sleeps with exponential backoff
     *       ({@code STREAM_UNAVAILABLE} additionally logs the stream name first).</li>
     * </ul>
     *
     * <p>NOTE(review): the failure path sets neither {@code rejoinEvent} nor
     * {@code joinComplete} here, so the caller's subsequent wait depends on a
     * callback arriving — confirm this is intended.
     *
     * @param resp the response returned by the join listener
     * @throws BrokerNotAvailableException if the server lacks the feature
     * @throws AuthorizationException if stream permissions are missing
     */
    private void handleJoinGroupResponse(Marlinserver.JoinGroupResponse resp) {
        switch (resp.getJoinStatus()) {
            case STATUS_OK: {
                this.memberId = resp.getMemberId();
                this.resetBackoff();
                break;
            }
            case UNKNOWN_MEMBER_ID: {
                this.memberId = UNKNOWN_MEMBER_ID_STR;
                this.lock.lock();
                try {
                    this.rejoinEvent = true;
                }
                finally {
                    this.lock.unlock();
                }
                break;
            }
            case STREAM_AUTHORIZATION_FAILED: {
                throw new AuthorizationException("Need produceperm and consumeperm permissions on stream " + this.coordStream);
            }
            case FUNCTION_UNAVAILABLE: {
                throw new BrokerNotAvailableException("Feature not available on server. Please upgrade to at least Version 5.2.1");
            }
            case STREAM_UNAVAILABLE: {
                log.error("Could not open stream " + this.coordStream);
                // Intentional fall-through: an unavailable stream is retried like any other failure.
            }
            default: {
                log.error("Join Group request failed with {}. Retrying with exponential backoff", (Object)resp.getJoinStatus());
                this.backoff();
            }
        }
    }

    /**
     * Grows the retry delay to {@code 2 * previous + 1000} ms, capped at
     * {@code kMaxBackoffTimeMs}, and then sleeps for that long.
     */
    private void backoff() {
        final long grown = this.backoffTimeMs * 2L + 1000L;
        this.backoffTimeMs = Math.min(grown, (long)kMaxBackoffTimeMs);
        Utils.sleep(this.backoffTimeMs);
    }

    /**
     * Clears the retry delay so that the next failure starts backing off from scratch.
     */
    private void resetBackoff() {
        backoffTimeMs = 0L;
    }

    /**
     * Blocks until the join callback has completed or a rejoin event was raised.
     *
     * <p>If the waiting thread is interrupted the wait is abandoned and the method
     * returns with whatever flag values are current.
     *
     * <p>NOTE(review): the interrupt status is intentionally not restored here.
     * {@code ensureActiveGroup()} may call this method again on its next loop pass,
     * and a still-set interrupt flag would make {@code await()} throw immediately
     * on every pass.
     */
    private void waitForJoinOrRejoinEvent() {
        log.debug("waitForJoinOrRejoinEvent: memberId {}start", (Object)this.memberId);
        // Acquire outside the try block so unlock() only runs when the lock is actually held.
        this.lock.lock();
        try {
            while (!this.joinComplete && !this.rejoinEvent) {
                this.condition.await();
            }
        }
        catch (InterruptedException e) {
            // Report through the logger (with stack trace) rather than stderr.
            log.warn("waitForJoinOrRejoinEvent: memberId {} interrupted", (Object)this.memberId, (Object)e);
        }
        finally {
            this.lock.unlock();
        }
        log.debug("waitForJoinOrRejoinEvent: memberId {} awoken. joinComplete {} rejoinEvent {}", new Object[]{this.memberId, this.joinComplete, this.rejoinEvent});
    }

    /**
     * Polls the sync topic until the assignment for the current generation shows
     * up, or until a rejoin event makes the current generation obsolete.
     *
     * <p>Records whose key differs from the current generation id are skipped. The
     * first matching record is parsed and handed to {@code onSyncComplete}.
     *
     * @throws KafkaException if a matching record cannot be parsed as a group
     *         assignment; the parse failure is attached as the cause
     */
    private void doSync() {
        int pollCount = 0;
        while (true) {
            ConsumerRecords<Long, byte[]> records = this.syncReceiver.poll(SYNC_POLL_TIMEOUT);
            log.debug("doSync: memberId {} returned from poll {}", (Object)this.memberId, (Object)pollCount);
            for (ConsumerRecord<Long, byte[]> record : records) {
                Long generation = record.key();
                log.debug("doSync: consumer record..generation ID {}", (Object)generation);
                if (!this.groupGenerationIdMatches(generation)) {
                    continue;
                }
                Marlinserver.GroupAssignment ga;
                try {
                    ga = Marlinserver.GroupAssignment.parseFrom(record.value());
                }
                catch (InvalidProtocolBufferException e) {
                    // Keep the root cause so the malformed payload can be diagnosed.
                    throw new KafkaException("Error parsing Sync response", e);
                }
                this.onSyncComplete(ga);
                return;
            }
            // Nothing usable in this batch: give up only if the group is rebalancing again.
            if (this.rejoinEventOccured()) {
                return;
            }
            ++pollCount;
        }
    }

    /**
     * Checks, under the lock, whether the given generation id equals the current one.
     *
     * @param lastSeen generation id read from a sync record; may be {@code null}
     * @return {@code true} if it equals {@code groupGenerationId}
     */
    private boolean groupGenerationIdMatches(Long lastSeen) {
        this.lock.lock();
        try {
            return this.groupGenerationId.equals(lastSeen);
        }
        finally {
            // Released even if the comparison throws (e.g. groupGenerationId was nulled by a subclass).
            this.lock.unlock();
        }
    }

    /**
     * Flags that a rejoin has been requested and signals one thread waiting on the condition.
     */
    public void requestRejoin() {
        log.debug("requestRejoin");
        lock.lock();
        try {
            isRejoinRequested = true;
            condition.signal();
        }
        finally {
            lock.unlock();
        }
    }

    /**
     * @return the current member id; the empty string while no id is assigned
     */
    public String memberId() {
        return memberId;
    }

    /**
     * Closes the sync consumer, the sync producer and the join listener, then
     * clears the rejoin flags.
     *
     * <p>Each resource is closed even if closing an earlier one throws, and
     * resources that were never created (when {@code init()} has not run) are
     * skipped. An exception raised by a {@code close()} call still propagates to
     * the caller after the remaining cleanup has run.
     */
    protected void close() {
        try {
            if (this.syncReceiver != null) {
                this.syncReceiver.close();
            }
        }
        finally {
            try {
                if (this.syncProducer != null) {
                    this.syncProducer.close();
                }
            }
            finally {
                try {
                    if (this.joiner != null) {
                        this.joiner.close();
                    }
                }
                finally {
                    this.lock.lock();
                    try {
                        this.isRejoinRequested = false;
                        this.rejoinEvent = false;
                    }
                    finally {
                        this.lock.unlock();
                    }
                }
            }
        }
    }

    /**
     * No-op in this implementation: the method body is empty.
     */
    public void ensureCoordinatorKnown() {
    }

    /**
     * No-op in this implementation: the method body is empty.
     */
    public void ensureCoordinatorReady() {
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Waits up to {@code timeout} milliseconds for a rejoin request, a rejoin event
     * or a wakeup, without attempting to join the group.
     *
     * <p>A pending wakeup is consumed and logged; unlike {@link #poll(long)} this
     * method does not throw for it. The timeout covers the whole call: signals that
     * do not set any of the awaited flags do not restart the wait.
     *
     * <p>If the thread is interrupted, the interrupt status is restored and the
     * method returns.
     *
     * @param timeout maximum time to wait, in milliseconds
     * @throws WakeupException declared for interface compatibility; not thrown by this method
     */
    public void pollEvent(long timeout) throws WakeupException {
        log.debug("Poll timeout {}", (Object)timeout);
        this.lock.lock();
        try {
            // Track the remaining budget so that unrelated signals cannot extend the wait.
            long remainingNanos = TimeUnit.MILLISECONDS.toNanos(timeout);
            while (!(this.isRejoinRequested || this.rejoinEvent || this.wakeupEvent)) {
                if (remainingNanos <= 0L) {
                    log.debug("MarlinConsumerCoordinator: poll time expired");
                    return;
                }
                remainingNanos = this.condition.awaitNanos(remainingNanos);
            }
            if (this.wakeupEvent) {
                this.wakeupEvent = false;
                log.debug("MarlinConsumerCoordinator: woken up");
            }
        }
        catch (InterruptedException e) {
            // Preserve the interruption for the caller instead of swallowing it.
            Thread.currentThread().interrupt();
            log.warn("pollEvent: interrupted while waiting", (Throwable)e);
        }
        finally {
            log.debug("exiting poll");
            this.lock.unlock();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Ensures group membership, then waits up to {@code timeout} milliseconds for
     * a rejoin request, a rejoin event or a wakeup.
     *
     * <p>The timeout covers the whole wait: signals that do not set any of the
     * awaited flags do not restart it. If the thread is interrupted, the interrupt
     * status is restored and the method returns.
     *
     * @param timeout maximum time to wait after the group is active, in milliseconds
     * @throws WakeupException if {@link #wakeup()} was called; the wakeup flag is cleared first
     */
    public void poll(long timeout) throws WakeupException {
        log.debug("poll timeout {}", (Object)timeout);
        this.ensureActiveGroup();
        this.lock.lock();
        try {
            // Track the remaining budget so that unrelated signals cannot extend the wait.
            long remainingNanos = TimeUnit.MILLISECONDS.toNanos(timeout);
            while (!(this.isRejoinRequested || this.rejoinEvent || this.wakeupEvent)) {
                if (remainingNanos <= 0L) {
                    log.debug("MarlinCoordinator: poll time expired");
                    return;
                }
                remainingNanos = this.condition.awaitNanos(remainingNanos);
            }
            if (this.wakeupEvent) {
                this.wakeupEvent = false;
                throw new WakeupException();
            }
        }
        catch (InterruptedException e) {
            // Preserve the interruption for the caller instead of swallowing it.
            Thread.currentThread().interrupt();
            log.warn("poll: interrupted while waiting", (Throwable)e);
        }
        finally {
            log.debug("exiting poll");
            this.lock.unlock();
        }
    }

    /**
     * Raises the wakeup flag and signals one thread waiting on the condition.
     */
    public void wakeup() {
        log.debug("wakeup: waking up");
        lock.lock();
        try {
            wakeupEvent = true;
            condition.signal();
        }
        finally {
            lock.unlock();
        }
    }

    /**
     * Implemented as a rejoin request: delegates to {@link #requestRejoin()}.
     */
    public void maybeLeaveGroup() {
        requestRejoin();
    }

    /**
     * Protocol-specific rejoin check. {@code rejoinNeeded()} consults it, while
     * holding the coordinator lock, only when no rejoin request or rejoin event is pending.
     *
     * @return {@code true} if the protocol implementation requires a rejoin
     */
    protected abstract boolean isProtocolRejoinNeeded();

    /**
     * Tells whether the group must be (re)joined.
     *
     * <p>A rejoin is needed when one was requested, when a rejoin event occurred,
     * or when the protocol implementation asks for it. The flags are read under
     * the lock, and the values logged are the ones the decision was based on.
     *
     * @return {@code true} if {@code ensureActiveGroup()} should run another join pass
     */
    protected boolean rejoinNeeded() {
        boolean requested;
        boolean event;
        boolean needed;
        this.lock.lock();
        try {
            requested = this.isRejoinRequested;
            event = this.rejoinEvent;
            needed = requested || event || this.isProtocolRejoinNeeded();
        }
        finally {
            // Released even if the subclass hook throws.
            this.lock.unlock();
        }
        log.debug("isRejoinRequested {} rejoinEvent {} ", (Object)requested, (Object)event);
        return needed;
    }

    /**
     * Clears both the rejoin-event and the rejoin-requested flags under the lock.
     */
    protected void resetRejoinFlags() {
        lock.lock();
        try {
            isRejoinRequested = false;
            rejoinEvent = false;
        }
        finally {
            lock.unlock();
        }
    }

    /**
     * Reads the rejoin-event flag under the lock.
     *
     * @return {@code true} if a rejoin event is currently pending
     */
    protected boolean rejoinEventOccured() {
        lock.lock();
        try {
            return rejoinEvent;
        }
        finally {
            lock.unlock();
        }
    }

    /**
     * Handles a completed join.
     *
     * <p>If this member is the group leader, it computes the assignment for all
     * members and publishes it to partition 0 of the sync topic, keyed by the
     * generation id. In every case the new generation id is recorded, the join is
     * marked complete and one waiting thread is signalled.
     *
     * <p>A failure while computing or publishing the assignment is logged and does
     * not prevent the join from being marked complete.
     * NOTE(review): in that case no assignment is published for this generation —
     * confirm that followers recover through a later rejoin.
     *
     * @param jgi join information delivered by the join callback
     */
    protected void performOnJoin(Marlinserver.JoinGroupInfo jgi) {
        String leaderId = jgi.getGroupLeaderId();
        log.debug("onJoin: memberId {} leaderId {}", (Object)this.memberId(), (Object)leaderId);
        if (leaderId.equals(this.memberId())) {
            try {
                Map<String, ByteBuffer> assignments = this.performProtocolAssignment(leaderId, jgi.getMembersList());
                Marlinserver.GroupAssignment.Builder gaBuilder = Marlinserver.GroupAssignment.newBuilder().setGroupGenerationId(jgi.getGroupGenerationId());
                for (Map.Entry<String, ByteBuffer> entry : assignments.entrySet()) {
                    log.debug("setting memberstate member id to {}", (Object)entry.getKey());
                    Marlinserver.MemberState ms = Marlinserver.MemberState.newBuilder().setMemberId(entry.getKey()).setMemberAssignment(ByteString.copyFrom(entry.getValue())).build();
                    gaBuilder.addMemberState(ms);
                }
                Marlinserver.GroupAssignment ga = gaBuilder.build();
                log.debug("onJoin: memberId {} producing assignment for generation {}", (Object)this.memberId(), (Object)jgi.getGroupGenerationId());
                // Partition 0, no explicit timestamp, key = generation id, value = serialized assignment.
                Long generationKey = jgi.getGroupGenerationId();
                this.syncProducer.send(new ProducerRecord<Long, byte[]>(this.streamTopic, Integer.valueOf(0), (Long)null, generationKey, ga.toByteArray()));
            }
            catch (Exception e) {
                // Report through the logger (with stack trace) rather than stderr.
                log.error("onJoin: memberId {} failed to compute or publish the assignment for generation {}", new Object[]{this.memberId(), jgi.getGroupGenerationId(), e});
            }
        }
        this.lock.lock();
        try {
            this.groupGenerationId = jgi.getGroupGenerationId();
            this.joinComplete = true;
            this.condition.signal();
        }
        finally {
            this.lock.unlock();
        }
    }

    /**
     * Base join callback bound to the enclosing coordinator.
     *
     * <p>Subclasses supply {@code onJoin}; {@code onRejoin} is implemented here and
     * raises the coordinator's rejoin event.
     */
    protected abstract class MarlinCoordinatorJoinCallback
    implements MarlinListener.MarlinJoinCallback {
        protected MarlinCoordinatorJoinCallback() {
        }

        /**
         * Called when a join has completed.
         *
         * @param var1 the join information for the new generation
         */
        @Override
        public abstract void onJoin(Marlinserver.JoinGroupInfo var1);

        /**
         * Marks a rejoin event on the enclosing coordinator and signals one waiter.
         *
         * @param jgi join information accompanying the notification (not used here)
         */
        @Override
        public void onRejoin(Marlinserver.JoinGroupInfo jgi) {
            final MarlinCoordinator coordinator = MarlinCoordinator.this;
            log.debug("onRejoin {}", (Object)coordinator.memberId());
            coordinator.lock.lock();
            try {
                coordinator.rejoinEvent = true;
                coordinator.condition.signal();
            }
            finally {
                coordinator.lock.unlock();
            }
        }
    }
}

