diff --git a/modules/loader/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/db/Migration.scala b/modules/loader/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/db/Migration.scala index 8882b1a1e..f03bdca1d 100644 --- a/modules/loader/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/db/Migration.scala +++ b/modules/loader/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/db/Migration.scala @@ -164,7 +164,7 @@ object Migration { discovery.shreddedTypes.filterNot(_.isAtomic).traverse { case s: ShreddedType.Tabular => if (!disableMigration.contains(s.info.toCriterion)) - EitherT.rightT[F, LoaderError](Description.Table(discovery.shredModels(s.info.getSchemaKey))) + EitherT.rightT[F, LoaderError](Description.Table(discovery.shredModels(s.info.getSchemaKey).mergeRedshiftSchemasResult)) else EitherT.rightT[F, LoaderError](Description.NoMigration) case ShreddedType.Widerow(info) => EitherT.rightT[F, LoaderError](Description.WideRow(info)) diff --git a/modules/loader/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/discovery/DataDiscovery.scala b/modules/loader/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/discovery/DataDiscovery.scala index 8b0595a8e..021414dc6 100644 --- a/modules/loader/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/discovery/DataDiscovery.scala +++ b/modules/loader/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/discovery/DataDiscovery.scala @@ -12,7 +12,12 @@ import cats.data.EitherT import cats.implicits._ import com.snowplowanalytics.iglu.core.SchemaKey import com.snowplowanalytics.iglu.core.SchemaKey.ordering -import com.snowplowanalytics.iglu.schemaddl.redshift.{MergeRedshiftSchemasResult, foldMapMergeRedshiftSchemas} +import com.snowplowanalytics.iglu.schemaddl.redshift.{ + MergeRedshiftSchemasResult, + ShredModel, + foldMapMergeRedshiftSchemas, + foldMapRedshiftSchemas +} import com.snowplowanalytics.snowplow.rdbloader.cloud.JsonPathDiscovery import com.snowplowanalytics.snowplow.rdbloader.common.LoaderMessage.TypesInfo import com.snowplowanalytics.snowplow.rdbloader.{DiscoveryStream, LoaderAction, LoaderError} @@ -22,6 +27,7 @@ import com.snowplowanalytics.snowplow.rdbloader.common.LoaderMessage import com.snowplowanalytics.snowplow.rdbloader.common.LoaderMessage.TypesInfo.WideRow.WideRowFormat import com.snowplowanalytics.snowplow.rdbloader.common.cloud.{BlobStorage, Queue} import com.snowplowanalytics.snowplow.rdbloader.common.config.TransformerConfig.Compression +import com.snowplowanalytics.snowplow.rdbloader.discovery.DataDiscovery.DiscoveredShredModels import com.snowplowanalytics.snowplow.rdbloader.discovery.DiscoveryFailure.IgluError import com.snowplowanalytics.snowplow.rdbloader.state.State @@ -42,7 +48,7 @@ case class DataDiscovery( compression: Compression, typesInfo: TypesInfo, wideColumns: List[String], - shredModels: Map[SchemaKey, MergeRedshiftSchemasResult] + shredModels: Map[SchemaKey, DiscoveredShredModels] ) { /** ETL id */ @@ -68,6 +74,17 @@ object DataDiscovery { private implicit val LoggerName = Logging.LoggerName(getClass.getSimpleName.stripSuffix("$")) + /** + * @param shredModel + * Used to construct column names for COPY FROM query + * @param mergeRedshiftSchemasResult + * Used for migration + */ + case class DiscoveredShredModels( + shredModel: ShredModel, + mergeRedshiftSchemasResult: MergeRedshiftSchemasResult + ) + case class WithOrigin(discovery: DataDiscovery, origin: LoaderMessage.ShreddingComplete) /** @@ -167,15 +184,17 @@ object DataDiscovery { def getShredModels[F[_]: Monad: Iglu]( nonAtomicTypes: List[ShreddedType] - ): EitherT[F, LoaderError, Map[SchemaKey, MergeRedshiftSchemasResult]] = { + ): EitherT[F, LoaderError, Map[SchemaKey, DiscoveredShredModels]] = { val maxSchemaKeyPerTableName = getMaxSchemaKeyPerTableName(nonAtomicTypes) nonAtomicTypes .traverse { shreddedType => EitherT(Iglu[F].getSchemasWithSameModel(shreddedType.info.getSchemaKey)).map { schemas => val maxSchemaKey = maxSchemaKeyPerTableName(shreddedType.info.getName) - val filtered = schemas.filter(_.self.schemaKey <= maxSchemaKey).toNel.get - val mergeRedshiftSchemasResult = foldMapMergeRedshiftSchemas(filtered) - (shreddedType.info.getSchemaKey, mergeRedshiftSchemasResult) + val filtered = schemas.filter(_.self.schemaKey <= shreddedType.info.getSchemaKey).toNel.get + val maxFiltered = schemas.filter(_.self.schemaKey <= maxSchemaKey).toNel.get + val foldMapRedshiftSchemasResult: ShredModel = foldMapRedshiftSchemas(filtered)(shreddedType.info.getSchemaKey) + val foldMapMergeRedshiftSchemasResult = foldMapMergeRedshiftSchemas(maxFiltered) + (shreddedType.info.getSchemaKey, DiscoveredShredModels(foldMapRedshiftSchemasResult, foldMapMergeRedshiftSchemasResult)) } } .map(_.toMap) diff --git a/modules/loader/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/discovery/DataDiscoverySpec.scala b/modules/loader/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/discovery/DataDiscoverySpec.scala index 782cadee7..8a39fee05 100644 --- a/modules/loader/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/discovery/DataDiscoverySpec.scala +++ b/modules/loader/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/discovery/DataDiscoverySpec.scala @@ -13,7 +13,7 @@ import cats.syntax.either._ import com.snowplowanalytics.iglu.core.SchemaVer.Full import com.snowplowanalytics.iglu.core.{SchemaKey, SchemaMap, SchemaVer, SelfDescribingSchema} import com.snowplowanalytics.iglu.schemaddl.jsonschema.Schema -import com.snowplowanalytics.iglu.schemaddl.redshift.foldMapMergeRedshiftSchemas +import com.snowplowanalytics.iglu.schemaddl.redshift._ import com.snowplowanalytics.snowplow.rdbloader.LoaderError import com.snowplowanalytics.snowplow.rdbloader.cloud.JsonPathDiscovery import com.snowplowanalytics.snowplow.rdbloader.common.LoaderMessage @@ -23,6 +23,7 @@ import com.snowplowanalytics.snowplow.rdbloader.common.cloud.BlobStorage import com.snowplowanalytics.snowplow.rdbloader.dsl.{Cache, Iglu, Logging} import com.snowplowanalytics.snowplow.rdbloader.common.config.TransformerConfig.Compression import com.snowplowanalytics.snowplow.rdbloader.common.config.Semver +import com.snowplowanalytics.snowplow.rdbloader.discovery.DataDiscovery.DiscoveredShredModels import org.specs2.mutable.Specification import com.snowplowanalytics.snowplow.rdbloader.test.TestState.LogEntry import com.snowplowanalytics.snowplow.rdbloader.test.{Pure, PureAWS, PureCache, PureIglu, PureLogging, PureOps} @@ -95,11 +96,21 @@ class DataDiscoverySpec extends Specification { ) val shreddedTypes = List(s1, s2) val shredModels = Map( - s1.info.getSchemaKey -> foldMapMergeRedshiftSchemas( - NonEmptyList.of(SelfDescribingSchema(SchemaMap(s1.info.getSchemaKey), Schema())) + s1.info.getSchemaKey -> DiscoveredShredModels( + foldMapRedshiftSchemas( + NonEmptyList.of(SelfDescribingSchema(SchemaMap(s1.info.getSchemaKey), Schema())) + )(s1.info.getSchemaKey), + foldMapMergeRedshiftSchemas( + NonEmptyList.of(SelfDescribingSchema(SchemaMap(s1.info.getSchemaKey), Schema())) + ) ), - s2.info.getSchemaKey -> foldMapMergeRedshiftSchemas( - NonEmptyList.of(SelfDescribingSchema(SchemaMap(s2.info.getSchemaKey), Schema())) + s2.info.getSchemaKey -> DiscoveredShredModels( + foldMapRedshiftSchemas( + NonEmptyList.of(SelfDescribingSchema(SchemaMap(s2.info.getSchemaKey), Schema())) + )(s2.info.getSchemaKey), + foldMapMergeRedshiftSchemas( + NonEmptyList.of(SelfDescribingSchema(SchemaMap(s2.info.getSchemaKey), Schema())) + ) ) ) @@ -206,11 +217,21 @@ class DataDiscoverySpec extends Specification { ) val shredModels = Map( - s1.info.getSchemaKey -> foldMapMergeRedshiftSchemas( - NonEmptyList.of(SelfDescribingSchema(SchemaMap(s1.info.getSchemaKey), Schema())) + s1.info.getSchemaKey -> DiscoveredShredModels( + foldMapRedshiftSchemas( + NonEmptyList.of(SelfDescribingSchema(SchemaMap(s1.info.getSchemaKey), Schema())) + )(s1.info.getSchemaKey), + foldMapMergeRedshiftSchemas( + NonEmptyList.of(SelfDescribingSchema(SchemaMap(s1.info.getSchemaKey), Schema())) + ) ), - s2.info.getSchemaKey -> foldMapMergeRedshiftSchemas( - NonEmptyList.of(SelfDescribingSchema(SchemaMap(s2.info.getSchemaKey), Schema())) + s2.info.getSchemaKey -> DiscoveredShredModels( + foldMapRedshiftSchemas( + NonEmptyList.of(SelfDescribingSchema(SchemaMap(s2.info.getSchemaKey), Schema())) + )(s2.info.getSchemaKey), + foldMapMergeRedshiftSchemas( + NonEmptyList.of(SelfDescribingSchema(SchemaMap(s2.info.getSchemaKey), Schema())) + ) ) ) diff --git a/modules/loader/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/loading/LoadSpec.scala b/modules/loader/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/loading/LoadSpec.scala index 9bc2f3724..278682b64 100644 --- a/modules/loader/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/loading/LoadSpec.scala +++ b/modules/loader/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/loading/LoadSpec.scala @@ -16,7 +16,7 @@ import cats.syntax.option._ import cats.effect.Clock import com.snowplowanalytics.iglu.core.{SchemaKey, SchemaMap, SchemaVer, SelfDescribingSchema} import com.snowplowanalytics.iglu.schemaddl.jsonschema.Schema -import com.snowplowanalytics.iglu.schemaddl.redshift.{ShredModel, foldMapMergeRedshiftSchemas} +import com.snowplowanalytics.iglu.schemaddl.redshift.{ShredModel, foldMapMergeRedshiftSchemas, foldMapRedshiftSchemas} import com.snowplowanalytics.snowplow.rdbloader.common.LoaderMessage import com.snowplowanalytics.snowplow.rdbloader.common.LoaderMessage.{Processor, Timestamps, TypesInfo} import com.snowplowanalytics.snowplow.rdbloader.common.config.TransformerConfig.Compression @@ -30,6 +30,7 @@ import com.snowplowanalytics.snowplow.rdbloader.SpecHelpers._ import com.snowplowanalytics.snowplow.rdbloader.cloud.authservice.LoadAuthService import com.snowplowanalytics.snowplow.rdbloader.common.cloud.BlobStorage import com.snowplowanalytics.snowplow.rdbloader.db.Columns.{ColumnsToCopy, ColumnsToSkip} +import com.snowplowanalytics.snowplow.rdbloader.discovery.DataDiscovery.DiscoveredShredModels import com.snowplowanalytics.snowplow.rdbloader.test.TestState.LogEntry import com.snowplowanalytics.snowplow.rdbloader.test.{ Pure, @@ -301,8 +302,13 @@ object LoadSpec { BlobStorage.Key.coerce("s3://assets/com.acme/json_context_1.json") ) val shredModels = Map( - shreddedType.info.getSchemaKey -> foldMapMergeRedshiftSchemas( - NonEmptyList.of(SelfDescribingSchema(SchemaMap(shreddedType.info.getSchemaKey), Schema())) + shreddedType.info.getSchemaKey -> DiscoveredShredModels( + foldMapRedshiftSchemas( + NonEmptyList.of(SelfDescribingSchema(SchemaMap(shreddedType.info.getSchemaKey), Schema())) + )(shreddedType.info.getSchemaKey), + foldMapMergeRedshiftSchemas( + NonEmptyList.of(SelfDescribingSchema(SchemaMap(shreddedType.info.getSchemaKey), Schema())) + ) ) ) val dataDiscovery = DataDiscovery( diff --git a/modules/loader/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/test/PureDAO.scala b/modules/loader/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/test/PureDAO.scala index 8482741ca..db4fb09df 100644 --- a/modules/loader/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/test/PureDAO.scala +++ b/modules/loader/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/test/PureDAO.scala @@ -99,13 +99,18 @@ object PureDAO { i ), discovery.shreddedTypes.map { shredded => - val mergeResult = discovery.shredModels(shredded.info.getSchemaKey) - val isRecovery = mergeResult.recoveryModels.isDefinedAt(shredded.info.getSchemaKey) - val shredModel = - mergeResult.recoveryModels.getOrElse(shredded.info.getSchemaKey, mergeResult.goodModel) + val discoveredShredModels = discovery.shredModels(shredded.info.getSchemaKey) + val isRecovery = discoveredShredModels.shredModel match { + case _: ShredModel.GoodModel => false + case _: ShredModel.RecoveryModel => true + } val isMigrationDisabled = disableMigration.contains(shredded.info.toCriterion) - val tableName = if (isMigrationDisabled) mergeResult.goodModel.tableName else shredModel.tableName - loadAuthMethod => Statement.ShreddedCopy(shredded, Compression.Gzip, loadAuthMethod, shredModel, tableName, isRecovery) + val tableName = + if (isMigrationDisabled) discoveredShredModels.mergeRedshiftSchemasResult.goodModel.tableName + else discoveredShredModels.shredModel.tableName + + loadAuthMethod => + Statement.ShreddedCopy(shredded, Compression.Gzip, loadAuthMethod, discoveredShredModels.shredModel, tableName, isRecovery) } ) diff --git a/modules/redshift-loader/src/main/scala/com/snowplowanalytics/snowplow/loader/redshift/Redshift.scala b/modules/redshift-loader/src/main/scala/com/snowplowanalytics/snowplow/loader/redshift/Redshift.scala index 5c13a56c0..ca792f8a3 100644 --- a/modules/redshift-loader/src/main/scala/com/snowplowanalytics/snowplow/loader/redshift/Redshift.scala +++ b/modules/redshift-loader/src/main/scala/com/snowplowanalytics/snowplow/loader/redshift/Redshift.scala @@ -93,18 +93,23 @@ object Redshift { .values .map(_.head) // So we get only one copy statement for given path .map { shreddedType => - val mergeResult = discovery.shredModels(shreddedType.info.getSchemaKey) - val isRecovery = mergeResult.recoveryModels.isDefinedAt(shreddedType.info.getSchemaKey) - val shredModel = - mergeResult.recoveryModels.getOrElse(shreddedType.info.getSchemaKey, mergeResult.goodModel) + val discoveredShredModels = discovery.shredModels(shreddedType.info.getSchemaKey) + val isRecovery = discoveredShredModels.shredModel match { + case _: ShredModel.GoodModel => false + case _: ShredModel.RecoveryModel => true + } + val isMigrationDisabled = disableMigration.contains(shreddedType.info.toCriterion) - val tableName = if (isMigrationDisabled) mergeResult.goodModel.tableName else shredModel.tableName + val tableName = + if (isMigrationDisabled) discoveredShredModels.mergeRedshiftSchemasResult.goodModel.tableName + else discoveredShredModels.shredModel.tableName + loadAuthMethod => Statement.ShreddedCopy( shreddedType, discovery.compression, loadAuthMethod, - shredModel, + discoveredShredModels.shredModel, tableName, isRecovery ) diff --git a/modules/redshift-loader/src/test/scala/com/snowplowanalytics/snowplow/loader/redshift/RedshiftSpec.scala b/modules/redshift-loader/src/test/scala/com/snowplowanalytics/snowplow/loader/redshift/RedshiftSpec.scala index 89c7dbda3..11f189c47 100644 --- a/modules/redshift-loader/src/test/scala/com/snowplowanalytics/snowplow/loader/redshift/RedshiftSpec.scala +++ b/modules/redshift-loader/src/test/scala/com/snowplowanalytics/snowplow/loader/redshift/RedshiftSpec.scala @@ -10,7 +10,7 @@ package com.snowplowanalytics.snowplow.loader.redshift import cats.data.NonEmptyList import com.snowplowanalytics.iglu.core.{SchemaKey, SchemaMap, SchemaVer, SelfDescribingSchema} import com.snowplowanalytics.iglu.schemaddl.jsonschema.Schema -import com.snowplowanalytics.iglu.schemaddl.redshift.foldMapMergeRedshiftSchemas +import com.snowplowanalytics.iglu.schemaddl.redshift._ import com.snowplowanalytics.snowplow.rdbloader.db.Statement import com.snowplowanalytics.snowplow.rdbloader.test.TestState import com.snowplowanalytics.snowplow.rdbloader.db.{Migration, Target} @@ -25,6 +25,7 @@ import com.snowplowanalytics.snowplow.rdbloader.common.config.TransformerConfig. import com.snowplowanalytics.snowplow.rdbloader.db.Migration.Description import com.snowplowanalytics.snowplow.rdbloader.discovery.ShreddedType.{Info, Tabular} import com.snowplowanalytics.snowplow.rdbloader.discovery.DataDiscovery +import com.snowplowanalytics.snowplow.rdbloader.discovery.DataDiscovery.DiscoveredShredModels import com.snowplowanalytics.snowplow.rdbloader.dsl.DAO import com.snowplowanalytics.snowplow.rdbloader.test.Pure import com.snowplowanalytics.snowplow.rdbloader.test.PureDAO @@ -121,8 +122,13 @@ class RedshiftSpec extends Specification { TypesInfo.Shredded(List.empty), Nil, shreddedTypes.map { s => - s.info.getSchemaKey -> foldMapMergeRedshiftSchemas( - NonEmptyList.of(SelfDescribingSchema(SchemaMap(s.info.getSchemaKey), Schema())) + s.info.getSchemaKey -> DiscoveredShredModels( + foldMapRedshiftSchemas( + NonEmptyList.of(SelfDescribingSchema(SchemaMap(s.info.getSchemaKey), Schema())) + )(s.info.getSchemaKey), + foldMapMergeRedshiftSchemas( + NonEmptyList.of(SelfDescribingSchema(SchemaMap(s.info.getSchemaKey), Schema())) + ) ) }.toMap ) diff --git a/modules/redshift-loader/src/test/scala/com/snowplowanalytics/snowplow/loader/redshift/db/MigrationSpec.scala b/modules/redshift-loader/src/test/scala/com/snowplowanalytics/snowplow/loader/redshift/db/MigrationSpec.scala index 22efe8ca4..826c8b4e8 100644 --- a/modules/redshift-loader/src/test/scala/com/snowplowanalytics/snowplow/loader/redshift/db/MigrationSpec.scala +++ b/modules/redshift-loader/src/test/scala/com/snowplowanalytics/snowplow/loader/redshift/db/MigrationSpec.scala @@ -12,13 +12,14 @@ import doobie.Fragment import com.snowplowanalytics.iglu.core.{SchemaKey, SchemaMap, SchemaVer, SelfDescribingSchema} import com.snowplowanalytics.iglu.schemaddl.jsonschema.Schema import com.snowplowanalytics.iglu.schemaddl.jsonschema.properties.{CommonProperties, ObjectProperty, StringProperty} -import com.snowplowanalytics.iglu.schemaddl.redshift.foldMapMergeRedshiftSchemas +import com.snowplowanalytics.iglu.schemaddl.redshift.{foldMapMergeRedshiftSchemas, foldMapRedshiftSchemas} import com.snowplowanalytics.snowplow.rdbloader.common.LoaderMessage.{SnowplowEntity, TypesInfo} import com.snowplowanalytics.snowplow.rdbloader.common.cloud.BlobStorage import com.snowplowanalytics.snowplow.rdbloader.db.{Migration, Statement} import com.snowplowanalytics.snowplow.rdbloader.discovery.{DataDiscovery, ShreddedType} import com.snowplowanalytics.snowplow.rdbloader.dsl.{DAO, Iglu, Logging, Transaction} import com.snowplowanalytics.snowplow.rdbloader.common.config.TransformerConfig.Compression +import com.snowplowanalytics.snowplow.rdbloader.discovery.DataDiscovery.DiscoveredShredModels import org.specs2.mutable.Specification import com.snowplowanalytics.snowplow.rdbloader.test.TestState.LogEntry import com.snowplowanalytics.snowplow.rdbloader.test.{Pure, PureDAO, PureIglu, PureLogging, PureTransaction} @@ -51,11 +52,21 @@ class MigrationSpec extends Specification { ) val types = List(s1, s2) val shredModels = Map( - s1.info.getSchemaKey -> foldMapMergeRedshiftSchemas( - NonEmptyList.of(SelfDescribingSchema(SchemaMap(s1.info.getSchemaKey), Schema())) + s1.info.getSchemaKey -> DiscoveredShredModels( + foldMapRedshiftSchemas( + NonEmptyList.of(SelfDescribingSchema(SchemaMap(s1.info.getSchemaKey), Schema())) + )(s1.info.getSchemaKey), + foldMapMergeRedshiftSchemas( + NonEmptyList.of(SelfDescribingSchema(SchemaMap(s1.info.getSchemaKey), Schema())) + ) ), - s2.info.getSchemaKey -> foldMapMergeRedshiftSchemas( - NonEmptyList.of(SelfDescribingSchema(SchemaMap(s2.info.getSchemaKey), Schema())) + s2.info.getSchemaKey -> DiscoveredShredModels( + foldMapRedshiftSchemas( + NonEmptyList.of(SelfDescribingSchema(SchemaMap(s2.info.getSchemaKey), Schema())) + )(s2.info.getSchemaKey), + foldMapMergeRedshiftSchemas( + NonEmptyList.of(SelfDescribingSchema(SchemaMap(s2.info.getSchemaKey), Schema())) + ) ) ) val input = @@ -136,11 +147,21 @@ class MigrationSpec extends Specification { ) val shredModels = Map( - s1.info.getSchemaKey -> foldMapMergeRedshiftSchemas( - NonEmptyList.of(SelfDescribingSchema(SchemaMap(s1.info.getSchemaKey), Schema())) + s1.info.getSchemaKey -> DiscoveredShredModels( + foldMapRedshiftSchemas( + NonEmptyList.of(SelfDescribingSchema(SchemaMap(s1.info.getSchemaKey), Schema())) + )(s1.info.getSchemaKey), + foldMapMergeRedshiftSchemas( + NonEmptyList.of(SelfDescribingSchema(SchemaMap(s1.info.getSchemaKey), Schema())) + ) ), - s2.info.getSchemaKey -> foldMapMergeRedshiftSchemas( - NonEmptyList.of(SelfDescribingSchema(SchemaMap(s2.info.getSchemaKey), Schema())) + s2.info.getSchemaKey -> DiscoveredShredModels( + foldMapRedshiftSchemas( + NonEmptyList.of(SelfDescribingSchema(SchemaMap(s2.info.getSchemaKey), Schema())) + )(s2.info.getSchemaKey), + foldMapMergeRedshiftSchemas( + NonEmptyList.of(SelfDescribingSchema(SchemaMap(s2.info.getSchemaKey), Schema())) + ) ) )