/*
 * Decompiled with CFR 0.152.
 */
package org.apache.sqoop.test.testcases;

import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import kafka.message.MessageAndMetadata;
import org.apache.sqoop.common.test.kafka.TestUtil;
import org.apache.sqoop.connector.common.SqoopIDFUtils;
import org.apache.sqoop.model.MJob;
import org.apache.sqoop.model.MLink;
import org.apache.sqoop.model.MLinkConfig;
import org.apache.sqoop.model.MToConfig;
import org.apache.sqoop.test.testcases.ConnectorTestCase;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;

/**
 * Base class for connector tests that write to Kafka.
 *
 * <p>Calls {@code TestUtil.prepare()} once before the tests of the class and
 * {@code TestUtil.tearDown()} once after them, and offers helpers that fill
 * the Kafka link/job configuration and verify what was written to the test
 * topic.
 */
public class KafkaConnectorTestCase
extends ConnectorTestCase {
    /** Shared test helper; all Kafka/ZooKeeper access in this class goes through it. */
    private static final TestUtil testUtil = TestUtil.getInstance();

    /** Name of the single topic used by the helpers in this class. */
    private static final String TOPIC = "mytopic";

    /**
     * Prepares the shared {@link TestUtil} before any test of the class runs.
     * NOTE(review): presumably this boots the local Kafka/ZooKeeper test
     * services - confirm against {@code TestUtil.prepare()}.
     *
     * @throws IOException if {@code TestUtil.prepare()} fails
     */
    @BeforeClass(alwaysRun=true)
    public static void startKafka() throws IOException {
        testUtil.prepare();
    }

    /**
     * Tears down the shared {@link TestUtil} after all tests of the class ran.
     *
     * @throws IOException if {@code TestUtil.tearDown()} fails
     */
    @AfterClass(alwaysRun=true)
    public static void stopKafka() throws IOException {
        testUtil.tearDown();
    }

    /**
     * Fills the Kafka link configuration with the broker and ZooKeeper
     * addresses reported by the shared {@link TestUtil}.
     *
     * @param link link whose connector link config is populated in place
     */
    protected void fillKafkaLinkConfig(MLink link) {
        MLinkConfig configs = link.getConnectorLinkConfig();
        configs.getStringInput("linkConfig.brokerList").setValue(testUtil.getKafkaServerUrl());
        configs.getStringInput("linkConfig.zookeeperConnect").setValue(testUtil.getZkUrl());
    }

    /**
     * Points the job's TO configuration at the test topic and hands that
     * topic to {@code TestUtil.initTopicList}.
     *
     * @param job job whose TO config is populated in place
     */
    protected void fillKafkaToConfig(MJob job) {
        MToConfig toConfig = job.getToJobConfig();
        toConfig.getStringInput("toJobConfig.topic").setValue(TOPIC);

        ArrayList<String> topics = new ArrayList<String>(1);
        topics.add(TOPIC);
        testUtil.initTopicList(topics);
    }

    /**
     * Reads one message from the test topic per entry of {@code content} and
     * asserts that the set of received texts equals the set of expected ones.
     * Because both sides are compared as sets, ordering and duplicates are
     * ignored.
     *
     * @param content the expected message texts
     * @throws UnsupportedEncodingException if UTF-8 is not supported by the JVM
     */
    protected void validateContent(String[] content) throws UnsupportedEncodingException {
        HashSet<String> expected = new HashSet<String>(Arrays.asList(content));
        HashSet<String> actual = new HashSet<String>();

        for (int i = 0; i < content.length; i++) {
            MessageAndMetadata<?, ?> fetchedMsg = testUtil.getNextMessageFromConsumer(TOPIC);
            // Fail with a readable message instead of a bare NullPointerException
            // when no message object is handed back.
            Assert.assertNotNull(fetchedMsg,
                    "No message available on topic " + TOPIC + " for expected entry #" + i);
            String payload = new String((byte[]) fetchedMsg.message(), "UTF-8");
            actual.add(SqoopIDFUtils.toText(payload));
        }

        // TestNG's assertEquals takes (actual, expected); keeping this order makes
        // the failure report label the two sets correctly.
        Assert.assertEquals(actual, expected);
    }
}

