Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Garmin class #5

Merged
merged 2 commits into from
Dec 1, 2023
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
305 changes: 156 additions & 149 deletions dbdpy/garmin.py
Original file line number Diff line number Diff line change
@@ -1,183 +1,190 @@
import pandas
from garmin_fit_sdk import Decoder, Stream
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from garmin_fit_sdk import Decoder, Stream, Profile
import datetime
# from device import CommercialDevice


class GarminDataProcessor:
FIT_EPOCH_S = 631065600

# class garmin(CommercialDevice):
class garmin():
def __init__(self):
self.data = None

def read_fit(self, file):
"""
Reads FIT file and stores the data in the class instance.

Args:
file (str): The path to the FIT file to be read.
"""
stream = Stream.from_file(file)
decoder = Decoder(stream)
messages, errors = decoder.read(
apply_scale_and_offset=True,
convert_datetimes_to_dates=False,
convert_types_to_strings=True,
enable_crc_check=True,
expand_sub_fields=True,
expand_components=True,
merge_heart_rates=False,
mesg_listener=None
)
print(errors)
self.data = messages

def convert_time(self):
"""
Converts timestamps in the stored data to a human-readable format.

Uses the FIT_EPOCH_S constant and adjusts for the time zone.

Note: Requires data to be loaded using `read_fit` method first.
"""
time_delta = datetime.timedelta(hours=4)
if self.data is None:
return None
self.data['timestamp'] = [
datetime.datetime.utcfromtimestamp(
self.FIT_EPOCH_S + self.data['timestamp'][i]) - time_delta
for i in range(len(self.data['timestamp']))
]

def get_sleep_time(self):
"""
Extracts sleep start and wake times from the stored data.

Note: Requires data to be loaded and timestamps to be converted using
`read_fit` and `convert_time` methods.
"""
if self.data is None:
return None
sleep = self.data # Replace with the actual sleep data frame
sleep_time, wake_time = None, None
for i in range(len(sleep['timestamp']) - 1):
time_difference = sleep['timestamp'][i + 1] - sleep['timestamp'][i]
if time_difference.seconds // 3600 > 1:
sleep_time = sleep['timestamp'][i]
wake_time = sleep['timestamp'][i + 1]
return sleep_time, wake_time

def get_timestamp(self):
"""
Adjusts timestamps in the stored data, fixing inconsistencies and
converting them to a continuous sequence.

Note: Requires data to be loaded using `read_fit` method first.
"""
if self.data is None:
return None
df = self.data # Replace with the actual data frame
for i in range(len(df) - 1):
if df.iloc[i, 0].astype(str) == 'nan':
# super().__init__('garmin')
self.messages = None

# helper functions
def convert_time(self, df):
df['timestamp'] = [datetime.datetime.utcfromtimestamp(631065600 + df['timestamp'][i]) - datetime.timedelta(hours=4)
for i in range(len(df['timestamp']))]
return df

def get_timestamp(self, df):
for i in range(len(df)-1):
if df.iloc[i,0].astype(str) == 'nan':
timestamp_16 = df['timestamp_16'][i]
df.iloc[i, 0] = df.iloc[(i - 1), 0]
df.iloc[i,0] = df.iloc[(i-1),0]

for i in range(len(df)):
mesgTimestamp = int(df['timestamp'][i])
try:
try:
timestamp_16 = int(df['timestamp_16'][i])
duration = (timestamp_16 - (mesgTimestamp & 0xFFFF)) & 0xFFFF
duration = ( timestamp_16 - ( mesgTimestamp & 0xFFFF ) ) & 0xFFFF
except ValueError:
continue
mesgTimestamp += duration
df.iloc[i, 0] = mesgTimestamp

def get_floor(self):
df.iloc[i,0] = mesgTimestamp
return df

@classmethod
def from_directory(cls, filepath):
newGarmin = cls()
file_type = filepath.split('.')[-1]
if file_type == 'fit':
data = newGarmin.read_fit(filepath)
else:
data = newGarmin.read_sleepData(filepath)
newobject = cls(data)
return newobject

# read wellness file
def read_fit(self, path):
"""
Calculates cumulative ascent and descent values over time,
returning a data frame.
Reads FIT file and stores the data in the class instance.

Note: Requires data to be loaded using `read_fit` method first.
Args:
path (str): The path to the FIT file to be read.
"""
if self.data is None:
return None
df = self.data # Replace with the actual data frame
floor = df[['timestamp', 'ascent', 'descent']]
floor.iloc[0, 1:] = 0
for i in range(len(floor) - 1):
if str(floor['ascent'][i + 1]) == 'nan':
floor.iloc[i + 1, 1] = floor.iloc[i, 1]
stream = Stream.from_file(path)
decoder = Decoder(stream)
messages, errors = decoder.read(apply_scale_and_offset=True,
convert_datetimes_to_dates=False,
convert_types_to_strings=True,
enable_crc_check=True,
expand_sub_fields=True,
expand_components=True,
merge_heart_rates=False,
mesg_listener=None)

self.messages = messages

# read sleep data json file
def read_sleepData(self, path):
"""
Reads sleep data in json file and stores the data in the class instance.

Args:
file (str): The path to the json file to be read.
"""
sleepData = pd.read_json(path)
sleepData['sleepStartTimestampGMT'] = pd.to_datetime(sleepData['sleepStartTimestampGMT'])
sleepData['sleepStartTimestampGMT'] = sleepData['sleepStartTimestampGMT'] - datetime.timedelta(hours=4)
sleepData['sleepEndTimestampGMT'] = pd.to_datetime(sleepData['sleepEndTimestampGMT'])
sleepData['sleepEndTimestampGMT'] = sleepData['sleepEndTimestampGMT'] - datetime.timedelta(hours=4)
sleepData['calendarDate'] = pd.to_datetime(sleepData['calendarDate'])
sleepData.drop(sleepData[sleepData['sleepWindowConfirmationType'] == 'UNCONFIRMED'].index, inplace = True)
self.sleepData = sleepData

# get sleep stage information
def get_sleepstage(self):
sleepStage = self.sleepData[['calendarDate', 'deepSleepSeconds', 'lightSleepSeconds', 'remSleepSeconds', 'awakeSleepSeconds']]
return sleepStage

# get raw wellness data
def get_raw_data(self):
raw_data = pd.DataFrame(self.messages['monitoring_mesgs'])
raw_data = self.get_timestamp(raw_data)
raw_data = self.convert_time(raw_data)
return raw_data

def get_stress_level(self):
stress = pd.DataFrame(self.messages['stress_level_mesgs'])
stress['stress_level_time'] = [datetime.datetime.utcfromtimestamp(631065600 + stress['stress_level_time'][i]) - datetime.timedelta(hours=4)
for i in range(len(stress['stress_level_time']))]
return stress

def get_SpO2(self):
SpO2 = pd.DataFrame(self.messages['spo2_data_mesgs'])
SpO2 = self.convert_time(SpO2)
return SpO2

def get_floor(self):
raw_data = self.get_raw_data()
floor = raw_data[['timestamp', 'ascent', 'descent']]
floor.iloc[0,1:] = 0
for i in range(len(floor)-1):
if str(floor['ascent'][i+1]) == 'nan':
floor.iloc[i+1,1] = floor.iloc[i,1]
else:
floor.iloc[i + 1, 1] += floor.iloc[i, 1]
floor.iloc[i+1,1] += floor.iloc[i,1]

if str(floor['descent'][i + 1]) == 'nan':
floor.iloc[i + 1, 2] = floor.iloc[i, 2]

if str(floor['descent'][i+1]) == 'nan':
floor.iloc[i+1,2] = floor.iloc[i,2]
else:
floor.iloc[i + 1, 2] += floor.iloc[i, 2]
return floor
floor.iloc[i+1,2] += floor.iloc[i,2]

duplicate_rows = floor[floor.duplicated()]
floor_no_duplicates = floor.drop_duplicates(keep='first')

return floor_no_duplicates

def get_distance(self):
"""
Calculates cumulative distance values over time,
returning a data frame.

Note: Requires data to be loaded using `read_fit` method first.
"""
if self.data is None:
return None
df = self.data # Replace with the actual data frame
distance = df.iloc[:-5, :]
raw_data = self.get_raw_data()
distance = raw_data.iloc[:-5,:]
distance = distance[['timestamp', 'distance']]
distance.iloc[0, 1] = 0
for i in range(len(distance) - 1):
distance.iloc[0,1] = 0
for i in range(len(distance)-1):
i = i + 1
if str(distance['distance'][i]) == 'nan':
if str(distance['distance'][i - 1]) != 'nan':
distance.iloc[i, 1] = distance['distance'][i - 1]
if str(distance['distance'][i-1]) != 'nan':
distance.iloc[i,1] = distance['distance'][i-1]

if distance.iloc[i,1] < distance['distance'][i-1]:
distance.iloc[i,1] = distance['distance'][i-1]

if distance.iloc[i, 1] < distance['distance'][i - 1]:
distance.iloc[i, 1] = distance['distance'][i - 1]
return distance
duplicate_rows = distance[distance.duplicated()]
distance_no_duplicates = distance.drop_duplicates(keep='first')

return distance_no_duplicates

def get_hr(self):
"""
Extracts heart rate data from the stored data.

Note: Requires data to be loaded using `read_fit` method first.
"""
if self.data is None:
return None
hr_df = self.data[~self.data['heart_rate'].isna()]
hr_df = hr_df.reset_index(drop=True)
raw_data = self.get_raw_data()
hr_df = raw_data[~raw_data['heart_rate'].isna()].reset_index(drop=True)
hr_df = hr_df[['timestamp', 'heart_rate']]
hr_df['heart_rate'] = hr_df['heart_rate'].astype(int)
return hr_df

def get_intensity(self):
"""
Extracts intensity data from the stored data.

Note: Requires data to be loaded using `read_fit` method first.
"""
if self.data is None:
return None
intensity = self.data[~self.data['intensity'].isna()]
intensity = intensity.reset_index(drop=True)
raw_data = self.get_raw_data()
intensity = raw_data[~raw_data['intensity'].isna()].reset_index(drop=True)
intensity = intensity[['timestamp', 'intensity']]
return intensity

def get_calories(self):
"""
Calculates cumulative active calorie values over time,
returning a data frame.

Note: Requires data to be loaded using `read_fit` method first.
"""
if self.data is None:
return None
calories = self.data.iloc[:-5, :]
raw_data = self.get_raw_data()
calories = raw_data.iloc[:-5,:]
calories = calories[['timestamp', 'active_calories']]
for i in range(len(calories) - 1):
if str((calories['active_calories'][i + 1]) == 'nan') or\
(calories['active_calories'][i + 1] <
calories['active_calories'][i]):
calories.iloc[i + 1, 1] = calories.iloc[i, 1]
return calories
for i in range(len(calories)-1):
if str(calories['active_calories'][i+1]) == 'nan' or calories['active_calories'][i+1] < calories['active_calories'][i]:
calories.iloc[i+1,1] = calories.iloc[i,1]

duplicate_rows = calories[calories.duplicated()]
calories_no_duplicates = calories.drop_duplicates(keep='first')

return calories_no_duplicates

def get_sleep_time(self):
sleep = pd.DataFrame(self.messages['sleep_level_mesgs'])
sleep = self.convert_time(sleep)
for i in range(len(sleep['timestamp']) - 1):
time_difference = sleep['timestamp'][i + 1] - sleep['timestamp'][i]
if time_difference.seconds // 3600 > 1:
sleep_time = sleep['timestamp'][i]
wake_time = sleep['timestamp'][i + 1]

return sleep_time, wake_time

def get_rhr(self):
rhr_df = pd.DataFrame(self.messages['monitoring_hr_data_mesgs'])
rhr = rhr_df['resting_heart_rate'][0] # current_day_resting_heart_rate

return rhr
Loading