Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(tests): Add automated test for recording functionality #803

Open
wants to merge 12 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -38,3 +38,4 @@ src

dist/
build/
openadapt/error.log
138 changes: 138 additions & 0 deletions tests/openadapt/test_recording.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,138 @@
"""Module for testing the recording module."""
import multiprocessing
import time
import os
import pytest
from unittest.mock import patch
from openadapt import record, playback, db, config, utils
from openadapt.db import crud
from openadapt.models import Recording

@pytest.fixture
def setup_db():
# Setup the database connection and return the session
db_session = crud.get_new_session(read_and_write=True)
yield db_session
db_session.close()

def test_recording(setup_db):
terminate_processing = multiprocessing.Event()
terminate_recording = multiprocessing.Event()
status_pipe, child_conn = multiprocessing.Pipe()
# conn = multiprocessing.connection.Connection
# signal = conn.recv()
# print(f"Received signal: {signal}")
# signal_type = signal["type"]
# print(f"Signal type: {signal_type}")
onyedikachi-david marked this conversation as resolved.
Show resolved Hide resolved

# Start the recording process
record_proc = multiprocessing.Process(
target=record.record,
args=("test task", terminate_processing, terminate_recording, child_conn, False)
)
record_proc.start()

# Wait for the recording.started signal
start_time = time.time()
while time.time() - start_time < 30:
onyedikachi-david marked this conversation as resolved.
Show resolved Hide resolved
if status_pipe.poll():
message = status_pipe.recv()
if message.get("type") == "record.started":
print(f"Received signal: {message}")
onyedikachi-david marked this conversation as resolved.
Show resolved Hide resolved
# if signal_type == "record.started":
onyedikachi-david marked this conversation as resolved.
Show resolved Hide resolved
break
time.sleep(0.1)
else:
pytest.fail("recording.started signal not received within 30 seconds")
onyedikachi-david marked this conversation as resolved.
Show resolved Hide resolved

# Stop the recording by emitting the stop sequence
with patch("openadapt.playback.emit_stop_sequence") as mock_emit_stop_sequence:
mock_emit_stop_sequence()

# Wait for the recording to stop
terminate_processing.set()
record_proc.join()

# Assert the state of the database
with setup_db as session:
recording = session.query(Recording).filter_by(task_description="test task").first()
assert recording is not None, "Recording not found in the database"

# Assert the state of the filesystem
video_file_path = utils.get_video_file_path(recording.timestamp)
assert os.path.exists(video_file_path), "Video file not found"

performance_plot_path = utils.get_performance_plot_file_path(recording.timestamp)
assert os.path.exists(performance_plot_path), "Performance plot not found"

def test_record_functionality():
# Set up multiprocessing communication
parent_conn, child_conn = multiprocessing.Pipe()

# Set up termination events
terminate_processing = multiprocessing.Event()
terminate_recording = multiprocessing.Event()

# Start the recording process
record_process = multiprocessing.Process(
target=record.record,
args=("Test recording", terminate_processing, terminate_recording, child_conn, False)
)
record_process.start()

print("Waiting for the 'record.started' signal")
print(f"Parent conn: {parent_conn}")
print(f"Child conn: {child_conn}")
print(f"Parent conn poll: {parent_conn.poll()}")
print(f"Parent conn recv: {parent_conn.recv()}")
print(f"Message: {parent_conn.recv()['type']}")
# Wait for the 'record.started' signal
start_time = time.time()
while time.time() - start_time < 30: # 30 second timeout
onyedikachi-david marked this conversation as resolved.
Show resolved Hide resolved
if parent_conn.poll(1): # 1 second timeout for poll
print(f"Message: {parent_conn.recv()['type']}")
message = parent_conn.recv()
print(f"Message: {message}")
if message['type'] == 'record.started':
break
else:
pytest.fail("Timed out waiting for 'record.started' signal")

# Emit stop sequence using playback
keyboard_controller = playback.get_keyboard_controller()
for char in config.STOP_SEQUENCES[0]:
playback.play_key_event(
playback.create_key_event("press", char),
keyboard_controller
)
playback.play_key_event(
playback.create_key_event("release", char),
keyboard_controller
)

# Wait for the recording to stop
terminate_recording.wait(timeout=30)
onyedikachi-david marked this conversation as resolved.
Show resolved Hide resolved
if not terminate_recording.is_set():
pytest.fail("Recording did not stop within the expected time")

# Clean up the recording process
record_process.join(timeout=5)
abrichr marked this conversation as resolved.
Show resolved Hide resolved
if record_process.is_alive():
record_process.terminate()

# Assert database state
with crud.get_new_session() as session:
recording = session.query(Recording).order_by(Recording.id.desc()).first()
assert recording is not None, "No recording was created in the database"
assert recording.task_description == "Test recording"

# Assert filesystem state
video_path = utils.get_video_file_path(recording.timestamp)
assert os.path.exists(video_path), f"Video file not found at {video_path}"

performance_plot_path = utils.get_performance_plot_file_path(recording.timestamp)
assert os.path.exists(performance_plot_path), f"Performance plot not found at {performance_plot_path}"

if __name__ == "__main__":
pytest.main([__file__])