Skip to content

Commit

Permalink
1.update version to 0.1.4
Browse files Browse the repository at this point in the history
2.optimize rpc memory usage
3.keep one copy of roaring bitmap
4.shade commons-lang3 to avoid dependency conflicts
5.update RSS dashboard
  • Loading branch information
FMX committed Oct 25, 2022
1 parent 515b4b5 commit 5e8d3ef
Show file tree
Hide file tree
Showing 17 changed files with 293 additions and 466 deletions.
658 changes: 217 additions & 441 deletions assets/grafana/rss-dashboard.json

Large diffs are not rendered by default.

7 changes: 6 additions & 1 deletion client-spark/shuffle-manager-2/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
<parent>
<groupId>com.aliyun.emr</groupId>
<artifactId>remote-shuffle-service</artifactId>
<version>0.1.3</version>
<version>${project.version}</version>
<relativePath>../../pom.xml</relativePath>
</parent>

Expand Down Expand Up @@ -132,6 +132,10 @@
<pattern>io.netty</pattern>
<shadedPattern>com.aliyun.emr.io.netty</shadedPattern>
</relocation>
<relocation>
<pattern>org.apache.commons</pattern>
<shadedPattern>org.apache.celeborn.org.apache.commons</shadedPattern>
</relocation>
</relocations>
<artifactSet>
<includes>
Expand All @@ -144,6 +148,7 @@
<include>com.google.protobuf:protobuf-java</include>
<include>com.google.guava:guava</include>
<include>io.netty:*</include>
<include>org.apache.commons:commons-lang3</include>
</includes>
</artifactSet>
<filters>
Expand Down
7 changes: 6 additions & 1 deletion client-spark/shuffle-manager-3/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
<parent>
<groupId>com.aliyun.emr</groupId>
<artifactId>remote-shuffle-service</artifactId>
<version>0.1.3</version>
<version>${project.version}</version>
<relativePath>../../pom.xml</relativePath>
</parent>

Expand Down Expand Up @@ -120,6 +120,10 @@
<pattern>io.netty</pattern>
<shadedPattern>com.aliyun.emr.io.netty</shadedPattern>
</relocation>
<relocation>
<pattern>org.apache.commons</pattern>
<shadedPattern>org.apache.celeborn.org.apache.commons</shadedPattern>
</relocation>
</relocations>
<artifactSet>
<includes>
Expand All @@ -132,6 +136,7 @@
<include>com.google.protobuf:protobuf-java</include>
<include>com.google.guava:guava</include>
<include>io.netty:*</include>
<include>org.apache.commons:commons-lang3</include>
</includes>
</artifactSet>
<filters>
Expand Down
2 changes: 1 addition & 1 deletion client-spark/shuffle-manager-common/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
<parent>
<groupId>com.aliyun.emr</groupId>
<artifactId>remote-shuffle-service</artifactId>
<version>0.1.3</version>
<version>${project.version}</version>
<relativePath>../../pom.xml</relativePath>
</parent>

Expand Down
2 changes: 1 addition & 1 deletion client/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
<parent>
<groupId>com.aliyun.emr</groupId>
<artifactId>remote-shuffle-service</artifactId>
<version>0.1.3</version>
<version>${project.version}</version>
<relativePath>../pom.xml</relativePath>
</parent>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import java.util.concurrent.atomic.LongAdder;

import io.netty.buffer.ByteBuf;
import org.roaringbitmap.RoaringBitmap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -164,8 +165,12 @@ private boolean skipLocation(int startMapIndex, int endMapIndex, PartitionLocati
if (endMapIndex == Integer.MAX_VALUE) {
return false;
}
RoaringBitmap bitmap = location.getMapIdBitMap();
if (bitmap == null && location.getPeer() != null) {
bitmap = location.getPeer().getMapIdBitMap();
}
for (int i = startMapIndex; i < endMapIndex; i++) {
if (location.getMapIdBitMap().contains(i)) {
if (bitmap.contains(i)) {
return false;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,16 @@

package com.aliyun.emr.rss.client.write

import java.nio.ByteBuffer
import java.util
import java.util.concurrent.{ConcurrentHashMap, ScheduledFuture, TimeUnit}
import java.util.concurrent.atomic.LongAdder
import java.util.concurrent.{Callable, ConcurrentHashMap, ScheduledFuture, TimeUnit}

import scala.collection.JavaConverters._
import scala.collection.mutable
import scala.collection.mutable.ListBuffer
import scala.util.Random

import com.google.common.cache.{Cache, CacheBuilder}
import io.netty.util.internal.ConcurrentSet
import org.roaringbitmap.RoaringBitmap

Expand All @@ -38,7 +39,7 @@ import com.aliyun.emr.rss.common.protocol.RpcNameConstants.WORKER_EP
import com.aliyun.emr.rss.common.protocol.message.ControlMessages._
import com.aliyun.emr.rss.common.protocol.message.StatusCode
import com.aliyun.emr.rss.common.rpc._
import com.aliyun.emr.rss.common.rpc.netty.{NettyRpcEndpointRef, NettyRpcEnv}
import com.aliyun.emr.rss.common.rpc.netty.{LocalNettyRpcCallContext, NettyRpcEndpointRef, NettyRpcEnv, RemoteNettyRpcCallContext}
import com.aliyun.emr.rss.common.util.{ThreadUtils, Utils}

class LifecycleManager(appId: String, val conf: RssConf) extends RpcEndpoint with Logging {
Expand All @@ -52,6 +53,9 @@ class LifecycleManager(appId: String, val conf: RssConf) extends RpcEndpoint wit
private val splitMode = RssConf.partitionSplitMode(conf)
private val storageHint = RssConf.storageHint(conf)
private val rangeReadFilter = RssConf.rangeReadFilterEnabled(conf)
private val rpcCacheSize = RssConf.rpcCacheSize(conf)
private val rpcCacheConcurrentLevel = RssConf.rpcCacheConcurrentLevel(conf)
private val rpcCacheExpireMs = RssConf.rpcCacheExpireTimeMs(conf)

private val unregisterShuffleTime = new ConcurrentHashMap[Int, Long]()

Expand All @@ -67,6 +71,11 @@ class LifecycleManager(appId: String, val conf: RssConf) extends RpcEndpoint wit
// shuffle id -> (partitionId -> newest PartitionLocation)
private val latestPartitionLocation =
new ConcurrentHashMap[Int, ConcurrentHashMap[Int, PartitionLocation]]()
// shuffle id -> serialized GetReducerFileGroupResponse (the ByteBuffer produced by
// nettyEnv.serialize in the loader passed to `get`), so the same reply bytes can be
// handed to remote callers asking for the same shuffle.
// Entries expire rpcCacheExpireMs milliseconds after being written and the cache is
// capped at rpcCacheSize entries.
private val getReducerFileGroupRpcCache: Cache[Int, ByteBuffer] = CacheBuilder.newBuilder()
.concurrencyLevel(rpcCacheConcurrentLevel)
.expireAfterWrite(rpcCacheExpireMs, TimeUnit.MILLISECONDS)
.maximumSize(rpcCacheSize)
.build().asInstanceOf[Cache[Int, ByteBuffer]]

val newMapFunc =
new util.function.Function[Int, ConcurrentHashMap[Int, PartitionLocation]]() {
Expand Down Expand Up @@ -582,12 +591,26 @@ class LifecycleManager(appId: String, val conf: RssConf) extends RpcEndpoint wit
if (dataLostShuffleSet.contains(shuffleId)) {
context.reply(GetReducerFileGroupResponse(StatusCode.Failed, null, null))
} else {
val shuffleFileGroup = reducerFileGroupsMap.get(shuffleId)
context.reply(GetReducerFileGroupResponse(
StatusCode.Success,
shuffleFileGroup,
shuffleMapperAttempts.get(shuffleId)
))
if (context.isInstanceOf[LocalNettyRpcCallContext]) {
// This branch is for the UTs
context.reply(GetReducerFileGroupResponse(
StatusCode.Success,
reducerFileGroupsMap.getOrDefault(shuffleId, Array.empty),
shuffleMapperAttempts.getOrDefault(shuffleId, Array.empty)))
} else {
val cachedMsg = getReducerFileGroupRpcCache.get(
shuffleId,
new Callable[ByteBuffer]() {
override def call(): ByteBuffer = {
val returnedMsg = GetReducerFileGroupResponse(
StatusCode.Success,
reducerFileGroupsMap.getOrDefault(shuffleId, Array.empty),
shuffleMapperAttempts.getOrDefault(shuffleId, Array.empty))
context.asInstanceOf[RemoteNettyRpcCallContext].nettyEnv.serialize(returnedMsg)
}
})
context.asInstanceOf[RemoteNettyRpcCallContext].callback.onSuccess(cachedMsg)
}
}
}

Expand Down Expand Up @@ -708,14 +731,14 @@ class LifecycleManager(appId: String, val conf: RssConf) extends RpcEndpoint wit
}
committedSlaveIds.asScala.foreach { id =>
val slavePartition = slavePartMap.get(id)
slavePartition.setMapIdBitMap(committedMapIdBitmap.get(id))
val masterPartition = committedPartitions.get(id)
if (masterPartition ne null) {
masterPartition.setPeer(slavePartition)
slavePartition.setPeer(masterPartition)
} else {
logWarning(s"Shuffle $shuffleId partition $id: master lost, " +
s"use slave $slavePartition.")
slavePartition.setMapIdBitMap(committedMapIdBitmap.get(id))
committedPartitions.put(id, slavePartition)
}
}
Expand Down
2 changes: 1 addition & 1 deletion common/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
<parent>
<groupId>com.aliyun.emr</groupId>
<artifactId>remote-shuffle-service</artifactId>
<version>0.1.3</version>
<version>${project.version}</version>
<relativePath>../pom.xml</relativePath>
</parent>

Expand Down
12 changes: 12 additions & 0 deletions common/src/main/scala/com/aliyun/emr/rss/common/RssConf.scala
Original file line number Diff line number Diff line change
Expand Up @@ -823,6 +823,18 @@ object RssConf extends Logging {
conf.getBoolean("rss.range.read.filter.enabled", false)
}

/**
 * Looks up `rss.rpc.cache.size` in the supplied configuration.
 *
 * @param conf configuration to read the setting from
 * @return the result of `conf.getInt` for that key, with 256 supplied as the default argument
 */
def rpcCacheSize(conf: RssConf): Int =
  conf.getInt("rss.rpc.cache.size", 256)

/**
 * Looks up `rss.rpc.cache.concurrent.level` in the supplied configuration.
 *
 * @param conf configuration to read the setting from
 * @return the result of `conf.getInt` for that key, with 32 supplied as the default argument
 */
def rpcCacheConcurrentLevel(conf: RssConf): Int =
  conf.getInt("rss.rpc.cache.concurrent.level", 32)

/**
 * Looks up `rss.rpc.cache.expire` in the supplied configuration as a duration.
 *
 * @param conf configuration to read the setting from
 * @return the result of `conf.getTimeAsMs` for that key, with "15s" supplied as the
 *         default argument
 */
def rpcCacheExpireTimeMs(conf: RssConf): Long =
  conf.getTimeAsMs("rss.rpc.cache.expire", "15s")

val WorkingDirName = "hadoop/rss-worker/shuffle_data"

// If we want to use multi-raft group we can
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,8 +55,8 @@ private[rss] class LocalNettyRpcCallContext(
* A [[RpcCallContext]] that will call [[RpcResponseCallback]] to send the reply back.
*/
private[rss] class RemoteNettyRpcCallContext(
nettyEnv: NettyRpcEnv,
callback: RpcResponseCallback,
val nettyEnv: NettyRpcEnv,
val callback: RpcResponseCallback,
senderAddress: RpcAddress)
extends NettyRpcCallContext(senderAddress) {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@ import java.io.{File, FileInputStream, InputStreamReader, IOException}
import java.lang.management.ManagementFactory
import java.math.{MathContext, RoundingMode}
import java.net._
import java.nio.charset.StandardCharsets
import java.nio.ByteBuffer
import java.nio.charset.StandardCharsets
import java.text.SimpleDateFormat
import java.util.{Locale, Properties, UUID}
import java.util
Expand Down
3 changes: 2 additions & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,13 @@

<groupId>com.aliyun.emr</groupId>
<artifactId>remote-shuffle-service</artifactId>
<version>0.1.3</version>
<version>${project.version}</version>
<packaging>pom</packaging>

<name>Aliyun E-MapReduce Shuffle Service Project Parent POM</name>

<properties>
<project.version>0.1.4</project.version>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
<java.version>1.8</java.version>
Expand Down
2 changes: 1 addition & 1 deletion server-common/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
<parent>
<groupId>com.aliyun.emr</groupId>
<artifactId>remote-shuffle-service</artifactId>
<version>0.1.3</version>
<version>${project.version}</version>
<relativePath>../pom.xml</relativePath>
</parent>

Expand Down
2 changes: 1 addition & 1 deletion server-master/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
<parent>
<artifactId>remote-shuffle-service</artifactId>
<groupId>com.aliyun.emr</groupId>
<version>0.1.3</version>
<version>${project.version}</version>
</parent>
<modelVersion>4.0.0</modelVersion>

Expand Down
2 changes: 1 addition & 1 deletion server-worker/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
<parent>
<artifactId>remote-shuffle-service</artifactId>
<groupId>com.aliyun.emr</groupId>
<version>0.1.3</version>
<version>${project.version}</version>
</parent>
<modelVersion>4.0.0</modelVersion>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
package com.aliyun.emr.rss.service.deploy.worker

import java.io.IOException
import java.util.{ArrayList => jArrayList, List => jList, HashMap => jHashMap}
import java.util.{ArrayList => jArrayList, HashMap => jHashMap, List => jList}
import java.util.concurrent._
import java.util.concurrent.atomic.{AtomicBoolean, AtomicReference}
import java.util.function.BiFunction
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -346,7 +346,7 @@ private[worker] final class LocalStorageManager(

@throws[IOException]
def createWriter(appId: String, shuffleId: Int, location: PartitionLocation,
splitThreshold: Long, splitMode: PartitionSplitMode,rangeReadFilter: Boolean): FileWriter = {
splitThreshold: Long, splitMode: PartitionSplitMode, rangeReadFilter: Boolean): FileWriter = {
if (!hasAvailableWorkingDirs()) {
throw new IOException("No available working dirs!")
}
Expand Down

0 comments on commit 5e8d3ef

Please sign in to comment.