package org.apache.flume.sink.kafka.v09;

import java.io.UnsupportedEncodingException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Properties;
import kafka.message.MessageAndMetadata;
import org.apache.flume.Context;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.Sink;
import org.apache.flume.Transaction;
import org.apache.flume.channel.MemoryChannel;
import org.apache.flume.conf.Configurables;
import org.apache.flume.event.EventBuilder;
import org.apache.flume.sink.kafka.v09.util.TestUtil;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;

/* loaded from: input_file:org/apache/flume/sink/kafka/v09/TestKafkaSink.class */
public class TestKafkaSink {
    private static TestUtil testUtil = TestUtil.getInstance();

    @BeforeClass
    public static void setup() {
        testUtil.prepare();
        ArrayList arrayList = new ArrayList(3);
        arrayList.add("default-flume-topic");
        arrayList.add(TestConstants.STATIC_TOPIC);
        arrayList.add(TestConstants.CUSTOM_TOPIC);
        testUtil.initTopicList(arrayList);
    }

    @AfterClass
    public static void tearDown() {
        testUtil.tearDown();
    }

    @Test
    public void testKafkaProperties() {
        KafkaSink kafkaSink = new KafkaSink();
        Context context = new Context();
        context.put("kafka.kafka.topic", "");
        context.put("kafka.producer.value.serializer", "override.default.serializer");
        context.put("kafka.producer.fake.property", "kafka.property.value");
        context.put("kafka.bootstrap.servers", "localhost:9092,localhost:9092");
        context.put("brokerList", "real-broker-list");
        Configurables.configure(kafkaSink, context);
        Properties kafkaProps = kafkaSink.getKafkaProps();
        Assert.assertEquals(kafkaProps.getProperty("key.serializer"), "org.apache.kafka.common.serialization.StringSerializer");
        Assert.assertEquals(kafkaProps.getProperty("value.serializer"), "override.default.serializer");
        Assert.assertEquals(kafkaProps.getProperty("fake.property"), "kafka.property.value");
        Assert.assertEquals(kafkaProps.getProperty("bootstrap.servers"), "localhost:9092,localhost:9092");
    }

    @Test
    public void testDefaultTopic() {
        KafkaSink kafkaSink = new KafkaSink();
        Context prepareDefaultContext = prepareDefaultContext();
        Configurables.configure(kafkaSink, prepareDefaultContext);
        MemoryChannel memoryChannel = new MemoryChannel();
        Configurables.configure(memoryChannel, prepareDefaultContext);
        kafkaSink.setChannel(memoryChannel);
        kafkaSink.start();
        Transaction transaction = memoryChannel.getTransaction();
        transaction.begin();
        memoryChannel.put(EventBuilder.withBody("default-topic-test".getBytes()));
        transaction.commit();
        transaction.close();
        try {
            if (kafkaSink.process() == Sink.Status.BACKOFF) {
                Assert.fail("Error Occurred");
            }
        } catch (EventDeliveryException e) {
        }
        Assert.assertEquals("default-topic-test", new String((byte[]) testUtil.getNextMessageFromConsumer("default-flume-topic").message()));
    }

    @Test
    public void testStaticTopic() {
        Context prepareDefaultContext = prepareDefaultContext();
        prepareDefaultContext.put("kafka.topic", TestConstants.STATIC_TOPIC);
        try {
            if (prepareAndSend(prepareDefaultContext, "static-topic-test") == Sink.Status.BACKOFF) {
                Assert.fail("Error Occurred");
            }
        } catch (EventDeliveryException e) {
        }
        Assert.assertEquals("static-topic-test", new String((byte[]) testUtil.getNextMessageFromConsumer(TestConstants.STATIC_TOPIC).message()));
    }

    @Test
    public void testTopicAndKeyFromHeader() throws UnsupportedEncodingException {
        KafkaSink kafkaSink = new KafkaSink();
        Context prepareDefaultContext = prepareDefaultContext();
        Configurables.configure(kafkaSink, prepareDefaultContext);
        MemoryChannel memoryChannel = new MemoryChannel();
        Configurables.configure(memoryChannel, prepareDefaultContext);
        kafkaSink.setChannel(memoryChannel);
        kafkaSink.start();
        HashMap hashMap = new HashMap();
        hashMap.put("topic", TestConstants.CUSTOM_TOPIC);
        hashMap.put("key", TestConstants.CUSTOM_KEY);
        Transaction transaction = memoryChannel.getTransaction();
        transaction.begin();
        memoryChannel.put(EventBuilder.withBody("test-topic-and-key-from-header".getBytes(), hashMap));
        transaction.commit();
        transaction.close();
        try {
            if (kafkaSink.process() == Sink.Status.BACKOFF) {
                Assert.fail("Error Occurred");
            }
        } catch (EventDeliveryException e) {
        }
        MessageAndMetadata nextMessageFromConsumer = testUtil.getNextMessageFromConsumer(TestConstants.CUSTOM_TOPIC);
        Assert.assertEquals("test-topic-and-key-from-header", new String((byte[]) nextMessageFromConsumer.message(), "UTF-8"));
        Assert.assertEquals(TestConstants.CUSTOM_KEY, new String((byte[]) nextMessageFromConsumer.key(), "UTF-8"));
    }

    @Test
    public void testEmptyChannel() throws UnsupportedEncodingException, EventDeliveryException {
        KafkaSink kafkaSink = new KafkaSink();
        Context prepareDefaultContext = prepareDefaultContext();
        Configurables.configure(kafkaSink, prepareDefaultContext);
        MemoryChannel memoryChannel = new MemoryChannel();
        Configurables.configure(memoryChannel, prepareDefaultContext);
        kafkaSink.setChannel(memoryChannel);
        kafkaSink.start();
        if (kafkaSink.process() != Sink.Status.BACKOFF) {
            Assert.fail("Error Occurred");
        }
        Assert.assertNull(testUtil.getNextMessageFromConsumer("default-flume-topic"));
    }

    private Context prepareDefaultContext() {
        Context context = new Context();
        context.put("kafka.bootstrap.servers", testUtil.getKafkaServerUrl());
        context.put("kafka.flumeBatchSize", "1");
        return context;
    }

    private Sink.Status prepareAndSend(Context context, String str) throws EventDeliveryException {
        KafkaSink kafkaSink = new KafkaSink();
        Configurables.configure(kafkaSink, context);
        MemoryChannel memoryChannel = new MemoryChannel();
        Configurables.configure(memoryChannel, context);
        kafkaSink.setChannel(memoryChannel);
        kafkaSink.start();
        Transaction transaction = memoryChannel.getTransaction();
        transaction.begin();
        memoryChannel.put(EventBuilder.withBody(str.getBytes()));
        transaction.commit();
        transaction.close();
        return kafkaSink.process();
    }
}
