-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathserver24.py
425 lines (348 loc) · 16.3 KB
/
server24.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
from jinja2 import StrictUndefined
from sqlalchemy import asc, update
from flask import Flask, render_template, redirect, request, flash, session, g, make_response, jsonify
from flask_debugtoolbar import DebugToolbarExtension
from model import connect_to_db, db, Meds, Users, User_meds
import db_query_functions as db_helper
import api
from reminders_twilio import *
from datetime import datetime, timedelta
import os
from sys import argv
from pprint import pprint
import json
import requests
import schedule
import time
if __name__ == "__main__":
connect_to_db(app) # This will connect the app to the database
app.run(debug=True)
app = Flask(__name__)
app.permanent_session_lifetime = timedelta(days=1)
# Required to use Flask sessions and the debug toolbar
app.secret_key = "QWEASDZXC"
# Normally, if you use an undefined variable in Jinja2, it fails
# silently. This is horrible. Fix this so that, instead, it raises an error.
app.jinja_env.undefined = StrictUndefined
@app.route("/")
def display_homepage():
"""Render homepage."""
return render_template("homepage.html")
@app.route("/find_meds")
def display_medication_search_bar():
"""Display the medication search form."""
return render_template("find_medications.html")
@app.route("/results")
def display_medication_search_results():
"""Display the options for medication search results."""
form_imprint = (request.args.get('pill_imprint') or '').upper() # Default empty string if None
score = request.args.get('pill_score') # Assuming score is optional, no need to do anything if None
shape = (request.args.get('pill_shape') or '').upper() # Default empty string if None
color = (request.args.get('pill_color') or '').upper() # Default empty string if None
name = (request.args.get('name_of_med') or '').capitalize() # Default empty string if None
# Assuming db_helper.query_with_find_meds_values can handle optional parameters like score being None
query_results = db_helper.query_with_find_meds_values(form_imprint, score, shape, color, name)
med_dictionary = db_helper.make_dictionary_from_query(query_results)
# Simplified check for empty dictionary
if not med_dictionary:
return render_template("no_search.html")
else:
return render_template("results.html", med_options
@app.route("/more_info/<value>")
def display_more_info(value):
"""Given selected value, query FDA API to display more information on med."""
api_results = api.query_fda_api(value)
return render_template("more_info.html", api_results=api_results)
@app.route('/register', methods=['GET'])
def register_new_user():
"""Display user registration form."""
return render_template("registration.html")
@app.route('/register', methods=['POST'])
def process_registration():
"""Register new user if email not already in db."""
f_name = request.form.get('first_name')
l_name = request.form.get('last_name')
email = request.form.get('email')
cell = request.form.get('cell')
password_hash = request.form.get('password')
cell_number = cell_verify(cell) #verify cell number using Twilio API.
#if cell verify returns error- then number is invalid.
if cell_number == False: #exception made for error to return false.
flash("That is not a valid phone number, please try again!")
return redirect('/register')
# if user cell already exists, ask user to login.
if Users.query.filter(Users.cell_number == cell_number).first():
flash("That cell number already exists, please login.")
return redirect('/login')
else: # if user cell does not exist, add to db
user = Users(f_name=f_name,
l_name=l_name,
email=email,
cell_number=cell_number)
user.set_password(password_hash)
db.session.add(user)
db.session.commit()
#place user in session.
session['user_name'] = user.f_name
session['user_id'] = user.user_id
# flash("Successfully logged in!"). #need to fix formating for flash messages
return redirect('/user-page')
@app.route('/login', methods=['GET'])
def display_login_page():
return render_template("login.html")
@app.route('/login', methods=['POST'])
def login_user():
"""Login user and add them to the session."""
#query DB using login information.
f_name = request.form.get('first_name') #either lower case or upcase for user input discrepancy.
cell= request.form.get('cell')
cell_number = cell_verify(cell) #get correctly formated cell number
password_hash = request.form.get('password')
user = Users.query.filter((Users.f_name == f_name),
(Users.cell_number == cell_number)).first()
if user and user.check_password(password_hash):
session['user_name'] = user.f_name
session['user_id'] = user.user_id
session.permanent = True # Ensure the session lasts for the configured lifetime
return redirect('/user-page') # Redirect to user's page
else:
flash("Invalid username or password.") # More generic flash message
return redirect('/login') # Redirect back to the login page
@app.route('/logout')
def logout_user():
"""Remove user from session."""
del session['user_name']
del session['user_id']
# flash("Successfully logged out!") #need to fix formating for flash messages
return redirect('/')
@app.route('/user-page', methods=['GET'])
def display_user_page():
"""Display specific information about user."""
if session.get('user_name',None):
user = Users.query.filter(Users.user_id == session['user_id']).first()
medications = user.u_meds #get medications for user in session.
med_dictionary = db_helper.make_dictionary_for_user_meds(medications)
return render_template('user_page.html',
user=user,
med_options=med_dictionary)
else:
return redirect('/login')
@app.route('/user_data', methods=['POST'])
def send_user_data():
"""Display specific information about user."""
req = request.get_json() #get JSON from front-end
med_id = req['med_id'] #pull med_id from JSON
user = Users.query.filter(Users.user_id == session['user_id']).first()
medications = user.u_meds #get medications for user in session.
med_info = User_meds.query.filter(User_meds.med_id == med_id).first()
med_dictionary = db_helper.make_object_dictionary(med_info)
#make response to pass back to front-end
res = make_response(jsonify(med_dictionary), 200)
return res
@app.route('/user-page', methods=['POST'])
def process_adding_user_medications():
"""Add user medications to DB from input on user profile page."""
for_med_name = (request.form.get('med_name')).capitalize() #changed seed.py to have medicine name as caplitalize for display
for_med_strength = (request.form.get('med_name')).upper() #strength in DB is upcase.
strength = (request.form.get('med_strength')).upper() #we want this in upcase if we store in db.
qty_per_dose = int(request.form.get('qty_per_dose'))
dosing_schedule = int(request.form.get('dosing'))
start_date = request.form.get('start_date')
rx_start_date = datetime.strptime(start_date,'%Y-%m-%d')
#place info in session
session['for_med_name'] = for_med_name
session['strength'] = strength
session['qty_per_dose'] = qty_per_dose
session['dosing_schedule'] = dosing_schedule
session['rx_start_date'] = rx_start_date
search_db = (Meds.query.filter((Meds.strength.like('%'+for_med_strength+'%'))|
(Meds.medicine_name.like('%'+for_med_name+'%')))
.order_by(Meds.has_image.desc())
.all())
database_med = db_helper.make_dictionary_from_query(search_db)
if len(database_med) == 0:
#then med is not in db and need to query the API.
search_api = api.query_fda_api(for_med_name)
return render_template('confirm_med_api.html', api_results=search_api)
else:
return render_template('confirm_med_db.html', database_med=database_med)
@app.route("/add_med", methods=['POST'])
def add_med_to_databse():
"""Add user medication to database and notify user."""
#grab user from session.
user = Users.query.filter(Users.user_id == session['user_id']).first()
session['user_name'] = user.f_name
user_id = session['user_id']
#pulling info from confirm_med_api.html
api_info = request.form.get('api_results') #api_info returns as a string.
#pulling info from confirm_med_db.html
db_med_image = request.form.get('med_image')
db_med_strength = request.form.get('med_strength')
#pull info needed from session to call db_helper function.
med_name = session['for_med_name']
session_strength = session['strength']
qty_per_dose = session['qty_per_dose']
times_per_day = session['dosing_schedule']
rx_start_date = session['rx_start_date']
api_results = api.query_fda_api(med_name)
brand_name = api_results["brand_name"]
if api_info == None: #if med existed in the DB.
#pull info needed to call db_helper function.
name_from_strength = db_med_strength[0] #ignore space and other chars.
med = Meds.query.filter((Meds.strength.like('%'+name_from_strength+'%')) &
(Meds.img_path == db_med_image)).first()
med_id = med.med_id
db_helper.add_user_med_to_database(api_results,
med_id,
user_id,
qty_per_dose,
times_per_day,
rx_start_date)
else:
#need to format strength to fit format in DB prior to instantiating.
strength = ((med_name.upper())+ " " + (session_strength.upper()))
#create new med instance prior to instantiating user med.
db_helper.instantiate_new_medication(strength, brand_name)
#query for newly instantiated medication to get med_id.
new = Meds.query.filter((Meds.strength == strength) &
(Meds.medicine_name == (brand_name.capitalize()))).first()
med_id = new.med_id
db_helper.add_user_med_to_database(api_results,
med_id,
user_id,
qty_per_dose,
times_per_day,
rx_start_date)
#delete items placed in session when user starts to add a new medication.
del session['for_med_name']
del session['strength']
del session['qty_per_dose']
del session['dosing_schedule']
del session['rx_start_date']
# flash("Medication Added!") #need to fix flash message format on page
return redirect('/user-page')
@app.route("/add_med_unverified")
def display_add_medication_form():
"""Add med to DB if not in DB and not in API call- rare case."""
user = Users.query.filter(Users.user_id == session['user_id']).first()
user_id = session['user_id']
med_name = session['for_med_name']
session_strength = session['strength']
#need to format strength to fit format in DB prior to instantiating.
strength = ((med_name.upper())+ " " + (session_strength.upper()))
qty_per_dose = session['qty_per_dose']
times_per_day = session['dosing_schedule']
rx_start_date = session['rx_start_date']
db_helper.instantiate_new_medication(strength, med_name)
new = Meds.query.filter((Meds.strength == strength) &
(Meds.medicine_name == (med_name.capitalize()))).first()
med_id = new.med_id
db_helper.add_unverified_med(user_id,
med_id,
qty_per_dose,
times_per_day,
rx_start_date)
del session['for_med_name']
del session['strength']
del session['qty_per_dose']
del session['dosing_schedule']
del session['rx_start_date']
flash("""We added your medication, however, there is no further information
regarding your medication at this time.""")
return redirect('/user-page')
@app.route("/show_schedule_form", methods=['POST'])
def display_schedule_medication_form():
"""Display form to fill in order to schedule patients medication."""
med_strength = request.form.get('med_strength')
med_id = request.form.get('med_id')
return render_template('schedule_meds_form.html',
med_strength=med_strength,
med_id=med_id)
@app.route("/schedule_med", methods=['POST'])
def schedule_medication():
"""Update u_med in DB and schedule text notifications for medication."""
from flask import request, session, flash, redirect
# Check if user_id exists in session and query the user
user_id = session.get('user_id')
if not user_id:
flash("No user is logged in.", "error")
return redirect('/login') # Redirect to login if no user_id in session
user = Users.query.filter_by(user_id=user_id).first()
# Ensure user exists before proceeding
if not user:
flash("User not found.", "error")
return redirect('/login') # Redirect to login if user not found
# Get medications for the user
user_medications_list = user.u_meds
med_dictionary = db_helper.make_dictionary_for_user_meds(medications)
# Retrieve and validate request data
req = request.get_json()
# Check if the necessary fields exist in the request
required_fields = ['am_time', 'mid_time', 'pm_time', 'duration', 'qty', 'refills', 'med_strength', 'med_id']
for field in required_fields:
if field not in req:
flash(f"Missing required field: {field}", "error")
return redirect('/some-error-page') # Or handle it gracefully with a custom message
# Pull info from request JSON
am = req['am_time']
mid = req['mid_time']
pm = req['pm_time']
rx_duration = req['duration']
qty = req['qty']
refills = req['refills']
med_strength = req['med_strength']
med_id = req['med_id']
# Further processing with the retrieved data (e.g., saving to the database, etc.)
if am != "":
am_time = datetime.strptime(am,'%H:%M')
else:
am_time = None
if mid != "":
mid_day_time = datetime.strptime(mid,'%H:%M')
else:
mid_day_time = None
if pm != "":
pm_time = datetime.strptime(pm, '%H:%M')
else:
pm_time = None
user_med = User_meds.query.filter((User_meds.med_id == med_id)).first()
user_med.am_time = am_time
user_med.mid_day_time = mid_day_time
user_med.pm_time = pm_time
user_med.rx_duration = rx_duration
user_med.qty = qty
user_med.current_qty = qty
user_med.refills = refills
user_med.text_remind = True
db.session.add(user_med) #update user medication in DB
db.session.commit()
res = make_response(jsonify(req), 200) #make response to pass to front-end
#get user info to send confirmation text
med_name = user_med.brand_name
cell = user_med.user.cell_number
name = user_med.user.f_name
message = (f"Hello {name}! Thank you for signing up and scheduling your meds! You will now recieve text reminders when it is time to take your {med_name}.")
send_text_reminders(message, cell)
return res
@app.route("/delete_med", methods=['POST'])
def delete_medication():
"""Delete user medications from their list."""
med_strength = request.form.get('med_strength')
med_id = request.form.get('med_id')
med = User_meds.query.filter((User_meds.user_id == session['user_id']) &
(User_meds.med_id == med_id)).delete()
db.session.commit()
flash("Medication deleted!")
return redirect('/user-page')
if __name__ == "__main__":
schedule.every().day.at("22:00").do(send_for_active_users)
print("I am checking for active users.")
app.debug = False
# make sure templates, etc. are not cached in debug mode
app.jinja_env.auto_reload = app.debug
connect_to_db(app)
# point that we invoke the DebugToolbarExtension
# Use the DebugToolbar
DebugToolbarExtension(app)
schedule.run_continuously(1)
app.run(port=5000, host='0.0.0.0')