/*
 * Decompiled with CFR 0.152.
 */
package io.confluent.kafkarest.resources;

import io.confluent.kafkarest.ConsumerManager;
import io.confluent.kafkarest.Errors;
import io.confluent.kafkarest.KafkaRestContext;
import io.confluent.kafkarest.KafkaStreamsMetadataObserver;
import io.confluent.kafkarest.ProducerPool;
import io.confluent.kafkarest.RecordMetadataOrException;
import io.confluent.kafkarest.SimpleConsumerManager;
import io.confluent.kafkarest.entities.AbstractConsumerRecord;
import io.confluent.kafkarest.entities.AvroProduceRecord;
import io.confluent.kafkarest.entities.BinaryProduceRecord;
import io.confluent.kafkarest.entities.EmbeddedFormat;
import io.confluent.kafkarest.entities.JsonProduceRecord;
import io.confluent.kafkarest.entities.Partition;
import io.confluent.kafkarest.entities.PartitionOffset;
import io.confluent.kafkarest.entities.PartitionProduceRequest;
import io.confluent.kafkarest.entities.ProduceRecord;
import io.confluent.kafkarest.entities.ProduceResponse;
import io.confluent.rest.annotations.PerformanceMetric;
import java.security.PrivilegedExceptionAction;
import java.util.List;
import java.util.Vector;
import javax.servlet.http.HttpServletRequest;
import javax.validation.Valid;
import javax.validation.constraints.NotNull;
import javax.ws.rs.Consumes;
import javax.ws.rs.DefaultValue;
import javax.ws.rs.GET;
import javax.ws.rs.POST;
import javax.ws.rs.Path;
import javax.ws.rs.PathParam;
import javax.ws.rs.Produces;
import javax.ws.rs.QueryParam;
import javax.ws.rs.container.AsyncResponse;
import javax.ws.rs.container.Suspended;
import javax.ws.rs.core.Context;
import kafka.common.KafkaException;
import org.apache.hadoop.security.UserGroupInformation;
import org.eclipse.jetty.util.StringUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Path(value="/topics/{topic}/partitions")
@Produces(value={"application/vnd.kafka.binary.v1+json; qs=0.1", "application/vnd.kafka.avro.v1+json; qs=0.1", "application/vnd.kafka.v1+json; qs=0.9", "application/vnd.kafka+json; qs=0.8", "application/json; qs=0.5"})
@Consumes(value={"application/vnd.kafka.v1+json", "application/vnd.kafka+json", "application/json", "application/octet-stream"})
public class PartitionsResource {
    private static final Logger log = LoggerFactory.getLogger(PartitionsResource.class);
    private final KafkaRestContext ctx;
    private final boolean isStreams;

    public PartitionsResource(KafkaRestContext ctx) {
        this.ctx = ctx;
        this.isStreams = ctx.getConfig().isStreams();
    }

    @GET
    @PerformanceMetric(value="partitions.list")
    public List<Partition> list(@Context HttpServletRequest httpRequest, final @PathParam(value="topic") String topic) throws Exception {
        return (List)this.runProxyQuery(new PrivilegedExceptionAction(){

            public List<Partition> run() throws Exception {
                PartitionsResource.this.checkTopicExists(topic);
                KafkaStreamsMetadataObserver metadataObserver = PartitionsResource.this.ctx.getMetadataObserver();
                List<Partition> partitions = metadataObserver.getTopicPartitions(topic);
                if (PartitionsResource.this.ctx.isImpersonationEnabled()) {
                    metadataObserver.shutdown();
                }
                return partitions;
            }
        }, httpRequest.getRemoteUser());
    }

    @GET
    @Path(value="/{partition}")
    @PerformanceMetric(value="partition.get")
    public Partition getPartition(@Context HttpServletRequest httpRequest, final @PathParam(value="topic") String topic, final @PathParam(value="partition") int partition) throws Exception {
        return (Partition)this.runProxyQuery(new PrivilegedExceptionAction(){

            public Partition run() throws Exception {
                PartitionsResource.this.checkTopicExists(topic);
                KafkaStreamsMetadataObserver metadataObserver = PartitionsResource.this.ctx.getMetadataObserver();
                Partition part = metadataObserver.getTopicPartition(topic, partition);
                if (PartitionsResource.this.ctx.isImpersonationEnabled()) {
                    metadataObserver.shutdown();
                }
                if (part == null) {
                    throw Errors.partitionNotFoundException();
                }
                return part;
            }
        }, httpRequest.getRemoteUser());
    }

    @GET
    @Path(value="/{partition}/messages")
    @PerformanceMetric(value="partition.consume-binary")
    @Produces(value={"application/vnd.kafka.binary.v1+json", "application/vnd.kafka.v1+json; qs=0.9", "application/vnd.kafka+json; qs=0.8", "application/json; qs=0.5"})
    public void consumeBinary(@Context HttpServletRequest httpRequest, final @Suspended AsyncResponse asyncResponse, final @PathParam(value="topic") String topicName, final @PathParam(value="partition") int partitionId, final @QueryParam(value="offset") long offset, final @QueryParam(value="count") @DefaultValue(value="1") long count) throws Exception {
        this.runProxyQuery(new PrivilegedExceptionAction(){

            public Partition run() throws Exception {
                PartitionsResource.this.consume(asyncResponse, topicName, partitionId, offset, count, EmbeddedFormat.BINARY);
                return null;
            }
        }, httpRequest.getRemoteUser());
    }

    @GET
    @Path(value="/{partition}/messages")
    @PerformanceMetric(value="partition.consume-avro")
    @Produces(value={"application/vnd.kafka.avro.v1+json; qs=0.1"})
    public void consumeAvro(@Suspended AsyncResponse asyncResponse, @PathParam(value="topic") String topicName, @PathParam(value="partition") int partitionId, @QueryParam(value="offset") long offset, @QueryParam(value="count") @DefaultValue(value="1") long count) {
        if (this.isStreams) {
            throw Errors.notSupportedByMapRStreams();
        }
        this.consume(asyncResponse, topicName, partitionId, offset, count, EmbeddedFormat.AVRO);
    }

    @GET
    @Path(value="/{partition}/messages")
    @PerformanceMetric(value="partition.consume-json")
    @Produces(value={"application/vnd.kafka.json.v1+json; qs=0.1"})
    public void consumeJson(@Context HttpServletRequest httpRequest, final @Suspended AsyncResponse asyncResponse, final @PathParam(value="topic") String topicName, final @PathParam(value="partition") int partitionId, final @QueryParam(value="offset") long offset, final @QueryParam(value="count") @DefaultValue(value="1") long count) throws Exception {
        this.runProxyQuery(new PrivilegedExceptionAction(){

            public Partition run() throws Exception {
                PartitionsResource.this.consume(asyncResponse, topicName, partitionId, offset, count, EmbeddedFormat.JSON);
                return null;
            }
        }, httpRequest.getRemoteUser());
    }

    @POST
    @Path(value="/{partition}")
    @PerformanceMetric(value="partition.produce-binary")
    @Consumes(value={"application/vnd.kafka.binary.v1+json", "application/vnd.kafka.v1+json", "application/vnd.kafka+json", "application/json", "application/octet-stream"})
    public void produceBinary(final @Context HttpServletRequest httpRequest, final @Suspended AsyncResponse asyncResponse, final @PathParam(value="topic") String topic, final @PathParam(value="partition") int partition, final @Valid @NotNull PartitionProduceRequest<BinaryProduceRecord> request) throws Exception {
        this.runProxyQuery(new PrivilegedExceptionAction(){

            public Partition run() throws Exception {
                PartitionsResource.this.produce(httpRequest.getRemoteUser(), asyncResponse, topic, partition, EmbeddedFormat.BINARY, request);
                return null;
            }
        }, httpRequest.getRemoteUser());
    }

    @POST
    @Path(value="/{partition}")
    @PerformanceMetric(value="partition.produce-json")
    @Consumes(value={"application/vnd.kafka.json.v1+json"})
    public void produceJson(final @Context HttpServletRequest httpRequest, final @Suspended AsyncResponse asyncResponse, final @PathParam(value="topic") String topic, final @PathParam(value="partition") int partition, final @Valid @NotNull PartitionProduceRequest<JsonProduceRecord> request) throws Exception {
        this.runProxyQuery(new PrivilegedExceptionAction(){

            public Partition run() throws Exception {
                PartitionsResource.this.produce(httpRequest.getRemoteUser(), asyncResponse, topic, partition, EmbeddedFormat.JSON, request);
                return null;
            }
        }, httpRequest.getRemoteUser());
    }

    @POST
    @Path(value="/{partition}")
    @PerformanceMetric(value="partition.produce-avro")
    @Consumes(value={"application/vnd.kafka.avro.v1+json"})
    public void produceAvro(@Suspended AsyncResponse asyncResponse, @PathParam(value="topic") String topic, @PathParam(value="partition") int partition, @Valid @NotNull PartitionProduceRequest<AvroProduceRecord> request) {
        if (this.isStreams) {
            throw Errors.notSupportedByMapRStreams();
        }
        boolean hasKeys = false;
        boolean hasValues = false;
        for (AvroProduceRecord rec : request.getRecords()) {
            hasKeys = hasKeys || !rec.getJsonKey().isNull();
            hasValues = hasValues || !rec.getJsonValue().isNull();
        }
        if (hasKeys && request.getKeySchema() == null && request.getKeySchemaId() == null) {
            throw Errors.keySchemaMissingException();
        }
        if (hasValues && request.getValueSchema() == null && request.getValueSchemaId() == null) {
            throw Errors.valueSchemaMissingException();
        }
        this.produce(null, asyncResponse, topic, partition, EmbeddedFormat.AVRO, request);
    }

    private <K, V> void consume(final @Suspended AsyncResponse asyncResponse, String topicName, int partitionId, long offset, long count, EmbeddedFormat embeddedFormat) {
        log.trace("Executing simple consume id={} topic={} partition={} offset={} count={}", new Object[]{asyncResponse, topicName, partitionId, offset, count});
        SimpleConsumerManager consumerManager = this.ctx.getSimpleConsumerManager();
        consumerManager.consume(topicName, partitionId, offset, count, embeddedFormat, new ConsumerManager.ReadCallback<K, V>(){

            @Override
            public void onCompletion(List<? extends AbstractConsumerRecord<K, V>> records, Exception e) {
                log.trace("Completed simple consume id={} records={} exception={}", new Object[]{asyncResponse, records, e});
                if (e != null) {
                    asyncResponse.resume((Throwable)e);
                } else {
                    asyncResponse.resume(records);
                }
            }
        });
        if (this.ctx.isImpersonationEnabled()) {
            consumerManager.shutdown();
        }
    }

    protected <K, V, R extends ProduceRecord<K, V>> void produce(String userName, final AsyncResponse asyncResponse, String topic, int partition, EmbeddedFormat format, PartitionProduceRequest<R> request) {
        log.trace("Executing topic produce request id={} topic={} partition={} format={} request={}", new Object[]{asyncResponse, topic, partition, format, request});
        try {
            this.ctx.getProducerPool().produce(topic, partition, format, request, request.getRecords(), new ProducerPool.ProduceRequestCallback(){

                @Override
                public void onCompletion(Integer keySchemaId, Integer valueSchemaId, List<RecordMetadataOrException> results) {
                    ProduceResponse response = new ProduceResponse();
                    Vector<PartitionOffset> offsets = new Vector<PartitionOffset>();
                    for (RecordMetadataOrException result : results) {
                        if (result.getException() != null) {
                            int errorCode = Errors.codeFromProducerException(result.getException());
                            String errorMessage = result.getException().getMessage();
                            offsets.add(new PartitionOffset(null, null, errorCode, errorMessage));
                            continue;
                        }
                        offsets.add(new PartitionOffset(result.getRecordMetadata().partition(), result.getRecordMetadata().offset(), null, null));
                    }
                    response.setOffsets(offsets);
                    response.setKeySchemaId(keySchemaId);
                    response.setValueSchemaId(valueSchemaId);
                    log.trace("Completed topic produce request id={} response={}", (Object)asyncResponse, (Object)response);
                    asyncResponse.resume((Object)response);
                }
            }, userName);
        }
        catch (KafkaException e) {
            if (StringUtil.startsWithIgnoreCase((String)e.getMessage(), (String)"Invalid partition")) {
                Errors.partitionNotFoundException();
            }
            Errors.kafkaErrorException(e);
        }
    }

    private boolean topicExists(String topic) {
        return this.ctx.getMetadataObserver().topicExists(topic);
    }

    private void checkTopicExists(String topic) {
        if (!this.topicExists(topic)) {
            throw Errors.topicNotFoundException();
        }
    }

    public Object runProxyQuery(PrivilegedExceptionAction action, String remoteUser) throws Exception {
        if (this.ctx.isImpersonationEnabled()) {
            UserGroupInformation ugi = UserGroupInformation.createProxyUser((String)remoteUser, (UserGroupInformation)UserGroupInformation.getCurrentUser());
            return ugi.doAs(action);
        }
        return action.run();
    }
}

