Skip to content

Commit 93598e8

Browse files
committed
Fix local upload of data objects
1 parent 9568b75 commit 93598e8

File tree

6 files changed

+23
-15
lines changed

6 files changed

+23
-15
lines changed

crates/hyperqueue/src/client/commands/data.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,8 +25,8 @@ pub struct PutOpts {
2525
/// Path of file/directory that should be uploaded
2626
path: PathBuf,
2727
/// DataId of task output
28-
#[arg(long, default_value = "")]
29-
mime_type: String,
28+
#[arg(long)]
29+
mime_type: Option<String>,
3030
}
3131

3232
#[derive(Parser)]

crates/tako/src/internal/datasrv/download.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,10 @@
11
use crate::connection::Connection;
22
use crate::datasrv::DataObjectId;
3+
use crate::internal::datasrv::DataObjectRef;
34
use crate::internal::datasrv::messages::{
45
DataDown, FromDataClientMessage, ToDataClientMessageDown,
56
};
67
use crate::internal::datasrv::utils::DataObjectComposer;
7-
use crate::internal::datasrv::DataObjectRef;
88
use crate::{Map, WrappedRcRefCell};
99
use orion::kex::SecretKey;
1010
use priority_queue::PriorityQueue;
@@ -13,8 +13,8 @@ use std::rc::Rc;
1313
use std::sync::Arc;
1414
use std::time::Duration;
1515
use tokio::net::TcpStream;
16-
use tokio::sync::{oneshot, Notify, OwnedSemaphorePermit, Semaphore};
17-
use tokio::task::{spawn_local, AbortHandle, JoinSet};
16+
use tokio::sync::{Notify, OwnedSemaphorePermit, Semaphore, oneshot};
17+
use tokio::task::{AbortHandle, JoinSet, spawn_local};
1818
use tokio::time::Instant;
1919

2020
const PROTOCOL_VERSION: u32 = 0;

crates/tako/src/internal/datasrv/local_client.rs

Lines changed: 14 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -59,11 +59,11 @@ impl LocalDataClient {
5959
pub async fn put_data_object_from_file(
6060
&mut self,
6161
data_id: OutputId,
62-
mime_type: String,
62+
mime_type: Option<String>,
6363
path: &Path,
6464
) -> crate::Result<()> {
6565
log::debug!(
66-
"Uploading file {} as data_obj={} (mime_type={})",
66+
"Uploading file {} as data_obj={} (mime_type={:?})",
6767
path.display(),
6868
data_id,
6969
&mime_type
@@ -81,24 +81,31 @@ impl LocalDataClient {
8181
if first {
8282
self.send_message(FromLocalDataClientMessageUp::PutDataObject {
8383
data_id,
84-
mime_type: mime_type.as_str(),
84+
mime_type: mime_type.as_ref().map(|x| x.as_str()),
8585
size,
8686
data: PutDataUp { data },
8787
})
8888
.await?;
8989
first = false;
90-
if bytes_read == 0 {
91-
break;
92-
}
9390
} else {
9491
if bytes_read == 0 {
95-
break;
92+
return Err(DsError::GenericError(
93+
"File changed size during upload".to_string(),
94+
));
9695
}
9796
self.send_message(FromLocalDataClientMessageUp::PutDataObjectPart(PutDataUp {
9897
data,
9998
}))
10099
.await?;
101100
}
101+
if written as u64 >= size {
102+
if written as u64 > size {
103+
return Err(DsError::GenericError(
104+
"File changed size during upload".to_string(),
105+
));
106+
}
107+
break;
108+
}
102109
}
103110
log::debug!("Waiting for confirmation of upload");
104111
let message = self.read_message().await?;

crates/tako/src/internal/datasrv/messages.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@ pub(crate) struct PutDataUp<'a> {
3939
pub(crate) enum FromLocalDataClientMessageUp<'a> {
4040
PutDataObject {
4141
data_id: OutputId,
42-
mime_type: &'a str,
42+
mime_type: Option<&'a str>,
4343
size: u64,
4444
data: PutDataUp<'a>,
4545
},

crates/tako/src/internal/datasrv/tests.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,8 @@
11
use crate::datasrv::DataObjectId;
22
use crate::internal::datasrv::datastorage::DataStorage;
33
use crate::internal::datasrv::test_utils::{
4-
start_download_manager, start_test_upload_service, test_download_manager, PlacementConfig,
5-
TestUploadInterface,
4+
PlacementConfig, TestUploadInterface, start_download_manager, start_test_upload_service,
5+
test_download_manager,
66
};
77
use crate::internal::datasrv::{DataObject, DataObjectRef};
88
use crate::internal::tests::utils::sorted_vec;

crates/tako/src/internal/worker/localcomm.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -135,6 +135,7 @@ async fn handle_connection(state_ref: WorkerStateRef, stream: UnixStream) -> cra
135135
.ok_or_else(|| crate::Error::GenericError("Invalid token".to_string()))?;
136136
match registration {
137137
Registration::DataConnection { task_id, input_map } => {
138+
log::debug!("New local data connection: {task_id}");
138139
let task_id = *task_id;
139140
let input_map = input_map.clone();
140141
drop(lc_state);

0 commit comments

Comments
 (0)