♻️ acq2sqlite.py: move queries into class to doc and test
WillForan committed Oct 25, 2024
1 parent 2b9871b commit 2f0ad71
Showing 1 changed file with 153 additions and 82 deletions.
acq2sqlite.py
@@ -3,110 +3,181 @@
convert db.txt into a sqlite database
"""
import sqlite3
import logging
import re
logging.basicConfig(level=logging.INFO)


def column_names():
"""
These names match what's used by dcmmeta2tsv.py and 00_build_db.bash
CSA first, normal dicom headers, and then filename
CSA first, normal dicom headers, and then filename.
Defaults to reading from ``taglist.txt``
This provides a language-agnostic lookup for columns in ``schema.sql``
and additionally
* prepends Phase and iPAT
* appends filename
These column names should match what is output by
``./dcmmeta2tsv.bash`` or ``./dcmmeta2tsv.py``
Also see :py:func:`dcmmeta2tsv.read_known_tags`
>>> cn = column_names() # reads taglist.txt
>>> cn[0] # hard coded here
'Phase'
>>> cn[3] # from taglist.txt
'AcqDate'
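>>> cn[-1] # appended last by this function; not a dicom tag
'filename'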
"""
# CSA col names from 00_build_db.bash not in taglist.txt
colnames = ["Phase", "iPAT"]
with open("taglist.txt", "r") as f:
tag_colnames = [
line.split("\t")[0]
for line in f.readlines()
if not re.search("^name|^#", line)
]

# CSA col names from 00_build_db.bash not in taglist.txt
colnames = ["Phase", "iPAT"]
colnames += tag_colnames
colnames += [
"filename"
] # final file name column also not in taglist.txt (not a tag)
# final file name column also not in taglist.txt (not a tag)
colnames += ["filename"]
return colnames


COLNAMES = column_names()

### SQL queries
# These are the header values (now sql columns) that should be consistent for an acquisition ('SequenceName') within a specific study ('Project')
CONSTS = [
"Project",
"SequenceName",
"iPAT",
"Comments",
"SequenceType",
"PED_major",
"Phase",
"TR",
"TE",
"Matrix",
"PixelResol",
"BWP",
"BWPPE",
"FA",
"TA",
"FoV",
]

# So hopefully, they already exist and we can select them
find_cmd = "select rowid from acq_param where " + " and ".join(
[f"{col} = ?" for col in CONSTS]
)

# otherwise we'll need to create a new row
consts_ins_string = ",".join(CONSTS)
val_quests = ",".join(["?" for _ in CONSTS])
sql_cmd = f"INSERT INTO acq_param({consts_ins_string}) VALUES({val_quests});"

## we'll do the same thing for the acquisition parameters (e.g. time and series number) that change every time -- only add if not already in the DB


ACQUNIQ = set(COLNAMES) - set(CONSTS) - set(["filename"])
assert ACQUNIQ == set(["AcqTime", "AcqDate", "SeriesNumber", "SubID", "Operator"])
# TODO: include station?

find_acq = "select rowid from acq where AcqTime like ? and AcqDate like ? and SubID = ? and SeriesNumber = ?"
acq_insert_columns = ["param_id"] + list(ACQUNIQ)
acq_insert = f"INSERT INTO acq({','.join(acq_insert_columns)}) VALUES({','.join(['?' for _ in acq_insert_columns])});"


def dict_to_db_row(d, sql):
class DBQuery:
"""
insert a dicom header (representative of an acquisition) into the db
Convenient SQL queries for tracking dicom headers/metadata
Poorly implemented, ad-hoc bespoke ORM for ``schema.sql``
"""
# order here needs to match find_acq.
acq_search_vals = (d["AcqTime"], d["AcqDate"], d["SubID"], d["SeriesNumber"])
cur = sql.execute(find_acq, acq_search_vals)
acq = cur.fetchone()
if acq:
print(f"have acq {acq[0]} {acq_search_vals}")
return

val_array = [d[k] for k in CONSTS]
print(f"searching: {val_array}")
cur = sql.execute(find_cmd, val_array)
res = cur.fetchone()
if res:
rowid = res[0]
print(f"seq repeated: found existing {rowid}")
else:
cur = sql.execute(sql_cmd, val_array)
rowid = cur.lastrowid
print(f"new seq: created {rowid}")
###
d["param_id"] = rowid
acq_insert_vals = [d[k] for k in acq_insert_columns]
cur = sql.execute(acq_insert, acq_insert_vals)
print(f"new acq: created {cur.lastrowid}")
CONSTS = [
"Project",
"SequenceName",
"iPAT",
"Comments",
"SequenceType",
"PED_major",
"Phase",
"TR",
"TE",
"Matrix",
"PixelResol",
"BWP",
"BWPPE",
"FA",
"TA",
"FoV",
]

def __init__(self, sql=None):
"""
Do a bunch of the query building up front:
* find existing ``acq``
* find existing ``acq_param``
* insert new into ``acq``
* insert new into ``acq_param``
"""
self.all_columns = column_names()
if sql:
self.sql = sql
else:
self.sql = sqlite3.connect("db.sqlite") # see schema.sql

### SQL queries
# These are the header values (now sql columns) that should be consistent for an acquisition ('SequenceName') within a specific study ('Project')
# So hopefully, they already exist and we can select them
self.find_cmd = "select rowid from acq_param where " + " and ".join(
[f"{col} = ?" for col in self.CONSTS]
)

# otherwise we'll need to create a new row
consts_ins_string = ",".join(self.CONSTS)
val_quests = ",".join(["?" for _ in self.CONSTS])
self.sql_cmd = f"INSERT INTO acq_param({consts_ins_string}) VALUES({val_quests});"

## we'll do the same thing for the acquisition parameters
# (e.g. time and series number)
# that change every time.
# only add if not already in the DB
acq_uniq_col = set(self.all_columns) - set(self.CONSTS) - set(["filename"])
assert acq_uniq_col == set(["AcqTime", "AcqDate", "SeriesNumber", "SubID", "Operator"])
# TODO: include station?

self.find_acq = "select rowid from acq where AcqTime like ? and AcqDate like ? and SubID = ? and SeriesNumber = ?"
self.acq_insert_columns = ["param_id"] + list(acq_uniq_col)
acq_col_csv = ','.join(self.acq_insert_columns)
acq_q = ','.join(['?' for _ in self.acq_insert_columns])
self.acq_insert = f"INSERT INTO acq({acq_col_csv}) VALUES({acq_q});"

def check_acq(self, d):
"""
Is this exact acquisition (time, id, series) already in the database?
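A quick spot check against an empty in-memory database; the header
values below are made up for illustration, and ``schema.sql`` is
assumed to be in the working directory:
>>> db = DBQuery(sqlite3.connect(':memory:'))
>>> with open('schema.sql') as f: [db.sql.execute(c) for c in f.read().split(";")]
...
>>> db.check_acq({'AcqTime': 'x', 'AcqDate': 'x', 'SubID': 'x', 'SeriesNumber': 1})
False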
"""
acq_search_vals = (d["AcqTime"], d["AcqDate"], d["SubID"], d["SeriesNumber"])
cur = self.sql.execute(self.find_acq, acq_search_vals)
acq = cur.fetchone()
if acq:
logging.info("have acq %s %s", acq[0], acq_search_vals)
return True
return False

def param_rowid(self, d):
"""
Find or insert the combination of parameters for an acquisition.
Uses ``CONSTS``, the header parameters that should be invariant
across acquisitions of the same name within a study.
>>> db = DBQuery(sqlite3.connect(':memory:'))
>>> with open('schema.sql') as f: [db.sql.execute(c) for c in f.read().split(";")]
...
>>> # db.sql.execute(".read schema.sql")
>>> example = {"Project": 'x', "SequenceName": 'a', 'TR': 1500}
>>> db.param_rowid(example)
1
>>> db.param_rowid(example)
1
>>> db.param_rowid({**example, 'Project': 'b'})
2
"""
val_array = [d.get(k) for k in self.CONSTS]
logging.info("searching: %s", val_array)
cur = self.sql.execute(self.find_cmd, val_array)
res = cur.fetchone()
if res:
rowid = res[0]
logging.info("seq repeated: found existing %d", rowid)
else:
cur = self.sql.execute(self.sql_cmd, val_array)
rowid = cur.lastrowid
logging.info("new seq: created %d", rowid)

return rowid


def dict_to_db_row(self, d):
"""
insert a dicom header (representative of an acquisition) into the db
"""
# order here needs to match find_acq.
if self.check_acq(d):
return

rowid = self.param_rowid(d)
###
d["param_id"] = rowid
acq_insert_vals = [d[k] for k in self.acq_insert_columns]
cur = self.sql.execute(self.acq_insert, acq_insert_vals)
logging.info("new acq: created %d", cur.lastrowid)


if __name__ == "__main__":
sql = sqlite3.connect("db.sqlite") # see schema.sql
db = DBQuery()
with open("db.txt", "r") as f:
while line := f.readline():
vals = line.split("\t")
d = dict(zip(COLNAMES, vals))
dict_to_db_row(d, sql)
d = dict(zip(db.all_columns, vals))
db.dict_to_db_row(d)

sql.commit()
db.sql.commit()
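
For reference, a minimal sketch of exercising the refactored class from a separate script or REPL, mirroring the ``param_rowid`` doctest; the in-memory connection and the header values are illustrative, and it assumes you run from the repository root so ``taglist.txt`` and ``schema.sql`` resolve:

import sqlite3
from acq2sqlite import DBQuery

db = DBQuery(sqlite3.connect(":memory:"))  # throwaway DB instead of db.sqlite
with open("schema.sql") as f:              # create the tables, as in the doctests
    for stmt in f.read().split(";"):
        db.sql.execute(stmt)

hdr = {"Project": "demo", "SequenceName": "rest", "TR": 1300}  # made-up header values
db.param_rowid(hdr)  # -> 1, a new acq_param row
db.param_rowid(hdr)  # -> 1 again; the repeated parameters are found, not re-inserted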
