Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Dataflow Streaming] Cache StateNamespace encoded keys using SoftReference #33689

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.beam.runners.core;

import java.io.IOException;
import java.lang.ref.SoftReference;
import java.util.List;
import java.util.Objects;
import org.apache.beam.sdk.coders.Coder;
Expand All @@ -41,14 +42,24 @@ public static StateNamespace global() {
}

public static <W extends BoundedWindow> StateNamespace window(Coder<W> windowCoder, W window) {
return new WindowNamespace<>(windowCoder, window);
return new WindowNamespace<>(windowCoder, window, null);
}

private static <W extends BoundedWindow> StateNamespace window(
Coder<W> windowCoder, W window, String stringKey) {
return new WindowNamespace<>(windowCoder, window, stringKey);
}

public static <W extends BoundedWindow> StateNamespace windowAndTrigger(
Coder<W> windowCoder, W window, int triggerIdx) {
return new WindowAndTriggerNamespace<>(windowCoder, window, triggerIdx);
}

private static <W extends BoundedWindow> StateNamespace windowAndTrigger(
Coder<W> windowCoder, W window, int triggerIdx, String stringKey) {
return new WindowAndTriggerNamespace<>(windowCoder, window, triggerIdx, stringKey);
}

private StateNamespaces() {}

/** {@link StateNamespace} that is global to the current key being processed. */
Expand Down Expand Up @@ -93,9 +104,12 @@ public static class WindowNamespace<W extends BoundedWindow> implements StateNam
private final Coder<W> windowCoder;
private final W window;

private WindowNamespace(Coder<W> windowCoder, W window) {
private SoftReference<String> stringKeyRef;

private WindowNamespace(Coder<W> windowCoder, W window, @Nullable String stringKey) {
this.windowCoder = windowCoder;
this.window = window;
stringKeyRef = new SoftReference<>(stringKey);
}

public W getWindow() {
Expand All @@ -104,17 +118,25 @@ public W getWindow() {

@Override
public String stringKey() {
try {
// equivalent to String.format("/%s/", ...)
return "/" + CoderUtils.encodeToBase64(windowCoder, window) + "/";
} catch (CoderException e) {
throw new RuntimeException("Unable to generate string key from window " + window, e);
}
return getStringKey();
}

@Override
public void appendTo(Appendable sb) throws IOException {
sb.append('/').append(CoderUtils.encodeToBase64(windowCoder, window)).append('/');
sb.append(getStringKey());
}

private String getStringKey() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

just inline in stringKey?

String stringKey = stringKeyRef.get();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do you need some annotations to make this not show up as racy? we could have multiple threads calling this at the same time.

if (stringKey == null) {
try {
stringKey = "/" + CoderUtils.encodeToBase64(windowCoder, window) + "/";
} catch (CoderException e) {
throw new RuntimeException("Unable to generate string key from window " + window, e);
}
stringKeyRef = new SoftReference<>(stringKey);
}
return stringKey;
}

/** State in the same window will all be evicted together. */
Expand Down Expand Up @@ -156,14 +178,24 @@ public String toString() {
public static class WindowAndTriggerNamespace<W extends BoundedWindow> implements StateNamespace {

private static final int TRIGGER_RADIX = 36;
private Coder<W> windowCoder;
private W window;
private int triggerIndex;
private final Coder<W> windowCoder;
private final W window;
private final int triggerIndex;
private SoftReference<String> stringKeyRef;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

as discussed, maybe we shouldn't use softrefs without some more investigation.


private WindowAndTriggerNamespace(Coder<W> windowCoder, W window, int triggerIndex) {
this.windowCoder = windowCoder;
this.window = window;
this.triggerIndex = triggerIndex;
this.stringKeyRef = new SoftReference<>(null);
}

private WindowAndTriggerNamespace(
Coder<W> windowCoder, W window, int triggerIndex, String stringKey) {
this.windowCoder = windowCoder;
this.window = window;
this.triggerIndex = triggerIndex;
this.stringKeyRef = new SoftReference<>(stringKey);
}

public W getWindow() {
Expand All @@ -176,26 +208,13 @@ public int getTriggerIndex() {

@Override
public String stringKey() {
try {
// equivalent to String.format("/%s/%s/", ...)
return "/"
+ CoderUtils.encodeToBase64(windowCoder, window)
+
// Use base 36 so that can address 36 triggers in a single byte and still be human
// readable.
"/"
+ Integer.toString(triggerIndex, TRIGGER_RADIX).toUpperCase()
+ "/";
} catch (CoderException e) {
throw new RuntimeException("Unable to generate string key from window " + window, e);
}

return getStringKey();
}

@Override
public void appendTo(Appendable sb) throws IOException {
sb.append('/').append(CoderUtils.encodeToBase64(windowCoder, window));
sb.append('/').append(Integer.toString(triggerIndex, TRIGGER_RADIX).toUpperCase());
sb.append('/');
sb.append(getStringKey());
}

/** State in the same window will all be evicted together. */
Expand All @@ -219,6 +238,27 @@ public boolean equals(@Nullable Object obj) {
&& Objects.equals(this.windowStructuralValue(), that.windowStructuralValue());
}

private String getStringKey() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: just move this impl into stringKey()?

String stringKey = stringKeyRef.get();
if (stringKey == null) {
try {
stringKey =
"/"
+ CoderUtils.encodeToBase64(windowCoder, window)
+
// Use base 36 so that can address 36 triggers in a single byte and still be human
// readable.
"/"
+ Integer.toString(triggerIndex, TRIGGER_RADIX).toUpperCase()
+ "/";
} catch (CoderException e) {
throw new RuntimeException("Unable to generate string key from window " + window, e);
}
stringKeyRef = new SoftReference<>(stringKey);
}
return stringKey;
}

private Object windowStructuralValue() {
return windowCoder.structuralValue(window);
}
Expand Down Expand Up @@ -263,9 +303,9 @@ public static <W extends BoundedWindow> StateNamespace fromString(
W window = CoderUtils.decodeFromBase64(windowCoder, parts.get(1));
if (parts.size() > 3) {
int index = Integer.parseInt(parts.get(2), WindowAndTriggerNamespace.TRIGGER_RADIX);
return windowAndTrigger(windowCoder, window, index);
return windowAndTrigger(windowCoder, window, index, stringKey);
} else {
return window(windowCoder, window);
return window(windowCoder, window, stringKey);
}
} catch (Exception e) {
throw new RuntimeException("Invalid namespace string: '" + stringKey + "'", e);
Expand Down
Loading