Skip to content

Commit

Permalink
Allow pause/resuming all pools (#566)
Browse files Browse the repository at this point in the history
support pausing all pools
  • Loading branch information
tommyzli authored Aug 29, 2023
1 parent baa00ff commit 9937193
Show file tree
Hide file tree
Showing 2 changed files with 124 additions and 68 deletions.
168 changes: 100 additions & 68 deletions src/admin.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,11 +74,11 @@ where
}
"PAUSE" => {
trace!("PAUSE");
pause(stream, query_parts[1]).await
pause(stream, query_parts).await
}
"RESUME" => {
trace!("RESUME");
resume(stream, query_parts[1]).await
resume(stream, query_parts).await
}
"SHUTDOWN" => {
trace!("SHUTDOWN");
Expand Down Expand Up @@ -797,96 +797,128 @@ where
}

/// Pause a pool. It won't pass any more queries to the backends.
async fn pause<T>(stream: &mut T, query: &str) -> Result<(), Error>
async fn pause<T>(stream: &mut T, tokens: Vec<&str>) -> Result<(), Error>
where
T: tokio::io::AsyncWrite + std::marker::Unpin,
{
let parts: Vec<&str> = query.split(",").map(|part| part.trim()).collect();

if parts.len() != 2 {
error_response(
stream,
"PAUSE requires a database and a user, e.g. PAUSE my_db,my_user",
)
.await
} else {
let database = parts[0];
let user = parts[1];

match get_pool(database, user) {
Some(pool) => {
pool.pause();
let parts: Vec<&str> = match tokens.len() == 2 {
true => tokens[1].split(",").map(|part| part.trim()).collect(),
false => Vec::new(),
};

let mut res = BytesMut::new();
match parts.len() {
0 => {
for (_, pool) in get_all_pools() {
pool.pause();
}

res.put(command_complete(&format!("PAUSE {},{}", database, user)));
let mut res = BytesMut::new();

// ReadyForQuery
res.put_u8(b'Z');
res.put_i32(5);
res.put_u8(b'I');
res.put(command_complete("PAUSE"));

write_all_half(stream, &res).await
}
// ReadyForQuery
res.put_u8(b'Z');
res.put_i32(5);
res.put_u8(b'I');

None => {
error_response(
stream,
&format!(
"No pool configured for database: {}, user: {}",
database, user
),
)
.await
write_all_half(stream, &res).await
}
2 => {
let database = parts[0];
let user = parts[1];

match get_pool(database, user) {
Some(pool) => {
pool.pause();

let mut res = BytesMut::new();

res.put(command_complete(&format!("PAUSE {},{}", database, user)));

// ReadyForQuery
res.put_u8(b'Z');
res.put_i32(5);
res.put_u8(b'I');

write_all_half(stream, &res).await
}

None => {
error_response(
stream,
&format!(
"No pool configured for database: {}, user: {}",
database, user
),
)
.await
}
}
}
_ => error_response(stream, "usage: PAUSE [db, user]").await,
}
}

/// Resume a pool. Queries are allowed again.
async fn resume<T>(stream: &mut T, query: &str) -> Result<(), Error>
async fn resume<T>(stream: &mut T, tokens: Vec<&str>) -> Result<(), Error>
where
T: tokio::io::AsyncWrite + std::marker::Unpin,
{
let parts: Vec<&str> = query.split(",").map(|part| part.trim()).collect();

if parts.len() != 2 {
error_response(
stream,
"RESUME requires a database and a user, e.g. RESUME my_db,my_user",
)
.await
} else {
let database = parts[0];
let user = parts[1];

match get_pool(database, user) {
Some(pool) => {
pool.resume();
let parts: Vec<&str> = match tokens.len() == 2 {
true => tokens[1].split(",").map(|part| part.trim()).collect(),
false => Vec::new(),
};

let mut res = BytesMut::new();
match parts.len() {
0 => {
for (_, pool) in get_all_pools() {
pool.resume();
}

res.put(command_complete(&format!("RESUME {},{}", database, user)));
let mut res = BytesMut::new();

// ReadyForQuery
res.put_u8(b'Z');
res.put_i32(5);
res.put_u8(b'I');
res.put(command_complete("RESUME"));

write_all_half(stream, &res).await
}
// ReadyForQuery
res.put_u8(b'Z');
res.put_i32(5);
res.put_u8(b'I');

None => {
error_response(
stream,
&format!(
"No pool configured for database: {}, user: {}",
database, user
),
)
.await
write_all_half(stream, &res).await
}
2 => {
let database = parts[0];
let user = parts[1];

match get_pool(database, user) {
Some(pool) => {
pool.resume();

let mut res = BytesMut::new();

res.put(command_complete(&format!("RESUME {},{}", database, user)));

// ReadyForQuery
res.put_u8(b'Z');
res.put_i32(5);
res.put_u8(b'I');

write_all_half(stream, &res).await
}

None => {
error_response(
stream,
&format!(
"No pool configured for database: {}, user: {}",
database, user
),
)
.await
}
}
}
_ => error_response(stream, "usage: RESUME [db, user]").await,
}
}

Expand Down
24 changes: 24 additions & 0 deletions tests/ruby/admin_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -90,4 +90,28 @@
expect(results["pool_mode"]).to eq("transaction")
end
end

describe "PAUSE" do
it "pauses all pools" do
admin_conn = PG::connect(processes.pgcat.admin_connection_string)
results = admin_conn.async_exec("SHOW DATABASES").to_a
expect(results.map{ |r| r["paused"] }.uniq).to eq(["0"])

admin_conn.async_exec("PAUSE")

results = admin_conn.async_exec("SHOW DATABASES").to_a
expect(results.map{ |r| r["paused"] }.uniq).to eq(["1"])

admin_conn.async_exec("RESUME")

results = admin_conn.async_exec("SHOW DATABASES").to_a
expect(results.map{ |r| r["paused"] }.uniq).to eq(["0"])
end

it "handles errors" do
admin_conn = PG::connect(processes.pgcat.admin_connection_string)
expect { admin_conn.async_exec("PAUSE foo").to_a }.to raise_error(PG::SystemError)
expect { admin_conn.async_exec("PAUSE foo,bar").to_a }.to raise_error(PG::SystemError)
end
end
end

0 comments on commit 9937193

Please sign in to comment.