From f3ed3c3f3935af932d2a5e62abbad6eec154877e Mon Sep 17 00:00:00 2001 From: cmoussa1 Date: Tue, 12 Nov 2024 08:52:16 -0800 Subject: [PATCH] db_info_subcommands: handle more tables Problem: The export-db and pop-db commands only handle a couple of columns from just the association_table and bank_table. These commands would be more robust if they included export and import support from all of the tables. Restructure the export_db_info() function to return data from *all* of the tables in the flux-accounting DB into separate .csv files, labeled with their table names in the database. Restructure the populate_db() function to handle populating any of the tables in the flux-accounting DB with a corresponding .csv file. Add an optional argument to the function to allow the user to specify which columns to include from the file when populating the table. --- .../accounting/db_info_subcommands.py | 171 +++++++++--------- src/cmd/flux-account-service.py | 20 +- src/cmd/flux-account.py | 53 +----- 3 files changed, 107 insertions(+), 137 deletions(-) diff --git a/src/bindings/python/fluxacct/accounting/db_info_subcommands.py b/src/bindings/python/fluxacct/accounting/db_info_subcommands.py index 1ebe7329..258fd519 100644 --- a/src/bindings/python/fluxacct/accounting/db_info_subcommands.py +++ b/src/bindings/python/fluxacct/accounting/db_info_subcommands.py @@ -10,90 +10,99 @@ # SPDX-License-Identifier: LGPL-3.0 ############################################################### import csv +import sqlite3 -from fluxacct.accounting import bank_subcommands as b -from fluxacct.accounting import user_subcommands as u +def export_db_info(conn): + """ + Export all of the information from the tables in the flux-accounting DB into + separate .csv files. + """ + try: + cur = conn.cursor() + # get all tables from DB + cur.execute("SELECT name FROM sqlite_master WHERE type='table';") + tables = cur.fetchall() + + # loop through each table and export it to a separate .csv file + for table_name in tables: + output_csv = f"{table_name[0]}.csv" + cur.execute(f"SELECT * FROM {table_name[0]}") + rows = cur.fetchall() + column_names = [description[0] for description in cur.description] + + # write data to .csv file + with open(output_csv, "w", newline="") as csv_file: + writer = csv.writer(csv_file) + writer.writerow(column_names) + writer.writerows(rows) + except sqlite3.OperationalError as exc: + raise sqlite3.OperationalError(f"export-db: {exc}") + except IOError as exc: + raise IOError(f"export-db: {exc}") + + +def populate_db(conn, csv_file, columns_included=None): + """ + Populate an existing table from a single .csv file with an option + to specify columns to include. The .csv file must have the column names in + the first line to indicate which columns to insert into the table. + + Args: + csv_file: Path to the .csv file. The name of the .csv file must match the + name of the table in the flux-accounting DB. + + columns_included (list, optional): List of columns to include from the .csv + file. If None, it will include all columns listed in the .csv file. -def export_db_info(conn, users=None, banks=None): + Raises: + ValueError: If the table derived from the .csv file name does not match + any of the tables in the flux-accounting DB. + """ try: cur = conn.cursor() - select_users_stmt = """ - SELECT username, userid, bank, shares, max_running_jobs, max_active_jobs, - max_nodes, queues FROM association_table - """ - cur.execute(select_users_stmt) - table = cur.fetchall() - - # open a .csv file for writing - users_filepath = users if users else "users.csv" - users_file = open(users_filepath, "w") - with users_file: - writer = csv.writer(users_file) - - for row in table: - writer.writerow(row) - - select_banks_stmt = """ - SELECT bank, parent_bank, shares FROM bank_table - """ - cur.execute(select_banks_stmt) - table = cur.fetchall() - - banks_filepath = banks if banks else "banks.csv" - banks_file = open(banks_filepath, "w") - with banks_file: - writer = csv.writer(banks_file) - - for row in table: - writer.writerow(row) - except IOError as err: - print(err) - - -def populate_db(conn, users=None, banks=None): - if banks is not None: - try: - with open(banks) as csv_file: - csv_reader = csv.reader(csv_file, delimiter=",") - - for row in csv_reader: - b.add_bank( - conn, - bank=row[0], - parent_bank=row[1], - shares=row[2], - ) - except IOError as err: - print(err) - - if users is not None: - try: - with open(users) as csv_file: - csv_reader = csv.reader(csv_file, delimiter=",") - - # assign default values to fields if - # their slot is empty in the csv file - for row in csv_reader: - username = row[0] - uid = row[1] - bank = row[2] - shares = row[3] if row[3] != "" else 1 - max_running_jobs = row[4] if row[4] != "" else 5 - max_active_jobs = row[5] if row[5] != "" else 7 - max_nodes = row[6] if row[6] != "" else 2147483647 - queues = row[7] - - u.add_user( - conn, - username, - bank, - uid, - shares, - max_running_jobs, - max_active_jobs, - max_nodes, - queues, + + # extract table name from .csv filename; check if it exists in DB + table_name = csv_file.split("/")[-1].replace(".csv", "") + cur.execute("SELECT name FROM sqlite_master WHERE type='table';") + tables = [table[0] for table in cur.fetchall()] + + if table_name not in tables: + raise ValueError( + f'pop-db: table "{table_name}" does not exist in the database' + ) + + with open(csv_file, "r", newline="") as file: + reader = csv.reader(file) + all_columns = next(reader) # column names + + if columns_included: + # filter only the columns specified + columns = [col for col in all_columns if col in columns_included] + else: + columns = all_columns + + for row in reader: + # build a list of (column, value) pairs for columns with non-empty values + column_value_pairs = [ + (columns[i], row[i]) for i in range(len(columns)) if row[i] != "" + ] + if column_value_pairs: + # separate columns and values for the SQL statement + cols_to_insert = [pair[0] for pair in column_value_pairs] + vals_to_insert = [pair[1] for pair in column_value_pairs] + insert_sql = ( + f"INSERT INTO {table_name} " + f"({', '.join(cols_to_insert)}) " + f"VALUES ({', '.join(['?' for _ in vals_to_insert])})" ) - except IOError as err: - print(err) + + # execute the insertion with only the non-empty values + cur.execute(insert_sql, vals_to_insert) + + conn.commit() + except sqlite3.OperationalError as exc: + conn.rollback() + raise sqlite3.OperationalError(f"pop-db: {exc}") + except IOError as exc: + raise IOError(f"pop-db: {exc}") diff --git a/src/cmd/flux-account-service.py b/src/cmd/flux-account-service.py index 5c93dad8..e2c0c6e4 100755 --- a/src/cmd/flux-account-service.py +++ b/src/cmd/flux-account-service.py @@ -545,17 +545,17 @@ def scrub_old_jobs(self, handle, watcher, msg, arg): def export_db(self, handle, watcher, msg, arg): try: - val = d.export_db_info( - self.conn, - msg.payload["users"], - msg.payload["banks"], - ) + val = d.export_db_info(self.conn) payload = {"export_db": val} handle.respond(msg, payload) except KeyError as exc: handle.respond_error(msg, 0, f"missing key in payload: {exc}") + except sqlite3.OperationalError as exc: + handle.respond_error(msg, 0, f"a SQLite error occurred: {exc}") + except IOError as exc: + handle.respond_error(msg, 0, f"an IO error occurred: {exc}") except Exception as exc: handle.respond_error( msg, 0, f"a non-OSError exception was caught: {str(exc)}" @@ -565,8 +565,8 @@ def pop_db(self, handle, watcher, msg, arg): try: val = d.populate_db( self.conn, - msg.payload["users"], - msg.payload["banks"], + msg.payload["csv_file"], + msg.payload["fields"], ) payload = {"pop_db": val} @@ -574,6 +574,12 @@ def pop_db(self, handle, watcher, msg, arg): handle.respond(msg, payload) except KeyError as exc: handle.respond_error(msg, 0, f"missing key in payload: {exc}") + except sqlite3.OperationalError as exc: + handle.respond_error(msg, 0, f"a SQLite error occurred: {exc}") + except IOError as exc: + handle.respond_error(msg, 0, f"an IO error occurred: {exc}") + except ValueError as exc: + handle.respond_error(msg, 0, f"{exc}") except Exception as exc: handle.respond_error( msg, 0, f"a non-OSError exception was caught: {str(exc)}" diff --git a/src/cmd/flux-account.py b/src/cmd/flux-account.py index 9fd46e75..95ef14f9 100755 --- a/src/cmd/flux-account.py +++ b/src/cmd/flux-account.py @@ -555,69 +555,24 @@ def add_scrub_job_records_arg(subparsers): def add_export_db_arg(subparsers): subparser = subparsers.add_parser( "export-db", - help=""" - Extract flux-accounting database information into two .csv files. - - Order of columns extracted from association_table: - - Username,UserID,Bank,Shares,MaxRunningJobs,MaxActiveJobs,MaxNodes,Queues - - If no custom path is specified, this will create a file in the - current working directory called users.csv. - - ---------------- - - Order of columns extracted from bank_table: - - Bank,ParentBank,Shares - - If no custom path is specified, this will create a file in the - current working directory called banks.csv. - - Use these two files to populate a new flux-accounting DB with: - - flux account pop-db -b banks.csv -u users.csv - """, + help="extract flux-accounting DB information into separate .csv files", formatter_class=flux.util.help_formatter(), ) subparser.set_defaults(func="export_db") - subparser.add_argument( - "-u", "--users", help="path to a .csv file containing user information" - ) - subparser.add_argument( - "-b", "--banks", help="path to a .csv file containing bank information" - ) def add_pop_db_arg(subparsers): subparser = subparsers.add_parser( "pop-db", - help=""" - Description: Populate a flux-accounting database with a .csv file. - - Order of elements required for populating association_table: - - Username,UserID,Bank,Shares,MaxRunningJobs,MaxActiveJobs,MaxNodes,Queues - - [Shares], [MaxRunningJobs], [MaxActiveJobs], and [MaxNodes] can be left - blank ('') in the .csv file for a given row. - - ---------------- - - Order of elements required for populating bank_table: - - Bank,ParentBank,Shares - - [ParentBank] can be left blank ('') in .csv file for a given row. - """, + help="populate a table in the flux-accounting DB with a .csv file", formatter_class=flux.util.help_formatter(), ) subparser.set_defaults(func="pop_db") subparser.add_argument( - "-u", "--users", help="path to a .csv file containing user information" + "-c", "--csv-file", help="path to a .csv file containing table information" ) subparser.add_argument( - "-b", "--banks", help="path to a .csv file containing bank information" + "-f", "--fields", help="which fields to insert into the table" )