Merge branch 'master' into requirements/rebuild_file
LoannPeurey authored Apr 9, 2024
2 parents e275d09 + 67525c1 commit dd924d6
Showing 22 changed files with 1,072 additions and 213 deletions.
3 changes: 1 addition & 2 deletions .gitignore
@@ -61,8 +61,7 @@ coverage.xml
#imported annotations for unit testing
examples/valid_raw_data/annotations/*/converted
examples/valid_raw_data/annotations/*/*/converted
-#imported annotation index for unit testing
-examples/valid_raw_data/metadata/annotations.csv
+!examples/valid_raw_data/annotations/vtc_present/converted

# Translations
*.mo
7 changes: 7 additions & 0 deletions CHANGELOG.md
@@ -4,8 +4,15 @@ All notable changes to this project will be documented in this file.

## [Unreleased]

### Fixed

- eaf_builder no longer replicates the subtree structure of recordings, and individual files are no longer placed in individual subfolders

## [0.1.2] 2024-03-14

### Added

- add the derived annotation pipeline
- audio conversion now has a 'standard' conversion pipeline that converts files to mono 16kHz pcm_s16le in one step (no need to convert channels and then the sampling rate separately, or to know the options; see the sketch below)
- add the simple_CTC metric to the list of available metrics

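A minimal sketch (not the pipeline's actual code) of what the 'standard' conversion amounts to, assuming ffmpeg is installed and on the PATH; file names are placeholders:

    import subprocess

    def to_standard(src: str, dst: str) -> None:
        # mono (-ac 1), 16 kHz (-ar 16000), signed 16-bit PCM (-c:a pcm_s16le)
        subprocess.run(
            ["ffmpeg", "-y", "-i", src, "-ac", "1", "-ar", "16000",
             "-c:a", "pcm_s16le", dst],
            check=True,
        )

    to_standard("recording.wav", "recording_standard.wav")
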
248 changes: 247 additions & 1 deletion ChildProject/annotations.py
@@ -11,6 +11,7 @@
import logging

from . import __version__
from .pipelines.derivations import DERIVATIONS
from .projects import ChildProject
from .converters import *
from .tables import IndexTable, IndexColumn, assert_dataframe, assert_columns_presence
@@ -458,7 +459,6 @@ def _check_for_outdated_merged_sets(self, sets: set = None):
                warnings.append("set {} is outdated because the {} set it is merged from was modified. Consider updating or rerunning the creation of the {} set.".format(i, j, i))

        return warnings

    def _import_annotation(
        self, import_function: Callable[[str], pd.DataFrame],
        params: dict,
@@ -712,6 +712,252 @@ def import_annotations(

        return (imported, errors)

    def _derive_annotation(
        self,
        annotation: dict,
        import_function: Callable,
        output_set: str,
        overwrite_existing: bool = False,
    ):
        """derive ``annotation`` and save the result into a new set. This function should not be called outside of this class.

        :param annotation: input annotation dictionary (attributes defined according to :ref:`ChildProject.annotations.AnnotationManager.SEGMENTS_COLUMNS`)
        :type annotation: dict
        :param import_function: derivation function to apply to the original set; either a python function or the name of a derivation stored in ``.pipelines.derivations.DERIVATIONS``
        :type import_function: Callable
        :param output_set: name of the new set of derived annotations
        :type output_set: str
        :param overwrite_existing: when True, lines with the same set and annotation_filename are re-derived and overwritten
        :type overwrite_existing: bool
        :return: output annotation dictionary (attributes defined according to :ref:`ChildProject.annotations.AnnotationManager.SEGMENTS_COLUMNS`)
        :rtype: dict
        """
        annotation_result = annotation.copy()
        annotation_result['set'] = output_set
        annotation_result['format'] = "NA"
        annotation_result['merged_from'] = annotation['set']

        source_recording = os.path.splitext(annotation["recording_filename"])[0]
        annotation_filename = "{}_{}_{}.csv".format(
            source_recording, annotation["range_onset"], annotation["range_offset"]
        )
        output_filename = os.path.join(
            "annotations", output_set, "converted", annotation_filename
        )

        # check if the annotation file already exists in the dataset (same filename and same set)
        if self.annotations[(self.annotations['set'] == output_set) &
                            (self.annotations['annotation_filename'] == annotation_filename)].shape[0] > 0:
            if overwrite_existing:
                logger_annotations.warning("Derived file %s will be overwritten", output_filename)
            else:
                logger_annotations.warning("File %s already exists. To overwrite, set parameter ``overwrite_existing`` to True", output_filename)
                return annotation_result

        # find annotation indexes in the same set that overlap the new annotation,
        # as the same audio stretch cannot be annotated multiple times within one set
        ovl_annots = self.annotations[(self.annotations['set'] == output_set) &
                                      # this condition avoids matching a line that should be overwritten
                                      # (same annotation_filename); it depends on the previous block!
                                      (self.annotations['annotation_filename'] != annotation_filename) &
                                      (self.annotations['recording_filename'] == annotation['recording_filename']) &
                                      (self.annotations['range_onset'] < annotation['range_offset']) &
                                      (self.annotations['range_offset'] > annotation['range_onset'])
                                      ]
        if ovl_annots.shape[0] > 0:
            array_tup = list(
                ovl_annots[['set', 'recording_filename', 'range_onset', 'range_offset']].itertuples(index=False, name=None))
            annotation_result["error"] = (
                f"derivation for set <{output_set}> recording <{annotation['recording_filename']}>"
                f" from {annotation['range_onset']} to {annotation['range_offset']} cannot continue because"
                f" it overlaps with these existing annotation lines: {array_tup}."
                " You could try removing the set entirely and trying again."
            )
            logger_annotations.error("Error: %s", annotation_result['error'])
            annotation_result["imported_at"] = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
            return annotation_result

        path = os.path.join(
            self.project.path,
            "annotations",
            annotation["set"],
            "converted",  # EXPAND
            annotation["annotation_filename"],
        )

        # TODO CHECK FOR DTYPES: enforcing dtypes of converted annotation files will become standard in a future update
        df_input = pd.read_csv(path)
        df = None

        def bad_derivation(annotation_dict, msg_err, error, path_file):
            # record the error on the annotation line and log it
            annotation_dict["error"] = error
            logger_annotations.error("An error occurred while processing '%s': %s", path_file, msg_err, exc_info=True)
            annotation_dict["imported_at"] = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
            return annotation_dict

        # get all the metadata into a single dictionary for the derivation function
        rec_dict = self.project.recordings[
            self.project.recordings['recording_filename'] == annotation['recording_filename']].iloc[0].to_dict()
        chi_dict = self.project.children[
            self.project.children['child_id'] == rec_dict['child_id']].iloc[0].to_dict()
        metadata = {**chi_dict, **rec_dict, **annotation}

        # apply the derivation to the annotation dataframe;
        # if the derivation raises an exception, stop the processing there and return the line
        try:
            df = import_function(self.project, metadata, df_input)
        except Exception as e:
            return bad_derivation(annotation_result, e, traceback.format_exc(), path)

        # if the derivation function did not return a dataframe, stop there and return the line
        if df is None or not isinstance(df, pd.DataFrame):
            msg = f"<{import_function}> did not return a pandas DataFrame"
            return bad_derivation(annotation_result, msg, msg, path)

        # if the derivation result does not contain the required annotation columns, stop there and return the line
        if not {c.name for c in self.SEGMENTS_COLUMNS if c.required}.issubset(df.columns):
            required = {c.name for c in self.SEGMENTS_COLUMNS if c.required}
            msg = f"DataFrame result of <{import_function}> function does not contain the required {required}"
            return bad_derivation(annotation_result, msg, msg, path)

        if not df.shape[1]:
            df = pd.DataFrame(columns=[c.name for c in self.SEGMENTS_COLUMNS])

        df["raw_filename"] = annotation["raw_filename"]

        df["segment_onset"] += np.int64(annotation["time_seek"])
        df["segment_offset"] += np.int64(annotation["time_seek"])
        df["segment_onset"] = df["segment_onset"].astype(np.int64)
        df["segment_offset"] = df["segment_offset"].astype(np.int64)

        annotation_result["time_seek"] = np.int64(annotation["time_seek"])
        annotation_result["range_onset"] = np.int64(annotation["range_onset"])
        annotation_result["range_offset"] = np.int64(annotation["range_offset"])

        df = AnnotationManager.clip_segments(
            df, annotation_result["range_onset"], annotation_result["range_offset"]
        )

        sort_columns = ["segment_onset", "segment_offset"]
        if "speaker_type" in df.columns:
            sort_columns.append("speaker_type")

        df.sort_values(sort_columns, inplace=True)

        os.makedirs(
            os.path.dirname(os.path.join(self.project.path, output_filename)),
            exist_ok=True,
        )
        df.to_csv(os.path.join(self.project.path, output_filename), index=False)

        annotation_result["annotation_filename"] = annotation_filename
        annotation_result["imported_at"] = datetime.datetime.now().strftime(
            "%Y-%m-%d %H:%M:%S"
        )
        annotation_result["package_version"] = __version__

        return annotation_result
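
    # Example (illustrative, not part of this commit): as called above via
    # ``import_function(self.project, metadata, df_input)``, a derivation function
    # receives the project, a metadata dict (child, recording and annotation
    # attributes combined), and the converted segments DataFrame, and must return
    # a DataFrame containing the required SEGMENTS_COLUMNS. A minimal,
    # hypothetical derivation:
    #
    #     def keep_vocalizations(project, metadata, segments):
    #         # keep only segments with a known speaker_type
    #         return segments[segments["speaker_type"].notna()].copy()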

    def derive_annotations(self,
                           input_set: str,
                           output_set: str,
                           derivation_function: Union[str, Callable],
                           threads: int = -1,
                           overwrite_existing: bool = False,
                           ) -> (pd.DataFrame, pd.DataFrame):
        """Derive annotations. From an existing set of annotations, create a new set whose content is derived
        from the original set.

        :param input_set: name of the set of annotations to be derived
        :type input_set: str
        :param output_set: name of the new set of derived annotations
        :type output_set: str
        :param derivation_function: derivation to be performed; either a callable or the name of a predefined derivation
        :type derivation_function: Union[str, Callable]
        :param threads: if > 1, derivations will be run on ``threads`` threads, defaults to -1
        :type threads: int, optional
        :param overwrite_existing: whether lines with the same set and annotation_filename should be overwritten
        :type overwrite_existing: bool, optional
        :return: tuple of the dataframe of derived annotations, as in :ref:`format-annotations`, and the dataframe of errors
        :rtype: tuple (pd.DataFrame, pd.DataFrame)
        """
        input_processed = self.annotations[self.annotations['set'] == input_set].copy()
        assert not input_processed.empty, (
            "Input set {0} does not exist, existing sets are in the 'set' column of annotations.csv".format(input_set))

        assert input_set != output_set, (
            "Input set {0} should be different from output set {1}".format(input_set, output_set))

        # check that the derivation function exists and is callable or predefined
        if not callable(derivation_function):
            if derivation_function in DERIVATIONS.keys():
                derivation_function = DERIVATIONS[derivation_function]
            else:
                raise ValueError(
                    "derivation value '{}' unknown, use one of {}".format(derivation_function, DERIVATIONS.keys())
                )

        if threads == 1:
            # apply the derivation function to each annotation file that needs to be derived (sequential)
            imported = input_processed.apply(
                partial(self._derive_annotation,
                        import_function=derivation_function,
                        output_set=output_set,
                        overwrite_existing=overwrite_existing
                        ), axis=1
            ).to_dict(orient="records")
        else:
            # apply the derivation function to each annotation file that needs to be derived (threaded)
            with mp.Pool(processes=threads if threads > 0 else mp.cpu_count()) as pool:
                imported = pool.map(
                    partial(self._derive_annotation,
                            import_function=derivation_function,
                            output_set=output_set,
                            overwrite_existing=overwrite_existing
                            ),
                    input_processed.to_dict(orient="records"),
                )

        imported = pd.DataFrame(imported)
        # drop additional columns that are not supposed to be kept in annotations.csv
        imported.drop(
            list(set(imported.columns) - {c.name for c in self.INDEX_COLUMNS}),
            axis=1,
            inplace=True,
        )

        # if at least one error occurred
        if 'error' in imported.columns:
            # separate importations that resulted in an error from successful ones
            errors = imported[~imported["error"].isnull()]
            imported = imported[imported["error"].isnull()]
            # when errors occur, write them to a separate csv in extra
            if errors.shape[0] > 0:
                output = os.path.join(self.project.path, "extra",
                                      "errors_derive_{}.csv".format(datetime.datetime.now().strftime("%Y%m%d-%H%M%S")))
                errors.to_csv(output, index=False)
                logger_annotations.info("Errors summary exported to %s", output)
        else:
            errors = None

        # add the new lines of derived annotations to the annotations.csv file
        self.read()
        self.annotations = pd.concat([self.annotations, imported], sort=False)
        # at this point, two lines with the same set and annotation_filename can exist if overwriting was requested;
        # dropping duplicates removes the first importation and keeps the more recent one
        self.annotations = self.annotations.sort_values('imported_at').drop_duplicates(
            subset=["set", "recording_filename", "range_onset", "range_offset"], keep='last')
        self.write()

        sets = set(input_processed['set'].unique())

        # this block should be executed every time we change annotations.csv;
        # it checks that sets that are derived or merged from others are not outdated
        # (i.e. that their source's importation is not more recent than the merge/derivation)
        outdated_sets = self._check_for_outdated_merged_sets(sets=sets)
        for warning in outdated_sets:
            logger_annotations.warning("warning: %s", warning)

        return imported, errors
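
    # Example (illustrative, not part of this commit): a hedged usage sketch of
    # derive_annotations(); the set and derivation names are placeholders.
    #
    #     am = AnnotationManager(project)
    #     imported, errors = am.derive_annotations(
    #         input_set="vtc",
    #         output_set="vtc/derived",
    #         derivation_function="conversations",  # a key of DERIVATIONS, or a callable
    #         threads=1,
    #     )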

    def get_subsets(self, annotation_set: str, recursive: bool = False) -> List[str]:
        """Retrieve the list of subsets belonging to a given set of annotations.
57 changes: 47 additions & 10 deletions ChildProject/cmdline.py
@@ -13,6 +13,8 @@

from ChildProject import __name__

from .pipelines.derivations import DERIVATIONS

import argparse
import os
import pandas as pd
@@ -234,20 +236,55 @@ def import_annotations(args):

    if errors_imp is not None and errors_imp.shape[0] > 0:
        logger.error('The importation failed for %d entry/ies', errors_imp.shape[0])
        logger.debug(errors_imp)

    if imported is not None and imported.shape[0] > 0:
        errors, warnings = am.validate(imported, threads=args.threads)

-       if len(am.errors) > 0:
-           logger.error(
-               "in the resulting annotations %s errors and %s warnings were found",
-               len(am.errors) + len(errors),
-               len(warnings),
-           )  # Is it right ?
-           logger.error("\n".join(am.errors))
-           logger.error("\n".join(errors))
-           logger.error("\n".join(warnings))
+       if len(errors) > 0:
+           logger.error(
+               "in the resulting annotations %s errors were found:\n%s",
+               len(errors),
+               "\n".join(errors),
+           )


@subcommand(
    [
        arg("source", help="project path"),
        arg("derivation", help="type of derivation to apply", type=str, choices=DERIVATIONS.keys()),
        arg("--input-set", "-i", help="input set", required=True, type=str),
        arg("--output-set", "-o", help="output set", required=True, type=str),
        arg("--threads", help="number of threads to run on", type=int, default=0),
        arg("--overwrite-existing", "--ow",
            help="overwrite existing annotation files when deriving (useful when re-importing); False by default",
            action='store_true'),
    ]
)
def derive_annotations(args):
    """derive a set of annotations"""

    project = ChildProject(args.source)

    perform_validation(project, require_success=True, ignore_recordings=True)

    am = AnnotationManager(project)
    imported, errors_der = am.derive_annotations(args.input_set, args.output_set, args.derivation, args.threads,
                                                 overwrite_existing=args.overwrite_existing)

    if errors_der is not None and errors_der.shape[0] > 0:
        logger.error('The derivation failed for %d entry/ies', errors_der.shape[0])
        logger.debug(errors_der)

    if imported is not None and imported.shape[0] > 0:
        errors, warnings = am.validate(imported, threads=args.threads)

        if len(errors) > 0:
            logger.error(
                "in the resulting annotations %s errors were found:\n%s",
                len(errors),
                "\n".join(errors),
            )
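
# Example (illustrative, not part of this commit): hypothetical CLI invocation,
# assuming the subcommand is exposed as ``derive-annotations`` like the other
# ChildProject commands; dataset path, derivation name and set names are
# placeholders:
#
#   child-project derive-annotations /path/to/dataset conversations \
#       --input-set vtc --output-set vtc/conversations --threads 4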


@subcommand(
    [
1 change: 1 addition & 0 deletions ChildProject/pipelines/__init__.py
@@ -0,0 +1 @@

