Skip to content
This repository has been archived by the owner on Sep 11, 2024. It is now read-only.

Commit

Permalink
[refactor] Unify ds query (#39)
Browse files Browse the repository at this point in the history
* Update Query interface

* Update SDK version

* Fix lint issue

* WIP gosum

* Bump sdk-datasource

* Fix bug and test with unified query

* Fix e2e tests with new unified query output

* updated api description and other small adjustments

* updated sdk

---------

Co-authored-by: Francesco Marino <[email protected]>
  • Loading branch information
romainbou and f-marino authored Mar 19, 2023
1 parent 79a1d7e commit edf7890
Show file tree
Hide file tree
Showing 11 changed files with 167 additions and 48 deletions.
35 changes: 30 additions & 5 deletions API.md
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,17 @@ Retrieve patient IDs from i2b2 based on explore query terms.
}]
}],
"timing": "any|samevisit|sameinstancenum"
}
},
"outputDataObjectsSharedIDs": [
{
"name": "count",
"sharedID": "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx"
},
{
"name": "patientList",
"sharedID": "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxy"
}
]
}
```

Expand Down Expand Up @@ -241,6 +251,7 @@ Retrieve patient IDs from i2b2 based on explore query terms.
- `EQUAL`
- `GREATEREQUAL`
- `GREATER`
- `outputDataObjectsSharedIDs`: the mapping between the names of the dataobjects generated by the operation and the related shared IDs

## Output Data Objects Shared IDs
- `count`: integer containing the count of patients
Expand Down Expand Up @@ -292,8 +303,8 @@ Retrieve the list of saved cohorts.
- `error`: query has errored
- `definition`: definition of the query (see above for syntax)
- `outputDataObjectsSharedIDs`:
- `Count`: data object shared ID of the count
- `PatientList`: data object shared ID of the patient list
- `count`: data object shared ID of the count
- `patientList`: data object shared ID of the patient list

# addCohort
Add a cohort.
Expand Down Expand Up @@ -355,6 +366,12 @@ Run survival query.

}
}
],
"outputDataObjectsSharedIDs": [
{
"name": "survivalQueryResult",
"sharedID": "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx"
}
]
}
```
Expand All @@ -371,6 +388,7 @@ Run survival query.
- `subGroupsDefinitions`: subgroups definitions
- `name`: name of the subgroup
- `constraint`: `definition` as defined in exploreQuery parameters
- `outputDataObjectsSharedIDs`: the mapping between the names of the dataobjects generated by the operation and the related shared IDs

## Output Data Objects Shared IDs
- `survivalQueryResult`: vector of integers containing the flattened event groups
Expand All @@ -395,15 +413,22 @@ Run statistics query.

],
"bucketSize": 1.5,
"minObservations": 2
"minObservations": 2,
"outputDataObjectsSharedIDs": [
{
"name": "statisticsQueryResult",
"sharedID": "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx"
}
]
}
```

- `id`: ID of the statistics query, must be an UUID
- `constraint`: `definition` as defined in exploreQuery parameters
- `analytes`: the concepts (see `conceptItems` in "exploreQuery") used as analytes of the statistics query
- `bucketSize`: bucket size for each analyte (float64)
- `minObservations`: the minimum observation value for each analyte.
- `minObservations`: the minimum observation value for each analyte
- `outputDataObjectsSharedIDs`: the mapping between the names of the dataobjects generated by the operation and the related shared IDs

## Output Data Objects Shared IDs
- `statisticsQueryResult`: matrix of integers containing the number of observations for each analyte and each bucket.
Expand Down
2 changes: 1 addition & 1 deletion deployments/i2b2.yml
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ services:
intra-node:

postgresql:
image: postgres:14.2
image: postgres:14.5-alpine
ports:
- "5433:5432"
environment:
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ require (
github.com/lib/pq v1.10.7
github.com/sirupsen/logrus v1.9.0
github.com/stretchr/testify v1.8.2
github.com/tuneinsight/sdk-datasource v0.0.0-20230303164329-ca4a5c4d52cc
github.com/tuneinsight/sdk-datasource v0.0.0-20230316155502-58f2858c050c
go.opentelemetry.io/otel v1.14.0
go.opentelemetry.io/otel/trace v1.14.0
)
Expand Down
6 changes: 4 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -167,8 +167,10 @@ github.com/stretchr/testify v1.8.2 h1:+h33VjcLVPDHtOdpUCuF+7gSuG3yGIftsP1YvFihtJ
github.com/stretchr/testify v1.8.2/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4=
github.com/tidwall/pretty v1.0.0 h1:HsD+QiTn7sK6flMKIvNmpqz1qrpP3Ps6jOKIKMooyg4=
github.com/tidwall/pretty v1.0.0/go.mod h1:XNkn88O1ChpSDQmQeStsy+sBenx6DDtFZJxhVysOjyk=
github.com/tuneinsight/sdk-datasource v0.0.0-20230303164329-ca4a5c4d52cc h1:uzxBZ39BSgnIXGnRke2xRvRnTa0q/t17qxyWv8MA9Fw=
github.com/tuneinsight/sdk-datasource v0.0.0-20230303164329-ca4a5c4d52cc/go.mod h1:3qkeXpB+3QnoC5okQICtdb141hpg3BBGcUdMd4fDLdU=
github.com/tuneinsight/sdk-datasource v0.0.0-20230306185928-2bb2a9231383 h1:IkvJIw0lEZUuQW+vcKWl49oK3RLto8hw6/RcpTJnuBU=
github.com/tuneinsight/sdk-datasource v0.0.0-20230306185928-2bb2a9231383/go.mod h1:3qkeXpB+3QnoC5okQICtdb141hpg3BBGcUdMd4fDLdU=
github.com/tuneinsight/sdk-datasource v0.0.0-20230316155502-58f2858c050c h1:EtSun9qp5RxJ7QcHWfNUdT02gjB5ZOnVnP0UxAcFXjY=
github.com/tuneinsight/sdk-datasource v0.0.0-20230316155502-58f2858c050c/go.mod h1:3qkeXpB+3QnoC5okQICtdb141hpg3BBGcUdMd4fDLdU=
github.com/xdg-go/pbkdf2 v1.0.0/go.mod h1:jrpuAogTd400dnrH08LKmI/xc1MbPOebTwRqcT5RDeI=
github.com/xdg-go/scram v1.0.2/go.mod h1:1WAq6h33pAW+iRreB34OORO2Nf7qel3VV3fjBj+hCSs=
github.com/xdg-go/scram v1.1.1/go.mod h1:RaEWvsqvNKKvBPvcKeFjrG2cJqOkHTiyTpzz23ni57g=
Expand Down
50 changes: 42 additions & 8 deletions pkg/datasource/datasource.go
Original file line number Diff line number Diff line change
Expand Up @@ -243,13 +243,43 @@ func (ds *I2b2DataSource) GetDataSourceCustomData() map[string]interface{} {
}

// Query implements the data source interface Query function.
func (ds *I2b2DataSource) Query(userID string, operation string, jsonParameters []byte, outputDataObjectsSharedIDs map[sdk.OutputDataObjectName]sdkmodels.DataObjectSharedID) (jsonResults []byte, outputDataObjects []sdk.DataObject, err error) {
ds.logger.Infof("executing operation %v for user %v", operation, userID)
ds.logger.Debugf("parameters: %v", string(jsonParameters))
func (ds *I2b2DataSource) Query(userID string, params map[string]interface{}, resultKeys ...string) (map[string]interface{}, error) {
operation, ok := params["operation"].(string)
if !ok {
return nil, fmt.Errorf("operation not specified")
}

jsonParams, ok := params["params"].([]byte)
if !ok {
stringParams, ok := params["params"].(string)
if !ok {
return nil, fmt.Errorf("params not specified")
}
jsonParams = []byte(stringParams)
}

// Get outputDataObjectsSharedIDs from the jsonParams
var unmarshaledJSON map[string]interface{}
if err := json.Unmarshal(jsonParams, &unmarshaledJSON); err != nil {
return nil, err
}

var outputDataObjectsSharedIDs map[sdk.OutputDataObjectName]sdkmodels.DataObjectSharedID = make(map[sdk.OutputDataObjectName]sdkmodels.DataObjectSharedID)
if params["outputDataObjectsSharedIDs"] != nil {
outputDataObjectsSharedIDs = params["outputDataObjectsSharedIDs"].(map[sdk.OutputDataObjectName]sdkmodels.DataObjectSharedID)
}
if unmarshaledJSON["outputDataObjectsSharedIDs"] != nil {
for k, v := range unmarshaledJSON["outputDataObjectsSharedIDs"].(map[string]interface{}) {
outputDataObjectsSharedIDs[sdk.OutputDataObjectName(k)] = sdkmodels.DataObjectSharedID(v.(string))
}
}

span := telemetry.StartSpan(ds.Ctx, "datasource:i2b2", "Query:"+operation)
defer span.End()

ds.logger.Infof("executing operation %v for user %v", operation, userID)
ds.logger.Debugf("parameters: %v", string(jsonParams))

var handler OperationHandler
switch Operation(operation) {
case OperationSearchConcept:
Expand All @@ -272,16 +302,20 @@ func (ds *I2b2DataSource) Query(userID string, operation string, jsonParameters
handler = ds.StatisticsQueryHandler

default:
return nil, nil, ds.logError(fmt.Sprintf("unknown query requested (%v)", operation), nil)
return nil, ds.logError(fmt.Sprintf("unknown operation requested (%v)", operation), nil)
}

if jsonResults, outputDataObjects, err = handler(userID, jsonParameters, outputDataObjectsSharedIDs); err != nil {
return nil, nil, ds.logError(fmt.Sprintf("executing operation %v", operation), err)
jsonResults, outputDataObjects, err := handler(userID, jsonParams, outputDataObjectsSharedIDs)
if err != nil {
return nil, ds.logError(fmt.Sprintf("executing operation %v", operation), err)
}

ds.logger.Infof("successfully executed operation %v for user %v", operation, userID)
ds.logger.Debugf("results: %v", string(jsonResults))
return

results := make(map[string]interface{})
results[sdk.DefaultResultKey] = jsonResults
results[sdk.OutputDataObjectsKey] = outputDataObjects
return results, nil
}

// logError creates and logs an error.
Expand Down
83 changes: 58 additions & 25 deletions pkg/datasource/datasource_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package datasource

import (
"encoding/json"
"fmt"
"os"
"testing"
Expand All @@ -9,6 +10,7 @@ import (
"github.com/stretchr/testify/require"
"github.com/tuneinsight/geco-i2b2-data-source/pkg/datasource/database"
gecomodels "github.com/tuneinsight/sdk-datasource/pkg/models"
"github.com/tuneinsight/sdk-datasource/pkg/sdk"
gecosdk "github.com/tuneinsight/sdk-datasource/pkg/sdk"
"github.com/tuneinsight/sdk-datasource/pkg/sdk/credentials"
)
Expand Down Expand Up @@ -62,7 +64,8 @@ func TestQuery(t *testing.T) {
defer dataSourceCleanUp(t, ds)

params := `{"path": "/", "operation": "children"}`
res, _, err := ds.Query("testUser", "searchConcept", []byte(params), nil)
results, err := ds.Query("testUser", map[string]interface{}{sdk.QueryOperation: "searchConcept", sdk.QueryParams: params})
res := results[sdk.DefaultResultKey].([]byte)
require.NoError(t, err)
t.Logf("result: %v", string(res))
}
Expand All @@ -71,12 +74,31 @@ func TestQueryDataObject(t *testing.T) {
ds := getDataSource(t)
defer dataSourceCleanUp(t, ds)

params := `{"id": "99999999-9999-1122-0000-999999999999", "patientList": true, "definition": {"selectionPanels": [{"conceptItems": [{"queryTerm": "/TEST/test/1/"}]}]}}`
sharedIDs := map[gecosdk.OutputDataObjectName]gecomodels.DataObjectSharedID{
outputNameExploreQueryCount: "99999999-9999-9999-1111-999999999999",
outputNameExploreQueryPatientList: "99999999-9999-9999-0000-999999999999",
}
res, do, err := ds.Query("testUser", "exploreQuery", []byte(params), sharedIDs)

jsonSharedIDs, _ := json.Marshal(sharedIDs)

params := `{
"id": "99999999-9999-1122-0000-999999999999",
"patientList": true,
"definition": {
"selectionPanels": [{
"conceptItems": [{
"queryTerm": "/TEST/test/1/"
}]
}]
},
"outputDataObjectsSharedIDs": ` + string(jsonSharedIDs) + `
}`

results, err := ds.Query("testUser", map[string]interface{}{sdk.QueryOperation: "exploreQuery", sdk.QueryParams: params})
res := results[sdk.DefaultResultKey].([]byte)
require.NoError(t, err)
do := results[sdk.OutputDataObjectsKey].([]sdk.DataObject)

require.NoError(t, err)

require.EqualValues(t, 3, *do[0].IntValue)
Expand Down Expand Up @@ -122,30 +144,40 @@ func testWorkflow(t *testing.T, ds *I2b2DataSource) {

// search the ontology by browsing it
params := `{"path": "/", "operation": "children"}`
res, do, err := ds.Query(user, "searchConcept", []byte(params), nil)
results, err := ds.Query(user, map[string]interface{}{sdk.QueryOperation: "searchConcept", sdk.QueryParams: params})
res := results[sdk.DefaultResultKey].([]byte)
require.NoError(t, err)
require.Empty(t, do)

require.Contains(t, string(res), "Test Ontology")

params = `{"path": "/TEST/test/", "operation": "children"}`
res, do, err = ds.Query(user, "searchConcept", []byte(params), nil)
results, err = ds.Query(user, map[string]interface{}{sdk.QueryOperation: "searchConcept", sdk.QueryParams: params})
res = results[sdk.DefaultResultKey].([]byte)
require.NoError(t, err)
require.Empty(t, do)

require.Contains(t, string(res), "Concept 1")

params = `{"path": "/TEST/test/1/", "operation": "concept"}`
res, do, err = ds.Query(user, "searchModifier", []byte(params), nil)
results, err = ds.Query(user, map[string]interface{}{sdk.QueryOperation: "searchModifier", sdk.QueryParams: params})
res = results[sdk.DefaultResultKey].([]byte)
require.NoError(t, err)
require.Empty(t, do)

require.Contains(t, string(res), "Modifier 1")

// OR search the ontology by searching for a specific item.
params = `{"searchString": "Modifier 1", "limit": "10"}`
res, do, err = ds.Query(user, string(OperationSearchOntology), []byte(params), nil)
results, err = ds.Query(user, map[string]interface{}{sdk.QueryOperation: string(OperationSearchOntology), sdk.QueryParams: params})
res = results[sdk.DefaultResultKey].([]byte)
require.NoError(t, err)
require.Empty(t, do)

require.Contains(t, string(res), "Modifier 1")

sharedIDs := map[gecosdk.OutputDataObjectName]gecomodels.DataObjectSharedID{
outputNameExploreQueryCount: "99999999-9999-9999-1111-999999999999",
outputNameExploreQueryPatientList: "99999999-9999-9999-0000-999999999999",
}
jsonSharedIDs, _ := json.Marshal(sharedIDs)

// execute query
queryID := "99999999-9999-9999-9999-999999999999"
params = fmt.Sprintf(`{
Expand Down Expand Up @@ -182,14 +214,14 @@ func testWorkflow(t *testing.T, ds *I2b2DataSource) {
]
}
]
}
},
"outputDataObjectsSharedIDs": `+string(jsonSharedIDs)+`
}`, queryID)
sharedIDs := map[gecosdk.OutputDataObjectName]gecomodels.DataObjectSharedID{
outputNameExploreQueryCount: "99999999-9999-9999-1111-999999999999",
outputNameExploreQueryPatientList: "99999999-9999-9999-0000-999999999999",
}
_, do, err = ds.Query(user, "exploreQuery", []byte(params), sharedIDs)

results, err = ds.Query(user, map[string]interface{}{sdk.QueryOperation: "exploreQuery", sdk.QueryParams: params})
require.NoError(t, err)

do := results[sdk.OutputDataObjectsKey].([]sdk.DataObject)
require.EqualValues(t, 2, len(do))
for i := range do {
if do[i].OutputName == outputNameExploreQueryCount {
Expand All @@ -202,29 +234,30 @@ func testWorkflow(t *testing.T, ds *I2b2DataSource) {
}

// save cohort

projectID := "99999999-9999-9999-1111-999999999999"
params = fmt.Sprintf(`{"name": "mycohort", "exploreQueryID": "%s", "projectID": "%s"}`, queryID, projectID)
res, do, err = ds.Query(user, "addCohort", []byte(params), nil)
results, err = ds.Query(user, map[string]interface{}{sdk.QueryOperation: "addCohort", sdk.QueryParams: params})
res = results[sdk.DefaultResultKey].([]byte)
require.NoError(t, err)
require.Empty(t, do)
require.EqualValues(t, "", string(res))

params = fmt.Sprintf(`{"projectID": "%s"}`, projectID)
res, do, err = ds.Query(user, "getCohorts", []byte(params), nil)
results, err = ds.Query(user, map[string]interface{}{sdk.QueryOperation: "getCohorts", sdk.QueryParams: params})
res = results[sdk.DefaultResultKey].([]byte)
require.NoError(t, err)
require.Empty(t, do)
require.Contains(t, string(res), "mycohort")

params = fmt.Sprintf(`{"name": "mycohort", "exploreQueryID": "%s", "projectID": "%s"}`, queryID, projectID)
res, do, err = ds.Query(user, "deleteCohort", []byte(params), nil)
results, err = ds.Query(user, map[string]interface{}{sdk.QueryOperation: "deleteCohort", sdk.QueryParams: params})
res = results[sdk.DefaultResultKey].([]byte)
require.NoError(t, err)
require.Empty(t, do)
require.EqualValues(t, "", string(res))

params = fmt.Sprintf(`{"projectID": "%s", "limit": 7}`, projectID)
res, do, err = ds.Query(user, "getCohorts", []byte(params), nil)
results, err = ds.Query(user, map[string]interface{}{sdk.QueryOperation: "getCohorts", sdk.QueryParams: params})
res = results[sdk.DefaultResultKey].([]byte)
require.NoError(t, err)
require.Empty(t, do)
require.NotContains(t, string(res), "mycohort")

}
8 changes: 6 additions & 2 deletions pkg/datasource/explore_query_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package datasource

import (
"encoding/json"
"fmt"
"strconv"
"testing"
Expand All @@ -9,6 +10,7 @@ import (
"github.com/stretchr/testify/require"
"github.com/tuneinsight/geco-i2b2-data-source/pkg/datasource/models"
gecomodels "github.com/tuneinsight/sdk-datasource/pkg/models"
"github.com/tuneinsight/sdk-datasource/pkg/sdk"
gecosdk "github.com/tuneinsight/sdk-datasource/pkg/sdk"
)

Expand Down Expand Up @@ -302,12 +304,14 @@ func TestExploreQueryDatabase(t *testing.T) {
countSharedID := "44444444-7777-8888-4444-444444444444"
patientListSharedID := "44444444-7777-4444-7121-444444444444"

params := fmt.Sprintf(`{"id": "%v", "definition": {"selectionPanels": [{"conceptItems": [{"queryTerm": "/TEST/test/1/"}]}]}}`, queryID)
sharedIDs := map[gecosdk.OutputDataObjectName]gecomodels.DataObjectSharedID{
outputNameExploreQueryCount: gecomodels.DataObjectSharedID(countSharedID),
outputNameExploreQueryPatientList: gecomodels.DataObjectSharedID(patientListSharedID),
}
_, _, err := ds.Query("testUser", "exploreQuery", []byte(params), sharedIDs)
jsonSharedIDs, _ := json.Marshal(sharedIDs)

params := fmt.Sprintf(`{"id": "%v", "definition": {"selectionPanels": [{"conceptItems": [{"queryTerm": "/TEST/test/1/"}]}]},"outputDataObjectsSharedIDs": `+string(jsonSharedIDs)+`}`, queryID)
_, err := ds.Query("testUser", map[string]interface{}{sdk.QueryOperation: "exploreQuery", sdk.QueryParams: params})
require.NoError(t, err)

query, err := ds.db.GetExploreQuery("testUser", queryID)
Expand Down
Loading

0 comments on commit edf7890

Please sign in to comment.