Skip to content

Commit

Permalink
fix bug in getProcessingTimesByStepCopy (#30270)
Browse files Browse the repository at this point in the history
Co-authored-by: Claire McCarthy <[email protected]>
  • Loading branch information
clmccart and clmccart authored Feb 13, 2024
1 parent 702138b commit b923a67
Showing 1 changed file with 29 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;
import javax.annotation.concurrent.GuardedBy;
import org.apache.beam.runners.core.NullSideInputReader;
import org.apache.beam.runners.core.SideInputReader;
import org.apache.beam.runners.core.StepContext;
Expand Down Expand Up @@ -177,6 +179,7 @@ protected abstract SideInputReader getSideInputReaderForViews(

/** Dataflow specific {@link StepContext}. */
public abstract static class DataflowStepContext implements StepContext {

private final NameContext nameContext;

public DataflowStepContext(NameContext nameContext) {
Expand Down Expand Up @@ -253,10 +256,13 @@ public static class DataflowExecutionStateTracker extends ExecutionStateTracker
* Metadata on the message whose processing is currently being managed by this tracker. If no
* message is actively being processed, activeMessageMetadata will be null.
*/
@Nullable private ActiveMessageMetadata activeMessageMetadata = null;
@GuardedBy("this")
@Nullable
private ActiveMessageMetadata activeMessageMetadata = null;

private final MillisProvider clock = System::currentTimeMillis;

@GuardedBy("this")
private final Map<String, IntSummaryStatistics> processingTimesByStep = new HashMap<>();

public DataflowExecutionStateTracker(
Expand Down Expand Up @@ -313,20 +319,19 @@ public Closeable enterState(ExecutionState newState) {
if (isDataflowProcessElementState) {
DataflowExecutionState newDFState = (DataflowExecutionState) newState;
if (newDFState.getStepName() != null && newDFState.getStepName().userName() != null) {
if (this.activeMessageMetadata != null) {
recordActiveMessageInProcessingTimesMap();
recordActiveMessageInProcessingTimesMap();
synchronized (this) {
this.activeMessageMetadata =
ActiveMessageMetadata.create(
newDFState.getStepName().userName(), clock.getMillis());
}
this.activeMessageMetadata =
ActiveMessageMetadata.create(newDFState.getStepName().userName(), clock.getMillis());
}
elementExecutionTracker.enter(newDFState.getStepName());
}

return () -> {
if (isDataflowProcessElementState) {
if (this.activeMessageMetadata != null) {
recordActiveMessageInProcessingTimesMap();
}
recordActiveMessageInProcessingTimesMap();
elementExecutionTracker.exit();
}
baseCloseable.close();
Expand All @@ -337,12 +342,21 @@ public String getWorkItemId() {
return this.workItemId;
}

public Optional<ActiveMessageMetadata> getActiveMessageMetadata() {
public synchronized Optional<ActiveMessageMetadata> getActiveMessageMetadata() {
return Optional.ofNullable(activeMessageMetadata);
}

public Map<String, IntSummaryStatistics> getProcessingTimesByStepCopy() {
Map<String, IntSummaryStatistics> processingTimesCopy = processingTimesByStep;
public synchronized Map<String, IntSummaryStatistics> getProcessingTimesByStepCopy() {
Map<String, IntSummaryStatistics> processingTimesCopy =
processingTimesByStep.entrySet().stream()
.collect(
Collectors.toMap(
e -> e.getKey(),
e -> {
IntSummaryStatistics clone = new IntSummaryStatistics();
clone.combine(e.getValue());
return clone;
}));
return processingTimesCopy;
}

Expand All @@ -351,17 +365,19 @@ public Map<String, IntSummaryStatistics> getProcessingTimesByStepCopy() {
* processing times map. Sets the activeMessageMetadata to null after the entry has been
* recorded.
*/
private void recordActiveMessageInProcessingTimesMap() {
private synchronized void recordActiveMessageInProcessingTimesMap() {
if (this.activeMessageMetadata == null) {
return;
}
int processingTime =
(int) (System.currentTimeMillis() - this.activeMessageMetadata.startTime());
this.processingTimesByStep.compute(
this.activeMessageMetadata.userStepName(),
(k, v) -> {
if (v == null) {
v = new IntSummaryStatistics();
}
v.accept((int) (System.currentTimeMillis() - this.activeMessageMetadata.startTime()));
v.accept(processingTime);
return v;
});
this.activeMessageMetadata = null;
Expand Down

0 comments on commit b923a67

Please sign in to comment.