/*
 * Decompiled with CFR 0.152.
 */
package com.mapr.streams.impl.producer;

import com.mapr.fs.MapRFileSystem;
import com.mapr.fs.ShimLoader;
import com.mapr.fs.jni.MapRUserInfo;
import com.mapr.fs.jni.MarlinJniProducer;
import com.mapr.fs.jni.MarlinProducerResult;
import com.mapr.fs.proto.Marlinserver;
import com.mapr.streams.impl.MarlinClient;
import com.mapr.streams.impl.MarlinTopicInfo;
import com.mapr.streams.impl.producer.MarlinFuture;
import com.mapr.streams.impl.producer.MarlinProducerResultImpl;
import com.mapr.streams.impl.producer.ProducerRecordJob;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.clients.producer.StreamsPartitioner;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.errors.ApiException;
import org.apache.kafka.common.errors.InterruptException;
import org.apache.kafka.common.errors.NetworkException;
import org.apache.kafka.common.errors.TopicAuthorizationException;
import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class MarlinProducerImpl
extends MarlinJniProducer {
    protected static final Logger LOG;
    private static final int MAX_WORK_QUEUE_SIZE = 4;
    protected static final int SEND_BATCH_SIZE = 100;
    private static final AtomicLong curProducerId;
    private int MaxWorkQueues;
    private List<WorkQueue> workQueues;
    private List<Thread> workers;
    private Boolean multiFlushers;
    private Boolean shuttingdown;
    private final AtomicLong counter = new AtomicLong(0L);
    private final AtomicLong outstandingMsgs = new AtomicLong(0L);
    private Map<String, Integer> topicNameToNumPartitions;
    private StreamsPartitioner partitioner;
    protected long producerId;
    private String defaultClusterPath;
    private String defaultStreamName;
    protected final Marlinserver.MarlinInternalDefaults marlinInternalDefaults = Marlinserver.MarlinInternalDefaults.getDefaultInstance();

    private static void checkProducerConfig(ProducerConfig config) throws KafkaException {
        Marlinserver.MarlinConfigDefaults mConfDef = Marlinserver.MarlinConfigDefaults.getDefaultInstance();
        try {
            String string = config.getString(mConfDef.getClientID());
        }
        catch (ConfigException e) {
            LOG.error("Invalid client id configuration");
            throw e;
        }
        try {
            Boolean e = config.getBoolean(mConfDef.getParallelFlushersPerPartition());
        }
        catch (ConfigException e) {
            LOG.error("Invalid streams paralle flushers configuration");
            throw e;
        }
        try {
            long bufferMemory = config.getLong(mConfDef.getBufferMemory());
            if (bufferMemory <= 0L) {
                throw new ConfigException("Buffer memory (" + bufferMemory + ") must be a positive number");
            }
        }
        catch (ConfigException e) {
            LOG.error("Invalid buffer memory configuration");
            throw e;
        }
        try {
            long bufferTime = config.getLong(mConfDef.getBufferTime());
            if (bufferTime < 0L) {
                throw new ConfigException("buffer ms (" + bufferTime + ") cannot be negative number");
            }
        }
        catch (ConfigException e) {
            LOG.error("Invalid streams buffer time configuration");
            throw e;
        }
        try {
            long metadataAge = config.getLong(mConfDef.getMetadataMaxAge());
            if (metadataAge < 0L) {
                throw new ConfigException("metadata max age (" + metadataAge + ") cannot be negative number");
            }
        }
        catch (ConfigException e) {
            LOG.error("Invalid metadata max age configuration");
            throw e;
        }
    }

    public MarlinProducerImpl(ProducerConfig config) throws KafkaException {
        MapRUserInfo userInfo;
        MarlinProducerImpl.checkProducerConfig(config);
        Marlinserver.MarlinConfigDefaults mConfDef = Marlinserver.MarlinConfigDefaults.getDefaultInstance();
        this.multiFlushers = config.getBoolean(mConfDef.getParallelFlushersPerPartition());
        this.partitioner = (StreamsPartitioner)config.getConfiguredInstance(mConfDef.getPartitioner(), StreamsPartitioner.class);
        this.defaultStreamName = null;
        int rpcTimeoutMs = 0;
        boolean hardMount = true;
        boolean isIdempotent = false;
        try {
            this.defaultStreamName = config.getString(mConfDef.getProducerDefaultStream());
            rpcTimeoutMs = config.getInt(mConfDef.getRpcTimeout());
            hardMount = config.getBoolean(mConfDef.getHardMount());
            isIdempotent = config.getBoolean(mConfDef.getEnableIdempotenceConfig());
        }
        catch (ConfigException configException) {
            // empty catch block
        }
        this.shuttingdown = false;
        this.MaxWorkQueues = 4;
        try {
            userInfo = MapRFileSystem.CurrentUserInfo();
        }
        catch (IOException e) {
            throw new KafkaException("Could not create MarlinProducer", (Throwable)e);
        }
        this._clntPtr = this.OpenProducer(config.getString(mConfDef.getClientID()), rpcTimeoutMs, hardMount, config.getBoolean(mConfDef.getParallelFlushersPerPartition()), isIdempotent, config.getLong(mConfDef.getBufferMemory()), config.getLong(mConfDef.getBufferTime()), config.getLong(mConfDef.getMetadataMaxAge()), this.defaultStreamName, userInfo);
        this.producerId = curProducerId.getAndIncrement();
        if (this._clntPtr == 0L) {
            throw new NetworkException("Could not create Producer. Please ensure that the CLDB service is configured properly and is available");
        }
        this.defaultClusterPath = this.GetDefaultClusterPath(this._clntPtr);
        this.workQueues = new ArrayList<WorkQueue>();
        this.workers = new ArrayList<Thread>();
        for (int i = 0; i < this.MaxWorkQueues; ++i) {
            WorkQueue wq = new WorkQueue(1024);
            this.workQueues.add(wq);
            Thread worker = new Thread(new WorkerThread(wq, i));
            worker.setDaemon(true);
            this.workers.add(worker);
            worker.start();
        }
        this.topicNameToNumPartitions = new ConcurrentHashMap<String, Integer>();
    }

    protected RecordMetadata getDummyRecordMetadata(String topic) {
        return new RecordMetadata(new TopicPartition(topic, -1), -1L, 0L);
    }

    public <K, V> Future<RecordMetadata> send(ProducerRecord<K, V> record, int feed, byte[] serializedKey, byte[] serializedValue, Callback callback) throws KafkaException {
        return this.do_send(record.topic(), feed, record.key(), serializedKey, record.value(), serializedValue, callback, this.marlinInternalDefaults.getNoTimestamp());
    }

    protected Future<RecordMetadata> do_send(String topic, int feed, Object keyObj, byte[] key, Object valueObj, byte[] value, Callback callback, long timestamp) throws KafkaException {
        if (this.shuttingdown.booleanValue()) {
            LOG.error("Cannot send message when producer is shutting down.");
            if (callback != null) {
                callback.onCompletion(this.getDummyRecordMetadata(topic), (Exception)new RuntimeException("Producer is shutting down. Cannot send."));
            }
            return new FutureFailure(new RuntimeException("Producer is shutting down. Cannot send."));
        }
        if (feed == -1) {
            int numParts = 0;
            if (topic.startsWith(this.defaultClusterPath)) {
                topic = topic.substring(this.defaultClusterPath.length());
            }
            try {
                numParts = this.getNumPartitions(topic);
            }
            catch (KafkaException e) {
                LOG.error("Cannot send message, got an error while fetching number of partitions " + (Object)((Object)e));
                if (callback != null) {
                    callback.onCompletion(this.getDummyRecordMetadata(topic), (Exception)new IllegalArgumentException(e.toString()));
                }
                return new FutureFailure(new IllegalArgumentException(e.toString()));
            }
            feed = this.partitioner.partition(topic, keyObj, key, valueObj, value, numParts);
        }
        MarlinProducerResultImpl result = this.getMarlinProducerResultImpl(topic, feed, callback, key == null ? -1 : key.length, value == null ? -1 : value.length);
        ProducerRecordJob job = this.getProducerRecordJob(result, key, value, timestamp);
        long queueNumber = 0L;
        queueNumber = this.multiFlushers.booleanValue() && feed == -1 && key == null ? (queueNumber += this.counter.getAndIncrement() % Long.MAX_VALUE) : (queueNumber += (long)(topic.length() + (feed > 0 ? feed : 0)) % Long.MAX_VALUE);
        int qid = (int)(queueNumber % (long)this.MaxWorkQueues);
        try {
            this.workQueues.get(qid).enqueue(job);
        }
        catch (Exception e) {
            LOG.error(e.getMessage(), (Throwable)e);
            return new FutureFailure(e);
        }
        this.outstandingMsgs.getAndIncrement();
        return new MarlinFuture(result);
    }

    protected ProducerRecordJob getProducerRecordJob(MarlinProducerResultImpl result, byte[] key, byte[] value, long timestamp) {
        return new ProducerRecordJob(result, key, value, timestamp);
    }

    protected ProducerRecordJob getProducerRecordJob() {
        return new ProducerRecordJob();
    }

    protected MarlinProducerResultImpl getMarlinProducerResultImpl(String topic, int feed, Callback callback, int serKeySz, int serValSz) {
        return new MarlinProducerResultImpl(topic, feed, callback, serKeySz, serValSz);
    }

    public void flush() throws KafkaException {
        for (int i = 0; i < this.MaxWorkQueues; ++i) {
            this.workQueues.get(i).enqueueFlushAndWait();
        }
        int err = this.Flush(this._clntPtr);
        if (err != 0) {
            throw MarlinClient.jniErrToException(err, "could not flush to MarlinProducer");
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void handleJniCallbacks(int length, long[] offsets, long[] timestamps, MarlinProducerResult[] results, int feedid, int errorCode) {
        LOG.debug("Handling callback for " + length);
        try {
            for (int i = 0; i < length; ++i) {
                MarlinProducerResult result = results[i];
                if (result != null) {
                    RuntimeException ex = null;
                    if (errorCode < 0) {
                        errorCode = -errorCode;
                    }
                    if (errorCode == 13) {
                        MarlinProducerResultImpl resultImpl = (MarlinProducerResultImpl)result;
                        ex = new TopicAuthorizationException(resultImpl.getTopic());
                    } else {
                        ex = MarlinClient.jniErrToException(errorCode, null);
                    }
                    result.done(feedid, offsets[i], timestamps[i], (Exception)ex);
                    result.onCompletion();
                    continue;
                }
                LOG.error("producer.handleJniCallbacks() got null as result");
            }
        }
        catch (Throwable e) {
            LOG.error("Hitting exception during handleJniCallbacks");
            LOG.error("Exception " + e);
        }
        finally {
            long prevValue = this.outstandingMsgs.getAndAdd(-1 * length);
            if (prevValue < (long)length) {
                LOG.error("outstandingMsgs are " + prevValue + " but we got results for " + length);
            }
            if (this.outstandingMsgs.get() == 0L) {
                AtomicLong atomicLong = this.outstandingMsgs;
                synchronized (atomicLong) {
                    if (this.outstandingMsgs.get() == 0L) {
                        try {
                            this.outstandingMsgs.notifyAll();
                        }
                        catch (Exception e) {
                            LOG.error("producer.handleJniCallbacks() exception while closing " + e);
                        }
                    }
                }
            }
        }
    }

    public void handleJniTopicMetadata(String topicNames, int[] topicNameSizes, int[] numPartitionsForTopic, int arraySize) {
        LOG.debug("Handling topic metadata changes for " + arraySize + " topics");
        try {
            int offset = 0;
            for (int i = 0; i < arraySize; ++i) {
                String tname = topicNames.substring(offset, offset + topicNameSizes[i]);
                offset += topicNameSizes[i];
                if (numPartitionsForTopic[i] == 0) {
                    LOG.debug("Removing topic metadata for " + tname);
                    this.topicNameToNumPartitions.remove(tname);
                    continue;
                }
                LOG.debug("Adding topic metadata for " + tname + " with " + numPartitionsForTopic[i] + " partitions");
                this.topicNameToNumPartitions.put(tname, numPartitionsForTopic[i]);
            }
        }
        catch (Throwable e) {
            LOG.error("Hitting exception during handleJniTopicMetadata");
            LOG.error("Exception " + e);
        }
    }

    private int getNumPartitions(String topic) throws KafkaException {
        Integer nfeeds = this.topicNameToNumPartitions.get(topic);
        if (nfeeds == null) {
            nfeeds = this.GetTopicInfo(this._clntPtr, topic);
            if (nfeeds < 0) {
                throw new UnknownTopicOrPartitionException("could not get TopicInfo, err " + -nfeeds.intValue());
            }
            this.topicNameToNumPartitions.put(topic, nfeeds);
        }
        return nfeeds;
    }

    public List<PartitionInfo> getTopicInfo(String topic) throws KafkaException {
        int nfeeds = this.getNumPartitions(topic);
        MarlinTopicInfo result = new MarlinTopicInfo(topic, nfeeds);
        return result.getKafkaPartitionInfo();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void close() {
        LOG.debug("Producer is shutting down");
        MarlinProducerImpl marlinProducerImpl = this;
        synchronized (marlinProducerImpl) {
            if (this.shuttingdown.booleanValue()) {
                LOG.debug("Producer shutdown already in progress");
                return;
            }
            this.shuttingdown = true;
        }
        this.closeInternal();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void close(long timeout, TimeUnit unit) throws InterruptedException {
        LOG.debug("Producer is shutting down");
        MarlinProducerImpl marlinProducerImpl = this;
        synchronized (marlinProducerImpl) {
            if (this.shuttingdown.booleanValue()) {
                LOG.debug("Producer shutdown already in progress");
                return;
            }
            this.shuttingdown = true;
        }
        Thread closeThread = new Thread(new CloseThread());
        closeThread.setDaemon(true);
        closeThread.start();
        if (timeout <= 0L) {
            return;
        }
        closeThread.join(unit.toMillis(timeout));
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    protected void closeInternal() {
        int i2;
        LOG.debug("Producer actually shutting down");
        this.flush();
        for (i2 = 0; i2 < this.MaxWorkQueues; ++i2) {
            this.workQueues.get(i2).shutdown();
        }
        if (this.outstandingMsgs.get() != 0L) {
            AtomicLong i2 = this.outstandingMsgs;
            synchronized (i2) {
                if (this.outstandingMsgs.get() != 0L) {
                    try {
                        this.outstandingMsgs.wait();
                    }
                    catch (Exception e) {
                        LOG.error("producer.close() exception while waiting on outstanding messages " + e);
                    }
                }
            }
        }
        for (i2 = 0; i2 < this.MaxWorkQueues; ++i2) {
            try {
                this.workers.get(i2).join();
                continue;
            }
            catch (InterruptedException e) {
                throw new InterruptException(e);
            }
        }
        LOG.debug("Calling jni producer close");
        this.CloseProducer(this._clntPtr);
        LOG.debug("Returned from jni producer close");
        this.MaxWorkQueues = 0;
        this._clntPtr = 0L;
    }

    protected int encodeJniData(WorkerThread.WorkerState ws, int recSz) {
        int numOffsets = recSz * 3;
        ws.byteOffsets = new int[numOffsets];
        ws.results = new MarlinProducerResultImpl[recSz];
        ws.byteSize = 0;
        int byteOffsetIndex = 0;
        int numMsgJobs = 0;
        for (int i = 0; i < recSz; ++i) {
            ProducerRecordJob rec = ws.recList[i];
            if (rec.isFlushJob()) {
                ws.flushJobs.add(rec);
                continue;
            }
            ws.feedIDs[numMsgJobs] = rec.getResult().getFeed();
            ws.results[numMsgJobs] = rec.getResult();
            ws.byteSize += rec.getTopic().length;
            ws.byteOffsets[byteOffsetIndex++] = ws.byteSize;
            if (rec.getKey() != null) {
                ws.byteSize += rec.getKey().length;
            }
            ws.byteOffsets[byteOffsetIndex++] = ws.byteSize;
            if (rec.getValue() != null) {
                ws.byteSize += rec.getValue().length;
            }
            ws.byteOffsets[byteOffsetIndex++] = ws.byteSize;
            ws.timestamps[numMsgJobs] = rec.getTimestamp();
            assert (ws.numHeaders[numMsgJobs] == 0);
            ++numMsgJobs;
        }
        if (ws.maxBytesToSend < ws.byteSize) {
            LOG.debug("Worker thread increasing byte array from " + ws.maxBytesToSend + " to " + ws.byteSize);
            ws.toSend = new byte[ws.byteSize];
            ws.maxBytesToSend = ws.byteSize;
        }
        ByteBuffer toSendBuffer = ByteBuffer.wrap(ws.toSend);
        numMsgJobs = 0;
        for (int i = 0; i < recSz; ++i) {
            ProducerRecordJob rec = ws.recList[i];
            if (rec.isFlushJob()) continue;
            toSendBuffer.put(rec.getTopic());
            if (rec.getKey() != null) {
                toSendBuffer.put(rec.getKey());
            }
            if (rec.getValue() != null) {
                toSendBuffer.put(rec.getValue());
            }
            ++numMsgJobs;
        }
        return numMsgJobs;
    }

    static {
        ShimLoader.load();
        LOG = LoggerFactory.getLogger(MarlinProducerImpl.class);
        curProducerId = new AtomicLong(1L);
    }

    protected class WorkerThread
    implements Runnable {
        private WorkQueue wq;
        private int id;
        protected WorkerState ws = new WorkerState();

        public WorkerThread(WorkQueue wq, int id) {
            this.id = id;
            this.wq = wq;
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        @Override
        public void run() {
            try {
                while (true) {
                    int recSz;
                    if ((recSz = this.wq.dequeue(this.ws.recList)) == 0) {
                        LOG.debug("Worker thread " + this.id + " joining");
                        return;
                    }
                    LOG.debug("Worker thread " + this.id + " got " + recSz + " jobs");
                    int numMsgJobs = MarlinProducerImpl.this.encodeJniData(this.ws, recSz);
                    assert (recSz == numMsgJobs + this.ws.flushJobs.size());
                    if (numMsgJobs > 0) {
                        try {
                            int err = MarlinProducerImpl.this.Send(MarlinProducerImpl.this._clntPtr, MarlinProducerImpl.this.producerId, this.ws.toSend, this.ws.byteSize, this.ws.byteOffsets, this.ws.feedIDs, this.ws.timestamps, this.ws.numHeaders, this.ws.results, numMsgJobs);
                            if (err != 0) {
                                throw MarlinClient.jniErrToException(err, "could not send to MarlinProducer");
                            }
                        }
                        catch (ApiException e) {
                            LOG.error("Exception occurred during message send:", (Throwable)e);
                            for (int i = 0; i < recSz; ++i) {
                                if (this.ws.recList[i].isFlushJob()) continue;
                                MarlinProducerResultImpl result = this.ws.recList[i].getResult();
                                result.done(result.getFeed(), -1L, MarlinProducerImpl.this.marlinInternalDefaults.getNoTimestamp(), (Exception)((Object)e));
                                result.onCompletion();
                            }
                        }
                    }
                    for (int i = 0; i < this.ws.flushJobs.size(); ++i) {
                        ProducerRecordJob notifyJob;
                        ProducerRecordJob producerRecordJob = notifyJob = this.ws.flushJobs.get(i);
                        synchronized (producerRecordJob) {
                            notifyJob.markFlushJobDone();
                            notifyJob.notify();
                            continue;
                        }
                    }
                    this.ws.flushJobs.clear();
                }
            }
            catch (Exception e) {
                LOG.error(e.getMessage(), (Throwable)e);
                return;
            }
        }

        protected class WorkerState {
            protected int maxBytesToSend = 50000;
            protected List<ProducerRecordJob> flushJobs = new ArrayList<ProducerRecordJob>();
            protected int[] feedIDs = new int[100];
            protected ProducerRecordJob[] recList = new ProducerRecordJob[100];
            protected byte[] toSend = new byte[this.maxBytesToSend];
            protected long[] timestamps = new long[100];
            protected int[] numHeaders = new int[100];
            protected int[] byteOffsets;
            protected MarlinProducerResultImpl[] results;
            protected int byteSize = 0;

            protected WorkerState() {
            }
        }
    }

    private class CloseThread
    implements Runnable {
        private CloseThread() {
        }

        @Override
        public void run() {
            assert (MarlinProducerImpl.this.shuttingdown.booleanValue());
            MarlinProducerImpl.this.closeInternal();
        }
    }

    private class WorkQueue {
        private ProducerRecordJob[] precords;
        private int capacity;
        private int tailIdx;
        private int headIdx;
        private boolean shuttingdown;
        private boolean notifyOnDequeue;
        private boolean notifyOnEnqueue;

        public WorkQueue(int maxSz) {
            this.capacity = maxSz;
            this.precords = new ProducerRecordJob[this.capacity];
            this.headIdx = 0;
            this.tailIdx = 0;
            this.shuttingdown = false;
            this.notifyOnDequeue = false;
            this.notifyOnEnqueue = false;
        }

        public synchronized void shutdown() {
            this.shuttingdown = true;
            this.notifyAll();
        }

        private boolean isEmpty() {
            return this.tailIdx == this.headIdx;
        }

        private boolean isFull() {
            return this.headIdx == this.tailIdx - 1 || this.headIdx == this.capacity - 1 && this.tailIdx == 0;
        }

        private int size() {
            if (this.isEmpty()) {
                return 0;
            }
            if (this.tailIdx < this.headIdx) {
                return this.headIdx - this.tailIdx;
            }
            return this.capacity - this.tailIdx + this.headIdx;
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        public void enqueueFlushAndWait() {
            ProducerRecordJob flushJob;
            ProducerRecordJob producerRecordJob = flushJob = MarlinProducerImpl.this.getProducerRecordJob();
            synchronized (producerRecordJob) {
                this.enqueue(flushJob);
                try {
                    while (!flushJob.getFlushJobDone()) {
                        flushJob.wait();
                    }
                }
                catch (Exception e) {
                    LOG.error("enqueueFlushAndWait flushJob.wait() encountered " + e);
                }
            }
        }

        public synchronized void enqueue(ProducerRecordJob rec) {
            if (this.shuttingdown) {
                LOG.error("enqueue job failed:  producer shutting down");
                throw new RuntimeException("enqueue job failed:  producer shutting down");
            }
            while (this.isFull()) {
                try {
                    this.notifyOnDequeue = true;
                    this.wait();
                }
                catch (Exception e) {
                    LOG.error("enqueue encountered " + e);
                }
            }
            this.notifyOnDequeue = false;
            this.precords[this.headIdx] = rec;
            this.headIdx = (this.headIdx + 1) % this.capacity;
            assert (this.headIdx != this.tailIdx);
            if (this.notifyOnEnqueue) {
                this.notify();
            }
        }

        private ProducerRecordJob dequeueInternal() {
            assert (!this.isEmpty());
            ProducerRecordJob rec = this.precords[this.tailIdx];
            this.precords[this.tailIdx] = null;
            this.tailIdx = (this.tailIdx + 1) % this.capacity;
            return rec;
        }

        public synchronized int dequeue(ProducerRecordJob[] retList) {
            while (this.isEmpty() && !this.shuttingdown) {
                try {
                    this.notifyOnEnqueue = true;
                    this.wait();
                }
                catch (Exception exception) {}
            }
            this.notifyOnEnqueue = false;
            if (this.size() < 100 && !this.shuttingdown) {
                try {
                    this.wait(1L);
                }
                catch (Exception exception) {
                    // empty catch block
                }
            }
            int retSz = this.size();
            assert (this.shuttingdown || retSz > 0);
            if (retSz > 100) {
                retSz = 100;
            }
            if (retSz > 0) {
                int idx;
                for (idx = 0; idx < retSz; ++idx) {
                    retList[idx] = this.dequeueInternal();
                }
                assert (idx == retSz);
                this.notify();
            }
            return retSz;
        }
    }

    private static class FutureFailure
    implements Future<RecordMetadata> {
        private final ExecutionException exception;

        public FutureFailure(Exception exception) {
            this.exception = new ExecutionException(exception);
        }

        @Override
        public boolean cancel(boolean interrupt) {
            return false;
        }

        @Override
        public RecordMetadata get() throws ExecutionException {
            throw this.exception;
        }

        @Override
        public RecordMetadata get(long timeout, TimeUnit unit) throws ExecutionException {
            throw this.exception;
        }

        @Override
        public boolean isCancelled() {
            return false;
        }

        @Override
        public boolean isDone() {
            return true;
        }
    }
}

