Skip to content

Commit e83aedd

Browse files
LuciferYangMridul Muralidharan
authored andcommitted
[SPARK-38888][BUILD][CORE][YARN][DOCS] Add RocksDB support for shuffle service state store
### What changes were proposed in this pull request? This is an extension of the work in SPARK-38909; in this PR, the `RocksDB` implementation is added for the shuffle local state store. This PR adds the following code: - `shuffledb.RocksDB` and `shuffledb.RocksDBIterator`: implementation of RocksDB corresponding to `shuffledb.DB` and `shuffledb.DBIterator` - Add `ROCKSDB` to `shuffledb.DBBackend` with the corresponding file suffix `.rdb`; the description of `SHUFFLE_SERVICE_DB_BACKEND` is also changed - Add `RocksDBProvider` to build `RocksDB` instances and extend `DBProvider` to produce the corresponding instances - Add the `rocksdbjni` dependency to the `network-common` module ### Why are the changes needed? Support using RocksDB as the shuffle local state store ### Does this PR introduce _any_ user-facing change? When the user configures `spark.shuffle.service.db.enabled` as true, the user can use RocksDB as the shuffle local state store by specifying `SHUFFLE_SERVICE_DB_BACKEND(spark.shuffle.service.db.backend)` as `RocksDB` in `spark-defaults.conf` or `spark-shuffle-site.xml(for yarn)`. The original data stored in `LevelDB/RocksDB` will not be automatically converted to the other kind of storage for now. ### How was this patch tested? Add new test. Closes apache#37610 from LuciferYang/SPARK-38888. Authored-by: yangjie01 <[email protected]> Signed-off-by: Mridul Muralidharan <mridul<at>gmail.com>
1 parent 6577c43 commit e83aedd

File tree

14 files changed

+424
-14
lines changed

14 files changed

+424
-14
lines changed

common/network-common/pom.xml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -82,6 +82,10 @@
8282
<artifactId>leveldbjni-all</artifactId>
8383
<version>1.8</version>
8484
</dependency>
85+
<dependency>
86+
<groupId>org.rocksdb</groupId>
87+
<artifactId>rocksdbjni</artifactId>
88+
</dependency>
8589

8690
<dependency>
8791
<groupId>com.fasterxml.jackson.core</groupId>

common/network-common/src/main/java/org/apache/spark/network/shuffledb/DBBackend.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,10 +21,10 @@
2121

2222
/**
2323
* The enum `DBBackend` is used to specify the disk-based store used by the shuffle service local db.
24-
* Only LEVELDB is supported now.
24+
* Support the use of LevelDB and RocksDB.
2525
*/
2626
public enum DBBackend {
27-
LEVELDB(".ldb");
27+
LEVELDB(".ldb"), ROCKSDB(".rdb");
2828

2929
private final String fileSuffix;
3030

Lines changed: 71 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,71 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.spark.network.shuffledb;
19+
20+
import java.io.IOException;
21+
22+
import com.google.common.base.Throwables;
23+
import org.rocksdb.RocksDBException;
24+
25+
/**
26+
* RocksDB implementation of the local KV storage used to persist the shuffle state.
27+
*/
28+
public class RocksDB implements DB {
29+
private final org.rocksdb.RocksDB db;
30+
31+
public RocksDB(org.rocksdb.RocksDB db) {
32+
this.db = db;
33+
}
34+
35+
@Override
36+
public void put(byte[] key, byte[] value) {
37+
try {
38+
db.put(key, value);
39+
} catch (RocksDBException e) {
40+
throw Throwables.propagate(e);
41+
}
42+
}
43+
44+
@Override
45+
public byte[] get(byte[] key) {
46+
try {
47+
return db.get(key);
48+
} catch (RocksDBException e) {
49+
throw Throwables.propagate(e);
50+
}
51+
}
52+
53+
@Override
54+
public void delete(byte[] key) {
55+
try {
56+
db.delete(key);
57+
} catch (RocksDBException e) {
58+
throw Throwables.propagate(e);
59+
}
60+
}
61+
62+
@Override
63+
public DBIterator iterator() {
64+
return new RocksDBIterator(db.newIterator());
65+
}
66+
67+
@Override
68+
public void close() throws IOException {
69+
db.close();
70+
}
71+
}
Lines changed: 95 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,95 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.spark.network.shuffledb;
19+
20+
import java.io.IOException;
21+
import java.util.AbstractMap;
22+
import java.util.Map;
23+
import java.util.NoSuchElementException;
24+
25+
import com.google.common.base.Throwables;
26+
import org.rocksdb.RocksIterator;
27+
28+
/**
29+
* RocksDB implementation of `DBIterator`.
30+
*/
31+
public class RocksDBIterator implements DBIterator {
32+
33+
private final RocksIterator it;
34+
35+
private boolean checkedNext;
36+
37+
private boolean closed;
38+
39+
private Map.Entry<byte[], byte[]> next;
40+
41+
public RocksDBIterator(RocksIterator it) {
42+
this.it = it;
43+
}
44+
45+
@Override
46+
public boolean hasNext() {
47+
if (!checkedNext && !closed) {
48+
next = loadNext();
49+
checkedNext = true;
50+
}
51+
if (!closed && next == null) {
52+
try {
53+
close();
54+
} catch (IOException ioe) {
55+
throw Throwables.propagate(ioe);
56+
}
57+
}
58+
return next != null;
59+
}
60+
61+
@Override
62+
public Map.Entry<byte[], byte[]> next() {
63+
if (!hasNext()) {
64+
throw new NoSuchElementException();
65+
}
66+
checkedNext = false;
67+
Map.Entry<byte[], byte[]> ret = next;
68+
next = null;
69+
return ret;
70+
}
71+
72+
@Override
73+
public void close() throws IOException {
74+
if (!closed) {
75+
it.close();
76+
closed = true;
77+
next = null;
78+
}
79+
}
80+
81+
@Override
82+
public void seek(byte[] key) {
83+
it.seek(key);
84+
}
85+
86+
private Map.Entry<byte[], byte[]> loadNext() {
87+
if (it.isValid()) {
88+
Map.Entry<byte[], byte[]> nextEntry =
89+
new AbstractMap.SimpleEntry<>(it.key(), it.value());
90+
it.next();
91+
return nextEntry;
92+
}
93+
return null;
94+
}
95+
}

common/network-common/src/main/java/org/apache/spark/network/util/DBProvider.java

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -22,9 +22,10 @@
2222
import com.fasterxml.jackson.databind.ObjectMapper;
2323
import com.google.common.annotations.VisibleForTesting;
2424

25+
import org.apache.spark.network.shuffledb.DB;
2526
import org.apache.spark.network.shuffledb.DBBackend;
2627
import org.apache.spark.network.shuffledb.LevelDB;
27-
import org.apache.spark.network.shuffledb.DB;
28+
import org.apache.spark.network.shuffledb.RocksDB;
2829
import org.apache.spark.network.shuffledb.StoreVersion;
2930

3031
public class DBProvider {
@@ -34,11 +35,13 @@ public static DB initDB(
3435
StoreVersion version,
3536
ObjectMapper mapper) throws IOException {
3637
if (dbFile != null) {
37-
// TODO: SPARK-38888, add rocksdb implementation.
3838
switch (dbBackend) {
3939
case LEVELDB:
4040
org.iq80.leveldb.DB levelDB = LevelDBProvider.initLevelDB(dbFile, version, mapper);
4141
return levelDB != null ? new LevelDB(levelDB) : null;
42+
case ROCKSDB:
43+
org.rocksdb.RocksDB rocksDB = RocksDBProvider.initRockDB(dbFile, version, mapper);
44+
return rocksDB != null ? new RocksDB(rocksDB) : null;
4245
default:
4346
throw new IllegalArgumentException("Unsupported DBBackend: " + dbBackend);
4447
}
@@ -49,11 +52,11 @@ public static DB initDB(
4952
@VisibleForTesting
5053
public static DB initDB(DBBackend dbBackend, File file) throws IOException {
5154
if (file != null) {
52-
// TODO: SPARK-38888, add rocksdb implementation.
5355
switch (dbBackend) {
5456
case LEVELDB: return new LevelDB(LevelDBProvider.initLevelDB(file));
55-
default:
56-
throw new IllegalArgumentException("Unsupported DBBackend: " + dbBackend);
57+
case ROCKSDB: return new RocksDB(RocksDBProvider.initRocksDB(file));
58+
default:
59+
throw new IllegalArgumentException("Unsupported DBBackend: " + dbBackend);
5760
}
5861
}
5962
return null;

0 commit comments

Comments
 (0)