/*
 * Decompiled with CFR 0.152.
 */
package io.confluent.kafkarest.resources.v2;

import io.confluent.kafkarest.Errors;
import io.confluent.kafkarest.KafkaRestContext;
import io.confluent.kafkarest.UriUtils;
import io.confluent.kafkarest.entities.ConsumerAssignmentRequest;
import io.confluent.kafkarest.entities.ConsumerAssignmentResponse;
import io.confluent.kafkarest.entities.ConsumerCommittedRequest;
import io.confluent.kafkarest.entities.ConsumerCommittedResponse;
import io.confluent.kafkarest.entities.ConsumerInstanceConfig;
import io.confluent.kafkarest.entities.ConsumerOffsetCommitRequest;
import io.confluent.kafkarest.entities.ConsumerRecord;
import io.confluent.kafkarest.entities.ConsumerSeekToOffsetRequest;
import io.confluent.kafkarest.entities.ConsumerSeekToRequest;
import io.confluent.kafkarest.entities.ConsumerSubscriptionRecord;
import io.confluent.kafkarest.entities.ConsumerSubscriptionResponse;
import io.confluent.kafkarest.entities.CreateConsumerInstanceResponse;
import io.confluent.kafkarest.entities.TopicPartitionOffset;
import io.confluent.kafkarest.v2.AvroKafkaConsumerState;
import io.confluent.kafkarest.v2.BinaryKafkaConsumerState;
import io.confluent.kafkarest.v2.JsonKafkaConsumerState;
import io.confluent.kafkarest.v2.KafkaConsumerManager;
import io.confluent.kafkarest.v2.KafkaConsumerState;
import io.confluent.rest.annotations.PerformanceMetric;
import java.lang.reflect.UndeclaredThrowableException;
import java.security.PrivilegedExceptionAction;
import java.util.List;
import javax.servlet.http.HttpServletRequest;
import javax.validation.Valid;
import javax.validation.constraints.NotNull;
import javax.ws.rs.Consumes;
import javax.ws.rs.DELETE;
import javax.ws.rs.DefaultValue;
import javax.ws.rs.GET;
import javax.ws.rs.POST;
import javax.ws.rs.Path;
import javax.ws.rs.PathParam;
import javax.ws.rs.Produces;
import javax.ws.rs.QueryParam;
import javax.ws.rs.container.AsyncResponse;
import javax.ws.rs.container.Suspended;
import javax.ws.rs.core.Context;
import javax.ws.rs.core.UriInfo;
import org.apache.hadoop.security.UserGroupInformation;

/**
 * JAX-RS resource exposing the v2 consumer endpoints under {@code /consumers}.
 *
 * <p>Every endpoint delegates to the {@link KafkaConsumerManager} obtained from the
 * {@link KafkaRestContext} and runs that delegation through
 * {@link #runProxyQuery(PrivilegedExceptionAction, String)}, so that the work is executed as the
 * request's remote user whenever the context reports impersonation as enabled.
 */
@Path("/consumers")
@Produces({"application/vnd.kafka.binary.v2+json; qs=0.1", "application/vnd.kafka.avro.v2+json; qs=0.1", "application/vnd.kafka.json.v2+json; qs=0.1", "application/vnd.kafka.v2+json; qs=0.9"})
@Consumes({"application/vnd.kafka.binary.v2+json", "application/vnd.kafka.avro.v2+json", "application/vnd.kafka.json.v2+json", "application/vnd.kafka.v2+json"})
public class ConsumersResource {
    private final KafkaRestContext ctx;

    /**
     * @param ctx context supplying the consumer manager, the REST configuration and the
     *            impersonation switch used by this resource
     */
    public ConsumersResource(KafkaRestContext ctx) {
        this.ctx = ctx;
    }

    /**
     * Creates a consumer instance in {@code group}.
     *
     * @param httpRequest request whose remote user is used for impersonation
     * @param uriInfo     request URI information used to build the instance base URI
     * @param group       consumer group name taken from the path
     * @param config      instance configuration; a default-constructed configuration is used
     *                    when the request carries no body
     * @return the new instance id together with its base URI
     *         ({@code <absolute request URI>/instances/<instanceId>})
     * @throws Exception whatever the consumer manager or the impersonation layer throws
     */
    @POST
    @Valid
    @Path("/{group}")
    @PerformanceMetric("consumer.create+v2")
    public CreateConsumerInstanceResponse createGroup(@Context HttpServletRequest httpRequest, final @Context UriInfo uriInfo, final @PathParam("group") String group, @Valid ConsumerInstanceConfig config) throws Exception {
        // A missing body means "use defaults" rather than an error.
        final ConsumerInstanceConfig consumerConfig = config == null ? new ConsumerInstanceConfig() : config;
        return runProxyQuery(new PrivilegedExceptionAction<CreateConsumerInstanceResponse>() {
            @Override
            public CreateConsumerInstanceResponse run() throws Exception {
                String instanceId = ctx.getKafkaConsumerManager().createConsumer(group, consumerConfig);
                String instanceBaseUri = UriUtils.absoluteUriBuilder(ctx.getConfig(), uriInfo)
                        .path("instances")
                        .path(instanceId)
                        .build()
                        .toString();
                return new CreateConsumerInstanceResponse(instanceId, instanceBaseUri);
            }
        }, httpRequest.getRemoteUser());
    }

    /**
     * Deletes the consumer instance {@code instance} of {@code group}.
     *
     * @throws Exception whatever the consumer manager or the impersonation layer throws
     */
    @DELETE
    @Path("/{group}/instances/{instance}")
    @PerformanceMetric("consumer.delete+v2")
    public void deleteGroup(@Context HttpServletRequest httpRequest, final @PathParam("group") String group, final @PathParam("instance") String instance) throws Exception {
        runProxyQuery(new PrivilegedExceptionAction<Void>() {
            @Override
            public Void run() throws Exception {
                ctx.getKafkaConsumerManager().deleteConsumer(group, instance);
                return null;
            }
        }, httpRequest.getRemoteUser());
    }

    /**
     * Applies {@code subscription} to the given consumer instance.
     *
     * <p>An {@link IllegalStateException} raised by the consumer manager is translated through
     * {@link Errors#illegalStateException}.
     *
     * @throws Exception whatever the consumer manager or the impersonation layer throws
     */
    @POST
    @Path("/{group}/instances/{instance}/subscription")
    @PerformanceMetric("consumer.subscribe+v2")
    public void subscribe(@Context HttpServletRequest httpRequest, @Context UriInfo uriInfo, final @PathParam("group") String group, final @PathParam("instance") String instance, final @Valid @NotNull ConsumerSubscriptionRecord subscription) throws Exception {
        runProxyQuery(new PrivilegedExceptionAction<Void>() {
            @Override
            public Void run() throws Exception {
                try {
                    ctx.getKafkaConsumerManager().subscribe(group, instance, subscription);
                } catch (IllegalStateException e) {
                    throw Errors.illegalStateException(e);
                }
                return null;
            }
        }, httpRequest.getRemoteUser());
    }

    /**
     * Returns the current subscription of the given consumer instance as reported by the
     * consumer manager.
     *
     * @throws Exception whatever the consumer manager or the impersonation layer throws
     */
    @GET
    @Path("/{group}/instances/{instance}/subscription")
    @PerformanceMetric("consumer.subscription+v2")
    public ConsumerSubscriptionResponse subscription(@Context HttpServletRequest httpRequest, @Context UriInfo uriInfo, final @PathParam("group") String group, final @PathParam("instance") String instance) throws Exception {
        return runProxyQuery(new PrivilegedExceptionAction<ConsumerSubscriptionResponse>() {
            @Override
            public ConsumerSubscriptionResponse run() throws Exception {
                return ctx.getKafkaConsumerManager().subscription(group, instance);
            }
        }, httpRequest.getRemoteUser());
    }

    /**
     * Removes the subscription of the given consumer instance.
     *
     * @throws Exception whatever the consumer manager or the impersonation layer throws
     */
    @DELETE
    @Path("/{group}/instances/{instance}/subscription")
    @PerformanceMetric("consumer.unsubscribe+v2")
    public void unsubscribe(@Context HttpServletRequest httpRequest, @Context UriInfo uriInfo, final @PathParam("group") String group, final @PathParam("instance") String instance) throws Exception {
        runProxyQuery(new PrivilegedExceptionAction<Void>() {
            @Override
            public Void run() throws Exception {
                ctx.getKafkaConsumerManager().unsubscribe(group, instance);
                return null;
            }
        }, httpRequest.getRemoteUser());
    }

    /**
     * Reads records using {@link BinaryKafkaConsumerState} and resumes {@code asyncResponse}
     * with the outcome.
     *
     * @param timeout  {@code timeout} query parameter, {@code -1} when absent; passed to the
     *                 consumer manager unchanged
     * @param maxBytes {@code max_bytes} query parameter, {@code -1} when absent; values
     *                 {@code <= 0} are treated as "no limit"
     * @throws Exception whatever the consumer manager or the impersonation layer throws
     */
    @GET
    @Path("/{group}/instances/{instance}/records")
    @PerformanceMetric("consumer.records.read-binary+v2")
    @Produces({"application/vnd.kafka.binary.v2+json", "application/vnd.kafka.v2+json; qs=0.9"})
    public void readRecordBinary(@Context HttpServletRequest httpRequest, final @Suspended AsyncResponse asyncResponse, final @PathParam("group") String group, final @PathParam("instance") String instance, final @QueryParam("timeout") @DefaultValue("-1") long timeout, final @QueryParam("max_bytes") @DefaultValue("-1") long maxBytes) throws Exception {
        runProxyQuery(new PrivilegedExceptionAction<Void>() {
            @Override
            public Void run() throws Exception {
                ConsumersResource.this.readRecords(asyncResponse, group, instance, timeout, maxBytes, BinaryKafkaConsumerState.class);
                return null;
            }
        }, httpRequest.getRemoteUser());
    }

    /**
     * Reads records using {@link JsonKafkaConsumerState} and resumes {@code asyncResponse}
     * with the outcome.
     *
     * @param timeout  {@code timeout} query parameter, {@code -1} when absent; passed to the
     *                 consumer manager unchanged
     * @param maxBytes {@code max_bytes} query parameter, {@code -1} when absent; values
     *                 {@code <= 0} are treated as "no limit"
     * @throws Exception whatever the consumer manager or the impersonation layer throws
     */
    @GET
    @Path("/{group}/instances/{instance}/records")
    @PerformanceMetric("consumer.records.read-json+v2")
    @Produces({"application/vnd.kafka.json.v2+json; qs=0.1"})
    public void readRecordJson(@Context HttpServletRequest httpRequest, final @Suspended AsyncResponse asyncResponse, final @PathParam("group") String group, final @PathParam("instance") String instance, final @QueryParam("timeout") @DefaultValue("-1") long timeout, final @QueryParam("max_bytes") @DefaultValue("-1") long maxBytes) throws Exception {
        runProxyQuery(new PrivilegedExceptionAction<Void>() {
            @Override
            public Void run() throws Exception {
                ConsumersResource.this.readRecords(asyncResponse, group, instance, timeout, maxBytes, JsonKafkaConsumerState.class);
                return null;
            }
        }, httpRequest.getRemoteUser());
    }

    /**
     * Reads records using {@link AvroKafkaConsumerState} and resumes {@code asyncResponse}
     * with the outcome.
     *
     * @param timeout  {@code timeout} query parameter, {@code -1} when absent; passed to the
     *                 consumer manager unchanged
     * @param maxBytes {@code max_bytes} query parameter, {@code -1} when absent; values
     *                 {@code <= 0} are treated as "no limit"
     * @throws Exception whatever the consumer manager or the impersonation layer throws
     */
    @GET
    @Path("/{group}/instances/{instance}/records")
    @PerformanceMetric("consumer.records.read-avro+v2")
    @Produces({"application/vnd.kafka.avro.v2+json; qs=0.1"})
    public void readRecordAvro(@Context HttpServletRequest httpRequest, final @Suspended AsyncResponse asyncResponse, final @PathParam("group") String group, final @PathParam("instance") String instance, final @QueryParam("timeout") @DefaultValue("-1") long timeout, final @QueryParam("max_bytes") @DefaultValue("-1") long maxBytes) throws Exception {
        runProxyQuery(new PrivilegedExceptionAction<Void>() {
            @Override
            public Void run() throws Exception {
                ConsumersResource.this.readRecords(asyncResponse, group, instance, timeout, maxBytes, AvroKafkaConsumerState.class);
                return null;
            }
        }, httpRequest.getRemoteUser());
    }

    /**
     * Commits offsets for the given consumer instance and resumes {@code asyncResponse} from the
     * commit callback: with the exception when one is reported, otherwise with the committed
     * offsets.
     *
     * @param async               {@code async} query parameter ({@code "false"} when absent),
     *                            forwarded to the consumer manager as-is
     * @param offsetCommitRequest offsets to commit; forwarded to the consumer manager as-is
     *                            (may be {@code null} when the request has no body)
     * @throws Exception whatever the consumer manager or the impersonation layer throws
     */
    @POST
    @Path("/{group}/instances/{instance}/offsets")
    @PerformanceMetric("consumer.commit-offsets+v2")
    public void commitOffsets(@Context HttpServletRequest httpRequest, final @Suspended AsyncResponse asyncResponse, final @PathParam("group") String group, final @PathParam("instance") String instance, final @QueryParam("async") @DefaultValue("false") String async, final @Valid ConsumerOffsetCommitRequest offsetCommitRequest) throws Exception {
        runProxyQuery(new PrivilegedExceptionAction<Void>() {
            @Override
            public Void run() throws Exception {
                ctx.getKafkaConsumerManager().commitOffsets(group, instance, async, offsetCommitRequest, new KafkaConsumerManager.CommitCallback() {
                    @Override
                    public void onCompletion(List<TopicPartitionOffset> offsets, Exception e) {
                        if (e != null) {
                            asyncResponse.resume(e);
                        } else {
                            asyncResponse.resume(offsets);
                        }
                    }
                });
                return null;
            }
        }, httpRequest.getRemoteUser());
    }

    /**
     * Returns the committed offsets for the partitions named in {@code request}.
     *
     * @param request partitions to look up; a missing body is reported via
     *                {@link Errors#partitionNotFoundException()}
     * @throws Exception whatever the consumer manager or the impersonation layer throws
     */
    @GET
    @Path("/{group}/instances/{instance}/offsets")
    @PerformanceMetric("consumer.committed-offsets+v2")
    public ConsumerCommittedResponse committedOffsets(@Context HttpServletRequest httpRequest, final @PathParam("group") String group, final @PathParam("instance") String instance, final @Valid ConsumerCommittedRequest request) throws Exception {
        if (request == null) {
            throw Errors.partitionNotFoundException();
        }
        return runProxyQuery(new PrivilegedExceptionAction<ConsumerCommittedResponse>() {
            @Override
            public ConsumerCommittedResponse run() throws Exception {
                return ctx.getKafkaConsumerManager().committed(group, instance, request);
            }
        }, httpRequest.getRemoteUser());
    }

    /**
     * Seeks the given consumer instance to the beginning of the partitions in
     * {@code seekToRequest}.
     *
     * <p>An {@link IllegalStateException} raised by the consumer manager is translated through
     * {@link Errors#illegalStateException}.
     *
     * @throws Exception whatever the consumer manager or the impersonation layer throws
     */
    @POST
    @Path("/{group}/instances/{instance}/positions/beginning")
    @PerformanceMetric("consumer.seek-to-beginning+v2")
    public void seekToBeginning(@Context HttpServletRequest httpRequest, @Context UriInfo uriInfo, final @PathParam("group") String group, final @PathParam("instance") String instance, final @Valid @NotNull ConsumerSeekToRequest seekToRequest) throws Exception {
        runProxyQuery(new PrivilegedExceptionAction<Void>() {
            @Override
            public Void run() throws Exception {
                try {
                    ctx.getKafkaConsumerManager().seekToBeginning(group, instance, seekToRequest);
                } catch (IllegalStateException e) {
                    throw Errors.illegalStateException(e);
                }
                return null;
            }
        }, httpRequest.getRemoteUser());
    }

    /**
     * Seeks the given consumer instance to the end of the partitions in {@code seekToRequest}.
     *
     * <p>An {@link IllegalStateException} raised by the consumer manager is translated through
     * {@link Errors#illegalStateException}.
     *
     * @throws Exception whatever the consumer manager or the impersonation layer throws
     */
    @POST
    @Path("/{group}/instances/{instance}/positions/end")
    @PerformanceMetric("consumer.seek-to-end+v2")
    public void seekToEnd(@Context HttpServletRequest httpRequest, @Context UriInfo uriInfo, final @PathParam("group") String group, final @PathParam("instance") String instance, final @Valid @NotNull ConsumerSeekToRequest seekToRequest) throws Exception {
        runProxyQuery(new PrivilegedExceptionAction<Void>() {
            @Override
            public Void run() throws Exception {
                try {
                    ctx.getKafkaConsumerManager().seekToEnd(group, instance, seekToRequest);
                } catch (IllegalStateException e) {
                    throw Errors.illegalStateException(e);
                }
                return null;
            }
        }, httpRequest.getRemoteUser());
    }

    /**
     * Seeks the given consumer instance to the offsets in {@code seekToOffsetRequest}.
     *
     * <p>An {@link IllegalStateException} raised by the consumer manager is translated through
     * {@link Errors#illegalStateException}.
     *
     * @throws Exception whatever the consumer manager or the impersonation layer throws
     */
    @POST
    @Path("/{group}/instances/{instance}/positions")
    @PerformanceMetric("consumer.seek-to-offset+v2")
    public void seekToOffset(@Context HttpServletRequest httpRequest, @Context UriInfo uriInfo, final @PathParam("group") String group, final @PathParam("instance") String instance, final @Valid @NotNull ConsumerSeekToOffsetRequest seekToOffsetRequest) throws Exception {
        runProxyQuery(new PrivilegedExceptionAction<Void>() {
            @Override
            public Void run() throws Exception {
                try {
                    ctx.getKafkaConsumerManager().seekToOffset(group, instance, seekToOffsetRequest);
                } catch (IllegalStateException e) {
                    throw Errors.illegalStateException(e);
                }
                return null;
            }
        }, httpRequest.getRemoteUser());
    }

    /**
     * Assigns the partitions in {@code assignmentRequest} to the given consumer instance.
     *
     * <p>An {@link IllegalStateException} raised by the consumer manager is translated through
     * {@link Errors#illegalStateException}.
     *
     * @throws Exception whatever the consumer manager or the impersonation layer throws
     */
    @POST
    @Path("/{group}/instances/{instance}/assignments")
    @PerformanceMetric("consumer.assign+v2")
    public void assign(@Context HttpServletRequest httpRequest, @Context UriInfo uriInfo, final @PathParam("group") String group, final @PathParam("instance") String instance, final @Valid @NotNull ConsumerAssignmentRequest assignmentRequest) throws Exception {
        runProxyQuery(new PrivilegedExceptionAction<Void>() {
            @Override
            public Void run() throws Exception {
                try {
                    ctx.getKafkaConsumerManager().assign(group, instance, assignmentRequest);
                } catch (IllegalStateException e) {
                    throw Errors.illegalStateException(e);
                }
                return null;
            }
        }, httpRequest.getRemoteUser());
    }

    /**
     * Returns the current assignment of the given consumer instance as reported by the consumer
     * manager.
     *
     * <p>An {@link IllegalStateException} raised by the consumer manager is translated through
     * {@link Errors#illegalStateException}.
     *
     * @throws Exception whatever the consumer manager or the impersonation layer throws
     */
    @GET
    @Path("/{group}/instances/{instance}/assignments")
    @PerformanceMetric("consumer.assignment+v2")
    public ConsumerAssignmentResponse assignment(@Context HttpServletRequest httpRequest, @Context UriInfo uriInfo, final @PathParam("group") String group, final @PathParam("instance") String instance) throws Exception {
        return runProxyQuery(new PrivilegedExceptionAction<ConsumerAssignmentResponse>() {
            @Override
            public ConsumerAssignmentResponse run() throws Exception {
                try {
                    return ctx.getKafkaConsumerManager().assignment(group, instance);
                } catch (IllegalStateException e) {
                    throw Errors.illegalStateException(e);
                }
            }
        }, httpRequest.getRemoteUser());
    }

    /**
     * Shared implementation of the three {@code /records} endpoints.
     *
     * <p>Asks the consumer manager to read records for the given instance using
     * {@code consumerStateType} and resumes {@code asyncResponse} from the read callback: with
     * the exception when one is reported, otherwise with the records.
     *
     * @param maxBytes byte limit requested by the client; any value {@code <= 0} is replaced by
     *                 {@link Long#MAX_VALUE}
     */
    private <KafkaKeyT, KafkaValueT, ClientKeyT, ClientValueT> void readRecords(final AsyncResponse asyncResponse, String group, String instance, long timeout, long maxBytes, Class<? extends KafkaConsumerState<KafkaKeyT, KafkaValueT, ClientKeyT, ClientValueT>> consumerStateType) {
        long effectiveMaxBytes = maxBytes <= 0L ? Long.MAX_VALUE : maxBytes;
        this.ctx.getKafkaConsumerManager().readRecords(group, instance, consumerStateType, timeout, effectiveMaxBytes, new KafkaConsumerManager.ReadCallback<ClientKeyT, ClientValueT>() {
            @Override
            public void onCompletion(List<? extends ConsumerRecord<ClientKeyT, ClientValueT>> records, Exception e) {
                if (e != null) {
                    asyncResponse.resume(e);
                } else {
                    asyncResponse.resume(records);
                }
            }
        });
    }

    /**
     * Runs {@code action}, impersonating {@code remoteUser} when the context has impersonation
     * enabled.
     *
     * <p>With impersonation disabled the action is invoked directly. With impersonation enabled
     * it is executed through a Hadoop proxy user created for {@code remoteUser} on top of the
     * current user.
     *
     * <p>Hadoop's {@code doAs} wraps checked exceptions other than {@code IOException} and
     * {@code InterruptedException} in an {@link UndeclaredThrowableException}. That wrapper is
     * removed here so callers observe the same exception type on both paths.
     *
     * @param action     work to execute
     * @param remoteUser name of the user to impersonate; only used when impersonation is enabled
     *                   (NOTE(review): a {@code null} user is passed to Hadoop unchecked —
     *                   confirm unauthenticated requests cannot reach this path)
     * @param <T>        result type of the action
     * @return whatever {@code action} returns
     * @throws Exception the exception thrown by {@code action} or by the impersonation layer
     */
    public <T> T runProxyQuery(PrivilegedExceptionAction<T> action, String remoteUser) throws Exception {
        if (!this.ctx.isImpersonationEnabled()) {
            return action.run();
        }
        UserGroupInformation proxyUser = UserGroupInformation.createProxyUser(remoteUser, UserGroupInformation.getCurrentUser());
        try {
            return proxyUser.doAs(action);
        } catch (UndeclaredThrowableException wrapped) {
            Throwable cause = wrapped.getUndeclaredThrowable();
            if (cause instanceof Exception) {
                throw (Exception) cause;
            }
            // No usable cause: keep the wrapper rather than lose the failure.
            throw wrapped;
        }
    }
}

