nextEntry =
+ new AbstractMap.SimpleEntry<>(it.key(), it.value());
+ it.next();
+ return nextEntry;
+ }
+ return null;
+ }
+}
diff --git a/common/network-common/src/main/java/org/apache/spark/network/util/DBProvider.java b/common/network-common/src/main/java/org/apache/spark/network/util/DBProvider.java
index 9563fa9ec354f..2a4afa736224d 100644
--- a/common/network-common/src/main/java/org/apache/spark/network/util/DBProvider.java
+++ b/common/network-common/src/main/java/org/apache/spark/network/util/DBProvider.java
@@ -22,9 +22,10 @@
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.annotations.VisibleForTesting;
+import org.apache.spark.network.shuffledb.DB;
import org.apache.spark.network.shuffledb.DBBackend;
import org.apache.spark.network.shuffledb.LevelDB;
-import org.apache.spark.network.shuffledb.DB;
+import org.apache.spark.network.shuffledb.RocksDB;
import org.apache.spark.network.shuffledb.StoreVersion;
public class DBProvider {
@@ -34,11 +35,13 @@ public static DB initDB(
StoreVersion version,
ObjectMapper mapper) throws IOException {
if (dbFile != null) {
- // TODO: SPARK-38888, add rocksdb implementation.
switch (dbBackend) {
case LEVELDB:
org.iq80.leveldb.DB levelDB = LevelDBProvider.initLevelDB(dbFile, version, mapper);
return levelDB != null ? new LevelDB(levelDB) : null;
+ case ROCKSDB:
+ org.rocksdb.RocksDB rocksDB = RocksDBProvider.initRockDB(dbFile, version, mapper);
+ return rocksDB != null ? new RocksDB(rocksDB) : null;
default:
throw new IllegalArgumentException("Unsupported DBBackend: " + dbBackend);
}
@@ -49,11 +52,11 @@ public static DB initDB(
@VisibleForTesting
public static DB initDB(DBBackend dbBackend, File file) throws IOException {
if (file != null) {
- // TODO: SPARK-38888, add rocksdb implementation.
switch (dbBackend) {
case LEVELDB: return new LevelDB(LevelDBProvider.initLevelDB(file));
- default:
- throw new IllegalArgumentException("Unsupported DBBackend: " + dbBackend);
+ case ROCKSDB: return new RocksDB(RocksDBProvider.initRocksDB(file));
+ default:
+ throw new IllegalArgumentException("Unsupported DBBackend: " + dbBackend);
}
}
return null;
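Note: with the ROCKSDB case in place, callers reach either backend through the same entry point. A minimal usage sketch follows (illustration only, not part of the patch; the file path and store version are invented, and it assumes the four-argument initDB overload shown above with the backend and file leading the parameter list, and a DB interface exposing put/close):

import java.io.File;

import com.fasterxml.jackson.databind.ObjectMapper;

import org.apache.spark.network.shuffledb.DB;
import org.apache.spark.network.shuffledb.DBBackend;
import org.apache.spark.network.shuffledb.StoreVersion;
import org.apache.spark.network.util.DBProvider;

public class RocksDbBackendSketch {
  public static void main(String[] args) throws Exception {
    File dbFile = new File("/tmp/shuffle-state.rdb");   // hypothetical path
    StoreVersion version = new StoreVersion(1, 0);      // hypothetical store version
    // Same call as for LEVELDB; only the DBBackend enum value differs.
    DB db = DBProvider.initDB(DBBackend.ROCKSDB, dbFile, version, new ObjectMapper());
    if (db != null) {
      db.put("app_1".getBytes(), "registered".getBytes());
      db.close();
    }
  }
}

The null check mirrors initDB's contract above: a null dbFile yields a null DB.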
diff --git a/common/network-common/src/main/java/org/apache/spark/network/util/RocksDBProvider.java b/common/network-common/src/main/java/org/apache/spark/network/util/RocksDBProvider.java
new file mode 100644
index 0000000000000..f1f702c44245a
--- /dev/null
+++ b/common/network-common/src/main/java/org/apache/spark/network/util/RocksDBProvider.java
@@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.network.util;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Objects;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.annotations.VisibleForTesting;
+import org.rocksdb.*;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.spark.network.shuffledb.StoreVersion;
+
+/**
+ * RocksDB utility class available in the network package.
+ */
+public class RocksDBProvider {
+
+ static {
+ org.rocksdb.RocksDB.loadLibrary();
+ }
+
+ private static final Logger logger = LoggerFactory.getLogger(RocksDBProvider.class);
+
+ public static RocksDB initRockDB(File dbFile, StoreVersion version, ObjectMapper mapper) throws
+ IOException {
+ RocksDB tmpDb = null;
+ if (dbFile != null) {
+ BloomFilter fullFilter =
+ new BloomFilter(10.0D /* BloomFilter.DEFAULT_BITS_PER_KEY */, false);
+ BlockBasedTableConfig tableFormatConfig = new BlockBasedTableConfig()
+ .setFilterPolicy(fullFilter)
+ .setEnableIndexCompression(false)
+ .setIndexBlockRestartInterval(8)
+ .setFormatVersion(5);
+
+ Options dbOptions = new Options();
+ RocksDBLogger rocksDBLogger = new RocksDBLogger(dbOptions);
+
+ dbOptions.setCreateIfMissing(false);
+ dbOptions.setBottommostCompressionType(CompressionType.ZSTD_COMPRESSION);
+ dbOptions.setCompressionType(CompressionType.LZ4_COMPRESSION);
+ dbOptions.setTableFormatConfig(tableFormatConfig);
+ dbOptions.setLogger(rocksDBLogger);
+
+ try {
+ tmpDb = RocksDB.open(dbOptions, dbFile.toString());
+ } catch (RocksDBException e) {
+ if (e.getStatus().getCode() == Status.Code.NotFound) {
+ logger.info("Creating state database at " + dbFile);
+ dbOptions.setCreateIfMissing(true);
+ try {
+ tmpDb = RocksDB.open(dbOptions, dbFile.toString());
+ } catch (RocksDBException dbExc) {
+ throw new IOException("Unable to create state store", dbExc);
+ }
+ } else {
+ // the RocksDB file seems to be corrupt somehow. Let's just blow it away and create
+ // a new one, so we can keep processing new apps
+ logger.error("error opening rocksdb file {}. Creating new file, will not be able to " +
+ "recover state for existing applications", dbFile, e);
+ if (dbFile.isDirectory()) {
+ for (File f : Objects.requireNonNull(dbFile.listFiles())) {
+ if (!f.delete()) {
+ logger.warn("error deleting {}", f.getPath());
+ }
+ }
+ }
+ if (!dbFile.delete()) {
+ logger.warn("error deleting {}", dbFile.getPath());
+ }
+ dbOptions.setCreateIfMissing(true);
+ try {
+ tmpDb = RocksDB.open(dbOptions, dbFile.toString());
+ } catch (RocksDBException dbExc) {
+ throw new IOException("Unable to create state store", dbExc);
+ }
+ }
+ }
+ try {
+ // if there is a version mismatch, we throw an exception, which means the service
+ // is unusable
+ checkVersion(tmpDb, version, mapper);
+ } catch (RocksDBException e) {
+ throw new IOException(e.getMessage(), e);
+ }
+ }
+ return tmpDb;
+ }
+
+ @VisibleForTesting
+ static RocksDB initRocksDB(File file) throws IOException {
+ BloomFilter fullFilter =
+ new BloomFilter(10.0D /* BloomFilter.DEFAULT_BITS_PER_KEY */, false);
+ BlockBasedTableConfig tableFormatConfig = new BlockBasedTableConfig()
+ .setFilterPolicy(fullFilter)
+ .setEnableIndexCompression(false)
+ .setIndexBlockRestartInterval(8)
+ .setFormatVersion(5);
+
+ Options dbOptions = new Options();
+ dbOptions.setCreateIfMissing(true);
+ dbOptions.setBottommostCompressionType(CompressionType.ZSTD_COMPRESSION);
+ dbOptions.setCompressionType(CompressionType.LZ4_COMPRESSION);
+ dbOptions.setTableFormatConfig(tableFormatConfig);
+ try {
+ return RocksDB.open(dbOptions, file.toString());
+ } catch (RocksDBException e) {
+ throw new IOException("Unable to open state store", e);
+ }
+ }
+
+ private static class RocksDBLogger extends org.rocksdb.Logger {
+ private static final Logger LOG = LoggerFactory.getLogger(RocksDBLogger.class);
+
+ RocksDBLogger(Options options) {
+ super(options);
+ }
+
+ @Override
+ protected void log(InfoLogLevel infoLogLevel, String message) {
+ if (infoLogLevel == InfoLogLevel.INFO_LEVEL) {
+ LOG.info(message);
+ }
+ }
+ }
+
+ /**
+ * Simple major.minor versioning scheme. Any incompatible changes should be across major
+ * versions. Minor version differences are allowed -- meaning we should be able to read
+ * dbs that are either earlier *or* later on the minor version.
+ */
+ public static void checkVersion(RocksDB db, StoreVersion newversion, ObjectMapper mapper) throws
+ IOException, RocksDBException {
+ byte[] bytes = db.get(StoreVersion.KEY);
+ if (bytes == null) {
+ storeVersion(db, newversion, mapper);
+ } else {
+ StoreVersion version = mapper.readValue(bytes, StoreVersion.class);
+ if (version.major != newversion.major) {
+ throw new IOException("cannot read state DB with version " + version + ", incompatible " +
+ "with current version " + newversion);
+ }
+ storeVersion(db, newversion, mapper);
+ }
+ }
+
+ public static void storeVersion(RocksDB db, StoreVersion version, ObjectMapper mapper)
+ throws IOException, RocksDBException {
+ db.put(StoreVersion.KEY, mapper.writeValueAsBytes(version));
+ }
+}
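Note: the major.minor rule documented in checkVersion is easiest to see as a concrete sequence. A hedged sketch (illustration only; the database path is invented, and it opens a raw org.rocksdb.RocksDB handle directly rather than going through the provider): the first call stores the version, a later minor version is accepted and written back, and a higher major version is rejected with an IOException.

import com.fasterxml.jackson.databind.ObjectMapper;

import org.apache.spark.network.shuffledb.StoreVersion;
import org.apache.spark.network.util.RocksDBProvider;

public class VersionCheckSketch {
  public static void main(String[] args) throws Exception {
    org.rocksdb.RocksDB.loadLibrary();
    ObjectMapper mapper = new ObjectMapper();
    try (org.rocksdb.Options options = new org.rocksdb.Options().setCreateIfMissing(true);
         org.rocksdb.RocksDB db = org.rocksdb.RocksDB.open(options, "/tmp/version-check-demo")) {
      RocksDBProvider.checkVersion(db, new StoreVersion(1, 0), mapper); // empty db: version stored
      RocksDBProvider.checkVersion(db, new StoreVersion(1, 5), mapper); // later minor: readable
      try {
        RocksDBProvider.checkVersion(db, new StoreVersion(2, 0), mapper); // different major: rejected
      } catch (java.io.IOException expected) {
        System.out.println("incompatible major version: " + expected.getMessage());
      }
    }
  }
}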
diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala
index 07d3d3e077805..2d2f3c9428ac0 100644
--- a/core/src/main/scala/org/apache/spark/internal/config/package.scala
+++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala
@@ -718,7 +718,7 @@ package object config {
private[spark] val SHUFFLE_SERVICE_DB_BACKEND =
ConfigBuilder(Constants.SHUFFLE_SERVICE_DB_BACKEND)
.doc("Specifies a disk-based store used in shuffle service local db. " +
- "Only LEVELDB is supported now.")
+      "Only LEVELDB or ROCKSDB is supported now.")
.version("3.4.0")
.stringConf
.transform(_.toUpperCase(Locale.ROOT))
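Note: a minimal configuration sketch for the Worker-hosted (standalone) shuffle service, assuming the keys shown above; because the builder applies .transform(_.toUpperCase(Locale.ROOT)), a lowercase value such as "rocksdb" also resolves to ROCKSDB when read through the config entry.

import org.apache.spark.SparkConf;

public class ShuffleDbBackendConfSketch {
  public static void main(String[] args) {
    SparkConf conf = new SparkConf()
        .set("spark.shuffle.service.enabled", "true")
        .set("spark.shuffle.service.db.enabled", "true")
        // Upper-cased on read by the config entry, so "rocksdb" would work as well.
        .set("spark.shuffle.service.db.backend", "ROCKSDB");
    System.out.println(conf.get("spark.shuffle.service.db.backend"));
  }
}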
diff --git a/core/src/test/scala/org/apache/spark/deploy/ExternalShuffleServiceDbSuite.scala b/core/src/test/scala/org/apache/spark/deploy/ExternalShuffleServiceDbSuite.scala
index bc1d43d67330c..921175bd41038 100644
--- a/core/src/test/scala/org/apache/spark/deploy/ExternalShuffleServiceDbSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/ExternalShuffleServiceDbSuite.scala
@@ -26,6 +26,7 @@ import org.apache.spark.{SecurityManager, SparkConf, SparkFunSuite}
import org.apache.spark.internal.config._
import org.apache.spark.network.shuffle.{ExternalBlockHandler, ExternalShuffleBlockResolver}
import org.apache.spark.network.shuffle.TestShuffleDataContext
+import org.apache.spark.network.shuffledb.DBBackend
import org.apache.spark.tags.ExtendedLevelDBTest
import org.apache.spark.util.Utils
@@ -34,8 +35,7 @@ import org.apache.spark.util.Utils
* with #spark.shuffle.service.db.enabled = true or false
 * Note that failures in this suite may arise when #spark.shuffle.service.db.enabled = false
*/
-@ExtendedLevelDBTest
-class ExternalShuffleServiceDbSuite extends SparkFunSuite {
+abstract class ExternalShuffleServiceDbSuite extends SparkFunSuite {
val sortBlock0 = "Hello!"
val sortBlock1 = "World!"
val SORT_MANAGER = "org.apache.spark.shuffle.sort.SortShuffleManager"
@@ -48,6 +48,8 @@ class ExternalShuffleServiceDbSuite extends SparkFunSuite {
var blockHandler: ExternalBlockHandler = _
var blockResolver: ExternalShuffleBlockResolver = _
+ protected def shuffleDBBackend(): DBBackend
+
override def beforeAll(): Unit = {
super.beforeAll()
sparkConf = new SparkConf()
@@ -78,6 +80,7 @@ class ExternalShuffleServiceDbSuite extends SparkFunSuite {
def registerExecutor(): Unit = {
try {
sparkConf.set("spark.shuffle.service.db.enabled", "true")
+ sparkConf.set(SHUFFLE_SERVICE_DB_BACKEND.key, shuffleDBBackend().name())
externalShuffleService = new ExternalShuffleService(shuffleServiceConf, securityManager)
// external Shuffle Service start
@@ -99,6 +102,7 @@ class ExternalShuffleServiceDbSuite extends SparkFunSuite {
"shuffle service restart") {
try {
sparkConf.set("spark.shuffle.service.db.enabled", "true")
+ sparkConf.set(SHUFFLE_SERVICE_DB_BACKEND.key, shuffleDBBackend().name())
externalShuffleService = new ExternalShuffleService(shuffleServiceConf, securityManager)
// externalShuffleService restart
externalShuffleService.start()
@@ -143,3 +147,12 @@ class ExternalShuffleServiceDbSuite extends SparkFunSuite {
}
}
}
+
+@ExtendedLevelDBTest
+class ExternalShuffleServiceLevelDBSuite extends ExternalShuffleServiceDbSuite {
+ override protected def shuffleDBBackend(): DBBackend = DBBackend.LEVELDB
+}
+
+class ExternalShuffleServiceRocksDBSuite extends ExternalShuffleServiceDbSuite {
+ override protected def shuffleDBBackend(): DBBackend = DBBackend.ROCKSDB
+}
diff --git a/core/src/test/scala/org/apache/spark/deploy/worker/WorkerSuite.scala b/core/src/test/scala/org/apache/spark/deploy/worker/WorkerSuite.scala
index 632abd9f566fb..a07d4f76905a7 100644
--- a/core/src/test/scala/org/apache/spark/deploy/worker/WorkerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/worker/WorkerSuite.scala
@@ -40,7 +40,9 @@ import org.apache.spark.deploy.{Command, ExecutorState, ExternalShuffleService}
import org.apache.spark.deploy.DeployMessages.{DriverStateChanged, ExecutorStateChanged, WorkDirCleanup}
import org.apache.spark.deploy.master.DriverState
import org.apache.spark.internal.config
+import org.apache.spark.internal.config.SHUFFLE_SERVICE_DB_BACKEND
import org.apache.spark.internal.config.Worker._
+import org.apache.spark.network.shuffledb.DBBackend
import org.apache.spark.resource.{ResourceAllocation, ResourceInformation}
import org.apache.spark.resource.ResourceUtils._
import org.apache.spark.resource.TestResourceIDs.{WORKER_FPGA_ID, WORKER_GPU_ID}
@@ -339,8 +341,14 @@ class WorkerSuite extends SparkFunSuite with Matchers with BeforeAndAfter {
}
test("WorkDirCleanup cleans app dirs and shuffle metadata when " +
- "spark.shuffle.service.db.enabled=true") {
- testWorkDirCleanupAndRemoveMetadataWithConfig(true)
+ "spark.shuffle.service.db.enabled=true, spark.shuffle.service.db.backend=RocksDB") {
+ testWorkDirCleanupAndRemoveMetadataWithConfig(true, DBBackend.ROCKSDB)
+ }
+
+ test("WorkDirCleanup cleans app dirs and shuffle metadata when " +
+ "spark.shuffle.service.db.enabled=true, spark.shuffle.service.db.backend=LevelDB") {
+ assume(!Utils.isMacOnAppleSilicon)
+ testWorkDirCleanupAndRemoveMetadataWithConfig(true, DBBackend.LEVELDB)
}
test("WorkDirCleanup cleans only app dirs when" +
@@ -348,8 +356,13 @@ class WorkerSuite extends SparkFunSuite with Matchers with BeforeAndAfter {
testWorkDirCleanupAndRemoveMetadataWithConfig(false)
}
- private def testWorkDirCleanupAndRemoveMetadataWithConfig(dbCleanupEnabled: Boolean): Unit = {
+ private def testWorkDirCleanupAndRemoveMetadataWithConfig(
+ dbCleanupEnabled: Boolean, shuffleDBBackend: DBBackend = null): Unit = {
val conf = new SparkConf().set("spark.shuffle.service.db.enabled", dbCleanupEnabled.toString)
+ if (dbCleanupEnabled) {
+ assert(shuffleDBBackend != null)
+ conf.set(SHUFFLE_SERVICE_DB_BACKEND.key, shuffleDBBackend.name())
+ }
conf.set("spark.worker.cleanup.appDataTtl", "60")
conf.set("spark.shuffle.service.enabled", "true")
diff --git a/docs/running-on-yarn.md b/docs/running-on-yarn.md
index 4c85bc3ceebbc..1f173a7c64a56 100644
--- a/docs/running-on-yarn.md
+++ b/docs/running-on-yarn.md
@@ -848,6 +848,16 @@ The following extra configuration options are available when the shuffle service
would be a valid Java package or class name and not include spaces.
+<tr>
+  <td><code>spark.shuffle.service.db.backend</code></td>
+  <td>LEVELDB</td>
+  <td>
+    Specifies the kind of disk-based store used for the shuffle service state store. Both <code>LEVELDB</code>
+    and <code>ROCKSDB</code> are supported now, with <code>LEVELDB</code> as the default.
+    Data already stored in LevelDB/RocksDB will not be automatically converted to the other kind of storage.
+  </td>
+  <td>3.4.0</td>
+</tr>
Please note that the instructions above assume that the default shuffle service name,
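Note: on YARN the external shuffle service runs inside the NodeManager, so this property is read from the NodeManager's Hadoop configuration (the YarnShuffleServiceSuite change later in this patch sets it the same way via yarnConfig.set). A hedged sketch, equivalent to adding the property to yarn-site.xml on each NodeManager:

import org.apache.hadoop.conf.Configuration;

import org.apache.spark.network.shuffledb.DBBackend;

public class NodeManagerShuffleDbBackendSketch {
  public static void main(String[] args) {
    Configuration nmConf = new Configuration();   // stand-in for the NodeManager's yarn-site.xml settings
    nmConf.set("spark.shuffle.service.db.backend", DBBackend.ROCKSDB.name());
    System.out.println(nmConf.get("spark.shuffle.service.db.backend"));
  }
}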
diff --git a/docs/spark-standalone.md b/docs/spark-standalone.md
index 8d1d05fbbbe8e..559e3bca6c934 100644
--- a/docs/spark-standalone.md
+++ b/docs/spark-standalone.md
@@ -328,6 +328,16 @@ SPARK_WORKER_OPTS supports the following system properties:
  <td>3.0.0</td>
</tr>
+<tr>
+  <td><code>spark.shuffle.service.db.backend</code></td>
+  <td>LEVELDB</td>
+  <td>
+    When <code>spark.shuffle.service.db.enabled</code> is true, this specifies the kind of disk-based store
+    used for the shuffle service state store. Both <code>LEVELDB</code> and <code>ROCKSDB</code> are supported now, with <code>LEVELDB</code> as the default.
+    Data already stored in LevelDB/RocksDB will not be automatically converted to the other kind of storage.
+  </td>
+  <td>3.4.0</td>
+</tr>
<tr>
  <td><code>spark.storage.cleanupFilesAfterExecutorExit</code></td>
  <td>true</td>
diff --git a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnShuffleAlternateNameConfigSuite.scala b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnShuffleAlternateNameConfigSuite.scala
index f0cb008321ae4..692980b96f208 100644
--- a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnShuffleAlternateNameConfigSuite.scala
+++ b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnShuffleAlternateNameConfigSuite.scala
@@ -82,3 +82,9 @@ class YarnShuffleAlternateNameConfigWithLevelDBBackendSuite
extends YarnShuffleAlternateNameConfigSuite {
override protected def dbBackend: DBBackend = DBBackend.LEVELDB
}
+
+@ExtendedYarnTest
+class YarnShuffleAlternateNameConfigWithRocksDBBackendSuite
+ extends YarnShuffleAlternateNameConfigSuite {
+ override protected def dbBackend: DBBackend = DBBackend.ROCKSDB
+}
diff --git a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnShuffleIntegrationSuite.scala b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnShuffleIntegrationSuite.scala
index 70c77a55c8698..80e014fd062ca 100644
--- a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnShuffleIntegrationSuite.scala
+++ b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnShuffleIntegrationSuite.scala
@@ -97,6 +97,12 @@ class YarnShuffleIntegrationWithLevelDBBackendSuite
override protected def dbBackend: DBBackend = DBBackend.LEVELDB
}
+@ExtendedYarnTest
+class YarnShuffleIntegrationWithRocksDBBackendSuite
+ extends YarnShuffleIntegrationSuite {
+ override protected def dbBackend: DBBackend = DBBackend.ROCKSDB
+}
+
/**
* Integration test for the external shuffle service with auth on.
*/
@@ -121,6 +127,11 @@ class YarnShuffleAuthWithLevelDBBackendSuite extends YarnShuffleAuthSuite {
override protected def dbBackend: DBBackend = DBBackend.LEVELDB
}
+@ExtendedYarnTest
+class YarnShuffleAuthWithRocksDBBackendSuite extends YarnShuffleAuthSuite {
+ override protected def dbBackend: DBBackend = DBBackend.ROCKSDB
+}
+
private object YarnExternalShuffleDriver extends Logging with Matchers {
val WAIT_TIMEOUT_MILLIS = 10000
diff --git a/resource-managers/yarn/src/test/scala/org/apache/spark/network/yarn/YarnShuffleServiceSuite.scala b/resource-managers/yarn/src/test/scala/org/apache/spark/network/yarn/YarnShuffleServiceSuite.scala
index 9c0e1ec1ff126..16fa42056921e 100644
--- a/resource-managers/yarn/src/test/scala/org/apache/spark/network/yarn/YarnShuffleServiceSuite.scala
+++ b/resource-managers/yarn/src/test/scala/org/apache/spark/network/yarn/YarnShuffleServiceSuite.scala
@@ -88,6 +88,7 @@ abstract class YarnShuffleServiceSuite extends SparkFunSuite with Matchers {
yarnConfig.set(YarnConfiguration.NM_LOCAL_DIRS, localDir.getAbsolutePath)
yarnConfig.set("spark.shuffle.push.server.mergedShuffleFileManagerImpl",
"org.apache.spark.network.shuffle.RemoteBlockPushResolver")
+ yarnConfig.set(SHUFFLE_SERVICE_DB_BACKEND.key, shuffleDBBackend().name())
recoveryLocalDir = Utils.createTempDir()
tempDir = Utils.createTempDir()
@@ -162,7 +163,6 @@ abstract class YarnShuffleServiceSuite extends SparkFunSuite with Matchers {
createMergeManager: (TransportConf, File) => MergedShuffleFileManager): YarnShuffleService = {
val shuffleService = createYarnShuffleService(false)
val dBBackend = shuffleDBBackend()
- yarnConfig.set(SHUFFLE_SERVICE_DB_BACKEND.key, shuffleDBBackend().name())
val transportConf = new TransportConf("shuffle", new HadoopConfigProvider(yarnConfig))
val dbName = dBBackend.fileName(YarnShuffleService.SPARK_SHUFFLE_MERGE_RECOVERY_FILE_NAME)
val testShuffleMergeManager = createMergeManager(
@@ -1115,3 +1115,7 @@ abstract class YarnShuffleServiceSuite extends SparkFunSuite with Matchers {
class YarnShuffleServiceWithLevelDBBackendSuite extends YarnShuffleServiceSuite {
override protected def shuffleDBBackend(): DBBackend = DBBackend.LEVELDB
}
+
+class YarnShuffleServiceWithRocksDBBackendSuite extends YarnShuffleServiceSuite {
+ override protected def shuffleDBBackend(): DBBackend = DBBackend.ROCKSDB
+}