/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hive.druid.io.druid.server.coordination;

import com.google.inject.Inject;
import java.io.IOException;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.curator.utils.ZKPaths;
import org.apache.hive.druid.com.fasterxml.jackson.core.type.TypeReference;
import org.apache.hive.druid.com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.hive.druid.com.google.common.base.Function;
import org.apache.hive.druid.com.google.common.base.Throwables;
import org.apache.hive.druid.com.google.common.collect.Iterables;
import org.apache.hive.druid.com.google.common.collect.Maps;
import org.apache.hive.druid.com.google.common.collect.Sets;
import org.apache.hive.druid.com.metamx.common.ISE;
import org.apache.hive.druid.com.metamx.common.logger.Logger;
import org.apache.hive.druid.io.druid.common.utils.UUIDUtils;
import org.apache.hive.druid.io.druid.curator.announcement.Announcer;
import org.apache.hive.druid.io.druid.server.coordination.AbstractDataSegmentAnnouncer;
import org.apache.hive.druid.io.druid.server.coordination.DruidServerMetadata;
import org.apache.hive.druid.io.druid.server.initialization.BatchDataSegmentAnnouncerConfig;
import org.apache.hive.druid.io.druid.server.initialization.ZkPathsConfig;
import org.apache.hive.druid.io.druid.timeline.DataSegment;
import org.joda.time.DateTime;

public class BatchDataSegmentAnnouncer
extends AbstractDataSegmentAnnouncer {
    private static final Logger log = new Logger(BatchDataSegmentAnnouncer.class);
    private final BatchDataSegmentAnnouncerConfig config;
    private final Announcer announcer;
    private final ObjectMapper jsonMapper;
    private final String liveSegmentLocation;
    private final DruidServerMetadata server;
    private final Object lock = new Object();
    private final AtomicLong counter = new AtomicLong(0L);
    private final Set<SegmentZNode> availableZNodes = new ConcurrentSkipListSet<SegmentZNode>();
    private final Map<DataSegment, SegmentZNode> segmentLookup = Maps.newConcurrentMap();
    private final Function<DataSegment, DataSegment> segmentTransformer;

    @Inject
    public BatchDataSegmentAnnouncer(DruidServerMetadata server, final BatchDataSegmentAnnouncerConfig config, ZkPathsConfig zkPaths, Announcer announcer, ObjectMapper jsonMapper) {
        super(server, zkPaths, announcer, jsonMapper);
        this.config = config;
        this.announcer = announcer;
        this.jsonMapper = jsonMapper;
        this.server = server;
        this.liveSegmentLocation = ZKPaths.makePath((String)zkPaths.getLiveSegmentsPath(), (String)server.getName());
        this.segmentTransformer = new Function<DataSegment, DataSegment>(){

            @Override
            public DataSegment apply(DataSegment input) {
                DataSegment rv = input;
                if (config.isSkipDimensionsAndMetrics()) {
                    rv = rv.withDimensions(null).withMetrics(null);
                }
                if (config.isSkipLoadSpec()) {
                    rv = rv.withLoadSpec(null);
                }
                return rv;
            }
        };
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void announceSegment(DataSegment segment) throws IOException {
        DataSegment toAnnounce = this.segmentTransformer.apply(segment);
        int newBytesLen = this.jsonMapper.writeValueAsBytes(toAnnounce).length;
        if ((long)newBytesLen > this.config.getMaxBytesPerNode()) {
            throw new ISE("byte size %,d exceeds %,d", newBytesLen, this.config.getMaxBytesPerNode());
        }
        Object object = this.lock;
        synchronized (object) {
            boolean done = false;
            if (!this.availableZNodes.isEmpty()) {
                Iterator<SegmentZNode> iter = this.availableZNodes.iterator();
                while (iter.hasNext() && !done) {
                    SegmentZNode availableZNode = iter.next();
                    if ((long)(availableZNode.getBytes().length + newBytesLen) < this.config.getMaxBytesPerNode()) {
                        availableZNode.addSegment(toAnnounce);
                        log.info("Announcing segment[%s] at existing path[%s]", toAnnounce.getIdentifier(), availableZNode.getPath());
                        this.announcer.update(availableZNode.getPath(), availableZNode.getBytes());
                        this.segmentLookup.put(toAnnounce, availableZNode);
                        if (availableZNode.getCount() >= this.config.getSegmentsPerNode()) {
                            this.availableZNodes.remove(availableZNode);
                        }
                        done = true;
                        continue;
                    }
                    this.availableZNodes.remove(availableZNode);
                }
            }
            if (!done) {
                assert (this.availableZNodes.isEmpty());
                SegmentZNode availableZNode = new SegmentZNode(this.makeServedSegmentPath());
                availableZNode.addSegment(toAnnounce);
                log.info("Announcing segment[%s] at new path[%s]", toAnnounce.getIdentifier(), availableZNode.getPath());
                this.announcer.announce(availableZNode.getPath(), availableZNode.getBytes());
                this.segmentLookup.put(toAnnounce, availableZNode);
                this.availableZNodes.add(availableZNode);
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void unannounceSegment(DataSegment segment) throws IOException {
        SegmentZNode segmentZNode = this.segmentLookup.remove(segment);
        if (segmentZNode == null) {
            log.warn("No path to unannounce segment[%s]", segment.getIdentifier());
            return;
        }
        Object object = this.lock;
        synchronized (object) {
            segmentZNode.removeSegment(segment);
            log.info("Unannouncing segment[%s] at path[%s]", segment.getIdentifier(), segmentZNode.getPath());
            if (segmentZNode.getCount() == 0) {
                this.availableZNodes.remove(segmentZNode);
                this.announcer.unannounce(segmentZNode.getPath());
            } else {
                this.announcer.update(segmentZNode.getPath(), segmentZNode.getBytes());
                this.availableZNodes.add(segmentZNode);
            }
        }
    }

    @Override
    public void announceSegments(Iterable<DataSegment> segments) throws IOException {
        Iterable<DataSegment> toAnnounce = Iterables.transform(segments, this.segmentTransformer);
        SegmentZNode segmentZNode = new SegmentZNode(this.makeServedSegmentPath());
        HashSet<DataSegment> batch = Sets.newHashSet();
        int byteSize = 0;
        int count = 0;
        for (DataSegment segment : toAnnounce) {
            int newBytesLen = this.jsonMapper.writeValueAsBytes(segment).length;
            if ((long)newBytesLen > this.config.getMaxBytesPerNode()) {
                throw new ISE("byte size %,d exceeds %,d", newBytesLen, this.config.getMaxBytesPerNode());
            }
            if (count >= this.config.getSegmentsPerNode() || (long)(byteSize + newBytesLen) > this.config.getMaxBytesPerNode()) {
                segmentZNode.addSegments(batch);
                this.announcer.announce(segmentZNode.getPath(), segmentZNode.getBytes());
                segmentZNode = new SegmentZNode(this.makeServedSegmentPath());
                batch = Sets.newHashSet();
                count = 0;
                byteSize = 0;
            }
            log.info("Announcing segment[%s] at path[%s]", segment.getIdentifier(), segmentZNode.getPath());
            this.segmentLookup.put(segment, segmentZNode);
            batch.add(segment);
            ++count;
            byteSize += newBytesLen;
        }
        segmentZNode.addSegments(batch);
        this.announcer.announce(segmentZNode.getPath(), segmentZNode.getBytes());
    }

    @Override
    public void unannounceSegments(Iterable<DataSegment> segments) throws IOException {
        for (DataSegment segment : segments) {
            this.unannounceSegment(segment);
        }
    }

    @Override
    public boolean isAnnounced(DataSegment segment) {
        return this.segmentLookup.containsKey(segment);
    }

    private String makeServedSegmentPath() {
        return this.makeServedSegmentPath(UUIDUtils.generateUuid(this.server.getHost(), this.server.getType(), this.server.getTier(), new DateTime().toString()));
    }

    private String makeServedSegmentPath(String zNode) {
        return ZKPaths.makePath((String)this.liveSegmentLocation, (String)String.format("%s%s", zNode, this.counter.getAndIncrement()));
    }

    private class SegmentZNode
    implements Comparable<SegmentZNode> {
        private final String path;
        private byte[] bytes = new byte[0];
        private int count = 0;

        public SegmentZNode(String path) {
            this.path = path;
        }

        public String getPath() {
            return this.path;
        }

        public int getCount() {
            return this.count;
        }

        public byte[] getBytes() {
            return this.bytes;
        }

        public Set<DataSegment> getSegments() {
            if (this.bytes.length == 0) {
                return Sets.newHashSet();
            }
            try {
                return BatchDataSegmentAnnouncer.this.jsonMapper.readValue(this.bytes, new TypeReference<Set<DataSegment>>(){});
            }
            catch (Exception e) {
                throw Throwables.propagate(e);
            }
        }

        public void addSegment(DataSegment segment) {
            Set<DataSegment> zkSegments = this.getSegments();
            zkSegments.add(segment);
            try {
                this.bytes = BatchDataSegmentAnnouncer.this.jsonMapper.writeValueAsBytes(zkSegments);
            }
            catch (Exception e) {
                zkSegments.remove(segment);
                throw Throwables.propagate(e);
            }
            ++this.count;
        }

        public void addSegments(Set<DataSegment> segments) {
            Set<DataSegment> zkSegments = this.getSegments();
            zkSegments.addAll(segments);
            try {
                this.bytes = BatchDataSegmentAnnouncer.this.jsonMapper.writeValueAsBytes(zkSegments);
            }
            catch (Exception e) {
                zkSegments.removeAll(segments);
                throw Throwables.propagate(e);
            }
            this.count += segments.size();
        }

        public void removeSegment(DataSegment segment) {
            Set<DataSegment> zkSegments = this.getSegments();
            zkSegments.remove(segment);
            try {
                this.bytes = BatchDataSegmentAnnouncer.this.jsonMapper.writeValueAsBytes(zkSegments);
            }
            catch (Exception e) {
                zkSegments.add(segment);
                throw Throwables.propagate(e);
            }
            --this.count;
        }

        public boolean equals(Object o) {
            if (this == o) {
                return true;
            }
            if (o == null || this.getClass() != o.getClass()) {
                return false;
            }
            SegmentZNode that = (SegmentZNode)o;
            return this.path.equals(that.path);
        }

        public int hashCode() {
            return this.path.hashCode();
        }

        @Override
        public int compareTo(SegmentZNode segmentZNode) {
            return this.path.compareTo(segmentZNode.getPath());
        }
    }
}

