Skip to content

Commit

Permalink
[proxima-direct-core] Discard invalid cached response observer in Tra…
Browse files Browse the repository at this point in the history
…nsactionResourceManager
  • Loading branch information
je-ik committed Dec 3, 2024
1 parent 2c65eff commit 3726f74
Showing 1 changed file with 19 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -166,8 +166,7 @@ class CachedTransaction implements AutoCloseable {

@Getter final String transactionId;
long touched = System.currentTimeMillis();
final Map<AttributeDescriptor<?>, DirectAttributeFamilyDescriptor> attributeToFamily =
new HashMap<>();
final Map<AttributeDescriptor<?>, DirectAttributeFamilyDescriptor> attributeToFamily;
final Map<String, CompletableFuture<Response>> requestFutures = new ConcurrentHashMap<>();
@Nullable OnlineAttributeWriter requestWriter;
@Nullable OnlineAttributeWriter commitWriter;
Expand All @@ -176,11 +175,11 @@ class CachedTransaction implements AutoCloseable {

CachedTransaction(String transactionId, Collection<KeyAttribute> attributes) {
this.transactionId = transactionId;
this.attributeToFamily.putAll(
this.attributeToFamily =
findFamilyForTransactionalAttribute(
attributes.stream()
.map(KeyAttribute::getAttributeDescriptor)
.collect(Collectors.toList())));
.collect(Collectors.toList()));
}

CompletableFuture<Response> open(List<KeyAttribute> inputAttrs) {
Expand Down Expand Up @@ -242,11 +241,16 @@ public CompletableFuture<Response> rollback() {

private CompletableFuture<?> sendRequest(Request request, String requestId) {
Pair<List<Integer>, OnlineAttributeWriter> writerWithAssignedPartitions = getRequestWriter();
Preconditions.checkState(
!writerWithAssignedPartitions.getFirst().isEmpty(),
"Received empty partitions to observe for responses to transactional "
+ "requests. Please see if you have enough partitions and if your clients can correctly "
+ "resolve hostnames");
if (writerWithAssignedPartitions.getFirst().isEmpty()) {
IllegalStateException ex =
new IllegalStateException(
"Received empty partitions to observe for responses to transactional "
+ "requests. Please see if you have enough partitions and if your clients can correctly "
+ "resolve hostnames.");
log.error("Failed to open transaction. Clearing cache.", ex);
closeClientResponseFamily();
throw ex;
}
CompletableFuture<?> res = new CompletableFuture<>();
writerWithAssignedPartitions
.getSecond()
Expand Down Expand Up @@ -288,6 +292,12 @@ Pair<List<Integer>, OnlineAttributeWriter> getRequestWriter() {
requestWriter);
}

void closeClientResponseFamily() {
DirectAttributeFamilyDescriptor responseFamily = attributeToFamily.get(responseDesc);
Optional.ofNullable(clientObservedFamilies.remove(responseFamily))
.ifPresent(h -> h.getObserveHandle().close());
}

public DirectAttributeFamilyDescriptor getResponseFamily() {
return Objects.requireNonNull(attributeToFamily.get(responseDesc));
}
Expand Down

0 comments on commit 3726f74

Please sign in to comment.