/*
 * Decompiled with CFR 0.152.
 */
package org.apache.tez.dag.app.dag.impl;

import com.google.common.collect.Maps;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.DataInputByteBuffer;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.tez.dag.api.EdgeManagerPlugin;
import org.apache.tez.dag.api.EdgeManagerPluginContext;
import org.apache.tez.dag.api.EdgeManagerPluginDescriptor;
import org.apache.tez.dag.api.EdgeProperty;
import org.apache.tez.dag.api.InputDescriptor;
import org.apache.tez.dag.api.OutputDescriptor;
import org.apache.tez.dag.api.TezConfiguration;
import org.apache.tez.dag.api.TezException;
import org.apache.tez.dag.api.UserPayload;
import org.apache.tez.dag.app.dag.Task;
import org.apache.tez.dag.app.dag.Vertex;
import org.apache.tez.dag.app.dag.impl.AMUserCodeException;
import org.apache.tez.dag.app.dag.impl.Edge;
import org.apache.tez.dag.app.dag.impl.OneToOneEdgeManager;
import org.apache.tez.dag.app.dag.impl.OneToOneEdgeManagerOnDemand;
import org.apache.tez.dag.app.dag.impl.ScatterGatherEdgeManager;
import org.apache.tez.dag.records.TezDAGID;
import org.apache.tez.dag.records.TezTaskAttemptID;
import org.apache.tez.dag.records.TezTaskID;
import org.apache.tez.dag.records.TezVertexID;
import org.apache.tez.runtime.api.Event;
import org.apache.tez.runtime.api.events.CompositeDataMovementEvent;
import org.apache.tez.runtime.api.events.DataMovementEvent;
import org.apache.tez.runtime.api.events.InputReadErrorEvent;
import org.apache.tez.runtime.api.impl.EventMetaData;
import org.apache.tez.runtime.api.impl.GroupInputSpec;
import org.apache.tez.runtime.api.impl.TezEvent;
import org.apache.tez.test.EdgeManagerForTest;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.ArgumentCaptor;
import org.mockito.Mockito;
import org.mockito.verification.VerificationMode;

public class TestEdge {
    /**
     * Verifies that {@link OneToOneEdgeManager} refuses to route a data movement event while
     * the context reports different source and destination task counts, and routes it once
     * the counts agree.
     *
     * <p>The source vertex is stubbed with 3 tasks. With 4 destination tasks the routing call
     * must throw {@link IllegalStateException}; with 3 destination tasks the call made with
     * source task index 1 must leave exactly one entry in the routing map, keyed by 1, whose
     * first list element is 0.
     */
    @Test(timeout=5000L)
    public void testOneToOneEdgeManager() {
        EdgeManagerPluginContext mockContext = Mockito.mock(EdgeManagerPluginContext.class);
        Mockito.when(mockContext.getSourceVertexName()).thenReturn("Source");
        Mockito.when(mockContext.getDestinationVertexName()).thenReturn("Destination");
        Mockito.when(mockContext.getSourceVertexNumTasks()).thenReturn(3);
        OneToOneEdgeManager manager = new OneToOneEdgeManager(mockContext);
        manager.initialize();
        // Typed map: a raw HashMap would make the entry accessors below return Object.
        Map<Integer, List<Integer>> destinationTaskAndInputIndices = Maps.newHashMap();
        DataMovementEvent event = DataMovementEvent.create(1, null);

        // Mismatching task counts (3 source vs. 4 destination) must be rejected.
        Mockito.when(mockContext.getDestinationVertexNumTasks()).thenReturn(4);
        try {
            manager.routeDataMovementEventToDestination(event, 1, 1, destinationTaskAndInputIndices);
            Assert.fail("Expected IllegalStateException for mismatching source/destination task counts");
        }
        catch (IllegalStateException e) {
            Assert.assertTrue("Unexpected message: " + e.getMessage(),
                    e.getMessage().contains("1-1 source and destination task counts must match"));
        }

        // Matching task counts: the event is routed.
        Mockito.when(mockContext.getDestinationVertexNumTasks()).thenReturn(3);
        manager.routeDataMovementEventToDestination(event, 1, 1, destinationTaskAndInputIndices);
        Assert.assertEquals(1, destinationTaskAndInputIndices.size());
        Map.Entry<Integer, List<Integer>> routed = destinationTaskAndInputIndices.entrySet().iterator().next();
        Assert.assertEquals(1, routed.getKey().intValue());
        Assert.assertEquals(0, routed.getValue().get(0).intValue());
    }

    /**
     * Verifies that {@link OneToOneEdgeManagerOnDemand} refuses to route a data movement event
     * while the context reports different source and destination task counts, and routes it
     * once the counts agree.
     *
     * <p>The source vertex is stubbed with 3 tasks. With 4 destination tasks the routing call
     * must throw {@link IllegalStateException}; with 3 destination tasks the call made with
     * source task index 1 must leave exactly one entry in the routing map, keyed by 1, whose
     * first list element is 0.
     */
    @Test(timeout=5000L)
    public void testOneToOneEdgeManagerODR() {
        EdgeManagerPluginContext mockContext = Mockito.mock(EdgeManagerPluginContext.class);
        Mockito.when(mockContext.getSourceVertexName()).thenReturn("Source");
        Mockito.when(mockContext.getDestinationVertexName()).thenReturn("Destination");
        Mockito.when(mockContext.getSourceVertexNumTasks()).thenReturn(3);
        OneToOneEdgeManagerOnDemand manager = new OneToOneEdgeManagerOnDemand(mockContext);
        manager.initialize();
        // Typed map: a raw HashMap would make the entry accessors below return Object.
        Map<Integer, List<Integer>> destinationTaskAndInputIndices = Maps.newHashMap();
        DataMovementEvent event = DataMovementEvent.create(1, null);

        // Mismatching task counts (3 source vs. 4 destination) must be rejected.
        Mockito.when(mockContext.getDestinationVertexNumTasks()).thenReturn(4);
        try {
            manager.routeDataMovementEventToDestination(event, 1, 1, destinationTaskAndInputIndices);
            Assert.fail("Expected IllegalStateException for mismatching source/destination task counts");
        }
        catch (IllegalStateException e) {
            Assert.assertTrue("Unexpected message: " + e.getMessage(),
                    e.getMessage().contains("1-1 source and destination task counts must match"));
        }

        // Matching task counts: the event is routed.
        Mockito.when(mockContext.getDestinationVertexNumTasks()).thenReturn(3);
        manager.routeDataMovementEventToDestination(event, 1, 1, destinationTaskAndInputIndices);
        Assert.assertEquals(1, destinationTaskAndInputIndices.size());
        Map.Entry<Integer, List<Integer>> routed = destinationTaskAndInputIndices.entrySet().iterator().next();
        Assert.assertEquals(1, routed.getKey().intValue());
        Assert.assertEquals(0, routed.getValue().get(0).intValue());
    }

    /**
     * Checks how {@link ScatterGatherEdgeManager#getNumSourceTaskPhysicalOutputs(int)} reacts
     * to the destination task count reported by the plugin context: a count of -1 must be
     * rejected with an {@link IllegalArgumentException}, while a count of 0 must be accepted
     * (the call simply has to return without throwing).
     */
    @Test(timeout=5000L)
    public void testScatterGatherManager() {
        EdgeManagerPluginContext mockContext = Mockito.mock(EdgeManagerPluginContext.class);
        Mockito.when(mockContext.getSourceVertexName()).thenReturn("Source");
        Mockito.when(mockContext.getDestinationVertexName()).thenReturn("Destination");
        ScatterGatherEdgeManager manager = new ScatterGatherEdgeManager(mockContext);
        manager.initialize();

        Mockito.when(mockContext.getDestinationVertexNumTasks()).thenReturn(-1);
        try {
            manager.getNumSourceTaskPhysicalOutputs(0);
            Assert.fail("Expected IllegalArgumentException for destination task count -1");
        }
        catch (IllegalArgumentException e) {
            // The exception is expected, so its stack trace is not printed.
            // "ScatteGather" (sic) is the literal text this test matches against.
            Assert.assertTrue("Unexpected message: " + e.getMessage(),
                    e.getMessage().contains("ScatteGather edge manager must have destination vertex task parallelism specified"));
        }

        // A destination task count of 0 must not trigger the exception.
        Mockito.when(mockContext.getDestinationVertexNumTasks()).thenReturn(0);
        manager.getNumSourceTaskPhysicalOutputs(0);
    }

    /**
     * Sends events over a scatter-gather {@link Edge} from a source vertex with one task to a
     * destination vertex with five tasks and checks, via {@code verifyEvents}, what each
     * destination task receives.
     *
     * <p>Two rounds are exercised: first a single {@link CompositeDataMovementEvent} starting
     * at index 0 with a count equal to the number of destination tasks, then (after the task
     * mocks are reset) one {@link DataMovementEvent} per destination task, created with the
     * loop index as its first argument. Both rounds use version 2, which is also the id of
     * the source task attempt.
     */
    @Test(timeout=5000L)
    public void testCompositeEventHandling() throws TezException {
        EventHandler handler = Mockito.mock(EventHandler.class);
        EdgeProperty scatterGather = EdgeProperty.create(
                EdgeProperty.DataMovementType.SCATTER_GATHER,
                EdgeProperty.DataSourceType.PERSISTED,
                EdgeProperty.SchedulingType.SEQUENTIAL,
                Mockito.mock(OutputDescriptor.class),
                Mockito.mock(InputDescriptor.class));
        Edge edge = new Edge(scatterGather, handler, new TezConfiguration());

        // One source task feeding five destination tasks.
        TezVertexID srcVertexID = createVertexID(1);
        TezVertexID destVertexID = createVertexID(2);
        LinkedHashMap<TezTaskID, Task> srcTasks = mockTasks(srcVertexID, 1);
        LinkedHashMap<TezTaskID, Task> destTasks = mockTasks(destVertexID, 5);
        TezTaskID onlySourceTask = srcTasks.keySet().iterator().next();

        edge.setSourceVertex(mockVertex("src", srcVertexID, srcTasks));
        edge.setDestinationVertex(mockVertex("dest", destVertexID, destTasks));
        edge.initialize();

        TezTaskAttemptID srcTAID = createTAIDForTest(onlySourceTask, 2);
        EventMetaData srcMeta = new EventMetaData(
                EventMetaData.EventProducerConsumerType.OUTPUT, "consumerVertex", "producerVertex", srcTAID);

        // Round 1: a single composite event covering every destination task.
        CompositeDataMovementEvent composite =
                CompositeDataMovementEvent.create(0, destTasks.size(), ByteBuffer.wrap("bytes".getBytes()));
        composite.setVersion(2);
        edge.sendTezEventToDestinationTasks(new TezEvent(composite, srcMeta));
        verifyEvents(srcTAID, destTasks);

        // Round 2: the same traffic expressed as individual data movement events.
        resetTaskMocks(destTasks.values());
        int sourceIndex = 0;
        while (sourceIndex < destTasks.size()) {
            DataMovementEvent single = DataMovementEvent.create(sourceIndex, ByteBuffer.wrap("bytes".getBytes()));
            single.setVersion(2);
            edge.sendTezEventToDestinationTasks(new TezEvent(single, srcMeta));
            ++sourceIndex;
        }
        verifyEvents(srcTAID, destTasks);
    }

    /**
     * Asserts that every task in {@code destTasks} had {@code registerTezEvent} invoked exactly
     * once and that the registered event is a {@link DataMovementEvent} with the expected
     * contents.
     *
     * <p>For the task at position {@code i} in the map's iteration order the event must have
     * version equal to {@code srcTAID.getId()}, source index {@code i}, target index equal to
     * {@code srcTAID.getTaskID().getId()}, and a user payload whose remaining bytes equal
     * {@code "bytes".getBytes()}.
     *
     * @param srcTAID   source task attempt the events are expected to originate from
     * @param destTasks mocked destination tasks, in the order their source indices are expected
     */
    private void verifyEvents(TezTaskAttemptID srcTAID, LinkedHashMap<TezTaskID, Task> destTasks) {
        int expectedSourceIndex = 0;
        // Only the mocked tasks are needed; the task ids (map keys) are not inspected.
        for (Task mockTask : destTasks.values()) {
            ArgumentCaptor<TezEvent> captor = ArgumentCaptor.forClass(TezEvent.class);
            Mockito.verify(mockTask, Mockito.times(1)).registerTezEvent(captor.capture());
            DataMovementEvent dmEvent = (DataMovementEvent)captor.getValue().getEvent();
            Assert.assertEquals(srcTAID.getId(), dmEvent.getVersion());
            Assert.assertEquals(expectedSourceIndex++, dmEvent.getSourceIndex());
            Assert.assertEquals(srcTAID.getTaskID().getId(), dmEvent.getTargetIndex());
            // Read through a slice so the event's own buffer position is left untouched.
            ByteBuffer payload = dmEvent.getUserPayload().slice();
            byte[] actual = new byte[payload.remaining()];
            payload.get(actual);
            // assertArrayEquals reports the first differing index on failure.
            Assert.assertArrayEquals("bytes".getBytes(), actual);
        }
    }

    /**
     * Resets each mocked task and then re-installs its {@code getTaskID()} stub, so recorded
     * interactions are discarded while the task still reports the id it had before the reset.
     *
     * @param tasks mocked tasks to reset
     */
    private void resetTaskMocks(Collection<Task> tasks) {
        for (Task mock : tasks) {
            // Capture the id first: Mockito.reset() also removes the getTaskID() stubbing.
            TezTaskID rememberedId = mock.getTaskID();
            Mockito.reset(mock);
            Mockito.doReturn(rememberedId).when(mock).getTaskID();
        }
    }

    /**
     * Creates {@code numTasks} mocked tasks for the given vertex, with task indices
     * {@code 0 .. numTasks - 1}. Each mock is stubbed to return its own id from
     * {@code getTaskID()}.
     *
     * @param vertexID vertex the task ids are derived from
     * @param numTasks number of mocked tasks to create
     * @return the mocks keyed by task id, in ascending index (insertion) order
     */
    private LinkedHashMap<TezTaskID, Task> mockTasks(TezVertexID vertexID, int numTasks) {
        LinkedHashMap<TezTaskID, Task> mocksById = new LinkedHashMap<TezTaskID, Task>();
        int index = 0;
        while (index < numTasks) {
            Task stub = Mockito.mock(Task.class);
            TezTaskID id = TezTaskID.getInstance(vertexID, index);
            Mockito.doReturn(id).when(stub).getTaskID();
            mocksById.put(id, stub);
            index++;
        }
        return mocksById;
    }

    /**
     * Builds a mocked {@link Vertex} that reports the given name, id and tasks.
     *
     * <p>Stubbed calls: {@code getVertexId()}, {@code getName()}, {@code getTasks()},
     * {@code getTotalTasks()} (the size of {@code tasks} at the time of this call), and
     * {@code getTask(...)} for every task, both by {@link TezTaskID} and by the id's integer
     * value.
     *
     * @param name     value returned by {@code getName()}
     * @param vertexID value returned by {@code getVertexId()}
     * @param tasks    tasks the vertex should expose
     * @return the stubbed vertex mock
     */
    private Vertex mockVertex(String name, TezVertexID vertexID, LinkedHashMap<TezTaskID, Task> tasks) {
        Vertex stub = Mockito.mock(Vertex.class);
        Mockito.doReturn(vertexID).when(stub).getVertexId();
        Mockito.doReturn(name).when(stub).getName();
        Mockito.doReturn(tasks).when(stub).getTasks();
        Mockito.doReturn(tasks.size()).when(stub).getTotalTasks();
        for (Map.Entry<TezTaskID, Task> slot : tasks.entrySet()) {
            TezTaskID taskId = slot.getKey();
            Task task = slot.getValue();
            // Lookup by full task id ...
            Mockito.doReturn(task).when(stub).getTask(Mockito.eq(taskId));
            // ... and by the id's integer value.
            Mockito.doReturn(task).when(stub).getTask(Mockito.eq(taskId.getId()));
        }
        return stub;
    }

    /**
     * Creates a vertex id with the given numeric id under a fixed DAG id built from
     * {@code ("1000", 1, 1)}.
     *
     * @param id numeric vertex id
     * @return the vertex id
     */
    private TezVertexID createVertexID(int id) {
        return TezVertexID.getInstance(TezDAGID.getInstance("1000", 1, 1), id);
    }

    /**
     * Creates a task attempt id for the given task.
     *
     * @param taskID task the attempt belongs to
     * @param taId   numeric attempt id
     * @return the task attempt id
     */
    private TezTaskAttemptID createTAIDForTest(TezTaskID taskID, int taId) {
        return TezTaskAttemptID.getInstance(taskID, taId);
    }

    /**
     * Verifies that {@code Edge.getDestinationSpec(0)} throws an {@link AMUserCodeException}
     * when the custom edge manager reports a physical input count of -1, and that the
     * exception's cause message contains "PhysicalInputCount should not be negative".
     *
     * @throws Exception if building the edge fails unexpectedly
     */
    @Test(timeout=5000L)
    public void testInvalidPhysicalInputCount() throws Exception {
        // Only physicalInput is set to -1; the other three settings stay at 1.
        UserPayload payload = new CustomEdgeManagerWithInvalidReturnValue.EdgeManagerConfig(-1, 1, 1, 1).toUserPayload();
        EdgeManagerPluginDescriptor pluginDescriptor = EdgeManagerPluginDescriptor
                .create(CustomEdgeManagerWithInvalidReturnValue.class.getName())
                .setUserPayload(payload);
        EdgeProperty edgeProperty = EdgeProperty.create(
                pluginDescriptor,
                EdgeProperty.DataSourceType.PERSISTED,
                EdgeProperty.SchedulingType.SEQUENTIAL,
                OutputDescriptor.create(""),
                InputDescriptor.create(""));
        EventHandler mockEventHandler = Mockito.mock(EventHandler.class);
        Edge edge = new Edge(edgeProperty, mockEventHandler, new TezConfiguration());

        TezVertexID v1Id = createVertexID(1);
        TezVertexID v2Id = createVertexID(2);
        edge.setSourceVertex(mockVertex("v1", v1Id, new LinkedHashMap<TezTaskID, Task>()));
        edge.setDestinationVertex(mockVertex("v2", v2Id, new LinkedHashMap<TezTaskID, Task>()));
        edge.initialize();

        try {
            edge.getDestinationSpec(0);
            Assert.fail("Expected AMUserCodeException for a negative physical input count");
        }
        catch (AMUserCodeException e) {
            // Expected failure: assert on the cause instead of dumping the stack trace.
            Assert.assertTrue("Unexpected cause message: " + e.getCause().getMessage(),
                    e.getCause().getMessage().contains("PhysicalInputCount should not be negative"));
        }
    }

    /**
     * Verifies that {@code Edge.getSourceSpec(0)} throws an {@link AMUserCodeException} when
     * the custom edge manager reports a physical output count of -1, and that the exception's
     * cause message contains "PhysicalOutputCount should not be negative".
     *
     * @throws Exception if building the edge fails unexpectedly
     */
    @Test(timeout=5000L)
    public void testInvalidPhysicalOutputCount() throws Exception {
        // Only physicalOutput is set to -1; the other three settings stay at 1.
        UserPayload payload = new CustomEdgeManagerWithInvalidReturnValue.EdgeManagerConfig(1, -1, 1, 1).toUserPayload();
        EdgeManagerPluginDescriptor pluginDescriptor = EdgeManagerPluginDescriptor
                .create(CustomEdgeManagerWithInvalidReturnValue.class.getName())
                .setUserPayload(payload);
        EdgeProperty edgeProperty = EdgeProperty.create(
                pluginDescriptor,
                EdgeProperty.DataSourceType.PERSISTED,
                EdgeProperty.SchedulingType.SEQUENTIAL,
                OutputDescriptor.create(""),
                InputDescriptor.create(""));
        EventHandler mockEventHandler = Mockito.mock(EventHandler.class);
        Edge edge = new Edge(edgeProperty, mockEventHandler, new TezConfiguration());

        TezVertexID v1Id = createVertexID(1);
        TezVertexID v2Id = createVertexID(2);
        edge.setSourceVertex(mockVertex("v1", v1Id, new LinkedHashMap<TezTaskID, Task>()));
        edge.setDestinationVertex(mockVertex("v2", v2Id, new LinkedHashMap<TezTaskID, Task>()));
        edge.initialize();

        try {
            edge.getSourceSpec(0);
            Assert.fail("Expected AMUserCodeException for a negative physical output count");
        }
        catch (AMUserCodeException e) {
            // Expected failure: assert on the cause instead of dumping the stack trace.
            Assert.assertTrue("Unexpected cause message: " + e.getCause().getMessage(),
                    e.getCause().getMessage().contains("PhysicalOutputCount should not be negative"));
        }
    }

    /**
     * Verifies that sending an {@link InputReadErrorEvent} to the source tasks throws an
     * {@link AMUserCodeException} when the custom edge manager reports 0 destination consumer
     * tasks, and that the exception's cause message contains
     * "ConsumerTaskNum must be positive".
     *
     * @throws Exception if building the edge fails unexpectedly
     */
    @Test(timeout=5000L)
    public void testInvalidConsumerNumber() throws Exception {
        // Only consumerNumber is set to 0; the other three settings stay at 1.
        UserPayload payload = new CustomEdgeManagerWithInvalidReturnValue.EdgeManagerConfig(1, 1, 0, 1).toUserPayload();
        EdgeManagerPluginDescriptor pluginDescriptor = EdgeManagerPluginDescriptor
                .create(CustomEdgeManagerWithInvalidReturnValue.class.getName())
                .setUserPayload(payload);
        EdgeProperty edgeProperty = EdgeProperty.create(
                pluginDescriptor,
                EdgeProperty.DataSourceType.PERSISTED,
                EdgeProperty.SchedulingType.SEQUENTIAL,
                OutputDescriptor.create(""),
                InputDescriptor.create(""));
        EventHandler mockEventHandler = Mockito.mock(EventHandler.class);
        Edge edge = new Edge(edgeProperty, mockEventHandler, new TezConfiguration());

        TezVertexID v1Id = createVertexID(1);
        TezVertexID v2Id = createVertexID(2);
        edge.setSourceVertex(mockVertex("v1", v1Id, new LinkedHashMap<TezTaskID, Task>()));
        edge.setDestinationVertex(mockVertex("v2", v2Id, new LinkedHashMap<TezTaskID, Task>()));
        edge.initialize();

        // Read-error event attributed to attempt 1 of task 1 of the destination vertex.
        TezTaskAttemptID reportingAttempt = TezTaskAttemptID.getInstance(TezTaskID.getInstance(v2Id, 1), 1);
        EventMetaData inputMeta = new EventMetaData(EventMetaData.EventProducerConsumerType.INPUT, "v2", "v1", reportingAttempt);
        TezEvent ireEvent = new TezEvent(InputReadErrorEvent.create("diag", 0, 1), inputMeta);
        try {
            edge.sendTezEventToSourceTasks(ireEvent);
            Assert.fail("Expected AMUserCodeException for a non-positive consumer task count");
        }
        catch (AMUserCodeException e) {
            // Expected failure: assert on the cause instead of dumping the stack trace.
            Assert.assertTrue("Unexpected cause message: " + e.getCause().getMessage(),
                    e.getCause().getMessage().contains("ConsumerTaskNum must be positive"));
        }
    }

    /**
     * Verifies that sending an {@link InputReadErrorEvent} to the source tasks throws an
     * {@link AMUserCodeException} when the custom edge manager routes the error to source task
     * index -1, and that the exception's cause message contains
     * "SourceTaskIndex should not be negative".
     *
     * @throws Exception if building the edge fails unexpectedly
     */
    @Test(timeout=5000L)
    public void testInvalidSourceTaskIndex() throws Exception {
        // Only sourceTaskIndex is set to -1; the other three settings stay at 1.
        UserPayload payload = new CustomEdgeManagerWithInvalidReturnValue.EdgeManagerConfig(1, 1, 1, -1).toUserPayload();
        EdgeManagerPluginDescriptor pluginDescriptor = EdgeManagerPluginDescriptor
                .create(CustomEdgeManagerWithInvalidReturnValue.class.getName())
                .setUserPayload(payload);
        EdgeProperty edgeProperty = EdgeProperty.create(
                pluginDescriptor,
                EdgeProperty.DataSourceType.PERSISTED,
                EdgeProperty.SchedulingType.SEQUENTIAL,
                OutputDescriptor.create(""),
                InputDescriptor.create(""));
        EventHandler mockEventHandler = Mockito.mock(EventHandler.class);
        Edge edge = new Edge(edgeProperty, mockEventHandler, new TezConfiguration());

        TezVertexID v1Id = createVertexID(1);
        TezVertexID v2Id = createVertexID(2);
        edge.setSourceVertex(mockVertex("v1", v1Id, new LinkedHashMap<TezTaskID, Task>()));
        edge.setDestinationVertex(mockVertex("v2", v2Id, new LinkedHashMap<TezTaskID, Task>()));
        edge.initialize();

        // Read-error event attributed to attempt 1 of task 1 of the destination vertex.
        TezTaskAttemptID reportingAttempt = TezTaskAttemptID.getInstance(TezTaskID.getInstance(v2Id, 1), 1);
        EventMetaData inputMeta = new EventMetaData(EventMetaData.EventProducerConsumerType.INPUT, "v2", "v1", reportingAttempt);
        TezEvent ireEvent = new TezEvent(InputReadErrorEvent.create("diag", 0, 1), inputMeta);
        try {
            edge.sendTezEventToSourceTasks(ireEvent);
            Assert.fail("Expected AMUserCodeException for a negative source task index");
        }
        catch (AMUserCodeException e) {
            // Expected failure: assert on the cause instead of dumping the stack trace.
            Assert.assertTrue("Unexpected cause message: " + e.getCause().getMessage(),
                    e.getCause().getMessage().contains("SourceTaskIndex should not be negative"));
        }
    }

    /**
     * Checks the value of {@code getVertexGroupName()} on the edge manager's plugin context
     * in three situations: before any group input spec is stubbed on the destination vertex
     * (expects {@code null}), with a group whose members are "v1" and "v3" (expects
     * {@code null}), and with a group whose members include the source vertex name (expects
     * the group's name).
     */
    @Test(timeout=5000L)
    public void testEdgeManagerPluginCtxGetVertexGroupName() throws TezException {
        final String srcName = "srcV";
        final String destName = "destV";
        final String group = "group";

        EdgeProperty edgeProp = EdgeProperty.create(
                EdgeManagerPluginDescriptor.create(EdgeManagerForTest.class.getName()),
                EdgeProperty.DataSourceType.PERSISTED,
                EdgeProperty.SchedulingType.SEQUENTIAL,
                OutputDescriptor.create("Out"),
                InputDescriptor.create("In"));
        Edge edge = new Edge(edgeProp, null, null);

        Vertex srcV = Mockito.mock(Vertex.class);
        Vertex destV = Mockito.mock(Vertex.class);
        Mockito.when(srcV.getName()).thenReturn(srcName);
        Mockito.when(destV.getName()).thenReturn(destName);
        edge.setSourceVertex(srcV);
        edge.setDestinationVertex(destV);

        // No group input spec stubbed yet.
        Assert.assertNull(edge.edgeManager.getContext().getVertexGroupName());

        // A group that does not list the source vertex.
        List<GroupInputSpec> withoutSource =
                Arrays.asList(new GroupInputSpec(group, Arrays.asList("v1", "v3"), null));
        Mockito.when(destV.getGroupInputSpecList()).thenReturn(withoutSource);
        Assert.assertNull(edge.edgeManager.getContext().getVertexGroupName());

        // A group that lists the source vertex.
        List<GroupInputSpec> withSource =
                Arrays.asList(new GroupInputSpec(group, Arrays.asList(srcName, "v3"), null));
        Mockito.when(destV.getGroupInputSpecList()).thenReturn(withSource);
        Assert.assertEquals(group, edge.edgeManager.getContext().getVertexGroupName());
    }

    /**
     * Edge manager plugin used by the tests in this class. Every value it returns is taken
     * verbatim from an {@link EdgeManagerConfig} that is decoded from the plugin context's user
     * payload in {@link #initialize()}, which lets a test make the plugin return arbitrary
     * (including invalid) numbers. Both routing callbacks that fill a map are no-ops.
     */
    public static class CustomEdgeManagerWithInvalidReturnValue
    extends EdgeManagerPlugin {
        /** Settings decoded from the user payload; assigned in {@link #initialize()}. */
        EdgeManagerConfig emConf;

        public CustomEdgeManagerWithInvalidReturnValue(EdgeManagerPluginContext context) {
            super(context);
        }

        /** Decodes the {@link EdgeManagerConfig} from the context's user payload. */
        public void initialize() throws Exception {
            UserPayload payload = getContext().getUserPayload();
            emConf = EdgeManagerConfig.fromUserPayload(payload);
        }

        /** @return the configured {@code physicalInput}, regardless of the task index */
        public int getNumDestinationTaskPhysicalInputs(int destinationTaskIndex) throws Exception {
            return emConf.physicalInput;
        }

        /** @return the configured {@code physicalOutput}, regardless of the task index */
        public int getNumSourceTaskPhysicalOutputs(int sourceTaskIndex) throws Exception {
            return emConf.physicalOutput;
        }

        /** Intentionally empty: the supplied map is left untouched. */
        public void routeDataMovementEventToDestination(DataMovementEvent event, int sourceTaskIndex, int sourceOutputIndex, Map<Integer, List<Integer>> destinationTaskAndInputIndices) throws Exception {
            // no-op
        }

        /** Intentionally empty: the supplied map is left untouched. */
        public void routeInputSourceTaskFailedEventToDestination(int sourceTaskIndex, Map<Integer, List<Integer>> destinationTaskAndInputIndices) throws Exception {
            // no-op
        }

        /** @return the configured {@code consumerNumber}, regardless of the task index */
        public int getNumDestinationConsumerTasks(int sourceTaskIndex) throws Exception {
            return emConf.consumerNumber;
        }

        /** @return the configured {@code sourceTaskIndex}, regardless of the arguments */
        public int routeInputErrorEventToSource(InputReadErrorEvent event, int destinationTaskIndex, int destinationFailedInputIndex) throws Exception {
            return emConf.sourceTaskIndex;
        }

        /**
         * Four integer settings for the enclosing plugin, serialized as four consecutive ints
         * in the order physicalInput, physicalOutput, consumerNumber, sourceTaskIndex.
         */
        public static class EdgeManagerConfig
        implements Writable {
            int physicalInput;
            int physicalOutput;
            int consumerNumber;
            int sourceTaskIndex;

            /** Creates a configuration with every setting equal to 1. */
            public EdgeManagerConfig() {
                this(1, 1, 1, 1);
            }

            public EdgeManagerConfig(int physicalInput, int physicalOutput, int consumerNumber, int sourceTaskIndex) {
                this.physicalInput = physicalInput;
                this.physicalOutput = physicalOutput;
                this.consumerNumber = consumerNumber;
                this.sourceTaskIndex = sourceTaskIndex;
            }

            /**
             * Serializes this configuration with {@link #write(DataOutput)} and wraps the
             * resulting bytes in a {@link UserPayload}.
             */
            public UserPayload toUserPayload() throws IOException {
                ByteArrayOutputStream buffer = new ByteArrayOutputStream();
                write(new DataOutputStream(buffer));
                byte[] encoded = buffer.toByteArray();
                return UserPayload.create(ByteBuffer.wrap(encoded));
            }

            /**
             * Creates a configuration and populates it from the payload's buffer via
             * {@link #readFields(DataInput)}.
             */
            public static EdgeManagerConfig fromUserPayload(UserPayload payload) throws IOException {
                DataInputByteBuffer source = new DataInputByteBuffer();
                source.reset(payload.getPayload());
                EdgeManagerConfig decoded = new EdgeManagerConfig();
                decoded.readFields(source);
                return decoded;
            }

            /** Writes the four settings as ints; the order must match {@link #readFields(DataInput)}. */
            public void write(DataOutput out) throws IOException {
                out.writeInt(physicalInput);
                out.writeInt(physicalOutput);
                out.writeInt(consumerNumber);
                out.writeInt(sourceTaskIndex);
            }

            /** Reads the four settings as ints, in the order {@link #write(DataOutput)} emits them. */
            public void readFields(DataInput in) throws IOException {
                physicalInput = in.readInt();
                physicalOutput = in.readInt();
                consumerNumber = in.readInt();
                sourceTaskIndex = in.readInt();
            }
        }
    }
}

