Skip to content

Commit

Permalink
Init traceability feature
Browse files Browse the repository at this point in the history
  • Loading branch information
omar-rifai committed Sep 23, 2021
1 parent d187bbe commit c34018d
Show file tree
Hide file tree
Showing 8 changed files with 393 additions and 57 deletions.
2 changes: 1 addition & 1 deletion clinica/VERSION
Original file line number Diff line number Diff line change
@@ -1 +1 @@
0.5.1
0.5.0
215 changes: 215 additions & 0 deletions clinica/engine/provenance.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,215 @@
import json
import functools
from os import read

from pathlib import Path
from typing import Optional


def provenance(func):
    """Decorator for ``Pipeline.run`` that records provenance information.

    Before running, gathers the provenance of the pipeline's input files and
    validates that the requested pipeline is compatible with it; after a
    successful run, registers provenance for the produced output files.

    Raises:
        Exception: if the pipeline is incompatible with the input provenance.
    """
    from .provenance_utils import get_files_list

    @functools.wraps(func)
    def run_wrapper(self, **kwargs):
        ret = []
        pipeline_fullname = self.fullname
        in_files_paths = get_files_list(self, pipeline_fullname, dict_field="input_to")

        prov_context = get_context(files_paths=in_files_paths)
        prov_command = get_command(self, in_files_paths)

        if validate_command(prov_context, prov_command):
            # BUG FIX: forward the keyword arguments to the wrapped function;
            # they were previously dropped (`func(self)`), so e.g. the
            # `plugin`/`plugin_args` options of `run` never took effect.
            ret = func(self, **kwargs)
        else:
            raise Exception(
                "The pipeline selected is incompatible with the input files provenance"
            )
        out_files_paths = get_files_list(
            self, pipeline_fullname, dict_field="output_from"
        )
        register_prov(prov_command, out_files_paths)

        return ret

    return run_wrapper


def register_prov(prov_command: dict, out_files: list) -> bool:
    """Write the provenance of *prov_command* next to each output file.

    Args:
        prov_command: provenance records (Agent/Activity/Entity) of this run.
        out_files: paths of the files produced by the pipeline.

    Returns:
        True once every file has been processed.
    """
    for file in out_files:
        write_prov_file(prov_command, file)
    # Typo fix: was "succesfully".
    print("Provenance registered successfully")
    return True


def get_context(files_paths: list) -> dict:
    """
    Return a dictionary with the provenance info related to the files in *files_paths*.

    Args:
        files_paths: list of file paths whose associated provenance records
            should be aggregated. (Annotation fixed: was wrongly `str`.)

    Returns:
        dict with "Entity", "Agent" and "Activity" record lists.
    """
    from clinica.engine.provenance_utils import read_prov, get_associated_prov

    prov_data = {"Entity": [], "Agent": [], "Activity": []}
    for path in files_paths:
        # Files without an associated provenance file yield None and are skipped.
        prov_record = read_prov(get_associated_prov(path))
        if prov_record:
            prov_data = append_prov_dict(prov_data, prov_record)

    return prov_data


def get_command(self, input_files_paths: list) -> dict:
    """
    Build the provenance description of the current command.

    Args:
        input_files_paths: paths of the pipeline's input files.

    Returns:
        dict with one Agent (the software), one Activity (this invocation)
        and one Entity per input file.
    """
    # Removed unused `import sys` (sys.argv is read in get_activity, not here).
    new_agent = get_agent()
    new_entities = [get_entity(path) for path in input_files_paths]
    new_activity = get_activity(self, new_agent["@id"], new_entities)

    return {
        "Agent": [new_agent],
        "Activity": [new_activity],
        "Entity": new_entities,
    }


def write_prov_file(prov_command, files_paths):
    """
    Write the provenance data *prov_command* next to each file in *files_paths*.

    Accepts either a single file path or an iterable of paths — the caller
    ``register_prov`` passes one path at a time, and iterating a bare string
    would previously have walked it character by character.

    NOTE(review): the merged provenance is currently never persisted to disk,
    and the "new file" branch is not implemented (see ``create_prov_file``).
    """
    from clinica.engine.provenance_utils import read_prov, get_associated_prov

    # BUG FIX: wrap a single path so the loop iterates paths, not characters.
    if isinstance(files_paths, (str, Path)):
        files_paths = [files_paths]

    for file_path in files_paths:
        prov_path = get_associated_prov(file_path)

        if prov_path.exists():
            # Append the pipeline provenance information to the old provenance file.
            prov_main = read_prov(prov_path)
            prov_main = append_prov_dict(prov_main, prov_command)
            # TODO(review): write prov_main back to prov_path.
        else:
            # TODO: create new provenance file with pipeline information
            # (removed leftover debug `print("help")`).
            pass
    return ""


def append_prov_dict(prov_main: dict, prov_new: dict) -> dict:
    """
    Merge the records of *prov_new* into the global prov dict *prov_main*.

    Records already present in a category are not duplicated; *prov_main* is
    mutated in place and also returned.
    """
    for category, records in prov_new.items():
        target = prov_main.setdefault(category, [])
        for record in records:
            # BUG FIX: the previous guard `prov_main[k] and el not in prov_main[k]`
            # never appended to an EMPTY category list, so merging into a fresh
            # context was a silent no-op.
            if record not in target:
                target.append(record)
    return prov_main


def get_agent() -> dict:
    """Build the provenance record describing the running Clinica software."""
    import clinica
    from .provenance_utils import get_agent_id

    label = clinica.__name__
    version = clinica.__version__

    return {
        "@id": get_agent_id(label + version),
        "label": label,
        "version": version,
    }


def get_activity(self, agent_id: str, entities: list) -> dict:
    """
    Build the provenance record for the current command invocation.

    Args:
        agent_id: "@id" of the agent (software) running the pipeline.
        entities: entity records of the input files used by this activity.

    Returns:
        dict describing the activity: id, label, command-line arguments,
        pipeline parameters, associated agent and used entity ids.
    """
    import sys
    from .provenance_utils import get_activity_id

    # BUG FIX: a stray trailing comma previously produced a one-element
    # tuple wrapping the argument list: `(sys.argv[1:],)`.
    new_activity = {
        "@id": get_activity_id(self.fullname),
        "label": self.fullname,
        "command": sys.argv[1:],
        "parameters": self.parameters,
        "wasAssociatedWith": agent_id,
        "used": [entity["@id"] for entity in entities],
    }

    return new_activity


def get_entity(img_path: str) -> dict:
    """Build the provenance record describing the file at *img_path*."""
    from pathlib import Path

    from clinica.engine.provenance_utils import get_entity_id
    from clinica.engine.provenance_utils import get_last_activity

    return {
        "@id": get_entity_id(img_path),
        "label": Path(img_path).name,
        "atLocation": img_path,
        "wasGeneratedBy": get_last_activity(img_path),
    }


def create_prov_file(command, path):
    """
    Create a new provenance file for *path* based on *command*.

    Not implemented yet; currently a no-op returning None.
    """
    # TODO: create a json-ld object next to the file and add it to the active prov object
    return None


def validate_command(prov_context: dict, prov_command: dict) -> bool:
    """
    Check that the new command is valid given the provenance of the input data.

    Each input entity's generating (agent, activity) pair must form a valid
    transition to the new command's (agent, activity) pair (see ``is_valid``).

    Returns:
        True if every tracked input entity allows the new command.
    """
    new_activity_id = prov_command["Activity"][0]["@id"]
    new_agent_id = prov_command["Agent"][0]["@id"]

    for entity in prov_context["Entity"]:
        old_activity_id = entity["wasGeneratedBy"]
        if not old_activity_id:
            continue
        # NOTE(review): raises StopIteration if the generating activity is
        # missing from the context — confirm this is intended.
        ptr_activity = next(
            item
            for item in prov_context["Activity"]
            if item["@id"] == old_activity_id
        )
        old_agent_id = ptr_activity["wasAssociatedWith"]
        # BUG FIX: the result of is_valid was previously discarded
        # (`flag and is_valid(...)`), so validation always returned True.
        if not is_valid(
            {(old_agent_id, old_activity_id): (new_agent_id, new_activity_id)}
        ):
            return False
    return True


def is_valid(command: dict) -> bool:
    """Return True if *command* matches a whitelisted (agent, activity) transition."""
    valid_list = [
        {
            ("clin:clinica0.5.0", "clin:adni2Bids"): (
                "clin:clinica0.5.0",
                "clin:t1-linear",
            )
        }
    ]
    return command in valid_list
116 changes: 116 additions & 0 deletions clinica/engine/provenance_utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
from typing import Union, Optional
from pathlib import Path


def get_files_list(self, pipeline_fullname: str, dict_field="input_to") -> list:
    """
    Call clinica_file_reader with the appropriate extensions.

    Args:
        pipeline_fullname: full name of the pipeline whose files are listed.
        dict_field: either "input_to" (read from the BIDS directory) or
            "output_from" (read from the CAPS directory).

    Returns:
        list of matching file paths.

    Raises:
        ValueError: if *dict_field* is not a supported option.
    """
    dict_field_options = ["input_to", "output_from"]
    if dict_field not in dict_field_options:
        # BUG FIX: previously `raise (f"...")` tried to raise a plain string,
        # which is itself a TypeError at runtime.
        raise ValueError(f"dict_field must be one of {dict_field_options}")

    from clinica.utils.inputs import clinica_file_reader
    import clinica.utils.input_files as cif

    # Retrieve all the data dictionaries from the input_files module that
    # reference this pipeline under the requested field.
    files_dicts = {
        k: v
        for k, v in vars(cif).items()
        if isinstance(v, dict)
        and dict_field in v.keys()
        and pipeline_fullname in v[dict_field]
    }
    # TODO: check if bids or caps as output
    # Loop-invariant: the reference directory depends only on dict_field.
    ref_dir = (
        self.bids_directory if dict_field == "input_to" else self.caps_directory
    )
    ret_files = []
    for elem in files_dicts:
        current_file = clinica_file_reader(
            self.subjects,
            self.sessions,
            ref_dir,
            files_dicts[elem],
            raise_exception=False,
        )
        if current_file:
            ret_files.extend(current_file)

    return ret_files


def is_entity_tracked(prov_context: dict, entity_id: str) -> bool:
    """Return True if an entity with *entity_id* exists in the provenance context."""
    return any(record["@id"] == entity_id for record in prov_context["Entity"])


def is_agent_tracked(prov_context: dict, agent_id: str) -> bool:
    """Return True if an agent with *agent_id* exists in the provenance context."""
    return any(record["@id"] == agent_id for record in prov_context["Agent"])


def is_activity_tracked(prov_context: dict, activity_id: str) -> bool:
    """Return True if an activity with *activity_id* exists in the provenance context."""
    return any(record["@id"] == activity_id for record in prov_context["Activity"])


def get_entity_id(file_path: str) -> str:
    """Derive an entity id from *file_path*: the file name minus its last suffix."""
    from pathlib import Path

    # NOTE(review): only the FINAL suffix is stripped ("img.nii.gz" -> "img.nii"),
    # whereas get_associated_prov strips all suffixes — confirm this asymmetry
    # is intended.
    return Path(file_path).with_suffix("").name


def get_activity_id(pipeline_name: str) -> str:
    """Return the provenance id for a pipeline activity ("clin:" prefix)."""
    return f"clin:{pipeline_name}"


def get_agent_id(agent_name: str) -> str:
    """Return the provenance id for a software agent ("clin:" prefix)."""
    return f"clin:{agent_name}"


def get_last_activity(file_path: str) -> Optional[str]:

    """
    Return the "@id" of the last activity recorded for the file at *file_path*.

    Looks up the file's associated JSON-LD provenance record and takes the
    "@id" of the final entry in its "Activity" list. Returns None when the
    file has no provenance record or its "Activity" list is empty.
    (Return annotation fixed: an "@id" string is returned, not a list.)
    """

    prov_record = read_prov(get_associated_prov(file_path))
    if prov_record and prov_record["Activity"]:
        last_activity = prov_record["Activity"][-1]["@id"]
        return last_activity
    return None


def get_associated_prov(file_path: str) -> Path:
    """Return the path of the JSON-LD provenance file associated with *file_path*.

    Every suffix is stripped before appending ".jsonld"
    (e.g. "img.nii.gz" -> "img.jsonld").
    """
    stem = Path(file_path)
    while stem.suffix:
        stem = stem.with_suffix("")
    return stem.with_suffix(".jsonld")


def read_prov(prov_path: Path) -> Optional[dict]:
    """
    Load the JSON-LD provenance file at *prov_path*, or None if it does not exist.
    """
    import json

    # TODO: check that the provenance file associations and uses exists
    if not prov_path.exists():
        return None
    with open(prov_path, "r") as fp:
        return json.load(fp)
2 changes: 2 additions & 0 deletions clinica/pipelines/engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@

import click
from nipype.pipeline.engine import Workflow
import clinica.engine.provenance as prov


def postset(attribute, value):
Expand Down Expand Up @@ -236,6 +237,7 @@ def build(self):
self.build_output_node()
return self

@prov.provenance
def run(self, plugin=None, plugin_args=None, update_hash=False, bypass_check=False):
"""Executes the Pipeline.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ def build_input_node(self):
"*_T1w_segm-graymatter_space-Ixi549Space_modulated-on_probability.nii.gz",
),
"description": "graymatter tissue segmented in T1w MRI in Ixi549 space",
"needed_pipeline": "t1-volume-tissue-segmentation",
"output_from": "t1-volume-tissue-segmentation",
}
elif self.parameters["orig_input_data"] == "pet-volume":
if not (
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ def build_input_node(self):
{
"pattern": self.parameters["t_map"] + "*",
"description": "statistics t map",
"needed_pipeline": "statistics-volume",
"output_from": "statistics-volume",
},
)

Expand Down
Loading

0 comments on commit c34018d

Please sign in to comment.