Skip to content

Commit

Permalink
Now writing data to db with correct datatypes
Browse files Browse the repository at this point in the history
  • Loading branch information
nabelekt committed Jan 7, 2021
1 parent 7ac31f4 commit 92dcae4
Show file tree
Hide file tree
Showing 3 changed files with 90 additions and 13 deletions.
62 changes: 62 additions & 0 deletions column_casting.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
import sqlalchemy


# Default database datatype for all non-derived data is text
# This file specifies which columns to cast and what the Python and database datatypes should be before importing to the database

# Conversion intended to go:
# str -> py_datatype in correct_data_types()
# and then:
# py_datatype -> db_datatype in dfs_to_db()
# But that first step is currently disabled, thought to be unnecessary, so conversion goes:
# str -> db_datatype in dfs_to_db()
# DateTime, boolean, and string data values are handled correctly automatically
# Conversion to db_datatype disregards the sentence_type, so if column db_datatypes are different, column names must be unique

# Database datatypes reference: https://www.tutorialspoint.com/postgresql/postgresql_data_types.htm


# Maps each pandas/numpy destination datatype name to the SQLAlchemy column
# type used when writing to the database.
# Must contain a (key, value) pair for all destination datatypes.
datatype_dict = {
    'Int16': sqlalchemy.types.SmallInteger(),
    'Int32': sqlalchemy.types.Integer(),
    'float32': sqlalchemy.types.Float(precision=6),
}


# Per-column database datatypes; seeded here with the derived 'cycle_id'
# column and completed in dfs_to_db().
db_datatypes = {
    'cycle_id': sqlalchemy.types.Integer(),
}


# Maps (sentence_type, destination_datatype_name) -> list of column names to
# cast. Keys into datatype_dict above via the datatype name; consumed in
# dfs_to_db() to build db_datatypes. Conversion disregards the sentence_type,
# so a column name shared by two sentence types must have the same datatype.
columns_to_cast = {}

columns_to_cast['GSV', 'Int16'] = ['num_messages', 'msg_num', 'num_sv_in_view',
                                   'sv_prn_num_1', 'elevation_deg_1', 'azimuth_1', 'snr_1',
                                   'sv_prn_num_2', 'elevation_deg_2', 'azimuth_2', 'snr_2',
                                   'sv_prn_num_3', 'elevation_deg_3', 'azimuth_3', 'snr_3',
                                   'sv_prn_num_4', 'elevation_deg_4', 'azimuth_4', 'snr_4',
                                   'sv_prn_num_5', 'elevation_deg_5', 'azimuth_5', 'snr_5',
                                   'sv_prn_num_6', 'elevation_deg_6', 'azimuth_6', 'snr_6',
                                   'sv_prn_num_7', 'elevation_deg_7', 'azimuth_7', 'snr_7',
                                   'sv_prn_num_8', 'elevation_deg_8', 'azimuth_8', 'snr_8',
                                   'sv_prn_num_9', 'elevation_deg_9', 'azimuth_9', 'snr_9',
                                   'sv_prn_num_10', 'elevation_deg_10', 'azimuth_10', 'snr_10',
                                   'sv_prn_num_11', 'elevation_deg_11', 'azimuth_11', 'snr_11',
                                   'sv_prn_num_12', 'elevation_deg_12', 'azimuth_12', 'snr_12',]

columns_to_cast['RMC', 'Int32'] = ['datestamp']
columns_to_cast['RMC', 'float32'] = ['timestamp', 'lat', 'lon', 'spd_over_grnd', 'true_course', 'mag_variation']

columns_to_cast['GGA', 'float32'] = ['timestamp', 'lat', 'lon', 'horizontal_dil', 'altitude', 'geo_sep']
# BUG FIX: was 'gps_qial', which never matched the pynmea2 GGA field name
# 'gps_qual', so the GPS quality column was silently left uncast.
columns_to_cast['GGA', 'Int16'] = ['gps_qual', 'num_sats']
# For GGA, unsure about 'age_gps_data' and 'ref_station_id'

columns_to_cast['GLL', 'float32'] = ['lat', 'lon']

columns_to_cast['VTG', 'float32'] = ['true_track', 'mag_track', 'spd_over_grnd_kts', 'spd_over_grnd_kmph']

columns_to_cast['GSA', 'Int16'] = ['mode_fix_type', 'sv_id01', 'sv_id02', 'sv_id03', 'sv_id04',
                                   'sv_id05', 'sv_id06', 'sv_id07', 'sv_id08',
                                   'sv_id09', 'sv_id10', 'sv_id11', 'sv_id12',]
columns_to_cast['GSA', 'float32'] = ['pdop', 'hdop', 'vdop']


4 changes: 2 additions & 2 deletions db_data_import.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
import db_creds


def send_data_to_db(log_file_path, dfs, table_name_base, table_name_suffixes=None):
def send_data_to_db(log_file_path, dfs, table_name_base, table_name_suffixes=None, dtypes=None):

log_file_name = os.path.basename(log_file_path)

Expand All @@ -29,7 +29,7 @@ def send_data_to_db(log_file_path, dfs, table_name_base, table_name_suffixes=Non
table_name = table_name + '_' + table_name_suffixes[df_idx]

try:
df.to_sql(table_name, engine, method='multi', if_exists=if_exists_opt_loc, index=False)
df.to_sql(table_name, engine, method='multi', if_exists=if_exists_opt_loc, index=False, dtype=dtypes)
except (sqlalchemy.exc.OperationalError, psycopg2.OperationalError) as e:
sys.exit(f"\n\n\033[1m\033[91mERROR writing to database:\n {e}\033[0m\n\nExiting.\n\n") # Print error text bold and red

Expand Down
37 changes: 26 additions & 11 deletions nmea_data_convert.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import db_creds
import db_utils
import db_table_lists
from column_casting import columns_to_cast, datatype_dict, db_datatypes


def parse_and_validate_args():
Expand All @@ -27,13 +28,14 @@ def parse_and_validate_args():
help="where to output data: CSV files, database, or both")
parser.add_argument("--drop_previous_db_tables", "--dropt",
action="store_true",
help="drop previous DB tables before importing new data; only applies when output_method is 'db' or 'both'")
help="drop all previous DB tables before importing new data; only applies when output_method is 'db' or 'both'")
parser.add_argument("--backfill_datetimes", "--bfdt",
action="store_true",
help="backfill datetimes where missing by extrapolating from messages with datetime information")

args = parser.parse_args()

# Check if input file exists
if os.path.isfile(args.filepath):
return args
else:
Expand All @@ -57,6 +59,9 @@ def read_file(file):
print(f'Parse error on line {line_idx+1}: {e}')
continue

if len(sentences) == 0:
sys.exit(f"\nNo data found in {file.name} input file.\nExiting.\n\n")

return sentences


Expand Down Expand Up @@ -330,14 +335,13 @@ def correct_data_types(df):
df.replace(to_replace=pd.NaT, value='', inplace=True) # Do this replace first because pd.Nat won't be replaced with np.NaN
df.replace(to_replace='', value=np.NaN, inplace=True)

columns_not_to_cast = ['cycle_id', 'sentence_type', 'talker', 'datetime']
columns_to_cast = [column_name for column_name in df.columns if column_name not in columns_not_to_cast]

if df['sentence_type'][0] == 'GSV':

for column in columns_to_cast:
df[column] = df[column].astype('float').astype('Int16') # Smallest Int type in Postgres is 2 bytes with a max value of +32,767
# Cast as float first to get around bug: https://stackoverflow.com/questions/60024262/error-converting-object-string-to-int32-typeerror-object-cannot-be-converted
# Cast dataframe data from strings to appropriate datatypes specified in column_casting.py
# sentence_type = df['sentence_type'][0]
# for py_datatype in datatype_dict.keys():
# if (sentence_type, py_datatype) in columns_to_cast: # If key exists in dictionary
# for column in columns_to_cast[sentence_type, py_datatype]:
# df[column] = df[column].astype('float').astype(py_datatype)
# # Cast as float first to get around bug: https://stackoverflow.com/questions/60024262/error-converting-object-string-to-int32-typeerror-object-cannot-be-converted

df['datetime'].replace(to_replace=np.NaN, value=pd.NaT, inplace=True) # Needed for backfill_datetimes() to work properly

Expand Down Expand Up @@ -365,7 +369,16 @@ def dfs_to_db(sentence_dfs, input_file_path, verbose=False):
# Pass lowercase 'talker_sentencetype' as table name suffixes
table_name_suffixes = [f"{df['talker'][0]}_{df['sentence_type'][0]}".lower() for df in sentence_dfs]

table_names = db_data_import.send_data_to_db(input_file_path, sentence_dfs, table_name_base, table_name_suffixes)
# Determine database datatypes for columns in columns_to_cast
for df in sentence_dfs:
sentence_type = df['sentence_type'][0]
for py_datatype in datatype_dict.keys():
if (sentence_type, py_datatype) in columns_to_cast: # If key exists in dictionary
for column in columns_to_cast[sentence_type, py_datatype]:
db_datatypes[column] = datatype_dict[py_datatype] # Get database datatype for column


table_names = db_data_import.send_data_to_db(input_file_path, sentence_dfs, table_name_base, table_name_suffixes, dtypes=db_datatypes)

if verbose:
print(f"data from logfile '{input_file_path}' written to:")
Expand Down Expand Up @@ -464,7 +477,8 @@ def main():
print("\nProcessing data... ", end="")
sentence_dfs = process_data_common(sentences, cycle_start='GNRMC') # Cycle starts with 'RMC' sentence
if args.backfill_datetimes:
dts_sentences = backfill_datetimes(sentence_dfs, verbose=True)
backfill_datetimes(sentence_dfs, verbose=True)
# derive_data(sentence_dfs)
print("done.")

if (args.output_method == 'csv' or args.output_method == 'both'):
Expand All @@ -484,6 +498,7 @@ def main():

print("\nAll done. Exiting.\n\n")


if __name__ == '__main__':

main()

0 comments on commit 92dcae4

Please sign in to comment.