-
Notifications
You must be signed in to change notification settings - Fork 4.3k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[Dataflow Streaming] Cache StateNamespace encoded keys using SoftReference #33689
base: master
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -18,6 +18,7 @@ | |
package org.apache.beam.runners.core; | ||
|
||
import java.io.IOException; | ||
import java.lang.ref.SoftReference; | ||
import java.util.List; | ||
import java.util.Objects; | ||
import org.apache.beam.sdk.coders.Coder; | ||
|
@@ -41,14 +42,24 @@ public static StateNamespace global() { | |
} | ||
|
||
public static <W extends BoundedWindow> StateNamespace window(Coder<W> windowCoder, W window) { | ||
return new WindowNamespace<>(windowCoder, window); | ||
return new WindowNamespace<>(windowCoder, window, null); | ||
} | ||
|
||
private static <W extends BoundedWindow> StateNamespace window( | ||
Coder<W> windowCoder, W window, String stringKey) { | ||
return new WindowNamespace<>(windowCoder, window, stringKey); | ||
} | ||
|
||
public static <W extends BoundedWindow> StateNamespace windowAndTrigger( | ||
Coder<W> windowCoder, W window, int triggerIdx) { | ||
return new WindowAndTriggerNamespace<>(windowCoder, window, triggerIdx); | ||
} | ||
|
||
private static <W extends BoundedWindow> StateNamespace windowAndTrigger( | ||
Coder<W> windowCoder, W window, int triggerIdx, String stringKey) { | ||
return new WindowAndTriggerNamespace<>(windowCoder, window, triggerIdx, stringKey); | ||
} | ||
|
||
private StateNamespaces() {} | ||
|
||
/** {@link StateNamespace} that is global to the current key being processed. */ | ||
|
@@ -93,9 +104,12 @@ public static class WindowNamespace<W extends BoundedWindow> implements StateNam | |
private final Coder<W> windowCoder; | ||
private final W window; | ||
|
||
private WindowNamespace(Coder<W> windowCoder, W window) { | ||
private SoftReference<String> stringKeyRef; | ||
|
||
private WindowNamespace(Coder<W> windowCoder, W window, @Nullable String stringKey) { | ||
this.windowCoder = windowCoder; | ||
this.window = window; | ||
stringKeyRef = new SoftReference<>(stringKey); | ||
} | ||
|
||
public W getWindow() { | ||
|
@@ -104,17 +118,25 @@ public W getWindow() { | |
|
||
@Override | ||
public String stringKey() { | ||
try { | ||
// equivalent to String.format("/%s/", ...) | ||
return "/" + CoderUtils.encodeToBase64(windowCoder, window) + "/"; | ||
} catch (CoderException e) { | ||
throw new RuntimeException("Unable to generate string key from window " + window, e); | ||
} | ||
return getStringKey(); | ||
} | ||
|
||
@Override | ||
public void appendTo(Appendable sb) throws IOException { | ||
sb.append('/').append(CoderUtils.encodeToBase64(windowCoder, window)).append('/'); | ||
sb.append(getStringKey()); | ||
} | ||
|
||
private String getStringKey() { | ||
String stringKey = stringKeyRef.get(); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. do you need some annotations to make this not show up as racy? we could have multiple threads calling this at the same time. |
||
if (stringKey == null) { | ||
try { | ||
stringKey = "/" + CoderUtils.encodeToBase64(windowCoder, window) + "/"; | ||
} catch (CoderException e) { | ||
throw new RuntimeException("Unable to generate string key from window " + window, e); | ||
} | ||
stringKeyRef = new SoftReference<>(stringKey); | ||
} | ||
return stringKey; | ||
} | ||
|
||
/** State in the same window will all be evicted together. */ | ||
|
@@ -156,14 +178,24 @@ public String toString() { | |
public static class WindowAndTriggerNamespace<W extends BoundedWindow> implements StateNamespace { | ||
|
||
private static final int TRIGGER_RADIX = 36; | ||
private Coder<W> windowCoder; | ||
private W window; | ||
private int triggerIndex; | ||
private final Coder<W> windowCoder; | ||
private final W window; | ||
private final int triggerIndex; | ||
private SoftReference<String> stringKeyRef; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. as discussed, maybe we shouldn't use softrefs without some more investigation. |
||
|
||
private WindowAndTriggerNamespace(Coder<W> windowCoder, W window, int triggerIndex) { | ||
this.windowCoder = windowCoder; | ||
this.window = window; | ||
this.triggerIndex = triggerIndex; | ||
this.stringKeyRef = new SoftReference<>(null); | ||
} | ||
|
||
private WindowAndTriggerNamespace( | ||
Coder<W> windowCoder, W window, int triggerIndex, String stringKey) { | ||
this.windowCoder = windowCoder; | ||
this.window = window; | ||
this.triggerIndex = triggerIndex; | ||
this.stringKeyRef = new SoftReference<>(stringKey); | ||
} | ||
|
||
public W getWindow() { | ||
|
@@ -176,26 +208,13 @@ public int getTriggerIndex() { | |
|
||
@Override | ||
public String stringKey() { | ||
try { | ||
// equivalent to String.format("/%s/%s/", ...) | ||
return "/" | ||
+ CoderUtils.encodeToBase64(windowCoder, window) | ||
+ | ||
// Use base 36 so that can address 36 triggers in a single byte and still be human | ||
// readable. | ||
"/" | ||
+ Integer.toString(triggerIndex, TRIGGER_RADIX).toUpperCase() | ||
+ "/"; | ||
} catch (CoderException e) { | ||
throw new RuntimeException("Unable to generate string key from window " + window, e); | ||
} | ||
|
||
return getStringKey(); | ||
} | ||
|
||
@Override | ||
public void appendTo(Appendable sb) throws IOException { | ||
sb.append('/').append(CoderUtils.encodeToBase64(windowCoder, window)); | ||
sb.append('/').append(Integer.toString(triggerIndex, TRIGGER_RADIX).toUpperCase()); | ||
sb.append('/'); | ||
sb.append(getStringKey()); | ||
} | ||
|
||
/** State in the same window will all be evicted together. */ | ||
|
@@ -219,6 +238,27 @@ public boolean equals(@Nullable Object obj) { | |
&& Objects.equals(this.windowStructuralValue(), that.windowStructuralValue()); | ||
} | ||
|
||
private String getStringKey() { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. nit: just move this impl into stringKey()? |
||
String stringKey = stringKeyRef.get(); | ||
if (stringKey == null) { | ||
try { | ||
stringKey = | ||
"/" | ||
+ CoderUtils.encodeToBase64(windowCoder, window) | ||
+ | ||
// Use base 36 so that can address 36 triggers in a single byte and still be human | ||
// readable. | ||
"/" | ||
+ Integer.toString(triggerIndex, TRIGGER_RADIX).toUpperCase() | ||
+ "/"; | ||
} catch (CoderException e) { | ||
throw new RuntimeException("Unable to generate string key from window " + window, e); | ||
} | ||
stringKeyRef = new SoftReference<>(stringKey); | ||
} | ||
return stringKey; | ||
} | ||
|
||
private Object windowStructuralValue() { | ||
return windowCoder.structuralValue(window); | ||
} | ||
|
@@ -263,9 +303,9 @@ public static <W extends BoundedWindow> StateNamespace fromString( | |
W window = CoderUtils.decodeFromBase64(windowCoder, parts.get(1)); | ||
if (parts.size() > 3) { | ||
int index = Integer.parseInt(parts.get(2), WindowAndTriggerNamespace.TRIGGER_RADIX); | ||
return windowAndTrigger(windowCoder, window, index); | ||
return windowAndTrigger(windowCoder, window, index, stringKey); | ||
} else { | ||
return window(windowCoder, window); | ||
return window(windowCoder, window, stringKey); | ||
} | ||
} catch (Exception e) { | ||
throw new RuntimeException("Invalid namespace string: '" + stringKey + "'", e); | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
just inline in stringKey?