package org.apache.oozie.service;

import java.io.IOException;
import java.io.Writer;
import java.util.ArrayList;
import java.util.Date;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import jodd.util.StringPool;
import org.apache.curator.x.discovery.ServiceInstance;
import org.apache.oozie.ErrorCode;
import org.apache.oozie.client.rest.RestConstants;
import org.apache.oozie.util.AuthUrlClient;
import org.apache.oozie.util.Instrumentable;
import org.apache.oozie.util.Instrumentation;
import org.apache.oozie.util.SimpleTimestampedMessageParser;
import org.apache.oozie.util.TimestampedMessageParser;
import org.apache.oozie.util.XLog;
import org.apache.oozie.util.XLogFilter;
import org.apache.oozie.util.XLogStreamer;
import org.apache.oozie.util.ZKUtils;

/* loaded from: input_file:WEB-INF/lib/oozie-core-4.2.0-mapr-1710-r1.jar:org/apache/oozie/service/ZKXLogStreamingService.class */
public class ZKXLogStreamingService extends XLogStreamingService implements Service, Instrumentable {
    private ZKUtils zk;
    private XLog log;

    @Override // org.apache.oozie.service.XLogStreamingService, org.apache.oozie.service.Service
    public void init(Services services) throws ServiceException {
        super.init(services);
        try {
            this.zk = ZKUtils.register(this);
            this.log = XLog.getLog(getClass());
        } catch (Exception e) {
            throw new ServiceException(ErrorCode.E1700, e.getMessage(), e);
        }
    }

    @Override // org.apache.oozie.service.XLogStreamingService, org.apache.oozie.service.Service
    public void destroy() {
        if (this.zk != null) {
            this.zk.unregister(this);
        }
        this.zk = null;
        super.destroy();
    }

    @Override // org.apache.oozie.service.XLogStreamingService, org.apache.oozie.util.Instrumentable
    public void instrument(Instrumentation instrumentation) {
        super.instrument(instrumentation);
    }

    @Override // org.apache.oozie.service.XLogStreamingService
    public void streamLog(XLogFilter xLogFilter, Date date, Date date2, Writer writer, Map<String, String[]> map) throws IOException {
        XLogService xLogService = (XLogService) Services.get().get(XLogService.class);
        if (!xLogService.getLogOverWS()) {
            writer.write("Log streaming disabled!!");
        } else if (((JobsConcurrencyService) Services.get().get(JobsConcurrencyService.class)).isAllServerRequest(map)) {
            collateLogs(xLogFilter, date, date2, writer, map, xLogService.getOozieLogPath(), xLogService.getOozieLogName(), xLogService.getOozieLogRotation(), "log");
        } else {
            new XLogStreamer(xLogFilter, xLogService.getOozieLogPath(), xLogService.getOozieLogName(), xLogService.getOozieLogRotation()).streamLog(writer, date, date2, this.bufferLen);
        }
    }

    @Override // org.apache.oozie.service.XLogStreamingService
    public void streamErrorLog(XLogFilter xLogFilter, Date date, Date date2, Writer writer, Map<String, String[]> map) throws IOException {
        XLogService xLogService = (XLogService) Services.get().get(XLogService.class);
        if (!xLogService.isErrorLogEnabled()) {
            writer.write("Error Log streaming disabled!!");
        } else if (((JobsConcurrencyService) Services.get().get(JobsConcurrencyService.class)).isAllServerRequest(map)) {
            collateLogs(xLogFilter, date, date2, writer, map, xLogService.getOozieLogPath(), xLogService.getOozieErrorLogName(), xLogService.getOozieErrorLogRotation(), "errorlog");
        } else {
            new XLogStreamer(xLogFilter, xLogService.getOozieErrorLogPath(), xLogService.getOozieErrorLogName(), xLogService.getOozieErrorLogRotation()).streamLog(writer, date, date2, this.bufferLen);
        }
    }

    @Override // org.apache.oozie.service.XLogStreamingService
    public void streamAuditLog(XLogFilter xLogFilter, Date date, Date date2, Writer writer, Map<String, String[]> map) throws IOException {
        XLogService xLogService = (XLogService) Services.get().get(XLogService.class);
        if (!xLogService.isAuditLogEnabled()) {
            writer.write("Audit Log streaming disabled!!");
        } else if (((JobsConcurrencyService) Services.get().get(JobsConcurrencyService.class)).isAllServerRequest(map)) {
            collateLogs(xLogFilter, date, date2, writer, map, xLogService.getOozieAuditLogPath(), xLogService.getOozieAuditLogName(), xLogService.getOozieAuditLogRotation(), "auditlog");
        } else {
            new XLogStreamer(xLogFilter, xLogService.getOozieAuditLogPath(), xLogService.getOozieAuditLogName(), xLogService.getOozieAuditLogRotation()).streamLog(writer, date, date2, this.bufferLen);
        }
    }

    private void collateLogs(XLogFilter xLogFilter, Date date, Date date2, Writer writer, Map<String, String[]> map, String str, String str2, int i, String str3) throws IOException {
        ArrayList<String> arrayList = new ArrayList();
        try {
            List<ServiceInstance<Map>> allMetaData = this.zk.getAllMetaData();
            ArrayList<TimestampedMessageParser> arrayList2 = new ArrayList(allMetaData.size());
            try {
                Iterator<ServiceInstance<Map>> it = allMetaData.iterator();
                while (it.hasNext()) {
                    Map payload = it.next().getPayload();
                    String str4 = (String) payload.get(ZKUtils.ZKMetadataKeys.OOZIE_ID);
                    if (str4.equals(this.zk.getZKId())) {
                        arrayList2.add(new TimestampedMessageParser(new XLogStreamer(xLogFilter, str, str2, i).makeReader(date, date2), xLogFilter));
                    } else {
                        String str5 = (String) payload.get("OOZIE_URL");
                        String str6 = xLogFilter.getFilterParams().get(DagXLogInfoService.JOB);
                        try {
                            arrayList2.add(new SimpleTimestampedMessageParser(AuthUrlClient.callServer(str5 + "/v2/job/" + str6 + "?" + RestConstants.JOB_SHOW_PARAM + "=" + str3 + StringPool.AMPERSAND + RestConstants.ALL_SERVER_REQUEST + "=false" + AuthUrlClient.getQueryParamString(map)), xLogFilter));
                        } catch (IOException e) {
                            this.log.warn("Failed to retrieve logs for job [" + str6 + "] from Oozie server with ID [" + str4 + "] at [" + str5 + "]; log information may be incomplete", e);
                            arrayList.add(str4);
                        }
                    }
                }
                if (xLogFilter.isDebugMode()) {
                    writer.write(xLogFilter.getDebugMessage());
                }
                if (!arrayList.isEmpty()) {
                    writer.write("Unable to contact the following Oozie Servers for logs (log information may be incomplete):\n");
                    for (String str7 : arrayList) {
                        writer.write("     ");
                        writer.write(str7);
                        writer.write("\n");
                    }
                    writer.write("\n");
                    writer.flush();
                }
                if (arrayList2.size() == 1) {
                    ((TimestampedMessageParser) arrayList2.get(0)).processRemaining(writer, this.bufferLen);
                } else {
                    TreeMap treeMap = new TreeMap();
                    for (TimestampedMessageParser timestampedMessageParser : arrayList2) {
                        if (timestampedMessageParser.increment()) {
                            treeMap.put(timestampedMessageParser.getLastTimestamp(), timestampedMessageParser);
                        }
                    }
                    int i2 = 0;
                    while (treeMap.size() > 1) {
                        TimestampedMessageParser timestampedMessageParser2 = (TimestampedMessageParser) treeMap.pollFirstEntry().getValue();
                        writer.write(timestampedMessageParser2.getLastMessage());
                        i2 = timestampedMessageParser2.getLastMessage().length();
                        if (i2 > this.bufferLen) {
                            writer.flush();
                            i2 = 0;
                        }
                        if (timestampedMessageParser2.increment()) {
                            treeMap.put(timestampedMessageParser2.getLastTimestamp(), timestampedMessageParser2);
                        }
                    }
                    if (treeMap.size() == 1) {
                        TimestampedMessageParser timestampedMessageParser3 = (TimestampedMessageParser) treeMap.values().iterator().next();
                        writer.write(timestampedMessageParser3.getLastMessage());
                        timestampedMessageParser3.processRemaining(writer, this.bufferLen, i2 + timestampedMessageParser3.getLastMessage().length());
                    }
                }
            } finally {
                Iterator it2 = arrayList2.iterator();
                while (it2.hasNext()) {
                    ((TimestampedMessageParser) it2.next()).closeReader();
                }
                writer.flush();
            }
        } catch (Exception e2) {
            throw new IOException("Issue communicating with ZooKeeper: " + e2.getMessage(), e2);
        }
    }
}
