From f8a6ca6e4b515ea51a2e5350ced06eee842ebfe5 Mon Sep 17 00:00:00 2001 From: Andrey Yegorov Date: Thu, 1 Jun 2023 16:59:46 -0700 Subject: [PATCH] making it build in intellij --- .idea/vcs.xml | 23 +- flink-connectors/pom.xml | 64 ++-- .../core/memory/DataInputDeserializer.java | 2 +- .../core/memory/DataOutputSerializer.java | 2 +- .../flink/core/memory/MemorySegment.java | 2 +- .../apache/flink/core/memory/MemoryUtils.java | 24 +- .../java/org/apache/flink/types/Record.java | 2 +- .../runtime/TestDataOutputSerializer.java | 2 +- .../flink-end-to-end-tests-common/pom.xml | 116 +++---- .../NettyShuffleMemoryControlTestProgram.java | 2 +- flink-end-to-end-tests/pom.xml | 118 +++---- flink-filesystems/pom.xml | 16 +- .../io/network/partition/PageSizeUtil.java | 24 +- .../flink/runtime/util/SignalHandler.java | 59 +--- .../runtime/hadoop/HadoopUserUtilsTest.java | 78 ----- .../TaskManagerRunnerConfigurationTest.java | 300 ----------------- flink-table/flink-sql-client/pom.xml | 42 +-- .../client/cli/utils/SqlParserHelper.java | 9 +- .../client/gateway/local/DependencyTest.java | 315 ----------------- .../vector/heap/AbstractHeapVector.java | 2 +- .../table/data/binary/BinaryRowDataUtil.java | 2 +- mvnw | 316 ------------------ pom.xml | 43 ++- 23 files changed, 224 insertions(+), 1339 deletions(-) delete mode 100644 flink-runtime/src/test/java/org/apache/flink/runtime/hadoop/HadoopUserUtilsTest.java delete mode 100644 flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunnerConfigurationTest.java delete mode 100644 flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/DependencyTest.java delete mode 100755 mvnw diff --git a/.idea/vcs.xml b/.idea/vcs.xml index 165d593076ee6..35eb1ddfbbc02 100644 --- a/.idea/vcs.xml +++ b/.idea/vcs.xml @@ -1,25 +1,6 @@ - - - - - + - + \ No newline at end of file diff --git a/flink-connectors/pom.xml b/flink-connectors/pom.xml index 0a7a865559704..352c2bb1f112f 100644 --- a/flink-connectors/pom.xml +++ b/flink-connectors/pom.xml @@ -35,27 +35,27 @@ under the License. pom - flink-hadoop-compatibility - flink-hcatalog - flink-connector-elasticsearch-base - flink-connector-elasticsearch6 - flink-connector-elasticsearch7 - flink-connector-hbase-base - flink-connector-hbase-1.4 - flink-connector-hbase-2.2 - flink-connector-hive - flink-connector-jdbc - flink-connector-rabbitmq - flink-connector-cassandra - flink-connector-kafka - flink-connector-gcp-pubsub - flink-connector-aws-base - flink-connector-kinesis - flink-connector-aws-kinesis-streams - flink-connector-aws-kinesis-firehose - flink-connector-base - flink-file-sink-common - flink-connector-files + + + + + + + + + + + + + + + + + + + + + flink-connector-pulsar @@ -93,18 +93,18 @@ under the License. 
- flink-sql-connector-elasticsearch6 - flink-sql-connector-elasticsearch7 - flink-sql-connector-hbase-1.4 - flink-sql-connector-hbase-2.2 - flink-sql-connector-hive-2.3.9 - flink-sql-connector-hive-3.1.2 - flink-sql-connector-kafka - flink-sql-connector-aws-kinesis-streams - flink-sql-connector-aws-kinesis-firehose - flink-sql-connector-kinesis + + + + + + + + + + flink-sql-connector-pulsar - flink-sql-connector-rabbitmq + diff --git a/flink-core/src/main/java/org/apache/flink/core/memory/DataInputDeserializer.java b/flink-core/src/main/java/org/apache/flink/core/memory/DataInputDeserializer.java index 9ece9f0013554..617c835f9915a 100644 --- a/flink-core/src/main/java/org/apache/flink/core/memory/DataInputDeserializer.java +++ b/flink-core/src/main/java/org/apache/flink/core/memory/DataInputDeserializer.java @@ -402,7 +402,7 @@ public int getPosition() { // ------------------------------------------------------------------------ @SuppressWarnings("restriction") - private static final sun.misc.Unsafe UNSAFE = MemoryUtils.UNSAFE; + private static final jdk.internal.misc.Unsafe UNSAFE = MemoryUtils.UNSAFE; @SuppressWarnings("restriction") private static final long BASE_OFFSET = UNSAFE.arrayBaseOffset(byte[].class); diff --git a/flink-core/src/main/java/org/apache/flink/core/memory/DataOutputSerializer.java b/flink-core/src/main/java/org/apache/flink/core/memory/DataOutputSerializer.java index 4d12ad5b81bb2..a52381e994b86 100644 --- a/flink-core/src/main/java/org/apache/flink/core/memory/DataOutputSerializer.java +++ b/flink-core/src/main/java/org/apache/flink/core/memory/DataOutputSerializer.java @@ -363,7 +363,7 @@ public void setPositionUnsafe(int position) { // ------------------------------------------------------------------------ @SuppressWarnings("restriction") - private static final sun.misc.Unsafe UNSAFE = MemoryUtils.UNSAFE; + private static final jdk.internal.misc.Unsafe UNSAFE = MemoryUtils.UNSAFE; @SuppressWarnings("restriction") private static final long BASE_OFFSET = UNSAFE.arrayBaseOffset(byte[].class); diff --git a/flink-core/src/main/java/org/apache/flink/core/memory/MemorySegment.java b/flink-core/src/main/java/org/apache/flink/core/memory/MemorySegment.java index ce39a12265436..334b389437786 100644 --- a/flink-core/src/main/java/org/apache/flink/core/memory/MemorySegment.java +++ b/flink-core/src/main/java/org/apache/flink/core/memory/MemorySegment.java @@ -78,7 +78,7 @@ public final class MemorySegment { /** The unsafe handle for transparent memory copied (heap / off-heap). */ @SuppressWarnings("restriction") - private static final sun.misc.Unsafe UNSAFE = MemoryUtils.UNSAFE; + private static final jdk.internal.misc.Unsafe UNSAFE = MemoryUtils.UNSAFE; /** The beginning of the byte array contents, relative to the byte array object. */ @SuppressWarnings("restriction") diff --git a/flink-core/src/main/java/org/apache/flink/core/memory/MemoryUtils.java b/flink-core/src/main/java/org/apache/flink/core/memory/MemoryUtils.java index bb8904a39f294..cecb3b3658e9c 100644 --- a/flink-core/src/main/java/org/apache/flink/core/memory/MemoryUtils.java +++ b/flink-core/src/main/java/org/apache/flink/core/memory/MemoryUtils.java @@ -21,7 +21,6 @@ import org.apache.flink.annotation.Internal; import org.apache.flink.util.Preconditions; -import java.lang.reflect.Field; import java.nio.Buffer; import java.nio.ByteBuffer; import java.nio.ByteOrder; @@ -32,7 +31,7 @@ public class MemoryUtils { /** The "unsafe", which can be used to perform native memory accesses. 
*/ @SuppressWarnings({"restriction", "UseOfSunClasses"}) - public static final sun.misc.Unsafe UNSAFE = getUnsafe(); + public static final jdk.internal.misc.Unsafe UNSAFE = getUnsafe(); /** The native byte order of the platform on which the system currently runs. */ public static final ByteOrder NATIVE_BYTE_ORDER = ByteOrder.nativeOrder(); @@ -45,25 +44,8 @@ public class MemoryUtils { getClassByName("java.nio.DirectByteBuffer"); @SuppressWarnings("restriction") - private static sun.misc.Unsafe getUnsafe() { - try { - Field unsafeField = sun.misc.Unsafe.class.getDeclaredField("theUnsafe"); - unsafeField.setAccessible(true); - return (sun.misc.Unsafe) unsafeField.get(null); - } catch (SecurityException e) { - throw new Error( - "Could not access the sun.misc.Unsafe handle, permission denied by security manager.", - e); - } catch (NoSuchFieldException e) { - throw new Error("The static handle field in sun.misc.Unsafe was not found.", e); - } catch (IllegalArgumentException e) { - throw new Error("Bug: Illegal argument reflection access for static field.", e); - } catch (IllegalAccessException e) { - throw new Error("Access to sun.misc.Unsafe is forbidden by the runtime.", e); - } catch (Throwable t) { - throw new Error( - "Unclassified error while trying to access the sun.misc.Unsafe handle.", t); - } + private static jdk.internal.misc.Unsafe getUnsafe() { + return jdk.internal.misc.Unsafe.getUnsafe(); } private static long getClassFieldOffset( diff --git a/flink-core/src/main/java/org/apache/flink/types/Record.java b/flink-core/src/main/java/org/apache/flink/types/Record.java index 59e1568fc8ff4..f83c9d25c6815 100644 --- a/flink-core/src/main/java/org/apache/flink/types/Record.java +++ b/flink-core/src/main/java/org/apache/flink/types/Record.java @@ -1815,7 +1815,7 @@ private void resize(int minCapacityAdd) throws IOException { } @SuppressWarnings("restriction") - private static final sun.misc.Unsafe UNSAFE = MemoryUtils.UNSAFE; + private static final jdk.internal.misc.Unsafe UNSAFE = MemoryUtils.UNSAFE; @SuppressWarnings("restriction") private static final long BASE_OFFSET = UNSAFE.arrayBaseOffset(byte[].class); diff --git a/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/TestDataOutputSerializer.java b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/TestDataOutputSerializer.java index 1bba8e641fd1b..068392c46bd82 100644 --- a/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/TestDataOutputSerializer.java +++ b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/TestDataOutputSerializer.java @@ -276,7 +276,7 @@ private void resize(int minCapacityAdd) throws IOException { } @SuppressWarnings("restriction") - private static final sun.misc.Unsafe UNSAFE = MemoryUtils.UNSAFE; + private static final jdk.internal.misc.Unsafe UNSAFE = MemoryUtils.UNSAFE; @SuppressWarnings("restriction") private static final long BASE_OFFSET = UNSAFE.arrayBaseOffset(byte[].class); diff --git a/flink-end-to-end-tests/flink-end-to-end-tests-common/pom.xml b/flink-end-to-end-tests/flink-end-to-end-tests-common/pom.xml index edb4b50c94dcd..3e238e8db093c 100644 --- a/flink-end-to-end-tests/flink-end-to-end-tests-common/pom.xml +++ b/flink-end-to-end-tests/flink-end-to-end-tests-common/pom.xml @@ -69,64 +69,64 @@ under the License. 
org.testcontainers testcontainers - - - org.apache.flink - flink-dist_${scala.binary.version} - ${project.version} - provided - - - - commons-beanutils - commons-beanutils - - - org.codehaus.jackson - jackson-core-asl - - - org.codehaus.jackson - jackson-mapper-asl - - - org.codehaus.jackson - jackson-xc - - - org.codehaus.jackson - jackson-jaxrs - - - com.google.guava - guava - - - com.squareup.okio - okio - - - javax.servlet - servlet-api - - - com.nimbusds - nimbus-jose-jwt - - - com.nimbusds - nimbus-jose-jwt - - - com.google.inject - guice - - - com.google.inject.extensions - guice-servlet - - - + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + diff --git a/flink-end-to-end-tests/flink-netty-shuffle-memory-control-test/src/main/java/org/apache/flink/streaming/tests/NettyShuffleMemoryControlTestProgram.java b/flink-end-to-end-tests/flink-netty-shuffle-memory-control-test/src/main/java/org/apache/flink/streaming/tests/NettyShuffleMemoryControlTestProgram.java index 3e8a10d5b0b8f..59a5170ab6ed5 100644 --- a/flink-end-to-end-tests/flink-netty-shuffle-memory-control-test/src/main/java/org/apache/flink/streaming/tests/NettyShuffleMemoryControlTestProgram.java +++ b/flink-end-to-end-tests/flink-netty-shuffle-memory-control-test/src/main/java/org/apache/flink/streaming/tests/NettyShuffleMemoryControlTestProgram.java @@ -28,7 +28,7 @@ import org.apache.flink.shaded.netty4.io.netty.util.internal.OutOfDirectMemoryError; -import sun.misc.Unsafe; +import jdk.internal.misc.Unsafe; import static org.apache.flink.util.Preconditions.checkArgument; diff --git a/flink-end-to-end-tests/pom.xml b/flink-end-to-end-tests/pom.xml index 4a07f7a2ef55f..c3783b85facc1 100644 --- a/flink-end-to-end-tests/pom.xml +++ b/flink-end-to-end-tests/pom.xml @@ -39,52 +39,52 @@ under the License. - flink-cli-test - flink-parent-child-classloading-test-program - flink-parent-child-classloading-test-lib-package - flink-dataset-allround-test - flink-dataset-fine-grained-recovery-test - flink-datastream-allround-test - flink-batch-sql-test - flink-stream-sql-test - flink-distributed-cache-via-blob-test - flink-high-parallelism-iterations-test - flink-stream-stateful-job-upgrade-test - flink-queryable-state-test - flink-local-recovery-and-allocation-test - flink-quickstart-test - flink-confluent-schema-registry - flink-stream-state-ttl-test - flink-sql-client-test - flink-sql-gateway-test - flink-file-sink-test - flink-state-evolution-test - flink-rocksdb-state-memory-control-test + + + + + + + + + + + + + + + + + + + + + flink-end-to-end-tests-common - flink-metrics-availability-test - flink-metrics-reporter-prometheus-test - flink-heavy-deployment-stress-test - flink-connector-gcp-pubsub-emulator-tests - flink-streaming-kafka-test-base - flink-streaming-kafka-test - flink-plugins-test - flink-tpch-test - flink-streaming-kinesis-test - flink-end-to-end-tests-common-kafka - flink-tpcds-test - flink-netty-shuffle-memory-control-test - flink-python-test - flink-end-to-end-tests-hbase + + + + + + + + + + + + + + flink-end-to-end-tests-pulsar - flink-glue-schema-registry-avro-test - flink-glue-schema-registry-json-test - flink-end-to-end-tests-scala - flink-end-to-end-tests-aws-kinesis-streams - flink-end-to-end-tests-aws-kinesis-firehose - flink-end-to-end-tests-elasticsearch6 - flink-end-to-end-tests-elasticsearch7 - flink-end-to-end-tests-common-elasticsearch - flink-end-to-end-tests-sql + + + + + + + + + @@ -100,21 +100,21 @@ under the License. 
- - - - org.apache.flink - flink-yarn-tests - ${project.version} - provided - - - * - * - - - - + + + + + + + + + + + + + + + diff --git a/flink-filesystems/pom.xml b/flink-filesystems/pom.xml index 22d721bea0762..a18cd448fbc97 100644 --- a/flink-filesystems/pom.xml +++ b/flink-filesystems/pom.xml @@ -38,14 +38,14 @@ under the License. - flink-hadoop-fs - flink-fs-hadoop-shaded - flink-s3-fs-base - flink-s3-fs-hadoop - flink-s3-fs-presto - flink-oss-fs-hadoop - flink-azure-fs-hadoop - flink-gs-fs-hadoop + + + + + + + + diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PageSizeUtil.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PageSizeUtil.java index 85bc2f52fc667..a81b06ed17f3f 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PageSizeUtil.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PageSizeUtil.java @@ -20,16 +20,6 @@ import org.apache.flink.util.ExceptionUtils; -import org.apache.flink.shaded.netty4.io.netty.util.internal.PlatformDependent; -import org.apache.flink.shaded.netty4.io.netty.util.internal.shaded.org.jctools.util.UnsafeAccess; - -import sun.misc.Unsafe; - -import javax.annotation.Nullable; - -import java.security.AccessController; -import java.security.PrivilegedAction; - /** Utility for accessing the system page size. */ public final class PageSizeUtil { @@ -93,19 +83,7 @@ private PageSizeUtil() {} private static final class PageSizeUtilInternal { static int getSystemPageSize() { - Unsafe unsafe = unsafe(); - return unsafe == null ? PAGE_SIZE_UNKNOWN : unsafe.pageSize(); - } - - @Nullable - private static Unsafe unsafe() { - if (PlatformDependent.hasUnsafe()) { - return (Unsafe) - AccessController.doPrivileged( - (PrivilegedAction) () -> UnsafeAccess.UNSAFE); - } else { - return null; - } + return PAGE_SIZE_UNKNOWN; } } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/util/SignalHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/util/SignalHandler.java index 3137fac11cf0f..eb3631558c345 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/util/SignalHandler.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/util/SignalHandler.java @@ -18,10 +18,7 @@ package org.apache.flink.runtime.util; -import org.apache.flink.util.OperatingSystem; - import org.slf4j.Logger; -import sun.misc.Signal; /** * This signal handler / signal logger is based on Apache Hadoop's @@ -29,67 +26,13 @@ */ public class SignalHandler { - private static boolean registered = false; - - /** Our signal handler. */ - private static class Handler implements sun.misc.SignalHandler { - - private final Logger LOG; - private final sun.misc.SignalHandler prevHandler; - - Handler(String name, Logger LOG) { - this.LOG = LOG; - prevHandler = Signal.handle(new Signal(name), this); - } - - /** - * Handle an incoming signal. - * - * @param signal The incoming signal - */ - @Override - public void handle(Signal signal) { - LOG.info( - "RECEIVED SIGNAL {}: SIG{}. Shutting down as requested.", - signal.getNumber(), - signal.getName()); - prevHandler.handle(signal); - } - } - /** * Register some signal handlers. * * @param LOG The slf4j logger */ public static void register(final Logger LOG) { - synchronized (SignalHandler.class) { - if (registered) { - return; - } - registered = true; - - final String[] SIGNALS = - OperatingSystem.isWindows() - ? 
new String[] {"TERM", "INT"} - : new String[] {"TERM", "HUP", "INT"}; - - StringBuilder bld = new StringBuilder(); - bld.append("Registered UNIX signal handlers for ["); - String separator = ""; - for (String signalName : SIGNALS) { - try { - new Handler(signalName, LOG); - bld.append(separator); - bld.append(signalName); - separator = ", "; - } catch (Exception e) { - LOG.info("Error while registering signal handler", e); - } - } - bld.append("]"); - LOG.info(bld.toString()); - } + // noop } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/hadoop/HadoopUserUtilsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/hadoop/HadoopUserUtilsTest.java deleted file mode 100644 index 0c09db21e7a7f..0000000000000 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/hadoop/HadoopUserUtilsTest.java +++ /dev/null @@ -1,78 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.runtime.hadoop; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.security.UserGroupInformation; -import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod; -import org.junit.jupiter.api.AfterAll; -import org.junit.jupiter.api.BeforeAll; -import org.junit.jupiter.api.Test; -import sun.security.krb5.KrbException; - -import static org.junit.jupiter.api.Assertions.assertFalse; -import static org.junit.jupiter.api.Assertions.assertTrue; - -/** Unit tests for Hadoop user utils. 
*/ -class HadoopUserUtilsTest { - - @BeforeAll - public static void setPropertiesToEnableKerberosConfigInit() throws KrbException { - System.setProperty("java.security.krb5.realm", ""); - System.setProperty("java.security.krb5.kdc", ""); - System.setProperty("java.security.krb5.conf", "/dev/null"); - sun.security.krb5.Config.refresh(); - } - - @AfterAll - public static void cleanupHadoopConfigs() { - UserGroupInformation.setConfiguration(new Configuration()); - } - - @Test - public void testIsProxyUserShouldReturnFalseWhenNormalUser() { - UserGroupInformation.setConfiguration( - getHadoopConfigWithAuthMethod(AuthenticationMethod.KERBEROS)); - UserGroupInformation userGroupInformation = createTestUser(AuthenticationMethod.KERBEROS); - - assertFalse(HadoopUserUtils.isProxyUser(userGroupInformation)); - } - - @Test - public void testIsProxyUserShouldReturnTrueWhenProxyUser() { - UserGroupInformation.setConfiguration( - getHadoopConfigWithAuthMethod(AuthenticationMethod.KERBEROS)); - UserGroupInformation userGroupInformation = createTestUser(AuthenticationMethod.PROXY); - - assertTrue(HadoopUserUtils.isProxyUser(userGroupInformation)); - } - - private static Configuration getHadoopConfigWithAuthMethod( - AuthenticationMethod authenticationMethod) { - Configuration conf = new Configuration(true); - conf.set("hadoop.security.authentication", authenticationMethod.name()); - return conf; - } - - private static UserGroupInformation createTestUser(AuthenticationMethod authenticationMethod) { - UserGroupInformation user = UserGroupInformation.createRemoteUser("test-user"); - user.setAuthenticationMethod(authenticationMethod); - return user; - } -} diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunnerConfigurationTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunnerConfigurationTest.java deleted file mode 100644 index f8b3382947de2..0000000000000 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunnerConfigurationTest.java +++ /dev/null @@ -1,300 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.flink.runtime.taskexecutor; - -import org.apache.flink.configuration.AkkaOptions; -import org.apache.flink.configuration.Configuration; -import org.apache.flink.configuration.GlobalConfiguration; -import org.apache.flink.configuration.JobManagerOptions; -import org.apache.flink.configuration.MemorySize; -import org.apache.flink.configuration.TaskManagerOptions; -import org.apache.flink.configuration.TaskManagerOptionsInternal; -import org.apache.flink.configuration.UnmodifiableConfiguration; -import org.apache.flink.core.fs.FileSystem; -import org.apache.flink.runtime.clusterframework.types.ResourceID; -import org.apache.flink.runtime.entrypoint.FlinkParseException; -import org.apache.flink.runtime.entrypoint.WorkingDirectory; -import org.apache.flink.runtime.highavailability.HighAvailabilityServices; -import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils; -import org.apache.flink.runtime.rest.util.NoOpFatalErrorHandler; -import org.apache.flink.runtime.rpc.AddressResolution; -import org.apache.flink.runtime.rpc.RpcService; -import org.apache.flink.runtime.rpc.RpcSystem; -import org.apache.flink.util.IOUtils; -import org.apache.flink.util.concurrent.Executors; - -import org.junit.jupiter.api.Test; -import org.junit.jupiter.api.io.TempDir; -import org.opentest4j.TestAbortedException; -import sun.net.util.IPAddressUtil; - -import javax.annotation.Nullable; -import javax.annotation.concurrent.NotThreadSafe; - -import java.io.File; -import java.io.IOException; -import java.io.PrintWriter; -import java.net.InetAddress; -import java.net.ServerSocket; -import java.net.URI; -import java.nio.file.Files; -import java.nio.file.Path; -import java.time.Duration; -import java.util.UUID; -import java.util.concurrent.TimeUnit; - -import static org.assertj.core.api.Assertions.assertThat; -import static org.assertj.core.api.Assertions.assertThatThrownBy; - -/** - * Validates that the TaskManagerRunner startup properly obeys the configuration values. - * - *
* <p>
NOTE: at least {@link #testDefaultFsParameterLoading()} should not be run in parallel to other - * tests in the same JVM as it modifies a static (private) member of the {@link FileSystem} class - * and verifies its content. - */ -@NotThreadSafe -class TaskManagerRunnerConfigurationTest { - - private static final RpcSystem RPC_SYSTEM = RpcSystem.load(); - - private static final int TEST_TIMEOUT_SECONDS = 10; - - @TempDir private Path temporaryFolder; - - @Test - void testTaskManagerRpcServiceShouldBindToConfiguredTaskManagerHostname() throws Exception { - final String taskmanagerHost = "testhostname"; - final Configuration config = - createFlinkConfigWithPredefinedTaskManagerHostname(taskmanagerHost); - final HighAvailabilityServices highAvailabilityServices = - createHighAvailabilityServices(config); - - RpcService taskManagerRpcService = null; - try { - taskManagerRpcService = - TaskManagerRunner.createRpcService( - config, highAvailabilityServices, RPC_SYSTEM); - - assertThat(taskManagerRpcService.getPort()).isGreaterThanOrEqualTo(0); - assertThat(taskManagerRpcService.getAddress()).isEqualTo(taskmanagerHost); - } finally { - maybeCloseRpcService(taskManagerRpcService); - highAvailabilityServices.closeAndCleanupAllData(); - } - } - - @Test - void testTaskManagerRpcServiceShouldBindToHostnameAddress() throws Exception { - final Configuration config = createFlinkConfigWithHostBindPolicy(HostBindPolicy.NAME); - final HighAvailabilityServices highAvailabilityServices = - createHighAvailabilityServices(config); - - RpcService taskManagerRpcService = null; - try { - taskManagerRpcService = - TaskManagerRunner.createRpcService( - config, highAvailabilityServices, RPC_SYSTEM); - assertThat(taskManagerRpcService.getAddress()).isNotNull().isNotEmpty(); - } finally { - maybeCloseRpcService(taskManagerRpcService); - highAvailabilityServices.closeAndCleanupAllData(); - } - } - - @Test - void testTaskManagerRpcServiceShouldBindToIpAddressDeterminedByConnectingToResourceManager() - throws Exception { - final ServerSocket testJobManagerSocket = openServerSocket(); - final Configuration config = - createFlinkConfigWithJobManagerPort(testJobManagerSocket.getLocalPort()); - final HighAvailabilityServices highAvailabilityServices = - createHighAvailabilityServices(config); - - RpcService taskManagerRpcService = null; - try { - taskManagerRpcService = - TaskManagerRunner.createRpcService( - config, highAvailabilityServices, RPC_SYSTEM); - assertThat(taskManagerRpcService.getAddress()) - .matches( - value -> - (IPAddressUtil.isIPv4LiteralAddress(value) - || IPAddressUtil.isIPv6LiteralAddress(value))); - } finally { - maybeCloseRpcService(taskManagerRpcService); - highAvailabilityServices.closeAndCleanupAllData(); - IOUtils.closeQuietly(testJobManagerSocket); - } - } - - @Test - void testCreatingTaskManagerRpcServiceShouldFailIfRpcPortRangeIsInvalid() throws Exception { - final Configuration config = - new Configuration( - createFlinkConfigWithPredefinedTaskManagerHostname("example.org")); - config.setString(TaskManagerOptions.RPC_PORT, "-1"); - - final HighAvailabilityServices highAvailabilityServices = - createHighAvailabilityServices(config); - - try { - assertThatThrownBy( - () -> - TaskManagerRunner.createRpcService( - config, highAvailabilityServices, RPC_SYSTEM)) - .isInstanceOf(IllegalArgumentException.class) - .hasMessage("Invalid port range definition: -1"); - } finally { - highAvailabilityServices.closeAndCleanupAllData(); - } - } - - @Test - void testDefaultFsParameterLoading() throws Exception { - 
try { - final File tmpDir = - Files.createTempDirectory(temporaryFolder, UUID.randomUUID().toString()) - .toFile(); - final File confFile = new File(tmpDir, GlobalConfiguration.FLINK_CONF_FILENAME); - - final URI defaultFS = new URI("otherFS", null, "localhost", 1234, null, null, null); - - final PrintWriter pw1 = new PrintWriter(confFile); - pw1.println("fs.default-scheme: " + defaultFS); - pw1.close(); - - String[] args = new String[] {"--configDir", tmpDir.toString()}; - Configuration configuration = TaskManagerRunner.loadConfiguration(args); - FileSystem.initialize(configuration); - - assertThat(defaultFS).isEqualTo(FileSystem.getDefaultFsUri()); - } finally { - // reset FS settings - FileSystem.initialize(new Configuration()); - } - } - - @Test - void testLoadDynamicalProperties() throws IOException, FlinkParseException { - final File tmpDir = - Files.createTempDirectory(temporaryFolder, UUID.randomUUID().toString()).toFile(); - final File confFile = new File(tmpDir, GlobalConfiguration.FLINK_CONF_FILENAME); - final PrintWriter pw1 = new PrintWriter(confFile); - final long managedMemory = 1024 * 1024 * 256; - pw1.println(JobManagerOptions.ADDRESS.key() + ": localhost"); - pw1.println(TaskManagerOptions.MANAGED_MEMORY_SIZE.key() + ": " + managedMemory + "b"); - pw1.close(); - - final String jmHost = "host1"; - final int jmPort = 12345; - String[] args = - new String[] { - "--configDir", - tmpDir.toString(), - "-D" + JobManagerOptions.ADDRESS.key() + "=" + jmHost, - "-D" + JobManagerOptions.PORT.key() + "=" + jmPort - }; - Configuration configuration = TaskManagerRunner.loadConfiguration(args); - assertThat(MemorySize.parse(managedMemory + "b")) - .isEqualTo(configuration.get(TaskManagerOptions.MANAGED_MEMORY_SIZE)); - assertThat(jmHost).isEqualTo(configuration.get(JobManagerOptions.ADDRESS)); - assertThat(jmPort).isEqualTo(configuration.getInteger(JobManagerOptions.PORT)); - } - - @Test - void testNodeIdShouldBeConfiguredValueIfExplicitlySet() throws Exception { - String nodeId = "node1"; - Configuration configuration = new Configuration(); - configuration.set(TaskManagerOptionsInternal.TASK_MANAGER_NODE_ID, nodeId); - TaskManagerServicesConfiguration servicesConfiguration = - createTaskManagerServiceConfiguration(configuration); - assertThat(servicesConfiguration.getNodeId()).isEqualTo(nodeId); - } - - @Test - void testNodeIdShouldBeExternalAddressIfNotExplicitlySet() throws Exception { - TaskManagerServicesConfiguration servicesConfiguration = - createTaskManagerServiceConfiguration(new Configuration()); - assertThat(servicesConfiguration.getNodeId()) - .isEqualTo(InetAddress.getLocalHost().getHostName()); - } - - private TaskManagerServicesConfiguration createTaskManagerServiceConfiguration( - Configuration config) throws Exception { - return TaskManagerServicesConfiguration.fromConfiguration( - config, - ResourceID.generate(), - InetAddress.getLocalHost().getHostName(), - true, - TaskExecutorResourceUtils.resourceSpecFromConfigForLocalExecution(config), - WorkingDirectory.create( - Files.createTempDirectory(temporaryFolder, UUID.randomUUID().toString()) - .toFile())); - } - - private static Configuration createFlinkConfigWithPredefinedTaskManagerHostname( - final String taskmanagerHost) { - final Configuration config = new Configuration(); - config.setString(TaskManagerOptions.HOST, taskmanagerHost); - config.setString(JobManagerOptions.ADDRESS, "localhost"); - return new UnmodifiableConfiguration(config); - } - - private static Configuration createFlinkConfigWithHostBindPolicy( - 
final HostBindPolicy bindPolicy) { - final Configuration config = new Configuration(); - config.setString(TaskManagerOptions.HOST_BIND_POLICY, bindPolicy.toString()); - config.setString(JobManagerOptions.ADDRESS, "localhost"); - config.set(AkkaOptions.LOOKUP_TIMEOUT_DURATION, Duration.ofMillis(10)); - return new UnmodifiableConfiguration(config); - } - - private static Configuration createFlinkConfigWithJobManagerPort(final int port) { - Configuration config = new Configuration(); - config.setString(JobManagerOptions.ADDRESS, "localhost"); - config.setInteger(JobManagerOptions.PORT, port); - return new UnmodifiableConfiguration(config); - } - - private HighAvailabilityServices createHighAvailabilityServices(final Configuration config) - throws Exception { - return HighAvailabilityServicesUtils.createHighAvailabilityServices( - config, - Executors.directExecutor(), - AddressResolution.NO_ADDRESS_RESOLUTION, - RpcSystem.load(), - NoOpFatalErrorHandler.INSTANCE); - } - - private static ServerSocket openServerSocket() { - try { - return new ServerSocket(0); - } catch (IOException e) { - throw new TestAbortedException("Skip test because could not open a server socket"); - } - } - - private static void maybeCloseRpcService(@Nullable final RpcService rpcService) - throws Exception { - if (rpcService != null) { - rpcService.stopService().get(TEST_TIMEOUT_SECONDS, TimeUnit.SECONDS); - } - } -} diff --git a/flink-table/flink-sql-client/pom.xml b/flink-table/flink-sql-client/pom.xml index 43a607ae1de96..bd395da65f07f 100644 --- a/flink-table/flink-sql-client/pom.xml +++ b/flink-table/flink-sql-client/pom.xml @@ -109,25 +109,25 @@ under the License. test - - org.apache.flink - flink-connector-hive_${scala.binary.version} - ${project.version} - test - - - org.apache.hadoop - hadoop-hdfs - - - - - - org.apache.flink - flink-python - ${project.version} - test - + + + + + + + + + + + + + + + + + + + org.apache.flink @@ -335,6 +335,10 @@ under the License. ${hive.version} test + + org.pentaho + pentaho-aggdesigner-algorithm + org.apache.hive hive-vector-code-gen diff --git a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/cli/utils/SqlParserHelper.java b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/cli/utils/SqlParserHelper.java index 2e931e00f5bc5..ceed66ff58ec1 100644 --- a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/cli/utils/SqlParserHelper.java +++ b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/cli/utils/SqlParserHelper.java @@ -22,8 +22,6 @@ import org.apache.flink.table.api.SqlDialect; import org.apache.flink.table.api.TableEnvironment; import org.apache.flink.table.api.internal.TableEnvironmentInternal; -import org.apache.flink.table.catalog.hive.HiveCatalog; -import org.apache.flink.table.catalog.hive.HiveTestUtils; import org.apache.flink.table.delegation.Parser; /** An utility class that provides pre-prepared tables and sql parser. 
*/ @@ -41,12 +39,7 @@ public SqlParserHelper(SqlDialect sqlDialect) { useHiveCatalog = false; tableEnv = TableEnvironment.create(EnvironmentSettings.newInstance().build()); } else if (SqlDialect.HIVE == sqlDialect) { - useHiveCatalog = true; - HiveCatalog hiveCatalog = HiveTestUtils.createHiveCatalog(); - tableEnv = TableEnvironment.create(EnvironmentSettings.newInstance().build()); - tableEnv.getConfig().setSqlDialect(sqlDialect); - tableEnv.registerCatalog(hiveCatalog.getName(), hiveCatalog); - tableEnv.useCatalog(hiveCatalog.getName()); + throw new IllegalArgumentException("removed hive dialect"); } } diff --git a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/DependencyTest.java b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/DependencyTest.java deleted file mode 100644 index 2de67ff74fc0c..0000000000000 --- a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/DependencyTest.java +++ /dev/null @@ -1,315 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.flink.table.client.gateway.local; - -import org.apache.flink.client.cli.DefaultCLI; -import org.apache.flink.configuration.ConfigOption; -import org.apache.flink.configuration.ConfigOptions; -import org.apache.flink.configuration.Configuration; -import org.apache.flink.sql.parser.hive.ddl.SqlCreateHiveTable; -import org.apache.flink.table.api.DataTypes; -import org.apache.flink.table.api.Schema; -import org.apache.flink.table.api.TableResult; -import org.apache.flink.table.catalog.Catalog; -import org.apache.flink.table.catalog.CatalogDatabaseImpl; -import org.apache.flink.table.catalog.CatalogTable; -import org.apache.flink.table.catalog.Column; -import org.apache.flink.table.catalog.CommonCatalogOptions; -import org.apache.flink.table.catalog.GenericInMemoryCatalog; -import org.apache.flink.table.catalog.ObjectPath; -import org.apache.flink.table.catalog.ResolvedCatalogTable; -import org.apache.flink.table.catalog.ResolvedSchema; -import org.apache.flink.table.catalog.exceptions.CatalogException; -import org.apache.flink.table.catalog.exceptions.DatabaseAlreadyExistException; -import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException; -import org.apache.flink.table.catalog.exceptions.TableAlreadyExistException; -import org.apache.flink.table.catalog.hive.HiveCatalog; -import org.apache.flink.table.catalog.hive.HiveTestUtils; -import org.apache.flink.table.catalog.hive.factories.HiveCatalogFactory; -import org.apache.flink.table.catalog.hive.factories.HiveCatalogFactoryOptions; -import org.apache.flink.table.client.gateway.Executor; -import org.apache.flink.table.client.gateway.context.DefaultContext; -import org.apache.flink.table.connector.source.DynamicTableSource; -import org.apache.flink.table.factories.CatalogFactory; -import org.apache.flink.table.factories.DynamicTableSourceFactory; -import org.apache.flink.table.factories.FactoryUtil; -import org.apache.flink.table.operations.Operation; -import org.apache.flink.table.operations.QueryOperation; -import org.apache.flink.table.types.DataType; -import org.apache.flink.types.Row; -import org.apache.flink.util.CollectionUtil; - -import org.junit.Test; - -import java.util.Arrays; -import java.util.Collections; -import java.util.HashMap; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.stream.Collectors; -import java.util.stream.IntStream; - -import static org.assertj.core.api.Assertions.assertThat; - -/** Dependency tests for {@link LocalExecutor}. Mainly for testing classloading of dependencies. 
*/ -public class DependencyTest { - - public static final String CONNECTOR_TYPE_VALUE = "test-connector"; - public static final String TEST_PROPERTY = "test-property"; - private static final String TEST_PROPERTY_VALUE = "test-value"; - - public static final String CATALOG_TYPE_TEST = "DependencyTest"; - - private static final List INIT_SQL = - Arrays.asList( - String.format( - "CREATE TABLE TableNumber1 (\n" - + " IntegerField1 INT,\n" - + " StringField1 STRING,\n" - + " rowtimeField TIMESTAMP(3),\n" - + " WATERMARK FOR rowtimeField AS rowtimeField\n" - + ") WITH (\n" - + " 'connector' = '%s',\n" - + " '%s' = '%s'\n" - + ")", - CONNECTOR_TYPE_VALUE, TEST_PROPERTY, TEST_PROPERTY_VALUE), - String.format( - "CREATE CATALOG TestCatalog WITH ('type' = '%s')", CATALOG_TYPE_TEST)); - - private static final String SESSION_ID = "test-session"; - - @Test - public void testTableFactoryDiscovery() throws Exception { - final LocalExecutor executor = createLocalExecutor(); - try { - final TableResult tableResult = - executeSql(executor, SESSION_ID, "DESCRIBE TableNumber1"); - assertThat( - ResolvedSchema.physical( - new String[] { - "name", "type", "null", "key", "extras", "watermark" - }, - new DataType[] { - DataTypes.STRING(), - DataTypes.STRING(), - DataTypes.BOOLEAN(), - DataTypes.STRING(), - DataTypes.STRING(), - DataTypes.STRING() - })) - .isEqualTo(tableResult.getResolvedSchema()); - List schemaData = - Arrays.asList( - Row.of("IntegerField1", "INT", true, null, null, null), - Row.of("StringField1", "STRING", true, null, null, null), - Row.of( - "rowtimeField", - "TIMESTAMP(3) *ROWTIME*", - true, - null, - null, - "`rowtimeField`")); - assertThat(CollectionUtil.iteratorToList(tableResult.collect())).isEqualTo(schemaData); - } finally { - executor.closeSession(SESSION_ID); - } - } - - @Test - public void testSqlParseWithUserClassLoader() throws Exception { - final LocalExecutor executor = createLocalExecutor(); - try { - Operation operation = - executor.parseStatement( - SESSION_ID, "SELECT IntegerField1, StringField1 FROM TableNumber1"); - - assertThat(operation).isInstanceOf(QueryOperation.class); - } finally { - executor.closeSession(SESSION_ID); - } - } - - private LocalExecutor createLocalExecutor() throws Exception { - // create default context - DefaultContext defaultContext = - new DefaultContext( - Collections.emptyList(), - new Configuration(), - Collections.singletonList(new DefaultCLI())); - LocalExecutor executor = new LocalExecutor(defaultContext); - executor.openSession(SESSION_ID); - for (String line : INIT_SQL) { - executor.executeOperation(SESSION_ID, executor.parseStatement(SESSION_ID, line)); - } - return executor; - } - - private TableResult executeSql(Executor executor, String sessionId, String sql) { - Operation operation = executor.parseStatement(sessionId, sql); - return executor.executeOperation(sessionId, operation); - } - - // -------------------------------------------------------------------------------------------- - - /** Table source that can be discovered if classloading is correct. 
*/ - public static class TestTableSourceFactory implements DynamicTableSourceFactory { - - @Override - public String factoryIdentifier() { - return CONNECTOR_TYPE_VALUE; - } - - @Override - public Set> requiredOptions() { - return Collections.singleton( - ConfigOptions.key(CONNECTOR_TYPE_VALUE).stringType().noDefaultValue()); - } - - @Override - public Set> optionalOptions() { - return Collections.emptySet(); - } - - @Override - public DynamicTableSource createDynamicTableSource(Context context) { - return null; - } - } - - /** Catalog that can be discovered if classloading is correct. */ - public static class TestCatalogFactory implements CatalogFactory { - - private static final ConfigOption DEFAULT_DATABASE = - ConfigOptions.key(CommonCatalogOptions.DEFAULT_DATABASE_KEY) - .stringType() - .defaultValue(GenericInMemoryCatalog.DEFAULT_DB); - - @Override - public String factoryIdentifier() { - return CATALOG_TYPE_TEST; - } - - @Override - public Set> requiredOptions() { - return Collections.emptySet(); - } - - @Override - public Set> optionalOptions() { - final Set> options = new HashSet<>(); - options.add(DEFAULT_DATABASE); - return options; - } - - @Override - public Catalog createCatalog(Context context) { - final Configuration configuration = Configuration.fromMap(context.getOptions()); - return new TestCatalog(context.getName(), configuration.getString(DEFAULT_DATABASE)); - } - } - - /** Test catalog. */ - public static class TestCatalog extends GenericInMemoryCatalog { - public TestCatalog(String name, String defaultDatabase) { - super(name, defaultDatabase); - } - } - - /** - * A test factory that is the same as {@link HiveCatalogFactory} except returning a {@link - * HiveCatalog} always with an embedded Hive metastore to test logic of {@link - * HiveCatalogFactory}. - */ - public static class TestHiveCatalogFactory extends HiveCatalogFactory { - public static final String ADDITIONAL_TEST_DATABASE = "additional_test_database"; - public static final String TEST_TABLE = "test_table"; - static final String TABLE_WITH_PARAMETERIZED_TYPES = "param_types_table"; - - @Override - public String factoryIdentifier() { - return "hive-test"; - } - - @Override - public Catalog createCatalog(Context context) { - final Configuration configuration = Configuration.fromMap(context.getOptions()); - - // Developers may already have their own production/testing hive-site.xml set in their - // environment, - // and Flink tests should avoid using those hive-site.xml. 
- // Thus, explicitly create a testing HiveConf for unit tests here - Catalog hiveCatalog = - HiveTestUtils.createHiveCatalog( - context.getName(), - configuration.getString(HiveCatalogFactoryOptions.HIVE_VERSION)); - - // Creates an additional database to test tableEnv.useDatabase() will switch current - // database of the catalog - hiveCatalog.open(); - try { - hiveCatalog.createDatabase( - ADDITIONAL_TEST_DATABASE, - new CatalogDatabaseImpl(new HashMap<>(), null), - false); - hiveCatalog.createTable( - new ObjectPath(ADDITIONAL_TEST_DATABASE, TEST_TABLE), - createResolvedTable( - new String[] {"testcol"}, new DataType[] {DataTypes.INT()}), - false); - // create a table to test parameterized types - hiveCatalog.createTable( - new ObjectPath("default", TABLE_WITH_PARAMETERIZED_TYPES), - createResolvedTable( - new String[] {"dec", "ch", "vch"}, - new DataType[] { - DataTypes.DECIMAL(10, 10), - DataTypes.CHAR(5), - DataTypes.VARCHAR(15) - }), - false); - } catch (DatabaseAlreadyExistException - | TableAlreadyExistException - | DatabaseNotExistException e) { - throw new CatalogException(e); - } - - return hiveCatalog; - } - - private ResolvedCatalogTable createResolvedTable( - String[] fieldNames, DataType[] fieldDataTypes) { - final Map options = new HashMap<>(); - options.put(FactoryUtil.CONNECTOR.key(), SqlCreateHiveTable.IDENTIFIER); - final CatalogTable origin = - CatalogTable.of( - Schema.newBuilder().fromFields(fieldNames, fieldDataTypes).build(), - null, - Collections.emptyList(), - options); - final List resolvedColumns = - IntStream.range(0, fieldNames.length) - .mapToObj(i -> Column.physical(fieldNames[i], fieldDataTypes[i])) - .collect(Collectors.toList()); - return new ResolvedCatalogTable( - origin, new ResolvedSchema(resolvedColumns, Collections.emptyList(), null)); - } - } -} diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/data/columnar/vector/heap/AbstractHeapVector.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/data/columnar/vector/heap/AbstractHeapVector.java index d864e90ac6452..fc226cb408197 100644 --- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/data/columnar/vector/heap/AbstractHeapVector.java +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/data/columnar/vector/heap/AbstractHeapVector.java @@ -31,7 +31,7 @@ public abstract class AbstractHeapVector extends AbstractWritableVector { public static final boolean LITTLE_ENDIAN = ByteOrder.nativeOrder() == ByteOrder.LITTLE_ENDIAN; - public static final sun.misc.Unsafe UNSAFE = MemoryUtils.UNSAFE; + public static final jdk.internal.misc.Unsafe UNSAFE = MemoryUtils.UNSAFE; public static final int BYTE_ARRAY_OFFSET = UNSAFE.arrayBaseOffset(byte[].class); public static final int INT_ARRAY_OFFSET = UNSAFE.arrayBaseOffset(int[].class); public static final int LONG_ARRAY_OFFSET = UNSAFE.arrayBaseOffset(long[].class); diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/data/binary/BinaryRowDataUtil.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/data/binary/BinaryRowDataUtil.java index d066fd940088b..92de21b0fefae 100644 --- a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/data/binary/BinaryRowDataUtil.java +++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/data/binary/BinaryRowDataUtil.java @@ -27,7 +27,7 @@ */ public class BinaryRowDataUtil { - public static final sun.misc.Unsafe UNSAFE = MemoryUtils.UNSAFE; + public 
static final jdk.internal.misc.Unsafe UNSAFE = MemoryUtils.UNSAFE; public static final int BYTE_ARRAY_BASE_OFFSET = UNSAFE.arrayBaseOffset(byte[].class); public static final BinaryRowData EMPTY_ROW = new BinaryRowData(0); diff --git a/mvnw b/mvnw deleted file mode 100755 index 5643201c7d822..0000000000000 --- a/mvnw +++ /dev/null @@ -1,316 +0,0 @@ -#!/bin/sh -# ---------------------------------------------------------------------------- -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -# ---------------------------------------------------------------------------- - -# ---------------------------------------------------------------------------- -# Maven Start Up Batch script -# -# Required ENV vars: -# ------------------ -# JAVA_HOME - location of a JDK home dir -# -# Optional ENV vars -# ----------------- -# M2_HOME - location of maven2's installed home dir -# MAVEN_OPTS - parameters passed to the Java VM when running Maven -# e.g. to debug Maven itself, use -# set MAVEN_OPTS=-Xdebug -Xrunjdwp:transport=dt_socket,server=y,suspend=y,address=8000 -# MAVEN_SKIP_RC - flag to disable loading of mavenrc files -# ---------------------------------------------------------------------------- - -if [ -z "$MAVEN_SKIP_RC" ] ; then - - if [ -f /usr/local/etc/mavenrc ] ; then - . /usr/local/etc/mavenrc - fi - - if [ -f /etc/mavenrc ] ; then - . /etc/mavenrc - fi - - if [ -f "$HOME/.mavenrc" ] ; then - . "$HOME/.mavenrc" - fi - -fi - -# OS specific support. $var _must_ be set to either true or false. -cygwin=false; -darwin=false; -mingw=false -case "`uname`" in - CYGWIN*) cygwin=true ;; - MINGW*) mingw=true;; - Darwin*) darwin=true - # Use /usr/libexec/java_home if available, otherwise fall back to /Library/Java/Home - # See https://developer.apple.com/library/mac/qa/qa1170/_index.html - if [ -z "$JAVA_HOME" ]; then - if [ -x "/usr/libexec/java_home" ]; then - export JAVA_HOME="`/usr/libexec/java_home`" - else - export JAVA_HOME="/Library/Java/Home" - fi - fi - ;; -esac - -if [ -z "$JAVA_HOME" ] ; then - if [ -r /etc/gentoo-release ] ; then - JAVA_HOME=`java-config --jre-home` - fi -fi - -if [ -z "$M2_HOME" ] ; then - ## resolve links - $0 may be a link to maven's home - PRG="$0" - - # need this for relative symlinks - while [ -h "$PRG" ] ; do - ls=`ls -ld "$PRG"` - link=`expr "$ls" : '.*-> \(.*\)$'` - if expr "$link" : '/.*' > /dev/null; then - PRG="$link" - else - PRG="`dirname "$PRG"`/$link" - fi - done - - saveddir=`pwd` - - M2_HOME=`dirname "$PRG"`/.. 
- - # make it fully qualified - M2_HOME=`cd "$M2_HOME" && pwd` - - cd "$saveddir" - # echo Using m2 at $M2_HOME -fi - -# For Cygwin, ensure paths are in UNIX format before anything is touched -if $cygwin ; then - [ -n "$M2_HOME" ] && - M2_HOME=`cygpath --unix "$M2_HOME"` - [ -n "$JAVA_HOME" ] && - JAVA_HOME=`cygpath --unix "$JAVA_HOME"` - [ -n "$CLASSPATH" ] && - CLASSPATH=`cygpath --path --unix "$CLASSPATH"` -fi - -# For Mingw, ensure paths are in UNIX format before anything is touched -if $mingw ; then - [ -n "$M2_HOME" ] && - M2_HOME="`(cd "$M2_HOME"; pwd)`" - [ -n "$JAVA_HOME" ] && - JAVA_HOME="`(cd "$JAVA_HOME"; pwd)`" -fi - -if [ -z "$JAVA_HOME" ]; then - javaExecutable="`which javac`" - if [ -n "$javaExecutable" ] && ! [ "`expr \"$javaExecutable\" : '\([^ ]*\)'`" = "no" ]; then - # readlink(1) is not available as standard on Solaris 10. - readLink=`which readlink` - if [ ! `expr "$readLink" : '\([^ ]*\)'` = "no" ]; then - if $darwin ; then - javaHome="`dirname \"$javaExecutable\"`" - javaExecutable="`cd \"$javaHome\" && pwd -P`/javac" - else - javaExecutable="`readlink -f \"$javaExecutable\"`" - fi - javaHome="`dirname \"$javaExecutable\"`" - javaHome=`expr "$javaHome" : '\(.*\)/bin'` - JAVA_HOME="$javaHome" - export JAVA_HOME - fi - fi -fi - -if [ -z "$JAVACMD" ] ; then - if [ -n "$JAVA_HOME" ] ; then - if [ -x "$JAVA_HOME/jre/sh/java" ] ; then - # IBM's JDK on AIX uses strange locations for the executables - JAVACMD="$JAVA_HOME/jre/sh/java" - else - JAVACMD="$JAVA_HOME/bin/java" - fi - else - JAVACMD="`\\unset -f command; \\command -v java`" - fi -fi - -if [ ! -x "$JAVACMD" ] ; then - echo "Error: JAVA_HOME is not defined correctly." >&2 - echo " We cannot execute $JAVACMD" >&2 - exit 1 -fi - -if [ -z "$JAVA_HOME" ] ; then - echo "Warning: JAVA_HOME environment variable is not set." -fi - -CLASSWORLDS_LAUNCHER=org.codehaus.plexus.classworlds.launcher.Launcher - -# traverses directory structure from process work directory to filesystem root -# first directory with .mvn subdirectory is considered project base directory -find_maven_basedir() { - - if [ -z "$1" ] - then - echo "Path not specified to find_maven_basedir" - return 1 - fi - - basedir="$1" - wdir="$1" - while [ "$wdir" != '/' ] ; do - if [ -d "$wdir"/.mvn ] ; then - basedir=$wdir - break - fi - # workaround for JBEAP-8937 (on Solaris 10/Sparc) - if [ -d "${wdir}" ]; then - wdir=`cd "$wdir/.."; pwd` - fi - # end of workaround - done - echo "${basedir}" -} - -# concatenates all lines of a file -concat_lines() { - if [ -f "$1" ]; then - echo "$(tr -s '\n' ' ' < "$1")" - fi -} - -BASE_DIR=`find_maven_basedir "$(pwd)"` -if [ -z "$BASE_DIR" ]; then - exit 1; -fi - -########################################################################################## -# Extension to allow automatically downloading the maven-wrapper.jar from Maven-central -# This allows using the maven wrapper in projects that prohibit checking in binary data. -########################################################################################## -if [ -r "$BASE_DIR/.mvn/wrapper/maven-wrapper.jar" ]; then - if [ "$MVNW_VERBOSE" = true ]; then - echo "Found .mvn/wrapper/maven-wrapper.jar" - fi -else - if [ "$MVNW_VERBOSE" = true ]; then - echo "Couldn't find .mvn/wrapper/maven-wrapper.jar, downloading it ..." 
- fi - if [ -n "$MVNW_REPOURL" ]; then - jarUrl="$MVNW_REPOURL/org/apache/maven/wrapper/maven-wrapper/3.1.0/maven-wrapper-3.1.0.jar" - else - jarUrl="https://repo.maven.apache.org/maven2/org/apache/maven/wrapper/maven-wrapper/3.1.0/maven-wrapper-3.1.0.jar" - fi - while IFS="=" read key value; do - case "$key" in (wrapperUrl) jarUrl="$value"; break ;; - esac - done < "$BASE_DIR/.mvn/wrapper/maven-wrapper.properties" - if [ "$MVNW_VERBOSE" = true ]; then - echo "Downloading from: $jarUrl" - fi - wrapperJarPath="$BASE_DIR/.mvn/wrapper/maven-wrapper.jar" - if $cygwin; then - wrapperJarPath=`cygpath --path --windows "$wrapperJarPath"` - fi - - if command -v wget > /dev/null; then - if [ "$MVNW_VERBOSE" = true ]; then - echo "Found wget ... using wget" - fi - if [ -z "$MVNW_USERNAME" ] || [ -z "$MVNW_PASSWORD" ]; then - wget "$jarUrl" -O "$wrapperJarPath" || rm -f "$wrapperJarPath" - else - wget --http-user=$MVNW_USERNAME --http-password=$MVNW_PASSWORD "$jarUrl" -O "$wrapperJarPath" || rm -f "$wrapperJarPath" - fi - elif command -v curl > /dev/null; then - if [ "$MVNW_VERBOSE" = true ]; then - echo "Found curl ... using curl" - fi - if [ -z "$MVNW_USERNAME" ] || [ -z "$MVNW_PASSWORD" ]; then - curl -o "$wrapperJarPath" "$jarUrl" -f - else - curl --user $MVNW_USERNAME:$MVNW_PASSWORD -o "$wrapperJarPath" "$jarUrl" -f - fi - - else - if [ "$MVNW_VERBOSE" = true ]; then - echo "Falling back to using Java to download" - fi - javaClass="$BASE_DIR/.mvn/wrapper/MavenWrapperDownloader.java" - # For Cygwin, switch paths to Windows format before running javac - if $cygwin; then - javaClass=`cygpath --path --windows "$javaClass"` - fi - if [ -e "$javaClass" ]; then - if [ ! -e "$BASE_DIR/.mvn/wrapper/MavenWrapperDownloader.class" ]; then - if [ "$MVNW_VERBOSE" = true ]; then - echo " - Compiling MavenWrapperDownloader.java ..." - fi - # Compiling the Java class - ("$JAVA_HOME/bin/javac" "$javaClass") - fi - if [ -e "$BASE_DIR/.mvn/wrapper/MavenWrapperDownloader.class" ]; then - # Running the downloader - if [ "$MVNW_VERBOSE" = true ]; then - echo " - Running MavenWrapperDownloader.java ..." - fi - ("$JAVA_HOME/bin/java" -cp .mvn/wrapper MavenWrapperDownloader "$MAVEN_PROJECTBASEDIR") - fi - fi - fi -fi -########################################################################################## -# End of extension -########################################################################################## - -export MAVEN_PROJECTBASEDIR=${MAVEN_BASEDIR:-"$BASE_DIR"} -if [ "$MVNW_VERBOSE" = true ]; then - echo $MAVEN_PROJECTBASEDIR -fi -MAVEN_OPTS="$(concat_lines "$MAVEN_PROJECTBASEDIR/.mvn/jvm.config") $MAVEN_OPTS" - -# For Cygwin, switch paths to Windows format before running java -if $cygwin; then - [ -n "$M2_HOME" ] && - M2_HOME=`cygpath --path --windows "$M2_HOME"` - [ -n "$JAVA_HOME" ] && - JAVA_HOME=`cygpath --path --windows "$JAVA_HOME"` - [ -n "$CLASSPATH" ] && - CLASSPATH=`cygpath --path --windows "$CLASSPATH"` - [ -n "$MAVEN_PROJECTBASEDIR" ] && - MAVEN_PROJECTBASEDIR=`cygpath --path --windows "$MAVEN_PROJECTBASEDIR"` -fi - -# Provide a "standardized" way to retrieve the CLI args that will -# work with both Windows and non-Windows executions. 
-MAVEN_CMD_LINE_ARGS="$MAVEN_CONFIG $@" -export MAVEN_CMD_LINE_ARGS - -WRAPPER_LAUNCHER=org.apache.maven.wrapper.MavenWrapperMain - -exec "$JAVACMD" \ - $MAVEN_OPTS \ - $MAVEN_DEBUG_OPTS \ - -classpath "$MAVEN_PROJECTBASEDIR/.mvn/wrapper/maven-wrapper.jar" \ - "-Dmaven.home=${M2_HOME}" \ - "-Dmaven.multiModuleProjectDirectory=${MAVEN_PROJECTBASEDIR}" \ - ${WRAPPER_LAUNCHER} $MAVEN_CONFIG "$@" diff --git a/pom.xml b/pom.xml index d52b9fbca1f3a..4943f010df6a2 100644 --- a/pom.xml +++ b/pom.xml @@ -60,6 +60,17 @@ under the License. false + + mvnrepository + http://repo1.maven.org/maven2 + + false + + + true + + + @@ -67,39 +78,39 @@ under the License. flink-architecture-tests flink-core flink-java - flink-scala - flink-filesystems + + flink-rpc flink-runtime flink-runtime-web flink-optimizer flink-streaming-java - flink-streaming-scala + flink-connectors flink-formats - flink-examples + flink-clients flink-container flink-queryable-state - flink-tests + flink-end-to-end-tests flink-test-utils-parent flink-state-backends flink-dstl flink-libraries flink-table - flink-quickstart - flink-contrib - flink-dist - flink-dist-scala + + + + flink-metrics flink-yarn - flink-yarn-tests - flink-fs-tests - flink-docs - flink-python - flink-walkthroughs - flink-kubernetes + + + + + + flink-external-resources tools/ci/flink-ci-tools @@ -1046,11 +1057,13 @@ under the License. 11 11 + --add-exports=java.base/sun.net.util=ALL-UNNAMED --add-exports=java.management/sun.management=ALL-UNNAMED --add-exports=java.rmi/sun.rmi.registry=ALL-UNNAMED --add-exports=java.security.jgss/sun.security.krb5=ALL-UNNAMED + --add-exports=java.base/jdk.internal.misc=ALL-UNNAMED
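A note on running the result of this patch: jdk.internal.misc.Unsafe lives in the java.base module and is not exported to the unnamed module by default, so the --add-exports=java.base/jdk.internal.misc=ALL-UNNAMED compiler argument added to pom.xml above covers compilation only. The same flag has to reach every JVM that executes the compiled classes — surefire-run tests, the Flink launch scripts, and (given the commit's purpose) IntelliJ run configurations via the "VM options" field. A minimal sketch of such a launch, assuming a plain classpath invocation; app.jar and org.example.Main are placeholders, and the extra exports mirror the list added to the compiler args:

    java \
      --add-exports=java.base/jdk.internal.misc=ALL-UNNAMED \
      --add-exports=java.base/sun.net.util=ALL-UNNAMED \
      --add-exports=java.management/sun.management=ALL-UNNAMED \
      --add-exports=java.rmi/sun.rmi.registry=ALL-UNNAMED \
      --add-exports=java.security.jgss/sun.security.krb5=ALL-UNNAMED \
      -cp app.jar org.example.Main

This also explains why MemoryUtils could drop the reflective theUnsafe field lookup: sun.misc.Unsafe.getUnsafe() rejects callers that are not loaded by the boot class loader, whereas jdk.internal.misc.Unsafe.getUnsafe() relies on module encapsulation alone, so the direct call succeeds once the export is granted to the unnamed module.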