[SPARK-45605][CORE][SQL][SS][CONNECT][MLLIB][GRAPHX][DSTREAM][PROTOBUF][EXAMPLES] Replace `s.c.MapOps.mapValues` with `s.c.MapOps.view.mapValues`

### What changes were proposed in this pull request?
This PR replaces `s.c.MapOps.mapValues` with `s.c.MapOps.view.mapValues`, because `s.c.MapOps.mapValues` has been deprecated since Scala 2.13.0:

https://github.com/scala/scala/blob/bf45e199e96383b96a6955520d7d2524c78e6e12/src/library/scala/collection/Map.scala#L256-L262

```scala
  deprecated("Use .view.mapValues(f). A future version will include a strict version of this method (for now, .view.mapValues(f).toMap).", "2.13.0")
  def mapValues[W](f: V => W): MapView[K, W] = new MapView.MapValues(this, f)
```
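
The replacement is mechanical: `.view.mapValues(f)` produces the same lazy `MapView` that the deprecated method already returned, and `.toMap` is kept (or added) wherever a strict `Map` is needed, e.g. before converting to a Java map with `.asJava`. A minimal sketch of the pattern with made-up values (not code from this diff):

```scala
object MapValuesMigrationSketch {
  def main(args: Array[String]): Unit = {
    val counts = Map("a" -> 1, "b" -> 2)

    // Before (deprecated since 2.13.0): counts.mapValues(_ * 2) returns a lazy MapView
    // and triggers a deprecation warning.

    // After: go through .view explicitly; the result is still a lazy MapView.
    val doubledView: scala.collection.MapView[String, Int] = counts.view.mapValues(_ * 2)

    // Append .toMap where a strict, materialized Map is required.
    val doubledMap: Map[String, Int] = counts.view.mapValues(_ * 2).toMap

    println(doubledView.toMap == doubledMap) // true
  }
}
```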

### Why are the changes needed?
Clean up deprecated API usage.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
- Pass GitHub Actions
- Packaged the client and manually tested `DFSReadWriteTest/MiniReadWriteTest/PowerIterationClusteringExample`.

### Was this patch authored or co-authored using generative AI tooling?
No

Closes apache#43448 from LuciferYang/SPARK-45605.

Lead-authored-by: yangjie01 <[email protected]>
Co-authored-by: YangJie <[email protected]>
Signed-off-by: Sean Owen <[email protected]>
LuciferYang authored and srowen committed Oct 29, 2023
1 parent 4ae99f9 commit 89ca8b6
Showing 83 changed files with 140 additions and 119 deletions.
@@ -56,7 +56,7 @@ class CountMinSketchSuite extends AnyFunSuite { // scalastyle:ignore funsuite

val exactFreq = {
val sampledItems = sampledItemIndices.map(allItems)
sampledItems.groupBy(identity).mapValues(_.length.toLong)
sampledItems.groupBy(identity).view.mapValues(_.length.toLong)
}

val sketch = CountMinSketch.create(epsOfTotalCount, confidence, seed)
@@ -242,6 +242,7 @@ private[sql] object AvroUtils extends Logging {
private[this] val avroFieldArray = avroSchema.getFields.asScala.toArray
private[this] val fieldMap = avroSchema.getFields.asScala
.groupBy(_.name.toLowerCase(Locale.ROOT))
.view
.mapValues(_.toSeq) // toSeq needed for scala 2.13

/** The fields which have matching equivalents in both Avro and Catalyst schemas. */
@@ -306,7 +306,7 @@ class SparkSession private[sql] (
proto.SqlCommand
.newBuilder()
.setSql(sqlText)
.putAllNamedArguments(args.asScala.mapValues(lit(_).expr).toMap.asJava)))
.putAllNamedArguments(args.asScala.view.mapValues(lit(_).expr).toMap.asJava)))
val plan = proto.Plan.newBuilder().setCommand(cmd)
// .toBuffer forces that the iterator is consumed and closed
val responseSeq = client.execute(plan.build()).toBuffer.toSeq
@@ -120,7 +120,7 @@ class ClientDataFrameStatSuite extends RemoteSparkSession {
val columnNames = crosstab.schema.fieldNames
assert(columnNames(0) === "a_b")
// reduce by key
val expected = data.map(t => (t, 1)).groupBy(_._1).mapValues(_.length)
val expected = data.map(t => (t, 1)).groupBy(_._1).view.mapValues(_.length)
val rows = crosstab.collect()
rows.foreach { row =>
val i = row.getString(0).toInt
@@ -361,7 +361,7 @@ package object dsl {
}

def fillValueMap(valueMap: Map[String, Any]): Relation = {
val (cols, values) = valueMap.mapValues(toLiteralProto).toSeq.unzip
val (cols, values) = valueMap.view.mapValues(toLiteralProto).toSeq.unzip
Relation
.newBuilder()
.setFillNa(
@@ -287,11 +287,11 @@ class SparkConnectPlanner(
if (!namedArguments.isEmpty) {
NameParameterizedQuery(
parsedPlan,
namedArguments.asScala.mapValues(transformExpression).toMap)
namedArguments.asScala.view.mapValues(transformExpression).toMap)
} else if (!posArguments.isEmpty) {
PosParameterizedQuery(parsedPlan, posArguments.asScala.map(transformExpression).toSeq)
} else if (!args.isEmpty) {
NameParameterizedQuery(parsedPlan, args.asScala.mapValues(transformLiteral).toMap)
NameParameterizedQuery(parsedPlan, args.asScala.view.mapValues(transformLiteral).toMap)
} else if (!posArgs.isEmpty) {
PosParameterizedQuery(parsedPlan, posArgs.asScala.map(transformLiteral).toSeq)
} else {
@@ -2518,15 +2518,18 @@ class SparkConnectPlanner(
val df = if (!namedArguments.isEmpty) {
session.sql(
getSqlCommand.getSql,
namedArguments.asScala.mapValues(e => Column(transformExpression(e))).toMap,
namedArguments.asScala.view.mapValues(e => Column(transformExpression(e))).toMap,
tracker)
} else if (!posArguments.isEmpty) {
session.sql(
getSqlCommand.getSql,
posArguments.asScala.map(e => Column(transformExpression(e))).toArray,
tracker)
} else if (!args.isEmpty) {
session.sql(getSqlCommand.getSql, args.asScala.mapValues(transformLiteral).toMap, tracker)
session.sql(
getSqlCommand.getSql,
args.asScala.view.mapValues(transformLiteral).toMap,
tracker)
} else if (!posArgs.isEmpty) {
session.sql(getSqlCommand.getSql, posArgs.asScala.map(transformLiteral).toArray, tracker)
} else {
@@ -3262,7 +3265,7 @@ class SparkConnectPlanner(
proto.GetResourcesCommandResult
.newBuilder()
.putAllResources(
session.sparkContext.resources
session.sparkContext.resources.view
.mapValues(resource =>
proto.ResourceInformation
.newBuilder()
@@ -2294,7 +2294,8 @@ abstract class KafkaSourceSuiteBase extends KafkaSourceTest {
Execute { q =>
// wait to reach the last offset in every partition
q.awaitOffset(0,
KafkaSourceOffset(partitionOffsets.mapValues(_ => 3L).toMap), streamingTimeout.toMillis)
KafkaSourceOffset(partitionOffsets.view.mapValues(_ => 3L).toMap),
streamingTimeout.toMillis)
},
CheckAnswer(-20, -21, -22, 0, 1, 2, 11, 12, 22),
StopStream,
@@ -377,7 +377,7 @@ class KafkaTestUtils(
}

def getAllTopicsAndPartitionSize(): Seq[(String, Int)] = {
zkClient.getPartitionsForTopics(zkClient.getAllTopicsInCluster()).mapValues(_.size).toSeq
zkClient.getPartitionsForTopics(zkClient.getAllTopicsInCluster()).view.mapValues(_.size).toSeq
}

/** Create a Kafka topic and wait until it is propagated to the whole cluster */
@@ -241,7 +241,7 @@ object ConsumerStrategies {
new Subscribe[K, V](
new ju.ArrayList(topics.asJavaCollection),
new ju.HashMap[String, Object](kafkaParams.asJava),
new ju.HashMap[TopicPartition, jl.Long](offsets.mapValues(jl.Long.valueOf).toMap.asJava))
new ju.HashMap[TopicPartition, jl.Long](offsets.view.mapValues(jl.Long.valueOf).toMap.asJava))
}

/**
@@ -320,7 +320,7 @@ object ConsumerStrategies {
new SubscribePattern[K, V](
pattern,
new ju.HashMap[String, Object](kafkaParams.asJava),
new ju.HashMap[TopicPartition, jl.Long](offsets.mapValues(jl.Long.valueOf).toMap.asJava))
new ju.HashMap[TopicPartition, jl.Long](offsets.view.mapValues(jl.Long.valueOf).toMap.asJava))
}

/**
@@ -404,7 +404,7 @@ object ConsumerStrategies {
new Assign[K, V](
new ju.ArrayList(topicPartitions.asJavaCollection),
new ju.HashMap[String, Object](kafkaParams.asJava),
new ju.HashMap[TopicPartition, jl.Long](offsets.mapValues(jl.Long.valueOf).toMap.asJava))
new ju.HashMap[TopicPartition, jl.Long](offsets.view.mapValues(jl.Long.valueOf).toMap.asJava))
}

/**
@@ -71,7 +71,7 @@ private[spark] class DirectKafkaInputDStream[K, V](
def consumer(): Consumer[K, V] = this.synchronized {
if (null == kc) {
kc = consumerStrategy.onStart(
currentOffsets.mapValues(l => java.lang.Long.valueOf(l)).toMap.asJava)
currentOffsets.view.mapValues(l => java.lang.Long.valueOf(l)).toMap.asJava)
}
kc
}
@@ -726,7 +726,7 @@ class DirectKafkaStreamSuite
/** Get the generated offset ranges from the DirectKafkaStream */
private def getOffsetRanges[K, V](
kafkaStream: DStream[ConsumerRecord[K, V]]): Seq[(Time, Array[OffsetRange])] = {
kafkaStream.generatedRDDs.mapValues { rdd =>
kafkaStream.generatedRDDs.view.mapValues { rdd =>
rdd.asInstanceOf[HasOffsetRanges].offsetRanges
}.toSeq.sortBy { _._1 }
}
@@ -206,7 +206,7 @@ private[kafka010] class KafkaTestUtils extends Logging {

/** Java-friendly function for sending messages to the Kafka broker */
def sendMessages(topic: String, messageToFreq: JMap[String, JInt]): Unit = {
sendMessages(topic, Map(messageToFreq.asScala.mapValues(_.intValue()).toSeq: _*))
sendMessages(topic, Map(messageToFreq.asScala.view.mapValues(_.intValue()).toSeq: _*))
}

/** Send the messages to the Kafka broker */
@@ -289,6 +289,6 @@ private[kinesis] class SimpleDataGenerator(
sentSeqNumbers += ((num, seqNumber))
}

shardIdToSeqNumbers.mapValues(_.toSeq).toMap
shardIdToSeqNumbers.view.mapValues(_.toSeq).toMap
}
}
@@ -72,6 +72,6 @@ private[kinesis] class KPLDataGenerator(regionName: String) extends KinesisDataG
Futures.addCallback(future, kinesisCallBack, ThreadUtils.sameThreadExecutorService())
}
producer.flushSync()
shardIdToSeqNumbers.mapValues(_.toSeq).toMap
shardIdToSeqNumbers.view.mapValues(_.toSeq).toMap
}
}
@@ -47,8 +47,8 @@ abstract class KinesisBackedBlockRDDTests(aggregateTestData: Boolean)
require(shardIdToDataAndSeqNumbers.size > 1, "Need data to be sent to multiple shards")

shardIds = shardIdToDataAndSeqNumbers.keySet.toSeq
shardIdToData = shardIdToDataAndSeqNumbers.mapValues(_.map(_._1)).toMap
shardIdToSeqNumbers = shardIdToDataAndSeqNumbers.mapValues(_.map(_._2)).toMap
shardIdToData = shardIdToDataAndSeqNumbers.view.mapValues(_.map(_._1)).toMap
shardIdToSeqNumbers = shardIdToDataAndSeqNumbers.view.mapValues(_.map(_._2)).toMap
shardIdToRange = shardIdToSeqNumbers.map { case (shardId, seqNumbers) =>
val seqNumRange = SequenceNumberRange(
testUtils.streamName, shardId, seqNumbers.head, seqNumbers.last, seqNumbers.size)
@@ -75,6 +75,7 @@ private[sql] object ProtobufUtils extends Logging {
private[this] val protoFieldArray = descriptor.getFields.asScala.toArray
private[this] val fieldMap = descriptor.getFields.asScala
.groupBy(_.getName.toLowerCase(Locale.ROOT))
.view
.mapValues(_.toSeq) // toSeq needed for scala 2.13

/** The fields which have matching equivalents in both Protobuf and Catalyst schemas. */
@@ -147,7 +147,7 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])
seed: Long): JavaPairRDD[K, V] =
new JavaPairRDD[K, V](rdd.sampleByKey(
withReplacement,
fractions.asScala.mapValues(_.toDouble).toMap, // map to Scala Double; toMap to serialize
fractions.asScala.view.mapValues(_.toDouble).toMap, // map to Scala Double; toMap to serialize
seed))

/**
@@ -179,7 +179,7 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])
seed: Long): JavaPairRDD[K, V] =
new JavaPairRDD[K, V](rdd.sampleByKeyExact(
withReplacement,
fractions.asScala.mapValues(_.toDouble).toMap, // map to Scala Double; toMap to serialize
fractions.asScala.view.mapValues(_.toDouble).toMap, // map to Scala Double; toMap to serialize
seed))

/**
@@ -782,7 +782,7 @@ class JavaSparkContext(val sc: SparkContext) extends Closeable {
* @note This does not necessarily mean the caching or computation was successful.
*/
def getPersistentRDDs: JMap[java.lang.Integer, JavaRDD[_]] = {
sc.getPersistentRDDs.mapValues(s => JavaRDD.fromRDD(s)).toMap
sc.getPersistentRDDs.view.mapValues(s => JavaRDD.fromRDD(s)).toMap
.asJava.asInstanceOf[JMap[java.lang.Integer, JavaRDD[_]]]
}

@@ -369,7 +369,7 @@ private[spark] class PythonWorkerFactory(
daemon = null
daemonPort = 0
} else {
simpleWorkers.mapValues(_.destroy())
simpleWorkers.view.mapValues(_.destroy())
}
}
}
@@ -178,6 +178,6 @@ object InputFormatInfo {
}
}

nodeToSplit.mapValues(_.toSet).toMap
nodeToSplit.view.mapValues(_.toSet).toMap
}
}
@@ -155,7 +155,7 @@ private[spark] class TaskSchedulerImpl(
.build[String, ExecutorDecommissionState]()

def runningTasksByExecutors: Map[String, Int] = synchronized {
executorIdToRunningTaskIds.toMap.mapValues(_.size).toMap
executorIdToRunningTaskIds.toMap.view.mapValues(_.size).toMap
}

// The set of executors we have on each host; this is used to compute hostsAlive, which
@@ -736,7 +736,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
}

def getExecutorsWithRegistrationTs(): Map[String, Long] = synchronized {
executorDataMap.mapValues(v => v.registrationTs).toMap
executorDataMap.view.mapValues(v => v.registrationTs).toMap
}

override def isExecutorActive(id: String): Boolean = synchronized {
@@ -130,9 +130,10 @@ private[protobuf] class ApplicationEnvironmentInfoWrapperSerializer
new ResourceProfileInfo(
id = info.getId,
executorResources =
info.getExecutorResourcesMap.asScala.mapValues(deserializeExecutorResourceRequest).toMap,
info.getExecutorResourcesMap.asScala.view.mapValues(deserializeExecutorResourceRequest)
.toMap,
taskResources =
info.getTaskResourcesMap.asScala.mapValues(deserializeTaskResourceRequest).toMap)
info.getTaskResourcesMap.asScala.view.mapValues(deserializeTaskResourceRequest).toMap)
}

private def deserializeExecutorResourceRequest(info: StoreTypes.ExecutorResourceRequest):
@@ -137,7 +137,8 @@ private[protobuf] class ExecutorSummaryWrapperSerializer
blacklistedInStages = binary.getBlacklistedInStagesList.asScala.map(_.toInt).toSet,
peakMemoryMetrics = peakMemoryMetrics,
attributes = binary.getAttributesMap.asScala.toMap,
resources = binary.getResourcesMap.asScala.mapValues(deserializeResourceInformation).toMap,
resources =
binary.getResourcesMap.asScala.view.mapValues(deserializeResourceInformation).toMap,
resourceProfileId = binary.getResourceProfileId,
isExcluded = binary.getIsExcluded,
excludedInStages = binary.getExcludedInStagesList.asScala.map(_.toInt).toSet)
@@ -107,6 +107,6 @@ private[protobuf] class JobDataWrapperSerializer extends ProtobufSerDe[JobDataWr
numCompletedStages = info.getNumCompletedStages,
numSkippedStages = info.getNumSkippedStages,
numFailedStages = info.getNumFailedStages,
killedTasksSummary = info.getKillTasksSummaryMap.asScala.mapValues(_.toInt).toMap)
killedTasksSummary = info.getKillTasksSummaryMap.asScala.view.mapValues(_.toInt).toMap)
}
}
@@ -382,7 +382,7 @@ private[protobuf] class StageDataWrapperSerializer extends ProtobufSerDe[StageDa
new StageDataWrapper(
info = info,
jobIds = binary.getJobIdsList.asScala.map(_.toInt).toSet,
locality = binary.getLocalityMap.asScala.mapValues(_.toLong).toMap
locality = binary.getLocalityMap.asScala.view.mapValues(_.toLong).toMap
)
}

@@ -402,7 +402,7 @@ private[protobuf] class StageDataWrapperSerializer extends ProtobufSerDe[StageDa
entry => (entry._1.toLong, deserializeTaskData(entry._2))).toMap)
} else None
val executorSummary = if (MapUtils.isNotEmpty(binary.getExecutorSummaryMap)) {
Some(binary.getExecutorSummaryMap.asScala.mapValues(
Some(binary.getExecutorSummaryMap.asScala.view.mapValues(
ExecutorStageSummarySerializer.deserialize).toMap)
} else None
val speculationSummary =
@@ -475,7 +475,7 @@ private[protobuf] class StageDataWrapperSerializer extends ProtobufSerDe[StageDa
tasks = tasks,
executorSummary = executorSummary,
speculationSummary = speculationSummary,
killedTasksSummary = binary.getKilledTasksSummaryMap.asScala.mapValues(_.toInt).toMap,
killedTasksSummary = binary.getKilledTasksSummaryMap.asScala.view.mapValues(_.toInt).toMap,
resourceProfileId = binary.getResourceProfileId,
peakExecutorMetrics = peakExecutorMetrics,
taskMetricsDistributions = taskMetricsDistributions,
@@ -65,7 +65,7 @@ class SparkThrowableSuite extends SparkFunSuite {
}

def checkIfUnique(ss: Seq[Any]): Unit = {
val dups = ss.groupBy(identity).mapValues(_.size).filter(_._2 > 1).keys.toSeq
val dups = ss.groupBy(identity).view.mapValues(_.size).filter(_._2 > 1).keys.toSeq
assert(dups.isEmpty, s"Duplicate error classes: ${dups.mkString(", ")}")
}

@@ -805,7 +805,7 @@ class PairRDDFunctionsSuite extends SparkFunSuite with SharedSparkContext {
seed: Long,
n: Long): Unit = {
val trials = stratifiedData.countByKey()
val expectedSampleSize = stratifiedData.countByKey().mapValues(count =>
val expectedSampleSize = stratifiedData.countByKey().view.mapValues(count =>
math.ceil(count * samplingRate).toInt)
val fractions = Map("1" -> samplingRate, "0" -> samplingRate)
val sample = if (exact) {
1 change: 1 addition & 0 deletions core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
@@ -1254,6 +1254,7 @@ class RDDSuite extends SparkFunSuite with SharedSparkContext with Eventually {
.getPartitions
.map(coalescedRDD.getPreferredLocations(_).head)
.groupBy(identity)
.view
.mapValues(_.size)

// Make sure the coalesced partitions are distributed fairly evenly between the two locations.
@@ -101,6 +101,7 @@ class ExecutorResourceInfoSuite extends SparkFunSuite {
// assert that each address was assigned `slots` times
info.assignedAddrs
.groupBy(identity)
.view
.mapValues(_.size)
.foreach(x => assert(x._2 == slots))

@@ -299,7 +299,7 @@ class BlockManagerDecommissionIntegrationSuite extends SparkFunSuite with LocalS
val blockLocs = rddUpdates.map { update =>
(update.blockUpdatedInfo.blockId.name,
update.blockUpdatedInfo.blockManagerId)}
val blocksToManagers = blockLocs.groupBy(_._1).mapValues(_.size)
val blocksToManagers = blockLocs.groupBy(_._1).view.mapValues(_.size)
assert(blocksToManagers.exists(_._2 > 1),
s"We should have a block that has been on multiple BMs in rdds:\n ${rddUpdates} from:\n" +
s"${blocksUpdated}\n but instead we got:\n ${blocksToManagers}")
@@ -988,7 +988,7 @@ class ShuffleBlockFetcherIteratorSuite extends SparkFunSuite with PrivateMethodT
ShuffleBlockId(0, 1, 0) -> createMockManagedBuffer()
)

configureMockTransfer(blocks.mapValues(_ => createMockManagedBuffer(0)).toMap)
configureMockTransfer(blocks.view.mapValues(_ => createMockManagedBuffer(0)).toMap)

val iterator = createShuffleBlockIteratorWithDefaults(
Map(remoteBmId -> toBlockList(blocks.keys, 1L, 0))
@@ -544,7 +544,7 @@ class ExternalSorterSuite extends SparkFunSuite with LocalSparkContext {
val expected = (0 until 3).map { p =>
var v = (0 until size).map { i => (i / 4, i) }.filter { case (k, _) => k % 3 == p }.toSet
if (withPartialAgg) {
v = v.groupBy(_._1).mapValues { s => s.map(_._2).sum }.toSet
v = v.groupBy(_._1).view.mapValues { s => s.map(_._2).sum }.toSet
}
(p, v.toSet)
}.toSet
@@ -89,6 +89,7 @@ object DFSReadWriteTest {
.flatMap(_.split("\t"))
.filter(_.nonEmpty)
.groupBy(w => w)
.view
.mapValues(_.size)
.values
.sum
@@ -81,6 +81,7 @@ object MiniReadWriteTest {
.flatMap(_.split("\t"))
.filter(_.nonEmpty)
.groupBy(w => w)
.view
.mapValues(_.size)
.values
.sum