diff --git a/CHANGELOG.md b/CHANGELOG.md index e7b7808..c185628 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -12,4 +12,4 @@ and this project adheres to [Semantic Versioning][]. ### Added -- Basic tool, preprocessing and plotting functions +- Basic tool, preprocessing and plotting functions diff --git a/docs/api.md b/docs/api.md index c2ab7a7..32f55e9 100644 --- a/docs/api.md +++ b/docs/api.md @@ -37,7 +37,19 @@ dt.mimic_iv_omop dt.gibleed_omop dt.synthea27nj_omop - dt.mimic_ii + dt.physionet2012 +``` + +## Tools + +```{eval-rst} +.. module:: ehrdata.tl +.. currentmodule:: ehrdata + +.. autosummary:: + :toctree: generated + + tl.omop.EHRDataset ``` ## Plotting diff --git a/docs/contributing.md b/docs/contributing.md index 8a9b28d..1f20499 100644 --- a/docs/contributing.md +++ b/docs/contributing.md @@ -155,11 +155,11 @@ This will automatically create a git tag and trigger a Github workflow that crea Please write documentation for new or changed features and use-cases. This project uses [sphinx][] with the following features: -- The [myst][] extension allows to write documentation in markdown/Markedly Structured Text -- [Numpy-style docstrings][numpydoc] (through the [napoloen][numpydoc-napoleon] extension). -- Jupyter notebooks as tutorials through [myst-nb][] (See [Tutorials with myst-nb](#tutorials-with-myst-nb-and-jupyter-notebooks)) -- [sphinx-autodoc-typehints][], to automatically reference annotated input and output types -- Citations (like {cite:p}`Virshup_2023`) can be included with [sphinxcontrib-bibtex](https://sphinxcontrib-bibtex.readthedocs.io/) +- The [myst][] extension allows to write documentation in markdown/Markedly Structured Text +- [Numpy-style docstrings][numpydoc] (through the [napoloen][numpydoc-napoleon] extension). +- Jupyter notebooks as tutorials through [myst-nb][] (See [Tutorials with myst-nb](#tutorials-with-myst-nb-and-jupyter-notebooks)) +- [sphinx-autodoc-typehints][], to automatically reference annotated input and output types +- Citations (like {cite:p}`Virshup_2023`) can be included with [sphinxcontrib-bibtex](https://sphinxcontrib-bibtex.readthedocs.io/) See scanpy’s {doc}`scanpy:dev/documentation` for more information on how to write your own. @@ -183,10 +183,10 @@ please check out [this feature request][issue-render-notebooks] in the `cookiecu #### Hints -- If you refer to objects from other packages, please add an entry to `intersphinx_mapping` in `docs/conf.py`. - Only if you do so can sphinx automatically create a link to the external documentation. -- If building the documentation fails because of a missing link that is outside your control, - you can add an entry to the `nitpick_ignore` list in `docs/conf.py` +- If you refer to objects from other packages, please add an entry to `intersphinx_mapping` in `docs/conf.py`. + Only if you do so can sphinx automatically create a link to the external documentation. +- If building the documentation fails because of a missing link that is outside your control, + you can add an entry to the `nitpick_ignore` list in `docs/conf.py` (docs-building)= diff --git a/docs/index.md b/docs/index.md index 7065d92..324eab5 100644 --- a/docs/index.md +++ b/docs/index.md @@ -17,4 +17,5 @@ notebooks/cohort_definition notebooks/study_design_example_omop_cdm notebooks/indwelling_arterial_catheters notebooks/tutorial_time_series_with_pypots +notebooks/omop_ml ``` diff --git a/docs/notebooks/omop_ml.ipynb b/docs/notebooks/omop_ml.ipynb new file mode 100644 index 0000000..25c6b63 --- /dev/null +++ b/docs/notebooks/omop_ml.ipynb @@ -0,0 +1,765 @@ +{ + "cells": [ + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "The autoreload extension is already loaded. To reload it, use:\n", + " %reload_ext autoreload\n" + ] + } + ], + "source": [ + "%load_ext autoreload\n", + "%autoreload 2" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "# Deep Learning on Timeseries for the OMOP CDM with ehrdata\n", + "ehrdata offers a deep learning convenience map-style [pytorch dataset](https://pytorch.org/docs/stable/data.html#torch.utils.data.Dataset), EHRDataset.\n", + "This is the input for pytorch's Dataloader, the canonical data loading structure for deep learning models in pytorch.\n", + "\n", + "For more information on the OMOP Common Data Model (CDM), see the notebook on the [OMOP CDM](./omop_tables_tutorial.ipynb).\n", + "\n", + "For more information on advanced time series algorithms, see the notebook on [Time Series Analysis with ehrdata and PyPOTS](./tutorial_time_series_with_pypots.ipynb)." + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Introduction\n", + "Disclaimer: the example usecase is for demonstration purposes only. The data preprocessing, the task definition, and the model setup are meant to be introductory. And as such lack the complexity required for proper inference. But flexible enough to build exactly this on top of it.\n", + "\n", + "## Worked example: Predict in-hospital mortality of ICU patients\n", + "We consider the task of predicting the in-hospital mortality of ICU patients, using public [MIMIC-IV demo dataset in the OMOP Common Data Model](https://physionet.org/content/mimic-iv-demo-omop/0.9/).\n", + "\n", + "Dataset:
\n", + "Kallfelz, M., Tsvetkova, A., Pollard, T., Kwong, M., Lipori, G., Huser, V., Osborn, J., Hao, S., & Williams, A. (2021). MIMIC-IV demo data in the OMOP Common Data Model (version 0.9). PhysioNet. https://doi.org/10.13026/p1f5-7x35." + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "### Imports\n", + "We start with the required imports" + ] + }, + { + "cell_type": "code", + "execution_count": 267, + "metadata": {}, + "outputs": [], + "source": [ + "import torch\n", + "import duckdb\n", + "import numpy as np\n", + "from torch.utils.data import DataLoader\n", + "\n", + "import ehrdata as ed\n", + "import ehrapy as ep" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "### Setup the database\n", + "We use the plug-and-play ehrdata dataset, and duckdb as our RDMS.\n", + "#### Setup a local database connection" + ] + }, + { + "cell_type": "code", + "execution_count": 268, + "metadata": {}, + "outputs": [], + "source": [ + "con = duckdb.connect(\":memory:\")" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "#### Download the data, and load it into the database\n", + "Convenience dataset available from ehrdata." + ] + }, + { + "cell_type": "code", + "execution_count": 269, + "metadata": {}, + "outputs": [], + "source": [ + "ed.dt.mimic_iv_omop(backend_handle=con)" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "#### Define the Cohort\n", + "We start off by considering only patients with a `visit_occurrence` in ICU (`visit_concept_id` in `visit_detail` for ICU: In this dataset, we choose the OMOP Concept IDs\n", + "- 4305366 for Surgical ICU\n", + "- 40481392 for Medical ICU\n", + "- 32037 for Intensive Care\n", + "- 763903 for Trauma ICU\n", + "- 4149943 for Cardiac ICU\n", + "\n", + "If a person had multiple such ICU stays, we select the first.\n", + "\n", + "There are better ways than to delete rows in `visit_occurrence` which do not satisfy our cohort definition from our database, for the toy example this is the fastest." + ] + }, + { + "cell_type": "code", + "execution_count": 270, + "metadata": {}, + "outputs": [ + { + "data": { + "text/plain": [ + "" + ] + }, + "execution_count": 270, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "con.execute(\"\"\"\n", + " WITH RankedVisits AS (\n", + " SELECT\n", + " v.*,\n", + " vd.*,\n", + " ROW_NUMBER() OVER (PARTITION BY v.person_id ORDER BY v.visit_start_date) AS rn\n", + " FROM visit_occurrence v\n", + " JOIN visit_detail vd USING (visit_occurrence_id)\n", + " WHERE vd.visit_detail_concept_id IN (4305366, 40481392, 32037, 763903, 4149943)\n", + " ),\n", + " first_icu_visit_occurrence_id AS (\n", + " SELECT visit_occurrence_id\n", + " FROM RankedVisits\n", + " WHERE rn = 1\n", + " )\n", + " DELETE FROM visit_occurrence\n", + " WHERE visit_occurrence_id NOT IN (SELECT visit_occurrence_id FROM first_icu_visit_occurrence_id)\n", + "\"\"\")" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "#### Define the variables of interest and the time windows.\n", + "For more information of how we can convert the irregularly-sampled time series into a missing data problem by discretizing the time axis into non-overlapping intervals, see the Notebook on [Extracting, Representing, Validating and Vizualizing Data from an OMOP CDM Database with ehrdata, lamin, and Vitessce](./tutorial_omop_visualization.ipynb).\n", + "\n", + "Here, we decide for the following:\n", + "- We have for each person (n=100) one in-hospital stay; take the start of this hospital stay as the starting point (t=0) for each patient.\n", + "- We consider time-intervals of 1h, for 24h; that is, the first day after ICU admission. If for a patient less recorded data is available, the missing data is padded.\n", + "- We consider the data from the `measurements` table; we consider the numeric `value_as_number` values.\n" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "We now set up the persons to be considered, together with their observation start and endtimes, retrieved from the `visit_occurrence` table." + ] + }, + { + "cell_type": "code", + "execution_count": 271, + "metadata": {}, + "outputs": [], + "source": [ + "edata = ed.io.omop.setup_obs(\n", + " backend_handle=con,\n", + " observation_table=\"person_visit_occurrence\",\n", + ")" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "In the next step, we retrieve all `value_as_number` entries from the `measurements` table:" + ] + }, + { + "cell_type": "code", + "execution_count": 272, + "metadata": {}, + "outputs": [ + { + "name": "stderr", + "output_type": "stream", + "text": [ + "WARNING:root:multiple units for features: [[ 1]\n", + " [ 7]\n", + " [ 31]\n", + " [ 33]\n", + " [ 39]\n", + " [ 44]\n", + " [ 49]\n", + " [ 57]\n", + " [ 60]\n", + " [ 79]\n", + " [ 81]\n", + " [ 83]\n", + " [ 93]\n", + " [110]\n", + " [160]\n", + " [175]\n", + " [186]\n", + " [187]\n", + " [189]\n", + " [190]\n", + " [195]\n", + " [204]\n", + " [207]\n", + " [221]\n", + " [269]\n", + " [273]\n", + " [275]]\n" + ] + } + ], + "source": [ + "edata = ed.io.omop.setup_variables(\n", + " edata=edata,\n", + " backend_handle=con,\n", + " data_tables=[\"measurement\"],\n", + " data_field_to_keep=[\"value_as_number\"],\n", + " interval_length_number=1,\n", + " interval_length_unit=\"h\",\n", + " num_intervals=24,\n", + " concept_ids=\"all\",\n", + " aggregation_strategy=\"last\",\n", + " instantiate_tensor=False,\n", + ")" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "NOTE: this could/should become an ehrdata API call.\n", + "\n", + "We drop features which are not measured in at least 10 patients 1x." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [ + { + "data": { + "text/plain": [ + "" + ] + }, + "execution_count": 273, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "# delete all rows for which data_table_concept_id is NOT NULL in at least 10 different patient_id s\n", + "con.execute(\"\"\"\n", + " WITH concept_ids_to_delete AS (\n", + "\n", + " SELECT\n", + " data_table_concept_id\n", + " FROM long_person_timestamp_feature_value\n", + " WHERE value_as_number IS NOT NULL\n", + " GROUP BY data_table_concept_id\n", + " HAVING COUNT(DISTINCT person_id) <= 10\n", + "\n", + " UNION\n", + "\n", + " SELECT\n", + " data_table_concept_id\n", + " FROM long_person_timestamp_feature_value\n", + " GROUP BY data_table_concept_id\n", + " HAVING COUNT(value_as_number) = 0\n", + " )\n", + "\n", + " DELETE FROM long_person_timestamp_feature_value\n", + " WHERE data_table_concept_id IN (\n", + " SELECT data_table_concept_id FROM concept_ids_to_delete\n", + " );\n", + "\"\"\")" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "NOTE: this could/should become an ehrdata API call.\n", + "\n", + "For model simpliclity, we conduct forward filling of the variables:" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [ + { + "data": { + "text/plain": [ + "" + ] + }, + "execution_count": 274, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "con.execute(\"\"\"\n", + "WITH filled_measurements AS (\n", + " SELECT\n", + " person_id,\n", + " interval_step,\n", + " data_table_concept_id,\n", + " COALESCE(value_as_number,\n", + " LAST_VALUE(value_as_number IGNORE NULLS)\n", + " OVER (PARTITION BY person_id, data_table_concept_id\n", + " ORDER BY interval_step\n", + " ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)\n", + " ) AS filled_value\n", + " FROM long_person_timestamp_feature_value\n", + ")\n", + "UPDATE long_person_timestamp_feature_value\n", + "SET value_as_number = fm.filled_value\n", + "FROM filled_measurements as fm\n", + "WHERE long_person_timestamp_feature_value.person_id = fm.person_id\n", + "AND long_person_timestamp_feature_value.interval_step = fm.interval_step\n", + "AND long_person_timestamp_feature_value.data_table_concept_id = fm.data_table_concept_id;\n", + "\"\"\")" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "And for all values not captured in forward fill, we impute the missing value for person x, feature f, time step t as the mean of all other persons feature f at timestep t." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [ + { + "data": { + "text/plain": [ + "" + ] + }, + "execution_count": 275, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "con.execute(\"\"\"\n", + "WITH feature_means AS (\n", + " SELECT\n", + " interval_step,\n", + " data_table_concept_id,\n", + " AVG(value_as_number) AS mean_value\n", + " FROM long_person_timestamp_feature_value\n", + " WHERE value_as_number IS NOT NULL\n", + " GROUP BY interval_step, data_table_concept_id\n", + "),\n", + "filled_values AS (\n", + " SELECT\n", + " lptfv.person_id,\n", + " lptfv.interval_step,\n", + " lptfv.data_table_concept_id,\n", + " COALESCE(lptfv.value_as_number, fm.mean_value) AS filled_value\n", + " FROM long_person_timestamp_feature_value lptfv\n", + " LEFT JOIN feature_means fm\n", + " ON lptfv.interval_step = fm.interval_step\n", + " AND lptfv.data_table_concept_id = fm.data_table_concept_id\n", + ")\n", + "UPDATE long_person_timestamp_feature_value\n", + "SET value_as_number = fm.filled_value\n", + "FROM filled_values as fm\n", + "WHERE long_person_timestamp_feature_value.person_id = fm.person_id\n", + "AND long_person_timestamp_feature_value.interval_step = fm.interval_step\n", + "AND long_person_timestamp_feature_value.data_table_concept_id = fm.data_table_concept_id;\n", + "\"\"\")" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "### Deep Learning Model\n", + "#### Data Loading" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "The tensor of shape n_obs x n_vars x num_intervals has been prepared in the RDBMS.\n", + "We can now create an `EHRDataset`, which is a subclass of pytorch's Dataset, and will stream the data for a deep learning model from the database." + ] + }, + { + "cell_type": "code", + "execution_count": 276, + "metadata": {}, + "outputs": [], + "source": [ + "dataset = ed.tl.omop.EHRDataset(con, edata, batch_size=5, idxs=None)" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "The `EHRDataset`, as subclass of pytorch's `Dataset`, can be used right away for creating a pytorch `Dataloader`." + ] + }, + { + "cell_type": "code", + "execution_count": 277, + "metadata": {}, + "outputs": [], + "source": [ + "loader = DataLoader(dataset, batch_size=4, shuffle=False)" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "#### Model definition\n", + "We create a simple model for time series based on a Recurrent Neural Network in pytorch.\n", + "More advanced models interoperable with ehrdata are showcased in [Time Series Analysis with ehrdata and PyPOTS](./tutorial_time_series_with_pypots.ipynb). However, PyPOTS does not support a pytorch Dataloader as input." + ] + }, + { + "cell_type": "code", + "execution_count": 278, + "metadata": {}, + "outputs": [], + "source": [ + "import torch.nn as nn\n", + "import torch.nn.functional as F\n", + "\n", + "\n", + "class RNN_Model(nn.Module):\n", + " \"\"\"RNN Model.\"\"\"\n", + "\n", + " def __init__(self, input_size, hidden_size, num_layers, num_classes):\n", + " super().__init__()\n", + " self.rnn = nn.RNN(input_size, hidden_size, num_layers, batch_first=True)\n", + " self.fc = nn.Linear(hidden_size, num_classes)\n", + "\n", + " def _prepare_batch(self, batch):\n", + " x, target = batch\n", + " return torch.transpose(x, 2, 1), target.flatten().to(torch.long)\n", + "\n", + " def forward(self, x):\n", + " \"\"\"Forward method.\"\"\"\n", + " # x: (batch_size, seq_len, input_size)\n", + " out, _ = self.rnn(x)\n", + " out = out[:, -1, :]\n", + "\n", + " # out: (batch_size, num_classes)\n", + " logits = self.fc(out)\n", + " return out, logits\n", + "\n", + " def training_step(self, batch):\n", + " \"\"\"Training step.\"\"\"\n", + " x, target = self._prepare_batch(batch)\n", + " out, logits = self(x)\n", + " loss = F.cross_entropy(logits, target)\n", + " return loss\n", + "\n", + " def fit(self, loader, epochs=10):\n", + " \"\"\"Fit method.\"\"\"\n", + " self.train_loss = []\n", + " optimizer = torch.optim.Adam(self.parameters(), lr=0.01)\n", + " for epoch in range(epochs):\n", + " batch_loss = []\n", + " for batch in loader:\n", + " optimizer.zero_grad()\n", + " loss = self.training_step(batch)\n", + " loss.backward()\n", + " optimizer.step()\n", + "\n", + " batch_loss.append(loss.item())\n", + "\n", + " self.train_loss.append(np.mean(np.array(batch_loss)))\n", + " print(f\"Epoch {epoch}, Loss: {np.mean(np.array(batch_loss))}\")\n", + "\n", + " def predict(self, loader, soft=True):\n", + " \"\"\"Predict method.\"\"\"\n", + " predictions = []\n", + " with torch.no_grad():\n", + " for batch in loader:\n", + " x, target = self._prepare_batch(batch)\n", + " _, classification_logits = self(x)\n", + " if soft:\n", + " predicted = torch.softmax(classification_logits, 1)\n", + " else:\n", + " predicted = torch.max(classification_logits, 1)\n", + " predictions.append(predicted)\n", + " return torch.cat(predictions)\n", + "\n", + " def represent(self, loader):\n", + " \"\"\"Represent method.\"\"\"\n", + " representations = []\n", + " with torch.no_grad():\n", + " for batch in loader:\n", + " x, target = self._prepare_batch(batch)\n", + " output, _ = self(x)\n", + " representations.append(output)\n", + " return torch.cat(representations)" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "#### Model training" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "Epoch 0, Loss: 0.3569712014496326\n", + "Epoch 1, Loss: 0.3650100648403168\n", + "Epoch 2, Loss: 0.3504540580511093\n", + "Epoch 3, Loss: 0.3471323770284653\n", + "Epoch 4, Loss: 0.3453905090689659\n" + ] + } + ], + "source": [ + "model = RNN_Model(\n", + " input_size=129,\n", + " hidden_size=16,\n", + " num_layers=1,\n", + " num_classes=2,\n", + ")\n", + "model.fit(loader, epochs=5)" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "#### Model prediction\n", + "Classification could look like this" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [ + { + "data": { + "text/plain": [ + "tensor([[0.8678, 0.1322],\n", + " [0.9055, 0.0945],\n", + " [0.8645, 0.1355],\n", + " [0.9055, 0.0945],\n", + " [0.9055, 0.0945],\n", + " [0.9055, 0.0945],\n", + " [0.9055, 0.0945],\n", + " [0.9055, 0.0945],\n", + " [0.8678, 0.1322],\n", + " [0.9055, 0.0945],\n", + " [0.9055, 0.0945],\n", + " [0.8678, 0.1322],\n", + " [0.8678, 0.1322],\n", + " [0.9055, 0.0945],\n", + " [0.9055, 0.0945],\n", + " [0.9055, 0.0945],\n", + " [0.9056, 0.0944],\n", + " [0.8678, 0.1322],\n", + " [0.9055, 0.0945],\n", + " [0.9055, 0.0945],\n", + " [0.8678, 0.1322],\n", + " [0.9055, 0.0945],\n", + " [0.8678, 0.1322],\n", + " [0.9055, 0.0945],\n", + " [0.9055, 0.0945],\n", + " [0.8678, 0.1322],\n", + " [0.9055, 0.0945],\n", + " [0.9055, 0.0945],\n", + " [0.9055, 0.0945],\n", + " [0.8678, 0.1322],\n", + " [0.9055, 0.0945],\n", + " [0.9055, 0.0945],\n", + " [0.9055, 0.0945],\n", + " [0.9055, 0.0945],\n", + " [0.9055, 0.0945],\n", + " [0.8678, 0.1322],\n", + " [0.9055, 0.0945],\n", + " [0.8678, 0.1322],\n", + " [0.8114, 0.1886],\n", + " [0.8678, 0.1322],\n", + " [0.9055, 0.0945],\n", + " [0.9055, 0.0945],\n", + " [0.9055, 0.0945],\n", + " [0.9055, 0.0945],\n", + " [0.9056, 0.0944],\n", + " [0.8678, 0.1322],\n", + " [0.9055, 0.0945],\n", + " [0.8678, 0.1322],\n", + " [0.9055, 0.0945],\n", + " [0.9055, 0.0945],\n", + " [0.9055, 0.0945],\n", + " [0.8678, 0.1322],\n", + " [0.9055, 0.0945],\n", + " [0.8678, 0.1322],\n", + " [0.9055, 0.0945],\n", + " [0.8678, 0.1322],\n", + " [0.9055, 0.0945],\n", + " [0.9055, 0.0945],\n", + " [0.8678, 0.1322],\n", + " [0.8678, 0.1322],\n", + " [0.9055, 0.0945],\n", + " [0.8678, 0.1322],\n", + " [0.9055, 0.0945],\n", + " [0.9055, 0.0945],\n", + " [0.9055, 0.0945],\n", + " [0.9055, 0.0945],\n", + " [0.8678, 0.1322],\n", + " [0.9055, 0.0945],\n", + " [0.9055, 0.0945],\n", + " [0.9055, 0.0945],\n", + " [0.9055, 0.0945],\n", + " [0.9055, 0.0945],\n", + " [0.8678, 0.1322],\n", + " [0.9055, 0.0945],\n", + " [0.9055, 0.0945],\n", + " [0.9055, 0.0945],\n", + " [0.8542, 0.1458],\n", + " [0.9055, 0.0945],\n", + " [0.9055, 0.0945],\n", + " [0.8678, 0.1322],\n", + " [0.9055, 0.0945],\n", + " [0.8678, 0.1322],\n", + " [0.9055, 0.0945],\n", + " [0.9055, 0.0945],\n", + " [0.7546, 0.2454],\n", + " [0.9055, 0.0945],\n", + " [0.8678, 0.1322],\n", + " [0.9055, 0.0945],\n", + " [0.8138, 0.1862],\n", + " [0.9055, 0.0945],\n", + " [0.9055, 0.0945],\n", + " [0.9055, 0.0945],\n", + " [0.9055, 0.0945],\n", + " [0.9055, 0.0945],\n", + " [0.9055, 0.0945],\n", + " [0.9055, 0.0945],\n", + " [0.9055, 0.0945],\n", + " [0.9055, 0.0945],\n", + " [0.9055, 0.0945],\n", + " [0.9055, 0.0945]])" + ] + }, + "execution_count": 280, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "model.predict(loader)" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "#### Model representation\n", + "Futher, could illustrate patient representation:" + ] + }, + { + "cell_type": "code", + "execution_count": 281, + "metadata": {}, + "outputs": [], + "source": [ + "edata.obsm[\"last_step_representation\"] = np.array(model.represent(loader))" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [ + { + "data": { + "image/png": "", + "text/plain": [ + "
" + ] + }, + "metadata": {}, + "output_type": "display_data" + } + ], + "source": [ + "ep.pp.neighbors(edata, use_rep=\"last_step_representation\")\n", + "ep.tl.umap(edata)\n", + "ep.pl.umap(\n", + " edata, color=\"discharge_to_source_value\", title=\"UMAP of RNN representation after 24h colored by discharge note\"\n", + ")" + ] + } + ], + "metadata": { + "kernelspec": { + "display_name": "ehrapy_venv_oct", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.11.9" + } + }, + "nbformat": 4, + "nbformat_minor": 2 +} diff --git a/docs/notebooks/tutorial_time_series_with_pypots.ipynb b/docs/notebooks/tutorial_time_series_with_pypots.ipynb index 3fbe3d2..4e4c9d6 100644 --- a/docs/notebooks/tutorial_time_series_with_pypots.ipynb +++ b/docs/notebooks/tutorial_time_series_with_pypots.ipynb @@ -32,7 +32,7 @@ }, { "cell_type": "code", - "execution_count": null, + "execution_count": 1, "metadata": {}, "outputs": [], "source": [ @@ -41,9 +41,35 @@ }, { "cell_type": "code", - "execution_count": 58, + "execution_count": 2, "metadata": {}, - "outputs": [], + "outputs": [ + { + "name": "stderr", + "output_type": "stream", + "text": [ + "/Users/eljas.roellin/Documents/ehrapy_workspace/ehrapy_venv_oct/lib/python3.11/site-packages/pypots/nn/modules/reformer/local_attention.py:31: FutureWarning: `torch.cuda.amp.autocast(args...)` is deprecated. Please use `torch.amp.autocast('cuda', args...)` instead.\n", + " @autocast(enabled=False)\n", + "/Users/eljas.roellin/Documents/ehrapy_workspace/ehrapy_venv_oct/lib/python3.11/site-packages/pypots/nn/modules/reformer/local_attention.py:98: FutureWarning: `torch.cuda.amp.autocast(args...)` is deprecated. Please use `torch.amp.autocast('cuda', args...)` instead.\n", + " @autocast(enabled=False)\n" + ] + }, + { + "name": "stdout", + "output_type": "stream", + "text": [ + "\u001b[34m\n", + "████████╗██╗███╗ ███╗███████╗ ███████╗███████╗██████╗ ██╗███████╗███████╗ █████╗ ██╗\n", + "╚══██╔══╝██║████╗ ████║██╔════╝ ██╔════╝██╔════╝██╔══██╗██║██╔════╝██╔════╝ ██╔══██╗██║\n", + " ██║ ██║██╔████╔██║█████╗█████╗███████╗█████╗ ██████╔╝██║█████╗ ███████╗ ███████║██║\n", + " ██║ ██║██║╚██╔╝██║██╔══╝╚════╝╚════██║██╔══╝ ██╔══██╗██║██╔══╝ ╚════██║ ██╔══██║██║\n", + " ██║ ██║██║ ╚═╝ ██║███████╗ ███████║███████╗██║ ██║██║███████╗███████║██╗██║ ██║██║\n", + " ╚═╝ ╚═╝╚═╝ ╚═╝╚══════╝ ╚══════╝╚══════╝╚═╝ ╚═╝╚═╝╚══════╝╚══════╝╚═╝╚═╝ ╚═╝╚═╝\n", + "ai4ts v0.0.3 - building AI for unified time-series analysis, https://time-series.ai \u001b[0m\n", + "\n" + ] + } + ], "source": [ "import duckdb\n", "import ehrdata as ed\n", @@ -60,72 +86,24 @@ }, { "cell_type": "code", - "execution_count": 41, + "execution_count": 3, "metadata": {}, "outputs": [ { - "name": "stderr", - "output_type": "stream", - "text": [ - "INFO - Downloading Synthea27Nj_5.4.zip from https://github.com/OHDSI/EunomiaDatasets/raw/main/datasets/Synthea27Nj/Synthea27Nj_5.4.zip to /var/folders/yy/60ln_681745_fjjwvgwm_nyc0000gn/T/tmpfndmdvwt/Synthea27Nj_5.4.zip\n" - ] - }, - { - "data": { - "application/vnd.jupyter.widget-view+json": { - "model_id": "254776f379994eeab1835ffe42fe89a1", - "version_major": 2, - "version_minor": 0 - }, - "text/plain": [ - "Output()" - ] - }, - "metadata": {}, - "output_type": "display_data" - }, - { - "data": { - "text/html": [ - "
\n"
-      ],
-      "text/plain": []
-     },
-     "metadata": {},
-     "output_type": "display_data"
-    },
-    {
-     "name": "stderr",
-     "output_type": "stream",
-     "text": [
-      "INFO - Extracted archive Synthea27Nj_5.4.zip from /var/folders/yy/60ln_681745_fjjwvgwm_nyc0000gn/T/tmpfndmdvwt/Synthea27Nj_5.4.zip to ehrapy_data/Synthea27Nj_5.4/Synthea27Nj_5.4\n",
-      "INFO - missing tables: []\n",
-      "INFO - unused files: ['EPISODE.csv', '__MACOSX', 'EPISODE_EVENT.csv']\n"
-     ]
-    },
-    {
-     "name": "stdout",
-     "output_type": "stream",
-     "text": [
-      "multiple units for features: []\n"
+     "ename": "",
+     "evalue": "",
+     "output_type": "error",
+     "traceback": [
+      "\u001b[1;31mCannot execute code, session has been disposed. Please try restarting the Kernel."
      ]
     },
     {
-     "name": "stderr",
-     "output_type": "stream",
-     "text": [
-      "/Users/eljas.roellin/Documents/ehrapy_workspace/ehrapy_venv_oct/lib/python3.11/site-packages/anndata/_core/aligned_df.py:68: ImplicitModificationWarning: Transforming to str index.\n",
-      "  warnings.warn(\"Transforming to str index.\", ImplicitModificationWarning)\n",
-      "/Users/eljas.roellin/Documents/ehrapy_workspace/ehrapy_venv_oct/lib/python3.11/site-packages/pandas/core/generic.py:3331: UserWarning: Converting non-nanosecond precision datetime values to nanosecond precision. This behavior can eventually be relaxed in xarray, as it is an artifact from pandas which is now beginning to support non-nanosecond precision values. This warning is caused by passing non-nanosecond np.datetime64 or np.timedelta64 values to the DataArray or Variable constructor; it can be silenced by converting the values to nanosecond precision ahead of time.\n",
-      "  return xarray.Dataset.from_dataframe(self)\n",
-      "/Users/eljas.roellin/Documents/ehrapy_workspace/ehrapy_venv_oct/lib/python3.11/site-packages/pandas/core/generic.py:3331: UserWarning: Converting non-nanosecond precision datetime values to nanosecond precision. This behavior can eventually be relaxed in xarray, as it is an artifact from pandas which is now beginning to support non-nanosecond precision values. This warning is caused by passing non-nanosecond np.datetime64 or np.timedelta64 values to the DataArray or Variable constructor; it can be silenced by converting the values to nanosecond precision ahead of time.\n",
-      "  return xarray.Dataset.from_dataframe(self)\n",
-      "/Users/eljas.roellin/Documents/ehrapy_workspace/ehrapy_venv_oct/lib/python3.11/site-packages/pandas/core/generic.py:3331: UserWarning: Converting non-nanosecond precision datetime values to nanosecond precision. This behavior can eventually be relaxed in xarray, as it is an artifact from pandas which is now beginning to support non-nanosecond precision values. This warning is caused by passing non-nanosecond np.datetime64 or np.timedelta64 values to the DataArray or Variable constructor; it can be silenced by converting the values to nanosecond precision ahead of time.\n",
-      "  return xarray.Dataset.from_dataframe(self)\n",
-      "/Users/eljas.roellin/Documents/ehrapy_workspace/ehrapy_venv_oct/lib/python3.11/site-packages/pandas/core/generic.py:3331: UserWarning: Converting non-nanosecond precision datetime values to nanosecond precision. This behavior can eventually be relaxed in xarray, as it is an artifact from pandas which is now beginning to support non-nanosecond precision values. This warning is caused by passing non-nanosecond np.datetime64 or np.timedelta64 values to the DataArray or Variable constructor; it can be silenced by converting the values to nanosecond precision ahead of time.\n",
-      "  return xarray.Dataset.from_dataframe(self)\n",
-      "/Users/eljas.roellin/Documents/ehrapy_workspace/ehrapy_venv_oct/lib/python3.11/site-packages/anndata/_core/aligned_df.py:68: ImplicitModificationWarning: Transforming to str index.\n",
-      "  warnings.warn(\"Transforming to str index.\", ImplicitModificationWarning)\n"
+     "ename": "",
+     "evalue": "",
+     "output_type": "error",
+     "traceback": [
+      "\u001b[1;31mCannot execute code, session has been disposed. Please try restarting the Kernel. \n",
+      "\u001b[1;31mView Jupyter log for further details."
      ]
     }
    ],
diff --git a/pyproject.toml b/pyproject.toml
index b9f4a39..68fba82 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -39,7 +39,7 @@ optional-dependencies.dev = [
 ]
 optional-dependencies.doc = [
   "docutils>=0.8,!=0.18.*,!=0.19.*",
-  "ehrdata[lamin,vitessce]",
+  "ehrdata[torch,lamin,vitessce]",
   "ipykernel",
   "ipython",
   "myst-nb>=1.1",
@@ -66,9 +66,13 @@ optional-dependencies.lamin = [
 ]
 optional-dependencies.test = [
   "coverage",
-  "ehrdata[vitessce,lamin]",
+  "ehrdata[torch,vitessce,lamin]",
   "pytest",
 ]
+optional-dependencies.torch = [
+  "torch",
+]
+
 optional-dependencies.vitessce = [
   "vitessce[all]>=3.4", # the actual dependency
   "zarr<3",             # vitessce does not support zarr>=3
diff --git a/src/ehrdata/__init__.py b/src/ehrdata/__init__.py
index eb69f06..fb7769b 100644
--- a/src/ehrdata/__init__.py
+++ b/src/ehrdata/__init__.py
@@ -1,8 +1,8 @@
 from importlib.metadata import version
 
-from . import dt, io, pl
+from . import dt, io, pl, tl
 from .core import EHRData
 
-__all__ = ["EHRData", "dt", "io", "pl"]
+__all__ = ["EHRData", "dt", "io", "tl", "pl"]
 
 __version__ = version("ehrdata")
diff --git a/src/ehrdata/core/_optional_modules_import.py b/src/ehrdata/core/_optional_modules_import.py
new file mode 100644
index 0000000..aee28a4
--- /dev/null
+++ b/src/ehrdata/core/_optional_modules_import.py
@@ -0,0 +1,9 @@
+def lazy_import_torch():
+    try:
+        import torch
+
+        return torch
+    except ImportError:
+        raise ImportError(
+            "The optional module 'torch' is not installed. Please install it using 'pip install ehrdata[torch]'."
+        ) from None
diff --git a/src/ehrdata/dt/datasets.py b/src/ehrdata/dt/datasets.py
index 94a6e2e..2ddba6c 100644
--- a/src/ehrdata/dt/datasets.py
+++ b/src/ehrdata/dt/datasets.py
@@ -44,7 +44,7 @@ def _setup_eunomia_datasets(
 def mimic_iv_omop(backend_handle: DuckDBPyConnection, data_path: Path | None = None) -> None:
     """Loads the MIMIC-IV demo data in the OMOP Common Data model.
 
-    This function loads the MIMIC-IV demo dataset from its `physionet repository _` .
+    This function loads the MIMIC-IV demo dataset from its `physionet repository `_.
     See also this link for more details.
 
     DOI https://doi.org/10.13026/2d25-8g07.
@@ -85,7 +85,7 @@ def mimic_iv_omop(backend_handle: DuckDBPyConnection, data_path: Path | None = N
 def gibleed_omop(backend_handle: DuckDBPyConnection, data_path: Path | None = None) -> None:
     """Loads the GiBleed dataset in the OMOP Common Data model.
 
-    This function loads the GIBleed dataset from the `EunomiaDatasets repository _`.
+    This function loads the GIBleed dataset from the `EunomiaDatasets repository `_.
     More details: https://github.com/OHDSI/EunomiaDatasets/tree/main/datasets/GiBleed.
 
     Parameters
@@ -124,7 +124,7 @@ def gibleed_omop(backend_handle: DuckDBPyConnection, data_path: Path | None = No
 def synthea27nj_omop(backend_handle: DuckDBPyConnection, data_path: Path | None = None) -> None:
     """Loads the Synthea27Nj dataset in the OMOP Common Data model.
 
-    This function loads the Synthea27Nj dataset from the `EunomiaDatasets repository _`.
+    This function loads the Synthea27Nj dataset from the `EunomiaDatasets repository `_.
     More details: https://github.com/OHDSI/EunomiaDatasets/tree/main/datasets/Synthea27Nj.
 
     Parameters
@@ -186,13 +186,13 @@ def physionet2012(
         "142998",
     ],
 ) -> EHRData:
-    """Loads the dataset of the `PhysioNet challenge 2012 (v1.0.0) _`.
+    """Loads the dataset of the `PhysioNet challenge 2012 (v1.0.0) `_.
 
-    If interval_length_number is 1, interval_length_unit is "h" (hour), and num_intervals is 48, this is equivalent to the SAITS preprocessing (insert paper/link/citation).
+    If interval_length_number is 1, interval_length_unit is "h" (hour), and num_intervals is 48, this is equivalent to the `SAITS `_ preprocessing.
     Truncated if a sample has more num_intervals steps; Padded if a sample has less than num_intervals steps.
     Further, by default the following 12 samples are dropped since they have no time series information at all: 147514, 142731, 145611, 140501, 155655, 143656, 156254, 150309,
     140936, 141264, 150649, 142998.
-    Taken the defaults of interval_length_number, interval_length_unit, num_intervals, and drop_samples, the tensor stored in .r of edata is the same as when doing the PyPOTS  preprocessing.
+    Taken the defaults of interval_length_number, interval_length_unit, num_intervals, and drop_samples, the tensor stored in .r of edata is the same as when doing the `PyPOTS `_ preprocessing.
     A simple deviation is that the tensor in ehrdata is of shape n_obs x n_vars x n_intervals (with defaults, 3000x37x48) while the tensor in PyPOTS is of shape n_obs x n_intervals x n_vars (3000x48x37).
     The tensor stored in .r is hence also fully compatible with the PyPOTS package, as the .r tensor of EHRData objects generally is.
 
diff --git a/src/ehrdata/io/omop/_queries.py b/src/ehrdata/io/omop/_queries.py
index f1937c5..552bd3c 100644
--- a/src/ehrdata/io/omop/_queries.py
+++ b/src/ehrdata/io/omop/_queries.py
@@ -115,7 +115,7 @@ def _generate_value_query(data_table: str, data_field_to_keep: Sequence, aggrega
     return is_present_query + value_query
 
 
-def _time_interval_table(
+def _write_long_time_interval_table(
     backend_handle: duckdb.duckdb.DuckDBPyConnection,
     time_defining_table: str,
     data_table: str,
@@ -125,7 +125,7 @@ def _time_interval_table(
     aggregation_strategy: str,
     data_field_to_keep: Sequence[str] | str,
     keep_date: str = "",
-):
+) -> None:
     if isinstance(data_field_to_keep, str):
         data_field_to_keep = [data_field_to_keep]
 
@@ -139,6 +139,8 @@ def _time_interval_table(
         timedeltas_dataframe,
     )
 
+    create_long_table_query = "CREATE TABLE long_person_timestamp_feature_value AS\n"
+
     # multi-step query
     # 1. Create person_time_defining_table, which matches the one created for obs. Needs to contain the person_id, and the start date in particular.
     # 2. Create person_data_table (data_table is typically measurement), which contains the cross product of person_id and the distinct concept_id s.
@@ -196,10 +198,23 @@ def _time_interval_table(
         GROUP BY lfi.person_id, lfi.data_table_concept_id, interval_step, interval_start, interval_end
         """
 
-    query = prepare_alias_query + select_query
+    query = create_long_table_query + prepare_alias_query + select_query
 
-    df = backend_handle.execute(query).df()
+    backend_handle.execute("DROP TABLE IF EXISTS long_person_timestamp_feature_value")
+    backend_handle.execute(query)
 
-    _drop_timedeltas(backend_handle)
+    add_person_range_index_query = """
+        ALTER TABLE long_person_timestamp_feature_value
+        ADD COLUMN person_index INTEGER;
 
-    return df
+        WITH RankedPersons AS (
+            SELECT person_id,
+                ROW_NUMBER() OVER (ORDER BY person_id) - 1 AS idx
+            FROM (SELECT DISTINCT person_id FROM long_person_timestamp_feature_value) AS unique_persons
+        )
+        UPDATE long_person_timestamp_feature_value
+        SET person_index = RP.idx
+        FROM RankedPersons RP
+        WHERE long_person_timestamp_feature_value.person_id = RP.person_id;
+    """
+    backend_handle.execute(add_person_range_index_query)
diff --git a/src/ehrdata/io/omop/omop.py b/src/ehrdata/io/omop/omop.py
index 4561fca..a10f2ac 100644
--- a/src/ehrdata/io/omop/omop.py
+++ b/src/ehrdata/io/omop/omop.py
@@ -32,7 +32,7 @@
     _check_valid_observation_table,
     _check_valid_variable_data_tables,
 )
-from ehrdata.io.omop._queries import _time_interval_table
+from ehrdata.io.omop._queries import _write_long_time_interval_table
 
 DOWNLOAD_VERIFICATION_TAG = "download_verification_tag"
 
@@ -65,7 +65,7 @@ def _set_up_duckdb(path: Path, backend_handle: DuckDBPyConnection, prefix: str =
                 dtype = None
 
             # read raw csv as temporary table
-            temp_relation = backend_handle.read_csv(path / file_name, dtype=dtype, escapechar="%")  # noqa: F841
+            temp_relation = backend_handle.read_csv(path / file_name, dtype=dtype)  # noqa: F841
             backend_handle.execute("CREATE OR REPLACE TABLE temp_table AS SELECT * FROM temp_relation")
 
             # make query to create table with lowercase column names
@@ -96,26 +96,32 @@ def _set_up_duckdb(path: Path, backend_handle: DuckDBPyConnection, prefix: str =
     logging.info(f"unused files: {unused_files}")
 
 
-def _collect_units_per_feature(ds, unit_key="unit_concept_id") -> dict:
+def _collect_units_per_feature(backend_handle, unit_key="unit_concept_id") -> dict:
+    query = f"""
+    SELECT DISTINCT data_table_concept_id, {unit_key} FROM long_person_timestamp_feature_value
+    WHERE is_present = 1
+    """
+    result = backend_handle.execute(query).fetchall()
+
     feature_units = {}
-    for i in range(ds[unit_key].shape[1]):
-        single_feature_units = ds[unit_key].isel({ds[unit_key].dims[1]: i})
-        single_feature_units_flat = np.array(single_feature_units).flatten()
-        single_feature_units_unique = pd.unique(single_feature_units_flat[~pd.isna(single_feature_units_flat)])
-        feature_units[ds["data_table_concept_id"][i].item()] = single_feature_units_unique
+    for feature, unit in result:
+        if feature in feature_units:
+            feature_units[feature].append(unit)
+        else:
+            feature_units[feature] = [unit]
     return feature_units
 
 
-def _check_one_unit_per_feature(ds, unit_key="unit_concept_id") -> None:
-    feature_units = _collect_units_per_feature(ds, unit_key=unit_key)
+def _check_one_unit_per_feature(backend_handle, unit_key="unit_concept_id") -> None:
+    feature_units = _collect_units_per_feature(backend_handle, unit_key=unit_key)
     num_units = np.array([len(units) for _, units in feature_units.items()])
 
     # print(f"no units for features: {np.argwhere(num_units == 0)}")
-    print(f"multiple units for features: {np.argwhere(num_units > 1)}")
+    logging.warning(f"multiple units for features: {np.argwhere(num_units > 1)}")
 
 
-def _create_feature_unit_concept_id_report(backend_handle, ds) -> pd.DataFrame:
-    feature_units_concept = _collect_units_per_feature(ds, unit_key="unit_concept_id")
+def _create_feature_unit_concept_id_report(backend_handle) -> pd.DataFrame:
+    feature_units_concept = _collect_units_per_feature(backend_handle, unit_key="unit_concept_id")
 
     feature_units_long_format = []
     for feature, units in feature_units_concept.items():
@@ -257,12 +263,18 @@ def setup_variables(
     aggregation_strategy: str = "last",
     enrich_var_with_feature_info: bool = False,
     enrich_var_with_unit_info: bool = False,
+    instantiate_tensor: bool = True,
 ):
     """Setup the variables.
 
     This function sets up the variables for the EHRData object.
     It will fail if there is more than one unit_concept_id per feature.
     Writes a unit report of the features to edata.uns["unit_report_"].
+    Writes the setup arguments into edata.uns["omop_io_variable_setup"].
+
+    Stores a table named `long_person_timestamp_feature_value` in long format in the RDBMS.
+    This table is instantiated into edata.r if `instantiate_tensor` is set to True;
+    otherwise, the table is only stored in the RDBMS for later use.
 
     Parameters
     ----------
@@ -289,7 +301,9 @@ def setup_variables(
     enrich_var_with_feature_info
         Whether to enrich the var table with feature information. If a concept_id is not found in the concept table, the feature information will be NaN.
     enrich_var_with_unit_info
-        Whether to enrich the var table with unit information. Raises an Error if a) multiple units per feature are found for at least one feature. If a concept_id is not found in the concept table, the feature information will be NaN.
+        Whether to enrich the var table with unit information. Raises an Error if multiple units per feature are found for at least one feature. For entire missing data points, the units are ignored. For observed data points with missing unit information (NULL in either unit_concept_id or unit_source_value), the value NULL/NaN is considered a single unit.
+    instantiate_tensor
+        Whether to instantiate the tensor into the .r field of the EHRData object.
 
     Returns
     -------
@@ -331,27 +345,21 @@ def setup_variables(
         logging.warning(f"No data found in {data_tables[0]}. Returning edata without additional variables.")
         return edata
 
-    ds = (
-        _time_interval_table(
-            backend_handle=backend_handle,
-            time_defining_table=time_defining_table,
-            data_table=data_tables[0],
-            data_field_to_keep=data_field_to_keep,
-            interval_length_number=interval_length_number,
-            interval_length_unit=interval_length_unit,
-            num_intervals=num_intervals,
-            aggregation_strategy=aggregation_strategy,
-        )
-        .set_index(["person_id", "data_table_concept_id", "interval_step"])
-        .to_xarray()
+    _write_long_time_interval_table(
+        backend_handle=backend_handle,
+        time_defining_table=time_defining_table,
+        data_table=data_tables[0],
+        data_field_to_keep=data_field_to_keep,
+        interval_length_number=interval_length_number,
+        interval_length_unit=interval_length_unit,
+        num_intervals=num_intervals,
+        aggregation_strategy=aggregation_strategy,
     )
 
-    _check_one_unit_per_feature(ds)
-    # TODO ignore? go with more vanilla omop style. _check_one_unit_per_feature(ds, unit_key="unit_source_value")
-
-    unit_report = _create_feature_unit_concept_id_report(backend_handle, ds)
+    _check_one_unit_per_feature(backend_handle)
+    unit_report = _create_feature_unit_concept_id_report(backend_handle)
 
-    var = ds["data_table_concept_id"].to_dataframe()
+    var = backend_handle.execute("SELECT DISTINCT data_table_concept_id FROM long_person_timestamp_feature_value").df()
 
     if enrich_var_with_feature_info or enrich_var_with_unit_info:
         concepts = backend_handle.sql("SELECT * FROM concept").df()
@@ -381,9 +389,19 @@ def setup_variables(
                 suffixes=("", "_unit"),
             )
 
-    t = ds["interval_step"].to_dataframe()
+    t = pd.DataFrame({"interval_step": np.arange(num_intervals)})
 
-    edata = EHRData(r=ds[data_field_to_keep[0]].values, obs=edata.obs, var=var, uns=edata.uns, t=t)
+    if instantiate_tensor:
+        ds = (
+            (backend_handle.execute("SELECT * FROM long_person_timestamp_feature_value").df())
+            .set_index(["person_id", "data_table_concept_id", "interval_step"])
+            .to_xarray()
+        )
+
+    else:
+        ds = None
+
+    edata = EHRData(r=ds[data_field_to_keep[0]].values if ds else None, obs=edata.obs, var=var, uns=edata.uns, t=t)
     edata.uns[f"unit_report_{data_tables[0]}"] = unit_report
 
     return edata
@@ -403,6 +421,7 @@ def setup_interval_variables(
     enrich_var_with_feature_info: bool = False,
     enrich_var_with_unit_info: bool = False,
     keep_date: Literal["start", "end", "interval"] = "start",
+    instantiate_tensor: bool = True,
 ):
     """Setup the interval variables
 
@@ -436,6 +455,8 @@ def setup_interval_variables(
         Whether to enrich the var table with feature information. If a concept_id is not found in the concept table, the feature information will be NaN.
     date_type
         Whether to keep the start or end date, or the interval span.
+    instantiate_tensor
+        Whether to instantiate the tensor into the .r field of the EHRData object.
 
     Returns
     -------
@@ -466,23 +487,26 @@ def setup_interval_variables(
         logging.warning(f"No data in {data_tables}.")
         return edata
 
+    _write_long_time_interval_table(
+        backend_handle=backend_handle,
+        time_defining_table=time_defining_table,
+        data_table=data_tables[0],
+        data_field_to_keep=data_field_to_keep,
+        interval_length_number=interval_length_number,
+        interval_length_unit=interval_length_unit,
+        num_intervals=num_intervals,
+        aggregation_strategy=aggregation_strategy,
+        keep_date=keep_date,
+    )
+
     ds = (
-        _time_interval_table(
-            backend_handle=backend_handle,
-            time_defining_table=time_defining_table,
-            data_table=data_tables[0],
-            data_field_to_keep=data_field_to_keep,
-            interval_length_number=interval_length_number,
-            interval_length_unit=interval_length_unit,
-            num_intervals=num_intervals,
-            aggregation_strategy=aggregation_strategy,
-            keep_date=keep_date,
-        )
+        backend_handle.execute("SELECT * FROM long_person_timestamp_feature_value")
+        .df()
         .set_index(["person_id", "data_table_concept_id", "interval_step"])
         .to_xarray()
     )
 
-    var = ds["data_table_concept_id"].to_dataframe()
+    var = backend_handle.execute("SELECT DISTINCT data_table_concept_id FROM long_person_timestamp_feature_value").df()
 
     if enrich_var_with_feature_info or enrich_var_with_unit_info:
         concepts = backend_handle.sql("SELECT * FROM concept").df()
@@ -491,7 +515,7 @@ def setup_interval_variables(
     if enrich_var_with_feature_info:
         var = pd.merge(var, concepts, how="left", left_index=True, right_on="concept_id")
 
-    t = ds["interval_step"].to_dataframe()
+    t = pd.DataFrame({"interval_step": np.arange(num_intervals)})
 
     edata = EHRData(r=ds[data_field_to_keep[0]].values, obs=edata.obs, var=var, uns=edata.uns, t=t)
 
diff --git a/src/ehrdata/tl/__init__.py b/src/ehrdata/tl/__init__.py
new file mode 100644
index 0000000..1c16543
--- /dev/null
+++ b/src/ehrdata/tl/__init__.py
@@ -0,0 +1 @@
+from . import omop
diff --git a/src/ehrdata/tl/omop/__init__.py b/src/ehrdata/tl/omop/__init__.py
new file mode 100644
index 0000000..4ba6ce9
--- /dev/null
+++ b/src/ehrdata/tl/omop/__init__.py
@@ -0,0 +1 @@
+from ._dataset import EHRDataset
diff --git a/src/ehrdata/tl/omop/_dataset.py b/src/ehrdata/tl/omop/_dataset.py
new file mode 100644
index 0000000..723ab5b
--- /dev/null
+++ b/src/ehrdata/tl/omop/_dataset.py
@@ -0,0 +1,124 @@
+from collections.abc import Sequence
+from typing import TYPE_CHECKING, Literal
+
+from duckdb.duckdb import DuckDBPyConnection
+
+from ehrdata.core._optional_modules_import import lazy_import_torch
+from ehrdata.io.omop._queries import DATA_TABLE_DATE_KEYS
+
+torch = lazy_import_torch()
+
+if TYPE_CHECKING:
+    import torch
+
+
+class EHRDataset(torch.utils.data.Dataset):
+    def __init__(
+        self,
+        con: DuckDBPyConnection,
+        edata,
+        batch_size: int = 10,
+        target: Literal["mortality"] = "mortality",
+        datetime: bool = True,
+        idxs: Sequence[int] | None = None,
+    ) -> torch.utils.data.Dataset:
+        """Return a torch.utils.data.Dataset object for EHR data.
+
+        This function builds a torch.utils.data.Dataset object for EHR data. The EHR data is assumed to be in the OMOP CDM format.
+        It is a Dataset structure for the tensor in ehrdata.r, in a suitable format for pytorch.utils.data.DataLoader.
+        This allows to stream the data in batches from the RDBMS, not requiring to load the entire dataset in memory.
+
+        Parameters
+        ----------
+        con
+            The connection to the database.
+        edata
+            The EHRData object.
+        batch_size
+            The batch size.
+        target
+            The target variable to be used.
+        datetime
+            If True, use datetime, if False, use date.
+        idxs
+            The indices of the patients to be used, can be used to include only a subset of the data, for e.g. train-test splits.
+            The observation table to be used.
+
+        Returns
+        -------
+        EHRDataset
+            A torch.utils.data.Dataset object of the .r tensor in ehrdata.
+        """
+        super().__init__()
+        self.con = con
+        self.edata = edata
+        self.target = target
+        self.datetime = datetime
+        self.idxs = idxs
+
+        self.n_timesteps = con.execute(
+            "SELECT COUNT(DISTINCT interval_step) FROM long_person_timestamp_feature_value"
+        ).fetchone()[0]
+        self.n_variables = con.execute(
+            "SELECT COUNT(DISTINCT data_table_concept_id) FROM long_person_timestamp_feature_value"
+        ).fetchone()[0]
+
+    def __len__(self):
+        if self.idxs:
+            where_clause = f"WHERE person_id IN ({','.join(str(_) for _ in self.idxs)})"
+        else:
+            where_clause = ""
+        query = f"""
+            SELECT COUNT(DISTINCT person_id)
+            FROM long_person_timestamp_feature_value
+            {where_clause}
+        """
+        return self.con.execute(query).fetchone()[0]
+
+    def __getitem__(self, person_index):
+        person_id_query = (
+            f"SELECT DISTINCT person_id FROM long_person_timestamp_feature_value WHERE person_index = {person_index}"
+        )
+        person_id = self.con.execute(person_id_query).fetchone()[0]
+        where_clause = f"WHERE person_index = {person_index}"
+
+        if self.idxs:
+            where_clause += f" AND person_index IN ({','.join(str(_) for _ in self.idxs)})"
+
+        query = f"""
+            SELECT person_index, data_table_concept_id, interval_step, COALESCE(CAST(value_as_number AS DOUBLE), 'NaN') AS value_as_number
+            FROM long_person_timestamp_feature_value
+            {where_clause}
+        """
+
+        long_format_data = torch.tensor(self.con.execute(query).fetchall(), dtype=torch.float32)
+
+        # convert long format to 3D tensor
+        feature_ids, feature_idx = torch.unique(long_format_data[:, 1], return_inverse=True)
+        step_ids, step_idx = torch.unique(long_format_data[:, 2], return_inverse=True)
+
+        result = torch.zeros(len(feature_ids), len(step_ids))
+        values = long_format_data[:, 3]
+        result.index_put_((feature_idx, step_idx), values)
+
+        if self.target != "mortality":
+            raise NotImplementedError(f"Target {self.target} is not implemented")
+
+        # If person has an entry in the death table that is within visit_start_datetime and visit_end_datetime of the visit_occurrence table, report 1, else 0:
+        # Left join ensures that for every patient, 0 or 1 is obtained
+        omop_io_observation_table = self.edata.uns["omop_io_observation_table"]
+        time_postfix = "time" if self.datetime else ""
+        target_query = f"""
+        SELECT
+            CASE
+                WHEN death_datetime BETWEEN {DATA_TABLE_DATE_KEYS["start"][omop_io_observation_table]}{time_postfix} AND {DATA_TABLE_DATE_KEYS["end"][omop_io_observation_table]}{time_postfix} THEN 1
+                ELSE 0
+            END AS mortality
+        FROM {self.edata.uns["omop_io_observation_table"]}
+        LEFT JOIN death USING (person_id)
+        WHERE person_id = {person_id} AND {omop_io_observation_table}_id = {self.edata.obs[self.edata.obs["person_id"] == person_id][f"{omop_io_observation_table}_id"].item()}
+        """
+
+        targets = torch.tensor(self.con.execute(target_query).fetchall(), dtype=torch.float32)
+
+        return result, targets
diff --git a/tests/conftest.py b/tests/conftest.py
index a42fcb1..4930316 100644
--- a/tests/conftest.py
+++ b/tests/conftest.py
@@ -4,7 +4,7 @@
 from ehrdata.io.omop import setup_connection
 
 
-@pytest.fixture  # (scope="session")
+@pytest.fixture
 def omop_connection_vanilla():
     con = duckdb.connect()
     setup_connection(path="tests/data/toy_omop/vanilla", backend_handle=con)
@@ -12,7 +12,7 @@ def omop_connection_vanilla():
     con.close()
 
 
-@pytest.fixture  # (scope="session")
+@pytest.fixture
 def omop_connection_capital_letters():
     con = duckdb.connect()
     setup_connection(path="tests/data/toy_omop/capital_letters", backend_handle=con)
@@ -20,9 +20,17 @@ def omop_connection_capital_letters():
     con.close()
 
 
-@pytest.fixture  # (scope="session")
+@pytest.fixture
 def omop_connection_empty_observation():
     con = duckdb.connect()
     setup_connection(path="tests/data/toy_omop/empty_observation", backend_handle=con)
     yield con
     con.close()
+
+
+@pytest.fixture
+def omop_connection_multiple_units():
+    con = duckdb.connect()
+    setup_connection(path="tests/data/toy_omop/multiple_units", backend_handle=con)
+    yield con
+    con.close()
diff --git a/tests/data/toy_omop/multiple_units/observation.csv b/tests/data/toy_omop/multiple_units/observation.csv
new file mode 100644
index 0000000..82066fe
--- /dev/null
+++ b/tests/data/toy_omop/multiple_units/observation.csv
@@ -0,0 +1,5 @@
+observation_id,person_id,observation_concept_id,observation_date,observation_datetime,observation_type_concept_id,value_as_number,value_as_string,value_as_concept_id,qualifier_concept_id,unit_concept_id,provider_id,visit_occurrence_id,visit_detail_id,observation_source_value,observation_source_concept_id,unit_source_value,qualifier_source_value
+1,1,3001062,2100-01-01,2100-01-01 12:00:00,32817,,Anemia,0,,8587,,,,225059,2000030108,mL,
+2,1,3001062,2100-01-02,2100-01-02 12:00:00,32817,,Anemia,0,,9665,,,,225059,2000030108,uL,
+3,1,3034263,2100-01-01,2100-01-01 12:00:00,32817,5,,,,8587,,,,224409,2000030058,mL,
+4,1,3034263,2100-01-02,2100-01-02 12:00:00,32817,5,,,,9665,,,,224409,2000030058,uL,
diff --git a/tests/data/toy_omop/multiple_units/observation_period.csv b/tests/data/toy_omop/multiple_units/observation_period.csv
new file mode 100644
index 0000000..40b7351
--- /dev/null
+++ b/tests/data/toy_omop/multiple_units/observation_period.csv
@@ -0,0 +1,2 @@
+observation_period_id,person_id,observation_period_start_date,observation_period_end_date,period_type_concept_id
+1,1,2100-01-01,2100-01-31,32828
diff --git a/tests/data/toy_omop/multiple_units/person.csv b/tests/data/toy_omop/multiple_units/person.csv
new file mode 100644
index 0000000..0f13db9
--- /dev/null
+++ b/tests/data/toy_omop/multiple_units/person.csv
@@ -0,0 +1,2 @@
+person_id,gender_concept_id,year_of_birth,month_of_birth,day_of_birth,birth_datetime,race_concept_id,ethnicity_concept_id,location_id,provider_id,care_site_id,person_source_value,gender_source_value,gender_source_concept_id,race_source_value,race_source_concept_id,ethnicity_source_value,ethnicity_source_concept_id
+1,8507,2095,,,,0,38003563,,,,1234,M,0,,,,
diff --git a/tests/test_io/test_omop.py b/tests/test_io/test_omop.py
index ac426e7..0b42e51 100644
--- a/tests/test_io/test_omop.py
+++ b/tests/test_io/test_omop.py
@@ -821,3 +821,21 @@ def test_empty_observation(omop_connection_empty_observation, caplog):
     )
     assert edata.shape == (1, 0)
     assert "No data found in observation. Returning edata without additional variables." in caplog.text
+
+
+def test_multiple_units(omop_connection_multiple_units, caplog):
+    con = omop_connection_multiple_units
+    edata = ed.io.omop.setup_obs(backend_handle=con, observation_table="person_observation_period")
+    edata = ed.io.omop.setup_variables(
+        edata,
+        backend_handle=con,
+        data_tables=["observation"],
+        data_field_to_keep=["value_as_number"],
+        interval_length_number=1,
+        interval_length_unit="day",
+        num_intervals=2,
+        enrich_var_with_feature_info=False,
+        enrich_var_with_unit_info=False,
+    )
+    # assert edata.shape == (1, 0)
+    assert "multiple units for features: [[0]\n [1]]\n" in caplog.text
diff --git a/tests/test_tl/test_ehrdataset.py b/tests/test_tl/test_ehrdataset.py
new file mode 100644
index 0000000..3137b6c
--- /dev/null
+++ b/tests/test_tl/test_ehrdataset.py
@@ -0,0 +1,34 @@
+import torch
+
+import ehrdata as ed
+
+
+def test_ehrdataset_vanilla(omop_connection_vanilla):
+    num_intervals = 3
+    batch_size = 2
+    con = omop_connection_vanilla
+
+    edata = ed.io.omop.setup_obs(con, observation_table="person_observation_period", death_table=True)
+    edata = ed.io.omop.setup_variables(
+        edata,
+        backend_handle=con,
+        data_tables="measurement",
+        data_field_to_keep="value_as_number",
+        interval_length_number=1,
+        interval_length_unit="day",
+        num_intervals=num_intervals,
+        enrich_var_with_feature_info=False,
+        enrich_var_with_unit_info=False,
+        instantiate_tensor=False,
+    )
+
+    ehr_dataset = ed.tl.omop.EHRDataset(con, edata, batch_size=batch_size, datetime=False, idxs=None)
+    assert isinstance(ehr_dataset, torch.utils.data.Dataset)
+    single_item = next(iter(ehr_dataset))
+    assert single_item[0].shape == (2, num_intervals)
+    assert len(single_item[1]) == 1
+
+    loader = torch.utils.data.DataLoader(ehr_dataset, batch_size=batch_size)
+    batch = next(iter(loader))
+    assert batch[0].shape == (batch_size, 2, num_intervals)
+    assert len(batch[1]) == batch_size