From 6d7c61492ca4b50778b6549ce19c741d88b13aa5 Mon Sep 17 00:00:00 2001 From: Matej Fedor Date: Thu, 22 Apr 2021 10:23:02 +0200 Subject: [PATCH] Create initial library version --- interface/__init__.py | 5 + interface/argument_parser.py | 23 +++++ interface/data_types.py | 57 +++++++++++ interface/input_reader.py | 35 +++++++ interface/output_formatter.py | 32 ++++++ interface/task_executor.py | 58 +++++++++++ source/__init__.py | 14 +++ source/argument_parser.py | 86 ++++++++++++++++ source/input_reader.py | 181 ++++++++++++++++++++++++++++++++++ source/output_formatter.py | 39 ++++++++ source/task_executor.py | 171 ++++++++++++++++++++++++++++++++ test/sample_app.py | 49 +++++++++ 12 files changed, 750 insertions(+) create mode 100644 interface/__init__.py create mode 100644 interface/argument_parser.py create mode 100644 interface/data_types.py create mode 100644 interface/input_reader.py create mode 100644 interface/output_formatter.py create mode 100644 interface/task_executor.py create mode 100644 source/__init__.py create mode 100644 source/argument_parser.py create mode 100644 source/input_reader.py create mode 100644 source/output_formatter.py create mode 100644 source/task_executor.py create mode 100644 test/sample_app.py diff --git a/interface/__init__.py b/interface/__init__.py new file mode 100644 index 0000000..91d4433 --- /dev/null +++ b/interface/__init__.py @@ -0,0 +1,5 @@ +from .data_types import Arguments, LibraryComponents, OutputData, FilePath, Path +from .argument_parser import ArgumentParserInterface +from .input_reader import InputReaderInterface +from .output_formatter import OutputFormatterInterface +from .task_executor import BasecallerInterface, TaskExecutorInterface diff --git a/interface/argument_parser.py b/interface/argument_parser.py new file mode 100644 index 0000000..dd5ecbc --- /dev/null +++ b/interface/argument_parser.py @@ -0,0 +1,23 @@ +""" +This module defines public interface of ArgumentParser class. +See test/sample_app.py for example usage. +""" + +from .data_types import Arguments, LibraryComponents + + +class ArgumentParserInterface: + + def parse_arguments(self) -> Arguments: + """ + Method parses user arguments and returns filled Arguments container + necessary for proper library integration. + """ + raise NotImplementedError + + def get_library_components(self) -> LibraryComponents: + """ + Method returns configured and initialized library components + ready for use by integrator. + """ + raise NotImplementedError diff --git a/interface/data_types.py b/interface/data_types.py new file mode 100644 index 0000000..5e29650 --- /dev/null +++ b/interface/data_types.py @@ -0,0 +1,57 @@ +""" +This module defines public library data containers and type aliases. +See test/sample_app.py for example usage. +""" + +from __future__ import annotations +from typing import NamedTuple + + +class OutputData(NamedTuple): + """ + Class is a container for basecaller output. + All fields are mandatory and filled by this library, + so MinKnow-like output can be provided. + """ + + read_id: str + run_id: str + read_number: int + channel_number: str + start_time: str + basecalled_sequence: str + quality_scores: str + + +class LibraryComponents(NamedTuple): + """ + Class is a container for class objects executing + library functionality. + Library components can be obtained from ArgumentParser object. + Components are always returned initialized and ready for use + (i.e. their initialize() method has been called). + """ + + input_reader: InputReaderInterface + task_executor: TaskExecutorInterface + output_formatter: OutputFormatterInterface + + +class Arguments(NamedTuple): + """ + Class is a container for parsed user arguments unrelated + to configuration of library compnents. + Those arguments are rather relevant for configuration + on integrator side (i.e. basecaller object configuration). + Arguments can be obtained from ArgumentParser object. + """ + + watch_directory: bool + network_type: str + weights_path: FilePath + beam_size: int + beam_cut_threshold: float + + +FilePath = str +Path = str diff --git a/interface/input_reader.py b/interface/input_reader.py new file mode 100644 index 0000000..1f9ab3b --- /dev/null +++ b/interface/input_reader.py @@ -0,0 +1,35 @@ +""" +This module defines public interface of InputReader class family. +See test/sample_app.py for example usage. +""" + +from typing import List + +from .data_types import FilePath, Path + + +class InputReaderInterface: + + def __init__(self, input_directories: List[Path], input_files: List[FilePath]) -> None: + + self.input_directories = input_directories + self.input_files = input_files + self.task_batch = None + + def initialize(self) -> None: + """ + Method initializes class object. + This method is always called by ArgumentParser and should NOT be + called by integrator. + """ + raise NotImplementedError + + def get_next_batch(self) -> List[FilePath]: + """ + Method returns next batch of files ready to be processed + by task executor. + This method can be called as many times as necessary on + DirectoryWatcher family, but should be called only ONCE + on basic DirectoryReaders. + """ + raise NotImplementedError diff --git a/interface/output_formatter.py b/interface/output_formatter.py new file mode 100644 index 0000000..7c21dde --- /dev/null +++ b/interface/output_formatter.py @@ -0,0 +1,32 @@ +""" +This module defines public interface for OutputFormatter class family. +See test/sample_app.py for example usage. +""" + +import gzip +from typing import List + +from .data_types import OutputData + + +class OutputFormatterInterface: + + def __init__(self, output_name: str, compressed_output: bool) -> None: + + self.compressed_output = compressed_output + + if self.compressed_output: + self.output_stream = gzip.open(output_name, "wt") + else: + self.output_stream = open(output_name, "w") + + def __del__(self) -> None: + self.output_stream.close() + + def write_output(self, output_data: List[OutputData]) -> None: + """ + Method implements printing of output batch in various formats. + Output is flushed immediately and fsync is called after + every batch is printed. + """ + raise NotImplementedError diff --git a/interface/task_executor.py b/interface/task_executor.py new file mode 100644 index 0000000..a8a1914 --- /dev/null +++ b/interface/task_executor.py @@ -0,0 +1,58 @@ +""" +This module defines public interface from TaskExecutor class family. +Also defines interface that basecaller dependency injection MUST implement. +See test/sample_app.py for example usage. +""" + +from dataclasses import dataclass +from typing import List, Tuple + +import numpy as np + +from .data_types import FilePath, OutputData + +@dataclass +class BasecallerInterface: + """ + Task executors in this library can use any injected + basecaller object that implements following interface. + No default basecaller is available. + """ + + def call_raw_signal(self, signal: np.ndarray) -> Tuple[str, str]: + """ + Method implements signal basecalling. Takes one argument + of numpy.ndarray of numpy.float32 values. + Please note that input signal is already normalized by library. + This can be discussed in the future (see source/task_executor.py). + + Method returns a tuple of strings. Those are basecalled sequence + and quality scores (as defined in fastq file format) respectively. + """ + raise NotImplementedError + + +class TaskExecutorInterface: + + def __init__(self) -> None: + self.caller = None + + def set_caller(self, caller: BasecallerInterface) -> None: + """ + Method lets integrator to inject a custom basecaller object and + verifies at least partially whether it implements required interface. + """ + + obj_type = type(caller) + + if hasattr(obj_type, 'call_raw_signal') and callable(obj_type.call_raw_signal): + self.caller = caller + else: + raise ValueError('Caller object does NOT implement BasecallerInterface!') + + def execute_task_batch(self, tasks: List[FilePath]) -> List[OutputData]: + """ + Method performs basecalling using custom basecaller object on all files in task batch. + Returns list of filled OutputData objects ready for output formatting. + """ + raise NotImplementedError diff --git a/source/__init__.py b/source/__init__.py new file mode 100644 index 0000000..a07f6e9 --- /dev/null +++ b/source/__init__.py @@ -0,0 +1,14 @@ +from sys import platform + +from .input_reader import DirectoryReader + +if platform == 'linux': + from .input_reader import DirectoryWatcherLinux as DirectoryWatcher +elif platform == 'win32': + from .input_reader import DirectoryWatcherWindows as DirectoryWatcher +elif platform == 'darwin': + from .input_reader import DirectoryWatcherDarwin as DirectoryWatcher + +from .output_formatter import OutputFormatterFasta, OutputFormatterFastq +from .task_executor import SequentialTaskExecutor, ParallelTaskExecutor +from .argument_parser import ArgumentParser diff --git a/source/argument_parser.py b/source/argument_parser.py new file mode 100644 index 0000000..fef3e63 --- /dev/null +++ b/source/argument_parser.py @@ -0,0 +1,86 @@ +import argparse + +from interface import (ArgumentParserInterface, InputReaderInterface, OutputFormatterInterface, + TaskExecutorInterface, Arguments, LibraryComponents) + +from source import (DirectoryReader, DirectoryWatcher, OutputFormatterFasta, + OutputFormatterFastq, SequentialTaskExecutor, ParallelTaskExecutor) + + +class ArgumentParser(ArgumentParserInterface): + + def __init__(self) -> None: + + self.parser = argparse.ArgumentParser(description='Fast caller for ONT reads') + self.library_components = None + + def parse_arguments(self) -> Arguments: + + self.parser.add_argument('--directory', type=str, nargs='*', + help='One or more directories with reads') + self.parser.add_argument('--watch-directory', action='store_true', default=False, + help='Watch directories for new reads') + self.parser.add_argument('--reads', type=str, nargs='*', + help='One or more read files') + + self.parser.add_argument("--cores", type=int, default=1, + help="Number of cores available for basecalling, defaults to 1") + + self.parser.add_argument("--output", type=str, required=True, + help="Output FASTA/FASTQ file name") + self.parser.add_argument("--output-format", choices=["fasta", "fastq"], default="fasta") + self.parser.add_argument("--gzip-output", action="store_true", + help="Compress output with gzip") + + self.parser.add_argument("--weights", type=str, default=None, + help="Path to network weights; only used for custom weights") + self.parser.add_argument("--network-type", choices=["48", "56", "64", "80", "96", "256"], default="48", + help="Size of network. Default 48") + self.parser.add_argument("--beam-size", type=int, default=None, + help="Beam size (defaults 5 for 48,56,64,80,96 and 20 for 256). Use 1 for greedy decoding.") + self.parser.add_argument("--beam-cut-threshold", type=float, default=None, + help="Threshold for creating beams (higher means faster beam search, but smaller accuracy). \ + Values higher than 0.2 might lead to weird errors. Default 0.1 for 48,...,96 and 0.0001 for 256") + + arguments = self.parser.parse_args() + + input_reader = _initialize_input_reader(arguments) + task_executor = _initialize_task_executor(arguments) + output_formatter = _initialize_output_formatter(arguments) + + self.library_components = LibraryComponents(input_reader, task_executor, output_formatter) + + return Arguments(arguments.watch_directory, arguments.network_type, arguments.weights, + arguments.beam_size, arguments.beam_cut_threshold) + + def get_library_components(self) -> LibraryComponents: + return self.library_components + + +def _initialize_input_reader(arguments: argparse.Namespace) -> InputReaderInterface: + + if arguments.watch_directory: + watcher = DirectoryWatcher(arguments.directory, arguments.reads) + else: + watcher = DirectoryReader(arguments.directory, arguments.reads) + + watcher.initialize() + return watcher + + +def _initialize_task_executor(arguments: argparse.Namespace) -> TaskExecutorInterface: + + if arguments.cores <= 1: + return SequentialTaskExecutor() + + return ParallelTaskExecutor(arguments.cores) + + +def _initialize_output_formatter(arguments: argparse.Namespace) -> OutputFormatterInterface: + + if arguments.output_format == 'fasta': + return OutputFormatterFasta(arguments.output, arguments.gzip_output) + if arguments.output_format == 'fastq': + return OutputFormatterFastq(arguments.output, arguments.gzip_output) + + return None diff --git a/source/input_reader.py b/source/input_reader.py new file mode 100644 index 0000000..dbfb0c3 --- /dev/null +++ b/source/input_reader.py @@ -0,0 +1,181 @@ +from multiprocessing import Pool +import os +import re +from sys import platform +from typing import NamedTuple, List, Tuple + +if platform == 'linux': + import inotify.adapters + import inotify.constants +elif platform == 'win32': + from win32file import CreateFile, ReadDirectoryChangesW + from win32security import SECURITY_ATTRIBUTES + from win32con import (FILE_SHARE_DELETE, FILE_SHARE_READ, FILE_SHARE_WRITE, OPEN_EXISTING, + FILE_FLAG_BACKUP_SEMANTICS, FILE_NOTIFY_CHANGE_FILE_NAME) + +from interface import InputReaderInterface, FilePath, Path + + +class DirectoryReader(InputReaderInterface): + + def initialize(self) -> None: + self.task_batch, _ = _process_inputs(self.input_directories, self.input_files) + + def get_next_batch(self) -> List[FilePath]: + return self.task_batch + + +if platform == 'linux': + + class DirectoryWatcherLinux(InputReaderInterface): + + def __init__(self, input_directories: List[Path], input_files: List[FilePath]) -> None: + + self.notifier = inotify.adapters.Inotify(block_duration_s=None) + super().__init__(input_directories, input_files) + + def _fast5_filter_predicate(self, _: str, event: Tuple[str]) -> bool: + + (_, _, _, filename) = event + return _is_fast5_file(filename) + + def initialize(self) -> None: + + self.task_batch, directories = _process_inputs(self.input_directories, self.input_files) + + for directory in directories: + self.notifier.add_watch(directory, inotify.constants.IN_CREATE | inotify.constants.IN_MOVED_TO) + + def get_next_batch(self) -> List[FilePath]: + + task_batch_local = [] + + if self.task_batch: + task_batch_local.extend(self.task_batch) + self.task_batch.clear() + + while True: + + for event in self.notifier.event_gen(filter_predicate=self._fast5_filter_predicate): + + if event is None: + break + + (_, _, path, filename) = event + task_batch_local.append(os.path.join(path, filename)) + + if task_batch_local: + return task_batch_local + + +elif platform == 'win32': + + class DirectoryWatcherWindows(InputReaderInterface): + + DirectoryEvents = Tuple[List[Tuple[str]], FilePath] + + class DirectoryHandle(NamedTuple): + + handle: PyHANDLE + directory_path: FilePath + + + def __init__(self, input_directories: List[Path], input_files: List[FilePath]) -> None: + + self.directory_handles = [] + self.pool = None + super().__init__(input_directories, input_files) + + def __del__(self) -> None: + + for directory_handle in self.directory_handles: + directory_handle.handle.Close() + + self.pool.terminate() + + def _get_directory_handle(self, directory_path: FilePath) -> self.DirectoryHandle: + + FILE_LIST_DIRECTORY = 0x0001 + + security_attributes = SECURITY_ATTRIBUTES() + security_attributes.bInheritHandle = True + + handle = CreateFile( + directory_path, + FILE_LIST_DIRECTORY, + FILE_SHARE_READ | FILE_SHARE_WRITE | FILE_SHARE_DELETE, + security_attributes, + OPEN_EXISTING, + FILE_FLAG_BACKUP_SEMANTICS, + None) + + return self.DirectoryHandle(handle, directory_path) + + def _get_events(self, directory_handle: self.DirectoryHandle) -> self.DirectoryEvents: + + BUFFER_SIZE = 65536 + + events = ReadDirectoryChangesW( + directory_handle.handle, + BUFFER_SIZE, + False, + FILE_NOTIFY_CHANGE_FILE_NAME, + None, + None) + + return events, directory_handle.directory_path + + def initialize(self) -> None: + + self.task_batch, directories = _process_inputs(self.input_directories, self.input_files) + + for directory in directories: + self.directory_handles.append(self._get_directory_handle(directory)) + + self.pool = Pool(len(self.directory_handles)) + + def get_next_batch(self) -> List[FilePath]: + + task_batch_local = [] + + if self.task_batch: + task_batch_local.extend(self.task_batch) + self.task_batch.clear() + + while True: + + for events, directory_path in self.pool.imap_unordered( + self._get_events, + self.directory_handles): + + for _, filename in events: + if _is_fast5_file(filename): + task_batch_local.append(os.path.join(directory_path, filename)) + + return task_batch_local + + +elif platform == 'darwin': + + class DirectoryWatcherDarwin(InputReaderInterface): + + def initialize(self) -> None: + raise NotImplementedError + + def get_next_batch(self) -> List[FilePath]: + raise NotImplementedError + + +def _is_fast5_file(filename: str) -> bool: + return re.search('.fast5$', filename) is not None + + +def _process_inputs(input_directories: List[Path], input_files: List[FilePath]) -> Tuple[List[FilePath], List[Path]]: + + task_batch = [file for file in input_files or [] if os.path.isfile(file) and _is_fast5_file(file)] + directories = [dirc for dirc in input_directories or [] if os.path.isdir(dirc)] + + for dirc in directories: + task_batch += [os.path.join(dirc, file) for file in os.listdir(dirc) if _is_fast5_file(file)] + + return task_batch, directories diff --git a/source/output_formatter.py b/source/output_formatter.py new file mode 100644 index 0000000..60a1876 --- /dev/null +++ b/source/output_formatter.py @@ -0,0 +1,39 @@ +from os import fsync +from typing import List + +from interface import OutputFormatterInterface, OutputData + + +class OutputFormatterFasta(OutputFormatterInterface): + + def write_output(self, output_data: List[OutputData]) -> None: + + for output in output_data: + + (read_id, _, _, _, _, basecalled_seq, _) = output + + if len(basecalled_seq) == 0: + return + + print(">%s" % read_id, file=self.output_stream) + print(basecalled_seq, file=self.output_stream, flush=True) + + fsync(self.output_stream.fileno()) + + +class OutputFormatterFastq(OutputFormatterInterface): + + def write_output(self, output_data: List[OutputData]) -> None: + + for output in output_data: + + (read_id, run_id, read_num, channel_num, start_time, basecalled_seq, quality_scores) = output + + print("@%s runid=%s read=%d ch=%s start_time=%s" + % (read_id, run_id, read_num, channel_num, start_time), file=self.output_stream) + + print(basecalled_seq, file=self.output_stream) + print("+", file=self.output_stream) + print(quality_scores, file=self.output_stream, flush=True) + + fsync(self.output_stream.fileno()) diff --git a/source/task_executor.py b/source/task_executor.py new file mode 100644 index 0000000..eac9f44 --- /dev/null +++ b/source/task_executor.py @@ -0,0 +1,171 @@ +import datetime +from multiprocessing import Pool +from typing import Generator, List, NamedTuple, Tuple + +import numpy as np +from ont_fast5_api.fast5_interface import get_fast5_file, check_file_type +from ont_fast5_api.fast5_read import Fast5Read + +from interface import BasecallerInterface, TaskExecutorInterface, FilePath, OutputData + + +class CallInputData(NamedTuple): + """ + Class is a container of input data for _call_read() function. + Design is motivated by the fact that ONT Fast5 API's Fast5Read + objects cannot be serialized through communication pipes with CPU workers. + This data is enough for worker to identify and load its input + data from fast5 file while avoiding costly transmision of raw data itself. + """ + + read_id: str + filename: str + caller: BasecallerInterface + + +class SequentialTaskExecutor(TaskExecutorInterface): + + def execute_task_batch(self, tasks: List[FilePath]) -> List[OutputData]: + + if self.caller is None: + raise ValueError('Caller object is NOT set!') + + output_list = [] + + try: + for task in tasks: + with get_fast5_file(task, mode='r') as fast5_file: + + for read_id in fast5_file.get_read_ids(): + output_data = _call_read(CallInputData(read_id, task, self.caller)) + output_list.append(output_data) + + except OSError as excp: + _print_exception_warning(str(excp)) + return output_list + + return output_list + + +class ParallelTaskExecutor(TaskExecutorInterface): + + def __init__(self, cores: int) -> None: + + self.pool = Pool(cores) + self.chunk_size = 100 + super().__init__() + + def __del__(self) -> None: + self.pool.terminate() + + def _get_call_inputs_wrapper(self, fast5_list) -> Generator[CallInputData, None, None]: + + for fast5_file in fast5_list: + for read_id in fast5_file.get_read_ids(): + yield CallInputData(read_id, fast5_file.filename, self.caller) + + def execute_task_batch(self, tasks: List[FilePath]) -> List[OutputData]: + """ + Method performs parallel basecalling of files from task batch. + If multifast5 file is being processed, reads of that file are distributed + to CPU workers by chunks of size = chunk_size and are basecalled in parallel. + If singlefast5 file is processed, it is appended to list with other + singlefast5 files and then files themselves are distributed to CPU workers + by chunks of size = chunk_size and are basecalled in parallel. + """ + + if self.caller is None: + raise ValueError('Caller object is NOT set!') + + output_list = [] + single_fast5_list = [] + + for task in tasks: + try: + with get_fast5_file(task, mode='r') as fast5_file: + file_type = check_file_type(fast5_file) + + if file_type == 'single-read': + single_fast5_list.append(fast5_file) + if file_type == 'multi-read': + for output in self.pool.imap_unordered( + _call_read, + self._get_call_inputs_wrapper([fast5_file]), + self.chunk_size): + output_list.append(output) + + except OSError as excp: + _print_exception_warning(str(excp)) + return output_list + + try: + for output in self.pool.imap_unordered( + _call_read, + self._get_call_inputs_wrapper(single_fast5_list), + self.chunk_size): + output_list.append(output) + + except OSError as excp: + _print_exception_warning(str(excp)) + return output_list + + return output_list + + +def _print_exception_warning(exception_str: str) -> None: + + print(exception_str) + print('Error encountered! Task execution may be incomplete!') + + +def _med_mad(signal: np.ndarray, factor=1.4826) -> Tuple[np.ndarray, np.ndarray]: + """ + Calculate signal median and median absolute deviation. + """ + + med = np.median(signal) + mad = np.median(np.absolute(signal - med)) * factor + return med, mad + + +def _rescale_signal(signal: np.ndarray) -> np.ndarray: + + signal = signal.astype(np.float32) + med, mad = _med_mad(signal) + signal -= med + signal /= mad + return signal + + +def _add_time_seconds(base_time_str: str, delta_seconds: int) -> str: + + base_time = datetime.datetime.strptime(base_time_str, '%Y-%m-%dT%H:%M:%SZ') + base_time += datetime.timedelta(seconds=delta_seconds) + return base_time.strftime('%Y-%m-%dT%H:%M:%SZ') + + +def _call_read(input_data: CallInputData) -> OutputData: + + (read_id, filename, caller) = input_data + + with get_fast5_file(filename, mode='r') as fast5_file: + + file_type = check_file_type(fast5_file) + read = Fast5Read(fast5_file, read_id) if file_type == 'multi-read' else fast5_file + + run_id = read.run_id.decode('utf-8') + read_number = read.handle['Raw'].attrs['read_number'] if file_type == 'multi-read' else read.status.read_info[0].read_number + start_time = read.handle['Raw'].attrs['start_time'] if file_type == 'multi-read' else read.status.read_info[0].start_time + channel_number = read.handle[read.global_key + 'channel_id'].attrs['channel_number'].decode('utf-8') + + sampling_rate = read.handle[read.global_key + 'channel_id'].attrs['sampling_rate'] + exp_start_time = read.handle[read.global_key + 'tracking_id'].attrs['exp_start_time'].decode('utf-8') + + start_time = _add_time_seconds(exp_start_time, start_time / sampling_rate) + + signal = read.get_raw_data() + signal = _rescale_signal(signal) + + basecalled_seq, quality_scores = caller.call_raw_signal(signal) + + return OutputData(read_id, run_id, read_number, channel_number, start_time, basecalled_seq, quality_scores) diff --git a/test/sample_app.py b/test/sample_app.py new file mode 100644 index 0000000..fd995c7 --- /dev/null +++ b/test/sample_app.py @@ -0,0 +1,49 @@ +""" +This module provides example integration of nanobeak library. +Also provides an easy way of testing library functionality during its development. +""" + +from sys import path +path[0] += ('/../') + +from typing import Tuple + +from interface import BasecallerInterface +from source import ArgumentParser + + +class BasecallerMock(BasecallerInterface): + + def call_raw_signal(self, signal) -> Tuple[str, str]: + + basecalled_sequence = 'ACGT' + quality_scores = 'Quite a score!' + + return basecalled_sequence, quality_scores + + +def basecall_input() -> None: + + task_batch = components.input_reader.get_next_batch() + output_data = components.task_executor.execute_task_batch(task_batch) + components.output_formatter.write_output(output_data) + + +def basecall_watched_input() -> None: + while True: + basecall_input() + + +if __name__ == '__main__': + + arg_parser = ArgumentParser() + + arguments = arg_parser.parse_arguments() + components = arg_parser.get_library_components() + + components.task_executor.set_caller(BasecallerMock()) + + if arguments.watch_directory: + basecall_watched_input() + else: + basecall_input()