Skip to content

Commit

Permalink
Export / Retrieve Monitoring : Enable marking bulk tasks for export /…
Browse files Browse the repository at this point in the history
… retrieve #3201
  • Loading branch information
vrindanayak committed May 17, 2021
1 parent d02a748 commit 54d7426
Show file tree
Hide file tree
Showing 10 changed files with 454 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -8,5 +8,6 @@
public enum QueueMessageOperation {
CancelTasks,
RescheduleTasks,
DeleteTasks
DeleteTasks,
MarkTasksForScheduling
}
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,9 @@ void rescheduleExportTask(Long pk, ExporterDescriptor exporter, HttpServletReque
void rescheduleExportTask(Long pk, ExporterDescriptor exporter, HttpServletRequestInfo httpServletRequestInfo,
QueueMessageEvent queueEvent, Date scheduledTime);

void markForExportTask(Long pk, String deviceName, ExporterDescriptor exporter,
HttpServletRequestInfo httpServletRequestInfo, Date scheduledTime);

int deleteTasks(TaskQueryParam queueTaskQueryParam, TaskQueryParam exportTaskQueryParam, int deleteTasksFetchSize);

List<String> listDistinctDeviceNames(TaskQueryParam exportTaskQueryParam);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -420,6 +420,25 @@ private void rescheduleImmediately(ExportTask task, ExporterDescriptor exporter,
}
}

@Override
public void markForExportTask(
Long pk, String deviceName, ExporterDescriptor exporter, HttpServletRequestInfo httpServletRequestInfo,
Date scheduledTime) {
ExportTask task = em.find(ExportTask.class, pk);
if (task == null)
return;

LOG.info("Mark {} for export", task);
task.setExporterID(exporter.getExporterID());
task.setDeviceName(deviceName);
task.setScheduledTime(scheduledTime != null ? scheduledTime : new Date());
if (task.getQueueMessage() == null)
return;

queueManager.deleteTask(task.getQueueMessage().getMessageID(), null, false);
task.setQueueMessage(null);
}

@Override
public List<String> listDistinctDeviceNames(TaskQueryParam exportTaskQueryParam) {
return em.createQuery(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -325,7 +325,7 @@ private Response rescheduleTasks(String newExporterID) {
return count(devName == null
? rescheduleOnDistinctDevices(newExporter, status)
: rescheduleTasks(newExporter,
newDeviceName != null
newDeviceName != null
? deviceName == null
? null : deviceName
: devName,
Expand Down Expand Up @@ -385,6 +385,101 @@ private int rescheduleTasks(ExporterDescriptor newExporter, String devName, Queu
}
}

@POST
@Path("/mark4export")
@Produces("application/json")
public Response mark4Export() {
return mark4ExportTasks(null);
}

@POST
@Path("/mark4export/{ExporterID}")
@Produces("application/json")
public Response mark4Export(@PathParam("ExporterID") String newExporterID) {
return mark4ExportTasks(newExporterID);
}

private Response mark4ExportTasks(String newExporterID) {
logRequest();
QueueMessage.Status status = status();
if (status == null)
return errResponse("Missing query parameter: status", Response.Status.BAD_REQUEST);

if (status == QueueMessage.Status.TO_SCHEDULE)
return errResponse("Cannot mark tasks for export with status: " + status, Response.Status.FORBIDDEN);

try {
ExporterDescriptor newExporter = null;
if (newExporterID != null)
newExporter = exporter(newExporterID);

String devName = newDeviceName != null ? newDeviceName : deviceName;
return count(devName == null
? mark4ExportOnDistinctDevices(newExporter, status)
: mark4ExportTasks(newExporter,
exportTaskQueryParam(newDeviceName != null
? deviceName == null
? null : deviceName
: devName,
updatedTime),
devName,
status));
} catch (IllegalStateException e) {
return errResponse(e.getMessage(), Response.Status.NOT_FOUND);
} catch (Exception e) {
return errResponseAsTextPlain(exceptionAsString(e), Response.Status.INTERNAL_SERVER_ERROR);
}
}

private int mark4ExportOnDistinctDevices(ExporterDescriptor newExporter, QueueMessage.Status status) {
List<String> distinctDeviceNames = mgr.listDistinctDeviceNames(exportTaskQueryParam(null, updatedTime));
int count = 0;
for (String devName : distinctDeviceNames)
count += mark4ExportTasks(newExporter,
exportTaskQueryParam(devName, updatedTime),
devName,
status);

return count;
}

private int mark4ExportTasks(ExporterDescriptor newExporter, TaskQueryParam exportTaskQueryParam, String devName,
QueueMessage.Status status) {
BulkQueueMessageEvent queueEvent = new BulkQueueMessageEvent(request, QueueMessageOperation.MarkTasksForScheduling);
try {
int markedForExport = 0;
int count;
int markForExportTasksFetchSize = queueTasksFetchSize();
HttpServletRequestInfo httpServletRequestInfo = HttpServletRequestInfo.valueOf(request);
Date scheduledTime = scheduledTime();
do {
List<Tuple> exportTasks = mgr.exportTaskPksAndExporterIDs(
queueTaskQueryParam(status), exportTaskQueryParam, markForExportTasksFetchSize);
exportTasks.forEach(exportTask -> {
long pk = (long) exportTask.get(0);
try {
mgr.markForExportTask(pk, devName,
newExporter != null ? newExporter : exporter((String) exportTask.get(1)),
httpServletRequestInfo,
scheduledTime);
} catch (Exception e) {
LOG.warn("Failed to mark task [pk={}] for export \n", pk, e);
}
});
count = exportTasks.size();
markedForExport += count;
} while (count >= markForExportTasksFetchSize);
queueEvent.setCount(markedForExport);
LOG.info("Marked {} tasks on device {} for export", markedForExport, devName);
return markedForExport;
} catch (Exception e) {
queueEvent.setException(e);
throw e;
} finally {
bulkQueueMsgEvent.fire(queueEvent);
}
}

@DELETE
@Path("/{taskPK}")
public Response deleteTask(@PathParam("taskPK") long pk) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,8 @@ int scheduleRetrieveTask(int priority, ExternalRetrieveContext ctx, Date notRetr

void rescheduleRetrieveTask(Long pk, String newQueueName, QueueMessageEvent queueEvent, Date scheduledTime);

void markTaskForRetrieve(Long pk, String devName, String newQueueName, Date scheduledTime);

int deleteTasks(TaskQueryParam queueTaskQueryParam, TaskQueryParam retrieveTaskQueryParam, int deleteTasksFetchSize);

List<RetrieveBatch> listRetrieveBatches(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -337,6 +337,23 @@ private void rescheduleImmediately(RetrieveTask task, QueueMessageEvent queueEve
}
}

public void markTaskForRetrieve(Long pk, String devName, String newQueueName, Date scheduledTime) {
RetrieveTask task = em.find(RetrieveTask.class, pk);
if (task == null)
return;

LOG.info("Mark {} for retrieve", task);
task.setScheduledTime(scheduledTime != null ? scheduledTime : new Date());
task.setDeviceName(devName);
if (newQueueName != null)
task.setQueueName(newQueueName);
if (task.getQueueMessage() == null)
return;

queueManager.deleteTask(task.getQueueMessage().getMessageID(), null, false);
task.setQueueMessage(null);
}

private Attributes toKeys(RetrieveTask task) {
int n = task.getSOPInstanceUID() != null ? 3 : task.getSeriesInstanceUID() != null ? 2 : 1;
Attributes keys = new Attributes(n + 1);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,11 @@ public void rescheduleRetrieveTask(Long pk, String newQueueName, QueueMessageEve
ejb.rescheduleRetrieveTask(pk, newQueueName, queueEvent, scheduledTime);
}

@Override
public void markTaskForRetrieve(Long pk, String devName, String newQueueName, Date scheduledTime) {
ejb.markTaskForRetrieve(pk, devName, newQueueName, scheduledTime);
}

@Override
public int deleteTasks(TaskQueryParam queueTaskQueryParam, TaskQueryParam retrieveTaskQueryParam, int deleteTasksFetchSize) {
return ejb.deleteTasks(queueTaskQueryParam, retrieveTaskQueryParam, deleteTasksFetchSize);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -467,6 +467,124 @@ private Response rescheduleValidTasks(TaskQueryParam queueTaskQueryParam, TaskQu
: conflict(failed);
}

@POST
@Path("/mark4retrieve")
@Produces("application/json")
public Response markForRetrieveTasks() {
logRequest();
QueueMessage.Status status = status();
if (status == null)
return errResponse("Missing query parameter: status", Response.Status.BAD_REQUEST);

if (status == QueueMessage.Status.TO_SCHEDULE)
return errResponse("Cannot mark tasks for retrieve with status: " + status, Response.Status.FORBIDDEN);

try {
if (newQueueName != null && arcDev().getQueueDescriptor(newQueueName) == null)
return errResponse("No such Queue : " + newQueueName, Response.Status.NOT_FOUND);

String devName = newDeviceName != null ? newDeviceName : deviceName;
if (devName != null && !devName.equals(device.getDeviceName()))
return rsClient.forward(request, devName, "");

return newDeviceName != null
? markValidTasksForRetrieve(
queueTaskQueryParam(status), retrieveTaskQueryParam(null, updatedTime), newDeviceName)
: count(devName == null
? markForRetrieveOnDistinctDevices(status)
: markTasksForRetrieve(
queueTaskQueryParam(status), retrieveTaskQueryParam(devName, updatedTime), devName));
} catch (IllegalStateException e) {
return errResponse(e.getMessage(), Response.Status.NOT_FOUND);
} catch (Exception e) {
return errResponseAsTextPlain(exceptionAsString(e), Response.Status.INTERNAL_SERVER_ERROR);
}
}

private Response markValidTasksForRetrieve(TaskQueryParam queueTaskQueryParam, TaskQueryParam retrieveTaskQueryParam,
String devName) {
BulkQueueMessageEvent queueEvent = new BulkQueueMessageEvent(request, QueueMessageOperation.MarkTasksForScheduling);
int markedForRetrieve = 0;
int failed = 0;
try {
int count = 0;
int markForRetrieveTaskFetchSize = queueTasksFetchSize();
Date scheduledTime = scheduledTime();
do {
List<Tuple> retrieveTaskTuples = mgr.listRetrieveTaskPkAndLocalAETs(
queueTaskQueryParam, retrieveTaskQueryParam, markForRetrieveTaskFetchSize);
for (Tuple tuple : retrieveTaskTuples) {
Long retrieveTaskPk = (Long) tuple.get(0);
String localAET = (String) tuple.get(1);
try {
if (validateTaskAssociationInitiator(localAET, deviceCache.findDevice(devName))) {
mgr.markTaskForRetrieve(retrieveTaskPk, devName, newQueueName, scheduledTime);
count++;
}
} catch (ConfigurationException e) {
LOG.info("Validation of association initiator failed for Retrieve Task [pk={}, localAET={}] on device {} : {}",
retrieveTaskPk, localAET, devName, e.getMessage());
failed++;
}
}
markedForRetrieve += count;
} while (count >= markForRetrieveTaskFetchSize);
queueEvent.setCount(markedForRetrieve);
LOG.info("Marked {} tasks for retrieve on device {}", markedForRetrieve, devName);
} catch (Exception e) {
queueEvent.setException(e);
throw e;
} finally {
queueEvent.setFailed(failed);
bulkQueueMsgEvent.fire(queueEvent);
}

if (failed == 0)
return count(markedForRetrieve);

LOG.info("Failed to mark {} tasks for retrieve on device {}", failed, device.getDeviceName());
return markedForRetrieve > 0
? accepted(markedForRetrieve, failed)
: conflict(failed);
}

private int markForRetrieveOnDistinctDevices(QueueMessage.Status status) {
List<String> distinctDeviceNames = mgr.listDistinctDeviceNames(retrieveTaskQueryParam(null, updatedTime));
int count = 0;
for (String devName : distinctDeviceNames)
count += markTasksForRetrieve(
queueTaskQueryParam(status),
retrieveTaskQueryParam(devName, updatedTime),
devName);

return count;
}

private int markTasksForRetrieve(
TaskQueryParam queueTaskQueryParam, TaskQueryParam retrieveTaskQueryParam, String devName) {
BulkQueueMessageEvent bulkMsgQueueEvent = new BulkQueueMessageEvent(request, QueueMessageOperation.MarkTasksForScheduling);
try {
int markedForRetrieve = 0;
int markTasksForRetrieveFetchSize = queueTasksFetchSize();
Date scheduledTime = scheduledTime();
do {
List<Long> retrieveTaskPks = mgr.listRetrieveTaskPks(
queueTaskQueryParam, retrieveTaskQueryParam, markTasksForRetrieveFetchSize);
retrieveTaskPks.forEach(pk -> mgr.markTaskForRetrieve(pk, devName, newQueueName, scheduledTime));

markedForRetrieve += retrieveTaskPks.size();
} while (markedForRetrieve >= markTasksForRetrieveFetchSize);
bulkMsgQueueEvent.setCount(markedForRetrieve);
LOG.info("Marked {} tasks for retrieve on device {}", markedForRetrieve, devName);
return markedForRetrieve;
} catch (Exception e) {
bulkMsgQueueEvent.setException(e);
throw e;
} finally {
bulkQueueMsgEvent.fire(bulkMsgQueueEvent);
}
}

@DELETE
@Path("/{taskPK}")
public Response deleteTask(@PathParam("taskPK") long pk) {
Expand Down
9 changes: 9 additions & 0 deletions dcm4chee-arc-ui2/src/swagger/openapi.json
Original file line number Diff line number Diff line change
Expand Up @@ -657,6 +657,12 @@
"/monitor/export/reschedule/{newExporterID}": {
"$ref": "paths/monitor.json#/exportRescheduleExporter"
},
"/monitor/export/mark4export": {
"$ref": "paths/monitor.json#/mark4Export"
},
"/monitor/export/mark4export/{newExporterID}": {
"$ref": "paths/monitor.json#/mark4ExportOnExporter"
},
"/monitor/export/batch": {
"$ref": "paths/monitor.json#/exportBatch"
},
Expand All @@ -681,6 +687,9 @@
"/monitor/retrieve/reschedule": {
"$ref": "paths/monitor.json#/retrieveReschedule"
},
"/monitor/retrieve/mark4retrieve": {
"$ref": "paths/monitor.json#/mark4retrieve"
},
"/monitor/retrieve/batch": {
"$ref": "paths/monitor.json#/retrieveBatch"
},
Expand Down
Loading

0 comments on commit 54d7426

Please sign in to comment.