/*
 * Decompiled with CFR 0.152.
 */
package kafka.tools;

import com.typesafe.scalalogging.Logger;
import java.io.PrintStream;
import java.io.Serializable;
import java.util.Optional;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;
import joptsimple.OptionSet;
import joptsimple.OptionSpec;
import kafka.tools.ConsoleConsumer;
import kafka.utils.Exit$;
import kafka.utils.Implicits;
import kafka.utils.Logging;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.MessageFormatter;
import org.apache.kafka.common.errors.AuthenticationException;
import org.apache.kafka.common.errors.WakeupException;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.utils.Exit;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.server.util.CommandLineUtils;
import scala.Function0;
import scala.None$;
import scala.Option;
import scala.Option$;
import scala.Predef$;
import scala.runtime.BoxesRunTime;
import scala.runtime.java8.JFunction0;

public final class ConsoleConsumer$
implements Logging {
    public static final ConsoleConsumer$ MODULE$ = new ConsoleConsumer$();
    private static int messageCount = 0;
    private static final CountDownLatch shutdownLatch = new CountDownLatch(1);
    private static Logger logger;
    private static String logIdent;
    private static volatile boolean bitmap$0;

    @Override
    public String loggerName() {
        return Logging.loggerName$(this);
    }

    @Override
    public String msgWithLogIdent(String msg) {
        return Logging.msgWithLogIdent$(this, msg);
    }

    @Override
    public void trace(Function0<String> msg) {
        Logging.trace$(this, msg);
    }

    @Override
    public void trace(Function0<String> msg, Function0<Throwable> e) {
        Logging.trace$(this, msg, e);
    }

    @Override
    public boolean isDebugEnabled() {
        return Logging.isDebugEnabled$(this);
    }

    @Override
    public boolean isTraceEnabled() {
        return Logging.isTraceEnabled$(this);
    }

    @Override
    public void debug(Function0<String> msg) {
        Logging.debug$(this, msg);
    }

    @Override
    public void debug(Function0<String> msg, Function0<Throwable> e) {
        Logging.debug$(this, msg, e);
    }

    @Override
    public void info(Function0<String> msg) {
        Logging.info$(this, msg);
    }

    @Override
    public void info(Function0<String> msg, Function0<Throwable> e) {
        Logging.info$(this, msg, e);
    }

    @Override
    public void warn(Function0<String> msg) {
        Logging.warn$(this, msg);
    }

    @Override
    public void warn(Function0<String> msg, Function0<Throwable> e) {
        Logging.warn$(this, msg, e);
    }

    @Override
    public void error(Function0<String> msg) {
        Logging.error$(this, msg);
    }

    @Override
    public void error(Function0<String> msg, Function0<Throwable> e) {
        Logging.error$(this, msg, e);
    }

    @Override
    public void fatal(Function0<String> msg) {
        Logging.fatal$(this, msg);
    }

    @Override
    public void fatal(Function0<String> msg, Function0<Throwable> e) {
        Logging.fatal$(this, msg, e);
    }

    private Logger logger$lzycompute() {
        synchronized (this) {
            if (!bitmap$0) {
                logger = Logging.logger$(this);
                bitmap$0 = true;
            }
        }
        return logger;
    }

    @Override
    public Logger logger() {
        if (!bitmap$0) {
            return this.logger$lzycompute();
        }
        return logger;
    }

    @Override
    public String logIdent() {
        return logIdent;
    }

    @Override
    public void logIdent_$eq(String x$1) {
        logIdent = x$1;
    }

    public int messageCount() {
        return messageCount;
    }

    public void messageCount_$eq(int x$1) {
        messageCount = x$1;
    }

    private CountDownLatch shutdownLatch() {
        return shutdownLatch;
    }

    public void main(String[] args) {
        ConsoleConsumer.ConsumerConfig conf = new ConsoleConsumer.ConsumerConfig(args);
        try {
            this.run(conf);
            return;
        }
        catch (AuthenticationException e) {
            if (this.logger().underlying().isErrorEnabled()) {
                String msgWithLogIdent_msg = "Authentication failed: terminating consumer process";
                Object var5_4 = null;
                this.logger().underlying().error(Logging.msgWithLogIdent$(this, msgWithLogIdent_msg), (Throwable)e);
            }
            throw Exit$.MODULE$.exit(1, (Option<String>)None$.MODULE$);
        }
        catch (Throwable e) {
            if (this.logger().underlying().isErrorEnabled()) {
                String msgWithLogIdent_msg = "Unknown error when running consumer: ";
                Object var6_6 = null;
                this.logger().underlying().error(Logging.msgWithLogIdent$(this, msgWithLogIdent_msg), e);
            }
            throw Exit$.MODULE$.exit(1, (Option<String>)None$.MODULE$);
        }
    }

    public void run(ConsoleConsumer.ConsumerConfig conf) {
        long timeoutMs = conf.timeoutMs() >= 0 ? (long)conf.timeoutMs() : Long.MAX_VALUE;
        KafkaConsumer consumer = new KafkaConsumer(this.consumerProps(conf), (Deserializer)new ByteArrayDeserializer(), (Deserializer)new ByteArrayDeserializer());
        ConsoleConsumer.ConsumerWrapper consumerWrapper = conf.partitionArg().isDefined() ? new ConsoleConsumer.ConsumerWrapper((Option<String>)Option$.MODULE$.apply((Object)conf.topicArg()), conf.partitionArg(), (Option<Object>)Option$.MODULE$.apply((Object)BoxesRunTime.boxToLong((long)conf.offsetArg())), (Option<String>)None$.MODULE$, (Consumer<byte[], byte[]>)consumer, timeoutMs, Time.SYSTEM) : new ConsoleConsumer.ConsumerWrapper((Option<String>)Option$.MODULE$.apply((Object)conf.topicArg()), (Option<Object>)None$.MODULE$, (Option<Object>)None$.MODULE$, (Option<String>)Option$.MODULE$.apply((Object)conf.includedTopicsArg()), (Consumer<byte[], byte[]>)consumer, timeoutMs, Time.SYSTEM);
        this.addShutdownHook(consumerWrapper, conf);
        try {
            this.process(Predef$.MODULE$.int2Integer(conf.maxMessages()), conf.formatter(), consumerWrapper, System.out, conf.skipMessageOnError());
        }
        finally {
            consumerWrapper.cleanup();
            conf.formatter().close();
            this.reportRecordCount();
            this.shutdownLatch().countDown();
        }
    }

    public void addShutdownHook(ConsoleConsumer.ConsumerWrapper consumer, ConsoleConsumer.ConsumerConfig conf) {
        Exit.addShutdownHook((String)"consumer-shutdown-hook", () -> Exit$.$anonfun$addShutdownHook$1((Function0)(JFunction0.mcV.sp & Serializable)() -> {
            consumer.wakeup();
            MODULE$.shutdownLatch().await();
            if (conf.enableSystestEventsLogging()) {
                System.out.println("shutdown_complete");
                return;
            }
        }));
    }

    public void process(Integer maxMessages, MessageFormatter formatter, ConsoleConsumer.ConsumerWrapper consumer, PrintStream output, boolean skipMessageOnError) {
        while (this.messageCount() < Predef$.MODULE$.Integer2int(maxMessages) || BoxesRunTime.equalsNumObject((Number)maxMessages, (Object)BoxesRunTime.boxToInteger((int)-1))) {
            ConsumerRecord<byte[], byte[]> consumerRecord;
            try {
                consumerRecord = consumer.receive();
            }
            catch (WakeupException wakeupException) {
                if (this.logger().underlying().isTraceEnabled()) {
                    String msgWithLogIdent_msg = "Caught WakeupException because consumer is shutdown, ignore and terminate.";
                    Object var9_9 = null;
                    this.logger().underlying().trace(Logging.msgWithLogIdent$(this, msgWithLogIdent_msg));
                    return;
                }
                return;
            }
            catch (Throwable e) {
                if (this.logger().underlying().isErrorEnabled()) {
                    String msgWithLogIdent_msg = "Error processing message, terminating consumer process: ";
                    Object var10_11 = null;
                    this.logger().underlying().error(Logging.msgWithLogIdent$(this, msgWithLogIdent_msg), e);
                    return;
                }
                return;
            }
            ConsumerRecord<byte[], byte[]> msg = consumerRecord;
            this.messageCount_$eq(this.messageCount() + 1);
            try {
                formatter.writeTo(new ConsumerRecord(msg.topic(), msg.partition(), msg.offset(), msg.timestamp(), msg.timestampType(), 0, 0, msg.key(), msg.value(), msg.headers(), Optional.empty()), output);
            }
            catch (Throwable e) {
                if (skipMessageOnError) {
                    if (this.logger().underlying().isErrorEnabled()) {
                        String msgWithLogIdent_msg = "Error processing message, skipping this message: ";
                        Object var11_8 = null;
                        this.logger().underlying().error(Logging.msgWithLogIdent$(this, msgWithLogIdent_msg), e);
                    }
                }
                throw e;
            }
            if (!this.checkErr(output, formatter)) continue;
            return;
        }
    }

    public void reportRecordCount() {
        System.err.println(new StringBuilder(30).append("Processed a total of ").append(this.messageCount()).append(" messages").toString());
    }

    public boolean checkErr(PrintStream output, MessageFormatter formatter) {
        boolean gotError = output.checkError();
        if (gotError) {
            System.err.println("Unable to write to standard out, closing consumer.");
        }
        return gotError;
    }

    public Properties consumerProps(ConsoleConsumer.ConsumerConfig config) {
        Properties props = new Properties();
        new Implicits.PropertiesOps(props).$plus$plus$eq(config.consumerProps());
        new Implicits.PropertiesOps(props).$plus$plus$eq(config.extraConsumerProps());
        this.setAutoOffsetResetValue(config, props);
        props.setProperty("use.brokers", Boolean.toString(config.useBrokers()));
        if (config.options.has(config.bootstrapServerOpt())) {
            props.put("bootstrap.servers", config.bootstrapServer());
        }
        if (props.getProperty("client.id") == null) {
            props.put("client.id", "console-consumer");
        }
        CommandLineUtils.maybeMergeOptions((Properties)props, (String)"isolation.level", (OptionSet)config.options, config.isolationLevelOpt());
        return props;
    }

    public void setAutoOffsetResetValue(ConsoleConsumer.ConsumerConfig config, Properties props) {
        String string = "latest";
        String string2 = "earliest";
        if (props.containsKey("auto.offset.reset")) {
            String autoResetOption = props.getProperty("auto.offset.reset");
            if (config.options.has((OptionSpec)config.resetBeginningOpt()) && !string2.equals(autoResetOption)) {
                System.err.println(new StringBuilder(96).append("Can't simultaneously specify --from-beginning and 'auto.offset.reset=").append(autoResetOption).append("', ").append("please remove one option").toString());
                throw Exit$.MODULE$.exit(1, (Option<String>)None$.MODULE$);
            }
            return;
        }
        String autoResetOption = config.options.has((OptionSpec)config.resetBeginningOpt()) ? string2 : string;
        props.put("auto.offset.reset", autoResetOption);
    }

    public static final /* synthetic */ String $anonfun$main$1() {
        return "Authentication failed: terminating consumer process";
    }

    public static final /* synthetic */ AuthenticationException $anonfun$main$2(AuthenticationException e$1) {
        return e$1;
    }

    public static final /* synthetic */ String $anonfun$main$3() {
        return "Unknown error when running consumer: ";
    }

    public static final /* synthetic */ Throwable $anonfun$main$4(Throwable e$2) {
        return e$2;
    }

    public static final /* synthetic */ String $anonfun$process$1() {
        return "Caught WakeupException because consumer is shutdown, ignore and terminate.";
    }

    public static final /* synthetic */ String $anonfun$process$2() {
        return "Error processing message, terminating consumer process: ";
    }

    public static final /* synthetic */ Throwable $anonfun$process$3(Throwable e$3) {
        return e$3;
    }

    public static final /* synthetic */ String $anonfun$process$4() {
        return "Error processing message, skipping this message: ";
    }

    public static final /* synthetic */ Throwable $anonfun$process$5(Throwable e$4) {
        return e$4;
    }

    private ConsoleConsumer$() {
    }
}

