Skip to content

Commit

Permalink
refactor: 优化jdbc流式查询,支持可取消.
Browse files Browse the repository at this point in the history
  • Loading branch information
zhou-hao committed Aug 13, 2024
1 parent e74ddbb commit 890bed6
Show file tree
Hide file tree
Showing 2 changed files with 41 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,10 @@
import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import reactor.core.Disposable;
import reactor.core.Disposables;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;
import reactor.util.context.Context;

import java.sql.Connection;
Expand Down Expand Up @@ -52,23 +54,27 @@ public <E> Flux<E> select(Publisher<SqlRequest> request, ResultWrapper<E, ?> wra
Logger logger = ctx.getOrDefault(Logger.class, log);
return Flux
.create(sink -> {
Disposable.Composite disposable = Disposables.composite();
@SuppressWarnings("all")
Disposable disposable = getConnection()
Disposable queryDisposable = getConnection()
.flatMap(connection -> toFlux(request)
.doOnNext(sql -> this
.doSelect(
logger,
connection,
sql,
consumer(wrapper, sink::next),
t -> sink.isCancelled()))
disposable))
.then())
.subscribeOn(Schedulers.boundedElastic())
.subscribe((ignore) -> sink.complete(),
sink::error,
sink::complete,
Context.of(sink.contextView()));

sink.onCancel(disposable)
disposable.add(queryDisposable);

sink
.onDispose(disposable);
});
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,11 @@
import org.hswebframework.ezorm.rdb.executor.SqlRequest;
import org.hswebframework.ezorm.rdb.executor.wrapper.ResultWrapper;
import org.slf4j.Logger;
import reactor.core.Disposable;
import reactor.core.Disposables;

import java.sql.*;
import java.util.List;
import java.util.function.Predicate;

import static org.hswebframework.ezorm.rdb.executor.jdbc.JdbcSqlExecutorHelper.*;
import static org.hswebframework.ezorm.rdb.utils.SqlUtils.printSql;
Expand Down Expand Up @@ -128,23 +129,48 @@ protected Object getResultValue(ResultSetMetaData metaData, ResultSet set, int c
}
}

@SneakyThrows
protected PreparedStatement createStatement(Connection connection, String sql) {
return connection.prepareStatement(sql);
}

@SneakyThrows
protected ResultSet beforeRead(ResultSet resultSet) {
return resultSet;
}

@SneakyThrows
protected <T, R> R doSelect(Logger logger,
Connection connection,
SqlRequest request,
ResultWrapper<T, R> wrapper,
Predicate<T> stopped) {
PreparedStatement statement = connection.prepareStatement(request.getSql());
Disposable.Composite stopped) {
PreparedStatement statement = createStatement(connection, request.getSql());
try {
printSql(logger, request);

Disposable disposable = () -> {
try {
if(logger.isDebugEnabled()) {
logger.debug("==> Cancel: {}", request.toNativeSql());
}
releaseStatement(statement);
} catch (Throwable ignore) {
}
};
stopped.add(disposable);
if (stopped.isDisposed()) {
return wrapper.getResult();
}
preparedStatementParameter(statement, request.getParameters());
ResultSet resultSet = statement.executeQuery();
ResultSet resultSet = beforeRead(statement.executeQuery());
ResultSetMetaData metaData = resultSet.getMetaData();
List<String> columns = getResultColumns(metaData);

wrapper.beforeWrap(() -> columns);

int index = 0;
stopped.remove(disposable);
while (resultSet.next()) {
//调用包装器,将查询结果包装为对象
T data = wrapper.newRowInstance();
Expand All @@ -156,7 +182,7 @@ protected <T, R> R doSelect(Logger logger,
data = context.getRowInstance();
}
index++;
if (!wrapper.completedWrapRow(data) || stopped.test(data)) {
if (!wrapper.completedWrapRow(data) || stopped.isDisposed()) {
break;
}
}
Expand All @@ -172,6 +198,6 @@ protected <T, R> R doSelect(Logger logger,

@SneakyThrows
public <T, R> R doSelect(Connection connection, SqlRequest request, ResultWrapper<T, R> wrapper) {
return doSelect(logger, connection, request, wrapper, (t) -> false);
return doSelect(logger, connection, request, wrapper, Disposables.composite());
}
}

0 comments on commit 890bed6

Please sign in to comment.