-
Notifications
You must be signed in to change notification settings - Fork 0
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Create initial library version #1
base: master
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,5 @@ | ||
from .data_types import Arguments, LibraryComponents, OutputData, FilePath, Path | ||
from .argument_parser import ArgumentParserInterface | ||
from .input_reader import InputReaderInterface | ||
from .output_formatter import OutputFormatterInterface | ||
from .task_executor import BasecallerInterface, TaskExecutorInterface |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,23 @@ | ||
""" | ||
This module defines public interface of ArgumentParser class. | ||
See test/sample_app.py for example usage. | ||
""" | ||
|
||
from .data_types import Arguments, LibraryComponents | ||
|
||
|
||
class ArgumentParserInterface: | ||
|
||
def parse_arguments(self) -> Arguments: | ||
""" | ||
Method parses user arguments and returns filled Arguments container | ||
necessary for proper library integration. | ||
""" | ||
raise NotImplementedError | ||
|
||
def get_library_components(self) -> LibraryComponents: | ||
""" | ||
Method returns configured and initialized library components | ||
ready for use by integrator. | ||
""" | ||
raise NotImplementedError |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,57 @@ | ||
""" | ||
This module defines public library data containers and type aliases. | ||
See test/sample_app.py for example usage. | ||
""" | ||
|
||
from __future__ import annotations | ||
from typing import NamedTuple | ||
|
||
|
||
class OutputData(NamedTuple): | ||
""" | ||
Class is a container for basecaller output. | ||
All fields are mandatory and filled by this library, | ||
so MinKnow-like output can be provided. | ||
""" | ||
|
||
read_id: str | ||
run_id: str | ||
read_number: int | ||
channel_number: str | ||
start_time: str | ||
basecalled_sequence: str | ||
quality_scores: str | ||
|
||
|
||
class LibraryComponents(NamedTuple): | ||
""" | ||
Class is a container for class objects executing | ||
library functionality. | ||
Library components can be obtained from ArgumentParser object. | ||
Components are always returned initialized and ready for use | ||
(i.e. their initialize() method has been called). | ||
""" | ||
|
||
input_reader: InputReaderInterface | ||
task_executor: TaskExecutorInterface | ||
output_formatter: OutputFormatterInterface | ||
|
||
|
||
class Arguments(NamedTuple): | ||
""" | ||
Class is a container for parsed user arguments unrelated | ||
to configuration of library compnents. | ||
Those arguments are rather relevant for configuration | ||
on integrator side (i.e. basecaller object configuration). | ||
Arguments can be obtained from ArgumentParser object. | ||
""" | ||
|
||
watch_directory: bool | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. confusing name. From the name itself I would expect this to contain string/path |
||
network_type: str | ||
weights_path: FilePath | ||
beam_size: int | ||
beam_cut_threshold: float | ||
|
||
|
||
FilePath = str | ||
Path = str |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,35 @@ | ||
""" | ||
This module defines public interface of InputReader class family. | ||
See test/sample_app.py for example usage. | ||
""" | ||
|
||
from typing import List | ||
|
||
from .data_types import FilePath, Path | ||
|
||
|
||
class InputReaderInterface: | ||
|
||
def __init__(self, input_directories: List[Path], input_files: List[FilePath]) -> None: | ||
|
||
self.input_directories = input_directories | ||
self.input_files = input_files | ||
self.task_batch = None | ||
|
||
def initialize(self) -> None: | ||
""" | ||
Method initializes class object. | ||
This method is always called by ArgumentParser and should NOT be | ||
called by integrator. | ||
""" | ||
raise NotImplementedError | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. why not @AbstractMethod ? |
||
|
||
def get_next_batch(self) -> List[FilePath]: | ||
""" | ||
Method returns next batch of files ready to be processed | ||
by task executor. | ||
This method can be called as many times as necessary on | ||
DirectoryWatcher family, but should be called only ONCE | ||
on basic DirectoryReaders. | ||
""" | ||
raise NotImplementedError |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,32 @@ | ||
""" | ||
This module defines public interface for OutputFormatter class family. | ||
See test/sample_app.py for example usage. | ||
""" | ||
|
||
import gzip | ||
from typing import List | ||
|
||
from .data_types import OutputData | ||
|
||
|
||
class OutputFormatterInterface: | ||
|
||
def __init__(self, output_name: str, compressed_output: bool) -> None: | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is exact OOP antipattern. |
||
|
||
self.compressed_output = compressed_output | ||
|
||
if self.compressed_output: | ||
self.output_stream = gzip.open(output_name, "wt") | ||
else: | ||
self.output_stream = open(output_name, "w") | ||
|
||
def __del__(self) -> None: | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. del does not work correctly, if you want RAII style, you have to use "with" statement and context managers |
||
self.output_stream.close() | ||
|
||
def write_output(self, output_data: List[OutputData]) -> None: | ||
""" | ||
Method implements printing of output batch in various formats. | ||
Output is flushed immediately and fsync is called after | ||
every batch is printed. | ||
""" | ||
raise NotImplementedError |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,58 @@ | ||
""" | ||
This module defines public interface from TaskExecutor class family. | ||
Also defines interface that basecaller dependency injection MUST implement. | ||
See test/sample_app.py for example usage. | ||
""" | ||
|
||
from dataclasses import dataclass | ||
from typing import List, Tuple | ||
|
||
import numpy as np | ||
|
||
from .data_types import FilePath, OutputData | ||
|
||
@dataclass | ||
class BasecallerInterface: | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. How can dataclass be an interface? That make absolutely zero sense |
||
""" | ||
Task executors in this library can use any injected | ||
basecaller object that implements following interface. | ||
No default basecaller is available. | ||
""" | ||
|
||
def call_raw_signal(self, signal: np.ndarray) -> Tuple[str, str]: | ||
""" | ||
Method implements signal basecalling. Takes one argument | ||
of numpy.ndarray of numpy.float32 values. | ||
Please note that input signal is already normalized by library. | ||
This can be discussed in the future (see source/task_executor.py). | ||
|
||
Method returns a tuple of strings. Those are basecalled sequence | ||
and quality scores (as defined in fastq file format) respectively. | ||
""" | ||
raise NotImplementedError | ||
|
||
|
||
class TaskExecutorInterface: | ||
|
||
def __init__(self) -> None: | ||
self.caller = None | ||
|
||
def set_caller(self, caller: BasecallerInterface) -> None: | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. shouldn't it rather take caller as constructor param? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Well, I'm not sure this is the best design, but I couldn't do better. caller is an external dependency. This library should be able to utilize any basecaller as long as it implements the There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Well, that tells you that your whole design is off and has cross-cutting concerns. For example, you can have There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. That is true. But once I have There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Hmm, ok, now I see this. But then I don't understand how this works if you set caller after constructing the pool -- once pool is created, I would assume workers are already spawned and it would be too late to assign caller in the parent process There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. THAT is an excellent question. Haven't thought about it that way. |
||
""" | ||
Method lets integrator to inject a custom basecaller object and | ||
verifies at least partially whether it implements required interface. | ||
""" | ||
|
||
obj_type = type(caller) | ||
|
||
if hasattr(obj_type, 'call_raw_signal') and callable(obj_type.call_raw_signal): | ||
self.caller = caller | ||
else: | ||
raise ValueError('Caller object does NOT implement BasecallerInterface!') | ||
|
||
def execute_task_batch(self, tasks: List[FilePath]) -> List[OutputData]: | ||
""" | ||
Method performs basecalling using custom basecaller object on all files in task batch. | ||
Returns list of filled OutputData objects ready for output formatting. | ||
""" | ||
raise NotImplementedError |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,14 @@ | ||
from sys import platform | ||
|
||
from .input_reader import DirectoryReader | ||
|
||
if platform == 'linux': | ||
from .input_reader import DirectoryWatcherLinux as DirectoryWatcher | ||
elif platform == 'win32': | ||
from .input_reader import DirectoryWatcherWindows as DirectoryWatcher | ||
elif platform == 'darwin': | ||
from .input_reader import DirectoryWatcherDarwin as DirectoryWatcher | ||
|
||
from .output_formatter import OutputFormatterFasta, OutputFormatterFastq | ||
from .task_executor import SequentialTaskExecutor, ParallelTaskExecutor | ||
from .argument_parser import ArgumentParser |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,86 @@ | ||
import argparse | ||
|
||
from interface import (ArgumentParserInterface, InputReaderInterface, OutputFormatterInterface, | ||
TaskExecutorInterface, Arguments, LibraryComponents) | ||
|
||
from source import (DirectoryReader, DirectoryWatcher, OutputFormatterFasta, | ||
OutputFormatterFastq, SequentialTaskExecutor, ParallelTaskExecutor) | ||
|
||
|
||
class ArgumentParser(ArgumentParserInterface): | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I see no reason for this to be class if it's only interaction is create it & then call parse_arguments once. I would make it just a function There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Well, it also initializes LibraryComponents class. Instead of returning it all I figured, it would be more understandable, if I designed it as a class. Also, if someone else will want to use that library in a future and he will need to implement his own ArgumentParser, he can simply create a subclass of ArgumentParserInterface and generally just use the existing code infrastructure. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I would suggest using against patterns like subclass. OOP, as done by many, is done wrong. In most cases, functional design fares way better in terms of maintainability, clarity and testability. Perhaps checou out some Sandi Metz presentations, they are usually good inspiration There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I agree that this design decision is just silly. But I have a question. I don't like about python libraries that internal code is just mixed with API methods and the only distinction between them is just an underscore at the beginning of a function name. I really liked here that all of the implementation is separated from the library interface and it's clear for a new-comer what the interface looks like and what is the intended usage without spending too much time on trying to understand what the f*** it does. Just look at that: If I use a functional design for this component, is there any preferred pythonic way to declare but not implement functions a thus create pretty interface? Or should I just create a commented document describing the functionality in interface folder? Maybe it's not that necessary in this small component, but I would like to know for a future reference. Thanks! There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Can we also discuss this one @ppershing ? |
||
|
||
def __init__(self) -> None: | ||
|
||
self.parser = argparse.ArgumentParser(description='Fast caller for ONT reads') | ||
self.library_components = None | ||
|
||
def parse_arguments(self) -> Arguments: | ||
|
||
self.parser.add_argument('--directory', type=str, nargs='*', | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. this is weird that you add arguments to parser only in parse_args There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I'm not sure I understand what you mean. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. right now, parse_arguments is not idempotent despite it's name suggesting that it does not mutate anything. Try to run it twice and see the fireworks happen ... There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Functional design should fix this one. |
||
help='One or more directories with reads') | ||
self.parser.add_argument('--watch-directory', action='store_true', default=False, | ||
help='Watch directories for new reads') | ||
self.parser.add_argument('--reads', type=str, nargs='*', | ||
help='One or more read files') | ||
|
||
self.parser.add_argument("--cores", type=int, default=1, | ||
help="Number of cores available for basecalling, defaults to 1") | ||
|
||
self.parser.add_argument("--output", type=str, required=True, | ||
help="Output FASTA/FASTQ file name") | ||
self.parser.add_argument("--output-format", choices=["fasta", "fastq"], default="fasta") | ||
self.parser.add_argument("--gzip-output", action="store_true", | ||
help="Compress output with gzip") | ||
|
||
self.parser.add_argument("--weights", type=str, default=None, | ||
help="Path to network weights; only used for custom weights") | ||
self.parser.add_argument("--network-type", choices=["48", "56", "64", "80", "96", "256"], default="48", | ||
help="Size of network. Default 48") | ||
self.parser.add_argument("--beam-size", type=int, default=None, | ||
help="Beam size (defaults 5 for 48,56,64,80,96 and 20 for 256). Use 1 for greedy decoding.") | ||
self.parser.add_argument("--beam-cut-threshold", type=float, default=None, | ||
help="Threshold for creating beams (higher means faster beam search, but smaller accuracy). \ | ||
Values higher than 0.2 might lead to weird errors. Default 0.1 for 48,...,96 and 0.0001 for 256") | ||
|
||
arguments = self.parser.parse_args() | ||
|
||
input_reader = _initialize_input_reader(arguments) | ||
task_executor = _initialize_task_executor(arguments) | ||
output_formatter = _initialize_output_formatter(arguments) | ||
|
||
self.library_components = LibraryComponents(input_reader, task_executor, output_formatter) | ||
|
||
return Arguments(arguments.watch_directory, arguments.network_type, arguments.weights, | ||
arguments.beam_size, arguments.beam_cut_threshold) | ||
|
||
def get_library_components(self) -> LibraryComponents: | ||
return self.library_components | ||
|
||
|
||
def _initialize_input_reader(arguments: argparse.Namespace) -> InputReaderInterface: | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. it's unclear why this needs to get whole argspace namespace when it can take bool. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. But on the other look, at least the interface is consistent with the rest of the helper functions so maybe it is ok as it is |
||
|
||
if arguments.watch_directory: | ||
watcher = DirectoryWatcher(arguments.directory, arguments.reads) | ||
else: | ||
watcher = DirectoryReader(arguments.directory, arguments.reads) | ||
|
||
watcher.initialize() | ||
return watcher | ||
|
||
|
||
def _initialize_task_executor(arguments: argparse.Namespace) -> TaskExecutorInterface: | ||
|
||
if arguments.cores <= 1: | ||
return SequentialTaskExecutor() | ||
|
||
return ParallelTaskExecutor(arguments.cores) | ||
|
||
|
||
def _initialize_output_formatter(arguments: argparse.Namespace) -> OutputFormatterInterface: | ||
|
||
if arguments.output_format == 'fasta': | ||
return OutputFormatterFasta(arguments.output, arguments.gzip_output) | ||
if arguments.output_format == 'fastq': | ||
return OutputFormatterFastq(arguments.output, arguments.gzip_output) | ||
|
||
return None |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
given that this has very nasty confusable arguments (string, string, int, string, ....)
I strongly vote against named tuple. It should be data class instead
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I've read a few articles and can't figure out what would be the difference. Can you elaborate, please?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sure. Namedtuple can be destructured using tuple destructuring. Dataclass cannot. This prevents you from accidentally shooting yourself to foot as you always must access through named parameters