Skip to content

Commit

Permalink
Advance the strategy to render the sub exchangis job.
Browse files Browse the repository at this point in the history
  • Loading branch information
Davidhua1996 committed Nov 12, 2024
1 parent b077372 commit 8ffa8af
Show file tree
Hide file tree
Showing 10 changed files with 154 additions and 46 deletions.
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
package com.webank.wedatasphere.exchangis.job.domain;

import com.fasterxml.jackson.annotation.JsonIgnore;
import com.webank.wedatasphere.exchangis.common.util.json.Json;
import com.webank.wedatasphere.exchangis.job.vo.ExchangisJobVo;
import org.apache.commons.lang3.StringUtils;

import java.util.HashMap;
import java.util.Map;
Expand All @@ -27,7 +30,8 @@ public class ExchangisJobInfo extends GenericExchangisJob {
/**
* Job parameter map
*/
protected Map<String, String> jobParamsMap = new HashMap<>();
@JsonIgnore
protected Map<String, String> jobParamsMap;
/**
* Job description
*/
Expand Down Expand Up @@ -79,6 +83,26 @@ public String getJobParams() {

public void setJobParams(String jobParams) {
this.jobParams = jobParams;
this.jobParamsMap = null;
}

public Map<String, String> getJobParamsMap() {
if (null == this.jobParamsMap){
if (StringUtils.isNotBlank(this.jobParams)){
try {
this.jobParamsMap = Json.fromJson(this.jobParams, null);
return this.jobParamsMap;
} catch (Exception e){
// Ignore the exception
}
}
this.jobParamsMap = new HashMap<>();
}
return jobParamsMap;
}

public void setJobParamsMap(Map<String, String> jobParamsMap) {
this.jobParamsMap = jobParamsMap;
}

public String getJobDesc() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,11 @@ public class SubExchangisJob extends GenericExchangisJob {
*/
protected List<Map<String, Object>> sinkSplits = new ArrayList<>();

/**
* JobParams
*/
protected final Map<String, Object> jobParams = new HashMap<>();

protected String sourceType;

protected String sinkType;
Expand Down Expand Up @@ -154,6 +159,11 @@ public void setSinkSplits(List<Map<String, Object>> sinkSplits) {
this.sinkSplits = sinkSplits;
}

public Map<String, Object> getJobParams() {
return jobParams;
}


/**
* Copy sub exchangis job
* @return job
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.BiConsumer;
import java.util.stream.Collectors;

/**
Expand Down Expand Up @@ -33,6 +34,11 @@ public JobParamSet(List<JobParam<?>> params){
this.jobParamStore.put(param.getStrKey(), newParam);
});
}

public void forEach(BiConsumer<String, JobParam<?>> action){
jobParamStore.forEach(action);
}

public JobParamSet add(JobParamDefine<?> jobParamDefine){
return add(prepare(jobParamDefine, this));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ public class DateTool {
static final String TIME_PLACEHOLDER_TIMESTAMP = "timestamp";
static final String MONTH_BEGIN_SYMBOL = "run_month_begin";
static final String MONTH_END_SYMBOL = "run_month_end";
static final String[] HOUR_SPEC_SYMBOLS = new String[]{"yyyyMMdd", "yyyy-MM-dd", "HH"};
static final String[] HOUR_SPEC_SYMBOLS = new String[]{"yyyyMMddHH", "yyyy-MM-dd-HH", "HH"};
static final String FORMAT_STD_SYMBOL = "_std";
static final String FORMAT_UTC_SYMBOL = "_utc";
public static final String[] TIME_PLACEHOLDER = new String[]{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,13 +93,10 @@ private static void replaceParameters(Map<String, String> map, String key,
}
}

public static String renderDt(String template, Calendar calendar){
long time = calendar.getTimeInMillis();
public static String renderDt(String template, long time){
if(template==null){
return null;
}
Date date =new Date();

Matcher matcher= DateTool.TIME_REGULAR_PATTERN.matcher(template);
while(matcher.find()){
try {
Expand All @@ -109,28 +106,18 @@ public static String renderDt(String template, Calendar calendar){
String symbol = matcher.group(1);
boolean spec = false;
if (null != symbol) {
String startTime = null;
String tempTime = null;
for(String specSymbol : DateTool.HOUR_SPEC_SYMBOLS){
if(specSymbol.equals(symbol)){
tempTime = dataTool.format(specSymbol);
startTime = template.replace(m, tempTime);
return startTime;
spec = true;
break;
}
}
if(!spec) {
if (DateTool.MONTH_BEGIN_SYMBOL.equals(symbol)) {
dataTool.getMonthBegin(0);
} else if (DateTool.MONTH_END_SYMBOL.equals(symbol)) {
dataTool.getMonthEnd(0);
} else if (DateTool.TIME_PLACEHOLDER_TIMESTAMP.equals(symbol)){
calendar.setTime(date);
calendar.add(Calendar.DAY_OF_MONTH, 0);
tempTime = String.valueOf(calendar.getTimeInMillis());
startTime = template.replace(m, tempTime);
return startTime;
}
else {
} else {
dataTool.addDay(-1);
}
}
Expand Down Expand Up @@ -185,12 +172,16 @@ public static String renderDt(String template, Calendar calendar){
* @return string
*/
public static String replaceVariable(String source, Map<String, Object> variables){
return replaceVariable(source, variables, Calendar.getInstance().getTimeInMillis());
}

public static String replaceVariable(String source, Map<String, Object> variables, long time){
String result = source;
if (StringUtils.isNotBlank(result)){
result = VariableUtils.replace(MARKER_HEAD + source, variables).substring(MARKER_HEAD.length());
if (StringUtils.isNotBlank(result)){
// Render again
result = renderDt(result, Calendar.getInstance());
result = renderDt(result, time);
}
}
return result;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,10 +1,15 @@
package com.webank.wedatasphere.exchangis.job.launcher.domain;

import com.fasterxml.jackson.annotation.JsonIgnore;
import com.webank.wedatasphere.exchangis.common.util.json.Json;
import com.webank.wedatasphere.exchangis.job.domain.ExchangisJobInfo;
import com.webank.wedatasphere.exchangis.job.domain.GenericExchangisJob;
import org.apache.commons.lang3.StringUtils;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
* Job could be executed
Expand All @@ -26,6 +31,16 @@ public class LaunchableExchangisJob extends GenericExchangisJob {
*/
private String execUser;

/**
* Job params (JSON)
*/
protected String jobParams;

/**
* Job parameter map
*/
@JsonIgnore
private Map<String, String> jobParamsMap;

private List<LaunchableExchangisTask> launchableExchangisTasks = new ArrayList<>();

Expand Down Expand Up @@ -60,4 +75,32 @@ public String getExecUser() {
public void setExecUser(String execUser) {
this.execUser = execUser;
}

public String getJobParams() {
return jobParams;
}

public void setJobParams(String jobParams) {
this.jobParams = jobParams;
this.jobParamsMap = null;
}

public Map<String, String> getJobParamsMap() {
if (null == this.jobParamsMap){
if (StringUtils.isNotBlank(this.jobParams)){
try {
this.jobParamsMap = Json.fromJson(this.jobParams, null);
return this.jobParamsMap;
} catch (Exception e){
// Ignore the exception
}
}
this.jobParamsMap = new HashMap<>();
}
return jobParamsMap;
}

public void setJobParamsMap(Map<String, String> jobParamsMap) {
this.jobParamsMap = jobParamsMap;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import com.webank.wedatasphere.exchangis.job.server.mapper.JobTransformProcessorDao;
import com.webank.wedatasphere.exchangis.job.server.render.transform.TransformTypes;
import com.webank.wedatasphere.exchangis.job.server.render.transform.processor.TransformProcessor;
import com.webank.wedatasphere.exchangis.job.utils.JobUtils;
import com.webank.wedatasphere.exchangis.utils.SpringContextHolder;
import org.apache.commons.lang.StringUtils;
import org.apache.linkis.common.exception.ErrorException;
Expand Down Expand Up @@ -92,7 +93,9 @@ public TransformExchangisJob buildJob(ExchangisJobInfo inputJob, TransformExchan
inputJob.getId(), inputJob.getName(), contents.size());
//Second to new SubExchangisJob instances
List<SubExchangisJob> subExchangisJobs = contents.stream().map(job -> {
TransformExchangisJob.TransformSubExchangisJob transformSubJob = new TransformExchangisJob.TransformSubExchangisJob(job);
// Put the params of job info
TransformExchangisJob.TransformSubExchangisJob transformSubJob =
new TransformExchangisJob.TransformSubExchangisJob(job, inputJob.getJobParamsMap());
transformSubJob.setId(inputJob.getId());
transformSubJob.setCreateUser(outputJob.getCreateUser());
setTransformCodeResource(transformSubJob);
Expand Down Expand Up @@ -127,6 +130,8 @@ public TransformExchangisJob buildJob(ExchangisJobInfo inputJob, TransformExchan
//TODO Handle the subExchangisJob parallel
handledJobs.addAll(doHandle(sourceHandler, sinkHandler, subExchangisJob, ctx));
}
// Render the sub job
renderJobs(handledJobs);
// Final reset the sub exchangis jobs
outputJob.setSubJobSet(handledJobs);
}else{
Expand Down Expand Up @@ -198,18 +203,50 @@ private void doSplitHandle(SubExchangisJob subExchangisJob,
for (Map<String, Object> splitPart : splits){
SubExchangisJob copy = subExchangisJob.copy();
JobParamSet copyParamSet = copy.getRealmParams(splitRealm);
Map<String, Object> jobParams = copy.getJobParams();
for (Map.Entry<String, Object> entry : splitPart.entrySet()){
// If it is mapping key, overwrite the param value
if (mappingParams.containsKey(entry.getKey())){
Optional.ofNullable(copyParamSet.get(entry.getKey()))
.ifPresent(param -> param.setValue(entry.getValue()));
}
jobParams.put(entry.getKey(), entry.getKey());
}
// Add to the handled job list
handledJobs.add(copy);
}
}
}

/**
* Render ${xxx} in jobs (include time placeholder)
* @param jobs jobs
*/
@SuppressWarnings("unchecked")
private void renderJobs(List<SubExchangisJob> jobs){
Calendar calendar = Calendar.getInstance();
for (SubExchangisJob subJob : jobs){
Map<String, Object> jobParams = subJob.getJobParams();
Object dateTime = jobParams.remove(JobParamConstraints.EXTRA_SUBMIT_DATE);
long time = Objects.nonNull(dateTime)? Long.parseLong(String.valueOf(dateTime)) : calendar.getTimeInMillis();
subJob.getRealmParams(SubExchangisJob.REALM_JOB_CONTENT_SOURCE)
.forEach((key, param) -> {
Object value = param.getValue();
if (value instanceof String){
((JobParam<String>)param)
.setValue(JobUtils.replaceVariable((String)value, jobParams, time));
}
});
subJob.getRealmParams(SubExchangisJob.REALM_JOB_CONTENT_SINK)
.forEach((key, param) -> {
Object value = param.getValue();
if (value instanceof String){
((JobParam<String>)param)
.setValue(JobUtils.replaceVariable((String)value, jobParams, time));
}
});
}
}
/**
* Set the code resource to transform job
* @param subExchangisJob sub transform job
Expand Down Expand Up @@ -282,4 +319,5 @@ public void handleSink(SubExchangisJob subExchangisJob, ExchangisJobBuilderConte
}
}
}

}
Loading

0 comments on commit 8ffa8af

Please sign in to comment.