From 42273a48f4876a366026bf346e7442146291c0fc Mon Sep 17 00:00:00 2001 From: "stuart.woodman" Date: Wed, 18 Sep 2024 11:47:05 +1000 Subject: [PATCH] Stop locally cached CSW records from disappearing when a CSW registry is unresponsive. --- .../portal/core/services/CSWCacheService.java | 63 ++++++++++++------- .../core/services/ElasticsearchService.java | 36 ++++++++++- 2 files changed, 73 insertions(+), 26 deletions(-) diff --git a/src/main/java/org/auscope/portal/core/services/CSWCacheService.java b/src/main/java/org/auscope/portal/core/services/CSWCacheService.java index 22cce134e..6f591fa2e 100644 --- a/src/main/java/org/auscope/portal/core/services/CSWCacheService.java +++ b/src/main/java/org/auscope/portal/core/services/CSWCacheService.java @@ -535,19 +535,23 @@ private void mergeRecords(CSWServiceItem cswService, CSWRecord destination, CSWR // Merge CSWGeographicElements, only legitimate BBOX coords allowed Set geoElemSet = new HashSet(); - for (CSWGeographicElement geo : destination.getCSWGeographicElements()) { - if (geo != null) { - if (!geo.hasMissingCoords()) { - geoElemSet.add(geo); - } - } + if (destination.getCSWGeographicElements() != null) { + for (CSWGeographicElement geo : destination.getCSWGeographicElements()) { + if (geo != null) { + if (!geo.hasMissingCoords()) { + geoElemSet.add(geo); + } + } + } } - for (CSWGeographicElement geo : source.getCSWGeographicElements()) { - if (geo != null) { - if (!geo.hasMissingCoords()) { - geoElemSet.add(geo); - } - } + if (source.getCSWGeographicElements() != null) { + for (CSWGeographicElement geo : source.getCSWGeographicElements()) { + if (geo != null) { + if (!geo.hasMissingCoords()) { + geoElemSet.add(geo); + } + } + } } if (geoElemSet.size() > 0) { CSWGeographicElement geoElemArr[] = new CSWGeographicElement[geoElemSet.size()]; @@ -580,12 +584,14 @@ private void mergeRecords(CSWServiceItem cswService, CSWRecord destination, CSWR } } - /* + /** * After retrieving the current set of records from the 
endpoint, this * will update the application cache. + * + * @param cswRecordMap the CSW records */ private void updateAppCache(Map cswRecordMap) { - //After parent/children have been linked, begin the keyword merging and extraction + // After parent/children have been linked, begin the keyword merging and extraction synchronized (newKeywordCache) { synchronized (newRecordCache) { for (CSWRecord record : cswRecordMap.values()) { @@ -624,13 +630,6 @@ private void updateAppCache(Map cswRecordMap) { // Loop through existing records for (CSWRecord existingRec : newRecordCache) { - /* - if (record.getLayerName().equals("gsmlp:BoreholeView") && existingRec.getLayerName().equals("gsmlp:BoreholeView") && - (record.getFileIdentifier().equals("20f0650cc4cb09a1aaa06b7077c584130f9a502e") || record.getFileIdentifier().equals("49a7dce44a3520e465a5ce103941791908de692c"))) { - System.out.println("CSWCacheServiuce: SA Borehole: " + existingRec.getFileIdentifier()); - } - */ - // Loop through online resources of each record if (StringUtils.isEmpty(existingRec.getLayerName())) { continue; @@ -691,7 +690,6 @@ private void updateAppCache(Map cswRecordMap) { break; } - //If the record was NOT merged into an existing record we then update the record cache if (!recordMerged) { // Update the keyword cache @@ -706,6 +704,22 @@ private void updateAppCache(Map cswRecordMap) { } } } + + /** + * Get the cached CSWRecord map for the current service + * + * @return a Map of CSWRecords for the current service + */ + private Map getCachedCswRecordMap() { + Map recordMap = new HashMap(); + List recordList = elasticsearchService.getAllCSWRecordsForService(this.endpoint.getId()); + if (recordList != null) { + for (CSWRecord record: recordList) { + recordMap.put(record.getFileIdentifier(), record); + } + } + return recordMap; + } @Override public void run() { @@ -721,7 +735,6 @@ public void run() { record.setNoCache(true); record.setServiceName(this.endpoint.getTitle()); 
record.setServiceId(this.endpoint.getId()); - record.setRecordInfoUrl(this.endpoint.getRecordInformationUrl()); CSWOnlineResourceImpl cswResource = new CSWOnlineResourceImpl( @@ -804,6 +817,10 @@ public void run() { // Update the cache using the new records, if successfully // retrieved, or the cached version if not. Map cswRecordMap = this.cswRecordsCache.get(this.endpoint.getId()); + if (cswRecordMap == null || cswRecordMap.size() == 0) { + threadLog.info(String.format("Retrieving cached results for '%1$s", this.endpoint.getServiceUrl())); + cswRecordMap = this.getCachedCswRecordMap(); + } if (cswRecordMap != null && cswRecordMap.size() > 0) { updateAppCache(cswRecordMap); } else { diff --git a/src/main/java/org/auscope/portal/core/services/ElasticsearchService.java b/src/main/java/org/auscope/portal/core/services/ElasticsearchService.java index 6042adf7b..913a2ac83 100644 --- a/src/main/java/org/auscope/portal/core/services/ElasticsearchService.java +++ b/src/main/java/org/auscope/portal/core/services/ElasticsearchService.java @@ -200,7 +200,7 @@ public void updateCSWRecord(final CSWRecord cswRecord) throws DataAccessResource */ public void updateCSWRecords(final List cswRecords) throws DataAccessResourceFailureException { try { - List> batchRecords = Lists.partition(cswRecords, 100); + List> batchRecords = Lists.partition(cswRecords, 1000); for (List recordSet : batchRecords) { this.recordRepository.saveAll(recordSet); } @@ -221,8 +221,38 @@ public List getAllCSWRecords() { Query query = NativeQuery.builder() .withQuery(q -> q .matchAll(ma -> ma)) - .withFields("message") - .withPageable(PageRequest.of(0, 10)) + .withPageable(PageRequest.of(0, 1000)) + .build(); + SearchScrollHits scroll = elasticsearchTemplate.searchScrollStart(1000, query, CSWRecord.class, index); + String scrollId = scroll.getScrollId(); + while (scroll.hasSearchHits()) { + for (SearchHit searchHit: scroll.getSearchHits()) { + records.add(searchHit.getContent()); + } + scrollId = 
scroll.getScrollId(); + scroll = elasticsearchTemplate.searchScrollContinue(scrollId, 1000, CSWRecord.class, index); + } + elasticsearchTemplate.searchScrollClear(scrollId); + return records; + } + + /** + * Retrieve all CSWRecords from Elasticsearch index for a given service. + * Uses paging to get past the 10,000-record maximum when using findAll() with a repository. + * + * @return All indexed CSWRecords for a given service + * @param serviceId the ID of the service + * @return indexed CSWRecords for a given service + */ + public List getAllCSWRecordsForService(String serviceId) { + List records = new ArrayList(); + IndexCoordinates index = IndexCoordinates.of(cswRecordIndex); + Query query = NativeQuery.builder() + .withQuery(q -> q + .match(m -> m + .field("serviceId") + .query(serviceId))) + .withPageable(PageRequest.of(0, 1000)) .build(); SearchScrollHits scroll = elasticsearchTemplate.searchScrollStart(1000, query, CSWRecord.class, index); String scrollId = scroll.getScrollId();