Skip to content

Commit

Permalink
Add a test checking that skipSegments performs partial migrations
Browse files Browse the repository at this point in the history
  • Loading branch information
julienrf committed Aug 11, 2024
1 parent b606d52 commit feeb7b5
Show file tree
Hide file tree
Showing 4 changed files with 144 additions and 1 deletion.
37 changes: 37 additions & 0 deletions tests/src/test/configurations/dynamodb-to-alternator-part1.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
source:
type: dynamodb
table: SkippedSegments
region: dummy
endpoint:
host: http://dynamodb
port: 8000
credentials:
accessKey: dummy
secretKey: dummy
scanSegments: 3

target:
type: dynamodb
table: SkippedSegments
region: dummy
endpoint:
host: http://scylla
port: 8000
credentials:
accessKey: dummy
secretKey: dummy
streamChanges: false

savepoints:
path: /app/savepoints
intervalSeconds: 300

skipSegments: [1, 2]

validation:
compareTimestamps: true
ttlToleranceMillis: 60000
writetimeToleranceMillis: 1000
failuresToFetch: 100
floatingPointTolerance: 0.001
timestampMsTolerance: 0
37 changes: 37 additions & 0 deletions tests/src/test/configurations/dynamodb-to-alternator-part2.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
source:
type: dynamodb
table: SkippedSegments
region: dummy
endpoint:
host: http://dynamodb
port: 8000
credentials:
accessKey: dummy
secretKey: dummy
scanSegments: 3

target:
type: dynamodb
table: SkippedSegments
region: dummy
endpoint:
host: http://scylla
port: 8000
credentials:
accessKey: dummy
secretKey: dummy
streamChanges: false

savepoints:
path: /app/savepoints
intervalSeconds: 300

skipSegments: [0]

validation:
compareTimestamps: true
ttlToleranceMillis: 60000
writetimeToleranceMillis: 1000
failuresToFetch: 100
floatingPointTolerance: 0.001
timestampMsTolerance: 0
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,8 @@ class DynamoDBInputFormatTest extends munit.FunSuite {
maxMapTasks = configuredMaxMapTasks,
readThroughput = configuredReadThroughput,
throughputReadPercent = configuredThroughputReadPercent,
description = tableDescriptionBuilder.build()
description = tableDescriptionBuilder.build(),
skipSegments = None
)
val splits = new DynamoDBInputFormat().getSplits(jobConf, 1)

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
package com.scylladb.migrator.alternator

import com.scylladb.migrator.SparkUtils.{submitSparkJob, successfullyPerformMigration}
import software.amazon.awssdk.services.dynamodb.model.{AttributeValue, PutItemRequest, ResourceNotFoundException, ScanRequest}

import java.util.UUID
import scala.jdk.CollectionConverters._
import scala.util.chaining._

class SkippedSegmentsTest extends MigratorSuite {

withTable("SkippedSegments").test("Run partial migrations") { tableName =>
// We rely on the fact that both config files have `scanSegments: 3` and
// complementary `skipSegments` properties
val configPart1 = "dynamodb-to-alternator-part1.yaml"
val configPart2 = "dynamodb-to-alternator-part2.yaml"

createRandomData(tableName)

// Initially, the target table does not exist
try {
targetAlternator.describeTable(describeTableRequest(tableName))
fail(s"The table ${tableName} should not exist yet")
} catch {
case _: ResourceNotFoundException =>
() // OK
}

// Perform the first part of the migration
successfullyPerformMigration(configPart1)

// Verify that some items have been copied to the target database …
val itemCount = targetAlternatorItemCount(tableName)
assert(itemCount > 90L && itemCount < 110L)
// … but not all of them, hence the validator fails
submitSparkJob(configPart2, "com.scylladb.migrator.Validator").exitValue().tap { statusCode =>
assertEquals(statusCode, 1)
}

// Perform the other (complementary) part of the migration
successfullyPerformMigration(configPart2)

// Validate that all the data from the source have been migrated to the target database
submitSparkJob(configPart2, "com.scylladb.migrator.Validator").exitValue().tap { statusCode =>
assertEquals(statusCode, 0, "Validation failed")
}
}

def createRandomData(tableName: String): Unit = {
for (_ <- 1 to 300) {
val itemData = Map(
"id" -> AttributeValue.fromS(UUID.randomUUID().toString),
"foo" -> AttributeValue.fromS(UUID.randomUUID().toString),
"bar" -> AttributeValue.fromS(UUID.randomUUID().toString),
"baz" -> AttributeValue.fromS(UUID.randomUUID().toString)
)
sourceDDb.putItem(PutItemRequest.builder().tableName(tableName).item(itemData.asJava).build())
}
}

def targetAlternatorItemCount(tableName: String): Long =
targetAlternator
.scanPaginator(ScanRequest.builder().tableName(tableName).build())
.items()
.stream()
.count()

}

0 comments on commit feeb7b5

Please sign in to comment.