/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.hive.druid.serde;

import java.io.Closeable;
import java.io.IOException;
import java.io.InputStream;
import java.util.Iterator;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.druid.DruidStorageHandler;
import org.apache.hadoop.hive.druid.DruidStorageHandlerUtils;
import org.apache.hadoop.hive.druid.io.HiveDruidSplit;
import org.apache.hadoop.hive.druid.serde.DruidWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hive.druid.com.fasterxml.jackson.core.JsonParser;
import org.apache.hive.druid.com.fasterxml.jackson.core.JsonToken;
import org.apache.hive.druid.com.fasterxml.jackson.core.ObjectCodec;
import org.apache.hive.druid.com.fasterxml.jackson.databind.JavaType;
import org.apache.hive.druid.com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.hive.druid.com.google.common.base.Preconditions;
import org.apache.hive.druid.com.google.common.base.Throwables;
import org.apache.hive.druid.com.google.common.util.concurrent.ListenableFuture;
import org.apache.hive.druid.com.metamx.http.client.HttpClient;
import org.apache.hive.druid.com.metamx.http.client.Request;
import org.apache.hive.druid.com.metamx.http.client.response.InputStreamResponseHandler;
import org.apache.hive.druid.io.druid.java.util.common.IAE;
import org.apache.hive.druid.io.druid.java.util.common.RE;
import org.apache.hive.druid.io.druid.java.util.common.guava.CloseQuietly;
import org.apache.hive.druid.io.druid.query.BaseQuery;
import org.apache.hive.druid.io.druid.query.Query;
import org.apache.hive.druid.io.druid.query.QueryInterruptedException;
import org.apache.parquet.Strings;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public abstract class DruidQueryRecordReader<T extends BaseQuery<R>, R extends Comparable<R>>
extends RecordReader<NullWritable, DruidWritable>
implements org.apache.hadoop.mapred.RecordReader<NullWritable, DruidWritable> {
    private static final Logger LOG = LoggerFactory.getLogger(DruidQueryRecordReader.class);
    private HttpClient httpClient;
    private ObjectMapper mapper;
    private ObjectMapper smileMapper;
    protected Query query;
    protected JsonParserIterator<R> queryResultsIterator = null;
    protected JavaType resultsType = null;

    public void initialize(InputSplit split, TaskAttemptContext context) throws IOException {
        this.initialize(split, context.getConfiguration());
    }

    public void initialize(InputSplit split, Configuration conf, ObjectMapper mapper, ObjectMapper smileMapper, HttpClient httpClient) throws IOException {
        HiveDruidSplit hiveDruidSplit = (HiveDruidSplit)split;
        Preconditions.checkNotNull(hiveDruidSplit, "input split is null ???");
        this.mapper = Preconditions.checkNotNull(mapper, "object Mapper can not be null");
        this.smileMapper = Preconditions.checkNotNull(smileMapper, "Smile Mapper can not be null");
        this.query = this.mapper.readValue(Preconditions.checkNotNull(hiveDruidSplit.getDruidQuery()), Query.class);
        Preconditions.checkNotNull(this.query);
        this.resultsType = this.getResultTypeDef();
        this.httpClient = Preconditions.checkNotNull(httpClient, "need Http Client");
        LOG.debug("Retrieving data from druid using query:\n " + this.query);
        String address = hiveDruidSplit.getLocations()[0];
        if (Strings.isNullOrEmpty((String)address)) {
            throw new IOException("can not fetch results form empty or null host value");
        }
        Request request = DruidStorageHandlerUtils.createSmileRequest(address, this.query);
        ListenableFuture<InputStream> inputStreamFuture = this.httpClient.go(request, new InputStreamResponseHandler());
        this.queryResultsIterator = new JsonParserIterator(this.smileMapper, this.resultsType, inputStreamFuture, request.getUrl().toString(), this.query);
    }

    public void initialize(InputSplit split, Configuration conf) throws IOException {
        this.initialize(split, conf, DruidStorageHandlerUtils.JSON_MAPPER, DruidStorageHandlerUtils.SMILE_MAPPER, DruidStorageHandler.getHttpClient());
    }

    protected abstract JavaType getResultTypeDef();

    public NullWritable createKey() {
        return NullWritable.get();
    }

    public DruidWritable createValue() {
        return new DruidWritable();
    }

    public abstract boolean next(NullWritable var1, DruidWritable var2) throws IOException;

    public long getPos() {
        return 0L;
    }

    public abstract boolean nextKeyValue() throws IOException;

    public abstract NullWritable getCurrentKey() throws IOException, InterruptedException;

    public abstract DruidWritable getCurrentValue() throws IOException, InterruptedException;

    public abstract float getProgress() throws IOException;

    public void close() {
        CloseQuietly.close(this.queryResultsIterator);
    }

    protected class JsonParserIterator<R extends Comparable<R>>
    implements Iterator<R>,
    Closeable {
        private JsonParser jp;
        private ObjectCodec objectCodec;
        private final ObjectMapper mapper;
        private final JavaType typeRef;
        private final Future<InputStream> future;
        private final Query query;
        private final String url;

        public JsonParserIterator(ObjectMapper mapper, JavaType typeRef, Future<InputStream> future, String url, Query query) {
            this.typeRef = typeRef;
            this.future = future;
            this.url = url;
            this.query = query;
            this.mapper = mapper;
            this.jp = null;
        }

        @Override
        public boolean hasNext() {
            this.init();
            if (this.jp.isClosed()) {
                return false;
            }
            if (this.jp.getCurrentToken() == JsonToken.END_ARRAY) {
                CloseQuietly.close(this.jp);
                return false;
            }
            return true;
        }

        @Override
        public R next() {
            this.init();
            try {
                Comparable retVal = (Comparable)this.objectCodec.readValue(this.jp, this.typeRef);
                this.jp.nextToken();
                return (R)retVal;
            }
            catch (IOException e) {
                throw Throwables.propagate(e);
            }
        }

        @Override
        public void remove() {
            throw new UnsupportedOperationException();
        }

        private void init() {
            if (this.jp == null) {
                try {
                    InputStream is = this.future.get();
                    if (is == null) {
                        throw new IOException(String.format("query[%s] url[%s] timed out", this.query, this.url));
                    }
                    this.jp = this.mapper.getFactory().createParser(is).configure(JsonParser.Feature.AUTO_CLOSE_SOURCE, true);
                    JsonToken nextToken = this.jp.nextToken();
                    if (nextToken == JsonToken.START_OBJECT) {
                        QueryInterruptedException cause = this.jp.getCodec().readValue(this.jp, QueryInterruptedException.class);
                        throw new QueryInterruptedException(cause);
                    }
                    if (nextToken != JsonToken.START_ARRAY) {
                        throw new IAE("Next token wasn't a START_ARRAY, was[%s] from url [%s]", new Object[]{this.jp.getCurrentToken(), this.url});
                    }
                    this.jp.nextToken();
                    this.objectCodec = this.jp.getCodec();
                }
                catch (IOException | InterruptedException | ExecutionException e) {
                    throw new RE(e, "Failure getting results for query[%s] url[%s] because of [%s]", this.query, this.url, e.getMessage());
                }
            }
        }

        @Override
        public void close() throws IOException {
            CloseQuietly.close(this.jp);
        }
    }
}

