Synchronize signal task (#531)
* synchronize signals

* affective task

* affective task

* rest state and finger tapping

* ping pong competitive

* ping pong cooperative

* minecraft task, and pairing
eduongAZ authored Aug 20, 2023
1 parent 90d97e0 commit 1e64147
Showing 43 changed files with 1,394 additions and 0 deletions.
3 changes: 3 additions & 0 deletions human_experiments/synchronize_signal_task/README.md
@@ -0,0 +1,3 @@
# Synchronize signal task

This project is for synchronizing EEG and fNIRS signals with task data.
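
For orientation, the overall flow implemented by the two entry-point scripts added in this commit looks roughly like the sketch below. The function names are the ones introduced here; the directory paths, database path, and process count are placeholders, not real configuration.

# Rough sketch of the synchronization flow (mirrors process_fnirs_10hz.py below).
# Paths and the process count are illustrative placeholders.
from read_data import read_raw_csv_all
from signal_synchronization import prepare_synchronization_data, synchronize_signals_all
from task_synchronization import prepare_task_synchronization_data, synchronize_task_signal_all
from write_data import write_csv_all

signals = read_raw_csv_all("/path/to/fnirs_filtered", num_processes=4)
signal_type_info = [
    {"signal_type": "fnirs", "experiment_signals": signals, "recording_frequency": 10}
]
sync_info = prepare_synchronization_data(signal_type_info, 10)
synchronized_signals = synchronize_signals_all(sync_info)
task_info = prepare_task_synchronization_data(synchronized_signals, "/path/to/tomcat.db", 4)
synchronized_task_signals = synchronize_task_signal_all(task_info)
write_csv_all(synchronized_task_signals, "/path/to/output", 4)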
Empty file.
9 changes: 9 additions & 0 deletions human_experiments/synchronize_signal_task/common/__init__.py
@@ -0,0 +1,9 @@
from .remove_columns import remove_columns
from .remove_columns_all_exp import remove_columns_all_exp
from .get_station import get_station

__all__ = [
"remove_columns",
"remove_columns_all_exp",
"get_station",
]
13 changes: 13 additions & 0 deletions human_experiments/synchronize_signal_task/common/get_station.py
@@ -0,0 +1,13 @@
import sqlite3


def get_station(db_path: str, session: str, participant_id: int, task: str) -> str:
    db = sqlite3.connect(db_path)

    # Look up which station (lion, tiger, or leopard) this participant used for the task.
    # A parameterized query avoids manual quoting of the interpolated values.
    station = db.execute(
        """
        SELECT station
        FROM data_validity
        WHERE group_session = ? AND participant = ? AND task = ?
        """,
        (session, str(participant_id), task),
    ).fetchall()[0][0]

    db.close()

    return station
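
As a standalone illustration, get_station can be called directly to resolve a participant's station for a given session and task. The session below is one listed in config.EXPERIMENT_SESSIONS; the database path and participant id are hypothetical.

from common import get_station

# Illustrative call; the path and participant id (99) are made up.
station = get_station(
    "/space/someuser/tomcat/tomcat.db",
    "exp_2022_09_30_10",
    99,
    "affective_individual",
)
print(station)  # expected to be one of "lion", "tiger", or "leopard"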
8 changes: 8 additions & 0 deletions human_experiments/synchronize_signal_task/common/remove_columns.py
@@ -0,0 +1,8 @@
import pandas as pd


def remove_columns(df: pd.DataFrame, columns: list[str]) -> pd.DataFrame:
    # Only drop the requested columns that are actually present in the dataframe.
    columns_to_drop = [col for col in columns if col in df.columns]

    return df.drop(columns=columns_to_drop)
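
A tiny usage example of remove_columns (the dataframe contents are made up): requested columns that are absent, like 'participant' below, are simply ignored.

import pandas as pd

from common import remove_columns

# Toy dataframe; contents are illustrative only.
df = pd.DataFrame({"station": ["lion"], "task": ["rest_state"], "ch_1": [0.5]})
cleaned = remove_columns(df, ["station", "task", "participant"])
print(list(cleaned.columns))  # ['ch_1']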
10 changes: 10 additions & 0 deletions human_experiments/synchronize_signal_task/common/remove_columns_all_exp.py
@@ -0,0 +1,10 @@
from typing import Any

from .remove_columns import remove_columns


def remove_columns_all_exp(experiments: list[dict[str, Any]], columns: list[str]) -> None:
    # Drop the given columns, in place, from every station dataframe in every experiment.
    for experiment in experiments:
        for station in ["lion", "tiger", "leopard"]:
            if station not in experiment:
                continue

            experiment[station] = remove_columns(experiment[station], columns)
21 changes: 21 additions & 0 deletions human_experiments/synchronize_signal_task/config.py
@@ -0,0 +1,21 @@
import os

USER = os.getenv("USER")
DB_PATH = f"/space/{USER}/tomcat/tomcat.db"
EEG_FILTERED_PATH = f"/space/{USER}/eeg_filtered"
FNIRS_FILTERED_PATH = f"/space/{USER}/fnirs_filtered"
NUM_PROCESSES = 40
OUTPUT_DIR = "/tomcat/data/derived/drafts/release_2023_08_19_17"

EXPERIMENT_SESSIONS = [
"exp_2022_09_30_10", "exp_2022_11_01_10", "exp_2022_12_02_15", "exp_2023_02_21_14",
"exp_2022_10_04_09", "exp_2022_11_04_10", "exp_2022_12_05_12", "exp_2023_04_17_13",
"exp_2022_10_07_15", "exp_2022_11_07_10", "exp_2023_01_30_13", "exp_2023_04_18_14",
"exp_2022_10_14_10", "exp_2022_11_08_11", "exp_2023_01_31_14", "exp_2023_04_21_10",
"exp_2022_10_18_10", "exp_2022_11_10_10", "exp_2023_02_03_10", "exp_2023_04_24_13",
"exp_2022_10_21_15", "exp_2022_11_14_12", "exp_2023_02_06_13", "exp_2023_04_27_14",
"exp_2022_10_24_12", "exp_2022_11_15_13", "exp_2023_02_07_14", "exp_2023_04_28_10",
"exp_2022_10_27_10", "exp_2022_11_17_15", "exp_2023_02_10_10", "exp_2023_05_01_13",
"exp_2022_10_28_10", "exp_2022_11_18_10", "exp_2023_02_16_14", "exp_2023_05_02_14",
"exp_2022_10_31_10", "exp_2022_11_22_10", "exp_2023_02_20_01", "exp_2023_05_03_10",
]
40 changes: 40 additions & 0 deletions human_experiments/synchronize_signal_task/process_eeg_500hz.py
@@ -0,0 +1,40 @@
import os

from common import remove_columns_all_exp
from config import DB_PATH, EEG_FILTERED_PATH, NUM_PROCESSES, EXPERIMENT_SESSIONS, OUTPUT_DIR
from read_data import read_raw_csv_all
from signal_synchronization import prepare_synchronization_data, synchronize_signals_all
from task_synchronization import synchronize_task_signal_all, prepare_task_synchronization_data
from write_data import write_csv_all

if __name__ == "__main__":
    desired_freq = 500

    experiment_eeg_signal = read_raw_csv_all(EEG_FILTERED_PATH, NUM_PROCESSES, EXPERIMENT_SESSIONS)

    # Metadata columns that are not part of the signal itself.
    columns_to_remove = [
        "group_session",
        "station",
        "task",
        "participant",
        "timestamp_iso8601"
    ]

    remove_columns_all_exp(experiment_eeg_signal, columns_to_remove)

    signal_type_info = [
        {
            "signal_type": "eeg",
            "experiment_signals": experiment_eeg_signal,
            "recording_frequency": 500,
        }
    ]

    sync_experiments_info = prepare_synchronization_data(signal_type_info, desired_freq)
    synchronized_signals = synchronize_signals_all(sync_experiments_info)

    task_synchronization_info = prepare_task_synchronization_data(synchronized_signals, DB_PATH, NUM_PROCESSES)
    synchronized_task_signals = synchronize_task_signal_all(task_synchronization_info)

    # Write into the frequency-specific output subdirectory (eeg_500hz), as the fNIRS script does.
    output_dir = os.path.join(OUTPUT_DIR, f"eeg_{desired_freq}hz")
    write_csv_all(synchronized_task_signals, output_dir, NUM_PROCESSES)
40 changes: 40 additions & 0 deletions human_experiments/synchronize_signal_task/process_fnirs_10hz.py
@@ -0,0 +1,40 @@
import os

from common import remove_columns_all_exp
from config import DB_PATH, FNIRS_FILTERED_PATH, NUM_PROCESSES, EXPERIMENT_SESSIONS, OUTPUT_DIR
from read_data import read_raw_csv_all
from signal_synchronization import prepare_synchronization_data, synchronize_signals_all
from task_synchronization import synchronize_task_signal_all, prepare_task_synchronization_data
from write_data import write_csv_all

if __name__ == "__main__":
    desired_freq = 10

    experiment_fnirs_signal = read_raw_csv_all(FNIRS_FILTERED_PATH, NUM_PROCESSES, EXPERIMENT_SESSIONS)

    columns_to_remove = [
        "group_session",
        "station",
        "task",
        "participant",
        "timestamp_iso8601"
    ]

    remove_columns_all_exp(experiment_fnirs_signal, columns_to_remove)

    signal_type_info = [
        {
            "signal_type": "fnirs",
            "experiment_signals": experiment_fnirs_signal,
            "recording_frequency": 10,
        }
    ]

    sync_experiments_info = prepare_synchronization_data(signal_type_info, desired_freq)
    synchronized_signals = synchronize_signals_all(sync_experiments_info)

    task_synchronization_info = prepare_task_synchronization_data(synchronized_signals, DB_PATH, NUM_PROCESSES)
    synchronized_task_signals = synchronize_task_signal_all(task_synchronization_info)

    output_dir = os.path.join(OUTPUT_DIR, f"fnirs_{desired_freq}hz")
    write_csv_all(synchronized_task_signals, output_dir, NUM_PROCESSES)
9 changes: 9 additions & 0 deletions human_experiments/synchronize_signal_task/read_data/__init__.py
@@ -0,0 +1,9 @@
from .read_raw_csv import read_raw_csv, read_raw_csv_all
from .read_task_db import read_task_db, read_task_db_all

__all__ = [
"read_raw_csv",
"read_raw_csv_all",
"read_task_db",
"read_task_db_all",
]
47 changes: 47 additions & 0 deletions human_experiments/synchronize_signal_task/read_data/read_raw_csv.py
@@ -0,0 +1,47 @@
import glob
import os
from multiprocessing import Pool
from typing import Any

import pandas as pd
from tqdm import tqdm


def read_raw_csv(csv_path: str) -> dict[str, Any]:
    dtypes = {
        'group_session': str,
        'task': str,
        'station': str,
        'timestamp_iso8601': str,
        'participant': int,
    }

    exp_df = pd.read_csv(csv_path, index_col=0, dtype=dtypes)

    # Split the experiment's signal into one dataframe per station.
    lion_df = exp_df[exp_df['station'] == 'lion']
    tiger_df = exp_df[exp_df['station'] == 'tiger']
    leopard_df = exp_df[exp_df['station'] == 'leopard']

    # The experiment name is the CSV file name without its extension.
    exp_name = os.path.splitext(os.path.basename(csv_path))[0]

    return {
        'experiment_name': exp_name,
        'lion': lion_df,
        'tiger': tiger_df,
        'leopard': leopard_df
    }


def read_raw_csv_all(dir_path: str,
                     num_processes: int = 1,
                     whitelist_experiments: list[str] | None = None) -> list[dict[str, Any]]:
    csv_paths = sorted(glob.glob(f'{dir_path}/*.csv'))

    if whitelist_experiments is not None:
        csv_paths = [path for path in csv_paths
                     if os.path.splitext(os.path.basename(path))[0] in whitelist_experiments]

    if len(csv_paths) == 0:
        raise ValueError(f'No CSV files found in {dir_path}')

    # Read the experiment CSVs in parallel, with a progress bar.
    with Pool(processes=num_processes) as pool:
        raw_list = list(tqdm(pool.imap(read_raw_csv, csv_paths), total=len(csv_paths)))

    return raw_list
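
For reference, a short sketch of calling read_raw_csv_all and the per-station dictionaries it returns; the directory path here is a placeholder.

from read_data import read_raw_csv_all

# Illustrative call; the directory path is a placeholder.
experiments = read_raw_csv_all("/path/to/fnirs_filtered", num_processes=4)
first = experiments[0]
print(first["experiment_name"])  # e.g. "exp_2022_09_30_10"
print(first["lion"].shape)       # rows recorded at the lion station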
88 changes: 88 additions & 0 deletions human_experiments/synchronize_signal_task/read_data/read_task_db.py
@@ -0,0 +1,88 @@
from multiprocessing import Pool
from typing import Any

from tqdm import tqdm

from .tasks import (
    affective_individual,
    affective_team,
    rest_state,
    finger_tapping,
    ping_pong_competitive,
    ping_pong_cooperative,
    minecraft
)


def read_task_db(db_path: str, experiment: str) -> dict[str, Any]:
    # Collect the data of every task that was recorded for this experiment session.
    task_data = []

    rest_state_data = rest_state(db_path, experiment)
    if rest_state_data is not None:
        task_data.append({
            "task_name": "rest_state",
            "task_data": rest_state_data
        })

    finger_tapping_data = finger_tapping(db_path, experiment)
    if finger_tapping_data is not None:
        task_data.append({
            "task_name": "finger_tapping",
            "task_data": finger_tapping_data
        })

    affective_individual_data = affective_individual(db_path, experiment)
    if affective_individual_data is not None:
        task_data.append({
            "task_name": "affective_individual",
            "task_data": affective_individual_data
        })

    affective_team_data = affective_team(db_path, experiment)
    if affective_team_data is not None:
        task_data.append({
            "task_name": "affective_team",
            "task_data": affective_team_data
        })

    ping_pong_competitive_data = ping_pong_competitive(db_path, experiment)
    if ping_pong_competitive_data is not None:
        task_data.append({
            "task_name": "ping_pong_competitive",
            "task_data": ping_pong_competitive_data
        })

    ping_pong_cooperative_data = ping_pong_cooperative(db_path, experiment)
    if ping_pong_cooperative_data is not None:
        task_data.append({
            "task_name": "ping_pong_cooperative",
            "task_data": ping_pong_cooperative_data
        })

    minecraft_data = minecraft(db_path, experiment)
    if minecraft_data is not None:
        task_data.append({
            "task_name": "minecraft",
            "task_data": minecraft_data
        })

    task_data_dict = {
        "experiment_name": experiment,
        "task_data": task_data
    }

    return task_data_dict


def _multiprocess_task_db(process_arg: tuple[str, str]) -> dict[str, Any]:
    return read_task_db(*process_arg)


def read_task_db_all(db_path: str,
                     experiments: list[str],
                     num_processes: int = 1) -> list[dict[str, Any]]:
    process_args = [(db_path, experiment) for experiment in experiments]

    # Read each experiment's task data in parallel, with a progress bar.
    with Pool(processes=num_processes) as pool:
        results = list(tqdm(pool.imap(_multiprocess_task_db, process_args), total=len(process_args)))

    return results
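
The dictionary returned by read_task_db nests one entry per task that has data for the session; a short, illustrative traversal follows (the database path is a placeholder).

from read_data import read_task_db

# Illustrative only; the database path is a placeholder.
experiment_tasks = read_task_db("/path/to/tomcat.db", "exp_2022_09_30_10")
print(experiment_tasks["experiment_name"])
for entry in experiment_tasks["task_data"]:
    print(entry["task_name"])  # e.g. "rest_state", "finger_tapping", "minecraft"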
17 changes: 17 additions & 0 deletions human_experiments/synchronize_signal_task/read_data/tasks/__init__.py
@@ -0,0 +1,17 @@
from .affective_individual import affective_individual
from .affective_team import affective_team
from .finger_tapping import finger_tapping
from .ping_pong_competitive import ping_pong_competitive
from .ping_pong_cooperative import ping_pong_cooperative
from .rest_state import rest_state
from .minecraft import minecraft

__all__ = [
    'affective_individual',
    'affective_team',
    'rest_state',
    'finger_tapping',
    'ping_pong_competitive',
    'ping_pong_cooperative',
    'minecraft'
]
53 changes: 53 additions & 0 deletions human_experiments/synchronize_signal_task/read_data/tasks/affective_individual.py
@@ -0,0 +1,53 @@
import sqlite3

import numpy as np
import pandas as pd

from common import get_station


def affective_individual(db_path: str, experiment: str) -> dict[str, pd.DataFrame] | None:
    db = sqlite3.connect(db_path)

    query = """
        SELECT *
        FROM affective_task_event
        WHERE group_session = ? AND task_type = 'individual';
    """
    affective_individual_df = pd.read_sql_query(query, db, params=[experiment])

    if affective_individual_df.empty:
        return None

    # Map each participant id to the station they used for this task.
    unique_participants = affective_individual_df['participant'].unique()
    id_station_map = {}
    for participant_id in unique_participants:
        station = get_station(db_path, experiment, participant_id, "affective_individual")
        id_station_map[participant_id] = station

    # Helper used to create the new 'station' column from the participant id.
    def map_station(participant):
        return id_station_map.get(participant, np.nan)

    station_task_map = {}
    for participant_id, station in id_station_map.items():
        station_affective_df = affective_individual_df[affective_individual_df['participant'] == participant_id]
        station_affective_df = station_affective_df.copy()
        station_affective_df['station'] = station_affective_df['participant'].apply(map_station)
        station_affective_df = station_affective_df.drop(columns=['participant',
                                                                  'task_type',
                                                                  'group_session',
                                                                  'timestamp_iso8601'])

        # Reorder the columns so 'station' comes right after the first column.
        cols = station_affective_df.columns.tolist()
        cols = [cols[0]] + ['station'] + [col for col in cols[1:] if col != 'station']
        station_affective_df = station_affective_df.reindex(columns=cols)

        station_affective_df["timestamp_unix"] = station_affective_df["timestamp_unix"].astype(float)
        station_affective_df = station_affective_df.reset_index(drop=True)

        station_task_map[station] = station_affective_df

    return station_task_map