Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Introduce gRPC replication client api #1062

Draft
wants to merge 17 commits into
base: main
Choose a base branch
from
Draft
4 changes: 2 additions & 2 deletions bindings_ffi/src/mls.rs
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ pub async fn create_client(
host,
is_secure
);
let api_client = TonicApiClient::create(host.clone(), is_secure).await?;
let api_client = TonicApiClient::create(host.clone(), is_secure, false).await?;

log::info!(
"Creating message store with path: {:?} and encryption key: {} of length {:?}",
Expand Down Expand Up @@ -159,7 +159,7 @@ pub async fn get_inbox_id_for_address(
account_address: String,
) -> Result<Option<String>, GenericError> {
let api_client = ApiClientWrapper::new(
TonicApiClient::create(host.clone(), is_secure).await?,
TonicApiClient::create(host.clone(), is_secure, false).await?,
Retry::default(),
);

Expand Down
2 changes: 1 addition & 1 deletion bindings_ffi/src/v2.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ pub async fn create_v2_client(
host: String,
is_secure: bool,
) -> Result<Arc<FfiV2ApiClient>, GenericError> {
let client = GrpcClient::create(host, is_secure).await?;
let client = GrpcClient::create(host, is_secure, false).await?;

let client = FfiV2ApiClient {
inner_client: Arc::new(client),
Expand Down
4 changes: 2 additions & 2 deletions bindings_node/src/mls_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ pub async fn create_client(
encryption_key: Option<Uint8Array>,
history_sync_url: Option<String>,
) -> Result<NapiClient> {
let api_client = TonicApiClient::create(host.clone(), is_secure)
let api_client = TonicApiClient::create(host.clone(), is_secure, false)
.await
.map_err(|_| Error::from_reason("Error creating Tonic API client"))?;

Expand Down Expand Up @@ -126,7 +126,7 @@ pub async fn get_inbox_id_for_address(
) -> Result<Option<String>> {
let account_address = account_address.to_lowercase();
let api_client = ApiClientWrapper::new(
TonicApiClient::create(host.clone(), is_secure)
TonicApiClient::create(host.clone(), is_secure, false)
.await
.map_err(|e| Error::from_reason(format!("{}", e)))?,
Retry::default(),
Expand Down
49 changes: 35 additions & 14 deletions examples/cli/cli-client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,8 @@ struct Cli {
local: bool,
#[clap(long, default_value_t = false)]
json: bool,
#[clap(long, default_value_t = false)]
testnet: bool,
}

#[derive(ValueEnum, Debug, Copy, Clone)]
Expand Down Expand Up @@ -412,24 +414,43 @@ async fn create_client(cli: &Cli, account: IdentityStrategy) -> Result<Client, C
let msg_store = get_encrypted_store(&cli.db).unwrap();
let mut builder = ClientBuilder::new(account).store(msg_store);

if cli.local {
info!("Using local network");
builder = builder
.api_client(
ApiClient::create("http://localhost:5556".into(), false)
if cli.testnet {
if cli.local {
info!("Using local testnet network");
builder = builder.api_client(
ApiClient::create("http://localhost:5050".into(), false, true)
.await
.unwrap(),
)
.history_sync_url(MessageHistoryUrls::LOCAL_ADDRESS);
} else {
info!("Using dev network");
builder = builder
.api_client(
ApiClient::create("https://grpc.dev.xmtp.network:443".into(), true)
);
} else {
info!("Using testnet network");
builder = builder.api_client(
ApiClient::create("https://grpc.testnet.xmtp.network:443".into(), true, true)
.await
.unwrap(),
)
.history_sync_url(MessageHistoryUrls::DEV_ADDRESS);
);
}
} else {
#[deny(clippy::collapsible_else_if)]
if cli.local {
info!("Using local network");
builder = builder
.api_client(
ApiClient::create("http://localhost:5556".into(), false, false)
.await
.unwrap(),
)
.history_sync_url(MessageHistoryUrls::LOCAL_ADDRESS);
} else {
info!("Using dev network");
builder = builder
.api_client(
ApiClient::create("https://grpc.dev.xmtp.network:443".into(), true, false)
.await
.unwrap(),
)
.history_sync_url(MessageHistoryUrls::DEV_ADDRESS);
}
}

let client = builder.build().await.map_err(CliError::ClientBuilder)?;
Expand Down
14 changes: 14 additions & 0 deletions xmtp_api_grpc/src/conversions.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
use prost::Message;
use xmtp_proto::xmtp::xmtpv4::{ClientEnvelope, PayerEnvelope, PublishEnvelopeRequest};

pub fn wrap_client_envelope(req: ClientEnvelope) -> PublishEnvelopeRequest {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: create_payer_envelope?

let mut buf = vec![];
req.encode(&mut buf).unwrap();

PublishEnvelopeRequest {
payer_envelope: Some(PayerEnvelope {
unsigned_client_envelope: buf,
payer_signature: None,
}),
}
}
154 changes: 133 additions & 21 deletions xmtp_api_grpc/src/grpc_api_helper.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use std::pin::Pin;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, Mutex}; // TODO switch to async mutexes
use std::sync::{Arc, Mutex};
// TODO switch to async mutexes
use std::time::Duration;

use futures::stream::{AbortHandle, Abortable};
Expand All @@ -9,8 +10,14 @@ use tokio::sync::oneshot;
use tonic::transport::ClientTlsConfig;
use tonic::{metadata::MetadataValue, transport::Channel, Request, Streaming};

use xmtp_proto::api_client::{ClientWithMetadata, XmtpMlsStreams};
use crate::conversions::wrap_client_envelope;
use xmtp_proto::api_client::{ClientWithMetadata, XmtpMlsStreams, XmtpReplicationClient};
use xmtp_proto::xmtp::mls::api::v1::{GroupMessage, WelcomeMessage};
use xmtp_proto::xmtp::xmtpv4::replication_api_client::ReplicationApiClient;
use xmtp_proto::xmtp::xmtpv4::{
BatchSubscribeEnvelopesRequest, BatchSubscribeEnvelopesResponse, ClientEnvelope,
PublishEnvelopeRequest, PublishEnvelopeResponse, QueryEnvelopesRequest, QueryEnvelopesResponse,
};
use xmtp_proto::{
api_client::{
Error, ErrorKind, MutableApiSubscription, XmtpApiClient, XmtpApiSubscription, XmtpMlsClient,
Expand Down Expand Up @@ -72,10 +79,16 @@ pub struct Client {
pub(crate) identity_client: ProtoIdentityApiClient<Channel>,
pub(crate) app_version: MetadataValue<tonic::metadata::Ascii>,
pub(crate) libxmtp_version: MetadataValue<tonic::metadata::Ascii>,
pub(crate) replication_client: ReplicationApiClient<Channel>,
pub(crate) use_replication_v4: bool,
}

impl Client {
pub async fn create(host: String, is_secure: bool) -> Result<Self, Error> {
pub async fn create(
host: String,
is_secure: bool,
use_replication_v4: bool,
) -> Result<Self, Error> {
let host = host.to_string();
let app_version = MetadataValue::try_from(&String::from("0.0.0"))
.map_err(|e| Error::new(ErrorKind::MetadataError).with(e))?;
Expand All @@ -93,14 +106,17 @@ impl Client {

let client = MessageApiClient::new(channel.clone());
let mls_client = ProtoMlsApiClient::new(channel.clone());
let identity_client = ProtoIdentityApiClient::new(channel);
let identity_client = ProtoIdentityApiClient::new(channel.clone());
let replication_client = ReplicationApiClient::new(channel);

Ok(Self {
client,
mls_client,
app_version,
libxmtp_version,
identity_client,
replication_client,
use_replication_v4,
})
}

Expand Down Expand Up @@ -335,12 +351,22 @@ impl MutableApiSubscription for GrpcMutableSubscription {
impl XmtpMlsClient for Client {
#[tracing::instrument(level = "trace", skip_all)]
async fn upload_key_package(&self, req: UploadKeyPackageRequest) -> Result<(), Error> {
let client = &mut self.mls_client.clone();
if self.use_replication_v4 {
Copy link
Contributor

@insipx insipx Sep 17, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is OK, we could also do something like this:

1.) define a feature in api_grpc, replication
2.) Create a new impl XmtpMlsClient definition exactly like the non-v4 one but for v4 (no flag on the struct)
3.) use #[cfg(feature = "replication")] and #[cfg(not(feature = "replication"))] on the impl XmtpMlsClient for Client

This would feature-gate replication, and result in easy access to the client through a flag during testing/compiling, i.e cargo test --feature replication. Mixing replication and v3 backend also isn't terribly useful in itself, so I would argue for using compile-time features here (i'm also just a fan of compile time features)

It also makes the eventual migration just a little bit easier, too

feature flag docs

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was considering that option as well. I decided against it, because I want the ability to point the client (or any SDK implementation) against any backend using run-time configuration. Otherwise you have to compile different binaries or recompile every time you switch the backend. This makes shipping the binaries a bit annoying.

Maybe a good idea would be to hide the new implementation behind a feature gate. Basically hide the new experimental code behind a feature thats off by default. And allow developers to compile it in, if they so desire.

We might have to mix v3 and v4 in some cases unfortunately.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In that case, I'm wondering if it would be cleaner to define the replication client as a separate DecentralizedClient or V4Client, rather than mixing it into the existing Client implementation? If we want to borrow any of the old logic for now, we could just copy paste it.

The benefit of the XmtpMlsClient and XmtpIdentityClient traits are that you can have multiple implementations and easily swap between them. Then the caller can just instantiate the correct client as desired, rather than passing a boolean flag into one big combined client.

As an example of a benefit - it's conceivable that the decentralized client will need additional member variables and helper methods going forward that are not needed in the old client, for example storing the list of nodes and selecting the node to communicate with.

let client = &mut self.replication_client.clone();
let payload = wrap_client_envelope(ClientEnvelope::from(req));
let res = client.publish_envelope(payload).await;
match res {
Ok(_) => Ok(()),
Err(e) => Err(Error::new(ErrorKind::MlsError).with(e)),
}
} else {
let client = &mut self.mls_client.clone();

let res = client.upload_key_package(self.build_request(req)).await;
match res {
Ok(_) => Ok(()),
Err(e) => Err(Error::new(ErrorKind::MlsError).with(e)),
let res = client.upload_key_package(self.build_request(req)).await;
match res {
Ok(_) => Ok(()),
Err(e) => Err(Error::new(ErrorKind::MlsError).with(e)),
}
}
}

Expand All @@ -358,23 +384,41 @@ impl XmtpMlsClient for Client {

#[tracing::instrument(level = "trace", skip_all)]
async fn send_group_messages(&self, req: SendGroupMessagesRequest) -> Result<(), Error> {
let client = &mut self.mls_client.clone();
let res = client.send_group_messages(self.build_request(req)).await;

match res {
Ok(_) => Ok(()),
Err(e) => Err(Error::new(ErrorKind::MlsError).with(e)),
if self.use_replication_v4 {
let client = &mut self.replication_client.clone();
let payload = wrap_client_envelope(ClientEnvelope::from(req));
let res = client.publish_envelope(payload).await;
match res {
Ok(_) => Ok(()),
Err(e) => Err(Error::new(ErrorKind::MlsError).with(e)),
}
} else {
let client = &mut self.mls_client.clone();
let res = client.send_group_messages(self.build_request(req)).await;
match res {
Ok(_) => Ok(()),
Err(e) => Err(Error::new(ErrorKind::MlsError).with(e)),
}
}
}

#[tracing::instrument(level = "trace", skip_all)]
async fn send_welcome_messages(&self, req: SendWelcomeMessagesRequest) -> Result<(), Error> {
let client = &mut self.mls_client.clone();
let res = client.send_welcome_messages(self.build_request(req)).await;

match res {
Ok(_) => Ok(()),
Err(e) => Err(Error::new(ErrorKind::MlsError).with(e)),
if self.use_replication_v4 {
let client = &mut self.replication_client.clone();
let payload = wrap_client_envelope(ClientEnvelope::from(req));
let res = client.publish_envelope(payload).await;
match res {
Ok(_) => Ok(()),
Err(e) => Err(Error::new(ErrorKind::MlsError).with(e)),
}
} else {
let client = &mut self.mls_client.clone();
let res = client.send_welcome_messages(self.build_request(req)).await;
match res {
Ok(_) => Ok(()),
Err(e) => Err(Error::new(ErrorKind::MlsError).with(e)),
}
}
}

Expand Down Expand Up @@ -482,3 +526,71 @@ impl XmtpMlsStreams for Client {
Ok(stream.into())
}
}

pub struct BatchSubscribeStream {
inner: tonic::codec::Streaming<BatchSubscribeEnvelopesResponse>,
}

impl From<tonic::codec::Streaming<BatchSubscribeEnvelopesResponse>> for BatchSubscribeStream {
fn from(inner: tonic::codec::Streaming<BatchSubscribeEnvelopesResponse>) -> Self {
BatchSubscribeStream { inner }
}
}

impl Stream for BatchSubscribeStream {
type Item = Result<BatchSubscribeEnvelopesResponse, Error>;

fn poll_next(
mut self: Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Option<Self::Item>> {
self.inner
.poll_next_unpin(cx)
.map(|data| data.map(|v| v.map_err(|e| Error::new(ErrorKind::SubscribeError).with(e))))
}
}

impl XmtpReplicationClient for Client {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need this impl? I'm guessing libxmtp doesn't need to call any of these methods directly?

type BatchSubscribeStream<'a> = BatchSubscribeStream;

async fn publish_envelope(
&self,
request: PublishEnvelopeRequest,
) -> Result<PublishEnvelopeResponse, Error> {
let client = &mut self.replication_client.clone();

client
.publish_envelope(request)
.await
.map(|r| r.into_inner())
.map_err(|e| Error::new(ErrorKind::PublishError).with(e))
}

async fn query_envelopes(
&self,
request: QueryEnvelopesRequest,
) -> Result<QueryEnvelopesResponse, Error> {
let client = &mut self.replication_client.clone();

client
.query_envelopes(request)
.await
.map(|r| r.into_inner())
.map_err(|e| Error::new(ErrorKind::QueryError).with(e))
}

async fn batch_subscribe_envelopes(
&self,
request: BatchSubscribeEnvelopesRequest,
) -> Result<Self::BatchSubscribeStream<'_>, Error> {
let client = &mut self.replication_client.clone();
let res = client
.batch_subscribe_envelopes(request)
.await
.map_err(|e| Error::new(ErrorKind::SubscribeError).with(e))?;

let stream = res.into_inner();

Ok(stream.into())
}
}
27 changes: 19 additions & 8 deletions xmtp_api_grpc/src/identity.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
use crate::conversions::wrap_client_envelope;
use crate::Client;
use xmtp_proto::xmtp::xmtpv4::ClientEnvelope;
use xmtp_proto::{
api_client::{Error, ErrorKind, XmtpIdentityClient},
xmtp::identity::api::v1::{
Expand All @@ -7,22 +10,30 @@ use xmtp_proto::{
},
};

use crate::Client;

impl XmtpIdentityClient for Client {
#[tracing::instrument(level = "trace", skip_all)]
async fn publish_identity_update(
&self,
request: PublishIdentityUpdateRequest,
) -> Result<PublishIdentityUpdateResponse, Error> {
let client = &mut self.identity_client.clone();
if self.use_replication_v4 {
let client = &mut self.replication_client.clone();
let payload = wrap_client_envelope(ClientEnvelope::from(request));
let res = client.publish_envelope(payload).await;
match res {
Ok(_) => Ok(PublishIdentityUpdateResponse {}),
Err(e) => Err(Error::new(ErrorKind::MlsError).with(e)),
}
} else {
let client = &mut self.identity_client.clone();

let res = client
.publish_identity_update(self.build_request(request))
.await;
let res = client
.publish_identity_update(self.build_request(request))
.await;

res.map(|response| response.into_inner())
.map_err(|err| Error::new(ErrorKind::IdentityError).with(err))
res.map(|response| response.into_inner())
.map_err(|err| Error::new(ErrorKind::IdentityError).with(err))
}
}

#[tracing::instrument(level = "trace", skip_all)]
Expand Down
Loading