7
7
package org .gridsuite .study .server .elasticsearch ;
8
8
9
9
import co .elastic .clients .elasticsearch ._types .FieldValue ;
10
+ import co .elastic .clients .elasticsearch ._types .aggregations .*;
11
+ import co .elastic .clients .elasticsearch ._types .aggregations .Aggregation ;
10
12
import co .elastic .clients .elasticsearch ._types .query_dsl .*;
11
13
import com .powsybl .iidm .network .VariantManagerConstants ;
12
14
import org .apache .commons .lang3 .StringUtils ;
15
+ import org .apache .commons .lang3 .tuple .Pair ;
16
+ import org .gridsuite .study .server .dto .BasicEquipmentInfos ;
13
17
import org .gridsuite .study .server .dto .EquipmentInfos ;
14
18
import org .gridsuite .study .server .dto .TombstonedEquipmentInfos ;
19
+ import org .slf4j .Logger ;
20
+ import org .slf4j .LoggerFactory ;
15
21
import org .springframework .data .domain .PageRequest ;
16
- import org .springframework .data .elasticsearch .client .elc .NativeQuery ;
17
- import org .springframework .data .elasticsearch .client .elc .NativeQueryBuilder ;
18
- import org .springframework .data .elasticsearch .client .elc .Queries ;
22
+ import org .springframework .data .elasticsearch .client .elc .*;
19
23
import org .springframework .data .elasticsearch .core .ElasticsearchOperations ;
20
24
import org .springframework .data .elasticsearch .core .SearchHit ;
25
+ import org .springframework .data .elasticsearch .core .SearchHits ;
21
26
import org .springframework .lang .NonNull ;
22
27
import org .springframework .stereotype .Service ;
23
28
@@ -41,6 +46,8 @@ public enum FieldSelector {
41
46
42
47
private static final int PAGE_MAX_SIZE = 400 ;
43
48
49
+ private static final int COMPOSITE_AGGREGATION_BATCH_SIZE = 1000 ;
50
+
44
51
public static final Map <String , Integer > EQUIPMENT_TYPE_SCORES = Map .ofEntries (
45
52
entry ("SUBSTATION" , 15 ),
46
53
entry ("VOLTAGE_LEVEL" , 14 ),
@@ -72,6 +79,8 @@ public enum FieldSelector {
72
79
73
80
private final ElasticsearchOperations elasticsearchOperations ;
74
81
82
+ private static final Logger LOGGER = LoggerFactory .getLogger (EquipmentInfosService .class );
83
+
75
84
public EquipmentInfosService (EquipmentInfosRepository equipmentInfosRepository , TombstonedEquipmentInfosRepository tombstonedEquipmentInfosRepository , ElasticsearchOperations elasticsearchOperations ) {
76
85
this .equipmentInfosRepository = equipmentInfosRepository ;
77
86
this .tombstonedEquipmentInfosRepository = tombstonedEquipmentInfosRepository ;
@@ -105,6 +114,101 @@ public long getEquipmentInfosCount() {
105
114
return equipmentInfosRepository .count ();
106
115
}
107
116
117
+ private CompositeAggregation buildCompositeAggregation (String field , Map <String , FieldValue > afterKey ) {
118
+ List <Map <String , CompositeAggregationSource >> sources = List .of (
119
+ Map .of (field , CompositeAggregationSource .of (s -> s .terms (t -> t .field (field + ".keyword" )))
120
+ )
121
+ );
122
+
123
+ CompositeAggregation .Builder compositeAggregationBuilder = new CompositeAggregation .Builder ()
124
+ .size (COMPOSITE_AGGREGATION_BATCH_SIZE )
125
+ .sources (sources );
126
+
127
+ if (afterKey != null ) {
128
+ compositeAggregationBuilder .after (afterKey );
129
+ }
130
+
131
+ return compositeAggregationBuilder .build ();
132
+ }
133
+
134
+ /**
135
+ * Constructs a NativeQuery with a composite aggregation.
136
+ *
137
+ * @param compositeName The name of the composite aggregation.
138
+ * @param compositeAggregation The composite aggregation configuration.
139
+ * @return A NativeQuery object configured with the specified composite aggregation.
140
+ */
141
+ private NativeQuery buildCompositeAggregationQuery (String compositeName , CompositeAggregation compositeAggregation ) {
142
+ Aggregation aggregation = Aggregation .of (a -> a .composite (compositeAggregation ));
143
+
144
+ return new NativeQueryBuilder ()
145
+ .withAggregation (compositeName , aggregation )
146
+ .build ();
147
+ }
148
+
149
+ /**
150
+ * This method is used to extract the results of a composite aggregation from Elasticsearch search hits.
151
+ *
152
+ * @param searchHits The search hits returned from an Elasticsearch query.
153
+ * @param compositeName The name of the composite aggregation.
154
+ * @return A Pair consisting of two elements:
155
+ * The left element of the Pair is a list of maps, where each map represents a bucket's key. Each bucket is a result of the composite aggregation.
156
+ * The right element of the Pair is the afterKey map, which is used for pagination in Elasticsearch.
157
+ * If there are no more pages, the afterKey will be null.
158
+ * @see <a href="https://www.elastic.co/guide/en/elasticsearch/reference/current/search-aggregations-bucket-composite-aggregation.html">Elasticsearch Composite Aggregation Documentation</a>
159
+ */
160
+ private Pair <List <Map <String , FieldValue >>, Map <String , FieldValue >> extractCompositeAggregationResults (SearchHits <EquipmentInfos > searchHits , String compositeName ) {
161
+ ElasticsearchAggregations aggregations = (ElasticsearchAggregations ) searchHits .getAggregations ();
162
+
163
+ List <Map <String , FieldValue >> results = new ArrayList <>();
164
+ if (aggregations != null ) {
165
+ Map <String , ElasticsearchAggregation > aggregationList = aggregations .aggregationsAsMap ();
166
+ if (!aggregationList .isEmpty ()) {
167
+ Aggregate aggregate = aggregationList .get (compositeName ).aggregation ().getAggregate ();
168
+ if (aggregate .isComposite () && aggregate .composite () != null ) {
169
+ for (CompositeBucket bucket : aggregate .composite ().buckets ().array ()) {
170
+ Map <String , FieldValue > key = bucket .key ();
171
+ results .add (key );
172
+ }
173
+ return Pair .of (results , aggregate .composite ().afterKey ());
174
+ }
175
+ }
176
+ }
177
+ return Pair .of (results , null );
178
+ }
179
+
180
+ public List <UUID > getEquipmentInfosDistinctNetworkUuids () {
181
+ List <UUID > networkUuids = new ArrayList <>();
182
+ Map <String , FieldValue > afterKey = null ;
183
+ String compositeName = "composite_agg" ;
184
+ String networkUuidField = BasicEquipmentInfos .Fields .networkUuid ;
185
+
186
+ do {
187
+ CompositeAggregation compositeAggregation = buildCompositeAggregation (networkUuidField , afterKey );
188
+ NativeQuery query = buildCompositeAggregationQuery (compositeName , compositeAggregation );
189
+
190
+ SearchHits <EquipmentInfos > searchHits = elasticsearchOperations .search (query , EquipmentInfos .class );
191
+ Pair <List <Map <String , FieldValue >>, Map <String , FieldValue >> searchResults = extractCompositeAggregationResults (searchHits , compositeName );
192
+
193
+ searchResults .getLeft ().stream ()
194
+ .map (result -> result .get (networkUuidField ))
195
+ .filter (Objects ::nonNull )
196
+ .map (FieldValue ::stringValue )
197
+ .map (UUID ::fromString )
198
+ .forEach (networkUuids ::add );
199
+
200
+ afterKey = searchResults .getRight ();
201
+ } while (afterKey != null && !afterKey .isEmpty ());
202
+
203
+ return networkUuids ;
204
+ }
205
+
206
+ public List <UUID > getOrphanEquipmentInfosNetworkUuids (List <UUID > networkUuidsInDatabase ) {
207
+ List <UUID > networkUuids = getEquipmentInfosDistinctNetworkUuids ();
208
+ networkUuids .removeAll (networkUuidsInDatabase );
209
+ return networkUuids ;
210
+ }
211
+
108
212
public long getTombstonedEquipmentInfosCount () {
109
213
return tombstonedEquipmentInfosRepository .count ();
110
214
}
0 commit comments