package org.apache.hadoop.hdfs.qjournal.server;

import com.google.common.base.Charsets;
import com.google.common.primitives.Bytes;
import com.google.common.primitives.Ints;
import java.io.File;
import java.net.HttpURLConnection;
import java.net.URL;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.qjournal.QJMTestUtil;
import org.apache.hadoop.hdfs.qjournal.client.IPCLoggerChannel;
import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos;
import org.apache.hadoop.hdfs.server.namenode.NameNodeLayoutVersion;
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
import org.apache.hadoop.metrics2.MetricsRecordBuilder;
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.test.MetricsAsserts;
import org.apache.hadoop.test.PathUtils;
import org.apache.hadoop.util.Shell;
import org.apache.hadoop.util.StopWatch;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

/* JADX WARN: Classes with same name are omitted:
  input_file:hadoop-hdfs-2.7.0-mapr-1707-beta/share/hadoop/hdfs/hadoop-hdfs-2.7.0-mapr-1707-beta-tests.jar:org/apache/hadoop/hdfs/qjournal/server/TestJournalNode.class
  input_file:test-classes/org/apache/hadoop/hdfs/qjournal/server/TestJournalNode.class
 */
/* loaded from: input_file:hadoop-hdfs-2.7.0-mapr-1707-beta-tests.jar:org/apache/hadoop/hdfs/qjournal/server/TestJournalNode.class */
/**
 * Tests that start a single {@link JournalNode} in-process and drive it through
 * an {@link IPCLoggerChannel} and through its HTTP server.
 *
 * <p>Every test gets a freshly started node with a freshly formatted journal
 * (see {@link #setup()}), and the node is stopped again in {@link #teardown()}.
 */
public class TestJournalNode {
    private static final NamespaceInfo FAKE_NSINFO = new NamespaceInfo(12345, "mycluster", "my-bp", 0);
    private static final File TEST_BUILD_DATA = PathUtils.getTestDir(TestJournalNode.class);

    static {
        // NOTE(review): presumably needed so that several daemons can register their
        // metrics inside one test JVM without clashing - confirm against DefaultMetricsSystem.
        DefaultMetricsSystem.setMiniClusterMode(true);
    }

    private JournalNode jn;
    private Journal journal;
    private final Configuration conf = new Configuration();
    private IPCLoggerChannel ch;
    private String journalId;

    /**
     * Starts a JournalNode on an ephemeral RPC port with an empty edits directory,
     * formats a journal with a unique id and opens a logger channel to it.
     *
     * @throws Exception if the node cannot be started or the journal cannot be formatted
     */
    @Before
    public void setup() throws Exception {
        File editsDir = new File(MiniDFSCluster.getBaseDirectory() + File.separator + "TestJournalNode");
        // Start from a clean directory so state from earlier runs cannot leak in.
        FileUtil.fullyDelete(editsDir);
        conf.set(DFSConfigKeys.DFS_JOURNALNODE_EDITS_DIR_KEY, editsDir.getAbsolutePath());
        conf.set(DFSConfigKeys.DFS_JOURNALNODE_RPC_ADDRESS_KEY, "0.0.0.0:0");

        jn = new JournalNode();
        jn.setConf(conf);
        jn.start();

        journalId = "test-journalid-" + GenericTestUtils.uniqueSequenceId();
        journal = jn.getOrCreateJournal(journalId);
        journal.format(FAKE_NSINFO);
        ch = new IPCLoggerChannel(conf, FAKE_NSINFO, journalId, jn.getBoundIpcAddress());
    }

    /**
     * Stops the JournalNode started by {@link #setup()}.
     *
     * @throws Exception if stopping the node fails
     */
    @After
    public void teardown() throws Exception {
        // setup() may have failed before the node was created; do not hide that
        // failure behind a NullPointerException here.
        if (jn != null) {
            jn.stop(0);
        }
    }

    /**
     * Writes two batches and checks the journal's write/lag metrics after each one.
     */
    @Test(timeout = 100000)
    public void testJournal() throws Exception {
        MetricsRecordBuilder metrics = MetricsAsserts.getMetrics(journal.getMetricsForTests().getName());
        MetricsAsserts.assertCounter("BatchesWritten", 0L, metrics);
        MetricsAsserts.assertCounter("BatchesWrittenWhileLagging", 0L, metrics);
        MetricsAsserts.assertGauge("CurrentLagTxns", 0L, metrics);

        IPCLoggerChannel channel = new IPCLoggerChannel(conf, FAKE_NSINFO, journalId, jn.getBoundIpcAddress());
        channel.newEpoch(1L).get();
        channel.setEpoch(1L);
        channel.startLogSegment(1L, NameNodeLayoutVersion.CURRENT_LAYOUT_VERSION).get();
        channel.sendEdits(1L, 1L, 1, "hello".getBytes(Charsets.UTF_8)).get();

        metrics = MetricsAsserts.getMetrics(journal.getMetricsForTests().getName());
        MetricsAsserts.assertCounter("BatchesWritten", 1L, metrics);
        MetricsAsserts.assertCounter("BatchesWrittenWhileLagging", 0L, metrics);
        MetricsAsserts.assertGauge("CurrentLagTxns", 0L, metrics);

        // Report a committed txid far ahead of what is written next (txid 2); the
        // assertions below expect that batch to be counted as lagging by 98 txns.
        channel.setCommittedTxId(100L);
        channel.sendEdits(1L, 2L, 1, "goodbye".getBytes(Charsets.UTF_8)).get();

        metrics = MetricsAsserts.getMetrics(journal.getMetricsForTests().getName());
        MetricsAsserts.assertCounter("BatchesWritten", 2L, metrics);
        MetricsAsserts.assertCounter("BatchesWrittenWhileLagging", 1L, metrics);
        MetricsAsserts.assertGauge("CurrentLagTxns", 98L, metrics);
    }

    /**
     * Checks the last-segment txid returned by {@code newEpoch} while a segment is
     * in progress, after it is finalized, and after a new empty segment is started.
     */
    @Test(timeout = 100000)
    public void testReturnsSegmentInfoAtEpochTransition() throws Exception {
        ch.newEpoch(1L).get();
        ch.setEpoch(1L);
        ch.startLogSegment(1L, NameNodeLayoutVersion.CURRENT_LAYOUT_VERSION).get();
        ch.sendEdits(1L, 1L, 2, QJMTestUtil.createTxnData(1, 2)).get();

        // Segment 1 is still in progress.
        QJournalProtocolProtos.NewEpochResponseProto response = ch.newEpoch(2L).get();
        ch.setEpoch(2L);
        Assert.assertEquals(1L, response.getLastSegmentTxId());

        // Segment 1 has been finalized.
        ch.finalizeLogSegment(1L, 2L).get();
        response = ch.newEpoch(3L).get();
        ch.setEpoch(3L);
        Assert.assertEquals(1L, response.getLastSegmentTxId());

        // A segment starting at txid 3 was started but nothing was written to it;
        // the expected last-segment txid is still 1.
        ch.startLogSegment(3L, NameNodeLayoutVersion.CURRENT_LAYOUT_VERSION).get();
        response = ch.newEpoch(4L).get();
        ch.setEpoch(4L);
        Assert.assertEquals(1L, response.getLastSegmentTxId());
    }

    /**
     * Checks the JournalNode HTTP server: the JMX page, download of a finalized
     * segment, and a 404 response for a segment that does not exist.
     */
    @Test(timeout = 100000)
    public void testHttpServer() throws Exception {
        String urlRoot = jn.getHttpServerURI();

        String jmxPage = DFSTestUtil.urlGet(new URL(urlRoot + "/jmx"));
        Assert.assertTrue("Bad contents: " + jmxPage, jmxPage.contains("Hadoop:service=JournalNode,name=JvmMetrics"));

        // Write and finalize a three-transaction segment starting at txid 1.
        byte[] editsData = QJMTestUtil.createTxnData(1, 3);
        IPCLoggerChannel channel = new IPCLoggerChannel(conf, FAKE_NSINFO, journalId, jn.getBoundIpcAddress());
        channel.newEpoch(1L).get();
        channel.setEpoch(1L);
        channel.startLogSegment(1L, NameNodeLayoutVersion.CURRENT_LAYOUT_VERSION).get();
        channel.sendEdits(1L, 1L, 3, editsData).get();
        channel.finalizeLogSegment(1L, 3L).get();

        // Expected download: the layout version as four bytes, four zero bytes
        // (NOTE(review): presumably the layout-flags word - confirm), then the edits.
        byte[] expected = Bytes.concat(
                Ints.toByteArray(HdfsConstants.NAMENODE_LAYOUT_VERSION),
                new byte[]{0, 0, 0, 0},
                editsData);
        byte[] retrieved = DFSTestUtil.urlGetBytes(new URL(urlRoot + "/getJournal?segmentTxId=1&jid=" + journalId));
        Assert.assertArrayEquals(expected, retrieved);

        // A segment that was never written must be reported as not found.
        URL missingSegment = new URL(urlRoot + "/getJournal?segmentTxId=12345&jid=" + journalId);
        HttpURLConnection connection = (HttpURLConnection) missingSegment.openConnection();
        try {
            Assert.assertEquals(404L, connection.getResponseCode());
        } finally {
            connection.disconnect();
        }
    }

    /**
     * Exercises prepareRecovery/acceptRecovery: rejection without an epoch, the
     * responses before and after a segment exists, visibility of an accepted value
     * from a newer epoch, and rejection of requests made with an older epoch.
     */
    @Test(timeout = 100000)
    public void testAcceptRecoveryBehavior() throws Exception {
        // No epoch has been established yet, so the request must be rejected.
        try {
            ch.prepareRecovery(1L).get();
            Assert.fail("Did not throw IllegalState when trying to run paxos without an epoch");
        } catch (ExecutionException e) {
            GenericTestUtils.assertExceptionContains("bad epoch", e);
        }

        ch.newEpoch(1L).get();
        ch.setEpoch(1L);

        // Nothing has been written yet: no accepted value and no segment state.
        QJournalProtocolProtos.PrepareRecoveryResponseProto prep = ch.prepareRecovery(1L).get();
        System.err.println("Prep: " + prep);
        Assert.assertFalse(prep.hasAcceptedInEpoch());
        Assert.assertFalse(prep.hasSegmentState());

        // With a segment on disk the response carries its state, but still no accepted value.
        ch.startLogSegment(1L, NameNodeLayoutVersion.CURRENT_LAYOUT_VERSION).get();
        ch.sendEdits(1L, 1L, 1, QJMTestUtil.createTxnData(1, 1)).get();
        prep = ch.prepareRecovery(1L).get();
        System.err.println("Prep: " + prep);
        Assert.assertFalse(prep.hasAcceptedInEpoch());
        Assert.assertTrue(prep.hasSegmentState());

        ch.acceptRecovery(prep.getSegmentState(), new URL("file:///dev/null")).get();

        // Wait for the epoch bump to complete so that a failure surfaces here
        // instead of being silently dropped with the unobserved future.
        ch.newEpoch(2L).get();
        ch.setEpoch(2L);

        // A prepare from the newer epoch must see the value accepted in epoch 1.
        prep = ch.prepareRecovery(1L).get();
        Assert.assertEquals(1L, prep.getAcceptedInEpoch());
        Assert.assertEquals(1L, prep.getSegmentState().getEndTxId());

        // Pretend to be a writer still on epoch 1: both calls must now be rejected.
        ch.setEpoch(1L);
        try {
            ch.prepareRecovery(1L).get();
            Assert.fail("prepare from earlier epoch not rejected");
        } catch (ExecutionException e) {
            GenericTestUtils.assertExceptionContains("epoch 1 is less than the last promised epoch 2", e);
        }
        try {
            ch.acceptRecovery(prep.getSegmentState(), new URL("file:///dev/null")).get();
            Assert.fail("accept from earlier epoch not rejected");
        } catch (ExecutionException e) {
            GenericTestUtils.assertExceptionContains("epoch 1 is less than the last promised epoch 2", e);
        }
    }

    /**
     * Checks that the JournalNode refuses to start when the edits directory is a
     * relative path, an existing regular file, or a path that cannot be created.
     */
    @Test(timeout = 100000)
    public void testFailToStartWithBadConfig() throws Exception {
        Configuration badConf = new Configuration();
        badConf.set(DFSConfigKeys.DFS_JOURNALNODE_EDITS_DIR_KEY, "non-absolute-path");
        badConf.set(DFSConfigKeys.DFS_JOURNALNODE_HTTP_ADDRESS_KEY, "0.0.0.0:0");
        assertJNFailsToStart(badConf, "should be an absolute path");

        // Point the edits dir at a regular file; the file is removed again even
        // when the assertion fails.
        File existingFile = new File(TEST_BUILD_DATA, "testjournalnodefile");
        Assert.assertTrue(existingFile.createNewFile());
        try {
            badConf.set(DFSConfigKeys.DFS_JOURNALNODE_EDITS_DIR_KEY, existingFile.getAbsolutePath());
            assertJNFailsToStart(badConf, "Not a directory");
        } finally {
            existingFile.delete();
        }

        badConf.set(DFSConfigKeys.DFS_JOURNALNODE_EDITS_DIR_KEY, Shell.WINDOWS ? "\\\\cannotBeCreated" : "/proc/does-not-exist");
        assertJNFailsToStart(badConf, "Cannot create directory");
    }

    /**
     * Asserts that a JournalNode configured with {@code configuration} fails to
     * start and that the failure message contains {@code str}.
     *
     * <p>If the node starts anyway it is stopped again and the test fails; without
     * this, a successful start would let the calling test pass vacuously and leave
     * a node running.
     *
     * @param configuration configuration expected to be rejected
     * @param str text the thrown exception must contain
     */
    private static void assertJNFailsToStart(Configuration configuration, String str) {
        JournalNode journalNode = null;
        try {
            journalNode = new JournalNode();
            journalNode.setConf(configuration);
            journalNode.start();
        } catch (Exception e) {
            GenericTestUtils.assertExceptionContains(str, e);
            return;
        }
        try {
            journalNode.stop(0);
        } catch (Exception ignored) {
            // Best-effort cleanup only; the failure reported below is the one that matters.
        }
        Assert.fail("JournalNode started, but expected a startup failure containing: " + str);
    }

    /**
     * Runs a small write benchmark against the JournalNode and prints the timings
     * to stderr. Nothing is asserted beyond every write succeeding.
     */
    @Test(timeout = 100000)
    public void testPerformance() throws Exception {
        doPerfTest(8192, 1024);
    }

    /**
     * Sends {@code numIters} single-transaction batches of {@code editsSize}
     * zero-filled bytes, waiting for each one, and prints latency and throughput.
     *
     * @param editsSize size of each batch in bytes
     * @param numIters number of batches to send
     * @throws Exception if any RPC fails
     */
    private void doPerfTest(int editsSize, int numIters) throws Exception {
        byte[] data = new byte[editsSize];
        ch.newEpoch(1L).get();
        ch.setEpoch(1L);
        ch.startLogSegment(1L, NameNodeLayoutVersion.CURRENT_LAYOUT_VERSION).get();

        StopWatch stopWatch = new StopWatch().start();
        // Txids start at 1; the bound is inclusive so that exactly numIters
        // batches are sent, matching what is reported below.
        for (int txid = 1; txid <= numIters; txid++) {
            ch.sendEdits(1L, txid, 1, data).get();
        }
        long elapsedMs = stopWatch.now(TimeUnit.MILLISECONDS);

        System.err.println("Wrote " + numIters + " batches of " + editsSize + " bytes in " + elapsedMs + "ms");
        System.err.println("Time per batch: " + (((float) elapsedMs) / numIters) + "ms");
        // Multiply in long: 1024 * 8192 * 1000 does not fit in an int. Use at least
        // 1 ms as divisor so that a very fast run cannot divide by zero.
        long bytesPerSec = ((long) numIters * editsSize * 1000L) / Math.max(elapsedMs, 1L);
        System.err.println("Throughput: " + bytesPerSec + " bytes/sec");
    }
}
