Skip to content

Commit b4a3a55

Browse files
author
gituser
committed
Merge branch '1.8_v3.9.2_async_bug' into 1.8_release_3.9.x
2 parents 66a42a2 + c10b511 commit b4a3a55

File tree

1 file changed

+10
-7
lines changed

1 file changed

+10
-7
lines changed

core/src/main/java/com/dtstack/flink/sql/side/AsyncReqRow.java

+10-7
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,9 @@ public abstract class AsyncReqRow extends RichAsyncFunction<Row, Row> implements
5151
private static final Logger LOG = LoggerFactory.getLogger(AsyncReqRow.class);
5252
private static final long serialVersionUID = 2098635244857937717L;
5353

54+
private static int TIMEOUT_LOG_FLUSH_NUM = 10;
55+
private int timeOutNum = 0;
56+
5457
protected SideInfo sideInfo;
5558
protected transient Counter parseErrorRecords;
5659

@@ -119,14 +122,14 @@ protected void dealCacheData(String key, CacheObj missKeyObj) {
119122

120123
@Override
121124
public void timeout(Row input, ResultFuture<Row> resultFuture) throws Exception {
122-
StreamRecordQueueEntry<Row> future = (StreamRecordQueueEntry<Row>)resultFuture;
123-
try {
124-
if (null == future.get()) {
125-
resultFuture.completeExceptionally(new TimeoutException("Async function call has timed out."));
126-
}
127-
} catch (Exception e) {
128-
resultFuture.completeExceptionally(new Exception(e));
125+
126+
//TODO 需要添加数据指标
127+
if(timeOutNum % TIMEOUT_LOG_FLUSH_NUM == 0){
128+
LOG.info("Async function call has timed out. input:" + input.toString());
129129
}
130+
131+
timeOutNum++;
132+
resultFuture.complete(null);
130133
}
131134

132135

0 commit comments

Comments
 (0)