Improve debug on migration #199

Merged · 4 commits · Aug 27, 2024
9 changes: 4 additions & 5 deletions ansible/files/config.dynamodb.yml
@@ -1,4 +1,3 @@
---
source:
type: dynamodb
table: "YOUR_TABLE_NAME"
@@ -25,8 +24,7 @@ source:
# Can set it equal to scanSegments. Or to allow multiple concurrent scans by
# a single task, set scanSegments to some multiple of maxMapTasks.
maxMapTasks: 700
streamChanges: false
# For scyllaDB - Alternator target, you need to specify endpoint URL.

target:
type: dynamodb
table: "YOUR_TABLE_NAME"
@@ -43,16 +41,17 @@ target:
credentials:
accessKey: empty
secretKey: empty
#scanSegments: 10000
maxMapTasks: 1
streamChanges: false

savepoints:
# Where should savepoint configurations be stored? This is a path on the host running
# the Spark driver - usually the Spark master.
path: /app/savepoints
# Interval in which savepoints will be created
intervalSeconds: 300

renames: []

validation:
# Should WRITETIMEs and TTLs be compared?
compareTimestamps: false
20 changes: 7 additions & 13 deletions config.yaml.example
@@ -218,20 +218,14 @@ target:
# # Optional - the session name to use. If not set, we use 'scylla-migrator'
# sessionName: <roleSessionName>
#
# # Split factor for reading/writing. This is required for Scylla targets.
# scanSegments: 1
#
# # throttling settings, set based on your capacity (or wanted capacity)
# readThroughput: 1
#
# # The value of dynamodb.throughput.read.percent can be between 0.1 and 1.5, inclusively.
# # 0.5 represents the default read rate, meaning that the job will attempt to consume half of the read capacity of the table.
# # If you increase the value above 0.5, spark will increase the request rate; decreasing the value below 0.5 decreases the read request rate.
# # (The actual read rate will vary, depending on factors such as whether there is a uniform key distribution in the DynamoDB table.)
# throughputReadPercent: 1.0
# # throttling settings, set based on your write capacity units (or wanted capacity)
# writeThroughput: 1
#
# # how many tasks per executor?
# maxMapTasks: 1
# # The value of dynamodb.throughput.write.percent can be between 0.1 and 1.5, inclusively.
# # 0.5 represents the default write rate, meaning that the job will attempt to consume half of the write capacity of the table.
# # If you increase the value above 0.5, spark will increase the request rate; decreasing the value below 0.5 decreases the write request rate.
# # (The actual write rate will vary, depending on factors such as whether there is a uniform key distribution in the DynamoDB table.)
# throughputWritePercent: 1.0
#
# # When transferring DynamoDB sources to DynamoDB targets (such as other DynamoDB tables or Alternator tables),
# # the migrator supports transferring live changes occurring on the source table after transferring an initial
19 changes: 8 additions & 11 deletions docs/source/configuration.rst
@@ -156,7 +156,7 @@ A source of type ``dynamodb`` can be used together with a target of type ``dynam
# Optional - Split factor for reading. The default is to split the source data into chunks
# of 128 MB that can be processed in parallel by the Spark executors.
scanSegments: 1
# Optional - Throttling settings, set based on your database capacity (or wanted capacity)
# Optional - Throttling settings, set based on your database read capacity units (or wanted capacity)
readThroughput: 1
# Optional - Can be between 0.1 and 1.5, inclusively.
# 0.5 represents the default read rate, meaning that the job will attempt to consume half of the read capacity of the table.
@@ -295,17 +295,14 @@ DynamoDB Target
type: dynamodb
# Name of the table to write. If it does not exist, it will be created on the fly.
table: <table>
# Optional - Split factor for writing.
scanSegments: 1
# Optional - Throttling settings, set based on your database capacity (or wanted capacity)
readThroughput: 1
# Optional - Throttling settings, set based on your database write capacity units (or wanted capacity).
# By default, for provisioned tables we use the configured write capacity units, and for on-demand tables we use the value 40000.
writeThroughput: 1
# Optional - Can be between 0.1 and 1.5, inclusively.
# 0.5 represents the default read rate, meaning that the job will attempt to consume half of the read capacity of the table.
# If you increase the value above 0.5, spark will increase the request rate; decreasing the value below 0.5 decreases the read request rate.
# (The actual read rate will vary, depending on factors such as whether there is a uniform key distribution in the DynamoDB table.)
throughputReadPercent: 1.0
# Optional - At most how many tasks per Spark executor? Default is to use the same as 'scanSegments'.
maxMapTasks: 1
# 0.5 represents the default write rate, meaning that the job will attempt to consume half of the write capacity of the table.
# If you increase the value above 0.5, spark will increase the request rate; decreasing the value below 0.5 decreases the write request rate.
# (The actual write rate will vary, depending on factors such as whether there is a uniform key distribution in the DynamoDB table.)
throughputWritePercent: 1.0
# When transferring DynamoDB sources to DynamoDB targets (such as other DynamoDB tables or Alternator tables),
# the migrator supports transferring live changes occurring on the source table after transferring an initial
# snapshot.
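The write-throttling settings documented above combine in a simple way: `writeThroughput` is the capacity budget (the configured WCU for provisioned tables, or 40000 by default for on-demand tables), and `throughputWritePercent` is the fraction of that budget the job tries to consume (0.5 being the documented default rate). The following is a minimal Scala sketch of these documented semantics, illustrative only; it is not code from the migrator or from the EMR DynamoDB connector:

```scala
// Sketch of the documented throttling semantics; names are illustrative, not migrator APIs.
object WriteThrottlingSketch {
  // Effective write rate the job aims for, per the documentation:
  // the capacity budget scaled by throughputWritePercent.
  def targetWriteRate(writeThroughput: Long, throughputWritePercent: Double): Double =
    writeThroughput * throughputWritePercent

  def main(args: Array[String]): Unit = {
    println(targetWriteRate(1000L, 0.5))  // provisioned table with 1000 WCU -> ~500 writes/s
    println(targetWriteRate(40000L, 1.0)) // on-demand default budget at full rate -> ~40000 writes/s
  }
}
```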
19 changes: 7 additions & 12 deletions docs/source/migrate-from-dynamodb.rst
@@ -193,20 +193,15 @@ Additionally, you can also set the following optional properties:
accessKey: <access-key>
secretKey: <secret-key>

# Split factor for writing.
scanSegments: 1

# Throttling settings, set based on your database capacity (or wanted capacity)
readThroughput: 1
# Throttling settings, set based on your database write capacity units (or wanted capacity).
# By default, for provisioned tables we use the configured write capacity units, and for on-demand tables we use the value 40000.
writeThroughput: 1

# Can be between 0.1 and 1.5, inclusively.
# 0.5 represents the default read rate, meaning that the job will attempt to consume half of the read capacity of the table.
# If you increase the value above 0.5, spark will increase the request rate; decreasing the value below 0.5 decreases the read request rate.
# (The actual read rate will vary, depending on factors such as whether there is a uniform key distribution in the DynamoDB table.)
throughputReadPercent: 1.0

# At most how many tasks per Spark executor?
maxMapTasks: 1
# 0.5 represents the default write rate, meaning that the job will attempt to consume half of the write capacity of the table.
# If you increase the value above 0.5, spark will increase the request rate; decreasing the value below 0.5 decreases the write request rate.
# (The actual write rate will vary, depending on factors such as whether there is a uniform key distribution in the DynamoDB table.)
throughputWritePercent: 1.0

# When streamChanges is true, skip the initial snapshot transfer and only stream changes.
# This setting is ignored if streamChanges is false.
37 changes: 16 additions & 21 deletions migrator/src/main/scala/com/scylladb/migrator/DynamoUtils.scala
@@ -217,8 +217,14 @@ object DynamoUtils {
maybeScanSegments: Option[Int],
maybeMaxMapTasks: Option[Int],
maybeAwsCredentials: Option[AWSCredentials]): Unit = {
setOptionalConf(jobConf, DynamoDBConstants.REGION, maybeRegion)
setOptionalConf(jobConf, DynamoDBConstants.ENDPOINT, maybeEndpoint.map(_.renderEndpoint))
for (region <- maybeRegion) {
log.info(s"Using AWS region: ${region}")
jobConf.set(DynamoDBConstants.REGION, region)
}
for (endpoint <- maybeEndpoint) {
log.info(s"Using AWS endpoint: ${endpoint.renderEndpoint}")
jobConf.set(DynamoDBConstants.ENDPOINT, endpoint.renderEndpoint)
}
setOptionalConf(jobConf, DynamoDBConstants.SCAN_SEGMENTS, maybeScanSegments.map(_.toString))
setOptionalConf(jobConf, DynamoDBConstants.MAX_MAP_TASKS, maybeMaxMapTasks.map(_.toString))
for (credentials <- maybeAwsCredentials) {
@@ -245,42 +251,31 @@
}

/**
* @return The read throughput (in bytes per second) of the
* provided table description.
* @return The read throughput (in RCU) of the provided table description.
* If the table billing mode is PROVISIONED, it returns the
* table RCU multiplied by the number of bytes per read
* capacity unit. Otherwise (e.g. ,in case of on-demand
* table RCU. Otherwise (e.g., in case of on-demand
* billing mode), it returns
* [[DynamoDBConstants.DEFAULT_CAPACITY_FOR_ON_DEMAND]].
*/
def tableReadThroughput(description: TableDescription): Long =
tableThroughput(
description,
DynamoDBConstants.BYTES_PER_READ_CAPACITY_UNIT.longValue(),
_.readCapacityUnits)
tableThroughput(description, _.readCapacityUnits)

/**
* @return The write throughput (in bytes per second) of the
* provided table description.
* @return The write throughput (in WCU) of the provided table description.
* If the table billing mode is PROVISIONED, it returns the
* table WCU multiplied by the number of bytes per write
* capacity unit. Otherwise (e.g. ,in case of on-demand
* table WCU. Otherwise (e.g., in case of on-demand
* billing mode), it returns
* [[DynamoDBConstants.DEFAULT_CAPACITY_FOR_ON_DEMAND]].
*/
def tableWriteThroughput(description: TableDescription): Long =
tableThroughput(
description,
DynamoDBConstants.BYTES_PER_WRITE_CAPACITY_UNIT.longValue(),
_.writeCapacityUnits)
tableThroughput(description, _.writeCapacityUnits)

private def tableThroughput(description: TableDescription,
bytesPerCapacityUnit: Long,
capacityUnits: ProvisionedThroughputDescription => Long): Long =
if (description.billingModeSummary == null || description.billingModeSummary.billingMode == BillingMode.PROVISIONED) {
capacityUnits(description.provisionedThroughput) * bytesPerCapacityUnit
capacityUnits(description.provisionedThroughput)
} else {
DynamoDBConstants.DEFAULT_CAPACITY_FOR_ON_DEMAND * bytesPerCapacityUnit
DynamoDBConstants.DEFAULT_CAPACITY_FOR_ON_DEMAND
}

/** Reflection-friendly credentials provider used by the EMR DynamoDB connector */
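For reference, here is a small usage sketch (not part of this PR) showing the updated contract of the two helpers: they now return raw capacity units instead of bytes per second, and fall back to the on-demand default when the table has no provisioned throughput. It assumes the migrator's `DynamoUtils` and the AWS SDK v2 DynamoDB model classes are on the classpath; the capacity numbers are made up:

```scala
import com.scylladb.migrator.DynamoUtils
import software.amazon.awssdk.services.dynamodb.model.{
  BillingMode,
  BillingModeSummary,
  ProvisionedThroughputDescription,
  TableDescription
}

object ThroughputSketch {
  def main(args: Array[String]): Unit = {
    // Provisioned table: the helpers return the raw capacity units (RCU/WCU),
    // no longer multiplied by bytes-per-capacity-unit.
    val provisioned = TableDescription
      .builder()
      .provisionedThroughput(
        ProvisionedThroughputDescription
          .builder()
          .readCapacityUnits(100L)
          .writeCapacityUnits(50L)
          .build())
      .build()
    println(DynamoUtils.tableReadThroughput(provisioned))  // 100
    println(DynamoUtils.tableWriteThroughput(provisioned)) // 50

    // On-demand table: both helpers fall back to
    // DynamoDBConstants.DEFAULT_CAPACITY_FOR_ON_DEMAND (40000, per the docs changes above).
    val onDemand = TableDescription
      .builder()
      .billingModeSummary(
        BillingModeSummary.builder().billingMode(BillingMode.PAY_PER_REQUEST).build())
      .build()
    println(DynamoUtils.tableWriteThroughput(onDemand))
  }
}
```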
@@ -41,10 +41,10 @@ object AlternatorValidator {
targetSettings.finalCredentials,
targetSettings.region,
targetSettings.table,
targetSettings.scanSegments,
targetSettings.maxMapTasks,
readThroughput = None,
throughputReadPercent = None
sourceSettings.scanSegments, // Reuse same settings as source table
sourceSettings.maxMapTasks,
sourceSettings.readThroughput,
sourceSettings.throughputReadPercent
)

// Define some aliases to prevent the Spark engine to try to serialize the whole object graph
@@ -26,10 +26,8 @@ object TargetSettings {
region: Option[String],
credentials: Option[AWSCredentials],
table: String,
scanSegments: Option[Int],
writeThroughput: Option[Int],
throughputWritePercent: Option[Float],
maxMapTasks: Option[Int],
streamChanges: Boolean,
skipInitialSnapshotTransfer: Option[Boolean])
extends TargetSettings {
@@ -7,12 +7,15 @@ import org.apache.hadoop.dynamodb.read.DynamoDBInputFormat
import org.apache.hadoop.dynamodb.{ DynamoDBConstants, DynamoDBItemWritable }
import org.apache.hadoop.io.Text
import org.apache.hadoop.mapred.JobConf
import org.apache.log4j.LogManager
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession
import software.amazon.awssdk.services.dynamodb.model.{ DescribeTableRequest, TableDescription }

object DynamoDB {

val log = LogManager.getLogger("com.scylladb.migrator.readers.DynamoDB")

def readRDD(
spark: SparkSession,
source: SourceSettings.DynamoDB): (RDD[(Text, DynamoDBItemWritable)], TableDescription) =
@@ -106,11 +109,18 @@ object DynamoDB {
jobConf,
DynamoDBConstants.TABLE_SIZE_BYTES,
Option(description.tableSizeBytes).map(_.toString))
jobConf.set(
DynamoDBConstants.READ_THROUGHPUT,
readThroughput
.getOrElse(DynamoUtils.tableReadThroughput(description))
.toString)
val throughput = readThroughput match {
case Some(value) =>
log.info(
s"Setting up Hadoop job to read the table using a configured throughput of ${value} RCU(s)")
value
case None =>
val value = DynamoUtils.tableReadThroughput(description)
log.info(
s"Setting up Hadoop job to read the table using a computed throughput of ${value} RCU(s)")
value
}
jobConf.set(DynamoDBConstants.READ_THROUGHPUT, throughput.toString)
setOptionalConf(
jobConf,
DynamoDBConstants.THROUGHPUT_READ_PERCENT,
@@ -6,7 +6,7 @@ import com.scylladb.migrator.config.TargetSettings
import org.apache.hadoop.dynamodb.{ DynamoDBConstants, DynamoDBItemWritable }
import org.apache.hadoop.io.Text
import org.apache.hadoop.mapred.JobConf
import org.apache.log4j.{ Level, LogManager }
import org.apache.log4j.LogManager
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession
import software.amazon.awssdk.services.dynamodb.model.{ AttributeValue, TableDescription }
@@ -15,6 +15,8 @@ import java.util

object DynamoDB {

val log = LogManager.getLogger("com.scylladb.migrator.writers.DynamoDB")

def writeRDD(target: TargetSettings.DynamoDB,
renamesMap: Map[String, String],
rdd: RDD[(Text, DynamoDBItemWritable)],
@@ -26,12 +28,22 @@ object DynamoDB {
jobConf,
target.region,
target.endpoint,
target.scanSegments,
target.maxMapTasks,
maybeScanSegments = None,
maybeMaxMapTasks = None,
target.finalCredentials)
jobConf.set(DynamoDBConstants.OUTPUT_TABLE_NAME, target.table)
val writeThroughput =
target.writeThroughput.getOrElse(DynamoUtils.tableWriteThroughput(targetTableDesc))
target.writeThroughput match {
case Some(value) =>
log.info(
s"Setting up Hadoop job to write table using a configured throughput of ${value} WCU(s)")
value
case None =>
val value = DynamoUtils.tableWriteThroughput(targetTableDesc)
log.info(
s"Setting up Hadoop job to write table using a calculated throughput of ${value} WCU(s)")
value
}
jobConf.set(DynamoDBConstants.WRITE_THROUGHPUT, writeThroughput.toString)
setOptionalConf(
jobConf,
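With the new log statements in `DynamoUtils` and in the reader and writer above, the region, endpoint, and throughput actually used are now visible in the Spark driver logs. The message text below is taken from the log statements in this diff; the region, endpoint, and capacity values are hypothetical, and the log4j layout prefix is omitted:

```
Using AWS region: us-east-1
Using AWS endpoint: http://alternator.example.com:8000
Setting up Hadoop job to read the table using a configured throughput of 500 RCU(s)
Setting up Hadoop job to write table using a calculated throughput of 40000 WCU(s)
```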
@@ -16,11 +16,16 @@ class DynamoDBInputFormatTest extends munit.FunSuite {
}

test("no configured scanSegments in on-demand billing mode and table size is 100 GB") {
checkPartitions(1024)(tableSizeBytes = 100 * GB, tableProvisionedThroughput = None)
// segments are limited by the default read throughput
checkPartitions(200)(tableSizeBytes = 100 * GB, tableProvisionedThroughput = None)
}

test("no configured scanSegments in on-demand billing mode, table size is 100 GB, and read throughput is 1,000,000") {
checkPartitions(1024)(tableSizeBytes = 100 * GB, tableProvisionedThroughput = None, configuredReadThroughput = Some(1000000))
}

test("no configured scanSegments in provisioned billing mode") {
checkPartitions(10)(tableSizeBytes = 1 * GB, tableProvisionedThroughput = Some((25, 25)))
checkPartitions(10)(tableSizeBytes = 1 * GB, tableProvisionedThroughput = Some((10000, 10000)))
}

test("scanSegments = 42") {