Skip to content
This repository has been archived by the owner on Sep 11, 2024. It is now read-only.

Commit

Permalink
Fix duplicated graph bug by using the modifiers and returning them fi…
Browse files Browse the repository at this point in the history
…rst (#33)

* Fix bug by using modifiers and returning them first

* optimized terms info retrieval for survival query

Co-authored-by: Francesco Marino <[email protected]>
  • Loading branch information
romainbou and f-marino authored Oct 27, 2022
1 parent c13b528 commit 819bb25
Show file tree
Hide file tree
Showing 2 changed files with 130 additions and 107 deletions.
235 changes: 129 additions & 106 deletions pkg/datasource/statistics_query.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,6 @@ func (ds I2b2DataSource) StatisticsQueryHandler(
return nil, nil, fmt.Errorf("decoding parameters: %v", err)
}

if err = json.Unmarshal(jsonParameters, decodedParams); err != nil {
return nil, nil, fmt.Errorf("decoding parameters: %v", err)
}
statResults, err := ds.StatisticsQuery(userID, decodedParams)
if err != nil {
return nil, nil, fmt.Errorf("executing statistics query: %v", err)
Expand Down Expand Up @@ -124,89 +121,95 @@ func (ds I2b2DataSource) StatisticsQuery(userID string, params *models.Statistic
return nil, ds.logError("while retrieving ontology elements for statistics query: %v", err)
}

waitGroup := new(sync.WaitGroup)
ontologyElementsNumber := len(conceptsInfo) + len(modifiersInfo)
waitGroup.Add(ontologyElementsNumber)

// the observations for each analyte are processed and sent in statsChannels
statsChannels := make([]chan struct {
counts []int64
statsResult *models.StatsResult
}, ontologyElementsNumber)
errChan := make(chan error)
signal := make(chan struct{})

// this function is an abstraction for the retrieving and processing of observations
processMedicalConcept := func(index int, searchResultElement *models.SearchResultElement, RetrieveObservations func(string, []int64, int64) (observations []database.StatsObservation, err error)) {

defer waitGroup.Done()

logrus.Debugf("retrieving observations for ontology element: %s", searchResultElement.Path)
conceptObservations, err := RetrieveObservations(searchResultElement.Code, patientList, params.MinObservations)
if err != nil {
errChan <- err
return
}
logrus.Debugf("retrieved: %d observations for ontology element: %s", len(conceptObservations), searchResultElement.Path)

cleanObservations, err := outlierRemoval(conceptObservations)
if err != nil {
errChan <- err
return
}
logrus.Debugf("observations for ontology element: %s after outliers removal: %d", searchResultElement.Path, len(conceptObservations))

counts, statsResults, err := ds.processObservations(cleanObservations, params.MinObservations, params.BucketSize)
if err != nil {
errChan <- err
return
}

statsResults.AnalyteName = searchResultElement.Name
syncConceptsCounts := &sync.Map{}
syncConceptsStats := &sync.Map{}
syncModifiersCounts := &sync.Map{}
syncModifiersStats := &sync.Map{}
syncErrMap := &sync.Map{}

wg := sync.WaitGroup{}
// Query Concepts
for i, conceptInfo := range conceptsInfo {
wg.Add(1)
go func(i int, conceptInfo *models.SearchResultElement) {
defer wg.Done()
// query
conceptObservations, err := ds.db.RetrieveObservationsForConcept(
conceptInfo.Code,
patientList,
params.MinObservations,
)
if err != nil {
syncErrMap.Store(i, err)
return
}

statsChannels[index] <- struct {
counts []int64
statsResult *models.StatsResult
}{counts: counts, statsResult: statsResults}
cleanObservations, err := outlierRemoval(conceptObservations)
if err != nil {
syncErrMap.Store(i, err)
return
}
counts, statsResults, err := ds.processObservations(cleanObservations, params.MinObservations, params.BucketSize)
if err != nil {
syncErrMap.Store(i, err)
return
}
syncConceptsCounts.Store(i, counts)
syncConceptsStats.Store(i, statsResults)

}(i, conceptInfo)
}

for i, concept := range conceptsInfo {
statsChannels[i] = make(chan struct {
counts []int64
statsResult *models.StatsResult
}, 1)
go processMedicalConcept(i, concept, ds.db.RetrieveObservationsForConcept)
}
// Query Modifiers
for i, modifierInfo := range modifiersInfo {
wg.Add(1)
go func(i int, modifierInfo *models.SearchResultElement) {
defer wg.Done()
// query
modifierObservations, err := ds.db.RetrieveObservationsForModifier(
modifierInfo.Code,
patientList,
params.MinObservations,
)
if err != nil {
syncErrMap.Store(i, err)
return
}

nbConcepts := len(conceptsInfo)
for j, modifier := range modifiersInfo {
cleanObservations, err := outlierRemoval(modifierObservations)
if err != nil {
syncErrMap.Store(i, err)
return
}
counts, statsResults, err := ds.processObservations(cleanObservations, params.MinObservations, params.BucketSize)
if err != nil {
syncErrMap.Store(i, err)
return
}
syncModifiersCounts.Store(i, counts)
syncModifiersStats.Store(i, statsResults)

index := nbConcepts + j
statsChannels[index] = make(chan struct {
counts []int64
statsResult *models.StatsResult
}, 1)
go processMedicalConcept(index, modifier, ds.db.RetrieveObservationsForModifier)
}(i, modifierInfo)
}
wg.Wait()

go func() {
waitGroup.Wait()
signal <- struct{}{}
}()

select {
case err = <-errChan:
return
case <-signal:
break
// Convert err sync map to map
err = returnErrorFromMap(syncErrMap)
if err != nil {
return nil, err
}
// Retrieve modifiers stats from sync map
syncModifiersStats.Range(func(index, result interface{}) bool {
statResults = append(statResults, result.(*models.StatsResult))
return true
})

// Retrieve concepts stats from sync map
syncConceptsStats.Range(func(index, result interface{}) bool {
statResults = append(statResults, result.(*models.StatsResult))
return true
})

// We fetch the histogram information for each analyte within each channel that contains such information and append this information to the HTTP response.
for _, statResultChannel := range statsChannels {
statResult := <-statResultChannel
statResults = append(statResults, statResult.statsResult)
}
return
}

Expand All @@ -216,47 +219,24 @@ func (ds I2b2DataSource) getOntologyElementsInfoForStatisticsQuery(concepts []*m

modifiersNumber := 0
for _, concept := range concepts {
if concept.Modifier.Key == "" {
if concept.Modifier.Key != "" {
modifiersNumber++
}
}

waitGroup := &sync.WaitGroup{}
waitGroup.Add(len(concepts) + modifiersNumber)
logrus.Debugf("total number of ontology elements: %d", len(concepts)+modifiersNumber)
waitGroup.Add(len(concepts))
logrus.Debugf("total number of ontology elements: %d", len(concepts))
signal := make(chan struct{})
conceptsChannels := make([]chan *models.SearchResultElement, len(concepts))
conceptsChannels := make([]chan *models.SearchResultElement, len(concepts)-modifiersNumber)
modifiersChannels := make([]chan *models.SearchResultElement, modifiersNumber)
errChan := make(chan error)

currentModifiersChannel := 0
for i, concept := range concepts {

conceptsChannels[i] = make(chan *models.SearchResultElement, 1)

go func(conceptPath string, index int) {
defer waitGroup.Done()

//fetch the code and name of the concept
conceptInfo, err := ds.SearchConcept(&models.SearchConceptParameters{
Path: conceptPath,
Operation: models.SearchInfoOperation,
})
if err != nil {
errChan <- fmt.Errorf("while retrieving code for concept %s: %v", conceptPath, err)
return
} else if len(conceptInfo.SearchResultElements) > 1 {
errMsg := fmt.Sprintf("while retrieving concept code, got too many concepts for path %s:", conceptPath)
for _, searchResultElement := range conceptInfo.SearchResultElements {
errMsg = fmt.Sprintf("%s %s,", err, searchResultElement.Path)
}
errChan <- fmt.Errorf("%v", strings.TrimSuffix(errMsg, ","))
}
logrus.Debugf("got concept code for concept %s: %s ", conceptInfo.SearchResultElements[0].Name, conceptInfo.SearchResultElements[0].Code)
conceptsChannels[index] <- conceptInfo.SearchResultElements[0]
}(concept.QueryTerm, i)
currentConceptsChannel := 0
for _, concept := range concepts {

if concept.Modifier.Key == "" {
if concept.Modifier.Key != "" {

modifiersChannels[currentModifiersChannel] = make(chan *models.SearchResultElement, 1)

Expand Down Expand Up @@ -287,6 +267,32 @@ func (ds I2b2DataSource) getOntologyElementsInfoForStatisticsQuery(concepts []*m
}(concept.Modifier.Key, concept.Modifier.AppliedPath, currentModifiersChannel)

currentModifiersChannel++
} else {
conceptsChannels[currentConceptsChannel] = make(chan *models.SearchResultElement, 1)

go func(conceptPath string, index int) {
defer waitGroup.Done()

//fetch the code and name of the concept
conceptInfo, err := ds.SearchConcept(&models.SearchConceptParameters{
Path: conceptPath,
Operation: models.SearchInfoOperation,
})
if err != nil {
errChan <- fmt.Errorf("while retrieving code for concept %s: %v", conceptPath, err)
return
} else if len(conceptInfo.SearchResultElements) > 1 {
errMsg := fmt.Sprintf("while retrieving concept code, got too many concepts for path %s:", conceptPath)
for _, searchResultElement := range conceptInfo.SearchResultElements {
errMsg = fmt.Sprintf("%s %s,", err, searchResultElement.Path)
}
errChan <- fmt.Errorf("%v", strings.TrimSuffix(errMsg, ","))
}
logrus.Debugf("got concept code for concept %s: %s ", conceptInfo.SearchResultElements[0].Name, conceptInfo.SearchResultElements[0].Code)
conceptsChannels[index] <- conceptInfo.SearchResultElements[0]
}(concept.QueryTerm, currentConceptsChannel)

currentConceptsChannel++
}
}

Expand Down Expand Up @@ -491,3 +497,20 @@ func std(observations []database.StatsObservation, meanOfObs float64) float64 {

return math.Sqrt(sigmaSquared)
}

func returnErrorFromMap(syncErrMap *sync.Map) (err error) {
var errors = make([]error, 0)
syncErrMap.Range(func(index, err interface{}) bool {
if err != nil {
errors = append(errors, err.(error))
return false
}
return true
})
for _, err := range errors {
if err != nil {
return err
}
}
return nil
}
2 changes: 1 addition & 1 deletion pkg/datasource/survival_query.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ func (ds I2b2DataSource) SurvivalQueryHandler(userID string, jsonParameters []by

decodedParams := &models.SurvivalQueryParameters{}
if outputDataObjectsSharedIDs[outputNameSurvivalQueryResult] == "" {
return nil, nil, fmt.Errorf("missing output data object name")
return nil, nil, fmt.Errorf("missing output data object name for survival query result")
} else if err = json.Unmarshal(jsonParameters, decodedParams); err != nil {
return nil, nil, fmt.Errorf("decoding parameters: %v", err)
}
Expand Down

0 comments on commit 819bb25

Please sign in to comment.