Skip to content

Commit

Permalink
Stash commit
Browse files Browse the repository at this point in the history
  • Loading branch information
jefftlin committed Aug 29, 2024
1 parent 51e0af5 commit 17246ad
Show file tree
Hide file tree
Showing 6 changed files with 22 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -258,7 +258,6 @@ public Message getDataSourceInfoById(HttpServletRequest request,
}
}
return message;

}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,7 @@ public RateLimit selectOne(RateLimit rateLimit) {
}

public boolean rateLimit(ExchangisJobInfo jobInfo) throws RateLimitNoLeftException {
//todo 修改入参为rateLimitProvider 需要多少
List<RateLimit> rateLimits = getJobRateLimits(jobInfo);
if (!rateLimits.isEmpty()) {
List<RateLimitUsed> applyUsed = new ArrayList<>();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
package com.webank.wedatasphere.exchangis.datasource.utils;

import java.util.List;
import java.util.Map;

public interface RateLimitsProvider {

List<String> getRealms();

Map<String, String> getRateLimits(String realmId);

}
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ public class ExchangisLauncherConfiguration {

public static final String LAUNCHER_LINKIS_STARTUP_PARAM_NAME = "startUpParams";

public static final String LAUNCHER_LINKIS_RATE_LIMIT_PARAM_NAME = "rateLimitParams";

public static final String LAUNCHER_LINKIS_REQUEST_MEMORY = "wds.linkis.engineconn.java.driver.memory";

public static final String LAUNCHER_LINKIS_RESOURCES = "wds.linkis.engineconn.${engine}.bml.resources";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ public LaunchableExchangisTask buildJob(ExchangisEngineJob inputJob, LaunchableE
launchableTask.setLinkisContentMap(inputJob.getJobContent());
Map<String, Object> linkisParams = new HashMap<>();
Map<String, Object> startUpParams = new HashMap<>();
Map<String, Object> rateLimitParams = new HashMap<>();
linkisParams.put(LAUNCHER_LINKIS_STARTUP_PARAM_NAME, startUpParams);
try {
String customParamPrefix = PatternInjectUtils.inject(LAUNCHER_LINKIS_CUSTOM_PARAM_PREFIX, new String[]{engine});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import com.webank.wedatasphere.exchangis.common.config.GlobalConfiguration;
import com.webank.wedatasphere.exchangis.common.pager.PageResult;
import com.webank.wedatasphere.exchangis.datasource.core.utils.Json;
import com.webank.wedatasphere.exchangis.datasource.service.RateLimitService;
import com.webank.wedatasphere.exchangis.job.domain.content.ExchangisJobInfoContent;
import com.webank.wedatasphere.exchangis.job.domain.ExchangisJobInfo;
import com.webank.wedatasphere.exchangis.job.launcher.AccessibleLauncherTask;
Expand Down Expand Up @@ -103,6 +104,9 @@ public class DefaultJobExecuteService implements JobExecuteService {
@Resource
private MetricConverterFactory<ExchangisMetricsVo> metricConverterFactory;

@Resource
private RateLimitService rateLimitService;

/**
* Validators
*/
Expand Down Expand Up @@ -364,6 +368,8 @@ public String executeJob(String requestUser, ExchangisJobInfo jobInfo, String ex
schedulerTask.setTenancy(execUser);
LOG.info("Submit the generation scheduler task: [{}] for job: [{}], tenancy: [{}] to TaskExecution", jobExecutionId, jobInfo.getId(), execUser);
try {
// rate limit todo
rateLimitService.rateLimit(jobInfo);
taskExecution.submit(schedulerTask);
} catch (ExchangisSchedulerException e) {
throw new ExchangisJobServerException(JOB_EXCEPTION_CODE.getCode(), "Exception in submitting to taskExecution", e);
Expand Down

0 comments on commit 17246ad

Please sign in to comment.