Skip to content

Commit

Permalink
Adjust params for tdsql datasource
Browse files Browse the repository at this point in the history
  • Loading branch information
jefftlin committed May 29, 2024
1 parent 77be042 commit 77b6297
Show file tree
Hide file tree
Showing 5 changed files with 19 additions and 2 deletions.
5 changes: 5 additions & 0 deletions db/exchangis_dml.sql
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,11 @@ INSERT INTO `exchangis_job_param_config` (config_key,config_name,config_directio
,('writeMode','写入方式','SQOOP-SINK','MYSQL','OPTION','writeMode','写入方式','',1,'OPTION','["INSERT","UPDATE"]','INSERT','','','写入方式输入错误',0,0,'',1,'',1,'',1,NULL)
,('writeMode','写入方式','DATAX-SINK','MYSQL','OPTION','writeMode','写入方式','',1,'OPTION','["INSERT","UPDATE"]','INSERT','','','写入方式输入错误',0,0,'',1,'',1,'',1,NULL);

INSERT INTO `exchangis_job_param_config` (config_key,config_name,config_direction,`type`,ui_type,ui_field,ui_label,unit,required,value_type,value_range,default_value,validate_type,validate_range,validate_msg,is_hidden,is_advanced,source,`level`,treename,sort,description,status,ref_id) VALUES
('where','WHERE条件','SOURCE','TDSQL','INPUT','where','WHERE条件','',0,'VARCHAR','','','REGEX','^[\\s\\S]{0,500}$','WHERE条件输入过长',0,0,'',1,'',2,'',1,NULL);
,('writeMode','写入方式','SQOOP-SINK','TDSQL','OPTION','writeMode','写入方式','',1,'OPTION','["INSERT","UPDATE"]','INSERT','','','写入方式输入错误',0,0,'',1,'',1,'',1,NULL)
,('writeMode','写入方式','DATAX-SINK','TDSQL','OPTION','writeMode','写入方式','',1,'OPTION','["INSERT","UPDATE"]','INSERT','','','写入方式输入错误',0,0,'',1,'',1,'',1,NULL);

INSERT INTO `exchangis_job_param_config` (config_key,config_name,config_direction,`type`,ui_type,ui_field,ui_label,unit,required,value_type,value_range,default_value,validate_type,validate_range,validate_msg,is_hidden,is_advanced,source,`level`,treename,sort,description,status,ref_id) VALUES
('writeMode','写入方式','SQOOP-SINK','HIVE','OPTION','writeMode','写入方式(OVERWRITE只对TEXT类型表生效)','',1,'OPTION','["OVERWRITE","APPEND"]','OVERWRITE','','','写入方式输入错误',0,0,'',1,'',1,'',1,NULL)
,('partition','分区信息','SINK','HIVE','MAP','partition','分区信息(文本)','',0,'VARCHAR','','','REGEX','^[\\s\\S]{0,50}$','分区信息过长',0,0,'/api/rest_j/v1/dss/exchangis/main/datasources/render/partition/element/map',1,'',2,'',1,NULL)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,12 @@ public class JobParamConstraints {

public static final String PASSWORD = "password";

public static final String APP_ID = "appid";

public static final String OBJECT_ID = "objectid";

public static final String DK = "dk";

public static final String DATABASE = "database";

public static final String CONNECT_PARAMS = "params";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ public class DataxExchangisEngineJobBuilder extends AbstractResourceEngineJobBui
static{
//hive use hdfs plugin resource
PLUGIN_NAME_MAPPER.put("hive", "hdfs");
PLUGIN_NAME_MAPPER.put("tdsql", "mysql");
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,9 @@ protected void handleSinkColumns(SubExchangisJob subExchangisJob, ExchangisJobBu
*/
public abstract void handleJobSink(SubExchangisJob subExchangisJob, ExchangisJobBuilderContext ctx) throws ErrorException;

public void preHandleJobParamSet(JobParamSet paramSet) {
// Empty
}

/**
* Warn message
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ public class MySQLDataxSubExchangisJobHandler extends AuthEnabledSubExchangisJob
@Override
public void handleJobSource(SubExchangisJob subExchangisJob, ExchangisJobBuilderContext ctx) throws ErrorException {
JobParamSet paramSet = subExchangisJob.getRealmParams(SubExchangisJob.REALM_JOB_CONTENT_SOURCE);
preHandleJobParamSet(paramSet);
if (Objects.nonNull(paramSet)){
Arrays.asList(sourceMappings()).forEach(define -> paramSet.addNonNull(define.get(paramSet)));
paramSet.add(QUERY_SQL.newParam(subExchangisJob));
Expand All @@ -89,6 +90,7 @@ public void handleJobSource(SubExchangisJob subExchangisJob, ExchangisJobBuilder
@Override
public void handleJobSink(SubExchangisJob subExchangisJob, ExchangisJobBuilderContext ctx) throws ErrorException {
JobParamSet paramSet = subExchangisJob.getRealmParams(SubExchangisJob.REALM_JOB_CONTENT_SINK);
preHandleJobParamSet(paramSet);
if (Objects.nonNull(paramSet)){
Arrays.asList(sinkMappings()).forEach(define -> paramSet.addNonNull(define.get(paramSet)));
paramSet.add(SQL_COLUMN.newParam(subExchangisJob));
Expand All @@ -105,12 +107,12 @@ public boolean acceptEngine(String engineType) {
return "datax".equalsIgnoreCase(engineType);
}

private JobParamDefine<?>[] sourceMappings(){
protected JobParamDefine<?>[] sourceMappings(){
return new JobParamDefine[]{USERNAME, PASSWORD, SOURCE_DATABASE,
SOURCE_HOST, SOURCE_PORT, SOURCE_PARAMS_MAP};
}

public JobParamDefine<?>[] sinkMappings(){
protected JobParamDefine<?>[] sinkMappings(){
return new JobParamDefine[]{SINK_HOST, SINK_PORT, USERNAME, PASSWORD,
SINK_DATABASE, SINK_TABLE, SINK_PARAMS_MAP};
}
Expand Down

0 comments on commit 77b6297

Please sign in to comment.