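"""AI cheating-surveillance script.

Monitors webcam gaze direction with MTCNN and a small DQN agent, logs keyboard
shortcuts, clipboard changes and active-window switches, and persists the logs
to a local SQLite database.
"""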
import os
import shutil
import cv2
import torch
import random
from facenet_pytorch import MTCNN
from imutils.video import WebcamVideoStream
import time
import datetime
import keyboard
import pygetwindow as gw
import threading
import pyperclip
import getpass
import sqlite3
from collections import deque
import torch.nn as nn
import torch.optim as optim
import numpy as np
import json
import requests
import pyautogui

FEEDBACK_FILE = 'feedback.json'
DETAILED_FEEDBACK_FILE = 'detailed_feedback.json'

def load_feedback(filename):
    if os.path.isfile(filename):
        with open(filename, 'r') as f:
            return json.load(f)
    return []

dqn_agent_analysis = {}
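
# Simple feed-forward Q-network: two hidden layers of 24 units mapping the
# (1-dimensional) state to one Q-value per action.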
class DQN(nn.Module):
    def __init__(self, state_size, action_size):
        super(DQN, self).__init__()
        self.fc1 = nn.Linear(state_size, 24)
        self.fc2 = nn.Linear(24, 24)
        self.fc3 = nn.Linear(24, action_size)

    def forward(self, state):
        x = torch.relu(self.fc1(state))
        x = torch.relu(self.fc2(x))
        return self.fc3(x)
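
# DQN agent with an epsilon-greedy policy and a bounded experience-replay buffer.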
class DQNAgent:
    def __init__(self, state_size, action_size):
        self.state_size = state_size
        self.action_size = action_size
        self.memory = deque(maxlen=2000)
        self.gamma = 0.90  # discount rate
        self.epsilon = 1.0  # exploration rate
        self.epsilon_min = 0.01
        self.epsilon_decay = 0.995
        self.learning_rate = 0.001
        self.model = DQN(state_size, action_size)
        self.optimizer = optim.Adam(self.model.parameters(), lr=self.learning_rate)
        self.criterion = nn.MSELoss()

    def memorize(self, state, action, reward, next_state, done):
        self.memory.append((state, action, reward, next_state, done))

    def act(self, state):
        # Epsilon-greedy: explore with probability epsilon, otherwise pick the
        # action with the highest predicted Q-value.
        if np.random.rand() <= self.epsilon:
            return random.randrange(self.action_size)
        act_values = self.model(torch.FloatTensor(state))
        return np.argmax(act_values.detach().numpy())

    def replay(self, batch_size):
        # Train on a random minibatch of remembered transitions.
        minibatch = random.sample(self.memory, batch_size)
        for state, action, reward, next_state, done in minibatch:
            target = reward
            if not done:
                target = (reward + self.gamma *
                          np.amax(self.model(torch.FloatTensor(next_state)).detach().numpy()))
            target_f = self.model(torch.FloatTensor(state)).detach().numpy()
            if action >= len(target_f):
                # Should not happen as long as action_size covers every labelled action.
                print(f"Action index {action} is out of bounds for target_f with size {len(target_f)}")
                continue
            target_f[action] = target
            self.model.zero_grad()
            outputs = self.model(torch.FloatTensor(state))
            loss = self.criterion(outputs, torch.FloatTensor(target_f))
            loss.backward()
            self.optimizer.step()
        if self.epsilon > self.epsilon_min:
            self.epsilon *= self.epsilon_decay

    def retrain_with_feedback(self, feedback_data):
        for feedback in feedback_data['feedback']:
            state = [int(feedback['frame_index'])]
            action = 0 if feedback['feedback'] == 'yes' else 1
            reward = 1 if feedback['feedback'] == 'yes' else -1
            next_state = state
            done = True
            self.memorize(state, action, reward, next_state, done)
        action_mapping = {
            "looking_at_screen": 0,
            "looking_at_right": 1,
            "looking_at_left": 2,
            "looking_down_mobile": 3,
            "no_face_using_mobile": 4
        }
        for detailed_feedback in feedback_data['detailed_feedback']:
            state = [int(detailed_feedback.get('frame_index', 0))]  # Use default value if frame_index is missing
            face_position = detailed_feedback.get('face_position')
            if face_position not in action_mapping:
                print(f"Invalid face position: {face_position}")
                continue
            action = action_mapping[face_position]
            reward = 1 if face_position == "looking_at_screen" else -1
            next_state = state
            done = True
            self.memorize(state, action, reward, next_state, done)
        batch_size = min(len(self.memory), 32)  # Ensure batch size doesn't exceed memory size
        self.replay(batch_size)

    def get_latest_dqn_analysis(self):
        analysis = {}
        # Get the predicted actions for different states
        analysis['predicted_actions'] = {}
        for state_index in range(self.state_size):
            state = [state_index]
            predicted_action = self.act(state)
            analysis['predicted_actions'][state_index] = predicted_action
        # Compute other analysis metrics if needed
        return analysis


app_data_dir = os.getenv('APPDATA')
# Define the directory name to store log files
log_dir = os.path.join(app_data_dir, 'KeystrokeLogger')
# Create the log directory if it doesn't exist
if not os.path.exists(log_dir):
    os.makedirs(log_dir)
logged_in_user = getpass.getuser()
conn = sqlite3.connect('surveillance.db')
cursor = conn.cursor()
DATABASE = 'surveillance.db'
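# The user_logs table is not created anywhere in this file, so create it up
# front to keep the insert below from failing on a fresh database. The schema
# here is inferred from the INSERT in insert_data_into_db; adjust it if the
# project defines the table elsewhere.
cursor.execute('''
    CREATE TABLE IF NOT EXISTS user_logs (
        user TEXT,
        keystroke_log TEXT,
        window_tab_log TEXT,
        cam_log TEXT
    )
''')
conn.commit()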


def read_text_file(file_path):
    try:
        with open(file_path, 'r', encoding='utf-8') as f:
            return f.read()
    except (FileNotFoundError, UnicodeDecodeError) as e:
        print(f"Error reading file {file_path}: {e}")
        return None


def insert_data_into_db(user, keystroke_log_file, window_tab_log_file, cam_log_file):
    keystroke_log = read_text_file(keystroke_log_file)
    window_tab_log = read_text_file(window_tab_log_file)
    cam_log = read_text_file(cam_log_file)
    if keystroke_log is None or window_tab_log is None or cam_log is None:
        print("One or more log files are missing or couldn't be read. Skipping database insertion.")
        return
    try:
        with sqlite3.connect(DATABASE) as conn:
            cursor = conn.cursor()
            cursor.execute('''
                INSERT INTO user_logs (user, keystroke_log, window_tab_log, cam_log)
                VALUES (?, ?, ?, ?)
            ''', (user, keystroke_log, window_tab_log, cam_log))
            conn.commit()
            print("Data inserted successfully")
    except sqlite3.Error as e:
        print(f"An error occurred: {e}")


# Define the log file paths within the log directory
KEYSTROKE_LOG_FILE = os.path.join(log_dir, f"{logged_in_user}_keystroke_log.txt")
WINDOW_LOG_FILE = os.path.join(log_dir, f"{logged_in_user}_window_log.txt")
WEBCAM_LOG_FILE = os.path.join(log_dir, f"{logged_in_user}_webcam_log.txt")
TARGET_LOG_FILE = os.path.join("Eye-Tracker", "webcam_log.txt")


def copy_webcam_log():
    try:
        shutil.copyfile(WEBCAM_LOG_FILE, TARGET_LOG_FILE)
        print("Webcam log updated successfully.")
    except Exception as e:
        print("Error:", e)


has_run = False


def run_once():
    global has_run
    if not has_run:
        print("Running function for the first time.")
        copy_webcam_log()
        has_run = True
    else:
        print("Function has already run.")
def detect_faces(mtcnn, frame):
    boxes, probs, landmarks = mtcnn.detect(frame, landmarks=True)
    if boxes is not None and len(boxes) > 0:
        left_eye_y = landmarks[0][0][1]
        right_eye_y = landmarks[0][1][1]
        nose_y = landmarks[0][2][1]
        eye_to_nose_distance = nose_y - (left_eye_y + right_eye_y) / 2
        if 5 <= eye_to_nose_distance <= 20:
            print("Looking at Screen")
            return "Looking at Screen"
        elif eye_to_nose_distance > 40:
            print("Looking down, possible mobile usage")
            return "Looking Down/Mobile Possible"
        else:
            # boxes are [x1, y1, x2, y2]; classify by the horizontal position
            # of the face centre within the frame.
            x1, _, x2, _ = map(int, boxes[0])
            center = (x1 + x2) / 2
            width = frame.shape[1]
            if center < width * 0.4:
                print("Looking at Right")
                return "Looking at Right"
            elif center > width * 0.6:
                print("Looking at Left")
                return "Looking at Left"
            else:
                print("Looking at Screen")
                return "Looking at Screen"
    else:
        print("No Face Detected or Possible Mobile Use")
        return "No Face/Using Mobile"


def save_frame_as_proof(frame, filename_prefix):
    grayscale_frame = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
    timestamp = datetime.datetime.now().strftime("%Y%m%d%H%M%S")
    filename = f"{filename_prefix}_{timestamp}.jpg"
    directory = "Eye-Tracker"
    if not os.path.exists(directory):
        os.makedirs(directory)
    filepath = os.path.join(directory, filename).replace('\\', '/')
    cv2.imwrite(filepath, grayscale_frame, [cv2.IMWRITE_JPEG_QUALITY, 90])
    return filepath
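

# Main surveillance loop: grab a webcam frame at random intervals, classify the
# gaze direction, feed the result to the DQN agent, and append a timestamped
# entry (plus a proof frame) to the webcam log.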
def main():
    video_capture = None
    try:
        device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
        mtcnn = MTCNN(keep_all=True, device=device, post_process=False)
        state_size = 1
        action_size = 5  # one action per labelled face position (indices 0-4)
        agent = DQNAgent(state_size, action_size)
        feedback_data = load_feedback(FEEDBACK_FILE)
        detailed_feedback_data = load_feedback(DETAILED_FEEDBACK_FILE)
        for feedback in feedback_data:
            frame_index = feedback.get('frame_index')
            if frame_index is None:
                print("Invalid feedback data: 'frame_index' is missing or None.")
                continue
            state = [int(frame_index)]
            action = 0 if feedback['feedback'] == 'yes' else 1
            reward = 1 if feedback['feedback'] == 'yes' else -1
            next_state = state
            done = True
            agent.memorize(state, action, reward, next_state, done)
        action_mapping = {
            "looking_at_screen": 0,
            "looking_at_right": 1,
            "looking_at_left": 2,
            "looking_down_mobile": 3,
            "no_face_using_mobile": 4
        }
        for detailed_feedback in detailed_feedback_data:
            frame_index = detailed_feedback.get('frame_index')
            if frame_index is None:
                print("Invalid detailed feedback data: 'frame_index' is missing or None.")
                continue
            state = [int(frame_index)]
            face_position = detailed_feedback.get('face_position')
            if face_position not in action_mapping:
                print(f"Invalid face position: {face_position}")
                continue
            action = action_mapping[face_position]
            reward = 1 if face_position == "looking_at_screen" else -1
            next_state = state
            done = True
            agent.memorize(state, action, reward, next_state, done)
        # Load feedback data from the JSON files and retrain the DQN agent on it
        feedback_data = {
            'feedback': load_feedback(FEEDBACK_FILE),
            'detailed_feedback': load_feedback(DETAILED_FEEDBACK_FILE)
        }
        agent.retrain_with_feedback(feedback_data)
        # Start the webcam stream once and reuse it across iterations
        video_capture = WebcamVideoStream(src=0).start()
        time.sleep(2.0)
        while True:
            wait_time = random.randint(1, 3)
            print(f"Waiting for {wait_time} seconds...")
            time.sleep(wait_time)
            frame = video_capture.read()
            position = detect_faces(mtcnn, frame)
            if position == "Looking at Screen":
                state = [0]
            elif position == "Looking at Right":
                state = [1]
            elif position == "Looking at Left":
                state = [2]
            elif position == "Looking Down/Mobile Possible":
                state = [3]
            else:
                state = [4]
            action = agent.act(state)
            reward = 1 if position == "Looking at Screen" else -1
            next_state = state
            agent.memorize(state, action, reward, next_state, False)
            if len(agent.memory) > 32:
                agent.replay(32)
            # Dump the current Q-values for each state to JSON for later inspection
            for label, state_index in [("Looking at Screen", 0),
                                       ("Looking at Right", 1),
                                       ("Looking at Left", 2),
                                       ("Looking Down/Mobile Possible", 3),
                                       ("No Face/Using Mobile", 4)]:
                dqn_agent_analysis[label] = agent.model(torch.FloatTensor([state_index])).detach().numpy().tolist()
            with open("dqn_agent_analysis.json", "w") as dqn_analysis_file:
                json.dump(dqn_agent_analysis, dqn_analysis_file)
            cv2.putText(frame, f"Position: {position}", (10, 30), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 0), 2)
            proof_filename = save_frame_as_proof(frame, "proof")
            timestamp = datetime.datetime.now().strftime("%Y%m%d%H%M%S")
            with open(WEBCAM_LOG_FILE, "a", encoding='utf-8') as log_file:
                log_file.write(f"Timestamp: {timestamp}, Position: {position}, Proof Frame: {proof_filename}\n")
            time.sleep(1)
    except KeyboardInterrupt:
        print("Exiting...")
    finally:
        if video_capture is not None:
            video_capture.stop()
        cv2.destroyAllWindows()
        insert_data_into_db(logged_in_user, KEYSTROKE_LOG_FILE, WINDOW_LOG_FILE, WEBCAM_LOG_FILE)


def ensure_log_directory_exists():
    os.makedirs(os.path.dirname(KEYSTROKE_LOG_FILE), exist_ok=True)


def get_active_window_title():
    active_window = gw.getActiveWindow()
    if active_window is not None:
        return active_window.title
    else:
        return "Unknown"


def log_keystroke(keys, window_title):
    timestamp = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    ensure_log_directory_exists()
    with open(KEYSTROKE_LOG_FILE, 'a', encoding='utf-8') as f:
        f.write(f"{timestamp}: {keys} - {window_title}\n")


def on_press(event):
    # Log only the shortcut combinations of interest (copy/paste, window
    # switching and snapping), together with the window they were used in.
    window_title = get_active_window_title()
    if keyboard.is_pressed('ctrl') and event.name == 'c':
        log_keystroke('Ctrl+C', window_title)
    elif keyboard.is_pressed('ctrl') and event.name == 'v':
        log_keystroke('Ctrl+V', window_title)
    elif keyboard.is_pressed('alt') and event.name == 'tab':
        log_keystroke('Alt+Tab', window_title)
    elif keyboard.is_pressed('ctrl') and event.name == 'a':
        log_keystroke('Ctrl+A', window_title)
    elif keyboard.is_pressed('ctrl') and keyboard.is_pressed('win') and event.name == 'left':
        log_keystroke('Ctrl+Win+Left', window_title)
    elif keyboard.is_pressed('ctrl') and keyboard.is_pressed('win') and event.name == 'right':
        log_keystroke('Ctrl+Win+Right', window_title)


def monitor_clipboard():
    previous_clipboard_content = None
    while True:
        clipboard_content = pyperclip.paste()
        if clipboard_content != previous_clipboard_content:
            timestamp = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
            with open(KEYSTROKE_LOG_FILE, 'a', encoding='utf-8') as f:
                f.write(f"{timestamp} - Clipboard: {clipboard_content}\n")
            previous_clipboard_content = clipboard_content
        time.sleep(1)


# Module-level state shared by check_clipboard below
previous_clipboard_content = None


def check_clipboard():
    # Same polling loop as monitor_clipboard, but records clipboard changes in
    # the window log instead of the keystroke log.
    global previous_clipboard_content
    while True:
        clipboard_content = pyperclip.paste()
        if clipboard_content != previous_clipboard_content:
            previous_clipboard_content = clipboard_content
            with open(WINDOW_LOG_FILE, "a", encoding='utf-8') as log_file:
                log_file.write(
                    f"{time.strftime('%Y-%m-%d %H:%M:%S')} - Clipboard: {clipboard_content}\n")
        time.sleep(1)


def show_popup_with_logo(logo_url, popup_text):
    # Download the logo image from the URL
    response = requests.get(logo_url)
    logo_data = response.content
    # Convert the logo image data to a numpy array and decode it
    nparr = np.frombuffer(logo_data, np.uint8)
    logo_img = cv2.imdecode(nparr, cv2.IMREAD_COLOR)
    if logo_img is None:
        print("Could not decode logo image; skipping popup.")
        return
    # Resize the logo image to a smaller size
    logo_img_resized = cv2.resize(logo_img, (200, 100))  # Adjust dimensions as needed
    # Get the screen width and height
    screen_width, screen_height = pyautogui.size()
    # Create a blank popup window
    popup_width, popup_height = 400, 300  # Adjust dimensions as needed
    popup = np.zeros((popup_height, popup_width, 3), dtype=np.uint8)
    # Calculate position to center the logo image
    x_offset = (popup_width - logo_img_resized.shape[1]) // 2
    y_offset = (popup_height - logo_img_resized.shape[0]) // 2
    # Calculate position to center the popup window
    popup_x = (screen_width - popup_width) // 2
    popup_y = (screen_height - popup_height) // 2
    # Insert the resized logo image into the popup window
    popup[y_offset:y_offset + logo_img_resized.shape[0], x_offset:x_offset + logo_img_resized.shape[1]] = logo_img_resized
    # Draw the text before showing the window; drawing after imshow never appears
    cv2.putText(popup, popup_text, (10, popup_height - 10), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (255, 255, 255), 1)
    # Display the popup window, centred on the screen
    cv2.imshow("Ai Cheating Surveillance", popup)
    cv2.moveWindow("Ai Cheating Surveillance", popup_x, popup_y)
    cv2.waitKey(5000)  # Display the popup for 5 seconds
    cv2.destroyAllWindows()


def log_active_window_change():
    previous_active_window = None
    while True:
        active_window = gw.getActiveWindow()
        if active_window != previous_active_window:
            if active_window is not None:
                with open(WINDOW_LOG_FILE, "a", encoding='utf-8') as log_file:
                    log_file.write(
                        f"{time.strftime('%Y-%m-%d %H:%M:%S')} - Switched to: {active_window.title}\n")
            previous_active_window = active_window
        time.sleep(1)


if __name__ == "__main__":
    logo_url = "https://lh3.googleusercontent.com/d/1zOUoYGbMePnFgqRF_rbkAlPWQEgzNkNu"
    popup_text = "Cheating Surveillance Is Now Active"
    show_popup_with_logo(logo_url, popup_text)
    # keyboard.on_press registers the callback on the library's own listener
    # thread, so it doesn't need a thread of its own.
    keyboard.on_press(on_press)
    threads = []
    threads.append(threading.Thread(target=main))
    threads.append(threading.Thread(target=log_active_window_change))
    threads.append(threading.Thread(target=monitor_clipboard))
    threads.append(threading.Thread(target=check_clipboard))
    for thread in threads:
        thread.daemon = True
        thread.start()
    try:
        while True:
            time.sleep(1)
    except KeyboardInterrupt:
        insert_data_into_db(logged_in_user, KEYSTROKE_LOG_FILE, WINDOW_LOG_FILE, WEBCAM_LOG_FILE)
        print("Exiting...")