Skip to content

Commit

Permalink
Merge pull request private-attribution#1006 from akoshelev/gateway-sh…
Browse files Browse the repository at this point in the history
…ard-comms

Implement shard communication channels for MPC circuits
  • Loading branch information
akoshelev authored Apr 16, 2024
2 parents 28411cd + 7e09115 commit 428d1e9
Show file tree
Hide file tree
Showing 25 changed files with 793 additions and 267 deletions.
31 changes: 21 additions & 10 deletions ipa-core/src/app.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use crate::{
query::{PrepareQuery, QueryConfig, QueryInput},
routing::{Addr, RouteId},
ApiError, BodyStream, HandlerBox, HandlerRef, HelperIdentity, HelperResponse,
RequestHandler, Transport, TransportImpl,
MpcTransportImpl, RequestHandler, ShardTransportImpl, Transport,
},
hpke::{KeyPair, KeyRegistry},
protocol::QueryId,
Expand All @@ -32,7 +32,8 @@ struct Inner {
/// on top of atomics and all fun stuff associated with it. I don't see an easy way to avoid that
/// if we want to keep the implementation leak-free, but one may be aware if this shows up on
/// the flamegraph
transport: TransportImpl,
mpc_transport: MpcTransportImpl,
shard_transport: ShardTransportImpl,
}

impl Setup {
Expand All @@ -55,10 +56,15 @@ impl Setup {
}

/// Instantiate [`HelperApp`] by connecting it to the provided transport implementation
pub fn connect(self, transport: TransportImpl) -> HelperApp {
pub fn connect(
self,
mpc_transport: MpcTransportImpl,
shard_transport: ShardTransportImpl,
) -> HelperApp {
let app = Arc::new(Inner {
query_processor: self.query_processor,
transport,
mpc_transport,
shard_transport,
});
self.handler.set_handler(
Arc::downgrade(&app) as Weak<dyn RequestHandler<Identity = HelperIdentity>>
Expand All @@ -80,7 +86,10 @@ impl HelperApp {
Ok(self
.inner
.query_processor
.new_query(Transport::clone_ref(&self.inner.transport), query_config)
.new_query(
Transport::clone_ref(&self.inner.mpc_transport),
query_config,
)
.await?
.query_id)
}
Expand All @@ -90,10 +99,11 @@ impl HelperApp {
/// ## Errors
/// Propagates errors from the helper.
pub fn execute_query(&self, input: QueryInput) -> Result<(), ApiError> {
let transport = <TransportImpl as Clone>::clone(&self.inner.transport);
let mpc_transport = Transport::clone_ref(&self.inner.mpc_transport);
let shard_transport = Transport::clone_ref(&self.inner.shard_transport);
self.inner
.query_processor
.receive_inputs(transport, input)?;
.receive_inputs(mpc_transport, shard_transport, input)?;
Ok(())
}

Expand Down Expand Up @@ -145,18 +155,19 @@ impl RequestHandler for Inner {
RouteId::ReceiveQuery => {
let req = req.into::<QueryConfig>()?;
HelperResponse::from(
qp.new_query(Transport::clone_ref(&self.transport), req)
qp.new_query(Transport::clone_ref(&self.mpc_transport), req)
.await?,
)
}
RouteId::PrepareQuery => {
let req = req.into::<PrepareQuery>()?;
HelperResponse::from(qp.prepare(&self.transport, req)?)
HelperResponse::from(qp.prepare(&self.mpc_transport, req)?)
}
RouteId::QueryInput => {
let query_id = ext_query_id(&req)?;
HelperResponse::from(qp.receive_inputs(
Transport::clone_ref(&self.transport),
Transport::clone_ref(&self.mpc_transport),
Transport::clone_ref(&self.shard_transport),
QueryInput {
query_id,
input_stream: data,
Expand Down
4 changes: 2 additions & 2 deletions ipa-core/src/bin/helper.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ use ipa_core::{
config::{hpke_registry, HpkeServerConfig, NetworkConfig, ServerConfig, TlsConfig},
error::BoxError,
helpers::HelperIdentity,
net::{ClientIdentity, HttpTransport, MpcHelperClient},
net::{ClientIdentity, HttpShardTransport, HttpTransport, MpcHelperClient},
AppSetup,
};
use tracing::{error, info};
Expand Down Expand Up @@ -158,7 +158,7 @@ async fn server(args: ServerArgs) -> Result<(), BoxError> {
Some(handler),
);

let _app = setup.connect(transport.clone());
let _app = setup.connect(transport.clone(), HttpShardTransport);

let listener = args.server_socket_fd
.map(|fd| {
Expand Down
4 changes: 2 additions & 2 deletions ipa-core/src/helpers/buffers/ordering_sender.rs
Original file line number Diff line number Diff line change
Expand Up @@ -524,7 +524,7 @@ mod test {
use super::OrderingSender;
use crate::{
ff::{Fp31, Fp32BitPrime, Gf20Bit, Gf9Bit, Serializable, U128Conversions},
helpers::Message,
helpers::MpcMessage,
rand::thread_rng,
sync::Arc,
test_executor::run,
Expand Down Expand Up @@ -622,7 +622,7 @@ mod test {
>;

// Given a message, returns a closure that sends the message and increments an associated record index.
fn send_fn<M: Message>(m: M) -> BoxedSendFn {
fn send_fn<M: MpcMessage>(m: M) -> BoxedSendFn {
Box::new(|s: &OrderingSender, i: &mut usize| {
let fut = s.send(*i, m).boxed();
*i += 1;
Expand Down
Loading

0 comments on commit 428d1e9

Please sign in to comment.