/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hive.druid.io.druid.server.coordination;

import com.google.inject.Inject;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.atomic.AtomicLong;
import javax.annotation.Nullable;
import org.apache.curator.utils.ZKPaths;
import org.apache.hive.druid.com.fasterxml.jackson.core.type.TypeReference;
import org.apache.hive.druid.com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.hive.druid.com.google.common.base.Function;
import org.apache.hive.druid.com.google.common.base.Throwables;
import org.apache.hive.druid.com.google.common.collect.Iterables;
import org.apache.hive.druid.com.google.common.collect.Lists;
import org.apache.hive.druid.com.google.common.collect.Sets;
import org.apache.hive.druid.com.google.common.util.concurrent.ListenableFuture;
import org.apache.hive.druid.com.google.common.util.concurrent.SettableFuture;
import org.apache.hive.druid.io.druid.common.utils.UUIDUtils;
import org.apache.hive.druid.io.druid.curator.announcement.Announcer;
import org.apache.hive.druid.io.druid.java.util.common.DateTimes;
import org.apache.hive.druid.io.druid.java.util.common.ISE;
import org.apache.hive.druid.io.druid.java.util.common.StringUtils;
import org.apache.hive.druid.io.druid.java.util.common.logger.Logger;
import org.apache.hive.druid.io.druid.server.coordination.DataSegmentAnnouncer;
import org.apache.hive.druid.io.druid.server.coordination.DataSegmentChangeRequest;
import org.apache.hive.druid.io.druid.server.coordination.DruidServerMetadata;
import org.apache.hive.druid.io.druid.server.coordination.SegmentChangeRequestDrop;
import org.apache.hive.druid.io.druid.server.coordination.SegmentChangeRequestHistory;
import org.apache.hive.druid.io.druid.server.coordination.SegmentChangeRequestLoad;
import org.apache.hive.druid.io.druid.server.coordination.SegmentChangeRequestsSnapshot;
import org.apache.hive.druid.io.druid.server.initialization.BatchDataSegmentAnnouncerConfig;
import org.apache.hive.druid.io.druid.server.initialization.ZkPathsConfig;
import org.apache.hive.druid.io.druid.timeline.DataSegment;

/**
 * {@link DataSegmentAnnouncer} that publishes the segments of one server through an
 * {@link Announcer}, packing the JSON of several segments into each announced node, and that
 * records every load/drop in a {@link SegmentChangeRequestHistory} so that callers can ask for
 * the changes since a given counter.
 *
 * <p>When {@code config.isSkipSegmentAnnouncementOnZk()} is true nothing is sent to the
 * {@link Announcer}; segments are only tracked in memory (mapped to a placeholder node) and in
 * the change history.
 *
 * <p>All mutations of the bookkeeping state ({@code segmentLookup}, {@code availableZNodes},
 * {@code changes}) are performed while holding {@code lock}.
 */
public class BatchDataSegmentAnnouncer
implements DataSegmentAnnouncer {
    private static final Logger log = new Logger(BatchDataSegmentAnnouncer.class);
    private final BatchDataSegmentAnnouncerConfig config;
    private final Announcer announcer;
    private final ObjectMapper jsonMapper;
    /** Parent path under which this server's segment nodes are created. */
    private final String liveSegmentLocation;
    private final DruidServerMetadata server;
    /** Guards segmentLookup, availableZNodes and the order of entries added to changes. */
    private final Object lock = new Object();
    /** Suffix appended to generated node names; incremented for every generated path. */
    private final AtomicLong counter = new AtomicLong(0L);
    /** Nodes that {@link #announceSegment} may still try to append a segment to. */
    private final Set<SegmentZNode> availableZNodes = new ConcurrentSkipListSet<SegmentZNode>();
    /** Every currently announced segment, mapped to the node that carries it. */
    private final Map<DataSegment, SegmentZNode> segmentLookup = new ConcurrentHashMap<DataSegment, SegmentZNode>();
    /** Strips dimensions/metrics and/or the load spec from a segment, as configured. */
    private final Function<DataSegment, DataSegment> segmentTransformer;
    private final SegmentChangeRequestHistory changes = new SegmentChangeRequestHistory();
    /** Placeholder value for segmentLookup when announcements are skipped; null otherwise. */
    private final SegmentZNode dummyZnode;

    /**
     * @param server     metadata of this server; its name, host, type and tier are used to build node paths
     * @param config     batching limits and the flags controlling what is stripped/skipped
     * @param zkPaths    supplies the base path for live segments
     * @param announcer  sink for announce/update/unannounce calls
     * @param jsonMapper used to serialize segments into node payloads
     */
    @Inject
    public BatchDataSegmentAnnouncer(DruidServerMetadata server, final BatchDataSegmentAnnouncerConfig config, ZkPathsConfig zkPaths, Announcer announcer, ObjectMapper jsonMapper) {
        this.config = config;
        this.announcer = announcer;
        this.jsonMapper = jsonMapper;
        this.server = server;
        this.liveSegmentLocation = ZKPaths.makePath((String)zkPaths.getLiveSegmentsPath(), (String)server.getName());
        this.segmentTransformer = new Function<DataSegment, DataSegment>(){

            @Override
            public DataSegment apply(DataSegment input) {
                DataSegment rv = input;
                if (config.isSkipDimensionsAndMetrics()) {
                    rv = rv.withDimensions(null).withMetrics(null);
                }
                if (config.isSkipLoadSpec()) {
                    rv = rv.withLoadSpec(null);
                }
                return rv;
            }
        };
        this.dummyZnode = this.config.isSkipSegmentAnnouncementOnZk() ? new SegmentZNode("PLACE_HOLDER_ONLY") : null;
    }

    /**
     * Announces a single segment, appending it to an existing node that still has room or
     * creating a new node otherwise. A segment that is already announced is skipped.
     *
     * @param segment the segment to announce
     * @throws IOException if the segment cannot be serialized
     * @throws ISE if the serialized segment alone exceeds {@code config.getMaxBytesPerNode()}
     */
    @Override
    public void announceSegment(DataSegment segment) throws IOException {
        // Cheap unlocked pre-check; it is repeated under the lock because another thread may
        // announce the same segment between this check and lock acquisition.
        if (this.segmentLookup.containsKey(segment)) {
            log.info("Skipping announcement of segment [%s]. Announcement exists already.", segment.getIdentifier());
            return;
        }
        DataSegment toAnnounce = this.segmentTransformer.apply(segment);
        synchronized (this.lock) {
            if (this.segmentLookup.containsKey(segment)) {
                log.info("Skipping announcement of segment [%s]. Announcement exists already.", segment.getIdentifier());
                return;
            }
            if (this.config.isSkipSegmentAnnouncementOnZk()) {
                this.changes.addSegmentChangeRequest(new SegmentChangeRequestLoad(toAnnounce));
                // Key by the transformed segment, like every other path, so snapshots built from
                // segmentLookup match what was recorded in the change history.
                this.segmentLookup.put(toAnnounce, this.dummyZnode);
                return;
            }
            int newBytesLen = this.jsonMapper.writeValueAsBytes(toAnnounce).length;
            if ((long)newBytesLen > this.config.getMaxBytesPerNode()) {
                throw new ISE("byte size %,d exceeds %,d", newBytesLen, this.config.getMaxBytesPerNode());
            }
            // Record the load only once the segment has passed validation, so a rejected segment
            // does not leave a load request in the history.
            this.changes.addSegmentChangeRequest(new SegmentChangeRequestLoad(toAnnounce));
            boolean done = false;
            Iterator<SegmentZNode> iter = this.availableZNodes.iterator();
            while (iter.hasNext() && !done) {
                SegmentZNode availableZNode = iter.next();
                if ((long)(availableZNode.getBytes().length + newBytesLen) < this.config.getMaxBytesPerNode()) {
                    availableZNode.addSegment(toAnnounce);
                    log.info("Announcing segment[%s] at existing path[%s]", toAnnounce.getIdentifier(), availableZNode.getPath());
                    this.announcer.update(availableZNode.getPath(), availableZNode.getBytes());
                    this.segmentLookup.put(toAnnounce, availableZNode);
                    if (availableZNode.getCount() >= this.config.getSegmentsPerNode()) {
                        this.availableZNodes.remove(availableZNode);
                    }
                    done = true;
                } else {
                    // The node cannot take this segment; stop offering it for further appends.
                    this.availableZNodes.remove(availableZNode);
                }
            }
            if (!done) {
                // Every node visited above was removed, so none is left to append to.
                assert (this.availableZNodes.isEmpty());
                SegmentZNode availableZNode = new SegmentZNode(this.makeServedSegmentPath());
                availableZNode.addSegment(toAnnounce);
                log.info("Announcing segment[%s] at new path[%s]", toAnnounce.getIdentifier(), availableZNode.getPath());
                this.announcer.announce(availableZNode.getPath(), availableZNode.getBytes());
                this.segmentLookup.put(toAnnounce, availableZNode);
                this.availableZNodes.add(availableZNode);
            }
        }
    }

    /**
     * Withdraws a single segment. The node carrying it is rewritten without the segment, or
     * unannounced entirely when it becomes empty. An unknown segment is logged and ignored.
     *
     * @param segment the segment to withdraw
     * @throws IOException declared by the interface
     */
    @Override
    public void unannounceSegment(DataSegment segment) throws IOException {
        synchronized (this.lock) {
            SegmentZNode segmentZNode = this.segmentLookup.remove(segment);
            if (segmentZNode == null) {
                log.warn("No path to unannounce segment[%s]", segment.getIdentifier());
                return;
            }
            this.changes.addSegmentChangeRequest(new SegmentChangeRequestDrop(segment));
            if (this.config.isSkipSegmentAnnouncementOnZk()) {
                return;
            }
            segmentZNode.removeSegment(segment);
            log.info("Unannouncing segment[%s] at path[%s]", segment.getIdentifier(), segmentZNode.getPath());
            if (segmentZNode.getCount() == 0) {
                this.availableZNodes.remove(segmentZNode);
                this.announcer.unannounce(segmentZNode.getPath());
            } else {
                this.announcer.update(segmentZNode.getPath(), segmentZNode.getBytes());
                // The node has room again, so offer it for future appends.
                this.availableZNodes.add(segmentZNode);
            }
        }
    }

    /**
     * Announces many segments, filling fresh nodes up to {@code config.getSegmentsPerNode()}
     * segments / {@code config.getMaxBytesPerNode()} bytes each. Segments that are already
     * announced are skipped individually; the remaining segments are still announced.
     *
     * @param segments the segments to announce
     * @throws IOException if a segment cannot be serialized
     * @throws ISE if a single serialized segment exceeds {@code config.getMaxBytesPerNode()}
     */
    @Override
    public void announceSegments(Iterable<DataSegment> segments) throws IOException {
        SegmentZNode segmentZNode = new SegmentZNode(this.makeServedSegmentPath());
        Set<DataSegment> batch = Sets.newHashSet();
        ArrayList<DataSegmentChangeRequest> changesBatch = new ArrayList<DataSegmentChangeRequest>();
        int byteSize = 0;
        int count = 0;
        synchronized (this.lock) {
            for (DataSegment ds : segments) {
                if (this.segmentLookup.containsKey(ds)) {
                    log.info("Skipping announcement of segment [%s]. Announcement exists already.", ds.getIdentifier());
                    // Skip only this segment; returning here would abandon the segments already
                    // placed in the current batch without announcing or recording them.
                    continue;
                }
                DataSegment segment = this.segmentTransformer.apply(ds);
                changesBatch.add(new SegmentChangeRequestLoad(segment));
                if (this.config.isSkipSegmentAnnouncementOnZk()) {
                    this.segmentLookup.put(segment, this.dummyZnode);
                    continue;
                }
                int newBytesLen = this.jsonMapper.writeValueAsBytes(segment).length;
                if ((long)newBytesLen > this.config.getMaxBytesPerNode()) {
                    throw new ISE("byte size %,d exceeds %,d", newBytesLen, this.config.getMaxBytesPerNode());
                }
                if (count >= this.config.getSegmentsPerNode() || (long)(byteSize + newBytesLen) > this.config.getMaxBytesPerNode()) {
                    // Current node is full: publish it and start a new one.
                    segmentZNode.addSegments(batch);
                    this.announcer.announce(segmentZNode.getPath(), segmentZNode.getBytes());
                    segmentZNode = new SegmentZNode(this.makeServedSegmentPath());
                    batch = Sets.newHashSet();
                    count = 0;
                    byteSize = 0;
                }
                log.info("Announcing segment[%s] at path[%s]", segment.getIdentifier(), segmentZNode.getPath());
                this.segmentLookup.put(segment, segmentZNode);
                batch.add(segment);
                ++count;
                byteSize += newBytesLen;
            }
            // Done while still holding the lock so that no other thread can observe
            // segmentLookup entries whose node has not been populated/announced yet.
            this.changes.addSegmentChangeRequests(changesBatch);
            if (!this.config.isSkipSegmentAnnouncementOnZk() && !batch.isEmpty()) {
                segmentZNode.addSegments(batch);
                this.announcer.announce(segmentZNode.getPath(), segmentZNode.getBytes());
            }
        }
    }

    /**
     * Withdraws each of the given segments via {@link #unannounceSegment(DataSegment)}.
     *
     * @param segments the segments to withdraw
     * @throws IOException declared by the interface
     */
    @Override
    public void unannounceSegments(Iterable<DataSegment> segments) throws IOException {
        for (DataSegment segment : segments) {
            this.unannounceSegment(segment);
        }
    }

    /**
     * Returns the segment changes after the given counter.
     *
     * @param counter position the caller has already seen; a negative counter value requests a
     *                full snapshot
     * @return for a negative counter, an already-completed future holding one load request per
     *         currently announced segment together with the latest history counter; otherwise
     *         whatever {@code changes.getRequestsSince(counter)} returns
     */
    public ListenableFuture<SegmentChangeRequestsSnapshot> getSegmentChangesSince(SegmentChangeRequestHistory.Counter counter) {
        if (counter.getCounter() < 0L) {
            // Hold the lock so the key set and the last counter are read as one consistent pair.
            synchronized (this.lock) {
                Iterable<DataSegmentChangeRequest> segments = Iterables.transform(this.segmentLookup.keySet(), new Function<DataSegment, DataSegmentChangeRequest>(){

                    @Override
                    @Nullable
                    public SegmentChangeRequestLoad apply(DataSegment input) {
                        return new SegmentChangeRequestLoad(input);
                    }
                });
                SettableFuture<SegmentChangeRequestsSnapshot> future = SettableFuture.create();
                // Lists.newArrayList materializes the lazy view while the lock is still held.
                future.set(SegmentChangeRequestsSnapshot.success(this.changes.getLastCounter(), Lists.newArrayList(segments)));
                return future;
            }
        }
        return this.changes.getRequestsSince(counter);
    }

    /** Builds a fresh node path from a UUID derived from the server's host, type, tier and the current UTC time. */
    private String makeServedSegmentPath() {
        return this.makeServedSegmentPath(UUIDUtils.generateUuid(this.server.getHost(), this.server.getType().toString(), this.server.getTier(), DateTimes.nowUtc().toString()));
    }

    /** Appends the next counter value to {@code zNode} and places the result under liveSegmentLocation. */
    private String makeServedSegmentPath(String zNode) {
        return ZKPaths.makePath((String)this.liveSegmentLocation, (String)StringUtils.format("%s%s", zNode, this.counter.getAndIncrement()));
    }

    /**
     * In-memory image of one announced node: its path, its JSON payload (a serialized set of
     * segments) and the number of segments in that payload. Identity and ordering are defined
     * by the path alone.
     */
    private class SegmentZNode
    implements Comparable<SegmentZNode> {
        private final String path;
        private byte[] bytes = new byte[0];
        private int count = 0;

        public SegmentZNode(String path) {
            this.path = path;
        }

        public String getPath() {
            return this.path;
        }

        /** @return number of segments in the current payload */
        public int getCount() {
            return this.count;
        }

        /** @return the current serialized payload (empty array if nothing was ever stored) */
        public byte[] getBytes() {
            return this.bytes;
        }

        /**
         * Deserializes the payload into a new set; mutating the returned set does not affect
         * this node.
         */
        public Set<DataSegment> getSegments() {
            if (this.bytes.length == 0) {
                return Sets.newHashSet();
            }
            try {
                return BatchDataSegmentAnnouncer.this.jsonMapper.readValue(this.bytes, new TypeReference<Set<DataSegment>>(){});
            }
            catch (Exception e) {
                throw Throwables.propagate(e);
            }
        }

        /** Adds one segment to the payload. */
        public void addSegment(DataSegment segment) {
            Set<DataSegment> zkSegments = this.getSegments();
            zkSegments.add(segment);
            this.store(zkSegments);
        }

        /** Adds all given segments to the payload. */
        public void addSegments(Set<DataSegment> segments) {
            Set<DataSegment> zkSegments = this.getSegments();
            zkSegments.addAll(segments);
            this.store(zkSegments);
        }

        /** Removes one segment from the payload. */
        public void removeSegment(DataSegment segment) {
            Set<DataSegment> zkSegments = this.getSegments();
            zkSegments.remove(segment);
            this.store(zkSegments);
        }

        /**
         * Serializes {@code zkSegments} as the new payload. The count is taken from the set's
         * size so it cannot drift from the payload (e.g. on a duplicate add or an absent
         * remove). If serialization fails, neither bytes nor count is modified.
         */
        private void store(Set<DataSegment> zkSegments) {
            try {
                this.bytes = BatchDataSegmentAnnouncer.this.jsonMapper.writeValueAsBytes(zkSegments);
            }
            catch (Exception e) {
                throw Throwables.propagate(e);
            }
            this.count = zkSegments.size();
        }

        @Override
        public boolean equals(Object o) {
            if (this == o) {
                return true;
            }
            if (o == null || this.getClass() != o.getClass()) {
                return false;
            }
            SegmentZNode that = (SegmentZNode)o;
            return this.path.equals(that.path);
        }

        @Override
        public int hashCode() {
            return this.path.hashCode();
        }

        @Override
        public int compareTo(SegmentZNode segmentZNode) {
            return this.path.compareTo(segmentZNode.getPath());
        }
    }
}

