# AnalysisIR.py
# Forked from frcteam195/scouting_python.
import mysql.connector as mariaDB
import numpy as np
import datetime
import time
# For each analysisType we create, add a new import statement here. We could import all analysisTypes.
from analysisTypes.autonomous import autonomous # Works in Database
from analysisTypes.ballSummary import ballSummary
from analysisTypes.brokeDown import brokeDown # Works in Database
from analysisTypes.climb import climb # Works in Database
from analysisTypes.groundPickup import groundPickup # Works in Database
from analysisTypes.hopperLoad import hopperLoad # Works in Database
from analysisTypes.lostComm import lostComm # Works in Database
from analysisTypes.matchVideos import matchVideos # Works in Database
from analysisTypes.playedDefense import playedDefense # Works in Database
from analysisTypes.subSBroke import subSBroke # Works in Database
from analysisTypes.totalBalls import totalBalls # Works in Database
from analysisTypes.totalInnerBalls import totalInnerBalls # Works in Database
from analysisTypes.totalLowBalls import totalLowBalls # Works in Database
from analysisTypes.totalOuterBalls import totalOuterBalls # Works in Database
from analysisTypes.totalScore import totalScore # Works in Database
from analysisTypes.totalUpperBalls import totalUpperBalls # Works in Database
from analysisTypes.wheelStage2 import wheelStage2 # Works in Database
from analysisTypes.wheelStage3 import wheelStage3 # Works in Database
from analysisTypes.startingPosition import startingPosition # Works in Database
from analysisTypes.ranking import ranking
# Define a Class called analysis
class analysis():
    """Rebuild the CurrentEventAnalysis (CEA) table for the current event.

    Nothing here runs on its own: instantiating the class runs the whole
    pipeline from ``__init__``. It connects to the database, wipes the CEA
    table, loads the list of teams, runs every analysis type for each team
    (inserting one CEA record per analysis) and finally ranks the teams for
    the analysis types listed in ``_RANKED_ANALYSIS_TYPES``.
    """

    # AnalysisTypeIDs whose Summary1Value is ranked by _rankTeamsAll.
    _RANKED_ANALYSIS_TYPES = (2, 3, 4, 5, 6, 7, 8, 9, 10, 11)
    # Percentile cut-offs used for ranking. _RANK_BUCKETS[i] is the
    # (Summary3Format, Summary3Display/Summary3Value) pair for a value that is
    # <= the i-th cut-off; the final entry is used for values above all of them.
    _RANK_PERCENTILES = (25, 50, 75, 90)
    _RANK_BUCKETS = ((1, 10), (2, 25), (3, 50), (4, 75), (5, 90))

    def __init__(self, user='admin', passwd='team195', host='10.0.0.195',
                 database='team195_scouting'):
        """Connect to the scouting database and run the full analysis.

        The defaults connect to the Pi database with remote access (e.g. from
        a laptop). Other hosts that have been used with this script:

        * ``'localhost'`` - Pi database accessed from the Pi itself.
        * ``'frcteam195testinstance.cmdlvflptajw.us-east-1.rds.amazonaws.com'``
          - AWS testing database; use it when a run would destroy tables
          holding proper data.
        * ``'frcteam195.cmdlvflptajw.us-east-1.rds.amazonaws.com'`` - AWS
          database with proper data.

        The AWS instances use different credentials from the Pi; pass them in
        through ``user`` / ``passwd`` rather than recording them in this file.

        :param user: database user name.
        :param passwd: database password.
        :param host: database host name or IP address.
        :param database: name of the database (schema) to use.
        """
        now = datetime.datetime.now()
        print(now.strftime("%Y-%m-%d %H:%M:%S"))
        start_time = time.time()

        self.conn = mariaDB.connect(user=user,
                                    passwd=passwd,
                                    host=host,
                                    database=database)
        # One cursor is shared by every query issued through _run_query.
        self.cursor = self.conn.cursor()

        self.columns = []
        self._wipeCEA()
        self.rsRobots = self._getTeams()
        self._analyzeTeams()
        self._rankTeamsAll()

        print("Time: %0.2f seconds" % (time.time() - start_time))
        print()

    def _run_query(self, query, params=None):
        """Execute ``query`` on the shared cursor.

        :param query: SQL text; use ``%s`` placeholders when ``params`` is given.
        :param params: optional sequence of values bound to the placeholders by
            the database driver, so values never have to be quoted by hand.
        """
        if params is None:
            self.cursor.execute(query)
        else:
            self.cursor.execute(query, params)

    def _setColumns(self, columns):
        """Store the column headings of the most recent team-data query."""
        self.columns = columns

    def _wipeCEA(self):
        """Delete every record from the CEA table and commit.

        We may want to make this only remove CurrentEvent records.
        """
        self._run_query("DELETE FROM CurrentEventAnalysis")
        self.conn.commit()

    def _getTeams(self):
        """Return the teams that have MatchScouting records at the current event.

        :returns: the rows fetched from the cursor, one per team, with the team
            in column 0.
        :raises AssertionError: with the message "No robots found" when the
            query returns no rows.
        """
        self._run_query("SELECT MatchScouting.Team FROM (MatchScouting "
                        "INNER JOIN Matches ON MatchScouting.MatchID = Matches.MatchID) "
                        "INNER JOIN Events ON Matches.EventID = Events.EventID "
                        "WHERE (((Events.CurrentEvent) = 1)) "
                        "GROUP BY CAST(MatchScouting.Team AS INT), MatchScouting.Team "
                        "HAVING (((MatchScouting.Team) Is Not Null)); ")
        rsRobots = self.cursor.fetchall()
        # Raised explicitly (instead of via an assert statement) so the check
        # still runs when Python is started with -O.
        if not rsRobots:
            raise AssertionError("No robots found")
        return rsRobots

    def _getTeamData(self, team):
        """Fetch the current-event match records for one team.

        Only records with ScoutingStatus 1, 2 or 3 and TeamMatchNo <= 12 are
        returned, ordered by TeamMatchNo. As a side effect ``self.columns`` is
        set to the column headings of the result.

        :param team: a row as returned by ``_getTeams``; ``team[0]`` is the team.
        :returns: the fetched rows, or ``None`` when the team has no matching
            records yet, which lets the caller skip that team.
        """
        # The team is bound as a query parameter rather than concatenated into
        # the SQL text, so it is quoted by the driver whatever its Python type.
        self._run_query("SELECT MatchScouting.*, Matches.MatchNo, Teams.RobotWeight "
                        "FROM (Events INNER JOIN Matches ON Events.EventID = Matches.EventID) "
                        "INNER JOIN MatchScouting ON (Matches.EventID = MatchScouting.EventID) "
                        "AND (Matches.MatchID = MatchScouting.MatchID) "
                        "INNER JOIN Teams ON (MatchScouting.Team = Teams.Team) "
                        "WHERE (MatchScouting.Team = %s) "
                        "AND (Events.CurrentEvent = 1) "
                        "AND (ScoutingStatus IN (1, 2, 3)) "
                        "AND (MatchScouting.TeamMatchNo <= 12) "
                        "ORDER BY MatchScouting.TeamMatchNo;",
                        (team[0],))
        # cursor.description gives the column headings of the result, so they
        # do not need to be hard-coded.
        self._setColumns([column[0] for column in self.cursor.description])
        rsRobotMatches = self.cursor.fetchall()
        return rsRobotMatches or None

    def _analyzeTeams(self):
        """Run every analysis type for every team and insert each result.

        Teams without match records are skipped.
        """
        # Looked up here rather than at class level so the class can be defined
        # before the analysisTypes imports are resolved. The order matches the
        # order the analyses have always been run in; `ranking` is imported by
        # the file but intentionally not run here.
        analysis_functions = (
            autonomous,
            ballSummary,
            brokeDown,
            climb,
            groundPickup,
            hopperLoad,
            lostComm,
            matchVideos,
            playedDefense,
            startingPosition,
            subSBroke,
            totalBalls,
            totalInnerBalls,
            totalLowBalls,
            totalOuterBalls,
            totalScore,
            totalUpperBalls,
            wheelStage2,
            wheelStage3,
        )
        for team in self.rsRobots:
            rsRobotMatches = self._getTeamData(team)
            if not rsRobotMatches:
                continue
            for analyze in analysis_functions:
                rsCEA = analyze(analysis=self, rsRobotMatches=rsRobotMatches)
                self._insertAnalysis(rsCEA)

    def _rankTeamsSingle(self, analysis_type):
        """Rank every team for one analysis type; called by ``_rankTeamsAll``.

        Reads each team's Summary1Value for ``analysis_type`` from the CEA
        table, computes the 25th/50th/75th/90th percentiles of those values and
        writes the matching bucket from ``_RANK_BUCKETS`` to Summary3Format,
        Summary3Display and Summary3Value of the team's CEA record. Teams whose
        Summary1Value is NULL are left untouched.

        :param analysis_type: AnalysisTypeID to rank.
        """
        self._run_query("SELECT Team, Summary1Value "
                        "FROM CurrentEventAnalysis "
                        "WHERE AnalysisTypeID = %s;", (analysis_type,))
        # Drop NULL values *before* testing for emptiness, so a result made up
        # only of NULLs never reaches np.percentile with an empty list.
        team_sum1 = [row for row in self.cursor.fetchall() if row[1] is not None]
        if not team_sum1:
            print('Data was not found in the db')
            return

        cutoffs = np.percentile([row[1] for row in team_sum1],
                                list(self._RANK_PERCENTILES))
        update = ("UPDATE CurrentEventAnalysis "
                  "SET CurrentEventAnalysis.Summary3Format = %s, "
                  "CurrentEventAnalysis.Summary3Display = %s, "
                  "CurrentEventAnalysis.Summary3Value = %s "
                  "WHERE CurrentEventAnalysis.Team = %s "
                  "AND CurrentEventAnalysis.AnalysisTypeID = %s;")
        for team, value in team_sum1:
            # Take the first bucket whose cut-off the value does not exceed;
            # values above every cut-off fall into the last bucket.
            for (team_color, team_display), cutoff in zip(self._RANK_BUCKETS, cutoffs):
                if value <= cutoff:
                    break
            else:
                team_color, team_display = self._RANK_BUCKETS[-1]
            self._run_query(update, (team_color, team_display, team_display,
                                     str(team), analysis_type))
        # Commit once, after every team has been updated.
        self.conn.commit()

    def _rankTeamsAll(self):
        """Run ``_rankTeamsSingle`` for every ID in ``_RANKED_ANALYSIS_TYPES``."""
        for analysisType in self._RANKED_ANALYSIS_TYPES:
            self._rankTeamsSingle(analysisType)

    def _insertAnalysis(self, rsCEA):
        """Insert one rsCEA record into the CEA table and commit.

        :param rsCEA: mapping of CEA column name to value, as returned by the
            analysisTypes functions.
        """
        # Column names cannot be bound as parameters, so they are joined into
        # the statement; the values are bound so that None becomes NULL and
        # strings are escaped by the driver.
        columnHeadings = ", ".join(str(name) for name in rsCEA)
        placeholders = ", ".join(["%s"] * len(rsCEA))
        # NumPy scalars are converted to plain Python values first, since the
        # driver's type conversion is written for built-in Python types.
        values = tuple(value.item() if isinstance(value, np.generic) else value
                       for value in rsCEA.values())
        self._run_query("INSERT INTO CurrentEventAnalysis ("
                        + columnHeadings + ") VALUES ("
                        + placeholders + ");", values)
        self.conn.commit()
# Running this file as a script instantiates the analysis class; its __init__
# runs the whole program.
if __name__ == '__main__':
    myAnalysis = analysis()