DLQ API for unsideline & GetMessages #168

Open
wants to merge 59 commits into
base: master
Conversation

kmrdhruv
Collaborator

Changes for DLQ APIs.

  • Unsideline requests
  • GetMessages
  • Support for multi permission check in route behavior.

TODO:
Tests to be added post initial review.

…ution and updates on subscription operation for a subscription
@anuj-flipkart anuj-flipkart linked an issue Oct 24, 2024 that may be closed by this pull request

Test Results

280 tests   280 ✅  54s ⏱️
 55 suites    0 💤
 55 files      0 ❌

Results for commit 7468d98.

@gauravAshok
Collaborator

we should start evaluating java 21.
most of the control flow will be simplified if we do that.

import java.util.List;

@Data
public class GetMessagesRequest {
Collaborator

I think we should put all entities specific to the web API in a web package, preferably versioned, as in "web.v1."?

Collaborator Author

Aligned; will take this up independently of this PR. There are other entities as well that would need to be moved.


@Data
public class GetMessagesResponse {
private int shardId;
Collaborator

why should shardId be part of response?

public static final long UNSPECIFIED_TS = 0L;
public static final long LATEST_TS = Long.MAX_VALUE;

//TODO:: evaluate if earliestFailedAt should be per shard level to support pagination kind of semantics.
Collaborator

I am hoping that shard semantics are totally transparent and do not leak into the public API. Sharding is just one implementation of scaling a subscription, not the only one. Sharding strategies may also differ, so an operation suitable for one strategy might not work for another.

So, I was hoping that most operations are agnostic of "shards"; if any operation needs to be mapped to shards, that is done either by the "server" or the "controller". Most likely the controller, as it has the know-how.


//TODO:: evaluate if earliestFailedAt should be per shard level to support pagination kind of semantics.
private long earliestFailedAt = 0L;
private List<Offset> offsets = new ArrayList<>();
Collaborator

What does list of offsets represent here?

Collaborator

Got it. It looks like it is for paging. But as discussed, it can't just be a Pulsar offset; we also need the shard info embedded in it.
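
One way to read the two comments above together (shard info must travel with the offset, yet shards should not leak into the public API) is an opaque page token. The following is only a sketch under that assumption: `PageToken`, `ShardPosition`, and the `shardId:offset` encoding are hypothetical and not part of the PR, and it assumes offsets never contain a comma.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Base64;
import java.util.List;

// Hypothetical names throughout; not part of the PR.
final class PageToken {
    /** One paging position: which shard, and where in that shard's DLQ. */
    record ShardPosition(int shardId, String offset) {}

    private PageToken() {}

    /** Joins "shardId:offset" pairs with ',' and Base64-wraps the result so callers see an opaque string. */
    static String encode(List<ShardPosition> positions) {
        StringBuilder sb = new StringBuilder();
        for (ShardPosition p : positions) {
            if (sb.length() > 0) sb.append(',');
            sb.append(p.shardId()).append(':').append(p.offset());
        }
        return Base64.getUrlEncoder().withoutPadding()
                .encodeToString(sb.toString().getBytes(StandardCharsets.UTF_8));
    }

    /** Reverses encode(); splits each pair on its first ':' so offsets may themselves contain ':'. */
    static List<ShardPosition> decode(String token) {
        String raw = new String(Base64.getUrlDecoder().decode(token), StandardCharsets.UTF_8);
        List<ShardPosition> positions = new ArrayList<>();
        if (raw.isEmpty()) return positions;
        for (String part : raw.split(",")) {
            int sep = part.indexOf(':');
            positions.add(new ShardPosition(
                    Integer.parseInt(part.substring(0, sep)), part.substring(sep + 1)));
        }
        return positions;
    }
}
```

With this shape the client only echoes the token back on the next request, and the server or controller decides how to map it onto shards.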

import java.util.concurrent.CompletableFuture;

public interface ConsumerApi {
CompletableFuture<Void> start(ShardOperation.StartData operation);

CompletableFuture<Void> stop(ShardOperation.StopData operation);

CompletableFuture<Void> unsideline(ShardOperation.UnsidelineData operation);
Collaborator

what does Future<> as the return value mean here? for unsideline op?
Given that the unsideline operation may take extended amount of time.
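
To make the question concrete, here is a sketch of one possible meaning: the future completes once the operation is accepted, and progress is tracked separately by operation id. This is an assumption for illustration, not what the PR does; `LongRunningOps`, `State`, `submit`, and `status` are hypothetical names.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch; none of these names come from the PR.
final class LongRunningOps {
    enum State { IN_PROGRESS, COMPLETED, FAILED }

    private final Map<String, State> states = new ConcurrentHashMap<>();

    /** Completes as soon as the operation is accepted; the work itself continues in the background. */
    CompletableFuture<Void> submit(String opId, Runnable work) {
        states.put(opId, State.IN_PROGRESS);
        CompletableFuture.runAsync(work).whenComplete((v, t) ->
                states.put(opId, t == null ? State.COMPLETED : State.FAILED));
        return CompletableFuture.completedFuture(null);
    }

    /** Returns the last recorded state, or null for an unknown operation id. */
    State status(String opId) {
        return states.get(opId);
    }
}
```

The alternative reading, where the future completes only when the whole unsideline finishes, would keep the caller waiting for the extended duration mentioned above.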

@gauravAshok
Collaborator

I reviewed it largely.

I haven't checked all corner cases, as I am still going through the overall architecture and how all the classes are placed and interact. After that I will be able to comment on the corner cases.

There are common patterns in controller that I feel can be simplified a lot. (with proper abstraction). Instead of each opExecutor handling its corner cases, I think all the complications can be abstracted away. I will think about this.


@Data
@AllArgsConstructor
public class UnsidelineOpRequest {
Collaborator

These cluster-related entities do not look like they belong in the entities module.
They are internal to Varadhi's protocol and should probably exist elsewhere.

I was hoping the entities module would be just outside-world-facing POJOs, nothing else.

Collaborator Author

in that case we would need to split the entities for internal vs external.

Collaborator

maybe that is for the better. why couple them?


public class GetMessagesResponse {
private int shardId;
private List<Message> messages;
private String error;
Collaborator

error is part of the payload because of the partial failure scenario? failures from only some shards!

Collaborator Author

yes.
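
A minimal sketch of the partial-failure handling discussed above: each shard's fetch is converted into a result that never fails, so a failure from one shard ends up in that shard's `error` field rather than failing the whole request. `ShardResult` and `capture` are hypothetical stand-ins for the PR's `GetMessagesResponse` handling, and messages are plain strings here for brevity.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Hypothetical stand-in for the PR's GetMessagesResponse; fields mirror the snippet above.
record ShardResult(int shardId, List<String> messages, String error) {

    /** Turns a per-shard future into one that always succeeds: failures go into the payload. */
    static CompletableFuture<ShardResult> capture(int shardId, CompletableFuture<List<String>> fetch) {
        return fetch.handle((messages, t) -> t == null
                ? new ShardResult(shardId, messages, null)
                : new ShardResult(shardId, List.of(), t.getMessage()));
    }
}
```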

@@ -51,6 +51,12 @@ public static ShardOperation stopOp(String subOpId, SubscriptionUnitShard shard,
return new ShardOperation(new ShardOperation.StopData(subOpId, shard, subscription));
}

public static ShardOperation unsidelineOp(
String subOpId, SubscriptionUnitShard shard, VaradhiSubscription subscription, UnsidelineRequest request
Collaborator

Any reason we are putting the whole objects (shard info, subscription info) in the operation object?

Does it serve any particular use case? What if we just put ids in instead, like the subscription identifier and shard id?

Collaborator Author

Consumer nodes don't interact with the metastore, so all information is passed via the message alone.

Collaborator

But the unsideline operation is already happening within the context of a running subscription in a consumer. The unsideline can only happen when the consumer is running that subscription and knows about it. Hence I wondered why unsidelineOp needs the full object. StartOp containing the full object makes sense. Similarly, I wouldn't expect stopOp to contain the full object.

Collaborator Author

Oh, that way; makes sense. I will take it up independently in the next PR (along with the entities refactoring).
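
The suggestion in this thread can be sketched as follows: the start operation carries the full configuration because the consumer has no metastore access, while unsideline carries only identifiers and resolves the rest from the consumer's local state. All names here (`ConsumerNode`, `ShardKey`, and the record shapes) are hypothetical; the PR's `ShardOperation` classes currently carry the full objects.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical shapes for illustration; not the PR's ShardOperation classes.
final class ConsumerNode {
    record ShardKey(String subscriptionName, int shardId) {}

    /** Start is self-contained: it brings the full configuration with it. */
    record StartData(String subOpId, ShardKey key, String subscriptionConfig) {}

    /** Unsideline targets a shard this node already runs, so ids are enough. */
    record UnsidelineData(String subOpId, ShardKey key) {}

    private final Map<ShardKey, String> running = new ConcurrentHashMap<>();

    void start(StartData op) {
        running.put(op.key(), op.subscriptionConfig());
    }

    /** Resolves the config from local state; fails if this node is not running the shard. */
    String unsideline(UnsidelineData op) {
        String config = running.get(op.key());
        if (config == null) {
            throw new IllegalStateException("shard not running here: " + op.key());
        }
        return config;
    }
}
```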

@@ -64,6 +64,7 @@ public static class Builder {
private RetryPolicy retryPolicy;
private ConsumptionPolicy consumptionPolicy;
private SubscriptionShards shards;
private Map<String, String> properties = new HashMap<>();
Collaborator

what is subProvider?

Collaborator Author

A test helper related to subscription entities. Renamed to SubscriptionUtil.

shardResponse.setError(t.getMessage());
}
log.info("Sending a GetMessage response for {}", assignment);
responseWriter.accept(shardResponse);
Collaborator

This does not look correct.
consumer.getMessages() will run in parallel, so we may be calling responseWriter.accept(shardResponse) in parallel as well, where responseWriter is shared among these "threads". Is this expected to work?

Collaborator Author

Yeah, same thought as well: merging across streams without a well-defined boundary will not work. Currently the response from each shard is not chunked; it is a single response.
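
One way to avoid concurrent calls on the shared writer is to wait for every shard future and then write the responses one at a time. The sketch below assumes each shard future has already been made non-failing (errors captured in the payload); `SequentialWriter` and `writeAll` are hypothetical names, and `writer` stands in for the shared `responseWriter`.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;

// Hypothetical helper; `writer` stands in for the shared responseWriter in the snippet above.
final class SequentialWriter {
    private SequentialWriter() {}

    /**
     * Waits for every shard future, then invokes the writer once per response, in list order,
     * from a single thread. If any future fails, the returned future fails and nothing is written.
     */
    static <T> CompletableFuture<Void> writeAll(List<CompletableFuture<T>> shardFutures,
                                                Consumer<? super T> writer) {
        CompletableFuture<Void> all =
                CompletableFuture.allOf(shardFutures.toArray(new CompletableFuture[0]));
        // join() cannot block here: allOf has completed, so every future is already done.
        return all.thenRun(() -> shardFutures.forEach(f -> writer.accept(f.join())));
    }
}
```

The trade-off is latency: nothing is written until the slowest shard responds. Writing as each shard completes would instead need a lock or a single-threaded executor around the writer.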

@@ -14,13 +14,13 @@
@Slf4j
public class SubscriptionService {
private final MetaStore metaStore;
private final ControllerApi controllerApi;
private final ControllerApi controllerClient;
Collaborator

NIT: in general we can skip using the word client. client enforces the idea of it being remote, even though that is not a hard requirement.

import lombok.extern.slf4j.Slf4j;

@Slf4j
public class AsyncProducerReadStream implements ReadStream<Buffer> {
Collaborator

is there a resource which you followed for this?

Collaborator Author

ChatGPT.

@@ -40,7 +41,8 @@ services:

pulsar:
image: apachepulsar/pulsar:3.0.0
profiles: [ "test", "dev", "pulsar" ]
container_name: pulsar
profiles: [ "test", "dev", "pulsar" , "dev-metric"]
Collaborator

I think we can include Prometheus in the test and dev profiles by default. I don't think a metric-specific profile makes sense.

Collaborator Author

Possibly yes, though we might need to tune the resources here; otherwise running it on a laptop becomes an issue. Will see if this can be done (might keep it the same for now).

Development

Successfully merging this pull request may close these issues.

DLQ API for unsideline & GetMessages
2 participants