Skip to content

Commit 66a42a2

Browse files
committed
Revert "维表join的 驱动流如果是restract-stream 需要过滤掉回溯的值"
This reverts commit 57a7c47
1 parent 57a7c47 commit 66a42a2

File tree

2 files changed

+4
-5
lines changed

2 files changed

+4
-5
lines changed

core/src/main/java/com/dtstack/flink/sql/Main.java

+1-2
Original file line numberDiff line numberDiff line change
@@ -252,8 +252,7 @@ private static void registerTable(SqlTree sqlTree, StreamExecutionEnvironment en
252252

253253
RowTypeInfo typeInfo = new RowTypeInfo(adaptTable.getSchema().getTypes(), adaptTable.getSchema().getColumnNames());
254254
DataStream adaptStream = tableEnv.toRetractStream(adaptTable, typeInfo)
255-
.filter((Tuple2<Boolean, Row> f0) -> f0.f0)
256-
.map((Tuple2<Boolean, Row> f0) -> f0.f1)
255+
.map((Tuple2<Boolean, Row> f0) -> { return f0.f1; })
257256
.returns(typeInfo);
258257

259258
String fields = String.join(",", typeInfo.getFieldNames());

core/src/main/java/com/dtstack/flink/sql/side/SideSqlExec.java

+3-3
Original file line numberDiff line numberDiff line change
@@ -770,9 +770,9 @@ private void joinFun(Object pollObj, Map<String, Table> localTableCache,
770770
RowTypeInfo typeInfo = new RowTypeInfo(targetTable.getSchema().getTypes(), targetTable.getSchema().getColumnNames());
771771

772772
DataStream adaptStream = tableEnv.toRetractStream(targetTable, org.apache.flink.types.Row.class)
773-
.filter((Tuple2<Boolean, Row> f0) -> f0.f0)
774-
.map((Tuple2<Boolean, Row> f0) -> f0.f1)
775-
.returns(Row.class);
773+
.map((Tuple2<Boolean, Row> f0) -> { return f0.f1; })
774+
.returns(Row.class);
775+
776776

777777
//join side table before keyby ===> Reducing the size of each dimension table cache of async
778778
if(sideTableInfo.isPartitionedJoin()){

0 commit comments

Comments
 (0)