Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: reset state when begin scan #239

Merged
merged 6 commits into from
Feb 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/test_wrappers.yml
Original file line number Diff line number Diff line change
Expand Up @@ -55,4 +55,4 @@ jobs:
run: |
cd wrappers && RUSTFLAGS="-D warnings" cargo clippy --all --tests --no-deps --features all_fdws,helloworld_fdw

- run: cd wrappers && cargo pgrx test --features all_fdws,pg15
- run: cd wrappers && cargo pgrx test --features "all_fdws pg15"
26 changes: 25 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 0 additions & 2 deletions wrappers/.ci/docker-compose.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,6 @@ services:
clickhouse:
image: clickhouse/clickhouse-server
container_name: clickhouse-wrapped
environment:
CLICKHOUSE_DB: supa
ports:
- "9000:9000" # native interface
- "8123:8123" # http interface
Expand Down
12 changes: 5 additions & 7 deletions wrappers/src/fdw/clickhouse_fdw/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,19 +9,17 @@ mod tests {
#[pg_test]
fn clickhouse_smoketest() {
Spi::connect(|mut c| {
let clickhouse_pool = ch::Pool::new("tcp://default:@localhost:9000/supa");
let clickhouse_pool = ch::Pool::new("tcp://default:@localhost:9000/default");

let rt = create_async_runtime().expect("failed to create runtime");
let mut handle = rt
.block_on(async { clickhouse_pool.get_handle().await })
.expect("handle");

rt.block_on(async {
handle.execute("DROP TABLE IF EXISTS test_table").await?;
handle
.execute("DROP TABLE IF EXISTS supa.test_table")
.await?;
handle
.execute("CREATE TABLE supa.test_table (id INT, name TEXT) engine = Memory")
.execute("CREATE TABLE test_table (id INT, name TEXT) engine = Memory")
.await
})
.expect("test_table in ClickHouse");
Expand All @@ -37,7 +35,7 @@ mod tests {
r#"CREATE SERVER my_clickhouse_server
FOREIGN DATA WRAPPER clickhouse_wrapper
OPTIONS (
conn_string 'tcp://default:@localhost:9000/supa'
conn_string 'tcp://default:@localhost:9000/default'
)"#,
None,
None,
Expand Down Expand Up @@ -235,7 +233,7 @@ mod tests {
let remote_value: String = rt
.block_on(async {
handle
.query("SELECT name FROM supa.test_table ORDER BY name LIMIT 1")
.query("SELECT name FROM test_table ORDER BY name LIMIT 1")
.fetch_all()
.await?
.rows()
Expand Down
2 changes: 2 additions & 0 deletions wrappers/src/fdw/mssql_fdw/mssql_fdw.rs
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,8 @@ impl ForeignDataWrapper<MssqlFdwError> for MssqlFdw {
self.table = require_option("table", options)?.to_string();
self.tgt_cols = columns.to_vec();

self.iter_idx = 0;

// create sql server client
let tcp = self
.rt
Expand Down
14 changes: 10 additions & 4 deletions wrappers/src/fdw/redis_fdw/redis_fdw.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,13 @@ impl RedisFdw {
const FDW_NAME: &'static str = "RedisFdw";
const BUF_SIZE: isize = 256;

fn reset(&mut self) {
self.iter_idx = 0;
self.scan_result.clear();
self.iter_idx_stream = "-".to_string();
self.scan_result_stream.clear();
}

// fetch a target row for list and zset
fn fetch_row_list(&mut self) -> RedisFdwResult<Option<Row>> {
if let Some(ref mut conn) = &mut self.conn {
Expand Down Expand Up @@ -271,6 +278,8 @@ impl ForeignDataWrapper<RedisFdwError> for RedisFdw {

let mut conn = self.client.get_connection()?;

self.reset();

match src_type.as_str() {
"list" | "zset" => {
check_target_columns(
Expand Down Expand Up @@ -366,10 +375,7 @@ impl ForeignDataWrapper<RedisFdwError> for RedisFdw {
}

fn re_scan(&mut self) -> RedisFdwResult<()> {
self.iter_idx = 0;
self.scan_result.clear();
self.iter_idx_stream = "-".to_string();
self.scan_result_stream.clear();
self.reset();
Ok(())
}

Expand Down
2 changes: 2 additions & 0 deletions wrappers/src/fdw/stripe_fdw/stripe_fdw.rs
Original file line number Diff line number Diff line change
Expand Up @@ -660,6 +660,8 @@ impl ForeignDataWrapper<StripeFdwError> for StripeFdw {
) -> StripeFdwResult<()> {
let obj = require_option("object", options)?;

self.iter_idx = 0;

if let Some(client) = &self.client {
let page_size = 100; // maximum page size limit for Stripe API
let page_cnt = if let Some(limit) = limit {
Expand Down
Loading