diff --git a/pom.xml b/pom.xml
index 5bf772d87..26c0daeab 100644
--- a/pom.xml
+++ b/pom.xml
@@ -31,6 +31,9 @@
schedoscope-confschedoscope-exportschedoscope-core
+ schedoscope-transformation-oozie
+ schedoscope-transformation-pig
+ schedoscope-transformation-shellschedoscope-tutorialschedoscope-metascope
diff --git a/schedoscope-conf/src/main/resources/reference.conf b/schedoscope-conf/src/main/resources/reference.conf
index 9e1ddb798..f1466cea8 100644
--- a/schedoscope-conf/src/main/resources/reference.conf
+++ b/schedoscope-conf/src/main/resources/reference.conf
@@ -519,65 +519,6 @@ schedoscope {
},
- #
- # Settings for Pig transformations
- #
-
- pig: {
-
- #
- # Class implementing the Pig driver
- #
-
- driverClassName = "org.schedoscope.scheduler.driver.PigDriver"
-
- #
- # Location where to put Pig library jar in HDFS
- #
-
- location = "/tmp/soda/pig/"
-
- #
- # Ignored
- #
-
- libDirectory = ""
-
- #
- # Ignored.
- #
-
- url = ""
-
- #
- # Do not change. Pig jars should not be unpacked in HDFS.
- #
-
- unpack = false
-
- #
- # Number of parallel Driver actors to use for executing Pig
- # transformations
- #
-
- concurrency = 10
-
- #
- # Timeout for Pig transformations.
- #
-
- timeout = 1 day
-
- #
- # The handlers being notified after each driver run has
- # finished (succeeded or failed). These must implement the
- # trait org.schedoscope.scheduler.driver.DriverRunCompletionHandler
- #
-
- driverRunCompletionHandlers = ["org.schedoscope.scheduler.driver.DoNothingCompletionHandler"]
-
- },
-
#
# Settings for MapReduce transformations
#
@@ -638,67 +579,6 @@ schedoscope {
},
- #
- # Oozie Driver settings
- #
-
- oozie: {
-
- #
- # Class implementing the Oozie driver
- #
-
- driverClassName = "org.schedoscope.scheduler.driver.OozieDriver"
-
- #
- # Where to put Oozie bundles in HDFS
- #
-
- location = "/tmp/schedoscope/oozie/"
-
- #
- # Comma-separated list of directories where additional Oozie workflow
- # bundle jars can be found that are to be put onto HDFS when launching
- # Schedoscope.
- #
-
- libDirectory = ""
-
- #
- # URL of Oozie Server
- #
-
- url = "http://localhost:11000/oozie"
-
- #
- # Number of parallel Driver actors to use for executing Oozie
- # transformations
- #
-
- concurrency = 10
-
- #
- # Oozie bundle jars need to be unpacked in HDFS.
- #
-
- unpack = true
-
- #
- # Timeout for Oozie transformations
- #
-
- timeout = 1 day
-
- #
- # The handlers being notified after each driver run has
- # finished (succeeded or failed). These must implement the
- # trait org.schedoscope.scheduler.driver.DriverRunCompletionHandler
- #
-
- driverRunCompletionHandlers = ["org.schedoscope.scheduler.driver.DoNothingCompletionHandler"]
-
- },
-
#
# File system driver settings
#
@@ -815,65 +695,7 @@ schedoscope {
driverRunCompletionHandlers = ["org.schedoscope.scheduler.driver.DoNothingCompletionHandler"]
- },
-
- #
- # Shell driver settings
- #
-
- shell: {
-
- #
- # Class implementing the Shell driver
- #
-
- driverClassName = "org.schedoscope.scheduler.driver.ShellDriver"
-
- #
- # Number of parallel Shell Driver actors to use
- #
-
- concurrency = 1
-
- #
- # Ignored
- #
-
- location = "/"
-
- #
- # Ignored
- #
-
- libDirectory = ""
-
- #
- # Ignored
- #
-
- url = ""
-
- #
- # Ignored
- #
-
- unpack = false
-
- #
- # Timeout for Shell transformations
- #
-
- timeout = 1 day
-
- #
- # The handlers being notified after each driver run has
- # finished (succeeded or failed). These must implement the
- # trait org.schedoscope.scheduler.driver.DriverRunCompletionHandler
- #
-
- driverRunCompletionHandlers = ["org.schedoscope.scheduler.driver.DoNothingCompletionHandler"]
}
-
}
}
diff --git a/schedoscope-core/pom.xml b/schedoscope-core/pom.xml
index 62627b188..44ecb8add 100644
--- a/schedoscope-core/pom.xml
+++ b/schedoscope-core/pom.xml
@@ -51,6 +51,11 @@
schedoscope-conf${schedoscope.version}
+
+ guava
+ com.google.guava
+ 11.0
+ joda-timejoda-time
@@ -120,6 +125,12 @@
com.typesafe.akkaakka-contrib_${scala.version}${akka.version}
+
+
+ guava
+ com.google.guava
+
+ com.typesafe.akka
@@ -159,6 +170,10 @@
slf4j-apiorg.slf4j
+
+ guava
+ com.google.guava
+
@@ -174,20 +189,9 @@
slf4j-apiorg.slf4j
-
-
-
- org.apache.hadoop
- hadoop-minicluster
- ${hadoop.version}
-
- slf4j-log4j12
- org.slf4j
-
-
- slf4j-api
- org.slf4j
+ guava
+ com.google.guava
@@ -201,71 +205,9 @@
slf4j-apiorg.slf4j
-
-
-
- org.apache.pig
- pig
- ${pig.version}
-
- slf4j-api
- org.slf4j
-
-
- slf4j-log4j12
- org.slf4j
-
-
-
-
- com.twitter
- parquet-pig-bundle
- ${pig.parquet.version}
-
-
- org.apache.hive.hcatalog
- hive-hcatalog-pig-adapter
- ${hive.hcatalog.version}
-
-
- slf4j-log4j12
- org.slf4j
-
-
- slf4j-api
- org.slf4j
-
-
-
-
- org.apache.oozie
- oozie-client
- ${oozie.version}
-
-
- slf4j-api
- org.slf4j
-
-
- slf4j-simple
- org.slf4j
-
-
-
-
- minioozie
- minioozie
- ${minioozie.version}
- provided
-
-
- slf4j-log4j12
- org.slf4j
-
-
- slf4j-api
- org.slf4j
+ guava
+ com.google.guava
@@ -298,6 +240,10 @@
slf4j-apiorg.slf4j
+
+ guava
+ com.google.guava
+
diff --git a/schedoscope-core/src/main/scala/org/schedoscope/dsl/transformations/FilesystemTransformation.scala b/schedoscope-core/src/main/scala/org/schedoscope/dsl/transformations/FilesystemTransformation.scala
index c46d67298..e01ad9be3 100644
--- a/schedoscope-core/src/main/scala/org/schedoscope/dsl/transformations/FilesystemTransformation.scala
+++ b/schedoscope-core/src/main/scala/org/schedoscope/dsl/transformations/FilesystemTransformation.scala
@@ -18,6 +18,7 @@ package org.schedoscope.dsl.transformations
import java.io.InputStream
import org.schedoscope.dsl.View
+import org.schedoscope.scheduler.service.ViewTransformationStatus
/**
* FileSystem transformations: compute views by copying or moving files
@@ -31,44 +32,82 @@ class FilesystemTransformation extends Transformation {
* Copy a file from one directory to the view's fullPath
*
*/
-case class CopyFrom(val fromPattern: String, val toView: View, val recursive: Boolean = true) extends FilesystemTransformation
+case class CopyFrom(val fromPattern: String, val toView: View, val recursive: Boolean = true) extends FilesystemTransformation {
+  // Status for the Schedoscope service: source pattern, URL path of the target view, and the recursive flag.
+  override def viewTransformationStatus = {
+    val properties = Map("from" -> fromPattern, "destinationView" -> toView.urlPath, "recursive" -> recursive.toString)
+    ViewTransformationStatus("filesystem -> CopyFromTransformation", Some(properties))
+  }
+}
/**
* Retrieve contents from a stream and store it on the view's fullPath
*
*/
-case class StoreFrom(val inputStream: InputStream, val toView: View) extends FilesystemTransformation
+case class StoreFrom(val inputStream: InputStream, val toView: View) extends FilesystemTransformation {
+  // Status for the Schedoscope service: only the target view's URL path; the input stream is not included.
+  override def viewTransformationStatus =
+    ViewTransformationStatus("filesystem -> StoreFromTransformation", Some(Map("destinationView" -> toView.urlPath)))
+}
/**
* Copy file satisfying fromPattern to toPath
*
*/
-case class Copy(val fromPattern: String, val toPath: String, val recursive: Boolean = true) extends FilesystemTransformation
+case class Copy(val fromPattern: String, val toPath: String, val recursive: Boolean = true) extends FilesystemTransformation {
+  // Status for the Schedoscope service; reports the recursive flag as well, in line with CopyFrom and Delete.
+  override def viewTransformationStatus = ViewTransformationStatus(
+    "filesystem -> CopyTransformation",
+    Some(Map("from" -> fromPattern,
+      "destinationPath" -> toPath, "recursive" -> recursive.toString)))
+}
/**
* Move files satisfying fromPattern to toPath
*
*/
-case class Move(val fromPattern: String, val toPath: String) extends FilesystemTransformation
+case class Move(val fromPattern: String, val toPath: String) extends FilesystemTransformation {
+  // Status for the Schedoscope service: the source pattern and the path the files are moved to.
+  override def viewTransformationStatus = {
+    val properties = Map("from" -> fromPattern, "destinationPath" -> toPath)
+    ViewTransformationStatus("filesystem -> MoveTransformation", Some(properties))
+  }
+}
/**
*
* Delete files satisfying fromPattern
*
*/
-case class Delete(val fromPattern: String, val recursive: Boolean = false) extends FilesystemTransformation
+case class Delete(val fromPattern: String, val recursive: Boolean = false) extends FilesystemTransformation {
+  // Status for the Schedoscope service: the pattern of the files to delete and the recursive flag.
+  override def viewTransformationStatus = {
+    val properties = Map("from" -> fromPattern, "recursive" -> recursive.toString)
+    ViewTransformationStatus("filesystem -> DeleteTransformation", Some(properties))
+  }
+}
/**
* Touch an empty file
*
*/
-case class Touch(val fromPath: String) extends FilesystemTransformation
+case class Touch(val fromPath: String) extends FilesystemTransformation {
+  // Status for the Schedoscope service: only the path passed to the transformation.
+  override def viewTransformationStatus =
+    ViewTransformationStatus(
+      "filesystem -> TouchTransformation", Some(Map("from" -> fromPath)))
+}
/**
* Create a directory
*
*/
-case class MkDir(val fromPath: String) extends FilesystemTransformation
+case class MkDir(val fromPath: String) extends FilesystemTransformation {
+  // Status for the Schedoscope service: only the path passed to the transformation.
+  override def viewTransformationStatus =
+    ViewTransformationStatus(
+      "filesystem -> MkDirTransformation", Some(Map("from" -> fromPath)))
+}
/**
* Wraps a second transformation which will only be executed if the file specified as
diff --git a/schedoscope-core/src/main/scala/org/schedoscope/dsl/transformations/HiveTransformation.scala b/schedoscope-core/src/main/scala/org/schedoscope/dsl/transformations/HiveTransformation.scala
index 816629685..0cecea161 100644
--- a/schedoscope-core/src/main/scala/org/schedoscope/dsl/transformations/HiveTransformation.scala
+++ b/schedoscope-core/src/main/scala/org/schedoscope/dsl/transformations/HiveTransformation.scala
@@ -21,6 +21,7 @@ import org.apache.commons.lang.StringUtils
import org.apache.hadoop.hive.metastore.api.{Function, ResourceType, ResourceUri}
import org.schedoscope.Settings
import org.schedoscope.dsl.View
+import org.schedoscope.scheduler.service.ViewTransformationStatus
import scala.collection.JavaConversions._
import scala.collection.mutable.{HashMap, ListBuffer}
@@ -47,6 +48,9 @@ case class HiveTransformation(sql: String, udfs: List[Function] = List()) extend
throw new InvalidTransformationException("Uneven count of joins and ons in Hive query")
}
+  // Status for the Schedoscope service: the transformation name plus the SQL of this transformation.
+  override def viewTransformationStatus =
+    ViewTransformationStatus(name, Some(Map("sql" -> sql)))
}
diff --git a/schedoscope-core/src/main/scala/org/schedoscope/dsl/transformations/MapreduceTransformation.scala b/schedoscope-core/src/main/scala/org/schedoscope/dsl/transformations/MapreduceTransformation.scala
index 812e9ffc4..153147740 100644
--- a/schedoscope-core/src/main/scala/org/schedoscope/dsl/transformations/MapreduceTransformation.scala
+++ b/schedoscope-core/src/main/scala/org/schedoscope/dsl/transformations/MapreduceTransformation.scala
@@ -18,10 +18,13 @@ package org.schedoscope.dsl.transformations
import java.net.URI
import org.apache.commons.lang3.StringUtils
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat
import org.apache.hadoop.mapreduce.{Job, MRJobConfig}
import org.schedoscope.Schedoscope
import org.schedoscope.dsl.View
import org.schedoscope.scheduler.driver.{DriverRunState, MapreduceDriver}
+import org.schedoscope.scheduler.service.ViewTransformationStatus
/**
* Compute a view using a plain Map-Reduce job.
@@ -64,6 +67,12 @@ case class MapreduceTransformation(
.filter(lj => jarName == null || lj.contains(jarName))
}
+  override def viewTransformationStatus = {
+    val jobConf = job.getConfiguration() // look the job configuration up once for both properties
+    ViewTransformationStatus(name, Some(Map(
+      "input" -> jobConf.get(FileInputFormat.INPUT_DIR), "output" -> jobConf.get(FileOutputFormat.OUTDIR))))
+  }
+
def configure() {
// if job jar hasn't been registered, add all mapreduce libjars
// to distributed cache
diff --git a/schedoscope-core/src/main/scala/org/schedoscope/dsl/transformations/Transformation.scala b/schedoscope-core/src/main/scala/org/schedoscope/dsl/transformations/Transformation.scala
index 5ebf83970..752dd1038 100644
--- a/schedoscope-core/src/main/scala/org/schedoscope/dsl/transformations/Transformation.scala
+++ b/schedoscope-core/src/main/scala/org/schedoscope/dsl/transformations/Transformation.scala
@@ -16,6 +16,7 @@
package org.schedoscope.dsl.transformations
import org.schedoscope.dsl.View
+import org.schedoscope.scheduler.service.ViewTransformationStatus
import scala.collection.mutable.HashMap
@@ -102,6 +103,11 @@ abstract class Transformation {
@throws[InvalidTransformationException]
def validateTransformation() = {}
+
+  /** Status handed to the Schedoscope service. The default carries only the transformation name and
+    * no properties; transformation types should override this to return more detailed status. */
+  def viewTransformationStatus: ViewTransformationStatus =
+    ViewTransformationStatus(name, None)
}
/**
diff --git a/schedoscope-core/src/main/scala/org/schedoscope/scheduler/service/SchedoscopeServiceImpl.scala b/schedoscope-core/src/main/scala/org/schedoscope/scheduler/service/SchedoscopeServiceImpl.scala
index 956c6842a..e4f1b5df1 100644
--- a/schedoscope-core/src/main/scala/org/schedoscope/scheduler/service/SchedoscopeServiceImpl.scala
+++ b/schedoscope-core/src/main/scala/org/schedoscope/scheduler/service/SchedoscopeServiceImpl.scala
@@ -186,60 +186,7 @@ class SchedoscopeServiceImpl(actorSystem: ActorSystem, settings: SchedoscopeSett
private def getOrElse[T](o: T, d: T) = if (o != null) o else d
- private def viewTransformationStatus(transformation: Transformation): ViewTransformationStatus = {
- transformation match {
-
- case t: HiveTransformation => ViewTransformationStatus(
- t.name,
- Some(Map("sql" -> t.sql)))
-
- case t: MapreduceTransformation => ViewTransformationStatus(
- t.name,
- Some(Map(
- "input" -> t.job.getConfiguration().get(FileInputFormat.INPUT_DIR),
- "output" -> t.job.getConfiguration().get(FileOutputFormat.OUTDIR))))
-
- case t: PigTransformation => ViewTransformationStatus(
- t.name,
- Some(Map("latin" -> t.latin)))
-
- case t: OozieTransformation => ViewTransformationStatus(
- t.name,
- Some(Map(
- "bundle" -> t.bundle,
- "workflow" -> t.workflow)))
-
- case t: ShellTransformation => ViewTransformationStatus(
- t.name,
- Some(Map(
- "shell" -> t.shell, "script" -> t.script,
- "scriptFile" -> t.scriptFile)))
-
- case t: CopyFrom => ViewTransformationStatus(
- "filesystem -> CopyFromTransformation",
- Some(Map(
- "from" -> t.fromPattern,
- "destinationView" -> t.toView.urlPath, "recursive" -> t.recursive.toString())))
-
- case t: Copy => ViewTransformationStatus(
- "filesystem -> CopyTransformation",
- Some(Map(
- "from" -> t.fromPattern,
- "destinationPath" -> t.toPath)))
-
- case t: Move => ViewTransformationStatus(
- "filesystem -> MoveTransformation",
- Some(Map(
- "from" -> t.fromPattern,
- "destinationPath" -> t.toPath)))
-
- case t: StoreFrom => ViewTransformationStatus(
- "filesystem -> StoreFromTransformation",
- Some(Map("destinationView" -> t.toView.urlPath)))
-
- case t => ViewTransformationStatus(t.name, None)
- }
- }
+ private def viewTransformationStatus(transformation: Transformation): ViewTransformationStatus = transformation.viewTransformationStatus // delegates to the transformation, which builds its own status
private def viewExportStatus(exports: List[Transformation]): List[ViewTransformationStatus] = {
exports.map(e =>
diff --git a/schedoscope-core/src/main/scala/org/schedoscope/test/TestableView.scala b/schedoscope-core/src/main/scala/org/schedoscope/test/TestableView.scala
index df3946158..d331742c8 100644
--- a/schedoscope-core/src/main/scala/org/schedoscope/test/TestableView.scala
+++ b/schedoscope-core/src/main/scala/org/schedoscope/test/TestableView.scala
@@ -17,7 +17,6 @@ package org.schedoscope.test
import org.apache.hadoop.fs.Path
import org.schedoscope.dsl.{FieldLike, Structure, View}
-import org.schedoscope.test.resources.OozieTestResources
import scala.collection.mutable.ListBuffer
@@ -231,12 +230,3 @@ trait test extends TestableView {
})
}
}
-
-/**
- * A test environment that is executed in a local minicluster
- */
-trait clustertest extends test {
- resources = new OozieTestResources()
-
- def cluster = resources.asInstanceOf[OozieTestResources].mo
-}
diff --git a/schedoscope-core/src/test/scala/org/schedoscope/TestTags.scala b/schedoscope-core/src/test/scala/org/schedoscope/TestTags.scala
deleted file mode 100644
index 7637c8c9a..000000000
--- a/schedoscope-core/src/test/scala/org/schedoscope/TestTags.scala
+++ /dev/null
@@ -1,24 +0,0 @@
-/**
- * Copyright 2015 Otto (GmbH & Co KG)
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.schedoscope
-
-import org.scalatest.Tag
-
-object DriverTests extends Tag("driverTests")
-
-object OozieTests extends Tag("oozieTests")
-
-object ShellTests extends Tag("shellTests")
\ No newline at end of file
diff --git a/schedoscope-core/src/test/scala/org/schedoscope/dsl/DslTest.scala b/schedoscope-core/src/test/scala/org/schedoscope/dsl/DslTest.scala
index a811400af..64ec8c8e2 100644
--- a/schedoscope-core/src/test/scala/org/schedoscope/dsl/DslTest.scala
+++ b/schedoscope-core/src/test/scala/org/schedoscope/dsl/DslTest.scala
@@ -24,7 +24,7 @@ import org.schedoscope.dsl.storageformats.{Parquet, TextFile}
import org.schedoscope.dsl.transformations.{HiveTransformation, NoOp}
import org.schedoscope.dsl.views.{DailyParameterization, JobMetadata, PointOccurrence}
import org.schedoscope.schema.ddl.HiveQl.ddl
-import test.eci.datahub._
+import test.views._
class DslTest extends FlatSpec with Matchers {
@@ -43,7 +43,7 @@ class DslTest extends FlatSpec with Matchers {
it should "have a module name which is defined by its package" in {
val productBrandView = ProductBrand(p("ec0106"), p("2014"), p("01"), p("01"))
- productBrandView.module shouldEqual "test_eci_datahub"
+ productBrandView.module shouldEqual "test_views"
}
it should "be commentable" in {
@@ -234,7 +234,7 @@ class DslTest extends FlatSpec with Matchers {
val ddlStatement = ddl(productBrand)
- ddlStatement.contains("CREATE EXTERNAL TABLE IF NOT EXISTS dev_test_eci_datahub.product_brand (") shouldBe true
+ ddlStatement.contains("CREATE EXTERNAL TABLE IF NOT EXISTS dev_test_views.product_brand (") shouldBe true
ddlStatement.contains("occurred_at STRING,") shouldBe true
ddlStatement.contains("product_id STRING,") shouldBe true
ddlStatement.contains("brand_name STRING,") shouldBe true
@@ -244,7 +244,7 @@ class DslTest extends FlatSpec with Matchers {
ddlStatement.contains("COMMENT 'ProductBrand joins brands with products'") shouldBe true
ddlStatement.contains("PARTITIONED BY (shop_code STRING, year STRING, month STRING, day STRING, date_id STRING)") shouldBe true
ddlStatement.contains("STORED AS PARQUET") shouldBe true
- ddlStatement.contains("LOCATION '/hdp/dev/test/eci/datahub/product_brand'") shouldBe true
+ ddlStatement.contains("LOCATION '/hdp/dev/test/views/product_brand'") shouldBe true
}
it should "be transformable into DDL for an env" in {
@@ -253,7 +253,7 @@ class DslTest extends FlatSpec with Matchers {
val ddlStatement = ddl(productBrand)
- ddlStatement.contains("CREATE EXTERNAL TABLE IF NOT EXISTS prod_test_eci_datahub.product_brand (") shouldBe true
+ ddlStatement.contains("CREATE EXTERNAL TABLE IF NOT EXISTS prod_test_views.product_brand (") shouldBe true
ddlStatement.contains("occurred_at STRING,") shouldBe true
ddlStatement.contains("product_id STRING,") shouldBe true
ddlStatement.contains("brand_name STRING,") shouldBe true
@@ -263,7 +263,7 @@ class DslTest extends FlatSpec with Matchers {
ddlStatement.contains("COMMENT 'ProductBrand joins brands with products'") shouldBe true
ddlStatement.contains("PARTITIONED BY (shop_code STRING, year STRING, month STRING, day STRING, date_id STRING)") shouldBe true
ddlStatement.contains("STORED AS PARQUET") shouldBe true
- ddlStatement.contains("LOCATION '/hdp/prod/test/eci/datahub/product_brand'") shouldBe true
+ ddlStatement.contains("LOCATION '/hdp/prod/test/views/product_brand'") shouldBe true
}
@@ -327,7 +327,7 @@ class DslTest extends FlatSpec with Matchers {
}
it should "be dynamically instantiatable via URL path" in {
- val views = View.viewsFromUrl("dev", "/test.eci.datahub/Product/e(EC0106,EC0101)/rymd(20140224-20131202)/")
+ val views = View.viewsFromUrl("dev", "/test.views/Product/e(EC0106,EC0101)/rymd(20140224-20131202)/")
views.length shouldBe 2 * 85
@@ -343,12 +343,12 @@ class DslTest extends FlatSpec with Matchers {
}
it should "throw an exception during dynamic instantiation" in {
- val thrown = the[java.lang.RuntimeException] thrownBy View.viewsFromUrl("dev", "/test.eci.datahub/RequireView/ec0106/")
- thrown.getMessage() shouldBe "Error while parsing view(s) /test.eci.datahub/RequireView/ec0106/ : requirement failed: Put in upper case: ec0106"
+ val thrown = the[java.lang.RuntimeException] thrownBy View.viewsFromUrl("dev", "/test.views/RequireView/ec0106/")
+ thrown.getMessage() shouldBe "Error while parsing view(s) /test.views/RequireView/ec0106/ : requirement failed: Put in upper case: ec0106"
}
it should "have the same urlPath as the one they were dynamically constructed with" in {
- val views = View.viewsFromUrl("dev", "test.eci.datahub/Product/EC0106/2014/02/24/20140224")
+ val views = View.viewsFromUrl("dev", "test.views/Product/EC0106/2014/02/24/20140224")
views.length shouldBe 1
@@ -356,13 +356,13 @@ class DslTest extends FlatSpec with Matchers {
val productParameters = product.partitionParameters
- product.urlPath shouldBe "test.eci.datahub/Product/EC0106/2014/02/24/20140224"
+ product.urlPath shouldBe "test.views/Product/EC0106/2014/02/24/20140224"
}
it should "be queryable" in {
- val views = View.viewsInPackage("test.eci.datahub")
+ val views = View.viewsInPackage("test.views")
- views should contain allOf(classOf[Brand], classOf[Product], classOf[ProductBrand], classOf[EdgeCasesView], classOf[AvroView], classOf[ViewWithDefaultParams], classOf[Click], classOf[ClickOfEC0101], classOf[ClickOfEC0101ViaOozie])
+ views should contain allOf(classOf[Brand], classOf[Product], classOf[ProductBrand], classOf[EdgeCasesView], classOf[AvroView], classOf[ViewWithDefaultParams], classOf[Click], classOf[ClickOfEC0101])
val traits = View.getTraits(classOf[ProductBrand])
diff --git a/schedoscope-core/src/test/scala/org/schedoscope/dsl/transformations/ExportTest.scala b/schedoscope-core/src/test/scala/org/schedoscope/dsl/transformations/ExportTest.scala
index 502d8c3e7..ea3c4af54 100644
--- a/schedoscope-core/src/test/scala/org/schedoscope/dsl/transformations/ExportTest.scala
+++ b/schedoscope-core/src/test/scala/org/schedoscope/dsl/transformations/ExportTest.scala
@@ -24,13 +24,13 @@ import org.apache.curator.test.TestingServer
import org.json4s.native.JsonMethods.parse
import org.rarefiedredis.redis.adapter.jedis.JedisAdapter
import org.scalatest.{FlatSpec, Matchers}
-import org.schedoscope.{DriverTests, Schedoscope}
+import org.schedoscope.Schedoscope
import org.schedoscope.dsl.Field.v
import org.schedoscope.dsl.Parameter.p
import org.schedoscope.export.testsupport.{EmbeddedFtpSftpServer, EmbeddedKafkaCluster, SimpleTestKafkaConsumer}
import org.schedoscope.export.utils.RedisMRJedisFactory
import org.schedoscope.test.{rows, test}
-import _root_.test.eci.datahub._
+import _root_.test.views._
import scala.collection.JavaConversions.iterableAsScalaIterable
@@ -66,7 +66,7 @@ class ExportTest extends FlatSpec with Matchers {
v(url, "http://ec0106.com/url3"))
}
- "The test framework" should "execute hive transformations and perform JDBC export" taggedAs (DriverTests) in {
+ "The test framework" should "execute hive transformations and perform JDBC export" in {
new ClickOfEC0101WithJdbcExport(p("2014"), p("01"), p("01")) with test {
basedOn(ec0101Clicks, ec0106Clicks)
@@ -88,7 +88,7 @@ class ExportTest extends FlatSpec with Matchers {
}
val statement = dbConnection.createStatement()
- val resultSet = statement.executeQuery("SELECT COUNT(*) FROM TEST_TEST_ECI_DATAHUB_CLICK_OF_E_C0101_WITH_JDBC_EXPORT")
+ val resultSet = statement.executeQuery("SELECT COUNT(*) FROM TEST_TEST_VIEWS_CLICK_OF_E_C0101_WITH_JDBC_EXPORT")
resultSet.next()
resultSet.getInt(1) shouldBe 3
@@ -97,7 +97,7 @@ class ExportTest extends FlatSpec with Matchers {
statement.close()
}
- it should "execute hive transformations and perform Redis export" taggedAs (DriverTests) in {
+ it should "execute hive transformations and perform Redis export" in {
new ClickOfEC0101WithRedisExport(p("2014"), p("01"), p("01")) with test {
basedOn(ec0101Clicks, ec0106Clicks)
@@ -124,7 +124,7 @@ class ExportTest extends FlatSpec with Matchers {
}
- it should "execute hive transformations and perform Kafka export" taggedAs (DriverTests) in {
+ it should "execute hive transformations and perform Kafka export" in {
val zkServer = new TestingServer(2182);
zkServer.start()
@@ -160,7 +160,7 @@ class ExportTest extends FlatSpec with Matchers {
}
- it should "execute hive transformations and perform Ftp export" taggedAs (DriverTests) in {
+ it should "execute hive transformations and perform Ftp export" in {
val ftpServer = new EmbeddedFtpSftpServer()
ftpServer.startEmbeddedFtpServer()
diff --git a/schedoscope-core/src/test/scala/org/schedoscope/dsl/views/ViewUrlParserTest.scala b/schedoscope-core/src/test/scala/org/schedoscope/dsl/views/ViewUrlParserTest.scala
index f2f7d7e0f..9761c3426 100644
--- a/schedoscope-core/src/test/scala/org/schedoscope/dsl/views/ViewUrlParserTest.scala
+++ b/schedoscope-core/src/test/scala/org/schedoscope/dsl/views/ViewUrlParserTest.scala
@@ -20,47 +20,47 @@ import org.schedoscope.dsl.Parameter
import org.schedoscope.dsl.Parameter.p
import org.schedoscope.dsl.TypedAny.typedAny
import org.schedoscope.dsl.views.ViewUrlParser.{ParsedView, parse, parseParameters}
-import test.eci.datahub.{Brand, Product}
+import test.views.{Brand, Product}
class ViewUrlParserTest extends FlatSpec with Matchers {
"ViewUrlParse.parse(viewUrlPath)" should "start with /env/package/view" in {
- val List(ParsedView(env, clazz, arguments)) = parse("dev", "/test.eci.datahub/Brand")
+ val List(ParsedView(env, clazz, arguments)) = parse("dev", "/test.views/Brand")
env shouldBe "dev"
clazz shouldBe classOf[Brand]
arguments should be(empty)
}
it should "work without preceding /" in {
- val List(ParsedView(env, clazz, arguments)) = parse("dev", "test.eci.datahub/Brand")
+ val List(ParsedView(env, clazz, arguments)) = parse("dev", "test.views/Brand")
env shouldBe "dev"
clazz shouldBe classOf[Brand]
arguments should be(empty)
}
it should "work with trailing /" in {
- val List(ParsedView(env, clazz, arguments)) = parse("dev", "test.eci.datahub/Brand/")
+ val List(ParsedView(env, clazz, arguments)) = parse("dev", "test.views/Brand/")
env shouldBe "dev"
clazz shouldBe classOf[Brand]
arguments should be(empty)
}
it should "work with preceding and trailing /" in {
- val List(ParsedView(env, clazz, arguments)) = parse("dev", "/test.eci.datahub/Brand/")
+ val List(ParsedView(env, clazz, arguments)) = parse("dev", "/test.views/Brand/")
env shouldBe "dev"
clazz shouldBe classOf[Brand]
arguments should be(empty)
}
it should "parse parameters as well" in {
- val List(ParsedView(env, clazz, arguments)) = parse("dev", "/test.eci.datahub/Product/EC0106/2014/01/12/")
+ val List(ParsedView(env, clazz, arguments)) = parse("dev", "/test.views/Product/EC0106/2014/01/12/")
env shouldBe "dev"
clazz shouldBe classOf[Product]
arguments should be(List(typedAny(p("EC0106")), typedAny(p("2014")), typedAny(p("01")), typedAny(p("12"))))
}
it should "parse multiple views" in {
- val parsedViews = parse("dev", "/test.eci.datahub/e(Product,Brand)/EC0106/2014/01/12/")
+ val parsedViews = parse("dev", "/test.views/e(Product,Brand)/EC0106/2014/01/12/")
parsedViews.size shouldBe 2
parsedViews(0).env shouldBe "dev"
parsedViews(0).viewClass shouldBe classOf[Product]
@@ -71,11 +71,11 @@ class ViewUrlParserTest extends FlatSpec with Matchers {
}
it should "fail when called with not enough arguments" in {
- an[IllegalArgumentException] should be thrownBy parse("dev", "/test.eci.datahub/")
+ an[IllegalArgumentException] should be thrownBy parse("dev", "/test.views/")
}
it should "fail when called with illegal class name" in {
- an[IllegalArgumentException] should be thrownBy parse("dev", "/test.eci.datahub/Brund/")
+ an[IllegalArgumentException] should be thrownBy parse("dev", "/test.views/Brund/")
}
it should "fail when called with illegal package name" in {
diff --git a/schedoscope-core/src/test/scala/org/schedoscope/scheduler/driver/FilesystemDriverTest.scala b/schedoscope-core/src/test/scala/org/schedoscope/scheduler/driver/FilesystemDriverTest.scala
index 6aec5e908..912613659 100644
--- a/schedoscope-core/src/test/scala/org/schedoscope/scheduler/driver/FilesystemDriverTest.scala
+++ b/schedoscope-core/src/test/scala/org/schedoscope/scheduler/driver/FilesystemDriverTest.scala
@@ -18,21 +18,20 @@ package org.schedoscope.scheduler.driver
import java.io.File
import org.scalatest.{FlatSpec, Matchers}
-import org.schedoscope.DriverTests
import org.schedoscope.dsl.Parameter.p
import org.schedoscope.dsl.transformations.{FilesystemTransformation, _}
import org.schedoscope.test.resources.LocalTestResources
import org.schedoscope.test.resources.TestDriverRunCompletionHandlerCallCounter._
-import test.eci.datahub.Product
+import test.views.Product
class FilesystemDriverTest extends FlatSpec with Matchers with TestFolder {
lazy val driver: Driver[FilesystemTransformation] = new LocalTestResources().driverFor[FilesystemTransformation]("filesystem")
- "FileSystemDriver" should "be have transformation name filesystem" taggedAs (DriverTests) in {
+ "FileSystemDriver" should "be have transformation name filesystem" in {
driver.transformationName shouldBe "filesystem"
}
- it should "execute Copy file transformation with a single file" taggedAs (DriverTests) in {
+ it should "execute Copy file transformation with a single file" in {
createInputFile("aTest.file")
outputFile("aTest.file") should not be 'exists
@@ -42,7 +41,7 @@ class FilesystemDriverTest extends FlatSpec with Matchers with TestFolder {
outputFile("aTest.file") shouldBe 'exists
}
- it should "execute Copy file transformation of multiple files with pattern" taggedAs (DriverTests) in {
+ it should "execute Copy file transformation of multiple files with pattern" in {
createInputFile("aTest.file")
createInputFile("anotherTest.file")
@@ -55,7 +54,7 @@ class FilesystemDriverTest extends FlatSpec with Matchers with TestFolder {
outputFile("anotherTest.file") shouldBe 'exists
}
- it should "execute Copy file transformation recursively" taggedAs (DriverTests) in {
+ it should "execute Copy file transformation recursively" in {
createInputFile(s"subfolder${/}aTest.file")
createInputFile(s"subfolder${/}anotherSubfolder${/}anotherTest.file")
@@ -68,7 +67,7 @@ class FilesystemDriverTest extends FlatSpec with Matchers with TestFolder {
outputFile(s"subfolder${/}anotherSubfolder${/}anotherTest.file") shouldBe 'exists
}
- it should "execute Move file transformation with a single file" taggedAs (DriverTests) in {
+ it should "execute Move file transformation with a single file" in {
createInputFile("aTest.file")
inputFile("aTest.file") shouldBe 'exists
@@ -80,7 +79,7 @@ class FilesystemDriverTest extends FlatSpec with Matchers with TestFolder {
outputFile("aTest.file") shouldBe 'exists
}
- it should "execute Move file transformation with a folder recursively" taggedAs (DriverTests) in {
+ it should "execute Move file transformation with a folder recursively" in {
createInputFile(s"subfolder${/}aTest.file")
inputFile(s"subfolder${/}aTest.file") shouldBe 'exists
@@ -92,7 +91,7 @@ class FilesystemDriverTest extends FlatSpec with Matchers with TestFolder {
outputFile(s"subfolder${/}aTest.file") shouldBe 'exists
}
- it should "execute IfExists file transformation when a given file exists" taggedAs (DriverTests) in {
+ it should "execute IfExists file transformation when a given file exists" in {
createInputFile("check.file")
inputFile("check.file") shouldBe 'exists
@@ -107,7 +106,7 @@ class FilesystemDriverTest extends FlatSpec with Matchers with TestFolder {
outputFile("anotherTest.file") should not be 'exists
}
- it should "execute IfNotExists file transformation when a given file does not exist" taggedAs (DriverTests) in {
+ it should "execute IfNotExists file transformation when a given file does not exist" in {
createInputFile("check.file")
inputFile("check.file") shouldBe 'exists
@@ -122,7 +121,7 @@ class FilesystemDriverTest extends FlatSpec with Matchers with TestFolder {
outputFile("anotherTest.file") shouldBe 'exists
}
- it should "execute Touch file transformation" taggedAs (DriverTests) in {
+ it should "execute Touch file transformation" in {
outputFile("aTest.file") should not be 'exists
driver.runAndWait(Touch(outputPath("aTest.file"))) shouldBe a[DriverRunSucceeded[_]]
@@ -130,7 +129,7 @@ class FilesystemDriverTest extends FlatSpec with Matchers with TestFolder {
outputFile("aTest.file") shouldBe 'exists
}
- it should "execute Delete file transformations on single files" taggedAs (DriverTests) in {
+ it should "execute Delete file transformations on single files" in {
createInputFile("aTest.file")
inputFile("aTest.file") shouldBe 'exists
@@ -139,7 +138,7 @@ class FilesystemDriverTest extends FlatSpec with Matchers with TestFolder {
inputFile("aTest.file") should not be 'exists
}
- it should "execute Delete file transformations on folders recursively" taggedAs (DriverTests) in {
+ it should "execute Delete file transformations on folders recursively" in {
createInputFile(s"subfolder${/}aTest.file")
inputFile(s"subfolder${/}aTest.file") shouldBe 'exists
@@ -149,7 +148,7 @@ class FilesystemDriverTest extends FlatSpec with Matchers with TestFolder {
inputFile("subfolder") should not be 'exists
}
- it should "execute CopyFrom file transformations by copying a single file to partition path of view" taggedAs (DriverTests) in {
+ it should "execute CopyFrom file transformations by copying a single file to partition path of view" in {
val product = new Product(p("EC0106"), p("2014"), p("01"), p("01")) {
override def fullPath = out
}
@@ -162,7 +161,7 @@ class FilesystemDriverTest extends FlatSpec with Matchers with TestFolder {
new File(s"${product.fullPath}${/}aTest.file") shouldBe 'exists
}
- it should "execute CopyFrom file transformations by copying a single file from java resources to partition path of view" taggedAs (DriverTests) in {
+ it should "execute CopyFrom file transformations by copying a single file from java resources to partition path of view" in {
val product = new Product(p("EC0106"), p("2014"), p("01"), p("01")) {
override def fullPath = out
}
@@ -174,7 +173,7 @@ class FilesystemDriverTest extends FlatSpec with Matchers with TestFolder {
new File(s"${product.fullPath}${/}classpathtest.txt") shouldBe 'exists
}
- it should "execute StoreFrom file transformations by copying an input stream to partition path of view" taggedAs (DriverTests) in {
+ it should "execute StoreFrom file transformations by copying an input stream to partition path of view" in {
val product = new Product(p("EC0106"), p("2014"), p("01"), p("01")) {
override def fullPath = out
}
@@ -186,7 +185,7 @@ class FilesystemDriverTest extends FlatSpec with Matchers with TestFolder {
new File(s"${product.fullPath}${/}stream.out") shouldBe 'exists
}
- it should "execute CopyFrom file transformations by copying a folder recursively to partition path of view" taggedAs (DriverTests) in {
+ it should "execute CopyFrom file transformations by copying a folder recursively to partition path of view" in {
val product = new Product(p("EC0106"), p("2014"), p("01"), p("01")) {
override def fullPath = out
}
@@ -199,7 +198,7 @@ class FilesystemDriverTest extends FlatSpec with Matchers with TestFolder {
new File(s"${product.fullPath}${/}subfolder${/}aTest.file") shouldBe 'exists
}
- it should "run asynchronously" taggedAs (DriverTests) in {
+ it should "run asynchronously" in {
outputFile("aTest.file") should not be 'exists
val runHandle = driver.run(Touch(outputPath("aTest.file")))
@@ -214,7 +213,7 @@ class FilesystemDriverTest extends FlatSpec with Matchers with TestFolder {
outputFile("aTest.file") shouldBe 'exists
}
- it should "return DriverRunFailed in case of problems when running asynchronously" taggedAs (DriverTests) in {
+ it should "return DriverRunFailed in case of problems when running asynchronously" in {
createInputFile(s"subfolder${/}aTest.file")
inputFile("subfolder") shouldBe 'exists
@@ -230,7 +229,7 @@ class FilesystemDriverTest extends FlatSpec with Matchers with TestFolder {
inputFile("subfolder") shouldBe 'exists
}
- it should "return DriverRunFailed in case of problems when running synchronously" taggedAs (DriverTests) in {
+ it should "return DriverRunFailed in case of problems when running synchronously" in {
createInputFile(s"subfolder${/}aTest.file")
inputFile("subfolder") shouldBe 'exists
@@ -241,7 +240,7 @@ class FilesystemDriverTest extends FlatSpec with Matchers with TestFolder {
inputFile("subfolder") shouldBe 'exists
}
- it should "call its DriverRunCompletitionHandlers' driverRunCompleted upon request" taggedAs (DriverTests) in {
+ it should "call its DriverRunCompletitionHandlers' driverRunCompleted upon request" in {
val runHandle = driver.run(Touch(outputPath("aTest.file")))
while (driver.getDriverRunState(runHandle).isInstanceOf[DriverRunOngoing[FilesystemTransformation]]) {}
@@ -251,7 +250,7 @@ class FilesystemDriverTest extends FlatSpec with Matchers with TestFolder {
driverRunCompletedCalled(runHandle, driver.getDriverRunState(runHandle)) shouldBe true
}
- it should "call its DriverRunCompletitionHandlers' driverRunStarted upon request" taggedAs (DriverTests) in {
+ it should "call its DriverRunCompletitionHandlers' driverRunStarted upon request" in {
val runHandle = driver.run(Touch(outputPath("aTest.file")))
driver.driverRunStarted(runHandle)
diff --git a/schedoscope-core/src/test/scala/org/schedoscope/scheduler/driver/HiveDriverTest.scala b/schedoscope-core/src/test/scala/org/schedoscope/scheduler/driver/HiveDriverTest.scala
index be2a774bc..c23ba630c 100644
--- a/schedoscope-core/src/test/scala/org/schedoscope/scheduler/driver/HiveDriverTest.scala
+++ b/schedoscope-core/src/test/scala/org/schedoscope/scheduler/driver/HiveDriverTest.scala
@@ -16,7 +16,6 @@
package org.schedoscope.scheduler.driver
import org.scalatest.{FlatSpec, Matchers}
-import org.schedoscope.DriverTests
import org.schedoscope.dsl.transformations.HiveTransformation
import org.schedoscope.test.resources.LocalTestResources
import org.schedoscope.test.resources.TestDriverRunCompletionHandlerCallCounter._
@@ -24,30 +23,30 @@ import org.schedoscope.test.resources.TestDriverRunCompletionHandlerCallCounter.
class HiveDriverTest extends FlatSpec with Matchers {
lazy val driver = new LocalTestResources().driverFor[HiveTransformation]("hive")
- "HiveDriver" should "have transformation name hive" taggedAs (DriverTests) in {
+ "HiveDriver" should "have transformation name hive" in {
driver.transformationName shouldBe "hive"
}
- it should "execute hive transformations synchronously" taggedAs (DriverTests) in {
+ it should "execute hive transformations synchronously" in {
val driverRunState = driver.runAndWait(HiveTransformation("SHOW TABLES"))
driverRunState shouldBe a[DriverRunSucceeded[_]]
}
- it should "execute another hive transformations synchronously" taggedAs (DriverTests) in {
+ it should "execute another hive transformations synchronously" in {
val driverRunState = driver.runAndWait(HiveTransformation("SHOW TABLES"))
driverRunState shouldBe a[DriverRunSucceeded[_]]
}
- it should "execute hive transformations and return errors when running synchronously" taggedAs (DriverTests) in {
+ it should "execute hive transformations and return errors when running synchronously" in {
val driverRunState = driver.runAndWait(HiveTransformation("FAIL ME"))
driverRunState shouldBe a[DriverRunFailed[_]]
}
- it should "execute hive transformations asynchronously" taggedAs (DriverTests) in {
+ it should "execute hive transformations asynchronously" in {
val driverRunHandle = driver.run(HiveTransformation("SHOW TABLES"))
var runWasAsynchronous = false
@@ -59,7 +58,7 @@ class HiveDriverTest extends FlatSpec with Matchers {
driver.getDriverRunState(driverRunHandle) shouldBe a[DriverRunSucceeded[_]]
}
- it should "execute hive transformations and return errors when running asynchronously" taggedAs (DriverTests) in {
+ it should "execute hive transformations and return errors when running asynchronously" in {
val driverRunHandle = driver.run(HiveTransformation("FAIL ME"))
var runWasAsynchronous = false
@@ -71,7 +70,7 @@ class HiveDriverTest extends FlatSpec with Matchers {
driver.getDriverRunState(driverRunHandle) shouldBe a[DriverRunFailed[_]]
}
- it should "call its DriverRunCompletitionHandlers' driverRunCompleted upon request" taggedAs (DriverTests) in {
+ it should "call its DriverRunCompletitionHandlers' driverRunCompleted upon request" in {
val runHandle = driver.run(HiveTransformation("SHOW TABLES"))
while (driver.getDriverRunState(runHandle).isInstanceOf[DriverRunOngoing[_]]) {}
@@ -81,7 +80,7 @@ class HiveDriverTest extends FlatSpec with Matchers {
driverRunCompletedCalled(runHandle, driver.getDriverRunState(runHandle)) shouldBe true
}
- it should "call its DriverRunCompletitionHandlers' driverRunStarted upon request" taggedAs (DriverTests) in {
+ it should "call its DriverRunCompletitionHandlers' driverRunStarted upon request" in {
val runHandle = driver.run(HiveTransformation("SHOW TABLES"))
driver.driverRunStarted(runHandle)
diff --git a/schedoscope-core/src/test/scala/org/schedoscope/scheduler/driver/MapreduceDriverTest.scala b/schedoscope-core/src/test/scala/org/schedoscope/scheduler/driver/MapreduceDriverTest.scala
index 15c7e2080..7f9cd2759 100644
--- a/schedoscope-core/src/test/scala/org/schedoscope/scheduler/driver/MapreduceDriverTest.scala
+++ b/schedoscope-core/src/test/scala/org/schedoscope/scheduler/driver/MapreduceDriverTest.scala
@@ -24,7 +24,6 @@ import org.apache.hadoop.mapreduce.Job
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat
import org.scalatest.{FlatSpec, Matchers}
-import org.schedoscope.DriverTests
import org.schedoscope.dsl.View
import org.schedoscope.dsl.transformations.{FailingMapper, MapreduceTransformation}
import org.schedoscope.test.resources.LocalTestResources
@@ -58,23 +57,23 @@ class MapreduceDriverTest extends FlatSpec with Matchers with TestFolder {
Files.write(Paths.get(s"${inputPath("")}/file.txt"), "some data".getBytes(StandardCharsets.UTF_8))
}
- "MapreduceDriver" should "have transformation name Mapreduce" taggedAs (DriverTests) in {
+ "MapreduceDriver" should "have transformation name Mapreduce" in {
driver.transformationName shouldBe "mapreduce"
}
- it should "execute Mapreduce transformations synchronously" taggedAs (DriverTests) in {
+ it should "execute Mapreduce transformations synchronously" in {
val driverRunState = driver.runAndWait(MapreduceTransformation(new DummyView(), identityJob))
driverRunState shouldBe a[DriverRunSucceeded[_]]
}
- it should "execute another Mapreduce transformations synchronously" taggedAs (DriverTests) in {
+ it should "execute another Mapreduce transformations synchronously" in {
val driverRunState = driver.runAndWait(MapreduceTransformation(new DummyView(), identityJob))
driverRunState shouldBe a[DriverRunSucceeded[_]]
}
- it should "execute Mapreduce transformations asynchronously" taggedAs (DriverTests) in {
+ it should "execute Mapreduce transformations asynchronously" in {
val driverRunHandle = driver.run(MapreduceTransformation(new DummyView(), identityJob))
var runWasAsynchronous = false
@@ -86,7 +85,7 @@ class MapreduceDriverTest extends FlatSpec with Matchers with TestFolder {
driver.getDriverRunState(driverRunHandle) shouldBe a[DriverRunSucceeded[_]]
}
- it should "execute Mapreduce transformations and return errors when running asynchronously" taggedAs (DriverTests) in {
+ it should "execute Mapreduce transformations and return errors when running asynchronously" in {
val driverRunHandle = driver.run(MapreduceTransformation(new DummyView(), failingJob))
var runWasAsynchronous = false
@@ -98,7 +97,7 @@ class MapreduceDriverTest extends FlatSpec with Matchers with TestFolder {
driver.getDriverRunState(driverRunHandle) shouldBe a[DriverRunFailed[_]]
}
- it should "call its DriverRunCompletitionHandlers' driverRunCompleted upon request" taggedAs (DriverTests) in {
+ it should "call its DriverRunCompletitionHandlers' driverRunCompleted upon request" in {
val runHandle = driver.run(MapreduceTransformation(new DummyView(), identityJob))
while (driver.getDriverRunState(runHandle).isInstanceOf[DriverRunOngoing[_]]) {}
@@ -108,7 +107,7 @@ class MapreduceDriverTest extends FlatSpec with Matchers with TestFolder {
driverRunCompletedCalled(runHandle, driver.getDriverRunState(runHandle)) shouldBe true
}
- it should "call its DriverRunCompletitionHandlers' driverRunStarted upon request" taggedAs (DriverTests) in {
+ it should "call its DriverRunCompletitionHandlers' driverRunStarted upon request" in {
val runHandle = driver.run(MapreduceTransformation(new DummyView(), identityJob))
driver.driverRunStarted(runHandle)
diff --git a/schedoscope-core/src/test/scala/org/schedoscope/scheduler/driver/SeqDriverTest.scala b/schedoscope-core/src/test/scala/org/schedoscope/scheduler/driver/SeqDriverTest.scala
index 40b71deb7..1e4349ccf 100644
--- a/schedoscope-core/src/test/scala/org/schedoscope/scheduler/driver/SeqDriverTest.scala
+++ b/schedoscope-core/src/test/scala/org/schedoscope/scheduler/driver/SeqDriverTest.scala
@@ -16,7 +16,6 @@
package org.schedoscope.scheduler.driver
import org.scalatest.{FlatSpec, Matchers}
-import org.schedoscope.DriverTests
import org.schedoscope.dsl.transformations.{HiveTransformation, SeqTransformation, Transformation}
import org.schedoscope.test.resources.LocalTestResources
import org.schedoscope.test.resources.TestDriverRunCompletionHandlerCallCounter._
@@ -24,30 +23,30 @@ import org.schedoscope.test.resources.TestDriverRunCompletionHandlerCallCounter.
class SeqDriverTest extends FlatSpec with Matchers {
lazy val driver = new LocalTestResources().driverFor[SeqTransformation[Transformation, Transformation]]("seq")
- "SeqDriver" should "have transformation name seq" taggedAs (DriverTests) in {
+ "SeqDriver" should "have transformation name seq" in {
driver.transformationName shouldBe "seq"
}
- it should "execute seq transformations synchronously" taggedAs (DriverTests) in {
+ it should "execute seq transformations synchronously" in {
val driverRunState = driver.runAndWait(SeqTransformation(HiveTransformation("SHOW TABLES"), HiveTransformation("SHOW TABLES")))
driverRunState shouldBe a[DriverRunSucceeded[_]]
}
- it should "execute another seq transformations synchronously" taggedAs (DriverTests) in {
+ it should "execute another seq transformations synchronously" in {
val driverRunState = driver.runAndWait(SeqTransformation(HiveTransformation("SHOW TABLES"), HiveTransformation("SHOW TABLES")))
driverRunState shouldBe a[DriverRunSucceeded[_]]
}
- it should "execute seq transformations and return errors when running synchronously" taggedAs (DriverTests) in {
+ it should "execute seq transformations and return errors when running synchronously" in {
val driverRunState = driver.runAndWait(SeqTransformation(HiveTransformation("FAIL ME"), HiveTransformation("SHOW TABLES")))
driverRunState shouldBe a[DriverRunFailed[_]]
}
- it should "execute seq transformations asynchronously" taggedAs (DriverTests) in {
+ it should "execute seq transformations asynchronously" in {
val driverRunHandle = driver.run(SeqTransformation(HiveTransformation("SHOW TABLES"), HiveTransformation("SHOW TABLES")))
var runWasAsynchronous = false
@@ -59,7 +58,7 @@ class SeqDriverTest extends FlatSpec with Matchers {
driver.getDriverRunState(driverRunHandle) shouldBe a[DriverRunSucceeded[_]]
}
- it should "succeed when first transformation is driving and second errorneous" taggedAs (DriverTests) in {
+ it should "succeed when first transformation is driving and second errorneous" in {
val driverRunHandle = driver.run(SeqTransformation(HiveTransformation("SHOW TABLES"), HiveTransformation("FAIL ME")))
var runWasAsynchronous = false
@@ -71,7 +70,7 @@ class SeqDriverTest extends FlatSpec with Matchers {
driver.getDriverRunState(driverRunHandle) shouldBe a[DriverRunSucceeded[_]]
}
- it should "fail when first transformation is not driving and second errorneous" taggedAs (DriverTests) in {
+ it should "fail when first transformation is not driving and second errorneous" in {
val driverRunHandle = driver.run(SeqTransformation(HiveTransformation("SHOW TABLES"), HiveTransformation("FAIL ME"), false))
var runWasAsynchronous = false
@@ -83,7 +82,7 @@ class SeqDriverTest extends FlatSpec with Matchers {
driver.getDriverRunState(driverRunHandle) shouldBe a[DriverRunFailed[_]]
}
- it should "execute seq transformations and return errors when running asynchronously" taggedAs (DriverTests) in {
+ it should "execute seq transformations and return errors when running asynchronously" in {
val driverRunHandle = driver.run(SeqTransformation(HiveTransformation("FAIL ME"), HiveTransformation("SHOW TABLES")))
var runWasAsynchronous = false
@@ -95,7 +94,7 @@ class SeqDriverTest extends FlatSpec with Matchers {
driver.getDriverRunState(driverRunHandle) shouldBe a[DriverRunFailed[_]]
}
- it should "call its DriverRunCompletitionHandlers' driverRunCompleted upon request" taggedAs (DriverTests) in {
+ it should "call its DriverRunCompletitionHandlers' driverRunCompleted upon request" in {
val runHandle = driver.run(SeqTransformation(HiveTransformation("SHOW TABLES"), HiveTransformation("SHOW TABLES")))
while (driver.getDriverRunState(runHandle).isInstanceOf[DriverRunOngoing[_]]) {}
@@ -105,7 +104,7 @@ class SeqDriverTest extends FlatSpec with Matchers {
driverRunCompletedCalled(runHandle, driver.getDriverRunState(runHandle)) shouldBe true
}
- it should "call its DriverRunCompletitionHandlers' driverRunStarted upon request" taggedAs (DriverTests) in {
+ it should "call its DriverRunCompletitionHandlers' driverRunStarted upon request" in {
val runHandle = driver.run(SeqTransformation(HiveTransformation("SHOW TABLES"), HiveTransformation("SHOW TABLES")))
driver.driverRunStarted(runHandle)
diff --git a/schedoscope-core/src/test/scala/org/schedoscope/scheduler/states/IntermediateViewSchedulingStateMachineTest.scala b/schedoscope-core/src/test/scala/org/schedoscope/scheduler/states/IntermediateViewSchedulingStateMachineTest.scala
index 6cab2f043..111d2b6bd 100644
--- a/schedoscope-core/src/test/scala/org/schedoscope/scheduler/states/IntermediateViewSchedulingStateMachineTest.scala
+++ b/schedoscope-core/src/test/scala/org/schedoscope/scheduler/states/IntermediateViewSchedulingStateMachineTest.scala
@@ -20,7 +20,7 @@ import org.schedoscope.dsl.Parameter.p
import org.schedoscope.dsl.transformations.Checksum.defaultDigest
import org.schedoscope.scheduler.messages.MaterializeViewMode._
import org.schedoscope.scheduler.states.PartyInterestedInViewSchedulingStateChange._
-import test.eci.datahub.{ProductBrandMaterializeOnce, ProductBrandsNoOpMirror}
+import test.views.{ProductBrandMaterializeOnce, ProductBrandsNoOpMirror}
class IntermediateViewSchedulingStateMachineTest extends FlatSpec with Matchers {
diff --git a/schedoscope-core/src/test/scala/org/schedoscope/scheduler/states/NoOpIntermediateViewSchedulingStateMachineTest.scala b/schedoscope-core/src/test/scala/org/schedoscope/scheduler/states/NoOpIntermediateViewSchedulingStateMachineTest.scala
index 434a1a57d..f05330bee 100644
--- a/schedoscope-core/src/test/scala/org/schedoscope/scheduler/states/NoOpIntermediateViewSchedulingStateMachineTest.scala
+++ b/schedoscope-core/src/test/scala/org/schedoscope/scheduler/states/NoOpIntermediateViewSchedulingStateMachineTest.scala
@@ -19,7 +19,7 @@ import org.scalatest.{FlatSpec, Matchers}
import org.schedoscope.dsl.Parameter.p
import org.schedoscope.scheduler.messages.MaterializeViewMode._
import org.schedoscope.scheduler.states.PartyInterestedInViewSchedulingStateChange._
-import test.eci.datahub.{ProductBrandsNoOpMirror, ProductBrandsNoOpMirrorDependent}
+import test.views.{ProductBrandsNoOpMirror, ProductBrandsNoOpMirrorDependent}
class NoOpIntermediateViewSchedulingStateMachineTest extends FlatSpec with Matchers {
diff --git a/schedoscope-core/src/test/scala/org/schedoscope/scheduler/states/NoOpLeafViewSchedulingStateMachineTest.scala b/schedoscope-core/src/test/scala/org/schedoscope/scheduler/states/NoOpLeafViewSchedulingStateMachineTest.scala
index 0f791e3cf..e024b4084 100644
--- a/schedoscope-core/src/test/scala/org/schedoscope/scheduler/states/NoOpLeafViewSchedulingStateMachineTest.scala
+++ b/schedoscope-core/src/test/scala/org/schedoscope/scheduler/states/NoOpLeafViewSchedulingStateMachineTest.scala
@@ -19,7 +19,7 @@ import org.scalatest.{FlatSpec, Matchers}
import org.schedoscope.dsl.Parameter.p
import org.schedoscope.scheduler.messages.MaterializeViewMode._
import org.schedoscope.scheduler.states.PartyInterestedInViewSchedulingStateChange._
-import test.eci.datahub.ProductBrand
+import test.views.ProductBrand
class NoOpLeafViewSchedulingStateMachineTest extends FlatSpec with Matchers {
diff --git a/schedoscope-core/src/test/scala/org/schedoscope/test/HiveTestFrameworkTest.scala b/schedoscope-core/src/test/scala/org/schedoscope/test/HiveTestFrameworkTest.scala
index a0500e0a9..cd79414a8 100644
--- a/schedoscope-core/src/test/scala/org/schedoscope/test/HiveTestFrameworkTest.scala
+++ b/schedoscope-core/src/test/scala/org/schedoscope/test/HiveTestFrameworkTest.scala
@@ -16,10 +16,9 @@
package org.schedoscope.test
import org.scalatest.{FlatSpec, Matchers}
-import org.schedoscope.DriverTests
import org.schedoscope.dsl.Field.v
import org.schedoscope.dsl.Parameter.p
-import test.eci.datahub.{Click, ClickOfEC0101}
+import test.views.{Click, ClickOfEC0101}
class HiveTestFrameworkTest extends FlatSpec with Matchers {
val ec0101Clicks = new Click(p("EC0101"), p("2014"), p("01"), p("01")) with rows {
@@ -46,7 +45,7 @@ class HiveTestFrameworkTest extends FlatSpec with Matchers {
v(url, "http://ec0106.com/url3"))
}
- "Hive test framework" should "execute hive transformations locally" taggedAs (DriverTests) in {
+ "Hive test framework" should "execute hive transformations locally" in {
new ClickOfEC0101(p("2014"), p("01"), p("01")) with test {
basedOn(ec0101Clicks, ec0106Clicks)
`then`()
diff --git a/schedoscope-core/src/test/scala/org/schedoscope/test/ViewSerdeTest.scala b/schedoscope-core/src/test/scala/org/schedoscope/test/ViewSerdeTest.scala
index b812a76af..377780ee1 100644
--- a/schedoscope-core/src/test/scala/org/schedoscope/test/ViewSerdeTest.scala
+++ b/schedoscope-core/src/test/scala/org/schedoscope/test/ViewSerdeTest.scala
@@ -16,7 +16,6 @@
package org.schedoscope.test
import org.scalatest.{FlatSpec, Matchers}
-import org.schedoscope.DriverTests
import org.schedoscope.dsl.Structure
case class Contingency() extends Structure {
@@ -33,14 +32,15 @@ case class Nested() extends Structure {
class ViewSerdeTest extends FlatSpec with Matchers {
- "view Serde" should "deserialize structures" taggedAs (DriverTests) in {
+ "view Serde" should "deserialize structures" in {
val c = new Contingency()
val json = """{"value":1, "nested":{"o11":1,"o12":0,"o21":0,"o22":11}}"""
val res = ViewSerDe.deserializeField(manifest[Nested], json)
assert(res == Map("value" -> 1, "nested" -> Map("o11" -> 1, "o12" -> 0, "o21" -> 0, "o22" -> 11)))
}
- "view Serde" should "deserialize Lists" taggedAs (DriverTests) in {
+
+ it should "deserialize Lists" in {
assert(ViewSerDe.deserializeField(manifest[List[Long]], "[]") == List())
assert(ViewSerDe.deserializeField(manifest[List[Long]], "[1,2,3]") == List(1, 2, 3))
diff --git a/schedoscope-core/src/test/scala/test/eci/datahub/TestViews.scala b/schedoscope-core/src/test/scala/test/views/TestViews.scala
similarity index 72%
rename from schedoscope-core/src/test/scala/test/eci/datahub/TestViews.scala
rename to schedoscope-core/src/test/scala/test/views/TestViews.scala
index 0a3ed2bff..a508e90a1 100644
--- a/schedoscope-core/src/test/scala/test/eci/datahub/TestViews.scala
+++ b/schedoscope-core/src/test/scala/test/views/TestViews.scala
@@ -13,23 +13,20 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package test.eci.datahub
-
-import org.schedoscope.dsl.Parameter
-import org.schedoscope.dsl.Parameter.p
-import org.schedoscope.dsl.{View, Structure}
-import org.schedoscope.dsl.storageformats._
+package test.views
+
+import org.schedoscope.dsl.{Parameter, Structure, View}
+import org.schedoscope.dsl.Parameter._
+import org.schedoscope.dsl.views._
+import org.schedoscope.dsl.storageformats.{Avro, Parquet, TextFile}
+import org.schedoscope.dsl.transformations.HiveTransformation
+import org.schedoscope.dsl.transformations.HiveTransformation._
import org.schedoscope.dsl.transformations.Export._
-import org.schedoscope.dsl.transformations.{HiveTransformation, OozieTransformation}
-import org.schedoscope.dsl.transformations.HiveTransformation.insertInto
-import org.schedoscope.dsl.transformations.OozieTransformation.oozieWFPath
-import org.schedoscope.dsl.views.{DailyParameterization, Id, JobMetadata, PointOccurrence}
import org.schedoscope.export.testsupport.EmbeddedFtpSftpServer
import scala.util.Random
-case class Brand(
- shopCode: Parameter[String]) extends View
+case class Brand(shopCode: Parameter[String]) extends View
with Id
with JobMetadata {
@@ -40,11 +37,10 @@ case class Brand(
asTableSuffix(shopCode)
}
-case class Product(
- shopCode: Parameter[String],
- year: Parameter[String],
- month: Parameter[String],
- day: Parameter[String]) extends View
+case class Product(shopCode: Parameter[String],
+ year: Parameter[String],
+ month: Parameter[String],
+ day: Parameter[String]) extends View
with Id
with PointOccurrence
with JobMetadata
@@ -58,11 +54,10 @@ case class Product(
asTableSuffix(shopCode)
}
-case class ProductBrand(
- shopCode: Parameter[String],
- year: Parameter[String],
- month: Parameter[String],
- day: Parameter[String]) extends View
+case class ProductBrand(shopCode: Parameter[String],
+ year: Parameter[String],
+ month: Parameter[String],
+ day: Parameter[String]) extends View
with PointOccurrence
with JobMetadata
with DailyParameterization {
@@ -96,11 +91,10 @@ case class ProductBrand(
""")))
}
-case class ProductBrandMaterializeOnce(
- shopCode: Parameter[String],
- year: Parameter[String],
- month: Parameter[String],
- day: Parameter[String]) extends View
+case class ProductBrandMaterializeOnce(shopCode: Parameter[String],
+ year: Parameter[String],
+ month: Parameter[String],
+ day: Parameter[String]) extends View
with PointOccurrence
with JobMetadata
with DailyParameterization {
@@ -135,10 +129,9 @@ case class ProductBrandMaterializeOnce(
""")))
}
-case class ProductBrandsNoOpMirror(
- year: Parameter[String],
- month: Parameter[String],
- day: Parameter[String]) extends View {
+case class ProductBrandsNoOpMirror(year: Parameter[String],
+ month: Parameter[String],
+ day: Parameter[String]) extends View {
dependsOn(() => ProductBrand(p("EC0101"), year, month, day))
dependsOn(() => ProductBrand(p("EC0102"), year, month, day))
@@ -180,28 +173,25 @@ case class AvroView(
storedAs(Avro("test.avsc"))
}
-case class ViewWithDefaultParams(
- year: Parameter[String],
- month: Parameter[String],
- day: Parameter[String],
- defaultParameter: Int = 2) extends View {
+case class ViewWithDefaultParams(year: Parameter[String],
+ month: Parameter[String],
+ day: Parameter[String],
+ defaultParameter: Int = 2) extends View {
}
-case class Click(
- shopCode: Parameter[String],
- year: Parameter[String],
- month: Parameter[String],
- day: Parameter[String]) extends View
+case class Click(shopCode: Parameter[String],
+ year: Parameter[String],
+ month: Parameter[String],
+ day: Parameter[String]) extends View
with Id
with DailyParameterization {
val url = fieldOf[String]
}
-case class ClickOfEC0101(
- year: Parameter[String],
- month: Parameter[String],
- day: Parameter[String]) extends View
+case class ClickOfEC0101(year: Parameter[String],
+ month: Parameter[String],
+ day: Parameter[String]) extends View
with Id
with DailyParameterization {
@@ -218,10 +208,9 @@ case class ClickOfEC0101(
WHERE ${click().shopCode.n} = '${click().shopCode.v.get}'""")))
}
-case class ClickOfEC0101WithJdbcExport(
- year: Parameter[String],
- month: Parameter[String],
- day: Parameter[String]) extends View
+case class ClickOfEC0101WithJdbcExport(year: Parameter[String],
+ month: Parameter[String],
+ day: Parameter[String]) extends View
with Id
with DailyParameterization {
@@ -241,10 +230,9 @@ case class ClickOfEC0101WithJdbcExport(
}
-case class ClickOfEC0101WithRedisExport(
- year: Parameter[String],
- month: Parameter[String],
- day: Parameter[String]) extends View
+case class ClickOfEC0101WithRedisExport(year: Parameter[String],
+ month: Parameter[String],
+ day: Parameter[String]) extends View
with Id
with DailyParameterization {
@@ -264,10 +252,9 @@ case class ClickOfEC0101WithRedisExport(
}
-case class ClickOfEC0101WithKafkaExport(
- year: Parameter[String],
- month: Parameter[String],
- day: Parameter[String]) extends View
+case class ClickOfEC0101WithKafkaExport(year: Parameter[String],
+ month: Parameter[String],
+ day: Parameter[String]) extends View
with Id
with DailyParameterization {
@@ -287,10 +274,9 @@ case class ClickOfEC0101WithKafkaExport(
}
-case class ClickOfEC0101WithFtpExport(
- year: Parameter[String],
- month: Parameter[String],
- day: Parameter[String]) extends View
+case class ClickOfEC0101WithFtpExport(year: Parameter[String],
+ month: Parameter[String],
+ day: Parameter[String]) extends View
with Id
with DailyParameterization {
@@ -315,23 +301,6 @@ case class ClickOfEC0101WithFtpExport(
filePrefix))
}
-case class ClickOfEC0101ViaOozie(
- year: Parameter[String],
- month: Parameter[String],
- day: Parameter[String]) extends View
- with Id
- with DailyParameterization {
-
- val url = fieldOf[String]
-
- val click = dependsOn(() => Click(p("EC0101"), year, month, day))
-
- transformVia(
- () => OozieTransformation(
- "bundle", "click",
- oozieWFPath("bundle", "click")))
-}
-
case class SimpleDependendView() extends View with Id {
val field1 = fieldOf[String]
tablePathBuilder = s => "src/test/resources/input"
diff --git a/schedoscope-export/README.md b/schedoscope-export/README.md
index 6d453419a..929f6779f 100644
--- a/schedoscope-export/README.md
+++ b/schedoscope-export/README.md
@@ -45,7 +45,7 @@ This Map/Reduce job moves data into a relational database via a JDBC connection.
#### Run the JDBC export
-The schedoscope-export project doesn't bundle any JDBC driver. It's necessary to add a JDBC driver to the classpath, the export job will copy into HDFS / distributed cache and add the driver to the classpath:
+The schedoscope-export project doesn't bundle any JDBC driver. It's necessary to add a JDBC driver to the classpath, the export job will copy into HDFS / distributed cache and add the driver to the classpath:
export YARN_USER_CLASSPATH=/path/to/jdbc/jar/file/mysql-connector-java-5.1.38.jar
diff --git a/schedoscope-transformation-oozie/pom.xml b/schedoscope-transformation-oozie/pom.xml
new file mode 100644
index 000000000..23f4c52d5
--- /dev/null
+++ b/schedoscope-transformation-oozie/pom.xml
@@ -0,0 +1,151 @@
+
+
+ 4.0.0
+ schedoscope-transformation-oozie
+ Schedoscope Oozie Transformation
+ Oozie transformation and drivers
+
+
+ schedoscope-suite
+ schedoscope
+ 0.6.5-SNAPSHOT
+
+
+
+
+ schedoscope
+ schedoscope-core
+ ${schedoscope.version}
+
+
+ org.apache.oozie
+ oozie-client
+ ${oozie.version}
+
+
+ slf4j-api
+ org.slf4j
+
+
+ slf4j-simple
+ org.slf4j
+
+
+ guava
+ com.google.guava
+
+
+
+
+ minioozie
+ minioozie
+ ${minioozie.version}
+ provided
+
+
+ slf4j-log4j12
+ org.slf4j
+
+
+ slf4j-api
+ org.slf4j
+
+
+
+
+ org.scalatest
+ scalatest_2.11
+ 2.2.5
+ test
+
+
+ hadoop-launcher
+ hadoop-launcher
+ 0.1.0
+ test
+
+
+
+
+ src/main/scala
+ src/test/scala
+
+
+
+ org.apache.maven.plugins
+ maven-compiler-plugin
+ 3.3
+
+
+ 1.7
+
+
+
+ org.apache.maven.plugins
+ maven-resources-plugin
+ 2.7
+
+ UTF-8
+
+
+
+ maven-source-plugin
+ 2.4
+
+
+ attach-sources
+
+ jar
+
+
+
+
+
+
+ net.alchim31.maven
+ scala-maven-plugin
+ 3.2.2
+
+
+
+ compile
+ testCompile
+ doc-jar
+
+
+
+
+
+ org.apache.maven.plugins
+ maven-surefire-plugin
+ 2.19
+
+ true
+
+
+
+ org.scalatest
+ scalatest-maven-plugin
+ 1.0
+
+ ${project.build.directory}/surefire-reports
+ .
+ WDF TestSuite.txt
+ -Xmx1024m -XX:MaxPermSize=512M
+
+ ${project.build.directory}/hadoop
+
+
+
+
+ test
+
+ test
+
+
+
+
+
+
+
diff --git a/schedoscope-transformation-oozie/src/main/resources/reference.conf b/schedoscope-transformation-oozie/src/main/resources/reference.conf
new file mode 100644
index 000000000..08183de55
--- /dev/null
+++ b/schedoscope-transformation-oozie/src/main/resources/reference.conf
@@ -0,0 +1,67 @@
+#
+# Oozie transformation settings
+#
+
+schedoscope {
+
+ transformations = {
+
+ oozie: {
+
+ #
+ # Class implementing the Oozie driver
+ #
+
+ driverClassName = "org.schedoscope.scheduler.driver.OozieDriver"
+
+ #
+ # Where to put Oozie bundles in HDFS
+ #
+
+ location = "/tmp/schedoscope/oozie/"
+
+ #
+ # Comma-separated list of directories where additional Oozie workflow
+ # bundle jars can be found that are to be put onto HDFS when launching
+ # Schedoscope.
+ #
+
+ libDirectory = ""
+
+ #
+ # URL of Oozie Server
+ #
+
+ url = "http://localhost:11000/oozie"
+
+ #
+ # Number of parallel Driver actors to use for executing Oozie
+ # transformations
+ #
+
+ concurrency = 10
+
+ #
+ # Oozie bundle jars need to be unpacked in HDFS.
+ #
+
+ unpack = true
+
+ #
+ # Timeout for Oozie transformations
+ #
+
+ timeout = 1 day
+
+ #
+ # The handlers being notified after each driver run has
+ # finished (succeeded or failed). These must implement the
+ # trait org.schedoscope.scheduler.driver.DriverRunCompletionHandler
+ #
+
+ driverRunCompletionHandlers = ["org.schedoscope.scheduler.driver.DoNothingCompletionHandler"]
+
+ }
+
+ }
+}
\ No newline at end of file
diff --git a/schedoscope-core/src/main/scala/org/schedoscope/dsl/transformations/OozieTransformation.scala b/schedoscope-transformation-oozie/src/main/scala/org/schedoscope/dsl/transformations/OozieTransformation.scala
similarity index 88%
rename from schedoscope-core/src/main/scala/org/schedoscope/dsl/transformations/OozieTransformation.scala
rename to schedoscope-transformation-oozie/src/main/scala/org/schedoscope/dsl/transformations/OozieTransformation.scala
index fd8e7963b..4ae7329ae 100644
--- a/schedoscope-core/src/main/scala/org/schedoscope/dsl/transformations/OozieTransformation.scala
+++ b/schedoscope-transformation-oozie/src/main/scala/org/schedoscope/dsl/transformations/OozieTransformation.scala
@@ -20,13 +20,14 @@ import java.util.Properties
import org.apache.commons.lang.StringUtils
import org.schedoscope.Settings
+import org.schedoscope.scheduler.service.ViewTransformationStatus
import scala.collection.JavaConversions._
/**
* specifies the execution of an oozie workflow
*
- * @param bundle bundle name
+ * @param bundle name of the Oozie bundle
* @param workflow workflow name
* @param workflowAppPath path of the deployed workflow in hdfs
*
@@ -37,6 +38,12 @@ case class OozieTransformation(bundle: String, workflow: String, var workflowApp
override def fileResourcesToChecksum = List(workflowAppPath)
description = StringUtils.abbreviate(s"${bundle}/${workflow}", 100)
+
+ override def viewTransformationStatus = ViewTransformationStatus(
+ name,
+ Some(Map(
+ "bundle" -> bundle,
+ "workflow" -> workflow)))
}
object OozieTransformation {
diff --git a/schedoscope-core/src/main/scala/org/schedoscope/scheduler/driver/OozieDriver.scala b/schedoscope-transformation-oozie/src/main/scala/org/schedoscope/scheduler/driver/OozieDriver.scala
similarity index 99%
rename from schedoscope-core/src/main/scala/org/schedoscope/scheduler/driver/OozieDriver.scala
rename to schedoscope-transformation-oozie/src/main/scala/org/schedoscope/scheduler/driver/OozieDriver.scala
index cfa474db1..9004aa341 100644
--- a/schedoscope-core/src/main/scala/org/schedoscope/scheduler/driver/OozieDriver.scala
+++ b/schedoscope-transformation-oozie/src/main/scala/org/schedoscope/scheduler/driver/OozieDriver.scala
@@ -71,7 +71,7 @@ class OozieDriver(val driverRunCompletionHandlerClassNames: List[String], val cl
}
/**
- * Rig Oozie transformations prior to test by loading the workflow bundle into the test environment's HDFS
+ * Rig Oozie transformations prior to test by loading the workflow bundle into the test environment's HDFS
* and tweak the path references accordingly
*/
override def rigTransformationForTest(t: OozieTransformation, testResources: TestResources) = {
diff --git a/schedoscope-transformation-oozie/src/main/scala/org/schedoscope/test/ClusterTest.scala b/schedoscope-transformation-oozie/src/main/scala/org/schedoscope/test/ClusterTest.scala
new file mode 100644
index 000000000..44594b5ba
--- /dev/null
+++ b/schedoscope-transformation-oozie/src/main/scala/org/schedoscope/test/ClusterTest.scala
@@ -0,0 +1,13 @@
+package org.schedoscope.test
+
+import org.schedoscope.test.resources.OozieTestResources
+
+
+/**
+ * A test environment that is executed in a local minicluster, backed by OozieTestResources
+ */
+trait clustertest extends test {
+ resources = new OozieTestResources()
+
+ def cluster = resources.asInstanceOf[OozieTestResources].mo
+}
diff --git a/schedoscope-core/src/main/scala/org/schedoscope/test/resources/OozieTestResources.scala b/schedoscope-transformation-oozie/src/main/scala/org/schedoscope/test/resources/OozieTestResources.scala
similarity index 100%
rename from schedoscope-core/src/main/scala/org/schedoscope/test/resources/OozieTestResources.scala
rename to schedoscope-transformation-oozie/src/main/scala/org/schedoscope/test/resources/OozieTestResources.scala
diff --git a/schedoscope-transformation-oozie/src/test/resources/log4j.properties b/schedoscope-transformation-oozie/src/test/resources/log4j.properties
new file mode 100644
index 000000000..c5e48adeb
--- /dev/null
+++ b/schedoscope-transformation-oozie/src/test/resources/log4j.properties
@@ -0,0 +1 @@
+log4j.rootLogger=OFF
\ No newline at end of file
diff --git a/schedoscope-transformation-oozie/src/test/resources/logback-test.xml b/schedoscope-transformation-oozie/src/test/resources/logback-test.xml
new file mode 100644
index 000000000..6bf2ab018
--- /dev/null
+++ b/schedoscope-transformation-oozie/src/test/resources/logback-test.xml
@@ -0,0 +1,19 @@
+
+
+
+ [%-4level] [%date{ISO8601}] [%thread %X{sourceThread}]
+ [%X{akkaSource}] [%logger{36}] - %msg%n
+
+
+
+
+
+
+
+
+
+
+
+
+
+
diff --git a/schedoscope-core/src/test/resources/oozie/bundle/click/workflow.xml b/schedoscope-transformation-oozie/src/test/resources/oozie/bundle/click/workflow.xml
similarity index 100%
rename from schedoscope-core/src/test/resources/oozie/bundle/click/workflow.xml
rename to schedoscope-transformation-oozie/src/test/resources/oozie/bundle/click/workflow.xml
diff --git a/schedoscope-core/src/test/resources/oozie/bundle/failflow/workflow.xml b/schedoscope-transformation-oozie/src/test/resources/oozie/bundle/failflow/workflow.xml
similarity index 100%
rename from schedoscope-core/src/test/resources/oozie/bundle/failflow/workflow.xml
rename to schedoscope-transformation-oozie/src/test/resources/oozie/bundle/failflow/workflow.xml
diff --git a/schedoscope-core/src/test/resources/oozie/bundle/workflow/workflow.xml b/schedoscope-transformation-oozie/src/test/resources/oozie/bundle/workflow/workflow.xml
similarity index 100%
rename from schedoscope-core/src/test/resources/oozie/bundle/workflow/workflow.xml
rename to schedoscope-transformation-oozie/src/test/resources/oozie/bundle/workflow/workflow.xml
diff --git a/schedoscope-core/src/test/resources/ooziewftest.properties b/schedoscope-transformation-oozie/src/test/resources/ooziewftest.properties
similarity index 100%
rename from schedoscope-core/src/test/resources/ooziewftest.properties
rename to schedoscope-transformation-oozie/src/test/resources/ooziewftest.properties
diff --git a/schedoscope-core/src/test/scala/org/schedoscope/dsl/transformations/OozieWFTest.scala b/schedoscope-transformation-oozie/src/test/scala/org/schedoscope/dsl/transformations/OozieWFTest.scala
similarity index 95%
rename from schedoscope-core/src/test/scala/org/schedoscope/dsl/transformations/OozieWFTest.scala
rename to schedoscope-transformation-oozie/src/test/scala/org/schedoscope/dsl/transformations/OozieWFTest.scala
index 7a088c1d2..7528e9350 100644
--- a/schedoscope-core/src/test/scala/org/schedoscope/dsl/transformations/OozieWFTest.scala
+++ b/schedoscope-transformation-oozie/src/test/scala/org/schedoscope/dsl/transformations/OozieWFTest.scala
@@ -34,9 +34,9 @@ case class Productfeed(ecShopCode: Parameter[String],
transformVia(() =>
OozieTransformation(
- "products_processed-bundle",
+ "products_processed-oozie.bundle",
"workflow-processed_productfeed",
- s"/hdp/${env}/applications/eci/scripts/oozie/products_processed-bundle/workflow-processed_productfeed/")
+ s"/hdp/${env}/applications/eci/scripts/oozie/products_processed-oozie.bundle/workflow-processed_productfeed/")
.configureWith(
configurationFromResource("ooziewftest.properties") ++
Map(
@@ -59,7 +59,7 @@ class OozieWFTest extends FlatSpec with BeforeAndAfter with Matchers {
val t = view.transformation().asInstanceOf[OozieTransformation]
- t.workflowAppPath shouldEqual "/hdp/dev/applications/eci/scripts/oozie/products_processed-bundle/workflow-processed_productfeed/"
+ t.workflowAppPath shouldEqual "/hdp/dev/applications/eci/scripts/oozie/products_processed-oozie.bundle/workflow-processed_productfeed/"
val expectedConfiguration = Map(
"oozie.bundle.application.path" -> "${nameNode}${bundlePath}",
diff --git a/schedoscope-core/src/test/scala/org/schedoscope/scheduler/driver/OozieDriverTest.scala b/schedoscope-transformation-oozie/src/test/scala/org/schedoscope/scheduler/driver/OozieDriverTest.scala
similarity index 76%
rename from schedoscope-core/src/test/scala/org/schedoscope/scheduler/driver/OozieDriverTest.scala
rename to schedoscope-transformation-oozie/src/test/scala/org/schedoscope/scheduler/driver/OozieDriverTest.scala
index 96177662e..40813a19d 100644
--- a/schedoscope-core/src/test/scala/org/schedoscope/scheduler/driver/OozieDriverTest.scala
+++ b/schedoscope-transformation-oozie/src/test/scala/org/schedoscope/scheduler/driver/OozieDriverTest.scala
@@ -15,12 +15,10 @@
*/
package org.schedoscope.scheduler.driver
-import org.apache.hadoop.fs.Path
import org.scalatest.{FlatSpec, Matchers}
import org.schedoscope.dsl.transformations.OozieTransformation
import org.schedoscope.test.resources.OozieTestResources
import org.schedoscope.test.resources.TestDriverRunCompletionHandlerCallCounter._
-import org.schedoscope.{DriverTests, OozieTests}
class OozieDriverTest extends FlatSpec with Matchers {
@@ -30,47 +28,35 @@ class OozieDriverTest extends FlatSpec with Matchers {
lazy val driver = resources.driverFor[OozieTransformation]("oozie")
- def deployWorkflow(wf: OozieTransformation) = {
- val hdfs = resources.fileSystem
- val dest = new Path(s"${resources.namenode}/${wf.workflowAppPath}")
- val src = new Path(s"src/test/resources/oozie/${wf.bundle}/${wf.workflow}/workflow.xml")
-
- if (!hdfs.exists(dest))
- hdfs.mkdirs(dest)
-
- hdfs.copyFromLocalFile(src, dest)
- wf
- }
-
- lazy val workingOozieTransformation = deployWorkflow(
+ lazy val workingOozieTransformation = driver.rigTransformationForTest(
OozieTransformation(
"bundle", "workflow",
"/tmp/schedoscope/oozie/workflows/bundle/workflow/")
.configureWith(Map(
"jobTracker" -> cluster.getJobTrackerUri(),
"nameNode" -> cluster.getNameNodeUri(),
- "oozie.use.system.libpath" -> "false")).asInstanceOf[OozieTransformation])
+ "oozie.use.system.libpath" -> "false")).asInstanceOf[OozieTransformation], resources)
- lazy val failingOozieTransformation = deployWorkflow(
+ lazy val failingOozieTransformation = driver.rigTransformationForTest(
OozieTransformation(
"bundle", "failflow",
"/tmp/schedoscope/oozie/workflows/bundle/failflow/")
.configureWith(Map(
"jobTracker" -> cluster.getJobTrackerUri(),
"nameNode" -> cluster.getNameNodeUri(),
- "oozie.use.system.libpath" -> "false")).asInstanceOf[OozieTransformation])
+ "oozie.use.system.libpath" -> "false")).asInstanceOf[OozieTransformation], resources)
- "Oozie" should "have transformation name oozie" taggedAs(DriverTests, OozieTests) in {
+ "Oozie" should "have transformation name oozie" in {
driver.transformationName shouldBe "oozie"
}
- it should "execute oozie transformations synchronously" taggedAs(DriverTests, OozieTests) in {
+ it should "execute oozie transformations synchronously" in {
val driverRunState = driver.runAndWait(workingOozieTransformation)
driverRunState shouldBe a[DriverRunSucceeded[_]]
}
- it should "execute oozie transformations asynchronously" taggedAs(DriverTests, OozieTests) in {
+ it should "execute oozie transformations asynchronously" in {
val driverRunHandle = driver.run(workingOozieTransformation)
var runWasAsynchronous = false
@@ -82,13 +68,13 @@ class OozieDriverTest extends FlatSpec with Matchers {
driver.getDriverRunState(driverRunHandle) shouldBe a[DriverRunSucceeded[_]]
}
- it should "execute oozie transformations and return errors while running synchronously" taggedAs(DriverTests, OozieTests) in {
+ it should "execute oozie transformations and return errors while running synchronously" in {
val driverRunState = driver.runAndWait(failingOozieTransformation)
driverRunState shouldBe a[DriverRunFailed[_]]
}
- it should "execute oozie transformations and return errors while running asynchronously" taggedAs(DriverTests, OozieTests) in {
+ it should "execute oozie transformations and return errors while running asynchronously" in {
val driverRunHandle = driver.run(failingOozieTransformation)
var runWasAsynchronous = false
@@ -100,7 +86,7 @@ class OozieDriverTest extends FlatSpec with Matchers {
driver.getDriverRunState(driverRunHandle) shouldBe a[DriverRunFailed[_]]
}
- it should "be able to kill running oozie transformations" taggedAs(DriverTests, OozieTests) in {
+ it should "be able to kill running oozie transformations" in {
val driverRunHandle = driver.run(workingOozieTransformation)
driver.getDriverRunState(driverRunHandle) shouldBe a[DriverRunOngoing[_]]
driver.killRun(driverRunHandle)
@@ -112,7 +98,7 @@ class OozieDriverTest extends FlatSpec with Matchers {
driver.getDriverRunState(driverRunHandle) shouldBe a[DriverRunFailed[_]]
}
- it should "call its DriverRunCompletitionHandlers' driverRunCompleted upon request" taggedAs (DriverTests) in {
+ it should "call its DriverRunCompletitionHandlers' driverRunCompleted upon request" in {
val runHandle = driver.run(workingOozieTransformation)
while (driver.getDriverRunState(runHandle).isInstanceOf[DriverRunOngoing[_]]) {}
@@ -122,7 +108,7 @@ class OozieDriverTest extends FlatSpec with Matchers {
driverRunCompletedCalled(runHandle, driver.getDriverRunState(runHandle)) shouldBe true
}
- it should "call its DriverRunCompletitionHandlers' driverRunStarted upon request" taggedAs (DriverTests) in {
+ it should "call its DriverRunCompletitionHandlers' driverRunStarted upon request" in {
val runHandle = driver.run(workingOozieTransformation)
driver.driverRunStarted(runHandle)
diff --git a/schedoscope-core/src/test/scala/org/schedoscope/test/OozieTestFrameworkTest.scala b/schedoscope-transformation-oozie/src/test/scala/org/schedoscope/test/OozieTestFrameworkTest.scala
similarity index 93%
rename from schedoscope-core/src/test/scala/org/schedoscope/test/OozieTestFrameworkTest.scala
rename to schedoscope-transformation-oozie/src/test/scala/org/schedoscope/test/OozieTestFrameworkTest.scala
index 4b29827d7..028a05a18 100644
--- a/schedoscope-core/src/test/scala/org/schedoscope/test/OozieTestFrameworkTest.scala
+++ b/schedoscope-transformation-oozie/src/test/scala/org/schedoscope/test/OozieTestFrameworkTest.scala
@@ -18,8 +18,7 @@ package org.schedoscope.test
import org.scalatest.{FlatSpec, Matchers}
import org.schedoscope.dsl.Field.v
import org.schedoscope.dsl.Parameter.p
-import org.schedoscope.{DriverTests, OozieTests}
-import test.eci.datahub.{Click, ClickOfEC0101ViaOozie}
+import test.views.{Click, ClickOfEC0101ViaOozie}
class OozieTestFrameworkTest extends FlatSpec with Matchers {
val ec0101Clicks = new Click(p("EC0101"), p("2014"), p("01"), p("01")) with rows {
@@ -46,7 +45,7 @@ class OozieTestFrameworkTest extends FlatSpec with Matchers {
v(url, "http://ec0106.com/url3"))
}
- "Oozie test framework" should "execute oozie workflows in MiniOozie cluster" taggedAs(DriverTests, OozieTests) in {
+ "Oozie test framework" should "execute oozie workflows in MiniOozie cluster" in {
new ClickOfEC0101ViaOozie(p("2014"), p("01"), p("01")) with clustertest {
basedOn(ec0101Clicks, ec0106Clicks)
withConfiguration(
diff --git a/schedoscope-transformation-oozie/src/test/scala/test/views/TestViews.scala b/schedoscope-transformation-oozie/src/test/scala/test/views/TestViews.scala
new file mode 100644
index 000000000..f70a76387
--- /dev/null
+++ b/schedoscope-transformation-oozie/src/test/scala/test/views/TestViews.scala
@@ -0,0 +1,50 @@
+/**
+ * Copyright 2015 Otto (GmbH & Co KG)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package test.views
+
+import org.schedoscope.dsl.Parameter
+import org.schedoscope.dsl.Parameter.p
+import org.schedoscope.dsl.View
+import org.schedoscope.dsl.transformations.OozieTransformation
+import org.schedoscope.dsl.transformations.OozieTransformation.oozieWFPath
+import org.schedoscope.dsl.views.{DailyParameterization, Id}
+
+
+case class Click(shopCode: Parameter[String],
+ year: Parameter[String],
+ month: Parameter[String],
+ day: Parameter[String]) extends View
+ with Id
+ with DailyParameterization {
+
+ val url = fieldOf[String]
+}
+
+case class ClickOfEC0101ViaOozie(year: Parameter[String],
+ month: Parameter[String],
+ day: Parameter[String]) extends View
+ with Id
+ with DailyParameterization {
+
+ val url = fieldOf[String]
+
+ val click = dependsOn(() => Click(p("EC0101"), year, month, day))
+
+ transformVia(
+ () => OozieTransformation(
+ "bundle", "click",
+ oozieWFPath("bundle", "click")))
+}
diff --git a/schedoscope-transformation-pig/pom.xml b/schedoscope-transformation-pig/pom.xml
new file mode 100644
index 000000000..d246b7ae4
--- /dev/null
+++ b/schedoscope-transformation-pig/pom.xml
@@ -0,0 +1,159 @@
+
+
+ 4.0.0
+ schedoscope-transformation-pig
+ Schedoscope Pig Transformation
+ Pig transformation and drivers
+
+
+ schedoscope-suite
+ schedoscope
+ 0.6.5-SNAPSHOT
+
+
+
+
+ schedoscope
+ schedoscope-core
+ ${schedoscope.version}
+
+
+ org.apache.pig
+ pig
+ ${pig.version}
+
+
+ slf4j-api
+ org.slf4j
+
+
+ slf4j-log4j12
+ org.slf4j
+
+
+ guava
+ com.google.guava
+
+
+
+
+ com.twitter
+ parquet-pig-bundle
+ ${pig.parquet.version}
+
+
+ org.apache.hive.hcatalog
+ hive-hcatalog-pig-adapter
+ ${hive.hcatalog.version}
+
+
+ slf4j-log4j12
+ org.slf4j
+
+
+ slf4j-api
+ org.slf4j
+
+
+ guava
+ com.google.guava
+
+
+
+
+ org.scalatest
+ scalatest_2.11
+ 2.2.5
+ test
+
+
+ hadoop-launcher
+ hadoop-launcher
+ 0.1.0
+ test
+
+
+
+
+ src/main/scala
+ src/test/scala
+
+
+
+ org.apache.maven.plugins
+ maven-compiler-plugin
+ 3.3
+
+
+ 1.7
+
+
+
+ org.apache.maven.plugins
+ maven-resources-plugin
+ 2.7
+
+ UTF-8
+
+
+
+ maven-source-plugin
+ 2.4
+
+
+ attach-sources
+
+ jar
+
+
+
+
+
+
+ net.alchim31.maven
+ scala-maven-plugin
+ 3.2.2
+
+
+
+ compile
+ testCompile
+ doc-jar
+
+
+
+
+
+ org.apache.maven.plugins
+ maven-surefire-plugin
+ 2.19
+
+ true
+
+
+
+ org.scalatest
+ scalatest-maven-plugin
+ 1.0
+
+ ${project.build.directory}/surefire-reports
+ .
+ WDF TestSuite.txt
+ -Xmx1024m -XX:MaxPermSize=512M
+
+ ${project.build.directory}/hadoop
+
+
+
+
+ test
+
+ test
+
+
+
+
+
+
+
diff --git a/schedoscope-transformation-pig/src/main/resources/reference.conf b/schedoscope-transformation-pig/src/main/resources/reference.conf
new file mode 100644
index 000000000..b2405174f
--- /dev/null
+++ b/schedoscope-transformation-pig/src/main/resources/reference.conf
@@ -0,0 +1,63 @@
+#
+# Pig transformation settings
+#
+
+schedoscope {
+
+ transformations = {
+
+ pig: {
+
+ #
+ # Class implementing the Pig driver
+ #
+
+ driverClassName = "org.schedoscope.scheduler.driver.PigDriver"
+
+ #
+ # Location where to put Pig library jar in HDFS
+ #
+
+ location = "/tmp/schedoscope/pig/"
+
+ #
+ # Ignored
+ #
+
+ libDirectory = ""
+
+ #
+ # Ignored.
+ #
+
+ url = ""
+
+ #
+ # Do not change. Pig jars should not be unpacked in HDFS.
+ #
+
+ unpack = false
+
+ #
+ # Number of parallel Driver actors to use for executing Pig
+ # transformations
+ #
+
+ concurrency = 10
+
+ #
+ # Timeout for Pig transformations.
+ #
+
+ timeout = 1 day
+
+ #
+ # The handlers being notified after each driver run has
+ # finished (succeeded or failed). These must implement the
+ # trait org.schedoscope.scheduler.driver.DriverRunCompletionHandler
+ #
+
+ driverRunCompletionHandlers = ["org.schedoscope.scheduler.driver.DoNothingCompletionHandler"]
+ }
+ }
+}
\ No newline at end of file
diff --git a/schedoscope-core/src/main/scala/org/schedoscope/dsl/transformations/PigTransformation.scala b/schedoscope-transformation-pig/src/main/scala/org/schedoscope/dsl/transformations/PigTransformation.scala
similarity index 91%
rename from schedoscope-core/src/main/scala/org/schedoscope/dsl/transformations/PigTransformation.scala
rename to schedoscope-transformation-pig/src/main/scala/org/schedoscope/dsl/transformations/PigTransformation.scala
index cc418417a..143ebe76f 100644
--- a/schedoscope-core/src/main/scala/org/schedoscope/dsl/transformations/PigTransformation.scala
+++ b/schedoscope-transformation-pig/src/main/scala/org/schedoscope/dsl/transformations/PigTransformation.scala
@@ -22,7 +22,8 @@ import org.apache.hadoop.hive.metastore.api.HiveObjectPrivilege
import org.apache.hadoop.hive.ql.udf.UDFLength
import org.apache.hive.hcatalog.data.schema.HCatSchema
import org.apache.hive.hcatalog.pig.HCatLoader
-import parquet.pig.ParquetStorer
+import org.apache.pig.builtin.ParquetStorer
+import org.schedoscope.scheduler.service.ViewTransformationStatus
/**
@@ -54,6 +55,10 @@ case class PigTransformation(latin: String, dirsToDelete: List[String] = List())
})
.filter(cl => cl != null && !"".equals(cl.trim))
}
+
+ override def viewTransformationStatus = ViewTransformationStatus(
+ name,
+ Some(Map("latin" -> latin)))
}
object PigTransformation {
diff --git a/schedoscope-core/src/main/scala/org/schedoscope/scheduler/driver/PigDriver.scala b/schedoscope-transformation-pig/src/main/scala/org/schedoscope/scheduler/driver/PigDriver.scala
similarity index 92%
rename from schedoscope-core/src/main/scala/org/schedoscope/scheduler/driver/PigDriver.scala
rename to schedoscope-transformation-pig/src/main/scala/org/schedoscope/scheduler/driver/PigDriver.scala
index 5b5756056..bb2d34517 100644
--- a/schedoscope-core/src/main/scala/org/schedoscope/scheduler/driver/PigDriver.scala
+++ b/schedoscope-transformation-pig/src/main/scala/org/schedoscope/scheduler/driver/PigDriver.scala
@@ -45,6 +45,15 @@ class PigDriver(val driverRunCompletionHandlerClassNames: List[String], val ugi:
executePigTransformation(t.latin, t.dirsToDelete, t.defaultLibraries, t.configuration.toMap, t.getView())
})
+ /**
+ * Rig Pig transformations prior to test by configuring them with exec.type set to LOCAL.
+ */
+ override def rigTransformationForTest(t: PigTransformation, testResources: TestResources) = {
+ t.configureWith(Map("exec.type" -> "LOCAL"))
+
+ t
+ }
+
/**
* Really executed the given Pig Latin.
*/
diff --git a/schedoscope-transformation-pig/src/test/resources/log4j.properties b/schedoscope-transformation-pig/src/test/resources/log4j.properties
new file mode 100644
index 000000000..c5e48adeb
--- /dev/null
+++ b/schedoscope-transformation-pig/src/test/resources/log4j.properties
@@ -0,0 +1 @@
+log4j.rootLogger=OFF
\ No newline at end of file
diff --git a/schedoscope-transformation-pig/src/test/resources/logback-test.xml b/schedoscope-transformation-pig/src/test/resources/logback-test.xml
new file mode 100644
index 000000000..6bf2ab018
--- /dev/null
+++ b/schedoscope-transformation-pig/src/test/resources/logback-test.xml
@@ -0,0 +1,19 @@
+
+
+
+ [%-4level] [%date{ISO8601}] [%thread %X{sourceThread}]
+ [%X{akkaSource}] [%logger{36}] - %msg%n
+
+
+
+
+
+
+
+
+
+
+
+
+
+
diff --git a/schedoscope-core/src/test/scala/org/schedoscope/scheduler/driver/PigDriverTest.scala b/schedoscope-transformation-pig/src/test/scala/org/schedoscope/scheduler/driver/PigDriverTest.scala
similarity index 68%
rename from schedoscope-core/src/test/scala/org/schedoscope/scheduler/driver/PigDriverTest.scala
rename to schedoscope-transformation-pig/src/test/scala/org/schedoscope/scheduler/driver/PigDriverTest.scala
index 16f898076..7bb701d42 100644
--- a/schedoscope-core/src/test/scala/org/schedoscope/scheduler/driver/PigDriverTest.scala
+++ b/schedoscope-transformation-pig/src/test/scala/org/schedoscope/scheduler/driver/PigDriverTest.scala
@@ -16,38 +16,43 @@
package org.schedoscope.scheduler.driver
import org.scalatest.{FlatSpec, Matchers}
-import org.schedoscope.DriverTests
import org.schedoscope.dsl.transformations.PigTransformation
import org.schedoscope.test.resources.LocalTestResources
import org.schedoscope.test.resources.TestDriverRunCompletionHandlerCallCounter._
class PigDriverTest extends FlatSpec with Matchers {
- lazy val driver = new LocalTestResources().driverFor[PigTransformation]("pig")
+ lazy val testResources = new LocalTestResources()
- "PigDriver" should "have transformation name pig" taggedAs (DriverTests) in {
+ lazy val driver = testResources.driverFor[PigTransformation]("pig")
+
+ lazy val okTransformation = driver.rigTransformationForTest(PigTransformation("/* a comment */"), testResources)
+
+ lazy val badTransformation = driver.rigTransformationForTest(PigTransformation("FAIL ME"), testResources)
+
+ "PigDriver" should "have transformation name pig" in {
driver.transformationName shouldBe "pig"
}
- it should "execute pig transformations synchronously" taggedAs (DriverTests) in {
- val driverRunState = driver.runAndWait(PigTransformation("/* a comment */"))
+ it should "execute pig transformations synchronously" in {
+ val driverRunState = driver.runAndWait(okTransformation)
driverRunState shouldBe a[DriverRunSucceeded[_]]
}
- it should "execute another pig transformations synchronously" taggedAs (DriverTests) in {
- val driverRunState = driver.runAndWait(PigTransformation("/* a comment */"))
+ it should "execute another pig transformations synchronously" in {
+ val driverRunState = driver.runAndWait(okTransformation)
driverRunState shouldBe a[DriverRunSucceeded[_]]
}
- it should "execute pig transformations and return errors when running synchronously" taggedAs (DriverTests) in {
- val driverRunState = driver.runAndWait(PigTransformation("FAIL ME"))
+ it should "execute pig transformations and return errors when running synchronously" in {
+ val driverRunState = driver.runAndWait(badTransformation)
driverRunState shouldBe a[DriverRunFailed[_]]
}
- it should "execute pig transformations asynchronously" taggedAs (DriverTests) in {
- val driverRunHandle = driver.run(PigTransformation("/* a comment */"))
+ it should "execute pig transformations asynchronously" in {
+ val driverRunHandle = driver.run(okTransformation)
var runWasAsynchronous = false
@@ -58,8 +63,8 @@ class PigDriverTest extends FlatSpec with Matchers {
driver.getDriverRunState(driverRunHandle) shouldBe a[DriverRunSucceeded[_]]
}
- it should "execute pig transformations and return errors when running asynchronously" taggedAs (DriverTests) in {
- val driverRunHandle = driver.run(PigTransformation("FAIL ME"))
+ it should "execute pig transformations and return errors when running asynchronously" in {
+ val driverRunHandle = driver.run(badTransformation)
var runWasAsynchronous = false
@@ -70,8 +75,8 @@ class PigDriverTest extends FlatSpec with Matchers {
driver.getDriverRunState(driverRunHandle) shouldBe a[DriverRunFailed[_]]
}
- it should "call its DriverRunCompletitionHandlers' driverRunCompleted upon request" taggedAs (DriverTests) in {
- val runHandle = driver.run(PigTransformation("/* a comment */"))
+ it should "call its DriverRunCompletitionHandlers' driverRunCompleted upon request" in {
+ val runHandle = driver.run(okTransformation)
while (driver.getDriverRunState(runHandle).isInstanceOf[DriverRunOngoing[_]]) {}
@@ -80,8 +85,8 @@ class PigDriverTest extends FlatSpec with Matchers {
driverRunCompletedCalled(runHandle, driver.getDriverRunState(runHandle)) shouldBe true
}
- it should "call its DriverRunCompletitionHandlers' driverRunStarted upon request" taggedAs (DriverTests) in {
- val runHandle = driver.run(PigTransformation("/* a comment */"))
+ it should "call its DriverRunCompletitionHandlers' driverRunStarted upon request" in {
+ val runHandle = driver.run(okTransformation)
driver.driverRunStarted(runHandle)
diff --git a/schedoscope-transformation-shell/pom.xml b/schedoscope-transformation-shell/pom.xml
new file mode 100644
index 000000000..1e89f011a
--- /dev/null
+++ b/schedoscope-transformation-shell/pom.xml
@@ -0,0 +1,116 @@
+
+
+ 4.0.0
+ schedoscope-transformation-shell
+ Schedoscope Shell Transformation
+ Shell transformation and drivers
+
+
+ schedoscope-suite
+ schedoscope
+ 0.6.5-SNAPSHOT
+
+
+
+
+ schedoscope
+ schedoscope-core
+ ${schedoscope.version}
+
+
+ org.scalatest
+ scalatest_2.11
+ 2.2.5
+ test
+
+
+ hadoop-launcher
+ hadoop-launcher
+ 0.1.0
+ test
+
+
+
+
+ src/main/scala
+ src/test/scala
+
+
+
+ org.apache.maven.plugins
+ maven-compiler-plugin
+ 3.3
+
+
+ 1.7
+
+
+
+ org.apache.maven.plugins
+ maven-resources-plugin
+ 2.7
+
+ UTF-8
+
+
+
+ maven-source-plugin
+ 2.4
+
+
+ attach-sources
+
+ jar
+
+
+
+
+
+
+ net.alchim31.maven
+ scala-maven-plugin
+ 3.2.2
+
+
+
+ compile
+ testCompile
+ doc-jar
+
+
+
+
+
+ org.apache.maven.plugins
+ maven-surefire-plugin
+ 2.19
+
+ true
+
+
+
+ org.scalatest
+ scalatest-maven-plugin
+ 1.0
+
+ ${project.build.directory}/surefire-reports
+ .
+ WDF TestSuite.txt
+ -Xmx1024m -XX:MaxPermSize=512M
+
+ ${project.build.directory}/hadoop
+
+
+
+
+ test
+
+ test
+
+
+
+
+
+
+
diff --git a/schedoscope-transformation-shell/src/main/resources/reference.conf b/schedoscope-transformation-shell/src/main/resources/reference.conf
new file mode 100644
index 000000000..66d8f868f
--- /dev/null
+++ b/schedoscope-transformation-shell/src/main/resources/reference.conf
@@ -0,0 +1,62 @@
+#
+# Shell transformation settings
+#
+
+schedoscope {
+
+ transformations = {
+
+ shell: {
+
+ #
+ # Class implementing the Shell driver
+ #
+
+ driverClassName = "org.schedoscope.scheduler.driver.ShellDriver"
+
+ #
+ # Number of parallel Shell Driver actors to use
+ #
+
+ concurrency = 1
+
+ #
+ # Ignored
+ #
+
+ location = "/"
+
+ #
+ # Ignored
+ #
+
+ libDirectory = ""
+
+ #
+ # Ignored
+ #
+
+ url = ""
+
+ #
+ # Ignored
+ #
+
+ unpack = false
+
+ #
+ # Timeout for Shell transformations
+ #
+
+ timeout = 1 day
+
+ #
+ # Handlers to notify after each driver run has finished,
+ # whether it succeeded or failed. Each handler must implement the
+ # trait org.schedoscope.scheduler.driver.DriverRunCompletionHandler
+ #
+
+ driverRunCompletionHandlers = ["org.schedoscope.scheduler.driver.DoNothingCompletionHandler"]
+ }
+ }
+}
\ No newline at end of file
diff --git a/schedoscope-core/src/main/scala/org/schedoscope/dsl/transformations/ShellTransformation.scala b/schedoscope-transformation-shell/src/main/scala/org/schedoscope/dsl/transformations/ShellTransformation.scala
similarity index 76%
rename from schedoscope-core/src/main/scala/org/schedoscope/dsl/transformations/ShellTransformation.scala
rename to schedoscope-transformation-shell/src/main/scala/org/schedoscope/dsl/transformations/ShellTransformation.scala
index d5edbf98b..5d53566d2 100644
--- a/schedoscope-core/src/main/scala/org/schedoscope/dsl/transformations/ShellTransformation.scala
+++ b/schedoscope-transformation-shell/src/main/scala/org/schedoscope/dsl/transformations/ShellTransformation.scala
@@ -1,5 +1,7 @@
package org.schedoscope.dsl.transformations
+import org.schedoscope.scheduler.service.ViewTransformationStatus
+
/**
* A shell transformation - externally transform a view using a shell script.
*
@@ -16,4 +18,10 @@ case class ShellTransformation(script: String = "", scriptFile: String = "", she
override def fileResourcesToChecksum = if (scriptFile != "") List(scriptFile) else List()
override def stringsToChecksum = List(script)
+
+ override def viewTransformationStatus = ViewTransformationStatus( // builds the status descriptor returned for this shell transformation
+ name, // `name` is defined outside this hunk - presumably the transformation's name; confirm
+ Some(Map( // properties map exposing the shell, script and scriptFile values of this transformation
+ "shell" -> shell, "script" -> script,
+ "scriptFile" -> scriptFile)))
}
diff --git a/schedoscope-core/src/main/scala/org/schedoscope/scheduler/driver/ShellDriver.scala b/schedoscope-transformation-shell/src/main/scala/org/schedoscope/scheduler/driver/ShellDriver.scala
similarity index 100%
rename from schedoscope-core/src/main/scala/org/schedoscope/scheduler/driver/ShellDriver.scala
rename to schedoscope-transformation-shell/src/main/scala/org/schedoscope/scheduler/driver/ShellDriver.scala
diff --git a/schedoscope-transformation-shell/src/test/resources/log4j.properties b/schedoscope-transformation-shell/src/test/resources/log4j.properties
new file mode 100644
index 000000000..c5e48adeb
--- /dev/null
+++ b/schedoscope-transformation-shell/src/test/resources/log4j.properties
@@ -0,0 +1 @@
+log4j.rootLogger=OFF
\ No newline at end of file
diff --git a/schedoscope-transformation-shell/src/test/resources/logback-test.xml b/schedoscope-transformation-shell/src/test/resources/logback-test.xml
new file mode 100644
index 000000000..6bf2ab018
--- /dev/null
+++ b/schedoscope-transformation-shell/src/test/resources/logback-test.xml
@@ -0,0 +1,19 @@
+
+
+
+ [%-4level] [%date{ISO8601}] [%thread %X{sourceThread}]
+ [%X{akkaSource}] [%logger{36}] - %msg%n
+
+
+
+
+
+
+
+
+
+
+
+
+
+
diff --git a/schedoscope-core/src/test/scala/org/schedoscope/scheduler/driver/ShellDriverTest.scala b/schedoscope-transformation-shell/src/test/scala/org/schedoscope/scheduler/driver/ShellDriverTest.scala
similarity index 79%
rename from schedoscope-core/src/test/scala/org/schedoscope/scheduler/driver/ShellDriverTest.scala
rename to schedoscope-transformation-shell/src/test/scala/org/schedoscope/scheduler/driver/ShellDriverTest.scala
index 725e44291..8715900ef 100644
--- a/schedoscope-core/src/test/scala/org/schedoscope/scheduler/driver/ShellDriverTest.scala
+++ b/schedoscope-transformation-shell/src/test/scala/org/schedoscope/scheduler/driver/ShellDriverTest.scala
@@ -6,7 +6,6 @@ import org.scalatest.{FlatSpec, Matchers}
import org.schedoscope.dsl.transformations.ShellTransformation
import org.schedoscope.test.resources.LocalTestResources
import org.schedoscope.test.resources.TestDriverRunCompletionHandlerCallCounter._
-import org.schedoscope.{DriverTests, ShellTests}
import scala.io.Source
@@ -14,23 +13,23 @@ class ShellDriverTest extends FlatSpec with Matchers {
lazy val driver = new LocalTestResources().driverFor[ShellTransformation]("shell")
- "ShellDriver" should "have transformation name shell" taggedAs(DriverTests, ShellTests) in {
+ "ShellDriver" should "have transformation name shell" in {
driver.transformationName shouldBe "shell"
}
- it should "execute shell transformations synchronously" taggedAs(DriverTests, ShellTests) in {
+ it should "execute shell transformations synchronously" in {
val driverRunState = driver.runAndWait(ShellTransformation("ls -l > /dev/null"))
driverRunState shouldBe a[DriverRunSucceeded[_]]
}
- it should "execute another shell transformations synchronously" taggedAs(DriverTests, ShellTests) in {
+ it should "execute another shell transformations synchronously" in {
val driverRunState = driver.runAndWait(ShellTransformation("ls -ld > /dev/null"))
driverRunState shouldBe a[DriverRunSucceeded[_]]
}
- it should "pass environment to the shell" taggedAs(DriverTests, ShellTests) in {
+ it should "pass environment to the shell" in {
val file = File.createTempFile("_schedoscope", ".sh")
file.deleteOnExit()
@@ -43,13 +42,13 @@ class ShellDriverTest extends FlatSpec with Matchers {
driverRunState shouldBe a[DriverRunSucceeded[_]]
}
- it should "execute shell transformations and return errors when running synchronously" taggedAs(DriverTests, ShellTests) in {
+ it should "execute shell transformations and return errors when running synchronously" in {
val driverRunState = driver.runAndWait(ShellTransformation("exit 1"))
driverRunState shouldBe a[DriverRunFailed[_]]
}
- it should "call its DriverRunCompletitionHandlers' driverRunCompleted upon request" taggedAs (DriverTests) in {
+ it should "call its DriverRunCompletitionHandlers' driverRunCompleted upon request" in {
val runHandle = driver.run(ShellTransformation("#"))
while (driver.getDriverRunState(runHandle).isInstanceOf[DriverRunOngoing[_]]) {}
@@ -59,7 +58,7 @@ class ShellDriverTest extends FlatSpec with Matchers {
driverRunCompletedCalled(runHandle, driver.getDriverRunState(runHandle)) shouldBe true
}
- it should "call its DriverRunCompletitionHandlers' driverRunStarted upon request" taggedAs (DriverTests) in {
+ it should "call its DriverRunCompletitionHandlers' driverRunStarted upon request" in {
val runHandle = driver.run(ShellTransformation("#"))
driver.driverRunStarted(runHandle)
diff --git a/schedoscope-tutorial/pom.xml b/schedoscope-tutorial/pom.xml
index fd6f35b37..1c481c350 100644
--- a/schedoscope-tutorial/pom.xml
+++ b/schedoscope-tutorial/pom.xml
@@ -18,6 +18,11 @@
schedoscope-core${schedoscope.version}