/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flume.sink.kafka;

import java.io.UnsupportedEncodingException;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.HashMap;
import kafka.message.MessageAndMetadata;
import org.apache.flume.Channel;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.Sink;
import org.apache.flume.Transaction;
import org.apache.flume.channel.MemoryChannel;
import org.apache.flume.conf.Configurables;
import org.apache.flume.event.EventBuilder;
import org.apache.flume.sink.kafka.KafkaSink;
import org.apache.flume.sink.kafka.util.TestUtil;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;

/**
 * Integration tests for {@link KafkaSink}.
 *
 * <p>Each test wires a fresh {@link KafkaSink} to a fresh {@link MemoryChannel},
 * optionally enqueues one event, runs a single {@code process()} cycle and then
 * checks what the shared {@link TestUtil} consumer receives on the expected topic.
 * The sink is always stopped again so that no test leaks a running sink.
 */
public class TestKafkaSink {
    /** Charset used for every byte/String conversion so results do not depend on the platform default. */
    private static final Charset UTF_8 = Charset.forName("UTF-8");

    /** Topic on which {@link #testDefaultTopic()} expects events when the context names no topic. */
    private static final String DEFAULT_TOPIC = "default-flume-topic";
    /** Topic configured through the sink context in {@link #testStaticTopic()}. */
    private static final String STATIC_TOPIC = "static-topic";
    /** Topic supplied through the event headers in {@link #testTopicAndKeyFromHeader()}. */
    private static final String CUSTOM_TOPIC = "custom-topic";

    /** Failure message used when the sink reports an unexpected status. */
    private static final String UNEXPECTED_STATUS = "Error Occurred";

    private static TestUtil testUtil = TestUtil.getInstance();

    /**
     * Prepares the shared test utility and registers the topics used by the tests.
     * NOTE(review): presumably {@code prepare()} brings up the embedded broker and
     * consumer; confirm against {@code TestUtil}.
     */
    @BeforeClass
    public static void setup() {
        testUtil.prepare();
        ArrayList<String> topics = new ArrayList<String>(3);
        topics.add(DEFAULT_TOPIC);
        topics.add(STATIC_TOPIC);
        topics.add(CUSTOM_TOPIC);
        testUtil.initTopicList(topics);
    }

    /** Releases whatever {@link #setup()} acquired through the shared test utility. */
    @AfterClass
    public static void tearDown() {
        testUtil.tearDown();
    }

    /** An event sent with the default context must arrive on the default topic. */
    @Test
    public void testDefaultTopic() {
        String msg = "default-topic-test";
        this.sendExpectingDelivery(this.prepareDefaultContext(), EventBuilder.withBody(msg.getBytes(UTF_8)));
        Assert.assertEquals(msg, TestKafkaSink.decode(TestKafkaSink.fetchMessage(DEFAULT_TOPIC).message()));
    }

    /** A topic configured in the sink context must be used as the destination. */
    @Test
    public void testStaticTopic() {
        Context context = this.prepareDefaultContext();
        context.put("topic", STATIC_TOPIC);
        String msg = "static-topic-test";
        this.sendExpectingDelivery(context, EventBuilder.withBody(msg.getBytes(UTF_8)));
        Assert.assertEquals(msg, TestKafkaSink.decode(TestKafkaSink.fetchMessage(STATIC_TOPIC).message()));
    }

    /**
     * The {@code topic} and {@code key} event headers must determine the destination
     * topic and the message key.
     *
     * @throws UnsupportedEncodingException no longer thrown by the body; the clause is
     *         kept so the method signature stays unchanged
     */
    @Test
    public void testTopicAndKeyFromHeader() throws UnsupportedEncodingException {
        String msg = "test-topic-and-key-from-header";
        HashMap<String, String> headers = new HashMap<String, String>();
        headers.put("topic", CUSTOM_TOPIC);
        headers.put("key", "custom-key");
        this.sendExpectingDelivery(this.prepareDefaultContext(), EventBuilder.withBody(msg.getBytes(UTF_8), headers));
        MessageAndMetadata fetchedMsg = TestKafkaSink.fetchMessage(CUSTOM_TOPIC);
        Assert.assertEquals(msg, TestKafkaSink.decode(fetchedMsg.message()));
        Assert.assertEquals("custom-key", TestKafkaSink.decode(fetchedMsg.key()));
    }

    /**
     * Processing an empty channel must report {@code BACKOFF} and publish nothing.
     *
     * @throws UnsupportedEncodingException never thrown by the body; kept for signature stability
     * @throws EventDeliveryException if the sink fails while processing
     */
    @Test
    public void testEmptyChannel() throws UnsupportedEncodingException, EventDeliveryException {
        Sink.Status status = this.prepareAndSend(this.prepareDefaultContext(), null);
        if (status != Sink.Status.BACKOFF) {
            Assert.fail(UNEXPECTED_STATUS);
        }
        Assert.assertNull(testUtil.getNextMessageFromConsumer(DEFAULT_TOPIC));
    }

    /**
     * Builds the context shared by all tests: broker list from the test utility,
     * one required ack, a synchronous producer and a batch size of one.
     *
     * @return a new, independently modifiable context
     */
    private Context prepareDefaultContext() {
        Context context = new Context();
        context.put("brokerList", testUtil.getKafkaServerUrl());
        context.put("kafka.request.required.acks", "1");
        context.put("kafka.producer.type", "sync");
        context.put("batchSize", "1");
        return context;
    }

    /**
     * Sends {@code event} through a fresh sink and fails the test unless it was delivered.
     * A delivery exception is reported as a test failure with the exception as its cause
     * instead of being silently ignored.
     *
     * @param context configuration applied to both sink and channel
     * @param event   event to deliver
     */
    private void sendExpectingDelivery(Context context, Event event) {
        Sink.Status status;
        try {
            status = this.prepareAndSend(context, event);
        }
        catch (EventDeliveryException ex) {
            // initCause keeps the stack trace of the delivery failure visible in the report.
            AssertionError failure = new AssertionError("Kafka sink failed to deliver the event: " + ex);
            failure.initCause(ex);
            throw failure;
        }
        if (status == Sink.Status.BACKOFF) {
            Assert.fail(UNEXPECTED_STATUS);
        }
    }

    /**
     * Configures and starts a fresh sink on a fresh memory channel, optionally enqueues
     * one event, runs a single {@code process()} cycle and stops the sink again.
     *
     * @param context configuration applied to both sink and channel
     * @param event   event to enqueue before processing, or {@code null} to leave the channel empty
     * @return the status reported by the sink
     * @throws EventDeliveryException if the sink fails to deliver
     */
    private Sink.Status prepareAndSend(Context context, Event event) throws EventDeliveryException {
        KafkaSink kafkaSink = new KafkaSink();
        Configurables.configure(kafkaSink, context);
        MemoryChannel memoryChannel = new MemoryChannel();
        Configurables.configure(memoryChannel, context);
        kafkaSink.setChannel((Channel)memoryChannel);
        kafkaSink.start();
        try {
            if (event != null) {
                TestKafkaSink.putEvent(memoryChannel, event);
            }
            return kafkaSink.process();
        }
        finally {
            // Always stop the sink so a failing test does not leave it running.
            kafkaSink.stop();
        }
    }

    /**
     * Puts one event on the channel inside its own transaction, rolling back on failure
     * and always closing the transaction.
     */
    private static void putEvent(Channel channel, Event event) {
        Transaction tx = channel.getTransaction();
        tx.begin();
        try {
            channel.put(event);
            tx.commit();
        }
        catch (RuntimeException ex) {
            // Roll back first so that close() is called on a completed transaction.
            tx.rollback();
            throw ex;
        }
        finally {
            tx.close();
        }
    }

    /**
     * Fetches the next message from {@code topic}, failing the test with a clear message
     * (rather than a NullPointerException) when the consumer returns nothing.
     */
    private static MessageAndMetadata fetchMessage(String topic) {
        MessageAndMetadata fetched = testUtil.getNextMessageFromConsumer(topic);
        Assert.assertNotNull("No message received on topic " + topic, fetched);
        return fetched;
    }

    /** Decodes a raw Kafka payload or key, expected to be a {@code byte[]}, as UTF-8 text. */
    private static String decode(Object rawBytes) {
        return new String((byte[])rawBytes, UTF_8);
    }
}

