Skip to content

Commit 3858711

Browse files
committed
all basic key points implemented
1 parent 9a6f784 commit 3858711

File tree

3 files changed

+127
-57
lines changed

3 files changed

+127
-57
lines changed

java-client/src/main/java/co/elastic/clients/elasticsearch/_helpers/bulk/BulkIngester.java

Lines changed: 74 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import co.elastic.clients.elasticsearch.core.BulkRequest;
2525
import co.elastic.clients.elasticsearch.core.BulkResponse;
2626
import co.elastic.clients.elasticsearch.core.bulk.BulkOperation;
27+
import co.elastic.clients.elasticsearch.core.bulk.BulkResponseItem;
2728
import co.elastic.clients.transport.BackoffPolicy;
2829
import co.elastic.clients.transport.TransportOptions;
2930
import co.elastic.clients.util.ApiTypeHelper;
@@ -33,9 +34,9 @@
3334

3435
import javax.annotation.Nullable;
3536
import java.time.Duration;
36-
import java.time.Instant;
3737
import java.util.ArrayList;
3838
import java.util.Collections;
39+
import java.util.Iterator;
3940
import java.util.List;
4041
import java.util.Optional;
4142
import java.util.concurrent.CompletionStage;
@@ -66,6 +67,7 @@ public class BulkIngester<Context> implements AutoCloseable {
6667

6768
private @Nullable ScheduledFuture<?> flushTask;
6869
private @Nullable ScheduledExecutorService scheduler;
70+
private @Nullable ScheduledExecutorService retryScheduler;
6971
private boolean isExternalScheduler = false;
7072
private BackoffPolicy backoffPolicy;
7173

@@ -81,7 +83,7 @@ public class BulkIngester<Context> implements AutoCloseable {
8183
private final FnCondition addCondition = new FnCondition(lock, this::canAddOperation);
8284
private final FnCondition sendRequestCondition = new FnCondition(lock, this::canSendRequest);
8385
private final FnCondition closeCondition = new FnCondition(lock, this::closedAndFlushed);
84-
private AtomicInteger listenerInProgressCount = new AtomicInteger();
86+
private final AtomicInteger listenerInProgressCount = new AtomicInteger();
8587

8688
private static class RequestExecution<Context> {
8789
public final long id;
@@ -138,6 +140,21 @@ private BulkIngester(Builder<Context> builder) {
138140
if (backoffPolicy == null) {
139141
backoffPolicy = BackoffPolicy.noBackoff();
140142
}
143+
// preparing a scheduler that will trigger flushes when it finds enqueued requests ready to be retried
144+
// TODO should we just keep a single scheduler?
145+
else {
146+
retryScheduler = Executors.newScheduledThreadPool(maxRequests + 1, (r) -> {
147+
Thread t = Executors.defaultThreadFactory().newThread(r);
148+
t.setName("bulk-ingester-retry#" + ingesterId + "#" + t.getId());
149+
t.setDaemon(true);
150+
return t;
151+
});
152+
retryScheduler.scheduleWithFixedDelay(
153+
this::retryFlush,
154+
1000,1000, // TODO should we hardcode this?
155+
TimeUnit.MILLISECONDS
156+
);
157+
}
141158
}
142159

143160
//----- Getters
@@ -283,9 +300,20 @@ private void failsafeFlush() {
283300
}
284301
}
285302

303+
// triggers a flush if it finds queued retries
304+
private void retryFlush() {
305+
try {
306+
if (operations.stream().anyMatch(op -> op.getRetries() != null && op.isSendable())) {
307+
flush();
308+
}
309+
} catch (Throwable thr) {
310+
// Log the error and continue
311+
logger.error("Error in background flush", thr);
312+
}
313+
}
314+
286315
public void flush() {
287-
// Keeping sent operations for possible retries
288-
List<BulkOperationRepeatable<Context>> requestsSent = new ArrayList<>();
316+
List<BulkOperationRepeatable<Context>> sentRequests = new ArrayList<>();
289317
RequestExecution<Context> exec = sendRequestCondition.whenReadyIf(
290318
() -> {
291319
// May happen on manual and periodic flushes
@@ -294,7 +322,7 @@ public void flush() {
294322
() -> {
295323
// Selecting operations that can be sent immediately
296324
List<BulkOperationRepeatable<Context>> immediateOpsRep = operations.stream()
297-
.filter(BulkOperationRepeatable::canRetry)
325+
.filter(BulkOperationRepeatable::isSendable)
298326
.collect(Collectors.toList());
299327

300328
// Dividing actual operations from contexts
@@ -309,11 +337,12 @@ public void flush() {
309337
// Build the request
310338
BulkRequest request = newRequest().operations(immediateOps).build();
311339

312-
List<Context> requestContexts = contexts.isEmpty() ? Collections.nCopies(immediateOpsRep.size(),
313-
null) : contexts;
340+
List<Context> requestContexts = contexts.isEmpty() ?
341+
Collections.nCopies(immediateOpsRep.size(),
342+
null) : contexts;
314343

315344
// Prepare for next round
316-
requestsSent.addAll(immediateOpsRep);
345+
sentRequests.addAll(immediateOpsRep);
317346
operations.removeAll(immediateOpsRep);
318347
currentSize = operations.size();
319348
addCondition.signalIfReady();
@@ -340,18 +369,43 @@ public void flush() {
340369
// A request was actually sent
341370
exec.futureResponse.handle((resp, thr) -> {
342371
if (resp != null) {
343-
// Success
344-
if (listener != null) {
345-
listenerInProgressCount.incrementAndGet();
346-
scheduler.submit(() -> {
347-
try {
348-
listener.afterBulk(exec.id, exec.request, exec.contexts, resp);
349-
} finally {
350-
if (listenerInProgressCount.decrementAndGet() == 0) {
351-
closeCondition.signalIfReady();
372+
// Success? Checking if total or partial
373+
List<BulkResponseItem> failedRequestsCanRetry = resp.items().stream()
374+
.filter(i -> i.error() != null && i.status() == 429)
375+
.collect(Collectors.toList());
376+
377+
if (failedRequestsCanRetry.isEmpty() || !backoffPolicy.equals(BackoffPolicy.noBackoff())) {
378+
// Total success! ...or there's no retry policy implemented. Either way, can call
379+
// listener after bulk
380+
if (listener != null) {
381+
listenerInProgressCount.incrementAndGet();
382+
scheduler.submit(() -> {
383+
try {
384+
listener.afterBulk(exec.id, exec.request, exec.contexts, resp);
385+
} finally {
386+
if (listenerInProgressCount.decrementAndGet() == 0) {
387+
closeCondition.signalIfReady();
388+
}
352389
}
390+
});
391+
}
392+
} else {
393+
// Partial success, retrying failed requests if policy allows it
394+
// Getting original requests
395+
for (BulkResponseItem bulkItemResponse : failedRequestsCanRetry) {
396+
int index = resp.items().indexOf(bulkItemResponse);
397+
BulkOperationRepeatable<Context> original = sentRequests.get(index);
398+
if (original.getRetries().hasNext()) {
399+
Iterator<Long> retries =
400+
Optional.ofNullable(original.getRetries()).orElse(backoffPolicy.iterator());
401+
addRetry(new BulkOperationRepeatable<>(original.getOperation(),
402+
original.getContext(), retries));
403+
// TODO remove after checking
404+
assert (bulkItemResponse.operationType().toString().equals(sentRequests.get(index).getOperation()._kind().toString()));
353405
}
354-
});
406+
// TODO should print some message?
407+
408+
}
355409
}
356410
} else {
357411
// Failure
@@ -383,7 +437,8 @@ public void add(BulkOperation operation, Context context) {
383437
throw new IllegalStateException("Ingester has been closed");
384438
}
385439

386-
BulkOperationRepeatable<Context> repeatableOp = new BulkOperationRepeatable<>(operation, context, Optional.empty());
440+
BulkOperationRepeatable<Context> repeatableOp = new BulkOperationRepeatable<>(operation, context,
441+
null);
387442

388443
innerAdd(repeatableOp);
389444
}

java-client/src/main/java/co/elastic/clients/elasticsearch/_helpers/bulk/BulkOperationRepeatable.java

Lines changed: 15 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -10,14 +10,15 @@
1010
public class BulkOperationRepeatable<Context> {
1111
private final BulkOperation operation;
1212
private final Context context;
13-
private final Optional<Iterator<Long>> retries;
14-
private Instant nextRetry;
13+
private final Iterator<Long> retries;
14+
private Instant retryTime;
1515

16-
public BulkOperationRepeatable(BulkOperation request, Context context, Optional<Iterator<Long>> retries) {
16+
public BulkOperationRepeatable(BulkOperation request, Context context, Iterator<Long> retries) {
1717
this.operation = request;
1818
this.context = context;
1919
this.retries = retries;
20-
this.nextRetry = Instant.now().plus(retries.map(Iterator::next).orElse(0L), ChronoUnit.MILLIS);
20+
// if the retries iterator is null it means that it's not a retry, otherwise calculating retry time
21+
this.retryTime = Optional.ofNullable(retries).map(r -> Instant.now().plus(r.next(), ChronoUnit.MILLIS)).orElse(Instant.now());
2122
}
2223

2324
public BulkOperation getOperation() {
@@ -28,15 +29,19 @@ public Context getContext() {
2829
return context;
2930
}
3031

31-
public Optional<Iterator<Long>> getRetries() {
32+
public Iterator<Long> getRetries() {
3233
return retries;
3334
}
3435

35-
public Instant getNextRetry() {
36-
return this.nextRetry;
37-
}
36+
// public Instant getCurrentRetryTime() {
37+
// return this.retryTime;
38+
// }
39+
//
40+
// public Instant getNextRetryTime() {
41+
// return Instant.now().plus(retries.next(), ChronoUnit.MILLIS);
42+
// }
3843

39-
public boolean canRetry() {
40-
return this.retries.map(Iterator::hasNext).orElse(true);
44+
public boolean isSendable() {
45+
return retryTime.isBefore(Instant.now());
4146
}
4247
}

0 commit comments

Comments
 (0)