package org.apache.mahout.clustering.streaming.mapreduce;

import com.google.common.base.Preconditions;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
import org.apache.hadoop.util.ToolRunner;
import org.apache.lucene.util.packed.PackedInts;
import org.apache.mahout.common.AbstractJob;
import org.apache.mahout.common.HadoopUtil;
import org.apache.mahout.common.commandline.DefaultOptionCreator;
import org.apache.mahout.common.iterator.sequencefile.PathFilters;
import org.apache.mahout.math.Centroid;
import org.apache.mahout.math.Vector;
import org.apache.mahout.math.neighborhood.BruteSearch;
import org.apache.mahout.math.neighborhood.ProjectionSearch;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Driver for StreamingKMeans clustering.
 *
 * <p>Parses the command-line options, validates them and copies them into the Hadoop
 * {@link Configuration} read by the workers. It then clusters the input vectors in one of two ways,
 * depending on the {@code method} option:
 * <ul>
 *   <li>sequentially in this JVM: one thread per input file, followed by an in-process BallKMeans pass
 *       via {@link StreamingKMeansReducer#getBestCentroids};</li>
 *   <li>as a MapReduce job: {@link StreamingKMeansMapper} with a single {@link StreamingKMeansReducer}.</li>
 * </ul>
 */
public final class StreamingKMeansDriver extends AbstractJob {
    public static final String ESTIMATED_NUM_MAP_CLUSTERS = "estimatedNumMapClusters";
    public static final String ESTIMATED_DISTANCE_CUTOFF = "estimatedDistanceCutoff";
    public static final String MAX_NUM_ITERATIONS = "maxNumIterations";
    public static final String TRIM_FRACTION = "trimFraction";
    public static final String RANDOM_INIT = "randomInit";
    public static final String IGNORE_WEIGHTS = "ignoreWeights";
    public static final String TEST_PROBABILITY = "testProbability";
    public static final String NUM_BALLKMEANS_RUNS = "numBallKMeansRuns";
    public static final String SEARCHER_CLASS_OPTION = "searcherClass";
    public static final String NUM_PROJECTIONS_OPTION = "numProjections";
    public static final String SEARCH_SIZE_OPTION = "searchSize";
    public static final String REDUCE_STREAMING_KMEANS = "reduceStreamingKMeans";
    private static final Logger log = LoggerFactory.getLogger(StreamingKMeansDriver.class);

    /**
     * Sentinel value for {@link #ESTIMATED_DISTANCE_CUTOFF} meaning "no cutoff supplied".
     * Per the option's help text, the cutoff is then estimated from the data set.
     */
    public static final float INVALID_DISTANCE_CUTOFF = -1.0f;

    /**
     * Tool entry point: parses {@code args}, optionally deletes the output directory, configures the
     * workers and runs the clustering.
     *
     * @param args command-line arguments
     * @return -1 if argument parsing fails; otherwise the status of the clustering run
     *         (0 on success, -1 if the MapReduce job failed)
     * @throws Exception if configuration or the clustering job fails
     */
    @Override // org.apache.hadoop.util.Tool
    public int run(String[] args) throws Exception {
        addStreamingKMeansOptions();
        if (parseArguments(args) == null) {
            return -1;
        }
        Path outputPath = getOutputPath();
        if (hasOption("overwrite")) {
            HadoopUtil.delete(getConf(), outputPath);
        }
        configureOptionsForWorkers();
        // Propagate the clustering status so a failed MapReduce job is not reported as success.
        return run(getConf(), getInputPath(), outputPath);
    }

    /** Registers every command-line option and flag understood by this driver. */
    private void addStreamingKMeansOptions() {
        addInputOption();
        addOutputOption();
        addOption(DefaultOptionCreator.overwriteOption().create());
        addOption(DefaultOptionCreator.numClustersOption()
                .withDescription("The k in k-Means. Approximately this many clusters will be generated.").create());
        // NOTE(review): the default of 1 can never pass the "estimatedNumMapClusters > numClusters" check in
        // configureOptionsForWorkers (numClusters must be > 0), so callers effectively must supply -km.
        addOption(ESTIMATED_NUM_MAP_CLUSTERS, "km",
                "The estimated number of clusters to use for the Map phase of the job when running StreamingKMeans. "
                + "This should be around k * log(n), where k is the final number of clusters and n is the total "
                + "number of data points to cluster.",
                String.valueOf(1));
        addOption(ESTIMATED_DISTANCE_CUTOFF, "e",
                "The initial estimated distance cutoff between two points for forming new clusters. "
                + "If no value is given, it's estimated from the data set",
                String.valueOf(INVALID_DISTANCE_CUTOFF));
        addOption(MAX_NUM_ITERATIONS, "mi",
                "The maximum number of iterations to run for the BallKMeans algorithm used by the reducer. "
                + "If no value is given, defaults to 10.",
                String.valueOf(10));
        addOption(TRIM_FRACTION, "tf",
                "The 'ball' aspect of ball k-means means that only the closest points to the centroid will actually "
                + "be used for updating. The fraction of the points to be used is those points whose distance to the "
                + "center is within trimFraction * distance to the closest other center. If no value is given, "
                + "defaults to 0.9.",
                String.valueOf(0.9d));
        addFlag(RANDOM_INIT, "ri",
                "Whether to use k-means++ initialization or random initialization of the seed centroids. "
                + "Essentially, k-means++ provides better clusters, but takes longer, whereas random initialization "
                + "takes less time, but produces worse clusters, and tends to fail more often and needs multiple "
                + "runs to compare to k-means++. If set, uses the random initialization.");
        addFlag(IGNORE_WEIGHTS, "iw",
                "Whether to correct the weights of the centroids after the clustering is done. The weights end up "
                + "being wrong because of the trimFraction and possible train/test splits. In some cases, especially "
                + "in a pipeline, having an accurate count of the weights is useful. If set, ignores the final weights");
        addOption(TEST_PROBABILITY, "testp",
                "A double value between 0 and 1 that represents the percentage of points to be used for 'testing' "
                + "different clustering runs in the final BallKMeans step. If no value is given, defaults to 0.1",
                String.valueOf(0.1d));
        addOption(NUM_BALLKMEANS_RUNS, "nbkm",
                "Number of BallKMeans runs to use at the end to try to cluster the points. "
                + "If no value is given, defaults to 4",
                String.valueOf(4));
        addOption(DefaultOptionCreator.distanceMeasureOption().create());
        addOption(SEARCHER_CLASS_OPTION, "sc",
                "The type of searcher to be used when performing nearest neighbor searches. "
                + "Defaults to ProjectionSearch.",
                ProjectionSearch.class.getCanonicalName());
        addOption(NUM_PROJECTIONS_OPTION, "np",
                "The number of projections considered in estimating the distances between vectors. Only used when "
                + "the searcher class requested is either ProjectionSearch or FastProjectionSearch. If no value is "
                + "given, defaults to 3.",
                String.valueOf(3));
        // Keep the default stated in the help text in sync with the actual default value argument.
        addOption(SEARCH_SIZE_OPTION, "s",
                "In more efficient searches (non BruteSearch), not all distances are calculated for determining the "
                + "nearest neighbors. The number of elements whose distances from the query vector are actually "
                + "computed is proportional to searchSize. If no value is given, defaults to 2.",
                String.valueOf(2));
        addFlag(REDUCE_STREAMING_KMEANS, "rskm",
                "There might be too many intermediate clusters from the mapper to fit into memory, so the reducer "
                + "can run another pass of StreamingKMeans to collapse them down to a fewer clusters");
        addOption(DefaultOptionCreator.methodOption().create());
    }

    /**
     * Reads the parsed command-line options and passes them to the static
     * {@code configureOptionsForWorkers} overload, which validates them and stores them in {@link #getConf()}.
     *
     * @throws ClassNotFoundException if the distance measure or searcher class cannot be loaded
     */
    private void configureOptionsForWorkers() throws ClassNotFoundException {
        log.info("Starting to configure options for workers");
        String method = getOption("method");
        int numClusters = Integer.parseInt(getOption(DefaultOptionCreator.NUM_CLUSTERS_OPTION));
        int estimatedNumMapClusters = Integer.parseInt(getOption(ESTIMATED_NUM_MAP_CLUSTERS));
        float estimatedDistanceCutoff = Float.parseFloat(getOption(ESTIMATED_DISTANCE_CUTOFF));
        int maxNumIterations = Integer.parseInt(getOption(MAX_NUM_ITERATIONS));
        float trimFraction = Float.parseFloat(getOption(TRIM_FRACTION));
        boolean randomInit = hasOption(RANDOM_INIT);
        boolean ignoreWeights = hasOption(IGNORE_WEIGHTS);
        float testProbability = Float.parseFloat(getOption(TEST_PROBABILITY));
        int numBallKMeansRuns = Integer.parseInt(getOption(NUM_BALLKMEANS_RUNS));
        String measureClass = getOption(DefaultOptionCreator.DISTANCE_MEASURE_OPTION);
        String searcherClass = getOption(SEARCHER_CLASS_OPTION);

        // BruteSearch needs no tuning parameters. Every other searcher reads searchSize and numProjections;
        // for BruteSearch both stay 0.
        boolean needsSearchParameters = !searcherClass.equals(BruteSearch.class.getName());
        int searchSize = needsSearchParameters ? Integer.parseInt(getOption(SEARCH_SIZE_OPTION)) : 0;
        int numProjections = needsSearchParameters ? Integer.parseInt(getOption(NUM_PROJECTIONS_OPTION)) : 0;

        configureOptionsForWorkers(getConf(), numClusters, estimatedNumMapClusters, estimatedDistanceCutoff,
                maxNumIterations, trimFraction, randomInit, ignoreWeights, testProbability, numBallKMeansRuns,
                measureClass, searcherClass, searchSize, numProjections, method, hasOption(REDUCE_STREAMING_KMEANS));
    }

    /**
     * Validates the StreamingKMeans/BallKMeans parameters and stores them in {@code conf} for the workers.
     *
     * @param conf configuration to populate
     * @param numClusters final number of clusters k; must be positive
     * @param estimatedNumMapClusters clusters produced by the StreamingKMeans (map) phase; must exceed numClusters
     * @param estimatedDistanceCutoff initial distance cutoff, or {@link #INVALID_DISTANCE_CUTOFF} to leave it unset;
     *        otherwise must be positive
     * @param maxNumIterations maximum BallKMeans iterations; must be positive
     * @param trimFraction BallKMeans trim fraction; must be positive
     * @param randomInit whether to use random instead of k-means++ seeding
     * @param ignoreWeights whether to skip correcting the final centroid weights
     * @param testProbability fraction of points held out for testing; must be in [0, 1)
     * @param numBallKMeansRuns number of BallKMeans runs; must be positive
     * @param measureClass fully qualified distance measure class name; must be loadable
     * @param searcherClass fully qualified searcher class name; must be loadable
     * @param searchSize search size for searchers whose name does not contain "Brute"; then must be positive
     * @param numProjections projection count for searchers whose name contains "Projection"; then must be positive
     * @param method execution method ("sequential" runs in-process; anything else runs MapReduce)
     * @param reduceStreamingKMeans whether the reducer runs another StreamingKMeans pass
     * @throws IllegalArgumentException if any parameter fails validation
     * @throws ClassNotFoundException if {@code measureClass} or {@code searcherClass} cannot be loaded
     */
    public static void configureOptionsForWorkers(Configuration conf,
                                                  int numClusters,
                                                  int estimatedNumMapClusters,
                                                  float estimatedDistanceCutoff,
                                                  int maxNumIterations,
                                                  float trimFraction,
                                                  boolean randomInit,
                                                  boolean ignoreWeights,
                                                  float testProbability,
                                                  int numBallKMeansRuns,
                                                  String measureClass,
                                                  String searcherClass,
                                                  int searchSize,
                                                  int numProjections,
                                                  String method,
                                                  boolean reduceStreamingKMeans) throws ClassNotFoundException {
        Preconditions.checkArgument(numClusters > 0,
                "Invalid number of clusters requested: %s. Must be: numClusters > 0!", numClusters);
        Preconditions.checkArgument(estimatedNumMapClusters > numClusters,
                "Invalid number of estimated map clusters; There must be more than the final number of clusters "
                + "(k log n vs k)");
        Preconditions.checkArgument(estimatedDistanceCutoff == INVALID_DISTANCE_CUTOFF || estimatedDistanceCutoff > 0,
                "estimatedDistanceCutoff must be equal to -1 or must be greater then 0!");
        Preconditions.checkArgument(maxNumIterations > 0, "Must have at least one BallKMeans iteration");
        Preconditions.checkArgument(trimFraction > 0, "trimFraction must be positive");
        Preconditions.checkArgument(testProbability >= 0 && testProbability < 1,
                "test probability is not in the interval [0, 1)");
        // Zero runs is rejected as well, so the message says "positive" rather than "non-negative".
        Preconditions.checkArgument(numBallKMeansRuns > 0, "numBallKMeansRuns must be positive");
        if (!searcherClass.contains("Brute")) {
            Preconditions.checkArgument(searchSize > 0, "Invalid searchSize. Must be positive.");
            if (searcherClass.contains("Projection")) {
                Preconditions.checkArgument(numProjections > 0, "Invalid numProjections. Must be positive");
            }
        }

        conf.setInt(DefaultOptionCreator.NUM_CLUSTERS_OPTION, numClusters);
        conf.setInt(ESTIMATED_NUM_MAP_CLUSTERS, estimatedNumMapClusters);
        // The key is left unset when the sentinel is passed; presumably the workers then estimate the cutoff
        // themselves (confirm in the mapper).
        if (estimatedDistanceCutoff != INVALID_DISTANCE_CUTOFF) {
            conf.setFloat(ESTIMATED_DISTANCE_CUTOFF, estimatedDistanceCutoff);
        }
        conf.setInt(MAX_NUM_ITERATIONS, maxNumIterations);
        conf.setFloat(TRIM_FRACTION, trimFraction);
        conf.setBoolean(RANDOM_INIT, randomInit);
        conf.setBoolean(IGNORE_WEIGHTS, ignoreWeights);
        conf.setFloat(TEST_PROBABILITY, testProbability);
        conf.setInt(NUM_BALLKMEANS_RUNS, numBallKMeansRuns);
        // Load each class once here so that a bad class name fails in the driver rather than in a worker.
        Class.forName(measureClass);
        conf.set(DefaultOptionCreator.DISTANCE_MEASURE_OPTION, measureClass);
        Class.forName(searcherClass);
        conf.set(SEARCHER_CLASS_OPTION, searcherClass);
        conf.setInt(SEARCH_SIZE_OPTION, searchSize);
        conf.setInt(NUM_PROJECTIONS_OPTION, numProjections);
        conf.set("method", method);
        conf.setBoolean(REDUCE_STREAMING_KMEANS, reduceStreamingKMeans);
        log.info("Parameters are: [k] numClusters {}; [SKM] estimatedNumMapClusters {}; estimatedDistanceCutoff {} "
                + "[BKM] maxNumIterations {}; trimFraction {}; randomInit {}; ignoreWeights {}; testProbability {}; "
                + "numBallKMeansRuns {}; [S] measureClass {}; searcherClass {}; searcherSize {}; numProjections {}; "
                + "method {}; reduceStreamingKMeans {}",
                numClusters, estimatedNumMapClusters, estimatedDistanceCutoff, maxNumIterations, trimFraction,
                randomInit, ignoreWeights, testProbability, numBallKMeansRuns, measureClass, searcherClass,
                searchSize, numProjections, method, reduceStreamingKMeans);
    }

    /**
     * Clusters the vectors under {@code input}, writing the final centroids under {@code output}.
     *
     * <p>If the {@code method} key in {@code conf} is {@code "sequential"}, clustering runs in this JVM.
     * Otherwise, including when the key is unset, it runs as a MapReduce job.
     *
     * @param conf configuration previously populated by {@code configureOptionsForWorkers}
     * @param input input path
     * @param output output directory
     * @return 0 on success, -1 if the MapReduce job failed
     */
    public static int run(Configuration conf, Path input, Path output)
            throws IOException, InterruptedException, ClassNotFoundException, ExecutionException {
        log.info("Starting StreamingKMeans clustering for vectors in {}; results are output to {}",
                input.toString(), output.toString());
        boolean sequential = conf.get("method", "mapreduce").equals("sequential");
        return sequential ? runSequentially(conf, input, output) : runMapReduce(conf, input, output);
    }

    /**
     * Runs clustering in-process.
     *
     * <p>One {@link StreamingKMeansThread} runs per input file (CRC and log files are filtered out). The
     * intermediate centroids are then reduced with {@link StreamingKMeansReducer#getBestCentroids} and written
     * to {@code output/part-r-00000} as (IntWritable index, CentroidWritable) pairs.
     *
     * @return always 0; failures surface as exceptions
     */
    private static int runSequentially(Configuration conf, Path input, Path output)
            throws IOException, ExecutionException, InterruptedException {
        long start = System.currentTimeMillis();
        FileSystem fs = FileSystem.get(conf);

        List<Centroid> intermediateCentroids = new ArrayList<Centroid>();
        ExecutorService pool = Executors.newCachedThreadPool();
        try {
            List<Future<Iterable<Centroid>>> futures = new ArrayList<Future<Iterable<Centroid>>>();
            for (FileStatus status : HadoopUtil.listStatus(fs, input, PathFilters.logsCRCFilter())) {
                futures.add(pool.submit(new StreamingKMeansThread(status.getPath(), conf)));
            }
            // Future.get() blocks, so after this loop every "mapper" thread has completed.
            for (Future<Iterable<Centroid>> future : futures) {
                for (Centroid centroid : future.get()) {
                    intermediateCentroids.add(centroid);
                }
            }
            log.info("Finished running Mappers");
        } finally {
            // Shut the pool down even if a task failed, so its threads do not linger.
            pool.shutdown();
        }
        pool.awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS);
        log.info("Finished StreamingKMeans");

        SequenceFile.Writer writer = SequenceFile.createWriter(fs, conf, new Path(output, "part-r-00000"),
                IntWritable.class, CentroidWritable.class);
        try {
            int index = 0;
            for (Vector finalVector : StreamingKMeansReducer.getBestCentroids(intermediateCentroids, conf)) {
                writer.append(new IntWritable(index++), new CentroidWritable((Centroid) finalVector));
            }
        } finally {
            writer.close();
        }
        log.info("Finished BallKMeans. Took {}.", (System.currentTimeMillis() - start) / 1000.0d);
        return 0;
    }

    /**
     * Runs clustering as a MapReduce job over SequenceFiles: {@link StreamingKMeansMapper} followed by exactly
     * one {@link StreamingKMeansReducer}.
     *
     * @return 0 if the job succeeded, -1 otherwise
     */
    public static int runMapReduce(Configuration conf, Path input, Path output)
            throws IOException, ClassNotFoundException, InterruptedException {
        Job job = HadoopUtil.prepareJob(input, output, SequenceFileInputFormat.class,
                StreamingKMeansMapper.class, IntWritable.class, CentroidWritable.class,
                StreamingKMeansReducer.class, IntWritable.class, CentroidWritable.class,
                SequenceFileOutputFormat.class, conf);
        job.setJobName(HadoopUtil.getCustomJobName(StreamingKMeansDriver.class.getSimpleName(), job,
                StreamingKMeansMapper.class, StreamingKMeansReducer.class));
        // A single reducer receives every mapper's intermediate centroids.
        job.setNumReduceTasks(1);
        job.setJarByClass(StreamingKMeansDriver.class);

        long start = System.currentTimeMillis();
        if (!job.waitForCompletion(true)) {
            return -1;
        }
        log.info("StreamingKMeans clustering complete. Results are in {}. Took {} ms",
                output.toString(), System.currentTimeMillis() - start);
        return 0;
    }

    private StreamingKMeansDriver() {
    }

    /** Command-line entry point; delegates to {@link ToolRunner}. */
    public static void main(String[] args) throws Exception {
        ToolRunner.run(new StreamingKMeansDriver(), args);
    }
}
