/*
 * Decompiled with CFR 0.152.
 */
package com.mapr.cli.marlin;

import com.google.common.collect.ImmutableMap;
import com.mapr.baseutils.cldbutils.CLDBRpcCommonUtils;
import com.mapr.cli.DbCommands;
import com.mapr.cli.DbRegionCommands;
import com.mapr.cli.MapRCliUtil;
import com.mapr.cli.common.FileclientRun;
import com.mapr.cli.marlin.AssignCommands;
import com.mapr.cli.marlin.CursorCommands;
import com.mapr.cli.marlin.RecentStreamsListManager;
import com.mapr.cli.marlin.RecentStreamsListManagers;
import com.mapr.cli.marlin.StreamCompactCommands;
import com.mapr.cli.marlin.StreamReplicaCommands;
import com.mapr.cli.marlin.StreamUpstreamCommands;
import com.mapr.cli.marlin.TopicCommands;
import com.mapr.cli.table.TabletStats;
import com.mapr.cliframework.base.CLIBaseClass;
import com.mapr.cliframework.base.CLICommand;
import com.mapr.cliframework.base.CLIInterface;
import com.mapr.cliframework.base.CLIProcessingException;
import com.mapr.cliframework.base.CLIUsageOnlyCommand;
import com.mapr.cliframework.base.CommandOutput;
import com.mapr.cliframework.base.ProcessedInput;
import com.mapr.cliframework.base.TextCommandOutput;
import com.mapr.cliframework.base.inputparams.BooleanInputParameter;
import com.mapr.cliframework.base.inputparams.IntegerInputParameter;
import com.mapr.cliframework.base.inputparams.LongInputParameter;
import com.mapr.cliframework.base.inputparams.NoValueInputParameter;
import com.mapr.cliframework.base.inputparams.TextInputParameter;
import com.mapr.fs.MapRFileSystem;
import com.mapr.fs.proto.Dbserver;
import com.mapr.fs.proto.Marlincommon;
import com.mapr.fs.tables.TableProperties;
import com.mapr.streams.Admin;
import com.mapr.streams.StreamDescriptor;
import com.mapr.streams.Streams;
import com.mapr.streams.TimestampType;
import com.mapr.streams.impl.admin.MStreamDescriptor;
import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Date;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.log4j.Logger;

public class StreamsCommands
extends CLIBaseClass
implements CLIInterface {
    private static final Logger LOG = Logger.getLogger(StreamsCommands.class);
    private static final String PATH_PARAM_NAME = "path";
    private static final String TTL_PARAM_NAME = "ttl";
    private static final String AUTO_CREATE_TOPICS_PARAM_NAME = "autocreate";
    private static final String DEFAULT_PARTITIONS = "defaultpartitions";
    private static final String COMPRESSION_PARAM_NAME = "compression";
    private static final String CLIENT_COMPRESSION_PARAM_NAME = "clientcompression";
    private static final String PRODUCE_PERM_PARAM_NAME = "produceperm";
    private static final String LISTEN_PERM_PARAM_NAME = "consumeperm";
    private static final String TOPIC_PERM_PARAM_NAME = "topicperm";
    private static final String COPY_PERM_PARAM_NAME = "copyperm";
    private static final String ADMIN_PERM_PARAM_NAME = "adminperm";
    private static final String COPY_META_FROM_PARAM_NAME = "copymetafrom";
    private static final String ISCHANGELOG_PARAM_NAME = "ischangelog";
    private static final String NTHREADS_PARAM_NAME = "nthreads";
    private static final String DEFAULT_TIMESTAMP_TYPE_PARAM_NAME = "defaulttimestamptype";
    private static final String DEFAULT_LOG_COMPACTION_PARAM_NAME = "compact";
    private static final String MIN_COMPACTION_LAG_MS = "mincompactionlag";
    private static final String DELETE_RETENTION_MS = "deleteretention";
    private static final String PID_EXPIRY_SECS_PARAM_NAME = "pidexpirysecs";
    private static final String FORCE_PARAM_NAME = "force";
    private static final String COMPACTION_THROTTLE_FACTOR = "throttlefactor";
    private static final String KAFKATOPIC_PARAM_NAME = "kafkatopic";
    private static final CLICommand createCommand = new CLICommand("create", "usage: stream create -path <path>", StreamsCommands.class, CLICommand.ExecutionTypeEnum.NATIVE, (Map)new ImmutableMap.Builder().put((Object)"path", (Object)new TextInputParameter("path", "Stream Path", true, null)).put((Object)"ttl", (Object)new LongInputParameter("ttl", "Time to live in seconds. default:604800", false, null)).put((Object)"autocreate", (Object)new BooleanInputParameter("autocreate", "Auto create topics. default:true", false, null)).put((Object)"defaultpartitions", (Object)new IntegerInputParameter("defaultpartitions", "Default partitions per topic. default:1", false, null)).put((Object)"compression", (Object)new TextInputParameter("compression", "off|lz4|lzf|zlib. default:inherit from parent directory", false, null)).put((Object)"clientcompression", (Object)new BooleanInputParameter("compression", "Compress on client. default:false", false, null).setInvisible(true)).put((Object)"produceperm", (Object)new TextInputParameter("produceperm", "Producer access control expression. default u:creator", false, null)).put((Object)"consumeperm", (Object)new TextInputParameter("consumeperm", "Consumer access control expression. default u:creator", false, null)).put((Object)"topicperm", (Object)new TextInputParameter("topicperm", "Topic CRUD access control expression. default u:creator", false, null)).put((Object)"copyperm", (Object)new TextInputParameter("copyperm", "Stream copy access control expression. default u:creator", false, null)).put((Object)"adminperm", (Object)new TextInputParameter("adminperm", "Stream administration access control expression. default u:creator", false, null)).put((Object)"copymetafrom", (Object)new TextInputParameter("copymetafrom", "Stream to copy attributes from. default:none", false, null)).put((Object)"ischangelog", (Object)new BooleanInputParameter("ischangelog", "Stream to store changelog. default:false", false, null)).put((Object)"defaulttimestamptype", (Object)new TextInputParameter("defaulttimestamptype", "Timestamp type: createtime | logappendtime. default: createtime", false, null)).put((Object)"mincompactionlag", (Object)new LongInputParameter("mincompactionlag", "Set time in millisecond a message should remain uncompacted for. default: 0", false, null).setInvisible(true)).put((Object)"deleteretention", (Object)new LongInputParameter("deleteretention", "Set the time in millisecond for which delete records are retained. default: 86400000", false, null).setInvisible(true)).put((Object)"pidexpirysecs", (Object)new LongInputParameter("pidexpirysecs", "Producer ID expiry in secs. default: 604800", false, null)).build(), null).setShortUsage("stream create -path <path>");
    private static final CLICommand editCommand = new CLICommand("edit", "usage: stream edit -path <path>", StreamsCommands.class, CLICommand.ExecutionTypeEnum.NATIVE, (Map)new ImmutableMap.Builder().put((Object)"path", (Object)new TextInputParameter("path", "Stream Path", true, null)).put((Object)"ttl", (Object)new LongInputParameter("ttl", "Time to live in seconds", false, null)).put((Object)"autocreate", (Object)new BooleanInputParameter("autocreate", "Auto create topics", false, null)).put((Object)"defaultpartitions", (Object)new IntegerInputParameter("defaultpartitions", "Default partitions per topic", false, null)).put((Object)"compression", (Object)new TextInputParameter("compression", "off|lz4|lzf|zlib", false, null)).put((Object)"clientcompression", (Object)new BooleanInputParameter("compression", "Compress on client. default:false", false, null).setInvisible(true)).put((Object)"produceperm", (Object)new TextInputParameter("produceperm", "Producer access control expression. default u:creator", false, null)).put((Object)"consumeperm", (Object)new TextInputParameter("consumeperm", "Consumer access control expression. default u:creator", false, null)).put((Object)"topicperm", (Object)new TextInputParameter("topicperm", "Topic CRUD access control expression. default u:creator", false, null)).put((Object)"copyperm", (Object)new TextInputParameter("copyperm", "Stream copy access control expression. default u:creator", false, null)).put((Object)"adminperm", (Object)new TextInputParameter("adminperm", "Stream administration access control expression. default u:creator", false, null)).put((Object)"defaulttimestamptype", (Object)new TextInputParameter("defaulttimestamptype", "timestamp type: createtime | logappendtime. default: createtime", false, null)).put((Object)"compact", (Object)new BooleanInputParameter("compact", "Set log compaction for stream. default: false", false, null)).put((Object)"mincompactionlag", (Object)new LongInputParameter("mincompactionlag", "Set time in millisecond a message should remain uncompacted for. default: 0", false, null)).put((Object)"deleteretention", (Object)new LongInputParameter("deleteretention", "Set the time in millisecond for which delete records are retained. default: 86400000", false, null)).put((Object)"pidexpirysecs", (Object)new LongInputParameter("pidexpirysecs", "Producer ID expiry in secs. default: 604800", false, null)).put((Object)"force", (Object)new NoValueInputParameter("force", "When used with -compact, forces enabling log compaction on a stream", false, false)).put((Object)"throttlefactor", (Object)new LongInputParameter("throttlefactor", "When used with -compact, throttles log compaction on a stream", false, null).setInvisible(true)).build(), null).setShortUsage("stream edit -path <path>");
    private static final CLICommand infoCommand = new CLICommand("info", "usage: stream info -path <path>", StreamsCommands.class, CLICommand.ExecutionTypeEnum.NATIVE, (Map)new ImmutableMap.Builder().put((Object)"path", (Object)new TextInputParameter("path", "Stream Path", true, null)).build(), null).setShortUsage("stream info -path <path>");
    private static final CLICommand deleteCommand = new CLICommand("delete", "usage: stream delete -path <path>", StreamsCommands.class, CLICommand.ExecutionTypeEnum.NATIVE, (Map)new ImmutableMap.Builder().put((Object)"path", (Object)new TextInputParameter("path", "Stream Path", true, null)).build(), null).setShortUsage("stream delete -path <path>");
    private static final CLICommand purgeCommand = new CLICommand("purge", "usage: stream purge -path <path>", StreamsCommands.class, CLICommand.ExecutionTypeEnum.NATIVE, (Map)new ImmutableMap.Builder().put((Object)"path", (Object)new TextInputParameter("path", "Stream Path", true, null)).put((Object)"nthreads", (Object)new IntegerInputParameter("nthreads", "number of parallel threads", false, Integer.valueOf(16)).setInvisible(true)).build(), null).setShortUsage("stream purge -path <path>");
    private static final CLICommand listRecentCommand = new CLICommand("listrecent", "usage: stream listrecent", StreamsCommands.class, CLICommand.ExecutionTypeEnum.NATIVE).setShortUsage("stream listrecent");
    public static final CLICommand streamCommands = new CLICommand("stream", "stream", CLIUsageOnlyCommand.class, CLICommand.ExecutionTypeEnum.NATIVE, new CLICommand[]{createCommand, editCommand, infoCommand, deleteCommand, purgeCommand, TopicCommands.topicCommands, CursorCommands.cursorCommands, AssignCommands.assignCommands, StreamReplicaCommands.replicaCommands, StreamUpstreamCommands.upstreamCommands, StreamCompactCommands.compactInfoCommand, listRecentCommand}).setShortUsage("stream [create|edit|info|delete|topic|cursor|replica|upstream|listrecent]");

    public StreamsCommands(ProcessedInput input, CLICommand cliCommand) {
        super(input, cliCommand);
    }

    public CommandOutput executeRealCommand() throws CLIProcessingException {
        CommandOutput.OutputHierarchy out = new CommandOutput.OutputHierarchy();
        CommandOutput output = new CommandOutput();
        output.setOutput(out);
        if (!super.validateInput()) {
            return output;
        }
        String cmdName = this.cliCommand.getCommandName();
        if (cmdName.equalsIgnoreCase(createCommand.getCommandName())) {
            return this.createStream();
        }
        if (cmdName.equalsIgnoreCase(editCommand.getCommandName())) {
            this.editStream(out);
        } else if (cmdName.equalsIgnoreCase(infoCommand.getCommandName())) {
            this.infoStream(out);
        } else if (cmdName.equalsIgnoreCase(deleteCommand.getCommandName())) {
            this.deleteStream(out);
        } else if (cmdName.equalsIgnoreCase(purgeCommand.getCommandName())) {
            this.purgeStream(out);
        } else if (cmdName.equalsIgnoreCase(listRecentCommand.getCommandName())) {
            this.listRecentStreams(out);
        }
        return output;
    }

    private CommandOutput createStream() throws CLIProcessingException {
        CommandOutput.OutputHierarchy out = new CommandOutput.OutputHierarchy();
        CommandOutput output = new CommandOutput();
        output.setOutput(out);
        try {
            String perms;
            final String user = this.getUserLoginId();
            String streamName = this.getParamTextValue(PATH_PARAM_NAME, 0);
            final String path = DbCommands.getTransformedPath(streamName, this.getUserLoginId());
            final Configuration conf = new Configuration();
            StreamDescriptor sdesc = null;
            if (LOG.isDebugEnabled()) {
                LOG.debug((Object)("begin creating stream=" + path + " user=" + user));
            }
            if (this.isParamPresent(COPY_META_FROM_PARAM_NAME)) {
                String copyFrom = this.getParamTextValue(COPY_META_FROM_PARAM_NAME, 0);
                sdesc = this.getStreamDescriptor(copyFrom);
            } else {
                sdesc = Streams.newStreamDescriptor();
            }
            if (this.isParamPresent(DEFAULT_PARTITIONS)) {
                int val = this.getParamIntValue(DEFAULT_PARTITIONS, 0);
                sdesc.setDefaultPartitions(val);
            }
            if (this.isParamPresent(TTL_PARAM_NAME)) {
                long ttl = this.getParamLongValue(TTL_PARAM_NAME, 0);
                sdesc.setTimeToLiveSec(ttl);
            }
            if (this.isParamPresent(COMPRESSION_PARAM_NAME)) {
                String ctype = this.getParamTextValue(COMPRESSION_PARAM_NAME, 0);
                sdesc.setCompressionAlgo(ctype);
            }
            if (this.isParamPresent(CLIENT_COMPRESSION_PARAM_NAME)) {
                boolean val = this.getParamBooleanValue(CLIENT_COMPRESSION_PARAM_NAME, 0);
                MStreamDescriptor mdesc = (MStreamDescriptor)sdesc;
                mdesc.setClientCompression(val);
            }
            boolean ischangelog = false;
            if (this.isParamPresent(ISCHANGELOG_PARAM_NAME)) {
                boolean val = this.getParamBooleanValue(ISCHANGELOG_PARAM_NAME, 0);
                sdesc.setIsChangelog(val);
                ischangelog = val;
            }
            if (this.isParamPresent(AUTO_CREATE_TOPICS_PARAM_NAME)) {
                boolean val = this.getParamBooleanValue(AUTO_CREATE_TOPICS_PARAM_NAME, 0);
                if (ischangelog && !val) {
                    out.addError(new CommandOutput.OutputHierarchy.OutputError(22, "autocreate cannot be false for changelog stream."));
                    return output;
                }
                sdesc.setAutoCreateTopics(val);
            }
            boolean permsSpecified = false;
            if (this.isParamPresent(PRODUCE_PERM_PARAM_NAME)) {
                perms = this.getParamTextValue(PRODUCE_PERM_PARAM_NAME, 0);
                sdesc.setProducePerms(perms);
                permsSpecified = true;
            }
            if (this.isParamPresent(LISTEN_PERM_PARAM_NAME)) {
                perms = this.getParamTextValue(LISTEN_PERM_PARAM_NAME, 0);
                sdesc.setConsumePerms(perms);
                permsSpecified = true;
            }
            if (this.isParamPresent(TOPIC_PERM_PARAM_NAME)) {
                perms = this.getParamTextValue(TOPIC_PERM_PARAM_NAME, 0);
                sdesc.setTopicPerms(perms);
                permsSpecified = true;
            }
            if (this.isParamPresent(COPY_PERM_PARAM_NAME)) {
                perms = this.getParamTextValue(COPY_PERM_PARAM_NAME, 0);
                sdesc.setCopyPerms(perms);
                permsSpecified = true;
            }
            if (this.isParamPresent(ADMIN_PERM_PARAM_NAME)) {
                perms = this.getParamTextValue(ADMIN_PERM_PARAM_NAME, 0);
                sdesc.setAdminPerms(perms);
                permsSpecified = true;
            }
            if (this.isParamPresent(DEFAULT_TIMESTAMP_TYPE_PARAM_NAME)) {
                String timestampType = this.getParamTextValue(DEFAULT_TIMESTAMP_TYPE_PARAM_NAME, 0);
                sdesc.setDefaultTimestampType(TimestampType.forName((String)timestampType));
            }
            if (this.isParamPresent(MIN_COMPACTION_LAG_MS)) {
                long lag = this.getParamLongValue(MIN_COMPACTION_LAG_MS, 0);
                sdesc.setMinCompactionLagMS(lag);
            }
            if (this.isParamPresent(DELETE_RETENTION_MS)) {
                long ts = this.getParamLongValue(DELETE_RETENTION_MS, 0);
                sdesc.setDeleteRetentionMS(ts);
            }
            if (this.isParamPresent(PID_EXPIRY_SECS_PARAM_NAME)) {
                long producerIdExpirySecs = this.getParamLongValue(PID_EXPIRY_SECS_PARAM_NAME, 0);
                sdesc.setProducerIdExpirySecs(producerIdExpirySecs);
            }
            final StreamDescriptor desc = sdesc;
            new FileclientRun(user){

                @Override
                public void runAsProxyUser() throws CLIProcessingException, IOException {
                    try {
                        Admin madmin = Streams.newAdmin((Configuration)conf);
                        madmin.createStream(path, desc);
                        RecentStreamsListManagers.getRecentStreamsListManagerForUser(user).add(path);
                    }
                    catch (Exception e) {
                        LOG.error((Object)("createStream failed : " + e));
                        throw e;
                    }
                }
            };
            if (!permsSpecified) {
                return new TextCommandOutput(("Warning: produce/consume/topic permissions defaulting to creator. To change, execute 'maprcli stream edit -path " + path + " -produceperm <ACE> -consumeperm <ACE> -topicperm <ACE>'").getBytes());
            }
        }
        catch (CLIProcessingException e) {
            out.addError(new CommandOutput.OutputHierarchy.OutputError(22, e.getMessage()));
        }
        catch (Exception e) {
            out.addError(new CommandOutput.OutputHierarchy.OutputError(10003, e.getMessage()));
        }
        return output;
    }

    private void editStream(CommandOutput.OutputHierarchy out) throws CLIProcessingException {
        try {
            MStreamDescriptor mdesc;
            String user = this.getUserLoginId();
            String streamName = this.getParamTextValue(PATH_PARAM_NAME, 0);
            final String path = DbCommands.getTransformedPath(streamName, this.getUserLoginId());
            final Configuration conf = new Configuration();
            final StreamDescriptor sdesc = Streams.newStreamDescriptor();
            if (LOG.isDebugEnabled()) {
                LOG.debug((Object)("begin editing stream=" + path + " user=" + user));
            }
            if (this.isParamPresent(DEFAULT_PARTITIONS)) {
                int n = this.getParamIntValue(DEFAULT_PARTITIONS, 0);
                sdesc.setDefaultPartitions(n);
            }
            if (this.isParamPresent(TTL_PARAM_NAME)) {
                long l = this.getParamLongValue(TTL_PARAM_NAME, 0);
                sdesc.setTimeToLiveSec(l);
            }
            if (this.isParamPresent(COMPRESSION_PARAM_NAME)) {
                String string = this.getParamTextValue(COMPRESSION_PARAM_NAME, 0);
                sdesc.setCompressionAlgo(string);
            }
            if (this.isParamPresent(CLIENT_COMPRESSION_PARAM_NAME)) {
                boolean bl = this.getParamBooleanValue(CLIENT_COMPRESSION_PARAM_NAME, 0);
                mdesc = (MStreamDescriptor)sdesc;
                mdesc.setClientCompression(bl);
            }
            if (this.isParamPresent(AUTO_CREATE_TOPICS_PARAM_NAME)) {
                boolean bl = this.getParamBooleanValue(AUTO_CREATE_TOPICS_PARAM_NAME, 0);
                sdesc.setAutoCreateTopics(bl);
            }
            if (this.isParamPresent(PRODUCE_PERM_PARAM_NAME)) {
                String string = this.getParamTextValue(PRODUCE_PERM_PARAM_NAME, 0);
                sdesc.setProducePerms(string);
            }
            if (this.isParamPresent(LISTEN_PERM_PARAM_NAME)) {
                String string = this.getParamTextValue(LISTEN_PERM_PARAM_NAME, 0);
                sdesc.setConsumePerms(string);
            }
            if (this.isParamPresent(TOPIC_PERM_PARAM_NAME)) {
                String string = this.getParamTextValue(TOPIC_PERM_PARAM_NAME, 0);
                sdesc.setTopicPerms(string);
            }
            if (this.isParamPresent(COPY_PERM_PARAM_NAME)) {
                String string = this.getParamTextValue(COPY_PERM_PARAM_NAME, 0);
                sdesc.setCopyPerms(string);
            }
            if (this.isParamPresent(ADMIN_PERM_PARAM_NAME)) {
                String string = this.getParamTextValue(ADMIN_PERM_PARAM_NAME, 0);
                sdesc.setAdminPerms(string);
            }
            if (this.isParamPresent(DEFAULT_TIMESTAMP_TYPE_PARAM_NAME)) {
                String string = this.getParamTextValue(DEFAULT_TIMESTAMP_TYPE_PARAM_NAME, 0);
                sdesc.setDefaultTimestampType(TimestampType.forName((String)string));
            }
            if (this.isParamPresent(DEFAULT_LOG_COMPACTION_PARAM_NAME)) {
                boolean bl = this.getParamBooleanValue(DEFAULT_LOG_COMPACTION_PARAM_NAME, 0);
                mdesc = (MStreamDescriptor)sdesc;
                mdesc.setCompact(bl);
            }
            if (this.isParamPresent(MIN_COMPACTION_LAG_MS)) {
                long l = this.getParamLongValue(MIN_COMPACTION_LAG_MS, 0);
                sdesc.setMinCompactionLagMS(l);
            }
            if (this.isParamPresent(DELETE_RETENTION_MS)) {
                long l = this.getParamLongValue(DELETE_RETENTION_MS, 0);
                sdesc.setDeleteRetentionMS(l);
            }
            if (this.isParamPresent(PID_EXPIRY_SECS_PARAM_NAME)) {
                long l = this.getParamLongValue(PID_EXPIRY_SECS_PARAM_NAME, 0);
                sdesc.setProducerIdExpirySecs(l);
            }
            if (this.isParamPresent(FORCE_PARAM_NAME)) {
                sdesc.setForce();
            }
            if (this.isParamPresent(COMPACTION_THROTTLE_FACTOR)) {
                long l = this.getParamLongValue(COMPACTION_THROTTLE_FACTOR, 0);
                sdesc.setCompactionThrottleFactor(l);
            }
            StreamDescriptor streamDescriptor = sdesc;
            final RecentStreamsListManager manager = RecentStreamsListManagers.getRecentStreamsListManagerForUser(user);
            final MapRFileSystem mfs = MapRCliUtil.getMapRFileSystem();
            new FileclientRun(user){

                @Override
                public void runAsProxyUser() throws CLIProcessingException, IOException {
                    try {
                        Admin madmin = Streams.newAdmin((Configuration)conf);
                        madmin.editStream(path, sdesc);
                        manager.moveToTop(path);
                    }
                    catch (Exception e) {
                        LOG.error((Object)("editStream failed : " + e));
                        manager.deleteIfNotExist(path, mfs);
                        throw e;
                    }
                }
            };
        }
        catch (CLIProcessingException e) {
            out.addError(new CommandOutput.OutputHierarchy.OutputError(22, e.getMessage()));
        }
        catch (Exception e) {
            out.addError(new CommandOutput.OutputHierarchy.OutputError(10003, e.getMessage()));
        }
    }

    private StreamDescriptor getStreamDescriptor(final String path) throws CLIProcessingException, IOException {
        final ArrayList sdList = new ArrayList();
        String user = this.getUserLoginId();
        final Configuration conf = new Configuration();
        new FileclientRun(user){

            @Override
            public void runAsProxyUser() throws CLIProcessingException, IOException {
                try {
                    Admin madmin = Streams.newAdmin((Configuration)conf);
                    StreamDescriptor sdesc = madmin.getStreamDescriptor(path);
                    sdList.add(sdesc);
                }
                catch (Exception e) {
                    LOG.error((Object)("getting stream info for stream " + path + " failed: " + e));
                    throw new IOException(e.getMessage());
                }
            }
        };
        return (StreamDescriptor)sdList.get(0);
    }

    private void infoStream(CommandOutput.OutputHierarchy out) throws CLIProcessingException {
        try {
            String user = this.getUserLoginId();
            String streamName = this.getParamTextValue(PATH_PARAM_NAME, 0);
            final String path = DbCommands.getTransformedPath(streamName, this.getUserLoginId());
            final Configuration conf = new Configuration();
            final ArrayList ntopicsList = new ArrayList();
            if (LOG.isDebugEnabled()) {
                LOG.debug((Object)("begin info stream=" + path + " user=" + user));
            }
            StreamDescriptor mdesc = this.getStreamDescriptor(streamName);
            MStreamDescriptor sdesc = (MStreamDescriptor)mdesc;
            final RecentStreamsListManager manager = RecentStreamsListManagers.getRecentStreamsListManagerForUser(user);
            final MapRFileSystem mfs = MapRCliUtil.getMapRFileSystem();
            new FileclientRun(user){

                @Override
                public void runAsProxyUser() throws CLIProcessingException, IOException {
                    try {
                        Admin madmin = Streams.newAdmin((Configuration)conf);
                        int ntopics = madmin.countTopics(path);
                        ntopicsList.add(ntopics);
                        manager.moveToTop(path);
                    }
                    catch (Exception e) {
                        LOG.error((Object)("getting count of topics failed : " + e));
                        manager.deleteIfNotExist(path, mfs);
                        throw e;
                    }
                }
            };
            int ntopics = (Integer)ntopicsList.get(0);
            long physicalSize = 0L;
            long logicalSize = 0L;
            TabletStats tabletStats = new TabletStats(path, this.getUserLoginId());
            CommandOutput.OutputHierarchy tmpOut = new CommandOutput.OutputHierarchy();
            List<Dbserver.TabletDesc> tablets = tabletStats.getTablets(tmpOut, 0, Integer.MAX_VALUE);
            if (tablets == null) {
                if (out.getOutputErrors() != null && out.getOutputErrors().size() > 1) {
                    LOG.error((Object)("Error fetching regions for stream " + streamName + " : " + ((CommandOutput.OutputHierarchy.OutputError)out.getOutputErrors().get(0)).getErrorDescription()));
                } else {
                    LOG.error((Object)("Error fetching regions for stream " + streamName));
                }
            }
            for (Dbserver.TabletDesc tablet : tablets) {
                try {
                    Dbserver.TabletStatResponse tsr = tabletStats.getTabletStatResponse(tablet);
                    if (tsr == null || !tsr.hasUsage()) continue;
                    Dbserver.SpaceUsage su = tsr.getUsage();
                    long blockSize = 8192L;
                    physicalSize += su.getNumPhysicalBlocks() * blockSize;
                    logicalSize += su.getNumLogicalBlocks() * blockSize;
                }
                catch (Exception e) {
                    LOG.error((Object)("Error fetching region stats for stream " + streamName), (Throwable)e);
                }
            }
            CommandOutput.OutputHierarchy.OutputNode streamNode = new CommandOutput.OutputHierarchy.OutputNode();
            streamNode.addChild(new CommandOutput.OutputHierarchy.OutputNode(PATH_PARAM_NAME, (Object)path));
            streamNode.addChild(new CommandOutput.OutputHierarchy.OutputNode("physicalsize", physicalSize));
            streamNode.addChild(new CommandOutput.OutputHierarchy.OutputNode("logicalsize", logicalSize));
            streamNode.addChild(new CommandOutput.OutputHierarchy.OutputNode("numtopics", ntopics));
            streamNode.addChild(new CommandOutput.OutputHierarchy.OutputNode(DEFAULT_PARTITIONS, sdesc.getDefaultPartitions()));
            streamNode.addChild(new CommandOutput.OutputHierarchy.OutputNode(TTL_PARAM_NAME, sdesc.getTimeToLiveSec()));
            streamNode.addChild(new CommandOutput.OutputHierarchy.OutputNode(COMPRESSION_PARAM_NAME, (Object)sdesc.getCompressionAlgo()));
            streamNode.addChild(new CommandOutput.OutputHierarchy.OutputNode(AUTO_CREATE_TOPICS_PARAM_NAME, (Object)sdesc.getAutoCreateTopics()));
            if (sdesc.hasProducePerms()) {
                streamNode.addChild(new CommandOutput.OutputHierarchy.OutputNode(PRODUCE_PERM_PARAM_NAME, (Object)sdesc.getProducePerms()));
            }
            if (sdesc.hasConsumePerms()) {
                streamNode.addChild(new CommandOutput.OutputHierarchy.OutputNode(LISTEN_PERM_PARAM_NAME, (Object)sdesc.getConsumePerms()));
            }
            if (sdesc.hasTopicPerms()) {
                streamNode.addChild(new CommandOutput.OutputHierarchy.OutputNode(TOPIC_PERM_PARAM_NAME, (Object)sdesc.getTopicPerms()));
            }
            if (sdesc.hasCopyPerms()) {
                streamNode.addChild(new CommandOutput.OutputHierarchy.OutputNode(COPY_PERM_PARAM_NAME, (Object)sdesc.getCopyPerms()));
            }
            if (sdesc.hasAdminPerms()) {
                streamNode.addChild(new CommandOutput.OutputHierarchy.OutputNode(ADMIN_PERM_PARAM_NAME, (Object)sdesc.getAdminPerms()));
            }
            streamNode.addChild(new CommandOutput.OutputHierarchy.OutputNode(KAFKATOPIC_PARAM_NAME, (Object)sdesc.isKafkaTopic()));
            streamNode.addChild(new CommandOutput.OutputHierarchy.OutputNode(ISCHANGELOG_PARAM_NAME, (Object)sdesc.getIsChangelog()));
            if (sdesc.hasTimestampType()) {
                streamNode.addChild(new CommandOutput.OutputHierarchy.OutputNode(DEFAULT_TIMESTAMP_TYPE_PARAM_NAME, (Object)sdesc.getDefaultTimestampType()));
            }
            streamNode.addChild(new CommandOutput.OutputHierarchy.OutputNode(DEFAULT_LOG_COMPACTION_PARAM_NAME, (Object)sdesc.getCompact()));
            streamNode.addChild(new CommandOutput.OutputHierarchy.OutputNode(MIN_COMPACTION_LAG_MS, sdesc.getMinCompactionLagMS()));
            streamNode.addChild(new CommandOutput.OutputHierarchy.OutputNode(DELETE_RETENTION_MS, sdesc.getDeleteRetentionMS()));
            streamNode.addChild(new CommandOutput.OutputHierarchy.OutputNode(COMPACTION_THROTTLE_FACTOR, sdesc.getCompactionThrottleFactor()));
            if (sdesc.hasProducerIdExpirySecs()) {
                streamNode.addChild(new CommandOutput.OutputHierarchy.OutputNode(PID_EXPIRY_SECS_PARAM_NAME, sdesc.getProducerIdExpirySecs()));
            }
            out.addNode(streamNode);
        }
        catch (CLIProcessingException e) {
            out.addError(new CommandOutput.OutputHierarchy.OutputError(22, e.getMessage()));
        }
        catch (Exception e) {
            out.addError(new CommandOutput.OutputHierarchy.OutputError(10003, e.getMessage()));
        }
    }

    private void deleteStream(CommandOutput.OutputHierarchy out) throws CLIProcessingException {
        try {
            final String user = this.getUserLoginId();
            String streamName = this.getParamTextValue(PATH_PARAM_NAME, 0);
            final String path = DbCommands.getTransformedPath(streamName, this.getUserLoginId());
            final Configuration conf = new Configuration();
            if (LOG.isDebugEnabled()) {
                LOG.debug((Object)("begin deleting stream=" + path + " user=" + user));
            }
            new FileclientRun(user){

                @Override
                public void runAsProxyUser() throws CLIProcessingException, IOException {
                    try {
                        Admin madmin = Streams.newAdmin((Configuration)conf);
                        madmin.deleteStream(path);
                        RecentStreamsListManagers.getRecentStreamsListManagerForUser(user).delete(path);
                    }
                    catch (Exception e) {
                        LOG.error((Object)("deleteStream failed : " + e));
                        throw e;
                    }
                }
            };
        }
        catch (CLIProcessingException e) {
            out.addError(new CommandOutput.OutputHierarchy.OutputError(22, e.getMessage()));
        }
        catch (Exception e) {
            out.addError(new CommandOutput.OutputHierarchy.OutputError(10003, e.getMessage()));
        }
    }

    private void purgeStream(CommandOutput.OutputHierarchy out) throws CLIProcessingException {
        String user = this.getUserLoginId();
        String streamName = this.getParamTextValue(PATH_PARAM_NAME, 0);
        String path = DbCommands.getTransformedPath(streamName, this.getUserLoginId());
        RecentStreamsListManager manager = RecentStreamsListManagers.getRecentStreamsListManagerForUser(user);
        MapRFileSystem mfs = MapRCliUtil.getMapRFileSystem();
        try {
            if (LOG.isDebugEnabled()) {
                LOG.debug((Object)("begin purging stream=" + path + " user=" + user));
            }
            int nthreads = this.getParamIntValue(NTHREADS_PARAM_NAME, 0);
            int ctype = Dbserver.ForcedCompactionType.ForcedCompactionTTL.getNumber();
            manager.moveToTop(path);
            CommandOutput.OutputHierarchy.OutputError err = DbRegionCommands.packAllRegionsOfTable(path, ctype, nthreads, this.getUserLoginId());
            if (err != null && err.getErrorCode() != 0) {
                out.addError(err);
                return;
            }
            if (StreamsCommands.isLogCompactionEnabled(path) && (err = DbRegionCommands.packAllRegionsOfTable(path, ctype = Dbserver.ForcedCompactionType.ForcedCompactionDefault.getNumber(), nthreads, this.getUserLoginId())) != null && err.getErrorCode() != 0) {
                out.addError(err);
                return;
            }
        }
        catch (Exception e) {
            out.addError(new CommandOutput.OutputHierarchy.OutputError(10003, e.getMessage()));
            manager.deleteIfNotExist(path, mfs);
        }
    }

    private void listRecentStreams(CommandOutput.OutputHierarchy out) throws CLIProcessingException {
        RecentStreamsListManager manager = RecentStreamsListManagers.getRecentStreamsListManagerForUser(this.getUserLoginId());
        boolean hasHomeDir = manager.hasHomeDir();
        if (LOG.isDebugEnabled() && hasHomeDir) {
            LOG.debug((Object)("Home Directory /user/" + this.getUserLoginId() + " found"));
        }
        if (!hasHomeDir) {
            out.addError(new CommandOutput.OutputHierarchy.OutputError(22, "Home directory (/user/" + this.getUserLoginId() + ") is missing for this user. Please create it in order to cache the recent tables administered by this user."));
        } else {
            HashSet<String> recentPaths = new HashSet<String>();
            String currentClusterName = CLDBRpcCommonUtils.getInstance().getCurrentClusterName();
            for (String path : manager.getListFromFile()) {
                String recentPath = null;
                String[] splitPaths = path.split("/");
                if (path.startsWith("/mapr/") && splitPaths.length > 3 && path.split("/")[2].equals(currentClusterName)) {
                    String clusterPath = path.substring("/mapr/".length());
                    recentPath = clusterPath.substring(clusterPath.indexOf(47));
                } else {
                    recentPath = path;
                }
                if (recentPaths.contains(recentPath)) continue;
                CommandOutput.OutputHierarchy.OutputNode tableNode = new CommandOutput.OutputHierarchy.OutputNode();
                tableNode.addChild(new CommandOutput.OutputHierarchy.OutputNode(PATH_PARAM_NAME, (Object)recentPath));
                out.addNode(tableNode);
                recentPaths.add(recentPath);
            }
        }
    }

    public static boolean isStream(String pathName) throws IOException, CLIProcessingException {
        Path path;
        MapRFileSystem mfs = MapRCliUtil.getMapRFileSystem();
        if (!mfs.isTable(path = new Path(pathName))) {
            return false;
        }
        TableProperties tableProp = mfs.getTableProperties(path);
        return tableProp.getAttr().getIsMarlinTable();
    }

    public static String millisToDate(long millisSinceEpoch) {
        SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd'T'hh:mm:ss.SSSZ a");
        return format.format(new Date(millisSinceEpoch));
    }

    private static boolean isLogCompactionEnabled(String pathName) throws IOException, CLIProcessingException {
        Path path;
        MapRFileSystem mfs = MapRCliUtil.getMapRFileSystem();
        if (!mfs.isTable(path = new Path(pathName))) {
            return false;
        }
        TableProperties tableProp = mfs.getTableProperties(path);
        return tableProp.getAttr().getMarlinAttr().getCompactionConfig() == Marlincommon.CompactionConfig.CompactionEnable || tableProp.getAttr().getMarlinAttr().getCompactionConfig() == Marlincommon.CompactionConfig.CompactionForceEnable;
    }
}

