Skip to content

Commit

Permalink
Merge branch 'dev-1.1.3-webank-job' into dev-1.1.3-webank
Browse files Browse the repository at this point in the history
  • Loading branch information
Davidhua1996 committed May 29, 2024
2 parents 77b6297 + f44823d commit 883341f
Show file tree
Hide file tree
Showing 6 changed files with 39 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ public class Json {
mapper.configure(SerializationFeature.FAIL_ON_EMPTY_BEANS, false);
//Ignore unknown properties
mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
//Accept NaN
mapper.configure(JsonParser.Feature.ALLOW_NON_NUMERIC_NUMBERS, true);
//Cancel to scape no ascii
// mapper.configure(JsonWriteFeature.ESCAPE_NON_ASCII.mappedFeature(), false);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,11 @@ public class SubmitSchedulerTask extends AbstractExchangisSchedulerTask implemen

private static final Logger LOG = LoggerFactory.getLogger(SubmitSchedulerTask.class);

/**
* Submit parallel limit
*/
private static final AtomicInteger SUBMIT_PARALLEL = new AtomicInteger(0);

private LaunchableExchangisTask launchableExchangisTask;

private TaskManager<LaunchedExchangisTask> taskManager;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -232,7 +232,7 @@ public LogResult logsFromPageAndPath(String logPath, LogQuery logQuery) {
LOG.info("have error information");
}
if (!line.contains("password")) {
logs.add(new String(line.getBytes(StandardCharsets.ISO_8859_1), StandardCharsets.UTF_8));
logs.add(line);
}
readLine += 1;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ public Message executeJob(@RequestBody(required = false) Map<String, Object> per
jobInfo.getExecuteUser() : loginUser);
result.data("jobExecutionId", jobExecutionId);
} catch (Exception e) {
String message;
String message;
if (Objects.nonNull(jobInfo)) {
message = "Error occur while executing job: [id: " + jobInfo.getId() + " name: " + jobInfo.getName() + "]";
result = Message.error(message + "(执行任务出错), reason: " + e.getMessage());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,13 +32,15 @@
import com.webank.wedatasphere.exchangis.job.server.service.JobExecuteService;
import com.webank.wedatasphere.exchangis.job.server.vo.*;
import org.apache.commons.lang.StringUtils;
import org.apache.linkis.common.conf.CommonVars;
import org.modelmapper.ModelMapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;

import javax.annotation.PostConstruct;
import javax.annotation.Resource;
import javax.servlet.http.HttpServletRequest;
import java.util.*;
Expand All @@ -49,6 +51,10 @@
public class DefaultJobExecuteService implements JobExecuteService {
private final static Logger LOG = LoggerFactory.getLogger(DefaultJobExecuteService.class);

private static final CommonVars<String> TASK_LOG_IGNORE_KEYS = CommonVars.apply(
"wds.exchangis.job.task.log.ignore-keys",
"service.DefaultManagerService,info.DefaultNodeHealthyInfoManager");

@Autowired
private LaunchedTaskDao launchedTaskDao;

Expand Down Expand Up @@ -89,6 +95,18 @@ public class DefaultJobExecuteService implements JobExecuteService {
@Resource
private MetricConverterFactory<ExchangisMetricsVo> metricConverterFactory;

/**
* Log ignore key set
*/
private final Set<String> logIgnoreKeySet = new HashSet<>();

@PostConstruct
public void init(){
String defaultIgnoreKeys = TASK_LOG_IGNORE_KEYS.getValue();
if (StringUtils.isNotBlank(defaultIgnoreKeys)){
logIgnoreKeySet.addAll(Arrays.asList(defaultIgnoreKeys.split(",")));
}
}
@Override
public List<ExchangisJobTaskVo> getExecutedJobTaskList(String jobExecutionId) throws ExchangisJobServerException{
List<LaunchedExchangisTaskEntity> launchedExchangisTaskEntities = launchedTaskDao.selectTaskListByJobExecutionId(jobExecutionId);
Expand Down Expand Up @@ -189,6 +207,16 @@ public ExchangisCategoryLogVo getJobLogInfo(String jobExecutionId, LogQuery logQ
public ExchangisCategoryLogVo getTaskLogInfo(String taskId, String jobExecutionId, LogQuery logQuery)
throws ExchangisJobServerException, ExchangisTaskLaunchException {
LaunchedExchangisTaskEntity launchedTaskEntity = this.launchedTaskDao.getLaunchedTaskEntity(taskId);
if (logIgnoreKeySet.size() > 0){
String ignoreKeys = logQuery.getIgnoreKeywords();
if (StringUtils.isNotBlank(ignoreKeys)){
Set<String> ignores = new HashSet<>(Arrays.asList(ignoreKeys.split(",")));
ignores.addAll(logIgnoreKeySet);
logQuery.setIgnoreKeywords(StringUtils.join(ignores, ","));
} else {
logQuery.setIgnoreKeywords(StringUtils.join(logIgnoreKeySet, ","));
}
}
if (Objects.isNull(launchedTaskEntity)){
return resultToCategoryLog(logQuery, new LogResult(0, false, new ArrayList<>()), TaskStatus.Inited);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@ public boolean subscribe(LaunchableExchangisTask task) {
if (Objects.isNull(jobEntity) || TaskStatus.isCompleted(jobEntity.getStatus())){
taskEntity.setStatus(jobEntity.getStatus());
this.launchedTaskDao.insertLaunchedTaskOrUpdate(taskEntity);
// TODO delete the launch able task

return false;
} else {
return this.launchedTaskDao.insertLaunchedTaskOrUpdate(taskEntity) == 1;
Expand Down

0 comments on commit 883341f

Please sign in to comment.