From 9324a3509ff6d8c022b7c49bdc4c4d25b9984dc5 Mon Sep 17 00:00:00 2001 From: Josiah Seaman Date: Tue, 23 Jul 2019 21:38:22 +0100 Subject: [PATCH] #4 WIP: Changing over to database style access data models while maintaining features except for SlicedGraph --- Graph/gfa.py | 6 +- Graph/models.py | 257 ++++++++++++++++++++++-------------------------- Graph/sort.py | 2 +- Graph/test.py | 182 +++++++++++++++++----------------- 4 files changed, 215 insertions(+), 232 deletions(-) diff --git a/Graph/gfa.py b/Graph/gfa.py index 462ffe7..c8250c8 100644 --- a/Graph/gfa.py +++ b/Graph/gfa.py @@ -118,18 +118,18 @@ def to_paths(self) -> List[Path]: node_hash = {} for segment in self.gfa.segments: node_id = segment.name + "+" - node = Node(segment.sequence, []) + node = Node(segment.sequence) node_hash[node_id] = node node_id = segment.name + "-" - node = Node(segment.sequence, []) + node = Node(segment.sequence) node_hash[node_id] = node paths = [] for path in self.gfa.paths: nodes = [] for node in path.segment_names: - node_index = NodeTraversal(Node(node_hash[node.name + node.orient].seq, [], node.name), node.orient) + node_index = NodeTraversal(Node(node_hash[node.name + node.orient].seq, node.name), node.orient) nodes.append(node_index) paths.append(Path(path.name, nodes)) diff --git a/Graph/models.py b/Graph/models.py index 42312bc..02f3508 100644 --- a/Graph/models.py +++ b/Graph/models.py @@ -19,18 +19,16 @@ class NodeMissingError(ValueError): pass -class Node:#(models.Model): - # seq = models.CharField(max_length=255, blank=True) - # paths = models.ManyToOneRel(Path) +class Node(models.Model): + seq = models.CharField(max_length=255, blank=True) + id = models.IntegerField(primary_key=True) # display_name = models.CharField(max_length=255, blank=True) - def __init__(self, seq: str, paths: 'Iterable[Path]', id: str = None): - assert isinstance(seq, str), seq - self.id = id if id else str(uuid1()) - self.seq = seq - self.paths = set() # Set[PathIndex] + @classmethod + def build(cls, seq: str, paths: 'Iterable[Path]', id: str = None): + node = Node.objects.create(seq) for p in paths: - self.append_path(p) + NodeTraversal.objects.create(node, path) def __len__(self): return len(self.paths) @@ -49,11 +47,10 @@ def __eq__(self, other): def __hash__(self): return hash(self.seq) - def append_path(self, path): - """Instead: Use Path.append_node if possible""" - assert isinstance(path, Path), path - self.paths.add(PathIndex(path, len(path.nodes))) # not parallelizable - path.nodes.append(NodeTraversal(self)) + # def append_path(self, path): + # """Instead: Use Path.append_node if possible""" + # assert isinstance(path, Path), path + # NodeTraversal.objects.create(self, path) def to_gfa(self, segment_id: int): return '\t'.join(['S', str(segment_id), self.seq]) @@ -126,15 +123,16 @@ def smallest(self): version = 1.0 -class Path: +class Path(models.Model): """Paths represent the linear order of on particular individual (accession) as its genome was sequenced. A path visits a series of nodes and the ordered concatenation of the node sequences is the accession's genome. Create Paths first from accession names, then append them to Nodes to link together.""" - def __init__(self, accession: str, nodes = []): - self.accession = accession # one path per accessions - self.nodes = nodes # List[NodeTraversal] - self.position_checkpoints = {} # TODO: currently not used + accession = models.CharField(unique=True, max_length=1000) # one path per accession + + # def __init__(self, accession: str, nodes = []): + # # self.nodes = nodes # List[NodeTraversal] + # self.position_checkpoints = {} # TODO: currently not used def __getitem__(self, path_index): return self.nodes[path_index] @@ -149,11 +147,14 @@ def __eq__(self, other): def __hash__(self): return hash(self.accession) + @property + def nodes(self): + return NodeTraversal.objects.get(path=self)#.order_by('order') + def append_node(self, node: Node, strand: str): """This is the preferred way to build a graph in a truly non-linear way. NodeTraversal is appended to Path (order dependent) and PathIndex is added to Node (order independent).""" - self.nodes.append(NodeTraversal(node, strand)) - node.paths.add(PathIndex(self, len(self.nodes)-1)) # already appended node + NodeTraversal.objects.create(node, self, strand) return node def name(self): @@ -163,34 +164,14 @@ def to_gfa(self): return '\t'.join(['P', self.accession, "+,".join([x.node.name + x.strand for x in self.nodes]) + "+", ",".join(['*' for x in self.nodes])]) -class PathIndex: - """Link from a Node to the place in the path where the Node is referenced. A Node can appear - in a Path multiple times. Index indicates which instance it is.""" - def __init__(self, path: Path, index: int): - self.path = path - self.index = index - - def __repr__(self): - return repr(self.path.accession) - - def __eq__(self, other): - if self.path.accession == other.path.accession: # and self.index == other.index: - return True - else: - return False - - def __lt__(self, other): - return self.path.accession < other.path.accession - - def __hash__(self): - return hash(self.path.accession) # * (self.index if self.index else 1) -class NodeTraversal: +class NodeTraversal(models.Model): """Link from a Path to a Node it is currently traversing. Includes strand""" - def __init__(self, node: Node, strand: str = '+'): - self.node = node - self.strand = strand # TODO: make this required + node = models.ForeignKey(Node, on_delete=models.CASCADE) + path = models.ForeignKey(Path, on_delete=models.CASCADE, help_text='') + order = models.AutoField(primary_key=True, help_text='Defines the order a path lists traversals') + strand = models.CharField(choices=[('+', '+'),('-', '-')], default='+', max_length=1) def __repr__(self): if self.strand == '+': @@ -259,94 +240,96 @@ def append_node_to_path(self, node_id, strand, path_name): else: raise ValueError("Provide the id of the node, not", node_id) self.paths[path_name].append_node(self.nodes[node_id], strand) - - def compute_slices(self): - """Alias: Upgrades a Graph to a SlicedGraph""" - return SlicedGraph.from_graph(self) - - -class SlicedGraph(Graph): - def __init__(self, paths): - super(SlicedGraph, self).__init__(paths) - """Factory for generating graphs from a representation""" - self.slices = [] # only get populated by compute_slices() - - if not self.slices: - self.compute_slices() - - def __eq__(self, representation): - if isinstance(representation, SlicedGraph): - return all(slice_a == slice_b for slice_a, slice_b in zip_longest(self.slices, representation.slices)) - return self == SlicedGraph.build(representation) # build a graph then compare it - - def __repr__(self): - """Warning: the representation strings are very sensitive to whitespace""" - return self.slices.__repr__() - - def __getitem__(self, i): - return self.slices[i] - - @staticmethod - def from_graph(graph): - g = SlicedGraph([]) - g.paths = graph.paths # shallow copy all relevant fields - g.nodes = graph.nodes - g.compute_slices_by_dagify() - return g - - def compute_slices(self): - """TODO: This is a mockup stand in for the real method.""" - if not self.paths: # nothing to do - return self - first_path = next(iter(self.paths.values())) - for node_traversal in first_path: - node = node_traversal.node - self.slices.append(Slice([node])) - return self - - def compute_slices_by_dagify(self): - """This method uses DAGify algorithm to compute slices.""" - from Graph.sort import DAGify # help avoid circular import - - if not self.paths: - return self - dagify = DAGify(self.paths) - profile = dagify.generate_profiles(0) - slices = dagify.to_slices(profile) - self.slices = slices - return self - - @staticmethod - def build(cmd): - """This factory uses existing slice declarations to build a graph with Paths populated in the order - that they are mentioned in the slices. Currently, this is + only and does not support non-linear - orderings. Use Path.append_node() to build non-linear graphs.""" - if isinstance(cmd, str): - cmd = eval(cmd) - # preemptively grab all the path names from every odd list entry - paths = {key for sl in cmd for i in range(0, len(sl), 2) for key in sl[i + 1]} - graph = SlicedGraph(paths) - graph.slices = [] - for sl in cmd: - current_slice = [] - if isinstance(sl, Slice): - graph.slices.append(sl) - else: - if isinstance(sl[0], Node): # already Nodes, don't need to build - current_slice = sl - else: - try: - for i in range(0, len(sl), 2): - paths = [graph.paths[key] for key in sl[i + 1]] - current_slice.append(Node(sl[i], paths)) - except IndexError: - raise IndexError("Expecting two terms: ", sl[0]) # sl[i:i+2]) - - graph.slices.append(Slice(current_slice)) - return graph - - @classmethod - def load_from_slices(cls, slices, paths): - graph = cls(paths) - graph.slices = slices - return graph +# +# def compute_slices(self): +# """Alias: Upgrades a Graph to a SlicedGraph""" +# return SlicedGraph.from_graph(self) +# # +# +# class SlicedGraph(Graph): +# def __init__(self, paths): +# super(SlicedGraph, self).__init__(paths) +# """Factory for generating graphs from a representation""" +# self.slices = [] # only get populated by compute_slices() +# +# if not self.slices: +# self.compute_slices() +# +# def __eq__(self, representation): +# if isinstance(representation, SlicedGraph): +# return all(slice_a == slice_b for slice_a, slice_b in zip_longest(self.slices, representation.slices)) +# return self == SlicedGraph.build(representation) # build a graph then compare it +# +# def __repr__(self): +# """Warning: the representation strings are very sensitive to whitespace""" +# return self.slices.__repr__() +# +# def __getitem__(self, i): +# return self.slices[i] +# +# @staticmethod +# def from_graph(graph): +# g = SlicedGraph([]) +# g.paths = graph.paths # shallow copy all relevant fields +# g.nodes = graph.nodes +# g.compute_slices_by_dagify() +# return g +# +# def compute_slices(self): +# """TODO: This is a mockup stand in for the real method.""" +# if not self.paths: # nothing to do +# return self +# first_path = next(iter(self.paths.values())) +# for node_traversal in first_path: +# node = node_traversal.node +# self.slices.append(Slice([node])) +# return self +# +# def compute_slices_by_dagify(self): +# """This method uses DAGify algorithm to compute slices.""" +# from Graph.sort import DAGify # help avoid circular import +# +# if not self.paths: +# return self +# dagify = DAGify(self.paths) +# profile = dagify.generate_profiles(0) +# slices = dagify.to_slices(profile) +# self.slices = slices +# return self +# +# @staticmethod +# def build(cmd): +# """This factory uses existing slice declarations to build a graph with Paths populated in the order +# that they are mentioned in the slices. Currently, this is + only and does not support non-linear +# orderings. Use Path.append_node() to build non-linear graphs.""" +# if isinstance(cmd, str): +# cmd = eval(cmd) +# # preemptively grab all the path names from every odd list entry +# paths = {key for sl in cmd for i in range(0, len(sl), 2) for key in sl[i + 1]} +# graph = SlicedGraph(paths) +# graph.slices = [] +# for sl in cmd: +# current_slice = [] +# if isinstance(sl, Slice): +# graph.slices.append(sl) +# else: +# if isinstance(sl[0], Node): # already Nodes, don't need to build +# current_slice = sl +# else: +# try: +# for i in range(0, len(sl), 2): +# paths = [graph.paths[key] for key in sl[i + 1]] +# node = Node(sl[i], paths) +# node.add_paths(paths) +# current_slice.append(node) +# except IndexError: +# raise IndexError("Expecting two terms: ", sl[0]) # sl[i:i+2]) +# +# graph.slices.append(Slice(current_slice)) +# return graph +# +# @classmethod +# def load_from_slices(cls, slices, paths): +# graph = cls(paths) +# graph.slices = slices +# return graph diff --git a/Graph/sort.py b/Graph/sort.py index 2074a3b..e9837ae 100644 --- a/Graph/sort.py +++ b/Graph/sort.py @@ -2,7 +2,7 @@ import dataclasses from typing import List -from Graph.models import NodeTraversal, Path, Slice, Node, SlicedGraph +from Graph.models import NodeTraversal, Path, Slice, Node @dataclasses.dataclass diff --git a/Graph/test.py b/Graph/test.py index 3a6df76..f39adb2 100644 --- a/Graph/test.py +++ b/Graph/test.py @@ -2,7 +2,7 @@ import os from os.path import join from Graph.gfa import GFA -from Graph.models import Graph, Slice, Node, Path, SlicedGraph +from Graph.models import Graph, Slice, Node, Path#, SlicedGraph from Graph.sort import DAGify # Define the working directory @@ -10,12 +10,12 @@ PATH_TO_TEST_DATA = join(BASE_DIR, "test_data") location_of_xg = join(BASE_DIR, "test_data","xg") - -def G(rep): - """Short hand for Graph construction that returns a slice""" - if len(rep) > 1: - raise ValueError("Warning: only the first slice will be returned.", rep) - return SlicedGraph.build(rep)[0] +# +# def G(rep): +# """Short hand for Graph construction that returns a slice""" +# if len(rep) > 1: +# raise ValueError("Warning: only the first slice will be returned.", rep) +# return SlicedGraph.build(rep)[0] a, b, c, d, e = 'a', 'b', 'c', 'd', 'e' # Paths must be created first @@ -38,26 +38,26 @@ class GraphTest(unittest.TestCase): ['C', {a, b, c}, 'T', {d}], # [10]path slip ['TATA', {a, b, c, d}]] # [11] anchor - def example_graph(self): - # IMPORTANT: Never reuse Paths: Paths must be created fresh for each graph - a, b, c, d, e = Path('a'), Path('b'), Path('c'), Path('d'), Path('e') - paths = [a, b, c, d, e] - factory_input = [Slice([Node('ACGT', {a,b,c,d})]), - Slice([Node('C',{a,b,d}),Node('T', {c})]), - Slice([Node('GGA',{a,b,c,d})]), - Slice([Node('C',{a,b,d}),Node('', {c})]), - Slice([Node('AGTACG',{a,b,c}), Node('CGTACT',{d})]), - Slice([Node('TTG',{a,b,c,d})]), - Slice([Node('A', {a, b}), Node('C', {d, e}), Node('T', {c})]), # third allele - Slice([Node('GG', {a, b}), Node('TT', {c, d})]), # equal size nodes - Slice([Node('C', {a, b, c, e}), Node('T', {d})]), - Slice([Node('C', {a, b, e}), Node('T', {c, d})]), - Slice([Node('C', {a, b, c}), Node('T', {d})]), - Slice([Node('TATA', {a, b, c, d})]) # anchor - ] - - base_graph = SlicedGraph.load_from_slices(factory_input, paths) - return base_graph + # def example_graph(self): + # # IMPORTANT: Never reuse Paths: Paths must be created fresh for each graph + # a, b, c, d, e = Path('a'), Path('b'), Path('c'), Path('d'), Path('e') + # paths = [a, b, c, d, e] + # factory_input = [Slice([Node('ACGT', {a,b,c,d})]), + # Slice([Node('C',{a,b,d}),Node('T', {c})]), + # Slice([Node('GGA',{a,b,c,d})]), + # Slice([Node('C',{a,b,d}),Node('', {c})]), + # Slice([Node('AGTACG',{a,b,c}), Node('CGTACT',{d})]), + # Slice([Node('TTG',{a,b,c,d})]), + # Slice([Node('A', {a, b}), Node('C', {d, e}), Node('T', {c})]), # third allele + # Slice([Node('GG', {a, b}), Node('TT', {c, d})]), # equal size nodes + # Slice([Node('C', {a, b, c, e}), Node('T', {d})]), + # Slice([Node('C', {a, b, e}), Node('T', {c, d})]), + # Slice([Node('C', {a, b, c}), Node('T', {d})]), + # Slice([Node('TATA', {a, b, c, d})]) # anchor + # ] + # + # base_graph = SlicedGraph.load_from_slices(factory_input, paths) + # return base_graph def test_equalities(self): self.assertEqual(Node('A', {}),Node('A', {})) @@ -65,24 +65,24 @@ def test_equalities(self): self.assertEqual(Node('A', {Path('x'),Path('y')}),Node('A', {Path('x'),Path('y')})) self.assertEqual(Slice([Node('ACGT', {Path('a'), Path('b'), Path('c'), Path('d')})]), Slice([Node('ACGT', {Path('a'), Path('b'), Path('c'), Path('d')})])) - self.assertEqual(SlicedGraph.build([['ACGT', {a, b, c, d}]]), SlicedGraph.build([['ACGT', {a, b, c, d}]])) - - def test_graph_factory(self): - base_graph = self.example_graph() - g1, g2 = SlicedGraph.build(self.factory_input), SlicedGraph.build(self.factory_input) - assert g1 == g2, \ - ('\n' + repr(g1) + '\n' + repr(g2)) - g_double = SlicedGraph.build(eval(str(base_graph))) - # WARN: Never compare two string literals: could be order sensitive, one object must be Graph - #str(g_double) == str(base_graph) - assert g_double == base_graph, repr(g_double) + '\n' + repr(base_graph) - assert g1 == base_graph, repr(g1) + '\n' + repr(base_graph) - assert g_double == self.factory_input - assert g_double == str(self.factory_input) - - def test_G(self): - with self.assertRaises(ValueError): - G([['C', {Path('a'), Path('b')}], ['T', {Path('12'), Path('16')}]]) + # self.assertEqual(SlicedGraph.build([['ACGT', {a, b, c, d}]]), SlicedGraph.build([['ACGT', {a, b, c, d}]])) + + # def test_graph_factory(self): + # base_graph = self.example_graph() + # g1, g2 = SlicedGraph.build(self.factory_input), SlicedGraph.build(self.factory_input) + # assert g1 == g2, \ + # ('\n' + repr(g1) + '\n' + repr(g2)) + # g_double = SlicedGraph.build(eval(str(base_graph))) + # # WARN: Never compare two string literals: could be order sensitive, one object must be Graph + # #str(g_double) == str(base_graph) + # assert g_double == base_graph, repr(g_double) + '\n' + repr(base_graph) + # assert g1 == base_graph, repr(g1) + '\n' + repr(base_graph) + # assert g_double == self.factory_input + # assert g_double == str(self.factory_input) + # + # def test_G(self): + # with self.assertRaises(ValueError): + # G([['C', {Path('a'), Path('b')}], ['T', {Path('12'), Path('16')}]]) x,y,z,a = 'x', 'y', 'z', 'a' @@ -90,52 +90,52 @@ class DAGifyTest(unittest.TestCase): """ test class of sort.py """ - def test_dagify(self): gfa = GFA.load_from_gfa(join(PATH_TO_TEST_DATA, "test.gfa")) paths = gfa.to_paths dagify = DAGify(paths) profile = dagify.generate_profiles(0) - graph = SlicedGraph.load_from_slices(dagify.to_slices(profile), paths) + slices = dagify.to_slices(profile) + # graph = SlicedGraph.load_from_slices(slices, paths) # x, y, z = graph.paths['x'], graph.paths['y'], graph.paths['z'] - self.assertEqual([['CAAATAAG', {x,y,z}], ['A', {y,z}, 'G', {x}], ['C', {x,y,z}], ['TTG', {x,y,z}], ['A', {z}, 'G', {x,y}], ['AAATTTTCTGGAGTTCTAT', {x,y,z}], ['T', {x,y,z}], ['ATAT', {x,y,z}], ['T', {x,y,z}], ['CCAACTCTCTG', {x,y,z}]], graph) + # self.assertEqual([['CAAATAAG', {x,y,z}], ['A', {y,z}, 'G', {x}], ['C', {x,y,z}], ['TTG', {x,y,z}], ['A', {z}, 'G', {x,y}], ['AAATTTTCTGGAGTTCTAT', {x,y,z}], ['T', {x,y,z}], ['ATAT', {x,y,z}], ['T', {x,y,z}], ['CCAACTCTCTG', {x,y,z}]], graph) def test_dagify2(self): gfa = GFA.load_from_gfa(join(PATH_TO_TEST_DATA, "test2.gfa")) paths = gfa.to_paths dagify = DAGify(paths) profile = dagify.generate_profiles(0) - graph = SlicedGraph.load_from_slices(dagify.to_slices(profile), paths) - x,y,z,a = 'x', 'y', 'z', 'a' - self.assertEqual([['CAAATAAG', {x, y, z}], ['G', {x}, 'A', {y, z}], ['C', {x, y}, 'T', {z}], ['TTG', {x, y, z}], ['G', {x, y}, 'A', {a, z}], ['AAATTTTCTGGAGTTCTAT', {a, x, y, z}], ['A', {a, z}, 'T', {x, y}], ['ATAT', {x, y, z}], ['T', {x, y, z}], ['CCAACTCTCTG', {x, y, z}]], graph) + # graph = SlicedGraph.load_from_slices(dagify.to_slices(profile), paths) + # x,y,z,a = 'x', 'y', 'z', 'a' + # self.assertEqual([['CAAATAAG', {x, y, z}], ['G', {x}, 'A', {y, z}], ['C', {x, y}, 'T', {z}], ['TTG', {x, y, z}], ['G', {x, y}, 'A', {a, z}], ['AAATTTTCTGGAGTTCTAT', {a, x, y, z}], ['A', {a, z}, 'T', {x, y}], ['ATAT', {x, y, z}], ['T', {x, y, z}], ['CCAACTCTCTG', {x, y, z}]], graph) def test_dagify3(self): gfa = GFA.load_from_gfa(join(PATH_TO_TEST_DATA, "test3.gfa")) paths = gfa.to_paths dagify = DAGify(paths) profile, rep_count = dagify.generate_profiles_with_minimizing_replications() - graph = SlicedGraph.load_from_slices(dagify.to_slices(profile), paths) self.assertEqual(rep_count, 1) - self.assertEqual(graph, [['CAAATAAG', {x, y}], ['CCAACTCTCTG', {y}, 'G', {x}], ['C', {x, y}], ['TTG', {x, y}], ['G', {x, y}], ['AAATTTTCTGGAGTTCTAT', {x, y}], ['T', {x, y}], ['ATAT', {x, y}], ['T', {x, y}], ['CCAACTCTCTG', {x, y}]]) + # graph = SlicedGraph.load_from_slices(dagify.to_slices(profile), paths) + # self.assertEqual(graph, [['CAAATAAG', {x, y}], ['CCAACTCTCTG', {y}, 'G', {x}], ['C', {x, y}], ['TTG', {x, y}], ['G', {x, y}], ['AAATTTTCTGGAGTTCTAT', {x, y}], ['T', {x, y}], ['ATAT', {x, y}], ['T', {x, y}], ['CCAACTCTCTG', {x, y}]]) def test_dagify_altpath(self): gfa = GFA.load_from_gfa(join(PATH_TO_TEST_DATA, "alternate_paths.gfa")) paths = gfa.to_paths dagify = DAGify(paths) profile, rep_count = dagify.generate_profiles_with_minimizing_replications() - graph = SlicedGraph.load_from_slices(dagify.to_slices(profile), paths) self.assertEqual(rep_count, 1) - self.assertEqual(graph, [['CAAATAAG', {x, y}], ['A', {x}, '', {y}], ['G', {x, y}], ['A', {y}, '', {x}], ['T', {x, y}]]) + # graph = SlicedGraph.load_from_slices(dagify.to_slices(profile), paths) + # self.assertEqual(graph, [['CAAATAAG', {x, y}], ['A', {x}, '', {y}], ['G', {x, y}], ['A', {y}, '', {x}], ['T', {x, y}]]) def test_dagify_dup(self): gfa = GFA.load_from_gfa(join(PATH_TO_TEST_DATA, "duplicate.gfa")) paths = gfa.to_paths dagify = DAGify(paths) profile, rep_count = dagify.generate_profiles_with_minimizing_replications() - graph = SlicedGraph.load_from_slices(dagify.to_slices(profile), paths) self.assertEqual(rep_count, 2) - self.assertEqual(graph, [['CAAATAAG', {x, y}], ['', {x}, 'A', {y}], ['', {x}, 'G', {y}], ['A', {x, y}], ['G', {x, y}], ['T', {x, y}]]) + # graph = SlicedGraph.load_from_slices(dagify.to_slices(profile), paths) + # self.assertEqual(graph, [['CAAATAAG', {x, y}], ['', {x}, 'A', {y}], ['', {x}, 'G', {y}], ['A', {x, y}], ['G', {x, y}], ['T', {x, y}]]) def test_unresolved_repreat(self): @@ -143,8 +143,8 @@ def test_unresolved_repreat(self): paths = gfa.to_paths dagify = DAGify(paths) profile, rep_count = dagify.generate_profiles_with_minimizing_replications() - graph = SlicedGraph.load_from_slices(dagify.to_slices(profile), paths) - self.assertEqual([['CAAATAAG', {'x'}, 'T', {'y'}], ['A', {'y', 'x'}], ['G', {'x'}, 'C', {'y'}]], graph) + # graph = SlicedGraph.load_from_slices(dagify.to_slices(profile), paths) + # self.assertEqual([['CAAATAAG', {'x'}, 'T', {'y'}], ['A', {'y', 'x'}], ['G', {'x'}, 'C', {'y'}]], graph) @unittest.skip("Inversion is unsupported") def test_inversion(self): @@ -152,8 +152,8 @@ def test_inversion(self): paths = gfa.to_paths dagify = DAGify(paths) profile, rep_count = dagify.generate_profiles_with_minimizing_replications() - graph = SlicedGraph.load_from_slices(dagify.to_slices(profile), paths) - self.assertEqual(graph, []) + # graph = SlicedGraph.load_from_slices(dagify.to_slices(profile), paths) + # self.assertEqual(graph, []) @unittest.skip("Inversion is unsupported") def test_nested_inversion(self): @@ -161,8 +161,8 @@ def test_nested_inversion(self): paths = gfa.to_paths dagify = DAGify(paths) profile, rep_count = dagify.generate_profiles_with_minimizing_replications() - graph = SlicedGraph.load_from_slices(dagify.to_slices(profile), paths) - self.assertEqual(graph, []) + # graph = SlicedGraph.load_from_slices(dagify.to_slices(profile), paths) + # self.assertEqual(graph, []) @unittest.skip("Inversion is unsupported") def test_simple_inversion(self): @@ -170,8 +170,8 @@ def test_simple_inversion(self): paths = gfa.to_paths dagify = DAGify(paths) profile, rep_count = dagify.generate_profiles_with_minimizing_replications() - graph = SlicedGraph.load_from_slices(dagify.to_slices(profile), paths) - self.assertEqual(graph, [['CAAATAAG', {x,y}], ['AC', {x}, 'AC', {y}], ['G', {x, y}]]) + # graph = SlicedGraph.load_from_slices(dagify.to_slices(profile), paths) + # self.assertEqual(graph, [['CAAATAAG', {x,y}], ['AC', {x}, 'AC', {y}], ['G', {x, y}]]) @@ -194,25 +194,25 @@ def test_load_gfa_to_graph(self): self.assertEqual(len(graph.paths), 3) self.assertEqual(len(graph.nodes), 15) - def test_gfa_to_sliced_graph(self): - graph, gfa = self.make_graph_from_gfa() - slices = SlicedGraph.from_graph(graph) - x = 'x' - y = 'y' - z = 'z' - print(slices) - self.assertEqual(slices, [['CAAATAAG', {x, y, z}], ['A', {y, z}, 'G', {x}], ['C', {x, y, z}], ['TTG', {x, y, z}], ['A', {z}, 'G', {x, y}], ['AAATTTTCTGGAGTTCTAT', {x, y, z}], ['T', {x, y, z}], ['ATAT', {x, y, z}], ['T', {x, y, z}], ['CCAACTCTCTG', {x, y, z}]]) - - def test_gfa_to_sliced_graph_via_dagify(self): - #TODO: this is currently close but not quite there. - # Slices must be fully defined in SlicedGraph.compute_slices() - graph, gfa = self.make_graph_from_gfa() - slices = SlicedGraph.from_graph(graph) - x = 'x' - y = 'y' - z = 'z' - print(slices) - self.assertEqual(slices, [['CAAATAAG', {x, y, z}], ['A', {y, z}, 'G', {x}], ['C', {x, y, z}], ['TTG', {x, y, z}], ['A', {z}, 'G', {x, y}], ['AAATTTTCTGGAGTTCTAT', {x, y, z}], ['T', {x, y, z}], ['ATAT', {x, y, z}], ['T', {x, y, z}], ['CCAACTCTCTG', {x, y, z}]]) + # def test_gfa_to_sliced_graph(self): + # graph, gfa = self.make_graph_from_gfa() + # slices = SlicedGraph.from_graph(graph) + # x = 'x' + # y = 'y' + # z = 'z' + # print(slices) + # self.assertEqual(slices, [['CAAATAAG', {x, y, z}], ['A', {y, z}, 'G', {x}], ['C', {x, y, z}], ['TTG', {x, y, z}], ['A', {z}, 'G', {x, y}], ['AAATTTTCTGGAGTTCTAT', {x, y, z}], ['T', {x, y, z}], ['ATAT', {x, y, z}], ['T', {x, y, z}], ['CCAACTCTCTG', {x, y, z}]]) + # + # def test_gfa_to_sliced_graph_via_dagify(self): + # #TODO: this is currently close but not quite there. + # # Slices must be fully defined in SlicedGraph.compute_slices() + # graph, gfa = self.make_graph_from_gfa() + # slices = SlicedGraph.from_graph(graph) + # x = 'x' + # y = 'y' + # z = 'z' + # print(slices) + # self.assertEqual(slices, [['CAAATAAG', {x, y, z}], ['A', {y, z}, 'G', {x}], ['C', {x, y, z}], ['TTG', {x, y, z}], ['A', {z}, 'G', {x, y}], ['AAATTTTCTGGAGTTCTAT', {x, y, z}], ['T', {x, y, z}], ['ATAT', {x, y, z}], ['T', {x, y, z}], ['CCAACTCTCTG', {x, y, z}]]) def make_graph_from_gfa(self): gfa = GFA.load_from_gfa(join(PATH_TO_TEST_DATA, "test.gfa")) @@ -235,13 +235,13 @@ def test_load_gfa_via_xg(self): graph.save_as_xg(join(PATH_TO_TEST_DATA, "test.xg"), location_of_xg) graph2 = GFA.load_from_xg(join(PATH_TO_TEST_DATA, "test.xg"), location_of_xg) graph = graph2.to_graph - graph = SlicedGraph.from_graph(graph) - x = 'x' - y = 'y' - z = 'z' - self.assertEqual(graph, [['CAAATAAG', {x, y, z}], ['A', {y, z}, 'G', {x}], ['C', {x, y, z}], ['TTG', {x, y, z}], - ['A', {z}, 'G', {x, y}], ['AAATTTTCTGGAGTTCTAT', {x, y, z}], ['T', {x, y, z}], - ['ATAT', {x, y, z}], ['T', {x, y, z}], ['CCAACTCTCTG', {x, y, z}]]) + # graph = SlicedGraph.from_graph(graph) + # x = 'x' + # y = 'y' + # z = 'z' + # self.assertEqual(graph, [['CAAATAAG', {x, y, z}], ['A', {y, z}, 'G', {x}], ['C', {x, y, z}], ['TTG', {x, y, z}], + # ['A', {z}, 'G', {x, y}], ['AAATTTTCTGGAGTTCTAT', {x, y, z}], ['T', {x, y, z}], + # ['ATAT', {x, y, z}], ['T', {x, y, z}], ['CCAACTCTCTG', {x, y, z}]]) @staticmethod def is_different(gfa1, gfa2):