diff --git a/human_experiments/datasette_interface/entity/task/minecraft_task.py b/human_experiments/datasette_interface/entity/task/minecraft_task.py index f505a192b..2119a616f 100644 --- a/human_experiments/datasette_interface/entity/task/minecraft_task.py +++ b/human_experiments/datasette_interface/entity/task/minecraft_task.py @@ -20,25 +20,44 @@ class MinecraftMission(Base): id: Mapped[str] = mapped_column(Text, primary_key=True) group_session_id: Mapped[str] = mapped_column("group_session", Text, ForeignKey("group_session.id")) name: Mapped[str] = mapped_column(Text) - start_timestamp_unix: Mapped[str] = mapped_column(Text) - start_timestamp_iso8601: Mapped[str] = mapped_column(Text) - stop_timestamp_unix: Mapped[str] = mapped_column(Text) - stop_timestamp_iso8601: Mapped[str] = mapped_column(Text) + mission_start_timestamp_unix: Mapped[str] = mapped_column(Text) + mission_start_timestamp_iso8601: Mapped[str] = mapped_column(Text) + mission_stop_timestamp_unix: Mapped[str] = mapped_column(Text) + mission_stop_timestamp_iso8601: Mapped[str] = mapped_column(Text) + trial_start_timestamp_unix: Mapped[str] = mapped_column(Text) + trial_start_timestamp_iso8601: Mapped[str] = mapped_column(Text) + trial_stop_timestamp_unix: Mapped[str] = mapped_column(Text) + trial_stop_timestamp_iso8601: Mapped[str] = mapped_column(Text) final_team_score: Mapped[Optional[int]] = mapped_column(Integer) testbed_version: Mapped[str] = mapped_column(Text) - def __init__(self, id: str, group_session_id: str, name: str, start_timestamp_unix: str, - start_timestamp_iso8601: str, stop_timestamp_unix: str, stop_timestamp_iso8601: str, - final_team_score: int, testbed_version: str): + def __init__(self, + id: str, + group_session_id: str, + name: str, + mission_start_timestamp_unix: str, + mission_start_timestamp_iso8601: str, + mission_stop_timestamp_unix: str, + mission_stop_timestamp_iso8601: str, + trial_start_timestamp_unix: str, + trial_start_timestamp_iso8601: str, + trial_stop_timestamp_unix: str, + trial_stop_timestamp_iso8601: str, + final_team_score: int, + testbed_version: str): super().__init__() self.id = id self.group_session_id = group_session_id self.name = name - self.start_timestamp_unix = start_timestamp_unix - self.start_timestamp_iso8601 = start_timestamp_iso8601 - self.stop_timestamp_unix = stop_timestamp_unix - self.stop_timestamp_iso8601 = stop_timestamp_iso8601 + self.mission_start_timestamp_unix = mission_start_timestamp_unix + self.mission_start_timestamp_iso8601 = mission_start_timestamp_iso8601 + self.mission_stop_timestamp_unix = mission_stop_timestamp_unix + self.mission_stop_timestamp_iso8601 = mission_stop_timestamp_iso8601 + self.trial_start_timestamp_unix = trial_start_timestamp_unix + self.trial_start_timestamp_iso8601 = trial_start_timestamp_iso8601 + self.trial_stop_timestamp_unix = trial_stop_timestamp_unix + self.trial_stop_timestamp_iso8601 = trial_stop_timestamp_iso8601 self.final_team_score = final_team_score self.testbed_version = testbed_version diff --git a/human_experiments/datasette_interface/metadata.yml b/human_experiments/datasette_interface/metadata.yml index ae8861886..7c15f9813 100644 --- a/human_experiments/datasette_interface/metadata.yml +++ b/human_experiments/datasette_interface/metadata.yml @@ -134,10 +134,14 @@ databases: id: The unique ID of the mission. group_session_id: The group session that the mission was a part of. name: The name of the mission (Hands-on Training, Saturn A, or Saturn B) - start_timestamp_iso8601: The starting timestamp of the mission in ISO-8601 format. - start_timestamp_unix: The starting Unix timestamp of the mission. - stop_timestamp_iso8601: The stop timestamp of the mission in ISO-8601 format. - stop_timestamp_unix: The stopping Unix timestamp of the mission. + mission_start_timestamp_iso8601: The starting timestamp of the mission in ISO-8601 format. + mission_start_timestamp_unix: The starting Unix timestamp of the mission. + mission_stop_timestamp_iso8601: The stop timestamp of the mission in ISO-8601 format. + mission_stop_timestamp_unix: The stopping Unix timestamp of the mission. + trial_start_timestamp_iso8601: The starting timestamp of the trial in ISO-8601 format. + trial_start_timestamp_unix: The starting Unix timestamp of the trial. + trial_stop_timestamp_iso8601: The stop timestamp of the trial in ISO-8601 format. + trial_stop_timestamp_unix: The stopping Unix timestamp of the trial. final_team_score: The final mission score achieved by the team. testbed_version: The version of the testbed used for the mission. diff --git a/human_experiments/datasette_interface/process_eeg_raw_data.py b/human_experiments/datasette_interface/process_eeg_raw_data.py index adc224536..0e058b0a8 100755 --- a/human_experiments/datasette_interface/process_eeg_raw_data.py +++ b/human_experiments/datasette_interface/process_eeg_raw_data.py @@ -10,7 +10,6 @@ from process_raw_signals import create_indices from process_raw_signals import insert_raw_unlabeled_data from process_raw_signals import label_data -from process_raw_signals import remove_invalid_data from sqlalchemy.orm import Session from functools import partial @@ -51,7 +50,6 @@ def process_eeg_raw_data(database_engine, override): partial(get_station_from_xdf_stream, device_id_to_station_map=device_id_to_station_map)) create_indices(database_engine, not override, EEGRaw, "eeg") label_data(database_engine, override, EEGRaw, "eeg") - remove_invalid_data(database_engine, EEGRaw, "eeg") def recreate_eeg_raw_tables(database_engine): diff --git a/human_experiments/datasette_interface/process_fnirs_raw_data.py b/human_experiments/datasette_interface/process_fnirs_raw_data.py index ac7e2ed56..38e321a97 100755 --- a/human_experiments/datasette_interface/process_fnirs_raw_data.py +++ b/human_experiments/datasette_interface/process_fnirs_raw_data.py @@ -9,7 +9,6 @@ from process_raw_signals import create_indices from process_raw_signals import insert_raw_unlabeled_data from process_raw_signals import label_data -from process_raw_signals import remove_invalid_data logging.basicConfig( level=logging.INFO, @@ -37,7 +36,6 @@ def process_fnirs_raw_data(database_engine, override): get_station_from_xdf_stream) create_indices(database_engine, not override, FNIRSRaw, "fnirs") label_data(database_engine, override, FNIRSRaw, "fnirs") - remove_invalid_data(database_engine, FNIRSRaw, "fnirs") def recreate_fnirs_raw_tables(database_engine): diff --git a/human_experiments/datasette_interface/process_gaze_data.py b/human_experiments/datasette_interface/process_gaze_raw_data.py similarity index 92% rename from human_experiments/datasette_interface/process_gaze_data.py rename to human_experiments/datasette_interface/process_gaze_raw_data.py index 53e5a60f4..0cb7cea87 100755 --- a/human_experiments/datasette_interface/process_gaze_data.py +++ b/human_experiments/datasette_interface/process_gaze_raw_data.py @@ -9,7 +9,6 @@ from process_raw_signals import create_indices from process_raw_signals import insert_raw_unlabeled_data from process_raw_signals import label_data -from process_raw_signals import remove_invalid_data logging.basicConfig( level=logging.INFO, @@ -36,7 +35,6 @@ def process_gaze_raw_data(database_engine, override): get_station_from_xdf_stream) create_indices(database_engine, not override, GAZERaw, "Gaze") label_data(database_engine, override, GAZERaw, "Gaze") - remove_invalid_data(database_engine, GAZERaw, "Gaze") def recreate_gaze_raw_tables(database_engine): diff --git a/human_experiments/datasette_interface/process_minecraft_data.py b/human_experiments/datasette_interface/process_minecraft_data.py index 884d1224b..19cf771c1 100755 --- a/human_experiments/datasette_interface/process_minecraft_data.py +++ b/human_experiments/datasette_interface/process_minecraft_data.py @@ -290,14 +290,18 @@ def process_metadata_file(filepath, group_session, file_to_key_messages_mapping) if mission_in_progress: messages_to_insert_into_db.append(message) - start_timestamp = key_messages["mission_start"][0]["header"]["timestamp"] + mission_start_timestamp = key_messages["mission_start"][0]["header"]["timestamp"] - stop_timestamp = ( + mission_stop_timestamp = ( key_messages["mission_stop"][0]["header"]["timestamp"] if len(key_messages["mission_stop"]) == 1 else key_messages["trial_stop"][0]["header"]["timestamp"] ) + # Important to align audio with data + trial_start_timestamp = key_messages["trial_start"][0]["header"]["timestamp"] + trial_stop_timestamp = key_messages["trial_stop"][0]["header"]["timestamp"] + mission_name = key_messages["mission_start"][0]["data"]["mission"] testbed_version = key_messages["trial_start"][-1]["data"][ "testbed_version" @@ -322,10 +326,14 @@ def process_metadata_file(filepath, group_session, file_to_key_messages_mapping) group_session_id=group_session, id=trial_id, name=mission_name, - start_timestamp_unix=start_timestamp, - start_timestamp_iso8601=convert_iso8601_timestamp_to_unix(start_timestamp), - stop_timestamp_unix=stop_timestamp, - stop_timestamp_iso8601=convert_iso8601_timestamp_to_unix(stop_timestamp), + mission_start_timestamp_unix=convert_iso8601_timestamp_to_unix(mission_start_timestamp), + mission_start_timestamp_iso8601=mission_start_timestamp, + mission_stop_timestamp_unix=convert_iso8601_timestamp_to_unix(mission_stop_timestamp), + mission_stop_timestamp_iso8601=mission_stop_timestamp, + trial_start_timestamp_unix=convert_iso8601_timestamp_to_unix(trial_start_timestamp), + trial_start_timestamp_iso8601=trial_start_timestamp, + trial_stop_timestamp_unix=convert_iso8601_timestamp_to_unix(trial_stop_timestamp), + trial_stop_timestamp_iso8601=trial_stop_timestamp, final_team_score=final_team_score, testbed_version=testbed_version, ) @@ -334,8 +342,8 @@ def process_metadata_file(filepath, group_session, file_to_key_messages_mapping) MinecraftTestbedMessage( mission_id=trial_id, id=i, - timestamp_unix=message["header"]["timestamp"], - timestamp_iso8601=convert_iso8601_timestamp_to_unix(message["header"]["timestamp"]), + timestamp_unix=convert_iso8601_timestamp_to_unix(message["header"]["timestamp"]), + timestamp_iso8601=message["header"]["timestamp"], topic=message.pop("topic"), message=json.dumps(message), ) @@ -358,8 +366,10 @@ def process_directory_v2(group_session): info(f"Processing block_2.xdf for {group_session}.") messages = {"Hands-on Training": [], "Saturn_A": [], "Saturn_B": []} + trial_timestamps = {"Hands-on Training": [], "Saturn_A": [], "Saturn_B": []} current_mission = None + trial_mission = None # For trial labeling testbed_version = None for i, timestamp in enumerate(stream["time_stamps"]): text = stream["time_series"][i][0] @@ -390,6 +400,7 @@ def process_directory_v2(group_session): mission_state = message["data"]["mission_state"] if mission_state == "Start": current_mission = message["data"]["mission"] + trial_mission = current_mission if messages[current_mission]: info( f"There was already a mission of type {current_mission}" @@ -402,14 +413,18 @@ def process_directory_v2(group_session): current_mission = None else: pass - elif ( - topic == "trial" - and message["msg"]["sub_type"] == "start" - and testbed_version is None - # The line above assumes that we do not update the testbed in - # the middle of a trial. - ): - testbed_version = message["data"]["testbed_version"] + elif topic == "trial": + if message["msg"]["sub_type"] == "start": + trial_start_index = i + + if testbed_version is None: + # The line above assumes that we do not update the testbed in + # the middle of a trial. + testbed_version = message["data"]["testbed_version"] + elif message["msg"]["sub_type"] == "stop": + trial_stop_index = i + trial_timestamps[trial_mission] = (trial_start_index, trial_stop_index) + trial_mission = None else: if current_mission is not None: messages[current_mission].append((i, message)) @@ -436,8 +451,11 @@ def process_directory_v2(group_session): else: error("[MISSING DATA]: No scoreboard messages found!") - start_timestamp_lsl = stream["time_stamps"][messages[0][0]] - stop_timestamp_lsl = stream["time_stamps"][messages[-1][0]] + mission_start_timestamp_lsl = stream["time_stamps"][messages[0][0]] + mission_stop_timestamp_lsl = stream["time_stamps"][messages[-1][0]] + + trial_start_timestamp_lsl = stream["time_stamps"][trial_timestamps[mission][0]] + trial_stop_timestamp_lsl = stream["time_stamps"][trial_timestamps[mission][1]] if trial_id in INVALID_MISSIONS: error(f"[ANOMALY] Skipping {trial_id} as it is a duplicate not removed by the deduplicate logic.") @@ -446,26 +464,30 @@ def process_directory_v2(group_session): group_session_id=group_session, id=trial_id, name=mission, - start_timestamp_unix=start_timestamp_lsl, - start_timestamp_iso8601=convert_unix_timestamp_to_iso8601(start_timestamp_lsl), - stop_timestamp_unix=stop_timestamp_lsl, - stop_timestamp_iso8601=convert_unix_timestamp_to_iso8601(stop_timestamp_lsl), + mission_start_timestamp_unix=mission_start_timestamp_lsl, + mission_start_timestamp_iso8601=convert_unix_timestamp_to_iso8601(mission_start_timestamp_lsl), + mission_stop_timestamp_unix=mission_stop_timestamp_lsl, + mission_stop_timestamp_iso8601=convert_unix_timestamp_to_iso8601(mission_stop_timestamp_lsl), + trial_start_timestamp_unix=trial_start_timestamp_lsl, + trial_start_timestamp_iso8601=convert_unix_timestamp_to_iso8601(trial_start_timestamp_lsl), + trial_stop_timestamp_unix=trial_stop_timestamp_lsl, + trial_stop_timestamp_iso8601=convert_unix_timestamp_to_iso8601(trial_stop_timestamp_lsl), final_team_score=final_team_score, testbed_version=testbed_version, ) minecraft_missions.append(minecraft_mission) - minecraft_testbed_messages.extend([ - MinecraftTestbedMessage( - mission_id=trial_id, - id=i, - timestamp_unix=stream["time_stamps"][i], - timestamp_iso8601=convert_unix_timestamp_to_iso8601(stream["time_stamps"][i]), - topic=message.pop("topic"), - message=json.dumps(message), + for i, message in messages: + minecraft_testbed_messages.append( + MinecraftTestbedMessage( + mission_id=trial_id, + id=i, + timestamp_unix=stream["time_stamps"][i], + timestamp_iso8601=convert_unix_timestamp_to_iso8601(stream["time_stamps"][i]), + topic=message.pop("topic"), + message=json.dumps(message), + ) ) - for i, message in messages - ]) return minecraft_missions, minecraft_testbed_messages diff --git a/human_experiments/datasette_interface/recreate_database b/human_experiments/datasette_interface/recreate_database index 4b6a54ead..4af0a35b4 100755 --- a/human_experiments/datasette_interface/recreate_database +++ b/human_experiments/datasette_interface/recreate_database @@ -22,7 +22,7 @@ from process_ping_pong_cooperative_data import process_ping_pong_cooperative_tas from process_minecraft_data import process_minecraft_data, recreate_minecraft_tables from process_fnirs_raw_data import process_fnirs_raw_data, recreate_fnirs_raw_tables from process_eeg_raw_data import process_eeg_raw_data, recreate_eeg_raw_tables -from process_gaze_data import process_gaze_raw_data, recreate_gaze_raw_tables +from process_gaze_raw_data import process_gaze_raw_data, recreate_gaze_raw_tables logging.basicConfig( level=logging.INFO, @@ -125,7 +125,7 @@ if __name__ == "__main__": help="User with granted writing permissions in the database.") parser.add_argument("--db_port", type=int, required=True, help="Port where the cluster is running. Make sure to set the port to your local cluster.") - parser.add_argument("--db_name", type=str, required=False, default=f"tomcat", + parser.add_argument("--db_name", type=str, required=False, default="tomcat", help="Database name. Make sure the database was previously created before running this script.") parser.add_argument("--db_passwd", type=str, required=False, default="", help="Password to connect to the database.") parser.add_argument("--override", action='store_true', @@ -150,6 +150,7 @@ if __name__ == "__main__": database_info = f"{args.db_user}:{args.db_name}@localhost:{args.db_port}" connection_string = f"postgresql+psycopg2://{database_info}/{args.db_passwd}" + info(f"Establishing connection to database {connection_string}") engine = create_engine(connection_string) tables = TABLES.copy()