Skip to content

Reintroduce replication factor parameter #2718

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
May 19, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions crates/cli/src/subcommands/publish.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,12 @@ pub fn cli() -> clap::Command {
.conflicts_with("build_options")
.help("The system path (absolute or relative) to the compiled wasm binary we should publish, instead of building the project."),
)
.arg(
Arg::new("num_replicas")
.long("num-replicas")
.hide(true)
.help("UNSTABLE: The number of replicas the database should have")
)
.arg(
common_args::anonymous()
)
Expand Down Expand Up @@ -79,6 +85,7 @@ pub async fn exec(mut config: Config, args: &ArgMatches) -> Result<(), anyhow::E
let wasm_file = args.get_one::<PathBuf>("wasm_file");
let database_host = config.get_host_url(server)?;
let build_options = args.get_one::<String>("build_options").unwrap();
let num_replicas = args.get_one::<u8>("num_replicas");

// If the user didn't specify an identity and we didn't specify an anonymous identity, then
// we want to use the default identity
Expand Down Expand Up @@ -152,6 +159,10 @@ pub async fn exec(mut config: Config, args: &ArgMatches) -> Result<(), anyhow::E
}
builder = builder.query(&[("clear", true)]);
}
if let Some(n) = num_replicas {
eprintln!("WARNING: Use of unstable option `--num-replicas`.\n");
builder = builder.query(&[("num_replicas", *n)]);
}

println!("Publishing module...");

Expand Down
5 changes: 4 additions & 1 deletion crates/client-api/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use std::num::NonZeroU8;
use std::sync::Arc;

use async_trait::async_trait;
Expand Down Expand Up @@ -149,7 +150,9 @@ pub struct DatabaseDef {
/// The compiled program of the database module.
pub program_bytes: Vec<u8>,
/// The desired number of replicas the database shall have.
pub num_replicas: u32,
///
/// If `None`, the edition default is used.
pub num_replicas: Option<NonZeroU8>,
/// The host type of the supplied program.
pub host_type: HostType,
}
Expand Down
14 changes: 12 additions & 2 deletions crates/client-api/src/routes/database.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use std::num::NonZeroU8;
use std::str::FromStr;
use std::time::Duration;

Expand Down Expand Up @@ -471,6 +472,7 @@ pub struct PublishDatabaseParams {
pub struct PublishDatabaseQueryParams {
#[serde(default)]
clear: bool,
num_replicas: Option<usize>,
}

use std::env;
Expand Down Expand Up @@ -498,7 +500,7 @@ fn allow_creation(auth: &SpacetimeAuth) -> Result<(), ErrorResponse> {
pub async fn publish<S: NodeDelegate + ControlStateDelegate>(
State(ctx): State<S>,
Path(PublishDatabaseParams { name_or_identity }): Path<PublishDatabaseParams>,
Query(PublishDatabaseQueryParams { clear }): Query<PublishDatabaseQueryParams>,
Query(PublishDatabaseQueryParams { clear, num_replicas }): Query<PublishDatabaseQueryParams>,
Extension(auth): Extension<SpacetimeAuth>,
body: Bytes,
) -> axum::response::Result<axum::Json<PublishResult>> {
Expand Down Expand Up @@ -572,13 +574,21 @@ pub async fn publish<S: NodeDelegate + ControlStateDelegate>(
}
};

let num_replicas = num_replicas
.map(|n| {
let n = u8::try_from(n).map_err(|_| (StatusCode::BAD_REQUEST, "Replication factor {n} out of bounds"))?;
Ok::<_, ErrorResponse>(NonZeroU8::new(n))
})
.transpose()?
.flatten();

let maybe_updated = ctx
.publish_database(
&auth.identity,
DatabaseDef {
database_identity,
program_bytes: body.into(),
num_replicas: 1,
num_replicas,
host_type: HostType::Wasm,
},
)
Expand Down
8 changes: 5 additions & 3 deletions crates/standalone/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -230,6 +230,9 @@ impl spacetimedb_client_api::ControlStateWriteAccess for StandaloneEnv {
) -> anyhow::Result<Option<UpdateDatabaseResult>> {
let existing_db = self.control_db.get_database_by_identity(&spec.database_identity)?;

// standalone does not support replication.
let num_replicas = 1;

match existing_db {
// The database does not already exist, so we'll create it.
None => {
Expand Down Expand Up @@ -258,7 +261,7 @@ impl spacetimedb_client_api::ControlStateWriteAccess for StandaloneEnv {
let database_id = self.control_db.insert_database(database.clone())?;
database.id = database_id;

self.schedule_replicas(database_id, spec.num_replicas).await?;
self.schedule_replicas(database_id, num_replicas).await?;

Ok(None)
}
Expand All @@ -275,7 +278,6 @@ impl spacetimedb_client_api::ControlStateWriteAccess for StandaloneEnv {
let database_id = database.id;
let database_identity = database.database_identity;

let num_replicas = spec.num_replicas;
let leader = self
.leader(database_id)
.await?
Expand Down Expand Up @@ -416,7 +418,7 @@ impl StandaloneEnv {
Ok(())
}

async fn schedule_replicas(&self, database_id: u64, num_replicas: u32) -> Result<(), anyhow::Error> {
async fn schedule_replicas(&self, database_id: u64, num_replicas: u8) -> Result<(), anyhow::Error> {
// Just scheduling a bunch of replicas to the only machine
for i in 0..num_replicas {
let replica = Replica {
Expand Down
2 changes: 1 addition & 1 deletion crates/testing/src/modules.rs
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,7 @@ impl CompiledModule {
DatabaseDef {
database_identity: db_identity,
program_bytes,
num_replicas: 1,
num_replicas: None,
host_type: HostType::Wasm,
},
)
Expand Down
Loading