Skip to content

Commit

Permalink
Checkpoint
Browse files Browse the repository at this point in the history
  • Loading branch information
andyleiserson committed Dec 17, 2024
1 parent f255e9a commit c5cab11
Show file tree
Hide file tree
Showing 10 changed files with 16 additions and 17 deletions.
2 changes: 1 addition & 1 deletion ipa-core/src/cli/playbook/add.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ where
.into_iter()
.zip(clients)
.map(|(input_stream, client)| {
client.query_input(QueryInput {
client.query_input(QueryInput::Inline {
query_id,
input_stream,
})
Expand Down
2 changes: 1 addition & 1 deletion ipa-core/src/cli/playbook/hybrid.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ where
|(shard_clients, shard_inputs)| {
try_join_all(shard_clients.iter().zip(shard_inputs.into_iter()).map(
|(client, input)| {
client.query_input(QueryInput {
client.query_input(QueryInput::Inline {
query_id,
input_stream: input,
})
Expand Down
2 changes: 1 addition & 1 deletion ipa-core/src/cli/playbook/ipa.rs
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ where
.into_iter()
.zip(clients)
.map(|(input_stream, client)| {
client.query_input(QueryInput {
client.query_input(QueryInput::Inline {
query_id,
input_stream,
})
Expand Down
2 changes: 1 addition & 1 deletion ipa-core/src/cli/playbook/multiply.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ where
.into_iter()
.zip(clients)
.map(|(input_stream, client)| {
client.query_input(QueryInput {
client.query_input(QueryInput::Inline {
query_id,
input_stream,
})
Expand Down
2 changes: 1 addition & 1 deletion ipa-core/src/cli/playbook/sharded_shuffle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ where
let shared = chunk.iter().copied().share();
try_join_all(mpc_clients.each_ref().iter().zip(shared).map(
|(mpc_client, input)| {
mpc_client.query_input(QueryInput {
mpc_client.query_input(QueryInput::Inline {
query_id,
input_stream: BodyStream::from_serializable_iter(input),
})
Expand Down
6 changes: 3 additions & 3 deletions ipa-core/src/helpers/transport/query/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -213,10 +213,10 @@ impl QueryInput {
}
}

pub fn input_stream(self) -> BodyStream {
pub fn input_stream(self) -> Option<BodyStream> {
match self {
Self::FromUrl { .. } => BodyStream::empty(),
Self::Inline { input_stream, .. } => input_stream,
Self::Inline { input_stream, .. } => Some(input_stream),
Self::FromUrl { .. } => None,
}
}

Expand Down
4 changes: 3 additions & 1 deletion ipa-core/src/net/http_serde.rs
Original file line number Diff line number Diff line change
Expand Up @@ -348,7 +348,9 @@ pub mod query {
self.query_input.query_id().as_ref(),
))
.build()?;
let body = Body::from_stream(self.query_input.input_stream);
let body = self.query_input.input_stream()
.map(Body::from_stream)
.unwrap_or_else(Body::empty);
Ok(hyper::Request::post(uri)
.header(CONTENT_TYPE, APPLICATION_OCTET_STREAM)
.body(body)?)
Expand Down
4 changes: 2 additions & 2 deletions ipa-core/src/net/server/handlers/query/input.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use axum::{extract::Path, routing::post, Extension, Router};
use hyper::StatusCode;

use crate::{
helpers::{query::QueryInputRequest, routing::RouteId, BodyStream},
helpers::{query::QueryInputRequest, BodyStream},
net::{http_serde::{self, query::input::QueryInputUrl}, transport::MpcHttpTransport, Error},
protocol::QueryId,
};
Expand Down Expand Up @@ -56,7 +56,7 @@ mod tests {
async fn input_test() {
let expected_query_id = QueryId;
let expected_input = &[4u8; 4];
let req = http_serde::query::input::Request::new(QueryInput {
let req = http_serde::query::input::Request::new(QueryInput::Inline {
query_id: expected_query_id,
input_stream: expected_input.to_vec().into(),
});
Expand Down
7 changes: 2 additions & 5 deletions ipa-core/src/query/processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,7 @@ use crate::{
error::Error as ProtocolError,
executor::IpaRuntime,
helpers::{
query::{CompareStatusRequest, PrepareQuery, QueryConfig, QueryInput},
routing::RouteId,
BroadcastError, Gateway, GatewayConfig, MpcTransportError, MpcTransportImpl, Role,
RoleAssignment, ShardTransportError, ShardTransportImpl, Transport,
query::{CompareStatusRequest, PrepareQuery, QueryConfig, QueryInput}, routing::RouteId, BodyStream, BroadcastError, Gateway, GatewayConfig, MpcTransportError, MpcTransportImpl, Role, RoleAssignment, ShardTransportError, ShardTransportImpl, Transport
},
hpke::{KeyRegistry, PrivateKeyOnly},
protocol::QueryId,
Expand Down Expand Up @@ -306,7 +303,7 @@ impl Processor {
) -> Result<(), QueryInputError> {
let mut queries = self.queries.inner.lock().unwrap();
let query_id = input.query_id();
let input_stream = input.input_stream();
let input_stream = input.input_stream().unwrap_or_else(|| BodyStream::empty());
match queries.entry(query_id) {
Entry::Occupied(entry) => {
let state = entry.remove();
Expand Down
2 changes: 1 addition & 1 deletion ipa-core/src/test_fixture/app.rs
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ impl TestApp {
.into_iter()
.enumerate()
.map(|(i, input)| {
self.drivers[i].execute_query(QueryInput {
self.drivers[i].execute_query(QueryInput::Inline {
query_id,
input_stream: input.into(),
})
Expand Down

0 comments on commit c5cab11

Please sign in to comment.