package org.apache.oozie.service;

import java.io.IOException;
import java.io.Writer;
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Date;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import org.apache.commons.lang3.StringEscapeUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.curator.x.discovery.ServiceInstance;
import org.apache.log4j.spi.LocationInfo;
import org.apache.oozie.ErrorCode;
import org.apache.oozie.client.rest.RestConstants;
import org.apache.oozie.util.AuthUrlClient;
import org.apache.oozie.util.Instrumentable;
import org.apache.oozie.util.Instrumentation;
import org.apache.oozie.util.SimpleTimestampedMessageParser;
import org.apache.oozie.util.TimestampedMessageParser;
import org.apache.oozie.util.XLog;
import org.apache.oozie.util.XLogStreamer;
import org.apache.oozie.util.ZKUtils;

/* loaded from: input_file:WEB-INF/lib/oozie-core-5.2.1.201-eep-810.jar:org/apache/oozie/service/ZKXLogStreamingService.class */
/**
 * {@link XLogStreamingService} variant that registers itself with ZooKeeper through {@link ZKUtils} and, for requests
 * that {@link JobsConcurrencyService} classifies as "all server" requests, merges the log streams of every Oozie
 * server listed in the ZooKeeper metadata into a single output ordered by message timestamp.
 */
public class ZKXLogStreamingService extends XLogStreamingService implements Service, Instrumentable {
    private ZKUtils zk;
    private XLog log;

    /**
     * Initializes the parent service, then registers this service with ZooKeeper and obtains the class logger.
     *
     * @param services the services container, forwarded to the parent implementation
     * @throws ServiceException with {@link ErrorCode#E1700} if registration or logger lookup fails
     */
    @Override // org.apache.oozie.service.XLogStreamingService, org.apache.oozie.service.Service
    public void init(Services services) throws ServiceException {
        super.init(services);
        try {
            this.zk = ZKUtils.register(this);
            this.log = XLog.getLog(getClass());
        } catch (Exception e) {
            throw new ServiceException(ErrorCode.E1700, e.getMessage(), e);
        }
    }

    /**
     * Unregisters from ZooKeeper (when a registration exists), drops the reference and destroys the parent service.
     */
    @Override // org.apache.oozie.service.XLogStreamingService, org.apache.oozie.service.Service
    public void destroy() {
        if (this.zk != null) {
            this.zk.unregister(this);
        }
        this.zk = null;
        super.destroy();
    }

    /**
     * Delegates instrumentation to the parent implementation; this class adds nothing of its own.
     *
     * @param instrumentation the instrumentation to populate
     */
    @Override // org.apache.oozie.service.XLogStreamingService, org.apache.oozie.util.Instrumentable
    public void instrument(Instrumentation instrumentation) {
        super.instrument(instrumentation);
    }

    /**
     * Streams logs to {@code writer}.
     * <ul>
     * <li>If logging is disabled on the streamer, only the HTML-escaped "disabled" message is written.</li>
     * <li>If the request is an "all server" request, the logs of all registered servers are collated.</li>
     * <li>Otherwise the call is delegated to the parent implementation.</li>
     * </ul>
     *
     * @param xLogStreamer the streamer describing the request (filter, log type, request parameters)
     * @param date first date bound, passed through unchanged (presumably the start of the window - confirm
     *        against {@code XLogStreamer.makeReader})
     * @param date2 second date bound, passed through unchanged (presumably the end of the window - confirm)
     * @param writer destination of the log output
     * @throws IOException if reading, fetching or writing the logs fails
     */
    @Override // org.apache.oozie.service.XLogStreamingService
    public void streamLog(XLogStreamer xLogStreamer, Date date, Date date2, Writer writer) throws IOException {
        if (!xLogStreamer.isLogEnabled()) {
            writer.write(StringEscapeUtils.escapeHtml4(xLogStreamer.getLogDisableMessage()));
        } else if (((JobsConcurrencyService) Services.get().get(JobsConcurrencyService.class)).isAllServerRequest(xLogStreamer.getRequestParam())) {
            collateLogs(xLogStreamer, date, date2, writer);
        } else {
            super.streamLog(xLogStreamer, date, date2, writer, false);
        }
    }

    /**
     * Collects one message parser per Oozie server found in the ZooKeeper metadata (a local reader for this server,
     * an HTTP call for every other one), writes the preamble and then emits the messages of all reachable servers
     * merged by timestamp. Servers whose HTTP call fails with an {@link IOException} are skipped and reported in
     * the preamble. All parsers are closed before returning.
     *
     * @param xLogStreamer the streamer describing the request
     * @param date first date bound handed to {@code XLogStreamer.makeReader}
     * @param date2 second date bound handed to {@code XLogStreamer.makeReader}
     * @param writer destination of the collated output
     * @throws IOException wrapping any failure raised while collating
     */
    private void collateLogs(XLogStreamer xLogStreamer, Date date, Date date2, Writer writer) throws IOException {
        List<String> unreachableServerIds = new ArrayList<>();
        try {
            List<ServiceInstance<Map>> servers = this.zk.getAllMetaData();
            List<TimestampedMessageParser> parsers = new ArrayList<>(servers.size());
            try {
                for (ServiceInstance<Map> server : servers) {
                    Map payload = server.getPayload();
                    String serverId = (String) payload.get(ZKUtils.ZKMetadataKeys.OOZIE_ID);
                    if (serverId.equals(this.zk.getZKId())) {
                        // This server: read its own logs directly.
                        parsers.add(new TimestampedMessageParser(xLogStreamer.makeReader(date, date2), xLogStreamer.getXLogFilter()));
                    } else {
                        // Another server: fetch its logs over HTTP.
                        String serverUrl = (String) payload.get("OOZIE_URL");
                        String jobId = xLogStreamer.getXLogFilter().getFilterParams().get(DagXLogInfoService.JOB);
                        try {
                            String requestUrl = buildRemoteLogUrl(xLogStreamer, serverUrl, jobId);
                            parsers.add(new SimpleTimestampedMessageParser(AuthUrlClient.callServer(requestUrl), xLogStreamer.getXLogFilter()));
                        } catch (IOException e) {
                            // Best effort: an unreachable server must not fail the whole request.
                            this.log.warn("Failed to retrieve logs for job [" + jobId + "] from Oozie server with ID [" + serverId + "] at [" + serverUrl + "]; log information may be incomplete", e);
                            unreachableServerIds.add(serverId);
                        }
                    }
                }
                writePreamble(xLogStreamer, unreachableServerIds, writer);
                if (parsers.size() == 1) {
                    // A single source needs no merging; copy it straight through.
                    parsers.get(0).processRemaining(writer, xLogStreamer);
                } else {
                    mergeByTimestamp(parsers, writer, xLogStreamer);
                }
            } finally {
                for (TimestampedMessageParser parser : parsers) {
                    parser.closeReader();
                }
            }
        } catch (Exception e2) {
            // NOTE(review): this also relabels non-ZooKeeper failures (local reads, client writes) as ZooKeeper
            // issues. Kept as-is so the exception type and message seen by callers do not change.
            throw new IOException("Issue communicating with ZooKeeper: " + e2.getMessage(), e2);
        }
    }

    /**
     * Builds the URL used to request a job's logs from another Oozie server. The request sets
     * {@code ALL_SERVER_REQUEST=false} (presumably so the remote server streams only its own logs instead of
     * collating again - confirm) and appends the caller's query parameters. If the request carries a non-empty
     * first {@code doAs} value, that parameter is stripped from the resulting URL.
     *
     * @param xLogStreamer the streamer providing log type and request parameters
     * @param serverUrl base URL of the remote server
     * @param jobId id of the job whose logs are requested
     * @return the URL to call
     * @throws IOException if the query string or the {@code doAs} value cannot be encoded
     */
    private static String buildRemoteLogUrl(XLogStreamer xLogStreamer, String serverUrl, String jobId) throws IOException {
        String url = serverUrl + "/v2/job/" + jobId + "?" + RestConstants.JOB_SHOW_PARAM + "=" + xLogStreamer.getLogType()
                + "&" + RestConstants.ALL_SERVER_REQUEST + "=false"
                + AuthUrlClient.getQueryParamString(xLogStreamer.getRequestParam());
        String[] doAs = xLogStreamer.getRequestParam() != null ? xLogStreamer.getRequestParam().get(RestConstants.DO_AS_PARAM) : null;
        if (doAs == null || doAs.length <= 0 || doAs[0] == null || doAs[0].length() <= 0) {
            return url;
        }
        return url.replace("&" + RestConstants.DO_AS_PARAM + "=" + URLEncoder.encode(doAs[0], StandardCharsets.UTF_8.name()), "");
    }

    /**
     * Writes the header that precedes the log messages: the filter's truncation message (when non-empty), its debug
     * message (when debug mode is on) and the list of servers that could not be contacted (when there are any).
     *
     * @param xLogStreamer the streamer whose filter supplies the messages
     * @param unreachableServerIds ids of the servers whose logs could not be fetched
     * @param writer destination of the output
     * @throws IOException if writing fails
     */
    private static void writePreamble(XLogStreamer xLogStreamer, List<String> unreachableServerIds, Writer writer) throws IOException {
        if (!StringUtils.isEmpty(xLogStreamer.getXLogFilter().getTruncatedMessage())) {
            writer.write(StringEscapeUtils.escapeHtml4(xLogStreamer.getXLogFilter().getTruncatedMessage()));
        }
        if (xLogStreamer.getXLogFilter().isDebugMode()) {
            writer.write(StringEscapeUtils.escapeHtml4(xLogStreamer.getXLogFilter().getDebugMessage()));
        }
        if (!unreachableServerIds.isEmpty()) {
            writer.write("Unable to contact the following Oozie Servers for logs (log information may be incomplete):\n");
            for (String serverId : unreachableServerIds) {
                writer.write("     ");
                writer.write(serverId);
                writer.write("\n");
            }
            writer.write("\n");
            writer.flush();
        }
    }

    /**
     * Merges the messages of several parsers into {@code writer} in ascending timestamp order, holding only the
     * current message of each parser at a time. Parsers whose current messages carry the same timestamp are queued
     * in one bucket and emitted in arrival order, so no parser is displaced by another one with an equal timestamp.
     * Once only one parser is left, its remaining content is copied through without further comparison.
     *
     * @param parsers the parsers to merge; parsers that yield no message are ignored
     * @param writer destination of the merged output
     * @param xLogStreamer the streamer consulted for flushing and for processing the last parser's remainder
     * @throws IOException if reading from a parser or writing fails
     */
    private static void mergeByTimestamp(List<TimestampedMessageParser> parsers, Writer writer, XLogStreamer xLogStreamer)
            throws IOException {
        TreeMap<String, List<TimestampedMessageParser>> pending = new TreeMap<>();
        // Tracked separately because one map entry may hold several parsers.
        int activeParsers = 0;
        for (TimestampedMessageParser parser : parsers) {
            if (parser.increment()) {
                enqueue(pending, parser);
                activeParsers++;
            }
        }
        while (activeParsers > 1) {
            Map.Entry<String, List<TimestampedMessageParser>> earliest = pending.firstEntry();
            List<TimestampedMessageParser> bucket = earliest.getValue();
            TimestampedMessageParser parser = bucket.remove(0);
            if (bucket.isEmpty()) {
                pending.remove(earliest.getKey());
            }
            String message = parser.getLastMessage();
            writer.write(StringEscapeUtils.escapeHtml4(message));
            if (xLogStreamer.shouldFlushOutput(message.length())) {
                writer.flush();
            }
            if (parser.increment()) {
                enqueue(pending, parser);
            } else {
                activeParsers--;
            }
        }
        if (activeParsers == 1) {
            TimestampedMessageParser last = pending.firstEntry().getValue().get(0);
            writer.write(StringEscapeUtils.escapeHtml4(last.getLastMessage()));
            last.processRemaining(writer, xLogStreamer);
        }
    }

    /**
     * Queues {@code parser} under its current timestamp, behind any parser already waiting with the same timestamp.
     *
     * @param pending the timestamp-ordered buckets of waiting parsers
     * @param parser the parser to queue; its current timestamp is used as the key
     */
    private static void enqueue(TreeMap<String, List<TimestampedMessageParser>> pending, TimestampedMessageParser parser) {
        String timestamp = parser.getLastTimestamp();
        List<TimestampedMessageParser> bucket = pending.get(timestamp);
        if (bucket == null) {
            bucket = new ArrayList<>(1);
            pending.put(timestamp, bucket);
        }
        bucket.add(parser);
    }
}
