Add more logs when configuring Hadoop jobs reading/writing DynamoDB tables
julienrf committed Aug 19, 2024
1 parent 8f0d2b8 commit 1326309
Showing 3 changed files with 37 additions and 9 deletions.
migrator/src/main/scala/com/scylladb/migrator/DynamoUtils.scala (8 additions & 2 deletions)

@@ -217,8 +217,14 @@ object DynamoUtils {
     maybeScanSegments: Option[Int],
     maybeMaxMapTasks: Option[Int],
     maybeAwsCredentials: Option[AWSCredentials]): Unit = {
-    setOptionalConf(jobConf, DynamoDBConstants.REGION, maybeRegion)
-    setOptionalConf(jobConf, DynamoDBConstants.ENDPOINT, maybeEndpoint.map(_.renderEndpoint))
+    for (region <- maybeRegion) {
+      log.info(s"Using AWS region: ${region}")
+      jobConf.set(DynamoDBConstants.REGION, region)
+    }
+    for (endpoint <- maybeEndpoint) {
+      log.info(s"Using AWS endpoint: ${endpoint.renderEndpoint}")
+      jobConf.set(DynamoDBConstants.ENDPOINT, endpoint.renderEndpoint)
+    }
     setOptionalConf(jobConf, DynamoDBConstants.SCAN_SEGMENTS, maybeScanSegments.map(_.toString))
     setOptionalConf(jobConf, DynamoDBConstants.MAX_MAP_TASKS, maybeMaxMapTasks.map(_.toString))
     for (credentials <- maybeAwsCredentials) {
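
The two removed lines used the setOptionalConf helper, which sets a configuration key only when the optional value is present; the replacement for-comprehensions perform the same conditional set but log the chosen value first. A minimal sketch of the pattern, assuming setOptionalConf has the obvious shape (its definition is not part of this diff):

    import org.apache.hadoop.mapred.JobConf

    // Assumed shape of the existing helper: set the key only when a value is defined.
    def setOptionalConf(jobConf: JobConf, key: String, maybeValue: Option[String]): Unit =
      for (value <- maybeValue) jobConf.set(key, value)

    // A `for` over an Option desugars to foreach: the body runs once when the value
    // is defined and not at all otherwise. Inlining the loop, as this commit does,
    // leaves room for a log.info call next to each jobConf.set.
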
migrator/src/main/scala/com/scylladb/migrator/readers/DynamoDB.scala

@@ -7,12 +7,15 @@
 import org.apache.hadoop.dynamodb.read.DynamoDBInputFormat
 import org.apache.hadoop.dynamodb.{ DynamoDBConstants, DynamoDBItemWritable }
 import org.apache.hadoop.io.Text
 import org.apache.hadoop.mapred.JobConf
+import org.apache.log4j.LogManager
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.SparkSession
 import software.amazon.awssdk.services.dynamodb.model.{ DescribeTableRequest, TableDescription }
 
 object DynamoDB {
 
+  val log = LogManager.getLogger("com.scylladb.migrator.readers.DynamoDB")
+
   def readRDD(
     spark: SparkSession,
     source: SourceSettings.DynamoDB): (RDD[(Text, DynamoDBItemWritable)], TableDescription) =
@@ -106,11 +109,18 @@ object DynamoDB {
       jobConf,
       DynamoDBConstants.TABLE_SIZE_BYTES,
       Option(description.tableSizeBytes).map(_.toString))
-    jobConf.set(
-      DynamoDBConstants.READ_THROUGHPUT,
-      readThroughput
-        .getOrElse(DynamoUtils.tableReadThroughput(description))
-        .toString)
+    val throughput = readThroughput match {
+      case Some(value) =>
+        log.info(
+          s"Setting up Hadoop job to read the table using a configured throughput of ${value} RCU(s)")
+        value
+      case None =>
+        val value = DynamoUtils.tableReadThroughput(description)
+        log.info(
+          s"Setting up Hadoop job to read the table using a computed throughput of ${value} RCU(s)")
+        value
+    }
+    jobConf.set(DynamoDBConstants.READ_THROUGHPUT, throughput.toString)
     setOptionalConf(
       jobConf,
       DynamoDBConstants.THROUGHPUT_READ_PERCENT,
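
The reader resolves its throughput with an explicit match instead of the previous getOrElse, because getOrElse cannot report whether the value came from the user's configuration or was computed from the table description, and the new log messages state exactly that provenance. A self-contained sketch of the pattern (the resolveThroughput name, logger name, and example values are illustrative, not from the migrator):

    import org.apache.log4j.{ BasicConfigurator, LogManager }

    object ThroughputResolution {
      val log = LogManager.getLogger("ThroughputResolution")

      // Prefer the configured value; otherwise fall back to a computed one.
      // The by-name `computed` parameter is evaluated only in the None branch.
      def resolveThroughput(configured: Option[Long], computed: => Long): Long =
        configured match {
          case Some(value) =>
            log.info(s"Using a configured throughput of ${value} RCU(s)")
            value
          case None =>
            val value = computed
            log.info(s"Using a computed throughput of ${value} RCU(s)")
            value
        }

      def main(args: Array[String]): Unit = {
        BasicConfigurator.configure() // console appender, so the log lines are visible
        resolveThroughput(Some(400L), sys.error("never evaluated")) // configured branch
        resolveThroughput(None, 25L) // computed branch
      }
    }
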
migrator/src/main/scala/com/scylladb/migrator/writers/DynamoDB.scala

@@ -6,7 +6,7 @@
 import com.scylladb.migrator.config.TargetSettings
 import org.apache.hadoop.dynamodb.{ DynamoDBConstants, DynamoDBItemWritable }
 import org.apache.hadoop.io.Text
 import org.apache.hadoop.mapred.JobConf
-import org.apache.log4j.{ Level, LogManager }
+import org.apache.log4j.LogManager
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.SparkSession
 import software.amazon.awssdk.services.dynamodb.model.{ AttributeValue, TableDescription }
@@ -15,6 +15,8 @@ import java.util
 
 object DynamoDB {
 
+  val log = LogManager.getLogger("com.scylladb.migrator.writers.DynamoDB")
+
   def writeRDD(target: TargetSettings.DynamoDB,
                renamesMap: Map[String, String],
                rdd: RDD[(Text, DynamoDBItemWritable)],
@@ -31,7 +33,17 @@ object DynamoDB {
       target.finalCredentials)
     jobConf.set(DynamoDBConstants.OUTPUT_TABLE_NAME, target.table)
     val writeThroughput =
-      target.writeThroughput.getOrElse(DynamoUtils.tableWriteThroughput(targetTableDesc))
+      target.writeThroughput match {
+        case Some(value) =>
+          log.info(
+            s"Setting up Hadoop job to write table using a configured throughput of ${value} WCU(s)")
+          value
+        case None =>
+          val value = DynamoUtils.tableWriteThroughput(targetTableDesc)
+          log.info(
+            s"Setting up Hadoop job to write table using a calculated throughput of ${value} WCU(s)")
+          value
+      }
     jobConf.set(DynamoDBConstants.WRITE_THROUGHPUT, writeThroughput.toString)
     setOptionalConf(
       jobConf,
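
Both the reader and the writer obtain their logger from log4j by a fully qualified name rather than deriving it from a class, which keeps their messages distinguishable, and individually filterable, in the Spark driver logs. A small runnable sketch, assuming a plain log4j 1.x-compatible setup (the org.apache.log4j API these files use); the logged values are made-up examples:

    import org.apache.log4j.{ BasicConfigurator, LogManager }

    object LoggerNaming {
      def main(args: Array[String]): Unit = {
        BasicConfigurator.configure() // default console appender
        val readerLog = LogManager.getLogger("com.scylladb.migrator.readers.DynamoDB")
        val writerLog = LogManager.getLogger("com.scylladb.migrator.writers.DynamoDB")
        // Each output line carries its logger's name, along the lines of:
        //   INFO com.scylladb.migrator.readers.DynamoDB - Using AWS region: eu-west-1
        readerLog.info("Using AWS region: eu-west-1")
        writerLog.info("Setting up Hadoop job to write table using a configured throughput of 100 WCU(s)")
      }
    }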
