
Commit

Merge branch 'main' into branch-0.1
# Conflicts:
#	client-spark/shuffle-manager-2/pom.xml
#	client-spark/shuffle-manager-3/pom.xml
#	client-spark/shuffle-manager-common/pom.xml
#	client/pom.xml
#	common/pom.xml
#	pom.xml
#	server-common/pom.xml
#	server-master/pom.xml
#	server-worker/pom.xml
FMX committed Jul 7, 2022
2 parents 71ce1ae + ad6458b commit f00fae8
Showing 58 changed files with 193 additions and 2,522 deletions.
17 changes: 9 additions & 8 deletions CONFIGURATION_GUIDE.md
@@ -20,8 +20,8 @@ off-heap-memory = numDirs * queueCapacity * bufferSize + network memory

For example, if an RSS worker has 10 storage directories, each directory has a queue with a capacity
of 4096, and the buffer size is set to 256 kilobytes, then the necessary off-heap memory is 10 gigabytes.
NetWorker memory will be consumed when netty reads from a TPC channel, there will need some extra
memory. In conclusion, RSS worker off-heap memory should be set to `(numDirs * queueCapacity * bufferSize * 1.2)`.
Network memory will be consumed when netty reads from a TCP channel, so some extra memory is needed.
Empirically, RSS worker off-heap memory should be set to `(numDirs * queueCapacity * bufferSize * 1.2)`.

### Client-Side Configurations

@@ -39,20 +39,21 @@ memory. In conclusion, RSS worker off-heap memory should be set to `(numDirs * q
| spark.rss.fetch.chunk.maxReqsInFlight | 3 | Number of in-flight chunk fetch requests. |
| spark.rss.data.io.threads | 8 | Number of threads for a task to push data. |
| spark.rss.push.data.replicate | true | When true, the RSS worker will replicate shuffle data to another RSS worker to ensure shuffle data won't be lost after a node failure. |
| spark.rss.application.heartbeatInterval | 10s | Application heartbeat interval. |
| spark.rss.stage.end.timeout | 240s | Timeout for StageEnd. |
| spark.rss.shuffle.writer.mode | hash | RSS supports two different shuffle writers. The hash-based shuffle writer works fine when the shuffle partition count is moderate. The sort-based shuffle writer works fine when memory pressure is high or the shuffle partition count is huge. |
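
For orientation, a minimal sketch of how the client-side options above could be supplied through an ordinary SparkConf (for example in spark-shell); the keys come from the table above, the values are purely illustrative:

```scala
import org.apache.spark.SparkConf

// Illustrative only: these spark.rss.* keys are set like any other Spark property.
val conf = new SparkConf()
  .set("spark.rss.push.data.replicate", "true")
  .set("spark.rss.data.io.threads", "8")
  .set("spark.rss.application.heartbeatInterval", "10s")
  .set("spark.rss.shuffle.writer.mode", "hash")
```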

### RSS Master Configurations

| Item | Default | Description |
| :---: | :---: | :--: |
| rss.worker.timeout | 120s | |
| rss.application.timeout | 120s | |
| rss.stage.end.timeout | 120s | |
| rss.shuffle.writer.mode | hash | RSS supports two different shuffle writers. The hash-based shuffle writer works fine when the shuffle partition count is moderate. The sort-based shuffle writer works fine when memory pressure is high or the shuffle partition count is huge. |
| rss.rpc.io.clientThreads | min{64, availableCores} | |
| rss.rpc.io.serverThreads | min{64, availableCores} | |
| rss.master.port.maxretry | 1 | When the RSS master port is occupied, we will retry up to maxretry times. |
| rss.rpc.io.numConnectionsPerPeer | 1 | Connections between hosts are reused in order to reduce the number of connections. |
| rss.ha.enabled | true | When true, RSS will activate raft implementation and sync shared data on master clusters. |
| rss.ha.enabled | false | When true, RSS will activate raft implementation and sync shared data on master clusters. |
| rss.ha.master.hosts | | Master hosts address list. |
| rss.ha.service.id | | When this config is empty, RSS master will refuse to start up. |
| rss.ha.nodes.{serviceId} | | List of nodes that deploy RSS master. ServiceId is `rss.ha.service.id`. |
@@ -66,7 +67,7 @@ memory. In conclusion, RSS worker off-heap memory should be set to `(numDirs * q

| Item | Default | Description |
| :---: | :---: | :--: |
| rss.worker.base.dirs | | Directory list to store shuffle data. For the sake of performance, there should be no more than 2 directories on the same disk partition. |
| rss.worker.base.dirs | | Directory list to store shuffle data. For the sake of performance, there should be one directory per HDD and eight per SSD. |
| rss.worker.flush.buffer.size | 256K | |
| rss.worker.flush.queue.capacity | 512 | Size of the buffer queue attached to each storage directory. Each flush buffer queue consumes `rss.worker.flush.buffer.size` * `rss.worker.flush.queue.capacity` (256K * 512 = 128M) of off-heap memory. This config can be used to estimate RSS worker's off-heap memory demands. |
| rss.worker.fetch.chunk.size | 8m | Max chunk size of a reducer's merged shuffle data. For example, if a reducer's shuffle data is 128 MB, it will need 16 chunk fetch requests to fetch. |
@@ -96,7 +97,7 @@ memory. In conclusion, RSS worker off-heap memory should be set to `(numDirs * q

Assume we have a cluster as described below:
5 RSS Workers with 20 GB off-heap memory and 10 disks.
As we need to reserver 20% off-heap memory for netty, so we could assume 16 GB off-heap memory can be used for flush buffers.
Since we need to reserve 20% of off-heap memory for netty, we can assume 16 GB of off-heap memory can be used for flush buffers.

If `spark.rss.push.data.buffer.size` is 64 KB, we can have up to 1,310,720 in-flight requests.
If you have 8192 mapper tasks, you could set `spark.rss.push.data.maxReqsInFlight=160` to gain performance improvements.
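
As a sanity check on the numbers above, a small back-of-the-envelope sketch; `PushDataTuning` and the value names are illustrative only:

```scala
// Back-of-the-envelope check of the tuning example above.
object PushDataTuning {
  def main(args: Array[String]): Unit = {
    val workers           = 5
    val flushMemPerWorker = 16L * 1024 * 1024 * 1024 // 16 GB usable for flush buffers per worker
    val pushBufferSize    = 64L * 1024               // spark.rss.push.data.buffer.size = 64 KB

    // Cluster-wide in-flight push requests the flush buffers can absorb.
    val clusterInFlight = workers * flushMemPerWorker / pushBufferSize
    println(s"cluster-wide in-flight requests: $clusterInFlight") // 1310720

    // Spread across 8192 mapper tasks -> per-task spark.rss.push.data.maxReqsInFlight.
    val mapperTasks = 8192
    println(s"maxReqsInFlight per mapper: ${clusterInFlight / mapperTasks}") // 160
  }
}
```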
@@ -173,7 +174,7 @@ So we should set `rss.worker.flush.queue.capacity=6553` and each RSS worker has
| `rss.worker.prometheus.metric.port` | 9096 | int | |
| `rss.merge.push.data.threshold` | 1 MiB | String | |
| `rss.driver.metaService.port` | 0 | int | |
| `rss.worker.closeIdleConnections` | true | bool | |
| `rss.worker.closeIdleConnections` | false | bool | |
| `rss.ha.enabled` | false | bool | |
| `rss.ha.master.hosts` | value of `rss.master.host` | String | |
| `rss.ha.service.id` | | String | |
4 changes: 2 additions & 2 deletions METRICS.md
@@ -57,12 +57,12 @@ scrape_configs:
## Implementation
RSS master metric : `com/aliyun/emr/rss/service/deploy/master/MasterSource.scala`
RSS worker metric : `com/aliyun/emr/rss/service/deploy/master/MasterSource.scala`
RSS worker metric : `com/aliyun/emr/rss/service/deploy/worker/WorkerSource.scala`
and `com.aliyun.emr.rss.common.metrics.source.NetWorkSource`

## Grafana Dashboard

We provide a grafana dashboard for RSS [Grafana-Dashboard](assets/grafana/rss-dashboard.json). The dashboard was generated by grafana which version is 8.5.0.
We provide a grafana dashboard for RSS [Grafana-Dashboard](assets/grafana/rss-dashboard.json). The dashboard was generated with Grafana 8.5.0.
Here are some snapshots:
![d1](assets/img/dashboard1.png)
![d2](assets/img/dashboard2.png)
12 changes: 11 additions & 1 deletion README.md
@@ -95,7 +95,7 @@ EXAMPLE: single master cluster
rss.master.address master-host:port
rss.metrics.system.enabled true
rss.worker.flush.buffer.size 256k
rss.worker.flush.queue.capacity 512
rss.worker.flush.queue.capacity 4096
rss.worker.base.dirs /mnt/disk1/,/mnt/disk2
# If your hosts have disk raid or use lvm, set rss.device.monitor.enabled to false
rss.device.monitor.enabled false
@@ -217,3 +217,13 @@ RSS have various metrics. [METRICS](METRICS.md)
## Contribution
This is an active open-source project. We are always open to developers who want to use the system or contribute to it.
See more detail in [Contributing](CONTRIBUTING.md).

## NOTICE
If you need to fully restart an RSS cluster in HA mode, you must clean the ratis meta storage first, because the ratis meta storage keeps stale state from the previous run of the cluster.

Here are some instructions:
1. Stop all workers.
2. Stop all masters.
3. Clean every master's ratis meta storage directory (`rss.ha.storage.dir`).
4. Start all masters.
5. Start all workers.
2 changes: 1 addition & 1 deletion client-spark/shuffle-manager-2/pom.xml
@@ -21,7 +21,7 @@
<parent>
<groupId>com.aliyun.emr</groupId>
<artifactId>remote-shuffle-service</artifactId>
<version>0.1.0</version>
<version>0.1.1</version>
<relativePath>../../pom.xml</relativePath>
</parent>

2 changes: 1 addition & 1 deletion client-spark/shuffle-manager-3/pom.xml
@@ -21,7 +21,7 @@
<parent>
<groupId>com.aliyun.emr</groupId>
<artifactId>remote-shuffle-service</artifactId>
<version>0.1.0</version>
<version>0.1.1</version>
<relativePath>../../pom.xml</relativePath>
</parent>

2 changes: 1 addition & 1 deletion client-spark/shuffle-manager-common/pom.xml
@@ -21,7 +21,7 @@
<parent>
<groupId>com.aliyun.emr</groupId>
<artifactId>remote-shuffle-service</artifactId>
<version>0.1.0</version>
<version>0.1.1</version>
<relativePath>../../pom.xml</relativePath>
</parent>

2 changes: 1 addition & 1 deletion client/pom.xml
@@ -23,7 +23,7 @@
<parent>
<groupId>com.aliyun.emr</groupId>
<artifactId>remote-shuffle-service</artifactId>
<version>0.1.0</version>
<version>0.1.1</version>
<relativePath>../pom.xml</relativePath>
</parent>

@@ -84,6 +84,7 @@ class LifecycleManager(appId: String, val conf: RssConf) extends RpcEndpoint wit
private var getBlacklist: ScheduledFuture[_] = _

// Use independent app heartbeat threads to avoid being blocked by other operations.
private val heartbeatIntervalMs = RssConf.applicationHeatbeatIntervalMs(conf)
private val heartbeatThread = ThreadUtils.newDaemonSingleThreadScheduledExecutor("app-heartbeat")
private var appHeartbeat: ScheduledFuture[_] = _
private val responseCheckerThread = ThreadUtils.
@@ -123,7 +124,7 @@ class LifecycleManager(appId: String, val conf: RssConf) extends RpcEndpoint wit
logError("Error while send heartbeat", t)
}
}
}, 0, 30, TimeUnit.SECONDS)
}, 0, heartbeatIntervalMs, TimeUnit.MILLISECONDS)
}

override def onStart(): Unit = {
@@ -173,10 +174,10 @@ class LifecycleManager(appId: String, val conf: RssConf) extends RpcEndpoint wit
case msg: GetBlacklist =>
handleGetBlacklist(msg)
case StageEnd(applicationId, shuffleId) =>
logInfo(s"Received StageEnd request, ${Utils.makeShuffleKey(applicationId, shuffleId)}.")
logDebug(s"Received StageEnd request, ${Utils.makeShuffleKey(applicationId, shuffleId)}.")
handleStageEnd(null, applicationId, shuffleId)
case UnregisterShuffle(applicationId, shuffleId, _) =>
logInfo(s"Received UnregisterShuffle request," +
logDebug(s"Received UnregisterShuffle request," +
s"${Utils.makeShuffleKey(applicationId, shuffleId)}.")
handleUnregisterShuffle(null, applicationId, shuffleId)
}
@@ -189,19 +190,19 @@ class LifecycleManager(appId: String, val conf: RssConf) extends RpcEndpoint wit
numPartitions)

case Revive(applicationId, shuffleId, mapId, attemptId, reduceId, epoch, oldPartition, cause) =>
logDebug(s"Received Revive request, " +
logTrace(s"Received Revive request, " +
s"$applicationId, $shuffleId, $mapId, $attemptId, ,$reduceId," +
s" $epoch, $oldPartition, $cause.")
handleRevive(context, applicationId, shuffleId, mapId, attemptId,
reduceId, epoch, oldPartition, cause)

case PartitionSplit(applicationId, shuffleId, reduceId, epoch, oldPartition) =>
logDebug(s"Received split request, " +
logTrace(s"Received split request, " +
s"$applicationId, $shuffleId, $reduceId, $epoch, $oldPartition")
handlePartitionSplitRequest(context, applicationId, shuffleId, reduceId, epoch, oldPartition)

case MapperEnd(applicationId, shuffleId, mapId, attemptId, numMappers) =>
logDebug(s"Received MapperEnd request, " +
logTrace(s"Received MapperEnd request, " +
s"${Utils.makeMapKey(applicationId, shuffleId, mapId, attemptId)}.")
handleMapperEnd(context, applicationId, shuffleId, mapId, attemptId, numMappers)

@@ -211,7 +212,7 @@ class LifecycleManager(appId: String, val conf: RssConf) extends RpcEndpoint wit
handleGetReducerFileGroup(context, shuffleId)

case StageEnd(applicationId, shuffleId) =>
logInfo(s"Received StageEnd request, ${Utils.makeShuffleKey(applicationId, shuffleId)}.")
logDebug(s"Received StageEnd request, ${Utils.makeShuffleKey(applicationId, shuffleId)}.")
handleStageEnd(context, applicationId, shuffleId)
}

@@ -229,7 +230,7 @@ class LifecycleManager(appId: String, val conf: RssConf) extends RpcEndpoint wit
// If do, just register and return
registerShuffleRequest.synchronized {
if (registerShuffleRequest.containsKey(shuffleId)) {
logInfo("[handleRegisterShuffle] request for same shuffleKey exists, just register")
logDebug("[handleRegisterShuffle] request for same shuffleKey exists, just register")
registerShuffleRequest.get(shuffleId).add(context)
return
} else {
@@ -242,7 +243,6 @@ class LifecycleManager(appId: String, val conf: RssConf) extends RpcEndpoint wit
.filter(_.getEpoch == 0)
.toList
.asJava
logDebug(s"Shuffle $shuffleId already registered, just return.")
if (initialLocs.size != numPartitions) {
logWarning(s"Shuffle $shuffleId location size ${initialLocs.size} not equal to " +
s"numPartitions: $numPartitions!")
@@ -400,15 +400,15 @@ class LifecycleManager(appId: String, val conf: RssConf) extends RpcEndpoint wit
shuffleReviving.synchronized {
if (shuffleReviving.containsKey(reduceId)) {
shuffleReviving.get(reduceId).add(context)
logInfo(s"For $shuffleId, same partition $reduceId-$oldEpoch is reviving," +
logTrace(s"For $shuffleId, same partition $reduceId-$oldEpoch is reviving," +
s"register context.")
return
} else {
// check if new slot for the partition has allocated
val latestLoc = getLatestPartition(shuffleId, reduceId, oldEpoch)
if (latestLoc != null) {
context.reply(ChangeLocationResponse(StatusCode.Success, latestLoc))
logInfo(s"New partition found, old partition $reduceId-$oldEpoch return it." +
logDebug(s"New partition found, old partition $reduceId-$oldEpoch return it." +
s" shuffleId: $shuffleId $latestLoc")
return
}
@@ -467,7 +467,6 @@ class LifecycleManager(appId: String, val conf: RssConf) extends RpcEndpoint wit
slaves.get(0).getPeer
}

logDebug(s"[Update partition] success for $shuffleId $location.")
contexts.synchronized {
contexts.remove(reduceId)
}.asScala.foreach(_.reply(ChangeLocationResponse(StatusCode.Success, location)))
@@ -498,14 +497,11 @@ class LifecycleManager(appId: String, val conf: RssConf) extends RpcEndpoint wit
shuffleSplitting.synchronized {
if (shuffleSplitting.containsKey(reduceId)) {
shuffleSplitting.get(reduceId).add(context)
logDebug(s"For $shuffleId, same $reduceId-$oldEpoch is splitting, register context")
return
} else {
val latestLoc = getLatestPartition(shuffleId, reduceId, oldEpoch)
if (latestLoc != null) {
context.reply(ChangeLocationResponse(StatusCode.Success, latestLoc))
logDebug(s"Split request found new partition, old partition $reduceId-$oldEpoch" +
s" return it. shuffleId: $shuffleId $latestLoc")
return
}
val set = new util.HashSet[RpcCallContext]()
Expand All @@ -514,7 +510,7 @@ class LifecycleManager(appId: String, val conf: RssConf) extends RpcEndpoint wit
}
}

logDebug(s"Relocate partition for shuffle split ${Utils.makeShuffleKey(applicationId,
logDebug(s"Relocate partition for shuffle split ${Utils.makeShuffleKey(applicationId,
shuffleId)}, oldPartition: $oldPartition")

handleChangePartitionLocation(shuffleSplitting, applicationId, shuffleId, reduceId,
@@ -514,7 +510,7 @@ class LifecycleManager(appId: String, val conf: RssConf) extends RpcEndpoint wit
var attempts = shuffleMapperAttempts.get(shuffleId)
// it would happen when task with no shuffle data called MapperEnd first
if (attempts == null) {
logInfo(s"[handleMapperEnd] $shuffleId not registered, create one.")
logDebug(s"[handleMapperEnd] $shuffleId not registered, create one.")
attempts = new Array[Int](numMappers)
0 until numMappers foreach (ind => attempts(ind) = -1)
shuffleMapperAttempts.put(shuffleId, attempts)
@@ -579,7 +575,6 @@ class LifecycleManager(appId: String, val conf: RssConf) extends RpcEndpoint wit
}
timeout = timeout - delta
}
logDebug(s"Start getting reduce file group, $shuffleId.")

if (dataLostShuffleSet.contains(shuffleId)) {
context.reply(GetReducerFileGroupResponse(StatusCode.Failed, null, null))
@@ -950,7 +945,7 @@ class LifecycleManager(appId: String, val conf: RssConf) extends RpcEndpoint wit
// destroy success buffers
val destroyAfterRetry = retrySlots.asScala.filterKeys(!failedAfterRetry.contains(_)).toMap
destroyBuffersWithRetry(applicationId, shuffleId,
destroyAfterRetry.asInstanceOf[WorkerResource])
new WorkerResource(destroyAfterRetry.asJava))
}
}

@@ -1041,7 +1036,6 @@ class LifecycleManager(appId: String, val conf: RssConf) extends RpcEndpoint wit
}

private def removeExpiredShuffle(): Unit = {
logInfo("Check for expired shuffle.")
val currentTime = System.currentTimeMillis()
val keys = unregisterShuffleTime.keys().asScala.toList
keys.foreach { key =>
@@ -1184,7 +1178,6 @@ class LifecycleManager(appId: String, val conf: RssConf) extends RpcEndpoint wit
}

def isClusterOverload(numPartitions: Int = 0): Boolean = {
logInfo(s"Ask Sync Cluster Load Status")
try {
rssHARetryClient.askSync[GetClusterLoadStatusResponse](GetClusterLoadStatus(numPartitions),
classOf[GetClusterLoadStatusResponse]).isOverload
2 changes: 1 addition & 1 deletion common/pom.xml
@@ -22,7 +22,7 @@
<parent>
<groupId>com.aliyun.emr</groupId>
<artifactId>remote-shuffle-service</artifactId>
<version>0.1.0</version>
<version>0.1.1</version>
<relativePath>../pom.xml</relativePath>
</parent>
