[RRIO] [Throttle] transform that slows down element transmission without an external resource #30123
Conversation
Assigning reviewers. If you would like to opt out of this review, comment assign to next reviewer. R: @Abacn for label java. The PR bot will only process comments in the main thread (not review comments).
...o/rrio/src/main/java/org/apache/beam/io/requestresponse/ThrottleWithoutExternalResource.java
Force-pushed from ff1f203 to 19ca6cf.
  assertThat(throttled.getWindowingStrategy().getWindowFn(), equalTo(new GlobalWindows()));
  pipeline.run();
}
I don't think these tests do quite enough. They show that the windowing strategy is reapplied, but they don't show that elements maintain their original window (which is probably the more important piece). I think a better test would be something like:
pipeline.apply(<create elements 1-10>)
.apply(<map fn that gives timestamps equal to starttime + (element * 1 second)>)
.apply(<fixed window of length 5 seconds>)
.apply(<throttle with rate of 1 element every 5 seconds>)
.apply(<combine - count per window>)
<assert that there are 2 elements output with counts of 5>
One of my core concerns with the previous approach (I haven't fully reviewed the new one, but at a glance I think it's still a problem) is that we're losing the info we need to do accurate windowing.
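The arithmetic behind the suggested test can be sketched in plain Java, outside Beam. The timestamp choice of start + (element - 1) seconds is a hypothetical tweak to the pseudocode above so that the ten elements split exactly 5/5 across two 5-second fixed windows:

```java
import java.util.Map;
import java.util.TreeMap;

public class WindowCountSketch {
    // Start of the fixed window containing tsMillis, mirroring FixedWindows.of(size).
    static long windowStart(long tsMillis, long sizeMillis) {
        return tsMillis - Math.floorMod(tsMillis, sizeMillis);
    }

    // Counts elements 1-10 per 5-second window, assuming element i carries
    // timestamp start + (i - 1) seconds (hypothetical; chosen to split 5/5).
    static Map<Long, Integer> countsPerWindow() {
        Map<Long, Integer> counts = new TreeMap<>();
        for (int element = 1; element <= 10; element++) {
            long ts = (element - 1) * 1_000L;
            counts.merge(windowStart(ts, 5_000L), 1, Integer::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        // Two windows, five elements each: {0=5, 5000=5}
        System.out.println(countsPerWindow());
    }
}
```

A throttle that preserves window assignments should reproduce exactly these per-window counts downstream of the combine.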
Good point. I was actually considering inspecting the BoundedWindow and PaneInfo available in the DoFn but I like your idea as well. I'll figure something out. Thank you for this.
@damondouglas it looks like this use case still isn't tested, would you mind covering it? If it is and I just missed it, please point me to the test case
@damccorm I added givenUpstreamFixedWindow_thenPreservesWindowAssignments and givenGlobalUpstreamWindow_thenCanApplyDownstreamWindow to ThrottleTest.
I don't think either of those test what I'm talking about here. They test that the correct windows are applied, but they don't test that the elements maintain their original (correct) window assignments. For example:
@Test
public void givenGlobalUpstreamWindow_thenCanApplyDownstreamWindow() {
  Rate rate = Rate.of(3, Duration.standardSeconds(1L));
  PCollection<Integer> unthrottled =
      pipeline.apply(
          Create.timestamped(
              TimestampedValue.of(0, epochPlus(0L)),
              TimestampedValue.of(1, epochPlus(1_000L)),
              TimestampedValue.of(2, epochPlus(2_000L))));
  PAssert.that(unthrottled).inWindow(GlobalWindow.INSTANCE);
  PCollection<Integer> throttled =
      unthrottled
          .apply(transformOf(rate))
          .apply(Window.into(FixedWindows.of(Duration.standardSeconds(5L))));
  PAssert.that(throttled).inWindow(new IntervalWindow(Instant.EPOCH, epochPlus(5_000L)));
  pipeline.run();
}
This asserts .inWindow(new IntervalWindow(Instant.EPOCH, epochPlus(5_000L))), but it doesn't actually assert that all 3 values are in their correct interval windows (both tests are useful/important, but the latter is what I'm worried about in this PR).
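The missing check is per-element window membership. In the real test something like PAssert.that(throttled).inWindow(window).containsInAnyOrder(0, 1, 2) could serve; as a minimal plain-Java sketch of the half-open interval logic (not Beam code, names hypothetical):

```java
public class WindowMembershipSketch {
    // True if tsMillis falls inside the half-open IntervalWindow [startMillis, endMillis).
    static boolean inWindow(long tsMillis, long startMillis, long endMillis) {
        return tsMillis >= startMillis && tsMillis < endMillis;
    }

    public static void main(String[] args) {
        // The three TimestampedValues above: epoch + 0s, 1s, 2s.
        long[] timestamps = {0L, 1_000L, 2_000L};
        for (long ts : timestamps) {
            // All three should land in the single window [EPOCH, EPOCH + 5s);
            // a throttle that drops or rewrites timestamps would break this.
            System.out.println(ts + " in [0, 5000): " + inWindow(ts, 0L, 5_000L));
        }
    }
}
```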
@damccorm I see that the silly thing I was originally doing, and where this conversation originally started, was that I had applied a GlobalWindow upstream. Additionally, I was originally calling receiver.outputWithTimestamp instead of receiver.output. I removed the GlobalWindow and changed to receiver.output(element). The givenUpstreamNonGlobalWindow_canReassignDownstreamWindow and givenUpstreamNonGlobalWindow_thenPreservesWindowAssignments tests are finally passing.
Coder<KV<Integer, List<RequestT>>> kvCoder = KvCoder.of(VarIntCoder.of(), listCoder);

PTransform<PCollection<KV<Integer, RequestT>>, PCollection<KV<Integer, Iterable<RequestT>>>>
    groupingTransform = GroupByKey.create();
Is there a reason we can't just always use GroupIntoBatches? I think it will tend to perform better because it won't require the whole previous stage to complete before we start sending requests
The tests do not work in batch mode, as GroupIntoBatches relies on event timers.
Ok - I think this is a major performance problem since it will require all reads to complete before starting the throttling.
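The performance difference can be sketched outside Beam: a GroupByKey-based stage must see all of its input before downstream work begins, whereas GroupIntoBatches-style buffering can emit each batch as soon as it fills. A plain-Java model (hypothetical names, not Beam code):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

public class EagerBatcher {
    // Emits batches of up to maxSize as soon as they fill, so downstream work
    // (here, sending throttled requests) can start before input is exhausted.
    // A GroupByKey-based stage, by contrast, emits nothing until the whole
    // previous stage completes.
    static <T> void batch(Iterable<T> input, int maxSize, Consumer<List<T>> emit) {
        List<T> buffer = new ArrayList<>();
        for (T element : input) {
            buffer.add(element);
            if (buffer.size() >= maxSize) {
                emit.accept(new ArrayList<>(buffer)); // downstream can start now
                buffer.clear();
            }
        }
        if (!buffer.isEmpty()) {
            // Flush the remainder; GroupIntoBatches covers this case in
            // streaming mode with a max-buffering-duration timer.
            emit.accept(buffer);
        }
    }

    public static void main(String[] args) {
        List<List<Integer>> batches = new ArrayList<>();
        batch(List.of(1, 2, 3, 4, 5), 2, batches::add);
        System.out.println(batches); // [[1, 2], [3, 4], [5]]
    }
}
```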
 * List<RequestT>>>}. This is done to simplify the coding of the downstream <a
 * href="https://beam.apache.org/documentation/programming-guide/#splittable-dofns">Splittable
 * DoFn</a>. Next the transform applies {@link GlobalWindows} to the {@code PCollection<KV<Integer,
 * List<RequestT>>>} prior to applying to the splittable DoFn. This splittable DoFn performs the
Nit: This is out of date I think
RequestT value = element.getValue().get((int) position);
estimator.setWatermark(timestamp);
receiver.output(value);
I keep getting hung up on this function, even with the current approach (which I think has addressed many of my concerns). Basically, my remaining question is: in streaming mode, does this actually guarantee the throttling behavior we're expecting? Let's say, for example, that we have a throttling limit of 100 elements per hour, and then the following happens:
1. We get 100 elements all at once.
2. These elements are grouped into batches of size 1, then processed by this transform; all of them fire.
3. maxBufferingDuration time passes (but less than an hour has passed, so we should still be throttled).
4. We get 100 elements all at once again.
Won't those 100 elements come in, get put into batches of size 1, and then immediately fire (since they have a new restriction tracker/watermark)?
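The guarantee in question is the one a token bucket provides. A minimal plain-Java sketch (hypothetical, not the PR's implementation) shows why the throttle state must survive across bursts; a bucket recreated per batch, like a fresh restriction tracker per element, would wrongly grant the whole second burst:

```java
public class TokenBucket {
    private final long capacity;      // burst size, e.g. 100 elements
    private final double refillPerMs; // e.g. 100.0 / 3_600_000 for 100 per hour
    private double tokens;
    private long lastRefillMs;

    TokenBucket(long capacity, double refillPerMs, long nowMs) {
        this.capacity = capacity;
        this.refillPerMs = refillPerMs;
        this.tokens = capacity;
        this.lastRefillMs = nowMs;
    }

    // Grants one element at nowMs if the budget allows. The state carried in
    // tokens/lastRefillMs is exactly what a per-batch tracker would forget.
    boolean tryAcquire(long nowMs) {
        tokens = Math.min(capacity, tokens + (nowMs - lastRefillMs) * refillPerMs);
        lastRefillMs = nowMs;
        if (tokens >= 1.0) {
            tokens -= 1.0;
            return true;
        }
        return false;
    }

    public static void main(String[] args) {
        TokenBucket bucket = new TokenBucket(100, 100.0 / 3_600_000, 0L);
        int first = 0, second = 0;
        for (int i = 0; i < 100; i++) if (bucket.tryAcquire(0L)) first++;        // burst one
        for (int i = 0; i < 100; i++) if (bucket.tryAcquire(600_000L)) second++; // 10 min later
        // Burst one is fully granted; ten minutes later only ~16 tokens have
        // refilled, so most of burst two is (correctly) still throttled.
        System.out.println(first + " then " + second);
    }
}
```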
FWIW - I do think this is the type of conversation that would have benefited greatly from being in a design doc where we can do threaded conversation more quickly. I'm not asking you to retrofit this in, but I am asking that we consider more detailed designs in the future to avoid getting bogged down like we have. Each time we've returned to the PR, we've done so with very different approaches, which is expensive for you as an implementer and me as a reviewer.
One possible approach is to have a stateful step using OrderedListState where you wrap each element with the desired firing time, then using an SDF kind of like this one to return a process continuation until we're ready to fire the element (this gets around the issues you've run into with timers not firing correctly in batch mode). This may still have watermark concerns though (mentioned elsewhere)
> One possible approach is to have a stateful step using OrderedListState where you wrap each element with the desired firing time, then using an SDF kind of like this one to return a process continuation until we're ready to fire the element (this gets around the issues you've run into with timers not firing correctly in batch mode). This may still have watermark concerns though (mentioned elsewhere)

I really like this idea. However, timers and state must not be used.
You can use state in one transform then an SDF in the next. However, I still have concerns about watermarks and SDFs which need to be resolved before we would proceed with this approach (see my next comment)
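As a plain-Java model of that OrderedListState-plus-SDF shape (hypothetical names, not the PR's code): elements arrive sorted by desired firing time, the SDF emits whatever is due, and hands back a resume delay the way ProcessContinuation.resume().withResumeDelay(...) would:

```java
import java.util.List;

public class ResumeSketch {
    // Models SDF-style processing over (fireTimeMs, value) pairs, as they would
    // look after an OrderedListState step sorted them by firing time.
    // position[0] plays the role of the restriction tracker's claimed offset;
    // the return value is the resume delay in ms, or -1 when the restriction
    // is exhausted (analogous to ProcessContinuation.stop()).
    static long processDue(List<long[]> pending, int[] position, long nowMs, List<Long> out) {
        while (position[0] < pending.size()) {
            long[] fireAndValue = pending.get(position[0]);
            if (fireAndValue[0] > nowMs) {
                return fireAndValue[0] - nowMs; // not yet due: resume later from this offset
            }
            out.add(fireAndValue[1]);
            position[0]++;
        }
        return -1L; // all elements emitted
    }

    public static void main(String[] args) {
        List<long[]> pending = List.of(
            new long[] {0L, 10L}, new long[] {5_000L, 11L}, new long[] {10_000L, 12L});
        int[] position = {0};
        List<Long> out = new java.util.ArrayList<>();
        long delay = processDue(pending, position, 4_000L, out);
        System.out.println(out + " resume in " + delay + " ms"); // [10] resume in 1000 ms
    }
}
```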
@NewWatermarkEstimator
public WatermarkEstimator<Instant> newWatermarkEstimator(
    @WatermarkEstimatorState Instant state) {
  return new WatermarkEstimators.Manual(state);
}
One question I have (genuinely not sure of the answer) is how putting a watermark estimator in the middle of a pipeline impacts the previous watermark. For example, let's say I have the following structure:
1) Read from Pub/Sub
2) Throttle
3) GroupByKey
And I have a constant stream of elements coming from Pub/Sub; will the GroupByKey ever fire? Basically, I can imagine the Pub/Sub watermark slowly rising, but since Throttle initializes a new estimator each time it encounters an element, its watermark might never advance (until it has processed all elements, which would never happen).
The longer I stare at this (have been looking at this PR for a while this afternoon), the more I'm just generally concerned with using an SDF this way. An SDF is by nature meant to process a single element (or at least a bounded set of elements). Here we're using an SDF on a potentially unbounded dataset in an environment where we don't control the windowing, triggering, or watermark, and I basically have no clue how it will behave (but I'd guess poorly).
I think I've expressed my preference for a timers based approach before (and saying this doesn't work in batch), but I think these concerns just reaffirm that preference.
Reminder, please take a look at this PR: @Abacn @ahmedabu98
This PR addresses #28930 with a PTransform (currently package private) implementation that throttles a Bounded PCollection without using any external resources, e.g. an external database, queue, etc. A future PR targets implementation of a Stateful DoFn for Unbounded PCollections. An additional future PR will add load tests.
Thank you for your contribution! Follow this checklist to help us incorporate your contribution quickly and easily:
- Mention the appropriate issue in your description (for example: addresses #123), if applicable. This will automatically add a link to the pull request in the issue. If you would like the issue to automatically close on merging the pull request, comment fixes #<ISSUE NUMBER> instead.
- Update CHANGES.md with noteworthy changes.

See the Contributor Guide for more tips on how to make review process smoother.
To check the build health, please visit https://github.com/apache/beam/blob/master/.test-infra/BUILD_STATUS.md
GitHub Actions Tests Status (on master branch)
See CI.md for more information about GitHub Actions CI or the workflows README to see a list of phrases to trigger workflows.