/*
 * Decompiled with CFR 0.152.
 */
package com.mapr.kafka.eventstreams.impl;

import com.google.protobuf.ByteString;
import com.mapr.fs.proto.Marlinserver;
import com.mapr.kafka.eventstreams.impl.MarlinCoordinator;
import com.mapr.kafka.eventstreams.impl.listener.MarlinListener;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.kafka.common.utils.CircularIterator;
import org.apache.kafka.common.utils.Timer;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.connect.runtime.distributed.ClusterConfigState;
import org.apache.kafka.connect.runtime.distributed.ConnectProtocol;
import org.apache.kafka.connect.runtime.distributed.DistributedConfig;
import org.apache.kafka.connect.runtime.distributed.ExtendedAssignment;
import org.apache.kafka.connect.runtime.distributed.GenericWorkerCoordinator;
import org.apache.kafka.connect.runtime.distributed.WorkerRebalanceListener;
import org.apache.kafka.connect.storage.KafkaConfigBackingStore;
import org.apache.kafka.connect.util.ConnectorTaskId;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Group coordinator for Kafka Connect workers, built on {@link MarlinCoordinator}.
 *
 * <p>Each worker advertises its REST URL and config offset when joining. The member that is asked
 * to perform the assignment distributes connectors and tasks round-robin over the sorted member
 * ids and publishes the result as serialized {@link ConnectProtocol.Assignment}s. The assignment
 * received by this worker is kept in an {@link ExtendedAssignment} snapshot and forwarded to the
 * supplied {@link WorkerRebalanceListener}.
 *
 * <p>NOTE(review): no synchronization is performed in this class; confirm the threading model
 * against {@code MarlinCoordinator} before calling it from more than one thread.
 */
public class MarlinWorkerCoordinator
extends MarlinCoordinator
implements GenericWorkerCoordinator {
    /** Protocol version stamped on every {@link ExtendedAssignment} created by this class. */
    private static final short PROTOCOL_VERSION_V0 = 0;
    /**
     * Error code used for a successful assignment.
     * Presumably mirrors {@code ConnectProtocol.Assignment.NO_ERROR} - confirm.
     */
    private static final short ASSIGNMENT_NO_ERROR = 0;
    /**
     * Error code sent when the assigning member does not have the newest config.
     * Presumably mirrors {@code ConnectProtocol.Assignment.CONFIG_MISMATCH} - confirm.
     */
    private static final short ASSIGNMENT_CONFIG_MISMATCH = 1;
    /** Key under which the config storage topic is looked up in the worker configuration. */
    private static final String CONFIG_TOPIC_KEY = "config.storage.topic";
    /** Character separating the stream name from the topic name in the config storage topic. */
    private static final char STREAM_TOPIC_SEPARATOR = ':';

    /** Ownership lookup tables; only populated by {@link #performTaskAssignment}. */
    private LeaderState leaderState;
    /** Most recent assignment received for this worker, or {@code null} if none (or after close). */
    private ExtendedAssignment assignmentSnapshot;
    /** Config snapshot taken when the join descriptor was generated (may be refreshed by the leader). */
    private ClusterConfigState configSnapshot;
    private final DistributedConfig config;
    private final String restUrl;
    protected final WorkerRebalanceListener rebalanceCb;
    private final KafkaConfigBackingStore configStorage;

    /**
     * Creates the coordinator and immediately runs the inherited {@code init()}.
     *
     * @param conf          distributed worker configuration; must contain {@code config.storage.topic}
     * @param groupId       group id handed to the parent coordinator
     * @param restUrl       REST URL advertised to the other group members
     * @param configStorage store used to obtain config snapshots
     * @param rebalanceCb   listener notified on assignment and revocation
     */
    public MarlinWorkerCoordinator(DistributedConfig conf, String groupId, String restUrl, KafkaConfigBackingStore configStorage, WorkerRebalanceListener rebalanceCb) {
        super(groupId);
        this.config = conf;
        this.restUrl = restUrl;
        this.configStorage = configStorage;
        this.rebalanceCb = rebalanceCb;
        this.assignmentSnapshot = null;
        // All fields must be assigned before init(): it may call back into overridden methods
        // of this class (e.g. generateCoordStream reads this.config).
        this.init();
        log.debug("MarlinWorkerCoordinator constructor");
    }

    /** @return the SLF4J logger for this class */
    @Override
    protected Logger getLogger() {
        return LoggerFactory.getLogger(MarlinWorkerCoordinator.class);
    }

    /**
     * A rejoin is required when there is no assignment yet or the current one is marked failed.
     *
     * @return {@code true} if the worker has to rejoin the group
     */
    @Override
    protected boolean isProtocolRejoinNeeded() {
        ExtendedAssignment snapshot = this.assignmentSnapshot;
        if (snapshot == null) {
            return true;
        }
        boolean failed = snapshot.failed();
        log.debug("assignmentSnapshot.failed {}", (Object)failed);
        return failed;
    }

    /** Notifies the rebalance listener that the currently held assignment is revoked. */
    @Override
    protected void revokeAssignments() {
        this.notifyRevoked(this.assignmentSnapshot);
    }

    /**
     * Notifies the rebalance listener that the given assignment is revoked.
     *
     * @param assignment assignment to revoke; {@code null} or failed assignments are ignored
     */
    public void revokeAssignment(ExtendedAssignment assignment) {
        this.notifyRevoked(assignment);
    }

    /** Shared implementation of both revoke entry points. */
    private void notifyRevoked(ExtendedAssignment assignment) {
        log.debug("Revoking previous assignment {}", (Object)assignment);
        if (assignment == null || assignment.failed()) {
            return;
        }
        this.rebalanceCb.onRevoked(assignment.leader(), assignment.connectors(), assignment.tasks());
    }

    /**
     * Deserializes the assignment delivered for this member, stores it as the current snapshot
     * and forwards it to the rebalance listener.
     *
     * @param ms           member state carrying the serialized assignment
     * @param generationId group generation; narrowed to {@code int} for the listener callback
     */
    @Override
    protected void protocolOnSyncComplete(Marlinserver.MemberState ms, long generationId) {
        ConnectProtocol.Assignment received = ConnectProtocol.deserializeAssignment(ms.getMemberAssignment().asReadOnlyByteBuffer());
        // The wire format carries no revocations or delay, so those parts are left empty / zero.
        this.assignmentSnapshot = new ExtendedAssignment(
                PROTOCOL_VERSION_V0,
                received.error(),
                received.leader(),
                received.leaderUrl(),
                received.offset(),
                received.connectors(),
                received.tasks(),
                Collections.<String>emptyList(),
                Collections.<ConnectorTaskId>emptyList(),
                0);
        this.invokeAssignCallback(this.assignmentSnapshot, (int)generationId);
    }

    /**
     * Hands a new assignment to the rebalance listener.
     *
     * @param assignmentSnapshot assignment to announce
     * @param groupGenerationId  generation the assignment belongs to
     */
    protected void invokeAssignCallback(ExtendedAssignment assignmentSnapshot, int groupGenerationId) {
        this.rebalanceCb.onAssigned(assignmentSnapshot, groupGenerationId);
    }

    /**
     * @return the version of the current assignment, or version 0 when no assignment is held
     *         (before the first sync or after {@link #close()}); this class only ever creates
     *         version 0 assignments
     */
    public short currentProtocolVersion() {
        ExtendedAssignment snapshot = this.assignmentSnapshot;
        return snapshot == null ? PROTOCOL_VERSION_V0 : snapshot.version();
    }

    /** Closes the parent coordinator and drops the held assignment. */
    @Override
    public void close() {
        super.close();
        this.assignmentSnapshot = null;
    }

    /**
     * @param connector connector name
     * @return URL of the worker owning the connector, or {@code null} if this worker is not the
     *         leader, a rejoin is pending, no assignment has been computed, or the connector is unknown
     */
    public String ownerUrl(String connector) {
        LeaderState state = this.usableLeaderState();
        return state == null ? null : state.ownerUrl(connector);
    }

    /**
     * @param task task id
     * @return URL of the worker owning the task, or {@code null} if this worker is not the
     *         leader, a rejoin is pending, no assignment has been computed, or the task is unknown
     */
    public String ownerUrl(ConnectorTaskId task) {
        LeaderState state = this.usableLeaderState();
        return state == null ? null : state.ownerUrl(task);
    }

    /**
     * Returns the leader state only when it may be consulted: no rejoin pending and this worker is
     * the leader. May still be {@code null} if this worker never completed a task assignment.
     */
    private LeaderState usableLeaderState() {
        if (this.rejoinNeeded() || !this.isLeader()) {
            return null;
        }
        return this.leaderState;
    }

    /** @return {@code true} if the current assignment names this member as leader */
    private boolean isLeader() {
        ExtendedAssignment snapshot = this.assignmentSnapshot;
        return snapshot != null && this.memberId != null && this.memberId.equals(snapshot.leader());
    }

    /**
     * @param groupId group id
     * @return name of the topic used to exchange assignments for the group
     */
    @Override
    protected String generateSyncTopic(String groupId) {
        return "__mapr__" + groupId + "_assignment";
    }

    /**
     * @param configs raw configuration map
     * @return value stored under {@code config.storage.topic}, or {@code null} if absent
     */
    protected String getConfigTopic(Map<String, ?> configs) {
        return (String)configs.get(CONFIG_TOPIC_KEY);
    }

    /**
     * Derives the coordination stream from the config storage topic by taking everything before
     * the last {@code ':'}.
     *
     * @return the stream part of the config storage topic
     * @throws IllegalStateException if the config storage topic is missing or contains no {@code ':'}
     */
    @Override
    protected String generateCoordStream() {
        String configTopic = this.getConfigTopic(this.config.originals());
        if (configTopic == null) {
            throw new IllegalStateException("Missing required configuration '" + CONFIG_TOPIC_KEY + "'");
        }
        int separatorIdx = configTopic.lastIndexOf(STREAM_TOPIC_SEPARATOR);
        if (separatorIdx < 0) {
            throw new IllegalStateException("Configuration '" + CONFIG_TOPIC_KEY + "' must have the form <stream>"
                    + STREAM_TOPIC_SEPARATOR + "<topic>, but was '" + configTopic + "'");
        }
        return configTopic.substring(0, separatorIdx);
    }

    /** @return a fresh snapshot from the config backing store */
    protected ClusterConfigState getConfigSnapshot() {
        return this.configStorage.snapshot();
    }

    /** @return a new join callback bound to this coordinator */
    @Override
    protected MarlinListener.MarlinJoinCallback getJoinerCallback() {
        return new MarlinWorkerJoinCallback();
    }

    /**
     * Refreshes the local config snapshot and builds the join descriptor advertising this
     * worker's REST URL and config offset.
     *
     * @return join descriptor with protocol type {@code connect} and a single {@code default} protocol
     */
    @Override
    protected Marlinserver.JoinGroupDesc generateJoinDesc() {
        this.configSnapshot = this.getConfigSnapshot();
        ConnectProtocol.WorkerState workerState = new ConnectProtocol.WorkerState(this.restUrl, this.configSnapshot.offset());
        ByteBuffer metadata = ConnectProtocol.serializeMetadata(workerState);
        Marlinserver.MemberProtocol protocol = Marlinserver.MemberProtocol.newBuilder()
                .setProtocol("default")
                .setMemberMetadata(ByteString.copyFrom(metadata))
                .build();
        return Marlinserver.JoinGroupDesc.newBuilder()
                .setProtocolType("connect")
                .setMemberId(this.memberId)
                .addMemberProtocols(protocol)
                .build();
    }

    /**
     * Computes the serialized assignment for every member of the group.
     *
     * <p>If this worker cannot catch up to the highest config offset reported by the members, every
     * member receives an empty assignment flagged with the config-mismatch error code.
     *
     * @param leaderId id of the leading member; expected to be contained in {@code members}
     * @param members  all group members including their serialized worker state
     * @return serialized assignment per member id
     */
    @Override
    protected Map<String, ByteBuffer> performProtocolAssignment(String leaderId, List<Marlinserver.Member> members) {
        Map<String, ConnectProtocol.WorkerState> memberStates = new HashMap<String, ConnectProtocol.WorkerState>();
        for (Marlinserver.Member member : members) {
            memberStates.put(member.getMemberId(), ConnectProtocol.deserializeMetadata(member.getMemberMetadata().asReadOnlyByteBuffer()));
        }
        long maxOffset = this.findMaxMemberConfigOffset(memberStates);
        Long leaderOffset = this.ensureLeaderConfig(maxOffset);
        if (leaderOffset == null) {
            return this.fillAssignmentsAndSerialize(
                    memberStates.keySet(),
                    ASSIGNMENT_CONFIG_MISMATCH,
                    leaderId,
                    memberStates.get(leaderId).url(),
                    maxOffset,
                    new HashMap<String, List<String>>(),
                    new HashMap<String, List<ConnectorTaskId>>());
        }
        return this.performTaskAssignment(leaderId, leaderOffset, memberStates);
    }

    /**
     * @param allConfigs worker state per member id; must not be empty
     * @return the highest config offset reported by any member
     * @throws IllegalArgumentException if {@code allConfigs} is empty
     */
    private long findMaxMemberConfigOffset(Map<String, ConnectProtocol.WorkerState> allConfigs) {
        if (allConfigs.isEmpty()) {
            throw new IllegalArgumentException("Cannot determine the max config offset without any group members");
        }
        long maxOffset = Long.MIN_VALUE;
        for (ConnectProtocol.WorkerState state : allConfigs.values()) {
            maxOffset = Math.max(maxOffset, state.offset());
        }
        log.debug("Max config offset root: {}, local snapshot config offsets root: {}", (Object)maxOffset, (Object)this.configSnapshot.offset());
        return maxOffset;
    }

    /**
     * Makes sure the local config snapshot is at least as new as {@code maxOffset}, taking a fresh
     * snapshot if the current one is behind.
     *
     * @param maxOffset highest config offset reported by the group
     * @return the offset to assign with, or {@code null} if even a fresh snapshot is still behind
     */
    private Long ensureLeaderConfig(long maxOffset) {
        if (this.configSnapshot.offset() >= maxOffset) {
            return maxOffset;
        }
        // The local snapshot is behind; a fresh one may already have caught up.
        ClusterConfigState updatedSnapshot = this.getConfigSnapshot();
        if (updatedSnapshot.offset() < maxOffset) {
            log.info("Was selected to perform assignments, but do not have latest config found in sync request. Returning an empty configuration to trigger re-sync.");
            return null;
        }
        this.configSnapshot = updatedSnapshot;
        return this.configSnapshot.offset();
    }

    /**
     * Distributes connectors and their tasks round-robin over the sorted member ids, records the
     * result in {@link #leaderState} and serializes it for every member.
     *
     * @param leaderId   id of the leading member
     * @param maxOffset  config offset the assignment is based on
     * @param allConfigs worker state per member id
     * @return serialized assignment per member id
     */
    private Map<String, ByteBuffer> performTaskAssignment(String leaderId, long maxOffset, Map<String, ConnectProtocol.WorkerState> allConfigs) {
        Map<String, List<String>> connectorAssignments = new HashMap<String, List<String>>();
        Map<String, List<ConnectorTaskId>> taskAssignments = new HashMap<String, List<ConnectorTaskId>>();
        // A single iterator is shared by connectors and tasks so that work is spread evenly.
        CircularIterator<String> memberIt = new CircularIterator<String>(Utils.sorted(allConfigs.keySet()));
        for (String connectorId : Utils.sorted(this.configSnapshot.connectors())) {
            String connectorAssignedTo = memberIt.next();
            log.trace("Assigning connector {} to {}", (Object)connectorId, (Object)connectorAssignedTo);
            connectorAssignments.computeIfAbsent(connectorAssignedTo, member -> new ArrayList<String>()).add(connectorId);
            for (ConnectorTaskId taskId : Utils.sorted(this.configSnapshot.tasks(connectorId))) {
                String taskAssignedTo = memberIt.next();
                log.trace("Assigning task {} to {}", (Object)taskId, (Object)taskAssignedTo);
                taskAssignments.computeIfAbsent(taskAssignedTo, member -> new ArrayList<ConnectorTaskId>()).add(taskId);
            }
        }
        this.leaderState = new LeaderState(allConfigs, connectorAssignments, taskAssignments);
        return this.fillAssignmentsAndSerialize(allConfigs.keySet(), ASSIGNMENT_NO_ERROR, leaderId, allConfigs.get(leaderId).url(), maxOffset, connectorAssignments, taskAssignments);
    }

    /**
     * Builds and serializes one {@link ConnectProtocol.Assignment} per member. Members without an
     * entry in the assignment maps receive empty connector / task lists.
     *
     * @param members              ids of all members to produce an assignment for
     * @param error                error code placed in every assignment
     * @param leaderId             id of the leading member
     * @param leaderUrl            REST URL of the leading member
     * @param maxOffset            config offset the assignment is based on
     * @param connectorAssignments connectors per member id
     * @param taskAssignments      tasks per member id
     * @return serialized assignment per member id
     */
    private Map<String, ByteBuffer> fillAssignmentsAndSerialize(Collection<String> members, short error, String leaderId, String leaderUrl, long maxOffset, Map<String, List<String>> connectorAssignments, Map<String, List<ConnectorTaskId>> taskAssignments) {
        Map<String, ByteBuffer> groupAssignment = new HashMap<String, ByteBuffer>();
        for (String member : members) {
            List<String> connectors = MarlinWorkerCoordinator.orEmpty(connectorAssignments.get(member));
            List<ConnectorTaskId> tasks = MarlinWorkerCoordinator.orEmpty(taskAssignments.get(member));
            ConnectProtocol.Assignment assignment = new ConnectProtocol.Assignment(error, leaderId, leaderUrl, maxOffset, connectors, tasks);
            log.debug("Assignment: {} -> {}", (Object)member, (Object)assignment);
            groupAssignment.put(member, ConnectProtocol.serializeAssignment(assignment));
        }
        log.debug("Finished assignment");
        return groupAssignment;
    }

    /** @return {@code list}, or an immutable empty list when {@code list} is {@code null} */
    private static <T> List<T> orEmpty(List<T> list) {
        return list == null ? Collections.<T>emptyList() : list;
    }

    /**
     * Turns an owner-to-items map into an item-to-owner map. If an item is listed under several
     * owners, the owner visited last wins.
     */
    private static <K, V> Map<V, K> invertAssignment(Map<K, List<V>> assignment) {
        Map<V, K> inverted = new HashMap<V, K>();
        for (Map.Entry<K, List<V>> assignmentEntry : assignment.entrySet()) {
            K owner = assignmentEntry.getKey();
            for (V item : assignmentEntry.getValue()) {
                inverted.put(item, owner);
            }
        }
        return inverted;
    }

    /**
     * Always returns {@code false} and ignores the timer.
     * NOTE(review): looks like a stub kept only to satisfy the coordinator interface - confirm
     * that callers do not rely on a {@code true} result.
     *
     * @param timer unused
     * @return always {@code false}
     */
    public boolean ensureCoordinatorReady(Timer timer) {
        return false;
    }

    /** Lookup tables describing which member owns which connector / task after an assignment. */
    private static class LeaderState {
        private final Map<String, ConnectProtocol.WorkerState> allMembers;
        private final Map<String, String> connectorOwners;
        private final Map<ConnectorTaskId, String> taskOwners;

        /**
         * @param allMembers          worker state per member id
         * @param connectorAssignment connectors per member id
         * @param taskAssignment      tasks per member id
         */
        public LeaderState(Map<String, ConnectProtocol.WorkerState> allMembers, Map<String, List<String>> connectorAssignment, Map<String, List<ConnectorTaskId>> taskAssignment) {
            this.allMembers = allMembers;
            this.connectorOwners = MarlinWorkerCoordinator.invertAssignment(connectorAssignment);
            this.taskOwners = MarlinWorkerCoordinator.invertAssignment(taskAssignment);
        }

        /** @return URL of the member owning the task, or {@code null} if the task has no owner */
        private String ownerUrl(ConnectorTaskId id) {
            return this.urlOf(this.taskOwners.get(id));
        }

        /** @return URL of the member owning the connector, or {@code null} if it has no owner */
        private String ownerUrl(String connector) {
            return this.urlOf(this.connectorOwners.get(connector));
        }

        /** @return URL of the given member, or {@code null} when {@code ownerId} is {@code null} */
        private String urlOf(String ownerId) {
            if (ownerId == null) {
                return null;
            }
            return this.allMembers.get(ownerId).url();
        }
    }

    /** Join callback that forwards join results to the enclosing coordinator. */
    public class MarlinWorkerJoinCallback
    extends MarlinCoordinator.MarlinCoordinatorJoinCallback {
        @Override
        public void onJoin(Marlinserver.JoinGroupInfo jgi) {
            MarlinWorkerCoordinator.this.performOnJoin(jgi);
        }
    }
}

