From fe62b059ff82c678ec4f8a0f43861d47009a4830 Mon Sep 17 00:00:00 2001
From: Peter Moser
Date: Wed, 2 Mar 2022 17:03:21 +0100
Subject: [PATCH] writer: add provenance info to each call from dc-interfaces
---
calls.http | 2 +-
.../java/it/bz/idm/bdp/json/JSONPusher.java | 113 +++++++++++-------
.../idm/bdp/json/NonBlockingJSONPusher.java | 100 ++++++++++++----
.../it/bz/idm/bdp/writer/JsonController.java | 6 +-
4 files changed, 151 insertions(+), 70 deletions(-)
diff --git a/calls.http b/calls.http
index bc780215..84d4efc8 100644
--- a/calls.http
+++ b/calls.http
@@ -59,7 +59,7 @@ GET {{host}}/json/getDateOfLastRecord/MeteoStation?stationId=T0009&period=900
Authorization: Bearer {{authtoken}}
### Post some data to the writer
-POST {{host}}/json/provenance
+POST {{host}}/json/provenance?prn=test&prv=11111
Content-Type: application/json
Authorization: Bearer {{authtoken}}
diff --git a/dc-interface/src/main/java/it/bz/idm/bdp/json/JSONPusher.java b/dc-interface/src/main/java/it/bz/idm/bdp/json/JSONPusher.java
index eb7d2fec..dbf2508a 100644
--- a/dc-interface/src/main/java/it/bz/idm/bdp/json/JSONPusher.java
+++ b/dc-interface/src/main/java/it/bz/idm/bdp/json/JSONPusher.java
@@ -27,12 +27,16 @@
import javax.annotation.PostConstruct;
+import org.springframework.http.HttpEntity;
+import org.springframework.http.HttpMethod;
+import org.springframework.http.ResponseEntity;
import org.springframework.stereotype.Component;
import org.springframework.web.client.RestTemplate;
import it.bz.idm.bdp.DataPusher;
import it.bz.idm.bdp.dto.DataMapDto;
import it.bz.idm.bdp.dto.DataTypeDto;
+import it.bz.idm.bdp.dto.ProvenanceDto;
import it.bz.idm.bdp.dto.RecordDtoImpl;
import it.bz.idm.bdp.dto.StationDto;
import it.bz.idm.bdp.dto.StationList;
@@ -68,25 +72,36 @@ public void init() {
public Object pushData(String datasourceName, DataMapDto extends RecordDtoImpl> dto) {
this.pushProvenance();
dto.setProvenance(this.provenance.getUuid());
- return restTemplate.postForObject(
- url + PUSH_RECORDS + "{datasourceName}?prn={}&prv={}",
- dto,
- Object.class,
- datasourceName,
- provenance.getDataCollector(),
- provenance.getDataCollectorVersion()
- );
+ return restTemplate
+ .exchange(
+ url + PUSH_RECORDS + "{datasourceName}?prn={}&prv={}",
+ HttpMethod.POST,
+ new HttpEntity>(dto),
+ Object.class,
+ datasourceName,
+ provenance.getDataCollector(),
+ provenance.getDataCollectorVersion()
+ )
+ .getBody();
}
private void pushProvenance() {
- String provenanceUuid = restTemplate.postForObject(
+ // We know that the provenance exist, and which UUID it has.
+ // So we do not need to get that information again from the DB
+ // This approach assumes, that the DB will not change from any
+ // other caller.
+ if (this.provenance.getUuid() != null) {
+ return;
+ }
+ ResponseEntity provenanceUuid = restTemplate.exchange(
url + PROVENANCE + "?prn={}&prv={}",
- this.provenance,
+ HttpMethod.POST,
+ new HttpEntity(this.provenance),
String.class,
provenance.getDataCollector(),
provenance.getDataCollectorVersion()
);
- this.provenance.setUuid(provenanceUuid);
+ this.provenance.setUuid(provenanceUuid.getBody());
}
public Object pushData(DataMapDto extends RecordDtoImpl> dto) {
@@ -102,27 +117,33 @@ public Object syncStations(StationList data) {
public Object syncStations(String datasourceName, StationList data) {
if (data == null)
return null;
- return restTemplate.postForObject(
- url + SYNC_STATIONS + "{datasourceName}?prn={}&prv={}",
- data,
- Object.class,
- datasourceName,
- provenance.getDataCollector(),
- provenance.getDataCollectorVersion()
- );
+ return restTemplate
+ .exchange(
+ url + SYNC_STATIONS + "{datasourceName}?prn={}&prv={}",
+ HttpMethod.POST,
+ new HttpEntity(data),
+ Object.class,
+ datasourceName,
+ provenance.getDataCollector(),
+ provenance.getDataCollectorVersion()
+ )
+ .getBody();
}
@Override
public Object syncDataTypes(String datasourceName, List data) {
if (data == null)
return null;
- return restTemplate.postForObject(
- url + SYNC_DATA_TYPES + "?prn={}&prv={}",
- data,
- Object.class,
- provenance.getDataCollector(),
- provenance.getDataCollectorVersion()
- );
+ return restTemplate
+ .exchange(
+ url + SYNC_DATA_TYPES + "?prn={}&prv={}",
+ HttpMethod.POST,
+ new HttpEntity>(data),
+ Object.class,
+ provenance.getDataCollector(),
+ provenance.getDataCollectorVersion()
+ )
+ .getBody();
}
public Object syncDataTypes(List data) {
@@ -131,16 +152,17 @@ public Object syncDataTypes(List data) {
@Override
public Object getDateOfLastRecord(String stationCode, String dataType, Integer period) {
- return restTemplate.getForObject(
- url + GET_DATE_OF_LAST_RECORD + "{datasourceName}/?stationId={stationId}&typeId={dataType}&period={period}&prn={}&prv={}",
- Date.class,
- this.integreenTypology,
- stationCode,
- dataType,
- period,
- provenance.getDataCollector(),
- provenance.getDataCollectorVersion()
- );
+ return restTemplate
+ .getForObject(
+ url + GET_DATE_OF_LAST_RECORD + "{datasourceName}/?stationId={stationId}&typeId={dataType}&period={period}&prn={}&prv={}",
+ Date.class,
+ this.integreenTypology,
+ stationCode,
+ dataType,
+ period,
+ provenance.getDataCollector(),
+ provenance.getDataCollectorVersion()
+ );
}
@Override
@@ -150,16 +172,15 @@ public void connectToDataCenterCollector() {
@Override
public List fetchStations(String datasourceName, String origin) {
- if (datasourceName == null)
- datasourceName = this.integreenTypology;
- StationDto[] object = restTemplate.getForObject(
- url + STATIONS +"{datasourceName}/?origin={origin}&prn={}&prv={}",
- StationDto[].class,
- datasourceName,
- origin,
- provenance.getDataCollector(),
- provenance.getDataCollectorVersion()
- );
+ StationDto[] object = restTemplate
+ .getForObject(
+ url + STATIONS +"{datasourceName}/?origin={origin}&prn={}&prv={}",
+ StationDto[].class,
+ datasourceName == null ? this.integreenTypology : datasourceName,
+ origin,
+ provenance.getDataCollector(),
+ provenance.getDataCollectorVersion()
+ );
return Arrays.asList(object);
}
diff --git a/dc-interface/src/main/java/it/bz/idm/bdp/json/NonBlockingJSONPusher.java b/dc-interface/src/main/java/it/bz/idm/bdp/json/NonBlockingJSONPusher.java
index 3d2745ff..b5191ee0 100644
--- a/dc-interface/src/main/java/it/bz/idm/bdp/json/NonBlockingJSONPusher.java
+++ b/dc-interface/src/main/java/it/bz/idm/bdp/json/NonBlockingJSONPusher.java
@@ -72,13 +72,33 @@ public void init() {
public Object pushData(String datasourceName, DataMapDto extends RecordDtoImpl> dto) {
this.pushProvenance();
dto.setProvenance(this.provenance.getUuid());
- return client.post().uri(PUSH_RECORDS + datasourceName).body(Mono.just(dto), Object.class).retrieve()
- .bodyToMono(Object.class).block();
+ return client
+ .post()
+ .uri(uriBuilder -> uriBuilder
+ .path(PUSH_RECORDS + datasourceName)
+ .queryParam("prn", provenance.getDataCollector())
+ .queryParam("prv", provenance.getDataCollectorVersion())
+ .build()
+ )
+ .body(Mono.just(dto), Object.class)
+ .retrieve()
+ .bodyToMono(Object.class)
+ .block();
}
private void pushProvenance() {
- String provenanceUuid = client.post().uri(PROVENANCE).body(Mono.just(this.provenance), ProvenanceDto.class).retrieve()
- .bodyToMono(String.class).block();
+ String provenanceUuid = client
+ .post()
+ .uri(uriBuilder -> uriBuilder
+ .path(PROVENANCE)
+ .queryParam("prn", provenance.getDataCollector())
+ .queryParam("prv", provenance.getDataCollectorVersion())
+ .build()
+ )
+ .body(Mono.just(this.provenance), ProvenanceDto.class)
+ .retrieve()
+ .bodyToMono(String.class)
+ .block();
this.provenance.setUuid(provenanceUuid);
}
@@ -95,16 +115,36 @@ public Object syncStations(StationList data) {
public Object syncStations(String datasourceName, StationList data) {
if (data == null)
return null;
- return client.post().uri(SYNC_STATIONS + datasourceName).body(Mono.just(data), Object.class).retrieve()
- .bodyToMono(Object.class).block();
+ return client
+ .post()
+ .uri(uriBuilder -> uriBuilder
+ .path(SYNC_STATIONS + datasourceName)
+ .queryParam("prn", provenance.getDataCollector())
+ .queryParam("prv", provenance.getDataCollectorVersion())
+ .build()
+ )
+ .body(Mono.just(data), Object.class)
+ .retrieve()
+ .bodyToMono(Object.class)
+ .block();
}
@Override
public Object syncDataTypes(String datasourceName, List data) {
if (data == null)
return null;
- return client.post().uri(SYNC_DATA_TYPES).body(Mono.just(data), Object.class).retrieve()
- .bodyToMono(Object.class).block();
+ return client
+ .post()
+ .uri(uriBuilder -> uriBuilder
+ .path(SYNC_DATA_TYPES)
+ .queryParam("prn", provenance.getDataCollector())
+ .queryParam("prv", provenance.getDataCollectorVersion())
+ .build()
+ )
+ .body(Mono.just(data), Object.class)
+ .retrieve()
+ .bodyToMono(Object.class)
+ .block();
}
public Object syncDataTypes(List data) {
@@ -113,12 +153,20 @@ public Object syncDataTypes(List data) {
@Override
public Object getDateOfLastRecord(String stationCode, String dataType, Integer period) {
- return client.get().uri(uriBuilder -> uriBuilder
- .path(GET_DATE_OF_LAST_RECORD + this.integreenTypology)
- .queryParam("stationId", stationCode)
- .queryParam("typeId", dataType)
- .queryParam("period", period).build())
- .retrieve().bodyToMono(Date.class).block();
+ return client
+ .get()
+ .uri(uriBuilder -> uriBuilder
+ .path(GET_DATE_OF_LAST_RECORD + this.integreenTypology)
+ .queryParam("stationId", stationCode)
+ .queryParam("typeId", dataType)
+ .queryParam("period", period)
+ .queryParam("prn", provenance.getDataCollector())
+ .queryParam("prv", provenance.getDataCollectorVersion())
+ .build()
+ )
+ .retrieve()
+ .bodyToMono(Date.class)
+ .block();
}
@Override
@@ -128,11 +176,18 @@ public void connectToDataCenterCollector() {
@Override
public List fetchStations(String datasourceName, String origin) {
- final String uri = STATIONS + datasourceName;
- if (datasourceName == null)
- datasourceName = this.integreenTypology;
- StationDto[] object = client.get().uri(uriBuilder->uriBuilder.path(uri).queryParam("origin", "{origin}").build(origin)).retrieve()
- .bodyToMono(StationDto[].class).block();
+ StationDto[] object = client
+ .get()
+ .uri(uriBuilder->uriBuilder
+ .path(datasourceName == null ? STATIONS + this.integreenTypology : STATIONS + datasourceName)
+ .queryParam("origin", "{origin}")
+ .queryParam("prn", provenance.getDataCollector())
+ .queryParam("prv", provenance.getDataCollectorVersion())
+ .build(origin)
+ )
+ .retrieve()
+ .bodyToMono(StationDto[].class)
+ .block();
return Arrays.asList(object);
}
@@ -147,7 +202,12 @@ public Object addEvents(List dtos) {
}
return client
.post()
- .uri(EVENTS)
+ .uri(uriBuilder->uriBuilder
+ .path(EVENTS)
+ .queryParam("prn", provenance.getDataCollector())
+ .queryParam("prv", provenance.getDataCollectorVersion())
+ .build()
+ )
.body(Mono.just(dtos), Object.class)
.retrieve()
.bodyToMono(Object.class)
diff --git a/writer/src/main/java/it/bz/idm/bdp/writer/JsonController.java b/writer/src/main/java/it/bz/idm/bdp/writer/JsonController.java
index 1056e0f2..8e42a1d6 100644
--- a/writer/src/main/java/it/bz/idm/bdp/writer/JsonController.java
+++ b/writer/src/main/java/it/bz/idm/bdp/writer/JsonController.java
@@ -100,8 +100,8 @@ public ResponseEntity dateOfLastRecord(
HttpServletRequest request,
@PathVariable("integreenTypology") String stationType,
@RequestParam("stationId") String stationId,
- @RequestParam(value="typeId", required=false) String typeId,
- @RequestParam(value="period", required=false) Integer period,
+ @RequestParam(value = "typeId", required = false) String typeId,
+ @RequestParam(value = "period", required = false) Integer period,
@RequestParam(value = "prn", required = false) String proveanceName,
@RequestParam(value = "prv", required = false) String provenanceVersion
) {
@@ -113,7 +113,7 @@ public ResponseEntity dateOfLastRecord(
public Object stationsGetList(
HttpServletRequest request,
@PathVariable("integreenTypology") String stationType,
- @RequestParam(value="origin", required=false) String origin,
+ @RequestParam(value = "origin", required = false) String origin,
@RequestParam(value = "prn", required = false) String proveanceName,
@RequestParam(value = "prv", required = false) String provenanceVersion
) {