package com.mapr.kafka.eventstreams.impl;

import com.google.protobuf.ByteString;
import com.google.protobuf.InvalidProtocolBufferException;
import com.mapr.fs.proto.Marlinserver;
import com.mapr.kafka.eventstreams.Admin;
import com.mapr.kafka.eventstreams.Streams;
import com.mapr.kafka.eventstreams.impl.listener.MarlinListener;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.hadoop.conf.Configuration;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.internals.RequestFuture;
import org.apache.kafka.clients.mapr.GenericHFactory;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.AuthorizationException;
import org.apache.kafka.common.errors.BrokerNotAvailableException;
import org.apache.kafka.common.errors.WakeupException;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.connect.runtime.distributed.ClusterConfigState;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/mapr/kafka/eventstreams/impl/MarlinCoordinator.class */
public abstract class MarlinCoordinator {
    /** Logger shared by all instances; {@link #init()} overwrites it with the value of {@link #getLogger()}. */
    protected static Logger log = LoggerFactory.getLogger(MarlinCoordinator.class);
    /** Listener used to join the group; created in {@link #init()} and closed in {@link #close()}. */
    private MarlinListener joiner;
    /** Group identifier supplied to the constructor. */
    private final String groupId;
    /** Name of the sync topic, produced by {@link #generateSyncTopic(String)}. */
    private String syncTopic;
    /** Coordination stream, produced by {@link #generateCoordStream()}. */
    private String coordStream;
    /** Full sync topic name in the form {@code coordStream + ":" + syncTopic}. */
    private String streamTopic;
    /** Member id used while no id has been assigned (the empty string). */
    private static final String UNKNOWN_MEMBER_ID_STR = "";
    /** Consumer assigned to partition 0 of {@link #streamTopic}; polled while waiting for the group assignment. */
    private KafkaConsumer<Long, byte[]> syncReceiver;
    /** Producer used to publish the group assignment to {@link #streamTopic}. */
    protected KafkaProducer<Long, byte[]> syncProducer;
    /** Timeout passed to {@code syncReceiver.poll(...)}; presumably milliseconds — confirm against the client API. */
    private static final long SYNC_POLL_TIMEOUT = 15000;
    /** Not read or written by the code visible in this class. */
    private ClusterConfigState configSnapshot;
    /** Callback handed to the joiner on every join; obtained from {@link #getJoinerCallback()}. */
    private MarlinListener.MarlinJoinCallback joinerCallback;
    /** Current retry delay in milliseconds; grown by backoff() and zeroed by resetBackoff(). */
    private long backoffTimeMs;
    /** Upper bound for the retry delay, in milliseconds. */
    private static final int kMaxBackoffTimeMs = 900000;
    /** Guards the event flags below together with {@link #groupGenerationId}. */
    private final Lock lock = new ReentrantLock();
    /** Signalled whenever one of the event flags changes to {@code true}. */
    private final Condition condition = this.lock.newCondition();
    /** Generation id of the most recent completed join; set by {@link #performOnJoin}. */
    protected Long groupGenerationId = 0L;
    /** Id of this member; {@link #UNKNOWN_MEMBER_ID_STR} until a join succeeds. */
    protected String memberId = UNKNOWN_MEMBER_ID_STR;
    /** Set to {@code true} by {@link #performOnJoin}; cleared before each join attempt. */
    private boolean joinComplete = false;
    /** Set when a rejoin is required, e.g. by the rejoin callback or an UNKNOWN_MEMBER_ID join status. */
    protected boolean rejoinEvent = false;
    /** Set by {@link #wakeup()}; consumed by {@link #poll(long)} and {@link #pollEvent(long)}. */
    private boolean wakeupEvent = false;
    /** Set by {@link #requestRejoin()}; cleared by {@link #resetRejoinFlags()} and {@link #close()}. */
    private boolean isRejoinRequested = false;
    /** When {@code true}, the next join attempt first calls {@link #revokeAssignments()}. */
    private boolean needsJoinPrepare = true;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* renamed from: com.mapr.kafka.eventstreams.impl.MarlinCoordinator$1, reason: invalid class name */
    /* loaded from: input_file:com/mapr/kafka/eventstreams/impl/MarlinCoordinator$1.class */
    /**
     * Decompiler-materialised switch map for {@code Marlinserver.JoinStatusCode}.
     *
     * <p>The array is indexed by enum ordinal and holds a dense case number (1..5) for each
     * constant listed below; ordinals that are not listed keep the default value 0.
     */
    public static /* synthetic */ class AnonymousClass1 {
        static final /* synthetic */ int[] $SwitchMap$com$mapr$fs$proto$Marlinserver$JoinStatusCode = new int[Marlinserver.JoinStatusCode.values().length];

        static {
            // Each constant is looked up in its own try block so that a constant missing at
            // runtime (NoSuchFieldError) only leaves its own slot at 0 and does not abort the rest.
            try {
                $SwitchMap$com$mapr$fs$proto$Marlinserver$JoinStatusCode[Marlinserver.JoinStatusCode.UNKNOWN_MEMBER_ID.ordinal()] = 1;
            } catch (NoSuchFieldError e) {
            }
            try {
                $SwitchMap$com$mapr$fs$proto$Marlinserver$JoinStatusCode[Marlinserver.JoinStatusCode.STATUS_OK.ordinal()] = 2;
            } catch (NoSuchFieldError e2) {
            }
            try {
                $SwitchMap$com$mapr$fs$proto$Marlinserver$JoinStatusCode[Marlinserver.JoinStatusCode.FUNCTION_UNAVAILABLE.ordinal()] = 3;
            } catch (NoSuchFieldError e3) {
            }
            try {
                $SwitchMap$com$mapr$fs$proto$Marlinserver$JoinStatusCode[Marlinserver.JoinStatusCode.STREAM_AUTHORIZATION_FAILED.ordinal()] = 4;
            } catch (NoSuchFieldError e4) {
            }
            try {
                $SwitchMap$com$mapr$fs$proto$Marlinserver$JoinStatusCode[Marlinserver.JoinStatusCode.STREAM_UNAVAILABLE.ordinal()] = 5;
            } catch (NoSuchFieldError e5) {
            }
        }
    }

    /* loaded from: input_file:com/mapr/kafka/eventstreams/impl/MarlinCoordinator$MarlinCoordinatorJoinCallback.class */
    /**
     * Base class for join callbacks bound to the enclosing coordinator.
     *
     * <p>Subclasses supply {@link #onJoin}; {@link #onRejoin} is implemented here and simply
     * raises the coordinator's {@code rejoinEvent} flag and signals its condition.
     */
    protected abstract class MarlinCoordinatorJoinCallback implements MarlinListener.MarlinJoinCallback {
        /* JADX INFO: Access modifiers changed from: protected */
        public MarlinCoordinatorJoinCallback() {
        }

        /**
         * Invoked with the join information for the group; behaviour is defined by subclasses.
         *
         * @param joinGroupInfo join information delivered by the listener
         */
        @Override // com.mapr.kafka.eventstreams.impl.listener.MarlinListener.MarlinJoinCallback
        public abstract void onJoin(Marlinserver.JoinGroupInfo joinGroupInfo);

        /**
         * Marks the enclosing coordinator as needing a rejoin and wakes one thread waiting on
         * its condition.
         *
         * @param joinGroupInfo join information delivered by the listener (not used here)
         */
        @Override // com.mapr.kafka.eventstreams.impl.listener.MarlinListener.MarlinJoinCallback
        public void onRejoin(Marlinserver.JoinGroupInfo joinGroupInfo) {
            final MarlinCoordinator coordinator = MarlinCoordinator.this;
            MarlinCoordinator.log.debug("onRejoin {}", coordinator.memberId());
            coordinator.lock.lock();
            try {
                coordinator.rejoinEvent = true;
                coordinator.condition.signal();
            } finally {
                coordinator.lock.unlock();
            }
        }
    }

    /**
     * Creates a coordinator for the given group. No streams, consumers or producers are
     * created here; that happens in {@link #init()}.
     *
     * @param str the group id
     */
    public MarlinCoordinator(String str) {
        groupId = str;
        resetBackoff();
        log.debug("MarlinCoordinator constructor");
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Builds the runtime state of the coordinator: derives the sync topic and coordination
     * stream names, creates the joiner, sets up the sync consumer/producer, fetches the join
     * callback and finally replaces the shared logger with the one supplied by the subclass.
     */
    public void init() {
        syncTopic = generateSyncTopic(groupId);
        coordStream = generateCoordStream();
        joiner = getJoiner(groupId, coordStream);
        initSync();
        joinerCallback = getJoinerCallback();
        // Note: this reassigns the static logger, so it affects every instance.
        log = getLogger();
    }

    /**
     * Creates the listener used to join the group.
     *
     * @param str  group id, stored under {@code group.id}
     * @param str2 stream name, stored under {@code streams.consumer.default.stream}
     * @return a new listener built from a {@code ConsumerConfig} carrying those settings
     */
    private MarlinListener<?, ?> getJoiner(String str, String str2) {
        final String byteArrayDeserializer = "org.apache.kafka.common.serialization.ByteArrayDeserializer";

        final Properties consumerProps = new Properties();
        consumerProps.put("group.id", str);
        consumerProps.put("key.deserializer", byteArrayDeserializer);
        consumerProps.put("value.deserializer", byteArrayDeserializer);
        consumerProps.put("streams.consumer.default.stream", str2);
        consumerProps.put("streams.clientside.partition.assignment", "false");

        // The ConsumerConfig is instantiated reflectively through the factory, using its
        // single-argument (Map) constructor.
        final Object[] ctorArgs = {consumerProps};
        final Class[] ctorArgTypes = {Map.class};
        final ConsumerConfig config = (ConsumerConfig) new GenericHFactory().getImplementorInstance(
                "org.apache.kafka.clients.consumer.ConsumerConfig", ctorArgs, ctorArgTypes);
        return new MarlinListener<>(config, null, null);
    }

    /**
     * Creates the sync topic (best effort) and sets up the consumer and producer used to
     * exchange group assignments over it.
     *
     * <p>Topic creation failures are only logged: the topic may already exist, and the
     * consumer/producer are created regardless. The admin client is closed exactly once,
     * whether or not topic creation succeeded.
     */
    private void initSync() {
        log.debug("Creating Sync topic {} : ", syncTopic);
        try {
            Admin admin = Streams.newAdmin(new Configuration());
            try {
                admin.createTopic(coordStream, syncTopic, 1);
            } catch (Exception e) {
                // Deliberately non-fatal; the throwable is passed so the stack trace is kept.
                log.debug("Sync topic creation failed : {} ", e.getMessage(), e);
            } finally {
                // Single close on every path; a failure here is reported by the outer catch
                // instead of being mistaken for a topic creation failure.
                admin.close();
            }
        } catch (Exception e) {
            log.debug("Sync topic creation. Failed to create admin client: {} ", e.getMessage(), e);
        }

        streamTopic = coordStream + ":" + syncTopic;

        // One Properties object serves both clients, so it carries serializers and deserializers.
        Properties properties = new Properties();
        properties.put("key.deserializer", "org.apache.kafka.common.serialization.LongDeserializer");
        properties.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        properties.put("key.serializer", "org.apache.kafka.common.serialization.LongSerializer");
        properties.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
        properties.put("auto.offset.reset", "latest");
        properties.put("streams.clientside.partition.assignment", "false");
        syncReceiver = new KafkaConsumer<>(properties);
        syncProducer = new KafkaProducer<>(properties);
        syncReceiver.assign(Arrays.asList(new TopicPartition(streamTopic, 0)));
    }

    /**
     * Supplies the logger that {@link #init()} installs into the static {@code log} field.
     *
     * @return the logger to use after initialisation
     */
    protected abstract Logger getLogger();

    /**
     * Supplies the callback that is passed to the joiner on every join attempt.
     *
     * @return the join callback
     */
    protected abstract MarlinListener.MarlinJoinCallback getJoinerCallback();

    /**
     * Produces the name of the sync topic. {@link #init()} calls this with the group id.
     *
     * @param str the group id
     * @return the topic name, later combined with the coordination stream as {@code stream:topic}
     */
    protected abstract String generateSyncTopic(String str);

    /**
     * Produces the name of the coordination stream, used both as the joiner's default stream
     * and as the stream in which the sync topic is created.
     *
     * @return the coordination stream name
     */
    protected abstract String generateCoordStream();

    /**
     * Builds the join descriptor sent with each join attempt in {@link #ensureActiveGroup()}.
     *
     * @return the join descriptor
     */
    protected abstract Marlinserver.JoinGroupDesc generateJoinDesc();

    /**
     * Called by {@link #ensureActiveGroup()} before a join attempt whenever the
     * {@code needsJoinPrepare} flag is set. Presumably releases the member's current
     * assignment — confirm in the implementing subclass.
     */
    protected abstract void revokeAssignments();

    /**
     * Called by {@link #onSyncComplete} for each member state whose id equals this member's id.
     *
     * @param memberState the matching member state from the group assignment
     * @param j           the group generation id carried by the group assignment
     */
    protected abstract void protocolOnSyncComplete(Marlinserver.MemberState memberState, long j);

    /**
     * Computes the assignment for the group. {@link #performOnJoin} calls this only when this
     * member is the group leader.
     *
     * @param str  the leader's member id
     * @param list the members reported by the join
     * @return a map from member id to that member's assignment bytes
     */
    protected abstract Map<String, ByteBuffer> performProtocolAssignment(String str, List<Marlinserver.Member> list);

    /**
     * Hands this member's part of a group assignment to the protocol implementation.
     *
     * <p>Every member state in the assignment is inspected; for each one whose member id equals
     * {@link #memberId()}, {@link #protocolOnSyncComplete} is invoked and the coordinator is
     * flagged to run join preparation before the next join.
     *
     * @param groupAssignment the assignment read from the sync topic
     */
    protected void onSyncComplete(Marlinserver.GroupAssignment groupAssignment) {
        log.debug("ga gen id {}", Long.valueOf(groupAssignment.getGroupGenerationId()));
        for (Marlinserver.MemberState state : groupAssignment.getMemberStateList()) {
            log.debug("ms id {} ", state.getMemberId());
            if (memberId() != null) {
                log.debug("this id {}", memberId());
            }
            if (!state.getMemberId().equals(memberId())) {
                // Assignment for some other member; nothing to do.
                continue;
            }
            protocolOnSyncComplete(state, groupAssignment.getGroupGenerationId());
            needsJoinPrepare = true;
        }
    }

    /**
     * Joins (or rejoins) the group until no further rejoin is needed.
     *
     * <p>Each pass clears the rejoin flags, runs join preparation if it is pending, sends a
     * join request, processes the response, and then blocks until either the join completes
     * or a rejoin event arrives. The sync step runs only when the join completed.
     */
    public void ensureActiveGroup() {
        while (rejoinNeeded()) {
            resetRejoinFlags();
            if (needsJoinPrepare) {
                revokeAssignments();
                needsJoinPrepare = false;
            }

            final Marlinserver.JoinGroupDesc joinDesc = generateJoinDesc();
            joinComplete = false;
            final Marlinserver.JoinGroupResponse response = joiner.join(joinDesc, joinerCallback);
            log.debug("ensureActiveGroup: joinStatus {}", response.getJoinStatus());
            handleJoinGroupResponse(response);

            waitForJoinOrRejoinEvent();
            if (!joinComplete) {
                // Woken by a rejoin event rather than a completed join: go around again.
                continue;
            }
            doSync();
        }
    }

    /**
     * Reacts to the status carried by a join response.
     *
     * <ul>
     *   <li>{@code STATUS_OK}: records the assigned member id and resets the backoff.</li>
     *   <li>{@code UNKNOWN_MEMBER_ID}: forgets the member id and requests an immediate retry.</li>
     *   <li>{@code FUNCTION_UNAVAILABLE} / {@code STREAM_AUTHORIZATION_FAILED}: not retryable,
     *       reported by exception.</li>
     *   <li>Anything else: logged, then retried after an exponential backoff.</li>
     * </ul>
     *
     * <p>Every retryable outcome raises {@code rejoinEvent}. Without it the caller would block
     * waiting for a join completion that a failed join request never produces, and the
     * announced retry would not happen.
     *
     * @param joinGroupResponse the response returned by the joiner
     * @throws BrokerNotAvailableException if the server does not support the feature
     * @throws AuthorizationException      if the stream permissions are insufficient
     */
    private void handleJoinGroupResponse(Marlinserver.JoinGroupResponse joinGroupResponse) {
        switch (joinGroupResponse.getJoinStatus()) {
            case STATUS_OK:
                memberId = joinGroupResponse.getMemberId();
                resetBackoff();
                return;
            case UNKNOWN_MEMBER_ID:
                memberId = UNKNOWN_MEMBER_ID_STR;
                break;
            case FUNCTION_UNAVAILABLE:
                throw new BrokerNotAvailableException("Feature not available on server. Please upgrade to at least Version 5.2.1");
            case STREAM_AUTHORIZATION_FAILED:
                throw new AuthorizationException("Need produceperm and consumeperm permissions on stream " + coordStream);
            case STREAM_UNAVAILABLE:
                log.error("Could not open stream " + coordStream);
                // Intentional fall-through: an unavailable stream is retried like any other failure.
            default:
                log.error("Join Group request failed with {}. Retrying with exponential backoff", joinGroupResponse.getJoinStatus());
                backoff();
                break;
        }

        // Shared tail of all retryable outcomes: make the join loop go around again.
        lock.lock();
        try {
            rejoinEvent = true;
        } finally {
            lock.unlock();
        }
    }

    /**
     * Grows the retry delay to {@code 2 * previous + 1000} ms, capped at
     * {@link #kMaxBackoffTimeMs}, and then sleeps for the new delay.
     */
    private void backoff() {
        final long grown = (backoffTimeMs * 2) + 1000;
        backoffTimeMs = Math.min(grown, kMaxBackoffTimeMs);
        Utils.sleep(backoffTimeMs);
    }

    /** Resets the retry delay to zero so the next backoff starts from the minimum again. */
    private void resetBackoff() {
        backoffTimeMs = 0L;
    }

    /**
     * Blocks until the join has completed or a rejoin event has been raised.
     *
     * <p>If the waiting thread is interrupted the wait is abandoned and the method returns
     * with both flags possibly still {@code false}.
     */
    private void waitForJoinOrRejoinEvent() {
        log.debug("waitForJoinOrRejoinEvent: memberId {} start", memberId);
        // Acquire outside the try block so the finally clause never unlocks a lock that was
        // not obtained.
        lock.lock();
        try {
            while (!joinComplete && !rejoinEvent) {
                condition.await();
            }
        } catch (InterruptedException e) {
            // The interrupt status is intentionally not restored here: ensureActiveGroup()
            // calls this method once per retry, and a still-set interrupt status would make
            // every subsequent await() fail immediately, turning the retry loop into a busy spin.
            log.debug("waitForJoinOrRejoinEvent: memberId {} interrupted", memberId, e);
        } finally {
            lock.unlock();
        }
        log.debug("waitForJoinOrRejoinEvent: memberId {} awoken. joinComplete {} rejoinEvent {}", new Object[]{memberId, Boolean.valueOf(joinComplete), Boolean.valueOf(rejoinEvent)});
    }

    /**
     * Polls the sync topic until the assignment for the current group generation arrives or
     * a rejoin event is raised.
     *
     * <p>Records whose key does not match the current generation id are skipped. The first
     * matching record is parsed and passed to {@link #onSyncComplete}, after which the method
     * returns.
     *
     * @throws KafkaException if a matching record cannot be parsed; the parse error is attached
     *                        as the cause
     */
    private void doSync() {
        int pollCount = 0;
        while (true) {
            ConsumerRecords<Long, byte[]> records = syncReceiver.poll(SYNC_POLL_TIMEOUT);
            log.debug("doSync: memberId {} returned from poll {}", memberId, Integer.valueOf(pollCount));
            for (ConsumerRecord<Long, byte[]> syncRecord : records) {
                log.debug("doSync: consumer record..generation ID {}", syncRecord.key());
                if (!groupGenerationIdMatches(syncRecord.key())) {
                    continue;
                }
                Marlinserver.GroupAssignment assignment;
                try {
                    assignment = Marlinserver.GroupAssignment.parseFrom(syncRecord.value());
                } catch (InvalidProtocolBufferException e) {
                    // Keep the original exception as the cause so the parse failure is diagnosable.
                    throw new KafkaException("Error parsing Sync response", e);
                }
                onSyncComplete(assignment);
                return;
            }
            if (rejoinEventOccured()) {
                // The group changed while waiting; the caller's join loop will start over.
                return;
            }
            pollCount++;
        }
    }

    /**
     * Compares a generation id against the current group generation id under the lock.
     *
     * @param l the generation id to test; {@code null} never matches
     * @return {@code true} if {@code l} equals the current generation id
     */
    private boolean groupGenerationIdMatches(Long l) {
        lock.lock();
        try {
            return groupGenerationId.equals(l);
        } finally {
            // Released in finally: groupGenerationId is protected and could have been set to
            // null by a subclass, and the resulting exception must not leave the lock held.
            lock.unlock();
        }
    }

    /**
     * Flags that a rejoin has been requested and wakes one thread waiting on the
     * coordinator's condition.
     */
    public void requestRejoin() {
        log.debug("requestRejoin");
        lock.lock();
        try {
            isRejoinRequested = true;
            condition.signal();
        } finally {
            lock.unlock();
        }
    }

    /**
     * Returns the id of this member.
     *
     * @return the current member id; the empty string while no id has been assigned
     */
    public String memberId() {
        return memberId;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Closes the sync consumer, the sync producer and the joiner, then clears the rejoin flags.
     *
     * <p>All three resources are attempted even if an earlier one fails to close, and
     * resources that were never created (for example because {@link #init()} did not run)
     * are skipped. If any close fails, the first failure is rethrown after cleanup with later
     * failures attached as suppressed exceptions.
     */
    public void close() {
        RuntimeException failure = null;
        try {
            if (syncReceiver != null) {
                syncReceiver.close();
            }
        } catch (RuntimeException e) {
            failure = chainCloseFailure(failure, e);
        }
        try {
            if (syncProducer != null) {
                syncProducer.close();
            }
        } catch (RuntimeException e) {
            failure = chainCloseFailure(failure, e);
        }
        try {
            if (joiner != null) {
                joiner.close();
            }
        } catch (RuntimeException e) {
            failure = chainCloseFailure(failure, e);
        }

        lock.lock();
        try {
            isRejoinRequested = false;
            rejoinEvent = false;
        } finally {
            lock.unlock();
        }

        if (failure != null) {
            throw failure;
        }
    }

    /** Returns the first close failure, attaching {@code next} as suppressed when one already exists. */
    private static RuntimeException chainCloseFailure(RuntimeException first, RuntimeException next) {
        if (first == null) {
            return next;
        }
        first.addSuppressed(next);
        return first;
    }

    /**
     * No-op in this class: the body is empty.
     * NOTE(review): presumably kept so callers written against Kafka's coordinator API still
     * compile — confirm against the callers.
     */
    public void ensureCoordinatorKnown() {
    }

    /**
     * No-op in this class: the body is empty.
     * NOTE(review): presumably kept so callers written against Kafka's coordinator API still
     * compile — confirm against the callers.
     */
    public void ensureCoordinatorReady() {
    }

    /**
     * Waits until a rejoin is requested, a rejoin event is raised, {@link #wakeup()} is
     * called, or the timeout elapses. Unlike {@link #poll(long)} this neither joins the group
     * nor throws on wakeup; a pending wakeup is simply consumed.
     *
     * <p>The timeout bounds the total wait: time already spent waiting is not granted again
     * after a signal that did not set any of the flags. If the thread is interrupted the wait
     * ends early and the interrupt status is restored for the caller.
     *
     * @param j maximum time to wait, in milliseconds; zero or negative returns without waiting
     */
    public void pollEvent(long j) throws WakeupException {
        log.debug("Poll timeout {}", Long.valueOf(j));
        lock.lock();
        try {
            long remainingNanos = TimeUnit.MILLISECONDS.toNanos(j);
            while (!isRejoinRequested && !rejoinEvent && !wakeupEvent) {
                if (remainingNanos <= 0L) {
                    log.debug("MarlinConsumerCoordinator: poll time expired");
                    return;
                }
                // awaitNanos returns the time still left, so signals and spurious wakeups
                // cannot extend the overall wait.
                remainingNanos = condition.awaitNanos(remainingNanos);
            }
            if (wakeupEvent) {
                wakeupEvent = false;
                log.debug("MarlinConsumerCoordinator: woken up");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            log.debug("pollEvent interrupted", e);
        } finally {
            // Single exit point: the lock is released exactly once on every path.
            log.debug("exiting poll");
            lock.unlock();
        }
    }

    /**
     * Ensures group membership is current, then waits until a rejoin is requested, a rejoin
     * event is raised, {@link #wakeup()} is called, or the timeout elapses.
     *
     * <p>The timeout bounds the total wait: time already spent waiting is not granted again
     * after a signal that did not set any of the flags. If the thread is interrupted the wait
     * ends early and the interrupt status is restored for the caller.
     *
     * @param j maximum time to wait, in milliseconds; zero or negative returns without waiting
     * @throws WakeupException if {@link #wakeup()} was called; the pending wakeup is consumed
     */
    public void poll(long j) throws WakeupException {
        log.debug("poll timeout {}", Long.valueOf(j));
        ensureActiveGroup();
        lock.lock();
        try {
            long remainingNanos = TimeUnit.MILLISECONDS.toNanos(j);
            while (!isRejoinRequested && !rejoinEvent && !wakeupEvent) {
                if (remainingNanos <= 0L) {
                    log.debug("MarlinCoordinator: poll time expired");
                    return;
                }
                // awaitNanos returns the time still left, so signals and spurious wakeups
                // cannot extend the overall wait.
                remainingNanos = condition.awaitNanos(remainingNanos);
            }
            if (wakeupEvent) {
                wakeupEvent = false;
                throw new WakeupException();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            log.debug("poll interrupted", e);
        } finally {
            // Single exit point: the lock is released exactly once on every path,
            // including when WakeupException propagates.
            log.debug("exiting poll");
            lock.unlock();
        }
    }

    /**
     * Raises the wakeup flag and signals one thread waiting on the coordinator's condition.
     * The flag is consumed by the next {@link #poll(long)} or {@link #pollEvent(long)}.
     */
    public void wakeup() {
        log.debug("wakeup: waking up");
        lock.lock();
        try {
            wakeupEvent = true;
            condition.signal();
        } finally {
            lock.unlock();
        }
    }

    /**
     * Delegates to {@link #requestRejoin()}; no separate leave request is issued by this method.
     */
    public void maybeLeaveGroup() {
        requestRejoin();
    }

    /**
     * Logs the reason for leaving and then behaves like {@link #maybeLeaveGroup()}.
     *
     * @param str human-readable reason, used only in the log message
     * @return always {@code null}; no future is created for the request
     */
    public RequestFuture<Void> maybeLeaveGroup(String str) {
        log.info("Member {} sending LeaveGroup request to coordinator {} due to {}", memberId, groupId, str);
        maybeLeaveGroup();
        return null;
    }

    /**
     * Protocol-specific rejoin check. {@link #rejoinNeeded()} consults it only when neither a
     * rejoin request nor a rejoin event is pending, and does so while holding the
     * coordinator's lock.
     *
     * @return {@code true} if the protocol implementation requires a rejoin
     */
    protected abstract boolean isProtocolRejoinNeeded();

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Tells whether the member has to (re)join the group.
     *
     * <p>A rejoin is needed when one was requested, when a rejoin event is pending, or when
     * the protocol implementation asks for it. The protocol check is skipped if one of the
     * flags is already set.
     *
     * @return {@code true} if a join attempt should be made
     */
    public boolean rejoinNeeded() {
        boolean requested;
        boolean event;
        boolean needed;
        lock.lock();
        try {
            // Snapshot the flags under the lock so the log line below reports the values the
            // decision was actually based on.
            requested = isRejoinRequested;
            event = rejoinEvent;
            needed = requested || event || isProtocolRejoinNeeded();
        } finally {
            // isProtocolRejoinNeeded() is subclass code; the lock must be released even if it throws.
            lock.unlock();
        }
        log.debug("isRejoinRequested {} rejoinEvent {} ", Boolean.valueOf(requested), Boolean.valueOf(event));
        return needed;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /** Clears both the rejoin-requested flag and the rejoin-event flag under the lock. */
    public void resetRejoinFlags() {
        lock.lock();
        try {
            rejoinEvent = false;
            isRejoinRequested = false;
        } finally {
            lock.unlock();
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Reads the rejoin-event flag under the lock.
     *
     * @return {@code true} if a rejoin event is pending
     */
    public boolean rejoinEventOccured() {
        lock.lock();
        try {
            return rejoinEvent;
        } finally {
            lock.unlock();
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Completes a join.
     *
     * <p>If this member is the group leader, it computes the assignment for all members and
     * publishes it to the sync topic, keyed by the group generation id. In every case the
     * generation id is recorded, the join is marked complete and one waiting thread is
     * signalled.
     *
     * <p>A failure while computing or publishing the assignment is logged and does not prevent
     * the join from being marked complete.
     *
     * @param joinGroupInfo join information: leader id, generation id and member list
     */
    public void performOnJoin(Marlinserver.JoinGroupInfo joinGroupInfo) {
        String groupLeaderId = joinGroupInfo.getGroupLeaderId();
        long generationId = joinGroupInfo.getGroupGenerationId();
        log.debug("onJoin: memberId {} leaderId {}", memberId(), groupLeaderId);

        if (groupLeaderId.equals(memberId())) {
            try {
                Map<String, ByteBuffer> assignments = performProtocolAssignment(groupLeaderId, joinGroupInfo.getMembersList());
                Marlinserver.GroupAssignment.Builder builder = Marlinserver.GroupAssignment.newBuilder().setGroupGenerationId(generationId);
                for (Map.Entry<String, ByteBuffer> entry : assignments.entrySet()) {
                    log.debug("setting memberstate member id to {}", entry.getKey());
                    builder.addMemberState(Marlinserver.MemberState.newBuilder().setMemberId(entry.getKey()).setMemberAssignment(ByteString.copyFrom(entry.getValue())).build());
                }
                Marlinserver.GroupAssignment assignment = builder.build();
                log.debug("onJoin: memberId {} producing assignment for generation {}", memberId(), Long.valueOf(generationId));
                // Partition 0, no explicit timestamp; the key is the generation id that the
                // members compare against when reading the sync topic.
                syncProducer.send(new ProducerRecord<Long, byte[]>(streamTopic, 0, (Long) null, Long.valueOf(generationId), assignment.toByteArray()));
            } catch (Exception e) {
                // Best effort, as before, but reported through the logger with the stack trace
                // instead of being printed to stderr.
                log.error("onJoin: memberId {} failed to compute or publish assignment for generation {}", memberId(), Long.valueOf(generationId), e);
            }
        }

        lock.lock();
        try {
            groupGenerationId = Long.valueOf(generationId);
            joinComplete = true;
            condition.signal();
        } finally {
            lock.unlock();
        }
    }
}
