Skip to content

Commit

Permalink
Preserves all data in the raw tables. (#545)
Browse files Browse the repository at this point in the history
* Do not remove invalid signals

* Printing connection string

* Removing tmp from table name

* Save MC trial start and end messages to the database as well.

* Save MC trial start and end messages to the database as well.

* Save MC trial start and end messages to the database as well.

* Save MC trial start and end messages to the database as well.

* Debug

* Debug

* Fixing MC in v2

* Removing tmp from MC tables

---------

Co-authored-by: Paulo Soares <[email protected]>
Co-authored-by: Paulo Soares <[email protected]>
  • Loading branch information
3 people authored Sep 27, 2023
1 parent a4ea871 commit 4afab55
Show file tree
Hide file tree
Showing 7 changed files with 95 additions and 55 deletions.
41 changes: 30 additions & 11 deletions human_experiments/datasette_interface/entity/task/minecraft_task.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,25 +20,44 @@ class MinecraftMission(Base):
id: Mapped[str] = mapped_column(Text, primary_key=True)
group_session_id: Mapped[str] = mapped_column("group_session", Text, ForeignKey("group_session.id"))
name: Mapped[str] = mapped_column(Text)
start_timestamp_unix: Mapped[str] = mapped_column(Text)
start_timestamp_iso8601: Mapped[str] = mapped_column(Text)
stop_timestamp_unix: Mapped[str] = mapped_column(Text)
stop_timestamp_iso8601: Mapped[str] = mapped_column(Text)
mission_start_timestamp_unix: Mapped[str] = mapped_column(Text)
mission_start_timestamp_iso8601: Mapped[str] = mapped_column(Text)
mission_stop_timestamp_unix: Mapped[str] = mapped_column(Text)
mission_stop_timestamp_iso8601: Mapped[str] = mapped_column(Text)
trial_start_timestamp_unix: Mapped[str] = mapped_column(Text)
trial_start_timestamp_iso8601: Mapped[str] = mapped_column(Text)
trial_stop_timestamp_unix: Mapped[str] = mapped_column(Text)
trial_stop_timestamp_iso8601: Mapped[str] = mapped_column(Text)
final_team_score: Mapped[Optional[int]] = mapped_column(Integer)
testbed_version: Mapped[str] = mapped_column(Text)

def __init__(self, id: str, group_session_id: str, name: str, start_timestamp_unix: str,
start_timestamp_iso8601: str, stop_timestamp_unix: str, stop_timestamp_iso8601: str,
final_team_score: int, testbed_version: str):
def __init__(self,
id: str,
group_session_id: str,
name: str,
mission_start_timestamp_unix: str,
mission_start_timestamp_iso8601: str,
mission_stop_timestamp_unix: str,
mission_stop_timestamp_iso8601: str,
trial_start_timestamp_unix: str,
trial_start_timestamp_iso8601: str,
trial_stop_timestamp_unix: str,
trial_stop_timestamp_iso8601: str,
final_team_score: int,
testbed_version: str):
super().__init__()

self.id = id
self.group_session_id = group_session_id
self.name = name
self.start_timestamp_unix = start_timestamp_unix
self.start_timestamp_iso8601 = start_timestamp_iso8601
self.stop_timestamp_unix = stop_timestamp_unix
self.stop_timestamp_iso8601 = stop_timestamp_iso8601
self.mission_start_timestamp_unix = mission_start_timestamp_unix
self.mission_start_timestamp_iso8601 = mission_start_timestamp_iso8601
self.mission_stop_timestamp_unix = mission_stop_timestamp_unix
self.mission_stop_timestamp_iso8601 = mission_stop_timestamp_iso8601
self.trial_start_timestamp_unix = trial_start_timestamp_unix
self.trial_start_timestamp_iso8601 = trial_start_timestamp_iso8601
self.trial_stop_timestamp_unix = trial_stop_timestamp_unix
self.trial_stop_timestamp_iso8601 = trial_stop_timestamp_iso8601
self.final_team_score = final_team_score
self.testbed_version = testbed_version

Expand Down
12 changes: 8 additions & 4 deletions human_experiments/datasette_interface/metadata.yml
Original file line number Diff line number Diff line change
Expand Up @@ -134,10 +134,14 @@ databases:
id: The unique ID of the mission.
group_session_id: The group session that the mission was a part of.
name: The name of the mission (Hands-on Training, Saturn A, or Saturn B)
start_timestamp_iso8601: The starting timestamp of the mission in ISO-8601 format.
start_timestamp_unix: The starting Unix timestamp of the mission.
stop_timestamp_iso8601: The stop timestamp of the mission in ISO-8601 format.
stop_timestamp_unix: The stopping Unix timestamp of the mission.
mission_start_timestamp_iso8601: The starting timestamp of the mission in ISO-8601 format.
mission_start_timestamp_unix: The starting Unix timestamp of the mission.
mission_stop_timestamp_iso8601: The stopping timestamp of the mission in ISO-8601 format.
mission_stop_timestamp_unix: The stopping Unix timestamp of the mission.
trial_start_timestamp_iso8601: The starting timestamp of the trial in ISO-8601 format.
trial_start_timestamp_unix: The starting Unix timestamp of the trial.
trial_stop_timestamp_iso8601: The stopping timestamp of the trial in ISO-8601 format.
trial_stop_timestamp_unix: The stopping Unix timestamp of the trial.
final_team_score: The final mission score achieved by the team.
testbed_version: The version of the testbed used for the mission.

Expand Down
2 changes: 0 additions & 2 deletions human_experiments/datasette_interface/process_eeg_raw_data.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@
from process_raw_signals import create_indices
from process_raw_signals import insert_raw_unlabeled_data
from process_raw_signals import label_data
from process_raw_signals import remove_invalid_data
from sqlalchemy.orm import Session
from functools import partial

Expand Down Expand Up @@ -51,7 +50,6 @@ def process_eeg_raw_data(database_engine, override):
partial(get_station_from_xdf_stream, device_id_to_station_map=device_id_to_station_map))
create_indices(database_engine, not override, EEGRaw, "eeg")
label_data(database_engine, override, EEGRaw, "eeg")
remove_invalid_data(database_engine, EEGRaw, "eeg")


def recreate_eeg_raw_tables(database_engine):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@
from process_raw_signals import create_indices
from process_raw_signals import insert_raw_unlabeled_data
from process_raw_signals import label_data
from process_raw_signals import remove_invalid_data

logging.basicConfig(
level=logging.INFO,
Expand Down Expand Up @@ -37,7 +36,6 @@ def process_fnirs_raw_data(database_engine, override):
get_station_from_xdf_stream)
create_indices(database_engine, not override, FNIRSRaw, "fnirs")
label_data(database_engine, override, FNIRSRaw, "fnirs")
remove_invalid_data(database_engine, FNIRSRaw, "fnirs")


def recreate_fnirs_raw_tables(database_engine):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@
from process_raw_signals import create_indices
from process_raw_signals import insert_raw_unlabeled_data
from process_raw_signals import label_data
from process_raw_signals import remove_invalid_data

logging.basicConfig(
level=logging.INFO,
Expand All @@ -36,7 +35,6 @@ def process_gaze_raw_data(database_engine, override):
get_station_from_xdf_stream)
create_indices(database_engine, not override, GAZERaw, "Gaze")
label_data(database_engine, override, GAZERaw, "Gaze")
remove_invalid_data(database_engine, GAZERaw, "Gaze")


def recreate_gaze_raw_tables(database_engine):
Expand Down
86 changes: 54 additions & 32 deletions human_experiments/datasette_interface/process_minecraft_data.py
Original file line number Diff line number Diff line change
Expand Up @@ -290,14 +290,18 @@ def process_metadata_file(filepath, group_session, file_to_key_messages_mapping)
if mission_in_progress:
messages_to_insert_into_db.append(message)

start_timestamp = key_messages["mission_start"][0]["header"]["timestamp"]
mission_start_timestamp = key_messages["mission_start"][0]["header"]["timestamp"]

stop_timestamp = (
mission_stop_timestamp = (
key_messages["mission_stop"][0]["header"]["timestamp"]
if len(key_messages["mission_stop"]) == 1
else key_messages["trial_stop"][0]["header"]["timestamp"]
)

# The trial start/stop timestamps are important for aligning the audio with the rest of the data.
trial_start_timestamp = key_messages["trial_start"][0]["header"]["timestamp"]
trial_stop_timestamp = key_messages["trial_stop"][0]["header"]["timestamp"]

mission_name = key_messages["mission_start"][0]["data"]["mission"]
testbed_version = key_messages["trial_start"][-1]["data"][
"testbed_version"
Expand All @@ -322,10 +326,14 @@ def process_metadata_file(filepath, group_session, file_to_key_messages_mapping)
group_session_id=group_session,
id=trial_id,
name=mission_name,
start_timestamp_unix=start_timestamp,
start_timestamp_iso8601=convert_iso8601_timestamp_to_unix(start_timestamp),
stop_timestamp_unix=stop_timestamp,
stop_timestamp_iso8601=convert_iso8601_timestamp_to_unix(stop_timestamp),
mission_start_timestamp_unix=convert_iso8601_timestamp_to_unix(mission_start_timestamp),
mission_start_timestamp_iso8601=mission_start_timestamp,
mission_stop_timestamp_unix=convert_iso8601_timestamp_to_unix(mission_stop_timestamp),
mission_stop_timestamp_iso8601=mission_stop_timestamp,
trial_start_timestamp_unix=convert_iso8601_timestamp_to_unix(trial_start_timestamp),
trial_start_timestamp_iso8601=trial_start_timestamp,
trial_stop_timestamp_unix=convert_iso8601_timestamp_to_unix(trial_stop_timestamp),
trial_stop_timestamp_iso8601=trial_stop_timestamp,
final_team_score=final_team_score,
testbed_version=testbed_version,
)
Expand All @@ -334,8 +342,8 @@ def process_metadata_file(filepath, group_session, file_to_key_messages_mapping)
MinecraftTestbedMessage(
mission_id=trial_id,
id=i,
timestamp_unix=message["header"]["timestamp"],
timestamp_iso8601=convert_iso8601_timestamp_to_unix(message["header"]["timestamp"]),
timestamp_unix=convert_iso8601_timestamp_to_unix(message["header"]["timestamp"]),
timestamp_iso8601=message["header"]["timestamp"],
topic=message.pop("topic"),
message=json.dumps(message),
)
Expand All @@ -358,8 +366,10 @@ def process_directory_v2(group_session):
info(f"Processing block_2.xdf for {group_session}.")

messages = {"Hands-on Training": [], "Saturn_A": [], "Saturn_B": []}
trial_timestamps = {"Hands-on Training": [], "Saturn_A": [], "Saturn_B": []}

current_mission = None
trial_mission = None # For trial labeling
testbed_version = None
for i, timestamp in enumerate(stream["time_stamps"]):
text = stream["time_series"][i][0]
Expand Down Expand Up @@ -390,6 +400,7 @@ def process_directory_v2(group_session):
mission_state = message["data"]["mission_state"]
if mission_state == "Start":
current_mission = message["data"]["mission"]
trial_mission = current_mission
if messages[current_mission]:
info(
f"There was already a mission of type {current_mission}"
Expand All @@ -402,14 +413,18 @@ def process_directory_v2(group_session):
current_mission = None
else:
pass
elif (
topic == "trial"
and message["msg"]["sub_type"] == "start"
and testbed_version is None
# The line above assumes that we do not update the testbed in
# the middle of a trial.
):
testbed_version = message["data"]["testbed_version"]
elif topic == "trial":
if message["msg"]["sub_type"] == "start":
trial_start_index = i

if testbed_version is None:
# The line above assumes that we do not update the testbed in
# the middle of a trial.
testbed_version = message["data"]["testbed_version"]
elif message["msg"]["sub_type"] == "stop":
trial_stop_index = i
trial_timestamps[trial_mission] = (trial_start_index, trial_stop_index)
trial_mission = None
else:
if current_mission is not None:
messages[current_mission].append((i, message))
Expand All @@ -436,8 +451,11 @@ def process_directory_v2(group_session):
else:
error("[MISSING DATA]: No scoreboard messages found!")

start_timestamp_lsl = stream["time_stamps"][messages[0][0]]
stop_timestamp_lsl = stream["time_stamps"][messages[-1][0]]
mission_start_timestamp_lsl = stream["time_stamps"][messages[0][0]]
mission_stop_timestamp_lsl = stream["time_stamps"][messages[-1][0]]

trial_start_timestamp_lsl = stream["time_stamps"][trial_timestamps[mission][0]]
trial_stop_timestamp_lsl = stream["time_stamps"][trial_timestamps[mission][1]]

if trial_id in INVALID_MISSIONS:
error(f"[ANOMALY] Skipping {trial_id} as it is a duplicate not removed by the deduplicate logic.")
Expand All @@ -446,26 +464,30 @@ def process_directory_v2(group_session):
group_session_id=group_session,
id=trial_id,
name=mission,
start_timestamp_unix=start_timestamp_lsl,
start_timestamp_iso8601=convert_unix_timestamp_to_iso8601(start_timestamp_lsl),
stop_timestamp_unix=stop_timestamp_lsl,
stop_timestamp_iso8601=convert_unix_timestamp_to_iso8601(stop_timestamp_lsl),
mission_start_timestamp_unix=mission_start_timestamp_lsl,
mission_start_timestamp_iso8601=convert_unix_timestamp_to_iso8601(mission_start_timestamp_lsl),
mission_stop_timestamp_unix=mission_stop_timestamp_lsl,
mission_stop_timestamp_iso8601=convert_unix_timestamp_to_iso8601(mission_stop_timestamp_lsl),
trial_start_timestamp_unix=trial_start_timestamp_lsl,
trial_start_timestamp_iso8601=convert_unix_timestamp_to_iso8601(trial_start_timestamp_lsl),
trial_stop_timestamp_unix=trial_stop_timestamp_lsl,
trial_stop_timestamp_iso8601=convert_unix_timestamp_to_iso8601(trial_stop_timestamp_lsl),
final_team_score=final_team_score,
testbed_version=testbed_version,
)
minecraft_missions.append(minecraft_mission)

minecraft_testbed_messages.extend([
MinecraftTestbedMessage(
mission_id=trial_id,
id=i,
timestamp_unix=stream["time_stamps"][i],
timestamp_iso8601=convert_unix_timestamp_to_iso8601(stream["time_stamps"][i]),
topic=message.pop("topic"),
message=json.dumps(message),
for i, message in messages:
minecraft_testbed_messages.append(
MinecraftTestbedMessage(
mission_id=trial_id,
id=i,
timestamp_unix=stream["time_stamps"][i],
timestamp_iso8601=convert_unix_timestamp_to_iso8601(stream["time_stamps"][i]),
topic=message.pop("topic"),
message=json.dumps(message),
)
)
for i, message in messages
])

return minecraft_missions, minecraft_testbed_messages

Expand Down
5 changes: 3 additions & 2 deletions human_experiments/datasette_interface/recreate_database
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ from process_ping_pong_cooperative_data import process_ping_pong_cooperative_tas
from process_minecraft_data import process_minecraft_data, recreate_minecraft_tables
from process_fnirs_raw_data import process_fnirs_raw_data, recreate_fnirs_raw_tables
from process_eeg_raw_data import process_eeg_raw_data, recreate_eeg_raw_tables
from process_gaze_data import process_gaze_raw_data, recreate_gaze_raw_tables
from process_gaze_raw_data import process_gaze_raw_data, recreate_gaze_raw_tables

logging.basicConfig(
level=logging.INFO,
Expand Down Expand Up @@ -125,7 +125,7 @@ if __name__ == "__main__":
help="User with granted writing permissions in the database.")
parser.add_argument("--db_port", type=int, required=True,
help="Port where the cluster is running. Make sure to set the port to your local cluster.")
parser.add_argument("--db_name", type=str, required=False, default=f"tomcat",
parser.add_argument("--db_name", type=str, required=False, default="tomcat",
help="Database name. Make sure the database was previously created before running this script.")
parser.add_argument("--db_passwd", type=str, required=False, default="", help="Password to connect to the database.")
parser.add_argument("--override", action='store_true',
Expand All @@ -150,6 +150,7 @@ if __name__ == "__main__":

database_info = f"{args.db_user}:{args.db_name}@localhost:{args.db_port}"
connection_string = f"postgresql+psycopg2://{database_info}/{args.db_passwd}"
info(f"Establishing connection to database {connection_string}")
engine = create_engine(connection_string)

tables = TABLES.copy()
Expand Down

0 comments on commit 4afab55

Please sign in to comment.