Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[CELEBORN-1715] RemoteShuffleMaster should check celeborn.client.push.replicate.enabled in constructor #2911

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@

public class RemoteShuffleMaster implements ShuffleMaster<RemoteShuffleDescriptor> {
private static final Logger LOG = LoggerFactory.getLogger(RemoteShuffleMaster.class);
private final CelebornConf conf;
private final ShuffleMasterContext shuffleMasterContext;
// Flink JobId -> Celeborn register shuffleIds
private final Map<JobID, Set<Integer>> jobShuffleIds = JavaUtils.newConcurrentHashMap();
Expand All @@ -64,7 +65,9 @@ public class RemoteShuffleMaster implements ShuffleMaster<RemoteShuffleDescripto

public RemoteShuffleMaster(
ShuffleMasterContext shuffleMasterContext, ResultPartitionAdapter resultPartitionDelegation) {
checkShuffleConfig(shuffleMasterContext.getConfiguration());
Configuration configuration = shuffleMasterContext.getConfiguration();
checkShuffleConfig(configuration);
this.conf = FlinkUtils.toCelebornConf(configuration);
this.shuffleMasterContext = shuffleMasterContext;
this.resultPartitionDelegation = resultPartitionDelegation;
this.lifecycleManagerTimestamp = System.currentTimeMillis();
Expand All @@ -78,27 +81,18 @@ public void registerJob(JobShuffleContext context) {
if (lifecycleManager == null) {
celebornAppId = FlinkUtils.toCelebornAppId(lifecycleManagerTimestamp, jobID);
LOG.info("CelebornAppId: {}", celebornAppId);
CelebornConf celebornConf =
FlinkUtils.toCelebornConf(shuffleMasterContext.getConfiguration());
// if not set, set to true as default for flink
celebornConf.setIfMissing(CelebornConf.CLIENT_CHECKED_USE_ALLOCATED_WORKERS(), true);
lifecycleManager = new LifecycleManager(celebornAppId, celebornConf);
if (celebornConf.clientPushReplicateEnabled()) {
shuffleMasterContext.onFatalError(
new RuntimeException("Currently replicate shuffle data is unsupported for flink."));
return;
}
lifecycleManager = new LifecycleManager(celebornAppId, conf);
this.shuffleResourceTracker = new ShuffleResourceTracker(executor, lifecycleManager);
}
}
}

Set<Integer> previousShuffleIds = jobShuffleIds.putIfAbsent(jobID, new HashSet<>());
LOG.info("Register job: {}.", jobID);
shuffleResourceTracker.registerJob(context);
if (previousShuffleIds != null) {
throw new RuntimeException("Duplicated registration job: " + jobID);
}
shuffleResourceTracker.registerJob(context);
}

@Override
Expand Down Expand Up @@ -212,7 +206,6 @@ public MemorySize computeShuffleMemorySizeForTask(
}

int numResultPartitions = taskInputsOutputsDescriptor.getSubpartitionNums().size();
CelebornConf conf = FlinkUtils.toCelebornConf(shuffleMasterContext.getConfiguration());
long numBytesPerPartition = conf.clientFlinkMemoryPerResultPartition();
long numBytesForOutput = numBytesPerPartition * numResultPartitions;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,15 @@ public class FlinkUtils {
"remote-shuffle.job.compression.codec");

public static CelebornConf toCelebornConf(Configuration configuration) {
if (Boolean.parseBoolean(
SteNicholas marked this conversation as resolved.
Show resolved Hide resolved
configuration.getString(
CelebornConf.CLIENT_PUSH_REPLICATE_ENABLED().key(),
CelebornConf.CLIENT_PUSH_REPLICATE_ENABLED().defaultValueString()))) {
throw new IllegalArgumentException(
String.format(
"Flink does not support replicate shuffle data. Please check the config %s.",
CelebornConf.CLIENT_PUSH_REPLICATE_ENABLED().key()));
}
CelebornConf tmpCelebornConf = new CelebornConf();
Map<String, String> confMap = configuration.toMap();
for (Map.Entry<String, String> entry : confMap.entrySet()) {
Expand All @@ -52,7 +61,10 @@ public static CelebornConf toCelebornConf(Configuration configuration) {
}
}

return tmpCelebornConf;
// The default value of this config option is false. If set to true, Celeborn will use local
// allocated workers as candidate being checked workers, this is more useful for map partition
// to regenerate the lost data. So if not set, set to true as default for flink.
return tmpCelebornConf.setIfMissing(CelebornConf.CLIENT_CHECKED_USE_ALLOCATED_WORKERS(), true);
}

public static String toCelebornAppId(long lifecycleManagerTimestamp, JobID jobID) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -218,6 +218,14 @@ public void testInvalidShuffleConfig() {
.set(
ExecutionOptions.BATCH_SHUFFLE_MODE,
BatchShuffleMode.ALL_EXCHANGES_PIPELINED)));
Configuration configuration = new Configuration();
configuration.setString(CelebornConf.CLIENT_PUSH_REPLICATE_ENABLED().key(), "true");
Assert.assertThrows(
String.format(
"Flink does not support replicate shuffle data. Please check the config %s.",
CelebornConf.CLIENT_PUSH_REPLICATE_ENABLED().key()),
IllegalArgumentException.class,
() -> createShuffleMaster(configuration));
}

@After
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -218,6 +218,14 @@ public void testInvalidShuffleConfig() {
.set(
ExecutionOptions.BATCH_SHUFFLE_MODE,
BatchShuffleMode.ALL_EXCHANGES_PIPELINED)));
Configuration configuration = new Configuration();
configuration.setString(CelebornConf.CLIENT_PUSH_REPLICATE_ENABLED().key(), "true");
Assert.assertThrows(
String.format(
"Flink does not support replicate shuffle data. Please check the config %s.",
CelebornConf.CLIENT_PUSH_REPLICATE_ENABLED().key()),
IllegalArgumentException.class,
() -> createShuffleMaster(configuration));
}

@After
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -228,6 +228,14 @@ public void testInvalidShuffleConfig() {
.set(
ExecutionOptions.BATCH_SHUFFLE_MODE,
BatchShuffleMode.ALL_EXCHANGES_PIPELINED)));
Configuration configuration = new Configuration();
configuration.setString(CelebornConf.CLIENT_PUSH_REPLICATE_ENABLED().key(), "true");
Assert.assertThrows(
String.format(
"Flink does not support replicate shuffle data. Please check the config %s.",
CelebornConf.CLIENT_PUSH_REPLICATE_ENABLED().key()),
IllegalArgumentException.class,
() -> createShuffleMaster(configuration));
}

@After
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -240,6 +240,14 @@ public void testInvalidShuffleConfig() {
.set(
ExecutionOptions.BATCH_SHUFFLE_MODE,
BatchShuffleMode.ALL_EXCHANGES_PIPELINED)));
Configuration configuration = new Configuration();
configuration.setString(CelebornConf.CLIENT_PUSH_REPLICATE_ENABLED().key(), "true");
Assert.assertThrows(
String.format(
"Flink does not support replicate shuffle data. Please check the config %s.",
CelebornConf.CLIENT_PUSH_REPLICATE_ENABLED().key()),
IllegalArgumentException.class,
() -> createShuffleMaster(configuration));
}

@After
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -240,6 +240,14 @@ public void testInvalidShuffleConfig() {
.set(
ExecutionOptions.BATCH_SHUFFLE_MODE,
BatchShuffleMode.ALL_EXCHANGES_PIPELINED)));
Configuration configuration = new Configuration();
configuration.setString(CelebornConf.CLIENT_PUSH_REPLICATE_ENABLED().key(), "true");
Assert.assertThrows(
String.format(
"Flink does not support replicate shuffle data. Please check the config %s.",
CelebornConf.CLIENT_PUSH_REPLICATE_ENABLED().key()),
IllegalArgumentException.class,
() -> createShuffleMaster(configuration));
}

@After
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -240,6 +240,14 @@ public void testInvalidShuffleConfig() {
.set(
ExecutionOptions.BATCH_SHUFFLE_MODE,
BatchShuffleMode.ALL_EXCHANGES_PIPELINED)));
Configuration configuration = new Configuration();
configuration.setString(CelebornConf.CLIENT_PUSH_REPLICATE_ENABLED().key(), "true");
Assert.assertThrows(
String.format(
"Flink does not support replicate shuffle data. Please check the config %s.",
CelebornConf.CLIENT_PUSH_REPLICATE_ENABLED().key()),
IllegalArgumentException.class,
() -> createShuffleMaster(configuration));
}

@After
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,11 +87,6 @@ public void registerJob(JobID jobID, TierShuffleHandler tierShuffleHandler) {
if (lifecycleManager == null) {
celebornAppId = FlinkUtils.toCelebornAppId(lifecycleManagerTimestamp, jobID);
LOG.info("CelebornAppId: {}", celebornAppId);
// The default value of this config option is false. If set to true, Celeborn will use
// local allocated workers as candidate being checked workers, this is more useful for
// map partition to regenerate the lost data. So if not set, set to true as default for
// flink.
conf.setIfMissing(CelebornConf.CLIENT_CHECKED_USE_ALLOCATED_WORKERS(), true);
lifecycleManager = new LifecycleManager(celebornAppId, conf);
this.shuffleResourceTracker = new ShuffleResourceTracker(executor, lifecycleManager);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -243,6 +243,14 @@ public void testInvalidShuffleConfig() {
.set(
ExecutionOptions.BATCH_SHUFFLE_MODE,
BatchShuffleMode.ALL_EXCHANGES_PIPELINED)));
Configuration configuration = new Configuration();
configuration.setString(CelebornConf.CLIENT_PUSH_REPLICATE_ENABLED().key(), "true");
Assert.assertThrows(
String.format(
"Flink does not support replicate shuffle data. Please check the config %s.",
CelebornConf.CLIENT_PUSH_REPLICATE_ENABLED().key()),
IllegalArgumentException.class,
() -> createShuffleMaster(configuration));
}

@After
Expand Down
Loading