From 1e6414701f87623b4798e8bb61e8255a3b9810ff Mon Sep 17 00:00:00 2001
From: Eric Duong <86935179+eduongAZ@users.noreply.github.com>
Date: Sun, 20 Aug 2023 11:32:59 -0700
Subject: [PATCH] Synchronize signal task (#531)

* synchronize signals
* affective task
* affective task
* rest state and finger tapping
* ping pong competitive
* ping pong cooperative
* minecraft task, and pairing
---
 .../synchronize_signal_task/README.md         |  3 +
 .../append_information/__init__.py            |  0
 .../common/__init__.py                        |  9 ++
 .../common/get_station.py                     | 13 +++
 .../common/remove_columns.py                  |  8 ++
 .../common/remove_columns_all_exp.py          | 10 ++
 .../synchronize_signal_task/config.py         | 21 +++++
 .../process_eeg_500hz.py                      | 40 ++++++++
 .../process_fnirs_10hz.py                     | 40 ++++++++
 .../read_data/__init__.py                     |  9 ++
 .../read_data/read_raw_csv.py                 | 47 ++++++++++
 .../read_data/read_task_db.py                 | 88 +++++++++++++++++
 .../read_data/tasks/__init__.py               | 17 ++++
 .../read_data/tasks/affective_individual.py   | 53 +++++++++++
 .../read_data/tasks/affective_team.py         | 63 +++++++++++++
 .../read_data/tasks/finger_tapping.py         | 25 +++++
 .../read_data/tasks/minecraft.py              | 94 +++++++++++++++++++
 .../read_data/tasks/ping_pong_competitive.py  | 43 +++++++++
 .../read_data/tasks/ping_pong_cooperative.py  | 59 ++++++++++++
 .../read_data/tasks/rest_state.py             | 32 +++++++
 .../synchronize_signal_task/requirements.txt  |  8 ++
 .../signal_synchronization/__init__.py        |  8 ++
 .../interpolation/__init__.py                 |  5 +
 .../interpolation/linear_interpolation.py     | 36 +++++++
 .../prepare_synchronization_data.py           | 43 +++++++++
 .../resample/__init__.py                      |  5 +
 .../resample/mne_resample.py                  | 23 +++++
 .../synchronize_signals.py                    | 93 ++++++++++++++++++
 .../signal_synchronization/utils/__init__.py  |  8 ++
 .../utils/generate_time_series.py             | 28 ++++++
 .../utils/get_shared_start_time.py            | 16 ++++
 .../utils/is_time_overlapping.py              | 17 ++++
 .../task_synchronization/__init__.py          |  8 ++
 .../prepare_task_synchronization_data.py      | 31 ++++++
 .../synchronize_task_signal.py                | 89 ++++++++++++++++++
 .../task_synchronization/utils/__init__.py    | 13 +++
 .../utils/group_signal_for_task_events.py     | 78 +++++++++++++++
 .../utils/group_signal_for_task_status.py     | 34 +++++++
 .../synchronize_affective_team_task_event.py  | 66 +++++++++++++
 .../utils/synchronize_task_event_signal.py    | 49 ++++++++++
 .../utils/synchronize_task_status_signal.py   | 27 ++++++
 .../write_data/__init__.py                    |  6 ++
 .../write_data/write_csv.py                   | 29 ++++++
 43 files changed, 1394 insertions(+)
 create mode 100644 human_experiments/synchronize_signal_task/README.md
 create mode 100644 human_experiments/synchronize_signal_task/append_information/__init__.py
 create mode 100644 human_experiments/synchronize_signal_task/common/__init__.py
 create mode 100644 human_experiments/synchronize_signal_task/common/get_station.py
 create mode 100644 human_experiments/synchronize_signal_task/common/remove_columns.py
 create mode 100644 human_experiments/synchronize_signal_task/common/remove_columns_all_exp.py
 create mode 100644 human_experiments/synchronize_signal_task/config.py
 create mode 100644 human_experiments/synchronize_signal_task/process_eeg_500hz.py
 create mode 100644 human_experiments/synchronize_signal_task/process_fnirs_10hz.py
 create mode 100644 human_experiments/synchronize_signal_task/read_data/__init__.py
 create mode 100644 human_experiments/synchronize_signal_task/read_data/read_raw_csv.py
 create mode 100644 human_experiments/synchronize_signal_task/read_data/read_task_db.py
 create mode 100644 human_experiments/synchronize_signal_task/read_data/tasks/__init__.py
 create mode 100644 human_experiments/synchronize_signal_task/read_data/tasks/affective_individual.py
 create mode 100644 human_experiments/synchronize_signal_task/read_data/tasks/affective_team.py
 create mode 100644 human_experiments/synchronize_signal_task/read_data/tasks/finger_tapping.py
 create mode 100644 human_experiments/synchronize_signal_task/read_data/tasks/minecraft.py
 create mode 100644 human_experiments/synchronize_signal_task/read_data/tasks/ping_pong_competitive.py
 create mode 100644 human_experiments/synchronize_signal_task/read_data/tasks/ping_pong_cooperative.py
 create mode 100644 human_experiments/synchronize_signal_task/read_data/tasks/rest_state.py
 create mode 100644 human_experiments/synchronize_signal_task/requirements.txt
 create mode 100644 human_experiments/synchronize_signal_task/signal_synchronization/__init__.py
 create mode 100644 human_experiments/synchronize_signal_task/signal_synchronization/interpolation/__init__.py
 create mode 100644 human_experiments/synchronize_signal_task/signal_synchronization/interpolation/linear_interpolation.py
 create mode 100644 human_experiments/synchronize_signal_task/signal_synchronization/prepare_synchronization_data.py
 create mode 100644 human_experiments/synchronize_signal_task/signal_synchronization/resample/__init__.py
 create mode 100644 human_experiments/synchronize_signal_task/signal_synchronization/resample/mne_resample.py
 create mode 100644 human_experiments/synchronize_signal_task/signal_synchronization/synchronize_signals.py
 create mode 100644 human_experiments/synchronize_signal_task/signal_synchronization/utils/__init__.py
 create mode 100644 human_experiments/synchronize_signal_task/signal_synchronization/utils/generate_time_series.py
 create mode 100644 human_experiments/synchronize_signal_task/signal_synchronization/utils/get_shared_start_time.py
 create mode 100644 human_experiments/synchronize_signal_task/signal_synchronization/utils/is_time_overlapping.py
 create mode 100644 human_experiments/synchronize_signal_task/task_synchronization/__init__.py
 create mode 100644 human_experiments/synchronize_signal_task/task_synchronization/prepare_task_synchronization_data.py
 create mode 100644 human_experiments/synchronize_signal_task/task_synchronization/synchronize_task_signal.py
 create mode 100644 human_experiments/synchronize_signal_task/task_synchronization/utils/__init__.py
 create mode 100644 human_experiments/synchronize_signal_task/task_synchronization/utils/group_signal_for_task_events.py
 create mode 100644 human_experiments/synchronize_signal_task/task_synchronization/utils/group_signal_for_task_status.py
 create mode 100644 human_experiments/synchronize_signal_task/task_synchronization/utils/synchronize_affective_team_task_event.py
 create mode 100644 human_experiments/synchronize_signal_task/task_synchronization/utils/synchronize_task_event_signal.py
 create mode 100644 human_experiments/synchronize_signal_task/task_synchronization/utils/synchronize_task_status_signal.py
 create mode 100644 human_experiments/synchronize_signal_task/write_data/__init__.py
 create mode 100644 human_experiments/synchronize_signal_task/write_data/write_csv.py

diff --git a/human_experiments/synchronize_signal_task/README.md b/human_experiments/synchronize_signal_task/README.md
new file mode 100644
index 000000000..2918e3153
--- /dev/null
+++ b/human_experiments/synchronize_signal_task/README.md
@@ -0,0 +1,3 @@
+# Synchronize signal task
+
+This project is for synchronizing EEG and fNIRS signals with task data.
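For orientation, the two entry-point scripts added by this patch (`process_eeg_500hz.py` and `process_fnirs_10hz.py`) compose the same four stages: read, synchronize signals, synchronize with tasks, write. A minimal sketch of that flow, assuming it is run from the project root; the paths and process count are illustrative, and `write_csv_all` is called positionally because its full signature is not shown in this patch:

```python
# Sketch of the end-to-end flow mirrored by the entry-point scripts (not patch code).
from read_data import read_raw_csv_all
from signal_synchronization import prepare_synchronization_data, synchronize_signals_all
from task_synchronization import prepare_task_synchronization_data, synchronize_task_signal_all
from write_data import write_csv_all

# Read one CSV per experiment session, split by station (metadata-column removal omitted).
signals = read_raw_csv_all("/path/to/eeg_filtered", 4)

# Describe each signal type, then resample/interpolate everything onto a shared clock.
sync_info = prepare_synchronization_data(
    [{"signal_type": "eeg", "experiment_signals": signals, "recording_frequency": 500}],
    500,  # desired output frequency in Hz
)
synced = synchronize_signals_all(sync_info)

# Pair synchronized signals with task events from the SQLite database and write CSVs.
paired = prepare_task_synchronization_data(synced, "/path/to/tomcat.db", 4)
results = synchronize_task_signal_all(paired)
write_csv_all(results, "/path/to/output", 4)
```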
diff --git a/human_experiments/synchronize_signal_task/append_information/__init__.py b/human_experiments/synchronize_signal_task/append_information/__init__.py
new file mode 100644
index 000000000..e69de29bb
diff --git a/human_experiments/synchronize_signal_task/common/__init__.py b/human_experiments/synchronize_signal_task/common/__init__.py
new file mode 100644
index 000000000..fbb067eb5
--- /dev/null
+++ b/human_experiments/synchronize_signal_task/common/__init__.py
@@ -0,0 +1,9 @@
+from .remove_columns import remove_columns
+from .remove_columns_all_exp import remove_columns_all_exp
+from .get_station import get_station
+
+__all__ = [
+    "remove_columns",
+    "remove_columns_all_exp",
+    "get_station",
+]
diff --git a/human_experiments/synchronize_signal_task/common/get_station.py b/human_experiments/synchronize_signal_task/common/get_station.py
new file mode 100644
index 000000000..4bb9621f1
--- /dev/null
+++ b/human_experiments/synchronize_signal_task/common/get_station.py
@@ -0,0 +1,13 @@
+import sqlite3
+
+
+def get_station(db_path: str, session: str, participant_id: int, task: str) -> str:
+    db = sqlite3.connect(db_path)
+
+    station = db.execute("""
+        SELECT station
+        FROM data_validity
+        WHERE group_session = ? AND participant = ? AND task = ?
+    """, (session, participant_id, task)).fetchall()[0][0]
+
+    return station
diff --git a/human_experiments/synchronize_signal_task/common/remove_columns.py b/human_experiments/synchronize_signal_task/common/remove_columns.py
new file mode 100644
index 000000000..9e831475d
--- /dev/null
+++ b/human_experiments/synchronize_signal_task/common/remove_columns.py
@@ -0,0 +1,8 @@
+import pandas as pd
+
+
+def remove_columns(df: pd.DataFrame, columns: list[str]) -> pd.DataFrame:
+    # Only drop columns that actually exist in the dataframe
+    columns_to_drop = [col for col in columns if col in df.columns]
+
+    return df.drop(columns=columns_to_drop)
diff --git a/human_experiments/synchronize_signal_task/common/remove_columns_all_exp.py b/human_experiments/synchronize_signal_task/common/remove_columns_all_exp.py
new file mode 100644
index 000000000..6c88c200f
--- /dev/null
+++ b/human_experiments/synchronize_signal_task/common/remove_columns_all_exp.py
@@ -0,0 +1,10 @@
+from .remove_columns import remove_columns
+
+
+def remove_columns_all_exp(experiments: list[dict[str, any]], columns: list[str]):
+    for experiment in experiments:
+        for station in ["lion", "tiger", "leopard"]:
+            if station not in experiment:
+                continue
+
+            experiment[station] = remove_columns(experiment[station], columns)
diff --git a/human_experiments/synchronize_signal_task/config.py b/human_experiments/synchronize_signal_task/config.py
new file mode 100644
index 000000000..9b7635712
--- /dev/null
+++ b/human_experiments/synchronize_signal_task/config.py
@@ -0,0 +1,21 @@
+import os
+
+USER = os.getenv("USER")
+DB_PATH = f"/space/{USER}/tomcat/tomcat.db"
+EEG_FILTERED_PATH = f"/space/{USER}/eeg_filtered"
+FNIRS_FILTERED_PATH = f"/space/{USER}/fnirs_filtered"
+NUM_PROCESSES = 40
+OUTPUT_DIR = "/tomcat/data/derived/drafts/release_2023_08_19_17"
+
+EXPERIMENT_SESSIONS = [
+    "exp_2022_09_30_10", "exp_2022_11_01_10", "exp_2022_12_02_15", "exp_2023_02_21_14",
+    "exp_2022_10_04_09", "exp_2022_11_04_10", "exp_2022_12_05_12", "exp_2023_04_17_13",
+    "exp_2022_10_07_15", "exp_2022_11_07_10", "exp_2023_01_30_13", "exp_2023_04_18_14",
+    "exp_2022_10_14_10", "exp_2022_11_08_11", "exp_2023_01_31_14", "exp_2023_04_21_10",
+    "exp_2022_10_18_10", "exp_2022_11_10_10", "exp_2023_02_03_10", "exp_2023_04_24_13",
+    "exp_2022_10_21_15", "exp_2022_11_14_12", "exp_2023_02_06_13", "exp_2023_04_27_14",
+    "exp_2022_10_24_12", "exp_2022_11_15_13", "exp_2023_02_07_14", "exp_2023_04_28_10",
+    "exp_2022_10_27_10", "exp_2022_11_17_15", "exp_2023_02_10_10", "exp_2023_05_01_13",
+    "exp_2022_10_28_10", "exp_2022_11_18_10", "exp_2023_02_16_14", "exp_2023_05_02_14",
+    "exp_2022_10_31_10", "exp_2022_11_22_10", "exp_2023_02_20_01", "exp_2023_05_03_10",
+]
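As an aside, `get_station` above resolves a participant to their workstation ("lion", "tiger", or "leopard") via the `data_validity` table. A hypothetical call, assuming the database path is valid; the participant id is made up for illustration:

```python
from common import get_station

# Hypothetical lookup: which station did participant 99901 (illustrative id)
# sit at during the rest_state task of this session?
station = get_station("/path/to/tomcat.db", "exp_2022_09_30_10", 99901, "rest_state")
```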
"exp_2023_02_03_10", "exp_2023_04_24_13", + "exp_2022_10_21_15", "exp_2022_11_14_12", "exp_2023_02_06_13", "exp_2023_04_27_14", + "exp_2022_10_24_12", "exp_2022_11_15_13", "exp_2023_02_07_14", "exp_2023_04_28_10", + "exp_2022_10_27_10", "exp_2022_11_17_15", "exp_2023_02_10_10", "exp_2023_05_01_13", + "exp_2022_10_28_10", "exp_2022_11_18_10", "exp_2023_02_16_14", "exp_2023_05_02_14", + "exp_2022_10_31_10", "exp_2022_11_22_10", "exp_2023_02_20_01", "exp_2023_05_03_10", +] diff --git a/human_experiments/synchronize_signal_task/process_eeg_500hz.py b/human_experiments/synchronize_signal_task/process_eeg_500hz.py new file mode 100644 index 000000000..522a39b53 --- /dev/null +++ b/human_experiments/synchronize_signal_task/process_eeg_500hz.py @@ -0,0 +1,40 @@ +import os + +from common import remove_columns_all_exp +from config import DB_PATH, EEG_FILTERED_PATH, NUM_PROCESSES, EXPERIMENT_SESSIONS, OUTPUT_DIR +from read_data import read_raw_csv_all +from signal_synchronization import prepare_synchronization_data, synchronize_signals_all +from task_synchronization import synchronize_task_signal_all, prepare_task_synchronization_data +from write_data import write_csv_all + +if __name__ == "__main__": + desired_freq = 500 + + experiment_eeg_signal = read_raw_csv_all(EEG_FILTERED_PATH, NUM_PROCESSES, EXPERIMENT_SESSIONS) + + columns_to_remove = [ + "group_session", + "station", + "task", + "participant", + "timestamp_iso8601" + ] + + remove_columns_all_exp(experiment_eeg_signal, columns_to_remove) + + signal_type_info = [ + { + "signal_type": "eeg", + "experiment_signals": experiment_eeg_signal, + "recording_frequency": 500, + } + ] + + sync_experiments_info = prepare_synchronization_data(signal_type_info, desired_freq) + synchronized_signals = synchronize_signals_all(sync_experiments_info) + + task_synchronization_info = prepare_task_synchronization_data(synchronized_signals, DB_PATH, NUM_PROCESSES) + synchronized_task_signals = synchronize_task_signal_all(task_synchronization_info) + + output_dir = os.path.join(OUTPUT_DIR, f"eeg_{desired_freq}hz") + write_csv_all(synchronized_task_signals, OUTPUT_DIR, NUM_PROCESSES) diff --git a/human_experiments/synchronize_signal_task/process_fnirs_10hz.py b/human_experiments/synchronize_signal_task/process_fnirs_10hz.py new file mode 100644 index 000000000..507d4a7d3 --- /dev/null +++ b/human_experiments/synchronize_signal_task/process_fnirs_10hz.py @@ -0,0 +1,40 @@ +import os + +from common import remove_columns_all_exp +from config import DB_PATH, FNIRS_FILTERED_PATH, NUM_PROCESSES, EXPERIMENT_SESSIONS, OUTPUT_DIR +from read_data import read_raw_csv_all +from signal_synchronization import prepare_synchronization_data, synchronize_signals_all +from task_synchronization import synchronize_task_signal_all, prepare_task_synchronization_data +from write_data import write_csv_all + +if __name__ == "__main__": + desired_freq = 10 + + experiment_fnirs_signal = read_raw_csv_all(FNIRS_FILTERED_PATH, NUM_PROCESSES, EXPERIMENT_SESSIONS) + + columns_to_remove = [ + "group_session", + "station", + "task", + "participant", + "timestamp_iso8601" + ] + + remove_columns_all_exp(experiment_fnirs_signal, columns_to_remove) + + signal_type_info = [ + { + "signal_type": "fnirs", + "experiment_signals": experiment_fnirs_signal, + "recording_frequency": 10, + } + ] + + sync_experiments_info = prepare_synchronization_data(signal_type_info, desired_freq) + synchronized_signals = synchronize_signals_all(sync_experiments_info) + + task_synchronization_info = 
prepare_task_synchronization_data(synchronized_signals, DB_PATH, NUM_PROCESSES) + synchronized_task_signals = synchronize_task_signal_all(task_synchronization_info) + + output_dir = os.path.join(OUTPUT_DIR, f"fnirs_{desired_freq}hz") + write_csv_all(synchronized_task_signals, output_dir, NUM_PROCESSES) diff --git a/human_experiments/synchronize_signal_task/read_data/__init__.py b/human_experiments/synchronize_signal_task/read_data/__init__.py new file mode 100644 index 000000000..2f35df467 --- /dev/null +++ b/human_experiments/synchronize_signal_task/read_data/__init__.py @@ -0,0 +1,9 @@ +from .read_raw_csv import read_raw_csv, read_raw_csv_all +from .read_task_db import read_task_db, read_task_db_all + +__all__ = [ + "read_raw_csv", + "read_raw_csv_all", + "read_task_db", + "read_task_db_all", +] diff --git a/human_experiments/synchronize_signal_task/read_data/read_raw_csv.py b/human_experiments/synchronize_signal_task/read_data/read_raw_csv.py new file mode 100644 index 000000000..07d19292c --- /dev/null +++ b/human_experiments/synchronize_signal_task/read_data/read_raw_csv.py @@ -0,0 +1,47 @@ +import glob +import os +from multiprocessing import Pool + +import pandas as pd +from tqdm import tqdm + + +def read_raw_csv(csv_path: str) -> dict[str, any]: + dtypes = { + 'group_session': str, + 'task': str, + 'station': str, + 'timestamp_iso8601': str, + 'participant': int, + } + + exp_df = pd.read_csv(csv_path, index_col=0, dtype=dtypes) + lion_df = exp_df[exp_df['station'] == 'lion'] + tiger_df = exp_df[exp_df['station'] == 'tiger'] + leopard_df = exp_df[exp_df['station'] == 'leopard'] + + exp_name = os.path.splitext(os.path.basename(csv_path))[0] + + return { + 'experiment_name': exp_name, + 'lion': lion_df, + 'tiger': tiger_df, + 'leopard': leopard_df + } + + +def read_raw_csv_all(dir_path: str, + num_processes: int = 1, + whitelist_experiments: list[str] | None = None) -> list[dict[str, any]]: + csv_paths = sorted(glob.glob(f'{dir_path}/*.csv')) + + if whitelist_experiments is not None: + csv_paths = [path for path in csv_paths if os.path.splitext(os.path.basename(path))[0] in whitelist_experiments] + + if len(csv_paths) == 0: + raise ValueError(f'No CSV files found') + + with Pool(processes=num_processes) as pool: + raw_list = list(tqdm(pool.imap(read_raw_csv, csv_paths), total=len(csv_paths))) + + return raw_list diff --git a/human_experiments/synchronize_signal_task/read_data/read_task_db.py b/human_experiments/synchronize_signal_task/read_data/read_task_db.py new file mode 100644 index 000000000..b9b278d87 --- /dev/null +++ b/human_experiments/synchronize_signal_task/read_data/read_task_db.py @@ -0,0 +1,88 @@ +from multiprocessing import Pool + +from tqdm import tqdm + +from .tasks import ( + affective_individual, + affective_team, + rest_state, + finger_tapping, + ping_pong_competitive, + ping_pong_cooperative, + minecraft +) + + +def read_task_db(db_path: str, experiment: str) -> dict[str, any]: + task_data = [] + + rest_state_data = rest_state(db_path, experiment) + if rest_state_data is not None: + task_data.append({ + "task_name": "rest_state", + "task_data": rest_state_data + }) + + finger_tapping_data = finger_tapping(db_path, experiment) + if finger_tapping_data is not None: + task_data.append({ + "task_name": "finger_tapping", + "task_data": finger_tapping_data + }) + + affective_individual_data = affective_individual(db_path, experiment) + if affective_individual_data is not None: + task_data.append({ + "task_name": "affective_individual", + "task_data": 
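Each element of the list returned by `read_raw_csv_all` has the shape below (a sketch derived from the code above; the toy frames stand in for whatever the filtered CSVs hold):

```python
import pandas as pd

# Shape of one per-experiment entry from read_raw_csv_all (toy data for illustration):
example_entry = {
    "experiment_name": "exp_2022_09_30_10",  # CSV filename without extension
    "lion": pd.DataFrame({"timestamp_unix": [1.0], "ch1": [0.1]}),     # station == "lion"
    "tiger": pd.DataFrame({"timestamp_unix": [1.0], "ch1": [0.2]}),    # station == "tiger"
    "leopard": pd.DataFrame({"timestamp_unix": [1.0], "ch1": [0.3]}),  # station == "leopard"
}
```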
diff --git a/human_experiments/synchronize_signal_task/read_data/read_task_db.py b/human_experiments/synchronize_signal_task/read_data/read_task_db.py
new file mode 100644
index 000000000..b9b278d87
--- /dev/null
+++ b/human_experiments/synchronize_signal_task/read_data/read_task_db.py
@@ -0,0 +1,88 @@
+from multiprocessing import Pool
+
+from tqdm import tqdm
+
+from .tasks import (
+    affective_individual,
+    affective_team,
+    rest_state,
+    finger_tapping,
+    ping_pong_competitive,
+    ping_pong_cooperative,
+    minecraft
+)
+
+
+def read_task_db(db_path: str, experiment: str) -> dict[str, any]:
+    task_data = []
+
+    rest_state_data = rest_state(db_path, experiment)
+    if rest_state_data is not None:
+        task_data.append({
+            "task_name": "rest_state",
+            "task_data": rest_state_data
+        })
+
+    finger_tapping_data = finger_tapping(db_path, experiment)
+    if finger_tapping_data is not None:
+        task_data.append({
+            "task_name": "finger_tapping",
+            "task_data": finger_tapping_data
+        })
+
+    affective_individual_data = affective_individual(db_path, experiment)
+    if affective_individual_data is not None:
+        task_data.append({
+            "task_name": "affective_individual",
+            "task_data": affective_individual_data
+        })
+
+    affective_team_data = affective_team(db_path, experiment)
+    if affective_team_data is not None:
+        task_data.append({
+            "task_name": "affective_team",
+            "task_data": affective_team_data
+        })
+
+    ping_pong_competitive_data = ping_pong_competitive(db_path, experiment)
+    if ping_pong_competitive_data is not None:
+        task_data.append({
+            "task_name": "ping_pong_competitive",
+            "task_data": ping_pong_competitive_data
+        })
+
+    ping_pong_cooperative_data = ping_pong_cooperative(db_path, experiment)
+    if ping_pong_cooperative_data is not None:
+        task_data.append({
+            "task_name": "ping_pong_cooperative",
+            "task_data": ping_pong_cooperative_data
+        })
+
+    minecraft_data = minecraft(db_path, experiment)
+    if minecraft_data is not None:
+        task_data.append({
+            "task_name": "minecraft",
+            "task_data": minecraft_data
+        })
+
+    task_data_dict = {
+        "experiment_name": experiment,
+        "task_data": task_data
+    }
+
+    return task_data_dict
+
+
+def _multiprocess_task_db(process_arg: tuple[str, str]) -> dict[str, any]:
+    return read_task_db(*process_arg)
+
+
+def read_task_db_all(db_path: str,
+                     experiments: list[str],
+                     num_processes: int = 1) -> list[dict[str, any]]:
+    process_args = [(db_path, experiment) for experiment in experiments]
+
+    with Pool(processes=num_processes) as pool:
+        results = list(tqdm(pool.imap(_multiprocess_task_db, process_args), total=len(process_args)))
+
+    return results
diff --git a/human_experiments/synchronize_signal_task/read_data/tasks/__init__.py b/human_experiments/synchronize_signal_task/read_data/tasks/__init__.py
new file mode 100644
index 000000000..da077a16e
--- /dev/null
+++ b/human_experiments/synchronize_signal_task/read_data/tasks/__init__.py
@@ -0,0 +1,17 @@
+from .affective_individual import affective_individual
+from .affective_team import affective_team
+from .finger_tapping import finger_tapping
+from .ping_pong_competitive import ping_pong_competitive
+from .ping_pong_cooperative import ping_pong_cooperative
+from .rest_state import rest_state
+from .minecraft import minecraft
+
+__all__ = [
+    'affective_individual',
+    'affective_team',
+    'rest_state',
+    'finger_tapping',
+    'ping_pong_competitive',
+    'ping_pong_cooperative',
+    'minecraft'
+]
diff --git a/human_experiments/synchronize_signal_task/read_data/tasks/affective_individual.py b/human_experiments/synchronize_signal_task/read_data/tasks/affective_individual.py
new file mode 100644
index 000000000..1dab6f21b
--- /dev/null
+++ b/human_experiments/synchronize_signal_task/read_data/tasks/affective_individual.py
@@ -0,0 +1,53 @@
+import sqlite3
+
+import numpy as np
+import pandas as pd
+
+from common import get_station
+
+
+def affective_individual(db_path: str, experiment: str) -> dict[str, pd.DataFrame] | None:
+    db = sqlite3.connect(db_path)
+
+    query = """
+        SELECT *
+        FROM affective_task_event
+        WHERE group_session = ? AND task_type = 'individual';
+    """
+    affective_individual_df = pd.read_sql_query(query, db, params=[experiment])
+
+    if affective_individual_df.empty:
+        return None
+
+    # Map participant id to station
+    unique_participants = affective_individual_df['participant'].unique()
+    id_station_map = {}
+    for participant_id in unique_participants:
+        station = get_station(db_path, experiment, participant_id, "affective_individual")
+        id_station_map[participant_id] = station
+
+    # Create a new column with the mapped stations
+    def map_station(participant):
+        return id_station_map.get(participant, np.nan)
+
+    station_task_map = {}
+    for participant_id, station in id_station_map.items():
+        station_affective_df = affective_individual_df[affective_individual_df['participant'] == participant_id]
+        station_affective_df = station_affective_df.copy()
+        station_affective_df['station'] = station_affective_df['participant'].apply(map_station)
+        station_affective_df = station_affective_df.drop(columns=['participant',
+                                                                  'task_type',
+                                                                  'group_session',
+                                                                  'timestamp_iso8601'])
+
+        # Reindex the DataFrame according to the new column order
+        cols = station_affective_df.columns.tolist()
+        cols = [cols[0]] + ['station'] + [col for col in cols[1:] if col != 'station']
+        station_affective_df = station_affective_df.reindex(columns=cols)
+
+        station_affective_df["timestamp_unix"] = station_affective_df["timestamp_unix"].astype(float)
+        station_affective_df = station_affective_df.reset_index(drop=True)
+
+        station_task_map[station] = station_affective_df
+
+    return station_task_map
diff --git a/human_experiments/synchronize_signal_task/read_data/tasks/affective_team.py b/human_experiments/synchronize_signal_task/read_data/tasks/affective_team.py
new file mode 100644
index 000000000..81b93e243
--- /dev/null
+++ b/human_experiments/synchronize_signal_task/read_data/tasks/affective_team.py
@@ -0,0 +1,63 @@
+import sqlite3
+
+import numpy as np
+import pandas as pd
+
+from common import get_station
+
+
+def affective_team(db_path: str, experiment: str) -> pd.DataFrame | None:
+    db = sqlite3.connect(db_path)
+
+    query = """
+        SELECT *
+        FROM affective_task_event
+        WHERE group_session = ? AND task_type = 'team';
+    """
+    affective_team_df = pd.read_sql_query(query, db, params=[experiment])
+
+    if affective_team_df.empty:
+        return None
+
+    # Map participant id to station
+    unique_participants = affective_team_df['participant'].unique()
+    id_station_map = {}
+    for participant_id in unique_participants:
+        if participant_id < 0:
+            continue
+        station = get_station(db_path, experiment, participant_id, "affective_team")
+        id_station_map[participant_id] = station
+
+    # Create a new column with the mapped stations
+    def map_station(participant):
+        return id_station_map.get(participant, np.nan)
+
+    affective_team_df = affective_team_df.copy()
+    affective_team_df['station'] = affective_team_df['participant'].apply(map_station)
+
+    affective_team_df = affective_team_df.drop(columns=['participant',
+                                                        'task_type',
+                                                        'group_session',
+                                                        'timestamp_iso8601'])
+
+    # Reindex the DataFrame according to the new column order
+    cols = affective_team_df.columns.tolist()
+    cols = [cols[0]] + ['station'] + [col for col in cols[1:] if col != 'station']
+    affective_team_df = affective_team_df.reindex(columns=cols)
+
+    affective_team_df["timestamp_unix"] = affective_team_df["timestamp_unix"].astype(float)
+    affective_team_df = affective_team_df.reset_index(drop=True)
+
+    # Iterate through the unique non-NaN station values
+    for station in affective_team_df['station'].dropna().unique():
+        # Create a new column named as <station>_event_type
+        col_name = f'{station}_event_type'
+        # Fill the new column with the 'event_type' value where the 'station' column matches the current station
+        affective_team_df[col_name] = affective_team_df.apply(
+            lambda row: row['event_type'] if row['station'] == station else None,
+            axis=1
+        )
+
+    affective_team_df = affective_team_df.drop(columns=["event_type"])
+
+    return affective_team_df
diff --git a/human_experiments/synchronize_signal_task/read_data/tasks/finger_tapping.py b/human_experiments/synchronize_signal_task/read_data/tasks/finger_tapping.py
new file mode 100644
index 000000000..ed7f919c5
--- /dev/null
+++ b/human_experiments/synchronize_signal_task/read_data/tasks/finger_tapping.py
@@ -0,0 +1,25 @@
+import sqlite3
+
+import pandas as pd
+
+
+def finger_tapping(db_path: str, experiment: str) -> pd.DataFrame | None:
+    db = sqlite3.connect(db_path)
+
+    query = """
+        SELECT *
+        FROM fingertapping_task_observation
+        WHERE group_session = ?;
+    """
+    finger_tapping_df = pd.read_sql_query(query, db, params=[experiment])
+
+    if finger_tapping_df.empty:
+        return None
+
+    finger_tapping_df = finger_tapping_df.drop(columns=['group_session',
+                                                        'timestamp_iso8601'])
+
+    finger_tapping_df["timestamp_unix"] = finger_tapping_df["timestamp_unix"].astype(float)
+    finger_tapping_df = finger_tapping_df.reset_index(drop=True)
+
+    return finger_tapping_df
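For reference, the per-station loop in `affective_team` above pivots a long event log into one `<station>_event_type` column per station. A self-contained toy version of the same idiom (column values invented):

```python
import pandas as pd

# Toy illustration of the per-station event pivot used in affective_team above.
df = pd.DataFrame({
    "station": ["lion", "tiger"],
    "event_type": ["show_image", "intermediate_selection"],
})
for station in df["station"].dropna().unique():
    # Copy event_type into a station-specific column; other rows stay None.
    df[f"{station}_event_type"] = df.apply(
        lambda row: row["event_type"] if row["station"] == station else None,
        axis=1,
    )
df = df.drop(columns=["event_type"])
# Result: columns station, lion_event_type, tiger_event_type.
```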
diff --git a/human_experiments/synchronize_signal_task/read_data/tasks/minecraft.py b/human_experiments/synchronize_signal_task/read_data/tasks/minecraft.py
new file mode 100644
index 000000000..7414edcee
--- /dev/null
+++ b/human_experiments/synchronize_signal_task/read_data/tasks/minecraft.py
@@ -0,0 +1,94 @@
+import json
+import sqlite3
+
+import pandas as pd
+
+
+def _get_testbed_messages(db_path: str, experiment: str, mission: str) -> pd.DataFrame | None:
+    db = sqlite3.connect(db_path)
+
+    mission_id = db.execute(f"""
+        SELECT id
+        FROM mission
+        WHERE group_session = '{experiment}' AND name = '{mission}';
+    """)
+
+    if mission_id is None:
+        return None
+
+    mission_id_result = mission_id.fetchone()
+    if mission_id_result is None or len(mission_id_result) == 0:
+        return None
+
+    mission_id = mission_id_result[0]
+
+    query = """
+        SELECT *
+        FROM testbed_message
+        WHERE mission = ? AND topic IN ('observations/events/mission', 'observations/events/scoreboard');
+    """
+    minecraft_df = pd.read_sql_query(query, db, params=[mission_id])
+
+    if minecraft_df.empty:
+        return None
+
+    minecraft_df = minecraft_df.drop(columns=['mission', 'timestamp_iso8601'])
+    minecraft_df['timestamp_unix'] = minecraft_df['timestamp_unix'].astype(float)
+    minecraft_df = minecraft_df.sort_values(by='timestamp_unix').reset_index(drop=True)
+
+    # Find the index where the mission starts
+    start_index = None
+    mission_indices = minecraft_df[minecraft_df['topic'] == 'observations/events/mission'].index
+    for idx in mission_indices:
+        message = json.loads(minecraft_df.loc[idx, 'message'])
+        if message["data"]["mission_state"] == "Start":
+            start_index = idx
+            break
+
+    # Find the index where the mission ends
+    end_index = None
+    for idx in mission_indices:
+        message = json.loads(minecraft_df.loc[idx, 'message'])
+        if message["data"]["mission_state"] == "Stop":
+            end_index = idx
+            break
+
+    # Keep only the rows between the start and end indices, excluding the start and end rows themselves
+    if start_index is not None and end_index is not None:
+        minecraft_df = minecraft_df.loc[start_index + 1: end_index - 1]
+    elif start_index is not None:  # If there's no end_index, keep everything after the start
+        minecraft_df = minecraft_df.loc[start_index + 1:]
+    elif end_index is not None:  # End found, no start
+        minecraft_df = minecraft_df.loc[:end_index - 1]
+
+    if minecraft_df.empty:
+        return None
+
+    minecraft_df = minecraft_df.sort_values(by='timestamp_unix').reset_index(drop=True)
+
+    minecraft_df['points'] = minecraft_df.apply(
+        lambda row: json.loads(row['message'])["data"]["scoreboard"]["TeamScore"],
+        axis=1
+    )
+
+    minecraft_df = minecraft_df.drop(columns=['message', 'topic'])
+
+    return minecraft_df
+
+
+def minecraft(db_path: str, experiment: str) -> dict[str, pd.DataFrame]:
+    training = _get_testbed_messages(db_path, experiment, 'Hands-on Training')
+    saturn_a = _get_testbed_messages(db_path, experiment, 'Saturn_A')
+    saturn_b = _get_testbed_messages(db_path, experiment, 'Saturn_B')
+
+    minecraft_data = {}
+    if training is not None:
+        minecraft_data['hands_on_training'] = training
+
+    if saturn_a is not None:
+        minecraft_data['saturn_a'] = saturn_a
+
+    if saturn_b is not None:
+        minecraft_data['saturn_b'] = saturn_b
+
+    return minecraft_data
diff --git a/human_experiments/synchronize_signal_task/read_data/tasks/ping_pong_competitive.py b/human_experiments/synchronize_signal_task/read_data/tasks/ping_pong_competitive.py
new file mode 100644
index 000000000..5a789de24
--- /dev/null
+++ b/human_experiments/synchronize_signal_task/read_data/tasks/ping_pong_competitive.py
@@ -0,0 +1,43 @@
+import sqlite3
+
+import pandas as pd
+
+
+def ping_pong_competitive(db_path: str, experiment: str) -> dict[tuple[str, str], pd.DataFrame] | None:
+    db = sqlite3.connect(db_path)
+
+    query = """
+        SELECT *
+        FROM ping_pong_competitive_task_observation
+        WHERE group_session = ?;
+    """
+    ping_pong_competitive_df = pd.read_sql_query(query, db, params=[experiment])
+
+    if ping_pong_competitive_df.empty:
+        return None
+
+    ping_pong_competitive_df = ping_pong_competitive_df.drop(columns=['group_session',
+                                                                      'player_1_id',
+                                                                      'player_2_id',
+                                                                      'timestamp_iso8601'])
+
+    # Rearrange the columns so that "timestamp_unix" is first
+    cols = ping_pong_competitive_df.columns.tolist()
+    cols = ['timestamp_unix'] + [col for col in cols if col != 'timestamp_unix']
+    ping_pong_competitive_df = ping_pong_competitive_df[cols]
+
+    # Separate the DataFrame into multiple DataFrames based on the unique values of "player_1_station"
+    unique_stations = ping_pong_competitive_df['player_1_station'].unique()
+    match_dfs = [ping_pong_competitive_df[ping_pong_competitive_df['player_1_station'] == station]
+                 for station in unique_stations]
+
+    ids_matches = {}
+    for match_df in match_dfs:
+        player_1_station = match_df['player_1_station'].unique()[0]
+        player_2_station = match_df['player_2_station'].unique()[0]
+        match_df = match_df.copy()
+        match_df["timestamp_unix"] = match_df["timestamp_unix"].astype(float)
+        match_df = match_df.reset_index(drop=True)
+        ids_matches[(player_1_station, player_2_station)] = match_df
+
+    return ids_matches
diff --git a/human_experiments/synchronize_signal_task/read_data/tasks/ping_pong_cooperative.py b/human_experiments/synchronize_signal_task/read_data/tasks/ping_pong_cooperative.py
new file mode 100644
index 000000000..018811928
--- /dev/null
+++ b/human_experiments/synchronize_signal_task/read_data/tasks/ping_pong_cooperative.py
@@ -0,0 +1,59 @@
+import sqlite3
+
+import numpy as np
+import pandas as pd
+
+from common import get_station
+
+
+def ping_pong_cooperative(db_path: str, experiment: str) -> pd.DataFrame | None:
+    db = sqlite3.connect(db_path)
+
+    query = """
+        SELECT *
+        FROM ping_pong_cooperative_task_observation
+        WHERE group_session = ?;
+    """
+    ping_pong_cooperative_df = pd.read_sql_query(query, db, params=[experiment])
+
+    if ping_pong_cooperative_df.empty:
+        return None
+
+    # Map participant id to station
+    player_1_id = ping_pong_cooperative_df['player_1_id'].unique()[0]
+    player_2_id = ping_pong_cooperative_df['player_2_id'].unique()[0]
+    player_3_id = ping_pong_cooperative_df['player_3_id'].unique()[0]
+
+    player_1_station = get_station(db_path, experiment, player_1_id, "ping_pong_cooperative")
+    player_2_station = get_station(db_path, experiment, player_2_id, "ping_pong_cooperative")
+    player_3_station = get_station(db_path, experiment, player_3_id, "ping_pong_cooperative")
+
+    id_station_map = {
+        player_1_id: player_1_station,
+        player_2_id: player_2_station,
+        player_3_id: player_3_station
+    }
+
+    # Create a new column with the mapped stations
+    def map_station(participant):
+        return id_station_map.get(participant, np.nan)
+
+    ping_pong_cooperative_df['player_1_station'] = ping_pong_cooperative_df['player_1_id'].apply(map_station)
+    ping_pong_cooperative_df['player_2_station'] = ping_pong_cooperative_df['player_2_id'].apply(map_station)
+    ping_pong_cooperative_df['player_3_station'] = ping_pong_cooperative_df['player_3_id'].apply(map_station)
+
+    ping_pong_cooperative_df = ping_pong_cooperative_df.drop(columns=['group_session',
+                                                                      'player_1_id',
+                                                                      'player_2_id',
+                                                                      'player_3_id',
+                                                                      'timestamp_iso8601'])
+
+    # Move columns to front
+    cols_to_move = ['timestamp_unix', 'player_1_station', 'player_2_station', 'player_3_station']
+    other_cols = [col for col in ping_pong_cooperative_df.columns if col not in cols_to_move]
+    new_order = cols_to_move + other_cols
+    ping_pong_cooperative_df = ping_pong_cooperative_df[new_order]
+
+    ping_pong_cooperative_df["timestamp_unix"] = ping_pong_cooperative_df["timestamp_unix"].astype(float)
+
+    return ping_pong_cooperative_df
diff --git a/human_experiments/synchronize_signal_task/read_data/tasks/rest_state.py b/human_experiments/synchronize_signal_task/read_data/tasks/rest_state.py
new file mode 100644
index 000000000..48e5d9930
--- /dev/null
+++ b/human_experiments/synchronize_signal_task/read_data/tasks/rest_state.py
@@ -0,0 +1,32 @@
+import sqlite3
+
+import pandas as pd
+
+
+def rest_state(db_path: str, experiment: str) -> pd.DataFrame | None:
+    db = sqlite3.connect(db_path)
+
+    query = """
+        SELECT *
+        FROM rest_state_task
+        WHERE group_session = ?;
+    """
+    rest_state_df = pd.read_sql_query(query, db, params=[experiment])
+
+    if rest_state_df.empty:
+        return None
+
+    rest_state_df = rest_state_df.drop(columns=['group_session',
+                                                'start_timestamp_iso8601',
+                                                'stop_timestamp_iso8601'])
+
+    rest_state_df = rest_state_df.reset_index(drop=True)
+
+    # Creating a new DataFrame with two rows based on the existing DataFrame
+    new_rest_state_df = pd.DataFrame({
+        'timestamp_unix': [float(rest_state_df.loc[0, 'start_timestamp_unix']),
+                           float(rest_state_df.loc[0, 'stop_timestamp_unix'])],
+        'event_type': ['start_task', 'end_task']
+    })
+
+    return new_rest_state_df
diff --git a/human_experiments/synchronize_signal_task/requirements.txt b/human_experiments/synchronize_signal_task/requirements.txt
new file mode 100644
index 000000000..ab0d53aca
--- /dev/null
+++ b/human_experiments/synchronize_signal_task/requirements.txt
@@ -0,0 +1,8 @@
+numpy
+scipy
+pandas
+scikit-learn
+mne
+tqdm
+python-dotenv
+python-dateutil
diff --git a/human_experiments/synchronize_signal_task/signal_synchronization/__init__.py b/human_experiments/synchronize_signal_task/signal_synchronization/__init__.py
new file mode 100644
index 000000000..45542ac3a
--- /dev/null
+++ b/human_experiments/synchronize_signal_task/signal_synchronization/__init__.py
@@ -0,0 +1,8 @@
+from .prepare_synchronization_data import prepare_synchronization_data
+from .synchronize_signals import synchronize_signals, synchronize_signals_all
+
+__all__ = [
+    "prepare_synchronization_data",
+    "synchronize_signals",
+    "synchronize_signals_all"
+]
diff --git a/human_experiments/synchronize_signal_task/signal_synchronization/interpolation/__init__.py b/human_experiments/synchronize_signal_task/signal_synchronization/interpolation/__init__.py
new file mode 100644
index 000000000..ce19f5224
--- /dev/null
+++ b/human_experiments/synchronize_signal_task/signal_synchronization/interpolation/__init__.py
@@ -0,0 +1,5 @@
+from .linear_interpolation import linear_interpolation
+
+__all__ = [
+    'linear_interpolation',
+]
diff --git a/human_experiments/synchronize_signal_task/signal_synchronization/interpolation/linear_interpolation.py b/human_experiments/synchronize_signal_task/signal_synchronization/interpolation/linear_interpolation.py
new file mode 100644
index 000000000..a1f68a445
--- /dev/null
+++ b/human_experiments/synchronize_signal_task/signal_synchronization/interpolation/linear_interpolation.py
@@ -0,0 +1,36 @@
+import numpy as np
+import pandas as pd
+
+from ..utils import generate_time_series_end_time
+
+
+def linear_interpolation(df: pd.DataFrame,
+                         frequency: float,
+                         start_time: float | None = None) -> pd.DataFrame:
+    """
+    Interpolate DataFrame to given frequency
+    :param df: signal DataFrame
+    :param frequency: sampling frequency in Hz
+    :param start_time: start time in seconds
+    :return: interpolated DataFrame
+    """
+    # Check timestamp_unix column sorted
+    assert df['timestamp_unix'].is_monotonic_increasing, "timestamp_unix column must be sorted"
+
+    # Generate new time series
+    if start_time is None:
+        start_time = df['timestamp_unix'].min()
+    else:
+        assert start_time >= df['timestamp_unix'].min(), "start_time must be >= df['timestamp_unix'].min()"
+        assert start_time <= df['timestamp_unix'].max(), "start_time must be <= df['timestamp_unix'].max()"
+
+    end_time = df['timestamp_unix'].max()
+    interp_time_series = generate_time_series_end_time(start_time, end_time, frequency)
+
+    # Interpolate signals
+    df_new = df.drop(columns=["timestamp_unix"]).apply(
+        lambda col: np.interp(interp_time_series, df['timestamp_unix'], col)
+    )
+    df_new['timestamp_unix'] = interp_time_series
+
+    return df_new
diff --git a/human_experiments/synchronize_signal_task/signal_synchronization/prepare_synchronization_data.py b/human_experiments/synchronize_signal_task/signal_synchronization/prepare_synchronization_data.py
new file mode 100644
index 000000000..7392ffb56
--- /dev/null
+++ b/human_experiments/synchronize_signal_task/signal_synchronization/prepare_synchronization_data.py
@@ -0,0 +1,43 @@
+def _common_experiment_names(signal_type_info: list[dict[str, any]]):
+    experiment_name_sets = []
+
+    for signal_info in signal_type_info:
+        names = {experiment["experiment_name"] for experiment in signal_info["experiment_signals"]}
+        experiment_name_sets.append(names)
+
+    common_names = set.intersection(*experiment_name_sets)
+
+    return sorted(list(common_names))
+
+
+def prepare_synchronization_data(signal_type_info: list[dict[str, any]], desired_freq: int) -> list[dict[str, any]]:
+    experiments_to_process = _common_experiment_names(signal_type_info)
+
+    sync_experiments_info = []
+    for experiment_to_process in experiments_to_process:
+        sync_experiment_info = {
+            "experiment_name": experiment_to_process,
+            "desired_freq": desired_freq,
+        }
+
+        sync_signals_info = []
+        for signal_info in signal_type_info:
+            sync_signal_info = {
+                "signal_type": signal_info["signal_type"],
+                "frequency": signal_info["recording_frequency"]
+            }
+
+            for experiment_signal in signal_info["experiment_signals"]:
+                if experiment_signal["experiment_name"] == experiment_to_process:
+                    for station in ["lion", "tiger", "leopard"]:
+                        if station in experiment_signal:
+                            if len(experiment_signal[station]) > 0:
+                                sync_signal_info[station] = experiment_signal[station]
+
+            sync_signals_info.append(sync_signal_info)
+
+        sync_experiment_info["signals"] = sync_signals_info
+
+        sync_experiments_info.append(sync_experiment_info)
+
+    return sync_experiments_info
diff --git a/human_experiments/synchronize_signal_task/signal_synchronization/resample/__init__.py b/human_experiments/synchronize_signal_task/signal_synchronization/resample/__init__.py
new file mode 100644
index 000000000..e6abd5b40
--- /dev/null
+++ b/human_experiments/synchronize_signal_task/signal_synchronization/resample/__init__.py
@@ -0,0 +1,5 @@
+from .mne_resample import mne_resample
+
+__all__ = [
+    "mne_resample",
+]
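A quick sanity check of `linear_interpolation` above on a toy, irregularly sampled two-column frame (values invented):

```python
import pandas as pd
from signal_synchronization.interpolation import linear_interpolation

# Irregularly sampled toy signal, interpolated onto a regular 10 Hz grid.
df = pd.DataFrame({
    "timestamp_unix": [0.00, 0.13, 0.21, 0.34, 0.50],
    "ch1": [1.0, 2.0, 3.0, 4.0, 5.0],
})
out = linear_interpolation(df, frequency=10.0)
# out["timestamp_unix"] == [0.0, 0.1, 0.2, 0.3, 0.4]; ch1 is linearly interpolated
# at those instants via np.interp.
```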
""" + sample_scale = new_frequency / src_frequency + + # Interpolate signals + signal_resampled_df = signal_df.apply( + lambda col: resample(col.to_numpy(), sample_scale, npad='auto') + ) + + return signal_resampled_df diff --git a/human_experiments/synchronize_signal_task/signal_synchronization/synchronize_signals.py b/human_experiments/synchronize_signal_task/signal_synchronization/synchronize_signals.py new file mode 100644 index 000000000..a8df7567d --- /dev/null +++ b/human_experiments/synchronize_signal_task/signal_synchronization/synchronize_signals.py @@ -0,0 +1,93 @@ +import math +from multiprocessing import Pool + +import numpy as np +import pandas as pd +from tqdm import tqdm + +from .interpolation import linear_interpolation +from .resample import mne_resample +from .utils import get_shared_start_time, generate_time_series_num_samples + + +def synchronize_signals(experiment: dict[str, any]) -> dict[str, any]: + desired_frequency = experiment["desired_freq"] + experiment_name = experiment["experiment_name"] + signals = experiment["signals"] + + signals_df = [] + for signal in signals: + for station in ["lion", "tiger", "leopard"]: + if station in signal: + signals_df.append(signal[station]) + start_time = float(math.ceil(get_shared_start_time(signals_df))) + + processed_signals = {} + for signal in signals: + signal_frequency = signal["frequency"] + for station in ["lion", "tiger", "leopard"]: + if station in signal: + # Resample signal + resampled_signal = mne_resample( + signal[station].drop(columns=["timestamp_unix"]), + signal_frequency, desired_frequency + ) + + # Assign timestamp for interpolation + resampled_signal["timestamp_unix"] = generate_time_series_num_samples( + start_time, len(resampled_signal), desired_frequency + ) + + # Interpolate signal to synchronize with other signals + interpolated_signal = linear_interpolation( + resampled_signal, desired_frequency, start_time + ) + + # Drop current timestamp to use the unified timestamps later + interpolated_signal = interpolated_signal.drop(columns=["timestamp_unix"]) + + signal_type = signal["signal_type"] + processed_signals[f"{station}_{signal_type}"] = interpolated_signal + + # Align signals + max_length = max(len(df) for df in processed_signals.values()) + + ready_for_synchronization_signals = [] + for name, processed_signal in processed_signals.items(): + # Reindex the dataframes to the maximum length to pad with NaNs at the end + processed_signal = processed_signal.reindex(np.arange(max_length)) + # Add prefix to each column name in the dataframes + processed_signal = processed_signal.add_prefix(f"{name}_") + + ready_for_synchronization_signals.append(processed_signal) + + # Concatenate the dataframes column-wise + synchronized_signal = pd.concat(ready_for_synchronization_signals, axis=1) + + time_series_frequency = desired_frequency + + # Assign timestamp + synchronized_signal["timestamp_unix"] = generate_time_series_num_samples( + start_time, + len(synchronized_signal), + time_series_frequency + ) + + # Move timestamp_unix column to the first column + columns = ['timestamp_unix'] + [col for col in synchronized_signal.columns if col != 'timestamp_unix'] + synchronized_signal = synchronized_signal[columns] + + synchronization_results = { + "experiment_name": experiment_name, + "signals": synchronized_signal + } + + return synchronization_results + + +def synchronize_signals_all(experiments: list[dict[str, any]], + num_processes: int = 1) -> list[dict[str, any]]: + with Pool(processes=num_processes) as pool: + 
diff --git a/human_experiments/synchronize_signal_task/signal_synchronization/utils/__init__.py b/human_experiments/synchronize_signal_task/signal_synchronization/utils/__init__.py
new file mode 100644
index 000000000..1203eb663
--- /dev/null
+++ b/human_experiments/synchronize_signal_task/signal_synchronization/utils/__init__.py
@@ -0,0 +1,8 @@
+from .generate_time_series import generate_time_series_end_time, generate_time_series_num_samples
+from .get_shared_start_time import get_shared_start_time
+
+__all__ = [
+    "generate_time_series_end_time",
+    "generate_time_series_num_samples",
+    "get_shared_start_time",
+]
diff --git a/human_experiments/synchronize_signal_task/signal_synchronization/utils/generate_time_series.py b/human_experiments/synchronize_signal_task/signal_synchronization/utils/generate_time_series.py
new file mode 100644
index 000000000..4c8b60f2b
--- /dev/null
+++ b/human_experiments/synchronize_signal_task/signal_synchronization/utils/generate_time_series.py
@@ -0,0 +1,28 @@
+import numpy as np
+
+
+def generate_time_series_end_time(start_time: float,
+                                  end_time: float,
+                                  frequency: float) -> np.ndarray:
+    """
+    Generate time series with given start and end time
+    :param start_time: start time in seconds
+    :param end_time: end time in seconds
+    :param frequency: sampling frequency in Hz
+    :return: time series, excluding any time after the end_time
+    """
+    num_samples = int((end_time - start_time) * frequency)
+    return generate_time_series_num_samples(start_time, num_samples, frequency)
+
+
+def generate_time_series_num_samples(start_time: float,
+                                     num_samples: int,
+                                     frequency: float) -> np.ndarray:
+    """
+    Generate time series with given start time and number of samples
+    :param start_time: start time in seconds
+    :param num_samples: number of samples
+    :param frequency: sampling frequency in Hz
+    :return: time series
+    """
+    return start_time + np.arange(num_samples, dtype=np.int64) / frequency
diff --git a/human_experiments/synchronize_signal_task/signal_synchronization/utils/get_shared_start_time.py b/human_experiments/synchronize_signal_task/signal_synchronization/utils/get_shared_start_time.py
new file mode 100644
index 000000000..3e710c364
--- /dev/null
+++ b/human_experiments/synchronize_signal_task/signal_synchronization/utils/get_shared_start_time.py
@@ -0,0 +1,16 @@
+import pandas as pd
+
+from .is_time_overlapping import is_time_overlapping
+
+
+def get_shared_start_time(dfs: list[pd.DataFrame]) -> float:
+    """
+    Get the shared start time from a list of DataFrames
+    :param dfs: list of DataFrames
+    :return: shared start time (the latest start among the DataFrames)
+    """
+    assert len(dfs) > 0, "List of DataFrames is empty"
+    assert is_time_overlapping(dfs), "DataFrames are not overlapping"
+
+    start_time = max(df['timestamp_unix'].min() for df in dfs)
+    return start_time
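The time-series helpers above are plain start + n/f arithmetic. A worked example (not patch code), matching `generate_time_series_num_samples`:

```python
import numpy as np

# generate_time_series_num_samples(100.0, 5, 10.0) computes:
start_time, num_samples, frequency = 100.0, 5, 10.0
times = start_time + np.arange(num_samples, dtype=np.int64) / frequency
# -> array([100. , 100.1, 100.2, 100.3, 100.4]); five samples spaced 1/10 s apart.
```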
diff --git a/human_experiments/synchronize_signal_task/signal_synchronization/utils/is_time_overlapping.py b/human_experiments/synchronize_signal_task/signal_synchronization/utils/is_time_overlapping.py
new file mode 100644
index 000000000..7879f0fc1
--- /dev/null
+++ b/human_experiments/synchronize_signal_task/signal_synchronization/utils/is_time_overlapping.py
@@ -0,0 +1,17 @@
+import pandas as pd
+
+
+def is_time_overlapping(dfs: list[pd.DataFrame]) -> bool:
+    """
+    Check if all DataFrames have overlapping time ranges among each other
+    :param dfs: list of DataFrames
+    :return: True if all DataFrames have overlapping time ranges among each other, False otherwise
+    """
+    # Ensure all DataFrames are not empty
+    assert all(len(df) > 0 for df in dfs), "All DataFrames must be non-empty"
+
+    # Ensure all DataFrames have timestamp_unix column
+    assert all('timestamp_unix' in df.columns for df in dfs), "All DataFrames must have timestamp_unix column"
+
+    # Time ranges mutually overlap iff the latest start is no later than the earliest end
+    return max(df['timestamp_unix'].min() for df in dfs) \
+        <= min(df['timestamp_unix'].max() for df in dfs)
diff --git a/human_experiments/synchronize_signal_task/task_synchronization/__init__.py b/human_experiments/synchronize_signal_task/task_synchronization/__init__.py
new file mode 100644
index 000000000..6ff19d927
--- /dev/null
+++ b/human_experiments/synchronize_signal_task/task_synchronization/__init__.py
@@ -0,0 +1,8 @@
+from .prepare_task_synchronization_data import prepare_task_synchronization_data
+from .synchronize_task_signal import synchronize_task_signal, synchronize_task_signal_all
+
+__all__ = [
+    "prepare_task_synchronization_data",
+    "synchronize_task_signal",
+    "synchronize_task_signal_all",
+]
diff --git a/human_experiments/synchronize_signal_task/task_synchronization/prepare_task_synchronization_data.py b/human_experiments/synchronize_signal_task/task_synchronization/prepare_task_synchronization_data.py
new file mode 100644
index 000000000..21a760118
--- /dev/null
+++ b/human_experiments/synchronize_signal_task/task_synchronization/prepare_task_synchronization_data.py
@@ -0,0 +1,31 @@
+from read_data import read_task_db_all
+
+
+def prepare_task_synchronization_data(synchronized_signals: list[dict[str, any]],
+                                      db_path: str,
+                                      num_processes: int = 1) -> list[dict[str, any]]:
+    # Get experiment sessions
+    experiments = []
+    for experiment in synchronized_signals:
+        experiments.append(experiment['experiment_name'])
+
+    # Get task data
+    task_data = read_task_db_all(db_path, experiments, num_processes)
+
+    pairing = []
+    for signal_dict in synchronized_signals:
+        experiment_name = signal_dict['experiment_name']
+
+        for task_dict in task_data:
+            if task_dict['experiment_name'] == experiment_name:
+                merged_dict = {
+                    'experiment_name': experiment_name,
+                    'signals': signal_dict["signals"],
+                    'tasks': task_dict["task_data"]
+                }
+                pairing.append(merged_dict)
+                break
+
+    pairing.sort(key=lambda x: x['experiment_name'])
+
+    return pairing
diff --git a/human_experiments/synchronize_signal_task/task_synchronization/synchronize_task_signal.py b/human_experiments/synchronize_signal_task/task_synchronization/synchronize_task_signal.py
new file mode 100644
index 000000000..ea9e97d63
--- /dev/null
+++ b/human_experiments/synchronize_signal_task/task_synchronization/synchronize_task_signal.py
@@ -0,0 +1,89 @@
+from multiprocessing import Pool
+
+import pandas as pd
+from tqdm import tqdm
+
+from .utils import (
+    group_signal_for_task_event,
+    group_signal_for_task_status,
+    synchronize_task_status_signal,
+    synchronize_affective_team_task_event,
+    synchronize_task_event_signal
+)
+
+
+def _filter_stations(df: pd.DataFrame, station_names: list[str]) -> pd.DataFrame:
+    # Always include the 'timestamp_unix' column
+    columns_to_include = ['timestamp_unix']
+
+    # Check for other columns that include one of the station names
+    for column_name in df.columns:
+        if any(station in column_name for station in station_names):
+            columns_to_include.append(column_name)
+
+    # Return a DataFrame with only the selected columns
+    return df[columns_to_include]
+
+
+def synchronize_task_signal(experiment: dict[str, any]) -> dict[str, any]:
+    signal_df = experiment["signals"]
+    tasks = experiment["tasks"]
+
+    results = {
+        "experiment_name": experiment["experiment_name"]
+    }
+
+    for task in tasks:
+        task_name = task["task_name"]
+        task_data = task["task_data"]
+
+        match task_name:
+            case "rest_state":
+                grouped_signal_df = group_signal_for_task_event(
+                    signal_df, task_data, check_event_assignments=False
+                )
+                synchronized_df = synchronize_task_event_signal(grouped_signal_df, task_data)
+                results[task_name] = synchronized_df
+            case "affective_team":
+                grouped_signal_df = group_signal_for_task_event(
+                    signal_df, task_data, check_event_assignments=False
+                )
+                # synchronized_df = synchronize_affective_team_task_event(grouped_signal_df, task_data)
+                synchronized_df = synchronize_task_event_signal(grouped_signal_df, task_data)
+                results[task_name] = synchronized_df
+            case "finger_tapping" | "ping_pong_cooperative":
+                grouped_signal_df = group_signal_for_task_status(signal_df, task_data)
+                synchronized_df = synchronize_task_status_signal(grouped_signal_df, task_data)
+                results[task_name] = synchronized_df
+            case "affective_individual":
+                for station, task_df in task_data.items():
+                    station_signal_df = _filter_stations(signal_df, [station])
+                    grouped_station_signal_df = group_signal_for_task_event(
+                        station_signal_df, task_df, check_event_assignments=False
+                    )
+                    synchronized_df = synchronize_task_event_signal(grouped_station_signal_df, task_df)
+                    results[f"{task_name}_{station}"] = synchronized_df
+            case "ping_pong_competitive":
+                for stations, task_df in task_data.items():
+                    player_1_station, player_2_station = stations
+                    stations_signal_df = _filter_stations(signal_df, [player_1_station, player_2_station])
+                    grouped_stations_signal_df = group_signal_for_task_status(stations_signal_df, task_df)
+                    synchronized_df = synchronize_task_status_signal(grouped_stations_signal_df, task_df)
+                    results[f"{task_name}_{player_1_station}_{player_2_station}"] = synchronized_df
+            case "minecraft":
+                for mission_name, mission_df in task_data.items():
+                    grouped_signal_df = group_signal_for_task_status(signal_df, mission_df)
+                    synchronized_df = synchronize_task_status_signal(grouped_signal_df, mission_df)
+                    results[f"{task_name}_{mission_name}"] = synchronized_df
+            case _:
+                raise ValueError(f"Unknown task name: {task_name}")
+
+    return results
+
+
+def synchronize_task_signal_all(experiments: list[dict[str, any]],
+                                num_processes: int = 1) -> list[dict[str, any]]:
+    with Pool(processes=num_processes) as pool:
+        synchronized_list = list(tqdm(pool.imap(synchronize_task_signal, experiments), total=len(experiments)))
+
+    return synchronized_list
diff --git a/human_experiments/synchronize_signal_task/task_synchronization/utils/__init__.py b/human_experiments/synchronize_signal_task/task_synchronization/utils/__init__.py
new file mode 100644
index 000000000..54f57aefc
--- /dev/null
+++ b/human_experiments/synchronize_signal_task/task_synchronization/utils/__init__.py
@@ -0,0 +1,13 @@
+from .group_signal_for_task_events import group_signal_for_task_event
+from .group_signal_for_task_status import group_signal_for_task_status
+from .synchronize_affective_team_task_event import synchronize_affective_team_task_event
+from .synchronize_task_event_signal import synchronize_task_event_signal
+from .synchronize_task_status_signal import synchronize_task_status_signal
+
+__all__ = [
+    "group_signal_for_task_event",
+    "group_signal_for_task_status",
+    "synchronize_task_status_signal",
+    "synchronize_task_event_signal",
+    "synchronize_affective_team_task_event",
+]
"synchronize_task_event_signal", + "synchronize_affective_team_task_event", +] diff --git a/human_experiments/synchronize_signal_task/task_synchronization/utils/group_signal_for_task_events.py b/human_experiments/synchronize_signal_task/task_synchronization/utils/group_signal_for_task_events.py new file mode 100644 index 000000000..64a50761e --- /dev/null +++ b/human_experiments/synchronize_signal_task/task_synchronization/utils/group_signal_for_task_events.py @@ -0,0 +1,78 @@ +import bisect + +import numpy as np +import pandas as pd + + +def _check_event_signal_assignments(signal_times, event_times): + # Create a list to track assignments + assignments = [False] * len(signal_times) + + for event_time in event_times: + # Find the position where this event time would be inserted in sorted_signals + position = bisect.bisect(signal_times, event_time) + + # Make sure the event time doesn't fall after all signal times + if position == len(signal_times): + position -= 1 + + # Make sure the event time doesn't fall before all signal times + if position > 0 and abs(signal_times[position - 1] - event_time) <= abs( + signal_times[position] - event_time): + position -= 1 + + # Assert that the signal at this position hasn't been assigned yet + assert not assignments[position], \ + f"Signal at position {position} and time {signal_times[position]} already assigned" + + # Assign this signal + assignments[position] = True + + +def group_signal_for_task_event(signal_df: pd.DataFrame, + task_df: pd.DataFrame, + check_event_assignments: bool = True) -> pd.DataFrame: + """ + Group signals relevant to a task + :param signal_df: signal dataframe + :param task_df: task dataframe + :param check_event_assignments: whether to check if all events have been assigned to a signal + :return: signal dataframe with signals relevant to the task + """ + # Check time column sorted + assert task_df["timestamp_unix"].is_monotonic_increasing, "timestamp_unix column must be sorted" + assert signal_df["timestamp_unix"].is_monotonic_increasing, "timestamp_unix column must be sorted" + + # Get task start and end time + start_time = task_df["timestamp_unix"].min() + end_time = task_df["timestamp_unix"].max() + + # Get the index of the element closest to start_time + start_index = bisect.bisect_left(signal_df["timestamp_unix"].values, start_time) + # Check if the index is not at the end of the array and if the next value is closer + if start_index != len(signal_df) and \ + np.abs(signal_df.iloc[start_index]["timestamp_unix"] - start_time) > \ + np.abs(signal_df.iloc[start_index + 1]["timestamp_unix"] - start_time): + start_index += 1 + + # Get the index of the element closest to end_time + end_index = bisect.bisect_right(signal_df["timestamp_unix"].values, end_time) + # Check if the index is not at the start of the array and if the previous value is closer + if end_index != 0 and end_index < len(signal_df) and \ + np.abs(signal_df.iloc[end_index - 1]["timestamp_unix"] - end_time) <= \ + np.abs(signal_df.iloc[end_index]["timestamp_unix"] - end_time): + end_index -= 1 + + # Filter signal_df + signal_df = signal_df.iloc[start_index:end_index + 1] + + # Check that all event times are assigned to a signal + if check_event_assignments: + signal_times = signal_df["timestamp_unix"].values + event_times = task_df["timestamp_unix"].values + _check_event_signal_assignments(signal_times, event_times) + + # Drop the columns with any NaN value + signal_df = signal_df.dropna(axis=1, how='any') + + return signal_df diff --git 
diff --git a/human_experiments/synchronize_signal_task/task_synchronization/utils/group_signal_for_task_status.py b/human_experiments/synchronize_signal_task/task_synchronization/utils/group_signal_for_task_status.py
new file mode 100644
index 000000000..b25337c86
--- /dev/null
+++ b/human_experiments/synchronize_signal_task/task_synchronization/utils/group_signal_for_task_status.py
@@ -0,0 +1,34 @@
+import pandas as pd
+
+
+def group_signal_for_task_status(signal_df: pd.DataFrame, task_df: pd.DataFrame) -> pd.DataFrame:
+    """
+    Group signals relevant to a task
+    :param signal_df: signal dataframe
+    :param task_df: task dataframe
+    :return: signal dataframe with signals relevant to the task
+    """
+    # Check time column sorted
+    assert task_df["timestamp_unix"].is_monotonic_increasing, "timestamp_unix column must be sorted"
+    assert signal_df["timestamp_unix"].is_monotonic_increasing, "timestamp_unix column must be sorted"
+
+    start_time = task_df["timestamp_unix"].min()
+    end_time = task_df["timestamp_unix"].max()
+
+    # Get the index of the element just before start_time
+    start_index = signal_df["timestamp_unix"].searchsorted(start_time, side='left') - 1
+
+    # Get the index of the element just after end_time
+    end_index = signal_df["timestamp_unix"].searchsorted(end_time, side='right')
+
+    # Check and adjust if indices go out of bounds
+    start_index = max(start_index, 0)
+    end_index = min(end_index, len(signal_df) - 1)
+
+    # Filter signal_df
+    signal_df = signal_df.iloc[start_index:end_index + 1]
+
+    # Drop the columns with any NaN value
+    signal_df = signal_df.dropna(axis=1, how='any')
+
+    return signal_df
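A toy run of group_signal_for_task_status (frames and import path are assumed): the slice is widened by one sample on each side of the task interval, so boundary samples just outside it are kept.

# Toy illustration, not part of the patch; assumes the package root is on PYTHONPATH.
import pandas as pd

from task_synchronization.utils import group_signal_for_task_status

signal_df = pd.DataFrame({"timestamp_unix": [0.0, 1.0, 2.0, 3.0, 4.0, 5.0],
                          "ch": [10, 11, 12, 13, 14, 15]})
task_df = pd.DataFrame({"timestamp_unix": [1.5, 3.5], "score": [0, 1]})

grouped = group_signal_for_task_status(signal_df, task_df)
print(grouped["timestamp_unix"].tolist())  # [1.0, 2.0, 3.0, 4.0]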
diff --git a/human_experiments/synchronize_signal_task/task_synchronization/utils/synchronize_affective_team_task_event.py b/human_experiments/synchronize_signal_task/task_synchronization/utils/synchronize_affective_team_task_event.py
new file mode 100644
index 000000000..691f828e6
--- /dev/null
+++ b/human_experiments/synchronize_signal_task/task_synchronization/utils/synchronize_affective_team_task_event.py
@@ -0,0 +1,72 @@
+from bisect import bisect_right
+
+import numpy as np
+import pandas as pd
+
+
+def synchronize_affective_team_task_event(signal_df: pd.DataFrame,
+                                          task_df: pd.DataFrame) -> tuple[pd.DataFrame, str]:
+    """
+    Synchronize signal data with task event data
+    :param signal_df: signal dataframe
+    :param task_df: task dataframe
+    :return: signal dataframe with task data synchronized, and any warning messages
+    """
+    # Check time column sorted
+    assert task_df["timestamp_unix"].is_monotonic_increasing, "timestamp_unix column must be sorted"
+    assert signal_df["timestamp_unix"].is_monotonic_increasing, "timestamp_unix column must be sorted"
+
+    output_message = ""
+
+    # Get the 'timestamp_unix' values as a list for binary search
+    unix_times = signal_df['timestamp_unix'].tolist()
+
+    # Rename the task time column so it does not clobber the signal's own timestamp_unix below
+    task_df = task_df.rename(columns={'timestamp_unix': 'task_timestamp_unix'})
+
+    # Initialize new columns in the signal data and set as NaN
+    signal_df[task_df.columns] = np.nan
+
+    for _, row in task_df.iterrows():
+        # Find the index of the closest signal data entry which is before the current task data
+        idx = bisect_right(unix_times, row['task_timestamp_unix']) - 1
+
+        # If there's a next timestamp, and it's closer to the current timestamp
+        if idx + 1 < len(unix_times) and \
+                abs(unix_times[idx + 1] - row['task_timestamp_unix']) < \
+                abs(unix_times[idx] - row['task_timestamp_unix']):
+            idx += 1  # assign to the next timestamp
+
+        # Assign the task data to this signal data entry
+        if idx >= 0:
+            for task_data_column in task_df.columns:
+                # If the _event_type of this signal data entry is not NaN
+                # and the _event_type of this task data entry is not NaN,
+                # but they are different, then warn before overwriting
+                if task_data_column.endswith('_event_type') and \
+                        not pd.isna(signal_df.loc[signal_df.index[idx], task_data_column]) and \
+                        not pd.isna(row[task_data_column]):
+                    if not signal_df.loc[signal_df.index[idx], task_data_column] == \
+                            row[task_data_column]:
+                        output_message += \
+                            f"[WARNING] affective team: Signal data entry {signal_df.index[idx]} " \
+                            f"has different {task_data_column} " \
+                            f"from task data entry at {row['task_timestamp_unix']}, " \
+                            f"proceeding with the overwriting\n"
+
+                # Assign _event_type to this signal data entry
+                # if the _event_type of this task data entry is not NaN
+                if task_data_column.endswith('_event_type') and not pd.isna(row[task_data_column]):
+                    signal_df.loc[signal_df.index[idx], task_data_column] = row[task_data_column]
+
+                # If the _event_type is intermediate_selection or final_submission,
+                # then assign valence and arousal score
+                if task_data_column.endswith('_event_type') and \
+                        (row[task_data_column] == 'intermediate_selection' or
+                         row[task_data_column] == 'final_submission'):
+                    signal_df.loc[signal_df.index[idx], 'valence_score'] = row['valence_score']
+                    signal_df.loc[signal_df.index[idx], 'arousal_score'] = row['arousal_score']
+
+    signal_df = signal_df.drop(columns=['task_timestamp_unix'])
+
+    return signal_df, output_message
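A toy run of the affective-team variant as patched above (frames, station prefix, and import path are all assumed; note the dispatcher currently routes affective_team through synchronize_task_event_signal instead):

# Toy illustration, not part of the patch.
import pandas as pd

from task_synchronization.utils import synchronize_affective_team_task_event

signal_df = pd.DataFrame({"timestamp_unix": [0.0, 1.0, 2.0], "ch": [1, 2, 3]})
task_df = pd.DataFrame({
    "timestamp_unix": [0.9, 2.1],
    "lion_event_type": ["intermediate_selection", "final_submission"],  # hypothetical station prefix
    "valence_score": [3.0, 4.0],
    "arousal_score": [1.0, 2.0],
})

synchronized_df, warnings = synchronize_affective_team_task_event(signal_df, task_df)
print(synchronized_df[["timestamp_unix", "lion_event_type", "valence_score"]])
# The row at 1.0 gets 'intermediate_selection' (valence 3.0);
# the row at 2.0 gets 'final_submission' (valence 4.0).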
diff --git a/human_experiments/synchronize_signal_task/task_synchronization/utils/synchronize_task_event_signal.py b/human_experiments/synchronize_signal_task/task_synchronization/utils/synchronize_task_event_signal.py
new file mode 100644
index 000000000..7cdb353e1
--- /dev/null
+++ b/human_experiments/synchronize_signal_task/task_synchronization/utils/synchronize_task_event_signal.py
@@ -0,0 +1,51 @@
+from bisect import bisect_right
+
+import numpy as np
+import pandas as pd
+
+
+def synchronize_task_event_signal(signal_df: pd.DataFrame,
+                                  task_df: pd.DataFrame) -> pd.DataFrame:
+    """
+    Synchronize signal data with task event data
+    :param signal_df: signal dataframe
+    :param task_df: task dataframe
+    :return: signal dataframe with task data synchronized
+    """
+    # Check time column sorted
+    assert task_df["timestamp_unix"].is_monotonic_increasing, "timestamp_unix column must be sorted"
+    assert signal_df["timestamp_unix"].is_monotonic_increasing, "timestamp_unix column must be sorted"
+
+    # Get the 'timestamp_unix' values as a list for binary search
+    unix_times = signal_df['timestamp_unix'].tolist()
+
+    # Rename without inplace so the caller's task dataframe is left untouched
+    task_df = task_df.rename(columns={'timestamp_unix': 'task_timestamp_unix'})
+
+    # Initialize new columns in the signal data and set as NaN
+    signal_df[task_df.columns] = np.nan
+
+    for _, row in task_df.iterrows():
+        # Find the index of the closest signal data entry which is before the current task data
+        idx = bisect_right(unix_times, row['task_timestamp_unix']) - 1
+
+        # If there's a next timestamp, and it's closer to the current timestamp
+        if idx + 1 < len(unix_times) and \
+                abs(unix_times[idx + 1] - row['task_timestamp_unix']) < \
+                abs(unix_times[idx] - row['task_timestamp_unix']):
+            idx += 1  # assign to the next timestamp
+
+        if idx >= 0:
+            # Filter the columns with the "_event_type" suffix
+            row_with_suffix = signal_df.loc[signal_df.index[idx]].filter(like='_event_type')
+
+            # Skip this event if "final_submission" is already recorded in any "_event_type" column
+            if not row_with_suffix.empty and (row_with_suffix == "final_submission").any():
+                continue
+
+            # Assign the task data to this signal data entry
+            signal_df.loc[signal_df.index[idx], task_df.columns] = row
+
+    signal_df = signal_df.drop(columns=['task_timestamp_unix'])
+
+    return signal_df
diff --git a/human_experiments/synchronize_signal_task/task_synchronization/utils/synchronize_task_status_signal.py b/human_experiments/synchronize_signal_task/task_synchronization/utils/synchronize_task_status_signal.py
new file mode 100644
index 000000000..b624181c7
--- /dev/null
+++ b/human_experiments/synchronize_signal_task/task_synchronization/utils/synchronize_task_status_signal.py
@@ -0,0 +1,28 @@
+import pandas as pd
+
+
+def synchronize_task_status_signal(signal_df: pd.DataFrame,
+                                   task_df: pd.DataFrame) -> pd.DataFrame:
+    """
+    Synchronize task status with signals
+    :param signal_df: signal dataframe
+    :param task_df: task dataframe
+    :return: synchronized dataframe
+    """
+    # Check time column sorted
+    assert task_df["timestamp_unix"].is_monotonic_increasing, "timestamp_unix column must be sorted"
+    assert signal_df["timestamp_unix"].is_monotonic_increasing, "timestamp_unix column must be sorted"
+
+    # Rename without inplace so the caller's task dataframe is left untouched
+    task_df = task_df.rename(columns={'timestamp_unix': 'task_timestamp_unix'})
+
+    # Assigning the events to the closest signal entries
+    merged_df = pd.merge_asof(signal_df,
+                              task_df,
+                              left_on='timestamp_unix',
+                              right_on='task_timestamp_unix',
+                              direction='nearest')
+
+    merged_df = merged_df.drop(columns=['task_timestamp_unix'])
+
+    return merged_df
diff --git a/human_experiments/synchronize_signal_task/write_data/__init__.py b/human_experiments/synchronize_signal_task/write_data/__init__.py
new file mode 100644
index 000000000..a5b287136
--- /dev/null
+++ b/human_experiments/synchronize_signal_task/write_data/__init__.py
@@ -0,0 +1,6 @@
+from .write_csv import write_csv, write_csv_all
+
+__all__ = [
+    "write_csv",
+    "write_csv_all",
+]
diff --git a/human_experiments/synchronize_signal_task/write_data/write_csv.py b/human_experiments/synchronize_signal_task/write_data/write_csv.py
new file mode 100644
index 000000000..a145f8eb0
--- /dev/null
+++ b/human_experiments/synchronize_signal_task/write_data/write_csv.py
@@ -0,0 +1,29 @@
+import os
+from multiprocessing import Pool
+
+from tqdm import tqdm
+
+
+def write_csv(experiment: dict[str, any], dir_path: str):
+    experiment_name = experiment["experiment_name"]
+    output_dir_path = os.path.join(dir_path, experiment_name)
+    os.makedirs(output_dir_path, exist_ok=True)
+
+    for task, df in experiment.items():
+        if task == "experiment_name":
+            continue
+        df.to_csv(os.path.join(output_dir_path, f'{task}.csv'), index=False)
+
+
+def _write_experiment_csv(process_args: tuple[dict[str, any], str]):
+    write_csv(*process_args)
+
+
+def write_csv_all(experiments: list[dict[str, any]], dir_path: str, num_processes: int = 1):
+    os.makedirs(dir_path, exist_ok=True)
+
+    process_args = [(experiment, dir_path) for experiment in experiments]
+
+    with Pool(processes=num_processes) as pool:
+        for _ in tqdm(pool.imap(_write_experiment_csv, process_args), total=len(process_args)):
+            pass
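For reference, a standalone sketch (toy frames assumed) of the pandas merge_asof 'nearest' semantics that synchronize_task_status_signal above relies on: every signal row is kept and tagged with the task row whose timestamp is closest.

# Self-contained toy illustration of merge_asof with direction='nearest'.
import pandas as pd

signal_df = pd.DataFrame({"timestamp_unix": [0.0, 1.0, 2.0, 3.0]})
task_df = pd.DataFrame({"task_timestamp_unix": [0.9, 2.9], "score": [10, 20]})

merged_df = pd.merge_asof(signal_df, task_df,
                          left_on="timestamp_unix", right_on="task_timestamp_unix",
                          direction="nearest")
print(merged_df["score"].tolist())  # [10, 10, 20, 20]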