Skip to content
This repository was archived by the owner on Jan 9, 2020. It is now read-only.

Commit 2026c68

Browse files
committed
emit shuffle pod watch exception
Signed-off-by: forrestchen <[email protected]>
1 parent d7dd259 commit 2026c68

File tree

1 file changed

+7
-11
lines changed

1 file changed

+7
-11
lines changed

resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesExternalShuffleManager.scala

+7-11
Original file line numberDiff line numberDiff line change
@@ -106,18 +106,14 @@ private[spark] class KubernetesExternalShuffleManagerImpl(
106106
}
107107

108108
private def addShufflePodToCache(pod: Pod): Unit = shufflePodCache.synchronized {
109-
if (shufflePodCache.contains(pod.getSpec.getNodeName)) {
110-
val registeredPodName = shufflePodCache.get(pod.getSpec.getNodeName).get
111-
logError(s"Ambiguous specification of shuffle service pod. " +
112-
s"Found multiple matching pods: ${pod.getMetadata.getName}, " +
113-
s"${registeredPodName} on ${pod.getSpec.getNodeName}")
114-
115-
throw new SparkException(s"Ambiguous specification of shuffle service pod. " +
116-
s"Found multiple matching pods: ${pod.getMetadata.getName}, " +
117-
s"${registeredPodName} on ${pod.getSpec.getNodeName}")
118-
} else {
119-
shufflePodCache(pod.getSpec.getNodeName) = pod.getStatus.getPodIP
109+
if (shufflePodCache.exists(kv => kv._1 == pod.getSpec.getNodeName
110+
&& kv._2 != pod.getStatus.getPodIP)) {
111+
val registeredPodIP = shufflePodCache(pod.getSpec.getNodeName)
112+
logWarning(s"Ambiguous specification of shuffle service pod. " +
113+
s"Found multiple matching pods: ${pod.getMetadata.getName}(${pod.getStatus.getPodIP}), " +
114+
s"$registeredPodIP on ${pod.getSpec.getNodeName}, will update")
120115
}
116+
shufflePodCache(pod.getSpec.getNodeName) = pod.getStatus.getPodIP
121117
}
122118

123119
override def stop(): Unit = {

0 commit comments

Comments
 (0)