/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.kafka.shaded.com.mapr.kafka.eventstreams.impl.admin;

import com.google.protobuf.MessageLite;
import com.mapr.baseutils.cldbutils.CLDBRpcCommonUtils;
import com.mapr.db.exceptions.TableNotFoundException;
import com.mapr.fs.ShimLoader;
import com.mapr.fs.cldb.proto.CLDBProto;
import com.mapr.fs.proto.Common;
import com.mapr.fs.proto.License;
import com.mapr.fs.proto.Marlinserver;
import com.mapr.fs.proto.Security;
import com.mapr.security.UnixUserGroupHelper;
import java.io.IOException;
import java.nio.file.FileAlreadyExistsException;
import java.nio.file.NoSuchFileException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import org.apache.flink.kafka.shaded.com.mapr.kafka.eventstreams.Admin;
import org.apache.flink.kafka.shaded.com.mapr.kafka.eventstreams.TopicDescriptor;
import org.apache.flink.kafka.shaded.com.mapr.kafka.eventstreams.impl.admin.MarlinAdminImpl;
import org.apache.flink.kafka.shaded.com.mapr.kafka.eventstreams.impl.admin.TopicFeedInfo;
import org.apache.flink.kafka.shaded.org.apache.kafka.clients.admin.AbortTransactionOptions;
import org.apache.flink.kafka.shaded.org.apache.kafka.clients.admin.AbortTransactionResult;
import org.apache.flink.kafka.shaded.org.apache.kafka.clients.admin.AbortTransactionSpec;
import org.apache.flink.kafka.shaded.org.apache.kafka.clients.admin.AdminClient;
import org.apache.flink.kafka.shaded.org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.flink.kafka.shaded.org.apache.kafka.clients.admin.AlterClientQuotasOptions;
import org.apache.flink.kafka.shaded.org.apache.kafka.clients.admin.AlterClientQuotasResult;
import org.apache.flink.kafka.shaded.org.apache.kafka.clients.admin.AlterConfigOp;
import org.apache.flink.kafka.shaded.org.apache.kafka.clients.admin.AlterConfigsOptions;
import org.apache.flink.kafka.shaded.org.apache.kafka.clients.admin.AlterConfigsResult;
import org.apache.flink.kafka.shaded.org.apache.kafka.clients.admin.AlterConsumerGroupOffsetsOptions;
import org.apache.flink.kafka.shaded.org.apache.kafka.clients.admin.AlterConsumerGroupOffsetsResult;
import org.apache.flink.kafka.shaded.org.apache.kafka.clients.admin.AlterPartitionReassignmentsOptions;
import org.apache.flink.kafka.shaded.org.apache.kafka.clients.admin.AlterPartitionReassignmentsResult;
import org.apache.flink.kafka.shaded.org.apache.kafka.clients.admin.AlterReplicaLogDirsOptions;
import org.apache.flink.kafka.shaded.org.apache.kafka.clients.admin.AlterReplicaLogDirsResult;
import org.apache.flink.kafka.shaded.org.apache.kafka.clients.admin.AlterUserScramCredentialsOptions;
import org.apache.flink.kafka.shaded.org.apache.kafka.clients.admin.AlterUserScramCredentialsResult;
import org.apache.flink.kafka.shaded.org.apache.kafka.clients.admin.Config;
import org.apache.flink.kafka.shaded.org.apache.kafka.clients.admin.CreateAclsOptions;
import org.apache.flink.kafka.shaded.org.apache.kafka.clients.admin.CreateAclsResult;
import org.apache.flink.kafka.shaded.org.apache.kafka.clients.admin.CreateDelegationTokenOptions;
import org.apache.flink.kafka.shaded.org.apache.kafka.clients.admin.CreateDelegationTokenResult;
import org.apache.flink.kafka.shaded.org.apache.kafka.clients.admin.CreatePartitionsOptions;
import org.apache.flink.kafka.shaded.org.apache.kafka.clients.admin.CreatePartitionsResult;
import org.apache.flink.kafka.shaded.org.apache.kafka.clients.admin.CreateTopicsOptions;
import org.apache.flink.kafka.shaded.org.apache.kafka.clients.admin.CreateTopicsResult;
import org.apache.flink.kafka.shaded.org.apache.kafka.clients.admin.DeleteAclsOptions;
import org.apache.flink.kafka.shaded.org.apache.kafka.clients.admin.DeleteAclsResult;
import org.apache.flink.kafka.shaded.org.apache.kafka.clients.admin.DeleteConsumerGroupOffsetsOptions;
import org.apache.flink.kafka.shaded.org.apache.kafka.clients.admin.DeleteConsumerGroupOffsetsResult;
import org.apache.flink.kafka.shaded.org.apache.kafka.clients.admin.DeleteConsumerGroupsOptions;
import org.apache.flink.kafka.shaded.org.apache.kafka.clients.admin.DeleteConsumerGroupsResult;
import org.apache.flink.kafka.shaded.org.apache.kafka.clients.admin.DeleteRecordsOptions;
import org.apache.flink.kafka.shaded.org.apache.kafka.clients.admin.DeleteRecordsResult;
import org.apache.flink.kafka.shaded.org.apache.kafka.clients.admin.DeleteTopicsOptions;
import org.apache.flink.kafka.shaded.org.apache.kafka.clients.admin.DeleteTopicsResult;
import org.apache.flink.kafka.shaded.org.apache.kafka.clients.admin.DescribeAclsOptions;
import org.apache.flink.kafka.shaded.org.apache.kafka.clients.admin.DescribeAclsResult;
import org.apache.flink.kafka.shaded.org.apache.kafka.clients.admin.DescribeClientQuotasOptions;
import org.apache.flink.kafka.shaded.org.apache.kafka.clients.admin.DescribeClientQuotasResult;
import org.apache.flink.kafka.shaded.org.apache.kafka.clients.admin.DescribeClusterOptions;
import org.apache.flink.kafka.shaded.org.apache.kafka.clients.admin.DescribeClusterResult;
import org.apache.flink.kafka.shaded.org.apache.kafka.clients.admin.DescribeConfigsOptions;
import org.apache.flink.kafka.shaded.org.apache.kafka.clients.admin.DescribeConfigsResult;
import org.apache.flink.kafka.shaded.org.apache.kafka.clients.admin.DescribeConsumerGroupsOptions;
import org.apache.flink.kafka.shaded.org.apache.kafka.clients.admin.DescribeConsumerGroupsResult;
import org.apache.flink.kafka.shaded.org.apache.kafka.clients.admin.DescribeDelegationTokenOptions;
import org.apache.flink.kafka.shaded.org.apache.kafka.clients.admin.DescribeDelegationTokenResult;
import org.apache.flink.kafka.shaded.org.apache.kafka.clients.admin.DescribeFeaturesOptions;
import org.apache.flink.kafka.shaded.org.apache.kafka.clients.admin.DescribeFeaturesResult;
import org.apache.flink.kafka.shaded.org.apache.kafka.clients.admin.DescribeLogDirsOptions;
import org.apache.flink.kafka.shaded.org.apache.kafka.clients.admin.DescribeLogDirsResult;
import org.apache.flink.kafka.shaded.org.apache.kafka.clients.admin.DescribeMetadataQuorumOptions;
import org.apache.flink.kafka.shaded.org.apache.kafka.clients.admin.DescribeMetadataQuorumResult;
import org.apache.flink.kafka.shaded.org.apache.kafka.clients.admin.DescribeProducersOptions;
import org.apache.flink.kafka.shaded.org.apache.kafka.clients.admin.DescribeProducersResult;
import org.apache.flink.kafka.shaded.org.apache.kafka.clients.admin.DescribeReplicaLogDirsOptions;
import org.apache.flink.kafka.shaded.org.apache.kafka.clients.admin.DescribeReplicaLogDirsResult;
import org.apache.flink.kafka.shaded.org.apache.kafka.clients.admin.DescribeTopicsOptions;
import org.apache.flink.kafka.shaded.org.apache.kafka.clients.admin.DescribeTopicsResult;
import org.apache.flink.kafka.shaded.org.apache.kafka.clients.admin.DescribeTransactionsOptions;
import org.apache.flink.kafka.shaded.org.apache.kafka.clients.admin.DescribeTransactionsResult;
import org.apache.flink.kafka.shaded.org.apache.kafka.clients.admin.DescribeUserScramCredentialsOptions;
import org.apache.flink.kafka.shaded.org.apache.kafka.clients.admin.DescribeUserScramCredentialsResult;
import org.apache.flink.kafka.shaded.org.apache.kafka.clients.admin.ElectLeadersOptions;
import org.apache.flink.kafka.shaded.org.apache.kafka.clients.admin.ElectLeadersResult;
import org.apache.flink.kafka.shaded.org.apache.kafka.clients.admin.ExpireDelegationTokenOptions;
import org.apache.flink.kafka.shaded.org.apache.kafka.clients.admin.ExpireDelegationTokenResult;
import org.apache.flink.kafka.shaded.org.apache.kafka.clients.admin.FeatureUpdate;
import org.apache.flink.kafka.shaded.org.apache.kafka.clients.admin.FenceProducersOptions;
import org.apache.flink.kafka.shaded.org.apache.kafka.clients.admin.FenceProducersResult;
import org.apache.flink.kafka.shaded.org.apache.kafka.clients.admin.ListConsumerGroupOffsetsOptions;
import org.apache.flink.kafka.shaded.org.apache.kafka.clients.admin.ListConsumerGroupOffsetsResult;
import org.apache.flink.kafka.shaded.org.apache.kafka.clients.admin.ListConsumerGroupOffsetsSpec;
import org.apache.flink.kafka.shaded.org.apache.kafka.clients.admin.ListConsumerGroupsOptions;
import org.apache.flink.kafka.shaded.org.apache.kafka.clients.admin.ListConsumerGroupsResult;
import org.apache.flink.kafka.shaded.org.apache.kafka.clients.admin.ListOffsetsOptions;
import org.apache.flink.kafka.shaded.org.apache.kafka.clients.admin.ListOffsetsResult;
import org.apache.flink.kafka.shaded.org.apache.kafka.clients.admin.ListPartitionReassignmentsOptions;
import org.apache.flink.kafka.shaded.org.apache.kafka.clients.admin.ListPartitionReassignmentsResult;
import org.apache.flink.kafka.shaded.org.apache.kafka.clients.admin.ListTopicsOptions;
import org.apache.flink.kafka.shaded.org.apache.kafka.clients.admin.ListTopicsResult;
import org.apache.flink.kafka.shaded.org.apache.kafka.clients.admin.ListTransactionsOptions;
import org.apache.flink.kafka.shaded.org.apache.kafka.clients.admin.ListTransactionsResult;
import org.apache.flink.kafka.shaded.org.apache.kafka.clients.admin.NewPartitionReassignment;
import org.apache.flink.kafka.shaded.org.apache.kafka.clients.admin.NewPartitions;
import org.apache.flink.kafka.shaded.org.apache.kafka.clients.admin.NewTopic;
import org.apache.flink.kafka.shaded.org.apache.kafka.clients.admin.OffsetSpec;
import org.apache.flink.kafka.shaded.org.apache.kafka.clients.admin.RecordsToDelete;
import org.apache.flink.kafka.shaded.org.apache.kafka.clients.admin.RemoveMembersFromConsumerGroupOptions;
import org.apache.flink.kafka.shaded.org.apache.kafka.clients.admin.RemoveMembersFromConsumerGroupResult;
import org.apache.flink.kafka.shaded.org.apache.kafka.clients.admin.RenewDelegationTokenOptions;
import org.apache.flink.kafka.shaded.org.apache.kafka.clients.admin.RenewDelegationTokenResult;
import org.apache.flink.kafka.shaded.org.apache.kafka.clients.admin.TopicDescription;
import org.apache.flink.kafka.shaded.org.apache.kafka.clients.admin.TopicListing;
import org.apache.flink.kafka.shaded.org.apache.kafka.clients.admin.UnregisterBrokerOptions;
import org.apache.flink.kafka.shaded.org.apache.kafka.clients.admin.UnregisterBrokerResult;
import org.apache.flink.kafka.shaded.org.apache.kafka.clients.admin.UpdateFeaturesOptions;
import org.apache.flink.kafka.shaded.org.apache.kafka.clients.admin.UpdateFeaturesResult;
import org.apache.flink.kafka.shaded.org.apache.kafka.clients.admin.UserScramCredentialAlteration;
import org.apache.flink.kafka.shaded.org.apache.kafka.clients.admin.internals.CoordinatorKey;
import org.apache.flink.kafka.shaded.org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.flink.kafka.shaded.org.apache.kafka.common.ElectionType;
import org.apache.flink.kafka.shaded.org.apache.kafka.common.KafkaException;
import org.apache.flink.kafka.shaded.org.apache.kafka.common.KafkaFuture;
import org.apache.flink.kafka.shaded.org.apache.kafka.common.Metric;
import org.apache.flink.kafka.shaded.org.apache.kafka.common.MetricName;
import org.apache.flink.kafka.shaded.org.apache.kafka.common.Node;
import org.apache.flink.kafka.shaded.org.apache.kafka.common.TopicCollection;
import org.apache.flink.kafka.shaded.org.apache.kafka.common.TopicPartition;
import org.apache.flink.kafka.shaded.org.apache.kafka.common.TopicPartitionInfo;
import org.apache.flink.kafka.shaded.org.apache.kafka.common.TopicPartitionReplica;
import org.apache.flink.kafka.shaded.org.apache.kafka.common.Uuid;
import org.apache.flink.kafka.shaded.org.apache.kafka.common.acl.AclBinding;
import org.apache.flink.kafka.shaded.org.apache.kafka.common.acl.AclBindingFilter;
import org.apache.flink.kafka.shaded.org.apache.kafka.common.config.ConfigResource;
import org.apache.flink.kafka.shaded.org.apache.kafka.common.errors.InvalidTopicException;
import org.apache.flink.kafka.shaded.org.apache.kafka.common.errors.TopicExistsException;
import org.apache.flink.kafka.shaded.org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
import org.apache.flink.kafka.shaded.org.apache.kafka.common.internals.KafkaFutureImpl;
import org.apache.flink.kafka.shaded.org.apache.kafka.common.protocol.Errors;
import org.apache.flink.kafka.shaded.org.apache.kafka.common.quota.ClientQuotaAlteration;
import org.apache.flink.kafka.shaded.org.apache.kafka.common.quota.ClientQuotaFilter;
import org.apache.hadoop.conf.Configuration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class MarlinAdminClientImpl
extends AdminClient {
    private static final KafkaException emptyTopicNameException = new InvalidTopicException("Topic name \"\" is illegal");
    private static final KafkaException noStreamNameSpecifiedException = new KafkaException("No stream name specified in the topic path or in the default stream configuration options");
    private static final Logger LOG;
    private Admin admin;
    private String defaultStreamName;

    static AdminClient createInternal(AdminClientConfig conf) {
        return new MarlinAdminClientImpl(conf);
    }

    MarlinAdminClientImpl(AdminClientConfig conf) {
        try {
            this.admin = new MarlinAdminImpl(new Configuration());
            this.defaultStreamName = conf.getString("streams.admin.default.stream");
            if (this.defaultStreamName.equals("")) {
                this.defaultStreamName = null;
            }
        }
        catch (Exception e) {
            throw new KafkaException("Failed to create MarlinAdminClient", e);
        }
    }

    MarlinAdminClientImpl(Admin admin, String defaultStreamName) {
        this.admin = admin;
        this.defaultStreamName = defaultStreamName;
    }

    public Admin getMapRAdmin() {
        return this.admin;
    }

    @Override
    public void close(Duration timeout) {
        this.admin.close(timeout);
    }

    @Override
    public CreateTopicsResult createTopics(Collection<NewTopic> newTopics, CreateTopicsOptions options) {
        HashMap<String, KafkaFuture<CreateTopicsResult.TopicMetadataAndConfig>> createTopicResult = new HashMap<String, KafkaFuture<CreateTopicsResult.TopicMetadataAndConfig>>();
        String fullTopicPath = null;
        String streamPath = null;
        String topicName = null;
        for (NewTopic n : newTopics) {
            fullTopicPath = n.name();
            String[] tokens = fullTopicPath.split(":");
            KafkaFutureImpl future = new KafkaFutureImpl();
            if (tokens.length == 2) {
                streamPath = tokens[0];
                topicName = tokens[1];
            } else {
                if (tokens[0].startsWith("/")) {
                    future.completeExceptionally(emptyTopicNameException);
                    createTopicResult.put(fullTopicPath, future);
                    continue;
                }
                if (this.defaultStreamName == null) {
                    future.completeExceptionally(noStreamNameSpecifiedException);
                    createTopicResult.put(fullTopicPath, future);
                    continue;
                }
                streamPath = this.defaultStreamName;
                topicName = fullTopicPath;
            }
            try {
                this.admin.createTopic(streamPath, topicName, n.numPartitions());
                future.complete(null);
            }
            catch (TableNotFoundException e) {
                future.completeExceptionally(new UnknownTopicOrPartitionException("Stream " + streamPath + " does not exist."));
            }
            catch (FileAlreadyExistsException e) {
                future.completeExceptionally(new TopicExistsException(e.getMessage()));
            }
            catch (IOException e) {
                future.completeExceptionally(e);
            }
            createTopicResult.put(fullTopicPath, future);
        }
        return new CreateTopicsResult(createTopicResult);
    }

    @Override
    public DeleteTopicsResult deleteTopics(TopicCollection topics, DeleteTopicsOptions options) {
        if (topics instanceof TopicCollection.TopicIdCollection) {
            return DeleteTopicsResult.ofTopicIds(this.handleDeleteTopicsUsingIds(((TopicCollection.TopicIdCollection)topics).topicIds(), options));
        }
        if (topics instanceof TopicCollection.TopicNameCollection) {
            return DeleteTopicsResult.ofTopicNames(this.handleDeleteTopicsUsingNames(((TopicCollection.TopicNameCollection)topics).topicNames(), options));
        }
        throw new IllegalArgumentException("The TopicCollection: " + topics + " provided did not match any supported classes for deleteTopics.");
    }

    private Map<String, KafkaFuture<Void>> handleDeleteTopicsUsingNames(Collection<String> topicNames, DeleteTopicsOptions options) {
        HashMap<String, KafkaFuture<Void>> deleteTopicsResult = new HashMap<String, KafkaFuture<Void>>();
        String streamPath = null;
        String topicName = null;
        for (String fullTopicPath : topicNames) {
            String[] tokens = fullTopicPath.split(":");
            KafkaFutureImpl future = new KafkaFutureImpl();
            if (tokens.length == 2) {
                streamPath = tokens[0];
                topicName = tokens[1];
            } else {
                if (tokens[0].startsWith("/")) {
                    future.completeExceptionally(emptyTopicNameException);
                    deleteTopicsResult.put(fullTopicPath, future);
                    continue;
                }
                if (this.defaultStreamName == null) {
                    future.completeExceptionally(noStreamNameSpecifiedException);
                    deleteTopicsResult.put(fullTopicPath, future);
                    continue;
                }
                streamPath = this.defaultStreamName;
                topicName = fullTopicPath;
            }
            try {
                this.admin.deleteTopic(streamPath, topicName);
                future.complete(null);
            }
            catch (TableNotFoundException e) {
                future.completeExceptionally(new UnknownTopicOrPartitionException("Stream " + streamPath + " does not exist."));
            }
            catch (NoSuchFileException e) {
                future.completeExceptionally(new UnknownTopicOrPartitionException(e.getMessage()));
            }
            catch (IOException e) {
                future.completeExceptionally(e);
            }
            deleteTopicsResult.put(fullTopicPath, future);
        }
        return deleteTopicsResult;
    }

    private Map<Uuid, KafkaFuture<Void>> handleDeleteTopicsUsingIds(Collection<Uuid> topicIds, DeleteTopicsOptions options) {
        throw new KafkaException("Topic IDs are not currently supported in MapR Admin. Please use topic names instead");
    }

    @Override
    public ListTopicsResult listTopics(ListTopicsOptions options) {
        if (this.defaultStreamName == null) {
            throw new KafkaException("No default stream name specified in the configuration options");
        }
        return this.listTopicsForStream(this.defaultStreamName, options);
    }

    @Override
    public ListTopicsResult listTopics(String streamPath, ListTopicsOptions options) {
        return this.listTopicsForStream(streamPath, options);
    }

    private ListTopicsResult listTopicsForStream(String streamPath, ListTopicsOptions options) {
        KafkaFutureImpl<Map<String, TopicListing>> topicListingFuture = new KafkaFutureImpl<Map<String, TopicListing>>();
        try {
            List<String> topicNamesList = this.admin.listTopics(streamPath);
            HashMap<String, TopicListing> topicListingMap = new HashMap<String, TopicListing>();
            for (String topicName : topicNamesList) {
                topicListingMap.put(topicName, new TopicListing(topicName, false));
            }
            topicListingFuture.complete(topicListingMap);
        }
        catch (TableNotFoundException e) {
            topicListingFuture.completeExceptionally(new UnknownTopicOrPartitionException("Stream " + streamPath + " does not exist."));
        }
        catch (Exception e) {
            topicListingFuture.completeExceptionally(e);
        }
        return new ListTopicsResult(topicListingFuture);
    }

    @Override
    public DescribeTopicsResult describeTopics(TopicCollection topics, DescribeTopicsOptions options) {
        if (topics instanceof TopicCollection.TopicIdCollection) {
            return DescribeTopicsResult.ofTopicIds(this.handleDescribeTopicsByIds(((TopicCollection.TopicIdCollection)topics).topicIds(), options));
        }
        if (topics instanceof TopicCollection.TopicNameCollection) {
            return DescribeTopicsResult.ofTopicNames(this.handleDescribeTopicsByNames(((TopicCollection.TopicNameCollection)topics).topicNames(), options));
        }
        throw new IllegalArgumentException("The TopicCollection: " + topics + " provided did not match any supported classes for describeTopics.");
    }

    private Map<String, KafkaFuture<TopicDescription>> handleDescribeTopicsByNames(Collection<String> topicNames, DescribeTopicsOptions options) {
        HashMap<String, KafkaFuture<TopicDescription>> describeTopicsResult = new HashMap<String, KafkaFuture<TopicDescription>>();
        String streamPath = null;
        String topicName = null;
        for (String fullTopicPath : topicNames) {
            String[] tokens = fullTopicPath.split(":");
            KafkaFutureImpl<TopicDescription> future = new KafkaFutureImpl<TopicDescription>();
            if (tokens.length == 2) {
                streamPath = tokens[0];
                topicName = tokens[1];
            } else {
                if (tokens[0].startsWith("/")) {
                    future.completeExceptionally(emptyTopicNameException);
                    describeTopicsResult.put(fullTopicPath, future);
                    continue;
                }
                if (this.defaultStreamName == null) {
                    future.completeExceptionally(noStreamNameSpecifiedException);
                    describeTopicsResult.put(fullTopicPath, future);
                    continue;
                }
                streamPath = this.defaultStreamName;
                topicName = fullTopicPath;
            }
            try {
                TopicDescriptor marlinTopDesc = this.admin.getTopicDescriptor(streamPath, topicName);
                ArrayList<TopicPartitionInfo> partitions = new ArrayList<TopicPartitionInfo>();
                Node leader = new Node(0, "127.0.0.1", 7200);
                ArrayList<Node> replicas = new ArrayList<Node>();
                replicas.add(leader);
                for (int p = 0; p < marlinTopDesc.getPartitions(); ++p) {
                    partitions.add(new TopicPartitionInfo(p, leader, replicas, replicas));
                }
                TopicDescription kafkaTopDesc = new TopicDescription(streamPath + ":" + topicName, false, partitions);
                future.complete(kafkaTopDesc);
            }
            catch (TableNotFoundException e) {
                future.completeExceptionally(new UnknownTopicOrPartitionException("Stream " + streamPath + " does not exist."));
            }
            catch (NoSuchFileException e) {
                future.completeExceptionally(new UnknownTopicOrPartitionException(e.getMessage()));
            }
            catch (IOException e) {
                future.completeExceptionally(e);
            }
            describeTopicsResult.put(streamPath + ":" + topicName, future);
        }
        return describeTopicsResult;
    }

    private Map<Uuid, KafkaFuture<TopicDescription>> handleDescribeTopicsByIds(Collection<Uuid> topicIds, DescribeTopicsOptions options) {
        throw new KafkaException("Topic IDs are not currently supported in MapR Admin. Please use topic names instead");
    }

    @Override
    public CreatePartitionsResult createPartitions(Map<String, NewPartitions> newPartitions, CreatePartitionsOptions options) {
        HashMap<String, KafkaFuture<Void>> createPartitionsResult = new HashMap<String, KafkaFuture<Void>>();
        String streamPath = null;
        String topicName = null;
        for (String fullTopicPath : newPartitions.keySet()) {
            String[] tokens = fullTopicPath.split(":");
            KafkaFutureImpl future = new KafkaFutureImpl();
            if (tokens.length == 2) {
                streamPath = tokens[0];
                topicName = tokens[1];
            } else {
                if (tokens[0].startsWith("/")) {
                    future.completeExceptionally(emptyTopicNameException);
                    createPartitionsResult.put(fullTopicPath, future);
                    continue;
                }
                if (this.defaultStreamName == null) {
                    future.completeExceptionally(noStreamNameSpecifiedException);
                    createPartitionsResult.put(fullTopicPath, future);
                    continue;
                }
                streamPath = this.defaultStreamName;
                topicName = fullTopicPath;
            }
            NewPartitions n = newPartitions.get(fullTopicPath);
            try {
                this.admin.editTopic(streamPath, topicName, n.totalCount());
                future.complete(null);
            }
            catch (TableNotFoundException e) {
                future.completeExceptionally(new UnknownTopicOrPartitionException("Stream " + streamPath + " does not exist."));
            }
            catch (NoSuchFileException e) {
                future.completeExceptionally(new UnknownTopicOrPartitionException(e.getMessage()));
            }
            catch (IOException e) {
                future.completeExceptionally(e);
            }
            createPartitionsResult.put(fullTopicPath, future);
        }
        return new CreatePartitionsResult(createPartitionsResult);
    }

    @Override
    public DescribeReplicaLogDirsResult describeReplicaLogDirs(Collection<TopicPartitionReplica> replicas, DescribeReplicaLogDirsOptions options) {
        throw new KafkaException("describeReplicaLogDirs API not implemented");
    }

    @Override
    public DescribeLogDirsResult describeLogDirs(Collection<Integer> brokers, DescribeLogDirsOptions options) {
        throw new KafkaException("describeLogDirs API not implemented");
    }

    @Override
    public AlterConfigsResult alterConfigs(Map<ConfigResource, Config> configs, AlterConfigsOptions options) {
        throw new KafkaException("alterConfigs API not implemented");
    }

    @Override
    public AlterConfigsResult incrementalAlterConfigs(Map<ConfigResource, Collection<AlterConfigOp>> configs, AlterConfigsOptions options) {
        throw new KafkaException("incrementalAlterConfigs API not implemented");
    }

    @Override
    public AlterReplicaLogDirsResult alterReplicaLogDirs(Map<TopicPartitionReplica, String> replicaAssignment, AlterReplicaLogDirsOptions options) {
        throw new KafkaException("alterReplicaLogDirs API not implemented");
    }

    @Override
    public DescribeConfigsResult describeConfigs(Collection<ConfigResource> resources, DescribeConfigsOptions options) {
        throw new KafkaException("describeConfigs API not implemented");
    }

    @Override
    public DeleteAclsResult deleteAcls(Collection<AclBindingFilter> filters, DeleteAclsOptions options) {
        throw new KafkaException("deleteAcls API not implemented");
    }

    @Override
    public CreateAclsResult createAcls(Collection<AclBinding> acls, CreateAclsOptions options) {
        throw new KafkaException("createAcls API not implemented");
    }

    @Override
    public DeleteRecordsResult deleteRecords(Map<TopicPartition, RecordsToDelete> recordsToDelete, DeleteRecordsOptions options) {
        throw new KafkaException("deleteRecords API not implemented");
    }

    @Override
    public DescribeAclsResult describeAcls(AclBindingFilter filter, DescribeAclsOptions options) {
        throw new KafkaException("describeAcls API not implemented");
    }

    private License.LicenseIdResponse fetchClusterID(String cluster, Security.CredentialsMsg creds) throws Exception {
        License.LicenseIdResponse resp = null;
        License.LicenseIdRequest req = License.LicenseIdRequest.newBuilder().setCreds(License.LicenseCredentialsMsg.newBuilder().setUid(creds.getUid()).addAllGids((Iterable)creds.getGidsList()).build()).build();
        byte[] data = cluster != null && !cluster.isEmpty() ? CLDBRpcCommonUtils.getInstance().sendRequest(cluster, Common.MapRProgramId.CldbProgramId.getNumber(), CLDBProto.CLDBProg.GetLicenseIdProc.getNumber(), (MessageLite)req, License.LicenseIdResponse.class) : CLDBRpcCommonUtils.getInstance().sendRequest(Common.MapRProgramId.CldbProgramId.getNumber(), CLDBProto.CLDBProg.GetLicenseIdProc.getNumber(), (MessageLite)req, License.LicenseIdResponse.class);
        if (data == null) {
            return null;
        }
        resp = License.LicenseIdResponse.parseFrom((byte[])data);
        return resp;
    }

    private Security.CredentialsMsg getUserCredentials() {
        UnixUserGroupHelper ui = new UnixUserGroupHelper();
        String user = ui.getLoggedinUsername();
        int uid = ui.getUserId(user);
        int[] gids = ui.getGroups(user);
        Security.CredentialsMsg.Builder msg = Security.CredentialsMsg.newBuilder().setUid(uid);
        for (int gid : gids) {
            msg.addGids(gid);
        }
        return msg.build();
    }

    @Override
    public DescribeClusterResult describeCluster(DescribeClusterOptions options) {
        KafkaFutureImpl<Collection<Node>> describeClusterFuture = new KafkaFutureImpl<Collection<Node>>();
        KafkaFutureImpl<Node> controllerFuture = new KafkaFutureImpl<Node>();
        KafkaFutureImpl<String> clusterIdFuture = new KafkaFutureImpl<String>();
        String curCluster = CLDBRpcCommonUtils.getInstance().getCurrentClusterName();
        try {
            License.LicenseIdResponse licId = this.fetchClusterID(curCluster, this.getUserCredentials());
            clusterIdFuture.complete(licId.getClusterid());
        }
        catch (Exception e) {
            clusterIdFuture.completeExceptionally(e);
        }
        Node controller = new Node(0, "127.0.0.1", 7200);
        ArrayList<Node> brokers = new ArrayList<Node>();
        brokers.add(controller);
        describeClusterFuture.complete(brokers);
        controllerFuture.complete(controller);
        return new DescribeClusterResult(describeClusterFuture, controllerFuture, clusterIdFuture, null);
    }

    @Override
    public DescribeConsumerGroupsResult describeConsumerGroups(Collection<String> groupIds, DescribeConsumerGroupsOptions options) {
        throw new KafkaException("describeConsumerGroups API not implemented");
    }

    @Override
    public ListConsumerGroupsResult listConsumerGroups(ListConsumerGroupsOptions options) {
        if (this.defaultStreamName == null) {
            throw new KafkaException("No default stream name specified in the configuration options");
        }
        KafkaFutureImpl<Collection<Object>> all = new KafkaFutureImpl<Collection<Object>>();
        try {
            all.complete(this.admin.listConsumerGroups(this.defaultStreamName));
        }
        catch (IOException e) {
            all.completeExceptionally(e);
        }
        return new ListConsumerGroupsResult(all);
    }

    @Override
    public ListConsumerGroupOffsetsResult listConsumerGroupOffsets(Map<String, ListConsumerGroupOffsetsSpec> groupSpecs, ListConsumerGroupOffsetsOptions options) {
        if (this.defaultStreamName == null) {
            throw new KafkaException("No default stream name specified in the configuration options");
        }
        return this.listConsumerGroupOffsets(this.defaultStreamName, groupSpecs);
    }

    @Override
    public boolean isMapr() {
        return true;
    }

    @Override
    public ListConsumerGroupOffsetsResult listConsumerGroupOffsets(String stream, Map<String, ListConsumerGroupOffsetsSpec> groupSpecs, ListConsumerGroupOffsetsOptions options) {
        return this.listConsumerGroupOffsets(stream, groupSpecs);
    }

    public ListConsumerGroupOffsetsResult listConsumerGroupOffsets(String stream, String groupId) {
        return this.listConsumerGroupOffsets(stream, Collections.singletonMap(groupId, new ListConsumerGroupOffsetsSpec()));
    }

    @Override
    public ListConsumerGroupOffsetsResult listConsumerGroupOffsets(String stream, Map<String, ListConsumerGroupOffsetsSpec> groupSpecs) {
        if (groupSpecs.size() != 1) {
            throw new KafkaException("listConsumerGroupOffsets with more than 1 group is not currently supported in MapR Admin");
        }
        String groupId = (String)groupSpecs.keySet().stream().findAny().get();
        KafkaFutureImpl<Map<TopicPartition, OffsetAndMetadata>> groupOffsetListingFuture = new KafkaFutureImpl<Map<TopicPartition, OffsetAndMetadata>>();
        try {
            groupOffsetListingFuture.complete(this.admin.listConsumerGroupOffsets(stream, groupId));
        }
        catch (IOException e) {
            groupOffsetListingFuture.completeExceptionally(e);
        }
        return new ListConsumerGroupOffsetsResult(Collections.singletonMap(CoordinatorKey.byGroupId(groupId), groupOffsetListingFuture));
    }

    @Override
    public DeleteConsumerGroupsResult deleteConsumerGroups(Collection<String> groupIds, DeleteConsumerGroupsOptions options) {
        throw new KafkaException("deleteConsumerGroups API not implemented");
    }

    @Override
    public DeleteConsumerGroupOffsetsResult deleteConsumerGroupOffsets(String groupId, Set<TopicPartition> partitions, DeleteConsumerGroupOffsetsOptions options) {
        throw new KafkaException("deleteConsumerGroupOffsets API not implemented");
    }

    @Override
    public ElectLeadersResult electLeaders(ElectionType electionType, Set<TopicPartition> partitions, ElectLeadersOptions options) {
        throw new KafkaException("electLeaders API not implemented");
    }

    @Override
    public AlterPartitionReassignmentsResult alterPartitionReassignments(Map<TopicPartition, Optional<NewPartitionReassignment>> reassignments, AlterPartitionReassignmentsOptions options) {
        throw new KafkaException("alterPartitionReassignments API not implemented");
    }

    @Override
    public ListPartitionReassignmentsResult listPartitionReassignments(Optional<Set<TopicPartition>> partitions, ListPartitionReassignmentsOptions options) {
        throw new KafkaException("listPartitionReassignments API not implemented");
    }

    @Override
    public RemoveMembersFromConsumerGroupResult removeMembersFromConsumerGroup(String groupId, RemoveMembersFromConsumerGroupOptions options) {
        throw new KafkaException("removeMembersFromConsumerGroup API not implemented");
    }

    @Override
    public AlterConsumerGroupOffsetsResult alterConsumerGroupOffsets(String groupId, Map<TopicPartition, OffsetAndMetadata> offsets, AlterConsumerGroupOffsetsOptions options) {
        KafkaFutureImpl<Map<TopicPartition, Errors>> groupOffsetAlterFutures = new KafkaFutureImpl<Map<TopicPartition, Errors>>();
        try {
            groupOffsetAlterFutures.complete(this.admin.alterConsumerGroupOffsets(this.defaultStreamName, groupId, offsets));
        }
        catch (IOException e) {
            groupOffsetAlterFutures.completeExceptionally(e);
        }
        return new AlterConsumerGroupOffsetsResult(groupOffsetAlterFutures);
    }

    @Override
    public ListOffsetsResult listOffsets(Map<TopicPartition, OffsetSpec> topicPartitionOffsets, ListOffsetsOptions options) {
        HashMap futures = new HashMap(topicPartitionOffsets.size());
        HashMap<String, Map<String, List<TopicFeedInfo>>> streamsInfo = new HashMap<String, Map<String, List<TopicFeedInfo>>>(topicPartitionOffsets.size());
        for (TopicPartition topicPartition : topicPartitionOffsets.keySet()) {
            String fullTopicPath = topicPartition.topic();
            String streamPath = null;
            String topicName = null;
            String[] tokens = fullTopicPath.split(":");
            KafkaFutureImpl<ListOffsetsResult.ListOffsetsResultInfo> future = new KafkaFutureImpl<ListOffsetsResult.ListOffsetsResultInfo>();
            if (tokens.length == 2) {
                streamPath = tokens[0];
                topicName = tokens[1];
            } else {
                if (tokens[0].startsWith("/")) {
                    future.completeExceptionally(emptyTopicNameException);
                    futures.put(topicPartition, future);
                    continue;
                }
                if (this.defaultStreamName == null) {
                    future.completeExceptionally(noStreamNameSpecifiedException);
                    futures.put(topicPartition, future);
                    continue;
                }
                streamPath = this.defaultStreamName;
                topicName = fullTopicPath;
            }
            try {
                Map<String, List<TopicFeedInfo>> topicsForStream;
                if (streamsInfo.containsKey(streamPath)) {
                    topicsForStream = (Map<String, List<TopicFeedInfo>>)streamsInfo.get(streamPath);
                } else {
                    topicsForStream = this.admin.listTopicsForStream(streamPath);
                    streamsInfo.put(streamPath, topicsForStream);
                }
                if (topicsForStream.containsKey(topicName)) {
                    boolean checkPartitionExist = false;
                    for (TopicFeedInfo topicFeedInfo : topicsForStream.get(topicName)) {
                        Marlinserver.TopicFeedStatInfo tinfo = topicFeedInfo.stat();
                        if (topicPartition.partition() != tinfo.getFeedId()) continue;
                        future.complete(new ListOffsetsResult.ListOffsetsResultInfo(tinfo.getMaxSeq(), tinfo.getTimeRange().getMaxTS(), Optional.empty()));
                        checkPartitionExist = true;
                        break;
                    }
                    if (!checkPartitionExist) {
                        future.completeExceptionally(new UnknownTopicOrPartitionException(String.format("Partition %d cannot be found in the topic %s", topicPartition.partition(), topicPartition.topic())));
                    }
                } else {
                    future.completeExceptionally(new UnknownTopicOrPartitionException(String.format("Topic %s cannot be found in the stream %s", topicName, streamPath)));
                }
            }
            catch (TableNotFoundException e) {
                future.completeExceptionally(new UnknownTopicOrPartitionException("Stream " + streamPath + " does not exist."));
            }
            catch (IOException e) {
                future.completeExceptionally(e);
            }
            futures.put(topicPartition, future);
        }
        return new ListOffsetsResult(new HashMap<TopicPartition, KafkaFuture<ListOffsetsResult.ListOffsetsResultInfo>>(futures));
    }

    @Override
    public DescribeClientQuotasResult describeClientQuotas(ClientQuotaFilter filter, DescribeClientQuotasOptions options) {
        throw new KafkaException("describeClientQuotas API not implemented");
    }

    @Override
    public AlterClientQuotasResult alterClientQuotas(Collection<ClientQuotaAlteration> entries, AlterClientQuotasOptions options) {
        throw new KafkaException("alterClientQuotas API not implemented");
    }

    @Override
    public DescribeUserScramCredentialsResult describeUserScramCredentials(List<String> users, DescribeUserScramCredentialsOptions options) {
        throw new KafkaException("describeUserScramCredentials method is not currently supported in MapR Admin");
    }

    @Override
    public AlterUserScramCredentialsResult alterUserScramCredentials(List<UserScramCredentialAlteration> alterations, AlterUserScramCredentialsOptions options) {
        throw new KafkaException("alterUserScramCredentials method is not currently supported in MapR Admin");
    }

    @Override
    public DescribeFeaturesResult describeFeatures(DescribeFeaturesOptions options) {
        throw new KafkaException("describeFeatures method is not currently supported in MapR Admin");
    }

    @Override
    public UpdateFeaturesResult updateFeatures(Map<String, FeatureUpdate> featureUpdates, UpdateFeaturesOptions options) {
        throw new KafkaException("updateFeatures method is not currently supported in MapR Admin");
    }

    @Override
    public DescribeMetadataQuorumResult describeMetadataQuorum(DescribeMetadataQuorumOptions options) {
        throw new KafkaException("describeMetadataQuorum method is not currently supported in MapR Admin");
    }

    @Override
    public UnregisterBrokerResult unregisterBroker(int brokerId, UnregisterBrokerOptions options) {
        throw new KafkaException("unregisterBroker method is not currently supported in MapR Admin");
    }

    @Override
    public DescribeProducersResult describeProducers(Collection<TopicPartition> partitions, DescribeProducersOptions options) {
        throw new KafkaException("describeProducers method is not currently supported in MapR Admin");
    }

    @Override
    public DescribeTransactionsResult describeTransactions(Collection<String> transactionalIds, DescribeTransactionsOptions options) {
        throw new KafkaException("describeTransactions method is not currently supported in MapR Admin");
    }

    @Override
    public AbortTransactionResult abortTransaction(AbortTransactionSpec spec, AbortTransactionOptions options) {
        throw new KafkaException("abortTransaction method is not currently supported in MapR Admin");
    }

    @Override
    public ListTransactionsResult listTransactions(ListTransactionsOptions options) {
        throw new KafkaException("listTransactions method is not currently supported in MapR Admin");
    }

    @Override
    public FenceProducersResult fenceProducers(Collection<String> transactionalIds, FenceProducersOptions options) {
        throw new KafkaException("fenceProducers method is not currently supported in MapR Admin");
    }

    @Override
    public Map<MetricName, ? extends Metric> metrics() {
        throw new KafkaException("metrics API not implemented");
    }

    @Override
    public CreateDelegationTokenResult createDelegationToken(CreateDelegationTokenOptions options) {
        throw new KafkaException("createDelegationToken API not implemented");
    }

    @Override
    public RenewDelegationTokenResult renewDelegationToken(byte[] hmac, RenewDelegationTokenOptions options) {
        throw new KafkaException("renewDelegationToken API not implemented");
    }

    @Override
    public ExpireDelegationTokenResult expireDelegationToken(byte[] hmac, ExpireDelegationTokenOptions options) {
        throw new KafkaException("expireDelegationToken API not implemented");
    }

    @Override
    public DescribeDelegationTokenResult describeDelegationToken(DescribeDelegationTokenOptions options) {
        throw new KafkaException("describeDelegationToken API not implemented");
    }

    static {
        ShimLoader.load();
        LOG = LoggerFactory.getLogger(MarlinAdminClientImpl.class);
    }
}

