diff --git a/migrator/src/main/scala/com/scylladb/migrator/readers/DynamoDB.scala b/migrator/src/main/scala/com/scylladb/migrator/readers/DynamoDB.scala
index b6b6d6ce..e4251a88 100644
--- a/migrator/src/main/scala/com/scylladb/migrator/readers/DynamoDB.scala
+++ b/migrator/src/main/scala/com/scylladb/migrator/readers/DynamoDB.scala
@@ -10,7 +10,13 @@ import org.apache.hadoop.mapred.JobConf
 import org.apache.log4j.LogManager
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.SparkSession
-import software.amazon.awssdk.services.dynamodb.model.{ DescribeTableRequest, TableDescription }
+import software.amazon.awssdk.services.dynamodb.model.{
+  DescribeTableRequest,
+  DescribeTimeToLiveRequest,
+  TableDescription,
+  TimeToLiveDescription,
+  TimeToLiveStatus
+}
 
 object DynamoDB {
 
@@ -45,10 +51,20 @@
     readThroughput: Option[Int],
     throughputReadPercent: Option[Float]): (RDD[(Text, DynamoDBItemWritable)], TableDescription) = {
 
-    val tableDescription = DynamoUtils
-      .buildDynamoClient(endpoint, credentials.map(_.toProvider), region)
-      .describeTable(DescribeTableRequest.builder().tableName(table).build())
-      .table
+    val dynamoDbClient =
+      DynamoUtils.buildDynamoClient(endpoint, credentials.map(_.toProvider), region)
+
+    val tableDescription =
+      dynamoDbClient
+        .describeTable(DescribeTableRequest.builder().tableName(table).build())
+        .table
+
+    val maybeTtlDescription =
+      Option(
+        dynamoDbClient
+          .describeTimeToLive(DescribeTimeToLiveRequest.builder().tableName(table).build())
+          .timeToLiveDescription
+      )
 
     val jobConf =
       makeJobConf(
@@ -61,7 +77,8 @@
         maxMapTasks,
         readThroughput,
         throughputReadPercent,
-        tableDescription)
+        tableDescription,
+        maybeTtlDescription)
 
     val rdd =
       spark.sparkContext.hadoopRDD(
@@ -82,7 +99,8 @@
     maxMapTasks: Option[Int],
     readThroughput: Option[Int],
     throughputReadPercent: Option[Float],
-    description: TableDescription
+    description: TableDescription,
+    maybeTtlDescription: Option[TimeToLiveDescription]
   ): JobConf = {
     val maybeItemCount = Option(description.itemCount).map(_.toLong)
     val maybeAvgItemSize =
@@ -125,6 +143,11 @@
       jobConf,
       DynamoDBConstants.THROUGHPUT_READ_PERCENT,
       throughputReadPercent.map(_.toString))
+    val maybeTtlAttributeName =
+      maybeTtlDescription
+        .filter(_.timeToLiveStatus == TimeToLiveStatus.ENABLED)
+        .map(_.attributeName())
+    setOptionalConf(jobConf, DynamoDBConstants.TTL_ATTRIBUTE_NAME, maybeTtlAttributeName)
 
     jobConf
   }
diff --git a/tests/src/test/configurations/dynamodb-to-alternator-ttl.yaml b/tests/src/test/configurations/dynamodb-to-alternator-ttl.yaml
new file mode 100644
index 00000000..c7b6c1ee
--- /dev/null
+++ b/tests/src/test/configurations/dynamodb-to-alternator-ttl.yaml
@@ -0,0 +1,26 @@
+source:
+  type: dynamodb
+  table: TtlTable
+  region: dummy
+  endpoint:
+    host: http://dynamodb
+    port: 8000
+  credentials:
+    accessKey: dummy
+    secretKey: dummy
+
+target:
+  type: dynamodb
+  table: TtlTable
+  region: dummy
+  endpoint:
+    host: http://scylla
+    port: 8000
+  credentials:
+    accessKey: dummy
+    secretKey: dummy
+  streamChanges: false
+
+savepoints:
+  path: /app/savepoints
+  intervalSeconds: 300
diff --git a/tests/src/test/scala/com/scylladb/migrator/alternator/DynamoDBInputFormatTest.scala b/tests/src/test/scala/com/scylladb/migrator/alternator/DynamoDBInputFormatTest.scala
index 23f05ace..f0f19cac 100644
--- a/tests/src/test/scala/com/scylladb/migrator/alternator/DynamoDBInputFormatTest.scala
+++ b/tests/src/test/scala/com/scylladb/migrator/alternator/DynamoDBInputFormatTest.scala
@@ -74,7 +74,8 @@ class DynamoDBInputFormatTest extends munit.FunSuite {
       maxMapTasks = configuredMaxMapTasks,
       readThroughput = configuredReadThroughput,
       throughputReadPercent = configuredThroughputReadPercent,
-      description = tableDescriptionBuilder.build()
+      description = tableDescriptionBuilder.build(),
+      maybeTtlDescription = None
     )
 
     val splits = new DynamoDBInputFormat().getSplits(jobConf, 1)
diff --git a/tests/src/test/scala/com/scylladb/migrator/alternator/SkippedItemsTest.scala b/tests/src/test/scala/com/scylladb/migrator/alternator/SkippedItemsTest.scala
new file mode 100644
index 00000000..26973c4a
--- /dev/null
+++ b/tests/src/test/scala/com/scylladb/migrator/alternator/SkippedItemsTest.scala
@@ -0,0 +1,71 @@
+package com.scylladb.migrator.alternator
+
+import com.scylladb.migrator.SparkUtils.successfullyPerformMigration
+import software.amazon.awssdk.services.dynamodb.model.{AttributeValue, GetItemRequest, PutItemRequest, TimeToLiveSpecification, UpdateTimeToLiveRequest}
+
+import scala.jdk.CollectionConverters._
+import scala.util.chaining._
+
+class SkippedItemsTest extends MigratorSuite {
+
+  withTable("TtlTable").test("Expired items should be filtered out from the source table") { tableName =>
+    // Insert two items, one of them is expired
+    val oneDay = 24 * 60 * 60 // seconds
+    val now = System.currentTimeMillis() / 1000 // seconds
+    val keys1 = Map("id" -> AttributeValue.fromS("12345"))
+    val attrs1 = Map("foo" -> AttributeValue.fromN((now + oneDay).toString))
+    val item1Data = keys1 ++ attrs1
+    sourceDDb.putItem(
+      PutItemRequest.builder().tableName(tableName).item(item1Data.asJava).build()
+    )
+    val keys2 = Map("id" -> AttributeValue.fromS("67890"))
+    val attrs2 = Map("foo" -> AttributeValue.fromN((now - oneDay).toString))
+    val item2Data = keys2 ++ attrs2
+    sourceDDb.putItem(
+      PutItemRequest.builder().tableName(tableName).item(item2Data.asJava).build()
+    )
+
+    // Enable TTL
+    sourceDDb.updateTimeToLive(
+      UpdateTimeToLiveRequest
+        .builder()
+        .tableName(tableName)
+        .timeToLiveSpecification(
+          TimeToLiveSpecification
+            .builder()
+            .enabled(true)
+            .attributeName("foo")
+            .build()
+        )
+        .build()
+    )
+      .sdkHttpResponse()
+      .statusCode()
+      .tap { statusCode =>
+        assertEquals(statusCode, 200)
+      }
+
+    // Check that expired item is still present in the source before the migration
+    val getItem2Request =
+      GetItemRequest.builder().tableName(tableName).key(keys2.asJava).build()
+    sourceDDb
+      .getItem(getItem2Request)
+      .tap { itemResult =>
+        assert(itemResult.hasItem)
+        assertEquals(itemResult.item.asScala.toMap, item2Data)
+      }
+
+    successfullyPerformMigration("dynamodb-to-alternator-ttl.yaml")
+
+    checkItemWasMigrated(tableName, keys1, item1Data)
+
+    // Expired item has been skipped
+    targetAlternator
+      .getItem(getItem2Request)
+      .tap { itemResult =>
+        assert(!itemResult.hasItem)
+      }
+
+  }
+
+}