Skip to content

Commit

Permalink
Loader: Fix column names for shredded tables (close #1332)
Browse files Browse the repository at this point in the history
  • Loading branch information
oguzhanunlu committed Jan 4, 2024
1 parent f1c6ae8 commit 9768ac7
Show file tree
Hide file tree
Showing 8 changed files with 126 additions and 43 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,7 @@ object Migration {
discovery.shreddedTypes.filterNot(_.isAtomic).traverse {
case s: ShreddedType.Tabular =>
if (!disableMigration.contains(s.info.toCriterion))
EitherT.rightT[F, LoaderError](Description.Table(discovery.shredModels(s.info.getSchemaKey)))
EitherT.rightT[F, LoaderError](Description.Table(discovery.shredModels(s.info.getSchemaKey).mergeRedshiftSchemasResult))
else EitherT.rightT[F, LoaderError](Description.NoMigration)
case ShreddedType.Widerow(info) =>
EitherT.rightT[F, LoaderError](Description.WideRow(info))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,12 @@ import cats.data.EitherT
import cats.implicits._
import com.snowplowanalytics.iglu.core.SchemaKey
import com.snowplowanalytics.iglu.core.SchemaKey.ordering
import com.snowplowanalytics.iglu.schemaddl.redshift.{MergeRedshiftSchemasResult, foldMapMergeRedshiftSchemas}
import com.snowplowanalytics.iglu.schemaddl.redshift.{
MergeRedshiftSchemasResult,
ShredModel,
foldMapMergeRedshiftSchemas,
foldMapRedshiftSchemas
}
import com.snowplowanalytics.snowplow.rdbloader.cloud.JsonPathDiscovery
import com.snowplowanalytics.snowplow.rdbloader.common.LoaderMessage.TypesInfo
import com.snowplowanalytics.snowplow.rdbloader.{DiscoveryStream, LoaderAction, LoaderError}
Expand All @@ -22,6 +27,7 @@ import com.snowplowanalytics.snowplow.rdbloader.common.LoaderMessage
import com.snowplowanalytics.snowplow.rdbloader.common.LoaderMessage.TypesInfo.WideRow.WideRowFormat
import com.snowplowanalytics.snowplow.rdbloader.common.cloud.{BlobStorage, Queue}
import com.snowplowanalytics.snowplow.rdbloader.common.config.TransformerConfig.Compression
import com.snowplowanalytics.snowplow.rdbloader.discovery.DataDiscovery.DiscoveredShredModels
import com.snowplowanalytics.snowplow.rdbloader.discovery.DiscoveryFailure.IgluError
import com.snowplowanalytics.snowplow.rdbloader.state.State

Expand All @@ -42,7 +48,7 @@ case class DataDiscovery(
compression: Compression,
typesInfo: TypesInfo,
wideColumns: List[String],
shredModels: Map[SchemaKey, MergeRedshiftSchemasResult]
shredModels: Map[SchemaKey, DiscoveredShredModels]
) {

/** ETL id */
Expand All @@ -68,6 +74,17 @@ object DataDiscovery {
private implicit val LoggerName =
Logging.LoggerName(getClass.getSimpleName.stripSuffix("$"))

/**
* Pair of Redshift models kept for one discovered schema key (the value type of
* `DataDiscovery.shredModels`).
*
* @param shredModel
* Model produced by `foldMapRedshiftSchemas` for the discovered schema key itself, built from
* the schemas up to and including that key. Used to construct column names for COPY FROM query
* @param mergeRedshiftSchemasResult
* Result of `foldMapMergeRedshiftSchemas` over the schemas up to the maximum schema key found
* for the same name among the discovered types. Used for migration
*/
case class DiscoveredShredModels(
shredModel: ShredModel,
mergeRedshiftSchemasResult: MergeRedshiftSchemasResult
)

case class WithOrigin(discovery: DataDiscovery, origin: LoaderMessage.ShreddingComplete)

/**
Expand Down Expand Up @@ -167,15 +184,17 @@ object DataDiscovery {

def getShredModels[F[_]: Monad: Iglu](
nonAtomicTypes: List[ShreddedType]
): EitherT[F, LoaderError, Map[SchemaKey, MergeRedshiftSchemasResult]] = {
): EitherT[F, LoaderError, Map[SchemaKey, DiscoveredShredModels]] = {
val maxSchemaKeyPerTableName = getMaxSchemaKeyPerTableName(nonAtomicTypes)
nonAtomicTypes
.traverse { shreddedType =>
EitherT(Iglu[F].getSchemasWithSameModel(shreddedType.info.getSchemaKey)).map { schemas =>
val maxSchemaKey = maxSchemaKeyPerTableName(shreddedType.info.getName)
val filtered = schemas.filter(_.self.schemaKey <= maxSchemaKey).toNel.get
val mergeRedshiftSchemasResult = foldMapMergeRedshiftSchemas(filtered)
(shreddedType.info.getSchemaKey, mergeRedshiftSchemasResult)
val filtered = schemas.filter(_.self.schemaKey <= shreddedType.info.getSchemaKey).toNel.get
val maxFiltered = schemas.filter(_.self.schemaKey <= maxSchemaKey).toNel.get
val foldMapRedshiftSchemasResult: ShredModel = foldMapRedshiftSchemas(filtered)(shreddedType.info.getSchemaKey)
val foldMapMergeRedshiftSchemasResult = foldMapMergeRedshiftSchemas(maxFiltered)
(shreddedType.info.getSchemaKey, DiscoveredShredModels(foldMapRedshiftSchemasResult, foldMapMergeRedshiftSchemasResult))
}
}
.map(_.toMap)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import cats.syntax.either._
import com.snowplowanalytics.iglu.core.SchemaVer.Full
import com.snowplowanalytics.iglu.core.{SchemaKey, SchemaMap, SchemaVer, SelfDescribingSchema}
import com.snowplowanalytics.iglu.schemaddl.jsonschema.Schema
import com.snowplowanalytics.iglu.schemaddl.redshift.foldMapMergeRedshiftSchemas
import com.snowplowanalytics.iglu.schemaddl.redshift._
import com.snowplowanalytics.snowplow.rdbloader.LoaderError
import com.snowplowanalytics.snowplow.rdbloader.cloud.JsonPathDiscovery
import com.snowplowanalytics.snowplow.rdbloader.common.LoaderMessage
Expand All @@ -23,6 +23,7 @@ import com.snowplowanalytics.snowplow.rdbloader.common.cloud.BlobStorage
import com.snowplowanalytics.snowplow.rdbloader.dsl.{Cache, Iglu, Logging}
import com.snowplowanalytics.snowplow.rdbloader.common.config.TransformerConfig.Compression
import com.snowplowanalytics.snowplow.rdbloader.common.config.Semver
import com.snowplowanalytics.snowplow.rdbloader.discovery.DataDiscovery.DiscoveredShredModels
import org.specs2.mutable.Specification
import com.snowplowanalytics.snowplow.rdbloader.test.TestState.LogEntry
import com.snowplowanalytics.snowplow.rdbloader.test.{Pure, PureAWS, PureCache, PureIglu, PureLogging, PureOps}
Expand Down Expand Up @@ -95,11 +96,21 @@ class DataDiscoverySpec extends Specification {
)
val shreddedTypes = List(s1, s2)
val shredModels = Map(
s1.info.getSchemaKey -> foldMapMergeRedshiftSchemas(
NonEmptyList.of(SelfDescribingSchema(SchemaMap(s1.info.getSchemaKey), Schema()))
s1.info.getSchemaKey -> DiscoveredShredModels(
foldMapRedshiftSchemas(
NonEmptyList.of(SelfDescribingSchema(SchemaMap(s1.info.getSchemaKey), Schema()))
)(s1.info.getSchemaKey),
foldMapMergeRedshiftSchemas(
NonEmptyList.of(SelfDescribingSchema(SchemaMap(s1.info.getSchemaKey), Schema()))
)
),
s2.info.getSchemaKey -> foldMapMergeRedshiftSchemas(
NonEmptyList.of(SelfDescribingSchema(SchemaMap(s2.info.getSchemaKey), Schema()))
s2.info.getSchemaKey -> DiscoveredShredModels(
foldMapRedshiftSchemas(
NonEmptyList.of(SelfDescribingSchema(SchemaMap(s2.info.getSchemaKey), Schema()))
)(s2.info.getSchemaKey),
foldMapMergeRedshiftSchemas(
NonEmptyList.of(SelfDescribingSchema(SchemaMap(s2.info.getSchemaKey), Schema()))
)
)
)

Expand Down Expand Up @@ -206,11 +217,21 @@ class DataDiscoverySpec extends Specification {
)

val shredModels = Map(
s1.info.getSchemaKey -> foldMapMergeRedshiftSchemas(
NonEmptyList.of(SelfDescribingSchema(SchemaMap(s1.info.getSchemaKey), Schema()))
s1.info.getSchemaKey -> DiscoveredShredModels(
foldMapRedshiftSchemas(
NonEmptyList.of(SelfDescribingSchema(SchemaMap(s1.info.getSchemaKey), Schema()))
)(s1.info.getSchemaKey),
foldMapMergeRedshiftSchemas(
NonEmptyList.of(SelfDescribingSchema(SchemaMap(s1.info.getSchemaKey), Schema()))
)
),
s2.info.getSchemaKey -> foldMapMergeRedshiftSchemas(
NonEmptyList.of(SelfDescribingSchema(SchemaMap(s2.info.getSchemaKey), Schema()))
s2.info.getSchemaKey -> DiscoveredShredModels(
foldMapRedshiftSchemas(
NonEmptyList.of(SelfDescribingSchema(SchemaMap(s2.info.getSchemaKey), Schema()))
)(s2.info.getSchemaKey),
foldMapMergeRedshiftSchemas(
NonEmptyList.of(SelfDescribingSchema(SchemaMap(s2.info.getSchemaKey), Schema()))
)
)
)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ import cats.syntax.option._
import cats.effect.Clock
import com.snowplowanalytics.iglu.core.{SchemaKey, SchemaMap, SchemaVer, SelfDescribingSchema}
import com.snowplowanalytics.iglu.schemaddl.jsonschema.Schema
import com.snowplowanalytics.iglu.schemaddl.redshift.{ShredModel, foldMapMergeRedshiftSchemas}
import com.snowplowanalytics.iglu.schemaddl.redshift.{ShredModel, foldMapMergeRedshiftSchemas, foldMapRedshiftSchemas}
import com.snowplowanalytics.snowplow.rdbloader.common.LoaderMessage
import com.snowplowanalytics.snowplow.rdbloader.common.LoaderMessage.{Processor, Timestamps, TypesInfo}
import com.snowplowanalytics.snowplow.rdbloader.common.config.TransformerConfig.Compression
Expand All @@ -30,6 +30,7 @@ import com.snowplowanalytics.snowplow.rdbloader.SpecHelpers._
import com.snowplowanalytics.snowplow.rdbloader.cloud.authservice.LoadAuthService
import com.snowplowanalytics.snowplow.rdbloader.common.cloud.BlobStorage
import com.snowplowanalytics.snowplow.rdbloader.db.Columns.{ColumnsToCopy, ColumnsToSkip}
import com.snowplowanalytics.snowplow.rdbloader.discovery.DataDiscovery.DiscoveredShredModels
import com.snowplowanalytics.snowplow.rdbloader.test.TestState.LogEntry
import com.snowplowanalytics.snowplow.rdbloader.test.{
Pure,
Expand Down Expand Up @@ -301,8 +302,13 @@ object LoadSpec {
BlobStorage.Key.coerce("s3://assets/com.acme/json_context_1.json")
)
val shredModels = Map(
shreddedType.info.getSchemaKey -> foldMapMergeRedshiftSchemas(
NonEmptyList.of(SelfDescribingSchema(SchemaMap(shreddedType.info.getSchemaKey), Schema()))
shreddedType.info.getSchemaKey -> DiscoveredShredModels(
foldMapRedshiftSchemas(
NonEmptyList.of(SelfDescribingSchema(SchemaMap(shreddedType.info.getSchemaKey), Schema()))
)(shreddedType.info.getSchemaKey),
foldMapMergeRedshiftSchemas(
NonEmptyList.of(SelfDescribingSchema(SchemaMap(shreddedType.info.getSchemaKey), Schema()))
)
)
)
val dataDiscovery = DataDiscovery(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,13 +99,18 @@ object PureDAO {
i
),
discovery.shreddedTypes.map { shredded =>
val mergeResult = discovery.shredModels(shredded.info.getSchemaKey)
val isRecovery = mergeResult.recoveryModels.isDefinedAt(shredded.info.getSchemaKey)
val shredModel =
mergeResult.recoveryModels.getOrElse(shredded.info.getSchemaKey, mergeResult.goodModel)
val discoveredShredModels = discovery.shredModels(shredded.info.getSchemaKey)
val isRecovery = discoveredShredModels.shredModel match {
case _: ShredModel.GoodModel => false
case _: ShredModel.RecoveryModel => true
}
val isMigrationDisabled = disableMigration.contains(shredded.info.toCriterion)
val tableName = if (isMigrationDisabled) mergeResult.goodModel.tableName else shredModel.tableName
loadAuthMethod => Statement.ShreddedCopy(shredded, Compression.Gzip, loadAuthMethod, shredModel, tableName, isRecovery)
val tableName =
if (isMigrationDisabled) discoveredShredModels.mergeRedshiftSchemasResult.goodModel.tableName
else discoveredShredModels.shredModel.tableName

loadAuthMethod =>
Statement.ShreddedCopy(shredded, Compression.Gzip, loadAuthMethod, discoveredShredModels.shredModel, tableName, isRecovery)
}
)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,18 +93,23 @@ object Redshift {
.values
.map(_.head) // So we get only one copy statement for given path
.map { shreddedType =>
val mergeResult = discovery.shredModels(shreddedType.info.getSchemaKey)
val isRecovery = mergeResult.recoveryModels.isDefinedAt(shreddedType.info.getSchemaKey)
val shredModel =
mergeResult.recoveryModels.getOrElse(shreddedType.info.getSchemaKey, mergeResult.goodModel)
val discoveredShredModels = discovery.shredModels(shreddedType.info.getSchemaKey)
val isRecovery = discoveredShredModels.shredModel match {
case _: ShredModel.GoodModel => false
case _: ShredModel.RecoveryModel => true
}

val isMigrationDisabled = disableMigration.contains(shreddedType.info.toCriterion)
val tableName = if (isMigrationDisabled) mergeResult.goodModel.tableName else shredModel.tableName
val tableName =
if (isMigrationDisabled) discoveredShredModels.mergeRedshiftSchemasResult.goodModel.tableName
else discoveredShredModels.shredModel.tableName

loadAuthMethod =>
Statement.ShreddedCopy(
shreddedType,
discovery.compression,
loadAuthMethod,
shredModel,
discoveredShredModels.shredModel,
tableName,
isRecovery
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ package com.snowplowanalytics.snowplow.loader.redshift
import cats.data.NonEmptyList
import com.snowplowanalytics.iglu.core.{SchemaKey, SchemaMap, SchemaVer, SelfDescribingSchema}
import com.snowplowanalytics.iglu.schemaddl.jsonschema.Schema
import com.snowplowanalytics.iglu.schemaddl.redshift.foldMapMergeRedshiftSchemas
import com.snowplowanalytics.iglu.schemaddl.redshift._
import com.snowplowanalytics.snowplow.rdbloader.db.Statement
import com.snowplowanalytics.snowplow.rdbloader.test.TestState
import com.snowplowanalytics.snowplow.rdbloader.db.{Migration, Target}
Expand All @@ -25,6 +25,7 @@ import com.snowplowanalytics.snowplow.rdbloader.common.config.TransformerConfig.
import com.snowplowanalytics.snowplow.rdbloader.db.Migration.Description
import com.snowplowanalytics.snowplow.rdbloader.discovery.ShreddedType.{Info, Tabular}
import com.snowplowanalytics.snowplow.rdbloader.discovery.DataDiscovery
import com.snowplowanalytics.snowplow.rdbloader.discovery.DataDiscovery.DiscoveredShredModels
import com.snowplowanalytics.snowplow.rdbloader.dsl.DAO
import com.snowplowanalytics.snowplow.rdbloader.test.Pure
import com.snowplowanalytics.snowplow.rdbloader.test.PureDAO
Expand Down Expand Up @@ -121,8 +122,13 @@ class RedshiftSpec extends Specification {
TypesInfo.Shredded(List.empty),
Nil,
shreddedTypes.map { s =>
s.info.getSchemaKey -> foldMapMergeRedshiftSchemas(
NonEmptyList.of(SelfDescribingSchema(SchemaMap(s.info.getSchemaKey), Schema()))
s.info.getSchemaKey -> DiscoveredShredModels(
foldMapRedshiftSchemas(
NonEmptyList.of(SelfDescribingSchema(SchemaMap(s.info.getSchemaKey), Schema()))
)(s.info.getSchemaKey),
foldMapMergeRedshiftSchemas(
NonEmptyList.of(SelfDescribingSchema(SchemaMap(s.info.getSchemaKey), Schema()))
)
)
}.toMap
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,14 @@ import doobie.Fragment
import com.snowplowanalytics.iglu.core.{SchemaKey, SchemaMap, SchemaVer, SelfDescribingSchema}
import com.snowplowanalytics.iglu.schemaddl.jsonschema.Schema
import com.snowplowanalytics.iglu.schemaddl.jsonschema.properties.{CommonProperties, ObjectProperty, StringProperty}
import com.snowplowanalytics.iglu.schemaddl.redshift.foldMapMergeRedshiftSchemas
import com.snowplowanalytics.iglu.schemaddl.redshift.{foldMapMergeRedshiftSchemas, foldMapRedshiftSchemas}
import com.snowplowanalytics.snowplow.rdbloader.common.LoaderMessage.{SnowplowEntity, TypesInfo}
import com.snowplowanalytics.snowplow.rdbloader.common.cloud.BlobStorage
import com.snowplowanalytics.snowplow.rdbloader.db.{Migration, Statement}
import com.snowplowanalytics.snowplow.rdbloader.discovery.{DataDiscovery, ShreddedType}
import com.snowplowanalytics.snowplow.rdbloader.dsl.{DAO, Iglu, Logging, Transaction}
import com.snowplowanalytics.snowplow.rdbloader.common.config.TransformerConfig.Compression
import com.snowplowanalytics.snowplow.rdbloader.discovery.DataDiscovery.DiscoveredShredModels
import org.specs2.mutable.Specification
import com.snowplowanalytics.snowplow.rdbloader.test.TestState.LogEntry
import com.snowplowanalytics.snowplow.rdbloader.test.{Pure, PureDAO, PureIglu, PureLogging, PureTransaction}
Expand Down Expand Up @@ -51,11 +52,21 @@ class MigrationSpec extends Specification {
)
val types = List(s1, s2)
val shredModels = Map(
s1.info.getSchemaKey -> foldMapMergeRedshiftSchemas(
NonEmptyList.of(SelfDescribingSchema(SchemaMap(s1.info.getSchemaKey), Schema()))
s1.info.getSchemaKey -> DiscoveredShredModels(
foldMapRedshiftSchemas(
NonEmptyList.of(SelfDescribingSchema(SchemaMap(s1.info.getSchemaKey), Schema()))
)(s1.info.getSchemaKey),
foldMapMergeRedshiftSchemas(
NonEmptyList.of(SelfDescribingSchema(SchemaMap(s1.info.getSchemaKey), Schema()))
)
),
s2.info.getSchemaKey -> foldMapMergeRedshiftSchemas(
NonEmptyList.of(SelfDescribingSchema(SchemaMap(s2.info.getSchemaKey), Schema()))
s2.info.getSchemaKey -> DiscoveredShredModels(
foldMapRedshiftSchemas(
NonEmptyList.of(SelfDescribingSchema(SchemaMap(s2.info.getSchemaKey), Schema()))
)(s2.info.getSchemaKey),
foldMapMergeRedshiftSchemas(
NonEmptyList.of(SelfDescribingSchema(SchemaMap(s2.info.getSchemaKey), Schema()))
)
)
)
val input =
Expand Down Expand Up @@ -136,11 +147,21 @@ class MigrationSpec extends Specification {
)

val shredModels = Map(
s1.info.getSchemaKey -> foldMapMergeRedshiftSchemas(
NonEmptyList.of(SelfDescribingSchema(SchemaMap(s1.info.getSchemaKey), Schema()))
s1.info.getSchemaKey -> DiscoveredShredModels(
foldMapRedshiftSchemas(
NonEmptyList.of(SelfDescribingSchema(SchemaMap(s1.info.getSchemaKey), Schema()))
)(s1.info.getSchemaKey),
foldMapMergeRedshiftSchemas(
NonEmptyList.of(SelfDescribingSchema(SchemaMap(s1.info.getSchemaKey), Schema()))
)
),
s2.info.getSchemaKey -> foldMapMergeRedshiftSchemas(
NonEmptyList.of(SelfDescribingSchema(SchemaMap(s2.info.getSchemaKey), Schema()))
s2.info.getSchemaKey -> DiscoveredShredModels(
foldMapRedshiftSchemas(
NonEmptyList.of(SelfDescribingSchema(SchemaMap(s2.info.getSchemaKey), Schema()))
)(s2.info.getSchemaKey),
foldMapMergeRedshiftSchemas(
NonEmptyList.of(SelfDescribingSchema(SchemaMap(s2.info.getSchemaKey), Schema()))
)
)
)

Expand Down

0 comments on commit 9768ac7

Please sign in to comment.