Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Create initial library version #1

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions interface/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
from .data_types import Arguments, LibraryComponents, OutputData, FilePath, Path
from .argument_parser import ArgumentParserInterface
from .input_reader import InputReaderInterface
from .output_formatter import OutputFormatterInterface
from .task_executor import BasecallerInterface, TaskExecutorInterface
23 changes: 23 additions & 0 deletions interface/argument_parser.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
"""
This module defines public interface of ArgumentParser class.
See test/sample_app.py for example usage.
"""

from .data_types import Arguments, LibraryComponents


class ArgumentParserInterface:

def parse_arguments(self) -> Arguments:
"""
Method parses user arguments and returns filled Arguments container
necessary for proper library integration.
"""
raise NotImplementedError

def get_library_components(self) -> LibraryComponents:
"""
Method returns configured and initialized library components
ready for use by integrator.
"""
raise NotImplementedError
57 changes: 57 additions & 0 deletions interface/data_types.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
"""
This module defines public library data containers and type aliases.
See test/sample_app.py for example usage.
"""

from __future__ import annotations
from typing import NamedTuple


class OutputData(NamedTuple):

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

given that this has very nasty confusable arguments (string, string, int, string, ....)
I strongly vote against named tuple. It should be data class instead

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've read a few articles and can't figure out what would be the difference. Can you elaborate, please?

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure. Namedtuple can be destructured using tuple destructuring. Dataclass cannot. This prevents you from accidentally shooting yourself to foot as you always must access through named parameters

"""
Class is a container for basecaller output.
All fields are mandatory and filled by this library,
so MinKnow-like output can be provided.
"""

read_id: str
run_id: str
read_number: int
channel_number: str
start_time: str
basecalled_sequence: str
quality_scores: str


class LibraryComponents(NamedTuple):
"""
Class is a container for class objects executing
library functionality.
Library components can be obtained from ArgumentParser object.
Components are always returned initialized and ready for use
(i.e. their initialize() method has been called).
"""

input_reader: InputReaderInterface
task_executor: TaskExecutorInterface
output_formatter: OutputFormatterInterface


class Arguments(NamedTuple):
"""
Class is a container for parsed user arguments unrelated
to configuration of library compnents.
Those arguments are rather relevant for configuration
on integrator side (i.e. basecaller object configuration).
Arguments can be obtained from ArgumentParser object.
"""

watch_directory: bool

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

confusing name. From the name itself I would expect this to contain string/path

network_type: str
weights_path: FilePath
beam_size: int
beam_cut_threshold: float


FilePath = str
Path = str
35 changes: 35 additions & 0 deletions interface/input_reader.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
"""
This module defines public interface of InputReader class family.
See test/sample_app.py for example usage.
"""

from typing import List

from .data_types import FilePath, Path


class InputReaderInterface:

def __init__(self, input_directories: List[Path], input_files: List[FilePath]) -> None:

self.input_directories = input_directories
self.input_files = input_files
self.task_batch = None

def initialize(self) -> None:
"""
Method initializes class object.
This method is always called by ArgumentParser and should NOT be
called by integrator.
"""
raise NotImplementedError

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why not @AbstractMethod ?


def get_next_batch(self) -> List[FilePath]:
"""
Method returns next batch of files ready to be processed
by task executor.
This method can be called as many times as necessary on
DirectoryWatcher family, but should be called only ONCE
on basic DirectoryReaders.
"""
raise NotImplementedError
32 changes: 32 additions & 0 deletions interface/output_formatter.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
"""
This module defines public interface for OutputFormatter class family.
See test/sample_app.py for example usage.
"""

import gzip
from typing import List

from .data_types import OutputData


class OutputFormatterInterface:

def __init__(self, output_name: str, compressed_output: bool) -> None:

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is exact OOP antipattern.
You should have OutputFormatter instance take "FileLike" argument and let it just do the job of writing to it. Or alternatively, supply "write_output" function to this class. The way it is now, you are mixing two responsibilities in one class hierarchy


self.compressed_output = compressed_output

if self.compressed_output:
self.output_stream = gzip.open(output_name, "wt")
else:
self.output_stream = open(output_name, "w")

def __del__(self) -> None:

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

del does not work correctly, if you want RAII style, you have to use "with" statement and context managers

self.output_stream.close()

def write_output(self, output_data: List[OutputData]) -> None:
"""
Method implements printing of output batch in various formats.
Output is flushed immediately and fsync is called after
every batch is printed.
"""
raise NotImplementedError
58 changes: 58 additions & 0 deletions interface/task_executor.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
"""
This module defines public interface from TaskExecutor class family.
Also defines interface that basecaller dependency injection MUST implement.
See test/sample_app.py for example usage.
"""

from dataclasses import dataclass
from typing import List, Tuple

import numpy as np

from .data_types import FilePath, OutputData

@dataclass
class BasecallerInterface:

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How can dataclass be an interface? That make absolutely zero sense

"""
Task executors in this library can use any injected
basecaller object that implements following interface.
No default basecaller is available.
"""

def call_raw_signal(self, signal: np.ndarray) -> Tuple[str, str]:
"""
Method implements signal basecalling. Takes one argument
of numpy.ndarray of numpy.float32 values.
Please note that input signal is already normalized by library.
This can be discussed in the future (see source/task_executor.py).

Method returns a tuple of strings. Those are basecalled sequence
and quality scores (as defined in fastq file format) respectively.
"""
raise NotImplementedError


class TaskExecutorInterface:

def __init__(self) -> None:
self.caller = None

def set_caller(self, caller: BasecallerInterface) -> None:

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

shouldn't it rather take caller as constructor param?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Well, I'm not sure this is the best design, but I couldn't do better. caller is an external dependency. This library should be able to utilize any basecaller as long as it implements the BasecallerInterface. While ArgumentParser provides integrator with created and initialized TaskExecutor object, it has no way to know what caller to supply. And I think it shouldn't have. It should be an integrator responsibilty in my opinion to supply a correct caller.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Well, that tells you that your whole design is off and has cross-cutting concerns. For example, you can have execute_task_batch to also take caller as an argument. That way, TaskExecutor deals only with executing in some fashion and does not care what the caller is. You can then "bind" caller to task executor in another level of abstraction

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That is true. But once I have caller as parameter of execute_task_batch() I have to serialize the caller object in parallel execution to communicate the caller object to the workers. If it was a data member, function _call_read() could be made a class method in some TaskExecutorBase class other executors would inherit from and caller object could be accessed naturally. (My original plan with this). That way it would be beneficial for performance since serializing large caller objects can be costly.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm, ok, now I see this. But then I don't understand how this works if you set caller after constructing the pool -- once pool is created, I would assume workers are already spawned and it would be too late to assign caller in the parent process

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

THAT is an excellent question. Haven't thought about it that way.

"""
Method lets integrator to inject a custom basecaller object and
verifies at least partially whether it implements required interface.
"""

obj_type = type(caller)

if hasattr(obj_type, 'call_raw_signal') and callable(obj_type.call_raw_signal):
self.caller = caller
else:
raise ValueError('Caller object does NOT implement BasecallerInterface!')

def execute_task_batch(self, tasks: List[FilePath]) -> List[OutputData]:
"""
Method performs basecalling using custom basecaller object on all files in task batch.
Returns list of filled OutputData objects ready for output formatting.
"""
raise NotImplementedError
14 changes: 14 additions & 0 deletions source/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
from sys import platform

from .input_reader import DirectoryReader

if platform == 'linux':
from .input_reader import DirectoryWatcherLinux as DirectoryWatcher
elif platform == 'win32':
from .input_reader import DirectoryWatcherWindows as DirectoryWatcher
elif platform == 'darwin':
from .input_reader import DirectoryWatcherDarwin as DirectoryWatcher

from .output_formatter import OutputFormatterFasta, OutputFormatterFastq
from .task_executor import SequentialTaskExecutor, ParallelTaskExecutor
from .argument_parser import ArgumentParser
86 changes: 86 additions & 0 deletions source/argument_parser.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
import argparse

from interface import (ArgumentParserInterface, InputReaderInterface, OutputFormatterInterface,
TaskExecutorInterface, Arguments, LibraryComponents)

from source import (DirectoryReader, DirectoryWatcher, OutputFormatterFasta,
OutputFormatterFastq, SequentialTaskExecutor, ParallelTaskExecutor)


class ArgumentParser(ArgumentParserInterface):

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see no reason for this to be class if it's only interaction is create it & then call parse_arguments once. I would make it just a function parse_arguments

Copy link
Collaborator Author

@M-Fedor M-Fedor Apr 24, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Well, it also initializes LibraryComponents class. Instead of returning it all I figured, it would be more understandable, if I designed it as a class. Also, if someone else will want to use that library in a future and he will need to implement his own ArgumentParser, he can simply create a subclass of ArgumentParserInterface and generally just use the existing code infrastructure.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would suggest using against patterns like subclass. OOP, as done by many, is done wrong. In most cases, functional design fares way better in terms of maintainability, clarity and testability. Perhaps checou out some Sandi Metz presentations, they are usually good inspiration

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree that this design decision is just silly. But I have a question. I don't like about python libraries that internal code is just mixed with API methods and the only distinction between them is just an underscore at the beginning of a function name. I really liked here that all of the implementation is separated from the library interface and it's clear for a new-comer what the interface looks like and what is the intended usage without spending too much time on trying to understand what the f*** it does. Just look at that:
https://github.com/nanoporetech/ont_fast5_api
It's just tedious piece of code for me to use.

If I use a functional design for this component, is there any preferred pythonic way to declare but not implement functions a thus create pretty interface? Or should I just create a commented document describing the functionality in interface folder? Maybe it's not that necessary in this small component, but I would like to know for a future reference. Thanks!

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we also discuss this one @ppershing ?


def __init__(self) -> None:

self.parser = argparse.ArgumentParser(description='Fast caller for ONT reads')
self.library_components = None

def parse_arguments(self) -> Arguments:

self.parser.add_argument('--directory', type=str, nargs='*',

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is weird that you add arguments to parser only in parse_args

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure I understand what you mean.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

right now, parse_arguments is not idempotent despite it's name suggesting that it does not mutate anything. Try to run it twice and see the fireworks happen ...

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Functional design should fix this one.

help='One or more directories with reads')
self.parser.add_argument('--watch-directory', action='store_true', default=False,
help='Watch directories for new reads')
self.parser.add_argument('--reads', type=str, nargs='*',
help='One or more read files')

self.parser.add_argument("--cores", type=int, default=1,
help="Number of cores available for basecalling, defaults to 1")

self.parser.add_argument("--output", type=str, required=True,
help="Output FASTA/FASTQ file name")
self.parser.add_argument("--output-format", choices=["fasta", "fastq"], default="fasta")
self.parser.add_argument("--gzip-output", action="store_true",
help="Compress output with gzip")

self.parser.add_argument("--weights", type=str, default=None,
help="Path to network weights; only used for custom weights")
self.parser.add_argument("--network-type", choices=["48", "56", "64", "80", "96", "256"], default="48",
help="Size of network. Default 48")
self.parser.add_argument("--beam-size", type=int, default=None,
help="Beam size (defaults 5 for 48,56,64,80,96 and 20 for 256). Use 1 for greedy decoding.")
self.parser.add_argument("--beam-cut-threshold", type=float, default=None,
help="Threshold for creating beams (higher means faster beam search, but smaller accuracy). \
Values higher than 0.2 might lead to weird errors. Default 0.1 for 48,...,96 and 0.0001 for 256")

arguments = self.parser.parse_args()

input_reader = _initialize_input_reader(arguments)
task_executor = _initialize_task_executor(arguments)
output_formatter = _initialize_output_formatter(arguments)

self.library_components = LibraryComponents(input_reader, task_executor, output_formatter)

return Arguments(arguments.watch_directory, arguments.network_type, arguments.weights,
arguments.beam_size, arguments.beam_cut_threshold)

def get_library_components(self) -> LibraryComponents:
return self.library_components


def _initialize_input_reader(arguments: argparse.Namespace) -> InputReaderInterface:

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it's unclear why this needs to get whole argspace namespace when it can take bool.
Perpahs more importantly, watch_directory should not be bool but use "store" feature of argparse to store some value to it

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

But on the other look, at least the interface is consistent with the rest of the helper functions so maybe it is ok as it is


if arguments.watch_directory:
watcher = DirectoryWatcher(arguments.directory, arguments.reads)
else:
watcher = DirectoryReader(arguments.directory, arguments.reads)

watcher.initialize()
return watcher


def _initialize_task_executor(arguments: argparse.Namespace) -> TaskExecutorInterface:

if arguments.cores <= 1:
return SequentialTaskExecutor()

return ParallelTaskExecutor(arguments.cores)


def _initialize_output_formatter(arguments: argparse.Namespace) -> OutputFormatterInterface:

if arguments.output_format == 'fasta':
return OutputFormatterFasta(arguments.output, arguments.gzip_output)
if arguments.output_format == 'fastq':
return OutputFormatterFastq(arguments.output, arguments.gzip_output)

return None
Loading