-
Notifications
You must be signed in to change notification settings - Fork 376
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[CELEBORN-1757] Add retry when sending RPC to LifecycleManager #3008
base: main
Are you sure you want to change the base?
Changes from 21 commits
a928c40
591ad34
b636fbb
b63c232
d968c77
4b98374
42687e7
0a4b8b7
0364bd3
0999748
dce8f13
99154e8
5bcf9fc
f8cd555
bc6237a
12650d1
b0d6e58
f515888
0451af2
015fbb3
f317d6d
ee720f3
ebecf66
09a7cdb
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -17,6 +17,9 @@ | |
|
||
package org.apache.celeborn.common.rpc | ||
|
||
import java.util.Random | ||
import java.util.concurrent.TimeUnit | ||
|
||
import scala.concurrent.Future | ||
import scala.reflect.ClassTag | ||
|
||
|
@@ -30,6 +33,7 @@ abstract class RpcEndpointRef(conf: CelebornConf) | |
extends Serializable with Logging { | ||
|
||
private[this] val defaultAskTimeout = conf.rpcAskTimeout | ||
private[celeborn] val waitTimeBound = conf.rpcTimeoutRetryWaitMs.toInt | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
|
||
|
||
/** | ||
* return the address for the [[RpcEndpointRef]] | ||
|
@@ -88,4 +92,58 @@ abstract class RpcEndpointRef(conf: CelebornConf) | |
val future = ask[T](message, timeout) | ||
timeout.awaitResult(future, address) | ||
} | ||
|
||
/**
 * Blocking ask-with-retry that uses this reference's default ask timeout.
 *
 * This overload only supplies `defaultAskTimeout` and forwards to the overload that takes an
 * explicit [[RpcTimeout]]; the retry behaviour is whatever that overload implements.
 *
 * Note: the call blocks the current thread until a reply arrives or the attempts are exhausted,
 * so it must not be invoked from the message loop of an [[RpcEndpoint]].
 *
 * @param message the message to send to [[RpcEndpoint.receiveAndReply]]
 * @param retryCount the number of attempts, forwarded unchanged to the timeout-taking overload
 * @tparam T type of the reply message
 * @return the reply message from the corresponding [[RpcEndpoint]]
 */
def askSync[T: ClassTag](message: Any, retryCount: Int): T = {
  askSync[T](message, defaultAskTimeout, retryCount)
}
|
||
/**
 * Send a message to the corresponding [[RpcEndpoint.receiveAndReply]] and wait for the reply
 * within the given timeout, retrying when an attempt fails with [[RpcTimeoutException]].
 *
 * At least one attempt is always made, even when `retryCount` is zero or negative. Between two
 * attempts the calling thread sleeps for a random duration in `[0, waitTimeBound)` milliseconds;
 * no sleep happens when `waitTimeBound` is not positive. Only [[RpcTimeoutException]] triggers a
 * retry; any other failure propagates immediately.
 *
 * Note: this is a blocking action which may cost a lot of time, so don't call it in a message
 * loop of [[RpcEndpoint]].
 *
 * @param message the message to send
 * @param timeout the timeout applied to every individual attempt
 * @param retryCount the maximum number of attempts (values below 1 are treated as 1)
 * @tparam T type of the reply message
 * @return the reply message from the corresponding [[RpcEndpoint]]
 * @throws RpcTimeoutException if the last attempt times out, or if the thread is interrupted
 *                             while waiting to retry (the interrupt flag is restored first)
 */
def askSync[T: ClassTag](message: Any, timeout: RpcTimeout, retryCount: Int): T = {
  var attemptsLeft = math.max(retryCount, 1)
  // Every attempt except the last one is allowed to recover from a timeout.
  while (attemptsLeft > 1) {
    attemptsLeft -= 1
    try {
      val future = ask[T](message, timeout)
      return timeout.awaitResult(future, address)
    } catch {
      case e: RpcTimeoutException =>
        // Random.nextInt rejects a non-positive bound, so skip the back-off in that case.
        if (waitTimeBound > 0) {
          val retryWaitMs = new Random().nextInt(waitTimeBound)
          try {
            TimeUnit.MILLISECONDS.sleep(retryWaitMs)
          } catch {
            case _: InterruptedException =>
              // Preserve the interrupt for callers further up the stack before giving up.
              Thread.currentThread().interrupt()
              throw e
          }
        }
    }
  }
  // Final attempt: any failure, including a timeout, is reported to the caller.
  val future = ask[T](message, timeout)
  timeout.awaitResult(future, address)
}
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -18,6 +18,8 @@ | |
package org.apache.celeborn.common.rpc | ||
|
||
import java.io.File | ||
import java.util.Random | ||
import java.util.concurrent.TimeUnit | ||
|
||
import scala.concurrent.Future | ||
|
||
|
@@ -104,6 +106,7 @@ object RpcEnv { | |
abstract class RpcEnv(config: RpcEnvConfig) { | ||
|
||
private[celeborn] val defaultLookupTimeout = config.conf.rpcLookupTimeout | ||
private[celeborn] val waitTimeBound = config.conf.rpcTimeoutRetryWaitMs.toInt | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
|
||
|
||
/** | ||
* Return RpcEndpointRef of the registered [[RpcEndpoint]]. Will be used to implement | ||
|
@@ -142,6 +145,41 @@ abstract class RpcEnv(config: RpcEnvConfig) { | |
setupEndpointRefByAddr(RpcEndpointAddress(address, endpointName)) | ||
} | ||
|
||
/**
 * Retrieve the [[RpcEndpointRef]] represented by `address` and `endpointName`, retrying when a
 * lookup attempt fails with [[RpcTimeoutException]]. This is a blocking action.
 *
 * At least one lookup is always performed, even when `retryCount` is zero or negative, so this
 * method never returns `null` without having tried. Between two attempts the calling thread
 * sleeps for a random duration in `[0, waitTimeBound)` milliseconds; no sleep happens when
 * `waitTimeBound` is not positive. Any exception other than [[RpcTimeoutException]] (for
 * example [[RpcEndpointNotFoundException]]) is not retried and propagates immediately.
 *
 * @param address the address of the remote endpoint
 * @param endpointName the name of the remote endpoint
 * @param retryCount the maximum number of lookup attempts (values below 1 are treated as 1)
 * @return the reference produced by `setupEndpointRefByAddr`
 * @throws RpcTimeoutException if the last attempt times out, or if the thread is interrupted
 *                             while waiting to retry (the interrupt flag is restored first)
 */
def setupEndpointRef(
    address: RpcAddress,
    endpointName: String,
    retryCount: Int): RpcEndpointRef = {
  def lookup(): RpcEndpointRef =
    setupEndpointRefByAddr(RpcEndpointAddress(address, endpointName))

  var attemptsLeft = math.max(retryCount, 1)
  // Every attempt except the last one is allowed to recover from a timeout.
  while (attemptsLeft > 1) {
    attemptsLeft -= 1
    try {
      return lookup()
    } catch {
      case e: RpcTimeoutException =>
        // Random.nextInt rejects a non-positive bound, so skip the back-off in that case.
        if (waitTimeBound > 0) {
          val retryWaitMs = new Random().nextInt(waitTimeBound)
          try {
            TimeUnit.MILLISECONDS.sleep(retryWaitMs)
          } catch {
            case _: InterruptedException =>
              // Preserve the interrupt for callers further up the stack before giving up.
              Thread.currentThread().interrupt()
              throw e
          }
        }
    }
  }
  // Final attempt: any failure, including a timeout, is reported to the caller.
  lookup()
}
|
||
/** | ||
* Stop [[RpcEndpoint]] specified by `endpoint`. | ||
*/ | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
And you can move this config to the
celeborn.rpc
part. There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I wonder whether you could introduce a new config
celeborn.client.rpc.retryWait
for the client end.