
Commit
[issue_1157][taier-scheduler] fix DataX sqlText replace #1157 (#1158)
vainhope authored Mar 15, 2024
1 parent 36bb1f4 commit a6f905b
Showing 1 changed file with 1 addition and 30 deletions.
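
This commit passes the already parameter-substituted sqlText argument into CreateJsonFileUtil.createJsonFile instead of re-reading the raw task.getSqlText(), so that the placeholder replacement performed upstream (note the JobParamReplace import) is actually reflected in the generated DataX JSON file. It also drops the uploadToHdfs helper together with the clusterService, environmentContext, and datasourceOperator fields it used. Below is a minimal, self-contained sketch of the bug pattern being fixed; every name in it is illustrative and none of it is Taier's API.

// Minimal sketch of the bug pattern: a parameter-substituted copy of the
// script text is produced, but the original (with ${...} placeholders still
// in it) is what gets written out. Illustrative names only, not Taier's API.
import java.util.Map;

public class SqlTextReplaceSketch {

    // stand-in for the job-parameter replacement performed upstream
    static String paramReplace(String sqlText, Map<String, String> params) {
        String replaced = sqlText;
        for (Map.Entry<String, String> e : params.entrySet()) {
            replaced = replaced.replace("${" + e.getKey() + "}", e.getValue());
        }
        return replaced;
    }

    public static void main(String[] args) {
        String taskSqlText = "{\"path\":\"/warehouse/${bizdate}\"}";
        String sqlText = paramReplace(taskSqlText, Map.of("bizdate", "20240314"));

        // before the fix: the raw task text was handed to the json-file writer,
        // so the DataX job still saw the literal ${bizdate} placeholder
        System.out.println("buggy: " + taskSqlText);
        // after the fix: the replaced sqlText is handed over instead
        System.out.println("fixed: " + sqlText);
    }
}

The one-line change in the diff simply reuses the sqlText parameter that dealDataxExeParams already receives, rather than going back to the unreplaced task.getSqlText().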
@@ -1,21 +1,14 @@
package com.dtstack.taier.scheduler.service;

import com.alibaba.fastjson.JSONObject;
import com.dtstack.taier.common.enums.EScheduleJobType;
import com.dtstack.taier.common.env.EnvironmentContext;
import com.dtstack.taier.common.exception.TaierDefineException;
import com.dtstack.taier.common.util.TaskParamsUtils;
import com.dtstack.taier.dao.domain.ScheduleJob;
import com.dtstack.taier.dao.domain.ScheduleTaskShade;
import com.dtstack.taier.dao.dto.ScheduleTaskParamShade;
import com.dtstack.taier.pluginapi.constrant.ConfigConstant;
import com.dtstack.taier.pluginapi.enums.EDeployMode;
import com.dtstack.taier.scheduler.PluginWrapper;
import com.dtstack.taier.scheduler.executor.DatasourceOperator;
import com.dtstack.taier.scheduler.server.pipeline.JobParamReplace;
import com.dtstack.taier.scheduler.utils.CreateJsonFileUtil;
import com.dtstack.taier.scheduler.utils.FileUtil;
import com.dtstack.taier.scheduler.utils.ScriptUtil;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.io.FileUtils;
import org.apache.commons.lang3.StringUtils;
@@ -35,15 +28,6 @@
@Service
public class DataxService {

@Autowired
private ClusterService clusterService;

@Autowired
private EnvironmentContext environmentContext;

@Autowired
private DatasourceOperator datasourceOperator;

@Autowired
private PluginWrapper pluginWrapper;

@@ -74,19 +58,6 @@ public void handDataxParams(Map<String, Object> actionParam, ScheduleTaskShade t
}
dealDataxExeParams(actionParam, task, scheduleJob, sqlText);
}
/**
* Upload the script to HDFS
*
* @param sqlText
* @param task
* @param scheduleJob
* @return
*/
private String uploadToHdfs(String sqlText, ScheduleTaskShade task, ScheduleJob scheduleJob) {
JSONObject pluginInfo = clusterService.pluginInfoJSON(task.getTenantId(), task.getTaskType(), null, null, null);
String hdfsPath = environmentContext.getHdfsTaskPath() + (FileUtil.getUploadFileName(task.getTaskType(), scheduleJob.getJobId()));
return datasourceOperator.uploadToHdfs(pluginInfo, task.getTenantId(), sqlText, hdfsPath);
}

private void dealDataxExeParams(Map<String, Object> actionParam, ScheduleTaskShade task, ScheduleJob scheduleJob,
String sqlText) throws IOException {
@@ -112,7 +83,7 @@ private void dealDataxExeParams(Map<String, Object> actionParam, ScheduleTaskSha
throw new TaierDefineException("datax.local.path is null");
}
//generate the DataX json file
- String taskTempPath = CreateJsonFileUtil.createJsonFile(task.getSqlText(), tempPath, task.getName());
+ String taskTempPath = CreateJsonFileUtil.createJsonFile(sqlText, tempPath, task.getName());
if (StringUtils.isBlank(taskTempPath)) {
throw new TaierDefineException("创建datax.json文件失败");
}
