Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Skip expired items of tables when TTL is enabled #206

Merged
merged 1 commit into from
Aug 30, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,13 @@ import org.apache.hadoop.mapred.JobConf
import org.apache.log4j.LogManager
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession
import software.amazon.awssdk.services.dynamodb.model.{ DescribeTableRequest, TableDescription }
import software.amazon.awssdk.services.dynamodb.model.{
DescribeTableRequest,
DescribeTimeToLiveRequest,
TableDescription,
TimeToLiveDescription,
TimeToLiveStatus
}

object DynamoDB {

Expand Down Expand Up @@ -48,10 +54,20 @@ object DynamoDB {
throughputReadPercent: Option[Float],
skipSegments: Option[Set[Int]]): (RDD[(Text, DynamoDBItemWritable)], TableDescription) = {

val tableDescription = DynamoUtils
.buildDynamoClient(endpoint, credentials.map(_.toProvider), region)
.describeTable(DescribeTableRequest.builder().tableName(table).build())
.table
val dynamoDbClient =
DynamoUtils.buildDynamoClient(endpoint, credentials.map(_.toProvider), region)

val tableDescription =
dynamoDbClient
.describeTable(DescribeTableRequest.builder().tableName(table).build())
.table

val maybeTtlDescription =
Option(
dynamoDbClient
.describeTimeToLive(DescribeTimeToLiveRequest.builder().tableName(table).build())
.timeToLiveDescription
)

val jobConf =
makeJobConf(
Expand All @@ -65,6 +81,7 @@ object DynamoDB {
readThroughput,
throughputReadPercent,
tableDescription,
maybeTtlDescription,
skipSegments)

val rdd =
Expand All @@ -87,6 +104,7 @@ object DynamoDB {
readThroughput: Option[Int],
throughputReadPercent: Option[Float],
description: TableDescription,
maybeTtlDescription: Option[TimeToLiveDescription],
skipSegments: Option[Set[Int]]
): JobConf = {
val maybeItemCount = Option(description.itemCount).map(_.toLong)
Expand Down Expand Up @@ -135,6 +153,11 @@ object DynamoDB {
DynamoDBConstants.EXCLUDED_SCAN_SEGMENTS,
skipSegments.map(_.mkString(","))
)
val maybeTtlAttributeName =
maybeTtlDescription
.filter(_.timeToLiveStatus == TimeToLiveStatus.ENABLED)
.map(_.attributeName())
setOptionalConf(jobConf, DynamoDBConstants.TTL_ATTRIBUTE_NAME, maybeTtlAttributeName)

jobConf
}
Expand Down
26 changes: 26 additions & 0 deletions tests/src/test/configurations/dynamodb-to-alternator-ttl.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
# Migrator configuration for the TTL integration test (SkippedItemsTest):
# copies table "TtlTable" from a local DynamoDB emulator to a local
# Alternator (ScyllaDB) endpoint. Region and credentials are dummy values
# accepted by the emulators — TODO confirm if ever pointed at real AWS.
source:
  type: dynamodb
  table: TtlTable
  region: dummy
  endpoint:
    host: http://dynamodb
    port: 8000
  credentials:
    accessKey: dummy
    secretKey: dummy

target:
  type: dynamodb
  table: TtlTable
  region: dummy
  endpoint:
    host: http://scylla
    port: 8000
  credentials:
    accessKey: dummy
    secretKey: dummy
  # One-shot migration only: do not tail the source table's change stream.
  streamChanges: false

savepoints:
  # Directory (inside the test container) where resume savepoints are written.
  path: /app/savepoints
  intervalSeconds: 300
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ class DynamoDBInputFormatTest extends munit.FunSuite {
readThroughput = configuredReadThroughput,
throughputReadPercent = configuredThroughputReadPercent,
description = tableDescriptionBuilder.build(),
maybeTtlDescription = None,
skipSegments = None
)
val splits = new DynamoDBInputFormat().getSplits(jobConf, 1)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
package com.scylladb.migrator.alternator

import com.scylladb.migrator.SparkUtils.successfullyPerformMigration
import software.amazon.awssdk.services.dynamodb.model.{AttributeValue, GetItemRequest, PutItemRequest, TimeToLiveSpecification, UpdateTimeToLiveRequest}

import scala.jdk.CollectionConverters._
import scala.util.chaining._

/** End-to-end check that the migrator honours DynamoDB TTL: items whose TTL
  * attribute is already in the past must not be copied to the target table.
  */
class SkippedItemsTest extends MigratorSuite {

  withTable("TtlTable").test("Expired items should be filtered out from the source table") {
    tableName =>
      // Seed the source table with two items sharing the TTL attribute "foo":
      // one expiring a day in the future, one already a day past expiry.
      val secondsPerDay = 24 * 60 * 60
      val nowSeconds = System.currentTimeMillis() / 1000

      val liveKey = Map("id" -> AttributeValue.fromS("12345"))
      val liveItem = liveKey + ("foo" -> AttributeValue.fromN((nowSeconds + secondsPerDay).toString))
      val expiredKey = Map("id" -> AttributeValue.fromS("67890"))
      val expiredItem = expiredKey + ("foo" -> AttributeValue.fromN(
        (nowSeconds - secondsPerDay).toString))

      for (itemData <- List(liveItem, expiredItem)) {
        sourceDDb.putItem(
          PutItemRequest.builder().tableName(tableName).item(itemData.asJava).build()
        )
      }

      // Turn TTL on for attribute "foo" and verify the call succeeded.
      val ttlUpdateStatus =
        sourceDDb
          .updateTimeToLive(
            UpdateTimeToLiveRequest
              .builder()
              .tableName(tableName)
              .timeToLiveSpecification(
                TimeToLiveSpecification
                  .builder()
                  .enabled(true)
                  .attributeName("foo")
                  .build()
              )
              .build()
          )
          .sdkHttpResponse()
          .statusCode()
      assertEquals(ttlUpdateStatus, 200)

      // Sanity check: the expired item is still readable from the source
      // before the migration (expired items are deleted lazily, so it should
      // still be present here).
      val expiredItemRequest =
        GetItemRequest.builder().tableName(tableName).key(expiredKey.asJava).build()
      val preMigrationResult = sourceDDb.getItem(expiredItemRequest)
      assert(preMigrationResult.hasItem)
      assertEquals(preMigrationResult.item.asScala.toMap, expiredItem)

      successfullyPerformMigration("dynamodb-to-alternator-ttl.yaml")

      // The non-expired item must have been copied to the target…
      checkItemWasMigrated(tableName, liveKey, liveItem)

      // …while the expired one must have been skipped by the migrator.
      assert(!targetAlternator.getItem(expiredItemRequest).hasItem)
  }

}
Loading