/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hive.druid.io.druid.metadata;

import com.google.inject.Inject;
import java.io.IOException;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.hive.druid.com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.hive.druid.com.google.common.base.Function;
import org.apache.hive.druid.com.google.common.base.Preconditions;
import org.apache.hive.druid.com.google.common.base.Throwables;
import org.apache.hive.druid.com.google.common.collect.ImmutableList;
import org.apache.hive.druid.com.google.common.collect.ImmutableSet;
import org.apache.hive.druid.com.google.common.collect.Iterables;
import org.apache.hive.druid.com.google.common.collect.Lists;
import org.apache.hive.druid.com.google.common.collect.Ordering;
import org.apache.hive.druid.com.google.common.collect.Sets;
import org.apache.hive.druid.com.google.common.hash.Hashing;
import org.apache.hive.druid.com.google.common.io.BaseEncoding;
import org.apache.hive.druid.io.druid.indexing.overlord.DataSourceMetadata;
import org.apache.hive.druid.io.druid.indexing.overlord.IndexerMetadataStorageCoordinator;
import org.apache.hive.druid.io.druid.indexing.overlord.SegmentPublishResult;
import org.apache.hive.druid.io.druid.java.util.common.DateTimes;
import org.apache.hive.druid.io.druid.java.util.common.IAE;
import org.apache.hive.druid.io.druid.java.util.common.ISE;
import org.apache.hive.druid.io.druid.java.util.common.Intervals;
import org.apache.hive.druid.io.druid.java.util.common.StringUtils;
import org.apache.hive.druid.io.druid.java.util.common.lifecycle.LifecycleStart;
import org.apache.hive.druid.io.druid.java.util.common.logger.Logger;
import org.apache.hive.druid.io.druid.metadata.MetadataStorageTablesConfig;
import org.apache.hive.druid.io.druid.metadata.RetryTransactionException;
import org.apache.hive.druid.io.druid.metadata.SQLMetadataConnector;
import org.apache.hive.druid.io.druid.segment.realtime.appenderator.SegmentIdentifier;
import org.apache.hive.druid.io.druid.timeline.DataSegment;
import org.apache.hive.druid.io.druid.timeline.TimelineObjectHolder;
import org.apache.hive.druid.io.druid.timeline.VersionedIntervalTimeline;
import org.apache.hive.druid.io.druid.timeline.partition.LinearShardSpec;
import org.apache.hive.druid.io.druid.timeline.partition.NoneShardSpec;
import org.apache.hive.druid.io.druid.timeline.partition.NumberedShardSpec;
import org.apache.hive.druid.io.druid.timeline.partition.PartitionChunk;
import org.joda.time.Interval;
import org.joda.time.ReadableInterval;
import org.skife.jdbi.v2.FoldController;
import org.skife.jdbi.v2.Folder3;
import org.skife.jdbi.v2.Handle;
import org.skife.jdbi.v2.Query;
import org.skife.jdbi.v2.ResultIterator;
import org.skife.jdbi.v2.StatementContext;
import org.skife.jdbi.v2.TransactionCallback;
import org.skife.jdbi.v2.TransactionStatus;
import org.skife.jdbi.v2.Update;
import org.skife.jdbi.v2.exceptions.CallbackFailedException;
import org.skife.jdbi.v2.tweak.HandleCallback;
import org.skife.jdbi.v2.util.ByteArrayMapper;
import org.skife.jdbi.v2.util.StringMapper;

public class IndexerSQLMetadataStorageCoordinator
implements IndexerMetadataStorageCoordinator {
    /** Logger shared by every method of this class. */
    private static final Logger log = new Logger(IndexerSQLMetadataStorageCoordinator.class);
    // NOTE(review): not referenced by name anywhere in this file as shown; the literal 3 passed to
    // connector.retryTransaction(...) may be this constant inlined by the compiler -- TODO confirm.
    private static final int ALLOCATE_SEGMENT_QUIET_TRIES = 3;
    /** Reads and writes the JSON payload columns (segments, segment identifiers, commit metadata). */
    private final ObjectMapper jsonMapper;
    /** Supplies the names of the segments, pending-segments and datasource tables used in the SQL below. */
    private final MetadataStorageTablesConfig dbTables;
    /** Connector through which handles, transactions, lookups and table creation are obtained. */
    private final SQLMetadataConnector connector;

    /**
     * Builds a coordinator that stores its collaborators as-is; no I/O happens here.
     *
     * @param jsonMapper mapper used for the JSON payload columns
     * @param dbTables   provider of the metadata table names
     * @param connector  connector through which every statement is issued
     */
    @Inject
    public IndexerSQLMetadataStorageCoordinator(
        ObjectMapper jsonMapper,
        MetadataStorageTablesConfig dbTables,
        SQLMetadataConnector connector
    ) {
        this.connector = connector;
        this.dbTables = dbTables;
        this.jsonMapper = jsonMapper;
    }

    /**
     * Lifecycle start hook: invokes the connector's table-creation calls for the datasource,
     * pending-segments and segments tables, in that order.
     */
    @LifecycleStart
    public void start() {
        final SQLMetadataConnector metadataConnector = this.connector;
        metadataConnector.createDataSourceTable();
        metadataConnector.createPendingSegmentsTable();
        metadataConnector.createSegmentTable();
    }

    /**
     * Single-interval convenience form of {@link #getUsedSegmentsForIntervals(String, List)}.
     *
     * @param dataSource datasource whose segments are wanted
     * @param interval   the one interval to look up
     * @return whatever the multi-interval lookup returns for a one-element list
     * @throws IOException declared for parity with the multi-interval lookup
     */
    @Override
    public List<DataSegment> getUsedSegmentsForInterval(String dataSource, Interval interval) throws IOException {
        final List<Interval> singleInterval = ImmutableList.of(interval);
        return getUsedSegmentsForIntervals(dataSource, singleInterval);
    }

    /**
     * Returns the distinct segments that the timeline built from the "used" rows exposes for
     * any of the given intervals.
     *
     * @param dataSource datasource whose segments are wanted
     * @param intervals  intervals to look up in the timeline
     * @return a new list holding each matching segment once (order follows hash-set iteration)
     * @throws IOException declared by the interface; failures inside the callback surface
     *                     through {@code retryWithHandle}
     */
    @Override
    public List<DataSegment> getUsedSegmentsForIntervals(final String dataSource, final List<Interval> intervals) throws IOException {
        return this.connector.retryWithHandle(new HandleCallback<List<DataSegment>>() {

            @Override
            public List<DataSegment> withHandle(Handle handle) throws Exception {
                final VersionedIntervalTimeline<String, DataSegment> timeline =
                    getTimelineForIntervalsWithHandle(handle, dataSource, intervals);

                // A set collapses segments that are visible through more than one interval.
                final Set<DataSegment> distinct = new HashSet<DataSegment>();
                for (Interval interval : intervals) {
                    for (TimelineObjectHolder<String, DataSegment> holder : timeline.lookup(interval)) {
                        for (DataSegment segment : holder.getObject().payloads()) {
                            distinct.add(segment);
                        }
                    }
                }
                return new ArrayList<DataSegment>(distinct);
            }
        });
    }

    /**
     * Loads the pending-segment identifiers of {@code dataSource} that overlap {@code interval}.
     *
     * <p>The SQL predicate pre-filters rows on the stored start/end columns; every deserialized
     * identifier is then re-checked with {@link Interval#overlaps} before being kept.
     *
     * @param handle     open handle to run the query on
     * @param dataSource datasource to filter by
     * @param interval   interval the pending segments must overlap
     * @return a new mutable list of the overlapping identifiers (possibly empty)
     * @throws IOException if a payload cannot be deserialized
     */
    private List<SegmentIdentifier> getPendingSegmentsForIntervalWithHandle(Handle handle, String dataSource, Interval interval) throws IOException {
        final List<SegmentIdentifier> identifiers = Lists.newArrayList();
        final ResultIterator<byte[]> dbSegments = handle
            .createQuery(StringUtils.format("SELECT payload FROM %1$s WHERE dataSource = :dataSource AND start <= :end and %2$send%2$s >= :start", this.dbTables.getPendingSegmentsTable(), this.connector.getQuoteString()))
            .bind("dataSource", dataSource)
            .bind("start", interval.getStart().toString())
            .bind("end", interval.getEnd().toString())
            .map(ByteArrayMapper.FIRST)
            .iterator();
        try {
            while (dbSegments.hasNext()) {
                final SegmentIdentifier identifier = this.jsonMapper.readValue(dbSegments.next(), SegmentIdentifier.class);
                if (interval.overlaps((ReadableInterval) identifier.getInterval())) {
                    identifiers.add(identifier);
                }
            }
        } finally {
            // Release the cursor even when deserialization fails part-way through the result set.
            dbSegments.close();
        }
        return identifiers;
    }

    /**
     * Builds a timeline from every "used" segment row of {@code dataSource} whose stored
     * start/end columns satisfy {@code start <= interval.end AND end >= interval.start} for at
     * least one of the given intervals.
     *
     * @param handle     open handle to run the query on
     * @param dataSource datasource to filter by
     * @param intervals  non-null, non-empty list of intervals
     * @return a timeline containing one chunk per row returned by the query
     * @throws IAE         if {@code intervals} is null or empty
     * @throws IOException if a payload cannot be deserialized
     */
    private VersionedIntervalTimeline<String, DataSegment> getTimelineForIntervalsWithHandle(Handle handle, String dataSource, List<Interval> intervals) throws IOException {
        if (intervals == null || intervals.isEmpty()) {
            throw new IAE("null/empty intervals");
        }

        // One "(start <= ? AND end >= ?)" clause per interval, OR-ed together; the table name is
        // substituted afterwards through the remaining %s placeholder.
        final String quote = this.connector.getQuoteString();
        final StringBuilder sb = new StringBuilder();
        sb.append("SELECT payload FROM %s WHERE used = true AND dataSource = ? AND (");
        final int lastIndex = intervals.size() - 1;
        for (int i = 0; i <= lastIndex; ++i) {
            sb.append(StringUtils.format("(start <= ? AND %1$send%1$s >= ?)", quote));
            sb.append(i == lastIndex ? ")" : " OR ");
        }

        Query<?> sql = handle.createQuery(StringUtils.format(sb.toString(), this.dbTables.getSegmentsTable())).bind(0, dataSource);
        for (int i = 0; i < intervals.size(); ++i) {
            final Interval interval = intervals.get(i);
            // Positional order mirrors the clause: first the upper bound for "start", then the
            // lower bound for "end".
            sql = sql.bind(2 * i + 1, interval.getEnd().toString())
                     .bind(2 * i + 2, interval.getStart().toString());
        }

        final VersionedIntervalTimeline<String, DataSegment> timeline = new VersionedIntervalTimeline<String, DataSegment>(Ordering.natural());
        final ResultIterator<byte[]> dbSegments = sql.map(ByteArrayMapper.FIRST).iterator();
        try {
            while (dbSegments.hasNext()) {
                final DataSegment segment = this.jsonMapper.readValue(dbSegments.next(), DataSegment.class);
                timeline.add(segment.getInterval(), segment.getVersion(), segment.getShardSpec().createChunk(segment));
            }
        } finally {
            // Release the cursor even when deserialization or timeline insertion fails.
            dbSegments.close();
        }
        return timeline;
    }

    /**
     * Publishes segments without any datasource-metadata compare-and-swap by delegating to the
     * three-argument overload with {@code null} start/end metadata.
     *
     * @param segments segments to publish
     * @return the segments reported by the publish result
     * @throws ISE         if the delegate reports failure
     * @throws IOException propagated from the delegate
     */
    @Override
    public Set<DataSegment> announceHistoricalSegments(Set<DataSegment> segments) throws IOException {
        final SegmentPublishResult outcome = announceHistoricalSegments(segments, null, null);
        if (outcome.isSuccess()) {
            return outcome.getSegments();
        }
        throw new ISE("WTF?! announceHistoricalSegments failed with null metadata, should not happen.");
    }

    /**
     * Publishes {@code segments} inside one retried transaction, optionally guarded by a
     * compare-and-swap of the datasource commit metadata.
     *
     * <p>A segment is inserted with {@code used = true} only if it appears in the timeline built
     * from the given set itself (looked up over {@code Intervals.ETERNITY} with incomplete
     * partitions included); the remaining segments are inserted with {@code used = false}.
     *
     * @param segments      non-empty set of segments, all of the same datasource
     * @param startMetadata expected current commit metadata, or {@code null} to skip the swap
     * @param endMetadata   metadata to merge in; must be null exactly when {@code startMetadata} is
     * @return a successful result holding the newly inserted segments, or an unsuccessful empty
     *         result when the final attempt was aborted because the metadata swap did not succeed
     * @throws IllegalArgumentException if the set is empty, mixes datasources, or only one of the
     *                                  metadata arguments is null
     * @throws CallbackFailedException  if the transaction failed for any other reason
     * @throws IOException              declared by the interface
     */
    @Override
    public SegmentPublishResult announceHistoricalSegments(final Set<DataSegment> segments, final DataSourceMetadata startMetadata, final DataSourceMetadata endMetadata) throws IOException {
        if (segments.isEmpty()) {
            throw new IllegalArgumentException("segment set must not be empty");
        }
        final String dataSource = segments.iterator().next().getDataSource();
        for (DataSegment segment : segments) {
            if (!dataSource.equals(segment.getDataSource())) {
                throw new IllegalArgumentException("segments must all be from the same dataSource");
            }
        }
        if ((startMetadata == null) != (endMetadata == null)) {
            throw new IllegalArgumentException("start/end metadata pair must be either null or non-null");
        }

        final Set<DataSegment> usedSegments = Sets.newHashSet();
        List<TimelineObjectHolder<String, DataSegment>> segmentHolders = VersionedIntervalTimeline.forSegments(segments).lookupWithIncompletePartitions(Intervals.ETERNITY);
        for (TimelineObjectHolder<String, DataSegment> holder : segmentHolders) {
            for (PartitionChunk<DataSegment> partitionChunk : holder.getObject()) {
                usedSegments.add(partitionChunk.getObject());
            }
        }

        // True only while the attempt currently running was aborted by the metadata swap.
        final AtomicBoolean txnFailure = new AtomicBoolean(false);
        try {
            return this.connector.retryTransaction(new TransactionCallback<SegmentPublishResult>() {

                @Override
                public SegmentPublishResult inTransaction(Handle handle, TransactionStatus transactionStatus) throws Exception {
                    // The callback may run several times. Clear the flag left by an earlier
                    // TRY_AGAIN attempt so that an unrelated failure in this attempt is rethrown
                    // instead of being reported as a metadata mismatch.
                    txnFailure.set(false);

                    final Set<DataSegment> inserted = Sets.newHashSet();
                    if (startMetadata != null) {
                        final DataSourceMetadataUpdateResult result = updateDataSourceMetadataWithHandle(handle, dataSource, startMetadata, endMetadata);
                        if (result != DataSourceMetadataUpdateResult.SUCCESS) {
                            transactionStatus.setRollbackOnly();
                            txnFailure.set(true);
                            if (result == DataSourceMetadataUpdateResult.FAILURE) {
                                throw new RuntimeException("Aborting transaction!");
                            }
                            if (result == DataSourceMetadataUpdateResult.TRY_AGAIN) {
                                throw new RetryTransactionException("Aborting transaction!");
                            }
                        }
                    }
                    for (DataSegment segment : segments) {
                        if (announceHistoricalSegment(handle, segment, usedSegments.contains(segment))) {
                            inserted.add(segment);
                        }
                    }
                    return new SegmentPublishResult(ImmutableSet.copyOf(inserted), true);
                }
            }, 3, 10);
        }
        catch (CallbackFailedException e) {
            if (txnFailure.get()) {
                return new SegmentPublishResult(ImmutableSet.of(), false);
            }
            throw e;
        }
    }

    /**
     * Returns the pending segment already recorded for this sequence, or allocates and records a
     * new one, all inside a single retried transaction.
     *
     * @param dataSource              datasource to allocate for; must not be null
     * @param sequenceName            sequence the segment belongs to; must not be null
     * @param previousSegmentId       id of the previous segment in the sequence; null is stored as ""
     * @param interval                interval of the requested segment; must not be null
     * @param maxVersion              version used for a brand-new interval, and the upper bound a
     *                                conflicting existing version may not exceed; must not be null
     * @param skipSegmentLineageCheck when true the existing row is looked up by
     *                                (dataSource, sequence_name, start, end) instead of
     *                                (dataSource, sequence_name, sequence_prev_id)
     * @return the existing or newly inserted identifier, or {@code null} when allocation is
     *         refused (each refusal is logged at warn level)
     */
    @Override
    public SegmentIdentifier allocatePendingSegment(final String dataSource, final String sequenceName, String previousSegmentId, final Interval interval, final String maxVersion, final boolean skipSegmentLineageCheck) {
        Preconditions.checkNotNull(dataSource, "dataSource");
        Preconditions.checkNotNull(sequenceName, "sequenceName");
        Preconditions.checkNotNull(interval, "interval");
        Preconditions.checkNotNull(maxVersion, "maxVersion");
        // The sequence_prev_id column is always bound to a non-null string.
        final String previousSegmentIdNotNull = previousSegmentId == null ? "" : previousSegmentId;
        return this.connector.retryTransaction(new TransactionCallback<SegmentIdentifier>(){

            @Override
            public SegmentIdentifier inTransaction(Handle handle, TransactionStatus transactionStatus) throws Exception {
                SegmentIdentifier newIdentifier;
                // Step 1: look for a row previously written for this sequence (lookup key depends on
                // skipSegmentLineageCheck, see the two SELECTs).
                List<byte[]> existingBytes = !skipSegmentLineageCheck ? ((Query)((Query)((Query)handle.createQuery(StringUtils.format("SELECT payload FROM %s WHERE dataSource = :dataSource AND sequence_name = :sequence_name AND sequence_prev_id = :sequence_prev_id", IndexerSQLMetadataStorageCoordinator.this.dbTables.getPendingSegmentsTable())).bind("dataSource", dataSource)).bind("sequence_name", sequenceName)).bind("sequence_prev_id", previousSegmentIdNotNull)).map(ByteArrayMapper.FIRST).list() : ((Query)((Query)((Query)((Query)handle.createQuery(StringUtils.format("SELECT payload FROM %s WHERE dataSource = :dataSource AND sequence_name = :sequence_name AND start = :start AND %2$send%2$s = :end", IndexerSQLMetadataStorageCoordinator.this.dbTables.getPendingSegmentsTable(), IndexerSQLMetadataStorageCoordinator.this.connector.getQuoteString())).bind("dataSource", dataSource)).bind("sequence_name", sequenceName)).bind("start", interval.getStart().toString())).bind("end", interval.getEnd().toString())).map(ByteArrayMapper.FIRST).list();
                if (!existingBytes.isEmpty()) {
                    // getOnlyElement: more than one matching row is treated as an error, not a choice.
                    SegmentIdentifier existingIdentifier = IndexerSQLMetadataStorageCoordinator.this.jsonMapper.readValue(Iterables.getOnlyElement(existingBytes), SegmentIdentifier.class);
                    // Reuse the stored identifier only when it covers exactly the requested interval.
                    if (existingIdentifier.getInterval().getStartMillis() == interval.getStartMillis() && existingIdentifier.getInterval().getEndMillis() == interval.getEndMillis()) {
                        log.info("Found existing pending segment [%s] for sequence[%s] (previous = [%s]) in DB", existingIdentifier.getIdentifierAsString(), sequenceName, previousSegmentIdNotNull);
                        return existingIdentifier;
                    }
                    log.warn("Cannot use existing pending segment [%s] for sequence[%s] (previous = [%s]) in DB, does not match requested interval[%s]", existingIdentifier.getIdentifierAsString(), sequenceName, previousSegmentIdNotNull, interval);
                    return null;
                }
                // Step 2: inspect the used segments visible for the interval; more than one
                // timeline holder means the request cannot be mapped onto a single existing chunk set.
                List existingChunks = IndexerSQLMetadataStorageCoordinator.this.getTimelineForIntervalsWithHandle(handle, dataSource, ImmutableList.of(interval)).lookup(interval);
                if (existingChunks.size() > 1) {
                    log.warn("Cannot allocate new segment for dataSource[%s], interval[%s], maxVersion[%s]: already have [%,d] chunks.", dataSource, interval, maxVersion, existingChunks.size());
                    return null;
                }
                // max tracks the identifier the new partition number is derived from.
                SegmentIdentifier max = null;
                if (!existingChunks.isEmpty()) {
                    TimelineObjectHolder existingHolder = Iterables.getOnlyElement(existingChunks);
                    // Keep the used segment with the highest partition number.
                    for (PartitionChunk existing : existingHolder.getObject()) {
                        if (max != null && max.getShardSpec().getPartitionNum() >= ((DataSegment)existing.getObject()).getShardSpec().getPartitionNum()) continue;
                        max = SegmentIdentifier.fromDataSegment((DataSegment)existing.getObject());
                    }
                }
                // Step 3: pending segments overlapping the interval can raise max further: a pending
                // entry wins when max is unset, its version is greater, or the version is equal and
                // its partition number is greater.
                List pendings = IndexerSQLMetadataStorageCoordinator.this.getPendingSegmentsForIntervalWithHandle(handle, dataSource, interval);
                for (SegmentIdentifier pending : pendings) {
                    if (max != null && pending.getVersion().compareTo(max.getVersion()) <= 0 && (!pending.getVersion().equals(max.getVersion()) || pending.getShardSpec().getPartitionNum() <= max.getShardSpec().getPartitionNum())) continue;
                    max = pending;
                }
                if (max == null) {
                    // Nothing exists for this interval yet: start at partition 0 with maxVersion.
                    newIdentifier = new SegmentIdentifier(dataSource, interval, maxVersion, new NumberedShardSpec(0, 0));
                } else {
                    // Refuse when the reference segment has a different interval or a version above maxVersion.
                    if (!max.getInterval().equals((Object)interval) || max.getVersion().compareTo(maxVersion) > 0) {
                        log.warn("Cannot allocate new segment for dataSource[%s], interval[%s], maxVersion[%s]: conflicting segment[%s].", dataSource, interval, maxVersion, max.getIdentifierAsString());
                        return null;
                    }
                    // Only linear and numbered shard specs can be extended with "partition number + 1".
                    if (max.getShardSpec() instanceof LinearShardSpec) {
                        newIdentifier = new SegmentIdentifier(dataSource, max.getInterval(), max.getVersion(), new LinearShardSpec(max.getShardSpec().getPartitionNum() + 1));
                    } else if (max.getShardSpec() instanceof NumberedShardSpec) {
                        newIdentifier = new SegmentIdentifier(dataSource, max.getInterval(), max.getVersion(), new NumberedShardSpec(max.getShardSpec().getPartitionNum() + 1, ((NumberedShardSpec)max.getShardSpec()).getPartitions()));
                    } else {
                        log.warn("Cannot allocate new segment for dataSource[%s], interval[%s], maxVersion[%s]: ShardSpec class[%s] used by [%s].", dataSource, interval, maxVersion, max.getShardSpec().getClass(), max.getIdentifierAsString());
                        return null;
                    }
                }
                // Hex-encoded SHA-1 over sequenceName, a 0xFF separator byte and the previous id; stored
                // in sequence_name_prev_id_sha1 (presumably backing a uniqueness constraint -- TODO confirm
                // against the table definition).
                String sequenceNamePrevIdSha1 = BaseEncoding.base16().encode(Hashing.sha1().newHasher().putBytes(StringUtils.toUtf8(sequenceName)).putByte((byte)-1).putBytes(StringUtils.toUtf8(previousSegmentIdNotNull)).hash().asBytes());
                // Step 4: record the allocation so a later call for the same sequence finds it in step 1.
                ((Update)((Update)((Update)((Update)((Update)((Update)((Update)((Update)((Update)handle.createStatement(StringUtils.format("INSERT INTO %1$s (id, dataSource, created_date, start, %2$send%2$s, sequence_name, sequence_prev_id, sequence_name_prev_id_sha1, payload) VALUES (:id, :dataSource, :created_date, :start, :end, :sequence_name, :sequence_prev_id, :sequence_name_prev_id_sha1, :payload)", IndexerSQLMetadataStorageCoordinator.this.dbTables.getPendingSegmentsTable(), IndexerSQLMetadataStorageCoordinator.this.connector.getQuoteString())).bind("id", newIdentifier.getIdentifierAsString())).bind("dataSource", dataSource)).bind("created_date", DateTimes.nowUtc().toString())).bind("start", interval.getStart().toString())).bind("end", interval.getEnd().toString())).bind("sequence_name", sequenceName)).bind("sequence_prev_id", previousSegmentIdNotNull)).bind("sequence_name_prev_id_sha1", sequenceNamePrevIdSha1)).bind("payload", IndexerSQLMetadataStorageCoordinator.this.jsonMapper.writeValueAsBytes(newIdentifier))).execute();
                log.info("Allocated pending segment [%s] for sequence[%s] (previous = [%s]) in DB", newIdentifier.getIdentifierAsString(), sequenceName, previousSegmentIdNotNull);
                return newIdentifier;
            }
        // NOTE(review): 3 and 10 look like the quiet-tries / max-tries arguments of retryTransaction -- confirm against SQLMetadataConnector.
        }, 3, 10);
    }

    /**
     * Deletes, in one transaction, the pending-segment rows of {@code dataSource} whose
     * {@code created_date} is at or after the interval start and before the interval end.
     *
     * @param dataSource     datasource whose rows are removed
     * @param deleteInterval bounds applied to {@code created_date} (start inclusive, end exclusive)
     * @return the row count reported by the delete statement
     */
    @Override
    public int deletePendingSegments(String dataSource, Interval deleteInterval) {
        return this.connector.getDBI().inTransaction((handle, status) -> {
            final String deleteSql = StringUtils.format("delete from %s where datasource = :dataSource and created_date >= :start and created_date < :end", this.dbTables.getPendingSegmentsTable());
            return handle.createStatement(deleteSql)
                .bind("dataSource", dataSource)
                .bind("start", deleteInterval.getStart().toString())
                .bind("end", deleteInterval.getEnd().toString())
                .execute();
        });
    }

    /**
     * Inserts one segment row unless a row with the same id already exists.
     *
     * @param handle  handle of the surrounding transaction
     * @param segment segment to insert
     * @param used    value written to the {@code used} column
     * @return {@code true} if a row was inserted, {@code false} if the id was already present
     * @throws IOException if the segment cannot be serialized; any exception is logged and rethrown
     */
    private boolean announceHistoricalSegment(Handle handle, DataSegment segment, boolean used) throws IOException {
        try {
            final boolean alreadyPresent = segmentExists(handle, segment);
            if (alreadyPresent) {
                log.info("Found [%s] in DB, not updating DB", segment.getIdentifier());
                return false;
            }

            final String insertSql = StringUtils.format("INSERT INTO %1$s (id, dataSource, created_date, start, %2$send%2$s, partitioned, version, used, payload) VALUES (:id, :dataSource, :created_date, :start, :end, :partitioned, :version, :used, :payload)", this.dbTables.getSegmentsTable(), this.connector.getQuoteString());
            handle.createStatement(insertSql)
                .bind("id", segment.getIdentifier())
                .bind("dataSource", segment.getDataSource())
                .bind("created_date", DateTimes.nowUtc().toString())
                .bind("start", segment.getInterval().getStart().toString())
                .bind("end", segment.getInterval().getEnd().toString())
                // Everything except a NoneShardSpec counts as partitioned.
                .bind("partitioned", !(segment.getShardSpec() instanceof NoneShardSpec))
                .bind("version", segment.getVersion())
                .bind("used", used)
                .bind("payload", this.jsonMapper.writeValueAsBytes(segment))
                .execute();
            log.info("Published segment [%s] to DB with used flag [%s]", segment.getIdentifier(), used);
            return true;
        }
        catch (Exception e) {
            log.error(e, "Exception inserting segment [%s] with used flag [%s] into DB", segment.getIdentifier(), used);
            throw e;
        }
    }

    /**
     * Checks whether the segments table already holds a row with this segment's identifier.
     *
     * @param handle  handle to query with
     * @param segment segment whose identifier is looked up
     * @return {@code true} if at least one row matches
     */
    private boolean segmentExists(Handle handle, DataSegment segment) {
        final String lookupSql = StringUtils.format("SELECT id FROM %s WHERE id = :identifier", this.dbTables.getSegmentsTable());
        final List<String> matchingIds = handle.createQuery(lookupSql)
            .bind("identifier", segment.getIdentifier())
            .map(StringMapper.FIRST)
            .list();
        return !matchingIds.isEmpty();
    }

    /**
     * Reads and deserializes the {@code commit_metadata_payload} stored for a datasource.
     *
     * @param dataSource datasource key to look up
     * @return the deserialized metadata, or {@code null} when the lookup yields no bytes
     */
    @Override
    public DataSourceMetadata getDataSourceMetadata(String dataSource) {
        final byte[] payload = this.connector.lookup(this.dbTables.getDataSourceTable(), "dataSource", "commit_metadata_payload", dataSource);
        if (payload != null) {
            try {
                return this.jsonMapper.readValue(payload, DataSourceMetadata.class);
            }
            catch (Exception e) {
                throw Throwables.propagate(e);
            }
        }
        return null;
    }

    /**
     * Fetches the raw {@code commit_metadata_payload} bytes for a datasource on an existing handle.
     *
     * @param handle     handle to run the lookup on
     * @param dataSource datasource key to look up
     * @return whatever {@code lookupWithHandle} returns for that key
     */
    private byte[] getDataSourceMetadataWithHandleAsBytes(Handle handle, String dataSource) {
        final SQLMetadataConnector metadataConnector = this.connector;
        final String dataSourceTable = this.dbTables.getDataSourceTable();
        return metadataConnector.lookupWithHandle(handle, dataSourceTable, "dataSource", "commit_metadata_payload", dataSource);
    }

    /**
     * Compare-and-swaps the commit metadata row of a datasource.
     *
     * <p>With no stored row, {@code startMetadata.isValidStart()} must hold and
     * {@code endMetadata} is inserted. With a stored row, {@code startMetadata} must match it and
     * the row is updated to {@code stored.plus(endMetadata)}, guarded by the stored payload's
     * SHA-1 in the {@code WHERE} clause.
     *
     * @param handle        handle of the surrounding transaction
     * @param dataSource    datasource key; must not be null
     * @param startMetadata expected current state; must not be null
     * @param endMetadata   state to merge in; must not be null
     * @return {@code FAILURE} when the stored state is not the expected start state,
     *         {@code SUCCESS} when exactly one row was written, otherwise {@code TRY_AGAIN}
     * @throws IOException if metadata cannot be (de)serialized
     */
    protected DataSourceMetadataUpdateResult updateDataSourceMetadataWithHandle(Handle handle, String dataSource, DataSourceMetadata startMetadata, DataSourceMetadata endMetadata) throws IOException {
        Preconditions.checkNotNull(dataSource, "dataSource");
        Preconditions.checkNotNull(startMetadata, "startMetadata");
        Preconditions.checkNotNull(endMetadata, "endMetadata");

        final byte[] storedBytes = getDataSourceMetadataWithHandleAsBytes(handle, dataSource);
        final String storedSha1;
        final DataSourceMetadata storedMetadata;
        if (storedBytes != null) {
            storedSha1 = BaseEncoding.base16().encode(Hashing.sha1().hashBytes(storedBytes).asBytes());
            storedMetadata = this.jsonMapper.readValue(storedBytes, DataSourceMetadata.class);
        } else {
            storedSha1 = null;
            storedMetadata = null;
        }

        final boolean startMatchesStored;
        if (storedMetadata == null) {
            startMatchesStored = startMetadata.isValidStart();
        } else {
            startMatchesStored = startMetadata.matches(storedMetadata);
        }
        if (!startMatchesStored) {
            log.info("Not updating metadata, existing state is not the expected start state.");
            log.debug("Existing database state [%s], request's start metadata [%s]", storedMetadata, startMetadata);
            return DataSourceMetadataUpdateResult.FAILURE;
        }

        final DataSourceMetadata mergedMetadata = storedMetadata == null ? endMetadata : storedMetadata.plus(endMetadata);
        final byte[] mergedBytes = this.jsonMapper.writeValueAsBytes(mergedMetadata);
        final String mergedSha1 = BaseEncoding.base16().encode(Hashing.sha1().hashBytes(mergedBytes).asBytes());

        final int affectedRows;
        if (storedBytes == null) {
            // No row yet: plain insert.
            affectedRows = handle.createStatement(StringUtils.format("INSERT INTO %s (dataSource, created_date, commit_metadata_payload, commit_metadata_sha1) VALUES (:dataSource, :created_date, :commit_metadata_payload, :commit_metadata_sha1)", this.dbTables.getDataSourceTable()))
                .bind("dataSource", dataSource)
                .bind("created_date", DateTimes.nowUtc().toString())
                .bind("commit_metadata_payload", mergedBytes)
                .bind("commit_metadata_sha1", mergedSha1)
                .execute();
        } else {
            // Row exists: update only if its SHA-1 is still the one read above.
            affectedRows = handle.createStatement(StringUtils.format("UPDATE %s SET commit_metadata_payload = :new_commit_metadata_payload, commit_metadata_sha1 = :new_commit_metadata_sha1 WHERE dataSource = :dataSource AND commit_metadata_sha1 = :old_commit_metadata_sha1", this.dbTables.getDataSourceTable()))
                .bind("dataSource", dataSource)
                .bind("old_commit_metadata_sha1", storedSha1)
                .bind("new_commit_metadata_payload", mergedBytes)
                .bind("new_commit_metadata_sha1", mergedSha1)
                .execute();
        }

        if (affectedRows == 1) {
            log.info("Updated metadata from[%s] to[%s].", storedMetadata, mergedMetadata);
            return DataSourceMetadataUpdateResult.SUCCESS;
        }
        log.info("Not updating metadata, compare-and-swap failure.");
        return DataSourceMetadataUpdateResult.TRY_AGAIN;
    }

    /**
     * Deletes the commit-metadata row(s) of a datasource.
     *
     * @param dataSource datasource key whose metadata is removed
     * @return {@code true} if the delete statement reported at least one row
     */
    @Override
    public boolean deleteDataSourceMetadata(final String dataSource) {
        return this.connector.retryWithHandle(handle -> {
            final String deleteSql = StringUtils.format("DELETE from %s WHERE dataSource = :dataSource", this.dbTables.getDataSourceTable());
            final int deleted = handle.createStatement(deleteSql)
                .bind("dataSource", dataSource)
                .execute();
            return deleted > 0;
        });
    }

    /**
     * Overwrites the stored commit metadata of a datasource unconditionally (no SHA-1 guard).
     *
     * @param dataSource         datasource key whose row is updated
     * @param dataSourceMetadata metadata to serialize and store
     * @return {@code true} if exactly one row was updated
     * @throws IOException if the metadata cannot be serialized
     */
    @Override
    public boolean resetDataSourceMetadata(final String dataSource, DataSourceMetadata dataSourceMetadata) throws IOException {
        final byte[] payload = this.jsonMapper.writeValueAsBytes(dataSourceMetadata);
        final String payloadSha1 = BaseEncoding.base16().encode(Hashing.sha1().hashBytes(payload).asBytes());
        return this.connector.retryWithHandle(handle -> {
            final String updateSql = StringUtils.format("UPDATE %s SET commit_metadata_payload = :new_commit_metadata_payload, commit_metadata_sha1 = :new_commit_metadata_sha1 WHERE dataSource = :dataSource", this.dbTables.getDataSourceTable());
            final int updated = handle.createStatement(updateSql)
                .bind("dataSource", dataSource)
                .bind("new_commit_metadata_payload", payload)
                .bind("new_commit_metadata_sha1", payloadSha1)
                .execute();
            return updated == 1;
        });
    }

    /**
     * Rewrites the payload column of every given segment inside one transaction.
     *
     * @param segments segments whose stored payload is replaced
     * @throws IOException declared by the interface
     */
    @Override
    public void updateSegmentMetadata(final Set<DataSegment> segments) throws IOException {
        this.connector.getDBI().inTransaction((handle, transactionStatus) -> {
            for (DataSegment segment : segments) {
                updatePayload(handle, segment);
            }
            return null;
        });
    }

    /**
     * Deletes the rows of every given segment inside one transaction.
     *
     * @param segments segments whose rows are removed by id
     * @throws IOException declared by the interface
     */
    @Override
    public void deleteSegments(final Set<DataSegment> segments) throws IOException {
        this.connector.getDBI().inTransaction((handle, transactionStatus) -> {
            for (DataSegment segment : segments) {
                deleteSegment(handle, segment);
            }
            return null;
        });
    }

    /**
     * Issues a delete-by-id for one segment; the affected-row count is ignored.
     *
     * @param handle  handle to execute on
     * @param segment segment whose identifier selects the row
     */
    private void deleteSegment(Handle handle, DataSegment segment) {
        final String deleteSql = StringUtils.format("DELETE from %s WHERE id = :id", this.dbTables.getSegmentsTable());
        handle.createStatement(deleteSql)
            .bind("id", segment.getIdentifier())
            .execute();
    }

    /**
     * Replaces the stored payload of one segment with its current JSON serialization.
     *
     * @param handle  handle to execute on
     * @param segment segment to serialize; its identifier selects the row
     * @throws IOException if serialization fails (logged, then rethrown)
     */
    private void updatePayload(Handle handle, DataSegment segment) throws IOException {
        final String updateSql = StringUtils.format("UPDATE %s SET payload = :payload WHERE id = :id", this.dbTables.getSegmentsTable());
        try {
            handle.createStatement(updateSql)
                .bind("id", segment.getIdentifier())
                .bind("payload", this.jsonMapper.writeValueAsBytes(segment))
                .execute();
        }
        catch (IOException e) {
            log.error(e, "Exception inserting into DB");
            throw e;
        }
    }

    /**
     * Loads the segments of {@code dataSource} flagged {@code used = false} whose stored start
     * lies within the interval bounds and whose stored end does not exceed the interval end.
     *
     * @param dataSource datasource to filter by
     * @param interval   bounds bound to the {@code :start} / {@code :end} parameters
     * @return the deserialized segments, in result-set order
     */
    @Override
    public List<DataSegment> getUnusedSegmentsForInterval(final String dataSource, final Interval interval) {
        final List<DataSegment> unusedSegments = this.connector.inReadOnlyTransaction(new TransactionCallback<List<DataSegment>>() {

            @Override
            public List<DataSegment> inTransaction(Handle handle, TransactionStatus status) throws Exception {
                final String selectSql = StringUtils.format("SELECT payload FROM %1$s WHERE dataSource = :dataSource and start >= :start and start <= :end and %2$send%2$s <= :end and used = false", dbTables.getSegmentsTable(), connector.getQuoteString());

                // Deserializes each payload as it is read and appends it to the running list.
                final Folder3<List<DataSegment>, byte[]> collector = new Folder3<List<DataSegment>, byte[]>() {

                    @Override
                    public List<DataSegment> fold(List<DataSegment> soFar, byte[] payload, FoldController foldController, StatementContext statementContext) throws SQLException {
                        try {
                            soFar.add(jsonMapper.readValue(payload, DataSegment.class));
                        }
                        catch (Exception e) {
                            throw Throwables.propagate(e);
                        }
                        return soFar;
                    }
                };

                return handle.createQuery(selectSql)
                    .setFetchSize(connector.getStreamingFetchSize())
                    .bind("dataSource", dataSource)
                    .bind("start", interval.getStart().toString())
                    .bind("end", interval.getEnd().toString())
                    .map(ByteArrayMapper.FIRST)
                    .fold(Lists.<DataSegment>newArrayList(), collector);
            }
        });
        log.info("Found %,d segments for %s for interval %s.", unusedSegments.size(), dataSource, interval);
        return unusedSegments;
    }

    static enum DataSourceMetadataUpdateResult {
        SUCCESS,
        FAILURE,
        TRY_AGAIN;

    }
}

