Skip to content

Commit

Permalink
fix [INTERNAL_ERROR]sync filter size meet error, filter: RuntimeFilter:
Browse files Browse the repository at this point in the history
  • Loading branch information
924060929 committed Oct 21, 2024
1 parent 9a9b2cb commit c88d597
Showing 1 changed file with 5 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -98,22 +98,16 @@ public void setRuntimeFilterThriftParams(TRuntimeFilterParams runtimeFilterParam
for (RuntimeFilterTarget target : targets) {
targetParams.add(new TRuntimeFilterTargetParams(target.instanceId, target.address));
}
runtimeFilterParams.putToRidToTargetParam(
rf.getFilterId().asInt(), targetParams
);
runtimeFilterParams.putToRidToTargetParam(rf.getFilterId().asInt(), targetParams);
}
}
for (Map.Entry<RuntimeFilterId, Integer> entry : ridToBuilderNum.entrySet()) {
boolean isBroadcastRuntimeFilter = broadcastRuntimeFilterIds.contains(entry.getKey().asInt());
int builderNum = isBroadcastRuntimeFilter ? 1 : entry.getValue();
runtimeFilterParams.putToRuntimeFilterBuilderNum(
entry.getKey().asInt(), builderNum
);
runtimeFilterParams.putToRuntimeFilterBuilderNum(entry.getKey().asInt(), builderNum);
}
for (RuntimeFilter rf : runtimeFilters) {
runtimeFilterParams.putToRidToRuntimeFilter(
rf.getFilterId().asInt(), rf.toThrift()
);
runtimeFilterParams.putToRidToRuntimeFilter(rf.getFilterId().asInt(), rf.toThrift());
}
}

Expand All @@ -122,7 +116,7 @@ public static RuntimeFiltersThriftBuilder compute(
PipelineDistributedPlan topMostPlan = distributedPlans.get(distributedPlans.size() - 1);
AssignedJob mergeInstance = topMostPlan.getInstanceJobs().get(0);
BackendWorker worker = (BackendWorker) mergeInstance.getAssignedWorker();
TNetworkAddress mergeAddress = new TNetworkAddress(worker.host(), worker.port());
TNetworkAddress mergeAddress = new TNetworkAddress(worker.host(), worker.brpcPort());

List<RuntimeFilter> runtimeFilters = planner.getRuntimeFilters();
Set<Integer> broadcastRuntimeFilterIds = runtimeFilters
Expand All @@ -144,7 +138,7 @@ public static RuntimeFiltersThriftBuilder compute(
Backend backend = backendWorker.getBackend();
targetFragments.add(new RuntimeFilterTarget(
instanceJob.instanceId(),
new TNetworkAddress(backend.getHost(), backend.getBePort())
new TNetworkAddress(backend.getHost(), backend.getBrpcPort())
));
}
}
Expand Down

0 comments on commit c88d597

Please sign in to comment.