From a6ed9289dbfab053fefbfcc733f3a456bbb23b20 Mon Sep 17 00:00:00 2001 From: hunjixin <1084400399@qq.com> Date: Fri, 2 Aug 2024 12:00:41 +0000 Subject: [PATCH] fix: losing data in stream --- crates/compute_unit_runner/Cargo.toml | 3 +- crates/compute_unit_runner/src/lib.rs | 2 + .../src/media_data_tracker.rs | 34 +++--- .../src/mprc.rs | 0 .../compute_unit_runner/src/multi_sender.rs | 58 +++++++++ crates/compute_unit_runner/src/stream.rs | 41 ++----- crates/dp_runner/src/channel_tracker.rs | 110 +++++++++--------- crates/dp_runner/src/main.rs | 3 +- crates/dp_runner/src/stream.rs | 47 ++++---- ...46\347\273\206\350\256\276\350\256\241.md" | 5 + nodes/dummy_in/src/main.rs | 5 + src/core/models.rs | 17 ++- src/dbrepo/mongo.rs | 24 +++- src/network/protos/datatransfer.proto | 9 +- 14 files changed, 219 insertions(+), 139 deletions(-) rename crates/{dp_runner => compute_unit_runner}/src/mprc.rs (100%) create mode 100644 crates/compute_unit_runner/src/multi_sender.rs diff --git a/crates/compute_unit_runner/Cargo.toml b/crates/compute_unit_runner/Cargo.toml index 7cb8364..3ff1c33 100644 --- a/crates/compute_unit_runner/Cargo.toml +++ b/crates/compute_unit_runner/Cargo.toml @@ -25,4 +25,5 @@ hyperlocal = "0.9.1" hyper = "1.4.1" hyper-util = "0.1.6" http-body-util = "0.1.2" -walkdir = "2.5.0" \ No newline at end of file +walkdir = "2.5.0" +rand = "0.8.5" \ No newline at end of file diff --git a/crates/compute_unit_runner/src/lib.rs b/crates/compute_unit_runner/src/lib.rs index af27e21..6fecc14 100644 --- a/crates/compute_unit_runner/src/lib.rs +++ b/crates/compute_unit_runner/src/lib.rs @@ -1,3 +1,5 @@ pub mod ipc; pub mod media_data_tracker; +mod mprc; +mod multi_sender; pub mod stream; diff --git a/crates/compute_unit_runner/src/media_data_tracker.rs b/crates/compute_unit_runner/src/media_data_tracker.rs index e6027bf..c421424 100644 --- a/crates/compute_unit_runner/src/media_data_tracker.rs +++ b/crates/compute_unit_runner/src/media_data_tracker.rs @@ -1,4 +1,5 @@ use crate::ipc::{AvaiableDataResponse, CompleteDataReq, SubmitOuputDataReq}; +use crate::mprc::Mprs; use anyhow::{Ok, Result}; use jz_action::core::models::{DataRecord, DataState, DbRepo, Direction, NodeRepo, TrackerState}; use jz_action::network::common::Empty; @@ -9,18 +10,19 @@ use std::path::PathBuf; use std::sync::Arc; use std::time::Duration; use tokio::sync::mpsc::error::TrySendError; +use tonic::Status; use tracing::{debug, error, info, warn}; use walkdir::WalkDir; +use crate::multi_sender::MultiSender; use tokio::fs; use tokio::select; use tokio::sync::mpsc; use tokio::sync::Mutex; use tokio::sync::{broadcast, oneshot}; -use tokio::time::{self, sleep}; +use tokio::time::{self, sleep, Instant}; use tokio_stream::StreamExt; - pub struct MediaDataTracker where R: DbRepo, @@ -48,16 +50,12 @@ where // channel for submit output data pub(crate) ipc_process_submit_output_tx: Option>)>>, - - pub(crate) out_going_tx: broadcast::Sender, //receive data from upstream and send it to program with this } impl MediaDataTracker where R: DbRepo, { pub fn new(repo: R, name: &str, tmp_store: PathBuf) -> Self { - let out_going_tx = broadcast::Sender::new(128); - MediaDataTracker { tmp_store, name: name.to_string(), @@ -68,7 +66,6 @@ where ipc_process_submit_output_tx: None, ipc_process_completed_data_tx: None, ipc_process_data_req_tx: None, - out_going_tx: out_going_tx, } } } @@ -93,7 +90,6 @@ where //process outgoing data let db_repo = self.repo.clone(); let tmp_store = self.tmp_store.clone(); - let out_going_tx = self.out_going_tx.clone(); let downstreams = self.downstreams.clone(); //make dynamic downstreams? let node_name = self.name.clone(); let new_data_tx = new_data_tx.clone(); @@ -114,14 +110,15 @@ where } }); + let mut multi_sender = MultiSender::new(downstreams.clone()); tokio::spawn(async move { loop { tokio::select! { Some(()) = new_data_rx.recv() => { - println!("start to send data"); //reconstruct batch //TODO combine multiple batch loop { + let now = Instant::now(); match db_repo.find_and_sent_output_data(&node_name).await { std::result::Result::Ok(Some(req)) =>{ let tmp_out_path = tmp_store.join(&req.id); @@ -154,25 +151,22 @@ where } } new_batch.size = req.size; + new_batch.id = req.id.clone(); - println!("start to send data2 "); //write outgoing if new_batch.size >0 && downstreams.len()>0 { - //TODO change a more predicatable and reliable way to broadcase data message. - //must ensure every downstream node receive this message? - println!("start to send data3"); - if let Err(_) = out_going_tx.send(new_batch) { - //no receiver - warn!("broadcast data fail due to no receiver, revert this data state to Processed"); - if let Err(err) = db_repo.update_state(&node_name, &req.id, DataState::Received).await{ + info!("start to send data {} {:?}", &req.id, now.elapsed()); + if let Err(sent_nodes) = multi_sender.send(new_batch).await { + if let Err(err) = db_repo.mark_partial_sent(&node_name, &req.id, sent_nodes.iter().map(|key|key.as_str()).collect()).await{ error!("revert data state fail {err}"); } + warn!("send data to partial downnstream {:?}",sent_nodes); break; } - info!("send data to downnstream successfully"); + + info!("send data to downnstream successfully {} {:?}", &req.id, now.elapsed()); } - println!("start to send data4"); match db_repo.update_state(&node_name, &req.id, DataState::Sent).await{ std::result::Result::Ok(_) =>{ //remove input data @@ -185,6 +179,7 @@ where error!("update state to process fail {} {}", &req.id, err) } } + info!("fetch and send a batch {:?}", now.elapsed()); }, std::result::Result::Ok(None)=>{ break; @@ -258,6 +253,7 @@ where node_name:node_name.clone(), id:req.id.clone(), size: req.size, + sent: vec![], state: DataState::Received, direction: Direction::Out, }).await{ diff --git a/crates/dp_runner/src/mprc.rs b/crates/compute_unit_runner/src/mprc.rs similarity index 100% rename from crates/dp_runner/src/mprc.rs rename to crates/compute_unit_runner/src/mprc.rs diff --git a/crates/compute_unit_runner/src/multi_sender.rs b/crates/compute_unit_runner/src/multi_sender.rs new file mode 100644 index 0000000..1478020 --- /dev/null +++ b/crates/compute_unit_runner/src/multi_sender.rs @@ -0,0 +1,58 @@ +use jz_action::network::datatransfer::{ + data_stream_client::DataStreamClient, MediaDataBatchResponse, +}; +use tokio::time::Instant; +use std::collections::{hash_map::Entry, HashMap}; +use tonic::transport::Channel; +use tracing::error; + +pub struct MultiSender { + streams: Vec, + + connects: Vec>>, +} + +impl MultiSender { + pub fn new(streams: Vec) -> Self { + let connects = streams.iter().map(|_| None).collect(); + MultiSender { + streams, + connects: connects, + } + } +} + +impl MultiSender { + pub async fn send(&mut self, val: MediaDataBatchResponse) -> Result<(), Vec> { + let mut sent = vec![]; + for (index, stream) in self.connects.iter_mut().enumerate() { + let url = self.streams[index].clone(); + if stream.is_none() { + match DataStreamClient::connect(url.clone()).await { + Ok(client) => { + *stream = Some(client); + } + Err(err) => { + error!("connect data streams {url} {err}"); + continue; + } + } + } + + let client = stream.as_mut().unwrap(); + let now = Instant::now(); + if let Err(err) = client.transfer_media_data(val.clone()).await { + error!("send reqeust will try next time {url} {err}"); + continue; + } + println!("send one success {:?}", now.elapsed()); + sent.push(url); + } + + if sent.len() == self.streams.len() { + Ok(()) + } else { + Err(sent) + } + } +} diff --git a/crates/compute_unit_runner/src/stream.rs b/crates/compute_unit_runner/src/stream.rs index 63114ea..4fa8cdf 100644 --- a/crates/compute_unit_runner/src/stream.rs +++ b/crates/compute_unit_runner/src/stream.rs @@ -2,7 +2,7 @@ use anyhow::Result; use jz_action::core::models::DbRepo; use jz_action::network::datatransfer::data_stream_server::DataStream; use jz_action::network::datatransfer::{MediaDataBatchResponse, TabularDataBatchResponse}; -use jz_action::utils::AnyhowToGrpc; +use jz_action::utils::{AnyhowToGrpc, IntoAnyhowResult}; use jz_action::{network::common::Empty, utils::StdIntoAnyhowResult}; use std::marker::PhantomData; use std::sync::Arc; @@ -40,39 +40,18 @@ impl DataStream for UnitDataStream where R: DbRepo + Clone + Send + Sync + 'static, { - type subscribeMediaDataStream = ReceiverStream>; - async fn subscribe_media_data( + async fn transfer_media_data( &self, - request: Request, - ) -> Result, Status> { - info!("receive media subscribe request {:?}", request); - - let (tx, rx) = mpsc::channel(4); - let program_guard = self.program.lock().await; - let mut data_rx = program_guard.out_going_tx.subscribe(); - tokio::spawn(async move { - loop { - select! { - data_batch = data_rx.recv() => { - if let Err(err) = tx.send(data_batch.anyhow().to_rpc(Code::Internal)).await { - //todo handle disconnect - error!("unable to send data to downstream {}", err); - return - } - }, - } - } - }); - - Ok(Response::new(ReceiverStream::new(rx))) + req: Request, + ) -> Result, Status> { + //channel and compputeunit share a pv. so not use network to fetch data between channlle and computeunit. + //if plan to support networks seperate, impl this + todo!() } - - type subscribeTabularDataStream = ReceiverStream>; - - async fn subscribe_tabular_data( + async fn transfer_tabular_data( &self, - _request: Request, - ) -> Result, Status> { + req: Request, + ) -> Result, tonic::Status> { todo!() } } diff --git a/crates/dp_runner/src/channel_tracker.rs b/crates/dp_runner/src/channel_tracker.rs index 0c72cc5..2702aa5 100644 --- a/crates/dp_runner/src/channel_tracker.rs +++ b/crates/dp_runner/src/channel_tracker.rs @@ -1,4 +1,4 @@ -use anyhow::{anyhow, Result}; +use anyhow::{anyhow, Error, Result}; use jz_action::core::models::{DataRecord, DataState, DbRepo, Direction, TrackerState}; use jz_action::network::common::Empty; use jz_action::network::datatransfer::data_stream_client::DataStreamClient; @@ -6,15 +6,15 @@ use jz_action::network::datatransfer::MediaDataBatchResponse; use std::path::PathBuf; use std::sync::Arc; use std::time::Duration; -use tokio::sync::{mpsc, Mutex}; -use tokio::time::{self, sleep}; +use tokio::sync::mpsc::Sender; +use tokio::sync::{mpsc, oneshot, Mutex}; +use tokio::time::{self, sleep, Instant}; use tokio::{fs, select}; use tokio_stream::StreamExt; use tonic::Status; -use tracing::{debug, error, info}; +use tracing::{debug, error, info, warn}; use uuid::Uuid; -use crate::mprc; pub struct ChannelTracker where R: DbRepo, @@ -31,8 +31,7 @@ where pub(crate) downstreams: Vec, - pub receivers: - Arc>>>>, + pub(crate) receiver_rx: Option>)>>, } impl ChannelTracker @@ -47,52 +46,17 @@ where local_state: TrackerState::Init, upstreams: vec![], downstreams: vec![], - receivers: Arc::new(Mutex::new(mprc::Mprs::new())), + receiver_rx: None, } } - pub(crate) async fn route_data(&self) -> Result<()> { + pub(crate) async fn route_data(&mut self) -> Result<()> { if self.upstreams.len() == 0 { return Err(anyhow!("no upstream")); } let (tx, mut rx) = mpsc::channel(1); - for upstream in &self.upstreams { - let upstream = upstream.clone(); - let tx_clone = tx.clone(); - let _ = tokio::spawn(async move { - loop { - //todo handle network disconnect - match DataStreamClient::connect(upstream.clone()).await { - Ok(mut client) => { - match client.subscribe_media_data(Empty {}).await { - Ok(stream) => { - let mut stream = stream.into_inner(); - info!("start to listen new data from upstream {}", upstream); - while let Some(resp) = stream.next().await { - //TODO need to confirm item why can be ERR - match resp { - Ok(resp) => tx_clone.send(resp).await.unwrap(), - Err(err) => { - error!("receive a error from stream {err}"); - break; - } - } - } - } - Err(err) => { - error!("subscribe_media_data fail {} {err}", upstream.clone()) - } - } - } - Err(err) => error!("connect upstream fail {} {err}", upstream.clone()), - } - - error!("unable read data from stream, reconnect in 2s"); - sleep(Duration::from_secs(2)).await; - } - }); - } + self.receiver_rx = Some(tx); let tmp_store = self.tmp_store.clone(); let db_repo = self.repo.clone(); @@ -100,35 +64,67 @@ where tokio::spawn(async move { loop { select! { - Some(data_batch) = rx.recv() => { + Some((data_batch, resp)) = rx.recv() => { //save to fs //create input directory - let id = Uuid::new_v4().to_string(); + let now = Instant::now(); + let id = data_batch.id; + match db_repo.find_by_node_id(&node_name,&id).await { + Ok(Some(_))=>{ + warn!("data {} processed before", &id); + resp.send(Ok(())).expect("request alread listen this channel"); + continue; + } + Err(err)=>{ + error!("query mongo by id {err}"); + resp.send(Err(err)).expect("request alread listen this channel"); + continue; + } + _=>{} + } + let tmp_in_path = tmp_store.join(id.clone()); debug!("try to create directory {:?}", tmp_in_path); - if let Err(e) = fs::create_dir_all(&tmp_in_path).await { - error!("create input dir {:?} fail {}", tmp_in_path, e); - return + if let Err(err) = fs::create_dir_all(&tmp_in_path).await { + error!("create input dir {:?} fail {}", tmp_in_path, err); + resp.send(Err(err.into())).expect("request alread listen this channel"); + continue; } //write batch files - for entry in data_batch.cells.iter() { - let entry_path = tmp_in_path.join(entry.path.clone()); - if let Err(e) = fs::write(entry_path.clone(), &entry.data).await { - error!("write file {:?} fail {}", entry_path, e); + + let mut is_write_err = false; + for (entry_path, entry) in data_batch.cells.iter().map(|entry|(tmp_in_path.join(entry.path.clone()), entry)) { + if let Err(err) = fs::write(entry_path.clone(), &entry.data).await { + error!("write file {:?} fail {}", entry_path, err); + is_write_err = true; + break; } } + if is_write_err { + resp.send(Err(anyhow!("write file "))).expect("request alread listen this channel"); + continue; + } + + info!("write files to disk in {} {:?}", &id, now.elapsed()); - info!("insert a data batch in {}", &id); //insert record to database - db_repo.insert_new_path(&DataRecord{ + if let Err(err) = db_repo.insert_new_path(&DataRecord{ node_name: node_name.clone(), id:id.clone(), size: data_batch.size, state: DataState::Received, + sent: vec![], direction:Direction::In, - }).await.unwrap(); + }).await{ + error!("insert a databatch {err}"); + resp.send(Err(err)).expect("request alread listen this channel"); + continue; + } + + resp.send(Ok(())).expect("request alread listen this channel "); + info!("insert a data batch in {} {:?}", &id, now.elapsed()); }, } } diff --git a/crates/dp_runner/src/main.rs b/crates/dp_runner/src/main.rs index c9853ab..d7c1722 100644 --- a/crates/dp_runner/src/main.rs +++ b/crates/dp_runner/src/main.rs @@ -1,5 +1,4 @@ mod channel_tracker; -mod mprc; mod stream; use jz_action::dbrepo::mongo::{MongoConfig, MongoRepo}; @@ -63,8 +62,8 @@ async fn main() -> Result<()> { &args.node_name, PathBuf::from_str(args.tmp_path.as_str())?, ); - let program_safe = Arc::new(Mutex::new(program)); + let program_safe = Arc::new(Mutex::new(program)); let (shutdown_tx, mut shutdown_rx) = mpsc::channel::>(1); { let shutdown_tx = shutdown_tx.clone(); diff --git a/crates/dp_runner/src/stream.rs b/crates/dp_runner/src/stream.rs index 908e554..de1aa08 100644 --- a/crates/dp_runner/src/stream.rs +++ b/crates/dp_runner/src/stream.rs @@ -5,8 +5,8 @@ use jz_action::network::datatransfer::data_stream_server::DataStream; use jz_action::network::datatransfer::{MediaDataBatchResponse, TabularDataBatchResponse}; use jz_action::utils::{AnyhowToGrpc, IntoAnyhowResult}; use std::sync::Arc; -use tokio::sync::mpsc; use tokio::sync::Mutex; +use tokio::sync::{mpsc, oneshot}; use tokio_stream::wrappers::ReceiverStream; use tonic::{Code, Request, Response, Status}; use tracing::info; @@ -25,31 +25,34 @@ impl DataStream for ChannelDataStream where R: DbRepo, { - type subscribeMediaDataStream = ReceiverStream>; - - async fn subscribe_media_data( + async fn transfer_media_data( &self, - request: Request, - ) -> Result, Status> { - info!("receive media subscribe request {:?}", request); - let remote_addr = request - .remote_addr() - .anyhow("remove addr missing") - .to_rpc(Code::InvalidArgument)? - .to_string(); - - let (tx, rx) = mpsc::channel(4); - let program_guard = self.program.lock().await; - let _ = program_guard.receivers.lock().await.insert(remote_addr, tx); - Ok(Response::new(ReceiverStream::new(rx))) - } + req: Request, + ) -> Result, Status> { + let send_tx = { + let program = self.program.lock().await; + if program.receiver_rx.is_none() { + return Err(Status::internal("not ready")); + } + program.receiver_rx.as_ref().unwrap().clone() + }; - type subscribeTabularDataStream = ReceiverStream>; + let (tx, rx) = oneshot::channel::>(); + let req: MediaDataBatchResponse = req.into_inner(); + if let Err(err) = send_tx.send((req, tx)).await { + return Err(Status::from_error(Box::new(err))); + } - async fn subscribe_tabular_data( + match rx.await { + Ok(Ok(_)) => Ok(Response::new(Empty {})), + Ok(Err(err)) => Err(Status::internal(err.to_string())), + Err(err) => Err(Status::from_error(Box::new(err))), + } + } + async fn transfer_tabular_data( &self, - _request: Request, - ) -> Result, Status> { + req: Request, + ) -> Result, tonic::Status> { todo!() } } diff --git "a/docs/\350\257\246\347\273\206\350\256\276\350\256\241.md" "b/docs/\350\257\246\347\273\206\350\256\276\350\256\241.md" index 169b737..0fb5f87 100644 --- "a/docs/\350\257\246\347\273\206\350\256\276\350\256\241.md" +++ "b/docs/\350\257\246\347\273\206\350\256\276\350\256\241.md" @@ -85,3 +85,8 @@ * pause * stop * subscribe + +## 节点是被动接受数据还是主动拉取数据 + +被动接受: 实时性好,容易保证下游下游节点都获取到数据,不利于实现容量控制。 +主动拉取: 容易实现流量控制, 实时性差,不容易保证所有下游节点都获取到数据。 \ No newline at end of file diff --git a/nodes/dummy_in/src/main.rs b/nodes/dummy_in/src/main.rs index 37696f0..082db21 100644 --- a/nodes/dummy_in/src/main.rs +++ b/nodes/dummy_in/src/main.rs @@ -10,6 +10,7 @@ use tokio::io::AsyncWriteExt; use tokio::select; use tokio::signal::unix::{signal, SignalKind}; use tokio::sync::mpsc; +use tokio::time::Instant; use tracing::error; use tracing::{info, Level}; @@ -74,6 +75,7 @@ async fn dummy_in(args: Args) -> Result<()> { let client = ipc::IPCClientImpl::new(args.unix_socket_addr); let tmp_path = Path::new(&args.tmp_path); loop { + let instant = Instant::now(); let id = uuid::Uuid::new_v4().to_string(); let output_dir = tmp_path.join(&id); fs::create_dir_all(output_dir.clone()).await?; @@ -86,9 +88,12 @@ async fn dummy_in(args: Args) -> Result<()> { tmp_file.write(word.as_bytes()).await.unwrap(); } } + + info!("generate new data spent {:?}", instant.elapsed()); //submit directory after completed a batch client .submit_output(SubmitOuputDataReq::new(&id, 30)) .await?; + info!("submit new data {:?}", instant.elapsed()); } } diff --git a/src/core/models.rs b/src/core/models.rs index d3fb189..0405762 100644 --- a/src/core/models.rs +++ b/src/core/models.rs @@ -36,13 +36,14 @@ pub struct Node { /// DataState use to represent databatch state /// for incoming data: Received(channel receive data) -> Assigned(assigned to user containerd) -> Processed(user containerd has processed) -/// for outgoing data: Received(user container product) -> Sent(send to channel) +/// for outgoing data: Received(user container product) -> ->PartialSent(send to channel) -> Sent(send to channel) #[derive(Serialize, Deserialize, Debug, PartialEq)] pub enum DataState { Received, Assigned, Processed, + PartialSent, Sent, Error, } @@ -60,6 +61,7 @@ pub struct DataRecord { pub size: u32, pub state: DataState, pub direction: Direction, + pub sent: Vec, } pub trait DBConfig { @@ -93,6 +95,19 @@ pub trait DataRepo { node_name: &str, ) -> impl std::future::Future>> + Send; + fn find_by_node_id( + &self, + node_name: &str, + id: &str, + ) -> impl std::future::Future>> + Send; + + fn mark_partial_sent( + &self, + node_name: &str, + id: &str, + sent: Vec<&str>, + ) -> impl std::future::Future> + Send; + fn insert_new_path( &self, record: &DataRecord, diff --git a/src/dbrepo/mongo.rs b/src/dbrepo/mongo.rs index 84a99dc..2de3005 100644 --- a/src/dbrepo/mongo.rs +++ b/src/dbrepo/mongo.rs @@ -121,13 +121,13 @@ impl DataRepo for MongoRepo { async fn find_and_sent_output_data(&self, node_name: &str) -> Result> { let update = doc! { - "$set": { "state": "Sent" }, + "$set": { "state": "PartialSent" }, }; let result = self .data_col .find_one_and_update( - doc! {"node_name":node_name,"state": "Received", "direction":"Out"}, + doc! {"node_name":node_name, "state": doc! {"$in": ["Received","PartialSent"]}, "direction":"Out"}, update, ) .await?; @@ -149,6 +149,26 @@ impl DataRepo for MongoRepo { .map(|_| ()) .anyhow() } + + async fn mark_partial_sent(&self, node_name: &str, id: &str, sent: Vec<&str>) -> Result<()> { + let update = doc! { + "$set": { "sent": sent }, + }; + + self.data_col + .update_one(doc! {"node_name":node_name, "id":id}, update) + .await + .map(|_| ()) + .anyhow() + } + + async fn find_by_node_id(&self, node_name: &str, id: &str) -> Result> { + let result = self + .data_col + .find_one(doc! {"id": id,"node_name":node_name}) + .await?; + Ok(result) + } } #[cfg(test)] diff --git a/src/network/protos/datatransfer.proto b/src/network/protos/datatransfer.proto index 7e714b2..a116e8a 100644 --- a/src/network/protos/datatransfer.proto +++ b/src/network/protos/datatransfer.proto @@ -5,8 +5,8 @@ package datatransfer; import "common.proto"; service DataStream { - rpc subscribeMediaData(common.Empty) returns (stream MediaDataBatchResponse) {} - rpc subscribeTabularData(common.Empty) returns (stream TabularDataBatchResponse) {} + rpc transferMediaData(MediaDataBatchResponse) returns (common.Empty) {} + rpc transferTabularData(TabularDataBatchResponse) returns (common.Empty) {} } message MediaDataCell { @@ -16,8 +16,9 @@ message MediaDataCell { } message MediaDataBatchResponse { - uint32 size = 1; - repeated MediaDataCell cells = 2; + string id = 1; + uint32 size = 2; + repeated MediaDataCell cells = 3; }