From a31fe6bbd4119f1ee736cb4f43af75d2f728982b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Mateusz=20S=C5=82abek?= Date: Wed, 2 Oct 2024 16:47:02 +0200 Subject: [PATCH] add cross compilation with scala 2.13 and remove compatibility test with flink 1.14 --- build.sbt | 70 ++----------------- .../common/BaseGenericITSpec.scala | 2 - .../common/StreamingDockerTest.scala | 2 +- ...ssknacker.engine.DeploymentManagerProvider | 1 - .../manager/src/it/resources/application.conf | 19 ----- .../manager/src/it/resources/logback-test.xml | 19 ----- ...ink114StreamingDeploymentManagerSpec.scala | 15 ---- ...i.typeinformation.TypeInformationDetection | 1 - ....engine.process.FlinkCompatibilityProvider | 1 - ...14StreamExecutionEnvPreparerProvider.scala | 39 ----------- .../PreFlink119TypeInformationDetection.scala | 18 ----- ...ker.engine.api.component.ComponentProvider | 1 - .../model/src/test/resources/application.conf | 10 --- .../executors/PipelineExecutorUtils.java | 63 ----------------- .../compatibility/Flink114GenericITSpec.scala | 6 -- .../Flink114SchemaCompatibilityTest.scala | 5 -- .../compatibility/Flink114TimestampTest.scala | 16 ----- .../nussknacker/compatibility/FlinkSpec.scala | 38 ---------- project/build.properties | 2 +- 19 files changed, 7 insertions(+), 321 deletions(-) delete mode 100644 flink114/manager/src/it/resources/META-INF/services/pl.touk.nussknacker.engine.DeploymentManagerProvider delete mode 100644 flink114/manager/src/it/resources/application.conf delete mode 100644 flink114/manager/src/it/resources/logback-test.xml delete mode 100644 flink114/manager/src/it/scala/pl/touk/nussknacker/engine/management/streaming/Flink114StreamingDeploymentManagerSpec.scala delete mode 100644 flink114/model/src/main/resources/META-INF/services/pl.touk.nussknacker.engine.flink.api.typeinformation.TypeInformationDetection delete mode 100644 flink114/model/src/main/resources/META-INF/services/pl.touk.nussknacker.engine.process.FlinkCompatibilityProvider delete mode 100644 flink114/model/src/main/scala/pl/touk/nussknacker/compatibility/Flink114StreamExecutionEnvPreparerProvider.scala delete mode 100644 flink114/model/src/main/scala/pl/touk/nussknacker/engine/flink/api/typeinformation/PreFlink119TypeInformationDetection.scala delete mode 100644 flink114/model/src/test/resources/META-INF/services/pl.touk.nussknacker.engine.api.component.ComponentProvider delete mode 100644 flink114/model/src/test/resources/application.conf delete mode 100644 flink114/model/src/test/scala/org/apache/flink/client/deployment/executors/PipelineExecutorUtils.java delete mode 100644 flink114/model/src/test/scala/pl/touk/nussknacker/compatibility/Flink114GenericITSpec.scala delete mode 100644 flink114/model/src/test/scala/pl/touk/nussknacker/compatibility/Flink114SchemaCompatibilityTest.scala delete mode 100644 flink114/model/src/test/scala/pl/touk/nussknacker/compatibility/Flink114TimestampTest.scala delete mode 100644 flink114/model/src/test/scala/pl/touk/nussknacker/compatibility/FlinkSpec.scala diff --git a/build.sbt b/build.sbt index 9895cc4..bb3f3e9 100644 --- a/build.sbt +++ b/build.sbt @@ -8,7 +8,7 @@ val scala213V = "2.13.12" lazy val supportedScalaVersions = List(scala212V, scala213V) lazy val defaultScalaV = sys.env.get("NUSSKNACKER_SCALA_VERSION") match { - case None | Some("2.12") => scala212V + case None | Some("2.12") => scala213V case Some("2.13") => scala213V case Some(unsupported) => throw new IllegalArgumentException(s"Nu doesn't support $unsupported Scala version") } @@ -20,7 +20,6 @@ val scalaCollectionsCompatV = "2.9.0" val silencerV_2_12 = "1.6.0" val silencerV = "1.7.17" -val flink114V = "1.14.5" val flink116V = "1.16.0" val currentFlinkV = "1.19.1" val sttpV = "3.8.11" @@ -154,7 +153,7 @@ lazy val commonTest = (project in file("commonTest")) ExclusionRule("org.apache.flink", "flink-scala_2.12"), ), "pl.touk.nussknacker" %% "nussknacker-flink-executor" % nussknackerV, - "org.apache.flink" %% "flink-streaming-scala" % currentFlinkV % "provided", + "org.apache.flink" % "flink-streaming-java" % currentFlinkV % "provided", "com.dimafeng" %% "testcontainers-scala-scalatest" % testContainersScalaV, "pl.touk.nussknacker" %% "nussknacker-flink-manager" % nussknackerV excludeAll ( ExclusionRule("org.apache.flink", "flink-scala_2.12"), @@ -170,40 +169,6 @@ lazy val commonTest = (project in file("commonTest")) ) .dependsOn(flink116KafkaComponents) -lazy val flink114ModelCompat = (project in file("flink114/model")) - .settings(commonSettings) - .settings(flinkSettingsCommonForBefore1_15(flink114V)) - .settings( - name := "nussknacker-flink-1-14-model", - libraryDependencies ++= deps(flink114V), - dependencyOverrides ++= Seq( - "org.apache.kafka" % "kafka-clients" % kafkaV, - "org.apache.kafka" %% "kafka" % kafkaV - ) - ) - .dependsOn(commonTest % Test) - -lazy val flink114ManagerCompat = (project in file("flink114/manager")) - .settings(commonSettings) - .configs(IntegrationTest) - .settings(Defaults.itSettings) - .settings(flinkSettingsCommonForBefore1_15(flink114V)) - .settings( - name := "nussknacker-flink-1-14-manager", - libraryDependencies ++= managerDeps(flink114V), - dependencyOverrides ++= Seq( - // For some strange reason, docker client libraries have conflict with schema registry client :/ - "org.glassfish.jersey.core" % "jersey-common" % "2.22.2", - // must be the same as used by flink - otherwise it is evicted by version from deployment-manager-api - "com.typesafe.akka" %% "akka-actor" % "2.6.20", - "org.scala-lang.modules" %% "scala-java8-compat" % "1.0.2" - ), - IntegrationTest / Keys.test := (IntegrationTest / Keys.test) - .dependsOn(flink114ModelCompat / Compile / assembly) - .value, - ) - .dependsOn(commonTest % IntegrationTest) - lazy val flink116ModelCompat = (project in file("flink116/model")) .settings(commonSettings) .settings(publishSettings) @@ -258,29 +223,6 @@ lazy val flink116KafkaComponents = (project in file("flink116/kafka-components") ) ) -def flinkExclusionsForBefore1_15 = Seq( - "org.apache.flink" % "flink-streaming-java", - "org.apache.flink" % "flink-statebackend-rocksdb", - "org.apache.flink" % "flink-connector-kafka", - "org.apache.flink" % "flink-test-utils" -) - -def flinkDependenciesCommonForBefore1_15(version: String) = Seq( - "org.apache.flink" %% "flink-connector-kafka" % version % "provided", - "org.apache.flink" % "flink-runtime" % version % "provided", - "org.apache.flink" %% "flink-test-utils" % version % "provided", - "org.apache.flink" % "flink-statebackend-rocksdb" % version % "provided" -) - -def flinkOverridesCommonForBefore1_15(version: String) = - flinkDependenciesCommonForBefore1_15(version) - -def flinkSettingsCommonForBefore1_15(version: String) = Seq( - excludeDependencies ++= flinkExclusionsForBefore1_15, - libraryDependencies ++= flinkDependenciesCommonForBefore1_15(version), - dependencyOverrides ++= flinkOverrides(version) ++ flinkOverridesCommonForBefore1_15(version) -) - def managerDeps(version: String) = Seq( "pl.touk.nussknacker" %% "nussknacker-flink-manager" % nussknackerV excludeAll ( ExclusionRule("org.apache.flink", "flink-scala_2.12"), @@ -288,7 +230,7 @@ def managerDeps(version: String) = Seq( "pl.touk.nussknacker" %% "nussknacker-http-utils" % nussknackerV % "provided,it,test", "pl.touk.nussknacker" %% "nussknacker-scenario-compiler" % nussknackerV % "provided,it,test", "pl.touk.nussknacker" %% "nussknacker-deployment-manager-api" % nussknackerV % "provided", - "org.apache.flink" %% "flink-streaming-scala" % version excludeAll ( + "org.apache.flink" % "flink-streaming-java" % version excludeAll ( ExclusionRule("log4j", "log4j"), ExclusionRule("org.slf4j", "slf4j-log4j12"), ), @@ -304,7 +246,7 @@ def deps(version: String) = Seq( "pl.touk.nussknacker" %% "nussknacker-flink-base-unbounded-components" % nussknackerV, "pl.touk.nussknacker" %% "nussknacker-flink-executor" % nussknackerV, "pl.touk.nussknacker" %% "nussknacker-flink-test-utils" % nussknackerV % "test", - "org.apache.flink" %% "flink-streaming-scala" % version % "test, provided", + "org.apache.flink" % "flink-streaming-java" % version % "test, provided", // in normal deployment (from designer) flink-metrics-dropwizard and flink-metrics-dropwizard-core should be replaced in flink-dropwizard-metrics-deps directory in container/distribution "org.apache.flink" % "flink-metrics-dropwizard" % version, ) @@ -319,7 +261,7 @@ def flinkOverrides(version: String) = Seq( "org.apache.flink" % "flink-runtime" % version % "provided", "org.apache.flink" % "flink-test-utils" % version % "provided", "org.apache.flink" % "flink-statebackend-rocksdb" % version % "provided", - "org.apache.flink" %% "flink-connector-kafka" % version % "provided", + "org.apache.flink" % "flink-connector-kafka" % version % "provided", "org.apache.flink" % "flink-metrics-dropwizard" % version % "test", ) @@ -351,8 +293,6 @@ def nussknackerAssemblyStrategy: String => MergeStrategy = { lazy val modules: List[ProjectReference] = List[ProjectReference]( flink116KafkaComponents, - flink114ManagerCompat, - flink114ModelCompat, flink116ManagerCompat, flink116ModelCompat, commonTest diff --git a/commonTest/src/main/scala/pl/touk/nussknacker/compatibility/common/BaseGenericITSpec.scala b/commonTest/src/main/scala/pl/touk/nussknacker/compatibility/common/BaseGenericITSpec.scala index 67ac352..2a0b614 100644 --- a/commonTest/src/main/scala/pl/touk/nussknacker/compatibility/common/BaseGenericITSpec.scala +++ b/commonTest/src/main/scala/pl/touk/nussknacker/compatibility/common/BaseGenericITSpec.scala @@ -234,8 +234,6 @@ trait BaseGenericITSpec extends AnyFunSuiteLike with Matchers with KafkaSpec wit } } - private def parseJson(str: String) = io.circe.parser.parse(str).right.get - private def consumeOneRawAvroMessage(topic: String) = { val consumer = kafkaClient.createConsumer() consumer.consumeWithConsumerRecord(topic, secondsToWaitForAvro).head diff --git a/commonTest/src/main/scala/pl/touk/nussknacker/engine/management/common/StreamingDockerTest.scala b/commonTest/src/main/scala/pl/touk/nussknacker/engine/management/common/StreamingDockerTest.scala index b35ed5d..687be86 100644 --- a/commonTest/src/main/scala/pl/touk/nussknacker/engine/management/common/StreamingDockerTest.scala +++ b/commonTest/src/main/scala/pl/touk/nussknacker/engine/management/common/StreamingDockerTest.scala @@ -64,7 +64,7 @@ trait StreamingDockerTest } private def prepareVolumeDir(): Path = { - import scala.collection.JavaConverters._ + import scala.jdk.CollectionConverters._ Files.createTempDirectory( "dockerTest", PosixFilePermissions.asFileAttribute(PosixFilePermission.values().toSet[PosixFilePermission].asJava) diff --git a/flink114/manager/src/it/resources/META-INF/services/pl.touk.nussknacker.engine.DeploymentManagerProvider b/flink114/manager/src/it/resources/META-INF/services/pl.touk.nussknacker.engine.DeploymentManagerProvider deleted file mode 100644 index 5293ee2..0000000 --- a/flink114/manager/src/it/resources/META-INF/services/pl.touk.nussknacker.engine.DeploymentManagerProvider +++ /dev/null @@ -1 +0,0 @@ -pl.touk.nussknacker.engine.management.FlinkStreamingDeploymentManagerProvider diff --git a/flink114/manager/src/it/resources/application.conf b/flink114/manager/src/it/resources/application.conf deleted file mode 100644 index ba0d8be..0000000 --- a/flink114/manager/src/it/resources/application.conf +++ /dev/null @@ -1,19 +0,0 @@ - -deploymentConfig { - type: "test" - restUrl: "http://localhost:8081" - queryableStateProxyUrl: "localhost:9069" -} - -modelConfig { - rocksDB: { - enable: false - } - kafka { - kafkaAddress: "dummy:9092" - kafkaProperties { - "schema.registry.url": "http://dummy:1111" - } - } -} - diff --git a/flink114/manager/src/it/resources/logback-test.xml b/flink114/manager/src/it/resources/logback-test.xml deleted file mode 100644 index 0c85e88..0000000 --- a/flink114/manager/src/it/resources/logback-test.xml +++ /dev/null @@ -1,19 +0,0 @@ - - - - - - %d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n - - - - - - - - - - - - - diff --git a/flink114/manager/src/it/scala/pl/touk/nussknacker/engine/management/streaming/Flink114StreamingDeploymentManagerSpec.scala b/flink114/manager/src/it/scala/pl/touk/nussknacker/engine/management/streaming/Flink114StreamingDeploymentManagerSpec.scala deleted file mode 100644 index b794b27..0000000 --- a/flink114/manager/src/it/scala/pl/touk/nussknacker/engine/management/streaming/Flink114StreamingDeploymentManagerSpec.scala +++ /dev/null @@ -1,15 +0,0 @@ -package pl.touk.nussknacker.engine.management.streaming - -import pl.touk.nussknacker.engine.management.FlinkStreamingDeploymentManagerProvider -import pl.touk.nussknacker.engine.management.common.CommonFlinkStreamingDeploymentManagerSpec -import pl.touk.nussknacker.engine.util.config.ScalaMajorVersionConfig - -class Flink114StreamingDeploymentManagerSpec extends CommonFlinkStreamingDeploymentManagerSpec { - override protected def classPath: String = - s"./flink114/model/target/scala-${ScalaMajorVersionConfig.scalaMajorVersion}/nussknacker-flink-1-14-model-assembly.jar" - - override protected def deploymentManagerProvider: FlinkStreamingDeploymentManagerProvider = - new FlinkStreamingDeploymentManagerProvider() - - override protected val flinkVersion: String = "1.14.5" -} diff --git a/flink114/model/src/main/resources/META-INF/services/pl.touk.nussknacker.engine.flink.api.typeinformation.TypeInformationDetection b/flink114/model/src/main/resources/META-INF/services/pl.touk.nussknacker.engine.flink.api.typeinformation.TypeInformationDetection deleted file mode 100644 index c66f955..0000000 --- a/flink114/model/src/main/resources/META-INF/services/pl.touk.nussknacker.engine.flink.api.typeinformation.TypeInformationDetection +++ /dev/null @@ -1 +0,0 @@ -pl.touk.nussknacker.engine.flink.api.typeinformation.PreFlink119TypeInformationDetection diff --git a/flink114/model/src/main/resources/META-INF/services/pl.touk.nussknacker.engine.process.FlinkCompatibilityProvider b/flink114/model/src/main/resources/META-INF/services/pl.touk.nussknacker.engine.process.FlinkCompatibilityProvider deleted file mode 100644 index 019dae0..0000000 --- a/flink114/model/src/main/resources/META-INF/services/pl.touk.nussknacker.engine.process.FlinkCompatibilityProvider +++ /dev/null @@ -1 +0,0 @@ -pl.touk.nussknacker.compatibility.Flink114StreamExecutionEnvPreparerProvider \ No newline at end of file diff --git a/flink114/model/src/main/scala/pl/touk/nussknacker/compatibility/Flink114StreamExecutionEnvPreparerProvider.scala b/flink114/model/src/main/scala/pl/touk/nussknacker/compatibility/Flink114StreamExecutionEnvPreparerProvider.scala deleted file mode 100644 index 44b28ca..0000000 --- a/flink114/model/src/main/scala/pl/touk/nussknacker/compatibility/Flink114StreamExecutionEnvPreparerProvider.scala +++ /dev/null @@ -1,39 +0,0 @@ -package pl.touk.nussknacker.compatibility - -import com.typesafe.scalalogging.LazyLogging -import org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders -import org.apache.flink.streaming.api.datastream.{DataStream, SingleOutputStreamOperator} -import org.apache.flink.util.OutputTag -import pl.touk.nussknacker.engine.process.registrar.{DefaultStreamExecutionEnvPreparer, StreamExecutionEnvPreparer} -import pl.touk.nussknacker.engine.process.{ExecutionConfigPreparer, FlinkCompatibilityProvider, FlinkJobConfig} - -class Flink114StreamExecutionEnvPreparerProvider extends FlinkCompatibilityProvider with LazyLogging { - - override def createExecutionEnvPreparer( - jobConfig: FlinkJobConfig, - executionConfigPreparer: ExecutionConfigPreparer - ): StreamExecutionEnvPreparer = { - - new DefaultStreamExecutionEnvPreparer(jobConfig, executionConfigPreparer) { - - override def flinkClassLoaderSimulation: ClassLoader = { - FlinkUserCodeClassLoaders.childFirst( - Array.empty, - Thread.currentThread().getContextClassLoader, - Array.empty, - (t: Throwable) => throw t, - true - ) - } - - override def sideOutputGetter[T]( - singleOutputStreamOperator: SingleOutputStreamOperator[_], - outputTag: OutputTag[T] - ): DataStream[T] = { - singleOutputStreamOperator.getSideOutput(outputTag) - } - - } - } - -} diff --git a/flink114/model/src/main/scala/pl/touk/nussknacker/engine/flink/api/typeinformation/PreFlink119TypeInformationDetection.scala b/flink114/model/src/main/scala/pl/touk/nussknacker/engine/flink/api/typeinformation/PreFlink119TypeInformationDetection.scala deleted file mode 100644 index e28f20b..0000000 --- a/flink114/model/src/main/scala/pl/touk/nussknacker/engine/flink/api/typeinformation/PreFlink119TypeInformationDetection.scala +++ /dev/null @@ -1,18 +0,0 @@ -package pl.touk.nussknacker.engine.flink.api.typeinformation - -import org.apache.flink.api.common.typeutils.{CompositeTypeSerializerUtil, TypeSerializer, TypeSerializerSnapshot} -import pl.touk.nussknacker.engine.process.typeinformation.TypingResultAwareTypeInformationDetection - -class PreFlink119TypeInformationDetection extends TypingResultAwareTypeInformationDetection { - - override protected def constructIntermediateCompatibilityResult( - newNestedSerializers: Array[TypeSerializer[_]], - oldNestedSerializerSnapshots: Array[TypeSerializerSnapshot[_]] - ): CompositeTypeSerializerUtil.IntermediateCompatibilityResult[Nothing] = - CompositeTypeSerializerUtil.constructIntermediateCompatibilityResult( - newNestedSerializers, - oldNestedSerializerSnapshots - ) - - override def priority: Int = 0 -} diff --git a/flink114/model/src/test/resources/META-INF/services/pl.touk.nussknacker.engine.api.component.ComponentProvider b/flink114/model/src/test/resources/META-INF/services/pl.touk.nussknacker.engine.api.component.ComponentProvider deleted file mode 100644 index 919bf8b..0000000 --- a/flink114/model/src/test/resources/META-INF/services/pl.touk.nussknacker.engine.api.component.ComponentProvider +++ /dev/null @@ -1 +0,0 @@ -pl.touk.nussknacker.compatibility.common.MockKafkaComponentProvider diff --git a/flink114/model/src/test/resources/application.conf b/flink114/model/src/test/resources/application.conf deleted file mode 100644 index e0c2bf1..0000000 --- a/flink114/model/src/test/resources/application.conf +++ /dev/null @@ -1,10 +0,0 @@ -checkpointConfig { - checkpointInterval: 10s -} -timeout: 10s -asyncExecutionConfig { - bufferSize: 200 - workers: 8 -} - -components.mockKafka.disabled: true diff --git a/flink114/model/src/test/scala/org/apache/flink/client/deployment/executors/PipelineExecutorUtils.java b/flink114/model/src/test/scala/org/apache/flink/client/deployment/executors/PipelineExecutorUtils.java deleted file mode 100644 index 0eda5d9..0000000 --- a/flink114/model/src/test/scala/org/apache/flink/client/deployment/executors/PipelineExecutorUtils.java +++ /dev/null @@ -1,63 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.client.deployment.executors; - -import org.apache.flink.api.common.JobID; -import org.apache.flink.api.dag.Pipeline; -import org.apache.flink.client.FlinkPipelineTranslationUtil; -import org.apache.flink.client.cli.ExecutionConfigAccessor; -import org.apache.flink.configuration.Configuration; -import org.apache.flink.configuration.PipelineOptionsInternal; -import org.apache.flink.runtime.jobgraph.JobGraph; - -import javax.annotation.Nonnull; - -import java.net.MalformedURLException; - -import static org.apache.flink.util.Preconditions.checkNotNull; - -// the class is copied from flink release-1.14 and pathed because we bind in MiniClusterExecutionEnvironment to flink16+ version with getJobGraph(Pipeline, Configuration, ClassLoader) -// but the method in flink-1.14 doesn't have ClassLoader argument which leads to binary incompatibility -public class PipelineExecutorUtils { - - public static JobGraph getJobGraph( - @Nonnull final Pipeline pipeline, - @Nonnull final Configuration configuration, - @Nonnull ClassLoader userClassloader // dummy parameter <- the only one line changed - ) throws MalformedURLException { - checkNotNull(pipeline); - checkNotNull(configuration); - - final ExecutionConfigAccessor executionConfigAccessor = - ExecutionConfigAccessor.fromConfiguration(configuration); - final JobGraph jobGraph = - FlinkPipelineTranslationUtil.getJobGraph( - pipeline, configuration, executionConfigAccessor.getParallelism()); - - configuration - .getOptional(PipelineOptionsInternal.PIPELINE_FIXED_JOB_ID) - .ifPresent(strJobID -> jobGraph.setJobID(JobID.fromHexString(strJobID))); - - jobGraph.addJars(executionConfigAccessor.getJars()); - jobGraph.setClasspaths(executionConfigAccessor.getClasspaths()); - jobGraph.setSavepointRestoreSettings(executionConfigAccessor.getSavepointRestoreSettings()); - - return jobGraph; - } -} diff --git a/flink114/model/src/test/scala/pl/touk/nussknacker/compatibility/Flink114GenericITSpec.scala b/flink114/model/src/test/scala/pl/touk/nussknacker/compatibility/Flink114GenericITSpec.scala deleted file mode 100644 index ddcc3eb..0000000 --- a/flink114/model/src/test/scala/pl/touk/nussknacker/compatibility/Flink114GenericITSpec.scala +++ /dev/null @@ -1,6 +0,0 @@ -package pl.touk.nussknacker.compatibility - -import org.scalatest.funsuite.AnyFunSuite -import pl.touk.nussknacker.compatibility.common.BaseGenericITSpec - -class Flink114GenericITSpec extends AnyFunSuite with BaseGenericITSpec with FlinkSpec diff --git a/flink114/model/src/test/scala/pl/touk/nussknacker/compatibility/Flink114SchemaCompatibilityTest.scala b/flink114/model/src/test/scala/pl/touk/nussknacker/compatibility/Flink114SchemaCompatibilityTest.scala deleted file mode 100644 index 3367022..0000000 --- a/flink114/model/src/test/scala/pl/touk/nussknacker/compatibility/Flink114SchemaCompatibilityTest.scala +++ /dev/null @@ -1,5 +0,0 @@ -package pl.touk.nussknacker.compatibility - -import pl.touk.nussknacker.compatibility.common.BaseSchemaCompatibilityTest - -class Flink114SchemaCompatibilityTest extends BaseSchemaCompatibilityTest diff --git a/flink114/model/src/test/scala/pl/touk/nussknacker/compatibility/Flink114TimestampTest.scala b/flink114/model/src/test/scala/pl/touk/nussknacker/compatibility/Flink114TimestampTest.scala deleted file mode 100644 index 3b0d62c..0000000 --- a/flink114/model/src/test/scala/pl/touk/nussknacker/compatibility/Flink114TimestampTest.scala +++ /dev/null @@ -1,16 +0,0 @@ -package pl.touk.nussknacker.compatibility - -import org.scalatest.matchers.should.Matchers -import pl.touk.nussknacker.compatibility.common.BaseTimestampTest -import pl.touk.nussknacker.engine.process.helpers.TestResultsHolder - -import java.lang - -class Flink114TimestampTest extends BaseTimestampTest with FlinkSpec with Matchers { - override protected val sinkForLongsResultsHolder: () => TestResultsHolder[lang.Long] = - () => Flink114TimestampTest.sinkForLongsResultsHolder -} - -object Flink114TimestampTest extends Serializable { - private val sinkForLongsResultsHolder = new TestResultsHolder[java.lang.Long] -} diff --git a/flink114/model/src/test/scala/pl/touk/nussknacker/compatibility/FlinkSpec.scala b/flink114/model/src/test/scala/pl/touk/nussknacker/compatibility/FlinkSpec.scala deleted file mode 100644 index a8e3e72..0000000 --- a/flink114/model/src/test/scala/pl/touk/nussknacker/compatibility/FlinkSpec.scala +++ /dev/null @@ -1,38 +0,0 @@ -package pl.touk.nussknacker.compatibility - -import org.apache.flink.configuration.{Configuration, CoreOptions} -import org.scalatest.{BeforeAndAfterAll, Suite} -import pl.touk.nussknacker.engine.flink.test.FlinkMiniClusterHolder.AdditionalEnvironmentConfig -import pl.touk.nussknacker.engine.flink.test.{FlinkMiniClusterHolder, FlinkTestConfiguration} - -trait FlinkSpec extends BeforeAndAfterAll { - self: Suite => - - var flinkMiniCluster: FlinkMiniClusterHolder = _ - - override protected def beforeAll(): Unit = { - super.beforeAll() - - val userFlinkClusterConfig = prepareFlinkConfiguration() - userFlinkClusterConfig.set[java.lang.Boolean](CoreOptions.FILESYTEM_DEFAULT_OVERRIDE, true) - flinkMiniCluster = FlinkMiniClusterHolder(userFlinkClusterConfig, prepareEnvConfig()) - flinkMiniCluster.start() - } - - protected def prepareEnvConfig(): AdditionalEnvironmentConfig = { - AdditionalEnvironmentConfig() - } - - protected def prepareFlinkConfiguration(): Configuration = { - FlinkTestConfiguration.configuration() - } - - override protected def afterAll(): Unit = { - try { - flinkMiniCluster.stop() - } finally { - super.afterAll() - } - } - -} diff --git a/project/build.properties b/project/build.properties index 0b699c3..3829f19 100644 --- a/project/build.properties +++ b/project/build.properties @@ -1 +1 @@ -sbt.version=1.10.2 +sbt.version=1.10.2 \ No newline at end of file