Skip to content

Commit

Permalink
bugfixes
Browse files Browse the repository at this point in the history
  • Loading branch information
l-trotta committed Jan 22, 2025
1 parent 3858711 commit 76dad4e
Show file tree
Hide file tree
Showing 2 changed files with 18 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,7 @@ private BulkIngester(Builder<Context> builder) {
});
retryScheduler.scheduleWithFixedDelay(
this::retryFlush,
1000,1000, // TODO should we hardcode this?
1000, 1000, // TODO should we hardcode this?
TimeUnit.MILLISECONDS
);
}
Expand Down Expand Up @@ -374,7 +374,7 @@ public void flush() {
.filter(i -> i.error() != null && i.status() == 429)
.collect(Collectors.toList());

if (failedRequestsCanRetry.isEmpty() || !backoffPolicy.equals(BackoffPolicy.noBackoff())) {
if (failedRequestsCanRetry.isEmpty() || backoffPolicy.equals(BackoffPolicy.noBackoff())) {
// Total success! ...or there's no retry policy implemented. Either way, can call
// listener after bulk
if (listener != null) {
Expand All @@ -395,16 +395,18 @@ public void flush() {
for (BulkResponseItem bulkItemResponse : failedRequestsCanRetry) {
int index = resp.items().indexOf(bulkItemResponse);
BulkOperationRepeatable<Context> original = sentRequests.get(index);
if (original.getRetries().hasNext()) {
if (original.canRetry()) {
Iterator<Long> retries =
Optional.ofNullable(original.getRetries()).orElse(backoffPolicy.iterator());
addRetry(new BulkOperationRepeatable<>(original.getOperation(),
original.getContext(), retries));
// TODO remove after checking
assert (bulkItemResponse.operationType().toString().equals(sentRequests.get(index).getOperation()._kind().toString()));
}
// TODO should print some message?

else{
System.out.println("Retries finished");
// TODO should print some message?
}
}
}
} else {
Expand Down Expand Up @@ -514,6 +516,10 @@ public void close() {
if (scheduler != null && !isExternalScheduler) {
scheduler.shutdownNow();
}

if (retryScheduler != null) {
retryScheduler.shutdownNow();
}
}

//----------------------------------------------------------------------------------------------------
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,13 +33,13 @@ public Iterator<Long> getRetries() {
return retries;
}

// public Instant getCurrentRetryTime() {
// return this.retryTime;
// }
//
// public Instant getNextRetryTime() {
// return Instant.now().plus(retries.next(), ChronoUnit.MILLIS);
// }
public Instant getCurrentRetryTime() {
return this.retryTime;
}

public boolean canRetry() {
return Optional.ofNullable(retries).map(Iterator::hasNext).orElse(true);
}

public boolean isSendable() {
return retryTime.isBefore(Instant.now());
Expand Down

0 comments on commit 76dad4e

Please sign in to comment.