Enable thread amount config and pattern validation (#8)
The number of threads per process is now configurable via environment variables, and the format of the QBiC measurement ID is now validated in the registration request so that invalid IDs fail early.
sven1103 authored May 15, 2024
1 parent 7944a41 commit f29d2ef
Showing 9 changed files with 60 additions and 51 deletions.
19 changes: 11 additions & 8 deletions README.md
@@ -157,12 +157,14 @@ perform checksum validation. Feel free to use it as template for subsequent proc

### Evaluation

Last but not least, this step looks for any present QBiC measurement ID in the dataset name. If none
is given, the registration cannot be executed.
Last but not least, this step validates the QBiC measurement ID via a [configurable](#evaluation-step-config) regex pattern.

In this case the process moves the task directory into the user's home error folder. After the user
has
provided a valid QBiC measurement id, they can move the dataset into registration again.
In case of invalid measurement ID formats, the process moves the task directory into the user's home error folder.
After the user has provided a valid QBiC measurement id, they can move the dataset into registration again.

In case of a successful ID validation, the dataset will be moved to the configured destination folder.
If multiple destination folders are provided in the [configuration](#evaluation-step-config), the assignment of the next target directory is based
on a round-robin approach, to balance any downstream task load (e.g. openBIS dropbox registration).
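
The round-robin assignment described above can be sketched as follows. This is a minimal illustration and not the project's `RoundRobinDraw` class, whose implementation is not part of this commit; the name `RoundRobinSketch` and its `next()` method are assumptions made for the example.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical illustration only; not the project's RoundRobinDraw.
final class RoundRobinSketch<T> {

  private final List<T> items;
  private final AtomicInteger cursor = new AtomicInteger(0);

  RoundRobinSketch(List<T> items) {
    if (items == null || items.isEmpty()) {
      throw new IllegalArgumentException("At least one item is required");
    }
    this.items = List.copyOf(items);
  }

  // Returns the next item in cyclic order. The atomic counter lets several
  // worker threads share one instance; floorMod keeps the index valid even
  // after the counter overflows.
  T next() {
    int index = Math.floorMod(cursor.getAndIncrement(), items.size());
    return items.get(index);
  }
}
```

With three configured target directories, four consecutive calls to `next()` yield the first, second, third, and then the first directory again.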

## Configuration

@@ -218,7 +220,8 @@ finished tasks are moved to after successful operation.
#----------------
# Settings for the registration worker threads
#----------------
registration.threads=2
registration.threads=${REGISTRATION_THREADS:2}
registration.metadata.filename=metadata.txt
registration.working.dir=${WORKING_DIR:}
registration.target.dir=${PROCESSING_DIR:}
```
@@ -233,7 +236,7 @@ finished tasks are moved to after successful operation.
# Settings for the 1. processing step
# Proper packaging and provenance data, some simple checks
#------------------------------------
processing.threads=2
processing.threads=${PROCESSING_THREADS:2}
processing.working.dir=${PROCESSING_DIR}
processing.target.dir=${EVALUATION_DIR}
```
@@ -248,7 +251,7 @@ finished tasks are moved to after successful operation.
# Setting for the 2. processing step:
# Measurement ID evaluation
# ---------------------------------
evaluations.threads=2
evaluations.threads=${EVALUATION_THREADS:2}
evaluation.working.dir=${EVALUATION_DIR}
# Define one or more target directories here
# Example single target dir:
16 changes: 7 additions & 9 deletions src/main/java/life/qbic/data/processing/AppConfig.java
@@ -52,20 +52,17 @@ RegistrationConfiguration registrationConfiguration(

@Bean
EvaluationWorkersConfig evaluationWorkersConfig(
@Value("${evaluations.threads}") int amountOfWorkers,
@Value("${evaluation.threads}") int amountOfWorkers,
@Value("${evaluation.working.dir}") String workingDirectory,
@Value("${evaluation.target.dirs}") String[] targetDirectory,
@Value("${evaluation.measurement-id.pattern}") String measurementIdPattern) {
return new EvaluationWorkersConfig(amountOfWorkers, workingDirectory,
measurementIdPattern, Arrays.stream(targetDirectory).toList());
@Value("${evaluation.target.dirs}") String[] targetDirectory) {
return new EvaluationWorkersConfig(amountOfWorkers, workingDirectory, Arrays.stream(targetDirectory).toList());
}

@Bean
EvaluationConfiguration evaluationConfiguration(EvaluationWorkersConfig evaluationWorkersConfig,
GlobalConfig globalConfig) {
return new EvaluationConfiguration(evaluationWorkersConfig.workingDirectory().toString(),
evaluationWorkersConfig.targetDirectories(),
evaluationWorkersConfig.measurementIdPattern().toString(), globalConfig);
evaluationWorkersConfig.targetDirectories(), globalConfig);
}

@Bean
@@ -86,8 +83,9 @@ ProcessingConfiguration processingConfiguration(ProcessingWorkersConfig processi
@Bean
GlobalConfig globalConfig(
@Value("${users.error.directory.name}") String usersErrorDirectoryName,
@Value("${users.registration.directory.name}") String usersRegistrationDirectoryName) {
return new GlobalConfig(usersErrorDirectoryName, usersRegistrationDirectoryName);
@Value("${users.registration.directory.name}") String usersRegistrationDirectoryName,
@Value("${qbic.measurement-id.pattern}") String measurementIdPattern) {
return new GlobalConfig(usersErrorDirectoryName, usersRegistrationDirectoryName, measurementIdPattern);
}

}
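
The thread settings in this commit use the `${NAME:default}` placeholder form, for example `registration.threads=${REGISTRATION_THREADS:2}`, so the value after the colon applies when the environment variable is not set. The sketch below imitates that lookup in plain Java to make the three cases visible: variable set, variable unset with a default, and variable unset without a default. It illustrates the placeholder syntax only and is not the framework's resolver; `PlaceholderSketch` and `resolve` are hypothetical names.

```java
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical illustration of ${NAME} and ${NAME:default} resolution.
final class PlaceholderSketch {

  // Group 1 is the variable name, group 2 the optional default after ':'
  private static final Pattern PLACEHOLDER =
      Pattern.compile("\\$\\{([^:}]+)(?::([^}]*))?\\}");

  static String resolve(String value, Map<String, String> environment) {
    Matcher matcher = PLACEHOLDER.matcher(value);
    StringBuilder resolved = new StringBuilder();
    while (matcher.find()) {
      String name = matcher.group(1);
      String fallback = matcher.group(2); // null when no ':' part is present
      String replacement = environment.getOrDefault(name, fallback);
      if (replacement == null) {
        // No value and no default: fail instead of guessing
        throw new IllegalArgumentException(
            "Could not resolve placeholder '" + name + "'");
      }
      matcher.appendReplacement(resolved, Matcher.quoteReplacement(replacement));
    }
    matcher.appendTail(resolved);
    return resolved.toString();
  }
}
```

In this sketch `${REGISTRATION_THREADS:2}` resolves to `2` unless `REGISTRATION_THREADS` is provided, `${WORKING_DIR:}` resolves to an empty string, and `${PROCESSING_DIR}` without a value raises an error.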
13 changes: 12 additions & 1 deletion src/main/java/life/qbic/data/processing/GlobalConfig.java
@@ -2,22 +2,29 @@

import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.regex.Pattern;

public class GlobalConfig {

private final Path usersErrorDirectoryName;

private final Path usersDirectoryRegistrationName;

public GlobalConfig(String usersErrorDirectoryName, String usersRegistrationDirectoryName) {
private final Pattern qbicMeasurementIdPattern;

public GlobalConfig(String usersErrorDirectoryName, String usersRegistrationDirectoryName, String qbicMeasurementIdPattern) {
if (usersErrorDirectoryName == null || usersErrorDirectoryName.isBlank()) {
throw new IllegalArgumentException("usersErrorDirectoryName cannot be null or empty");
}
if (usersRegistrationDirectoryName == null || usersRegistrationDirectoryName.isBlank()) {
throw new IllegalArgumentException("usersRegistrationDirectoryName cannot be null or empty");
}
if (qbicMeasurementIdPattern == null || qbicMeasurementIdPattern.isBlank()) {
throw new IllegalArgumentException("qbicMeasurementIdPattern cannot be null or empty");
}
this.usersErrorDirectoryName = Paths.get(usersErrorDirectoryName);
this.usersDirectoryRegistrationName = Paths.get(usersRegistrationDirectoryName);
this.qbicMeasurementIdPattern = Pattern.compile(qbicMeasurementIdPattern);
}

public Path usersErrorDirectory() {
@@ -28,4 +35,8 @@ public Path usersDirectoryRegistration() {
return this.usersDirectoryRegistrationName;
}

public Pattern qbicMeasurementIdPattern() {
return this.qbicMeasurementIdPattern;
}

}
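
To show what the configured pattern accepts, here is a small sketch that compiles the expression from `qbic.measurement-id.pattern` and checks a few candidates with `Matcher.matches()`, the same call the registration step uses in this commit. The class name `MeasurementIdPatternSketch` and the sample IDs in the note below are made up for illustration.

```java
import java.util.regex.Pattern;

// Hypothetical helper; only the pattern string is taken from this commit.
final class MeasurementIdPatternSketch {

  static final Pattern MEASUREMENT_ID =
      Pattern.compile("^(MS|NGS)Q[A-Z0-9]{4}[0-9]{3}[A-Z0-9]{2}-[0-9]*");

  // matches() requires the whole input to fit the pattern, so trailing
  // characters after the numeric suffix are rejected.
  static boolean isValid(String candidate) {
    return candidate != null && MEASUREMENT_ID.matcher(candidate).matches();
  }
}
```

Under this pattern `NGSQABCD001AA-12345` is accepted and `QABCD001AA-1` is rejected because the `MS` or `NGS` prefix is missing. Because the suffix is written as `[0-9]*`, an ID that ends right after the hyphen, such as `MSQTEST001AB-`, also passes; if at least one digit is intended, `[0-9]+` would be the stricter form.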
@@ -10,9 +10,8 @@ public class EvaluationWorkersConfig {
private final int threads;
private final Path workingDirectory;
private final Collection<Path> targetDirectories;
private final Pattern measurementIdPattern;

public EvaluationWorkersConfig(int threads, String workingDirectory, String measurementIdPattern,
public EvaluationWorkersConfig(int threads, String workingDirectory,
Collection<String> targetDirectories) {
if (threads < 1) {
throw new IllegalArgumentException(
@@ -32,10 +31,6 @@ public EvaluationWorkersConfig(int threads, String workingDirectory, String meas
throw new IllegalArgumentException(
"Evaluation target directory '%s' does not exist".formatted(path));
});
if (measurementIdPattern.isBlank()) {
throw new IllegalArgumentException("Measurement id pattern cannot be blank");
}
this.measurementIdPattern = Pattern.compile(measurementIdPattern);
}

public int threads() {
@@ -49,8 +44,4 @@ public Path workingDirectory() {
public Collection<Path> targetDirectories() {
return targetDirectories;
}

public Pattern measurementIdPattern() {
return measurementIdPattern;
}
}
@@ -18,12 +18,10 @@ public class EvaluationConfiguration {

private final Path workingDirectory;
private final Collection<Path> targetDirectories;
private final Pattern measurementIdPattern;
private final Path usersErrorDirectory;
private final RoundRobinDraw<Path> targetDirectoriesRoundRobinDraw;

public EvaluationConfiguration(String workingDirectory, Collection<Path> targetDirectories,
String measurementIdPattern,
GlobalConfig globalConfig) {
this.workingDirectory = Paths.get(workingDirectory);
if (!this.workingDirectory.toFile().exists()) {
@@ -35,11 +33,7 @@ public EvaluationConfiguration(String workingDirectory, Collection<Path> targetD
"Evaluation target directory '%s' does not exist".formatted(path));
});
this.targetDirectoriesRoundRobinDraw = RoundRobinDraw.create(targetDirectories);
if (measurementIdPattern.isBlank()) {
throw new IllegalArgumentException("Measurement id pattern cannot be blank");
}
this.usersErrorDirectory = globalConfig.usersErrorDirectory();
this.measurementIdPattern = Pattern.compile(measurementIdPattern);
}

public Path workingDirectory() {
@@ -50,10 +44,6 @@ public RoundRobinDraw<Path> targetDirectories() {
return targetDirectoriesRoundRobinDraw;
}

public Pattern measurementIdPattern() {
return measurementIdPattern;
}

public Path usersErrorDirectory() {
return usersErrorDirectory;
}
@@ -50,17 +50,15 @@ public class EvaluationRequest extends Thread {
private final AtomicBoolean active = new AtomicBoolean(false);
private final AtomicBoolean terminated = new AtomicBoolean(false);
private final Path workingDirectory;
private final Pattern measurementIdPattern;
private final Path usersErrorDirectory;
private final RoundRobinDraw<Path> targetDirectories;
private Path assignedTargetDirectory;

public EvaluationRequest(Path workingDirectory, RoundRobinDraw<Path> targetDirectories,
Pattern measurementIdPattern, Path usersErrorDirectory) {
Path usersErrorDirectory) {
this.setName(THREAD_NAME.formatted(nextThreadNumber()));
this.workingDirectory = workingDirectory;
this.targetDirectories = targetDirectories;
this.measurementIdPattern = measurementIdPattern;
if (!workingDirectory.resolve(INTERVENTION_DIRECTORY).toFile().mkdir()
&& !workingDirectory.resolve(
INTERVENTION_DIRECTORY).toFile().exists()) {
@@ -73,7 +71,6 @@ public EvaluationRequest(Path workingDirectory, RoundRobinDraw<Path> targetDirec

public EvaluationRequest(EvaluationConfiguration evaluationConfiguration) {
this(evaluationConfiguration.workingDirectory(), evaluationConfiguration.targetDirectories(),
evaluationConfiguration.measurementIdPattern(),
evaluationConfiguration.usersErrorDirectory());
}

@@ -185,10 +182,6 @@ private boolean createMarkerFile(Path targetDirectory, String name) throws IOExc
return targetDirectory.resolve(markerFileName).toFile().createNewFile();
}

private Optional<File> findDataset(File taskDir) {
return Arrays.stream(taskDir.listFiles()).filter(File::isDirectory).findFirst();
}

private void moveToSystemIntervention(File taskDir, String reason) {
try {
var errorFile = taskDir.toPath().resolve("error.txt").toFile();
@@ -9,5 +9,5 @@
*/
public enum ErrorCode {
METADATA_FILE_NOT_FOUND,
INCOMPLETE_METADATA, FILE_NOT_FOUND, MISSING_FILE_ENTRY, IO_EXCEPTION
INCOMPLETE_METADATA, FILE_NOT_FOUND, MISSING_FILE_ENTRY, INVALID_MEASUREMENT_ID_FORMAT, IO_EXCEPTION
}
@@ -17,6 +17,7 @@
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import life.qbic.data.processing.ConcurrentRegistrationQueue;
import life.qbic.data.processing.GlobalConfig;
@@ -53,6 +54,7 @@ public class ProcessRegistrationRequest extends Thread {
private final Path targetDirectory;
private final String metadataFileName;
private final Path userErrorDirectory;
private final Pattern measurementIdPattern;
private AtomicBoolean active = new AtomicBoolean(false);

public ProcessRegistrationRequest(@NonNull ConcurrentRegistrationQueue registrationQueue,
@@ -63,6 +65,7 @@ public ProcessRegistrationRequest(@NonNull ConcurrentRegistrationQueue registrat
this.targetDirectory = configuration.targetDirectory();
this.metadataFileName = configuration.metadataFileName();
this.userErrorDirectory = globalConfig.usersErrorDirectory();
this.measurementIdPattern = globalConfig.qbicMeasurementIdPattern();
}

private static int nextThreadNumber() {
@@ -221,6 +224,8 @@ public void run() {
var registrationMetadata = findAndParseMetadata(workingTargetDir);
validateFileEntries(registrationMetadata, workingTargetDir);

validateMeasurementIds(registrationMetadata);

var aggregatedFilesByMeasurementId = registrationMetadata.stream().collect(
Collectors.groupingBy(RegistrationMetadata::measurementId));

@@ -244,6 +249,24 @@ public void run() {
}
}

private void validateMeasurementIds(List<RegistrationMetadata> registrationMetadata)
throws ValidationException {
registrationMetadata.stream().map(RegistrationMetadata::measurementId)
.filter(this::isMeasurementIdInvalid).findAny().ifPresent(invalidEntry -> {
throw new ValidationException(
"Invalid measurement ID format found: %s".formatted(invalidEntry),
ErrorCode.INVALID_MEASUREMENT_ID_FORMAT);
});
}

private boolean isMeasurementIdInvalid(String measurementId) {
return !isMeasurementIdValid(measurementId);
}

private boolean isMeasurementIdValid(String measurementId) {
return measurementIdPattern.matcher(measurementId).matches();
}

private void processAll(Map<String, List<RegistrationMetadata>> aggregatedFilesByMeasurementId,
Path workingTargetDir, RegistrationRequest request) throws IOException {
for (String measurementId : aggregatedFilesByMeasurementId.keySet()) {
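
The fail-early check added in `validateMeasurementIds` rejects the whole request as soon as one entry has an invalid ID. A minimal standalone sketch of that idea follows. `FailEarlySketch` and `InvalidMeasurementIdException` are hypothetical stand-ins, since the project's `ValidationException` and `RegistrationMetadata` are not shown in full here. The sketch uses `findFirst()` instead of the commit's `findAny()` so that the reported ID is deterministic, and it uses an unchecked exception because a checked one could not be thrown from inside the `ifPresent` lambda.

```java
import java.util.List;
import java.util.regex.Pattern;

// Hypothetical illustration of rejecting a request on the first invalid ID.
final class FailEarlySketch {

  // Stand-in for the project's ValidationException (assumed to be unchecked)
  static final class InvalidMeasurementIdException extends RuntimeException {
    InvalidMeasurementIdException(String message) {
      super(message);
    }
  }

  // Throws as soon as one ID does not fully match the pattern;
  // returns normally when every ID is valid.
  static void validateAll(List<String> measurementIds, Pattern pattern) {
    measurementIds.stream()
        .filter(id -> !pattern.matcher(id).matches())
        .findFirst()
        .ifPresent(invalid -> {
          throw new InvalidMeasurementIdException(
              "Invalid measurement ID format found: %s".formatted(invalid));
        });
  }
}
```

Running the check before the files are grouped by measurement ID means no directories are created or moved for a request that is going to be rejected anyway.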
8 changes: 4 additions & 4 deletions src/main/resources/application.properties
@@ -13,6 +13,7 @@ users.error.directory.name=error
# Needs to be present in the users' home folders
# e.g. /home/<user1>/registration
users.registration.directory.name=registration
qbic.measurement-id.pattern=^(MS|NGS)Q[A-Z0-9]{4}[0-9]{3}[A-Z0-9]{2}-[0-9]*

#--------------------------------------
# Settings for the data scanning thread
@@ -30,7 +31,7 @@ scanner.interval=1000
#----------------
# Settings for the registration worker threads
#----------------
registration.threads=2
registration.threads=${REGISTRATION_THREADS:2}
registration.metadata.filename=metadata.txt
registration.working.dir=${WORKING_DIR:}
registration.target.dir=${PROCESSING_DIR:}
@@ -39,23 +40,22 @@ registration.target.dir=${PROCESSING_DIR:}
# Settings for the 1. processing step
# Proper packaging and provenance data, some simple checks
#------------------------------------
processing.threads=2
processing.threads=${PROCESSING_THREADS:2}
processing.working.dir=${PROCESSING_DIR}
processing.target.dir=${EVALUATION_DIR}

#----------------------------------
# Setting for the 2. processing step:
# Measurement ID evaluation
# ---------------------------------
evaluations.threads=2
evaluation.threads=${EVALUATION_THREADS:2}
evaluation.working.dir=${EVALUATION_DIR}
# Define one or more target directories here
# Example single target dir:
# evaluation.target.dirs=/my/example/target/dir
# Example multiple target dir:
# evaluation.target.dirs=/my/example/target/dir1,/my/example/target/dir2,/my/example/target/dir3
evaluation.target.dirs=${OPENBIS_ETL_DIRS}
evaluation.measurement-id.pattern=^(MS|NGS)Q[A-Z0-9]{4}[0-9]{3}[A-Z0-9]{2}-[0-9]*

# ----------------
# Logging settings