Skip to content

Commit

Permalink
fix modify pir_task_submit param (#233)
Browse files Browse the repository at this point in the history
Co-authored-by: terrence <terrence>
  • Loading branch information
TerrenceGee authored Dec 25, 2023
1 parent 9d4a9bf commit b65be13
Show file tree
Hide file tree
Showing 7 changed files with 36 additions and 75 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -36,30 +36,18 @@ public class PirController {
private PirService pirService;

@ApiOperation(value = "提交匿踪查询任务",httpMethod = "POST",consumes = MediaType.APPLICATION_JSON_VALUE)
@PostMapping(value = "pirSubmitTask",consumes = MediaType.APPLICATION_JSON_VALUE)
public BaseResultEntity pirSubmitTask(@RequestBody BaseJsonParam<DataPirReq> req){
DataPirReq param = req.getParam();
if (StringUtils.isBlank(param.getResourceId())){
@RequestMapping("pirSubmitTask")
public BaseResultEntity pirSubmitTask(String resourceId,String pirParam,String taskName){
if (StringUtils.isBlank(resourceId)){
return BaseResultEntity.failure(BaseResultEnum.LACK_OF_PARAM,"resourceId");
}
if (StringUtils.isBlank(param.getTaskName())){
return BaseResultEntity.failure(BaseResultEnum.LACK_OF_PARAM,"taskName");
}
if (param.getKeyQuerys() == null || param.getKeyQuerys().size()==0){
return BaseResultEntity.failure(BaseResultEnum.LACK_OF_PARAM,"keyQuerys");
if (StringUtils.isBlank(pirParam)){
return BaseResultEntity.failure(BaseResultEnum.LACK_OF_PARAM,"pirParam");
}
for (DataPirKeyQuery keyQuery : param.getKeyQuerys()) {
if (keyQuery.getKey()==null){
return BaseResultEntity.failure(BaseResultEnum.LACK_OF_PARAM,"keyQuerys.key");
}
if (keyQuery.getQuery()==null){
return BaseResultEntity.failure(BaseResultEnum.LACK_OF_PARAM,"keyQuerys.query");
}
// if (keyQuery.getKey().length!=keyQuery.getQuery().size()){
// return BaseResultEntity.failure(BaseResultEnum.PARAM_INVALIDATION,"The number of key queries is not equal");
// }
if (StringUtils.isBlank(taskName)){
return BaseResultEntity.failure(BaseResultEnum.LACK_OF_PARAM,"taskName");
}
return pirService.pirSubmitTask(param);
return pirService.pirSubmitTask(resourceId,pirParam,taskName);
}


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,11 @@ spring:
cloud:
nacos:
discovery:
server-addr: localhost:8848
server-addr: 192.168.99.10:31048
namespace: dev1
nacos:
config:
server-addr: localhost:8848
server-addr: 192.168.99.10:31048
namespace: dev1
max-retry: 3
config-long-poll-timeout: 1000
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,11 @@ spring:
cloud:
nacos:
discovery:
server-addr: localhost:8848
server-addr: 192.168.99.10:31048
namespace: dev2
nacos:
config:
server-addr: localhost:8848
server-addr: 192.168.99.10:31048
namespace: dev2
max-retry: 3
config-long-poll-timeout: 1000
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -356,54 +356,31 @@ public TaskParam<TaskPIRParam> call() throws Exception {
}

@Async
public void pirGrpcTask(DataTask dataTask, DataPirTask dataPirTask,String resourceColumnNames) {
public void pirGrpcTask(DataTask dataTask, String resourceId, String param) {
Date date = new Date();
try {
dataTask.setTaskState(TaskStateEnum.IN_OPERATION.getStateType());
updateTaskState(dataTask);
String formatDate = DateUtil.formatDate(date, DateUtil.DateStyle.HOUR_FORMAT_SHORT.getFormat());
StringBuilder sb = new StringBuilder().append(baseConfiguration.getResultUrlDirPrefix()).append(formatDate).append("/").append(dataTask.getTaskIdName()).append(".csv");
dataTask.setTaskResultPath(sb.toString());
List<DataPirKeyQuery> dataPirKeyQueries = JSONArray.parseArray(dataPirTask.getRetrievalId(), DataPirKeyQuery.class);
Map<String,String> jobMap = new HashMap<>();
List<FutureTask<TaskParam<TaskPIRParam>>> futureTasks = new ArrayList<>();
for (int i = 0; i < dataPirKeyQueries.size(); i++) {
FutureTask<TaskParam<TaskPIRParam>> pirTaskFutureTask = getPirTaskFutureTask(dataPirKeyQueries.get(i), dataTask, dataPirTask, resourceColumnNames, formatDate, i);
primaryThreadPool.submit(pirTaskFutureTask);
futureTasks.add(pirTaskFutureTask);
jobMap.put(i+"",String.join("+",dataPirKeyQueries.get(i).getKey()));
}
List<TaskParam<TaskPIRParam>> listTaskParams = new ArrayList<>();
for (FutureTask<TaskParam<TaskPIRParam>> futureTask : futureTasks) {
listTaskParams.add(futureTask.get());
}
for (TaskParam<TaskPIRParam> listTaskParam : listTaskParams) {
if (dataTask.getTaskState().equals(TaskStateEnum.FAIL.getStateType())){
if (!listTaskParam.getSuccess()){
dataTask.setTaskErrorMsg("\n【"+jobMap.get(listTaskParam.getJobId())+"】匹配规则出错:"+listTaskParam.getError());
}
}
if (!listTaskParam.getSuccess()){
dataTask.setTaskState(TaskStateEnum.FAIL.getStateType());
dataTask.setTaskErrorMsg("\n【"+jobMap.get(listTaskParam.getJobId())+"】匹配规则出错:"+listTaskParam.getError());
}else {
dataTask.setTaskState(TaskStateEnum.SUCCESS.getStateType());
}
}
if (dataTask.getTaskState().equals(TaskStateEnum.SUCCESS.getStateType())){
List<String> pirTaskResultData = dataRedisRepository.getPirTaskResultData(dataTask.getTaskIdName());
// log.info("数据写入文件sb:{} - pirTaskResultDataSize:{}",sb.toString(),pirTaskResultData.size());
boolean b = CsvUtil.csvWrite(sb.toString(), pirTaskResultData);
// log.info("数据写入文件结果:{}",b);

TaskPIRParam pirParam = new TaskPIRParam();
pirParam.setQueryParam(param.split(","));
pirParam.setServerData(resourceId);
pirParam.setOutputFullFilename(dataTask.getTaskResultPath());
TaskParam taskParam = new TaskParam();
taskParam.setTaskContentParam(pirParam);
taskParam.setTaskId(dataTask.getTaskIdName());
taskHelper.submit(taskParam);
if (taskParam.getSuccess()){
dataTask.setTaskState(TaskStateEnum.SUCCESS.getStateType());
}else {
dataTask.setTaskState(TaskStateEnum.FAIL.getStateType());
dataTask.setTaskErrorMsg("运行失败:"+taskParam.getError());
}
} catch (Exception e) {
dataTask.setTaskState(TaskStateEnum.FAIL.getStateType());
dataTask.setTaskErrorMsg(e.getMessage());
log.info("grpc pirSubmitTask Exception:{}", e.getMessage());
e.printStackTrace();
}finally {
dataRedisRepository.deletePirTaskResultKey(dataTask.getTaskIdName());
}
dataTask.setTaskEndTime(System.currentTimeMillis());
updateTaskState(dataTask);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,8 @@ public class PirService {
public String getResultFilePath(String taskId,String taskDate){
return new StringBuilder().append(baseConfiguration.getResultUrlDirPrefix()).append(taskDate).append("/").append(taskId).append(".csv").toString();
}
public BaseResultEntity pirSubmitTask(DataPirReq req) {
BaseResultEntity dataResource = otherBusinessesService.getDataResource(req.getResourceId());
public BaseResultEntity pirSubmitTask(String resourceId, String pirParam,String taskName) {
BaseResultEntity dataResource = otherBusinessesService.getDataResource(resourceId);
if (dataResource.getCode()!=0) {
return BaseResultEntity.failure(BaseResultEnum.DATA_RUN_TASK_FAIL,"资源查询失败");
}
Expand All @@ -55,26 +55,22 @@ public BaseResultEntity pirSubmitTask(DataPirReq req) {
if (available == 1) {
return BaseResultEntity.failure(BaseResultEnum.DATA_RUN_TASK_FAIL,"资源不可用");
}
String resourceColumnNames = pirDataResource.getOrDefault("resourceColumnNameList", "").toString();
if (StringUtils.isBlank(resourceColumnNames)){
return BaseResultEntity.failure(BaseResultEnum.DATA_RUN_TASK_FAIL,"获取资源字段列表失败");
}
DataTask dataTask = new DataTask();
// dataTask.setTaskIdName(UUID.randomUUID().toString());
dataTask.setTaskIdName(Long.toString(SnowflakeId.getInstance().nextId()));
dataTask.setTaskName(req.getTaskName());
dataTask.setTaskName(taskName);
dataTask.setTaskState(TaskStateEnum.IN_OPERATION.getStateType());
dataTask.setTaskType(TaskTypeEnum.PIR.getTaskType());
dataTask.setTaskStartTime(System.currentTimeMillis());
dataTaskPrRepository.saveDataTask(dataTask);
DataPirTask dataPirTask = new DataPirTask();
dataPirTask.setTaskId(dataTask.getTaskId());
dataPirTask.setRetrievalId(JSONObject.toJSONString(req.getKeyQuerys()));
dataPirTask.setRetrievalId(pirParam);
dataPirTask.setProviderOrganName(pirDataResource.get("organName").toString());
dataPirTask.setResourceName(pirDataResource.get("resourceName").toString());
dataPirTask.setResourceId(req.getResourceId());
dataPirTask.setResourceId(resourceId);
dataTaskPrRepository.saveDataPirTask(dataPirTask);
dataAsyncService.pirGrpcTask(dataTask,dataPirTask,resourceColumnNames);
dataAsyncService.pirGrpcTask(dataTask,resourceId,pirParam);
Map<String, Object> map = new HashMap<>();
map.put("taskId",dataTask.getTaskId());
return BaseResultEntity.success(map);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ server:
port: 8081
nacos:
config:
server-addr: localhost:8848
server-addr: 192.168.99.10:31048
namespace: dev1
max-retry: 3
config-long-poll-timeout: 1000
Expand All @@ -17,7 +17,7 @@ spring:
cloud:
nacos:
discovery:
server-addr: localhost:8848
server-addr: 192.168.99.10:31048
namespace: dev1
logging:
level:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ server:
port: 8082
nacos:
config:
server-addr: localhost:8848
server-addr: 192.168.99.10:31048
namespace: dev2
max-retry: 3
config-long-poll-timeout: 1000
Expand All @@ -17,7 +17,7 @@ spring:
cloud:
nacos:
discovery:
server-addr: localhost:8848
server-addr: 192.168.99.10:31048
namespace: dev2
logging:
level:
Expand Down

0 comments on commit b65be13

Please sign in to comment.