/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.table.planner.runtime.stream.sql;

import java.util.Collection;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.core.testutils.FlinkAssertions;
import org.apache.flink.streaming.api.functions.sink.legacy.SinkFunction;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableException;
import org.apache.flink.table.api.bridge.scala.package$;
import org.apache.flink.table.api.config.ExecutionConfigOptions;
import org.apache.flink.table.api.config.OptimizerConfigOptions;
import org.apache.flink.table.connector.source.lookup.LookupOptions;
import org.apache.flink.table.connector.source.lookup.cache.LookupCache;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.binary.BinaryStringData;
import org.apache.flink.table.functions.UserDefinedFunction;
import org.apache.flink.table.planner.factories.TestValuesTableFactory;
import org.apache.flink.table.planner.runtime.stream.sql.AsyncLookupJoinITCase$;
import org.apache.flink.table.planner.runtime.utils.StreamingWithStateTestBase;
import org.apache.flink.table.planner.runtime.utils.TestingAppendSink;
import org.apache.flink.table.planner.runtime.utils.TestingRetractSink;
import org.apache.flink.table.planner.runtime.utils.UserDefinedFunctionTestUtils;
import org.apache.flink.table.planner.runtime.utils.UserDefinedFunctionTestUtils$TestAddWithOpen$;
import org.apache.flink.table.planner.runtime.utils.UserDefinedFunctionTestUtils$TestExceptionThrown$;
import org.apache.flink.table.planner.runtime.utils.UserDefinedFunctionTestUtils$TestMod$;
import org.apache.flink.table.planner.runtime.utils.UserDefinedFunctionTestUtils$TestWrapperUdf$;
import org.apache.flink.table.runtime.functions.table.lookup.LookupCacheManager;
import org.apache.flink.testutils.junit.extensions.parameterized.ParameterizedTestExtension;
import org.apache.flink.testutils.junit.extensions.parameterized.Parameters;
import org.apache.flink.types.Row;
import org.assertj.core.api.Assertions;
import org.assertj.core.api.ThrowingConsumer;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.TestTemplate;
import org.junit.jupiter.api.extension.ExtendWith;
import scala.Predef$;
import scala.collection.Seq;
import scala.collection.immutable.;
import scala.collection.immutable.List;
import scala.collection.immutable.Nil$;
import scala.collection.immutable.StringOps;
import scala.math.Ordering;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxesRunTime;

@ExtendWith(value={ParameterizedTestExtension.class})
@ScalaSignature(bytes="\u0006\u0001\t]e\u0001\u0002\u00180\u0001\u0001C\u0001b\u0012\u0001\u0003\u0002\u0003\u0006I\u0001\u0013\u0005\tA\u0002\u0011\t\u0011)A\u0005C\"Aq\r\u0001B\u0001B\u0003%\u0001\u000e\u0003\u0005x\u0001\t\u0005\t\u0015!\u0003b\u0011\u0015A\b\u0001\"\u0001z\u0011%\t\t\u0001\u0001b\u0001\n\u0003\t\u0019\u0001\u0003\u0005\u0002\"\u0001\u0001\u000b\u0011BA\u0003\u0011%\t\u0019\u0003\u0001b\u0001\n\u0003\t\u0019\u0001\u0003\u0005\u0002&\u0001\u0001\u000b\u0011BA\u0003\u0011\u001d\t9\u0003\u0001C!\u0003SAq!!\u0012\u0001\t\u0003\nI\u0003C\u0004\u0002P\u0001!I!!\u0015\t\u0013\u0005\r\u0005!%A\u0005\n\u0005\u0015\u0005bBAN\u0001\u0011%\u0011Q\u0014\u0005\b\u0003O\u0003A\u0011BAU\u0011\u001d\ty\u000b\u0001C\u0001\u0003SAq!!/\u0001\t\u0003\tI\u0003C\u0004\u0002>\u0002!\t!!\u000b\t\u000f\u0005\u0005\u0007\u0001\"\u0001\u0002*!9\u0011Q\u0019\u0001\u0005\u0002\u0005%\u0002bBAe\u0001\u0011\u0005\u0011\u0011\u0006\u0005\b\u0003\u001b\u0004A\u0011AA\u0015\u0011\u001d\t\t\u000e\u0001C\u0001\u0003SAq!!6\u0001\t\u0003\tI\u0003C\u0004\u0002Z\u0002!\t!!\u000b\t\u000f\u0005u\u0007\u0001\"\u0001\u0002*!9\u0011\u0011\u001d\u0001\u0005\u0002\u0005%\u0002bBAs\u0001\u0011\u0005\u0011\u0011\u0006\u0005\b\u0003S\u0004A\u0011AAv\u0011\u001d\u0011\t\u0001\u0001C\u0001\u0005\u0007AqAa\u0005\u0001\t\u0003\tI\u0003C\u0004\u0003\u0018\u0001!\t!!\u000b\t\u000f\tm\u0001\u0001\"\u0001\u0002*\u001d9!\u0011J\u0018\t\u0002\t-cA\u0002\u00180\u0011\u0003\u0011i\u0005\u0003\u0004yG\u0011\u0005!Q\u000b\u0005\n\u0005/\u001a#\u0019!C\u0001\u00053B\u0001Ba\u0018$A\u0003%!1\f\u0005\n\u0005C\u001a#\u0019!C\u0001\u00053B\u0001Ba\u0019$A\u0003%!1\f\u0005\n\u0005K\u001a#\u0019!C\u0001\u00053B\u0001Ba\u001a$A\u0003%!1\f\u0005\n\u0005S\u001a#\u0019!C\u0001\u00053B\u0001Ba\u001b$A\u0003%!1\f\u0005\b\u0005[\u001aC\u0011\u0001B8\u0005U\t5/\u001f8d\u0019>|7.\u001e9K_&t\u0017\nV\"bg\u0016T!\u0001M\u0019\u0002\u0007M\fHN\u0003\u00023g\u000511\u000f\u001e:fC6T!\u0001N\u001b\u0002\u000fI,h\u000e^5nK*\u0011agN\u0001\ba2\fgN\\3s\u0015\tA\u0014(A\u0003uC\ndWM\u0003\u0002;w\u0005)a\r\\5oW*\u0011A(P\u0001\u0007CB\f7\r[3\u000b\u0003y\n1a\u001c:h\u0007\u0001\u0019\"\u0001A!\u0011\u0005\t+U\"A\"\u000b\u0005\u0011\u001b\u0014!B;uS2\u001c\u0018B\u0001$D\u0005i\u0019FO]3b[&twmV5uQN#\u0018\r^3UKN$()Y:f\u0003\u001d\u0011\u0017mY6f]\u0012\u0004\"!S/\u000f\u0005)[fBA&[\u001d\ta\u0015L\u0004\u0002N1:\u0011aj\u0016\b\u0003\u001fZs!\u0001U+\u000f\u0005E#V\"\u0001*\u000b\u0005M{\u0014A\u0002\u001fs_>$h(C\u0001?\u0013\taT(\u0003\u0002;w%\u0011\u0001(O\u0005\u0003m]J!\u0001N\u001b\n\u0005\u0011\u001b\u0014B\u0001/D\u0003i\u0019FO]3b[&twmV5uQN#\u0018\r^3UKN$()Y:f\u0013\tqvL\u0001\tTi\u0006$XMQ1dW\u0016tG-T8eK*\u0011AlQ\u0001\f_\nTWm\u0019;SKV\u001cX\r\u0005\u0002cK6\t1MC\u0001e\u0003\u0015\u00198-\u00197b\u0013\t17MA\u0004C_>dW-\u00198\u0002\u001f\u0005\u001c\u0018P\\2PkR\u0004X\u000f^'pI\u0016\u0004\"!\u001b;\u000f\u0005)\fhBA6o\u001d\tiE.\u0003\u0002no\u0005\u0019\u0011\r]5\n\u0005=\u0004\u0018AB2p]\u001aLwM\u0003\u0002no%\u0011!o]\u0001\u0017\u000bb,7-\u001e;j_:\u001cuN\u001c4jO>\u0003H/[8og*\u0011q\u000e]\u0005\u0003kZ\u0014q\"Q:z]\u000e|U\u000f\u001e9vi6{G-\u001a\u0006\u0003eN\f1\"\u001a8bE2,7)Y2iK\u00061A(\u001b8jiz\"RA\u001f?~}~\u0004\"a\u001f\u0001\u000e\u0003=BQaR\u0003A\u0002!CQ\u0001Y\u0003A\u0002\u0005DQaZ\u0003A\u0002!DQa^\u0003A\u0002\u0005\fA\u0001Z1uCV\u0011\u0011Q\u0001\t\u0007\u0003\u000f\t\t\"!\u0006\u000e\u0005\u0005%!\u0002BA\u0006\u0003\u001b\t\u0011\"[7nkR\f'\r\\3\u000b\u0007\u0005=1-\u0001\u0006d_2dWm\u0019;j_:LA!a\u0005\u0002\n\t!A*[:u!\u0011\t9\"!\b\u000e\u0005\u0005e!bAA\u000es\u0005)A/\u001f9fg&!\u0011qDA\r\u0005\r\u0011vn^\u0001\u0006I\u0006$\u0018\rI\u0001\tkN,'\u000fR1uC\u0006IQo]3s\t\u0006$\u0018\rI\u0001\u0007E\u00164wN]3\u0015\u0005\u0005-\u0002c\u00012\u0002.%\u0019\u0011qF2\u0003\tUs\u0017\u000e\u001e\u0015\u0004\u0015\u0005M\u0002\u0003BA\u001b\u0003\u0003j!!a\u000e\u000b\u00075\fID\u0003\u0003\u0002<\u0005u\u0012a\u00026va&$XM\u001d\u0006\u0004\u0003\u007fi\u0014!\u00026v]&$\u0018\u0002BA\"\u0003o\u0011!BQ3g_J,W)Y2i\u0003\u0015\tg\r^3sQ\rY\u0011\u0011\n\t\u0005\u0003k\tY%\u0003\u0003\u0002N\u0005]\"!C!gi\u0016\u0014X)Y2i\u0003E\u0019'/Z1uK2{wn[;q)\u0006\u0014G.\u001a\u000b\t\u0003W\t\u0019&a\u001a\u0002z!9\u0011Q\u000b\u0007A\u0002\u0005]\u0013!\u0003;bE2,g*Y7f!\u0011\tI&!\u0019\u000f\t\u0005m\u0013Q\f\t\u0003#\u000eL1!a\u0018d\u0003\u0019\u0001&/\u001a3fM&!\u00111MA3\u0005\u0019\u0019FO]5oO*\u0019\u0011qL2\t\u000f\u0005\u0005A\u00021\u0001\u0002jA1\u00111NA;\u0003+qA!!\u001c\u0002r9\u0019\u0011+a\u001c\n\u0003\u0011L1!a\u001dd\u0003\u001d\u0001\u0018mY6bO\u0016LA!a\u0005\u0002x)\u0019\u00111O2\t\u0013\u0005mD\u0002%AA\u0002\u0005u\u0014a\u00047p_.,\b\u000f\u00165sKNDw\u000e\u001c3\u0011\u0007\t\fy(C\u0002\u0002\u0002\u000e\u00141!\u00138u\u0003m\u0019'/Z1uK2{wn[;q)\u0006\u0014G.\u001a\u0013eK\u001a\fW\u000f\u001c;%gU\u0011\u0011q\u0011\u0016\u0005\u0003{\nIi\u000b\u0002\u0002\fB!\u0011QRAL\u001b\t\tyI\u0003\u0003\u0002\u0012\u0006M\u0015!C;oG\",7m[3e\u0015\r\t)jY\u0001\u000bC:tw\u000e^1uS>t\u0017\u0002BAM\u0003\u001f\u0013\u0011#\u001e8dQ\u0016\u001c7.\u001a3WCJL\u0017M\\2f\u0003]9W\r^!ts:\u001c'+\u001a;ss2{wn[;q\u0011&tG\u000f\u0006\u0004\u0002X\u0005}\u00151\u0015\u0005\b\u0003Cs\u0001\u0019AA,\u0003-awn\\6vaR\u000b'\r\\3\t\u000f\u0005\u0015f\u00021\u0001\u0002~\u0005YQ.\u0019=BiR,W\u000e\u001d;t\u0003=\u0019'/Z1uKN\u001b\u0017M\u001c+bE2,GCBA\u0016\u0003W\u000bi\u000bC\u0004\u0002V=\u0001\r!a\u0016\t\u000f\u0005\u0005q\u00021\u0001\u0002j\u0005QC/Z:u\u0003NLhn\u0019&pS:$V-\u001c9pe\u0006dG+\u00192mK>sW*\u001e7uS.+\u0017PR5fY\u0012\u001c\bf\u0001\t\u00024B!\u0011QGA[\u0013\u0011\t9,a\u000e\u0003\u0019Q+7\u000f\u001e+f[Bd\u0017\r^3\u00025Q,7\u000f^!ts:\u001c'j\\5o)\u0016l\u0007o\u001c:bYR\u000b'\r\\3)\u0007E\t\u0019,\u0001\u0014uKN$\u0018i]=oG*{\u0017N\u001c+f[B|'/\u00197UC\ndWmV5uQB+8\u000f\u001b#po:D3AEAZ\u00031\"Xm\u001d;Bgft7MS8j]R+W\u000e]8sC2$\u0016M\u00197f/&$\bNT8o\u000bF,\u0018\r\u001c$jYR,'\u000fK\u0002\u0014\u0003g\u000b\u0001\u0007^3ti\u0006\u001b\u0018P\\2MK\u001a$(j\\5o)\u0016l\u0007o\u001c:bYR\u000b'\r\\3XSRDGj\\2bYB\u0013X\rZ5dCR,\u0007f\u0001\u000b\u00024\u00069C/Z:u\u0003NLhn\u0019&pS:$V-\u001c9pe\u0006dG+\u00192mK>sW*\u001e7uS\u001aKW\r\u001c3tQ\r)\u00121W\u0001/i\u0016\u001cH/Q:z]\u000eTu.\u001b8UK6\u0004xN]1m)\u0006\u0014G.Z(o\u001bVdG/\u001b$jK2$7oV5uQV#g\rK\u0002\u0017\u0003g\u000bq\u0005^3ti\u0006\u001b\u0018P\\2K_&tG+Z7q_J\fG\u000eV1cY\u0016<\u0016\u000e\u001e5VI\u001a4\u0015\u000e\u001c;fe\"\u001aq#a-\u0002IQ,7\u000f^!hO\u0006sG-Q:z]\u000edUM\u001a;K_&tG+Z7q_J\fG\u000eV1cY\u0016D3\u0001GAZ\u0003%\"Xm\u001d;BO\u001e\fe\u000eZ!ts:\u001cG*\u001a4u\u0015>LgnV5uQR\u0013\u0018PU3t_24X-T8eK\"\u001a\u0011$a-\u0002=Q,7\u000f^!ts:\u001cG*\u001a4u\u0015>Lg\u000eV3na>\u0014\u0018\r\u001c+bE2,\u0007f\u0001\u000e\u00024\u0006iC/Z:u\u000bb\u001cW\r\u001d;j_:$\u0006N]8x]\u001a\u0013x.\\!ts:\u001c'j\\5o)\u0016l\u0007o\u001c:bYR\u000b'\r\\3)\u0007m\t\u0019,\u0001\u0013uKN$Hj\\8lkB\u001c\u0015m\u00195f'\"\f'/\u001b8h\u0003\u000e\u0014xn]:Tk\n$\u0018m]6tQ\ra\u00121W\u0001\u0003U&$B!!<\u0002~B!\u0011q^A}\u001b\t\t\tP\u0003\u0003\u0002t\u0006U\u0018\u0001\u00027b]\u001eT!!a>\u0002\t)\fg/Y\u0005\u0005\u0003w\f\tPA\u0004J]R,w-\u001a:\t\u000f\u0005}X\u00041\u0001\u0002~\u0005\t\u0011.\u0001\u0002kYR!!Q\u0001B\u0006!\u0011\tyOa\u0002\n\t\t%\u0011\u0011\u001f\u0002\u0005\u0019>tw\rC\u0004\u0003\u000ey\u0001\rAa\u0004\u0002\u00031\u00042A\u0019B\t\u0013\r\u0011IaY\u0001$i\u0016\u001cH/Q:z]\u000eTu.\u001b8UK6\u0004xN]1m)\u0006\u0014G.Z,ji\"\u0014V\r\u001e:zQ\ry\u00121W\u0001Ci\u0016\u001cH/Q:z]\u000eTu.\u001b8UK6\u0004xN]1m)\u0006\u0014G.Z,ji\"dun\\6vaRC'/Z:i_2$w+\u001b;i\u0013:\u001cXO\u001a4jG&,g\u000e\u001e*fiJL\bf\u0001\u0011\u00024\u0006\u0001E/Z:u\u0003NLhn\u0019&pS:$V-\u001c9pe\u0006dG+\u00192mK^KG\u000f\u001b'p_.,\b\u000f\u00165sKNDw\u000e\u001c3XSRD7+\u001e4gS\u000eLWM\u001c;SKR\u0014\u0018\u0010K\u0002\"\u0003gCs\u0001\u0001B\u0011\u0005[\u0011y\u0003\u0005\u0003\u0003$\t%RB\u0001B\u0013\u0015\u0011\u00119#a\u000e\u0002\u0013\u0015DH/\u001a8tS>t\u0017\u0002\u0002B\u0016\u0005K\u0011!\"\u0012=uK:$w+\u001b;i\u0003\u00151\u0018\r\\;fY\t\u0011\td\t\u0002\u00034A!!Q\u0007B#\u001b\t\u00119D\u0003\u0003\u0003:\tm\u0012!\u00049be\u0006lW\r^3sSj,GM\u0003\u0003\u0003>\t}\u0012AC3yi\u0016t7/[8og*!\u0011q\bB!\u0015\r\u0011\u0019%O\u0001\ni\u0016\u001cH/\u001e;jYNLAAa\u0012\u00038\tQ\u0002+\u0019:b[\u0016$XM]5{K\u0012$Vm\u001d;FqR,gn]5p]\u0006)\u0012i]=oG2{wn[;q\u0015>Lg.\u0013+DCN,\u0007CA>$'\r\u0019#q\n\t\u0004E\nE\u0013b\u0001B*G\n1\u0011I\\=SK\u001a$\"Aa\u0013\u0002'\u0015s\u0015I\u0011'F?>\u0013%*R\"U?J+UkU#\u0016\u0005\tm\u0003\u0003BAx\u0005;J1AZAy\u0003Q)e*\u0011\"M\u000b~{%IS#D)~\u0013V)V*FA\u0005!B)S*B\u00052+ul\u0014\"K\u000b\u000e#vLU#V'\u0016\u000bQ\u0003R%T\u0003\ncUiX(C\u0015\u0016\u001bEk\u0018*F+N+\u0005%\u0001\u0007F\u001d\u0006\u0013E*R0D\u0003\u000eCU)A\u0007F\u001d\u0006\u0013E*R0D\u0003\u000eCU\tI\u0001\u000e\t&\u001b\u0016I\u0011'F?\u000e\u000b5\tS#\u0002\u001d\u0011K5+\u0011\"M\u000b~\u001b\u0015i\u0011%FA\u0005Q\u0001/\u0019:b[\u0016$XM]:\u0015\u0005\tE\u0004C\u0002B:\u0005s\u0012i(\u0004\u0002\u0003v)!!qOA{\u0003\u0011)H/\u001b7\n\t\tm$Q\u000f\u0002\u000b\u0007>dG.Z2uS>t\u0007#\u00022\u0003\u0000\t\r\u0015b\u0001BAG\n)\u0011I\u001d:bsB!\u0011q\u001eBC\u0013\u0011\u00119)!=\u0003\r=\u0013'.Z2uQ\u001di#1\u0012BI\u0005'\u0003BA!\u000e\u0003\u000e&!!q\u0012B\u001c\u0005)\u0001\u0016M]1nKR,'o]\u0001\u0005]\u0006lW-\t\u0002\u0003\u0016\u000695\u000b^1uK\n\u000b7m[3oIvZ\b' \u0017!\u001f\nTWm\u0019;SKV\u001cX-P>2{2\u0002\u0013i]=oG>+H\u000f];u\u001b>$W-P>3{2\u0002SI\\1cY\u0016\u001c\u0015m\u00195f{m\u001cT\u0010")
public class AsyncLookupJoinITCase
extends StreamingWithStateTestBase {
    private final boolean objectReuse;
    private final ExecutionConfigOptions.AsyncOutputMode asyncOutputMode;
    private final boolean enableCache;
    private final List<Row> data;
    private final List<Row> userData;

    @Parameters(name="StateBackend={0}, ObjectReuse={1}, AsyncOutputMode={2}, EnableCache={3}")
    public static Collection<Object[]> parameters() {
        return AsyncLookupJoinITCase$.MODULE$.parameters();
    }

    public static Boolean DISABLE_CACHE() {
        return AsyncLookupJoinITCase$.MODULE$.DISABLE_CACHE();
    }

    public static Boolean ENABLE_CACHE() {
        return AsyncLookupJoinITCase$.MODULE$.ENABLE_CACHE();
    }

    public static Boolean DISABLE_OBJECT_REUSE() {
        return AsyncLookupJoinITCase$.MODULE$.DISABLE_OBJECT_REUSE();
    }

    public static Boolean ENABLE_OBJECT_REUSE() {
        return AsyncLookupJoinITCase$.MODULE$.ENABLE_OBJECT_REUSE();
    }

    public List<Row> data() {
        return this.data;
    }

    public List<Row> userData() {
        return this.userData;
    }

    @Override
    @BeforeEach
    public void before() {
        super.before();
        TestValuesTableFactory.RESOURCE_COUNTER.set(0);
        ExecutionConfig executionConfig = this.objectReuse ? this.env().getConfig().enableObjectReuse() : this.env().getConfig().disableObjectReuse();
        this.tEnv().getConfig().set(ExecutionConfigOptions.TABLE_EXEC_ASYNC_LOOKUP_OUTPUT_MODE, (Object)this.asyncOutputMode);
        this.createScanTable("src", this.data());
        this.createLookupTable("user_table", this.userData(), this.createLookupTable$default$3());
        this.createLookupTable("user_table_with_lookup_threshold2", this.userData(), 2);
        this.createLookupTable("user_table_with_lookup_threshold3", this.userData(), 3);
    }

    @Override
    @AfterEach
    public void after() {
        super.after();
        Assertions.assertThat((AtomicInteger)TestValuesTableFactory.RESOURCE_COUNTER).hasValue(0);
    }

    private void createLookupTable(String tableName, List<Row> data, int lookupThreshold) {
        String dataId = TestValuesTableFactory.registerData(data);
        String cacheOptions = this.enableCache ? new StringOps(Predef$.MODULE$.augmentString(new StringBuilder(59).append("\n           |  '").append(LookupOptions.CACHE_TYPE.key()).append("' = '").append(LookupOptions.LookupCacheType.PARTIAL).append("',\n           |  '").append(LookupOptions.PARTIAL_CACHE_MAX_ROWS.key()).append("' = '").append(Long.MAX_VALUE).append("',\n           |").toString())).stripMargin() : "";
        String lookupThresholdOption = lookupThreshold > 0 ? new StringBuilder(28).append("'start-lookup-threshold'='").append(lookupThreshold).append("',").toString() : "";
        this.tEnv().executeSql(new StringOps(Predef$.MODULE$.augmentString(new StringBuilder(429).append("\n                       |CREATE TABLE ").append(tableName).append(" (\n                       |  `age` INT,\n                       |  `id` BIGINT,\n                       |  `name` STRING\n                       |) WITH (\n                       |  ").append(cacheOptions).append("\n                       |  ").append(lookupThresholdOption).append("\n                       |  'connector' = 'values',\n                       |  'data-id' = '").append(dataId).append("',\n                       |  'async' = 'true'\n                       |)\n                       |").toString())).stripMargin());
    }

    private int createLookupTable$default$3() {
        return -1;
    }

    private String getAsyncRetryLookupHint(String lookupTable, int maxAttempts) {
        return new StringOps(Predef$.MODULE$.augmentString(new StringBuilder(239).append("\n       |/*+ LOOKUP('table'='").append(lookupTable).append("', \n       | 'async'='true', \n       | 'time-out'='300s',\n       | 'retry-predicate'='lookup_miss',\n       | 'retry-strategy'='fixed_delay',\n       | 'fixed-delay'='1 ms',\n       | 'max-attempts'='").append(maxAttempts).append("')\n       |*/").toString())).stripMargin();
    }

    private void createScanTable(String tableName, List<Row> data) {
        String dataId = TestValuesTableFactory.registerData(data);
        this.tEnv().executeSql(new StringOps(Predef$.MODULE$.augmentString(new StringBuilder(386).append("\n                       |CREATE TABLE ").append(tableName).append(" (\n                       |  `id` BIGINT,\n                       |  `len` INT,\n                       |  `content` STRING,\n                       |  `proctime` AS PROCTIME()\n                       |) WITH (\n                       |  'connector' = 'values',\n                       |  'data-id' = '").append(dataId).append("'\n                       |)\n                       |").toString())).stripMargin());
    }

    @TestTemplate
    public void testAsyncJoinTemporalTableOnMultiKeyFields() {
        String sql = new StringOps(Predef$.MODULE$.augmentString("\n        |SELECT t1.id, t1.len, D.name\n        |FROM (select content, id, len, proctime FROM src AS T) t1\n        |JOIN user_table for system_time as of t1.proctime AS D\n        |ON t1.content = D.name AND t1.id = D.id\n      ")).stripMargin();
        TestingAppendSink sink = new TestingAppendSink();
        package$.MODULE$.tableConversions(this.tEnv().sqlQuery(sql)).toDataStream().addSink((SinkFunction)sink);
        this.env().execute();
        Seq expected = (Seq)new .colon.colon((Object)"1,12,Julian", (List)new .colon.colon((Object)"3,15,Fabian", (List)Nil$.MODULE$));
        Assertions.assertThat((Object)sink.getAppendResults().sorted((Ordering)Ordering.String$.MODULE$)).isEqualTo(expected.sorted((Ordering)Ordering.String$.MODULE$));
    }

    @TestTemplate
    public void testAsyncJoinTemporalTable() {
        String sql = "SELECT T.id, T.len, T.content, D.name FROM src AS T JOIN user_table for system_time as of T.proctime AS D ON T.id = D.id";
        TestingAppendSink sink = new TestingAppendSink();
        package$.MODULE$.tableConversions(this.tEnv().sqlQuery(sql)).toDataStream().addSink((SinkFunction)sink);
        this.env().execute();
        Seq expected = (Seq)new .colon.colon((Object)"1,12,Julian,Julian", (List)new .colon.colon((Object)"2,15,Hello,Jark", (List)new .colon.colon((Object)"3,15,Fabian,Fabian", (List)Nil$.MODULE$)));
        Assertions.assertThat((Object)sink.getAppendResults().sorted((Ordering)Ordering.String$.MODULE$)).isEqualTo(expected.sorted((Ordering)Ordering.String$.MODULE$));
    }

    @TestTemplate
    public void testAsyncJoinTemporalTableWithPushDown() {
        String sql = "SELECT T.id, T.len, T.content, D.name FROM src AS T JOIN user_table for system_time as of T.proctime AS D ON T.id = D.id AND D.age > 20";
        TestingAppendSink sink = new TestingAppendSink();
        package$.MODULE$.tableConversions(this.tEnv().sqlQuery(sql)).toDataStream().addSink((SinkFunction)sink);
        this.env().execute();
        Seq expected = (Seq)new .colon.colon((Object)"2,15,Hello,Jark", (List)new .colon.colon((Object)"3,15,Fabian,Fabian", (List)Nil$.MODULE$));
        Assertions.assertThat((Object)sink.getAppendResults().sorted((Ordering)Ordering.String$.MODULE$)).isEqualTo(expected.sorted((Ordering)Ordering.String$.MODULE$));
    }

    @TestTemplate
    public void testAsyncJoinTemporalTableWithNonEqualFilter() {
        String sql = "SELECT T.id, T.len, T.content, D.name, D.age FROM src AS T JOIN user_table for system_time as of T.proctime AS D ON T.id = D.id WHERE T.len <= D.age";
        TestingAppendSink sink = new TestingAppendSink();
        package$.MODULE$.tableConversions(this.tEnv().sqlQuery(sql)).toDataStream().addSink((SinkFunction)sink);
        this.env().execute();
        Seq expected = (Seq)new .colon.colon((Object)"2,15,Hello,Jark,22", (List)new .colon.colon((Object)"3,15,Fabian,Fabian,33", (List)Nil$.MODULE$));
        Assertions.assertThat((Object)sink.getAppendResults().sorted((Ordering)Ordering.String$.MODULE$)).isEqualTo(expected.sorted((Ordering)Ordering.String$.MODULE$));
    }

    @TestTemplate
    public void testAsyncLeftJoinTemporalTableWithLocalPredicate() {
        String sql = "SELECT T.id, T.len, T.content, D.name, D.age FROM src AS T LEFT JOIN user_table for system_time as of T.proctime AS D ON T.id = D.id AND T.len > 1 AND D.age > 20 AND D.name = 'Fabian' WHERE T.id > 1";
        TestingAppendSink sink = new TestingAppendSink();
        package$.MODULE$.tableConversions(this.tEnv().sqlQuery(sql)).toDataStream().addSink((SinkFunction)sink);
        this.env().execute();
        Seq expected = (Seq)new .colon.colon((Object)"2,15,Hello,null,null", (List)new .colon.colon((Object)"3,15,Fabian,Fabian,33", (List)new .colon.colon((Object)"8,11,Hello world,null,null", (List)new .colon.colon((Object)"9,12,Hello world!,null,null", (List)Nil$.MODULE$))));
        Assertions.assertThat((Object)sink.getAppendResults().sorted((Ordering)Ordering.String$.MODULE$)).isEqualTo(expected.sorted((Ordering)Ordering.String$.MODULE$));
    }

    @TestTemplate
    public void testAsyncJoinTemporalTableOnMultiFields() {
        String sql = "SELECT T.id, T.len, D.name FROM src AS T JOIN user_table for system_time as of T.proctime AS D ON T.id = D.id AND T.content = D.name";
        TestingAppendSink sink = new TestingAppendSink();
        package$.MODULE$.tableConversions(this.tEnv().sqlQuery(sql)).toDataStream().addSink((SinkFunction)sink);
        this.env().execute();
        Seq expected = (Seq)new .colon.colon((Object)"1,12,Julian", (List)new .colon.colon((Object)"3,15,Fabian", (List)Nil$.MODULE$));
        Assertions.assertThat((Object)sink.getAppendResults().sorted((Ordering)Ordering.String$.MODULE$)).isEqualTo(expected.sorted((Ordering)Ordering.String$.MODULE$));
    }

    @TestTemplate
    public void testAsyncJoinTemporalTableOnMultiFieldsWithUdf() {
        this.tEnv().createTemporarySystemFunction("mod1", (UserDefinedFunction)UserDefinedFunctionTestUtils$TestMod$.MODULE$);
        this.tEnv().createTemporarySystemFunction("wrapper1", (UserDefinedFunction)UserDefinedFunctionTestUtils$TestWrapperUdf$.MODULE$);
        String sql = "SELECT T.id, T.len, wrapper1(D.name) as name FROM src AS T JOIN user_table for system_time as of T.proctime AS D ON mod1(T.id, 4) = D.id AND T.content = D.name";
        TestingAppendSink sink = new TestingAppendSink();
        package$.MODULE$.tableConversions(this.tEnv().sqlQuery(sql)).toDataStream().addSink((SinkFunction)sink);
        this.env().execute();
        Seq expected = (Seq)new .colon.colon((Object)"1,12,Julian", (List)new .colon.colon((Object)"3,15,Fabian", (List)Nil$.MODULE$));
        Assertions.assertThat((Object)sink.getAppendResults().sorted((Ordering)Ordering.String$.MODULE$)).isEqualTo(expected.sorted((Ordering)Ordering.String$.MODULE$));
    }

    @TestTemplate
    public void testAsyncJoinTemporalTableWithUdfFilter() {
        this.tEnv().createTemporarySystemFunction("add", (UserDefinedFunction)new UserDefinedFunctionTestUtils.TestAddWithOpen());
        String sql = "SELECT T.id, T.len, T.content, D.name FROM src AS T JOIN user_table for system_time as of T.proctime AS D ON T.id = D.id WHERE add(T.id, D.id) > 3 AND add(T.id, 2) > 3 AND add (D.id, 2) > 3";
        TestingAppendSink sink = new TestingAppendSink();
        package$.MODULE$.tableConversions(this.tEnv().sqlQuery(sql)).toDataStream().addSink((SinkFunction)sink);
        this.env().execute();
        Seq expected = (Seq)new .colon.colon((Object)"2,15,Hello,Jark", (List)new .colon.colon((Object)"3,15,Fabian,Fabian", (List)Nil$.MODULE$));
        Assertions.assertThat((Object)sink.getAppendResults().sorted((Ordering)Ordering.String$.MODULE$)).isEqualTo(expected.sorted((Ordering)Ordering.String$.MODULE$));
        Assertions.assertThat((AtomicInteger)UserDefinedFunctionTestUtils$TestAddWithOpen$.MODULE$.aliveCounter()).hasValue(0);
    }

    @TestTemplate
    public void testAggAndAsyncLeftJoinTemporalTable() {
        String sql1 = "SELECT max(id) as id, PROCTIME() as proctime FROM src AS T group by len";
        Table table1 = this.tEnv().sqlQuery(sql1);
        this.tEnv().createTemporaryView("t1", table1);
        String sql2 = "SELECT t1.id, D.name, D.age FROM t1 LEFT JOIN user_table for system_time as of t1.proctime AS D ON t1.id = D.id";
        TestingRetractSink sink = new TestingRetractSink();
        package$.MODULE$.tableConversions(this.tEnv().sqlQuery(sql2)).toRetractStream(TypeExtractor.createTypeInfo(Row.class)).addSink((SinkFunction)sink).setParallelism(1);
        this.env().execute();
        Seq expected = (Seq)new .colon.colon((Object)"3,Fabian,33", (List)new .colon.colon((Object)"8,null,null", (List)new .colon.colon((Object)"9,null,null", (List)Nil$.MODULE$)));
        Assertions.assertThat((Object)sink.getRetractResults().sorted((Ordering)Ordering.String$.MODULE$)).isEqualTo(expected.sorted((Ordering)Ordering.String$.MODULE$));
    }

    @TestTemplate
    public void testAggAndAsyncLeftJoinWithTryResolveMode() {
        this.tEnv().getConfig().set(OptimizerConfigOptions.TABLE_OPTIMIZER_NONDETERMINISTIC_UPDATE_STRATEGY, (Object)OptimizerConfigOptions.NonDeterministicUpdateStrategy.TRY_RESOLVE);
        String sql1 = "SELECT max(id) as id, PROCTIME() as proctime FROM src AS T group by len";
        Table table1 = this.tEnv().sqlQuery(sql1);
        this.tEnv().createTemporaryView("t1", table1);
        String sql2 = "SELECT t1.id, D.name, D.age FROM t1 LEFT JOIN user_table for system_time as of t1.proctime AS D ON t1.id = D.id";
        TestingRetractSink sink = new TestingRetractSink();
        boolean cfr_ignored_0 = Assertions.assertThatThrownBy(() -> {
            package$.MODULE$.tableConversions(this.tEnv().sqlQuery(sql2)).toRetractStream(TypeExtractor.createTypeInfo(Row.class)).addSink((SinkFunction)sink).setParallelism(1);
            this.env().execute();
            Seq expected = (Seq)new .colon.colon((Object)"3,Fabian,33", (List)new .colon.colon((Object)"8,null,null", (List)new .colon.colon((Object)"9,null,null", (List)Nil$.MODULE$)));
            Assertions.assertThat((Object)sink.getRetractResults().sorted((Ordering)Ordering.String$.MODULE$)).isEqualTo(expected.sorted((Ordering)Ordering.String$.MODULE$));
        }).hasMessageContaining("Required sync lookup function by planner") instanceof TableException;
    }

    @TestTemplate
    public void testAsyncLeftJoinTemporalTable() {
        String sql = "SELECT T.id, T.len, D.name, D.age FROM src AS T LEFT JOIN user_table for system_time as of T.proctime AS D ON T.id = D.id";
        TestingAppendSink sink = new TestingAppendSink();
        package$.MODULE$.tableConversions(this.tEnv().sqlQuery(sql)).toDataStream().addSink((SinkFunction)sink);
        this.env().execute();
        Seq expected = (Seq)new .colon.colon((Object)"1,12,Julian,11", (List)new .colon.colon((Object)"2,15,Jark,22", (List)new .colon.colon((Object)"3,15,Fabian,33", (List)new .colon.colon((Object)"8,11,null,null", (List)new .colon.colon((Object)"9,12,null,null", (List)Nil$.MODULE$)))));
        Assertions.assertThat((Object)sink.getAppendResults().sorted((Ordering)Ordering.String$.MODULE$)).isEqualTo(expected.sorted((Ordering)Ordering.String$.MODULE$));
    }

    @TestTemplate
    public void testExceptionThrownFromAsyncJoinTemporalTable() {
        this.tEnv().createTemporarySystemFunction("errorFunc", (UserDefinedFunction)UserDefinedFunctionTestUtils$TestExceptionThrown$.MODULE$);
        String sql = "SELECT T.id, T.len, D.name, D.age FROM src AS T LEFT JOIN user_table for system_time as of T.proctime AS D ON T.id = D.id where errorFunc(D.name) > cast(1000 as decimal(10,4))";
        TestingAppendSink sink = new TestingAppendSink();
        package$.MODULE$.tableConversions(this.tEnv().sqlQuery(sql)).toDataStream().addSink((SinkFunction)sink);
        Assertions.assertThatThrownBy(() -> this.env().execute()).satisfies(new ThrowingConsumer[]{FlinkAssertions.anyCauseMatches(NumberFormatException.class, (String)"Cannot parse")});
    }

    @TestTemplate
    public void testLookupCacheSharingAcrossSubtasks() {
        if (!this.enableCache) {
            return;
        }
        LookupCacheManager.keepCacheOnRelease((boolean)true);
        try {
            String sourceDdl = new StringOps(Predef$.MODULE$.augmentString("\n           |CREATE TABLE T (\n           |  id BIGINT,\n           |  proc AS PROCTIME()\n           |) WITH (\n           |  'connector' = 'datagen',\n           |  'fields.id.kind' = 'sequence',\n           |  'fields.id.start' = '1',\n           |  'fields.id.end' = '6'\n           |)\n           |")).stripMargin();
            this.tEnv().executeSql(sourceDdl);
            String sql = new StringOps(Predef$.MODULE$.augmentString("\n          |SELECT T.id, D.name, D.age FROM T \n          |LEFT JOIN user_table FOR SYSTEM_TIME AS OF T.proc AS D \n          |ON T.id = D.id\n          |")).stripMargin();
            TestingAppendSink sink = new TestingAppendSink();
            package$.MODULE$.tableConversions(this.tEnv().sqlQuery(sql)).toDataStream().addSink((SinkFunction)sink);
            this.env().execute();
            Map managedCaches = LookupCacheManager.getInstance().getManagedCaches();
            Assertions.assertThat((int)managedCaches.size()).isEqualTo(1);
            LookupCache cache = ((LookupCacheManager.RefCountedCache)managedCaches.get(managedCaches.keySet().iterator().next())).getCache();
            Assertions.assertThat((long)cache.size()).isEqualTo(6L);
            Assertions.assertThatIterable((Iterable)cache.getIfPresent((RowData)GenericRowData.of((Object[])new Object[]{this.jl(1L)}))).containsExactlyInAnyOrder((Object[])new RowData[]{GenericRowData.of((Object[])new Object[]{this.ji(11), this.jl(1L), BinaryStringData.fromString((String)"Julian")})});
            Assertions.assertThatIterable((Iterable)cache.getIfPresent((RowData)GenericRowData.of((Object[])new Object[]{this.jl(2L)}))).containsExactlyInAnyOrder((Object[])new RowData[]{GenericRowData.of((Object[])new Object[]{this.ji(22), this.jl(2L), BinaryStringData.fromString((String)"Jark")})});
            Assertions.assertThatIterable((Iterable)cache.getIfPresent((RowData)GenericRowData.of((Object[])new Object[]{this.jl(3L)}))).containsExactlyInAnyOrder((Object[])new RowData[]{GenericRowData.of((Object[])new Object[]{this.ji(33), this.jl(3L), BinaryStringData.fromString((String)"Fabian")})});
            Assertions.assertThatIterable((Iterable)cache.getIfPresent((RowData)GenericRowData.of((Object[])new Object[]{this.jl(4L)}))).isEmpty();
        }
        finally {
            LookupCacheManager.getInstance().checkAllReleased();
            LookupCacheManager.getInstance().clear();
            LookupCacheManager.keepCacheOnRelease((boolean)false);
        }
    }

    public Integer ji(int i) {
        return new Integer(i);
    }

    public Long jl(long l) {
        return new Long(l);
    }

    @TestTemplate
    public void testAsyncJoinTemporalTableWithRetry() {
        String maxRetryTwiceHint = this.getAsyncRetryLookupHint("D", 2);
        TestingAppendSink sink = new TestingAppendSink();
        package$.MODULE$.tableConversions(this.tEnv().sqlQuery(new StringOps(Predef$.MODULE$.augmentString(new StringBuilder(203).append("\n                   |SELECT ").append(maxRetryTwiceHint).append(" T.id, T.len, T.content, D.name FROM src AS T\n                   |JOIN user_table for system_time as of T.proctime AS D\n                   |ON T.id = D.id\n                   |").toString())).stripMargin())).toDataStream().addSink((SinkFunction)sink);
        this.env().execute();
        Seq expected = (Seq)new .colon.colon((Object)"1,12,Julian,Julian", (List)new .colon.colon((Object)"2,15,Hello,Jark", (List)new .colon.colon((Object)"3,15,Fabian,Fabian", (List)Nil$.MODULE$)));
        Assertions.assertThat((Object)sink.getAppendResults().sorted((Ordering)Ordering.String$.MODULE$)).isEqualTo(expected.sorted((Ordering)Ordering.String$.MODULE$));
    }

    @TestTemplate
    public void testAsyncJoinTemporalTableWithLookupThresholdWithInsufficientRetry() {
        String maxRetryOnceHint = this.getAsyncRetryLookupHint("D", 1);
        TestingAppendSink sink = new TestingAppendSink();
        package$.MODULE$.tableConversions(this.tEnv().sqlQuery(new StringOps(Predef$.MODULE$.augmentString(new StringBuilder(226).append("\n                   |SELECT ").append(maxRetryOnceHint).append(" T.id, T.len, T.content, D.name FROM src AS T\n                   |JOIN user_table_with_lookup_threshold3 for system_time as of T.proctime AS D\n                   |ON T.id = D.id\n                   |").toString())).stripMargin())).toDataStream().addSink((SinkFunction)sink);
        this.env().execute();
        Assertions.assertThat(sink.getAppendResults()).isEqualTo((Object)Nil$.MODULE$);
    }

    @TestTemplate
    public void testAsyncJoinTemporalTableWithLookupThresholdWithSufficientRetry() {
        String maxRetryTwiceHint = this.getAsyncRetryLookupHint("D", 2);
        TestingAppendSink sink = new TestingAppendSink();
        package$.MODULE$.tableConversions(this.tEnv().sqlQuery(new StringOps(Predef$.MODULE$.augmentString(new StringBuilder(226).append("\n                   |SELECT ").append(maxRetryTwiceHint).append(" T.id, T.len, T.content, D.name FROM src AS T\n                   |JOIN user_table_with_lookup_threshold2 for system_time as of T.proctime AS D\n                   |ON T.id = D.id\n                   |").toString())).stripMargin())).toDataStream().addSink((SinkFunction)sink);
        this.env().execute();
        Seq expected = (Seq)new .colon.colon((Object)"1,12,Julian,Julian", (List)new .colon.colon((Object)"2,15,Hello,Jark", (List)new .colon.colon((Object)"3,15,Fabian,Fabian", (List)Nil$.MODULE$)));
        Assertions.assertThat((Object)sink.getAppendResults().sorted((Ordering)Ordering.String$.MODULE$)).isEqualTo(expected.sorted((Ordering)Ordering.String$.MODULE$));
    }

    public AsyncLookupJoinITCase(StreamingWithStateTestBase.StateBackendMode backend, boolean objectReuse, ExecutionConfigOptions.AsyncOutputMode asyncOutputMode, boolean enableCache) {
        this.objectReuse = objectReuse;
        this.asyncOutputMode = asyncOutputMode;
        this.enableCache = enableCache;
        super(backend);
        this.data = new .colon.colon((Object)this.rowOf((Seq<Object>)Predef$.MODULE$.genericWrapArray((Object)new Object[]{BoxesRunTime.boxToLong((long)1L), BoxesRunTime.boxToInteger((int)12), "Julian"})), (List)new .colon.colon((Object)this.rowOf((Seq<Object>)Predef$.MODULE$.genericWrapArray((Object)new Object[]{BoxesRunTime.boxToLong((long)2L), BoxesRunTime.boxToInteger((int)15), "Hello"})), (List)new .colon.colon((Object)this.rowOf((Seq<Object>)Predef$.MODULE$.genericWrapArray((Object)new Object[]{BoxesRunTime.boxToLong((long)3L), BoxesRunTime.boxToInteger((int)15), "Fabian"})), (List)new .colon.colon((Object)this.rowOf((Seq<Object>)Predef$.MODULE$.genericWrapArray((Object)new Object[]{BoxesRunTime.boxToLong((long)8L), BoxesRunTime.boxToInteger((int)11), "Hello world"})), (List)new .colon.colon((Object)this.rowOf((Seq<Object>)Predef$.MODULE$.genericWrapArray((Object)new Object[]{BoxesRunTime.boxToLong((long)9L), BoxesRunTime.boxToInteger((int)12), "Hello world!"})), (List)Nil$.MODULE$)))));
        this.userData = new .colon.colon((Object)this.rowOf((Seq<Object>)Predef$.MODULE$.genericWrapArray((Object)new Object[]{BoxesRunTime.boxToInteger((int)11), BoxesRunTime.boxToLong((long)1L), "Julian"})), (List)new .colon.colon((Object)this.rowOf((Seq<Object>)Predef$.MODULE$.genericWrapArray((Object)new Object[]{BoxesRunTime.boxToInteger((int)22), BoxesRunTime.boxToLong((long)2L), "Jark"})), (List)new .colon.colon((Object)this.rowOf((Seq<Object>)Predef$.MODULE$.genericWrapArray((Object)new Object[]{BoxesRunTime.boxToInteger((int)33), BoxesRunTime.boxToLong((long)3L), "Fabian"})), (List)Nil$.MODULE$)));
    }
}

