-
Notifications
You must be signed in to change notification settings - Fork 0
/
sdf_helpers.py
76 lines (59 loc) · 2.39 KB
/
sdf_helpers.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
import mmap
from collections import deque
from pathlib import Path
from typing import Deque, Dict, Iterable, List, Tuple
from rdkit.Chem import MolFromMolBlock, SDWriter
def mmap_file(path: Path) -> mmap.mmap:
    """Return a read-only memory map covering the whole file at *path*.

    The file object used to create the map is closed before this function
    returns; only the map itself is handed back, and the caller is
    responsible for closing it.

    Args:
        path: Location of the file to map.

    Returns:
        An ``mmap.mmap`` opened with ``ACCESS_READ``.
    """
    with open(path, "r") as source:
        descriptor = source.fileno()
        # A length of 0 asks mmap to map the file's entire current size.
        return mmap.mmap(descriptor, 0, access=mmap.ACCESS_READ)
def find_structures_bytes_ranges(mmapped_file: mmap.mmap) -> Dict[int, Tuple[int, int]]:
    """Index the byte range of every ``$$$$``-terminated record in the map.

    The map is scanned line by line. Each line starting with ``$$$$`` closes
    the current record, whose range runs from the byte after the previous
    delimiter line (or byte 0 for the first record) up to, but excluding, the
    delimiter line itself.

    The record's key is the integer parsed from the line two lines above the
    delimiter (presumably the value of the record's last data field, followed
    by a blank line -- confirm against the input files).

    Args:
        mmapped_file: Memory map of the file to index. Its read position is
            reset to the start and left at the end of the map afterwards.

    Returns:
        Mapping of identifier to ``(start, end)`` byte offsets, ``end``
        exclusive. If an identifier occurs more than once, the last record
        wins. Bytes after the final delimiter are not indexed.

    Raises:
        ValueError: If the line two above a delimiter is not an integer.
    """
    structures_ranges: Dict[int, Tuple[int, int]] = {}
    # The recorded offsets are absolute, so the scan must start at byte 0
    # regardless of how far the map has already been read.
    mmapped_file.seek(0)
    start_offset: int = 0
    prev_line: bytes = b""
    prev_prev_line: bytes = b""
    for line in iter(mmapped_file.readline, b""):
        if line.startswith(b"$$$$"):
            # tell() now points just past the delimiter line.
            next_offset = mmapped_file.tell()
            end_offset = next_offset - len(line)
            try:
                identifier = int(prev_prev_line.strip().decode())
            except ValueError as err:
                raise ValueError(
                    f"Record ending at byte {end_offset} has a non-integer "
                    f"identifier line: {prev_prev_line!r}"
                ) from err
            structures_ranges[identifier] = (start_offset, end_offset)
            start_offset = next_offset
        prev_prev_line, prev_line = prev_line, line
    return structures_ranges
def read_selected_ranges(
    mmapped_file: mmap.mmap, ranges_to_read: List[Tuple[int, int]]
) -> str:
    """Decode the requested byte ranges and return them as one string.

    Args:
        mmapped_file: Memory map to slice.
        ranges_to_read: ``(start, end)`` byte offsets, ``end`` exclusive.
            The ranges are read in the order given.

    Returns:
        The decoded slices concatenated with no separator; an empty string
        when no ranges are given.
    """
    return "".join(
        mmapped_file[first_byte:stop_byte].decode()
        for first_byte, stop_byte in ranges_to_read
    )
def write_mols_to_sdf(path: Path, sdf_blocks: Iterable[Tuple[int, str]]) -> None:
    """Append mol blocks to the SD file at *path*, ordered by their WID.

    The file is opened in append mode, so existing content is kept. Each
    block is converted with ``MolFromMolBlock``, tagged with a ``WID``
    property holding its identifier, and written through an ``SDWriter``.

    Args:
        path: Destination SD file.
        sdf_blocks: ``(wid, mol_block)`` pairs; they are written in
            ascending ``wid`` order.
    """
    with open(str(path), "a") as out_file:
        ordered_blocks = list(sdf_blocks)
        ordered_blocks.sort(key=lambda pair: pair[0])
        with SDWriter(out_file) as writer:
            for wid, block_text in ordered_blocks:
                molecule = MolFromMolBlock(block_text)
                if not molecule:
                    # Nothing usable came back (presumably the block could
                    # not be parsed); the entry is skipped without notice.
                    continue
                molecule.SetProp("WID", str(wid))
                writer.write(molecule)
def main(file_path: str = "data/lotus.sdf", max_structures: int = 100000) -> None:
    """Index an SD file and time how long reading its first records takes.

    The file is memory-mapped, every ``$$$$``-terminated record is indexed,
    and the byte ranges of the first *max_structures* records (in file order)
    are read and decoded. The decoded text is discarded; only the elapsed
    time is logged.

    Args:
        file_path: SD file to process.
        max_structures: Upper bound on the number of records to read.
    """
    # Imported here because the module only imports logging and time under
    # its __main__ guard; without this, calling main() after importing the
    # module would fail with NameError.
    import logging
    import time
    from itertools import islice

    # The context manager closes the map even if indexing or reading fails.
    with mmap_file(Path(file_path)) as mmapped_file:
        structures_ranges = find_structures_bytes_ranges(mmapped_file)

        # perf_counter is monotonic, so the measured interval cannot be
        # distorted by system clock adjustments.
        start_time = time.perf_counter()
        # Take the first N ranges without materialising every key first.
        ranges_to_read = list(islice(structures_ranges.values(), max_structures))
        if ranges_to_read:
            read_selected_ranges(mmapped_file, ranges_to_read)
        else:
            logging.info("No '$$$$' occurrences found in the file.")
        elapsed = time.perf_counter() - start_time

    logging.info("Time taken to get the blocks: %s seconds", elapsed)
if __name__ == "__main__":
    # Script entry point. logging and time are imported here rather than at
    # the top of the file, so they are bound as module globals only when the
    # file is executed directly.
    import logging
    import time
    # Set the root logger to INFO so logging.info messages are displayed.
    logging.basicConfig(level=logging.INFO)
    main()