-
Notifications
You must be signed in to change notification settings - Fork 19
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Introduce gRPC replication client api #1062
base: main
Are you sure you want to change the base?
Changes from 7 commits
7615a23
cc70ecd
9a92b24
9d486ca
45c63dd
7d5437d
47cfdef
1ae37ec
9cc822e
beff857
aa20dbf
4124db0
0319045
d77f22f
ba8b08a
200d46a
95ce411
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,14 @@ | ||
use prost::Message; | ||
use xmtp_proto::xmtp::xmtpv4::{ClientEnvelope, PayerEnvelope, PublishEnvelopeRequest}; | ||
|
||
pub fn wrap_client_envelope(req: ClientEnvelope) -> PublishEnvelopeRequest { | ||
let mut buf = vec![]; | ||
req.encode(&mut buf).unwrap(); | ||
|
||
PublishEnvelopeRequest { | ||
payer_envelope: Some(PayerEnvelope { | ||
unsigned_client_envelope: buf, | ||
payer_signature: None, | ||
}), | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,6 +1,7 @@ | ||
use std::pin::Pin; | ||
use std::sync::atomic::{AtomicBool, Ordering}; | ||
use std::sync::{Arc, Mutex}; // TODO switch to async mutexes | ||
use std::sync::{Arc, Mutex}; | ||
// TODO switch to async mutexes | ||
use std::time::Duration; | ||
|
||
use futures::stream::{AbortHandle, Abortable}; | ||
|
@@ -9,8 +10,14 @@ use tokio::sync::oneshot; | |
use tonic::transport::ClientTlsConfig; | ||
use tonic::{metadata::MetadataValue, transport::Channel, Request, Streaming}; | ||
|
||
use xmtp_proto::api_client::{ClientWithMetadata, XmtpMlsStreams}; | ||
use crate::conversions::wrap_client_envelope; | ||
use xmtp_proto::api_client::{ClientWithMetadata, XmtpMlsStreams, XmtpReplicationClient}; | ||
use xmtp_proto::xmtp::mls::api::v1::{GroupMessage, WelcomeMessage}; | ||
use xmtp_proto::xmtp::xmtpv4::replication_api_client::ReplicationApiClient; | ||
use xmtp_proto::xmtp::xmtpv4::{ | ||
BatchSubscribeEnvelopesRequest, BatchSubscribeEnvelopesResponse, ClientEnvelope, | ||
PublishEnvelopeRequest, PublishEnvelopeResponse, QueryEnvelopesRequest, QueryEnvelopesResponse, | ||
}; | ||
use xmtp_proto::{ | ||
api_client::{ | ||
Error, ErrorKind, MutableApiSubscription, XmtpApiClient, XmtpApiSubscription, XmtpMlsClient, | ||
|
@@ -72,10 +79,16 @@ pub struct Client { | |
pub(crate) identity_client: ProtoIdentityApiClient<Channel>, | ||
pub(crate) app_version: MetadataValue<tonic::metadata::Ascii>, | ||
pub(crate) libxmtp_version: MetadataValue<tonic::metadata::Ascii>, | ||
pub(crate) replication_client: ReplicationApiClient<Channel>, | ||
pub(crate) use_replication_v4: bool, | ||
} | ||
|
||
impl Client { | ||
pub async fn create(host: String, is_secure: bool) -> Result<Self, Error> { | ||
pub async fn create( | ||
host: String, | ||
is_secure: bool, | ||
use_replication_v4: bool, | ||
) -> Result<Self, Error> { | ||
let host = host.to_string(); | ||
let app_version = MetadataValue::try_from(&String::from("0.0.0")) | ||
.map_err(|e| Error::new(ErrorKind::MetadataError).with(e))?; | ||
|
@@ -93,14 +106,17 @@ impl Client { | |
|
||
let client = MessageApiClient::new(channel.clone()); | ||
let mls_client = ProtoMlsApiClient::new(channel.clone()); | ||
let identity_client = ProtoIdentityApiClient::new(channel); | ||
let identity_client = ProtoIdentityApiClient::new(channel.clone()); | ||
let replication_client = ReplicationApiClient::new(channel); | ||
|
||
Ok(Self { | ||
client, | ||
mls_client, | ||
app_version, | ||
libxmtp_version, | ||
identity_client, | ||
replication_client, | ||
use_replication_v4, | ||
}) | ||
} | ||
|
||
|
@@ -335,12 +351,22 @@ impl MutableApiSubscription for GrpcMutableSubscription { | |
impl XmtpMlsClient for Client { | ||
#[tracing::instrument(level = "trace", skip_all)] | ||
async fn upload_key_package(&self, req: UploadKeyPackageRequest) -> Result<(), Error> { | ||
let client = &mut self.mls_client.clone(); | ||
if self.use_replication_v4 { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is OK, we could also do something like this: 1.) define a feature in api_grpc, This would feature-gate replication, and result in easy access to the client through a flag during testing/compiling, i.e It also makes the eventual migration just a little bit easier, too There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I was considering that option as well. I decided against it, because I want the ability to point the client (or any SDK implementation) against any backend using run-time configuration. Otherwise you have to compile different binaries or recompile every time you switch the backend. This makes shipping the binaries a bit annoying. Maybe a good idea would be to hide the new implementation behind a feature gate. Basically hide the new experimental code behind a feature thats off by default. And allow developers to compile it in, if they so desire. We might have to mix v3 and v4 in some cases unfortunately. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. In that case, I'm wondering if it would be cleaner to define the replication client as a separate The benefit of the As an example of a benefit - it's conceivable that the decentralized client will need additional member variables and helper methods going forward that are not needed in the old client, for example storing the list of nodes and selecting the node to communicate with. |
||
let client = &mut self.replication_client.clone(); | ||
let payload = wrap_client_envelope(ClientEnvelope::from(req)); | ||
let res = client.publish_envelope(payload).await; | ||
match res { | ||
Ok(_) => Ok(()), | ||
Err(e) => Err(Error::new(ErrorKind::MlsError).with(e)), | ||
} | ||
} else { | ||
let client = &mut self.mls_client.clone(); | ||
|
||
let res = client.upload_key_package(self.build_request(req)).await; | ||
match res { | ||
Ok(_) => Ok(()), | ||
Err(e) => Err(Error::new(ErrorKind::MlsError).with(e)), | ||
let res = client.upload_key_package(self.build_request(req)).await; | ||
match res { | ||
Ok(_) => Ok(()), | ||
Err(e) => Err(Error::new(ErrorKind::MlsError).with(e)), | ||
} | ||
} | ||
} | ||
|
||
|
@@ -358,23 +384,41 @@ impl XmtpMlsClient for Client { | |
|
||
#[tracing::instrument(level = "trace", skip_all)] | ||
async fn send_group_messages(&self, req: SendGroupMessagesRequest) -> Result<(), Error> { | ||
let client = &mut self.mls_client.clone(); | ||
let res = client.send_group_messages(self.build_request(req)).await; | ||
|
||
match res { | ||
Ok(_) => Ok(()), | ||
Err(e) => Err(Error::new(ErrorKind::MlsError).with(e)), | ||
if self.use_replication_v4 { | ||
let client = &mut self.replication_client.clone(); | ||
let payload = wrap_client_envelope(ClientEnvelope::from(req)); | ||
let res = client.publish_envelope(payload).await; | ||
match res { | ||
Ok(_) => Ok(()), | ||
Err(e) => Err(Error::new(ErrorKind::MlsError).with(e)), | ||
} | ||
} else { | ||
let client = &mut self.mls_client.clone(); | ||
let res = client.send_group_messages(self.build_request(req)).await; | ||
match res { | ||
Ok(_) => Ok(()), | ||
Err(e) => Err(Error::new(ErrorKind::MlsError).with(e)), | ||
} | ||
} | ||
} | ||
|
||
#[tracing::instrument(level = "trace", skip_all)] | ||
async fn send_welcome_messages(&self, req: SendWelcomeMessagesRequest) -> Result<(), Error> { | ||
let client = &mut self.mls_client.clone(); | ||
let res = client.send_welcome_messages(self.build_request(req)).await; | ||
|
||
match res { | ||
Ok(_) => Ok(()), | ||
Err(e) => Err(Error::new(ErrorKind::MlsError).with(e)), | ||
if self.use_replication_v4 { | ||
let client = &mut self.replication_client.clone(); | ||
let payload = wrap_client_envelope(ClientEnvelope::from(req)); | ||
let res = client.publish_envelope(payload).await; | ||
match res { | ||
Ok(_) => Ok(()), | ||
Err(e) => Err(Error::new(ErrorKind::MlsError).with(e)), | ||
} | ||
} else { | ||
let client = &mut self.mls_client.clone(); | ||
let res = client.send_welcome_messages(self.build_request(req)).await; | ||
match res { | ||
Ok(_) => Ok(()), | ||
Err(e) => Err(Error::new(ErrorKind::MlsError).with(e)), | ||
} | ||
} | ||
} | ||
|
||
|
@@ -482,3 +526,71 @@ impl XmtpMlsStreams for Client { | |
Ok(stream.into()) | ||
} | ||
} | ||
|
||
pub struct BatchSubscribeStream { | ||
inner: tonic::codec::Streaming<BatchSubscribeEnvelopesResponse>, | ||
} | ||
|
||
impl From<tonic::codec::Streaming<BatchSubscribeEnvelopesResponse>> for BatchSubscribeStream { | ||
fn from(inner: tonic::codec::Streaming<BatchSubscribeEnvelopesResponse>) -> Self { | ||
BatchSubscribeStream { inner } | ||
} | ||
} | ||
|
||
impl Stream for BatchSubscribeStream { | ||
type Item = Result<BatchSubscribeEnvelopesResponse, Error>; | ||
|
||
fn poll_next( | ||
mut self: Pin<&mut Self>, | ||
cx: &mut std::task::Context<'_>, | ||
) -> std::task::Poll<Option<Self::Item>> { | ||
self.inner | ||
.poll_next_unpin(cx) | ||
.map(|data| data.map(|v| v.map_err(|e| Error::new(ErrorKind::SubscribeError).with(e)))) | ||
} | ||
} | ||
|
||
impl XmtpReplicationClient for Client { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Do we need this impl? I'm guessing libxmtp doesn't need to call any of these methods directly? |
||
type BatchSubscribeStream<'a> = BatchSubscribeStream; | ||
|
||
async fn publish_envelope( | ||
&self, | ||
request: PublishEnvelopeRequest, | ||
) -> Result<PublishEnvelopeResponse, Error> { | ||
let client = &mut self.replication_client.clone(); | ||
|
||
client | ||
.publish_envelope(request) | ||
.await | ||
.map(|r| r.into_inner()) | ||
.map_err(|e| Error::new(ErrorKind::PublishError).with(e)) | ||
} | ||
|
||
async fn query_envelopes( | ||
&self, | ||
request: QueryEnvelopesRequest, | ||
) -> Result<QueryEnvelopesResponse, Error> { | ||
let client = &mut self.replication_client.clone(); | ||
|
||
client | ||
.query_envelopes(request) | ||
.await | ||
.map(|r| r.into_inner()) | ||
.map_err(|e| Error::new(ErrorKind::QueryError).with(e)) | ||
} | ||
|
||
async fn batch_subscribe_envelopes( | ||
&self, | ||
request: BatchSubscribeEnvelopesRequest, | ||
) -> Result<Self::BatchSubscribeStream<'_>, Error> { | ||
let client = &mut self.replication_client.clone(); | ||
let res = client | ||
.batch_subscribe_envelopes(request) | ||
.await | ||
.map_err(|e| Error::new(ErrorKind::SubscribeError).with(e))?; | ||
|
||
let stream = res.into_inner(); | ||
|
||
Ok(stream.into()) | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nit: create_payer_envelope?