/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hive.druid.io.druid.segment.realtime.firehose;

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import javax.annotation.Nullable;
import org.apache.hive.druid.com.google.common.base.Function;
import org.apache.hive.druid.com.google.common.collect.Iterables;
import org.apache.hive.druid.com.google.common.collect.Lists;
import org.apache.hive.druid.com.google.common.collect.Maps;
import org.apache.hive.druid.io.druid.data.input.Firehose;
import org.apache.hive.druid.io.druid.data.input.InputRow;
import org.apache.hive.druid.io.druid.data.input.MapBasedInputRow;
import org.apache.hive.druid.io.druid.java.util.common.DateTimes;
import org.apache.hive.druid.io.druid.java.util.common.granularity.Granularities;
import org.apache.hive.druid.io.druid.java.util.common.guava.Sequence;
import org.apache.hive.druid.io.druid.java.util.common.guava.Sequences;
import org.apache.hive.druid.io.druid.java.util.common.guava.Yielder;
import org.apache.hive.druid.io.druid.java.util.common.guava.Yielders;
import org.apache.hive.druid.io.druid.query.dimension.DefaultDimensionSpec;
import org.apache.hive.druid.io.druid.query.filter.DimFilter;
import org.apache.hive.druid.io.druid.segment.BaseObjectColumnValueSelector;
import org.apache.hive.druid.io.druid.segment.ColumnValueSelector;
import org.apache.hive.druid.io.druid.segment.Cursor;
import org.apache.hive.druid.io.druid.segment.DimensionSelector;
import org.apache.hive.druid.io.druid.segment.VirtualColumns;
import org.apache.hive.druid.io.druid.segment.data.IndexedInts;
import org.apache.hive.druid.io.druid.segment.filter.Filters;
import org.apache.hive.druid.io.druid.segment.realtime.firehose.WindowedStorageAdapter;
import org.apache.hive.druid.io.druid.segment.transform.TransformSpec;
import org.apache.hive.druid.io.druid.segment.transform.Transformer;
import org.apache.hive.druid.io.druid.utils.Runnables;

public class IngestSegmentFirehose
implements Firehose {
    private final Transformer transformer;
    private Yielder<InputRow> rowYielder;

    public IngestSegmentFirehose(List<WindowedStorageAdapter> adapters, TransformSpec transformSpec, final List<String> dims, final List<String> metrics, final DimFilter dimFilter) {
        this.transformer = transformSpec.toTransformer();
        Sequence rows = Sequences.concat(Iterables.transform(adapters, new Function<WindowedStorageAdapter, Sequence<InputRow>>(){

            @Override
            @Nullable
            public Sequence<InputRow> apply(WindowedStorageAdapter adapter) {
                return Sequences.concat(Sequences.map(adapter.getAdapter().makeCursors(Filters.toFilter(dimFilter), adapter.getInterval(), VirtualColumns.EMPTY, Granularities.ALL, false, null), new Function<Cursor, Sequence<InputRow>>(){

                    @Override
                    @Nullable
                    public Sequence<InputRow> apply(final Cursor cursor) {
                        final ColumnValueSelector timestampColumnSelector = cursor.getColumnSelectorFactory().makeColumnValueSelector("__time");
                        final HashMap<String, DimensionSelector> dimSelectors = Maps.newHashMap();
                        for (String dim : dims) {
                            DimensionSelector dimSelector = cursor.getColumnSelectorFactory().makeDimensionSelector(new DefaultDimensionSpec(dim, dim));
                            if (dimSelector == null) continue;
                            dimSelectors.put(dim, dimSelector);
                        }
                        final HashMap<String, ColumnValueSelector> metSelectors = Maps.newHashMap();
                        for (String metric : metrics) {
                            ColumnValueSelector metricSelector = cursor.getColumnSelectorFactory().makeColumnValueSelector(metric);
                            metSelectors.put(metric, metricSelector);
                        }
                        return Sequences.simple(new Iterable<InputRow>(){

                            @Override
                            public Iterator<InputRow> iterator() {
                                return new Iterator<InputRow>(){

                                    @Override
                                    public boolean hasNext() {
                                        return !cursor.isDone();
                                    }

                                    @Override
                                    public InputRow next() {
                                        BaseObjectColumnValueSelector selector;
                                        LinkedHashMap<String, Object> theEvent = Maps.newLinkedHashMap();
                                        long timestamp = timestampColumnSelector.getLong();
                                        theEvent.put("timestamp", DateTimes.utc(timestamp));
                                        for (Map.Entry dimSelector : dimSelectors.entrySet()) {
                                            String dim = (String)dimSelector.getKey();
                                            selector = (DimensionSelector)dimSelector.getValue();
                                            IndexedInts vals = selector.getRow();
                                            if (vals.size() == 1) {
                                                String dimVal = selector.lookupName(vals.get(0));
                                                theEvent.put(dim, dimVal);
                                                continue;
                                            }
                                            ArrayList<String> dimVals = Lists.newArrayList();
                                            for (int i = 0; i < vals.size(); ++i) {
                                                dimVals.add(selector.lookupName(vals.get(i)));
                                            }
                                            theEvent.put(dim, dimVals);
                                        }
                                        for (Map.Entry metSelector : metSelectors.entrySet()) {
                                            String metric = (String)metSelector.getKey();
                                            selector = (BaseObjectColumnValueSelector)metSelector.getValue();
                                            Object value = selector.getObject();
                                            if (value == null) continue;
                                            theEvent.put(metric, value);
                                        }
                                        cursor.advance();
                                        return new MapBasedInputRow(timestamp, (List<String>)dims, theEvent);
                                    }

                                    @Override
                                    public void remove() {
                                        throw new UnsupportedOperationException("Remove Not Supported");
                                    }
                                };
                            }
                        });
                    }
                }));
            }
        }));
        this.rowYielder = Yielders.each(rows);
    }

    @Override
    public boolean hasMore() {
        return !this.rowYielder.isDone();
    }

    @Override
    @Nullable
    public InputRow nextRow() {
        InputRow inputRow = this.rowYielder.get();
        this.rowYielder = this.rowYielder.next(null);
        return this.transformer.transform(inputRow);
    }

    @Override
    public Runnable commit() {
        return Runnables.getNoopRunnable();
    }

    @Override
    public void close() throws IOException {
        this.rowYielder.close();
    }
}

