DLQ API for unsideline & GetMessages #168
base: master
- message channel PR comments
…ution and updates on subscription operation for a subscription
Works for dev setup for metric collection. For prod, additional changes will be needed:
- Convert the otel container to a sidecar.
- Test traces and logs in addition to metrics.
- Prometheus configuration for scraping needs to be defined.
updating job definition due to container rename.
- Unsideline & GetMessages - Changes to support multi permission checks in RouteBehavior
Test Results: 280 tests, 280 ✅, 54s ⏱️. Results for commit 7468d98.
We should start evaluating Java 21.
import java.util.List;

@Data
public class GetMessagesRequest {
I think we should put all entities specific to the web API in a web package, preferably versioned, as in "web.v1."?
Agreed. I will take this up independently of this PR. There are other entities as well that would need to be moved.
@Data
public class GetMessagesResponse {
    private int shardId;
why should shardId be part of response?
    public static final long UNSPECIFIED_TS = 0L;
    public static final long LATEST_TS = Long.MAX_VALUE;

    //TODO:: evaluate if earliestFailedAt should be per shard level to support pagination kind of semantics.
I am hoping that shard semantics are totally transparent and do not leak into the public API. A shard is just one implementation of scaling a subscription, not the only one. Sharding strategies may also differ, so an operation suitable for one shard strategy might not work for another.
So I was hoping that most operations are agnostic of "shards". If an operation needs to be mapped to shards, that is done either by the "server" or the "controller", most likely the controller, since it has the know-how.
    //TODO:: evaluate if earliestFailedAt should be per shard level to support pagination kind of semantics.
    private long earliestFailedAt = 0L;
    private List<Offset> offsets = new ArrayList<>();
What does list of offsets represent here?
Got it. It looks like it is for paging. But as discussed, it can't just be a Pulsar offset. We also need the shard info embedded in it.
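One way to read the point above is that the paging cursor handed back to callers would pair the shard id with the store-specific offset inside a single opaque token. The following is only a sketch under that assumption: `ShardCursor`, its fields, and the `shardId:offset` layout are hypothetical names for illustration and are not part of this PR.

```java
import java.nio.charset.StandardCharsets;
import java.util.Base64;

/** Hypothetical paging cursor: pairs a shard id with an opaque, store-specific offset. */
final class ShardCursor {
    final int shardId;
    final String storeOffset; // opaque to callers, e.g. a serialized message id (assumption)

    ShardCursor(int shardId, String storeOffset) {
        this.shardId = shardId;
        this.storeOffset = storeOffset;
    }

    /** Encodes the cursor as a URL-safe token so callers never parse shard details. */
    String encode() {
        String raw = shardId + ":" + storeOffset;
        return Base64.getUrlEncoder().withoutPadding()
                .encodeToString(raw.getBytes(StandardCharsets.UTF_8));
    }

    /** Decodes a token produced by encode(); rejects input without a shard prefix. */
    static ShardCursor decode(String token) {
        String raw = new String(Base64.getUrlDecoder().decode(token), StandardCharsets.UTF_8);
        int sep = raw.indexOf(':'); // only the first ':' separates shard id from offset
        if (sep <= 0) {
            throw new IllegalArgumentException("malformed cursor token");
        }
        return new ShardCursor(Integer.parseInt(raw.substring(0, sep)), raw.substring(sep + 1));
    }
}
```

Keeping the token opaque would let the sharding strategy change later without changing the public request shape, which is in line with the earlier comment about not leaking shards into the API.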
import java.util.concurrent.CompletableFuture;

public interface ConsumerApi {
    CompletableFuture<Void> start(ShardOperation.StartData operation);

    CompletableFuture<Void> stop(ShardOperation.StopData operation);

    CompletableFuture<Void> unsideline(ShardOperation.UnsidelineData operation);
What does Future<> as the return value mean here for the unsideline op, given that the unsideline operation may take an extended amount of time?
I have reviewed most of it. I haven't checked all corner cases, as I am still going through the overall architecture and how all the classes are placed and interact. After that I will be able to comment on the corner cases. There are common patterns in the controller that I feel can be simplified a lot with proper abstraction. Instead of each opExecutor handling its own corner cases, I think all the complications can be abstracted away. I will think about this.
@Data
@AllArgsConstructor
public class UnsidelineOpRequest {
These cluster-related entities do not look like they belong in the entities module.
They are internal to Varadhi's protocol and should probably exist elsewhere.
I was hoping the entities module would hold just the outside-world-facing POJOs, nothing else.
in that case we would need to split the entities for internal vs external.
maybe that is for the better. why couple them?
public class GetMessagesResponse {
    private int shardId;
    private List<Message> messages;
    private String error;
Is error part of the payload because of the partial failure scenario, i.e. failures from only some shards?
yes.
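To illustrate the partial failure case confirmed above, here is a minimal sketch of merging per-shard results where some shards report an error. `ShardResult` and `MergedResult` are hypothetical stand-ins, not the PR's `GetMessagesResponse`, and messages are simplified to strings.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical per-shard result: carries either messages or an error string. */
final class ShardResult {
    final int shardId;
    final List<String> messages;
    final String error; // null when the shard answered successfully

    private ShardResult(int shardId, List<String> messages, String error) {
        this.shardId = shardId;
        this.messages = messages;
        this.error = error;
    }

    static ShardResult ok(int shardId, List<String> messages) {
        return new ShardResult(shardId, messages, null);
    }

    static ShardResult failed(int shardId, String error) {
        return new ShardResult(shardId, new ArrayList<>(), error);
    }
}

/** Hypothetical merged view: messages from healthy shards plus one error entry per failed shard. */
final class MergedResult {
    final List<String> messages = new ArrayList<>();
    final List<String> errors = new ArrayList<>();

    static MergedResult merge(List<ShardResult> shardResults) {
        MergedResult merged = new MergedResult();
        for (ShardResult r : shardResults) {
            if (r.error != null) {
                merged.errors.add("shard " + r.shardId + ": " + r.error);
            } else {
                merged.messages.addAll(r.messages);
            }
        }
        return merged;
    }

    /** True when at least one shard failed, even if others returned messages. */
    boolean hasFailures() {
        return !errors.isEmpty();
    }
}
```

With this shape a caller still receives the messages from healthy shards and can decide separately how to surface the failed ones.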
@@ -51,6 +51,12 @@ public static ShardOperation stopOp(String subOpId, SubscriptionUnitShard shard,
        return new ShardOperation(new ShardOperation.StopData(subOpId, shard, subscription));
    }

    public static ShardOperation unsidelineOp(
            String subOpId, SubscriptionUnitShard shard, VaradhiSubscription subscription, UnsidelineRequest request
Is there a reason we are putting the whole objects (shard info, subscription info) in the operation object?
Does it serve any particular use case? What if we just put the ids in instead, like the subscription identifier and the shard id?
Consumer nodes don't interact with the metastore, so all information is passed via the message alone.
But the unsideline operation is already happening within the context of a running subscription in a consumer. The unsideline can only happen when the consumer is running that subscription and knows about it. Hence I wondered why unsidelineOp needs the full object. StartOp containing the full object makes sense. Similarly, I wouldn't expect stopOp to contain the full object.
Oh, in that sense it makes sense. I will take it up independently in the next PR (along with the entities refactoring).
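The id-only variant discussed above could look roughly like the sketch below: the start operation registers the full details on the consumer node, and later operations carry only identifiers that are resolved locally. `UnsidelineRef` and `RunningShards` are hypothetical names, and the full subscription/shard details are reduced to a string for brevity.

```java
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical id-only payload for an unsideline operation. */
final class UnsidelineRef {
    final String subOpId;
    final String subscriptionName;
    final int shardId;

    UnsidelineRef(String subOpId, String subscriptionName, int shardId) {
        this.subOpId = subOpId;
        this.subscriptionName = subscriptionName;
        this.shardId = shardId;
    }
}

/** Hypothetical registry of shards that a start operation already brought up on this node. */
final class RunningShards {
    private final Map<String, String> running = new ConcurrentHashMap<>();

    private static String key(String subscriptionName, int shardId) {
        return subscriptionName + "#" + shardId;
    }

    /** Called when a start operation arrives; this is the only place the full details are needed. */
    void onStart(String subscriptionName, int shardId, String fullDetails) {
        running.put(key(subscriptionName, shardId), fullDetails);
    }

    /** Resolves an id-only operation against local state; empty if the shard is not running here. */
    Optional<String> resolve(UnsidelineRef ref) {
        return Optional.ofNullable(running.get(key(ref.subscriptionName, ref.shardId)));
    }
}
```

An empty result from `resolve` would correspond to the case the reviewer describes, where an unsideline arrives for a subscription the consumer is not running.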
@@ -64,6 +64,7 @@ public static class Builder {
        private RetryPolicy retryPolicy;
        private ConsumptionPolicy consumptionPolicy;
        private SubscriptionShards shards;
        private Map<String, String> properties = new HashMap<>();
what is subProvider?
A test helper related to subscription entities. Renamed to SubscriptionUtil.
        shardResponse.setError(t.getMessage());
    }
    log.info("Sending a GetMessage response for {}", assignment);
    responseWriter.accept(shardResponse);
This does not look correct.
consumer.getMessages() will run in parallel, so we may be calling responseWriter.accept(shardResponse) in parallel as well, where responseWriter is shared among these "threads". Is this expected to work?
Yes, I had the same thought. Merging across streams without a well-defined boundary will not work. Currently the response from each shard is not chunked; it is a single response.
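One possible way to keep the shared writer safe, assuming each shard response is written as a single unit as stated above, is to serialize calls to it. This is a sketch only: `SerializedWriter` is a hypothetical wrapper, and whether the actual `responseWriter` is a `java.util.function.Consumer` is an assumption based on the `accept` call in the snippet.

```java
import java.util.function.Consumer;

/**
 * Hypothetical wrapper that lets parallel shard callbacks share one writer.
 * Each accept() call runs under a lock, so whole responses never interleave.
 */
final class SerializedWriter<T> implements Consumer<T> {
    private final Consumer<T> delegate;
    private final Object lock = new Object();

    SerializedWriter(Consumer<T> delegate) {
        this.delegate = delegate;
    }

    @Override
    public void accept(T response) {
        synchronized (lock) {
            delegate.accept(response); // one complete shard response at a time
        }
    }
}
```

This only guarantees that individual responses are not interleaved. If shard responses were ever chunked, the boundary problem raised above would return and the chunks would need explicit framing.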
@@ -14,13 +14,13 @@
@Slf4j
public class SubscriptionService {
    private final MetaStore metaStore;
    private final ControllerApi controllerApi;
    private final ControllerApi controllerClient;
NIT: in general we can skip the word "client". It suggests that the dependency is remote, even though that is not a hard requirement.
import lombok.extern.slf4j.Slf4j;

@Slf4j
public class AsyncProducerReadStream implements ReadStream<Buffer> {
is there a resource which you followed for this?
ChatGPT.
@@ -40,7 +41,8 @@ services:

  pulsar:
    image: apachepulsar/pulsar:3.0.0
    profiles: [ "test", "dev", "pulsar" ]
    container_name: pulsar
    profiles: [ "test", "dev", "pulsar" , "dev-metric"]
I think we can include Prometheus in the test and dev profiles by default. I don't think a metric-specific profile makes sense.
Possibly yes. We might need to tune the resources here, otherwise running it on a laptop becomes an issue. I will see if this can be done (might keep it the same for now).
Changes for DLQ APIs.
TODO:
Tests to be added post initial review.