
[RRIO] [Throttle] transform that slows down element transmission without an external resource #30123

Closed

Conversation

damondouglas
Contributor

@damondouglas damondouglas commented Jan 26, 2024

This PR addresses #28930 with a PTransform (currently package-private) implementation that throttles a bounded PCollection without using any external resources (e.g. an external database or queue). A future PR will implement a stateful DoFn for unbounded PCollections, and an additional future PR will add load tests.


Thank you for your contribution! Follow this checklist to help us incorporate your contribution quickly and easily:

  • Mention the appropriate issue in your description (for example: addresses #123), if applicable. This will automatically add a link to the pull request in the issue. If you would like the issue to automatically close on merging the pull request, comment fixes #<ISSUE NUMBER> instead.
  • Update CHANGES.md with noteworthy changes.
  • If this contribution is large, please file an Apache Individual Contributor License Agreement.

See the Contributor Guide for more tips on how to make the review process smoother.

To check the build health, please visit https://github.com/apache/beam/blob/master/.test-infra/BUILD_STATUS.md

GitHub Actions Tests Status (on master branch)

Build python source distribution and wheels
Python tests
Java tests
Go tests

See CI.md for more information about GitHub Actions CI or the workflows README to see a list of phrases to trigger workflows.

@damondouglas damondouglas marked this pull request as ready for review January 26, 2024 09:08
Contributor

Assigning reviewers. If you would like to opt out of this review, comment assign to next reviewer:

R: @Abacn for label java.
R: @ahmedabu98 for label io.

Available commands:

  • stop reviewer notifications - opt out of the automated review tooling
  • remind me after tests pass - tag the comment author after tests pass
  • waiting on author - shift the attention set back to the author (any comment or push by the author will return the attention set to the reviewers)

The PR bot will only process comments in the main thread (not review comments).

@damondouglas
Contributor Author

assign to next reviewer

@damondouglas damondouglas marked this pull request as draft February 3, 2024 01:03
@damondouglas damondouglas marked this pull request as ready for review February 4, 2024 05:01
@damondouglas damondouglas marked this pull request as draft February 5, 2024 00:55
@damondouglas damondouglas marked this pull request as ready for review February 6, 2024 00:45
assertThat(throttled.getWindowingStrategy().getWindowFn(), equalTo(new GlobalWindows()));

pipeline.run();
}
Contributor

@damccorm damccorm Feb 6, 2024

I don't think these tests do quite enough. They show that the windowing strategy is reapplied, but they don't show that elements maintain their original window (which is probably the more important piece). I think a better test would be something like:

pipeline.apply(<create elements 1-10>)
  .apply(<map fn that gives timestamps equal to starttime + (element * 1 second)>)
  .apply(<fixed window of length 5 seconds>)
  .apply(<throttle with rate of 1 element every 5 seconds>)
  .apply(<combine - count per window>)
 
<assert that there are 2 elements output with counts of 5>

One of my core concerns with the previous approach (I haven't fully reviewed the new one, but at a glance I think it's still a problem) is that we're losing the info we need to do accurate windowing.

Contributor Author

Good point. I was actually considering inspecting the BoundedWindow and PaneInfo available in the DoFn but I like your idea as well. I'll figure something out. Thank you for this.

Contributor

@damondouglas it looks like this use case still isn't tested; would you mind covering it? If it is and I just missed it, please point me to the test case.

Contributor Author

@damccorm I added givenUpstreamFixedWindow_thenPreservesWindowAssignments and givenGlobalUpstreamWindow_thenCanApplyDownstreamWindow to ThrottleTest.

Contributor

I don't think either of those tests what I'm talking about here. They test that the correct windows are applied, but they don't test that the elements maintain their original (correct) window assignments. For example:

  @Test
  public void givenGlobalUpstreamWindow_thenCanApplyDownstreamWindow() {
    Rate rate = Rate.of(3, Duration.standardSeconds(1L));

    PCollection<Integer> unthrottled =
        pipeline.apply(
            Create.timestamped(
                TimestampedValue.of(0, epochPlus(0L)),
                TimestampedValue.of(1, epochPlus(1_000L)),
                TimestampedValue.of(2, epochPlus(2_000L))));

    PAssert.that(unthrottled).inWindow(GlobalWindow.INSTANCE);

    PCollection<Integer> throttled =
        unthrottled
            .apply(transformOf(rate))
            .apply(Window.into(FixedWindows.of(Duration.standardSeconds(5L))));
    PAssert.that(throttled).inWindow(new IntervalWindow(Instant.EPOCH, epochPlus(5_000L)));

    pipeline.run();
  }

This asserts .inWindow(new IntervalWindow(Instant.EPOCH, epochPlus(5_000L))), but it doesn't actually assert that all 3 values are in their correct interval windows (both tests are useful/important, but the latter is what I'm worried about in this PR).

Contributor Author

@damccorm I see now that the silly thing I was originally doing, and where this conversation started, was that I had applied a GlobalWindow upstream. Additionally, I was originally calling receiver.outputWithTimestamp instead of receiver.output. I removed the GlobalWindow and changed to receiver.output(element). The givenUpstreamNonGlobalWindow_canReassignDownstreamWindow and givenUpstreamNonGlobalWindow_thenPreservesWindowAssignments tests are finally passing.

Coder<KV<Integer, List<RequestT>>> kvCoder = KvCoder.of(VarIntCoder.of(), listCoder);

PTransform<PCollection<KV<Integer, RequestT>>, PCollection<KV<Integer, Iterable<RequestT>>>>
    groupingTransform = GroupByKey.create();
Contributor

Is there a reason we can't just always use GroupIntoBatches? I think it will tend to perform better because it won't require the whole previous stage to complete before we start sending requests

Contributor Author

Tests do not work in batch mode, as GroupIntoBatches relies on event timers.

Contributor

Ok - I think this is a major performance problem since it will require all reads to complete before starting the throttling.

* List<RequestT>>>}. This is done to simplify the coding of the downstream <a
* href="https://beam.apache.org/documentation/programming-guide/#splittable-dofns">Splittable
* DoFn</a>. Next the transform applies {@link GlobalWindows} to the {@code PCollection<KV<Integer,
* List<RequestT>>>} prior to applying to the splittable DoFn. This splittable DoFn performs the
Contributor

Nit: This is out of date I think


RequestT value = element.getValue().get((int) position);
estimator.setWatermark(timestamp);
receiver.output(value);
Contributor

I keep getting hung up on this function, even with the current approach (which I think has addressed many of my concerns). Basically, my remaining question is: in streaming mode, does this actually guarantee the throttling behavior we're expecting? Let's say, for example, that we have a throttling limit of 100 elements per hour, and then the following happens:

1) We get 100 elements all at once
2) These elements are grouped into batches of size 1, then processed by this transform; all of them fire
3) maxBufferingDuration time passes (but less than an hour has passed, so we should still be throttled)
4) We get 100 elements all at once again

Won't those 100 elements come in, get put into batches of size 1, and then immediately fire (since they have a new restriction tracker/watermark)?
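The scenario above can be made concrete with a small plain-Java simulation (this is not Beam code; all names here are hypothetical). A sliding-window rate limiter that is scoped per batch, the way each SDF restriction gets its own fresh tracker and watermark, forgets earlier emissions and lets the second burst through:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration (not Beam code): a sliding-window rate limiter
// that only remembers emissions within its own scope. If each batch gets a
// fresh limiter, as each SDF restriction gets a fresh tracker/watermark,
// the 100-elements-per-hour budget is not enforced across batches.
public class ScopedThrottleDemo {
  // Allows `limit` emissions per `windowMillis`, tracked in `emits`.
  static class RateLimiter {
    final int limit;
    final long windowMillis;
    final List<Long> emits = new ArrayList<>();

    RateLimiter(int limit, long windowMillis) {
      this.limit = limit;
      this.windowMillis = windowMillis;
    }

    // Returns true if an element may fire at time `now`.
    boolean tryFire(long now) {
      emits.removeIf(t -> now - t >= windowMillis); // drop expired emissions
      if (emits.size() < limit) {
        emits.add(now);
        return true;
      }
      return false;
    }
  }

  // Fires `count` elements at `now`; returns how many were allowed through.
  static int burst(RateLimiter limiter, long now, int count) {
    int fired = 0;
    for (int i = 0; i < count; i++) {
      if (limiter.tryFire(now)) fired++;
    }
    return fired;
  }

  public static void main(String[] args) {
    long hour = 3_600_000L;

    // Shared limiter: a second burst 30 minutes later is fully throttled.
    RateLimiter shared = new RateLimiter(100, hour);
    System.out.println(burst(shared, 0L, 100) + " " + burst(shared, hour / 2, 100)); // 100 0

    // Per-batch limiters: each burst sees a fresh budget, so all 200 fire.
    System.out.println(
        burst(new RateLimiter(100, hour), 0L, 100)
            + " "
            + burst(new RateLimiter(100, hour), hour / 2, 100)); // 100 100
  }
}
```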


FWIW - I do think this is the type of conversation that would have benefited greatly from being in a design doc where we can do threaded conversation more quickly. Not asking you to retrofit this in, but I am asking that we consider more detailed designs in the future to avoid getting bogged down like we have. Each time we've returned to the PR, we've done so with very different approaches, which is expensive for you as an implementer and me as a reviewer.

Contributor

One possible approach is to have a stateful step using OrderedListState where you wrap each element with the desired firing time, then using an SDF kind of like this one to return a process continuation until we're ready to fire the element (this gets around the issues you've run into with timers not firing correctly in batch mode). This may still have watermark concerns though (mentioned elsewhere)
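A rough plain-Java sketch of that idea (hypothetical names, not Beam APIs): wrap each element with a desired firing time, keep them in time order as OrderedListState would, and have a drain step that either emits ready elements or signals that processing should resume later, analogous to an SDF returning a process continuation:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Comparator;
import java.util.PriorityQueue;

// Hypothetical sketch (plain Java, not Beam): elements are wrapped with a
// desired firing time, held in time order (standing in for OrderedListState),
// and a drain step either emits ready elements or reports that the caller
// should resume later (standing in for an SDF ProcessContinuation.resume()).
public class TimedReleaseDemo {
  record Timed(long fireAtMillis, String value) {}

  static class TimedQueue {
    final PriorityQueue<Timed> queue =
        new PriorityQueue<>(Comparator.comparingLong(Timed::fireAtMillis));

    // Assigns firing times spaced by `intervalMillis`, like a throttle rate.
    void enqueue(List<String> elements, long startMillis, long intervalMillis) {
      long t = startMillis;
      for (String e : elements) {
        queue.add(new Timed(t, e));
        t += intervalMillis;
      }
    }

    // Emits everything whose firing time has arrived, in order.
    List<String> drainReady(long nowMillis) {
      List<String> out = new ArrayList<>();
      while (!queue.isEmpty() && queue.peek().fireAtMillis() <= nowMillis) {
        out.add(queue.poll().value());
      }
      return out;
    }

    // A non-empty remainder means: resume later instead of blocking.
    boolean needsResume() {
      return !queue.isEmpty();
    }
  }

  public static void main(String[] args) {
    TimedQueue q = new TimedQueue();
    q.enqueue(List.of("a", "b", "c"), 0L, 5_000L);
    System.out.println(q.drainReady(0L));      // [a]
    System.out.println(q.needsResume());       // true
    System.out.println(q.drainReady(10_000L)); // [b, c]
  }
}
```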

Contributor Author

One possible approach is to have a stateful step using OrderedListState where you wrap each element with the desired firing time, then using an SDF kind of like this one to return a process continuation until we're ready to fire the element (this gets around the issues you've run into with timers not firing correctly in batch mode). This may still have watermark concerns though (mentioned elsewhere)

I really like this idea. However, Timers and state must not be used.

Contributor

You can use state in one transform then an SDF in the next. However, I still have concerns about watermarks and SDFs which need to be resolved before we would proceed with this approach (see my next comment)

@NewWatermarkEstimator
public WatermarkEstimator<Instant> newWatermarkEstimator(
    @WatermarkEstimatorState Instant state) {
  return new WatermarkEstimators.Manual(state);
Contributor

One question I have (genuinely not sure of the answer) is how putting a watermark estimator in the middle of a pipeline impacts the previous watermark. For example, let's say I have the following structure:

1) Read from Pub/Sub
2) Throttle
3) GroupByKey

And if I have a constant stream of elements coming from Pub/Sub, will the GroupByKey ever fire? Basically, I can imagine the Pub/Sub watermark slowly rising, but since Throttle initializes a new estimator each time it encounters an element, its watermark might never advance (until it has processed all elements, which would never happen).
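The concern can be illustrated with a toy model (plain Java, not Beam; treating the stage watermark as the minimum across in-flight estimators is a simplification): fresh estimators initialized at the minimum timestamp pin the combined watermark no matter how far the upstream watermark advances, so downstream windows never close.

```java
import java.util.List;

// Hypothetical illustration (not Beam code): a stage's output watermark is
// modeled as the upstream watermark capped by the slowest (minimum) manual
// estimator among in-flight elements. Fresh estimators initialized at the
// minimum timestamp hold the combined watermark at the minimum forever.
public class WatermarkHoldDemo {
  // Stand-in for BoundedWindow.TIMESTAMP_MIN_VALUE.
  static final long TIMESTAMP_MIN = Long.MIN_VALUE;

  // Combined watermark: upstream capped by the minimum estimator state.
  static long combinedWatermark(long upstream, List<Long> estimatorStates) {
    long held =
        estimatorStates.stream().mapToLong(Long::longValue).min().orElse(upstream);
    return Math.min(upstream, held);
  }

  public static void main(String[] args) {
    long upstream = 10_000L; // the Pub/Sub watermark has advanced to t=10s

    // Two in-flight elements, each with a newly initialized manual estimator
    // that has not been advanced yet: the stage watermark stays pinned.
    List<Long> fresh = List.of(TIMESTAMP_MIN, TIMESTAMP_MIN);
    System.out.println(combinedWatermark(upstream, fresh) == TIMESTAMP_MIN); // true
  }
}
```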

The longer I stare at this (have been looking at this PR for a while this afternoon), the more I'm just generally concerned with using an SDF this way. An SDF is by nature meant to process a single element (or at least a bounded set of elements). Here we're using an SDF on a potentially unbounded dataset in an environment where we don't control the windowing, triggering, or watermark, and I basically have no clue how it will behave (but I'd guess poorly).

I think I've expressed my preference for a timers based approach before (and saying this doesn't work in batch), but I think these concerns just reaffirm that preference.

@damondouglas damondouglas marked this pull request as draft February 16, 2024 00:36
@damondouglas damondouglas marked this pull request as ready for review February 21, 2024 01:02
@damondouglas damondouglas marked this pull request as draft February 21, 2024 19:06
@damondouglas damondouglas marked this pull request as ready for review February 22, 2024 00:23
@damondouglas damondouglas marked this pull request as draft February 26, 2024 23:48
Contributor

github-actions bot commented Mar 5, 2024

Reminder, please take a look at this pr: @Abacn @ahmedabu98
