db_info_subcommands: handle more tables
Problem: The export-db and pop-db commands only handle a couple of
columns from just the association_table and bank_table. These commands
would be more robust if they supported exporting and importing data
from all of the tables.

Restructure the export_db_info() function to export the data from *all*
of the tables in the flux-accounting DB into separate .csv files, each
labeled with its table name in the database.

Restructure the populate_db() function to populate any of the tables in
the flux-accounting DB from a corresponding .csv file. Add an optional
argument to the function so the user can specify which columns from the
file to include when populating the table.
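
A sketch of the intended workflow after this change (hypothetical shell
session; the exact set of exported files depends on the tables in the
DB, and the comma-separated form of the --fields value is an assumption,
not shown in this diff):

    $ flux account export-db        # writes association_table.csv, bank_table.csv, ...
    $ flux account pop-db -c bank_table.csv
    $ flux account pop-db -c association_table.csv -f username,userid,bank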
cmoussa1 committed Nov 12, 2024
1 parent 62e4e11 commit f3ed3c3
Showing 3 changed files with 107 additions and 137 deletions.
171 changes: 90 additions & 81 deletions src/bindings/python/fluxacct/accounting/db_info_subcommands.py
@@ -10,90 +10,99 @@
 # SPDX-License-Identifier: LGPL-3.0
 ###############################################################
 import csv
+import sqlite3
 
-from fluxacct.accounting import bank_subcommands as b
-from fluxacct.accounting import user_subcommands as u
-
 
-def export_db_info(conn, users=None, banks=None):
+def export_db_info(conn):
+    """
+    Export all of the information from the tables in the flux-accounting DB into
+    separate .csv files.
+    """
     try:
         cur = conn.cursor()
-        select_users_stmt = """
-        SELECT username, userid, bank, shares, max_running_jobs, max_active_jobs,
-        max_nodes, queues FROM association_table
-        """
-        cur.execute(select_users_stmt)
-        table = cur.fetchall()
-
-        # open a .csv file for writing
-        users_filepath = users if users else "users.csv"
-        users_file = open(users_filepath, "w")
-        with users_file:
-            writer = csv.writer(users_file)
-
-            for row in table:
-                writer.writerow(row)
-
-        select_banks_stmt = """
-        SELECT bank, parent_bank, shares FROM bank_table
-        """
-        cur.execute(select_banks_stmt)
-        table = cur.fetchall()
-
-        banks_filepath = banks if banks else "banks.csv"
-        banks_file = open(banks_filepath, "w")
-        with banks_file:
-            writer = csv.writer(banks_file)
-
-            for row in table:
-                writer.writerow(row)
-    except IOError as err:
-        print(err)
+        # get all tables from DB
+        cur.execute("SELECT name FROM sqlite_master WHERE type='table';")
+        tables = cur.fetchall()
+
+        # loop through each table and export it to a separate .csv file
+        for table_name in tables:
+            output_csv = f"{table_name[0]}.csv"
+            cur.execute(f"SELECT * FROM {table_name[0]}")
+            rows = cur.fetchall()
+            column_names = [description[0] for description in cur.description]
+
+            # write data to .csv file
+            with open(output_csv, "w", newline="") as csv_file:
+                writer = csv.writer(csv_file)
+                writer.writerow(column_names)
+                writer.writerows(rows)
+    except sqlite3.OperationalError as exc:
+        raise sqlite3.OperationalError(f"export-db: {exc}")
+    except IOError as exc:
+        raise IOError(f"export-db: {exc}")
 
 
-def populate_db(conn, users=None, banks=None):
-    if banks is not None:
-        try:
-            with open(banks) as csv_file:
-                csv_reader = csv.reader(csv_file, delimiter=",")
-
-                for row in csv_reader:
-                    b.add_bank(
-                        conn,
-                        bank=row[0],
-                        parent_bank=row[1],
-                        shares=row[2],
-                    )
-        except IOError as err:
-            print(err)
-
-    if users is not None:
-        try:
-            with open(users) as csv_file:
-                csv_reader = csv.reader(csv_file, delimiter=",")
-
-                # assign default values to fields if
-                # their slot is empty in the csv file
-                for row in csv_reader:
-                    username = row[0]
-                    uid = row[1]
-                    bank = row[2]
-                    shares = row[3] if row[3] != "" else 1
-                    max_running_jobs = row[4] if row[4] != "" else 5
-                    max_active_jobs = row[5] if row[5] != "" else 7
-                    max_nodes = row[6] if row[6] != "" else 2147483647
-                    queues = row[7]
-
-                    u.add_user(
-                        conn,
-                        username,
-                        bank,
-                        uid,
-                        shares,
-                        max_running_jobs,
-                        max_active_jobs,
-                        max_nodes,
-                        queues,
-                    )
-        except IOError as err:
-            print(err)
+def populate_db(conn, csv_file, columns_included=None):
+    """
+    Populate an existing table from a single .csv file with an option
+    to specify columns to include. The .csv file must have the column names in
+    the first line to indicate which columns to insert into the table.
+
+    Args:
+        csv_file: Path to the .csv file. The name of the .csv file must match the
+            name of the table in the flux-accounting DB.
+        columns_included (list, optional): List of columns to include from the .csv
+            file. If None, it will include all columns listed in the .csv file.
+
+    Raises:
+        ValueError: If the table derived from the .csv file name does not match
+            any of the tables in the flux-accounting DB.
+    """
+    try:
+        cur = conn.cursor()
+
+        # extract table name from .csv filename; check if it exists in DB
+        table_name = csv_file.split("/")[-1].replace(".csv", "")
+        cur.execute("SELECT name FROM sqlite_master WHERE type='table';")
+        tables = [table[0] for table in cur.fetchall()]
+
+        if table_name not in tables:
+            raise ValueError(
+                f'pop-db: table "{table_name}" does not exist in the database'
+            )
+
+        with open(csv_file, "r", newline="") as file:
+            reader = csv.reader(file)
+            all_columns = next(reader)  # column names
+
+            if columns_included:
+                # filter only the columns specified
+                columns = [col for col in all_columns if col in columns_included]
+            else:
+                columns = all_columns
+
+            for row in reader:
+                # build a list of (column, value) pairs for columns with non-empty values
+                column_value_pairs = [
+                    (columns[i], row[i]) for i in range(len(columns)) if row[i] != ""
+                ]
+                if column_value_pairs:
+                    # separate columns and values for the SQL statement
+                    cols_to_insert = [pair[0] for pair in column_value_pairs]
+                    vals_to_insert = [pair[1] for pair in column_value_pairs]
+                    insert_sql = (
+                        f"INSERT INTO {table_name} "
+                        f"({', '.join(cols_to_insert)}) "
+                        f"VALUES ({', '.join(['?' for _ in vals_to_insert])})"
+                    )
+
+                    # execute the insertion with only the non-empty values
+                    cur.execute(insert_sql, vals_to_insert)
+
+        conn.commit()
+    except sqlite3.OperationalError as exc:
+        conn.rollback()
+        raise sqlite3.OperationalError(f"pop-db: {exc}")
+    except IOError as exc:
+        raise IOError(f"pop-db: {exc}")
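
To illustrate the new round trip, here is a minimal standalone sketch
(the scratch table, file names, and the "d" import alias are
illustrative assumptions, not part of the commit; a real flux-accounting
DB is created by the package itself and its tables carry more columns):

    import csv
    import sqlite3

    from fluxacct.accounting import db_info_subcommands as d

    # hypothetical scratch DB and table, just to exercise the round trip
    conn = sqlite3.connect(":memory:")
    conn.execute(
        "CREATE TABLE project_table "
        "(project TEXT, owner TEXT DEFAULT 'root', shares INT DEFAULT 1)"
    )

    # the file name must match a table name, and the first row must list
    # the column names; empty values are skipped on INSERT, so the
    # column's DB default applies
    with open("project_table.csv", "w", newline="") as f:
        writer = csv.writer(f)
        writer.writerow(["project", "owner", "shares"])
        writer.writerow(["physics", "alice", ""])
        writer.writerow(["biology", "", "10"])

    d.populate_db(conn, "project_table.csv")
    d.export_db_info(conn)  # rewrites project_table.csv from the table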
20 changes: 13 additions & 7 deletions src/cmd/flux-account-service.py
@@ -545,17 +545,17 @@ def scrub_old_jobs(self, handle, watcher, msg, arg):
 
     def export_db(self, handle, watcher, msg, arg):
         try:
-            val = d.export_db_info(
-                self.conn,
-                msg.payload["users"],
-                msg.payload["banks"],
-            )
+            val = d.export_db_info(self.conn)
 
             payload = {"export_db": val}
 
             handle.respond(msg, payload)
         except KeyError as exc:
             handle.respond_error(msg, 0, f"missing key in payload: {exc}")
+        except sqlite3.OperationalError as exc:
+            handle.respond_error(msg, 0, f"a SQLite error occurred: {exc}")
+        except IOError as exc:
+            handle.respond_error(msg, 0, f"an IO error occurred: {exc}")
         except Exception as exc:
             handle.respond_error(
                 msg, 0, f"a non-OSError exception was caught: {str(exc)}"
@@ -565,15 +565,21 @@ def pop_db(self, handle, watcher, msg, arg):
         try:
             val = d.populate_db(
                 self.conn,
-                msg.payload["users"],
-                msg.payload["banks"],
+                msg.payload["csv_file"],
+                msg.payload["fields"],
             )
 
             payload = {"pop_db": val}
 
             handle.respond(msg, payload)
         except KeyError as exc:
             handle.respond_error(msg, 0, f"missing key in payload: {exc}")
+        except sqlite3.OperationalError as exc:
+            handle.respond_error(msg, 0, f"a SQLite error occurred: {exc}")
+        except IOError as exc:
+            handle.respond_error(msg, 0, f"an IO error occurred: {exc}")
+        except ValueError as exc:
+            handle.respond_error(msg, 0, f"{exc}")
         except Exception as exc:
             handle.respond_error(
                 msg, 0, f"a non-OSError exception was caught: {str(exc)}"
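
With these handler changes, a pop-db request carries a single .csv path
and an optional field list instead of separate users/banks paths. An
illustrative payload (key names come from the diff; the list form of
"fields" matches the columns_included argument of populate_db()):

    {"csv_file": "association_table.csv", "fields": ["username", "userid", "bank"]}

Sending "fields" as None (null) inserts every column listed in the
file's header row.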
53 changes: 4 additions & 49 deletions src/cmd/flux-account.py
@@ -555,69 +555,24 @@ def add_scrub_job_records_arg(subparsers):
 def add_export_db_arg(subparsers):
     subparser = subparsers.add_parser(
         "export-db",
-        help="""
-        Extract flux-accounting database information into two .csv files.
-        Order of columns extracted from association_table:
-        Username,UserID,Bank,Shares,MaxRunningJobs,MaxActiveJobs,MaxNodes,Queues
-        If no custom path is specified, this will create a file in the
-        current working directory called users.csv.
-        ----------------
-        Order of columns extracted from bank_table:
-        Bank,ParentBank,Shares
-        If no custom path is specified, this will create a file in the
-        current working directory called banks.csv.
-        Use these two files to populate a new flux-accounting DB with:
-        flux account pop-db -b banks.csv -u users.csv
-        """,
+        help="extract flux-accounting DB information into separate .csv files",
         formatter_class=flux.util.help_formatter(),
     )
     subparser.set_defaults(func="export_db")
-    subparser.add_argument(
-        "-u", "--users", help="path to a .csv file containing user information"
-    )
-    subparser.add_argument(
-        "-b", "--banks", help="path to a .csv file containing bank information"
-    )
 
 
 def add_pop_db_arg(subparsers):
     subparser = subparsers.add_parser(
         "pop-db",
-        help="""
-        Description: Populate a flux-accounting database with a .csv file.
-        Order of elements required for populating association_table:
-        Username,UserID,Bank,Shares,MaxRunningJobs,MaxActiveJobs,MaxNodes,Queues
-        [Shares], [MaxRunningJobs], [MaxActiveJobs], and [MaxNodes] can be left
-        blank ('') in the .csv file for a given row.
-        ----------------
-        Order of elements required for populating bank_table:
-        Bank,ParentBank,Shares
-        [ParentBank] can be left blank ('') in .csv file for a given row.
-        """,
+        help="populate a table in the flux-accounting DB with a .csv file",
         formatter_class=flux.util.help_formatter(),
     )
     subparser.set_defaults(func="pop_db")
     subparser.add_argument(
-        "-u", "--users", help="path to a .csv file containing user information"
+        "-c", "--csv-file", help="path to a .csv file containing table information"
     )
     subparser.add_argument(
-        "-b", "--banks", help="path to a .csv file containing bank information"
+        "-f", "--fields", help="which fields to insert into the table"
     )
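
A .csv file suitable for the new pop-db interface might look like the
following (hypothetical contents; the header row must name columns that
exist in the target table, and columns left empty or omitted fall back
to their defaults in the DB schema):

    $ cat bank_table.csv
    bank,parent_bank,shares
    root,,1
    physics,root,1

    $ flux account pop-db -c bank_table.csv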
