Skip to content

Commit

Permalink
Stop locally cached CSW records from disappearing when a CSW registry…
Browse files Browse the repository at this point in the history
… is unresponsive.
  • Loading branch information
stuartwoodman committed Sep 18, 2024
1 parent 71db293 commit 42273a4
Show file tree
Hide file tree
Showing 2 changed files with 73 additions and 26 deletions.
63 changes: 40 additions & 23 deletions src/main/java/org/auscope/portal/core/services/CSWCacheService.java
Original file line number Diff line number Diff line change
Expand Up @@ -535,19 +535,23 @@ private void mergeRecords(CSWServiceItem cswService, CSWRecord destination, CSWR

// Merge CSWGeographicElements, only legitimate BBOX coords allowed
Set<CSWGeographicElement> geoElemSet = new HashSet<CSWGeographicElement>();
for (CSWGeographicElement geo : destination.getCSWGeographicElements()) {
if (geo != null) {
if (!geo.hasMissingCoords()) {
geoElemSet.add(geo);
}
}
if (destination.getCSWGeographicElements() != null) {
for (CSWGeographicElement geo : destination.getCSWGeographicElements()) {
if (geo != null) {
if (!geo.hasMissingCoords()) {
geoElemSet.add(geo);
}
}
}
}
for (CSWGeographicElement geo : source.getCSWGeographicElements()) {
if (geo != null) {
if (!geo.hasMissingCoords()) {
geoElemSet.add(geo);
}
}
if (source.getCSWGeographicElements() != null) {
for (CSWGeographicElement geo : source.getCSWGeographicElements()) {
if (geo != null) {
if (!geo.hasMissingCoords()) {
geoElemSet.add(geo);
}
}
}
}
if (geoElemSet.size() > 0) {
CSWGeographicElement geoElemArr[] = new CSWGeographicElement[geoElemSet.size()];
Expand Down Expand Up @@ -580,12 +584,14 @@ private void mergeRecords(CSWServiceItem cswService, CSWRecord destination, CSWR
}
}

/*
/**
* After retrieving the current set of records from the endpoint, this
* will update the application cache.
*
* @param cswRecordMap the CSW records
*/
private void updateAppCache(Map<String, CSWRecord> cswRecordMap) {
//After parent/children have been linked, begin the keyword merging and extraction
// After parent/children have been linked, begin the keyword merging and extraction
synchronized (newKeywordCache) {
synchronized (newRecordCache) {
for (CSWRecord record : cswRecordMap.values()) {
Expand Down Expand Up @@ -624,13 +630,6 @@ private void updateAppCache(Map<String, CSWRecord> cswRecordMap) {
// Loop through existing records
for (CSWRecord existingRec : newRecordCache) {

/*
if (record.getLayerName().equals("gsmlp:BoreholeView") && existingRec.getLayerName().equals("gsmlp:BoreholeView") &&
(record.getFileIdentifier().equals("20f0650cc4cb09a1aaa06b7077c584130f9a502e") || record.getFileIdentifier().equals("49a7dce44a3520e465a5ce103941791908de692c"))) {
System.out.println("CSWCacheServiuce: SA Borehole: " + existingRec.getFileIdentifier());
}
*/

// Loop through online resources of each record
if (StringUtils.isEmpty(existingRec.getLayerName())) {
continue;
Expand Down Expand Up @@ -691,7 +690,6 @@ private void updateAppCache(Map<String, CSWRecord> cswRecordMap) {
break;
}


//If the record was NOT merged into an existing record we then update the record cache
if (!recordMerged) {
// Update the keyword cache
Expand All @@ -706,6 +704,22 @@ private void updateAppCache(Map<String, CSWRecord> cswRecordMap) {
}
}
}

/**
 * Builds a lookup of the CSW records that {@code elasticsearchService}
 * returns for this endpoint's service ID, keyed by file identifier.
 * If two records share a file identifier, the later one in the returned
 * list replaces the earlier one.
 *
 * @return a map of file identifier to {@code CSWRecord}; empty (never
 *         {@code null}) when the service returns {@code null} or no records
 */
private Map<String, CSWRecord> getCachedCswRecordMap() {
    final Map<String, CSWRecord> cachedById = new HashMap<>();
    final List<CSWRecord> cachedRecords =
            elasticsearchService.getAllCSWRecordsForService(this.endpoint.getId());
    // Nothing came back for this service: hand the caller an empty map.
    if (cachedRecords == null) {
        return cachedById;
    }
    for (final CSWRecord cachedRecord : cachedRecords) {
        cachedById.put(cachedRecord.getFileIdentifier(), cachedRecord);
    }
    return cachedById;
}

@Override
public void run() {
Expand All @@ -721,7 +735,6 @@ public void run() {
record.setNoCache(true);
record.setServiceName(this.endpoint.getTitle());
record.setServiceId(this.endpoint.getId());

record.setRecordInfoUrl(this.endpoint.getRecordInformationUrl());

CSWOnlineResourceImpl cswResource = new CSWOnlineResourceImpl(
Expand Down Expand Up @@ -804,6 +817,10 @@ public void run() {
// Update the cache using the new records, if successfully
// retrieved, or the cached version if not.
Map<String, CSWRecord> cswRecordMap = this.cswRecordsCache.get(this.endpoint.getId());
if (cswRecordMap == null || cswRecordMap.size() == 0) {
threadLog.info(String.format("Retrieving cached results for '%1$s", this.endpoint.getServiceUrl()));
cswRecordMap = this.getCachedCswRecordMap();
}
if (cswRecordMap != null && cswRecordMap.size() > 0) {
updateAppCache(cswRecordMap);
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -200,7 +200,7 @@ public void updateCSWRecord(final CSWRecord cswRecord) throws DataAccessResource
*/
public void updateCSWRecords(final List<CSWRecord> cswRecords) throws DataAccessResourceFailureException {
try {
List<List<CSWRecord>> batchRecords = Lists.partition(cswRecords, 100);
List<List<CSWRecord>> batchRecords = Lists.partition(cswRecords, 1000);
for (List<CSWRecord> recordSet : batchRecords) {
this.recordRepository.saveAll(recordSet);
}
Expand All @@ -221,8 +221,38 @@ public List<CSWRecord> getAllCSWRecords() {
Query query = NativeQuery.builder()
.withQuery(q -> q
.matchAll(ma -> ma))
.withFields("message")
.withPageable(PageRequest.of(0, 10))
.withPageable(PageRequest.of(0, 1000))
.build();
SearchScrollHits<CSWRecord> scroll = elasticsearchTemplate.searchScrollStart(1000, query, CSWRecord.class, index);
String scrollId = scroll.getScrollId();
while (scroll.hasSearchHits()) {
for (SearchHit<CSWRecord> searchHit: scroll.getSearchHits()) {
records.add(searchHit.getContent());
}
scrollId = scroll.getScrollId();
scroll = elasticsearchTemplate.searchScrollContinue(scrollId, 1000, CSWRecord.class, index);
}
elasticsearchTemplate.searchScrollClear(scrollId);
return records;
}

/**
* Retrieve all CSWRecords from Elasticsearch index for a given service.
* Uses paging to get past the 10,000 record maximum when using findAll() with a repository.
*
* @return All indexed CSWRecords for a given service
* @param serviceId the ID of the service
* @return indexed CSWRecords for a given service
*/
public List<CSWRecord> getAllCSWRecordsForService(String serviceId) {
List<CSWRecord> records = new ArrayList<CSWRecord>();
IndexCoordinates index = IndexCoordinates.of(cswRecordIndex);
Query query = NativeQuery.builder()
.withQuery(q -> q
.match(m -> m
.field("serviceId")
.query(serviceId)))
.withPageable(PageRequest.of(0, 1000))
.build();
SearchScrollHits<CSWRecord> scroll = elasticsearchTemplate.searchScrollStart(1000, query, CSWRecord.class, index);
String scrollId = scroll.getScrollId();
Expand Down

0 comments on commit 42273a4

Please sign in to comment.