Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: experiments results file ingestion #488

Merged
merged 29 commits into from
Jun 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
66aea76
temporal changes
noctillion Feb 1, 2024
d07a6db
Merge branch 'feat/exp-res-index-url' into features/experiment_drs_ha…
noctillion Feb 1, 2024
fa4c728
functional
noctillion Feb 14, 2024
50cbcec
Merge branch 'develop' into features/experiment_drs_harmonize
noctillion Feb 14, 2024
215d44b
define workflow
noctillion Feb 26, 2024
3f679fb
rename and clean logs
noctillion Feb 26, 2024
05ea3c7
remove file
noctillion Feb 26, 2024
c718aa1
Merge branch 'develop' into features/experiments_results_ingestion
noctillion Apr 19, 2024
76ce4e7
Merge branch 'develop' into features/experiments_results_ingestion
noctillion Apr 26, 2024
fdb02c8
change workflow name
noctillion Apr 29, 2024
2b11d1e
add trailing line
noctillion Apr 29, 2024
28f469e
change Gi to MB
noctillion Apr 29, 2024
e29e525
rename function
noctillion Apr 29, 2024
ee80986
add experiments_json_with_files wdl
noctillion Apr 29, 2024
8afaebb
change field name for ui
noctillion Apr 29, 2024
3d32aae
fix output type
noctillion Apr 30, 2024
a7b1351
Merge branch 'develop' into features/experiments_results_ingestion
noctillion May 8, 2024
32f512f
remove download external files
noctillion May 10, 2024
bd023d2
refactor write drs responses to a file
noctillion May 15, 2024
36820fc
refactor json response parsing
noctillion May 15, 2024
693115a
refactor function calls
noctillion May 15, 2024
96df3c5
rename variable and add comments
noctillion Jun 5, 2024
529a55f
add filter out vcf
noctillion Jun 12, 2024
d02ca64
add input for filter out vcf files for ingestion
noctillion Jun 12, 2024
eaa0c7e
lint
noctillion Jun 12, 2024
bc9ce13
lint
noctillion Jun 12, 2024
42fd9e3
Merge branch 'develop' into features/experiments_results_ingestion
noctillion Jun 12, 2024
8184b15
remove unused line
noctillion Jun 18, 2024
eb02bb5
change key name
noctillion Jun 18, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 30 additions & 0 deletions chord_metadata_service/chord/workflows/metadata.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
__all__ = [
"WORKFLOW_PHENOPACKETS_JSON",
"WORKFLOW_EXPERIMENTS_JSON",
"WORKFLOW_EXPERIMENTS_JSON_WITH_FILES",
"WORKFLOW_FHIR_JSON",
"WORKFLOW_READSET",
"WORKFLOW_DOCUMENT",
Expand All @@ -18,6 +19,8 @@

WORKFLOW_PHENOPACKETS_JSON = "phenopackets_json"
WORKFLOW_EXPERIMENTS_JSON = "experiments_json"
WORKFLOW_EXPERIMENTS_JSON_WITH_FILES = "experiments_json_with_files"

WORKFLOW_FHIR_JSON = "fhir_json"
WORKFLOW_READSET = "readset"
WORKFLOW_DOCUMENT = "document"
Expand All @@ -31,7 +34,12 @@ def json_file_input(id_: str, required: bool = True):
return wm.WorkflowFileInput(id=id_, required=required, pattern=r"^.*\.json$")


def boolean_input(id_: str, required: bool = True):
    """Return a boolean workflow input definition which defaults to False.

    :param id_: The identifier for the workflow input.
    :param required: Whether the input must be provided by the user.
    """
    # Use a real boolean for the default: the string "false" is truthy in
    # Python, so any consumer doing a truthiness check on the default would
    # treat it as enabled.
    return wm.WorkflowBooleanInput(id=id_, required=required, default=False)


DRS_URL_INPUT = wm.WorkflowServiceUrlInput(id="drs_url", service_kind="drs")
DIRECTORY_PATH_INPUT = wm.WorkflowDirectoryInput(id="directory")
KATSU_URL_INPUT = wm.WorkflowServiceUrlInput(id="katsu_url", service_kind="metadata")
PROJECT_DATASET_INPUT = wm.WorkflowProjectDatasetInput(id="project_dataset")
ACCESS_TOKEN_INPUT = wm.WorkflowSecretInput(id="access_token", key="access_token")
Expand Down Expand Up @@ -78,6 +86,28 @@ def json_file_input(id_: str, required: bool = True):
],
))

# Experiments JSON ingestion variant that also uploads the experiment result
# files referenced by the document into DRS (see experiments_json_with_files.wdl).
workflow_set.add_workflow(WORKFLOW_EXPERIMENTS_JSON_WITH_FILES, wm.WorkflowDefinition(
    type="ingestion",
    name="Bento Experiments JSON With Files",
    description="This workflow ingests experiments and related files into DRS.",
    data_type=DATA_TYPE_EXPERIMENT,
    tags=[DATA_TYPE_EXPERIMENT, "experiment_result"],
    file="experiments_json_with_files.wdl",
    inputs=[
        # injected
        ACCESS_TOKEN_INPUT,
        DRS_URL_INPUT,
        KATSU_URL_INPUT,
        VALIDATE_SSL_INPUT,
        # user
        PROJECT_DATASET_INPUT,
        DIRECTORY_PATH_INPUT,
        json_file_input("json_document"),
        boolean_input("filter_out_vcf_files"),
    ],
))

workflow_set.add_workflow(WORKFLOW_READSET, wm.WorkflowDefinition(
type="ingestion",
name="Readset",
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,296 @@
version 1.0

# End-to-end ingestion of a Bento experiments JSON document together with the
# result files it references: locate each file under `directory`, upload it to
# DRS, substitute the returned DRS URIs back into the document, then ingest
# the updated document into Katsu (the metadata service).
workflow experiments_json_with_files {
    input {
        String directory                       # directory tree searched for result files
        File json_document                     # experiments JSON document to ingest
        String drs_url                         # base URL of the DRS service
        String katsu_url                       # base URL of the Katsu metadata service
        String project_dataset                 # "project_id:dataset_id" identifier
        Boolean filter_out_vcf_files = false   # if true, skip .vcf / .vcf.gz result files
        String access_token                    # bearer token forwarded to DRS and Katsu
        Boolean validate_ssl                   # verify TLS certificates on outgoing requests
    }

    # Resolve each experiment result's filename to an on-disk path.
    call prepare_files_list {
        input:
            json_document = json_document,
            directory = directory,
            filter_out_vcf_files = filter_out_vcf_files
    }

    # Flatten the {filename, path} records into a plain list of paths.
    call prepare_for_drs {
        input:
            json_path_list = prepare_files_list.path_list,
    }

    # Upload each located file to DRS in parallel.
    scatter (path in prepare_for_drs.consolidated_paths_for_drs) {
        call post_to_drs {
            input:
                file_path = path,
                drs_url = drs_url,
                project_dataset = project_dataset,
                token = access_token,
                validate_ssl = validate_ssl
        }
    }

    # Consolidate the individual DRS response strings into a single JSON file.
    call write_drs_responses_to_file {
        input:
            drs_responses = post_to_drs.response_message
    }

    # Keep only the fields (name, self_uri) needed to patch the document.
    call parse_json {
        input:
            json_responses = write_drs_responses_to_file.results_post_drs
    }

    # Substitute DRS URIs into the experiments JSON by filename matching.
    call update_experiment_json {
        input:
            json_document = json_document,
            processed_drs_responses = parse_json.processed_drs_responses
    }

    # POST the fully-updated document to Katsu's ingestion endpoint.
    call ingest_task {
        input:
            json_document = update_experiment_json.final_updated_json,
            project_dataset = project_dataset,
            katsu_url = katsu_url,
            token = access_token,
            validate_ssl = validate_ssl
    }

    output {
        # Intermediate artifacts are exposed for debugging / auditing.
        File download_list = prepare_files_list.path_list
        Array[String] consolidated_paths_for_drs = prepare_for_drs.consolidated_paths_for_drs
        Array[String] drs_responses = post_to_drs.response_message
        File results_post_drs = write_drs_responses_to_file.results_post_drs
        File processed_drs_responses = parse_json.processed_drs_responses
        File final_updated_json = update_experiment_json.final_updated_json
    }
}

# Walk `directory` recursively looking for each experiment result's file and
# emit a JSON array of {filename, path} records. Optionally skips VCF files.
task prepare_files_list {
    input {
        File json_document
        String directory
        # Declared Boolean to match the type the workflow passes in; the
        # original String declaration relied on implicit Boolean->String coercion.
        Boolean filter_out_vcf_files
    }
    command <<<
        python3 -c "
import json
import os

directory = '~{directory}'
# WDL interpolates the boolean as the literal string 'true' or 'false'.
# Compare explicitly: the original truthiness test treated the non-empty
# string 'false' as True, making the VCF filter unconditionally active.
filter_vcf = '~{filter_out_vcf_files}' == 'true'

with open('~{json_document}', 'r') as file:
    data = json.load(file)

path_list = []
for experiment in data.get('experiments', []):
    for result in experiment.get('experiment_results', []):
        filename = result.get('filename', '')
        is_vcf = filename.endswith('.vcf') or filename.endswith('.vcf.gz')

        if filter_vcf and is_vcf:
            continue

        file_found = False
        for root, dirs, files in os.walk(directory):
            if filename in files:
                file_found = True
                path_list.append({'filename': filename, 'path': os.path.join(root, filename)})
                break
        if not file_found:
            # Missing files are reported but do not fail the task.
            print(f'File not found for {filename}')

with open('path_list.json', 'w') as file:
    json.dump(path_list, file, indent=4)
"
    >>>
    output {
        File path_list = "path_list.json"
    }
}

# Flatten the {filename, path} records produced by prepare_files_list into a
# plain JSON array of non-blank path strings, printed on stdout so the
# workflow can scatter over them.
task prepare_for_drs {
    input {
        File json_path_list
    }

    command <<<
        python3 -c "
import json

with open('~{json_path_list}', 'r') as fh:
    records = json.load(fh)

paths = []
for record in records:
    candidate = str(record['path']).strip()
    if candidate:
        paths.append(candidate)

print(json.dumps(paths))
"
    >>>
    output {
        Array[String] consolidated_paths_for_drs = read_json(stdout())
    }
}

# Upload a single file to DRS via its /ingest endpoint, tagging it with the
# project and dataset IDs parsed from the "project_id:dataset_id" identifier.
task post_to_drs {
    input {
        File file_path
        String drs_url
        String project_dataset
        String token
        Boolean validate_ssl
    }
    command <<<
        # project_dataset is formatted "project_id:dataset_id"
        project_id=$(python3 -c 'print("~{project_dataset}".split(":")[0])')
        dataset_id=$(python3 -c 'print("~{project_dataset}".split(":")[1])')
        # -k (skip certificate verification) is added only when validate_ssl is false;
        # --fail-with-body makes curl exit non-zero on HTTP errors (failing the task)
        # while still printing the error body.
        curl ~{true="" false="-k" validate_ssl} \
            -X POST \
            -F "file=@~{file_path}" \
            -F "project_id=$project_id" \
            -F "dataset_id=$dataset_id" \
            -H "Authorization: Bearer ~{token}" \
            --fail-with-body \
            "~{drs_url}/ingest"
    >>>
    output {
        # Raw DRS response body (a JSON document as a string), captured from stdout.
        String response_message = read_string(stdout())
    }
}

# Collect the per-file DRS responses (each a JSON document serialized as a
# string) into a single JSON array written to results_post_drs.json.
# Fix: removed GitHub review-thread text that had been accidentally embedded
# in the middle of the inline Python.
task write_drs_responses_to_file {
    input {
        Array[String] drs_responses
    }

    command <<<
        python3 -c "
import json
# this is a temporary file to store the responses (str) from DRS
temporary_file_drs_responses = '~{write_json(drs_responses)}'
with open(temporary_file_drs_responses, 'r') as file:
    drs_responses = json.load(file)

responses = []
for response in drs_responses:
    # each element is itself a JSON document (the raw DRS reply); decode it
    # so the output file holds real objects rather than escaped strings
    response_corrected = json.loads(response)
    responses.append(response_corrected)

with open('results_post_drs.json', 'w') as output_file:
    json.dump(responses, output_file, indent=2)
"
    >>>

    output {
        File results_post_drs = "results_post_drs.json"
    }
}

# Reduce each DRS response object to the two fields needed downstream
# (name and self_uri) and write the trimmed list to processed_drs_responses.json.
task parse_json {
    input {
        File json_responses
    }

    command <<<
        python3 -c "
import json

def parse_drs_response(file_path):
    with open(file_path, 'r') as fh:
        responses = json.load(fh)

    trimmed = [
        {'name': entry.get('name', ''), 'self_uri': entry.get('self_uri', '')}
        for entry in responses
    ]

    with open('processed_drs_responses.json', 'w') as out:
        json.dump(trimmed, out, indent=4)

parse_drs_response('~{json_responses}')
"
    >>>
    output {
        File processed_drs_responses = "processed_drs_responses.json"
    }
}

# Rewrite the original experiments JSON so that each experiment result (and
# any of its indices) points at its new DRS URI, matched by filename against
# the processed DRS responses.
task update_experiment_json {
    input {
        File json_document
        File processed_drs_responses
    }
    command <<<
        python3 -c "
import json

with open('~{json_document}', 'r') as file:
    data = json.load(file)

with open('~{processed_drs_responses}', 'r') as file:
    drs_data = json.load(file)

# Index files are expected in DRS under '<data filename>.<format lowercased>'
# — assumes DRS stores uploads under their original filenames; TODO confirm.
def construct_drs_name_for_index(filename, format):
    return filename + '.' + format.lower()

# Update the original JSON document with DRS URIs for both files and their indices
for experiment in data.get('experiments', []):
    for result in experiment.get('experiment_results', []):
        # Update primary file URL (first name match wins; results whose file
        # was filtered out or not found simply keep their existing URL)
        for drs_response in drs_data:
            if result['filename'] == drs_response['name']:
                result['url'] = drs_response['self_uri']
                break
        # Update indices URLs if present
        for index in result.get('indices', []):
            expected_drs_name = construct_drs_name_for_index(result['filename'], index['format'])
            for drs_response in drs_data:
                if expected_drs_name == drs_response['name']:
                    index['url'] = drs_response['self_uri']
                    break

with open('final_updated_json.json', 'w') as file:
    json.dump(data, file, indent=4)
"
    >>>
    output {
        File final_updated_json = "final_updated_json.json"
    }
}

# POST the fully-updated experiments JSON to Katsu's experiments_json
# ingestion endpoint; any reply other than HTTP 204 fails the task.
task ingest_task {
    input {
        File json_document
        String project_dataset
        String katsu_url
        String token
        Boolean validate_ssl
    }
    command <<<
        # dataset_id is the second component of "project_id:dataset_id"
        dataset_id=$(python3 -c 'print("~{project_dataset}".split(":")[1])')
        # -s silences progress output; -w appends the HTTP status code after the
        # response body. A 204 reply carries no body, so on success RESPONSE is
        # exactly "204". NOTE(review): any success status that includes a body
        # would also fail this check — confirm the endpoint only returns 204.
        RESPONSE=$(curl -X POST ~{true="" false="-k" validate_ssl} -s -w "%{http_code}" \
            -H "Content-Type: application/json" \
            -H "Authorization: Bearer ~{token}" \
            --data "@~{json_document}" \
            "~{katsu_url}/ingest/${dataset_id}/experiments_json")
        if [[ "${RESPONSE}" != "204" ]]
        then
            echo "Error: Metadata service replied with ${RESPONSE}" 1>&2 # to stderr
            exit 1
        fi
        echo ${RESPONSE}
    >>>

    output {
        File txt_output = stdout()
        File err_output = stderr()
    }
}