
Commit

Configure the TTL_ATTRIBUTE_NAME of the Hadoop job if TTL is enabled on a table attribute
julienrf committed Aug 28, 2024
1 parent a05419b commit 6c9cb1d
Showing 4 changed files with 129 additions and 8 deletions.
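
In short, the reader now queries the source table's time-to-live settings once and, when TTL is enabled, forwards the TTL attribute name to the Hadoop job through DynamoDBConstants.TTL_ATTRIBUTE_NAME, so that already-expired items can be filtered out of the migration (see the new end-to-end test below). As a minimal, self-contained sketch of that lookup, assuming a reachable table named TtlTable and default client settings; the literal configuration key below is a hypothetical stand-in, since the real constant lives in ScyllaDB's fork of the emr-dynamodb-connector:

import org.apache.hadoop.mapred.JobConf
import software.amazon.awssdk.services.dynamodb.DynamoDbClient
import software.amazon.awssdk.services.dynamodb.model.{ DescribeTimeToLiveRequest, TimeToLiveStatus }

object TtlLookupSketch extends App {
  val client = DynamoDbClient.create() // assumes default region/credentials

  // Read the table's TTL configuration and keep the attribute name only when
  // TTL is ENABLED (the status can also be DISABLED, ENABLING, or DISABLING).
  val maybeTtlAttributeName: Option[String] =
    Option(
      client
        .describeTimeToLive(DescribeTimeToLiveRequest.builder().tableName("TtlTable").build())
        .timeToLiveDescription
    ).filter(_.timeToLiveStatus == TimeToLiveStatus.ENABLED)
      .map(_.attributeName())

  // "dynamodb.ttl.attribute.name" is a placeholder for
  // DynamoDBConstants.TTL_ATTRIBUTE_NAME, used here to stay self-contained.
  val jobConf = new JobConf()
  maybeTtlAttributeName.foreach(jobConf.set("dynamodb.ttl.attribute.name", _))
}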
@@ -10,7 +10,13 @@ import org.apache.hadoop.mapred.JobConf
 import org.apache.log4j.LogManager
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.SparkSession
-import software.amazon.awssdk.services.dynamodb.model.{ DescribeTableRequest, TableDescription }
+import software.amazon.awssdk.services.dynamodb.model.{
+  DescribeTableRequest,
+  DescribeTimeToLiveRequest,
+  TableDescription,
+  TimeToLiveDescription,
+  TimeToLiveStatus
+}
 
 object DynamoDB {
 
@@ -45,10 +51,20 @@ object DynamoDB {
     readThroughput: Option[Int],
     throughputReadPercent: Option[Float]): (RDD[(Text, DynamoDBItemWritable)], TableDescription) = {
 
-    val tableDescription = DynamoUtils
-      .buildDynamoClient(endpoint, credentials.map(_.toProvider), region)
-      .describeTable(DescribeTableRequest.builder().tableName(table).build())
-      .table
+    val dynamoDbClient =
+      DynamoUtils.buildDynamoClient(endpoint, credentials.map(_.toProvider), region)
+
+    val tableDescription =
+      dynamoDbClient
+        .describeTable(DescribeTableRequest.builder().tableName(table).build())
+        .table
+
+    val maybeTtlDescription =
+      Option(
+        dynamoDbClient
+          .describeTimeToLive(DescribeTimeToLiveRequest.builder().tableName(table).build())
+          .timeToLiveDescription
+      )
 
     val jobConf =
       makeJobConf(
@@ -61,7 +77,8 @@ object DynamoDB {
         maxMapTasks,
         readThroughput,
         throughputReadPercent,
-        tableDescription)
+        tableDescription,
+        maybeTtlDescription)
 
     val rdd =
       spark.sparkContext.hadoopRDD(
@@ -82,7 +99,8 @@ object DynamoDB {
     maxMapTasks: Option[Int],
     readThroughput: Option[Int],
     throughputReadPercent: Option[Float],
-    description: TableDescription
+    description: TableDescription,
+    maybeTtlDescription: Option[TimeToLiveDescription]
   ): JobConf = {
     val maybeItemCount = Option(description.itemCount).map(_.toLong)
     val maybeAvgItemSize =
@@ -125,6 +143,11 @@ object DynamoDB {
       jobConf,
       DynamoDBConstants.THROUGHPUT_READ_PERCENT,
       throughputReadPercent.map(_.toString))
+    val maybeTtlAttributeName =
+      maybeTtlDescription
+        .filter(_.timeToLiveStatus == TimeToLiveStatus.ENABLED)
+        .map(_.attributeName())
+    setOptionalConf(jobConf, DynamoDBConstants.TTL_ATTRIBUTE_NAME, maybeTtlAttributeName)
 
     jobConf
   }
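
For context, setOptionalConf is a helper defined elsewhere in the migrator; judging from its call sites above, it presumably applies a configuration key only when a value is present. A plausible sketch, not the actual source:

import org.apache.hadoop.mapred.JobConf

// Hypothetical reconstruction from the call sites above: set the key only
// when the optional value is defined, leaving the JobConf untouched otherwise.
def setOptionalConf(jobConf: JobConf, name: String, maybeValue: Option[String]): Unit =
  maybeValue.foreach(value => jobConf.set(name, value))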
26 changes: 26 additions & 0 deletions tests/src/test/configurations/dynamodb-to-alternator-ttl.yaml
@@ -0,0 +1,26 @@
+source:
+  type: dynamodb
+  table: TtlTable
+  region: dummy
+  endpoint:
+    host: http://dynamodb
+    port: 8000
+  credentials:
+    accessKey: dummy
+    secretKey: dummy
+
+target:
+  type: dynamodb
+  table: TtlTable
+  region: dummy
+  endpoint:
+    host: http://scylla
+    port: 8000
+  credentials:
+    accessKey: dummy
+    secretKey: dummy
+  streamChanges: false
+
+savepoints:
+  path: /app/savepoints
+  intervalSeconds: 300
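
This fixture mirrors the repository's other end-to-end test configurations: http://dynamodb and http://scylla are presumably the hostnames of the DynamoDB Local and ScyllaDB Alternator containers in the test environment, both listening on port 8000, with dummy credentials and region since neither endpoint enforces authentication.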
@@ -74,7 +74,8 @@ class DynamoDBInputFormatTest extends munit.FunSuite {
       maxMapTasks = configuredMaxMapTasks,
       readThroughput = configuredReadThroughput,
       throughputReadPercent = configuredThroughputReadPercent,
-      description = tableDescriptionBuilder.build()
+      description = tableDescriptionBuilder.build(),
+      maybeTtlDescription = None
     )
     val splits = new DynamoDBInputFormat().getSplits(jobConf, 1)
 
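
The existing unit test only exercises input-split computation, so it passes maybeTtlDescription = None; the TTL attribute presumably has no bearing on how splits are sized.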
@@ -0,0 +1,71 @@
+package com.scylladb.migrator.alternator
+
+import com.scylladb.migrator.SparkUtils.successfullyPerformMigration
+import software.amazon.awssdk.services.dynamodb.model.{AttributeValue, GetItemRequest, PutItemRequest, TimeToLiveSpecification, UpdateTimeToLiveRequest}
+
+import scala.jdk.CollectionConverters._
+import scala.util.chaining._
+
+class SkippedItemsTest extends MigratorSuite {
+
+  withTable("TtlTable").test("Expired items should be filtered out from the source table") { tableName =>
+    // Insert two items, one of them is expired
+    val oneDay = 24 * 60 * 60 // seconds
+    val now = System.currentTimeMillis() / 1000 // seconds
+    val keys1 = Map("id" -> AttributeValue.fromS("12345"))
+    val attrs1 = Map("foo" -> AttributeValue.fromN((now + oneDay).toString))
+    val item1Data = keys1 ++ attrs1
+    sourceDDb.putItem(
+      PutItemRequest.builder().tableName(tableName).item(item1Data.asJava).build()
+    )
+    val keys2 = Map("id" -> AttributeValue.fromS("67890"))
+    val attrs2 = Map("foo" -> AttributeValue.fromN((now - oneDay).toString))
+    val item2Data = keys2 ++ attrs2
+    sourceDDb.putItem(
+      PutItemRequest.builder().tableName(tableName).item(item2Data.asJava).build()
+    )
+
+    // Enable TTL
+    sourceDDb.updateTimeToLive(
+      UpdateTimeToLiveRequest
+        .builder()
+        .tableName(tableName)
+        .timeToLiveSpecification(
+          TimeToLiveSpecification
+            .builder()
+            .enabled(true)
+            .attributeName("foo")
+            .build()
+        )
+        .build()
+    )
+      .sdkHttpResponse()
+      .statusCode()
+      .tap { statusCode =>
+        assertEquals(statusCode, 200)
+      }
+
+    // Check that expired item is still present in the source before the migration
+    val getItem2Request =
+      GetItemRequest.builder().tableName(tableName).key(keys2.asJava).build()
+    sourceDDb
+      .getItem(getItem2Request)
+      .tap { itemResult =>
+        assert(itemResult.hasItem)
+        assertEquals(itemResult.item.asScala.toMap, item2Data)
+      }
+
+    successfullyPerformMigration("dynamodb-to-alternator-ttl.yaml")
+
+    checkItemWasMigrated(tableName, keys1, item1Data)
+
+    // Expired item has been skipped
+    targetAlternator
+      .getItem(getItem2Request)
+      .tap { itemResult =>
+        assert(!itemResult.hasItem)
+      }
+
+  }
+
+}
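
Note on the test's premise: DynamoDB deletes TTL-expired items asynchronously in the background, typically some time after expiry, so the expired item is expected to still be readable in the source table when the migration starts. It is the migrator, via TTL_ATTRIBUTE_NAME, that filters the item out, which is what the final assertion against the target verifies.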
